Commit eb371ca

feat(encryption): Stage 6E-2c — dynamic raft envelope wrap on ShardGroup (#922)
## Summary

Stage 6E-2c: install the Stage 6E-2 wrap-on-propose plumbing on every shard group without yet activating any cipher. Every `ShardGroup` gets an `atomic.Pointer[RaftPayloadWrapper]` cell; the proposer chain consults that pointer on every `Propose` / `ProposeAdmin` call; `SetRaftPayloadWrap` is the hot-swap surface Stage 6E-2d will use when `EnableRaftEnvelope` commits. The wrap closure factory itself ships in Stage 6E-2e/2f. A startup hook reads the sidecar and logs a `WARN` if `RaftEnvelopeCutoverIndex != 0`: an operator-visible refusal of a deployment that crosses the cutover line under a binary without the cipher factory wired.

## Pieces

- `dynamicWrappedProposer` (kv/raft_payload_wrapper.go): wraps a `raftengine.Proposer` with an `atomic.Pointer.Load` on every call. Mirrors `wrappedProposer`'s Propose / ProposeAdmin routing and wrap semantics: the wrap layer is NOT a barrier exemption; Propose and ProposeAdmin apply the same wrap. The cutover marker stays cleartext via the raw-engine reference at the call site, not at the method level.
- `ShardGroup.raftPayloadWrap` + `SetRaftPayloadWrap` + `RaftPayloadWrap` (kv/sharded_coordinator.go): the published hot-swap surface.
- `NewLeaderProxyForShardGroup` + `withDynamicRaftPayloadWrap` (kv/sharded_coordinator.go, kv/transaction.go): the production wiring path. `main.go`'s `buildShardGroups` now constructs every shard group's `LeaderProxy` via this helper so the dynamic wrap chain is universal.
- `noteRaftEnvelopeCutoverStartup` (main_encryption_write_wiring.go): process-start sidecar read. Silent on the universal pre-cutover case and on any read error (the Stage 6C-1 startup guard is the authoritative refuser). Logs `WARN` only when the cutover index is non-zero but the binary does not (yet) ship the cipher factory.

## Tests (8 new)

- `TestDynamicWrappedProposer_NilPointerReturnsInnerVerbatim`: defensive degradation for nil `*atomic.Pointer`.
- `TestDynamicWrappedProposer_NilStoredIsPassThrough`: Stage 3 default (verbatim routing for Propose and ProposeAdmin separately).
- `TestDynamicWrappedProposer_LoadsCurrentWrapEveryCall`: hot-swap contract (install / clear observed by the next proposal).
- `TestDynamicWrappedProposer_ProposeAdminAppliesCurrentWrap`: admin-path mirror; confirms `inner.ProposeAdmin` (not `Propose`) is reached when wrap is active.
- `TestShardGroup_SetRaftPayloadWrap_RoundTrip`: Set/Get round-trip including the nil-clears semantics.
- `TestNoteRaftEnvelopeCutoverStartup_*` (×4): empty path / missing file / zero cutover / non-zero cutover (asserts the hook is read-only).

## Five-lens self-review

- **Data loss**: the dynamic wrap proposer is a pass-through when the pointer holds nil (the Stage 6E-2c default). Production behaviour is unchanged today: every proposal still reaches the engine cleartext via the existing path. No new error swallow, no new fall-through.
- **Concurrency / distributed**: hot-swap uses `atomic.Pointer.Store` / `Load`; reads are lock-free and observe a fully-published closure (either nil or non-nil) on every call. Last-writer-wins on concurrent stores; no torn read.
- **Performance**: one extra `atomic.Pointer.Load` per Propose / ProposeAdmin on the dynamic path; no allocation, no contention. Wrap closure invocation cost is paid only when non-nil; matches the existing `wrappedProposer` profile.
- **Data consistency**: the wrap pointer's lifetime is owned by the `ShardGroup` that owns the engine; the proposer holds an `*atomic.Pointer` reference back into `ShardGroup`. No cross-shard mixing: each shard has its own pointer, so a future per-shard rotation policy works without changes here.
- **Test coverage**: 8 new tests cover the wrap-proposer mechanics, the ShardGroup hot-swap surface, and the startup hook input matrix.

## Caller audit (per /loop semantic-change rule)

- `kv.NewLeaderProxyWithEngine` (kept; non-wrap-aware): no behaviour change. Test fixtures and any shard group that opts out of encryption continue to use it.
- `kv.NewLeaderProxyForShardGroup` (new; wrap-aware): used by `main.go`'s `buildShardGroups` for every production shard group. Audit: the only construction site for production `ShardGroup` instances now routes through this helper.
- ShardGroup struct literal in `main.go`: switched from one-shot literal with `Txn` set inline to two-step build (struct literal, then `Txn = NewLeaderProxyForShardGroup(sg, opts...)`). Necessary because `NewLeaderProxyForShardGroup` needs the `&sg.raftPayloadWrap` pointer, which is only addressable after the struct exists.
- `SetRaftPayloadWrap` callers: none in this PR. The surface is published for Stage 6E-2d's `EnableRaftEnvelope` handler.

## Test plan

- [x] `go build ./...` clean
- [x] `go vet ./...` clean
- [x] `golangci-lint run` clean (pre-commit hook ran)
- [x] `go test -race -run 'DynamicWrappedProposer|ShardGroup_Set|WrappedProposer|NoteRaftEnvelopeCutover' .` passes
- [x] `go test -race ./kv/` scoped tests pass

Refs: `docs/design/2026_05_31_partial_6e_enable_raft_envelope.md`

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

## Release Notes

* **New Features**
  * Introduced runtime-swappable raft payload envelope wrapping with dynamic wrapper support across all proposal types and shard groups.
* **Chores**
  * Added startup validation to detect and prevent incompatible payload envelope cutover states with clear operator guidance.
* **Tests**
  * Expanded test coverage for dynamic payload wrapper behavior, atomic pointer runtime swapping, integration with shard groups, and startup safety checks.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
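
The two-step `ShardGroup` build noted in the caller audit can be illustrated with a minimal, self-contained sketch. All names here (`shardGroup`, `proxy`, `newProxyFor`, `buildShardGroup`) are hypothetical stand-ins, not the repository's types; the only point shown is that a helper needing the address of a field must run after the struct value exists.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// wrapper is a hypothetical stand-in for RaftPayloadWrapper.
type wrapper func([]byte) ([]byte, error)

// proxy is a hypothetical stand-in for the LeaderProxy kept in ShardGroup.Txn.
// It only remembers where the owning group's wrap cell lives.
type proxy struct {
	wrapPtr *atomic.Pointer[wrapper]
}

// shardGroup is a hypothetical, reduced ShardGroup: a proxy plus the wrap cell.
type shardGroup struct {
	Txn  *proxy
	wrap atomic.Pointer[wrapper]
}

// newProxyFor plays the role of NewLeaderProxyForShardGroup in this sketch:
// it needs &sg.wrap, so sg must already exist.
func newProxyFor(sg *shardGroup) *proxy {
	return &proxy{wrapPtr: &sg.wrap}
}

// buildShardGroup shows the two-step construction.
func buildShardGroup() *shardGroup {
	sg := &shardGroup{}      // step 1: the struct (and its cell) now exists
	sg.Txn = newProxyFor(sg) // step 2: the proxy can point at &sg.wrap
	return sg
}

func main() {
	sg := buildShardGroup()
	fmt.Println(sg.Txn.wrapPtr == &sg.wrap) // prints "true"
}
```

A one-shot literal cannot express this, because the field's address is not available while the literal is still being evaluated.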
2 parents 0ece726 + b452b0c commit eb371ca

7 files changed

Lines changed: 732 additions & 5 deletions

kv/raft_payload_wrapper.go

Lines changed: 93 additions & 0 deletions
@@ -2,6 +2,7 @@ package kv

 import (
 	"context"
+	"sync/atomic"

 	"github.com/bootjp/elastickv/internal/raftengine"
 	"github.com/cockroachdb/errors"
@@ -108,3 +109,95 @@ func (p *wrappedProposer) ProposeAdmin(ctx context.Context, data []byte) (*rafte
 	}
 	return res, nil
 }
+
+// dynamicWrappedProposer is the Stage 6E-2c sibling of wrappedProposer
+// for the case where the wrap closure must be swappable at runtime.
+// On every Propose / ProposeAdmin call it loads an atomic.Pointer to
+// the currently-active RaftPayloadWrapper; a nil pointer means wrap
+// is inactive (payload passes through verbatim).
+//
+// Why a separate type (vs adding a setter to wrappedProposer): the
+// existing wrappedProposer captures wrap at construction time and is
+// used by static call sites that never need to change wrap state.
+// The Stage 6E-2 cutover model needs runtime swap so the
+// EnableRaftEnvelope admin handler (Stage 6E-2d) can publish the
+// active wrap closure the instant the cutover entry commits, without
+// rebuilding the TransactionManager or stalling in-flight proposals.
+// Splitting the two keeps the static-wrap fast path branch-free and
+// keeps the dynamic path's atomic.Pointer.Load() cost confined to
+// the call sites that need it.
+//
+// The pointer is required to be non-nil; pass an atomic.Pointer that
+// stores nil to express "wrap inactive". This is so the call site
+// (typically a ShardGroup) owns the storage and the proposer just
+// reads.
+type dynamicWrappedProposer struct {
+	inner   raftengine.Proposer
+	wrapPtr *atomic.Pointer[RaftPayloadWrapper]
+}
+
+// newDynamicWrappedProposer wires a proposer that consults wrapPtr
+// on every Propose / ProposeAdmin. wrapPtr.Load() == nil keeps the
+// path as a pure pass-through to inner; a non-nil value applies
+// wrap before forwarding.
+//
+// Contract: inner MUST be non-nil. A nil inner is a caller bug
+// (the construction site is responsible for handing in a valid
+// proposer) and the constructor deliberately does not silently
+// degrade; passing nil to Propose / ProposeAdmin would NPE at
+// the first call site, which is the same fail-fast shape as the
+// sibling newWrappedProposer(nil, wrap) and matches CLAUDE.md's
+// "don't validate for scenarios that can't happen" policy at
+// internal boundaries.
+//
+// wrapPtr MAY be nil; that path returns inner verbatim, mirroring
+// newWrappedProposer(inner, nil)'s degraded shape so a wiring site
+// that accidentally drops the pointer downgrades to a no-wrap
+// proposer rather than crashing the engine loop on first use. The
+// caller owns the storage lifetime of the pointer when non-nil.
+func newDynamicWrappedProposer(inner raftengine.Proposer, wrapPtr *atomic.Pointer[RaftPayloadWrapper]) raftengine.Proposer {
+	if wrapPtr == nil {
+		// Defensive degradation: passing a nil pointer would NPE
+		// on the first Propose. Treat as static no-wrap so callers
+		// that pass nil by mistake see the Stage 3 default rather
+		// than crashing the engine loop. This matches
+		// newWrappedProposer(inner, nil) returning inner verbatim.
+		return inner
+	}
+	return &dynamicWrappedProposer{inner: inner, wrapPtr: wrapPtr}
+}
+
+func (p *dynamicWrappedProposer) currentWrap() RaftPayloadWrapper {
+	if loaded := p.wrapPtr.Load(); loaded != nil {
+		return *loaded
+	}
+	return nil
+}
+
+func (p *dynamicWrappedProposer) Propose(ctx context.Context, data []byte) (*raftengine.ProposalResult, error) {
+	wrapped, err := applyRaftPayloadWrap(p.currentWrap(), data)
+	if err != nil {
+		return nil, err
+	}
+	res, err := p.inner.Propose(ctx, wrapped)
+	if err != nil {
+		return nil, errors.Wrap(err, "kv: dynamic wrapped propose")
+	}
+	return res, nil
+}
+
+// ProposeAdmin mirrors Propose's wrap-applies semantics. See
+// wrappedProposer.ProposeAdmin for the design rationale (the wrap
+// layer is NOT a barrier exemption; the EnableRaftEnvelope cutover
+// marker bypasses wrap at the call site, not the method level).
+func (p *dynamicWrappedProposer) ProposeAdmin(ctx context.Context, data []byte) (*raftengine.ProposalResult, error) {
+	wrapped, err := applyRaftPayloadWrap(p.currentWrap(), data)
+	if err != nil {
+		return nil, err
+	}
+	res, err := p.inner.ProposeAdmin(ctx, wrapped)
+	if err != nil {
+		return nil, errors.Wrap(err, "kv: dynamic wrapped propose-admin")
+	}
+	return res, nil
+}

kv/raft_payload_wrapper_test.go

Lines changed: 163 additions & 0 deletions
@@ -223,3 +223,166 @@ func TestWrappedProposer_RoundTripWithRealCipher(t *testing.T) {
 		t.Fatalf("round-trip mismatch: got %q, want %q", got, plaintext)
 	}
 }
+
+// TestDynamicWrappedProposer_NilPointerReturnsInnerVerbatim pins the
+// defensive degradation in newDynamicWrappedProposer: if a caller
+// accidentally passes a nil *atomic.Pointer, the wrapper falls back
+// to the inner proposer rather than NPE on the first Propose call.
+// This matches newWrappedProposer(inner, nil)'s shape and keeps a
+// misconfigured wiring from crashing the engine loop.
+func TestDynamicWrappedProposer_NilPointerReturnsInnerVerbatim(t *testing.T) {
+	t.Parallel()
+	inner := &fakeProposer{}
+	got := newDynamicWrappedProposer(inner, nil)
+	if got != raftengine.Proposer(inner) {
+		t.Fatal("nil pointer: newDynamicWrappedProposer should return the inner proposer verbatim, not a wrapper")
+	}
+}
+
+// TestDynamicWrappedProposer_NilStoredIsPassThrough pins the
+// Stage 6E-2c "wrap inactive" default: when the atomic pointer
+// holds nil (no wrap closure installed), payloads reach the inner
+// proposer verbatim and routing distinguishes Propose vs
+// ProposeAdmin correctly.
+func TestDynamicWrappedProposer_NilStoredIsPassThrough(t *testing.T) {
+	t.Parallel()
+	var wrapPtr atomic.Pointer[RaftPayloadWrapper]
+	inner := &fakeProposer{}
+	wp := newDynamicWrappedProposer(inner, &wrapPtr)
+
+	if _, err := wp.Propose(context.Background(), []byte("plain")); err != nil {
+		t.Fatalf("Propose: %v", err)
+	}
+	if got := inner.calls.Load(); got != 1 {
+		t.Fatalf("inner.Propose calls = %d, want 1", got)
+	}
+	if !bytes.Equal(inner.last, []byte("plain")) {
+		t.Fatalf("inner.Propose saw %q, want %q (verbatim)", inner.last, []byte("plain"))
+	}
+
+	if _, err := wp.ProposeAdmin(context.Background(), []byte("admin")); err != nil {
+		t.Fatalf("ProposeAdmin: %v", err)
+	}
+	if got := inner.adminCalls.Load(); got != 1 {
+		t.Fatalf("inner.ProposeAdmin calls = %d, want 1", got)
+	}
+	if !bytes.Equal(inner.adminLast, []byte("admin")) {
+		t.Fatalf("inner.ProposeAdmin saw %q, want %q (verbatim)", inner.adminLast, []byte("admin"))
+	}
+}
+
+// TestDynamicWrappedProposer_LoadsCurrentWrapEveryCall pins the
+// Stage 6E-2c hot-swap contract: the atomic pointer is loaded on
+// every Propose / ProposeAdmin call, so a publish via Store between
+// two proposals is observed by the second without rebuilding the
+// proposer.
+func TestDynamicWrappedProposer_LoadsCurrentWrapEveryCall(t *testing.T) {
+	t.Parallel()
+	var wrapPtr atomic.Pointer[RaftPayloadWrapper]
+	inner := &fakeProposer{}
+	wp := newDynamicWrappedProposer(inner, &wrapPtr)
+
+	// First call: no wrap installed, payload passes through verbatim.
+	if _, err := wp.Propose(context.Background(), []byte("first")); err != nil {
+		t.Fatalf("Propose 1: %v", err)
+	}
+	if !bytes.Equal(inner.last, []byte("first")) {
+		t.Fatalf("first call: inner saw %q, want %q", inner.last, "first")
+	}
+
+	// Install a wrap mid-flight; the next call must see it.
+	var wrap RaftPayloadWrapper = func(p []byte) ([]byte, error) {
+		out := make([]byte, len(p)+1)
+		out[0] = 'W'
+		copy(out[1:], p)
+		return out, nil
+	}
+	wrapPtr.Store(&wrap)
+
+	if _, err := wp.Propose(context.Background(), []byte("second")); err != nil {
+		t.Fatalf("Propose 2: %v", err)
+	}
+	want := append([]byte{'W'}, []byte("second")...)
+	if !bytes.Equal(inner.last, want) {
+		t.Fatalf("second call: inner saw %q, want %q (wrapped)", inner.last, want)
+	}
+
+	// Clear the wrap; the next call must see it gone.
+	wrapPtr.Store(nil)
+
+	if _, err := wp.Propose(context.Background(), []byte("third")); err != nil {
+		t.Fatalf("Propose 3: %v", err)
+	}
+	if !bytes.Equal(inner.last, []byte("third")) {
+		t.Fatalf("third call (post-clear): inner saw %q, want %q (verbatim)", inner.last, "third")
+	}
+}
+
+// TestShardGroup_SetRaftPayloadWrap_RoundTrip pins the
+// Stage 6E-2c hot-swap surface on ShardGroup. Set with a non-nil
+// closure publishes it; Set with nil clears; the round-trip via
+// RaftPayloadWrap returns identity.
+func TestShardGroup_SetRaftPayloadWrap_RoundTrip(t *testing.T) {
+	t.Parallel()
+	g := &ShardGroup{}
+	if got := g.RaftPayloadWrap(); got != nil {
+		t.Fatalf("default: RaftPayloadWrap = %v, want nil", got)
+	}
+	var called atomic.Int32
+	var wrap RaftPayloadWrapper = func(p []byte) ([]byte, error) {
+		called.Add(1)
+		return p, nil
+	}
+	g.SetRaftPayloadWrap(wrap)
+	got := g.RaftPayloadWrap()
+	if got == nil {
+		t.Fatal("after Set: RaftPayloadWrap returned nil")
+	}
+	if _, err := got([]byte("x")); err != nil {
+		t.Fatalf("invoking stored wrap: %v", err)
+	}
+	if got := called.Load(); got != 1 {
+		t.Fatalf("stored wrap call count = %d, want 1", got)
+	}
+	g.SetRaftPayloadWrap(nil)
+	if got := g.RaftPayloadWrap(); got != nil {
+		t.Fatalf("after Set(nil): RaftPayloadWrap = %v, want nil", got)
+	}
+}
+
+// TestDynamicWrappedProposer_ProposeAdminAppliesCurrentWrap is the
+// admin-path mirror of LoadsCurrentWrapEveryCall: ProposeAdmin
+// must also see hot-swapped wrap closures so post-cutover admin
+// entries (RotateDEK, RegisterEncryptionWriter) committed at
+// `index > raftEnvelopeCutoverIndex` carry the AEAD envelope the
+// §6.3 strict-`>` apply hook expects. The cutover marker itself
+// bypasses this proposer at the call site (raw engine reference),
+// not at this layer.
+func TestDynamicWrappedProposer_ProposeAdminAppliesCurrentWrap(t *testing.T) {
+	t.Parallel()
+	var wrapPtr atomic.Pointer[RaftPayloadWrapper]
+	var wrap RaftPayloadWrapper = func(p []byte) ([]byte, error) {
+		out := make([]byte, len(p)+1)
+		out[0] = 'A'
+		copy(out[1:], p)
+		return out, nil
+	}
+	wrapPtr.Store(&wrap)
+
+	inner := &fakeProposer{}
+	wp := newDynamicWrappedProposer(inner, &wrapPtr)
+
+	if _, err := wp.ProposeAdmin(context.Background(), []byte("admin-payload")); err != nil {
+		t.Fatalf("ProposeAdmin: %v", err)
+	}
+	if got := inner.adminCalls.Load(); got != 1 {
+		t.Fatalf("inner.ProposeAdmin calls = %d, want 1", got)
+	}
+	if got := inner.calls.Load(); got != 0 {
+		t.Fatalf("inner.Propose calls = %d, want 0; admin path must NOT downgrade to non-exempt method", got)
+	}
+	want := append([]byte{'A'}, []byte("admin-payload")...)
+	if !bytes.Equal(inner.adminLast, want) {
+		t.Fatalf("inner.ProposeAdmin saw %q, want %q (wrapped)", inner.adminLast, want)
+	}
+}
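
The `ShardGroup` accessors exercised by `TestShardGroup_SetRaftPayloadWrap_RoundTrip` live in kv/sharded_coordinator.go, which is not part of the hunks shown here. The following is a guess at one shape that would satisfy that test, reduced to a standalone program; the field layout and the explicit nil branch are assumptions, not the committed implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// RaftPayloadWrapper mirrors the closure type used in the tests above.
type RaftPayloadWrapper func([]byte) ([]byte, error)

// ShardGroup is reduced to the one field this sketch needs (assumed layout).
type ShardGroup struct {
	raftPayloadWrap atomic.Pointer[RaftPayloadWrapper]
}

// SetRaftPayloadWrap publishes wrap for subsequent proposals; nil clears it.
// Storing a nil pointer (rather than a pointer to a nil func) keeps
// "Load() == nil" as the single representation of "wrap inactive".
func (g *ShardGroup) SetRaftPayloadWrap(wrap RaftPayloadWrapper) {
	if wrap == nil {
		g.raftPayloadWrap.Store(nil)
		return
	}
	g.raftPayloadWrap.Store(&wrap)
}

// RaftPayloadWrap returns the currently published wrap, or nil if none.
func (g *ShardGroup) RaftPayloadWrap() RaftPayloadWrapper {
	if p := g.raftPayloadWrap.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	g := &ShardGroup{}
	fmt.Println(g.RaftPayloadWrap() == nil) // prints "true"

	g.SetRaftPayloadWrap(func(p []byte) ([]byte, error) { return p, nil })
	fmt.Println(g.RaftPayloadWrap() == nil) // prints "false"

	g.SetRaftPayloadWrap(nil)
	fmt.Println(g.RaftPayloadWrap() == nil) // prints "true"
}
```

Taking the address of the `wrap` parameter gives each `Set` call its own heap-allocated closure value, so a concurrent reader never observes a partially updated wrapper.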
