Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions rllib/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,28 @@ py_test(
deps = [":conftest"],
)

# Test target that runs algorithms/tests/test_get_learner_bundles.py.
# Declared as a "small" test, tagged for the RLlib team's "algorithms" suite,
# and depends on the shared `:conftest` target.
py_test(
name = "algorithms/tests/test_get_learner_bundles",
size = "small",
srcs = ["algorithms/tests/test_get_learner_bundles.py"],
tags = [
"algorithms",
"team:rllib",
],
deps = [":conftest"],
)

# Test target that runs algorithms/tests/test_custom_resources_per_learner.py.
# Declared as a "small" test, tagged for the RLlib team's "algorithms" suite,
# and depends on the shared `:conftest` target.
py_test(
name = "algorithms/tests/test_custom_resources_per_learner",
size = "small",
srcs = ["algorithms/tests/test_custom_resources_per_learner.py"],
tags = [
"algorithms",
"team:rllib",
],
deps = [":conftest"],
)

py_test(
name = "algorithms/tests/test_dependency_tf",
size = "small",
Expand Down
189 changes: 133 additions & 56 deletions rllib/algorithms/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
_get_learner_bundles,
_get_main_process_bundle,
_get_offline_eval_runner_bundles,
compute_aggregator_bundle_indices,
get_learner_bundle_offset,
)
from ray.rllib.callbacks.utils import make_callback
from ray.rllib.connectors.agent.obs_preproc import ObsPreprocessorConnector
Expand Down Expand Up @@ -193,6 +195,7 @@
from ray.tune.trainable import Trainable
from ray.util import log_once
from ray.util.metrics import Counter, Histogram
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.timer import _Timer

if TYPE_CHECKING:
Expand Down Expand Up @@ -259,6 +262,7 @@ class Algorithm(Checkpointable, Trainable):
"model",
"optimizer",
"custom_resources_per_env_runner",
"custom_resources_per_learner",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing custom_resources_per_aggregator_actor in _allow_unknown_subkeys

Medium Severity

