Skip to content

Commit 237a090

Browse files
committed
keyviz+adapter: rows-budget + true route_count for capped buckets
Round-1 review fixes for PR #646:

Codex P1: GetKeyVizMatrix never read req.GetRows() and always returned every row, breaking the documented row-budget contract for deployments with many tracked routes. Add applyKeyVizRowBudget — when budget > 0, sort by per-row activity total (sum of values across all columns of the requested series) and truncate to the top-N before the final Start-order sort.

Codex P2: route_count was set to len(MemberRoutes), but MemberRoutes is intentionally capped at MaxMemberRoutesPerSlot in the sampler while extra routes still drive the bucket counters. Capped buckets under-reported their contributors. Add MemberRoutesTotal to the keyviz routeSlot/MatrixRow surface, increment it in foldIntoBucket / RegisterRoute / bucket creation regardless of the visible cap, and decrement in pruneMemberRoute. The adapter now surfaces it as route_count and flips route_ids_truncated when the visible list is shorter than the total so consumers can tell their drill-down list is partial.

Tests:
- TestGetKeyVizMatrixHonorsRowsBudget — 4 routes, rows=2 returns top-2 by activity, sorted by Start.
- TestGetKeyVizMatrixSurfacesRouteCountTruncation — visible cap=2, total=9 → route_count=9, route_ids_truncated=true.
- TestGetKeyVizMatrixEncodesAggregateBucket — updated for the new MemberRoutesTotal field on the test fixture.
1 parent 33d2181 commit 237a090

3 files changed

Lines changed: 180 additions & 40 deletions

File tree

adapter/admin_grpc.go

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ func (s *AdminServer) GetKeyVizMatrix(
535535
to := unixMsToTime(req.GetToUnixMs())
536536
cols := sampler.Snapshot(from, to)
537537
pickValue := matrixSeriesPicker(req.GetSeries())
538-
return matrixToProto(cols, pickValue), nil
538+
return matrixToProto(cols, pickValue, int(req.GetRows())), nil
539539
}
540540

541541
// unixMsToTime converts a Unix-millisecond timestamp into a time.Time,
@@ -572,7 +572,13 @@ func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
572572
// values slice aligned to the column_unix_ms parallel slice. Idle
573573
// routes (zero in every column) are not emitted by the sampler, so
574574
// the row set already reflects observed activity in [from, to).
575-
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64) *pb.GetKeyVizMatrixResponse {
575+
//
576+
// rowBudget caps how many rows the response carries — passing
577+
// 0 or a negative value means "no cap." When the budget would be exceeded, rows are
578+
// sorted by total activity across the requested series and the
579+
// top-N retained, so callers asking for a compact matrix do not
580+
// receive a payload that scales with the route count.
581+
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64, rowBudget int) *pb.GetKeyVizMatrixResponse {
576582
resp := &pb.GetKeyVizMatrixResponse{
577583
ColumnUnixMs: make([]int64, len(cols)),
578584
}
@@ -594,21 +600,56 @@ func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint6
594600
for i, id := range order {
595601
resp.Rows[i] = rowsByID[id]
596602
}
603+
resp.Rows = applyKeyVizRowBudget(resp.Rows, rowBudget)
597604
sortKeyVizRowsByStart(resp.Rows)
598605
return resp
599606
}
600607

