14 changes: 13 additions & 1 deletion internal/admin/server.go
@@ -39,6 +39,14 @@ type ServerDeps struct {
// cluster page deploy without standing up the dynamo bridge.
Tables TablesSource

// Forwarder is the LeaderForwarder that the Dynamo handler hands
// off ErrTablesNotLeader writes to (design 3.3, AdminForward).
// Optional: a nil value disables follower→leader forwarding, in
// which case the handler surfaces 503 + Retry-After: 1 directly.
// Single-node and leader-only deployments leave this nil; multi-
// node clusters wire the production gRPC client.
Forwarder LeaderForwarder

// StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be
// nil during early development; the router renders 404 for
// /admin/assets/* and the SPA fallback in that case.
@@ -110,9 +118,13 @@ func NewServer(deps ServerDeps) (*Server, error) {
// operators must restart the listener for revocation to
// take effect, but the JWT no longer extends a revoked
// key past the next request.
dynamo = NewDynamoHandler(deps.Tables).
dynamoHandler := NewDynamoHandler(deps.Tables).
WithLogger(logger).
WithRoleStore(MapRoleStore(deps.Roles))
if deps.Forwarder != nil {
dynamoHandler = dynamoHandler.WithLeaderForwarder(deps.Forwarder)
}
dynamo = dynamoHandler
}
mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger)
router := NewRouter(mux, deps.StaticFS)
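A minimal sketch, for orientation only, of the contract the new Forwarder field documents. This is not the handler's actual code: handleTablesNotLeader and its parameters are invented for illustration, while LeaderForwarder, ForwardResult (StatusCode / Payload / ContentType), the ErrTablesNotLeader hand-off, and the 503 + Retry-After: 1 fallback are taken from this PR. Assumes the internal/admin package context plus a net/http import.

// handleTablesNotLeader is a hypothetical helper, not part of the PR.
func handleTablesNotLeader(w http.ResponseWriter, fwd LeaderForwarder, forward func() (*ForwardResult, error)) {
	if fwd == nil {
		// No forwarder wired (single-node or leader-only deployment):
		// surface the documented fallback directly.
		w.Header().Set("Retry-After", "1")
		http.Error(w, "not leader", http.StatusServiceUnavailable)
		return
	}
	res, err := forward()
	if err != nil {
		// Forwarding failed: fall back to the same retryable signal.
		w.Header().Set("Retry-After", "1")
		http.Error(w, "forward to leader failed", http.StatusServiceUnavailable)
		return
	}
	// Replay the leader's response verbatim, as the new server test asserts.
	w.Header().Set("Content-Type", res.ContentType)
	w.WriteHeader(res.StatusCode)
	_, _ = w.Write(res.Payload)
}
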
51 changes: 51 additions & 0 deletions internal/admin/server_test.go
@@ -453,6 +453,57 @@ func TestServer_DynamoDeleteTable_FullRoleHappyPath(t *testing.T) {
_ = resp.Body.Close()
}

// TestServer_ServerDepsForwarderIsWired confirms that a non-nil
// ServerDeps.Forwarder reaches the DynamoHandler so a follower-side
// CreateTable for which the source returns ErrTablesNotLeader is
// transparently forwarded. Without the wiring, the request would return
// 503 + Retry-After: 1 — the no-forwarder fallback path. With it, the
// leader's response status (here, 201) is replayed verbatim.
func TestServer_ServerDepsForwarderIsWired(t *testing.T) {
src := &notLeaderSource{}
fwd := &stubLeaderForwarder{createRes: &ForwardResult{
StatusCode: http.StatusCreated,
Payload: []byte(`{"name":"users"}`),
ContentType: "application/json; charset=utf-8",
}}
clk := fixedClock(time.Unix(1_700_000_000, 0).UTC())
signer := newSignerForTest(t, 1, clk)
verifier := newVerifierForTest(t, []byte{1}, clk)
cluster := ClusterInfoFunc(func(_ context.Context) (ClusterInfo, error) {
return ClusterInfo{NodeID: "node-1", Version: "0.1.0"}, nil
})
srv, err := NewServer(ServerDeps{
Signer: signer,
Verifier: verifier,
Credentials: MapCredentialStore{"AKIA_ADMIN": "ADMIN_SECRET"},
Roles: map[string]Role{"AKIA_ADMIN": RoleFull},
ClusterInfo: cluster,
Tables: src,
Forwarder: fwd,
AuthOpts: AuthServiceOpts{Clock: clk},
})
require.NoError(t, err)

ts := httptest.NewServer(srv.Handler())
defer ts.Close()
cookies := loginAsFullAdminAndCookies(t, ts)
body := strings.NewReader(`{"table_name":"users","partition_key":{"name":"id","type":"S"}}`)
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost,
ts.URL+"/admin/api/v1/dynamo/tables", body)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
req.Header.Set(csrfHeaderName, csrfHeaderFromCookies(cookies))
for _, c := range cookies {
req.AddCookie(c)
}
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode)
require.Equal(t, "users", fwd.lastCreateInput.TableName,
"forwarder must be invoked when source returns ErrTablesNotLeader")
_ = resp.Body.Close()
}

func TestServer_WriteRejectsMissingCSRF(t *testing.T) {
// Login to obtain a session, then hit cluster with POST to trigger
// CSRF on what the router normally rejects as method_not_allowed.
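The new test leans on two doubles, notLeaderSource and stubLeaderForwarder, that live elsewhere in the package and are not part of this diff. A hypothetical sketch consistent with how the test uses them follows; every type, method name, and signature below is an assumption (the real TablesSource and LeaderForwarder interfaces presumably carry more methods), and only the createRes and lastCreateInput.TableName usage comes from the test itself.

// Hypothetical doubles, not the repository's real ones.
type createTableInput struct{ TableName string }

// notLeaderSource always behaves like a follower, forcing the forward path.
type notLeaderSource struct{}

func (s *notLeaderSource) CreateTable(context.Context, createTableInput) error {
	return ErrTablesNotLeader
}

// stubLeaderForwarder records the forwarded input and replays a canned result.
type stubLeaderForwarder struct {
	createRes       *ForwardResult
	lastCreateInput createTableInput
}

func (f *stubLeaderForwarder) ForwardCreateTable(_ context.Context, in createTableInput) (*ForwardResult, error) {
	f.lastCreateInput = in
	return f.createRes, nil
}
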
79 changes: 73 additions & 6 deletions main.go
@@ -19,6 +19,7 @@ import (
"github.com/bootjp/elastickv/adapter"
"github.com/bootjp/elastickv/distribution"
internalutil "github.com/bootjp/elastickv/internal"
"github.com/bootjp/elastickv/internal/admin"
"github.com/bootjp/elastickv/internal/memwatch"
internalraftadmin "github.com/bootjp/elastickv/internal/raftadmin"
"github.com/bootjp/elastickv/internal/raftengine"
@@ -653,6 +654,40 @@ func startServers(in serversInput) error {
if err != nil {
return err
}
// roleStore + connCache are gated on *adminEnabled. With admin
// disabled, building either is wasted work AND a security
// regression risk: a non-empty -adminFullAccessKeys flag would
// otherwise still flip forwardDeps.readyForRegistration() to
// true, registering the leader-side gRPC AdminForward service
// and re-exposing the table-write surface a follower-direct
// admin call could reach (Codex P1, CodeRabbit Major on #648).
// The HTTP admin listener already short-circuits in
// startAdminFromFlags when *adminEnabled is false; the gRPC path
// must do the same.
var (
roleStore admin.RoleStore
connCache *kv.GRPCConnCache
)
if *adminEnabled {
roleStore = roleStoreFromFlags(parseCSV(*adminFullAccessKeys), parseCSV(*adminReadOnlyAccessKeys))
// connCache is shared between the follower-side LeaderForwarder
// (built inside startAdminFromFlags) and any future bridge that
// dials the leader's gRPC ports. Keeping a single instance per
// process means the two paths re-use TLS / HTTP/2 connections
// rather than each maintaining a parallel pool. The shutdown
// goroutine drains the cache on context cancellation so the
// accumulated HTTP/2 connections are not leaked when the
// process exits gracefully (Claude review on #648).
connCache = &kv.GRPCConnCache{}
cache := connCache
in.eg.Go(func() error {
<-in.ctx.Done()
if err := cache.Close(); err != nil {
return errors.Wrap(err, "close admin gRPC connection cache")
}
return nil
})
}
runner := runtimeServerRunner{
ctx: in.ctx,
lc: in.lc,
@@ -684,6 +719,7 @@ func startServers(in serversInput) error {
pprofAddress: *pprofAddr,
pprofToken: *pprofToken,
metricsRegistry: in.metricsRegistry,
roleStore: roleStore,
}
if err := runner.start(); err != nil {
return err
@@ -693,7 +729,14 @@ func startServers(in serversInput) error {
// Passing nil here would leave the admin dashboard with no
// access to table metadata; the admin handler answers
// /admin/api/v1/dynamo/* with 404 in that case.
if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer); err != nil {
//
// in.coordinate + connCache are forwarded so the admin HTTP
// dynamo handler can construct its production LeaderForwarder
// (Phase 3 of design 3.3): when the local node is a follower,
// the handler hands ErrTablesNotLeader writes to the forwarder
// which dials the leader over the cached gRPC pool. Without these
// the handler falls back to 503 + Retry-After:1.
if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, in.coordinate, connCache); err != nil {
return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err)
}
return nil
@@ -940,7 +983,9 @@ func startRaftServers(
proposalObserverForGroup func(uint64) kv.ProposalObserver,
adminServer *adapter.AdminServer,
adminGRPCOpts adminGRPCInterceptors,
forwardDeps adminForwardServerDeps,
) error {
forwardLogger := slog.Default().With(slog.String("component", "admin"))
// extraOptsCap reserves slots for the unary + stream admin interceptor
// options appended below. Sized as a constant so the magic-number
// linter does not complain.
@@ -970,6 +1015,7 @@
if adminServer != nil {
pb.RegisterAdminServer(gs, adminServer)
}
registerAdminForwardServer(gs, forwardDeps, forwardLogger)
rt.registerGRPC(gs)
internalraftadmin.RegisterOperationalServices(ctx, gs, rt.engine, []string{"RawKV"})
reflection.Register(gs)
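The gating comment in startServers above refers to forwardDeps.readyForRegistration(), which this diff does not show. A plausible shape, assuming it only checks that both dependencies were built so an admin-disabled process registers nothing, would be:

// Assumed implementation, shown only to make the gating argument concrete;
// the repository's actual readyForRegistration may check more than this.
func (d adminForwardServerDeps) readyForRegistration() bool {
	// Both halves must exist: the tables bridge (the dynamo server was built)
	// and the role index (admin enabled with access keys configured).
	return d.tables != nil && d.roles != nil
}
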
@@ -1220,12 +1266,37 @@ type runtimeServerRunner struct {
// field is unexported on purpose — it is package-private state,
// not a public API. Nil until start() reaches the dynamo step.
dynamoServer *adapter.DynamoDBServer

// roleStore is the access-key → role index the leader-side
// gRPC AdminForward service uses to re-validate the principal
// on every forwarded write. Mirrors what admin.Config.RoleIndex
// produces inside startAdminFromFlags; built up-front in
// startServers so registerAdminForwardServer in startRaftServers
// does not need to wait for the (later) admin-config parse.
// Nil when no admin access keys are configured.
roleStore admin.RoleStore
}

func (r *runtimeServerRunner) start() error {
if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
// startDynamoDBServer must run BEFORE startRaftServers so the
// resulting DynamoDBServer is available to the leader-side gRPC
// AdminForward registration in startRaftServers (design 3.3).
// Both servers listen on different addresses; the dynamo HTTP
// listener accepting traffic before raft TCP listeners are up
// is no different from the existing startup-race semantics — a
// hit in that window already returned 503 before this reorder.
dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
if err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
r.dynamoServer = dynamoServer
forwardDeps := adminForwardServerDeps{
tables: newDynamoTablesSource(r.dynamoServer),
roles: r.roleStore,
}
if err := startRaftServers(
r.ctx,
r.lc,
@@ -1240,14 +1311,10 @@ func (r *runtimeServerRunner) start() error {
},
r.adminServer,
r.adminGRPCOpts,
forwardDeps,
); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker)
if err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
r.dynamoServer = dynamoServer
if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
48 changes: 44 additions & 4 deletions main_admin.go
@@ -68,7 +68,15 @@ type adminListenerConfig struct {
// without touching --s3CredentialsFile: pulling the admin feature into
// a hard dependency on that file would break deployments that never
// intended to use it.
func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer) error {
func startAdminFromFlags(
ctx context.Context,
lc *net.ListenConfig,
eg *errgroup.Group,
runtimes []*raftGroupRuntime,
dynamoServer *adapter.DynamoDBServer,
coordinate kv.Coordinator,
connCache *kv.GRPCConnCache,
) error {
if !*adminEnabled {
return nil
}
@@ -109,10 +117,40 @@ func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup
}
clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes)
tablesSrc := newDynamoTablesSource(dynamoServer)
_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, buildVersion())
forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId)
if err != nil {
return errors.Wrap(err, "build admin leader forwarder")
}
_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, forwarder, buildVersion())
return err
}

// buildAdminLeaderForwarder constructs the production LeaderForwarder
// for the dynamo HTTP handler when the wiring is complete enough to
// reach a remote leader. The bridge tolerates a nil connCache (and a
// nil coordinate) so single-node / leader-only builds — where the
// dashboard always hits a leader — can ship without paying the
// forwarder's wiring cost. tablesSrc itself can be nil for cluster-
// only builds; that's handled higher up by ServerDeps.Tables == nil.
func buildAdminLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache, nodeID string) (admin.LeaderForwarder, error) {
if coordinate == nil || connCache == nil {
// Returning (nil, nil) is the explicit "no forwarder" signal
// — the handler falls back to 503 + Retry-After:1 on
// ErrTablesNotLeader. The function-level doc comment above
// describes this contract; the nilnil linter is not enabled
// in .golangci.yaml so no suppression directive is needed
// (Claude review on #648).
return nil, nil
}
if nodeID == "" {
// admin.NewGRPCForwardClient enforces this too; surfacing
// it here keeps the misconfiguration message in the wiring
// layer rather than buried under a Wrap chain.
return nil, errors.New("admin forward bridge: --raftId is required")
}
return buildLeaderForwarder(coordinate, connCache, nodeID)
}
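A hypothetical regression test, not part of this PR, that would pin the (nil, nil) contract described above; it assumes testify is available to the main package's tests the way it already is in internal/admin.

func TestBuildAdminLeaderForwarder_NoDeps(t *testing.T) {
	// No coordinator and no connection cache: the explicit "no forwarder"
	// signal, so the dynamo handler keeps its 503 + Retry-After: 1 fallback.
	fwd, err := buildAdminLeaderForwarder(nil, nil, "node-1")
	require.NoError(t, err)
	require.Nil(t, fwd)
}
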

// newDynamoTablesSource adapts *adapter.DynamoDBServer to the
// admin.TablesSource interface. The bridge stays in this file (rather
// than internal/admin) so the admin package stays free of the heavy
@@ -346,14 +384,15 @@ func startAdminServer(
creds map[string]string,
cluster admin.ClusterInfoSource,
tables admin.TablesSource,
forwarder admin.LeaderForwarder,
version string,
) (string, error) {
adminCfg := buildAdminConfig(cfg)
enabled, err := checkAdminConfig(&adminCfg, cluster)
if err != nil || !enabled {
return "", err
}
server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables)
server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, forwarder)
if err != nil {
return "", err
}
@@ -393,7 +432,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) (
return true, nil
}

func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource) (*admin.Server, error) {
func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, forwarder admin.LeaderForwarder) (*admin.Server, error) {
primaryKeys, err := adminCfg.DecodedSigningKeys()
if err != nil {
return nil, errors.Wrap(err, "decode admin signing keys")
@@ -413,6 +452,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust
Roles: adminCfg.RoleIndex(),
ClusterInfo: cluster,
Tables: tables,
Forwarder: forwarder,
StaticFS: nil,
AuthOpts: admin.AuthServiceOpts{
InsecureCookie: adminCfg.AllowInsecureDevCookie,