diff --git a/changes/11360.feature.md b/changes/11360.feature.md new file mode 100644 index 00000000000..2a87eeac417 --- /dev/null +++ b/changes/11360.feature.md @@ -0,0 +1 @@ +Provide a manager-side parallel supply for legacy `live_stat` `stats.max` / `stats.avg` / `stats.rate` fields, computed from Prometheus on demand instead of from the agent's in-memory `MovingStatistics` accumulator. These values survive agent / manager / host restarts, stay consistent across sessions, and use a sliding window (default 5m) instead of unbounded lifetime accumulation. diff --git a/src/ai/backend/common/clients/prometheus/client.py b/src/ai/backend/common/clients/prometheus/client.py index 9e4397576fa..a6607a70d80 100644 --- a/src/ai/backend/common/clients/prometheus/client.py +++ b/src/ai/backend/common/clients/prometheus/client.py @@ -84,13 +84,12 @@ async def fetch_container_live_stats( kernel_ids: Sequence[KernelId], ) -> dict[KernelId, list[MetricValue]]: queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids) - gauge_response = await self._query_instant(queries.gauge) - diff_response = await self._query_instant(queries.diff) - rate_response = await self._query_instant(queries.rate) - gauge = KernelMetricValuesByKernel.from_prometheus_response(gauge_response) - diff = KernelMetricValuesByKernel.from_prometheus_response(diff_response) - rate = KernelMetricValuesByKernel.from_prometheus_response(rate_response) - merged = gauge.merged_with(diff).merged_with(rate) + merged = KernelMetricValuesByKernel(values_by_kernel={}) + for preset in queries.to_list(): + response = await self._query_instant(preset) + merged = merged.merged_with( + KernelMetricValuesByKernel.from_prometheus_response(response) + ) return merged.values_by_kernel async def execute_preset( diff --git a/src/ai/backend/common/clients/prometheus/fixed_query_builder.py b/src/ai/backend/common/clients/prometheus/fixed_query_builder.py index 18fc8f9bd10..504720b2516 100644 ---
a/src/ai/backend/common/clients/prometheus/fixed_query_builder.py +++ b/src/ai/backend/common/clients/prometheus/fixed_query_builder.py @@ -6,6 +6,14 @@ from ai.backend.common.clients.prometheus.metric_types import ( DIFF_METRICS, RATE_METRICS, + STATS_AVG_GAUGE_METRIC_PATTERNS, + STATS_AVG_GAUGE_METRICS, + STATS_AVG_OVER_RATE_METRICS, + STATS_MAX_GAUGE_METRIC_PATTERNS, + STATS_MAX_GAUGE_METRICS, + STATS_MAX_OVER_RATE_METRICS, + STATS_RATE_COUNTER_METRICS, + STATS_RATE_GAUGE_METRICS, ContainerLiveStatQueries, ContainerMetricOptionalLabel, MetricType, @@ -13,7 +21,6 @@ from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset from ai.backend.common.clients.prometheus.querier import ContainerMetricQuerier from ai.backend.common.clients.prometheus.types import ValueType -from ai.backend.common.exception import UnreachableError from ai.backend.common.metrics.types import ( CONTAINER_UTILIZATION_METRIC_LABEL_NAME, CONTAINER_UTILIZATION_METRIC_NAME, @@ -47,8 +54,61 @@ class LabelValuesQuery: metric_match: str +@dataclass(frozen=True) +class _LiveStatQuerySpec: + template: str + metric_name_filter: frozenset[str] | None = None + value_type_filter: ValueType | None = None + + +@dataclass(frozen=True) +class _StatsBucket: + """Window-stats bucket spec (gauge metrics + rate metrics for a single stat).""" + + value_type: ValueType + gauge_metrics: frozenset[str] + rate_metrics: frozenset[str] + gauge_metric_patterns: frozenset[str] = frozenset() + + def _regex_union(values: Sequence[str]) -> str: - return "|".join(re.escape(value) for value in values) + return "|".join(re.escape(value).replace(r"\-", "-") for value in values) + + +def _metric_name_regex( + metric_names: frozenset[str], + metric_patterns: frozenset[str] = frozenset(), +) -> str: + exact_parts = [re.escape(value) for value in sorted(metric_names)] + return "|".join([*exact_parts, *sorted(metric_patterns)]) + + +_GAUGE_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec( + 
template=_GAUGE_TEMPLATE, +) +_DIFF_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec( + template=_DIFF_TEMPLATE, + metric_name_filter=DIFF_METRICS, + value_type_filter=ValueType.CURRENT, +) +_RATE_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec( + template=_RATE_TEMPLATE, + metric_name_filter=RATE_METRICS, + value_type_filter=ValueType.CURRENT, +) + +_MAX_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket( + value_type=ValueType.MAX, + gauge_metrics=STATS_MAX_GAUGE_METRICS, + rate_metrics=STATS_MAX_OVER_RATE_METRICS, + gauge_metric_patterns=STATS_MAX_GAUGE_METRIC_PATTERNS, +) +_AVG_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket( + value_type=ValueType.AVG, + gauge_metrics=STATS_AVG_GAUGE_METRICS, + rate_metrics=STATS_AVG_OVER_RATE_METRICS, + gauge_metric_patterns=STATS_AVG_GAUGE_METRIC_PATTERNS, +) class FixedQueryBuilder: @@ -101,49 +161,127 @@ def get_container_live_stat_queries( kernel_ids: Sequence[KernelId], ) -> ContainerLiveStatQueries: return ContainerLiveStatQueries( - gauge=self._get_container_live_stat_query( - kernel_ids, - metric_type=MetricType.GAUGE, - ), - diff=self._get_container_live_stat_query( - kernel_ids, - metric_type=MetricType.DIFF, - metric_name_filter=DIFF_METRICS, - value_type_filter=ValueType.CURRENT, - ), - rate=self._get_container_live_stat_query( - kernel_ids, - metric_type=MetricType.RATE, - metric_name_filter=RATE_METRICS, - value_type_filter=ValueType.CURRENT, - ), + gauge=self._build_filtered_preset(kernel_ids, _GAUGE_LIVE_STAT_SPEC), + diff=self._build_filtered_preset(kernel_ids, _DIFF_LIVE_STAT_SPEC), + rate=self._build_filtered_preset(kernel_ids, _RATE_LIVE_STAT_SPEC), + max=self._build_window_stats_preset(kernel_ids, _MAX_STATS_BUCKET), + avg=self._build_window_stats_preset(kernel_ids, _AVG_STATS_BUCKET), + rate_stats=self._build_rate_stats_preset(kernel_ids), ) - def _get_container_live_stat_query( + def _build_rate_stats_preset( self, kernel_ids: Sequence[KernelId], - *, - metric_type: 
MetricType, - metric_name_filter: frozenset[str] | None = None, - value_type_filter: ValueType | None = None, + ) -> MetricPreset: + kernel_id_regex = _regex_union([str(kid) for kid in kernel_ids]) + group_by = ",".join(sorted(_LIVE_STAT_GROUP_BY)) + parts: list[str] = [] + if STATS_RATE_GAUGE_METRICS: + gauge_regex = _regex_union(sorted(STATS_RATE_GAUGE_METRICS)) + selector = self._utilization_selector(kernel_id_regex, gauge_regex) + parts.append(self._labelled_sum(selector, group_by, ValueType.RATE)) + if STATS_RATE_COUNTER_METRICS: + counter_regex = _regex_union(sorted(STATS_RATE_COUNTER_METRICS)) + base = self._utilization_selector(kernel_id_regex, counter_regex) + selector = f"rate({base}[{self._timewindow}])" + parts.append(self._labelled_sum(selector, group_by, ValueType.RATE)) + return MetricPreset(template=" or ".join(parts)) + + def _labelled_sum(self, selector: str, group_by: str, stat_label: ValueType) -> str: + return ( + f"label_replace(sum by ({group_by})({selector})," + f'"value_type","{stat_label}","value_type",".*")' + ) + + def _build_window_stats_preset( + self, + kernel_ids: Sequence[KernelId], + bucket: _StatsBucket, + ) -> MetricPreset: + kernel_id_regex = _regex_union([str(kid) for kid in kernel_ids]) + group_by = ",".join(sorted(_LIVE_STAT_GROUP_BY)) + return MetricPreset( + template=self._render_stats_query( + bucket, + kernel_id_regex=kernel_id_regex, + group_by=group_by, + ) + ) + + def _build_filtered_preset( + self, + kernel_ids: Sequence[KernelId], + spec: _LiveStatQuerySpec, ) -> MetricPreset: labels: dict[str, LabelMatcher] = { "kernel_id": LabelMatcher.regex(_regex_union([str(kid) for kid in kernel_ids])) } - if metric_name_filter is not None: + if spec.metric_name_filter is not None: labels["container_metric_name"] = LabelMatcher.regex( - _regex_union(sorted(metric_name_filter)) + _regex_union(sorted(spec.metric_name_filter)) ) - if value_type_filter is not None: - labels["value_type"] = LabelMatcher.exact(value_type_filter.value) 
+ if spec.value_type_filter is not None: + labels["value_type"] = LabelMatcher.exact(spec.value_type_filter.value) return MetricPreset( - template=self._get_template(metric_type), - labels=labels, + template=spec.template, group_by=_LIVE_STAT_GROUP_BY, + labels=labels, window=self._timewindow, ) + def _render_stats_query( + self, + bucket: _StatsBucket, + *, + kernel_id_regex: str, + group_by: str, + ) -> str: + stat_fn = f"{bucket.value_type}_over_time" + parts: list[str] = [] + if bucket.gauge_metrics or bucket.gauge_metric_patterns: + gauge_regex = _metric_name_regex(bucket.gauge_metrics, bucket.gauge_metric_patterns) + selector = self._utilization_selector(kernel_id_regex, gauge_regex) + parts.append(self._window_stat_subquery(stat_fn, selector, group_by, bucket.value_type)) + if bucket.rate_metrics: + rate_regex = _regex_union(sorted(bucket.rate_metrics)) + base = self._utilization_selector(kernel_id_regex, rate_regex) + selector = f"rate({base}[{self._timewindow}])" + parts.append(self._window_stat_subquery(stat_fn, selector, group_by, bucket.value_type)) + return " or ".join(parts) + + def _utilization_selector(self, kernel_id_regex: str, metric_name_regex: str) -> str: + labels = self._live_stat_current_labels( + kernel_id_regex=kernel_id_regex, + metric_name_regex=metric_name_regex, + ) + return f"{CONTAINER_UTILIZATION_METRIC_NAME}{{{labels}}}" + + def _window_stat_subquery( + self, + stat_fn: str, + selector: str, + group_by: str, + stat_label: ValueType, + ) -> str: + return ( + f"label_replace(" + f"{stat_fn}((sum by ({group_by})({selector}))[{self._timewindow}:])," + f'"value_type","{stat_label}","value_type",".*")' + ) + + def _live_stat_current_labels( + self, + *, + kernel_id_regex: str, + metric_name_regex: str, + ) -> str: + return ( + f'kernel_id=~"{kernel_id_regex}"' + f',container_metric_name=~"{metric_name_regex}"' + f',value_type="{ValueType.CURRENT}"' + ) + def _get_template(self, metric_type: MetricType) -> str: match metric_type: case 
MetricType.GAUGE: @@ -152,5 +290,3 @@ def _get_template(self, metric_type: MetricType) -> str: return _RATE_TEMPLATE case MetricType.DIFF: return _DIFF_TEMPLATE - case _: - raise UnreachableError(f"Unknown metric type: {metric_type}") diff --git a/src/ai/backend/common/clients/prometheus/metric_types.py b/src/ai/backend/common/clients/prometheus/metric_types.py index 6ce41499666..ce672b828db 100644 --- a/src/ai/backend/common/clients/prometheus/metric_types.py +++ b/src/ai/backend/common/clients/prometheus/metric_types.py @@ -61,19 +61,65 @@ class MetricType(StrEnum): @dataclass(frozen=True) class ContainerLiveStatQueries: - """Gauge / diff / rate query preset bundle for container live stats.""" + """Gauge / diff / rate / max / avg / rate_stats query preset bundle for container live stats.""" gauge: MetricPreset diff: MetricPreset rate: MetricPreset + max: MetricPreset + avg: MetricPreset + rate_stats: MetricPreset def to_list(self) -> list[MetricPreset]: - return [self.gauge, self.diff, self.rate] + return [self.gauge, self.diff, self.rate, self.max, self.avg, self.rate_stats] + + +# Backend.AI accelerator/plugin gauge metric naming convention. +# Adding a new suffix here is the single edit needed to extend stats.{max,avg} +# coverage to a new family of accelerator metrics (e.g., adding "clock" auto- +# covers cuda_clock / gpu_clock / tpu_clock). +_ACCEL_GAUGE_SUFFIXES_MAX_ONLY: Final[frozenset[str]] = frozenset({"mem"}) +_ACCEL_GAUGE_SUFFIXES_WITH_AVG: Final[frozenset[str]] = frozenset({ + "util", + "power", + "temperature", +}) + + +def _accel_suffix_pattern(suffixes: frozenset[str]) -> str: + body = "|".join(sorted(suffixes)) + return rf"[A-Za-z0-9][A-Za-z0-9_-]*_({body})" DIFF_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"}) RATE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"}) +# Intrinsic gauge metrics that don't follow the accelerator suffix convention. 
+STATS_MAX_GAUGE_METRICS: Final[frozenset[str]] = frozenset({ + "mem", + "io_scratch_size", +}) +STATS_AVG_GAUGE_METRICS: Final[frozenset[str]] = frozenset() +# Pattern-based gauge coverage for plugin/accelerator metrics. +STATS_MAX_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({ + _accel_suffix_pattern(_ACCEL_GAUGE_SUFFIXES_MAX_ONLY | _ACCEL_GAUGE_SUFFIXES_WITH_AVG), +}) +STATS_AVG_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({ + _accel_suffix_pattern(_ACCEL_GAUGE_SUFFIXES_WITH_AVG), +}) +STATS_MAX_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"}) +STATS_AVG_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"}) + +# stats.rate emission targets the legacy stats.rate live_stat label. +# Two metric shapes flow in: +# * "gauge" set: agent's current_hook already publishes per-second rate as +# the metric's `current` value, so we only need to sum across replicas +# and relabel to stats.rate (no PromQL rate() wrap). +# * "counter" set: the published series is a cumulative byte counter, so +# we apply rate(...[window]) to get bytes/sec before relabel. 
+STATS_RATE_GAUGE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"}) +STATS_RATE_COUNTER_METRICS: Final[frozenset[str]] = frozenset({"io_read", "io_write"}) + @dataclass class ContainerMetricResponseInfo: diff --git a/src/ai/backend/common/clients/prometheus/types.py b/src/ai/backend/common/clients/prometheus/types.py index ef16273ab98..7219c7f58cd 100644 --- a/src/ai/backend/common/clients/prometheus/types.py +++ b/src/ai/backend/common/clients/prometheus/types.py @@ -8,6 +8,9 @@ class ValueType(StrEnum): CURRENT = "current" CAPACITY = "capacity" PCT = "pct" + MAX = "max" + AVG = "avg" + RATE = "rate" @dataclass(frozen=True) diff --git a/src/ai/backend/manager/repositories/metric/repository.py b/src/ai/backend/manager/repositories/metric/repository.py index e7278927b37..ccbda445cfa 100644 --- a/src/ai/backend/manager/repositories/metric/repository.py +++ b/src/ai/backend/manager/repositories/metric/repository.py @@ -70,7 +70,7 @@ async def query_container_live_stats( return KernelLiveStatBatchResult.empty(kernel_ids) try: values_by_kernel = await self._prometheus_client.fetch_container_live_stats(kernel_ids) - except (PrometheusConnectionError, FailedToGetMetric): - log.warning("Failed to query metrics for kernel live stats, returning empty results") + except (PrometheusConnectionError, FailedToGetMetric) as e: + log.warning("Failed to query metrics for kernel live stats: {!r}", e) return KernelLiveStatBatchResult.empty(kernel_ids) return KernelLiveStatBatchResult.from_metric_values(kernel_ids, values_by_kernel) diff --git a/tests/unit/manager/services/utilization_metric/test_container_metric.py b/tests/unit/manager/services/utilization_metric/test_container_metric.py index d6d6898974e..05a17f24e4b 100644 --- a/tests/unit/manager/services/utilization_metric/test_container_metric.py +++ b/tests/unit/manager/services/utilization_metric/test_container_metric.py @@ -14,6 +14,7 @@ from ai.backend.common.clients.prometheus.metric_types import ( 
ContainerMetricOptionalLabel, ContainerMetricResponseInfo, + KernelMetricValuesByKernel, MetricType, ValueType, ) @@ -30,6 +31,7 @@ InvalidAPIParameters, PrometheusConnectionError, ) +from ai.backend.common.types import KernelId from ai.backend.manager.repositories.metric.repository import MetricRepository from ai.backend.manager.services.metric.actions.container import ( ContainerMetricAction, @@ -804,6 +806,96 @@ async def test_build_query_renders_expected_promql(self, case: BuiltinQueryTestC assert rendered_query == case.expected_query +class TestLiveStatQueryProvider: + """Characterization tests for container live stat PromQL.""" + + def test_stats_queries_render_legacy_labels_from_typed_value_types(self) -> None: + kernel_id = KernelId(UUID("12345678-1234-5678-1234-567812345678")) + fixed_query_builder = FixedQueryBuilder("5m") + + queries = fixed_query_builder.get_container_live_stat_queries([kernel_id]) + + assert queries.max.render() == ( + "label_replace(max_over_time((sum by (container_metric_name,kernel_id,value_type)(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"io_scratch_size|mem|' + '[A-Za-z0-9][A-Za-z0-9_-]*_(mem|power|temperature|util)",' + 'value_type="current"}))[5m:]),' + '"value_type","max","value_type",".*")' + " or " + "label_replace(max_over_time((sum by (container_metric_name,kernel_id,value_type)(rate(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"cpu_util",value_type="current"}' + "[5m])))[5m:])," + '"value_type","max","value_type",".*")' + ) + assert queries.avg.render() == ( + "label_replace(avg_over_time((sum by (container_metric_name,kernel_id,value_type)(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"[A-Za-z0-9][A-Za-z0-9_-]*_(power|temperature|util)",' + 'value_type="current"}))[5m:]),' + 
'"value_type","avg","value_type",".*")' + " or " + "label_replace(avg_over_time((sum by (container_metric_name,kernel_id,value_type)(rate(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"cpu_util",value_type="current"}' + "[5m])))[5m:])," + '"value_type","avg","value_type",".*")' + ) + + def test_rate_stats_query_renders_legacy_stats_rate_label(self) -> None: + kernel_id = KernelId(UUID("12345678-1234-5678-1234-567812345678")) + fixed_query_builder = FixedQueryBuilder("5m") + + queries = fixed_query_builder.get_container_live_stat_queries([kernel_id]) + + assert queries.rate_stats.render() == ( + "label_replace(sum by (container_metric_name,kernel_id,value_type)(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"net_rx|net_tx",' + 'value_type="current"}),' + '"value_type","rate","value_type",".*")' + " or " + "label_replace(sum by (container_metric_name,kernel_id,value_type)(rate(" + "backendai_container_utilization" + '{kernel_id=~"12345678-1234-5678-1234-567812345678",' + 'container_metric_name=~"io_read|io_write",' + 'value_type="current"}' + "[5m]))," + '"value_type","rate","value_type",".*")' + ) + + +class TestKernelMetricValuesByKernel: + def test_from_prometheus_response_parses_value_type_into_enum(self) -> None: + kernel_id = KernelId(UUID("12345678-1234-5678-1234-567812345678")) + response = PrometheusResponse( + status="success", + data=PrometheusQueryData( + result_type="vector", + result=[ + MetricResponse( + metric=MetricResponseInfo( + kernel_id=str(kernel_id), + container_metric_name="mem", + value_type="max", + ), + values=[(1704067200.0, "1024")], + ) + ], + ), + ) + + result = KernelMetricValuesByKernel.from_prometheus_response(response) + + assert result.values_by_kernel[kernel_id][0].value_type == ValueType.MAX + + class TestMetricResponseInfoParsing: """Unit tests for MetricResponseInfo parsing 
behavior."""