|
| 1 | +package admin |
| 2 | + |
| 3 | +import ( |
| 4 | + "bytes" |
| 5 | + "errors" |
| 6 | + "log/slog" |
| 7 | + "net/http" |
| 8 | + "sort" |
| 9 | + "strconv" |
| 10 | + "time" |
| 11 | + |
| 12 | + "github.com/bootjp/elastickv/keyviz" |
| 13 | + "github.com/goccy/go-json" |
| 14 | +) |
| 15 | + |
| 16 | +// KeyVizSource is the small contract the keyviz handler depends on. |
| 17 | +// Production wires this to a real *keyviz.MemSampler; tests use a |
| 18 | +// stub that returns canned columns. |
| 19 | +// |
| 20 | +// Snapshot returns the matrix columns within [from, to). Either bound |
| 21 | +// may be the zero Time meaning unbounded on that side. Implementations |
| 22 | +// MUST return rows the caller can mutate freely (a deep copy) — see |
| 23 | +// keyviz.MemSampler.Snapshot. |
| 24 | +type KeyVizSource interface { |
| 25 | + Snapshot(from, to time.Time) []keyviz.MatrixColumn |
| 26 | +} |
| 27 | + |
| 28 | +// KeyVizSeries selects which counter on a MatrixRow the response |
| 29 | +// surfaces in `Values`. Wire form mirrors the proto enum but uses |
| 30 | +// lowercase strings so the SPA can pass `?series=writes` directly |
| 31 | +// without an extra encoding round-trip. |
| 32 | +type KeyVizSeries string |
| 33 | + |
| 34 | +const ( |
| 35 | + keyVizSeriesReads KeyVizSeries = "reads" |
| 36 | + keyVizSeriesWrites KeyVizSeries = "writes" |
| 37 | + keyVizSeriesReadBytes KeyVizSeries = "read_bytes" |
| 38 | + keyVizSeriesWriteBytes KeyVizSeries = "write_bytes" |
| 39 | +) |
| 40 | + |
| 41 | +// keyVizDefaultSeries matches the design doc §4.1 default. Writes is |
| 42 | +// the primary signal the heatmap is built around; reads will land in |
| 43 | +// a follow-up phase (read sampling not yet wired). |
| 44 | +const keyVizDefaultSeries = keyVizSeriesWrites |
| 45 | + |
| 46 | +// keyVizRowBudgetCap is the upper bound on the per-request row |
| 47 | +// budget. Mirrors the same cap on the gRPC GetKeyVizMatrix RPC so |
| 48 | +// the SPA cannot force unbounded payloads via the JSON path. Design |
| 49 | +// doc §4.1. |
| 50 | +const keyVizRowBudgetCap = 1024 |
| 51 | + |
| 52 | +// KeyVizMatrix is the row-major JSON wire form returned by |
| 53 | +// /admin/api/v1/keyviz/matrix. Mirrors the proto GetKeyVizMatrixResponse |
| 54 | +// shape so a future refactor can share a single pivot helper across |
| 55 | +// the adapter (gRPC) and admin (JSON) paths. |
| 56 | +type KeyVizMatrix struct { |
| 57 | + ColumnUnixMs []int64 `json:"column_unix_ms"` |
| 58 | + Rows []KeyVizRow `json:"rows"` |
| 59 | + Series KeyVizSeries `json:"series"` |
| 60 | + GeneratedAt time.Time `json:"generated_at"` |
| 61 | +} |
| 62 | + |
| 63 | +// KeyVizRow is one route's worth of activity across the column window, |
| 64 | +// matching the proto KeyVizRow layout. Values is parallel to |
| 65 | +// KeyVizMatrix.ColumnUnixMs — Values[j] is the counter for that route |
| 66 | +// at column j. |
| 67 | +type KeyVizRow struct { |
| 68 | + BucketID string `json:"bucket_id"` |
| 69 | + Start []byte `json:"start"` |
| 70 | + End []byte `json:"end"` |
| 71 | + Aggregate bool `json:"aggregate"` |
| 72 | + RouteIDs []uint64 `json:"route_ids,omitempty"` |
| 73 | + RouteIDsTruncated bool `json:"route_ids_truncated,omitempty"` |
| 74 | + RouteCount uint64 `json:"route_count"` |
| 75 | + Values []uint64 `json:"values"` |
| 76 | + // total accumulates the sum of Values during pivot so the |
| 77 | + // rowBudget sort is O(N log N) on a precomputed key rather |
| 78 | + // than O(N log N × M) recomputing the sum per comparison. |
| 79 | + // Not on the wire — clients read activity off Values directly. |
| 80 | + total uint64 |
| 81 | +} |
| 82 | + |
| 83 | +// KeyVizHandler serves GET /admin/api/v1/keyviz/matrix. |
| 84 | +// |
| 85 | +// Query parameters (all optional): |
| 86 | +// |
| 87 | +// series - reads | writes | read_bytes | write_bytes (default: writes) |
| 88 | +// from_unix_ms - lower bound in unix ms; 0 or omitted means unbounded |
| 89 | +// on that side (NOT the Unix epoch) |
| 90 | +// to_unix_ms - upper bound in unix ms; same 0 = unbounded contract |
| 91 | +// rows - row budget; 0 means no cap, capped at 1024 (default: 0) |
| 92 | +// |
| 93 | +// Returns 503 codes.Unavailable when no sampler is configured so the |
| 94 | +// SPA can distinguish "keyviz disabled" from "no data yet" (the |
| 95 | +// latter is a successful empty matrix). |
| 96 | +type KeyVizHandler struct { |
| 97 | + source KeyVizSource |
| 98 | + now func() time.Time |
| 99 | + logger *slog.Logger |
| 100 | +} |
| 101 | + |
| 102 | +// NewKeyVizHandler wires a KeyVizSource into the HTTP handler. |
| 103 | +// source may be nil; calls to ServeHTTP will then return 503 with |
| 104 | +// code "keyviz_disabled". |
| 105 | +func NewKeyVizHandler(source KeyVizSource) *KeyVizHandler { |
| 106 | + return &KeyVizHandler{ |
| 107 | + source: source, |
| 108 | + now: func() time.Time { return time.Now().UTC() }, |
| 109 | + logger: slog.Default(), |
| 110 | + } |
| 111 | +} |
| 112 | + |
| 113 | +// WithLogger overrides the slog destination so main.go can attach a |
| 114 | +// component tag without changing the constructor signature. |
| 115 | +func (h *KeyVizHandler) WithLogger(l *slog.Logger) *KeyVizHandler { |
| 116 | + if l == nil { |
| 117 | + return h |
| 118 | + } |
| 119 | + h.logger = l |
| 120 | + return h |
| 121 | +} |
| 122 | + |
| 123 | +// WithClock lets tests inject a deterministic GeneratedAt. |
| 124 | +func (h *KeyVizHandler) WithClock(now func() time.Time) *KeyVizHandler { |
| 125 | + if now == nil { |
| 126 | + return h |
| 127 | + } |
| 128 | + h.now = now |
| 129 | + return h |
| 130 | +} |
| 131 | + |
| 132 | +func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 133 | + if r.Method != http.MethodGet { |
| 134 | + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") |
| 135 | + return |
| 136 | + } |
| 137 | + if h.source == nil { |
| 138 | + writeJSONError(w, http.StatusServiceUnavailable, "keyviz_disabled", |
| 139 | + "key visualizer sampler is not configured on this node") |
| 140 | + return |
| 141 | + } |
| 142 | + params, err := parseKeyVizParams(r) |
| 143 | + if err != nil { |
| 144 | + writeJSONError(w, http.StatusBadRequest, "invalid_query", err.Error()) |
| 145 | + return |
| 146 | + } |
| 147 | + cols := h.source.Snapshot(params.from, params.to) |
| 148 | + matrix := pivotKeyVizColumns(cols, params.series, params.rows) |
| 149 | + matrix.GeneratedAt = h.now() |
| 150 | + w.Header().Set("Content-Type", "application/json; charset=utf-8") |
| 151 | + w.Header().Set("Cache-Control", "no-store") |
| 152 | + w.WriteHeader(http.StatusOK) |
| 153 | + if err := json.NewEncoder(w).Encode(matrix); err != nil { |
| 154 | + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin keyviz response encode failed", |
| 155 | + slog.String("error", err.Error()), |
| 156 | + ) |
| 157 | + } |
| 158 | +} |
| 159 | + |
| 160 | +// keyVizParams is the parsed query-string form of a matrix request. |
| 161 | +type keyVizParams struct { |
| 162 | + series KeyVizSeries |
| 163 | + from time.Time |
| 164 | + to time.Time |
| 165 | + rows int |
| 166 | +} |
| 167 | + |
| 168 | +func parseKeyVizParams(r *http.Request) (keyVizParams, error) { |
| 169 | + p := keyVizParams{series: keyVizDefaultSeries} |
| 170 | + q := r.URL.Query() |
| 171 | + if err := setKeyVizSeriesParam(&p, q.Get("series")); err != nil { |
| 172 | + return keyVizParams{}, err |
| 173 | + } |
| 174 | + if err := setKeyVizTimeParam(&p.from, "from_unix_ms", q.Get("from_unix_ms")); err != nil { |
| 175 | + return keyVizParams{}, err |
| 176 | + } |
| 177 | + if err := setKeyVizTimeParam(&p.to, "to_unix_ms", q.Get("to_unix_ms")); err != nil { |
| 178 | + return keyVizParams{}, err |
| 179 | + } |
| 180 | + if err := setKeyVizRowsParam(&p.rows, q.Get("rows")); err != nil { |
| 181 | + return keyVizParams{}, err |
| 182 | + } |
| 183 | + return p, nil |
| 184 | +} |
| 185 | + |
| 186 | +func setKeyVizSeriesParam(p *keyVizParams, raw string) error { |
| 187 | + if raw == "" { |
| 188 | + return nil |
| 189 | + } |
| 190 | + series, err := parseKeyVizSeries(raw) |
| 191 | + if err != nil { |
| 192 | + return err |
| 193 | + } |
| 194 | + p.series = series |
| 195 | + return nil |
| 196 | +} |
| 197 | + |
| 198 | +func setKeyVizTimeParam(dst *time.Time, name, raw string) error { |
| 199 | + if raw == "" { |
| 200 | + return nil |
| 201 | + } |
| 202 | + t, err := parseUnixMs(name, raw) |
| 203 | + if err != nil { |
| 204 | + return err |
| 205 | + } |
| 206 | + *dst = t |
| 207 | + return nil |
| 208 | +} |
| 209 | + |
| 210 | +func setKeyVizRowsParam(dst *int, raw string) error { |
| 211 | + if raw == "" { |
| 212 | + return nil |
| 213 | + } |
| 214 | + n, err := strconv.Atoi(raw) |
| 215 | + if err != nil || n < 0 { |
| 216 | + return errors.New("rows must be a non-negative integer") |
| 217 | + } |
| 218 | + if n > keyVizRowBudgetCap { |
| 219 | + n = keyVizRowBudgetCap |
| 220 | + } |
| 221 | + *dst = n |
| 222 | + return nil |
| 223 | +} |
| 224 | + |
| 225 | +func parseKeyVizSeries(s string) (KeyVizSeries, error) { |
| 226 | + switch KeyVizSeries(s) { |
| 227 | + case keyVizSeriesReads, keyVizSeriesWrites, keyVizSeriesReadBytes, keyVizSeriesWriteBytes: |
| 228 | + return KeyVizSeries(s), nil |
| 229 | + default: |
| 230 | + return "", errors.New("series must be one of: reads, writes, read_bytes, write_bytes") |
| 231 | + } |
| 232 | +} |
| 233 | + |
| 234 | +func parseUnixMs(name, raw string) (time.Time, error) { |
| 235 | + n, err := strconv.ParseInt(raw, 10, 64) |
| 236 | + if err != nil { |
| 237 | + return time.Time{}, errors.New(name + " must be an integer (unix milliseconds)") |
| 238 | + } |
| 239 | + if n == 0 { |
| 240 | + return time.Time{}, nil |
| 241 | + } |
| 242 | + return time.UnixMilli(n).UTC(), nil |
| 243 | +} |
| 244 | + |
| 245 | +// pivotKeyVizColumns flips the column-major MatrixColumn slice into |
| 246 | +// the row-major JSON shape, picks the requested series counter from |
| 247 | +// each MatrixRow, and applies the rowBudget cap (top-N by total |
| 248 | +// activity) before sorting back into Start order. |
| 249 | +// |
| 250 | +// Mirrors adapter.matrixToProto exactly — the duplication is |
| 251 | +// intentional for now to keep the gRPC and JSON paths independent; |
| 252 | +// extracting a shared helper into the keyviz package is a future |
| 253 | +// cleanup. |
| 254 | +func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudget int) KeyVizMatrix { |
| 255 | + pick := keyVizSeriesPicker(series) |
| 256 | + matrix := KeyVizMatrix{ |
| 257 | + Series: series, |
| 258 | + ColumnUnixMs: make([]int64, len(cols)), |
| 259 | + } |
| 260 | + rowsByID := make(map[uint64]*KeyVizRow) |
| 261 | + order := make([]uint64, 0) |
| 262 | + for j, col := range cols { |
| 263 | + matrix.ColumnUnixMs[j] = col.At.UnixMilli() |
| 264 | + for _, mr := range col.Rows { |
| 265 | + row, ok := rowsByID[mr.RouteID] |
| 266 | + if !ok { |
| 267 | + row = newKeyVizRowFrom(mr, len(cols)) |
| 268 | + rowsByID[mr.RouteID] = row |
| 269 | + order = append(order, mr.RouteID) |
| 270 | + } |
| 271 | + v := pick(mr) |
| 272 | + row.Values[j] = v |
| 273 | + row.total += v |
| 274 | + } |
| 275 | + } |
| 276 | + matrix.Rows = make([]KeyVizRow, len(order)) |
| 277 | + for i, id := range order { |
| 278 | + matrix.Rows[i] = *rowsByID[id] |
| 279 | + } |
| 280 | + matrix.Rows = applyKeyVizRowBudget(matrix.Rows, rowBudget) |
| 281 | + sortKeyVizRowsByStart(matrix.Rows) |
| 282 | + return matrix |
| 283 | +} |
| 284 | + |
| 285 | +func keyVizSeriesPicker(series KeyVizSeries) func(keyviz.MatrixRow) uint64 { |
| 286 | + switch series { |
| 287 | + case keyVizSeriesReads: |
| 288 | + return func(r keyviz.MatrixRow) uint64 { return r.Reads } |
| 289 | + case keyVizSeriesReadBytes: |
| 290 | + return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes } |
| 291 | + case keyVizSeriesWriteBytes: |
| 292 | + return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes } |
| 293 | + case keyVizSeriesWrites: |
| 294 | + return func(r keyviz.MatrixRow) uint64 { return r.Writes } |
| 295 | + default: |
| 296 | + return func(r keyviz.MatrixRow) uint64 { return r.Writes } |
| 297 | + } |
| 298 | +} |
| 299 | + |
| 300 | +func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow { |
| 301 | + total := mr.MemberRoutesTotal |
| 302 | + switch { |
| 303 | + case !mr.Aggregate && total == 0: |
| 304 | + // Individual slots with the field zero-initialised — every |
| 305 | + // real route contributes exactly one member to itself. |
| 306 | + total = 1 |
| 307 | + case mr.Aggregate && total == 0: |
| 308 | + // Defensive fallback: a virtual bucket should always carry a |
| 309 | + // non-zero MemberRoutesTotal once foldIntoBucket has run, but |
| 310 | + // if a sampler ever serialises a just-coalesced bucket before |
| 311 | + // the count is set the SPA would render "0 routes" — which is |
| 312 | + // nonsense for an aggregate row. Fall back to the visible |
| 313 | + // MemberRoutes length so route_count stays meaningful. |
| 314 | + total = uint64(len(mr.MemberRoutes)) |
| 315 | + } |
| 316 | + row := &KeyVizRow{ |
| 317 | + BucketID: bucketIDFor(mr), |
| 318 | + Start: append([]byte(nil), mr.Start...), |
| 319 | + End: append([]byte(nil), mr.End...), |
| 320 | + Aggregate: mr.Aggregate, |
| 321 | + RouteCount: total, |
| 322 | + RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)), |
| 323 | + Values: make([]uint64, numCols), |
| 324 | + } |
| 325 | + if mr.Aggregate { |
| 326 | + row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...) |
| 327 | + } |
| 328 | + return row |
| 329 | +} |
| 330 | + |
| 331 | +func bucketIDFor(mr keyviz.MatrixRow) string { |
| 332 | + if mr.Aggregate { |
| 333 | + return "virtual:" + strconv.FormatUint(mr.RouteID, 10) |
| 334 | + } |
| 335 | + return "route:" + strconv.FormatUint(mr.RouteID, 10) |
| 336 | +} |
| 337 | + |
| 338 | +// applyKeyVizRowBudget mirrors the adapter Phase-1 simplification: |
| 339 | +// activity-descending truncation rather than design §5.5's lexicographic |
| 340 | +// merge. Future work should swap in the spec'd merge once the |
| 341 | +// virtual-bucket plumbing supports synthesis at the response layer. |
| 342 | +// |
| 343 | +// Sort uses the precomputed row.total (accumulated during pivot) so |
| 344 | +// the comparator is O(1), not O(M). BucketID breaks activity ties |
| 345 | +// deterministically — the SPA refresh on the same data must yield the |
| 346 | +// same row set. |
| 347 | +func applyKeyVizRowBudget(rows []KeyVizRow, budget int) []KeyVizRow { |
| 348 | + if budget <= 0 || len(rows) <= budget { |
| 349 | + return rows |
| 350 | + } |
| 351 | + sort.Slice(rows, func(i, j int) bool { |
| 352 | + if rows[i].total != rows[j].total { |
| 353 | + return rows[i].total > rows[j].total |
| 354 | + } |
| 355 | + return rows[i].BucketID < rows[j].BucketID |
| 356 | + }) |
| 357 | + return rows[:budget] |
| 358 | +} |
| 359 | + |
| 360 | +func sortKeyVizRowsByStart(rows []KeyVizRow) { |
| 361 | + sort.Slice(rows, func(i, j int) bool { |
| 362 | + if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 { |
| 363 | + return c < 0 |
| 364 | + } |
| 365 | + return rows[i].BucketID < rows[j].BucketID |
| 366 | + }) |
| 367 | +} |
0 commit comments