Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/source/serve/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,11 @@ These metrics track replica health, restarts, and lifecycle timing.
These lifecycle **histograms** use `deployment` and `application` labels only—no `replica` label—so Prometheus cardinality stays manageable at scale.
:::

By default, replica lifecycle metrics include source identifiers such as
`replica` where applicable. For large deployments, set
`RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS=0` to drop those
source-level high-cardinality tags while retaining `deployment` and `application`.

| Metric | Type | Tags | Description |
|--------|------|------|-------------|
| `ray_serve_deployment_replica_healthy` | Gauge | `deployment`, `replica`, `application` | Health status of the replica: `1` = healthy, `0` = unhealthy. |
Expand All @@ -733,6 +738,11 @@ These lifecycle **histograms** use `deployment` and `application` labels only—

These metrics provide visibility into autoscaling behavior and help debug scaling issues.

By default, autoscaling metrics include source identifiers such as `handle` and
`replica` where applicable. For large deployments, set
`RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS=0` to drop those
source-level high-cardinality tags while retaining `deployment` and `application`.

| Metric | Type | Tags | Description |
|--------|------|------|-------------|
| `ray_serve_autoscaling_target_replicas` | Gauge | `deployment`, `application` | Target number of replicas the autoscaler is trying to reach. Compare with actual replicas to identify scaling lag. |
Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@
RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER = get_env_bool(
"RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER", "0"
)

# Feature flag to include high-cardinality source tags on Serve controller metrics.
# Disable this to keep deployment/application tags while dropping source identifiers
# like replica IDs and handle IDs from controller-emitted metrics.
RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS = get_env_bool(
"RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", "1"
)

