Skip to content

Commit

Permalink
Support partial snapshot to reduce fullsync overhead (selectdb#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Aug 5, 2024
1 parent d2564c1 commit bf8d9ee
Show file tree
Hide file tree
Showing 9 changed files with 1,040 additions and 26 deletions.
42 changes: 42 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,48 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return snapshotName, nil
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) {
if len(table) == 0 {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

if len(partitions) == 0 {
return "", xerror.Errorf(xerror.Normal, "partition is empty! you should have at least one partition")
}

// snapshot name format "ccrp_${table}_${timestamp}"
// table refs = table
snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRef := utils.FormatKeywordName(table)
partitionRefs := "`" + strings.Join(partitions, "`,`") + "`"

log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)

db, err := s.Connect()
if err != nil {
return "", err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s PARTITION (%s) ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs)
log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
}

backupFinished, err := s.CheckBackupFinished(snapshotName)
if err != nil {
return "", err
}
if !backupFinished {
err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql)
return "", err
}

return snapshotName, nil
}

// TODO: Add TaskErrMsg
func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
log.Debugf("check backup state of snapshot %s", snapshotName)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Specer interface {
CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error)
Expand Down
265 changes: 246 additions & 19 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,194 @@ func (j *Job) isIncrementalSync() bool {
}
}

func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) {
var jobInfoMap map[string]interface{}
err := json.Unmarshal(jobInfo, &jobInfoMap)
if err != nil {
return nil, xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo))
}

extraInfo, err := j.genExtraInfo()
if err != nil {
return nil, err
}
log.Debugf("extraInfo: %v", extraInfo)
jobInfoMap["extra_info"] = extraInfo

jobInfoBytes, err := json.Marshal(jobInfoMap)
if err != nil {
return nil, xerror.Errorf(xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap)
}

return jobInfoBytes, nil
}

// Like fullSync, but only backup and restore partial of the partitions of a table.
func (j *Job) partialSync() error {
type inMemoryData struct {
SnapshotName string `json:"snapshot_name"`
SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"`
}

if j.progress.PartialSyncData == nil {
return xerror.Errorf(xerror.Normal, "run partial sync but data is nil")
}

table := j.progress.PartialSyncData.Table
partitions := j.progress.PartialSyncData.Partitions
switch j.progress.SubSyncState {
case Done:
log.Infof("partial sync status: done")
if err := j.newPartialSnapshot(table, partitions); err != nil {
return err
}

case BeginCreateSnapshot:
// Step 1: Create snapshot
log.Infof("partial sync status: create snapshot")
snapshotName, err := j.ISrc.CreatePartialSnapshotAndWaitForDone(table, partitions)
if err != nil {
return err
}

j.progress.NextSubCheckpoint(GetSnapshotInfo, snapshotName)

case GetSnapshotInfo:
// Step 2: Get snapshot info
log.Infof("partial sync status: get snapshot info")

snapshotName := j.progress.PersistData
src := &j.Src
srcRpc, err := j.factory.NewFeRpc(src)
if err != nil {
return err
}

log.Debugf("partial sync begin get snapshot %s", snapshotName)
snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName)
if err != nil {
return err
}

if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status)
return err
}

if !snapshotResp.IsSetJobInfo() {
return xerror.New(xerror.Normal, "jobInfo is not set")
}

log.Tracef("job: %.128s", snapshotResp.GetJobInfo())
inMemoryData := &inMemoryData{
SnapshotName: snapshotName,
SnapshotResp: snapshotResp,
}
j.progress.NextSubVolatile(AddExtraInfo, inMemoryData)

case AddExtraInfo:
// Step 3: Add extra info
log.Infof("partial sync status: add extra info")

inMemoryData := j.progress.InMemoryData.(*inMemoryData)
snapshotResp := inMemoryData.SnapshotResp
jobInfo := snapshotResp.GetJobInfo()

log.Infof("partial sync snapshot response meta size: %d, job info size: %d",
len(snapshotResp.Meta), len(snapshotResp.JobInfo))

jobInfoBytes, err := j.addExtraInfo(jobInfo)
if err != nil {
return err
}

log.Debugf("partial sync job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes))
snapshotResp.SetJobInfo(jobInfoBytes)

j.progress.NextSubCheckpoint(RestoreSnapshot, inMemoryData)

case RestoreSnapshot:
// Step 4: Restore snapshot
log.Infof("partial sync status: restore snapshot")

if j.progress.InMemoryData == nil {
persistData := j.progress.PersistData
inMemoryData := &inMemoryData{}
if err := json.Unmarshal([]byte(persistData), inMemoryData); err != nil {
return xerror.Errorf(xerror.Normal, "unmarshal persistData failed, persistData: %s", persistData)
}
j.progress.InMemoryData = inMemoryData
}

// Step 4.1: start a new fullsync && persist
inMemoryData := j.progress.InMemoryData.(*inMemoryData)
snapshotName := inMemoryData.SnapshotName
restoreSnapshotName := restoreSnapshotName(snapshotName)
snapshotResp := inMemoryData.SnapshotResp

// Step 4.2: restore snapshot to dest
dest := &j.Dest
destRpc, err := j.factory.NewFeRpc(dest)
if err != nil {
return err
}
log.Debugf("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName)

var tableRefs []*festruct.TTableRef
if j.SyncType == TableSync && j.Src.Table != j.Dest.Table {
log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
tableRefs = make([]*festruct.TTableRef, 0)
tableRef := &festruct.TTableRef{
Table: &j.Src.Table,
AliasName: &j.Dest.Table,
}
tableRefs = append(tableRefs, tableRef)
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)
if err != nil {
return err
}
if restoreResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status)
}
log.Infof("partial sync restore snapshot resp: %v", restoreResp)

