Skip to content

Commit 16eaed0

Browse files
authored
[Data] Replace on_exit hook with __ray_shutdown__ to fix UDF cleanup race condition (ray-project#61700)
## Description Replaces `_MapWorker.on_exit()` with `_MapWorker.__ray_shutdown__()` and removes the `DataContext._enable_actor_pool_on_exit_hook` workaround flag.. ### What changed and why: The old approach called `actor.on_exit.remote()` (a regular actor task) in _release_running_actor, then used `ray.wait(..., timeout=30s)` to block until the hook finished. This had two problems: - Opt-in only. The hook was gated behind `DataContext._enable_actor_pool_on_exit_hook`, which defaulted to `False`. UDF cleanup was silently skipped unless users knew to set the private flag. - Fault-tolerance race condition. Because `on_exit` was submitted as a regular task, a lineage-reconstruction retry could be routed to the same actor after `on_exit` had already deleted the UDF. This may cause the retried task to execute against a `None` UDF instance. ### The new approach: - Renames `on_exit()` to `__ray_shutdown__()` on `_MapWorker`, using Ray Core's native actor shutdown hook, which is called directly by the worker process before it exits. - Replaces `.options().remote()` with `._remote()` for actor task submission. `ActorMethod.options()` creates a `FuncWrapper` closure that captures the `ActorMethod` (and therefore the `ActorHandle`) in a closure cell, forming a reference cycle. This cycle prevents actor handles from being collected by reference counting alone, meaning `__ray_shutdown__` would never fire without explicit `gc.collect()`. Using `._remote()` directly avoids the `FuncWrapper` entirely, so actor handles are collected properly by reference counting once all strong references are dropped. - Relies on passive GC (reference counting) to trigger `__ray_shutdown__`. During graceful shutdown, the actor pool drops its references to actor handles in `_release_running_actor`. - UDF cleanup is now unconditional. `__ray_shutdown__` is always called on graceful actor exit with no flag, no timeout, and no explicit termination task. ### Removed: - `DataContext._enable_actor_pool_on_exit_hook` (the flag is no longer needed because cleanup is now zero-cost and unconditional). - `_MapWorker.on_exit()` (replaced by `__ray_shutdown__()`). - The on_exit_refs collection and `ray.wait()` call in _release_running_actors. - `_ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S`. ## Related issues Related to ray-project#53249 and partially resolves ray-project#60453. ## Additional information The race condition in question from old `on_exit` approach: - Actor A is processing Task T. - `_release_running_actor` submits `actor.on_exit.remote()`; task added to actor's queue. - Task T fails and retry task is routed back to Actor A. - on_exit runs and deletes UDF. - Retry arrives and executes against `None` UDF, leading to crash. With the new approach: - `_release_running_actor` drops all pool references to the actor handle.. - Once `_data_tasks` are cleared during shutdown, the actor handle's refcount reaches zero and the actor exits gracefully. - Ray Core calls `__ray_shutdown__` directly in the worker process before exit, after all pending tasks complete. - `__ray_shutdown__` runs as part of the actor's exit sequence, guaranteed to be the last thing before the process terminates. No FIFO queuing issue (race conditions) because of this. The old `_enable_actor_pool_on_exit_hook` was a private, temporary workaround documented as having this race condition. It has been removed entirely as UDF cleanup is now unconditional and safe by default. Users who were setting `ctx._enable_actor_pool_on_exit_hook = True` will get the same behavior automatically with no code changes. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com>
1 parent f996fa0 commit 16eaed0

4 files changed

Lines changed: 25 additions & 52 deletions

File tree

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ def _create_actor_pool(
185185
create_actor_fn=self._start_actor,
186186
config=config,
187187
map_worker_cls_name=self._map_worker_cls_name,
188-
_enable_actor_pool_on_exit_hook=self.data_context._enable_actor_pool_on_exit_hook,
189188
)
190189

191190
def _create_actor_pool_config(
@@ -392,16 +391,17 @@ def _try_schedule_tasks_internal(self) -> int:
392391
)
393392
actor_task_args = dict(self._ray_actor_task_remote_args)
394393
extra_labels = actor_task_args.pop("_labels", None) or {}
395-
gen = actor.submit.options(
394+
395+
# Call _remote() directly instead of .options().remote() to
396+
# avoid the FuncWrapper closure in ActorMethod.options(), which
397+
# creates a reference cycle that prevents the ActorHandle from
398+
# being collected by reference counting alone.
399+
gen = actor.submit._remote(
400+
args=[self.data_context, ctx, *input_blocks],
401+
kwargs={"slices": bundle.slices, **self.get_map_task_kwargs()},
396402
num_returns="streaming",
397403
_labels={self._OPERATOR_ID_LABEL_KEY: self.id, **extra_labels},
398404
**actor_task_args,
399-
).remote(
400-
self.data_context,
401-
ctx,
402-
*input_blocks,
403-
slices=bundle.slices,
404-
**self.get_map_task_kwargs(),
405405
)
406406

407407
def _task_done_callback(actor_to_return):
@@ -700,12 +700,19 @@ def __repr__(self):
700700
# This can happen during actor restarts or initialization failures.
701701
return f"MapWorker({getattr(self, 'src_fn_name', '<initializing>')})"
702702

703-
def on_exit(self):
704-
"""Called when the actor is about to exist.
705-
This enables performing cleanup operations via `UDF.__del__`.
703+
def __ray_shutdown__(self):
704+
"""Called by Ray Core when the actor exits gracefully.
705+
706+
Triggered when all Python actor handles go out of scope and the handle
707+
is collected by reference counting.
708+
709+
During graceful shutdown, ActorPoolMapOperator clears _data_tasks and
710+
drops pool references so handles become collectible immediately.
711+
Ray Core guarantees this is called after all pending tasks complete
712+
and before the actor process exits.
706713
707-
Note, this only ensures cleanup is performed when the job exists gracefully.
708-
If the driver or the actor is forcefully killed, `__del__` will not be called.
714+
Note: this is NOT called if the actor is forcefully killed (e.g. via
715+
`ray.kill(actor)`) or crashes unexpectedly.
709716
"""
710717
# `_map_actor_context` is a global variable that references the UDF object.
711718
# Delete it to trigger `UDF.__del__`.
@@ -738,15 +745,13 @@ class _ActorPool(AutoscalingActorPool):
738745
"""
739746

740747
_ACTOR_POOL_SCALE_DOWN_DEBOUNCE_PERIOD_S = 10
741-
_ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S = 30
742748

743749
def __init__(
744750
self,
745751
create_actor_fn: Callable[[Dict[str, str]], Tuple[ActorHandle, ObjectRef[Any]]],
746752
config: AutoscalingActorConfig,
747753
map_worker_cls_name: str = "MapWorker",
748754
debounce_period_s: int = _ACTOR_POOL_SCALE_DOWN_DEBOUNCE_PERIOD_S,
749-
_enable_actor_pool_on_exit_hook: bool = False,
750755
):
751756
"""Initialize the actor pool.
752757
@@ -760,13 +765,10 @@ def __init__(
760765
purposes.
761766
debounce_period_s: Debounce period for scaling down after scaling
762767
up.
763-
_enable_actor_pool_on_exit_hook: Whether to enable the actor pool
764-
on exit hook.
765768
"""
766769
super().__init__(config=config)
767770

768771
self._create_actor_fn = create_actor_fn
769-
self._enable_actor_pool_on_exit_hook = _enable_actor_pool_on_exit_hook
770772
self._map_worker_cls_name = map_worker_cls_name
771773
self._debounce_period_s = debounce_period_s
772774
# Timestamp of the last scale up action
@@ -1095,35 +1097,24 @@ def _release_pending_actors(self, force: bool):
10951097
def _release_running_actors(self, force: bool):
10961098
running = list(self._running_actors.keys())
10971099

1098-
on_exit_refs = []
1099-
1100-
# First release actors and collect their shutdown hook object-refs
11011100
for actor in running:
1102-
ref = self._release_running_actor(actor)
1103-
if ref:
1104-
on_exit_refs.append(ref)
1105-
1106-
# Wait for all actors to shutdown gracefully before killing them
1107-
ray.wait(on_exit_refs, timeout=self._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S)
1101+
self._release_running_actor(actor)
11081102

11091103
# NOTE: Actors can't be brought back after being ``ray.kill``-ed,
11101104
# hence we're only doing that if this is a forced release
11111105
if force:
11121106
for actor in running:
11131107
ray.kill(actor)
11141108

1115-
def _release_running_actor(self, actor: ActorHandle) -> Optional[ObjectRef]:
1116-
"""Remove the given actor from the pool and trigger its `on_exit` callback.
1117-
1118-
This method returns a ``ref`` to the result
1119-
"""
1109+
def _release_running_actor(self, actor: ActorHandle):
1110+
"""Remove the given actor from the pool by dropping all pool references."""
11201111
# NOTE: By default, we remove references to the actor and let ref counting
11211112
# garbage collect the actor, instead of using ray.kill.
11221113
#
11231114
# Otherwise, actor cannot be reconstructed for the purposes of produced
11241115
# object's lineage reconstruction.
11251116
if actor not in self._running_actors:
1126-
return None
1117+
return
11271118

11281119
# Update cached statistics before removing the actor
11291120
actor_state = self._running_actors[actor]
@@ -1139,17 +1130,9 @@ def _release_running_actor(self, actor: ActorHandle) -> Optional[ObjectRef]:
11391130
if actor_state.is_restarting:
11401131
self._num_restarting_actors -= 1
11411132

1142-
if self._enable_actor_pool_on_exit_hook:
1143-
# Call `on_exit` to trigger `UDF.__del__` which may perform
1144-
# cleanup operations.
1145-
ref = actor.on_exit.remote()
1146-
else:
1147-
ref = None
11481133
del self._running_actors[actor]
11491134
del self._actor_to_logical_id[actor]
11501135

1151-
return ref
1152-
11531136
def _rank_actors(
11541137
self,
11551138
actors: List[ActorHandle],

python/ray/data/context.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -723,12 +723,6 @@ class DataContext:
723723
override_object_store_memory_limit_fraction: float = None
724724
memory_usage_poll_interval_s: Optional[float] = 1
725725
dataset_logger_id: Optional[str] = None
726-
# This is a temporary workaround to allow actors to perform cleanup
727-
# until https://github.com/ray-project/ray/issues/53169 is fixed.
728-
# This hook is known to have a race condition bug in fault tolerance.
729-
# I.E., after the hook is triggered and the UDF is deleted, another
730-
# retry task may still be scheduled to this actor and it will fail.
731-
_enable_actor_pool_on_exit_hook: bool = False
732726

733727
issue_detectors_config: "IssueDetectorsConfiguration" = field(
734728
default_factory=_issue_detectors_config_factory

python/ray/data/tests/test_actor_pool_map_operator.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def __init__(self, node_id: str = "node1"):
7676
def get_location(self) -> str:
7777
return self.node_id
7878

79-
def on_exit(self):
79+
def __ray_shutdown__(self):
8080
pass
8181

8282

@@ -170,7 +170,6 @@ def _create_actor_pool(
170170
create_actor_fn=self._create_actor_fn,
171171
map_worker_cls_name=map_worker_cls_name,
172172
config=config,
173-
_enable_actor_pool_on_exit_hook=False,
174173
)
175174
return pool
176175

@@ -805,7 +804,6 @@ def create_actor_fn(
805804
map_worker_cls_name="MapWorker(TestOp)",
806805
config=config,
807806
debounce_period_s=0,
808-
_enable_actor_pool_on_exit_hook=False,
809807
)
810808

811809
with caplog.at_level(logging.DEBUG, logger=logger_name):

python/ray/data/tests/test_map.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,8 +888,6 @@ def test_actor_udf_cleanup(
888888
"""Test that for the actor map operator, the UDF object is deleted properly."""
889889
ray.shutdown()
890890
ray.init(num_cpus=2)
891-
ctx = DataContext.get_current()
892-
ctx._enable_actor_pool_on_exit_hook = True
893891

894892
test_file = tmp_path / "test.txt"
895893

0 commit comments

Comments
 (0)