Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions adapter/s3_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"sort"
"strings"

"github.com/bootjp/elastickv/internal/s3keys"
"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -175,3 +177,265 @@ func summaryFromBucketMeta(name string, meta *s3BucketMeta) AdminBucketSummary {
Owner: meta.Owner,
}
}

// Sentinel errors the admin write methods return so the bridge in
// main_admin.go can translate them into admin-package vocabulary
// without sniffing strings. Named separately from
// ErrAdminTableAlreadyExists / ErrAdminTableNotFound on the Dynamo
// side so a future per-resource role / status divergence does not
// require renaming both packages' callers.
var (
// ErrAdminBucketAlreadyExists signals that AdminCreateBucket
// targeted a name already in use. Maps to 409 Conflict.
ErrAdminBucketAlreadyExists = errors.New("s3 admin: bucket already exists")
// ErrAdminBucketNotFound signals that AdminDeleteBucket /
// AdminPutBucketAcl targeted a missing bucket. Maps to 404.
ErrAdminBucketNotFound = errors.New("s3 admin: bucket not found")
// ErrAdminBucketNotEmpty signals that AdminDeleteBucket targeted
// a bucket that still has objects. Maps to 409 Conflict to match
// the SigV4 path's BucketNotEmpty response (the dashboard cannot
// force a recursive delete; the operator must clean up first).
ErrAdminBucketNotEmpty = errors.New("s3 admin: bucket is not empty")
// ErrAdminInvalidBucketName signals that AdminCreateBucket got
// a name that does not satisfy validateS3BucketName. Maps to 400.
ErrAdminInvalidBucketName = errors.New("s3 admin: invalid bucket name")
// ErrAdminInvalidACL signals that the ACL string did not pass
// validateS3CannedAcl. Maps to 400 (the SigV4 path returns 501
// NotImplemented for unsupported canned ACLs, but the admin API
// is documented as private/public-read only and rejecting other
// values as invalid input is a more useful contract for the
// dashboard).
ErrAdminInvalidACL = errors.New("s3 admin: invalid ACL")
)

// AdminCreateBucket creates a bucket on behalf of the admin
// dashboard. The principal MUST be re-validated by the caller (the
// admin HTTP handler does this against the live RoleStore); this
// method enforces the authorisation invariant a second time so a
// follower-forwarded call cannot smuggle a read-only principal past
// the check on the leader side (Section 3.2 "認可の真実は常に
// adapter 側").
//
// The transaction is atomic: bucket meta + generation + ACL all land
// in a single OperationGroup, mirroring the SigV4 createBucket path.
// On success returns the freshly-stored summary; on conflict returns
// ErrAdminBucketAlreadyExists; on a non-leader / non-full-role / bad
// input returns the corresponding sentinel.
func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error) {
if !principal.Role.canWrite() {
return nil, ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
return nil, ErrAdminNotLeader
}
if err := validateS3BucketName(name); err != nil {
return nil, errors.Wrapf(ErrAdminInvalidBucketName, "%s", err.Error())
}
acl = adminCanonicalACL(acl)
if err := validateS3CannedAcl(acl); err != nil {
return nil, errors.Wrapf(ErrAdminInvalidACL, "%s", err.Error())
}

var summary *AdminBucketSummary
err := s.retryS3Mutation(ctx, func() error {
out, err := s.adminCreateBucketTxn(ctx, principal, name, acl)
if err != nil {
return err
}
summary = out
return nil
})
if err != nil {
return nil, err //nolint:wrapcheck // sentinel errors propagate as-is; structured errors are already wrapped above.
}
return summary, nil
}

