Skip to content

Commit 323c1d1

Browse files
committed
refactor: migrate kernel live_stat from Valkey to Prometheus
1 parent f55366d commit 323c1d1

12 files changed

Lines changed: 518 additions & 61 deletions

File tree

changes/11330.enhance.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Migrate kernel `live_stat` GraphQL resolver from Valkey to Prometheus while preserving the legacy wire shape

src/ai/backend/agent/stats.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,17 @@
2626
from ai.backend.common import msgpack
2727
from ai.backend.common.identity import is_containerized
2828
from ai.backend.common.metrics.metric import StageObserver
29-
from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL
29+
from ai.backend.common.metrics.types import (
30+
UTILIZATION_METRIC_INTERVAL,
31+
MetricValue,
32+
MovingStatValue,
33+
)
3034
from ai.backend.common.types import (
3135
PID,
3236
ContainerId,
3337
DeviceId,
3438
KernelId,
3539
MetricKey,
36-
MetricValue,
37-
MovingStatValue,
3840
SessionId,
3941
SlotName,
4042
)

src/ai/backend/appproxy/worker/types.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@
4242
SafeGauge,
4343
SafeHistogram,
4444
)
45+
from ai.backend.common.metrics.types import MetricValue, MovingStatValue
4546
from ai.backend.common.types import (
4647
MetricKey,
47-
MetricValue,
48-
MovingStatValue,
4948
RuntimeVariant,
5049
)
5150

src/ai/backend/client/output/formatters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import humanize
1111

12-
from ai.backend.common.types import MetricValue
12+
from ai.backend.common.metrics.types import MetricValue
1313

1414
from .types import AbstractOutputFormatter, FieldSpec
1515

src/ai/backend/common/clients/valkey_client/valkey_stat/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from ai.backend.common.exception import BackendAIError
2626
from ai.backend.common.json import dump_json_str, load_json
2727
from ai.backend.common.metrics.metric import DomainType, LayerType
28+
from ai.backend.common.metrics.types import MetricValue
2829
from ai.backend.common.resilience import (
2930
BackoffStrategy,
3031
MetricArgs,
@@ -34,7 +35,7 @@
3435
RetryPolicy,
3536
)
3637
from ai.backend.common.resource.types import TotalResourceData
37-
from ai.backend.common.types import AccessKey, MetricKey, MetricValue, ValkeyTarget
38+
from ai.backend.common.types import AccessKey, MetricKey, ValkeyTarget
3839
from ai.backend.logging.utils import BraceStyleAdapter
3940

4041
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,107 @@
1-
from typing import Final
1+
from typing import Final, TypedDict
22

33
UNDEFINED: Final[str] = "undefined"
44

5+
6+
class MovingStatValue(TypedDict):
7+
min: str
8+
max: str
9+
sum: str
10+
avg: str
11+
diff: str
12+
rate: str
13+
version: int | None # for legacy client compatibility
14+
15+
16+
MetricValue = TypedDict(
17+
"MetricValue",
18+
{
19+
"current": str,
20+
"capacity": str,
21+
"pct": str,
22+
"unit_hint": str,
23+
"stats.min": str,
24+
"stats.max": str,
25+
"stats.sum": str,
26+
"stats.avg": str,
27+
"stats.diff": str,
28+
"stats.rate": str,
29+
"stats.version": int | None,
30+
},
31+
)
32+
33+
34+
def make_default_metric_value(unit_hint: str) -> MetricValue:
35+
"""Return a `MetricValue` populated with neutral defaults.
36+
37+
All numeric string fields are `"0"` (including `capacity`, matching the
38+
legacy Valkey shape where every metric carried a string capacity).
39+
`unit_hint` is supplied by the caller.
40+
"""
41+
return MetricValue({
42+
"current": "0",
43+
"capacity": "0",
44+
"pct": "0",
45+
"unit_hint": unit_hint,
46+
"stats.min": "0",
47+
"stats.max": "0",
48+
"stats.sum": "0",
49+
"stats.avg": "0",
50+
"stats.diff": "0",
51+
"stats.rate": "0",
52+
"stats.version": None,
53+
})
54+
55+
556
UTILIZATION_METRIC_INTERVAL: Final[float] = 5.0
UTILIZATION_METRIC_DETENTION: Final[float] = 600.0  # 10 minutes

