-
Notifications
You must be signed in to change notification settings - Fork 2
feat(admin): expose keyviz heatmap via /admin/api/v1/keyviz/matrix #660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,367 @@ | ||
| package admin | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "errors" | ||
| "log/slog" | ||
| "net/http" | ||
| "sort" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/bootjp/elastickv/keyviz" | ||
| "github.com/goccy/go-json" | ||
| ) | ||
|
|
||
| // KeyVizSource is the small contract the keyviz handler depends on. | ||
| // Production wires this to a real *keyviz.MemSampler; tests use a | ||
| // stub that returns canned columns. | ||
| // | ||
| // Snapshot returns the matrix columns within [from, to). Either bound | ||
| // may be the zero Time meaning unbounded on that side. Implementations | ||
| // MUST return rows the caller can mutate freely (a deep copy) — see | ||
| // keyviz.MemSampler.Snapshot. | ||
| type KeyVizSource interface { | ||
| Snapshot(from, to time.Time) []keyviz.MatrixColumn | ||
| } | ||
|
|
||
| // KeyVizSeries selects which counter on a MatrixRow the response | ||
| // surfaces in `Values`. Wire form mirrors the proto enum but uses | ||
| // lowercase strings so the SPA can pass `?series=writes` directly | ||
| // without an extra encoding round-trip. | ||
| type KeyVizSeries string | ||
|
|
||
| const ( | ||
| keyVizSeriesReads KeyVizSeries = "reads" | ||
| keyVizSeriesWrites KeyVizSeries = "writes" | ||
| keyVizSeriesReadBytes KeyVizSeries = "read_bytes" | ||
| keyVizSeriesWriteBytes KeyVizSeries = "write_bytes" | ||
| ) | ||
|
|
||
| // keyVizDefaultSeries matches the design doc §4.1 default. Writes is | ||
| // the primary signal the heatmap is built around; reads will land in | ||
| // a follow-up phase (read sampling not yet wired). | ||
| const keyVizDefaultSeries = keyVizSeriesWrites | ||
|
|
||
| // keyVizRowBudgetCap is the upper bound on the per-request row | ||
| // budget. Mirrors the same cap on the gRPC GetKeyVizMatrix RPC so | ||
| // the SPA cannot force unbounded payloads via the JSON path. Design | ||
| // doc §4.1. | ||
| const keyVizRowBudgetCap = 1024 | ||
|
|
||
| // KeyVizMatrix is the row-major JSON wire form returned by | ||
| // /admin/api/v1/keyviz/matrix. Mirrors the proto GetKeyVizMatrixResponse | ||
| // shape so a future refactor can share a single pivot helper across | ||
| // the adapter (gRPC) and admin (JSON) paths. | ||
| type KeyVizMatrix struct { | ||
| ColumnUnixMs []int64 `json:"column_unix_ms"` | ||
| Rows []KeyVizRow `json:"rows"` | ||
| Series KeyVizSeries `json:"series"` | ||
| GeneratedAt time.Time `json:"generated_at"` | ||
| } | ||
|
|
||
| // KeyVizRow is one route's worth of activity across the column window, | ||
| // matching the proto KeyVizRow layout. Values is parallel to | ||
| // KeyVizMatrix.ColumnUnixMs — Values[j] is the counter for that route | ||
| // at column j. | ||
| type KeyVizRow struct { | ||
| BucketID string `json:"bucket_id"` | ||
| Start []byte `json:"start"` | ||
| End []byte `json:"end"` | ||
| Aggregate bool `json:"aggregate"` | ||
| RouteIDs []uint64 `json:"route_ids,omitempty"` | ||
| RouteIDsTruncated bool `json:"route_ids_truncated,omitempty"` | ||
| RouteCount uint64 `json:"route_count"` | ||
| Values []uint64 `json:"values"` | ||
| // total accumulates the sum of Values during pivot so the | ||
| // rowBudget sort is O(N log N) on a precomputed key rather | ||
| // than O(N log N × M) recomputing the sum per comparison. | ||
| // Not on the wire — clients read activity off Values directly. | ||
| total uint64 | ||
| } | ||
|
|
||
| // KeyVizHandler serves GET /admin/api/v1/keyviz/matrix. | ||
| // | ||
| // Query parameters (all optional): | ||
| // | ||
| // series - reads | writes | read_bytes | write_bytes (default: writes) | ||
| // from_unix_ms - lower bound in unix ms; 0 or omitted means unbounded | ||
| // on that side (NOT the Unix epoch) | ||
| // to_unix_ms - upper bound in unix ms; same 0 = unbounded contract | ||
| // rows - row budget; 0 means no cap, capped at 1024 (default: 0) | ||
| // | ||
| // Returns 503 codes.Unavailable when no sampler is configured so the | ||
| // SPA can distinguish "keyviz disabled" from "no data yet" (the | ||
| // latter is a successful empty matrix). | ||
| type KeyVizHandler struct { | ||
| source KeyVizSource | ||
| now func() time.Time | ||
| logger *slog.Logger | ||
| } | ||
|
|
||
| // NewKeyVizHandler wires a KeyVizSource into the HTTP handler. | ||
| // source may be nil; calls to ServeHTTP will then return 503 with | ||
| // code "keyviz_disabled". | ||
| func NewKeyVizHandler(source KeyVizSource) *KeyVizHandler { | ||
| return &KeyVizHandler{ | ||
| source: source, | ||
| now: func() time.Time { return time.Now().UTC() }, | ||
| logger: slog.Default(), | ||
| } | ||
| } | ||
|
|
||
| // WithLogger overrides the slog destination so main.go can attach a | ||
| // component tag without changing the constructor signature. | ||
| func (h *KeyVizHandler) WithLogger(l *slog.Logger) *KeyVizHandler { | ||
| if l == nil { | ||
| return h | ||
| } | ||
| h.logger = l | ||
| return h | ||
| } | ||
|
|
||
| // WithClock lets tests inject a deterministic GeneratedAt. | ||
| func (h *KeyVizHandler) WithClock(now func() time.Time) *KeyVizHandler { | ||
| if now == nil { | ||
| return h | ||
| } | ||
| h.now = now | ||
| return h | ||
| } | ||
|
|
||
| func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
| if r.Method != http.MethodGet { | ||
| writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") | ||
| return | ||
| } | ||
| if h.source == nil { | ||
| writeJSONError(w, http.StatusServiceUnavailable, "keyviz_disabled", | ||
| "key visualizer sampler is not configured on this node") | ||
| return | ||
| } | ||
| params, err := parseKeyVizParams(r) | ||
| if err != nil { | ||
| writeJSONError(w, http.StatusBadRequest, "invalid_query", err.Error()) | ||
| return | ||
| } | ||
| cols := h.source.Snapshot(params.from, params.to) | ||
| matrix := pivotKeyVizColumns(cols, params.series, params.rows) | ||
| matrix.GeneratedAt = h.now() | ||
| w.Header().Set("Content-Type", "application/json; charset=utf-8") | ||
| w.Header().Set("Cache-Control", "no-store") | ||
| w.WriteHeader(http.StatusOK) | ||
| if err := json.NewEncoder(w).Encode(matrix); err != nil { | ||
| h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin keyviz response encode failed", | ||
| slog.String("error", err.Error()), | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // keyVizParams is the parsed query-string form of a matrix request. | ||
| type keyVizParams struct { | ||
| series KeyVizSeries | ||
| from time.Time | ||
| to time.Time | ||
| rows int | ||
| } | ||
|
|
||
| func parseKeyVizParams(r *http.Request) (keyVizParams, error) { | ||
| p := keyVizParams{series: keyVizDefaultSeries} | ||
| q := r.URL.Query() | ||
| if err := setKeyVizSeriesParam(&p, q.Get("series")); err != nil { | ||
| return keyVizParams{}, err | ||
| } | ||
| if err := setKeyVizTimeParam(&p.from, "from_unix_ms", q.Get("from_unix_ms")); err != nil { | ||
| return keyVizParams{}, err | ||
| } | ||
| if err := setKeyVizTimeParam(&p.to, "to_unix_ms", q.Get("to_unix_ms")); err != nil { | ||
| return keyVizParams{}, err | ||
| } | ||
| if err := setKeyVizRowsParam(&p.rows, q.Get("rows")); err != nil { | ||
| return keyVizParams{}, err | ||
| } | ||
| return p, nil | ||
| } | ||
|
|
||
| func setKeyVizSeriesParam(p *keyVizParams, raw string) error { | ||
| if raw == "" { | ||
| return nil | ||
| } | ||
| series, err := parseKeyVizSeries(raw) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| p.series = series | ||
| return nil | ||
| } | ||
|
|
||
| func setKeyVizTimeParam(dst *time.Time, name, raw string) error { | ||
| if raw == "" { | ||
| return nil | ||
| } | ||
| t, err := parseUnixMs(name, raw) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| *dst = t | ||
| return nil | ||
| } | ||
|
|
||
| func setKeyVizRowsParam(dst *int, raw string) error { | ||
| if raw == "" { | ||
| return nil | ||
| } | ||
|
Comment on lines
+211
to
+213
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the query omits Useful? React with 👍 / 👎. |
||
| n, err := strconv.Atoi(raw) | ||
| if err != nil || n < 0 { | ||
| return errors.New("rows must be a non-negative integer") | ||
| } | ||
| if n > keyVizRowBudgetCap { | ||
| n = keyVizRowBudgetCap | ||
| } | ||
| *dst = n | ||
| return nil | ||
| } | ||
|
|
||
| func parseKeyVizSeries(s string) (KeyVizSeries, error) { | ||
| switch KeyVizSeries(s) { | ||
| case keyVizSeriesReads, keyVizSeriesWrites, keyVizSeriesReadBytes, keyVizSeriesWriteBytes: | ||
| return KeyVizSeries(s), nil | ||
| default: | ||
| return "", errors.New("series must be one of: reads, writes, read_bytes, write_bytes") | ||
| } | ||
| } | ||
|
|
||
| func parseUnixMs(name, raw string) (time.Time, error) { | ||
| n, err := strconv.ParseInt(raw, 10, 64) | ||
| if err != nil { | ||
| return time.Time{}, errors.New(name + " must be an integer (unix milliseconds)") | ||
| } | ||
| if n == 0 { | ||
| return time.Time{}, nil | ||
| } | ||
| return time.UnixMilli(n).UTC(), nil | ||
| } | ||
|
|
||
| // pivotKeyVizColumns flips the column-major MatrixColumn slice into | ||
| // the row-major JSON shape, picks the requested series counter from | ||
| // each MatrixRow, and applies the rowBudget cap (top-N by total | ||
| // activity) before sorting back into Start order. | ||
| // | ||
| // Mirrors adapter.matrixToProto exactly — the duplication is | ||
| // intentional for now to keep the gRPC and JSON paths independent; | ||
| // extracting a shared helper into the keyviz package is a future | ||
| // cleanup. | ||
| func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudget int) KeyVizMatrix { | ||
| pick := keyVizSeriesPicker(series) | ||
| matrix := KeyVizMatrix{ | ||
| Series: series, | ||
| ColumnUnixMs: make([]int64, len(cols)), | ||
| } | ||
| rowsByID := make(map[uint64]*KeyVizRow) | ||
| order := make([]uint64, 0) | ||
| for j, col := range cols { | ||
| matrix.ColumnUnixMs[j] = col.At.UnixMilli() | ||
| for _, mr := range col.Rows { | ||
| row, ok := rowsByID[mr.RouteID] | ||
| if !ok { | ||
| row = newKeyVizRowFrom(mr, len(cols)) | ||
| rowsByID[mr.RouteID] = row | ||
| order = append(order, mr.RouteID) | ||
| } | ||
| v := pick(mr) | ||
| row.Values[j] = v | ||
| row.total += v | ||
| } | ||
| } | ||
| matrix.Rows = make([]KeyVizRow, len(order)) | ||
| for i, id := range order { | ||
| matrix.Rows[i] = *rowsByID[id] | ||
| } | ||
| matrix.Rows = applyKeyVizRowBudget(matrix.Rows, rowBudget) | ||
| sortKeyVizRowsByStart(matrix.Rows) | ||
| return matrix | ||
| } | ||
|
|
||
| func keyVizSeriesPicker(series KeyVizSeries) func(keyviz.MatrixRow) uint64 { | ||
| switch series { | ||
| case keyVizSeriesReads: | ||
| return func(r keyviz.MatrixRow) uint64 { return r.Reads } | ||
| case keyVizSeriesReadBytes: | ||
| return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes } | ||
| case keyVizSeriesWriteBytes: | ||
| return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes } | ||
| case keyVizSeriesWrites: | ||
| return func(r keyviz.MatrixRow) uint64 { return r.Writes } | ||
| default: | ||
| return func(r keyviz.MatrixRow) uint64 { return r.Writes } | ||
| } | ||
| } | ||
|
|
||
| func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow { | ||
| total := mr.MemberRoutesTotal | ||
| switch { | ||
| case !mr.Aggregate && total == 0: | ||
| // Individual slots with the field zero-initialised — every | ||
| // real route contributes exactly one member to itself. | ||
| total = 1 | ||
| case mr.Aggregate && total == 0: | ||
| // Defensive fallback: a virtual bucket should always carry a | ||
| // non-zero MemberRoutesTotal once foldIntoBucket has run, but | ||
| // if a sampler ever serialises a just-coalesced bucket before | ||
| // the count is set the SPA would render "0 routes" — which is | ||
| // nonsense for an aggregate row. Fall back to the visible | ||
| // MemberRoutes length so route_count stays meaningful. | ||
| total = uint64(len(mr.MemberRoutes)) | ||
| } | ||
| row := &KeyVizRow{ | ||
| BucketID: bucketIDFor(mr), | ||
| Start: append([]byte(nil), mr.Start...), | ||
| End: append([]byte(nil), mr.End...), | ||
| Aggregate: mr.Aggregate, | ||
| RouteCount: total, | ||
| RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)), | ||
| Values: make([]uint64, numCols), | ||
| } | ||
| if mr.Aggregate { | ||
| row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...) | ||
| } | ||
| return row | ||
| } | ||
|
|
||
| func bucketIDFor(mr keyviz.MatrixRow) string { | ||
| if mr.Aggregate { | ||
| return "virtual:" + strconv.FormatUint(mr.RouteID, 10) | ||
| } | ||
| return "route:" + strconv.FormatUint(mr.RouteID, 10) | ||
| } | ||
|
|
||
| // applyKeyVizRowBudget mirrors the adapter Phase-1 simplification: | ||
| // activity-descending truncation rather than design §5.5's lexicographic | ||
| // merge. Future work should swap in the spec'd merge once the | ||
| // virtual-bucket plumbing supports synthesis at the response layer. | ||
| // | ||
| // Sort uses the precomputed row.total (accumulated during pivot) so | ||
| // the comparator is O(1), not O(M). BucketID breaks activity ties | ||
| // deterministically — the SPA refresh on the same data must yield the | ||
| // same row set. | ||
| func applyKeyVizRowBudget(rows []KeyVizRow, budget int) []KeyVizRow { | ||
| if budget <= 0 || len(rows) <= budget { | ||
| return rows | ||
| } | ||
| sort.Slice(rows, func(i, j int) bool { | ||
| if rows[i].total != rows[j].total { | ||
| return rows[i].total > rows[j].total | ||
| } | ||
| return rows[i].BucketID < rows[j].BucketID | ||
| }) | ||
|
Comment on lines
+351
to
+356
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current sorting logic in applyKeyVizRowBudget is inefficient because it calls rowActivityTotal (an O(M) operation) inside the sort comparator (O(N log N) calls). Additionally, it lacks a tie-breaker, which makes the truncation non-deterministic when multiple rows have the same total activity. This can lead to inconsistent UI states upon refresh. Using a pre-calculated Total field and adding BucketID as a tie-breaker ensures both performance and determinism. sort.Slice(rows, func(i, j int) bool {
if rows[i].Total != rows[j].Total {
return rows[i].Total > rows[j].Total
}
return rows[i].BucketID < rows[j].BucketID
})References
|
||
| return rows[:budget] | ||
| } | ||
|
|
||
| func sortKeyVizRowsByStart(rows []KeyVizRow) { | ||
| sort.Slice(rows, func(i, j int) bool { | ||
| if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 { | ||
| return c < 0 | ||
| } | ||
| return rows[i].BucketID < rows[j].BucketID | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KeyVizRow struct is used for JSON serialization but also serves as the intermediate object for sorting and truncation. To improve the performance of applyKeyVizRowBudget, consider adding a Total field with a json:"-" tag. This allows pre-calculating the activity sum during the pivot phase, avoiding redundant O(M) iterations over the Values slice during the O(N log N) sort comparisons.