|
| 1 | +package admin |
| 2 | + |
| 3 | +import ( |
| 4 | + "bytes" |
| 5 | + "errors" |
| 6 | + "log/slog" |
| 7 | + "net/http" |
| 8 | + "sort" |
| 9 | + "strconv" |
| 10 | + "time" |
| 11 | + |
| 12 | + "github.com/bootjp/elastickv/keyviz" |
| 13 | + "github.com/goccy/go-json" |
| 14 | +) |
| 15 | + |
| 16 | +// KeyVizSource is the small contract the keyviz handler depends on. |
| 17 | +// Production wires this to a real *keyviz.MemSampler; tests use a |
| 18 | +// stub that returns canned columns. |
| 19 | +// |
| 20 | +// Snapshot returns the matrix columns within [from, to). Either bound |
| 21 | +// may be the zero Time meaning unbounded on that side. Implementations |
| 22 | +// MUST return rows the caller can mutate freely (a deep copy) — see |
| 23 | +// keyviz.MemSampler.Snapshot. |
| 24 | +type KeyVizSource interface { |
| 25 | + Snapshot(from, to time.Time) []keyviz.MatrixColumn |
| 26 | +} |
| 27 | + |
| 28 | +// KeyVizSeries selects which counter on a MatrixRow the response |
| 29 | +// surfaces in `Values`. Wire form mirrors the proto enum but uses |
| 30 | +// lowercase strings so the SPA can pass `?series=writes` directly |
| 31 | +// without an extra encoding round-trip. |
| 32 | +type KeyVizSeries string |
| 33 | + |
| 34 | +const ( |
| 35 | + keyVizSeriesReads KeyVizSeries = "reads" |
| 36 | + keyVizSeriesWrites KeyVizSeries = "writes" |
| 37 | + keyVizSeriesReadBytes KeyVizSeries = "read_bytes" |
| 38 | + keyVizSeriesWriteBytes KeyVizSeries = "write_bytes" |
| 39 | +) |
| 40 | + |
| 41 | +// keyVizDefaultSeries matches the design doc §4.1 default. Writes is |
| 42 | +// the primary signal the heatmap is built around; reads will land in |
| 43 | +// a follow-up phase (read sampling not yet wired). |
| 44 | +const keyVizDefaultSeries = keyVizSeriesWrites |
| 45 | + |
| 46 | +// keyVizRowBudgetCap is the upper bound on the per-request row |
| 47 | +// budget. Mirrors the same cap on the gRPC GetKeyVizMatrix RPC so |
| 48 | +// the SPA cannot force unbounded payloads via the JSON path. Design |
| 49 | +// doc §4.1. |
| 50 | +const keyVizRowBudgetCap = 1024 |
| 51 | + |
| 52 | +// KeyVizMatrix is the row-major JSON wire form returned by |
| 53 | +// /admin/api/v1/keyviz/matrix. Mirrors the proto GetKeyVizMatrixResponse |
| 54 | +// shape so a future refactor can share a single pivot helper across |
| 55 | +// the adapter (gRPC) and admin (JSON) paths. |
| 56 | +type KeyVizMatrix struct { |
| 57 | + ColumnUnixMs []int64 `json:"column_unix_ms"` |
| 58 | + Rows []KeyVizRow `json:"rows"` |
| 59 | + Series KeyVizSeries `json:"series"` |
| 60 | + GeneratedAt time.Time `json:"generated_at"` |
| 61 | +} |
| 62 | + |
| 63 | +// KeyVizRow is one route's worth of activity across the column window, |
| 64 | +// matching the proto KeyVizRow layout. Values is parallel to |
| 65 | +// KeyVizMatrix.ColumnUnixMs — Values[j] is the counter for that route |
| 66 | +// at column j. |
| 67 | +type KeyVizRow struct { |
| 68 | + BucketID string `json:"bucket_id"` |
| 69 | + Start []byte `json:"start"` |
| 70 | + End []byte `json:"end"` |
| 71 | + Aggregate bool `json:"aggregate"` |
| 72 | + RouteIDs []uint64 `json:"route_ids,omitempty"` |
| 73 | + RouteIDsTruncated bool `json:"route_ids_truncated,omitempty"` |
| 74 | + RouteCount uint64 `json:"route_count"` |
| 75 | + Values []uint64 `json:"values"` |
| 76 | +} |
| 77 | + |
| 78 | +// KeyVizHandler serves GET /admin/api/v1/keyviz/matrix. |
| 79 | +// |
| 80 | +// Query parameters (all optional): |
| 81 | +// |
| 82 | +// series - reads | writes | read_bytes | write_bytes (default: writes) |
| 83 | +// from_unix_ms - lower bound in unix ms (default: unbounded) |
| 84 | +// to_unix_ms - upper bound in unix ms (default: unbounded) |
| 85 | +// rows - row budget; 0 means no cap, capped at 1024 (default: 0) |
| 86 | +// |
| 87 | +// Returns 503 codes.Unavailable when no sampler is configured so the |
| 88 | +// SPA can distinguish "keyviz disabled" from "no data yet" (the |
| 89 | +// latter is a successful empty matrix). |
| 90 | +type KeyVizHandler struct { |
| 91 | + source KeyVizSource |
| 92 | + now func() time.Time |
| 93 | + logger *slog.Logger |
| 94 | +} |
| 95 | + |
| 96 | +// NewKeyVizHandler wires a KeyVizSource into the HTTP handler. |
| 97 | +// source may be nil; calls to ServeHTTP will then return 503 with |
| 98 | +// code "keyviz_disabled". |
| 99 | +func NewKeyVizHandler(source KeyVizSource) *KeyVizHandler { |
| 100 | + return &KeyVizHandler{ |
| 101 | + source: source, |
| 102 | + now: func() time.Time { return time.Now().UTC() }, |
| 103 | + logger: slog.Default(), |
| 104 | + } |
| 105 | +} |
| 106 | + |
| 107 | +// WithLogger overrides the slog destination so main.go can attach a |
| 108 | +// component tag without changing the constructor signature. |
| 109 | +func (h *KeyVizHandler) WithLogger(l *slog.Logger) *KeyVizHandler { |
| 110 | + if l == nil { |
| 111 | + return h |
| 112 | + } |
| 113 | + h.logger = l |
| 114 | + return h |
| 115 | +} |
| 116 | + |
| 117 | +// WithClock lets tests inject a deterministic GeneratedAt. |
| 118 | +func (h *KeyVizHandler) WithClock(now func() time.Time) *KeyVizHandler { |
| 119 | + if now == nil { |
| 120 | + return h |
| 121 | + } |
| 122 | + h.now = now |
| 123 | + return h |
| 124 | +} |
| 125 | + |
| 126 | +func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 127 | + if r.Method != http.MethodGet { |
| 128 | + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") |
| 129 | + return |
| 130 | + } |
| 131 | + if h.source == nil { |
| 132 | + writeJSONError(w, http.StatusServiceUnavailable, "keyviz_disabled", |
| 133 | + "key visualizer sampler is not configured on this node") |
| 134 | + return |
| 135 | + } |
| 136 | + params, err := parseKeyVizParams(r) |
| 137 | + if err != nil { |
| 138 | + writeJSONError(w, http.StatusBadRequest, "invalid_query", err.Error()) |
| 139 | + return |
| 140 | + } |
| 141 | + cols := h.source.Snapshot(params.from, params.to) |
| 142 | + matrix := pivotKeyVizColumns(cols, params.series, params.rows) |
| 143 | + matrix.GeneratedAt = h.now() |
| 144 | + w.Header().Set("Content-Type", "application/json; charset=utf-8") |
| 145 | + w.Header().Set("Cache-Control", "no-store") |
| 146 | + w.WriteHeader(http.StatusOK) |
| 147 | + if err := json.NewEncoder(w).Encode(matrix); err != nil { |
| 148 | + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin keyviz response encode failed", |
| 149 | + slog.String("error", err.Error()), |
| 150 | + ) |
| 151 | + } |
| 152 | +} |
| 153 | + |
| 154 | +// keyVizParams is the parsed query-string form of a matrix request. |
| 155 | +type keyVizParams struct { |
| 156 | + series KeyVizSeries |
| 157 | + from time.Time |
| 158 | + to time.Time |
| 159 | + rows int |
| 160 | +} |
| 161 | + |
| 162 | +func parseKeyVizParams(r *http.Request) (keyVizParams, error) { |
| 163 | + p := keyVizParams{series: keyVizDefaultSeries} |
| 164 | + q := r.URL.Query() |
| 165 | + if err := setKeyVizSeriesParam(&p, q.Get("series")); err != nil { |
| 166 | + return keyVizParams{}, err |
| 167 | + } |
| 168 | + if err := setKeyVizTimeParam(&p.from, "from_unix_ms", q.Get("from_unix_ms")); err != nil { |
| 169 | + return keyVizParams{}, err |
| 170 | + } |
| 171 | + if err := setKeyVizTimeParam(&p.to, "to_unix_ms", q.Get("to_unix_ms")); err != nil { |
| 172 | + return keyVizParams{}, err |
| 173 | + } |
| 174 | + if err := setKeyVizRowsParam(&p.rows, q.Get("rows")); err != nil { |
| 175 | + return keyVizParams{}, err |
| 176 | + } |
| 177 | + return p, nil |
| 178 | +} |
| 179 | + |
| 180 | +func setKeyVizSeriesParam(p *keyVizParams, raw string) error { |
| 181 | + if raw == "" { |
| 182 | + return nil |
| 183 | + } |
| 184 | + series, err := parseKeyVizSeries(raw) |
| 185 | + if err != nil { |
| 186 | + return err |
| 187 | + } |
| 188 | + p.series = series |
| 189 | + return nil |
| 190 | +} |
| 191 | + |
| 192 | +func setKeyVizTimeParam(dst *time.Time, name, raw string) error { |
| 193 | + if raw == "" { |
| 194 | + return nil |
| 195 | + } |
| 196 | + t, err := parseUnixMs(name, raw) |
| 197 | + if err != nil { |
| 198 | + return err |
| 199 | + } |
| 200 | + *dst = t |
| 201 | + return nil |
| 202 | +} |
| 203 | + |
| 204 | +func setKeyVizRowsParam(dst *int, raw string) error { |
| 205 | + if raw == "" { |
| 206 | + return nil |
| 207 | + } |
| 208 | + n, err := strconv.Atoi(raw) |
| 209 | + if err != nil || n < 0 { |
| 210 | + return errors.New("rows must be a non-negative integer") |
| 211 | + } |
| 212 | + if n > keyVizRowBudgetCap { |
| 213 | + n = keyVizRowBudgetCap |
| 214 | + } |
| 215 | + *dst = n |
| 216 | + return nil |
| 217 | +} |
| 218 | + |
| 219 | +func parseKeyVizSeries(s string) (KeyVizSeries, error) { |
| 220 | + switch KeyVizSeries(s) { |
| 221 | + case keyVizSeriesReads, keyVizSeriesWrites, keyVizSeriesReadBytes, keyVizSeriesWriteBytes: |
| 222 | + return KeyVizSeries(s), nil |
| 223 | + default: |
| 224 | + return "", errors.New("series must be one of: reads, writes, read_bytes, write_bytes") |
| 225 | + } |
| 226 | +} |
| 227 | + |
| 228 | +func parseUnixMs(name, raw string) (time.Time, error) { |
| 229 | + n, err := strconv.ParseInt(raw, 10, 64) |
| 230 | + if err != nil { |
| 231 | + return time.Time{}, errors.New(name + " must be an integer (unix milliseconds)") |
| 232 | + } |
| 233 | + if n == 0 { |
| 234 | + return time.Time{}, nil |
| 235 | + } |
| 236 | + return time.UnixMilli(n).UTC(), nil |
| 237 | +} |
| 238 | + |
| 239 | +// pivotKeyVizColumns flips the column-major MatrixColumn slice into |
| 240 | +// the row-major JSON shape, picks the requested series counter from |
| 241 | +// each MatrixRow, and applies the rowBudget cap (top-N by total |
| 242 | +// activity) before sorting back into Start order. |
| 243 | +// |
| 244 | +// Mirrors adapter.matrixToProto exactly — the duplication is |
| 245 | +// intentional for now to keep the gRPC and JSON paths independent; |
| 246 | +// extracting a shared helper into the keyviz package is a future |
| 247 | +// cleanup. |
| 248 | +func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudget int) KeyVizMatrix { |
| 249 | + pick := keyVizSeriesPicker(series) |
| 250 | + matrix := KeyVizMatrix{ |
| 251 | + Series: series, |
| 252 | + ColumnUnixMs: make([]int64, len(cols)), |
| 253 | + } |
| 254 | + rowsByID := make(map[uint64]*KeyVizRow) |
| 255 | + order := make([]uint64, 0) |
| 256 | + for j, col := range cols { |
| 257 | + matrix.ColumnUnixMs[j] = col.At.UnixMilli() |
| 258 | + for _, mr := range col.Rows { |
| 259 | + row, ok := rowsByID[mr.RouteID] |
| 260 | + if !ok { |
| 261 | + row = newKeyVizRowFrom(mr, len(cols)) |
| 262 | + rowsByID[mr.RouteID] = row |
| 263 | + order = append(order, mr.RouteID) |
| 264 | + } |
| 265 | + row.Values[j] = pick(mr) |
| 266 | + } |
| 267 | + } |
| 268 | + matrix.Rows = make([]KeyVizRow, len(order)) |
| 269 | + for i, id := range order { |
| 270 | + matrix.Rows[i] = *rowsByID[id] |
| 271 | + } |
| 272 | + matrix.Rows = applyKeyVizRowBudget(matrix.Rows, rowBudget) |
| 273 | + sortKeyVizRowsByStart(matrix.Rows) |
| 274 | + return matrix |
| 275 | +} |
| 276 | + |
| 277 | +func keyVizSeriesPicker(series KeyVizSeries) func(keyviz.MatrixRow) uint64 { |
| 278 | + switch series { |
| 279 | + case keyVizSeriesReads: |
| 280 | + return func(r keyviz.MatrixRow) uint64 { return r.Reads } |
| 281 | + case keyVizSeriesReadBytes: |
| 282 | + return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes } |
| 283 | + case keyVizSeriesWriteBytes: |
| 284 | + return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes } |
| 285 | + case keyVizSeriesWrites: |
| 286 | + return func(r keyviz.MatrixRow) uint64 { return r.Writes } |
| 287 | + default: |
| 288 | + return func(r keyviz.MatrixRow) uint64 { return r.Writes } |
| 289 | + } |
| 290 | +} |
| 291 | + |
| 292 | +func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow { |
| 293 | + total := mr.MemberRoutesTotal |
| 294 | + if !mr.Aggregate && total == 0 { |
| 295 | + total = 1 |
| 296 | + } |
| 297 | + row := &KeyVizRow{ |
| 298 | + BucketID: bucketIDFor(mr), |
| 299 | + Start: append([]byte(nil), mr.Start...), |
| 300 | + End: append([]byte(nil), mr.End...), |
| 301 | + Aggregate: mr.Aggregate, |
| 302 | + RouteCount: total, |
| 303 | + RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)), |
| 304 | + Values: make([]uint64, numCols), |
| 305 | + } |
| 306 | + if mr.Aggregate { |
| 307 | + row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...) |
| 308 | + } |
| 309 | + return row |
| 310 | +} |
| 311 | + |
| 312 | +func bucketIDFor(mr keyviz.MatrixRow) string { |
| 313 | + if mr.Aggregate { |
| 314 | + return "virtual:" + strconv.FormatUint(mr.RouteID, 10) |
| 315 | + } |
| 316 | + return "route:" + strconv.FormatUint(mr.RouteID, 10) |
| 317 | +} |
| 318 | + |
| 319 | +// applyKeyVizRowBudget mirrors the adapter Phase-1 simplification: |
| 320 | +// activity-descending truncation rather than design §5.5's lexicographic |
| 321 | +// merge. Future work should swap in the spec'd merge once the |
| 322 | +// virtual-bucket plumbing supports synthesis at the response layer. |
| 323 | +func applyKeyVizRowBudget(rows []KeyVizRow, budget int) []KeyVizRow { |
| 324 | + if budget <= 0 || len(rows) <= budget { |
| 325 | + return rows |
| 326 | + } |
| 327 | + sort.Slice(rows, func(i, j int) bool { |
| 328 | + return rowActivityTotal(rows[i]) > rowActivityTotal(rows[j]) |
| 329 | + }) |
| 330 | + return rows[:budget] |
| 331 | +} |
| 332 | + |
| 333 | +func rowActivityTotal(r KeyVizRow) uint64 { |
| 334 | + var sum uint64 |
| 335 | + for _, v := range r.Values { |
| 336 | + sum += v |
| 337 | + } |
| 338 | + return sum |
| 339 | +} |
| 340 | + |
| 341 | +func sortKeyVizRowsByStart(rows []KeyVizRow) { |
| 342 | + sort.Slice(rows, func(i, j int) bool { |
| 343 | + if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 { |
| 344 | + return c < 0 |
| 345 | + } |
| 346 | + return rows[i].BucketID < rows[j].BucketID |
| 347 | + }) |
| 348 | +} |
0 commit comments