Skip to content

Commit 34651ad

Browse files
authored
feat(adapter): expose keyviz heatmap via AdminServer.GetKeyVizMatrix (#646)
## Summary - Implements `AdminServer.GetKeyVizMatrix` against the proto declared in `proto/admin.proto` (already generated; no proto edits). - New narrow `KeyVizSampler` interface in the adapter package (just `Snapshot(from, to time.Time) []keyviz.MatrixColumn`), so production wires `*keyviz.MemSampler` while tests pass an in-memory fake. - `AdminServer.RegisterSampler` mirrors `RegisterGroup`. Without it, `GetKeyVizMatrix` returns `codes.Unavailable` so callers can distinguish "keyviz disabled on this node" from "no data yet" (which is a successful empty response). - Pivots the column-major `MatrixColumn` slice into the row-major proto layout: one `KeyVizRow` per `RouteID` with values aligned to a parallel `column_unix_ms` slice. `KeyVizSeries` selection picks the matching per-row counter; `UNSPECIFIED` defaults to `Reads`. - `bucket_id` encodes `route:<id>` for individual slots and `virtual:<syntheticID>` for aggregate buckets. Aggregate rows carry `MemberRoutes` verbatim through `route_ids` and `route_count`. Implements the read-side half of `docs/admin_ui_key_visualizer_design.md` §5.2 / §6. The dispatch-side `Observe` wiring is in #645. ## Test plan - [x] `TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered` — verifies `codes.Unavailable` when no sampler registered. - [x] `TestGetKeyVizMatrixPivotsColumnsToRows` — two-column / two-route fixture, verifies the missing-row-becomes-zero contract. - [x] `TestGetKeyVizMatrixSeriesSelection` — table-driven across all five enum values including `UNSPECIFIED` defaulting to `Reads`. - [x] `TestGetKeyVizMatrixEncodesAggregateBucket` — virtual bucket layout (`bucket_id` prefix, `aggregate=true`, `route_ids`, `route_count`). - [x] `go test -race -count=1 -run TestGetKeyVizMatrix ./adapter/...` clean. - [x] `golangci-lint run ./adapter/...` clean.
2 parents ecd7f66 + 82dd4c4 commit 34651ad

4 files changed

Lines changed: 208 additions & 29 deletions

File tree

adapter/admin_grpc.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,24 @@ func (s *AdminServer) GetKeyVizMatrix(
535535
to := unixMsToTime(req.GetToUnixMs())
536536
cols := sampler.Snapshot(from, to)
537537
pickValue := matrixSeriesPicker(req.GetSeries())
538-
return matrixToProto(cols, pickValue, int(req.GetRows())), nil
538+
return matrixToProto(cols, pickValue, clampRowBudget(int(req.GetRows()))), nil
539+
}
540+
541+
// keyVizRowBudgetCap is the upper bound on the per-request rows
542+
// budget — design doc §4.1 caps rows at 1024 to bound server work
543+
// (sort + payload) for adversarial / over-large requests.
544+
const keyVizRowBudgetCap = 1024
545+
546+
// clampRowBudget enforces design §4.1's upper bound. A request of 0
547+
// (or negative) means "no cap" and is preserved; anything past the
548+
// cap is silently clamped — clients asking for more rows than the
549+
// server is willing to render get the most rows the server will
550+
// render, not an error.
551+
func clampRowBudget(requested int) int {
552+
if requested > keyVizRowBudgetCap {
553+
return keyVizRowBudgetCap
554+
}
555+
return requested
539556
}
540557

541558
// unixMsToTime converts a Unix-millisecond timestamp into a time.Time,
@@ -549,21 +566,22 @@ func unixMsToTime(ms int64) time.Time {
549566
}
550567

551568
// matrixSeriesPicker returns a callback that extracts the requested
552-
// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED (and READS)
553-
// fall through to Reads so a default-valued request still returns
554-
// something useful.
569+
// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED falls through
570+
// to Writes per design doc §4.1 — write traffic is the primary
571+
// signal the heatmap is built around, and the read path is wired in
572+
// a follow-up phase.
555573
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
556574
switch series {
557-
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
558-
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
575+
case pb.KeyVizSeries_KEYVIZ_SERIES_READS:
576+
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
559577
case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
560578
return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes }
561579
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
562580
return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes }
563-
case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS:
564-
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
581+
case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
582+
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
565583
default:
566-
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
584+
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
567585
}
568586
}
569587

