Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 159 additions & 32 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type BarrierEvent struct {
rangeChecker range_checker.RangeChecker
lastResendTime time.Time

lastWarningLogTime time.Time
lastWarningLogTime time.Time
lastForwardReconcileTime time.Time
}

func NewBlockEvent(cfID common.ChangeFeedID,
Expand Down Expand Up @@ -106,7 +107,8 @@ func NewBlockEvent(cfID common.ChangeFeedID,
reportedDispatchers: make(map[common.DispatcherID]struct{}),
lastResendTime: time.Time{},

lastWarningLogTime: time.Now(),
lastWarningLogTime: time.Now(),
lastForwardReconcileTime: time.Time{},
}

if status.BlockTables != nil {
Expand Down Expand Up @@ -172,6 +174,92 @@ func (be *BarrierEvent) createRangeCheckerForTypeDB() {
log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs), zap.Int64("mode", be.mode))
}

func (be *BarrierEvent) ensureRangeCheckerForCurrentTasks() {
if be.rangeChecker != nil {
return
}

switch be.blockedDispatchers.InfluenceType {
case heartbeatpb.InfluenceType_Normal:
if be.dynamicSplitEnabled {
be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs)
} else {
be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs)
}
case heartbeatpb.InfluenceType_DB:
be.createRangeCheckerForTypeDB()
case heartbeatpb.InfluenceType_All:
be.createRangeCheckerForTypeAll()
}
}

// shortcutSyncPointToPassPhase switches a syncpoint from the WAITING coverage phase
// to the PASS/DONE coverage phase. The first phase tracks who has reached the barrier,
// while the second phase tracks who has finished it. We must reset the first-phase
// bookkeeping before reconciling forward progress, otherwise stale WAITING reports may
// be mistaken for finished dispatchers and keep the event in an inconsistent state.
func (be *BarrierEvent) shortcutSyncPointToPassPhase() {
be.ensureRangeCheckerForCurrentTasks()
if be.rangeChecker != nil {
be.rangeChecker.Reset()
}
be.reportedDispatchers = make(map[common.DispatcherID]struct{})
be.selected.Store(true)
be.writerDispatcherAdvanced = true
be.reconcileForwardedDispatchers()
be.lastForwardReconcileTime = time.Now()
}

func (be *BarrierEvent) reconcileForwardedDispatchers() {
for _, replication := range be.collectRelevantReplications() {
if forwardBarrierEvent(replication, be) {
be.markDispatcherEventDone(replication.ID)
}
}
}

func (be *BarrierEvent) collectRelevantReplications() []*replica.SpanReplication {
switch be.blockedDispatchers.InfluenceType {
case heartbeatpb.InfluenceType_DB:
replications := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID)
ddlDispatcher := be.spanController.GetDDLDispatcher()
if ddlDispatcher != nil {
replications = append(replications, ddlDispatcher)
}
return dedupReplications(replications)
case heartbeatpb.InfluenceType_All:
return be.spanController.GetAllTasks()
case heartbeatpb.InfluenceType_Normal:
replications := make([]*replica.SpanReplication, 0)
for _, tableID := range be.blockedDispatchers.TableIDs {
replications = append(replications, be.spanController.GetTasksByTableID(tableID)...)
}
return dedupReplications(replications)
default:
return nil
}
}

func dedupReplications(replications []*replica.SpanReplication) []*replica.SpanReplication {
if len(replications) <= 1 {
return replications
}

seen := make(map[common.DispatcherID]struct{}, len(replications))
result := make([]*replica.SpanReplication, 0, len(replications))
for _, replication := range replications {
if replication == nil {
continue
}
if _, ok := seen[replication.ID]; ok {
continue
}
seen[replication.ID] = struct{}{}
result = append(result, replication)
}
return result
}

