
Commit d23cf69

fix(adapter/sqs): standard SendMessageBatch double-send under leader churn (#923)
## Summary

Fixes a **double-send** bug in the SQS standard-queue `SendMessageBatch` path: the same `:duplicate-elements` class fixed for DynamoDB in PR #920, found by an adversarial S3/SQS audit prompted by the "extend dedup to S3/SQS" follow-up.

## The bug

`sendMessageBatchWithRetry` → `trySendMessageBatchOnce` → `sendBatchStandardOnce` runs an **in-process OCC retry loop** that, on every attempt, re-ran `buildBatchSendRecord` → `buildSendRecord` → `newMessageIDHex()`, minting a **fresh random MessageID (and send timestamp) per entry per attempt**, and derived the `data`/`vis`/`byage` storage keys from them.

Under a Raft leader-election storm, attempt 1 can **commit** its writes yet surface a self-inflicted `WriteConflict` to the adapter (the mechanism PR #920 documented). The retry then mints **new** MessageIDs and writes a **second copy of every entry** at new keys:

1. Attempt 1: mints `{M1,M2,M3}`, commits, but returns `WriteConflict` under churn.
2. Retry: mints `{M1',M2',M3'}`, commits cleanly, returns `{M1',M2',M3'}` + HTTP 200.
3. Queue holds **both** sets → every batch entry duplicated; the client only learns the second set.

Standard queues carry no dedup identity, and the OCC `ReadKeys` are only `[meta, gen]` (the random data keys never self-conflict), so nothing fenced this.

**Why only batch:** single `SendMessage` has **no** in-process retry (a conflict becomes a normal at-least-once SDK retry), and FIFO send is fenced by the `MessageDeduplicationId` dedup record. The batch path added an in-process retry without either fence.

## The fix

Pre-generate **one stable `sqsSendIdentity`** (MessageID + receipt token + send timestamp) **per entry, before the retry loop**, and reuse it on every attempt (`buildSendRecordWithIdentity`). The storage keys then stay constant across retries, so a committed-but-conflicted attempt is **overwritten idempotently** instead of duplicated.

No FSM probe / `PrevCommitTS` / gate is needed: unlike DynamoDB's `list_append` (which re-read a *growing* value), the batch write is a whole-value PUT at a now-stable key, so re-applying is a plain idempotent overwrite. The record's `QueueGeneration` is still re-derived per attempt, so a concurrent `DeleteQueue`/`PurgeQueue` generation bump is followed correctly (the stale-gen attempt-1 write is orphaned/unreachable).

## S3/SQS audit outcome (recorded in the design doc)

A handler-by-handler adversarial sweep found the original "S3/SQS share the same latent gap" note was over-broad:

- **S3: safe.** Whole-value PUT at a stable key; `PutObject` is OCC-fenced by a caller-supplied `StartTS` (a leader-move retry that races an interleaved write **fails the fence rather than clobbering**), and has no in-process re-read-recompute. `CompleteMultipartUpload`'s manifest is re-assembled deterministically from the **immutable** uploaded parts (computed once before its retry loop), not from a re-read of the object value → no duplicate.
- **SQS single `SendMessage` / FIFO: safe** (random-UUID key + no in-process retry; FIFO dedup record).
- **SQS standard `SendMessageBatch`: fixed here.**

## Tests (`adapter/sqs_batch_send_dedup_test.go`)

- `TestSendMessageBatchStandard_RetryReusesStableKeys`: first dispatch commits-then-conflicts; asserts the retry dispatches the **same** storage keys (fresh MessageIDs would fail it → reproduces the bug pre-fix).
- `TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry`: the returned MessageId is the one actually written on both attempts (client id == stored id).

## Validation

- `go test ./adapter/ -run 'SendMessage|Batch|FIFO|Send'` pass; new dedup tests pass.
- `golangci-lint run ./adapter/...` 0 issues; `go build ./...` clean.

## Self-review (5 passes)

1. **Data loss**: the fix prevents a double-send (extra messages), and keeps the generation re-derivation so a concurrent queue delete/recreate still routes to the live generation; no committed message is dropped.
2. **Concurrency / distributed**: stable keys make the leader-churn retry an idempotent overwrite; the `[meta,gen]` ReadKeys fence against `DeleteQueue`/`PurgeQueue` is unchanged.
3. **Performance**: identities are minted once instead of once-per-attempt (strictly fewer `crypto/rand` calls on the retry path); hot path (no retry) is one mint per entry, same as before.
4. **Data consistency**: eliminates the duplicate-element anomaly for standard batch send; FIFO/exactly-once and single-send semantics unchanged.
5. **Test coverage**: new co-located tests pin key-stability across the retry (the regression), the load-bearing property of the fix.
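The double-send mechanism and the stable-identity fix described above can be reproduced in a toy model. The sketch below is not the adapter's code: `flakyStore`, `sendBatch`, `newID`, and the `data/` key prefix are hypothetical names chosen for illustration. It assumes only what the commit message states: the first dispatch commits its writes but reports a conflict, and the caller retries in-process.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
)

// errWriteConflict stands in for the store's retryable conflict error.
var errWriteConflict = errors.New("write conflict")

// flakyStore (hypothetical) applies every write it receives, but reports a
// conflict on the first dispatch: a committed-but-conflicted attempt.
type flakyStore struct {
	data       map[string]string
	dispatches int
}

func (s *flakyStore) dispatch(writes map[string]string) error {
	s.dispatches++
	for k, v := range writes {
		s.data[k] = v // the write lands even when an error is returned below
	}
	if s.dispatches == 1 {
		return errWriteConflict
	}
	return nil
}

// newID returns a random hex identifier (illustrative, not newMessageIDHex).
func newID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// sendBatch retries until a dispatch succeeds and returns how many records
// the store ends up holding. With stable=false, IDs are minted on every
// attempt (the pre-fix shape); with stable=true, once before the loop.
func sendBatch(s *flakyStore, bodies []string, stable bool) int {
	ids := make([]string, len(bodies))
	mint := func() {
		for i := range ids {
			ids[i] = newID()
		}
	}
	if stable {
		mint()
	}
	for attempt := 0; attempt < 3; attempt++ {
		if !stable {
			mint()
		}
		writes := make(map[string]string, len(bodies))
		for i, body := range bodies {
			writes["data/"+ids[i]] = body
		}
		if err := s.dispatch(writes); err == nil {
			break
		}
	}
	return len(s.data)
}

func main() {
	bodies := []string{"alpha", "bravo", "charlie"}
	fresh := sendBatch(&flakyStore{data: map[string]string{}}, bodies, false)
	stable := sendBatch(&flakyStore{data: map[string]string{}}, bodies, true)
	fmt.Println("records with per-attempt IDs:", fresh)
	fmt.Println("records with stable IDs:", stable)
}
```

With per-attempt IDs the store should end up holding two records per entry, while the stable variant overwrites the first attempt's records in place.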
2 parents 13a1b61 + 65fe7d0 commit d23cf69

4 files changed

Lines changed: 347 additions & 34 deletions

adapter/sqs_batch_send_dedup_test.go

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
+package adapter
+
+import (
+	"context"
+	"testing"
+
+	"github.com/bootjp/elastickv/kv"
+	"github.com/bootjp/elastickv/store"
+	"github.com/stretchr/testify/require"
+)
+
+// recordingBatchCoordinator records the mutation keys of each dispatch and
+// fails the FIRST dispatch with a retryable WriteConflict, modelling a
+// committed-but-conflicted attempt under leader churn (the apply may have
+// landed, but the dispatch surfaced WriteConflict). The second dispatch
+// succeeds. It embeds stubAdapterCoordinator for the Coordinator surface
+// (IsLeader, Clock, ...) and only overrides Dispatch.
+type recordingBatchCoordinator struct {
+	stubAdapterCoordinator
+	dispatchKeys [][]string
+	// beforeDispatch, if set, runs at the start of each Dispatch with the
+	// 1-based dispatch number; lets a test mutate queue state between attempts.
+	beforeDispatch func(n int)
+}
+
+func (c *recordingBatchCoordinator) Dispatch(_ context.Context, req *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
+	if c.beforeDispatch != nil {
+		c.beforeDispatch(len(c.dispatchKeys) + 1)
+	}
+	keys := make([]string, 0, len(req.Elems))
+	for _, e := range req.Elems {
+		keys = append(keys, string(e.Key))
+	}
+	c.dispatchKeys = append(c.dispatchKeys, keys)
+	if len(c.dispatchKeys) == 1 {
+		return nil, store.ErrWriteConflict
+	}
+	return &kv.CoordinateResponse{}, nil
+}
+
+func seedStandardQueue(t *testing.T, st store.MVCCStore) {
+	const name = "q"
+	t.Helper()
+	meta := &sqsQueueMeta{
+		Name:               name,
+		Generation:         1,
+		MaximumMessageSize: 262144,
+		IsFIFO:             false,
+	}
+	body, err := encodeSQSQueueMeta(meta)
+	require.NoError(t, err)
+	require.NoError(t, st.PutAt(context.Background(), sqsQueueMetaKey(name), body, 1, 0))
+}
+
+// TestSendMessageBatchStandard_RetryReusesStableKeys pins the standard-queue
+// batch-send dedup fix: when the first dispatch commits-then-conflicts under
+// leader churn, the retry must reuse the SAME storage keys (stable MessageIDs +
+// timestamps minted once before the loop) so the committed attempt is
+// overwritten idempotently. Without the fix the retry re-mints fresh
+// MessageIDs, writing a SECOND copy of every entry at new keys: the
+// :duplicate-elements double-send (same class as DynamoDB PR #920).
+func TestSendMessageBatchStandard_RetryReusesStableKeys(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	st := store.NewMVCCStore()
+	seedStandardQueue(t, st)
+
+	coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
+	srv := &SQSServer{store: st, coordinator: coord}
+
+	entries := []sqsSendMessageBatchEntryInput{
+		{Id: "a", MessageBody: "alpha"},
+		{Id: "b", MessageBody: "bravo"},
+	}
+	successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
+	require.NoError(t, err)
+	require.Empty(t, failed)
+	require.Len(t, successful, 2)
+
+	require.Len(t, coord.dispatchKeys, 2, "attempt 1 conflicts (committed-but-conflicted), attempt 2 retries")
+	require.NotEmpty(t, coord.dispatchKeys[0])
+	require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1],
+		"the retry MUST reuse the same storage keys; fresh MessageIDs would double-send every entry under leader churn")
+}
+
+// TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange pins codex P2:
+// if a SetQueueAttributes changes the queue's DelaySeconds between a
+// committed-but-conflicted attempt and its retry, the retry must still write
+// the SAME vis/by-age keys (AvailableAtMillis is captured in the identity at
+// first mint, not recomputed from the changed delay); otherwise the first
+// attempt's vis index entry is orphaned and can redeliver the message.
+func TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	st := store.NewMVCCStore()
+	seedStandardQueue(t, st) // DelaySeconds = 0
+
+	coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
+	coord.beforeDispatch = func(n int) {
+		if n != 2 {
+			return
+		}
+		// SetQueueAttributes raises DelaySeconds to 900 before the retry.
+		meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 262144, DelaySeconds: 900}
+		body, err := encodeSQSQueueMeta(meta)
+		require.NoError(t, err)
+		require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0))
+	}
+	srv := &SQSServer{store: st, coordinator: coord}
+
+	entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // no per-message DelaySeconds
+	successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
+	require.NoError(t, err)
+	require.Len(t, successful, 1)
+
+	require.Len(t, coord.dispatchKeys, 2)
+	require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1],
+		"vis/by-age keys must stay stable across a mid-retry DelaySeconds change (cached AvailableAtMillis)")
+}
+
+// TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry pins codex
+// P2 round-2: if a prior attempt minted the identity but did NOT commit (lost a
+// normal OCC race) and a concurrent SetQueueAttributes then lowered
+// MaximumMessageSize below the body, the retry must re-validate against the
+// fresh meta and REJECT the entry rather than commit it on the cached identity.
+func TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	st := store.NewMVCCStore()
+	seedStandardQueue(t, st) // MaximumMessageSize = 262144
+
+	coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
+	coord.beforeDispatch = func(n int) {
+		if n != 1 {
+			return
+		}
+		// SetQueueAttributes lowers MaximumMessageSize below the body size,
+		// landing during attempt 1's dispatch (which does not commit; it
+		// returns a WriteConflict). The retry then loads this lowered meta
+		// BEFORE re-validating, so the entry must be rejected.
+		meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 3}
+		body, err := encodeSQSQueueMeta(meta)
+		require.NoError(t, err)
+		require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0))
+	}
+	srv := &SQSServer{store: st, coordinator: coord}
+
+	entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // 5 bytes > new limit 3
+	successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
+	require.NoError(t, err)
+	require.Empty(t, successful, "the retry must re-validate against the lowered limit and reject the entry")
+	require.Len(t, failed, 1)
+	require.Equal(t, "a", failed[0].Id)
+}
+
+// TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry pins that
+// the MessageIds reported to the client are the ones actually stored on the
+// retry (not a discarded first-attempt set), so a consumer cannot observe a
+// MessageId the client never received.
+func TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	st := store.NewMVCCStore()
+	seedStandardQueue(t, st)
+
+	coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
+	srv := &SQSServer{store: st, coordinator: coord}
+
+	entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}}
+	successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
+	require.NoError(t, err)
+	require.Len(t, successful, 1)
+
+	// The data key embeds the MessageID; both attempts must carry the
+	// returned MessageID so the client's id matches what is stored.
+	wantDataKey := string(sqsMsgDataKeyDispatch(&sqsQueueMeta{Name: "q", Generation: 1}, "q", 0, 1, successful[0].MessageId))
+	for i, keys := range coord.dispatchKeys {
+		require.Contains(t, keys, wantDataKey, "dispatch %d must write the returned MessageID's data key", i)
+	}
+}
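The tests above exercise `sendMessageBatchWithRetry`, whose body is not part of this excerpt. As a rough sketch of the per-attempt behaviour they pin down (re-validate against the meta read for this attempt, mint the identity lazily, then reuse the cached `availableAtMillis`), one possible shape is shown below. Every name here (`queueMeta`, `identity`, `entryState`, `prepare`, the `vis/` key layout) is hypothetical and is not the adapter's API.

```go
package main

import "fmt"

// queueMeta is a hypothetical, trimmed-down stand-in for the queue metadata.
type queueMeta struct {
	delaySeconds int64
	maxBodyBytes int
}

// identity is a hypothetical stand-in for the stable per-message identity.
type identity struct {
	messageID         string
	sendTsMillis      int64
	availableAtMillis int64
}

// entryState carries one batch entry's cached identity across attempts.
type entryState struct {
	body string
	id   *identity // nil until the first attempt that passes validation
}

// prepare runs once per attempt. It validates the entry against the meta read
// for this attempt, then mints the identity only if none is cached yet. The
// returned key uses a made-up layout; only its inputs matter.
func prepare(e *entryState, meta queueMeta, nowMillis int64) (visKey string, ok bool) {
	if len(e.body) > meta.maxBodyBytes {
		return "", false // rejected under the current limit, cached identity or not
	}
	if e.id == nil {
		e.id = &identity{
			messageID:         "msg-1", // fixed for the demo; real IDs are random
			sendTsMillis:      nowMillis,
			availableAtMillis: nowMillis + meta.delaySeconds*1000,
		}
	}
	return fmt.Sprintf("vis/%020d/%s", e.id.availableAtMillis, e.id.messageID), true
}

func main() {
	e := &entryState{body: "alpha"}

	k1, _ := prepare(e, queueMeta{delaySeconds: 0, maxBodyBytes: 262144}, 1000)
	// The queue delay changes before the retry; the cached availableAtMillis wins.
	k2, _ := prepare(e, queueMeta{delaySeconds: 900, maxBodyBytes: 262144}, 2000)
	fmt.Println("vis key stable across delay change:", k1 == k2)

	// The size limit drops below the body; the retry is rejected.
	_, ok := prepare(e, queueMeta{delaySeconds: 900, maxBodyBytes: 3}, 3000)
	fmt.Println("accepted under lowered limit:", ok)
}
```

Validating before consulting the cache is the design point: a cached identity fixes where a message would be written, not whether it is still allowed to be written.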

adapter/sqs_messages.go

Lines changed: 59 additions & 9 deletions
@@ -653,28 +653,78 @@ func resolveSendDelay(meta *sqsQueueMeta, requested *int64) (int64, error) {
 	return *requested, nil
 }
 
-func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) {
+// sqsSendIdentity is the per-message identity that MUST stay stable across a
+// retry loop. Every field feeds the message's storage keys: the data key via
+// MessageID; the vis key via AvailableAtMillis; the by-age key via
+// SendTimestampMillis. If they were re-minted on every retry attempt, a
+// committed-but-conflicted attempt under leader churn plus the recomputed retry
+// would land at two different key sets, double-sending the message (the
+// :duplicate-elements class fixed for DynamoDB in PR #920).
+//
+// AvailableAtMillis (= sendTs + resolved delay) is captured here too, NOT
+// recomputed per attempt: otherwise a SetQueueAttributes that changes the
+// queue's DelaySeconds between a committed-but-conflicted attempt and its retry
+// would shift the vis key and leave the first attempt's vis entry orphaned
+// (codex P2 on PR #923), a stale visibility index entry that can redeliver the
+// message. The batch send path lazily mints one identity per entry on its first
+// standard-path attempt and reuses it on every retry, so the keys are stable.
+type sqsSendIdentity struct {
+	messageID         string
+	token             []byte
+	sendTsMillis      int64
+	availableAtMillis int64
+}
+
+// newSendIdentity mints a fresh stable identity for the given resolved delay.
+// Single-message sends call this inline via buildSendRecord (they have no
+// in-process retry, so a self-inflicted conflict surfaces to the client as a
+// normal at-least-once SDK retry); the batch path mints once per entry and
+// reuses across its retry loop.
+func newSendIdentity(delay int64) (sqsSendIdentity, error) {
 	messageID, err := newMessageIDHex()
 	if err != nil {
-		return nil, nil, errors.WithStack(err)
+		return sqsSendIdentity{}, errors.WithStack(err)
 	}
 	token, err := newReceiptToken()
 	if err != nil {
-		return nil, nil, errors.WithStack(err)
+		return sqsSendIdentity{}, errors.WithStack(err)
 	}
 	now := time.Now().UnixMilli()
-	availableAt := now + delay*sqsMillisPerSecond
+	return sqsSendIdentity{
+		messageID:         messageID,
+		token:             token,
+		sendTsMillis:      now,
+		availableAtMillis: now + delay*sqsMillisPerSecond,
+	}, nil
+}
+
+func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) {
+	id, err := newSendIdentity(delay)
+	if err != nil {
+		return nil, nil, err
+	}
+	return buildSendRecordWithIdentity(meta, in, id)
+}
+
+// buildSendRecordWithIdentity builds the message record from a caller-supplied
+// stable identity (MessageID + token + timestamps) instead of minting a fresh
+// one. The record value embeds the current meta.Generation, so callers re-invoke
+// this per retry attempt with the freshly-read generation while keeping the
+// identity fixed: the keys stay stable for a given generation (idempotent
+// overwrite under leader churn) and follow the generation if a concurrent
+// DeleteQueue/PurgeQueue bumps it.
+func buildSendRecordWithIdentity(meta *sqsQueueMeta, in sqsSendMessageInput, id sqsSendIdentity) (*sqsMessageRecord, []byte, error) {
 	body := []byte(in.MessageBody)
 	rec := &sqsMessageRecord{
-		MessageID:              messageID,
+		MessageID:              id.messageID,
 		Body:                   body,
 		MD5OfBody:              sqsMD5Hex(body),
 		MD5OfMessageAttributes: md5OfAttributesHex(in.MessageAttributes),
 		MessageAttributes:      in.MessageAttributes,
-		SendTimestampMillis:    now,
-		AvailableAtMillis:      availableAt,
-		VisibleAtMillis:        availableAt,
-		CurrentReceiptToken:    token,
+		SendTimestampMillis:    id.sendTsMillis,
+		AvailableAtMillis:      id.availableAtMillis,
+		VisibleAtMillis:        id.availableAtMillis,
+		CurrentReceiptToken:    id.token,
 		QueueGeneration:        meta.Generation,
 		MessageGroupId:         in.MessageGroupId,
 		MessageDeduplicationId: in.MessageDeduplicationId,
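The comment on `buildSendRecordWithIdentity` says the keys stay stable for a given generation and follow the generation when `DeleteQueue`/`PurgeQueue` bumps it. A minimal illustration of that property, assuming a made-up key layout (the real key encoders are not shown in this hunk, and `sendIdentity`, `storageKeys`, and `sameKeys` are hypothetical names):

```go
package main

import "fmt"

// sendIdentity is a hypothetical stand-in for sqsSendIdentity (token omitted,
// since the comment above lists only the three key-feeding fields).
type sendIdentity struct {
	messageID         string
	sendTsMillis      int64
	availableAtMillis int64
}

// storageKeys derives the three per-message keys from the queue name, the
// generation read on this attempt, and the fixed identity. The layout is
// invented for illustration; only the inputs mirror the source comment.
func storageKeys(queue string, generation uint64, id sendIdentity) []string {
	return []string{
		fmt.Sprintf("data/%s/%d/%s", queue, generation, id.messageID),
		fmt.Sprintf("vis/%s/%d/%020d/%s", queue, generation, id.availableAtMillis, id.messageID),
		fmt.Sprintf("byage/%s/%d/%020d/%s", queue, generation, id.sendTsMillis, id.messageID),
	}
}

// sameKeys reports whether two key lists are element-wise equal.
func sameKeys(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	id := sendIdentity{messageID: "m1", sendTsMillis: 1000, availableAtMillis: 1000}

	attempt1 := storageKeys("q", 1, id)
	retrySameGen := storageKeys("q", 1, id)    // leader churn, queue untouched
	retryAfterPurge := storageKeys("q", 2, id) // generation bumped between attempts

	fmt.Println("same generation, same keys:", sameKeys(attempt1, retrySameGen))
	fmt.Println("bumped generation, same keys:", sameKeys(attempt1, retryAfterPurge))
}
```

In this model the same-generation retry targets exactly the keys of the first attempt, so the second write is an overwrite; after a generation bump the retry lands under the new generation and the earlier write is left behind under the old one.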
