From 9c441d637c00fdec81c92db7e84edc48c6694f41 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 25 Oct 2024 16:40:56 +0800 Subject: [PATCH] Avoid blocking job in full/partial sync (#201) --- pkg/ccr/base/spec.go | 104 ++++++++++++++++---------------------- pkg/ccr/base/specer.go | 5 +- pkg/ccr/job.go | 107 +++++++++++++++++++++++++++++----------- pkg/ccr/job_progress.go | 2 + pkg/rpc/be.go | 3 +- 5 files changed, 126 insertions(+), 95 deletions(-) diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 91be0fa3..835fbca2 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -156,6 +156,7 @@ type BackupInfo struct { State BackupState StateStr string SnapshotName string + Status string CreateTime string // 2024-10-22 06:27:06 } @@ -175,11 +176,17 @@ func parseBackupInfo(parser *utils.RowParser) (*BackupInfo, error) { return nil, xerror.Wrap(err, xerror.Normal, "parse backup CreateTime failed") } + status, err := parser.GetString("Status") + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "parse backup Status failed") + } + info := &BackupInfo{ State: ParseBackupState(stateStr), StateStr: stateStr, SnapshotName: snapshotName, CreateTime: createTime, + Status: status, } return info, nil } @@ -621,7 +628,7 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) { } // mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON ( src_1 ) PROPERTIES ("type" = "full"); -func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { +func (s *Spec) CreateSnapshot(tables []string) (string, error) { if tables == nil { tables = make([]string, 0) } @@ -662,20 +669,11 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { return "", xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql) } - backupFinished, err := s.CheckBackupFinished(snapshotName) - if err != nil { - return "", err - } - if !backupFinished { - err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql) - return "", err - } - return snapshotName, nil } // mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full"); -func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) { +func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string, error) { if len(table) == 0 { return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table") } @@ -705,73 +703,60 @@ func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []st return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql) } - backupFinished, err := s.CheckBackupFinished(snapshotName) - if err != nil { - return "", err - } - if !backupFinished { - err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql) - return "", err - } - return snapshotName, nil } // TODO: Add TaskErrMsg -func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) { +func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, error) { log.Debugf("check backup state of snapshot %s", snapshotName) db, err := s.Connect() if err != nil { - return BackupStateUnknown, err + return BackupStateUnknown, "", err } sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName) log.Debugf("check backup state sql: %s", sql) rows, err := db.Query(sql) if err != nil { - return BackupStateUnknown, xerror.Wrapf(err, xerror.Normal, "show backup failed, sql: %s", sql) + return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "show backup failed, sql: %s", sql) } defer rows.Close() if rows.Next() { rowParser := utils.NewRowParser() if err := rowParser.Parse(rows); err != nil { - return BackupStateUnknown, xerror.Wrap(err, xerror.Normal, sql) + return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql) } info, err := parseBackupInfo(rowParser) if err != nil { - return BackupStateUnknown, xerror.Wrap(err, xerror.Normal, sql) + return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql) } log.Infof("check snapshot %s backup state: [%v]", snapshotName, info.StateStr) - return info.State, nil + return info.State, info.Status, nil } - return BackupStateUnknown, xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql) + return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql) } func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) { log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName) - for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { - // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. - if backupState, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) { - return false, err - } else if err == nil && backupState == BackupStateFinished { - return true, nil - } else if err == nil && backupState == BackupStateCancelled { - return false, xerror.Errorf(xerror.Normal, "backup failed or canceled") - } else { - // BackupStatePending, BackupStateUnknown or network related errors. - if err != nil { - log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) - } - time.Sleep(BACKUP_CHECK_DURATION) + // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. + if backupState, status, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) { + return false, err + } else if err == nil && backupState == BackupStateFinished { + return true, nil + } else if err == nil && backupState == BackupStateCancelled { + return false, xerror.Errorf(xerror.Normal, "backup failed or canceled, backup status: %s", status) + } else { + // BackupStatePending, BackupStateUnknown or network related errors. + if err != nil { + log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) } + return false, nil } - - return false, xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d", MAX_CHECK_RETRY_TIMES) } func (s *Spec) CancelBackupIfExists() error { @@ -913,27 +898,22 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName) - for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { - // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. - if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) { - return false, err - } else if err == nil && restoreState == RestoreStateFinished { - return true, nil - } else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { - return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) - } else if err == nil && restoreState == RestoreStateCancelled { - return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) - } else { - // RestoreStatePending, RestoreStateUnknown or network error. - if err != nil { - log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) - } - time.Sleep(RESTORE_CHECK_DURATION) + // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. + if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) { + return false, err + } else if err == nil && restoreState == RestoreStateFinished { + return true, nil + } else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { + return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + } else if err == nil && restoreState == RestoreStateCancelled { + return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + } else { + // RestoreStatePending, RestoreStateUnknown or network error. + if err != nil { + log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) } + return false, nil } - - log.Warnf("check restore state timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName) - return false, nil } func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) { diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 541c74dc..ad00e290 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -25,8 +25,9 @@ type Specer interface { CheckDatabaseExists() (bool, error) CheckTableExists() (bool, error) CheckTableExistsByName(tableName string) (bool, error) - CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) - CreateSnapshotAndWaitForDone(tables []string) (string, error) + CreatePartialSnapshot(table string, partitions []string) (string, error) + CreateSnapshot(tables []string) (string, error) + CheckBackupFinished(snapshotName string) (bool, error) CancelBackupIfExists() error CancelRestoreIfExists(srcDbName string) error CheckRestoreFinished(snapshotName string) (bool, error) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 9ca37580..2a658c4b 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -344,6 +344,7 @@ func (j *Job) partialSync() error { SnapshotName string `json:"snapshot_name"` SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"` TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` + RestoreLabel string `json:"restore_label"` } if j.progress.PartialSyncData == nil { @@ -369,15 +370,30 @@ func (j *Job) partialSync() error { } } - snapshotName, err := j.ISrc.CreatePartialSnapshotAndWaitForDone(table, partitions) + snapshotName, err := j.ISrc.CreatePartialSnapshot(table, partitions) if err != nil { return err } + j.progress.NextSubVolatile(WaitBackupDone, snapshotName) + + case WaitBackupDone: + // Step 2: Wait backup job done + snapshotName := j.progress.InMemoryData.(string) + backupFinished, err := j.ISrc.CheckBackupFinished(snapshotName) + if err != nil { + return err + } + + if !backupFinished { + log.Debugf("partial sync status: backup job %s is running, retry later", snapshotName) + return nil + } + j.progress.NextSubCheckpoint(GetSnapshotInfo, snapshotName) case GetSnapshotInfo: - // Step 2: Get snapshot info + // Step 3: Get snapshot info log.Infof("partial sync status: get snapshot info") snapshotName := j.progress.PersistData @@ -427,7 +443,7 @@ func (j *Job) partialSync() error { j.progress.NextSubVolatile(AddExtraInfo, inMemoryData) case AddExtraInfo: - // Step 3: Add extra info + // Step 4: Add extra info log.Infof("partial sync status: add extra info") inMemoryData := j.progress.InMemoryData.(*inMemoryData) @@ -455,7 +471,7 @@ func (j *Job) partialSync() error { j.progress.NextSubCheckpoint(RestoreSnapshot, inMemoryData) case RestoreSnapshot: - // Step 4: Restore snapshot + // Step 5: Restore snapshot log.Infof("partial sync status: restore snapshot") if j.progress.InMemoryData == nil { @@ -467,20 +483,20 @@ func (j *Job) partialSync() error { j.progress.InMemoryData = inMemoryData } - // Step 4.1: cancel the running restore job which submitted by former progress, if exists + // Step 5.1: cancel the running restore job which submitted by former progress, if exists if featureCancelConflictBackupRestoreJob { if err := j.IDest.CancelRestoreIfExists(j.Src.Database); err != nil { return err } } - // Step 4.2: start a new fullsync && persist + // Step 5.2: start a new fullsync && persist inMemoryData := j.progress.InMemoryData.(*inMemoryData) snapshotName := inMemoryData.SnapshotName restoreSnapshotName := restoreSnapshotName(snapshotName) snapshotResp := inMemoryData.SnapshotResp - // Step 4.3: restore snapshot to dest + // Step 5.3: restore snapshot to dest dest := &j.Dest destRpc, err := j.factory.NewFeRpc(dest) if err != nil { @@ -527,22 +543,29 @@ func (j *Job) partialSync() error { return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status) } log.Infof("partial sync restore snapshot resp: %v", restoreResp) + inMemoryData.RestoreLabel = restoreSnapshotName - for { - restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) - if err != nil { - return err - } + j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) - if restoreFinished { - j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName) - break - } - // retry for MAX_CHECK_RETRY_TIMES, timeout, continue + case WaitRestoreDone: + // Step 6: Wait restore job done + inMemoryData := j.progress.InMemoryData.(*inMemoryData) + restoreSnapshotName := inMemoryData.RestoreLabel + + restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) + if err != nil { + return err } + if !restoreFinished { + log.Debugf("partial sync status: restore job %s is running", restoreSnapshotName) + return nil + } + + j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName) + case PersistRestoreInfo: - // Step 5: Update job progress && dest table id + // Step 7: Update job progress && dest table id // update job info, only for dest table id var targetName = table if alias, ok := j.progress.TableAliases[table]; ok { @@ -610,6 +633,7 @@ func (j *Job) fullSync() error { SnapshotName string `json:"snapshot_name"` SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"` TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` + RestoreLabel string `json:"restore_label"` } switch j.progress.SubSyncState { @@ -645,15 +669,28 @@ func (j *Job) fullSync() error { } } - snapshotName, err := j.ISrc.CreateSnapshotAndWaitForDone(backupTableList) + snapshotName, err := j.ISrc.CreateSnapshot(backupTableList) + if err != nil { + return err + } + j.progress.NextSubVolatile(WaitBackupDone, snapshotName) + + case WaitBackupDone: + // Step 2: Wait backup job done + snapshotName := j.progress.InMemoryData.(string) + backupFinished, err := j.ISrc.CheckBackupFinished(snapshotName) if err != nil { return err } + if !backupFinished { + log.Debugf("fullsync status: backup job %s is running, retry later", snapshotName) + return nil + } j.progress.NextSubCheckpoint(GetSnapshotInfo, snapshotName) case GetSnapshotInfo: - // Step 2: Get snapshot info + // Step 3: Get snapshot info log.Infof("fullsync status: get snapshot info") snapshotName := j.progress.PersistData @@ -698,7 +735,7 @@ func (j *Job) fullSync() error { j.progress.NextSubVolatile(AddExtraInfo, inMemoryData) case AddExtraInfo: - // Step 3: Add extra info + // Step 4: Add extra info log.Infof("fullsync status: add extra info") inMemoryData := j.progress.InMemoryData.(*inMemoryData) @@ -729,7 +766,7 @@ func (j *Job) fullSync() error { j.progress.CommitNextSubWithPersist(commitSeq, RestoreSnapshot, inMemoryData) case RestoreSnapshot: - // Step 4: Restore snapshot + // Step 5: Restore snapshot log.Infof("fullsync status: restore snapshot") if j.progress.InMemoryData == nil { @@ -741,20 +778,20 @@ func (j *Job) fullSync() error { j.progress.InMemoryData = inMemoryData } - // Step 4.1: cancel the running restore job which by the former process, if exists + // Step 5.1: cancel the running restore job which by the former process, if exists if featureCancelConflictBackupRestoreJob { if err := j.IDest.CancelRestoreIfExists(j.Src.Database); err != nil { return err } } - // Step 4.2: start a new fullsync && persist + // Step 5.2: start a new fullsync && persist inMemoryData := j.progress.InMemoryData.(*inMemoryData) snapshotName := inMemoryData.SnapshotName restoreSnapshotName := restoreSnapshotName(snapshotName) snapshotResp := inMemoryData.SnapshotResp - // Step 4.3: restore snapshot to dest + // Step 5.3: restore snapshot to dest dest := &j.Dest destRpc, err := j.factory.NewFeRpc(dest) if err != nil { @@ -811,6 +848,14 @@ func (j *Job) fullSync() error { } log.Infof("resp: %v", restoreResp) + inMemoryData.RestoreLabel = restoreSnapshotName + j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) + + case WaitRestoreDone: + // Step 6: Wait restore job done + inMemoryData := j.progress.InMemoryData.(*inMemoryData) + restoreSnapshotName := inMemoryData.RestoreLabel + for { restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) { @@ -856,15 +901,17 @@ func (j *Job) fullSync() error { return err } - if restoreFinished { - j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName) - break + if !restoreFinished { + log.Debugf("fullsync status: restore job %s is running, retry later", restoreSnapshotName) + return nil } - // retry for MAX_CHECK_RETRY_TIMES, timeout, continue + + j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName) + break } case PersistRestoreInfo: - // Step 5: Update job progress && dest table id + // Step 7: Update job progress && dest table id // update job info, only for dest table id if len(j.progress.TableAliases) > 0 { diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 82109684..62788dcf 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -95,6 +95,8 @@ var ( AddExtraInfo SubSyncState = SubSyncState{State: 2, BinlogType: BinlogNone} RestoreSnapshot SubSyncState = SubSyncState{State: 3, BinlogType: BinlogNone} PersistRestoreInfo SubSyncState = SubSyncState{State: 4, BinlogType: BinlogNone} + WaitBackupDone SubSyncState = SubSyncState{State: 5, BinlogType: BinlogNone} + WaitRestoreDone SubSyncState = SubSyncState{State: 6, BinlogType: BinlogNone} BeginTransaction SubSyncState = SubSyncState{State: 11, BinlogType: BinlogUpsert} IngestBinlog SubSyncState = SubSyncState{State: 12, BinlogType: BinlogUpsert} diff --git a/pkg/rpc/be.go b/pkg/rpc/be.go index cf8b012a..bec3d244 100644 --- a/pkg/rpc/be.go +++ b/pkg/rpc/be.go @@ -26,7 +26,8 @@ func (beRpc *BeRpc) IngestBinlog(req *bestruct.TIngestBinlogRequest) (*bestruct. client := beRpc.client if result, err := client.IngestBinlog(context.Background(), req); err != nil { - return nil, xerror.Wrapf(err, xerror.Normal, "IngestBinlog error: %v", err) + return nil, xerror.Wrapf(err, xerror.Normal, + "IngestBinlog error: %v, txnId: %d, be: %v", err, req.GetTxnId(), beRpc.backend) } else { return result, nil }