Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,20 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e
heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString)
if err != nil {
return mgtr.migrationContext.Log.Errore(err)
} else {
mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime)
}
mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime)

// Route the coords bump through applyEventsQueue so it is ordered after
// any DMLs the streamer enqueued before this heartbeat.
coords := dmlEntry.Coordinates
var writeFunc tableWriteFunc = func() error {
mgtr.applier.CurrentCoordinatesMutex.Lock()
mgtr.applier.CurrentCoordinates = dmlEntry.Coordinates
mgtr.applier.CurrentCoordinates = coords
mgtr.applier.CurrentCoordinatesMutex.Unlock()
return nil
}
mgtr.applyEventsQueue <- newApplyEventStructByFunc(&writeFunc)
return nil
}

// abort stores the error, cancels the context, and logs the abort.
Expand Down
75 changes: 75 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,81 @@ func TestMigratorOnChangelogEvent(t *testing.T) {
})
}

// Regression: heartbeats must not advance applier.CurrentCoordinates past
// DMLs still sitting in applyEventsQueue. If they do, checkpointLoop will
// persist a GTID set that includes un-applied transactions, and resume via
// StartSyncGTID will skip them (the server treats them as already-seen).
func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrationContext.UseGTIDs = true
migrator := NewMigrator(migrationContext, "test")
migrator.applier = NewApplier(migrationContext)

const srcUUID = "00000000-0000-0000-0000-000000000001"

// A DML on the original table at GTID :100 is observed and enqueued, but
// not yet applied.
dmlCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-100")
require.NoError(t, err)
migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{
DmlEvent: &binlog.BinlogDMLEvent{
DatabaseName: "test",
TableName: migrationContext.OriginalTableName,
DML: binlog.UpdateDML,
},
Coordinates: dmlCoords,
})
require.Equal(t, 1, len(migrator.applyEventsQueue),
"DML must be sitting un-applied in the queue")

// A heartbeat row is then written; its GTID set includes the un-applied
// DML plus a few additional transactions.
heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-105")
require.NoError(t, err)
heartbeatColumnValues := sql.ToColumnValues([]interface{}{
123,
time.Now().Unix(),
"heartbeat",
time.Now().Format(time.RFC3339Nano),
})
require.NoError(t, migrator.onChangelogHeartbeatEvent(&binlog.BinlogEntry{
DmlEvent: &binlog.BinlogDMLEvent{
DatabaseName: "test",
DML: binlog.InsertDML,
NewColumnValues: heartbeatColumnValues,
},
Coordinates: heartbeatCoords,
}))

// The DML is still un-applied; the heartbeat's coords-bump sentinel has
// been enqueued behind it.
require.Equal(t, 2, len(migrator.applyEventsQueue),
"queue must hold the un-applied DML and the heartbeat sentinel; "+
"this test does not drain the queue")

// Invariant: CurrentCoordinates must NOT have advanced past the queued DML.
currentCoords := migrator.applier.CurrentCoordinates
require.False(t, currentCoords != nil && dmlCoords.SmallerThanOrEquals(currentCoords),
"CurrentCoordinates must not cover the un-applied DML at %s (got %v)",
dmlCoords.DisplayString(), currentCoords)

// Consequence: the checkpoint gate in Migrator.Checkpoint must NOT fire
// for streamer coords that include the un-applied DML.
require.False(t, currentCoords != nil && heartbeatCoords.SmallerThanOrEquals(currentCoords),
"checkpoint gate must not fire while DML at %s is un-applied",
dmlCoords.DisplayString())

// Ordering: the DML must come first, then the heartbeat sentinel. If a
// future change ever wraps the heartbeat enqueue in `go func()`, this
// invariant breaks and the bug returns.
firstQueued := <-migrator.applyEventsQueue
secondQueued := <-migrator.applyEventsQueue
require.NotNil(t, firstQueued.dmlEvent, "first queued event must be the DML")
require.Nil(t, firstQueued.writeFunc, "first queued event must not be a sentinel")
require.Nil(t, secondQueued.dmlEvent, "second queued event must not be a DML")
require.NotNil(t, secondQueued.writeFunc, "second queued event must be the heartbeat sentinel")
}

func TestMigratorValidateStatement(t *testing.T) {
t.Run("add-column", func(t *testing.T) {
migrationContext := base.NewMigrationContext()
Expand Down
Loading