Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions adapter/admin_grpc.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package adapter

import (
"bytes"
"context"
"crypto/subtle"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/keyviz"
pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
Expand All @@ -17,6 +20,19 @@ import (
"google.golang.org/grpc/status"
)

// KeyVizSampler is the read-side abstraction the Admin service needs
// from the keyviz package: a time-bounded matrix snapshot. Defined
// here (not in keyviz) so tests can pass an in-memory fake without
// constructing a full *keyviz.MemSampler. *keyviz.MemSampler
// satisfies this interface.
type KeyVizSampler interface {
// Snapshot returns the matrix columns in [from, to). Either
// bound may be the zero time meaning unbounded on that side.
// Implementations must return rows the caller can mutate freely
// (a deep copy) — see keyviz.MemSampler.Snapshot.
Snapshot(from, to time.Time) []keyviz.MatrixColumn
}

// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
// subset of raftengine.Engine so tests can supply an in-memory fake without
// standing up a real Raft cluster. Configuration is polled on each
Expand Down Expand Up @@ -56,6 +72,12 @@ type AdminServer struct {
// instance cannot contend with concurrent RPCs on another instance.
now func() time.Time

// sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
// Nil means keyviz is disabled — the RPC returns Unavailable.
// Guarded by groupsMu (same lock as groups/now) so RegisterSampler
// pairs atomically with concurrent RPC reads.
sampler KeyVizSampler

pb.UnimplementedAdminServer
}

Expand Down Expand Up @@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
s.groupsMu.Unlock()
}

// RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix.
// Without this call (or with a nil sampler) the RPC returns
// codes.Unavailable so callers can distinguish "keyviz disabled"
// from "no data yet".
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) {
s.groupsMu.Lock()
s.sampler = sampler
s.groupsMu.Unlock()
}

// GetClusterOverview returns the local node identity, the current member
// list, and per-group leader identity collected from the engines registered
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
Expand Down Expand Up @@ -477,3 +509,192 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
// failed to supply a token and also did not opt into insecure mode.
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")

// GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to)
// range supplied by the request, returning one KeyVizRow per tracked
// route or virtual bucket and a parallel column-timestamp slice.
//
// Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from
// the request's KeyVizSeries enum to the matching keyviz.MatrixRow
// counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
//
// Returns codes.Unavailable when no sampler is registered (keyviz
// disabled) so callers can distinguish that from "no data yet"
// (which yields a successful empty response).
func (s *AdminServer) GetKeyVizMatrix(
_ context.Context,
req *pb.GetKeyVizMatrixRequest,
) (*pb.GetKeyVizMatrixResponse, error) {
s.groupsMu.RLock()
sampler := s.sampler
s.groupsMu.RUnlock()
if sampler == nil {
return nil, errors.WithStack(status.Error(codes.Unavailable, "keyviz sampler not configured on this node"))
}
from := unixMsToTime(req.GetFromUnixMs())
to := unixMsToTime(req.GetToUnixMs())
cols := sampler.Snapshot(from, to)
pickValue := matrixSeriesPicker(req.GetSeries())
return matrixToProto(cols, pickValue, clampRowBudget(int(req.GetRows()))), nil
}

// keyVizRowBudgetCap is the upper bound on the per-request rows
// budget — design doc §4.1 caps rows at 1024 to bound server work
// (sort + payload) for adversarial / over-large requests.
const keyVizRowBudgetCap = 1024

// clampRowBudget enforces design §4.1's upper bound. A request of 0
// (or negative) means "no cap" and is preserved; anything past the
// cap is silently clamped — clients asking for more rows than the
// server is willing to render get the most rows the server will
// render, not an error.
func clampRowBudget(requested int) int {
if requested > keyVizRowBudgetCap {
return keyVizRowBudgetCap
}
return requested
}

// unixMsToTime converts a Unix-millisecond timestamp into a time.Time,
// returning the zero Time when the input is zero so the sampler reads
// an unbounded range on that side.
func unixMsToTime(ms int64) time.Time {
if ms == 0 {
return time.Time{}
}
return time.UnixMilli(ms)
}

