diff --git a/internal/admin/forward_client.go b/internal/admin/forward_client.go index edeadcd84..2d5dba684 100644 --- a/internal/admin/forward_client.go +++ b/internal/admin/forward_client.go @@ -32,6 +32,17 @@ type LeaderForwarder interface { ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) // ForwardDeleteTable is the delete-side counterpart. ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) + // ForwardCreateBucket issues a forwarded POST /s3/buckets on + // the leader's behalf. The leader echoes back the same JSON + // envelope a leader-direct call would produce. + ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) + // ForwardDeleteBucket is the delete-side counterpart. + ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) + // ForwardPutBucketAcl issues a forwarded PUT + // /s3/buckets/{name}/acl. Both the bucket name and the + // new ACL travel inside the proto payload — see the leader-side + // handlePutBucketAcl for the JSON shape. + ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) } // ForwardResult is the leader's response replayed for the SPA. The @@ -142,6 +153,43 @@ func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal Au return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload) } +// ForwardCreateBucket serialises `in` as JSON and dispatches the +// CreateBucket operation. Same contract as ForwardCreateTable — +// gRPC errors are wrapped, ErrLeaderUnavailable on no-leader. +func (c *gRPCForwardClient) ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) { + payload, err := json.Marshal(in) + if err != nil { + return nil, pkgerrors.Wrap(err, "admin forward: marshal create-bucket request") + } + return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, principal, payload) +} + +// ForwardDeleteBucket serialises the bucket name as `{"name":"..."}` +// to match the leader-side handleDeleteBucket contract. +func (c *gRPCForwardClient) ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) { + payload, err := json.Marshal(struct { + Name string `json:"name"` + }{Name: name}) + if err != nil { + return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-bucket request") + } + return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, principal, payload) +} + +// ForwardPutBucketAcl carries both the bucket name (from the URL +// path) and the new ACL (from the request body) in the proto +// payload — see handlePutBucketAcl for the leader-side decode. +func (c *gRPCForwardClient) ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) { + payload, err := json.Marshal(struct { + Name string `json:"name"` + ACL string `json:"acl"` + }{Name: name, ACL: acl}) + if err != nil { + return nil, pkgerrors.Wrap(err, "admin forward: marshal put-bucket-acl request") + } + return c.forward(ctx, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, principal, payload) +} + func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) { addr := c.resolver() if addr == "" { diff --git a/internal/admin/forward_client_test.go b/internal/admin/forward_client_test.go index df4a814f0..bf22fdad8 100644 --- a/internal/admin/forward_client_test.go +++ b/internal/admin/forward_client_test.go @@ -182,3 +182,72 @@ func TestGRPCForwardClient_FillsMissingContentType(t *testing.T) { CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}) require.Equal(t, "application/json; charset=utf-8", res.ContentType) } + +func TestGRPCForwardClient_ForwardCreateBucket_HappyPath(t *testing.T) { + conn := &stubForwardConn{resp: &pb.AdminForwardResponse{ + StatusCode: http.StatusCreated, + Payload: []byte(`{"bucket_name":"public-assets","acl":"public-read"}`), + ContentType: "application/json; charset=utf-8", + }} + dial := &stubConnFactory{conn: conn} + fwd, err := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-2") + require.NoError(t, err) + + in := CreateBucketRequest{BucketName: "public-assets", ACL: "public-read"} + res, err := fwd.ForwardCreateBucket(context.Background(), + AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, in) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, res.StatusCode) + require.Equal(t, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, conn.lastReq.GetOperation()) + require.Equal(t, "follower-2", conn.lastReq.GetForwardedFrom()) + var roundtripped CreateBucketRequest + require.NoError(t, json.Unmarshal(conn.lastReq.GetPayload(), &roundtripped)) + require.Equal(t, in, roundtripped) +} + +func TestGRPCForwardClient_ForwardDeleteBucket_HappyPath(t *testing.T) { + conn := &stubForwardConn{resp: &pb.AdminForwardResponse{ + StatusCode: http.StatusNoContent, + ContentType: "application/json; charset=utf-8", + }} + dial := &stubConnFactory{conn: conn} + fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-3") + + res, err := fwd.ForwardDeleteBucket(context.Background(), + AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders") + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, res.StatusCode) + require.Equal(t, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, conn.lastReq.GetOperation()) + require.JSONEq(t, `{"name":"orders"}`, string(conn.lastReq.GetPayload())) +} + +func TestGRPCForwardClient_ForwardPutBucketAcl_HappyPath(t *testing.T) { + conn := &stubForwardConn{resp: &pb.AdminForwardResponse{ + StatusCode: http.StatusNoContent, + ContentType: "application/json; charset=utf-8", + }} + dial := &stubConnFactory{conn: conn} + fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-4") + + res, err := fwd.ForwardPutBucketAcl(context.Background(), + AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders", "public-read") + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, res.StatusCode) + require.Equal(t, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, conn.lastReq.GetOperation()) + // Both name and acl travel in the payload — see handlePutBucketAcl + // for the leader-side decode contract. + require.JSONEq(t, `{"name":"orders","acl":"public-read"}`, string(conn.lastReq.GetPayload())) +} + +func TestGRPCForwardClient_BucketOps_NoLeaderReturnsErrLeaderUnavailable(t *testing.T) { + dial := &stubConnFactory{conn: &stubForwardConn{}} + fwd, _ := NewGRPCForwardClient(fixedResolver(""), dial, "f") + + _, err := fwd.ForwardCreateBucket(context.Background(), AuthPrincipal{Role: RoleFull}, + CreateBucketRequest{BucketName: "x"}) + require.ErrorIs(t, err, ErrLeaderUnavailable) + _, err = fwd.ForwardDeleteBucket(context.Background(), AuthPrincipal{Role: RoleFull}, "x") + require.ErrorIs(t, err, ErrLeaderUnavailable) + _, err = fwd.ForwardPutBucketAcl(context.Background(), AuthPrincipal{Role: RoleFull}, "x", "private") + require.ErrorIs(t, err, ErrLeaderUnavailable) +} diff --git a/internal/admin/forward_integration_test.go b/internal/admin/forward_integration_test.go index 54bd1bcf4..fec7e186e 100644 --- a/internal/admin/forward_integration_test.go +++ b/internal/admin/forward_integration_test.go @@ -20,11 +20,28 @@ type stubLeaderForwarder struct { createErr error deleteRes *ForwardResult deleteErr error - - lastCreatePrincipal AuthPrincipal - lastCreateInput CreateTableRequest - lastDeletePrincipal AuthPrincipal - lastDeleteName string + // Bucket-side counterparts ship with P2 slice 2b. Tests that + // only exercise Dynamo behaviour leave these zero values; the + // stub honours the same nil-result-and-no-error contract the + // table side uses on a "did not call us" path. + createBucketRes *ForwardResult + createBucketErr error + deleteBucketRes *ForwardResult + deleteBucketErr error + putACLRes *ForwardResult + putACLErr error + + lastCreatePrincipal AuthPrincipal + lastCreateInput CreateTableRequest + lastDeletePrincipal AuthPrincipal + lastDeleteName string + lastCreateBucketPrincipal AuthPrincipal + lastCreateBucketInput CreateBucketRequest + lastDeleteBucketPrincipal AuthPrincipal + lastDeleteBucketName string + lastPutACLPrincipal AuthPrincipal + lastPutACLBucket string + lastPutACLValue string } func (s *stubLeaderForwarder) ForwardCreateTable(_ context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) { @@ -45,6 +62,34 @@ func (s *stubLeaderForwarder) ForwardDeleteTable(_ context.Context, principal Au return s.deleteRes, nil } +func (s *stubLeaderForwarder) ForwardCreateBucket(_ context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) { + s.lastCreateBucketPrincipal = principal + s.lastCreateBucketInput = in + if s.createBucketErr != nil { + return nil, s.createBucketErr + } + return s.createBucketRes, nil +} + +func (s *stubLeaderForwarder) ForwardDeleteBucket(_ context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) { + s.lastDeleteBucketPrincipal = principal + s.lastDeleteBucketName = name + if s.deleteBucketErr != nil { + return nil, s.deleteBucketErr + } + return s.deleteBucketRes, nil +} + +func (s *stubLeaderForwarder) ForwardPutBucketAcl(_ context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) { + s.lastPutACLPrincipal = principal + s.lastPutACLBucket = name + s.lastPutACLValue = acl + if s.putACLErr != nil { + return nil, s.putACLErr + } + return s.putACLRes, nil +} + // notLeaderSource is a TablesSource that always returns // ErrTablesNotLeader on writes — i.e., it simulates the local // node being a follower. Used to exercise the forwarder path diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 068f3bee6..09ea83e26 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -37,14 +37,17 @@ const adminForwardPayloadLimit = 64 << 10 type ForwardServer struct { pb.UnimplementedAdminForwardServer - source TablesSource - roles RoleStore - logger *slog.Logger + source TablesSource + buckets BucketsSource + roles RoleStore + logger *slog.Logger } // NewForwardServer wires a TablesSource and a RoleStore behind the // gRPC AdminForward service. logger may be nil; defaults to -// slog.Default(). +// slog.Default(). The S3 BucketsSource is plumbed via WithBucketsSource +// so deployments that ship without the S3 adapter can still register +// the gRPC service for Dynamo forwarding. func NewForwardServer(source TablesSource, roles RoleStore, logger *slog.Logger) *ForwardServer { if logger == nil { logger = slog.Default() @@ -52,6 +55,17 @@ func NewForwardServer(source TablesSource, roles RoleStore, logger *slog.Logger) return &ForwardServer{source: source, roles: roles, logger: logger} } +// WithBucketsSource enables S3 admin operation forwarding. Returns +// the receiver so wiring code can chain the call: +// `NewForwardServer(...).WithBucketsSource(...)`. A nil BucketsSource +// leaves S3 forwarding disabled — the Forward dispatcher rejects +// CREATE_BUCKET / DELETE_BUCKET / PUT_BUCKET_ACL with 501 in that +// case so a follower can detect the missing capability. +func (s *ForwardServer) WithBucketsSource(b BucketsSource) *ForwardServer { + s.buckets = b + return s +} + // Forward is the gRPC entrypoint. It performs the principal // re-evaluation the design mandates, then dispatches by operation. // Errors that the SPA can act on are returned as a structured @@ -86,11 +100,37 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest return rejectForward(http.StatusForbidden, "forbidden", "this endpoint requires a full-access role") } - switch req.GetOperation() { + return s.dispatchForward(ctx, principal, forwardedFrom, req) +} + +// dispatchForward routes the validated request to the per-operation +// handler. Pulled out so Forward stays under the cyclomatic ceiling +// as the operation enum grows; the principal-validation + +// forwarded_from sanitisation logic stays in Forward where it belongs. +// +// Source-availability checks live in checkOpAvailability rather than +// in each handler: a Dynamo-only build has s.source != nil but +// s.buckets == nil, and an S3-only build (Codex P1 on PR #673) has +// the inverse. Centralising the check means every operation gets a +// consistent 501 error shape and a future op cannot ship without the +// operator-visible "not configured" message that the existing ops +// promise. +func (s *ForwardServer) dispatchForward(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + op := req.GetOperation() + if resp, err, ok := s.checkOpAvailability(op); !ok { + return resp, err + } + switch op { case pb.AdminOperation_ADMIN_OP_CREATE_TABLE: return s.handleCreate(ctx, principal, forwardedFrom, req) case pb.AdminOperation_ADMIN_OP_DELETE_TABLE: return s.handleDelete(ctx, principal, forwardedFrom, req) + case pb.AdminOperation_ADMIN_OP_CREATE_BUCKET: + return s.handleCreateBucket(ctx, principal, forwardedFrom, req) + case pb.AdminOperation_ADMIN_OP_DELETE_BUCKET: + return s.handleDeleteBucket(ctx, principal, forwardedFrom, req) + case pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL: + return s.handlePutBucketAcl(ctx, principal, forwardedFrom, req) case pb.AdminOperation_ADMIN_OP_UNSPECIFIED: return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") default: @@ -98,6 +138,43 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest } } +// checkOpAvailability returns (resp, err, true) when dispatchForward +// should continue to the per-op handler, or (resp, err, false) when +// the leader's build does not include the source the requested +// operation needs (S3-only deployment served a Dynamo op, or vice +// versa). Pulling the per-op switch out keeps dispatchForward's +// cyclomatic count under the linter ceiling as the enum grows. +func (s *ForwardServer) checkOpAvailability(op pb.AdminOperation) (*pb.AdminForwardResponse, error, bool) { + switch op { + case pb.AdminOperation_ADMIN_OP_CREATE_TABLE, pb.AdminOperation_ADMIN_OP_DELETE_TABLE: + if s.source == nil { + resp, err := notImplementedForwardResponse("DynamoDB") + return resp, err, false + } + case pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL: + if s.buckets == nil { + resp, err := notImplementedForwardResponse("S3") + return resp, err, false + } + case pb.AdminOperation_ADMIN_OP_UNSPECIFIED: + // Unknown-op rejection is dispatchForward's responsibility, + // not this gate's. Falling through to ok=true lets the main + // switch's default branch produce the canonical 400 message. + } + return nil, nil, true +} + +// notImplementedForwardResponse produces the 501 response a follower +// receives when the leader is built without the source for this +// operation's surface. surface is the human-facing label that ends +// up in the error message ("DynamoDB" or "S3"). +func notImplementedForwardResponse(surface string) (*pb.AdminForwardResponse, error) { + return rejectForward(http.StatusNotImplemented, "not_implemented", + surface+" admin forwarding is not configured on this leader") +} + func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, bool) { accessKey := p.GetAccessKey() if accessKey == "" { @@ -154,55 +231,202 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa // proto stays operation-agnostic — there is no operation-specific // field in AdminForwardRequest, by design (adding one per op // would couple every new admin endpoint to the proto schema). + name, rejection, err := decodeNamedPayload(req.GetPayload(), "delete") + if rejection != nil || err != nil { + return rejection, err + } + if err := s.source.AdminDeleteTable(ctx, principal, name); err != nil { + s.logUnexpectedSourceError(ctx, "delete_table", name, forwardedFrom, err) + return forwardErrorResponse("delete", err), nil + } + s.auditDeleteSuccess(ctx, principal, forwardedFrom, "delete_table", "table", name) + return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil +} + +// handleCreateBucket dispatches a forwarded POST /s3/buckets call. +// Mirrors handleCreate (Dynamo) but decodes a CreateBucketRequest +// and routes through BucketsSource. dispatchForward gates this on +// s.buckets != nil so callers reach here only when S3 forwarding is +// configured. +func (s *ForwardServer) handleCreateBucket(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + payload := req.GetPayload() + if len(payload) > adminForwardPayloadLimit { + return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", + "forwarded payload exceeds the 64 KiB admin limit") + } + if bytes.IndexByte(payload, 0) >= 0 { + return rejectForward(http.StatusBadRequest, "invalid_body", + "create-bucket payload contains a NUL byte") + } + dec := json.NewDecoder(bytes.NewReader(payload)) + dec.DisallowUnknownFields() + var body CreateBucketRequest + if err := dec.Decode(&body); err != nil { + return rejectForward(http.StatusBadRequest, "invalid_body", + "create-bucket payload is not valid JSON") + } + if dec.More() { + return rejectForward(http.StatusBadRequest, "invalid_body", + "create-bucket payload has trailing data") + } + // Reuse the HTTP handler's validateCreateBucketRequest so the + // forwarded path enforces identical rules — empty / whitespace- + // padded bucket_name produces the same 400 message a leader- + // direct call would, instead of slipping through here and + // hitting the adapter's lower-level validateS3BucketName with + // a less actionable error (Gemini security-high + Claude #2 on + // PR #673). + if err := validateCreateBucketRequest(body); err != nil { + return rejectForward(http.StatusBadRequest, "invalid_body", err.Error()) + } + summary, err := s.buckets.AdminCreateBucket(ctx, principal, body) + if err != nil { + s.logUnexpectedSourceError(ctx, "create_bucket", body.BucketName, forwardedFrom, err) + return forwardBucketsErrorResponse("create", err), nil + } + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", + slog.String("actor", principal.AccessKey), + slog.String("role", string(principal.Role)), + slog.String("forwarded_from", forwardedFrom), + slog.String("operation", "create_bucket"), + slog.String("bucket", body.BucketName), + ) + return jsonForwardResponse(http.StatusCreated, summary) +} + +// handleDeleteBucket dispatches a forwarded DELETE +// /s3/buckets/{name} call. Same payload shape as the Dynamo delete: +// a JSON object with a single "name" field, which the bridge +// generates from the URL path. +func (s *ForwardServer) handleDeleteBucket(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + name, rejection, err := decodeNamedPayload(req.GetPayload(), "delete-bucket") + if rejection != nil || err != nil { + return rejection, err + } + if err := s.buckets.AdminDeleteBucket(ctx, principal, name); err != nil { + s.logUnexpectedSourceError(ctx, "delete_bucket", name, forwardedFrom, err) + return forwardBucketsErrorResponse("delete", err), nil + } + s.auditDeleteSuccess(ctx, principal, forwardedFrom, "delete_bucket", "bucket", name) + return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil +} + +// auditDeleteSuccess emits the admin_audit slog line both Dynamo and +// S3 forwarded delete handlers need. Centralised so handleDelete and +// handleDeleteBucket do not diverge on the field set, and so a future +// handler that mirrors the same delete shape (e.g. delete-namespace) +// keeps the audit-log contract by reusing this helper rather than +// re-emitting a hand-rolled subset. resourceField is "table" or +// "bucket"; opLabel is the audit "operation" value. +func (s *ForwardServer) auditDeleteSuccess(ctx context.Context, principal AuthPrincipal, forwardedFrom, opLabel, resourceField, name string) { + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", + slog.String("actor", principal.AccessKey), + slog.String("role", string(principal.Role)), + slog.String("forwarded_from", forwardedFrom), + slog.String("operation", opLabel), + slog.String(resourceField, name), + ) +} + +// handlePutBucketAcl dispatches a forwarded PUT +// /s3/buckets/{name}/acl call. The bridge encodes both the bucket +// name and the new ACL into the payload so the proto stays +// operation-agnostic — same approach handleDeleteBucket takes for +// the bucket name. +func (s *ForwardServer) handlePutBucketAcl(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { payload := req.GetPayload() if len(payload) > adminForwardPayloadLimit { return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", "forwarded payload exceeds the 64 KiB admin limit") } - // Mirror decodeCreateTableRequest's NUL-byte guard: goccy/go-json - // treats raw NUL as end-of-input so dec.More() would otherwise - // miss `{"name":"users"}\x00{"extra":1}` payloads. Codex P2 on - // PR #635 flagged this as the same smuggling vector that the - // HTTP create path already covers. if bytes.IndexByte(payload, 0) >= 0 { - return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload contains a NUL byte") + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload contains a NUL byte") } dec := json.NewDecoder(bytes.NewReader(payload)) dec.DisallowUnknownFields() var body struct { Name string `json:"name"` + ACL string `json:"acl"` } if err := dec.Decode(&body); err != nil { - return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload is not valid JSON") + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload is not valid JSON") } if dec.More() { - return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload has trailing data") + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload has trailing data") } if body.Name == "" { - return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name") + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload missing name") } - // Reject slash-bearing names symmetrically with the HTTP - // handleDelete and handleDescribe paths. Without this, a - // forwarded call could act on `foo/bar` while a leader-direct - // call would 404 — divergent behaviour Codex P2 flagged on - // PR #635. if strings.ContainsRune(body.Name, '/') { - return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload name must not contain '/'") + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload name must not contain '/'") } - if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { - s.logUnexpectedSourceError(ctx, "delete_table", body.Name, forwardedFrom, err) - return forwardErrorResponse("delete", err), nil + if strings.TrimSpace(body.ACL) == "" { + return rejectForward(http.StatusBadRequest, "invalid_body", + "put-bucket-acl payload missing acl") + } + if err := s.buckets.AdminPutBucketAcl(ctx, principal, body.Name, body.ACL); err != nil { + s.logUnexpectedSourceError(ctx, "put_bucket_acl", body.Name, forwardedFrom, err) + return forwardBucketsErrorResponse("put_acl", err), nil } s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", slog.String("actor", principal.AccessKey), slog.String("role", string(principal.Role)), slog.String("forwarded_from", forwardedFrom), - slog.String("operation", "delete_table"), - slog.String("table", body.Name), + slog.String("operation", "put_bucket_acl"), + slog.String("bucket", body.Name), + slog.String("acl", body.ACL), ) return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil } +// forwardBucketsErrorResponse re-encodes a BucketsSource error into +// the structured shape the follower's bridge can re-emit verbatim. +// Mirrors forwardErrorResponse on the Dynamo side: the same status +// codes and JSON envelopes the leader-direct HTTP path produces in +// writeBucketsError. +func forwardBucketsErrorResponse(op string, err error) *pb.AdminForwardResponse { + switch { + case errors.Is(err, ErrBucketsForbidden): + return mustForwardJSON(http.StatusForbidden, + errorResponse{Error: "forbidden", Message: "this endpoint requires a full-access role"}) + case errors.Is(err, ErrBucketsNotLeader): + // Should never happen on the leader path — the leader just + // verified itself — but a leadership transfer racing with + // the dispatch makes this theoretically reachable. Carry + // retry_after_seconds=1 so the follower's bridge translates + // it back into an HTTP Retry-After header. + resp := mustForwardJSON(http.StatusServiceUnavailable, + errorResponse{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) + resp.RetryAfterSeconds = 1 + return resp + case errors.Is(err, ErrBucketsNotFound): + return mustForwardJSON(http.StatusNotFound, + errorResponse{Error: "not_found", Message: "bucket does not exist"}) + case errors.Is(err, ErrBucketsAlreadyExists): + return mustForwardJSON(http.StatusConflict, + errorResponse{Error: "already_exists", Message: "bucket already exists"}) + case errors.Is(err, ErrBucketsNotEmpty): + return mustForwardJSON(http.StatusConflict, + errorResponse{Error: "bucket_not_empty", + Message: "bucket still has objects; remove them and retry"}) + } + var verr *ValidationError + if errors.As(err, &verr) { + return mustForwardJSON(http.StatusBadRequest, + errorResponse{Error: "invalid_request", Message: verr.Error()}) + } + return mustForwardJSON(http.StatusInternalServerError, + errorResponse{ + Error: "s3_" + op + "_failed", + Message: "failed to " + op + " bucket; see leader logs", + }) +} + // sanitiseForwardedFrom strips CR/LF from a follower-supplied // node id so a malicious value cannot split a single audit log // line into two when slog is using a text-format handler. JSON @@ -219,6 +443,64 @@ func sanitiseForwardedFrom(s string) string { }, s) } +// decodeNamedPayload validates and decodes the {"name": "..."} JSON +// shape both the Dynamo and S3 delete forwarders accept. Returns the +// decoded name on success, or a populated rejection response (and +// nil name) on a 400 / 413. opLabel is the human-facing prefix that +// goes into the rejection messages ("delete" for Dynamo, +// "delete-bucket" for S3) so the response identifies which path +// rejected — and so a future op (e.g. "describe-bucket") that +// reuses this helper still produces an actionable error. +// +// All four guards mirror the leader-direct HTTP path: +// - 64 KiB payload cap (matches adminForwardPayloadLimit elsewhere) +// - NUL-byte rejection (goccy/go-json treats raw NUL as end-of- +// input; without this guard `{"name":"x"}\x00{"extra":1}` +// payloads slip past dec.More(); Codex P2 on PR #635) +// - DisallowUnknownFields + dec.More() trailing-token rejection +// - empty + slash-bearing name rejection (the HTTP handlers +// already 404 slash-bearing names; the forwarded path has to +// reject symmetrically or a hostile follower could act on +// `foo/bar` while a leader-direct call would not). +func decodeNamedPayload(payload []byte, opLabel string) (string, *pb.AdminForwardResponse, error) { + if len(payload) > adminForwardPayloadLimit { + resp, err := rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", + "forwarded payload exceeds the 64 KiB admin limit") + return "", resp, err + } + if bytes.IndexByte(payload, 0) >= 0 { + resp, err := rejectForward(http.StatusBadRequest, "invalid_body", + opLabel+" payload contains a NUL byte") + return "", resp, err + } + dec := json.NewDecoder(bytes.NewReader(payload)) + dec.DisallowUnknownFields() + var body struct { + Name string `json:"name"` + } + if err := dec.Decode(&body); err != nil { + resp, rerr := rejectForward(http.StatusBadRequest, "invalid_body", + opLabel+" payload is not valid JSON") + return "", resp, rerr + } + if dec.More() { + resp, err := rejectForward(http.StatusBadRequest, "invalid_body", + opLabel+" payload has trailing data") + return "", resp, err + } + if body.Name == "" { + resp, err := rejectForward(http.StatusBadRequest, "invalid_body", + opLabel+" payload missing name") + return "", resp, err + } + if strings.ContainsRune(body.Name, '/') { + resp, err := rejectForward(http.StatusBadRequest, "invalid_body", + opLabel+" payload name must not contain '/'") + return "", resp, err + } + return body.Name, nil, nil +} + // forwardErrorResponse re-encodes a TablesSource error in the // structured shape the follower's handler can re-emit verbatim. This // is the leader-side counterpart of writeTablesError: every status / @@ -270,12 +552,18 @@ func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { // regressions, and logging them at LevelError would drown the // operational signal. The HTTP path's writeTablesError applies // the same policy (Codex P2 on PR #635 flagged the silent path). -func (s *ForwardServer) logUnexpectedSourceError(ctx context.Context, op, table, forwardedFrom string, err error) { +// +// The resource argument carries either a Dynamo table name or an +// S3 bucket name, depending on op. The slog key is "resource" so +// log queries do not have to know which resource family produced a +// given line — Claude review on PR #673 caught the prior "table" +// key, which made bucket-error queries miss the audit entries. +func (s *ForwardServer) logUnexpectedSourceError(ctx context.Context, op, resource, forwardedFrom string, err error) { if isStructuredSourceError(err) { return } s.logger.LogAttrs(ctx, slog.LevelError, "admin_forward_"+op+"_failed", - slog.String("table", table), + slog.String("resource", resource), slog.String("forwarded_from", forwardedFrom), slog.String("error", err.Error()), ) @@ -290,7 +578,12 @@ func isStructuredSourceError(err error) bool { case errors.Is(err, ErrTablesForbidden), errors.Is(err, ErrTablesNotLeader), errors.Is(err, ErrTablesNotFound), - errors.Is(err, ErrTablesAlreadyExists): + errors.Is(err, ErrTablesAlreadyExists), + errors.Is(err, ErrBucketsForbidden), + errors.Is(err, ErrBucketsNotLeader), + errors.Is(err, ErrBucketsNotFound), + errors.Is(err, ErrBucketsAlreadyExists), + errors.Is(err, ErrBucketsNotEmpty): return true } var verr *ValidationError diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index 72a6bcf72..c722e29bd 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -445,3 +445,260 @@ func TestForwardServer_DoesNotLogStructuredSourceErrors(t *testing.T) { }) } } + +// newForwardServerWithBucketsForTest is the slice 2b counterpart of +// newForwardServerForTest: wires both a TablesSource (zero-value) and +// a BucketsSource so the bucket-side dispatch tests can mutate the +// inputs without rebuilding the role-lookup boilerplate. +func newForwardServerWithBucketsForTest(buckets BucketsSource, roles MapRoleStore) *ForwardServer { + return NewForwardServer(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, roles, nil). + WithBucketsSource(buckets) +} + +func TestForwardServer_CreateBucket_HappyPath(t *testing.T) { + buckets := &stubBucketsSource{} + srv := newForwardServerWithBucketsForTest(buckets, fullPrincipalRoleStore()) + body := CreateBucketRequest{BucketName: "public-assets", ACL: "public-read"} + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + Payload: mustJSON(t, body), + ForwardedFrom: "node-2", + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusCreated), resp.GetStatusCode()) + require.Equal(t, "public-assets", buckets.lastCreateInput.BucketName) + require.Equal(t, RoleFull, buckets.lastCreatePrincipal.Role) + var summary BucketSummary + require.NoError(t, json.Unmarshal(resp.GetPayload(), &summary)) + require.Equal(t, "public-assets", summary.Name) +} + +func TestForwardServer_BucketOps_NoBucketsSourceReturns501(t *testing.T) { + // Builds without S3 do not call WithBucketsSource; the leader + // must still reject every bucket operation cleanly with 501 + // instead of reaching for a nil receiver and panicking. Sweep + // over all three operations so a future op added without a + // nil-receiver guard fails CI immediately (Claude review on + // PR #673 caught the original test only covering CREATE_BUCKET). + cases := []struct { + name string + op pb.AdminOperation + payload []byte + }{ + { + name: "create", + op: pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + payload: mustJSON(t, CreateBucketRequest{BucketName: "x"}), + }, + { + name: "delete", + op: pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + payload: []byte(`{"name":"x"}`), + }, + { + name: "put_acl", + op: pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, + payload: []byte(`{"name":"x","acl":"private"}`), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: tc.op, + Payload: tc.payload, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNotImplemented), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "not_implemented") + }) + } +} + +func TestForwardServer_DynamoOps_NoTablesSourceReturns501(t *testing.T) { + // S3-only deployments construct ForwardServer with a nil + // TablesSource so leaders can still register the gRPC service + // for follower-forwarded bucket writes. Dynamo operations must + // then reject cleanly with 501 instead of dereferencing the nil + // source and panicking. Symmetric with + // TestForwardServer_BucketOps_NoBucketsSourceReturns501 for the + // inverse Dynamo-only build (Codex P1 on PR #673). + cases := []struct { + name string + op pb.AdminOperation + payload []byte + }{ + { + name: "create_table", + op: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + payload: mustJSON(t, CreateTableRequest{TableName: "orders"}), + }, + { + name: "delete_table", + op: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + payload: []byte(`{"name":"orders"}`), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + srv := NewForwardServer(nil, fullPrincipalRoleStore(), nil). + WithBucketsSource(&stubBucketsSource{}) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: tc.op, + Payload: tc.payload, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNotImplemented), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "not_implemented") + }) + } +} + +func TestForwardServer_CreateBucket_RejectsWhitespacePaddedName(t *testing.T) { + // Validation parity with the HTTP path's + // validateCreateBucketRequest: a name like " bucket " must + // produce the same 400 invalid_body the leader-direct path + // emits, instead of slipping through and hitting the lower- + // level adapter validator with a less actionable error + // (Gemini security-high + Claude #2 on PR #673). + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + Payload: mustJSON(t, CreateBucketRequest{BucketName: " padded "}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "leading or trailing whitespace") +} + +func TestForwardServer_CreateBucket_BadJSONReturns400(t *testing.T) { + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + Payload: []byte("{not json"), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "invalid_body") +} + +func TestForwardServer_CreateBucket_AlreadyExistsReturns409(t *testing.T) { + buckets := &stubBucketsSource{ + buckets: map[string]BucketSummary{"existing": {Name: "existing"}}, + createErr: ErrBucketsAlreadyExists, + } + srv := newForwardServerWithBucketsForTest(buckets, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + Payload: mustJSON(t, CreateBucketRequest{BucketName: "existing"}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusConflict), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "already_exists") +} + +func TestForwardServer_DeleteBucket_HappyPath(t *testing.T) { + buckets := &stubBucketsSource{ + buckets: map[string]BucketSummary{"orders": {Name: "orders"}}, + } + srv := newForwardServerWithBucketsForTest(buckets, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + Payload: []byte(`{"name":"orders"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNoContent), resp.GetStatusCode()) + require.Equal(t, "orders", buckets.lastDeleteName) +} + +func TestForwardServer_DeleteBucket_NotEmptyReturns409(t *testing.T) { + buckets := &stubBucketsSource{deleteErr: ErrBucketsNotEmpty} + srv := newForwardServerWithBucketsForTest(buckets, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + Payload: []byte(`{"name":"orders"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusConflict), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "bucket_not_empty") +} + +func TestForwardServer_DeleteBucket_RejectsSlashInName(t *testing.T) { + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + Payload: []byte(`{"name":"foo/bar"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "must not contain") +} + +func TestForwardServer_PutBucketAcl_HappyPath(t *testing.T) { + buckets := &stubBucketsSource{ + buckets: map[string]BucketSummary{"orders": {Name: "orders", ACL: "private"}}, + } + srv := newForwardServerWithBucketsForTest(buckets, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, + Payload: []byte(`{"name":"orders","acl":"public-read"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNoContent), resp.GetStatusCode()) + require.Equal(t, "orders", buckets.lastPutACLBucket) + require.Equal(t, "public-read", buckets.lastPutACLValue) +} + +func TestForwardServer_PutBucketAcl_RejectsMissingACL(t *testing.T) { + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, + Payload: []byte(`{"name":"orders"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "missing acl") +} + +func TestForwardServer_PutBucketAcl_RejectsSlashInName(t *testing.T) { + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, + Payload: []byte(`{"name":"foo/bar","acl":"private"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) +} + +func TestForwardServer_BucketOps_PayloadTooLargeReturns413(t *testing.T) { + cases := []pb.AdminOperation{ + pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, + pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, + pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, + } + for _, op := range cases { + t.Run(op.String(), func(t *testing.T) { + srv := newForwardServerWithBucketsForTest(&stubBucketsSource{}, fullPrincipalRoleStore()) + big := make([]byte, adminForwardPayloadLimit+1) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: op, + Payload: big, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusRequestEntityTooLarge), resp.GetStatusCode()) + }) + } +} diff --git a/internal/admin/s3_handler.go b/internal/admin/s3_handler.go index ec9c347ab..a4a96e9b0 100644 --- a/internal/admin/s3_handler.go +++ b/internal/admin/s3_handler.go @@ -55,9 +55,10 @@ const pathSuffixACL = "/acl" // continue mutating until the token expires (Codex P1 on PR #635 // applied the same fix on the Dynamo side). type S3Handler struct { - source BucketsSource - logger *slog.Logger - roles RoleStore + source BucketsSource + logger *slog.Logger + roles RoleStore + forwarder LeaderForwarder } // NewS3Handler wires a BucketsSource into the HTTP handler. Returns @@ -91,6 +92,15 @@ func (h *S3Handler) WithRoleStore(roles RoleStore) *S3Handler { return h } +// WithLeaderForwarder wires the LeaderForwarder used to hand +// follower-side ErrBucketsNotLeader writes off to the leader. +// nil keeps forwarding disabled (the handler falls back to +// 503 + Retry-After:1 directly, mirroring DynamoHandler's contract). +func (h *S3Handler) WithLeaderForwarder(f LeaderForwarder) *S3Handler { + h.forwarder = f + return h +} + // ServeHTTP routes /buckets, /buckets/{name}, and /buckets/{name}/acl. func (h *S3Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { @@ -280,6 +290,9 @@ func (h *S3Handler) handleCreate(w http.ResponseWriter, r *http.Request) { } summary, err := h.source.AdminCreateBucket(r.Context(), principal, body) if err != nil { + if h.tryForwardCreateBucket(w, r, principal, body, err) { + return + } h.writeBucketsError(w, r, "create", err) return } @@ -311,6 +324,9 @@ func (h *S3Handler) handlePutACL(w http.ResponseWriter, r *http.Request, name st return } if err := h.source.AdminPutBucketAcl(r.Context(), principal, name, body.ACL); err != nil { + if h.tryForwardPutBucketAcl(w, r, principal, name, body.ACL, err) { + return + } h.writeBucketsError(w, r, "put_acl", err) return } @@ -330,6 +346,9 @@ func (h *S3Handler) handleDelete(w http.ResponseWriter, r *http.Request, name st return } if err := h.source.AdminDeleteBucket(r.Context(), principal, name); err != nil { + if h.tryForwardDeleteBucket(w, r, principal, name, err) { + return + } h.writeBucketsError(w, r, "delete", err) return } @@ -485,6 +504,89 @@ func decodeAdminS3JSONBody[T any](body io.Reader) (T, error) { return out, nil } +// tryForwardCreateBucket / tryForwardPutBucketAcl / +// tryForwardDeleteBucket mirror the Dynamo handler's forwarding +// pattern: only the ErrBucketsNotLeader source error triggers +// forwarding, and only when a forwarder is configured. Each helper +// owns the leader-response replay so the per-method handlers stay +// short. +func (h *S3Handler) tryForwardCreateBucket(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, body CreateBucketRequest, sourceErr error) bool { + if !errors.Is(sourceErr, ErrBucketsNotLeader) || h.forwarder == nil { + return false + } + res, err := h.forwarder.ForwardCreateBucket(r.Context(), principal, body) + if err != nil { + h.writeForwardFailure(w, r, "create", err) + return true + } + h.writeForwardResult(w, r, res) + return true +} + +func (h *S3Handler) tryForwardPutBucketAcl(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, name, acl string, sourceErr error) bool { + if !errors.Is(sourceErr, ErrBucketsNotLeader) || h.forwarder == nil { + return false + } + res, err := h.forwarder.ForwardPutBucketAcl(r.Context(), principal, name, acl) + if err != nil { + h.writeForwardFailure(w, r, "put_acl", err) + return true + } + h.writeForwardResult(w, r, res) + return true +} + +func (h *S3Handler) tryForwardDeleteBucket(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, name string, sourceErr error) bool { + if !errors.Is(sourceErr, ErrBucketsNotLeader) || h.forwarder == nil { + return false + } + res, err := h.forwarder.ForwardDeleteBucket(r.Context(), principal, name) + if err != nil { + h.writeForwardFailure(w, r, "delete", err) + return true + } + h.writeForwardResult(w, r, res) + return true +} + +// writeForwardResult re-emits the leader's structured response +// verbatim. Mirrors DynamoHandler.writeForwardResult — same headers +// and 503-+-Retry-After contract. +func (h *S3Handler) writeForwardResult(w http.ResponseWriter, r *http.Request, res *ForwardResult) { + w.Header().Set("Content-Type", res.ContentType) + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("Cache-Control", "no-store") + if res.StatusCode == http.StatusServiceUnavailable { + w.Header().Set("Retry-After", "1") + } + w.WriteHeader(res.StatusCode) + if len(res.Payload) > 0 { + if _, err := w.Write(res.Payload); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin s3 forward response write failed", + slog.String("error", err.Error()), + ) + } + } +} + +// writeForwardFailure handles forwarder-side errors that did not +// produce a structured leader response: ErrLeaderUnavailable +// (election in flight) and gRPC transport errors. Both surface as +// 503 + Retry-After: 1 so the SPA's retry contract is uniform. +// ErrLeaderUnavailable is intentionally NOT logged at LevelError +// — elections are routine; transport errors get logged so +// operators can investigate. +func (h *S3Handler) writeForwardFailure(w http.ResponseWriter, r *http.Request, op string, err error) { + if !errors.Is(err, ErrLeaderUnavailable) { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin s3 "+op+" forward failed", + slog.String("error", err.Error()), + ) + } + w.Header().Set("Retry-After", "1") + writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable", + "raft leader currently unavailable; retry shortly") +} + // paginateBuckets slices `buckets` (already lex-sorted by the // adapter) into a single page starting strictly after `startAfter`. // Returns the page plus the opaque cursor for the next call ("" if diff --git a/internal/admin/s3_handler_test.go b/internal/admin/s3_handler_test.go index 16449ebd7..8b24067df 100644 --- a/internal/admin/s3_handler_test.go +++ b/internal/admin/s3_handler_test.go @@ -700,6 +700,141 @@ func TestS3Handler_WriteEndpoints_ValidationErrorReturns400(t *testing.T) { }) } +// notLeaderBucketsSource simulates a follower's BucketsSource — every +// write path returns ErrBucketsNotLeader. Used to exercise the +// tryForward* integration path on S3Handler. +type notLeaderBucketsSource struct { + stubBucketsSource +} + +func (s *notLeaderBucketsSource) AdminCreateBucket(_ context.Context, _ AuthPrincipal, _ CreateBucketRequest) (*BucketSummary, error) { + return nil, ErrBucketsNotLeader +} + +func (s *notLeaderBucketsSource) AdminPutBucketAcl(_ context.Context, _ AuthPrincipal, _ string, _ string) error { + return ErrBucketsNotLeader +} + +func (s *notLeaderBucketsSource) AdminDeleteBucket(_ context.Context, _ AuthPrincipal, _ string) error { + return ErrBucketsNotLeader +} + +func TestS3Handler_CreateBucket_ForwardsOnNotLeader(t *testing.T) { + src := ¬LeaderBucketsSource{} + fwd := &stubLeaderForwarder{createBucketRes: &ForwardResult{ + StatusCode: http.StatusCreated, + Payload: []byte(`{"bucket_name":"public-assets","acl":"public-read"}`), + ContentType: "application/json; charset=utf-8", + }} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodPost, pathS3Buckets, + strings.NewReader(validCreateBucketBody())) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusCreated, rec.Code) + require.Equal(t, "public-assets", fwd.lastCreateBucketInput.BucketName, + "forwarder must be invoked when source returns ErrBucketsNotLeader") + require.JSONEq(t, `{"bucket_name":"public-assets","acl":"public-read"}`, rec.Body.String()) +} + +func TestS3Handler_DeleteBucket_ForwardsOnNotLeader(t *testing.T) { + src := ¬LeaderBucketsSource{} + fwd := &stubLeaderForwarder{deleteBucketRes: &ForwardResult{ + StatusCode: http.StatusNoContent, + ContentType: "application/json; charset=utf-8", + }} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodDelete, pathS3Buckets+"/orders", nil) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) + require.Equal(t, "orders", fwd.lastDeleteBucketName) +} + +func TestS3Handler_PutBucketAcl_ForwardsOnNotLeader(t *testing.T) { + src := ¬LeaderBucketsSource{} + fwd := &stubLeaderForwarder{putACLRes: &ForwardResult{ + StatusCode: http.StatusNoContent, + ContentType: "application/json; charset=utf-8", + }} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodPut, pathS3Buckets+"/orders/acl", + strings.NewReader(`{"acl":"public-read"}`)) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) + require.Equal(t, "orders", fwd.lastPutACLBucket) + require.Equal(t, "public-read", fwd.lastPutACLValue) +} + +func TestS3Handler_CreateBucket_ForwarderLeaderUnavailableReturns503(t *testing.T) { + // ErrLeaderUnavailable from the forwarder layer maps to 503 + + // Retry-After:1 — the SPA's retry contract is uniform whether + // the leader is briefly absent or the network hiccupped. + src := ¬LeaderBucketsSource{} + fwd := &stubLeaderForwarder{createBucketErr: ErrLeaderUnavailable} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodPost, pathS3Buckets, + strings.NewReader(validCreateBucketBody())) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusServiceUnavailable, rec.Code) + require.Equal(t, "1", rec.Header().Get("Retry-After")) + require.Contains(t, rec.Body.String(), "leader_unavailable") +} + +func TestS3Handler_CreateBucket_ForwarderTransportErrorReturns503(t *testing.T) { + // Generic gRPC error → 503 + Retry-After. The error is logged + // on the server but never surfaces to the SPA. + src := ¬LeaderBucketsSource{} + fwd := &stubLeaderForwarder{createBucketErr: errors.New("gRPC sentinel TX-1")} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodPost, pathS3Buckets, + strings.NewReader(validCreateBucketBody())) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusServiceUnavailable, rec.Code) + require.Equal(t, "1", rec.Header().Get("Retry-After")) + require.NotContains(t, rec.Body.String(), "TX-1", + "transport error detail must not leak to the client") +} + +func TestS3Handler_CreateBucket_ForwarderNotInvokedForNonNotLeader(t *testing.T) { + // The forwarder gate must run ONLY on ErrBucketsNotLeader; a + // generic source error or AlreadyExists must fall through to + // writeBucketsError. Otherwise a leader-direct 409 would be + // silently re-applied at the leader. + cases := []struct { + name string + err error + wantCode int + }{ + {"already_exists", ErrBucketsAlreadyExists, http.StatusConflict}, + {"forbidden", ErrBucketsForbidden, http.StatusForbidden}, + {"generic", errors.New("opaque storage failure"), http.StatusInternalServerError}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fwd := &stubLeaderForwarder{} + src := &stubBucketsSource{createErr: tc.err} + h := NewS3Handler(src).WithLeaderForwarder(fwd) + req := httptest.NewRequest(http.MethodPost, pathS3Buckets, + strings.NewReader(validCreateBucketBody())) + req = withFullPrincipal(req) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, tc.wantCode, rec.Code) + require.Empty(t, fwd.lastCreateBucketInput.BucketName, + "forwarder must not be invoked for source error: %s", tc.name) + }) + } +} + func TestS3Handler_WriteEndpoints_RejectMissingPrincipal(t *testing.T) { // Without a session principal in the context, writes must // 401 — SessionAuth normally enforces this; principalForWrite diff --git a/internal/admin/server.go b/internal/admin/server.go index 89d0e3281..89b5a5225 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -169,16 +169,20 @@ func buildDynamoHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handle // buildS3HandlerForDeps is the parallel constructor for the S3 // admin handler. Wires the live RoleStore so write endpoints -// re-validate the principal on every request — mirrors the Dynamo -// side. The future LeaderForwarder integration will plumb through -// the same shape. +// re-validate the principal on every request, plus the +// LeaderForwarder so a follower hands ErrBucketsNotLeader writes +// off to the leader transparently — both mirror the Dynamo side. func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { if deps.Buckets == nil { return nil } - return NewS3Handler(deps.Buckets). + h := NewS3Handler(deps.Buckets). WithLogger(logger). WithRoleStore(MapRoleStore(deps.Roles)) + if deps.Forwarder != nil { + h = h.WithLeaderForwarder(deps.Forwarder) + } + return h } // Handler returns an http.Handler that serves the full admin surface. diff --git a/main.go b/main.go index 05176eff0..d8a7cde8f 100644 --- a/main.go +++ b/main.go @@ -1323,21 +1323,28 @@ func (r *runtimeServerRunner) start() error { if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - // startDynamoDBServer must run BEFORE startRaftServers so the - // resulting DynamoDBServer is available to the leader-side gRPC - // AdminForward registration in startRaftServers (design 3.3). - // Both servers listen on different addresses; the dynamo HTTP - // listener accepting traffic before raft TCP listeners are up - // is no different from the existing startup-race semantics — a - // hit in that window already returned 503 before this reorder. + // startDynamoDBServer + startS3Server must run BEFORE + // startRaftServers so the resulting *DynamoDBServer / *S3Server + // are available to the leader-side gRPC AdminForward registration + // in startRaftServers (design 3.3, P2 slice 2b). Each server + // listens on its own address; them accepting traffic before the + // raft TCP listeners are up is no different from the existing + // startup-race semantics — a hit in that window already returned + // 503 before this reorder. dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker) if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } r.dynamoServer = dynamoServer + s3Server, err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker) + if err != nil { + return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) + } + r.s3Server = s3Server forwardDeps := adminForwardServerDeps{ - tables: newDynamoTablesSource(r.dynamoServer), - roles: r.roleStore, + tables: newDynamoTablesSource(r.dynamoServer), + buckets: newBucketsSource(r.s3Server), + roles: r.roleStore, } if err := startRaftServers( r.ctx, @@ -1357,11 +1364,6 @@ func (r *runtimeServerRunner) start() error { ); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - s3Server, err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker) - if err != nil { - return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) - } - r.s3Server = s3Server if err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin_forward.go b/main_admin_forward.go index c06e57165..41c5f527b 100644 --- a/main_admin_forward.go +++ b/main_admin_forward.go @@ -62,21 +62,28 @@ func buildLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache // adminForwardServerDeps is the small bundle the gRPC ForwardServer // needs to be reachable from a follower's bridge. Collecting them in // a struct keeps startRaftServers' signature tractable as the wiring -// surface grows. All fields are required; a missing one means the -// ForwardServer is not registered (single-node / leader-only build). +// surface grows. tables + roles are required; buckets is optional +// (a build that ships without the S3 adapter sets it to nil and the +// ForwardServer's S3 dispatch returns 501 for those operations). type adminForwardServerDeps struct { - tables admin.TablesSource - roles admin.RoleStore + tables admin.TablesSource + buckets admin.BucketsSource + roles admin.RoleStore } // readyForRegistration reports whether the bundle has enough -// collaborators to construct + register a ForwardServer. Both fields -// must be non-nil; a nil TablesSource means the build ships without -// the dynamo adapter, and a nil RoleStore means admin auth is not -// configured. Either way, registering the gRPC service would 500 -// every forwarded call, so we silently skip registration instead. +// collaborators to construct + register a ForwardServer. +// RoleStore is always required (the principal re-evaluation step +// has no fallback). At least one of TablesSource or BucketsSource +// must be present — registering with neither would respond 501 to +// every operation, which is functionally identical to not +// registering at all. The dispatcher emits 501 for whichever +// source is nil so an S3-only or Dynamo-only build still serves +// its half of the admin surface (Codex P1 on PR #673: an S3-only +// cluster previously skipped registration entirely and surfaced +// follower forwards as gRPC Unimplemented / 503). func (d adminForwardServerDeps) readyForRegistration() bool { - return d.tables != nil && d.roles != nil + return d.roles != nil && (d.tables != nil || d.buckets != nil) } // registerAdminForwardServer attaches the leader-side gRPC @@ -89,7 +96,11 @@ func registerAdminForwardServer(gs *grpc.Server, deps adminForwardServerDeps, lo if !deps.readyForRegistration() { return } - pb.RegisterAdminForwardServer(gs, admin.NewForwardServer(deps.tables, deps.roles, logger)) + srv := admin.NewForwardServer(deps.tables, deps.roles, logger) + if deps.buckets != nil { + srv = srv.WithBucketsSource(deps.buckets) + } + pb.RegisterAdminForwardServer(gs, srv) } // roleStoreFromFlags builds the same access-key → role map that diff --git a/main_admin_forward_test.go b/main_admin_forward_test.go index f82037036..b01245694 100644 --- a/main_admin_forward_test.go +++ b/main_admin_forward_test.go @@ -110,18 +110,40 @@ func TestRoleStoreFromFlags(t *testing.T) { func TestAdminForwardServerDeps_ReadyForRegistration(t *testing.T) { // The bundle's readyForRegistration gate decides whether - // startRaftServers wires the gRPC ForwardServer at all. A nil - // TablesSource (cluster-only build) or nil RoleStore (admin - // auth disabled) means a registered service would 500 every - // forwarded call — silently skipping registration is the - // preferred behaviour. - require.False(t, adminForwardServerDeps{}.readyForRegistration()) - require.False(t, adminForwardServerDeps{tables: dummyTablesSource{}}.readyForRegistration()) - require.False(t, adminForwardServerDeps{roles: admin.MapRoleStore{}}.readyForRegistration()) + // startRaftServers wires the gRPC ForwardServer at all. + // RoleStore is always required (admin auth disabled means the + // principal re-evaluation step has nothing to compare against). + // At least one of TablesSource / BucketsSource must be present; + // registering with neither would 501 every operation, which is + // indistinguishable from not registering at all. + // + // The S3-only case (Codex P1 on PR #673) used to fail this gate + // because the predicate required tables != nil — a cluster + // started with --dynamoAddr empty but S3 enabled never + // registered AdminForward, and follower-side S3 writes hit + // gRPC Unimplemented / 503 instead of reaching the leader. The + // "buckets only" assertion below pins the fix. + require.False(t, adminForwardServerDeps{}.readyForRegistration(), + "empty bundle must not register") + require.False(t, adminForwardServerDeps{tables: dummyTablesSource{}}.readyForRegistration(), + "missing roles must not register") + require.False(t, adminForwardServerDeps{buckets: dummyBucketsSource{}}.readyForRegistration(), + "missing roles must not register (S3-only)") + require.False(t, adminForwardServerDeps{roles: admin.MapRoleStore{}}.readyForRegistration(), + "roles without any source must not register") require.True(t, adminForwardServerDeps{ tables: dummyTablesSource{}, roles: admin.MapRoleStore{}, - }.readyForRegistration()) + }.readyForRegistration(), "Dynamo-only deployment must register") + require.True(t, adminForwardServerDeps{ + buckets: dummyBucketsSource{}, + roles: admin.MapRoleStore{}, + }.readyForRegistration(), "S3-only deployment must register") + require.True(t, adminForwardServerDeps{ + tables: dummyTablesSource{}, + buckets: dummyBucketsSource{}, + roles: admin.MapRoleStore{}, + }.readyForRegistration(), "full bundle must register") } func TestBuildAdminLeaderForwarder_NilGateReturnsNoForwarder(t *testing.T) { @@ -223,3 +245,31 @@ func (dummyTablesSource) AdminCreateTable(_ context.Context, _ admin.AuthPrincip func (dummyTablesSource) AdminDeleteTable(_ context.Context, _ admin.AuthPrincipal, _ string) error { panic("dummyTablesSource.AdminDeleteTable should not be invoked") } + +// dummyBucketsSource is the smallest concrete admin.BucketsSource +// for the readyForRegistration gate test — symmetric with +// dummyTablesSource. The S3-only branch of the gate (Codex P1 on +// PR #673) needs a non-nil BucketsSource value to assert; using a +// real adapter source would pull S3 wiring into a main_admin test +// that is only checking the registration predicate. +type dummyBucketsSource struct{} + +func (dummyBucketsSource) AdminListBuckets(_ context.Context) ([]admin.BucketSummary, error) { + panic("dummyBucketsSource.AdminListBuckets should not be invoked") +} + +func (dummyBucketsSource) AdminDescribeBucket(_ context.Context, _ string) (*admin.BucketSummary, bool, error) { + panic("dummyBucketsSource.AdminDescribeBucket should not be invoked") +} + +func (dummyBucketsSource) AdminCreateBucket(_ context.Context, _ admin.AuthPrincipal, _ admin.CreateBucketRequest) (*admin.BucketSummary, error) { + panic("dummyBucketsSource.AdminCreateBucket should not be invoked") +} + +func (dummyBucketsSource) AdminPutBucketAcl(_ context.Context, _ admin.AuthPrincipal, _, _ string) error { + panic("dummyBucketsSource.AdminPutBucketAcl should not be invoked") +} + +func (dummyBucketsSource) AdminDeleteBucket(_ context.Context, _ admin.AuthPrincipal, _ string) error { + panic("dummyBucketsSource.AdminDeleteBucket should not be invoked") +} diff --git a/proto/admin_forward.pb.go b/proto/admin_forward.pb.go index bdedffe68..67865005a 100644 --- a/proto/admin_forward.pb.go +++ b/proto/admin_forward.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v7.34.0 +// protoc v5.29.3 // source: admin_forward.proto package proto @@ -31,6 +31,12 @@ const ( AdminOperation_ADMIN_OP_UNSPECIFIED AdminOperation = 0 AdminOperation_ADMIN_OP_CREATE_TABLE AdminOperation = 1 AdminOperation_ADMIN_OP_DELETE_TABLE AdminOperation = 2 + // S3 bucket admin operations (P2 slice 2b). New values appended + // after the Dynamo block so the wire-format integers for the + // existing operations stay stable. + AdminOperation_ADMIN_OP_CREATE_BUCKET AdminOperation = 3 + AdminOperation_ADMIN_OP_DELETE_BUCKET AdminOperation = 4 + AdminOperation_ADMIN_OP_PUT_BUCKET_ACL AdminOperation = 5 ) // Enum value maps for AdminOperation. @@ -39,11 +45,17 @@ var ( 0: "ADMIN_OP_UNSPECIFIED", 1: "ADMIN_OP_CREATE_TABLE", 2: "ADMIN_OP_DELETE_TABLE", + 3: "ADMIN_OP_CREATE_BUCKET", + 4: "ADMIN_OP_DELETE_BUCKET", + 5: "ADMIN_OP_PUT_BUCKET_ACL", } AdminOperation_value = map[string]int32{ - "ADMIN_OP_UNSPECIFIED": 0, - "ADMIN_OP_CREATE_TABLE": 1, - "ADMIN_OP_DELETE_TABLE": 2, + "ADMIN_OP_UNSPECIFIED": 0, + "ADMIN_OP_CREATE_TABLE": 1, + "ADMIN_OP_DELETE_TABLE": 2, + "ADMIN_OP_CREATE_BUCKET": 3, + "ADMIN_OP_DELETE_BUCKET": 4, + "ADMIN_OP_PUT_BUCKET_ACL": 5, } ) @@ -310,11 +322,14 @@ const file_admin_forward_proto_rawDesc = "" + "statusCode\x12\x18\n" + "\apayload\x18\x02 \x01(\fR\apayload\x12!\n" + "\fcontent_type\x18\x03 \x01(\tR\vcontentType\x12.\n" + - "\x13retry_after_seconds\x18\x04 \x01(\x05R\x11retryAfterSeconds*`\n" + + "\x13retry_after_seconds\x18\x04 \x01(\x05R\x11retryAfterSeconds*\xb5\x01\n" + "\x0eAdminOperation\x12\x18\n" + "\x14ADMIN_OP_UNSPECIFIED\x10\x00\x12\x19\n" + "\x15ADMIN_OP_CREATE_TABLE\x10\x01\x12\x19\n" + - "\x15ADMIN_OP_DELETE_TABLE\x10\x022H\n" + + "\x15ADMIN_OP_DELETE_TABLE\x10\x02\x12\x1a\n" + + "\x16ADMIN_OP_CREATE_BUCKET\x10\x03\x12\x1a\n" + + "\x16ADMIN_OP_DELETE_BUCKET\x10\x04\x12\x1b\n" + + "\x17ADMIN_OP_PUT_BUCKET_ACL\x10\x052H\n" + "\fAdminForward\x128\n" + "\aForward\x12\x14.AdminForwardRequest\x1a\x15.AdminForwardResponse\"\x00B#Z!github.com/bootjp/elastickv/protob\x06proto3" diff --git a/proto/admin_forward.proto b/proto/admin_forward.proto index 99d2d47f8..009daea4a 100644 --- a/proto/admin_forward.proto +++ b/proto/admin_forward.proto @@ -45,6 +45,12 @@ enum AdminOperation { ADMIN_OP_UNSPECIFIED = 0; ADMIN_OP_CREATE_TABLE = 1; ADMIN_OP_DELETE_TABLE = 2; + // S3 bucket admin operations (P2 slice 2b). New values appended + // after the Dynamo block so the wire-format integers for the + // existing operations stay stable. + ADMIN_OP_CREATE_BUCKET = 3; + ADMIN_OP_DELETE_BUCKET = 4; + ADMIN_OP_PUT_BUCKET_ACL = 5; } message AdminForwardRequest { diff --git a/proto/admin_forward_grpc.pb.go b/proto/admin_forward_grpc.pb.go index 9b9169893..78356d823 100644 --- a/proto/admin_forward_grpc.pb.go +++ b/proto/admin_forward_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.1 -// - protoc v7.34.0 +// - protoc v5.29.3 // source: admin_forward.proto package proto