Skip to content

Commit 2b118a4

Browse files
authored
kv(composed1): M2 — versioned-snapshot ring + kvFSM RouteHistory wiring (#894)
2 parents 2bcb6e7 + e2dbb16 commit 2b118a4

9 files changed

Lines changed: 551 additions & 7 deletions

distribution/engine.go

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,27 @@ type Engine struct {
3636
catalogVersion uint64
3737
ts atomic.Uint64
3838
hotspotThreshold uint64
39+
// history is the M2 versioned-snapshot ring for Composed-1
40+
// (docs/design/2026_05_29_proposed_composed1_cross_group_commit_guard.md
41+
// §M2). Keyed by catalogVersion; populated on every successful
42+
// ApplySnapshot and seeded by NewEngineWithDefaultRoute so a
43+
// transaction that observed catalogVersion = 0 (the engine's
44+
// pre-bootstrap snapshot) can still resolve its read-set owner.
45+
// Bounded by historyDepth via a FIFO eviction list
46+
// (historyOrder). At M2 the ring is plumbing only — M3 will
47+
// read from it via verifyComposed1.
48+
history map[uint64]RouteHistorySnapshot
49+
historyOrder []uint64
50+
historyDepth int
3951
}
4052

53+
// Route-engine tuning constants.
const (
	// DefaultRouteHistoryDepth is the number of catalog versions an Engine
	// keeps in its versioned-snapshot ring (Composed-1 M2 plumbing). The
	// value 32 is deliberately conservative for today's single-leader
	// catalog churn, which moves at operator frequency rather than
	// data-plane frequency (design doc §9 Q2). Raise it if a future control
	// plane produces more than roughly tens of versions per second.
	DefaultRouteHistoryDepth = 32

	// defaultGroupID is the group that NewEngineWithDefaultRoute assigns to
	// the route covering the whole keyspace.
	defaultGroupID uint64 = 1

	// minRouteCountForOrderValidation is presumably the smallest number of
	// routes for which ordering validation is meaningful; its use lies
	// outside this part of the file — confirm against the validation code.
	minRouteCountForOrderValidation = 2
)
4362

@@ -54,18 +73,28 @@ func NewEngine() *Engine {
5473
}
5574

5675
// NewEngineWithDefaultRoute creates an Engine and registers a default route
57-
// covering the full keyspace with a default group ID.
76+
// covering the full keyspace with a default group ID. The default route is
77+
// also recorded in the M2 history ring as the version-0 snapshot so
78+
// transactions that observed catalogVersion = 0 can resolve their read-set
79+
// owner through SnapshotAt(0).
5880
func NewEngineWithDefaultRoute() *Engine {
5981
engine := NewEngine()
6082
engine.UpdateRoute([]byte(""), nil, defaultGroupID)
83+
engine.recordHistorySnapshotLocked()
6184
return engine
6285
}
6386

6487
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
6588
// detection. A non-zero threshold enables automatic range splitting when the
6689
// number of accesses to a range exceeds the threshold.
6790
func NewEngineWithThreshold(threshold uint64) *Engine {
68-
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
91+
return &Engine{
92+
routes: make([]Route, 0),
93+
hotspotThreshold: threshold,
94+
history: make(map[uint64]RouteHistorySnapshot, DefaultRouteHistoryDepth),
95+
historyOrder: make([]uint64, 0, DefaultRouteHistoryDepth),
96+
historyDepth: DefaultRouteHistoryDepth,
97+
}
6998
}
7099

71100
// Version returns current route catalog version applied to the engine.
@@ -106,9 +135,115 @@ func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error {
106135

107136
e.routes = routes
108137
e.catalogVersion = snapshot.Version
138+
e.recordHistorySnapshotLocked()
109139
return nil
110140
}
111141

142+
// RouteHistorySnapshot is a point-in-time view of the route catalog at
143+
// a specific version. Returned by Engine.SnapshotAt for the M3
144+
// Composed-1 commit-time gate. Carries an immutable copy of the
145+
// catalog's routes at the recorded version so a caller can resolve
146+
// ownership without holding the Engine lock.
147+
type RouteHistorySnapshot struct {
148+
version uint64
149+
routes []Route
150+
}
151+
152+
// Version returns the catalog version this snapshot was recorded at.
153+
func (s RouteHistorySnapshot) Version() uint64 { return s.version }
154+
155+
// OwnerOf returns the Raft group ID that owned key at this snapshot's
156+
// version. Returns (0, false) when no route covers key (the
157+
// pre-bootstrap state or an explicitly-uncovered range). Mirrors
158+
// Engine.GetRoute's right-half-open interval semantics but against
159+
// the historical snapshot, not the live engine state.
160+
//
161+
// Routes are sorted by Start (recordHistorySnapshotLocked clones from
162+
// e.routes, which Engine.UpdateRoute / routesFromCatalog keep sorted),
163+
// so the scan can break the moment key < r.Start — every later route
164+
// has a strictly greater Start and cannot cover key either. This
165+
// matters because M3 puts OwnerOf on every txn commit's apply path
166+
// (claude review on PR #894 — break-vs-continue lifts the worst-case
167+
// scan from O(N) to "first non-covering gap" without changing the
168+
// resolution semantics).
169+
func (s RouteHistorySnapshot) OwnerOf(key []byte) (uint64, bool) {
170+
for _, r := range s.routes {
171+
if bytes.Compare(key, r.Start) < 0 {
172+
break
173+
}
174+
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
175+
continue
176+
}
177+
return r.GroupID, true
178+
}
179+
return 0, false
180+
}
181+
182+
// SnapshotAt returns the route catalog snapshot recorded at version v.
183+
// Returns (zero, false) when v is not in the ring — either because v
184+
// is in the future (> catalogVersion), or because the FIFO ring has
185+
// evicted v (it was older than the historyDepth-most-recent
186+
// versions). The M3 Composed-1 gate (design doc §4.3) treats the
187+
// not-found case as a hard error and triggers a coordinator retry,
188+
// so retention depth is a liveness knob, not a safety knob.
189+
func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool) {
190+
e.mu.RLock()
191+
defer e.mu.RUnlock()
192+
snap, ok := e.history[v]
193+
return snap, ok
194+
}
195+
196+
// HistoryDepth returns the configured ring depth for diagnostics.
197+
func (e *Engine) HistoryDepth() int {
198+
e.mu.RLock()
199+
defer e.mu.RUnlock()
200+
return e.historyDepth
201+
}
202+
203+
// recordHistorySnapshotLocked pushes the current (catalogVersion,
204+
// routes) pair into the ring. The `Locked` suffix is the Go
205+
// convention for "caller MUST hold the receiver's lock" — checked
206+
// by reviewers via name, not by the runtime (claude review on PR
207+
// #894 — fragile lock contract). Invoked from ApplySnapshot under
208+
// the write lock and from NewEngineWithDefaultRoute before the
209+
// Engine is shared with any concurrent reader. Idempotent on
210+
// re-record at the same version.
211+
func (e *Engine) recordHistorySnapshotLocked() {
212+
if e.history == nil {
213+
// Engines constructed via the bare struct literal (e.g.
214+
// internal test seams) — no history ring configured. Skip
215+
// the record so the M2 plumbing stays optional for those
216+
// paths; the M3 gate will observe SnapshotAt → (zero,
217+
// false) and trigger the soft-fail-as-retry path.
218+
return
219+
}
220+
if _, exists := e.history[e.catalogVersion]; exists {
221+
return
222+
}
223+
if len(e.historyOrder) >= e.historyDepth {
224+
evict := e.historyOrder[0]
225+
// Copy the retained tail into fresh storage rather than
226+
// reslicing. `historyOrder[1:]` only advances the slice
227+
// header — the head of the original backing array stays
228+
// alive and grows unboundedly across evictions. At depth=32
229+
// this is small, but the FIFO eviction is the only place
230+
// the array grows, and the compaction is free (single
231+
// allocation, single copy of <=historyDepth entries —
232+
// claude review on PR #894 — backing-array leak).
233+
retained := make([]uint64, len(e.historyOrder)-1, e.historyDepth)
234+
copy(retained, e.historyOrder[1:])
235+
e.historyOrder = retained
236+
delete(e.history, evict)
237+
}
238+
cloned := make([]Route, len(e.routes))
239+
copy(cloned, e.routes)
240+
e.history[e.catalogVersion] = RouteHistorySnapshot{
241+
version: e.catalogVersion,
242+
routes: cloned,
243+
}
244+
e.historyOrder = append(e.historyOrder, e.catalogVersion)
245+
}
246+
112247
func staleSnapshotVersionErr(snapshotVersion, currentVersion uint64) error {
113248
return errors.Wrapf(
114249
ErrEngineSnapshotVersionStale,

distribution/engine_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,3 +477,177 @@ func TestEngineApplySnapshot_Concurrent(t *testing.T) {
477477
t.Fatalf("expected route group %d, got %d", maxVersion, route.GroupID)
478478
}
479479
}
480+
481+
// TestEngineSnapshotAt_RecordsApplySnapshot is the M2 round-trip
482+
// witness for the Composed-1 versioned-snapshot ring (design doc
483+
// §M2): every successful ApplySnapshot records the (version, routes)
484+
// pair so SnapshotAt(v) can resolve OwnerOf(k) for any in-flight
485+
// transaction that observed v at BeginTxn.
486+
func TestEngineSnapshotAt_RecordsApplySnapshot(t *testing.T) {
487+
e := NewEngine()
488+
if err := e.ApplySnapshot(CatalogSnapshot{
489+
Version: 1,
490+
Routes: []RouteDescriptor{
491+
{RouteID: 10, Start: []byte(""), End: []byte("m"), GroupID: 7, State: RouteStateActive},
492+
{RouteID: 11, Start: []byte("m"), End: nil, GroupID: 9, State: RouteStateActive},
493+
},
494+
}); err != nil {
495+
t.Fatalf("apply snapshot v1: %v", err)
496+
}
497+
498+
snap, ok := e.SnapshotAt(1)
499+
if !ok {
500+
t.Fatal("expected SnapshotAt(1) to return the v1 snapshot")
501+
}
502+
if snap.Version() != 1 {
503+
t.Fatalf("expected snapshot version 1, got %d", snap.Version())
504+
}
505+
if owner, found := snap.OwnerOf([]byte("a")); !found || owner != 7 {
506+
t.Fatalf("expected key 'a' owner=7 in v1 snapshot; got owner=%d found=%v", owner, found)
507+
}
508+
if owner, found := snap.OwnerOf([]byte("z")); !found || owner != 9 {
509+
t.Fatalf("expected key 'z' owner=9 in v1 snapshot; got owner=%d found=%v", owner, found)
510+
}
511+
}
512+
513+
// TestEngineSnapshotAt_PreservesHistoryAcrossVersions verifies the
514+
// M3-critical property: after ApplySnapshot has moved the catalog
515+
// forward, a SnapshotAt for the PRIOR version still returns the old
516+
// routes. Without this, the M3 verifyComposed1 gate could not
517+
// resolve a txn whose observedVer is behind the current catalog —
518+
// exactly the case the design doc §3 G1c trace requires.
519+
func TestEngineSnapshotAt_PreservesHistoryAcrossVersions(t *testing.T) {
520+
e := NewEngine()
521+
if err := e.ApplySnapshot(CatalogSnapshot{
522+
Version: 1,
523+
Routes: []RouteDescriptor{
524+
{RouteID: 10, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive},
525+
},
526+
}); err != nil {
527+
t.Fatalf("apply v1: %v", err)
528+
}
529+
if err := e.ApplySnapshot(CatalogSnapshot{
530+
Version: 2,
531+
Routes: []RouteDescriptor{
532+
{RouteID: 11, Start: []byte(""), End: nil, GroupID: 2, State: RouteStateActive},
533+
},
534+
}); err != nil {
535+
t.Fatalf("apply v2: %v", err)
536+
}
537+
538+
snapV1, ok := e.SnapshotAt(1)
539+
if !ok {
540+
t.Fatal("expected v1 still in history after v2 applied")
541+
}
542+
if owner, _ := snapV1.OwnerOf([]byte("k")); owner != 1 {
543+
t.Fatalf("v1 snapshot must still show group=1 owner; got %d", owner)
544+
}
545+
snapV2, ok := e.SnapshotAt(2)
546+
if !ok {
547+
t.Fatal("expected v2 in history")
548+
}
549+
if owner, _ := snapV2.OwnerOf([]byte("k")); owner != 2 {
550+
t.Fatalf("v2 snapshot must show group=2 owner; got %d", owner)
551+
}
552+
}
553+
554+
// TestEngineSnapshotAt_FIFOEviction verifies that the ring respects
555+
// historyDepth: once more than depth versions have been applied, the
556+
// oldest is evicted and SnapshotAt returns (zero, false) for it.
557+
// The M3 gate (design doc §4.3) treats the not-found case as a hard
558+
// retryable error, so retention depth is a liveness knob — this
559+
// test pins the eviction order so a future depth change does not
560+
// silently break the M3 contract.
561+
func TestEngineSnapshotAt_FIFOEviction(t *testing.T) {
562+
t.Parallel()
563+
e := NewEngine()
564+
// Force a tiny depth so the test is bounded and explicit. The
565+
// direct field write is safe because `e` is local to this test
566+
// goroutine and the depth is set before any ApplySnapshot fires;
567+
// once the Engine is published to concurrent readers, the depth
568+
// would have to flow through a constructor option (claude review
569+
// on PR #894 — fragile-but-test-local lock contract).
570+
e.historyDepth = 3
571+
572+
for v := uint64(1); v <= 5; v++ {
573+
if err := e.ApplySnapshot(CatalogSnapshot{
574+
Version: v,
575+
Routes: []RouteDescriptor{
576+
{RouteID: v, Start: []byte(""), End: nil, GroupID: v, State: RouteStateActive},
577+
},
578+
}); err != nil {
579+
t.Fatalf("apply v%d: %v", v, err)
580+
}
581+
}
582+
583+
if _, ok := e.SnapshotAt(1); ok {
584+
t.Fatal("v1 must be evicted (oldest) under depth=3 after v2..v5 applied")
585+
}
586+
if _, ok := e.SnapshotAt(2); ok {
587+
t.Fatal("v2 must be evicted (second-oldest) under depth=3 after v3..v5 applied")
588+
}
589+
for v := uint64(3); v <= 5; v++ {
590+
snap, ok := e.SnapshotAt(v)
591+
if !ok {
592+
t.Fatalf("expected v%d retained (depth=3 keeps the 3 most recent)", v)
593+
}
594+
if owner, _ := snap.OwnerOf([]byte("k")); owner != v {
595+
t.Fatalf("v%d snapshot must show group=%d owner; got %d", v, v, owner)
596+
}
597+
}
598+
}
599+
600+
// TestEngineSnapshotAt_UnknownVersionReturnsNotFound documents the
601+
// M3-relevant contract: a version that has never been applied (e.g.
602+
// in the future, or a typo) returns (zero, false). The M3 gate
603+
// uses this signal to emit ErrComposed1VersionGCd and trigger a
604+
// coordinator retry.
605+
func TestEngineSnapshotAt_UnknownVersionReturnsNotFound(t *testing.T) {
606+
e := NewEngineWithDefaultRoute()
607+
if _, ok := e.SnapshotAt(42); ok {
608+
t.Fatal("expected SnapshotAt for an unknown version to return false")
609+
}
610+
}
611+
612+
// TestEngineSnapshotAt_SeedsVersionZeroForDefaultRoute verifies that
613+
// the NewEngineWithDefaultRoute path records the version-0 default
614+
// route snapshot. Without this, every txn that observed v=0 (the
615+
// common case before any ApplySnapshot lands) would fall through
616+
// to the M3 not-found path and trigger a spurious retry on its first
617+
// commit.
618+
func TestEngineSnapshotAt_SeedsVersionZeroForDefaultRoute(t *testing.T) {
619+
e := NewEngineWithDefaultRoute()
620+
snap, ok := e.SnapshotAt(0)
621+
if !ok {
622+
t.Fatal("expected NewEngineWithDefaultRoute to seed a v0 history entry")
623+
}
624+
if snap.Version() != 0 {
625+
t.Fatalf("expected seed snapshot version 0, got %d", snap.Version())
626+
}
627+
// Default route covers the full keyspace ⇒ every key resolves to
628+
// defaultGroupID at v0.
629+
owner, ok := snap.OwnerOf([]byte("anything"))
630+
if !ok || owner != defaultGroupID {
631+
t.Fatalf("expected v0 snapshot to resolve every key to defaultGroupID=%d; got owner=%d found=%v", defaultGroupID, owner, ok)
632+
}
633+
}
634+
635+
// TestEngineSnapshotAt_BareEngineHasNoHistory documents that an
636+
// Engine constructed via the bare struct literal (e.g. via internal
637+
// test seams) has a nil history map and SnapshotAt always returns
638+
// (zero, false). recordHistorySnapshot is nil-safe so ApplySnapshot
639+
// still works on such an engine, but the M3 gate will treat every
640+
// SnapshotAt as "not in ring" → soft-fail-as-retry, which is the
641+
// correct posture for unconfigured engines.
642+
func TestEngineSnapshotAt_BareEngineHasNoHistory(t *testing.T) {
643+
e := &Engine{routes: make([]Route, 0)} // bare struct literal — no history
644+
if err := e.ApplySnapshot(CatalogSnapshot{
645+
Version: 1,
646+
Routes: []RouteDescriptor{{RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive}},
647+
}); err != nil {
648+
t.Fatalf("apply on bare engine should succeed: %v", err)
649+
}
650+
if _, ok := e.SnapshotAt(1); ok {
651+
t.Fatal("bare engine has no history ring; SnapshotAt should always be false")
652+
}
653+
}

0 commit comments

Comments
 (0)