Skip to content
151 changes: 141 additions & 10 deletions internal/admin/dynamo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ var (
// role required for the operation. Maps to 403.
ErrTablesForbidden = errors.New("admin tables: principal lacks required role")
// ErrTablesNotLeader is returned when the local node is not the
// Raft leader. Maps to 503 + Retry-After: 1 today; the future
// AdminForward RPC catches this as the trigger to forward.
// Raft leader. When a LeaderForwarder is configured,
// tryForwardCreate / tryForwardDelete catch this before it reaches
// writeTablesError and forward the request to the leader
// transparently. Without a forwarder, maps to 503 + Retry-After: 1.
ErrTablesNotLeader = errors.New("admin tables: local node is not the raft leader")
// ErrTablesNotFound is returned when DELETE / DESCRIBE / a
// follow-up read targets a table that does not exist. Maps to
Expand Down Expand Up @@ -168,15 +170,22 @@ func (e *ValidationError) Error() string {
// — the JWT freezes the role at login time, and tokens last one
// hour. Codex P1 on PR #635 flagged the gap on the HTTP path;
// the forward server already does this re-evaluation on its side.
//
// When the source returns ErrTablesNotLeader and a LeaderForwarder
// is configured, write requests are forwarded to the leader
// transparently — the SPA sees a leader-direct response shape
// regardless of which node it hit (design Section 3.3 criterion 2).
type DynamoHandler struct {
source TablesSource
roles RoleStore
logger *slog.Logger
source TablesSource
roles RoleStore
forwarder LeaderForwarder
logger *slog.Logger
}

// NewDynamoHandler binds the source and seeds logging with
// slog.Default(). Use WithLogger to attach a tagged logger and
// WithRoleStore to plug in the live access-key role lookup.
// slog.Default(). Use WithLogger to attach a tagged logger,
// WithRoleStore to plug in the live access-key role lookup, and
// WithLeaderForwarder to plug in the follower→leader forwarder.
func NewDynamoHandler(source TablesSource) *DynamoHandler {
return &DynamoHandler{source: source, logger: slog.Default()}
}
Expand All @@ -201,6 +210,22 @@ func (h *DynamoHandler) WithRoleStore(r RoleStore) *DynamoHandler {
return h
}

// WithLeaderForwarder enables transparent follower→leader
// forwarding. Without it, write requests on a follower fall back
// to the standard 503 leader_unavailable response. Production
// wires this to the gRPCForwardClient in main_admin.go; tests
// inject a stub.
//
// Asymmetric vs WithLogger by design: WithLogger no-ops on nil to
// preserve the slog.Default() seeded by NewDynamoHandler, but a
// nil forwarder here is a meaningful "disable forwarding" state
// (the gate in tryForwardCreate / tryForwardDelete checks for
// nil and falls back to the leader-only 503 path).
func (h *DynamoHandler) WithLeaderForwarder(f LeaderForwarder) *DynamoHandler {
h.forwarder = f
return h
}

// ServeHTTP routes /tables and /tables/{name}. We do not use
// http.ServeMux because the admin router already guards the
// /admin/api/v1/* prefix — adding another mux here would just
Expand Down Expand Up @@ -313,12 +338,48 @@ func (h *DynamoHandler) handleCreate(w http.ResponseWriter, r *http.Request) {
}
summary, err := h.source.AdminCreateTable(r.Context(), principal, body)
if err != nil {
// On a follower, the source returns ErrTablesNotLeader. If
// a forwarder is wired, dispatch to the leader and re-emit
// the leader's response verbatim — the SPA cannot tell the
// difference between a leader-direct call and a forwarded
// one. Without a forwarder, fall through to the standard
// 503 leader_unavailable response.
if h.tryForwardCreate(w, r, principal, body, err) {
return
}
h.writeTablesError(w, r, "create", err)
return
}
writeAdminJSONStatus(w, r.Context(), h.logger, http.StatusCreated, summary)
}

// tryForwardCreate handles the follower→leader forwarding path
// for POST /tables. Returns true when the response has been
// written (regardless of forward success/failure); the caller
// should then return without further processing.
//
// The "fall through to 503" path runs only when:
// - the source error is something other than ErrTablesNotLeader,
// - or no LeaderForwarder was configured,
// - or the forwarder itself returned ErrLeaderUnavailable
// (election in progress on the leader, criterion 3).
//
// Any other forwarder failure (gRPC transport error, etc.) is
// also surfaced as 503 + Retry-After: 1 so the SPA can re-issue.
// We log the raw error for operators and never echo it to clients.
func (h *DynamoHandler) tryForwardCreate(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, body CreateTableRequest, sourceErr error) bool {
if !errors.Is(sourceErr, ErrTablesNotLeader) || h.forwarder == nil {
return false
}
res, err := h.forwarder.ForwardCreateTable(r.Context(), principal, body)
if err != nil {
h.writeForwardFailure(w, r, "create", err)
return true
}
h.writeForwardResult(w, r, res)
return true
}

// handleDelete is the DELETE /tables/{name} handler. Success is
// 204 No Content; the body is intentionally empty so the SPA can
// treat both 200 and 204 as success without parsing.
Expand All @@ -332,6 +393,9 @@ func (h *DynamoHandler) handleDelete(w http.ResponseWriter, r *http.Request, nam
return
}
if err := h.source.AdminDeleteTable(r.Context(), principal, name); err != nil {
if h.tryForwardDelete(w, r, principal, name, err) {
return
}
h.writeTablesError(w, r, "delete", err)
return
}
Expand Down Expand Up @@ -390,6 +454,72 @@ func (h *DynamoHandler) principalForWrite(w http.ResponseWriter, r *http.Request
return principal, true
}

// tryForwardDelete is the DELETE counterpart of tryForwardCreate.
// Same semantics: only the ErrTablesNotLeader source error
// triggers forwarding, and only when a forwarder is configured.
func (h *DynamoHandler) tryForwardDelete(w http.ResponseWriter, r *http.Request, principal AuthPrincipal, name string, sourceErr error) bool {
if !errors.Is(sourceErr, ErrTablesNotLeader) || h.forwarder == nil {
return false
}
res, err := h.forwarder.ForwardDeleteTable(r.Context(), principal, name)
if err != nil {
h.writeForwardFailure(w, r, "delete", err)
return true
}
h.writeForwardResult(w, r, res)
return true
}

// writeForwardResult re-emits the leader's structured response
// verbatim. Status, payload, and content-type all come from the
// gRPC response so a forwarded request looks identical to a
// leader-direct call from the SPA's point of view.
func (h *DynamoHandler) writeForwardResult(w http.ResponseWriter, r *http.Request, res *ForwardResult) {
w.Header().Set("Content-Type", res.ContentType)
// Match writeAdminJSONStatus's hardening on the leader-direct
// path: forwarded responses must also carry nosniff so a SPA
// request that happened to traverse a follower does not
// silently lose the MIME-sniff protection. Claude review on
// PR #644 caught the parity gap.
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("Cache-Control", "no-store")
// 503 from the leader (e.g. it stepped down mid-request) must
// carry Retry-After so the client retries; preserve the
// criterion-3 contract on the wire whether the 503 originated
// here or at the leader.
if res.StatusCode == http.StatusServiceUnavailable {
w.Header().Set("Retry-After", "1")
}
w.WriteHeader(res.StatusCode)
if len(res.Payload) > 0 {
if _, err := w.Write(res.Payload); err != nil {
h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin forward response write failed",
slog.String("error", err.Error()),
)
}
}
}

