Skip to content

Commit 8486a5d

Browse files
authored
consumer: ignore stale event after renaming table in storage consumer (#4871)
close #4334
1 parent 3c6a882 commit 8486a5d

1 file changed

Lines changed: 42 additions & 2 deletions

File tree

cmd/storage-consumer/consumer.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import (
3535
"github.com/pingcap/ticdc/pkg/sink/codec/csv"
3636
putil "github.com/pingcap/ticdc/pkg/util"
3737
"github.com/pingcap/tidb/br/pkg/storage"
38+
timodel "github.com/pingcap/tidb/pkg/meta/model"
39+
"github.com/pingcap/tidb/pkg/parser"
40+
"github.com/pingcap/tidb/pkg/parser/ast"
3841
"go.uber.org/atomic"
3942
"go.uber.org/zap"
4043
"golang.org/x/sync/errgroup"
@@ -510,6 +513,42 @@ func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.
510513
return *tableDef
511514
}
512515

516+
func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, bool) {
517+
if tableDef.Type != byte(timodel.ActionRenameTable) {
518+
return "", false
519+
}
520+
schemaName := tableDef.Schema
521+
tableName := tableDef.Table
522+
stmt, err := parser.New().ParseOneStmt(tableDef.Query, "", "")
523+
if err != nil {
524+
log.Panic("parse statement failed", zap.Any("DDL", tableDef.Query), zap.Error(err))
525+
}
526+
// The query in job maybe "RENAME TABLE table1 to table2"
527+
renameStmt, ok := stmt.(*ast.RenameTableStmt)
528+
if !ok || len(renameStmt.TableToTables) == 0 {
529+
log.Panic("invalid rename table statement", zap.Any("DDL", tableDef.Query))
530+
}
531+
oldTable := renameStmt.TableToTables[0].OldTable
532+
if oldTable.Schema.O != "" {
533+
schemaName = oldTable.Schema.O
534+
}
535+
tableName = oldTable.Name.O
536+
return commonType.QuoteSchema(schemaName, tableName), true
537+
}
538+
539+
func (c *consumer) updateTableDDLWatermark(tableDef cloudstorage.TableDefinition) string {
540+
key := commonType.QuoteSchema(tableDef.Schema, tableDef.Table)
541+
if c.tableDDLWatermark[key] < tableDef.TableVersion {
542+
c.tableDDLWatermark[key] = tableDef.TableVersion
543+
}
544+
if oldTableKey, ok := getRenameTableOldTableKey(tableDef); ok {
545+
if c.tableDDLWatermark[oldTableKey] < tableDef.TableVersion {
546+
c.tableDDLWatermark[oldTableKey] = tableDef.TableVersion
547+
}
548+
}
549+
return key
550+
}
551+
513552
func (c *consumer) handleNewFiles(
514553
ctx context.Context,
515554
dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange,
@@ -582,12 +621,13 @@ func (c *consumer) handleNewFiles(
582621
if err := c.sink.WriteBlockEvent(ddlEvent); err != nil {
583622
return errors.Trace(err)
584623
}
585-
c.tableDDLWatermark[tableKey] = key.TableVersion
624+
watermarkKey := c.updateTableDDLWatermark(tableDef)
586625
// TODO: need to cleanup tableDefMap in the future.
587626
log.Info("execute ddl event successfully",
588627
zap.String("query", tableDef.Query),
589628
zap.String("schema", key.Schema), zap.String("table", key.Table),
590-
zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]))
629+
zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]),
630+
zap.String("watermarkKey", watermarkKey))
591631
continue
592632
}
593633

0 commit comments

Comments
 (0)