// adminCreateBucketTxn is the per-attempt body retryS3Mutation
// invokes. Pulled out so AdminCreateBucket stays under the
// cyclomatic ceiling without hiding the bucket-existence /
// generation / commit-ts dance — every step has a meaningful
// error path that the wrapping retry harness needs to see.
func (s *S3Server) adminCreateBucketTxn(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error) {
readTS := s.readTS()
startTS := s.txnStartTS(readTS)
readPin := s.pinReadTS(readTS)
defer readPin.Release()

existing, exists, err := s.loadBucketMetaAt(ctx, name, readTS)
if err != nil {
return nil, errors.WithStack(err)
}
if exists && existing != nil {
return nil, ErrAdminBucketAlreadyExists
}
nextGeneration, err := s.nextBucketGenerationAt(ctx, name, readTS)
if err != nil {
return nil, errors.WithStack(err)
}
commitTS, err := s.nextTxnCommitTS(startTS)
if err != nil {
return nil, errors.WithStack(err)
}
meta := &s3BucketMeta{
BucketName: name,
Generation: nextGeneration,
CreatedAtHLC: commitTS,
Region: s.effectiveRegion(),
Owner: principal.AccessKey,
Acl: acl,
}
body, err := encodeS3BucketMeta(meta)
if err != nil {
return nil, errors.WithStack(err)
}
_, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
CommitTS: commitTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Put, Key: s3keys.BucketMetaKey(name), Value: body},
{Op: kv.Put, Key: s3keys.BucketGenerationKey(name), Value: encodeS3Generation(nextGeneration)},
},
})
if err != nil {
return nil, errors.WithStack(err)
}
out := summaryFromBucketMeta(name, meta)
return &out, nil
}

// AdminPutBucketAcl swaps the canned ACL on an existing bucket.
// Same authorisation contract as AdminCreateBucket. Mutates only
// the meta.Acl field; generation is preserved so existing object
// references stay valid.
func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincipal, name, acl string) error {
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
return ErrAdminNotLeader
}
acl = adminCanonicalACL(acl)
if err := validateS3CannedAcl(acl); err != nil {
return errors.Wrapf(ErrAdminInvalidACL, "%s", err.Error())
}

err := s.retryS3Mutation(ctx, func() error {
readTS := s.readTS()
startTS := s.txnStartTS(readTS)
readPin := s.pinReadTS(readTS)
defer readPin.Release()

meta, exists, err := s.loadBucketMetaAt(ctx, name, readTS)
if err != nil {
return errors.WithStack(err)
}
if !exists || meta == nil {
return ErrAdminBucketNotFound
}
meta.Acl = acl
body, err := encodeS3BucketMeta(meta)
if err != nil {
return errors.WithStack(err)
}
_, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Put, Key: s3keys.BucketMetaKey(name), Value: body},
},
})
return errors.WithStack(err)
})
if err != nil {
return err //nolint:wrapcheck // sentinel errors propagate as-is.
}
return nil
}
Comment on lines +342 to +355
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Locate the kv.OperationGroup struct definition and check how Dispatch
# treats a zero CommitTS when IsTxn=true.
ast-grep --pattern 'type OperationGroup[$_] struct {
  $$$
}'

# Find the Dispatch implementation that consumes OperationGroup.
rg -nP --type=go -C5 'func.*Dispatch\s*\(.*OperationGroup'

# Find any other call sites that set IsTxn=true without CommitTS to see
# whether this is a known idiom or a regression.
rg -nPU --type=go -C3 'IsTxn:\s*true' -g '!**/*_test.go'

Repository: bootjp/elastickv

Length of output: 34769


🏁 Script executed:

#!/bin/bash
# Find OperationGroup struct definition
rg -nP 'type OperationGroup\[' --type=go -A 10

# Find validateOperationGroup function
rg -nP 'func.*validateOperationGroup' --type=go -A 20

# Check the Dispatch implementation in coordinator.go more closely
sed -n '240,280p' kv/coordinator.go

Repository: bootjp/elastickv

Length of output: 4198


🏁 Script executed:

#!/bin/bash
# Find AdminDeleteBucket implementation with full context
sed -n '361,403p' adapter/s3_admin.go

# Check if there's any range-read/read-intent tracking in OCC logic
rg -nP 'ReadKeys|read.*intent' kv/coordinator.go -A 3 -B 1

# Check transaction validation/conflict detection logic
rg -nP 'validateRead|checkConflict' kv/ --type=go -A 10 | head -60

Repository: bootjp/elastickv

Length of output: 6862


🏁 Script executed:

#!/bin/bash
# Check AdminPutBucketAcl (around line 312-355) for ReadKeys
sed -n '312,355p' adapter/s3_admin.go

# Check retryS3Mutation implementation
rg -nP 'func.*retryS3Mutation' adapter/s3_admin.go -A 30

