@@ -107,8 +107,20 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest
107107// handler. Pulled out so Forward stays under the cyclomatic ceiling
108108// as the operation enum grows; the principal-validation +
109109// forwarded_from sanitisation logic stays in Forward where it belongs.
110+ //
111+ // Source-availability checks live in checkOpAvailability rather than
112+ // in each handler: a Dynamo-only build has s.source != nil but
113+ // s.buckets == nil, and an S3-only build (Codex P1 on PR #673) has
114+ // the inverse. Centralising the check means every operation gets a
115+ // consistent 501 error shape and a future op cannot ship without the
116+ // operator-visible "not configured" message that the existing ops
117+ // promise.
110118func (s * ForwardServer ) dispatchForward (ctx context.Context , principal AuthPrincipal , forwardedFrom string , req * pb.AdminForwardRequest ) (* pb.AdminForwardResponse , error ) {
111- switch req .GetOperation () {
119+ op := req .GetOperation ()
120+ if resp , err , ok := s .checkOpAvailability (op ); ! ok {
121+ return resp , err
122+ }
123+ switch op {
112124 case pb .AdminOperation_ADMIN_OP_CREATE_TABLE :
113125 return s .handleCreate (ctx , principal , forwardedFrom , req )
114126 case pb .AdminOperation_ADMIN_OP_DELETE_TABLE :
@@ -126,6 +138,43 @@ func (s *ForwardServer) dispatchForward(ctx context.Context, principal AuthPrinc
126138 }
127139}
128140
141+ // checkOpAvailability returns (resp, err, true) when dispatchForward
142+ // should continue to the per-op handler, or (resp, err, false) when
143+ // the leader's build does not include the source the requested
144+ // operation needs (S3-only deployment served a Dynamo op, or vice
145+ // versa). Pulling the per-op switch out keeps dispatchForward's
146+ // cyclomatic count under the linter ceiling as the enum grows.
147+ func (s * ForwardServer ) checkOpAvailability (op pb.AdminOperation ) (* pb.AdminForwardResponse , error , bool ) {
148+ switch op {
149+ case pb .AdminOperation_ADMIN_OP_CREATE_TABLE , pb .AdminOperation_ADMIN_OP_DELETE_TABLE :
150+ if s .source == nil {
151+ resp , err := notImplementedForwardResponse ("DynamoDB" )
152+ return resp , err , false
153+ }
154+ case pb .AdminOperation_ADMIN_OP_CREATE_BUCKET ,
155+ pb .AdminOperation_ADMIN_OP_DELETE_BUCKET ,
156+ pb .AdminOperation_ADMIN_OP_PUT_BUCKET_ACL :
157+ if s .buckets == nil {
158+ resp , err := notImplementedForwardResponse ("S3" )
159+ return resp , err , false
160+ }
161+ case pb .AdminOperation_ADMIN_OP_UNSPECIFIED :
162+ // Unknown-op rejection is dispatchForward's responsibility,
163+ // not this gate's. Falling through to ok=true lets the main
164+ // switch's default branch produce the canonical 400 message.
165+ }
166+ return nil , nil , true
167+ }
168+
169+ // notImplementedForwardResponse produces the 501 response a follower
170+ // receives when the leader is built without the source for this
171+ // operation's surface. surface is the human-facing label that ends
172+ // up in the error message ("DynamoDB" or "S3").
173+ func notImplementedForwardResponse (surface string ) (* pb.AdminForwardResponse , error ) {
174+ return rejectForward (http .StatusNotImplemented , "not_implemented" ,
175+ surface + " admin forwarding is not configured on this leader" )
176+ }
177+
129178func (s * ForwardServer ) validatePrincipal (p * pb.AdminPrincipal ) (AuthPrincipal , bool ) {
130179 accessKey := p .GetAccessKey ()
131180 if accessKey == "" {
@@ -182,65 +231,24 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa
182231 // proto stays operation-agnostic — there is no operation-specific
183232 // field in AdminForwardRequest, by design (adding one per op
184233 // would couple every new admin endpoint to the proto schema).
185- payload := req .GetPayload ()
186- if len (payload ) > adminForwardPayloadLimit {
187- return rejectForward (http .StatusRequestEntityTooLarge , "payload_too_large" ,
188- "forwarded payload exceeds the 64 KiB admin limit" )
189- }
190- // Mirror decodeCreateTableRequest's NUL-byte guard: goccy/go-json
191- // treats raw NUL as end-of-input so dec.More() would otherwise
192- // miss `{"name":"users"}\x00{"extra":1}` payloads. Codex P2 on
193- // PR #635 flagged this as the same smuggling vector that the
194- // HTTP create path already covers.
195- if bytes .IndexByte (payload , 0 ) >= 0 {
196- return rejectForward (http .StatusBadRequest , "invalid_body" , "delete payload contains a NUL byte" )
197- }
198- dec := json .NewDecoder (bytes .NewReader (payload ))
199- dec .DisallowUnknownFields ()
200- var body struct {
201- Name string `json:"name"`
202- }
203- if err := dec .Decode (& body ); err != nil {
204- return rejectForward (http .StatusBadRequest , "invalid_body" , "delete payload is not valid JSON" )
205- }
206- if dec .More () {
207- return rejectForward (http .StatusBadRequest , "invalid_body" , "delete payload has trailing data" )
234+ name , rejection , err := decodeNamedPayload (req .GetPayload (), "delete" )
235+ if rejection != nil || err != nil {
236+ return rejection , err
208237 }
209- if body .Name == "" {
210- return rejectForward (http .StatusBadRequest , "invalid_body" , "delete payload missing name" )
211- }
212- // Reject slash-bearing names symmetrically with the HTTP
213- // handleDelete and handleDescribe paths. Without this, a
214- // forwarded call could act on `foo/bar` while a leader-direct
215- // call would 404 — divergent behaviour Codex P2 flagged on
216- // PR #635.
217- if strings .ContainsRune (body .Name , '/' ) {
218- return rejectForward (http .StatusBadRequest , "invalid_body" , "delete payload name must not contain '/'" )
219- }
220- if err := s .source .AdminDeleteTable (ctx , principal , body .Name ); err != nil {
221- s .logUnexpectedSourceError (ctx , "delete_table" , body .Name , forwardedFrom , err )
238+ if err := s .source .AdminDeleteTable (ctx , principal , name ); err != nil {
239+ s .logUnexpectedSourceError (ctx , "delete_table" , name , forwardedFrom , err )
222240 return forwardErrorResponse ("delete" , err ), nil
223241 }
224- s .logger .LogAttrs (ctx , slog .LevelInfo , "admin_audit" ,
225- slog .String ("actor" , principal .AccessKey ),
226- slog .String ("role" , string (principal .Role )),
227- slog .String ("forwarded_from" , forwardedFrom ),
228- slog .String ("operation" , "delete_table" ),
229- slog .String ("table" , body .Name ),
230- )
242+ s .auditDeleteSuccess (ctx , principal , forwardedFrom , "delete_table" , "table" , name )
231243 return & pb.AdminForwardResponse {StatusCode : http .StatusNoContent }, nil
232244}
233245
234246// handleCreateBucket dispatches a forwarded POST /s3/buckets call.
235247// Mirrors handleCreate (Dynamo) but decodes a CreateBucketRequest
236- // and routes through BucketsSource. Returns 501 NotImplemented when
237- // no BucketsSource is wired so a follower whose S3 surface was
238- // disabled gets a deterministic error instead of a generic 500 .
248+ // and routes through BucketsSource. dispatchForward gates this on
249+ // s.buckets != nil so callers reach here only when S3 forwarding is
250+ // configured .
239251func (s * ForwardServer ) handleCreateBucket (ctx context.Context , principal AuthPrincipal , forwardedFrom string , req * pb.AdminForwardRequest ) (* pb.AdminForwardResponse , error ) {
240- if s .buckets == nil {
241- return rejectForward (http .StatusNotImplemented , "not_implemented" ,
242- "S3 admin forwarding is not configured on this leader" )
243- }
244252 payload := req .GetPayload ()
245253 if len (payload ) > adminForwardPayloadLimit {
246254 return rejectForward (http .StatusRequestEntityTooLarge , "payload_too_large" ,
@@ -291,52 +299,33 @@ func (s *ForwardServer) handleCreateBucket(ctx context.Context, principal AuthPr
291299// a JSON object with a single "name" field, which the bridge
292300// generates from the URL path.
293301func (s * ForwardServer ) handleDeleteBucket (ctx context.Context , principal AuthPrincipal , forwardedFrom string , req * pb.AdminForwardRequest ) (* pb.AdminForwardResponse , error ) {
294- if s . buckets == nil {
295- return rejectForward ( http . StatusNotImplemented , "not_implemented" ,
296- "S3 admin forwarding is not configured on this leader" )
302+ name , rejection , err := decodeNamedPayload ( req . GetPayload (), "delete-bucket" )
303+ if rejection != nil || err != nil {
304+ return rejection , err
297305 }
298- payload := req .GetPayload ()
299- if len (payload ) > adminForwardPayloadLimit {
300- return rejectForward (http .StatusRequestEntityTooLarge , "payload_too_large" ,
301- "forwarded payload exceeds the 64 KiB admin limit" )
302- }
303- if bytes .IndexByte (payload , 0 ) >= 0 {
304- return rejectForward (http .StatusBadRequest , "invalid_body" ,
305- "delete-bucket payload contains a NUL byte" )
306- }
307- dec := json .NewDecoder (bytes .NewReader (payload ))
308- dec .DisallowUnknownFields ()
309- var body struct {
310- Name string `json:"name"`
311- }
312- if err := dec .Decode (& body ); err != nil {
313- return rejectForward (http .StatusBadRequest , "invalid_body" ,
314- "delete-bucket payload is not valid JSON" )
315- }
316- if dec .More () {
317- return rejectForward (http .StatusBadRequest , "invalid_body" ,
318- "delete-bucket payload has trailing data" )
319- }
320- if body .Name == "" {
321- return rejectForward (http .StatusBadRequest , "invalid_body" ,
322- "delete-bucket payload missing name" )
323- }
324- if strings .ContainsRune (body .Name , '/' ) {
325- return rejectForward (http .StatusBadRequest , "invalid_body" ,
326- "delete-bucket payload name must not contain '/'" )
327- }
328- if err := s .buckets .AdminDeleteBucket (ctx , principal , body .Name ); err != nil {
329- s .logUnexpectedSourceError (ctx , "delete_bucket" , body .Name , forwardedFrom , err )
306+ if err := s .buckets .AdminDeleteBucket (ctx , principal , name ); err != nil {
307+ s .logUnexpectedSourceError (ctx , "delete_bucket" , name , forwardedFrom , err )
330308 return forwardBucketsErrorResponse ("delete" , err ), nil
331309 }
310+ s .auditDeleteSuccess (ctx , principal , forwardedFrom , "delete_bucket" , "bucket" , name )
311+ return & pb.AdminForwardResponse {StatusCode : http .StatusNoContent }, nil
312+ }
313+
314+ // auditDeleteSuccess emits the admin_audit slog line both Dynamo and
315+ // S3 forwarded delete handlers need. Centralised so handleDelete and
316+ // handleDeleteBucket do not diverge on the field set, and so a future
317+ // handler that mirrors the same delete shape (e.g. delete-namespace)
318+ // keeps the audit-log contract by reusing this helper rather than
319+ // re-emitting a hand-rolled subset. resourceField is "table" or
320+ // "bucket"; opLabel is the audit "operation" value.
321+ func (s * ForwardServer ) auditDeleteSuccess (ctx context.Context , principal AuthPrincipal , forwardedFrom , opLabel , resourceField , name string ) {
332322 s .logger .LogAttrs (ctx , slog .LevelInfo , "admin_audit" ,
333323 slog .String ("actor" , principal .AccessKey ),
334324 slog .String ("role" , string (principal .Role )),
335325 slog .String ("forwarded_from" , forwardedFrom ),
336- slog .String ("operation" , "delete_bucket" ),
337- slog .String ("bucket" , body . Name ),
326+ slog .String ("operation" , opLabel ),
327+ slog .String (resourceField , name ),
338328 )
339- return & pb.AdminForwardResponse {StatusCode : http .StatusNoContent }, nil
340329}
341330
342331// handlePutBucketAcl dispatches a forwarded PUT
@@ -345,10 +334,6 @@ func (s *ForwardServer) handleDeleteBucket(ctx context.Context, principal AuthPr
345334// operation-agnostic — same approach handleDeleteBucket takes for
346335// the bucket name.
347336func (s * ForwardServer ) handlePutBucketAcl (ctx context.Context , principal AuthPrincipal , forwardedFrom string , req * pb.AdminForwardRequest ) (* pb.AdminForwardResponse , error ) {
348- if s .buckets == nil {
349- return rejectForward (http .StatusNotImplemented , "not_implemented" ,
350- "S3 admin forwarding is not configured on this leader" )
351- }
352337 payload := req .GetPayload ()
353338 if len (payload ) > adminForwardPayloadLimit {
354339 return rejectForward (http .StatusRequestEntityTooLarge , "payload_too_large" ,
@@ -458,6 +443,64 @@ func sanitiseForwardedFrom(s string) string {
458443 }, s )
459444}
460445
446+ // decodeNamedPayload validates and decodes the {"name": "..."} JSON
447+ // shape both the Dynamo and S3 delete forwarders accept. Returns the
448+ // decoded name on success, or a populated rejection response (and
449+ // nil name) on a 400 / 413. opLabel is the human-facing prefix that
450+ // goes into the rejection messages ("delete" for Dynamo,
451+ // "delete-bucket" for S3) so the response identifies which path
452+ // rejected — and so a future op (e.g. "describe-bucket") that
453+ // reuses this helper still produces an actionable error.
454+ //
455+ // All four guards mirror the leader-direct HTTP path:
456+ // - 64 KiB payload cap (matches adminForwardPayloadLimit elsewhere)
457+ // - NUL-byte rejection (goccy/go-json treats raw NUL as end-of-
458+ // input; without this guard `{"name":"x"}\x00{"extra":1}`
459+ // payloads slip past dec.More(); Codex P2 on PR #635)
460+ // - DisallowUnknownFields + dec.More() trailing-token rejection
461+ // - empty + slash-bearing name rejection (the HTTP handlers
462+ // already 404 slash-bearing names; the forwarded path has to
463+ // reject symmetrically or a hostile follower could act on
464+ // `foo/bar` while a leader-direct call would not).
465+ func decodeNamedPayload (payload []byte , opLabel string ) (string , * pb.AdminForwardResponse , error ) {
466+ if len (payload ) > adminForwardPayloadLimit {
467+ resp , err := rejectForward (http .StatusRequestEntityTooLarge , "payload_too_large" ,
468+ "forwarded payload exceeds the 64 KiB admin limit" )
469+ return "" , resp , err
470+ }
471+ if bytes .IndexByte (payload , 0 ) >= 0 {
472+ resp , err := rejectForward (http .StatusBadRequest , "invalid_body" ,
473+ opLabel + " payload contains a NUL byte" )
474+ return "" , resp , err
475+ }
476+ dec := json .NewDecoder (bytes .NewReader (payload ))
477+ dec .DisallowUnknownFields ()
478+ var body struct {
479+ Name string `json:"name"`
480+ }
481+ if err := dec .Decode (& body ); err != nil {
482+ resp , rerr := rejectForward (http .StatusBadRequest , "invalid_body" ,
483+ opLabel + " payload is not valid JSON" )
484+ return "" , resp , rerr
485+ }
486+ if dec .More () {
487+ resp , err := rejectForward (http .StatusBadRequest , "invalid_body" ,
488+ opLabel + " payload has trailing data" )
489+ return "" , resp , err
490+ }
491+ if body .Name == "" {
492+ resp , err := rejectForward (http .StatusBadRequest , "invalid_body" ,
493+ opLabel + " payload missing name" )
494+ return "" , resp , err
495+ }
496+ if strings .ContainsRune (body .Name , '/' ) {
497+ resp , err := rejectForward (http .StatusBadRequest , "invalid_body" ,
498+ opLabel + " payload name must not contain '/'" )
499+ return "" , resp , err
500+ }
501+ return body .Name , nil , nil
502+ }
503+
461504// forwardErrorResponse re-encodes a TablesSource error in the
462505// structured shape the follower's handler can re-emit verbatim. This
463506// is the leader-side counterpart of writeTablesError: every status /
0 commit comments