# Feature flag to use compact (low-cardinality) namespace tags on long poll metrics.
# When enabled, metric tags use only the LongPollNamespace enum name
# (e.g., "DEPLOYMENT_CONFIG") instead of the full key string which includes
Expand Down
54 changes: 41 additions & 13 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from ray.serve._private.constants import (
CONTROL_LOOP_INTERVAL_S,
RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH,
RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS,
RAY_SERVE_ENABLE_DIRECT_INGRESS,
RAY_SERVE_ENABLE_HA_PROXY,
RAY_SERVE_LOG_TO_STDERR,
Expand Down Expand Up @@ -114,6 +115,33 @@

logger = logging.getLogger(SERVE_LOGGER_NAME)

_AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS = ("deployment", "application")


def _get_autoscaling_metrics_delay_tag_keys(
high_cardinality_tag_keys: Tuple[str, ...],
) -> Tuple[str, ...]:
"""Return tag keys for controller autoscaling metrics delay gauges."""
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
return _AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS + high_cardinality_tag_keys
return _AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS


def _get_autoscaling_metrics_delay_tags(
deployment_id: DeploymentID,
high_cardinality_tag_key: str,
high_cardinality_tag_value: str,
) -> Dict[str, str]:
"""Return tags for controller autoscaling metrics delay gauge updates."""
tags = {
"deployment": deployment_id.name,
"application": deployment_id.app_name,
}
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
tags[high_cardinality_tag_key] = high_cardinality_tag_value
return tags
Comment thread
johntaylor-cell marked this conversation as resolved.


# Used for testing purposes only. If this is set, the controller will crash
# after writing each checkpoint with the specified probability.
_CRASH_AFTER_CHECKPOINT_PROBABILITY = 0
Expand Down Expand Up @@ -358,11 +386,11 @@ def record_autoscaling_metrics_from_replica(
# Record the metrics delay for observability
self.replica_metrics_delay_gauge.set(
latency_ms,
tags={
"deployment": replica_metric_report.replica_id.deployment_id.name,
"application": replica_metric_report.replica_id.deployment_id.app_name,
"replica": replica_metric_report.replica_id.unique_id,
},
tags=_get_autoscaling_metrics_delay_tags(
replica_metric_report.replica_id.deployment_id,
"replica",
replica_metric_report.replica_id.unique_id,
),
Comment thread
johntaylor-cell marked this conversation as resolved.
)
# Track in health metrics
self._health_metrics_tracker.record_replica_metrics_delay(latency_ms)
Expand All @@ -380,11 +408,11 @@ def record_autoscaling_metrics_from_handle(
# Record the metrics delay for observability
self.handle_metrics_delay_gauge.set(
latency_ms,
tags={
"deployment": handle_metric_report.deployment_id.name,
"application": handle_metric_report.deployment_id.app_name,
"handle": handle_metric_report.handle_id,
},
tags=_get_autoscaling_metrics_delay_tags(
handle_metric_report.deployment_id,
"handle",
handle_metric_report.handle_id,
),
Comment thread
johntaylor-cell marked this conversation as resolved.
)
# Track in health metrics
self._health_metrics_tracker.record_handle_metrics_delay(latency_ms)
Expand Down Expand Up @@ -742,23 +770,23 @@ def _create_control_loop_metrics(self):
"Time taken for the replica metrics to be reported to the controller. "
"High values may indicate a busy controller."
),
tag_keys=("deployment", "application", "replica"),
tag_keys=_get_autoscaling_metrics_delay_tag_keys(("replica",)),
)
self.handle_metrics_delay_gauge = metrics.Gauge(
"serve_autoscaling_handle_metrics_delay_ms",
description=(
"Time taken for the handle metrics to be reported to the controller. "
"High values may indicate a busy controller."
),
tag_keys=("deployment", "application", "handle"),
tag_keys=_get_autoscaling_metrics_delay_tag_keys(("handle",)),
)
self.async_inference_task_queue_metrics_delay_gauge = metrics.Gauge(
"serve_autoscaling_async_inference_task_queue_metrics_delay_ms",
description=(
"Time taken for the async inference task queue metrics to be reported "
"to the controller. High values may indicate a busy controller."
),
tag_keys=("deployment", "application"),
tag_keys=_AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS,
)

def _recover_state_from_checkpoint(self):
Expand Down
31 changes: 27 additions & 4 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
DEPLOYMENT_ACTOR_HEALTH_CHECK_TIMEOUT_S,
DEPLOYMENT_ACTOR_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
MAX_PER_REPLICA_RETRY_COUNT,
RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS,
RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S,
RAY_SERVE_ENABLE_DIRECT_INGRESS,
RAY_SERVE_ENABLE_TASK_EVENTS,
Expand Down Expand Up @@ -104,6 +105,23 @@

logger = logging.getLogger(SERVE_LOGGER_NAME)

_REPLICA_LIFECYCLE_METRIC_BASE_TAG_KEYS = ("deployment", "application")


def _get_replica_lifecycle_metric_tag_keys() -> Tuple[str, ...]:
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
return ("deployment", "replica", "application")
return _REPLICA_LIFECYCLE_METRIC_BASE_TAG_KEYS


def _get_replica_lifecycle_metric_tags(
replica_unique_id: str,
) -> Optional[Dict[str, str]]:
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
return {"replica": replica_unique_id}
return None


_RESERVED_INTERNAL_DEPLOYMENT_CONTEXT_ENV_VARS = {
RAY_SERVE_INTERNAL_DEPLOYMENT_APP_NAME_ENV_VAR,
RAY_SERVE_INTERNAL_DEPLOYMENT_NAME_ENV_VAR,
Expand Down Expand Up @@ -2869,7 +2887,7 @@ def __init__(
"Tracks whether this deployment replica is healthy. 1 means "
"healthy, 0 means unhealthy."
),
tag_keys=("deployment", "replica", "application"),
tag_keys=_get_replica_lifecycle_metric_tag_keys(),
)
self.health_check_gauge.set_default_tags(
{"deployment": self._id.name, "application": self._id.app_name}
Expand Down Expand Up @@ -2924,7 +2942,7 @@ def __init__(
self.health_check_failures_counter = metrics.Counter(
"serve_health_check_failures_total",
description=("Count of failed health checks."),
tag_keys=("deployment", "replica", "application"),
tag_keys=_get_replica_lifecycle_metric_tag_keys(),
)
self.health_check_failures_counter.set_default_tags(
{"deployment": self._id.name, "application": self._id.app_name}
Expand Down Expand Up @@ -4331,7 +4349,10 @@ def _set_health_gauge(self, replica_unique_id: str, value: int) -> None:
and (now - cached[1]) < RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S
):
return
self.health_check_gauge.set(value, tags={"replica": replica_unique_id})
self.health_check_gauge.set(
value,
tags=_get_replica_lifecycle_metric_tags(replica_unique_id),
)
self._health_gauge_cache[replica_unique_id] = (value, now)

def _register_gang_replica(self, replica_id: ReplicaID, gang_id: str) -> None:
Expand Down Expand Up @@ -4498,7 +4519,9 @@ def check_and_update_replicas(self):
)
if replica.last_health_check_failed:
self.health_check_failures_counter.inc(
tags={"replica": replica.replica_id.unique_id}
tags=_get_replica_lifecycle_metric_tags(
replica.replica_id.unique_id
)
)

if is_healthy:
Expand Down
53 changes: 49 additions & 4 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
HEALTHY_MESSAGE,
RAY_SERVE_AUTOSCALING_METRIC_RECORD_INTERVAL_FACTOR,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS,
RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S,
RAY_SERVE_DIRECT_INGRESS_PORT_RETRY_COUNT,
RAY_SERVE_ENABLE_DIRECT_INGRESS,
Expand Down Expand Up @@ -190,6 +191,30 @@

logger = logging.getLogger(SERVE_LOGGER_NAME)

_REPLICA_METRIC_BASE_TAG_KEYS = ("deployment", "application")


def _get_replica_metric_default_tag_keys(
additional_tag_keys: Tuple[str, ...] = tuple(),
) -> Tuple[str, ...]:
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
return ("deployment", "replica", "application") + additional_tag_keys
return _REPLICA_METRIC_BASE_TAG_KEYS + additional_tag_keys


def _get_replica_metric_default_tags(
deployment_id: DeploymentID,
replica_unique_id: str,
) -> Dict[str, str]:
tags = {
"deployment": deployment_id.name,
"application": deployment_id.app_name,
}
if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS:
tags["replica"] = replica_unique_id
return tags


SERVE_BUILD_ASGI_APP_METHOD = "__serve_build_asgi_app__"


Expand Down Expand Up @@ -363,11 +388,18 @@ def __init__(
self._cached_metrics_interval_s = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS / 1000

# Request counter (only set on replica startup).
self._restart_counter = metrics.Counter(
self._restart_counter = ray_metrics.Counter(
"serve_deployment_replica_starts",
description=(
"The number of times this replica has been restarted due to failure."
),
tag_keys=_get_replica_metric_default_tag_keys(),
)
self._restart_counter.set_default_tags(
_get_replica_metric_default_tags(
self._deployment_id,
self._replica_id.unique_id,
)
)
self._restart_counter.inc()

Expand Down Expand Up @@ -426,19 +458,32 @@ def __init__(
description="The current number of queries being processed.",
)

self.record_autoscaling_stats_failed_counter = metrics.Counter(
self.record_autoscaling_stats_failed_counter = ray_metrics.Counter(
"serve_record_autoscaling_stats_failed",
tag_keys=("exception_name",),
tag_keys=_get_replica_metric_default_tag_keys(("exception_name",)),
description="The number of errored record_autoscaling_stats invocations.",
)
self.record_autoscaling_stats_failed_counter.set_default_tags(
_get_replica_metric_default_tags(
self._deployment_id,
self._replica_id.unique_id,
)
)

self.user_autoscaling_stats_latency_tracker = metrics.Histogram(
self.user_autoscaling_stats_latency_tracker = ray_metrics.Histogram(
"serve_user_autoscaling_stats_latency_ms",
description=(
"Time taken to execute the user-defined autoscaling stats function "
"in milliseconds."
),
boundaries=REQUEST_LATENCY_BUCKETS_MS,
tag_keys=_get_replica_metric_default_tag_keys(),
)
self.user_autoscaling_stats_latency_tracker.set_default_tags(
_get_replica_metric_default_tags(
self._deployment_id,
self._replica_id.unique_id,
)
)

# Replica utilization tracking with rolling window.
Expand Down
56 changes: 55 additions & 1 deletion python/ray/serve/tests/unit/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import pytest

from ray.serve._private.common import TargetCapacityDirection
from ray.serve._private.common import DeploymentID, TargetCapacityDirection
from ray.serve._private.controller import (
_get_autoscaling_metrics_delay_tag_keys,
_get_autoscaling_metrics_delay_tags,
applications_match,
calculate_target_capacity_direction,
)
Expand All @@ -20,6 +22,58 @@
)


def test_autoscaling_metrics_delay_tags_include_high_cardinality_by_default(
monkeypatch,
):
from ray.serve._private import controller

monkeypatch.setattr(
controller,
"RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS",
True,
)

assert _get_autoscaling_metrics_delay_tag_keys(("handle",)) == (
"deployment",
"application",
"handle",
)
assert _get_autoscaling_metrics_delay_tags(
DeploymentID(name="deployment", app_name="application"),
"handle",
"handle-id",
) == {
"deployment": "deployment",
"application": "application",
"handle": "handle-id",
}
Comment thread
johntaylor-cell marked this conversation as resolved.


def test_autoscaling_metrics_delay_tags_exclude_high_cardinality(
monkeypatch,
):
from ray.serve._private import controller

monkeypatch.setattr(
controller,
"RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS",
False,
)

assert _get_autoscaling_metrics_delay_tag_keys(("replica",)) == (
"deployment",
"application",
)
assert _get_autoscaling_metrics_delay_tags(
DeploymentID(name="deployment", app_name="application"),
"replica",
"replica-id",
) == {
"deployment": "deployment",
"application": "application",
}
Comment thread
johntaylor-cell marked this conversation as resolved.


def create_app_config(name: str) -> ServeApplicationSchema:
return ServeApplicationSchema(
name=name, import_path=f"fake.{name}", route_prefix=f"/{name}"
Expand Down
Loading
Loading