Commit c2512af

admin: AdminForward follower-side client + handler integration (P1, partial) (#644)
Phase 1 + 2 of Task #26: the follower-side `LeaderForwarder` client and its integration into the dynamo HTTP handler. Builds on the AdminForward leader-side dispatcher landed via #635.

## Summary

- New `LeaderForwarder` interface (`internal/admin/forward_client.go`) decouples the dynamo HTTP handler from `pb.AdminForwardClient`. The handler stays proto-free; the bridge in `main_admin.go` (next phase) plugs in the gRPC-backed implementation.
- `gRPCForwardClient` translates a `CreateTableRequest` / table-name into an `AdminForwardRequest`, dials via a `GRPCConnFactory` (production wraps `kv.GRPCConnCache`), and re-shapes the response into `ForwardResult` (status, payload, content-type).
- `forwarded_from = nodeID` is populated so the leader's audit log carries the trace (criterion 6, leader-side already shipped in #635).
- Defensive: `status_code == 0` upgrades to `502 Bad Gateway`; missing `ContentType` fills the JSON default. Both surface transport bugs rather than producing silently-malformed SPA responses.
- `ErrLeaderUnavailable` sentinel signals the "no leader known" case so the handler can map to 503 + `Retry-After: 1` (criterion 3).
- `DynamoHandler` gains a `forwarder` field and `WithLeaderForwarder` option. When set, `handleCreate` / `handleDelete` catch `ErrTablesNotLeader` from the source and forward to the leader transparently; the SPA cannot tell forwarded from leader-direct.
- `writeForwardResult` re-emits the leader's structured response verbatim (status + payload + content-type), so a forwarded `409 Conflict` from the leader stays `409` on the wire, with no re-classification.
- `writeForwardFailure` maps `ErrLeaderUnavailable` (election in flight) and gRPC transport errors to 503 + `Retry-After: 1`. `ErrLeaderUnavailable` is intentionally NOT logged at error level (elections are routine); transport errors are logged at LevelError so operators can investigate.

## What is NOT in this PR

- gRPC server registration in `main.go` (production wiring of the `ForwardServer` from #635): comes in the next phase.
- The bridge that wraps `kv.GRPCConnCache` and supplies `LeaderAddressResolver`: same phase.
- Election-period retry-loop on the client side (criterion 3 partial: this PR returns 503 + Retry-After; the SPA / client retries the request; criterion 3 fully needs the production bridge to dial actual leader-discovery).

## Test plan

- [x] `go build ./...`
- [x] `go vet ./...`
- [x] `golangci-lint run` (admin package: 0 issues)
- [x] `go test ./internal/admin/ -count=1 -race`
  - 8 forward-client unit tests: constructor input validation, both Forward operations including principal/payload/op-enum/`forwarded_from` round-trip, `ErrLeaderUnavailable`, dial/RPC errors propagated with `cockroachdb/errors` wrapping, zero status code upgrade, missing content type fallback
  - 9 handler integration tests: transparent forward for create + delete, no-forwarder fallback to 503, `ErrLeaderUnavailable` → 503 + Retry-After, transport error → 503 + log, leader 409 pass-through, leader 503 + Retry-After preserved, role check short-circuits before forward, body validation short-circuits before forward
- [ ] Wire production bridge + register `pb.RegisterAdminForwardServer` in `main.go` and exercise an end-to-end follower → leader call against a real cluster (next PR).

## Acceptance criteria coverage

| # | Criterion | This PR |
|---|---|---|
| 1 | Leader direct write | ✓ (in main since #634) |
| 2 | Follower forwards transparently | ✓ wiring done; needs main.go gRPC registration to take effect |
| 3 | Election-period 503 + retry | ✓ partial: handler returns 503 + Retry-After; full transparency needs the bridge |
| 4 | Leader demotes stale full role | ✓ (in main since #635) |
| 5 | Rolling-upgrade compat flag | ⏳ deferred (cluster-version bump) |
| 6 | `forwarded_from` in audit log | ✓ (in main since #635) |
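
The criterion 3 contract leaves the retry to the caller: a follower answers 503 with `Retry-After: 1`, and the SPA or client re-issues the request. As a rough illustration of the client side, here is a minimal Go sketch. `postWithRetry`, the `roundTripFunc` stub, and the `http://follower.invalid/tables` URL are all hypothetical and not part of this commit; the stub transport simulates two 503 responses followed by a 201 so no network is needed.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"
)

// roundTripFunc lets a plain function act as an http.RoundTripper, so the
// demo needs no network listener. Hypothetical helper for this sketch.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// postWithRetry is a hypothetical client helper (not part of the commit).
// It re-issues a POST while the server answers 503, waiting for the number
// of seconds given in Retry-After between attempts. It returns the final
// status code and the number of attempts made.
func postWithRetry(c *http.Client, url, body string, maxAttempts int, sleep func(time.Duration)) (int, int, error) {
	status := 0
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		resp, err := c.Post(url, "application/json", strings.NewReader(body))
		if err != nil {
			return 0, attempt, err
		}
		status = resp.StatusCode
		retryAfter := resp.Header.Get("Retry-After")
		resp.Body.Close()
		if status != http.StatusServiceUnavailable || attempt == maxAttempts {
			return status, attempt, nil
		}
		// Retry-After may also be an HTTP date; this sketch only parses
		// the delay-seconds form and falls back to one second.
		delay := time.Second
		if secs, convErr := strconv.Atoi(retryAfter); convErr == nil && secs >= 0 {
			delay = time.Duration(secs) * time.Second
		}
		sleep(delay)
	}
	return status, maxAttempts, nil
}

// demo simulates a follower that answers 503 twice (election in flight)
// and then 201 once a leader is reachable.
func demo() (int, int, time.Duration) {
	var waited time.Duration
	calls := 0
	transport := roundTripFunc(func(r *http.Request) (*http.Response, error) {
		calls++
		resp := &http.Response{
			StatusCode: http.StatusCreated,
			Header:     http.Header{},
			Body:       io.NopCloser(strings.NewReader("")),
			Request:    r,
		}
		if calls < 3 {
			resp.StatusCode = http.StatusServiceUnavailable
			resp.Header.Set("Retry-After", "1")
		}
		return resp, nil
	})
	client := &http.Client{Transport: transport}
	// Record the requested back-off instead of actually sleeping.
	sleep := func(d time.Duration) { waited += d }
	status, attempts, err := postWithRetry(client, "http://follower.invalid/tables", `{}`, 5, sleep)
	if err != nil {
		panic(err)
	}
	return status, attempts, waited
}

func main() {
	status, attempts, waited := demo()
	fmt.Println(status, attempts, waited)
}
```

Passing `sleep` as a parameter keeps the helper testable; a real caller would presumably pass `time.Sleep` and cap the total wait.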
2 parents 1e7e9bc + e7f017e commit c2512af

4 files changed

Lines changed: 885 additions & 10 deletions

internal/admin/dynamo_handler.go

Lines changed: 141 additions & 10 deletions
@@ -119,8 +119,10 @@ var (
 	// role required for the operation. Maps to 403.
 	ErrTablesForbidden = errors.New("admin tables: principal lacks required role")
 	// ErrTablesNotLeader is returned when the local node is not the
-	// Raft leader. Maps to 503 + Retry-After: 1 today; the future
-	// AdminForward RPC catches this as the trigger to forward.
+	// Raft leader. When a LeaderForwarder is configured,
+	// tryForwardCreate / tryForwardDelete catch this before it reaches
+	// writeTablesError and forward the request to the leader
+	// transparently. Without a forwarder, maps to 503 + Retry-After: 1.
 	ErrTablesNotLeader = errors.New("admin tables: local node is not the raft leader")
 	// ErrTablesNotFound is returned when DELETE / DESCRIBE / a
 	// follow-up read targets a table that does not exist. Maps to
@@ -168,15 +170,22 @@ func (e *ValidationError) Error() string {
 // — the JWT freezes the role at login time, and tokens last one
 // hour. Codex P1 on PR #635 flagged the gap on the HTTP path;
 // the forward server already does this re-evaluation on its side.
+//
+// When the source returns ErrTablesNotLeader and a LeaderForwarder
+// is configured, write requests are forwarded to the leader
+// transparently — the SPA sees a leader-direct response shape
+// regardless of which node it hit (design Section 3.3 criterion 2).
 type DynamoHandler struct {
-	source TablesSource
-	roles  RoleStore
-	logger *slog.Logger
+	source    TablesSource
+	roles     RoleStore
+	forwarder LeaderForwarder
+	logger    *slog.Logger
 }
 
 // NewDynamoHandler binds the source and seeds logging with
-// slog.Default(). Use WithLogger to attach a tagged logger and
-// WithRoleStore to plug in the live access-key role lookup.
+// slog.Default(). Use WithLogger to attach a tagged logger,
+// WithRoleStore to plug in the live access-key role lookup, and
+// WithLeaderForwarder to plug in the follower→leader forwarder.
 func NewDynamoHandler(source TablesSource) *DynamoHandler {
 	return &DynamoHandler{source: source, logger: slog.Default()}
 }
@@ -201,6 +210,22 @@ func (h *DynamoHandler) WithRoleStore(r RoleStore) *DynamoHandler {
 	return h
 }
 
+// WithLeaderForwarder enables transparent follower→leader
+// forwarding. Without it, write requests on a follower fall back
+// to the standard 503 leader_unavailable response. Production
+// wires this to the gRPCForwardClient in main_admin.go; tests
+// inject a stub.
+//
+// Asymmetric vs WithLogger by design: WithLogger no-ops on nil to
+// preserve the slog.Default() seeded by NewDynamoHandler, but a
+// nil forwarder here is a meaningful "disable forwarding" state
+// (the gate in tryForwardCreate / tryForwardDelete checks for
+// nil and falls back to the leader-only 503 path).
+func (h *DynamoHandler) WithLeaderForwarder(f LeaderForwarder) *DynamoHandler {
+	h.forwarder = f
+	return h
+}
+
 // ServeHTTP routes /tables and /tables/{name}. We do not use
 // http.ServeMux because the admin router already guards the
 // /admin/api/v1/* prefix — adding another mux here would just
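
The nil asymmetry between `WithLogger` and `WithLeaderForwarder` described in the hunk above can be illustrated with a small standalone sketch. The `handler`, `leaderForwarder`, and `grpcStub` types below are hypothetical stand-ins, not the types from this commit. The last case shows a general Go caveat that may matter to whoever wires the production bridge: a nil pointer of a concrete type stored in an interface field does not compare equal to nil, so a gate of the form `h.forwarder == nil` would not treat it as "forwarding disabled".

```go
package main

import (
	"fmt"
	"log/slog"
)

// leaderForwarder is a one-method stand-in for the LeaderForwarder
// interface; the real interface has two Forward* methods.
type leaderForwarder interface {
	Forward() string
}

// grpcStub is a hypothetical implementation with a pointer receiver.
type grpcStub struct{ addr string }

func (g *grpcStub) Forward() string { return "forwarded to " + g.addr }

// handler is a hypothetical miniature of DynamoHandler.
type handler struct {
	logger    *slog.Logger
	forwarder leaderForwarder
}

func newHandler() *handler { return &handler{logger: slog.Default()} }

// WithLogger ignores nil so the default logger survives.
func (h *handler) WithLogger(l *slog.Logger) *handler {
	if l != nil {
		h.logger = l
	}
	return h
}

// WithLeaderForwarder stores f unconditionally; nil means "do not forward".
func (h *handler) WithLeaderForwarder(f leaderForwarder) *handler {
	h.forwarder = f
	return h
}

// forwardingEnabled mirrors the nil gate in tryForwardCreate / tryForwardDelete.
func (h *handler) forwardingEnabled() bool { return h.forwarder != nil }

func main() {
	h := newHandler().WithLogger(nil).WithLeaderForwarder(&grpcStub{addr: "10.0.0.1:50051"})
	fmt.Println(h.logger != nil, h.forwardingEnabled())

	// An untyped nil switches forwarding off again.
	h.WithLeaderForwarder(nil)
	fmt.Println(h.forwardingEnabled())

	// Caveat: a nil *grpcStub wrapped in the interface is not a nil interface,
	// so the gate reports forwarding as enabled.
	var typedNil *grpcStub
	h.WithLeaderForwarder(typedNil)
	fmt.Println(h.forwardingEnabled())
}
```

Because `NewGRPCForwardClient` returns the interface type together with an error, a caller that checks the error before calling `WithLeaderForwarder` should not hit the typed-nil case.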
@@ -313,12 +338,48 @@ func (h *DynamoHandler) handleCreate(w http.ResponseWriter, r *http.Request) {
 	}
 	summary, err := h.source.AdminCreateTable(r.Context(), principal, body)
 	if err != nil {
+		// On a follower, the source returns ErrTablesNotLeader. If
+		// a forwarder is wired, dispatch to the leader and re-emit
+		// the leader's response verbatim — the SPA cannot tell the
+		// difference between a leader-direct call and a forwarded
+		// one. Without a forwarder, fall through to the standard
+		// 503 leader_unavailable response.
+		if h.tryForwardCreate(w, r, principal, body, err) {
+			return
+		}
 		h.writeTablesError(w, r, "create", err)
 		return
 	}
 	writeAdminJSONStatus(w, r.Context(), h.logger, http.StatusCreated, summary)
 }
 
+// tryForwardCreate handles the follower→leader forwarding path
+// for POST /tables. Returns true when the response has been
+// written (regardless of forward success/failure); the caller
+// should then return without further processing.
+//
+// The "fall through to 503" path runs only when:
+// - the source error is something other than ErrTablesNotLeader,
+// - or no LeaderForwarder was configured,
+// - or the forwarder itself returned ErrLeaderUnavailable
+// (election in progress on the leader, criterion 3).
+//
+// Any other forwarder failure (gRPC transport error, etc.) is
+// also surfaced as 503 + Retry-After: 1 so the SPA can re-issue.
+// We log the raw error for operators and never echo it to clients.
+func (h *DynamoHandler) tryForwardCreate(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, body CreateTableRequest, sourceErr error) bool {
+	if !errors.Is(sourceErr, ErrTablesNotLeader) || h.forwarder == nil {
+		return false
+	}
+	res, err := h.forwarder.ForwardCreateTable(r.Context(), principal, body)
+	if err != nil {
+		h.writeForwardFailure(w, r, "create", err)
+		return true
+	}
+	h.writeForwardResult(w, r, res)
+	return true
+}
+
 // handleDelete is the DELETE /tables/{name} handler. Success is
 // 204 No Content; the body is intentionally empty so the SPA can
 // treat both 200 and 204 as success without parsing.
@@ -332,6 +393,9 @@ func (h *DynamoHandler) handleDelete(w http.ResponseWriter, r *http.Request, nam
 		return
 	}
 	if err := h.source.AdminDeleteTable(r.Context(), principal, name); err != nil {
+		if h.tryForwardDelete(w, r, principal, name, err) {
+			return
+		}
 		h.writeTablesError(w, r, "delete", err)
 		return
 	}
@@ -390,6 +454,72 @@ func (h *DynamoHandler) principalForWrite(w http.ResponseWriter, r *http.Request
 	return principal, true
 }
 
+// tryForwardDelete is the DELETE counterpart of tryForwardCreate.
+// Same semantics: only the ErrTablesNotLeader source error
+// triggers forwarding, and only when a forwarder is configured.
+func (h *DynamoHandler) tryForwardDelete(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, name string, sourceErr error) bool {
+	if !errors.Is(sourceErr, ErrTablesNotLeader) || h.forwarder == nil {
+		return false
+	}
+	res, err := h.forwarder.ForwardDeleteTable(r.Context(), principal, name)
+	if err != nil {
+		h.writeForwardFailure(w, r, "delete", err)
+		return true
+	}
+	h.writeForwardResult(w, r, res)
+	return true
+}
+
+// writeForwardResult re-emits the leader's structured response
+// verbatim. Status, payload, and content-type all come from the
+// gRPC response so a forwarded request looks identical to a
+// leader-direct call from the SPA's point of view.
+func (h *DynamoHandler) writeForwardResult(w http.ResponseWriter, r *http.Request, res *ForwardResult) {
+	w.Header().Set("Content-Type", res.ContentType)
+	// Match writeAdminJSONStatus's hardening on the leader-direct
+	// path: forwarded responses must also carry nosniff so a SPA
+	// request that happened to traverse a follower does not
+	// silently lose the MIME-sniff protection. Claude review on
+	// PR #644 caught the parity gap.
+	w.Header().Set("X-Content-Type-Options", "nosniff")
+	w.Header().Set("Cache-Control", "no-store")
+	// 503 from the leader (e.g. it stepped down mid-request) must
+	// carry Retry-After so the client retries; preserve the
+	// criterion-3 contract on the wire whether the 503 originated
+	// here or at the leader.
+	if res.StatusCode == http.StatusServiceUnavailable {
+		w.Header().Set("Retry-After", "1")
+	}
+	w.WriteHeader(res.StatusCode)
+	if len(res.Payload) > 0 {
+		if _, err := w.Write(res.Payload); err != nil {
+			h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin forward response write failed",
+				slog.String("error", err.Error()),
+			)
+		}
+	}
+}
+
+// writeForwardFailure handles forwarder errors that did not
+// produce a structured leader response: ErrLeaderUnavailable
+// (election in flight) and gRPC transport errors. Both surface as
+// 503 + Retry-After: 1 — the SPA's retry contract is identical
+// regardless of whether the leader is briefly absent or the
+// network hiccupped.
+func (h *DynamoHandler) writeForwardFailure(w http.ResponseWriter, r *http.Request, op string, err error) {
+	if !errors.Is(err, ErrLeaderUnavailable) {
+		// Not the "no leader known" case — log the raw error so
+		// operators can investigate. Client still sees the same
+		// 503 + Retry-After so they retry uniformly.
+		h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo "+op+" forward failed",
+			slog.String("error", err.Error()),
+		)
+	}
+	w.Header().Set("Retry-After", "1")
+	writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable",
+		"raft leader currently unavailable; retry shortly")
+}
+
 // writeTablesError translates a TablesSource error into the
 // appropriate HTTP response. Internal-server-error fallthrough logs
 // the raw err.Error() but never sends it to the client, matching
@@ -400,9 +530,10 @@ func (h *DynamoHandler) writeTablesError(w http.ResponseWriter, r *http.Request,
 		writeJSONError(w, http.StatusForbidden, "forbidden",
 			"this endpoint requires a full-access role")
 	case errors.Is(err, ErrTablesNotLeader):
-		// The follower→leader forwarding RPC (design 3.3) will
-		// catch this case in a follow-up PR. Until then, surface
-		// 503 + Retry-After: 1 so the SPA / curl can re-issue.
+		// Reached only when no LeaderForwarder is configured (single-
+		// node or leader-only deployments). When a forwarder is wired,
+		// tryForwardCreate / tryForwardDelete intercept ErrTablesNotLeader
+		// before writeTablesError is called.
 		w.Header().Set("Retry-After", "1")
 		writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable",
 			"this admin node is not the raft leader")

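The wire-level contract of `writeForwardResult` and `writeForwardFailure` in the diff above can be checked without a cluster by driving a condensed copy against `httptest.NewRecorder`. The sketch below is an assumption-laden miniature, not the commit's test code: `respond`, `probe`, `forwardResult`, and `errLeaderUnavailable` are hypothetical names, logging is omitted, and the JSON error body is a placeholder because the real shape comes from `writeJSONError`, which this diff does not show.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// errLeaderUnavailable stands in for ErrLeaderUnavailable.
var errLeaderUnavailable = errors.New("leader unavailable")

// forwardResult mirrors the fields of ForwardResult from the diff.
type forwardResult struct {
	StatusCode  int
	Payload     []byte
	ContentType string
}

// respond condenses writeForwardResult and writeForwardFailure into one
// function. Any forwarder error becomes 503 + Retry-After: 1; a structured
// leader response is replayed with its own status, payload and content type.
func respond(w http.ResponseWriter, res *forwardResult, err error) {
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		w.Header().Set("Retry-After", "1")
		w.WriteHeader(http.StatusServiceUnavailable)
		// Placeholder body; the real one is produced by writeJSONError.
		_, _ = w.Write([]byte(`{"error":"leader_unavailable"}`))
		return
	}
	w.Header().Set("Content-Type", res.ContentType)
	w.Header().Set("X-Content-Type-Options", "nosniff")
	w.Header().Set("Cache-Control", "no-store")
	if res.StatusCode == http.StatusServiceUnavailable {
		w.Header().Set("Retry-After", "1")
	}
	w.WriteHeader(res.StatusCode)
	_, _ = w.Write(res.Payload)
}

// probe runs respond against a recorder and reports what a client would see.
func probe(res *forwardResult, err error) (int, string, string) {
	rec := httptest.NewRecorder()
	respond(rec, res, err)
	return rec.Code, rec.Header().Get("Retry-After"), rec.Body.String()
}

func main() {
	conflict := &forwardResult{
		StatusCode:  http.StatusConflict,
		Payload:     []byte(`{"error":"conflict"}`),
		ContentType: "application/json; charset=utf-8",
	}
	// A leader 409 passes through untouched, without Retry-After.
	fmt.Println(probe(conflict, nil))
	// No leader known and a transport error look identical on the wire.
	fmt.Println(probe(nil, errLeaderUnavailable))
	fmt.Println(probe(nil, errors.New("rpc: connection refused")))
}
```

The two failure cases differ only in logging in the real handler, which is why the sketch can treat them the same way.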
internal/admin/forward_client.go

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+package admin
+
+import (
+	"context"
+	"errors"
+	"net/http"
+
+	pb "github.com/bootjp/elastickv/proto"
+	pkgerrors "github.com/cockroachdb/errors"
+	"github.com/goccy/go-json"
+	"google.golang.org/grpc"
+)
+
+// LeaderForwarder is the contract the admin HTTP handler invokes
+// when the local node is a follower (the source returned
+// ErrTablesNotLeader). Implementations dial the current leader
+// over the AdminForward gRPC service and return the leader's
+// response in a transport-neutral shape so the handler can re-emit
+// it verbatim.
+//
+// Defining this interface in the admin package — rather than wiring
+// pb.AdminForwardClient directly into the handler — keeps the
+// admin HTTP layer free of any proto-level coupling and lets tests
+// substitute a deterministic stub. The bridge in main_admin.go
+// provides the production implementation that uses
+// kv.GRPCConnCache + the raft engine's leader address.
+type LeaderForwarder interface {
+	// ForwardCreateTable issues a forwarded CreateTable on the
+	// leader's behalf. The response is the leader's structured
+	// AdminForwardResponse re-shaped into ForwardResult so the
+	// handler does not need to import proto.
+	ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error)
+	// ForwardDeleteTable is the delete-side counterpart.
+	ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
+}
+
+// ForwardResult is the leader's response replayed for the SPA. The
+// handler writes Payload verbatim with the given status code and
+// content type, so a forwarded request is indistinguishable from a
+// leader-direct call.
+type ForwardResult struct {
+	StatusCode  int
+	Payload     []byte
+	ContentType string
+}
+
+// ErrLeaderUnavailable is returned when the forwarder cannot reach
+// any leader — typically during a Raft election or a cluster split.
+// The handler maps it to 503 + Retry-After: 1 so the SPA / client
+// re-issues the request after a short delay (acceptance criterion 3).
+var ErrLeaderUnavailable = errors.New("admin: raft leader currently unavailable")
+
+// LeaderAddressResolver returns the current Raft leader's address
+// for the local node's group, or "" if no leader is known. The
+// production wiring uses raftengine.Engine.LeaderAddr / the
+// cluster's address map; tests inject a fixed string.
+type LeaderAddressResolver func() string
+
+// GRPCConnFactory is the small surface AdminForwardClient needs
+// from kv.GRPCConnCache. Pulling out an interface lets tests
+// substitute an in-memory dialer without spinning up a TCP
+// listener and lets the bridge use the existing connection cache
+// without copy-paste.
+type GRPCConnFactory interface {
+	// ConnFor returns a gRPC client connection to addr, reusing
+	// the cached entry if one exists. addr "" is a programming
+	// error and may panic; callers must check leader-empty before
+	// dialling.
+	ConnFor(addr string) (PBAdminForwardClient, error)
+}
+
+// PBAdminForwardClient narrows pb.AdminForwardClient to just the
+// methods this package uses. The narrower interface keeps the test
+// stub implementation small.
+//
+// The opts parameter must use grpc.CallOption (not interface{}) so
+// the proto-generated *adminForwardClient satisfies this interface
+// directly — Go interface satisfaction requires exact method-
+// signature match, including variadic element types. Claude review
+// on PR #644 caught the mismatch before the bridge tried to assign
+// pb.NewAdminForwardClient(conn) and the build broke.
+type PBAdminForwardClient interface {
+	Forward(ctx context.Context, in *pb.AdminForwardRequest, opts ...grpc.CallOption) (*pb.AdminForwardResponse, error)
+}
+
+// gRPCForwardClient is the production LeaderForwarder. Construct
+// one with NewGRPCForwardClient. Two collaborators are required:
+// - resolver: returns the current leader address, or "" if absent
+// - dial: turns an address into a PBAdminForwardClient (the
+// bridge wraps kv.GRPCConnCache to satisfy this)
+//
+// nodeID is echoed into the leader's audit log via
+// AdminForwardRequest.forwarded_from (acceptance criterion 6).
+type gRPCForwardClient struct {
+	resolver LeaderAddressResolver
+	dial     GRPCConnFactory
+	nodeID   string
+}
+
+// NewGRPCForwardClient constructs the production LeaderForwarder.
+// All three parameters must be non-nil / non-empty; otherwise the
+// constructor returns nil and a wiring-error so a misconfigured
+// build refuses to start rather than producing 500s at runtime.
+func NewGRPCForwardClient(resolver LeaderAddressResolver, dial GRPCConnFactory, nodeID string) (LeaderForwarder, error) {
+	if resolver == nil {
+		return nil, errors.New("admin forwarder: leader address resolver is required")
+	}
+	if dial == nil {
+		return nil, errors.New("admin forwarder: gRPC connection factory is required")
+	}
+	if nodeID == "" {
+		return nil, errors.New("admin forwarder: node id is required for audit log enrichment")
+	}
+	return &gRPCForwardClient{resolver: resolver, dial: dial, nodeID: nodeID}, nil
+}
+
+// ForwardCreateTable serialises `in` as JSON and dispatches the
+// CreateTable operation to the leader. Returns ErrLeaderUnavailable
+// when no leader address is known; gRPC-level errors are wrapped
+// and surfaced unchanged so the handler can decide whether to
+// retry, log, or 500.
+func (c *gRPCForwardClient) ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) {
+	payload, err := json.Marshal(in)
+	if err != nil {
+		// CreateTableRequest is plain string fields; Marshal
+		// cannot fail in practice. Surface the error rather than
+		// silently dropping the request.
+		return nil, pkgerrors.Wrap(err, "admin forward: marshal create-table request")
+	}
+	return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_TABLE, principal, payload)
+}
+
+// ForwardDeleteTable serialises the table name as `{"name":"..."}`
+// to match the leader-side handleDelete contract.
+func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
+	payload, err := json.Marshal(struct {
+		Name string `json:"name"`
+	}{Name: name})
+	if err != nil {
+		return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-table request")
+	}
+	return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload)
+}
+
+func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) {
+	addr := c.resolver()
+	if addr == "" {
+		return nil, ErrLeaderUnavailable
+	}
+	cli, err := c.dial.ConnFor(addr)
+	if err != nil {
+		return nil, pkgerrors.Wrap(err, "admin forward: dial leader")
+	}
+	resp, err := cli.Forward(ctx, &pb.AdminForwardRequest{
+		Principal: &pb.AdminPrincipal{
+			AccessKey: principal.AccessKey,
+			Role:      string(principal.Role),
+		},
+		Operation:     op,
+		Payload:       payload,
+		ForwardedFrom: c.nodeID,
+	})
+	if err != nil {
+		return nil, pkgerrors.Wrap(err, "admin forward: rpc")
+	}
+	out := &ForwardResult{
+		StatusCode:  int(resp.GetStatusCode()),
+		Payload:     resp.GetPayload(),
+		ContentType: resp.GetContentType(),
+	}
+	if out.ContentType == "" {
+		// The leader server always sets a content type, but be
+		// defensive: a future change that drops it must not
+		// produce a SPA response with no Content-Type header.
+		out.ContentType = "application/json; charset=utf-8"
+	}
+	if out.StatusCode == 0 {
+		// status_code 0 is the proto's zero value, which the
+		// leader server never emits intentionally. Treat it as a
+		// transport bug rather than a 200, since 0 is not a valid
+		// HTTP status.
+		out.StatusCode = http.StatusBadGateway
+	}
+	return out, nil
+}
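
The error paths of `forward` and the way the handler tells them apart can be exercised in isolation. The following is a trimmed-down sketch under stated assumptions: `resolve` and `dial` are plain functions rather than `LeaderAddressResolver` and `GRPCConnFactory`, no RPC is sent, `fmt.Errorf` with `%w` stands in for `pkgerrors.Wrap`, and `classify` and `errRefused` are hypothetical names that only mimic the branch in `writeForwardFailure`.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrLeaderUnavailable mirrors the sentinel from forward_client.go.
var ErrLeaderUnavailable = errors.New("admin: raft leader currently unavailable")

// errRefused is a hypothetical transport failure used by the demo.
var errRefused = errors.New("connection refused")

// forward is a trimmed-down version of gRPCForwardClient.forward: it
// resolves the leader address, then dials. An empty address short-circuits
// with the sentinel before any dial is attempted.
func forward(resolve func() string, dial func(addr string) error) error {
	addr := resolve()
	if addr == "" {
		return ErrLeaderUnavailable
	}
	if err := dial(addr); err != nil {
		// fmt.Errorf with %w stands in for pkgerrors.Wrap in this sketch.
		return fmt.Errorf("admin forward: dial leader: %w", err)
	}
	return nil
}

// classify mimics the branch in writeForwardFailure: both failures map to
// 503, but only non-sentinel errors are logged at error level.
func classify(err error) string {
	switch {
	case err == nil:
		return "forwarded"
	case errors.Is(err, ErrLeaderUnavailable):
		return "503, not logged"
	default:
		return "503, logged at error level"
	}
}

func main() {
	noLeader := func() string { return "" }
	leader := func() string { return "10.0.0.1:50051" }
	dialOK := func(string) error { return nil }
	dialFail := func(string) error { return errRefused }

	fmt.Println(classify(forward(noLeader, dialFail)))
	fmt.Println(classify(forward(leader, dialFail)))
	fmt.Println(classify(forward(leader, dialOK)))

	// The wrapped error still exposes its cause to errors.Is.
	fmt.Println(errors.Is(forward(leader, dialFail), errRefused))
}
```

Returning the sentinel unwrapped keeps the `errors.Is` check in the handler cheap and independent of whichever wrapping library the client uses.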
