@@ -12,49 +12,81 @@ import (
1212 "github.com/github/gh-ost/go/binlog"
1313 "github.com/github/gh-ost/go/mysql"
1414 "github.com/github/gh-ost/go/sql"
15- "github.com/stretchr/testify/require "
15+ "github.com/stretchr/testify/suite "
1616 "github.com/testcontainers/testcontainers-go"
1717 "github.com/testcontainers/testcontainers-go/wait"
1818)
1919
20- func TestCoordinator (t * testing.T ) {
20+ type CoordinatorTestSuite struct {
21+ suite.Suite
22+
23+ mysqlContainer testcontainers.Container
24+ db * gosql.DB
25+ }
26+
27+ func (suite * CoordinatorTestSuite ) SetupSuite () {
2128 ctx := context .Background ()
2229 req := testcontainers.ContainerRequest {
23- Image : "mysql:8.0" ,
24- Env : map [string ]string {"MYSQL_ROOT_PASSWORD" : "root" },
25- WaitingFor : wait .ForLog ( "port: 3306 MySQL Community Server - GPL " ),
26- ExposedPorts : []string {"3306" },
30+ Image : "mysql:8.0.40 " ,
31+ Env : map [string ]string {"MYSQL_ROOT_PASSWORD" : "root-password " },
32+ WaitingFor : wait .ForListeningPort ( " 3306/tcp " ),
33+ ExposedPorts : []string {"3306/tcp " },
2734 }
2835
2936 mysqlContainer , err := testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
3037 ContainerRequest : req ,
3138 Started : true ,
3239 })
33- require .NoError (t , err )
34- t .Cleanup (func () {
35- ctx := context .Background ()
36- require .NoError (t , mysqlContainer .Terminate (ctx ))
37- })
40+ suite .Require ().NoError (err )
3841
39- host , err := mysqlContainer .Host (ctx )
40- require .NoError (t , err )
42+ suite .mysqlContainer = mysqlContainer
4143
42- mappedPort , err := mysqlContainer . MappedPort (ctx , "3306" )
43- require . NoError (t , err )
44+ dsn , err := GetDSN (ctx , mysqlContainer )
45+ suite . Require (). NoError (err )
4446
45- db , err := gosql .Open ("mysql" , "root:root@tcp(" + host + ":" + mappedPort . Port () + ")/" )
46- require . NoError (t , err )
47+ db , err := gosql .Open ("mysql" , dsn )
48+ suite . Require (). NoError (err )
4749
48- t .Cleanup (func () {
49- require .NoError (t , db .Close ())
50- })
50+ suite .db = db
51+ }
52+
53+ func (suite * CoordinatorTestSuite ) SetupTest () {
54+ ctx := context .Background ()
55+ _ , err := suite .db .ExecContext (ctx , "RESET MASTER" )
56+ suite .Require ().NoError (err )
57+
58+ _ , err = suite .db .ExecContext (ctx , "SET @@GLOBAL.binlog_transaction_dependency_tracking = WRITESET" )
59+ suite .Require ().NoError (err )
60+
61+ _ , err = suite .db .ExecContext (ctx , "CREATE DATABASE test" )
62+ suite .Require ().NoError (err )
63+ }
64+
65+ func (suite * CoordinatorTestSuite ) TearDownTest () {
66+ ctx := context .Background ()
67+ _ , err := suite .db .ExecContext (ctx , "DROP DATABASE test" )
68+ suite .Require ().NoError (err )
69+ }
70+
71+ func (suite * CoordinatorTestSuite ) TeardownSuite () {
72+ ctx := context .Background ()
73+
74+ suite .Assert ().NoError (suite .db .Close ())
75+ suite .Assert ().NoError (suite .mysqlContainer .Terminate (ctx ))
76+ }
77+
78+ func (suite * CoordinatorTestSuite ) TestApplyDML () {
79+ ctx := context .Background ()
80+
81+ connectionConfig , err := GetConnectionConfig (ctx , suite .mysqlContainer )
5182
5283 _ = os .Remove ("/tmp/gh-ost.sock" )
5384
54- //prepareDatabase(t, db)
85+ _ , err = suite .db .Exec ("CREATE TABLE test.gh_ost_test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255)) ENGINE=InnoDB" )
86+ suite .Require ().NoError (err )
5587
56- _ , err = db .Exec ("CREATE TABLE test._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))" )
57- require . NoError (t , err )
88+ _ , err = suite . db .Exec ("CREATE TABLE test._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))" )
89+ suite . Require (). NoError (err )
5890
5991 migrationContext := base .NewMigrationContext ()
6092 migrationContext .DatabaseName = "test"
@@ -67,23 +99,8 @@ func TestCoordinator(t *testing.T) {
6799 migrationContext .ThrottleHTTPIntervalMillis = 100
68100 migrationContext .DMLBatchSize = 10
69101
70- migrationContext .ApplierConnectionConfig = & mysql.ConnectionConfig {
71- Key : mysql.InstanceKey {
72- Hostname : host ,
73- Port : mappedPort .Int (),
74- },
75- User : "root" ,
76- Password : "root" ,
77- }
78-
79- migrationContext .InspectorConnectionConfig = & mysql.ConnectionConfig {
80- Key : mysql.InstanceKey {
81- Hostname : host ,
82- Port : mappedPort .Int (),
83- },
84- User : "root" ,
85- Password : "root" ,
86- }
102+ migrationContext .ApplierConnectionConfig = connectionConfig
103+ migrationContext .InspectorConnectionConfig = connectionConfig
87104
88105 migrationContext .OriginalTableColumns = sql .NewColumnList ([]string {"id" , "name" })
89106 migrationContext .GhostTableColumns = sql .NewColumnList ([]string {"id" , "name" })
@@ -96,41 +113,41 @@ func TestCoordinator(t *testing.T) {
96113 }
97114
98115 migrationContext .SetConnectionConfig ("innodb" )
116+ migrationContext .SkipPortValidation = true
99117 migrationContext .NumWorkers = 4
100- // HACK: so
101- migrationContext .AzureMySQL = true
102118
103119 applier := NewApplier (migrationContext )
104120 err = applier .InitDBConnections (migrationContext .NumWorkers )
105- require . NoError (t , err )
121+ suite . Require (). NoError (err )
106122
107123 err = applier .prepareQueries ()
108- require . NoError (t , err )
124+ suite . Require (). NoError (err )
109125
110126 err = applier .CreateChangelogTable ()
111- require . NoError (t , err )
127+ suite . Require (). NoError (err )
112128
129+ // TODO: use errgroup
113130 for i := 0 ; i < 100 ; i ++ {
114- tx , err := db .Begin ()
115- require . NoError (t , err )
131+ tx , err := suite . db .Begin ()
132+ suite . Require (). NoError (err )
116133
117134 for j := 0 ; j < 100 ; j ++ {
118135 _ , err = tx .Exec ("INSERT INTO test.gh_ost_test (name) VALUES ('test')" )
119- require . NoError (t , err )
136+ suite . Require (). NoError (err )
120137 }
121138
122139 err = tx .Commit ()
123- require . NoError (t , err )
140+ suite . Require (). NoError (err )
124141 }
125142
126- _ , err = db .Exec ("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1" )
127- require . NoError (t , err )
143+ _ , err = suite . db .Exec ("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1" )
144+ suite . Require (). NoError (err )
128145
129- _ , err = db .Exec ("INSERT INTO test.gh_ost_test (name) VALUES ('test')" )
130- require . NoError (t , err )
146+ _ , err = suite . db .Exec ("INSERT INTO test.gh_ost_test (name) VALUES ('test')" )
147+ suite . Require (). NoError (err )
131148
132149 _ , err = applier .WriteChangelogState ("completed" )
133- require . NoError (t , err )
150+ suite . Require (). NoError (err )
134151
135152 ctx , cancel := context .WithCancel (context .Background ())
136153
@@ -148,15 +165,15 @@ func TestCoordinator(t *testing.T) {
148165 LogFile : "binlog.000001" ,
149166 LogPos : int64 (4 ),
150167 }
151- coord .InitializeWorkers (8 )
168+ coord .InitializeWorkers (4 )
152169
153170 streamCtx , cancelStreaming := context .WithCancel (context .Background ())
154171 canStopStreaming := func () bool {
155172 return streamCtx .Err () != nil
156173 }
157174 go func () {
158175 err = coord .StartStreaming (streamCtx , canStopStreaming )
159- require . Equal (t , context .Canceled , err )
176+ suite . Require (). Equal (context .Canceled , err )
160177 }()
161178
162179 // Give streamer some time to start
@@ -171,8 +188,12 @@ func TestCoordinator(t *testing.T) {
171188 }
172189
173190 err = coord .ProcessEventsUntilDrained ()
174- require . NoError (t , err )
191+ suite . Require (). NoError (err )
175192 }
176193
177194 fmt .Printf ("Time taken: %s\n " , time .Since (startAt ))
178195}
196+
197+ func TestCoordinator (t * testing.T ) {
198+ suite .Run (t , new (CoordinatorTestSuite ))
199+ }
0 commit comments