Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions adapter/sqs_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package adapter

import (
"context"
"strconv"
"strings"
"time"

"github.com/cockroachdb/errors"
)

// AdminQueueSummary is the per-queue projection the admin dashboard
// surfaces. It deliberately covers only the fields the SPA renders so
// the package's wire-format types stay internal.
//
// Counters mirror the AWS Approximate* attribute set produced by
// scanApproxCounters; they are best-effort by AWS contract and stop
// counting once the catalog's per-call cap is reached (the SPA polls
// continuously, so an unbounded scan would pin the leader).
type AdminQueueSummary struct {
Name string
IsFIFO bool
Generation uint64
CreatedAt time.Time
Attributes map[string]string
Counters AdminQueueCounters
}

// AdminQueueCounters matches sqsApproxCounters (int64) so the admin
// bridge does not have to convert between widths. Visible /
// NotVisible / Delayed are the AWS Approximate* triple.
type AdminQueueCounters struct {
Visible int64
NotVisible int64
Delayed int64
}

// AdminListQueues returns every queue name this server knows about,
// in the lexicographic order the queue catalog index produces. Read
// path; runs on follower or leader and uses the same scanQueueNames
// helper the SigV4 ListQueues handler does.
func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) {
return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context.
}

// AdminDescribeQueue returns a snapshot of name's metadata plus the
// approximate counters. The triple (result, present, error) lets
// admin callers distinguish a missing queue from a storage error
// without sniffing sentinels.
//
// Like AdminDescribeTable on the Dynamo side, this entrypoint runs
// on either the leader or a follower (read-only); the counter scan
// uses a fresh nextTxnReadTS so the result is consistent with what
// SigV4 GetQueueAttributes would have returned at the same instant.
func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) {
if strings.TrimSpace(name) == "" {
return nil, false, ErrAdminSQSValidation
Comment on lines +56 to +57
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Validate admin queue names against SQS naming rules

AdminDescribeQueue currently treats any non-blank string as a valid queue name, so malformed names (for example bad*name or overlength names) flow into metadata lookup and are reported as not found instead of a 400 validation error. That contradicts this file’s own ErrAdminSQSValidation contract (“missing or syntactically-bad queue name”) and diverges from the regular SQS path that uses validateQueueName; AdminDeleteQueue has the same gap. Reusing canonical queue-name validation here would preserve consistent client-visible semantics for invalid input.

Useful? React with 👍 / 👎.

}
readTS := s.nextTxnReadTS(ctx)
meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
if err != nil {
return nil, false, errors.WithStack(err)
}
if !exists {
return nil, false, nil
}
counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
if err != nil {
return nil, false, err
}
return adminQueueSummary(name, meta, counters, s.queueArn(name)), true, nil
}

// adminQueueSummary projects a queue meta + counters into the
// SPA-facing AdminQueueSummary. CreatedAt comes from the canonical
// wall-clock CreatedAtMillis (not CreatedAtHLC, which the meta's own
// comment calls "unsuitable for wall-clock display"); a zero millis
// value yields a zero time.Time so the JSON omitempty drops the field
// and the SPA renders "—" instead of an HLC-derived 1970 epoch.
// queueArn is threaded in by the caller (AdminDescribeQueue) because
// the server's region lives on *SQSServer and the helper itself is
// kept method-free for unit-testability without a coordinator.
// Pulled into a helper so the conversion is unit-testable without
// standing up a full coordinator.
func adminQueueSummary(name string, meta *sqsQueueMeta, counters sqsApproxCounters, queueArn string) *AdminQueueSummary {
var createdAt time.Time
if meta.CreatedAtMillis > 0 {
createdAt = time.UnixMilli(meta.CreatedAtMillis).UTC()
}
return &AdminQueueSummary{
Name: name,
IsFIFO: meta.IsFIFO,
Generation: meta.Generation,
CreatedAt: createdAt,
Attributes: metaAttributesForAdmin(meta, queueArn),
Counters: AdminQueueCounters(counters),
}
}

// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue.
// Returns the same sentinel errors as AdminCreateTable on the Dynamo
// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader
// on a follower, ErrAdminSQSNotFound when the queue is absent.
func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error {
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !isVerifiedSQSLeader(s.coordinator) {
return ErrAdminNotLeader
}
if strings.TrimSpace(name) == "" {
return ErrAdminSQSValidation
}
if err := s.deleteQueueWithRetry(ctx, name); err != nil {
// deleteQueueWithRetry returns sqsAPIError with
// sqsErrQueueDoesNotExist when the queue is missing; map
// to the structured ErrAdminSQSNotFound so the admin
// handler can render 404 without sniffing the AWS code.
if isSQSAdminQueueDoesNotExist(err) {
return ErrAdminSQSNotFound
}
return errors.Wrap(err, "admin delete queue")
}
return nil
}

// metaAttributesForAdmin renders the non-counter queue config
// attributes. Mirrors queueMetaToAttributes("All") (sqs_catalog.go)
// except for two deliberate omissions:
//
// - The Approximate* counters — the admin summary surfaces them as
// the typed AdminQueueCounters struct alongside this map, so the
// SPA can render them without round-tripping strings.
// - CreatedTimestamp — surfaced as the typed AdminQueueSummary.CreatedAt
// field for the same reason.
//
// LastModifiedTimestamp stays in the map (SetQueueAttributes updates
// LastModifiedAtMillis and operators need it for change-tracking;
// there is no dedicated typed field for it). QueueArn is included so
// the SPA can show the AWS-shaped identifier without recomputing it
// client-side.
func metaAttributesForAdmin(meta *sqsQueueMeta, queueArn string) map[string]string {
out := map[string]string{
"QueueArn": queueArn,
"VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10),
"MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10),
"DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10),
"ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10),
Comment on lines +143 to +148
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Expose full non-counter queue attributes in admin describe

The helper says it mirrors GetQueueAttributes("All") minus counters, but this map omits non-counter fields that queueMetaToAttributes includes, notably QueueArn, CreatedTimestamp, and LastModifiedTimestamp. That makes admin describe payloads incomplete versus the canonical SQS attribute surface and breaks parity checks between admin and SigV4 outputs.

Useful? React with 👍 / 👎.

"MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10),
"FifoQueue": strconv.FormatBool(meta.IsFIFO),
"ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup),
}
if mod := meta.LastModifiedAtMillis; mod > 0 {
out["LastModifiedTimestamp"] = strconv.FormatInt(mod/sqsMillisPerSecond, 10)
}
if meta.RedrivePolicy != "" {
out["RedrivePolicy"] = meta.RedrivePolicy
}
return out
}

// ErrAdminSQSValidation is returned when an admin entrypoint receives
// a request with a missing or syntactically-bad queue name. Maps to
// 400 in the admin HTTP handler.
var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")

// ErrAdminSQSNotFound is returned by write entrypoints when the
// target queue does not exist. Maps to 404. The describe path uses
// the (nil, false, nil) tuple instead of this sentinel for the
// not-found signal, mirroring AdminDescribeTable.
var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")

// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's
// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise
// it to ErrAdminSQSNotFound. Falls through to false on any unrelated
// error, which AdminDeleteQueue then wraps and propagates.
func isSQSAdminQueueDoesNotExist(err error) bool {
var apiErr *sqsAPIError
if !errors.As(err, &apiErr) || apiErr == nil {
return false
}
return apiErr.errorType == sqsErrQueueDoesNotExist
}
124 changes: 124 additions & 0 deletions adapter/sqs_admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package adapter

import (
"strconv"
"testing"
"time"
)

const testQueueArn = "arn:aws:sqs:us-east-1:000000000000:orders"

