Skip to content

Commit 55a9552

Browse files
committed
step one
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent ca31c98 commit 55a9552

6 files changed

Lines changed: 97 additions & 57 deletions

File tree

scripts/run_clickbench_rss.sh

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env bash
2+
set -euo pipefail
3+
4+
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
5+
cd "$ROOT_DIR"
6+
7+
if ! command -v gtime >/dev/null 2>&1; then
8+
echo "error: gtime not found in PATH" >&2
9+
exit 1
10+
fi
11+
12+
BINARY="${BINARY:-$ROOT_DIR/target/release/datafusion-bench}"
13+
FORMAT="${FORMAT:-vortex}"
14+
START_QUERY="${START_QUERY:-0}"
15+
END_QUERY="${END_QUERY:-42}"
16+
OUTPUT_DIR="${OUTPUT_DIR:-$ROOT_DIR/data/clickbench-rss}"
17+
18+
mkdir -p "$OUTPUT_DIR"
19+
20+
echo "building release benchmark binary..."
21+
cargo build -p datafusion-bench --release
22+
23+
timestamp="$(date +%Y%m%d-%H%M%S)"
24+
csv_path="$OUTPUT_DIR/clickbench-rss-$timestamp.csv"
25+
26+
cat >"$csv_path" <<'EOF'
27+
query,max_rss_kb,elapsed,user_seconds,system_seconds,cpu_percent,exit_status
28+
EOF
29+
30+
for ((q = START_QUERY; q <= END_QUERY; q++)); do
31+
log_path="$OUTPUT_DIR/q${q}.gtime.$timestamp.log"
32+
echo "running clickbench query $q ..."
33+
34+
gtime --verbose \
35+
"$BINARY" clickbench --formats "$FORMAT" --queries "$q" \
36+
>"$OUTPUT_DIR/q${q}.stdout.$timestamp.log" \
37+
2>"$log_path"
38+
39+
max_rss_kb="$(awk -F': ' '/Maximum resident set size/ {print $2}' "$log_path" | tr -d '[:space:]')"
40+
elapsed="$(awk -F': ' '/Elapsed \(wall clock\) time/ {print $2}' "$log_path" | sed 's/^ *//')"
41+
user_seconds="$(awk -F': ' '/User time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')"
42+
system_seconds="$(awk -F': ' '/System time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')"
43+
cpu_percent="$(awk -F': ' '/Percent of CPU this job got/ {print $2}' "$log_path" | tr -d '[:space:]')"
44+
exit_status="$(awk -F': ' '/Exit status/ {print $2}' "$log_path" | tr -d '[:space:]')"
45+
46+
printf '%s,%s,%s,%s,%s,%s,%s\n' \
47+
"$q" \
48+
"${max_rss_kb:-}" \
49+
"${elapsed:-}" \
50+
"${user_seconds:-}" \
51+
"${system_seconds:-}" \
52+
"${cpu_percent:-}" \
53+
"${exit_status:-}" \
54+
>>"$csv_path"
55+
done
56+
57+
echo "wrote summary to $csv_path"

