Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion keyviz/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ const (
// Sampler is the narrow interface the coordinator depends on. The
// nil-safe contract is documented per-method so a coordinator wired
// without a sampler compiles to a no-op call.
//
// Implementations MUST be nil-receiver-safe: a typed-nil
// implementation passed through this interface (e.g.
// `var s Sampler = (*MemSampler)(nil)`) must not panic when its
// methods are called. The coordinator stores the interface value as
// supplied and dispatches through it on the hot path; a guard at the
// call site only checks for an interface-nil, not a typed-nil.
type Sampler interface {
	// Observe records a single request against a route. op identifies
	// the counter family. keyLen and valueLen are summed into the
	// matching *Bytes counter; pass 0 for read-only ops where the
	// payload size is irrelevant. Implementations must no-op (not
	// panic) when invoked on a typed-nil receiver.
	Observe(routeID uint64, op Op, keyLen, valueLen int)
}

Expand Down
36 changes: 36 additions & 0 deletions kv/sharded_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/bootjp/elastickv/distribution"
"github.com/bootjp/elastickv/internal/monoclock"
"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/keyviz"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -135,6 +136,12 @@ type ShardedCoordinator struct {
// leaseObserver records lease-read hit/miss for every shard the
// coordinator owns. Nil-safe; see Coordinate.leaseObserver.
leaseObserver LeaseReadObserver
// sampler counts requests per RouteID for the key visualizer
// heatmap. Nil-safe at the call site; the implementation
// (keyviz.MemSampler) also tolerates a typed-nil receiver, so a
// disabled keyviz wires through to a no-op without branching on
// the hot path.
sampler keyviz.Sampler
}

// WithLeaseReadObserver wires a LeaseReadObserver onto a
Expand All @@ -148,6 +155,22 @@ func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *
return c
}

// WithSampler wires a keyviz.Sampler onto a ShardedCoordinator and
// returns the same coordinator so the call can be chained onto the
// constructor. The coordinator calls sampler.Observe at dispatch
// entry — once per resolved (RouteID, mutation key) pair — to feed
// the key visualizer heatmap (design doc §5.1). Applied after
// construction for the same reason as WithLeaseReadObserver:
// NewShardedCoordinator is already heavily overloaded.
//
// Passing a nil interface value is supported and disables sampling
// (the call site guards against it). Passing a typed-nil
// *keyviz.MemSampler also works because Observe is nil-safe by
// contract.
func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator {
	// Stored exactly as supplied: a typed-nil is NOT normalised to an
	// interface-nil here, so it still reaches Observe at dispatch time.
	c.sampler = s
	return c
}

// NewShardedCoordinator builds a coordinator for the provided shard groups.
// The defaultGroup is used for non-keyed leader checks.
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator {
Expand Down Expand Up @@ -952,6 +975,18 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e
return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids)
}

// observeMutation reports a single write against routeID to the wired
// sampler, using the mutation's key and value lengths as the byte
// counts. When no sampler is wired (interface-nil) it does nothing,
// which keeps the disabled-keyviz hot path allocation-free.
//
// Reads never reach this path. The observation is taken pre-commit,
// so a mutation whose Raft proposal later fails is still recorded —
// the heatmap reflects offered load, not just committed writes
// (intentional for traffic visualisation).
func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) {
	// Only an interface-nil is filtered here; a typed-nil sampler is
	// dispatched to and must no-op per the keyviz.Sampler contract.
	if s := c.sampler; s != nil {
		s.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value))
	}
}

func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) {
grouped := make(map[uint64][]*pb.Mutation)
for _, req := range reqs {
Expand All @@ -963,6 +998,7 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.
if !ok {
return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key)
}
c.observeMutation(route.RouteID, mut)
grouped[route.GroupID] = append(grouped[route.GroupID], mut)
}
gids := make([]uint64, 0, len(grouped))
Expand Down
153 changes: 153 additions & 0 deletions kv/sharded_coordinator_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package kv

import (
"context"
"sync"
"testing"

"github.com/bootjp/elastickv/distribution"
"github.com/bootjp/elastickv/keyviz"
"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
)

// recordingSampler is a keyviz.Sampler that records every Observe
// call so tests can assert dispatch wiring fires once per resolved
// (RouteID, mutation key) pair.
type recordingSampler struct {
	// mu guards calls; Observe and snapshot both take it.
	mu sync.Mutex
	// calls holds one entry per Observe invocation, in arrival order.
	calls []sampleCall
}

// sampleCall captures the arguments of a single Observe invocation so
// tests can compare them against the expected values with a plain
// struct equality check.
type sampleCall struct {
	routeID  uint64    // route the request was attributed to
	op       keyviz.Op // counter family passed to Observe
	keyLen   int       // key length passed to Observe
	valueLen int       // value length passed to Observe
}

