diff --git a/internal/admin/keyviz_handler.go b/internal/admin/keyviz_handler.go new file mode 100644 index 000000000..222c8fa25 --- /dev/null +++ b/internal/admin/keyviz_handler.go @@ -0,0 +1,367 @@ +package admin + +import ( + "bytes" + "errors" + "log/slog" + "net/http" + "sort" + "strconv" + "time" + + "github.com/bootjp/elastickv/keyviz" + "github.com/goccy/go-json" +) + +// KeyVizSource is the small contract the keyviz handler depends on. +// Production wires this to a real *keyviz.MemSampler; tests use a +// stub that returns canned columns. +// +// Snapshot returns the matrix columns within [from, to). Either bound +// may be the zero Time meaning unbounded on that side. Implementations +// MUST return rows the caller can mutate freely (a deep copy) — see +// keyviz.MemSampler.Snapshot. +type KeyVizSource interface { + Snapshot(from, to time.Time) []keyviz.MatrixColumn +} + +// KeyVizSeries selects which counter on a MatrixRow the response +// surfaces in `Values`. Wire form mirrors the proto enum but uses +// lowercase strings so the SPA can pass `?series=writes` directly +// without an extra encoding round-trip. +type KeyVizSeries string + +const ( + keyVizSeriesReads KeyVizSeries = "reads" + keyVizSeriesWrites KeyVizSeries = "writes" + keyVizSeriesReadBytes KeyVizSeries = "read_bytes" + keyVizSeriesWriteBytes KeyVizSeries = "write_bytes" +) + +// keyVizDefaultSeries matches the design doc §4.1 default. Writes is +// the primary signal the heatmap is built around; reads will land in +// a follow-up phase (read sampling not yet wired). +const keyVizDefaultSeries = keyVizSeriesWrites + +// keyVizRowBudgetCap is the upper bound on the per-request row +// budget. Mirrors the same cap on the gRPC GetKeyVizMatrix RPC so +// the SPA cannot force unbounded payloads via the JSON path. Design +// doc §4.1. +const keyVizRowBudgetCap = 1024 + +// KeyVizMatrix is the row-major JSON wire form returned by +// /admin/api/v1/keyviz/matrix. Mirrors the proto GetKeyVizMatrixResponse +// shape so a future refactor can share a single pivot helper across +// the adapter (gRPC) and admin (JSON) paths. +type KeyVizMatrix struct { + ColumnUnixMs []int64 `json:"column_unix_ms"` + Rows []KeyVizRow `json:"rows"` + Series KeyVizSeries `json:"series"` + GeneratedAt time.Time `json:"generated_at"` +} + +// KeyVizRow is one route's worth of activity across the column window, +// matching the proto KeyVizRow layout. Values is parallel to +// KeyVizMatrix.ColumnUnixMs — Values[j] is the counter for that route +// at column j. +type KeyVizRow struct { + BucketID string `json:"bucket_id"` + Start []byte `json:"start"` + End []byte `json:"end"` + Aggregate bool `json:"aggregate"` + RouteIDs []uint64 `json:"route_ids,omitempty"` + RouteIDsTruncated bool `json:"route_ids_truncated,omitempty"` + RouteCount uint64 `json:"route_count"` + Values []uint64 `json:"values"` + // total accumulates the sum of Values during pivot so the + // rowBudget sort is O(N log N) on a precomputed key rather + // than O(N log N × M) recomputing the sum per comparison. + // Not on the wire — clients read activity off Values directly. + total uint64 +} + +// KeyVizHandler serves GET /admin/api/v1/keyviz/matrix. +// +// Query parameters (all optional): +// +// series - reads | writes | read_bytes | write_bytes (default: writes) +// from_unix_ms - lower bound in unix ms; 0 or omitted means unbounded +// on that side (NOT the Unix epoch) +// to_unix_ms - upper bound in unix ms; same 0 = unbounded contract +// rows - row budget; 0 means no cap, capped at 1024 (default: 0) +// +// Returns 503 codes.Unavailable when no sampler is configured so the +// SPA can distinguish "keyviz disabled" from "no data yet" (the +// latter is a successful empty matrix). +type KeyVizHandler struct { + source KeyVizSource + now func() time.Time + logger *slog.Logger +} + +// NewKeyVizHandler wires a KeyVizSource into the HTTP handler. +// source may be nil; calls to ServeHTTP will then return 503 with +// code "keyviz_disabled". +func NewKeyVizHandler(source KeyVizSource) *KeyVizHandler { + return &KeyVizHandler{ + source: source, + now: func() time.Time { return time.Now().UTC() }, + logger: slog.Default(), + } +} + +// WithLogger overrides the slog destination so main.go can attach a +// component tag without changing the constructor signature. +func (h *KeyVizHandler) WithLogger(l *slog.Logger) *KeyVizHandler { + if l == nil { + return h + } + h.logger = l + return h +} + +// WithClock lets tests inject a deterministic GeneratedAt. +func (h *KeyVizHandler) WithClock(now func() time.Time) *KeyVizHandler { + if now == nil { + return h + } + h.now = now + return h +} + +func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") + return + } + if h.source == nil { + writeJSONError(w, http.StatusServiceUnavailable, "keyviz_disabled", + "key visualizer sampler is not configured on this node") + return + } + params, err := parseKeyVizParams(r) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_query", err.Error()) + return + } + cols := h.source.Snapshot(params.from, params.to) + matrix := pivotKeyVizColumns(cols, params.series, params.rows) + matrix.GeneratedAt = h.now() + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(matrix); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin keyviz response encode failed", + slog.String("error", err.Error()), + ) + } +} + +// keyVizParams is the parsed query-string form of a matrix request. +type keyVizParams struct { + series KeyVizSeries + from time.Time + to time.Time + rows int +} + +func parseKeyVizParams(r *http.Request) (keyVizParams, error) { + p := keyVizParams{series: keyVizDefaultSeries} + q := r.URL.Query() + if err := setKeyVizSeriesParam(&p, q.Get("series")); err != nil { + return keyVizParams{}, err + } + if err := setKeyVizTimeParam(&p.from, "from_unix_ms", q.Get("from_unix_ms")); err != nil { + return keyVizParams{}, err + } + if err := setKeyVizTimeParam(&p.to, "to_unix_ms", q.Get("to_unix_ms")); err != nil { + return keyVizParams{}, err + } + if err := setKeyVizRowsParam(&p.rows, q.Get("rows")); err != nil { + return keyVizParams{}, err + } + return p, nil +} + +func setKeyVizSeriesParam(p *keyVizParams, raw string) error { + if raw == "" { + return nil + } + series, err := parseKeyVizSeries(raw) + if err != nil { + return err + } + p.series = series + return nil +} + +func setKeyVizTimeParam(dst *time.Time, name, raw string) error { + if raw == "" { + return nil + } + t, err := parseUnixMs(name, raw) + if err != nil { + return err + } + *dst = t + return nil +} + +func setKeyVizRowsParam(dst *int, raw string) error { + if raw == "" { + return nil + } + n, err := strconv.Atoi(raw) + if err != nil || n < 0 { + return errors.New("rows must be a non-negative integer") + } + if n > keyVizRowBudgetCap { + n = keyVizRowBudgetCap + } + *dst = n + return nil +} + +func parseKeyVizSeries(s string) (KeyVizSeries, error) { + switch KeyVizSeries(s) { + case keyVizSeriesReads, keyVizSeriesWrites, keyVizSeriesReadBytes, keyVizSeriesWriteBytes: + return KeyVizSeries(s), nil + default: + return "", errors.New("series must be one of: reads, writes, read_bytes, write_bytes") + } +} + +func parseUnixMs(name, raw string) (time.Time, error) { + n, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return time.Time{}, errors.New(name + " must be an integer (unix milliseconds)") + } + if n == 0 { + return time.Time{}, nil + } + return time.UnixMilli(n).UTC(), nil +} + +// pivotKeyVizColumns flips the column-major MatrixColumn slice into +// the row-major JSON shape, picks the requested series counter from +// each MatrixRow, and applies the rowBudget cap (top-N by total +// activity) before sorting back into Start order. +// +// Mirrors adapter.matrixToProto exactly — the duplication is +// intentional for now to keep the gRPC and JSON paths independent; +// extracting a shared helper into the keyviz package is a future +// cleanup. +func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudget int) KeyVizMatrix { + pick := keyVizSeriesPicker(series) + matrix := KeyVizMatrix{ + Series: series, + ColumnUnixMs: make([]int64, len(cols)), + } + rowsByID := make(map[uint64]*KeyVizRow) + order := make([]uint64, 0) + for j, col := range cols { + matrix.ColumnUnixMs[j] = col.At.UnixMilli() + for _, mr := range col.Rows { + row, ok := rowsByID[mr.RouteID] + if !ok { + row = newKeyVizRowFrom(mr, len(cols)) + rowsByID[mr.RouteID] = row + order = append(order, mr.RouteID) + } + v := pick(mr) + row.Values[j] = v + row.total += v + } + } + matrix.Rows = make([]KeyVizRow, len(order)) + for i, id := range order { + matrix.Rows[i] = *rowsByID[id] + } + matrix.Rows = applyKeyVizRowBudget(matrix.Rows, rowBudget) + sortKeyVizRowsByStart(matrix.Rows) + return matrix +} + +func keyVizSeriesPicker(series KeyVizSeries) func(keyviz.MatrixRow) uint64 { + switch series { + case keyVizSeriesReads: + return func(r keyviz.MatrixRow) uint64 { return r.Reads } + case keyVizSeriesReadBytes: + return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes } + case keyVizSeriesWriteBytes: + return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes } + case keyVizSeriesWrites: + return func(r keyviz.MatrixRow) uint64 { return r.Writes } + default: + return func(r keyviz.MatrixRow) uint64 { return r.Writes } + } +} + +func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow { + total := mr.MemberRoutesTotal + switch { + case !mr.Aggregate && total == 0: + // Individual slots with the field zero-initialised — every + // real route contributes exactly one member to itself. + total = 1 + case mr.Aggregate && total == 0: + // Defensive fallback: a virtual bucket should always carry a + // non-zero MemberRoutesTotal once foldIntoBucket has run, but + // if a sampler ever serialises a just-coalesced bucket before + // the count is set the SPA would render "0 routes" — which is + // nonsense for an aggregate row. Fall back to the visible + // MemberRoutes length so route_count stays meaningful. + total = uint64(len(mr.MemberRoutes)) + } + row := &KeyVizRow{ + BucketID: bucketIDFor(mr), + Start: append([]byte(nil), mr.Start...), + End: append([]byte(nil), mr.End...), + Aggregate: mr.Aggregate, + RouteCount: total, + RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)), + Values: make([]uint64, numCols), + } + if mr.Aggregate { + row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...) + } + return row +} + +func bucketIDFor(mr keyviz.MatrixRow) string { + if mr.Aggregate { + return "virtual:" + strconv.FormatUint(mr.RouteID, 10) + } + return "route:" + strconv.FormatUint(mr.RouteID, 10) +} + +// applyKeyVizRowBudget mirrors the adapter Phase-1 simplification: +// activity-descending truncation rather than design §5.5's lexicographic +// merge. Future work should swap in the spec'd merge once the +// virtual-bucket plumbing supports synthesis at the response layer. +// +// Sort uses the precomputed row.total (accumulated during pivot) so +// the comparator is O(1), not O(M). BucketID breaks activity ties +// deterministically — the SPA refresh on the same data must yield the +// same row set. +func applyKeyVizRowBudget(rows []KeyVizRow, budget int) []KeyVizRow { + if budget <= 0 || len(rows) <= budget { + return rows + } + sort.Slice(rows, func(i, j int) bool { + if rows[i].total != rows[j].total { + return rows[i].total > rows[j].total + } + return rows[i].BucketID < rows[j].BucketID + }) + return rows[:budget] +} + +func sortKeyVizRowsByStart(rows []KeyVizRow) { + sort.Slice(rows, func(i, j int) bool { + if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 { + return c < 0 + } + return rows[i].BucketID < rows[j].BucketID + }) +} diff --git a/internal/admin/keyviz_handler_test.go b/internal/admin/keyviz_handler_test.go new file mode 100644 index 000000000..319461543 --- /dev/null +++ b/internal/admin/keyviz_handler_test.go @@ -0,0 +1,389 @@ +package admin + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/bootjp/elastickv/keyviz" + "github.com/stretchr/testify/require" +) + +// keyVizGet performs a GET against the test server with a request +// context (so the noctx linter is satisfied) and returns the parsed +// response body for the caller to inspect. +func keyVizGet(t *testing.T, url string) *http.Response { + t.Helper() + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + return resp +} + +// fakeKeyVizSource is a deterministic in-memory KeyVizSource so the +// handler tests don't need to drive a real *keyviz.MemSampler with +// goroutines and time. +type fakeKeyVizSource struct { + cols []keyviz.MatrixColumn +} + +func (f *fakeKeyVizSource) Snapshot(_, _ time.Time) []keyviz.MatrixColumn { + out := make([]keyviz.MatrixColumn, len(f.cols)) + for i, c := range f.cols { + rows := make([]keyviz.MatrixRow, len(c.Rows)) + for j, r := range c.Rows { + rows[j] = r + rows[j].Start = append([]byte(nil), r.Start...) + rows[j].End = append([]byte(nil), r.End...) + if len(r.MemberRoutes) > 0 { + rows[j].MemberRoutes = append([]uint64(nil), r.MemberRoutes...) + } + } + out[i] = keyviz.MatrixColumn{At: c.At, Rows: rows} + } + return out +} + +func newKeyVizTestServer(t *testing.T, source KeyVizSource) *httptest.Server { + t.Helper() + h := NewKeyVizHandler(source).WithClock(func() time.Time { + return time.Unix(1_700_000_000, 0).UTC() + }) + return httptest.NewServer(h) +} + +// TestKeyVizHandlerReturnsServiceUnavailableWhenNoSource pins the +// "keyviz disabled" signal so the SPA can render a clear feature-off +// state instead of an empty matrix indistinguishable from "no +// activity yet." +func TestKeyVizHandlerReturnsServiceUnavailableWhenNoSource(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, nil) + defer srv.Close() + + resp := keyVizGet(t, srv.URL) + defer resp.Body.Close() + require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + var body map[string]string + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + require.Equal(t, "keyviz_disabled", body["error"]) +} + +// TestKeyVizHandlerRejectsNonGet pins the method allow-list so a +// stray POST from a misbehaving client doesn't surface as 200 with +// an empty matrix. +func TestKeyVizHandlerRejectsNonGet(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{}) + defer srv.Close() + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, srv.URL, nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) +} + +// TestKeyVizHandlerPivotsMatrix pins the JSON wire shape: row-major +// layout (one KeyVizRow per RouteID, values aligned to the parallel +// column_unix_ms slice), the requested series counter (default writes), +// and Start-order sort. +func TestKeyVizHandlerPivotsMatrix(t *testing.T) { + t.Parallel() + t0 := time.Unix(1_700_000_000, 0) + t1 := t0.Add(time.Minute) + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + { + At: t0, + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("m"), Writes: 4, Reads: 1}, + {RouteID: 2, Start: []byte("m"), End: []byte("z"), Writes: 7, Reads: 0}, + }, + }, + { + At: t1, + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("m"), Writes: 9, Reads: 3}, + }, + }, + }}) + defer srv.Close() + + resp := keyVizGet(t, srv.URL) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + require.Equal(t, KeyVizSeries("writes"), matrix.Series, "default series must be writes") + require.Equal(t, []int64{t0.UnixMilli(), t1.UnixMilli()}, matrix.ColumnUnixMs) + require.Len(t, matrix.Rows, 2) + + r1, r2 := matrix.Rows[0], matrix.Rows[1] + require.Equal(t, "route:1", r1.BucketID) + require.Equal(t, "route:2", r2.BucketID) + require.Equal(t, []uint64{4, 9}, r1.Values) + // Route 2 absent in column 1 → zero by default. + require.Equal(t, []uint64{7, 0}, r2.Values) +} + +// TestKeyVizHandlerSeriesParam pins the ?series=... query parameter +// dispatching across all four enum values. +func TestKeyVizHandlerSeriesParam(t *testing.T) { + t.Parallel() + row := keyviz.MatrixRow{ + RouteID: 1, Start: []byte("a"), End: []byte("z"), + Reads: 11, Writes: 22, ReadBytes: 333, WriteBytes: 4444, + } + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + {At: time.Unix(1_700_000_000, 0), Rows: []keyviz.MatrixRow{row}}, + }}) + defer srv.Close() + + for _, tc := range []struct { + series string + want uint64 + }{ + {"reads", 11}, + {"writes", 22}, + {"read_bytes", 333}, + {"write_bytes", 4444}, + } { + t.Run(tc.series, func(t *testing.T) { + // Sequential: the parent's `defer srv.Close()` would fire + // before parallel subtests get to dial. + resp := keyVizGet(t, srv.URL+"?series="+tc.series) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + require.Equal(t, KeyVizSeries(tc.series), matrix.Series) + require.Equal(t, []uint64{tc.want}, matrix.Rows[0].Values) + }) + } +} + +// TestKeyVizHandlerSeriesParamRejectsUnknown pins input validation: +// an unknown series surfaces as 400 invalid_query so the SPA gets a +// crisp error rather than silently degrading to the default. +func TestKeyVizHandlerSeriesParamRejectsUnknown(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + {At: time.Unix(1_700_000_000, 0), Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("z"), Writes: 1}, + }}, + }}) + defer srv.Close() + + resp := keyVizGet(t, srv.URL+"?series=bogus") + defer resp.Body.Close() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + + var body map[string]string + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + require.Equal(t, "invalid_query", body["error"]) +} + +// TestKeyVizHandlerHonorsRowsBudget pins the rows cap: the request +// truncates to top-N rows by activity (then sorts by Start), matching +// the gRPC handler's Phase-1 simplification. +func TestKeyVizHandlerHonorsRowsBudget(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 1}, + {RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 100}, + {RouteID: 3, Start: []byte("c"), End: []byte("d"), Writes: 5}, + {RouteID: 4, Start: []byte("d"), End: []byte("e"), Writes: 50}, + }, + }, + }}) + defer srv.Close() + + resp := keyVizGet(t, srv.URL+"?rows=2") + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + require.Len(t, matrix.Rows, 2) + // Top-2 by Writes activity = routes 2 (100) and 4 (50). + // Then sorted by Start: "b" before "d". + require.Equal(t, "route:2", matrix.Rows[0].BucketID) + require.Equal(t, "route:4", matrix.Rows[1].BucketID) +} + +// TestKeyVizHandlerEncodesAggregateBucket pins the aggregate-row +// proto-equivalent layout: bucket_id prefixed "virtual:", aggregate +// flag, route_count from MemberRoutesTotal (not len(MemberRoutes)), +// and route_ids_truncated when the cap was exceeded. +func TestKeyVizHandlerEncodesAggregateBucket(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + { + RouteID: ^uint64(0), + Start: []byte("c"), + End: []byte("d"), + Aggregate: true, + MemberRoutes: []uint64{2, 3}, + MemberRoutesTotal: 9, + Writes: 100, + }, + }, + }, + }}) + defer srv.Close() + + resp := keyVizGet(t, srv.URL) + defer resp.Body.Close() + + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + require.Len(t, matrix.Rows, 1) + r := matrix.Rows[0] + require.True(t, r.Aggregate) + require.Equal(t, "virtual:18446744073709551615", r.BucketID) + require.Equal(t, uint64(9), r.RouteCount) + require.True(t, r.RouteIDsTruncated) + require.Equal(t, []uint64{2, 3}, r.RouteIDs) +} + +// TestKeyVizHandlerRowsBudgetTieBreakDeterministic pins Gemini round-1 +// nit: when two rows tie on activity total, the rows-budget truncation +// must pick the same set every refresh. Tie-break is BucketID +// ascending so a re-poll on identical data yields identical rows. +func TestKeyVizHandlerRowsBudgetTieBreakDeterministic(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + {RouteID: 3, Start: []byte("c"), End: []byte("d"), Writes: 10}, + {RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 10}, + {RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 10}, + }, + }, + }}) + defer srv.Close() + + for i := 0; i < 3; i++ { + resp := keyVizGet(t, srv.URL+"?rows=2") + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + resp.Body.Close() + require.Len(t, matrix.Rows, 2, "iteration %d", i) + // BucketID tie-break: route:1, route:2 win over route:3. + // After the budget cap they sort by Start, giving "a" then "b". + require.Equal(t, "route:1", matrix.Rows[0].BucketID, "iteration %d", i) + require.Equal(t, "route:2", matrix.Rows[1].BucketID, "iteration %d", i) + } +} + +// TestKeyVizHandlerTimeBoundsParam exercises the from_unix_ms / +// to_unix_ms query parameters: a non-zero pair filters columns to the +// requested half-open window, while 0 means "unbounded on that side" +// (NOT the Unix epoch). The fakeKeyVizSource here does not actually +// honour the bounds (its Snapshot ignores them) — what we're pinning +// is the parse/dispatch contract: a parsable pair must yield 200, +// 0 must reach the source as the zero Time, and a non-numeric value +// must surface as 400 invalid_query. +func TestKeyVizHandlerTimeBoundsParam(t *testing.T) { + t.Parallel() + captured := &capturingKeyVizSource{} + h := NewKeyVizHandler(captured).WithClock(func() time.Time { + return time.Unix(1_700_000_000, 0).UTC() + }) + srv := httptest.NewServer(h) + defer srv.Close() + + from := time.Unix(1_699_000_000, 0) + to := time.Unix(1_700_500_000, 0) + u := srv.URL + "?from_unix_ms=" + strconv.FormatInt(from.UnixMilli(), 10) + + "&to_unix_ms=" + strconv.FormatInt(to.UnixMilli(), 10) + resp := keyVizGet(t, u) + resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + require.True(t, captured.from.Equal(from.UTC()), "from = %v, want %v", captured.from, from.UTC()) + require.True(t, captured.to.Equal(to.UTC()), "to = %v, want %v", captured.to, to.UTC()) + + // 0 → unbounded (zero Time), not Unix epoch. + captured.reset() + resp = keyVizGet(t, srv.URL+"?from_unix_ms=0&to_unix_ms=0") + resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + require.True(t, captured.from.IsZero(), "from = %v, want zero", captured.from) + require.True(t, captured.to.IsZero(), "to = %v, want zero", captured.to) + + // Non-numeric → 400 invalid_query. + resp = keyVizGet(t, srv.URL+"?from_unix_ms=notanumber") + defer resp.Body.Close() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + var body map[string]string + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + require.Equal(t, "invalid_query", body["error"]) +} + +// capturingKeyVizSource records the from/to bounds the handler +// forwarded so tests can assert on the parse step without rendering +// any matrix data. +type capturingKeyVizSource struct { + from time.Time + to time.Time +} + +func (c *capturingKeyVizSource) Snapshot(from, to time.Time) []keyviz.MatrixColumn { + c.from = from + c.to = to + return nil +} + +func (c *capturingKeyVizSource) reset() { + c.from = time.Time{} + c.to = time.Time{} +} + +// TestKeyVizHandlerAggregateFallbackWhenTotalZero pins the defensive +// fallback for the unlikely case where the sampler emits an aggregate +// row with MemberRoutesTotal == 0: the handler must not serialise +// route_count = 0 (which the SPA would render as "0 routes" — nonsense +// for a virtual bucket). Instead it falls back to len(MemberRoutes). +func TestKeyVizHandlerAggregateFallbackWhenTotalZero(t *testing.T) { + t.Parallel() + srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + { + RouteID: ^uint64(0), + Start: []byte("c"), + End: []byte("d"), + Aggregate: true, + MemberRoutes: []uint64{2, 3}, + MemberRoutesTotal: 0, // pathological zero + Writes: 5, + }, + }, + }, + }}) + defer srv.Close() + + resp := keyVizGet(t, srv.URL) + defer resp.Body.Close() + var matrix KeyVizMatrix + require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix)) + require.Len(t, matrix.Rows, 1) + require.True(t, matrix.Rows[0].Aggregate) + require.Equal(t, uint64(2), matrix.Rows[0].RouteCount, "fallback to len(MemberRoutes)") +} diff --git a/internal/admin/server.go b/internal/admin/server.go index 479c9424e..652a9fd84 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -54,6 +54,13 @@ type ServerDeps struct { // builds. Buckets BucketsSource + // KeyViz exposes the keyviz heatmap matrix to the dashboard via + // /admin/api/v1/keyviz/matrix. Optional: a nil value (or a node + // started without --keyvizEnabled) makes the route return 503 + // codes "keyviz_disabled" so the SPA can render a clear "feature + // off" state instead of an empty matrix. + KeyViz KeyVizSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -101,7 +108,11 @@ func NewServer(deps ServerDeps) (*Server, error) { cluster := NewClusterHandler(deps.ClusterInfo).WithLogger(logger) dynamo := buildDynamoHandlerForDeps(deps, logger) s3 := buildS3HandlerForDeps(deps, logger) - mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, logger) + // KeyViz handler is always registered: even when the source is + // nil it serves a 503 keyviz_disabled, which the SPA renders as + // a clearer "feature off" state than an unknown_endpoint 404. + keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger) + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } @@ -197,6 +208,7 @@ func (s *Server) APIHandler() http.Handler { // DELETE /admin/api/v1/dynamo/tables/{name} (auth required, full role) // GET /admin/api/v1/s3/buckets (auth required) // GET /admin/api/v1/s3/buckets/{name} (auth required) +// GET /admin/api/v1/keyviz/matrix (auth required) // // Body limit applies uniformly. CSRF and Audit middleware apply to // write-capable protected endpoints; login and logout carry their own @@ -206,7 +218,11 @@ func (s *Server) APIHandler() http.Handler { // dynamoHandler / s3Handler may be nil; in that case the corresponding // paths fall through to the unknown-endpoint 404, matching the // behaviour of any other unregistered admin path. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler http.Handler, logger *slog.Logger) http.Handler { +// +// keyvizHandler is always non-nil even when the sampler is disabled — +// it serves 503 keyviz_disabled itself so the SPA gets a clearer +// signal than an unknown_endpoint 404 from the catch-all. +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -256,6 +272,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa loginChain := publicAuth(loginHandler) logoutChain := protectNoAudit(logoutHandler) clusterChain := protect(clusterHandler) + keyvizChain := protect(keyvizHandler) // Dynamo endpoints (reads and writes) share the protect chain // so a missing session or CSRF token 401s/403s the same way // regardless of method. The Audit middleware is a no-op for @@ -280,6 +297,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa cluster: clusterChain, dynamo: dynamoChain, s3: s3Chain, + keyviz: keyvizChain, } return http.HandlerFunc(routes.dispatch) } @@ -292,6 +310,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa type apiRouteTable struct { login, logout, cluster http.Handler dynamo, s3 http.Handler + keyviz http.Handler } // dispatch is the receiver method httpHandlerFunc adapts. Logic is @@ -304,6 +323,8 @@ func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) { t.logout.ServeHTTP(w, r) case r.URL.Path == "/admin/api/v1/cluster": t.cluster.ServeHTTP(w, r) + case r.URL.Path == "/admin/api/v1/keyviz/matrix": + t.keyviz.ServeHTTP(w, r) case t.dynamo != nil && isDynamoPath(r.URL.Path): t.dynamo.ServeHTTP(w, r) case t.s3 != nil && isS3Path(r.URL.Path): diff --git a/main.go b/main.go index 5dc321975..05176eff0 100644 --- a/main.go +++ b/main.go @@ -764,7 +764,7 @@ func startServers(in serversInput) error { // the handler hands ErrTablesNotLeader writes to the forwarder // which dials the leader over the cached gRPC pool. Without these // the handler falls back to 503 + Retry-After:1. - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache); err != nil { + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache, in.keyvizSampler); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil diff --git a/main_admin.go b/main_admin.go index fa0fd95f0..75d37c374 100644 --- a/main_admin.go +++ b/main_admin.go @@ -13,6 +13,7 @@ import ( "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/internal/admin" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/keyviz" "github.com/bootjp/elastickv/kv" "github.com/cockroachdb/errors" "golang.org/x/sync/errgroup" @@ -77,6 +78,7 @@ func startAdminFromFlags( s3Server *adapter.S3Server, coordinate kv.Coordinator, connCache *kv.GRPCConnCache, + keyvizSampler *keyviz.MemSampler, ) error { if !*adminEnabled { return nil @@ -123,7 +125,7 @@ func startAdminFromFlags( if err != nil { return errors.Wrap(err, "build admin leader forwarder") } - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, buildVersion()) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, keyvizSampler, buildVersion()) return err } @@ -449,6 +451,7 @@ func startAdminServer( tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder, + keyvizSampler *keyviz.MemSampler, version string, ) (string, error) { adminCfg := buildAdminConfig(cfg) @@ -456,7 +459,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder, keyvizSampler) if err != nil { return "", err } @@ -496,7 +499,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -522,6 +525,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust Tables: tables, Buckets: buckets, Forwarder: forwarder, + KeyViz: keyvizSourceFromSampler(keyvizSampler), StaticFS: staticFS, AuthOpts: admin.AuthServiceOpts{ InsecureCookie: adminCfg.AllowInsecureDevCookie, @@ -641,6 +645,20 @@ func resolveSigningKey(flagValue, filePath, envVar string) (string, error) { return strings.TrimSpace(flagValue), nil } +// keyvizSourceFromSampler boxes a *keyviz.MemSampler into the +// admin.KeyVizSource interface understood by ServerDeps. Returning a +// nil interface (not a typed-nil) when the sampler is disabled is +// load-bearing: the admin handler's "keyviz disabled → 503" branch +// only fires on an interface-nil; a typed-nil *MemSampler stored as +// a non-nil interface would silently return an empty matrix instead +// of the explicit "feature off" signal the SPA expects. +func keyvizSourceFromSampler(s *keyviz.MemSampler) admin.KeyVizSource { + if s == nil { + return nil + } + return s +} + // parseCSV splits a flag value like "a,b,c" into a slice with empty and // whitespace-only entries dropped. It is not in shard_config.go because // admin's comma-separated list format is simpler than raft groups. diff --git a/main_admin_test.go b/main_admin_test.go index d167804a1..2f83fdf0f 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, "") require.NoError(t, err) } @@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") require.Error(t, err) } @@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. @@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{