Skip to content

Commit a15c5e0

Browse files
authored
sqlreplay: fix replay quits immediately after graceful pause (#995)
1 parent 704568c commit a15c5e0

3 files changed

Lines changed: 48 additions & 47 deletions

File tree

pkg/sqlreplay/manager/manager.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
164164
jm.lg.Warn("start replay failed", zap.String("job", newJob.String()), zap.Error(err))
165165
return errors.Wrapf(err, "start replay failed")
166166
}
167-
jm.lg.Info("start replay", zap.String("job", newJob.String()))
168167
jm.addToHistory(newJob)
169168
return nil
170169
}

pkg/sqlreplay/replay/replay.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type ReplayConfig struct {
8686
Format string
8787
Input string
8888
Username string
89-
Password string
89+
Password string `json:"-"`
9090
KeyFile string
9191
// It's specified when executing with the statement `TRAFFIC REPLAY` so that all TiProxy instances
9292
// use the same start time and the time acts as the job ID.
@@ -303,11 +303,13 @@ func (r *replay) Start(cfg ReplayConfig, backendTLSConfig *tls.Config, hsHandler
303303
return err
304304
}
305305

306+
r.lg.Info("start replay", zap.Any("config", cfg))
306307
r.Lock()
307308
defer r.Unlock()
308309
r.cfg = cfg
309310
r.storages = storages
310311
r.meta = *r.readMeta()
312+
r.gracefulStop.Store(false)
311313
r.startTime = cfg.StartTime
312314
r.endTime = time.Time{}
313315
r.progress = 0
@@ -449,6 +451,7 @@ func (r *replay) readCommands(ctx context.Context) {
449451
err = nil
450452
continue
451453
} else {
454+
r.lg.Info("decode failed, stop", zap.Error(err))
452455
break
453456
}
454457
}
@@ -509,7 +512,9 @@ func (r *replay) readCommands(ctx context.Context) {
509512
zap.Duration("extra_wait_time", extraWaitTime),
510513
zap.Int("alive_conns", connCount),
511514
zap.Time("last_cmd_start_ts", time.Unix(0, r.replayStats.CurCmdTs.Load())),
512-
zap.Time("last_cmd_end_ts", time.Unix(0, r.replayStats.CurCmdEndTs.Load())))
515+
zap.Time("last_cmd_end_ts", time.Unix(0, r.replayStats.CurCmdEndTs.Load())),
516+
zap.NamedError("ctx_err", ctx.Err()),
517+
zap.Bool("graceful_stop", r.gracefulStop.Load()))
513518

514519
// Notify the connections that the commands are finished.
515520
for _, conn := range conns {
@@ -928,15 +933,9 @@ func (r *replay) stop(err error) {
928933
}
929934
commonFields := r.commonFields()
930935
fields := append(commonFields, []zap.Field{
931-
zap.Time("start_time", r.startTime),
932-
zap.Time("end_time", r.endTime),
933-
zap.Time("command_start_time", r.cfg.CommandStartTime),
934-
zap.Time("command_end_time", r.cfg.CommandEndTime),
935-
zap.String("format", r.cfg.Format),
936-
zap.String("username", r.cfg.Username),
937-
zap.Bool("ignore_errs", r.cfg.IgnoreErrs),
938-
zap.Float64("speed", r.cfg.Speed),
939-
zap.Bool("read_only", r.cfg.ReadOnly),
936+
zap.Time("replay_start_time", r.startTime),
937+
zap.Time("replay_end_time", r.endTime),
938+
zap.Any("config", r.cfg),
940939
}...)
941940
if r.meta.Cmds > 0 {
942941
r.progress = float64(decodedCmds) / float64(r.meta.Cmds)

pkg/sqlreplay/replay/replay_test.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -431,43 +431,46 @@ func TestGracefulStop(t *testing.T) {
431431
replay := NewReplay(zap.NewNop(), id.NewIDManager())
432432
defer replay.Close()
433433

434-
i := 0
435-
loader := &customizedReader{
436-
getCmd: func() *cmd.Command {
437-
j := rand.Uint64N(100) + 1
438-
command := newMockCommand(j)
439-
i++
440-
command.StartTs = time.Unix(0, int64(i)*int64(time.Microsecond))
441-
return command
442-
},
443-
}
434+
// Run 2 rounds to verify that the graceful stop flag is reset before each round.
435+
for n := 0; n < 2; n++ {
436+
i := 0
437+
loader := &customizedReader{
438+
getCmd: func() *cmd.Command {
439+
j := rand.Uint64N(100) + 1
440+
command := newMockCommand(j)
441+
i++
442+
command.StartTs = time.Unix(0, int64(i)*int64(time.Microsecond))
443+
return command
444+
},
445+
}
444446

445-
cfg := ReplayConfig{
446-
Input: t.TempDir(),
447-
Username: "u1",
448-
StartTime: time.Now(),
449-
readers: []cmd.LineReader{loader},
450-
connCreator: func(connID uint64, _ uint64) conn.Conn {
451-
return &mockDelayConn{
452-
stats: &replay.replayStats,
453-
closeCh: replay.closeConnCh,
454-
connID: connID,
455-
}
456-
},
457-
report: newMockReport(replay.exceptionCh),
458-
PSCloseStrategy: cmd.PSCloseStrategyDirected,
459-
}
460-
require.NoError(t, replay.Start(cfg, nil, nil, &backend.BCConfig{}))
447+
cfg := ReplayConfig{
448+
Input: t.TempDir(),
449+
Username: "u1",
450+
StartTime: time.Now(),
451+
readers: []cmd.LineReader{loader},
452+
connCreator: func(connID uint64, _ uint64) conn.Conn {
453+
return &mockDelayConn{
454+
stats: &replay.replayStats,
455+
closeCh: replay.closeConnCh,
456+
connID: connID,
457+
}
458+
},
459+
report: newMockReport(replay.exceptionCh),
460+
PSCloseStrategy: cmd.PSCloseStrategyDirected,
461+
}
462+
require.NoError(t, replay.Start(cfg, nil, nil, &backend.BCConfig{}))
461463

462-
time.Sleep(2 * time.Second)
463-
replay.Stop(errors.New("graceful stop"), true)
464-
// check that all the pending commands are replayed
465-
curCmdTs := replay.replayStats.CurCmdTs.Load()
466-
require.EqualValues(t, 0, replay.replayStats.PendingCmds.Load())
467-
require.EqualValues(t, curCmdTs, int64(replay.replayStats.ReplayedCmds.Load())*int64(time.Microsecond))
468-
_, _, lastTs, _, _, err := replay.Progress()
469-
require.ErrorContains(t, err, "graceful stop")
470-
require.Equal(t, curCmdTs, lastTs.UnixNano())
464+
time.Sleep(2 * time.Second)
465+
replay.Stop(errors.New("graceful stop"), true)
466+
// check that all the pending commands are replayed
467+
curCmdTs := replay.replayStats.CurCmdTs.Load()
468+
require.EqualValues(t, 0, replay.replayStats.PendingCmds.Load())
469+
require.EqualValues(t, curCmdTs, int64(replay.replayStats.ReplayedCmds.Load())*int64(time.Microsecond))
470+
_, _, lastTs, _, _, err := replay.Progress()
471+
require.ErrorContains(t, err, "graceful stop")
472+
require.Equal(t, curCmdTs, lastTs.UnixNano())
473+
}
471474
}
472475

473476
func BenchmarkMultiBufferedDecoder(b *testing.B) {

0 commit comments

Comments
 (0)