11package adapter
22
33import (
4+ "bytes"
45 "context"
56 "crypto/subtle"
67 "sort"
8+ "strconv"
79 "strings"
810 "sync"
911 "time"
1012
1113 "github.com/bootjp/elastickv/internal/raftengine"
14+ "github.com/bootjp/elastickv/keyviz"
1215 pb "github.com/bootjp/elastickv/proto"
1316 "github.com/cockroachdb/errors"
1417 "google.golang.org/grpc"
@@ -17,6 +20,19 @@ import (
1720 "google.golang.org/grpc/status"
1821)
1922
23+ // KeyVizSampler is the read-side abstraction the Admin service needs
24+ // from the keyviz package: a time-bounded matrix snapshot. Defined
25+ // here (not in keyviz) so tests can pass an in-memory fake without
26+ // constructing a full *keyviz.MemSampler. *keyviz.MemSampler
27+ // satisfies this interface.
28+ type KeyVizSampler interface {
29+ // Snapshot returns the matrix columns in [from, to). Either
30+ // bound may be the zero time meaning unbounded on that side.
31+ // Implementations must return rows the caller can mutate freely
32+ // (a deep copy) — see keyviz.MemSampler.Snapshot.
33+ Snapshot (from , to time.Time ) []keyviz.MatrixColumn
34+ }
35+
2036// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
2137// subset of raftengine.Engine so tests can supply an in-memory fake without
2238// standing up a real Raft cluster. Configuration is polled on each
@@ -56,6 +72,12 @@ type AdminServer struct {
5672 // instance cannot contend with concurrent RPCs on another instance.
5773 now func () time.Time
5874
75+ // sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix.
76+ // Nil means keyviz is disabled — the RPC returns Unavailable.
77+ // Guarded by groupsMu (same lock as groups/now) so RegisterSampler
78+ // pairs atomically with concurrent RPC reads.
79+ sampler KeyVizSampler
80+
5981 pb.UnimplementedAdminServer
6082}
6183
@@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
97119 s .groupsMu .Unlock ()
98120}
99121
122+ // RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix.
123+ // Without this call (or with a nil sampler) the RPC returns
124+ // codes.Unavailable so callers can distinguish "keyviz disabled"
125+ // from "no data yet".
126+ func (s * AdminServer ) RegisterSampler (sampler KeyVizSampler ) {
127+ s .groupsMu .Lock ()
128+ s .sampler = sampler
129+ s .groupsMu .Unlock ()
130+ }
131+
100132// GetClusterOverview returns the local node identity, the current member
101133// list, and per-group leader identity collected from the engines registered
102134// via RegisterGroup. The member list is the union of (a) the bootstrap seed
@@ -477,3 +509,166 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe
477509// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
478510// failed to supply a token and also did not opt into insecure mode.
479511var ErrAdminTokenRequired = errors .New ("admin token file required; pass --adminInsecureNoAuth to run without" )
512+
513+ // GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to)
514+ // range supplied by the request, returning one KeyVizRow per tracked
515+ // route or virtual bucket and a parallel column-timestamp slice.
516+ //
517+ // Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from
518+ // the request's KeyVizSeries enum to the matching keyviz.MatrixRow
519+ // counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
520+ //
521+ // Returns codes.Unavailable when no sampler is registered (keyviz
522+ // disabled) so callers can distinguish that from "no data yet"
523+ // (which yields a successful empty response).
524+ func (s * AdminServer ) GetKeyVizMatrix (
525+ _ context.Context ,
526+ req * pb.GetKeyVizMatrixRequest ,
527+ ) (* pb.GetKeyVizMatrixResponse , error ) {
528+ s .groupsMu .RLock ()
529+ sampler := s .sampler
530+ s .groupsMu .RUnlock ()
531+ if sampler == nil {
532+ return nil , errors .WithStack (status .Error (codes .Unavailable , "keyviz sampler not configured on this node" ))
533+ }
534+ from := unixMsToTime (req .GetFromUnixMs ())
535+ to := unixMsToTime (req .GetToUnixMs ())
536+ cols := sampler .Snapshot (from , to )
537+ pickValue := matrixSeriesPicker (req .GetSeries ())
538+ return matrixToProto (cols , pickValue , int (req .GetRows ())), nil
539+ }
540+
// unixMsToTime turns a Unix timestamp expressed in milliseconds into
// a time.Time. An input of exactly 0 is mapped to the zero time.Time
// rather than the Unix epoch, because callers use the zero value as
// the "unbounded on this side" marker when querying the sampler.
func unixMsToTime(ms int64) time.Time {
	var t time.Time
	if ms != 0 {
		t = time.UnixMilli(ms)
	}
	return t
}
550+
551+ // matrixSeriesPicker returns a callback that extracts the requested
552+ // counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED (and READS)
553+ // fall through to Reads so a default-valued request still returns
554+ // something useful.
555+ func matrixSeriesPicker (series pb.KeyVizSeries ) func (keyviz.MatrixRow ) uint64 {
556+ switch series {
557+ case pb .KeyVizSeries_KEYVIZ_SERIES_WRITES :
558+ return func (r keyviz.MatrixRow ) uint64 { return r .Writes }
559+ case pb .KeyVizSeries_KEYVIZ_SERIES_READ_BYTES :
560+ return func (r keyviz.MatrixRow ) uint64 { return r .ReadBytes }
561+ case pb .KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES :
562+ return func (r keyviz.MatrixRow ) uint64 { return r .WriteBytes }
563+ case pb .KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED , pb .KeyVizSeries_KEYVIZ_SERIES_READS :
564+ return func (r keyviz.MatrixRow ) uint64 { return r .Reads }
565+ default :
566+ return func (r keyviz.MatrixRow ) uint64 { return r .Reads }
567+ }
568+ }
569+
570+ // matrixToProto pivots the column-major MatrixColumn slice into the
571+ // row-major proto layout: one KeyVizRow per distinct RouteID with a
572+ // values slice aligned to the column_unix_ms parallel slice. Idle
573+ // routes (zero in every column) are not emitted by the sampler, so
574+ // the row set already reflects observed activity in [from, to).
575+ //
576+ // rowBudget caps how many rows the response carries — passing
577+ // 0 means "no cap." When the budget would be exceeded, rows are
578+ // sorted by total activity across the requested series and the
579+ // top-N retained, so callers asking for a compact matrix do not
580+ // receive a payload that scales with the route count.
581+ func matrixToProto (cols []keyviz.MatrixColumn , pick func (keyviz.MatrixRow ) uint64 , rowBudget int ) * pb.GetKeyVizMatrixResponse {
582+ resp := & pb.GetKeyVizMatrixResponse {
583+ ColumnUnixMs : make ([]int64 , len (cols )),
584+ }
585+ rowsByID := make (map [uint64 ]* pb.KeyVizRow )
586+ order := make ([]uint64 , 0 )
587+ for j , col := range cols {
588+ resp .ColumnUnixMs [j ] = col .At .UnixMilli ()
589+ for _ , mr := range col .Rows {
590+ pr , ok := rowsByID [mr .RouteID ]
591+ if ! ok {
592+ pr = newKeyVizRowFrom (mr , len (cols ))
593+ rowsByID [mr .RouteID ] = pr
594+ order = append (order , mr .RouteID )
595+ }
596+ pr .Values [j ] = pick (mr )
597+ }
598+ }
599+ resp .Rows = make ([]* pb.KeyVizRow , len (order ))
600+ for i , id := range order {
601+ resp .Rows [i ] = rowsByID [id ]
602+ }
603+ resp .Rows = applyKeyVizRowBudget (resp .Rows , rowBudget )
604+ sortKeyVizRowsByStart (resp .Rows )
605+ return resp
606+ }
607+
608+ // applyKeyVizRowBudget caps rows to budget by total activity per row
609+ // (sum of per-column values), preserving the top-N rows. budget <= 0
610+ // means "no cap."
611+ func applyKeyVizRowBudget (rows []* pb.KeyVizRow , budget int ) []* pb.KeyVizRow {
612+ if budget <= 0 || len (rows ) <= budget {
613+ return rows
614+ }
615+ sort .Slice (rows , func (i , j int ) bool {
616+ return rowActivityTotal (rows [i ]) > rowActivityTotal (rows [j ])
617+ })
618+ return rows [:budget ]
619+ }
620+
621+ func rowActivityTotal (r * pb.KeyVizRow ) uint64 {
622+ var sum uint64
623+ for _ , v := range r .Values {
624+ sum += v
625+ }
626+ return sum
627+ }
628+
629+ // newKeyVizRowFrom seeds a proto row from the first MatrixRow seen
630+ // for a given RouteID. Values is allocated with len == numCols so
631+ // every column gets a deterministic slot (zero-valued by default).
632+ //
633+ // route_count surfaces MemberRoutesTotal (the true number of routes
634+ // folded into the bucket) — not just len(MemberRoutes), which the
635+ // sampler caps at MaxMemberRoutesPerSlot. When the visible list is
636+ // shorter than the total, route_ids_truncated lets consumers know
637+ // to trust route_count for drill-down decisions.
638+ func newKeyVizRowFrom (mr keyviz.MatrixRow , numCols int ) * pb.KeyVizRow {
639+ total := mr .MemberRoutesTotal
640+ if ! mr .Aggregate && total == 0 {
641+ // Individual slots fall through to RouteCount=1 when the
642+ // sampler predates MemberRoutesTotal or never set it.
643+ total = 1
644+ }
645+ row := & pb.KeyVizRow {
646+ BucketId : bucketIDFor (mr ),
647+ Start : append ([]byte (nil ), mr .Start ... ),
648+ End : append ([]byte (nil ), mr .End ... ),
649+ Aggregate : mr .Aggregate ,
650+ RouteCount : total ,
651+ RouteIdsTruncated : mr .Aggregate && total > uint64 (len (mr .MemberRoutes )),
652+ Values : make ([]uint64 , numCols ),
653+ }
654+ if mr .Aggregate {
655+ row .RouteIds = append ([]uint64 (nil ), mr .MemberRoutes ... )
656+ }
657+ return row
658+ }
659+
660+ func bucketIDFor (mr keyviz.MatrixRow ) string {
661+ if mr .Aggregate {
662+ return "virtual:" + strconv .FormatUint (mr .RouteID , 10 )
663+ }
664+ return "route:" + strconv .FormatUint (mr .RouteID , 10 )
665+ }
666+
667+ func sortKeyVizRowsByStart (rows []* pb.KeyVizRow ) {
668+ sort .Slice (rows , func (i , j int ) bool {
669+ if c := bytes .Compare (rows [i ].Start , rows [j ].Start ); c != 0 {
670+ return c < 0
671+ }
672+ return rows [i ].BucketId < rows [j ].BucketId
673+ })
674+ }
0 commit comments