Commit 446a60f

feat(encryption): Stage 7c — ConfChange-time writer registration (#872)
## Summary

Implements the merged 7c design (PR #868). Pre-registers a new node's writer-registry row from the admin-RPC `AddVoter`/`AddLearner` handler so that:

1. **Write-window elimination** (optimization): encrypted writes from the new node never fail closed on the 7a-2 storage gate after the conf-change commits.
2. **Collision-safe membership change** (correctness): a §6.1 uint16 NodeID collision is caught at the RPC layer with `ErrWriterUint16Collision` **before** the conf-change is proposed, so there is no §4.1 case-4 halt-apply on a durable conf-change entry.

### Three changes

1. **`internal/raftadmin`**: `MembershipChangeInterceptor` interface + `NewServerWithInterceptor` constructor + `RegisterOperationalServicesWithInterceptor`. `Server.AddVoter`/`AddLearner` invoke the optional `PreAddMember` hook before the engine's conf-change proposal. Nil interceptor preserves the pre-7c posture.
2. **`internal/encryption`**: `ErrWriterUint16Collision` typed error for the §6.1 collision branch.
3. **`main`**: `encryptionPreRegister` adapter implements the interceptor: read-before-propose guard against §4.1 case-3 (re-proposing `epoch=0` against an existing `(epoch=0)` row), idempotent skip on FullNodeID match, typed error on FullNodeID mismatch (§6.1 collision), case-1 first-seen propose otherwise. Wired into `serversInput → runtimeServerRunner → startRaftServers → RegisterOperationalServicesWithInterceptor` so the encryption adapter coupling stays at the main-level wiring point.

### Test coverage (per design §5)

- `internal/raftadmin/server_test.go`:
  - `TestServer_AddVoter_InvokesInterceptorBeforeConfChange`: ordering (hook called, then engine.AddVoter).
  - `TestServer_AddVoter_InterceptorErrorAbortsConfChange`: engine.AddVoter NOT called when hook errs.
  - `TestServer_AddVoter_NilInterceptorSkipsPreStep`: pre-7c posture preserved.
  - `TestServer_AddLearner_InterceptorContract`: symmetric sub-tests for AddLearner.
- `main_encryption_confchange_test.go`:
  - `TestEncryptionPreRegister_PreBootstrapSkips`: `ActiveStorageKeyID() == (0, false)` → nil-skip.
  - `TestEncryptionPreRegister_NilCacheReturnsNilInterceptor`: defensive nil-check at constructor.
  - `TestEncryptionPreRegister_IdempotentWhenRowExists`: pins the §3.1 read-before-propose guard against the §4.1 case-3 `ErrLocalEpochRollback` regression (claude round-2 BLOCKING on PR #868).
  - `TestEncryptionPreRegister_Uint16CollisionReturnsTypedError`: pins the §6.1 collision branch.

## 5-lens self review

- **Data loss**: clean. Pre-register propose is additive (`0x03` entry + §4.1 case-1 insert); no user-data-path change.
- **Concurrency**: pre-register runs on the leader's RPC goroutine; the guard catches the leader-flip retry race via the durable row, NOT via case-2 monotonic (which requires strictly greater epoch).
- **Performance**: one registry read + at most one Raft round-trip per `AddVoter`/`AddLearner` call. Encryption-unaware path unchanged (nil interceptor skips the pre-step).
- **Data consistency**: §4.1 case-1 first-seen for genuinely new members; guard returns idempotent-skip or `ErrWriterUint16Collision` for existing rows. §6.1 case-4 halt-apply is restricted to the narrow TOCTOU residual (two concurrent AddVoter for uint16-colliding raftIDs).
- **Test coverage**: complete per design §5 (raftadmin contract + adapter branches). The §3.5 leader-flip retry path is pinned by `IdempotentWhenRowExists`. End-to-end test deferred per design §5.3 (requires 2-node test cluster fixture; unit + integration above cover the contract surface).

## Test plan

- [x] `go test -run 'TestApplier|TestApplyRotation|TestApplyBootstrap|TestRuntimeRegistration|TestBuildProcessStartRegistration|TestRegistrationCommittedAtEpoch|TestEncryption_E2E|TestRunWriterRegistration|TestServer|TestEncryptionPreRegister|TestRegisterOperational' -race -timeout 180s ./internal/encryption ./internal/raftadmin ./.`: green.
- [x] `make lint` clean.
- [ ] @claude review for correctness of the interceptor wiring, guard-branch invariants, and main-level construction.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
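
The four guard outcomes described under "Three changes" (pre-bootstrap skip, idempotent skip, typed collision error, first-seen propose) can be sketched as a single decision function. This is a minimal illustration, not the `encryptionPreRegister` adapter from the commit: the `registry` map, the `outcome` constants, `decide`, and `errCollision` are hypothetical names, and treating NodeID16 as the low 16 bits of FullNodeID is an assumption based on the "uint16 truncation" wording in the commit.

```go
package main

import (
	"errors"
	"fmt"
)

// errCollision is a local stand-in for encryption.ErrWriterUint16Collision.
var errCollision = errors.New("writer registry uint16 collision")

// registry is a hypothetical in-memory stand-in for the durable writer
// registry: uint16 node ID -> FullNodeID of the row's owner.
type registry map[uint16]uint64

// outcome names the guard branches listed in the commit summary.
type outcome int

const (
	skipPreBootstrap outcome = iota // encryption not bootstrapped yet
	skipIdempotent                  // row already exists for this exact node
	proposeFirstSeen                // no row yet: propose the registration
	rejectCollision                 // row exists for a different FullNodeID
)

// decide reads the registry before anything is proposed and picks a branch.
// ASSUMPTION: NodeID16 is uint16(fullNodeID), i.e. plain truncation.
func decide(reg registry, bootstrapped bool, fullNodeID uint64) (outcome, error) {
	if !bootstrapped {
		return skipPreBootstrap, nil
	}
	existing, ok := reg[uint16(fullNodeID)]
	switch {
	case !ok:
		return proposeFirstSeen, nil
	case existing == fullNodeID:
		return skipIdempotent, nil
	default:
		return rejectCollision, errCollision
	}
}

func main() {
	// One existing member whose FullNodeID truncates to 0x0001.
	reg := registry{0x0001: 0x10001}
	for _, id := range []uint64{0x10001, 0x20001, 0x10002} {
		out, err := decide(reg, true, id)
		fmt.Printf("id=%#x outcome=%d err=%v\n", id, out, err)
	}
}
```

Because the read happens before any propose, a retry after a leader flip lands in the idempotent branch instead of re-proposing against an existing row, which is the property the commit attributes to `TestEncryptionPreRegister_IdempotentWhenRowExists`.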
2 parents 749d8fb + b221d9a commit 446a60f

8 files changed

Lines changed: 556 additions & 42 deletions

internal/encryption/errors.go

Lines changed: 12 additions & 0 deletions
@@ -120,6 +120,18 @@ var (
 	// than a nil-pointer panic deep in the apply path.
 	ErrKEKNotConfigured = errors.New("encryption: KEK not configured on this node; cannot unwrap wrapped DEK material")
 
+	// ErrWriterUint16Collision is the Stage 7c §3.3 typed error returned
+	// by the encryption-aware MembershipChangeInterceptor when a new
+	// node's NodeID16 collides with an existing member's writer-
+	// registry row at the same uint16 truncation but with a different
+	// FullNodeID. The §3.1 read-before-propose guard catches the
+	// common case BEFORE any propose, so the admin RPC returns a
+	// retryable client-facing error rather than triggering a §4.1
+	// case-4 halt apply (which would, without 7c's interceptor, fire
+	// AFTER the conf-change had already committed, bricking the
+	// cluster). Operators choose a non-colliding raftID and retry.
+	ErrWriterUint16Collision = errors.New("encryption: writer registry uint16 collision; choose a different raftID")
+
 	// ErrSidecarPresentWithoutFlag is the §9.1 startup-refusal guard
 	// raised when the data dir already contains a sidecar (keys.json)
 	// but --encryption-enabled is NOT set. Continuing would silently
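
Because `ErrWriterUint16Collision` is a sentinel created with `errors.New`, in-process callers can detect it with `errors.Is` even after it has been wrapped with extra context. The sketch below illustrates that standard-library pattern only: the sentinel is redeclared locally so the example is self-contained, and `preRegister` and `isCollision` are hypothetical helpers. How the error surfaces to a remote gRPC client depends on `adminError`, which this diff does not show.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrWriterUint16Collision is redeclared here so the sketch compiles on its
// own; in the repository it lives in internal/encryption.
var ErrWriterUint16Collision = errors.New("encryption: writer registry uint16 collision; choose a different raftID")

// preRegister is a hypothetical adapter step that adds context to the
// sentinel with %w so the chain stays inspectable.
func preRegister(raftID string, collides bool) error {
	if collides {
		return fmt.Errorf("pre-register %q: %w", raftID, ErrWriterUint16Collision)
	}
	return nil
}

// isCollision reports whether err is, or wraps, the collision sentinel.
func isCollision(err error) bool {
	return errors.Is(err, ErrWriterUint16Collision)
}

func main() {
	fmt.Println(isCollision(preRegister("n42", true)))
	fmt.Println(isCollision(preRegister("n43", false)))
}
```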

internal/raftadmin/health.go

Lines changed: 17 additions & 1 deletion
@@ -21,14 +21,30 @@ const (
 )
 
 func RegisterOperationalServices(ctx context.Context, gs *grpc.Server, engine raftengine.Engine, serviceNames []string) {
+	RegisterOperationalServicesWithInterceptor(ctx, gs, engine, serviceNames, nil)
+}
+
+// RegisterOperationalServicesWithInterceptor is the Stage 7c variant
+// of RegisterOperationalServices that installs an optional
+// [MembershipChangeInterceptor] on the underlying [Server] so
+// AddVoter/AddLearner run an encryption-aware pre-step before the
+// conf-change proposal. Passing nil is equivalent to
+// [RegisterOperationalServices].
+func RegisterOperationalServicesWithInterceptor(
+	ctx context.Context,
+	gs *grpc.Server,
+	engine raftengine.Engine,
+	serviceNames []string,
+	interceptor MembershipChangeInterceptor,
+) {
 	if gs == nil {
 		return
 	}
 	if ctx == nil {
 		panic("raftadmin: RegisterOperationalServices requires non-nil context")
 	}
 
-	pb.RegisterRaftAdminServer(gs, NewServer(engine))
+	pb.RegisterRaftAdminServer(gs, NewServerWithInterceptor(engine, interceptor))
 
 	healthSrv := health.NewServer()
 	healthpb.RegisterHealthServer(gs, healthSrv)

internal/raftadmin/interceptor.go

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+package raftadmin
+
+import "context"
+
+// MembershipChangeInterceptor lets a caller of [Server] inject a
+// pre-step into [Server.AddVoter] and [Server.AddLearner] that runs
+// **before** the underlying Raft engine proposes the configuration
+// change. A non-nil error from PreAddMember aborts the conf-change
+// proposal; the error is returned to the gRPC caller verbatim.
+//
+// Stage 7c uses this hook to pre-register a new node's writer-
+// registry row (see
+// docs/design/2026_05_29_proposed_7c_confchange_time_registration.md).
+// Keeping the encryption-aware adapter outside this package preserves
+// raftadmin's engine-generic posture: no concrete dependency on
+// the KV or encryption layers.
+//
+// The hook is intentionally optional: when nil is passed to
+// [NewServerWithInterceptor] (or plain [NewServer] is used),
+// AddVoter/AddLearner run exactly as they did pre-7c. Encryption-unaware
+// builds and encryption-disabled clusters therefore see no behavior change.
+type MembershipChangeInterceptor interface {
+	// PreAddMember runs on the leader before AddVoter/AddLearner
+	// proposes the conf-change. The raftID is the same string the
+	// caller passed in the AddVoter/AddLearner request (the
+	// `Id` field).
+	PreAddMember(ctx context.Context, raftID string) error
+}
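
As an illustration of the contract above, here is a minimal sketch of an implementation and of the nil-tolerant call site. The interface is copied from the diff so the example compiles on its own; `InterceptorFunc`, `denyReserved`, `errReserved`, and `run` are hypothetical names that do not appear in the commit, and `run` only imitates the shape of the pre-step in `Server.AddVoter` (it reports whether the proposal would have been reached).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// MembershipChangeInterceptor mirrors the interface added in interceptor.go.
type MembershipChangeInterceptor interface {
	PreAddMember(ctx context.Context, raftID string) error
}

// InterceptorFunc is a hypothetical function adapter (not part of the
// commit) that lets a plain function satisfy the interface.
type InterceptorFunc func(ctx context.Context, raftID string) error

// PreAddMember calls the wrapped function.
func (f InterceptorFunc) PreAddMember(ctx context.Context, raftID string) error {
	return f(ctx, raftID)
}

var errReserved = errors.New("raftID is reserved")

// denyReserved is an example pre-step that rejects one specific raftID.
func denyReserved(_ context.Context, raftID string) error {
	if raftID == "reserved" {
		return errReserved
	}
	return nil
}

// run imitates the pre-step shape: a nil interceptor is skipped, and a
// non-nil error aborts before the (simulated) conf-change proposal.
func run(ic MembershipChangeInterceptor, raftID string) (bool, error) {
	if ic != nil {
		if err := ic.PreAddMember(context.Background(), raftID); err != nil {
			return false, err
		}
	}
	return true, nil // the proposal would happen here
}

func main() {
	ic := InterceptorFunc(denyReserved)
	fmt.Println(run(ic, "n42"))
	fmt.Println(run(ic, "reserved"))
	fmt.Println(run(nil, "reserved"))
}
```

One caveat with this pattern in Go: the `ic != nil` check only skips an untyped nil. A typed nil such as `InterceptorFunc(nil)` stored in the interface would pass the check and panic when called, so callers that have no pre-step should pass a literal `nil`.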

internal/raftadmin/server.go

Lines changed: 32 additions & 2 deletions
@@ -13,15 +13,31 @@ import (
 type Server struct {
 	engine raftengine.Engine
 	admin  raftengine.Admin
+	// interceptor is invoked from AddVoter/AddLearner before the
+	// underlying Raft engine proposes the conf-change. nil = no
+	// pre-step (the pre-7c posture). Stage 7c wires the encryption-
+	// aware adapter via NewServerWithInterceptor; encryption-unaware
+	// builds keep this nil.
+	interceptor MembershipChangeInterceptor
 
 	pb.UnimplementedRaftAdminServer
 }
 
 func NewServer(engine raftengine.Engine) *Server {
+	return NewServerWithInterceptor(engine, nil)
+}
+
+// NewServerWithInterceptor constructs a Server with an optional
+// pre-step that runs before AddVoter/AddLearner propose the
+// conf-change. See [MembershipChangeInterceptor]. Passing nil is
+// equivalent to [NewServer]: the pre-step is skipped and the
+// conf-change runs exactly as the pre-7c posture.
+func NewServerWithInterceptor(engine raftengine.Engine, interceptor MembershipChangeInterceptor) *Server {
 	admin, _ := any(engine).(raftengine.Admin)
 	return &Server{
-		engine: engine,
-		admin:  admin,
+		engine:      engine,
+		admin:       admin,
+		interceptor: interceptor,
 	}
 }
 
@@ -73,6 +89,15 @@ func (s *Server) AddVoter(ctx context.Context, req *pb.RaftAdminAddVoterRequest)
 	if req == nil || req.Id == "" || req.Address == "" {
 		return nil, grpcStatus(codes.InvalidArgument, "id and address are required")
 	}
+	// Stage 7c §3.1 pre-step: run the encryption-aware adapter (if any)
+	// before the conf-change proposal so the new node's writer-registry
+	// row exists at apply time and any §6.1 uint16 collision halts here
+	// rather than after the conf-change is durable.
+	if s.interceptor != nil {
+		if err := s.interceptor.PreAddMember(ctx, req.Id); err != nil {
+			return nil, adminError(err)
+		}
+	}
 	index, err := s.admin.AddVoter(ctx, req.Id, req.Address, req.PreviousIndex)
 	if err != nil {
 		return nil, adminError(err)
@@ -87,6 +112,11 @@ func (s *Server) AddLearner(ctx context.Context, req *pb.RaftAdminAddLearnerRequ
 	if req == nil || req.Id == "" || req.Address == "" {
 		return nil, grpcStatus(codes.InvalidArgument, "id and address are required")
 	}
+	if s.interceptor != nil {
+		if err := s.interceptor.PreAddMember(ctx, req.Id); err != nil {
+			return nil, adminError(err)
+		}
+	}
 	index, err := s.admin.AddLearner(ctx, req.Id, req.Address, req.PreviousIndex)
 	if err != nil {
 		return nil, adminError(err)

internal/raftadmin/server_test.go

Lines changed: 118 additions & 0 deletions
@@ -29,6 +29,14 @@ type fakeEngine struct {
 	removeServerCalls   []fakeRemoveServerCall
 	transferCalls       int
 	targetTransferCalls []fakeTransferCall
+
+	// addVoterHook is invoked synchronously inside AddVoter, before
+	// recording the call. Tests use this to observe the ordering of
+	// the engine call relative to the interceptor's hook so the
+	// "interceptor runs before engine.AddVoter" invariant is actually
+	// pinned (claude review on PR #872: without the hook the test
+	// just observed that both ran in any order).
+	addVoterHook func()
 }
 
 type fakeAddVoterCall struct {
@@ -98,6 +106,12 @@ func (f *fakeEngine) CheckServing(context.Context) error {
 }
 
 func (f *fakeEngine) AddVoter(_ context.Context, id string, address string, prevIndex uint64) (uint64, error) {
+	f.mu.Lock()
+	hook := f.addVoterHook
+	f.mu.Unlock()
+	if hook != nil {
+		hook()
+	}
 	f.mu.Lock()
 	defer f.mu.Unlock()
 	f.addVoterCalls = append(f.addVoterCalls, fakeAddVoterCall{id: id, address: address, prevIndex: prevIndex})
@@ -348,3 +362,107 @@ func TestRegisterOperationalServicesRequiresContext(t *testing.T) {
 		RegisterOperationalServices(nilCtx, server, &fakeEngine{}, []string{"RawKV"})
 	})
 }
+
+// recordingInterceptor is a test double for MembershipChangeInterceptor
+// that records every PreAddMember call and optionally returns a
+// pre-configured error so tests can drive the abort branches.
+type recordingInterceptor struct {
+	mu      sync.Mutex
+	calls   []string
+	retErr  error
+	preHook func(raftID string) // optional callback; useful for ordering assertions
+}
+
+func (r *recordingInterceptor) PreAddMember(_ context.Context, raftID string) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.calls = append(r.calls, raftID)
+	if r.preHook != nil {
+		r.preHook(raftID)
+	}
+	return r.retErr
+}
+
+// TestServer_AddVoter_InvokesInterceptorBeforeConfChange pins Stage 7c
+// §5.1 contract item 1: AddVoter calls the interceptor (with the raftID
+// from the request) before the engine's AddVoter, in that order. The
+// fakeEngine.addVoterHook appends "addVoter" synchronously from inside
+// the engine call, so the recorded ordering pins interceptor-before-
+// engine, not just that-both-ran (claude review low finding on PR #872).
+func TestServer_AddVoter_InvokesInterceptorBeforeConfChange(t *testing.T) {
+	t.Parallel()
+	var order []string
+	engine := &fakeEngine{addVoterHook: func() { order = append(order, "addVoter") }}
+	interceptor := &recordingInterceptor{preHook: func(string) { order = append(order, "preAdd") }}
+	server := NewServerWithInterceptor(engine, interceptor)
+	resp, err := server.AddVoter(context.Background(), &pb.RaftAdminAddVoterRequest{Id: "n42", Address: "127.0.0.1:9999"})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.Equal(t, []string{"n42"}, interceptor.calls)
+	require.Equal(t, []string{"preAdd", "addVoter"}, order)
+	require.Equal(t, 1, len(engine.addVoterCalls))
+}
+
+// TestServer_AddVoter_InterceptorErrorAbortsConfChange pins §5.1
+// contract item 2: a non-nil PreAddMember error aborts AddVoter; the
+// engine's AddVoter is never called.
+func TestServer_AddVoter_InterceptorErrorAbortsConfChange(t *testing.T) {
+	t.Parallel()
+	engine := &fakeEngine{}
+	sentinel := context.DeadlineExceeded
+	interceptor := &recordingInterceptor{retErr: sentinel}
+	server := NewServerWithInterceptor(engine, interceptor)
+	resp, err := server.AddVoter(context.Background(), &pb.RaftAdminAddVoterRequest{Id: "n42", Address: "127.0.0.1:9999"})
+	require.Error(t, err)
+	require.Nil(t, resp)
+	require.Equal(t, []string{"n42"}, interceptor.calls)
+	require.Equal(t, 0, len(engine.addVoterCalls), "engine.AddVoter must NOT be called when PreAddMember errs")
+}
+
+// TestServer_AddVoter_NilInterceptorSkipsPreStep pins §5.1 contract
+// item 3: with no interceptor installed (the pre-7c posture, or an
+// encryption-disabled build), AddVoter proceeds directly to the
+// engine's AddVoter as today.
+func TestServer_AddVoter_NilInterceptorSkipsPreStep(t *testing.T) {
+	t.Parallel()
+	engine := &fakeEngine{}
+	server := NewServer(engine) // nil interceptor by construction
+	resp, err := server.AddVoter(context.Background(), &pb.RaftAdminAddVoterRequest{Id: "n42", Address: "127.0.0.1:9999"})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.Equal(t, 1, len(engine.addVoterCalls))
+}
+
+// TestServer_AddLearner_InterceptorContract is the symmetric set for
+// AddLearner, combined into one test since the behavior mirrors
+// AddVoter exactly.
+func TestServer_AddLearner_InterceptorContract(t *testing.T) {
+	t.Parallel()
+	t.Run("invokes interceptor then conf-change", func(t *testing.T) {
+		t.Parallel()
+		engine := &fakeEngine{}
+		interceptor := &recordingInterceptor{}
+		server := NewServerWithInterceptor(engine, interceptor)
+		_, err := server.AddLearner(context.Background(), &pb.RaftAdminAddLearnerRequest{Id: "l1", Address: "127.0.0.1:9000"})
+		require.NoError(t, err)
+		require.Equal(t, []string{"l1"}, interceptor.calls)
+		require.Equal(t, 1, len(engine.addLearnerCalls))
+	})
+	t.Run("interceptor error aborts", func(t *testing.T) {
+		t.Parallel()
+		engine := &fakeEngine{}
+		interceptor := &recordingInterceptor{retErr: context.DeadlineExceeded}
+		server := NewServerWithInterceptor(engine, interceptor)
+		_, err := server.AddLearner(context.Background(), &pb.RaftAdminAddLearnerRequest{Id: "l1", Address: "127.0.0.1:9000"})
+		require.Error(t, err)
+		require.Equal(t, 0, len(engine.addLearnerCalls))
+	})
+	t.Run("nil interceptor skips pre-step", func(t *testing.T) {
+		t.Parallel()
+		engine := &fakeEngine{}
+		server := NewServer(engine)
+		_, err := server.AddLearner(context.Background(), &pb.RaftAdminAddLearnerRequest{Id: "l1", Address: "127.0.0.1:9000"})
+		require.NoError(t, err)
+		require.Equal(t, 1, len(engine.addLearnerCalls))
+	})
+}
