Skip to content

Commit ce4c6ee

Browse files
committed
admin: AdminForward integration for S3 admin ops (P2 slice 2b)
Slice 2b of P2 (docs/design/2026_04_24_proposed_admin_dashboard.md Section 3.3.2 + 4.1): a follower-side S3 admin write (POST /buckets, PUT /buckets/{name}/acl, DELETE /buckets/{name}) now hands off to the leader transparently, completing the same end-to-end forwarding contract Dynamo writes received in #644 + Stacked on #669 (P2 slice 2a). Once #669 merges, this rebases cleanly onto main. Proto (proto/admin_forward.proto): - Three new ADMIN_OP enum values (CREATE_BUCKET / DELETE_BUCKET / PUT_BUCKET_ACL) appended after the Dynamo block so existing wire-format integers stay stable. Regenerated with the pinned protoc 29.3 / protoc-gen-go 1.36.11 / protoc-gen-go-grpc 1.6.1. Leader-side ForwardServer (internal/admin/forward_server.go): - WithBucketsSource lets deployments wire the S3 dispatcher optionally — Dynamo-only builds keep the receiver nil and the three new operations return 501 NotImplemented. - Three new dispatch arms: handleCreateBucket / handleDeleteBucket / handlePutBucketAcl. Each one mirrors the leader-direct HTTP path's payload contract (NUL-byte rejection, 64 KiB limit, DisallowUnknownFields, trailing-token rejection, slash-in-name rejection) so a hostile follower cannot smuggle a payload past validations the leader-direct path enforces. - forwardBucketsErrorResponse mirrors forwardErrorResponse on the Dynamo side: ErrBucketsForbidden / NotLeader / NotFound / AlreadyExists / NotEmpty + ValidationError each map to the same HTTP status the leader-direct writeBucketsError produces, so forwarded and leader-direct responses are byte-for-byte indistinguishable from the SPA's view. - isStructuredSourceError extended to recognise the bucket sentinels so they are NOT logged at LevelError on the leader. - Forward's switch was extracted into dispatchForward to keep the parent function under cyclop's 10-branch ceiling as the operation enum grew. Follower-side LeaderForwarder (internal/admin/forward_client.go): - Interface gains ForwardCreateBucket / ForwardDeleteBucket / ForwardPutBucketAcl. PutBucketAcl carries both the bucket name (from the URL path) and the new ACL (from the request body) in one JSON payload — same approach handleDeleteBucket takes for the bucket name. - gRPCForwardClient methods reuse the existing forward() helper for transport, so connection-cache reuse and ErrLeaderUnavailable signalling behave identically across resource types. Handler integration (internal/admin/s3_handler.go): - New `forwarder LeaderForwarder` field + WithLeaderForwarder method. - handleCreate / handlePutACL / handleDelete now consult tryForward* helpers when the source returned ErrBucketsNotLeader; the helpers are gated on `errors.Is(err, ErrBucketsNotLeader) && forwarder != nil` so a leader-direct rejection (already-exists, not-found, etc.) is never re-applied at the leader. - writeForwardResult / writeForwardFailure mirror the Dynamo handler's pattern: nosniff + Cache-Control:no-store + Retry-After:1 on 503. ErrLeaderUnavailable does NOT log at LevelError (elections are routine); transport errors do log so operators can investigate. Wiring (main.go + main_admin_forward.go): - adminForwardServerDeps gains a `buckets` field; readyForRegistration still requires only TablesSource + RoleStore (so cluster-only or Dynamo-only builds keep registering Dynamo forwarding without S3). - runtimeServerRunner.start() now creates *adapter.S3Server BEFORE startRaftServers (in addition to dynamoServer) so the leader-side ForwardServer registration sees both adapters. The reorder is safe: each adapter listens on its own address and the raft TCP listeners are independent. - ServerDeps.Forwarder now plumbs through buildS3HandlerForDeps too, so the follower's S3Handler picks up the same LeaderForwarder instance the Dynamo handler does. Tests: - 9 forward-server tests covering the three new bucket operations: happy path / no-BucketsSource→501 / bad-JSON 400 / already-exists 409 / not-empty 409 / slash-in-name 400 / missing-acl 400 / payload-too-large 413 (sweep over all three ops). - 4 forward-client tests covering ForwardCreateBucket / ForwardDeleteBucket / ForwardPutBucketAcl happy-path payload shapes + ErrLeaderUnavailable on no-leader. - stubLeaderForwarder gains 3 bucket-side forward methods so existing dynamo tests still satisfy the LeaderForwarder interface, and the new stub fields let bucket-handler tests verify the forward arguments. - 6 handler integration tests on S3Handler.tryForward*: forwarded create / delete / put-acl happy paths (replay leader's status + payload + content-type), forwarder ErrLeaderUnavailable → 503 + Retry-After, transport-error → 503 + no leakage, and a 3-axis gate sweep proving the forwarder is NOT invoked on AlreadyExists / Forbidden / generic source errors. Closes design 3.3.2 acceptance criteria 2 (transparent forwarding) + 6 (forwarded_from in audit log) for S3 admin writes; criterion 3 (election-period 503 + retry) is also live for S3 because the existing tryForward helpers reuse the same fallback paths.
1 parent 5a67834 commit ce4c6ee

