diff --git a/adapter/dynamodb_admin.go b/adapter/dynamodb_admin.go new file mode 100644 index 000000000..046c20403 --- /dev/null +++ b/adapter/dynamodb_admin.go @@ -0,0 +1,414 @@ +package adapter + +import ( + "context" + "net/http" + "sort" + "strings" + + "github.com/cockroachdb/errors" +) + +// AdminTableSummary is the table-level information the admin dashboard +// surfaces for a single Dynamo-compatible table. It deliberately +// projects only the fields the dashboard needs so the package's +// wire-format types (dynamoTableSchema and friends) stay internal. +type AdminTableSummary struct { + Name string + PartitionKey string + SortKey string + Generation uint64 + GlobalSecondaryIndexes []AdminGSISummary +} + +// AdminGSISummary mirrors AdminTableSummary for a single GSI. +type AdminGSISummary struct { + Name string + PartitionKey string + SortKey string + ProjectionType string +} + +// AdminListTables returns every Dynamo-style table this server knows +// about, in the lexicographic order the metadata index produces. +// Intended for the in-process admin listener as the SigV4-free +// counterpart to the listTables HTTP handler; both share the same +// underlying lookup so the two views cannot drift. +func (d *DynamoDBServer) AdminListTables(ctx context.Context) ([]string, error) { + return d.listTableNames(ctx) +} + +// AdminDescribeTable returns a schema snapshot for name. The triple +// (result, present, error) lets admin callers distinguish a genuine +// "not found" from a storage error without sniffing sentinels: when +// the table is missing the function returns (nil, false, nil). +// +// Unlike the SigV4 describeTable handler, AdminDescribeTable does +// NOT invoke ensureLegacyTableMigration. 
The admin dashboard is a +// strictly read-only surface (Gemini medium review on PR #633), so +// triggering Raft-coordinated key-encoding migrations as a side +// effect of routine polling would (a) violate the read-only +// contract and (b) cause every dashboard refresh to write to the +// cluster. Migration still runs lazily on the next SigV4 read or +// write of the same table — the schema we return here is just a +// snapshot for display, not a guarantee that the table is +// up-to-date for serving. +func (d *DynamoDBServer) AdminDescribeTable(ctx context.Context, name string) (*AdminTableSummary, bool, error) { + schema, exists, err := d.loadTableSchema(ctx, name) + if err != nil { + return nil, false, err + } + if !exists { + return nil, false, nil + } + return summaryFromSchema(schema), true, nil +} + +// AdminRole is the authorization tier the adapter checks against on +// every admin write entrypoint. The constants intentionally mirror +// internal/admin.Role string values so the wire / persisted role +// vocabulary stays aligned across packages, but we keep a separate +// type here so the adapter has zero dependency on internal/admin. +type AdminRole string + +const ( + // AdminRoleReadOnly may issue list / describe but not create or delete. + AdminRoleReadOnly AdminRole = "read_only" + // AdminRoleFull may issue every admin operation. + AdminRoleFull AdminRole = "full" +) + +// canWrite reports whether the role authorises state-mutating +// operations. Kept as a method (rather than an inline check) so any +// future "delete-only" tier reads consistently across the package. +func (r AdminRole) canWrite() bool { return r == AdminRoleFull } + +// AdminPrincipal is the authentication context every admin write +// entrypoint takes. The adapter re-evaluates authorisation against +// this principal *itself* — it does not trust the caller to have +// already enforced the role. 
That is the design's "authorization truth always
+// lives on the adapter side" invariant (Section 3.2): if a follower forwards a
+// pre-authenticated request via the future AdminForward RPC, the
+// leader must still verify before acting.
+type AdminPrincipal struct {
+	AccessKey string
+	Role      AdminRole
+}
+
+// ErrAdminNotLeader is returned by every write entrypoint when this
+// node is not the verified Raft leader. The admin HTTP handler
+// translates this to 503 + Retry-After: 1 today; the future
+// AdminForward RPC catches it as the trigger to forward to the
+// leader instead.
+var ErrAdminNotLeader = errors.New("dynamodb admin: this node is not the raft leader")
+
+// ErrAdminForbidden is returned when the principal lacks the role
+// required for the operation. Admin handlers translate this to 403
+// "forbidden" without leaking which field of the principal failed
+// the check.
+var ErrAdminForbidden = errors.New("dynamodb admin: principal lacks required role")
+
+// IsAdminTableAlreadyExists reports whether err is the adapter's
+// "table already exists" failure (ResourceInUseException). The
+// bridge in main_admin.go uses this to map the adapter's internal
+// error vocabulary onto admin's HTTP-facing sentinels without
+// importing the package-private dynamoAPIError type.
+func IsAdminTableAlreadyExists(err error) bool {
+	return adminAPIErrorTypeIs(err, dynamoErrResourceInUse)
+}
+
+// IsAdminTableNotFound is the ResourceNotFoundException counterpart
+// for AdminDeleteTable / AdminDescribeTable mapped through the
+// adapter's structured error chain.
+func IsAdminTableNotFound(err error) bool {
+	return adminAPIErrorTypeIs(err, dynamoErrResourceNotFound)
+}
+
+// IsAdminValidation reports whether err is a validation failure
+// the adapter signalled via ValidationException. Admin handlers map
+// this to 400 + a sanitised message.
+func IsAdminValidation(err error) bool { + return adminAPIErrorTypeIs(err, dynamoErrValidation) +} + +// AdminErrorMessage extracts the human-readable message from a +// dynamoAPIError for surfacing back to the SPA. Returns "" when err +// is not a structured adapter error so callers fall back to a +// generic message instead of leaking arbitrary err.Error() output. +func AdminErrorMessage(err error) string { + var apiErr *dynamoAPIError + if errors.As(err, &apiErr) && apiErr != nil { + return apiErr.message + } + return "" +} + +func adminAPIErrorTypeIs(err error, want string) bool { + var apiErr *dynamoAPIError + if !errors.As(err, &apiErr) || apiErr == nil { + return false + } + return apiErr.errorType == want +} + +// AdminAttribute names a single primary-key or GSI key column. Type +// must be one of "S", "N", "B" — DynamoDB does not allow boolean or +// list keys and the adapter's existing schema validation enforces +// the same restriction at the next layer. +type AdminAttribute struct { + Name string + Type string +} + +// AdminCreateGSI describes one global secondary index in an admin +// CreateTable request. SortKey is optional (hash-only GSI). When +// ProjectionType is "INCLUDE", NonKeyAttributes lists the projected +// attribute names; otherwise NonKeyAttributes is ignored. +type AdminCreateGSI struct { + Name string + PartitionKey AdminAttribute + SortKey *AdminAttribute + ProjectionType string + NonKeyAttributes []string +} + +// AdminCreateTableInput is the admin-facing CreateTable shape. The +// HTTP handler maps the design 4.2 JSON body into this struct, then +// AdminCreateTable converts it to the adapter's internal +// createTableInput. We do not pass the SigV4-flavoured wire struct +// directly because that struct's field names track AWS exactly and +// would be awkward for the admin SPA to author. 
+type AdminCreateTableInput struct { + TableName string + PartitionKey AdminAttribute + SortKey *AdminAttribute + GSI []AdminCreateGSI +} + +// AdminCreateTable creates a Dynamo-compatible table on the local +// node, after re-validating the principal's role and confirming this +// node is the verified Raft leader. The returned summary mirrors the +// shape of AdminDescribeTable on the same name so the SPA can show +// the freshly-created table without an extra describe round-trip. +// +// Errors: +// - ErrAdminForbidden when the principal cannot write. +// - ErrAdminNotLeader when the node is a follower. +// - The adapter's standard dynamoAPIError chain for validation / +// storage failures, preserved unmodified so the HTTP handler can +// map the inner code (ValidationException, ResourceInUseException, +// etc.) to the appropriate status without re-classifying. +func (d *DynamoDBServer) AdminCreateTable(ctx context.Context, principal AdminPrincipal, in AdminCreateTableInput) (*AdminTableSummary, error) { + if !principal.Role.canWrite() { + return nil, ErrAdminForbidden + } + if !isVerifiedDynamoLeader(d.coordinator) { + return nil, ErrAdminNotLeader + } + legacy, err := buildLegacyCreateTableInput(in) + if err != nil { + return nil, err + } + // buildLegacyCreateTableInput already rejects an empty table + // name; the previous duplicated check here was dead code. + unlock := d.lockTableOperations([]string{legacy.TableName}) + defer unlock() + schema, err := buildCreateTableSchema(legacy) + if err != nil { + return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, err.Error()) + } + if err := d.createTableWithRetry(ctx, legacy.TableName, schema); err != nil { + return nil, err + } + d.observeTables(ctx, schema.TableName) + // Reload after commit so the returned summary carries the + // generation that createTableWithRetry actually persisted — + // `schema` going in had generation 0 because the next number is + // computed inside the retry loop. 
Reading it back also matches + // the SPA's mental model: the response shape is identical to a + // follow-up AdminDescribeTable call. + stored, exists, err := d.loadTableSchema(ctx, legacy.TableName) + if err != nil { + return nil, err + } + if !exists { + // Should be unreachable: createTableWithRetry succeeded above + // under the table lock, so a missing schema here means the + // metadata index is corrupt or another writer raced through + // the lock — both of which are server-internal failures. + return nil, newDynamoAPIError(http.StatusInternalServerError, dynamoErrInternal, "table missing immediately after create") + } + return summaryFromSchema(stored), nil +} + +// AdminDeleteTable is the SigV4-bypass counterpart to deleteTable. +// Returns the same sentinel errors as AdminCreateTable plus the +// adapter's standard dynamoErrResourceNotFound when the table is +// absent — admin handlers should map that to 404 rather than 500. +func (d *DynamoDBServer) AdminDeleteTable(ctx context.Context, principal AdminPrincipal, name string) error { + if !principal.Role.canWrite() { + return ErrAdminForbidden + } + if !isVerifiedDynamoLeader(d.coordinator) { + return ErrAdminNotLeader + } + if strings.TrimSpace(name) == "" { + return newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing table name") + } + unlock := d.lockTableOperations([]string{name}) + defer unlock() + return d.deleteTableWithRetry(ctx, name) +} + +// buildLegacyCreateTableInput maps the admin-facing struct to the +// adapter's existing wire-format struct so the rest of the schema +// pipeline (buildCreateTableSchema → dispatch) can be reused as-is. +// Doing the translation here — rather than refactoring the wire +// types — keeps SigV4 path bit-exact and limits the blast radius of +// the admin feature. 
+func buildLegacyCreateTableInput(in AdminCreateTableInput) (createTableInput, error) { + if strings.TrimSpace(in.TableName) == "" { + return createTableInput{}, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing table name") + } + if strings.TrimSpace(in.PartitionKey.Name) == "" { + return createTableInput{}, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing partition key name") + } + collector := newAttrCollector() + out := createTableInput{TableName: in.TableName} + if err := appendKeySchema(&out, in, collector.add); err != nil { + return createTableInput{}, err + } + for _, gsi := range in.GSI { + legacy, err := buildLegacyGSI(gsi, collector.add) + if err != nil { + return createTableInput{}, err + } + out.GlobalSecondaryIndexes = append(out.GlobalSecondaryIndexes, legacy) + } + out.AttributeDefinitions = collector.sorted() + return out, nil +} + +// attrCollector merges every attribute referenced by primary key +// and GSIs, rejecting conflicting type declarations for the same +// name. Pulling the bookkeeping out of the build function lets us +// share it with appendKeySchema / buildLegacyGSI without exposing a +// raw map to callers. +type attrCollector struct{ set map[string]string } + +func newAttrCollector() *attrCollector { return &attrCollector{set: map[string]string{}} } + +func (c *attrCollector) add(a AdminAttribute) error { + if existing, ok := c.set[a.Name]; ok && existing != a.Type { + return newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, + "conflicting attribute type for "+a.Name) + } + c.set[a.Name] = a.Type + return nil +} + +// sorted emits the merged attribute definitions in lexicographic +// order so makeCreateTableRequest produces a byte-identical +// OperationGroup for inputs that differ only in map iteration +// order. Tests that assert against an already-created table can +// then compare schemas without pre-sorting on their side. 
+func (c *attrCollector) sorted() []createTableAttributeDefinition { + defs := make([]createTableAttributeDefinition, 0, len(c.set)) + for name, typ := range c.set { + defs = append(defs, createTableAttributeDefinition{ + AttributeName: name, + AttributeType: typ, + }) + } + sort.Slice(defs, func(i, j int) bool { + return defs[i].AttributeName < defs[j].AttributeName + }) + return defs +} + +// appendKeySchema writes the primary key (HASH + optional RANGE) +// into out.KeySchema and registers the same attributes with addAttr +// so AttributeDefinitions stays consistent with the key schema. +func appendKeySchema(out *createTableInput, in AdminCreateTableInput, addAttr func(AdminAttribute) error) error { + if err := addAttr(in.PartitionKey); err != nil { + return err + } + out.KeySchema = append(out.KeySchema, createTableKeySchemaElement{ + AttributeName: in.PartitionKey.Name, + KeyType: "HASH", + }) + if in.SortKey == nil { + return nil + } + if err := addAttr(*in.SortKey); err != nil { + return err + } + out.KeySchema = append(out.KeySchema, createTableKeySchemaElement{ + AttributeName: in.SortKey.Name, + KeyType: "RANGE", + }) + return nil +} + +func buildLegacyGSI(gsi AdminCreateGSI, addAttr func(AdminAttribute) error) (createTableGSI, error) { + if strings.TrimSpace(gsi.Name) == "" { + return createTableGSI{}, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing GSI name") + } + if strings.TrimSpace(gsi.PartitionKey.Name) == "" { + return createTableGSI{}, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing GSI partition key name") + } + if err := addAttr(gsi.PartitionKey); err != nil { + return createTableGSI{}, err + } + out := createTableGSI{ + IndexName: gsi.Name, + KeySchema: []createTableKeySchemaElement{ + {AttributeName: gsi.PartitionKey.Name, KeyType: "HASH"}, + }, + Projection: createTableProjection{ProjectionType: gsi.ProjectionType}, + } + if gsi.SortKey != nil { + if err := addAttr(*gsi.SortKey); err != nil { + 
return createTableGSI{}, err + } + out.KeySchema = append(out.KeySchema, createTableKeySchemaElement{ + AttributeName: gsi.SortKey.Name, + KeyType: "RANGE", + }) + } + if strings.EqualFold(gsi.ProjectionType, "INCLUDE") { + out.Projection.NonKeyAttributes = append([]string(nil), gsi.NonKeyAttributes...) + } + return out, nil +} + +func summaryFromSchema(s *dynamoTableSchema) *AdminTableSummary { + out := &AdminTableSummary{ + Name: s.TableName, + PartitionKey: s.PrimaryKey.HashKey, + SortKey: s.PrimaryKey.RangeKey, + Generation: s.Generation, + } + if len(s.GlobalSecondaryIndexes) == 0 { + return out + } + names := make([]string, 0, len(s.GlobalSecondaryIndexes)) + for n := range s.GlobalSecondaryIndexes { + names = append(names, n) + } + // Sort so the JSON the admin handler emits is deterministic; map + // iteration order would otherwise produce an unstable output that + // breaks both UI diffing and snapshot tests. + sort.Strings(names) + out.GlobalSecondaryIndexes = make([]AdminGSISummary, 0, len(names)) + for _, name := range names { + gsi := s.GlobalSecondaryIndexes[name] + out.GlobalSecondaryIndexes = append(out.GlobalSecondaryIndexes, AdminGSISummary{ + Name: name, + PartitionKey: gsi.KeySchema.HashKey, + SortKey: gsi.KeySchema.RangeKey, + ProjectionType: gsi.Projection.ProjectionType, + }) + } + return out +} diff --git a/adapter/dynamodb_admin_test.go b/adapter/dynamodb_admin_test.go new file mode 100644 index 000000000..3cff75a2a --- /dev/null +++ b/adapter/dynamodb_admin_test.go @@ -0,0 +1,413 @@ +package adapter + +import ( + "context" + "errors" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbTypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/require" +) + +// fullAdminPrincipal is the canonical "may write" principal used in +// every write-path test below. 
Defining it once keeps the fixture +// noise out of the assertions and means a future schema change to +// AdminPrincipal touches one spot, not 30. +var fullAdminPrincipal = AdminPrincipal{AccessKey: "AKIA_FULL", Role: AdminRoleFull} +var readOnlyAdminPrincipal = AdminPrincipal{AccessKey: "AKIA_RO", Role: AdminRoleReadOnly} + +// TestDynamoDB_AdminListTables_Empty exercises the SigV4-bypass admin +// entrypoint on a server that has no Dynamo tables. The expected shape +// is an empty (non-nil) slice so the admin JSON response stays a valid +// array rather than `null`, matching the design doc 4.3 contract. +func TestDynamoDB_AdminListTables_Empty(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + got, err := nodes[0].dynamoServer.AdminListTables(context.Background()) + require.NoError(t, err) + require.Empty(t, got) +} + +// TestDynamoDB_AdminListTables_Sorted verifies that the admin entrypoint +// returns table names in lexicographic order, matching the listTables +// HTTP handler so the two admin views (SigV4 and bypass) cannot drift. 
+func TestDynamoDB_AdminListTables_Sorted(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+
+	client := newDynamoClient(t, nodes[0].dynamoAddress)
+	ctx := context.Background()
+
+	for _, name := range []string{"zeta", "alpha", "mu"} {
+		_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
+			TableName:   aws.String(name),
+			BillingMode: ddbTypes.BillingModePayPerRequest,
+			AttributeDefinitions: []ddbTypes.AttributeDefinition{
+				{AttributeName: aws.String("pk"), AttributeType: ddbTypes.ScalarAttributeTypeS},
+			},
+			KeySchema: []ddbTypes.KeySchemaElement{
+				{AttributeName: aws.String("pk"), KeyType: ddbTypes.KeyTypeHash},
+			},
+		})
+		require.NoError(t, err)
+	}
+
+	got, err := nodes[0].dynamoServer.AdminListTables(ctx)
+	require.NoError(t, err)
+	require.Equal(t, []string{"alpha", "mu", "zeta"}, got)
+}
+
+// TestDynamoDB_AdminDescribeTable_Missing checks the (nil, false, nil)
+// "not found" contract — admin callers must be able to tell a missing
+// table apart from a storage error without sniffing sentinels.
+func TestDynamoDB_AdminDescribeTable_Missing(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+
+	summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(context.Background(), "absent")
+	require.NoError(t, err)
+	require.False(t, exists)
+	require.Nil(t, summary)
+}
+
+// TestDynamoDB_AdminDescribeTable_Composite covers the composite-key happy
+// path: a table with hash + range key and no GSIs. The admin summary
+// must mirror the schema's primary key fields exactly.
+func TestDynamoDB_AdminDescribeTable_Composite(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+
+	client := newDynamoClient(t, nodes[0].dynamoAddress)
+	ctx := context.Background()
+
+	_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
+		TableName:   aws.String("orders"),
+		BillingMode: ddbTypes.BillingModePayPerRequest,
+		AttributeDefinitions: []ddbTypes.AttributeDefinition{
+			{AttributeName: aws.String("customer"), AttributeType: ddbTypes.ScalarAttributeTypeS},
+			{AttributeName: aws.String("orderID"), AttributeType: ddbTypes.ScalarAttributeTypeS},
+		},
+		KeySchema: []ddbTypes.KeySchemaElement{
+			{AttributeName: aws.String("customer"), KeyType: ddbTypes.KeyTypeHash},
+			{AttributeName: aws.String("orderID"), KeyType: ddbTypes.KeyTypeRange},
+		},
+	})
+	require.NoError(t, err)
+
+	summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(ctx, "orders")
+	require.NoError(t, err)
+	require.True(t, exists)
+	require.NotNil(t, summary)
+	require.Equal(t, "orders", summary.Name)
+	require.Equal(t, "customer", summary.PartitionKey)
+	require.Equal(t, "orderID", summary.SortKey)
+	require.NotZero(t, summary.Generation)
+	require.Empty(t, summary.GlobalSecondaryIndexes)
+}
+
+// TestDynamoDB_AdminDescribeTable_GSI_SortedDeterministic exercises the
+// GSI projection path. Two indexes are added in deliberately reversed
+// alphabetical order to confirm summaryFromSchema's sort.Strings call —
+// without it, map iteration order would produce a flaky output.
+func TestDynamoDB_AdminDescribeTable_GSI_SortedDeterministic(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + client := newDynamoClient(t, nodes[0].dynamoAddress) + ctx := context.Background() + + _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{ + TableName: aws.String("threads"), + BillingMode: ddbTypes.BillingModePayPerRequest, + AttributeDefinitions: []ddbTypes.AttributeDefinition{ + {AttributeName: aws.String("threadId"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("status"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("owner"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("createdAt"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + }, + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("threadId"), KeyType: ddbTypes.KeyTypeHash}, + }, + GlobalSecondaryIndexes: []ddbTypes.GlobalSecondaryIndex{ + { + IndexName: aws.String("zStatusIndex"), + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("status"), KeyType: ddbTypes.KeyTypeHash}, + {AttributeName: aws.String("createdAt"), KeyType: ddbTypes.KeyTypeRange}, + }, + Projection: &ddbTypes.Projection{ProjectionType: ddbTypes.ProjectionTypeAll}, + }, + { + IndexName: aws.String("aOwnerIndex"), + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("owner"), KeyType: ddbTypes.KeyTypeHash}, + }, + Projection: &ddbTypes.Projection{ProjectionType: ddbTypes.ProjectionTypeKeysOnly}, + }, + }, + }) + require.NoError(t, err) + + summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(ctx, "threads") + require.NoError(t, err) + require.True(t, exists) + require.NotNil(t, summary) + require.Equal(t, "threadId", summary.PartitionKey) + require.Empty(t, summary.SortKey) + + require.Len(t, summary.GlobalSecondaryIndexes, 2) + // Names sorted lexicographically: "aOwnerIndex" < "zStatusIndex". 
+ require.Equal(t, "aOwnerIndex", summary.GlobalSecondaryIndexes[0].Name) + require.Equal(t, "owner", summary.GlobalSecondaryIndexes[0].PartitionKey) + require.Empty(t, summary.GlobalSecondaryIndexes[0].SortKey) + require.Equal(t, string(ddbTypes.ProjectionTypeKeysOnly), summary.GlobalSecondaryIndexes[0].ProjectionType) + + require.Equal(t, "zStatusIndex", summary.GlobalSecondaryIndexes[1].Name) + require.Equal(t, "status", summary.GlobalSecondaryIndexes[1].PartitionKey) + require.Equal(t, "createdAt", summary.GlobalSecondaryIndexes[1].SortKey) + require.Equal(t, string(ddbTypes.ProjectionTypeAll), summary.GlobalSecondaryIndexes[1].ProjectionType) +} + +// TestDynamoDB_AdminCreateTable_HappyPath confirms the SigV4-bypass +// CreateTable round-trips through the same OperationGroup pipeline as +// the SigV4 path: the resulting table appears under AdminListTables +// and AdminDescribeTable returns the expected key shape. +func TestDynamoDB_AdminCreateTable_HappyPath(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + srv := nodes[0].dynamoServer + ctx := context.Background() + + summary, err := srv.AdminCreateTable(ctx, fullAdminPrincipal, AdminCreateTableInput{ + TableName: "users", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + SortKey: &AdminAttribute{Name: "createdAt", Type: "N"}, + }) + require.NoError(t, err) + require.NotNil(t, summary) + require.Equal(t, "users", summary.Name) + require.Equal(t, "id", summary.PartitionKey) + require.Equal(t, "createdAt", summary.SortKey) + require.NotZero(t, summary.Generation) + + names, err := srv.AdminListTables(ctx) + require.NoError(t, err) + require.Equal(t, []string{"users"}, names) + + got, exists, err := srv.AdminDescribeTable(ctx, "users") + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, summary.Name, got.Name) + require.Equal(t, summary.PartitionKey, got.PartitionKey) + require.Equal(t, summary.SortKey, got.SortKey) + require.Equal(t, 
summary.Generation, got.Generation) +} + +// TestDynamoDB_AdminCreateTable_WithGSI exercises a multi-GSI shape +// including INCLUDE projection. The two GSIs are added in +// reverse-alphabetical order to confirm the resulting summary's +// deterministic ordering (sort.Strings) carries through. +func TestDynamoDB_AdminCreateTable_WithGSI(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + srv := nodes[0].dynamoServer + ctx := context.Background() + + summary, err := srv.AdminCreateTable(ctx, fullAdminPrincipal, AdminCreateTableInput{ + TableName: "messages", + PartitionKey: AdminAttribute{Name: "threadId", Type: "S"}, + GSI: []AdminCreateGSI{ + { + Name: "zStatusIndex", + PartitionKey: AdminAttribute{Name: "status", Type: "S"}, + SortKey: &AdminAttribute{Name: "ts", Type: "N"}, + ProjectionType: "ALL", + }, + { + Name: "aPriorityIndex", + PartitionKey: AdminAttribute{Name: "priority", Type: "N"}, + ProjectionType: "INCLUDE", + NonKeyAttributes: []string{"author"}, + }, + }, + }) + require.NoError(t, err) + require.Len(t, summary.GlobalSecondaryIndexes, 2) + require.Equal(t, "aPriorityIndex", summary.GlobalSecondaryIndexes[0].Name) + require.Equal(t, "zStatusIndex", summary.GlobalSecondaryIndexes[1].Name) + require.Equal(t, "ALL", summary.GlobalSecondaryIndexes[1].ProjectionType) + require.Equal(t, "INCLUDE", summary.GlobalSecondaryIndexes[0].ProjectionType) +} + +// TestDynamoDB_AdminCreateTable_DuplicateRejected covers the +// ResourceInUseException path. The adapter must not silently +// re-create the table; admin handlers map this to 4xx for the SPA. 
+func TestDynamoDB_AdminCreateTable_DuplicateRejected(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + srv := nodes[0].dynamoServer + ctx := context.Background() + in := AdminCreateTableInput{ + TableName: "dup", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + } + _, err := srv.AdminCreateTable(ctx, fullAdminPrincipal, in) + require.NoError(t, err) + _, err = srv.AdminCreateTable(ctx, fullAdminPrincipal, in) + require.Error(t, err) + require.Contains(t, err.Error(), "already exists") +} + +// TestDynamoDB_AdminCreateTable_RoleEnforcedAtAdapter is the design +// doc 3.2 invariant: even if the admin handler somehow let a +// read-only principal through, the adapter would still refuse. +func TestDynamoDB_AdminCreateTable_RoleEnforcedAtAdapter(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + _, err := nodes[0].dynamoServer.AdminCreateTable(context.Background(), readOnlyAdminPrincipal, AdminCreateTableInput{ + TableName: "ro", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + }) + require.Error(t, err) + require.True(t, errors.Is(err, ErrAdminForbidden)) +} + +// TestDynamoDB_AdminCreateTable_ValidationErrors covers the cheap +// up-front checks AdminCreateTable does before touching the +// coordinator. These are the codepaths the SPA hits when a user fills +// the form incorrectly; they must surface as validation errors rather +// than 500s. 
+func TestDynamoDB_AdminCreateTable_ValidationErrors(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + srv := nodes[0].dynamoServer + ctx := context.Background() + + cases := []struct { + name string + in AdminCreateTableInput + }{ + {"empty table name", AdminCreateTableInput{TableName: " ", PartitionKey: AdminAttribute{Name: "id", Type: "S"}}}, + {"missing partition key name", AdminCreateTableInput{TableName: "t", PartitionKey: AdminAttribute{Name: "", Type: "S"}}}, + {"empty GSI name", AdminCreateTableInput{ + TableName: "t", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + GSI: []AdminCreateGSI{{Name: ""}}, + }}, + {"GSI partition key missing", AdminCreateTableInput{ + TableName: "t", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + GSI: []AdminCreateGSI{{Name: "gsi", PartitionKey: AdminAttribute{Name: ""}}}, + }}, + {"conflicting attribute types across primary and GSI", AdminCreateTableInput{ + TableName: "t", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + GSI: []AdminCreateGSI{{Name: "gsi", PartitionKey: AdminAttribute{Name: "id", Type: "N"}, ProjectionType: "ALL"}}, + }}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := srv.AdminCreateTable(ctx, fullAdminPrincipal, tc.in) + require.Error(t, err) + }) + } +} + +// TestDynamoDB_AdminDeleteTable_HappyPath checks the round trip: +// create, confirm via list, delete, confirm absence. 
+func TestDynamoDB_AdminDeleteTable_HappyPath(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + srv := nodes[0].dynamoServer + ctx := context.Background() + + _, err := srv.AdminCreateTable(ctx, fullAdminPrincipal, AdminCreateTableInput{ + TableName: "to-delete", + PartitionKey: AdminAttribute{Name: "id", Type: "S"}, + }) + require.NoError(t, err) + + require.NoError(t, srv.AdminDeleteTable(ctx, fullAdminPrincipal, "to-delete")) + + _, exists, err := srv.AdminDescribeTable(ctx, "to-delete") + require.NoError(t, err) + require.False(t, exists) +} + +// TestDynamoDB_AdminDeleteTable_MissingReturnsResourceNotFound +// confirms the adapter surfaces a structured error (not a generic +// 500) when the table never existed; admin handlers map this to 404. +func TestDynamoDB_AdminDeleteTable_MissingReturnsResourceNotFound(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + err := nodes[0].dynamoServer.AdminDeleteTable(context.Background(), fullAdminPrincipal, "absent") + require.Error(t, err) + require.Contains(t, err.Error(), "not found") +} + +// TestDynamoDB_AdminDeleteTable_ReadOnlyForbidden mirrors the +// CreateTable role test on the delete path; both must enforce. +func TestDynamoDB_AdminDeleteTable_ReadOnlyForbidden(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + err := nodes[0].dynamoServer.AdminDeleteTable(context.Background(), readOnlyAdminPrincipal, "anything") + require.Error(t, err) + require.True(t, errors.Is(err, ErrAdminForbidden)) +} + +// TestDynamoDB_AdminDeleteTable_EmptyName_Validation confirms the +// up-front guard returns a validation error rather than passing an +// empty name to the coordinator (which would either 500 or, worse, +// match a tombstone-leaning corner case). 
+func TestDynamoDB_AdminDeleteTable_EmptyName_Validation(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + err := nodes[0].dynamoServer.AdminDeleteTable(context.Background(), fullAdminPrincipal, " ") + require.Error(t, err) +} + +func newDynamoClient(t *testing.T, address string) *dynamodb.Client { + t.Helper() + // Region is intentionally arbitrary here. The test DynamoDB + // server does not enforce a region match in its SigV4 path — + // every existing adapter test uses "us-west-2" as a placeholder + // for the same reason. If the server later starts requiring a + // specific region, source it from the same constant the server + // reads instead of hardcoding it on each side independently. + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) + require.NoError(t, err) + return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + address) + }) +} diff --git a/internal/admin/dynamo_handler.go b/internal/admin/dynamo_handler.go new file mode 100644 index 000000000..3b8ad9404 --- /dev/null +++ b/internal/admin/dynamo_handler.go @@ -0,0 +1,710 @@ +package admin + +import ( + "bytes" + "context" + "encoding/base64" + "errors" + "io" + "log/slog" + "net/http" + "sort" + "strconv" + "strings" + + "github.com/goccy/go-json" +) + +// Pagination knobs for the read-only Dynamo table list endpoint. +// +// defaultDynamoListLimit matches the design doc Section 4.3 default +// (100). dynamoListLimitMax is the hard ceiling; oversized client +// requests are clamped silently rather than rejected so the SPA can +// pass through an opaque "max" without a round-trip on validation. +const ( + defaultDynamoListLimit = 100 + dynamoListLimitMax = 1000 +) + +// pathPrefixDynamoTables is the URL prefix the dynamo handler owns. 
+// "" + suffix "/tables" produces /admin/api/v1/dynamo/tables; the +// trailing slash variant routes to the per-table sub-handler. +const ( + pathDynamoTables = "/admin/api/v1/dynamo/tables" + pathPrefixDynamoTables = pathDynamoTables + "/" +) + +// DynamoTableSummary is the JSON shape the admin dashboard consumes. +// Defined in the admin package — rather than reusing the adapter's +// AdminTableSummary directly — so the admin HTTP layer does not pull +// in the heavyweight adapter dependency tree (gRPC, Raft, etc.) and +// remains testable in isolation. main_admin.go translates between +// adapter.AdminTableSummary and this type. +type DynamoTableSummary struct { + Name string `json:"name"` + PartitionKey string `json:"partition_key"` + SortKey string `json:"sort_key,omitempty"` + Generation uint64 `json:"generation"` + GlobalSecondaryIndexes []DynamoGSISummary `json:"global_secondary_indexes,omitempty"` +} + +// DynamoGSISummary mirrors DynamoTableSummary for a single GSI. +type DynamoGSISummary struct { + Name string `json:"name"` + PartitionKey string `json:"partition_key"` + SortKey string `json:"sort_key,omitempty"` + ProjectionType string `json:"projection_type"` +} + +// TablesSource is the contract the dynamo handler depends on. Wired in +// production to *adapter.DynamoDBServer via a small bridge in +// main_admin.go; tests use a stub. +// +// AdminDescribeTable returns (nil, false, nil) for a missing table so +// callers can distinguish "not found" from a storage error without +// sniffing sentinels. The write entrypoints return the structured +// errors below (ErrTablesForbidden / ErrTablesNotLeader / ...) so +// the handler can map them to HTTP statuses without leaking the +// adapter's internal error shape into the admin package. 
+type TablesSource interface { + AdminListTables(ctx context.Context) ([]string, error) + AdminDescribeTable(ctx context.Context, name string) (*DynamoTableSummary, bool, error) + AdminCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*DynamoTableSummary, error) + AdminDeleteTable(ctx context.Context, principal AuthPrincipal, name string) error +} + +// CreateTableRequest is the JSON body shape for POST /tables per +// design Section 4.2. The handler validates each field before +// passing the request to the source. +type CreateTableRequest struct { + TableName string `json:"table_name"` + PartitionKey CreateTableAttribute `json:"partition_key"` + SortKey *CreateTableAttribute `json:"sort_key,omitempty"` + GSI []CreateTableGSI `json:"gsi,omitempty"` +} + +// CreateTableAttribute names a single primary-key or GSI key +// column. Type must be one of "S", "N", "B". +type CreateTableAttribute struct { + Name string `json:"name"` + Type string `json:"type"` +} + +// CreateTableGSI describes a single global secondary index in a +// CreateTableRequest. SortKey is optional (hash-only GSI). When +// Projection.Type is "INCLUDE", Projection.NonKeyAttributes lists +// the projected attribute names; otherwise it is ignored. +type CreateTableGSI struct { + Name string `json:"name"` + PartitionKey CreateTableAttribute `json:"partition_key"` + SortKey *CreateTableAttribute `json:"sort_key,omitempty"` + Projection CreateTableProjection `json:"projection"` +} + +// CreateTableProjection mirrors the DynamoDB Projection sub-struct +// in admin-friendly snake_case. Type defaults to "ALL" when omitted. +type CreateTableProjection struct { + Type string `json:"type,omitempty"` + NonKeyAttributes []string `json:"non_key_attributes,omitempty"` +} + +// Errors the source layer may return to signal a structured +// failure mode the handler maps to a specific HTTP response. 
+// +// They are sentinel values so a bridge implementation can map any +// adapter-internal error onto exactly one of these without the +// admin package importing the adapter package's private types. +var ( + // ErrTablesForbidden is returned when the principal lacks the + // role required for the operation. Maps to 403. + ErrTablesForbidden = errors.New("admin tables: principal lacks required role") + // ErrTablesNotLeader is returned when the local node is not the + // Raft leader. Maps to 503 + Retry-After: 1 today; the future + // AdminForward RPC catches this as the trigger to forward. + ErrTablesNotLeader = errors.New("admin tables: local node is not the raft leader") + // ErrTablesNotFound is returned when DELETE / DESCRIBE / a + // follow-up read targets a table that does not exist. Maps to + // 404. AdminDescribeTable's (nil, false, nil) tuple is the + // preferred signal for the read path; this sentinel covers the + // write paths only. + ErrTablesNotFound = errors.New("admin tables: table not found") + // ErrTablesAlreadyExists is returned when CreateTable hits a + // pre-existing table with the same name. Maps to 409. + ErrTablesAlreadyExists = errors.New("admin tables: table already exists") +) + +// errCreateBodyTooLarge is returned by decodeCreateTableRequest +// when the request body trips the BodyLimit middleware's +// MaxBytesReader. The handler matches this sentinel to map the +// failure to 413 payload_too_large rather than the generic 400 +// invalid_body — the BodyLimit/middleware contract documented in +// internal/admin/middleware.go (Codex P2 on PR #634 flagged the +// previous always-400 behaviour as a regression). +var errCreateBodyTooLarge = errors.New("request body exceeds the 64 KiB admin limit") + +// ValidationError is what the source returns when the input fails +// adapter-side validation. Surfaces a sanitised message back to the +// SPA — adapter-internal err.Error() output is never sent verbatim. 
+type ValidationError struct{ Message string } + +func (e *ValidationError) Error() string { + if e == nil || e.Message == "" { + return "admin tables: validation failed" + } + return e.Message +} + +// DynamoHandler serves /admin/api/v1/dynamo/tables and +// /admin/api/v1/dynamo/tables/{name}. The collection root accepts +// GET (list) and POST (create); the per-table route accepts GET +// (describe) and DELETE. Writes go through the same protected +// chain as reads (BodyLimit -> SessionAuth -> Audit -> CSRF) plus +// an in-handler RoleFull check so a read-only key cannot mutate +// even with a valid CSRF token. +// +// Writes additionally re-resolve the principal's access key +// against a live RoleStore (when configured) so that a downgraded +// or revoked key cannot continue mutating with a still-valid JWT +// — the JWT freezes the role at login time, and tokens last one +// hour. Codex P1 on PR #635 flagged the gap on the HTTP path; +// the forward server already does this re-evaluation on its side. +type DynamoHandler struct { + source TablesSource + roles RoleStore + logger *slog.Logger +} + +// NewDynamoHandler binds the source and seeds logging with +// slog.Default(). Use WithLogger to attach a tagged logger and +// WithRoleStore to plug in the live access-key role lookup. +func NewDynamoHandler(source TablesSource) *DynamoHandler { + return &DynamoHandler{source: source, logger: slog.Default()} +} + +// WithLogger overrides the default slog destination. +func (h *DynamoHandler) WithLogger(l *slog.Logger) *DynamoHandler { + if l == nil { + return h + } + h.logger = l + return h +} + +// WithRoleStore enables per-request role revalidation on write +// endpoints. Without it, the handler trusts whatever role is +// embedded in the session JWT — which is fine for single-tenant +// deployments where the role config never changes, but +// problematic when an operator revokes or downgrades a key. The +// production wiring in main_admin.go always sets this. 
+func (h *DynamoHandler) WithRoleStore(r RoleStore) *DynamoHandler { + h.roles = r + return h +} + +// ServeHTTP routes /tables and /tables/{name}. We do not use +// http.ServeMux because the admin router already guards the +// /admin/api/v1/* prefix — adding another mux here would just +// duplicate the path-parsing logic. +func (h *DynamoHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == pathDynamoTables: + switch r.Method { + case http.MethodGet: + h.handleList(w, r) + case http.MethodPost: + h.handleCreate(w, r) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or POST") + } + case strings.HasPrefix(r.URL.Path, pathPrefixDynamoTables): + name := strings.TrimPrefix(r.URL.Path, pathPrefixDynamoTables) + switch r.Method { + case http.MethodGet: + h.handleDescribe(w, r, name) + case http.MethodDelete: + h.handleDelete(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } + default: + writeJSONError(w, http.StatusNotFound, "not_found", "") + } +} + +// dynamoListResponse is the JSON shape returned by GET /tables. +// NextToken is omitted when there is no further page so the client +// can use a presence check rather than parsing an empty string. 
+type dynamoListResponse struct { + Tables []string `json:"tables"` + NextToken string `json:"next_token,omitempty"` +} + +func (h *DynamoHandler) handleList(w http.ResponseWriter, r *http.Request) { + limit, err := parseDynamoListLimit(r.URL.Query().Get("limit")) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_limit", err.Error()) + return + } + startAfter, err := decodeDynamoNextToken(r.URL.Query().Get("next_token")) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_next_token", err.Error()) + return + } + + // AdminListTables returns the full lex-sorted name list that + // the adapter's metadata prefix scan produces; we then slice + // to the requested page. The adapter's listTableNames already + // materialises the same list for the SigV4 listTables path + // (adapter/dynamodb.go:1146), which has been in production + // since DynamoDB-compat shipped — admin's memory profile is + // strictly the SigV4 path's, not a regression on top of it. + // + // Worst-case bound: a Dynamo table name caps at 255 bytes, so + // 1k tables ≈ 256 KiB and 10k tables ≈ 2.5 MiB of name + // strings on the heap during a single list call. That is well + // inside the per-request budget the admin listener targets. + // Beyond that scale the right fix is to teach the adapter to + // stream the metadata scan via a callback (and plumb it + // through here), not to bolt a streaming layer on top of the + // already-materialised slice. Tracked separately; this + // endpoint is not the limiting factor. 
+ names, err := h.source.AdminListTables(r.Context()) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo list tables failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "dynamo_list_failed", + "failed to list tables; see server logs") + return + } + + page, next := paginateDynamoTableNames(names, startAfter, limit) + resp := dynamoListResponse{Tables: page} + if next != "" { + resp.NextToken = encodeDynamoNextToken(next) + } + // paginateDynamoTableNames is total over its input — it always + // returns a non-nil slice (an empty []string{} on the + // "cursor past end" branch, a real sub-slice otherwise) so the + // JSON shape is always `"tables": []` rather than `null` even + // without an explicit nil-check here. The Tables array + // contract is enforced at the producer. + writeAdminJSON(w, r.Context(), h.logger, resp) +} + +// handleCreate is the POST /tables handler. It validates the body +// up front, requires a write-capable principal, and translates any +// structured error from the source into the appropriate HTTP status. +// Success response is 201 Created with the freshly-stored table +// summary in the body — same shape as a GET /tables/{name} call. +func (h *DynamoHandler) handleCreate(w http.ResponseWriter, r *http.Request) { + principal, ok := h.principalForWrite(w, r) + if !ok { + return + } + body, err := decodeCreateTableRequest(r.Body) + if err != nil { + if errors.Is(err, errCreateBodyTooLarge) { + WriteMaxBytesError(w) + return + } + writeJSONError(w, http.StatusBadRequest, "invalid_body", err.Error()) + return + } + summary, err := h.source.AdminCreateTable(r.Context(), principal, body) + if err != nil { + h.writeTablesError(w, r, "create", err) + return + } + writeAdminJSONStatus(w, r.Context(), h.logger, http.StatusCreated, summary) +} + +// handleDelete is the DELETE /tables/{name} handler. 
Success is +// 204 No Content; the body is intentionally empty so the SPA can +// treat both 200 and 204 as success without parsing. +func (h *DynamoHandler) handleDelete(w http.ResponseWriter, r *http.Request, name string) { + if name == "" || strings.ContainsRune(name, '/') { + writeJSONError(w, http.StatusNotFound, "not_found", "") + return + } + principal, ok := h.principalForWrite(w, r) + if !ok { + return + } + if err := h.source.AdminDeleteTable(r.Context(), principal, name); err != nil { + h.writeTablesError(w, r, "delete", err) + return + } + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusNoContent) +} + +// principalForWrite is the centralised authorisation gate for +// state-changing endpoints. It pulls the principal out of the +// request context (failing closed if SessionAuth somehow did not +// attach one), enforces RoleFull on the JWT-embedded role, and — +// when a RoleStore is configured — re-validates the access key +// against the live cluster role index so a downgraded or revoked +// key cannot continue mutating with a still-valid JWT (Codex P1 +// on PR #635). +// +// On any rejection the helper writes the appropriate HTTP error +// directly and returns ok=false so callers can early-exit with no +// further work. The forward server applies the same re-validation +// on its side, so leader-direct and forwarded write requests have +// matching authorisation contracts. +func (h *DynamoHandler) principalForWrite(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) { + principal, ok := PrincipalFromContext(r.Context()) + if !ok { + // Should be unreachable — SessionAuth runs before this + // handler and rejects any request without a principal — + // but failing closed here is the right defence-in-depth + // posture for any future routing change that might bypass + // the middleware chain. 
+ writeJSONError(w, http.StatusUnauthorized, "unauthenticated", "no session principal") + return AuthPrincipal{}, false + } + if !principal.Role.AllowsWrite() { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + return AuthPrincipal{}, false + } + // Live re-validation against the current role map. Skip when + // no RoleStore is configured (single-tenant deployments where + // the JWT-embedded role is authoritative); production wiring + // always sets one. + if h.roles != nil { + liveRole, exists := h.roles.LookupRole(principal.AccessKey) + if !exists || !liveRole.AllowsWrite() { + // Don't surface "your key was revoked" vs "your key + // was downgraded" — both are 403 forbidden, and the + // distinction is operator-visible only. + writeJSONError(w, http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + return AuthPrincipal{}, false + } + // Use the live role downstream; the JWT may carry a + // stale value but the live one is authoritative. + principal.Role = liveRole + } + return principal, true +} + +// writeTablesError translates a TablesSource error into the +// appropriate HTTP response. Internal-server-error fallthrough logs +// the raw err.Error() but never sends it to the client, matching +// the read-path policy. +func (h *DynamoHandler) writeTablesError(w http.ResponseWriter, r *http.Request, op string, err error) { + switch { + case errors.Is(err, ErrTablesForbidden): + writeJSONError(w, http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + case errors.Is(err, ErrTablesNotLeader): + // The follower→leader forwarding RPC (design 3.3) will + // catch this case in a follow-up PR. Until then, surface + // 503 + Retry-After: 1 so the SPA / curl can re-issue. 
+ w.Header().Set("Retry-After", "1") + writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable", + "this admin node is not the raft leader") + case errors.Is(err, ErrTablesNotFound): + writeJSONError(w, http.StatusNotFound, "not_found", "table does not exist") + case errors.Is(err, ErrTablesAlreadyExists): + writeJSONError(w, http.StatusConflict, "already_exists", "table already exists") + default: + var verr *ValidationError + if errors.As(err, &verr) { + writeJSONError(w, http.StatusBadRequest, "invalid_request", verr.Error()) + return + } + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo "+op+" table failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "dynamo_"+op+"_failed", + "failed to "+op+" table; see server logs") + } +} + +// decodeCreateTableRequest parses + validates the JSON body. Each +// failure mode maps to a specific human-readable message so the SPA +// can show a useful error without the user having to look at the +// network tab. +func decodeCreateTableRequest(body io.Reader) (CreateTableRequest, error) { + if body == nil { + return CreateTableRequest{}, errors.New("request body is empty") + } + raw, err := io.ReadAll(body) + if err != nil { + if IsMaxBytesError(err) { + // Sentinel so handleCreate can map to 413 rather than + // the generic 400 invalid_body. + return CreateTableRequest{}, errCreateBodyTooLarge + } + return CreateTableRequest{}, errors.New("request body could not be read") + } + // Reject any NUL byte in the body. JSON has no need for a + // raw NUL (control characters must be \u-escaped), and at + // least one of our decoders (goccy/go-json) treats a raw + // NUL as end-of-input, so a body like + // `{"table_name":...}\x00{"extra":1}` would otherwise sneak + // past dec.More(). Codex P2 on PR #634 flagged this as a + // payload-smuggling vector. 
+ if bytes.IndexByte(raw, 0) >= 0 { + return CreateTableRequest{}, errors.New("request body contains a NUL byte") + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + var out CreateTableRequest + if err := dec.Decode(&out); err != nil { + return CreateTableRequest{}, errors.New("request body is not valid JSON") + } + // Reject trailing JSON tokens — `{"table_name":"a", ...}{...}` + // must surface as 400, not silently accept the first object and + // drop the rest. dec.More() returns true when there is at least + // one more JSON value in the stream beyond the one we just + // decoded. + if dec.More() { + return CreateTableRequest{}, errors.New("request body has trailing data after the JSON object") + } + if err := validateCreateTableRequest(&out); err != nil { + return CreateTableRequest{}, err + } + return out, nil +} + +// validateCreateTableRequest is the field-level validation pass +// kept separate from the JSON decoding so each function stays under +// the project's cyclomatic-complexity ceiling and the decoder is +// trivially auditable on its own. +func validateCreateTableRequest(in *CreateTableRequest) error { + // Trim whitespace in place so the canonical name flows through + // the rest of the pipeline. Without this, a name like " foo " + // passes the empty-after-trim check, propagates to the adapter + // (whose own TrimSpace check on creation also passes), and + // gets stored verbatim — leaving a table that the URL-based + // describe/delete routes cannot address because they trim the + // segment literally. Claude's review on PR #634 flagged the + // drift; trimming once at this boundary fixes it. + in.TableName = strings.TrimSpace(in.TableName) + if in.TableName == "" { + return errors.New("table_name is required") + } + // Reject slash-bearing names symmetrically with handleDescribe + // and handleDelete, which already 404 on `/`. 
Without this + // guard a user could create `foo/bar` and then never be able + // to describe or delete it through the same admin surface — + // the orphaned table would be reachable only through the SigV4 + // path. Blocking the asymmetric edge case at create time is + // strictly better than discovering it later. + if strings.ContainsRune(in.TableName, '/') { + return errors.New("table_name must not contain '/'") + } + if err := validateAttribute(in.PartitionKey, "partition_key"); err != nil { + return err + } + if in.SortKey != nil { + if err := validateAttribute(*in.SortKey, "sort_key"); err != nil { + return err + } + } + for i := range in.GSI { + if err := validateGSI(&in.GSI[i], i); err != nil { + return err + } + } + return nil +} + +// validateAttribute enforces the "S | N | B" rule for primary-key +// and GSI key columns. We deliberately do not silently accept +// lower-case or whitespace-padded variants — Dynamo's wire format +// requires the exact upper-case letter. +func validateAttribute(attr CreateTableAttribute, field string) error { + if strings.TrimSpace(attr.Name) == "" { + return errors.New(field + ".name is required") + } + switch attr.Type { + case "S", "N", "B": + return nil + default: + return errors.New(field + `.type must be one of "S", "N", "B"`) + } +} + +func validateGSI(gsi *CreateTableGSI, index int) error { + prefix := "gsi[" + strconv.Itoa(index) + "]" + if strings.TrimSpace(gsi.Name) == "" { + return errors.New(prefix + ".name is required") + } + if err := validateAttribute(gsi.PartitionKey, prefix+".partition_key"); err != nil { + return err + } + if gsi.SortKey != nil { + if err := validateAttribute(*gsi.SortKey, prefix+".sort_key"); err != nil { + return err + } + } + // Canonicalise the projection type in-place. The handler + // accepts case-insensitive input ("include" / "ALL") for SPA + // ergonomics, but the adapter's buildCreateTableProjection + // only matches exact uppercase. 
Normalising once at the + // boundary keeps that mismatch from surfacing as a confusing + // post-validation 500 — the bridge and the AdminForward server + // both forward whatever ends up in this field, so writing back + // the canonical form here means every downstream consumer sees + // the same shape. The empty string keeps its meaning ("default + // to ALL") on both sides. + canonical := strings.TrimSpace(strings.ToUpper(gsi.Projection.Type)) + switch canonical { + case "", "ALL", "KEYS_ONLY", "INCLUDE": + gsi.Projection.Type = canonical + return nil + default: + return errors.New(prefix + `.projection.type must be one of "ALL", "KEYS_ONLY", "INCLUDE"`) + } +} + +func (h *DynamoHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) { + if name == "" || strings.ContainsRune(name, '/') { + writeJSONError(w, http.StatusNotFound, "not_found", "") + return + } + summary, exists, err := h.source.AdminDescribeTable(r.Context(), name) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo describe table failed", + slog.String("table", name), + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "dynamo_describe_failed", + "failed to describe table; see server logs") + return + } + if !exists { + writeJSONError(w, http.StatusNotFound, "not_found", "table does not exist") + return + } + writeAdminJSON(w, r.Context(), h.logger, summary) +} + +// parseDynamoListLimit translates the ?limit= query parameter into a +// concrete page size. Empty falls back to the design-doc default; +// negatives or non-numerics are an outright client error; values past +// the ceiling are silently clamped (not an error) so the SPA's +// "request the maximum" pattern works without a probe round-trip. 
+func parseDynamoListLimit(raw string) (int, error) { + if raw == "" { + return defaultDynamoListLimit, nil + } + n, err := strconv.Atoi(raw) + if err != nil { + return 0, errors.New("limit must be an integer") + } + if n <= 0 { + return 0, errors.New("limit must be positive") + } + if n > dynamoListLimitMax { + return dynamoListLimitMax, nil + } + return n, nil +} + +// decodeDynamoNextToken reverses encodeDynamoNextToken. We base64-wrap +// the raw last-table-name so the wire token is opaque from the +// client's perspective and we can change the cursor representation +// later without breaking the API contract. +func decodeDynamoNextToken(raw string) (string, error) { + if raw == "" { + return "", nil + } + decoded, err := base64.RawURLEncoding.DecodeString(raw) + if err != nil { + return "", errors.New("next_token is not valid base64url") + } + return string(decoded), nil +} + +func encodeDynamoNextToken(name string) string { + return base64.RawURLEncoding.EncodeToString([]byte(name)) +} + +// paginateDynamoTableNames slices `names` (already lex-sorted by the +// adapter) into a single page starting strictly after `startAfter`. +// The second return is the opaque cursor the client should pass back +// for the next call, or "" if this is the last page. +func paginateDynamoTableNames(names []string, startAfter string, limit int) ([]string, string) { + start := 0 + if startAfter != "" { + // sort.SearchStrings returns the first index >= startAfter; + // adding 1 only when the entry equals startAfter gives us + // "strictly after" semantics. A startAfter that no longer + // exists in the sorted list still produces a sane resume + // (we pick up at the first name greater than the cursor). 
+ idx := sort.SearchStrings(names, startAfter) + switch { + case idx >= len(names): + return []string{}, "" + case names[idx] == startAfter: + start = idx + 1 + default: + start = idx + } + } + end := start + limit + if end > len(names) { + end = len(names) + } + page := names[start:end] + if end < len(names) && len(page) > 0 { + return page, page[len(page)-1] + } + return page, "" +} + +// writeAdminJSON is the 200-OK convenience wrapper around +// writeAdminJSONStatus. It exists only so the read-path call sites +// stay compact; both routes share the same marshal-then-write +// safety guarantee. +func writeAdminJSON(w http.ResponseWriter, ctx context.Context, logger *slog.Logger, body any) { + writeAdminJSONStatus(w, ctx, logger, http.StatusOK, body) +} + +// writeAdminJSONStatus marshals `body` to a buffer first, *then* +// writes status + body — never streaming an encoder directly to the +// ResponseWriter. The streaming form would commit the status header +// and then truncate mid-body if json.Marshal failed on a value deep +// in the struct (an unsupported type, a Marshaler returning an +// error), leaving a malformed JSON object on the wire that the SPA +// has no way to recover from. Marshalling first lets us upgrade an +// encode failure to a clean 500 with a well-formed error envelope. +func writeAdminJSONStatus(w http.ResponseWriter, ctx context.Context, logger *slog.Logger, status int, body any) { + payload, err := json.Marshal(body) + if err != nil { + if logger == nil { + logger = slog.Default() + } + logger.LogAttrs(ctx, slog.LevelError, "admin response marshal failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "internal", "failed to encode response") + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + // Defence-in-depth: tell the browser not to MIME-sniff the + // response body. 
The admin surface is JSON-only, so a sniffed + // "this might be HTML" guess is never useful and could enable + // XSS-via-sniffing on a hostile payload that somehow reached + // here. Cookie-gated admin endpoints + a single static + // Content-Type make this cheap and standard. + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(status) + if _, werr := w.Write(payload); werr != nil { + // Status is already on the wire, so we can only log. Write + // failures here usually mean the client closed the connection. + if logger == nil { + logger = slog.Default() + } + logger.LogAttrs(ctx, slog.LevelWarn, "admin response write failed", + slog.String("error", werr.Error()), + ) + } +} diff --git a/internal/admin/dynamo_handler_test.go b/internal/admin/dynamo_handler_test.go new file mode 100644 index 000000000..a82fb707c --- /dev/null +++ b/internal/admin/dynamo_handler_test.go @@ -0,0 +1,796 @@ +package admin + +import ( + "context" + "encoding/base64" + "errors" + "net/http" + "net/http/httptest" + "sort" + "strings" + "testing" + + "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// stubTablesSource is the in-memory test double the dynamo handler +// tests use. AdminListTables returns names in lex order, matching +// the adapter's contract. +type stubTablesSource struct { + tables map[string]*DynamoTableSummary + listErr error + descErr error + createErr error + deleteErr error + + // Last-call tracking: tests assert the principal that reached + // the source so we can prove SessionAuth wired through + // correctly without parsing slog audit lines. 
+ lastCreatePrincipal AuthPrincipal + lastCreateInput CreateTableRequest + lastDeletePrincipal AuthPrincipal + lastDeleteName string +} + +func (s *stubTablesSource) AdminListTables(_ context.Context) ([]string, error) { + if s.listErr != nil { + return nil, s.listErr + } + out := make([]string, 0, len(s.tables)) + for k := range s.tables { + out = append(out, k) + } + sort.Strings(out) + return out, nil +} + +func (s *stubTablesSource) AdminDescribeTable(_ context.Context, name string) (*DynamoTableSummary, bool, error) { + if s.descErr != nil { + return nil, false, s.descErr + } + t, ok := s.tables[name] + if !ok { + return nil, false, nil + } + return t, true, nil +} + +func (s *stubTablesSource) AdminCreateTable(_ context.Context, principal AuthPrincipal, in CreateTableRequest) (*DynamoTableSummary, error) { + s.lastCreatePrincipal = principal + s.lastCreateInput = in + if s.createErr != nil { + return nil, s.createErr + } + if _, exists := s.tables[in.TableName]; exists { + return nil, ErrTablesAlreadyExists + } + summary := &DynamoTableSummary{ + Name: in.TableName, + PartitionKey: in.PartitionKey.Name, + Generation: 1, + } + if in.SortKey != nil { + summary.SortKey = in.SortKey.Name + } + if s.tables == nil { + s.tables = map[string]*DynamoTableSummary{} + } + s.tables[in.TableName] = summary + return summary, nil +} + +func (s *stubTablesSource) AdminDeleteTable(_ context.Context, principal AuthPrincipal, name string) error { + s.lastDeletePrincipal = principal + s.lastDeleteName = name + if s.deleteErr != nil { + return s.deleteErr + } + if _, exists := s.tables[name]; !exists { + return ErrTablesNotFound + } + delete(s.tables, name) + return nil +} + +func newDynamoHandlerForTest(src TablesSource) *DynamoHandler { + return NewDynamoHandler(src) +} + +func TestDynamoHandler_ListTables_EmptyArrayNotNull(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, 
pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"tables":[]`) + require.NotContains(t, rec.Body.String(), `"next_token"`) +} + +func TestDynamoHandler_ListTables_DefaultLimitAppliesAt100(t *testing.T) { + tables := make(map[string]*DynamoTableSummary, 250) + for i := 0; i < 250; i++ { + name := tableNameForIndex(i) + tables[name] = &DynamoTableSummary{Name: name} + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Len(t, resp.Tables, 100) + require.NotEmpty(t, resp.NextToken) + + // next_token must round-trip through base64url back to the last + // page entry — opaque to the client but stable enough that the + // SPA's "next page" call resumes deterministically. 
+ decoded, err := base64.RawURLEncoding.DecodeString(resp.NextToken) + require.NoError(t, err) + require.Equal(t, resp.Tables[len(resp.Tables)-1], string(decoded)) +} + +func TestDynamoHandler_ListTables_LimitClampedToMax(t *testing.T) { + tables := make(map[string]*DynamoTableSummary, 1500) + for i := 0; i < 1500; i++ { + name := tableNameForIndex(i) + tables[name] = &DynamoTableSummary{Name: name} + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=99999", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Len(t, resp.Tables, dynamoListLimitMax) +} + +func TestDynamoHandler_ListTables_PaginationResumesAfterCursor(t *testing.T) { + tables := map[string]*DynamoTableSummary{ + "alpha": {Name: "alpha"}, + "bravo": {Name: "bravo"}, + "charlie": {Name: "charlie"}, + "delta": {Name: "delta"}, + "echo": {Name: "echo"}, + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + + // First page: limit=2 → ["alpha", "bravo"], next_token=base64("bravo"). + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + var page1 dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &page1)) + require.Equal(t, []string{"alpha", "bravo"}, page1.Tables) + require.NotEmpty(t, page1.NextToken) + + // Second page: forward the opaque token verbatim — contract + // must not require the client to URL-escape it again. 
+ req2 := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2&next_token="+page1.NextToken, nil) + rec2 := httptest.NewRecorder() + h.ServeHTTP(rec2, req2) + var page2 dynamoListResponse + require.NoError(t, json.Unmarshal(rec2.Body.Bytes(), &page2)) + require.Equal(t, []string{"charlie", "delta"}, page2.Tables) + require.NotEmpty(t, page2.NextToken) + + // Third page exhausts the list and must omit next_token. + req3 := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2&next_token="+page2.NextToken, nil) + rec3 := httptest.NewRecorder() + h.ServeHTTP(rec3, req3) + var page3 dynamoListResponse + require.NoError(t, json.Unmarshal(rec3.Body.Bytes(), &page3)) + require.Equal(t, []string{"echo"}, page3.Tables) + require.Empty(t, page3.NextToken) +} + +func TestDynamoHandler_ListTables_NextTokenForVanishedNameFastForwards(t *testing.T) { + // A cursor for a name that was deleted between pages must resume + // at the next surviving entry, not silently swallow the page. + tables := map[string]*DynamoTableSummary{ + "alpha": {Name: "alpha"}, + "delta": {Name: "delta"}, + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + + cursor := base64.RawURLEncoding.EncodeToString([]byte("bravo")) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=10&next_token="+cursor, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Equal(t, []string{"delta"}, resp.Tables) + require.Empty(t, resp.NextToken) +} + +func TestDynamoHandler_ListTables_RejectsBadLimit(t *testing.T) { + cases := []struct { + raw string + expect string + }{ + {"abc", "invalid_limit"}, + {"-5", "invalid_limit"}, + {"0", "invalid_limit"}, + } + for _, tc := range cases { + t.Run(tc.raw, func(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, 
pathDynamoTables+"?limit="+tc.raw, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), tc.expect) + }) + } +} + +func TestDynamoHandler_ListTables_RejectsBadNextToken(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?next_token=!!!not-base64!!!", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "invalid_next_token") +} + +func TestDynamoHandler_ListTables_SourceErrorIsHidden(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{listErr: errors.New("kv backing sentinel ZZZ-471")}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "dynamo_list_failed") + require.NotContains(t, rec.Body.String(), "ZZZ-471") + require.NotContains(t, rec.Body.String(), "kv backing sentinel") +} + +func TestDynamoHandler_DescribeTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": { + Name: "orders", + PartitionKey: "customer", + SortKey: "orderID", + Generation: 42, + GlobalSecondaryIndexes: []DynamoGSISummary{ + {Name: "by-status", PartitionKey: "status", SortKey: "createdAt", ProjectionType: "ALL"}, + }, + }, + }} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var got DynamoTableSummary + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + require.Equal(t, "orders", got.Name) + require.Equal(t, "customer", got.PartitionKey) + require.Equal(t, 
"orderID", got.SortKey) + require.EqualValues(t, 42, got.Generation) + require.Len(t, got.GlobalSecondaryIndexes, 1) + require.Equal(t, "by-status", got.GlobalSecondaryIndexes[0].Name) + require.Equal(t, "ALL", got.GlobalSecondaryIndexes[0].ProjectionType) +} + +func TestDynamoHandler_DescribeTable_MissingReturns404(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/absent", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusNotFound, rec.Code) + require.Contains(t, rec.Body.String(), "not_found") +} + +func TestDynamoHandler_DescribeTable_RejectsSlashInName(t *testing.T) { + // /admin/api/v1/dynamo/tables/foo/bar must not let the handler + // call AdminDescribeTable with a "/"-bearing name. Returning 404 + // is preferable to 400 here because the URL itself is the only + // way to express the table name; an embedded "/" simply does not + // route to a real table. 
+ src := &stubTablesSource{ + descErr: errors.New("must not be invoked with slash-bearing name"), + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/foo/bar", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +func TestDynamoHandler_DescribeTable_SourceErrorIsHidden(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{descErr: errors.New("storage sentinel QQ-808")}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "dynamo_describe_failed") + require.NotContains(t, rec.Body.String(), "QQ-808") +} + +func TestDynamoHandler_RejectsUnsupportedMethods(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + // /tables accepts GET + POST; PUT/PATCH/DELETE on the + // collection root are 405. Wrapping the principal lets the + // handler reach the method-dispatch arm rather than 401-ing + // on missing principal first. + for _, m := range []string{http.MethodPut, http.MethodDelete, http.MethodPatch} { + req := httptest.NewRequest(m, pathDynamoTables, nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusMethodNotAllowed, rec.Code, "collection method %s", m) + } + // /tables/{name} accepts GET + DELETE; POST/PUT/PATCH are 405. 
+ for _, m := range []string{http.MethodPost, http.MethodPut, http.MethodPatch} { + req := httptest.NewRequest(m, pathDynamoTables+"/x", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusMethodNotAllowed, rec.Code, "item method %s", m) + } +} + +func TestDynamoHandler_UnknownSubpathReturns404(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + // /admin/api/v1/dynamo/something-else falls outside the prefix the + // handler owns; the handler must answer 404 so the admin router's + // "API took it" prefix routing stays correct. + req := httptest.NewRequest(http.MethodGet, "/admin/api/v1/dynamo/things", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +func TestDynamoHandler_DescribeTable_TrailingSlashIsRejected(t *testing.T) { + // /admin/api/v1/dynamo/tables/ with an empty trailing component + // would otherwise pass an empty name down to the source. + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +// withWritePrincipal injects a full-access principal into the +// request context so handler tests bypass the SessionAuth middleware +// while keeping the role check live. Mirrors how SessionAuth wires +// the value in production. 
+func withWritePrincipal(req *http.Request) *http.Request { + return req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull})) +} + +func withReadOnlyPrincipal(req *http.Request) *http.Request { + return req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) +} + +// validCreateBody returns a minimal-but-valid POST body the +// happy-path tests share. +func validCreateBody() string { + return `{"table_name":"users","partition_key":{"name":"id","type":"S"}}` +} + +// TestDynamoHandler_CreateTable_TrimsWhitespaceFromTableName covers +// the Claude-review finding on PR #634: a name like " users " +// must be trimmed before reaching the source so that subsequent +// describe/delete URL segments (which are matched literally) +// resolve the same table. +func TestDynamoHandler_CreateTable_TrimsWhitespaceFromTableName(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + body := `{"table_name":" users ","partition_key":{"name":"id","type":"S"}}` + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(body)) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String()) + require.Equal(t, "users", src.lastCreateInput.TableName, + "name reaching the source must be the trimmed canonical form") +} + +// TestDynamoHandler_CreateTable_WhitespaceOnlyNameRejected ensures +// that a name consisting solely of whitespace still fails after +// the trim — i.e., trimming does not weaken the empty-name guard. 
+func TestDynamoHandler_CreateTable_WhitespaceOnlyNameRejected(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + body := `{"table_name":" ","partition_key":{"name":"id","type":"S"}}` + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(body)) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "table_name is required") +} + +// TestDynamoHandler_CreateTable_LiveRoleRevocation covers Codex P1 +// on PR #635: a session JWT with role=full from before a config +// reload must NOT keep mutating after the access key is revoked +// or downgraded. The handler re-validates against the live +// RoleStore on every write. +func TestDynamoHandler_CreateTable_LiveRoleRevocation(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + // Live role map says the access key is read-only — even though + // the JWT in withWritePrincipal carries role=full. + roles := MapRoleStore{"AKIA_FULL": RoleReadOnly} + h := NewDynamoHandler(src).WithRoleStore(roles) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code, + "a downgraded access key must be rejected even with a still-valid full-role JWT") + require.Empty(t, src.lastCreateInput.TableName, + "source must not be touched on revocation") +} + +// TestDynamoHandler_CreateTable_LiveRoleAccessKeyRemoved covers +// the harder case: the access key was deleted entirely from the +// role index. Same 403, same defence-in-depth. 
+func TestDynamoHandler_CreateTable_LiveRoleAccessKeyRemoved(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + roles := MapRoleStore{} // AKIA_FULL is absent + h := NewDynamoHandler(src).WithRoleStore(roles) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastCreateInput.TableName) +} + +// TestDynamoHandler_DeleteTable_LiveRoleRevocation mirrors the +// create-side coverage on the delete path. +func TestDynamoHandler_DeleteTable_LiveRoleRevocation(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users"}, + }} + roles := MapRoleStore{"AKIA_FULL": RoleReadOnly} + h := NewDynamoHandler(src).WithRoleStore(roles) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/users", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastDeleteName) +} + +func TestDynamoHandler_CreateTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusCreated, rec.Code) + require.Equal(t, AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull}, src.lastCreatePrincipal) + require.Equal(t, "users", src.lastCreateInput.TableName) + var got DynamoTableSummary + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + require.Equal(t, "users", got.Name) + require.Equal(t, "id", got.PartitionKey) +} + +// TestDynamoHandler_CreateTable_CanonicalisesProjectionType makes 
+// sure validateGSI normalises a lowercase "include" value back to
+// the uppercase form the adapter expects. Without normalisation
+// the request would pass handler validation only to fail at the
+// adapter as ValidationException ("invalid projection") — exactly
+// the boundary mismatch Codex P2 flagged on PR #635.
+func TestDynamoHandler_CreateTable_CanonicalisesProjectionType(t *testing.T) {
+	src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}}
+	h := newDynamoHandlerForTest(src)
+	body := `{
+		"table_name":"t",
+		"partition_key":{"name":"id","type":"S"},
+		"gsi":[{
+			"name":"by-status",
+			"partition_key":{"name":"status","type":"S"},
+			"projection":{"type":"include","non_key_attributes":["author"]}
+		}]
+	}`
+	req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(body))
+	req = withWritePrincipal(req)
+	rec := httptest.NewRecorder()
+	h.ServeHTTP(rec, req)
+	require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String())
+	require.Len(t, src.lastCreateInput.GSI, 1)
+	require.Equal(t, "INCLUDE", src.lastCreateInput.GSI[0].Projection.Type,
+		"projection type must reach the source canonicalised so the adapter does not re-reject it")
+}
+
+// TestDynamoHandler_CreateTable_AcceptsMixedCaseProjection covers
+// the same canonicalisation for all/All, keys_only/KEYS_ONLY, and Include.
+func TestDynamoHandler_CreateTable_AcceptsMixedCaseProjection(t *testing.T) { + cases := []struct{ in, want string }{ + {"all", "ALL"}, + {"All", "ALL"}, + {"KEYS_ONLY", "KEYS_ONLY"}, + {"keys_only", "KEYS_ONLY"}, + {"Include", "INCLUDE"}, + } + for _, tc := range cases { + t.Run(tc.in, func(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + body := `{ + "table_name":"t", + "partition_key":{"name":"id","type":"S"}, + "gsi":[{ + "name":"i", + "partition_key":{"name":"k","type":"S"}, + "projection":{"type":"` + tc.in + `"} + }] + }` + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(body)) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String()) + require.Equal(t, tc.want, src.lastCreateInput.GSI[0].Projection.Type) + }) + } +} + +func TestDynamoHandler_CreateTable_RejectsReadOnlyPrincipal(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withReadOnlyPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusForbidden, rec.Code) + // The source must not be touched at all when the role check + // fires — leaking a read-only call into the source layer would + // be a defence-in-depth regression. + require.Empty(t, src.lastCreateInput.TableName) +} + +func TestDynamoHandler_CreateTable_RejectsMissingPrincipal(t *testing.T) { + // Without a principal in context (SessionAuth would normally + // reject earlier, but defence-in-depth here matters), the + // handler must answer 401 rather than crashing on a zero + // AuthPrincipal. 
+ src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusUnauthorized, rec.Code) +} + +// TestDynamoHandler_CreateTable_OversizedBodyReturns413 confirms a +// body that trips BodyLimit's MaxBytesReader surfaces as 413 +// payload_too_large rather than the generic 400 invalid_body +// (Codex P2 on PR #634). The middleware contract in +// internal/admin/middleware.go is that oversized bodies map to +// 413; the previous wholesale "decode failure → 400" path +// silently broke that for this endpoint. +func TestDynamoHandler_CreateTable_OversizedBodyReturns413(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + // Build a body just over the limit. Padding a real-shape + // JSON object with whitespace keeps the structure valid up + // to the cap so the test isolates the size-rejection path. + oversize := `{"table_name":"u","partition_key":{"name":"id","type":"S"}` + + strings.Repeat(" ", int(defaultBodyLimit)+1) + "}" + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(oversize)) + req = withWritePrincipal(req) + // The router applies BodyLimit at the outer wrap; emulate + // that here so MaxBytesReader is in play during ReadAll. 
+	rec := httptest.NewRecorder()
+	req.Body = http.MaxBytesReader(rec, req.Body, defaultBodyLimit)
+	h.ServeHTTP(rec, req)
+
+	require.Equal(t, http.StatusRequestEntityTooLarge, rec.Code)
+	require.Contains(t, rec.Body.String(), "payload_too_large")
+	require.Empty(t, src.lastCreateInput.TableName, "source must not be touched on oversized body")
+}
+
+func TestDynamoHandler_CreateTable_RejectsBadJSON(t *testing.T) {
+	src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}}
+	h := newDynamoHandlerForTest(src)
+	cases := []string{
+		``,
+		`{`,
+		`{"table_name":""}`,
+		`{"table_name":"u"}`, // missing partition_key
+		`{"table_name":"u","partition_key":{"name":"id","type":"X"}}`, // bad type
+		`{"table_name":"u","partition_key":{"name":"id","type":"S"},"sort_key":{"name":"","type":"S"}}`, // bad sort key
+		`{"table_name":"u","partition_key":{"name":"id","type":"S"},"unknown_field":1}`, // strict decode
+		`{"table_name":"u","partition_key":{"name":"id","type":"S"}}{"second":"object"}`, // trailing JSON
+		`{"table_name":"u","partition_key":{"name":"id","type":"S"}} 42`, // trailing scalar
+		`{"table_name":"foo/bar","partition_key":{"name":"id","type":"S"}}`, // slash in name
+		`{"table_name":"a/b/c","partition_key":{"name":"id","type":"S"}}`, // multiple slashes
+		// NUL-delimited smuggling vector (Codex P2). goccy/go-json
+		// treats raw NUL as end-of-input so dec.More() would
+		// otherwise return false; the explicit NUL scan catches it.
+		"{\"table_name\":\"u\",\"partition_key\":{\"name\":\"id\",\"type\":\"S\"}}\x00{\"extra\":1}",
+		// Lone NUL inside the JSON object — same vector, less obvious.
+ "{\"table_name\":\"u\",\"partition_key\":{\"name\":\"id\",\"type\":\"S\"}}\x00", + } + for _, body := range cases { + t.Run(body, func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(body)) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code, "body=%q", body) + require.Contains(t, rec.Body.String(), "invalid_body") + }) + } +} + +func TestDynamoHandler_CreateTable_AlreadyExistsReturns409(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users", PartitionKey: "id"}, + }} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusConflict, rec.Code) + require.Contains(t, rec.Body.String(), "already_exists") +} + +func TestDynamoHandler_CreateTable_NotLeaderReturns503WithRetryAfter(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{}, + createErr: ErrTablesNotLeader, + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusServiceUnavailable, rec.Code) + require.Equal(t, "1", rec.Header().Get("Retry-After")) + require.Contains(t, rec.Body.String(), "leader_unavailable") +} + +func TestDynamoHandler_CreateTable_ForbiddenFromSourceMaps403(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{}, + createErr: ErrTablesForbidden, + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := 
httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) +} + +func TestDynamoHandler_CreateTable_ValidationErrorMaps400(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{}, + createErr: &ValidationError{Message: "conflicting attribute type for id"}, + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "conflicting attribute type for id") +} + +func TestDynamoHandler_CreateTable_GenericErrorIsHidden(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{}, + createErr: errors.New("storage backing sentinel ZQ-993"), + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodPost, pathDynamoTables, strings.NewReader(validCreateBody())) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.NotContains(t, rec.Body.String(), "ZQ-993") + require.NotContains(t, rec.Body.String(), "storage backing sentinel") +} + +func TestDynamoHandler_DeleteTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users", PartitionKey: "id"}, + }} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/users", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) + require.Empty(t, rec.Body.Bytes()) + require.Equal(t, "users", src.lastDeleteName) + require.Equal(t, AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull}, src.lastDeletePrincipal) +} + +func 
TestDynamoHandler_DeleteTable_ReadOnlyPrincipalRejected(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users", PartitionKey: "id"}, + }} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/users", nil) + req = withReadOnlyPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastDeleteName, "source must not be reached when role check fails") +} + +func TestDynamoHandler_DeleteTable_MissingReturns404(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/absent", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) + require.Contains(t, rec.Body.String(), "not_found") +} + +func TestDynamoHandler_DeleteTable_NotLeaderReturns503WithRetryAfter(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{"users": {Name: "users"}}, + deleteErr: ErrTablesNotLeader, + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/users", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusServiceUnavailable, rec.Code) + require.Equal(t, "1", rec.Header().Get("Retry-After")) +} + +func TestDynamoHandler_DeleteTable_RejectsTrailingSlash(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodDelete, pathDynamoTables+"/", nil) + req = withWritePrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +// tableNameForIndex generates lex-sortable monotonically 
increasing +// names so list-pagination tests can assert deterministic ordering. +func tableNameForIndex(i int) string { + const width = 4 + digits := []byte("0000") + for k := width - 1; k >= 0 && i > 0; k-- { + digits[k] = byte('0' + i%10) + i /= 10 + } + return "tbl-" + string(digits) +} + +// Sanity check on the helper itself so the pagination assertions +// have a stable backing dataset. +func TestTableNameForIndex_LexSortable(t *testing.T) { + prev := "" + for i := 0; i < 20; i++ { + cur := tableNameForIndex(i) + if prev != "" { + require.True(t, strings.Compare(prev, cur) < 0, "non-monotonic at %d: %s !< %s", i, prev, cur) + } + prev = cur + } +} diff --git a/internal/admin/role_store.go b/internal/admin/role_store.go new file mode 100644 index 000000000..298ce344b --- /dev/null +++ b/internal/admin/role_store.go @@ -0,0 +1,31 @@ +package admin + +// RoleStore is the live access-key → Role lookup the admin handlers +// re-evaluate on every state-changing request. Embedding the role +// in the JWT alone is insufficient: a token minted under role +// `full` would otherwise keep mutating tables for the rest of its +// 1-hour TTL even if an operator revoked or downgraded the access +// key in the cluster's role configuration. Codex P1 on PR #635 +// flagged the gap; the leader-side ForwardServer already does this +// re-evaluation, the HTTP path now does it too so leader-direct +// writes match the forwarded path's authorisation contract. +type RoleStore interface { + // LookupRole returns the role for an access key as understood + // by the local node's view of cluster configuration. The bool + // is false when the access key is not in the admin role index + // — a session whose key has been removed must not be able to + // perform any admin write, even if its JWT is still within + // its issued validity window. 
+ LookupRole(accessKey string) (Role, bool) +} + +// MapRoleStore is the trivial in-memory implementation, sufficient +// for tests and for the production wiring (which already keeps the +// role map in memory). +type MapRoleStore map[string]Role + +// LookupRole implements RoleStore. +func (m MapRoleStore) LookupRole(accessKey string) (Role, bool) { + r, ok := m[accessKey] + return r, ok +} diff --git a/internal/admin/router.go b/internal/admin/router.go index 658d41f45..8b5a5b3c9 100644 --- a/internal/admin/router.go +++ b/internal/admin/router.go @@ -277,6 +277,11 @@ func writeJSONNotFound(w http.ResponseWriter, _ *http.Request) { func writeJSONError(w http.ResponseWriter, status int, code, msg string) { w.Header().Set("Content-Type", "application/json; charset=utf-8") + // Defence-in-depth header: the admin surface is JSON-only, so + // declare nosniff to prevent a misbehaving browser from + // content-sniffing an error body into something executable. + // Cheap and standard for cookie-gated admin endpoints. + w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(errorResponse{Error: code, Message: msg}) diff --git a/internal/admin/server.go b/internal/admin/server.go index 93b23c78e..eff694a2d 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -5,6 +5,7 @@ import ( "log/slog" "net/http" "reflect" + "strings" ) // ServerDeps bundles the collaborators the admin HTTP surface needs. All @@ -31,6 +32,13 @@ type ServerDeps struct { // ClusterInfo describes the local node's Raft state. ClusterInfo ClusterInfoSource + // Tables is the DynamoDB admin source — covers list, describe, + // create, and delete via TablesSource. Optional: a nil value + // disables /admin/api/v1/dynamo/tables{,/{name}} (the mux + // answers them with 404). This lets a build that ships only the + // cluster page deploy without standing up the dynamo bridge. 
+ Tables TablesSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -92,7 +100,21 @@ func NewServer(deps ServerDeps) (*Server, error) { } auth := NewAuthService(deps.Signer, deps.Credentials, deps.Roles, authOpts) cluster := NewClusterHandler(deps.ClusterInfo).WithLogger(logger) - mux := buildAPIMux(auth, deps.Verifier, cluster, logger) + var dynamo http.Handler + if deps.Tables != nil { + // Re-evaluate the principal's role on every state- + // changing request against the live role map (Codex P1 + // on PR #635). MapRoleStore wraps the same map the auth + // layer uses for login, so a config reload that updates + // deps.Roles does NOT automatically propagate here — + // operators must restart the listener for revocation to + // take effect, but the JWT no longer extends a revoked + // key past the next request. + dynamo = NewDynamoHandler(deps.Tables). + WithLogger(logger). + WithRoleStore(MapRoleStore(deps.Roles)) + } + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } @@ -119,15 +141,23 @@ func (s *Server) APIHandler() http.Handler { // // Layout: // -// POST /admin/api/v1/auth/login (no auth, rate-limited) -// POST /admin/api/v1/auth/logout (no auth required) -// GET /admin/api/v1/cluster (auth required) +// POST /admin/api/v1/auth/login (no auth, rate-limited) +// POST /admin/api/v1/auth/logout (auth required) +// GET /admin/api/v1/cluster (auth required) +// GET /admin/api/v1/dynamo/tables (auth required) +// POST /admin/api/v1/dynamo/tables (auth required, full role) +// GET /admin/api/v1/dynamo/tables/{name} (auth required) +// DELETE /admin/api/v1/dynamo/tables/{name} (auth required, full role) // // Body limit applies uniformly. 
CSRF and Audit middleware apply to // write-capable protected endpoints; login and logout carry their own // audit path inside AuthService because the generic Audit middleware // cannot see the claimed actor at that point in the chain. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler http.Handler, logger *slog.Logger) http.Handler { +// +// dynamoHandler may be nil; in that case the dynamo paths fall through +// to the unknown-endpoint 404, matching the behaviour of any other +// unregistered admin path. +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -177,15 +207,27 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler http.Hand loginChain := publicAuth(loginHandler) logoutChain := protectNoAudit(logoutHandler) clusterChain := protect(clusterHandler) + // Dynamo endpoints (reads and writes) share the protect chain + // so a missing session or CSRF token 401s/403s the same way + // regardless of method. The Audit middleware is a no-op for + // GET (it only logs state-changing methods) so dashboard polls + // don't flood the audit log, while POST/DELETE always do. 
+ var dynamoChain http.Handler + if dynamoHandler != nil { + dynamoChain = protect(dynamoHandler) + } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/admin/api/v1/auth/login": + switch { + case r.URL.Path == "/admin/api/v1/auth/login": loginChain.ServeHTTP(w, r) - case "/admin/api/v1/auth/logout": + case r.URL.Path == "/admin/api/v1/auth/logout": logoutChain.ServeHTTP(w, r) - case "/admin/api/v1/cluster": + case r.URL.Path == "/admin/api/v1/cluster": clusterChain.ServeHTTP(w, r) + case dynamoChain != nil && (r.URL.Path == pathDynamoTables || + strings.HasPrefix(r.URL.Path, pathPrefixDynamoTables)): + dynamoChain.ServeHTTP(w, r) default: writeJSONError(w, http.StatusNotFound, "unknown_endpoint", "no admin API handler is registered for this path") diff --git a/internal/admin/server_test.go b/internal/admin/server_test.go index 59344e0a5..016d4ce28 100644 --- a/internal/admin/server_test.go +++ b/internal/admin/server_test.go @@ -3,6 +3,7 @@ package admin import ( "bytes" "context" + "io" "log/slog" "net/http" "net/http/httptest" @@ -169,6 +170,289 @@ func TestServer_APIHandlerReturnsBareMux(t *testing.T) { require.Equal(t, http.StatusNotFound, rec.Code) } +// newServerWithTablesForTest mirrors newServerForTest but also wires +// in a stub TablesSource so the dynamo paths are reachable. The same +// test setup pattern (single fixed clock, two access keys, JSON +// logger) keeps the assertion surface compact. 
+func newServerWithTablesForTest(t *testing.T, src TablesSource) *Server { + t.Helper() + clk := fixedClock(time.Unix(1_700_000_000, 0).UTC()) + signer := newSignerForTest(t, 1, clk) + verifier := newVerifierForTest(t, []byte{1}, clk) + creds := MapCredentialStore{ + "AKIA_ADMIN": "ADMIN_SECRET", + "AKIA_RO": "RO_SECRET", + } + roles := map[string]Role{ + "AKIA_ADMIN": RoleFull, + "AKIA_RO": RoleReadOnly, + } + cluster := ClusterInfoFunc(func(_ context.Context) (ClusterInfo, error) { + return ClusterInfo{NodeID: "node-1", Version: "0.1.0"}, nil + }) + buf := &bytes.Buffer{} + logger := slog.New(slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelInfo})) + srv, err := NewServer(ServerDeps{ + Signer: signer, + Verifier: verifier, + Credentials: creds, + Roles: roles, + ClusterInfo: cluster, + Tables: src, + AuthOpts: AuthServiceOpts{Clock: clk}, + Logger: logger, + }) + require.NoError(t, err) + return srv +} + +// loginAndCookies completes a successful login and returns the +// session + CSRF cookies the SPA would carry on follow-up requests. +// Tests that exercise protected GET endpoints reuse this helper to +// avoid copy-pasting the login dance everywhere. 
+func loginAndCookies(t *testing.T, ts *httptest.Server) []*http.Cookie { + t.Helper() + body, _ := json.Marshal(loginRequest{AccessKey: "AKIA_RO", SecretKey: "RO_SECRET"}) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/auth/login", strings.NewReader(string(body))) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + cookies := resp.Cookies() + _ = resp.Body.Close() + return cookies +} + +func TestServer_DynamoTables_RequiresSession(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": {Name: "orders", PartitionKey: "id"}, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusUnauthorized, resp.StatusCode) + _ = resp.Body.Close() +} + +func TestServer_DynamoTables_ReadOnlyRoleAllowed(t *testing.T) { + // Tables list is a GET — the read-only role must succeed without + // any extra opt-in. This guards against accidentally bolting + // RequireWriteRole onto the read chain. 
+ src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": {Name: "orders", PartitionKey: "id"}, + "products": {Name: "products", PartitionKey: "sku"}, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + var body dynamoListResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + require.Equal(t, []string{"orders", "products"}, body.Tables) + _ = resp.Body.Close() +} + +func TestServer_DynamoDescribeTable_AuthenticatedHappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": { + Name: "orders", + PartitionKey: "id", + SortKey: "ts", + Generation: 7, + }, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables/orders", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + var got DynamoTableSummary + require.NoError(t, json.NewDecoder(resp.Body).Decode(&got)) + require.Equal(t, "orders", got.Name) + require.Equal(t, "id", got.PartitionKey) + require.Equal(t, "ts", got.SortKey) + require.EqualValues(t, 7, got.Generation) + _ = resp.Body.Close() +} + +func TestServer_DynamoTables_NilSourceFallsThroughTo404(t *testing.T) { + // A build that ships only the cluster page (Tables nil) must keep + // the dynamo paths off the 
wire entirely. The expected response is + // the standard "unknown_endpoint" JSON 404 — same as any other + // unregistered admin URL — so the SPA can detect the feature is + // disabled by HTTP status alone. + srv := newServerForTest(t) // built without Tables + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + require.Contains(t, string(body), "unknown_endpoint") + _ = resp.Body.Close() +} + +// loginAsFullAdminAndCookies returns cookies for a full-access +// principal so write-path integration tests can hit +// POST/DELETE endpoints without copy-pasting the login dance. +func loginAsFullAdminAndCookies(t *testing.T, ts *httptest.Server) []*http.Cookie { + t.Helper() + body, _ := json.Marshal(loginRequest{AccessKey: "AKIA_ADMIN", SecretKey: "ADMIN_SECRET"}) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/auth/login", strings.NewReader(string(body))) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + cookies := resp.Cookies() + _ = resp.Body.Close() + return cookies +} + +// csrfHeaderFromCookies extracts the admin_csrf cookie value so the +// write tests can attach the X-Admin-CSRF header. The double-submit +// middleware compares the two; missing the header would reject the +// request before it reaches the dynamo handler under test. 
+func csrfHeaderFromCookies(cookies []*http.Cookie) string { + for _, c := range cookies { + if c.Name == csrfCookieName { + return c.Value + } + } + return "" +} + +func TestServer_DynamoCreateTable_FullRoleHappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAsFullAdminAndCookies(t, ts) + body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/dynamo/tables", body) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies)) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode) + require.Equal(t, "users", src.lastCreateInput.TableName) + require.Equal(t, RoleFull, src.lastCreatePrincipal.Role) + _ = resp.Body.Close() +} + +func TestServer_DynamoCreateTable_ReadOnlyRoleRejected(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) // read-only key + body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/dynamo/tables", body) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies)) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusForbidden, resp.StatusCode) + 
require.Empty(t, src.lastCreateInput.TableName, "source must not be reached on role rejection") + _ = resp.Body.Close() +} + +func TestServer_DynamoCreateTable_MissingCSRFRejected(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAsFullAdminAndCookies(t, ts) + body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/dynamo/tables", body) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + // Deliberately *no* X-Admin-CSRF header. CSRFDoubleSubmit must + // reject before the handler ever sees the body. + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusForbidden, resp.StatusCode) + require.Empty(t, src.lastCreateInput.TableName, "CSRF gate must run before the handler") + _ = resp.Body.Close() +} + +func TestServer_DynamoDeleteTable_FullRoleHappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users", PartitionKey: "id"}, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAsFullAdminAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodDelete, + ts.URL+"/admin/api/v1/dynamo/tables/users", nil) + require.NoError(t, err) + req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies)) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, resp.StatusCode) + require.Equal(t, "users", src.lastDeleteName) + require.Equal(t, RoleFull, src.lastDeletePrincipal.Role) + _ = 
resp.Body.Close() +} + func TestServer_WriteRejectsMissingCSRF(t *testing.T) { // Login to obtain a session, then hit cluster with POST to trigger // CSRF on what the router normally rejects as method_not_allowed. diff --git a/main.go b/main.go index 85b51e137..f6e38cc89 100644 --- a/main.go +++ b/main.go @@ -688,7 +688,12 @@ func startServers(in serversInput) error { if err := runner.start(); err != nil { return err } - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes); err != nil { + // runner.start() populates runner.dynamoServer for the admin + // listener's SigV4-bypass entrypoints (see adapter/dynamodb_admin.go). + // Passing nil here would leave the admin dashboard with no + // access to table metadata; the admin handler answers + // /admin/api/v1/dynamo/* with 404 in that case. + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil @@ -1037,10 +1042,10 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr return nil } -func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, leaderDynamo map[string]string, metricsRegistry *monitoring.Registry, readTracker *kv.ActiveTimestampTracker) error { +func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, leaderDynamo map[string]string, metricsRegistry *monitoring.Registry, readTracker *kv.ActiveTimestampTracker) (*adapter.DynamoDBServer, error) { dynamoL, err := lc.Listen(ctx, "tcp", dynamoAddr) if err != nil { - return errors.Wrapf(err, "failed to listen on %s", dynamoAddr) + return nil, errors.Wrapf(err, "failed to listen on %s", dynamoAddr) } dynamoServer := adapter.NewDynamoDBServer( dynamoL, @@ -1067,7 +1072,7 @@ func startDynamoDBServer(ctx 
context.Context, lc *net.ListenConfig, eg *errgroup } return errors.WithStack(err) }) - return nil + return dynamoServer, nil } func startPprofServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, pprofAddr string, pprofToken string) error { @@ -1207,9 +1212,17 @@ type runtimeServerRunner struct { pprofAddress string pprofToken string metricsRegistry *monitoring.Registry + + // dynamoServer is populated by start() and made available to + // startAdminFromFlags in this package so the admin listener can + // call SigV4-bypass admin entrypoints (see + // adapter/dynamodb_admin.go) without going through HTTP. The + // field is unexported on purpose — it is package-private state, + // not a public API. Nil until start() reaches the dynamo step. + dynamoServer *adapter.DynamoDBServer } -func (r runtimeServerRunner) start() error { +func (r *runtimeServerRunner) start() error { if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } @@ -1230,9 +1243,11 @@ func (r runtimeServerRunner) start() error { ); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker); err != nil { + dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker) + if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } + r.dynamoServer = dynamoServer if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin.go b/main_admin.go index 
abd4bb09f..e9e1f526c 100644 --- a/main_admin.go +++ b/main_admin.go @@ -10,8 +10,10 @@ import ( "strings" "time" + "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/internal/admin" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/kv" "github.com/cockroachdb/errors" "golang.org/x/sync/errgroup" ) @@ -66,7 +68,7 @@ type adminListenerConfig struct { // without touching --s3CredentialsFile: pulling the admin feature into // a hard dependency on that file would break deployments that never // intended to use it. -func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime) error { +func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer) error { if !*adminEnabled { return nil } @@ -106,10 +108,209 @@ func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup fullAccessKeys: parseCSV(*adminFullAccessKeys), } clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes) - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, buildVersion()) + tablesSrc := newDynamoTablesSource(dynamoServer) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, buildVersion()) return err } +// newDynamoTablesSource adapts *adapter.DynamoDBServer to the +// admin.TablesSource interface. The bridge stays in this file (rather +// than internal/admin) so the admin package stays free of the heavy +// adapter-package dependency tree (gRPC, Raft, store). +// +// Returns nil when dynamoServer is nil; admin.NewServer handles a nil +// Tables field by leaving the dynamo paths off the wire entirely, +// which is the right behaviour for builds that ship without the +// Dynamo adapter. 
+func newDynamoTablesSource(dynamoServer *adapter.DynamoDBServer) admin.TablesSource { + if dynamoServer == nil { + return nil + } + return &dynamoTablesBridge{server: dynamoServer} +} + +// dynamoTablesBridge is the thin adapter that re-shapes the adapter's +// AdminTableSummary DTO into the admin package's DynamoTableSummary. +// The two structs are deliberately isomorphic so this translation +// does no allocation more than necessary; if a future GSI field is +// added on one side, the build breaks here, which is exactly the +// drift signal we want. +type dynamoTablesBridge struct { + server *adapter.DynamoDBServer +} + +func (b *dynamoTablesBridge) AdminListTables(ctx context.Context) ([]string, error) { + return b.server.AdminListTables(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context. +} + +func (b *dynamoTablesBridge) AdminDescribeTable(ctx context.Context, name string) (*admin.DynamoTableSummary, bool, error) { + summary, exists, err := b.server.AdminDescribeTable(ctx, name) + if err != nil { + return nil, false, err //nolint:wrapcheck // adapter wraps internally. + } + if !exists { + return nil, false, nil + } + return convertAdminTableSummary(summary), true, nil +} + +func (b *dynamoTablesBridge) AdminCreateTable(ctx context.Context, principal admin.AuthPrincipal, in admin.CreateTableRequest) (*admin.DynamoTableSummary, error) { + summary, err := b.server.AdminCreateTable(ctx, convertAdminPrincipal(principal), convertCreateTableInput(in)) + if err != nil { + return nil, translateAdminTablesError(err) + } + return convertAdminTableSummary(summary), nil +} + +func (b *dynamoTablesBridge) AdminDeleteTable(ctx context.Context, principal admin.AuthPrincipal, name string) error { + if err := b.server.AdminDeleteTable(ctx, convertAdminPrincipal(principal), name); err != nil { + return translateAdminTablesError(err) + } + return nil +} + +// convertAdminPrincipal mirrors admin.AuthPrincipal onto the +// adapter's parallel struct. 
Both packages keep the principal type +// independent so the adapter stays free of internal/admin +// dependencies, but the role / access-key fields are deliberately +// 1:1 — any drift is a wiring bug, not a feature. +func convertAdminPrincipal(p admin.AuthPrincipal) adapter.AdminPrincipal { + role := adapter.AdminRoleReadOnly + if p.Role.AllowsWrite() { + role = adapter.AdminRoleFull + } + return adapter.AdminPrincipal{AccessKey: p.AccessKey, Role: role} +} + +// convertCreateTableInput translates the admin-handler request DTO +// into the adapter's parallel input struct. We do this here — not +// in the admin package — to keep `internal/admin` free of any +// adapter import. +func convertCreateTableInput(in admin.CreateTableRequest) adapter.AdminCreateTableInput { + out := adapter.AdminCreateTableInput{ + TableName: in.TableName, + PartitionKey: adapter.AdminAttribute{Name: in.PartitionKey.Name, Type: in.PartitionKey.Type}, + } + if in.SortKey != nil { + out.SortKey = &adapter.AdminAttribute{Name: in.SortKey.Name, Type: in.SortKey.Type} + } + if len(in.GSI) == 0 { + return out + } + out.GSI = make([]adapter.AdminCreateGSI, len(in.GSI)) + for i, g := range in.GSI { + gsi := adapter.AdminCreateGSI{ + Name: g.Name, + PartitionKey: adapter.AdminAttribute{Name: g.PartitionKey.Name, Type: g.PartitionKey.Type}, + ProjectionType: g.Projection.Type, + NonKeyAttributes: append([]string(nil), g.Projection.NonKeyAttributes...), + } + if g.SortKey != nil { + gsi.SortKey = &adapter.AdminAttribute{Name: g.SortKey.Name, Type: g.SortKey.Type} + } + out.GSI[i] = gsi + } + return out +} + +// translateAdminTablesError maps the adapter's error vocabulary +// onto the admin-package sentinels the HTTP handler matches against. +// Anything not recognised is forwarded as-is and answered with 500 +// + a sanitised body, so a future adapter error mode does not leak +// raw text to clients while we are still adding the translation. 
+func translateAdminTablesError(err error) error { + switch { + case err == nil: + return nil + case errors.Is(err, adapter.ErrAdminForbidden): + return admin.ErrTablesForbidden + case errors.Is(err, adapter.ErrAdminNotLeader): + return admin.ErrTablesNotLeader + // Check structured adapter errors BEFORE the leader-churn + // matcher: a ValidationException whose message contains + // "not leader" (e.g., a user-supplied attribute name like + // `not leader` triggering the conflicting-attribute-type + // validator) must be classified as 400 invalid_request, not + // 503 leader_unavailable. The kv-internal sentinel checks + // in isLeaderChurnError still catch real leadership churn + // because they are typed-sentinel matches, not substring. + case adapter.IsAdminTableAlreadyExists(err): + return admin.ErrTablesAlreadyExists + case adapter.IsAdminTableNotFound(err): + return admin.ErrTablesNotFound + case adapter.IsAdminValidation(err): + msg := adapter.AdminErrorMessage(err) + if msg == "" { + msg = "validation failed" + } + return &admin.ValidationError{Message: msg} + case isLeaderChurnError(err): + // Cover leader-churn that surfaces between the up-front + // isVerifiedDynamoLeader check and createTableWithRetry's + // dispatch — the kv coordinator can drop leadership in + // that window and the resulting transient error should + // surface as 503 leader_unavailable + Retry-After: 1 + // rather than a generic 500. Codex P2 on PR #634. + return admin.ErrTablesNotLeader + default: + return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it. + } +} + +// isLeaderChurnError reports whether err looks like one of the +// transient leader sentinels the kv coordinator and adapter +// internals emit during a leadership change. The set mirrors the +// closed list in kv.leaderErrorPhrases — keep them in sync if a +// new sentinel is added on the kv side. 
+//
+// Phrase matching uses HasSuffix (not Contains) on the standard
+// canonical strings because every kv-internal sentinel emits the
+// phrase at the END of its error chain (e.g.,
+// "raft engine: not leader", "dispatch failed: leader not found").
+// A user-supplied string that happens to contain a leader phrase
+// in the MIDDLE of an unrelated error message therefore does not
+// false-positive — Codex P2 on PR #634 flagged the original
+// strings.Contains form for misclassifying validation messages
+// like "conflicting attribute type for <name>" when the
+// name itself was "not leader".
+func isLeaderChurnError(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, kv.ErrLeaderNotFound) ||
+		errors.Is(err, adapter.ErrNotLeader) ||
+		errors.Is(err, adapter.ErrLeaderNotFound) {
+		return true
+	}
+	msg := err.Error()
+	return strings.HasSuffix(msg, "not leader") ||
+		strings.HasSuffix(msg, "leader not found") ||
+		strings.HasSuffix(msg, "leadership lost") ||
+		strings.HasSuffix(msg, "leadership transfer in progress")
+}
+
+func convertAdminTableSummary(in *adapter.AdminTableSummary) *admin.DynamoTableSummary {
+	out := &admin.DynamoTableSummary{
+		Name:         in.Name,
+		PartitionKey: in.PartitionKey,
+		SortKey:      in.SortKey,
+		Generation:   in.Generation,
+	}
+	if len(in.GlobalSecondaryIndexes) == 0 {
+		return out
+	}
+	out.GlobalSecondaryIndexes = make([]admin.DynamoGSISummary, len(in.GlobalSecondaryIndexes))
+	for i, g := range in.GlobalSecondaryIndexes {
+		out.GlobalSecondaryIndexes[i] = admin.DynamoGSISummary{
+			Name:           g.Name,
+			PartitionKey:   g.PartitionKey,
+			SortKey:        g.SortKey,
+			ProjectionType: g.ProjectionType,
+		}
+	}
+	return out
+}
+
 // buildAdminConfig translates flag values into an admin.Config.
func buildAdminConfig(in adminListenerConfig) admin.Config { return admin.Config{ @@ -144,6 +345,7 @@ func startAdminServer( cfg adminListenerConfig, creds map[string]string, cluster admin.ClusterInfoSource, + tables admin.TablesSource, version string, ) (string, error) { adminCfg := buildAdminConfig(cfg) @@ -151,7 +353,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables) if err != nil { return "", err } @@ -191,7 +393,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -210,6 +412,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust Credentials: admin.MapCredentialStore(creds), Roles: adminCfg.RoleIndex(), ClusterInfo: cluster, + Tables: tables, StaticFS: nil, AuthOpts: admin.AuthServiceOpts{ InsecureCookie: adminCfg.AllowInsecureDevCookie, diff --git a/main_admin_test.go b/main_admin_test.go index eb961311c..e62a61aaf 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -10,6 +10,7 @@ import ( "crypto/x509/pkix" "encoding/base64" "encoding/pem" + "errors" "io" "math/big" "net" @@ -23,6 +24,7 @@ import ( "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/internal/admin" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/kv" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -196,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t 
*testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, "") require.NoError(t, err) } @@ -209,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) } @@ -222,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -236,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -259,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. 
@@ -302,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{ @@ -363,3 +365,64 @@ func writeSelfSignedCert(t *testing.T) (string, string) { require.NoError(t, keyOut.Close()) return certPath, keyPath } + +// TestTranslateAdminTablesError_LeaderChurn verifies that the +// bridge maps transient leader-churn errors from the kv coordinator +// (which AdminCreateTable/AdminDeleteTable can return after their +// initial isVerifiedDynamoLeader check) to admin.ErrTablesNotLeader +// rather than the generic 500 fallthrough. Codex P2 on PR #634. +func TestTranslateAdminTablesError_LeaderChurn(t *testing.T) { + cases := []struct { + name string + in error + }{ + {"kv.ErrLeaderNotFound", kv.ErrLeaderNotFound}, + {"adapter.ErrNotLeader", adapter.ErrNotLeader}, + {"adapter.ErrLeaderNotFound", adapter.ErrLeaderNotFound}, + {"wrapped not leader", errors.New("dispatch failed: not leader")}, + {"wrapped leader not found", errors.New("dispatch: leader not found")}, + {"wrapped leadership lost", errors.New("commit aborted: leadership lost")}, + {"wrapped leadership transfer", errors.New("retry exhausted: leadership transfer in progress")}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + out := translateAdminTablesError(tc.in) + require.ErrorIs(t, out, admin.ErrTablesNotLeader, + "input %q must map to ErrTablesNotLeader", tc.in) + }) + } +} + +// TestTranslateAdminTablesError_LeaderPhraseInMiddleOfMessage covers +// the false-positive class Codex P2 flagged: a message that +// happens to contain a leader phrase NOT at the suffix must NOT 
+// be classified as leader-churn. Switching from strings.Contains +// to strings.HasSuffix is what makes this test pass. +func TestTranslateAdminTablesError_LeaderPhraseInMiddleOfMessage(t *testing.T) { + cases := []string{ + // Phrase mid-message — kv-internal sentinels never look + // like this; they always end with the canonical phrase. + "not leader: actually a downstream error", + "leader not found, but recovered automatically", + "leadership lost mid-snapshot, retried successfully", + } + for _, msg := range cases { + t.Run(msg, func(t *testing.T) { + out := translateAdminTablesError(errors.New(msg)) + require.NotErrorIs(t, out, admin.ErrTablesNotLeader, + "mid-message leader phrase %q must not classify as leader-churn", msg) + }) + } +} + +// TestTranslateAdminTablesError_UnrelatedErrorPassesThrough confirms +// the leader-churn detector does not swallow unrelated errors that +// happen to mention the word "leader" outside the canonical phrases. +func TestTranslateAdminTablesError_UnrelatedErrorPassesThrough(t *testing.T) { + in := errors.New("team leader misconfigured") + out := translateAdminTablesError(in) + // Falls through to default — same error returned, NOT + // ErrTablesNotLeader. + require.NotErrorIs(t, out, admin.ErrTablesNotLeader) + require.Equal(t, in, out) +}