func (be *BarrierEvent) checkEventAction(dispatcherID common.DispatcherID) (*heartbeatpb.DispatcherStatus, node.ID) {
if !be.allDispatcherReported() {
return nil, ""
Expand Down Expand Up @@ -498,54 +586,89 @@ func (be *BarrierEvent) checkBlockedDispatchers() {
replications := be.spanController.GetTasksByTableID(tableId)
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic inside this if block is almost identical to the ones in the InfluenceType_DB and InfluenceType_All cases below. This duplication makes the code harder to read and maintain.

Consider extracting this logic into a private helper method to improve code clarity and reduce redundancy. The helper method could take the replication and any case-specific logging fields as arguments.

For example, you could introduce a method like handleForwardedDispatcher:

func (be *BarrierEvent) handleForwardedDispatcher(replication *replica.SpanReplication, extraLogFields ...zap.Field) {
	if be.isSyncPoint {
		be.shortcutSyncPointToPassPhase()
		fields := []zap.Field{
			zap.String("changefeed", be.cfID.Name()),
			zap.Uint64("commitTs", be.commitTs),
			zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
			zap.String("dispatcher", replication.ID.String()),
			zap.Int64("mode", be.mode),
		}
		fields = append(fields, extraLogFields...)
		log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", fields...)
	} else {
		be.selected.Store(true)
		be.writerDispatcherAdvanced = true
		fields := []zap.Field{
			zap.String("changefeed", be.cfID.Name()),
			zap.Uint64("commitTs", be.commitTs),
			zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
			zap.String("dispatcher", replication.ID.String()),
			zap.Int64("mode", be.mode),
		}
		fields = append(fields, extraLogFields...)
		log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", fields...)
	}
}

Then you can simplify this block and the others to a single call, for example:
be.handleForwardedDispatcher(replication, zap.Int64("tableId", tableId))

if be.isSyncPoint {
be.shortcutSyncPointToPassPhase()
log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("tableId", tableId),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
} else {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("tableId", tableId),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
}
return
}
}
}
case heartbeatpb.InfluenceType_DB:
schemaID := be.blockedDispatchers.SchemaID
replications := be.spanController.GetTasksBySchemaID(schemaID)
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
if be.isSyncPoint {
be.shortcutSyncPointToPassPhase()
log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("schemaID", schemaID),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
} else {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("tableId", tableId),
zap.Int64("schemaID", schemaID),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
return
}
}
}
case heartbeatpb.InfluenceType_DB:
schemaID := be.blockedDispatchers.SchemaID
replications := be.spanController.GetTasksBySchemaID(schemaID)
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("schemaID", schemaID),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
return
}
}
case heartbeatpb.InfluenceType_All:
replications := be.spanController.GetAllTasks()
for _, replication := range replications {
if forwardBarrierEvent(replication, be) {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
if be.isSyncPoint {
be.shortcutSyncPointToPassPhase()
log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
} else {
// one related table has forward checkpointTs, means the block event can be advanced
be.selected.Store(true)
be.writerDispatcherAdvanced = true
log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("commitTs", be.commitTs),
zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs),
zap.String("dispatcher", replication.ID.String()),
zap.Int64("mode", be.mode),
)
}
return
}
}
Expand Down Expand Up @@ -675,6 +798,10 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage {

msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID(), mode)}
} else {
if be.isSyncPoint && time.Since(be.lastForwardReconcileTime) > time.Second*10 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The duration time.Second*10 is a magic number. It's better to define it as a named constant to improve readability and maintainability. This makes it easier to understand the purpose of the value and to change it in one place if needed.

Consider defining a constant at the package level, for example:

const forwardReconcileInterval = 10 * time.Second

And then use it here.

be.reconcileForwardedDispatchers()
be.lastForwardReconcileTime = time.Now()
}
// the writer dispatcher is advanced, resend pass action
return be.sendPassAction(mode)
}
Expand Down
75 changes: 75 additions & 0 deletions maintainer/barrier_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,81 @@ func TestResendAction(t *testing.T) {
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
}

func TestShortcutSyncPointToPassPhaseResetsWaitingCoverage(t *testing.T) {
testutil.SetUpTestServices(t)
tableTriggerEventDispatcherID := common.NewDispatcherID()
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
common.DDLSpanSchemaID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
ID: tableTriggerEventDispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 1,
}, "node1", false)
spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode)
operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode)

spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)

dispatcher1 := spanController.GetTasksByTableID(1)[0]
dispatcher2 := spanController.GetTasksByTableID(2)[0]
for _, dispatcher := range []*replica.SpanReplication{dispatcher1, dispatcher2} {
spanController.BindSpanToNode("", "node1", dispatcher)
spanController.MarkSpanReplicating(dispatcher)
}

event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{
IsBlocked: true,
BlockTs: 10,
BlockTables: &heartbeatpb.InfluencedTables{
InfluenceType: heartbeatpb.InfluenceType_All,
},
IsSyncPoint: true,
}, false, common.DefaultMode)

// First-phase bookkeeping tracks who has reached WAITING. These entries must be
// discarded once the syncpoint is shortcut directly to the PASS/DONE phase.
event.markDispatcherEventDone(dispatcher1.ID)
event.markDispatcherEventDone(spanController.GetDDLDispatcherID())
require.Contains(t, event.reportedDispatchers, dispatcher1.ID)
require.Contains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID())
require.False(t, event.rangeChecker.IsFullyCovered())

dispatcher2.UpdateStatus(&heartbeatpb.TableSpanStatus{
ID: dispatcher2.ID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 11,
Mode: common.DefaultMode,
})

event.checkBlockedDispatchers()
require.True(t, event.selected.Load())
require.True(t, event.writerDispatcherAdvanced)
require.True(t, event.writerDispatcher.IsZero())
require.Contains(t, event.reportedDispatchers, dispatcher2.ID)
require.NotContains(t, event.reportedDispatchers, dispatcher1.ID)
require.NotContains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID())
require.False(t, event.rangeChecker.IsFullyCovered())
require.False(t, event.allDispatcherReported())

dispatcher1.UpdateStatus(&heartbeatpb.TableSpanStatus{
ID: dispatcher1.ID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 11,
Mode: common.DefaultMode,
})
ddlSpan.UpdateStatus(&heartbeatpb.TableSpanStatus{
ID: spanController.GetDDLDispatcherID().ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 11,
Mode: common.DefaultMode,
})

event.reconcileForwardedDispatchers()
require.True(t, event.allDispatcherReported())
}

func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) {
testutil.SetUpTestServices(t)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
Expand Down
Loading
Loading