diff --git a/internal/admin/server.go b/internal/admin/server.go
index eff694a2d..7742eedd9 100644
--- a/internal/admin/server.go
+++ b/internal/admin/server.go
@@ -39,6 +39,14 @@ type ServerDeps struct {
 	// cluster page deploy without standing up the dynamo bridge.
 	Tables TablesSource
 
+	// Forwarder is the LeaderForwarder that the Dynamo handler hands
+	// off ErrTablesNotLeader writes to (design 3.3, AdminForward).
+	// Optional: a nil value disables follower→leader forwarding, in
+	// which case the handler surfaces 503 + Retry-After: 1 directly.
+	// Single-node and leader-only deployments leave this nil; multi-
+	// node clusters wire the production gRPC client.
+	Forwarder LeaderForwarder
+
 	// StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be
 	// nil during early development; the router renders 404 for
 	// /admin/assets/* and the SPA fallback in that case.
@@ -110,9 +118,13 @@ func NewServer(deps ServerDeps) (*Server, error) {
 		// operators must restart the listener for revocation to
 		// take effect, but the JWT no longer extends a revoked
 		// key past the next request.
-		dynamo = NewDynamoHandler(deps.Tables).
+		dynamoHandler := NewDynamoHandler(deps.Tables).
 			WithLogger(logger).
 			WithRoleStore(MapRoleStore(deps.Roles))
+		if deps.Forwarder != nil {
+			dynamoHandler = dynamoHandler.WithLeaderForwarder(deps.Forwarder)
+		}
+		dynamo = dynamoHandler
 	}
 	mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger)
 	router := NewRouter(mux, deps.StaticFS)
diff --git a/internal/admin/server_test.go b/internal/admin/server_test.go
index 016d4ce28..de655af8f 100644
--- a/internal/admin/server_test.go
+++ b/internal/admin/server_test.go
@@ -453,6 +453,57 @@ func TestServer_DynamoDeleteTable_FullRoleHappyPath(t *testing.T) {
 	_ = resp.Body.Close()
 }
 
