Skip to content

Commit ac3e128

Browse files
committed
fix(adapter/sqs): PR #923 round-2 — re-validate batch entries against current meta every attempt
codex P2 round-2: round-1 moved per-entry validation into the first-mint path, so an entry whose identity was minted by a prior UNCOMMITTED attempt (lost a normal OCC race) skipped validation on retry. If a concurrent SetQueueAttributes lowered MaximumMessageSize (or tightened another meta-dependent limit) in the meantime, the retry could commit a message the fresh meta should reject. Split the two concerns: - validateBatchEntry runs against the CURRENT meta on EVERY attempt (body required, MaximumMessageSize, attributes, FIFO params, delay bounds) and returns the resolved delay. - The stable identity (MessageID/token/timestamps incl. AvailableAtMillis) is still minted ONCE and reused, so the keys stay stable across retries (round-1 vis-key stability preserved); AvailableAtMillis is seeded from the first attempt's delay, and re-resolving delay per attempt only re-checks bounds. Caller audit: validateBatchEntry has one caller (buildBatchSendRecord); buildBatchSendRecord has one caller (sendBatchStandardOnce); mintBatchSendIdentity removed (no refs). Single-send/FIFO paths unaffected (buildSendRecord unchanged). Test: TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry — a mid-flight MaximumMessageSize drop must make the retry reject the now-too-big entry.
1 parent 8770f13 commit ac3e128

2 files changed

Lines changed: 74 additions & 31 deletions

File tree

adapter/sqs_batch_send_dedup_test.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func (c *recordingBatchCoordinator) Dispatch(_ context.Context, req *kv.Operatio
3838
return &kv.CoordinateResponse{}, nil
3939
}
4040

