diff --git a/doc/source/serve/monitoring.md b/doc/source/serve/monitoring.md index 97f1a0ada4dc..a3d1053f20de 100644 --- a/doc/source/serve/monitoring.md +++ b/doc/source/serve/monitoring.md @@ -718,6 +718,11 @@ These metrics track replica health, restarts, and lifecycle timing. These lifecycle **histograms** use `deployment` and `application` labels only—no `replica` label—so Prometheus cardinality stays manageable at scale. ::: +By default, replica lifecycle metrics include source identifiers such as +`replica` where applicable. For large deployments, set +`RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS=0` to drop those +source-level high-cardinality tags while retaining `deployment` and `application`. + | Metric | Type | Tags | Description | |--------|------|------|-------------| | `ray_serve_deployment_replica_healthy` | Gauge | `deployment`, `replica`, `application` | Health status of the replica: `1` = healthy, `0` = unhealthy. | @@ -733,6 +738,11 @@ These lifecycle **histograms** use `deployment` and `application` labels only— These metrics provide visibility into autoscaling behavior and help debug scaling issues. +By default, autoscaling metrics include source identifiers such as `handle` and +`replica` where applicable. For large deployments, set +`RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS=0` to drop those +source-level high-cardinality tags while retaining `deployment` and `application`. + | Metric | Type | Tags | Description | |--------|------|------|-------------| | `ray_serve_autoscaling_target_replicas` | Gauge | `deployment`, `application` | Target number of replicas the autoscaler is trying to reach. Compare with actual replicas to identify scaling lag. | diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 73f404f087e5..084a35a42774 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -928,6 +928,14 @@ RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER = get_env_bool( "RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER", "0" ) + +# Feature flag to include high-cardinality source tags on Serve controller metrics. +# Disable this to keep deployment/application tags while dropping source identifiers +# like replica IDs and handle IDs from controller-emitted metrics. +RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS = get_env_bool( + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", "1" +) + # Feature flag to use compact (low-cardinality) namespace tags on long poll metrics. # When enabled, metric tags use only the LongPollNamespace enum name # (e.g., "DEPLOYMENT_CONFIG") instead of the full key string which includes diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 4239d5bdaae2..408108ca74f1 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -36,6 +36,7 @@ from ray.serve._private.constants import ( CONTROL_LOOP_INTERVAL_S, RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH, + RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS, RAY_SERVE_ENABLE_DIRECT_INGRESS, RAY_SERVE_ENABLE_HA_PROXY, RAY_SERVE_LOG_TO_STDERR, @@ -114,6 +115,33 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) +_AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS = ("deployment", "application") + + +def _get_autoscaling_metrics_delay_tag_keys( + high_cardinality_tag_keys: Tuple[str, ...], +) -> Tuple[str, ...]: + """Return tag keys for controller autoscaling metrics delay gauges.""" + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + return _AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS + high_cardinality_tag_keys + return _AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS + + +def _get_autoscaling_metrics_delay_tags( + deployment_id: DeploymentID, + high_cardinality_tag_key: str, + high_cardinality_tag_value: str, +) -> Dict[str, str]: + """Return tags for controller autoscaling metrics delay gauge updates.""" + tags = { + "deployment": deployment_id.name, + "application": deployment_id.app_name, + } + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + tags[high_cardinality_tag_key] = high_cardinality_tag_value + return tags + + # Used for testing purposes only. If this is set, the controller will crash # after writing each checkpoint with the specified probability. _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0 @@ -358,11 +386,11 @@ def record_autoscaling_metrics_from_replica( # Record the metrics delay for observability self.replica_metrics_delay_gauge.set( latency_ms, - tags={ - "deployment": replica_metric_report.replica_id.deployment_id.name, - "application": replica_metric_report.replica_id.deployment_id.app_name, - "replica": replica_metric_report.replica_id.unique_id, - }, + tags=_get_autoscaling_metrics_delay_tags( + replica_metric_report.replica_id.deployment_id, + "replica", + replica_metric_report.replica_id.unique_id, + ), ) # Track in health metrics self._health_metrics_tracker.record_replica_metrics_delay(latency_ms) @@ -380,11 +408,11 @@ def record_autoscaling_metrics_from_handle( # Record the metrics delay for observability self.handle_metrics_delay_gauge.set( latency_ms, - tags={ - "deployment": handle_metric_report.deployment_id.name, - "application": handle_metric_report.deployment_id.app_name, - "handle": handle_metric_report.handle_id, - }, + tags=_get_autoscaling_metrics_delay_tags( + handle_metric_report.deployment_id, + "handle", + handle_metric_report.handle_id, + ), ) # Track in health metrics self._health_metrics_tracker.record_handle_metrics_delay(latency_ms) @@ -742,7 +770,7 @@ def _create_control_loop_metrics(self): "Time taken for the replica metrics to be reported to the controller. " "High values may indicate a busy controller." ), - tag_keys=("deployment", "application", "replica"), + tag_keys=_get_autoscaling_metrics_delay_tag_keys(("replica",)), ) self.handle_metrics_delay_gauge = metrics.Gauge( "serve_autoscaling_handle_metrics_delay_ms", @@ -750,7 +778,7 @@ def _create_control_loop_metrics(self): "Time taken for the handle metrics to be reported to the controller. " "High values may indicate a busy controller." ), - tag_keys=("deployment", "application", "handle"), + tag_keys=_get_autoscaling_metrics_delay_tag_keys(("handle",)), ) self.async_inference_task_queue_metrics_delay_gauge = metrics.Gauge( "serve_autoscaling_async_inference_task_queue_metrics_delay_ms", @@ -758,7 +786,7 @@ def _create_control_loop_metrics(self): "Time taken for the async inference task queue metrics to be reported " "to the controller. High values may indicate a busy controller." ), - tag_keys=("deployment", "application"), + tag_keys=_AUTOSCALING_METRICS_DELAY_BASE_TAG_KEYS, ) def _recover_state_from_checkpoint(self): diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 856e8183ebe0..6284574fe622 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -49,6 +49,7 @@ DEPLOYMENT_ACTOR_HEALTH_CHECK_TIMEOUT_S, DEPLOYMENT_ACTOR_HEALTH_CHECK_UNHEALTHY_THRESHOLD, MAX_PER_REPLICA_RETRY_COUNT, + RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S, RAY_SERVE_ENABLE_DIRECT_INGRESS, RAY_SERVE_ENABLE_TASK_EVENTS, @@ -104,6 +105,23 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) +_REPLICA_LIFECYCLE_METRIC_BASE_TAG_KEYS = ("deployment", "application") + + +def _get_replica_lifecycle_metric_tag_keys() -> Tuple[str, ...]: + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + return ("deployment", "replica", "application") + return _REPLICA_LIFECYCLE_METRIC_BASE_TAG_KEYS + + +def _get_replica_lifecycle_metric_tags( + replica_unique_id: str, +) -> Optional[Dict[str, str]]: + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + return {"replica": replica_unique_id} + return None + + _RESERVED_INTERNAL_DEPLOYMENT_CONTEXT_ENV_VARS = { RAY_SERVE_INTERNAL_DEPLOYMENT_APP_NAME_ENV_VAR, RAY_SERVE_INTERNAL_DEPLOYMENT_NAME_ENV_VAR, @@ -2869,7 +2887,7 @@ def __init__( "Tracks whether this deployment replica is healthy. 1 means " "healthy, 0 means unhealthy." ), - tag_keys=("deployment", "replica", "application"), + tag_keys=_get_replica_lifecycle_metric_tag_keys(), ) self.health_check_gauge.set_default_tags( {"deployment": self._id.name, "application": self._id.app_name} @@ -2924,7 +2942,7 @@ def __init__( self.health_check_failures_counter = metrics.Counter( "serve_health_check_failures_total", description=("Count of failed health checks."), - tag_keys=("deployment", "replica", "application"), + tag_keys=_get_replica_lifecycle_metric_tag_keys(), ) self.health_check_failures_counter.set_default_tags( {"deployment": self._id.name, "application": self._id.app_name} @@ -4331,7 +4349,10 @@ def _set_health_gauge(self, replica_unique_id: str, value: int) -> None: and (now - cached[1]) < RAY_SERVE_STATUS_GAUGE_REPORT_INTERVAL_S ): return - self.health_check_gauge.set(value, tags={"replica": replica_unique_id}) + self.health_check_gauge.set( + value, + tags=_get_replica_lifecycle_metric_tags(replica_unique_id), + ) self._health_gauge_cache[replica_unique_id] = (value, now) def _register_gang_replica(self, replica_id: ReplicaID, gang_id: str) -> None: @@ -4498,7 +4519,9 @@ def check_and_update_replicas(self): ) if replica.last_health_check_failed: self.health_check_failures_counter.inc( - tags={"replica": replica.replica_id.unique_id} + tags=_get_replica_lifecycle_metric_tags( + replica.replica_id.unique_id + ) ) if is_healthy: diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index b7232b0527b4..eb2e04006756 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -63,6 +63,7 @@ HEALTHY_MESSAGE, RAY_SERVE_AUTOSCALING_METRIC_RECORD_INTERVAL_FACTOR, RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE, + RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S, RAY_SERVE_DIRECT_INGRESS_PORT_RETRY_COUNT, RAY_SERVE_ENABLE_DIRECT_INGRESS, @@ -190,6 +191,30 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) +_REPLICA_METRIC_BASE_TAG_KEYS = ("deployment", "application") + + +def _get_replica_metric_default_tag_keys( + additional_tag_keys: Tuple[str, ...] = tuple(), +) -> Tuple[str, ...]: + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + return ("deployment", "replica", "application") + additional_tag_keys + return _REPLICA_METRIC_BASE_TAG_KEYS + additional_tag_keys + + +def _get_replica_metric_default_tags( + deployment_id: DeploymentID, + replica_unique_id: str, +) -> Dict[str, str]: + tags = { + "deployment": deployment_id.name, + "application": deployment_id.app_name, + } + if RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS: + tags["replica"] = replica_unique_id + return tags + + SERVE_BUILD_ASGI_APP_METHOD = "__serve_build_asgi_app__" @@ -363,11 +388,18 @@ def __init__( self._cached_metrics_interval_s = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS / 1000 # Request counter (only set on replica startup). - self._restart_counter = metrics.Counter( + self._restart_counter = ray_metrics.Counter( "serve_deployment_replica_starts", description=( "The number of times this replica has been restarted due to failure." ), + tag_keys=_get_replica_metric_default_tag_keys(), + ) + self._restart_counter.set_default_tags( + _get_replica_metric_default_tags( + self._deployment_id, + self._replica_id.unique_id, + ) ) self._restart_counter.inc() @@ -426,19 +458,32 @@ def __init__( description="The current number of queries being processed.", ) - self.record_autoscaling_stats_failed_counter = metrics.Counter( + self.record_autoscaling_stats_failed_counter = ray_metrics.Counter( "serve_record_autoscaling_stats_failed", - tag_keys=("exception_name",), + tag_keys=_get_replica_metric_default_tag_keys(("exception_name",)), description="The number of errored record_autoscaling_stats invocations.", ) + self.record_autoscaling_stats_failed_counter.set_default_tags( + _get_replica_metric_default_tags( + self._deployment_id, + self._replica_id.unique_id, + ) + ) - self.user_autoscaling_stats_latency_tracker = metrics.Histogram( + self.user_autoscaling_stats_latency_tracker = ray_metrics.Histogram( "serve_user_autoscaling_stats_latency_ms", description=( "Time taken to execute the user-defined autoscaling stats function " "in milliseconds." ), boundaries=REQUEST_LATENCY_BUCKETS_MS, + tag_keys=_get_replica_metric_default_tag_keys(), + ) + self.user_autoscaling_stats_latency_tracker.set_default_tags( + _get_replica_metric_default_tags( + self._deployment_id, + self._replica_id.unique_id, + ) ) # Replica utilization tracking with rolling window. diff --git a/python/ray/serve/tests/unit/test_controller.py b/python/ray/serve/tests/unit/test_controller.py index 1db5ac20bff1..4b4d5071a193 100644 --- a/python/ray/serve/tests/unit/test_controller.py +++ b/python/ray/serve/tests/unit/test_controller.py @@ -2,8 +2,10 @@ import pytest -from ray.serve._private.common import TargetCapacityDirection +from ray.serve._private.common import DeploymentID, TargetCapacityDirection from ray.serve._private.controller import ( + _get_autoscaling_metrics_delay_tag_keys, + _get_autoscaling_metrics_delay_tags, applications_match, calculate_target_capacity_direction, ) @@ -20,6 +22,58 @@ ) +def test_autoscaling_metrics_delay_tags_include_high_cardinality_by_default( + monkeypatch, +): + from ray.serve._private import controller + + monkeypatch.setattr( + controller, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + True, + ) + + assert _get_autoscaling_metrics_delay_tag_keys(("handle",)) == ( + "deployment", + "application", + "handle", + ) + assert _get_autoscaling_metrics_delay_tags( + DeploymentID(name="deployment", app_name="application"), + "handle", + "handle-id", + ) == { + "deployment": "deployment", + "application": "application", + "handle": "handle-id", + } + + +def test_autoscaling_metrics_delay_tags_exclude_high_cardinality( + monkeypatch, +): + from ray.serve._private import controller + + monkeypatch.setattr( + controller, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + False, + ) + + assert _get_autoscaling_metrics_delay_tag_keys(("replica",)) == ( + "deployment", + "application", + ) + assert _get_autoscaling_metrics_delay_tags( + DeploymentID(name="deployment", app_name="application"), + "replica", + "replica-id", + ) == { + "deployment": "deployment", + "application": "application", + } + + def create_app_config(name: str) -> ServeApplicationSchema: return ServeApplicationSchema( name=name, import_path=f"fake.{name}", route_prefix=f"/{name}" diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 94c84c93505a..11f5d4c12e46 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -7,6 +7,10 @@ from ray._common.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC from ray._raylet import NodeID +from ray.serve._private import ( + deployment_state as deployment_state_module, + replica as replica_module, +) from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.common import ( RUNNING_REQUESTS_KEY, @@ -51,9 +55,15 @@ DeploymentVersion, ReplicaStartupStatus, ReplicaStateContainer, + _get_replica_lifecycle_metric_tag_keys, + _get_replica_lifecycle_metric_tags, ) from ray.serve._private.exceptions import DeploymentIsBeingDeletedError from ray.serve._private.long_poll import LongPollNamespace +from ray.serve._private.replica import ( + _get_replica_metric_default_tag_keys, + _get_replica_metric_default_tags, +) from ray.serve._private.test_utils import ( MockDeploymentActorWrapper, MockPlacementGroup, @@ -73,6 +83,77 @@ TEST_DEPLOYMENT_ID_2 = DeploymentID(name="test_deployment_2", app_name="test_app") +def test_replica_lifecycle_metric_tags_include_high_cardinality_by_default( + monkeypatch, +): + monkeypatch.setattr( + deployment_state_module, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + True, + ) + + assert _get_replica_lifecycle_metric_tag_keys() == ( + "deployment", + "replica", + "application", + ) + assert _get_replica_lifecycle_metric_tags("replica-id") == {"replica": "replica-id"} + + +def test_replica_lifecycle_metric_tags_exclude_high_cardinality(monkeypatch): + monkeypatch.setattr( + deployment_state_module, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + False, + ) + + assert _get_replica_lifecycle_metric_tag_keys() == ( + "deployment", + "application", + ) + assert _get_replica_lifecycle_metric_tags("replica-id") is None + + +def test_replica_metric_default_tags_include_high_cardinality_by_default( + monkeypatch, +): + monkeypatch.setattr( + replica_module, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + True, + ) + + assert _get_replica_metric_default_tag_keys(("exception_name",)) == ( + "deployment", + "replica", + "application", + "exception_name", + ) + assert _get_replica_metric_default_tags(TEST_DEPLOYMENT_ID, "replica-id") == { + "deployment": "test_deployment", + "application": "test_app", + "replica": "replica-id", + } + + +def test_replica_metric_default_tags_exclude_high_cardinality(monkeypatch): + monkeypatch.setattr( + replica_module, + "RAY_SERVE_CONTROLLER_METRICS_INCLUDE_HIGH_CARDINALITY_TAGS", + False, + ) + + assert _get_replica_metric_default_tag_keys(("exception_name",)) == ( + "deployment", + "application", + "exception_name", + ) + assert _get_replica_metric_default_tags(TEST_DEPLOYMENT_ID, "replica-id") == { + "deployment": "test_deployment", + "application": "test_app", + } + + def deployment_info( version: Optional[str] = None, num_replicas: Optional[int] = 1,