-
Notifications
You must be signed in to change notification settings - Fork 2
admin: S3 bucket write endpoints (P2 slice 2a) #669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4b2e028
b40cb19
fa73720
f6e5239
e911458
84a764a
ee243c8
8c2fa3e
1133178
0813968
35df946
885eb50
cadffe4
55c93c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,8 +4,10 @@ import ( | |
| "bytes" | ||
| "context" | ||
| "sort" | ||
| "strings" | ||
|
|
||
| "github.com/bootjp/elastickv/internal/s3keys" | ||
| "github.com/bootjp/elastickv/kv" | ||
| "github.com/bootjp/elastickv/store" | ||
| "github.com/cockroachdb/errors" | ||
| ) | ||
|
|
@@ -175,3 +177,265 @@ func summaryFromBucketMeta(name string, meta *s3BucketMeta) AdminBucketSummary { | |
| Owner: meta.Owner, | ||
| } | ||
| } | ||
|
|
||
| // Sentinel errors the admin write methods return so the bridge in | ||
| // main_admin.go can translate them into admin-package vocabulary | ||
| // without sniffing strings. Named separately from | ||
| // ErrAdminTableAlreadyExists / ErrAdminTableNotFound on the Dynamo | ||
| // side so a future per-resource role / status divergence does not | ||
| // require renaming both packages' callers. | ||
| var ( | ||
| // ErrAdminBucketAlreadyExists signals that AdminCreateBucket | ||
| // targeted a name already in use. Maps to 409 Conflict. | ||
| ErrAdminBucketAlreadyExists = errors.New("s3 admin: bucket already exists") | ||
| // ErrAdminBucketNotFound signals that AdminDeleteBucket / | ||
| // AdminPutBucketAcl targeted a missing bucket. Maps to 404. | ||
| ErrAdminBucketNotFound = errors.New("s3 admin: bucket not found") | ||
| // ErrAdminBucketNotEmpty signals that AdminDeleteBucket targeted | ||
| // a bucket that still has objects. Maps to 409 Conflict to match | ||
| // the SigV4 path's BucketNotEmpty response (the dashboard cannot | ||
| // force a recursive delete; the operator must clean up first). | ||
| ErrAdminBucketNotEmpty = errors.New("s3 admin: bucket is not empty") | ||
| // ErrAdminInvalidBucketName signals that AdminCreateBucket got | ||
| // a name that does not satisfy validateS3BucketName. Maps to 400. | ||
| ErrAdminInvalidBucketName = errors.New("s3 admin: invalid bucket name") | ||
| // ErrAdminInvalidACL signals that the ACL string did not pass | ||
| // validateS3CannedAcl. Maps to 400 (the SigV4 path returns 501 | ||
| // NotImplemented for unsupported canned ACLs, but the admin API | ||
| // is documented as private/public-read only and rejecting other | ||
| // values as invalid input is a more useful contract for the | ||
| // dashboard). | ||
| ErrAdminInvalidACL = errors.New("s3 admin: invalid ACL") | ||
| ) | ||
|
|
||
| // AdminCreateBucket creates a bucket on behalf of the admin | ||
| // dashboard. The principal MUST be re-validated by the caller (the | ||
| // admin HTTP handler does this against the live RoleStore); this | ||
| // method enforces the authorisation invariant a second time so a | ||
| // follower-forwarded call cannot smuggle a read-only principal past | ||
| // the check on the leader side (Section 3.2 "認可の真実は常に | ||
| // adapter 側"). | ||
| // | ||
| // The transaction is atomic: bucket meta + generation + ACL all land | ||
| // in a single OperationGroup, mirroring the SigV4 createBucket path. | ||
| // On success returns the freshly-stored summary; on conflict returns | ||
| // ErrAdminBucketAlreadyExists; on a non-leader / non-full-role / bad | ||
| // input returns the corresponding sentinel. | ||
| func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error) { | ||
| if !principal.Role.canWrite() { | ||
| return nil, ErrAdminForbidden | ||
| } | ||
| if !s.isVerifiedS3Leader() { | ||
| return nil, ErrAdminNotLeader | ||
| } | ||
| if err := validateS3BucketName(name); err != nil { | ||
| return nil, errors.Wrapf(ErrAdminInvalidBucketName, "%s", err.Error()) | ||
| } | ||
| acl = adminCanonicalACL(acl) | ||
| if err := validateS3CannedAcl(acl); err != nil { | ||
| return nil, errors.Wrapf(ErrAdminInvalidACL, "%s", err.Error()) | ||
| } | ||
|
|
||
| var summary *AdminBucketSummary | ||
| err := s.retryS3Mutation(ctx, func() error { | ||
| out, err := s.adminCreateBucketTxn(ctx, principal, name, acl) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| summary = out | ||
| return nil | ||
| }) | ||
| if err != nil { | ||
| return nil, err //nolint:wrapcheck // sentinel errors propagate as-is; structured errors are already wrapped above. | ||
| } | ||
| return summary, nil | ||
| } | ||
|
|
||
| // adminCreateBucketTxn is the per-attempt body retryS3Mutation | ||
| // invokes. Pulled out so AdminCreateBucket stays under the | ||
| // cyclomatic ceiling without hiding the bucket-existence / | ||
| // generation / commit-ts dance — every step has a meaningful | ||
| // error path that the wrapping retry harness needs to see. | ||
| func (s *S3Server) adminCreateBucketTxn(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error) { | ||
| readTS := s.readTS() | ||
| startTS := s.txnStartTS(readTS) | ||
| readPin := s.pinReadTS(readTS) | ||
| defer readPin.Release() | ||
|
|
||
| existing, exists, err := s.loadBucketMetaAt(ctx, name, readTS) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| if exists && existing != nil { | ||
| return nil, ErrAdminBucketAlreadyExists | ||
| } | ||
| nextGeneration, err := s.nextBucketGenerationAt(ctx, name, readTS) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| commitTS, err := s.nextTxnCommitTS(startTS) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| meta := &s3BucketMeta{ | ||
| BucketName: name, | ||
| Generation: nextGeneration, | ||
| CreatedAtHLC: commitTS, | ||
| Region: s.effectiveRegion(), | ||
| Owner: principal.AccessKey, | ||
| Acl: acl, | ||
| } | ||
| body, err := encodeS3BucketMeta(meta) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| _, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ | ||
| IsTxn: true, | ||
| StartTS: startTS, | ||
| CommitTS: commitTS, | ||
| Elems: []*kv.Elem[kv.OP]{ | ||
| {Op: kv.Put, Key: s3keys.BucketMetaKey(name), Value: body}, | ||
| {Op: kv.Put, Key: s3keys.BucketGenerationKey(name), Value: encodeS3Generation(nextGeneration)}, | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| out := summaryFromBucketMeta(name, meta) | ||
| return &out, nil | ||
| } | ||
|
|
||
| // AdminPutBucketAcl swaps the canned ACL on an existing bucket. | ||
| // Same authorisation contract as AdminCreateBucket. Mutates only | ||
| // the meta.Acl field; generation is preserved so existing object | ||
| // references stay valid. | ||
| func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincipal, name, acl string) error { | ||
| if !principal.Role.canWrite() { | ||
| return ErrAdminForbidden | ||
| } | ||
| if !s.isVerifiedS3Leader() { | ||
| return ErrAdminNotLeader | ||
| } | ||
| acl = adminCanonicalACL(acl) | ||
| if err := validateS3CannedAcl(acl); err != nil { | ||
| return errors.Wrapf(ErrAdminInvalidACL, "%s", err.Error()) | ||
| } | ||
|
|
||
| err := s.retryS3Mutation(ctx, func() error { | ||
| readTS := s.readTS() | ||
| startTS := s.txnStartTS(readTS) | ||
| readPin := s.pinReadTS(readTS) | ||
| defer readPin.Release() | ||
|
|
||
| meta, exists, err := s.loadBucketMetaAt(ctx, name, readTS) | ||
| if err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| if !exists || meta == nil { | ||
| return ErrAdminBucketNotFound | ||
| } | ||
| meta.Acl = acl | ||
| body, err := encodeS3BucketMeta(meta) | ||
| if err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| _, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ | ||
| IsTxn: true, | ||
| StartTS: startTS, | ||
| Elems: []*kv.Elem[kv.OP]{ | ||
| {Op: kv.Put, Key: s3keys.BucketMetaKey(name), Value: body}, | ||
| }, | ||
| }) | ||
| return errors.WithStack(err) | ||
| }) | ||
| if err != nil { | ||
| return err //nolint:wrapcheck // sentinel errors propagate as-is. | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // AdminDeleteBucket removes a bucket if it is empty. Same | ||
| // authorisation contract as the other admin write methods. The | ||
| // bucket-must-be-empty rule mirrors the SigV4 deleteBucket path — | ||
| // the dashboard cannot force a recursive delete, by design. | ||
| // | ||
| // Known orphan-race limitation (coderabbitai 🔴 / 🟠 on PR #669): | ||
| // the empty-bucket probe (ScanAt with limit=1 on | ||
| // ObjectManifestPrefixForBucket) reads at readTS but the | ||
| // subsequent BucketMetaKey delete only carries that single point | ||
| // key in its ReadKeys set. A concurrent PutObject that inserts a | ||
| // manifest key in the scanned prefix between readTS and the | ||
| // delete's commitTS will not conflict — the OCC validator only | ||
| // inspects keys that appear in ReadKeys, and there is no | ||
| // ReadRanges mechanism today. The object's manifest key survives | ||
| // under a now-deleted bucket meta and becomes orphaned. | ||
| // | ||
| // This race exists pre-existing in the SigV4 path | ||
| // (adapter/s3.go:deleteBucket — same shape, same limitation), so | ||
| // AdminDeleteBucket inherits the contract; closing the gap | ||
| // requires either (a) bumping BucketGenerationKey on every | ||
| // PutObject so it can serve as an OCC token in this read set, or | ||
| // (b) extending OperationGroup with ReadRanges and teaching the | ||
| // FSM to validate range emptiness atomically with commit. Both | ||
| // are larger changes outside this PR's scope; tracked in | ||
| // docs/design/2026_04_24_partial_admin_dashboard.md under the | ||
| // Outstanding open items section. Operators concerned about the | ||
| // orphan window today should pause writes against the target | ||
| // bucket before issuing the admin delete. | ||
| func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error { | ||
| if !principal.Role.canWrite() { | ||
| return ErrAdminForbidden | ||
| } | ||
| if !s.isVerifiedS3Leader() { | ||
| return ErrAdminNotLeader | ||
| } | ||
|
|
||
| err := s.retryS3Mutation(ctx, func() error { | ||
| readTS := s.readTS() | ||
| startTS := s.txnStartTS(readTS) | ||
| readPin := s.pinReadTS(readTS) | ||
| defer readPin.Release() | ||
|
|
||
| meta, exists, err := s.loadBucketMetaAt(ctx, name, readTS) | ||
| if err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| if !exists || meta == nil { | ||
| return ErrAdminBucketNotFound | ||
| } | ||
| start := s3keys.ObjectManifestPrefixForBucket(name, meta.Generation) | ||
| kvs, err := s.store.ScanAt(ctx, start, prefixScanEnd(start), 1, readTS) | ||
| if err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| if len(kvs) > 0 { | ||
| return ErrAdminBucketNotEmpty | ||
| } | ||
| _, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ | ||
| IsTxn: true, | ||
| StartTS: startTS, | ||
| Elems: []*kv.Elem[kv.OP]{ | ||
| {Op: kv.Del, Key: s3keys.BucketMetaKey(name)}, | ||
| }, | ||
| }) | ||
| return errors.WithStack(err) | ||
| }) | ||
| if err != nil { | ||
| return err //nolint:wrapcheck // sentinel errors propagate as-is. | ||
| } | ||
| return nil | ||
| } | ||
|
Comment on lines
+393
to
+427
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check how retryS3Mutation handles dispatch errors and whether it
# retries OCC conflicts on range reads.
ast-grep --pattern 'func ($_ *S3Server) retryS3Mutation($$$) $_ {
$$$
}'
# Look for OCC / read-intent tracking inside kv coordinator.
rg -nP --type=go -C3 'readPin|ReadIntent|ReadSet|range.*read'
# See whether the SigV4 deleteBucket path uses the same pattern (so we
# can scope this concern as pre-existing or new).
rg -nP --type=go -C5 'deleteBucket\s*\(' -g '!**/*_test.go' -g '!**/s3_admin*.go'Repository: bootjp/elastickv Length of output: 33198 🏁 Script executed: # Find isRetryableS3MutationErr implementation
rg -nP --type=go -A10 'func isRetryableS3MutationErr' -g '!**/*_test.go'
# Check what error types are retryable (look for OCC/write conflict handling)
rg -nP --type=go 'WriteConflict|OCC.*conflict|retryable' -g '!**/*_test.go' -C2
# Verify: does AdminDeleteBucket need to track ScanAt as ReadKeys?
rg -nP --type=go -B5 -A10 'func.*AdminDeleteBucket' adapter/s3_admin.goRepository: bootjp/elastickv Length of output: 42188 🏁 Script executed: # Check kv.OperationGroup definition and ReadKeys field
rg -nP --type=go -A5 'type OperationGroup' kv/
# Search for tests of AdminDeleteBucket or deleteBucket with concurrent writes
rg -nP --type=go 'TestAdmin.*DeleteBucket|Test.*deleteBucket' -g '**/*_test.go'
# Check if any S3 path passes keys from a range scan as ReadKeys
rg -nP --type=go -B10 'ReadKeys.*Scan' adapter/s3.goRepository: bootjp/elastickv Length of output: 375 🏁 Script executed: # Find the Dispatch signature to understand ReadKeys parameter
rg -nP --type=go 'func.*Dispatch.*ctx.*error' kv/ -A8
# Check if ReadKeys is in pb.Request
rg -nP --type=go 'ReadKeys' kv/ -B2 -A2 | head -100Repository: bootjp/elastickv Length of output: 8737 🏁 Script executed: # Search for existing tests of AdminDeleteBucket or deleteBucket with concurrent mutations
rg -nP --type=go 'AdminDeleteBucket|deleteBucket' adapter/s3_admin_test.go adapter/s3_test.go 2>/dev/null | head -50
# Check if there are any tests at all for these functions
fd -e _test.go -type f | xargs rg -l 'AdminDeleteBucket|deleteBucket' 2>/dev/nullRepository: bootjp/elastickv Length of output: 1193 🏁 Script executed: # Read the test that checks for non-empty bucket
sed -n '287,320p' adapter/s3_admin_test.goRepository: bootjp/elastickv Length of output: 1474 🏁 Script executed: # Verify there are no race-condition tests for AdminDeleteBucket/deleteBucket
rg -nP --type=go 'concurrent|race|TOCTOU' adapter/s3_admin_test.go adapter/s3_test.goRepository: bootjp/elastickv Length of output: 181 Verify the OCC read-set handling for the range scan in The concern is valid:
The OCC layer validates only explicit point keys in the This race exists in both 🤖 Prompt for AI Agents |
||
|
|
||
| // adminCanonicalACL normalises an empty input to the canned | ||
| // "private" default. The SigV4 createBucket / putBucketAcl paths | ||
| // apply the same default after trimming the x-amz-acl header. | ||
| // Pulled out so the admin write methods do not silently accept a | ||
| // blank string and create / mutate with whatever validateS3CannedAcl | ||
| // happens to allow on its empty branch. | ||
| func adminCanonicalACL(acl string) string { | ||
| trimmed := strings.TrimSpace(acl) | ||
| if trimmed == "" { | ||
| return s3AclPrivate | ||
| } | ||
| return trimmed | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 34769
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 4198
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 6862
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 1240
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 2854
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 6935
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 3165
🏁 Script executed:
Repository: bootjp/elastickv
Length of output: 2993
Remove the CommitTS concern; verify the range-scan TOCTOU issue in
AdminDeleteBucket.The
CommitTSasymmetry is intentional. TheOperationGroupstruct (kv/transcoder.go:29-30) explicitly documents thatCommitTSis optional: "Coordinators choose one automatically when this is zero." ThevalidateOperationGroupfunction does not validateCommitTS, and this pattern is used throughout the codebase (SQS, Redis, DynamoDB adapters all omit it when unset). No comment is needed.However,
AdminDeleteBuckethas a genuine TOCTOU issue: it scansObjectManifestPrefixForBucketatreadTS(lines 370–385), finds no objects, then dispatches a delete ofBucketMetaKeywithStartTSbut noReadKeys. If a concurrent write inserts an object after the scan but before the delete commits, the object becomes orphaned. TheretryS3Mutationloop only retries on conflicts toBucketMetaKeyitself, not to keys within the scanned range.Compare to SQS code (e.g., adapter/sqs_catalog.go:656–661), which correctly passes
ReadKeysafter range scans to enable OCC conflict detection. The same issue exists ins3.go:deleteBucket()(lines 638–680).Pass the scanned range as
ReadKeysto enable OCC validation, or re-scan at transactionStartTSbefore deleting to close the window.🤖 Prompt for AI Agents