Commit 3d785b2

Merge branch 'master' into core/fix-oom-threshold-message
2 parents 4a4453e + b3eb203 commit 3d785b2

17 files changed

Lines changed: 656 additions & 59 deletions

doc/source/serve/model-multiplexing.md

Lines changed: 14 additions & 0 deletions
@@ -85,6 +85,20 @@ When using model composition, you can send requests from an upstream deployment
 :end-before: __serve_model_composition_example_end__
 ```

+## Configuring model ID matching timeout
+
+When a request arrives with a `serve_multiplexed_model_id`, the Serve router attempts to match it to a replica that already has the model loaded. If no matching replica becomes available within the timeout, the request falls back to the default routing strategy and is sent to any available replica, which then loads the model on demand.
+
+You can configure this timeout using the `RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S` environment variable:
+
+```bash
+export RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S=2.0
+```
+
+**Default**: `1.0` second. To avoid thundering herd problems when many requests for the same unloaded model arrive concurrently, the actual timeout is randomized between this value and `value * 2` (for example, 1.0–2.0 seconds by default).
+
+Increase this timeout if your models take a long time to load and you prefer to wait for a replica that already has the model loaded. Decrease it if you prefer faster fallback to any available replica.
+
 ## Using model multiplexing with batching

 You can combine model multiplexing with the `@serve.batch` decorator for efficient batched inference. When you use both features together, Ray Serve automatically splits batches by model ID to ensure each batch contains only requests for the same model. This prevents issues where a single batch would contain requests targeting different models.
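
Reviewer sketch for the matching-timeout section added in this hunk: a minimal illustration of a timeout randomized between the configured value and twice that value. The helper name `sample_matching_timeout_s` is hypothetical and is not part of Ray Serve; only the environment variable name and the `1.0` default come from the documentation. Uniform sampling is an assumption, since the text only says the timeout is "randomized".

```python
import os
import random

ENV_VAR = "RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S"


def sample_matching_timeout_s(environ=None, rng=random):
    """Hypothetical helper: pick a timeout in [base, 2 * base] seconds."""
    environ = os.environ if environ is None else environ
    base = float(environ.get(ENV_VAR, "1.0"))  # documented default: 1.0
    # Assumption: uniform sampling; the docs only say "randomized".
    return rng.uniform(base, 2 * base)


if __name__ == "__main__":
    # With the variable set to 2.0, the sampled timeout lies in [2.0, 4.0].
    print(sample_matching_timeout_s({ENV_VAR: "2.0"}))
```

Spreading the timeouts this way means concurrent requests for the same unloaded model do not all fall back to an arbitrary replica at the same instant.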

doc/source/serve/monitoring.md

Lines changed: 18 additions & 0 deletions
@@ -571,6 +571,24 @@ Prometheus histograms aggregate data into predefined buckets, which can affect t
 For accurate percentile calculations, configure bucket boundaries that closely match your expected latency distribution. For example, if most requests complete in 10-100ms, use finer-grained buckets in that range.
 :::

+### Metrics export interval
+
+By default, Ray Serve batches its in-process metric updates (counters, gauges, histograms recorded by the router and replica) to reduce per-request overhead. You can configure how often Serve flushes these batched updates to the Ray metrics API using the `RAY_SERVE_METRICS_EXPORT_INTERVAL_MS` environment variable:
+
+```bash
+export RAY_SERVE_METRICS_EXPORT_INTERVAL_MS=500
+```
+
+**Default**: `100` milliseconds. Set to `0` to disable batching entirely and record every metric update eagerly. This interval applies to both the router and replica metric pipelines.
+
+Increasing this value reduces the overhead of recording metrics at the cost of less frequent updates. Decreasing it provides more up-to-date values but increases recording frequency.
+
+:::{note}
+`RAY_SERVE_METRICS_EXPORT_INTERVAL_MS` only controls Serve-side batching; it does **not** change how often Ray exports metrics to the Prometheus scrape endpoint. That is controlled separately by Ray Core's `metrics_report_interval_ms` system config (default `10000` ms), which determines how often each Ray process pushes its metrics to the metrics agent that Prometheus scrapes.
+
+The two settings compose: a Serve metric update is first buffered in the router/replica for up to `RAY_SERVE_METRICS_EXPORT_INTERVAL_MS`, then made available at the Prometheus endpoint on the next Ray Core export tick (`metrics_report_interval_ms`). Lowering only `RAY_SERVE_METRICS_EXPORT_INTERVAL_MS` without also lowering `metrics_report_interval_ms` does not make metrics appear in Prometheus any sooner. To change the Ray Core interval, pass it via system config when starting Ray, e.g. `ray start --head --system-config='{"metrics_report_interval_ms": 1000}'`.
+:::
+
 ### Request lifecycle and metrics

 The following diagram shows where metrics are captured along the request path:
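
Reviewer sketch for the note added in this hunk: a rough upper bound on how long a Serve metric update can sit in the two buffers before it is visible at the scrape endpoint. The function name `worst_case_delay_ms` is hypothetical, and adding the two intervals is a simplifying assumption based on the "buffered, then exported on the next tick" description; the Prometheus scrape interval itself is not modeled.

```python
SERVE_DEFAULT_EXPORT_INTERVAL_MS = 100  # documented default
RAY_CORE_DEFAULT_REPORT_INTERVAL_MS = 10_000  # documented default


def worst_case_delay_ms(
    serve_export_interval_ms=SERVE_DEFAULT_EXPORT_INTERVAL_MS,
    core_report_interval_ms=RAY_CORE_DEFAULT_REPORT_INTERVAL_MS,
):
    """Hypothetical bound: Serve-side buffering plus one Ray Core export tick."""
    if serve_export_interval_ms < 0 or core_report_interval_ms < 0:
        raise ValueError("intervals must be non-negative")
    return serve_export_interval_ms + core_report_interval_ms


if __name__ == "__main__":
    # Lowering only the Serve interval barely moves the bound.
    print(worst_case_delay_ms(500), worst_case_delay_ms(100))
    # Lowering the Ray Core interval is what shortens it.
    print(worst_case_delay_ms(100, 1_000))
```

Under this model the Ray Core interval dominates at default settings, which matches the note's point that tuning the Serve interval alone has little effect on freshness in Prometheus.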

doc/source/serve/production-guide/fault-tolerance.md

Lines changed: 29 additions & 0 deletions
@@ -686,5 +686,34 @@ Table:

 Note that the PID for the first ProxyActor has changed, indicating that it restarted.

+## Environment variables
+
+These environment variables control fault tolerance-related behavior. Set them before starting Ray.
+
+### `RAY_SERVE_KV_TIMEOUT_S`
+
+**Default**: None (no timeout)
+
+Ray Serve persists deployment configurations and state in the Global Control Store (GCS) using its internal KV interface. Each read and write to the GCS KV store uses this timeout. By default, no timeout is set and these operations block until the GCS responds. If the GCS becomes unavailable (for example, during a head node restart), Serve operations that depend on the KV store — such as fetching or updating deployment configs — hang until the GCS recovers.
+
+Setting this value causes those operations to fail fast with a timeout error instead of blocking indefinitely, allowing Serve to detect GCS failures and trigger recovery sooner.
+
+```bash
+export RAY_SERVE_KV_TIMEOUT_S=5
+```
+
+### `LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_LOWER_BOUND` / `LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_UPPER_BOUND`
+
+**Defaults**: `30` / `60` seconds
+
+Ray Serve uses a long-polling mechanism for replicas and proxies to receive configuration updates from the controller. Each long-poll request uses a random timeout between the lower and upper bounds to avoid thundering herd problems when many clients poll simultaneously.
+
+```bash
+export LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_LOWER_BOUND=10
+export LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_UPPER_BOUND=30
+```
+
+Decreasing these values makes replicas and proxies detect controller changes faster, which can speed up recovery after controller restarts. Increasing them reduces the frequency of long-poll requests to the controller.
+
 [KubeRay]: kuberay-index
 [external storage namespace]: kuberay-external-storage-namespace
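
Reviewer sketch for the `RAY_SERVE_KV_TIMEOUT_S` section added in this hunk: the difference between blocking indefinitely and failing fast, modeled with a standard-library queue. This does not use Ray's KV client; `kv_get` and the queue standing in for the GCS are hypothetical illustrations only.

```python
import queue


def kv_get(responses, timeout_s=None):
    """Wait for one response; timeout_s=None blocks until one arrives."""
    try:
        return responses.get(timeout=timeout_s)
    except queue.Empty:
        # Fail fast so the caller can react instead of hanging.
        raise TimeoutError(f"no KV response within {timeout_s}s") from None


if __name__ == "__main__":
    gcs = queue.Queue()  # an empty queue models an unavailable GCS
    try:
        kv_get(gcs, timeout_s=0.1)
    except TimeoutError as exc:
        print("failed fast:", exc)
```

With `timeout_s=None` the same call would wait until a response is queued, which mirrors the default "no timeout" behavior described above.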

python/ray/data/_internal/arrow_ops/transform_pyarrow.py

Lines changed: 47 additions & 9 deletions
@@ -75,28 +75,66 @@ def _create_empty_table(schema: "pyarrow.Schema"):
     return pa.table(arrays, schema=schema)


+def _has_unhashable_pandas_types(schema: "pyarrow.Schema") -> bool:
+    """Check if any column type becomes unhashable after to_pandas() conversion.
+
+    Nested PyArrow types (struct/list/large_list/fixed_size_list/map/union and
+    their view variants) convert to Python dicts/lists, and Ray's tensor and
+    Python-object extension types convert to numpy arrays / Python objects.
+    None of these are hashable by pandas' hash_pandas_object. We check the
+    schema upfront so the hash algorithm choice is deterministic per schema,
+    not per block data.
+    """
+    from ray.data._internal.object_extensions.arrow import ArrowPythonObjectType
+
+    tensor_types = get_arrow_extension_tensor_types()
+    for field in schema:
+        # `is_nested` covers struct/list/large_list/map/union and (on pyarrow
+        # 16+) list_view/large_list_view. It does NOT include fixed_size_list
+        # on older pyarrow (<10-ish), so check that explicitly.
+        if pyarrow.types.is_nested(field.type) or pyarrow.types.is_fixed_size_list(
+            field.type
+        ):
+            return True
+        if isinstance(field.type, tensor_types):
+            return True
+        if isinstance(field.type, ArrowPythonObjectType):
+            return True
+    return False
+
+
 def _hash_partition(
     table: "pyarrow.Table",
     num_partitions: int,
 ) -> np.ndarray:
-
-    # NOTE: We special casing-scenario of single column with integer type
+    # NOTE: We special-case single column with integer type,
     # short-circuiting the need for hashing the column and instead
-    # using values as is for partitioning
+    # using values as-is for partitioning.
     if len(table.columns) == 1 and pyarrow.types.is_integer(table.column(0).type):
         target_column = table.column(0)
         partitions = (target_column.to_numpy() % num_partitions).astype(np.int64)
-    else:
-        # Otherwise fallback to invoking __hash__ on Pyarrow scalars filling out
-        # target table
+    elif _has_unhashable_pandas_types(table.schema):
+        # Struct/list/map columns become dicts/lists in pandas, which are
+        # unhashable. Use row-by-row hashing on PyArrow scalars instead.
         partitions = np.zeros((table.num_rows,), dtype=np.int64)
-
         for i in range(table.num_rows):
             _tuple = tuple(c[i] for c in table.columns)
             partitions[i] = hash(_tuple) % num_partitions
+    else:
+        # Use pandas' vectorized hash (xxhash-based) instead of a Python
+        # row-by-row loop.
+        import pandas as pd
+
+        # Use types_mapper=pd.ArrowDtype to keep Arrow-backed extension arrays
+        # in pandas. This avoids int64 -> float64 promotion for nullable integer
+        # columns, which would cause the same value to hash differently across
+        # blocks depending on whether the block contains nulls.
+        hashes = pd.util.hash_pandas_object(
+            table.to_pandas(types_mapper=pd.ArrowDtype), index=False
+        ).values
+        np.mod(hashes, num_partitions, out=hashes)
+        partitions = hashes

-    # Convert to ndarray to compute hash partition indices
-    # more efficiently
     return partitions


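
Reviewer sketch for the three-way dispatch introduced in `_hash_partition`: a toy model, in plain Python, of choosing the hashing path from the schema alone rather than from row values. The names `NESTED_KINDS`, `choose_hash_path`, and `is_hashable` are hypothetical, column kinds are plain strings rather than PyArrow types, and Ray's tensor and Python-object extension types are left out for brevity.

```python
NESTED_KINDS = {"struct", "list", "large_list", "fixed_size_list", "map", "union"}


def choose_hash_path(column_kinds):
    """Pick a strategy from column kinds only, never from the row values."""
    if len(column_kinds) == 1 and column_kinds[0] == "integer":
        return "modulo"  # use the integer values directly
    if any(kind in NESTED_KINDS for kind in column_kinds):
        return "row_by_row"  # nested values are unhashable once converted
    return "vectorized"  # hash whole columns at once


def is_hashable(value):
    """Return True if Python's built-in hash() accepts the value."""
    try:
        hash(value)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    print(choose_hash_path(["string", "struct"]))
    # dicts and lists, which nested columns convert to, are unhashable.
    print(is_hashable({"v": 1}), is_hashable([1, 2]), is_hashable((1, "a")))
```

Because the choice depends only on the schema, every block of a dataset takes the same path, so equal keys in different blocks are hashed by the same algorithm.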

python/ray/data/_internal/execution/operators/zip_operator.py

Lines changed: 8 additions & 0 deletions
@@ -222,9 +222,17 @@ def _zip(
         # cumulative number of rows as that left block.
         # NOTE: _split_at_indices has a no-op fastpath if the blocks are already
         # aligned.
+        # Determine the ownership of the blocks being split, accounting for the
+        # potential swap above. We must not free blocks that are shared with
+        # other operators (e.g., when the input RefBundle has owns_blocks=False
+        # because it comes from a materialized dataset).
+        split_side_owned = all(
+            b.owns_blocks for b in (left_input if input_side_inverted else right_input)
+        )
         aligned_right_blocks_with_metadata = _split_at_indices(
             right_blocks_with_metadata,
             indices,
+            owned_by_consumer=split_side_owned,
             block_rows=right_block_rows,
         )
         del right_blocks_with_metadata
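
Reviewer sketch for the ownership check added in this hunk: a standalone illustration of how `split_side_owned` is derived. `Bundle` is a hypothetical stand-in for Ray's `RefBundle` that carries only the `owns_blocks` flag; the selection logic mirrors the added lines.

```python
from dataclasses import dataclass


@dataclass
class Bundle:
    """Hypothetical stand-in for RefBundle; only the ownership flag is modeled."""

    owns_blocks: bool


def split_side_owned(left_input, right_input, input_side_inverted):
    """True only if every bundle on the side being split owns its blocks."""
    side = left_input if input_side_inverted else right_input
    return all(b.owns_blocks for b in side)


if __name__ == "__main__":
    owned = [Bundle(owns_blocks=True)]
    materialized = [Bundle(owns_blocks=False)]  # e.g. from ds.materialize()
    # The materialized side is the one being split, so it must not be freed.
    print(split_side_owned(owned, materialized, input_side_inverted=False))
```

A single shared bundle on the split side is enough to disable freeing, which is the conservative choice when other operators may still read those blocks.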

python/ray/data/_internal/split.py

Lines changed: 1 addition & 1 deletion
@@ -247,7 +247,7 @@ def _generate_global_split_results(
 def _split_at_indices(
     blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]],
     indices: List[int],
-    owned_by_consumer: bool = True,
+    owned_by_consumer: bool,
     block_rows: List[int] = None,
 ) -> Tuple[List[List[ObjectRef[Block]]], List[List[BlockMetadata]]]:
     """Split blocks at the provided indices.

python/ray/data/tests/test_split.py

Lines changed: 8 additions & 8 deletions
@@ -624,35 +624,35 @@ def test_generate_global_split_results(ray_start_regular_shared_2_cpus):

 def test_private_split_at_indices(ray_start_regular_shared_2_cpus):
     inputs = _create_blocks_with_metadata([])
-    splits = list(zip(*_split_at_indices(inputs, [0])))
+    splits = list(zip(*_split_at_indices(inputs, [0], True)))
     verify_splits(splits, [[], []])

-    splits = list(zip(*_split_at_indices(inputs, [])))
+    splits = list(zip(*_split_at_indices(inputs, [], True)))
     verify_splits(splits, [[]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])

-    splits = list(zip(*_split_at_indices(inputs, [1])))
+    splits = list(zip(*_split_at_indices(inputs, [1], True)))
     verify_splits(splits, [[[1]], [[2, 3], [4]]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])
-    splits = list(zip(*_split_at_indices(inputs, [2])))
+    splits = list(zip(*_split_at_indices(inputs, [2], True)))
     verify_splits(splits, [[[1], [2]], [[3], [4]]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])
-    splits = list(zip(*_split_at_indices(inputs, [1])))
+    splits = list(zip(*_split_at_indices(inputs, [1], True)))
     verify_splits(splits, [[[1]], [[2, 3], [4]]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])
-    splits = list(zip(*_split_at_indices(inputs, [2, 2])))
+    splits = list(zip(*_split_at_indices(inputs, [2, 2], True)))
     verify_splits(splits, [[[1], [2]], [], [[3], [4]]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])
-    splits = list(zip(*_split_at_indices(inputs, [])))
+    splits = list(zip(*_split_at_indices(inputs, [], True)))
     verify_splits(splits, [[[1], [2, 3], [4]]])

     inputs = _create_blocks_with_metadata([[1], [2, 3], [4]])
-    splits = list(zip(*_split_at_indices(inputs, [0, 4])))
+    splits = list(zip(*_split_at_indices(inputs, [0, 4], True)))
     verify_splits(splits, [[], [[1], [2, 3], [4]], []])


658658

python/ray/data/tests/test_zip.py

Lines changed: 35 additions & 0 deletions
@@ -153,6 +153,41 @@ def foo(x):
     ), result


+def test_zip_does_not_free_shared_materialized_blocks(ray_start_regular_shared):
+    """Regression test: ZipOperator should not free blocks from a materialized
+    dataset that is shared with another consumer.
+
+    Previously, ZipOperator._zip() called _split_at_indices() without specifying
+    owned_by_consumer, which defaulted to True. This caused ray.internal.free()
+    to be called on blocks that were shared with other operators in the DAG,
+    leading to ObjectFreedError.
+    """
+    # Create a dataset with 3 blocks (rows [7, 7, 6]) and materialize it.
+    # The materialized blocks have owns_blocks=False.
+    ds = ray.data.range(20, override_num_blocks=3).materialize()
+    assert not ds._plan.execute().owns_blocks
+
+    # Consumer 1: a map_batches that uses the same materialized dataset.
+    mapped_ds = ds.map_batches(lambda batch: batch, batch_format="pandas")
+
+    # Consumer 2: zip the same materialized dataset with another dataset.
+    # This triggers _split_at_indices inside ZipOperator._zip().
+    # Use 2 blocks (rows [10, 10]) so that block boundaries are NOT aligned
+    # with ds's blocks (rows [7, 7, 6]). This forces actual block splitting
+    # (e.g., the first 10-row block gets split at row 7), which exercises
+    # the owned_by_consumer code path in _split_all_blocks.
+    other_ds = ray.data.range(20, override_num_blocks=2)
+    zipped = other_ds.zip(ds)
+
+    # Consuming the zipped result should not raise ObjectFreedError.
+    result = zipped.take_all()
+    assert len(result) == 20
+
+    # The mapped_ds should also work fine (blocks not freed by the zip).
+    result2 = mapped_ds.take_all()
+    assert len(result2) == 20
+
+
 if __name__ == "__main__":
     import sys


python/ray/data/tests/unit/test_transform_pyarrow.py

Lines changed: 74 additions & 0 deletions
@@ -6,10 +6,12 @@
 import pandas as pd
 import pyarrow as pa
 import pytest
+from packaging.version import parse as parse_version

 from ray.data._internal.arrow_ops.transform_pyarrow import (
     MIN_PYARROW_VERSION_TYPE_PROMOTION,
     _align_struct_fields,
+    _has_unhashable_pandas_types,
     concat,
     hash_partition,
     shuffle,
@@ -144,6 +146,78 @@ def _concat_and_sort_partitions(parts: Iterable[pa.Table]) -> pa.Table:
     assert t == _concat_and_sort_partitions(_structs_partition_dict.values())


+@pytest.mark.parametrize(
+    "pa_type,expected",
+    [
+        # Nested types -> unhashable in pandas (convert to dict/list)
+        (pa.struct([("a", pa.int32())]), True),
+        (pa.list_(pa.int32()), True),
+        (pa.large_list(pa.int32()), True),
+        (pa.list_(pa.int32(), 3), True),  # fixed_size_list
+        (pa.map_(pa.string(), pa.int32()), True),
+        (pa.dense_union([pa.field("x", pa.int32())]), True),
+        # Ray extension types -> numpy arrays / arbitrary objects in pandas
+        (ArrowTensorTypeV2((2, 2), pa.int64()), True),
+        (ArrowPythonObjectType(), True),
+        # Hashable primitives -> must stay False so we keep the fast path
+        (pa.int32(), False),
+        (pa.float64(), False),
+        (pa.bool_(), False),
+        (pa.string(), False),
+        (pa.large_string(), False),
+        (pa.binary(), False),
+        (pa.decimal128(10, 2), False),
+        (pa.date32(), False),
+        (pa.timestamp("ns"), False),
+        (pa.dictionary(pa.int32(), pa.string()), False),
+    ],
+)
+def test_has_unhashable_pandas_types(pa_type, expected):
+    schema = pa.schema([("c", pa_type)])
+    assert _has_unhashable_pandas_types(schema) is expected
+
+
+@pytest.mark.skipif(
+    get_pyarrow_version() < parse_version("16.0.0"),
+    reason="list_view / large_list_view require pyarrow 16+",
+)
+def test_has_unhashable_pandas_types_list_views():
+    # Regression: list_view/large_list_view also convert to Python lists in
+    # pandas, so they must be flagged as unhashable like list/large_list.
+    for view_type in (pa.list_view(pa.int32()), pa.large_list_view(pa.int32())):
+        schema = pa.schema([("c", view_type)])
+        assert _has_unhashable_pandas_types(schema) is True
+
+
+def test_hash_partition_null_struct_consistent_across_blocks():
+    struct_t = pa.struct([("v", pa.int32())])
+    num_partitions = 8
+
+    all_null = pa.Table.from_pydict(
+        {"k": pa.array([None, None, None], type=struct_t), "idx": [0, 1, 2]}
+    )
+    mixed = pa.Table.from_pydict(
+        {
+            "k": pa.array([None, {"v": 1}, None], type=struct_t),
+            "idx": [10, 11, 12],
+        }
+    )
+
+    p1 = hash_partition(all_null, hash_cols=["k"], num_partitions=num_partitions)
+    p2 = hash_partition(mixed, hash_cols=["k"], num_partitions=num_partitions)
+
+    def null_partition_id(parts):
+        # Return the partition id holding null-key rows (there should be
+        # exactly one — identical null keys must co-locate).
+        null_pids = {
+            pid for pid, tbl in parts.items() if any(tbl["k"].is_null().to_pylist())
+        }
+        assert len(null_pids) == 1, null_pids
+        return next(iter(null_pids))
+
+    assert null_partition_id(p1) == null_partition_id(p2)
+
+
 def test_shuffle():
     t = pa.Table.from_pydict(
         {

python/ray/includes/gcs_client.pxi

Lines changed: 19 additions & 2 deletions
@@ -639,9 +639,26 @@ cdef class InnerGcsClient:
             reason: int32_t,
             reason_message: c_string,
             deadline_timestamp_ms: int64_t):
-        """Send the DrainNode request to GCS.
+        """Send a DrainNode request to GCS to gracefully terminate a node.

-        This is only for testing.
+        Used by the `ray drain-node` CLI command and by autoscaler v2's
+        ray_stopper for idle and preemption-based node termination.
+
+        Args:
+            node_id: Binary node ID of the target node.
+            reason: A `DrainNodeReason` enum value. `IDLE_TERMINATION`
+                requests are rejectable by the raylet; `PREEMPTION`
+                requests are non-rejectable.
+            reason_message: Human-readable explanation, used for
+                observability.
+            deadline_timestamp_ms: Timestamp (ms) when the node will be
+                force-killed. Used as a hint so workloads can drain
+                before the deadline.
+
+        Returns:
+            Tuple of (is_accepted, rejection_reason_message). When
+            `is_accepted` is False, `rejection_reason_message` describes
+            why the raylet rejected the request.
         """
         cdef:
             int64_t timeout_ms = -1
