1 change: 1 addition & 0 deletions changes/11330.enhance.md
@@ -0,0 +1 @@
Migrate kernel `live_stat` GraphQL resolver from Valkey to Prometheus while preserving the legacy wire shape
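For context, a hedged sketch of the legacy per-kernel live_stat wire shape this migration preserves. The keys follow the MetricValue TypedDict moved in this PR; the metric name and sample values are illustrative only:

    # Illustrative payload only; keys follow the MetricValue TypedDict below.
    live_stat = {
        "cpu_util": {
            "current": "12.5",
            "capacity": "100",
            "pct": "12.5",
            "unit_hint": "percent",
            "stats.min": "0",
            "stats.max": "95.2",
            "stats.sum": "0",
            "stats.avg": "12.5",
            "stats.diff": "3.1",  # cpu_util is in DIFF_STAT_METRICS, so diff is meaningful
            "stats.rate": "0",
            "stats.version": None,
        },
    }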
8 changes: 5 additions & 3 deletions src/ai/backend/agent/stats.py
@@ -26,15 +26,17 @@
from ai.backend.common import msgpack
from ai.backend.common.identity import is_containerized
from ai.backend.common.metrics.metric import StageObserver
from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL
from ai.backend.common.metrics.types import (
UTILIZATION_METRIC_INTERVAL,
MetricValue,
MovingStatValue,
)
from ai.backend.common.types import (
PID,
ContainerId,
DeviceId,
KernelId,
MetricKey,
MetricValue,
MovingStatValue,
SessionId,
SlotName,
)
3 changes: 1 addition & 2 deletions src/ai/backend/appproxy/worker/types.py
@@ -42,10 +42,9 @@
SafeGauge,
SafeHistogram,
)
from ai.backend.common.metrics.types import MetricValue, MovingStatValue
from ai.backend.common.types import (
MetricKey,
MetricValue,
MovingStatValue,
RuntimeVariant,
)

2 changes: 1 addition & 1 deletion src/ai/backend/client/output/formatters.py
@@ -9,7 +9,7 @@

import humanize

from ai.backend.common.types import MetricValue
from ai.backend.common.metrics.types import MetricValue

from .types import AbstractOutputFormatter, FieldSpec

@@ -25,6 +25,7 @@
from ai.backend.common.exception import BackendAIError
from ai.backend.common.json import dump_json_str, load_json
from ai.backend.common.metrics.metric import DomainType, LayerType
from ai.backend.common.metrics.types import MetricValue
from ai.backend.common.resilience import (
BackoffStrategy,
MetricArgs,
@@ -34,7 +35,7 @@
RetryPolicy,
)
from ai.backend.common.resource.types import TotalResourceData
from ai.backend.common.types import AccessKey, MetricKey, MetricValue, ValkeyTarget
from ai.backend.common.types import AccessKey, MetricKey, ValkeyTarget
from ai.backend.logging.utils import BraceStyleAdapter

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
98 changes: 97 additions & 1 deletion src/ai/backend/common/metrics/types.py
@@ -1,11 +1,107 @@
from typing import Final
from typing import Final, TypedDict

UNDEFINED: Final[str] = "undefined"


class MovingStatValue(TypedDict):
min: str
max: str
sum: str
avg: str
diff: str
rate: str
version: int | None # for legacy client compatibility


MetricValue = TypedDict(
"MetricValue",
{
"current": str,
"capacity": str,
"pct": str,
"unit_hint": str,
"stats.min": str,
"stats.max": str,
"stats.sum": str,
"stats.avg": str,
"stats.diff": str,
"stats.rate": str,
"stats.version": int | None,
},
)
Comment on lines +16 to +31
Copilot AI (Apr 27, 2026):
MetricValue now defines capacity as a required str, but there are still in-repo producers/consumers that treat capacity as nullable (e.g., alembic stats migration sets "capacity": None, and the CLI formatter checks metric["capacity"] is not None). This makes the TypedDict inconsistent with real payloads and will either break type-checking or force unsafe casts. Please either keep capacity as str | None (like the previous definition) or update all producers to always emit a string (e.g., "0") and remove/adjust the None handling accordingly.

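A minimal sketch of the comment's first option, keeping capacity nullable as in the previous definition; the format_capacity helper below is hypothetical:

    from typing import TypedDict

    # Sketch: revert `capacity` to `str | None` so producers that emit
    # "capacity": None (e.g. the alembic stats migration) keep type-checking.
    MetricValue = TypedDict(
        "MetricValue",
        {
            "current": str,
            "capacity": str | None,  # nullable, as before this PR
            "pct": str,
            "unit_hint": str,
            "stats.min": str,
            "stats.max": str,
            "stats.sum": str,
            "stats.avg": str,
            "stats.diff": str,
            "stats.rate": str,
            "stats.version": int | None,
        },
    )

    def format_capacity(metric: MetricValue) -> str:  # hypothetical consumer-side guard
        capacity = metric["capacity"]
        return capacity if capacity is not None else "-"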

def make_default_metric_value(unit_hint: str) -> MetricValue:
"""Return a `MetricValue` populated with neutral defaults.

All numeric string fields are `"0"` (including `capacity`, matching the
legacy Valkey shape where every metric carried a string capacity).
`unit_hint` is supplied by the caller.
"""
return MetricValue({
"current": "0",
"capacity": "0",
"pct": "0",
"unit_hint": unit_hint,
"stats.min": "0",
"stats.max": "0",
"stats.sum": "0",
"stats.avg": "0",
"stats.diff": "0",
"stats.rate": "0",
"stats.version": None,
})


UTILIZATION_METRIC_INTERVAL: Final[float] = 5.0
UTILIZATION_METRIC_DETENTION: Final[float] = 600.0 # 10 minutes

CONTAINER_UTILIZATION_METRIC_NAME: Final[str] = "backendai_container_utilization"
CONTAINER_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "container_metric_name"
DEVICE_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "device_metric_name"
PROCESS_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "process_metric_name"

# Metric-name classification used by the legacy live_stat dict converter.
# These mirror the semantics that Worker's MovingStatistics produced when
# kernel stats were stored in Valkey:
# - RATE_STAT_METRICS: stats.rate is meaningful (rate of change per second).
# - DIFF_STAT_METRICS: stats.diff is meaningful (delta over the last window).
RATE_STAT_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})
DIFF_STAT_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})

# Per-metric unit hint emitted by the agent (source of truth: src/ai/backend/agent/docker/intrinsic.py).
METRIC_UNIT_HINTS: Final[dict[str, str]] = {
"cpu_used": "msec",
"cpu_util": "percent",
"mem": "bytes",
"net_rx": "bps",
"net_tx": "bps",
"io_read": "bytes",
"io_write": "bytes",
"io_scratch_size": "bytes",
}


def resolve_unit_hint(metric_name: str) -> str:
"""Return the unit_hint for a Backend.AI container metric name.

Prometheus does not carry the agent-side `unit_hint` in its samples, so the
manager has to recover it from the metric name alone. Lookup order:

1. Explicit registration in :data:`METRIC_UNIT_HINTS` (highest priority).
2. Naming-convention fallback for plugin metrics that follow Backend.AI
conventions (e.g., `cuda_util`, `gpu_mem`, `tpu_util`).
3. The metric_name itself as a last resort — preserves the sample data
and surfaces the missing registration to the WebUI via the response.
"""
if metric_name in METRIC_UNIT_HINTS:
return METRIC_UNIT_HINTS[metric_name]
if metric_name.endswith("_util"):
return "percent"
if metric_name == "mem" or metric_name.endswith("_mem"):
return "bytes"
if metric_name.startswith("io_"):
return "bytes"
if metric_name.startswith("net_"):
return "bps"
return metric_name
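A quick illustration of the documented lookup order; cuda_util, gpu_mem, and exotic_counter are hypothetical metric names used only for demonstration:

    # Exercising resolve_unit_hint's three-step lookup order:
    assert resolve_unit_hint("cpu_util") == "percent"   # 1. explicit registration
    assert resolve_unit_hint("cuda_util") == "percent"  # 2. "*_util" naming fallback
    assert resolve_unit_hint("gpu_mem") == "bytes"      # 2. "*_mem" naming fallback
    assert resolve_unit_hint("exotic_counter") == "exotic_counter"  # 3. last resort

    # Composing with the default-value helper defined above:
    assert make_default_metric_value(resolve_unit_hint("net_rx"))["unit_hint"] == "bps"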
30 changes: 0 additions & 30 deletions src/ai/backend/common/types.py
@@ -106,7 +106,6 @@
"KernelEnqueueingConfig",
"KernelId",
"MetricKey",
"MetricValue",
"ModelServiceProfile",
"ModelServiceStatus",
"MountExpression",
@@ -115,7 +114,6 @@
"MountPermissionLiteral",
"MountPoint",
"MountTypes",
"MovingStatValue",
"PreemptionMode",
"PreemptionOrder",
"PromMetric",
@@ -565,34 +563,6 @@ class AbuseReport(TypedDict):
abuse_report: str | None


class MovingStatValue(TypedDict):
min: str
max: str
sum: str
avg: str
diff: str
rate: str
version: int | None # for legacy client compatibility


MetricValue = TypedDict(
"MetricValue",
{
"current": str,
"capacity": str | None,
"pct": str,
"unit_hint": str,
"stats.min": str,
"stats.max": str,
"stats.sum": str,
"stats.avg": str,
"stats.diff": str,
"stats.rate": str,
"stats.version": int | None,
},
)


class IntrinsicSlotNames(enum.Enum):
CPU = SlotName("cpu")
MEMORY = SlotName("mem")
36 changes: 25 additions & 11 deletions src/ai/backend/manager/api/gql_legacy/kernel.py
@@ -24,6 +24,7 @@
KernelId,
SessionId,
)
from ai.backend.manager.api.gql_legacy.stat_converter import LegacyLiveStatConverter
from ai.backend.manager.data.kernel.types import KernelStatus
from ai.backend.manager.defs import DEFAULT_ROLE
from ai.backend.manager.models.group import groups
@@ -42,6 +43,7 @@
QueryFilterParser,
)
from ai.backend.manager.models.user import UserRole, users
from ai.backend.manager.services.metric.actions.live_stat import ContainerLiveStatAction

from .base import (
BigInt,
@@ -67,6 +69,19 @@
)


async def _batch_load_kernel_live_stat(
ctx: GraphQueryContext,
kernel_ids: Sequence[KernelId],
) -> list[dict[str, Any] | None]:
if not kernel_ids:
return []
action_result = await ctx.processors.metric.query_container_live_stat.wait_for_complete(
ContainerLiveStatAction(kernel_ids=list(kernel_ids))
)
converted = LegacyLiveStatConverter().convert(action_result.stats)
return [converted.get(kid) for kid in kernel_ids]
Comment on lines +81 to +82
Collaborator:
Why do we have to create a converter every time?
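One possible answer, sketched under the assumption that LegacyLiveStatConverter is stateless (which the per-call construction suggests): hoist a single instance to module scope and reuse it across loads.

    # Hypothetical refactor; assumes LegacyLiveStatConverter keeps no per-call state.
    _LIVE_STAT_CONVERTER = LegacyLiveStatConverter()

    async def _batch_load_kernel_live_stat(
        ctx: GraphQueryContext,
        kernel_ids: Sequence[KernelId],
    ) -> list[dict[str, Any] | None]:
        if not kernel_ids:
            return []
        action_result = await ctx.processors.metric.query_container_live_stat.wait_for_complete(
            ContainerLiveStatAction(kernel_ids=list(kernel_ids))
        )
        converted = _LIVE_STAT_CONVERTER.convert(action_result.stats)
        return [converted.get(kid) for kid in kernel_ids]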



class KernelNode(graphene.ObjectType): # type: ignore[misc]
class Meta:
interfaces = (AsyncNode,)
@@ -190,17 +205,10 @@ async def resolve_image(self, info: graphene.ResolveInfo) -> ImageNode | None:
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> dict[str, Any] | None:
graph_ctx: GraphQueryContext = info.context
loader = graph_ctx.dataloader_manager.get_loader_by_func(
graph_ctx, self.batch_load_live_stat
graph_ctx, _batch_load_kernel_live_stat
)
return cast(dict[str, Any] | None, await loader.load(self.row_id))

@classmethod
async def batch_load_live_stat(
cls, ctx: GraphQueryContext, kernel_ids: Sequence[KernelId]
) -> list[dict[str, Any] | None]:
kernel_ids_str = [str(kid) for kid in kernel_ids]
return await ctx.valkey_stat.get_session_statistics_batch(kernel_ids_str)


class KernelConnection(Connection):
class Meta:
@@ -313,7 +321,9 @@ def from_row(cls, ctx: GraphQueryContext, row: KernelRow | None) -> ComputeConta
# we can leave last_stat value for legacy support, as an alias to live_stat
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
graph_ctx: GraphQueryContext = info.context
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
loader = graph_ctx.dataloader_manager.get_loader_by_func(
graph_ctx, _batch_load_kernel_live_stat
)
return cast(Mapping[str, Any] | None, await loader.load(self.id))

async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
@@ -606,7 +616,9 @@ class Meta:
# we can leave last_stat value for legacy support, as an alias to live_stat
async def resolve_live_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
graph_ctx: GraphQueryContext = info.context
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
loader = graph_ctx.dataloader_manager.get_loader_by_func(
graph_ctx, _batch_load_kernel_live_stat
)
return cast(Mapping[str, Any] | None, await loader.load(self.id))

async def resolve_last_stat(self, info: graphene.ResolveInfo) -> Mapping[str, Any] | None:
@@ -632,7 +644,9 @@ async def _resolve_legacy_metric(
if value is None:
return convert_type(0)
return convert_type(value)
loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "KernelStatistics.by_kernel")
loader = graph_ctx.dataloader_manager.get_loader_by_func(
graph_ctx, _batch_load_kernel_live_stat
)
kstat = await loader.load(self.id)
if kstat is None:
return convert_type(0)