diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 90912332..21d272f7 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -551,6 +551,48 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { return snapshotName, nil } +// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full"); +func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) { + if len(table) == 0 { + return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table") + } + + if len(partitions) == 0 { + return "", xerror.Errorf(xerror.Normal, "partition is empty! you should have at least one partition") + } + + // snapshot name format "ccrp_${table}_${timestamp}" + // table refs = table + snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, s.Table, time.Now().Unix()) + tableRef := utils.FormatKeywordName(table) + partitionRefs := "`" + strings.Join(partitions, "`,`") + "`" + + log.Infof("create partial snapshot %s.%s", s.Database, snapshotName) + + db, err := s.Connect() + if err != nil { + return "", err + } + + backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s PARTITION (%s) ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs) + log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql) + _, err = db.Exec(backupSnapshotSql) + if err != nil { + return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql) + } + + backupFinished, err := s.CheckBackupFinished(snapshotName) + if err != nil { + return "", err + } + if !backupFinished { + err = xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql) + return "", err + } + + return snapshotName, nil +} + // TODO: Add TaskErrMsg func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) { log.Debugf("check backup state of snapshot %s", snapshotName) diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index dacd5f1c..418afb78 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -24,6 +24,7 @@ type Specer interface { CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error CheckDatabaseExists() (bool, error) CheckTableExists() (bool, error) + CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) CreateSnapshotAndWaitForDone(tables []string) (string, error) CheckRestoreFinished(snapshotName string) (bool, error) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 2d5deac0..d5a52aa5 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -267,6 +267,194 @@ func (j *Job) isIncrementalSync() bool { } } +func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) { + var jobInfoMap map[string]interface{} + err := json.Unmarshal(jobInfo, &jobInfoMap) + if err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo)) + } + + extraInfo, err := j.genExtraInfo() + if err != nil { + return nil, err + } + log.Debugf("extraInfo: %v", extraInfo) + jobInfoMap["extra_info"] = extraInfo + + jobInfoBytes, err := json.Marshal(jobInfoMap) + if err != nil { + return nil, xerror.Errorf(xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap) + } + + return jobInfoBytes, nil +} + +// Like fullSync, but only backup and restore partial of the partitions of a table. +func (j *Job) partialSync() error { + type inMemoryData struct { + SnapshotName string `json:"snapshot_name"` + SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"` + } + + if j.progress.PartialSyncData == nil { + return xerror.Errorf(xerror.Normal, "run partial sync but data is nil") + } + + table := j.progress.PartialSyncData.Table + partitions := j.progress.PartialSyncData.Partitions + switch j.progress.SubSyncState { + case Done: + log.Infof("partial sync status: done") + if err := j.newPartialSnapshot(table, partitions); err != nil { + return err + } + + case BeginCreateSnapshot: + // Step 1: Create snapshot + log.Infof("partial sync status: create snapshot") + snapshotName, err := j.ISrc.CreatePartialSnapshotAndWaitForDone(table, partitions) + if err != nil { + return err + } + + j.progress.NextSubCheckpoint(GetSnapshotInfo, snapshotName) + + case GetSnapshotInfo: + // Step 2: Get snapshot info + log.Infof("partial sync status: get snapshot info") + + snapshotName := j.progress.PersistData + src := &j.Src + srcRpc, err := j.factory.NewFeRpc(src) + if err != nil { + return err + } + + log.Debugf("partial sync begin get snapshot %s", snapshotName) + snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName) + if err != nil { + return err + } + + if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK { + err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status) + return err + } + + if !snapshotResp.IsSetJobInfo() { + return xerror.New(xerror.Normal, "jobInfo is not set") + } + + log.Tracef("job: %.128s", snapshotResp.GetJobInfo()) + inMemoryData := &inMemoryData{ + SnapshotName: snapshotName, + SnapshotResp: snapshotResp, + } + j.progress.NextSubVolatile(AddExtraInfo, inMemoryData) + + case AddExtraInfo: + // Step 3: Add extra info + log.Infof("partial sync status: add extra info") + + inMemoryData := j.progress.InMemoryData.(*inMemoryData) + snapshotResp := inMemoryData.SnapshotResp + jobInfo := snapshotResp.GetJobInfo() + + log.Infof("partial sync snapshot response meta size: %d, job info size: %d", + len(snapshotResp.Meta), len(snapshotResp.JobInfo)) + + jobInfoBytes, err := j.addExtraInfo(jobInfo) + if err != nil { + return err + } + + log.Debugf("partial sync job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes)) + snapshotResp.SetJobInfo(jobInfoBytes) + + j.progress.NextSubCheckpoint(RestoreSnapshot, inMemoryData) + + case RestoreSnapshot: + // Step 4: Restore snapshot + log.Infof("partial sync status: restore snapshot") + + if j.progress.InMemoryData == nil { + persistData := j.progress.PersistData + inMemoryData := &inMemoryData{} + if err := json.Unmarshal([]byte(persistData), inMemoryData); err != nil { + return xerror.Errorf(xerror.Normal, "unmarshal persistData failed, persistData: %s", persistData) + } + j.progress.InMemoryData = inMemoryData + } + + // Step 4.1: start a new fullsync && persist + inMemoryData := j.progress.InMemoryData.(*inMemoryData) + snapshotName := inMemoryData.SnapshotName + restoreSnapshotName := restoreSnapshotName(snapshotName) + snapshotResp := inMemoryData.SnapshotResp + + // Step 4.2: restore snapshot to dest + dest := &j.Dest + destRpc, err := j.factory.NewFeRpc(dest) + if err != nil { + return err + } + log.Debugf("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName) + + var tableRefs []*festruct.TTableRef + if j.SyncType == TableSync && j.Src.Table != j.Dest.Table { + log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table) + tableRefs = make([]*festruct.TTableRef, 0) + tableRef := &festruct.TTableRef{ + Table: &j.Src.Table, + AliasName: &j.Dest.Table, + } + tableRefs = append(tableRefs, tableRef) + } + restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp) + if err != nil { + return err + } + if restoreResp.Status.GetStatusCode() != tstatus.TStatusCode_OK { + return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status) + } + log.Infof("partial sync restore snapshot resp: %v", restoreResp) + + for { + restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) + if err != nil { + return err + } + + if restoreFinished { + j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName) + break + } + // retry for MAX_CHECK_RETRY_TIMES, timeout, continue + } + + case PersistRestoreInfo: + // Step 5: Update job progress && dest table id + // update job info, only for dest table id + log.Infof("fullsync status: persist restore info") + + switch j.SyncType { + case DBSync: + j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "") + case TableSync: + j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "") + default: + return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType) + } + + return nil + + default: + return xerror.Errorf(xerror.Normal, "invalid job sub sync state %d", j.progress.SubSyncState) + } + + return j.partialSync() +} + func (j *Job) fullSync() error { type inMemoryData struct { SnapshotName string `json:"snapshot_name"` @@ -318,7 +506,7 @@ func (j *Job) fullSync() error { return err } - log.Debugf("begin get snapshot %s", snapshotName) + log.Debugf("fullsync begin get snapshot %s", snapshotName) snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName) if err != nil { return err @@ -329,7 +517,7 @@ func (j *Job) fullSync() error { return err } - log.Tracef("job: %.128s", snapshotResp.GetJobInfo()) + log.Tracef("fullsync snapshot job: %.128s", snapshotResp.GetJobInfo()) if !snapshotResp.IsSetJobInfo() { return xerror.New(xerror.Normal, "jobInfo is not set") } @@ -364,24 +552,10 @@ func (j *Job) fullSync() error { log.Infof("snapshot response meta size: %d, job info size: %d", len(snapshotResp.Meta), len(snapshotResp.JobInfo)) - var jobInfoMap map[string]interface{} - err := json.Unmarshal(jobInfo, &jobInfoMap) - if err != nil { - return xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo)) - } - log.Debugf("jobInfoMap: %v", jobInfoMap) - - extraInfo, err := j.genExtraInfo() + jobInfoBytes, err := j.addExtraInfo(jobInfo) if err != nil { return err } - log.Debugf("extraInfo: %v", extraInfo) - jobInfoMap["extra_info"] = extraInfo - - jobInfoBytes, err := json.Marshal(jobInfoMap) - if err != nil { - return xerror.Errorf(xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap) - } log.Debugf("job info size: %d, bytes: %s", len(jobInfoBytes), string(jobInfoBytes)) snapshotResp.SetJobInfo(jobInfoBytes) @@ -1131,9 +1305,33 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error { func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error { log.Infof("handle replace partitions binlog, commit seq: %d", *binlog.CommitSeq) - // TODO(walter) replace partitions once backuping/restoring with temporary partitions is supportted. + data := binlog.GetData() + replacePartition, err := record.NewReplacePartitionFromJson(data) + if err != nil { + return err + } + + if !replacePartition.StrictRange { + log.Warnf("replacing partitions with non strict range is not supported yet, replace partition record: %s", string(data)) + return j.newSnapshot(j.progress.CommitSeq) + } - return j.newSnapshot(j.progress.CommitSeq) + if replacePartition.UseTempName { + log.Warnf("replacing partitions with use tmp name is not supported yet, replace partition record: %s", string(data)) + return j.newSnapshot(j.progress.CommitSeq) + } + + oldPartitions := strings.Join(replacePartition.Partitions, ",") + newPartitions := strings.Join(replacePartition.TempPartitions, ",") + log.Infof("table %s replace partitions %s with temp partitions %s", + replacePartition.TableName, oldPartitions, newPartitions) + + partitions := replacePartition.Partitions + if replacePartition.UseTempName { + partitions = replacePartition.TempPartitions + } + + return j.newPartialSnapshot(replacePartition.TableName, partitions) } // return: error && bool backToRunLoop @@ -1317,6 +1515,9 @@ func (j *Job) tableSync() error { case TableIncrementalSync: log.Debug("table incremental sync") return j.incrementalSync() + case TablePartialSync: + log.Debug("table partial sync") + return j.partialSync() default: return xerror.Errorf(xerror.Normal, "unknown sync state: %v", j.progress.SyncState) } @@ -1346,6 +1547,9 @@ func (j *Job) dbSync() error { case DBIncrementalSync: log.Debug("db incremental sync") return j.incrementalSync() + case DBPartialSync: + log.Debug("db partial sync") + return j.partialSync() default: return xerror.Errorf(xerror.Normal, "unknown db sync state: %v", j.progress.SyncState) } @@ -1443,6 +1647,29 @@ func (j *Job) newSnapshot(commitSeq int64) error { } } +func (j *Job) newPartialSnapshot(table string, partitions []string) error { + // The binlog of commitSeq will be skipped once the partial snapshot finished. + commitSeq := j.progress.CommitSeq + log.Infof("new partial snapshot, commitSeq: %d, table: %s, partitions: %v", commitSeq, table, partitions) + + j.progress.PartialSyncData = &JobPartialSyncData{ + Table: table, + Partitions: partitions, + } + switch j.SyncType { + case TableSync: + j.progress.NextWithPersist(commitSeq, TablePartialSync, BeginCreateSnapshot, "") + return nil + case DBSync: + j.progress.NextWithPersist(commitSeq, DBPartialSync, BeginCreateSnapshot, "") + return nil + default: + err := xerror.Panicf(xerror.Normal, "unknown table sync type: %v", j.SyncType) + log.Fatalf("run %+v", err) + return err + } +} + // run job func (j *Job) Run() error { gls.ResetGls(gls.GoID(), map[interface{}]interface{}{}) diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 4744c10e..9feac215 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -26,10 +26,12 @@ const ( DBTablesIncrementalSync SyncState = 1 DBSpecificTableFullSync SyncState = 2 DBIncrementalSync SyncState = 3 + DBPartialSync SyncState = 4 // sync partitions // Table sync state machine states TableFullSync SyncState = 500 TableIncrementalSync SyncState = 501 + TablePartialSync SyncState = 502 // TODO: add timeout state for restart full sync ) @@ -45,10 +47,14 @@ func (s SyncState) String() string { return "DBSpecificTableFullSync" case DBIncrementalSync: return "DBIncrementalSync" + case DBPartialSync: + return "DBPartialSync" case TableFullSync: return "TableFullSync" case TableIncrementalSync: return "TableIncrementalSync" + case TablePartialSync: + return "TablePartialSync" default: return fmt.Sprintf("Unknown SyncState: %d", s) } @@ -127,6 +133,13 @@ func (s SubSyncState) String() string { } } +type JobPartialSyncData struct { + TableId int64 `json:"table_id"` + Table string `json:"table"` + PartitionIds []int64 `json:"partition_ids"` + Partitions []string `json:"partitions"` +} + type JobProgress struct { JobName string `json:"job_name"` db storage.DB `json:"-"` @@ -137,12 +150,13 @@ type JobProgress struct { SubSyncState SubSyncState `json:"sub_sync_state"` // The commit seq where the target cluster has synced. - PrevCommitSeq int64 `json:"prev_commit_seq"` - CommitSeq int64 `json:"commit_seq"` - TableMapping map[int64]int64 `json:"table_mapping"` - TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` // only for DBTablesIncrementalSync - InMemoryData any `json:"-"` - PersistData string `json:"data"` // this often for binlog or snapshot info + PrevCommitSeq int64 `json:"prev_commit_seq"` + CommitSeq int64 `json:"commit_seq"` + TableMapping map[int64]int64 `json:"table_mapping"` + TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` // only for DBTablesIncrementalSync + InMemoryData any `json:"-"` + PersistData string `json:"data"` // this often for binlog or snapshot info + PartialSyncData *JobPartialSyncData `json:"partial_sync_data,omitempty"` // Some fields to save the unix epoch time of the key timepoint. CreatedAt int64 `json:"created_at,omitempty"` diff --git a/pkg/ccr/record/replace_partition.go b/pkg/ccr/record/replace_partition.go new file mode 100644 index 00000000..02b1bd90 --- /dev/null +++ b/pkg/ccr/record/replace_partition.go @@ -0,0 +1,40 @@ +package record + +import ( + "encoding/json" + + "github.com/selectdb/ccr_syncer/pkg/xerror" +) + +type ReplacePartitionRecord struct { + DbId int64 `json:"dbId"` + DbName string `json:"dbName"` + TableId int64 `json:"tblId"` + TableName string `json:"tblName"` + Partitions []string `json:"partitions"` + TempPartitions []string `json:"tempPartitions"` + StrictRange bool `json:"strictRange"` + UseTempName bool `json:"useTempPartitionName"` +} + +func NewReplacePartitionFromJson(data string) (*ReplacePartitionRecord, error) { + var replacePartition ReplacePartitionRecord + err := json.Unmarshal([]byte(data), &replacePartition) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "unmarshal replace partition error") + } + + if len(replacePartition.TempPartitions) == 0 { + return nil, xerror.Errorf(xerror.Normal, "the temp partitions of the replace partition record is empty") + } + + if replacePartition.TableId == 0 { + return nil, xerror.Errorf(xerror.Normal, "table id not found") + } + + if replacePartition.TableName == "" { + return nil, xerror.Errorf(xerror.Normal, "table name is empty") + } + + return &replacePartition, nil +} diff --git a/regression-test/suites/db-sync-insert-overwrite/test_db_insert_overwrite.groovy b/regression-test/suites/db-sync-insert-overwrite/test_db_insert_overwrite.groovy new file mode 100644 index 00000000..2b3af810 --- /dev/null +++ b/regression-test/suites/db-sync-insert-overwrite/test_db_insert_overwrite.groovy @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_db_insert_overwrite") { + // The doris has two kind of insert overwrite handle logic: leagcy and nereids. + // The first will + // 1. create temp table + // 2. insert into temp table + // 3. replace table + // The second will + // 1. create temp partitions + // 2. insert into temp partitions + // 3. replace overlap partitions + def tableName = "tbl_insert_overwrite_" + UUID.randomUUID().toString().replace("-", "") + def uniqueTable = "${tableName}_unique" + def syncerAddress = "127.0.0.1:9190" + def test_num = 0 + def insert_num = 5 + def sync_gap_time = 5000 + String response + + def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean + def tmpRes = target_sql "${sqlString}" + while (tmpRes.size() != rowSize) { + sleep(sync_gap_time) + if (--times > 0) { + tmpRes = target_sql "${sqlString}" + } else { + break + } + } + return tmpRes.size() == rowSize + } + + def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean + Boolean ret = false + List> res + while (times > 0) { + try { + if (func == "sql") { + res = sql "${sqlString}" + } else { + res = target_sql "${sqlString}" + } + if (myClosure.call(res)) { + ret = true + } + } catch (Exception e) {} + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkRestoreFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + ret = row[4] == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkBackupFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = sql "SHOW BACKUP FROM ${context.dbName}" + for (List row : sqlInfo) { + if ((row[4] as String).contains(checkTable)) { + ret = row[3] == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkData = { data, beginCol, value -> Boolean + if (data.size() < beginCol + value.size()) { + return false + } + + for (int i = 0; i < value.size(); ++i) { + if ((data[beginCol + i] as int) != value[i]) { + return false + } + } + + return true + } + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql "ALTER DATABASE ${context.dbName} SET properties (\"binlog.enable\" = \"true\")" + + sql """ + CREATE TABLE if NOT EXISTS ${uniqueTable} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(id) + ( + PARTITION `p1` VALUES LESS THAN ("100"), + PARTITION `p2` VALUES LESS THAN ("200") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true", + "binlog.ttl_seconds" = "180" + ) + """ + + sql """ + INSERT INTO ${uniqueTable} VALUES + (1, 0), + (1, 1), + (1, 2), + (1, 3), + (1, 4) + """ + sql "sync" + + // test 1: target cluster follow source cluster + logger.info("=== Test 1: backup/restore case ===") + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "" + body "${bodyJson}" + op "post" + result response + } + assertTrue(checkRestoreFinishTimesOf("${uniqueTable}", 60)) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test = 1", exist, 60, "sql")) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test = 1", exist, 60, "target")) + assertTrue(checkSelectTimesOf("SELECT * FROM ${uniqueTable} WHERE test = 1 ORDER BY id", 5, 60)) + + logger.info("=== Test 2: dest cluster follow source cluster case ===") + + sql """ + INSERT INTO ${uniqueTable} VALUES + (2, 0), + (2, 1), + (2, 2), + (2, 3), + (2, 4) + """ + sql "sync" + + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=2", exist, 60, "sql")) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=2", exist, 60, "target")) + assertTrue(checkSelectTimesOf("SELECT * FROM ${uniqueTable} WHERE test=2", 5, 60)) + + logger.info("=== Test 3: insert overwrite source table ===") + + sql """ + INSERT OVERWRITE TABLE ${uniqueTable} VALUES + (3, 0), + (3, 1), + (3, 2), + (3, 3), + (3, 4) + """ + sql "sync" + + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=3", exist, 60, "sql")) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=2", notExist, 60, "sql")) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=3", exist, 60, "target")) + assertTrue(checkShowTimesOf("SELECT * FROM ${uniqueTable} WHERE test=2", notExist, 60, "target")) + + assertTrue(checkSelectTimesOf("SELECT * FROM ${uniqueTable} WHERE test=3", 5, 60)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${uniqueTable}", 5, 60)) +} + diff --git a/regression-test/suites/table-sync/test_insert_overwrite.groovy b/regression-test/suites/table-sync/test_insert_overwrite.groovy index 54d0f2a6..27bfe021 100644 --- a/regression-test/suites/table-sync/test_insert_overwrite.groovy +++ b/regression-test/suites/table-sync/test_insert_overwrite.groovy @@ -22,7 +22,7 @@ suite("test_insert_overwrite") { // 3. replace table // The second will // 1. create temp partitions - // 2. insert int temp partitions + // 2. insert into temp partitions // 3. replace overlap partitions def tableName = "tbl_insert_overwrite_" + UUID.randomUUID().toString().replace("-", "") def uniqueTable = "${tableName}_unique" diff --git a/regression-test/suites/table-sync/test_replace_partial_partition.groovy b/regression-test/suites/table-sync/test_replace_partial_partition.groovy new file mode 100644 index 00000000..0c90ec93 --- /dev/null +++ b/regression-test/suites/table-sync/test_replace_partial_partition.groovy @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_replace_partial_partition") { + + def baseTableName = "test_replace_partial_p_" + UUID.randomUUID().toString().replace("-", "") + def syncerAddress = "127.0.0.1:9190" + def test_num = 0 + def insert_num = 5 + def sync_gap_time = 5000 + def opPartitonName = "less0" + String response + + def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean + def tmpRes = target_sql "${sqlString}" + while (tmpRes.size() != rowSize) { + sleep(sync_gap_time) + if (--times > 0) { + tmpRes = target_sql "${sqlString}" + } else { + break + } + } + return tmpRes.size() == rowSize + } + + def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean + Boolean ret = false + List> res + while (times > 0) { + try { + if (func == "sql") { + res = sql "${sqlString}" + } else { + res = target_sql "${sqlString}" + } + if (myClosure.call(res)) { + ret = true + } + } catch (Exception e) {} + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkRestoreFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + ret = (row[4] as String) == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + logger.info("=== Create table ===") + tableName = "${baseTableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + // insert into p2,p3,p4 + sql """ + INSERT INTO ${tableName} VALUES + (1, 10), + (1, 11), + (1, 12), + (1, 13), + (1, 14), + (2, 100), + (2, 110), + (2, 120), + (2, 130), + (2, 140), + (3, 200), + (3, 210), + (3, 220), + (3, 230), + (3, 240) + """ + sql "sync" + + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "${tableName}" + body "${bodyJson}" + op "post" + result response + } + + assertTrue(checkRestoreFinishTimesOf("${tableName}", 60)) + // p2,p3,p4 all has 5 rows + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=1", 5, 60)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=2", 5, 60)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=3", 5, 60)) + + logger.info("=== Add temp partition p5 ===") + + sql """ + ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p5 VALUES [("0"), ("100")) + """ + + assertTrue(checkShowTimesOf(""" + SHOW TEMPORARY PARTITIONS + FROM ${tableName} + WHERE PartitionName = "p5" + """, + exist, 60, "sql")) + + sql "INSERT INTO ${tableName} TEMPORARY PARTITION (p5) VALUES (1, 50)" + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + TEMPORARY PARTITION (p5) + WHERE id = 50 + """, + exist, 60, "sql")) + + logger.info("=== Replace partition p2 by p5 ===") + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + WHERE id = 50 + """, + notExist, 60, "target")) + + sql "ALTER TABLE ${tableName} REPLACE PARTITION (p2) WITH TEMPORARY PARTITION (p5)" + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + WHERE id = 50 + """, + exist, 60, "target")) + + // p3,p4 all has 5 rows, p2 has 1 row + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=1", 1, 60)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=2", 5, 60)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=3", 5, 60)) + + // The last restore should contains only partition p2 + def show_restore_result = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + def restore_num = show_restore_result.size() + def last_restore_result = show_restore_result[restore_num-1] + def restore_objects = last_restore_result[10] // RestoreObjs + logger.info("The restore result: ${last_restore_result}") + logger.info("The restore objects: ${restore_objects}") + assertTrue(restore_objects.contains("""partition_names":["p2"]""")) +} + + diff --git a/regression-test/suites/table-sync/test_replace_partition.groovy b/regression-test/suites/table-sync/test_replace_partition.groovy new file mode 100644 index 00000000..4b63b204 --- /dev/null +++ b/regression-test/suites/table-sync/test_replace_partition.groovy @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_replace_partition") { + + def baseTableName = "test_replace_partition_" + UUID.randomUUID().toString().replace("-", "") + def syncerAddress = "127.0.0.1:9190" + def test_num = 0 + def insert_num = 5 + def sync_gap_time = 5000 + def opPartitonName = "less0" + String response + + def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean + def tmpRes = target_sql "${sqlString}" + while (tmpRes.size() != rowSize) { + sleep(sync_gap_time) + if (--times > 0) { + tmpRes = target_sql "${sqlString}" + } else { + break + } + } + return tmpRes.size() == rowSize + } + + def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean + Boolean ret = false + List> res + while (times > 0) { + try { + if (func == "sql") { + res = sql "${sqlString}" + } else { + res = target_sql "${sqlString}" + } + if (myClosure.call(res)) { + ret = true + } + } catch (Exception e) {} + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkRestoreFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + ret = (row[4] as String) == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + logger.info("=== Create table ===") + tableName = "${baseTableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "${tableName}" + body "${bodyJson}" + op "post" + result response + } + + assertTrue(checkRestoreFinishTimesOf("${tableName}", 60)) + + logger.info("=== Add temp partition p5 ===") + + sql """ + ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p5 VALUES [("0"), ("100")) + """ + + assertTrue(checkShowTimesOf(""" + SHOW TEMPORARY PARTITIONS + FROM ${tableName} + WHERE PartitionName = "p5" + """, + exist, 60, "sql")) + + sql "INSERT INTO ${tableName} TEMPORARY PARTITION (p5) VALUES (1, 50)" + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + TEMPORARY PARTITION (p5) + WHERE id = 50 + """, + exist, 60, "sql")) + + logger.info("=== Replace partition p2 by p5 ===") + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + WHERE id = 50 + """, + notExist, 60, "target")) + + sql "ALTER TABLE ${tableName} REPLACE PARTITION (p2) WITH TEMPORARY PARTITION (p5)" + + assertTrue(checkShowTimesOf(""" + SELECT * + FROM ${tableName} + WHERE id = 50 + """, + exist, 60, "target")) + + // We don't support replace partition with non-strict range and use temp name. + + // logger.info("=== Add temp partition p6 ===") + + // sql """ + // ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p6 VALUES [("100"), ("200")) + // """ + + // assertTrue(checkShowTimesOf(""" + // SHOW TEMPORARY PARTITIONS + // FROM ${tableName} + // WHERE PartitionName = "p6" + // """, + // exist, 60, "sql")) + + // sql "INSERT INTO ${tableName} TEMPORARY PARTITION (p6) VALUES (2, 150)" + + // assertTrue(checkShowTimesOf(""" + // SELECT * + // FROM ${tableName} + // TEMPORARY PARTITION (p6) + // WHERE id = 150 + // """, + // exist, 60, "sql")) + + // logger.info("=== Replace partition p3 by p6, with tmp partition name ===") + + // assertTrue(checkShowTimesOf(""" + // SELECT * + // FROM ${tableName} + // WHERE id = 150 + // """, + // notExist, 60, "target")) + + // sql """ALTER TABLE ${tableName} REPLACE PARTITION (p3) WITH TEMPORARY PARTITION (p6) + // PROPERTIES ( + // "use_temp_partition_name" = "true" + // ) + // """ + + // assertTrue(checkShowTimesOf(""" + // SELECT * + // FROM ${tableName} + // WHERE id = 150 + // """, + // exist, 60, "target")) + +// // for non strict range +// logger.info("=== Add temp partition p7 ===") + +// sql """ +// ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p7 VALUES [("0"), ("200")) +// """ + +// assertTrue(checkShowTimesOf(""" +// SHOW TEMPORARY PARTITIONS +// FROM ${tableName} +// WHERE PartitionName = "p7" +// """, +// exist, 60, "sql")) + +// sql "INSERT INTO ${tableName} TEMPORARY PARTITION (p7) VALUES (1, 60), (2, 160)" + +// assertTrue(checkShowTimesOf(""" +// SELECT * +// FROM ${tableName} +// TEMPORARY PARTITION (p7) +// WHERE id = 60 +// """, +// exist, 60, "sql")) + +// logger.info("=== Replace partition p2,p6 by p7 ===") + +// assertTrue(checkShowTimesOf(""" +// SELECT * +// FROM ${tableName} +// WHERE id = 60 +// """, +// notExist, 60, "target")) + +// sql """ALTER TABLE ${tableName} REPLACE PARTITION (p2,p6) WITH TEMPORARY PARTITION (p7) +// PROPERTIES( +// "strict_range" = "false" +// ) +// """ + +// assertTrue(checkShowTimesOf(""" +// SELECT * +// FROM ${tableName} +// WHERE id = 60 +// """, +// exist, 60, "target")) + +} +