Skip to content

Commit 30f9049

Browse files
authored
feat(keyviz): in-memory Sampler + ring buffer + RunFlusher (#639)
## Summary First slice of Phase 2 of the Key Visualizer per [docs/admin_ui_key_visualizer_design.md §5](https://github.com/bootjp/elastickv/blob/main/docs/admin_ui_key_visualizer_design.md). This PR only adds the \`keyviz/\` package; wiring into \`kv.ShardedCoordinator\`, \`adapter.AdminServer.GetKeyVizMatrix\`, and \`main.go\` will follow in subsequent PRs to keep diffs reviewable. ### What's in - \`Sampler\` interface — nil-safe (nil receiver makes every method a no-op). - \`MemSampler\` hot path: \`atomic.Pointer[routeTable].Load\` + map lookup + 2× \`atomic.AddUint64\` (count + bytes). Matches design §10's per-call budget. - \`Flush\` drains via \`atomic.SwapUint64\` — no pointer retirement, no late-writer race past the snapshot. - COW route table publish (\`RegisterRoute\` / \`RemoveRoute\`) under \`routesMu\` off the hot path; one \`atomic.Pointer.Store\` on publish. - Route-budget cap (\`MaxTrackedRoutes\`, default 10000): excess routes fold into a virtual aggregate slot (\`MatrixRow.Aggregate=true\` + \`MemberRoutes\`), per design §5.3. - \`ringBuffer\` with chronological [from, to) Range queries; oldest entries drop on overflow. - \`RunFlusher\` convenience for the step-interval ticker. ### What's out (later PRs) - Coordinator call site (\`sampler.Observe(routeID, op, …)\`). - \`AdminServer.GetKeyVizMatrix\` RPC implementation. - \`main.go\` flag wiring + sampler construction. - Adaptive sub-sampling (§5.2) — MVP samples every op. - Persistence (§5.6) — Phase 3. ## Test plan - [x] Unit tests under \`-race\`: nil-safe contract, basic Observe→Flush, no-counts-lost across flush boundary under writer churn, route-budget coarsening, snapshot range filtering, ring eviction, RemoveRoute, unknown-route Observe, concurrent register+observe. - [x] \`go vet ./...\` and \`golangci-lint\` clean. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * In-memory sampling to track reads/writes with size metrics. * Periodic flushing of accumulated samples at a configurable interval. * Route-based tracking with automatic aggregation when route capacity is exceeded. * Time-range snapshot retrieval with retention and eviction of old samples. * Graceful flusher lifecycle that starts/stops with the application without losing data. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 7d9a607 + 927b194 commit 30f9049

5 files changed

Lines changed: 2031 additions & 0 deletions

File tree

keyviz/flusher.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package keyviz
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// RunFlusher drives Sampler.Flush at the supplied interval until ctx
9+
// is cancelled. Returns when ctx fires; the final tick is not
10+
// executed (a graceful shutdown should call Sampler.Flush once more
11+
// after RunFlusher returns if it wants to harvest the in-progress
12+
// step).
13+
//
14+
// step <= 0 falls back to DefaultStep.
15+
//
16+
// This is a tiny wrapper so call sites in main.go don't need to spell
17+
// out the ticker boilerplate; testing the boilerplate is the unit test
18+
// for this package, not for callers.
19+
func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration) {
20+
if s == nil {
21+
<-ctx.Done()
22+
return
23+
}
24+
if step <= 0 {
25+
step = DefaultStep
26+
}
27+
t := time.NewTicker(step)
28+
defer t.Stop()
29+
for {
30+
select {
31+
case <-ctx.Done():
32+
return
33+
case <-t.C:
34+
s.Flush()
35+
}
36+
}
37+
}

keyviz/flusher_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package keyviz
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestRunFlusherTicksUntilCancel(t *testing.T) {
10+
t.Parallel()
11+
s := NewMemSampler(MemSamplerOptions{Step: 5 * time.Millisecond, HistoryColumns: 16})
12+
if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
13+
t.Fatal("Register failed")
14+
}
15+
ctx, cancel := context.WithCancel(context.Background())
16+
done := make(chan struct{})
17+
go func() {
18+
defer close(done)
19+
RunFlusher(ctx, s, 5*time.Millisecond)
20+
}()
21+
22+
// Drive Observe across ticker firings.
23+
for i := 0; i < 10; i++ {
24+
s.Observe(1, OpRead, 0, 0)
25+
time.Sleep(2 * time.Millisecond)
26+
}
27+
cancel()
28+
select {
29+
case <-done:
30+
case <-time.After(time.Second):
31+
t.Fatal("RunFlusher did not return after cancel")
32+
}
33+
34+
cols := s.Snapshot(time.Time{}, time.Time{})
35+
if len(cols) == 0 {
36+
t.Fatal("expected at least one column from background flushes")
37+
}
38+
}
39+
40+
// TestRunFlusherNilSamplerWaitsCtx asserts the nil-sampler contract
41+
// (RunFlusher just blocks on ctx.Done so callers can hard-wire it
42+
// regardless of whether keyviz is enabled).
43+
func TestRunFlusherNilSamplerWaitsCtx(t *testing.T) {
44+
t.Parallel()
45+
ctx, cancel := context.WithCancel(context.Background())
46+
done := make(chan struct{})
47+
go func() {
48+
defer close(done)
49+
RunFlusher(ctx, nil, time.Millisecond)
50+
}()
51+
cancel()
52+
select {
53+
case <-done:
54+
case <-time.After(time.Second):
55+
t.Fatal("RunFlusher(nil) did not return on cancel")
56+
}
57+
}