CONTAINER_UTILIZATION_METRIC_NAME: Final[str] = "backendai_container_utilization"
CONTAINER_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "container_metric_name"
DEVICE_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "device_metric_name"
PROCESS_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "process_metric_name"

# Metric-name classification consumed by the legacy live_stat dict converter.
# These mirror what the agent's MovingStatistics produced back when kernel
# stats were stored in Valkey:
# - RATE_STAT_METRICS: stats.rate carries a meaningful per-second rate.
# - DIFF_STAT_METRICS: stats.diff carries a meaningful delta over the window.
RATE_STAT_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})
DIFF_STAT_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})

# Unit hint per metric name as emitted by the agent
# (source of truth: src/ai/backend/agent/docker/intrinsic.py).
METRIC_UNIT_HINTS: Final[dict[str, str]] = {
    "cpu_used": "msec",
    "cpu_util": "percent",
    "mem": "bytes",
    "net_rx": "bps",
    "net_tx": "bps",
    "io_read": "bytes",
    "io_write": "bytes",
    "io_scratch_size": "bytes",
}


def resolve_unit_hint(metric_name: str) -> str:
    """Return the unit_hint for a Backend.AI container metric name.

    Prometheus samples do not carry the agent-side ``unit_hint``, so the
    manager must reconstruct it from the metric name alone. Lookup order:

    1. an explicit entry in :data:`METRIC_UNIT_HINTS` wins;
    2. otherwise a naming-convention fallback covers plugin metrics that
       follow Backend.AI conventions (e.g. ``cuda_util``, ``gpu_mem``);
    3. as a last resort the metric name itself is returned, which preserves
       the sample data and surfaces the missing registration to the WebUI
       via the response.
    """
    registered = METRIC_UNIT_HINTS.get(metric_name)
    if registered is not None:
        return registered
    # Convention-based fallback, checked in priority order.
    convention_rules = (
        (lambda name: name.endswith("_util"), "percent"),
        (lambda name: name == "mem" or name.endswith("_mem"), "bytes"),
        (lambda name: name.startswith("io_"), "bytes"),
        (lambda name: name.startswith("net_"), "bps"),
    )
    for matches, hint in convention_rules:
        if matches(metric_name):
            return hint
    return metric_name

src/ai/backend/common/types.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
"KernelEnqueueingConfig",
107107
"KernelId",
108108
"MetricKey",
109-
"MetricValue",
110109
"ModelServiceProfile",
111110
"ModelServiceStatus",
112111
"MountExpression",
@@ -115,7 +114,6 @@
115114
"MountPermissionLiteral",
116115
"MountPoint",
117116
"MountTypes",
118-
"MovingStatValue",
119117
"PreemptionMode",
120118
"PreemptionOrder",
121119
"PromMetric",
@@ -565,34 +563,6 @@ class AbuseReport(TypedDict):
565563
abuse_report: str | None
566564

567565

568-
class MovingStatValue(TypedDict):
569-
min: str
570-
max: str
571-
sum: str
572-
avg: str
573-
diff: str
574-
rate: str
575-
version: int | None # for legacy client compatibility
576-
577-
578-
MetricValue = TypedDict(
579-
"MetricValue",
580-
{
581-
"current": str,
582-
"capacity": str | None,
583-
"pct": str,
584-
"unit_hint": str,
585-
"stats.min": str,
586-
"stats.max": str,
587-
"stats.sum": str,
588-
"stats.avg": str,
589-
"stats.diff": str,
590-
"stats.rate": str,
591-
"stats.version": int | None,
592-
},
593-
)
594-
595-
596566
class IntrinsicSlotNames(enum.Enum):
597567
CPU = SlotName("cpu")
598568
MEMORY = SlotName("mem")

