Skip to content

Commit 25061d6

Browse files
committed
keyviz: synthetic virtual bucket IDs + scoped prune cancel
Codex round-11 P2 #1: cancelPendingPrune ran unconditionally at the top of RegisterRoute. When a removed virtual member rejoined as an individual slot (capacity freed up), the cancel suppressed the only cleanup path for the old bucket's MemberRoutes — leaving the routeID listed on the bucket forever despite no longer contributing traffic. Replace with cancelPendingPruneFor(bucket, routeID), called only after we know the route is rejoining the same bucket; prunes against other buckets (or where the route lands as an individual slot) fire normally and clean up the old bucket's metadata.

Codex round-11 P2 #2: a freshly created virtual bucket stamped its RouteID with the first folded real route ID. If that real route was later removed and re-registered as an individual slot, flushed columns could show two rows with the same RouteID — one aggregate, one individual. Add an atomic.Uint64 virtualIDCounter on MemSampler and hand out synthetic IDs from the high end of uint64 (MaxUint64, MaxUint64-1, …). The synthetic space cannot collide with real route IDs (assigned from the low end by the coordinator), so row identity stays unambiguous under churn.

Claude bot nit: rename the local `bytes` variable in Observe to `byteCount` so it no longer shadows the new `bytes` stdlib import.

Tests: TestVirtualBucketRouteIDIsSynthetic + TestRejoinAsIndividualLetsBucketPruneFire.
1 parent 41ab078 commit 25061d6

2 files changed

Lines changed: 121 additions & 22 deletions

File tree

keyviz/sampler.go

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ type MemSampler struct {
135135
retiredMu sync.Mutex
136136
retiredSlots []retiredSlot
137137
pendingPrunes []memberPrune
138+
139+
// virtualIDCounter hands out synthetic RouteIDs for new virtual
140+
// buckets, starting at MaxUint64 and decrementing. The synthetic
141+
// space cannot collide with real route IDs (which the coordinator
142+
// assigns from the low end), so a real RouteID can never appear
143+
// twice in the same column — once as an aggregate row, once as
144+
// an individual row — even under register/remove churn.
145+
virtualIDCounter atomic.Uint64
138146
}
139147