// writeForwardFailure handles forwarder errors that did not
// produce a structured leader response: ErrLeaderUnavailable
// (election in flight) and gRPC transport errors. Both surface as
// 503 + Retry-After: 1 — the SPA's retry contract is identical
// regardless of whether the leader is briefly absent or the
// network hiccupped.
func (h *DynamoHandler) writeForwardFailure(w http.ResponseWriter, r *http.Request, op string, err error) {
if !errors.Is(err, ErrLeaderUnavailable) {
// Not the "no leader known" case — log the raw error so
// operators can investigate. Client still sees the same
// 503 + Retry-After so they retry uniformly.
h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo "+op+" forward failed",
slog.String("error", err.Error()),
)
}
w.Header().Set("Retry-After", "1")
writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable",
"raft leader currently unavailable; retry shortly")
}

// writeTablesError translates a TablesSource error into the
// appropriate HTTP response. Internal-server-error fallthrough logs
// the raw err.Error() but never sends it to the client, matching
Expand All @@ -400,9 +530,10 @@ func (h *DynamoHandler) writeTablesError(w http.ResponseWriter, r *http.Request,
writeJSONError(w, http.StatusForbidden, "forbidden",
"this endpoint requires a full-access role")
case errors.Is(err, ErrTablesNotLeader):
// The follower→leader forwarding RPC (design 3.3) will
// catch this case in a follow-up PR. Until then, surface
// 503 + Retry-After: 1 so the SPA / curl can re-issue.
// Reached only when no LeaderForwarder is configured (single-
// node or leader-only deployments). When a forwarder is wired,
// tryForwardCreate / tryForwardDelete intercept ErrTablesNotLeader
// before writeTablesError is called.
w.Header().Set("Retry-After", "1")
writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable",
"this admin node is not the raft leader")
Expand Down
185 changes: 185 additions & 0 deletions internal/admin/forward_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package admin

