diff --git a/adapter/sqs_admin.go b/adapter/sqs_admin.go
new file mode 100644
index 000000000..0788ff31a
--- /dev/null
+++ b/adapter/sqs_admin.go
@@ -0,0 +1,183 @@
+package adapter
+
+import (
+ "context"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/cockroachdb/errors"
+)
+
+// AdminQueueSummary is the per-queue projection the admin dashboard
+// surfaces. It deliberately covers only the fields the SPA renders so
+// the package's wire-format types stay internal.
+//
+// Counters mirror the AWS Approximate* attribute set produced by
+// scanApproxCounters; they are best-effort by AWS contract and stop
+// counting once the catalog's per-call cap is reached (the SPA polls
+// continuously, so an unbounded scan would pin the leader).
+type AdminQueueSummary struct {
+ Name string
+ IsFIFO bool
+ Generation uint64
+ CreatedAt time.Time
+ Attributes map[string]string
+ Counters AdminQueueCounters
+}
+
+// AdminQueueCounters matches sqsApproxCounters (int64) so the admin
+// bridge does not have to convert between widths. Visible /
+// NotVisible / Delayed are the AWS Approximate* triple.
+type AdminQueueCounters struct {
+ Visible int64
+ NotVisible int64
+ Delayed int64
+}
+
+// AdminListQueues returns every queue name this server knows about,
+// in the lexicographic order the queue catalog index produces. Read
+// path; runs on follower or leader and uses the same scanQueueNames
+// helper the SigV4 ListQueues handler does.
+func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) {
+ return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context.
+}
+
+// AdminDescribeQueue returns a snapshot of name's metadata plus the
+// approximate counters. The triple (result, present, error) lets
+// admin callers distinguish a missing queue from a storage error
+// without sniffing sentinels.
+//
+// Like AdminDescribeTable on the Dynamo side, this entrypoint runs
+// on either the leader or a follower (read-only); the counter scan
+// uses a fresh nextTxnReadTS so the result is consistent with what
+// SigV4 GetQueueAttributes would have returned at the same instant.
+func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) {
+ if strings.TrimSpace(name) == "" {
+ return nil, false, ErrAdminSQSValidation
+ }
+ readTS := s.nextTxnReadTS(ctx)
+ meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
+ if err != nil {
+ return nil, false, errors.WithStack(err)
+ }
+ if !exists {
+ return nil, false, nil
+ }
+ counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
+ if err != nil {
+ return nil, false, err
+ }
+ return adminQueueSummary(name, meta, counters, s.queueArn(name)), true, nil
+}
+
+// adminQueueSummary projects a queue meta + counters into the
+// SPA-facing AdminQueueSummary. CreatedAt comes from the canonical
+// wall-clock CreatedAtMillis (not CreatedAtHLC, which the meta's own
+// comment calls "unsuitable for wall-clock display"); a zero millis
+// value yields a zero time.Time so the JSON omitempty drops the field
+// and the SPA renders "—" instead of an HLC-derived 1970 epoch.
+// queueArn is threaded in by the caller (AdminDescribeQueue) because
+// the server's region lives on *SQSServer and the helper itself is
+// kept method-free for unit-testability without a coordinator.
+// Pulled into a helper so the conversion is unit-testable without
+// standing up a full coordinator.
+func adminQueueSummary(name string, meta *sqsQueueMeta, counters sqsApproxCounters, queueArn string) *AdminQueueSummary {
+ var createdAt time.Time
+ if meta.CreatedAtMillis > 0 {
+ createdAt = time.UnixMilli(meta.CreatedAtMillis).UTC()
+ }
+ return &AdminQueueSummary{
+ Name: name,
+ IsFIFO: meta.IsFIFO,
+ Generation: meta.Generation,
+ CreatedAt: createdAt,
+ Attributes: metaAttributesForAdmin(meta, queueArn),
+ Counters: AdminQueueCounters(counters),
+ }
+}
+
+// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue.
+// Returns the same sentinel errors as AdminCreateTable on the Dynamo
+// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader
+// on a follower, ErrAdminSQSNotFound when the queue is absent.
+func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error {
+ if !principal.Role.canWrite() {
+ return ErrAdminForbidden
+ }
+ if !isVerifiedSQSLeader(s.coordinator) {
+ return ErrAdminNotLeader
+ }
+ if strings.TrimSpace(name) == "" {
+ return ErrAdminSQSValidation
+ }
+ if err := s.deleteQueueWithRetry(ctx, name); err != nil {
+ // deleteQueueWithRetry returns sqsAPIError with
+ // sqsErrQueueDoesNotExist when the queue is missing; map
+ // to the structured ErrAdminSQSNotFound so the admin
+ // handler can render 404 without sniffing the AWS code.
+ if isSQSAdminQueueDoesNotExist(err) {
+ return ErrAdminSQSNotFound
+ }
+ return errors.Wrap(err, "admin delete queue")
+ }
+ return nil
+}
+
+// metaAttributesForAdmin renders the non-counter queue config
+// attributes. Mirrors queueMetaToAttributes("All") (sqs_catalog.go)
+// except for two deliberate omissions:
+//
+// - The Approximate* counters — the admin summary surfaces them as
+// the typed AdminQueueCounters struct alongside this map, so the
+// SPA can render them without round-tripping strings.
+// - CreatedTimestamp — surfaced as the typed AdminQueueSummary.CreatedAt
+// field for the same reason.
+//
+// LastModifiedTimestamp stays in the map (SetQueueAttributes updates
+// LastModifiedAtMillis and operators need it for change-tracking;
+// there is no dedicated typed field for it). QueueArn is included so
+// the SPA can show the AWS-shaped identifier without recomputing it
+// client-side.
+func metaAttributesForAdmin(meta *sqsQueueMeta, queueArn string) map[string]string {
+ out := map[string]string{
+ "QueueArn": queueArn,
+ "VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10),
+ "MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10),
+ "DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10),
+ "ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10),
+ "MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10),
+ "FifoQueue": strconv.FormatBool(meta.IsFIFO),
+ "ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup),
+ }
+ if mod := meta.LastModifiedAtMillis; mod > 0 {
+ out["LastModifiedTimestamp"] = strconv.FormatInt(mod/sqsMillisPerSecond, 10)
+ }
+ if meta.RedrivePolicy != "" {
+ out["RedrivePolicy"] = meta.RedrivePolicy
+ }
+ return out
+}
+
+// ErrAdminSQSValidation is returned when an admin entrypoint receives
+// a request with a missing or syntactically-bad queue name. Maps to
+// 400 in the admin HTTP handler.
+var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")
+
+// ErrAdminSQSNotFound is returned by write entrypoints when the
+// target queue does not exist. Maps to 404. The describe path uses
+// the (nil, false, nil) tuple instead of this sentinel for the
+// not-found signal, mirroring AdminDescribeTable.
+var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")
+
+// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's
+// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise
+// it to ErrAdminSQSNotFound. Falls through to false on any unrelated
+// error, which AdminDeleteQueue then wraps and propagates.
+func isSQSAdminQueueDoesNotExist(err error) bool {
+ var apiErr *sqsAPIError
+ if !errors.As(err, &apiErr) || apiErr == nil {
+ return false
+ }
+ return apiErr.errorType == sqsErrQueueDoesNotExist
+}
diff --git a/adapter/sqs_admin_test.go b/adapter/sqs_admin_test.go
new file mode 100644
index 000000000..c91bf4f53
--- /dev/null
+++ b/adapter/sqs_admin_test.go
@@ -0,0 +1,124 @@
+package adapter
+
+import (
+ "strconv"
+ "testing"
+ "time"
+)
+
+const testQueueArn = "arn:aws:sqs:us-east-1:000000000000:orders"
+
+// TestAdminQueueSummary_CreatedAtUsesMillisNotHLC pins the
+// invariant that the admin AdminDescribeQueue path derives
+// CreatedAt from sqsQueueMeta.CreatedAtMillis (the canonical
+// wall-clock field), not from hlcToTime(CreatedAtHLC) — the meta
+// struct documents HLC as "unsuitable for wall-clock display" and
+// the SigV4 path (sqs_catalog.go:942) reads CreatedAtMillis. Two
+// failure modes the test pins:
+//
+// 1. CreatedAtMillis == 0 must yield a zero time.Time so the JSON
+// encoder's omitempty drops the field and the SPA renders "—"
+// rather than the HLC-derived 1970-01-01T00:00:00Z.
+// 2. CreatedAtMillis > 0 must round-trip through time.UnixMilli in
+// UTC.
+func TestAdminQueueSummary_CreatedAtUsesMillisNotHLC(t *testing.T) {
+ t.Parallel()
+
+ t.Run("zero millis yields zero time even with HLC populated", func(t *testing.T) {
+ t.Parallel()
+ meta := sqsQueueMeta{
+ Name: "orders",
+ Generation: 1,
+ CreatedAtHLC: 42 << s3HLCPhysicalShift, // would render as ~1970 epoch via hlcToTime
+ // CreatedAtMillis intentionally zero
+ }
+ summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn)
+ if !summary.CreatedAt.IsZero() {
+ t.Fatalf("CreatedAt should be zero when CreatedAtMillis==0; got %v", summary.CreatedAt)
+ }
+ })
+
+ t.Run("positive millis round-trips via time.UnixMilli UTC", func(t *testing.T) {
+ t.Parallel()
+ const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z
+ meta := sqsQueueMeta{
+ Name: "orders",
+ Generation: 2,
+ CreatedAtMillis: wantMillis,
+ CreatedAtHLC: 1, // must be ignored
+ }
+ summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn)
+ want := time.UnixMilli(wantMillis).UTC()
+ if !summary.CreatedAt.Equal(want) {
+ t.Fatalf("CreatedAt=%v want=%v", summary.CreatedAt, want)
+ }
+ if summary.CreatedAt.Location() != time.UTC {
+ t.Fatalf("CreatedAt location=%v want UTC", summary.CreatedAt.Location())
+ }
+ })
+}
+
+// TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified pins
+// the parity contract between metaAttributesForAdmin and
+// queueMetaToAttributes("All"): QueueArn (the AWS-shaped identifier
+// the SPA shows for change-tracking) and LastModifiedTimestamp
+// (updated on SetQueueAttributes — the only handle operators have
+// on "when did somebody last touch this queue's config") must both
+// be present.
+func TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified(t *testing.T) {
+ t.Parallel()
+
+ t.Run("QueueArn always present", func(t *testing.T) {
+ t.Parallel()
+ meta := sqsQueueMeta{Name: "orders", Generation: 1}
+ attrs := metaAttributesForAdmin(&meta, testQueueArn)
+ got, ok := attrs["QueueArn"]
+ if !ok {
+ t.Fatalf("QueueArn missing from attributes: %v", attrs)
+ }
+ if got != testQueueArn {
+ t.Fatalf("QueueArn=%q want=%q", got, testQueueArn)
+ }
+ })
+
+ t.Run("LastModifiedTimestamp emitted in unix seconds when populated", func(t *testing.T) {
+ t.Parallel()
+ const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z
+ meta := sqsQueueMeta{
+ Name: "orders",
+ Generation: 1,
+ LastModifiedAtMillis: wantMillis,
+ }
+ attrs := metaAttributesForAdmin(&meta, testQueueArn)
+ got, ok := attrs["LastModifiedTimestamp"]
+ if !ok {
+ t.Fatalf("LastModifiedTimestamp missing from attributes: %v", attrs)
+ }
+ want := strconv.FormatInt(wantMillis/sqsMillisPerSecond, 10)
+ if got != want {
+ t.Fatalf("LastModifiedTimestamp=%q want=%q (unix seconds)", got, want)
+ }
+ })
+
+ t.Run("LastModifiedTimestamp omitted when zero", func(t *testing.T) {
+ t.Parallel()
+ meta := sqsQueueMeta{Name: "orders", Generation: 1}
+ attrs := metaAttributesForAdmin(&meta, testQueueArn)
+ if _, ok := attrs["LastModifiedTimestamp"]; ok {
+ t.Fatalf("LastModifiedTimestamp should be omitted when zero: got %q", attrs["LastModifiedTimestamp"])
+ }
+ })
+
+ t.Run("CreatedTimestamp deliberately not in map (typed field instead)", func(t *testing.T) {
+ t.Parallel()
+ meta := sqsQueueMeta{
+ Name: "orders",
+ Generation: 1,
+ CreatedAtMillis: 1_724_419_200_000,
+ }
+ attrs := metaAttributesForAdmin(&meta, testQueueArn)
+ if _, ok := attrs["CreatedTimestamp"]; ok {
+ t.Fatalf("CreatedTimestamp must NOT be in attrs (it lives on AdminQueueSummary.CreatedAt): got %q", attrs["CreatedTimestamp"])
+ }
+ })
+}
diff --git a/internal/admin/server.go b/internal/admin/server.go
index 652a9fd84..c1e1587f7 100644
--- a/internal/admin/server.go
+++ b/internal/admin/server.go
@@ -61,6 +61,14 @@ type ServerDeps struct {
// off" state instead of an empty matrix.
KeyViz KeyVizSource
+ // Queues is the SQS admin source — covers list, describe, and
+ // delete via QueuesSource. Optional: a nil value disables
+ // /admin/api/v1/sqs/queues{,/{name}} (the mux answers them
+ // with 404). Same opt-in shape as Tables / Buckets; deployments
+ // that don't run the SQS adapter omit this without breaking the
+ // rest of the admin surface.
+ Queues QueuesSource
+
// StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be
// nil during early development; the router renders 404 for
// /admin/assets/* and the SPA fallback in that case.
@@ -112,7 +120,8 @@ func NewServer(deps ServerDeps) (*Server, error) {
// nil it serves a 503 keyviz_disabled, which the SPA renders as
// a clearer "feature off" state than an unknown_endpoint 404.
keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger)
- mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, logger)
+ sqs := buildSqsHandlerForDeps(deps, logger)
+ mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, sqs, logger)
router := NewRouter(mux, deps.StaticFS)
return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil
}
@@ -177,6 +186,20 @@ func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler {
return NewS3Handler(deps.Buckets).WithLogger(logger)
}
+// buildSqsHandlerForDeps is the parallel constructor for the SQS
+// admin handler. Read paths are open to any session; the DELETE
+// path re-evaluates the principal's role against the live MapRoleStore
+// on every request, so a downgraded key cannot keep mutating with a
+// still-valid JWT.
+func buildSqsHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler {
+ if deps.Queues == nil {
+ return nil
+ }
+ return NewSqsHandler(deps.Queues).
+ WithLogger(logger).
+ WithRoleStore(MapRoleStore(deps.Roles))
+}
+
// Handler returns an http.Handler that serves the full admin surface.
// We wrap the router in BodyLimit at the top level so every endpoint
// — including /admin/healthz and the static asset / SPA paths — is
@@ -215,14 +238,14 @@ func (s *Server) APIHandler() http.Handler {
// audit path inside AuthService because the generic Audit middleware
// cannot see the claimed actor at that point in the chain.
//
-// dynamoHandler / s3Handler may be nil; in that case the corresponding
-// paths fall through to the unknown-endpoint 404, matching the
-// behaviour of any other unregistered admin path.
+// dynamoHandler / s3Handler / sqsHandler may be nil; in that case
+// the corresponding paths fall through to the unknown-endpoint 404,
+// matching the behaviour of any other unregistered admin path.
//
// keyvizHandler is always non-nil even when the sampler is disabled —
// it serves 503 keyviz_disabled itself so the SPA gets a clearer
// signal than an unknown_endpoint 404 from the catch-all.
-func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler http.Handler, logger *slog.Logger) http.Handler {
+func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler, sqsHandler http.Handler, logger *slog.Logger) http.Handler {
loginHandler := http.HandlerFunc(auth.HandleLogin)
logoutHandler := http.HandlerFunc(auth.HandleLogout)
@@ -290,6 +313,14 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
if s3Handler != nil {
s3Chain = protect(s3Handler)
}
+ // SQS endpoints share the same protect chain rationale: GET
+ // reads are session-gated to keep cross-site fetches from
+ // enumerating queue names; DELETE goes through CSRF + the
+ // in-handler RoleFull check inside SqsHandler.
+ var sqsChain http.Handler
+ if sqsHandler != nil {
+ sqsChain = protect(sqsHandler)
+ }
routes := apiRouteTable{
login: loginChain,
@@ -298,6 +329,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
dynamo: dynamoChain,
s3: s3Chain,
keyviz: keyvizChain,
+ sqs: sqsChain,
}
return http.HandlerFunc(routes.dispatch)
}
@@ -309,29 +341,55 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
// would otherwise push buildAPIMux's branch count past the limit.
type apiRouteTable struct {
login, logout, cluster http.Handler
- dynamo, s3 http.Handler
+ dynamo, s3, sqs http.Handler
keyviz http.Handler
}
// dispatch is the receiver method httpHandlerFunc adapts. Logic is
-// the same path-prefix switch the call site previously inlined.
+// the same path-prefix switch the call site previously inlined; the
+// resource-prefix half of it lives in resourceHandlerFor so this
+// function stays under the cyclop ceiling as new resources land.
func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) {
- switch {
- case r.URL.Path == "/admin/api/v1/auth/login":
+ switch r.URL.Path {
+ case "/admin/api/v1/auth/login":
t.login.ServeHTTP(w, r)
- case r.URL.Path == "/admin/api/v1/auth/logout":
+ return
+ case "/admin/api/v1/auth/logout":
t.logout.ServeHTTP(w, r)
- case r.URL.Path == "/admin/api/v1/cluster":
+ return
+ case "/admin/api/v1/cluster":
t.cluster.ServeHTTP(w, r)
- case r.URL.Path == "/admin/api/v1/keyviz/matrix":
- t.keyviz.ServeHTTP(w, r)
- case t.dynamo != nil && isDynamoPath(r.URL.Path):
- t.dynamo.ServeHTTP(w, r)
- case t.s3 != nil && isS3Path(r.URL.Path):
- t.s3.ServeHTTP(w, r)
+ return
+ }
+ if h := t.resourceHandlerFor(r.URL.Path); h != nil {
+ h.ServeHTTP(w, r)
+ return
+ }
+ writeJSONError(w, http.StatusNotFound, "unknown_endpoint",
+ "no admin API handler is registered for this path")
+}
+
+// resourceHandlerFor returns the handler that owns the URL path's
+// resource family, or nil when no resource matches. Pulled out of
+// dispatch so dispatch stays under cyclop=10 even as new admin
+// resources (Dynamo, S3, SQS, KeyViz, future) get added.
+//
+// KeyViz is *always* registered (the constructor wires a non-nil
+// handler that itself emits 503 keyviz_disabled when the underlying
+// sampler is nil), so the switch matches against an exact path
+// equality and never against a nil receiver.
+func (t apiRouteTable) resourceHandlerFor(path string) http.Handler {
+ switch {
+ case t.keyviz != nil && path == "/admin/api/v1/keyviz/matrix":
+ return t.keyviz
+ case t.dynamo != nil && isDynamoPath(path):
+ return t.dynamo
+ case t.s3 != nil && isS3Path(path):
+ return t.s3
+ case t.sqs != nil && isSqsPath(path):
+ return t.sqs
default:
- writeJSONError(w, http.StatusNotFound, "unknown_endpoint",
- "no admin API handler is registered for this path")
+ return nil
}
}
@@ -343,6 +401,10 @@ func isS3Path(p string) bool {
return p == pathS3Buckets || strings.HasPrefix(p, pathPrefixS3Buckets)
}
+func isSqsPath(p string) bool {
+ return p == pathSqsQueues || strings.HasPrefix(p, pathPrefixSqsQueues)
+}
+
func errMissing(field string) error {
return &missingDepError{field: field}
}
diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go
new file mode 100644
index 000000000..f30877627
--- /dev/null
+++ b/internal/admin/sqs_handler.go
@@ -0,0 +1,305 @@
+package admin
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/goccy/go-json"
+)
+
+// pathSqsQueues is the URL prefix the SQS handler owns. The "" suffix
+// produces the collection root /admin/api/v1/sqs/queues; the
+// pathPrefixSqsQueues form is used for the per-queue routes.
+const (
+ pathSqsQueues = "/admin/api/v1/sqs/queues"
+ pathPrefixSqsQueues = pathSqsQueues + "/"
+)
+
+// QueueSummary is the SPA-facing projection of a single SQS queue.
+// Mirrors adapter.AdminQueueSummary 1:1; the bridge in main_admin.go
+// translates between the two so the admin package stays free of the
+// adapter dependency tree.
+//
+// CreatedAt is a pointer so omitempty actually drops the field when
+// the underlying queue has no wall-clock creation timestamp. Both
+// encoding/json and goccy/go-json serialise a zero time.Time value
+// as "0001-01-01T00:00:00Z" rather than dropping it, so the SPA
+// would render an ancient date instead of the "—" placeholder its
+// `created_at ? formatted : "—"` guard implies. The pointer makes
+// the absent-vs-zero distinction explicit on the wire.
+type QueueSummary struct {
+ Name string `json:"name"`
+ IsFIFO bool `json:"is_fifo"`
+ Generation uint64 `json:"generation"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ Attributes map[string]string `json:"attributes,omitempty"`
+ Counters QueueCounters `json:"counters"`
+}
+
+// QueueCounters mirrors the three Approximate* counters AWS exposes
+// on GetQueueAttributes. Definitions follow §16.1 of the SQS design
+// doc.
+type QueueCounters struct {
+ Visible int64 `json:"visible"`
+ NotVisible int64 `json:"not_visible"`
+ Delayed int64 `json:"delayed"`
+}
+
+// QueuesSource is the contract the SQS handler depends on. Wired in
+// production to *adapter.SQSServer via a small bridge in main_admin.go;
+// tests use a stub.
+//
+// AdminDescribeQueue returns (nil, false, nil) for a missing queue so
+// callers can distinguish "not found" from a storage error without
+// sniffing sentinels. AdminDeleteQueue returns the structured
+// sentinels below so the handler can map them to HTTP statuses
+// without leaking the adapter's error vocabulary.
+type QueuesSource interface {
+ AdminListQueues(ctx context.Context) ([]string, error)
+ AdminDescribeQueue(ctx context.Context, name string) (*QueueSummary, bool, error)
+ AdminDeleteQueue(ctx context.Context, principal AuthPrincipal, name string) error
+}
+
+// Errors the QueuesSource may return for the handler to map onto a
+// specific HTTP response. Sentinels rather than typed errors so the
+// bridge can map any adapter-internal failure onto exactly one of
+// these without the admin package importing adapter-private types.
+var (
+ // ErrQueuesForbidden — principal lacks the role required (403).
+ ErrQueuesForbidden = errors.New("admin sqs: principal lacks required role")
+ // ErrQueuesNotLeader — local node is not the verified Raft
+ // leader. Without follower-forwarding wired (out of scope for
+ // the SPA's read+delete surface), maps to 503 + Retry-After: 1.
+ ErrQueuesNotLeader = errors.New("admin sqs: local node is not the raft leader")
+ // ErrQueuesNotFound — DELETE / DESCRIBE targets a queue that
+ // does not exist (404). The describe path uses (nil, false, nil)
+ // instead of this sentinel for the not-found signal.
+ ErrQueuesNotFound = errors.New("admin sqs: queue not found")
+ // ErrQueuesValidation — request shape is bad (400).
+ ErrQueuesValidation = errors.New("admin sqs: validation failed")
+)
+
+// SqsHandler serves /admin/api/v1/sqs/queues and
+// /admin/api/v1/sqs/queues/{name}. Reads (list, describe) accept GET;
+// delete accepts DELETE and goes through the same protected
+// middleware chain (BodyLimit -> SessionAuth -> Audit -> CSRF) as
+// every other write surface, with an in-handler RoleFull gate so a
+// read-only key cannot delete even with a valid CSRF token.
+type SqsHandler struct {
+ source QueuesSource
+ roles RoleStore
+ logger *slog.Logger
+}
+
+// NewSqsHandler binds the source and seeds logging with
+// slog.Default(). Use WithLogger to attach a tagged logger and
+// WithRoleStore to plug in the live access-key role lookup so a
+// downgraded key cannot continue mutating with a still-valid JWT.
+func NewSqsHandler(source QueuesSource) *SqsHandler {
+ return &SqsHandler{source: source, logger: slog.Default()}
+}
+
+// WithLogger overrides the default slog destination. No-ops on nil to
+// preserve the constructor-seeded slog.Default().
+func (h *SqsHandler) WithLogger(l *slog.Logger) *SqsHandler {
+ if l == nil {
+ return h
+ }
+ h.logger = l
+ return h
+}
+
+// WithRoleStore enables per-request role revalidation on the delete
+// endpoint. Without it, the handler trusts whatever role is embedded
+// in the session JWT — which is fine for single-tenant deployments
+// where the role config never changes, but problematic when an
+// operator revokes or downgrades a key. Production wiring in
+// main_admin.go always sets this.
+func (h *SqsHandler) WithRoleStore(r RoleStore) *SqsHandler {
+ h.roles = r
+ return h
+}
+
+// ServeHTTP routes /queues and /queues/{name}. Method handling
+// mirrors DynamoHandler — keep the two parallel so an operator
+// reading one understands the other for free.
+func (h *SqsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ switch {
+ case r.URL.Path == pathSqsQueues:
+ switch r.Method {
+ case http.MethodGet:
+ h.handleList(w, r)
+ default:
+ writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET")
+ }
+ case strings.HasPrefix(r.URL.Path, pathPrefixSqsQueues):
+ name := strings.TrimPrefix(r.URL.Path, pathPrefixSqsQueues)
+ switch r.Method {
+ case http.MethodGet:
+ h.handleDescribe(w, r, name)
+ case http.MethodDelete:
+ h.handleDelete(w, r, name)
+ default:
+ writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE")
+ }
+ default:
+ writeJSONError(w, http.StatusNotFound, "unknown_endpoint",
+ "no admin SQS handler is registered for this path")
+ }
+}
+
+type listQueuesResponse struct {
+ Queues []string `json:"queues"`
+}
+
+func (h *SqsHandler) handleList(w http.ResponseWriter, r *http.Request) {
+ names, err := h.source.AdminListQueues(r.Context())
+ if err != nil {
+ h.logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs list queues failed",
+ slog.String("error", err.Error()),
+ )
+ writeJSONError(w, http.StatusInternalServerError, "list_failed",
+ "failed to list queues; see server logs")
+ return
+ }
+ // Force the empty-result case to render as `{"queues": []}` rather
+ // than `{"queues": null}`. The SPA iterates the array directly and
+ // would crash on null. AdminListQueues returns nil when no queues
+ // exist, so the normalisation has to happen here before encoding.
+ if names == nil {
+ names = []string{}
+ }
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ w.Header().Set("Cache-Control", "no-store")
+ w.WriteHeader(http.StatusOK)
+ if err := json.NewEncoder(w).Encode(listQueuesResponse{Queues: names}); err != nil {
+ h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs list response encode failed",
+ slog.String("error", err.Error()),
+ )
+ }
+}
+
+func (h *SqsHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) {
+ if strings.TrimSpace(name) == "" {
+ writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required")
+ return
+ }
+ summary, exists, err := h.source.AdminDescribeQueue(r.Context(), name)
+ if err != nil {
+ writeQueuesError(w, err, h.logger, r)
+ return
+ }
+ if !exists {
+ writeJSONError(w, http.StatusNotFound, "queue_not_found",
+ "no queue is registered with that name")
+ return
+ }
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ w.Header().Set("Cache-Control", "no-store")
+ w.WriteHeader(http.StatusOK)
+ if err := json.NewEncoder(w).Encode(summary); err != nil {
+ h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs describe response encode failed",
+ slog.String("error", err.Error()),
+ )
+ }
+}
+
+func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name string) {
+ principal, ok := h.principalForWrite(w, r)
+ if !ok {
+ return
+ }
+ if strings.TrimSpace(name) == "" {
+ writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required")
+ return
+ }
+ if err := h.source.AdminDeleteQueue(r.Context(), principal, name); err != nil {
+ writeQueuesError(w, err, h.logger, r)
+ return
+ }
+ w.Header().Set("Cache-Control", "no-store")
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// principalForWrite resolves the live role from the RoleStore (when
+// configured), gates the request, and returns the principal with the
+// **live** role overridden in place — so the role that flows downstream
+// to the adapter is the one the operator currently has, not whatever
+// the JWT happens to remember. Mirrors DynamoHandler.principalForWrite.
+// Without the role override, a JWT-read_only / store-full promoted key
+// passes the handler-side check but the adapter rejects with
+// ErrAdminForbidden, forcing the user to log out and back in for a
+// delete to work.
+//
+// Failure paths write the response and return ok=false; callers
+// short-circuit on the bool. Logged-out / wrong-role callers never
+// reach the source layer, so the leader's identity is not leaked
+// by indirection (forbidden response is the same shape regardless
+// of leadership state).
+func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) {
+ principal, ok := PrincipalFromContext(r.Context())
+ if !ok {
+ // SessionAuth runs before this handler, so a missing
+ // principal is a wiring bug. 500 rather than 401 since
+ // 401 would be misleading — the request was authenticated.
+ writeJSONError(w, http.StatusInternalServerError, "internal", "missing session principal")
+ return AuthPrincipal{}, false
+ }
+ if h.roles != nil {
+ live, exists := h.roles.LookupRole(principal.AccessKey)
+ if !exists {
+ // Key has been removed from the role config since
+ // login. Treat it as no-access regardless of what
+ // the JWT claimed.
+ writeJSONError(w, http.StatusForbidden, "forbidden",
+ "this access key is not authorised to delete queues")
+ return AuthPrincipal{}, false
+ }
+ if !live.AllowsWrite() {
+ writeJSONError(w, http.StatusForbidden, "forbidden",
+ "this access key is not authorised to delete queues")
+ return AuthPrincipal{}, false
+ }
+ // Forward the live role downstream so the adapter
+ // re-check sees the same role the handler gated on.
+ // Without this, a key promoted from read_only → full
+ // after login still hits the adapter with the JWT's
+ // stale read_only and gets a confusing 403.
+ principal.Role = live
+ } else if !principal.Role.AllowsWrite() {
+ writeJSONError(w, http.StatusForbidden, "forbidden",
+ "this access key is not authorised to delete queues")
+ return AuthPrincipal{}, false
+ }
+ return principal, true
+}
+
+// writeQueuesError translates a QueuesSource error onto an HTTP
+// response. Unrecognised errors map to 500 with a sanitised message
+// — the raw err.Error() may include adapter internals (Pebble paths,
+// raft peer ids) that should not flow to the SPA.
+func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r *http.Request) {
+ switch {
+ case errors.Is(err, ErrQueuesForbidden):
+ writeJSONError(w, http.StatusForbidden, "forbidden", "principal lacks required role")
+ case errors.Is(err, ErrQueuesNotLeader):
+ w.Header().Set("Retry-After", strconv.Itoa(1))
+ writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable",
+ "local node is not the leader; retry shortly")
+ case errors.Is(err, ErrQueuesNotFound):
+ writeJSONError(w, http.StatusNotFound, "queue_not_found", "no queue with that name")
+ case errors.Is(err, ErrQueuesValidation):
+ writeJSONError(w, http.StatusBadRequest, "invalid_request", err.Error())
+ default:
+ logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs operation failed",
+ slog.String("error", err.Error()),
+ )
+ writeJSONError(w, http.StatusInternalServerError, "internal",
+ "queue operation failed; see server logs")
+ }
+}
diff --git a/internal/admin/sqs_handler_test.go b/internal/admin/sqs_handler_test.go
new file mode 100644
index 000000000..30ebe23e6
--- /dev/null
+++ b/internal/admin/sqs_handler_test.go
@@ -0,0 +1,203 @@
+package admin
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// stubQueuesSource is the in-memory test double for SqsHandler.
+// Mirrors stubTablesSource in dynamo_handler_test.go: records the
+// principal that flowed through to the source so tests can assert
+// live-role forwarding.
+type stubQueuesSource struct {
+ queues []string
+ describeErr error
+ deleteErr error
+ lastDeleteName string
+ lastDeletePrincipal AuthPrincipal
+}
+
+func (s *stubQueuesSource) AdminListQueues(_ context.Context) ([]string, error) {
+ return s.queues, nil
+}
+
+func (s *stubQueuesSource) AdminDescribeQueue(_ context.Context, name string) (*QueueSummary, bool, error) {
+ if s.describeErr != nil {
+ return nil, false, s.describeErr
+ }
+ for _, q := range s.queues {
+ if q == name {
+ return &QueueSummary{Name: name}, true, nil
+ }
+ }
+ return nil, false, nil
+}
+
+func (s *stubQueuesSource) AdminDeleteQueue(_ context.Context, principal AuthPrincipal, name string) error {
+ s.lastDeleteName = name
+ s.lastDeletePrincipal = principal
+ if s.deleteErr != nil {
+ return s.deleteErr
+ }
+ for i, q := range s.queues {
+ if q == name {
+ s.queues = append(s.queues[:i], s.queues[i+1:]...)
+ return nil
+ }
+ }
+ return ErrQueuesNotFound
+}
+
+// TestSqsHandler_DeleteQueue_LivePromotion pins the live-role
+// forwarding contract: a JWT minted while the access key was
+// read_only must, after the operator promotes the key to full in
+// the live RoleStore, be allowed to delete *and* the principal
+// arriving at AdminDeleteQueue must carry the live role (full),
+// not the JWT's stale role.
+//
+// The bug before this fix: principalCanWrite returned true based on
+// the live role, but the unmodified principal (with JWT read_only)
+// was passed to AdminDeleteQueue, which independently checked
+// principal.Role.canWrite() and returned ErrAdminForbidden. The
+// user saw 403 and had to log out + back in for the new role to
+// take effect.
+func TestSqsHandler_DeleteQueue_LivePromotion(t *testing.T) {
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ roles := MapRoleStore{"AKIA_PROMOTED": RoleFull}
+ h := NewSqsHandler(src).WithRoleStore(roles)
+
+ req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil)
+ // JWT was minted while the key was still read_only. The live
+ // role store has since been updated to full.
+ req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal,
+ AuthPrincipal{AccessKey: "AKIA_PROMOTED", Role: RoleReadOnly}))
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+
+ require.Equal(t, http.StatusNoContent, rec.Code,
+ "promoted key must succeed at the handler layer; body=%s", rec.Body.String())
+ require.Equal(t, "orders", src.lastDeleteName)
+ require.Equal(t, RoleFull, src.lastDeletePrincipal.Role,
+ "principal forwarded to AdminDeleteQueue must carry the live role (RoleFull), not the JWT's stale RoleReadOnly")
+ require.Equal(t, "AKIA_PROMOTED", src.lastDeletePrincipal.AccessKey)
+}
+
+// TestSqsHandler_DeleteQueue_LiveRevocation is the symmetric case
+// the live-role gate exists for in the first place: a JWT minted
+// while full but the key was later removed from full_access_keys
+// (or downgraded to read_only) — the request must be rejected at
+// the handler, never reaching the source.
+func TestSqsHandler_DeleteQueue_LiveRevocation(t *testing.T) {
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ roles := MapRoleStore{"AKIA_DOWNGRADED": RoleReadOnly}
+ h := NewSqsHandler(src).WithRoleStore(roles)
+
+ req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil)
+ req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal,
+ AuthPrincipal{AccessKey: "AKIA_DOWNGRADED", Role: RoleFull}))
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+
+ require.Equal(t, http.StatusForbidden, rec.Code)
+ require.Empty(t, src.lastDeleteName, "source must not be reached when role is revoked")
+}
+
+// TestSqsHandler_DeleteQueue_KeyRemovedFromStore covers the third
+// edge of the live-role gate: a JWT minted while authorised but the
+// access key was *removed entirely* from the role config (operator
+// rotated credentials). The request must 403 — same shape as the
+// downgrade case — and never reach the source.
+func TestSqsHandler_DeleteQueue_KeyRemovedFromStore(t *testing.T) {
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ roles := MapRoleStore{} // key not present at all
+ h := NewSqsHandler(src).WithRoleStore(roles)
+
+ req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil)
+ req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal,
+ AuthPrincipal{AccessKey: "AKIA_GONE", Role: RoleFull}))
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+
+ require.Equal(t, http.StatusForbidden, rec.Code)
+ require.Empty(t, src.lastDeleteName)
+}
+
+// TestSqsHandler_DeleteQueue_NoRoleStore covers the single-tenant
+// default: when no RoleStore is wired, the handler trusts the JWT's
+// embedded role. A JWT-full request succeeds; a JWT-read_only
+// request is rejected at the handler.
+func TestSqsHandler_DeleteQueue_NoRoleStore(t *testing.T) {
+ t.Run("jwt full succeeds", func(t *testing.T) {
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ h := NewSqsHandler(src) // no WithRoleStore
+ req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil)
+ req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal,
+ AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull}))
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+ require.Equal(t, http.StatusNoContent, rec.Code)
+ require.Equal(t, RoleFull, src.lastDeletePrincipal.Role)
+ })
+ t.Run("jwt read-only is forbidden", func(t *testing.T) {
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ h := NewSqsHandler(src)
+ req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil)
+ req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal,
+ AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly}))
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+ require.Equal(t, http.StatusForbidden, rec.Code)
+ require.Empty(t, src.lastDeleteName)
+ })
+}
+
+// TestSqsHandler_ListQueues_EmptyArrayNotNull pins the nil→[]
+// normalisation in the list handler. Without it the empty-catalog
+// case serialises as `{"queues": null}` and the SPA crashes on
+// `queues.length` against null.
+func TestSqsHandler_ListQueues_EmptyArrayNotNull(t *testing.T) {
+ src := &stubQueuesSource{queues: nil}
+ h := NewSqsHandler(src)
+ req := httptest.NewRequest(http.MethodGet, pathSqsQueues, nil)
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+ require.Equal(t, http.StatusOK, rec.Code)
+ require.Contains(t, rec.Body.String(), `"queues":[]`,
+ "empty catalog must serialise as [] not null; body=%s", rec.Body.String())
+}
+
+// TestSqsHandler_DescribeQueue_ZeroCreatedAtIsOmittedOnTheWire pins
+// the wire-level contract that a queue with no wall-clock creation
+// timestamp does not surface a Go-zero time.Time on the wire.
+// time.Time with `omitempty` is NOT dropped by encoding/json or
+// goccy/go-json when zero — it serialises as "0001-01-01T00:00:00Z"
+// and the SPA renders an ancient date instead of "—". The fix
+// switched QueueSummary.CreatedAt to *time.Time and the bridge
+// converts a zero time.Time to nil. This test exercises the wire
+// representation, not the Go-side IsZero() check the adapter unit
+// test already pins.
+func TestSqsHandler_DescribeQueue_ZeroCreatedAtIsOmittedOnTheWire(t *testing.T) {
+ // AdminDescribeQueue stub returns a QueueSummary with no CreatedAt
+ // set (nil pointer, the post-bridge representation of an unknown
+ // CreatedAtMillis).
+ src := &stubQueuesSource{queues: []string{"orders"}}
+ h := NewSqsHandler(src)
+ req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders", nil)
+ rec := httptest.NewRecorder()
+ h.ServeHTTP(rec, req)
+ require.Equal(t, http.StatusOK, rec.Code)
+ body := rec.Body.String()
+ require.NotContains(t, body, "0001-01-01T00:00:00Z",
+ "a queue with no wall-clock timestamp must not surface the Go zero time on the wire; body=%s", body)
+ require.NotContains(t, body, `"created_at":`,
+ "created_at must be omitted entirely when the queue has no wall-clock timestamp so the SPA renders the placeholder; body=%s", body)
+}
+
+// helper to silence the unused-import warning when errors is only
+// referenced inside one of the test functions.
+var _ = errors.New
diff --git a/main.go b/main.go
index 05176eff0..02f09f7fe 100644
--- a/main.go
+++ b/main.go
@@ -764,7 +764,7 @@ func startServers(in serversInput) error {
// the handler hands ErrTablesNotLeader writes to the forwarder
// which dials the leader over the cached gRPC pool. Without these
// the handler falls back to 503 + Retry-After:1.
- if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache, in.keyvizSampler); err != nil {
+ if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, runner.sqsServer, in.coordinate, connCache, in.keyvizSampler); err != nil {
return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err)
}
return nil
@@ -1309,6 +1309,12 @@ type runtimeServerRunner struct {
// 404, mirroring the dynamoServer == nil contract.
s3Server *adapter.S3Server
+ // sqsServer plays the same role for the SQS admin entrypoints
+ // (adapter/sqs_admin.go). Nil when --sqsAddress is empty; the
+ // admin listener then leaves /admin/api/v1/sqs/* off the wire
+ // (the mux 404s those paths).
+ sqsServer *adapter.SQSServer
+
// roleStore is the access-key → role index the leader-side
// gRPC AdminForward service uses to re-validate the principal
// on every forwarded write. Mirrors what admin.Config.RoleIndex
@@ -1362,9 +1368,11 @@ func (r *runtimeServerRunner) start() error {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
r.s3Server = s3Server
- if err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile); err != nil {
+ sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile)
+ if err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
+ r.sqsServer = sqsServer
if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
diff --git a/main_admin.go b/main_admin.go
index 75d37c374..d4d42435e 100644
--- a/main_admin.go
+++ b/main_admin.go
@@ -76,6 +76,7 @@ func startAdminFromFlags(
runtimes []*raftGroupRuntime,
dynamoServer *adapter.DynamoDBServer,
s3Server *adapter.S3Server,
+ sqsServer *adapter.SQSServer,
coordinate kv.Coordinator,
connCache *kv.GRPCConnCache,
keyvizSampler *keyviz.MemSampler,
@@ -121,14 +122,124 @@ func startAdminFromFlags(
clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes)
tablesSrc := newDynamoTablesSource(dynamoServer)
bucketsSrc := newBucketsSource(s3Server)
+ queuesSrc := newSqsQueuesSource(sqsServer)
forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId)
if err != nil {
return errors.Wrap(err, "build admin leader forwarder")
}
- _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, keyvizSampler, buildVersion())
+ _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, keyvizSampler, buildVersion())
return err
}
+// newSqsQueuesSource adapts *adapter.SQSServer to admin.QueuesSource.
+// Same architectural reasoning as newDynamoTablesSource and
+// newBucketsSource: the bridge stays in this file (rather than
+// internal/admin) so the admin package stays free of the heavy
+// adapter-package dependency tree. Returns nil when sqsServer is nil
+// so admin.NewServer leaves /admin/api/v1/sqs/* off the wire.
+func newSqsQueuesSource(sqsServer *adapter.SQSServer) admin.QueuesSource {
+ if sqsServer == nil {
+ return nil
+ }
+ return &sqsQueuesBridge{server: sqsServer}
+}
+
+// sqsQueuesBridge re-shapes adapter.AdminQueueSummary into
+// admin.QueueSummary. The two structs are deliberately isomorphic so
+// this translation does no allocation more than necessary; if a
+// future field is added on one side, the build breaks here, which
+// is the drift signal we want.
+type sqsQueuesBridge struct {
+ server *adapter.SQSServer
+}
+
+func (b *sqsQueuesBridge) AdminListQueues(ctx context.Context) ([]string, error) {
+ return b.server.AdminListQueues(ctx) //nolint:wrapcheck // pure pass-through; adapter owns the error context.
+}
+
+func (b *sqsQueuesBridge) AdminDescribeQueue(ctx context.Context, name string) (*admin.QueueSummary, bool, error) {
+ summary, exists, err := b.server.AdminDescribeQueue(ctx, name)
+ if err != nil {
+ return nil, false, translateAdminQueuesError(err)
+ }
+ if !exists {
+ return nil, false, nil
+ }
+ return convertAdminQueueSummary(summary), true, nil
+}
+
+func (b *sqsQueuesBridge) AdminDeleteQueue(ctx context.Context, principal admin.AuthPrincipal, name string) error {
+ if err := b.server.AdminDeleteQueue(ctx, convertAdminPrincipal(principal), name); err != nil {
+ return translateAdminQueuesError(err)
+ }
+ return nil
+}
+
+// convertAdminQueueSummary mirrors adapter.AdminQueueSummary into
+// admin.QueueSummary. Counter fields are int64 on both sides; if
+// either side grows a field, this function should be extended in the
+// same commit so a compile error catches the drift.
+//
+// CreatedAt collapses a zero time.Time on the adapter side (queues
+// stored before CreatedAtMillis was populated, or never populated)
+// to a nil pointer on the admin side so omitempty actually drops the
+// field at the wire layer. Without this collapse the SPA would see
+// "0001-01-01T00:00:00Z" and render an ancient date.
+func convertAdminQueueSummary(in *adapter.AdminQueueSummary) *admin.QueueSummary {
+ if in == nil {
+ return nil
+ }
+ var createdAt *time.Time
+ if !in.CreatedAt.IsZero() {
+ t := in.CreatedAt
+ createdAt = &t
+ }
+ return &admin.QueueSummary{
+ Name: in.Name,
+ IsFIFO: in.IsFIFO,
+ Generation: in.Generation,
+ CreatedAt: createdAt,
+ Attributes: in.Attributes,
+ Counters: admin.QueueCounters{
+ Visible: in.Counters.Visible,
+ NotVisible: in.Counters.NotVisible,
+ Delayed: in.Counters.Delayed,
+ },
+ }
+}
+
+// translateAdminQueuesError maps the adapter's SQS error vocabulary
+// onto the admin-package sentinels the SQS handler matches against.
+// Anything not recognised is forwarded as-is and answered with 500
+// + a sanitised body, matching the dynamo / s3 bridges' behaviour.
+func translateAdminQueuesError(err error) error {
+ switch {
+ case err == nil:
+ return nil
+ case errors.Is(err, adapter.ErrAdminForbidden):
+ return admin.ErrQueuesForbidden
+ case errors.Is(err, adapter.ErrAdminNotLeader):
+ return admin.ErrQueuesNotLeader
+ case errors.Is(err, adapter.ErrAdminSQSNotFound):
+ return admin.ErrQueuesNotFound
+ case errors.Is(err, adapter.ErrAdminSQSValidation):
+ return admin.ErrQueuesValidation
+ case isLeaderChurnError(err):
+ // Leadership can be lost between AdminDeleteQueue's upfront
+ // isVerifiedSQSLeader check and the coordinator dispatch
+ // inside deleteQueueWithRetry. The kv coordinator surfaces
+ // that as ErrLeaderNotFound / ErrNotLeader (or a wrapped
+ // "not leader" / "leader not found" suffix), and the
+ // retry loop's isRetryableTransactWriteError does not catch
+ // them. Without this arm the error falls to default and the
+ // admin handler renders a generic 500. Mirrors the same arm
+ // in translateAdminTablesError on the Dynamo side.
+ return admin.ErrQueuesNotLeader
+ default:
+ return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it.
+ }
+}
+
// buildAdminLeaderForwarder constructs the production LeaderForwarder
// for the dynamo HTTP handler when the wiring is complete enough to
// reach a remote leader. The bridge tolerates a nil connCache (and a
@@ -450,6 +561,7 @@ func startAdminServer(
cluster admin.ClusterInfoSource,
tables admin.TablesSource,
buckets admin.BucketsSource,
+ queues admin.QueuesSource,
forwarder admin.LeaderForwarder,
keyvizSampler *keyviz.MemSampler,
version string,
@@ -459,7 +571,7 @@ func startAdminServer(
if err != nil || !enabled {
return "", err
}
- server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder, keyvizSampler)
+ server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, keyvizSampler)
if err != nil {
return "", err
}
@@ -499,7 +611,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) (
return true, nil
}
-func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) {
+func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) {
primaryKeys, err := adminCfg.DecodedSigningKeys()
if err != nil {
return nil, errors.Wrap(err, "decode admin signing keys")
@@ -524,6 +636,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust
ClusterInfo: cluster,
Tables: tables,
Buckets: buckets,
+ Queues: queues,
Forwarder: forwarder,
KeyViz: keyvizSourceFromSampler(keyvizSampler),
StaticFS: staticFS,
diff --git a/main_admin_test.go b/main_admin_test.go
index 2f83fdf0f..d3e298ef1 100644
--- a/main_admin_test.go
+++ b/main_admin_test.go
@@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) {
eg, ctx := errgroup.WithContext(context.Background())
defer func() { _ = eg.Wait() }()
var lc net.ListenConfig
- _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, "")
+ _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, nil, "")
require.NoError(t, err)
}
@@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) {
listen: "127.0.0.1:0",
// missing signing key
}
- _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "")
+ _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "")
require.Error(t, err)
}
@@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) {
listen: "0.0.0.0:0",
sessionSigningKey: freshKey(),
}
- _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "")
+ _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "")
require.Error(t, err)
require.Contains(t, err.Error(), "TLS")
}
@@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) {
listen: "127.0.0.1:0",
sessionSigningKey: freshKey(),
}
- _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "")
+ _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "")
require.Error(t, err)
require.Contains(t, err.Error(), "cluster info source")
}
@@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) {
cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) {
return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil
})
- addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test")
+ addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test")
require.NoError(t, err)
// Poll /admin/healthz until success or the test deadline.
@@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) {
cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) {
return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil
})
- addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test")
+ addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test")
require.NoError(t, err)
transport := &http.Transport{TLSClientConfig: &tls.Config{
@@ -426,3 +426,62 @@ func TestTranslateAdminTablesError_UnrelatedErrorPassesThrough(t *testing.T) {
require.NotErrorIs(t, out, admin.ErrTablesNotLeader)
require.Equal(t, in, out)
}
+
+// TestTranslateAdminQueuesError_LeaderChurn is the SQS counterpart of
+// TestTranslateAdminTablesError_LeaderChurn. AdminDeleteQueue clears
+// the upfront isVerifiedSQSLeader check but the kv coordinator can
+// still drop leadership inside deleteQueueWithRetry's Dispatch; the
+// resulting ErrLeaderNotFound / ErrNotLeader / wrapped suffixes must
+// classify as 503 leader_unavailable, not the generic 500 fallthrough.
+func TestTranslateAdminQueuesError_LeaderChurn(t *testing.T) {
+ cases := []struct {
+ name string
+ in error
+ }{
+ {"kv.ErrLeaderNotFound", kv.ErrLeaderNotFound},
+ {"adapter.ErrNotLeader", adapter.ErrNotLeader},
+ {"adapter.ErrLeaderNotFound", adapter.ErrLeaderNotFound},
+ {"wrapped not leader", errors.New("dispatch failed: not leader")},
+ {"wrapped leader not found", errors.New("dispatch: leader not found")},
+ {"wrapped leadership lost", errors.New("commit aborted: leadership lost")},
+ {"wrapped leadership transfer", errors.New("retry exhausted: leadership transfer in progress")},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ out := translateAdminQueuesError(tc.in)
+ require.ErrorIs(t, out, admin.ErrQueuesNotLeader,
+ "input %q must map to ErrQueuesNotLeader", tc.in)
+ })
+ }
+}
+
+// TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage is the
+// SQS counterpart of the same Tables test — pins that the HasSuffix
+// matcher in isLeaderChurnError does not false-positive on
+// user-supplied error messages that happen to mention a leader
+// phrase mid-string (e.g. a queue name or attribute value that
+// happens to contain "not leader").
+func TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage(t *testing.T) {
+ cases := []string{
+ "not leader: actually a downstream error",
+ "leader not found, but recovered automatically",
+ "leadership lost mid-snapshot, retried successfully",
+ }
+ for _, msg := range cases {
+ t.Run(msg, func(t *testing.T) {
+ out := translateAdminQueuesError(errors.New(msg))
+ require.NotErrorIs(t, out, admin.ErrQueuesNotLeader,
+ "mid-message leader phrase %q must not classify as leader-churn", msg)
+ })
+ }
+}
+
+// TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough confirms
+// the leader-churn detector does not swallow unrelated errors that
+// happen to mention the word "leader" outside the canonical phrases.
+func TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough(t *testing.T) {
+ in := errors.New("team leader misconfigured")
+ out := translateAdminQueuesError(in)
+ require.NotErrorIs(t, out, admin.ErrQueuesNotLeader)
+ require.Equal(t, in, out)
+}
diff --git a/main_sqs.go b/main_sqs.go
index 55ca41684..7bb8623e0 100644
--- a/main_sqs.go
+++ b/main_sqs.go
@@ -11,6 +11,11 @@ import (
"golang.org/x/sync/errgroup"
)
+// startSQSServer stands up the SQS adapter on sqsAddr and returns the
+// running *adapter.SQSServer so the admin listener can call SigV4-bypass
+// admin entrypoints against it (see adapter/sqs_admin.go). Returns
+// (nil, nil) when sqsAddr is empty — that is the "SQS disabled" branch
+// and the admin listener leaves /admin/api/v1/sqs/* off the wire.
func startSQSServer(
ctx context.Context,
lc *net.ListenConfig,
@@ -21,19 +26,19 @@ func startSQSServer(
leaderSQS map[string]string,
region string,
credentialsFile string,
-) error {
+) (*adapter.SQSServer, error) {
sqsAddr = strings.TrimSpace(sqsAddr)
if sqsAddr == "" {
- return nil
+ return nil, nil
}
sqsL, err := lc.Listen(ctx, "tcp", sqsAddr)
if err != nil {
- return errors.Wrapf(err, "failed to listen on %s", sqsAddr)
+ return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr)
}
staticCreds, err := loadSigV4StaticCredentialsFile(credentialsFile, "sqs")
if err != nil {
_ = sqsL.Close()
- return err
+ return nil, err
}
sqsServer := adapter.NewSQSServer(
sqsL,
@@ -63,5 +68,5 @@ func startSQSServer(
}
return errors.WithStack(err)
})
- return nil
+ return sqsServer, nil
}
diff --git a/web/admin/src/App.tsx b/web/admin/src/App.tsx
index 90272b146..08311dfcf 100644
--- a/web/admin/src/App.tsx
+++ b/web/admin/src/App.tsx
@@ -9,6 +9,8 @@ import { LoginPage } from "./pages/Login";
import { NotFoundPage } from "./pages/NotFound";
import { S3DetailPage } from "./pages/S3Detail";
import { S3ListPage } from "./pages/S3List";
+import { SqsDetailPage } from "./pages/SqsDetail";
+import { SqsListPage } from "./pages/SqsList";
export function App() {
return (
@@ -25,6 +27,8 @@ export function App() {
} />
} />
} />
+ } />
+ } />
} />
} />
} />
diff --git a/web/admin/src/api/client.ts b/web/admin/src/api/client.ts
index c3ec92cdf..ee1dcea40 100644
--- a/web/admin/src/api/client.ts
+++ b/web/admin/src/api/client.ts
@@ -193,6 +193,29 @@ export interface CreateBucketRequest {
acl?: "private" | "public-read";
}
+// SQS queue admin DTOs (Section 16.2 of the SQS partial design doc).
+// `attributes` mirrors the AWS GetQueueAttributes "All" set with
+// snake_case keys; `counters` is the typed projection of the three
+// Approximate* counters added in Phase 3.A.
+export interface SqsQueueCounters {
+ visible: number;
+ not_visible: number;
+ delayed: number;
+}
+
+export interface SqsQueueSummary {
+ name: string;
+ is_fifo: boolean;
+ generation: number;
+ created_at?: string;
+ attributes?: Record;
+ counters: SqsQueueCounters;
+}
+
+export interface SqsQueueList {
+ queues: string[];
+}
+
export const api = {
login: (access_key: string, secret_key: string) =>
apiFetch("/auth/login", {
@@ -223,4 +246,10 @@ export const api = {
}),
deleteBucket: (name: string) =>
apiFetch(`/s3/buckets/${encodeURIComponent(name)}`, { method: "DELETE" }),
+ listQueues: (signal?: AbortSignal) =>
+ apiFetch("/sqs/queues", { signal }),
+ describeQueue: (name: string, signal?: AbortSignal) =>
+ apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { signal }),
+ deleteQueue: (name: string) =>
+ apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { method: "DELETE" }),
};
diff --git a/web/admin/src/components/Layout.tsx b/web/admin/src/components/Layout.tsx
index 80e6641da..26620cdb8 100644
--- a/web/admin/src/components/Layout.tsx
+++ b/web/admin/src/components/Layout.tsx
@@ -4,6 +4,7 @@ import { useAuth } from "../auth";
const navItems: { to: string; label: string; end?: boolean }[] = [
{ to: "/", label: "Overview", end: true },
{ to: "/dynamo", label: "DynamoDB" },
+ { to: "/sqs", label: "SQS" },
{ to: "/s3", label: "S3" },
];
diff --git a/web/admin/src/pages/SqsDetail.tsx b/web/admin/src/pages/SqsDetail.tsx
new file mode 100644
index 000000000..2ad32c7ce
--- /dev/null
+++ b/web/admin/src/pages/SqsDetail.tsx
@@ -0,0 +1,147 @@
+import { useState } from "react";
+import { Link, useNavigate, useParams } from "react-router-dom";
+import { api } from "../api/client";
+import { Modal } from "../components/Modal";
+import { formatApiError, useApiQuery } from "../lib/useApi";
+
+export function SqsDetailPage() {
+ const { name = "" } = useParams<{ name: string }>();
+ const detail = useApiQuery((signal) => api.describeQueue(name, signal), [name]);
+ const [confirmDelete, setConfirmDelete] = useState(false);
+ const [deleting, setDeleting] = useState(false);
+ const [deleteError, setDeleteError] = useState(null);
+ const navigate = useNavigate();
+ // The delete button is gated by the backend's live-role check
+ // (internal/admin/sqs_handler.go principalForWrite), not the JWT
+ // role cached in this session. A JWT minted as read_only stays
+ // read_only in the cookie until logout, but the operator may have
+ // been promoted to full in the live role store after login — so
+ // gating the button on session.role would hide it for users who
+ // are currently authorized. A read_only operator who clicks delete
+ // gets a 403 from the backend, surfaced in the modal's error area.
+
+ const onDelete = async () => {
+ setDeleting(true);
+ setDeleteError(null);
+ try {
+ await api.deleteQueue(name);
+ navigate("/sqs", { replace: true });
+ } catch (err) {
+ setDeleteError(formatApiError(err));
+ setDeleting(false);
+ }
+ };
+
+ return (
+
+ Permanently delete {name}? All messages
+ will be removed and the queue cannot be recovered.
+
+ {deleteError &&
{deleteError}
}
+
+
+
+
+
+
+ );
+}
+
+function CounterCard({ label, value }: { label: string; value: number }) {
+ return (
+
+
{label}
+
{value}
+
+ );
+}
diff --git a/web/admin/src/pages/SqsList.tsx b/web/admin/src/pages/SqsList.tsx
new file mode 100644
index 000000000..f5b4b5517
--- /dev/null
+++ b/web/admin/src/pages/SqsList.tsx
@@ -0,0 +1,68 @@
+import { Link } from "react-router-dom";
+import { api } from "../api/client";
+import { formatApiError, useApiQuery } from "../lib/useApi";
+
+export function SqsListPage() {
+ const queues = useApiQuery((signal) => api.listQueues(signal), []);
+
+ return (
+
+
+
+
SQS queues
+
+ List, describe, and delete SQS queues. Detail pages also surface
+ the approximate visible / in-flight / delayed message counts.
+
+
+
+
+
+
+ {queues.loading &&
Loading…
}
+ {queues.error?.status === 404 && (
+
+ SQS admin endpoints not wired on this build (the cluster was started
+ without --sqsAddress, so the
+ admin listener leaves /admin/api/v1/sqs/*
+ off the wire).
+