Skip to content

Commit da137c2

Browse files
committed
feat: auto-recover table sync when upstream table recreated
1 parent 866189c commit da137c2

4 files changed

Lines changed: 427 additions & 2 deletions

File tree

pkg/ccr/job.go

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ var (
8080
featureSeperatedHandles bool
8181
featureEnableSnapshotCompress bool
8282
featureOverrideReplicationNumInternal bool
83+
featureResyncOnTableRecreate bool
8384

8485
flagBinlogBatchSize int64
8586

@@ -127,6 +128,8 @@ func init() {
127128
"enable snapshot compress")
128129
flag.BoolVar(&featureOverrideReplicationNumInternal, "feature_override_replication_num", true,
129130
"enable override replication_num for downstream cluster")
131+
flag.BoolVar(&featureResyncOnTableRecreate, "feature_resync_on_table_recreate", false,
132+
"enable auto resync when upstream table is dropped and recreated with the same name")
130133

131134
flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch")
132135
}
@@ -1025,6 +1028,50 @@ func (j *Job) fullSync() error {
10251028
return nil
10261029
}
10271030
case TableSync:
1031+
// Step 1.2.1: For table sync, check if the source table still exists before creating snapshot.
1032+
// This handles the case where the table is dropped again during full sync.
1033+
// Reuse checkTableExistsForTableSync to check table status.
1034+
tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
1035+
if err != nil {
1036+
log.Warnf("check table exists failed during full sync, will retry, err: %+v", err)
1037+
return nil
1038+
}
1039+
1040+
if !tableExists {
1041+
// Table has been dropped during full sync
1042+
if featureResyncOnTableRecreate {
1043+
// Wait for table to be recreated (TableDroppedFlag already set by checkTableExistsForTableSync)
1044+
log.Warnf("table %s.%s has been dropped during full sync, waiting for recreate",
1045+
j.Src.Database, j.Src.Table)
1046+
return nil
1047+
} else {
1048+
// Pause job
1049+
msg := fmt.Sprintf("table %s.%s has been dropped during full sync. Job will be paused.",
1050+
j.Src.Database, j.Src.Table)
1051+
log.Warnf(msg)
1052+
j.progress.SetFullSyncInfo(msg)
1053+
j.progress.Persist()
1054+
if err := j.Pause(); err != nil {
1055+
log.Errorf("failed to pause job: %+v", err)
1056+
}
1057+
return xerror.Errorf(xerror.Normal, msg)
1058+
}
1059+
}
1060+
1061+
// Step 1.2.2: Check if table_id changed again (table dropped and recreated during full sync)
1062+
if tableIdChanged {
1063+
// Table was recreated again with a new table_id during full sync
1064+
log.Infof("table %s.%s was recreated again during full sync, old table_id: %d, new table_id: %d",
1065+
j.Src.Database, j.Src.Table, j.Src.TableId, newTableId)
1066+
j.Src.TableId = newTableId
1067+
// Note: Dest.TableId will be updated after full sync completes
1068+
if err := j.persistJob(); err != nil {
1069+
log.Warnf("persist job failed after table recreate during full sync, err: %+v", err)
1070+
}
1071+
// Continue with the new table_id, no need to restart full sync
1072+
// because we haven't created snapshot yet
1073+
}
1074+
10281075
backupTableList = append(backupTableList, j.Src.Table)
10291076
default:
10301077
return xerror.Errorf(xerror.Normal, "invalid sync type %s", j.SyncType)
@@ -1503,6 +1550,13 @@ func (j *Job) fullSync() error {
15031550
return err
15041551
}
15051552