for {
restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName)
if err != nil {
return err
}

if restoreFinished {
j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName)
break
}
// retry for MAX_CHECK_RETRY_TIMES, timeout, continue
}

case PersistRestoreInfo:
// Step 5: Update job progress && dest table id
// update job info, only for dest table id
log.Infof("fullsync status: persist restore info")

switch j.SyncType {
case DBSync:
j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "")
case TableSync:
j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "")
default:
return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType)
}

return nil

default:
return xerror.Errorf(xerror.Normal, "invalid job sub sync state %d", j.progress.SubSyncState)
}

return j.partialSync()
}

func (j *Job) fullSync() error {
type inMemoryData struct {
SnapshotName string `json:"snapshot_name"`
Expand Down Expand Up @@ -318,7 +506,7 @@ func (j *Job) fullSync() error {
return err
}

log.Debugf("begin get snapshot %s", snapshotName)
log.Debugf("fullsync begin get snapshot %s", snapshotName)
snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName)
if err != nil {
return err
Expand All @@ -329,7 +517,7 @@ func (j *Job) fullSync() error {
return err
}

log.Tracef("job: %.128s", snapshotResp.GetJobInfo())
log.Tracef("fullsync snapshot job: %.128s", snapshotResp.GetJobInfo())
if !snapshotResp.IsSetJobInfo() {
return xerror.New(xerror.Normal, "jobInfo is not set")
}
Expand Down Expand Up @@ -364,24 +552,10 @@ func (j *Job) fullSync() error {
log.Infof("snapshot response meta size: %d, job info size: %d",
len(snapshotResp.Meta), len(snapshotResp.JobInfo))

var jobInfoMap map[string]interface{}
err := json.Unmarshal(jobInfo, &jobInfoMap)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo))
}
log.Debugf("jobInfoMap: %v", jobInfoMap)

extraInfo, err := j.genExtraInfo()
jobInfoBytes, err := j.addExtraInfo(jobInfo)
if err != nil {
return err
}
log.Debugf("extraInfo: %v", extraInfo)
jobInfoMap["extra_info"] = extraInfo

jobInfoBytes, err := json.Marshal(jobInfoMap)
if err != nil {
return xerror.Errorf(xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap)
}
log.Debugf("job info size: %d, bytes: %s", len(jobInfoBytes), string(jobInfoBytes))
snapshotResp.SetJobInfo(jobInfoBytes)

Expand Down Expand Up @@ -1131,9 +1305,33 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error {
func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error {
log.Infof("handle replace partitions binlog, commit seq: %d", *binlog.CommitSeq)

// TODO(walter) replace partitions once backuping/restoring with temporary partitions is supportted.
data := binlog.GetData()
replacePartition, err := record.NewReplacePartitionFromJson(data)
if err != nil {
return err
}

if !replacePartition.StrictRange {
log.Warnf("replacing partitions with non strict range is not supported yet, replace partition record: %s", string(data))
return j.newSnapshot(j.progress.CommitSeq)
}

return j.newSnapshot(j.progress.CommitSeq)
if replacePartition.UseTempName {
log.Warnf("replacing partitions with use tmp name is not supported yet, replace partition record: %s", string(data))
return j.newSnapshot(j.progress.CommitSeq)
}

oldPartitions := strings.Join(replacePartition.Partitions, ",")
newPartitions := strings.Join(replacePartition.TempPartitions, ",")
log.Infof("table %s replace partitions %s with temp partitions %s",
replacePartition.TableName, oldPartitions, newPartitions)

partitions := replacePartition.Partitions
if replacePartition.UseTempName {
partitions = replacePartition.TempPartitions
}

return j.newPartialSnapshot(replacePartition.TableName, partitions)
}

// return: error && bool backToRunLoop
Expand Down Expand Up @@ -1317,6 +1515,9 @@ func (j *Job) tableSync() error {
case TableIncrementalSync:
log.Debug("table incremental sync")
return j.incrementalSync()
case TablePartialSync:
log.Debug("table partial sync")
return j.partialSync()
default:
return xerror.Errorf(xerror.Normal, "unknown sync state: %v", j.progress.SyncState)
}
Expand Down Expand Up @@ -1346,6 +1547,9 @@ func (j *Job) dbSync() error {
case DBIncrementalSync:
log.Debug("db incremental sync")
return j.incrementalSync()
case DBPartialSync:
log.Debug("db partial sync")
return j.partialSync()
default:
return xerror.Errorf(xerror.Normal, "unknown db sync state: %v", j.progress.SyncState)
}
Expand Down Expand Up @@ -1443,6 +1647,29 @@ func (j *Job) newSnapshot(commitSeq int64) error {
}
}

func (j *Job) newPartialSnapshot(table string, partitions []string) error {
// The binlog of commitSeq will be skipped once the partial snapshot finished.
commitSeq := j.progress.CommitSeq
log.Infof("new partial snapshot, commitSeq: %d, table: %s, partitions: %v", commitSeq, table, partitions)

j.progress.PartialSyncData = &JobPartialSyncData{
Table: table,
Partitions: partitions,
}
switch j.SyncType {
case TableSync:
j.progress.NextWithPersist(commitSeq, TablePartialSync, BeginCreateSnapshot, "")
return nil
case DBSync:
j.progress.NextWithPersist(commitSeq, DBPartialSync, BeginCreateSnapshot, "")
return nil
default:
err := xerror.Panicf(xerror.Normal, "unknown table sync type: %v", j.SyncType)
log.Fatalf("run %+v", err)
return err
}
}

// run job
func (j *Job) Run() error {
gls.ResetGls(gls.GoID(), map[interface{}]interface{}{})
Expand Down
Loading

0 comments on commit bf8d9ee

Please sign in to comment.