Skip to content

Commit d72c79f

Browse files
committed
Cache available parallelism
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 91a6b57 commit d72c79f

15 files changed

Lines changed: 50 additions & 37 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/vector-search-bench/src/prepare.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,6 @@ async fn write_shard_streaming(
162162
.open(vortex_path)
163163
.await?;
164164

165-
// This will write in parallel, using `std::thread::available_parallelism()`.
166-
// See `CompressingStrategy` for more details.
167165
flavor
168166
.create_write_options(&SESSION)
169167
.write(&mut output, stream)

vortex-bench/src/polarsignals/data.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use parquet::file::properties::WriterProperties;
2525
use rand::RngExt;
2626
use rand::SeedableRng;
2727
use rand::rngs::StdRng;
28+
use vortex::utils::parallelism::get_available_parallelism;
2829

2930
use super::schema::Int64DictBuilder;
3031
use super::schema::LABELS;
@@ -146,9 +147,7 @@ pub fn generate_polarsignals_parquet(n_rows: usize, output_path: &Path) -> Resul
146147
let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;
147148

148149
let batch_size = 10_000;
149-
let num_threads = std::thread::available_parallelism()
150-
.map(|n| n.get())
151-
.unwrap_or(1);
150+
let num_threads = get_available_parallelism().unwrap_or(1);
152151

153152
let batch_ranges: Vec<(usize, usize)> = (0..n_rows)
154153
.step_by(batch_size)

vortex-datafusion/src/v2/source.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ use vortex::io::session::RuntimeSessionExt;
117117
use vortex::scan::DataSourceRef;
118118
use vortex::scan::ScanRequest;
119119
use vortex::session::VortexSession;
120+
use vortex_utils::parallelism::get_available_parallelism;
120121

121122
use crate::convert::exprs::DefaultExpressionConvertor;
122123
use crate::convert::exprs::ExpressionConvertor;
@@ -277,9 +278,7 @@ impl VortexDataSourceBuilder {
277278
filter: None,
278279
limit: None,
279280
ordered: false,
280-
num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| {
281-
NonZero::new(1).vortex_expect("available parallelism must be non-zero")
282-
}),
281+
num_partitions: get_available_parallelism().unwrap_or(1),
283282
})
284283
}
285284
}
@@ -360,7 +359,7 @@ pub struct VortexDataSource {
360359
/// We use this as a hint for how many splits to execute concurrently in `open()`, but we
361360
/// always declare to DataFusion that we only have a single partition so that we can
362361
/// internally manage concurrency and fix the problem of partition skew.
363-
num_partitions: NonZeroUsize,
362+
num_partitions: usize,
364363
}
365364

366365
impl fmt::Debug for VortexDataSource {
@@ -428,7 +427,7 @@ impl DataSource for VortexDataSource {
428427

429428
let handle = session.handle();
430429
let stream = scan_streams
431-
.try_flatten_unordered(Some(num_partitions.get() * 2))
430+
.try_flatten_unordered(Some(num_partitions * 2))
432431
.map(move |result| {
433432
let session = session.clone();
434433
let schema = Arc::clone(&projected_schema);
@@ -437,7 +436,7 @@ impl DataSource for VortexDataSource {
437436
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
438437
})
439438
})
440-
.buffered(num_partitions.get())
439+
.buffered(num_partitions)
441440
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
442441

443442
// Apply leftover projection (expressions that couldn't be pushed into Vortex).
@@ -488,8 +487,7 @@ impl DataSource for VortexDataSource {
488487
) -> DFResult<Option<Arc<dyn DataSource>>> {
489488
// Vortex handles parallelism internally — always use a single partition.
490489
let mut this = self.clone();
491-
this.num_partitions = NonZero::new(target_partitions)
492-
.ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
490+
this.num_partitions = target_partitions;
493491
this.ordered |= output_ordering.is_some();
494492
Ok(Some(Arc::new(this)))
495493
}

vortex-duckdb/src/datasource.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use vortex::scalar_fn::fns::pack::Pack;
5151
use vortex::scan::DataSource;
5252
use vortex::scan::ScanRequest;
5353
use vortex_utils::aliases::hash_set::HashSet;
54+
use vortex_utils::parallelism::get_available_parallelism;
5455

5556
use crate::RUNTIME;
5657
use crate::SESSION;
@@ -304,9 +305,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
304305

305306
let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?;
306307

307-
let num_workers = std::thread::available_parallelism()
308-
.map(|n| n.get())
309-
.unwrap_or(1);
308+
let num_workers = get_available_parallelism().unwrap_or(1);
310309

311310
// We create an async bounded channel so that all thread-local workers can pull the next
312311
// available array chunk regardless of which partition it came from.

vortex-io/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ vortex-buffer = { workspace = true }
3939
vortex-error = { workspace = true }
4040
vortex-metrics = { workspace = true }
4141
vortex-session = { workspace = true }
42+
vortex-utils = { workspace = true }
4243

4344
[target.'cfg(unix)'.dependencies]
4445
custom-labels = { workspace = true }

vortex-io/src/runtime/pool.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::time::Duration;
99
use parking_lot::Mutex;
1010
use smol::block_on;
1111
use vortex_error::VortexExpect;
12+
use vortex_utils::parallelism::get_available_parallelism;
1213

1314
#[derive(Clone)]
1415
pub struct CurrentThreadWorkerPool {
@@ -25,10 +26,10 @@ impl CurrentThreadWorkerPool {
2526
}
2627

2728
/// Set the number of worker threads to the available system parallelism as reported by
28-
/// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread.
29+
/// [`get_available_parallelism()`] minus 1, to leave a slot open for the calling thread.
2930
pub fn set_workers_to_available_parallelism(&self) {
30-
let n = std::thread::available_parallelism()
31-
.map(|n| n.get().saturating_sub(1).max(1))
31+
let n = get_available_parallelism()
32+
.map(|n| n.saturating_sub(1).max(1))
3233
.unwrap_or(1);
3334
self.set_workers(n);
3435
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_btrblocks::BtrBlocksCompressor;
1414
use vortex_error::VortexResult;
1515
use vortex_io::session::RuntimeSessionExt;
1616
use vortex_session::VortexSession;
17+
use vortex_utils::parallelism::get_available_parallelism;
1718

1819
use crate::LayoutRef;
1920
use crate::LayoutStrategy;
@@ -67,9 +68,7 @@ impl CompressingStrategy {
6768
child: Arc::new(child),
6869
compressor: Arc::new(compressor),
6970
stats: Stat::all().collect(),
70-
concurrency: std::thread::available_parallelism()
71-
.map(|v| v.get())
72-
.unwrap_or(1),
71+
concurrency: get_available_parallelism().unwrap_or(1),
7372
}
7473
}
7574

vortex-layout/src/layouts/zoned/writer.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_array::stats::PRUNING_STATS;
1414
use vortex_error::VortexResult;
1515
use vortex_io::session::RuntimeSessionExt;
1616
use vortex_session::VortexSession;
17+
use vortex_utils::parallelism::get_available_parallelism;
1718

1819
use crate::IntoLayout;
1920
use crate::LayoutRef;
@@ -44,9 +45,7 @@ impl Default for ZonedLayoutOptions {
4445
block_size: 8192,
4546
stats: PRUNING_STATS.into(),
4647
max_variable_length_statistics_size: 64,
47-
concurrency: std::thread::available_parallelism()
48-
.map(|v| v.get())
49-
.unwrap_or(1),
48+
concurrency: get_available_parallelism().unwrap_or(1),
5049
}
5150
}
5251
}

vortex-layout/src/scan/multi.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use vortex_scan::PartitionRef;
5252
use vortex_scan::PartitionStream;
5353
use vortex_scan::ScanRequest;
5454
use vortex_session::VortexSession;
55+
use vortex_utils::parallelism::get_available_parallelism;
5556

5657
use crate::LayoutReaderRef;
5758
use crate::scan::scan_builder::ScanBuilder;
@@ -100,9 +101,7 @@ impl MultiLayoutDataSource {
100101
session: &VortexSession,
101102
) -> Self {
102103
let dtype = first.dtype().clone();
103-
let concurrency = std::thread::available_parallelism()
104-
.map(|v| v.get())
105-
.unwrap_or(DEFAULT_CONCURRENCY);
104+
let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
106105

107106
let mut children = Vec::with_capacity(1 + remaining.len());
108107
children.push(MultiLayoutChild::Opened(first));
@@ -126,9 +125,7 @@ impl MultiLayoutDataSource {
126125
factories: Vec<Arc<dyn LayoutReaderFactory>>,
127126
session: &VortexSession,
128127
) -> Self {
129-
let concurrency = std::thread::available_parallelism()
130-
.map(|v| v.get())
131-
.unwrap_or(DEFAULT_CONCURRENCY);
128+
let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
132129

133130
Self {
134131
dtype,

0 commit comments

Comments
 (0)