Skip to content

Commit ed575bf

Browse files
authored
kv(composed1): M3 — verifyComposed1 apply-time gate + retry sentinels (#895)
2 parents 2b118a4 + fbd0d46 commit ed575bf

4 files changed

Lines changed: 542 additions & 6 deletions

File tree

distribution/engine.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,21 @@ func (s RouteHistorySnapshot) OwnerOf(key []byte) (uint64, bool) {
179179
return 0, false
180180
}
181181

182+
// Current returns the route catalog snapshot at the engine's current
183+
// catalogVersion. Returns (zero, false) when the history ring has
184+
// not been initialised (bare-struct Engine). Used by the M3
185+
// Composed-1 cross-version-read fence (design doc §4.4) — the gate
186+
// compares the txn's observed-version owner against the current
187+
// owner so a route shift between BeginTxn and Commit is caught
188+
// before it can produce a G1c anomaly across a cross-group
189+
// MoveRange / SplitRange.
190+
func (e *Engine) Current() (RouteHistorySnapshot, bool) {
191+
e.mu.RLock()
192+
defer e.mu.RUnlock()
193+
snap, ok := e.history[e.catalogVersion]
194+
return snap, ok
195+
}
196+
182197
// SnapshotAt returns the route catalog snapshot recorded at version v.
183198
// Returns (zero, false) when v is not in the ring — either because v
184199
// is in the future (> catalogVersion), or because the FIFO ring has
@@ -193,6 +208,49 @@ func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool) {
193208
return snap, ok
194209
}
195210