Repository: bootjp/elastickv

Length of output: 1240


🏁 Script executed:

#!/bin/bash
# Find retryS3Mutation implementation
rg -nP 'func.*retryS3Mutation' adapter/s3.go -A 35

# Also check if AdminDeleteBucket passes ReadKeys for range scans
rg -nP 'ReadKeys.*=|ReadKeys:' adapter/s3_admin.go

# Check if there's any conflict detection specifically for AdminDeleteBucket
rg -nP 'ObjectManifestPrefix|retryS3Mutation' adapter/s3.go -B 2 -A 2

Repository: bootjp/elastickv

Length of output: 2854


🏁 Script executed:

#!/bin/bash
# Check if any other code passes ReadKeys for range scans
rg -nP 'ReadKeys.*=|ReadKeys:' adapter/ --type=go -B 2 | head -80

# Specifically check AdminCreateBucket to see if it uses ReadKeys
sed -n '259,310p' adapter/s3_admin.go

# Check dispatchTxn to understand how ReadKeys affect OCC
sed -n '801,870p' kv/coordinator.go | head -60

Repository: bootjp/elastickv

Length of output: 6935


🏁 Script executed:

#!/bin/bash
# Check if s3.go deleteBucket (regular, not admin) passes ReadKeys
sed -n '638,680p' adapter/s3.go

# Check if any S3 code passes ReadKeys for range scans
rg -nP 'ScanAt.*ReadKeys|ReadKeys.*ScanAt' adapter/s3.go adapter/s3_admin.go

# Look for patterns of range scan + dispatch in S3
rg -nP 'ScanAt' adapter/s3.go -A 10 | head -100

Repository: bootjp/elastickv

Length of output: 3165


🏁 Script executed:

#!/bin/bash
# Check the test coordinator that tests bucket delete races
sed -n '745,800p' adapter/s3_test.go

# Check containsObjectManifestMutation function
rg -nP 'func.*containsObjectManifestMutation' adapter/ --type=go -A 8

# Verify whether AdminDeleteBucket's dispatch is covered by the race test
rg -nP 'AdminDeleteBucket|bucketDeleteRaceCoordinator' adapter/s3_test.go adapter/s3_admin_test.go

Repository: bootjp/elastickv

Length of output: 2993


Remove the CommitTS concern; verify the range-scan TOCTOU issue in AdminDeleteBucket.

The CommitTS asymmetry is intentional. The OperationGroup struct (kv/transcoder.go:29-30) explicitly documents that CommitTS is optional: "Coordinators choose one automatically when this is zero." The validateOperationGroup function does not validate CommitTS, and this pattern is used throughout the codebase (SQS, Redis, DynamoDB adapters all omit it when unset). No comment is needed.

However, AdminDeleteBucket has a genuine TOCTOU issue: it scans ObjectManifestPrefixForBucket at readTS (lines 370–385), finds no objects, then dispatches a delete of BucketMetaKey with StartTS but no ReadKeys. If a concurrent write inserts an object after the scan but before the delete commits, the object becomes orphaned. The retryS3Mutation loop only retries on conflicts to BucketMetaKey itself, not to keys within the scanned range.

Compare to SQS code (e.g., adapter/sqs_catalog.go:656–661), which correctly passes ReadKeys after range scans to enable OCC conflict detection. The same issue exists in s3.go:deleteBucket() (lines 638–680).

