Skip to content
Merged
154 changes: 154 additions & 0 deletions adapter/admin_grpc.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package adapter

import (
"bytes"
"context"
"crypto/subtle"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/keyviz"
pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
Expand All @@ -17,6 +20,19 @@ import (
"google.golang.org/grpc/status"
)

// KeyVizSampler is the read-side abstraction the Admin service needs
// from the keyviz package: a single call that returns a time-bounded
// snapshot of the heatmap matrix. It is declared here, next to its
// consumer, rather than in keyviz so tests can pass an in-memory fake
// without constructing a full *keyviz.MemSampler. *keyviz.MemSampler
// satisfies this interface.
type KeyVizSampler interface {
// Snapshot returns the matrix columns in the half-open range
// [from, to). Either bound may be the zero time.Time, which means
// "unbounded on that side".
//
// Implementations must return a deep copy: the caller is allowed to
// mutate the returned columns and rows freely (see
// keyviz.MemSampler.Snapshot).
Snapshot(from, to time.Time) []keyviz.MatrixColumn
}

// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
// subset of raftengine.Engine so tests can supply an in-memory fake without
// standing up a real Raft cluster. Configuration is polled on each
Expand Down Expand Up @@ -56,6 +72,12 @@ type AdminServer struct {
// instance cannot contend with concurrent RPCs on another instance.
now func() time.Time

// sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
// Nil means keyviz is disabled — the RPC returns Unavailable.
// Guarded by groupsMu (same lock as groups/now) so RegisterSampler
// pairs atomically with concurrent RPC reads.
sampler KeyVizSampler

pb.UnimplementedAdminServer
}

Expand Down Expand Up @@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
s.groupsMu.Unlock()
}

// RegisterSampler installs the keyviz sampler that GetKeyVizMatrix reads
// from. The assignment happens under groupsMu, the same lock the RPC
// handler takes before reading the field.
//
// If this is never called, or is called with a nil sampler, the RPC
// answers codes.Unavailable so callers can tell "keyviz disabled" apart
// from "no data yet".
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) {
	s.groupsMu.Lock()
	defer s.groupsMu.Unlock()
	s.sampler = sampler
}

// GetClusterOverview returns the local node identity, the current member
// list, and per-group leader identity collected from the engines registered
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
Expand Down Expand Up @@ -477,3 +509,125 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
// ErrAdminTokenRequired is returned by NewAdminServer helpers when the
// operator neither supplied an admin token file nor opted into insecure
// mode. The message names the --adminInsecureNoAuth flag as the way to
// run without a token.
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")

// GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to)
// range supplied by the request, returning one KeyVizRow per tracked
// route or virtual bucket and a parallel column-timestamp slice.
//
// Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from
// the request's KeyVizSeries enum to the matching keyviz.MatrixRow
// counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
//
// Returns codes.Unavailable when no sampler is registered (keyviz
// disabled) so callers can distinguish that from "no data yet"
// (which yields a successful empty response).
func (s *AdminServer) GetKeyVizMatrix(
_ context.Context,
req *pb.GetKeyVizMatrixRequest,
) (*pb.GetKeyVizMatrixResponse, error) {
s.groupsMu.RLock()
sampler := s.sampler
s.groupsMu.RUnlock()
if sampler == nil {
return nil, errors.WithStack(status.Error(codes.Unavailable, "keyviz sampler not configured on this node"))
}
from := unixMsToTime(req.GetFromUnixMs())
to := unixMsToTime(req.GetToUnixMs())
cols := sampler.Snapshot(from, to)
pickValue := matrixSeriesPicker(req.GetSeries())
return matrixToProto(cols, pickValue), nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Apply the request row budget in GetKeyVizMatrix

This handler never reads req.GetRows(), so it always returns every row from the snapshot regardless of caller-requested resolution. On large route counts, this can blow up response size and processing time for admin queries that are supposed to request a bounded Y-axis (for example 256 rows), making the endpoint much more expensive than requested.

Useful? React with 👍 / 👎.

}

