eventservice: decouple dispatcher ready/handshake from scan triggering and activate the control plane immediately after register/reset #4840
📝 Walkthrough
The event broker decouples dispatcher ready/handshake event progression from scan-task range computation by introducing an
Sequence Diagram(s)
sequenceDiagram
participant Broker as Event Broker
participant DispMgr as Dispatcher Mgr
participant ControlPlane as Control Plane
participant EventCollector as Event Collector
Note over Broker,EventCollector: addDispatcher (New Flow)
Broker->>DispMgr: addDispatcher(task)
DispMgr->>ControlPlane: activateDispatcherControlPlane()
ControlPlane->>EventCollector: emit ReadyEvent
Note over DispMgr: Ready immediately
Note over Broker,EventCollector: resetDispatcher (New Flow)
Broker->>DispMgr: resetDispatcher(task)
DispMgr->>ControlPlane: activateDispatcherControlPlane()
ControlPlane->>EventCollector: emit HandshakeEvent
Note over DispMgr: Handshake immediately
Note over Broker,EventCollector: scanReady Path (Refactored)
Broker->>DispMgr: scanReady(task)
DispMgr->>ControlPlane: activateDispatcherControlPlane()
ControlPlane->>EventCollector: emit ReadyEvent (if needed)
DispMgr->>Broker: return eligibility after control-plane advance
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request refactors the event broker to decouple dispatcher control plane activation from scan task generation by introducing the activateDispatcherControlPlane method. This ensures that ready and handshake signals are sent immediately upon dispatcher registration or reset. A potential issue was identified in addDispatcher where a handshake event might be sent without necessary table metadata if the dispatcher is initialized with an epoch greater than zero, as the table information is not yet populated at that stage.
}
c.dispatchers.Store(id, dispatcherPtr)
c.metricsCollector.metricDispatcherCount.Inc()
c.activateDispatcherControlPlane(dispatcher)
Calling activateDispatcherControlPlane here for a dispatcher with epoch > 0 will trigger a HandshakeEvent. However, dispatcher was initialized with nil for startTableInfo at line 1349. This means the handshake event will be sent without table information, which might cause issues for the receiver if it expects metadata for a newly registered table. Consider if addDispatcher should fetch table info when epoch > 0, similar to how resetDispatcher does at line 1531.
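To make the concern above concrete, here is a minimal sketch of a guard that fills in table metadata before a handshake is sent. The types `dispatcher` and `tableInfo`, the helper `ensureTableInfo`, and the `fetch` callback are all hypothetical stand-ins, not the actual event service API; the real `addDispatcher` and `resetDispatcher` code may handle this differently.

```go
package main

import (
	"errors"
	"fmt"
)

// tableInfo is a hypothetical stand-in for the table metadata carried by a handshake.
type tableInfo struct{ Name string }

// dispatcher is a simplified, hypothetical model holding only the fields discussed above.
type dispatcher struct {
	epoch          uint64
	startTableInfo *tableInfo
}

// ensureTableInfo populates startTableInfo when a handshake would follow
// (epoch > 0) and the metadata has not been loaded yet. fetch is an
// assumed callback standing in for whatever lookup the broker uses.
func ensureTableInfo(d *dispatcher, fetch func() (*tableInfo, error)) error {
	if d.epoch == 0 || d.startTableInfo != nil {
		return nil // no handshake pending, or metadata already present
	}
	info, err := fetch()
	if err != nil {
		return err
	}
	if info == nil {
		return errors.New("table info unavailable before handshake")
	}
	d.startTableInfo = info
	return nil
}

func main() {
	d := &dispatcher{epoch: 1}
	err := ensureTableInfo(d, func() (*tableInfo, error) {
		return &tableInfo{Name: "t1"}, nil
	})
	fmt.Println(err, d.startTableInfo != nil)
}
```

Whether such a guard belongs in `addDispatcher` or inside the activation helper itself is a design question for the author; the sketch only shows the check the comment asks about.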
d4a649b to ed419ca
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker_test.go (1)
681-699: ⚠️ Potential issue | 🔴 Critical
Filter the new ReadyEvent before asserting the batch resolved event.
`addDispatcher` can now emit a ReadyEvent, so `outputCh` no longer guarantees the next message is `TypeBatchResolvedTs`; CI is already failing with actual type `3`. Use the same filtering pattern as the heartbeat test. As per coding guidelines, `**/*_test.go`: favor deterministic tests and use `testify/require`.
Proposed fix
- msg := <-outputCh
- require.Equal(t, msg.Type, messaging.TypeBatchResolvedTs)
+ deadline, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ for {
+     select {
+     case msg := <-outputCh:
+         if msg.Type != messaging.TypeBatchResolvedTs {
+             continue
+         }
+         return
+     case <-deadline.Done():
+         require.Fail(t, "expected batch resolved event")
+         return
+     }
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/event_broker_test.go` around lines 681 - 699, The test currently assumes the next message from outputCh is a BatchResolvedTs but addDispatcher may emit a ReadyEvent first, so change the assertion to drain messages from outputCh until you encounter messaging.TypeBatchResolvedTs: after invoking broker.handleResolvedTs, loop receiving msg := <-outputCh and ignore/discard messages of the ReadyEvent/type (check msg.Type or msg.GetType() as in the heartbeat test) using require.NotNil, and only then require.Equal(msg.Type, messaging.TypeBatchResolvedTs); keep references to broker.addDispatcher, broker.getDispatcher, broker.handleResolvedTs, outputCh, and messaging.TypeBatchResolvedTs to locate the change.
📒 Files selected for processing (2)
pkg/eventservice/event_broker.go
pkg/eventservice/event_broker_test.go
What problem does this PR solve?
Issue Number: close #4873
This PR fixes a control-plane timing issue in the event service. For normal dispatchers, `ReadyEvent` after registration and `HandshakeEvent` after reset were still dependent on a later resolved-ts notification.
What is changed and how it works?
This PR extracts dispatcher control-plane activation into `activateDispatcherControlPlane()`, which is responsible for:
- `ReadyEvent` for dispatchers in `epoch == 0`;
- `HandshakeEvent` for dispatchers that are ready but not handshaked yet.
The broker now calls this helper proactively at the key transition points instead of waiting for the next scan/notifier trigger:
`scanReady()` still uses the same helper, so the existing scan path remains intact. The change only removes the hidden coupling that required another resolved-ts advance before the dispatcher could enter ready/handshake.
The PR also adds regression tests covering immediate ready after registration, immediate handshake after reset, and unchanged table-trigger registration behavior.
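The decision logic described above can be sketched as a small state check. This is an illustrative model, not the code in `event_broker.go`: the `dispatcherState` type, the `eventKind` values, and the function `activateControlPlane` are hypothetical, and treating `epoch > 0` as "ready" is an assumption drawn from the review discussion.

```go
package main

import "fmt"

// eventKind is a hypothetical stand-in for the real control-plane event types.
type eventKind string

const (
	readyEvent     eventKind = "ReadyEvent"
	handshakeEvent eventKind = "HandshakeEvent"
)

// dispatcherState is a simplified, hypothetical model of a dispatcher.
type dispatcherState struct {
	epoch      uint64 // 0 means registered but not yet reset (assumption)
	handshaked bool
}

// activateControlPlane returns the control-plane event to emit, if any.
// It does not depend on resolved-ts progress, which is the point of the change.
func activateControlPlane(d *dispatcherState) (eventKind, bool) {
	if d.epoch == 0 {
		return readyEvent, true
	}
	if !d.handshaked {
		d.handshaked = true
		return handshakeEvent, true
	}
	return "", false // control plane already active; nothing to emit
}

func main() {
	d := &dispatcherState{}
	fmt.Println(activateControlPlane(d)) // right after registration

	d.epoch = 1 // simulate a reset
	fmt.Println(activateControlPlane(d))
	fmt.Println(activateControlPlane(d)) // later call from the scan path
}
```

Because the same function serves the register, reset, and scan paths in this sketch, calling it again from a scan-style path after the handshake has no further effect on the control plane.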
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note