Skip to content

Commit 4c25d02

Browse files
mysqlcdc: checkpoint at transaction boundary (#4396)
Ensure we only checkpoint at the transaction boundary as opposed to the row boundary. This way we don't risk recovering mid transaction in the event of a crash.
1 parent 6510f1c commit 4c25d02

3 files changed

Lines changed: 30 additions & 16 deletions

File tree

internal/impl/mysql/event.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ const (
3535
// snapshot rows have been sent so the checkpoint can advance once all snapshot batches
3636
// are acknowledged
3737
messageOperationSnapshotComplete MessageOperation = "snapshot_complete"
38+
39+
// messageOperationXID is an internal sentinel emitted when a transaction commits
40+
// so readMessages can advance the checkpoint to a transaction boundary, ensuring
41+
// we never resume mid-transaction (which would miss the TABLE_MAP_EVENT).
42+
messageOperationXID MessageOperation = "xid"
3843
)
3944

4045
// MessageEvent represents a message from mysql cdc plugin

internal/impl/mysql/input_mysql_stream.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,11 @@ func prepSnapshotScannerAndMappers(cols []*sql.ColumnType) (values []any, mapper
777777

778778
func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
779779
var nextTimedBatchChan <-chan time.Time
780+
// latestXIDPos tracks the most recently committed transaction boundary.
781+
// Checkpoints only advance to XID positions so that on restart canal.RunFrom
782+
// always resumes at the start of a new transaction, ensuring TABLE_MAP_EVENTs
783+
// are received before any row events.
784+
var latestXIDPos *position
780785
for {
781786
select {
782787
case <-ctx.Done():
@@ -788,17 +793,22 @@ func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
788793
return fmt.Errorf("timed flush batch error: %w", err)
789794
}
790795

791-
if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
796+
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
792797
return fmt.Errorf("flushing periodic batch: %w", err)
793798
}
794799
case me := <-i.rawMessageEvents:
800+
if me.Operation == messageOperationXID {
801+
latestXIDPos = me.Position
802+
continue
803+
}
804+
795805
if me.Operation == messageOperationSnapshotComplete {
796806
// Flush any remaining messages before post snapshot checkpoint
797807
flushedBatch, err := i.batchPolicy.Flush(ctx)
798808
if err != nil {
799809
return fmt.Errorf("flushing snapshot completion batch: %w", err)
800810
}
801-
if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
811+
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
802812
return fmt.Errorf("flushing snapshot completion batch: %w", err)
803813
}
804814

@@ -841,7 +851,7 @@ func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
841851
if err != nil {
842852
return fmt.Errorf("flush batch error: %w", err)
843853
}
844-
if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
854+
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
845855
return fmt.Errorf("flushing batch: %w", err)
846856
}
847857
} else {
@@ -858,23 +868,13 @@ func (i *mysqlStreamInput) flushBatch(
858868
ctx context.Context,
859869
checkpointer *checkpoint.Capped[*position],
860870
batch service.MessageBatch,
871+
checkpointPos *position,
861872
) error {
862873
if len(batch) == 0 {
863874
return nil
864875
}
865876

866-
lastMsg := batch[len(batch)-1]
867-
strPosition, ok := lastMsg.MetaGet("binlog_position")
868-
var binLogPos *position
869-
if ok {
870-
pos, err := parseBinlogPosition(strPosition)
871-
if err != nil {
872-
return err
873-
}
874-
binLogPos = &pos
875-
}
876-
877-
resolveFn, err := checkpointer.Track(ctx, binLogPos, int64(len(batch)))
877+
resolveFn, err := checkpointer.Track(ctx, checkpointPos, int64(len(batch)))
878878
if err != nil {
879879
return fmt.Errorf("tracking checkpoint for batch: %w", err)
880880
}
@@ -987,6 +987,15 @@ func (i *mysqlStreamInput) OnRotate(_ *replication.EventHeader, re *replication.
987987
return nil
988988
}
989989

990+
func (i *mysqlStreamInput) OnXID(_ *replication.EventHeader, nextPos gomysql.Position) error {
991+
select {
992+
case i.rawMessageEvents <- MessageEvent{Operation: messageOperationXID, Position: &nextPos}:
993+
case <-i.shutSig.SoftStopChan():
994+
return context.Canceled
995+
}
996+
return nil
997+
}
998+
990999
// OnTableChanged is called when a table is created, altered, renamed, or dropped.
9911000
// We invalidate the cached schema so it will be re-extracted on the next row event.
9921001
func (i *mysqlStreamInput) OnTableChanged(_ *replication.EventHeader, schema, table string) error {

internal/impl/mysql/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ file:
207207
assert.Eventually(t, func() bool {
208208
outBatchMut.Lock()
209209
defer outBatchMut.Unlock()
210-
return len(outBatches) == 1000
210+
return len(outBatches) >= 1000
211211
}, time.Minute*5, time.Millisecond*100)
212212

213213
require.NoError(t, streamOut.StopWithin(time.Second*10))

0 commit comments

Comments
 (0)