1553+
// Reset table dropped state after successful full sync
1554+
if j.progress.TableDroppedFlag {
1555+
log.Infof("table %s.%s full sync completed, resetting table dropped state", j.Src.Database, j.Src.Table)
1556+
j.progress.TableDroppedFlag = false
1557+
j.progress.TableDroppedTime = 0
1558+
}
1559+
15061560
j.progress.PartitionCommitSeqMap = nil
15071561
j.progress.TableCommitSeqMap = nil
15081562
j.progress.TableMapping = nil
@@ -3852,6 +3906,24 @@ func (j *Job) incrementalSyncInternal() error {
38523906
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
38533907
return xerror.Errorf(xerror.Normal, "can't found db")
38543908
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
3909+
// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
3910+
if j.SyncType == TableSync {
3911+
// Use unified handler to check and handle all scenarios
3912+
action, err := j.handleTableNotFoundForTableSync()
3913+
switch action {
3914+
case TableNotFoundWait:
3915+
return nil
3916+
case TableNotFoundTriggerFullSync:
3917+
return nil // State changed to TableFullSync, will be handled in next loop
3918+
case TableNotFoundBinlogNotReady:
3919+
log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
3920+
j.Src.Database, j.Src.Table)
3921+
return nil
3922+
case TableNotFoundError:
3923+
return err
3924+
}
3925+
}
3926+
// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
38553927
return xerror.Errorf(xerror.Normal, "can't found table")
38563928
default:
38573929
return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
@@ -3874,6 +3946,148 @@ func (j *Job) incrementalSyncInternal() error {
38743946
return nil
38753947
}
38763948

3949+
// checkTableExistsForTableSync checks if the source table exists and whether its table_id has changed.
3950+
// This is used for auto-recovery when a table is dropped and recreated with the same name.
3951+
// Returns: (tableExists bool, tableIdChanged bool, newTableId int64, error)
3952+
func (j *Job) checkTableExistsForTableSync() (bool, bool, int64, error) {
3953+
if j.SyncType != TableSync {
3954+
return false, false, 0, xerror.Errorf(xerror.Normal, "checkTableExistsForTableSync only works for table sync")
3955+
}
3956+
3957+
// Check if table exists by name
3958+
exists, err := j.Src.CheckTableExists()
3959+
if err != nil {
3960+
log.Warnf("check table exists failed, err: %+v", err)
3961+
return false, false, 0, err
3962+
}
3963+
3964+
if !exists {
3965+
// Table does not exist
3966+
if !j.progress.TableDroppedFlag {
3967+
j.progress.TableDroppedFlag = true
3968+
j.progress.TableDroppedTime = time.Now().Unix()
3969+
j.progress.Persist()
3970+
log.Infof("table %s.%s is dropped, waiting for recreate", j.Src.Database, j.Src.Table)
3971+
}
3972+
return false, false, 0, nil
3973+
}
3974+
3975+
// Table exists, check if table_id changed
3976+
// Use UpdateTable to force fetch from upstream cluster, not from cache
3977+
tableMeta, err := j.srcMeta.UpdateTable(j.Src.Table, 0)
3978+
if err != nil {
3979+
log.Warnf("update table meta failed, err: %+v", err)
3980+
return true, false, 0, err
3981+
}
3982+
newTableId := tableMeta.Id
3983+
3984+
if newTableId != j.Src.TableId {
3985+
// Table was recreated with a new table_id
3986+
var droppedDuration time.Duration
3987+
if j.progress.TableDroppedTime > 0 {
3988+
droppedDuration = time.Since(time.Unix(j.progress.TableDroppedTime, 0))
3989+
}
3990+
log.Infof("table %s.%s was recreated, old table_id: %d, new table_id: %d, dropped duration: %v",
3991+
j.Src.Database, j.Src.Table, j.Src.TableId, newTableId, droppedDuration)
3992+
return true, true, newTableId, nil
3993+
}
3994+
3995+
// Table exists with same table_id, reset drop state
3996+
if j.progress.TableDroppedFlag {
3997+
j.progress.TableDroppedFlag = false
3998+
j.progress.TableDroppedTime = 0
3999+
j.progress.Persist()
4000+
}
4001+
4002+
return true, false, newTableId, nil
4003+
}
4004+
4005+
// TableNotFoundAction tells the caller of handleTableNotFoundForTableSync what to do
// after the upstream reported that the table could not be found.
type TableNotFoundAction int

