Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,16 @@ jepsen/.ssh/
.cache/
.golangci-cache/
server

# Admin SPA build outputs. The placeholder internal/admin/dist/index.html
# is committed so `go build` succeeds in a fresh clone (the //go:embed
# directive needs at least one matching file). Everything else under
# dist/ is regenerated by `cd web/admin && npm run build` and must not
# be committed — committed bundles invariably drift from source.
/internal/admin/dist/assets/
/internal/admin/dist/index-*.js
/internal/admin/dist/index-*.css

# Admin SPA source toolchain
/web/admin/node_modules/
/web/admin/.vite/
153 changes: 153 additions & 0 deletions adapter/sqs_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package adapter

import (
"context"
"strconv"
"strings"
"time"

"github.com/cockroachdb/errors"
)

// AdminQueueSummary is the per-queue projection the admin dashboard
// surfaces. It deliberately covers only the fields the SPA renders so
// the package's wire-format types stay internal.
//
// Counters mirror the AWS Approximate* attribute set produced by
// computeApproxCounters; they are best-effort by AWS contract and may
// be reported as a lower bound when the visibility-index scan hits its
// per-call budget (CountersTruncated=true in that case).
type AdminQueueSummary struct {
Name string
IsFIFO bool
Generation uint64
CreatedAt time.Time
Attributes map[string]string
Counters AdminQueueCounters
CountersTruncated bool
}

// AdminQueueCounters mirrors the three Approximate* counters the
// dashboard polls. Visible / NotVisible / Delayed have the same
// definitions as in §16.1 of the SQS design doc.
type AdminQueueCounters struct {
Visible int
NotVisible int
Delayed int
}

// AdminListQueues returns every queue name this server knows about,
// in the lexicographic order the queue catalog index produces. Read
// path; runs on follower or leader and uses the same scanQueueNames
// helper the SigV4 ListQueues handler does.
func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) {
return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context.
}

// AdminDescribeQueue returns a snapshot of name's metadata plus the
// approximate counters. The triple (result, present, error) lets
// admin callers distinguish a missing queue from a storage error
// without sniffing sentinels.
//
// Like AdminDescribeTable on the Dynamo side, this entrypoint runs
// on either the leader or a follower (read-only); the counter scan
// uses a fresh nextTxnReadTS so the result is consistent with what
// SigV4 GetQueueAttributes would have returned at the same instant.
func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) {
if strings.TrimSpace(name) == "" {
return nil, false, ErrAdminSQSValidation
}
readTS := s.nextTxnReadTS(ctx)
meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
if err != nil {
return nil, false, errors.WithStack(err)
}
if !exists {
return nil, false, nil
}
counters, err := s.computeApproxCounters(ctx, name, meta.Generation, readTS)
if err != nil {
return nil, false, err
}
summary := &AdminQueueSummary{
Name: name,
IsFIFO: meta.IsFIFO,
Generation: meta.Generation,
CreatedAt: hlcToTime(meta.CreatedAtHLC),
Attributes: metaAttributesForAdmin(meta),
Counters: AdminQueueCounters{Visible: counters.Visible, NotVisible: counters.NotVisible, Delayed: counters.Delayed},
CountersTruncated: counters.Truncated,
}
return summary, true, nil
}

// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue.
// Returns the same sentinel errors as AdminCreateTable on the Dynamo
// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader
// on a follower, ErrAdminSQSNotFound when the queue is absent.
func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error {
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !isVerifiedSQSLeader(s.coordinator) {
return ErrAdminNotLeader
}
if strings.TrimSpace(name) == "" {
return ErrAdminSQSValidation
}
if err := s.deleteQueueWithRetry(ctx, name); err != nil {
// deleteQueueWithRetry returns sqsAPIError with
// sqsErrQueueDoesNotExist when the queue is missing; map
// to the structured ErrAdminSQSNotFound so the admin
// handler can render 404 without sniffing the AWS code.
if isSQSAdminQueueDoesNotExist(err) {
return ErrAdminSQSNotFound
}
return errors.Wrap(err, "admin delete queue")
}
return nil
}

// metaAttributesForAdmin renders the queue meta into the same shape
// queueMetaToAttributes("All") would, minus the counters (the admin
// summary surfaces them as a typed struct alongside, not as strings).
// Kept as a small dedicated helper so the SigV4 path's selection
// machinery stays untouched.
func metaAttributesForAdmin(meta *sqsQueueMeta) map[string]string {
out := map[string]string{
"VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10),
"MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10),
"DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10),
"ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10),
"MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10),
"FifoQueue": strconv.FormatBool(meta.IsFIFO),
"ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup),
}
if meta.RedrivePolicy != "" {
out["RedrivePolicy"] = meta.RedrivePolicy
}
return out
}

