Skip to content

Commit 04874ad

Browse files
committed
fix(proxy): retry secondary writes on "read timestamp has been compacted"
EVALSHA replays to the ElasticKV secondary can surface FailedPrecondition / "read timestamp has been compacted" when the script's startTS falls behind a peer node's MinRetainedTS (the local readPin only protects the node that picked the timestamp). Each retry re-sends the command so the secondary re-selects a fresh read snapshot.
1 parent b0bc9d3 commit 04874ad

2 files changed

Lines changed: 110 additions & 8 deletions

File tree

proxy/dualwrite.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"strings"
89
"sync"
910
"time"
1011

@@ -25,8 +26,30 @@ const (
2526
// contention bounded; this is only tolerable in modes where the script write
2627
// is targeting the non-authoritative backend.
2728
maxScriptWriteGoroutines = 64
29+
30+
// maxCompactedRetries caps retries when the secondary returns
31+
// "read timestamp has been compacted". Each attempt re-sends the command so
32+
// the secondary re-selects a fresh read snapshot; a small bound is enough
33+
// because the compaction waterline advances slowly relative to SecondaryTimeout.
34+
maxCompactedRetries = 3
35+
// compactedRetryInitialBackoff is the first delay before retrying a secondary
36+
// command that failed with a compacted-read error.
37+
compactedRetryInitialBackoff = 10 * time.Millisecond
2838
)
2939

40+
// readTSCompactedMarker is the text that store.ErrReadTSCompacted leaves in an
// error message once it has travelled through gRPC (wrapped as
// FailedPrecondition) and Lua PCall. Both layers erase the typed error, so
// substring matching is the only way left to recognise it.
const readTSCompactedMarker = "read timestamp has been compacted"

// isReadTSCompactedError reports whether err's message contains
// readTSCompactedMarker. A nil err never matches.
func isReadTSCompactedError(err error) bool {
	return err != nil && strings.Contains(err.Error(), readTSCompactedMarker)
}
52+
3053
// DualWriter routes commands to primary and secondary backends based on mode.
3154
type DualWriter struct {
3255
primary Backend
@@ -225,14 +248,7 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
225248
defer cancel()
226249

227250
start := time.Now()
228-
result := d.secondary.Do(sCtx, iArgs...)
229-
_, sErr := result.Result()
230-
if isNoScriptError(sErr) {
231-
if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
232-
result = d.secondary.Do(sCtx, fallbackArgs...)
233-
_, sErr = result.Result()
234-
}
235-
}
251+
sErr := d.executeSecondary(sCtx, cmd, iArgs)
236252
d.metrics.CommandDuration.WithLabelValues(cmd, d.secondary.Name()).Observe(time.Since(start).Seconds())
237253

238254
if sErr != nil && !errors.Is(sErr, redis.Nil) {
@@ -248,6 +264,38 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
248264
d.metrics.CommandTotal.WithLabelValues(cmd, d.secondary.Name(), "ok").Inc()
249265
}
250266

