forked from TransferQueue/verl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathray_trainer.py
More file actions
1718 lines (1462 loc) · 79.5 KB
/
ray_trainer.py
File metadata and controls
1718 lines (1462 loc) · 79.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2024 Bytedance Ltd. and/or its affiliates
# Copyright 2023-2024 SGLang Team
# Copyright 2025 ModelBest Inc. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PPO Trainer with Ray-based single controller.
This trainer supports model-agonistic model initialization with huggingface
"""
import json
import logging
import math
import os
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from pprint import pprint
from typing import Any, Optional
import numpy as np
import ray
import tensordict
import torch
from omegaconf import OmegaConf, open_dict
from packaging.version import parse as parse_version
from tensordict import TensorDict
from torch.utils.data import Dataset, Sampler
from torchdata.stateful_dataloader import StatefulDataLoader
from tqdm import tqdm
from transfer_queue import (
BatchMeta,
SimpleStorageUnit,
TransferQueueController,
get_placement_group,
process_zmq_server_info,
)
from verl import DataProto
from verl.experimental.dataset.sampler import AbstractCurriculumSampler
from verl.single_controller.ray import (
RayClassWithInitArgs,
RayResourcePool,
RayWorkerGroup,
)
from verl.single_controller.ray.base import create_colocated_worker_cls
from verl.trainer.config import AlgoConfig
from verl.trainer.ppo import core_algos
from verl.trainer.ppo.core_algos import AdvantageEstimator, agg_loss
from verl.trainer.ppo.metric_utils import (
compute_data_metrics,
compute_throughout_metrics,
compute_timing_metrics,
process_validation_metrics,
)
from verl.trainer.ppo.reward import compute_reward, compute_reward_async
from verl.trainer.ppo.utils import (
Role,
WorkerType,
need_critic,
need_reference_policy,
need_reward_model,
)
from verl.utils.checkpoint.checkpoint_manager import (
find_latest_ckpt_path,
should_save_ckpt_esi,
)
from verl.utils.config import omega_conf_to_dataclass
from verl.utils.debug import marked_timer
from verl.utils.metric import reduce_metrics
from verl.utils.rollout_skip import RolloutSkip
from verl.utils.seqlen_balancing import (
calculate_workload,
get_seqlen_balanced_partitions,
log_seqlen_unbalance,
)
from verl.utils.torch_functional import masked_mean
from verl.utils.tracking import ValidationGenerationsLogger
from verl.utils.transferqueue_utils import (
create_transferqueue_client,
get_transferqueue_client,
tqbridge,
)
@dataclass
class ResourcePoolManager:
"""
Define a resource pool specification. Resource pool will be initialized first.
"""
resource_pool_spec: dict[str, list[int]]
mapping: dict[Role, str]
resource_pool_dict: dict[str, RayResourcePool] = field(default_factory=dict)
def create_resource_pool(self):
"""Create Ray resource pools for distributed training.
Initializes resource pools based on the resource pool specification,
with each pool managing GPU resources across multiple nodes.
For FSDP backend, uses max_colocate_count=1 to merge WorkerGroups.
For Megatron backend, uses max_colocate_count>1 for different models.
"""
for resource_pool_name, process_on_nodes in self.resource_pool_spec.items():
# max_colocate_count means the number of WorkerGroups (i.e. processes) in each RayResourcePool
# For FSDP backend, we recommend using max_colocate_count=1 that merge all WorkerGroups into one.
# For Megatron backend, we recommend using max_colocate_count>1
# that can utilize different WorkerGroup for differnt models
resource_pool = RayResourcePool(
process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=1, name_prefix=resource_pool_name
)
self.resource_pool_dict[resource_pool_name] = resource_pool
self._check_resource_available()
def get_resource_pool(self, role: Role) -> RayResourcePool:
"""Get the resource pool of the worker_cls"""
return self.resource_pool_dict[self.mapping[role]]
def get_n_gpus(self) -> int:
"""Get the number of gpus in this cluster."""
return sum([n_gpus for process_on_nodes in self.resource_pool_spec.values() for n_gpus in process_on_nodes])
def _check_resource_available(self):
"""Check if the resource pool can be satisfied in this ray cluster."""
node_available_resources = ray._private.state.available_resources_per_node()
node_available_gpus = {
node: node_info.get("GPU", 0) if "GPU" in node_info else node_info.get("NPU", 0)
for node, node_info in node_available_resources.items()
}
# check total required gpus can be satisfied
total_available_gpus = sum(node_available_gpus.values())
total_required_gpus = sum(
[n_gpus for process_on_nodes in self.resource_pool_spec.values() for n_gpus in process_on_nodes]
)
if total_available_gpus < total_required_gpus:
raise ValueError(
f"Total available GPUs {total_available_gpus} is less than total desired GPUs {total_required_gpus}"
)
@tqbridge(put_data=False)
def compute_reward_decorated(data, reward_fn):
return compute_reward(data, reward_fn)
@tqbridge(put_data=False)
def compute_reward_async_decorated(data, reward_fn):
return compute_reward_async.remote(data, reward_fn)
@tqbridge(put_data=False)
def apply_kl_penalty(data: DataProto, kl_ctrl: core_algos.AdaptiveKLController, kl_penalty="kl"):
"""Apply KL penalty to the token-level rewards.
This function computes the KL divergence between the reference policy and current policy,
then applies a penalty to the token-level rewards based on this divergence.
Args:
data (DataProto): The data containing batched model outputs and inputs.
kl_ctrl (core_algos.AdaptiveKLController): Controller for adaptive KL penalty.
kl_penalty (str, optional): Type of KL penalty to apply. Defaults to "kl".
Returns:
tuple: A tuple containing:
- The updated data with token-level rewards adjusted by KL penalty
- A dictionary of metrics related to the KL penalty
"""
response_mask = data.batch["response_mask"]
token_level_scores = data.batch["token_level_scores"]
batch_size = data.batch.batch_size[0]
# compute kl between ref_policy and current policy
# When apply_kl_penalty, algorithm.use_kl_in_reward=True, so the reference model has been enabled.
kld = core_algos.kl_penalty(
data.batch["old_log_probs"], data.batch["ref_log_prob"], kl_penalty=kl_penalty
) # (batch_size, response_length)
kld = kld * response_mask
beta = kl_ctrl.value
token_level_rewards = token_level_scores - beta * kld
current_kl = masked_mean(kld, mask=response_mask, axis=-1) # average over sequence
current_kl = torch.mean(current_kl, dim=0).item()
# according to https://github.com/huggingface/trl/blob/951ca1841f29114b969b57b26c7d3e80a39f75a0/trl/trainer/ppo_trainer.py#L837
kl_ctrl.update(current_kl=current_kl, n_steps=batch_size)
metrics = {"actor/reward_kl_penalty": current_kl, "actor/reward_kl_penalty_coeff": beta}
return token_level_rewards, metrics
def compute_response_mask(batch_meta: BatchMeta, tq_client):
"""Compute the attention mask for the response part of the sequence.
This function extracts the portion of the attention mask that corresponds to the model's response,
which is used for masking computations that should only apply to response tokens.
Args:
batch_meta (BatchMeta): The data containing batched model outputs and inputs.
Returns:
BatchMeta: The BatchMeta of attention mask for the response tokens.
"""
data = tq_client.get_data(batch_meta)
responses = data["responses"]
response_length = responses.size(1)
attention_mask = data["attention_mask"]
response_mask = attention_mask[:, -response_length:]
output = TensorDict({"response_mask": response_mask}, batch_size=response_mask.size(0))
batch_meta = tq_client.put(data=output, metadata=batch_meta)
return batch_meta
@tqbridge(put_data=False)
def compute_advantage(
data: DataProto,
adv_estimator: AdvantageEstimator,
gamma: float = 1.0,
lam: float = 1.0,
num_repeat: int = 1,
norm_adv_by_std_in_grpo: bool = True,
config: Optional[AlgoConfig] = None,
) -> tuple[Any, Any]:
"""Compute advantage estimates for policy optimization.
This function computes advantage estimates using various estimators like GAE, GRPO, REINFORCE++, etc.
The advantage estimates are used to guide policy optimization in RL algorithms.
Args:
data (DataProto): The data containing batched model outputs and inputs.
adv_estimator (AdvantageEstimator): The advantage estimator to use (e.g., GAE, GRPO, REINFORCE++).
gamma (float, optional): Discount factor for future rewards. Defaults to 1.0.
lam (float, optional): Lambda parameter for GAE. Defaults to 1.0.
num_repeat (int, optional): Number of times to repeat the computation. Defaults to 1.
norm_adv_by_std_in_grpo (bool, optional): Whether to normalize advantages by standard deviation in
GRPO. Defaults to True.
config (dict, optional): Configuration dictionary for algorithm settings. Defaults to None.
Returns:
tuple: A tuple containing:
- advantages: The computed advantage estimates.
- returns: The computed returns.
"""
# prepare response group
if adv_estimator == AdvantageEstimator.GAE:
# Compute advantages and returns using Generalized Advantage Estimation (GAE)
advantages, returns = core_algos.compute_gae_advantage_return(
token_level_rewards=data.batch["token_level_rewards"],
values=data.batch["values"],
response_mask=data.batch["response_mask"],
gamma=gamma,
lam=lam,
)
# TODO (TQ): adapt core_algos.compute_pf_ppo_reweight_data function to support transfer queue
if config.get("use_pf_ppo", False):
data = core_algos.compute_pf_ppo_reweight_data(
data,
config.pf_ppo.get("reweight_method"),
config.pf_ppo.get("weight_pow"),
)
elif adv_estimator == AdvantageEstimator.GRPO:
# Initialize the mask for GRPO calculation
grpo_calculation_mask = data.batch["response_mask"]
# Call compute_grpo_outcome_advantage with parameters matching its definition
advantages, returns = core_algos.compute_grpo_outcome_advantage(
token_level_rewards=data.batch["token_level_rewards"],
response_mask=grpo_calculation_mask,
index=data.non_tensor_batch["uid"],
norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
)
else:
# handle all other adv estimator type other than GAE and GRPO
adv_estimator_fn = core_algos.get_adv_estimator_fn(adv_estimator)
adv_kwargs = {
"token_level_rewards": data.batch["token_level_rewards"],
"response_mask": data.batch["response_mask"],
"config": config,
}
if "uid" in data.non_tensor_batch: # optional
adv_kwargs["index"] = data.non_tensor_batch["uid"]
if "reward_baselines" in data.batch: # optional
adv_kwargs["reward_baselines"] = data.batch["reward_baselines"]
# calculate advantage estimator
advantages, returns = adv_estimator_fn(**adv_kwargs)
return advantages, returns
@tqbridge(put_data=False)
def compute_data_metrics_decorated(batch, use_critic: bool = True):
return compute_data_metrics(batch, use_critic)
@tqbridge(put_data=False)
def compute_timing_metrics_decorated(batch, timing_raw: dict[str, float]) -> dict[str, Any]:
return compute_timing_metrics(batch, timing_raw)
@tqbridge(put_data=False)
def compute_throughout_metrics_decorated(batch, timing_raw: dict[str, float], n_gpus: int) -> dict[str, Any]:
return compute_throughout_metrics(batch, timing_raw, n_gpus)
@tqbridge(put_data=False)
def calculate_debug_metrics_decorated(data):
from verl.utils.debug.metrics import calculate_debug_metrics
return calculate_debug_metrics(data)
@tqbridge(put_data=False)
def compute_val_reward_decorated(reward_fn, data, return_dict):
return reward_fn(data, return_dict)
class RayPPOTrainer:
"""Distributed PPO trainer using Ray for scalable reinforcement learning.
This trainer orchestrates distributed PPO training across multiple nodes and GPUs,
managing actor rollouts, critic training, and reward computation with Ray backend.
Supports various model architectures including FSDP, Megatron, vLLM, and SGLang integration.
"""
# TODO: support each role have individual ray_worker_group_cls,
# i.e., support different backend of different role
def __init__(
self,
config,
tokenizer,
role_worker_mapping: dict[Role, WorkerType],
resource_pool_manager: ResourcePoolManager,
ray_worker_group_cls: type[RayWorkerGroup] = RayWorkerGroup,
processor=None,
reward_fn=None,
val_reward_fn=None,
train_dataset: Optional[Dataset] = None,
val_dataset: Optional[Dataset] = None,
collate_fn=None,
train_sampler: Optional[Sampler] = None,
device_name=None,
):
"""
Initialize distributed PPO trainer with Ray backend.
Note that this trainer runs on the driver process on a single CPU/GPU node.
Args:
config: Configuration object containing training parameters.
tokenizer: Tokenizer used for encoding and decoding text.
role_worker_mapping (dict[Role, WorkerType]): Mapping from roles to worker classes.
resource_pool_manager (ResourcePoolManager): Manager for Ray resource pools.
ray_worker_group_cls (RayWorkerGroup, optional): Class for Ray worker groups. Defaults to RayWorkerGroup.
processor: Optional data processor, used for multimodal data
reward_fn: Function for computing rewards during training.
val_reward_fn: Function for computing rewards during validation.
train_dataset (Optional[Dataset], optional): Training dataset. Defaults to None.
val_dataset (Optional[Dataset], optional): Validation dataset. Defaults to None.
collate_fn: Function to collate data samples into batches.
train_sampler (Optional[Sampler], optional): Sampler for the training dataset. Defaults to None.
device_name (str, optional): Device name for training (e.g., "cuda", "cpu"). Defaults to None.
"""
# Store the tokenizer for text processing
self.tokenizer = tokenizer
self.processor = processor
self.config = config
self.reward_fn = reward_fn
self.val_reward_fn = val_reward_fn
self.hybrid_engine = config.actor_rollout_ref.hybrid_engine
assert self.hybrid_engine, "Currently, only support hybrid engine"
if self.hybrid_engine:
assert Role.ActorRollout in role_worker_mapping, f"{role_worker_mapping.keys()=}"
self.role_worker_mapping = role_worker_mapping
self.resource_pool_manager = resource_pool_manager
self.use_reference_policy = need_reference_policy(self.role_worker_mapping)
self.use_rm = need_reward_model(self.role_worker_mapping)
self.use_critic = need_critic(self.config)
self.ray_worker_group_cls = ray_worker_group_cls
self.device_name = device_name if device_name else self.config.trainer.device
self.validation_generations_logger = ValidationGenerationsLogger(
project_name=self.config.trainer.project_name,
experiment_name=self.config.trainer.experiment_name,
)
# if ref_in_actor is True, the reference policy will be actor without lora applied
self.ref_in_actor = config.actor_rollout_ref.model.get("lora_rank", 0) > 0
# define in-reward KL control
# kl loss control currently not suppoorted
if self.config.algorithm.use_kl_in_reward:
self.kl_ctrl_in_reward = core_algos.get_kl_controller(self.config.algorithm.kl_ctrl)
self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler)
self.tq_client = self._initialize_transferqueue()
def _initialize_transferqueue(self):
# 1. initialize TransferQueueStorage
if self.config.transfer_queue.storage_backend == "AsyncSimpleStorageManager":
train_data_size = (
self.config.data.train_batch_size
* self.config.transfer_queue.num_global_batch
* self.config.actor_rollout_ref.rollout.n
)
val_data_size = self.val_dataset_size * self.config.actor_rollout_ref.rollout.val_kwargs.n
total_storage_size = train_data_size + val_data_size
self.data_system_storage_units = {}
storage_placement_group = get_placement_group(
self.config.transfer_queue.num_data_storage_units, num_cpus_per_actor=1
)
for storage_unit_rank in range(self.config.transfer_queue.num_data_storage_units):
storage_node = SimpleStorageUnit.options(
placement_group=storage_placement_group, placement_group_bundle_index=storage_unit_rank
).remote(
storage_unit_size=math.ceil(total_storage_size / self.config.transfer_queue.num_data_storage_units)
)
self.data_system_storage_units[storage_unit_rank] = storage_node
logging.info(f"SimpleStorageUnit #{storage_unit_rank} has been created.")
else:
raise NotImplementedError("Currently only support AsyncSimpleStorageManager backend in TransferQueue")
# 2. Initialize TransferQueueController (single controller only)
# Sampler usage instructions:
# For GRPO grouped sampling, you can initialize the controller with GRPOGroupNSampler:
# Option 1: Pass sampler class (will be instantiated automatically)
# self.data_system_controller = TransferQueueController.remote(sampler=GRPOGroupNSampler)
# Option 2: Pass sampler instance (if you need custom configuration)
# grpo_sampler = GRPOGroupNSampler()
# self.data_system_controller = TransferQueueController.remote(sampler=grpo_sampler)
# Then use sampling_config in get_meta calls:
# sampling_config={"n_samples_per_prompt": 4}
self.data_system_controller = TransferQueueController.remote()
logging.info("TransferQueueController has been created.")
# 3. register controller & storage and prepare necessary information
self.data_system_controller_info = process_zmq_server_info(self.data_system_controller)
if self.config.transfer_queue.storage_backend == "AsyncSimpleStorageManager":
self.data_system_storage_unit_infos = process_zmq_server_info(self.data_system_storage_units)
# Note: Need to generate a new DictConfig with allow_objects=True to preserve ZMQServerInfo instances
# (which contain socket connection details). Without this flag, OmegaConf would flatten these objects to dicts,
# breaking the transfer queue client initialization.
tq_config = OmegaConf.create({"transfer_queue": {}}, flags={"allow_objects": True})
tq_config.transfer_queue.controller_info = self.data_system_controller_info
if self.config.transfer_queue.storage_backend == "AsyncSimpleStorageManager":
tq_config.transfer_queue.storage_unit_infos = self.data_system_storage_unit_infos
self.config = OmegaConf.merge(tq_config, self.config)
# 4. create client
create_transferqueue_client(client_id="Trainer", config=self.config.transfer_queue, sync=True)
tq_client = get_transferqueue_client()
return tq_client
def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampler: Optional[Sampler]):
"""
Creates the train and validation dataloaders.
"""
# TODO: we have to make sure the batch size is divisible by the dp size
from verl.trainer.main_ppo import create_rl_dataset, create_rl_sampler
if train_dataset is None:
train_dataset = create_rl_dataset(
self.config.data.train_files, self.config.data, self.tokenizer, self.processor
)
if val_dataset is None:
val_dataset = create_rl_dataset(
self.config.data.val_files, self.config.data, self.tokenizer, self.processor
)
self.train_dataset, self.val_dataset = train_dataset, val_dataset
self.val_dataset_size = len(val_dataset)
if train_sampler is None:
train_sampler = create_rl_sampler(self.config.data, self.train_dataset)
if collate_fn is None:
from verl.utils.dataset.rl_dataset import collate_fn as default_collate_fn
collate_fn = default_collate_fn
num_workers = self.config.data["dataloader_num_workers"]
self.train_dataloader = StatefulDataLoader(
dataset=self.train_dataset,
batch_size=self.config.data.get("gen_batch_size", self.config.data.train_batch_size),
num_workers=num_workers,
drop_last=True,
collate_fn=collate_fn,
sampler=train_sampler,
)
val_batch_size = self.config.data.val_batch_size # Prefer config value if set
if val_batch_size is None:
val_batch_size = len(self.val_dataset)
self.val_batch_size = val_batch_size
self.val_dataloader = StatefulDataLoader(
dataset=self.val_dataset,
batch_size=val_batch_size,
num_workers=num_workers,
shuffle=self.config.data.get("validation_shuffle", True),
drop_last=False,
collate_fn=collate_fn,
)
assert len(self.train_dataloader) >= 1, "Train dataloader is empty!"
assert len(self.val_dataloader) >= 1, "Validation dataloader is empty!"
print(
f"Size of train dataloader: {len(self.train_dataloader)}, Size of val dataloader: "
f"{len(self.val_dataloader)}"
)
total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs
if self.config.trainer.total_training_steps is not None:
total_training_steps = self.config.trainer.total_training_steps
self.total_training_steps = total_training_steps
print(f"Total training steps: {self.total_training_steps}")
try:
OmegaConf.set_struct(self.config, True)
with open_dict(self.config):
if OmegaConf.select(self.config, "actor_rollout_ref.actor.optim"):
self.config.actor_rollout_ref.actor.optim.total_training_steps = total_training_steps
if OmegaConf.select(self.config, "critic.optim"):
self.config.critic.optim.total_training_steps = total_training_steps
except Exception as e:
print(f"Warning: Could not set total_training_steps in config. Structure missing? Error: {e}")
def _dump_generations(self, inputs, outputs, gts, scores, reward_extra_infos_dict, dump_path):
"""Dump rollout/validation samples as JSONL."""
os.makedirs(dump_path, exist_ok=True)
filename = os.path.join(dump_path, f"{self.global_steps}.jsonl")
n = len(inputs)
base_data = {
"input": inputs,
"output": outputs,
"gts": gts,
"score": scores,
"step": [self.global_steps] * n,
}
for k, v in reward_extra_infos_dict.items():
if len(v) == n:
base_data[k] = v
lines = []
for i in range(n):
entry = {k: v[i] for k, v in base_data.items()}
lines.append(json.dumps(entry, ensure_ascii=False))
with open(filename, "w") as f:
f.write("\n".join(lines) + "\n")
print(f"Dumped generations to {filename}")
def _log_rollout_data(
self, log_rollout_meta: BatchMeta, reward_extra_infos_dict: dict, timing_raw: dict, rollout_data_dir: str
):
"""
Log rollout data to disk.
Args:
log_rollout_meta (BatchMeta): The batch_meta of rollout data
reward_extra_infos_dict (dict): Additional reward information to log
timing_raw (dict): Timing information for profiling
rollout_data_dir (str): Directory path to save the rollout data
"""
with marked_timer("dump_rollout_generations", timing_raw, color="green"):
data = self.tq_client.get_data(log_rollout_meta)
inputs = self.tokenizer.batch_decode(data["prompts"], skip_special_tokens=True)
outputs = self.tokenizer.batch_decode(data["responses"], skip_special_tokens=True)
scores = data["token_level_scores"].sum(-1).cpu().tolist()
sample_gts = [item.get("ground_truth", None) for item in data.get("reward_model", {})]
reward_extra_infos_to_dump = reward_extra_infos_dict.copy()
if "request_id" in log_rollout_meta.field_names:
reward_extra_infos_dict.setdefault(
"request_id",
data["request_id"].tolist(),
)
self._dump_generations(
inputs=inputs,
outputs=outputs,
gts=sample_gts,
scores=scores,
reward_extra_infos_dict=reward_extra_infos_to_dump,
dump_path=rollout_data_dir,
)
def _maybe_log_val_generations(self, inputs, outputs, scores):
"""Log a table of validation samples to the configured logger (wandb or swanlab)"""
generations_to_log = self.config.trainer.log_val_generations
if generations_to_log == 0:
return
import numpy as np
# Create tuples of (input, output, score) and sort by input text
samples = list(zip(inputs, outputs, scores, strict=True))
samples.sort(key=lambda x: x[0]) # Sort by input text
# Use fixed random seed for deterministic shuffling
rng = np.random.RandomState(42)
rng.shuffle(samples)
# Take first N samples after shuffling
samples = samples[:generations_to_log]
# Log to each configured logger
self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps)
def _get_gen_batch(self, batch: DataProto) -> DataProto:
reward_model_keys = set({"data_source", "reward_model", "extra_info", "uid"}) & batch.non_tensor_batch.keys()
# pop those keys for generation
batch_keys_to_pop = []
non_tensor_batch_keys_to_pop = set(batch.non_tensor_batch.keys()) - reward_model_keys
gen_batch = batch.pop(
batch_keys=batch_keys_to_pop,
non_tensor_batch_keys=list(non_tensor_batch_keys_to_pop),
)
# For agent loop, we need reward model keys to compute score.
if self.async_rollout_mode:
gen_batch.non_tensor_batch.update(batch.non_tensor_batch)
return gen_batch
def _validate(self):
data_source_lst = []
reward_extra_infos_dict: dict[str, list] = defaultdict(list)
# Lists to collect samples for the table
sample_inputs = []
sample_outputs = []
sample_gts = []
sample_scores = []
sample_turns = []
sample_uids = []
for test_data in self.val_dataloader:
if "uid" not in test_data.keys():
test_data["uid"] = np.array(
[str(uuid.uuid4()) for _ in range(len(test_data["raw_prompt"]))], dtype=object
)
# repeat test data
repeated_test_data = self.repeat_dict(
test_data, repeat_times=self.config.actor_rollout_ref.rollout.val_kwargs.n, interleave=True
)
test_batch: TensorDict = self.dict_to_tensordict(repeated_test_data)
# we only do validation on rule-based rm
if self.config.reward_model.enable and test_batch[0]["reward_model"]["style"] == "model":
return {}
batch_meta = self.tq_client.put(data=test_batch, partition_id=f"val_{self.global_steps - 1}")
batch_meta.update_extra_info(
{
"eos_token_id": self.tokenizer.eos_token_id,
"pad_token_id": self.tokenizer.pad_token_id,
"recompute_log_prob": False,
"do_sample": self.config.actor_rollout_ref.rollout.val_kwargs.do_sample,
"validate": True,
"global_steps": self.global_steps,
}
)
print(f"batch_meta extra_info: {batch_meta.extra_info}")
# TODO: (TQ) Support padding and unpadding to make DataProto divisible by dp_size with TransferQueue
if not self.async_rollout_mode:
test_output_gen_meta = self.actor_rollout_wg.generate_sequences(batch_meta)
else:
test_output_gen_meta = self.async_rollout_manager.generate_sequences(batch_meta)
batch_meta = batch_meta.union(test_output_gen_meta)
print("validation generation end")
# Store generated outputs
test_response_meta = batch_meta.select_fields(["prompts", "responses", "uid", "reward_model"])
data = self.tq_client.get_data(test_response_meta)
output_ids = data["responses"]
output_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in output_ids]
sample_outputs.extend(output_texts)
# TODO: Can we keep special tokens except for padding tokens?
input_ids = data["prompts"]
input_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in input_ids]
sample_inputs.extend(input_texts)
sample_uids.extend(data["uid"])
ground_truths = [item.get("ground_truth", None) for item in data.get("reward_model", {})]
sample_gts.extend(ground_truths)
# evaluate using reward_function
if self.val_reward_fn is None:
raise ValueError("val_reward_fn must be provided for validation.")
# TODO (TQ): Support PR https://github.com/volcengine/verl/pull/4581
compute_reward_fields = [
"responses",
"prompts",
"attention_mask",
"reward_model",
"data_source",
]
# if "rm_scores" in batch_meta.field_names:
# compute_reward_fields = ["rm_scores"]
val_reward_meta = batch_meta.select_fields(compute_reward_fields)
result = compute_val_reward_decorated(self.val_reward_fn, val_reward_meta, return_dict=True)
reward_tensor = result["reward_tensor"]
scores = reward_tensor.sum(-1).cpu().tolist()
sample_scores.extend(scores)
reward_extra_infos_dict["reward"].extend(scores)
print(f"len reward_extra_infos_dict['reward']: {len(reward_extra_infos_dict['reward'])}")
if "reward_extra_info" in result:
for key, lst in result["reward_extra_info"].items():
reward_extra_infos_dict[key].extend(lst)
print(f"len reward_extra_infos_dict['{key}']: {len(reward_extra_infos_dict[key])}")
# collect num_turns of each prompt
if "__num_turns__" in batch_meta.field_names:
data = self.tq_client.get_data(batch_meta.select_fields(["__num_turns__"]))
sample_turns.append(data["__num_turns__"])
data_source = ["unknown"] * reward_tensor.shape[0]
if "data_source" in batch_meta.field_names:
data_source_meta = batch_meta.select_fields(["data_source"])
data = self.tq_client.get_data(data_source_meta)
data_source = data["data_source"]
data_source_lst.append(data_source)
self.tq_client.clear_samples(batch_meta)
self._maybe_log_val_generations(inputs=sample_inputs, outputs=sample_outputs, scores=sample_scores)
# dump generations
val_data_dir = self.config.trainer.get("validation_data_dir", None)
if val_data_dir:
self._dump_generations(
inputs=sample_inputs,
outputs=sample_outputs,
gts=sample_gts,
scores=sample_scores,
reward_extra_infos_dict=reward_extra_infos_dict,
dump_path=val_data_dir,
)
for key_info, lst in reward_extra_infos_dict.items():
assert len(lst) == 0 or len(lst) == len(sample_scores), f"{key_info}: {len(lst)=}, {len(sample_scores)=}"
data_sources = np.concatenate(data_source_lst, axis=0)
data_src2var2metric2val = process_validation_metrics(data_sources, sample_uids, reward_extra_infos_dict)
metric_dict = {}
for data_source, var2metric2val in data_src2var2metric2val.items():
core_var = "acc" if "acc" in var2metric2val else "reward"
for var_name, metric2val in var2metric2val.items():
n_max = max([int(name.split("@")[-1].split("/")[0]) for name in metric2val.keys()])
for metric_name, metric_val in metric2val.items():
if (
(var_name == core_var)
and any(metric_name.startswith(pfx) for pfx in ["mean", "maj", "best"])
and (f"@{n_max}" in metric_name)
):
metric_sec = "val-core"
else:
metric_sec = "val-aux"
pfx = f"{metric_sec}/{data_source}/{var_name}/{metric_name}"
metric_dict[pfx] = metric_val
if len(sample_turns) > 0:
sample_turns = np.concatenate(sample_turns)
metric_dict["val-aux/num_turns/min"] = sample_turns.min()
metric_dict["val-aux/num_turns/max"] = sample_turns.max()
metric_dict["val-aux/num_turns/mean"] = sample_turns.mean()
return metric_dict
def init_workers(self):
"""Initialize distributed training workers using Ray backend.
Creates:
1. Ray resource pools from configuration
2. Worker groups for each role (actor, critic, etc.)
"""
self.resource_pool_manager.create_resource_pool()
self.resource_pool_to_cls = {pool: {} for pool in self.resource_pool_manager.resource_pool_dict.values()}
# create actor and rollout
if self.hybrid_engine:
resource_pool = self.resource_pool_manager.get_resource_pool(Role.ActorRollout)
actor_rollout_cls = RayClassWithInitArgs(
cls=self.role_worker_mapping[Role.ActorRollout],
config=self.config.actor_rollout_ref,
role="actor_rollout",
)
self.resource_pool_to_cls[resource_pool]["actor_rollout"] = actor_rollout_cls
else:
raise NotImplementedError
# create critic
if self.use_critic:
resource_pool = self.resource_pool_manager.get_resource_pool(Role.Critic)
critic_cfg = omega_conf_to_dataclass(self.config.critic)
critic_cls = RayClassWithInitArgs(cls=self.role_worker_mapping[Role.Critic], config=critic_cfg)
self.resource_pool_to_cls[resource_pool]["critic"] = critic_cls
# create reference policy if needed
if self.use_reference_policy:
resource_pool = self.resource_pool_manager.get_resource_pool(Role.RefPolicy)
ref_policy_cls = RayClassWithInitArgs(
self.role_worker_mapping[Role.RefPolicy],
config=self.config.actor_rollout_ref,
role="ref",
)
self.resource_pool_to_cls[resource_pool]["ref"] = ref_policy_cls
# create a reward model if reward_fn is None
if self.use_rm:
# we create a RM here
resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
rm_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RewardModel], config=self.config.reward_model)
self.resource_pool_to_cls[resource_pool]["rm"] = rm_cls
# initialize WorkerGroup
# NOTE: if you want to use a different resource pool for each role, which can support different parallel size,
# you should not use `create_colocated_worker_cls`.
# Instead, directly pass different resource pool to different worker groups.
# See https://github.com/volcengine/verl/blob/master/examples/ray/tutorial.ipynb for more information.
all_wg = {}
wg_kwargs = {} # Setting up kwargs for RayWorkerGroup
if OmegaConf.select(self.config.trainer, "ray_wait_register_center_timeout") is not None:
wg_kwargs["ray_wait_register_center_timeout"] = self.config.trainer.ray_wait_register_center_timeout
if OmegaConf.select(self.config.global_profiler, "steps") is not None:
wg_kwargs["profile_steps"] = OmegaConf.select(self.config.global_profiler, "steps")
# Only require nsight worker options when tool is nsys
if OmegaConf.select(self.config.global_profiler, "tool") == "nsys":
assert (
OmegaConf.select(self.config.global_profiler.global_tool_config.nsys, "worker_nsight_options")
is not None
), "worker_nsight_options must be set when using nsys with profile_steps"
wg_kwargs["worker_nsight_options"] = OmegaConf.to_container(
OmegaConf.select(self.config.global_profiler.global_tool_config.nsys, "worker_nsight_options")
)
wg_kwargs["device_name"] = self.device_name
for resource_pool, class_dict in self.resource_pool_to_cls.items():
worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict)
wg_dict = self.ray_worker_group_cls(
resource_pool=resource_pool,
ray_cls_with_init=worker_dict_cls,
**wg_kwargs,
)
spawn_wg = wg_dict.spawn(prefix_set=class_dict.keys())
all_wg.update(spawn_wg)
if self.use_critic:
self.critic_wg = all_wg["critic"]
self.critic_wg.init_model()
if self.use_reference_policy and not self.ref_in_actor:
self.ref_policy_wg = all_wg["ref"]
self.ref_policy_wg.init_model()
self.rm_wg = None
if self.use_rm:
self.rm_wg = all_wg["rm"]
self.rm_wg.init_model()
# we should create rollout at the end so that vllm can have a better estimation of kv cache memory
self.actor_rollout_wg = all_wg["actor_rollout"]
self.actor_rollout_wg.init_model()
# set transferqueue server info for each worker
for _, wg in all_wg.items():
wg.create_transferqueue_client(self.config)
# create async rollout manager and request scheduler
self.async_rollout_mode = False
if self.config.actor_rollout_ref.rollout.mode == "async":
from .agent_loop import AgentLoopManager
self.async_rollout_mode = True
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
rm_resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
else:
rm_resource_pool = None
self.async_rollout_manager = AgentLoopManager(
config=self.config,
worker_group=self.actor_rollout_wg,
rm_resource_pool=rm_resource_pool,
)
# TODO (TQ): initialize tq during worker init when enable TQ switch is stable
self.async_rollout_manager.create_transferqueue_client_for_workers()
def _save_checkpoint(self):
from verl.utils.fs import local_mkdir_safe
# path: given_path + `/global_step_{global_steps}` + `/actor`
local_global_step_folder = os.path.join(
self.config.trainer.default_local_dir, f"global_step_{self.global_steps}"
)
print(f"local_global_step_folder: {local_global_step_folder}")
actor_local_path = os.path.join(local_global_step_folder, "actor")
actor_remote_path = (
None
if self.config.trainer.default_hdfs_dir is None
else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", "actor")
)
remove_previous_ckpt_in_save = self.config.trainer.get("remove_previous_ckpt_in_save", False)
if remove_previous_ckpt_in_save:
print(
"Warning: remove_previous_ckpt_in_save is deprecated,"
+ " set max_actor_ckpt_to_keep=1 and max_critic_ckpt_to_keep=1 instead"
)
max_actor_ckpt_to_keep = (
self.config.trainer.get("max_actor_ckpt_to_keep", None) if not remove_previous_ckpt_in_save else 1
)
max_critic_ckpt_to_keep = (
self.config.trainer.get("max_critic_ckpt_to_keep", None) if not remove_previous_ckpt_in_save else 1
)
self.actor_rollout_wg.save_checkpoint(
actor_local_path, actor_remote_path, self.global_steps, max_ckpt_to_keep=max_actor_ckpt_to_keep
)
if self.use_critic:
critic_local_path = os.path.join(local_global_step_folder, "critic")
critic_remote_path = (
None
if self.config.trainer.default_hdfs_dir is None
else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", "critic")
)
self.critic_wg.save_checkpoint(
critic_local_path, critic_remote_path, self.global_steps, max_ckpt_to_keep=max_critic_ckpt_to_keep
)
# save dataloader
local_mkdir_safe(local_global_step_folder)
dataloader_local_path = os.path.join(local_global_step_folder, "data.pt")
dataloader_state_dict = self.train_dataloader.state_dict()
torch.save(dataloader_state_dict, dataloader_local_path)
# latest checkpointed iteration tracker (for atomic usage)
local_latest_checkpointed_iteration = os.path.join(
self.config.trainer.default_local_dir, "latest_checkpointed_iteration.txt"
)
with open(local_latest_checkpointed_iteration, "w") as f:
f.write(str(self.global_steps))
def _load_checkpoint(self):
if self.config.trainer.resume_mode == "disable":
return 0
# load from hdfs
if self.config.trainer.default_hdfs_dir is not None: