diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 0d816c08f..d74404d09 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -58,11 +58,19 @@ const ( // Sampler is the narrow interface the coordinator depends on. The // nil-safe contract is documented per-method so a coordinator wired // without a sampler compiles to a no-op call. +// +// Implementations MUST be nil-receiver-safe: a typed-nil +// implementation passed through this interface (e.g. +// `var s Sampler = (*MemSampler)(nil)`) must not panic when its +// methods are called. The coordinator stores the interface value as +// supplied and dispatches through it on the hot path; a guard at the +// call site only checks for an interface-nil, not a typed-nil. type Sampler interface { // Observe records a single request against a route. Op identifies // the counter family. keyLen and valueLen are summed into the // matching *Bytes counter; pass 0 for read-only ops where the - // payload size is irrelevant. + // payload size is irrelevant. Implementations must no-op (not + // panic) when invoked on a typed-nil receiver. Observe(routeID uint64, op Op, keyLen, valueLen int) } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index de8c02fd8..6a728e264 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -13,6 +13,7 @@ import ( "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/keyviz" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" @@ -135,6 +136,12 @@ type ShardedCoordinator struct { // leaseObserver records lease-read hit/miss for every shard the // coordinator owns. Nil-safe; see Coordinate.leaseObserver. leaseObserver LeaseReadObserver + // sampler counts requests per RouteID for the key visualizer + // heatmap. Nil-safe at the call site; the implementation + // (keyviz.MemSampler) also tolerates a typed-nil receiver, so a + // disabled keyviz wires through to a no-op without branching on + // the hot path. + sampler keyviz.Sampler } // WithLeaseReadObserver wires a LeaseReadObserver onto a @@ -148,6 +155,22 @@ func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) * return c } +// WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The +// coordinator calls sampler.Observe at dispatch entry — once per +// resolved (RouteID, mutation key) pair — to feed the key visualizer +// heatmap (design doc §5.1). Applied after construction for the same +// reason as WithLeaseReadObserver: NewShardedCoordinator is already +// heavily overloaded. +// +// Passing a nil interface value is supported and disables sampling +// (the call site guards against it). Passing a typed-nil +// *keyviz.MemSampler also works because Observe is nil-safe by +// contract. +func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator { + c.sampler = s + return c +} + // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { @@ -952,6 +975,18 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids) } +// observeMutation: reads never reach this path; the early return +// keeps the disabled-keyviz hot path allocation-free. Counted +// pre-commit, so a mutation that subsequently fails its Raft +// proposal is still recorded — the heatmap reflects offered load, +// not just committed writes (intentional for traffic visualisation). +func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { + if c.sampler == nil { + return + } + c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value)) +} + func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) { grouped := make(map[uint64][]*pb.Mutation) for _, req := range reqs { @@ -963,6 +998,7 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb. if !ok { return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key) } + c.observeMutation(route.RouteID, mut) grouped[route.GroupID] = append(grouped[route.GroupID], mut) } gids := make([]uint64, 0, len(grouped)) diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go new file mode 100644 index 000000000..97e9e1068 --- /dev/null +++ b/kv/sharded_coordinator_sampler_test.go @@ -0,0 +1,153 @@ +package kv + +import ( + "context" + "sync" + "testing" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/keyviz" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// recordingSampler is a keyviz.Sampler that records every Observe +// call so tests can assert dispatch wiring fires once per resolved +// (RouteID, mutation key) pair. +type recordingSampler struct { + mu sync.Mutex + calls []sampleCall +} + +type sampleCall struct { + routeID uint64 + op keyviz.Op + keyLen int + valueLen int +} + +func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, sampleCall{routeID: routeID, op: op, keyLen: keyLen, valueLen: valueLen}) +} + +func (r *recordingSampler) snapshot() []sampleCall { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]sampleCall, len(r.calls)) + copy(out, r.calls) + return out +} + +// TestShardedCoordinatorObservesEveryDispatchedMutation pins the +// keyviz wiring contract: every successfully-routed mutation in a +// non-txn dispatch produces exactly one Observe call carrying the +// resolved RouteID, OpWrite, and the mutation's key/value lengths. +func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) { + t.Parallel() + ctx := context.Background() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-g1", NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + s2 := store.NewMVCCStore() + r2, stop2 := newSingleRaft(t, "kv-sampler-g2", NewKvFSMWithHLC(s2, NewHLC())) + t.Cleanup(stop2) + + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + 2: {Engine: r2, Store: s2, Txn: NewLeaderProxyWithEngine(r2)}, + } + shardStore := NewShardStore(engine, groups) + + rec := &recordingSampler{} + coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore).WithSampler(rec) + + // Cross-shard non-txn dispatch: "b" → group 1, "x" → group 2. + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("b"), Value: []byte("val-b")}, + {Op: Put, Key: []byte("x"), Value: []byte("val-x-longer")}, + }, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + + calls := rec.snapshot() + require.Len(t, calls, 2, "expected one Observe per mutation") + + // groupMutations iterates reqs in order, so call[i] matches + // elem[i]. Resolve via routeKey(elem.Key) so the test mirrors + // production's routing transform — important for keys that + // normalize through internal-prefix handling. + for i, elem := range ops.Elems { + route, ok := engine.GetRoute(routeKey(elem.Key)) + require.True(t, ok) + require.Equal(t, sampleCall{ + routeID: route.RouteID, + op: keyviz.OpWrite, + keyLen: len(elem.Key), + valueLen: len(elem.Value), + }, calls[i], "Observe call %d for key %q", i, elem.Key) + } +} + +// TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe +// contract: a coordinator without WithSampler (interface-nil +// c.sampler) and one wired with a typed-nil *MemSampler must both +// dispatch successfully without observing anything. The "no +// WithSampler" subcase additionally asserts c.sampler stays the +// zero interface value so a future refactor that silently +// initialises the field would fail this guard. +func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { + t.Parallel() + ctx := context.Background() + + for _, tc := range []struct { + name string + opt func(*ShardedCoordinator) *ShardedCoordinator + wantNilField bool + }{ + { + name: "no WithSampler call", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c }, + wantNilField: true, + }, + { + name: "typed-nil *MemSampler", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { + return c.WithSampler((*keyviz.MemSampler)(nil)) + }, + wantNilField: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-nilsafe-"+tc.name, NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + } + coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups))) + + if tc.wantNilField { + require.Nil(t, coord.sampler, "expected sampler field to be unset when WithSampler is never called") + } + + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}}, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + }) + } +}