// unixMsToTime turns a Unix-millisecond timestamp into a time.Time.
// An input of exactly zero yields the zero time.Time rather than the
// Unix epoch, so that the sampler treats that side of the range as
// unbounded. Every other value, including negative ones, goes through
// time.UnixMilli unchanged.
func unixMsToTime(ms int64) time.Time {
	var t time.Time
	if ms != 0 {
		t = time.UnixMilli(ms)
	}
	return t
}

// matrixSeriesPicker maps the requested KeyVizSeries onto an accessor
// that reads the matching counter out of a keyviz.MatrixRow.
//
// READS, KEYVIZ_SERIES_UNSPECIFIED, and any enum value not listed here
// all resolve to the Reads counter, so a default-valued request still
// returns something useful.
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
	reads := func(r keyviz.MatrixRow) uint64 { return r.Reads }

	switch series {
	case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS:
		return reads
	case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
		return func(r keyviz.MatrixRow) uint64 { return r.Writes }
	case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
		return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes }
	case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
		return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes }
	default:
		// Unknown enum values fall back to the same default as
		// UNSPECIFIED.
		return reads
	}
}

// matrixToProto pivots the column-major []keyviz.MatrixColumn into the
// row-major proto response: one KeyVizRow per distinct RouteID, each
// with a Values slice whose index j lines up with ColumnUnixMs[j].
//
// Row metadata (bucket id, key range, aggregate flag, member routes) is
// taken from the first MatrixRow seen for a RouteID. A route missing
// from a column keeps the zero value in that slot. Rows are returned
// sorted by sortKeyVizRowsByStart.
//
// Per the sampler's contract, routes that were idle in every column are
// not emitted, so the row set already reflects observed activity in
// [from, to).
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64) *pb.GetKeyVizMatrixResponse {
	numCols := len(cols)
	timestamps := make([]int64, numCols)
	byRoute := make(map[uint64]*pb.KeyVizRow)
	var firstSeen []*pb.KeyVizRow

	for colIdx := range cols {
		timestamps[colIdx] = cols[colIdx].At.UnixMilli()
		for _, cell := range cols[colIdx].Rows {
			row, known := byRoute[cell.RouteID]
			if !known {
				row = newKeyVizRowFrom(cell, numCols)
				byRoute[cell.RouteID] = row
				firstSeen = append(firstSeen, row)
			}
			row.Values[colIdx] = pick(cell)
		}
	}

	// Copy into an exactly-sized, non-nil slice before sorting so the
	// response always carries an allocated Rows slice, even when empty.
	rows := make([]*pb.KeyVizRow, len(firstSeen))
	copy(rows, firstSeen)
	sortKeyVizRowsByStart(rows)

	return &pb.GetKeyVizMatrixResponse{
		ColumnUnixMs: timestamps,
		Rows:         rows,
	}
}

// newKeyVizRowFrom builds the proto row for a RouteID from the first
// MatrixRow observed for it. Start, End, and (for aggregate rows) the
// member route list are copied so the proto row does not alias the
// sampler's slices. Values is allocated with len == numCols, giving
// every column a zero-initialised slot.
//
// RouteCount is len(mr.MemberRoutes) for every row; RouteIds is only
// populated when mr.Aggregate is set.
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
	var memberIDs []uint64
	if mr.Aggregate {
		memberIDs = append(memberIDs, mr.MemberRoutes...)
	}

	return &pb.KeyVizRow{
		BucketId:   bucketIDFor(mr),
		Start:      append([]byte(nil), mr.Start...),
		End:        append([]byte(nil), mr.End...),
		Aggregate:  mr.Aggregate,
		RouteCount: uint64(len(mr.MemberRoutes)),
		RouteIds:   memberIDs,
		Values:     make([]uint64, numCols),
	}
}

// bucketIDFor formats the bucket identifier for a matrix row: the
// decimal RouteID prefixed with "virtual:" for aggregate rows and
// "route:" for everything else.
func bucketIDFor(mr keyviz.MatrixRow) string {
	prefix := "route:"
	if mr.Aggregate {
		prefix = "virtual:"
	}
	return prefix + strconv.FormatUint(mr.RouteID, 10)
}

