@@ -2609,6 +2609,46 @@ func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.Recov
26092609 return j .newPartialSnapshot (recoverInfo .TableId , recoverInfo .TableName , nil , true )
26102610}
26112611
2612+ func (j * Job ) handleRestoreInfo (binlog * festruct.TBinlog ) error {
2613+ log .Infof ("handle restore info binlog, prevCommitSeq: %d, commitSeq: %d" ,
2614+ j .progress .PrevCommitSeq , j .progress .CommitSeq )
2615+
2616+ data := binlog .GetData ()
2617+ restoreInfo , err := record .NewRestoreInfoFromJson (data )
2618+ if err != nil {
2619+ return err
2620+ }
2621+ return j .handleRestoreInfoRecord (binlog .GetCommitSeq (), restoreInfo )
2622+ }
2623+
2624+ func (j * Job ) handleRestoreInfoRecord (commitSeq int64 , restoreInfo * record.RestoreInfo ) error {
2625+ if len (restoreInfo .TableInfo ) != 1 {
2626+ // for both table and db sync take a full snapshot.
2627+ log .Warnf ("Lets do new snapshot" )
2628+ return j .newSnapshot (commitSeq )
2629+ }
2630+
2631+ if len (restoreInfo .TableInfo ) == 1 {
2632+ for tableId , tableName := range restoreInfo .TableInfo {
2633+ switch j .SyncType {
2634+ case TableSync :
2635+ log .Warnf ("full snapshot, table:%d and name:%s" ,
2636+ tableId , tableName )
2637+ return j .newSnapshot (commitSeq )
2638+ case DBSync :
2639+ log .Warnf ("new partial snapshot, table:%d and name:%s" ,
2640+ tableId , tableName )
2641+ replace := true // replace the old data to avoid blocking reading
2642+ return j .newPartialSnapshot (tableId , tableName , nil , replace )
2643+ default :
2644+ break
2645+ }
2646+ }
2647+ }
2648+ //This is unreachable.
2649+ return nil
2650+ }
2651+
26122652func (j * Job ) handleBarrier (binlog * festruct.TBinlog ) error {
26132653 data := binlog .GetData ()
26142654 barrierLog , err := record .NewBarrierLogFromJson (data )
@@ -2693,6 +2733,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
26932733 return err
26942734 }
26952735 return j .handleRecoverInfoRecord (commitSeq , recoverInfo )
2736+ case festruct .TBinlogType_RESTORE_INFO :
2737+ restoreInfo , err := record .NewRestoreInfoFromJson (barrierLog .Binlog )
2738+ if err != nil {
2739+ return err
2740+ }
2741+ return j .handleRestoreInfoRecord (commitSeq , restoreInfo )
26962742 case festruct .TBinlogType_BARRIER :
26972743 log .Info ("handle barrier binlog, ignore it" )
26982744 default :
@@ -2807,6 +2853,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
28072853 return j .handleDropRollup (binlog )
28082854 case festruct .TBinlogType_RECOVER_INFO :
28092855 return j .handleRecoverInfo (binlog )
2856+ case festruct .TBinlogType_RESTORE_INFO :
2857+ return j .handleRestoreInfo (binlog )
28102858 default :
28112859 return xerror .Errorf (xerror .Normal , "unknown binlog type: %v" , binlog .GetType ())
28122860 }
0 commit comments