@@ -608,6 +626,14 @@ func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint6
608626
// applyKeyVizRowBudget caps rows to budget by total activity per row
609627
// (sum of per-column values), preserving the top-N rows. budget <= 0
610628
// means "no cap."
629+
//
630+
// NOTE: design doc §5.5 specifies a "lexicographic walk + greedy
631+
// merge of low-activity adjacent ranges" algorithm — we simplify to
632+
// activity-descending truncation for Phase 1 because it covers the
633+
// common UI need (highlight hotspots) without needing the synthetic
634+
// virtual-bucket plumbing the merge requires. Phase 2 should swap
635+
// this for the spec'd merge so low-activity ranges become coarse
636+
// aggregates instead of being silently dropped.
611637
func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow {
612638
if budget <= 0 || len(rows) <= budget {
613639
return rows

adapter/admin_grpc_keyviz_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func TestGetKeyVizMatrixSeriesSelection(t *testing.T) {
126126
series pb.KeyVizSeries
127127
want uint64
128128
}{
129-
{"unspecified defaults to reads", pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, 11},
129+
{"unspecified defaults to writes", pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, 22},
130130
{"reads", pb.KeyVizSeries_KEYVIZ_SERIES_READS, 11},
131131
{"writes", pb.KeyVizSeries_KEYVIZ_SERIES_WRITES, 22},
132132
{"read_bytes", pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES, 333},
@@ -214,6 +214,35 @@ func TestGetKeyVizMatrixSurfacesRouteCountTruncation(t *testing.T) {
214214
require.Equal(t, []uint64{2, 3}, r.RouteIds)
215215
}
216216

217+
// TestGetKeyVizMatrixClampsRowsBudgetToCap pins design §4.1's
218+
// upper-bound: rows requests above the keyVizRowBudgetCap are
219+
// silently clamped down to the cap so a pathological client cannot
220+
// force the server to materialise an unbounded payload.
221+
func TestGetKeyVizMatrixClampsRowsBudgetToCap(t *testing.T) {
222+
t.Parallel()
223+
rows := make([]keyviz.MatrixRow, keyVizRowBudgetCap+5)
224+
for i := range rows {
225+
idx := uint64(i + 1) //nolint:gosec // i is bounded by keyVizRowBudgetCap+5
226+
rows[i] = keyviz.MatrixRow{
227+
RouteID: idx,
228+
Start: []byte{byte(i / 256), byte(i % 256)},
229+
End: []byte{byte((i + 1) / 256), byte((i + 1) % 256)},
230+
Writes: idx,
231+
}
232+
}
233+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{{
234+
At: time.Unix(1_700_000_000, 0),
235+
Rows: rows,
236+
}})
237+
238+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
239+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_WRITES,
240+
Rows: uint32(keyVizRowBudgetCap + 1000),
241+
})
242+
require.NoError(t, err)
243+
require.Len(t, resp.Rows, keyVizRowBudgetCap, "rows must be clamped to keyVizRowBudgetCap")
244+
}
245+
217246
// TestGetKeyVizMatrixHonorsRowsBudget pins Codex round-1 P1 on
218247
// PR #646: a request with rows=N must return at most N rows. We
219248
// stage 4 routes with distinct activity totals and request rows=2;

