diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..ddddee7377 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -342,10 +342,9 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error { case <-ticker.C: c.tableTriggerDispatchers.Range(func(key, value interface{}) bool { stat := value.(*atomic.Pointer[dispatcherStat]).Load() - if !c.checkAndSendReady(stat) { + if !c.activateDispatcherControlPlane(stat) { return true } - c.sendHandshakeIfNeed(stat) startTs := stat.sentResolvedTs.Load() remoteID := node.ID(stat.info.GetServerID()) keyspaceMeta := common.KeyspaceMeta{ @@ -357,6 +356,7 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error { log.Error("table trigger ddl events fetch failed", zap.Uint32("keyspaceID", stat.info.GetTableSpan().KeyspaceID), zap.Stringer("dispatcherID", stat.id), zap.Error(err)) return true } + // Keep the raw resolved-ts from schema store for scan readiness/lag visibility. stat.receivedResolvedTs.Store(endTs) for _, e := range ddlEvents { ep := &e @@ -505,23 +505,31 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang // Note: A true return value only indicates potential scanning need, // final determination occurs when the scanTask is actully processed. func (c *eventBroker) scanReady(task scanTask) bool { - if task.isRemoved.Load() { + if task.isTaskScanning.Load() { return false } - if task.isTaskScanning.Load() { + if !c.activateDispatcherControlPlane(task) { + return false + } + + ok, _ := c.getScanTaskDataRange(task) + return ok +} + +// activateDispatcherControlPlane advances the dispatcher through ready/handshake +// without coupling it to scan task generation. +func (c *eventBroker) activateDispatcherControlPlane(task scanTask) bool { + if task.isRemoved.Load() { return false } - // If the dispatcher is not ready, we don't need do the scan. if !c.checkAndSendReady(task) { return false } c.sendHandshakeIfNeed(task) - - ok, _ := c.getScanTaskDataRange(task) - return ok + return true } func (c *eventBroker) checkAndSendReady(task scanTask) bool { @@ -1070,6 +1078,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { } c.dispatchers.Store(id, dispatcherPtr) c.metricsCollector.metricDispatcherCount.Inc() + c.activateDispatcherControlPlane(dispatcher) log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), zap.Stringer("changefeedID", changefeedID), @@ -1233,6 +1242,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()), zap.Uint64("newEpoch", newStat.epoch), zap.Duration("resetTime", time.Since(start))) + c.activateDispatcherControlPlane(newStat) return nil } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 4a274e0c12..fb03a5902e 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -62,6 +62,18 @@ type notifyMsg struct { latestCommitTs uint64 } +func collectWrapEventTypes(ch chan *wrapEvent) []int { + eventTypes := make([]int, 0) + for { + select { + case e := <-ch: + eventTypes = append(eventTypes, e.msgType) + default: + return eventTypes + } + } +} + func TestCheckNeedScan(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() // Close the broker, so we can catch all message in the test. @@ -119,6 +131,64 @@ func TestGetOrSetChangefeedStatusInitializesFilter(t *testing.T) { require.Same(t, status.filter, reused.filter) } +func TestAddDispatcherSendsReadyImmediatelyAfterRegistration(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfoForTest(t) + err := broker.addDispatcher(info) + require.NoError(t, err) + + dispPtr := broker.getDispatcher(info.GetID()) + require.NotNil(t, dispPtr) + disp := dispPtr.Load() + require.NotNil(t, disp) + + eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + require.Equal(t, []int{event.TypeReadyEvent}, eventTypes) +} + +func TestResetDispatcherSendsHandshakeImmediatelyAfterEpochSwitch(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfoForTest(t) + err := broker.addDispatcher(info) + require.NoError(t, err) + + disp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, disp) + collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + + resetInfo := newMockDispatcherInfo(t, 500, info.GetID(), info.GetTableSpan().GetTableID(), eventpb.ActionType_ACTION_TYPE_RESET) + resetInfo.epoch = 1 + err = broker.resetDispatcher(resetInfo) + require.NoError(t, err) + + newDisp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, newDisp) + require.Equal(t, uint64(1), newDisp.epoch) + + eventTypes := collectWrapEventTypes(broker.messageCh[newDisp.messageWorkerIndex]) + require.Equal(t, []int{event.TypeHandshakeEvent}, eventTypes) +} + +func TestAddTableTriggerDispatcherDoesNotSendReadyImmediately(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfo(t, 300, common.NewDispatcherID(), 0, eventpb.ActionType_ACTION_TYPE_REGISTER) + info.span = common.KeyspaceDDLSpan(testTableTriggerKeyspaceID) + err := broker.addDispatcher(info) + require.NoError(t, err) + + disp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, disp) + + eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + require.Empty(t, eventTypes) +} + func TestOnNotify(t *testing.T) { broker, _, ss, _ := newEventBrokerForTest() // Close the broker, so we can catch all message in the test. @@ -704,19 +774,25 @@ func TestHandleDispatcherHeartbeat_InactiveDispatcherCleanup(t *testing.T) { ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Verify that a response was sent indicating the dispatcher is removed - select { - case msg := <-outputCh: - require.Equal(t, messaging.TypeDispatcherHeartbeatResponse, msg.Type) - // The response should contain a dispatcher state indicating removal - require.Len(t, msg.Message, 1) - response := msg.Message[0].(*event.DispatcherHeartbeatResponse) - require.NotNil(t, response) - states := response.DispatcherStates - require.Len(t, states, 1) - require.Equal(t, dispInfo.GetID(), states[0].DispatcherID) - require.Equal(t, event.DSStateRemoved, states[0].State) - case <-ctx.Done(): - require.Fail(t, "Expected to receive a dispatcher heartbeat response") + for { + select { + case msg := <-outputCh: + if msg.Type != messaging.TypeDispatcherHeartbeatResponse { + continue + } + // Ignore earlier ready/handshake traffic and verify the heartbeat response itself. + require.Len(t, msg.Message, 1) + response := msg.Message[0].(*event.DispatcherHeartbeatResponse) + require.NotNil(t, response) + states := response.DispatcherStates + require.Len(t, states, 1) + require.Equal(t, dispInfo.GetID(), states[0].DispatcherID) + require.Equal(t, event.DSStateRemoved, states[0].State) + return + case <-ctx.Done(): + require.Fail(t, "Expected to receive a dispatcher heartbeat response") + return + } } } @@ -939,7 +1015,7 @@ func TestSendHandshakeUsesStartTs(t *testing.T) { func TestAddDispatcherFailure(t *testing.T) { broker, _, ss, _ := newEventBrokerForTest() - defer broker.close() + broker.close() // Simulate schema store failure ss.registerTableError = errors.New("mock error") @@ -950,4 +1026,8 @@ func TestAddDispatcherFailure(t *testing.T) { _, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID()) require.False(t, ok, "changefeedStatus should be removed after failed registration") + require.Nil(t, broker.getDispatcher(dispInfo.GetID())) + for _, ch := range broker.messageCh { + require.Empty(t, collectWrapEventTypes(ch)) + } }