608+
// applyKeyVizRowBudget caps rows to budget by total activity per row
609+
// (sum of per-column values), preserving the top-N rows. budget <= 0
610+
// means "no cap."
611+
func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow {
612+
if budget <= 0 || len(rows) <= budget {
613+
return rows
614+
}
615+
sort.Slice(rows, func(i, j int) bool {
616+
return rowActivityTotal(rows[i]) > rowActivityTotal(rows[j])
617+
})
618+
return rows[:budget]
619+
}
620+
621+
func rowActivityTotal(r *pb.KeyVizRow) uint64 {
622+
var sum uint64
623+
for _, v := range r.Values {
624+
sum += v
625+
}
626+
return sum
627+
}
628+
601629
// newKeyVizRowFrom seeds a proto row from the first MatrixRow seen
602630
// for a given RouteID. Values is allocated with len == numCols so
603631
// every column gets a deterministic slot (zero-valued by default).
632+
//
633+
// route_count surfaces MemberRoutesTotal (the true number of routes
634+
// folded into the bucket) — not just len(MemberRoutes), which the
635+
// sampler caps at MaxMemberRoutesPerSlot. When the visible list is
636+
// shorter than the total, route_ids_truncated lets consumers know
637+
// to trust route_count for drill-down decisions.
604638
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
639+
total := mr.MemberRoutesTotal
640+
if !mr.Aggregate && total == 0 {
641+
// Individual slots fall through to RouteCount=1 when the
642+
// sampler predates MemberRoutesTotal or never set it.
643+
total = 1
644+
}
605645
row := &pb.KeyVizRow{
606-
BucketId: bucketIDFor(mr),
607-
Start: append([]byte(nil), mr.Start...),
608-
End: append([]byte(nil), mr.End...),
609-
Aggregate: mr.Aggregate,
610-
RouteCount: uint64(len(mr.MemberRoutes)),
611-
Values: make([]uint64, numCols),
646+
BucketId: bucketIDFor(mr),
647+
Start: append([]byte(nil), mr.Start...),
648+
End: append([]byte(nil), mr.End...),
649+
Aggregate: mr.Aggregate,
650+
RouteCount: total,
651+
RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
652+
Values: make([]uint64, numCols),
612653
}
613654
if mr.Aggregate {
614655
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)

adapter/admin_grpc_keyviz_test.go

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,20 +143,24 @@ func TestGetKeyVizMatrixSeriesSelection(t *testing.T) {
143143

144144
// TestGetKeyVizMatrixEncodesAggregateBucket pins the proto layout
145145
// for virtual buckets: bucket_id prefixed "virtual:", aggregate=true,
146-
// route_ids carries the MemberRoutes list, and route_count matches.
146+
// route_ids carries the visible MemberRoutes list, and route_count
147+
// reports the TRUE total (MemberRoutesTotal) — including past-cap
148+
// folded routes — so consumers always know how many routes
149+
// contributed.
147150
func TestGetKeyVizMatrixEncodesAggregateBucket(t *testing.T) {
148151
t.Parallel()
149152
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
150153
{
151154
At: time.Unix(1_700_000_000, 0),
152155
Rows: []keyviz.MatrixRow{
153156
{
154-
RouteID: ^uint64(0), // synthetic virtual-bucket ID
155-
Start: []byte("c"),
156-
End: []byte("d"),
157-
Aggregate: true,
158-
MemberRoutes: []uint64{2, 3, 4},
159-
Reads: 50,
157+
RouteID: ^uint64(0), // synthetic virtual-bucket ID
158+
Start: []byte("c"),
159+
End: []byte("d"),
160+
Aggregate: true,
161+
MemberRoutes: []uint64{2, 3, 4},
162+
MemberRoutesTotal: 3,
163+
Reads: 50,
160164
},
161165
},
162166
},
@@ -171,5 +175,72 @@ func TestGetKeyVizMatrixEncodesAggregateBucket(t *testing.T) {
171175
require.True(t, r.Aggregate)
172176
require.Equal(t, "virtual:18446744073709551615", r.BucketId)
173177
require.Equal(t, uint64(3), r.RouteCount)
178+
require.False(t, r.RouteIdsTruncated)
174179
require.Equal(t, []uint64{2, 3, 4}, r.RouteIds)
175180
}
181+
182+
// TestGetKeyVizMatrixSurfacesRouteCountTruncation pins Codex round-1
183+
// P2 on PR #646: when the sampler caps MemberRoutes at
184+
// MaxMemberRoutesPerSlot, route_count must still report the TRUE
185+
// total (MemberRoutesTotal) and route_ids_truncated must flip true so
186+
// consumers know the visible list is a prefix.
187+
func TestGetKeyVizMatrixSurfacesRouteCountTruncation(t *testing.T) {
188+
t.Parallel()
189+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
190+
{
191+
At: time.Unix(1_700_000_000, 0),
192+
Rows: []keyviz.MatrixRow{
193+
{
194+
RouteID: ^uint64(0),
195+
Start: []byte("c"),
196+
End: []byte("d"),
197+
Aggregate: true,
198+
MemberRoutes: []uint64{2, 3}, // visible cap=2
199+
MemberRoutesTotal: 9, // 7 more folded past the cap
200+
Reads: 100,
201+
},
202+
},
203+
},
204+
})
205+
206+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
207+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS,
208+
})
209+
require.NoError(t, err)
210+
require.Len(t, resp.Rows, 1)
211+
r := resp.Rows[0]
212+
require.Equal(t, uint64(9), r.RouteCount, "route_count must reflect MemberRoutesTotal")
213+
require.True(t, r.RouteIdsTruncated, "route_ids_truncated must signal capped membership")
214+
require.Equal(t, []uint64{2, 3}, r.RouteIds)
215+
}
216+
217+
// TestGetKeyVizMatrixHonorsRowsBudget pins Codex round-1 P1 on
218+
// PR #646: a request with rows=N must return at most N rows. We
219+
// stage 4 routes with distinct activity totals and request rows=2;
220+
// the response must contain only the two highest-activity routes,
221+
// sorted by Start.
222+
func TestGetKeyVizMatrixHonorsRowsBudget(t *testing.T) {
223+
t.Parallel()
224+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
225+
{
226+
At: time.Unix(1_700_000_000, 0),
227+
Rows: []keyviz.MatrixRow{
228+
{RouteID: 1, Start: []byte("a"), End: []byte("b"), Reads: 1},
229+
{RouteID: 2, Start: []byte("b"), End: []byte("c"), Reads: 100},
230+
{RouteID: 3, Start: []byte("c"), End: []byte("d"), Reads: 5},
231+
{RouteID: 4, Start: []byte("d"), End: []byte("e"), Reads: 50},
232+
},
233+
},
234+
})
235+
236+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
237+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS,
238+
Rows: 2,
239+
})
240+
require.NoError(t, err)
241+
require.Len(t, resp.Rows, 2, "rows budget must cap response size")
242+
// Top 2 by activity = routes 2 (100) and 4 (50); sorted by Start
243+
// gives "b" then "d".
244+
require.Equal(t, "route:2", resp.Rows[0].BucketId)
245+
require.Equal(t, "route:4", resp.Rows[1].BucketId)
246+
}

