Skip to content

Commit 33d2181

Browse files
committed
feat(adapter): expose keyviz heatmap via AdminServer.GetKeyVizMatrix
Implements the `GetKeyVizMatrix` admin gRPC RPC declared in `proto/admin.proto`, completing the read-side of the key visualizer flow (design doc §5.2 / §6). - New `KeyVizSampler` interface in adapter (Snapshot only) so tests can pass a deterministic in-memory fake while production wires `*keyviz.MemSampler` directly. - `AdminServer.RegisterSampler` mirrors `RegisterGroup`. Without it, GetKeyVizMatrix returns codes.Unavailable so callers can distinguish "keyviz disabled on this node" from "no data yet" (which is a successful empty response). - Pivots the column-major MatrixColumn slice into the row-major proto layout: one KeyVizRow per RouteID with values aligned to a parallel column_unix_ms slice. KeyVizSeries selection picks the matching per-row counter; UNSPECIFIED defaults to Reads. - bucket_id encodes "route:<id>" for individual slots and "virtual:<syntheticID>" for aggregate buckets. Aggregate rows carry MemberRoutes verbatim through route_ids and route_count. Tests: - TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered - TestGetKeyVizMatrixPivotsColumnsToRows — two-column / two-route fixture, verifies the missing-row-becomes-zero contract. - TestGetKeyVizMatrixSeriesSelection — table-driven across all five enum values including UNSPECIFIED defaulting to Reads. - TestGetKeyVizMatrixEncodesAggregateBucket — virtual bucket layout.
1 parent 29011bb commit 33d2181

2 files changed

Lines changed: 329 additions & 0 deletions

File tree

adapter/admin_grpc.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package adapter
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/subtle"
67
"sort"
8+
"strconv"
79
"strings"
810
"sync"
911
"time"
1012

1113
"github.com/bootjp/elastickv/internal/raftengine"
14+
"github.com/bootjp/elastickv/keyviz"
1215
pb "github.com/bootjp/elastickv/proto"
1316
"github.com/cockroachdb/errors"
1417
"google.golang.org/grpc"
@@ -17,6 +20,19 @@ import (
1720
"google.golang.org/grpc/status"
1821
)
1922

23+
// KeyVizSampler is the read-side abstraction the Admin service needs
24+
// from the keyviz package: a time-bounded matrix snapshot. Defined
25+
// here (not in keyviz) so tests can pass an in-memory fake without
26+
// constructing a full *keyviz.MemSampler. *keyviz.MemSampler
27+
// satisfies this interface.
28+
type KeyVizSampler interface {
29+
// Snapshot returns the matrix columns in [from, to). Either
30+
// bound may be the zero time meaning unbounded on that side.
31+
// Implementations must return rows the caller can mutate freely
32+
// (a deep copy) — see keyviz.MemSampler.Snapshot.
33+
Snapshot(from, to time.Time) []keyviz.MatrixColumn
34+
}
35+
2036
// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
2137
// subset of raftengine.Engine so tests can supply an in-memory fake without
2238
// standing up a real Raft cluster. Configuration is polled on each
@@ -56,6 +72,12 @@ type AdminServer struct {
5672
// instance cannot contend with concurrent RPCs on another instance.
5773
now func() time.Time
5874

75+
// sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
76+
// Nil means keyviz is disabled — the RPC returns Unavailable.
77+
// Guarded by groupsMu (same lock as groups/now) so RegisterSampler
78+
// pairs atomically with concurrent RPC reads.
79+
sampler KeyVizSampler
80+
5981
pb.UnimplementedAdminServer
6082
}
6183

@@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
97119
s.groupsMu.Unlock()
98120
}
99121