vortex-datafusion/src/persistent/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ config_namespace! {
101101
/// all expressions are evaluated after the scan.
102102
pub projection_pushdown: bool, default = false
103103
/// The intra-partition scan concurrency, controlling the number of row splits to process
104-
/// concurrently per-thread within each file.
104+
/// concurrently within each file.
105105
///
106106
/// This does not affect the overall parallelism
107107
/// across partitions, which is controlled by DataFusion's execution configuration.

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::collections::BTreeSet;
55
use std::ops::BitAnd;
66
use std::ops::Range;
77
use std::sync::Arc;
8-
use std::sync::OnceLock;
98

109
use futures::FutureExt;
1110
use futures::future::BoxFuture;
@@ -40,7 +39,6 @@ pub struct FlatReader {
4039
name: Arc<str>,
4140
segment_source: Arc<dyn SegmentSource>,
4241
session: VortexSession,
43-
array: OnceLock<SharedArrayFuture>,
4442
}
4543

4644
impl FlatReader {
@@ -55,38 +53,33 @@ impl FlatReader {
5553
name,
5654
segment_source,
5755
session,
58-
array: Default::default(),
5956
}
6057
}
6158

6259
/// Register the segment request and return a future that would resolve into the deserialised array.
6360
fn array_future(&self) -> SharedArrayFuture {
64-
self.array
65-
.get_or_init(|| {
66-
let row_count = usize::try_from(self.layout.row_count())
67-
.vortex_expect("row count must fit in usize");
68-
let segment_fut = self.segment_source.request(self.layout.segment_id());
69-
let ctx = self.layout.array_ctx().clone();
70-
let session = self.session.clone();
71-
let dtype = self.layout.dtype().clone();
72-
let array_tree = self.layout.array_tree().cloned();
73-
async move {
74-
let segment = segment_fut.await?;
75-
let parts = if let Some(array_tree) = array_tree {
76-
// Use the pre-stored flatbuffer from layout metadata combined with segment buffers.
77-
ArrayParts::from_flatbuffer_and_segment(array_tree, segment)?
78-
} else {
79-
// Parse the flatbuffer from the segment itself.
80-
ArrayParts::try_from(segment)?
81-
};
82-
parts
83-
.decode(&dtype, row_count, &ctx, &session)
84-
.map_err(Arc::new)
85-
}
86-
.boxed()
87-
.shared()
88-
})
89-
.clone()
61+
let row_count =
62+
usize::try_from(self.layout.row_count()).vortex_expect("row count must fit in usize");
63+
let segment_fut = self.segment_source.request(self.layout.segment_id());
64+
let ctx = self.layout.array_ctx().clone();
65+
let session = self.session.clone();
66+
let dtype = self.layout.dtype().clone();
67+
let array_tree = self.layout.array_tree().cloned();
68+
async move {
69+
let segment = segment_fut.await?;
70+
let parts = if let Some(array_tree) = array_tree {
71+
// Use the pre-stored flatbuffer from layout metadata combined with segment buffers.
72+
ArrayParts::from_flatbuffer_and_segment(array_tree, segment)?
73+
} else {
74+
// Parse the flatbuffer from the segment itself.
75+
ArrayParts::try_from(segment)?
76+
};
77+
parts
78+
.decode(&dtype, row_count, &ctx, &session)
79+
.map_err(Arc::new)
80+
}
81+
.boxed()
82+
.shared()
9083
}
9184
}
9285

vortex-scan/src/multi.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,6 @@ impl MultiLayoutDataSource {
102102
session: &VortexSession,
103103
) -> Self {
104104
let dtype = first.dtype().clone();
105-
let concurrency = std::thread::available_parallelism()
106-
.map(|v| v.get())
107-
.unwrap_or(DEFAULT_CONCURRENCY);
108-
109105
let mut children = Vec::with_capacity(1 + remaining.len());
110106
children.push(MultiLayoutChild::Opened(first));
111107
children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
@@ -114,7 +110,7 @@ impl MultiLayoutDataSource {
114110
dtype,
115111
session: session.clone(),
116112
children,
117-
concurrency,
113+
concurrency: DEFAULT_CONCURRENCY,
118114
}
119115
}
120116

@@ -128,25 +124,21 @@ impl MultiLayoutDataSource {
128124
factories: Vec<Arc<dyn LayoutReaderFactory>>,
129125
session: &VortexSession,
130126
) -> Self {
131-
let concurrency = std::thread::available_parallelism()
132-
.map(|v| v.get())
133-
.unwrap_or(DEFAULT_CONCURRENCY);
134-
135127
Self {
136128
dtype,
137129
session: session.clone(),
138130
children: factories
139131
.into_iter()
140132
.map(MultiLayoutChild::Deferred)
141133
.collect(),
142-
concurrency,
134+
concurrency: DEFAULT_CONCURRENCY,
143135
}
144136
}
145137

