@@ -352,21 +352,17 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
352352 ID : stat .info .GetTableSpan ().KeyspaceID ,
353353 Name : stat .info .GetChangefeedID ().Keyspace (),
354354 }
355-
356- ddlEvents , endTs , err := c .schemaStore .FetchTableTriggerDDLEvents (keyspaceMeta , dispatcherID , stat .filter , startTs , 100 )
355+ ddlEvents , endTs , err := c .schemaStore .FetchTableTriggerDDLEvents (keyspaceMeta , key .(common.DispatcherID ), stat .filter , startTs , 100 )
357356 if err != nil {
358- log .Error ("table trigger ddl events fetch failed" , zap .Uint32 ("keyspaceID" , stat .info .GetTableSpan ().KeyspaceID ),
359- zap .Stringer ("dispatcherID" , stat .id ), zap .Uint64 ("startTs" , startTs ), zap .Error (err ))
357+ log .Error ("table trigger ddl events fetch failed" , zap .Uint32 ("keyspaceID" , stat .info .GetTableSpan ().KeyspaceID ), zap .Stringer ("dispatcherID" , stat .id ), zap .Error (err ))
360358 return true
361359 }
362360 // Keep the raw resolved-ts from schema store for scan readiness/lag visibility.
363361 stat .receivedResolvedTs .Store (endTs )
364-
365362 for _ , e := range ddlEvents {
366363 ep := & e
367364 c .sendDDL (ctx , remoteID , ep , stat )
368365 }
369-
370366 if endTs > startTs {
371367 // After all the events are sent, we send the watermark to the dispatcher.
372368 c .sendResolvedTs (stat , endTs )
0 commit comments