Skip to content

Commit 831b47c

Browse files
authored
replay: add lock to avoid starting replay task multiple times (#992)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent 9d570e0 commit 831b47c

5 files changed

Lines changed: 65 additions & 3 deletions

File tree

cmd/replayer/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,14 @@ func main() {
5858
replayerIndex := rootCmd.PersistentFlags().Uint64("replayer-index", 0, "the index of this replayer instance. Used only when dynamic-input is enabled.")
5959
outputPath := rootCmd.PersistentFlags().String("output-path", "", "the file path to store replayed sql. Empty indicates do not output replayed sql.")
6060
serviceMode := rootCmd.PersistentFlags().Bool("service-mode", false, "run replayer in service mode")
61+
logLevel := rootCmd.PersistentFlags().String("log-level", "info", "the log level: debug, info, warn, error, dpanic, panic, fatal")
6162

6263
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
6364
// set up general managers
6465
cfg := &config.Config{
6566
Log: config.Log{
6667
LogOnline: config.LogOnline{
67-
Level: "info",
68+
Level: *logLevel,
6869
LogFile: config.LogFile{Filename: *logFile},
6970
},
7071
},

pkg/sqlreplay/manager/manager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package manager
66
import (
77
"crypto/tls"
88
"encoding/json"
9+
"sync"
910
"time"
1011

1112
"github.com/pingcap/tiproxy/lib/config"
@@ -46,6 +47,8 @@ type JobManager interface {
4647
var _ JobManager = (*jobManager)(nil)
4748

4849
type jobManager struct {
50+
mu sync.Mutex
51+
4952
jobHistory []Job
5053
capture capture.Capture
5154
replay replay.Replay
@@ -100,6 +103,9 @@ func (jm *jobManager) runningJob() Job {
100103
}
101104

102105
func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
106+
jm.mu.Lock()
107+
defer jm.mu.Unlock()
108+
103109
running := jm.runningJob()
104110
if running != nil {
105111
return errors.Errorf("a job is running: %s", running.String())
@@ -121,6 +127,9 @@ func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
121127
}
122128

123129
func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
130+
jm.mu.Lock()
131+
defer jm.mu.Unlock()
132+
124133
running := jm.runningJob()
125134
if running != nil {
126135
return errors.Errorf("a job is running: %s", running.String())
@@ -174,6 +183,9 @@ func (jm *jobManager) GetCapture() capture.Capture {
174183
}
175184

176185
func (jm *jobManager) Jobs() string {
186+
jm.mu.Lock()
187+
defer jm.mu.Unlock()
188+
177189
jm.updateProgress()
178190
b, err := json.MarshalIndent(jm.jobHistory, "", " ")
179191
if err != nil {
@@ -182,6 +194,8 @@ func (jm *jobManager) Jobs() string {
182194
return hack.String(b)
183195
}
184196

197+
// Wait waits for the running job to finish.
198+
// Because `Wait` is a blocking call, it does not acquire the jobManager lock. For now it's only used in standalone player mode.
185199
func (jm *jobManager) Wait() {
186200
job := jm.runningJob()
187201
if job == nil {
@@ -196,6 +210,9 @@ func (jm *jobManager) Wait() {
196210
}
197211

198212
func (jm *jobManager) Stop(cfg CancelConfig) string {
213+
jm.mu.Lock()
214+
defer jm.mu.Unlock()
215+
199216
job := jm.runningJob()
200217
if job == nil {
201218
return "no job running"
@@ -214,6 +231,9 @@ func (jm *jobManager) Stop(cfg CancelConfig) string {
214231
}
215232

216233
func (jm *jobManager) Close() {
234+
jm.mu.Lock()
235+
defer jm.mu.Unlock()
236+
217237
if jm.capture != nil {
218238
jm.capture.Close()
219239
}

pkg/sqlreplay/manager/manager_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
package manager
55

66
import (
7+
"sync"
78
"testing"
89
"time"
910

1011
"github.com/pingcap/tiproxy/lib/config"
1112
"github.com/pingcap/tiproxy/lib/util/errors"
1213
"github.com/pingcap/tiproxy/pkg/manager/id"
1314
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
15+
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
1416
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
1517
"github.com/stretchr/testify/require"
1618
"go.uber.org/zap"
@@ -198,3 +200,35 @@ func TestAllowAddrOnlyForStandaloneService(t *testing.T) {
198200
})
199201
require.NoError(t, err)
200202
}
203+
204+
func TestAvoidConcurrentReplay(t *testing.T) {
205+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil, true)
206+
defer mgr.Close()
207+
mgr.replay = &mockReplay{}
208+
209+
successCount := 0
210+
attempts := 1000
211+
212+
wg := &sync.WaitGroup{}
213+
for i := 0; i < attempts; i++ {
214+
wg.Add(1)
215+
go func() {
216+
defer wg.Done()
217+
218+
err := mgr.StartReplay(replay.ReplayConfig{
219+
Addr: "127.0.0.1:10000",
220+
Input: "/tmp/traffic",
221+
Username: "root",
222+
StartTime: time.Now(),
223+
PSCloseStrategy: cmd.PSCloseStrategyAlways,
224+
})
225+
if err == nil {
226+
successCount++
227+
} else {
228+
require.ErrorContains(t, err, "a job is running")
229+
}
230+
}()
231+
}
232+
wg.Wait()
233+
require.Equal(t, 1, successCount)
234+
}

pkg/sqlreplay/replay/replay.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,14 @@ func (cfg *ReplayConfig) LoadFromCheckpoint() error {
238238
decoder := json.NewDecoder(file)
239239
var state replayCheckpoint
240240
if err := decoder.Decode(&state); err != nil {
241-
return errors.Wrapf(err, "failed to decode checkpoint file %s", cfg.CheckPointFilePath)
241+
// Allow empty file.
242+
stat, err2 := file.Stat()
243+
if err2 != nil {
244+
return errors.Wrapf(err, "failed to stat checkpoint file %s", cfg.CheckPointFilePath)
245+
}
246+
if stat.Size() != 0 {
247+
return errors.Wrapf(err, "failed to decode checkpoint file %s", cfg.CheckPointFilePath)
248+
}
242249
}
243250

244251
if state.CurCmdTs > 0 {

pkg/sqlreplay/replay/replay_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ func TestLoadFromCheckpoint(t *testing.T) {
570570
{
571571
checkpointData: "",
572572
setupFile: true,
573-
expectedError: true,
573+
expectedError: false,
574574
},
575575
}
576576

0 commit comments

Comments
 (0)