267+
// executeSecondary sends the command to the secondary, handling the NOSCRIPT
268+
// → EVAL fallback and transparently retrying when the secondary reports that
269+
// the read snapshot has been compacted. A re-sent command causes the backend
270+
// to re-select a fresh read timestamp, which is the only way to recover once
271+
// the original startTS has fallen behind MinRetainedTS on a peer node.
272+
func (d *DualWriter) executeSecondary(sCtx context.Context, cmd string, iArgs []any) error {
273+
backoff := compactedRetryInitialBackoff
274+
var sErr error
275+
for attempt := 0; ; attempt++ {
276+
result := d.secondary.Do(sCtx, iArgs...)
277+
_, sErr = result.Result()
278+
if isNoScriptError(sErr) {
279+
if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
280+
result = d.secondary.Do(sCtx, fallbackArgs...)
281+
_, sErr = result.Result()
282+
}
283+
}
284+
if !isReadTSCompactedError(sErr) {
285+
return sErr
286+
}
287+
if attempt >= maxCompactedRetries {
288+
return sErr
289+
}
290+
select {
291+
case <-sCtx.Done():
292+
return sErr
293+
case <-time.After(backoff):
294+
}
295+
backoff *= 2
296+
}
297+
}
298+
251299
// goWrite launches fn in a bounded write goroutine.
252300
func (d *DualWriter) goWrite(fn func()) {
253301
d.goAsyncWithSem(d.writeSem, fn)

proxy/proxy_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,60 @@ func TestDualWriter_Script_EvalSHARO_FallsBackToEvalRO(t *testing.T) {
920920
assert.InDelta(t, 0, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001)
921921
}
922922

923+
func TestDualWriter_writeSecondary_RetriesReadTSCompacted(t *testing.T) {
924+
// Simulate ElasticKV returning "read timestamp has been compacted"
925+
// (wrapped as gRPC FailedPrecondition and surfaced through Lua PCall).
926+
// The proxy must transparently retry so the async EVALSHA replay
927+
// eventually succeeds with a fresher snapshot timestamp.
928+
primary := newMockBackend("primary")
929+
primary.doFunc = makeCmd("OK", nil)
930+
931+
secondary := newMockBackend("secondary")
932+
compactedErr := testRedisErr("<string>:71: rpc error: code = FailedPrecondition desc = rpc error: code = FailedPrecondition desc = read timestamp has been compacted stack traceback: \t[G]: in function 'rcall'")
933+
var calls int
934+
secondary.doFunc = func(ctx context.Context, args ...any) *redis.Cmd {
935+
calls++
936+
cmd := redis.NewCmd(ctx, args...)
937+
if calls < 3 {
938+
cmd.SetErr(compactedErr)
939+
return cmd
940+
}
941+
cmd.SetVal("OK")
942+
return cmd
943+
}
944+
945+
metrics := newTestMetrics()
946+
d := NewDualWriter(primary, secondary, ProxyConfig{Mode: ModeDualWrite, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
947+
948+
d.writeSecondary("EVALSHA", []any{[]byte("EVALSHA"), []byte("deadbeef"), []byte("0")})
949+
950+
assert.Equal(t, 3, calls, "secondary must be retried until it succeeds")
951+
assert.InDelta(t, 0, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001,
952+
"a retried success must not count as a secondary write error")
953+
}
954+
955+
func TestDualWriter_writeSecondary_ReadTSCompactedRetriesAreBounded(t *testing.T) {
956+
// When the compacted error is persistent, the retry loop must stop after
957+
// maxCompactedRetries+1 attempts so the secondary goroutine returns
958+
// instead of burning a scriptSem slot indefinitely.
959+
primary := newMockBackend("primary")
960+
primary.doFunc = makeCmd("OK", nil)
961+
962+
secondary := newMockBackend("secondary")
963+
compactedErr := testRedisErr("rpc error: code = FailedPrecondition desc = read timestamp has been compacted")
964+
secondary.doFunc = makeCmd(nil, compactedErr)
965+
966+
metrics := newTestMetrics()
967+
d := NewDualWriter(primary, secondary, ProxyConfig{Mode: ModeDualWrite, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
968+
969+
d.writeSecondary("EVALSHA", []any{[]byte("EVALSHA"), []byte("deadbeef"), []byte("0")})
970+
971+
assert.Equal(t, maxCompactedRetries+1, secondary.CallCount(),
972+
"secondary must stop after maxCompactedRetries+1 attempts")
973+
assert.InDelta(t, 1, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001,
974+
"a persistent compacted error must still be reported as a secondary write error")
975+
}
976+
923977
func TestDualWriter_Script_NoRememberOnPrimaryError(t *testing.T) {
924978
// Verify that a failed SCRIPT FLUSH on the primary does NOT clear the proxy
925979
// script cache, so that subsequent EVALSHA → EVAL fallbacks still work.

0 commit comments

Comments
 (0)