Skip to content

Commit

Permalink
Retry handle upsert binlog, to fix meta error (selectdb#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Nov 29, 2024
1 parent b22f946 commit 75a5431
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,16 @@ func (j *Job) ingestBinlogForTxnInsert(txnId int64, tableRecords []*record.Table
return subTxnInfos, nil
}

func (j *Job) handleUpsertWithRetry(binlog *festruct.TBinlog) error {
err := j.handleUpsert(binlog)
if !xerror.IsCategory(err, xerror.Meta) {
return err
}

log.Warnf("a meta error occurred, retry to handle upsert binlog again, commitSeq: %d", binlog.GetCommitSeq())
return j.handleUpsert(binlog)
}

func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
log.Infof("handle upsert binlog, sub sync state: %s, prevCommitSeq: %d, commitSeq: %d",
j.progress.SubSyncState, j.progress.PrevCommitSeq, j.progress.CommitSeq)
Expand Down Expand Up @@ -2707,7 +2717,7 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {

switch binlog.GetType() {
case festruct.TBinlogType_UPSERT:
return j.handleUpsert(binlog)
return j.handleUpsertWithRetry(binlog)
case festruct.TBinlogType_ADD_PARTITION:
return j.handleAddPartition(binlog)
case festruct.TBinlogType_CREATE_TABLE:
Expand Down
12 changes: 12 additions & 0 deletions pkg/xerror/xerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,15 @@ func WithStack(err error) error {
callers(4),
}
}

func IsCategory(err error, category ErrorCategory) bool {
if err == nil {
return false
}

if xerr, ok := err.(*XError); ok {
return xerr.category == category
}

return false
}

0 comments on commit 75a5431

Please sign in to comment.