+// TestServer_ServerDepsForwarderIsWired confirms that a non-nil
+// ServerDeps.Forwarder reaches the DynamoHandler so a follower-side
+// CreateTable that the source returns ErrTablesNotLeader for is
+// transparently forwarded. Without the wiring, the request would 503 +
+// Retry-After:1 — the no-forwarder fallback path. With it, the
+// leader's response status (here, 201) is replayed verbatim.
+func TestServer_ServerDepsForwarderIsWired(t *testing.T) {
+	src := &notLeaderSource{}
+	fwd := &stubLeaderForwarder{createRes: &ForwardResult{
+		StatusCode:  http.StatusCreated,
+		Payload:     []byte(`{"name":"users"}`),
+		ContentType: "application/json; charset=utf-8",
+	}}
+	clk := fixedClock(time.Unix(1_700_000_000, 0).UTC())
+	signer := newSignerForTest(t, 1, clk)
+	verifier := newVerifierForTest(t, []byte{1}, clk)
+	cluster := ClusterInfoFunc(func(_ context.Context) (ClusterInfo, error) {
+		return ClusterInfo{NodeID: "node-1", Version: "0.1.0"}, nil
+	})
+	srv, err := NewServer(ServerDeps{
+		Signer:      signer,
+		Verifier:    verifier,
+		Credentials: MapCredentialStore{"AKIA_ADMIN": "ADMIN_SECRET"},
+		Roles:       map[string]Role{"AKIA_ADMIN": RoleFull},
+		ClusterInfo: cluster,
+		Tables:      src,
+		Forwarder:   fwd,
+		AuthOpts:    AuthServiceOpts{Clock: clk},
+	})
+	require.NoError(t, err)
+
+	ts := httptest.NewServer(srv.Handler())
+	defer ts.Close()
+	cookies := loginAsFullAdminAndCookies(t, ts)
+	body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`)
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost,
+		ts.URL+"/admin/api/v1/dynamo/tables", body)
+	require.NoError(t, err)
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies))
+	for _, c := range cookies {
+		req.AddCookie(c)
+	}
+	resp, err := http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusCreated, resp.StatusCode)
+	require.Equal(t, "users", fwd.lastCreateInput.TableName,
+		"forwarder must be invoked when source returns ErrTablesNotLeader")
+	_ = resp.Body.Close()
+}
+
 func TestServer_WriteRejectsMissingCSRF(t *testing.T) {
 	// Login to obtain a session, then hit cluster with POST to trigger
 	// CSRF on what the router normally rejects as method_not_allowed.
diff --git a/main.go b/main.go
index f6e38cc89..348293d10 100644
--- a/main.go
+++ b/main.go
@@ -19,6 +19,7 @@ import (
 	"github.com/bootjp/elastickv/adapter"
 	"github.com/bootjp/elastickv/distribution"
 	internalutil "github.com/bootjp/elastickv/internal"
+	"github.com/bootjp/elastickv/internal/admin"
 	"github.com/bootjp/elastickv/internal/memwatch"
 	internalraftadmin "github.com/bootjp/elastickv/internal/raftadmin"
 	"github.com/bootjp/elastickv/internal/raftengine"
@@ -653,6 +654,40 @@ func startServers(in serversInput) error {
 	if err != nil {
 		return err
 	}
+	// roleStore + connCache are gated on *adminEnabled. With admin
+	// disabled, building either is wasted work AND a security
+	// regression risk: a non-empty -adminFullAccessKeys flag would
+	// otherwise still flip forwardDeps.readyForRegistration() to
+	// true, registering the leader-side gRPC AdminForward service
+	// and re-exposing the table-write surface a follower-direct
+	// admin call could reach (Codex P1, CodeRabbit Major on #648).
+	// The HTTP admin listener already short-circuits in
+	// startAdminFromFlags when *adminEnabled is false; the gRPC path
+	// must do the same.
+	var (
+		roleStore admin.RoleStore
+		connCache *kv.GRPCConnCache
+	)
+	if *adminEnabled {
+		roleStore = roleStoreFromFlags(parseCSV(*adminFullAccessKeys), parseCSV(*adminReadOnlyAccessKeys))
+		// connCache is shared between the follower-side LeaderForwarder
+		// (built inside startAdminFromFlags) and any future bridge that
+		// dials the leader's gRPC ports. Keeping a single instance per
+		// process means the two paths re-use TLS / HTTP/2 connections
+		// rather than each maintaining a parallel pool. The shutdown
+		// goroutine drains the cache on context cancellation so the
+		// accumulated HTTP/2 connections are not leaked when the
+		// process exits gracefully (Claude review on #648).
+		connCache = &kv.GRPCConnCache{}
+		cache := connCache
+		in.eg.Go(func() error {
+			<-in.ctx.Done()
+			if err := cache.Close(); err != nil {
+				return errors.Wrap(err, "close admin gRPC connection cache")
+			}
+			return nil
+		})
+	}
 	runner := runtimeServerRunner{
 		ctx:             in.ctx,
 		lc:              in.lc,
@@ -684,6 +719,7 @@ func startServers(in serversInput) error {
 		pprofAddress:    *pprofAddr,
 		pprofToken:      *pprofToken,
 		metricsRegistry: in.metricsRegistry,
+		roleStore:       roleStore,
 	}
 	if err := runner.start(); err != nil {
 		return err
 	}
@@ -693,7 +729,14 @@ func startServers(in serversInput) error {
 	// Passing nil here would leave the admin dashboard with no
 	// access to table metadata; the admin handler answers
 	// /admin/api/v1/dynamo/* with 404 in that case.
-	if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer); err != nil {
+	//
+	// in.coordinate + connCache are forwarded so the admin HTTP
+	// dynamo handler can construct its production LeaderForwarder
+	// (Phase 3 of design 3.3): when the local node is a follower,
+	// the handler hands ErrTablesNotLeader writes to the forwarder
+	// which dials the leader over the cached gRPC pool. Without these
+	// the handler falls back to 503 + Retry-After:1.
+	if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, in.coordinate, connCache); err != nil {
 		return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err)
 	}
 	return nil
@@ -940,7 +983,9 @@ func startRaftServers(
 	proposalObserverForGroup func(uint64) kv.ProposalObserver,
 	adminServer *adapter.AdminServer,
 	adminGRPCOpts adminGRPCInterceptors,
+	forwardDeps adminForwardServerDeps,
 ) error {
+	forwardLogger := slog.Default().With(slog.String("component", "admin"))
 	// extraOptsCap reserves slots for the unary + stream admin interceptor
 	// options appended below. Sized as a constant so the magic-number
 	// linter does not complain.
@@ -970,6 +1015,7 @@ func startRaftServers(
 		if adminServer != nil {
 			pb.RegisterAdminServer(gs, adminServer)
 		}
+		registerAdminForwardServer(gs, forwardDeps, forwardLogger)
 		rt.registerGRPC(gs)
 		internalraftadmin.RegisterOperationalServices(ctx, gs, rt.engine, []string{"RawKV"})
 		reflection.Register(gs)
@@ -1220,12 +1266,37 @@ type runtimeServerRunner struct {
 	// field is unexported on purpose — it is package-private state,
 	// not a public API. Nil until start() reaches the dynamo step.
 	dynamoServer *adapter.DynamoDBServer
+
+	// roleStore is the access-key → role index the leader-side
+	// gRPC AdminForward service uses to re-validate the principal
+	// on every forwarded write. Mirrors what admin.Config.RoleIndex
+	// produces inside startAdminFromFlags; built up-front in
+	// startServers so registerAdminForwardServer in startRaftServers
+	// does not need to wait for the (later) admin-config parse.
+	// Nil when no admin access keys are configured.
+	roleStore admin.RoleStore
 }
 
 func (r *runtimeServerRunner) start() error {
 	if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
+	// startDynamoDBServer must run BEFORE startRaftServers so the
+	// resulting DynamoDBServer is available to the leader-side gRPC
+	// AdminForward registration in startRaftServers (design 3.3).
+	// Both servers listen on different addresses; the dynamo HTTP
+	// listener accepting traffic before raft TCP listeners are up
+	// is no different from the existing startup-race semantics — a
+	// hit in that window already returned 503 before this reorder.
+	dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
+	if err != nil {
+		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
+	}
+	r.dynamoServer = dynamoServer
+	forwardDeps := adminForwardServerDeps{
+		tables: newDynamoTablesSource(r.dynamoServer),
+		roles:  r.roleStore,
+	}
 	if err := startRaftServers(
 		r.ctx,
 		r.lc,
@@ -1240,14 +1311,10 @@ func (r *runtimeServerRunner) start() error {
 		},
 		r.adminServer,
 		r.adminGRPCOpts,
+		forwardDeps,
 	); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
-	dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
-	if err != nil {
-		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
-	}
-	r.dynamoServer = dynamoServer
 	if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
diff --git a/main_admin.go b/main_admin.go
index e9e1f526c..f6d76f7a3 100644
--- a/main_admin.go
+++ b/main_admin.go
@@ -68,7 +68,15 @@ type adminListenerConfig struct {
 // without touching --s3CredentialsFile: pulling the admin feature into
 // a hard dependency on that file would break deployments that never
 // intended to use it.
-func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer) error {
+func startAdminFromFlags(
+	ctx context.Context,
+	lc *net.ListenConfig,
+	eg *errgroup.Group,
+	runtimes []*raftGroupRuntime,
+	dynamoServer *adapter.DynamoDBServer,
+	coordinate kv.Coordinator,
+	connCache *kv.GRPCConnCache,
+) error {
 	if !*adminEnabled {
 		return nil
 	}
@@ -109,10 +117,40 @@
 	}
 	clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes)
 	tablesSrc := newDynamoTablesSource(dynamoServer)
-	_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, buildVersion())
+	forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId)
+	if err != nil {
+		return errors.Wrap(err, "build admin leader forwarder")
+	}
+	_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, forwarder, buildVersion())
 	return err
 }
 
+// buildAdminLeaderForwarder constructs the production LeaderForwarder
+// for the dynamo HTTP handler when the wiring is complete enough to
+// reach a remote leader. The bridge tolerates a nil connCache (and a
+// nil coordinate) so single-node / leader-only builds — where the
+// dashboard always hits a leader — can ship without paying the
+// forwarder's wiring cost. tablesSrc itself can be nil for cluster-
+// only builds; that's handled higher up by ServerDeps.Tables == nil.
+func buildAdminLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache, nodeID string) (admin.LeaderForwarder, error) {
+	if coordinate == nil || connCache == nil {
+		// Returning (nil, nil) is the explicit "no forwarder" signal
+		// — the handler falls back to 503 + Retry-After:1 on
+		// ErrTablesNotLeader. The function-level doc comment above
+		// describes this contract; the nilnil linter is not enabled
+		// in .golangci.yaml so no suppression directive is needed
+		// (Claude review on #648).
+		return nil, nil
+	}
+	if nodeID == "" {
+		// admin.NewGRPCForwardClient enforces this too; surfacing
+		// it here keeps the misconfiguration message in the wiring
+		// layer rather than buried under a Wrap chain.
+		return nil, errors.New("admin forward bridge: --raftId is required")
+	}
+	return buildLeaderForwarder(coordinate, connCache, nodeID)
+}
+
 // newDynamoTablesSource adapts *adapter.DynamoDBServer to the
 // admin.TablesSource interface. The bridge stays in this file (rather
 // than internal/admin) so the admin package stays free of the heavy
@@ -346,6 +384,7 @@ func startAdminServer(
 	creds map[string]string,
 	cluster admin.ClusterInfoSource,
 	tables admin.TablesSource,
+	forwarder admin.LeaderForwarder,
 	version string,
 ) (string, error) {
 	adminCfg := buildAdminConfig(cfg)
@@ -353,7 +392,7 @@
 	if err != nil || !enabled {
 		return "", err
 	}
-	server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables)
+	server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, forwarder)
 	if err != nil {
 		return "", err
 	}
@@ -393,7 +432,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) (
 	return true, nil
 }
 
-func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource) (*admin.Server, error) {
+func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, forwarder admin.LeaderForwarder) (*admin.Server, error) {
 	primaryKeys, err := adminCfg.DecodedSigningKeys()
 	if err != nil {
 		return nil, errors.Wrap(err, "decode admin signing keys")
@@ -413,6 +452,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust
 		Roles:       adminCfg.RoleIndex(),
 		ClusterInfo: cluster,
 		Tables:      tables,
+		Forwarder:   forwarder,
 		StaticFS:    nil,
 		AuthOpts: admin.AuthServiceOpts{
 			InsecureCookie: adminCfg.AllowInsecureDevCookie,
diff --git a/main_admin_forward.go b/main_admin_forward.go
new file mode 100644
index 000000000..c06e57165
--- /dev/null
+++ b/main_admin_forward.go
@@ -0,0 +1,120 @@
+package main
+
+import (
+	"log/slog"
+
+	"github.com/bootjp/elastickv/internal/admin"
+	"github.com/bootjp/elastickv/kv"
+	pb "github.com/bootjp/elastickv/proto"
+	"github.com/cockroachdb/errors"
+	"google.golang.org/grpc"
+)
+
+// adminForwardConnFactory bridges kv.GRPCConnCache to the
+// admin.GRPCConnFactory interface. The cache hands back
+// *grpc.ClientConn; the admin layer wants a typed
+// PBAdminForwardClient — pb.NewAdminForwardClient(conn) is the
+// generated constructor that adapts one to the other. Defining the
+// bridge here (rather than in internal/admin) keeps the admin package
+// free of any kv-package import.
+type adminForwardConnFactory struct {
+	cache *kv.GRPCConnCache
+}
+
+// ConnFor satisfies admin.GRPCConnFactory. addr "" is rejected at the
+// LeaderForwarder layer (ErrLeaderUnavailable) before reaching this
+// method, so we surface the conn-cache's own error vocabulary
+// unchanged when the dial fails.
+func (f *adminForwardConnFactory) ConnFor(addr string) (admin.PBAdminForwardClient, error) {
+	conn, err := f.cache.ConnFor(addr)
+	if err != nil {
+		return nil, errors.Wrap(err, "admin forward: dial leader")
+	}
+	return pb.NewAdminForwardClient(conn), nil
+}
+
+// buildLeaderForwarder is the production constructor for the
+// follower-side LeaderForwarder. It resolves the current leader's
+// gRPC address through the local Coordinator (which queries the
+// default group's raft engine), reuses one cached gRPC connection per
+// address, and stamps the local nodeID onto every forwarded request
+// so the leader's audit log carries the trace.
+//
+// All three inputs are required; a nil coordinate, a nil connCache,
+// or an empty nodeID is a wiring bug and surfaces as a startup error
+// rather than a runtime 500.
+func buildLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache, nodeID string) (admin.LeaderForwarder, error) {
+	if coordinate == nil {
+		return nil, errors.New("admin forward bridge: coordinator is required")
+	}
+	if connCache == nil {
+		return nil, errors.New("admin forward bridge: gRPC connection cache is required")
+	}
+	resolver := func() string { return coordinate.RaftLeader() }
+	factory := &adminForwardConnFactory{cache: connCache}
+	fwd, err := admin.NewGRPCForwardClient(resolver, factory, nodeID)
+	if err != nil {
+		return nil, errors.Wrap(err, "build leader forwarder")
+	}
+	return fwd, nil
+}
+
+// adminForwardServerDeps is the small bundle the gRPC ForwardServer
+// needs to be reachable from a follower's bridge. Collecting them in
+// a struct keeps startRaftServers' signature tractable as the wiring
+// surface grows. All fields are required; a missing one means the
+// ForwardServer is not registered (single-node / leader-only build).
+type adminForwardServerDeps struct {
+	tables admin.TablesSource
+	roles  admin.RoleStore
+}
+
+// readyForRegistration reports whether the bundle has enough
+// collaborators to construct + register a ForwardServer. Both fields
+// must be non-nil; a nil TablesSource means the build ships without
+// the dynamo adapter, and a nil RoleStore means admin auth is not
+// configured. Either way, registering the gRPC service would 500
+// every forwarded call, so we silently skip registration instead.
+func (d adminForwardServerDeps) readyForRegistration() bool {
+	return d.tables != nil && d.roles != nil
+}
+
+// registerAdminForwardServer attaches the leader-side gRPC
+// AdminForward service to gs when the bundle is ready (TablesSource +
+// RoleStore both present). Centralising the call here keeps the
+// proto-level Register* import out of main.go's startRaftServers and
+// lets the readyForRegistration gate decide silently whether this
+// build serves forwarded admin writes at all.
+func registerAdminForwardServer(gs *grpc.Server, deps adminForwardServerDeps, logger *slog.Logger) {
+	if !deps.readyForRegistration() {
+		return
+	}
+	pb.RegisterAdminForwardServer(gs, admin.NewForwardServer(deps.tables, deps.roles, logger))
+}
+
+// roleStoreFromFlags builds the same access-key → role map that
+// admin.Config.RoleIndex produces, but from the raw flag strings so
+// the gRPC ForwardServer registration in startRaftServers does not
+// need to wait for startAdminFromFlags to parse the admin config.
+// Returns nil when no keys are configured at all — that shape is the
+// "admin auth disabled" signal adminForwardServerDeps consumes to
+// skip registration.
+func roleStoreFromFlags(fullKeys, readOnlyKeys []string) admin.RoleStore {
+	if len(fullKeys) == 0 && len(readOnlyKeys) == 0 {
+		return nil
+	}
+	idx := make(map[string]admin.Role, len(fullKeys)+len(readOnlyKeys))
+	for _, k := range fullKeys {
+		idx[k] = admin.RoleFull
+	}
+	for _, k := range readOnlyKeys {
+		// Overlap with FullAccessKeys is rejected at admin.Config.Validate
+		// time during startAdminFromFlags. We can't replicate that here
+		// without parsing the full config, so the ReadOnlyAccessKeys loop
+		// runs second to mirror RoleIndex's "last-write-wins-but-only-for-
+		// non-overlapping-keys" semantics — overlap is a startup error
+		// that the HTTP path will surface.
+		idx[k] = admin.RoleReadOnly
+	}
+	return admin.MapRoleStore(idx)
+}
diff --git a/main_admin_forward_test.go b/main_admin_forward_test.go
new file mode 100644
index 000000000..041713052
--- /dev/null
+++ b/main_admin_forward_test.go
@@ -0,0 +1,201 @@
+package main
+
+import (
+	"context"
+	"testing"
+
+	"github.com/bootjp/elastickv/internal/admin"
+	"github.com/bootjp/elastickv/kv"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBuildLeaderForwarder_RejectsMissingDeps(t *testing.T) {
+	cache := &kv.GRPCConnCache{}
+	cases := []struct {
+		name      string
+		coord     kv.Coordinator
+		cache     *kv.GRPCConnCache
+		nodeID    string
+		wantSubst string
+	}{
+		{"nil coordinator", nil, cache, "n1", "coordinator"},
+		{"nil conn cache", &kv.Coordinate{}, nil, "n1", "gRPC connection cache"},
+		// admin.NewGRPCForwardClient owns the empty-nodeID rejection;
+		// we confirm the wrapped error preserves that vocabulary so a
+		// misconfigured deployment fails fast at startup with a
+		// pinpointed message rather than mysterious 500s at runtime.
+		{"empty node id", &kv.Coordinate{}, cache, "", "node id is required"},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fwd, err := buildLeaderForwarder(tc.coord, tc.cache, tc.nodeID)
+			require.Error(t, err)
+			require.Nil(t, fwd)
+			require.Contains(t, err.Error(), tc.wantSubst)
+		})
+	}
+}
+
+func TestBuildLeaderForwarder_HappyPathReturnsForwarder(t *testing.T) {
+	// The production bridge does not dial during construction —
+	// resolver / dial calls only happen on the first Forward — so
+	// passing real (zero-value) collaborators is enough to confirm
+	// the wiring itself is well-formed.
+	fwd, err := buildLeaderForwarder(&kv.Coordinate{}, &kv.GRPCConnCache{}, "n1")
+	require.NoError(t, err)
+	require.NotNil(t, fwd)
+}
+
+func TestAdminForwardConnFactory_RejectsEmptyAddr(t *testing.T) {
+	// kv.GRPCConnCache.ConnFor returns ErrLeaderNotFound on "". The
+	// LeaderForwarder catches the empty address before this layer is
+	// reached, but the bridge still surfaces an error rather than a
+	// nil client when invoked directly — so a future caller that
+	// bypasses the resolver does not get a typed-nil PBAdminForwardClient.
+	f := &adminForwardConnFactory{cache: &kv.GRPCConnCache{}}
+	cli, err := f.ConnFor("")
+	require.Error(t, err)
+	require.Nil(t, cli)
+}
+
+func TestRoleStoreFromFlags(t *testing.T) {
+	cases := []struct {
+		name         string
+		full         []string
+		readOnly     []string
+		wantNil      bool
+		wantFull     []string
+		wantReadOnly []string
+	}{
+		{name: "both empty produces nil store", wantNil: true},
+		{
+			name:     "full only",
+			full:     []string{"AKIA_F"},
+			wantFull: []string{"AKIA_F"},
+		},
+		{
+			name:         "read-only only",
+			readOnly:     []string{"AKIA_R"},
+			wantReadOnly: []string{"AKIA_R"},
+		},
+		{
+			name:         "mixed roles",
+			full:         []string{"AKIA_F1", "AKIA_F2"},
+			readOnly:     []string{"AKIA_R1"},
+			wantFull:     []string{"AKIA_F1", "AKIA_F2"},
+			wantReadOnly: []string{"AKIA_R1"},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			store := roleStoreFromFlags(tc.full, tc.readOnly)
+			if tc.wantNil {
+				require.Nil(t, store)
+				return
+			}
+			require.NotNil(t, store)
+			for _, k := range tc.wantFull {
+				role, ok := store.LookupRole(k)
+				require.True(t, ok, "expected %s present", k)
+				require.Equal(t, admin.RoleFull, role)
+			}
+			for _, k := range tc.wantReadOnly {
+				role, ok := store.LookupRole(k)
+				require.True(t, ok, "expected %s present", k)
+				require.Equal(t, admin.RoleReadOnly, role)
+			}
+		})
+	}
+}
+
+func TestAdminForwardServerDeps_ReadyForRegistration(t *testing.T) {
+	// The bundle's readyForRegistration gate decides whether
+	// startRaftServers wires the gRPC ForwardServer at all. A nil
+	// TablesSource (cluster-only build) or nil RoleStore (admin
+	// auth disabled) means a registered service would 500 every
+	// forwarded call — silently skipping registration is the
+	// preferred behaviour.
+	require.False(t, adminForwardServerDeps{}.readyForRegistration())
+	require.False(t, adminForwardServerDeps{tables: dummyTablesSource{}}.readyForRegistration())
+	require.False(t, adminForwardServerDeps{roles: admin.MapRoleStore{}}.readyForRegistration())
+	require.True(t, adminForwardServerDeps{
+		tables: dummyTablesSource{},
+		roles:  admin.MapRoleStore{},
+	}.readyForRegistration())
+}
+
+func TestBuildAdminLeaderForwarder_NilGateReturnsNoForwarder(t *testing.T) {
+	// buildAdminLeaderForwarder is the wrapper in main_admin.go that
+	// short-circuits to (nil, nil) when either coordinate or
+	// connCache is nil — the explicit "no forwarder" path for
+	// single-node / leader-only deployments. A future refactor that
+	// drops the guard would silently pass a nil collaborator into
+	// buildLeaderForwarder, which would either crash on the nil
+	// resolver / cache deref or build a forwarder that panics on
+	// the first request. Locking this down keeps the contract intact
+	// (Claude review on #648).
+	cases := []struct {
+		name      string
+		coord     kv.Coordinator
+		cache     *kv.GRPCConnCache
+		nodeID    string
+		wantNil   bool
+		wantError string
+	}{
+		{name: "nil coordinator", cache: &kv.GRPCConnCache{}, nodeID: "n1", wantNil: true},
+		{name: "nil conn cache", coord: &kv.Coordinate{}, nodeID: "n1", wantNil: true},
+		{name: "both nil", nodeID: "n1", wantNil: true},
+		{
+			name:      "complete deps but empty node id",
+			coord:     &kv.Coordinate{},
+			cache:     &kv.GRPCConnCache{},
+			wantError: "--raftId is required",
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fwd, err := buildAdminLeaderForwarder(tc.coord, tc.cache, tc.nodeID)
+			if tc.wantError != "" {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.wantError)
+				require.Nil(t, fwd)
+				return
+			}
+			require.NoError(t, err)
+			if tc.wantNil {
+				require.Nil(t, fwd)
+			} else {
+				require.NotNil(t, fwd)
+			}
+		})
+	}
+}
+
+func TestBuildAdminLeaderForwarder_HappyPathReturnsForwarder(t *testing.T) {
+	fwd, err := buildAdminLeaderForwarder(&kv.Coordinate{}, &kv.GRPCConnCache{}, "n1")
+	require.NoError(t, err)
+	require.NotNil(t, fwd)
+}
+
+// dummyTablesSource is the smallest concrete admin.TablesSource for
+// the readyForRegistration gate test — no method body needs to
+// execute, so every method just panics. Using a real implementation
+// would pull adapter dependencies into a main_admin test that has
+// nothing to do with adapter behaviour.
+type dummyTablesSource struct{}
+
+func (dummyTablesSource) AdminListTables(_ context.Context) ([]string, error) {
+	panic("dummyTablesSource.AdminListTables should not be invoked")
+}
+
+func (dummyTablesSource) AdminDescribeTable(_ context.Context, _ string) (*admin.DynamoTableSummary, bool, error) {
+	panic("dummyTablesSource.AdminDescribeTable should not be invoked")
+}
+
+func (dummyTablesSource) AdminCreateTable(_ context.Context, _ admin.AuthPrincipal, _ admin.CreateTableRequest) (*admin.DynamoTableSummary, error) {
+	panic("dummyTablesSource.AdminCreateTable should not be invoked")
+}
+
+func (dummyTablesSource) AdminDeleteTable(_ context.Context, _ admin.AuthPrincipal, _ string) error {
+	panic("dummyTablesSource.AdminDeleteTable should not be invoked")
+}
diff --git a/main_admin_test.go b/main_admin_test.go
index e62a61aaf..d463dbe1a 100644
--- a/main_admin_test.go
+++ b/main_admin_test.go
@@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) {
 	eg, ctx := errgroup.WithContext(context.Background())
 	defer func() { _ = eg.Wait() }()
 	var lc net.ListenConfig
-	_, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, "")
+	_, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, "")
 	require.NoError(t, err)
 }
 
@@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) {
 		listen:  "127.0.0.1:0",
 		// missing signing key
 	}
-	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "")
+	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "")
 	require.Error(t, err)
 }
 
@@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) {
 		listen:            "0.0.0.0:0",
 		sessionSigningKey: freshKey(),
 	}
-	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "")
+	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "")
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "TLS")
 }
@@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) {
 		listen:            "127.0.0.1:0",
 		sessionSigningKey: freshKey(),
 	}
-	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "")
+	_, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, "")
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "cluster info source")
 }
@@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) {
 	cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) {
 		return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil
 	})
-	addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test")
+	addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, "test")
 	require.NoError(t, err)
 
 	// Poll /admin/healthz until success or the test deadline.
@@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) {
 	cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) {
 		return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil
 	})
-	addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test")
+	addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, "test")
 	require.NoError(t, err)
 
 	transport := &http.Transport{TLSClientConfig: &tls.Config{