Skip to content
Merged
195 changes: 195 additions & 0 deletions adapter/admin_grpc.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package adapter

import (
"bytes"
"context"
"crypto/subtle"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/keyviz"
pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
Expand All @@ -17,6 +20,19 @@ import (
"google.golang.org/grpc/status"
)

// KeyVizSampler is the read-side abstraction the Admin service needs
// from the keyviz package: a time-bounded matrix snapshot. It is
// declared here (not in keyviz) so tests can pass an in-memory fake
// without constructing a full *keyviz.MemSampler. *keyviz.MemSampler
// satisfies this interface.
type KeyVizSampler interface {
	// Snapshot returns the matrix columns in the half-open range
	// [from, to). Either bound may be the zero time.Time, meaning
	// unbounded on that side.
	//
	// Implementations must return rows the caller can mutate freely
	// (a deep copy) — see keyviz.MemSampler.Snapshot.
	Snapshot(from, to time.Time) []keyviz.MatrixColumn
}

// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
// subset of raftengine.Engine so tests can supply an in-memory fake without
// standing up a real Raft cluster. Configuration is polled on each
Expand Down Expand Up @@ -56,6 +72,12 @@ type AdminServer struct {
// instance cannot contend with concurrent RPCs on another instance.
now func() time.Time

// sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
// Nil means keyviz is disabled — the RPC returns Unavailable.
// Guarded by groupsMu (same lock as groups/now) so RegisterSampler
// pairs atomically with concurrent RPC reads.
sampler KeyVizSampler

pb.UnimplementedAdminServer
}

Expand Down Expand Up @@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
s.groupsMu.Unlock()
}

// RegisterSampler installs the keyviz sampler that GetKeyVizMatrix
// reads from. The assignment happens under groupsMu, the same lock
// GetKeyVizMatrix takes (in read mode) before using the sampler.
//
// Until this is called — or if it is called with a nil sampler —
// GetKeyVizMatrix answers with codes.Unavailable, which lets callers
// tell "keyviz disabled" apart from "no data yet".
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) {
	s.groupsMu.Lock()
	defer s.groupsMu.Unlock()
	s.sampler = sampler
}

// GetClusterOverview returns the local node identity, the current member
// list, and per-group leader identity collected from the engines registered
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
Expand Down Expand Up @@ -477,3 +509,166 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
// failed to supply a token and also did not opt into insecure mode (the
// --adminInsecureNoAuth flag named in the error message).
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")

// GetKeyVizMatrix serves the keyviz heatmap for the [from, to) window
// named in the request. The response carries one KeyVizRow per tracked
// route or virtual bucket, plus a column-timestamp slice that the
// rows' values are aligned to.
//
// The request's KeyVizSeries enum chooses which keyviz.MatrixRow
// counter is reported (Reads / Writes / ReadBytes / WriteBytes);
// KEYVIZ_SERIES_UNSPECIFIED is treated as Reads. A zero from/to
// timestamp leaves that side of the window unbounded, and the
// request's rows field is forwarded as the row budget.
//
// When no sampler has been registered (keyviz disabled) the call
// fails with codes.Unavailable; a registered sampler with nothing to
// report produces a successful, empty response instead.
func (s *AdminServer) GetKeyVizMatrix(
	_ context.Context,
	req *pb.GetKeyVizMatrixRequest,
) (*pb.GetKeyVizMatrixResponse, error) {
	// Copy the sampler reference under the read lock and release the
	// lock before snapshotting, so groupsMu is not held during the
	// sampler call.
	s.groupsMu.RLock()
	src := s.sampler
	s.groupsMu.RUnlock()

	if src == nil {
		st := status.Error(codes.Unavailable, "keyviz sampler not configured on this node")
		return nil, errors.WithStack(st)
	}

	window := src.Snapshot(
		unixMsToTime(req.GetFromUnixMs()),
		unixMsToTime(req.GetToUnixMs()),
	)
	return matrixToProto(window, matrixSeriesPicker(req.GetSeries()), int(req.GetRows())), nil
}

// unixMsToTime maps a Unix-millisecond timestamp onto a time.Time.
// An input of exactly zero yields the zero time.Time (not the Unix
// epoch), which the sampler interprets as "unbounded on this side".
// Every other value, including negative ones, goes through
// time.UnixMilli unchanged.
func unixMsToTime(ms int64) time.Time {
	var t time.Time
	if ms != 0 {
		t = time.UnixMilli(ms)
	}
	return t
}

// matrixSeriesPicker translates the requested series into an accessor
// that reads the corresponding counter off a keyviz.MatrixRow.
//
// READS and KEYVIZ_SERIES_UNSPECIFIED both select Reads, so a request
// that leaves the field at its default still gets a useful matrix.
// Any value not listed below is also answered with Reads.
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
	reads := func(row keyviz.MatrixRow) uint64 { return row.Reads }

	switch series {
	case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS:
		return reads
	case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
		return func(row keyviz.MatrixRow) uint64 { return row.Writes }
	case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
		return func(row keyviz.MatrixRow) uint64 { return row.ReadBytes }
	case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
		return func(row keyviz.MatrixRow) uint64 { return row.WriteBytes }
	default:
		return reads
	}
}