// sortKeyVizRowsByStart orders rows in place by ascending Start key
// (bytewise comparison), breaking ties on BucketId.
func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) {
	less := func(a, b int) bool {
		cmp := bytes.Compare(rows[a].Start, rows[b].Start)
		if cmp == 0 {
			return rows[a].BucketId < rows[b].BucketId
		}
		return cmp < 0
	}
	sort.Slice(rows, less)
}
175 changes: 175 additions & 0 deletions adapter/admin_grpc_keyviz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package adapter

import (
"context"
"testing"
"time"

"github.com/bootjp/elastickv/keyviz"
pb "github.com/bootjp/elastickv/proto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// fakeKeyVizSampler is a deterministic in-memory KeyVizSampler so
// AdminServer tests don't need to drive a real keyviz.MemSampler with
// goroutines and time. Its Snapshot method ignores the requested time
// range and returns a fresh deep copy of the configured columns, so the
// test mirrors the real sampler's "caller may mutate" contract.
type fakeKeyVizSampler struct {
// cols is the fixed matrix handed back (as a copy) by every Snapshot call.
cols []keyviz.MatrixColumn
}

// Snapshot ignores both time bounds and returns a copy of every
// configured column. Each row's Start and End are copied, and
// MemberRoutes is copied when non-empty, so callers can mutate the
// result without touching the fixture.
func (f *fakeKeyVizSampler) Snapshot(_, _ time.Time) []keyviz.MatrixColumn {
	snapshot := make([]keyviz.MatrixColumn, 0, len(f.cols))
	for _, src := range f.cols {
		copied := make([]keyviz.MatrixRow, len(src.Rows))
		for k := range src.Rows {
			dup := src.Rows[k]
			dup.Start = append([]byte(nil), dup.Start...)
			dup.End = append([]byte(nil), dup.End...)
			if len(dup.MemberRoutes) > 0 {
				dup.MemberRoutes = append([]uint64(nil), dup.MemberRoutes...)
			}
			copied[k] = dup
		}
		snapshot = append(snapshot, keyviz.MatrixColumn{At: src.At, Rows: copied})
	}
	return snapshot
}

// TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered pins
// what operators see when keyviz is disabled on a node: with no sampler
// registered the RPC must fail with codes.Unavailable rather than
// return a successful empty response.
func TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered(t *testing.T) {
	t.Parallel()
	srv := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil)

	_, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{})

	st, ok := status.FromError(err)
	if ok && st.Code() == codes.Unavailable {
		return
	}
	t.Fatalf("expected Unavailable, got %v", err)
}

// TestGetKeyVizMatrixPivotsColumnsToRows pins the row-major proto
// layout: one KeyVizRow per RouteID whose Values line up with the
// parallel column_unix_ms slice. The fake sampler serves two columns
// and two routes; route 2 is absent from the second column, so its
// slot there must stay zero.
func TestGetKeyVizMatrixPivotsColumnsToRows(t *testing.T) {
	t.Parallel()
	first := time.Unix(1_700_000_000, 0)
	second := first.Add(time.Minute)
	srv := newAdminServerWithFakeSampler(t, twoColumnTwoRouteCols(first, second))

	req := &pb.GetKeyVizMatrixRequest{Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS}
	resp, err := srv.GetKeyVizMatrix(context.Background(), req)
	require.NoError(t, err)

	wantColumns := []int64{first.UnixMilli(), second.UnixMilli()}
	require.Equal(t, wantColumns, resp.ColumnUnixMs)
	require.Len(t, resp.Rows, 2)

	// Rows come back sorted by Start: route 1 ("a") before route 2 ("m").
	routeA, routeM := resp.Rows[0], resp.Rows[1]
	require.Equal(t, "route:1", routeA.BucketId)
	require.Equal(t, "route:2", routeM.BucketId)
	require.Equal(t, []byte("a"), routeA.Start)
	require.Equal(t, []byte("m"), routeA.End)
	require.False(t, routeA.Aggregate)
	require.False(t, routeM.Aggregate)
	require.Equal(t, []uint64{4, 9}, routeA.Values)
	// Route 2 has no entry in the second column, so that slot is zero.
	require.Equal(t, []uint64{7, 0}, routeM.Values)
}

