Skip to content

Commit

Permalink
Avoid blocking job in full/partial sync (selectdb#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Oct 25, 2024
1 parent ec068c1 commit 9c441d6
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 95 deletions.
104 changes: 42 additions & 62 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type BackupInfo struct {
State BackupState
StateStr string
SnapshotName string
Status string
CreateTime string // 2024-10-22 06:27:06
}

Expand All @@ -175,11 +176,17 @@ func parseBackupInfo(parser *utils.RowParser) (*BackupInfo, error) {
return nil, xerror.Wrap(err, xerror.Normal, "parse backup CreateTime failed")
}

status, err := parser.GetString("Status")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "parse backup Status failed")
}

info := &BackupInfo{
State: ParseBackupState(stateStr),
StateStr: stateStr,
SnapshotName: snapshotName,
CreateTime: createTime,
Status: status,
}
return info, nil
}
Expand Down Expand Up @@ -621,7 +628,7 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) {
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON ( src_1 ) PROPERTIES ("type" = "full");
func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
func (s *Spec) CreateSnapshot(tables []string) (string, error) {
if tables == nil {
tables = make([]string, 0)
}
Expand Down Expand Up @@ -662,20 +669,11 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return "", xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
}

backupFinished, err := s.CheckBackupFinished(snapshotName)
if err != nil {
return "", err
}
if !backupFinished {
err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql)
return "", err
}

return snapshotName, nil
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) {
func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string, error) {
if len(table) == 0 {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}
Expand Down Expand Up @@ -705,73 +703,60 @@ func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []st
return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
}

backupFinished, err := s.CheckBackupFinished(snapshotName)
if err != nil {
return "", err
}
if !backupFinished {
err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql)
return "", err
}

return snapshotName, nil
}

// TODO: Add TaskErrMsg
func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, error) {
log.Debugf("check backup state of snapshot %s", snapshotName)

db, err := s.Connect()
if err != nil {
return BackupStateUnknown, err
return BackupStateUnknown, "", err
}

sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName)
log.Debugf("check backup state sql: %s", sql)
rows, err := db.Query(sql)
if err != nil {
return BackupStateUnknown, xerror.Wrapf(err, xerror.Normal, "show backup failed, sql: %s", sql)
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "show backup failed, sql: %s", sql)
}
defer rows.Close()

if rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return BackupStateUnknown, xerror.Wrap(err, xerror.Normal, sql)
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql)
}

info, err := parseBackupInfo(rowParser)
if err != nil {
return BackupStateUnknown, xerror.Wrap(err, xerror.Normal, sql)
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql)
}

log.Infof("check snapshot %s backup state: [%v]", snapshotName, info.StateStr)
return info.State, nil
return info.State, info.Status, nil
}
return BackupStateUnknown, xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
}

func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if backupState, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if err == nil && backupState == BackupStateFinished {
return true, nil
} else if err == nil && backupState == BackupStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled")
} else {
// BackupStatePending, BackupStateUnknown or network related errors.
if err != nil {
log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(BACKUP_CHECK_DURATION)
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if backupState, status, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if err == nil && backupState == BackupStateFinished {
return true, nil
} else if err == nil && backupState == BackupStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled, backup status: %s", status)
} else {
// BackupStatePending, BackupStateUnknown or network related errors.
if err != nil {
log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
return false, nil
}

return false, xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d", MAX_CHECK_RETRY_TIMES)
}

func (s *Spec) CancelBackupIfExists() error {
Expand Down Expand Up @@ -913,27 +898,22 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if err == nil && restoreState == RestoreStateFinished {
return true, nil
} else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if err == nil && restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown or network error.
if err != nil {
log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(RESTORE_CHECK_DURATION)
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if err == nil && restoreState == RestoreStateFinished {
return true, nil
} else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if err == nil && restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown or network error.
if err != nil {
log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
return false, nil
}

log.Warnf("check restore state timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName)
return false, nil
}

func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Specer interface {
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CheckTableExistsByName(tableName string) (bool, error)
CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CreatePartialSnapshot(table string, partitions []string) (string, error)
CreateSnapshot(tables []string) (string, error)
CheckBackupFinished(snapshotName string) (bool, error)
CancelBackupIfExists() error
CancelRestoreIfExists(srcDbName string) error
CheckRestoreFinished(snapshotName string) (bool, error)
Expand Down
Loading

0 comments on commit 9c441d6

Please sign in to comment.