// matrixToProto turns the sampler's column-major []MatrixColumn into
// the row-major wire layout: a column_unix_ms slice with one entry
// per input column, and one KeyVizRow per distinct RouteID whose
// Values slice lines up index-for-index with those columns. A row is
// created the first time its RouteID shows up in any column, so a
// RouteID that never appears in cols produces no row; columns in
// which a known RouteID is absent keep a zero value.
//
// pick selects which counter of each MatrixRow is written into the
// matrix. rowBudget limits the number of rows returned (0 means
// "no cap"); when it bites, applyKeyVizRowBudget keeps the rows with
// the highest total of the picked series, so a caller that asks for
// a compact matrix gets a payload that does not scale with the route
// count. The surviving rows are finally ordered by start key.
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64, rowBudget int) *pb.GetKeyVizMatrixResponse {
	numCols := len(cols)
	timestamps := make([]int64, numCols)
	byRoute := make(map[uint64]*pb.KeyVizRow)
	// firstSeen holds the rows in the order their RouteID was first
	// encountered; it is deliberately non-nil even when empty.
	firstSeen := make([]*pb.KeyVizRow, 0)

	for colIdx, col := range cols {
		timestamps[colIdx] = col.At.UnixMilli()
		for _, cell := range col.Rows {
			row, seen := byRoute[cell.RouteID]
			if !seen {
				row = newKeyVizRowFrom(cell, numCols)
				byRoute[cell.RouteID] = row
				firstSeen = append(firstSeen, row)
			}
			row.Values[colIdx] = pick(cell)
		}
	}

	// Trim first, then sort: the budget ranks by activity, while the
	// response is presented in key order.
	rows := applyKeyVizRowBudget(firstSeen, rowBudget)
	sortKeyVizRowsByStart(rows)

	return &pb.GetKeyVizMatrixResponse{
		ColumnUnixMs: timestamps,
		Rows:         rows,
	}
}

// applyKeyVizRowBudget caps rows to budget, keeping the rows with the
// highest total activity (sum of per-column values). budget <= 0
// means "no cap", and a slice that already fits is returned as is.
//
// Each row's total is computed exactly once up front rather than
// inside the sort comparator, so the cost is one pass over the values
// plus the sort instead of a full re-sum per comparison. The sort is
// stable: rows with equal totals keep their relative input order, so
// which of several equally active rows survives the cut is
// deterministic for a given input.
//
// When the cap applies, rows is reordered in place (most active
// first) and the returned slice is a prefix of it.
func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow {
	if budget <= 0 || len(rows) <= budget {
		return rows
	}

	// Pair every row with its precomputed total so the comparator is
	// a plain integer comparison.
	type rankedRow struct {
		row   *pb.KeyVizRow
		total uint64
	}
	ranked := make([]rankedRow, len(rows))
	for i, r := range rows {
		ranked[i] = rankedRow{row: r, total: rowActivityTotal(r)}
	}

	sort.SliceStable(ranked, func(i, j int) bool {
		return ranked[i].total > ranked[j].total
	})

	// Write the whole permutation back so rows stays a reordering of
	// its original contents, as it was with the in-place sort.
	for i := range ranked {
		rows[i] = ranked[i].row
	}
	return rows[:budget]
}

// rowActivityTotal adds up every entry of r.Values and returns the
// sum. The addition is plain uint64 arithmetic, so it wraps on
// overflow. A row with no values totals zero.
func rowActivityTotal(r *pb.KeyVizRow) uint64 {
	total := uint64(0)
	for i := range r.Values {
		total += r.Values[i]
	}
	return total
}

// newKeyVizRowFrom builds the proto row for a RouteID from the first
// MatrixRow observed for it. Values is allocated with exactly numCols
// zeroed slots so every column has a deterministic position, and the
// Start/End keys are copied rather than aliased.
//
// route_count reports MemberRoutesTotal (the true number of routes
// folded into the bucket), not len(MemberRoutes), which the sampler
// caps at MaxMemberRoutesPerSlot. For aggregate rows the member list
// is copied into route_ids, and route_ids_truncated is set when that
// list is shorter than the total, telling consumers to rely on
// route_count for drill-down decisions. Non-aggregate rows carry no
// route_ids and are never flagged as truncated.
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
	routeCount := mr.MemberRoutesTotal
	if routeCount == 0 && !mr.Aggregate {
		// An individual slot whose sampler predates MemberRoutesTotal,
		// or never set it, still represents one route.
		routeCount = 1
	}

	var (
		memberIDs []uint64
		truncated bool
	)
	if mr.Aggregate {
		memberIDs = append([]uint64(nil), mr.MemberRoutes...)
		truncated = routeCount > uint64(len(mr.MemberRoutes))
	}

	return &pb.KeyVizRow{
		BucketId:          bucketIDFor(mr),
		Start:             append([]byte(nil), mr.Start...),
		End:               append([]byte(nil), mr.End...),
		Aggregate:         mr.Aggregate,
		RouteIds:          memberIDs,
		RouteCount:        routeCount,
		RouteIdsTruncated: truncated,
		Values:            make([]uint64, numCols),
	}
}

// bucketIDFor derives the string bucket identifier for a matrix row:
// the decimal RouteID prefixed with "virtual:" for aggregate rows and
// with "route:" for everything else.
func bucketIDFor(mr keyviz.MatrixRow) string {
	prefix := "route:"
	if mr.Aggregate {
		prefix = "virtual:"
	}
	return prefix + strconv.FormatUint(mr.RouteID, 10)
}

// sortKeyVizRowsByStart orders rows in place by ascending Start key
// (bytewise comparison). Rows that share a Start key are ordered by
// BucketId.
func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) {
	before := func(a, b *pb.KeyVizRow) bool {
		switch bytes.Compare(a.Start, b.Start) {
		case -1:
			return true
		case 1:
			return false
		default:
			// Identical start keys: fall back to the bucket id.
			return a.BucketId < b.BucketId
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		return before(rows[i], rows[j])
	})
}
Loading
Loading