keyviz/sampler.go

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@ type routeSlot struct {
226226
// routes together (Snapshot surfaces this in MatrixRow).
227227
Aggregate bool
228228
MemberRoutes []uint64
229+
// MemberRoutesTotal counts every distinct routeID that has folded
230+
// into this bucket, including ones beyond MaxMemberRoutesPerSlot
231+
// (which still contribute to the counters but are not appended to
232+
// MemberRoutes). RegisterRoute sets it to 1 for individual
233+
// (non-Aggregate) slots.
234+
MemberRoutesTotal uint64
229235

230236
reads atomic.Uint64
231237
writes atomic.Uint64
@@ -238,7 +244,7 @@ type routeSlot struct {
238244
// Start/End/MemberRoutes with the live slot (which a later
239245
// RegisterRoute may extend, and which the snapshot API exports to
240246
// external consumers that may mutate the bounds).
241-
func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64) {
247+
func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64, membersTotal uint64) {
242248
s.metaMu.RLock()
243249
defer s.metaMu.RUnlock()
244250
start = cloneBytes(s.Start)
@@ -247,6 +253,7 @@ func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members [
247253
if len(s.MemberRoutes) > 0 {
248254
members = append([]uint64(nil), s.MemberRoutes...)
249255
}
256+
membersTotal = s.MemberRoutesTotal
250257
return
251258
}
252259

@@ -263,6 +270,12 @@ type MatrixRow struct {
263270
Start, End []byte
264271
Aggregate bool
265272
MemberRoutes []uint64
273+
// MemberRoutesTotal is how many distinct route IDs contributed to
274+
// this row's counters, including ones that exceeded
275+
// MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes.
276+
// Snapshot consumers should treat MemberRoutes as the visible
277+
// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
278+
MemberRoutesTotal uint64
266279

267280
Reads uint64
268281
Writes uint64
@@ -379,9 +392,10 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
379392
slot := s.reclaimRetiredSlot(routeID)
380393
if slot == nil {
381394
slot = &routeSlot{
382-
RouteID: routeID,
383-
Start: cloneBytes(start),
384-
End: cloneBytes(end),
395+
RouteID: routeID,
396+
Start: cloneBytes(start),
397+
End: cloneBytes(end),
398+
MemberRoutesTotal: 1,
385399
}
386400
} else {
387401
// Re-registering the same routeID inside the grace window:
@@ -394,6 +408,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
394408
slot.metaMu.Lock()
395409
slot.Start = cloneBytes(start)
396410
slot.End = cloneBytes(end)
411+
slot.MemberRoutesTotal = 1
397412
slot.metaMu.Unlock()
398413
}
399414
next.slots[routeID] = slot
@@ -410,11 +425,12 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
410425
bucket := findVirtualBucket(next.sortedSlots, start)
411426
if bucket == nil {
412427
bucket = &routeSlot{
413-
RouteID: s.nextVirtualBucketID(),
414-
Start: cloneBytes(start),
415-
End: cloneBytes(end),
416-
Aggregate: true,
417-
MemberRoutes: []uint64{routeID},
428+
RouteID: s.nextVirtualBucketID(),
429+
Start: cloneBytes(start),
430+
End: cloneBytes(end),
431+
Aggregate: true,
432+
MemberRoutes: []uint64{routeID},
433+
MemberRoutesTotal: 1,
418434
}
419435
next.sortedSlots = appendSorted(next.sortedSlots, bucket)
420436
} else {
@@ -443,9 +459,11 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
443459
// the routeID is not added to the visible member list.
444460
func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
445461
bucket.metaMu.Lock()
446-
if !memberRoutesContains(bucket.MemberRoutes, routeID) &&
447-
len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
448-
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
462+
if !memberRoutesContains(bucket.MemberRoutes, routeID) {
463+
bucket.MemberRoutesTotal++
464+
if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
465+
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
466+
}
449467
}
450468
if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) {
451469
bucket.End = cloneBytes(end)
@@ -703,17 +721,26 @@ func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeS
703721

704722
// pruneMemberRoute removes routeID from bucket.MemberRoutes under the
705723
// bucket's metaMu so a concurrent snapshotMeta reader sees a
706-
// consistent view.
724+
// consistent view. MemberRoutesTotal is decremented when the routeID
725+
// was visible in MemberRoutes (the only case we can confidently
726+
// account for) — routes pruned past the visible cap stay in the
727+
// total because we don't track individual past-cap members.
707728
func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
708729
bucket.metaMu.Lock()
709730
defer bucket.metaMu.Unlock()
710731
filtered := bucket.MemberRoutes[:0]
732+
removed := false
711733
for _, m := range bucket.MemberRoutes {
712-
if m != routeID {
713-
filtered = append(filtered, m)
734+
if m == routeID {
735+
removed = true
736+
continue
714737
}
738+
filtered = append(filtered, m)
715739
}
716740
bucket.MemberRoutes = filtered
741+
if removed && bucket.MemberRoutesTotal > 0 {
742+
bucket.MemberRoutesTotal--
743+
}
717744
}
718745

719746
// Step returns the configured flush interval after applying default
@@ -739,17 +766,18 @@ func appendDrainedRow(rows []MatrixRow, slot *routeSlot) []MatrixRow {
739766
if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 {
740767
return rows
741768
}
742-
start, end, aggregate, members := slot.snapshotMeta()
769+
start, end, aggregate, members, membersTotal := slot.snapshotMeta()
743770
return append(rows, MatrixRow{
744-
RouteID: slot.RouteID,
745-
Start: start,
746-
End: end,
747-
Aggregate: aggregate,
748-
MemberRoutes: members,
749-
Reads: reads,
750-
Writes: writes,
751-
ReadBytes: readBytes,
752-
WriteBytes: writeBytes,
771+
RouteID: slot.RouteID,
772+
Start: start,
773+
End: end,
774+
Aggregate: aggregate,
775+
MemberRoutes: members,
776+
MemberRoutesTotal: membersTotal,
777+
Reads: reads,
778+
Writes: writes,
779+
ReadBytes: readBytes,
780+
WriteBytes: writeBytes,
753781
})
754782
}
755783

0 commit comments

Comments
 (0)