90 commits
ddddf33
plan
ZhiyuLi-Nvidia May 1, 2026
8e03d85
plan: align Stage 4 with rl-arena/verl 1-hop pattern
ZhiyuLi-Nvidia May 2, 2026
9dc98d4
feat(data-plane): TransferQueue integration for GRPO with driver-side…
ZhiyuLi-Nvidia May 4, 2026
9f85f1a
refactor(data-plane): extract driver-side balanced packing into presh…
ZhiyuLi-Nvidia May 5, 2026
4a231a8
feat(data-plane): AsyncTrajectoryCollector writes rollouts to TQ when…
ZhiyuLi-Nvidia May 5, 2026
4024410
feat(data-plane): wire async-on-TQ end-to-end with driver-side balanc…
ZhiyuLi-Nvidia May 5, 2026
d42b607
fix(data-plane): preserve sample order and FLOPs semantics on @dp_dis…
ZhiyuLi-Nvidia May 5, 2026
6f91300
feat(data-plane): grpo_sync routes logprob/ref-logprob through @dp_di…
ZhiyuLi-Nvidia May 5, 2026
136bc46
refactor(data-plane): replace @dp_dispatch with TQPolicy subclass; ad…
ZhiyuLi-Nvidia May 5, 2026
a91f638
fix(data-plane): VLM extras, async fan-out, cleanup-on-failure
ZhiyuLi-Nvidia May 5, 2026
503273e
docs(data-plane): add API lifecycle doc with verl comparison
ZhiyuLi-Nvidia May 7, 2026
10cb0b8
feat(data-plane): sync 1-hop trajectory collector + per-sample key li…
ZhiyuLi-Nvidia May 7, 2026
37ac8c1
refactor(data-plane): extract make_actor_runtime_env, fix N² list copy
ZhiyuLi-Nvidia May 7, 2026
f0f804a
feat(data-plane): jagged tensors on TQ wire + naming/factory cleanup
ZhiyuLi-Nvidia May 7, 2026
4437381
refactor(data-plane): KVBatchMeta.subset/slice/concat methods
ZhiyuLi-Nvidia May 7, 2026
dcff72a
Mooncake cpu backend
ZhiyuLi-Nvidia May 7, 2026
a3087a7
Readability Refactor
ZhiyuLi-Nvidia May 8, 2026
21cba95
wip test mooncake
ZhiyuLi-Nvidia May 8, 2026
686041b
refactor(data-plane): drop dead set_wire_format/_PACK_JAGGED + adapte…
ZhiyuLi-Nvidia May 8, 2026
cb36841
refactor(ray.sub): drop NETWORK_INIT_CMDS — MC_TCP_BIND_ADDRESS suffices
ZhiyuLi-Nvidia May 8, 2026
8b50d2e
docs(data-plane): consolidate README; drop stale plan/verl refs
ZhiyuLi-Nvidia May 8, 2026
96605bd
feat(data-plane): non-tensor object support on TQ wire
ZhiyuLi-Nvidia May 8, 2026
8a2ecaf
feat(grpo-sync): equivalency fixes + content via TQ object column
ZhiyuLi-Nvidia May 9, 2026
fcd1c13
style: fix ruff lint errors and apply ruff format
ZhiyuLi-Nvidia May 9, 2026
45281d7
style: apply pre-commit auto-fixes (ruff)
ZhiyuLi-Nvidia May 9, 2026
6e7f80c
chore(pyrefly): whitelist all new data_plane files + fix type errors
ZhiyuLi-Nvidia May 9, 2026
8782aa1
remove unnecessary script
ZhiyuLi-Nvidia May 9, 2026
a374d49
feat(data-plane): decompose message_log at wire boundary
ZhiyuLi-Nvidia May 12, 2026
4231cd2
chore: regenerate uv.lock after rebase onto main
ZhiyuLi-Nvidia May 13, 2026
35f6b5d
refactor(data-plane): rename DataPlaneClient.get_meta → claim_meta
ZhiyuLi-Nvidia May 12, 2026
1f59951
docs(data-plane): tighten DataPlaneClient boundary docstring
ZhiyuLi-Nvidia May 12, 2026
e40fb19
fix(data-plane): treat DataPlaneConfig.enabled as required field
ZhiyuLi-Nvidia May 12, 2026
607d1f5
docs(data-plane): make build_data_plane_client docstring backend-agno…
ZhiyuLi-Nvidia May 12, 2026
77a5129
refactor(data-plane): promote codec imports to module top-level
ZhiyuLi-Nvidia May 12, 2026
5de226c
refactor(data-plane): rename driver_io → column_io
ZhiyuLi-Nvidia May 12, 2026
cffc921
refactor(data-plane): validate dp_world at TQPolicy config time
ZhiyuLi-Nvidia May 12, 2026
ba1282f
refactor(data-plane): centralize packing-meta keys in schema.py
ZhiyuLi-Nvidia May 13, 2026
5b58ca6
refactor(data-plane): drop redundant dp_world assert in shard_meta_fo…
ZhiyuLi-Nvidia May 13, 2026
9a045ec
refactor(data-plane): move DP_SEED_FIELDS to schema.py as DP_TRAIN_FI…
ZhiyuLi-Nvidia May 13, 2026
19a8336
fix(data-plane): reject empty meta in shard_meta_for_dp
ZhiyuLi-Nvidia May 13, 2026
b181104
refactor(data-plane): print_event → log_event via stdlib logging
ZhiyuLi-Nvidia May 13, 2026
7f4340f
style(data-plane): match repo logger naming convention
ZhiyuLi-Nvidia May 13, 2026
46ef02b
refactor(data-plane): convert DataPlaneStats to @dataclass
ZhiyuLi-Nvidia May 13, 2026
6f7c4bf
refactor(data-plane): type DataPlaneEvent as TypedDict
ZhiyuLi-Nvidia May 13, 2026
aaa9f98
refactor(data-plane): drop placeholder 0s from _run; make sizes kw-only
ZhiyuLi-Nvidia May 13, 2026
8c4d0f6
fix(data-plane): route check_consumption_status through _run
ZhiyuLi-Nvidia May 13, 2026
0282643
fix(data-plane): route close() through _run
ZhiyuLi-Nvidia May 13, 2026
ab56af3
perf(data-plane): single sync in to_nested_by_length
ZhiyuLi-Nvidia May 13, 2026
382386a
docs(data-plane): convert codec.py docstrings to Google style
ZhiyuLi-Nvidia May 13, 2026
cde0468
refactor(data-plane): centralize Layout type alias in schema.py
ZhiyuLi-Nvidia May 13, 2026
0458982
fix(data-plane): validate pad_to_multiple >= 1 in materialize
ZhiyuLi-Nvidia May 13, 2026
8aa016d
fix(data-plane): fail fast on empty local IP at Mooncake bootstrap
ZhiyuLi-Nvidia May 13, 2026
ba064f8
fix(data-plane): surface chmod failure when mooncake_master is not exec
ZhiyuLi-Nvidia May 13, 2026
d5f0065
refactor(data-plane): scope mooncake_cpu 1D workaround to TQDataPlane…
ZhiyuLi-Nvidia May 13, 2026
f008295
docs(data-plane): clarify TQ module vs client access convention
ZhiyuLi-Nvidia May 13, 2026
274efbd
docs(data-plane): note trust boundary at pack_object_array pickle site
ZhiyuLi-Nvidia May 13, 2026
46cacb4
refactor(data-plane): drop codec pickle, use TQ-native NonTensorStack
ZhiyuLi-Nvidia May 13, 2026
0e7dd77
refactor(data-plane): drop dead object-array codec helpers
ZhiyuLi-Nvidia May 13, 2026
ffecbe4
refactor(data-plane): centralize _meta_idx sentinel in schema.py
ZhiyuLi-Nvidia May 13, 2026
afb1c30
docs(data-plane): convert interfaces.py docstrings to Google style
ZhiyuLi-Nvidia May 13, 2026
671bf62
refactor(data-plane): align schema constant names with their values
ZhiyuLi-Nvidia May 13, 2026
f14d389
docs(data-plane): tighten preshard.py docstring to Google style
ZhiyuLi-Nvidia May 13, 2026
8177270
docs(data-plane): convert column_io.py docstrings to Google style
ZhiyuLi-Nvidia May 13, 2026
ef71cfd
docs(data-plane): convert factory.py docstring to Google style
ZhiyuLi-Nvidia May 13, 2026
f67e40e
docs(data-plane): add Args/Returns blocks to observability.py docstrings
ZhiyuLi-Nvidia May 13, 2026
c3b0a62
docs(data-plane): tighten transfer_queue.py docstrings, add Args/Retu…
ZhiyuLi-Nvidia May 13, 2026
fc1c0ff
docs(data-plane): add Args/Returns to worker_mixin.py docstrings
ZhiyuLi-Nvidia May 13, 2026
37f1f06
docs(data-plane): add Args/Returns blocks to tq_policy.py docstrings
ZhiyuLi-Nvidia May 13, 2026
6cbb921
docs(data-plane): convert sync_rollout_actor.py docstrings to Google …
ZhiyuLi-Nvidia May 13, 2026
2c0d4ae
docs(data-plane): add Args/Returns to grpo_sync.py dynamic-sampling h…
ZhiyuLi-Nvidia May 13, 2026
bd71482
refactor(data-plane): drop _to_wire's redundant promote_1d kwarg
ZhiyuLi-Nvidia May 13, 2026
c3ac223
fix(data-plane): survive TQ simple-backend NonTensorData wire-strip
ZhiyuLi-Nvidia May 14, 2026
420bbe9
build(data-plane): pin mooncake-transfer-engine-cuda13 wheel for cu13…
ZhiyuLi-Nvidia May 14, 2026
30bbe91
chore: ruff auto-fix and ruff-format pass
ZhiyuLi-Nvidia May 14, 2026
b48f21c
chore(pyrefly): rename driver_io → column_io in whitelist
ZhiyuLi-Nvidia May 14, 2026
0523f4c
chore(pyrefly): silence 5 latent type errors with targeted ignore com…
ZhiyuLi-Nvidia May 14, 2026
dec8339
chore(pyrefly): whitelist nemo_rl/data_plane/schema.py
ZhiyuLi-Nvidia May 14, 2026
4c035c7
fix(data-plane): preserve object-column identity through TQ wire
ZhiyuLi-Nvidia May 14, 2026
3ee70c4
fix(data-plane): gate TQ write-back on TP×CP×PP leader to avoid dupli…
ZhiyuLi-Nvidia May 14, 2026
209d266
chore: ruff auto-fix and D205 docstring fixes
ZhiyuLi-Nvidia May 14, 2026
212ce55
refactor(data-plane): drop async-grpo TQ scaffolding from sync PR
ZhiyuLi-Nvidia May 14, 2026
e3661b0
refactor(data-plane): consolidate producer codec, caller mints keys
ZhiyuLi-Nvidia May 14, 2026
0673a46
test(data-plane): align codec tests with current contract
ZhiyuLi-Nvidia May 14, 2026
200645a
refactor(grpo_sync): drop dead batch_cache; make TQPolicy attrs public
ZhiyuLi-Nvidia May 14, 2026
b38b473
refactor(data-plane): extract calibration field filter into named sch…
ZhiyuLi-Nvidia May 15, 2026
416df7c
refactor(data-plane): make kv_batch_get(select_fields) required
ZhiyuLi-Nvidia May 15, 2026
eecbcc4
refactor(sync-rollout-actor): remove unused wrappers; document full l…
ZhiyuLi-Nvidia May 15, 2026
25dcf41
test(data-plane): move data_plane unit tests under tests/unit/ for CI…
ZhiyuLi-Nvidia May 15, 2026
fff691d
test(data-plane): apply ruff --fix and import-sort to data_plane unit…
ZhiyuLi-Nvidia May 15, 2026
c069f92
docs: fix broken nemo-gym Core Components link
ZhiyuLi-Nvidia May 15, 2026
2 changes: 1 addition & 1 deletion docs/design-docs/nemo-gym-integration.md
@@ -181,7 +181,7 @@ sequenceDiagram
GRPO->>Policy: Compute loss and train
```

> **NeMo Gym server types** (see [Core Components](https://docs.nvidia.com/nemo/gym/latest/about/concepts/core-components.html)):
> **NeMo Gym server types** (see [Core Components](https://docs.nvidia.com/nemo/gym/about/core-components)):
> - **Agent Server**: Orchestrates the rollout loop
> - **Model Server**: HTTP proxy to vLLM; translates Responses API ↔ Chat Completions
> - **Resource Server**: Provides tools and rewards
14 changes: 14 additions & 0 deletions examples/configs/grpo_math_1B.yaml
@@ -391,3 +391,17 @@ logger:
cluster:
gpus_per_node: 1
num_nodes: 1

# TransferQueue-mediated data plane for sync GRPO.
# Off by default — the legacy grpo_train trainer never engages this.
# Flip enabled=true and run grpo_train_sync to use TQ-mediated bulk
# transfer between rollout and train. See nemo_rl/data_plane/README.md.
data_plane:
enabled: false
impl: transfer_queue
# backend: "simple" # NotRequired: TQ storage backend ('simple' or 'mooncake_cpu')
# storage_capacity: 1000000 # NotRequired
# num_storage_units: 2 # NotRequired
# claim_meta_poll_interval_s: 0.5 # NotRequired: blocking-claim poll cadence
# observability: # NotRequired
# enabled: false
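
The block above ships disabled. For reference, a minimal sketch of the enabled variant, written as the dict that `run_grpo.py` reads via `config.get("data_plane")` — the keys mirror the commented YAML above, but the concrete values here are illustrative assumptions, not recommendations:

```python
# Hypothetical enabled data_plane config, as seen by the launcher.
# Only `enabled` and `impl` are uncommented in the shipped YAML; the
# optional keys below are taken from its NotRequired comments.
dp_cfg = {
    "enabled": True,
    "impl": "transfer_queue",
    "backend": "mooncake_cpu",          # or "simple"
    "storage_capacity": 1_000_000,
    "num_storage_units": 2,
    "claim_meta_poll_interval_s": 0.5,  # blocking-claim poll cadence
    "observability": {"enabled": False},
}
```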
42 changes: 37 additions & 5 deletions examples/run_grpo.py
@@ -99,6 +99,20 @@ def main() -> None:
val_task_to_env,
) = setup_response_data(tokenizer, config["data"], config["env"])

# Pick the policy factory at the launcher level so the legacy trainer
# stays data-plane-agnostic (architectural invariant — see
# tests/data_plane/unit/test_architecture_invariants.py).
_dp_cfg = config.get("data_plane") or {}
if _dp_cfg.get("enabled", False):
from nemo_rl.models.policy.tq_policy import TQPolicy

def _make_policy(**kwargs):
return TQPolicy(**kwargs, dp_cfg=_dp_cfg)

_policy_factory = _make_policy
else:
_policy_factory = None # setup() defaults to plain Policy

(
policy,
policy_generation,
@@ -110,7 +124,13 @@ def main() -> None:
checkpointer,
grpo_state,
master_config,
) = setup(config, tokenizer, dataset, val_dataset)
) = setup(
config,
tokenizer,
dataset,
val_dataset,
policy_factory=_policy_factory,
)

# Check if async mode is enabled
if "async_grpo" in config["grpo"] and config["grpo"]["async_grpo"]["enabled"]:
@@ -164,10 +184,22 @@
max_trajectory_age_steps=async_config["max_trajectory_age_steps"],
)
else:
print("🚀 Running synchronous GRPO training")

# Run standard GRPO training
grpo_train(
# Two parallel synchronous trainers (verl-style — main_ppo.py vs
# main_ppo_sync.py). data_plane.enabled selects which one runs:
# the legacy in-memory path or the TransferQueue-mediated fork.
# Same model, same data, same seed → diff the wandb runs to
# validate parity.
dp_cfg = master_config.get("data_plane", {})
if dp_cfg.get("enabled", False):
from nemo_rl.algorithms.grpo_sync import grpo_train_sync

print("🚀 Running synchronous GRPO training (TransferQueue)")
trainer = grpo_train_sync
else:
print("🚀 Running synchronous GRPO training (legacy)")
trainer = grpo_train

trainer(
policy,
policy_generation,
dataloader,
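
The trainer-selection comment above proposes validating parity by diffing the wandb runs of the two trainers. A sketch of what that comparison could look like, assuming both runs log the same metric keys — the run paths and metric names below are hypothetical placeholders, not names from this PR:

```python
# Hypothetical parity check between the legacy and TQ-mediated runs.
# Run paths ("entity/project/run_id") and metric keys are placeholders.
import wandb

api = wandb.Api()
legacy = api.run("my-entity/nemo-rl/legacy_run_id")
tq = api.run("my-entity/nemo-rl/tq_run_id")

for key in ["train/loss", "train/reward"]:
    a = legacy.history(keys=[key])[key]
    b = tq.history(keys=[key])[key]
    # Exact bitwise parity is unlikely across two data paths; a small
    # per-step numeric tolerance is the realistic bar.
    print(f"{key}: max |legacy - tq| = {(a - b).abs().max():.3e}")
```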
14 changes: 11 additions & 3 deletions nemo_rl/algorithms/grpo.py
@@ -17,7 +17,7 @@
import warnings
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from typing import Any, NotRequired, Optional, TypedDict, TypeVar, cast
from typing import Any, Callable, NotRequired, Optional, TypedDict, TypeVar, cast

import numpy as np
import ray
@@ -58,6 +58,7 @@
get_keys_from_message_log,
)
from nemo_rl.data.utils import extract_necessary_env_names
from nemo_rl.data_plane.interfaces import DataPlaneConfig
from nemo_rl.distributed.batched_data_dict import BatchedDataDict
from nemo_rl.distributed.ray_actor_environment_registry import get_actor_python_env
from nemo_rl.distributed.virtual_cluster import ClusterConfig, RayVirtualCluster
@@ -206,6 +207,7 @@ class MasterConfig(TypedDict):
logger: GRPOLoggerConfig
cluster: ClusterConfig
checkpointing: CheckpointingConfig
data_plane: NotRequired[DataPlaneConfig]


# ===============================================================================
@@ -219,6 +221,7 @@ def setup(
dataset: AllTaskProcessedDataset | dict[str, AllTaskProcessedDataset],
val_dataset: Optional[AllTaskProcessedDataset],
processor: Optional[AutoProcessor] = None,
policy_factory: Optional[Callable[..., ColocatablePolicyInterface]] = None,
) -> tuple[
ColocatablePolicyInterface,
Optional[GenerationInterface],
@@ -582,10 +585,15 @@ def init_train_dataloader(dataset, suffix: str = ""):
"(reference model is not loaded)."
)

# Caller-supplied factory lets the sync trainer swap in a TQ-mediated
# Policy subclass without this shared setup needing to know the data
# plane exists. Default is the plain Policy class — legacy behavior.
_make_policy = policy_factory if policy_factory is not None else Policy

def init_policy():
"""Initialize policy training workers."""
t0 = time.perf_counter()
p = Policy(
p = _make_policy(
cluster=train_cluster,
config=policy_config,
tokenizer=tokenizer,
@@ -2554,7 +2562,7 @@ def async_grpo_train(
)

replay_buffer = ReplayBuffer.options(runtime_env=_replay_runtime_env).remote(
max_size=optimal_buffer_size
max_size=optimal_buffer_size,
)

_tc_py_exec = get_actor_python_env(
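
Distilled, the seam added across the two diffs above is an optional-factory pattern: the launcher decides which Policy subclass to build, and the shared `setup()` falls back to the plain class so it never learns the data plane exists. A self-contained sketch under those assumptions — class bodies are stand-ins and the constructor kwargs are illustrative, not the real signatures:

```python
from typing import Any, Callable, Optional


class Policy:
    """Stand-in for the legacy in-memory policy."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class TQPolicy(Policy):
    """Stand-in for the TransferQueue-mediated subclass from this PR."""

    def __init__(self, *, dp_cfg: dict[str, Any], **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.dp_cfg = dp_cfg


def setup(policy_factory: Optional[Callable[..., Policy]] = None) -> Policy:
    # Shared setup stays data-plane-agnostic: the factory defaults to the
    # plain class, so legacy behavior is untouched when none is supplied.
    make_policy = policy_factory if policy_factory is not None else Policy
    return make_policy()


# Launcher-level gate, mirroring run_grpo.py above.
dp_cfg = {"enabled": True}
factory = (lambda **kw: TQPolicy(**kw, dp_cfg=dp_cfg)) if dp_cfg["enabled"] else None
policy = setup(policy_factory=factory)
```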