Skip to content

Commit 5b3169f

Browse files
authored
feat(kv): wire keyviz.Sampler into ShardedCoordinator dispatch path (#645)
## Summary - Adds a `WithSampler(s keyviz.Sampler) *ShardedCoordinator` option (mirroring `WithLeaseReadObserver`) plus a single `observeMutation` call inside `groupMutations`. - Each resolved `(RouteID, mutation)` pair produces one `sampler.Observe(routeID, OpWrite, len(Key), len(Value))` call before the mutation is grouped by `GroupID`. Reads do not reach this path; transactional dispatch reuses `groupMutations` so it gets wired automatically. - `DelPrefix` is intentionally not observed — `dispatchDelPrefixBroadcast` broadcasts to every shard rather than resolving a single `RouteID`, so per-route attribution is out of scope for this slice. - Nil-safety: an interface-nil `c.sampler` is guarded at the call site; the `keyviz.MemSampler` contract also tolerates a typed-nil receiver, so a disabled sampler costs one branch off the hot path. Implements task §5.1 from `docs/admin_ui_key_visualizer_design.md` (split out of the keyviz design originally landed in #639). ## Test plan - [x] `TestShardedCoordinatorObservesEveryDispatchedMutation` — cross-shard Put batch, verifies one Observe per element with engine-resolved RouteID, OpWrite, and exact keyLen / valueLen. - [x] `TestShardedCoordinatorWithoutSamplerStaysSafe` — dispatches succeed with no `WithSampler` call (interface-nil) and with a typed-nil `*MemSampler`. - [x] `go test -race -count=1 ./kv/... ./keyviz/...` clean. - [x] `golangci-lint run ./kv/...` clean. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Optional sampling for mutation dispatches: when enabled, each routed mutation records routing, operation kind, and key/value size metrics; no effect when disabled. * **Tests** * Added tests covering sampler integration, cross-shard dispatch observations, and safe behavior when no sampler or a typed-nil sampler is configured. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 29011bb + 0c89321 commit 5b3169f

3 files changed

Lines changed: 198 additions & 1 deletion

File tree

keyviz/sampler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,19 @@ const (
5858
// Sampler is the narrow interface the coordinator depends on. The
5959
// nil-safe contract is documented per-method so a coordinator wired
6060
// without a sampler compiles to a no-op call.
61+
//
62+
// Implementations MUST be nil-receiver-safe: a typed-nil
63+
// implementation passed through this interface (e.g.
64+
// `var s Sampler = (*MemSampler)(nil)`) must not panic when its
65+
// methods are called. The coordinator stores the interface value as
66+
// supplied and dispatches through it on the hot path; a guard at the
67+
// call site only checks for an interface-nil, not a typed-nil.
6168
type Sampler interface {
6269
// Observe records a single request against a route. Op identifies
6370
// the counter family. keyLen and valueLen are summed into the
6471
// matching *Bytes counter; pass 0 for read-only ops where the
65-
// payload size is irrelevant.
72+
// payload size is irrelevant. Implementations must no-op (not
73+
// panic) when invoked on a typed-nil receiver.
6674
Observe(routeID uint64, op Op, keyLen, valueLen int)
6775
}
6876

kv/sharded_coordinator.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/bootjp/elastickv/distribution"
1414
"github.com/bootjp/elastickv/internal/monoclock"
1515
"github.com/bootjp/elastickv/internal/raftengine"
16+
"github.com/bootjp/elastickv/keyviz"
1617
pb "github.com/bootjp/elastickv/proto"
1718
"github.com/bootjp/elastickv/store"
1819
"github.com/cockroachdb/errors"
@@ -135,6 +136,12 @@ type ShardedCoordinator struct {
135136
// leaseObserver records lease-read hit/miss for every shard the
136137
// coordinator owns. Nil-safe; see Coordinate.leaseObserver.
137138
leaseObserver LeaseReadObserver
139+
// sampler counts requests per RouteID for the key visualizer
140+
// heatmap. Nil-safe at the call site; the implementation
141+
// (keyviz.MemSampler) also tolerates a typed-nil receiver, so a
142+
// disabled keyviz wires through to a no-op without branching on
143+
// the hot path.
144+
sampler keyviz.Sampler
138145
}
139146

140147
// WithLeaseReadObserver wires a LeaseReadObserver onto a
@@ -148,6 +155,22 @@ func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *
148155
return c
149156
}
150157

158+
// WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The
159+
// coordinator calls sampler.Observe at dispatch entry — once per
160+
// resolved (RouteID, mutation key) pair — to feed the key visualizer
161+
// heatmap (design doc §5.1). Applied after construction for the same
162+
// reason as WithLeaseReadObserver: NewShardedCoordinator is already
163+
// heavily overloaded.
164+
//
165+
// Passing a nil interface value is supported and disables sampling
166+
// (the call site guards against it). Passing a typed-nil
167+
// *keyviz.MemSampler also works because Observe is nil-safe by
168+
// contract.
169+
func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator {
170+
c.sampler = s
171+
return c
172+
}
173+
151174
// NewShardedCoordinator builds a coordinator for the provided shard groups.
152175
// The defaultGroup is used for non-keyed leader checks.
153176
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator {
@@ -952,6 +975,18 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e
952975
return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids)
953976
}
954977

978+
// observeMutation: reads never reach this path; the early return
979+
// keeps the disabled-keyviz hot path allocation-free. Counted
980+
// pre-commit, so a mutation that subsequently fails its Raft
981+
// proposal is still recorded — the heatmap reflects offered load,
982+
// not just committed writes (intentional for traffic visualisation).
983+
func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) {
984+
if c.sampler == nil {
985+
return
986+
}
987+
c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value))
988+
}
989+
955990
func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) {
956991
grouped := make(map[uint64][]*pb.Mutation)
957992
for _, req := range reqs {
@@ -963,6 +998,7 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.
963998
if !ok {
964999
return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key)
9651000
}
1001+
c.observeMutation(route.RouteID, mut)
9661002
grouped[route.GroupID] = append(grouped[route.GroupID], mut)
9671003
}
9681004
gids := make([]uint64, 0, len(grouped))
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package kv
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
8+
"github.com/bootjp/elastickv/distribution"
9+
"github.com/bootjp/elastickv/keyviz"
10+
"github.com/bootjp/elastickv/store"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// recordingSampler is a keyviz.Sampler that records every Observe
15+
// call so tests can assert dispatch wiring fires once per resolved
16+
// (RouteID, mutation key) pair.
17+
type recordingSampler struct {
18+
mu sync.Mutex
19+
calls []sampleCall
20+
}
21+
22+
type sampleCall struct {
23+
routeID uint64
24+
op keyviz.Op
25+
keyLen int
26+
valueLen int
27+
}
28+
29+
func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) {
30+
r.mu.Lock()
31+
defer r.mu.Unlock()
32+
r.calls = append(r.calls, sampleCall{routeID: routeID, op: op, keyLen: keyLen, valueLen: valueLen})
33+
}
34+
35+
func (r *recordingSampler) snapshot() []sampleCall {
36+
r.mu.Lock()
37+
defer r.mu.Unlock()
38+
out := make([]sampleCall, len(r.calls))
39+
copy(out, r.calls)
40+
return out
41+
}
42+
43+
// TestShardedCoordinatorObservesEveryDispatchedMutation pins the
44+
// keyviz wiring contract: every successfully-routed mutation in a
45+
// non-txn dispatch produces exactly one Observe call carrying the
46+
// resolved RouteID, OpWrite, and the mutation's key/value lengths.
47+
func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) {
48+
t.Parallel()
49+
ctx := context.Background()
50+
51+
engine := distribution.NewEngine()
52+
engine.UpdateRoute([]byte("a"), []byte("m"), 1)
53+
engine.UpdateRoute([]byte("m"), nil, 2)
54+
55+
s1 := store.NewMVCCStore()
56+
r1, stop1 := newSingleRaft(t, "kv-sampler-g1", NewKvFSMWithHLC(s1, NewHLC()))
57+
t.Cleanup(stop1)
58+
s2 := store.NewMVCCStore()
59+
r2, stop2 := newSingleRaft(t, "kv-sampler-g2", NewKvFSMWithHLC(s2, NewHLC()))
60+
t.Cleanup(stop2)
61+
62+
groups := map[uint64]*ShardGroup{
63+
1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
64+
2: {Engine: r2, Store: s2, Txn: NewLeaderProxyWithEngine(r2)},
65+
}
66+
shardStore := NewShardStore(engine, groups)
67+
68+
rec := &recordingSampler{}
69+
coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore).WithSampler(rec)
70+
71+
// Cross-shard non-txn dispatch: "b" → group 1, "x" → group 2.
72+
ops := &OperationGroup[OP]{
73+
Elems: []*Elem[OP]{
74+
{Op: Put, Key: []byte("b"), Value: []byte("val-b")},
75+
{Op: Put, Key: []byte("x"), Value: []byte("val-x-longer")},
76+
},
77+
}
78+
_, err := coord.Dispatch(ctx, ops)
79+
require.NoError(t, err)
80+
81+
calls := rec.snapshot()
82+
require.Len(t, calls, 2, "expected one Observe per mutation")
83+
84+
// groupMutations iterates reqs in order, so call[i] matches
85+
// elem[i]. Resolve via routeKey(elem.Key) so the test mirrors
86+
// production's routing transform — important for keys that
87+
// normalize through internal-prefix handling.
88+
for i, elem := range ops.Elems {
89+
route, ok := engine.GetRoute(routeKey(elem.Key))
90+
require.True(t, ok)
91+
require.Equal(t, sampleCall{
92+
routeID: route.RouteID,
93+
op: keyviz.OpWrite,
94+
keyLen: len(elem.Key),
95+
valueLen: len(elem.Value),
96+
}, calls[i], "Observe call %d for key %q", i, elem.Key)
97+
}
98+
}
99+
100+
// TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe
101+
// contract: a coordinator without WithSampler (interface-nil
102+
// c.sampler) and one wired with a typed-nil *MemSampler must both
103+
// dispatch successfully without observing anything. The "no
104+
// WithSampler" subcase additionally asserts c.sampler stays the
105+
// zero interface value so a future refactor that silently
106+
// initialises the field would fail this guard.
107+
func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) {
108+
t.Parallel()
109+
ctx := context.Background()
110+
111+
for _, tc := range []struct {
112+
name string
113+
opt func(*ShardedCoordinator) *ShardedCoordinator
114+
wantNilField bool
115+
}{
116+
{
117+
name: "no WithSampler call",
118+
opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c },
119+
wantNilField: true,
120+
},
121+
{
122+
name: "typed-nil *MemSampler",
123+
opt: func(c *ShardedCoordinator) *ShardedCoordinator {
124+
return c.WithSampler((*keyviz.MemSampler)(nil))
125+
},
126+
wantNilField: false,
127+
},
128+
} {
129+
t.Run(tc.name, func(t *testing.T) {
130+
t.Parallel()
131+
engine := distribution.NewEngine()
132+
engine.UpdateRoute([]byte("a"), nil, 1)
133+
134+
s1 := store.NewMVCCStore()
135+
r1, stop1 := newSingleRaft(t, "kv-sampler-nilsafe-"+tc.name, NewKvFSMWithHLC(s1, NewHLC()))
136+
t.Cleanup(stop1)
137+
groups := map[uint64]*ShardGroup{
138+
1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
139+
}
140+
coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)))
141+
142+
if tc.wantNilField {
143+
require.Nil(t, coord.sampler, "expected sampler field to be unset when WithSampler is never called")
144+
}
145+
146+
ops := &OperationGroup[OP]{
147+
Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}},
148+
}
149+
_, err := coord.Dispatch(ctx, ops)
150+
require.NoError(t, err)
151+
})
152+
}
153+
}

0 commit comments

Comments
 (0)