211+
// SetHistoryDepthForTest overrides the FIFO ring depth from outside
212+
// the package. Test-only. Callers should set the depth before
213+
// sharing the Engine with concurrent SnapshotAt/Current readers to
214+
// avoid interleaving surprises around the eviction watermark, but
215+
// the write itself is lock-protected (e.mu.Lock below) so it is
216+
// safe to call from any goroutine that does not also expect a
217+
// consistent SnapshotAt view across the depth change.
218+
//
219+
// Exists so tests in the kv package can drive eviction-trigger
220+
// scenarios without adding a constructor option just for tests
221+
// (claude review on PR #894). Production code must use
222+
// DefaultRouteHistoryDepth (32) or a future operator-exposed
223+
// config knob.
224+
//
225+
// Fails fast on depth <= 0 (coderabbit minor on PR #895):
226+
// recordHistorySnapshotLocked's eviction path indexes
227+
// historyOrder[0], so a zero/negative depth would surface as a
228+
// confusing index-out-of-range deep in the apply path instead of
229+
// at the misconfigured test seam. When shrinking depth below the
230+
// current ring size, evict the excess oldest entries immediately
231+
// rather than letting the next record see len(historyOrder) >
232+
// historyDepth (gemini medium on PR #895 — without this trim, the
233+
// next recordHistorySnapshotLocked's
234+
// `make([]uint64, len-1, historyDepth)` would panic on len-1 >
235+
// historyDepth).
236+
func (e *Engine) SetHistoryDepthForTest(depth int) {
237+
if depth <= 0 {
238+
panic("SetHistoryDepthForTest: depth must be > 0")
239+
}
240+
e.mu.Lock()
241+
defer e.mu.Unlock()
242+
e.historyDepth = depth
243+
if len(e.historyOrder) > depth {
244+
excess := len(e.historyOrder) - depth
245+
for i := range excess {
246+
delete(e.history, e.historyOrder[i])
247+
}
248+
retained := make([]uint64, depth)
249+
copy(retained, e.historyOrder[excess:])
250+
e.historyOrder = retained
251+
}
252+
}
253+
196254
// HistoryDepth returns the configured ring depth for diagnostics.
197255
func (e *Engine) HistoryDepth() int {
198256
e.mu.RLock()

kv/fsm.go

Lines changed: 153 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,24 @@ type kvFSM struct {
8686
}
8787

8888
// RouteHistory is the kv-side interface to the route catalog's
89-
// versioned-snapshot ring. *distribution.Engine satisfies it.
90-
// Defined in the kv package so kvFSM does not have to import a
91-
// concrete type for the field; the M3 verifyComposed1 gate uses
92-
// only SnapshotAt and the returned snapshot's OwnerOf, so the
93-
// interface stays minimal.
89+
// versioned-snapshot ring. *distribution.Engine satisfies it via
90+
// WrapDistributionEngine. Defined in the kv package so kvFSM does
91+
// not have to import a concrete type for the field; the M3
92+
// verifyComposed1 gate uses only SnapshotAt + Current + the returned
93+
// snapshot's OwnerOf, so the interface stays minimal.
9494
type RouteHistory interface {
9595
// SnapshotAt returns the route catalog at the given catalog
9696
// version. Returns (zero, false) when the version is outside
97-
// the ring (either evicted by depth, or in the future).
97+
// the ring (either evicted by depth, or in the future). The
98+
// M3 gate maps the not-found case to ErrComposed1VersionGCd.
9899
SnapshotAt(version uint64) (RouteSnapshot, bool)
100+
// Current returns the route catalog snapshot at the engine's
101+
// current catalog version. Returns (zero, false) when the
102+
// engine has no history (bare-struct case used by some test
103+
// seams). The M3 cross-version fence uses this to compare
104+
// the txn's observed-version owner against the current
105+
// owner — a mismatch is the §3 codex P1 trace.
106+
Current() (RouteSnapshot, bool)
99107
}
100108

101109
// RouteSnapshot is the historical view of the route catalog at a
@@ -199,6 +207,38 @@ var _ raftengine.StateMachine = (*kvFSM)(nil)
199207

200208
var ErrUnknownRequestType = errors.New("unknown request type")
201209

210+
// ErrComposed1Violation is returned by verifyComposed1 when the
211+
// transaction's commit cannot proceed on this Raft group because the
212+
// txn's read-set or write-set keys are not owned by this group at
213+
// either the txn's observed catalog version (the spec-level §4.2(a)
214+
// check) or the current catalog version observed by the FSM at apply
215+
// time (the §4.4 cross-version-read fence). Surfaces to the
216+
// coordinator as a retryable error: the M4 coordinator path re-reads
217+
// the route cache, re-routes the txn, and re-issues it once on the
218+
// new owning group.
219+
//
220+
// Wrapped with errors.Wrapf at the call site to carry the
221+
// per-key diagnostic (which key, which observed-version owner, which
222+
// current-version owner) — the caller's retry path uses
223+
// errors.Is(err, ErrComposed1Violation) to match.
224+
var ErrComposed1Violation = errors.New("composed-1: route ownership shifted; retry on new owning group")
225+
226+
// ErrComposed1VersionGCd is returned by verifyComposed1 when the
227+
// txn's observed catalog version is no longer in the engine's
228+
// retention ring — either because the FIFO ring evicted it (the
229+
// txn lived longer than `routeHistoryDepth` versions worth of
230+
// catalog churn) or because the version was never seen on this
231+
// node. Surfaces to the coordinator as a retryable error: the
232+
// caller's M4 retry path reads the current route cache and
233+
// re-issues the txn with a fresh observedVer.
234+
//
235+
// The not-found ⇒ hard-error semantics (rather than soft-pass)
236+
// matters because a soft-pass would let the gate be bypassed
237+
// exactly in the long-running-txn / high-churn cases where the
238+
// cross-version-read hazard is most likely (design doc §4.3 +
239+
// gemini medium + codex P2 on PR #870).
240+
var ErrComposed1VersionGCd = errors.New("composed-1: observed catalog version evicted from history ring; retry")
241+
202242
type fsmApplyResponse struct {
203243
results []error
204244
}
@@ -493,6 +533,9 @@ func (f *kvFSM) RestoredCutover() uint64 {
493533
}
494534

495535
func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error {
536+
if err := f.verifyComposed1(r); err != nil {
537+
return err
538+
}
496539
switch r.Phase {
497540
case pb.Phase_PREPARE:
498541
return f.handlePrepareRequest(ctx, r)
@@ -507,6 +550,110 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui
507550
}
508551
}
509552

553+
// verifyComposed1 is the M3 apply-time Composed-1 gate per
554+
// docs/design/2026_05_29_proposed_composed1_cross_group_commit_guard.md
555+
// §4.2(a) + §4.4. Runs two checks before the txn's writes land:
556+
//
557+
// (a) Observed-version owner — the txn's read-set was captured
558+
// at routes[observedVer], so every write key must be owned
559+
// by THIS Raft group at that historical version. Matches
560+
// the spec-level Commit precondition in tla/composed/Composed.tla.
561+
//
562+
// (b) Current-version owner — even when (a) passes, a route
563+
// shift between BeginTxn and Commit can leave the write
564+
// landing on the OLD owner while readers at the new
565+
// version route to the NEW owner and miss the write (the
566+
// §3 codex P1 G1c trace). The current-version fence
567+
// refuses the commit when this group no longer owns the
568+
// key, forcing a coordinator retry on the new owner.
569+
//
570+
// Short-circuits cleanly in three legacy / not-applicable cases:
571+
// - FSM was constructed without WithRouteHistory (legacy / test
572+
// seam): routes == nil, return nil.
573+
// - Request carries ObservedRouteVersion == 0 (unpinned —
574+
// pre-M1 caller, or ABORT request that doesn't carry the
575+
// version): return nil.
576+
// - Engine.Current returns (zero, false) — the engine has no
577+
// history (bare-struct test seam): return nil at the (b) check.
578+
//
579+
// Returns ErrComposed1VersionGCd when the observed version is
580+
// outside the ring (M4 retry), and ErrComposed1Violation wrapped
581+
// with per-key context otherwise.
582+
func (f *kvFSM) verifyComposed1(r *pb.Request) error {
583+
// ABORT requests MUST always reach the abort handler so the
584+
// txn's intent locks get released. If a route shifted between
585+
// PREPARE and ABORT (or the observed version was evicted), a
586+
// rejected ABORT would leave the locks pinned until LockResolver
587+
// noticed at TTL expiry — minutes of write-blocked keys for
588+
// what should be a one-RPC cleanup. Production callers carry
589+
// ObservedRouteVersion=0 on ABORT (the existing observedVer==0
590+
// check below handles that case), so this guard is defensive
591+
// belt-and-suspenders: any future ABORT construction site that
592+
// inadvertently sets the version still bypasses the gate
593+
// (gemini HIGH on PR #895 — fail-open on ABORT).
594+
if r.GetPhase() == pb.Phase_ABORT {
595+
return nil
596+
}
597+
// Bypass the gate when EITHER the route history is unwired OR
598+
// the FSM's shardGroupID is the unset sentinel (0). Both
599+
// fields are documented as "unset ⇒ short-circuit" but the
600+
// original check only honoured `routes == nil`, leaving a
601+
// partially-wired caller (routes installed before group ID)
602+
// to silently reject every pinned txn — no real Raft group
603+
// has ID 0, so OwnerOf would never match (coderabbit MAJOR on
604+
// PR #895).
605+
if f.routes == nil || f.shardGroupID == 0 {
606+
return nil
607+
}
608+
observedVer := r.GetObservedRouteVersion()
609+
if observedVer == 0 {
610+
return nil
611+
}
612+
613+
// (a) Observed-version check.
614+
observedSnap, ok := f.routes.SnapshotAt(observedVer)
615+
if !ok {
616+
return errors.WithStack(ErrComposed1VersionGCd)
617+
}
618+
if err := f.verifyOwnerFromSnapshot(r.GetMutations(), observedSnap, observedVer, "observed"); err != nil {
619+
return err
620+
}
621+
622+
// (b) Current-version cross-version-read fence.
623+
currentSnap, ok := f.routes.Current()
624+
if !ok {
625+
// No current snapshot — engine has no history, nothing
626+
// to compare against. Fall through (matches the
627+
// short-circuit posture of an unwired FSM).
628+
return nil
629+
}
630+
return f.verifyOwnerFromSnapshot(r.GetMutations(), currentSnap, currentSnap.Version(), "current")
631+
}
632+
633+
// verifyOwnerFromSnapshot is the shared per-mutation owner-check
634+
// loop used by verifyComposed1's observed-version and current-
635+
// version passes. `phase` is the diagnostic label ("observed" /
636+
// "current") that ends up in the wrapped error. isTxnInternalKey
637+
// mutations (the TxnMeta marker prefix) are skipped — they are
638+
// always on every shard and have no Composed-1 ownership.
639+
func (f *kvFSM) verifyOwnerFromSnapshot(mutations []*pb.Mutation, snap RouteSnapshot, snapVer uint64, phase string) error {
640+
for _, mut := range mutations {
641+
if mut == nil || len(mut.Key) == 0 {
642+
continue
643+
}
644+
if isTxnInternalKey(mut.Key) {
645+
continue
646+
}
647+
owner, found := snap.OwnerOf(mut.Key)
648+
if !found || owner != f.shardGroupID {
649+
return errors.Wrapf(ErrComposed1Violation,
650+
"%s-version v=%d: key %q owned by group %d (found=%v); this FSM serves group %d",
651+
phase, snapVer, mut.Key, owner, found, f.shardGroupID)
652+
}
653+
}
654+
return nil
655+
}
656+
510657
func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, startTS uint64) error {
511658
seen := make(map[string]struct{}, len(muts))
512659
for _, mut := range muts {

0 commit comments

Comments
 (0)