Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,9 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
case <-ticker.C:
c.tableTriggerDispatchers.Range(func(key, value interface{}) bool {
stat := value.(*atomic.Pointer[dispatcherStat]).Load()
if !c.checkAndSendReady(stat) {
if !c.activateDispatcherControlPlane(stat) {
return true
}
c.sendHandshakeIfNeed(stat)
startTs := stat.sentResolvedTs.Load()
remoteID := node.ID(stat.info.GetServerID())
keyspaceMeta := common.KeyspaceMeta{
Expand All @@ -357,6 +356,7 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
log.Error("table trigger ddl events fetch failed", zap.Uint32("keyspaceID", stat.info.GetTableSpan().KeyspaceID), zap.Stringer("dispatcherID", stat.id), zap.Error(err))
return true
}
// Keep the raw resolved-ts from schema store for scan readiness/lag visibility.
stat.receivedResolvedTs.Store(endTs)
for _, e := range ddlEvents {
ep := &e
Expand Down Expand Up @@ -505,23 +505,31 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
// Note: A true return value only indicates potential scanning need,
// final determination occurs when the scanTask is actully processed.
func (c *eventBroker) scanReady(task scanTask) bool {
if task.isRemoved.Load() {
if task.isTaskScanning.Load() {
return false
}

if task.isTaskScanning.Load() {
if !c.activateDispatcherControlPlane(task) {
return false
}

ok, _ := c.getScanTaskDataRange(task)
return ok
}

// activateDispatcherControlPlane advances the dispatcher through ready/handshake
// without coupling it to scan task generation.
func (c *eventBroker) activateDispatcherControlPlane(task scanTask) bool {
if task.isRemoved.Load() {
return false
}

// If the dispatcher is not ready, we don't need do the scan.
if !c.checkAndSendReady(task) {
return false
}

c.sendHandshakeIfNeed(task)

ok, _ := c.getScanTaskDataRange(task)
return ok
return true
}

func (c *eventBroker) checkAndSendReady(task scanTask) bool {
Expand Down Expand Up @@ -1070,6 +1078,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
}
c.dispatchers.Store(id, dispatcherPtr)
c.metricsCollector.metricDispatcherCount.Inc()
c.activateDispatcherControlPlane(dispatcher)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling activateDispatcherControlPlane here for a dispatcher with epoch > 0 will trigger a HandshakeEvent. However, dispatcher was initialized with nil for startTableInfo at line 1349. This means the handshake event will be sent without table information, which might cause issues for the receiver if it expects metadata for a newly registered table. Consider if addDispatcher should fetch table info when epoch > 0, similar to how resetDispatcher does at line 1531.

log.Info("register dispatcher",
zap.Uint64("clusterID", c.tidbClusterID),
zap.Stringer("changefeedID", changefeedID),
Expand Down Expand Up @@ -1233,6 +1242,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()),
zap.Uint64("newEpoch", newStat.epoch),
zap.Duration("resetTime", time.Since(start)))
c.activateDispatcherControlPlane(newStat)

return nil
}
Expand Down
108 changes: 94 additions & 14 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ type notifyMsg struct {
latestCommitTs uint64
}

func collectWrapEventTypes(ch chan *wrapEvent) []int {
eventTypes := make([]int, 0)
for {
select {
case e := <-ch:
eventTypes = append(eventTypes, e.msgType)
default:
return eventTypes
}
}
}

func TestCheckNeedScan(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
// Close the broker, so we can catch all message in the test.
Expand Down Expand Up @@ -119,6 +131,64 @@ func TestGetOrSetChangefeedStatusInitializesFilter(t *testing.T) {
require.Same(t, status.filter, reused.filter)
}

func TestAddDispatcherSendsReadyImmediatelyAfterRegistration(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfoForTest(t)
err := broker.addDispatcher(info)
require.NoError(t, err)

dispPtr := broker.getDispatcher(info.GetID())
require.NotNil(t, dispPtr)
disp := dispPtr.Load()
require.NotNil(t, disp)

eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
require.Equal(t, []int{event.TypeReadyEvent}, eventTypes)
}

func TestResetDispatcherSendsHandshakeImmediatelyAfterEpochSwitch(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfoForTest(t)
err := broker.addDispatcher(info)
require.NoError(t, err)

disp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, disp)
collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])

resetInfo := newMockDispatcherInfo(t, 500, info.GetID(), info.GetTableSpan().GetTableID(), eventpb.ActionType_ACTION_TYPE_RESET)
resetInfo.epoch = 1
err = broker.resetDispatcher(resetInfo)
require.NoError(t, err)

newDisp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, newDisp)
require.Equal(t, uint64(1), newDisp.epoch)

eventTypes := collectWrapEventTypes(broker.messageCh[newDisp.messageWorkerIndex])
require.Equal(t, []int{event.TypeHandshakeEvent}, eventTypes)
}

func TestAddTableTriggerDispatcherDoesNotSendReadyImmediately(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfo(t, 300, common.NewDispatcherID(), 0, eventpb.ActionType_ACTION_TYPE_REGISTER)
info.span = common.KeyspaceDDLSpan(testTableTriggerKeyspaceID)
err := broker.addDispatcher(info)
require.NoError(t, err)

disp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, disp)

eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
require.Empty(t, eventTypes)
}