keyviz/ring_buffer.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package keyviz
2+
3+
import (
4+
"sort"
5+
"time"
6+
)
7+
8+
// ringBuffer is a fixed-capacity circular buffer of MatrixColumn,
9+
// oldest-first. Push drops the oldest entry once cap is reached.
10+
// Range returns a half-open [from, to) slice in chronological order.
11+
//
12+
// The buffer is not goroutine-safe by itself; callers (MemSampler)
13+
// guard it with historyMu.
14+
type ringBuffer struct {
15+
cap int
16+
buf []MatrixColumn // logical-index ordered; head is the oldest
17+
wrapped bool
18+
pos int // next write index when wrapped == false
19+
}
20+
21+
func newRingBuffer(cap int) *ringBuffer {
22+
if cap < 1 {
23+
cap = 1
24+
}
25+
return &ringBuffer{cap: cap, buf: make([]MatrixColumn, 0, cap)}
26+
}
27+
28+
// Push appends a column. Once length reaches cap the oldest entry is
29+
// dropped (overwritten in place).
30+
func (r *ringBuffer) Push(col MatrixColumn) {
31+
if !r.wrapped {
32+
r.buf = append(r.buf, col)
33+
if len(r.buf) >= r.cap {
34+
r.wrapped = true
35+
r.pos = 0
36+
}
37+
return
38+
}
39+
r.buf[r.pos] = col
40+
r.pos = (r.pos + 1) % r.cap
41+
}
42+
43+
// Range returns the columns whose At falls in [from, to), oldest
44+
// first. Either bound may be the zero Time, meaning unbounded on that
45+
// side. The returned slice is a deep copy: each column has its own
46+
// Rows slice and each row owns its Start/End byte slices, so callers
47+
// may mutate any field without corrupting stored history or racing
48+
// with concurrent flushes.
49+
func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn {
50+
all := r.snapshotOrdered()
51+
// snapshotOrdered is already chronologically ordered.
52+
lo := 0
53+
if !from.IsZero() {
54+
lo = sort.Search(len(all), func(i int) bool {
55+
return !all[i].At.Before(from)
56+
})
57+
}
58+
hi := len(all)
59+
if !to.IsZero() {
60+
hi = sort.Search(len(all), func(i int) bool {
61+
return !all[i].At.Before(to)
62+
})
63+
}
64+
if lo > hi {
65+
return nil
66+
}
67+
out := make([]MatrixColumn, hi-lo)
68+
for i, src := range all[lo:hi] {
69+
out[i] = cloneColumn(src)
70+
}
71+
return out
72+
}
73+
74+
// cloneColumn returns a deep copy of col: a fresh Rows slice with
75+
// each row's Start/End and MemberRoutes independently allocated.
76+
func cloneColumn(col MatrixColumn) MatrixColumn {
77+
rows := make([]MatrixRow, len(col.Rows))
78+
for i, row := range col.Rows {
79+
rows[i] = row
80+
rows[i].Start = cloneBytes(row.Start)
81+
rows[i].End = cloneBytes(row.End)
82+
if len(row.MemberRoutes) > 0 {
83+
rows[i].MemberRoutes = append([]uint64(nil), row.MemberRoutes...)
84+
}
85+
}
86+
return MatrixColumn{At: col.At, Rows: rows}
87+
}
88+
89+
// snapshotOrdered returns a chronologically ordered (oldest first)
90+
// view of the buffer contents in a fresh slice.
91+
func (r *ringBuffer) snapshotOrdered() []MatrixColumn {
92+
if !r.wrapped {
93+
return append([]MatrixColumn(nil), r.buf...)
94+
}
95+
out := make([]MatrixColumn, 0, r.cap)
96+
out = append(out, r.buf[r.pos:]...)
97+
out = append(out, r.buf[:r.pos]...)
98+
return out
99+
}

0 commit comments

Comments
 (0)