custom_resources_per_learner was correctly added to _allow_unknown_subkeys, but custom_resources_per_aggregator_actor was not. This list governs whether arbitrary user-defined sub-keys (like {"my_resource": 0.5}) are accepted during dict-based config merges (used by deep_update in update_from_dict and Tune's param_space). Without it, users setting custom aggregator resources via dict config (common in Tune trials) will have those sub-keys rejected or silently dropped during config merging.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit d757cac. Configure here.

"custom_resources_per_worker",
"evaluation_config",
"exploration_config",
Expand Down Expand Up @@ -1069,70 +1073,118 @@ def setup(self, config: AlgorithmConfig) -> None:
spaces=self.spaces,
inference_only=False,
)
agg_cls = ray.remote(
num_cpus=1,
max_restarts=-1,
)(AggregatorActor)
self._aggregator_actor_manager = FaultTolerantActorManager(
[
agg_cls.remote(self.config, rl_module_spec)
for _ in range(
(self.config.num_learners or 1)
* self.config.num_aggregator_actors_per_learner
)
],
max_remote_requests_in_flight_per_actor=(
self.config.max_requests_in_flight_per_aggregator_actor
# Bundle index where learner bundles start in the trial PG.
# `get_learner_bundle_offset` is the shared source of truth
# with `default_resource_request` so the two can't drift.
learner_bundle_offset = get_learner_bundle_offset(
self.config,
self.evaluation_config,
has_eval=self._should_create_evaluation_env_runners(
self.evaluation_config
),
has_offline_eval=self._should_create_offline_evaluation_runners(
self.evaluation_config
),
)
# Get the devices of each learner.
learner_locations = list(
enumerate(
self.learner_group.foreach_learner(
func=lambda _learner: (_learner.node, _learner.device),
)
)
trial_pg = ray.util.get_current_placement_group()

# Wrap the actor class once; per-instance options go through
# `.options(...)` below.
agg_cls = ray.remote(AggregatorActor)
base_options = dict(
num_cpus=self.config.num_cpus_per_aggregator_actor,
max_restarts=-1,
)
# Get the devices of each AggregatorActor.
aggregator_locations = list(
enumerate(
self._aggregator_actor_manager.foreach_actor(
func=lambda actor: (actor._node, actor._device)
if self.config.custom_resources_per_aggregator_actor:
base_options[
"resources"
] = self.config.custom_resources_per_aggregator_actor

colocate = self.config.colocate_aggregator_actors_with_learners

# Co-location requires a placement group with bundles sized
# for the learner + aggregator. Tune trials always provide
# one; running Algorithm directly outside Tune does not. We
# only use PG-based scheduling (it survives actor restarts,
# which NodeAffinity does not). When the PG is missing,
# fall back to best-effort placement and warn the user --
# they can silence the warning by setting
# `colocate_aggregator_actors_with_learners=False`.
if colocate and trial_pg is None:
if log_once("aggregator_colocation_no_pg_fallback"):
logger.warning(
"`colocate_aggregator_actors_with_learners=True` "
"(the default) but no Tune placement group is "
"present (Algorithm is being built outside a Tune "
"trial). Falling back to best-effort aggregator "
"placement -- aggregators may not land on their "
"Learner's node, incurring cross-node MABatch "
"transfers. Launch this Algorithm via "
"`tune.Tuner(...)` to get strict co-location, or "
"set "
"`colocate_aggregator_actors_with_learners=False` "
"to silence this warning."
)
colocate = False

# `num_learners == 0` still produces one (local) Learner.
n_learners = self.config.num_learners or 1
n_agg_per_learner = self.config.num_aggregator_actors_per_learner

# Resolve `learner_idx -> bundle index list` *after* the
# LearnerGroup has placed its workers. The bridge between
# Ray Train's sorted Learner order and our aggregator
# pinning is in `compute_aggregator_bundle_indices`.
learner_idx_to_bundles = {}
if colocate:
learner_node_ids = [
rc.get()
for rc in self.learner_group.foreach_learner(
func=lambda _l: ray.get_runtime_context().get_node_id()
)
]
bundles_to_node = ray.util.placement_group_table(trial_pg).get(
"bundles_to_node_id", {}
)
)
learner_idx_to_bundles = compute_aggregator_bundle_indices(
num_learners=self.config.num_learners,
num_aggregator_actors_per_learner=n_agg_per_learner,
learner_bundle_offset=learner_bundle_offset,
learner_node_ids=learner_node_ids,
bundles_to_node=bundles_to_node,
)

agg_handles = []
self._aggregator_actor_to_learner = {}
for agg_idx, aggregator_location in aggregator_locations:
aggregator_location = aggregator_location.get()
for learner_idx, learner_location in learner_locations:
# TODO (sven): Activate full comparison (including device) when Ray
# has figured out GPU pre-loading.
if learner_location.get()[0] == aggregator_location[0]:
# Round-robin, in case all Learners are on same device/node.
learner_locations = learner_locations[1:] + [
learner_locations[0]
]
self._aggregator_actor_to_learner[agg_idx] = learner_idx
break
if agg_idx not in self._aggregator_actor_to_learner:
raise RuntimeError(
"No Learner worker found that matches aggregation worker "
f"#{agg_idx}'s node ({aggregator_location[0]}) and device "
f"({aggregator_location[1]})! The Learner workers' locations "
f"are {learner_locations}."
for learner_idx in range(n_learners):
bundle_indices = learner_idx_to_bundles.get(learner_idx, [])
for j in range(n_agg_per_learner):
per_instance_options = dict(base_options)
if colocate and j < len(bundle_indices):
# Pin to a bundle on the Learner's actual node.
per_instance_options[
"scheduling_strategy"
] = PlacementGroupSchedulingStrategy(
placement_group=trial_pg,
placement_group_bundle_index=bundle_indices[j],
)
# else: `colocate=False` (or no matching bundle on the
# Learner's node, which can only happen if PG layout
# diverges from what we computed). No scheduling
# hint -- Ray's default behaviour applies.

handle = agg_cls.options(**per_instance_options).remote(
self.config, rl_module_spec
)
self._aggregator_actor_to_learner[len(agg_handles)] = learner_idx
agg_handles.append(handle)
Comment thread
cursor[bot] marked this conversation as resolved.

# Make sure, each Learner index is mapped to from at least one
# AggregatorActor.
if not all(
learner_idx in self._aggregator_actor_to_learner.values()
for learner_idx in range(self.config.num_learners or 1)
):
raise RuntimeError(
"Some Learner indices are not mapped to from any AggregatorActors! "
"Final AggregatorActor idx -> Learner idx mapping is: "
f"{self._aggregator_actor_to_learner}"
)
self._aggregator_actor_manager = FaultTolerantActorManager(
agg_handles,
max_remote_requests_in_flight_per_actor=(
self.config.max_requests_in_flight_per_aggregator_actor
),
)

# Ray metrics
self._set_up_metrics()
Expand Down Expand Up @@ -3385,6 +3437,31 @@ def default_resource_request(
+ learner_bundles
)

# Defensive: `Algorithm.setup` later pins aggregator actors to
# specific bundle indices in this PG via the offset computed by
# `get_learner_bundle_offset`. If the bundle layout above ever
# drifts from that helper's assumed layout, aggregator pinning
# would silently land on the wrong bundle. Catch it here.
expected_learner_bundle_start = get_learner_bundle_offset(
config,
eval_config,
has_eval=cls._should_create_evaluation_env_runners(eval_config),
has_offline_eval=cls._should_create_offline_evaluation_runners(eval_config),
)
actual_learner_bundle_start = (
1
+ len(env_runner_bundles)
+ len(eval_env_runner_bundles)
+ len(offline_eval_runner_bundles)
)
assert expected_learner_bundle_start == actual_learner_bundle_start, (
f"Bundle layout drift: `get_learner_bundle_offset` returned "
f"{expected_learner_bundle_start} but `default_resource_request` "
f"places learner bundles at index {actual_learner_bundle_start}. "
"Update `get_learner_bundle_offset` to match the bundle order "
"produced here."
)

return PlacementGroupFactory(
bundles=bundles,
strategy=config.placement_strategy,
Expand Down
51 changes: 51 additions & 0 deletions rllib/algorithms/algorithm_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,12 @@ def __init__(self, algo_class: Optional[type] = None):
self.num_learners = 0
self.num_gpus_per_learner = 0
self.num_cpus_per_learner = "auto"
self.custom_resources_per_learner = {}
self.num_aggregator_actors_per_learner = 0
self.max_requests_in_flight_per_aggregator_actor = 3
self.num_cpus_per_aggregator_actor = 1
self.custom_resources_per_aggregator_actor = {}
self.colocate_aggregator_actors_with_learners = True
self.local_gpu_idx = 0
# TODO (sven): This probably works even without any restriction
# (allowing for any arbitrary number of requests in-flight). Test with
Expand Down Expand Up @@ -2261,8 +2265,12 @@ def learners(
num_learners: Optional[int] = NotProvided,
num_cpus_per_learner: Optional[Union[str, float, int]] = NotProvided,
num_gpus_per_learner: Optional[Union[float, int]] = NotProvided,
custom_resources_per_learner: Optional[Dict[str, float]] = NotProvided,
num_aggregator_actors_per_learner: Optional[int] = NotProvided,
max_requests_in_flight_per_aggregator_actor: Optional[float] = NotProvided,
num_cpus_per_aggregator_actor: Optional[Union[float, int]] = NotProvided,
custom_resources_per_aggregator_actor: Optional[Dict[str, float]] = NotProvided,
colocate_aggregator_actors_with_learners: Optional[bool] = NotProvided,
local_gpu_idx: Optional[int] = NotProvided,
max_requests_in_flight_per_learner: Optional[int] = NotProvided,
learner_class: Optional[Type["Learner"]] = NotProvided,
Expand Down Expand Up @@ -2296,6 +2304,11 @@ def learners(
`num_learners=0`, any value greater than 0 runs the
training on a single GPU on the main process, while a value of 0 runs
the training on main process CPUs.
custom_resources_per_learner: Any custom Ray resources to allocate
per Learner worker. Useful for pinning Learners to specific
nodes via custom resource labels. Note: do NOT put ``"CPU"``
or ``"GPU"`` in here -- use ``num_cpus_per_learner`` and
``num_gpus_per_learner`` instead.
num_aggregator_actors_per_learner: The number of aggregator actors per
Learner (if num_learners=0, one local learner is created). Must be at
least 1. Aggregator actors perform the task of a) converting episodes
Expand All @@ -2304,6 +2317,23 @@ def learners(
this strongly depends on your setup and `EnvRunner` throughput.
max_requests_in_flight_per_aggregator_actor: How many in-flight requests
are allowed per aggregator actor before new requests are dropped?
num_cpus_per_aggregator_actor: Number of CPUs allocated per aggregator
actor. Default is 1.
custom_resources_per_aggregator_actor: Optional dict of custom Ray
resource requirements for each aggregator actor (e.g.
``{"my_label": 0.001}``). Do *not* put ``"CPU"`` or ``"GPU"`` in
here -- use ``num_cpus_per_aggregator_actor`` instead and note
that aggregator actors do not currently support GPUs. These are
hard requirements: if no node in the cluster can satisfy them,
the aggregator never schedules and the algorithm fails.
colocate_aggregator_actors_with_learners: If True (default), RLlib
pins each aggregator actor to the same node as its assigned
learner via a bundle of the trial's placement group. This requires
running inside a Tune trial; outside Tune no placement group exists,
so RLlib logs a warning and falls back to best-effort placement
(aggregators may not land on their learner's node).
Co-location avoids per-MABatch cross-node transfers between
the aggregator and its learner. If False, RLlib lets Ray's
default scheduler place each aggregator on any node with
free capacity (matches the pre-PG-bundle behavior).
local_gpu_idx: If `num_gpus_per_learner` > 0, and
`num_learners` < 2, then RLlib uses this GPU index for training. This is
an index into the available
Expand Down Expand Up @@ -2351,12 +2381,33 @@ def learners(
self.num_cpus_per_learner = num_cpus_per_learner
if num_gpus_per_learner is not NotProvided:
self.num_gpus_per_learner = num_gpus_per_learner
if custom_resources_per_learner is not NotProvided:
if custom_resources_per_learner and (
"CPU" in custom_resources_per_learner
or "GPU" in custom_resources_per_learner
):
raise ValueError(
"Do not include 'CPU' or 'GPU' in "
"`custom_resources_per_learner`. Use `num_cpus_per_learner` "
"and `num_gpus_per_learner` instead."
)
self.custom_resources_per_learner = custom_resources_per_learner
if num_aggregator_actors_per_learner is not NotProvided:
self.num_aggregator_actors_per_learner = num_aggregator_actors_per_learner
if max_requests_in_flight_per_aggregator_actor is not NotProvided:
self.max_requests_in_flight_per_aggregator_actor = (
max_requests_in_flight_per_aggregator_actor
)
if num_cpus_per_aggregator_actor is not NotProvided:
self.num_cpus_per_aggregator_actor = num_cpus_per_aggregator_actor
if custom_resources_per_aggregator_actor is not NotProvided:
self.custom_resources_per_aggregator_actor = (
custom_resources_per_aggregator_actor
)
if colocate_aggregator_actors_with_learners is not NotProvided:
self.colocate_aggregator_actors_with_learners = (
colocate_aggregator_actors_with_learners
)
if local_gpu_idx is not NotProvided:
self.local_gpu_idx = local_gpu_idx
if max_requests_in_flight_per_learner is not NotProvided:
Expand Down
Loading
Loading