fix: use compute instead of concurrency for ray data #1786
nightcityblade wants to merge 1 commit into NVIDIA-NeMo:main from
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Greptile Summary
This PR intends to fix actor-backed
Confidence Score: 2/5
Not safe to merge: the core change introduces the deprecated Ray parameter for
There is a concrete P1 defect: the PR swaps
Important Files Changed
Sequence Diagram
sequenceDiagram
participant PD as process_dataset()
participant Ray as ray.data.Dataset.map_batches()
PD->>PD: is_actor_stage? → True
PD->>PD: calculate_concurrency_for_actors_for_stage() → (min, max)
Note over PD: PR sets compute=(min,max) ❌<br/>Should set concurrency=(min,max) ✅
PD->>Ray: map_batches(fn, batch_size=N, compute=(min,max))
Note over Ray: compute is DEPRECATED for map_batches<br/>(Ray 2.54.1 docs)
Reviews (1): Last reviewed commit: "fix: use compute instead of concurrency ..."
    map_batches_kwargs = {
        "compute": calculate_concurrency_for_actors_for_stage(
            self.stage, ignore_head_node=ignore_head_node
        ),
    }
Wrong direction: compute is deprecated for map_batches, not concurrency
According to the Ray 2.54.1 docs, the signature for map_batches explicitly states: "compute – This argument is deprecated. Use concurrency argument." The map method is the one where concurrency was deprecated in favour of compute — but this code calls map_batches (line 111), so the PR has the direction backwards. The original use of concurrency= was correct for map_batches, and this change reintroduces the deprecated parameter.
Additionally, the old compute parameter expected a ray.data.ActorPoolStrategy object, not a plain int or (min, max) tuple. Passing compute=(1, 4) may silently misbehave or raise at runtime when Ray processes the argument.
Suggested change:
    - map_batches_kwargs = {
    -     "compute": calculate_concurrency_for_actors_for_stage(
    -         self.stage, ignore_head_node=ignore_head_node
    -     ),
    - }
    + map_batches_kwargs = {
    +     "concurrency": calculate_concurrency_for_actors_for_stage(
    +         self.stage, ignore_head_node=ignore_head_node
    +     ),
    + }
    _, kwargs = dataset.map_batches.call_args
    assert kwargs["compute"] == (1, 4)
    assert "concurrency" not in kwargs
Test asserts the wrong keyword argument
This assertion validates that compute is forwarded, but since map_batches should receive concurrency (not compute), the test is guarding the wrong behavior. If the adapter is corrected to use concurrency=, this assertion should match:
Suggested change:
    - _, kwargs = dataset.map_batches.call_args
    - assert kwargs["compute"] == (1, 4)
    - assert "concurrency" not in kwargs
    + _, kwargs = dataset.map_batches.call_args
    + assert kwargs["concurrency"] == (1, 4)
    + assert "compute" not in kwargs
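For context, a self-contained sketch of how such a kwargs assertion can be exercised with unittest.mock, without importing Ray. The run_actor_stage function and its batch size are hypothetical stand-ins for the adapter; only MagicMock and call_args come from the standard library.

```python
from unittest.mock import MagicMock


def run_actor_stage(dataset, fn, pool_bounds):
    # Hypothetical adapter call: forwards the pool bounds as concurrency=.
    return dataset.map_batches(fn, batch_size=8, concurrency=pool_bounds)


def test_forwards_concurrency_only():
    dataset = MagicMock()
    run_actor_stage(dataset, lambda batch: batch, (1, 4))

    # call_args unpacks to (positional_args, keyword_args) of the last call.
    _, kwargs = dataset.map_batches.call_args
    assert kwargs["concurrency"] == (1, 4)
    assert "compute" not in kwargs


test_forwards_concurrency_only()
```

Asserting the absence of the other keyword matters as much as the presence of the expected one, since passing both would otherwise go unnoticed.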
Description
closes #1520
Switch the Ray Data adapter from the deprecated concurrency argument to compute for actor-backed map_batches() calls, and add a focused adapter test to guard the forwarded kwargs.
Usage
Checklist