diff --git a/adapter/sqs_admin.go b/adapter/sqs_admin.go new file mode 100644 index 000000000..0788ff31a --- /dev/null +++ b/adapter/sqs_admin.go @@ -0,0 +1,183 @@ +package adapter + +import ( + "context" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/errors" +) + +// AdminQueueSummary is the per-queue projection the admin dashboard +// surfaces. It deliberately covers only the fields the SPA renders so +// the package's wire-format types stay internal. +// +// Counters mirror the AWS Approximate* attribute set produced by +// scanApproxCounters; they are best-effort by AWS contract and stop +// counting once the catalog's per-call cap is reached (the SPA polls +// continuously, so an unbounded scan would pin the leader). +type AdminQueueSummary struct { + Name string + IsFIFO bool + Generation uint64 + CreatedAt time.Time + Attributes map[string]string + Counters AdminQueueCounters +} + +// AdminQueueCounters matches sqsApproxCounters (int64) so the admin +// bridge does not have to convert between widths. Visible / +// NotVisible / Delayed are the AWS Approximate* triple. +type AdminQueueCounters struct { + Visible int64 + NotVisible int64 + Delayed int64 +} + +// AdminListQueues returns every queue name this server knows about, +// in the lexicographic order the queue catalog index produces. Read +// path; runs on follower or leader and uses the same scanQueueNames +// helper the SigV4 ListQueues handler does. +func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) { + return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context. +} + +// AdminDescribeQueue returns a snapshot of name's metadata plus the +// approximate counters. The triple (result, present, error) lets +// admin callers distinguish a missing queue from a storage error +// without sniffing sentinels. +// +// Like AdminDescribeTable on the Dynamo side, this entrypoint runs +// on either the leader or a follower (read-only); the counter scan +// uses a fresh nextTxnReadTS so the result is consistent with what +// SigV4 GetQueueAttributes would have returned at the same instant. +func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) { + if strings.TrimSpace(name) == "" { + return nil, false, ErrAdminSQSValidation + } + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS) + if err != nil { + return nil, false, errors.WithStack(err) + } + if !exists { + return nil, false, nil + } + counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS) + if err != nil { + return nil, false, err + } + return adminQueueSummary(name, meta, counters, s.queueArn(name)), true, nil +} + +// adminQueueSummary projects a queue meta + counters into the +// SPA-facing AdminQueueSummary. CreatedAt comes from the canonical +// wall-clock CreatedAtMillis (not CreatedAtHLC, which the meta's own +// comment calls "unsuitable for wall-clock display"); a zero millis +// value yields a zero time.Time so the JSON omitempty drops the field +// and the SPA renders "—" instead of an HLC-derived 1970 epoch. +// queueArn is threaded in by the caller (AdminDescribeQueue) because +// the server's region lives on *SQSServer and the helper itself is +// kept method-free for unit-testability without a coordinator. +// Pulled into a helper so the conversion is unit-testable without +// standing up a full coordinator. 
+func adminQueueSummary(name string, meta *sqsQueueMeta, counters sqsApproxCounters, queueArn string) *AdminQueueSummary { + var createdAt time.Time + if meta.CreatedAtMillis > 0 { + createdAt = time.UnixMilli(meta.CreatedAtMillis).UTC() + } + return &AdminQueueSummary{ + Name: name, + IsFIFO: meta.IsFIFO, + Generation: meta.Generation, + CreatedAt: createdAt, + Attributes: metaAttributesForAdmin(meta, queueArn), + Counters: AdminQueueCounters(counters), + } +} + +// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue. +// Returns the same sentinel errors as AdminCreateTable on the Dynamo +// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader +// on a follower, ErrAdminSQSNotFound when the queue is absent. +func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error { + if !principal.Role.canWrite() { + return ErrAdminForbidden + } + if !isVerifiedSQSLeader(s.coordinator) { + return ErrAdminNotLeader + } + if strings.TrimSpace(name) == "" { + return ErrAdminSQSValidation + } + if err := s.deleteQueueWithRetry(ctx, name); err != nil { + // deleteQueueWithRetry returns sqsAPIError with + // sqsErrQueueDoesNotExist when the queue is missing; map + // to the structured ErrAdminSQSNotFound so the admin + // handler can render 404 without sniffing the AWS code. + if isSQSAdminQueueDoesNotExist(err) { + return ErrAdminSQSNotFound + } + return errors.Wrap(err, "admin delete queue") + } + return nil +} + +// metaAttributesForAdmin renders the non-counter queue config +// attributes. Mirrors queueMetaToAttributes("All") (sqs_catalog.go) +// except for two deliberate omissions: +// +// - The Approximate* counters — the admin summary surfaces them as +// the typed AdminQueueCounters struct alongside this map, so the +// SPA can render them without round-tripping strings. +// - CreatedTimestamp — surfaced as the typed AdminQueueSummary.CreatedAt +// field for the same reason. +// +// LastModifiedTimestamp stays in the map (SetQueueAttributes updates +// LastModifiedAtMillis and operators need it for change-tracking; +// there is no dedicated typed field for it). QueueArn is included so +// the SPA can show the AWS-shaped identifier without recomputing it +// client-side. +func metaAttributesForAdmin(meta *sqsQueueMeta, queueArn string) map[string]string { + out := map[string]string{ + "QueueArn": queueArn, + "VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10), + "MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10), + "DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10), + "ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10), + "MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10), + "FifoQueue": strconv.FormatBool(meta.IsFIFO), + "ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup), + } + if mod := meta.LastModifiedAtMillis; mod > 0 { + out["LastModifiedTimestamp"] = strconv.FormatInt(mod/sqsMillisPerSecond, 10) + } + if meta.RedrivePolicy != "" { + out["RedrivePolicy"] = meta.RedrivePolicy + } + return out +} + +// ErrAdminSQSValidation is returned when an admin entrypoint receives +// a request with a missing or syntactically-bad queue name. Maps to +// 400 in the admin HTTP handler. +var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name") + +// ErrAdminSQSNotFound is returned by write entrypoints when the +// target queue does not exist. Maps to 404. 
The describe path uses +// the (nil, false, nil) tuple instead of this sentinel for the +// not-found signal, mirroring AdminDescribeTable. +var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found") + +// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's +// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise +// it to ErrAdminSQSNotFound. Falls through to false on any unrelated +// error, which AdminDeleteQueue then wraps and propagates. +func isSQSAdminQueueDoesNotExist(err error) bool { + var apiErr *sqsAPIError + if !errors.As(err, &apiErr) || apiErr == nil { + return false + } + return apiErr.errorType == sqsErrQueueDoesNotExist +} diff --git a/adapter/sqs_admin_test.go b/adapter/sqs_admin_test.go new file mode 100644 index 000000000..c91bf4f53 --- /dev/null +++ b/adapter/sqs_admin_test.go @@ -0,0 +1,124 @@ +package adapter + +import ( + "strconv" + "testing" + "time" +) + +const testQueueArn = "arn:aws:sqs:us-east-1:000000000000:orders" + +// TestAdminQueueSummary_CreatedAtUsesMillisNotHLC pins the +// invariant that the admin AdminDescribeQueue path derives +// CreatedAt from sqsQueueMeta.CreatedAtMillis (the canonical +// wall-clock field), not from hlcToTime(CreatedAtHLC) — the meta +// struct documents HLC as "unsuitable for wall-clock display" and +// the SigV4 path (sqs_catalog.go:942) reads CreatedAtMillis. Two +// failure modes the test pins: +// +// 1. CreatedAtMillis == 0 must yield a zero time.Time so the JSON +// encoder's omitempty drops the field and the SPA renders "—" +// rather than the HLC-derived 1970-01-01T00:00:00Z. +// 2. CreatedAtMillis > 0 must round-trip through time.UnixMilli in +// UTC. +func TestAdminQueueSummary_CreatedAtUsesMillisNotHLC(t *testing.T) { + t.Parallel() + + t.Run("zero millis yields zero time even with HLC populated", func(t *testing.T) { + t.Parallel() + meta := sqsQueueMeta{ + Name: "orders", + Generation: 1, + CreatedAtHLC: 42 << s3HLCPhysicalShift, // would render as ~1970 epoch via hlcToTime + // CreatedAtMillis intentionally zero + } + summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn) + if !summary.CreatedAt.IsZero() { + t.Fatalf("CreatedAt should be zero when CreatedAtMillis==0; got %v", summary.CreatedAt) + } + }) + + t.Run("positive millis round-trips via time.UnixMilli UTC", func(t *testing.T) { + t.Parallel() + const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z + meta := sqsQueueMeta{ + Name: "orders", + Generation: 2, + CreatedAtMillis: wantMillis, + CreatedAtHLC: 1, // must be ignored + } + summary := adminQueueSummary("orders", &meta, sqsApproxCounters{}, testQueueArn) + want := time.UnixMilli(wantMillis).UTC() + if !summary.CreatedAt.Equal(want) { + t.Fatalf("CreatedAt=%v want=%v", summary.CreatedAt, want) + } + if summary.CreatedAt.Location() != time.UTC { + t.Fatalf("CreatedAt location=%v want UTC", summary.CreatedAt.Location()) + } + }) +} + +// TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified pins +// the parity contract between metaAttributesForAdmin and +// queueMetaToAttributes("All"): QueueArn (the AWS-shaped identifier +// the SPA shows for change-tracking) and LastModifiedTimestamp +// (updated on SetQueueAttributes — the only handle operators have +// on "when did somebody last touch this queue's config") must both +// be present. 
+func TestMetaAttributesForAdmin_IncludesQueueArnAndLastModified(t *testing.T) { + t.Parallel() + + t.Run("QueueArn always present", func(t *testing.T) { + t.Parallel() + meta := sqsQueueMeta{Name: "orders", Generation: 1} + attrs := metaAttributesForAdmin(&meta, testQueueArn) + got, ok := attrs["QueueArn"] + if !ok { + t.Fatalf("QueueArn missing from attributes: %v", attrs) + } + if got != testQueueArn { + t.Fatalf("QueueArn=%q want=%q", got, testQueueArn) + } + }) + + t.Run("LastModifiedTimestamp emitted in unix seconds when populated", func(t *testing.T) { + t.Parallel() + const wantMillis int64 = 1_724_419_200_000 // 2024-08-23T12:00:00Z + meta := sqsQueueMeta{ + Name: "orders", + Generation: 1, + LastModifiedAtMillis: wantMillis, + } + attrs := metaAttributesForAdmin(&meta, testQueueArn) + got, ok := attrs["LastModifiedTimestamp"] + if !ok { + t.Fatalf("LastModifiedTimestamp missing from attributes: %v", attrs) + } + want := strconv.FormatInt(wantMillis/sqsMillisPerSecond, 10) + if got != want { + t.Fatalf("LastModifiedTimestamp=%q want=%q (unix seconds)", got, want) + } + }) + + t.Run("LastModifiedTimestamp omitted when zero", func(t *testing.T) { + t.Parallel() + meta := sqsQueueMeta{Name: "orders", Generation: 1} + attrs := metaAttributesForAdmin(&meta, testQueueArn) + if _, ok := attrs["LastModifiedTimestamp"]; ok { + t.Fatalf("LastModifiedTimestamp should be omitted when zero: got %q", attrs["LastModifiedTimestamp"]) + } + }) + + t.Run("CreatedTimestamp deliberately not in map (typed field instead)", func(t *testing.T) { + t.Parallel() + meta := sqsQueueMeta{ + Name: "orders", + Generation: 1, + CreatedAtMillis: 1_724_419_200_000, + } + attrs := metaAttributesForAdmin(&meta, testQueueArn) + if _, ok := attrs["CreatedTimestamp"]; ok { + t.Fatalf("CreatedTimestamp must NOT be in attrs (it lives on AdminQueueSummary.CreatedAt): got %q", attrs["CreatedTimestamp"]) + } + }) +} diff --git a/internal/admin/server.go b/internal/admin/server.go index 652a9fd84..c1e1587f7 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -61,6 +61,14 @@ type ServerDeps struct { // off" state instead of an empty matrix. KeyViz KeyVizSource + // Queues is the SQS admin source — covers list, describe, and + // delete via QueuesSource. Optional: a nil value disables + // /admin/api/v1/sqs/queues{,/{name}} (the mux answers them + // with 404). Same opt-in shape as Tables / Buckets; deployments + // that don't run the SQS adapter omit this without breaking the + // rest of the admin surface. + Queues QueuesSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -112,7 +120,8 @@ func NewServer(deps ServerDeps) (*Server, error) { // nil it serves a 503 keyviz_disabled, which the SPA renders as // a clearer "feature off" state than an unknown_endpoint 404. 
keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger) - mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, logger) + sqs := buildSqsHandlerForDeps(deps, logger) + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, sqs, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } @@ -177,6 +186,20 @@ func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { return NewS3Handler(deps.Buckets).WithLogger(logger) } +// buildSqsHandlerForDeps is the parallel constructor for the SQS +// admin handler. Read paths are open to any session; the DELETE +// path re-evaluates the principal's role against the live MapRoleStore +// on every request, so a downgraded key cannot keep mutating with a +// still-valid JWT. +func buildSqsHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { + if deps.Queues == nil { + return nil + } + return NewSqsHandler(deps.Queues). + WithLogger(logger). + WithRoleStore(MapRoleStore(deps.Roles)) +} + // Handler returns an http.Handler that serves the full admin surface. // We wrap the router in BodyLimit at the top level so every endpoint // — including /admin/healthz and the static asset / SPA paths — is @@ -215,14 +238,14 @@ func (s *Server) APIHandler() http.Handler { // audit path inside AuthService because the generic Audit middleware // cannot see the claimed actor at that point in the chain. // -// dynamoHandler / s3Handler may be nil; in that case the corresponding -// paths fall through to the unknown-endpoint 404, matching the -// behaviour of any other unregistered admin path. +// dynamoHandler / s3Handler / sqsHandler may be nil; in that case +// the corresponding paths fall through to the unknown-endpoint 404, +// matching the behaviour of any other unregistered admin path. // // keyvizHandler is always non-nil even when the sampler is disabled — // it serves 503 keyviz_disabled itself so the SPA gets a clearer // signal than an unknown_endpoint 404 from the catch-all. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler http.Handler, logger *slog.Logger) http.Handler { +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler, sqsHandler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -290,6 +313,14 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa if s3Handler != nil { s3Chain = protect(s3Handler) } + // SQS endpoints share the same protect chain rationale: GET + // reads are session-gated to keep cross-site fetches from + // enumerating queue names; DELETE goes through CSRF + the + // in-handler RoleFull check inside SqsHandler. + var sqsChain http.Handler + if sqsHandler != nil { + sqsChain = protect(sqsHandler) + } routes := apiRouteTable{ login: loginChain, @@ -298,6 +329,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa dynamo: dynamoChain, s3: s3Chain, keyviz: keyvizChain, + sqs: sqsChain, } return http.HandlerFunc(routes.dispatch) } @@ -309,29 +341,55 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa // would otherwise push buildAPIMux's branch count past the limit. 
type apiRouteTable struct { login, logout, cluster http.Handler - dynamo, s3 http.Handler + dynamo, s3, sqs http.Handler keyviz http.Handler } // dispatch is the receiver method httpHandlerFunc adapts. Logic is -// the same path-prefix switch the call site previously inlined. +// the same path-prefix switch the call site previously inlined; the +// resource-prefix half of it lives in resourceHandlerFor so this +// function stays under the cyclop ceiling as new resources land. func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) { - switch { - case r.URL.Path == "/admin/api/v1/auth/login": + switch r.URL.Path { + case "/admin/api/v1/auth/login": t.login.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/auth/logout": + return + case "/admin/api/v1/auth/logout": t.logout.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/cluster": + return + case "/admin/api/v1/cluster": t.cluster.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/keyviz/matrix": - t.keyviz.ServeHTTP(w, r) - case t.dynamo != nil && isDynamoPath(r.URL.Path): - t.dynamo.ServeHTTP(w, r) - case t.s3 != nil && isS3Path(r.URL.Path): - t.s3.ServeHTTP(w, r) + return + } + if h := t.resourceHandlerFor(r.URL.Path); h != nil { + h.ServeHTTP(w, r) + return + } + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin API handler is registered for this path") +} + +// resourceHandlerFor returns the handler that owns the URL path's +// resource family, or nil when no resource matches. Pulled out of +// dispatch so dispatch stays under cyclop=10 even as new admin +// resources (Dynamo, S3, SQS, KeyViz, future) get added. +// +// KeyViz is *always* registered (the constructor wires a non-nil +// handler that itself emits 503 keyviz_disabled when the underlying +// sampler is nil), so the switch matches against an exact path +// equality and never against a nil receiver. +func (t apiRouteTable) resourceHandlerFor(path string) http.Handler { + switch { + case t.keyviz != nil && path == "/admin/api/v1/keyviz/matrix": + return t.keyviz + case t.dynamo != nil && isDynamoPath(path): + return t.dynamo + case t.s3 != nil && isS3Path(path): + return t.s3 + case t.sqs != nil && isSqsPath(path): + return t.sqs default: - writeJSONError(w, http.StatusNotFound, "unknown_endpoint", - "no admin API handler is registered for this path") + return nil } } @@ -343,6 +401,10 @@ func isS3Path(p string) bool { return p == pathS3Buckets || strings.HasPrefix(p, pathPrefixS3Buckets) } +func isSqsPath(p string) bool { + return p == pathSqsQueues || strings.HasPrefix(p, pathPrefixSqsQueues) +} + func errMissing(field string) error { return &missingDepError{field: field} } diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go new file mode 100644 index 000000000..f30877627 --- /dev/null +++ b/internal/admin/sqs_handler.go @@ -0,0 +1,305 @@ +package admin + +import ( + "context" + "errors" + "log/slog" + "net/http" + "strconv" + "strings" + "time" + + "github.com/goccy/go-json" +) + +// pathSqsQueues is the URL prefix the SQS handler owns. The "" suffix +// produces the collection root /admin/api/v1/sqs/queues; the +// pathPrefixSqsQueues form is used for the per-queue routes. +const ( + pathSqsQueues = "/admin/api/v1/sqs/queues" + pathPrefixSqsQueues = pathSqsQueues + "/" +) + +// QueueSummary is the SPA-facing projection of a single SQS queue. +// Mirrors adapter.AdminQueueSummary 1:1; the bridge in main_admin.go +// translates between the two so the admin package stays free of the +// adapter dependency tree. 
+// +// CreatedAt is a pointer so omitempty actually drops the field when +// the underlying queue has no wall-clock creation timestamp. Both +// encoding/json and goccy/go-json serialise a zero time.Time value +// as "0001-01-01T00:00:00Z" rather than dropping it, so the SPA +// would render an ancient date instead of the "—" placeholder its +// `created_at ? formatted : "—"` guard implies. The pointer makes +// the absent-vs-zero distinction explicit on the wire. +type QueueSummary struct { + Name string `json:"name"` + IsFIFO bool `json:"is_fifo"` + Generation uint64 `json:"generation"` + CreatedAt *time.Time `json:"created_at,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Counters QueueCounters `json:"counters"` +} + +// QueueCounters mirrors the three Approximate* counters AWS exposes +// on GetQueueAttributes. Definitions follow §16.1 of the SQS design +// doc. +type QueueCounters struct { + Visible int64 `json:"visible"` + NotVisible int64 `json:"not_visible"` + Delayed int64 `json:"delayed"` +} + +// QueuesSource is the contract the SQS handler depends on. Wired in +// production to *adapter.SQSServer via a small bridge in main_admin.go; +// tests use a stub. +// +// AdminDescribeQueue returns (nil, false, nil) for a missing queue so +// callers can distinguish "not found" from a storage error without +// sniffing sentinels. AdminDeleteQueue returns the structured +// sentinels below so the handler can map them to HTTP statuses +// without leaking the adapter's error vocabulary. +type QueuesSource interface { + AdminListQueues(ctx context.Context) ([]string, error) + AdminDescribeQueue(ctx context.Context, name string) (*QueueSummary, bool, error) + AdminDeleteQueue(ctx context.Context, principal AuthPrincipal, name string) error +} + +// Errors the QueuesSource may return for the handler to map onto a +// specific HTTP response. Sentinels rather than typed errors so the +// bridge can map any adapter-internal failure onto exactly one of +// these without the admin package importing adapter-private types. +var ( + // ErrQueuesForbidden — principal lacks the role required (403). + ErrQueuesForbidden = errors.New("admin sqs: principal lacks required role") + // ErrQueuesNotLeader — local node is not the verified Raft + // leader. Without follower-forwarding wired (out of scope for + // the SPA's read+delete surface), maps to 503 + Retry-After: 1. + ErrQueuesNotLeader = errors.New("admin sqs: local node is not the raft leader") + // ErrQueuesNotFound — DELETE / DESCRIBE targets a queue that + // does not exist (404). The describe path uses (nil, false, nil) + // instead of this sentinel for the not-found signal. + ErrQueuesNotFound = errors.New("admin sqs: queue not found") + // ErrQueuesValidation — request shape is bad (400). + ErrQueuesValidation = errors.New("admin sqs: validation failed") +) + +// SqsHandler serves /admin/api/v1/sqs/queues and +// /admin/api/v1/sqs/queues/{name}. Reads (list, describe) accept GET; +// delete accepts DELETE and goes through the same protected +// middleware chain (BodyLimit -> SessionAuth -> Audit -> CSRF) as +// every other write surface, with an in-handler RoleFull gate so a +// read-only key cannot delete even with a valid CSRF token. +type SqsHandler struct { + source QueuesSource + roles RoleStore + logger *slog.Logger +} + +// NewSqsHandler binds the source and seeds logging with +// slog.Default(). 
Use WithLogger to attach a tagged logger and +// WithRoleStore to plug in the live access-key role lookup so a +// downgraded key cannot continue mutating with a still-valid JWT. +func NewSqsHandler(source QueuesSource) *SqsHandler { + return &SqsHandler{source: source, logger: slog.Default()} +} + +// WithLogger overrides the default slog destination. No-ops on nil to +// preserve the constructor-seeded slog.Default(). +func (h *SqsHandler) WithLogger(l *slog.Logger) *SqsHandler { + if l == nil { + return h + } + h.logger = l + return h +} + +// WithRoleStore enables per-request role revalidation on the delete +// endpoint. Without it, the handler trusts whatever role is embedded +// in the session JWT — which is fine for single-tenant deployments +// where the role config never changes, but problematic when an +// operator revokes or downgrades a key. Production wiring in +// main_admin.go always sets this. +func (h *SqsHandler) WithRoleStore(r RoleStore) *SqsHandler { + h.roles = r + return h +} + +// ServeHTTP routes /queues and /queues/{name}. Method handling +// mirrors DynamoHandler — keep the two parallel so an operator +// reading one understands the other for free. +func (h *SqsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == pathSqsQueues: + switch r.Method { + case http.MethodGet: + h.handleList(w, r) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") + } + case strings.HasPrefix(r.URL.Path, pathPrefixSqsQueues): + name := strings.TrimPrefix(r.URL.Path, pathPrefixSqsQueues) + switch r.Method { + case http.MethodGet: + h.handleDescribe(w, r, name) + case http.MethodDelete: + h.handleDelete(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } + default: + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin SQS handler is registered for this path") + } +} + +type listQueuesResponse struct { + Queues []string `json:"queues"` +} + +func (h *SqsHandler) handleList(w http.ResponseWriter, r *http.Request) { + names, err := h.source.AdminListQueues(r.Context()) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs list queues failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "list_failed", + "failed to list queues; see server logs") + return + } + // Force the empty-result case to render as `{"queues": []}` rather + // than `{"queues": null}`. The SPA iterates the array directly and + // would crash on null. AdminListQueues returns nil when no queues + // exist, so the normalisation has to happen here before encoding. 
+ if names == nil { + names = []string{} + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(listQueuesResponse{Queues: names}); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs list response encode failed", + slog.String("error", err.Error()), + ) + } +} + +func (h *SqsHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) { + if strings.TrimSpace(name) == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required") + return + } + summary, exists, err := h.source.AdminDescribeQueue(r.Context(), name) + if err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + if !exists { + writeJSONError(w, http.StatusNotFound, "queue_not_found", + "no queue is registered with that name") + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(summary); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs describe response encode failed", + slog.String("error", err.Error()), + ) + } +} + +func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name string) { + principal, ok := h.principalForWrite(w, r) + if !ok { + return + } + if strings.TrimSpace(name) == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required") + return + } + if err := h.source.AdminDeleteQueue(r.Context(), principal, name); err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusNoContent) +} + +// principalForWrite resolves the live role from the RoleStore (when +// configured), gates the request, and returns the principal with the +// **live** role overridden in place — so the role that flows downstream +// to the adapter is the one the operator currently has, not whatever +// the JWT happens to remember. Mirrors DynamoHandler.principalForWrite. +// Without the role override, a JWT-read_only / store-full promoted key +// passes the handler-side check but the adapter rejects with +// ErrAdminForbidden, forcing the user to log out and back in for a +// delete to work. +// +// Failure paths write the response and return ok=false; callers +// short-circuit on the bool. Logged-out / wrong-role callers never +// reach the source layer, so the leader's identity is not leaked +// by indirection (forbidden response is the same shape regardless +// of leadership state). +func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) { + principal, ok := PrincipalFromContext(r.Context()) + if !ok { + // SessionAuth runs before this handler, so a missing + // principal is a wiring bug. 500 rather than 401 since + // 401 would be misleading — the request was authenticated. + writeJSONError(w, http.StatusInternalServerError, "internal", "missing session principal") + return AuthPrincipal{}, false + } + if h.roles != nil { + live, exists := h.roles.LookupRole(principal.AccessKey) + if !exists { + // Key has been removed from the role config since + // login. Treat it as no-access regardless of what + // the JWT claimed. 
+ writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to delete queues") + return AuthPrincipal{}, false + } + if !live.AllowsWrite() { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to delete queues") + return AuthPrincipal{}, false + } + // Forward the live role downstream so the adapter + // re-check sees the same role the handler gated on. + // Without this, a key promoted from read_only → full + // after login still hits the adapter with the JWT's + // stale read_only and gets a confusing 403. + principal.Role = live + } else if !principal.Role.AllowsWrite() { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to delete queues") + return AuthPrincipal{}, false + } + return principal, true +} + +// writeQueuesError translates a QueuesSource error onto an HTTP +// response. Unrecognised errors map to 500 with a sanitised message +// — the raw err.Error() may include adapter internals (Pebble paths, +// raft peer ids) that should not flow to the SPA. +func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r *http.Request) { + switch { + case errors.Is(err, ErrQueuesForbidden): + writeJSONError(w, http.StatusForbidden, "forbidden", "principal lacks required role") + case errors.Is(err, ErrQueuesNotLeader): + w.Header().Set("Retry-After", strconv.Itoa(1)) + writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable", + "local node is not the leader; retry shortly") + case errors.Is(err, ErrQueuesNotFound): + writeJSONError(w, http.StatusNotFound, "queue_not_found", "no queue with that name") + case errors.Is(err, ErrQueuesValidation): + writeJSONError(w, http.StatusBadRequest, "invalid_request", err.Error()) + default: + logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs operation failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "internal", + "queue operation failed; see server logs") + } +} diff --git a/internal/admin/sqs_handler_test.go b/internal/admin/sqs_handler_test.go new file mode 100644 index 000000000..30ebe23e6 --- /dev/null +++ b/internal/admin/sqs_handler_test.go @@ -0,0 +1,203 @@ +package admin + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +// stubQueuesSource is the in-memory test double for SqsHandler. +// Mirrors stubTablesSource in dynamo_handler_test.go: records the +// principal that flowed through to the source so tests can assert +// live-role forwarding. +type stubQueuesSource struct { + queues []string + describeErr error + deleteErr error + lastDeleteName string + lastDeletePrincipal AuthPrincipal +} + +func (s *stubQueuesSource) AdminListQueues(_ context.Context) ([]string, error) { + return s.queues, nil +} + +func (s *stubQueuesSource) AdminDescribeQueue(_ context.Context, name string) (*QueueSummary, bool, error) { + if s.describeErr != nil { + return nil, false, s.describeErr + } + for _, q := range s.queues { + if q == name { + return &QueueSummary{Name: name}, true, nil + } + } + return nil, false, nil +} + +func (s *stubQueuesSource) AdminDeleteQueue(_ context.Context, principal AuthPrincipal, name string) error { + s.lastDeleteName = name + s.lastDeletePrincipal = principal + if s.deleteErr != nil { + return s.deleteErr + } + for i, q := range s.queues { + if q == name { + s.queues = append(s.queues[:i], s.queues[i+1:]...) 
+ return nil + } + } + return ErrQueuesNotFound +} + +// TestSqsHandler_DeleteQueue_LivePromotion pins the live-role +// forwarding contract: a JWT minted while the access key was +// read_only must, after the operator promotes the key to full in +// the live RoleStore, be allowed to delete *and* the principal +// arriving at AdminDeleteQueue must carry the live role (full), +// not the JWT's stale role. +// +// The bug before this fix: principalCanWrite returned true based on +// the live role, but the unmodified principal (with JWT read_only) +// was passed to AdminDeleteQueue, which independently checked +// principal.Role.canWrite() and returned ErrAdminForbidden. The +// user saw 403 and had to log out + back in for the new role to +// take effect. +func TestSqsHandler_DeleteQueue_LivePromotion(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + roles := MapRoleStore{"AKIA_PROMOTED": RoleFull} + h := NewSqsHandler(src).WithRoleStore(roles) + + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil) + // JWT was minted while the key was still read_only. The live + // role store has since been updated to full. + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_PROMOTED", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusNoContent, rec.Code, + "promoted key must succeed at the handler layer; body=%s", rec.Body.String()) + require.Equal(t, "orders", src.lastDeleteName) + require.Equal(t, RoleFull, src.lastDeletePrincipal.Role, + "principal forwarded to AdminDeleteQueue must carry the live role (RoleFull), not the JWT's stale RoleReadOnly") + require.Equal(t, "AKIA_PROMOTED", src.lastDeletePrincipal.AccessKey) +} + +// TestSqsHandler_DeleteQueue_LiveRevocation is the symmetric case +// the live-role gate exists for in the first place: a JWT minted +// while full but the key was later removed from full_access_keys +// (or downgraded to read_only) — the request must be rejected at +// the handler, never reaching the source. +func TestSqsHandler_DeleteQueue_LiveRevocation(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + roles := MapRoleStore{"AKIA_DOWNGRADED": RoleReadOnly} + h := NewSqsHandler(src).WithRoleStore(roles) + + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_DOWNGRADED", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastDeleteName, "source must not be reached when role is revoked") +} + +// TestSqsHandler_DeleteQueue_KeyRemovedFromStore covers the third +// edge of the live-role gate: a JWT minted while authorised but the +// access key was *removed entirely* from the role config (operator +// rotated credentials). The request must 403 — same shape as the +// downgrade case — and never reach the source. 
+func TestSqsHandler_DeleteQueue_KeyRemovedFromStore(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + roles := MapRoleStore{} // key not present at all + h := NewSqsHandler(src).WithRoleStore(roles) + + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_GONE", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastDeleteName) +} + +// TestSqsHandler_DeleteQueue_NoRoleStore covers the single-tenant +// default: when no RoleStore is wired, the handler trusts the JWT's +// embedded role. A JWT-full request succeeds; a JWT-read_only +// request is rejected at the handler. +func TestSqsHandler_DeleteQueue_NoRoleStore(t *testing.T) { + t.Run("jwt full succeeds", func(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) // no WithRoleStore + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) + require.Equal(t, RoleFull, src.lastDeletePrincipal.Role) + }) + t.Run("jwt read-only is forbidden", func(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastDeleteName) + }) +} + +// TestSqsHandler_ListQueues_EmptyArrayNotNull pins the nil→[] +// normalisation in the list handler. Without it the empty-catalog +// case serialises as `{"queues": null}` and the SPA crashes on +// `queues.length` against null. +func TestSqsHandler_ListQueues_EmptyArrayNotNull(t *testing.T) { + src := &stubQueuesSource{queues: nil} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathSqsQueues, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"queues":[]`, + "empty catalog must serialise as [] not null; body=%s", rec.Body.String()) +} + +// TestSqsHandler_DescribeQueue_ZeroCreatedAtIsOmittedOnTheWire pins +// the wire-level contract that a queue with no wall-clock creation +// timestamp does not surface a Go-zero time.Time on the wire. +// time.Time with `omitempty` is NOT dropped by encoding/json or +// goccy/go-json when zero — it serialises as "0001-01-01T00:00:00Z" +// and the SPA renders an ancient date instead of "—". The fix +// switched QueueSummary.CreatedAt to *time.Time and the bridge +// converts a zero time.Time to nil. This test exercises the wire +// representation, not the Go-side IsZero() check the adapter unit +// test already pins. +func TestSqsHandler_DescribeQueue_ZeroCreatedAtIsOmittedOnTheWire(t *testing.T) { + // AdminDescribeQueue stub returns a QueueSummary with no CreatedAt + // set (nil pointer, the post-bridge representation of an unknown + // CreatedAtMillis). 
+ src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + body := rec.Body.String() + require.NotContains(t, body, "0001-01-01T00:00:00Z", + "a queue with no wall-clock timestamp must not surface the Go zero time on the wire; body=%s", body) + require.NotContains(t, body, `"created_at":`, + "created_at must be omitted entirely when the queue has no wall-clock timestamp so the SPA renders the placeholder; body=%s", body) +} + +// helper to silence the unused-import warning when errors is only +// referenced inside one of the test functions. +var _ = errors.New diff --git a/main.go b/main.go index 05176eff0..02f09f7fe 100644 --- a/main.go +++ b/main.go @@ -764,7 +764,7 @@ func startServers(in serversInput) error { // the handler hands ErrTablesNotLeader writes to the forwarder // which dials the leader over the cached gRPC pool. Without these // the handler falls back to 503 + Retry-After:1. - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache, in.keyvizSampler); err != nil { + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, runner.sqsServer, in.coordinate, connCache, in.keyvizSampler); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil @@ -1309,6 +1309,12 @@ type runtimeServerRunner struct { // 404, mirroring the dynamoServer == nil contract. s3Server *adapter.S3Server + // sqsServer plays the same role for the SQS admin entrypoints + // (adapter/sqs_admin.go). Nil when --sqsAddress is empty; the + // admin listener then leaves /admin/api/v1/sqs/* off the wire + // (the mux 404s those paths). + sqsServer *adapter.SQSServer + // roleStore is the access-key → role index the leader-side // gRPC AdminForward service uses to re-validate the principal // on every forwarded write. 
Mirrors what admin.Config.RoleIndex @@ -1362,9 +1368,11 @@ func (r *runtimeServerRunner) start() error { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } r.s3Server = s3Server - if err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile); err != nil { + sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile) + if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } + r.sqsServer = sqsServer if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin.go b/main_admin.go index 75d37c374..d4d42435e 100644 --- a/main_admin.go +++ b/main_admin.go @@ -76,6 +76,7 @@ func startAdminFromFlags( runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer, s3Server *adapter.S3Server, + sqsServer *adapter.SQSServer, coordinate kv.Coordinator, connCache *kv.GRPCConnCache, keyvizSampler *keyviz.MemSampler, @@ -121,14 +122,124 @@ func startAdminFromFlags( clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes) tablesSrc := newDynamoTablesSource(dynamoServer) bucketsSrc := newBucketsSource(s3Server) + queuesSrc := newSqsQueuesSource(sqsServer) forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId) if err != nil { return errors.Wrap(err, "build admin leader forwarder") } - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, keyvizSampler, buildVersion()) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, keyvizSampler, buildVersion()) return err } +// newSqsQueuesSource adapts *adapter.SQSServer to admin.QueuesSource. +// Same architectural reasoning as newDynamoTablesSource and +// newBucketsSource: the bridge stays in this file (rather than +// internal/admin) so the admin package stays free of the heavy +// adapter-package dependency tree. Returns nil when sqsServer is nil +// so admin.NewServer leaves /admin/api/v1/sqs/* off the wire. +func newSqsQueuesSource(sqsServer *adapter.SQSServer) admin.QueuesSource { + if sqsServer == nil { + return nil + } + return &sqsQueuesBridge{server: sqsServer} +} + +// sqsQueuesBridge re-shapes adapter.AdminQueueSummary into +// admin.QueueSummary. The two structs are deliberately isomorphic so +// this translation does no allocation more than necessary; if a +// future field is added on one side, the build breaks here, which +// is the drift signal we want. +type sqsQueuesBridge struct { + server *adapter.SQSServer +} + +func (b *sqsQueuesBridge) AdminListQueues(ctx context.Context) ([]string, error) { + return b.server.AdminListQueues(ctx) //nolint:wrapcheck // pure pass-through; adapter owns the error context. 
+} + +func (b *sqsQueuesBridge) AdminDescribeQueue(ctx context.Context, name string) (*admin.QueueSummary, bool, error) { + summary, exists, err := b.server.AdminDescribeQueue(ctx, name) + if err != nil { + return nil, false, translateAdminQueuesError(err) + } + if !exists { + return nil, false, nil + } + return convertAdminQueueSummary(summary), true, nil +} + +func (b *sqsQueuesBridge) AdminDeleteQueue(ctx context.Context, principal admin.AuthPrincipal, name string) error { + if err := b.server.AdminDeleteQueue(ctx, convertAdminPrincipal(principal), name); err != nil { + return translateAdminQueuesError(err) + } + return nil +} + +// convertAdminQueueSummary mirrors adapter.AdminQueueSummary into +// admin.QueueSummary. Counter fields are int64 on both sides; if +// either side grows a field, this function should be extended in the +// same commit so a compile error catches the drift. +// +// CreatedAt collapses a zero time.Time on the adapter side (queues +// stored before CreatedAtMillis was populated, or never populated) +// to a nil pointer on the admin side so omitempty actually drops the +// field at the wire layer. Without this collapse the SPA would see +// "0001-01-01T00:00:00Z" and render an ancient date. +func convertAdminQueueSummary(in *adapter.AdminQueueSummary) *admin.QueueSummary { + if in == nil { + return nil + } + var createdAt *time.Time + if !in.CreatedAt.IsZero() { + t := in.CreatedAt + createdAt = &t + } + return &admin.QueueSummary{ + Name: in.Name, + IsFIFO: in.IsFIFO, + Generation: in.Generation, + CreatedAt: createdAt, + Attributes: in.Attributes, + Counters: admin.QueueCounters{ + Visible: in.Counters.Visible, + NotVisible: in.Counters.NotVisible, + Delayed: in.Counters.Delayed, + }, + } +} + +// translateAdminQueuesError maps the adapter's SQS error vocabulary +// onto the admin-package sentinels the SQS handler matches against. +// Anything not recognised is forwarded as-is and answered with 500 +// + a sanitised body, matching the dynamo / s3 bridges' behaviour. +func translateAdminQueuesError(err error) error { + switch { + case err == nil: + return nil + case errors.Is(err, adapter.ErrAdminForbidden): + return admin.ErrQueuesForbidden + case errors.Is(err, adapter.ErrAdminNotLeader): + return admin.ErrQueuesNotLeader + case errors.Is(err, adapter.ErrAdminSQSNotFound): + return admin.ErrQueuesNotFound + case errors.Is(err, adapter.ErrAdminSQSValidation): + return admin.ErrQueuesValidation + case isLeaderChurnError(err): + // Leadership can be lost between AdminDeleteQueue's upfront + // isVerifiedSQSLeader check and the coordinator dispatch + // inside deleteQueueWithRetry. The kv coordinator surfaces + // that as ErrLeaderNotFound / ErrNotLeader (or a wrapped + // "not leader" / "leader not found" suffix), and the + // retry loop's isRetryableTransactWriteError does not catch + // them. Without this arm the error falls to default and the + // admin handler renders a generic 500. Mirrors the same arm + // in translateAdminTablesError on the Dynamo side. + return admin.ErrQueuesNotLeader + default: + return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it. + } +} + // buildAdminLeaderForwarder constructs the production LeaderForwarder // for the dynamo HTTP handler when the wiring is complete enough to // reach a remote leader. 
The bridge tolerates a nil connCache (and a @@ -450,6 +561,7 @@ func startAdminServer( cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, + queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler, version string, @@ -459,7 +571,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder, keyvizSampler) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, keyvizSampler) if err != nil { return "", err } @@ -499,7 +611,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -524,6 +636,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust ClusterInfo: cluster, Tables: tables, Buckets: buckets, + Queues: queues, Forwarder: forwarder, KeyViz: keyvizSourceFromSampler(keyvizSampler), StaticFS: staticFS, diff --git a/main_admin_test.go b/main_admin_test.go index 2f83fdf0f..d3e298ef1 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, nil, "") require.NoError(t, err) } @@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") require.Error(t, err) } @@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := 
admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. @@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{ @@ -426,3 +426,62 @@ func TestTranslateAdminTablesError_UnrelatedErrorPassesThrough(t *testing.T) { require.NotErrorIs(t, out, admin.ErrTablesNotLeader) require.Equal(t, in, out) } + +// TestTranslateAdminQueuesError_LeaderChurn is the SQS counterpart of +// TestTranslateAdminTablesError_LeaderChurn. AdminDeleteQueue clears +// the upfront isVerifiedSQSLeader check but the kv coordinator can +// still drop leadership inside deleteQueueWithRetry's Dispatch; the +// resulting ErrLeaderNotFound / ErrNotLeader / wrapped suffixes must +// classify as 503 leader_unavailable, not the generic 500 fallthrough. +func TestTranslateAdminQueuesError_LeaderChurn(t *testing.T) { + cases := []struct { + name string + in error + }{ + {"kv.ErrLeaderNotFound", kv.ErrLeaderNotFound}, + {"adapter.ErrNotLeader", adapter.ErrNotLeader}, + {"adapter.ErrLeaderNotFound", adapter.ErrLeaderNotFound}, + {"wrapped not leader", errors.New("dispatch failed: not leader")}, + {"wrapped leader not found", errors.New("dispatch: leader not found")}, + {"wrapped leadership lost", errors.New("commit aborted: leadership lost")}, + {"wrapped leadership transfer", errors.New("retry exhausted: leadership transfer in progress")}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + out := translateAdminQueuesError(tc.in) + require.ErrorIs(t, out, admin.ErrQueuesNotLeader, + "input %q must map to ErrQueuesNotLeader", tc.in) + }) + } +} + +// TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage is the +// SQS counterpart of the same Tables test — pins that the HasSuffix +// matcher in isLeaderChurnError does not false-positive on +// user-supplied error messages that happen to mention a leader +// phrase mid-string (e.g. a queue name or attribute value that +// happens to contain "not leader"). +func TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage(t *testing.T) { + cases := []string{ + "not leader: actually a downstream error", + "leader not found, but recovered automatically", + "leadership lost mid-snapshot, retried successfully", + } + for _, msg := range cases { + t.Run(msg, func(t *testing.T) { + out := translateAdminQueuesError(errors.New(msg)) + require.NotErrorIs(t, out, admin.ErrQueuesNotLeader, + "mid-message leader phrase %q must not classify as leader-churn", msg) + }) + } +} + +// TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough confirms +// the leader-churn detector does not swallow unrelated errors that +// happen to mention the word "leader" outside the canonical phrases. 
+func TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough(t *testing.T) { + in := errors.New("team leader misconfigured") + out := translateAdminQueuesError(in) + require.NotErrorIs(t, out, admin.ErrQueuesNotLeader) + require.Equal(t, in, out) +} diff --git a/main_sqs.go b/main_sqs.go index 55ca41684..7bb8623e0 100644 --- a/main_sqs.go +++ b/main_sqs.go @@ -11,6 +11,11 @@ import ( "golang.org/x/sync/errgroup" ) +// startSQSServer stands up the SQS adapter on sqsAddr and returns the +// running *adapter.SQSServer so the admin listener can call SigV4-bypass +// admin entrypoints against it (see adapter/sqs_admin.go). Returns +// (nil, nil) when sqsAddr is empty — that is the "SQS disabled" branch +// and the admin listener leaves /admin/api/v1/sqs/* off the wire. func startSQSServer( ctx context.Context, lc *net.ListenConfig, @@ -21,19 +26,19 @@ func startSQSServer( leaderSQS map[string]string, region string, credentialsFile string, -) error { +) (*adapter.SQSServer, error) { sqsAddr = strings.TrimSpace(sqsAddr) if sqsAddr == "" { - return nil + return nil, nil } sqsL, err := lc.Listen(ctx, "tcp", sqsAddr) if err != nil { - return errors.Wrapf(err, "failed to listen on %s", sqsAddr) + return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr) } staticCreds, err := loadSigV4StaticCredentialsFile(credentialsFile, "sqs") if err != nil { _ = sqsL.Close() - return err + return nil, err } sqsServer := adapter.NewSQSServer( sqsL, @@ -63,5 +68,5 @@ func startSQSServer( } return errors.WithStack(err) }) - return nil + return sqsServer, nil } diff --git a/web/admin/src/App.tsx b/web/admin/src/App.tsx index 90272b146..08311dfcf 100644 --- a/web/admin/src/App.tsx +++ b/web/admin/src/App.tsx @@ -9,6 +9,8 @@ import { LoginPage } from "./pages/Login"; import { NotFoundPage } from "./pages/NotFound"; import { S3DetailPage } from "./pages/S3Detail"; import { S3ListPage } from "./pages/S3List"; +import { SqsDetailPage } from "./pages/SqsDetail"; +import { SqsListPage } from "./pages/SqsList"; export function App() { return ( @@ -25,6 +27,8 @@ export function App() { } /> } /> } /> + } /> + } /> } /> } /> } /> diff --git a/web/admin/src/api/client.ts b/web/admin/src/api/client.ts index c3ec92cdf..ee1dcea40 100644 --- a/web/admin/src/api/client.ts +++ b/web/admin/src/api/client.ts @@ -193,6 +193,29 @@ export interface CreateBucketRequest { acl?: "private" | "public-read"; } +// SQS queue admin DTOs (Section 16.2 of the SQS partial design doc). +// `attributes` mirrors the AWS GetQueueAttributes "All" set with +// snake_case keys; `counters` is the typed projection of the three +// Approximate* counters added in Phase 3.A. 
+export interface SqsQueueCounters { + visible: number; + not_visible: number; + delayed: number; +} + +export interface SqsQueueSummary { + name: string; + is_fifo: boolean; + generation: number; + created_at?: string; + attributes?: Record; + counters: SqsQueueCounters; +} + +export interface SqsQueueList { + queues: string[]; +} + export const api = { login: (access_key: string, secret_key: string) => apiFetch("/auth/login", { @@ -223,4 +246,10 @@ export const api = { }), deleteBucket: (name: string) => apiFetch(`/s3/buckets/${encodeURIComponent(name)}`, { method: "DELETE" }), + listQueues: (signal?: AbortSignal) => + apiFetch("/sqs/queues", { signal }), + describeQueue: (name: string, signal?: AbortSignal) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { signal }), + deleteQueue: (name: string) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { method: "DELETE" }), }; diff --git a/web/admin/src/components/Layout.tsx b/web/admin/src/components/Layout.tsx index 80e6641da..26620cdb8 100644 --- a/web/admin/src/components/Layout.tsx +++ b/web/admin/src/components/Layout.tsx @@ -4,6 +4,7 @@ import { useAuth } from "../auth"; const navItems: { to: string; label: string; end?: boolean }[] = [ { to: "/", label: "Overview", end: true }, { to: "/dynamo", label: "DynamoDB" }, + { to: "/sqs", label: "SQS" }, { to: "/s3", label: "S3" }, ]; diff --git a/web/admin/src/pages/SqsDetail.tsx b/web/admin/src/pages/SqsDetail.tsx new file mode 100644 index 000000000..2ad32c7ce --- /dev/null +++ b/web/admin/src/pages/SqsDetail.tsx @@ -0,0 +1,147 @@ +import { useState } from "react"; +import { Link, useNavigate, useParams } from "react-router-dom"; +import { api } from "../api/client"; +import { Modal } from "../components/Modal"; +import { formatApiError, useApiQuery } from "../lib/useApi"; + +export function SqsDetailPage() { + const { name = "" } = useParams<{ name: string }>(); + const detail = useApiQuery((signal) => api.describeQueue(name, signal), [name]); + const [confirmDelete, setConfirmDelete] = useState(false); + const [deleting, setDeleting] = useState(false); + const [deleteError, setDeleteError] = useState(null); + const navigate = useNavigate(); + // The delete button is gated by the backend's live-role check + // (internal/admin/sqs_handler.go principalForWrite), not the JWT + // role cached in this session. A JWT minted as read_only stays + // read_only in the cookie until logout, but the operator may have + // been promoted to full in the live role store after login — so + // gating the button on session.role would hide it for users who + // are currently authorized. A read_only operator who clicks delete + // gets a 403 from the backend, surfaced in the modal's error area. + + const onDelete = async () => { + setDeleting(true); + setDeleteError(null); + try { + await api.deleteQueue(name); + navigate("/sqs", { replace: true }); + } catch (err) { + setDeleteError(formatApiError(err)); + setDeleting(false); + } + }; + + return ( +
<div>
+      <div>
+        <Link to="/sqs">← All queues</Link>
+        <h1>{name}</h1>
+        {detail.data && (
+          <span>{detail.data.is_fifo ? "FIFO" : "Standard"}</span>
+        )}
+        {detail.data && (
+          <button type="button" onClick={() => setConfirmDelete(true)} disabled={deleting}>
+            Delete queue
+          </button>
+        )}
+      </div>
+
+      <div>
+        {detail.loading && <div>Loading…</div>}
+        {detail.error?.status === 404 && (
+          <div>
+            Either the queue does not exist or the SQS admin endpoints are not
+            wired (no <code>--sqsAddress</code>).
+          </div>
+        )}
+        {detail.error && detail.error.status !== 404 && (
+          <div>{formatApiError(detail.error)}</div>
+        )}
+        {detail.data && (
+          <dl>
+            <dt>Generation</dt>
+            <dd>{detail.data.generation}</dd>
+            <dt>Created</dt>
+            <dd>
+              {detail.data.created_at ? new Date(detail.data.created_at).toLocaleString() : "—"}
+            </dd>
+          </dl>
+        )}
+      </div>
+
+      {detail.data && (
+        <section>
+          <h2>Approximate message counts</h2>
+          <div>
+            <CounterCard label="Visible" value={detail.data.counters.visible} />
+            <CounterCard label="In flight" value={detail.data.counters.not_visible} />
+            <CounterCard label="Delayed" value={detail.data.counters.delayed} />
+          </div>
+        </section>
+      )}
+
+      {detail.data?.attributes && Object.keys(detail.data.attributes).length > 0 && (
+        <section>
+          <h2>Configuration</h2>
+          <dl>
+            {Object.entries(detail.data.attributes).map(([k, v]) => (
+              <div key={k}>
+                <dt>{k}</dt>
+                <dd>{v}</dd>
+              </div>
+            ))}
+          </dl>
+        </section>
+      )}
+
+      <Modal
+        open={confirmDelete}
+        onClose={() => !deleting && setConfirmDelete(false)}
+        busy={deleting}
+      >
+        <p>
+          Permanently delete {name}? All messages
+          will be removed and the queue cannot be recovered.
+        </p>
+        {deleteError && <div>{deleteError}</div>}
+        <div>
+          <button type="button" onClick={() => setConfirmDelete(false)} disabled={deleting}>
+            Cancel
+          </button>
+          <button type="button" onClick={onDelete} disabled={deleting}>
+            {deleting ? "Deleting…" : "Delete"}
+          </button>
+        </div>
+      </Modal>
+    </div>
+  );
+}
+
+function CounterCard({ label, value }: { label: string; value: number }) {
+  return (
+    <div>
+      <div>{label}</div>
+      <div>{value}</div>
+    </div>
+  );
+}
diff --git a/web/admin/src/pages/SqsList.tsx b/web/admin/src/pages/SqsList.tsx
new file mode 100644
index 000000000..f5b4b5517
--- /dev/null
+++ b/web/admin/src/pages/SqsList.tsx
@@ -0,0 +1,68 @@
+import { Link } from "react-router-dom";
+import { api } from "../api/client";
+import { formatApiError, useApiQuery } from "../lib/useApi";
+
+export function SqsListPage() {
+  const queues = useApiQuery((signal) => api.listQueues(signal), []);
+
+  return (
+    <div>
+      <div>
+        <div>
+          <h1>SQS queues</h1>
+          <p>
+            List, describe, and delete SQS queues. Detail pages also surface
+            the approximate visible / in-flight / delayed message counts.
+          </p>
+        </div>
+      </div>
+
+      <div>
+        {queues.loading && <div>Loading…</div>}
+        {queues.error?.status === 404 && (
+          <div>
+            SQS admin endpoints not wired on this build (the cluster was started
+            without <code>--sqsAddress</code>, so the
+            admin listener leaves <code>/admin/api/v1/sqs/*</code>
+            off the wire).
+          </div>
+        )}
+        {queues.error && queues.error.status !== 404 && (
+          <div>{formatApiError(queues.error)}</div>
+        )}
+        {queues.data && queues.data.queues.length === 0 && (
+          <div>No queues yet.</div>
+        )}
+        {queues.data && queues.data.queues.length > 0 && (
+          <table>
+            <thead>
+              <tr>
+                <th>Queue</th>
+                <th />
+              </tr>
+            </thead>
+            <tbody>
+              {queues.data.queues.map((name) => (
+                <tr key={name}>
+                  <td>
+                    <Link to={`/sqs/${encodeURIComponent(name)}`}>{name}</Link>
+                  </td>
+                  <td>
+                    <Link to={`/sqs/${encodeURIComponent(name)}`}>details →</Link>
+                  </td>
+                </tr>
+              ))}
+            </tbody>
+          </table>
+        )}
+      </div>
+    </div>
+  );
+}
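
To make the wire contract concrete: under the QueueSummary type and SqsHandler above, a successful GET /admin/api/v1/sqs/queues/orders returns a body shaped roughly like the following. The attribute subset and the field values are illustrative, not taken from the change; created_at drops out of the payload entirely (rather than serialising as the Go zero time) when the queue has no wall-clock creation timestamp.

    {
      "name": "orders",
      "is_fifo": false,
      "generation": 2,
      "created_at": "2024-08-23T12:00:00Z",
      "attributes": {
        "QueueArn": "arn:aws:sqs:us-east-1:000000000000:orders",
        "VisibilityTimeout": "30",
        "LastModifiedTimestamp": "1724419200"
      },
      "counters": { "visible": 12, "not_visible": 3, "delayed": 0 }
    }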