Skip to content

Commit 105635e

Browse files
committed
feat(sqs/admin): SigV4-bypass admin entrypoints + SPA queues pages
Replaces PR #659, which conflicted heavily after main moved (PR #649 squashed; PR #658 added S3 admin endpoints; the Approximate counters implementation now lives directly in adapter/sqs_catalog.go). This PR: Backend (adapter/sqs_admin.go + internal/admin/sqs_handler.go): - SQSServer.AdminListQueues / AdminDescribeQueue / AdminDeleteQueue are SigV4-bypass entrypoints, mirroring the AdminListTables / AdminListBuckets pattern. - AdminDescribeQueue uses the existing scanApproxCounters from sqs_catalog.go (already on main) so the admin path returns the same Visible / NotVisible / Delayed numbers as GetQueueAttributes("All") would, taken at one snapshot read TS. - sqsQueuesBridge in main_admin.go re-shapes adapter.AdminQueueSummary into admin.QueueSummary, keeping internal/admin free of the heavy adapter dependency tree — same pattern as dynamoTablesBridge / s3BucketsBridge. - admin.QueuesSource is opt-in; deployments that don't run --sqsAddress leave /admin/api/v1/sqs/* off the wire and the SPA renders a soft "endpoint pending" notice on the 404. - Role re-evaluation against the live RoleStore on DELETE so a downgraded key cannot keep mutating with a still-valid JWT. - apiRouteTable.dispatch refactored: resourceHandlerFor extracted so the dispatcher stays under cyclop=10 as new resources land (Dynamo, S3, SQS, future). Frontend (web/admin/src/pages/SqsList.tsx, SqsDetail.tsx): - /sqs queue list with refresh + per-row link to detail. - /sqs/:name detail showing FIFO badge, counters card (Visible / In-flight / Delayed), raw attributes table, and a Delete confirmation Modal gated by RequireFullAccess. - api/client.ts gains listQueues / describeQueue / deleteQueue with the same AbortSignal pattern used for cluster / dynamo / s3 reads. - Layout nav adds an SQS tab between DynamoDB and S3. Out of scope (recorded in the SQS partial design doc §16.2): - PurgeQueue from the SPA. Underlying purgeQueueWithRetry is on main; the admin entrypoint is a trivial follow-up. - Send / Peek / CreateQueue from the SPA. Each needs its own adapter entrypoint and form UX; deferred to keep this PR focused. Verified with go build ./..., go test -race ./internal/admin/..., go test -race -run TestSQS ./adapter/, go test -run TestStartAdmin ., golangci-lint run ./adapter/... ./internal/admin/... ./... (0 issues, no //nolint), and cd web/admin && npm run build.
1 parent ee521be commit 105635e

11 files changed

Lines changed: 861 additions & 29 deletions

File tree

