Skip to content

Commit 7168f79

Browse files
committed
support TiDB bidirectional input
1 parent 989fa54 commit 7168f79

14 files changed

Lines changed: 943 additions & 346 deletions

File tree

Makefile

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ run-dev:
4949

5050
build:
5151
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity cmd/gravity/main.go
52-
#$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/padder cmd/padder/main.go
53-
5452

5553
build-linux:
5654
GOARCH=amd64 GOOS=linux $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity-linux-amd64 cmd/gravity/main.go
@@ -74,14 +72,21 @@ proto:
7472
@ which protoc >/dev/null || brew install protobuf
7573
@ which protoc-gen-gofast >/dev/null || go get github.com/gogo/protobuf/protoc-gen-gofast
7674

77-
protoc -I=protocol/msgpb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=\
75+
protoc -I=protocol/msgpb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=.\
7876
plugins=grpc,\
7977
Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,\
8078
Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,\
8179
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
8280
Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg/protocol/msgpb \
8381
protocol/msgpb/message.proto
8482

83+
protoc -I=protocol/tidb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=.\
84+
Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,\
85+
Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,\
86+
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
87+
Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg/protocol/tidb \
88+
protocol/tidb/tidb.proto
89+
8590

8691
mock:
8792
mockgen -destination ./mock/binlog_checker/mock.go github.com/moiot/gravity/pkg/inputs/helper/binlog_checker BinlogChecker

docs/2.0/example-tidb2tidb.toml

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
name = "tidb2tidbDemo"
2+
version = "1.0"
3+
4+
[input]
5+
type = "tidbkafka"
6+
mode = "stream"
7+
8+
[input.config]
9+
ignore-bidirectional-data = true
10+
11+
[input.config.position-repo]
12+
type = "mysql-repo"
13+
[input.config.position-repo.config.source]
14+
host = "127.0.0.1"
15+
username = "root"
16+
password = ""
17+
port = 4000
18+
19+
[input.config.source-db]
20+
host = "127.0.0.1"
21+
username = "root"
22+
password = ""
23+
port = 4000
24+
25+
[input.config.source-kafka]
26+
topics = ["obinlog"]
27+
consume-from = "oldest"
28+
group-id = "tidb2tidbDemo"
29+
[input.config.source-kafka.brokers]
30+
broker-addrs = ["localhost:9092", "localhost:9093", "localhost:9094"]
31+
32+
[output]
33+
type = "mysql"
34+
35+
[output.config]
36+
enable-ddl = true
37+
38+
[output.config.target]
39+
host = "127.0.0.1"
40+
username = "root"
41+
password = ""
42+
port = 4000
43+
max-open = 30 # optional, max connections
44+
max-idle = 30 # optional, suggest to be the same as max-open
45+
46+
# The definition of the routing rule
47+
[[output.config.routes]]
48+
match-schema = "test"
49+
match-table = "t"
50+
target-table = "t2"
51+
52+
[scheduler]
53+
type = "batch-table-scheduler"
54+
[scheduler.config]
55+
nr-worker = 30
56+
batch-size = 1000
57+
queue-size = 1024
58+
sliding-window-size = 16384

pkg/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type SourceTiDBConfig struct {
121121
// OffsetStoreConfig *SourceProbeCfg `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"`
122122
PositionRepo *GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
123123
IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
124+
FailOnTxnTags []string `mapstructure:"fail-on-txn-tags" toml:"fail-on-txn-tags" json:"fail-on-txn-tags"`
124125
}
125126

126127
type GtmConfig struct {

pkg/core/msg.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func IsInternalTrafficV2(db string, tbl string) bool {
225225
}
226226

227227
func MatchTxnTagPipelineName(patterns []string, msg *Msg) (string, bool) {
228-
if IsInternalTrafficV2(msg.Database, msg.Table) {
228+
if len(patterns) > 0 && IsInternalTrafficV2(msg.Database, msg.Table) {
229229
pipelineName := msg.DmlMsg.Data["pipeline_name"].(string)
230230
for _, pattern := range patterns {
231231
if utils.Glob(pattern, pipelineName) {

pkg/inputs/plugins.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import (
77
"github.com/moiot/gravity/pkg/inputs/mysql"
88
_ "github.com/moiot/gravity/pkg/inputs/mysqlbatch"
99
_ "github.com/moiot/gravity/pkg/inputs/mysqlstream"
10+
"github.com/moiot/gravity/pkg/inputs/tidb_kafka"
1011
_ "github.com/moiot/gravity/pkg/inputs/tidb_kafka"
1112
)
1213

1314
const (
1415
Mongo = mongo.Name
1516
Mysql = mysql.Name
17+
TiDB = tidb_kafka.TiDBKafka
1618
)

0 commit comments

Comments
 (0)