Skip to content

Commit 870d2fe

Browse files
committed
Add TransferQueue CI & Fix
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix setup.py requirements for TQ Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> add TransferQueue ut Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix agent loop Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> expose performance data to meta_info Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> add TODO Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> support passing temp data through BatchMeta Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix ci script Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix CI Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> comment some agentloop metrics in TQ recipe Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix workflow Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> update tq install fix Signed-off-by: 0oshowero0 <o0shower0o@outlook.com> fix validate tq init logic Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
1 parent a21643f commit 870d2fe

12 files changed

Lines changed: 484 additions & 83 deletions

File tree

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
# # Tests layout
2+
3+
# Each folder under tests/ corresponds to a test category for a sub-namespace in verl. For instance:
4+
# - `tests/trainer` for testing functionality related to `verl/trainer`
5+
# - `tests/models` for testing functionality related to `verl/models`
6+
# - ...
7+
8+
# There are a few folders with `special_` prefix, created for special purposes:
9+
# - `special_distributed`: unit tests that must run with multiple GPUs
10+
# - `special_e2e`: end-to-end tests with training/generation scripts
11+
# - `special_npu`: tests for NPUs
12+
# - `special_sanity`: a suite of quick sanity tests
13+
# - `special_standalone`: a set of test that are designed to run in dedicated environments
14+
15+
# Accelerators for tests
16+
# - By default tests are run with GPU available, except for the ones under `special_npu`, and any test script whose name ends with `on_cpu.py`.
17+
# - For test scripts with `on_cpu.py` name suffix would be tested on CPU resources in linux environment.
18+
19+
# # Workflow layout
20+
21+
# All CI tests are configured by yaml files in `.github/workflows/`. Here's an overview of all test configs:
22+
# 1. A list of always triggered CPU sanity tests: `check-pr-title.yml`, `secrets_scan.yml`, `check-pr-title,yml`, `pre-commit.yml`, `doc.yml`
23+
# 2. Some heavy multi-GPU unit tests, such as `model.yml`, `vllm.yml`, `sgl.yml`
24+
# 3. End-to-end tests: `e2e_*.yml`
25+
# 4. Unit tests
26+
# - `cpu_unit_tests.yml`, run pytest on all scripts with file name pattern `tests/**/test_*_on_cpu.py`
27+
# - `gpu_unit_tests.yml`, run pytest on all scripts with file without the `on_cpu.py` suffix.
28+
# - Since cpu/gpu unit tests by default runs all tests under `tests`, please make sure tests are manually excluded in them when
29+
# - new workflow yaml is added to `.github/workflows`
30+
# - new tests are added to workflow mentioned in 2.
31+
32+
33+
name: e2e_transferqueue
34+
35+
on:
36+
# Trigger the workflow on push or pull request,
37+
# but only for the main branch
38+
# For push, for now only anti-patterns are specified so it is more conservative
39+
# and achieves higher coverage.
40+
push:
41+
branches:
42+
- main
43+
- v0.*
44+
paths:
45+
- "**/*.py"
46+
- "!**/*.md"
47+
- "!**/*.sh"
48+
# Other entrypoints
49+
- "!examples/*trainer*"
50+
- "!tests/**"
51+
- "!verl/trainer/main_*.py"
52+
- "!verl/trainer/fsdp_sft_trainer.py"
53+
- "!recipe/**"
54+
- "recipe/transfer_queue/**"
55+
pull_request:
56+
branches:
57+
- main
58+
- v0.*
59+
paths:
60+
- "**/*.py"
61+
- "!**/*.md"
62+
- "!**/*.sh"
63+
# Other entrypoints
64+
- "!examples/**"
65+
- "!tests/**"
66+
- "!verl/trainer/main_*.py"
67+
- "!verl/trainer/fsdp_sft_trainer.py"
68+
# Other recipes
69+
- "!recipe/**"
70+
# Home
71+
- "recipe/transfer_queue"
72+
# Entrypoints
73+
- ".github/workflows/e2e_transferqueue.yml"
74+
- "examples/data_preprocess/gsm8k.py"
75+
- "tests/special_e2e/run_transferqueue.sh"
76+
77+
# Cancel jobs on the same ref if a new one is triggered
78+
concurrency:
79+
group: ${{ github.workflow }}-${{ github.ref }}
80+
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
81+
82+
# Declare permissions just read content.
83+
permissions:
84+
contents: read
85+
86+
env:
87+
IMAGE: "verl-ci-cn-beijing.cr.volces.com/verlai/verl:vllm011.dev7"
88+
DYNAMIC_RUNNER_ENDPOINT: "https://sd10g3clalm04ug7alq90.apigateway-cn-beijing.volceapi.com/runner"
89+
TRANSFORMERS_VERSION: "4.56.2"
90+
91+
jobs:
92+
setup:
93+
if: github.repository_owner == 'volcengine'
94+
runs-on: ubuntu-latest
95+
outputs:
96+
runner-label: ${{ steps.create-runner.outputs.runner-label }}
97+
mlp-task-id: ${{ steps.create-runner.outputs.mlp-task-id }}
98+
steps:
99+
- uses: actions/checkout@v4
100+
- id: create-runner
101+
uses: volcengine/vemlp-github-runner@v1
102+
with:
103+
mode: "create"
104+
faas-url: "${{ env.DYNAMIC_RUNNER_ENDPOINT }}"
105+
mlp-image: "${{ env.IMAGE }}"
106+
107+
# Test FSDP strategy
108+
e2e_transferqueue_fsdp:
109+
needs: setup
110+
runs-on: [ "${{ needs.setup.outputs.runner-label || 'L20x8' }}" ]
111+
timeout-minutes: 10 # Increase timeout for async training
112+
env:
113+
HTTP_PROXY: ${{ secrets.PROXY_HTTP }}
114+
HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }}
115+
NO_PROXY: "localhost,127.0.0.1,hf-mirror.com"
116+
HF_ENDPOINT: "https://hf-mirror.com"
117+
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
118+
ACTOR_STRATEGY: "fsdp"
119+
steps:
120+
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
121+
with:
122+
fetch-depth: 0
123+
- name: Install the current repository
124+
run: |
125+
pip3 install --no-deps -e .[test,gpu]
126+
pip3 install transformers==$TRANSFORMERS_VERSION
127+
pip3 install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple TransferQueue==0.1.2.dev0
128+
- name: Prepare GSM8K dataset
129+
run: |
130+
python3 examples/data_preprocess/gsm8k.py --local_dataset_path ${HOME}/models/hf_data/gsm8k
131+
- name: Running the E2E test with TransferQueue (FSDP)
132+
run: |
133+
ray stop --force
134+
bash tests/special_e2e/run_transferqueue.sh
135+
136+
# Test Megatron strategy
137+
e2e_transferqueue_megatron:
138+
needs: setup
139+
runs-on: [ "${{ needs.setup.outputs.runner-label || 'L20x8' }}" ]
140+
timeout-minutes: 10 # Increase timeout for async training
141+
env:
142+
HTTP_PROXY: ${{ secrets.PROXY_HTTP }}
143+
HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }}
144+
NO_PROXY: "localhost,127.0.0.1,hf-mirror.com"
145+
HF_ENDPOINT: "https://hf-mirror.com"
146+
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
147+
ACTOR_STRATEGY: "megatron"
148+
steps:
149+
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
150+
with:
151+
fetch-depth: 0
152+
- name: Install the current repository
153+
run: |
154+
pip3 install --no-deps -e .[test,gpu]
155+
pip3 install transformers==$TRANSFORMERS_VERSION
156+
pip3 install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple TransferQueue==0.1.2.dev0
157+
- name: Prepare GSM8K dataset
158+
run: |
159+
python3 examples/data_preprocess/gsm8k.py --local_dataset_path ${HOME}/models/hf_data/gsm8k
160+
- name: Running the E2E test with TransferQueue (Megatron)
161+
run: |
162+
ray stop --force
163+
bash tests/special_e2e/run_transferqueue.sh
164+
165+
cleanup:
166+
runs-on: ubuntu-latest
167+
needs:
168+
[
169+
setup,
170+
e2e_transferqueue_fsdp,
171+
e2e_transferqueue_megatron
172+
]
173+
if: always()
174+
steps:
175+
- id: destroy-runner
176+
uses: volcengine/vemlp-github-runner@v1
177+
with:
178+
mode: "destroy"
179+
faas-url: "${{ env.DYNAMIC_RUNNER_ENDPOINT }}"
180+
mlp-task-id: "${{ needs.setup.outputs.mlp-task-id }}"

