Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.

Commit 28a0f4d

Browse files
lance6716GMHDBJD
authored andcommitted
syncer: don't skip jobs from same event when comparing table checkpoint (#1752) (#1783)
1 parent 690b64b commit 28a0f4d

6 files changed

Lines changed: 123 additions & 26 deletions

File tree

syncer/checkpoint.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ type CheckPoint interface {
145145
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
146146
DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error
147147

148-
// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
149-
IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
148+
// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
149+
IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool
150150

151151
// SaveGlobalPoint saves the global binlog stream's checkpoint
152152
// corresponding to Meta.Save
@@ -327,20 +327,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchem
327327
return nil
328328
}
329329

330-
// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint
331-
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool {
330+
// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
331+
// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
332+
// if useLE is true, we use less than or equal.
333+
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool {
332334
cp.RLock()
333335
defer cp.RUnlock()
334336
mSchema, ok := cp.points[sourceSchema]
335337
if !ok {
336-
return true
338+
return false
337339
}
338340
point, ok := mSchema[sourceTable]
339341
if !ok {
340-
return true
342+
return false
341343
}
342344
oldPos := point.MySQLPos()
343-
return pos.Compare(oldPos) > 0
345+
cp.logCtx.L().Debug("compare table position whether is newer", zap.Stringer("position", pos), zap.Stringer("old position", oldPos))
346+
if useLE {
347+
return pos.Compare(oldPos) <= 0
348+
}
349+
return pos.Compare(oldPos) < 0
344350
}
345351

346352
// SaveGlobalPoint implements CheckPoint.SaveGlobalPoint

syncer/checkpoint_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -269,23 +269,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
269269
)
270270

271271
// not exist
272-
newer := cp.IsNewerTablePoint(schema, table, pos1)
273-
c.Assert(newer, IsTrue)
272+
older := cp.IsOlderThanTablePoint(schema, table, pos1, false)
273+
c.Assert(older, IsFalse)
274274

275275
// save
276276
cp.SaveTablePoint(schema, table, pos2)
277-
newer = cp.IsNewerTablePoint(schema, table, pos1)
278-
c.Assert(newer, IsFalse)
277+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
278+
c.Assert(older, IsTrue)
279279

280280
// rollback, to min
281281
cp.Rollback()
282-
newer = cp.IsNewerTablePoint(schema, table, pos1)
283-
c.Assert(newer, IsTrue)
282+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
283+
c.Assert(older, IsFalse)
284284

285285
// save again
286286
cp.SaveTablePoint(schema, table, pos2)
287-
newer = cp.IsNewerTablePoint(schema, table, pos1)
288-
c.Assert(newer, IsFalse)
287+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
288+
c.Assert(older, IsTrue)
289289

290290
// flush + rollback
291291
s.mock.ExpectBegin()
@@ -294,22 +294,22 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
294294
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
295295
c.Assert(err, IsNil)
296296
cp.Rollback()
297-
newer = cp.IsNewerTablePoint(schema, table, pos1)
298-
c.Assert(newer, IsFalse)
297+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
298+
c.Assert(older, IsTrue)
299299

300300
// clear, to min
301301
s.mock.ExpectBegin()
302302
s.mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1))
303303
s.mock.ExpectCommit()
304304
err = cp.Clear(tctx)
305305
c.Assert(err, IsNil)
306-
newer = cp.IsNewerTablePoint(schema, table, pos1)
307-
c.Assert(newer, IsTrue)
306+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
307+
c.Assert(older, IsFalse)
308308

309309
// save
310310
cp.SaveTablePoint(schema, table, pos2)
311-
newer = cp.IsNewerTablePoint(schema, table, pos1)
312-
c.Assert(newer, IsFalse)
311+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
312+
c.Assert(older, IsTrue)
313313

314314
// test save table point less than global point
315315
func() {
@@ -330,6 +330,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
330330
err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
331331
c.Assert(err, IsNil)
332332
cp.Rollback()
333-
newer = cp.IsNewerTablePoint(schema, table, pos1)
334-
c.Assert(newer, IsTrue)
333+
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
334+
c.Assert(older, IsFalse)
335335
}

syncer/syncer.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package syncer
1515

