feat(sqs): finish Milestone 1 - purge, batch, tags, redrive, FIFO, retention reaper #638
Conversation
…tention reaper

Closes the remaining NotImplemented gaps on the SQS adapter so it can serve real client workloads, not just the catalog and single-message hot path.

Catalog / queue ops
- PurgeQueue: bumps generation under OCC retry; AWS 60-second cooldown is persisted on the meta record so it survives leader failover.
- TagQueue / UntagQueue / ListQueueTags: read-modify-write of the Tags map under one OCC retry, capped at 50 tags per queue.
- GetQueueAttributes: returns ApproximateNumberOfMessages / NotVisible / Delayed (single snapshot scan), QueueArn, and CreatedTimestamp / LastModifiedTimestamp.

Message ops
- SendMessageBatch / DeleteMessageBatch / ChangeMessageVisibilityBatch: per-entry validation with per-entry success/failure aggregation; request-shape errors (empty / over 10 / duplicate Ids) keep their AWS error codes.
- MessageAttributes: full AWS canonical MD5 (length-prefixed sorted encoding, type byte; sketched below). Send accepts the structured shape, persists the MD5, and Receive echoes filtered attributes back with a matching MD5OfMessageAttributes.

DLQ redrive
- RedrivePolicy is parsed and persisted; the receive transaction detects ReceiveCount+1 > maxReceiveCount and atomically moves the record to the DLQ (fresh MessageId, reset ReceiveCount, original body) inside one OCC dispatch.

FIFO
- Per-queue monotonic SequenceNumber stored on the record and returned by Send / Receive.
- Dedup window: !sqs|msg|dedup| blocks duplicate sends for 5 minutes; ContentBasedDeduplication=true uses SHA-256(body) as the dedup id.
- Group lock: !sqs|msg|group| pins the in-flight head message; a successor in the same group cannot leapfrog across visibility expiries. Released only on Delete / DLQ redrive / retention expiry, mirroring AWS semantics.

Retention reaper
- New !sqs|msg|byage|<queue><gen><send_ts><msg_id> index, written alongside data + vis on every send.
- Leader-side goroutine wakes every 30 s, scans byage for entries past send_timestamp + retention, and removes (data, vis, byage, optional FIFO group lock) under one OCC dispatch per record.
- Reap is bounded per-queue per-tick to prevent starvation; followers short-circuit so only the leader emits Dispatches.

Tests
- 11 new integration tests under adapter/sqs_extra_test.go cover every new code path (purge cooldown, batch shape errors, MD5 round-trip, malformed attributes, DLQ redrive, FIFO sequence, dedup, content-based dedup, group lock pinning, reaper).
- go vet ./... and go test ./adapter -run TestSQS pass.
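For reference, the canonical attribute digest the MessageAttributes bullet refers to is AWS-documented behavior and small enough to sketch. This is a minimal, self-contained illustration; the type and helper names are invented here, not taken from the PR:

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
	"sort"
	"strings"
)

// msgAttr is a stand-in for the adapter's message-attribute shape.
type msgAttr struct {
	DataType    string // "String", "Number", "Binary", or a custom suffix thereof
	StringValue string
	BinaryValue []byte
}

// appendLenPrefixed writes a 4-byte big-endian length followed by the bytes,
// the framing AWS's canonical encoding uses for every component.
func appendLenPrefixed(buf, b []byte) []byte {
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(b)))
	return append(buf, b...)
}

// md5OfMessageAttributes reproduces the documented AWS digest: attributes
// sorted by name; each contributes its length-prefixed name, length-prefixed
// DataType, a transport byte (1 = string value, 2 = binary value), and the
// length-prefixed value.
func md5OfMessageAttributes(attrs map[string]msgAttr) [16]byte {
	names := make([]string, 0, len(attrs))
	for n := range attrs {
		names = append(names, n)
	}
	sort.Strings(names)

	var buf []byte
	for _, n := range names {
		a := attrs[n]
		buf = appendLenPrefixed(buf, []byte(n))
		buf = appendLenPrefixed(buf, []byte(a.DataType))
		if strings.HasPrefix(a.DataType, "Binary") {
			buf = append(buf, 2)
			buf = appendLenPrefixed(buf, a.BinaryValue)
		} else {
			buf = append(buf, 1)
			buf = appendLenPrefixed(buf, []byte(a.StringValue))
		}
	}
	return md5.Sum(buf)
}

func main() {
	sum := md5OfMessageAttributes(map[string]msgAttr{
		"trace-id": {DataType: "String", StringValue: "abc123"},
	})
	fmt.Printf("MD5OfMessageAttributes: %x\n", sum)
}
```

Receive-side verification recomputes the same digest over the echoed (filtered) attribute set, which is how the commit can return a matching MD5OfMessageAttributes.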
@claude[agent] review
Code review complete for SQS Milestone 1 (5ef9fa0). Overall, this is production-quality work with excellent attention to AWS API fidelity and transactional correctness. Key observations: Strengths:
Minor observations:
Test quality: The integration tests are particularly thorough:
All CI checks passing (lint, tests, Jepsen). Ready to merge. Commit: 5ef9fa0
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5ef9fa0f13
```go
deadline := time.Now().Add(transactRetryMaxDuration)
readTS := initialReadTS
for range transactRetryMaxAttempts {
	resp, retry, err := s.sendFifoMessage(r.Context(), queueName, meta, in, dedupID, delay, readTS)
```
Reload queue metadata on FIFO send retries
sendMessageFifoLoop retries by advancing readTS but keeps passing the original meta snapshot into sendFifoMessage. If the first attempt conflicts with a concurrent PurgeQueue/DeleteQueue (or metadata change), the next attempt can commit using stale generation/attributes because OCC ReadKeys only fence writes after the new StartTS, not against expected values. In practice this can return success while writing under an old generation (message becomes unreachable) or applying stale queue settings; the retry path should re-read queue metadata before retrying.
```go
dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
```
Enforce FIFO dedup/sequence in batch sends
Batch send currently persists each entry only as data/visibility/by-age records, with no dedup record or queue sequence counter mutation. For .fifo queues this bypasses the dedup window and monotonic sequence assignment used by single-message sends, so repeated SendMessageBatch calls with the same dedup IDs can enqueue duplicates and violate FIFO contract expectations. The batch path needs FIFO-specific handling equivalent to sendFifoMessage.
…tch dedup/sequence

Three review-driven fixes on the SQS Milestone 1 PR:

1. **Race in Run/Stop**: NewSQSServer now creates the reaper context up front, so a Stop() that lands before Run() finishes its goroutine reads a stable cancel func instead of racing with Run()'s assignment. The race detector flagged this on every test that exercised Stop() (TestSQSServer_StopShutsDown, every test going through shutdownNode).
2. **FIFO send retry must re-read meta** (Codex P1): on OCC conflict the FIFO send loop only advanced readTS but reused the original meta snapshot, so a concurrent PurgeQueue / DeleteQueue / SetQueueAttributes that won the race could let the retry commit under a stale generation — silently writing a message that is unreachable via routing. The retry now re-reads meta against the new readTS and aborts with QueueDoesNotExist if the queue vanished (sketched below).
3. **FIFO batch dedup + sequence** (Codex P1): SendMessageBatch on FIFO queues used the Standard single-OCC path, which skipped the dedup record write and assigned no per-queue sequence number. A repeated MessageDeduplicationId could enqueue duplicates, and ordering guarantees were lost. FIFO batches now route per-entry through a shared runFifoSendWithRetry helper so dedup + sequence + group lock semantics match single-message FIFO sends. Standard queues keep the single-OCC fast path. SequenceNumber is added to the batch result shape.

Tests
- New TestSQSServer_SendMessageBatchFifoDedupAndSequence pins the dedup-collapse + monotonic-sequence contract on FIFO batch sends.
- go test -race ./adapter/... is now clean.
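A minimal sketch of the retry discipline fix 2 describes, with a hypothetical metaStore interface standing in for the adapter's MVCC reads and OCC dispatch; every name and signature here is illustrative, not the PR's code:

```go
package sketch

import "errors"

var errQueueDoesNotExist = errors.New("QueueDoesNotExist")

type queueMeta struct{ Generation uint64 }

// metaStore stands in for the adapter's MVCC-backed reads and OCC dispatch.
type metaStore interface {
	NextReadTS() uint64
	LoadQueueMetaAt(queue string, readTS uint64) (queueMeta, bool, error)
	// TrySend reports retry=true on an OCC write-write conflict.
	TrySend(queue string, meta queueMeta, readTS uint64) (retry bool, err error)
}

func sendFifoWithRetry(st metaStore, queue string, maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		readTS := st.NextReadTS()
		// Re-read metadata at the same timestamp the dispatch will use, so
		// a PurgeQueue/DeleteQueue that won a previous round is observed.
		meta, ok, err := st.LoadQueueMetaAt(queue, readTS)
		if err != nil {
			return err
		}
		if !ok {
			return errQueueDoesNotExist // queue vanished mid-retry
		}
		retry, err := st.TrySend(queue, meta, readTS)
		if err != nil {
			return err
		}
		if !retry {
			return nil // committed under a coherent (meta, readTS) pair
		}
	}
	return errors.New("OCC retry budget exhausted")
}
```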
Fixes pushed in
@claude[agent] review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f17601f9b9
```go
readTS := s.nextTxnReadTS(ctx)
currentMeta := meta
for range transactRetryMaxAttempts {
	resp, retry, err := s.sendFifoMessage(ctx, queueName, currentMeta, in, dedupID, delay, readTS)
```
Reuse one snapshot for initial FIFO send attempt
runFifoSendWithRetry seeds attempt 1 with a fresh readTS but the caller-provided meta snapshot, so those two can refer to different queue states. If PurgeQueue, DeleteQueue, or SetQueueAttributes commits between the metadata load and this function, the first FIFO send can commit with stale generation/attributes because OCC ReadKeys only guard writes after StartTS. That can acknowledge a send that is written under an old generation and becomes unreachable; initialize attempt 1 from a single consistent (meta, readTS) snapshot or reload metadata at the chosen readTS before dispatch.
```go
prefix := sqsMsgByAgePrefixForQueue(queueName, meta.Generation)
start := bytes.Clone(prefix)
upper := append(bytes.Clone(prefix), encodedU64(uint64MaxZero(cutoff)+1)...)
```
Reap prior generations after PurgeQueue bumps generation
The retention reaper only scans the by-age prefix for meta.Generation, but PurgeQueue advances generation instead of deleting old keys. After a purge, all prior-generation message records become unreachable by normal routing and this reaper path never visits them again, so each purge can permanently leak data/vis/by-age/group-lock state and grow storage unbounded. The reaper (or purge path) needs a strategy to clean older generations.
```go
if rec.ExpiresAtMillis > 0 && time.Now().UnixMilli() > rec.ExpiresAtMillis {
	return nil, key, nil
```
Remove expired FIFO dedup keys when they age out
Expired dedup entries are treated as misses and returned to callers, but the key is left in storage. Because FIFO sends always write !sqs|msg|dedup|... records and no delete path exists for them, workloads with mostly unique dedup IDs accumulate permanently expired dedup rows, causing avoidable long-term storage growth. Delete expired rows on read or include this keyspace in background reaping.
… dedup

Three findings from the second Codex review pass:

1. **FIFO send first attempt mismatch (Codex P1)**: `runFifoSendWithRetry` seeded attempt 1 with a fresh `readTS` but reused the caller's `meta` snapshot, which was loaded at an earlier wall-clock time. A PurgeQueue / DeleteQueue / SetQueueAttributes that committed in between would slip past `ReadKeys` (which only fence writes after `StartTS`), letting the dispatch commit under stale generation / attributes. Every attempt now re-reads meta at the readTS used for the OCC dispatch, so (meta, readTS) is always coherent.
2. **Reaper missed prior generations (Codex P1)**: PurgeQueue and DeleteQueue advance the queue generation rather than walking the keyspace. The retention reaper only scanned the current generation, so every purge would permanently leak data / vis / byage / group-lock state for the messages it left behind. The reaper now scans the byage prefix across every generation; entries on the live generation are gated by retention, entries on prior generations are unconditional orphans. New `parseSqsMsgByAgeKey` decodes the (gen, ts, msg_id) triple from the key (see the key-layout sketch below).
3. **Expired dedup records leaked (Codex P2)**: FIFO sends always write `!sqs|msg|dedup|...` rows, and the send path treats expired ones as misses but never deletes them. Workloads with mostly unique MessageDeduplicationIds were accumulating permanent dedup storage. Added `reapExpiredDedup` to the per-queue reaper sweep: scans the dedup prefix, decodes each value, deletes rows whose ExpiresAtMillis is in the past.

Tests
- New TestSQSServer_RetentionReaperReclaimsPurgedGenerations stamps three records on gen N, calls PurgeQueue, and asserts no prior-generation rows survive a single reaper pass.
- New TestSQSServer_RetentionReaperDropsExpiredFifoDedup backdates a dedup record's ExpiresAtMillis and asserts it is gone after one reaper pass.
- `go test -race ./adapter/...` and `golangci-lint run` both clean.
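To make the cross-generation scan concrete, here is a hedged sketch of a by-age index key and its decoder in the shape the commit describes (`<queue><gen><send_ts><msg_id>` under the `!sqs|msg|byage|` prefix); the real key's separators and field widths are assumptions:

```go
package sketch

import (
	"encoding/binary"
	"fmt"
)

const byAgePrefix = "!sqs|msg|byage|"

// byAgeKey lays out <queue>|<gen><send_ts><msg_id> after the prefix, with
// gen and send_ts as big-endian uint64 so byte order equals numeric order
// and one prefix scan visits every generation, oldest timestamp first.
func byAgeKey(queue string, gen, sendTS uint64, msgID string) []byte {
	k := make([]byte, 0, len(byAgePrefix)+len(queue)+1+16+len(msgID))
	k = append(k, byAgePrefix...)
	k = append(k, queue...)
	k = append(k, '|')
	k = binary.BigEndian.AppendUint64(k, gen)
	k = binary.BigEndian.AppendUint64(k, sendTS)
	k = append(k, msgID...)
	return k
}

// parseByAgeKey recovers the (gen, send_ts, msg_id) triple, the decoding
// role the commit assigns to parseSqsMsgByAgeKey.
func parseByAgeKey(k []byte, queue string) (gen, sendTS uint64, msgID string, err error) {
	fixed := len(byAgePrefix) + len(queue) + 1
	if len(k) < fixed+16 {
		return 0, 0, "", fmt.Errorf("short by-age key: %d bytes", len(k))
	}
	gen = binary.BigEndian.Uint64(k[fixed:])
	sendTS = binary.BigEndian.Uint64(k[fixed+8:])
	msgID = string(k[fixed+16:])
	return gen, sendTS, msgID, nil
}
```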
Pushed
Two new integration tests pin the contracts:
@claude[agent] review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 04593f4420
```go
meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
if err != nil || !exists {
	// Even when meta is gone (DeleteQueue), prior-generation
```
Reap deleted queue data instead of skipping missing metadata
This loop is driven by scanQueueNames, which only enumerates queue meta keys, and then immediately continues when loadQueueMetaAt reports missing metadata; after DeleteQueue removes the meta row, that queue is never visited again. As a result, old data/vis/byage/group/dedup keys for deleted queues are never reaped and can accumulate indefinitely across create/delete cycles.
| "FIFO send requires MessageDeduplicationId or ContentBasedDeduplication=true") | ||
| return | ||
| } | ||
| resp, err := s.runFifoSendWithRetry(r.Context(), queueName, in, dedupID, delay) |
Derive FIFO dedup and delay from each retry snapshot
This call passes dedupID and delay that were computed before entering the retry loop, but runFifoSendWithRetry re-loads queue metadata per attempt. If SetQueueAttributes changes ContentBasedDeduplication or DelaySeconds between the initial read and the chosen retry snapshot, the send can be validated/committed with stale FIFO rules (for example, accepting a request that should fail MissingParameter, or applying the wrong delivery delay).
| if id == "" { | ||
| return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidBatchEntryId, | ||
| "every batch entry requires a non-empty Id") | ||
| } |
Enforce AWS batch entry ID format and length constraints
Batch entry shape validation currently only checks that each Id is non-empty and distinct. AWS requires each batch entry Id to be limited to 80 characters and restricted to alphanumeric, hyphen, and underscore; accepting invalid IDs here causes malformed batch requests to succeed instead of returning InvalidBatchEntryId.
…d grammar

Three findings from the third Codex review pass:

1. **Reaper missed deleted queues entirely (P1)**: `reapAllQueues` pulled the queue list from `scanQueueNames`, which only enumerates live meta keys, then `continue`d when meta was missing. After `DeleteQueue` removes the meta row, that queue was never visited again — leaving its data / vis / byage / dedup / group keys to leak forever. `DeleteQueue` now writes a `!sqs|queue|tombstone|<queue><gen>` marker in the same OCC transaction, and the reaper enumerates those tombstones every tick. Once a tombstone's byage / dedup / group prefixes are all empty, the tombstone itself is deleted so the next tick does not re-walk an empty queue forever.
2. **FIFO retry derived dedup/delay before fresh meta (P2)**: `runFifoSendWithRetry` re-loaded queue metadata per attempt, but the caller computed `dedupID` and `delay` once outside the loop from the original (now-stale) snapshot. If `SetQueueAttributes` flipped `ContentBasedDeduplication` or `DelaySeconds` between the original load and the chosen retry snapshot, the send could have been validated and committed under the previous generation's rules. The retry loop now re-derives both fields from the fresh meta on every attempt, so the (meta, readTS, dedupID, delay) tuple is always coherent.
3. **Batch entry Id grammar (P2)**: AWS limits batch entry Ids to 1-80 chars of `[a-zA-Z0-9_-]`. `validateBatchEntryShape` only checked for non-empty distinct strings; arbitrary user input (whitespace, multi-byte unicode, slashes, oversized strings) passed through into per-entry processing. Now rejected with `InvalidBatchEntryId` at the request-shape stage (see the sketch below).

Tests
- `TestSQSServer_ReaperCleansDeletedQueueOrphans` sends three records, deletes the queue, runs one reaper pass, and asserts both the byage rows and the tombstone are cleaned up.
- `TestSQSServer_SendMessageBatchRejectsInvalidEntryId` exercises every malformed-Id shape AWS rejects.
- `go test -race ./adapter/...` and `golangci-lint run` both clean.
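The Id grammar from fix 3 is compact enough to show whole; a sketch assuming the AWS-documented rule of 1 to 80 characters from [a-zA-Z0-9_-] (the helper name is invented here):

```go
package sketch

import "regexp"

// batchEntryIDRe encodes the AWS grammar the commit cites: 1-80 characters
// drawn from ASCII letters, digits, hyphen, and underscore.
var batchEntryIDRe = regexp.MustCompile(`^[A-Za-z0-9_-]{1,80}$`)

// validBatchEntryID reports whether id passes the request-shape check;
// callers would map a failure to the InvalidBatchEntryId error code.
func validBatchEntryID(id string) bool {
	return batchEntryIDRe.MatchString(id)
}
```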
Pushed
New tests:
@claude[agent] review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f2a94f2260
```go
for _, e := range in.Entries {
	total += len(e.MessageBody)
```
Count message attributes in SendMessageBatch payload limit
This payload guard only sums len(e.MessageBody) and ignores MessageAttributes bytes (especially large BinaryValues). A client can therefore submit a batch with small bodies but huge attributes and bypass the 262144-byte request cap, which defeats the DoS protection this check is meant to provide and allows oversized records to be accepted.
```go
	srcArn string,
	readTS uint64,
) (bool, error) {
	dlqMeta, exists, err := s.loadQueueMetaAt(ctx, policy.DLQName, readTS)
```
Reject self-referential RedrivePolicy targets
Redrive proceeds without checking whether the DLQ name equals the source queue name. When policy.DLQName == srcQueueName, redriveCandidateToDLQ deletes the current record and writes a new one back into the same queue with reset receive counters, so poison messages can loop forever instead of being quarantined to a separate queue.
```go
if int64(len(entry.MessageBody)) > meta.MaximumMessageSize {
	return false, sqsSendMessageBatchResultEntry{}, batchErrorEntryFromAPIErr(entry.Id,
		newSQSAPIError(http.StatusBadRequest, sqsErrMessageTooLong, "message body exceeds MaximumMessageSize"))
}
if err := validateSendFIFOParams(meta, asSingle); err != nil {
	return false, sqsSendMessageBatchResultEntry{}, batchErrorEntryFromAPIErr(entry.Id, err)
```
Use fresh metadata for FIFO batch preflight validation
This preflight validation uses the meta snapshot captured before entry processing, but runFifoSendWithRetry later re-reads queue metadata per attempt. If SetQueueAttributes changes ContentBasedDeduplication or MaximumMessageSize while a batch is being processed, this stale preflight can fail an entry before the fresh-snapshot retry path runs, producing incorrect per-entry failures.
…ht uses fresh meta

Three findings from the fourth Codex review pass:

1. **SendMessageBatch ignored attribute bytes (P1)**: the 256 KiB request cap only summed `len(MessageBody)`, so a client could ship tiny bodies plus a few-MiB BinaryValue per entry and slip past the DoS gate. Total now includes name + DataType + StringValue + BinaryValue across every attribute (see the accounting sketch below).
2. **Self-referential RedrivePolicy (P1)**: a policy whose `deadLetterTargetArn` named the source queue itself would let redrive loop poison messages back into the same queue with reset receive counters. Rejected at attribute-apply time (CreateQueue / SetQueueAttributes) and again at runtime in `redriveCandidateToDLQ` as defense-in-depth.
3. **FIFO batch preflight used stale meta (P2)**: `sendOneFifoBatchEntry` ran size + FIFO-param validation against the meta snapshot captured before per-entry processing, while `runFifoSendWithRetry` re-loaded meta per attempt. A SetQueueAttributes mid-batch could fail an entry that the per-attempt path would accept. All meta-dependent rules (MaximumMessageSize, validateSendFIFOParams, dedup id, delay) now resolve inside the retry loop's per-attempt resolveFreshFifoSnapshot helper. The preflight keeps only the meta-independent shape checks (empty body, MessageAttributes format).

Tests
- TestSQSServer_SendMessageBatchAttributesContributeToSizeCap ships two entries with 200 KiB BinaryValues + tiny bodies and asserts the request is rejected with BatchRequestTooLong.
- TestSQSServer_RedrivePolicyRejectsSelfReference exercises the validator on both CreateQueue and SetQueueAttributes paths.
- `go test -race ./adapter/...` and `golangci-lint run` both clean.
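A self-contained sketch of the accounting fix 1 describes, using stand-in types; the adapter's real field names and error mapping may differ:

```go
package sketch

// Entry and attribute shapes are illustrative stand-ins for the PR's types.
type msgAttr struct {
	DataType    string
	StringValue string
	BinaryValue []byte
}

type batchEntry struct {
	MessageBody       string
	MessageAttributes map[string]msgAttr
}

const sqsMaxRequestBytes = 262144 // 256 KiB, the AWS batch request cap

// batchPayloadBytes counts what AWS counts: every body byte plus, for each
// attribute, its name, DataType, StringValue, and BinaryValue bytes.
func batchPayloadBytes(entries []batchEntry) int {
	total := 0
	for _, e := range entries {
		total += len(e.MessageBody)
		for name, a := range e.MessageAttributes {
			total += len(name) + len(a.DataType) + len(a.StringValue) + len(a.BinaryValue)
		}
	}
	return total
}

// batchTooLong reports whether the request must fail with BatchRequestTooLong.
func batchTooLong(entries []batchEntry) bool {
	return batchPayloadBytes(entries) > sqsMaxRequestBytes
}
```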
Pushed
New tests:
@claude review
Claude finished @bootjp's task in 3m 35s. Review of commit
Claude review on 701a21a noted that reapAllQueues' comment references reapDeletedQueueOrphans, which does not exist; the actual function handling deleted-queue orphans is reapTombstonedQueues. Comment-only change.
Pushed

Thanks for the thorough review. With your "ready to merge" verdict and all CI green, the loop has reached its exit condition (review OK). I'll stop the periodic check now; please merge at your convenience.

@claude review
Claude finished @bootjp's task in 40s. Review of commit
Pushed
New tests:
@claude review
Claude finished @bootjp's task in 2m 54s. Review of commit
Thanks for the thorough re-review. The two observations you flagged (source-message liveness assertion in the FIFO-DLQ rejection test, and the inherent TOCTOU on the batch queue-exists check) were both noted as low-priority / not blocking, so I'll leave them as follow-ups unless you'd prefer them addressed before merge — happy to do either. Please merge at your convenience.
@claude review
Claude finished @bootjp's task in 5m 47s. Review of PR #638 — SQS Milestone 1 (commit
…don't leak

PurgeQueue advanced the generation counter without writing a tombstone for the superseded gen. If DeleteQueue committed before the next reaper tick (~30 s), the pre-purge cohort became permanently unreachable: scanQueueNames sees no meta and skips, while reapTombstonedQueues only finds the post-purge gen's tombstone (DeleteQueue's), and reapDeadByAge filters by that gen, so byage / data / vis / dedup / group rows from the pre-purge gen were never visited.

Fix: write sqsQueueTombstoneKey(queueName, lastGen) in the same OCC OperationGroup that bumps the counter, mirroring deleteQueueWithRetry. The tombstone-driven reaper now sweeps both generations after a Purge then Delete sequence.

Adds TestSQSServer_PurgeThenDeleteOrphansCleaned, which fails on the prior code (3 leaked byage entries) and passes after the fix.
Pushed

Storage orphan on Purge → Delete races (
Regression test (
The two prior low-priority observations (FIFO-DLQ rejection liveness assertion, batch queue-exists TOCTOU) are still left as follow-ups per the previous round's agreement.

@claude review
Claude finished @bootjp's task in 3m 26s. Review of commit
The SqsQueueTombstonePrefix constant comment claimed only DeleteQueue wrote tombstones. As of 31a083f, PurgeQueue also writes one — the pre-bump-generation marker that lets the reaper find pre-purge orphans when DeleteQueue lands before the next reaper tick. Update the comment to record both writers and the invariant each preserves. No behavior change.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 31a083f31d
```go
if dlqMeta.IsFIFO && srcRec.MessageGroupId == "" {
	return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
		"FIFO DLQ requires source records to carry MessageGroupId")
```
Reject FIFO sources targeting Standard DLQs
This guard only rejects Standard→FIFO redrive (dlqMeta.IsFIFO && srcRec.MessageGroupId == ""), so FIFO→Standard policies still pass through. In that case buildDLQRecord copies a non-empty MessageGroupId into a Standard DLQ record, and the receive path later enables FIFO group-lock behavior whenever MessageGroupId is non-empty (tryDeliverCandidate), which can serialize delivery in a queue that should behave as Standard. Add an explicit queue-type compatibility check (or strip FIFO-only fields) before committing the redrive move.
Pushed

With your "Ready to merge" verdict on

@claude review
Claude finished @bootjp's task in 48s. Review of commit
…FIFO

The existing guard in redriveCandidateToDLQ only rejected the Standard-source -> FIFO-DLQ direction. The symmetric mismatch (FIFO source -> Standard DLQ) still committed: buildDLQRecord copies srcRec.MessageGroupId verbatim, and tryDeliverCandidate takes its FIFO group-lock path whenever MessageGroupId != "", serializing delivery in a queue that should behave as Standard.

Fix: load srcMeta in addition to dlqMeta and require srcMeta.IsFIFO == dlqMeta.IsFIFO before building the OCC dispatch. This is the runtime defense AWS relies on: the catalog accepts a RedrivePolicy that names a queue created or recreated as a different type later, so attribute-time validation cannot catch every case. The existing FIFO-DLQ + empty-MessageGroupId guard is kept as a defense-in-depth rail for malformed FIFO->FIFO records that slip past the type-equality check (see the sketch below).

Validation logic moved to a validateRedriveTargets helper so redriveCandidateToDLQ stays under the cyclop=10 ceiling per project rules, with no //nolint.

Adds TestSQSServer_RedrivePolicyStandardDlqRejectsFifoSource, which fails on the prior code (DLQ ends up with a record carrying MessageGroupId:g1) and passes after the fix.
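Putting this round's rule together with the earlier self-reference rejection, the validation rails can be sketched as below; the types, error values, and function shape are illustrative, not the PR's actual validateRedriveTargets:

```go
package sketch

import "errors"

type queueMeta struct {
	Name   string
	IsFIFO bool
}

var (
	errSelfTarget   = errors.New("InvalidAttributeValue: RedrivePolicy cannot target the source queue")
	errTypeMismatch = errors.New("InvalidAttributeValue: source and DLQ queue types must match")
	errMissingGroup = errors.New("InvalidAttributeValue: FIFO DLQ requires MessageGroupId on the source record")
)

// validateRedriveTargets applies the runtime rails in order: no self-loop,
// same queue flavor on both ends (rejects both mismatch directions), and a
// MessageGroupId whenever the DLQ is FIFO (defense-in-depth).
func validateRedriveTargets(src, dlq queueMeta, msgGroupID string) error {
	if src.Name == dlq.Name {
		return errSelfTarget
	}
	if src.IsFIFO != dlq.IsFIFO {
		return errTypeMismatch
	}
	if dlq.IsFIFO && msgGroupID == "" {
		return errMissingGroup
	}
	return nil
}
```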
Pushed

FIFO -> Standard redrive direction now rejected (
Regression test (
@claude review
Claude finished @bootjp's task in 2m 14s. Review of commit
Thanks for the thorough re-review. The pre-existing minor gap you noted (source-message liveness assertion in the redrive rejection tests) remains as a low-priority follow-up per the earlier round's agreement.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cdb3c87db8
```go
{Op: kv.Del, Key: srcDataKey},
{Op: kv.Del, Key: srcByAgeKey},
{Op: kv.Put, Key: dlqDataKey, Value: dlqRecordBytes},
{Op: kv.Put, Key: dlqVisKey, Value: []byte(dlqRec.MessageID)},
{Op: kv.Put, Key: dlqByAgeKey, Value: []byte(dlqRec.MessageID)},
```
Advance FIFO sequence when moving messages to a DLQ
This redrive transaction writes FIFO DLQ message records directly but never updates the DLQ's sqsQueueSeqKey, so redriven FIFO messages keep a zero SequenceNumber (from buildDLQRecord) and later FIFO sends continue from a stale counter. In a FIFO DLQ this breaks the expected monotonic sequence behavior and causes receives to return messages without a valid SequenceNumber attribute. The redrive path should reserve and persist the next sequence in the same OCC transaction, just like sendFifoMessage.
```go
case sqsAttributeBaseTypeBinary:
	if len(v.BinaryValue) == 0 {
		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			"MessageAttribute "+name+" requires BinaryValue")
```
Reject Binary attributes that also include StringValue
Validation for Binary attributes only checks that BinaryValue is present and does not reject an accompanying StringValue, so malformed attributes with both fields are currently accepted. That violates the documented “exactly one value field” shape and can produce ambiguous behavior because hashing/serialization use the binary payload while silently ignoring the string payload. Return InvalidAttributeValue when a Binary attribute includes StringValue.
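A sketch of the "exactly one value field" rule the finding asks for (and a later commit implements as validateMessageAttributeValuePair), assuming the three documented base types; names and error plumbing are illustrative:

```go
package sketch

import "errors"

var errInvalidAttributeValue = errors.New("InvalidAttributeValue")

// validateValuePair enforces that each attribute populates exactly one of
// StringValue / BinaryValue, matching its declared base type.
func validateValuePair(baseType, stringValue string, binaryValue []byte) error {
	switch baseType {
	case "Binary":
		if len(binaryValue) == 0 || stringValue != "" {
			return errInvalidAttributeValue // Binary must carry only BinaryValue
		}
	case "String", "Number":
		if stringValue == "" || len(binaryValue) > 0 {
			return errInvalidAttributeValue // String/Number must carry only StringValue
		}
	default:
		return errInvalidAttributeValue // unknown base type
	}
	return nil
}
```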
@claude review
Claude finished @bootjp's task in 3m 15s. Review of PR #638 — SQS Milestone 1 (commit
| Severity | Location | Finding |
|---|---|---|
| P1 | `sqs_redrive.go:265-272`, `sqs_redrive.go:213` | FIFO→FIFO redrive: SequenceNumber=0 and DLQ seq counter never bumped |
| P2 | `sqs_messages.go:1523-1527` | Binary attribute with non-empty StringValue not rejected |
The P1 is a FIFO-specific defect that became reachable only after cdb3c87 enabled FIFO→FIFO redrive. The P2 is a straightforward validator gap. Holding "Ready to merge" pending these two fixes.
…ute values
Two findings from the latest Codex review on `cdb3c87` that the
prior "Ready to merge" verdicts had not addressed.
P1 — FIFO->FIFO redrive committed `SequenceNumber=0` and never
advanced the DLQ's sqsQueueSeqKey (`sqs_redrive.go:265-272, 213`).
Once `cdb3c87` enabled FIFO->FIFO redrives by enforcing type
equality, the SequenceNumber path became reachable and showed:
- the DLQ record carried the zero-value `SequenceNumber` (AWS
surfaces this verbatim; AWS sequences start at 1, never 0),
- the DLQ's per-queue sequence counter was never incremented, so a
subsequent normal FIFO send to the DLQ assigned a number lower
than the redriven message — non-monotonic to consumers, breaking
the FIFO contract,
- two concurrent FIFO->FIFO redrives both wrote `SequenceNumber=0`
with no OCC conflict because `sqsQueueSeqKey(policy.DLQName)` was
not on `ReadKeys`.
Fix: when `dlqMeta.IsFIFO`, `redriveCandidateToDLQ` now loads the
DLQ's seq counter at `readTS`, computes `nextSeq = prev+1`, threads
the value through `buildDLQRecord` (where it is stamped onto the
record before encoding) and `buildRedriveOps` (where the seq Put is
added to `Elems` and `sqsQueueSeqKey(policy.DLQName)` is added to
`ReadKeys`). Standard DLQs are unchanged: `dlqSeq=0` flows through
unused.
P2 — `validateOneMessageAttribute` accepted Binary attributes that
also carried a non-empty `StringValue` (`sqs_messages.go:1523-1527`).
AWS requires each MessageAttributeValue to populate exactly one of
{StringValue, BinaryValue}; submitting both is an
`InvalidParameterValue`. The previous validator only checked the
type-required field was non-empty and silently let the foreign
field through, where it would be persisted into the record and
round-tripped on ReceiveMessage with mismatched MD5 hashes.
Fix: extracted the type-specific value-pair check into
`validateMessageAttributeValuePair` and added the symmetric guards
— Binary rejects `StringValue != ""`, String/Number rejects
`len(BinaryValue) > 0`. The split also keeps
`validateOneMessageAttribute` under the cyclop budget.
Tests:
- `TestSQSServer_FifoFifoRedriveAssignsSequenceNumber`
(`sqs_extra_test.go`): create FIFO source + FIFO DLQ, send poison,
redrive on the second receive, assert the DLQ message's
SequenceNumber > 0, then send a fresh FIFO message to the DLQ and
assert its SequenceNumber == redrivenSeq + 1 (monotonic
continuation, the invariant the fix guarantees).
- `TestSQSServer_SendMessageRejectsBinaryWithStringValue`
(`sqs_extra_test.go`): table-driven over the three illegal
combinations (Binary + non-empty StringValue, String + non-empty
BinaryValue, Number + non-empty BinaryValue); each must return
HTTP 400 / InvalidAttributeValue.
Per CLAUDE.md self-review:
1. Data loss — None. The poison message body / attributes flow
unchanged; only the SequenceNumber field and DLQ counter change.
2. Concurrency — `sqsQueueSeqKey(policy.DLQName)` is now on
ReadKeys, so two concurrent FIFO->FIFO redrives serialise on
write-write-conflict, same as concurrent FIFO sends.
3. Performance — One extra `loadFifoSequence` per FIFO redrive;
negligible vs. the existing `validateRedriveTargets` reads.
4. Data consistency — DLQ SequenceNumber now monotonic across both
redrive and normal-send paths; no two messages can share a
sequence number on the same FIFO queue.
5. Test coverage — Both fixes ship with co-located regression
tests; the existing FIFO-related tests
(`TestSQSServer_FifoSequenceNumberMonotonic`,
`TestSQSServer_DLQRedriveOnMaxReceiveCount`,
`TestSQSServer_RedrivePolicy*`) remain green.
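A minimal sketch of the sequence reservation the P1 fix threads through the redrive, with simplified stand-ins for the adapter's kv types; it shows the two invariants the commit names: the DLQ record carries prev+1, and the counter key is on the OCC fence:

```go
package sketch

import "encoding/binary"

// op and dispatch are simplified stand-ins for the adapter's kv package.
type op struct {
	Key   []byte
	Value []byte
}

type dispatch struct {
	ReadKeys [][]byte // OCC fence: conflict if any of these moved past readTS
	Elems    []op
}

// redriveOpsFIFO reserves the DLQ's next sequence number in the same
// dispatch that lands the redriven record. prevSeq is the counter loaded
// at readTS; record must already be encoded with SequenceNumber = prevSeq+1.
func redriveOpsFIFO(seqKey, dlqDataKey, record []byte, prevSeq uint64) dispatch {
	nextSeq := prevSeq + 1 // AWS sequences start at 1, never 0
	return dispatch{
		// Fencing the counter key serializes concurrent redrives and sends.
		ReadKeys: [][]byte{seqKey},
		Elems: []op{
			{Key: dlqDataKey, Value: record},
			{Key: seqKey, Value: binary.BigEndian.AppendUint64(nil, nextSeq)},
		},
	}
}
```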
Round-N: both Codex findings (P1 + P2) addressed in 0bd04e9, with two regression tests attached.
Tests
Self-review (CLAUDE.md 5 lenses)
@claude review
Claude finished @bootjp's task in 2m 22s. Review of commit
| Severity | Location | Fix | Status |
|---|---|---|---|
| P1 | `sqs_redrive.go:139-145, 311-317` | FIFO→FIFO redrive assigns SequenceNumber = prevSeq+1, updates DLQ seq counter, adds OCC fence | ✓ Resolved |
| P2 | `sqs_messages.go:1528-1553` | `validateMessageAttributeValuePair` enforces "exactly one value field" for all attribute types | ✓ Resolved |
No new concerns. OCC discipline is consistent, seq monotonicity is preserved across both write paths, and regression tests pin both invariants precisely.
Ready to merge. All P1/P2 findings across every review round are resolved.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 25598fe6b1
```go
if err := s.deleteMessageWithRetry(r.Context(), queueName, handle); err != nil {
	failed = append(failed, batchErrorEntryFromErr(entry.Id, err))
	continue
```
Return request-level QueueDoesNotExist on batch races
This loop still converts QueueDoesNotExist from per-entry execution into Failed[] and returns HTTP 200, so if the queue is deleted after the upfront requireQueueExists check, callers get a partial-success envelope for a queue-level failure. Fresh evidence is that the new precheck at lines 447/518 does not prevent the later per-entry path here (and in changeMessageVisibilityBatch) from misclassifying a post-check queue deletion.
```go
prefix := sqsMsgByAgePrefixAllGenerations(queueName)
upper := prefixScanEnd(prefix)
start := bytes.Clone(prefix)

processed := 0
```
Preserve retention reaping for pre-indexed message records
The reaper enumerates only the !sqs|msg|byage|... index, so records created before this commit (which did not write by-age keys) are never discovered by retention or tombstone cleanup. In upgraded clusters, those legacy messages can outlive MessageRetentionPeriod and leave orphaned data/vis rows after purge/delete operations, causing correctness and storage-leak issues until a full migration/backfill path exists.
## Summary

Phase **3.B** of [`docs/design/2026_04_24_partial_sqs_compatible_adapter.md`](https://github.com/bootjp/elastickv/blob/main/docs/design/2026_04_24_partial_sqs_compatible_adapter.md) §16.4. Adds the AWS SQS **query protocol** (form-encoded request, XML response) alongside the existing JSON protocol on the same listener — older `aws-sdk-java` v1, `boto < 1.34`, and AWS CLI clients can now talk to elastickv without modification. Detection happens per-request via `Content-Type` / `X-Amz-Target` / `Action` presence; no flag, no separate port. See the new proposal doc committed alongside.

This is an **architectural proof PR**: dispatch + decoding + encoding + error envelope + the `*Core` refactor pattern are all in place, with **three verbs** wired end-to-end as concrete proof. Each follow-up verb is a parser + response struct + one switch arm — no further design work needed.

### Verbs in this PR

| Verb | Why it's in the proof set |
|---|---|
| `CreateQueue` | Exercises the `Attribute.N.{Name,Value}` indexed-collection parser. |
| `ListQueues` | Exercises the repeated-element XML response shape. |
| `GetQueueUrl` | Exercises the `<ErrorResponse>` envelope path via `QueueDoesNotExist`. |

Every other verb returns a structured **501 `NotImplementedYet`** XML envelope so operators see the gap explicitly. `SendMessage` / `ReceiveMessage` / `DeleteMessage` are the highest-priority follow-ups (they need the same `*Core` refactor on the FIFO send loop).

### Key design points

- **No new listener / no flag.** `pickSqsProtocol(*http.Request)` decides per request. JSON and Query share the SQS port and the SigV4 path.
- **Wire-format-free cores.** `createQueue` / `listQueues` / `getQueueUrl` are now `decode → core → encode` with `core(ctx, in) (out, error)`. The JSON wrappers are unchanged in behavior; existing JSON tests pass without modification.
- **DoS protection inherited.** Body read is bounded by the same `sqsMaxRequestBodyBytes` the JSON path uses.
- **SigV4 unchanged.** The signed canonical request includes the form-encoded body, so the existing SigV4 middleware verifies query requests without code changes.
- **Error parity.** `<Code>` reuses the existing `sqsErr*` constants. HTTP status mirrors what the JSON path returns, so SDK retry classifiers work across protocols.
- **Cyclomatic budget honoured.** `handle()` was refactored to extract `handleQueryProtocol` — `cyclop ≤ 10` per project rules, no `//nolint`.

### Known limitation (design §11.4)

`proxyToLeader`'s error writer always emits the JSON envelope, so a query-protocol client hitting a follower during a leader flip sees one JSON error before retry lands on the new leader. Follow-up PR threads the detected protocol onto the request context so the proxy emits matching XML.

## Test plan

- [x] `go build ./...` — clean
- [x] `go test -count=1 -race -run "TestSQS|QueryProtocol|TestPickSqs|TestCollectIndexedKV|TestWriteSQSQueryError" ./adapter/` — passes
- [x] `golangci-lint run ./adapter/...` — `0 issues.`
- [x] `pickSqsProtocol` table tests cover documented edge cases (header precedence, charset suffix, GET with Action, missing Action, nil request).
- [x] `collectIndexedKVPairs` tests cover happy path, orphan Name, empty input, unrelated prefix.
- [x] End-to-end via the in-process listener: CreateQueue / ListQueues / GetQueueUrl round-trip on the query side.
- [x] **Cross-protocol parity**: a queue created via Query is visible via JSON `GetQueueUrl` with the same URL.
- [x] Error envelope: 4xx maps to `<Type>Sender</Type>`, 5xx to `<Type>Receiver</Type>`, namespace pinned, `x-amzn-ErrorType` header set.
- [x] Unknown verb returns 501 with the `NotImplementedYet` XML envelope.
- [x] Missing `Action` parameter returns 400 (per design §3).

## Self-review (5 lenses)

1. **Data loss** — Wire-format change only. Cores are byte-for-byte identical to the previous handler bodies; no Raft / OCC / MVCC code is touched.
2. **Concurrency** — No new shared state. Detection is request-local. Body parsing is bounded.
3. **Performance** — One additional `Content-Type` string compare per request on the dispatch hot path. Negligible.
4. **Data consistency** — `*Core` returns the same business-logic outputs as before; the JSON tests are the regression net for parity. Cross-protocol parity test pins behaviour.
5. **Test coverage** — 10 new test cases cover detection, parsing, envelope shape, and three end-to-end verbs. Existing `TestSQS*` race suite passes on the refactor.

## Stacking

This PR is **independent** — branched from current `main` (which has #638 + #649 merged). It does not depend on PR #650 / PR #659. Merge whenever ready.
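As an illustration of the per-request detection described above, a hedged sketch follows; the precedence and exact signals are assumptions, not the PR's verified pickSqsProtocol logic:

```go
package sketch

import (
	"net/http"
	"strings"
)

type sqsProtocol int

const (
	protoJSON sqsProtocol = iota
	protoQuery
)

// pickSqsProtocol mirrors the detection signals the PR lists: X-Amz-Target,
// Content-Type, and Action presence. The ordering here is an assumption.
func pickSqsProtocol(r *http.Request) sqsProtocol {
	if r == nil {
		return protoJSON
	}
	if r.Header.Get("X-Amz-Target") != "" {
		return protoJSON // the JSON protocol always carries the target header
	}
	ct, _, _ := strings.Cut(r.Header.Get("Content-Type"), ";") // drop "; charset=..."
	switch strings.TrimSpace(ct) {
	case "application/x-amz-json-1.0":
		return protoJSON
	case "application/x-www-form-urlencoded":
		return protoQuery
	}
	if r.URL != nil && r.URL.Query().Get("Action") != "" {
		return protoQuery // GET-style query request
	}
	return protoJSON
}
```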
Summary
Closes the remaining `NotImplemented` gaps on the SQS adapter so it can serve real client workloads, not just the catalog and the single-message hot path.

- FIFO: dedup window (`!sqs|msg|dedup|`) and group lock (`!sqs|msg|group|`) that keeps the head pinned across visibility expiries — released only on Delete / DLQ redrive / retention.
- Retention reaper: `!sqs|msg|byage|` send-age index plus a leader-side goroutine that drops expired records (data + vis + byage + optional group lock) under one OCC dispatch per record. Per-queue budget prevents starvation; followers short-circuit so only the leader emits Dispatches.

Test plan

- `go vet ./...`
- `go test -run 'TestSQS' ./adapter/...`
- `golangci-lint run --config=.golangci.yaml ./adapter/...` clean.

Out of scope

- (`adapter/console.go`).
- (`jepsen/sqs/`).
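As a closing illustration of the FIFO dedup rule in the summary, a small sketch; the SHA-256(body) rule and the 5-minute window come from the text above, while the hex encoding and helper shape are assumptions:

```go
package sketch

import (
	"crypto/sha256"
	"encoding/hex"
	"time"
)

// dedupWindow mirrors the 5-minute AWS dedup window the summary cites.
const dedupWindow = 5 * time.Minute

// dedupID picks the FIFO dedup id: an explicit MessageDeduplicationId wins;
// otherwise, with ContentBasedDeduplication=true, SHA-256 of the body is
// used (hex encoding here is an assumption). ok=false means the send must
// be rejected because no dedup id can be derived.
func dedupID(explicit string, contentBased bool, body []byte) (id string, ok bool) {
	if explicit != "" {
		return explicit, true
	}
	if contentBased {
		sum := sha256.Sum256(body)
		return hex.EncodeToString(sum[:]), true
	}
	return "", false
}
```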