Skip to content

Commit b97ebfd

Browse files
authored
[ci] fix: fully async ci break (verl-project#5166)
### What does this PR do? as title ### Checklist Before Starting - [ ] Search for similar PRs. Paste at least one query link here: ... - [ ] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [ ] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [ ] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).) - [ ] If your PR is related to the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.
1 parent 82cf2dd commit b97ebfd

5 files changed

Lines changed: 69 additions & 27 deletions

File tree

tests/special_e2e/run_fully_async_policy.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ MODEL_PATH=${MODEL_PATH:-${HOME}/models/${MODEL_ID}}
1515

1616

1717
rollout_mode="async"
18-
rollout_name="vllm" # sglang or vllm
18+
rollout_name="sglang" # sglang or vllm
1919
if [ "$rollout_mode" = "async" ]; then
2020
export VLLM_USE_V1=1
2121
return_raw_chat="True"

verl/experimental/fully_async_policy/agent_loop/agent_loop.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
)
3333
from verl.experimental.agent_loop.prometheus_utils import update_prometheus_config
3434
from verl.protocol import DataProto
35-
from verl.single_controller.ray import RayResourcePool, RayWorkerGroup
35+
from verl.single_controller.ray import RayWorkerGroup
3636
from verl.utils.rollout_trace import (
3737
rollout_trace_attr,
3838
rollout_trace_op,
@@ -78,10 +78,13 @@ async def generate_for_partial(
7878
@ray.remote
7979
class FullyAsyncAgentLoopWorker(AgentLoopWorker):
8080
def __init__(
81-
self, config: DictConfig, server_handles: list[ray.actor.ActorHandle], reward_router_address: str = None
81+
self,
82+
config: DictConfig,
83+
server_handles: list[ray.actor.ActorHandle],
84+
reward_loop_worker_handles: list[ray.actor.ActorHandle] = None,
8285
):
8386
self.server_manager = FullyAsyncLLMServerManager(config, server_handles)
84-
super().__init__(config, server_handles, reward_router_address)
87+
super().__init__(config, server_handles, reward_loop_worker_handles)
8588
# A shared cancellation event for all agent loops running on this worker.
8689
self.cancellation_event = asyncio.Event()
8790

@@ -211,12 +214,14 @@ async def resume_agent_loops(self):
211214

212215
class FullyAsyncAgentLoopManager(AgentLoopManager):
213216
def __init__(
214-
self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_resource_pool: RayResourcePool = None
217+
self,
218+
config: DictConfig,
219+
worker_group: RayWorkerGroup = None,
220+
reward_loop_worker_handles: list[ray.actor.ActorHandle] = None,
215221
):
216222
self.config = config
217223
self.worker_group = worker_group
218-
self.reward_model_manager = None
219-
self.reward_router_address = None
224+
self.reward_loop_worker_handles = reward_loop_worker_handles
220225
self.agent_loop_workers_class = FullyAsyncAgentLoopWorker
221226

222227
# Select rollout replica class based on rollout name
@@ -234,27 +239,23 @@ def __init__(
234239
else:
235240
raise ValueError(f"Unsupported rollout name: {rollout_name}. Supported values are 'sglang' and 'vllm'.")
236241

237-
self.rm_resource_pool = rm_resource_pool
238242
self.rollout_replicas = None
239243
self.server_handles = None
240244
self.server_addresses = None
241245
self.agent_loop_workers = None
242246

243247
@classmethod
244248
async def create(
245-
cls, config: DictConfig, worker_group: RayWorkerGroup = None, rm_resource_pool: RayResourcePool = None
249+
cls,
250+
config: DictConfig,
251+
worker_group: RayWorkerGroup = None,
252+
reward_loop_worker_handles: list[ray.actor.ActorHandle] = None,
246253
):
247-
instance = cls(config, worker_group, rm_resource_pool)
254+
instance = cls(config, worker_group, reward_loop_worker_handles)
248255
await instance._async_init()
249256
return instance
250257

251258
async def _async_init(self):
252-
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
253-
from verl.experimental.reward_loop import RewardModelManager
254-
255-
self.reward_model_manager = RewardModelManager(self.config.reward_model, self.rm_resource_pool)
256-
self.reward_router_address = self.reward_model_manager.get_router_address()
257-
258259
await self._initialize_llm_servers_async()
259260
self._init_agent_loop_workers()
260261

verl/experimental/fully_async_policy/fsdp_workers.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,23 @@ async def init_engine():
8585
return self.rollout._engine
8686

8787
inference_model = self._run_async_safely(init_engine())
88-
if inference_model is None:
88+
# For ServerAdapter, only TP rank 0 initializes the engine
89+
# TP rank != 0 can safely have inference_model as None
90+
from verl.workers.rollout.sglang_rollout.sglang_rollout import ServerAdapter
91+
92+
is_server_adapter = isinstance(self.rollout, ServerAdapter)
93+
is_non_tp_rank = False
94+
if (
95+
is_server_adapter
96+
and hasattr(self.rollout, "device_mesh")
97+
and self.rollout.device_mesh is not None
98+
):
99+
try:
100+
is_non_tp_rank = self.rollout.device_mesh["infer_tp"].get_local_rank() != 0
101+
except Exception:
102+
pass
103+
104+
if inference_model is None and not (is_server_adapter and is_non_tp_rank):
89105
raise RuntimeError(
90106
f"Failed to initialize rollout engine. "
91107
f"rollout type: {type(self.rollout)}, "

verl/experimental/fully_async_policy/megatron_worker.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,23 @@ async def init_engine():
8282
return self.rollout._engine
8383

8484
inference_model = self._run_async_safely(init_engine())
85-
if inference_model is None:
85+
# For ServerAdapter, only TP rank 0 initializes the engine
86+
# TP rank != 0 can safely have inference_model as None
87+
from verl.workers.rollout.sglang_rollout.sglang_rollout import ServerAdapter
88+
89+
is_server_adapter = isinstance(self.rollout, ServerAdapter)
90+
is_non_tp_rank = False
91+
if (
92+
is_server_adapter
93+
and hasattr(self.rollout, "device_mesh")
94+
and self.rollout.device_mesh is not None
95+
):
96+
try:
97+
is_non_tp_rank = self.rollout.device_mesh["infer_tp"].get_local_rank() != 0
98+
except Exception:
99+
pass
100+
101+
if inference_model is None and not (is_server_adapter and is_non_tp_rank):
86102
raise RuntimeError(
87103
f"Failed to initialize rollout engine. "
88104
f"rollout type: {type(self.rollout)}, "
@@ -179,7 +195,23 @@ async def init_engine():
179195
return self.rollout._engine
180196

181197
inference_model = self._run_async_safely(init_engine())
182-
if inference_model is None:
198+
# For ServerAdapter, only TP rank 0 initializes the engine
199+
# TP rank != 0 can safely have inference_model as None
200+
from verl.workers.rollout.sglang_rollout.sglang_rollout import ServerAdapter
201+
202+
is_server_adapter = isinstance(self.rollout, ServerAdapter)
203+
is_non_tp_rank = False
204+
if (
205+
is_server_adapter
206+
and hasattr(self.rollout, "device_mesh")
207+
and self.rollout.device_mesh is not None
208+
):
209+
try:
210+
is_non_tp_rank = self.rollout.device_mesh["infer_tp"].get_local_rank() != 0
211+
except Exception:
212+
pass
213+
214+
if inference_model is None and not (is_server_adapter and is_non_tp_rank):
183215
raise RuntimeError(
184216
f"Failed to initialize rollout engine. "
185217
f"rollout type: {type(self.rollout)}, "

verl/experimental/one_step_off_policy/ray_trainer.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,7 @@ def _init_async_rollout_manager(self):
291291

292292
self.async_rollout_mode = True
293293

294-
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
295-
rm_resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
296-
else:
297-
rm_resource_pool = None
298-
299-
self.async_rollout_manager = OneStepOffAgentLoopManager(
300-
config=self.config, worker_group=self.rollout_wg, rm_resource_pool=rm_resource_pool
301-
)
294+
self.async_rollout_manager = OneStepOffAgentLoopManager(config=self.config, worker_group=self.rollout_wg)
302295

303296
def sync_rollout_weights(self):
304297
self.actor_wg.sync_rollout_weights()

0 commit comments

Comments
 (0)