From 49b0bfd76a01dbfe9126301a64a7c15925ee669d Mon Sep 17 00:00:00 2001 From: w41ter Date: Fri, 29 Nov 2024 07:12:16 +0000 Subject: [PATCH 1/2] Support replace table in table sync --- pkg/ccr/job.go | 28 +++-- regression-test/common/helper.groovy | 2 +- .../replace/test_cts_fullsync_replace.groovy | 104 +++++++++++++++ .../test_cts_tbl_alter_replace.groovy | 111 ++++++++++++++++ .../table/replace/test_ts_tbl_replace.groovy | 119 ++++++++++++++++++ 5 files changed, 356 insertions(+), 8 deletions(-) create mode 100644 regression-test/suites/cross_ts/fullsync/replace/test_cts_fullsync_replace.groovy create mode 100644 regression-test/suites/cross_ts/table/alter_replace/test_cts_tbl_alter_replace.groovy create mode 100644 regression-test/suites/table_sync/table/replace/test_ts_tbl_replace.groovy diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index e3eb46c3..caa6795e 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -380,11 +380,14 @@ func (j *Job) handlePartialSyncTableNotFound() error { return nil } else if newTableName, err := j.srcMeta.GetTableNameById(tableId); err != nil { return err - } else { + } else if j.SyncType == DBSync { // The table might be renamed, so we need to update the table name. log.Warnf("force new partial snapshot, since table %d has renamed from %s to %s", tableId, table, newTableName) replace := true // replace the old data to avoid blocking reading return j.newPartialSnapshot(tableId, newTableName, nil, replace) + } else { + return xerror.Errorf(xerror.Normal, "table sync but table has renamed from %s to %s, table id %d", + table, newTableName, tableId) } } @@ -510,6 +513,10 @@ func (j *Job) partialSync() error { } else if backupObject.Id != tableId { log.Warnf("partial sync table %s id not match, force full sync. table id %d, backup object id %d", table, tableId, backupObject.Id) + if j.SyncType == TableSync { + log.Infof("reset src table id from %d to %d, table %s", j.Src.TableId, backupObject.Id, table) + j.Src.TableId = backupObject.Id + } return j.newSnapshot(j.progress.CommitSeq) } else if _, ok := tableCommitSeqMap[backupObject.Id]; !ok { return xerror.Errorf(xerror.Normal, "commit seq not found, table id %d, table name: %s", backupObject.Id, table) @@ -860,7 +867,15 @@ func (j *Job) fullSync() error { views := backupJobInfo.Views() if j.SyncType == TableSync { - if _, ok := tableCommitSeqMap[j.Src.TableId]; !ok { + if backupObject, ok := backupJobInfo.BackupObjects[j.Src.Table]; !ok { + return xerror.Errorf(xerror.Normal, "table %s not found in backup objects", j.Src.Table) + } else if backupObject.Id != j.Src.TableId { + // Might be the table has been replace. + log.Warnf("full sync table %s id not match, force full sync and reset table id from %d to %d", + j.Src.Table, j.Src.TableId, backupObject.Id) + j.Src.TableId = backupObject.Id + return j.newSnapshot(j.progress.CommitSeq) + } else if _, ok := tableCommitSeqMap[j.Src.TableId]; !ok { return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId) } } else { @@ -2304,12 +2319,11 @@ func (j *Job) handleReplaceTable(binlog *festruct.TBinlog) error { } func (j *Job) handleReplaceTableRecord(commitSeq int64, record *record.ReplaceTableRecord) error { - // don't support replace table when table sync - // - // replace table will change the table id, and it depends the new table exists in the dest cluster. if j.SyncType == TableSync { - log.Warnf("replace table is not supported when table sync, consider rebuilding this job instead") - return xerror.Errorf(xerror.Normal, "replace table is not supported when table sync, consider rebuilding this job instead") + log.Infof("replace table %s with fullsync in table sync, reset src table id from %d to %d, swap: %t", + record.OriginTableName, record.OriginTableId, record.NewTableId, record.SwapTable) + j.Src.TableId = record.NewTableId + return j.newSnapshot(commitSeq) } if j.isBinlogCommitted(record.OriginTableId, commitSeq) { diff --git a/regression-test/common/helper.groovy b/regression-test/common/helper.groovy index a139e0f4..294d0d5b 100644 --- a/regression-test/common/helper.groovy +++ b/regression-test/common/helper.groovy @@ -332,7 +332,7 @@ class Helper { throw "request failed, error msg: ${object.error_msg}" } logger.info("job progress: ${object.job_progress}") - result = jsonSlurper.parseText object.job_progress + result = object.job_progress } return result } diff --git a/regression-test/suites/cross_ts/fullsync/replace/test_cts_fullsync_replace.groovy b/regression-test/suites/cross_ts/fullsync/replace/test_cts_fullsync_replace.groovy new file mode 100644 index 00000000..f4411b80 --- /dev/null +++ b/regression-test/suites/cross_ts/fullsync/replace/test_cts_fullsync_replace.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cts_fullsync_replace") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + if (!helper.is_version_supported([30003, 20108, 20016])) { + // at least doris 3.0.3, 2.1.8 and doris 2.0.16 + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + + logger.info("replace part and replace table without swap") + + def oldTableName = "tbl_old_" + helper.randomSuffix() + def newTableName = "tbl_new_" + helper.randomSuffix() + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + logger.info("=== Create both table ===") + sql """ + CREATE TABLE if NOT EXISTS ${oldTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + sql """ + CREATE TABLE if NOT EXISTS ${newTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p100` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete(oldTableName) + helper.ccrJobCreate(oldTableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}", 60)) + + sql "INSERT INTO ${oldTableName} VALUES (1, 100), (100, 1), (2, 200), (200, 2)" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 4, 60)) + + logger.info(" ==== replace without swap and trigger fullsync ==== ") + helper.ccrJobPause(oldTableName) + + sql "INSERT INTO ${newTableName} VALUES (3, 300), (300, 3)" // o:n, 4:2 + sql "INSERT INTO ${oldTableName} VALUES (3, 300), (300, 3)" // o:n, 6:2 + sql "ALTER TABLE ${oldTableName} REPLACE WITH TABLE ${newTableName} PROPERTIES (\"swap\"=\"false\")" // o:n, 2:6 + sql "INSERT INTO ${oldTableName} VALUES (4, 400)" // o:n, 3:6 + + helper.force_fullsync(oldTableName) + helper.ccrJobResume(oldTableName) + + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 3, 60)) + +} diff --git a/regression-test/suites/cross_ts/table/alter_replace/test_cts_tbl_alter_replace.groovy b/regression-test/suites/cross_ts/table/alter_replace/test_cts_tbl_alter_replace.groovy new file mode 100644 index 00000000..8432c188 --- /dev/null +++ b/regression-test/suites/cross_ts/table/alter_replace/test_cts_tbl_alter_replace.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cts_tbl_alter_replace") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + if (!helper.is_version_supported([30003, 20108, 20016])) { + // at least doris 3.0.3, 2.1.8 and doris 2.0.16 + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + + def oldTableName = "tbl_old_" + helper.randomSuffix() + def newTableName = "tbl_new_" + helper.randomSuffix() + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + logger.info("=== Create both table ===") + sql """ + CREATE TABLE if NOT EXISTS ${oldTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + sql """ + CREATE TABLE if NOT EXISTS ${newTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p100` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete(oldTableName) + helper.ccrJobCreate(oldTableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}", 60)) + + sql "INSERT INTO ${oldTableName} VALUES (1, 100), (100, 1), (2, 200), (200, 2)" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 4, 60)) + + logger.info(" ==== replace without swap and trigger fullsync ==== ") + helper.ccrJobPause(oldTableName) + + sql "ALTER TABLE ${oldTableName} ADD COLUMN `new_col` INT KEY DEFAULT \"0\"" + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${oldTableName}" AND State = "FINISHED" + """, + exist, 30)) + + sql "INSERT INTO ${newTableName} VALUES (3, 300), (300, 3)" // o:n, 4:2 + sql "INSERT INTO ${oldTableName} VALUES (3, 300, 3), (300, 3, 3)" // o:n, 6:2 + sql "ALTER TABLE ${oldTableName} REPLACE WITH TABLE ${newTableName} PROPERTIES (\"swap\"=\"false\")" // o:n, 2:6 + sql "INSERT INTO ${oldTableName} VALUES (4, 400)" // o:n, 3:6 + + helper.ccrJobResume(oldTableName) + + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 3, 60)) + +} + diff --git a/regression-test/suites/table_sync/table/replace/test_ts_tbl_replace.groovy b/regression-test/suites/table_sync/table/replace/test_ts_tbl_replace.groovy new file mode 100644 index 00000000..15417be9 --- /dev/null +++ b/regression-test/suites/table_sync/table/replace/test_ts_tbl_replace.groovy @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ts_tbl_replace") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + if (!helper.is_version_supported([30003, 20108, 20016])) { + // at least doris 3.0.3, 2.1.8 and doris 2.0.16 + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + + def oldTableName = "tbl_old_" + helper.randomSuffix() + def newTableName = "tbl_new_" + helper.randomSuffix() + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + logger.info("=== Create both table ===") + sql """ + CREATE TABLE if NOT EXISTS ${oldTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + sql """ + CREATE TABLE if NOT EXISTS ${newTableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete(oldTableName) + helper.ccrJobCreate(oldTableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}", 60)) + + sql "INSERT INTO ${oldTableName} VALUES (1, 100), (100, 1), (2, 200), (200, 2)" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 4, 60)) + + logger.info(" ==== replace with swap ==== ") + helper.ccrJobPause(oldTableName) + + sql "INSERT INTO ${newTableName} VALUES (3, 300), (300, 3)" // o:n, 4:2 + sql "INSERT INTO ${oldTableName} VALUES (3, 300), (300, 3)" // o:n, 6:2 + sql "ALTER TABLE ${oldTableName} REPLACE WITH TABLE ${newTableName} PROPERTIES (\"swap\"=\"true\")" // o:n, 2:6 + sql "INSERT INTO ${oldTableName} VALUES (4, 400)" // o:n, 3:6 + sql "INSERT INTO ${newTableName} VALUES (4, 400)" // o:n, 3:7 + + helper.ccrJobResume(oldTableName) + + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 3, 60)) + + logger.info(" ==== replace without swap ==== ") + + helper.ccrJobPause(oldTableName) + + sql "INSERT INTO ${newTableName} VALUES (5, 500), (500, 5)" // o:n, 3:9 + sql "INSERT INTO ${oldTableName} VALUES (5, 500), (500, 5)" // o:n, 5:9 + sql "ALTER TABLE ${oldTableName} REPLACE WITH TABLE ${newTableName} PROPERTIES (\"swap\"=\"false\")" // o:n, 9:0 + sql "INSERT INTO ${oldTableName} VALUES (6, 600)" // o:n, 10:0 + + helper.ccrJobResume(oldTableName) + + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 10, 60)) +} + From 4f55a18361a2c0069a2190c70306045a3eb91ee7 Mon Sep 17 00:00:00 2001 From: w41ter Date: Fri, 29 Nov 2024 15:16:07 +0800 Subject: [PATCH 2/2] fixup --- pkg/ccr/job.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index caa6795e..e2001f32 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -968,10 +968,10 @@ func (j *Job) fullSync() error { } if len(j.progress.TableAliases) > 0 { tableRefs = make([]*festruct.TTableRef, 0) - viewMap := make(map[string]interface{}) + viewMap := make(map[string]interface{}) for _, viewName := range inMemoryData.Views { log.Debugf("fullsync alias with view ref %s", viewName) - viewMap[viewName] = nil + viewMap[viewName] = nil tableRef := &festruct.TTableRef{Table: utils.ThriftValueWrapper(viewName)} tableRefs = append(tableRefs, tableRef) } @@ -980,9 +980,9 @@ func (j *Job) fullSync() error { log.Debugf("fullsync alias skip table ref %s because it has alias %s", tableName, alias) continue } - if _, ok := viewMap[tableName]; ok { - continue - } + if _, ok := viewMap[tableName]; ok { + continue + } log.Debugf("fullsync alias with table ref %s", tableName) tableRef := &festruct.TTableRef{Table: utils.ThriftValueWrapper(tableName)} tableRefs = append(tableRefs, tableRef)