Skip to content

Commit b9d24f6

Browse files
authored
feat(main): wire keyviz sampler end-to-end (flags, RunFlusher, coordinator + admin) (#647)
## Summary End-to-end keyviz wiring on each node binary. Stacked on top of #645 (coordinator `WithSampler`) and #646 (admin server `RegisterSampler`) — those two PRs are merged into this branch via merge commits, so this PR's incremental diff is `main.go` + `main_keyviz_test.go`. - Three new flags: `--keyvizEnabled` (off by default — opt-in), `--keyvizStep`, `--keyvizMaxTrackedRoutes`, `--keyvizMaxMemberRoutesPerSlot`. Defaults pull from `keyviz.Default*` so flag help and code stay in sync. - `buildKeyVizSampler()` returns `*keyviz.MemSampler` when enabled, `nil` otherwise — the single decision point for "keyviz on/off." - `seedKeyVizRoutes()` copies the engine's startup route catalogue into the sampler. Route-watch propagation post-startup is a follow-up (Phase 3 in the design doc). - `startKeyVizFlusher()` runs `RunFlusher` in the existing errgroup and calls one final `Flush()` after ctx fires so the in-progress step is harvested at graceful shutdown. - Coordinator: chained `.WithSampler(...)` onto the existing `.WithLeaseReadObserver(...)` call so the dispatch hot path observes mutations. - AdminServer: `setupAdminService` now takes the `*MemSampler` and only calls `RegisterSampler` when it's non-nil — operators with keyviz disabled get `codes.Unavailable` on `GetKeyVizMatrix` instead of a spurious empty success. Implements `docs/admin_ui_key_visualizer_design.md` §5 / §10. After this lands the heatmap is fully usable end-to-end. ## Test plan - [x] `TestBuildKeyVizSamplerHonorsEnabledFlag` — flag on/off contract. - [x] `TestSeedKeyVizRoutesCopiesEngineCatalogue` — `engine.Stats()` seed path. - [x] `TestSeedKeyVizRoutesNoOpOnNilSampler` — disabled-sampler safety. - [x] `TestStartKeyVizFlusherReturnsAfterCancel` — graceful shutdown drains the pre-cancel counters via the final `Flush`. - [x] `go build .`, `go vet .`, `golangci-lint run ./...` clean. - [x] `go test -race -count=1 -run 'TestBuildKeyVizSampler|TestSeedKeyVizRoutes|TestStartKeyVizFlusher' .` passes. ## Merge order 1. Merge #645 (coordinator wiring). 2. Merge #646 (admin server wiring). 3. Rebase this branch on `main` (the merge commits in this PR collapse to no-ops) and merge.
2 parents 5b3169f + 7e14751 commit b9d24f6

5 files changed

Lines changed: 704 additions & 27 deletions

File tree

adapter/admin_grpc.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package adapter
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/subtle"
67
"sort"
8+
"strconv"
79
"strings"
810
"sync"
911
"time"
1012

1113
"github.com/bootjp/elastickv/internal/raftengine"
14+
"github.com/bootjp/elastickv/keyviz"
1215
pb "github.com/bootjp/elastickv/proto"
1316
"github.com/cockroachdb/errors"
1417
"google.golang.org/grpc"
@@ -17,6 +20,19 @@ import (
1720
"google.golang.org/grpc/status"
1821
)
1922

23+
// KeyVizSampler is the read-side abstraction the Admin service needs
24+
// from the keyviz package: a time-bounded matrix snapshot. Defined
25+
// here (not in keyviz) so tests can pass an in-memory fake without
26+
// constructing a full *keyviz.MemSampler. *keyviz.MemSampler
27+
// satisfies this interface.
28+
type KeyVizSampler interface {
29+
// Snapshot returns the matrix columns in [from, to). Either
30+
// bound may be the zero time meaning unbounded on that side.
31+
// Implementations must return rows the caller can mutate freely
32+
// (a deep copy) — see keyviz.MemSampler.Snapshot.
33+
Snapshot(from, to time.Time) []keyviz.MatrixColumn
34+
}
35+
2036
// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
2137
// subset of raftengine.Engine so tests can supply an in-memory fake without
2238
// standing up a real Raft cluster. Configuration is polled on each
@@ -56,6 +72,12 @@ type AdminServer struct {
5672
// instance cannot contend with concurrent RPCs on another instance.
5773
now func() time.Time
5874

75+
// sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
76+
// Nil means keyviz is disabled — the RPC returns Unavailable.
77+
// Guarded by groupsMu (same lock as groups/now) so RegisterSampler
78+
// pairs atomically with concurrent RPC reads.
79+
sampler KeyVizSampler
80+
5981
pb.UnimplementedAdminServer
6082
}
6183

@@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
97119
s.groupsMu.Unlock()
98120
}
99121