140148
// retiredSlot tracks a removed slot through its post-removal grace
@@ -317,23 +325,23 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
317325
return
318326
}
319327
}
320-
bytes := uint64(0)
328+
byteCount := uint64(0)
321329
if keyLen > 0 {
322-
bytes += uint64(keyLen)
330+
byteCount += uint64(keyLen)
323331
}
324332
if valueLen > 0 {
325-
bytes += uint64(valueLen)
333+
byteCount += uint64(valueLen)
326334
}
327335
switch op {
328336
case OpRead:
329337
slot.reads.Add(1)
330-
if bytes > 0 {
331-
slot.readBytes.Add(bytes)
338+
if byteCount > 0 {
339+
slot.readBytes.Add(byteCount)
332340
}
333341
case OpWrite:
334342
slot.writes.Add(1)
335-
if bytes > 0 {
336-
slot.writeBytes.Add(bytes)
343+
if byteCount > 0 {
344+
slot.writeBytes.Add(byteCount)
337345
}
338346
}
339347
}
@@ -345,20 +353,19 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
345353
// RouteID is a no-op (the original slot stays in place).
346354
//
347355
// If a previous RemoveRoute(routeID) queued a deferred member-prune
348-
// for this same RouteID and the grace window has not elapsed yet,
349-
// that prune is cancelled here — otherwise re-registering during
350-
// route churn would still see the routeID disappear from
351-
// bucket.MemberRoutes once grace expires, despite Observe attributing
352-
// fresh traffic to that ID.
356+
// and the route is now re-registered into the SAME bucket inside the
357+
// grace window, that prune is cancelled — otherwise the routeID
358+
// would disappear from bucket.MemberRoutes despite Observe still
359+
// attributing fresh traffic to it. Prunes for different buckets (or
360+
// when the route rejoins as an individual slot) are left alone so
361+
// the old bucket's MemberRoutes is correctly cleaned up.
353362
func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
354363
if s == nil {
355364
return false
356365
}
357366
s.routesMu.Lock()
358367
defer s.routesMu.Unlock()
359368

360-
s.cancelPendingPrune(routeID)
361-
362369
cur := s.table.Load()
363370
if _, ok := cur.slots[routeID]; ok {
364371
return true
@@ -388,7 +395,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
388395
bucket := findVirtualBucket(next.sortedSlots, start)
389396
if bucket == nil {
390397
bucket = &routeSlot{
391-
RouteID: routeID,
398+
RouteID: s.nextVirtualBucketID(),
392399
Start: cloneBytes(start),
393400
End: cloneBytes(end),
394401
Aggregate: true,
@@ -398,6 +405,11 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
398405
} else {
399406
s.foldIntoBucket(next, bucket, routeID, start, end)
400407
}
408+
// The route is rejoining `bucket`; cancel any deferred prune for
409+
// this same routeID against this same bucket so it stays in
410+
// MemberRoutes. Prunes against other buckets are intentionally
411+
// left in place.
412+
s.cancelPendingPruneFor(bucket, routeID)
401413
next.virtualForRoute[routeID] = bucket
402414
s.table.Store(next)
403415
return false
@@ -567,17 +579,23 @@ func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Durat
567579
return keep
568580
}
569581

570-
// cancelPendingPrune drops any pendingPrune entries matching routeID,
571-
// stopping a deferred prune from executing after the route has been
572-
// re-registered inside the grace window. Holds retiredMu briefly off
573-
// the hot path; callers must already hold routesMu so the
574-
// cancellation pairs atomically with the route-table mutation.
575-
func (s *MemSampler) cancelPendingPrune(routeID uint64) {
582+
// cancelPendingPruneFor drops any pendingPrune entry whose routeID
583+
// AND bucket pointer both match — i.e. the same routeID is rejoining
584+
// the same bucket it was just removed from. This stops a deferred
585+
// prune from removing the routeID from MemberRoutes despite Observe
586+
// still attributing traffic to it. Prunes against other buckets (or
587+
// where the route rejoins as an individual slot) are left in place
588+
// so the old bucket's MemberRoutes is correctly cleaned up.
589+
//
590+
// Holds retiredMu briefly off the hot path; callers must already
591+
// hold routesMu so the cancellation pairs atomically with the
592+
// route-table mutation.
593+
func (s *MemSampler) cancelPendingPruneFor(bucket *routeSlot, routeID uint64) {
576594
s.retiredMu.Lock()
577595
defer s.retiredMu.Unlock()
578596
keep := s.pendingPrunes[:0]
579597
for _, p := range s.pendingPrunes {
580-
if p.routeID == routeID {
598+
if p.routeID == routeID && p.bucket == bucket {
581599
continue
582600
}
583601
keep = append(keep, p)
@@ -586,6 +604,18 @@ func (s *MemSampler) cancelPendingPrune(routeID uint64) {
586604
s.pendingPrunes = keep
587605
}
588606

607+
// nextVirtualBucketID returns a synthetic RouteID for a brand-new
608+
// virtual bucket. Synthetic IDs come from the high end of uint64 and
609+
// decrement, so they cannot collide with real route IDs (which are
610+
// assigned from the low end by the coordinator). Without this,
611+
// stamping the bucket with the first folded real RouteID would mean
612+
// that ID could later show up on TWO rows in the same column — one
613+
// aggregate, one individual — if the original route is later
614+
// re-registered as an individual slot.
615+
func (s *MemSampler) nextVirtualBucketID() uint64 {
616+
// Adding ^uint64(0) (i.e. MaxUint64) is a decrement modulo 2^64.
// Assuming the counter is left at its zero value (no explicit Store
// is visible in this diff — confirm in the constructor), the first
// call wraps 0 to MaxUint64, the next returns MaxUint64-1, and so on.
return s.virtualIDCounter.Add(^uint64(0))
617+
}
618+
589619
// memberRoutesContains reports whether routeID is already listed in
590620
// members. Used as a dedup guard so re-registering a routeID inside
591621
// the prune grace window doesn't add a duplicate MemberRoutes entry.

keyviz/sampler_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,75 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) {
686686
}
687687
}
688688

689+
// TestVirtualBucketRouteIDIsSynthetic pins Codex round-11 P2 #2: a
690+
// virtual bucket must not stamp itself with a real route ID, or that
691+
// real ID could later show up on TWO rows in the same column —
692+
// once aggregate, once individual — when the original folded route
693+
// is later re-registered as an individual slot.
694+
func TestVirtualBucketRouteIDIsSynthetic(t *testing.T) {
695+
t.Parallel()
696+
// MaxTrackedRoutes is 1: route 1 is registered first, and the test
// then expects route 2 to be folded into a virtual bucket instead.
s, _ := newTestSampler(t, MemSamplerOptions{
697+
Step: time.Second,
698+
HistoryColumns: 4,
699+
MaxTrackedRoutes: 1,
700+
})
701+
mustRegister(t, s, 1, "a", "b")
702+
// RegisterRoute returning true here would mean route 2 was NOT folded.
if s.RegisterRoute(2, []byte("c"), []byte("d")) {
703+
t.Fatal("route 2 should fold into a fresh virtual bucket")
704+
}
705+
// Record one read against the folded route, then flush; presumably
// Flush emits the column inspected below — confirm against Flush.
s.Observe(2, OpRead, 0, 0)
706+
s.Flush()
707+
708+
rows := lastSnapshotColumn(t, s).Rows
709+
agg := findAggregateRow(t, rows)
710+
// The aggregate row must not carry the folded route's real ID.
if agg.RouteID == 2 {
711+
t.Fatalf("aggregate bucket reused real RouteID 2 — synthetic ID space required: %v", agg)
712+
}
713+
// No individual (non-aggregate) row may share the aggregate's RouteID.
for _, r := range rows {
714+
if !r.Aggregate && r.RouteID == agg.RouteID {
715+
t.Fatalf("real RouteID %d collided with aggregate row %v", r.RouteID, agg)
716+
}
717+
}
718+
}
719+
720+
// TestRejoinAsIndividualLetsBucketPruneFire pins Codex round-11 P2 #1:
721+
// when a removed virtual member rejoins as an individual slot (capacity
722+
// freed up), the deferred member-prune for the old bucket must still
723+
// execute — otherwise the bucket's MemberRoutes keeps a route that no
724+
// longer contributes traffic to it.
725+
func TestRejoinAsIndividualLetsBucketPruneFire(t *testing.T) {
726+
t.Parallel()
727+
// MaxTrackedRoutes is 2: routes 1 and 2 are registered first, and the
// test then expects route 3 to be folded (RegisterRoute returns false).
s, clk := newTestSampler(t, MemSamplerOptions{
728+
Step: time.Second,
729+
HistoryColumns: 4,
730+
MaxTrackedRoutes: 2,
731+
})
732+
mustRegister(t, s, 1, "a", "b")
733+
mustRegister(t, s, 2, "m", "n")
734+
if s.RegisterRoute(3, []byte("y"), []byte("z")) {
735+
t.Fatal("route 3 over budget should fold")
736+
}
737+
// Free capacity: remove route 2 (now slots have room) AND remove 3
738+
// (queued in pendingPrunes). Re-register 3 — it now fits as an
739+
// individual slot, so the old bucket should still get pruned.
740+
s.RemoveRoute(3)
741+
s.RemoveRoute(2)
742+
// A true return is what this test treats as "registered as an
// individual slot".
if !s.RegisterRoute(3, []byte("y"), []byte("z")) {
743+
t.Fatal("route 3 should now fit as individual slot")
744+
}
745+
746+
// Step the test clock past the grace window so the deferred
// member-prune is due, then observe and flush to produce a column.
clk.Advance(s.graceWindow() + time.Second)
747+
s.Observe(3, OpRead, 0, 0)
748+
s.Flush()
749+
750+
rows := lastSnapshotColumn(t, s).Rows
751+
// No aggregate row may still list route 3 among its members.
for _, r := range rows {
752+
if r.Aggregate && memberRoutesContain(r.MemberRoutes, 3) {
753+
t.Fatalf("after rejoin-as-individual + grace, bucket still lists route 3: %+v", r)
754+
}
755+
}
756+
}
757+
689758
// TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2:
690759
// a virtual-member route that gets removed and re-registered inside
691760
// the prune grace window must not have its routeID stripped from

0 commit comments

Comments
 (0)