Skip to content

Commit

Permalink
Fix binlog lost (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored May 24, 2024
1 parent c919829 commit dac089f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,8 @@ func (j *Job) recoverIncrementalSync() error {

func (j *Job) incrementalSync() error {
if !j.progress.IsDone() {
log.Infof("job progress is not done, state is (%s), need recover", j.progress.SubSyncState)
log.Infof("job progress is not done, need recover. state: %s, prevCommitSeq: %d, commitSeq: %d",
j.progress.SubSyncState, j.progress.PrevCommitSeq, j.progress.CommitSeq)

return j.recoverIncrementalSync()
}
Expand All @@ -1202,6 +1203,7 @@ func (j *Job) incrementalSync() error {

// Step 2: handle all binlog
for {
// The CommitSeq is equals to PrevCommitSeq in here.
commitSeq := j.progress.CommitSeq
log.Debugf("src: %s, commitSeq: %v", src, commitSeq)

Expand Down
6 changes: 3 additions & 3 deletions pkg/ccr/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func NewJobProgressFromJson(jobName string, db storage.DB) (*JobProgress, error)
}

func (j *JobProgress) StartHandle(commitSeq int64) {
j.PrevCommitSeq = j.CommitSeq
j.CommitSeq = commitSeq

j.Persist()
Expand Down Expand Up @@ -284,7 +283,7 @@ func (j *JobProgress) NextWithPersist(commitSeq int64, syncState SyncState, subS
j.Persist()
}

func (j *JobProgress) IsDone() bool { return j.SubSyncState == Done }
func (j *JobProgress) IsDone() bool { return j.SubSyncState == Done && j.PrevCommitSeq == j.CommitSeq }

// TODO(Drogon): check reset some fields
func (j *JobProgress) Done() {
Expand Down Expand Up @@ -338,5 +337,6 @@ func (j *JobProgress) Persist() {
break
}

log.Trace("update job progress done")
log.Tracef("update job progress done, state: %s, subState: %s, commitSeq: %d, prevCommitSeq: %d",
j.SyncState, j.SubSyncState, j.CommitSeq, j.PrevCommitSeq)
}

0 comments on commit dac089f

Please sign in to comment.