import (
"context"
"errors"
"net/http"

pb "github.com/bootjp/elastickv/proto"
pkgerrors "github.com/cockroachdb/errors"
"github.com/goccy/go-json"
"google.golang.org/grpc"
)

// LeaderForwarder is the contract the admin HTTP handler invokes
// when the local node is a follower (the source returned
// ErrTablesNotLeader). Implementations dial the current leader
// over the AdminForward gRPC service and return the leader's
// response in a transport-neutral shape so the handler can re-emit
// it verbatim.
//
// Defining this interface in the admin package — rather than wiring
// pb.AdminForwardClient directly into the handler — keeps the
// admin HTTP layer free of any proto-level coupling and lets tests
// substitute a deterministic stub. The bridge in main_admin.go
// provides the production implementation that uses
// kv.GRPCConnCache + the raft engine's leader address.
type LeaderForwarder interface {
// ForwardCreateTable issues a forwarded CreateTable on the
// leader's behalf. The response is the leader's structured
// AdminForwardResponse re-shaped into ForwardResult so the
// handler does not need to import proto.
ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error)
// ForwardDeleteTable is the delete-side counterpart.
ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error)
}

// ForwardResult is the leader's response replayed for the SPA. The
// handler writes Payload verbatim with the given status code and
// content type, so a forwarded request is indistinguishable from a
// leader-direct call.
type ForwardResult struct {
StatusCode int
Payload []byte
ContentType string
}

// ErrLeaderUnavailable is returned when the forwarder cannot reach
// any leader — typically during a Raft election or a cluster split.
// The handler maps it to 503 + Retry-After: 1 so the SPA / client
// re-issues the request after a short delay (acceptance criterion 3).
var ErrLeaderUnavailable = errors.New("admin: raft leader currently unavailable")

// LeaderAddressResolver returns the current Raft leader's address
// for the local node's group, or "" if no leader is known. The
// production wiring uses raftengine.Engine.LeaderAddr / the
// cluster's address map; tests inject a fixed string.
type LeaderAddressResolver func() string

// GRPCConnFactory is the small surface AdminForwardClient needs
// from kv.GRPCConnCache. Pulling out an interface lets tests
// substitute an in-memory dialer without spinning up a TCP
// listener and lets the bridge use the existing connection cache
// without copy-paste.
type GRPCConnFactory interface {
// ConnFor returns a gRPC client connection to addr, reusing
// the cached entry if one exists. addr "" is a programming
// error and may panic; callers must check leader-empty before
// dialling.
ConnFor(addr string) (PBAdminForwardClient, error)
}

// PBAdminForwardClient narrows pb.AdminForwardClient to just the
// methods this package uses. The narrower interface keeps the test
// stub implementation small.
//
// The opts parameter must use grpc.CallOption (not interface{}) so
// the proto-generated *adminForwardClient satisfies this interface
// directly — Go interface satisfaction requires exact method-
// signature match, including variadic element types. Claude review
// on PR #644 caught the mismatch before the bridge tried to assign
// pb.NewAdminForwardClient(conn) and the build broke.
type PBAdminForwardClient interface {
Forward(ctx context.Context, in *pb.AdminForwardRequest, opts ...grpc.CallOption) (*pb.AdminForwardResponse, error)
}