// matrixSeriesPicker returns a callback that extracts the requested
// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED falls through
// to Writes per design doc §4.1 — write traffic is the primary
// signal the heatmap is built around, and the read path is wired in
// a follow-up phase.
func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 {
switch series {
case pb.KeyVizSeries_KEYVIZ_SERIES_READS:
return func(r keyviz.MatrixRow) uint64 { return r.Reads }
case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES:
return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes }
case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES:
return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes }
case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_WRITES:
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
default:
return func(r keyviz.MatrixRow) uint64 { return r.Writes }
}
}

// matrixToProto pivots the column-major MatrixColumn slice into the
// row-major proto layout: one KeyVizRow per distinct RouteID with a
// values slice aligned to the column_unix_ms parallel slice. Idle
// routes (zero in every column) are not emitted by the sampler, so
// the row set already reflects observed activity in [from, to).
//
// rowBudget caps how many rows the response carries — passing
// 0 means "no cap." When the budget would be exceeded, rows are
// sorted by total activity across the requested series and the
// top-N retained, so callers asking for a compact matrix do not
// receive a payload that scales with the route count.
func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64, rowBudget int) *pb.GetKeyVizMatrixResponse {
resp := &pb.GetKeyVizMatrixResponse{
ColumnUnixMs: make([]int64, len(cols)),
}
rowsByID := make(map[uint64]*pb.KeyVizRow)
order := make([]uint64, 0)
for j, col := range cols {
resp.ColumnUnixMs[j] = col.At.UnixMilli()
for _, mr := range col.Rows {
pr, ok := rowsByID[mr.RouteID]
if !ok {
pr = newKeyVizRowFrom(mr, len(cols))
rowsByID[mr.RouteID] = pr
order = append(order, mr.RouteID)
}
pr.Values[j] = pick(mr)
}
}
resp.Rows = make([]*pb.KeyVizRow, len(order))
for i, id := range order {
resp.Rows[i] = rowsByID[id]
}
resp.Rows = applyKeyVizRowBudget(resp.Rows, rowBudget)
sortKeyVizRowsByStart(resp.Rows)
return resp
}

// applyKeyVizRowBudget caps rows to budget by total activity per row
// (sum of per-column values), preserving the top-N rows. budget <= 0
// means "no cap."
//
// NOTE: design doc §5.5 specifies a "lexicographic walk + greedy
// merge of low-activity adjacent ranges" algorithm — we simplify to
// activity-descending truncation for Phase 1 because it covers the
// common UI need (highlight hotspots) without needing the synthetic
// virtual-bucket plumbing the merge requires. Phase 2 should swap
// this for the spec'd merge so low-activity ranges become coarse
// aggregates instead of being silently dropped.
func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow {
if budget <= 0 || len(rows) <= budget {
return rows
}
sort.Slice(rows, func(i, j int) bool {
return rowActivityTotal(rows[i]) > rowActivityTotal(rows[j])
})
return rows[:budget]
}

func rowActivityTotal(r *pb.KeyVizRow) uint64 {
var sum uint64
for _, v := range r.Values {
sum += v
}
return sum
}

// newKeyVizRowFrom seeds a proto row from the first MatrixRow seen
// for a given RouteID. Values is allocated with len == numCols so
// every column gets a deterministic slot (zero-valued by default).
//
// route_count surfaces MemberRoutesTotal (the true number of routes
// folded into the bucket) — not just len(MemberRoutes), which the
// sampler caps at MaxMemberRoutesPerSlot. When the visible list is
// shorter than the total, route_ids_truncated lets consumers know
// to trust route_count for drill-down decisions.
func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
total := mr.MemberRoutesTotal
if !mr.Aggregate && total == 0 {
// Individual slots fall through to RouteCount=1 when the
// sampler predates MemberRoutesTotal or never set it.
total = 1
}
row := &pb.KeyVizRow{
BucketId: bucketIDFor(mr),
Start: append([]byte(nil), mr.Start...),
End: append([]byte(nil), mr.End...),
Aggregate: mr.Aggregate,
RouteCount: total,
RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
Values: make([]uint64, numCols),
}
if mr.Aggregate {
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)
}
return row
}

func bucketIDFor(mr keyviz.MatrixRow) string {
if mr.Aggregate {
return "virtual:" + strconv.FormatUint(mr.RouteID, 10)
}
return "route:" + strconv.FormatUint(mr.RouteID, 10)
}

func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) {
sort.Slice(rows, func(i, j int) bool {
if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 {
return c < 0
}
return rows[i].BucketId < rows[j].BucketId
})
}
Loading
Loading