Pass the scanned range as ReadKeys to enable OCC validation, or re-scan at transaction StartTS before deleting to close the window.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/s3_admin.go` around lines 342 - 355, AdminDeleteBucket has a TOCTOU:
after scanning ObjectManifestPrefixForBucket at readTS it dispatches a delete of
BucketMetaKey without including the scanned range in the
OperationGroup.ReadKeys, so concurrent inserts can be orphaned; fix by
collecting the scanned range keys (the range returned by the scan over
ObjectManifestPrefixForBucket) and include them in the OperationGroup passed to
s.coordinator.Dispatch (or, alternatively, re-scan at StartTS before deleting)
so OCC validation detects conflicts; update the retryS3Mutation /
s.coordinator.Dispatch call in AdminDeleteBucket (and likewise in deleteBucket
in s3.go) to populate ReadKeys with the scanned range.


// AdminDeleteBucket removes a bucket if it is empty. Same
// authorisation contract as the other admin write methods. The
// bucket-must-be-empty rule mirrors the SigV4 deleteBucket path —
// the dashboard cannot force a recursive delete, by design.
//
// Known orphan-race limitation (coderabbitai 🔴 / 🟠 on PR #669):
// the empty-bucket probe (ScanAt with limit=1 on
// ObjectManifestPrefixForBucket) reads at readTS but the
// subsequent BucketMetaKey delete only carries that single point
// key in its ReadKeys set. A concurrent PutObject that inserts a
// manifest key in the scanned prefix between readTS and the
// delete's commitTS will not conflict — the OCC validator only
// inspects keys that appear in ReadKeys, and there is no
// ReadRanges mechanism today. The object's manifest key survives
// under a now-deleted bucket meta and becomes orphaned.
//
// This race exists pre-existing in the SigV4 path
// (adapter/s3.go:deleteBucket — same shape, same limitation), so
// AdminDeleteBucket inherits the contract; closing the gap
// requires either (a) bumping BucketGenerationKey on every
// PutObject so it can serve as an OCC token in this read set, or
// (b) extending OperationGroup with ReadRanges and teaching the
// FSM to validate range emptiness atomically with commit. Both
// are larger changes outside this PR's scope; tracked in
// docs/design/2026_04_24_partial_admin_dashboard.md under the
// Outstanding open items section. Operators concerned about the
// orphan window today should pause writes against the target
// bucket before issuing the admin delete.
func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error {
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
return ErrAdminNotLeader
}

err := s.retryS3Mutation(ctx, func() error {
readTS := s.readTS()
startTS := s.txnStartTS(readTS)
readPin := s.pinReadTS(readTS)
defer readPin.Release()

meta, exists, err := s.loadBucketMetaAt(ctx, name, readTS)
if err != nil {
return errors.WithStack(err)
}
if !exists || meta == nil {
return ErrAdminBucketNotFound
}
start := s3keys.ObjectManifestPrefixForBucket(name, meta.Generation)
kvs, err := s.store.ScanAt(ctx, start, prefixScanEnd(start), 1, readTS)
if err != nil {
return errors.WithStack(err)
}
if len(kvs) > 0 {
return ErrAdminBucketNotEmpty
}
_, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Del, Key: s3keys.BucketMetaKey(name)},
},
})
return errors.WithStack(err)
})
if err != nil {
return err //nolint:wrapcheck // sentinel errors propagate as-is.
}
return nil
}
Comment on lines +393 to +427
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how retryS3Mutation handles dispatch errors and whether it
# retries OCC conflicts on range reads.
ast-grep --pattern 'func ($_ *S3Server) retryS3Mutation($$$) $_ {
  $$$
}'

# Look for OCC / read-intent tracking inside kv coordinator.
rg -nP --type=go -C3 'readPin|ReadIntent|ReadSet|range.*read'

# See whether the SigV4 deleteBucket path uses the same pattern (so we
# can scope this concern as pre-existing or new).
rg -nP --type=go -C5 'deleteBucket\s*\(' -g '!**/*_test.go' -g '!**/s3_admin*.go'

Repository: bootjp/elastickv

Length of output: 33198


🏁 Script executed:

# Find isRetryableS3MutationErr implementation
rg -nP --type=go -A10 'func isRetryableS3MutationErr' -g '!**/*_test.go'

# Check what error types are retryable (look for OCC/write conflict handling)
rg -nP --type=go 'WriteConflict|OCC.*conflict|retryable' -g '!**/*_test.go' -C2

# Verify: does AdminDeleteBucket need to track ScanAt as ReadKeys?
rg -nP --type=go -B5 -A10 'func.*AdminDeleteBucket' adapter/s3_admin.go

Repository: bootjp/elastickv

Length of output: 42188


🏁 Script executed:

# Check kv.OperationGroup definition and ReadKeys field
rg -nP --type=go -A5 'type OperationGroup' kv/

# Search for tests of AdminDeleteBucket or deleteBucket with concurrent writes
rg -nP --type=go 'TestAdmin.*DeleteBucket|Test.*deleteBucket' -g '**/*_test.go'

# Check if any S3 path passes keys from a range scan as ReadKeys
rg -nP --type=go -B10 'ReadKeys.*Scan' adapter/s3.go

Repository: bootjp/elastickv

Length of output: 375


🏁 Script executed:

# Find the Dispatch signature to understand ReadKeys parameter
rg -nP --type=go 'func.*Dispatch.*ctx.*error' kv/ -A8

# Check if ReadKeys is in pb.Request
rg -nP --type=go 'ReadKeys' kv/ -B2 -A2 | head -100

Repository: bootjp/elastickv

Length of output: 8737


🏁 Script executed:

# Search for existing tests of AdminDeleteBucket or deleteBucket with concurrent mutations
rg -nP --type=go 'AdminDeleteBucket|deleteBucket' adapter/s3_admin_test.go adapter/s3_test.go 2>/dev/null | head -50

# Check if there are any tests at all for these functions
fd -e _test.go -type f | xargs rg -l 'AdminDeleteBucket|deleteBucket' 2>/dev/null

Repository: bootjp/elastickv

Length of output: 1193


🏁 Script executed:

# Read the test that checks for non-empty bucket
sed -n '287,320p' adapter/s3_admin_test.go

Repository: bootjp/elastickv

Length of output: 1474


🏁 Script executed:

# Verify there are no race-condition tests for AdminDeleteBucket/deleteBucket
rg -nP --type=go 'concurrent|race|TOCTOU' adapter/s3_admin_test.go adapter/s3_test.go

Repository: bootjp/elastickv

Length of output: 181


Verify the OCC read-set handling for the range scan in AdminDeleteBucket.

The concern is valid: AdminDeleteBucket reads the object-manifest prefix at readTS (limit 1 — a "is the bucket empty?" probe) and dispatches a delete on s3keys.BucketMetaKey(name) without passing the scanned manifest keys as ReadKeys to the OCC validator. The two operations touch disjoint key spaces:

  • read set: range over s3keys.ObjectManifestPrefixForBucket(name, meta.Generation)
  • write set: single point key s3keys.BucketMetaKey(name)

The OCC layer validates only explicit point keys in the ReadKeys field (see store.ApplyMutations — it iterates readKeys slice). A concurrent PutObject that inserts a new manifest key between readTS and the delete's commit will not conflict; the delete commits anyway, leaving orphaned objects under a now-deleted bucket meta. The retryS3Mutation retry loop only activates if a conflict is detected, which will not occur.

This race exists in both AdminDeleteBucket and the SigV4 deleteBucket path (same pattern at adapter/s3.go:638), making it a pre-existing concern. To close the gap, either populate ReadKeys with the scanned manifest keys, or explicitly document that the design accepts this orphan race.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/s3_admin.go` around lines 369 - 403, AdminDeleteBucket reads a
one-entry probe of s3keys.ObjectManifestPrefixForBucket(...) via
s.store.ScanAt(...) then issues a delete via s.coordinator.Dispatch(...) but
does not supply the scanned manifest keys as ReadKeys, so OCC won't detect
concurrent PutObject races; fix by extracting the keys from kvs returned by
ScanAt and populate the OperationGroup's ReadKeys slice before calling
coordinator.Dispatch in the retryS3Mutation closure (reference functions/values:
AdminDeleteBucket, retryS3Mutation, loadBucketMetaAt, s.store.ScanAt,
s.coordinator.Dispatch, s3keys.ObjectManifestPrefixForBucket,
s3keys.BucketMetaKey) so the OCC validator will consider those manifest keys and
trigger a retry on conflict.


// adminCanonicalACL normalises an empty input to the canned
// "private" default. The SigV4 createBucket / putBucketAcl paths
// apply the same default after trimming the x-amz-acl header.
// Pulled out so the admin write methods do not silently accept a
// blank string and create / mutate with whatever validateS3CannedAcl
// happens to allow on its empty branch.
func adminCanonicalACL(acl string) string {
trimmed := strings.TrimSpace(acl)
if trimmed == "" {
return s3AclPrivate
}
return trimmed
}
Loading
Loading