// Observe implements keyviz.Sampler by appending the call's arguments
// to r.calls. The record is built before taking the lock so the
// critical section covers only the append.
func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) {
	entry := sampleCall{
		routeID:  routeID,
		op:       op,
		keyLen:   keyLen,
		valueLen: valueLen,
	}

	r.mu.Lock()
	r.calls = append(r.calls, entry)
	r.mu.Unlock()
}

// snapshot returns a copy of the calls recorded so far, taken under
// the lock, so the caller can inspect it without racing against
// further Observe calls. The result is always non-nil, even when no
// calls have been recorded.
func (r *recordingSampler) snapshot() []sampleCall {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Appending onto a pre-sized empty slice yields a detached copy
	// with the same length as r.calls.
	return append(make([]sampleCall, 0, len(r.calls)), r.calls...)
}

// TestShardedCoordinatorObservesEveryDispatchedMutation pins the
// keyviz wiring contract: every successfully-routed mutation in a
// non-txn dispatch produces exactly one Observe call carrying the
// resolved RouteID, OpWrite, and the mutation's key/value lengths.
func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) {
t.Parallel()
ctx := context.Background()

engine := distribution.NewEngine()
engine.UpdateRoute([]byte("a"), []byte("m"), 1)
engine.UpdateRoute([]byte("m"), nil, 2)

s1 := store.NewMVCCStore()
r1, stop1 := newSingleRaft(t, "kv-sampler-g1", NewKvFSMWithHLC(s1, NewHLC()))
t.Cleanup(stop1)
s2 := store.NewMVCCStore()
r2, stop2 := newSingleRaft(t, "kv-sampler-g2", NewKvFSMWithHLC(s2, NewHLC()))
t.Cleanup(stop2)

groups := map[uint64]*ShardGroup{
1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
2: {Engine: r2, Store: s2, Txn: NewLeaderProxyWithEngine(r2)},
}
shardStore := NewShardStore(engine, groups)

rec := &recordingSampler{}
coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore).WithSampler(rec)

// Cross-shard non-txn dispatch: "b" → group 1, "x" → group 2.
ops := &OperationGroup[OP]{
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("b"), Value: []byte("val-b")},
{Op: Put, Key: []byte("x"), Value: []byte("val-x-longer")},
},
}
_, err := coord.Dispatch(ctx, ops)
require.NoError(t, err)

calls := rec.snapshot()
require.Len(t, calls, 2, "expected one Observe per mutation")

// groupMutations iterates reqs in order, so call[i] matches
// elem[i]. Resolve via routeKey(elem.Key) so the test mirrors
// production's routing transform — important for keys that
// normalize through internal-prefix handling.
for i, elem := range ops.Elems {
route, ok := engine.GetRoute(routeKey(elem.Key))
require.True(t, ok)
require.Equal(t, sampleCall{
routeID: route.RouteID,
op: keyviz.OpWrite,
keyLen: len(elem.Key),
valueLen: len(elem.Value),
}, calls[i], "Observe call %d for key %q", i, elem.Key)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe
// contract: a coordinator without WithSampler (interface-nil
// c.sampler) and one wired with a typed-nil *MemSampler must both
// dispatch successfully without observing anything. The "no
// WithSampler" subcase additionally asserts c.sampler stays the
// zero interface value so a future refactor that silently
// initialises the field would fail this guard.
func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	for _, tc := range []struct {
		name string
		// opt applies (or deliberately skips) the sampler wiring.
		opt func(*ShardedCoordinator) *ShardedCoordinator
		// wantNilField is true when c.sampler must remain the zero
		// interface value after opt runs.
		wantNilField bool
	}{
		{
			name:         "no WithSampler call",
			opt:          func(c *ShardedCoordinator) *ShardedCoordinator { return c },
			wantNilField: true,
		},
		{
			name: "typed-nil *MemSampler",
			opt: func(c *ShardedCoordinator) *ShardedCoordinator {
				return c.WithSampler((*keyviz.MemSampler)(nil))
			},
			wantNilField: false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			engine := distribution.NewEngine()
			engine.UpdateRoute([]byte("a"), nil, 1)

			s1 := store.NewMVCCStore()
			r1, stop1 := newSingleRaft(t, "kv-sampler-nilsafe-"+tc.name, NewKvFSMWithHLC(s1, NewHLC()))
			t.Cleanup(stop1)
			groups := map[uint64]*ShardGroup{
				1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)},
			}
			coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups)))

			if tc.wantNilField {
				// Compare the interface value directly. require.Nil
				// inspects pointers via reflection and would also pass
				// for an interface holding a typed-nil pointer, which
				// is exactly the silent initialisation this guard is
				// meant to catch.
				require.True(t, coord.sampler == nil, "expected sampler field to be unset when WithSampler is never called")
			}

			ops := &OperationGroup[OP]{
				Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}},
			}
			_, err := coord.Dispatch(ctx, ops)
			require.NoError(t, err)
		})
	}
}
Loading