const (
	// TableNotFoundWait: keep waiting; the table is still missing (or could not be checked).
	TableNotFoundWait TableNotFoundAction = iota
	// TableNotFoundTriggerFullSync: the table was recreated and a full sync has been scheduled.
	TableNotFoundTriggerFullSync
	// TableNotFoundBinlogNotReady: the table exists unchanged, but its binlog is not
	// available yet (e.g. binlog was only just enabled).
	TableNotFoundBinlogNotReady
	// TableNotFoundError: an error occurred and the job should be paused or failed.
	TableNotFoundError
)
4018+
4019+
// handleTableNotFoundForTableSync handles table not found scenario for table sync.
4020+
// This is the unified handler for auto-recovery when upstream table is dropped.
4021+
// Note: This function should only be called when j.SyncType == TableSync.
4022+
// Returns: (action TableNotFoundAction, err error)
4023+
// - TableNotFoundWait: table dropped, continue waiting for recreate
4024+
// - TableNotFoundTriggerFullSync: table was recreated, trigger full sync
4025+
// - TableNotFoundBinlogNotReady: table exists but binlog not ready, wait and retry
4026+
// - TableNotFoundError: error occurred
4027+
func (j *Job) handleTableNotFoundForTableSync() (TableNotFoundAction, error) {
4028+
// Check if table exists and whether table_id changed
4029+
tableExists, tableIdChanged, newTableId, err := j.checkTableExistsForTableSync()
4030+
if err != nil {
4031+
log.Warnf("check table exists failed, will retry later, err: %+v", err)
4032+
return TableNotFoundWait, nil
4033+
}
4034+
4035+
// Case 1: Table does not exist (dropped)
4036+
if !tableExists {
4037+
if !featureResyncOnTableRecreate {
4038+
// Just log warning and continue waiting
4039+
log.Warnf("upstream table %s.%s has been dropped (table not found). "+
4040+
"Auto-resync is disabled. Options: "+
4041+
"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
4042+
"2) Or manually pause/delete the job.",
4043+
j.Src.Database, j.Src.Table)
4044+
}
4045+
// TableDroppedFlag already set by checkTableExistsForTableSync
4046+
return TableNotFoundWait, nil
4047+
}
4048+
4049+
// Case 2: Table was recreated with a new table_id
4050+
if tableIdChanged {
4051+
if !featureResyncOnTableRecreate {
4052+
// Feature disabled: log warning and continue waiting (same as Case 1)
4053+
log.Warnf("upstream table %s.%s was recreated with new table_id %d (old: %d), but auto-resync is disabled. "+
4054+
"Options: "+
4055+
"1) Enable auto-resync by setting -feature_resync_on_table_recreate=true and restart syncer; "+
4056+
"2) Or manually pause/delete the job.",
4057+
j.Src.Database, j.Src.Table, newTableId, j.Src.TableId)
4058+
return TableNotFoundWait, nil
4059+
}
4060+
4061+
log.Infof("table %s.%s was recreated with new table_id %d, triggering full sync to recover",
4062+
j.Src.Database, j.Src.Table, newTableId)
4063+
4064+
// Update table_id in Src spec only (Dest.TableId will be updated after full sync completes)
4065+
j.Src.TableId = newTableId
4066+
4067+
// Persist job with new table_id
4068+
if err := j.persistJob(); err != nil {
4069+
log.Warnf("persist job failed after table recreate, err: %+v", err)
4070+
}
4071+
4072+
// Reset progress to trigger full sync
4073+
// NOTE: Do NOT reset TableDroppedFlag here. It will be reset after full sync completes.
4074+
// This handles the case where table is dropped again during full sync.
4075+
j.progress.SyncState = TableFullSync
4076+
j.progress.PrevCommitSeq = 0
4077+
j.progress.CommitSeq = 0
4078+
j.progress.SubSyncState = BeginCreateSnapshot
4079+
j.progress.Persist()
4080+
4081+
log.Infof("table %s.%s auto-recovery triggered, switching to full sync", j.Src.Database, j.Src.Table)
4082+
return TableNotFoundTriggerFullSync, nil
4083+
}
4084+
4085+
// Case 3: Table exists with same table_id
4086+
// TableDroppedFlag already reset by checkTableExistsForTableSync
4087+
// This means binlog just not ready
4088+
return TableNotFoundBinlogNotReady, nil
4089+
}
4090+
38774091
func (j *Job) recoverJobProgress() error {
38784092
// parse progress
38794093
if progress, err := NewJobProgressFromJson(j.Name, j.db); err != nil {
@@ -4624,7 +4838,7 @@ func (j *Job) GetSpecifiedBinlog(commitSeq int64) (*festruct.TBinlog, error) {
46244838
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
46254839
return nil, xerror.Errorf(xerror.Normal, "can't found db, commit seq: %d", commitSeq)
46264840
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
4627-
return nil, xerror.Errorf(xerror.Normal, "can't found table, commit seq: %d", commitSeq)
4841+
return nil, xerror.Errorf(xerror.Normal, "can't found table binlog, commit seq: %d", commitSeq)
46284842
default:
46294843
return nil, xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",
46304844
status.StatusCode, utils.FirstOr(status.GetErrorMsgs(), ""))

pkg/ccr/job_pipeline.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import (
1717
log "github.com/sirupsen/logrus"
1818
)
1919

20+
// errTableRecreatedTriggerFullSync is the sentinel error returned by getNextBinlogs when the upstream
21+
// table was recreated with a new table_id; pipelineSync matches it by identity, resets the pipeline and returns nil.
22+
var errTableRecreatedTriggerFullSync = xerror.NewWithoutStack(xerror.Normal, "table recreated, trigger full sync")
23+
2024
type TxnLink struct {
2125
// The previous txn link
2226
Prev <-chan any
@@ -240,7 +244,11 @@ func (j *Job) pipelineSync() error {
240244

241245
// fetch the binlogs, if the binlogs is empty.
242246
if !j.pipelineCtx.hasBinlogs() {
243-
if err := j.getNextBinlogs(); err != nil {
247+
if err := j.getNextBinlogs(); err == errTableRecreatedTriggerFullSync {
248+
// Table was recreated, exit pipeline and trigger full sync
249+
j.resetPipeline()
250+
return nil
251+
} else if err != nil {
244252
return err
245253
}
246254
hasMoreBinlogs = len(j.pipelineCtx.Binlogs) > 0
@@ -493,6 +501,24 @@ func (j *Job) getNextBinlogs() error {
493501
case tstatus.TStatusCode_BINLOG_NOT_FOUND_DB:
494502
return xerror.Errorf(xerror.Normal, "can't found db")
495503
case tstatus.TStatusCode_BINLOG_NOT_FOUND_TABLE:
504+
// Could mean: 1) table dropped, or 2) binlog just enabled, or 3) other transient issues
505+
if j.SyncType == TableSync {
506+
// Use unified handler to check and handle all scenarios
507+
action, err := j.handleTableNotFoundForTableSync()
508+
switch action {
509+
case TableNotFoundWait:
510+
return nil
511+
case TableNotFoundTriggerFullSync:
512+
return errTableRecreatedTriggerFullSync
513+
case TableNotFoundBinlogNotReady:
514+
log.Infof("table %s.%s exists but binlog not ready, waiting and retry",
515+
j.Src.Database, j.Src.Table)
516+
return nil
517+
case TableNotFoundError:
518+
return err
519+
}
520+
}
521+
// For DBSync, this is an error (table dropped is handled by DROP_TABLE binlog)
496522
return xerror.Errorf(xerror.Normal, "can't found table")
497523
default:
498524
return xerror.Errorf(xerror.Normal, "invalid binlog status type: %v, msg: %s",

pkg/ccr/job_progress.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ type JobProgress struct {
207207
TableAliases map[string]string `json:"table_aliases,omitempty"`
208208
PrevTxnId int64 `json:"prev_txn_id,omitempty"`
209209

210+
// Table-level auto-recovery state, used when the upstream table is dropped and recreated
211+
TableDroppedFlag bool `json:"table_dropped_flag,omitempty"`
212+
TableDroppedTime int64 `json:"table_dropped_time,omitempty"` // Unix timestamp in seconds
213+
210214
// The shadow indexes of the pending schema changes
211215
ShadowIndexes map[int64]int64 `json:"shadow_index_map,omitempty"`
212216

0 commit comments

Comments
 (0)