diff --git a/adapter/s3_admin.go b/adapter/s3_admin.go new file mode 100644 index 000000000..8cef130d3 --- /dev/null +++ b/adapter/s3_admin.go @@ -0,0 +1,177 @@ +package adapter + +import ( + "bytes" + "context" + "sort" + + "github.com/bootjp/elastickv/internal/s3keys" + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +// adminBucketScanPage is the per-iteration ScanAt page size used by +// AdminListBuckets. Smaller than s3MaxKeys is unnecessary — +// ScanAt's per-call memory budget is already this size on the +// SigV4 listBuckets path — but we use it as a named constant so the +// loop's intent is explicit. AdminListBuckets accumulates pages +// until the prefix is exhausted, so the total returned size is +// bounded by the cluster's bucket count rather than this knob. +const adminBucketScanPage = 1000 + +// AdminBucketSummary is the bucket-level information the admin +// dashboard surfaces. It deliberately projects only the fields the +// dashboard needs so the package's wire-format types +// (s3BucketMeta, s3ListBucketsResult) stay internal. +// +// CreatedAtHLC is the same physical-time-bearing HLC the bucket +// metadata persists; the admin HTTP handler formats it for the SPA. +// ACL is the canned-ACL string ("private" / "public-read") — the +// admin layer does not expand it into the AWS ACL XML grant tree +// because the dashboard renders the canned form directly. +type AdminBucketSummary struct { + Name string + ACL string + CreatedAtHLC uint64 + Generation uint64 + Region string + Owner string +} + +// AdminListBuckets returns every S3-style bucket this server knows +// about, in lexicographic order (the metadata-prefix scan natural +// ordering). Intended for the in-process admin listener as the +// SigV4-free counterpart to the listBuckets HTTP handler. +// +// Unlike the SigV4 path (which intentionally caps each call at +// s3MaxKeys = 1000 because the AWS API is page-based), the admin +// dashboard's pagination is implemented at the handler layer, which +// expects this method to return the full set. We loop the per-page +// ScanAt until the metadata prefix is exhausted — same pattern as +// scanAllByPrefixAt on the Dynamo side (Codex P1 + Claude Issue 1 +// on PR #658). +// +// Returns an empty slice (not nil) when no buckets exist so JSON +// callers see `[]` instead of `null`. +func (s *S3Server) AdminListBuckets(ctx context.Context) ([]AdminBucketSummary, error) { + readTS := s.readTS() + readPin := s.pinReadTS(readTS) + defer readPin.Release() + prefix := []byte(s3keys.BucketMetaPrefix) + end := prefixScanEnd(prefix) + start := bytes.Clone(prefix) + out := make([]AdminBucketSummary, 0, adminBucketScanPage) + for { + kvs, err := s.store.ScanAt(ctx, start, end, adminBucketScanPage, readTS) + if err != nil { + return nil, errors.Wrap(err, "admin list buckets: scan metadata") + } + if len(kvs) == 0 { + break + } + appended, halt, err := appendAdminBucketSummaries(out, kvs, prefix) + if err != nil { + return nil, err + } + out = appended + if halt { + // A key outside the metadata prefix means the + // table-of-contents layout changed mid-scan; returning + // what we have is safer than fabricating a summary + // from an unrelated key. + return finaliseAdminBucketList(out), nil + } + if len(kvs) < adminBucketScanPage { + break + } + start = nextScanCursor(kvs[len(kvs)-1].Key) + if end != nil && bytes.Compare(start, end) > 0 { + break + } + } + return finaliseAdminBucketList(out), nil +} + +// appendAdminBucketSummaries projects one ScanAt page into the +// accumulating result slice. Returns the extended slice plus a halt +// flag the caller uses to short-circuit when ScanAt yielded a key +// outside the bucket-meta prefix (a defensive check that should not +// trigger in practice but locks the contract). Splitting this out +// keeps AdminListBuckets under the cyclomatic ceiling without +// hiding the per-row decode + skip logic. +func appendAdminBucketSummaries(out []AdminBucketSummary, kvs []*store.KVPair, prefix []byte) ([]AdminBucketSummary, bool, error) { + for _, kvp := range kvs { + if !bytes.HasPrefix(kvp.Key, prefix) { + return out, true, nil + } + bucket, ok := s3keys.ParseBucketMetaKey(kvp.Key) + if !ok { + continue + } + meta, err := decodeS3BucketMeta(kvp.Value) + if err != nil { + return nil, false, errors.Wrapf(err, "admin list buckets: decode metadata for %q", bucket) + } + if meta == nil { + continue + } + out = append(out, summaryFromBucketMeta(bucket, meta)) + } + return out, false, nil +} + +// finaliseAdminBucketList sorts the accumulated summaries and +// returns the result. Pulled out because the scan loop has two +// early-exit branches (prefix-mismatch defensive return + the +// natural end-of-prefix exit) and both must guarantee +// lexicographic ordering — one place to enforce it is safer than +// two near-identical sort.Slice calls. +func finaliseAdminBucketList(out []AdminBucketSummary) []AdminBucketSummary { + // ScanAt yields metadata-prefix order, which is already + // lexicographic by escaped name on the ASCII bucket-name + // alphabet. The final sort is defensive against a future + // key-encoding change rather than a correction today. + sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) + return out +} + +// AdminDescribeBucket returns the bucket-level snapshot for name. +// The triple (result, present, error) lets admin callers +// distinguish a genuine "not found" from a storage error without +// sniffing sentinels — when the bucket is missing the function +// returns (nil, false, nil), mirroring AdminDescribeTable's +// contract on the Dynamo side. +// +// Like AdminListBuckets this is a read-only path that bypasses +// SigV4. The HTTP admin handler enforces session + CSRF + role at +// the boundary; the adapter trusts the caller for authentication +// (Section 3.2's exception for read-only paths). +func (s *S3Server) AdminDescribeBucket(ctx context.Context, name string) (*AdminBucketSummary, bool, error) { + readTS := s.readTS() + readPin := s.pinReadTS(readTS) + defer readPin.Release() + meta, exists, err := s.loadBucketMetaAt(ctx, name, readTS) + if err != nil { + return nil, false, errors.Wrapf(err, "admin describe bucket %q", name) + } + if !exists || meta == nil { + return nil, false, nil + } + summary := summaryFromBucketMeta(name, meta) + return &summary, true, nil +} + +// summaryFromBucketMeta projects the on-disk metadata into the +// admin DTO. Pulled out so list and describe both produce the same +// shape, including the empty-string defaults for optional fields — +// the SPA depends on these being present even when blank. +func summaryFromBucketMeta(name string, meta *s3BucketMeta) AdminBucketSummary { + return AdminBucketSummary{ + Name: name, + ACL: meta.Acl, + CreatedAtHLC: meta.CreatedAtHLC, + Generation: meta.Generation, + Region: meta.Region, + Owner: meta.Owner, + } +} diff --git a/adapter/s3_admin_test.go b/adapter/s3_admin_test.go new file mode 100644 index 000000000..70760649d --- /dev/null +++ b/adapter/s3_admin_test.go @@ -0,0 +1,135 @@ +package adapter + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// TestS3Server_AdminListBuckets_EmptyReturnsEmptySlice covers the +// "no buckets at all" case so the admin handler can rely on getting +// an empty slice — not nil — and produce a stable `[]` JSON shape. +func TestS3Server_AdminListBuckets_EmptyReturnsEmptySlice(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + got, err := server.AdminListBuckets(context.Background()) + require.NoError(t, err) + require.NotNil(t, got, "must return non-nil slice for empty state so the admin JSON shape is `[]`") + require.Empty(t, got) +} + +// TestS3Server_AdminListBuckets_ReflectsCreatedBuckets confirms the +// SigV4-bypass admin path sees the same buckets a normal SigV4 +// CreateBucket flow produced. The two views share loadBucketMetaAt +// + the metadata-prefix scan, so any drift here is an encoding bug +// in summaryFromBucketMeta — exactly the regression the test pins. +func TestS3Server_AdminListBuckets_ReflectsCreatedBuckets(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + for _, name := range []string{"alpha", "bravo", "charlie"} { + rec := httptest.NewRecorder() + req := newS3TestRequest(http.MethodPut, "/"+name, nil) + server.handle(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "create %s", name) + } + + got, err := server.AdminListBuckets(context.Background()) + require.NoError(t, err) + require.Len(t, got, 3) + // ScanAt produces metadata-prefix order (lexicographic by + // escaped name); summaryFromBucketMeta preserves that. + require.Equal(t, "alpha", got[0].Name) + require.Equal(t, "bravo", got[1].Name) + require.Equal(t, "charlie", got[2].Name) + for _, b := range got { + require.Equal(t, s3AclPrivate, b.ACL, + "unspecified ACL must default to private (matches createBucket)") + require.NotZero(t, b.CreatedAtHLC, "creation HLC must be populated") + require.NotZero(t, b.Generation, "generation must be populated") + } +} + +// TestS3Server_AdminDescribeBucket_Existing returns the populated +// summary with ACL / region preserved through the bridge, and +// (nil, false, nil) for a missing name. The handler depends on the +// (nil, false, nil) shape to differentiate "not found" from a +// storage failure without sniffing sentinels. +func TestS3Server_AdminDescribeBucket_Existing(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + req := newS3TestRequest(http.MethodPut, "/orders", nil) + req.Header.Set("x-amz-acl", s3AclPublicRead) + server.handle(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + got, exists, err := server.AdminDescribeBucket(context.Background(), "orders") + require.NoError(t, err) + require.True(t, exists) + require.NotNil(t, got) + require.Equal(t, "orders", got.Name) + require.Equal(t, s3AclPublicRead, got.ACL, + "explicit x-amz-acl must round-trip through the admin describe path") + require.NotZero(t, got.CreatedAtHLC) +} + +func TestS3Server_AdminDescribeBucket_Missing(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + got, exists, err := server.AdminDescribeBucket(context.Background(), "no-such-bucket") + require.NoError(t, err) + require.False(t, exists) + require.Nil(t, got) +} + +// TestS3Server_AdminListBuckets_PaginatesPastSinglePage pins the +// fix for the truncation bug Codex P1 / Claude Issue 1 / Gemini +// flagged on PR #658: AdminListBuckets must walk the metadata +// prefix until exhausted, not stop at adminBucketScanPage. The +// test exceeds the per-iteration page by 100 buckets (1100 total) +// so a regression that re-introduces a single-call ScanAt would +// silently drop the tail and the assertion fails. +// +// Total bucket count (1100) is small enough to keep the test +// O(seconds) on the in-memory MVCC store. Names are zero-padded to +// 4 digits so lexicographic order matches numeric order — the test +// pins both the count AND the ordering contract. +func TestS3Server_AdminListBuckets_PaginatesPastSinglePage(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + const total = adminBucketScanPage + 100 + for i := range total { + name := fmt.Sprintf("bucket-%04d", i) + rec := httptest.NewRecorder() + req := newS3TestRequest(http.MethodPut, "/"+name, nil) + server.handle(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "create %s", name) + } + + got, err := server.AdminListBuckets(context.Background()) + require.NoError(t, err) + require.Len(t, got, total, + "AdminListBuckets must continue past adminBucketScanPage; truncating here is the regression") + require.Equal(t, "bucket-0000", got[0].Name) + require.Equal(t, fmt.Sprintf("bucket-%04d", total-1), got[total-1].Name) +} diff --git a/internal/admin/buckets_source.go b/internal/admin/buckets_source.go new file mode 100644 index 000000000..19234d163 --- /dev/null +++ b/internal/admin/buckets_source.go @@ -0,0 +1,75 @@ +package admin + +import ( + "context" + "errors" +) + +// BucketsSource is the in-process surface the admin S3 handler +// dispatches into. It mirrors TablesSource on the Dynamo side +// (Section 3.2 of the admin design): defining the contract here lets +// the bridge in main_admin.go translate adapter errors into the +// admin-package vocabulary without the adapter package importing +// internal/admin. +// +// All methods are read-only in this slice. Write methods +// (AdminCreateBucket, AdminPutBucketAcl, AdminDeleteBucket) ship in +// the next slice with AdminForward integration so a follower can +// hand them off to the leader transparently. +type BucketsSource interface { + // AdminListBuckets returns every bucket this server knows about, + // in stable lexicographic order. The empty list is a valid + // response — the handler returns `{"buckets":[]}` rather than + // 404 so the SPA can distinguish "no buckets yet" from "S3 + // admin not configured" (the latter shape is a 404 from the + // router fallthrough). + AdminListBuckets(ctx context.Context) ([]BucketSummary, error) + // AdminDescribeBucket returns the metadata snapshot for name. + // The triple (result, present, error) lets the handler emit a + // 404 for missing buckets without sniffing sentinels; storage + // failures still surface via the error return. + AdminDescribeBucket(ctx context.Context, name string) (*BucketSummary, bool, error) +} + +// BucketSummary is the bucket-level DTO the SPA receives. The JSON +// shape matches the design doc Section 4.1 / web/admin's +// `S3Bucket` interface — bucket_name + acl + created_at — plus +// generation/region/owner for operators inspecting via curl. +// +// CreatedAt is an ISO-8601 string (UTC, second precision). The +// adapter persists it as an HLC; the handler formats. Producing +// the formatted string here rather than in the SPA keeps timezone +// rendering server-side and prevents drift between the two SPA +// pages that surface buckets (S3List + S3Detail). +type BucketSummary struct { + Name string `json:"bucket_name"` + ACL string `json:"acl,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + Generation uint64 `json:"generation,omitempty"` + Region string `json:"region,omitempty"` + Owner string `json:"owner,omitempty"` +} + +// ErrBucketsForbidden is returned when the principal lacks the +// role required for the operation. Maps to 403. Kept as its own +// sentinel (rather than reusing ErrTablesForbidden) so a future +// per-resource role model can diverge without breaking either +// handler's match list. +var ErrBucketsForbidden = errors.New("admin buckets: principal lacks required role") + +// ErrBucketsNotLeader is returned when the local node is not the +// Raft leader for the S3 group. Read-only methods do NOT return +// this — list / describe are leader-agnostic in this slice. Kept +// here so the next slice's write methods can wire it without +// adding a new sentinel. +var ErrBucketsNotLeader = errors.New("admin buckets: local node is not the raft leader") + +// ErrBucketsNotFound is returned when DELETE / DESCRIBE / a +// follow-up read targets a bucket that does not exist. The triple +// (nil, false, nil) is the preferred signal for the read path; +// this sentinel covers the future write paths only. +var ErrBucketsNotFound = errors.New("admin buckets: bucket does not exist") + +// ErrBucketsAlreadyExists is returned when CREATE targets a name +// that is already in use. Maps to 409. Reserved for the next slice. +var ErrBucketsAlreadyExists = errors.New("admin buckets: bucket already exists") diff --git a/internal/admin/dynamo_handler.go b/internal/admin/dynamo_handler.go index f94a11dc9..d4c95daa2 100644 --- a/internal/admin/dynamo_handler.go +++ b/internal/admin/dynamo_handler.go @@ -3,7 +3,6 @@ package admin import ( "bytes" "context" - "encoding/base64" "errors" "io" "log/slog" @@ -265,14 +264,8 @@ type dynamoListResponse struct { } func (h *DynamoHandler) handleList(w http.ResponseWriter, r *http.Request) { - limit, err := parseDynamoListLimit(r.URL.Query().Get("limit")) - if err != nil { - writeJSONError(w, http.StatusBadRequest, "invalid_limit", err.Error()) - return - } - startAfter, err := decodeDynamoNextToken(r.URL.Query().Get("next_token")) - if err != nil { - writeJSONError(w, http.StatusBadRequest, "invalid_next_token", err.Error()) + limit, startAfter, ok := parseListPaginationParams(w, r, defaultDynamoListLimit, dynamoListLimitMax) + if !ok { return } @@ -306,7 +299,7 @@ func (h *DynamoHandler) handleList(w http.ResponseWriter, r *http.Request) { page, next := paginateDynamoTableNames(names, startAfter, limit) resp := dynamoListResponse{Tables: page} if next != "" { - resp.NextToken = encodeDynamoNextToken(next) + resp.NextToken = encodeListNextToken(next) } // paginateDynamoTableNames is total over its input — it always // returns a non-nil slice (an empty []string{} on the @@ -716,47 +709,6 @@ func (h *DynamoHandler) handleDescribe(w http.ResponseWriter, r *http.Request, n writeAdminJSON(w, r.Context(), h.logger, summary) } -// parseDynamoListLimit translates the ?limit= query parameter into a -// concrete page size. Empty falls back to the design-doc default; -// negatives or non-numerics are an outright client error; values past -// the ceiling are silently clamped (not an error) so the SPA's -// "request the maximum" pattern works without a probe round-trip. -func parseDynamoListLimit(raw string) (int, error) { - if raw == "" { - return defaultDynamoListLimit, nil - } - n, err := strconv.Atoi(raw) - if err != nil { - return 0, errors.New("limit must be an integer") - } - if n <= 0 { - return 0, errors.New("limit must be positive") - } - if n > dynamoListLimitMax { - return dynamoListLimitMax, nil - } - return n, nil -} - -// decodeDynamoNextToken reverses encodeDynamoNextToken. We base64-wrap -// the raw last-table-name so the wire token is opaque from the -// client's perspective and we can change the cursor representation -// later without breaking the API contract. -func decodeDynamoNextToken(raw string) (string, error) { - if raw == "" { - return "", nil - } - decoded, err := base64.RawURLEncoding.DecodeString(raw) - if err != nil { - return "", errors.New("next_token is not valid base64url") - } - return string(decoded), nil -} - -func encodeDynamoNextToken(name string) string { - return base64.RawURLEncoding.EncodeToString([]byte(name)) -} - // paginateDynamoTableNames slices `names` (already lex-sorted by the // adapter) into a single page starting strictly after `startAfter`. // The second return is the opaque cursor the client should pass back diff --git a/internal/admin/list_pagination.go b/internal/admin/list_pagination.go new file mode 100644 index 000000000..1825725f5 --- /dev/null +++ b/internal/admin/list_pagination.go @@ -0,0 +1,78 @@ +package admin + +import ( + "encoding/base64" + "errors" + "net/http" + "strconv" +) + +// parseListPaginationParams extracts the (limit, startAfter) pair +// from a request's query string. Shared by every list endpoint — +// dynamo tables, s3 buckets, and any future paginated read — so the +// validation policy stays in one place: empty limit → caller's +// default; non-numeric / non-positive → 400 with a precise message; +// oversize → caller-supplied ceiling silently clamps; missing +// next_token → ""; non-base64url next_token → 400. +// +// On any rejection the helper writes the JSON error and returns +// ok=false so the handler can short-circuit. Returning the response +// from inside the helper keeps the call site to a single +// `if !ok { return }` line. +func parseListPaginationParams(w http.ResponseWriter, r *http.Request, defaultLimit, maxLimit int) (limit int, startAfter string, ok bool) { + rawLimit := r.URL.Query().Get("limit") + limit, err := parseListLimit(rawLimit, defaultLimit, maxLimit) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_limit", err.Error()) + return 0, "", false + } + startAfter, err = decodeListNextToken(r.URL.Query().Get("next_token")) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_next_token", err.Error()) + return 0, "", false + } + return limit, startAfter, true +} + +// parseListLimit centralises the limit-parsing rules: empty → +// default, negatives / non-numerics → typed error, oversize → silent +// clamp. Per-resource handlers wrap this with their own +// (default, max) pair so a future per-resource ceiling change does +// not have to re-thread through the shared helper. +func parseListLimit(raw string, defaultLimit, maxLimit int) (int, error) { + if raw == "" { + return defaultLimit, nil + } + n, err := strconv.Atoi(raw) + if err != nil { + return 0, errors.New("limit must be an integer") + } + if n <= 0 { + return 0, errors.New("limit must be positive") + } + if n > maxLimit { + return maxLimit, nil + } + return n, nil +} + +// decodeListNextToken reverses encodeListNextToken. We base64url-wrap +// the raw cursor string so the wire token is opaque from the +// client's perspective and the server can change the cursor +// representation later without breaking the API contract. +func decodeListNextToken(raw string) (string, error) { + if raw == "" { + return "", nil + } + decoded, err := base64.RawURLEncoding.DecodeString(raw) + if err != nil { + return "", errors.New("next_token is not valid base64url") + } + return string(decoded), nil +} + +// encodeListNextToken is the encoder counterpart used by every list +// handler when emitting a non-empty cursor in the response. +func encodeListNextToken(cursor string) string { + return base64.RawURLEncoding.EncodeToString([]byte(cursor)) +} diff --git a/internal/admin/s3_handler.go b/internal/admin/s3_handler.go new file mode 100644 index 000000000..90226900c --- /dev/null +++ b/internal/admin/s3_handler.go @@ -0,0 +1,229 @@ +package admin + +import ( + "errors" + "log/slog" + "net/http" + "sort" + "strings" + "time" +) + +// Pagination knobs for the read-only S3 bucket list endpoint. +// Values mirror the Dynamo side (defaultDynamoListLimit / +// dynamoListLimitMax) so a SPA component that reuses the same +// "page size" preset behaves identically across both resources. +const ( + defaultS3ListLimit = 100 + s3ListLimitMax = 1000 +) + +const ( + pathS3Buckets = "/admin/api/v1/s3/buckets" + pathPrefixS3Buckets = pathS3Buckets + "/" +) + +// hlcPhysicalShift is the bit position at which the 48-bit physical +// half of the HLC timestamp lives. Mirrors kv.HLC's wire format +// (upper 48 bits = Unix ms, lower 16 bits = logical counter). Pulled +// out here so the formatter is self-contained — admin must not +// import kv to read the field. +const hlcPhysicalShift = 16 + +// S3Handler serves /admin/api/v1/s3/buckets and the +// /admin/api/v1/s3/buckets/{name} sub-tree. Construct via +// NewS3Handler and hand to the admin router. +// +// The handler depends on a BucketsSource for in-process dispatch. +// When source is nil the constructor returns nil, which is the +// well-known "S3 admin disabled" signal the router keys off of +// (the routes fall through to the unknown-endpoint 404). +// +// Slice 1 ships only the read-only paths (list + describe). The +// next slice will add a RoleStore for live role re-validation on +// the write endpoints (mirrors DynamoHandler.WithRoleStore). +type S3Handler struct { + source BucketsSource + logger *slog.Logger +} + +// NewS3Handler wires a BucketsSource into the HTTP handler. Returns +// nil when source is nil so a build that ships without the S3 +// adapter can pass the zero value to ServerDeps and have the routes +// silently disappear from the wire — matching the Tables nil +// contract on the Dynamo side. +func NewS3Handler(source BucketsSource) *S3Handler { + if source == nil { + return nil + } + return &S3Handler{source: source, logger: slog.Default()} +} + +// WithLogger swaps the slog destination. Returns the receiver so +// option calls chain at construction sites +// (NewS3Handler(...).WithLogger(...)). +func (h *S3Handler) WithLogger(logger *slog.Logger) *S3Handler { + if logger != nil { + h.logger = logger + } + return h +} + +// ServeHTTP routes /buckets and /buckets/{name}. The next slice +// wires POST/PUT/DELETE; for now those return 405 so the SPA can +// distinguish "endpoint not configured" (404) from "method not +// implemented yet" (405). +func (h *S3Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == pathS3Buckets: + switch r.Method { + case http.MethodGet: + h.handleList(w, r) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET is implemented for /s3/buckets in this build") + } + case strings.HasPrefix(r.URL.Path, pathPrefixS3Buckets): + name := strings.TrimPrefix(r.URL.Path, pathPrefixS3Buckets) + // /buckets/{name}/acl is reserved for the next slice. Reject + // any sub-path with 404 here so a SPA bug that calls PUT /acl + // on this build sees a sensible error instead of mistakenly + // hitting the describe path with a "{name}/acl" string. The + // pinned test is TestS3Handler_DescribeBucket_SubpathReturns404 + // (CodeRabbit minor on PR #658 caught the previous comment + // referring to 405). + if strings.Contains(name, "/") { + writeJSONError(w, http.StatusNotFound, "not_found", + "no admin S3 handler is registered for this path") + return + } + switch r.Method { + case http.MethodGet: + h.handleDescribe(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET is implemented for /s3/buckets/{name} in this build") + } + default: + writeJSONError(w, http.StatusNotFound, "not_found", "") + } +} + +// s3ListResponse is the JSON shape returned by GET /s3/buckets. +// Buckets is always emitted as `[]` even when empty so the SPA +// can use a presence check rather than guard against null. +type s3ListResponse struct { + Buckets []BucketSummary `json:"buckets"` + NextToken string `json:"next_token,omitempty"` +} + +func (h *S3Handler) handleList(w http.ResponseWriter, r *http.Request) { + limit, startAfter, ok := parseListPaginationParams(w, r, defaultS3ListLimit, s3ListLimitMax) + if !ok { + return + } + buckets, err := h.source.AdminListBuckets(r.Context()) + if err != nil { + // Map ErrBucketsForbidden to 403 here too so the contract + // stays symmetric with handleDescribe — when slice 2 wires + // a role gate on the source, the SPA gets the same 403 it + // would on the describe path rather than a generic 500 + // (Gemini medium on PR #658). + if errors.Is(err, ErrBucketsForbidden) { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + return + } + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin s3 list buckets failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "s3_list_failed", + "failed to list buckets; see server logs") + return + } + page, next := paginateBuckets(buckets, startAfter, limit) + resp := s3ListResponse{Buckets: page} + if next != "" { + resp.NextToken = encodeListNextToken(next) + } + writeAdminJSON(w, r.Context(), h.logger, resp) +} + +func (h *S3Handler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) { + if name == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_bucket_name", "bucket name is empty") + return + } + summary, exists, err := h.source.AdminDescribeBucket(r.Context(), name) + if err != nil { + // Differentiate the two structured failures we expect: + // - ErrBucketsForbidden: the bridge translated an adapter- + // side authorization rejection. 403. + // - everything else: a real storage failure. 500 + log. + if errors.Is(err, ErrBucketsForbidden) { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + return + } + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin s3 describe bucket failed", + slog.String("bucket", name), + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "s3_describe_failed", + "failed to describe bucket; see server logs") + return + } + if !exists { + writeJSONError(w, http.StatusNotFound, "not_found", "bucket does not exist") + return + } + writeAdminJSON(w, r.Context(), h.logger, summary) +} + +// FormatBucketCreatedAt converts an HLC timestamp into the ISO-8601 +// string the SPA expects. Exposed (rather than kept package-private) +// so the bridge in main_admin.go can call it from the BucketsSource +// implementation — both the handler's response and any future +// audit-log enrichment land on identical formatting. +func FormatBucketCreatedAt(hlc uint64) string { + if hlc == 0 { + return "" + } + ms := int64(hlc >> hlcPhysicalShift) //nolint:gosec // 48-bit physical half always fits in int64. + return time.UnixMilli(ms).UTC().Format(time.RFC3339) +} + +// paginateBuckets slices `buckets` (already lex-sorted by the +// adapter) into a single page starting strictly after `startAfter`. +// Returns the page plus the opaque cursor for the next call ("" if +// this was the last page). +// +// Mirrors paginateDynamoTableNames but operates on []BucketSummary +// rather than []string. A generic helper would force callers to +// write a key-extractor closure on every call site, which obscures +// the resume contract more than the four-line copy clarifies it. +func paginateBuckets(buckets []BucketSummary, startAfter string, limit int) ([]BucketSummary, string) { + start := 0 + if startAfter != "" { + idx := sort.Search(len(buckets), func(i int) bool { return buckets[i].Name >= startAfter }) + switch { + case idx >= len(buckets): + return []BucketSummary{}, "" + case buckets[idx].Name == startAfter: + start = idx + 1 + default: + start = idx + } + } + end := start + limit + if end > len(buckets) { + end = len(buckets) + } + // A slice expression on a non-nil slice is itself non-nil even + // when its length is zero, so the result already produces the + // `"buckets":[]` JSON shape the SPA expects without an extra + // nil-guard (Claude Issue 2 on PR #658). + page := buckets[start:end] + if end < len(buckets) && len(page) > 0 { + return page, page[len(page)-1].Name + } + return page, "" +} diff --git a/internal/admin/s3_handler_test.go b/internal/admin/s3_handler_test.go new file mode 100644 index 000000000..6b408f4fb --- /dev/null +++ b/internal/admin/s3_handler_test.go @@ -0,0 +1,301 @@ +package admin + +import ( + "context" + "encoding/base64" + "errors" + "net/http" + "net/http/httptest" + "sort" + "testing" + + "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// stubBucketsSource is the in-memory test double the S3 admin +// handler tests use. AdminListBuckets returns summaries in lex order +// of bucket name, matching the adapter contract; descErr / listErr +// let tests trigger the storage-failure paths without standing up a +// real adapter. +type stubBucketsSource struct { + buckets map[string]BucketSummary + listErr error + descErr error +} + +func (s *stubBucketsSource) AdminListBuckets(_ context.Context) ([]BucketSummary, error) { + if s.listErr != nil { + return nil, s.listErr + } + out := make([]BucketSummary, 0, len(s.buckets)) + names := make([]string, 0, len(s.buckets)) + for k := range s.buckets { + names = append(names, k) + } + sort.Strings(names) + for _, n := range names { + out = append(out, s.buckets[n]) + } + return out, nil +} + +func (s *stubBucketsSource) AdminDescribeBucket(_ context.Context, name string) (*BucketSummary, bool, error) { + if s.descErr != nil { + return nil, false, s.descErr + } + b, ok := s.buckets[name] + if !ok { + return nil, false, nil + } + return &b, true, nil +} + +func newS3HandlerForTest(src BucketsSource) *S3Handler { + return NewS3Handler(src) +} + +func TestNewS3Handler_NilSourceReturnsNil(t *testing.T) { + // A nil source is the well-known "S3 admin disabled" signal so a + // build that ships without the S3 adapter can pass nil into + // ServerDeps.Buckets and have the routes silently disappear. + require.Nil(t, NewS3Handler(nil)) +} + +func TestS3Handler_ListBuckets_EmptyArrayNotNull(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{buckets: map[string]BucketSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"buckets":[]`) + require.NotContains(t, rec.Body.String(), `"next_token"`) +} + +func TestS3Handler_ListBuckets_HappyPath(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{buckets: map[string]BucketSummary{ + "alpha": {Name: "alpha", ACL: "private", Generation: 1}, + "bravo": {Name: "bravo", ACL: "public-read", Generation: 2}, + }}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp s3ListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Len(t, resp.Buckets, 2) + require.Equal(t, "alpha", resp.Buckets[0].Name) + require.Equal(t, "private", resp.Buckets[0].ACL) + require.Equal(t, "bravo", resp.Buckets[1].Name) + require.Equal(t, "public-read", resp.Buckets[1].ACL) + require.Empty(t, resp.NextToken) +} + +func TestS3Handler_ListBuckets_PaginationCursorRoundtrips(t *testing.T) { + // Three buckets + limit=2 should produce a first page of 2 + + // a next_token; passing that token back yields the third. + src := &stubBucketsSource{buckets: map[string]BucketSummary{ + "a-bucket": {Name: "a-bucket"}, + "b-bucket": {Name: "b-bucket"}, + "c-bucket": {Name: "c-bucket"}, + }} + h := newS3HandlerForTest(src) + + rec := httptest.NewRecorder() + h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, pathS3Buckets+"?limit=2", nil)) + require.Equal(t, http.StatusOK, rec.Code) + var page1 s3ListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &page1)) + require.Len(t, page1.Buckets, 2) + require.Equal(t, "a-bucket", page1.Buckets[0].Name) + require.Equal(t, "b-bucket", page1.Buckets[1].Name) + require.NotEmpty(t, page1.NextToken) + + // Cursor must be base64url-encoded "b-bucket" (the last name on + // page 1) so the decoder can consume it without round-tripping + // through encodeListNextToken. + decoded, err := base64.RawURLEncoding.DecodeString(page1.NextToken) + require.NoError(t, err) + require.Equal(t, "b-bucket", string(decoded)) + + rec = httptest.NewRecorder() + h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, + pathS3Buckets+"?limit=2&next_token="+page1.NextToken, nil)) + require.Equal(t, http.StatusOK, rec.Code) + var page2 s3ListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &page2)) + require.Len(t, page2.Buckets, 1) + require.Equal(t, "c-bucket", page2.Buckets[0].Name) + require.Empty(t, page2.NextToken) +} + +func TestS3Handler_ListBuckets_RejectsInvalidLimit(t *testing.T) { + cases := []struct { + name string + limit string + want string + }{ + {"non-numeric", "abc", "limit must be an integer"}, + {"zero", "0", "limit must be positive"}, + {"negative", "-3", "limit must be positive"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"?limit="+tc.limit, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), tc.want) + }) + } +} + +func TestS3Handler_ListBuckets_OversizeLimitClamped(t *testing.T) { + // limit beyond the ceiling is silently clamped (not rejected) so + // the SPA's "request the maximum" pattern works without a probe + // round-trip. Mirrors the Dynamo handler's policy. + h := newS3HandlerForTest(&stubBucketsSource{buckets: map[string]BucketSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"?limit=99999", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) +} + +func TestS3Handler_ListBuckets_RejectsInvalidNextToken(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"?next_token=!!!", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "next_token is not valid base64url") +} + +func TestS3Handler_ListBuckets_StorageErrorReturns500(t *testing.T) { + src := &stubBucketsSource{listErr: errors.New("storage backing sentinel LIST-1")} + h := newS3HandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "s3_list_failed") + require.NotContains(t, rec.Body.String(), "LIST-1", + "server-side error detail must not leak to the client") +} + +// TestS3Handler_ListBuckets_ForbiddenReturns403 mirrors the +// describe-side coverage for the slice 2 role gate. handleList now +// maps ErrBucketsForbidden to 403 (Gemini medium on PR #658); this +// test pins that behaviour so a future refactor that drops the +// sentinel match does not silently downgrade the response to a +// generic 500. +func TestS3Handler_ListBuckets_ForbiddenReturns403(t *testing.T) { + src := &stubBucketsSource{listErr: ErrBucketsForbidden} + h := newS3HandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Contains(t, rec.Body.String(), "forbidden") +} + +func TestS3Handler_ListBuckets_RejectsNonGet(t *testing.T) { + // POST/PUT/DELETE on /buckets are reserved for the next slice; + // for now the handler returns 405 so a SPA bug that calls them + // against this build sees a sensible error rather than a 404. + cases := []string{http.MethodPost, http.MethodPut, http.MethodDelete, http.MethodPatch} + for _, m := range cases { + t.Run(m, func(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{}) + req := httptest.NewRequest(m, pathS3Buckets, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusMethodNotAllowed, rec.Code) + }) + } +} + +func TestS3Handler_DescribeBucket_HappyPath(t *testing.T) { + src := &stubBucketsSource{buckets: map[string]BucketSummary{ + "orders": {Name: "orders", ACL: "private", Region: "us-east-1", Generation: 7}, + }} + h := newS3HandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"/orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + var got BucketSummary + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + require.Equal(t, "orders", got.Name) + require.Equal(t, "private", got.ACL) + require.Equal(t, "us-east-1", got.Region) + require.EqualValues(t, 7, got.Generation) +} + +func TestS3Handler_DescribeBucket_MissingReturns404(t *testing.T) { + h := newS3HandlerForTest(&stubBucketsSource{buckets: map[string]BucketSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"/missing", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) + require.Contains(t, rec.Body.String(), "not_found") +} + +func TestS3Handler_DescribeBucket_StorageErrorReturns500(t *testing.T) { + src := &stubBucketsSource{descErr: errors.New("storage backing sentinel DESC-1")} + h := newS3HandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"/anything", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "s3_describe_failed") + require.NotContains(t, rec.Body.String(), "DESC-1") +} + +func TestS3Handler_DescribeBucket_ForbiddenReturns403(t *testing.T) { + // ErrBucketsForbidden is reserved for the next slice's role + // gate (read-only is fine for any authenticated session today) + // but the handler already maps the sentinel so the slice can + // land without re-touching the error translator. + src := &stubBucketsSource{descErr: ErrBucketsForbidden} + h := newS3HandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"/locked", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Contains(t, rec.Body.String(), "forbidden") +} + +func TestS3Handler_DescribeBucket_SubpathReturns404(t *testing.T) { + // /buckets/foo/acl is reserved for the next slice. Until then, + // any path with a slash inside the bucket-name segment must 404 + // rather than mistakenly reach handleDescribe with the full + // "foo/acl" string. + h := newS3HandlerForTest(&stubBucketsSource{buckets: map[string]BucketSummary{ + "foo": {Name: "foo"}, + }}) + req := httptest.NewRequest(http.MethodGet, pathS3Buckets+"/foo/acl", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +func TestFormatBucketCreatedAt_ZeroProducesEmpty(t *testing.T) { + // Zero HLC means "no creation time recorded" — the SPA renders + // it as a dash, so we emit an empty string rather than the Unix + // epoch (1970-01-01) which would be misleading. + require.Empty(t, FormatBucketCreatedAt(0)) +} + +func TestFormatBucketCreatedAt_RoundtripsSecondPrecision(t *testing.T) { + // HLC's upper 48 bits are Unix ms. 1_777_874_400_000 ms = + // 2026-05-04T06:00:00Z; shift left by hlcPhysicalShift (16) to + // produce a wire HLC, format, and confirm the formatter recovers + // UTC RFC3339 with second precision (sub-second is intentionally + // truncated — the SPA renders timestamps to the second). + const wallMillis = int64(1_777_874_400_000) + hlc := uint64(wallMillis) << hlcPhysicalShift + require.Equal(t, "2026-05-04T06:00:00Z", FormatBucketCreatedAt(hlc)) +} diff --git a/internal/admin/server.go b/internal/admin/server.go index 7742eedd9..479c9424e 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -47,6 +47,13 @@ type ServerDeps struct { // node clusters wire the production gRPC client. Forwarder LeaderForwarder + // Buckets is the S3 admin source — read-only in this slice + // (list + describe). Optional: a nil value disables + // /admin/api/v1/s3/buckets{,/{name}} (the mux answers them + // with 404). Mirrors the Tables nil contract for cluster-only + // builds. + Buckets BucketsSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -74,24 +81,8 @@ type Server struct { // dependencies are inconsistent enough to be unusable; otherwise it is // total over its configuration space. func NewServer(deps ServerDeps) (*Server, error) { - if deps.Signer == nil { - return nil, errMissing("Signer") - } - if deps.Verifier == nil { - return nil, errMissing("Verifier") - } - if isNilCredentialStore(deps.Credentials) { - return nil, errMissing("Credentials") - } - if deps.Roles == nil { - // A nil role index would silently 403 every login. Treat it - // as a wiring bug rather than a valid "admin is locked down" - // state: operators who really want zero admin access can - // set admin.enabled=false or pass an empty (non-nil) map. - return nil, errMissing("Roles") - } - if deps.ClusterInfo == nil { - return nil, errMissing("ClusterInfo") + if err := validateServerDeps(deps); err != nil { + return nil, err } logger := deps.Logger if logger == nil { @@ -108,29 +99,73 @@ func NewServer(deps ServerDeps) (*Server, error) { } auth := NewAuthService(deps.Signer, deps.Credentials, deps.Roles, authOpts) cluster := NewClusterHandler(deps.ClusterInfo).WithLogger(logger) - var dynamo http.Handler - if deps.Tables != nil { - // Re-evaluate the principal's role on every state- - // changing request against the live role map (Codex P1 - // on PR #635). MapRoleStore wraps the same map the auth - // layer uses for login, so a config reload that updates - // deps.Roles does NOT automatically propagate here — - // operators must restart the listener for revocation to - // take effect, but the JWT no longer extends a revoked - // key past the next request. - dynamoHandler := NewDynamoHandler(deps.Tables). - WithLogger(logger). - WithRoleStore(MapRoleStore(deps.Roles)) - if deps.Forwarder != nil { - dynamoHandler = dynamoHandler.WithLeaderForwarder(deps.Forwarder) - } - dynamo = dynamoHandler - } - mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger) + dynamo := buildDynamoHandlerForDeps(deps, logger) + s3 := buildS3HandlerForDeps(deps, logger) + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } +// validateServerDeps centralises the wiring-bug guards that NewServer +// applies before constructing anything. Pulled out so NewServer's own +// body can stay under the cyclomatic-complexity ceiling without +// hiding the contract — every required field is enumerated here. +func validateServerDeps(deps ServerDeps) error { + switch { + case deps.Signer == nil: + return errMissing("Signer") + case deps.Verifier == nil: + return errMissing("Verifier") + case isNilCredentialStore(deps.Credentials): + return errMissing("Credentials") + case deps.Roles == nil: + // A nil role index would silently 403 every login. Treat + // it as a wiring bug rather than a valid "admin is locked + // down" state: operators who really want zero admin + // access can set admin.enabled=false or pass an empty + // (non-nil) map. + return errMissing("Roles") + case deps.ClusterInfo == nil: + return errMissing("ClusterInfo") + } + return nil +} + +// buildDynamoHandlerForDeps assembles the Dynamo HTTP handler from +// ServerDeps when Tables is wired, threading the logger and the +// optional LeaderForwarder. Returns nil when Tables is nil so the +// router falls through to the unknown-endpoint 404. +// +// Re-evaluates the principal's role on every state-changing request +// against the live role map (Codex P1 on PR #635). MapRoleStore +// wraps the same map the auth layer uses for login, so a config +// reload that updates deps.Roles does NOT automatically propagate +// here — operators must restart the listener for revocation to take +// effect, but the JWT no longer extends a revoked key past the next +// request. +func buildDynamoHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { + if deps.Tables == nil { + return nil + } + h := NewDynamoHandler(deps.Tables). + WithLogger(logger). + WithRoleStore(MapRoleStore(deps.Roles)) + if deps.Forwarder != nil { + h = h.WithLeaderForwarder(deps.Forwarder) + } + return h +} + +// buildS3HandlerForDeps is the parallel constructor for the S3 +// admin handler. Slice 1 is read-only; the next slice will plumb a +// MapRoleStore and the LeaderForwarder through the same shape. +func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { + if deps.Buckets == nil { + return nil + } + return NewS3Handler(deps.Buckets).WithLogger(logger) +} + // Handler returns an http.Handler that serves the full admin surface. // We wrap the router in BodyLimit at the top level so every endpoint // — including /admin/healthz and the static asset / SPA paths — is @@ -160,16 +195,18 @@ func (s *Server) APIHandler() http.Handler { // POST /admin/api/v1/dynamo/tables (auth required, full role) // GET /admin/api/v1/dynamo/tables/{name} (auth required) // DELETE /admin/api/v1/dynamo/tables/{name} (auth required, full role) +// GET /admin/api/v1/s3/buckets (auth required) +// GET /admin/api/v1/s3/buckets/{name} (auth required) // // Body limit applies uniformly. CSRF and Audit middleware apply to // write-capable protected endpoints; login and logout carry their own // audit path inside AuthService because the generic Audit middleware // cannot see the claimed actor at that point in the chain. // -// dynamoHandler may be nil; in that case the dynamo paths fall through -// to the unknown-endpoint 404, matching the behaviour of any other -// unregistered admin path. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler http.Handler, logger *slog.Logger) http.Handler { +// dynamoHandler / s3Handler may be nil; in that case the corresponding +// paths fall through to the unknown-endpoint 404, matching the +// behaviour of any other unregistered admin path. +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -228,23 +265,61 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa if dynamoHandler != nil { dynamoChain = protect(dynamoHandler) } + // S3 endpoints (read-only in this slice) share the protect chain + // for the same reason: even GETs need a session + CSRF cookie so + // a cross-site page cannot enumerate bucket names by tricking a + // logged-in browser into a fetch with credentials. + var s3Chain http.Handler + if s3Handler != nil { + s3Chain = protect(s3Handler) + } + + routes := apiRouteTable{ + login: loginChain, + logout: logoutChain, + cluster: clusterChain, + dynamo: dynamoChain, + s3: s3Chain, + } + return http.HandlerFunc(routes.dispatch) +} + +// apiRouteTable bundles the precomposed middleware chains for each +// admin API path family. Pulled into a type so the dispatch switch +// keeps buildAPIMux under the cyclop ceiling — every additional +// resource family (S3 buckets here, future SQS / queues / etc.) +// would otherwise push buildAPIMux's branch count past the limit. +type apiRouteTable struct { + login, logout, cluster http.Handler + dynamo, s3 http.Handler +} + +// dispatch is the receiver method httpHandlerFunc adapts. Logic is +// the same path-prefix switch the call site previously inlined. +func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == "/admin/api/v1/auth/login": + t.login.ServeHTTP(w, r) + case r.URL.Path == "/admin/api/v1/auth/logout": + t.logout.ServeHTTP(w, r) + case r.URL.Path == "/admin/api/v1/cluster": + t.cluster.ServeHTTP(w, r) + case t.dynamo != nil && isDynamoPath(r.URL.Path): + t.dynamo.ServeHTTP(w, r) + case t.s3 != nil && isS3Path(r.URL.Path): + t.s3.ServeHTTP(w, r) + default: + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin API handler is registered for this path") + } +} + +func isDynamoPath(p string) bool { + return p == pathDynamoTables || strings.HasPrefix(p, pathPrefixDynamoTables) +} - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch { - case r.URL.Path == "/admin/api/v1/auth/login": - loginChain.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/auth/logout": - logoutChain.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/cluster": - clusterChain.ServeHTTP(w, r) - case dynamoChain != nil && (r.URL.Path == pathDynamoTables || - strings.HasPrefix(r.URL.Path, pathPrefixDynamoTables)): - dynamoChain.ServeHTTP(w, r) - default: - writeJSONError(w, http.StatusNotFound, "unknown_endpoint", - "no admin API handler is registered for this path") - } - }) +func isS3Path(p string) bool { + return p == pathS3Buckets || strings.HasPrefix(p, pathPrefixS3Buckets) } func errMissing(field string) error { diff --git a/kv/hlc.go b/kv/hlc.go index e27d696de..58c6e8c53 100644 --- a/kv/hlc.go +++ b/kv/hlc.go @@ -9,6 +9,15 @@ import ( const hlcLogicalBits = 16 const hlcLogicalMask uint64 = (1 << hlcLogicalBits) - 1 +// HLCLogicalBits is the number of low bits in an HLC timestamp +// reserved for the in-memory logical counter (vs the upper bits +// which encode the Raft-agreed wall-clock millis). Exported so +// downstream tools — admin dashboard ISO-8601 formatting being the +// motivating case — can recover the physical half without +// hard-coding a magic number that silently drifts when the layout +// changes (Claude Issue 4 on PR #658). +const HLCLogicalBits = hlcLogicalBits + // HLC implements a hybrid logical clock where the physical part is agreed upon // via Raft consensus and the logical counter is managed purely in memory. // diff --git a/main.go b/main.go index 8ae368739..520bb499c 100644 --- a/main.go +++ b/main.go @@ -763,7 +763,7 @@ func startServers(in serversInput) error { // the handler hands ErrTablesNotLeader writes to the forwarder // which dials the leader over the cached gRPC pool. Without these // the handler falls back to 503 + Retry-After:1. - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, in.coordinate, connCache); err != nil { + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil @@ -1302,6 +1302,12 @@ type runtimeServerRunner struct { // not a public API. Nil until start() reaches the dynamo step. dynamoServer *adapter.DynamoDBServer + // s3Server is the parallel field for the S3 admin endpoints + // (read-only in this slice). Nil when --s3Address is empty, + // in which case the admin handler answers /s3/buckets* with + // 404, mirroring the dynamoServer == nil contract. + s3Server *adapter.S3Server + // roleStore is the access-key → role index the leader-side // gRPC AdminForward service uses to re-validate the principal // on every forwarded write. Mirrors what admin.Config.RoleIndex @@ -1350,9 +1356,11 @@ func (r *runtimeServerRunner) start() error { ); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil { + s3Server, err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker) + if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } + r.s3Server = s3Server if err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin.go b/main_admin.go index f6d76f7a3..0b7815146 100644 --- a/main_admin.go +++ b/main_admin.go @@ -74,6 +74,7 @@ func startAdminFromFlags( eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer, + s3Server *adapter.S3Server, coordinate kv.Coordinator, connCache *kv.GRPCConnCache, ) error { @@ -117,11 +118,12 @@ func startAdminFromFlags( } clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes) tablesSrc := newDynamoTablesSource(dynamoServer) + bucketsSrc := newBucketsSource(s3Server) forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId) if err != nil { return errors.Wrap(err, "build admin leader forwarder") } - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, forwarder, buildVersion()) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, buildVersion()) return err } @@ -167,6 +169,67 @@ func newDynamoTablesSource(dynamoServer *adapter.DynamoDBServer) admin.TablesSou return &dynamoTablesBridge{server: dynamoServer} } +// newBucketsSource is the S3 counterpart of newDynamoTablesSource. +// Returns nil when s3Server is nil so a build that ships without the +// S3 adapter (--s3Address empty) silently disables the +// /admin/api/v1/s3/buckets routes. +func newBucketsSource(s3Server *adapter.S3Server) admin.BucketsSource { + if s3Server == nil { + return nil + } + return &bucketsBridge{server: s3Server} +} + +// bucketsBridge re-shapes the adapter's AdminBucketSummary DTO into +// the admin package's BucketSummary, threading through the +// (result, present, error) tuple semantics for the describe path. +// CreatedAtHLC is formatted into the SPA's expected ISO-8601 string +// here rather than in the adapter — formatting is a UI concern, not +// a storage one. +type bucketsBridge struct { + server *adapter.S3Server +} + +func (b *bucketsBridge) AdminListBuckets(ctx context.Context) ([]admin.BucketSummary, error) { + rows, err := b.server.AdminListBuckets(ctx) + if err != nil { + // Wrap with the bridge frame so an operator debugging a 500 + // from /admin/api/v1/s3/buckets sees the bridge in the error + // chain (Claude Issue 5 on PR #658). cockroachdb/errors + // already preserves the adapter's stack trace; this just + // adds the call-site context. + return nil, errors.Wrap(err, "admin buckets bridge: list") + } + out := make([]admin.BucketSummary, len(rows)) + for i, r := range rows { + out[i] = bucketSummaryFromAdapter(r) + } + return out, nil +} + +func (b *bucketsBridge) AdminDescribeBucket(ctx context.Context, name string) (*admin.BucketSummary, bool, error) { + row, exists, err := b.server.AdminDescribeBucket(ctx, name) + if err != nil { + return nil, false, errors.Wrapf(err, "admin buckets bridge: describe %q", name) + } + if !exists || row == nil { + return nil, false, nil + } + summary := bucketSummaryFromAdapter(*row) + return &summary, true, nil +} + +func bucketSummaryFromAdapter(in adapter.AdminBucketSummary) admin.BucketSummary { + return admin.BucketSummary{ + Name: in.Name, + ACL: in.ACL, + CreatedAt: admin.FormatBucketCreatedAt(in.CreatedAtHLC), + Generation: in.Generation, + Region: in.Region, + Owner: in.Owner, + } +} + // dynamoTablesBridge is the thin adapter that re-shapes the adapter's // AdminTableSummary DTO into the admin package's DynamoTableSummary. // The two structs are deliberately isomorphic so this translation @@ -384,6 +447,7 @@ func startAdminServer( creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, + buckets admin.BucketsSource, forwarder admin.LeaderForwarder, version string, ) (string, error) { @@ -392,7 +456,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, forwarder) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder) if err != nil { return "", err } @@ -432,7 +496,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, forwarder admin.LeaderForwarder) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -452,6 +516,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust Roles: adminCfg.RoleIndex(), ClusterInfo: cluster, Tables: tables, + Buckets: buckets, Forwarder: forwarder, StaticFS: nil, AuthOpts: admin.AuthServiceOpts{ diff --git a/main_admin_forward_test.go b/main_admin_forward_test.go index 041713052..f82037036 100644 --- a/main_admin_forward_test.go +++ b/main_admin_forward_test.go @@ -177,6 +177,30 @@ func TestBuildAdminLeaderForwarder_HappyPathReturnsForwarder(t *testing.T) { require.NotNil(t, fwd) } +// TestAdminHLCPhysicalShiftMatchesKVLogicalBits guards against +// silent drift between admin.FormatBucketCreatedAt's shift constant +// (currently 16) and kv.HLCLogicalBits, the upstream truth the +// timestamp encoding obeys. If a future HLC format change +// re-partitions the wire layout in kv and the admin formatter is +// not updated, this test fails immediately rather than letting +// every CreatedAt render at the wrong hour silently (Claude +// Issue 4 on PR #658). +// +// admin cannot import kv (it is a low-level dependency the admin +// package stays decoupled from), so the assertion lives in main +// where both packages are already in scope. +func TestAdminHLCPhysicalShiftMatchesKVLogicalBits(t *testing.T) { + // FormatBucketCreatedAt(hlc) shifts hlc right by 16 to recover + // the wall-clock millis. Shift a known wall-clock value left by + // kv.HLCLogicalBits and confirm the formatter recovers exactly + // the right RFC3339 string — if the two constants drift apart, + // the round-trip produces a wrong year / hour and the test + // fails. + const wallMillis = int64(1_777_874_400_000) // 2026-05-04T06:00:00Z + hlc := uint64(wallMillis) << kv.HLCLogicalBits + require.Equal(t, "2026-05-04T06:00:00Z", admin.FormatBucketCreatedAt(hlc)) +} + // dummyTablesSource is the smallest concrete admin.TablesSource for // the readyForRegistration gate test — no method body needs to // execute, so every method just panics. Using a real implementation diff --git a/main_admin_test.go b/main_admin_test.go index d463dbe1a..d167804a1 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, "") require.NoError(t, err) } @@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") require.Error(t, err) } @@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. @@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{ diff --git a/main_s3.go b/main_s3.go index 5de290c7f..c00b2abd8 100644 --- a/main_s3.go +++ b/main_s3.go @@ -11,6 +11,15 @@ import ( "golang.org/x/sync/errgroup" ) +// startS3Server stands up the S3-compatible HTTP listener on s3Addr +// and returns the constructed *adapter.S3Server. Callers that need a +// reference to the running server (e.g. the admin HTTP listener, +// which calls SigV4-bypass admin entrypoints on it) hold the +// returned value; callers that don't can ignore it. +// +// Returns (nil, nil) when s3Addr is empty — that is the well-known +// "S3 disabled" state, not a configuration error. Other failures +// surface as a non-nil error and a nil server. func startS3Server( ctx context.Context, lc *net.ListenConfig, @@ -23,22 +32,26 @@ func startS3Server( credentialsFile string, pathStyleOnly bool, readTracker *kv.ActiveTimestampTracker, -) error { +) (*adapter.S3Server, error) { s3Addr = strings.TrimSpace(s3Addr) if s3Addr == "" { - return nil + // (nil, nil) is the explicit "S3 disabled" signal — the empty + // flag value is a valid configuration, not an error. The + // nilnil linter is not enabled in .golangci.yaml so no + // suppression directive is needed. + return nil, nil } if !pathStyleOnly { - return errors.New("virtual-hosted style S3 requests are not implemented") + return nil, errors.New("virtual-hosted style S3 requests are not implemented") } s3L, err := lc.Listen(ctx, "tcp", s3Addr) if err != nil { - return errors.Wrapf(err, "failed to listen on %s", s3Addr) + return nil, errors.Wrapf(err, "failed to listen on %s", s3Addr) } staticCreds, err := loadS3StaticCredentials(credentialsFile) if err != nil { _ = s3L.Close() - return err + return nil, err } s3Server := adapter.NewS3Server( s3L, @@ -67,7 +80,7 @@ func startS3Server( } return errors.WithStack(err) }) - return nil + return s3Server, nil } func loadS3StaticCredentials(path string) (map[string]string, error) { diff --git a/main_s3_test.go b/main_s3_test.go index 3dcd048f3..9e4cc71c5 100644 --- a/main_s3_test.go +++ b/main_s3_test.go @@ -13,14 +13,16 @@ import ( func TestStartS3ServerRejectsVirtualHostedStyleConfig(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) - err := startS3Server(ctx, &net.ListenConfig{}, eg, "localhost:9000", nil, nil, nil, "us-east-1", "", false, nil) + srv, err := startS3Server(ctx, &net.ListenConfig{}, eg, "localhost:9000", nil, nil, nil, "us-east-1", "", false, nil) require.ErrorContains(t, err, "virtual-hosted style S3 requests are not implemented") + require.Nil(t, srv) } func TestStartS3ServerAllowsEmptyAddress(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) - err := startS3Server(ctx, &net.ListenConfig{}, eg, "", nil, nil, nil, "us-east-1", "", false, nil) + srv, err := startS3Server(ctx, &net.ListenConfig{}, eg, "", nil, nil, nil, "us-east-1", "", false, nil) require.NoError(t, err) + require.Nil(t, srv) } func TestLoadS3StaticCredentials(t *testing.T) {