keyviz/sampler.go

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,16 @@ type routeSlot struct {
234234
// routes together (Snapshot surfaces this in MatrixRow).
235235
Aggregate bool
236236
MemberRoutes []uint64
237-
// MemberRoutesTotal counts every distinct routeID that has folded
238-
// into this bucket, including ones beyond MaxMemberRoutesPerSlot
239-
// (which still contribute to the counters but are not appended to
240-
// MemberRoutes). Always equals len(MemberRoutes) for individual
241-
// (non-Aggregate) slots.
237+
// hiddenMembers stores routeIDs folded past
238+
// MaxMemberRoutesPerSlot. Used to dedup re-folds (so
239+
// MemberRoutesTotal doesn't drift on remove+re-register churn for
240+
// past-cap routes) and to drive accurate decrements in
241+
// pruneMemberRoute. nil for individual slots.
242+
hiddenMembers map[uint64]struct{}
243+
// MemberRoutesTotal is len(MemberRoutes) + len(hiddenMembers) — the
244+
// authoritative count of distinct routes folded into this bucket.
245+
// Always equals len(MemberRoutes) for individual (non-Aggregate)
246+
// slots.
242247
MemberRoutesTotal uint64
243248

244249
reads atomic.Uint64
@@ -463,16 +468,12 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
463468
// to preserve Flush's key-order contract.
464469
//
465470
// MemberRoutes growth is capped by MaxMemberRoutesPerSlot — beyond
466-
// that cap the bucket counters still absorb the route's traffic, but
467-
// the routeID is not added to the visible member list.
471+
// that cap the bucket counters still absorb the route's traffic and
472+
// the routeID is recorded in hiddenMembers (so dedup is correct on
473+
// rejoin) but not appended to the visible MemberRoutes list.
468474
func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
469475
bucket.metaMu.Lock()
470-
if !memberRoutesContains(bucket.MemberRoutes, routeID) {
471-
bucket.MemberRoutesTotal++
472-
if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
473-
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
474-
}
475-
}
476+
addMemberToBucket(bucket, routeID, s.opts.MaxMemberRoutesPerSlot)
476477
if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) {
477478
bucket.End = cloneBytes(end)
478479
}
@@ -486,6 +487,31 @@ func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID
486487
}
487488
}
488489

490+
// addMemberToBucket records routeID against bucket, choosing the
491+
// visible MemberRoutes list when there's room and falling back to
492+
// the hiddenMembers set past the cap. Both lists are deduped so a
493+
// rejoin during the prune grace doesn't inflate MemberRoutesTotal.
494+
// Caller holds bucket.metaMu.
495+
func addMemberToBucket(bucket *routeSlot, routeID uint64, visibleCap int) {
496+
if memberRoutesContains(bucket.MemberRoutes, routeID) {
497+
return
498+
}
499+
if bucket.hiddenMembers != nil {
500+
if _, ok := bucket.hiddenMembers[routeID]; ok {
501+
return
502+
}
503+
}
504+
if len(bucket.MemberRoutes) < visibleCap {
505+
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
506+
} else {
507+
if bucket.hiddenMembers == nil {
508+
bucket.hiddenMembers = make(map[uint64]struct{})
509+
}
510+
bucket.hiddenMembers[routeID] = struct{}{}
511+
}
512+
bucket.MemberRoutesTotal++
513+
}
514+
489515
// RemoveRoute drops a RouteID from tracking. Counts accumulated since
490516
// the last flush are NOT lost: the retired slot (or, for virtual-bucket
491517
// members, just the membership entry) is queued for one final drain by
@@ -727,12 +753,13 @@ func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeS
727753
return false
728754
}
729755

730-
// pruneMemberRoute removes routeID from bucket.MemberRoutes under the
731-
// bucket's metaMu so a concurrent snapshotMeta reader sees a
732-
// consistent view. MemberRoutesTotal is decremented when the routeID
733-
// was visible in MemberRoutes (the only case we can confidently
734-
// account for) — routes pruned past the visible cap stay in the
735-
// total because we don't track individual past-cap members.
756+
// pruneMemberRoute removes routeID from bucket.MemberRoutes (or
757+
// hiddenMembers) under the bucket's metaMu so a concurrent
758+
// snapshotMeta reader sees a consistent view. MemberRoutesTotal is
759+
// decremented whenever the routeID was actually present in either
760+
// list — including past-cap members in hiddenMembers — so the
761+
// reported route_count stays truthful across remove/re-register
762+
// churn.
736763
func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
737764
bucket.metaMu.Lock()
738765
defer bucket.metaMu.Unlock()
@@ -746,6 +773,15 @@ func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
746773
filtered = append(filtered, m)
747774
}
748775
bucket.MemberRoutes = filtered
776+
if !removed && bucket.hiddenMembers != nil {
777+
if _, ok := bucket.hiddenMembers[routeID]; ok {
778+
delete(bucket.hiddenMembers, routeID)
779+
if len(bucket.hiddenMembers) == 0 {
780+
bucket.hiddenMembers = nil
781+
}
782+
removed = true
783+
}
784+
}
749785
if removed && bucket.MemberRoutesTotal > 0 {
750786
bucket.MemberRoutesTotal--
751787
}