13 files changed

Lines changed: 889 additions & 48 deletions

internal/admin/forward_client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ type LeaderForwarder interface {
3232
ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error)
3333
// ForwardDeleteTable is the delete-side counterpart.
3434
ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
35+
// ForwardCreateBucket issues a forwarded POST /s3/buckets on
36+
// the leader's behalf. The leader echoes back the same JSON
37+
// envelope a leader-direct call would produce.
38+
ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error)
39+
// ForwardDeleteBucket is the delete-side counterpart.
40+
ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
41+
// ForwardPutBucketAcl issues a forwarded PUT
42+
// /s3/buckets/{name}/acl. Both the bucket name and the
43+
// new ACL travel inside the proto payload — see the leader-side
44+
// handlePutBucketAcl for the JSON shape.
45+
ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error)
3546
}
3647

3748
// ForwardResult is the leader's response replayed for the SPA. The
@@ -142,6 +153,43 @@ func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal Au
142153
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload)
143154
}
144155

156+
// ForwardCreateBucket serialises `in` as JSON and dispatches the
157+
// CreateBucket operation. Same contract as ForwardCreateTable —
158+
// gRPC errors are wrapped, ErrLeaderUnavailable on no-leader.
159+
func (c *gRPCForwardClient) ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) {
160+
payload, err := json.Marshal(in)
161+
if err != nil {
162+
return nil, pkgerrors.Wrap(err, "admin forward: marshal create-bucket request")
163+
}
164+
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, principal, payload)
165+
}
166+
167+
// ForwardDeleteBucket serialises the bucket name as `{"name":"..."}`
168+
// to match the leader-side handleDeleteBucket contract.
169+
func (c *gRPCForwardClient) ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
170+
payload, err := json.Marshal(struct {
171+
Name string `json:"name"`
172+
}{Name: name})
173+
if err != nil {
174+
return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-bucket request")
175+
}
176+
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, principal, payload)
177+
}
178+
179+
// ForwardPutBucketAcl carries both the bucket name (from the URL
180+
// path) and the new ACL (from the request body) in the proto
181+
// payload — see handlePutBucketAcl for the leader-side decode.
182+
func (c *gRPCForwardClient) ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) {
183+
payload, err := json.Marshal(struct {
184+
Name string `json:"name"`
185+
ACL string `json:"acl"`
186+
}{Name: name, ACL: acl})
187+
if err != nil {
188+
return nil, pkgerrors.Wrap(err, "admin forward: marshal put-bucket-acl request")
189+
}
190+
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, principal, payload)
191+
}
192+
145193
func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) {
146194
addr := c.resolver()
147195
if addr == "" {

internal/admin/forward_client_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,72 @@ func TestGRPCForwardClient_FillsMissingContentType(t *testing.T) {
182182
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
183183
require.Equal(t, "application/json; charset=utf-8", res.ContentType)
184184
}
185+
186+
func TestGRPCForwardClient_ForwardCreateBucket_HappyPath(t *testing.T) {
187+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
188+
StatusCode: http.StatusCreated,
189+
Payload: []byte(`{"bucket_name":"public-assets","acl":"public-read"}`),
190+
ContentType: "application/json; charset=utf-8",
191+
}}
192+
dial := &stubConnFactory{conn: conn}
193+
fwd, err := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-2")
194+
require.NoError(t, err)
195+
196+
in := CreateBucketRequest{BucketName: "public-assets", ACL: "public-read"}
197+
res, err := fwd.ForwardCreateBucket(context.Background(),
198+
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, in)
199+
require.NoError(t, err)
200+
require.Equal(t, http.StatusCreated, res.StatusCode)
201+
require.Equal(t, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, conn.lastReq.GetOperation())
202+
require.Equal(t, "follower-2", conn.lastReq.GetForwardedFrom())
203+
var roundtripped CreateBucketRequest
204+
require.NoError(t, json.Unmarshal(conn.lastReq.GetPayload(), &roundtripped))
205+
require.Equal(t, in, roundtripped)
206+
}
207+
208+
func TestGRPCForwardClient_ForwardDeleteBucket_HappyPath(t *testing.T) {
209+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
210+
StatusCode: http.StatusNoContent,
211+
ContentType: "application/json; charset=utf-8",
212+
}}
213+
dial := &stubConnFactory{conn: conn}
214+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-3")
215+
216+
res, err := fwd.ForwardDeleteBucket(context.Background(),
217+
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders")
218+
require.NoError(t, err)
219+
require.Equal(t, http.StatusNoContent, res.StatusCode)
220+
require.Equal(t, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, conn.lastReq.GetOperation())
221+
require.JSONEq(t, `{"name":"orders"}`, string(conn.lastReq.GetPayload()))
222+
}
223+
224+
func TestGRPCForwardClient_ForwardPutBucketAcl_HappyPath(t *testing.T) {
225+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
226+
StatusCode: http.StatusNoContent,
227+
ContentType: "application/json; charset=utf-8",
228+
}}
229+
dial := &stubConnFactory{conn: conn}
230+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-4")
231+
232+
res, err := fwd.ForwardPutBucketAcl(context.Background(),
233+
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders", "public-read")
234+
require.NoError(t, err)
235+
require.Equal(t, http.StatusNoContent, res.StatusCode)
236+
require.Equal(t, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, conn.lastReq.GetOperation())
237+
// Both name and acl travel in the payload — see handlePutBucketAcl
238+
// for the leader-side decode contract.
239+
require.JSONEq(t, `{"name":"orders","acl":"public-read"}`, string(conn.lastReq.GetPayload()))
240+
}
241+
242+
func TestGRPCForwardClient_BucketOps_NoLeaderReturnsErrLeaderUnavailable(t *testing.T) {
243+
dial := &stubConnFactory{conn: &stubForwardConn{}}
244+
fwd, _ := NewGRPCForwardClient(fixedResolver(""), dial, "f")
245+
246+
_, err := fwd.ForwardCreateBucket(context.Background(), AuthPrincipal{Role: RoleFull},
247+
CreateBucketRequest{BucketName: "x"})
248+
require.ErrorIs(t, err, ErrLeaderUnavailable)
249+
_, err = fwd.ForwardDeleteBucket(context.Background(), AuthPrincipal{Role: RoleFull}, "x")
250+
require.ErrorIs(t, err, ErrLeaderUnavailable)
251+
_, err = fwd.ForwardPutBucketAcl(context.Background(), AuthPrincipal{Role: RoleFull}, "x", "private")
252+
require.ErrorIs(t, err, ErrLeaderUnavailable)
253+
}

