Skip to content

Commit 8604343

Browse files
committed
admin: add AdminForward RPC + leader-side dispatcher (P1, partial)
Lays down the proto contract and the leader-side handler for the follower-leader forwarding path described in design Section 3.3. The follower-side client + bridge wiring lands in a separate PR to keep this one reviewable. Proto: - proto/admin_forward.proto with AdminForward.Forward(...) - AdminPrincipal carries access_key + role; the leader re-evaluates it against its own role index rather than trusting the follower (Section 3.3.1 invariant) - AdminOperation enum keeps the proto operation-agnostic so adding a future endpoint does not require a wire-format change - payload bytes carry the operation-specific JSON body verbatim - forwarded_from echoes the follower's node id into the leader's audit log line (acceptance criterion 6) Leader-side handler (internal/admin/forward_server.go): - ForwardServer implements pb.AdminForwardServer - RoleStore / MapRoleStore keep the access-key role lookup abstract so tests can swap in an in-memory map - validatePrincipal demotes a follower-claimed full role to forbidden when the leader's view is read-only (criterion 4) - forwardErrorResponse mirrors the HTTP handler's writeTablesError status mapping so a forwarded call is indistinguishable from a leader-direct call to the SPA - audit log entries on success carry forwarded_from; the leader is the source of truth (criterion 6) What is NOT in this PR (follow-ups): - Follower-side AdminForward client + bridge wiring (criterion 2) - Election-period 503 + Retry-After handling (criterion 3) - Rolling-upgrade compatibility flag admin.leader_forward_v2 (criterion 5) — needs a Raft-level cluster version bump - gRPC server registration in main.go Tests cover criteria 1, 4, 6 (leader direct, principal demotion, forwarded_from in audit log) and the structured error mapping (create-table 409, delete missing 404, generic error 500 with sanitised body, bad JSON 400, unknown operation 400).
1 parent 70e16e1 commit 8604343

6 files changed

Lines changed: 1012 additions & 0 deletions

File tree

