
Commit 41c7f0f

admin: AdminForward Phase 3 — register gRPC server + wire follower-side bridge (#648)
Phase 3 of Task #26 / AdminForward design 3.3. Closes the wiring loop opened by #635 (leader-side dispatcher) and #644 (follower-side client + handler integration): the admin HTTP DynamoHandler now hands `ErrTablesNotLeader` writes off to the in-process `LeaderForwarder`, and the leader-side gRPC `AdminForward` service is registered on every node so the forwarded RPC actually lands somewhere.

## Summary

- **`admin.ServerDeps.Forwarder`** (3315bf4) — new optional field on the high-level admin server composition surface. `admin.NewServer` calls `WithLeaderForwarder` when set; nil leaves the existing 503 + Retry-After:1 fallback untouched. Single-node / leader-only deployments are unaffected.
- **`main_admin_forward.go`** (f3e529c) — production bridge between `kv.GRPCConnCache` and the admin layer's typed `PBAdminForwardClient` (via `pb.NewAdminForwardClient`). Includes `buildLeaderForwarder`, `roleStoreFromFlags`, `adminForwardServerDeps`, and a `readyForRegistration` gate.
- **Runtime wiring** (19f6b70) — `runtimeServerRunner.start()` now creates `dynamoServer` BEFORE `startRaftServers`, so the leader-side `ForwardServer` can use it as its `TablesSource`. `startRaftServers` accepts the new `adminForwardServerDeps` bundle and delegates to `registerAdminForwardServer`. `startServers` constructs a single `*kv.GRPCConnCache` and forwards it (with the `Coordinator`) to `startAdminFromFlags`, which in turn builds the production `LeaderForwarder` via `buildAdminLeaderForwarder` and passes it through `ServerDeps.Forwarder`.

## What is NOT in this PR

- Rolling-upgrade compatibility flag (criterion 5) — still deferred behind a cluster-version bump.
- Election-period retry loop on the follower's bridge — the handler returns 503 + Retry-After:1, the SPA / curl re-issues, and the bridge dials again on the next attempt. Inline retry inside the handler would just hide the latency from operators.

## Test plan

- [x] `go build ./...`
- [x] `go vet ./...`
- [x] `golangci-lint run` (main + admin packages: 0 issues)
- [x] `go test ./internal/admin/ -count=1 -race` — full admin suite passes (existing 21 forward tests + 1 new `TestServer_ServerDepsForwarderIsWired`)
- [x] `go test . -count=1 -race` — main package passes (4 new bridge / role-store / readyForRegistration tests)
- [ ] `adapter` package times out at 240s on this branch AND on `main` — pre-existing flake unrelated to this PR (verified by checking out main and running the same suite)
- [ ] End-to-end smoke against a 3-node cluster — needs the local Jepsen runner; blocked on the next PR or a manual run

## Acceptance criteria coverage

| # | Criterion | This PR |
|---|---|---|
| 1 | Leader direct write | ✓ (in main since #634) |
| 2 | Follower forwards transparently | ✓ — wiring complete; both leader-side gRPC service AND follower-side bridge are now plumbed |
| 3 | Election-period 503 + retry | ✓ — handler still returns 503 + Retry-After:1 when no leader is known; the SPA / client retries and the next attempt re-dials |
| 4 | Leader demotes stale full role | ✓ (in main since #635) |
| 5 | Rolling-upgrade compat flag | ⏳ deferred (cluster-version bump) |
| 6 | `forwarded_from` in audit log | ✓ (in main since #635 — bridge now stamps `--raftId` onto every forwarded request) |

## Self-review (5 lenses)

1. **Data loss**: No FSM / Raft / Pebble path changes; admin writes still go through the same `AdminCreateTable` / `AdminDeleteTable` Raft proposal once they reach the leader.
2. **Concurrency**: `runner.start()` reorder confirmed safe — `startDynamoDBServer` only depends on `coordinate` + `shardStore` (already constructed); raft TCP listeners are independent of the dynamo HTTP listener. Worst case during the 100 ms-or-so reorder window is the same "no leader yet" 503 the old order would have produced if a request landed before raft converged.
3. **Performance**: One additional `&kv.GRPCConnCache{}` allocation per process; one closure for the resolver. No hot-path changes.
4. **Data consistency**: Forwarded requests re-validate the principal at the leader (criterion 4, already shipped); commit-ts ordering is unchanged because the leader's `AdminCreateTable` path does its own `HLC.Next`.
5. **Test coverage**: 4 new main-package tests + 1 new admin-package server-level test. `TestServer_ServerDepsForwarderIsWired` is the regression test for "a future refactor drops the forwarder before it reaches the dynamo handler" — exactly the failure mode the wiring change in this PR introduces the risk of.

## Summary by CodeRabbit

* **New Features**
  * Added leader-forwarding capability for admin requests sent to follower nodes
  * Implemented role-based access control for admin API operations
  * Enhanced DynamoDB request routing to automatically direct operations to leader nodes
* **Tests**
  * Added comprehensive integration and unit tests validating leader-forwarding paths
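For orientation, here is a minimal sketch of the follower-side hand-off described above. It is not the code in this PR: `ErrTablesNotLeader`, `LeaderForwarder`, and `ForwardResult` mirror names used in `internal/admin`, but the handler shape, the `createLocal` callback, and the forwarder's `CreateTable` signature are assumptions for illustration only.

```go
// Sketch of the follower-side hand-off (hypothetical shapes, not the
// actual internal/admin handler). On ErrTablesNotLeader the write is
// forwarded when a forwarder is wired; otherwise the handler keeps the
// 503 + Retry-After: 1 fallback that single-node deployments rely on.
package sketch

import (
	"context"
	"errors"
	"net/http"
)

var ErrTablesNotLeader = errors.New("tables: not leader")

// ForwardResult mirrors the fields exercised by the new test below.
type ForwardResult struct {
	StatusCode  int
	Payload     []byte
	ContentType string
}

// LeaderForwarder is a stand-in; the real interface may differ.
type LeaderForwarder interface {
	CreateTable(ctx context.Context, payload []byte) (*ForwardResult, error)
}

func handleCreateTable(
	ctx context.Context,
	w http.ResponseWriter,
	body []byte,
	createLocal func(context.Context, []byte) error, // leader-only local path
	fwd LeaderForwarder, // nil on single-node / leader-only deployments
) {
	err := createLocal(ctx, body)
	switch {
	case err == nil:
		w.WriteHeader(http.StatusCreated)
	case errors.Is(err, ErrTablesNotLeader) && fwd != nil:
		res, ferr := fwd.CreateTable(ctx, body)
		if ferr != nil {
			http.Error(w, ferr.Error(), http.StatusBadGateway)
			return
		}
		w.Header().Set("Content-Type", res.ContentType)
		w.WriteHeader(res.StatusCode) // leader's status replayed verbatim
		_, _ = w.Write(res.Payload)
	case errors.Is(err, ErrTablesNotLeader):
		w.Header().Set("Retry-After", "1") // client retries; next attempt re-dials
		http.Error(w, "no leader known", http.StatusServiceUnavailable)
	default:
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
```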
2 parents b9d24f6 + 964ecd4 commit 41c7f0f

7 files changed

Lines changed: 508 additions & 17 deletions

internal/admin/server.go

Lines changed: 13 additions & 1 deletion
```diff
@@ -39,6 +39,14 @@ type ServerDeps struct {
 	// cluster page deploy without standing up the dynamo bridge.
 	Tables TablesSource
 
+	// Forwarder is the LeaderForwarder that the Dynamo handler hands
+	// off ErrTablesNotLeader writes to (design 3.3, AdminForward).
+	// Optional: a nil value disables follower→leader forwarding, in
+	// which case the handler surfaces 503 + Retry-After: 1 directly.
+	// Single-node and leader-only deployments leave this nil; multi-
+	// node clusters wire the production gRPC client.
+	Forwarder LeaderForwarder
+
 	// StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be
 	// nil during early development; the router renders 404 for
 	// /admin/assets/* and the SPA fallback in that case.
@@ -110,9 +118,13 @@ func NewServer(deps ServerDeps) (*Server, error) {
 		// operators must restart the listener for revocation to
 		// take effect, but the JWT no longer extends a revoked
 		// key past the next request.
-		dynamo = NewDynamoHandler(deps.Tables).
+		dynamoHandler := NewDynamoHandler(deps.Tables).
 			WithLogger(logger).
 			WithRoleStore(MapRoleStore(deps.Roles))
+		if deps.Forwarder != nil {
+			dynamoHandler = dynamoHandler.WithLeaderForwarder(deps.Forwarder)
+		}
+		dynamo = dynamoHandler
 	}
 	mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger)
 	router := NewRouter(mux, deps.StaticFS)
```
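For callers of `admin.NewServer`, the new field is opt-in. A minimal usage sketch (the wrapper below is hypothetical; only the `ServerDeps.Forwarder` field and `admin.NewServer` come from the diff above):

```go
package sketch

import "github.com/bootjp/elastickv/internal/admin"

// Hypothetical wrapper: a nil forwarder keeps the existing
// 503 + Retry-After: 1 fallback; a non-nil one enables follower→leader
// forwarding, as the ServerDeps doc comment above describes.
func newAdminServer(deps admin.ServerDeps, fwd admin.LeaderForwarder) (*admin.Server, error) {
	deps.Forwarder = fwd // nil for single-node / leader-only deployments
	return admin.NewServer(deps)
}
```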

internal/admin/server_test.go

Lines changed: 51 additions & 0 deletions
```diff
@@ -453,6 +453,57 @@ func TestServer_DynamoDeleteTable_FullRoleHappyPath(t *testing.T) {
 	_ = resp.Body.Close()
 }
 
+// TestServer_ServerDepsForwarderIsWired confirms that a non-nil
+// ServerDeps.Forwarder reaches the DynamoHandler so a follower-side
+// CreateTable that the source returns ErrTablesNotLeader for is
+// transparently forwarded. Without the wire, the request would 503 +
+// Retry-After:1 — the no-forwarder fallback path. With it, the
+// leader's response status (here, 201) is replayed verbatim.
+func TestServer_ServerDepsForwarderIsWired(t *testing.T) {
+	src := &notLeaderSource{}
+	fwd := &stubLeaderForwarder{createRes: &ForwardResult{
+		StatusCode:  http.StatusCreated,
+		Payload:     []byte(`{"name":"users"}`),
+		ContentType: "application/json; charset=utf-8",
+	}}
+	clk := fixedClock(time.Unix(1_700_000_000, 0).UTC())
+	signer := newSignerForTest(t, 1, clk)
+	verifier := newVerifierForTest(t, []byte{1}, clk)
+	cluster := ClusterInfoFunc(func(_ context.Context) (ClusterInfo, error) {
+		return ClusterInfo{NodeID: "node-1", Version: "0.1.0"}, nil
+	})
+	srv, err := NewServer(ServerDeps{
+		Signer:      signer,
+		Verifier:    verifier,
+		Credentials: MapCredentialStore{"AKIA_ADMIN": "ADMIN_SECRET"},
+		Roles:       map[string]Role{"AKIA_ADMIN": RoleFull},
+		ClusterInfo: cluster,
+		Tables:      src,
+		Forwarder:   fwd,
+		AuthOpts:    AuthServiceOpts{Clock: clk},
+	})
+	require.NoError(t, err)
+
+	ts := httptest.NewServer(srv.Handler())
+	defer ts.Close()
+	cookies := loginAsFullAdminAndCookies(t, ts)
+	body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`)
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost,
+		ts.URL+"/admin/api/v1/dynamo/tables", body)
+	require.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies))
+	for _, c := range cookies {
+		req.AddCookie(c)
+	}
+	resp, err := http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusCreated, resp.StatusCode)
+	require.Equal(t, "users", fwd.lastCreateInput.TableName,
+		"forwarder must be invoked when source returns ErrTablesNotLeader")
+	_ = resp.Body.Close()
+}
+
 func TestServer_WriteRejectsMissingCSRF(t *testing.T) {
 	// Login to obtain a session, then hit cluster with POST to trigger
 	// CSRF on what the router normally rejects as method_not_allowed.
```

main.go

Lines changed: 73 additions & 6 deletions
```diff
@@ -19,6 +19,7 @@ import (
 	"github.com/bootjp/elastickv/adapter"
 	"github.com/bootjp/elastickv/distribution"
 	internalutil "github.com/bootjp/elastickv/internal"
+	"github.com/bootjp/elastickv/internal/admin"
 	"github.com/bootjp/elastickv/internal/memwatch"
 	internalraftadmin "github.com/bootjp/elastickv/internal/raftadmin"
 	"github.com/bootjp/elastickv/internal/raftengine"
@@ -680,6 +681,40 @@ func startServers(in serversInput) error {
 	if err != nil {
 		return err
 	}
+	// roleStore + connCache are gated on *adminEnabled. With admin
+	// disabled, building either is wasted work AND a security
+	// regression risk: a non-empty -adminFullAccessKeys flag would
+	// otherwise still flip forwardDeps.readyForRegistration() to
+	// true, registering the leader-side gRPC AdminForward service
+	// and re-exposing the table-write surface a follower-direct
+	// admin call could reach (Codex P1, CodeRabbit Major on #648).
+	// The HTTP admin listener already short-circuits in
+	// startAdminFromFlags when *adminEnabled is false; the gRPC path
+	// must do the same.
+	var (
+		roleStore admin.RoleStore
+		connCache *kv.GRPCConnCache
+	)
+	if *adminEnabled {
+		roleStore = roleStoreFromFlags(parseCSV(*adminFullAccessKeys), parseCSV(*adminReadOnlyAccessKeys))
+		// connCache is shared between the follower-side LeaderForwarder
+		// (built inside startAdminFromFlags) and any future bridge that
+		// dials the leader's gRPC ports. Keeping a single instance per
+		// process means the two paths re-use TLS / HTTP/2 connections
+		// rather than each maintaining a parallel pool. The shutdown
+		// goroutine drains the cache on context cancellation so the
+		// accumulated HTTP/2 connections are not leaked when the
+		// process exits gracefully (Claude review on #648).
+		connCache = &kv.GRPCConnCache{}
+		cache := connCache
+		in.eg.Go(func() error {
+			<-in.ctx.Done()
+			if err := cache.Close(); err != nil {
+				return errors.Wrap(err, "close admin gRPC connection cache")
+			}
+			return nil
+		})
+	}
 	runner := runtimeServerRunner{
 		ctx: in.ctx,
 		lc:  in.lc,
@@ -711,6 +746,7 @@
 		pprofAddress:    *pprofAddr,
 		pprofToken:      *pprofToken,
 		metricsRegistry: in.metricsRegistry,
+		roleStore:       roleStore,
 	}
 	if err := runner.start(); err != nil {
 		return err
@@ -720,7 +756,14 @@
 	// Passing nil here would leave the admin dashboard with no
 	// access to table metadata; the admin handler answers
 	// /admin/api/v1/dynamo/* with 404 in that case.
-	if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer); err != nil {
+	//
+	// in.coordinate + connCache are forwarded so the admin HTTP
+	// dynamo handler can construct its production LeaderForwarder
+	// (Phase 3 of design 3.3): when the local node is a follower,
+	// the handler hands ErrTablesNotLeader writes to the forwarder
+	// which dials the leader over the cached gRPC pool. Without these
+	// the handler falls back to 503 + Retry-After:1.
+	if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, in.coordinate, connCache); err != nil {
 		return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err)
 	}
 	return nil
@@ -975,7 +1018,9 @@ func startRaftServers(
 	proposalObserverForGroup func(uint64) kv.ProposalObserver,
 	adminServer *adapter.AdminServer,
 	adminGRPCOpts adminGRPCInterceptors,
+	forwardDeps adminForwardServerDeps,
 ) error {
+	forwardLogger := slog.Default().With(slog.String("component", "admin"))
 	// extraOptsCap reserves slots for the unary + stream admin interceptor
 	// options appended below. Sized as a constant so the magic-number
 	// linter does not complain.
@@ -1005,6 +1050,7 @@
 		if adminServer != nil {
 			pb.RegisterAdminServer(gs, adminServer)
 		}
+		registerAdminForwardServer(gs, forwardDeps, forwardLogger)
 		rt.registerGRPC(gs)
 		internalraftadmin.RegisterOperationalServices(ctx, gs, rt.engine, []string{"RawKV"})
 		reflection.Register(gs)
@@ -1255,12 +1301,37 @@ type runtimeServerRunner struct {
 	// field is unexported on purpose — it is package-private state,
 	// not a public API. Nil until start() reaches the dynamo step.
 	dynamoServer *adapter.DynamoDBServer
+
+	// roleStore is the access-key → role index the leader-side
+	// gRPC AdminForward service uses to re-validate the principal
+	// on every forwarded write. Mirrors what admin.Config.RoleIndex
+	// produces inside startAdminFromFlags; built up-front in
+	// startServers so registerAdminForwardServer in startRaftServers
+	// does not need to wait for the (later) admin-config parse.
+	// Nil when no admin access keys are configured.
+	roleStore admin.RoleStore
 }
 
 func (r *runtimeServerRunner) start() error {
 	if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
+	// startDynamoDBServer must run BEFORE startRaftServers so the
+	// resulting DynamoDBServer is available to the leader-side gRPC
+	// AdminForward registration in startRaftServers (design 3.3).
+	// Both servers listen on different addresses; the dynamo HTTP
+	// listener accepting traffic before raft TCP listeners are up
+	// is no different from the existing startup-race semantics — a
+	// hit in that window already returned 503 before this reorder.
+	dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
+	if err != nil {
+		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
+	}
+	r.dynamoServer = dynamoServer
+	forwardDeps := adminForwardServerDeps{
+		tables: newDynamoTablesSource(r.dynamoServer),
+		roles:  r.roleStore,
+	}
 	if err := startRaftServers(
 		r.ctx,
 		r.lc,
@@ -1275,14 +1346,10 @@
 		},
 		r.adminServer,
 		r.adminGRPCOpts,
+		forwardDeps,
 	); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
-	dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
-	if err != nil {
-		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
-	}
-	r.dynamoServer = dynamoServer
 	if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
```
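The leader-side registration itself lives in `main_admin_forward.go` (commit f3e529c), which this diff does not show. A sketch of the gating idea, consistent with the call sites above but with assumed field and method shapes:

```go
// Sketch only: the real adminForwardServerDeps / registerAdminForwardServer
// are in main_admin_forward.go and may look different. What is taken from
// the PR description is the rule: unless both a tables source and a role
// store exist (i.e. admin is enabled), nothing is registered, so no
// table-write surface is exposed over gRPC.
package sketch

import (
	"log/slog"

	"google.golang.org/grpc"
)

type adminForwardServerDeps struct {
	tables any // stand-in for the real TablesSource
	roles  any // stand-in for the real admin.RoleStore
}

// readyForRegistration is the gate referenced in the PR body.
func (d adminForwardServerDeps) readyForRegistration() bool {
	return d.tables != nil && d.roles != nil
}

func registerAdminForwardServer(gs *grpc.Server, deps adminForwardServerDeps, logger *slog.Logger) {
	if !deps.readyForRegistration() {
		logger.Info("admin forward: deps incomplete, skipping gRPC registration")
		return
	}
	// pb.RegisterAdminForwardServer(gs, ...) would go here; the concrete
	// server construction is not part of this diff.
	_ = gs
}
```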

main_admin.go

Lines changed: 44 additions & 4 deletions
```diff
@@ -68,7 +68,15 @@ type adminListenerConfig struct {
 // without touching --s3CredentialsFile: pulling the admin feature into
 // a hard dependency on that file would break deployments that never
 // intended to use it.
-func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer) error {
+func startAdminFromFlags(
+	ctx context.Context,
+	lc *net.ListenConfig,
+	eg *errgroup.Group,
+	runtimes []*raftGroupRuntime,
+	dynamoServer *adapter.DynamoDBServer,
+	coordinate kv.Coordinator,
+	connCache *kv.GRPCConnCache,
+) error {
 	if !*adminEnabled {
 		return nil
 	}
@@ -109,10 +117,40 @@ func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup
 	}
 	clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes)
 	tablesSrc := newDynamoTablesSource(dynamoServer)
-	_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, buildVersion())
+	forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId)
+	if err != nil {
+		return errors.Wrap(err, "build admin leader forwarder")
+	}
+	_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, forwarder, buildVersion())
 	return err
 }
 
+// buildAdminLeaderForwarder constructs the production LeaderForwarder
+// for the dynamo HTTP handler when the wiring is complete enough to
+// reach a remote leader. The bridge tolerates a nil connCache (and a
+// nil coordinate) so single-node / leader-only builds — where the
+// dashboard always hits a leader — can ship without paying the
+// forwarder's wiring cost. tablesSrc itself can be nil for cluster-
+// only builds; that's handled higher up by ServerDeps.Tables == nil.
+func buildAdminLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache, nodeID string) (admin.LeaderForwarder, error) {
+	if coordinate == nil || connCache == nil {
+		// Returning (nil, nil) is the explicit "no forwarder" signal
+		// — the handler falls back to 503 + Retry-After:1 on
+		// ErrTablesNotLeader. The function-level doc comment above
+		// describes this contract; the nilnil linter is not enabled
+		// in .golangci.yaml so no suppression directive is needed
+		// (Claude review on #648).
+		return nil, nil
+	}
+	if nodeID == "" {
+		// admin.NewGRPCForwardClient enforces this too; surfacing
+		// it here keeps the misconfiguration message in the wiring
+		// layer rather than buried under a Wrap chain.
+		return nil, errors.New("admin forward bridge: --raftId is required")
+	}
+	return buildLeaderForwarder(coordinate, connCache, nodeID)
+}
+
 // newDynamoTablesSource adapts *adapter.DynamoDBServer to the
 // admin.TablesSource interface. The bridge stays in this file (rather
 // than internal/admin) so the admin package stays free of the heavy
@@ -346,14 +384,15 @@ func startAdminServer(
 	creds map[string]string,
 	cluster admin.ClusterInfoSource,
 	tables admin.TablesSource,
+	forwarder admin.LeaderForwarder,
 	version string,
 ) (string, error) {
 	adminCfg := buildAdminConfig(cfg)
 	enabled, err := checkAdminConfig(&adminCfg, cluster)
 	if err != nil || !enabled {
 		return "", err
 	}
-	server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables)
+	server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, forwarder)
 	if err != nil {
 		return "", err
 	}
@@ -393,7 +432,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) (
 	return true, nil
 }
 
-func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource) (*admin.Server, error) {
+func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, forwarder admin.LeaderForwarder) (*admin.Server, error) {
 	primaryKeys, err := adminCfg.DecodedSigningKeys()
 	if err != nil {
 		return nil, errors.Wrap(err, "decode admin signing keys")
@@ -413,6 +452,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust
 		Roles:       adminCfg.RoleIndex(),
 		ClusterInfo: cluster,
 		Tables:      tables,
+		Forwarder:   forwarder,
 		StaticFS:    nil,
 		AuthOpts: admin.AuthServiceOpts{
 			InsecureCookie: adminCfg.AllowInsecureDevCookie,
```
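`buildLeaderForwarder`, the `GRPCConnCache`-to-`PBAdminForwardClient` bridge from commit f3e529c, is also outside this diff. A sketch of the shape it implies, using stand-in interfaces because the real `kv.Coordinator` and `kv.GRPCConnCache` method names are not shown here:

```go
// Sketch only: stand-in interfaces, not the real kv types. The overall
// flow (resolve the current leader, reuse a cached gRPC connection,
// wrap the generated AdminForward client, stamp the local node ID as
// forwarded_from) is taken from the PR description; everything else
// is an assumption.
package sketch

import (
	"context"

	"google.golang.org/grpc"
)

// leaderResolver is a stand-in for the coordinator's leader lookup.
type leaderResolver interface {
	LeaderAddr(ctx context.Context) (string, error)
}

// connCache is a stand-in for kv.GRPCConnCache.
type connCache interface {
	Conn(ctx context.Context, addr string) (*grpc.ClientConn, error)
}

type grpcForwarder struct {
	leaders leaderResolver
	conns   connCache
	nodeID  string // stamped onto every forwarded request (audit: forwarded_from)
}

// dialLeader re-resolves the leader on every attempt: after an election
// the next client retry lands on the new leader without any inline
// retry loop in the handler, matching criterion 3.
func (f *grpcForwarder) dialLeader(ctx context.Context) (*grpc.ClientConn, error) {
	addr, err := f.leaders.LeaderAddr(ctx)
	if err != nil {
		return nil, err
	}
	return f.conns.Conn(ctx, addr)
}
```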
