@@ -1461,9 +1461,14 @@ def evaluate(
14611461 kwargs = dict (algorithm = self , metrics_logger = self .metrics ),
14621462 )
14631463
1464+ eval_results : ResultDict = {}
14641465 env_steps = agent_steps = 0
14651466 batches = []
14661467
1468+ # If *all* configured remote eval EnvRunners are unhealthy,
1469+ # optionally wait for recovery before deciding to skip / raise.
1470+ self ._maybe_wait_for_eval_env_runner_recovery ()
1471+
14671472 # We will use a user provided evaluation function.
14681473 if self .config .custom_evaluation_function :
14691474 if self .config .enable_env_runner_and_connector_v2 :
@@ -1474,24 +1479,28 @@ def evaluate(
14741479 ) = self ._evaluate_with_custom_eval_function ()
14751480 else :
14761481 eval_results = self .config .custom_evaluation_function ()
1477- # There is no eval EnvRunnerGroup -> Run on local EnvRunner.
1482+ # No eval EnvRunnerGroup -> Run on (training) local EnvRunner.
14781483 elif self .eval_env_runner_group is None and self .env_runner :
14791484 (
14801485 eval_results ,
14811486 env_steps ,
14821487 agent_steps ,
14831488 batches ,
14841489 ) = self ._evaluate_on_local_env_runner (self .env_runner )
1485- # There is only a local eval EnvRunner -> Run on that .
1486- elif self .eval_env_runner_group .num_healthy_remote_workers () == 0 :
1490+ # 0 remote eval EnvRunners configured -> Run on the local eval EnvRunner .
1491+ elif self .eval_env_runner_group .num_remote_env_runners () == 0 :
14871492 (
14881493 eval_results ,
14891494 env_steps ,
14901495 agent_steps ,
14911496 batches ,
14921497 ) = self ._evaluate_on_local_env_runner (self .eval_env_runner )
1493- # There are healthy remote evaluation workers -> Run on these.
1498+ # Healthy remote evaluation workers -> Run on these.
14941499 elif self .eval_env_runner_group .num_healthy_remote_workers () > 0 :
1500+ # A successful eval iteration resets the consecutive-skip
1501+ # counter; this is what tells the algorithm "the failure
1502+ # was transient".
1503+ self ._counters ["num_consecutive_eval_no_workers_iterations" ] = 0
14951504 # Running in automatic duration mode (parallel with training step).
14961505 if self .config .evaluation_duration == "auto" :
14971506 assert parallel_train_future is not None
@@ -1509,9 +1518,25 @@ def evaluate(
15091518 agent_steps ,
15101519 batches ,
15111520 ) = self ._evaluate_with_fixed_duration ()
1512- # Can't find a good way to run this evaluation -> Wait for next iteration.
1521+ # No healthy remote eval EnvRunners. Increment the consecutive-
1522+ # skip counter; raise if it exceeds the configured threshold,
1523+ # otherwise skip evaluation for this iteration.
15131524 else :
1514- eval_results = {}
1525+ counter_key = "num_consecutive_eval_no_workers_iterations"
1526+ self ._counters [counter_key ] += 1
1527+ threshold = self .config .evaluation_error_after_n_consecutive_skips
1528+ if threshold is not None and self ._counters [counter_key ] >= threshold :
1529+ n_skips = self ._counters [counter_key ]
1530+ raise RuntimeError (
1531+ "All evaluation EnvRunners have been unhealthy for "
1532+ f"{ n_skips } consecutive evaluation iterations "
1533+ f"(threshold: { threshold } ). Set "
1534+ "`evaluation_error_after_n_consecutive_skips` to "
1535+ "None to skip indefinitely instead, or to a higher "
1536+ "number for more tolerance, and/or "
1537+ "`evaluation_unhealthy_workers_timeout_s` > 0 to "
1538+ "wait for recovery within each iteration."
1539+ )
15151540
15161541 if self .config .enable_env_runner_and_connector_v2 :
15171542 if log_once ("no_eval_results" ) and not self .metrics .peek (
@@ -1616,6 +1641,111 @@ def _evaluate_offline_on_local_runner(self):
16161641 key = (EVALUATION_RESULTS , OFFLINE_EVAL_RUNNER_RESULTS ),
16171642 )
16181643
def _maybe_wait_for_eval_env_runner_recovery(self) -> None:
    """Optionally block until at least one remote eval EnvRunner is healthy.

    This is a no-op unless ALL of the following hold:

    - `config.evaluation_unhealthy_workers_timeout_s` is set and > 0,
    - an eval EnvRunnerGroup exists,
    - that group has at least one remote EnvRunner configured
      (`num_remote_env_runners() > 0`), and
    - none of those remote EnvRunners is currently healthy.

    In that case, unhealthy EnvRunners are probed repeatedly for up to
    `evaluation_unhealthy_workers_timeout_s` seconds (measured with
    `time.monotonic()`). The loop exits as soon as one EnvRunner reports
    healthy or the deadline passes. A heartbeat warning is logged at most
    once every 60 seconds while still waiting.

    If at least one EnvRunner is healthy after the wait, current weights
    and connector states (new API stack, only if
    `evaluation_config.broadcast_env_runner_states` is set) or
    observation filters (old API stack) are pushed to the eval
    EnvRunnerGroup. The intent is that freshly recovered EnvRunners, which
    any sync performed before this wait could not have reached, do not
    evaluate with outdated state.

    This method never raises on timeout by itself; it simply returns and
    leaves the skip-or-raise decision to the caller.
    """
    timeout_s = self.config.evaluation_unhealthy_workers_timeout_s
    # Feature disabled (None / 0 / negative timeout).
    if not timeout_s or timeout_s <= 0:
        return

    eval_group = self.eval_env_runner_group
    if eval_group is None:
        return
    # `num_remote_env_runners() == 0` means no remote eval EnvRunners were
    # configured at all (local-only evaluation): nothing to wait for.
    if eval_group.num_remote_env_runners() == 0:
        return
    # At least one remote eval EnvRunner is already usable.
    if eval_group.num_healthy_remote_workers() > 0:
        return

    start = time.monotonic()
    deadline = start + timeout_s
    # Heartbeat every 60s so long waits show up in logs without spamming.
    next_log = start + 60.0
    logger.warning(
        "All %d remote eval EnvRunner(s) are unhealthy; waiting up to "
        "%.0fs for at least one to recover before "
        "deciding to skip evaluation or raise (controlled by "
        "`evaluation_error_after_n_consecutive_skips`).",
        eval_group.num_remote_env_runners(),
        timeout_s,
    )

    while eval_group.num_healthy_remote_workers() == 0:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Actively ping unhealthy actors so they can be marked healthy
        # again if they have been restarted since the last call. Cap the
        # per-probe timeout to the remaining wait time so a hanging actor
        # can't push us past `evaluation_unhealthy_workers_timeout_s`.
        eval_group.probe_unhealthy_env_runners(
            timeout_seconds=min(remaining, 1.0),
        )
        # Re-check right after the probe: if it just brought an EnvRunner
        # back, leave immediately instead of sleeping and possibly
        # logging a false "Still 0/N healthy" heartbeat.
        if eval_group.num_healthy_remote_workers() > 0:
            break
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Short pause between probes, capped so the pause itself cannot
        # overshoot the deadline.
        time.sleep(min(0.1, remaining))
        now = time.monotonic()
        if now >= next_log:
            logger.warning(
                "Still 0/%d eval EnvRunners healthy after %.0fs "
                "(timeout %.0fs).",
                eval_group.num_remote_env_runners(),
                now - start,
                timeout_s,
            )
            next_log = now + 60.0

    # Timed out without any recovery: nothing to sync.
    if eval_group.num_healthy_remote_workers() == 0:
        return

    # At least one EnvRunner recovered during the wait. Push current
    # weights *and* connector/filter state to the eval group so recovered
    # EnvRunners do not evaluate with default/empty or stale state, which
    # would silently produce wrong eval metrics for this iteration.
    new_api_stack = self.config.enable_env_runner_and_connector_v2
    weights_src = self.learner_group if new_api_stack else self.env_runner
    eval_group.sync_weights(
        from_worker_or_learner_group=weights_src,
        inference_only=True,
    )
    if not new_api_stack:
        # Old API stack: synchronize observation filters.
        self._sync_filters_if_needed(
            central_worker=self.env_runner_group.local_env_runner,
            workers=eval_group,
            config=self.evaluation_config,
        )
    elif self.evaluation_config.broadcast_env_runner_states:
        # New API stack: broadcast connector states, but only if the
        # evaluation config asks for it.
        eval_group.sync_env_runner_states(
            config=self.evaluation_config,
            from_worker=self.env_runner,
            env_steps_sampled=self.metrics.peek(
                (
                    ENV_RUNNER_RESULTS,
                    NUM_ENV_STEPS_SAMPLED_LIFETIME,
                ),
                default=0,
            ),
            env_to_module=self.env_to_module_connector,
            module_to_env=self.module_to_env_connector,
        )
1748+
16191749 def _evaluate_on_local_env_runner (self , env_runner ):
16201750 if hasattr (env_runner , "input_reader" ) and env_runner .input_reader is None :
16211751 raise ValueError (
0 commit comments