41-
func seedStandardQueue(t *testing.T, st store.MVCCStore, name string) {
41+
func seedStandardQueue(t *testing.T, st store.MVCCStore) {
42+
const name = "q"
4243
t.Helper()
4344
meta := &sqsQueueMeta{
4445
Name: name,
@@ -62,7 +63,7 @@ func TestSendMessageBatchStandard_RetryReusesStableKeys(t *testing.T) {
6263
t.Parallel()
6364
ctx := context.Background()
6465
st := store.NewMVCCStore()
65-
seedStandardQueue(t, st, "q")
66+
seedStandardQueue(t, st)
6667

6768
coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
6869
srv := &SQSServer{store: st, coordinator: coord}
@@ -92,7 +93,7 @@ func TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange(t *testing.T) {
9293
t.Parallel()
9394
ctx := context.Background()
9495
st := store.NewMVCCStore()
95-
seedStandardQueue(t, st, "q") // DelaySeconds = 0
96+
seedStandardQueue(t, st) // DelaySeconds = 0
9697

9798
coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
9899
coord.beforeDispatch = func(n int) {
@@ -117,6 +118,41 @@ func TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange(t *testing.T) {
117118
"vis/by-age keys must stay stable across a mid-retry DelaySeconds change (cached AvailableAtMillis)")
118119
}
119120

121+
// TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry pins codex
122+
// P2 round-2: if a prior attempt minted the identity but did NOT commit (lost a
123+
// normal OCC race) and a concurrent SetQueueAttributes then lowered
124+
// MaximumMessageSize below the body, the retry must re-validate against the
125+
// fresh meta and REJECT the entry rather than commit it on the cached identity.
126+
func TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry(t *testing.T) {
127+
t.Parallel()
128+
ctx := context.Background()
129+
st := store.NewMVCCStore()
130+
seedStandardQueue(t, st) // MaximumMessageSize = 262144
131+
132+
coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
133+
coord.beforeDispatch = func(n int) {
134+
if n != 1 {
135+
return
136+
}
137+
// SetQueueAttributes lowers MaximumMessageSize below the body size,
138+
// landing during attempt 1's dispatch (which does not commit — it
139+
// returns a WriteConflict). The retry then loads this lowered meta
140+
// BEFORE re-validating, so the entry must be rejected.
141+
meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 3}
142+
body, err := encodeSQSQueueMeta(meta)
143+
require.NoError(t, err)
144+
require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0))
145+
}
146+
srv := &SQSServer{store: st, coordinator: coord}
147+
148+
entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // 5 bytes > new limit 3
149+
successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
150+
require.NoError(t, err)
151+
require.Empty(t, successful, "the retry must re-validate against the lowered limit and reject the entry")
152+
require.Len(t, failed, 1)
153+
require.Equal(t, "a", failed[0].Id)
154+
}
155+
120156
// TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry pins that
121157
// the MessageIds reported to the client are the ones actually stored on the
122158
// retry (not a discarded first-attempt set), so a consumer cannot observe a
@@ -125,7 +161,7 @@ func TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry(t *testing
125161
t.Parallel()
126162
ctx := context.Background()
127163
st := store.NewMVCCStore()
128-
seedStandardQueue(t, st, "q")
164+
seedStandardQueue(t, st)
129165

130166
coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
131167
srv := &SQSServer{store: st, coordinator: coord}

adapter/sqs_messages_batch.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -407,15 +407,20 @@ func (s *SQSServer) resolveFreshFifoSnapshot(ctx context.Context, queueName stri
407407
// SendMessage would, but returns the *sqsAPIError so the batch path
408408
// can drop the entry into Failed[] instead of failing the whole
409409
// request.
410-
// buildBatchSendRecord builds one standard-queue batch entry's record, reusing
411-
// a stable per-entry identity across retries. id points into the caller's
412-
// identities slice: on the FIRST attempt for this entry (id.messageID == "")
413-
// it validates, resolves the delay, and mints the identity (capturing
414-
// AvailableAtMillis from the delay in effect at that moment); on retries it
415-
// skips validation/resolution and reuses the cached identity, so the storage
416-
// keys are stable even if a concurrent SetQueueAttributes changes the queue's
417-
// DelaySeconds between attempts (codex P2). An entry that already committed on a
418-
// prior attempt is never re-validated against a since-changed meta.
410+
// buildBatchSendRecord builds one standard-queue batch entry's record. It splits
411+
// two concerns that must NOT be conflated:
412+
//
413+
// - Validation against the CURRENT meta runs on EVERY attempt (`validateBatchEntry`).
414+
// A prior, uncommitted attempt may have minted the identity; if a concurrent
415+
// SetQueueAttributes then tightened a limit (e.g. lowered MaximumMessageSize),
416+
// the retry must still reject a now-invalid entry rather than commit it
417+
// (codex P2 round-2).
418+
// - The stable identity (MessageID + token + timestamps incl. AvailableAtMillis)
419+
// is minted ONCE, on the first attempt, and reused. This keeps the storage
420+
// keys constant across retries so a committed-but-conflicted attempt is
421+
// overwritten idempotently, and so a mid-retry DelaySeconds change cannot
422+
// shift the vis key (codex P2 round-1). AvailableAtMillis is captured from
423+
// the first attempt's resolved delay.
419424
func buildBatchSendRecord(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput, id *sqsSendIdentity) (*sqsMessageRecord, []byte, error) {
420425
asSingle := sqsSendMessageInput{
421426
MessageBody: entry.MessageBody,
@@ -424,39 +429,41 @@ func buildBatchSendRecord(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInpu
424429
MessageGroupId: entry.MessageGroupId,
425430
MessageDeduplicationId: entry.MessageDeduplicationId,
426431
}
432+
delay, err := validateBatchEntry(meta, entry, asSingle)
433+
if err != nil {
434+
return nil, nil, err
435+
}
427436
if id.messageID == "" {
428-
// First attempt for this entry: validate, resolve delay, mint identity.
429-
minted, err := mintBatchSendIdentity(meta, entry, asSingle)
430-
if err != nil {
431-
return nil, nil, err
437+
minted, mintErr := newSendIdentity(delay)
438+
if mintErr != nil {
439+
return nil, nil, mintErr
432440
}
433441
*id = minted
434442
}
435443
return buildSendRecordWithIdentity(meta, asSingle, *id)
436444
}
437445

438-
// mintBatchSendIdentity validates one batch entry against the current queue
439-
// meta, resolves its delay, and mints the stable identity. It runs only on the
440-
// first attempt for an entry; retries reuse the cached identity so a
441-
// since-changed meta cannot re-fail an already-committed entry or shift its keys.
442-
func mintBatchSendIdentity(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput, asSingle sqsSendMessageInput) (sqsSendIdentity, error) {
446+
// validateBatchEntry runs every per-attempt check for a standard batch entry
447+
// against the currently-loaded queue meta and returns the resolved delay. It is
448+
// invoked on EVERY attempt (not just the first mint) so a concurrent
449+
// SetQueueAttributes that tightened limits is honored on the retry. The returned
450+
// delay seeds AvailableAtMillis only on the first mint; on retries the cached
451+
// identity's AvailableAtMillis is reused, so re-resolving here never shifts the
452+
// keys — it only re-checks bounds.
453+
func validateBatchEntry(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput, asSingle sqsSendMessageInput) (int64, error) {
443454
if len(entry.MessageBody) == 0 {
444-
return sqsSendIdentity{}, newSQSAPIError(http.StatusBadRequest, sqsErrValidation, "MessageBody is required")
455+
return 0, newSQSAPIError(http.StatusBadRequest, sqsErrValidation, "MessageBody is required")
445456
}
446457
if int64(len(entry.MessageBody)) > meta.MaximumMessageSize {
447-
return sqsSendIdentity{}, newSQSAPIError(http.StatusBadRequest, sqsErrMessageTooLong, "message body exceeds MaximumMessageSize")
458+
return 0, newSQSAPIError(http.StatusBadRequest, sqsErrMessageTooLong, "message body exceeds MaximumMessageSize")
448459
}
449460
if err := validateMessageAttributes(entry.MessageAttributes); err != nil {
450-
return sqsSendIdentity{}, err
461+
return 0, err
451462
}
452463
if err := validateSendFIFOParams(meta, asSingle); err != nil {
453-
return sqsSendIdentity{}, err
454-
}
455-
delay, err := resolveSendDelay(meta, entry.DelaySeconds)
456-
if err != nil {
457-
return sqsSendIdentity{}, err
464+
return 0, err
458465
}
459-
return newSendIdentity(delay)
466+
return resolveSendDelay(meta, entry.DelaySeconds)
460467
}
461468

462469
// ------------------------ DeleteMessageBatch ------------------------

0 commit comments

Comments
 (0)