Skip to content

Commit 5f62613

Browse files
committed
less
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 1af95fc commit 5f62613

9 files changed

Lines changed: 73 additions & 81 deletions

File tree

docs/user-guide/vortex-python.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,21 +219,21 @@ with ThreadPoolExecutor(max_workers=len(columns)) as threads:
219219
totals = list(threads.map(lambda column: sum_column("example.vortex", column), columns))
220220
```
221221

222-
Applications that want Vortex work to continue on background threads can configure the shared
223-
runtime through `vortex.runtime`:
222+
By default Vortex starts a background worker pool sized to `available_parallelism() - 1`.
223+
Set `VORTEX_MAX_THREADS=n` to pin the pool to a specific size at startup. To adjust the pool
224+
at runtime, use {func}`vortex.set_worker_threads`; passing `None` resets it to `available_parallelism() - 1` (`VORTEX_MAX_THREADS` is only read at startup and is not re-applied):
224225

225226
```python
226227
import vortex as vx
227-
import vortex.runtime as vxrt
228228

229-
previous_workers = vxrt.worker_count()
230-
vxrt.set_worker_threads_to_available_parallelism()
229+
previous_workers = vx.worker_threads()
230+
vx.set_worker_threads(None) # reset to available_parallelism() - 1
231231

232232
try:
233233
reader = vx.open("example.vortex").to_arrow(batch_size=64_000)
234234
table = reader.read_all()
235235
finally:
236-
vxrt.set_worker_threads(previous_workers)
236+
vx.set_worker_threads(previous_workers)
237237
```
238238

239239
## Conversion

vortex-python/python/vortex/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import importlib.metadata
55

6-
from . import _lib, arrays, dataset, expr, file, io, ray, registry, runtime, scan
6+
from . import _lib, arrays, dataset, expr, file, io, ray, registry, scan
77
from ._lib.arrays import ( # pyright: ignore[reportMissingModuleSource]
88
AlpArray,
99
AlpRdArray,
@@ -61,6 +61,10 @@
6161
utf8,
6262
)
6363
from ._lib.iter import ArrayIterator # pyright: ignore[reportMissingModuleSource]
64+
from ._lib.runtime import ( # pyright: ignore[reportMissingModuleSource]
65+
set_worker_threads,
66+
worker_threads,
67+
)
6468
from ._lib.scalar import ( # pyright: ignore[reportMissingModuleSource]
6569
BinaryScalar,
6670
BoolScalar,
@@ -105,7 +109,6 @@
105109
"scan",
106110
"io",
107111
"registry",
108-
"runtime",
109112
"ray",
110113
# --- Objects and Functions ---
111114
"array",
@@ -188,6 +191,9 @@
188191
"ArrayIterator",
189192
# Scan
190193
"RepeatedScan",
194+
# Runtime
195+
"set_worker_threads",
196+
"worker_threads",
191197
# Version
192198
"__version__",
193199
]
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
def set_worker_threads(n: int) -> None: ...
5-
def set_worker_threads_to_available_parallelism() -> None: ...
6-
def worker_count() -> int: ...
4+
def set_worker_threads(n: int | None = None) -> None: ...
5+
def worker_threads() -> int: ...

vortex-python/python/vortex/dataset.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515

1616
from ._lib import dataset as _dataset # pyright: ignore[reportMissingModuleSource]
1717
from ._lib import file as _file # pyright: ignore[reportMissingModuleSource]
18+
from ._lib.runtime import set_worker_threads as _set_worker_threads # pyright: ignore[reportMissingModuleSource]
19+
from ._lib.runtime import worker_threads as _worker_threads # pyright: ignore[reportMissingModuleSource]
1820
from .arrays import array
1921
from .arrow.expression import ensure_vortex_expression
2022
from .expr import Expr, and_
21-
from .runtime import set_worker_threads as _set_worker_threads
22-
from .runtime import set_worker_threads_to_available_parallelism as _set_worker_threads_to_available_parallelism
23-
from .runtime import worker_count as _worker_count
2423

2524

2625
@contextmanager
@@ -29,9 +28,9 @@ def _temporary_worker_threads(use_threads: bool | None) -> Iterator[None]:
2928
yield
3029
return
3130

32-
previous_workers = _worker_count()
31+
previous_workers = _worker_threads()
3332
if use_threads:
34-
_set_worker_threads_to_available_parallelism()
33+
_set_worker_threads(None)
3534
else:
3635
_set_worker_threads(0)
3736

vortex-python/python/vortex/runtime.py

Lines changed: 0 additions & 14 deletions
This file was deleted.

vortex-python/src/lib.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,21 @@ use vortex::io::runtime::current::CurrentThreadWorkerPool;
4141
pub(crate) static RUNTIME: LazyLock<CurrentThreadRuntime> =
4242
LazyLock::new(CurrentThreadRuntime::new);
4343

44-
/// Shared worker pool that can drive [`RUNTIME`]'s executor in the background.
45-
pub(crate) static POOL: LazyLock<CurrentThreadWorkerPool> = LazyLock::new(|| RUNTIME.new_pool());
44+
/// Shared worker pool that drives [`RUNTIME`]'s executor in the background.
45+
///
46+
/// On first access, the pool is sized to `VORTEX_MAX_THREADS` (if set to a
47+
/// non-negative integer; unparsable values are ignored) or otherwise to `available_parallelism() - 1`.
48+
pub(crate) static POOL: LazyLock<CurrentThreadWorkerPool> = LazyLock::new(|| {
49+
let pool = RUNTIME.new_pool();
50+
match std::env::var("VORTEX_MAX_THREADS")
51+
.ok()
52+
.and_then(|s| s.parse::<usize>().ok())
53+
{
54+
Some(n) => pool.set_workers(n),
55+
None => pool.set_workers_to_available_parallelism(),
56+
}
57+
pool
58+
});
4659

4760
/// Vortex is an Apache Arrow-compatible toolkit for working with compressed array data.
4861
#[cfg(feature = "extension-module")]

vortex-python/src/runtime.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,35 +13,33 @@ pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {
1313
install_module("vortex._lib.runtime", &m)?;
1414

1515
m.add_function(wrap_pyfunction!(set_worker_threads, &m)?)?;
16-
m.add_function(wrap_pyfunction!(
17-
set_worker_threads_to_available_parallelism,
18-
&m
19-
)?)?;
20-
m.add_function(wrap_pyfunction!(worker_count, &m)?)?;
16+
m.add_function(wrap_pyfunction!(worker_threads, &m)?)?;
2117

2218
Ok(())
2319
}
2420

2521
/// Set the number of background worker threads driving Vortex futures.
22+
///
23+
/// If `n` is `None`, resets the pool to `available_parallelism() - 1`.
2624
#[pyfunction]
27-
pub fn set_worker_threads(n: isize) -> PyResult<()> {
28-
if n < 0 {
29-
return Err(PyValueError::new_err(
30-
"worker thread count must be non-negative",
31-
));
25+
#[pyo3(signature = (n=None))]
26+
pub fn set_worker_threads(n: Option<isize>) -> PyResult<()> {
27+
match n {
28+
Some(n) => {
29+
if n < 0 {
30+
return Err(PyValueError::new_err(
31+
"worker thread count must be non-negative",
32+
));
33+
}
34+
POOL.set_workers(n as usize);
35+
}
36+
None => POOL.set_workers_to_available_parallelism(),
3237
}
33-
POOL.set_workers(n as usize);
3438
Ok(())
3539
}
3640

37-
/// Set the worker pool to `available_parallelism() - 1`.
38-
#[pyfunction]
39-
pub fn set_worker_threads_to_available_parallelism() {
40-
POOL.set_workers_to_available_parallelism();
41-
}
42-
4341
/// Return the current number of background worker threads.
4442
#[pyfunction]
45-
pub fn worker_count() -> usize {
43+
pub fn worker_threads() -> usize {
4644
POOL.worker_count()
4745
}

vortex-python/test/test_dataset.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,18 @@ def test_to_batches(ds: pd.Dataset):
7474

7575
def test_use_threads_configures_worker_pool(monkeypatch: pytest.MonkeyPatch):
7676
current_workers = 3
77-
calls: list[tuple[str, int]] = []
77+
calls: list[int | None] = []
7878

79-
def fake_worker_count() -> int:
79+
def fake_worker_threads() -> int:
8080
return current_workers
8181

82-
def fake_set_worker_threads(count: int) -> None:
82+
def fake_set_worker_threads(count: int | None) -> None:
8383
nonlocal current_workers
84-
calls.append(("set", count))
85-
current_workers = count
84+
calls.append(count)
85+
current_workers = 11 if count is None else count
8686

87-
def fake_set_worker_threads_to_available_parallelism() -> None:
88-
nonlocal current_workers
89-
calls.append(("available", current_workers))
90-
current_workers = 11
91-
92-
monkeypatch.setattr(vx_dataset, "_worker_count", fake_worker_count)
87+
monkeypatch.setattr(vx_dataset, "_worker_threads", fake_worker_threads)
9388
monkeypatch.setattr(vx_dataset, "_set_worker_threads", fake_set_worker_threads)
94-
monkeypatch.setattr(
95-
vx_dataset,
96-
"_set_worker_threads_to_available_parallelism",
97-
fake_set_worker_threads_to_available_parallelism,
98-
)
9989

10090
with vx_dataset._temporary_worker_threads(True): # pyright: ignore[reportPrivateUsage]
10191
assert current_workers == 11
@@ -106,7 +96,7 @@ def fake_set_worker_threads_to_available_parallelism() -> None:
10696
assert current_workers == 0
10797

10898
assert current_workers == 3
109-
assert calls == [("available", 3), ("set", 3), ("set", 0), ("set", 3)]
99+
assert calls == [None, 3, 0, 3]
110100

111101
calls.clear()
112102
reader = pa.RecordBatchReader.from_batches(
@@ -121,7 +111,7 @@ def fake_set_worker_threads_to_available_parallelism() -> None:
121111

122112
assert [batch.to_pylist() for batch in batches] == [[{"x": 1}], [{"x": 2}]]
123113
assert current_workers == 3
124-
assert calls == [("available", 3), ("set", 3)]
114+
assert calls == [None, 3]
125115

126116

127117
@pytest.mark.parametrize("batch_size", [1234, 8192, 1 << 31])

vortex-python/test/test_runtime.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,31 @@
33

44
import pytest
55

6-
from vortex import runtime
6+
import vortex as vx
77

88

99
def test_worker_threads():
10-
original = runtime.worker_count()
10+
original = vx.worker_threads()
1111
try:
12-
runtime.set_worker_threads(0)
13-
assert runtime.worker_count() == 0
12+
vx.set_worker_threads(0)
13+
assert vx.worker_threads() == 0
1414

15-
runtime.set_worker_threads(1)
16-
assert runtime.worker_count() == 1
15+
vx.set_worker_threads(1)
16+
assert vx.worker_threads() == 1
1717
finally:
18-
runtime.set_worker_threads(original)
18+
vx.set_worker_threads(original)
1919

2020

21-
def test_set_worker_threads_to_available_parallelism():
22-
original = runtime.worker_count()
21+
def test_set_worker_threads_none_resets_to_available_parallelism():
22+
original = vx.worker_threads()
2323
try:
24-
runtime.set_worker_threads_to_available_parallelism()
25-
assert runtime.worker_count() >= 1
24+
vx.set_worker_threads(0)
25+
vx.set_worker_threads(None)
26+
assert vx.worker_threads() >= 1
2627
finally:
27-
runtime.set_worker_threads(original)
28+
vx.set_worker_threads(original)
2829

2930

3031
def test_set_worker_threads_rejects_negative():
3132
with pytest.raises(ValueError):
32-
runtime.set_worker_threads(-1)
33+
vx.set_worker_threads(-1)

0 commit comments

Comments
 (0)