internal/admin/forward_server.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log/slog"
7+
"net/http"
8+
9+
pb "github.com/bootjp/elastickv/proto"
10+
"github.com/goccy/go-json"
11+
)
12+
13+
// ForwardServer is the leader-side gRPC handler for the AdminForward
14+
// RPC (design Section 3.3). The follower's admin HTTP layer calls it
15+
// when the local node is not the Raft leader; this server then
16+
// re-validates the principal, dispatches the operation against the
17+
// local TablesSource, and serialises the result back to the
18+
// follower in the same JSON shape the SPA would have received from a
19+
// leader-direct call.
20+
//
21+
// The server is deliberately kept independent of the dynamo HTTP
22+
// handler: it runs in the gRPC server's goroutine pool, not in the
23+
// HTTP server's, and shares only the TablesSource interface (which
24+
// the bridge in main_admin.go already implements for the local
25+
// adapter).
26+
type ForwardServer struct {
27+
pb.UnimplementedAdminForwardServer
28+
29+
source TablesSource
30+
roles RoleStore
31+
logger *slog.Logger
32+
}
33+
34+
// RoleStore is the access-key → role lookup the leader uses to
35+
// re-validate the inbound principal. Implementations should mirror
36+
// the admin server's `Roles` map; production passes a typed wrapper
37+
// around that map so tests can swap in an in-memory stub.
38+
type RoleStore interface {
39+
// LookupRole returns the role for an access key as understood
40+
// by the leader's view of cluster configuration. The bool is
41+
// false when the access key is not in the admin role index — a
42+
// follower that forwarded the principal should not be able to
43+
// "make up" an admin identity.
44+
LookupRole(accessKey string) (Role, bool)
45+
}
46+
47+
// MapRoleStore is the trivial in-memory implementation, sufficient
48+
// for tests and for the production wiring (which already keeps the
49+
// role map in memory).
50+
type MapRoleStore map[string]Role
51+
52+
// LookupRole implements RoleStore.
53+
func (m MapRoleStore) LookupRole(accessKey string) (Role, bool) {
54+
r, ok := m[accessKey]
55+
return r, ok
56+
}
57+
58+
// NewForwardServer wires a TablesSource and a RoleStore behind the
59+
// gRPC AdminForward service. logger may be nil; defaults to
60+
// slog.Default().
61+
func NewForwardServer(source TablesSource, roles RoleStore, logger *slog.Logger) *ForwardServer {
62+
if logger == nil {
63+
logger = slog.Default()
64+
}
65+
return &ForwardServer{source: source, roles: roles, logger: logger}
66+
}
67+
68+
// Forward is the gRPC entrypoint. It performs the principal
69+
// re-evaluation the design mandates, then dispatches by operation.
70+
// Errors that the SPA can act on are returned as a structured
71+
// AdminForwardResponse with status_code + JSON payload; only fatal
72+
// gRPC-layer errors (decode failure, unknown operation) come back as
73+
// status.Errorf to the follower.
74+
func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) {
75+
if req == nil || req.GetPrincipal() == nil {
76+
return rejectForward(http.StatusBadRequest, "invalid_request", "missing principal")
77+
}
78+
principal, ok := s.validatePrincipal(req.GetPrincipal())
79+
if !ok {
80+
// Don't leak why the principal failed — the follower may
81+
// have a different view of the cluster's role config and
82+
// we want operators to investigate from the audit log on
83+
// the leader, not the follower's response body.
84+
s.logger.LogAttrs(ctx, slog.LevelWarn, "admin_forward_principal_rejected",
85+
slog.String("forwarded_from", req.GetForwardedFrom()),
86+
slog.String("claimed_access_key", req.GetPrincipal().GetAccessKey()),
87+
slog.String("claimed_role", req.GetPrincipal().GetRole()),
88+
)
89+
return rejectForward(http.StatusForbidden, "forbidden",
90+
"this endpoint requires a full-access role")
91+
}
92+
switch req.GetOperation() {
93+
case pb.AdminOperation_ADMIN_OP_CREATE_TABLE:
94+
return s.handleCreate(ctx, principal, req)
95+
case pb.AdminOperation_ADMIN_OP_DELETE_TABLE:
96+
return s.handleDelete(ctx, principal, req)
97+
case pb.AdminOperation_ADMIN_OP_UNSPECIFIED:
98+
return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation")
99+
default:
100+
return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation")
101+
}
102+
}
103+
104+
func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, bool) {
105+
accessKey := p.GetAccessKey()
106+
if accessKey == "" {
107+
return AuthPrincipal{}, false
108+
}
109+
role, ok := s.roles.LookupRole(accessKey)
110+
if !ok {
111+
return AuthPrincipal{}, false
112+
}
113+
// Critical re-evaluation: if the leader sees this access key as
114+
// read-only, the operation is forbidden even if the follower
115+
// thought it was full. The reverse — leader sees full, follower
116+
// sees read-only — would have been short-circuited at the
117+
// follower already, so we do not need to check it here.
118+
if !role.AllowsWrite() {
119+
return AuthPrincipal{}, false
120+
}
121+
return AuthPrincipal{AccessKey: accessKey, Role: role}, true
122+
}
123+
124+
func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) {
125+
var body CreateTableRequest
126+
if err := json.Unmarshal(req.GetPayload(), &body); err != nil {
127+
return rejectForward(http.StatusBadRequest, "invalid_body", "request body is not valid JSON")
128+
}
129+
summary, err := s.source.AdminCreateTable(ctx, principal, body)
130+
if err != nil {
131+
return forwardErrorResponse(err), nil
132+
}
133+
s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit",
134+
slog.String("actor", principal.AccessKey),
135+
slog.String("role", string(principal.Role)),
136+
slog.String("forwarded_from", req.GetForwardedFrom()),
137+
slog.String("operation", "create_table"),
138+
slog.String("table", body.TableName),
139+
)
140+
return jsonForwardResponse(http.StatusCreated, summary)
141+
}
142+
143+
func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) {
144+
// Delete carries the table name in the payload as JSON so the
145+
// proto stays operation-agnostic — there is no operation-specific
146+
// field in AdminForwardRequest, by design (adding one per op
147+
// would couple every new admin endpoint to the proto schema).
148+
var body struct {
149+
Name string `json:"name"`
150+
}
151+
if err := json.Unmarshal(req.GetPayload(), &body); err != nil || body.Name == "" {
152+
return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name")
153+
}
154+
if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil {
155+
return forwardErrorResponse(err), nil
156+
}
157+
s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit",
158+
slog.String("actor", principal.AccessKey),
159+
slog.String("role", string(principal.Role)),
160+
slog.String("forwarded_from", req.GetForwardedFrom()),
161+
slog.String("operation", "delete_table"),
162+
slog.String("table", body.Name),
163+
)
164+
return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil
165+
}
166+
167+
// forwardErrorResponse re-encodes a TablesSource error in the
168+
// structured shape the follower's handler can re-emit verbatim. This
169+
// is the leader-side counterpart of writeTablesError: every status /
170+
// JSON code the HTTP handler chooses is mirrored here so a forwarded
171+
// call is indistinguishable to the SPA from a leader-direct call.
172+
func forwardErrorResponse(err error) *pb.AdminForwardResponse {
173+
switch {
174+
case errors.Is(err, ErrTablesForbidden):
175+
return mustForwardJSON(http.StatusForbidden, errorBody{Error: "forbidden", Message: "this endpoint requires a full-access role"})
176+
case errors.Is(err, ErrTablesNotLeader):
177+
// Should never happen on the leader path — the leader
178+
// just verified itself — but if a leadership transfer
179+
// races with the dispatch, surface it consistently.
180+
return mustForwardJSON(http.StatusServiceUnavailable, errorBody{Error: "leader_unavailable", Message: "leader stepped down mid-request"})
181+
case errors.Is(err, ErrTablesNotFound):
182+
return mustForwardJSON(http.StatusNotFound, errorBody{Error: "not_found", Message: "table does not exist"})
183+
case errors.Is(err, ErrTablesAlreadyExists):
184+
return mustForwardJSON(http.StatusConflict, errorBody{Error: "already_exists", Message: "table already exists"})
185+
}
186+
var verr *ValidationError
187+
if errors.As(err, &verr) {
188+
return mustForwardJSON(http.StatusBadRequest, errorBody{Error: "invalid_request", Message: verr.Error()})
189+
}
190+
return mustForwardJSON(http.StatusInternalServerError, errorBody{Error: "internal", Message: "internal error; see leader logs"})
191+
}
192+
193+
// errorBody is the shared JSON shape for both the HTTP handler's
194+
// writeJSONError and the forward server's encoded responses.
195+
type errorBody struct {
196+
Error string `json:"error"`
197+
Message string `json:"message,omitempty"`
198+
}
199+
200+
func rejectForward(status int, code, msg string) (*pb.AdminForwardResponse, error) {
201+
return mustForwardJSON(status, errorBody{Error: code, Message: msg}), nil
202+
}
203+
204+
func mustForwardJSON(status int, body any) *pb.AdminForwardResponse {
205+
payload, err := json.Marshal(body)
206+
if err != nil {
207+
// json.Marshal on a struct of strings cannot fail in
208+
// practice; a 500 with a bare string body is the safest
209+
// fallback if it ever does.
210+
return &pb.AdminForwardResponse{
211+
StatusCode: http.StatusInternalServerError,
212+
Payload: []byte(`{"error":"internal","message":"failed to encode response"}`),
213+
ContentType: "application/json; charset=utf-8",
214+
}
215+
}
216+
return &pb.AdminForwardResponse{
217+
StatusCode: int32(status), //nolint:gosec // status fits in int32; net/http codes are 100-599.
218+
Payload: payload,
219+
ContentType: "application/json; charset=utf-8",
220+
}
221+
}
222+
223+
func jsonForwardResponse(status int, body any) (*pb.AdminForwardResponse, error) {
224+
return mustForwardJSON(status, body), nil
225+
}

0 commit comments

Comments
 (0)