diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index cf81e05a5e..6c4f12b9a5 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -73,7 +73,8 @@ type BarrierEvent struct { rangeChecker range_checker.RangeChecker lastResendTime time.Time - lastWarningLogTime time.Time + lastWarningLogTime time.Time + lastForwardReconcileTime time.Time } func NewBlockEvent(cfID common.ChangeFeedID, @@ -106,7 +107,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, reportedDispatchers: make(map[common.DispatcherID]struct{}), lastResendTime: time.Time{}, - lastWarningLogTime: time.Now(), + lastWarningLogTime: time.Now(), + lastForwardReconcileTime: time.Time{}, } if status.BlockTables != nil { @@ -172,6 +174,92 @@ func (be *BarrierEvent) createRangeCheckerForTypeDB() { log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs), zap.Int64("mode", be.mode)) } +func (be *BarrierEvent) ensureRangeCheckerForCurrentTasks() { + if be.rangeChecker != nil { + return + } + + switch be.blockedDispatchers.InfluenceType { + case heartbeatpb.InfluenceType_Normal: + if be.dynamicSplitEnabled { + be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs) + } else { + be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs) + } + case heartbeatpb.InfluenceType_DB: + be.createRangeCheckerForTypeDB() + case heartbeatpb.InfluenceType_All: + be.createRangeCheckerForTypeAll() + } +} + +// shortcutSyncPointToPassPhase switches a syncpoint from the WAITING coverage phase +// to the PASS/DONE coverage phase. The first phase tracks who has reached the barrier, +// while the second phase tracks who has finished it. We must reset the first-phase +// bookkeeping before reconciling forward progress, otherwise stale WAITING reports may +// be mistaken for finished dispatchers and keep the event in an inconsistent state. +func (be *BarrierEvent) shortcutSyncPointToPassPhase() { + be.ensureRangeCheckerForCurrentTasks() + if be.rangeChecker != nil { + be.rangeChecker.Reset() + } + be.reportedDispatchers = make(map[common.DispatcherID]struct{}) + be.selected.Store(true) + be.writerDispatcherAdvanced = true + be.reconcileForwardedDispatchers() + be.lastForwardReconcileTime = time.Now() +} + +func (be *BarrierEvent) reconcileForwardedDispatchers() { + for _, replication := range be.collectRelevantReplications() { + if forwardBarrierEvent(replication, be) { + be.markDispatcherEventDone(replication.ID) + } + } +} + +func (be *BarrierEvent) collectRelevantReplications() []*replica.SpanReplication { + switch be.blockedDispatchers.InfluenceType { + case heartbeatpb.InfluenceType_DB: + replications := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) + ddlDispatcher := be.spanController.GetDDLDispatcher() + if ddlDispatcher != nil { + replications = append(replications, ddlDispatcher) + } + return dedupReplications(replications) + case heartbeatpb.InfluenceType_All: + return be.spanController.GetAllTasks() + case heartbeatpb.InfluenceType_Normal: + replications := make([]*replica.SpanReplication, 0) + for _, tableID := range be.blockedDispatchers.TableIDs { + replications = append(replications, be.spanController.GetTasksByTableID(tableID)...) + } + return dedupReplications(replications) + default: + return nil + } +} + +func dedupReplications(replications []*replica.SpanReplication) []*replica.SpanReplication { + if len(replications) <= 1 { + return replications + } + + seen := make(map[common.DispatcherID]struct{}, len(replications)) + result := make([]*replica.SpanReplication, 0, len(replications)) + for _, replication := range replications { + if replication == nil { + continue + } + if _, ok := seen[replication.ID]; ok { + continue + } + seen[replication.ID] = struct{}{} + result = append(result, replication) + } + return result +} + func (be *BarrierEvent) checkEventAction(dispatcherID common.DispatcherID) (*heartbeatpb.DispatcherStatus, node.ID) { if !be.allDispatcherReported() { return nil, "" @@ -498,37 +586,61 @@ func (be *BarrierEvent) checkBlockedDispatchers() { replications := be.spanController.GetTasksByTableID(tableId) for _, replication := range replications { if forwardBarrierEvent(replication, be) { + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("tableId", tableId), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { + // one related table has forward checkpointTs, means the block event can be advanced + be.selected.Store(true) + be.writerDispatcherAdvanced = true + log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("tableId", tableId), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } + return + } + } + } + case heartbeatpb.InfluenceType_DB: + schemaID := be.blockedDispatchers.SchemaID + replications := be.spanController.GetTasksBySchemaID(schemaID) + for _, replication := range replications { + if forwardBarrierEvent(replication, be) { + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("schemaID", schemaID), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { // one related table has forward checkpointTs, means the block event can be advanced be.selected.Store(true) be.writerDispatcherAdvanced = true log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", zap.String("changefeed", be.cfID.Name()), zap.Uint64("commitTs", be.commitTs), - zap.Int64("tableId", tableId), + zap.Int64("schemaID", schemaID), zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), zap.String("dispatcher", replication.ID.String()), zap.Int64("mode", be.mode), ) - return } - } - } - case heartbeatpb.InfluenceType_DB: - schemaID := be.blockedDispatchers.SchemaID - replications := be.spanController.GetTasksBySchemaID(schemaID) - for _, replication := range replications { - if forwardBarrierEvent(replication, be) { - // one related table has forward checkpointTs, means the block event can be advanced - be.selected.Store(true) - be.writerDispatcherAdvanced = true - log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Int64("schemaID", schemaID), - zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), - zap.String("dispatcher", replication.ID.String()), - zap.Int64("mode", be.mode), - ) return } } @@ -536,16 +648,27 @@ func (be *BarrierEvent) checkBlockedDispatchers() { replications := be.spanController.GetAllTasks() for _, replication := range replications { if forwardBarrierEvent(replication, be) { - // one related table has forward checkpointTs, means the block event can be advanced - be.selected.Store(true) - be.writerDispatcherAdvanced = true - log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), - zap.String("dispatcher", replication.ID.String()), - zap.Int64("mode", be.mode), - ) + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { + // one related table has forward checkpointTs, means the block event can be advanced + be.selected.Store(true) + be.writerDispatcherAdvanced = true + log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } return } } @@ -675,6 +798,10 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID(), mode)} } else { + if be.isSyncPoint && time.Since(be.lastForwardReconcileTime) > time.Second*10 { + be.reconcileForwardedDispatchers() + be.lastForwardReconcileTime = time.Now() + } // the writer dispatcher is advanced, resend pass action return be.sendPassAction(mode) } diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index 79d3faae62..18b7bafcab 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -188,6 +188,81 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } +func TestShortcutSyncPointToPassPhaseResetsWaitingCoverage(t *testing.T) { + testutil.SetUpTestServices(t) + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + + dispatcher1 := spanController.GetTasksByTableID(1)[0] + dispatcher2 := spanController.GetTasksByTableID(2)[0] + for _, dispatcher := range []*replica.SpanReplication{dispatcher1, dispatcher2} { + spanController.BindSpanToNode("", "node1", dispatcher) + spanController.MarkSpanReplicating(dispatcher) + } + + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, false, common.DefaultMode) + + // First-phase bookkeeping tracks who has reached WAITING. These entries must be + // discarded once the syncpoint is shortcut directly to the PASS/DONE phase. + event.markDispatcherEventDone(dispatcher1.ID) + event.markDispatcherEventDone(spanController.GetDDLDispatcherID()) + require.Contains(t, event.reportedDispatchers, dispatcher1.ID) + require.Contains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID()) + require.False(t, event.rangeChecker.IsFullyCovered()) + + dispatcher2.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + event.checkBlockedDispatchers() + require.True(t, event.selected.Load()) + require.True(t, event.writerDispatcherAdvanced) + require.True(t, event.writerDispatcher.IsZero()) + require.Contains(t, event.reportedDispatchers, dispatcher2.ID) + require.NotContains(t, event.reportedDispatchers, dispatcher1.ID) + require.NotContains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID()) + require.False(t, event.rangeChecker.IsFullyCovered()) + require.False(t, event.allDispatcherReported()) + + dispatcher1.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher1.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + ddlSpan.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: spanController.GetDDLDispatcherID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + event.reconcileForwardedDispatchers() + require.True(t, event.allDispatcherReported()) +} + func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) { testutil.SetUpTestServices(t) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go index 5e982762a5..12f35a7475 100644 --- a/maintainer/barrier_test.go +++ b/maintainer/barrier_test.go @@ -1169,6 +1169,103 @@ func TestSyncPointBlockPerf(t *testing.T) { log.Info("duration", zap.Duration("duration", time.Since(now))) } +func TestSkippedSyncPointEventIsRemovedByReconcile(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + + dispatcher1 := spanController.GetTasksByTableID(1)[0] + dispatcher2 := spanController.GetTasksByTableID(2)[0] + for _, dispatcher := range []*replica.SpanReplication{dispatcher1, dispatcher2} { + spanController.BindSpanToNode("", "node1", dispatcher) + spanController.MarkSpanReplicating(dispatcher) + } + + barrier := NewBarrier(spanController, operatorController, false, nil, common.DefaultMode) + _ = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: spanController.GetDDLDispatcherID().ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, + }, + { + ID: dispatcher1.ID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, + }, + }, + }) + + key := getEventKey(10, true) + event, ok := barrier.blockedEvents.Get(key) + require.True(t, ok) + require.False(t, event.selected.Load()) + + // dispatcher2 recreates/skips this syncpoint and moves directly beyond it. The + // maintainer should switch the event to the PASS/DONE phase without selecting a writer. + dispatcher2.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + _ = barrier.Resend() + event, ok = barrier.blockedEvents.Get(key) + require.True(t, ok) + require.True(t, event.selected.Load()) + require.True(t, event.writerDispatcherAdvanced) + require.True(t, event.writerDispatcher.IsZero()) + + // The remaining dispatchers eventually move beyond the skipped syncpoint as well. + dispatcher1.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher1.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + ddlSpan.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: spanController.GetDDLDispatcherID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + event.lastForwardReconcileTime = time.Now().Add(-11 * time.Second) + + _ = barrier.Resend() + _, ok = barrier.blockedEvents.Get(key) + require.False(t, ok) +} + // TestBarrierEventWithDispatcherReallocation tests the barrier's behavior when dispatchers are reallocated // during a blocking event. The test verifies that: // 1. When dispatchers are removed and new ones are created to replace them