-
Notifications
You must be signed in to change notification settings - Fork 50
ddl-test #4906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
ddl-test #4906
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,7 +73,8 @@ type BarrierEvent struct { | |
| rangeChecker range_checker.RangeChecker | ||
| lastResendTime time.Time | ||
|
|
||
| lastWarningLogTime time.Time | ||
| lastWarningLogTime time.Time | ||
| lastForwardReconcileTime time.Time | ||
| } | ||
|
|
||
| func NewBlockEvent(cfID common.ChangeFeedID, | ||
|
|
@@ -106,7 +107,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, | |
| reportedDispatchers: make(map[common.DispatcherID]struct{}), | ||
| lastResendTime: time.Time{}, | ||
|
|
||
| lastWarningLogTime: time.Now(), | ||
| lastWarningLogTime: time.Now(), | ||
| lastForwardReconcileTime: time.Time{}, | ||
| } | ||
|
|
||
| if status.BlockTables != nil { | ||
|
|
@@ -172,6 +174,92 @@ func (be *BarrierEvent) createRangeCheckerForTypeDB() { | |
| log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs), zap.Int64("mode", be.mode)) | ||
| } | ||
|
|
||
| func (be *BarrierEvent) ensureRangeCheckerForCurrentTasks() { | ||
| if be.rangeChecker != nil { | ||
| return | ||
| } | ||
|
|
||
| switch be.blockedDispatchers.InfluenceType { | ||
| case heartbeatpb.InfluenceType_Normal: | ||
| if be.dynamicSplitEnabled { | ||
| be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs) | ||
| } else { | ||
| be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs) | ||
| } | ||
| case heartbeatpb.InfluenceType_DB: | ||
| be.createRangeCheckerForTypeDB() | ||
| case heartbeatpb.InfluenceType_All: | ||
| be.createRangeCheckerForTypeAll() | ||
| } | ||
| } | ||
|
|
||
| // shortcutSyncPointToPassPhase switches a syncpoint from the WAITING coverage phase | ||
| // to the PASS/DONE coverage phase. The first phase tracks who has reached the barrier, | ||
| // while the second phase tracks who has finished it. We must reset the first-phase | ||
| // bookkeeping before reconciling forward progress, otherwise stale WAITING reports may | ||
| // be mistaken for finished dispatchers and keep the event in an inconsistent state. | ||
| func (be *BarrierEvent) shortcutSyncPointToPassPhase() { | ||
| be.ensureRangeCheckerForCurrentTasks() | ||
| if be.rangeChecker != nil { | ||
| be.rangeChecker.Reset() | ||
| } | ||
| be.reportedDispatchers = make(map[common.DispatcherID]struct{}) | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| be.reconcileForwardedDispatchers() | ||
| be.lastForwardReconcileTime = time.Now() | ||
| } | ||
|
|
||
| func (be *BarrierEvent) reconcileForwardedDispatchers() { | ||
| for _, replication := range be.collectRelevantReplications() { | ||
| if forwardBarrierEvent(replication, be) { | ||
| be.markDispatcherEventDone(replication.ID) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (be *BarrierEvent) collectRelevantReplications() []*replica.SpanReplication { | ||
| switch be.blockedDispatchers.InfluenceType { | ||
| case heartbeatpb.InfluenceType_DB: | ||
| replications := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) | ||
| ddlDispatcher := be.spanController.GetDDLDispatcher() | ||
| if ddlDispatcher != nil { | ||
| replications = append(replications, ddlDispatcher) | ||
| } | ||
| return dedupReplications(replications) | ||
| case heartbeatpb.InfluenceType_All: | ||
| return be.spanController.GetAllTasks() | ||
| case heartbeatpb.InfluenceType_Normal: | ||
| replications := make([]*replica.SpanReplication, 0) | ||
| for _, tableID := range be.blockedDispatchers.TableIDs { | ||
| replications = append(replications, be.spanController.GetTasksByTableID(tableID)...) | ||
| } | ||
| return dedupReplications(replications) | ||
| default: | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| func dedupReplications(replications []*replica.SpanReplication) []*replica.SpanReplication { | ||
| if len(replications) <= 1 { | ||
| return replications | ||
| } | ||
|
|
||
| seen := make(map[common.DispatcherID]struct{}, len(replications)) | ||
| result := make([]*replica.SpanReplication, 0, len(replications)) | ||
| for _, replication := range replications { | ||
| if replication == nil { | ||
| continue | ||
| } | ||
| if _, ok := seen[replication.ID]; ok { | ||
| continue | ||
| } | ||
| seen[replication.ID] = struct{}{} | ||
| result = append(result, replication) | ||
| } | ||
| return result | ||
| } | ||
|
|
||
| func (be *BarrierEvent) checkEventAction(dispatcherID common.DispatcherID) (*heartbeatpb.DispatcherStatus, node.ID) { | ||
| if !be.allDispatcherReported() { | ||
| return nil, "" | ||
|
|
@@ -498,54 +586,89 @@ func (be *BarrierEvent) checkBlockedDispatchers() { | |
| replications := be.spanController.GetTasksByTableID(tableId) | ||
| for _, replication := range replications { | ||
| if forwardBarrierEvent(replication, be) { | ||
| if be.isSyncPoint { | ||
| be.shortcutSyncPointToPassPhase() | ||
| log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Int64("tableId", tableId), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| } else { | ||
| // one related table has forward checkpointTs, means the block event can be advanced | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Int64("tableId", tableId), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| } | ||
| return | ||
| } | ||
| } | ||
| } | ||
| case heartbeatpb.InfluenceType_DB: | ||
| schemaID := be.blockedDispatchers.SchemaID | ||
| replications := be.spanController.GetTasksBySchemaID(schemaID) | ||
| for _, replication := range replications { | ||
| if forwardBarrierEvent(replication, be) { | ||
| if be.isSyncPoint { | ||
| be.shortcutSyncPointToPassPhase() | ||
| log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Int64("schemaID", schemaID), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| } else { | ||
| // one related table has forward checkpointTs, means the block event can be advanced | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Int64("tableId", tableId), | ||
| zap.Int64("schemaID", schemaID), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| return | ||
| } | ||
| } | ||
| } | ||
| case heartbeatpb.InfluenceType_DB: | ||
| schemaID := be.blockedDispatchers.SchemaID | ||
| replications := be.spanController.GetTasksBySchemaID(schemaID) | ||
| for _, replication := range replications { | ||
| if forwardBarrierEvent(replication, be) { | ||
| // one related table has forward checkpointTs, means the block event can be advanced | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Int64("schemaID", schemaID), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| return | ||
| } | ||
| } | ||
| case heartbeatpb.InfluenceType_All: | ||
| replications := be.spanController.GetAllTasks() | ||
| for _, replication := range replications { | ||
| if forwardBarrierEvent(replication, be) { | ||
| // one related table has forward checkpointTs, means the block event can be advanced | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| if be.isSyncPoint { | ||
| be.shortcutSyncPointToPassPhase() | ||
| log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| } else { | ||
| // one related table has forward checkpointTs, means the block event can be advanced | ||
| be.selected.Store(true) | ||
| be.writerDispatcherAdvanced = true | ||
| log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", | ||
| zap.String("changefeed", be.cfID.Name()), | ||
| zap.Uint64("commitTs", be.commitTs), | ||
| zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), | ||
| zap.String("dispatcher", replication.ID.String()), | ||
| zap.Int64("mode", be.mode), | ||
| ) | ||
| } | ||
| return | ||
| } | ||
| } | ||
|
|
@@ -675,6 +798,10 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { | |
|
|
||
| msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID(), mode)} | ||
| } else { | ||
| if be.isSyncPoint && time.Since(be.lastForwardReconcileTime) > time.Second*10 { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The duration `time.Second*10` is hard-coded here. Consider defining a constant at the package level, for example: `const forwardReconcileInterval = 10 * time.Second`. And then use it here. |
||
| be.reconcileForwardedDispatchers() | ||
| be.lastForwardReconcileTime = time.Now() | ||
| } | ||
| // the writer dispatcher is advanced, resend pass action | ||
| return be.sendPassAction(mode) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic inside this `if` block is almost identical to the ones in the `InfluenceType_DB` and `InfluenceType_All` cases below. This duplication makes the code harder to read and maintain.
Consider extracting this logic into a private helper method to improve code clarity and reduce redundancy. The helper method could take the `replication` and any case-specific logging fields as arguments.
For example, you could introduce a method like `handleForwardedDispatcher`. Then you can simplify this block and the others to a single call, for example:
`be.handleForwardedDispatcher(replication, zap.Int64("tableId", tableId))`