adapter/sqs_admin.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"strings"
7+
"time"
8+
9+
"github.com/cockroachdb/errors"
10+
)
11+
12+
// AdminQueueSummary is the per-queue projection the admin dashboard
13+
// surfaces. It deliberately covers only the fields the SPA renders so
14+
// the package's wire-format types stay internal.
15+
//
16+
// Counters mirror the AWS Approximate* attribute set produced by
17+
// scanApproxCounters; they are best-effort by AWS contract and stop
18+
// counting once the catalog's per-call cap is reached (the SPA polls
19+
// continuously, so an unbounded scan would pin the leader).
20+
type AdminQueueSummary struct {
21+
Name string
22+
IsFIFO bool
23+
Generation uint64
24+
CreatedAt time.Time
25+
Attributes map[string]string
26+
Counters AdminQueueCounters
27+
}
28+
29+
// AdminQueueCounters matches sqsApproxCounters (int64) so the admin
30+
// bridge does not have to convert between widths. Visible /
31+
// NotVisible / Delayed are the AWS Approximate* triple.
32+
type AdminQueueCounters struct {
33+
Visible int64
34+
NotVisible int64
35+
Delayed int64
36+
}
37+
38+
// AdminListQueues returns every queue name this server knows about,
39+
// in the lexicographic order the queue catalog index produces. Read
40+
// path; runs on follower or leader and uses the same scanQueueNames
41+
// helper the SigV4 ListQueues handler does.
42+
func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) {
43+
return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context.
44+
}
45+
46+
// AdminDescribeQueue returns a snapshot of name's metadata plus the
47+
// approximate counters. The triple (result, present, error) lets
48+
// admin callers distinguish a missing queue from a storage error
49+
// without sniffing sentinels.
50+
//
51+
// Like AdminDescribeTable on the Dynamo side, this entrypoint runs
52+
// on either the leader or a follower (read-only); the counter scan
53+
// uses a fresh nextTxnReadTS so the result is consistent with what
54+
// SigV4 GetQueueAttributes would have returned at the same instant.
55+
func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) {
56+
if strings.TrimSpace(name) == "" {
57+
return nil, false, ErrAdminSQSValidation
58+
}
59+
readTS := s.nextTxnReadTS(ctx)
60+
meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
61+
if err != nil {
62+
return nil, false, errors.WithStack(err)
63+
}
64+
if !exists {
65+
return nil, false, nil
66+
}
67+
counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
68+
if err != nil {
69+
return nil, false, err
70+
}
71+
summary := &AdminQueueSummary{
72+
Name: name,
73+
IsFIFO: meta.IsFIFO,
74+
Generation: meta.Generation,
75+
CreatedAt: hlcToTime(meta.CreatedAtHLC),
76+
Attributes: metaAttributesForAdmin(meta),
77+
Counters: AdminQueueCounters(counters),
78+
}
79+
return summary, true, nil
80+
}
81+
82+
// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue.
83+
// Returns the same sentinel errors as AdminCreateTable on the Dynamo
84+
// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader
85+
// on a follower, ErrAdminSQSNotFound when the queue is absent.
86+
func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error {
87+
if !principal.Role.canWrite() {
88+
return ErrAdminForbidden
89+
}
90+
if !isVerifiedSQSLeader(s.coordinator) {
91+
return ErrAdminNotLeader
92+
}
93+
if strings.TrimSpace(name) == "" {
94+
return ErrAdminSQSValidation
95+
}
96+
if err := s.deleteQueueWithRetry(ctx, name); err != nil {
97+
// deleteQueueWithRetry returns sqsAPIError with
98+
// sqsErrQueueDoesNotExist when the queue is missing; map
99+
// to the structured ErrAdminSQSNotFound so the admin
100+
// handler can render 404 without sniffing the AWS code.
101+
if isSQSAdminQueueDoesNotExist(err) {
102+
return ErrAdminSQSNotFound
103+
}
104+
return errors.Wrap(err, "admin delete queue")
105+
}
106+
return nil
107+
}
108+
109+
// metaAttributesForAdmin renders the queue meta into the same shape
110+
// queueMetaToAttributes("All") would, minus the counters (the admin
111+
// summary surfaces them as a typed struct alongside, not as strings).
112+
// Kept as a small dedicated helper so the SigV4 path's selection
113+
// machinery stays untouched.
114+
func metaAttributesForAdmin(meta *sqsQueueMeta) map[string]string {
115+
out := map[string]string{
116+
"VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10),
117+
"MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10),
118+
"DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10),
119+
"ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10),
120+
"MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10),
121+
"FifoQueue": strconv.FormatBool(meta.IsFIFO),
122+
"ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup),
123+
}
124+
if meta.RedrivePolicy != "" {
125+
out["RedrivePolicy"] = meta.RedrivePolicy
126+
}
127+
return out
128+
}
129+
130+
// ErrAdminSQSValidation is returned when an admin entrypoint receives
131+
// a request with a missing or syntactically-bad queue name. Maps to
132+
// 400 in the admin HTTP handler.
133+
var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")
134+
135+
// ErrAdminSQSNotFound is returned by write entrypoints when the
136+
// target queue does not exist. Maps to 404. The describe path uses
137+
// the (nil, false, nil) tuple instead of this sentinel for the
138+
// not-found signal, mirroring AdminDescribeTable.
139+
var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")
140+
141+
// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's
142+
// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise
143+
// it to ErrAdminSQSNotFound. Falls through to false on any unrelated
144+
// error, which AdminDeleteQueue then wraps and propagates.
145+
func isSQSAdminQueueDoesNotExist(err error) bool {
146+
var apiErr *sqsAPIError
147+
if !errors.As(err, &apiErr) || apiErr == nil {
148+
return false
149+
}
150+
return apiErr.errorType == sqsErrQueueDoesNotExist
151+
}

