Skip to content

Commit 453923c

Browse files
[RLlib] Handle the all-evaluation-workers-unhealthy case uniformly across modes
When all *configured* remote evaluation EnvRunners are unhealthy at the start of an evaluation step, `Algorithm.evaluate()` previously did one of two thing: - `evaluation_parallel_to_training=True`: fall back to the local eval EnvRunner, which raises `ValueError: Cannot run on local evaluation worker parallel to training!`. Hard-crashes a long training run. - `evaluation_parallel_to_training=False`: silently fall back to the local eval EnvRunner. "Works" but the eval numbers are quietly produced by a different EnvRunner from the one the user configured, on the driver process, with potentially different perf and env settings. Both behaviors are gone. RLlib never silently falls back to local eval in the failure case anymore. Two new orthogonal config knobs on `AlgorithmConfig.evaluation()` control the behavior: - `evaluation_unhealthy_workers_timeout_s` (float, default 0): how long to wait for at least one remote evaluation EnvRunner to recover. - `evaluation_error_on_no_workers` (bool, default False): if still none after the wait, raise `RuntimeError` (True) or skip evaluation for this iteration (False). Both knobs apply uniformly regardless of `evaluation_parallel_to_training`. The intentional `evaluation_num_env_runners=0` case (user explicitly asked for local-only eval) is preserved -- this is not a fallback, it's the user's chosen configuration, and is recognized via `num_remote_env_runners() == 0`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bbf5fef commit 453923c

4 files changed

Lines changed: 248 additions & 3 deletions

File tree

rllib/BUILD.bazel

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,6 +1685,18 @@ py_test(
16851685
deps = [":conftest"],
16861686
)
16871687

1688+
py_test(
1689+
name = "test_eval_workers_all_unhealthy",
1690+
size = "medium",
1691+
srcs = ["algorithms/tests/test_eval_workers_all_unhealthy.py"],
1692+
tags = [
1693+
"algorithms",
1694+
"exclusive",
1695+
"team:rllib",
1696+
],
1697+
deps = [":conftest"],
1698+
)
1699+
16881700
# Specific Algorithms
16891701

16901702
# APPO

rllib/algorithms/algorithm.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,9 +1461,14 @@ def evaluate(
14611461
kwargs=dict(algorithm=self, metrics_logger=self.metrics),
14621462
)
14631463

1464+
eval_results: ResultDict = {}
14641465
env_steps = agent_steps = 0
14651466
batches = []
14661467

1468+
# If *all* configured remote eval EnvRunners are unhealthy,
1469+
# optionally wait for recovery before deciding to skip / raise.
1470+
self._maybe_wait_for_eval_env_runner_recovery()
1471+
14671472
# We will use a user provided evaluation function.
14681473
if self.config.custom_evaluation_function:
14691474
if self.config.enable_env_runner_and_connector_v2:
@@ -1474,22 +1479,39 @@ def evaluate(
14741479
) = self._evaluate_with_custom_eval_function()
14751480
else:
14761481
eval_results = self.config.custom_evaluation_function()
1477-
# There is no eval EnvRunnerGroup -> Run on local EnvRunner.
1482+
# There is no eval EnvRunnerGroup -> Run on (training) local
1483+
# EnvRunner.
14781484
elif self.eval_env_runner_group is None and self.env_runner:
14791485
(
14801486
eval_results,
14811487
env_steps,
14821488
agent_steps,
14831489
batches,
14841490
) = self._evaluate_on_local_env_runner(self.env_runner)
1485-
# There is only a local eval EnvRunner -> Run on that.
1486-
elif self.eval_env_runner_group.num_healthy_remote_workers() == 0:
1491+
# User intentionally configured 0 remote eval EnvRunners
1492+
# (`evaluation_num_env_runners=0`) -> Run on the local eval
1493+
# EnvRunner. NB: this is *not* the failure-case fallback; that
1494+
# path is handled by `evaluation_error_on_no_workers` below.
1495+
elif self.eval_env_runner_group.num_remote_env_runners() == 0:
14871496
(
14881497
eval_results,
14891498
env_steps,
14901499
agent_steps,
14911500
batches,
14921501
) = self._evaluate_on_local_env_runner(self.eval_env_runner)
1502+
# Configured remote eval EnvRunners but *none* are healthy
1503+
# and the user asked us to raise rather than skip.
1504+
elif (
1505+
self.eval_env_runner_group.num_healthy_remote_workers() == 0
1506+
and self.config.evaluation_error_on_no_workers
1507+
):
1508+
raise RuntimeError(
1509+
"All evaluation EnvRunners are unhealthy. Set "
1510+
"`evaluation_error_on_no_workers=False` (default) to "
1511+
"skip evaluation for this iteration instead of raising, "
1512+
"and/or `evaluation_unhealthy_workers_timeout_s` > 0 to "
1513+
"wait for recovery before deciding."
1514+
)
14931515
# There are healthy remote evaluation workers -> Run on these.
14941516
elif self.eval_env_runner_group.num_healthy_remote_workers() > 0:
14951517
# Running in automatic duration mode (parallel with training step).
@@ -1616,6 +1638,60 @@ def _evaluate_offline_on_local_runner(self):
16161638
key=(EVALUATION_RESULTS, OFFLINE_EVAL_RUNNER_RESULTS),
16171639
)
16181640

