Skip to content

Commit caae357

Browse files
authored
[feat][evaluation] experiment retry mode (#434)
* feat(evaluation): retry extension * feat(evaluation): retry gen * feat(evaluation): retry extension * fix(evaluation): event mode * fix(evaluation): retryall * fix(evaluation): retry * fix(evaluation): custom raw err * fix(evaluation): errmsg * fix(evaluation): existed turn result * fix(evaluation): scanitems * fix(evaluation): scanitems * fix(evaluation): scancond * fix(evaluation): gorm sql * feat(evaluation): itemretry * fix(evaluation): ut * fix(evaluation): ut * fix(evaluation): experiment template itemretrynum * fix(evaluation): runid * fix(evaluation): appenditemids panic * fix(evaluation): unlock * fix(evaluation): completing lock * fix(evaluation): retryitems epxtend * fix(evaluation): evaluator reenter * fix(evaluation): expt event expiration * fix(evaluation): mockgen * fix(evaluation): ut * fix(evaluation): IgnoreExistedResult * fix(evaluation): itemretrynum * fix(evaluation): ListEvaluationSetItems return * fix(evaluation): ut
1 parent 34b145e commit caae357

53 files changed

Lines changed: 5130 additions & 852 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAx
473473
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
474474
github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM=
475475
github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA=
476+
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
476477
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
477478
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
478479
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

backend/infra/lock/lock.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ type ILocker interface {
3030
// 后退出续期。调用方做写操作前应检查 ctx.Done 以确认仍持有锁,发生错误时应调用 cancel 以主动释放锁。
3131
LockBackoffWithRenew(parent context.Context, key string, ttl time.Duration, maxHold time.Duration) (locked bool, ctx context.Context, cancel func(), err error)
3232
LockWithRenew(parent context.Context, key string, ttl time.Duration, maxHold time.Duration) (locked bool, ctx context.Context, cancel func(), err error)
33+
34+
BackoffLockWithValue(ctx context.Context, key, val string, expiresIn time.Duration, backoff time.Duration) (bool, string, error)
35+
UnlockWithValue(ctx context.Context, key, val string) (bool, error)
36+
// UnlockForce deletes the key without comparing its value.
37+
UnlockForce(ctx context.Context, key string) (bool, error)
38+
// Exists returns true if the key exists.
39+
Exists(ctx context.Context, key string) (bool, error)
3340
}
3441

3542
func NewRedisLocker(c redis.Cmdable) ILocker {
@@ -143,6 +150,35 @@ func (r *redisLocker) Unlock(key string) (bool, error) {
143150
return rt == 1, nil
144151
}
145152

153+
func (r *redisLocker) UnlockWithValue(ctx context.Context, key, val string) (bool, error) {
154+
const unlockWithValueScript = `if redis.call('GET', KEYS[1]) == ARGV[1] then redis.call('DEL', KEYS[1]); return 1; end; return 0;`
155+
result, err := r.c.Eval(ctx, unlockWithValueScript, []string{key}, val).Result()
156+
if err != nil {
157+
return false, errors.WithMessage(err, "unlock with lua script")
158+
}
159+
rt, ok := result.(int64)
160+
if !ok {
161+
return false, errors.Errorf("unknown result type %T", result)
162+
}
163+
return rt == 1, nil
164+
}
165+
166+
func (r *redisLocker) UnlockForce(ctx context.Context, key string) (bool, error) {
167+
n, err := r.c.Del(ctx, key).Result()
168+
if err != nil {
169+
return false, errors.WithMessage(err, "unlock force del")
170+
}
171+
return n > 0, nil
172+
}
173+
174+
func (r *redisLocker) Exists(ctx context.Context, key string) (bool, error) {
175+
n, err := r.c.Exists(ctx, key).Result()
176+
if err != nil {
177+
return false, errors.WithMessage(err, "exists")
178+
}
179+
return n > 0, nil
180+
}
181+
146182
func (r *redisLocker) renewLock(ctx context.Context, key string, ttl time.Duration, maxHold time.Duration) {
147183
t1 := time.After(maxHold)
148184
t2 := time.NewTicker(gvalue.Max(time.Second, ttl>>2))
@@ -203,3 +239,59 @@ func (r *redisLocker) ExpireLockIn(key string, expiresIn time.Duration) (bool, e
203239
}
204240
return rt == 1, nil
205241
}
242+
243+
const setNXWithGetScript = `
244+
local ok = redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2])
245+
if ok then
246+
return {1, ARGV[1]}
247+
else
248+
local cur = redis.call('GET', KEYS[1])
249+
return {0, cur or ''}
250+
end
251+
`
252+
253+
func (r *redisLocker) BackoffLockWithValue(ctx context.Context, key, val string, expiresIn time.Duration, maxWait time.Duration) (bool, string, error) {
254+
if expiresIn < time.Second {
255+
return false, "", fmt.Errorf("lock ttl too short")
256+
}
257+
258+
var ok bool
259+
var lastHolder string
260+
bf := backoff.NewExponentialBackOff()
261+
bf.InitialInterval = 50 * time.Millisecond
262+
bf.MaxInterval = 300 * time.Millisecond
263+
bf.MaxElapsedTime = maxWait
264+
265+
errNotLocked := errors.New("lock hold by others")
266+
err := backoff.Retry(func() error {
267+
result, err := r.c.Eval(ctx, setNXWithGetScript, []string{key}, val, int64(expiresIn/time.Millisecond)).Result()
268+
if err != nil {
269+
return errors.WithMessage(err, fmt.Sprintf("redis setnx with get fail, key: %v", key))
270+
}
271+
sl, okType := result.([]interface{})
272+
if !okType || len(sl) != 2 {
273+
return errors.Errorf("unexpected script result type %T or length", result)
274+
}
275+
locked, _ := sl[0].(int64)
276+
if locked == 1 {
277+
ok = true
278+
return nil
279+
}
280+
switch v := sl[1].(type) {
281+
case string:
282+
lastHolder = v
283+
case []byte:
284+
lastHolder = string(v)
285+
default:
286+
return errors.Errorf("unexpected lua script result type %T or length, key: %v", sl[1], key)
287+
}
288+
return errNotLocked
289+
}, bf)
290+
if err != nil {
291+
if errors.Is(err, errNotLocked) {
292+
return false, lastHolder, nil
293+
}
294+
return false, "", err
295+
}
296+
return ok, val, nil
297+
}

backend/infra/lock/lock_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,157 @@ func newIntCmdResult(val int64, err error) *redis.Cmd {
2525
return cmd
2626
}
2727

28+
// helper to build a redis.Cmd for Eval script result (val is typically []interface{}{locked, holder})
29+
func newEvalCmdResult(val interface{}, err error) *redis.Cmd {
30+
cmd := redis.NewCmd(context.Background())
31+
if err != nil {
32+
cmd.SetErr(err)
33+
return cmd
34+
}
35+
cmd.SetVal(val)
36+
return cmd
37+
}
38+
39+
func TestRedisLocker_BackoffLockWithValue(t *testing.T) {
40+
ctrl := gomock.NewController(t)
41+
defer ctrl.Finish()
42+
43+
cases := []struct {
44+
name string
45+
expiresIn time.Duration
46+
maxWait time.Duration
47+
setupMock func(m *redisMocks.MockPersistentCmdable)
48+
wantOk bool
49+
wantVal string
50+
wantErr bool
51+
}{
52+
{
53+
name: "ttl_too_short_returns_error",
54+
expiresIn: 100 * time.Millisecond,
55+
maxWait: time.Second,
56+
setupMock: func(m *redisMocks.MockPersistentCmdable) {},
57+
wantOk: false,
58+
wantVal: "",
59+
wantErr: true,
60+
},
61+
{
62+
name: "success_locked_on_first_try",
63+
expiresIn: 2 * time.Second,
64+
maxWait: time.Second,
65+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
66+
m.EXPECT().
67+
Eval(gomock.Any(), setNXWithGetScript, []string{"key1"}, "val1", int64(2000)).
68+
Return(newEvalCmdResult([]interface{}{int64(1), "val1"}, nil)).
69+
Times(1)
70+
},
71+
wantOk: true,
72+
wantVal: "val1",
73+
wantErr: false,
74+
},
75+
{
76+
name: "locked_by_others_returns_holder_string",
77+
expiresIn: 2 * time.Second,
78+
maxWait: 100 * time.Millisecond,
79+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
80+
m.EXPECT().
81+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
82+
Return(newEvalCmdResult([]interface{}{int64(0), "other_holder"}, nil)).
83+
AnyTimes()
84+
},
85+
wantOk: false,
86+
wantVal: "other_holder",
87+
wantErr: false,
88+
},
89+
{
90+
name: "locked_by_others_returns_holder_bytes",
91+
expiresIn: 2 * time.Second,
92+
maxWait: 100 * time.Millisecond,
93+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
94+
m.EXPECT().
95+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
96+
Return(newEvalCmdResult([]interface{}{int64(0), []byte("byte_holder")}, nil)).
97+
AnyTimes()
98+
},
99+
wantOk: false,
100+
wantVal: "byte_holder",
101+
wantErr: false,
102+
},
103+
{
104+
name: "redis_error_returns_error",
105+
expiresIn: 2 * time.Second,
106+
maxWait: 100 * time.Millisecond,
107+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
108+
m.EXPECT().
109+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
110+
Return(newEvalCmdResult(nil, context.DeadlineExceeded)).
111+
AnyTimes()
112+
},
113+
wantOk: false,
114+
wantVal: "",
115+
wantErr: true,
116+
},
117+
{
118+
name: "unexpected_script_result_type_returns_error",
119+
expiresIn: 2 * time.Second,
120+
maxWait: 100 * time.Millisecond,
121+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
122+
m.EXPECT().
123+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
124+
Return(newEvalCmdResult(interface{}("not_a_slice"), nil)).
125+
AnyTimes()
126+
},
127+
wantOk: false,
128+
wantVal: "",
129+
wantErr: true,
130+
},
131+
{
132+
name: "unexpected_script_result_length_returns_error",
133+
expiresIn: 2 * time.Second,
134+
maxWait: 100 * time.Millisecond,
135+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
136+
m.EXPECT().
137+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
138+
Return(newEvalCmdResult([]interface{}{int64(0)}, nil)).
139+
AnyTimes()
140+
},
141+
wantOk: false,
142+
wantVal: "",
143+
wantErr: true,
144+
},
145+
{
146+
name: "unexpected_holder_type_returns_error",
147+
expiresIn: 2 * time.Second,
148+
maxWait: 100 * time.Millisecond,
149+
setupMock: func(m *redisMocks.MockPersistentCmdable) {
150+
m.EXPECT().
151+
Eval(gomock.Any(), setNXWithGetScript, gomock.Any(), gomock.Any(), gomock.Any()).
152+
Return(newEvalCmdResult([]interface{}{int64(0), 12345}, nil)).
153+
AnyTimes()
154+
},
155+
wantOk: false,
156+
wantVal: "",
157+
wantErr: true,
158+
},
159+
}
160+
for _, c := range cases {
161+
t.Run(c.name, func(t *testing.T) {
162+
mockRedis := redisMocks.NewMockPersistentCmdable(ctrl)
163+
c.setupMock(mockRedis)
164+
locker := &redisLocker{c: mockRedis, holder: "holder"}
165+
ok, val, err := locker.BackoffLockWithValue(context.Background(), "key1", "val1", c.expiresIn, c.maxWait)
166+
if c.wantErr != (err != nil) {
167+
t.Fatalf("err: got %v, wantErr %v", err, c.wantErr)
168+
}
169+
if ok != c.wantOk {
170+
t.Errorf("ok: got %v, want %v", ok, c.wantOk)
171+
}
172+
if val != c.wantVal {
173+
t.Errorf("val: got %q, want %q", val, c.wantVal)
174+
}
175+
})
176+
}
177+
}
178+
28179
func TestRedisLocker_renewLock_ContextDoneUnlocksAndReturns(t *testing.T) {
29180
ctrl := gomock.NewController(t)
30181
defer ctrl.Finish()

backend/infra/lock/mocks/lock.go

Lines changed: 61 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)