122+
// RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix.
123+
// Without this call (or with a nil sampler) the RPC returns
124+
// codes.Unavailable so callers can distinguish "keyviz disabled"
125+
// from "no data yet".
126+
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) {
127+
s.groupsMu.Lock()
128+
s.sampler = sampler
129+
s.groupsMu.Unlock()
130+
}
131+
100132
// GetClusterOverview returns the local node identity, the current member
101133
// list, and per-group leader identity collected from the engines registered
102134
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
@@ -477,3 +509,125 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
477509
// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
478510
// failed to supply a token and also did not opt into insecure mode.
479511
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")
512+
513+
// GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to)
514+
// range supplied by the request, returning one KeyVizRow per tracked
515+
// route or virtual bucket and a parallel column-timestamp slice.
516+
//
517+
// Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from
518+
// the request's KeyVizSeries enum to the matching keyviz.MatrixRow
519+
// counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
520+
//
521+
// Returns codes.Unavailable when no sampler is registered (keyviz
522+
// disabled) so callers can distinguish that from "no data yet"
523+
// (which yields a successful empty response).
524+
func (s *AdminServer) GetKeyVizMatrix(
525+
_ context.Context,
526+
req *pb.GetKeyVizMatrixRequest,
527+
) (*pb.GetKeyVizMatrixResponse, error) {
528+
s.groupsMu.RLock()
529+
sampler := s.sampler
530+
s.groupsMu.RUnlock()
531+
if sampler == nil {
532+
return nil, errors.WithStack(status.Error(codes.Unavailable, "keyviz sampler not configured on this node"))
533+
}
534+
from := unixMsToTime(req.GetFromUnixMs())
535+
to := unixMsToTime(req.GetToUnixMs())
536+
cols := sampler.Snapshot(from, to)
537+
pickValue := matrixSeriesPicker(req.GetSeries())
538+
return matrixToProto(cols, pickValue), nil
539+
}
540+
541+
// unixMsToTime converts a Unix-millisecond timestamp into a time.Time,
542+
// returning the zero Time when the input is zero so the sampler reads
543+
// an unbounded range on that side.
544+
func unixMsToTime(ms int64) time.Time {
545+
if ms == 0 {
546+
return time.Time{}
547+
}
548+
return time.UnixMilli(ms)
549+
}
550+
551+
// matrixSeriesPicker returns a callback that extracts the requested
552+
// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED (and READS)
553+
// fall through to Reads so a default-valued request still returns
554+
// something useful.
555+
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
556+
switch series {
557+
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
558+
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
559+
case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
560+
return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes }
561+
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
562+
return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes }
563+
case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS:
564+
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
565+
default:
566+
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
567+
}
568+
}
569+
570+
// matrixToProto pivots the column-major MatrixColumn slice into the
571+
// row-major proto layout: one KeyVizRow per distinct RouteID with a
572+
// values slice aligned to the column_unix_ms parallel slice. Idle
573+
// routes (zero in every column) are not emitted by the sampler, so
574+
// the row set already reflects observed activity in [from, to).
575+
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64) *pb.GetKeyVizMatrixResponse {
576+
resp := &pb.GetKeyVizMatrixResponse{
577+
ColumnUnixMs: make([]int64, len(cols)),
578+
}
579+
rowsByID := make(map[uint64]*pb.KeyVizRow)
580+
order := make([]uint64, 0)
581+
for j, col := range cols {
582+
resp.ColumnUnixMs[j] = col.At.UnixMilli()
583+
for _, mr := range col.Rows {
584+
pr, ok := rowsByID[mr.RouteID]
585+
if !ok {
586+
pr = newKeyVizRowFrom(mr, len(cols))
587+
rowsByID[mr.RouteID] = pr
588+
order = append(order, mr.RouteID)
589+
}
590+
pr.Values[j] = pick(mr)
591+
}
592+
}
593+
resp.Rows = make([]*pb.KeyVizRow, len(order))
594+
for i, id := range order {
595+
resp.Rows[i] = rowsByID[id]
596+
}
597+
sortKeyVizRowsByStart(resp.Rows)
598+
return resp
599+
}
600+
601+
// newKeyVizRowFrom seeds a proto row from the first MatrixRow seen
602+
// for a given RouteID. Values is allocated with len == numCols so
603+
// every column gets a deterministic slot (zero-valued by default).
604+
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
605+
row := &pb.KeyVizRow{
606+
BucketId: bucketIDFor(mr),
607+
Start: append([]byte(nil), mr.Start...),
608+
End: append([]byte(nil), mr.End...),
609+
Aggregate: mr.Aggregate,
610+
RouteCount: uint64(len(mr.MemberRoutes)),
611+
Values: make([]uint64, numCols),
612+
}
613+
if mr.Aggregate {
614+
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)
615+
}
616+
return row
617+
}
618+
619+
func bucketIDFor(mr keyviz.MatrixRow) string {
620+
if mr.Aggregate {
621+
return "virtual:" + strconv.FormatUint(mr.RouteID, 10)
622+
}
623+
return "route:" + strconv.FormatUint(mr.RouteID, 10)
624+
}
625+
626+
func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) {
627+
sort.Slice(rows, func(i, j int) bool {
628+
if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 {
629+
return c < 0
630+
}
631+
return rows[i].BucketId < rows[j].BucketId
632+
})
633+
}

