Skip to content

Commit 30e26ab

Browse files
authored
Remove CUDAStreamPolicy enum and simplify CUDA stream policy (rapidsai#22086)
Currently, streams for the rapidsmpf execution path in cudf can be obtained in three different ways: 1) use the CUDA default stream, 2) construct a new stream before pipeline evaluation, or 3) get streams from the rapidsmpf stream pool. We have no real need for the second option, and it is a potential performance footgun because it can conflict with the stream pool. This PR removes that ability and, in the process, simplifies much of the stream policy handling, leaving us with just the stream pool config. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: rapidsai#22086
1 parent d291699 commit 30e26ab

16 files changed

Lines changed: 67 additions & 122 deletions

File tree

docs/cudf/source/cudf_polars/api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ For the most part, the public API of `cudf-polars` is the polars API.
88
:members:
99
Cluster,
1010
ConfigOptions,
11-
CUDAStreamPolicy,
11+
CUDAStreamPoolConfig,
1212
DynamicPlanningOptions,
1313
ExecutorType,
1414
InMemoryExecutor,

python/cudf_polars/cudf_polars/callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def _callback(
301301
),
302302
):
303303
if config_options.executor.name == "in-memory":
304-
context = IRExecutionContext.from_config_options(config_options)
304+
context = IRExecutionContext()
305305
df = ir.evaluate(cache={}, timer=timer, context=context).to_polars()
306306
if timer is None:
307307
return df