1641+
def _maybe_wait_for_eval_env_runner_recovery(self) -> None:
1642+
"""Poll for at least one healthy eval EnvRunner if the user asked to.
1643+
1644+
When *all* configured remote eval EnvRunners are unhealthy, wait up
1645+
to `evaluation_unhealthy_workers_timeout_s` seconds for at least one
1646+
to come back before deciding to skip evaluation or raise (per
1647+
`evaluation_error_on_no_workers`).
1648+
"""
1649+
timeout_s = self.config.evaluation_unhealthy_workers_timeout_s
1650+
if not timeout_s or timeout_s <= 0:
1651+
return
1652+
if self.eval_env_runner_group is None:
1653+
return
1654+
# Only relevant when remote workers were *configured* but are all
1655+
# unhealthy. `num_remote_env_runners() == 0` means the user asked
1656+
# for local-only eval; nothing to wait for.
1657+
if self.eval_env_runner_group.num_remote_env_runners() == 0:
1658+
return
1659+
if self.eval_env_runner_group.num_healthy_remote_workers() > 0:
1660+
return
1661+
1662+
start = time.monotonic()
1663+
deadline = start + timeout_s
1664+
# Heartbeat every 60s so long waits show up in logs without spamming.
1665+
next_log = start + 60.0
1666+
logger.warning(
1667+
"All %d remote eval EnvRunner(s) are unhealthy; waiting up to "
1668+
"%.0fs for at least one to recover before "
1669+
"deciding to skip evaluation or raise (controlled by "
1670+
"`evaluation_error_on_no_workers`).",
1671+
self.eval_env_runner_group.num_remote_env_runners(),
1672+
timeout_s,
1673+
)
1674+
while (
1675+
self.eval_env_runner_group.num_healthy_remote_workers() == 0
1676+
and time.monotonic() < deadline
1677+
):
1678+
# Actively ping unhealthy actors so the ActorManager can mark
1679+
# them healthy if Ray Core has restarted them since the last
1680+
# call. Without this poke, `num_healthy_remote_workers()`
1681+
# would stay stuck at 0 even after recovery.
1682+
self.eval_env_runner_group.probe_unhealthy_env_runners()
1683+
time.sleep(0.1)
1684+
now = time.monotonic()
1685+
if now >= next_log:
1686+
logger.warning(
1687+
"Still 0/%d eval EnvRunners healthy after %.0fs "
1688+
"(timeout %.0fs).",
1689+
self.eval_env_runner_group.num_remote_env_runners(),
1690+
now - start,
1691+
timeout_s,
1692+
)
1693+
next_log = now + 60.0
1694+
16191695
def _evaluate_on_local_env_runner(self, env_runner):
16201696
if hasattr(env_runner, "input_reader") and env_runner.input_reader is None:
16211697
raise ValueError(

rllib/algorithms/algorithm_config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,15 @@ def __init__(self, algo_class: Optional[type] = None):
520520
self.evaluation_auto_duration_min_env_steps_per_sample = 100
521521
self.evaluation_auto_duration_max_env_steps_per_sample = 2000
522522
self.evaluation_parallel_to_training = False
523+
# How long to wait for at least one remote eval EnvRunner to recover
524+
# when all *configured* remote eval EnvRunners are unhealthy at the
525+
# start of an evaluation step. Default 0: don't wait.
526+
self.evaluation_unhealthy_workers_timeout_s = 0.0
527+
# If still no eval EnvRunners are healthy after the wait above,
528+
# raise a clear `RuntimeError` (True) or skip evaluation for this
529+
# iteration (False, default). Applies regardless of
530+
# `evaluation_parallel_to_training`.
531+
self.evaluation_error_on_no_workers = False
523532
self.evaluation_force_reset_envs_before_iteration = True
524533
self.evaluation_config = None
525534
self.off_policy_estimation_methods = {}
@@ -2744,6 +2753,8 @@ def evaluation(
27442753
evaluation_auto_duration_max_env_steps_per_sample: Optional[int] = NotProvided,
27452754
evaluation_sample_timeout_s: Optional[float] = NotProvided,
27462755
evaluation_parallel_to_training: Optional[bool] = NotProvided,
2756+
evaluation_unhealthy_workers_timeout_s: Optional[float] = NotProvided,
2757+
evaluation_error_on_no_workers: Optional[bool] = NotProvided,
27472758
evaluation_force_reset_envs_before_iteration: Optional[bool] = NotProvided,
27482759
evaluation_config: Optional[
27492760
Union["AlgorithmConfig", PartialAlgorithmConfigDict]
@@ -2827,6 +2838,20 @@ def evaluation(
28272838
reports a good evaluation `episode_return_mean`, be aware that these
28282839
results were achieved on the weights trained in iteration 41, so you
28292840
should probably pick the iteration 41 checkpoint instead.
2841+
evaluation_unhealthy_workers_timeout_s: How long (in seconds) to
2842+
wait for at least one remote eval EnvRunner to recover when
2843+
all *configured* remote eval EnvRunners are unhealthy at the
2844+
start of an evaluation step. Default 0: don't wait. Combine
2845+
with `evaluation_error_on_no_workers` to choose what happens
2846+
if recovery doesn't arrive in time. Applies regardless of
2847+
`evaluation_parallel_to_training`.
2848+
evaluation_error_on_no_workers: If still no remote eval
2849+
EnvRunners are healthy after waiting
2850+
`evaluation_unhealthy_workers_timeout_s` seconds, raise a
2851+
clear `RuntimeError` (True) or skip evaluation for this
2852+
iteration (False, default). Has no effect if
2853+
`evaluation_num_env_runners=0` (in which case local eval is
2854+
the user's intentional choice).
28302855
evaluation_force_reset_envs_before_iteration: Whether all environments
28312856
should be force-reset (even if they are not done yet) right before
28322857
the evaluation step of the iteration begins. Setting this to True
@@ -3000,6 +3025,12 @@ def evaluation(
30003025
self.evaluation_sample_timeout_s = evaluation_sample_timeout_s
30013026
if evaluation_parallel_to_training is not NotProvided:
30023027
self.evaluation_parallel_to_training = evaluation_parallel_to_training
3028+
if evaluation_unhealthy_workers_timeout_s is not NotProvided:
3029+
self.evaluation_unhealthy_workers_timeout_s = (
3030+
evaluation_unhealthy_workers_timeout_s
3031+
)
3032+
if evaluation_error_on_no_workers is not NotProvided:
3033+
self.evaluation_error_on_no_workers = evaluation_error_on_no_workers
30033034
if evaluation_force_reset_envs_before_iteration is not NotProvided:
30043035
self.evaluation_force_reset_envs_before_iteration = (
30053036
evaluation_force_reset_envs_before_iteration
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""Tests for evaluating when all configured remote eval EnvRunners are
2+
unhealthy.
3+
4+
The behavior is controlled by two orthogonal config knobs:
5+
6+
- ``evaluation_unhealthy_workers_timeout_s``: how long to wait for at
7+
least one eval EnvRunner to recover before deciding what to do
8+
(default 0: don't wait).
9+
- ``evaluation_error_on_no_workers``: if still no healthy eval EnvRunners
10+
after the wait, raise ``RuntimeError`` (True) or skip evaluation for
11+
this iteration (False, default).
12+
13+
Both apply identically regardless of ``evaluation_parallel_to_training``.
14+
"""
15+
import time
16+
17+
import pytest
18+
19+
import ray
20+
from ray.rllib.algorithms.ppo import PPOConfig
21+
22+
23+
@pytest.fixture(params=[True, False], ids=["parallel", "sequential"])
24+
def parallel_to_training(request):
25+
return request.param
26+
27+
28+
def _algo_with_killed_eval_workers(
29+
*,
30+
timeout_s=0,
31+
error_on_no_workers=False,
32+
parallel_to_training=True,
33+
):
34+
"""Build a PPO algo, kill every eval worker, mark them all unhealthy.
35+
36+
Returns the live ``Algorithm`` ready for an ``algo.evaluate()`` call.
37+
38+
Config is the smallest that exercises the failure path: no remote
39+
training EnvRunners, 1 (single) remote eval EnvRunner that we kill,
40+
fixed-duration eval so we can call `evaluate()` directly without a
41+
parallel-training future.
42+
"""
43+
config = (
44+
PPOConfig()
45+
.environment("CartPole-v1")
46+
# Local-only training: skips remote train EnvRunner setup.
47+
.env_runners(num_env_runners=0)
48+
.evaluation(
49+
# 1 eval worker is enough; killing it leaves 0 healthy, which
50+
# is the condition we're testing.
51+
evaluation_num_env_runners=1,
52+
evaluation_interval=1,
53+
evaluation_parallel_to_training=parallel_to_training,
54+
# Fixed duration so `evaluate()` doesn't need a parallel-train
55+
# future (avoids the `auto` branch's `assert future is not None`).
56+
evaluation_duration=1,
57+
evaluation_duration_unit="episodes",
58+
evaluation_unhealthy_workers_timeout_s=timeout_s,
59+
evaluation_error_on_no_workers=error_on_no_workers,
60+
)
61+
.fault_tolerance(restart_failed_env_runners=False)
62+
)
63+
algo = config.build()
64+
65+
# Kill the eval worker and mark it unhealthy. Mirrors losing every
66+
# eval node at once.
67+
eval_grp = algo.eval_env_runner_group
68+
for a in list(eval_grp._worker_manager._actors.values()):
69+
ray.kill(a, no_restart=True)
70+
for actor_id in list(eval_grp._worker_manager.actor_ids()):
71+
eval_grp._worker_manager.set_actor_state(actor_id, healthy=False)
72+
assert eval_grp.num_healthy_remote_workers() == 0
73+
return algo
74+
75+
76+
def test_default_skips_eval_silently(parallel_to_training):
77+
"""Defaults (timeout=0, error=False): evaluate() must return cleanly
78+
even though every eval worker is dead -- it just skips."""
79+
algo = _algo_with_killed_eval_workers(parallel_to_training=parallel_to_training)
80+
algo.evaluate() # must not raise
81+
82+
83+
def test_error_on_no_workers_raises(parallel_to_training):
84+
"""error_on_no_workers=True with no recovery: evaluate() must raise a
85+
clear, actionable error rather than silently skipping."""
86+
algo = _algo_with_killed_eval_workers(
87+
error_on_no_workers=True, parallel_to_training=parallel_to_training
88+
)
89+
with pytest.raises(RuntimeError, match="evaluation_error_on_no_workers"):
90+
algo.evaluate()
91+
92+
93+
def test_timeout_waits_then_skips_when_no_recovery(parallel_to_training):
94+
"""timeout_s>0 with workers that never come back: evaluate() should
95+
take at least roughly that long (waiting for recovery), then skip
96+
silently because error_on_no_workers defaults to False."""
97+
timeout_s = 0.5
98+
algo = _algo_with_killed_eval_workers(
99+
timeout_s=timeout_s, parallel_to_training=parallel_to_training
100+
)
101+
start = time.monotonic()
102+
algo.evaluate() # must not raise
103+
elapsed = time.monotonic() - start
104+
assert elapsed >= timeout_s
105+
106+
107+
def test_timeout_then_error_when_no_recovery(parallel_to_training):
108+
"""timeout_s>0 + error_on_no_workers=True with no recovery: wait the
109+
timeout, then raise."""
110+
timeout_s = 0.3
111+
algo = _algo_with_killed_eval_workers(
112+
timeout_s=timeout_s,
113+
error_on_no_workers=True,
114+
parallel_to_training=parallel_to_training,
115+
)
116+
start = time.monotonic()
117+
with pytest.raises(RuntimeError, match="evaluation_error_on_no_workers"):
118+
algo.evaluate()
119+
elapsed = time.monotonic() - start
120+
assert elapsed >= timeout_s
121+
122+
123+
if __name__ == "__main__":
124+
import sys
125+
126+
sys.exit(pytest.main(["-v", __file__]))

0 commit comments

Comments
 (0)