146138
/// Sets the concurrency for opening deferred readers.
147139
///
148140
/// Controls how many file opens run in parallel via `buffer_unordered`.
149-
/// Defaults to the number of available CPU cores.
141+
/// Defaults to `DEFAULT_CONCURRENCY`, a fixed budget that does not depend on the CPU core count.
150142
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
151143
self.concurrency = concurrency;
152144
self

vortex-scan/src/repeated_scan.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub struct RepeatedScan<A: 'static + Send> {
7171
selection: Selection,
7272
/// The natural splits of the file.
7373
splits: Splits,
74-
/// The number of splits to make progress on concurrently **per-thread**.
74+
/// The total number of splits to make progress on concurrently.
7575
concurrency: usize,
7676
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
7777
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
@@ -148,11 +148,12 @@ impl<A: 'static + Send> RepeatedScan<A> {
148148
&self,
149149
row_range: Option<Range<u64>>,
150150
) -> VortexResult<BoxStream<'static, VortexResult<A>>> {
151-
let num_workers = std::thread::available_parallelism()
152-
.map(|n| n.get())
153-
.unwrap_or(1);
154-
let concurrency = self.concurrency * num_workers;
155-
self.execute_stream(row_range, concurrency, self.ordered, self.session.handle())
151+
self.execute_stream(
152+
row_range,
153+
self.concurrency,
154+
self.ordered,
155+
self.session.handle(),
156+
)
156157
}
157158

158159
fn legacy_stream_from_ranges(

vortex-scan/src/scan_builder.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub struct ScanBuilder<A> {
6161
selection: Selection,
6262
/// How to split the file for concurrent processing.
6363
split_by: SplitBy,
64-
/// The number of splits to make progress on concurrently **per-thread**.
64+
/// The total number of splits to make progress on concurrently.
6565
concurrency: usize,
6666
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
6767
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
@@ -80,6 +80,7 @@ pub struct ScanBuilder<A> {
8080
}
8181

8282
const DEFAULT_TARGET_OUTPUT_ROWS: usize = DEFAULT_TARGET_OUTPUT_ROWS_HINT;
83+
const DEFAULT_SCAN_CONCURRENCY: usize = 16;
8384
const MIN_TARGET_OUTPUT_BYTES: usize = 1 << 20;
8485
const MAX_TARGET_OUTPUT_BYTES: usize = 8 << 20;
8586
const DEFAULT_VARIABLE_ROW_BYTES: usize = 32;
@@ -96,9 +97,9 @@ impl ScanBuilder<ArrayRef> {
9697
row_range: None,
9798
selection: Default::default(),
9899
split_by: SplitBy::Layout,
99-
// We default to four tasks per worker thread, which allows for some I/O lookahead
100-
// without too much impact on work-stealing.
101-
concurrency: 4,
100+
// Keep one explicit scan-concurrency budget rather than multiplying by the
101+
// runtime's worker count later.
102+
concurrency: DEFAULT_SCAN_CONCURRENCY,
102103
map_fn: Arc::new(Ok),
103104
metrics_registry: None,
104105
file_stats: None,
@@ -186,8 +187,7 @@ impl<A: 'static + Send> ScanBuilder<A> {
186187
self.concurrency
187188
}
188189

189-
/// The number of row splits to make progress on concurrently per-thread, must
190-
/// be greater than 0.
190+
/// The total number of row splits to make progress on concurrently; must be greater than 0.
191191
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
192192
assert!(concurrency > 0);
193193
self.concurrency = concurrency;
@@ -400,10 +400,7 @@ impl<A: 'static + Send> Stream for LazyScanStream<A> {
400400
LazyScanState::Builder(builder) => {
401401
let builder = builder.take().vortex_expect("polled after completion");
402402
let ordered = builder.ordered;
403-
let num_workers = std::thread::available_parallelism()
404-
.map(|n| n.get())
405-
.unwrap_or(1);
406-
let concurrency = builder.concurrency * num_workers;
403+
let concurrency = builder.concurrency;
407404
let handle = builder.session.handle();
408405
let execute_handle = handle.clone();
409406
let task = handle.spawn_blocking(move || {

0 commit comments

Comments
 (0)