Skip to content

Commit

Permalink
Add feature atomic restore (selectdb#166)
Browse files Browse the repository at this point in the history
Atomic restore replaces tables instead of updating them in place,
so that reads are not affected during the fullsync.
  • Loading branch information
w41ter authored Sep 11, 2024
1 parent e553a51 commit 1e0d0bc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
37 changes: 30 additions & 7 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/selectdb/ccr_syncer/pkg/ccr/base"
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
"github.com/selectdb/ccr_syncer/pkg/rpc"
"github.com/selectdb/ccr_syncer/pkg/storage"
utils "github.com/selectdb/ccr_syncer/pkg/utils"
"github.com/selectdb/ccr_syncer/pkg/xerror"
Expand All @@ -37,6 +38,7 @@ const (
var (
featureSchemaChangePartialSync bool
featureCleanTableAndPartitions bool
featureAtomicRestore bool
)

func init() {
Expand All @@ -46,6 +48,8 @@ func init() {
// The default value is false, since clean tables will erase views unexpectedly.
flag.BoolVar(&featureCleanTableAndPartitions, "feature_clean_table_and_partitions", false,
"clean non restored tables and partitions during fullsync")
flag.BoolVar(&featureAtomicRestore, "feature_atomic_restore", true,
"replace tables in atomic during fullsync (otherwise the dest table will not be able to read).")
}

type SyncType int
Expand Down Expand Up @@ -446,8 +450,17 @@ func (j *Job) partialSync() error {
tableRefs = append(tableRefs, tableRef)
}

cleanPartitions, cleanTables := false, false // DO NOT drop exists tables and partitions
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions)
restoreReq := rpc.RestoreSnapshotRequest{
TableRefs: tableRefs,
SnapshotName: restoreSnapshotName,
SnapshotResult: snapshotResp,

// DO NOT drop exists tables and partitions
CleanPartitions: false,
CleanTables: false,
AtomicRestore: false,
}
restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq)
if err != nil {
return err
}
Expand Down Expand Up @@ -671,15 +684,25 @@ func (j *Job) fullSync() error {
tableRefs = append(tableRefs, tableRef)
}

// drop exists partitions, and drop tables if in db sync.
cleanTables, cleanPartitions := false, false
restoreReq := rpc.RestoreSnapshotRequest{
TableRefs: tableRefs,
SnapshotName: restoreSnapshotName,
SnapshotResult: snapshotResp,
CleanPartitions: false,
CleanTables: false,
AtomicRestore: false,
}
if featureCleanTableAndPartitions {
cleanPartitions = true
// drop exists partitions, and drop tables if in db sync.
restoreReq.CleanPartitions = true
if j.SyncType == DBSync {
cleanTables = true
restoreReq.CleanTables = true
}
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions)
if featureAtomicRestore {
restoreReq.AtomicRestore = true
}
restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq)
if err != nil {
return err
}
Expand Down
40 changes: 27 additions & 13 deletions pkg/rpc/fe.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,23 @@ func canUseNextAddr(err error) bool {
return false
}

// RestoreSnapshotRequest bundles the arguments of IFeRpc.RestoreSnapshot so
// that new restore options can be added without changing the method signature.
type RestoreSnapshotRequest struct {
// TableRefs is forwarded unchanged as the TableRefs of the thrift restore request.
TableRefs []*festruct.TTableRef
// SnapshotName is sent as the label name of the thrift restore request.
SnapshotName string
// SnapshotResult supplies the Meta and JobInfo payloads of the thrift restore
// request (read via GetMeta and GetJobInfo).
SnapshotResult *festruct.TGetSnapshotResult_
// AtomicRestore is forwarded as the atomic_restore flag of the thrift request.
// NOTE(review): presumably makes the FE replace tables atomically instead of
// updating them in place — confirm against the FE implementation.
AtomicRestore bool
// CleanPartitions is forwarded as the clean_partitions flag of the thrift request.
// Callers set it to false when existing partitions must not be dropped.
CleanPartitions bool
// CleanTables is forwarded as the clean_tables flag of the thrift request.
// Callers set it to false when existing tables must not be dropped.
CleanTables bool
}

type IFeRpc interface {
BeginTransaction(*base.Spec, string, []int64) (*festruct.TBeginTxnResult_, error)
CommitTransaction(*base.Spec, int64, []*festruct_types.TTabletCommitInfo) (*festruct.TCommitTxnResult_, error)
RollbackTransaction(spec *base.Spec, txnId int64) (*festruct.TRollbackTxnResult_, error)
GetBinlog(*base.Spec, int64) (*festruct.TGetBinlogResult_, error)
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
GetSnapshot(*base.Spec, string) (*festruct.TGetSnapshotResult_, error)
RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_, bool, bool) (*festruct.TRestoreSnapshotResult_, error)
RestoreSnapshot(*base.Spec, *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error)
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
GetTableMeta(spec *base.Spec, tableIds []int64) (*festruct.TGetMetaResult_, error)
Expand Down Expand Up @@ -384,10 +393,9 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string) (*festruct.TGet
return convertResult[festruct.TGetSnapshotResult_](result, err)
}

func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) {
// return rpc.masterClient.RestoreSnapshot(spec, tableRefs, label, snapshotResult)
func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, req *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) {
caller := func(client IFeRpc) (resultType, error) {
return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult, cleanTables, cleanPartitions)
return client.RestoreSnapshot(spec, req)
}
result, err := rpc.callWithMasterRedirect(caller)
return convertResult[festruct.TRestoreSnapshotResult_](result, err)
Expand Down Expand Up @@ -661,10 +669,13 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string) (*fest
// 10: optional map<string, string> properties
// 11: optional binary meta
// 12: optional binary job_info
// 13: optional bool clean_tables
// 14: optional bool clean_partitions
// 15: optional bool atomic_restore
// }
//
// Restore Snapshot rpc
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) {
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) {
// NOTE: ignore meta, because it's too large
log.Debugf("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec)

Expand All @@ -674,20 +685,23 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruc
properties["reserve_replica"] = "true"
req := &festruct.TRestoreSnapshotRequest{
Table: &spec.Table,
LabelName: &label,
LabelName: &restoreReq.SnapshotName,
RepoName: &repoName,
TableRefs: tableRefs,
TableRefs: restoreReq.TableRefs,
Properties: properties,
Meta: snapshotResult.GetMeta(),
JobInfo: snapshotResult.GetJobInfo(),
CleanTables: &cleanTables,
CleanPartitions: &cleanPartitions,
Meta: restoreReq.SnapshotResult.GetMeta(),
JobInfo: restoreReq.SnapshotResult.GetJobInfo(),
CleanTables: &restoreReq.CleanTables,
CleanPartitions: &restoreReq.CleanPartitions,
AtomicRestore: &restoreReq.AtomicRestore,
}
setAuthInfo(req, spec)

// NOTE: ignore meta, because it's too large
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %v, clean partitions: %v",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, cleanTables, cleanPartitions)
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties,
restoreReq.CleanTables, restoreReq.CleanPartitions, restoreReq.AtomicRestore)

if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil {
return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed")
} else {
Expand Down

0 comments on commit 1e0d0bc

Please sign in to comment.