Skip to content

Commit 1b9c692

Browse files
committed
admin: scaffold AdminForward follower-side client (Task #26 phase 1)
Adds the LeaderForwarder interface and a gRPC-backed implementation that the dynamo HTTP handler will invoke when the local node is a follower (the source returned ErrTablesNotLeader). This commit only introduces the client + its tests; wiring it into the handler and the main_admin.go bridge is the next phase. Pieces: - LeaderForwarder interface with ForwardCreateTable / ForwardDeleteTable, decoupled from proto so the handler stays proto-free. - ForwardResult is the transport-neutral envelope the handler re-emits verbatim, so a forwarded request looks identical to a leader-direct call from the SPA's perspective. - ErrLeaderUnavailable for the "no leader known" path (acceptance criterion 3 — the handler will turn this into 503 + Retry-After: 1 once wired up). - gRPCForwardClient builds the AdminForwardRequest from the admin CreateTableRequest / table name + AuthPrincipal, dials via a GRPCConnFactory abstraction, and re-shapes the response. - forwarded_from is populated with the local node id so the leader's audit log line carries the trace (criterion 6, leader side already in #635). - Defensive: status_code == 0 (proto zero value) is upgraded to 502 Bad Gateway; missing ContentType is filled with the JSON default. Both surface transport bugs rather than producing silently-malformed SPA responses. Tests cover: - constructor input validation (3 cases) - happy paths for both Forward operations including principal, payload, op enum, and forwarded_from round-trip - ErrLeaderUnavailable when the resolver returns "" - dial / rpc errors propagated with cockroachdb/errors wrapping - zero status code upgrade - missing content type fallback Not in this commit: handler integration, gRPC server registration in main.go, election-period retry. Those land in follow-ups before the PR opens.
1 parent 1e7e9bc commit 1b9c692

2 files changed

Lines changed: 364 additions & 0 deletions

File tree

internal/admin/forward_client.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
8+
pb "github.com/bootjp/elastickv/proto"
9+
pkgerrors "github.com/cockroachdb/errors"
10+
"github.com/goccy/go-json"
11+
)
12+
13+
// LeaderForwarder is the contract the admin HTTP handler invokes
14+
// when the local node is a follower (the source returned
15+
// ErrTablesNotLeader). Implementations dial the current leader
16+
// over the AdminForward gRPC service and return the leader's
17+
// response in a transport-neutral shape so the handler can re-emit
18+
// it verbatim.
19+
//
20+
// Defining this interface in the admin package — rather than wiring
21+
// pb.AdminForwardClient directly into the handler — keeps the
22+
// admin HTTP layer free of any proto-level coupling and lets tests
23+
// substitute a deterministic stub. The bridge in main_admin.go
24+
// provides the production implementation that uses
25+
// kv.GRPCConnCache + the raft engine's leader address.
26+
type LeaderForwarder interface {
27+
// ForwardCreateTable issues a forwarded CreateTable on the
28+
// leader's behalf. The response is the leader's structured
29+
// AdminForwardResponse re-shaped into ForwardResult so the
30+
// handler does not need to import proto.
31+
ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error)
32+
// ForwardDeleteTable is the delete-side counterpart.
33+
ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
34+
}
35+
36+
// ForwardResult is the leader's response replayed for the SPA. The
37+
// handler writes Payload verbatim with the given status code and
38+
// content type, so a forwarded request is indistinguishable from a
39+
// leader-direct call.
40+
type ForwardResult struct {
41+
StatusCode int
42+
Payload []byte
43+
ContentType string
44+
}
45+
46+
// ErrLeaderUnavailable is returned when the forwarder cannot reach
47+
// any leader — typically during a Raft election or a cluster split.
48+
// The handler maps it to 503 + Retry-After: 1 so the SPA / client
49+
// re-issues the request after a short delay (acceptance criterion 3).
50+
var ErrLeaderUnavailable = errors.New("admin: raft leader currently unavailable")
51+
52+
// LeaderAddressResolver returns the current Raft leader's address
53+
// for the local node's group, or "" if no leader is known. The
54+
// production wiring uses raftengine.Engine.LeaderAddr / the
55+
// cluster's address map; tests inject a fixed string.
56+
type LeaderAddressResolver func() string
57+
58+
// GRPCConnFactory is the small surface AdminForwardClient needs
59+
// from kv.GRPCConnCache. Pulling out an interface lets tests
60+
// substitute an in-memory dialer without spinning up a TCP
61+
// listener and lets the bridge use the existing connection cache
62+
// without copy-paste.
63+
type GRPCConnFactory interface {
64+
// ConnFor returns a gRPC client connection to addr, reusing
65+
// the cached entry if one exists. addr "" is a programming
66+
// error and may panic; callers must check leader-empty before
67+
// dialling.
68+
ConnFor(addr string) (PBAdminForwardClient, error)
69+
}
70+
71+
// PBAdminForwardClient narrows pb.AdminForwardClient to just the
72+
// methods this package uses. The narrower interface keeps the test
73+
// stub implementation small.
74+
type PBAdminForwardClient interface {
75+
Forward(ctx context.Context, in *pb.AdminForwardRequest, opts ...grpcCallOption) (*pb.AdminForwardResponse, error)
76+
}
77+
78+
// grpcCallOption is a re-export of google.golang.org/grpc.CallOption
79+
// kept private so callers do not import proto's grpc dep directly.
80+
type grpcCallOption = interface{}
81+
82+
// gRPCForwardClient is the production LeaderForwarder. Construct
83+
// one with NewGRPCForwardClient. Two collaborators are required:
84+
// - resolver: returns the current leader address, or "" if absent
85+
// - dial: turns an address into a PBAdminForwardClient (the
86+
// bridge wraps kv.GRPCConnCache to satisfy this)
87+
//
88+
// nodeID is echoed into the leader's audit log via
89+
// AdminForwardRequest.forwarded_from (acceptance criterion 6).
90+
type gRPCForwardClient struct {
91+
resolver LeaderAddressResolver
92+
dial GRPCConnFactory
93+
nodeID string
94+
}
95+
96+
// NewGRPCForwardClient constructs the production LeaderForwarder.
97+
// All three parameters must be non-nil / non-empty; otherwise the
98+
// constructor returns nil and a wiring-error so a misconfigured
99+
// build refuses to start rather than producing 500s at runtime.
100+
func NewGRPCForwardClient(resolver LeaderAddressResolver, dial GRPCConnFactory, nodeID string) (LeaderForwarder, error) {
101+
if resolver == nil {
102+
return nil, errors.New("admin forwarder: leader address resolver is required")
103+
}
104+
if dial == nil {
105+
return nil, errors.New("admin forwarder: gRPC connection factory is required")
106+
}
107+
if nodeID == "" {
108+
return nil, errors.New("admin forwarder: node id is required for audit log enrichment")
109+
}
110+
return &gRPCForwardClient{resolver: resolver, dial: dial, nodeID: nodeID}, nil
111+
}
112+
113+
// ForwardCreateTable serialises `in` as JSON and dispatches the
114+
// CreateTable operation to the leader. Returns ErrLeaderUnavailable
115+
// when no leader address is known; gRPC-level errors are wrapped
116+
// and surfaced unchanged so the handler can decide whether to
117+
// retry, log, or 500.
118+
func (c *gRPCForwardClient) ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) {
119+
payload, err := json.Marshal(in)
120+
if err != nil {
121+
// CreateTableRequest is plain string fields; Marshal
122+
// cannot fail in practice. Surface the error rather than
123+
// silently dropping the request.
124+
return nil, pkgerrors.Wrap(err, "admin forward: marshal create-table request")
125+
}
126+
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_TABLE, principal, payload)
127+
}
128+
129+
// ForwardDeleteTable serialises the table name as `{"name":"..."}`
130+
// to match the leader-side handleDelete contract.
131+
func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
132+
payload, err := json.Marshal(struct {
133+
Name string `json:"name"`
134+
}{Name: name})
135+
if err != nil {
136+
return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-table request")
137+
}
138+
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload)
139+
}
140+
141+
func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) {
142+
addr := c.resolver()
143+
if addr == "" {
144+
return nil, ErrLeaderUnavailable
145+
}
146+
cli, err := c.dial.ConnFor(addr)
147+
if err != nil {
148+
return nil, pkgerrors.Wrap(err, "admin forward: dial leader")
149+
}
150+
resp, err := cli.Forward(ctx, &pb.AdminForwardRequest{
151+
Principal: &pb.AdminPrincipal{
152+
AccessKey: principal.AccessKey,
153+
Role: string(principal.Role),
154+
},
155+
Operation: op,
156+
Payload: payload,
157+
ForwardedFrom: c.nodeID,
158+
})
159+
if err != nil {
160+
return nil, pkgerrors.Wrap(err, "admin forward: rpc")
161+
}
162+
out := &ForwardResult{
163+
StatusCode: int(resp.GetStatusCode()),
164+
Payload: resp.GetPayload(),
165+
ContentType: resp.GetContentType(),
166+
}
167+
if out.ContentType == "" {
168+
// The leader server always sets a content type, but be
169+
// defensive: a future change that drops it must not
170+
// produce a SPA response with no Content-Type header.
171+
out.ContentType = "application/json; charset=utf-8"
172+
}
173+
if out.StatusCode == 0 {
174+
// status_code 0 is the proto's zero value, which the
175+
// leader server never emits intentionally. Treat it as a
176+
// transport bug rather than a 200, since 0 is not a valid
177+
// HTTP status.
178+
out.StatusCode = http.StatusBadGateway
179+
}
180+
return out, nil
181+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"testing"
8+
9+
pb "github.com/bootjp/elastickv/proto"
10+
"github.com/goccy/go-json"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// stubForwardConn is the in-memory PBAdminForwardClient the
15+
// follower-side tests use. It captures the last request and
16+
// returns whatever response/error the test prepared.
17+
type stubForwardConn struct {
18+
resp *pb.AdminForwardResponse
19+
err error
20+
lastReq *pb.AdminForwardRequest
21+
}
22+
23+
func (s *stubForwardConn) Forward(_ context.Context, in *pb.AdminForwardRequest, _ ...grpcCallOption) (*pb.AdminForwardResponse, error) {
24+
s.lastReq = in
25+
if s.err != nil {
26+
return nil, s.err
27+
}
28+
return s.resp, nil
29+
}
30+
31+
// stubConnFactory wraps stubForwardConn so the gRPCForwardClient
32+
// can resolve an address to a client. The captured `addr` lets
33+
// tests prove the resolver round-tripped correctly.
34+
type stubConnFactory struct {
35+
conn *stubForwardConn
36+
dialErr error
37+
lastAddr string
38+
}
39+
40+
func (s *stubConnFactory) ConnFor(addr string) (PBAdminForwardClient, error) {
41+
s.lastAddr = addr
42+
if s.dialErr != nil {
43+
return nil, s.dialErr
44+
}
45+
return s.conn, nil
46+
}
47+
48+
// fixedResolver returns a closure that always resolves to addr.
49+
// Pulling this into a helper keeps the per-test setup compact.
50+
func fixedResolver(addr string) LeaderAddressResolver {
51+
return func() string { return addr }
52+
}
53+
54+
func TestNewGRPCForwardClient_RejectsMissingDeps(t *testing.T) {
55+
cases := []struct {
56+
name string
57+
resolver LeaderAddressResolver
58+
dial GRPCConnFactory
59+
nodeID string
60+
expect string
61+
}{
62+
{"nil resolver", nil, &stubConnFactory{}, "n1", "leader address resolver"},
63+
{"nil dial", fixedResolver(""), nil, "n1", "gRPC connection factory"},
64+
{"empty node id", fixedResolver(""), &stubConnFactory{}, "", "node id is required"},
65+
}
66+
for _, tc := range cases {
67+
t.Run(tc.name, func(t *testing.T) {
68+
fwd, err := NewGRPCForwardClient(tc.resolver, tc.dial, tc.nodeID)
69+
require.Error(t, err)
70+
require.Nil(t, fwd)
71+
require.Contains(t, err.Error(), tc.expect)
72+
})
73+
}
74+
}
75+
76+
func TestGRPCForwardClient_ForwardCreateTable_HappyPath(t *testing.T) {
77+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
78+
StatusCode: http.StatusCreated,
79+
Payload: []byte(`{"name":"users"}`),
80+
ContentType: "application/json; charset=utf-8",
81+
}}
82+
dial := &stubConnFactory{conn: conn}
83+
fwd, err := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-2")
84+
require.NoError(t, err)
85+
86+
in := CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}
87+
res, err := fwd.ForwardCreateTable(context.Background(), AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, in)
88+
require.NoError(t, err)
89+
require.NotNil(t, res)
90+
require.Equal(t, http.StatusCreated, res.StatusCode)
91+
require.JSONEq(t, `{"name":"users"}`, string(res.Payload))
92+
require.Equal(t, "application/json; charset=utf-8", res.ContentType)
93+
94+
// The captured request must carry the principal, the canonical
95+
// op enum, the JSON-encoded body, and the follower node id.
96+
require.Equal(t, "leader.local:7000", dial.lastAddr)
97+
require.NotNil(t, conn.lastReq)
98+
require.Equal(t, pb.AdminOperation_ADMIN_OP_CREATE_TABLE, conn.lastReq.GetOperation())
99+
require.Equal(t, "AKIA_F", conn.lastReq.GetPrincipal().GetAccessKey())
100+
require.Equal(t, "full", conn.lastReq.GetPrincipal().GetRole())
101+
require.Equal(t, "follower-2", conn.lastReq.GetForwardedFrom())
102+
var roundtripped CreateTableRequest
103+
require.NoError(t, json.Unmarshal(conn.lastReq.GetPayload(), &roundtripped))
104+
require.Equal(t, in, roundtripped)
105+
}
106+
107+
func TestGRPCForwardClient_ForwardDeleteTable_HappyPath(t *testing.T) {
108+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
109+
StatusCode: http.StatusNoContent,
110+
ContentType: "application/json; charset=utf-8",
111+
}}
112+
dial := &stubConnFactory{conn: conn}
113+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader.local:7000"), dial, "follower-3")
114+
115+
res, err := fwd.ForwardDeleteTable(context.Background(), AuthPrincipal{AccessKey: "AKIA_F", Role: RoleFull}, "users")
116+
require.NoError(t, err)
117+
require.Equal(t, http.StatusNoContent, res.StatusCode)
118+
require.Empty(t, res.Payload)
119+
require.Equal(t, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, conn.lastReq.GetOperation())
120+
require.JSONEq(t, `{"name":"users"}`, string(conn.lastReq.GetPayload()))
121+
require.Equal(t, "follower-3", conn.lastReq.GetForwardedFrom())
122+
}
123+
124+
func TestGRPCForwardClient_NoLeaderReturnsErrLeaderUnavailable(t *testing.T) {
125+
dial := &stubConnFactory{conn: &stubForwardConn{}}
126+
fwd, _ := NewGRPCForwardClient(fixedResolver(""), dial, "f")
127+
128+
_, err := fwd.ForwardCreateTable(context.Background(), AuthPrincipal{Role: RoleFull},
129+
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
130+
require.ErrorIs(t, err, ErrLeaderUnavailable)
131+
// Connection must not have been dialled when no leader is known.
132+
require.Empty(t, dial.lastAddr)
133+
}
134+
135+
func TestGRPCForwardClient_DialErrorPropagated(t *testing.T) {
136+
dial := &stubConnFactory{dialErr: errors.New("network unreachable")}
137+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader:1"), dial, "f")
138+
139+
_, err := fwd.ForwardCreateTable(context.Background(), AuthPrincipal{Role: RoleFull},
140+
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
141+
require.Error(t, err)
142+
require.Contains(t, err.Error(), "network unreachable")
143+
}
144+
145+
func TestGRPCForwardClient_GRPCErrorPropagated(t *testing.T) {
146+
conn := &stubForwardConn{err: errors.New("rpc deadline exceeded")}
147+
dial := &stubConnFactory{conn: conn}
148+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader:1"), dial, "f")
149+
150+
_, err := fwd.ForwardDeleteTable(context.Background(), AuthPrincipal{Role: RoleFull}, "x")
151+
require.Error(t, err)
152+
require.Contains(t, err.Error(), "rpc deadline exceeded")
153+
}
154+
155+
func TestGRPCForwardClient_ZeroStatusUpgradesTo502(t *testing.T) {
156+
// status_code 0 is the proto zero value — never emitted
157+
// intentionally by the leader. The forwarder must not treat
158+
// that as 200; mapping it to 502 surfaces the transport bug.
159+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{StatusCode: 0}}
160+
dial := &stubConnFactory{conn: conn}
161+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader:1"), dial, "f")
162+
163+
res, err := fwd.ForwardCreateTable(context.Background(), AuthPrincipal{Role: RoleFull},
164+
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
165+
require.NoError(t, err)
166+
require.Equal(t, http.StatusBadGateway, res.StatusCode)
167+
}
168+
169+
func TestGRPCForwardClient_FillsMissingContentType(t *testing.T) {
170+
// A future leader-server change that omits ContentType must
171+
// still produce a SPA-readable response. The forwarder fills
172+
// in the default JSON content type defensively.
173+
conn := &stubForwardConn{resp: &pb.AdminForwardResponse{
174+
StatusCode: http.StatusCreated,
175+
Payload: []byte(`{"name":"u"}`),
176+
}}
177+
dial := &stubConnFactory{conn: conn}
178+
fwd, _ := NewGRPCForwardClient(fixedResolver("leader:1"), dial, "f")
179+
180+
res, _ := fwd.ForwardCreateTable(context.Background(), AuthPrincipal{Role: RoleFull},
181+
CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}})
182+
require.Equal(t, "application/json; charset=utf-8", res.ContentType)
183+
}

0 commit comments

Comments
 (0)