122+
// RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix.
123+
// Without this call (or with a nil sampler) the RPC returns
124+
// codes.Unavailable so callers can distinguish "keyviz disabled"
125+
// from "no data yet".
126+
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) {
127+
s.groupsMu.Lock()
128+
s.sampler = sampler
129+
s.groupsMu.Unlock()
130+
}
131+
100132
// GetClusterOverview returns the local node identity, the current member
101133
// list, and per-group leader identity collected from the engines registered
102134
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
@@ -477,3 +509,166 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
477509
// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
478510
// failed to supply a token and also did not opt into insecure mode.
479511
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")
512+
513+
// GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to)
514+
// range supplied by the request, returning one KeyVizRow per tracked
515+
// route or virtual bucket and a parallel column-timestamp slice.
516+
//
517+
// Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from
518+
// the request's KeyVizSeries enum to the matching keyviz.MatrixRow
519+
// counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
520+
//
521+
// Returns codes.Unavailable when no sampler is registered (keyviz
522+
// disabled) so callers can distinguish that from "no data yet"
523+
// (which yields a successful empty response).
524+
func (s *AdminServer) GetKeyVizMatrix(
525+
_ context.Context,
526+
req *pb.GetKeyVizMatrixRequest,
527+
) (*pb.GetKeyVizMatrixResponse, error) {
528+
s.groupsMu.RLock()
529+
sampler := s.sampler
530+
s.groupsMu.RUnlock()
531+
if sampler == nil {
532+
return nil, errors.WithStack(status.Error(codes.Unavailable, "keyviz sampler not configured on this node"))
533+
}
534+
from := unixMsToTime(req.GetFromUnixMs())
535+
to := unixMsToTime(req.GetToUnixMs())
536+
cols := sampler.Snapshot(from, to)
537+
pickValue := matrixSeriesPicker(req.GetSeries())
538+
return matrixToProto(cols, pickValue, int(req.GetRows())), nil
539+
}
540+
541+
// unixMsToTime converts a Unix-millisecond timestamp into a time.Time,
542+
// returning the zero Time when the input is zero so the sampler reads
543+
// an unbounded range on that side.
544+
func unixMsToTime(ms int64) time.Time {
545+
if ms == 0 {
546+
return time.Time{}
547+
}
548+
return time.UnixMilli(ms)
549+
}
550+
551+
// matrixSeriesPicker returns a callback that extracts the requested
552+
// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED (and READS)
553+
// fall through to Reads so a default-valued request still returns
554+
// something useful.
555+
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
556+
switch series {
557+
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
558+
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
559+
case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
560+
return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes }
561+
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
562+
return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes }
563+
case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS:
564+
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
565+
default:
566+
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
567+
}
568+
}
569+
570+
// matrixToProto pivots the column-major MatrixColumn slice into the
571+
// row-major proto layout: one KeyVizRow per distinct RouteID with a
572+
// values slice aligned to the column_unix_ms parallel slice. Idle
573+
// routes (zero in every column) are not emitted by the sampler, so
574+
// the row set already reflects observed activity in [from, to).
575+
//
576+
// rowBudget caps how many rows the response carries — passing
577+
// 0 means "no cap." When the budget would be exceeded, rows are
578+
// sorted by total activity across the requested series and the
579+
// top-N retained, so callers asking for a compact matrix do not
580+
// receive a payload that scales with the route count.
581+
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64, rowBudget int) *pb.GetKeyVizMatrixResponse {
582+
resp := &pb.GetKeyVizMatrixResponse{
583+
ColumnUnixMs: make([]int64, len(cols)),
584+
}
585+
rowsByID := make(map[uint64]*pb.KeyVizRow)
586+
order := make([]uint64, 0)
587+
for j, col := range cols {
588+
resp.ColumnUnixMs[j] = col.At.UnixMilli()
589+
for _, mr := range col.Rows {
590+
pr, ok := rowsByID[mr.RouteID]
591+
if !ok {
592+
pr = newKeyVizRowFrom(mr, len(cols))
593+
rowsByID[mr.RouteID] = pr
594+
order = append(order, mr.RouteID)
595+
}
596+
pr.Values[j] = pick(mr)
597+
}
598+
}
599+
resp.Rows = make([]*pb.KeyVizRow, len(order))
600+
for i, id := range order {
601+
resp.Rows[i] = rowsByID[id]
602+
}
603+
resp.Rows = applyKeyVizRowBudget(resp.Rows, rowBudget)
604+
sortKeyVizRowsByStart(resp.Rows)
605+
return resp
606+
}
607+
608+
// applyKeyVizRowBudget caps rows to budget by total activity per row
609+
// (sum of per-column values), preserving the top-N rows. budget <= 0
610+
// means "no cap."
611+
func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow {
612+
if budget <= 0 || len(rows) <= budget {
613+
return rows
614+
}
615+
sort.Slice(rows, func(i, j int) bool {
616+
return rowActivityTotal(rows[i]) > rowActivityTotal(rows[j])
617+
})
618+
return rows[:budget]
619+
}
620+
621+
func rowActivityTotal(r *pb.KeyVizRow) uint64 {
622+
var sum uint64
623+
for _, v := range r.Values {
624+
sum += v
625+
}
626+
return sum
627+
}
628+
629+
// newKeyVizRowFrom seeds a proto row from the first MatrixRow seen
630+
// for a given RouteID. Values is allocated with len == numCols so
631+
// every column gets a deterministic slot (zero-valued by default).
632+
//
633+
// route_count surfaces MemberRoutesTotal (the true number of routes
634+
// folded into the bucket) — not just len(MemberRoutes), which the
635+
// sampler caps at MaxMemberRoutesPerSlot. When the visible list is
636+
// shorter than the total, route_ids_truncated lets consumers know
637+
// to trust route_count for drill-down decisions.
638+
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
639+
total := mr.MemberRoutesTotal
640+
if !mr.Aggregate && total == 0 {
641+
// Individual slots fall through to RouteCount=1 when the
642+
// sampler predates MemberRoutesTotal or never set it.
643+
total = 1
644+
}
645+
row := &pb.KeyVizRow{
646+
BucketId: bucketIDFor(mr),
647+
Start: append([]byte(nil), mr.Start...),
648+
End: append([]byte(nil), mr.End...),
649+
Aggregate: mr.Aggregate,
650+
RouteCount: total,
651+
RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
652+
Values: make([]uint64, numCols),
653+
}
654+
if mr.Aggregate {
655+
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)
656+
}
657+
return row
658+
}
659+
660+
func bucketIDFor(mr keyviz.MatrixRow) string {
661+
if mr.Aggregate {
662+
return "virtual:" + strconv.FormatUint(mr.RouteID, 10)
663+
}
664+
return "route:" + strconv.FormatUint(mr.RouteID, 10)
665+
}
666+
667+
func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) {
668+
sort.Slice(rows, func(i, j int) bool {
669+
if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 {
670+
return c < 0
671+
}
672+
return rows[i].BucketId < rows[j].BucketId
673+
})
674+
}

0 commit comments

Comments
 (0)