docs/data/transfer_queue.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ This class encapsulates the core interaction logic within the TransferQueue syst
6161
Currently, we support the following storage backends:
6262

6363
- SimpleStorageUnit: A basic CPU memory storage with minimal data format constraints and easy usability.
64-
- [MoonCakeStore](https://github.com/kvcache-ai/Mooncake): A high-performance, KV-based hierarchical storage that supports RDMA transport between GPU and DRAM.
6564
- [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem): An Ascend native data system that provides hierarchical storage interfaces including HBM/DRAM/SSD.
66-
- [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html): Ray's new feature that allows Ray to store and pass objects directly between Ray actors.
65+
- [MoonCakeStore](https://github.com/kvcache-ai/Mooncake) (WIP): A high-performance, KV-based hierarchical storage that supports RDMA transport between GPU and DRAM.
66+
- [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html) ([WIP](https://github.com/TransferQueue/TransferQueue/pull/108)): Ray's new feature that allows Ray to store and pass objects directly between Ray actors.
6767

6868
Among them, `SimpleStorageUnit` serves as our default storage backend, coordinated by the `AsyncSimpleStorageManager` class. Each storage unit can be deployed on a separate node, allowing for distributed data management.
6969

recipe/transfer_queue/agent_loop.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from transfer_queue import BatchMeta
1717

1818
import verl.experimental.agent_loop.agent_loop as agent_loop
19-
from verl import DataProto
2019

2120

2221
class AgentLoopManager(agent_loop.AgentLoopManager):
@@ -30,12 +29,11 @@ def generate_sequences(self, prompts: BatchMeta) -> BatchMeta:
3029
BatchMeta: Output batch metadata.
3130
"""
3231

33-
if self.rm_micro_batch_size and len(prompts) % self.rm_micro_batch_size != 0:
34-
raise ValueError(
35-
f"The length of prompts {len(prompts)} cannot divide the world size of rm_wg {self.rm_micro_batch_size}"
36-
)
3732
if self.config.actor_rollout_ref.rollout.free_cache_engine:
3833
self.wake_up()
34+
if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine:
35+
self.reward_model_manager.wake_up()
36+
3937
chunkes = prompts.chunk(len(self.agent_loop_workers))
4038
outputs = ray.get(
4139
[
@@ -46,6 +44,8 @@ def generate_sequences(self, prompts: BatchMeta) -> BatchMeta:
4644
output = BatchMeta.concat(outputs)
4745
if self.config.actor_rollout_ref.rollout.free_cache_engine:
4846
self.sleep()
47+
if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine:
48+
self.reward_model_manager.sleep()
4949

5050
# calculate performance metrics
5151
metrics = [output.extra_info.pop("metrics") for output in outputs] # List[List[Dict[str, str]]]
@@ -54,7 +54,7 @@ def generate_sequences(self, prompts: BatchMeta) -> BatchMeta:
5454
output.set_extra_info("timing", timing)
5555
return output
5656

57-
def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: DataProto) -> dict[str, float]:
57+
def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: BatchMeta) -> dict[str, float]:
5858
timing = {}
5959
t_generate_sequences = np.array([metric["generate_sequences"] for chunk in metrics for metric in chunk])
6060
t_tool_calls = np.array([metric["tool_calls"] for chunk in metrics for metric in chunk])
@@ -65,6 +65,16 @@ def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: Data
6565
timing["agent_loop/tool_calls/max"] = t_tool_calls.max()
6666
timing["agent_loop/tool_calls/mean"] = t_tool_calls.mean()
6767

68+
# TODO (TQ): pass tq info throughout AgentLoop so we can retrieve tensor for these metrics
69+
# batch sequence generation is bounded by the slowest sample
70+
# slowest = np.argmax(t_generate_sequences + t_tool_calls)
71+
# attention_mask = output.extra_info.pop("attention_mask_perf")[slowest]
72+
# prompt_length = output.extra_info.pop("prompts_perf").shape[1]
73+
# timing["agent_loop/slowest/generate_sequences"] = t_generate_sequences[slowest]
74+
# timing["agent_loop/slowest/tool_calls"] = t_tool_calls[slowest]
75+
# timing["agent_loop/slowest/prompt_length"] = attention_mask[:prompt_length].sum().item()
76+
# timing["agent_loop/slowest/response_length"] = attention_mask[prompt_length:].sum().item()
77+
6878
return timing
6979

7080
def create_transferqueue_client(self, controller_info, config):
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
hydra:
2+
searchpath:
3+
- file://verl/trainer/config
4+
5+
defaults:
6+
- ppo_megatron_trainer
7+
- _self_
8+
9+
# config for TransferQueue
10+
transfer_queue:
11+
enable: True

recipe/transfer_queue/ray_trainer.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -421,11 +421,7 @@ def _initialize_data_system(self):
421421
* self.config.trainer.num_global_batch
422422
* self.config.actor_rollout_ref.rollout.n
423423
)
424-
val_data_size = (
425-
self.val_batch_size
426-
* self.config.trainer.num_global_batch
427-
* self.config.actor_rollout_ref.rollout.val_kwargs.n
428-
)
424+
val_data_size = self.config.data.val_dataset_size * self.config.actor_rollout_ref.rollout.val_kwargs.n
429425

430426
total_storage_size = train_data_size + val_data_size
431427
self.data_system_storage_units = {}
@@ -491,6 +487,8 @@ def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampl
491487
)
492488
self.train_dataset, self.val_dataset = train_dataset, val_dataset
493489

490+
self.config.data["val_dataset_size"] = len(val_dataset)
491+
494492
if train_sampler is None:
495493
train_sampler = create_rl_sampler(self.config.data, self.train_dataset)
496494
if collate_fn is None:
@@ -810,6 +808,8 @@ def _validate(self):
810808

811809
data_source_lst.append(data_source)
812810

811+
asyncio.run(self.data_system_client.async_clear(partition_id=f"val_{self.global_steps - 1}"))
812+
813813
self._maybe_log_val_generations(inputs=sample_inputs, outputs=sample_outputs, scores=sample_scores)
814814

815815
# dump generations
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
set -x
2+
3+
MODEL_PATH="/workspace/models/Qwen3-8B"
4+
TRAIN_FILE="/workspace/datasets/preprocessed/gsm8k/train.parquet"
5+
TEST_FILE="/workspace/datasets/preprocessed/gsm8k/test.parquet"
6+
7+
log_dir="./logs"
8+
mkdir -p ${log_dir}
9+
timestamp=$(date +"%Y%m%d%H%M%S")
10+
log_file="${log_dir}/qwen3-8b_tq_${timestamp}.log"
11+
12+
rollout_mode="async"
13+
rollout_name="vllm" # sglang or vllm
14+
if [ "$rollout_mode" = "async" ]; then
15+
export VLLM_USE_V1=1
16+
return_raw_chat="True"
17+
fi
18+
19+
# You may also refer to tests/special_e2e/run_transferqueue.sh for more demo scripts
20+
21+
python3 -m recipe.transfer_queue.main_ppo \
22+
--config-name='transfer_queue_ppo_trainer' \
23+
algorithm.adv_estimator=grpo \
24+
data.train_files=${TRAIN_FILE} \
25+
data.val_files=${TEST_FILE} \
26+
data.return_raw_chat=$return_raw_chat \
27+
data.train_batch_size=128 \
28+
data.max_prompt_length=2048 \
29+
data.max_response_length=8192 \
30+
data.filter_overlong_prompts_workers=128 \
31+
data.filter_overlong_prompts=True \
32+
data.truncation='error' \
33+
actor_rollout_ref.model.path=${MODEL_PATH} \
34+
actor_rollout_ref.actor.optim.lr=1e-6 \
35+
actor_rollout_ref.model.use_remove_padding=True \
36+
actor_rollout_ref.actor.ppo_mini_batch_size=32 \
37+
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
38+
actor_rollout_ref.actor.use_kl_loss=True \
39+
actor_rollout_ref.actor.kl_loss_coef=0.001 \
40+
actor_rollout_ref.actor.kl_loss_type=low_var_kl \
41+
actor_rollout_ref.actor.entropy_coeff=0 \
42+
actor_rollout_ref.model.enable_gradient_checkpointing=True \
43+
actor_rollout_ref.actor.fsdp_config.param_offload=True \
44+
actor_rollout_ref.actor.fsdp_config.optimizer_offload=True \
45+
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \
46+
actor_rollout_ref.rollout.tensor_model_parallel_size=4 \
47+
actor_rollout_ref.rollout.max_num_batched_tokens=10240 \
48+
actor_rollout_ref.rollout.name=$rollout_name \
49+
actor_rollout_ref.rollout.mode=$rollout_mode \
50+
actor_rollout_ref.rollout.gpu_memory_utilization=0.8 \
51+
actor_rollout_ref.rollout.n=5 \
52+
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=8 \
53+
actor_rollout_ref.ref.fsdp_config.param_offload=True \
54+
algorithm.use_kl_in_reward=False \
55+
trainer.critic_warmup=0 \
56+
trainer.logger=console \
57+
trainer.project_name='verl_grpo_example_gsm8k' \
58+
trainer.experiment_name='qwen3_8b_function_rm' \
59+
trainer.n_gpus_per_node=8 \
60+
trainer.nnodes=1 \
61+
trainer.save_freq=-1 \
62+
trainer.test_freq=1000 \
63+
trainer.total_epochs=15 \
64+
trainer.total_training_steps=2 \
65+
trainer.val_before_train=False \
66+
+trainer.num_global_batch=1 \
67+
+trainer.num_data_storage_units=8 \
68+
2>&1 | tee "$log_file"
69+
echo "Finished, log is saved in: $log_file"

0 commit comments

Comments
 (0)