Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions internal/admin/forward_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ type LeaderForwarder interface {
ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error)
// ForwardDeleteTable is the delete-side counterpart.
ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
// ForwardCreateBucket issues a forwarded POST /s3/buckets on
// the leader's behalf. The leader echoes back the same JSON
// envelope a leader-direct call would produce.
ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error)
// ForwardDeleteBucket is the delete-side counterpart.
ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
// ForwardPutBucketAcl issues a forwarded PUT
// /s3/buckets/{name}/acl. Both the bucket name and the
// new ACL travel inside the proto payload — see the leader-side
// handlePutBucketAcl for the JSON shape.
ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error)
}

// ForwardResult is the leader's response replayed for the SPA. The
Expand Down Expand Up @@ -142,6 +153,43 @@ func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal Au
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload)
}

// ForwardCreateBucket serialises `in` as JSON and dispatches the
// CreateBucket operation. Same contract as ForwardCreateTable —
// gRPC errors are wrapped, ErrLeaderUnavailable on no-leader.
func (c *gRPCForwardClient) ForwardCreateBucket(ctx context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) {
payload, err := json.Marshal(in)
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: marshal create-bucket request")
}
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, principal, payload)
}

// ForwardDeleteBucket serialises the bucket name as `{"name":"..."}`
// to match the leader-side handleDeleteBucket contract.
func (c *gRPCForwardClient) ForwardDeleteBucket(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
payload, err := json.Marshal(struct {
Name string `json:"name"`
}{Name: name})
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-bucket request")
}
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, principal, payload)
}

// ForwardPutBucketAcl carries both the bucket name (from the URL
// path) and the new ACL (from the request body) in the proto
// payload — see handlePutBucketAcl for the leader-side decode.
func (c *gRPCForwardClient) ForwardPutBucketAcl(ctx context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) {
payload, err := json.Marshal(struct {
Name string `json:"name"`
ACL string `json:"acl"`
}{Name: name, ACL: acl})
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: marshal put-bucket-acl request")
}
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, principal, payload)
}

func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) {
addr := c.resolver()
if addr == "" {
Expand Down
69 changes: 69 additions & 0 deletions internal/admin/forward_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,72 @@ func TestGRPCForwardClient_FillsMissingContentType(t *testing.T) {
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
require.Equal(t, "application/json; charset=utf-8", res.ContentType)
}

func TestGRPCForwardClient_ForwardCreateBucket_HappyPath(t *testing.T) {
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
StatusCode: http.StatusCreated,
Payload: []byte(`{"bucket_name":"public-assets","acl":"public-read"}`),
ContentType: "application/json; charset=utf-8",
}}
dial := &stubConnFactory{conn: conn}
fwd, err := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-2")
require.NoError(t, err)

in := CreateBucketRequest{BucketName: "public-assets", ACL: "public-read"}
res, err := fwd.ForwardCreateBucket(context.Background(),
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, in)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, res.StatusCode)
require.Equal(t, pb.AdminOperation_ADMIN_OP_CREATE_BUCKET, conn.lastReq.GetOperation())
require.Equal(t, "follower-2", conn.lastReq.GetForwardedFrom())
var roundtripped CreateBucketRequest
require.NoError(t, json.Unmarshal(conn.lastReq.GetPayload(), &roundtripped))
require.Equal(t, in, roundtripped)
}

func TestGRPCForwardClient_ForwardDeleteBucket_HappyPath(t *testing.T) {
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
StatusCode: http.StatusNoContent,
ContentType: "application/json; charset=utf-8",
}}
dial := &stubConnFactory{conn: conn}
fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-3")

res, err := fwd.ForwardDeleteBucket(context.Background(),
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders")
require.NoError(t, err)
require.Equal(t, http.StatusNoContent, res.StatusCode)
require.Equal(t, pb.AdminOperation_ADMIN_OP_DELETE_BUCKET, conn.lastReq.GetOperation())
require.JSONEq(t, `{"name":"orders"}`, string(conn.lastReq.GetPayload()))
}

func TestGRPCForwardClient_ForwardPutBucketAcl_HappyPath(t *testing.T) {
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
StatusCode: http.StatusNoContent,
ContentType: "application/json; charset=utf-8",
}}
dial := &stubConnFactory{conn: conn}
fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-4")

