Skip to content

Commit 0aecc59

Browse files
committed
refactor(proxy): address review feedback on retry and leader discovery
- proxy/dualwrite.go: preserve NOSCRIPT→EVAL fallback across compacted retries so each attempt after the first skips the known-missing EVALSHA; add jitter + bounded exponential backoff; use time.NewTimer so the timer is released on ctx cancellation; tag the two Result() reads with the same nolint:wrapcheck used elsewhere.
- proxy/leader_aware_backend.go: derive a cancellable context from stopCh so Close() interrupts in-flight INFO probes instead of waiting for refreshTimeout, check ctx.Err() between candidates, and log per-client Close errors.
- proxy/leader_aware_backend_test.go: replace net.Listen with net.ListenConfig.Listen (noctx linter).
- adapter/redis_info_test.go: gofmt.
- proxy/proxy_test.go: regression test that the NOSCRIPT resolution is reused across compacted retries.
1 parent 1d57c01 commit 0aecc59

5 files changed

Lines changed: 128 additions & 21 deletions

File tree

adapter/redis_info_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ type infoTestCoordinator struct {
2121
func (c *infoTestCoordinator) Dispatch(context.Context, *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
2222
return &kv.CoordinateResponse{}, nil
2323
}
24-
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }
25-
func (c *infoTestCoordinator) VerifyLeader() error { return nil }
26-
func (c *infoTestCoordinator) RaftLeader() raft.ServerAddress { return c.raftLeader }
27-
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
28-
func (c *infoTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }
29-
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) raft.ServerAddress { return c.raftLeader }
24+
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }
25+
func (c *infoTestCoordinator) VerifyLeader() error { return nil }
26+
func (c *infoTestCoordinator) RaftLeader() raft.ServerAddress { return c.raftLeader }
27+
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
28+
func (c *infoTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }
29+
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) raft.ServerAddress { return c.raftLeader }
3030
func (c *infoTestCoordinator) Clock() *kv.HLC {
3131
if c.clock == nil {
3232
c.clock = kv.NewHLC()

proxy/dualwrite.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"math/rand/v2"
89
"strings"
910
"sync"
1011
"time"
@@ -35,6 +36,10 @@ const (
3536
// compactedRetryInitialBackoff is the first delay before retrying a secondary
3637
// command that failed with a compacted-read error.
3738
compactedRetryInitialBackoff = 10 * time.Millisecond
39+
// compactedRetryMaxBackoff caps the jittered exponential backoff so it
40+
// stays well within SecondaryTimeout even if the retry policy is ever
41+
// widened.
42+
compactedRetryMaxBackoff = 100 * time.Millisecond
3843
)
3944

4045
// readTSCompactedMarker is the substring produced by
@@ -269,16 +274,22 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
269274
// the read snapshot has been compacted. A re-sent command causes the backend
270275
// to re-select a fresh read timestamp, which is the only way to recover once
271276
// the original startTS has fallen behind MinRetainedTS on a peer node.
277+
//
278+
// If the first attempt triggers a NOSCRIPT fallback, subsequent retry
279+
// attempts use the resolved EVAL args directly so we don't waste a
280+
// round-trip on the known-missing EVALSHA every iteration.
272281
func (d *DualWriter) executeSecondary(sCtx context.Context, cmd string, iArgs []any) error {
273282
backoff := compactedRetryInitialBackoff
274283
var sErr error
284+
args := iArgs
275285
for attempt := 0; ; attempt++ {
276-
result := d.secondary.Do(sCtx, iArgs...)
277-
_, sErr = result.Result()
286+
result := d.secondary.Do(sCtx, args...)
287+
_, sErr = result.Result() //nolint:wrapcheck // classification/error propagation preserves redis.Error types
278288
if isNoScriptError(sErr) {
279-
if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
280-
result = d.secondary.Do(sCtx, fallbackArgs...)
281-
_, sErr = result.Result()
289+
if fallbackArgs, ok := d.evalFallbackArgs(cmd, args); ok {
290+
args = fallbackArgs
291+
result = d.secondary.Do(sCtx, args...)
292+
_, sErr = result.Result() //nolint:wrapcheck // classification/error propagation preserves redis.Error types
282293
}
283294
}
284295
if !isReadTSCompactedError(sErr) {
@@ -287,15 +298,49 @@ func (d *DualWriter) executeSecondary(sCtx context.Context, cmd string, iArgs []
287298
if attempt >= maxCompactedRetries {
288299
return sErr
289300
}
290-
select {
291-
case <-sCtx.Done():
301+
if !waitCompactedRetryBackoff(sCtx, backoff) {
292302
return sErr
293-
case <-time.After(backoff):
294303
}
295-
backoff *= 2
304+
backoff = nextCompactedRetryBackoff(backoff)
296305
}
297306
}
298307

308+
// waitCompactedRetryBackoff sleeps for a jittered interval or returns early
309+
// when the context is cancelled. Returns false if the caller should abort
310+
// the retry loop (context done).
311+
//
312+
// The jittered delay is in [backoff, backoff + backoff/2) so that concurrent retries
313+
// caused by a single compaction waterline advancement do not re-hit the
314+
// secondary in lockstep. A NewTimer is used instead of time.After so the
315+
// timer is released promptly on ctx cancellation (avoiding a leak until
316+
// expiry when the async goroutine is shutting down).
317+
func waitCompactedRetryBackoff(ctx context.Context, backoff time.Duration) bool {
318+
if backoff <= 0 {
319+
return ctx.Err() == nil
320+
}
321+
jitter := time.Duration(0)
322+
if half := int64(backoff / 2); half > 0 {
323+
jitter = time.Duration(rand.Int64N(half)) //nolint:gosec // jitter for retry backoff, not security sensitive
324+
}
325+
timer := time.NewTimer(backoff + jitter)
326+
defer timer.Stop()
327+
328+
select {
329+
case <-ctx.Done():
330+
return false
331+
case <-timer.C:
332+
return true
333+
}
334+
}
335+
336+
func nextCompactedRetryBackoff(current time.Duration) time.Duration {
337+
next := current * 2
338+
if next > compactedRetryMaxBackoff {
339+
return compactedRetryMaxBackoff
340+
}
341+
return next
342+
}
343+
299344
// goWrite launches fn in a bounded write goroutine.
300345
func (d *DualWriter) goWrite(fn func()) {
301346
d.goAsyncWithSem(d.writeSem, fn)

proxy/leader_aware_backend.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,17 @@ func normalizeSeeds(seeds []string) []string {
112112
func (b *LeaderAwareRedisBackend) refreshLoop() {
113113
defer close(b.done)
114114

115-
b.refreshLeader(context.Background())
115+
// Derive a cancellable context from stopCh so that an in-flight INFO
116+
// probe is interrupted as soon as Close() is called; otherwise the
117+
// refreshTimeout must elapse before the loop can exit.
118+
ctx, cancel := context.WithCancel(context.Background())
119+
defer cancel()
120+
go func() {
121+
<-b.stopCh
122+
cancel()
123+
}()
124+
125+
b.refreshLeader(ctx)
116126

117127
t := time.NewTicker(b.refreshInterval)
118128
defer t.Stop()
@@ -121,9 +131,9 @@ func (b *LeaderAwareRedisBackend) refreshLoop() {
121131
case <-b.stopCh:
122132
return
123133
case <-t.C:
124-
b.refreshLeader(context.Background())
134+
b.refreshLeader(ctx)
125135
case <-b.refreshCh:
126-
b.refreshLeader(context.Background())
136+
b.refreshLeader(ctx)
127137
}
128138
}
129139
}
@@ -145,6 +155,9 @@ func (b *LeaderAwareRedisBackend) TriggerRefresh() {
145155
// healthy, so this converges in one probe during steady state.
146156
func (b *LeaderAwareRedisBackend) refreshLeader(ctx context.Context) {
147157
for _, addr := range b.candidateAddrs() {
158+
if ctx.Err() != nil {
159+
return
160+
}
148161
leader, err := b.probeLeader(ctx, addr)
149162
if err != nil {
150163
b.logger.Debug("leader probe failed", "addr", addr, "err", err)
@@ -331,8 +344,11 @@ func (b *LeaderAwareRedisBackend) Close() error {
331344

332345
var firstErr error
333346
for addr, cli := range clients {
334-
if err := cli.Close(); err != nil && firstErr == nil {
335-
firstErr = fmt.Errorf("close %s client %s: %w", b.name, addr, err)
347+
if err := cli.Close(); err != nil {
348+
b.logger.Warn("close leader-aware client failed", "backend", b.name, "addr", addr, "err", err)
349+
if firstErr == nil {
350+
firstErr = fmt.Errorf("close %s client %s: %w", b.name, addr, err)
351+
}
336352
}
337353
}
338354
return firstErr

proxy/leader_aware_backend_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type fakeElasticKVNode struct {
6767

6868
func newFakeElasticKVNode(t *testing.T, leader string) *fakeElasticKVNode {
6969
t.Helper()
70-
ln, err := net.Listen("tcp", "127.0.0.1:0")
70+
var lc net.ListenConfig
71+
ln, err := lc.Listen(context.Background(), "tcp", "127.0.0.1:0")
7172
require.NoError(t, err)
7273
n := &fakeElasticKVNode{ln: ln, addr: ln.Addr().String()}
7374
n.SetLeader(leader)

proxy/proxy_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,51 @@ func TestDualWriter_writeSecondary_ReadTSCompactedRetriesAreBounded(t *testing.T
974974
"a persistent compacted error must still be reported as a secondary write error")
975975
}
976976

977+
func TestDualWriter_writeSecondary_RetriesDoNotRepeatNoScriptProbe(t *testing.T) {
978+
// After the EVAL fallback resolves a NOSCRIPT, a compacted-retry must
979+
// re-send the resolved EVAL form directly — never the known-missing
980+
// EVALSHA — so we never burn an extra round-trip per retry attempt.
981+
primary := newMockBackend("primary")
982+
primary.doFunc = makeCmd("OK", nil)
983+
984+
script := "return ARGV[1]"
985+
sha := scriptSHA(script)
986+
compactedErr := testRedisErr("rpc error: code = FailedPrecondition desc = read timestamp has been compacted")
987+
988+
secondary := newMockBackend("secondary")
989+
var evalshaCalls, evalCalls int
990+
secondary.doFunc = func(ctx context.Context, args ...any) *redis.Cmd {
991+
cmd := redis.NewCmd(ctx, args...)
992+
switch string(args[0].([]byte)) {
993+
case "EVALSHA":
994+
evalshaCalls++
995+
cmd.SetErr(testRedisErr("NOSCRIPT No matching script. Please use EVAL."))
996+
case "EVAL":
997+
evalCalls++
998+
// First resolved EVAL fails with compacted; second succeeds.
999+
if evalCalls == 1 {
1000+
cmd.SetErr(compactedErr)
1001+
} else {
1002+
cmd.SetVal("OK")
1003+
}
1004+
default:
1005+
t.Fatalf("unexpected secondary command %v", args[0])
1006+
}
1007+
return cmd
1008+
}
1009+
1010+
metrics := newTestMetrics()
1011+
d := NewDualWriter(primary, secondary, ProxyConfig{Mode: ModeRedisOnly, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
1012+
d.storeScript(script)
1013+
1014+
d.cfg.Mode = ModeDualWrite
1015+
d.writeSecondary("EVALSHA", []any{[]byte("EVALSHA"), []byte(sha), []byte("0"), []byte("value")})
1016+
1017+
assert.Equal(t, 1, evalshaCalls, "EVALSHA must be probed only once; the compacted retry must use the resolved EVAL form")
1018+
assert.Equal(t, 2, evalCalls, "EVAL must be retried after the compacted failure")
1019+
assert.InDelta(t, 0, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001)
1020+
}
1021+
9771022
func TestDualWriter_Script_NoRememberOnPrimaryError(t *testing.T) {
9781023
// Verify that a failed SCRIPT FLUSH on the primary does NOT clear the proxy
9791024
// script cache, so that subsequent EVALSHA → EVAL fallbacks still work.

0 commit comments

Comments
 (0)