keyviz/sampler_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,94 @@ func TestMemberRoutesCappedAtConfiguredCap(t *testing.T) {
916916
if agg.Reads != 8 {
917917
t.Fatalf("bucket Reads = %d, want 8 (counters must absorb traffic past cap)", agg.Reads)
918918
}
919+
if agg.MemberRoutesTotal != 8 {
920+
t.Fatalf("MemberRoutesTotal = %d, want 8 (true count of contributors past cap)", agg.MemberRoutesTotal)
921+
}
922+
}
923+
924+
// TestPastCapMemberRejoinDoesNotInflateTotal pins Codex round-2 P2
925+
// on PR #646: re-registering a hidden (past-cap) routeID inside the
926+
// prune grace must not double-count MemberRoutesTotal. The hidden
927+
// member set serves as dedup so route_count stays truthful across
928+
// remove/re-register churn for routes the visible MemberRoutes list
929+
// never carried.
930+
func TestPastCapMemberRejoinDoesNotInflateTotal(t *testing.T) {
931+
t.Parallel()
932+
s, _ := newTestSampler(t, MemSamplerOptions{
933+
Step: time.Second,
934+
HistoryColumns: 4,
935+
MaxTrackedRoutes: 1,
936+
MaxMemberRoutesPerSlot: 1, // visible cap=1; routes 3+4 land in hidden set
937+
})
938+
mustRegister(t, s, 1, "a", "b")
939+
if s.RegisterRoute(2, []byte("c"), []byte("d")) {
940+
t.Fatal("route 2 should fold (visible)")
941+
}
942+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
943+
t.Fatal("route 3 should fold (hidden — past cap)")
944+
}
945+
if s.RegisterRoute(4, []byte("g"), []byte("h")) {
946+
t.Fatal("route 4 should fold (hidden — past cap)")
947+
}
948+
s.Observe(2, OpRead, 0, 0)
949+
s.Flush()
950+
beforeTotal := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
951+
if beforeTotal != 3 {
952+
t.Fatalf("baseline MemberRoutesTotal = %d, want 3", beforeTotal)
953+
}
954+
// Remove the past-cap route 3 and re-register it inside the prune
955+
// grace. Without the hidden-member dedup foldIntoBucket would
956+
// increment MemberRoutesTotal again, drifting route_count up.
957+
s.RemoveRoute(3)
958+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
959+
t.Fatal("route 3 should fold again")
960+
}
961+
s.Observe(2, OpRead, 0, 0)
962+
s.Flush()
963+
afterTotal := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
964+
if afterTotal != 3 {
965+
t.Fatalf("after remove+re-register, MemberRoutesTotal = %d, want 3 (no drift)", afterTotal)
966+
}
967+
}
968+
969+
// TestPastCapMemberPruneDecrementsTotal pins the other half of Codex
970+
// round-2 P2: when grace expires for a hidden (past-cap) member, the
971+
// prune must decrement MemberRoutesTotal — otherwise route_count
972+
// stays inflated even after the route is fully retired.
973+
func TestPastCapMemberPruneDecrementsTotal(t *testing.T) {
974+
t.Parallel()
975+
s, clk := newTestSampler(t, MemSamplerOptions{
976+
Step: time.Second,
977+
HistoryColumns: 8,
978+
MaxTrackedRoutes: 1,
979+
MaxMemberRoutesPerSlot: 1,
980+
})
981+
mustRegister(t, s, 1, "a", "b")
982+
if s.RegisterRoute(2, []byte("c"), []byte("d")) {
983+
t.Fatal("route 2 should fold (visible)")
984+
}
985+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
986+
t.Fatal("route 3 should fold (hidden — past cap)")
987+
}
988+
s.Observe(2, OpRead, 0, 0)
989+
s.Flush()
990+
require := func(want uint64, ctx string) {
991+
t.Helper()
992+
s.Observe(2, OpRead, 0, 0)
993+
s.Flush()
994+
got := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
995+
if got != want {
996+
t.Fatalf("%s: MemberRoutesTotal = %d, want %d", ctx, got, want)
997+
}
998+
}
999+
require(2, "baseline")
1000+
s.RemoveRoute(3) // hidden member — pendingPrune queued
1001+
clk.Advance(s.graceWindow() + time.Second)
1002+
// First flush past grace: drains the bucket (snapshot still
1003+
// shows Total=2) THEN fires the pending prune. The next flush is
1004+
// the first one whose drain reflects the post-prune state.
1005+
require(2, "drain runs before prune fires")
1006+
require(1, "after past-cap prune fires")
9191007
}
9201008

9211009
// TestRetiredTailClearedAfterDrop pins Codex round-7 P2: after a

0 commit comments

Comments
 (0)