internal/admin/server.go

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ type ServerDeps struct {
6161
// off" state instead of an empty matrix.
6262
KeyViz KeyVizSource
6363

64+
// Queues is the SQS admin source — covers list, describe, and
65+
// delete via QueuesSource. Optional: a nil value disables
66+
// /admin/api/v1/sqs/queues{,/{name}} (the mux answers them
67+
// with 404). Same opt-in shape as Tables / Buckets; deployments
68+
// that don't run the SQS adapter omit this without breaking the
69+
// rest of the admin surface.
70+
Queues QueuesSource
71+
6472
// StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be
6573
// nil during early development; the router renders 404 for
6674
// /admin/assets/* and the SPA fallback in that case.
@@ -112,7 +120,8 @@ func NewServer(deps ServerDeps) (*Server, error) {
112120
// nil it serves a 503 keyviz_disabled, which the SPA renders as
113121
// a clearer "feature off" state than an unknown_endpoint 404.
114122
keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger)
115-
mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, logger)
123+
sqs := buildSqsHandlerForDeps(deps, logger)
124+
mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, sqs, logger)
116125
router := NewRouter(mux, deps.StaticFS)
117126
return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil
118127
}
@@ -177,6 +186,20 @@ func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler {
177186
return NewS3Handler(deps.Buckets).WithLogger(logger)
178187
}
179188

189+
// buildSqsHandlerForDeps is the parallel constructor for the SQS
190+
// admin handler. Read paths are open to any session; the DELETE
191+
// path re-evaluates the principal's role against the live MapRoleStore
192+
// on every request, so a downgraded key cannot keep mutating with a
193+
// still-valid JWT.
194+
func buildSqsHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler {
195+
if deps.Queues == nil {
196+
return nil
197+
}
198+
return NewSqsHandler(deps.Queues).
199+
WithLogger(logger).
200+
WithRoleStore(MapRoleStore(deps.Roles))
201+
}
202+
180203
// Handler returns an http.Handler that serves the full admin surface.
181204
// We wrap the router in BodyLimit at the top level so every endpoint
182205
// — including /admin/healthz and the static asset / SPA paths — is
@@ -215,14 +238,14 @@ func (s *Server) APIHandler() http.Handler {
215238
// audit path inside AuthService because the generic Audit middleware
216239
// cannot see the claimed actor at that point in the chain.
217240
//
218-
// dynamoHandler / s3Handler may be nil; in that case the corresponding
219-
// paths fall through to the unknown-endpoint 404, matching the
220-
// behaviour of any other unregistered admin path.
241+
// dynamoHandler / s3Handler / sqsHandler may be nil; in that case
242+
// the corresponding paths fall through to the unknown-endpoint 404,
243+
// matching the behaviour of any other unregistered admin path.
221244
//
222245
// keyvizHandler is always non-nil even when the sampler is disabled —
223246
// it serves 503 keyviz_disabled itself so the SPA gets a clearer
224247
// signal than an unknown_endpoint 404 from the catch-all.
225-
func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler http.Handler, logger *slog.Logger) http.Handler {
248+
func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler, sqsHandler http.Handler, logger *slog.Logger) http.Handler {
226249
loginHandler := http.HandlerFunc(auth.HandleLogin)
227250
logoutHandler := http.HandlerFunc(auth.HandleLogout)
228251

@@ -290,6 +313,14 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
290313
if s3Handler != nil {
291314
s3Chain = protect(s3Handler)
292315
}
316+
// SQS endpoints share the same protect chain rationale: GET
317+
// reads are session-gated to keep cross-site fetches from
318+
// enumerating queue names; DELETE goes through CSRF + the
319+
// in-handler RoleFull check inside SqsHandler.
320+
var sqsChain http.Handler
321+
if sqsHandler != nil {
322+
sqsChain = protect(sqsHandler)
323+
}
293324

294325
routes := apiRouteTable{
295326
login: loginChain,
@@ -298,6 +329,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
298329
dynamo: dynamoChain,
299330
s3: s3Chain,
300331
keyviz: keyvizChain,
332+
sqs: sqsChain,
301333
}
302334
return http.HandlerFunc(routes.dispatch)
303335
}
@@ -309,29 +341,55 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa
309341
// would otherwise push buildAPIMux's branch count past the limit.
310342
type apiRouteTable struct {
311343
login, logout, cluster http.Handler
312-
dynamo, s3 http.Handler
344+
dynamo, s3, sqs http.Handler
313345
keyviz http.Handler
314346
}
315347