python/cudf_polars/cudf_polars/dsl/ir.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,9 @@
4343
range_window_bounds,
4444
)
4545
from cudf_polars.utils import dtypes
46-
from cudf_polars.utils.config import CUDAStreamPolicy
4746
from cudf_polars.utils.cuda_stream import (
4847
get_cuda_stream,
4948
get_joined_cuda_stream,
50-
get_new_cuda_stream,
5149
join_cuda_streams,
5250
)
5351
from cudf_polars.utils.versions import (
@@ -68,7 +66,7 @@
6866

6967
from cudf_polars.containers.dataframe import NamedColumn
7068
from cudf_polars.typing import CSECache, ClosedInterval, Schema, Slice as Zlice
71-
from cudf_polars.utils.config import ConfigOptions, ParquetOptions
69+
from cudf_polars.utils.config import ParquetOptions
7270
from cudf_polars.utils.timer import Timer
7371

7472
__all__ = [
@@ -114,25 +112,9 @@ class IRExecutionContext:
114112
A zero-argument callable that returns a CUDA stream.
115113
"""
116114

117-
get_cuda_stream: Callable[[], Stream]
115+
get_cuda_stream: Callable[[], Stream] = field(default=get_cuda_stream)
118116
query_id: uuid.UUID = field(default_factory=uuid.uuid4)
119117

120-
@classmethod
121-
def from_config_options(
122-
cls, config_options: ConfigOptions, query_id: uuid.UUID | None = None
123-
) -> IRExecutionContext:
124-
"""Create an IRExecutionContext from ConfigOptions."""
125-
query_id = query_id or uuid.uuid4()
126-
match config_options.cuda_stream_policy:
127-
case CUDAStreamPolicy.DEFAULT:
128-
return cls(get_cuda_stream=get_cuda_stream, query_id=query_id)
129-
case CUDAStreamPolicy.NEW:
130-
return cls(get_cuda_stream=get_new_cuda_stream, query_id=query_id)
131-
case _: # pragma: no cover
132-
raise ValueError(
133-
f"Invalid CUDA stream policy: {config_options.cuda_stream_policy}"
134-
)
135-
136118
@contextlib.contextmanager
137119
def stream_ordered_after(self, *dfs: DataFrame) -> Generator[Stream, None, None]:
138120
"""

python/cudf_polars/cudf_polars/experimental/benchmarks/utils_legacy.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -812,9 +812,7 @@ def execute_query(
812812
if args.debug:
813813
translator = Translator(q._ldf.visit(), engine)
814814
ir = translator.translate_ir()
815-
context = IRExecutionContext.from_config_options(
816-
translator.config_options
817-
)
815+
context = IRExecutionContext()
818816
if run_config.executor == "in-memory":
819817
t0 = time.monotonic()
820818
result = ir.evaluate(

python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -713,9 +713,7 @@ def execute_query(
713713
if args.debug:
714714
translator = Translator(q._ldf.visit(), engine)
715715
ir = translator.translate_ir()
716-
context = IRExecutionContext.from_config_options(
717-
translator.config_options
718-
)
716+
context = IRExecutionContext()
719717
if run_config.executor == "in-memory":
720718
t0 = time.monotonic()
721719
result = ir.evaluate(

python/cudf_polars/cudf_polars/experimental/parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def task_graph(
141141
--------
142142
generate_ir_tasks
143143
"""
144-
context = IRExecutionContext.from_config_options(config_options)
144+
context = IRExecutionContext()
145145
graph = reduce(
146146
operator.or_,
147147
(

python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,7 @@ def evaluate_pipeline(
312312
get_cuda_stream=rmpf_context.get_stream_from_pool, query_id=query_id
313313
)
314314
else:
315-
ir_context = IRExecutionContext.from_config_options(
316-
config_options, query_id=query_id
317-
)
315+
ir_context = IRExecutionContext(query_id=query_id)
318316

319317
# Generate network nodes
320318
assert rmpf_context is not None, "RapidsMPF context must defined."

python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/options.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class StreamingOptions:
242242
Env: ``CUDF_POLARS__MEMORY_RESOURCE_CONFIG__*``.
243243
Category: engine.
244244
cuda_stream_policy
245-
CUDA stream policy (``"default"``, ``"new"``, ``"pool"`` or config dict).
245+
CUDA stream policy (``"default"``, ``"pool"`` or config dict).
246246
Env: ``CUDF_POLARS__CUDA_STREAM_POLICY``.
247247
Category: engine.
248248
@@ -307,9 +307,9 @@ class StreamingOptions:
307307
raise_on_fail: bool | Unspecified = _opt("engine")
308308
parquet_options: dict[str, Any] | ParquetOptions | Unspecified = _opt("engine")
309309
memory_resource_config: MemoryResourceConfig | Unspecified = _opt("engine")
310-
cuda_stream_policy: (
311-
Literal["default", "new", "pool"] | dict[str, Any] | Unspecified
312-
) = _opt("engine", "CUDF_POLARS__CUDA_STREAM_POLICY")
310+
cuda_stream_policy: Literal["default", "pool"] | dict[str, Any] | Unspecified = (
311+
_opt("engine", "CUDF_POLARS__CUDA_STREAM_POLICY")
312+
)
313313

314314
# ------------------------------------------------------------------
315315
# Conversion helpers used by the engines

python/cudf_polars/cudf_polars/utils/config.py

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -947,24 +947,12 @@ def build(self) -> CudaStreamPool:
947947
)
948948

949949

950-
class CUDAStreamPolicy(enum.StrEnum):
951-
"""
952-
The policy to use for acquiring new CUDA streams.
953-
954-
* ``CUDAStreamPolicy.DEFAULT`` : Use the default CUDA stream.
955-
* ``CUDAStreamPolicy.NEW`` : Create a new CUDA stream.
956-
"""
957-
958-
DEFAULT = "default"
959-
NEW = "new"
960-
961-
962950
def _convert_cuda_stream_policy(
963951
user_cuda_stream_policy: dict | str,
964-
) -> CUDAStreamPolicy | CUDAStreamPoolConfig:
952+
) -> CUDAStreamPoolConfig | None:
965953
match user_cuda_stream_policy:
966-
case "default" | "new":
967-
return CUDAStreamPolicy(user_cuda_stream_policy)
954+
case "default":
955+
return None
968956
case "pool":
969957
return CUDAStreamPoolConfig()
970958
case dict():
@@ -997,6 +985,13 @@ def _convert_cuda_stream_policy(
997985
) from None
998986

999987

988+
def _default_cuda_stream_policy() -> CUDAStreamPoolConfig | None:
989+
v = os.environ.get("CUDF_POLARS__CUDA_STREAM_POLICY")
990+
if v is None:
991+
return None
992+
return _convert_cuda_stream_policy(v)
993+
994+
1000995
@dataclasses.dataclass(frozen=True, eq=True)
1001996
class ConfigOptions(Generic[ExecutorType]):
1002997
"""
@@ -1017,7 +1012,9 @@ class ConfigOptions(Generic[ExecutorType]):
10171012
The GPU used to run the query. If not provided, the
10181013
query uses the current CUDA device.
10191014
cuda_stream_policy
1020-
The policy to use for acquiring new CUDA streams. See :class:`~cudf_polars.utils.config.CUDAStreamPolicy` for more.
1015+
The policy to use for CUDA streams. ``None`` (the default) uses the
1016+
default CUDA stream. A :class:`~cudf_polars.utils.config.CUDAStreamPoolConfig`
1017+
can be used to configure a stream pool.
10211018
"""
10221019

10231020
raise_on_fail: bool = False
@@ -1029,12 +1026,8 @@ class ConfigOptions(Generic[ExecutorType]):
10291026
)
10301027
device: int | None = None
10311028
memory_resource_config: MemoryResourceConfig | None = None
1032-
cuda_stream_policy: CUDAStreamPolicy | CUDAStreamPoolConfig = dataclasses.field(
1033-
default_factory=_make_default_factory(
1034-
"CUDF_POLARS__CUDA_STREAM_POLICY",
1035-
CUDAStreamPolicy.__call__,
1036-
default=CUDAStreamPolicy.DEFAULT,
1037-
)
1029+
cuda_stream_policy: CUDAStreamPoolConfig | None = dataclasses.field(
1030+
default_factory=_default_cuda_stream_policy
10381031
)
10391032

10401033
@classmethod
@@ -1137,7 +1130,7 @@ def from_polars_engine(
11371130
"cuda_stream_policy", None
11381131
) or os.environ.get("CUDF_POLARS__CUDA_STREAM_POLICY", None)
11391132

1140-
cuda_stream_policy: CUDAStreamPolicy | CUDAStreamPoolConfig
1133+
cuda_stream_policy: CUDAStreamPoolConfig | None
11411134

11421135
if user_cuda_stream_policy is None:
11431136
if (
@@ -1147,7 +1140,7 @@ def from_polars_engine(
11471140
cuda_stream_policy = CUDAStreamPoolConfig()
11481141
else:
11491142
# everything else defaults to the default stream
1150-
cuda_stream_policy = CUDAStreamPolicy.DEFAULT
1143+
cuda_stream_policy = None
11511144
else:
11521145
cuda_stream_policy = _convert_cuda_stream_policy(user_cuda_stream_policy)
11531146

@@ -1157,7 +1150,7 @@ def from_polars_engine(
11571150
or (executor.name == "streaming" and executor.runtime != Runtime.RAPIDSMPF)
11581151
):
11591152
raise ValueError(
1160-
"CUDAStreamPolicy.POOL is only supported by the rapidsmpf runtime."
1153+
"A stream pool is only supported by the rapidsmpf runtime."
11611154
)
11621155

11631156
kwargs["cuda_stream_policy"] = cuda_stream_policy

python/cudf_polars/cudf_polars/utils/cuda_stream.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
22
# SPDX-License-Identifier: Apache-2.0
33

44
"""CUDA stream utilities."""
@@ -8,11 +8,13 @@
88
from typing import TYPE_CHECKING
99

1010
import pylibcudf as plc
11-
from rmm.pylibrmm.stream import DEFAULT_STREAM, Stream
11+
from rmm.pylibrmm.stream import DEFAULT_STREAM
1212

1313
if TYPE_CHECKING:
1414
from collections.abc import Callable, Sequence
1515

16+
from rmm.pylibrmm.stream import Stream
17+
1618

1719
def get_dask_cuda_stream() -> Stream:
1820
"""Get the default CUDA stream for Dask."""
@@ -24,11 +26,6 @@ def get_cuda_stream() -> Stream:
2426
return DEFAULT_STREAM
2527

2628

27-
def get_new_cuda_stream() -> Stream:
28-
"""Get a new CUDA stream for the current thread."""
29-
return Stream()
30-
31-
3229
def join_cuda_streams(
3330
*, downstreams: Sequence[Stream], upstreams: Sequence[Stream]
3431
) -> None:

0 commit comments

Comments
 (0)