internal/admin/forward_integration_test.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,28 @@ type stubLeaderForwarder struct {
2020
createErr error
2121
deleteRes *ForwardResult
2222
deleteErr error
23-
24-
lastCreatePrincipal AuthPrincipal
25-
lastCreateInput CreateTableRequest
26-
lastDeletePrincipal AuthPrincipal
27-
lastDeleteName string
23+
// Bucket-side counterparts ship with P2 slice 2b. Tests that
24+
// only exercise Dynamo behaviour leave these zero values; the
25+
// stub honours the same nil-result-and-no-error contract the
26+
// table side uses on a "did not call us" path.
27+
createBucketRes *ForwardResult
28+
createBucketErr error
29+
deleteBucketRes *ForwardResult
30+
deleteBucketErr error
31+
putACLRes *ForwardResult
32+
putACLErr error
33+
34+
lastCreatePrincipal AuthPrincipal
35+
lastCreateInput CreateTableRequest
36+
lastDeletePrincipal AuthPrincipal
37+
lastDeleteName string
38+
lastCreateBucketPrincipal AuthPrincipal
39+
lastCreateBucketInput CreateBucketRequest
40+
lastDeleteBucketPrincipal AuthPrincipal
41+
lastDeleteBucketName string
42+
lastPutACLPrincipal AuthPrincipal
43+
lastPutACLBucket string
44+
lastPutACLValue string
2845
}
2946

3047
func (s *stubLeaderForwarder) ForwardCreateTable(_ context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) {
@@ -45,6 +62,34 @@ func (s *stubLeaderForwarder) ForwardDeleteTable(_ context.Context, principal Au
4562
return s.deleteRes, nil
4663
}
4764

65+
func (s *stubLeaderForwarder) ForwardCreateBucket(_ context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) {
66+
s.lastCreateBucketPrincipal = principal
67+
s.lastCreateBucketInput = in
68+
if s.createBucketErr != nil {
69+
return nil, s.createBucketErr
70+
}
71+
return s.createBucketRes, nil
72+
}
73+
74+
func (s *stubLeaderForwarder) ForwardDeleteBucket(_ context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
75+
s.lastDeleteBucketPrincipal = principal
76+
s.lastDeleteBucketName = name
77+
if s.deleteBucketErr != nil {
78+
return nil, s.deleteBucketErr
79+
}
80+
return s.deleteBucketRes, nil
81+
}
82+
83+
func (s *stubLeaderForwarder) ForwardPutBucketAcl(_ context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) {
84+
s.lastPutACLPrincipal = principal
85+
s.lastPutACLBucket = name
86+
s.lastPutACLValue = acl
87+
if s.putACLErr != nil {
88+
return nil, s.putACLErr
89+
}
90+
return s.putACLRes, nil
91+
}
92+
4893
// notLeaderSource is a TablesSource that always returns
4994
// ErrTablesNotLeader on writes — i.e., it simulates the local
5095
// node being a follower. Used to exercise the forwarder path

0 commit comments

Comments
 (0)