
Commit ef308bc

[Multi-GPU Polars] Introduce StreamingEngine base class and SPMDEngine (rapidsai#21867)
Introduces `StreamingEngine`, a new base class that all multi-GPU execution frontends (SPMD, Ray, Dask, etc.) will return. This PR provides the initial implementation via `SPMDEngine`; Ray and Dask frontends will follow in separate PRs.

* Add `StreamingEngine` base class and `SPMDEngine` implementation.
* Update tests to use a shared fixture for engine creation instead of constructing engines inline.
* Move tests specific to the legacy tasks/distributed runtime to `tests/experimental/legacy/`.
* Add `pytest.mark.spmd` marker and an autouse fixture that skips non-SPMD tests when running under `rrun -n N` with `N > 1`.
* Separate regular cudf-polars tests from experimental tests; we now have:
  - `run_cudf_polars_pytests.sh` (CI-blocking).
  - `run_cudf_polars_experimental_pytests.sh` (requires rapidsmpf).

This is the first step in migrating away from the legacy tasks and Dask runtimes toward a unified streaming engine abstraction.

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Tom Augspurger (https://github.com/TomAugspurger)
  - Niranda Perera (https://github.com/nirandaperera)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#21867
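The commit message mentions an autouse fixture that skips tests without the `spmd` marker when more than one rank is launched. The fixture itself is not visible in this excerpt, so the following is only a minimal sketch of that decision logic. The environment variable name `EXAMPLE_NRANKS` and both helper functions are hypothetical; the real fixture may detect the rank count differently.

```python
from collections.abc import Mapping

# Hypothetical variable name, used only for this sketch; the launcher
# may expose the number of ranks through a different mechanism.
NRANKS_ENV_VAR = "EXAMPLE_NRANKS"


def launched_nranks(environ: Mapping[str, str]) -> int:
    """Return the number of ranks, defaulting to 1 for a plain pytest run."""
    return int(environ.get(NRANKS_ENV_VAR, "1"))


def should_skip(marker_names: set[str], nranks: int) -> bool:
    """Skip a test only if several ranks are running and it lacks the spmd marker."""
    return nranks > 1 and "spmd" not in marker_names


# In a conftest.py, the helpers could be wired into an autouse fixture:
#
# @pytest.fixture(autouse=True)
# def _skip_non_spmd(request):
#     names = {m.name for m in request.node.iter_markers()}
#     if should_skip(names, launched_nranks(os.environ)):
#         pytest.skip("test is not marked spmd and more than one rank is running")
```

Keeping the decision in a pure function makes the skip rule testable without launching multiple ranks.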
1 parent 4791449 commit ef308bc

59 files changed

Lines changed: 2124 additions & 1821 deletions


.github/workflows/pr.yaml

Lines changed: 1 addition & 1 deletion
@@ -490,7 +490,7 @@ jobs:
       # (rapidsmpf compatibility already validated in rapidsmpf CI)
       matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
       build_type: pull-request
-      script: "ci/test_cudf_polars_with_rapidsmpf.sh"
+      script: "ci/test_cudf_polars_experimental.sh"
   cudf-polars-polars-tests:
     needs: [wheel-build-cudf-polars, changed-files]
     secrets: inherit

.github/workflows/test.yaml

Lines changed: 1 addition & 1 deletion
@@ -164,7 +164,7 @@ jobs:
       branch: ${{ inputs.branch }}
       date: ${{ inputs.date }}
       sha: ${{ inputs.sha }}
-      script: "ci/test_cudf_polars_with_rapidsmpf.sh"
+      script: "ci/test_cudf_polars_experimental.sh"
       continue-on-error: true
   cudf-polars-polars-tests:
     secrets: inherit

ci/run_cudf_polars_with_rapidsmpf_pytests.sh renamed to ci/run_cudf_polars_experimental_pytests.sh

Lines changed: 9 additions & 10 deletions
@@ -4,24 +4,23 @@
 
 set -euo pipefail
 
-# Test cudf_polars with rapidsmpf integration
-# This script runs experimental tests with single cluster mode and the rapidsmpf runtime
+# Test cudf_polars experimental.
 
 # It is essential to cd into python/cudf_polars as `pytest-xdist` + `coverage` seem to work only at this directory level.
-
-# Support invoking run_cudf_polars_with_rapidsmpf_pytests.sh outside the script directory
+# Support invoking outside the script directory
 cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/
 
-# Run experimental tests with the "single" cluster mode and the "rapidsmpf" runtime
-rapids-logger "Running experimental tests with the 'rapidsmpf' runtime and a 'single' cluster"
-timeout 10m python -m pytest --cache-clear "$@" "tests/experimental" \
+rapids-logger "Running experimental legacy tests with the 'rapidsmpf' runtime and a 'single' cluster"
+timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" \
     --executor streaming \
     --cluster single \
     --runtime rapidsmpf
 
-# Run experimental tests with the "distributed" cluster mode and the "rapidsmpf" runtime
-rapids-logger "Running experimental tests with the 'rapidsmpf' runtime and a 'distributed' cluster"
-timeout 10m python -m pytest --cache-clear "$@" "tests/experimental" \
+rapids-logger "Running experimental legacy tests with the 'rapidsmpf' runtime and a 'distributed' cluster"
+timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" \
     --executor streaming \
     --cluster distributed \
     --runtime rapidsmpf
+
+rapids-logger "Running experimental tests"
+timeout 10m python -m pytest --cache-clear "$@" tests/experimental --ignore=tests/experimental/legacy

ci/run_cudf_polars_pytests.sh

Lines changed: 7 additions & 16 deletions
@@ -1,25 +1,16 @@
 #!/bin/bash
-# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
 # SPDX-License-Identifier: Apache-2.0
 
 set -euo pipefail
 
 # It is essential to cd into python/cudf_polars as `pytest-xdist` + `coverage` seem to work only at this directory level.
-
 # Support invoking run_cudf_polars_pytests.sh outside the script directory
 cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/
 
-# Test the "in-memory" executor
-python -m pytest --cache-clear "$@" tests --executor in-memory
-
-# Test the default "streaming" executor
-python -m pytest --cache-clear "$@" tests --executor streaming
-
-# Test the "streaming" executor with small blocksize
-python -m pytest --cache-clear "$@" tests --executor streaming --blocksize-mode small
-
-# Run experimental tests with Distributed cluster
-python -m pytest --cache-clear "$@" "tests/experimental" \
-    --executor streaming \
-    --cluster distributed \
-    --cov-fail-under=0  # No code-coverage requirement for these tests.
+# Run all non-experimental tests using both the in-memory and streaming executor.
+IGNORE_EXPERIMENTAL="--ignore=tests/experimental/"
+python -m pytest --cache-clear "$@" tests $IGNORE_EXPERIMENTAL --executor in-memory
+python -m pytest --cache-clear "$@" tests $IGNORE_EXPERIMENTAL --executor streaming
+python -m pytest --cache-clear "$@" tests $IGNORE_EXPERIMENTAL --executor streaming \
+    --blocksize-mode small

Lines changed: 2 additions & 4 deletions
@@ -49,10 +49,8 @@ EXITCODE=0
 trap set_exitcode ERR
 set +e
 
-rapids-logger "Running cudf_polars tests with rapidsmpf"
-
-# Run cudf_polars tests with rapidsmpf using dedicated test runner
-timeout 15m ./ci/run_cudf_polars_with_rapidsmpf_pytests.sh \
+rapids-logger "Running cudf_polars experimental tests (non-ci-blocking)"
+timeout 15m ./ci/run_cudf_polars_experimental_pytests.sh \
     --no-cov \
     --numprocesses=8 \
     --dist=worksteal \

python/cudf_polars/cudf_polars/dsl/expressions/unary.py

Lines changed: 1 addition & 1 deletion
@@ -421,7 +421,7 @@ def do_evaluate(
                     1, dtype=column.dtype.plc_type, stream=df.stream
                 )
             else:
-                assert_never(strategy)  # pragma: no cover
+                assert_never(strategy)
 
         if strategy == "mean":
             return Column(

python/cudf_polars/cudf_polars/dsl/ir.py

Lines changed: 1 addition & 1 deletion
@@ -2317,7 +2317,7 @@ def _joiners(
                 plc.copying.OutOfBoundsPolicy.DONT_CHECK,
                 None,
             )
-        assert_never(how)  # pragma: no cover
+        assert_never(how)
 
     @staticmethod
     def _reorder_maps(

python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py

Lines changed: 6 additions & 7 deletions
@@ -27,9 +27,9 @@
 from typing import TYPE_CHECKING, Any, Literal, assert_never
 
 import nvtx
+from rapidsmpf.config import Options, get_environment_variables
 
 import polars as pl
-import polars.testing
 
 import rmm.statistics
 
@@ -768,7 +768,6 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace): #
 
     if run_config.shuffle != "tasks":
         try:
-            from rapidsmpf.config import Options
             from rapidsmpf.integrations.dask import bootstrap_dask_cluster
 
             bootstrap_dask_cluster(
@@ -1863,12 +1862,13 @@ def run_polars_spmd(
         rmm.mr.CudaAsyncMemoryResource(release_threshold=args.rmm_release_threshold)
     )
     with spmd_execution(
+        rapidsmpf_options=Options(get_environment_variables()),
         executor_options=executor_options,
         engine_options={
             "parquet_options": parquet_options,
             "cuda_stream_policy": run_config.stream_policy,
         },
-    ) as (comm, ctx, engine):
+    ) as engine:
         from cudf_polars.experimental.rapidsmpf.collectives.common import reserve_op_id
         from cudf_polars.experimental.rapidsmpf.frontend.spmd import (
             allgather_polars_dataframe,
@@ -1877,14 +1877,13 @@ def run_polars_spmd(
         def _allgather_result(df: pl.DataFrame) -> pl.DataFrame:
             with reserve_op_id() as op_id:
                 return allgather_polars_dataframe(
-                    comm=comm,
-                    ctx=ctx,
+                    engine=engine,
                     local_df=df,
                     op_id=op_id,
                 )
 
-        rank = comm.rank
-        run_config = dataclasses.replace(run_config, n_workers=comm.nranks)
+        rank = engine.rank
+        run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
         records, plans, validation_failures, query_failures = _run_query_loop(
             benchmark,
             args,

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+"""Multi-GPU frontend core."""
+
+from __future__ import annotations
+
+import contextlib
+from typing import Any, Self
+
+import polars as pl
+
+
+class StreamingEngine(pl.GPUEngine):
+    """
+    Base class for multi-GPU Polars engines.
+
+    The engine manages the lifecycle of a streaming execution and can
+    be used as a context manager. On exit, :meth:`shutdown` is called.
+
+    Notes
+    -----
+    The engine must be created and shut down on the same thread. In particular,
+    destruction and context manager exit must occur on the thread that created
+    the instance.
+
+    Parameters
+    ----------
+    nranks
+        Number of ranks (workers or GPUs) in the cluster.
+    executor_options
+        Key/value options forwarded to the streaming executor.
+    engine_options
+        Additional keyword arguments forwarded to
+        :class:`~polars.lazyframe.engine_config.GPUEngine`.
+    exit_stack
+        A :class:`contextlib.ExitStack` whose registered contexts are closed
+        when :meth:`shutdown` is called. If ``None``, an empty stack is created.
+    """
+
+    def __init__(
+        self,
+        *,
+        nranks: int,
+        executor_options: dict[str, Any],
+        engine_options: dict[str, Any],
+        exit_stack: contextlib.ExitStack | None = None,
+    ):
+        self._nranks = nranks
+        self._exit_stack: contextlib.ExitStack | None = (
+            exit_stack or contextlib.ExitStack()
+        )
+        super().__init__(
+            executor="streaming",
+            executor_options=executor_options,
+            **engine_options,
+        )
+
+    @property
+    def nranks(self) -> int:
+        """
+        Number of ranks (for example GPUs or workers) in the cluster.
+
+        Local execution without a cluster returns 1.
+
+        Returns
+        -------
+        Number of ranks.
+        """
+        return self._nranks
+
+    def shutdown(self) -> None:
+        """
+        Shut down engine and release all owned resources.
+
+        Idempotent: safe to call more than once. Must be called on the same
+        thread that created the engine.
+        """
+        if self._exit_stack is None:
+            return  # already shut down
+        try:
+            self._exit_stack.close()
+        finally:
+            self._exit_stack = None
+            self.device = None
+            self.memory_resource = None
+            self.config = {}
+
+    def __enter__(self) -> Self:
+        """Enter the context manager, returning ``self``."""
+        return self
+
+    def __exit__(self, *_: object) -> None:
+        """Exit the context manager, calling :meth:`shutdown`."""
+        self.shutdown()
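
The lifecycle that `StreamingEngine` documents (an owned `contextlib.ExitStack`, an idempotent `shutdown`, and context-manager exit delegating to `shutdown`) can be illustrated without a GPU. The sketch below uses a hypothetical `ExampleEngine` stand-in that does not subclass `pl.GPUEngine`, so it shows only the resource-handling pattern and not the real class's behavior.

```python
from __future__ import annotations

import contextlib


class ExampleEngine:
    """Hypothetical stand-in for the lifecycle pattern; not the cudf-polars class."""

    def __init__(
        self, *, nranks: int, exit_stack: contextlib.ExitStack | None = None
    ) -> None:
        self._nranks = nranks
        self._exit_stack: contextlib.ExitStack | None = (
            exit_stack if exit_stack is not None else contextlib.ExitStack()
        )

    @property
    def nranks(self) -> int:
        return self._nranks

    def shutdown(self) -> None:
        if self._exit_stack is None:
            return  # already shut down, so repeated calls are no-ops
        try:
            self._exit_stack.close()  # runs registered callbacks in LIFO order
        finally:
            self._exit_stack = None

    def __enter__(self) -> ExampleEngine:
        return self

    def __exit__(self, *_: object) -> None:
        self.shutdown()


# Register two placeholder resources; the names are illustrative only.
events: list[str] = []
stack = contextlib.ExitStack()
stack.callback(events.append, "communicator closed")
stack.callback(events.append, "context closed")

with ExampleEngine(nranks=2, exit_stack=stack) as engine:
    ranks_seen = engine.nranks

engine.shutdown()  # second call does nothing because the stack was cleared
```

Because `ExitStack` unwinds in reverse registration order, resources registered later (here the placeholder "context") are released before those they may depend on.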