316348
// dispatch is the receiver method httpHandlerFunc adapts. Logic is
317-
// the same path-prefix switch the call site previously inlined.
349+
// the same path-prefix switch the call site previously inlined; the
350+
// resource-prefix half of it lives in resourceHandlerFor so this
351+
// function stays under the cyclop ceiling as new resources land.
318352
func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) {
319-
switch {
320-
case r.URL.Path == "/admin/api/v1/auth/login":
353+
switch r.URL.Path {
354+
case "/admin/api/v1/auth/login":
321355
t.login.ServeHTTP(w, r)
322-
case r.URL.Path == "/admin/api/v1/auth/logout":
356+
return
357+
case "/admin/api/v1/auth/logout":
323358
t.logout.ServeHTTP(w, r)
324-
case r.URL.Path == "/admin/api/v1/cluster":
359+
return
360+
case "/admin/api/v1/cluster":
325361
t.cluster.ServeHTTP(w, r)
326-
case r.URL.Path == "/admin/api/v1/keyviz/matrix":
327-
t.keyviz.ServeHTTP(w, r)
328-
case t.dynamo != nil && isDynamoPath(r.URL.Path):
329-
t.dynamo.ServeHTTP(w, r)
330-
case t.s3 != nil && isS3Path(r.URL.Path):
331-
t.s3.ServeHTTP(w, r)
362+
return
363+
}
364+
if h := t.resourceHandlerFor(r.URL.Path); h != nil {
365+
h.ServeHTTP(w, r)
366+
return
367+
}
368+
writeJSONError(w, http.StatusNotFound, "unknown_endpoint",
369+
"no admin API handler is registered for this path")
370+
}
371+
372+
// resourceHandlerFor returns the handler that owns the URL path's
373+
// resource family, or nil when no resource matches. Pulled out of
374+
// dispatch so dispatch stays under cyclop=10 even as new admin
375+
// resources (Dynamo, S3, SQS, KeyViz, future) get added.
376+
//
377+
// KeyViz is *always* registered (the constructor wires a non-nil
378+
// handler that itself emits 503 keyviz_disabled when the underlying
379+
// sampler is nil), so the switch matches against an exact path
380+
// equality and never against a nil receiver.
381+
func (t apiRouteTable) resourceHandlerFor(path string) http.Handler {
382+
switch {
383+
case t.keyviz != nil && path == "/admin/api/v1/keyviz/matrix":
384+
return t.keyviz
385+
case t.dynamo != nil && isDynamoPath(path):
386+
return t.dynamo
387+
case t.s3 != nil && isS3Path(path):
388+
return t.s3
389+
case t.sqs != nil && isSqsPath(path):
390+
return t.sqs
332391
default:
333-
writeJSONError(w, http.StatusNotFound, "unknown_endpoint",
334-
"no admin API handler is registered for this path")
392+
return nil
335393
}
336394
}
337395

@@ -343,6 +401,10 @@ func isS3Path(p string) bool {
343401
return p == pathS3Buckets || strings.HasPrefix(p, pathPrefixS3Buckets)
344402
}
345403

404+
func isSqsPath(p string) bool {
405+
return p == pathSqsQueues || strings.HasPrefix(p, pathPrefixSqsQueues)
406+
}
407+
346408
func errMissing(field string) error {
347409
return &missingDepError{field: field}
348410
}

0 commit comments

Comments
 (0)