From 323c1d12fec7251dcd881ffb300dc5b4687c70ff Mon Sep 17 00:00:00 2001 From: BoKeum Date: Mon, 27 Apr 2026 13:00:08 +0900 Subject: [PATCH 1/3] refactor: migrate kernel live_stat from Valkey to Prometheus --- changes/11330.enhance.md | 1 + src/ai/backend/agent/stats.py | 8 +- src/ai/backend/appproxy/worker/types.py | 3 +- src/ai/backend/client/output/formatters.py | 2 +- .../valkey_client/valkey_stat/client.py | 3 +- src/ai/backend/common/metrics/types.py | 98 ++++++- src/ai/backend/common/types.py | 30 -- .../backend/manager/api/gql_legacy/kernel.py | 40 ++- .../manager/api/gql_legacy/stat_converter.py | 105 +++++++ .../manager/api/gql_legacy/statistics.py | 9 - .../clients/prometheus/fixed_query_builder.py | 3 - .../api/gql_legacy/test_stat_converter.py | 277 ++++++++++++++++++ 12 files changed, 518 insertions(+), 61 deletions(-) create mode 100644 changes/11330.enhance.md create mode 100644 src/ai/backend/manager/api/gql_legacy/stat_converter.py create mode 100644 tests/unit/manager/api/gql_legacy/test_stat_converter.py diff --git a/changes/11330.enhance.md b/changes/11330.enhance.md new file mode 100644 index 00000000000..ced9388a908 --- /dev/null +++ b/changes/11330.enhance.md @@ -0,0 +1 @@ +Migrate kernel `live_stat` GraphQL resolver from Valkey to Prometheus while preserving the legacy wire shape diff --git a/src/ai/backend/agent/stats.py b/src/ai/backend/agent/stats.py index bd9b9eccaf7..c866c5b5ae5 100644 --- a/src/ai/backend/agent/stats.py +++ b/src/ai/backend/agent/stats.py @@ -26,15 +26,17 @@ from ai.backend.common import msgpack from ai.backend.common.identity import is_containerized from ai.backend.common.metrics.metric import StageObserver -from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL +from ai.backend.common.metrics.types import ( + UTILIZATION_METRIC_INTERVAL, + MetricValue, + MovingStatValue, +) from ai.backend.common.types import ( PID, ContainerId, DeviceId, KernelId, MetricKey, - MetricValue, - MovingStatValue, SessionId, SlotName, ) diff --git a/src/ai/backend/appproxy/worker/types.py b/src/ai/backend/appproxy/worker/types.py index 8053872537d..92bcee57bbf 100644 --- a/src/ai/backend/appproxy/worker/types.py +++ b/src/ai/backend/appproxy/worker/types.py @@ -42,10 +42,9 @@ SafeGauge, SafeHistogram, ) +from ai.backend.common.metrics.types import MetricValue, MovingStatValue from ai.backend.common.types import ( MetricKey, - MetricValue, - MovingStatValue, RuntimeVariant, ) diff --git a/src/ai/backend/client/output/formatters.py b/src/ai/backend/client/output/formatters.py index b2e93ff7fd3..056e1cda4fa 100644 --- a/src/ai/backend/client/output/formatters.py +++ b/src/ai/backend/client/output/formatters.py @@ -9,7 +9,7 @@ import humanize -from ai.backend.common.types import MetricValue +from ai.backend.common.metrics.types import MetricValue from .types import AbstractOutputFormatter, FieldSpec diff --git a/src/ai/backend/common/clients/valkey_client/valkey_stat/client.py b/src/ai/backend/common/clients/valkey_client/valkey_stat/client.py index 34f9b801525..51fbfe08bc8 100644 --- a/src/ai/backend/common/clients/valkey_client/valkey_stat/client.py +++ b/src/ai/backend/common/clients/valkey_client/valkey_stat/client.py @@ -25,6 +25,7 @@ from ai.backend.common.exception import BackendAIError from ai.backend.common.json import dump_json_str, load_json from ai.backend.common.metrics.metric import DomainType, LayerType +from ai.backend.common.metrics.types import MetricValue from ai.backend.common.resilience import ( BackoffStrategy, MetricArgs, @@ -34,7 +35,7 @@ RetryPolicy, ) from ai.backend.common.resource.types import TotalResourceData -from ai.backend.common.types import AccessKey, MetricKey, MetricValue, ValkeyTarget +from ai.backend.common.types import AccessKey, MetricKey, ValkeyTarget from ai.backend.logging.utils import BraceStyleAdapter log = BraceStyleAdapter(logging.getLogger(__spec__.name)) diff --git a/src/ai/backend/common/metrics/types.py b/src/ai/backend/common/metrics/types.py index 7c5dab12523..7c40460ee6b 100644 --- a/src/ai/backend/common/metrics/types.py +++ b/src/ai/backend/common/metrics/types.py @@ -1,7 +1,58 @@ -from typing import Final +from typing import Final, TypedDict UNDEFINED: Final[str] = "undefined" + +class MovingStatValue(TypedDict): + min: str + max: str + sum: str + avg: str + diff: str + rate: str + version: int | None # for legacy client compatibility + + +MetricValue = TypedDict( + "MetricValue", + { + "current": str, + "capacity": str, + "pct": str, + "unit_hint": str, + "stats.min": str, + "stats.max": str, + "stats.sum": str, + "stats.avg": str, + "stats.diff": str, + "stats.rate": str, + "stats.version": int | None, + }, +) + + +def make_default_metric_value(unit_hint: str) -> MetricValue: + """Return a `MetricValue` populated with neutral defaults. + + All numeric string fields are `"0"` (including `capacity`, matching the + legacy Valkey shape where every metric carried a string capacity). + `unit_hint` is supplied by the caller. + """ + return MetricValue({ + "current": "0", + "capacity": "0", + "pct": "0", + "unit_hint": unit_hint, + "stats.min": "0", + "stats.max": "0", + "stats.sum": "0", + "stats.avg": "0", + "stats.diff": "0", + "stats.rate": "0", + "stats.version": None, + }) + + UTILIZATION_METRIC_INTERVAL: Final[float] = 5.0 UTILIZATION_METRIC_DETENTION: Final[float] = 600.0 # 10 minutes @@ -9,3 +60,48 @@ CONTAINER_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "container_metric_name" DEVICE_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "device_metric_name" PROCESS_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "process_metric_name" + +# Metric-name classification used by the legacy live_stat dict converter. +# These mirror the semantics that Worker's MovingStatistics produced when +# kernel stats were stored in Valkey: +# - RATE_STAT_METRICS: stats.rate is meaningful (rate of change per second). +# - DIFF_STAT_METRICS: stats.diff is meaningful (delta over the last window). +RATE_STAT_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"}) +DIFF_STAT_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"}) + +# Per-metric unit hint emitted by the agent (source of truth: src/ai/backend/agent/docker/intrinsic.py). +METRIC_UNIT_HINTS: Final[dict[str, str]] = { + "cpu_used": "msec", + "cpu_util": "percent", + "mem": "bytes", + "net_rx": "bps", + "net_tx": "bps", + "io_read": "bytes", + "io_write": "bytes", + "io_scratch_size": "bytes", +} + + +def resolve_unit_hint(metric_name: str) -> str: + """Return the unit_hint for a Backend.AI container metric name. + + Prometheus does not carry the agent-side `unit_hint` in its samples, so the + manager has to recover it from the metric name alone. Lookup order: + + 1. Explicit registration in :data:`METRIC_UNIT_HINTS` (highest priority). + 2. Naming-convention fallback for plugin metrics that follow Backend.AI + conventions (e.g., `cuda_util`, `gpu_mem`, `tpu_util`). + 3. The metric_name itself as a last resort — preserves the sample data + and surfaces the missing registration to the WebUI via the response. + """ + if metric_name in METRIC_UNIT_HINTS: + return METRIC_UNIT_HINTS[metric_name] + if metric_name.endswith("_util"): + return "percent" + if metric_name == "mem" or metric_name.endswith("_mem"): + return "bytes" + if metric_name.startswith("io_"): + return "bytes" + if metric_name.startswith("net_"): + return "bps" + return metric_name diff --git a/src/ai/backend/common/types.py b/src/ai/backend/common/types.py index 3318da96136..92965efbb54 100644 --- a/src/ai/backend/common/types.py +++ b/src/ai/backend/common/types.py @@ -106,7 +106,6 @@ "KernelEnqueueingConfig", "KernelId", "MetricKey", - "MetricValue", "ModelServiceProfile", "ModelServiceStatus", "MountExpression", @@ -115,7 +114,6 @@ "MountPermissionLiteral", "MountPoint", "MountTypes", - "MovingStatValue", "PreemptionMode", "PreemptionOrder", "PromMetric", @@ -565,34 +563,6 @@ class AbuseReport(TypedDict): abuse_report: str | None -class MovingStatValue(TypedDict): - min: str - max: str - sum: str - avg: str - diff: str - rate: str - version: int | None # for legacy client compatibility - - -MetricValue = TypedDict( - "MetricValue", - { - "current": str, - "capacity": str | None, - "pct": str, - "unit_hint": str, - "stats.min": str, - "stats.max": str, - "stats.sum": str, - "stats.avg": str, - "stats.diff": str, - "stats.rate": str, - "stats.version": int | None, - }, -) - - class IntrinsicSlotNames(enum.Enum): CPU = SlotName("cpu") MEMORY = SlotName("mem") diff --git a/src/ai/backend/manager/api/gql_legacy/kernel.py b/src/ai/backend/manager/api/gql_legacy/kernel.py index 47e4914280d..304f4673f0d 100644 --- a/src/ai/backend/manager/api/gql_legacy/kernel.py +++ b/src/ai/backend/manager/api/gql_legacy/kernel.py @@ -24,6 +24,7 @@ KernelId, SessionId, ) +from ai.backend.manager.api.gql_legacy.stat_converter import LegacyLiveStatConverter from ai.backend.manager.data.kernel.types import KernelStatus from ai.backend.manager.defs import DEFAULT_ROLE from ai.backend.manager.models.group import groups @@ -42,6 +43,7 @@ QueryFilterParser, ) from ai.backend.manager.models.user import UserRole, users +from ai.backend.manager.services.metric.actions.live_stat import ContainerLiveStatAction from .base import ( BigInt, @@ -67,6 +69,23 @@ ) +async def _batch_load_kernel_live_stat( + ctx: GraphQueryContext, + kernel_ids: Sequence[KernelId], +) -> list[dict[str, Any] | None]: + """Prometheus-backed replacement for the old Valkey `KernelStatistics.by_kernel` + loader. Returns the legacy `dict[metric_name, MetricValue]` shape (or `None` + when the kernel has no Prometheus samples) preserving wire compatibility. + """ + if not kernel_ids: + return [] + action_result = await ctx.processors.metric.query_container_live_stat.wait_for_complete( + ContainerLiveStatAction(kernel_ids=list(kernel_ids)) + ) + converted = LegacyLiveStatConverter().convert(action_result.stats) + return [converted.get(kid) for kid in kernel_ids] + + class KernelNode(graphene.ObjectType): # type: ignore[misc] class Meta: interfaces = (AsyncNode,) @@ -190,17 +209,10 @@ async def resolve_image(self, info: graphene.ResolveInfo) -> ImageNode | None: async def resolve_live_stat(self, info: graphene.ResolveInfo) -> dict[str, Any] | None: graph_ctx: GraphQueryContext = info.context loader = graph_ctx.dataloader_manager.get_loader_by_func( - graph_ctx, self.batch_load_live_stat + graph_ctx, _batch_load_kernel_live_stat ) return cast(dict[str, Any] | None, await loader.load(self.row_id)) - @classmethod - async def batch_load_live_stat( - cls, ctx: GraphQueryContext, kernel_ids: Sequence[KernelId] - ) -> list[dict[str, Any] | None]: - kernel_ids_str = [str(kid) for kid in kernel_ids] - return await ctx.valkey_stat.get_session_statistics_batch(kernel_ids_str) - class KernelConnection(Connection): class Meta: @@ -313,7 +325,9 @@ def from_row(cls, ctx: GraphQueryContext, row: KernelRow | None) -> ComputeConta # we can leave last_stat value for legacy support, as an alias to last_stat async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None: graph_ctx: GraphQueryContext = info.context - loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel") + loader = graph_ctx.dataloader_manager.get_loader_by_func( + graph_ctx, _batch_load_kernel_live_stat + ) return cast(Mapping[str, Any] | None, await loader.load(self.id)) async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None: @@ -606,7 +620,9 @@ class Meta: # we can leave last_stat value for legacy support, as an alias to last_stat async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None: graph_ctx: GraphQueryContext = info.context - loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel") + loader = graph_ctx.dataloader_manager.get_loader_by_func( + graph_ctx, _batch_load_kernel_live_stat + ) return cast(Mapping[str, Any] | None, await loader.load(self.id)) async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None: @@ -632,7 +648,9 @@ async def _resolve_legacy_metric( if value is None: return convert_type(0) return convert_type(value) - loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel") + loader = graph_ctx.dataloader_manager.get_loader_by_func( + graph_ctx, _batch_load_kernel_live_stat + ) kstat = await loader.load(self.id) if kstat is None: return convert_type(0) diff --git a/src/ai/backend/manager/api/gql_legacy/stat_converter.py b/src/ai/backend/manager/api/gql_legacy/stat_converter.py new file mode 100644 index 00000000000..c5bdb841f9a --- /dev/null +++ b/src/ai/backend/manager/api/gql_legacy/stat_converter.py @@ -0,0 +1,105 @@ +from collections.abc import Iterable + +from ai.backend.common.clients.prometheus.types import MetricValue as PrometheusMetricValue +from ai.backend.common.clients.prometheus.types import ValueType +from ai.backend.common.metrics.types import ( + DIFF_STAT_METRICS, + RATE_STAT_METRICS, + UTILIZATION_METRIC_INTERVAL, + MetricValue, + make_default_metric_value, + resolve_unit_hint, +) +from ai.backend.common.types import KernelId +from ai.backend.manager.data.metric.types import KernelLiveStatBatchResult + + +class LegacyLiveStatConverter: + """Adapt `KernelLiveStatBatchResult` into the legacy + `dict[metric_name, MetricValue]` shape consumed by GQL/WebUI. + + Merge order from upstream is gauge -> diff -> rate, so for + RATE/DIFF metrics the same `(name, CURRENT)` tuple appears twice; + `currents[0]` is the raw gauge sample, `currents[-1]` is the + rate/diff query result. + + `stats.max` / `stats.avg` are not populated — see + `docs/kernel-live-stat-followup-issues.md`. + """ + + def convert( + self, result: KernelLiveStatBatchResult + ) -> dict[KernelId, dict[str, MetricValue] | None]: + out: dict[KernelId, dict[str, MetricValue] | None] = {} + for kernel_id, entry in result.entries.items(): + if not entry.values: + out[kernel_id] = None + continue + out[kernel_id] = self._convert_one_kernel(entry.values) + return out + + def _convert_one_kernel( + self, values: Iterable[PrometheusMetricValue] + ) -> dict[str, MetricValue]: + grouped: dict[str, list[PrometheusMetricValue]] = {} + for v in values: + grouped.setdefault(v.metric_name, []).append(v) + + per_metric: dict[str, MetricValue] = {} + for name, samples in grouped.items(): + per_metric[name] = self._convert_metric_samples(name, samples) + return per_metric + + def _convert_metric_samples( + self, metric_name: str, samples: list[PrometheusMetricValue] + ) -> MetricValue: + # `resolve_unit_hint` falls back to naming conventions and finally + # the metric_name itself for unregistered plugin metrics. + unit_hint = resolve_unit_hint(metric_name) + out = make_default_metric_value(unit_hint=unit_hint) + + currents = [s.value for s in samples if s.value_type is ValueType.CURRENT] + capacities = [s.value for s in samples if s.value_type is ValueType.CAPACITY] + pcts = [s.value for s in samples if s.value_type is ValueType.PCT] + + is_rate_metric = metric_name in RATE_STAT_METRICS + is_diff_metric = metric_name in DIFF_STAT_METRICS + + if currents: + # RATE/DIFF: prefer the rate/diff query result over the raw gauge, + # mirroring the legacy `current_hook=stats.rate|diff` behavior. + if (is_rate_metric or is_diff_metric) and len(currents) > 1: + out["current"] = currents[-1] + else: + out["current"] = currents[0] + if capacities: + out["capacity"] = capacities[-1] + + if is_rate_metric and currents: + # RATE template applies `/ UTILIZATION_METRIC_INTERVAL`; undo it + # here to recover the per-second magnitude legacy `stats.rate` had. + # TODO: separate the rate query from the gauge query so this + # hack-multiply isn't needed. + try: + rate_value = float(currents[-1]) * UTILIZATION_METRIC_INTERVAL + out["stats.rate"] = f"{rate_value:.6f}" + except ValueError: + out["stats.rate"] = currents[-1] + if is_diff_metric and currents: + # Per-second rate, not the legacy per-5s delta — GQL consumers + # only read `cpu_util.pct`, so magnitude mismatch is acceptable. + out["stats.diff"] = currents[-1] + + # Derive pct from current/capacity when no PCT sample was emitted. + if pcts: + out["pct"] = pcts[-1] + else: + try: + current_value = float(out["current"]) + capacity_value = float(out["capacity"]) + if capacity_value > 0: + out["pct"] = f"{current_value / capacity_value * 100:.2f}" + except ValueError: + pass + + return out diff --git a/src/ai/backend/manager/api/gql_legacy/statistics.py b/src/ai/backend/manager/api/gql_legacy/statistics.py index 2cc9d5d1c63..5dd158bb0da 100644 --- a/src/ai/backend/manager/api/gql_legacy/statistics.py +++ b/src/ai/backend/manager/api/gql_legacy/statistics.py @@ -28,15 +28,6 @@ async def batch_load_by_kernel_impl( session_ids_str = [str(sess_id) for sess_id in session_ids] return await valkey_stat_client.get_session_statistics_batch(session_ids_str) - @classmethod - async def batch_load_by_kernel( - cls, - ctx: GraphQueryContext, - session_ids: Sequence[SessionId], - ) -> Sequence[Mapping[str, Any] | None]: - """wrapper of `KernelStatistics.batch_load_by_kernel_impl()` for aiodataloader""" - return await cls.batch_load_by_kernel_impl(ctx.valkey_stat, session_ids) - @classmethod async def batch_load_inference_metrics_by_kernel( cls, diff --git a/src/ai/backend/manager/clients/prometheus/fixed_query_builder.py b/src/ai/backend/manager/clients/prometheus/fixed_query_builder.py index 02acd322600..b2baefde333 100644 --- a/src/ai/backend/manager/clients/prometheus/fixed_query_builder.py +++ b/src/ai/backend/manager/clients/prometheus/fixed_query_builder.py @@ -6,7 +6,6 @@ from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset from ai.backend.common.clients.prometheus.querier import ContainerMetricQuerier from ai.backend.common.clients.prometheus.types import ValueType -from ai.backend.common.exception import UnreachableError from ai.backend.common.metrics.types import ( CONTAINER_UTILIZATION_METRIC_LABEL_NAME, CONTAINER_UTILIZATION_METRIC_NAME, @@ -152,5 +151,3 @@ def _get_template(self, metric_type: MetricType) -> str: return _RATE_TEMPLATE case MetricType.DIFF: return _DIFF_TEMPLATE - case _: - raise UnreachableError(f"Unknown metric type: {metric_type}") diff --git a/tests/unit/manager/api/gql_legacy/test_stat_converter.py b/tests/unit/manager/api/gql_legacy/test_stat_converter.py new file mode 100644 index 00000000000..627032cce64 --- /dev/null +++ b/tests/unit/manager/api/gql_legacy/test_stat_converter.py @@ -0,0 +1,277 @@ +import uuid +from collections.abc import Mapping, Sequence +from typing import cast + +import pytest + +from ai.backend.common.clients.prometheus.types import MetricValue, ValueType +from ai.backend.common.metrics.types import MetricValue as LegacyMetricValue +from ai.backend.common.types import KernelId +from ai.backend.manager.api.gql_legacy.stat_converter import LegacyLiveStatConverter +from ai.backend.manager.data.metric.types import KernelLiveStatBatchResult + + +@pytest.fixture +def converter() -> LegacyLiveStatConverter: + return LegacyLiveStatConverter() + + +@pytest.fixture +def kernel_id() -> KernelId: + return KernelId(uuid.uuid4()) + + +@pytest.fixture +def two_kernel_ids() -> tuple[KernelId, KernelId]: + return KernelId(uuid.uuid4()), KernelId(uuid.uuid4()) + + +def _build_result( + samples_by_kernel: Mapping[KernelId, Sequence[MetricValue]], +) -> KernelLiveStatBatchResult: + return KernelLiveStatBatchResult.from_metric_values( + list(samples_by_kernel.keys()), + {k: list(v) for k, v in samples_by_kernel.items()}, + ) + + +def _per_metric( + out: Mapping[KernelId, dict[str, LegacyMetricValue] | None], kernel_id: KernelId +) -> Mapping[str, Mapping[str, object]]: + """Narrow the converter result to a non-Optional, dynamically-indexable + view so that parametrized tests can assert against arbitrary + `(metric_name, field)` pairs without violating TypedDict literal-key + rules. + """ + per_kernel = out[kernel_id] + assert per_kernel is not None + return cast(Mapping[str, Mapping[str, object]], per_kernel) + + +class TestEmptyKernel: + @pytest.fixture + def empty_result(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return KernelLiveStatBatchResult.empty([kernel_id]) + + def test_kernel_with_no_samples_yields_none( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + empty_result: KernelLiveStatBatchResult, + ) -> None: + assert converter.convert(empty_result) == {kernel_id: None} + + +class TestGaugeMetric: + @pytest.fixture + def gauge_result(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({ + kernel_id: [ + MetricValue("mem", ValueType.CURRENT, "1024"), + MetricValue("mem", ValueType.CAPACITY, "8192"), + MetricValue("mem", ValueType.PCT, "12.5"), + ] + }) + + @pytest.mark.parametrize( + "field, expected", + [ + ("current", "1024"), + ("capacity", "8192"), + ("pct", "12.5"), + ("unit_hint", "bytes"), + ], + ) + def test_legacy_field_is_populated( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + gauge_result: KernelLiveStatBatchResult, + field: str, + expected: str, + ) -> None: + per_metric = _per_metric(converter.convert(gauge_result), kernel_id) + assert per_metric["mem"][field] == expected + + +class TestRateMetric: + """For a RATE_STAT_METRICS entry the rate-of-change sample (currents[-1]) + is what the WebUI expects in `current` (legacy Valkey behavior). The + cumulative gauge (currents[0]) is discarded because the WebUI never + consumed it on the Valkey path. `stats.rate` is hack-multiplied by + `UTILIZATION_METRIC_INTERVAL` to recover the per-second magnitude that + legacy `MovingStatistics.rate` produced (the rate query template applies + `/ UTILIZATION_METRIC_INTERVAL` so its `current` matches the per-window + legacy magnitude; the converter undoes that scaling for `stats.rate`). + """ + + @pytest.fixture + def rate_result(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({ + kernel_id: [ + MetricValue("net_rx", ValueType.CURRENT, "1000000"), + MetricValue("net_rx", ValueType.CURRENT, "2048"), + ] + }) + + @pytest.mark.parametrize( + "field, expected", + [ + ("current", "2048"), + ("stats.rate", "10240.000000"), + ("unit_hint", "bps"), + ], + ) + def test_legacy_field_is_populated( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + rate_result: KernelLiveStatBatchResult, + field: str, + expected: str, + ) -> None: + per_metric = _per_metric(converter.convert(rate_result), kernel_id) + assert per_metric["net_rx"][field] == expected + + +class TestDiffMetric: + """For a DIFF_STAT_METRICS entry the diff-over-window sample (currents[-1]) + is exposed as `current` (legacy Valkey behavior) and mirrored to + `stats.diff`. The cumulative gauge (currents[0]) is dropped — see + `docs/kernel-live-stat-followup-issues.md`. + """ + + @pytest.fixture + def diff_result(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({ + kernel_id: [ + MetricValue("cpu_util", ValueType.CURRENT, "5000000"), + MetricValue("cpu_util", ValueType.PCT, "37.0"), + MetricValue("cpu_util", ValueType.CURRENT, "150"), + ] + }) + + @pytest.mark.parametrize( + "field, expected", + [ + ("current", "150"), + ("pct", "37.0"), + ("stats.diff", "150"), + ], + ) + def test_legacy_field_is_populated( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + diff_result: KernelLiveStatBatchResult, + field: str, + expected: str, + ) -> None: + per_metric = _per_metric(converter.convert(diff_result), kernel_id) + assert per_metric["cpu_util"][field] == expected + + +class TestPctDerivation: + """When the Prometheus pipeline does not emit a PCT sample, the converter + derives the percentage from current/capacity, matching the value the + Valkey baseline produced via the agent's MovingStatistics. + """ + + @pytest.fixture + def gauge_with_capacity(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({ + kernel_id: [ + MetricValue("mem", ValueType.CURRENT, "200"), + MetricValue("mem", ValueType.CAPACITY, "800"), + ] + }) + + def test_pct_is_computed_from_current_and_capacity( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + gauge_with_capacity: KernelLiveStatBatchResult, + ) -> None: + per_metric = _per_metric(converter.convert(gauge_with_capacity), kernel_id) + assert per_metric["mem"]["pct"] == "25.00" + + def test_explicit_pct_sample_wins_over_derivation( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + ) -> None: + result = _build_result({ + kernel_id: [ + MetricValue("mem", ValueType.CURRENT, "200"), + MetricValue("mem", ValueType.CAPACITY, "800"), + MetricValue("mem", ValueType.PCT, "30.0"), + ] + }) + per_metric = _per_metric(converter.convert(result), kernel_id) + assert per_metric["mem"]["pct"] == "30.0" + + +class TestCapacityDefault: + """The legacy Valkey shape always carried a string `capacity`. The + converter mirrors that — when the Prometheus pipeline emits no CAPACITY + sample, capacity stays at the `"0"` default rather than `null`. + """ + + @pytest.fixture + def gauge_no_capacity(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({kernel_id: [MetricValue("io_read", ValueType.CURRENT, "0")]}) + + def test_capacity_defaults_to_zero_string( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + gauge_no_capacity: KernelLiveStatBatchResult, + ) -> None: + per_metric = _per_metric(converter.convert(gauge_no_capacity), kernel_id) + assert per_metric["io_read"]["capacity"] == "0" + + +class TestUnknownMetric: + """An unregistered metric falls back to its own name as the unit_hint + so the sample is not dropped and the missing registration is + self-evident in the response payload. + """ + + @pytest.fixture + def unknown_metric_result(self, kernel_id: KernelId) -> KernelLiveStatBatchResult: + return _build_result({kernel_id: [MetricValue("brand_new_metric", ValueType.CURRENT, "1")]}) + + def test_unknown_metric_uses_name_as_unit_hint( + self, + converter: LegacyLiveStatConverter, + kernel_id: KernelId, + unknown_metric_result: KernelLiveStatBatchResult, + ) -> None: + per_metric = _per_metric(converter.convert(unknown_metric_result), kernel_id) + assert per_metric["brand_new_metric"]["unit_hint"] == "brand_new_metric" + assert per_metric["brand_new_metric"]["current"] == "1" + + +class TestMultiKernelIsolation: + @pytest.fixture + def two_kernel_result( + self, two_kernel_ids: tuple[KernelId, KernelId] + ) -> KernelLiveStatBatchResult: + a, b = two_kernel_ids + return _build_result({ + a: [MetricValue("mem", ValueType.CURRENT, "10")], + b: [MetricValue("mem", ValueType.CURRENT, "20")], + }) + + def test_per_kernel_values_do_not_leak( + self, + converter: LegacyLiveStatConverter, + two_kernel_ids: tuple[KernelId, KernelId], + two_kernel_result: KernelLiveStatBatchResult, + ) -> None: + a, b = two_kernel_ids + out = converter.convert(two_kernel_result) + per_metric_a = _per_metric(out, a) + per_metric_b = _per_metric(out, b) + assert per_metric_a["mem"]["current"] == "10" + assert per_metric_b["mem"]["current"] == "20" From cf30ee0ea263c10800fbc64246ead365ccdd4417 Mon Sep 17 00:00:00 2001 From: BoKeum Date: Mon, 27 Apr 2026 13:03:07 +0900 Subject: [PATCH 2/3] style: Add linter result --- src/ai/backend/manager/api/gql_legacy/kernel.py | 4 ---- src/ai/backend/manager/repositories/metric/repository.py | 6 +++++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ai/backend/manager/api/gql_legacy/kernel.py b/src/ai/backend/manager/api/gql_legacy/kernel.py index 304f4673f0d..2d7423a885f 100644 --- a/src/ai/backend/manager/api/gql_legacy/kernel.py +++ b/src/ai/backend/manager/api/gql_legacy/kernel.py @@ -73,10 +73,6 @@ async def _batch_load_kernel_live_stat( ctx: GraphQueryContext, kernel_ids: Sequence[KernelId], ) -> list[dict[str, Any] | None]: - """Prometheus-backed replacement for the old Valkey `KernelStatistics.by_kernel` - loader. Returns the legacy `dict[metric_name, MetricValue]` shape (or `None` - when the kernel has no Prometheus samples) preserving wire compatibility. - """ if not kernel_ids: return [] action_result = await ctx.processors.metric.query_container_live_stat.wait_for_complete( diff --git a/src/ai/backend/manager/repositories/metric/repository.py b/src/ai/backend/manager/repositories/metric/repository.py index 589ff95cee6..a019c277d1a 100644 --- a/src/ai/backend/manager/repositories/metric/repository.py +++ b/src/ai/backend/manager/repositories/metric/repository.py @@ -102,7 +102,11 @@ async def _query_container_live_stats( kernel_ids: Sequence[KernelId], ) -> dict[KernelId, list[MetricValue]]: queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids) - gauge_response, diff_response, rate_response = await asyncio.gather( + ( + gauge_response, + diff_response, + rate_response, + ) = await asyncio.gather( *(self._prometheus_client.query_instant(preset) for preset in queries.to_list()) ) gauge = KernelMetricValuesByKernel.from_prometheus_response(gauge_response) From cde8b1fe8b549997e94b5156a82ed64103c0eaf9 Mon Sep 17 00:00:00 2001 From: BoKeum Date: Mon, 27 Apr 2026 14:57:59 +0900 Subject: [PATCH 3/3] style: Remove untracked file name in comment --- src/ai/backend/manager/api/gql_legacy/stat_converter.py | 3 +-- tests/unit/manager/api/gql_legacy/test_stat_converter.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/manager/api/gql_legacy/stat_converter.py b/src/ai/backend/manager/api/gql_legacy/stat_converter.py index c5bdb841f9a..2ac457f69f2 100644 --- a/src/ai/backend/manager/api/gql_legacy/stat_converter.py +++ b/src/ai/backend/manager/api/gql_legacy/stat_converter.py @@ -23,8 +23,7 @@ class LegacyLiveStatConverter: `currents[0]` is the raw gauge sample, `currents[-1]` is the rate/diff query result. - `stats.max` / `stats.avg` are not populated — see - `docs/kernel-live-stat-followup-issues.md`. + `stats.max` / `stats.avg` are not populated """ def convert( diff --git a/tests/unit/manager/api/gql_legacy/test_stat_converter.py b/tests/unit/manager/api/gql_legacy/test_stat_converter.py index 627032cce64..a751846a072 100644 --- a/tests/unit/manager/api/gql_legacy/test_stat_converter.py +++ b/tests/unit/manager/api/gql_legacy/test_stat_converter.py @@ -137,8 +137,7 @@ def test_legacy_field_is_populated( class TestDiffMetric: """For a DIFF_STAT_METRICS entry the diff-over-window sample (currents[-1]) is exposed as `current` (legacy Valkey behavior) and mirrored to - `stats.diff`. The cumulative gauge (currents[0]) is dropped — see - `docs/kernel-live-stat-followup-issues.md`. + `stats.diff`. The cumulative gauge (currents[0]) is dropped """ @pytest.fixture