Skip to content

Commit 76cb788

Browse files
authored
feat(sqs): finish Milestone 1 - purge, batch, tags, redrive, FIFO, retention reaper (#638)
## Summary Closes the remaining `NotImplemented` gaps on the SQS adapter so it can serve real client workloads, not just the catalog and the single-message hot path. - **Queue ops**: PurgeQueue (with AWS's 60s cooldown persisted on the meta record), TagQueue / UntagQueue / ListQueueTags. GetQueueAttributes now returns ApproximateNumberOfMessages / NotVisible / Delayed, QueueArn, CreatedTimestamp, LastModifiedTimestamp. - **Message ops**: SendMessageBatch / DeleteMessageBatch / ChangeMessageVisibilityBatch with per-entry validation. MessageAttributes is fully wired: structured input, AWS-canonical MD5 (length-prefixed sorted encoding + type byte), filtered echo on Receive. - **DLQ redrive**: RedrivePolicy is parsed, persisted, and enforced — receive transactionally moves a message to the DLQ when it would exceed maxReceiveCount. - **FIFO**: per-queue monotonic SequenceNumber, 5-minute dedup window (`!sqs|msg|dedup|`), and group lock (`!sqs|msg|group|`) that keeps the head pinned across visibility expiries — released only on Delete / DLQ redrive / retention. - **Retention reaper**: new `!sqs|msg|byage|` send-age index plus a leader-side goroutine that drops expired records (data + vis + byage + optional group lock) under one OCC dispatch per record. Per-queue budget prevents starvation; followers short-circuit so only the leader emits Dispatches. ## Test plan - [x] `go vet ./...` - [x] `go test -run 'TestSQS' ./adapter/...` - [x] 11 new integration tests covering purge cooldown, batch shape errors, MD5 round-trip, malformed attributes, DLQ redrive, FIFO sequence / dedup / content-based dedup, FIFO group lock pinning across visibility expiry, retention reaper. - [x] `golangci-lint run --config=.golangci.yaml ./adapter/...` clean. ## Out of scope - Long-poll cross-node notifier (still polls every 200ms; design §7.3 considers polling acceptable for Milestone 1). - Operator console UI (`adapter/console.go`). - Jepsen workload (`jepsen/sqs/`). - Query-protocol XML compatibility (JSON-1.0 only).
2 parents 41c7f0f + 25598fe commit 76cb788

13 files changed

Lines changed: 5055 additions & 227 deletions

adapter/sqs.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ const (
5252
// "Common Errors" page of the SQS API reference.
5353
const (
5454
sqsErrInvalidAction = "InvalidAction"
55-
sqsErrNotImplemented = "NotImplemented"
5655
sqsErrInternalFailure = "InternalFailure"
5756
sqsErrServiceUnavailable = "ServiceUnavailable"
5857
sqsErrMalformedRequest = "MalformedQueryString"
@@ -69,6 +68,14 @@ type SQSServer struct {
6968
leaderSQS map[string]string
7069
region string
7170
staticCreds map[string]string
71+
// reaperCtx / reaperCancel drive the retention sweeper goroutine.
72+
// Both are initialized in NewSQSServer (never reassigned) so a
73+
// concurrent Stop() that lands before Run() completes still reads
74+
// a stable cancel func — unlike a Run-time assignment, which the
75+
// race detector flagged because Run and Stop run on different
76+
// goroutines without ordering between them.
77+
reaperCtx context.Context
78+
reaperCancel context.CancelFunc
7279
}
7380

7481
// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
@@ -84,10 +91,13 @@ func WithSQSLeaderMap(m map[string]string) SQSServerOption {
8491
}
8592

8693
func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...SQSServerOption) *SQSServer {
94+
reaperCtx, reaperCancel := context.WithCancel(context.Background())
8795
s := &SQSServer{
88-
listen: listen,
89-
store: st,
90-
coordinator: coordinate,
96+
listen: listen,
97+
store: st,
98+
coordinator: coordinate,
99+
reaperCtx: reaperCtx,
100+
reaperCancel: reaperCancel,
91101
}
92102
s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
93103
sqsCreateQueueTarget: s.createQueue,
@@ -96,17 +106,17 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
96106
sqsGetQueueUrlTarget: s.getQueueUrl,
97107
sqsGetQueueAttributesTarget: s.getQueueAttributes,
98108
sqsSetQueueAttributesTarget: s.setQueueAttributes,
99-
sqsPurgeQueueTarget: s.notImplemented("PurgeQueue"),
109+
sqsPurgeQueueTarget: s.purgeQueue,
100110
sqsSendMessageTarget: s.sendMessage,
101-
sqsSendMessageBatchTarget: s.notImplemented("SendMessageBatch"),
111+
sqsSendMessageBatchTarget: s.sendMessageBatch,
102112
sqsReceiveMessageTarget: s.receiveMessage,
103113
sqsDeleteMessageTarget: s.deleteMessage,
104-
sqsDeleteMessageBatchTarget: s.notImplemented("DeleteMessageBatch"),
114+
sqsDeleteMessageBatchTarget: s.deleteMessageBatch,
105115
sqsChangeMessageVisibilityTarget: s.changeMessageVisibility,
106-
sqsChangeMessageVisibilityBatchTgt: s.notImplemented("ChangeMessageVisibilityBatch"),
107-
sqsTagQueueTarget: s.notImplemented("TagQueue"),
108-
sqsUntagQueueTarget: s.notImplemented("UntagQueue"),
109-
sqsListQueueTagsTarget: s.notImplemented("ListQueueTags"),
116+
sqsChangeMessageVisibilityBatchTgt: s.changeMessageVisibilityBatch,
117+
sqsTagQueueTarget: s.tagQueue,
118+
sqsUntagQueueTarget: s.untagQueue,
119+
sqsListQueueTagsTarget: s.listQueueTags,
110120
}
111121
mux := http.NewServeMux()
112122
mux.HandleFunc("/", s.handle)
@@ -120,13 +130,17 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
120130
}
121131

122132
func (s *SQSServer) Run() error {
133+
s.startReaper(s.reaperCtx)
123134
if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
124135
return errors.WithStack(err)
125136
}
126137
return nil
127138
}
128139

129140
func (s *SQSServer) Stop() {
141+
if s.reaperCancel != nil {
142+
s.reaperCancel()
143+
}
130144
if s.httpServer != nil {
131145
_ = s.httpServer.Shutdown(context.Background())
132146
}
@@ -237,15 +251,6 @@ func sqsLeaderProxyErrorWriter(w http.ResponseWriter, status int, message string
237251
writeSQSError(w, status, sqsErrServiceUnavailable, message)
238252
}
239253

240-
// notImplemented returns a handler that responds with a JSON-protocol
241-
// NotImplemented error so clients get a clean signal while the real handlers
242-
// are still being built out.
243-
func (s *SQSServer) notImplemented(op string) func(http.ResponseWriter, *http.Request) {
244-
return func(w http.ResponseWriter, _ *http.Request) {
245-
writeSQSError(w, http.StatusNotImplemented, sqsErrNotImplemented, op+" is not implemented yet")
246-
}
247-
}
248-
249254
// writeSQSError emits an SQS JSON-protocol error envelope. AWS returns:
250255
//
251256
// { "__type": "<code>", "message": "<text>" }

0 commit comments

Comments
 (0)