// gRPCForwardClient is the production LeaderForwarder. Construct
// one with NewGRPCForwardClient. Two collaborators are required:
// - resolver: returns the current leader address, or "" if absent
// - dial: turns an address into a PBAdminForwardClient (the
// bridge wraps kv.GRPCConnCache to satisfy this)
//
// nodeID is echoed into the leader's audit log via
// AdminForwardRequest.forwarded_from (acceptance criterion 6).
type gRPCForwardClient struct {
resolver LeaderAddressResolver
dial GRPCConnFactory
nodeID string
}

// NewGRPCForwardClient constructs the production LeaderForwarder.
// All three parameters must be non-nil / non-empty; otherwise the
// constructor returns nil and a wiring-error so a misconfigured
// build refuses to start rather than producing 500s at runtime.
func NewGRPCForwardClient(resolver LeaderAddressResolver, dial GRPCConnFactory, nodeID string) (LeaderForwarder, error) {
if resolver == nil {
return nil, errors.New("admin forwarder: leader address resolver is required")
}
if dial == nil {
return nil, errors.New("admin forwarder: gRPC connection factory is required")
}
if nodeID == "" {
return nil, errors.New("admin forwarder: node id is required for audit log enrichment")
}
return &gRPCForwardClient{resolver: resolver, dial: dial, nodeID: nodeID}, nil
}

// ForwardCreateTable serialises `in` as JSON and dispatches the
// CreateTable operation to the leader. Returns ErrLeaderUnavailable
// when no leader address is known; gRPC-level errors are wrapped
// and surfaced unchanged so the handler can decide whether to
// retry, log, or 500.
func (c *gRPCForwardClient) ForwardCreateTable(ctx context.Context, principal AuthPrincipal, in CreateTableRequest) (*ForwardResult, error) {
payload, err := json.Marshal(in)
if err != nil {
// CreateTableRequest is plain string fields; Marshal
// cannot fail in practice. Surface the error rather than
// silently dropping the request.
return nil, pkgerrors.Wrap(err, "admin forward: marshal create-table request")
}
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_CREATE_TABLE, principal, payload)
}

// ForwardDeleteTable serialises the table name as `{"name":"..."}`
// to match the leader-side handleDelete contract.
func (c *gRPCForwardClient) ForwardDeleteTable(ctx context.Context, principal AuthPrincipal, name string) (*ForwardResult, error) {
payload, err := json.Marshal(struct {
Name string `json:"name"`
}{Name: name})
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: marshal delete-table request")
}
return c.forward(ctx, pb.AdminOperation_ADMIN_OP_DELETE_TABLE, principal, payload)
}

func (c *gRPCForwardClient) forward(ctx context.Context, op pb.AdminOperation, principal AuthPrincipal, payload []byte) (*ForwardResult, error) {
addr := c.resolver()
if addr == "" {
return nil, ErrLeaderUnavailable
}
cli, err := c.dial.ConnFor(addr)
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: dial leader")
}
resp, err := cli.Forward(ctx, &pb.AdminForwardRequest{
Principal: &pb.AdminPrincipal{
AccessKey: principal.AccessKey,
Role: string(principal.Role),
},
Operation: op,
Payload: payload,
ForwardedFrom: c.nodeID,
})
if err != nil {
return nil, pkgerrors.Wrap(err, "admin forward: rpc")
}
out := &ForwardResult{
StatusCode: int(resp.GetStatusCode()),
Payload: resp.GetPayload(),
ContentType: resp.GetContentType(),
}
if out.ContentType == "" {
// The leader server always sets a content type, but be
// defensive: a future change that drops it must not
// produce a SPA response with no Content-Type header.
out.ContentType = "application/json; charset=utf-8"
}
if out.StatusCode == 0 {
// status_code 0 is the proto's zero value, which the
// leader server never emits intentionally. Treat it as a
// transport bug rather than a 200, since 0 is not a valid
// HTTP status.
out.StatusCode = http.StatusBadGateway
}
return out, nil
}
Loading
Loading