
feat: data plane transfer queue integration #2439

Open

ZhiyuLi-Nvidia wants to merge 90 commits into main from zhiyul/data_plane_plan

Conversation

@ZhiyuLi-Nvidia (Contributor) commented May 7, 2026

What does this PR do?

Summary

  • Introduces a TransferQueue-mediated data-plane for sync GRPO. Bulk per-token tensors live in TQ between rollout and train; the
    driver only handles per-sample slices and KVBatchMeta.
  • Backends supported: simple and mooncake_cpu
  • Wire format (see the sketch after this list):
    • type-driven dispatch — tensors → maybe_pack_jagged (uint8 jagged for variable-length fields)
    • non-tensors → pack_object_array (verl-style np.ndarray(dtype=object) pickled into uint8 jagged). Object fields are recorded in meta.extra_info[META_OBJECT_FIELDS] for read-side decode. Backends only ever see tensors.
  • Core principle: 1-hop offload. Data movement stays local to the producer (rollout actor → TQ) and the consumer (worker fetch → train). The driver never holds bulk between rollout finish and train fan-out — only metadata and small per-sample fields cross Ray.
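
A minimal sketch of that dispatch, assuming illustrative signatures for maybe_pack_jagged and pack_object_array (the dispatch rule and the META_OBJECT_FIELDS bookkeeping are from this PR; everything else is placeholder):

```python
import numpy as np
import torch

def encode_column(name, col, extra_info: dict):
    if isinstance(col, torch.Tensor):
        # Tensors: uint8 jagged packing for variable-length fields.
        return maybe_pack_jagged(col)
    # Non-tensors: verl-style object ndarray, pickled into uint8 jagged;
    # record the field name so the read side knows to decode it.
    arr = np.asarray(col, dtype=object)
    extra_info.setdefault(META_OBJECT_FIELDS, []).append(name)
    return pack_object_array(arr)
```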

Scope

  • nemo_rl/data_plane/: codec, adapters (NoOp / TQ over simple + mooncake_cpu), preshard helpers, observability hooks, driver I/O
    (read_columns / write_columns).
  • nemo_rl/algorithms/grpo_sync.py: TQ-only sibling of grpo.grpo_train. Same algorithm/metrics; per-step lifecycle is prepare_step →
    rollout 1-hop put → meta-driven logprob/train → kv_clear.
  • nemo_rl/experience/sync_rollout_actor.py: rollout + flatten + first put encapsulated in one Ray actor. Bulk never visits driver.
  • nemo_rl/models/policy/tq_policy.py: TQ-mediated Policy subclass; meta-driven workers via worker_mixin._fetch.

Test

https://wandb.ai/nvidia/nemorl-dataplane-zhiyul?nw=nwuserzhiyul

Usage

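A hedged usage sketch only (module paths come from the Scope section above; constructor and entry-point arguments are placeholders, not the final API):

```python
from nemo_rl.algorithms.grpo_sync import grpo_train_sync  # TQ-only sibling of grpo.grpo_train
from nemo_rl.models.policy.tq_policy import TQPolicy      # TQ-mediated Policy subclass

# dp_cfg selects the data-plane backend: "simple" or "mooncake_cpu".
policy = TQPolicy(..., dp_cfg={"backend": "simple"})  # other kwargs elided
grpo_train_sync(policy=policy, ...)  # same algorithm/metrics as grpo.grpo_train
```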

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia requested review from a team as code owners May 7, 2026 17:22
copy-pr-bot (Bot) commented May 7, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia marked this pull request as draft May 7, 2026 17:22
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from bff0471 to d20a6ed Compare May 9, 2026 01:15
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test d20a6ed

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from d20a6ed to e7f6a91 Compare May 9, 2026 02:02
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia marked this pull request as ready for review May 9, 2026 02:02
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia requested a review from a team as a code owner May 9, 2026 02:02
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test e7f6a91

@ZhiyuLi-Nvidia (Contributor Author):

/ok to test f8add06

@ZhiyuLi-Nvidia (Contributor Author):

/ok to test c7cb642

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from c7cb642 to fa121a5 Compare May 9, 2026 02:27
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test fa121a5

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia added the CI:L1 Run doctests, unit tests, and functional tests label May 9, 2026
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from fa121a5 to 8de60a8 Compare May 9, 2026 02:41
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test 8de60a8

@ZhiyuLi-Nvidia (Contributor Author):

/ok to test aeb273c

@ZhiyuLi-Nvidia (Contributor Author):

/ok to test 1596562

@ZhiyuLi-Nvidia (Contributor Author):

/ok to test abada7e

ZhiyuLi-Nvidia and others added 2 commits May 14, 2026 00:51
nemo_rl was wrapping ``np.ndarray(dtype=object)`` columns as
``NonTensorStack(*v.tolist())`` before storing them as leaves in the
``TensorDict`` passed to ``dp_client.kv_batch_put``. Under
``tensordict==0.12.2``, ``bulk[k]`` on such a leaf returns an internal
``LinkedList`` — the ``NonTensorStack`` class identity is lost, and
calling ``.contiguous()`` on the parent ``TensorDict`` collapses the
leaf to an empty ``TensorDict``, dropping the wrapped Python objects
entirely.

Symptom: simple-backend GRPO recipes crash at the first
``kv_batch_get`` for ``content`` with
``RuntimeError: All tensordicts must be non-tensors`` inside
``_pack_field_values``, because every batch position is an empty
``TensorDict`` instead of the expected per-sample string.

Fix:
- ``nemo_rl/experience/sync_rollout_actor.py``: pass object arrays
  through as ``np.ndarray`` (canonical site, full rationale).
- ``nemo_rl/data_plane/column_io.py``: same on the ``write_columns``
  path; refers to ``kv_first_write``.
- ``nemo_rl/data_plane/adapters/transfer_queue.py``: drop the
  ``.contiguous()`` call — TQ's encoder forces ``.contiguous()`` per
  tensor leaf itself, and on a parent TD with non-tensor leaves the
  call is destructive.

``TensorDict`` preserves ``ndarray(dtype=object)`` identity through
``__getitem__``, and TQ's encoder serializes object arrays via
``CUSTOM_TYPE_PICKLE``. No TQ patch required. As a bonus, the new
path skips the ``.tolist()`` materialization that the old wrapper
performed per write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
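
A minimal sketch of the before/after, relying on the behavior this commit message describes for tensordict==0.12.2 (data and variable names are illustrative):

```python
import numpy as np
from tensordict import TensorDict

content = np.array(["resp-a", "resp-b"], dtype=object)

# Old path (broken): NonTensorStack(*content.tolist()) as the leaf;
# under tensordict==0.12.2, .contiguous() on the parent TensorDict
# collapses that leaf to an empty TensorDict, dropping the strings.

# New path: store the object ndarray directly. Per the commit,
# TensorDict preserves ndarray(dtype=object) identity through
# __getitem__, and TQ's encoder pickles it via CUSTOM_TYPE_PICKLE.
bulk = TensorDict({"content": content}, batch_size=[2])
leaf = bulk["content"]  # still a per-sample object array, per the commit
```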
…cate writes

Under TP-only configs (e.g. TP=2 CP=1), every rank in the TP group
was calling ``TQWorkerMixin._write_back`` and racing to write the
same per-sample keys (``prev_logprobs`` etc.). On the simple
backend the second writer's bytes silently overwrote the first
(``last-write-wins`` on a Python dict — benign because the data is
identical post-all-reduce). On the mooncake_cpu backend the
``MooncakeStore`` master rejected the second writer's
``BatchPutEnd`` with ``ILLEGAL_CLIENT`` (-601) because the metadata
``client_id`` was set to the first writer's UUID — the recipe
crashed at the first ``kv_batch_put`` of the offending step.

The existing leader check ``_is_replica_leader`` correctly returns
False for non-leaders, but only when ``_get_replica_group`` returns
a non-None group. Subclasses gate ``_get_replica_group`` on
``CP > 1`` as a fetch-path optimization (the docstring explicitly
calls out "matches the qwen3-mcore TP=2 baseline"). That gate
incorrectly disables the leader check on the write-back path too:
``CP=1 ⇒ replica_group is None ⇒ _is_replica_leader → True for
every rank``.

Split the write-back leader check from the replica-group machinery:

- ``TQWorkerMixin._is_writeback_leader``: default delegates to
  ``_is_replica_leader`` (preserves behavior for workers with no
  parallelism).
- ``MegatronPolicyWorkerImpl._is_writeback_leader``: override gates on
  ``(tp_rank, cp_rank, pp_rank) == (0, 0, 0)`` via mcore
  ``parallel_state`` — unconditional, no CP gate.
- ``DTensorPolicyWorkerV2Impl._is_writeback_leader``: same idea but
  using ``device_mesh["cp"].get_local_rank()`` /
  ``device_mesh["tp"].get_local_rank()``.
- ``_write_back`` switched from ``_is_replica_leader`` to
  ``_is_writeback_leader``.

Correctness: simple backend's ``last-write-wins`` already proves the
data is identical across TP siblings (DSv3 32n8g TP=32 simple passes
its closing ``check_metrics.py`` with the same multi-write pattern;
gating to leader-only is semantically equivalent). Mooncake's race is
eliminated because exactly one client now writes each key.

Perf: ``tp_size - 1`` redundant ``kv_batch_put`` calls per training
step are now skipped on every backend, not just mooncake_cpu.

Verified by JOBID 11758259 (1n8g megatron TP=2 + temp/top-p/top-k
sampling on mooncake_cpu) — past Step 11/500 with no -601, whereas
every prior attempt of this recipe crashed at Step 1 within ~5 min.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
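
A sketch of the split, using the class and method names from this commit message; the rank queries are Megatron-Core's public parallel_state API, and everything else is a simplified stand-in:

```python
from megatron.core import parallel_state

class TQWorkerMixin:
    def _is_writeback_leader(self) -> bool:
        # Default: delegate to the existing replica-leader check, so
        # workers with no parallelism keep their current behavior.
        return self._is_replica_leader()

class MegatronPolicyWorkerImpl(TQWorkerMixin):
    def _is_writeback_leader(self) -> bool:
        # Unconditional (tp, cp, pp) == (0, 0, 0) gate, with no CP>1
        # condition: exactly one rank writes each per-sample key.
        return (
            parallel_state.get_tensor_model_parallel_rank() == 0
            and parallel_state.get_context_parallel_rank() == 0
            and parallel_state.get_pipeline_model_parallel_rank() == 0
        )
```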
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test 3ee70c4

Pre-commit auto-fix changes flagged by CI:

- nemo_rl/data_plane/column_io.py: remove unused NonTensorStack import (only referenced in docstrings/comments now).
- nemo_rl/experience/sync_rollout_actor.py: drop blank line between docstring close and first statement.

Plus two D205 (blank-line-between-summary-and-description) fixes that ruff check flagged after the auto-fixes:

- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py:216
- nemo_rl/models/policy/workers/megatron_policy_worker.py:117

Both _is_writeback_leader docstrings had a summary line ending with 'See' and the description continuing on the next line without a blank separator.

Verified: ruff check + ruff format --check both pass on nemo_rl/, tests/, examples/, docker/, docs/.
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test 209d266

@yuki-97 (Contributor) left a comment:

@ZhiyuLi-Nvidia thanks for the great work! I went through grpo_sync.py and left some comments.

"timestamp": time.time(),
}

# When the data plane is enabled, replace the in-memory dict
yuki-97 (Contributor):

Just curious: are the changes in this file (nemo_rl/algorithms/async_utils/trajectory_collector.py) an example of how to use TQ?
I think this file will only be used in async GRPO, so this PR won't actually use it?

ZhiyuLi-Nvidia (Contributor Author):

Correct — this part is async-GRPO plumbing for the upcoming grpo_async integration and not yet part of this PR. I'll drop it to keep this PR's scope sync-only, and we can re-introduce it later together with the async refactoring.

ZhiyuLi-Nvidia (Contributor Author):

Reverted the change in nemo_rl/algorithms/async_utils/trajectory_collector.py.

from nemo_rl.models.generation.interfaces import GenerationInterface


def kv_first_write(
yuki-97 (Contributor):

can we move it to be a method of DataPlaneClient?

ZhiyuLi-Nvidia (Contributor Author):

Thank you @yuki-97.

kv_first_write is driver-aware in the original design, so DataPlaneClient isn't the right place for it. To address your comment, I refactored this part for better rollout-shape decoupling, so that what's left is small enough that the question dissolves.

kv_first_write now lives in column_io.py, sync_rollout_actor.py is −95 LOC, and the client stays transport-only.

Comment thread nemo_rl/algorithms/grpo_sync.py Outdated
step_finished=True,
)

batch_cache = None
yuki-97 (Contributor):

looks like it's unused?

ZhiyuLi-Nvidia (Contributor Author):

Good catch. Removed this unused variable.

Comment thread nemo_rl/models/policy/tq_policy.py Outdated
Comment on lines +125 to +126
self._dp_client = build_data_plane_client(dp_cfg, bootstrap=True)
self._tq_partition_id = tq_partition_id
yuki-97 (Contributor):

looks like the two are used in many places outside the class in grpo_sync.py; wdyt about making them public?

ZhiyuLi-Nvidia (Contributor Author):

Good comments.

Fixed.

# no need to refetch the bulk schema. Logprob /
# mask / adv columns added later are irrelevant
# here.
_calib_fields = [
yuki-97 (Contributor):

shall we just list the fields we need here instead of excluding with not in?

ZhiyuLi-Nvidia (Contributor Author):

Thank you for the comments.
A pure positive list isn't feasible: multimodal field names are model-specific (pixel_values, image_grid_thw, audio extras, …) and there's no static enumeration of them.

Here's the fix to address your concern: the exclude set is now a named constant, DP_CALIB_EXCLUDED_FIELDS, in data_plane/schema.py.

partition_id=partition_id,
task_name=None,
keys=list(keys),
fields=field_names,
yuki-97 (Contributor):

seems meta.fields only contains the fields put this time instead of all fields. Is that what we want?

ZhiyuLi-Nvidia (Contributor Author):

Yes, that's intentional. The point is to minimize transfer: each consumer (logprob, training, calibration) pulls only the columns it needs via select_fields. meta.fields lists what's available; the wire only carries what's selected.

)

if self.policy_generation is not None:
self.policy_generation.finish_generation()
yuki-97 (Contributor):

just curious, why do we need policy_generation.finish_generation() here?

ZhiyuLi-Nvidia (Contributor Author):

It is the exact same policy_generation.finish_generation() performing the same GPU cleanup on the same worker. The only change is who triggers it and when:

  • Old way (legacy grpo.py): The driver called it as a separate step after the rollout finished.
  • New way (grpo_sync.py): The rollout actor calls it automatically as part of wrapping up the rollout — bundled into the same Ray round-trip as the rollout itself, so the driver gets back everything it needs (metadata, metrics) in one call instead of three.

gen_metrics = None
return meta, slice_extras, rollout_metrics, gen_metrics

def finish_generation(self) -> None:
yuki-97 (Contributor):

seems grpo_sync.py still just uses policy_generation.finish_generation(), and I think it's good to just use that.
So do we still need this function in the class? It doesn't seem to be used anywhere.

Same for get_logger_metrics and clear_logger_metrics.

ZhiyuLi-Nvidia (Contributor Author):

Yep, good catch; removed all these unused APIs.

ZhiyuLi-Nvidia (Contributor Author):

The three wrapper methods were never called from outside. The actual work (resetting accumulators before rollout, capturing metrics after, releasing GPU resources) still happens correctly through direct policy_generation calls inside rollout_and_first_put, and gen_metrics is returned to the driver in the result tuple. Removed the unused wrappers; behavior unchanged.

Modified to make it more transparent.

The async-grpo data-plane plumbing in AsyncTrajectoryCollector (dp_cfg
constructor param, _ensure_dp_client(), and the kv_batch_put branch in
_run_prompt_group_worker) isn't exercised in the sync data-plane PR.
Re-introduce it together with the async-grpo refactor.

Per yuki-97 PR review (#1).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
DataPlaneClient stays a pure transport. Rollout-shape concerns (key
layout, batch geometry) move to the producer.

* New codec.pack_jagged_fields() owns the single wire-layout transform
  (jagged pack + np.ndarray(dtype=object) passthrough). Previously
  duplicated across kv_first_write, write_columns, and
  worker_mixin._write_back.
* kv_first_write moves to data_plane/column_io.py next to write_columns
  and takes pre-minted keys (no longer (uids, n_gen)). Rollout actor
  builds the keys inline at the call site, matching how verl's
  AgentLoopWorkerTQ._agent_loop_postprocess draws the same line.
* worker_mixin._write_back collapses to write_columns(...).
* Unit tests updated for the new keys= signature.

Net: sync_rollout_actor.py -95 LOC; client signature unchanged.

Per yuki-97 PR review (#2).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
* Remove test_materialize_rejects_non_tensor_leaves: codec.materialize
  now deliberately accepts NonTensorData (commit c3ac42342, simple-
  backend wire-strip survival). The "wire is tensors only" assertion
  predated that fix and contradicts current behavior.
* Remove test_no_data_plane_in_master_config: TODO gate that fires
  until legacy grpo.py is retired; tracked separately.
* Rewrite test_codec_wire_stripped.py to cover the production decode
  paths via:
    - direct unit coverage of unwrap_wire_stripped_payload (per-item
      helper; doesn't need a NonTensorStack construction);
    - end-to-end materialize coverage via patch.object(stack, "tolist")
      to simulate the wire-stripped state (tensordict>=0.12.2 rejects
      NonTensorStack(TensorDict({}, batch_size=[])) at construction).
  Wire round-trip e2e for object columns is already covered by
  tests/data_plane/functional/test_tq_lifecycle.py.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…ema constant

Replaces the inline 6-field exclusion tuple in grpo_sync.py with
DP_CALIB_EXCLUDED_FIELDS in data_plane/schema.py, derived from
DP_TRAIN_FIELDS - {input_ids, input_lengths}. A new column added to
DP_TRAIN_FIELDS is now excluded-by-default from KV-scale calibration;
opting it in requires editing the private base set explicitly.

Multimodal extras (pixel_values, image_grid_thw, etc.) pass through
unchanged because they are not in DP_TRAIN_FIELDS.

Per yuki-97 PR review (#5).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
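
A sketch of that derivation (the literal contents of the private base set here are illustrative, not the real DP_TRAIN_FIELDS):

```python
# Illustrative base set; the real one lives in data_plane/schema.py.
_DP_TRAIN_FIELDS: frozenset[str] = frozenset(
    {"input_ids", "input_lengths", "prev_logprobs", "advantages"}
)

# New columns added to the base set are excluded-by-default from
# KV-scale calibration; opting in means editing the base set.
DP_CALIB_EXCLUDED_FIELDS: frozenset[str] = _DP_TRAIN_FIELDS - {
    "input_ids",
    "input_lengths",
}
```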
Silent over-fetch was possible when callers omitted select_fields: the
noop adapter fetched every registered field via set intersection; the
TQ adapter forwarded None to the backend. Bulk schemas are wide and
fetching everything is the most expensive shape the wire can take.

select_fields is now a required list[str] on DataPlaneClient.kv_batch_get
and all concrete implementations. Callers must name what they read;
fetch-all is still possible by passing list(meta.fields) explicitly.

Also: worker_mixin internal call sites use list(meta.fields) directly
(fail-loud TypeError if meta.fields is None, rather than silently
producing an empty TensorDict).

Per yuki-97 PR review (#6).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
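
A sketch of the new call shape (only select_fields and its required-ness come from this commit; the other kwargs echo the put-side snippet quoted earlier and may not match the final signature):

```python
needed = ["input_ids", "input_lengths"]  # illustrative column names
td = dp_client.kv_batch_get(
    partition_id=partition_id,
    keys=list(keys),
    select_fields=needed,  # now required: callers must name what they read
)

# Fetch-all is still possible, but only explicitly:
td_all = dp_client.kv_batch_get(
    partition_id=partition_id,
    keys=list(keys),
    select_fields=list(meta.fields),  # fail-loud TypeError if fields is None
)
```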
…ifecycle

Remove three actor-level wrappers (finish_generation, get_logger_metrics,
clear_logger_metrics) that had zero external callers. The actor's internal
code already calls self.policy_generation.{...} directly at the right
points inside rollout_to_tq; the wrappers added indirection without value.

Rewrite the rollout_to_tq docstring to list all six steps bundled into
the single Ray RPC (reset metrics -> rollout -> flatten -> TQ put ->
release GPU -> capture metrics), making the lifecycle visible without
having to read the method body.

Per yuki-97 PR review (#7, #8).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
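
A condensed sketch of that six-step bundle; method bodies and helper names (flatten, the kv_batch_put arguments) are placeholders around the lifecycle the commit describes:

```python
class SyncRolloutActor:
    def rollout_to_tq(self, batch):
        self.policy_generation.clear_logger_metrics()       # 1. reset metrics
        rollout = self.policy_generation.generate(batch)    # 2. rollout
        columns = flatten(rollout)                          # 3. flatten
        self.dp_client.kv_batch_put(...)                    # 4. TQ put (1-hop; bulk never visits driver)
        self.policy_generation.finish_generation()          # 5. release GPU
        gen_metrics = self.policy_generation.get_logger_metrics()  # 6. capture metrics
        # Single Ray RPC returns everything the driver needs.
        return meta, slice_extras, rollout_metrics, gen_metrics
```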
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test eecbcc4

… discovery

tests/unit/L0_Unit_Tests_*.sh hard-code TEST_PATHS=('unit/'), so any test outside tests/unit/ is silently skipped. Our data_plane suite lived at tests/data_plane/unit/ and was never collected by CI.

Move all 19 unit tests + conftest + __init__ into tests/unit/data_plane/ via git mv (one was untracked, so plain mv). The tests/data_plane/functional/ tree stays where it is — those are Tier 2 (Ray + TQ), need a separate runner.

Plus three drive-by fixes flagged by CI lint:

- nemo_rl/data_plane/docs/data_plane_api_lifecycle.md → data-plane-api-lifecycle.md (new pre-commit hook disallows underscores in .md filenames).
- nemo_rl/data_plane/column_io.py:158: change variable annotation Mapping -> dict so pyrefly accepts the dict-comp result at the pack_jagged_fields call site (the function signature is dict[str, ...]). Mapping import drops as unused; ruff auto-fixes.
- nemo_rl/data_plane/worker_mixin.py:224,249: pyrefly no-matching-overload on list(meta.fields) where meta.fields: list[str] | None. Use # type: ignore[no-matching-overload] rather than list(meta.fields or []) — the runtime contract guarantees meta.fields is non-None at these call sites; silently substituting [] would mean fetch-nothing which is wrong.
- nemo_rl/experience/sync_rollout_actor.py: add 'import torch' (F821 — module used torch.zeros_like and isinstance(v, torch.Tensor) without importing torch).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test 25dcf41

… tests

After the move under tests/unit/data_plane/ (commit 25dcf41), the CI ruff hooks would auto-fix two minor things:

- test_codec_wire_stripped.py: drop extra blank line after import block
- test_correctness.py: merge two 'from column_io import ...' lines (import-sort)

Apply locally so the pre-commit auto-fix step on CI has nothing left to modify (avoiding the 'files were modified by this hook' failure on the next run).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test fff691d

The link in nemo-gym-integration.md pointed at
docs.nvidia.com/nemo/gym/latest/about/concepts/core-components.html
which returns 404. The page is actually served at
docs.nvidia.com/nemo/gym/about/core-components (no /latest/, no
/concepts/) per the gym docs sitemap.

Update the source link instead of suppressing it via the linkcheck false-positives workaround.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia (Contributor Author):

/ok to test c069f92

"""

@abstractmethod
def check_consumption_status(
Collaborator:

I can't seem to find any uses of this function outside of tests. Is this needed?

"""

@abstractmethod
def claim_meta(
Collaborator:

Same with this one. I can't seem to find it used anywhere outside of tests. Intentional?

"""

@abstractmethod
def get_data(
Collaborator:

I can't seem to find any uses of this function outside of tests. Is this needed?

policy_generation = policy # type: ignore
NEED_REFIT = False
POLICY_GENERATION_STALE = True
assert policy_generation is not None # for mypy type check
Collaborator:

we use pyrefly, so the "# for mypy type check" comment is stale

adv_estimator = _create_advantage_estimator(master_config)

# ── Data-plane setup (mandatory in the sync trainer) ───────────────
# Sync trainer requires a TQ-mediated policy. The TQPolicy ctor
Collaborator:

Suggested change:
- # Sync trainer requires a TQ-mediated policy. The TQPolicy ctor
+ # Sync trainer requires a TQ-mediated policy. The TQPolicy actor

}
if not hasattr(policy, "dp_cfg"):
raise ValueError(
"grpo_train_sync requires a TQ-mediated policy "
Collaborator:

we should probably maintain a set of issues to clean this stuff up. I can imagine things like this will be forgotten unless we track them somehow. Maybe a GitHub issue for each small thing is too tedious, so maybe just one "cleanup" tracking issue for things like this.

# ──────────────────────────────────────────────────────────────────────────


def _get_local_node_ip() -> str:
Collaborator:

This only rejects link-local (169.254/16) but not loopback (127.0.0.1). On hosts where gethostname() resolves to 127.0.0.1 via /etc/hosts, announcing that to mooncake peers as MC_TCP_BIND_ADDRESS would cause cross-node connection refused. The PR's own test notes "Loopback is NOT skipped by the current implementation."

Consider also rejecting is_loopback.
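
A sketch of the suggested hardening; the outbound-route fallback is a common stdlib pattern, not this PR's code:

```python
import ipaddress
import socket

def _get_local_node_ip() -> str:
    ip = socket.gethostbyname(socket.gethostname())
    addr = ipaddress.ip_address(ip)
    if addr.is_link_local or addr.is_loopback:
        # Ask the kernel which source address routes outward; a UDP
        # connect() sends no packets.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
    return ip
```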

Comment on lines +124 to +141
"""Inject ``{"pip": ["TransferQueue==0.1.6"]}`` into TQ's actor ``.options()``.

TQ spawns ``SimpleStorageUnit`` and ``TransferQueueController`` via
``Cls.options(...).remote(...)`` without a runtime_env, so they
inherit the job-level env. In a multi-node container deployment
where each node has its own ``/opt/nemo_rl_venv``, the driver's
``uv sync`` only updates ray-head's venv and a worker-node actor
fails with ``ModuleNotFoundError``. This monkey-patch makes Ray
pip-install TQ into a per-actor runtime_env on first spawn (cached
per-node by Ray afterwards). Idempotent. Couples us to TQ's internal
class layout — if TQ restructures, this becomes a no-op with a
logged warning and we fall back to per-node ``uv sync``.
"""
global _TQ_RUNTIME_ENV_PATCHED
if _TQ_RUNTIME_ENV_PATCHED:
return

runtime_env = {"pip": ["TransferQueue==0.1.6"]}
Collaborator:

Not quite sure why this is pinned when the pyproject.toml uses a different version. Seems brittle.

tq = _tq()
base = OmegaConf.load(str(resources.files("transfer_queue") / "config.yaml"))

backend = cfg.get("backend", "simple")
Collaborator:

transfer_queue.py:206-208

Per project config conventions, YAML is the single source of truth for defaults. These lines introduce hidden defaults for backend ("simple"), storage_capacity (1_000_000), and num_storage_units (2). Same pattern at lines 262, 266, and 410 (global_segment_size, local_buffer_size, get_meta_poll_interval_s).

There is no data_plane: block in any exemplar YAML under examples/configs/, so the only source of these defaults is Python code — exactly what the convention exists to prevent.

Consider adding a data_plane: block to the relevant exemplar YAMLs with these defaults expressed there, and replacing the in-code defaults with .get(key) / .get(key, None).
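
A sketch of the convention being asked for: the exemplar YAML owns the default, and the code reads without an inline fallback so a missing key fails loudly (key name from the comment above):

```python
backend = cfg.get("backend")  # default belongs in the exemplar YAML, not here
if backend is None:
    raise ValueError("data_plane.backend must be set in the YAML config")
```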

# ── (A) task-mediated ───────────────────────────────────────────────

@abstractmethod
def register_partition(
Collaborator:

Is there any case where this doesn't mean "register step"? I feel the naming may be more approachable if we can map it to a training loop's semantics.


Labels

CI:L1 (Run doctests, unit tests, and functional tests), Documentation (Improvements or additions to documentation)


4 participants