res, err := fwd.ForwardPutBucketAcl(context.Background(),
AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "orders", "public-read")
require.NoError(t, err)
require.Equal(t, http.StatusNoContent, res.StatusCode)
require.Equal(t, pb.AdminOperation_ADMIN_OP_PUT_BUCKET_ACL, conn.lastReq.GetOperation())
// Both name and acl travel in the payload — see handlePutBucketAcl
// for the leader-side decode contract.
require.JSONEq(t, `{"name":"orders","acl":"public-read"}`, string(conn.lastReq.GetPayload()))
}

func TestGRPCForwardClient_BucketOps_NoLeaderReturnsErrLeaderUnavailable(t *testing.T) {
dial := &stubConnFactory{conn: &stubForwardConn{}}
fwd, _ := NewGRPCForwardClient(fixedResolver(""), dial, "f")

_, err := fwd.ForwardCreateBucket(context.Background(), AuthPrincipal{Role: RoleFull},
CreateBucketRequest{BucketName: "x"})
require.ErrorIs(t, err, ErrLeaderUnavailable)
_, err = fwd.ForwardDeleteBucket(context.Background(), AuthPrincipal{Role: RoleFull}, "x")
require.ErrorIs(t, err, ErrLeaderUnavailable)
_, err = fwd.ForwardPutBucketAcl(context.Background(), AuthPrincipal{Role: RoleFull}, "x", "private")
require.ErrorIs(t, err, ErrLeaderUnavailable)
}
55 changes: 50 additions & 5 deletions internal/admin/forward_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,28 @@ type stubLeaderForwarder struct {
createErr error
deleteRes *ForwardResult
deleteErr error

lastCreatePrincipal AuthPrincipal
lastCreateInput CreateTableRequest
lastDeletePrincipal AuthPrincipal
lastDeleteName string
// Bucket-side counterparts ship with P2 slice 2b. Tests that
// only exercise Dynamo behaviour leave these zero values; the
// stub honours the same nil-result-and-no-error contract the
// table side uses on a "did not call us" path.
createBucketRes *ForwardResult
createBucketErr error
deleteBucketRes *ForwardResult
deleteBucketErr error
putACLRes *ForwardResult
putACLErr error

lastCreatePrincipal AuthPrincipal
lastCreateInput CreateTableRequest
lastDeletePrincipal AuthPrincipal
lastDeleteName string
lastCreateBucketPrincipal AuthPrincipal
lastCreateBucketInput CreateBucketRequest
lastDeleteBucketPrincipal AuthPrincipal
lastDeleteBucketName string
lastPutACLPrincipal AuthPrincipal
lastPutACLBucket string
lastPutACLValue string
}

func (s *stubLeaderForwarder) ForwardCreateTable(_ context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) {
Expand All @@ -45,6 +62,34 @@ func (s *stubLeaderForwarder) ForwardDeleteTable(_ context.Context, principal Au
return s.deleteRes, nil
}

func (s *stubLeaderForwarder) ForwardCreateBucket(_ context.Context, principal AuthPrincipal, in CreateBucketRequest) (*ForwardResult, error) {
s.lastCreateBucketPrincipal = principal
s.lastCreateBucketInput = in
if s.createBucketErr != nil {
return nil, s.createBucketErr
}
return s.createBucketRes, nil
}

func (s *stubLeaderForwarder) ForwardDeleteBucket(_ context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
s.lastDeleteBucketPrincipal = principal
s.lastDeleteBucketName = name
if s.deleteBucketErr != nil {
return nil, s.deleteBucketErr
}
return s.deleteBucketRes, nil
}

func (s *stubLeaderForwarder) ForwardPutBucketAcl(_ context.Context, principal AuthPrincipal, name, acl string) (*ForwardResult, error) {
s.lastPutACLPrincipal = principal
s.lastPutACLBucket = name
s.lastPutACLValue = acl
if s.putACLErr != nil {
return nil, s.putACLErr
}
return s.putACLRes, nil
}

// notLeaderSource is a TablesSource that always returns
// ErrTablesNotLeader on writes — i.e., it simulates the local
// node being a follower. Used to exercise the forwarder path
Expand Down
Loading
Loading