// ErrAdminSQSValidation is returned when an admin entrypoint receives
// a request with a missing or syntactically-bad queue name. Maps to
// 400 in the admin HTTP handler.
var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")

// ErrAdminSQSNotFound is returned by write entrypoints when the
// target queue does not exist. Maps to 404. The describe path uses
// the (nil, false, nil) tuple instead of this sentinel for the
// not-found signal, mirroring AdminDescribeTable.
var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")

// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's
// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise
// it to ErrAdminSQSNotFound. Falls through to false on any unrelated
// error, which AdminDeleteQueue then wraps and propagates.
func isSQSAdminQueueDoesNotExist(err error) bool {
var apiErr *sqsAPIError
if !errors.As(err, &apiErr) || apiErr == nil {
return false
}
return apiErr.errorType == sqsErrQueueDoesNotExist
}
56 changes: 53 additions & 3 deletions adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,8 @@ func (s *SQSServer) getQueueAttributes(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
meta, exists, err := s.loadQueueMetaAt(r.Context(), name, s.nextTxnReadTS(r.Context()))
readTS := s.nextTxnReadTS(r.Context())
meta, exists, err := s.loadQueueMetaAt(r.Context(), name, readTS)
if err != nil {
writeSQSErrorFromErr(w, err)
return
Expand All @@ -787,10 +788,50 @@ func (s *SQSServer) getQueueAttributes(w http.ResponseWriter, r *http.Request) {
return
}
selection := selectedAttributeNames(in.AttributeNames)
attrs := queueMetaToAttributes(meta, selection)
// Compute the approximate counters only when the caller is
// asking for them. AWS lists them on the "All" set so an
// AttributeNames=All caller pays the scan; an explicit-name
// caller only pays it if they listed at least one of the three.
// This keeps GetQueueAttributes O(1) for the common config-only
// callers while still serving operator dashboards that ask for
// the counters on every poll.
var counters *sqsApproxCounters
if approxCountersRequested(selection) {
c, cerr := s.computeApproxCounters(r.Context(), name, meta.Generation, readTS)
if cerr != nil {
writeSQSErrorFromErr(w, cerr)
return
}
counters = &c
}
attrs := queueMetaToAttributes(meta, selection, counters)
writeSQSJSON(w, map[string]any{"Attributes": attrs})
}

// approxCountersRequested reports whether the caller asked for any of
// the three Approximate* counters (or for "All", which implies them).
func approxCountersRequested(sel sqsAttributeSelection) bool {
if sel.expandAll {
return true
}
for _, name := range approxCounterAttributeNames {
if sel.names[name] {
return true
}
}
return false
}

// approxCounterAttributeNames is the set of GetQueueAttributes names
// that trigger the visibility-index scan in computeApproxCounters.
// Kept as a package-level constant slice so approxCountersRequested
// stays cheap and the names are listed in one place.
var approxCounterAttributeNames = [...]string{
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
"ApproximateNumberOfMessagesDelayed",
}

// sqsAttributeSelection is a tri-state result from selectedAttributeNames:
// expandAll = AWS "All" (or any entry equals "All"); a non-nil map lists
// the specific attribute names the caller asked for; and an empty
Expand Down Expand Up @@ -823,7 +864,7 @@ func selectedAttributeNames(req []string) sqsAttributeSelection {
return sqsAttributeSelection{names: selection}
}

func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection) map[string]string {
func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection, counters *sqsApproxCounters) map[string]string {
// No AttributeNames supplied and no "All" → AWS returns nothing.
// The handler still emits "Attributes" as an empty map so the
// response shape is stable.
Expand All @@ -842,6 +883,15 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection)
if meta.RedrivePolicy != "" {
all["RedrivePolicy"] = meta.RedrivePolicy
}
// The three counters are populated only when the caller asked
// for them (or for "All"). When counters is nil the keys stay
// out of `all`, so the explicit-name selection branch below
// will not return zero strings for unrequested counters.
if counters != nil {
all["ApproximateNumberOfMessages"] = strconv.Itoa(counters.Visible)
all["ApproximateNumberOfMessagesNotVisible"] = strconv.Itoa(counters.NotVisible)
all["ApproximateNumberOfMessagesDelayed"] = strconv.Itoa(counters.Delayed)
}
if selection.expandAll {
return all
}
Expand Down
103 changes: 103 additions & 0 deletions adapter/sqs_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"io"
"net/http"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -331,6 +332,108 @@ func TestSQSServer_GetQueueAttributesOmittedReturnsEmpty(t *testing.T) {
}
}

