diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index b5f59ec8..35c50413 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -80,6 +80,7 @@ var ( featureSeperatedHandles bool featureEnableSnapshotCompress bool featureOverrideReplicationNumInternal bool + featureResyncOnTableRecreate bool flagBinlogBatchSize int64 @@ -127,6 +128,8 @@ func init() { "enable snapshot compress") flag.BoolVar(&featureOverrideReplicationNumInternal, "feature_override_replication_num", true, "enable override replication_num for downstream cluster") + flag.BoolVar(&featureResyncOnTableRecreate, "feature_resync_on_table_recreate", false, + "enable auto resync when upstream table is dropped and recreated with the same name") flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch") } @@ -1005,6 +1008,47 @@ func (j *Job) fullSync() error { return nil } case TableSync: + // Step 1.2.1: For table sync, check if the source table still exists before creating snapshot. + // This handles the case where the table is dropped again during full sync. + // Reuse checkTableExistsForTableSync to check table status. + tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync() + if err != nil { + log.Warnf("check table exists failed during full sync, will retry, err: %+v", err) + return nil + } + + if !tableExists { + // Table has been dropped during full sync, wait for recreate (regardless of feature flag) + // TableDroppedFlag already set by checkTableExistsForTableSync + if featureResyncOnTableRecreate { + log.Warnf("table %s.%s has been dropped during full sync, waiting for recreate", + j.Src.Database, j.Src.Table) + } else { + log.Warnf("table %s.%s has been dropped during full sync. Auto-resync disabled, waiting and retry. "+ + "Enable with: -feature_resync_on_table_recreate=true", + j.Src.Database, j.Src.Table) + } + return nil + } + + // Step 1.2.2: Check if table_id changed again (table dropped and recreated during full sync) + if tableIdChanged { + // Table was recreated again with a new table_id during full sync + log.Infof("table %s.%s was recreated again during full sync, old table_id: %d, new table_id: %d", + j.Src.Database, j.Src.Table, j.Src.TableId, newTableId) + oldTableId := j.Src.TableId + j.Src.TableId = newTableId + // Note: Dest.TableId will be updated after full sync completes + if err := j.persistJob(); err != nil { + // Rollback: restore old table_id so next retry can detect the change again + j.Src.TableId = oldTableId + log.Errorf("persist job failed after table recreate during full sync, will retry, err: %+v", err) + return err + } + // Continue with the new table_id, no need to restart full sync + // because we haven't created snapshot yet + } + backupTableList = append(backupTableList, j.Src.Table) default: return xerror.Errorf(xerror.Normal, "invalid sync type %s", j.SyncType) @@ -1470,6 +1514,13 @@ func (j *Job) fullSync() error { return err } + // Reset table dropped state after successful full sync + if j.progress.TableDroppedFlag { + log.Infof("table %s.%s full sync completed, resetting table dropped state", j.Src.Database, j.Src.Table) + j.progress.TableDroppedFlag = false + j.progress.TableDroppedTime = 0 + } + j.progress.PartitionCommitSeqMap = nil j.progress.TableCommitSeqMap = nil j.progress.TableMapping = nil @@ -1488,6 +1539,12 @@ func (j *Job) fullSync() error { } func (j *Job) persistJob() error { + if utils.HasJobFailpoint(j.Name, "persist_job_failed") { + log.Warnf("fail to persist job by failpoint") + utils.RemoveJobFailpoint(j.Name, "persist_job_failed") // Auto-remove after first trigger + return xerror.Errorf(xerror.Normal, "fail to persist job by failpoint") + } + data, err := json.Marshal(j) if err != nil { return xerror.Errorf(xerror.Normal, "marshal job failed, job: %v", j) @@ -3812,6 +3869,24 @@ func (j *Job) incrementalSyncInternal() error { case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB: return xerror.Errorf(xerror.Normal, "can't found db") case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE: + // Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues + if j.SyncType == TableSync { + // Use unified handler to check and handle all scenarios + action, err := j.handleTableNotFoundForTableSync() + switch action { + case TableNotFoundWait: + return nil + case TableNotFoundTriggerFullSync: + return nil // State changed to TableFullSync, will be handled in next loop + case TableNotFoundBinlogNotReady: + log.Infof("table %s.%s exists but binlog not ready, waiting and retry", + j.Src.Database, j.Src.Table) + return nil + case TableNotFoundError: + return err + } + } + // For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog) return xerror.Errorf(xerror.Normal, "can't found table") default: return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s", @@ -3834,6 +3909,149 @@ func (j *Job) incrementalSyncInternal() error { return nil } +// checkTableExistsForTableSync checks if the source table exists and whether its table_id has changed. +// This is used for auto-recovery when a table is dropped and recreated with the same name. +// Returns: (tableExists bool, tableIdChanged bool, newTableId int64, error) +func (j *Job) checkTableExistsForTableSync() (bool, bool, int64, error) { + if j.SyncType != TableSync { + return false, false, 0, xerror.Errorf(xerror.Normal, "checkTableExistsForTableSync only works for table sync") + } + + // Check if table exists by name + exists, err := j.Src.CheckTableExists() + if err != nil { + log.Warnf("check table exists failed, err: %+v", err) + return false, false, 0, err + } + + if !exists { + // Table does not exist + if !j.progress.TableDroppedFlag { + j.progress.TableDroppedFlag = true + j.progress.TableDroppedTime = time.Now().Unix() + j.progress.Persist() + log.Infof("table %s.%s is dropped, waiting for recreate", j.Src.Database, j.Src.Table) + } + return false, false, 0, nil + } + + // Table exists, check if table_id changed + // Use UpdateTable to force fetch from upstream cluster, not from cache + tableMeta, err := j.srcMeta.UpdateTable(j.Src.Table, 0) + if err != nil { + log.Warnf("update table meta failed, err: %+v", err) + return true, false, 0, err + } + newTableId := tableMeta.Id + + if newTableId != j.Src.TableId { + // Table was recreated with a new table_id + var droppedDuration time.Duration + if j.progress.TableDroppedTime > 0 { + droppedDuration = time.Since(time.Unix(j.progress.TableDroppedTime, 0)) + } + log.Infof("table %s.%s was recreated, old table_id: %d, new table_id: %d, dropped duration: %v", + j.Src.Database, j.Src.Table, j.Src.TableId, newTableId, droppedDuration) + return true, true, newTableId, nil + } + + // Table exists with same table_id, reset drop state + if j.progress.TableDroppedFlag { + j.progress.TableDroppedFlag = false + j.progress.TableDroppedTime = 0 + j.progress.Persist() + } + + return true, false, newTableId, nil +} + +// TableNotFoundAction represents the action to take when table is not found +type TableNotFoundAction int + +const ( + // TableNotFoundWait means continue waiting for table to be recreated + TableNotFoundWait TableNotFoundAction = iota + // TableNotFoundTriggerFullSync means table was recreated, trigger full sync + TableNotFoundTriggerFullSync + // TableNotFoundBinlogNotReady means table exists but binlog not ready (just enabled) + TableNotFoundBinlogNotReady + // TableNotFoundError means an error occurred and job should be paused or failed + TableNotFoundError +) + +// handleTableNotFoundForTableSync handles table not found scenario for table sync. +// This is the unified handler for auto-recovery when upstream table is dropped. +// Note: This function should only be called when j.SyncType == TableSync. +// Returns: (action TableNotFoundAction, err error) +// - TableNotFoundWait: table dropped, continue waiting for recreate +// - TableNotFoundTriggerFullSync: table was recreated, trigger full sync +// - TableNotFoundBinlogNotReady: table exists but binlog not ready, wait and retry +// - TableNotFoundError: error occurred +func (j *Job) handleTableNotFoundForTableSync() (TableNotFoundAction, error) { + // Check if table exists and whether table_id changed + tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync() + if err != nil { + log.Warnf("check table exists failed, will retry later, err: %+v", err) + return TableNotFoundWait, nil + } + + // Case 1: Table does not exist (dropped) + if !tableExists { + if !featureResyncOnTableRecreate { + // Just log warning and continue waiting + log.Warnf("upstream table %s.%s has been dropped (table not found). "+ + "Auto-resync is disabled. Options: "+ + "1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+ + "2) Or manually pause/delete the job.", + j.Src.Database, j.Src.Table) + } + // TableDroppedFlag already set by checkTableExistsForTableSync + return TableNotFoundWait, nil + } + + // Case 2: Table was recreated with a new table_id + if tableIdChanged { + if !featureResyncOnTableRecreate { + // Feature disabled: log warning and continue waiting (same as Case 1) + log.Warnf("upstream table %s.%s was recreated with new table_id %d (old: %d), but auto-resync is disabled. "+ + "Options: "+ + "1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+ + "2) Or manually pause/delete the job.", + j.Src.Database, j.Src.Table, newTableId, j.Src.TableId) + return TableNotFoundWait, nil + } + + log.Infof("table %s.%s was recreated with new table_id %d, triggering full sync to recover", + j.Src.Database, j.Src.Table, newTableId) + + // Persist job with new table_id + oldTableId := j.Src.TableId + j.Src.TableId = newTableId + if err := j.persistJob(); err != nil { + // Rollback: restore old table_id so next retry can detect the change again + j.Src.TableId = oldTableId + log.Errorf("persist job failed after table recreate, will retry, err: %+v", err) + return TableNotFoundError, err + } + // Reset progress to trigger full sync + // NOTE: Do NOT reset TableDroppedFlag here. It will be reset after full sync completes. + // This handles the case where table is dropped again during full sync. + j.progress.SyncState = TableFullSync + j.progress.PrevCommitSeq = 0 + j.progress.CommitSeq = 0 + j.progress.SubSyncState = BeginCreateSnapshot + j.progress.Persist() + + log.Infof("table %s.%s auto-recovery triggered, switching to full sync", j.Src.Database, j.Src.Table) + return TableNotFoundTriggerFullSync, nil + } + + // Case 3: Table exists with same table_id + // TableDroppedFlag already reset by checkTableExistsForTableSync + // This means binlog just not ready + return TableNotFoundBinlogNotReady, nil +} + func (j *Job) recoverJobProgress() error { // parse progress if progress, err := NewJobProgressFromJson(j.Name, j.db); err != nil { @@ -4527,7 +4745,7 @@ func (j *Job) GetSpecifiedBinlog(commitSeq int64) (*festruct.TBinlog, error) { case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB: return nil, xerror.Errorf(xerror.Normal, "can't found db, commit seq: %d", commitSeq) case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE: - return nil, xerror.Errorf(xerror.Normal, "can't found table, commit seq: %d", commitSeq) + return nil, xerror.Errorf(xerror.Normal, "can't found table binlog, commit seq: %d", commitSeq) default: return nil, xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s", status.StatusCode, utils.FirstOr(status.GetErrorMsgs(), "")) diff --git a/pkg/ccr/job_pipeline.go b/pkg/ccr/job_pipeline.go index c5eb9660..d6af65ac 100644 --- a/pkg/ccr/job_pipeline.go +++ b/pkg/ccr/job_pipeline.go @@ -17,6 +17,10 @@ import ( log "github.com/sirupsen/logrus" ) +// errTableRecreatedTriggerFullSync is returned when table is recreated with a new table_id, +// triggering the auto-recovery mechanism to switch to full sync. +var errTableRecreatedTriggerFullSync = xerror.NewWithoutStack(xerror.Normal, "table recreated, trigger full sync") + type TxnLink struct { // The previous txn link Prev <-chan any @@ -240,7 +244,11 @@ func (j *Job) pipelineSync() error { // fetch the binlogs, if the binlogs is empty. if !j.pipelineCtx.hasBinlogs() { - if err := j.getNextBinlogs(); err != nil { + if err := j.getNextBinlogs(); err == errTableRecreatedTriggerFullSync { + // Table was recreated, exit pipeline and trigger full sync + j.resetPipeline() + return nil + } else if err != nil { return err } hasMoreBinlogs = len(j.pipelineCtx.Binlogs) > 0 @@ -491,6 +499,24 @@ func (j *Job) getNextBinlogs() error { case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB: return xerror.Errorf(xerror.Normal, "can't found db") case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE: + // Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues + if j.SyncType == TableSync { + // Use unified handler to check and handle all scenarios + action, err := j.handleTableNotFoundForTableSync() + switch action { + case TableNotFoundWait: + return nil + case TableNotFoundTriggerFullSync: + return errTableRecreatedTriggerFullSync + case TableNotFoundBinlogNotReady: + log.Infof("table %s.%s exists but binlog not ready, waiting and retry", + j.Src.Database, j.Src.Table) + return nil + case TableNotFoundError: + return err + } + } + // For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog) return xerror.Errorf(xerror.Normal, "can't found table") default: return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s", diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index f509a0fe..16902dbf 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -205,6 +205,10 @@ type JobProgress struct { TableAliases map[string]string `json:"table_aliases,omitempty"` PrevTxnId int64 `json:"prev_txn_id,omitempty"` + // For table level auto recover when table is dropped and recreated + TableDroppedFlag bool `json:"table_dropped_flag,omitempty"` + TableDroppedTime int64 `json:"table_dropped_time,omitempty"` // Unix timestamp in seconds + // The shadow indexes of the pending schema changes ShadowIndexes map[int64]int64 `json:"shadow_index_map,omitempty"` diff --git a/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_no_recovery.groovy b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_no_recovery.groovy new file mode 100644 index 00000000..915ee71b --- /dev/null +++ b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_no_recovery.groovy @@ -0,0 +1,193 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ts_table_recreate_no_recovery") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_no_recover_" + helper.randomSuffix() + def insert_num = 5 + + def exist = { res -> Boolean + return res.size() != 0 + } + + sql "DROP TABLE IF EXISTS ${tableName}" + target_sql "DROP TABLE IF EXISTS ${tableName}" + + helper.enableDbBinlog() + + // Check if feature_resync_on_table_recreate is disabled + def jsonSlurper = new groovy.json.JsonSlurper() + def featureEnabled = false + httpTest { + endpoint helper.syncerAddress + uri "/features" + op "get" + check { code, body -> + def result = jsonSlurper.parseText(body) + def features = result.flags?.collectEntries { [(it.feature): it.value] } + featureEnabled = features?.get("feature_resync_on_table_recreate") == true + } + } + + if (featureEnabled) { + logger.info("Feature 'feature_resync_on_table_recreate' is enabled, skipping this test") + logger.info("This test validates behavior when auto-recovery is DISABLED") + return + } + logger.info("Feature 'feature_resync_on_table_recreate' is disabled (expected for this test)") + + sql """ + CREATE TABLE ${tableName} + ( + `id` INT, + `name` VARCHAR(50), + `value` INT + ) + ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete(tableName) + helper.ccrJobCreate(tableName) + + try { + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + logger.info("Test 1: initial sync") + for (int i = 0; i < insert_num; i++) { + sql """ INSERT INTO ${tableName} VALUES (${i}, 'init_${i}', ${i * 100}) """ + } + + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "sql")) + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target")) + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 30)) + + logger.info("Test 2: drop table") + sql "DROP TABLE ${tableName}" + sleep(5000) + + def targetCountAfterDrop = target_sql "SELECT COUNT(*) FROM ${tableName}" + assertEquals(insert_num, targetCountAfterDrop[0][0] as Integer) + + logger.info("Test 3: recreate table with new structure") + sql """ + CREATE TABLE ${tableName} + ( + `id` INT, + `name` VARCHAR(50), + `value` INT, + `extra` VARCHAR(100) DEFAULT 'new_column' + ) + ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + for (int i = 10; i < 10 + insert_num; i++) { + sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'new_${i}', ${i * 100}) """ + } + + logger.info("Test 3.2: wait and verify NO auto-recovery (feature disabled)") + sleep(40000) + + // With feature disabled: + // - Old data remains (not cleared) + // - New data is NOT synced (table_id mismatch, syncer keeps waiting) + // - Syncer continuously logs "table not found" and waits + def targetOld = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + def targetNew = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10" + def targetTotal = target_sql "SELECT COUNT(*) FROM ${tableName}" + + logger.info("Target after table recreate: total=${targetTotal[0][0]}, old=${targetOld[0][0]}, new=${targetNew[0][0]}") + + // Verify old data is still present (NOT cleared) + assertEquals(insert_num, targetOld[0][0] as Integer, "Old data should still exist when feature disabled") + + // Verify new data is NOT synced (table_id mismatch, syncer keeps requesting old table_id) + assertEquals(0, targetNew[0][0] as Integer, "New data should NOT be synced when feature disabled") + + // Total should be only old data + assertEquals(insert_num, targetTotal[0][0] as Integer, "Total should be only old data") + + logger.info("Test 4: verify job is still running and waiting") + // Job should remain in Running state, continuously waiting (not Paused) + // Syncer continuously logs "table not found" or similar warnings + + // Insert more data to upstream (new table_id) + logger.info("Test 4.1: insert incremental data to upstream") + for (int i = 20; i < 20 + insert_num; i++) { + sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'incr_${i}', ${i * 100}) """ + } + + sleep(10000) + + // Verify upstream has new data + def srcFinal = sql "SELECT COUNT(*) FROM ${tableName}" + logger.info("Upstream count: ${srcFinal[0][0]}") + assertEquals(insert_num * 2, srcFinal[0][0] as Integer, "Upstream should have new + incr data") + + // But downstream still only has old data (NO sync happening) + def destFinal = target_sql "SELECT COUNT(*) FROM ${tableName}" + logger.info("Downstream count: ${destFinal[0][0]}") + assertEquals(insert_num, destFinal[0][0] as Integer, "Downstream should still only have old data") + + // Verify data distribution + def srcOld = sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + def srcNew = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10 AND id < 20" + def srcIncr = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 20" + + def destOld = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + def destNew = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10" + + logger.info("Data distribution: src(old=${srcOld[0][0]}, new=${srcNew[0][0]}, incr=${srcIncr[0][0]}), dest(old=${destOld[0][0]}, new=${destNew[0][0]})") + + // Source has no old data (table was recreated) + assertEquals(0, srcOld[0][0] as Integer) + assertEquals(insert_num, srcNew[0][0] as Integer) + assertEquals(insert_num, srcIncr[0][0] as Integer) + + // Dest only has old data, new data is NOT synced + assertEquals(insert_num, destOld[0][0] as Integer, "Dest should have old data") + assertEquals(0, destNew[0][0] as Integer, "Dest should NOT have new data (feature disabled)") + + logger.info("Test passed: feature disabled behavior validated") + logger.info("- Old data retained on downstream") + logger.info("- New data NOT synced (table_id mismatch)") + logger.info("- Job keeps running and waiting (not paused)") + logger.info("- User must enable feature or manually handle the job") + + } finally { + try { + helper.ccrJobDelete(tableName) + } catch (Exception e) { + logger.warn("Cleanup failed: ${e.message}") + } + } +} + diff --git a/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy new file mode 100644 index 00000000..db5e929b --- /dev/null +++ b/regression-test/suites/table_sync/auto_recovery/test_ts_table_recreate_recovery.groovy @@ -0,0 +1,181 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ts_table_recreate_recovery") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recreate_" + helper.randomSuffix() + def insert_num = 5 + + def exist = { res -> Boolean + return res.size() != 0 + } + + sql "DROP TABLE IF EXISTS ${tableName}" + target_sql "DROP TABLE IF EXISTS ${tableName}" + + helper.enableDbBinlog() + + // Check if feature_resync_on_table_recreate is enabled + def jsonSlurper = new groovy.json.JsonSlurper() + def featureEnabled = false + httpTest { + endpoint helper.syncerAddress + uri "/features" + op "get" + check { code, body -> + def result = jsonSlurper.parseText(body) + def features = result.flags?.collectEntries { [(it.feature): it.value] } + featureEnabled = features?.get("feature_resync_on_table_recreate") == true + } + } + + if (!featureEnabled) { + throw new IllegalStateException("Feature 'feature_resync_on_table_recreate' is disabled. Enable syncer with: -feature_resync_on_table_recreate=true") + } + logger.info("Feature 'feature_resync_on_table_recreate' is enabled") + + sql """ + CREATE TABLE ${tableName} + ( + `id` INT, + `name` VARCHAR(50), + `value` INT + ) + ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete(tableName) + helper.ccrJobCreate(tableName) + + try { + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + logger.info("Test 1: initial sync") + for (int i = 0; i < insert_num; i++) { + sql """ INSERT INTO ${tableName} VALUES (${i}, 'init_${i}', ${i * 100}) """ + } + + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "sql")) + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target")) + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 30)) + + logger.info("Test 2: drop table") + sql "DROP TABLE ${tableName}" + sleep(5000) + + def targetCountAfterDrop = target_sql "SELECT COUNT(*) FROM ${tableName}" + assertEquals(insert_num, targetCountAfterDrop[0][0] as Integer) + + logger.info("Test 3: recreate table with new structure") + sql """ + CREATE TABLE ${tableName} + ( + `id` INT, + `name` VARCHAR(50), + `value` INT, + `extra` VARCHAR(100) DEFAULT 'new_column' + ) + ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + for (int i = 10; i < 10 + insert_num; i++) { + sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'new_${i}', ${i * 100}) """ + } + + logger.info("Test 3.2: wait for auto recovery") + sleep(40000) + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target")) + + def recoverySuccess = false + for (int retry = 0; retry < 3; retry++) { + def newData = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10" + def oldData = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + logger.info("Check #${retry + 1}: new=${newData[0][0]}, old=${oldData[0][0]}") + + if (newData[0][0] >= insert_num && oldData[0][0] == 0) { + recoverySuccess = true + break + } + if (retry < 2) sleep(20000) + } + + if (!recoverySuccess) { + def finalNew = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10" + def finalOld = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + fail("Auto recovery failed: expected old=0, new=${insert_num}, but got old=${finalOld[0][0]}, new=${finalNew[0][0]}") + } + + logger.info("Test 3.3: final verification") + def finalTotal = target_sql "SELECT COUNT(*) FROM ${tableName}" + assertEquals(insert_num, finalTotal[0][0] as Integer) + + logger.info("Test 4: incremental sync after recovery") + for (int i = 20; i < 20 + insert_num; i++) { + sql """ INSERT INTO ${tableName} (id, name, value) VALUES (${i}, 'incr_${i}', ${i * 100}) """ + } + + sleep(5000) + + def checkIncremental = { res -> Boolean + def count = res.size() > 0 ? res[0][0] as Integer : 0 + return count >= insert_num + } + assertTrue(helper.checkShowTimesOf("SELECT COUNT(*) FROM ${tableName} WHERE id >= 20", checkIncremental, 30, "target")) + + def srcFinal = sql "SELECT COUNT(*) FROM ${tableName}" + def destFinal = target_sql "SELECT COUNT(*) FROM ${tableName}" + + logger.info("Final count: src=${srcFinal[0][0]}, dest=${destFinal[0][0]}") + assertEquals(srcFinal[0][0], destFinal[0][0]) + + def srcOld = sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + def srcNew = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10 AND id < 20" + def srcIncr = sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 20" + def destOld = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id < 10" + def destNew = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 10 AND id < 20" + def destIncr = target_sql "SELECT COUNT(*) FROM ${tableName} WHERE id >= 20" + + logger.info("Data distribution: old(${srcOld[0][0]}/${destOld[0][0]}), new(${srcNew[0][0]}/${destNew[0][0]}), incr(${srcIncr[0][0]}/${destIncr[0][0]})") + + assertEquals(0, srcOld[0][0] as Integer) + assertEquals(0, destOld[0][0] as Integer) + assertEquals(srcNew[0][0], destNew[0][0]) + assertEquals(srcIncr[0][0], destIncr[0][0]) + + } finally { + try { + helper.ccrJobDelete(tableName) + } catch (Exception e) { + logger.warn("Cleanup failed: ${e.message}") + } + } +} \ No newline at end of file