// twoColumnTwoRouteCols builds the pivot-test fixture: a column at t0
// holding routes 1 ("a".."m") and 2 ("m".."z"), followed by a column at
// t1 that holds route 1 only.
func twoColumnTwoRouteCols(t0, t1 time.Time) []keyviz.MatrixColumn {
	firstCol := keyviz.MatrixColumn{
		At: t0,
		Rows: []keyviz.MatrixRow{
			{RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 4, Writes: 1},
			{RouteID: 2, Start: []byte("m"), End: []byte("z"), Reads: 7, Writes: 0},
		},
	}
	secondCol := keyviz.MatrixColumn{
		At: t1,
		Rows: []keyviz.MatrixRow{
			{RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 9, Writes: 3},
		},
	}
	return []keyviz.MatrixColumn{firstCol, secondCol}
}

// newAdminServerWithFakeSampler returns an AdminServer for node
// "node-a" with a fakeKeyVizSampler serving cols already registered.
func newAdminServerWithFakeSampler(t *testing.T, cols []keyviz.MatrixColumn) *AdminServer {
	t.Helper()
	fake := &fakeKeyVizSampler{cols: cols}
	server := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil)
	server.RegisterSampler(fake)
	return server
}

// TestGetKeyVizMatrixSeriesSelection pins the mapping from the
// request's Series enum to the MatrixRow counter that ends up in
// Values, including the UNSPECIFIED -> Reads default. The single
// fixture row carries a distinct value per counter so a wrong mapping
// cannot pass by accident.
func TestGetKeyVizMatrixSeriesSelection(t *testing.T) {
	t.Parallel()
	cols := []keyviz.MatrixColumn{{
		At: time.Unix(1_700_000_000, 0),
		Rows: []keyviz.MatrixRow{{
			RouteID:    1,
			Start:      []byte("a"),
			End:        []byte("z"),
			Reads:      11,
			Writes:     22,
			ReadBytes:  333,
			WriteBytes: 4444,
		}},
	}}
	srv := newAdminServerWithFakeSampler(t, cols)

	type seriesCase struct {
		name   string
		series pb.KeyVizSeries
		want   uint64
	}
	cases := []seriesCase{
		{name: "unspecified defaults to reads", series: pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, want: 11},
		{name: "reads", series: pb.KeyVizSeries_KEYVIZ_SERIES_READS, want: 11},
		{name: "writes", series: pb.KeyVizSeries_KEYVIZ_SERIES_WRITES, want: 22},
		{name: "read_bytes", series: pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES, want: 333},
		{name: "write_bytes", series: pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES, want: 4444},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			req := &pb.GetKeyVizMatrixRequest{Series: tc.series}
			resp, err := srv.GetKeyVizMatrix(context.Background(), req)
			require.NoError(t, err)
			require.Len(t, resp.Rows, 1)
			require.Equal(t, []uint64{tc.want}, resp.Rows[0].Values)
		})
	}
}

// TestGetKeyVizMatrixEncodesAggregateBucket pins the proto layout
// for virtual buckets: bucket_id prefixed "virtual:", aggregate=true,
// route_ids carries the MemberRoutes list, and route_count matches.
func TestGetKeyVizMatrixEncodesAggregateBucket(t *testing.T) {
t.Parallel()
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
{
At: time.Unix(1_700_000_000, 0),
Rows: []keyviz.MatrixRow{
{
RouteID: ^uint64(0), // synthetic virtual-bucket ID
Start: []byte("c"),
End: []byte("d"),
Aggregate: true,
MemberRoutes: []uint64{2, 3, 4},
Reads: 50,
},
},
},
})

resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS,
})
require.NoError(t, err)
require.Len(t, resp.Rows, 1)
r := resp.Rows[0]
require.True(t, r.Aggregate)
require.Equal(t, "virtual:18446744073709551615", r.BucketId)
require.Equal(t, uint64(3), r.RouteCount)
require.Equal(t, []uint64{2, 3, 4}, r.RouteIds)
}
Loading
Loading