func TestSQSServer_GetQueueAttributesApproxCounters(t *testing.T) {
t.Parallel()
// AWS exposes three Approximate* counters on GetQueueAttributes:
// - ApproximateNumberOfMessages (visible right now)
// - ApproximateNumberOfMessagesNotVisible (in-flight after receive)
// - ApproximateNumberOfMessagesDelayed (sent with DelaySeconds, not yet eligible)
// Pre-Phase-3.A the adapter returned none of them. This test pins
// (a) that they appear under the All selector, (b) that they
// classify the three states correctly, and (c) that they appear
// only when explicitly requested or via All.
nodes, _, _ := createNode(t, 1)
defer shutdown(nodes)
node := sqsLeaderNode(t, nodes)
url := createSQSQueueForTest(t, node, "approx-counters")

// Send 3 ready messages and one delayed message.
for i := range 3 {
_, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{
"QueueUrl": url,
"MessageBody": "ready-" + strconv.Itoa(i),
})
}
_, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{
"QueueUrl": url,
"MessageBody": "delayed",
"DelaySeconds": 60,
})

// Receive 1 → in-flight.
_, _ = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
"QueueUrl": url,
"MaxNumberOfMessages": 1,
"VisibilityTimeout": 60,
})

status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{
"QueueUrl": url,
"AttributeNames": []string{"All"},
})
if status != http.StatusOK {
t.Fatalf("getAttrs All: %d %v", status, out)
}
attrs, _ := out["Attributes"].(map[string]any)
if got := attrs["ApproximateNumberOfMessages"]; got != "2" {
t.Errorf("ApproximateNumberOfMessages = %v, want 2 (3 sent - 1 in-flight)", got)
}
if got := attrs["ApproximateNumberOfMessagesNotVisible"]; got != "1" {
t.Errorf("ApproximateNumberOfMessagesNotVisible = %v, want 1", got)
}
if got := attrs["ApproximateNumberOfMessagesDelayed"]; got != "1" {
t.Errorf("ApproximateNumberOfMessagesDelayed = %v, want 1", got)
}
}

func TestSQSServer_GetQueueAttributesApproxCountersOnlyWhenSelected(t *testing.T) {
t.Parallel()
// The Approximate* counters trigger a visibility-index scan, so
// they must NOT be returned for callers that asked for unrelated
// attributes — both for cost and for AWS-shape parity (an
// explicit-name request only returns the listed names).
nodes, _, _ := createNode(t, 1)
defer shutdown(nodes)
node := sqsLeaderNode(t, nodes)
url := createSQSQueueForTest(t, node, "approx-isolation")
_, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{
"QueueUrl": url,
"MessageBody": "x",
})

// Only VisibilityTimeout: counters must be absent.
status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{
"QueueUrl": url,
"AttributeNames": []string{"VisibilityTimeout"},
})
if status != http.StatusOK {
t.Fatalf("getAttrs VisibilityTimeout: %d %v", status, out)
}
attrs, _ := out["Attributes"].(map[string]any)
if _, present := attrs["ApproximateNumberOfMessages"]; present {
t.Errorf("counters leaked into VisibilityTimeout-only request: %v", attrs)
}
if got := attrs["VisibilityTimeout"]; got == nil {
t.Errorf("VisibilityTimeout missing: %v", attrs)
}

// Only ApproximateNumberOfMessages: counter present, nothing else.
status, out = callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{
"QueueUrl": url,
"AttributeNames": []string{"ApproximateNumberOfMessages"},
})
if status != http.StatusOK {
t.Fatalf("getAttrs ApproximateNumberOfMessages: %d %v", status, out)
}
attrs, _ = out["Attributes"].(map[string]any)
if got := attrs["ApproximateNumberOfMessages"]; got != "1" {
t.Errorf("ApproximateNumberOfMessages = %v, want 1", got)
}
if _, present := attrs["VisibilityTimeout"]; present {
t.Errorf("VisibilityTimeout leaked into counter-only request: %v", attrs)
}
}

func TestSQSServer_SetQueueAttributesRequiresAttributes(t *testing.T) {
t.Parallel()
nodes, _, _ := createNode(t, 1)
Expand Down
Loading
Loading