Skip to content

Commit 65e725a

Browse files
authored
feat(keyviz): sampler GroupID + LeaderTerm stamping (Phase 2-C+ PR-3a) (#709)
## Summary - Add `GroupID` field to `routeSlot` and `RaftGroupID + LeaderTerm` to `MatrixRow` (Phase 2-C+ §9.1 dedupe key) - Add `MemSampler.SetLeaderTerm(groupID, term)` API and `groupTerms` snapshot at Flush time - Extend `RegisterRoute` with a `groupID uint64` parameter; update all call sites (`main.go` reads `r.GroupID` from `distribution.Engine.Stats()`) ## Why Per `docs/design/2026_04_27_proposed_keyviz_cluster_fanout.md` §9.1, the cluster fan-out aggregator dedupes write samples by `(routeID, raftGroupID, leaderTerm, columnAt)` instead of the conservative max-merge. Max-merge undercounts during a leadership flip when the new leader observes a window the old leader was halfway through. Carrying `RaftGroupID + LeaderTerm` on every row gives the merge enough information to dedupe per-term and sum across terms. ## Scope (PR-3a — sampler API only) This PR ships the sampler-side API. Wiring lands in follow-up PRs: - **PR-3a (this)**: `routeSlot.GroupID`, `MatrixRow.RaftGroupID + LeaderTerm`, `SetLeaderTerm` API, `RegisterRoute` signature, `Flush` term snapshot. - **PR-3b (follow-up)**: periodic ticker in `main.go` that polls engine Status and calls `SetLeaderTerm`; proto + JSON wire-format additions. - **PR-3c (follow-up)**: aggregator-side `(group, term)`-keyed dedupe in `internal/admin/keyviz_fanout.go`. With `SetLeaderTerm` never called (this PR alone), every row emits `LeaderTerm=0` and the fan-out merge falls back to today's max-merge → no behavior change for legacy deployments. ## Test plan - [x] `go test -race -count=1 ./keyviz/...` — passes - [x] `go test -race -count=1 ./internal/admin/...` — passes (uses MemSampler) - [x] `go test -race -count=1 .` (root, `main_keyviz_test.go`) — passes - [x] `go build ./...` — clean - [x] `golangci-lint run` — 0 issues - [ ] Jepsen — N/A (sampler-internal change, no replication/MVCC impact) ## Five-lens self-review 1. **Data loss** — no on-disk format change; legacy max-merge fallback preserves today's behavior when `SetLeaderTerm` is never called. 2. **Concurrency** — `groupTermsMu` is a fine-grained `RWMutex`; `Observe` never touches it; `snapshotGroupTerms` clones at the top of `Flush` so every row in a column observes a stable view, even if `SetLeaderTerm` fires concurrently. 3. **Performance** — `Observe` hot path is unchanged (no new map lookup, no new lock); `Flush` gains one `RLock` + map clone per column, bounded by `len(groupTerms)` (typically ≤ a few groups). 4. **Data consistency** — `(routeID, raftGroupID, leaderTerm)` is a strict superset of today's `routeID` dedupe key; rows with `LeaderTerm=0` collapse to the legacy max-merge. 5. **Test coverage** — new `SetLeaderTerm` publishing + `Flush` snapshot tests under `keyviz/sampler_test.go`; existing `RegisterRoute` test signatures updated; race detector clean.
2 parents 7ac9c04 + eab91c9 commit 65e725a

5 files changed

Lines changed: 318 additions & 60 deletions

File tree

keyviz/flusher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
func TestRunFlusherTicksUntilCancel(t *testing.T) {
1010
t.Parallel()
1111
s := NewMemSampler(MemSamplerOptions{Step: 5 * time.Millisecond, HistoryColumns: 16})
12-
if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
12+
if !s.RegisterRoute(1, []byte("a"), []byte("b"), 0) {
1313
t.Fatal("Register failed")
1414
}
1515
ctx, cancel := context.WithCancel(context.Background())

keyviz/sampler.go

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ type MemSampler struct {
160160
// twice in the same column — once as an aggregate row, once as
161161
// an individual row — even under register/remove churn.
162162
virtualIDCounter atomic.Uint64
163+
164+
// groupTermsMu guards groupTerms. SetLeaderTerm and Flush both
165+
// touch the map; Observe never does. The lock is fine-grained
166+
// enough that contention is bounded by leader-term flips, which
167+
// happen at most a few times per second across the whole cluster.
168+
groupTermsMu sync.RWMutex
169+
groupTerms map[uint64]uint64
163170
}
164171

165172
// retiredSlot tracks a removed slot through its post-removal grace
@@ -237,6 +244,14 @@ type routeTable struct {
237244
type routeSlot struct {
238245
metaMu sync.RWMutex
239246
RouteID uint64
247+
// GroupID is the Raft group this route belongs to. Phase 2-C+
248+
// stamps this on every emitted MatrixRow so the cluster fan-out
249+
// aggregator can dedupe write samples by (raftGroupID,
250+
// leaderTerm) instead of the conservative max-merge that may
251+
// undercount during a leadership flip. 0 means "no group
252+
// attached" (legacy single-group deployments and the synthetic
253+
// virtual-bucket slots both use 0).
254+
GroupID uint64
240255
Start []byte
241256
End []byte
242257
// Aggregate marks virtual buckets that fold multiple coarsened
@@ -266,7 +281,7 @@ type routeSlot struct {
266281
// Start/End/MemberRoutes with the live slot (which a later
267282
// RegisterRoute may extend, and which the snapshot API exports to
268283
// external consumers that may mutate the bounds).
269-
func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64, membersTotal uint64) {
284+
func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64, membersTotal, groupID uint64) {
270285
s.metaMu.RLock()
271286
defer s.metaMu.RUnlock()
272287
start = cloneBytes(s.Start)
@@ -276,6 +291,7 @@ func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members [
276291
members = append([]uint64(nil), s.MemberRoutes...)
277292
}
278293
membersTotal = s.MemberRoutesTotal
294+
groupID = s.GroupID
279295
return
280296
}
281297

@@ -299,6 +315,16 @@ type MatrixRow struct {
299315
// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
300316
MemberRoutesTotal uint64
301317

318+
// RaftGroupID + LeaderTerm carry the route's Raft identity at the
319+
// time the column was flushed. Stamped from the per-group term
320+
// snapshot SetLeaderTerm publishes (Phase 2-C+ fan-out merge
321+
// uses (RouteID/BucketID, RaftGroupID, LeaderTerm, columnAt) as
322+
// the dedupe key). Zero values mean "term not tracked" — emitted
323+
// when no SetLeaderTerm call has been made for the group yet, or
324+
// for synthetic virtual-bucket slots that span groups.
325+
RaftGroupID uint64
326+
LeaderTerm uint64
327+
302328
Reads uint64
303329
Writes uint64
304330
ReadBytes uint64
@@ -331,14 +357,61 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler {
331357
now = time.Now
332358
}
333359
s := &MemSampler{
334-
opts: opts,
335-
now: now,
336-
history: newRingBuffer(opts.HistoryColumns),
360+
opts: opts,
361+
now: now,
362+
history: newRingBuffer(opts.HistoryColumns),
363+
groupTerms: map[uint64]uint64{},
337364
}
338365
s.table.Store(newEmptyRouteTable())
339366
return s
340367
}
341368

369+
// SetLeaderTerm publishes the current Raft leader term for the given
370+
// group. Called by main.go on a periodic ticker that polls the engine
371+
// Status, and on every term-change observed via the engine. Phase 2-C+
372+
// stamps each MatrixRow's LeaderTerm from this snapshot at Flush time.
373+
//
374+
// Calling with term == 0 is allowed but treated as "term unknown"
375+
// during merge — the canonical (groupID, term) dedupe key collapses
376+
// to the legacy max-merge for cells whose LeaderTerm is 0. This lets
377+
// nodes that have not finished engine startup contribute partial data
378+
// without poisoning the merge.
379+
//
380+
// groupID == 0 is reserved for virtual aggregate buckets (which span
381+
// multiple real groups). Calls with groupID == 0 are silently ignored
382+
// so a future caller cannot accidentally stamp a non-zero term on
383+
// aggregate rows — they must remain LeaderTerm == 0 so the fan-out
384+
// merge falls back to max-merge for cross-group cells.
385+
//
386+
// nil-receiver-safe.
387+
func (s *MemSampler) SetLeaderTerm(groupID, term uint64) {
388+
if s == nil || groupID == 0 {
389+
return
390+
}
391+
s.groupTermsMu.Lock()
392+
defer s.groupTermsMu.Unlock()
393+
s.groupTerms[groupID] = term
394+
}
395+
396+
// snapshotGroupTerms returns a copy of the per-group term map. Called
397+
// at the top of Flush so the column built below sees a stable view
398+
// of the term mapping even if SetLeaderTerm fires concurrently.
399+
// Returns nil when no terms have been published yet — `nil[k]` returns
400+
// the zero value in Go, so the row-builder hot path needs no nil guard
401+
// and avoids one allocation per Flush before SetLeaderTerm is wired.
402+
func (s *MemSampler) snapshotGroupTerms() map[uint64]uint64 {
403+
s.groupTermsMu.RLock()
404+
defer s.groupTermsMu.RUnlock()
405+
if len(s.groupTerms) == 0 {
406+
return nil
407+
}
408+
out := make(map[uint64]uint64, len(s.groupTerms))
409+
for g, t := range s.groupTerms {
410+
out[g] = t
411+
}
412+
return out
413+
}
414+
342415
func newEmptyRouteTable() *routeTable {
343416
return &routeTable{
344417
slots: map[uint64]*routeSlot{},
@@ -385,10 +458,18 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
385458
}
386459

387460
// RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking
388-
// set. Returns true when the route gets its own slot, false when the
461+
// set. groupID is the Raft group the route belongs to; it is stamped
462+
// onto every MatrixRow this slot eventually emits so the Phase 2-C+
463+
// fan-out merge can dedupe write samples by (groupID, leaderTerm).
464+
// Pass 0 when the deployment doesn't use multiple groups (legacy /
465+
// single-group setups); the legacy max-merge fallback handles those.
466+
//
467+
// Returns true when the route gets its own slot, false when the
389468
// MaxTrackedRoutes cap was hit and the route was folded into a
390469
// virtual aggregate bucket. Idempotent: calling twice with the same
391-
// RouteID is a no-op (the original slot stays in place).
470+
// RouteID is a no-op (the original slot stays in place; a different
471+
// groupID on the second call is silently ignored — RegisterRoute is
472+
// not the right API to retag a slot, RemoveRoute + RegisterRoute is).
392473
//
393474
// If a previous RemoveRoute(routeID) queued a deferred member-prune
394475
// and the route is now re-registered into the SAME bucket inside the
@@ -397,7 +478,7 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
397478
// attributing fresh traffic to it. Prunes for different buckets (or
398479
// when the route rejoins as an individual slot) are left alone so
399480
// the old bucket's MemberRoutes is correctly cleaned up.
400-
func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
481+
func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool {
401482
if s == nil {
402483
return false
403484
}
@@ -418,6 +499,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
418499
if slot == nil {
419500
slot = &routeSlot{
420501
RouteID: routeID,
502+
GroupID: groupID,
421503
Start: cloneBytes(start),
422504
End: cloneBytes(end),
423505
MemberRoutesTotal: 1,
@@ -431,6 +513,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
431513
// counts split across them. Refresh the metadata fields to
432514
// match the new registration; counters are preserved.
433515
slot.metaMu.Lock()
516+
slot.GroupID = groupID
434517
slot.Start = cloneBytes(start)
435518
slot.End = cloneBytes(end)
436519
slot.MemberRoutesTotal = 1
@@ -601,14 +684,21 @@ func (s *MemSampler) Flush() {
601684
return
602685
}
603686
col := MatrixColumn{At: s.now()}
687+
// Snapshot the per-group leader-term map once at the top of
688+
// Flush so every row in this column sees a consistent view, even
689+
// if SetLeaderTerm fires concurrently. Later rows can never
690+
// observe a *newer* term than earlier rows in the same column,
691+
// which preserves the merge-side invariant that all rows in a
692+
// (groupID, columnAt) tuple share a single leaderTerm value.
693+
terms := s.snapshotGroupTerms()
604694
tbl := s.table.Load()
605695
for _, slot := range tbl.sortedSlots {
606-
col.Rows = appendDrainedRow(col.Rows, slot)
696+
col.Rows = appendDrainedRow(col.Rows, slot, terms)
607697
}
608698

609699
grace := s.graceWindow()
610700
s.retiredMu.Lock()
611-
s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows, col.At, grace)
701+
s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows, col.At, grace, terms)
612702
s.pendingPrunes = advancePendingPrunes(s.pendingPrunes, col.At, grace)
613703
s.retiredMu.Unlock()
614704

@@ -628,10 +718,10 @@ func (s *MemSampler) Flush() {
628718
// this final drain. The dropped tail of the backing array is zeroed
629719
// so released *routeSlot pointers do not stay GC-reachable through
630720
// the reused capacity.
631-
func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot {
721+
func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration, terms map[uint64]uint64) []retiredSlot {
632722
keep := retired[:0]
633723
for _, r := range retired {
634-
*rows = appendDrainedRow(*rows, r.slot)
724+
*rows = appendDrainedRow(*rows, r.slot, terms)
635725
if now.Sub(r.retiredAt) < grace {
636726
keep = append(keep, r)
637727
}
@@ -825,17 +915,19 @@ func (s *MemSampler) HistoryColumns() int {
825915
// MatrixRow when any counter was non-zero. Idle slots are skipped.
826916
// Metadata is read under the slot's metaMu so a concurrent
827917
// RegisterRoute fold cannot race with the row materialisation.
828-
func appendDrainedRow(rows []MatrixRow, slot *routeSlot) []MatrixRow {
918+
func appendDrainedRow(rows []MatrixRow, slot *routeSlot, terms map[uint64]uint64) []MatrixRow {
829919
reads := slot.reads.Swap(0)
830920
writes := slot.writes.Swap(0)
831921
readBytes := slot.readBytes.Swap(0)
832922
writeBytes := slot.writeBytes.Swap(0)
833923
if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 {
834924
return rows
835925
}
836-
start, end, aggregate, members, membersTotal := slot.snapshotMeta()
926+
start, end, aggregate, members, membersTotal, groupID := slot.snapshotMeta()
837927
return append(rows, MatrixRow{
838928
RouteID: slot.RouteID,
929+
RaftGroupID: groupID,
930+
LeaderTerm: terms[groupID],
839931
Start: start,
840932
End: end,
841933
Aggregate: aggregate,

0 commit comments

Comments
 (0)