1616
import (
1717
"context"
18+
"errors"
1819
"fmt"
1920
"math"
2021
"os"
@@ -751,7 +752,34 @@ func (s *Syncer) checkWait(job *job) bool {
751752
return false
752753
}
753754

755+
// only used in tests.
756+
var (
757+
lastPos mysql.Position
758+
lastPosNum int
759+
waitJobsDone bool
760+
failExecuteSQL bool
761+
failOnce sync2.AtomicInt64
762+
)
763+
754764
func (s *Syncer) addJob(job *job) error {
765+
failpoint.Inject("countJobFromOneEvent", func() {
766+
if job.currentPos.Compare(lastPos) == 0 {
767+
lastPosNum++
768+
} else {
769+
lastPos = job.currentPos
770+
lastPosNum = 1
771+
}
772+
// trigger a flush after see one job
773+
if lastPosNum == 1 {
774+
waitJobsDone = true
775+
s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
776+
}
777+
// mock a execution error after see two jobs.
778+
if lastPosNum == 2 {
779+
failExecuteSQL = true
780+
s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
781+
}
782+
})
755783
var (
756784
queueBucket int
757785
execDDLReq *pb.ExecDDLRequest
@@ -802,6 +830,13 @@ func (s *Syncer) addJob(job *job) error {
802830
}
803831

804832
wait := s.checkWait(job)
833+
failpoint.Inject("flushFirstJobOfEvent", func() {
834+
if waitJobsDone {
835+
s.tctx.L().Info("trigger flushFirstJobOfEvent")
836+
waitJobsDone = false
837+
wait = true
838+
}
839+
})
805840
if wait {
806841
s.jobWg.Wait()
807842
s.c.reset()
@@ -1001,6 +1036,13 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
10011036
if len(jobs) == 0 {
10021037
return nil
10031038
}
1039+
1040+
failpoint.Inject("failSecondJobOfEvent", func() {
1041+
if failExecuteSQL && failOnce.CompareAndSwap(0, 1) {
1042+
s.tctx.L().Info("trigger failSecondJobOfEvent")
1043+
failpoint.Return(errors.New("failSecondJobOfEvent"))
1044+
}
1045+
})
10041046
queries := make([]string, 0, len(jobs))
10051047
args := make([][]interface{}, 0, len(jobs))
10061048
for _, j := range jobs {
@@ -1470,7 +1512,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
14701512
}
14711513

14721514
// DML position before table checkpoint, ignore it
1473-
if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentPos) {
1515+
if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentPos, false) {
14741516
s.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("position", ec.currentPos), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
14751517
return nil
14761518
}
@@ -1700,7 +1742,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
17001742

17011743
// for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine
17021744
// ignore obsolete DDL here can avoid to try-sync again for already synced DDLs
1703-
if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos) {
1745+
if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos, true) {
17041746
s.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("position", ec.currentPos))
17051747
continue
17061748
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
use all_mode;
2+
insert into t1 (id, name) values (10, '10'), (20, '20');
3+
insert into t1 (id, name) values (30, '30');
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
use all_mode;
2+
insert into t2 (id, name) values (10, '10'), (20, '20');
3+
insert into t2 (id, name) values (30, '30');

tests/all_mode/run.sh

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,54 @@ function test_session_config(){
5252
echo "[$(date)] <<<<<< finish test_session_config >>>>>>"
5353
}
5454

55-
function run() {
55+
function test_fail_job_between_event() {
56+
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
57+
check_contains 'Query OK, 2 rows affected'
58+
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
59+
check_contains 'Query OK, 3 rows affected'
60+
61+
# start DM worker and master
62+
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
63+
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
64+
65+
inject_points=(
66+
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
67+
"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
68+
"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
69+
"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
70+
)
71+
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
72+
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
73+
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
74+
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
75+
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
76+
77+
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
78+
sed -i 's/sql_mode: ".*"/sql_mode: "NO_AUTO_VALUE_ON_ZERO"/g' $WORK_DIR/dm-task.yaml
79+
dmctl_start_task "$WORK_DIR/dm-task.yaml"
80+
81+
run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
82+
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
83+
sleep 2
84+
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "failSecondJobOfEvent"
85+
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "failSecondJobOfEvent"
86+
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
87+
"query-status test" \
88+
"\"result\": true" 3
89+
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
90+
91+
cleanup_data all_mode
92+
cleanup_process $*
93+
94+
export GO_FAILPOINTS=''
95+
}
5696

97+
function run() {
5798
run_sql "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" $MYSQL_PORT1 $MYSQL_PASSWORD1
5899
run_sql "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" $MYSQL_PORT2 $MYSQL_PASSWORD2
59100

101+
test_fail_job_between_event
102+
60103
test_session_config
61104

62105
export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"

0 commit comments

Comments
 (0)