src/ai/backend/manager/api/gql_legacy/kernel.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
KernelId,
2525
SessionId,
2626
)
27+
from ai.backend.manager.api.gql_legacy.stat_converter import LegacyLiveStatConverter
2728
from ai.backend.manager.data.kernel.types import KernelStatus
2829
from ai.backend.manager.defs import DEFAULT_ROLE
2930
from ai.backend.manager.models.group import groups
@@ -42,6 +43,7 @@
4243
QueryFilterParser,
4344
)
4445
from ai.backend.manager.models.user import UserRole, users
46+
from ai.backend.manager.services.metric.actions.live_stat import ContainerLiveStatAction
4547

4648
from .base import (
4749
BigInt,
@@ -67,6 +69,23 @@
6769
)
6870

6971

72+
async def _batch_load_kernel_live_stat(
    ctx: GraphQueryContext,
    kernel_ids: Sequence[KernelId],
) -> list[dict[str, Any] | None]:
    """Prometheus-backed replacement for the old Valkey
    `KernelStatistics.by_kernel` dataloader.

    Returns one entry per requested kernel id, in request order: the legacy
    `dict[metric_name, MetricValue]` wire shape, or `None` for kernels that
    have no Prometheus samples.
    """
    if not kernel_ids:
        # Nothing to load; skip issuing an empty Prometheus query.
        return []
    action_result = await ctx.processors.metric.query_container_live_stat.wait_for_complete(
        ContainerLiveStatAction(kernel_ids=[*kernel_ids])
    )
    stats_by_kernel = LegacyLiveStatConverter().convert(action_result.stats)
    # Missing kernels map to None, preserving positional alignment with input.
    return [stats_by_kernel.get(kernel_id) for kernel_id in kernel_ids]
87+
88+
7089
class KernelNode(graphene.ObjectType): # type: ignore[misc]
7190
class Meta:
7291
interfaces = (AsyncNode,)
@@ -190,17 +209,10 @@ async def resolve_image(self, info: graphene.ResolveInfo) -> ImageNode | None:
190209
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> dict[str, Any] | None:
191210
graph_ctx: GraphQueryContext = info.context
192211
loader = graph_ctx.dataloader_manager.get_loader_by_func(
193-
graph_ctx, self.batch_load_live_stat
212+
graph_ctx, _batch_load_kernel_live_stat
194213
)
195214
return cast(dict[str, Any] | None, await loader.load(self.row_id))
196215

197-
@classmethod
198-
async def batch_load_live_stat(
199-
cls, ctx: GraphQueryContext, kernel_ids: Sequence[KernelId]
200-
) -> list[dict[str, Any] | None]:
201-
kernel_ids_str = [str(kid) for kid in kernel_ids]
202-
return await ctx.valkey_stat.get_session_statistics_batch(kernel_ids_str)
203-
204216

205217
class KernelConnection(Connection):
206218
class Meta:
@@ -313,7 +325,9 @@ def from_row(cls, ctx: GraphQueryContext, row: KernelRow | None) -> ComputeConta
313325
# we can leave last_stat value for legacy support, as an alias to last_stat
314326
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
315327
graph_ctx: GraphQueryContext = info.context
316-
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
328+
loader = graph_ctx.dataloader_manager.get_loader_by_func(
329+
graph_ctx, _batch_load_kernel_live_stat
330+
)
317331
return cast(Mapping[str, Any] | None, await loader.load(self.id))
318332

319333
async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
@@ -606,7 +620,9 @@ class Meta:
606620
# we can leave last_stat value for legacy support, as an alias to last_stat
607621
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
608622
graph_ctx: GraphQueryContext = info.context
609-
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
623+
loader = graph_ctx.dataloader_manager.get_loader_by_func(
624+
graph_ctx, _batch_load_kernel_live_stat
625+
)
610626
return cast(Mapping[str, Any] | None, await loader.load(self.id))
611627

612628
async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
@@ -632,7 +648,9 @@ async def _resolve_legacy_metric(
632648
if value is None:
633649
return convert_type(0)
634650
return convert_type(value)
635-
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
651+
loader = graph_ctx.dataloader_manager.get_loader_by_func(
652+
graph_ctx, _batch_load_kernel_live_stat
653+
)
636654
kstat = await loader.load(self.id)
637655
if kstat is None:
638656
return convert_type(0)

0 commit comments

Comments
 (0)