adapter/admin_grpc_keyviz_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/bootjp/elastickv/keyviz"
9+
pb "github.com/bootjp/elastickv/proto"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/grpc/codes"
12+
"google.golang.org/grpc/status"
13+
)
14+
15+
// fakeKeyVizSampler is a deterministic in-memory KeyVizSampler so
16+
// AdminServer tests don't need to drive a real keyviz.MemSampler with
17+
// goroutines and time. Snapshot returns a fresh deep copy of the
18+
// configured columns so the test mirrors the real sampler's contract.
19+
type fakeKeyVizSampler struct {
20+
cols []keyviz.MatrixColumn
21+
}
22+
23+
func (f *fakeKeyVizSampler) Snapshot(_, _ time.Time) []keyviz.MatrixColumn {
24+
out := make([]keyviz.MatrixColumn, len(f.cols))
25+
for i, c := range f.cols {
26+
rows := make([]keyviz.MatrixRow, len(c.Rows))
27+
for j, r := range c.Rows {
28+
rows[j] = r
29+
rows[j].Start = append([]byte(nil), r.Start...)
30+
rows[j].End = append([]byte(nil), r.End...)
31+
if len(r.MemberRoutes) > 0 {
32+
rows[j].MemberRoutes = append([]uint64(nil), r.MemberRoutes...)
33+
}
34+
}
35+
out[i] = keyviz.MatrixColumn{At: c.At, Rows: rows}
36+
}
37+
return out
38+
}
39+
40+
// TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered pins
41+
// the failure mode operators should see when keyviz is disabled on
42+
// a node — Unavailable rather than a successful empty response.
43+
func TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered(t *testing.T) {
44+
t.Parallel()
45+
srv := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil)
46+
_, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{})
47+
st, ok := status.FromError(err)
48+
if !ok || st.Code() != codes.Unavailable {
49+
t.Fatalf("expected Unavailable, got %v", err)
50+
}
51+
}
52+
53+
// TestGetKeyVizMatrixPivotsColumnsToRows pins the row-major proto
54+
// layout: one KeyVizRow per RouteID with values aligned to the
55+
// parallel column_unix_ms slice. Drives a fake sampler with two
56+
// columns and two routes (one of which reports zero in column 1).
57+
func TestGetKeyVizMatrixPivotsColumnsToRows(t *testing.T) {
58+
t.Parallel()
59+
t0 := time.Unix(1_700_000_000, 0)
60+
t1 := t0.Add(time.Minute)
61+
srv := newAdminServerWithFakeSampler(t, twoColumnTwoRouteCols(t0, t1))
62+
63+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
64+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS,
65+
})
66+
require.NoError(t, err)
67+
require.Equal(t, []int64{t0.UnixMilli(), t1.UnixMilli()}, resp.ColumnUnixMs)
68+
require.Len(t, resp.Rows, 2)
69+
// Sorted by Start: route 1 ("a") then route 2 ("m").
70+
r1, r2 := resp.Rows[0], resp.Rows[1]
71+
require.Equal(t, "route:1", r1.BucketId)
72+
require.Equal(t, "route:2", r2.BucketId)
73+
require.Equal(t, []byte("a"), r1.Start)
74+
require.Equal(t, []byte("m"), r1.End)
75+
require.False(t, r1.Aggregate)
76+
require.False(t, r2.Aggregate)
77+
require.Equal(t, []uint64{4, 9}, r1.Values)
78+
// Route 2 is absent in column 1 — zero by default.
79+
require.Equal(t, []uint64{7, 0}, r2.Values)
80+
}
81+
82+
func twoColumnTwoRouteCols(t0, t1 time.Time) []keyviz.MatrixColumn {
83+
return []keyviz.MatrixColumn{
84+
{
85+
At: t0,
86+
Rows: []keyviz.MatrixRow{
87+
{RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 4, Writes: 1},
88+
{RouteID: 2, Start: []byte("m"), End: []byte("z"), Reads: 7, Writes: 0},
89+
},
90+
},
91+
{
92+
At: t1,
93+
Rows: []keyviz.MatrixRow{
94+
{RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 9, Writes: 3},
95+
},
96+
},
97+
}
98+
}
99+
100+
func newAdminServerWithFakeSampler(t *testing.T, cols []keyviz.MatrixColumn) *AdminServer {
101+
t.Helper()
102+
srv := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil)
103+
srv.RegisterSampler(&fakeKeyVizSampler{cols: cols})
104+
return srv
105+
}
106+
107+
// TestGetKeyVizMatrixSeriesSelection pins the request.Series →
108+
// MatrixRow counter mapping including the UNSPECIFIED → Reads default.
109+
func TestGetKeyVizMatrixSeriesSelection(t *testing.T) {
110+
t.Parallel()
111+
row := keyviz.MatrixRow{
112+
RouteID: 1,
113+
Start: []byte("a"),
114+
End: []byte("z"),
115+
Reads: 11,
116+
Writes: 22,
117+
ReadBytes: 333,
118+
WriteBytes: 4444,
119+
}
120+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
121+
{At: time.Unix(1_700_000_000, 0), Rows: []keyviz.MatrixRow{row}},
122+
})
123+
124+
for _, tc := range []struct {
125+
name string
126+
series pb.KeyVizSeries
127+
want uint64
128+
}{
129+
{"unspecified defaults to reads", pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, 11},
130+
{"reads", pb.KeyVizSeries_KEYVIZ_SERIES_READS, 11},
131+
{"writes", pb.KeyVizSeries_KEYVIZ_SERIES_WRITES, 22},
132+
{"read_bytes", pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES, 333},
133+
{"write_bytes", pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES, 4444},
134+
} {
135+
t.Run(tc.name, func(t *testing.T) {
136+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{Series: tc.series})
137+
require.NoError(t, err)
138+
require.Len(t, resp.Rows, 1)
139+
require.Equal(t, []uint64{tc.want}, resp.Rows[0].Values)
140+
})
141+
}
142+
}
143+
144+
// TestGetKeyVizMatrixEncodesAggregateBucket pins the proto layout
145+
// for virtual buckets: bucket_id prefixed "virtual:", aggregate=true,
146+
// route_ids carries the MemberRoutes list, and route_count matches.
147+
func TestGetKeyVizMatrixEncodesAggregateBucket(t *testing.T) {
148+
t.Parallel()
149+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
150+
{
151+
At: time.Unix(1_700_000_000, 0),
152+
Rows: []keyviz.MatrixRow{
153+
{
154+
RouteID: ^uint64(0), // synthetic virtual-bucket ID
155+
Start: []byte("c"),
156+
End: []byte("d"),
157+
Aggregate: true,
158+
MemberRoutes: []uint64{2, 3, 4},
159+
Reads: 50,
160+
},
161+
},
162+
},
163+
})
164+
165+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
166+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS,
167+
})
168+
require.NoError(t, err)
169+
require.Len(t, resp.Rows, 1)
170+
r := resp.Rows[0]
171+
require.True(t, r.Aggregate)
172+
require.Equal(t, "virtual:18446744073709551615", r.BucketId)
173+
require.Equal(t, uint64(3), r.RouteCount)
174+
require.Equal(t, []uint64{2, 3, 4}, r.RouteIds)
175+
}

0 commit comments

Comments
 (0)