// TestAdminQueueSummary_CreatedAtUsesMillisNotHLC pins the
// invariant that the admin AdminDescribeQueue path derives
// CreatedAt from sqsQueueMeta.CreatedAtMillis (the canonical
// wall-clock field), not from hlcToTime(CreatedAtHLC) — the meta
// struct documents HLC as "unsuitable for wall-clock display" and
// the SigV4 path (sqs_catalog.go:942) reads CreatedAtMillis. Two
// failure modes the test pins:
//
// 1. CreatedAtMillis == 0 must yield a zero time.Time so the JSON
// encoder's omitempty drops the field and the SPA renders "—"
// rather than the HLC-derived 1970-01-01T00:00:00Z.
// 2. CreatedAtMillis > 0 must round-trip through time.UnixMilli in
// UTC.
func TestAdminQueueSummary_CreatedAtUsesMillisNotHLC(t *testing.T) {
t.Parallel()

t.Run("zero millis yields zero time even with HLC populated", func(t *testing.T) {
t.Parallel()
meta := sqsQueueMeta{
Name: "orders",
Generation: 1,
CreatedAtHLC: 42 << s3HLCPhysicalShift, // would render as ~1970 epoch via hlcToTime
// CreatedAtMillis intentionally zero
}
summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn)
if !summary.CreatedAt.IsZero() {
t.Fatalf("CreatedAt should be zero when CreatedAtMillis==0; got %v", summary.CreatedAt)
}
})

t.Run("positive millis round-trips via time.UnixMilli UTC", func(t *testing.T) {
t.Parallel()
const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z
meta := sqsQueueMeta{
Name: "orders",
Generation: 2,
CreatedAtMillis: wantMillis,
CreatedAtHLC: 1, // must be ignored
}
summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn)
want := time.UnixMilli(wantMillis).UTC()
if !summary.CreatedAt.Equal(want) {
t.Fatalf("CreatedAt=%v want=%v", summary.CreatedAt, want)
}
if summary.CreatedAt.Location() != time.UTC {
t.Fatalf("CreatedAt location=%v want UTC", summary.CreatedAt.Location())
}
})
}

// TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified pins
// the parity contract between metaAttributesForAdmin and
// queueMetaToAttributes("All"): QueueArn (the AWS-shaped identifier
// the SPA shows for change-tracking) and LastModifiedTimestamp
// (updated on SetQueueAttributes — the only handle operators have
// on "when did somebody last touch this queue's config") must both
// be present.
func TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified(t *testing.T) {
t.Parallel()

t.Run("QueueArn always present", func(t *testing.T) {
t.Parallel()
meta := sqsQueueMeta{Name: "orders", Generation: 1}
attrs := metaAttributesForAdmin(&meta, testQueueArn)
got, ok := attrs["QueueArn"]
if !ok {
t.Fatalf("QueueArn missing from attributes: %v", attrs)
}
if got != testQueueArn {
t.Fatalf("QueueArn=%q want=%q", got, testQueueArn)
}
})

t.Run("LastModifiedTimestamp emitted in unix seconds when populated", func(t *testing.T) {
t.Parallel()
const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z
meta := sqsQueueMeta{
Name: "orders",
Generation: 1,
LastModifiedAtMillis: wantMillis,
}
attrs := metaAttributesForAdmin(&meta, testQueueArn)
got, ok := attrs["LastModifiedTimestamp"]
if !ok {
t.Fatalf("LastModifiedTimestamp missing from attributes: %v", attrs)
}
want := strconv.FormatInt(wantMillis/sqsMillisPerSecond, 10)
if got != want {
t.Fatalf("LastModifiedTimestamp=%q want=%q (unix seconds)", got, want)
}
})

t.Run("LastModifiedTimestamp omitted when zero", func(t *testing.T) {
t.Parallel()
meta := sqsQueueMeta{Name: "orders", Generation: 1}
attrs := metaAttributesForAdmin(&meta, testQueueArn)
if _, ok := attrs["LastModifiedTimestamp"]; ok {
t.Fatalf("LastModifiedTimestamp should be omitted when zero: got %q", attrs["LastModifiedTimestamp"])
}
})

t.Run("CreatedTimestamp deliberately not in map (typed field instead)", func(t *testing.T) {
t.Parallel()
meta := sqsQueueMeta{
Name: "orders",
Generation: 1,
CreatedAtMillis: 1_724_419_200_000,
}
attrs := metaAttributesForAdmin(&meta, testQueueArn)
if _, ok := attrs["CreatedTimestamp"]; ok {
t.Fatalf("CreatedTimestamp must NOT be in attrs (it lives on AdminQueueSummary.CreatedAt): got %q", attrs["CreatedTimestamp"])
}
})
}
Loading
Loading