Skip to content

Commit ed419ca

Browse files
committed
update
1 parent 8486a5d commit ed419ca

2 files changed

Lines changed: 112 additions & 22 deletions

File tree

pkg/eventservice/event_broker.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,9 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
342342
case <-ticker.C:
343343
c.tableTriggerDispatchers.Range(func(key, value interface{}) bool {
344344
stat := value.(*atomic.Pointer[dispatcherStat]).Load()
345-
if !c.checkAndSendReady(stat) {
345+
if !c.activateDispatcherControlPlane(stat) {
346346
return true
347347
}
348-
c.sendHandshakeIfNeed(stat)
349348
startTs := stat.sentResolvedTs.Load()
350349
remoteID := node.ID(stat.info.GetServerID())
351350
keyspaceMeta := common.KeyspaceMeta{
@@ -357,6 +356,7 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
357356
log.Error("table trigger ddl events fetch failed", zap.Uint32("keyspaceID", stat.info.GetTableSpan().KeyspaceID), zap.Stringer("dispatcherID", stat.id), zap.Error(err))
358357
return true
359358
}
359+
// Keep the raw resolved-ts from schema store for scan readiness/lag visibility.
360360
stat.receivedResolvedTs.Store(endTs)
361361
for _, e := range ddlEvents {
362362
ep := &e
@@ -505,23 +505,31 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
505505
// Note: A true return value only indicates potential scanning need,
506506
// final determination occurs when the scanTask is actully processed.
507507
func (c *eventBroker) scanReady(task scanTask) bool {
508-
if task.isRemoved.Load() {
508+
if task.isTaskScanning.Load() {
509509
return false
510510
}
511511

512-
if task.isTaskScanning.Load() {
512+
if !c.activateDispatcherControlPlane(task) {
513+
return false
514+
}
515+
516+
ok, _ := c.getScanTaskDataRange(task)
517+
return ok
518+
}
519+
520+
// activateDispatcherControlPlane advances the dispatcher through ready/handshake
521+
// without coupling it to scan task generation.
522+
func (c *eventBroker) activateDispatcherControlPlane(task scanTask) bool {
523+
if task.isRemoved.Load() {
513524
return false
514525
}
515526

516-
// If the dispatcher is not ready, we don't need do the scan.
517527
if !c.checkAndSendReady(task) {
518528
return false
519529
}
520530

521531
c.sendHandshakeIfNeed(task)
522-
523-
ok, _ := c.getScanTaskDataRange(task)
524-
return ok
532+
return true
525533
}
526534

527535
func (c *eventBroker) checkAndSendReady(task scanTask) bool {
@@ -1070,6 +1078,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
10701078
}
10711079
c.dispatchers.Store(id, dispatcherPtr)
10721080
c.metricsCollector.metricDispatcherCount.Inc()
1081+
c.activateDispatcherControlPlane(dispatcher)
10731082
log.Info("register dispatcher",
10741083
zap.Uint64("clusterID", c.tidbClusterID),
10751084
zap.Stringer("changefeedID", changefeedID),
@@ -1233,6 +1242,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
12331242
zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()),
12341243
zap.Uint64("newEpoch", newStat.epoch),
12351244
zap.Duration("resetTime", time.Since(start)))
1245+
c.activateDispatcherControlPlane(newStat)
12361246

12371247
return nil
12381248
}

pkg/eventservice/event_broker_test.go

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ type notifyMsg struct {
6262
latestCommitTs uint64
6363
}
6464

65+
func collectWrapEventTypes(ch chan *wrapEvent) []int {
66+
eventTypes := make([]int, 0)
67+
for {
68+
select {
69+
case e := <-ch:
70+
eventTypes = append(eventTypes, e.msgType)
71+
default:
72+
return eventTypes
73+
}
74+
}
75+
}
76+
6577
func TestCheckNeedScan(t *testing.T) {
6678
broker, _, _, _ := newEventBrokerForTest()
6779
// Close the broker, so we can catch all message in the test.
@@ -119,6 +131,64 @@ func TestGetOrSetChangefeedStatusInitializesFilter(t *testing.T) {
119131
require.Same(t, status.filter, reused.filter)
120132
}
121133

134+
func TestAddDispatcherSendsReadyImmediatelyAfterRegistration(t *testing.T) {
135+
broker, _, _, _ := newEventBrokerForTest()
136+
broker.close()
137+
138+
info := newMockDispatcherInfoForTest(t)
139+
err := broker.addDispatcher(info)
140+
require.NoError(t, err)
141+
142+
dispPtr := broker.getDispatcher(info.GetID())
143+
require.NotNil(t, dispPtr)
144+
disp := dispPtr.Load()
145+
require.NotNil(t, disp)
146+
147+
eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
148+
require.Equal(t, []int{event.TypeReadyEvent}, eventTypes)
149+
}
150+
151+
func TestResetDispatcherSendsHandshakeImmediatelyAfterEpochSwitch(t *testing.T) {
152+
broker, _, _, _ := newEventBrokerForTest()
153+
broker.close()
154+
155+
info := newMockDispatcherInfoForTest(t)
156+
err := broker.addDispatcher(info)
157+
require.NoError(t, err)
158+
159+
disp := broker.getDispatcher(info.GetID()).Load()
160+
require.NotNil(t, disp)
161+
collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
162+
163+
resetInfo := newMockDispatcherInfo(t, 500, info.GetID(), info.GetTableSpan().GetTableID(), eventpb.ActionType_ACTION_TYPE_RESET)
164+
resetInfo.epoch = 1
165+
err = broker.resetDispatcher(resetInfo)
166+
require.NoError(t, err)
167+
168+
newDisp := broker.getDispatcher(info.GetID()).Load()
169+
require.NotNil(t, newDisp)
170+
require.Equal(t, uint64(1), newDisp.epoch)
171+
172+
eventTypes := collectWrapEventTypes(broker.messageCh[newDisp.messageWorkerIndex])
173+
require.Equal(t, []int{event.TypeHandshakeEvent}, eventTypes)
174+
}
175+
176+
func TestAddTableTriggerDispatcherDoesNotSendReadyImmediately(t *testing.T) {
177+
broker, _, _, _ := newEventBrokerForTest()
178+
broker.close()
179+
180+
info := newMockDispatcherInfo(t, 300, common.NewDispatcherID(), 0, eventpb.ActionType_ACTION_TYPE_REGISTER)
181+
info.span = common.KeyspaceDDLSpan(testTableTriggerKeyspaceID)
182+
err := broker.addDispatcher(info)
183+
require.NoError(t, err)
184+
185+
disp := broker.getDispatcher(info.GetID()).Load()
186+
require.NotNil(t, disp)
187+
188+
eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
189+
require.Empty(t, eventTypes)
190+
}
191+
122192
func TestOnNotify(t *testing.T) {
123193
broker, _, ss, _ := newEventBrokerForTest()
124194
// Close the broker, so we can catch all message in the test.
@@ -704,19 +774,25 @@ func TestHandleDispatcherHeartbeat_InactiveDispatcherCleanup(t *testing.T) {
704774
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
705775
defer cancel()
706776
// Verify that a response was sent indicating the dispatcher is removed
707-
select {
708-
case msg := <-outputCh:
709-
require.Equal(t, messaging.TypeDispatcherHeartbeatResponse, msg.Type)
710-
// The response should contain a dispatcher state indicating removal
711-
require.Len(t, msg.Message, 1)
712-
response := msg.Message[0].(*event.DispatcherHeartbeatResponse)
713-
require.NotNil(t, response)
714-
states := response.DispatcherStates
715-
require.Len(t, states, 1)
716-
require.Equal(t, dispInfo.GetID(), states[0].DispatcherID)
717-
require.Equal(t, event.DSStateRemoved, states[0].State)
718-
case <-ctx.Done():
719-
require.Fail(t, "Expected to receive a dispatcher heartbeat response")
777+
for {
778+
select {
779+
case msg := <-outputCh:
780+
if msg.Type != messaging.TypeDispatcherHeartbeatResponse {
781+
continue
782+
}
783+
// Ignore earlier ready/handshake traffic and verify the heartbeat response itself.
784+
require.Len(t, msg.Message, 1)
785+
response := msg.Message[0].(*event.DispatcherHeartbeatResponse)
786+
require.NotNil(t, response)
787+
states := response.DispatcherStates
788+
require.Len(t, states, 1)
789+
require.Equal(t, dispInfo.GetID(), states[0].DispatcherID)
790+
require.Equal(t, event.DSStateRemoved, states[0].State)
791+
return
792+
case <-ctx.Done():
793+
require.Fail(t, "Expected to receive a dispatcher heartbeat response")
794+
return
795+
}
720796
}
721797
}
722798

@@ -939,7 +1015,7 @@ func TestSendHandshakeUsesStartTs(t *testing.T) {
9391015

9401016
func TestAddDispatcherFailure(t *testing.T) {
9411017
broker, _, ss, _ := newEventBrokerForTest()
942-
defer broker.close()
1018+
broker.close()
9431019

9441020
// Simulate schema store failure
9451021
ss.registerTableError = errors.New("mock error")
@@ -950,4 +1026,8 @@ func TestAddDispatcherFailure(t *testing.T) {
9501026

9511027
_, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID())
9521028
require.False(t, ok, "changefeedStatus should be removed after failed registration")
1029+
require.Nil(t, broker.getDispatcher(dispInfo.GetID()))
1030+
for _, ch := range broker.messageCh {
1031+
require.Empty(t, collectWrapEventTypes(ch))
1032+
}
9531033
}

0 commit comments

Comments
 (0)