func TestOnNotify(t *testing.T) {
broker, _, ss, _ := newEventBrokerForTest()
// Close the broker, so we can catch all message in the test.
Expand Down Expand Up @@ -704,19 +774,25 @@ func TestHandleDispatcherHeartbeat_InactiveDispatcherCleanup(t *testing.T) {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Verify that a response was sent indicating the dispatcher is removed
select {
case msg := <-outputCh:
require.Equal(t, messaging.TypeDispatcherHeartbeatResponse, msg.Type)
// The response should contain a dispatcher state indicating removal
require.Len(t, msg.Message, 1)
response := msg.Message[0].(*event.DispatcherHeartbeatResponse)
require.NotNil(t, response)
states := response.DispatcherStates
require.Len(t, states, 1)
require.Equal(t, dispInfo.GetID(), states[0].DispatcherID)
require.Equal(t, event.DSStateRemoved, states[0].State)
case <-ctx.Done():
require.Fail(t, "Expected to receive a dispatcher heartbeat response")
for {
select {
case msg := <-outputCh:
if msg.Type != messaging.TypeDispatcherHeartbeatResponse {
continue
}
// Ignore earlier ready/handshake traffic and verify the heartbeat response itself.
require.Len(t, msg.Message, 1)
response := msg.Message[0].(*event.DispatcherHeartbeatResponse)
require.NotNil(t, response)
states := response.DispatcherStates
require.Len(t, states, 1)
require.Equal(t, dispInfo.GetID(), states[0].DispatcherID)
require.Equal(t, event.DSStateRemoved, states[0].State)
return
case <-ctx.Done():
require.Fail(t, "Expected to receive a dispatcher heartbeat response")
return
}
}
}

Expand Down Expand Up @@ -939,7 +1015,7 @@ func TestSendHandshakeUsesStartTs(t *testing.T) {

func TestAddDispatcherFailure(t *testing.T) {
broker, _, ss, _ := newEventBrokerForTest()
defer broker.close()
broker.close()

// Simulate schema store failure
ss.registerTableError = errors.New("mock error")
Expand All @@ -950,4 +1026,8 @@ func TestAddDispatcherFailure(t *testing.T) {

_, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID())
require.False(t, ok, "changefeedStatus should be removed after failed registration")
require.Nil(t, broker.getDispatcher(dispInfo.GetID()))
for _, ch := range broker.messageCh {
require.Empty(t, collectWrapEventTypes(ch))
}
}
Loading