Skip to content

Commit ba17eee

Browse files
committed
share config
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent a1adc57 commit ba17eee

6 files changed

Lines changed: 272 additions & 183 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use vortex::layout::LayoutReader;
4242
use vortex::metrics::Label;
4343
use vortex::metrics::MetricsRegistry;
4444
use vortex::scan::ScanBuilder;
45+
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT;
46+
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT;
4547
use vortex::session::VortexSession;
4648
use vortex_utils::aliases::dash_map::DashMap;
4749
use vortex_utils::aliases::dash_map::Entry;
@@ -361,8 +363,8 @@ impl FileOpener for VortexOpener {
361363
.with_projection(scan_projection)
362364
.with_some_filter(filter)
363365
.with_ordered(has_output_ordering)
364-
.with_target_output_rows(64 * 1024)
365-
.with_target_output_bytes(4 << 20)
366+
.with_target_output_rows(DEFAULT_TARGET_OUTPUT_ROWS_HINT)
367+
.with_target_output_bytes(DEFAULT_TARGET_OUTPUT_BYTES_HINT)
366368
.map(move |chunk| {
367369
let mut ctx = session.create_execution_ctx();
368370
chunk.execute_record_batch(&stream_schema, &mut ctx)

vortex-scan/src/api.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ use crate::Selection;
4141
/// A sendable stream of partitions.
4242
pub type PartitionStream = BoxStream<'static, VortexResult<PartitionRef>>;
4343

44+
/// Shared default lower bound for rows accumulated before materializing projected data.
45+
///
46+
/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex
47+
/// execution path used by DataFusion.
48+
pub const DEFAULT_TARGET_OUTPUT_ROWS_HINT: usize = 64 * 1024;
49+
50+
/// Shared default lower bound for projected payload bytes accumulated before materialization.
51+
///
52+
/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex
53+
/// execution path used by DataFusion.
54+
pub const DEFAULT_TARGET_OUTPUT_BYTES_HINT: usize = 4 << 20;
55+
4456
/// Opens a Vortex [`DataSource`] from a URI.
4557
///
4658
/// Configuration can be passed via the URI query parameters, similar to JDBC / ADBC.
@@ -126,6 +138,10 @@ pub struct ScanRequest {
126138
/// Optional limit on the number of rows returned by scan. Limits are applied after all
127139
/// filtering and row selection.
128140
pub limit: Option<u64>,
141+
/// Preferred lower bound for rows accumulated before materializing projected data.
142+
pub target_output_rows: Option<usize>,
143+
/// Preferred lower bound for projected payload bytes accumulated before materialization.
144+
pub target_output_bytes: Option<usize>,
129145
}
130146

131147
impl Default for ScanRequest {
@@ -137,6 +153,8 @@ impl Default for ScanRequest {
137153
selection: Selection::default(),
138154
ordered: false,
139155
limit: None,
156+
target_output_rows: None,
157+
target_output_bytes: None,
140158
}
141159
}
142160
}

vortex-scan/src/layout.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use vortex_session::VortexSession;
3434

3535
use crate::ScanBuilder;
3636
use crate::Selection;
37+
use crate::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT;
38+
use crate::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT;
3739
use crate::api::DataSource;
3840
use crate::api::DataSourceScan;
3941
use crate::api::DataSourceScanRef;
@@ -166,6 +168,8 @@ impl DataSource for LayoutReaderDataSource {
166168
limit: scan_request.limit,
167169
selection: scan_request.selection,
168170
ordered: scan_request.ordered,
171+
target_output_rows: scan_request.target_output_rows,
172+
target_output_bytes: scan_request.target_output_bytes,
169173
metrics_registry: self.metrics_registry.clone(),
170174
next_row: row_range.start,
171175
end_row: row_range.end,
@@ -187,6 +191,8 @@ struct LayoutReaderScan {
187191
limit: Option<u64>,
188192
ordered: bool,
189193
selection: Selection,
194+
target_output_rows: Option<usize>,
195+
target_output_bytes: Option<usize>,
190196
metrics_registry: Option<Arc<dyn MetricsRegistry>>,
191197
next_row: u64,
192198
end_row: u64,
@@ -254,6 +260,8 @@ impl Stream for LayoutReaderScan {
254260
ordered: this.ordered,
255261
row_range,
256262
selection: this.selection.clone(),
263+
target_output_rows: this.target_output_rows,
264+
target_output_bytes: this.target_output_bytes,
257265
metrics_registry: this.metrics_registry.clone(),
258266
}) as PartitionRef;
259267

@@ -281,6 +289,8 @@ struct LayoutReaderSplit {
281289
ordered: bool,
282290
row_range: Range<u64>,
283291
selection: Selection,
292+
target_output_rows: Option<usize>,
293+
target_output_bytes: Option<usize>,
284294
metrics_registry: Option<Arc<dyn MetricsRegistry>>,
285295
}
286296

@@ -312,8 +322,17 @@ impl Partition for LayoutReaderSplit {
312322
.with_projection(self.projection)
313323
.with_some_filter(self.filter)
314324
.with_some_limit(self.limit)
315-
.with_some_metrics_registry(self.metrics_registry)
316325
.with_ordered(self.ordered);
326+
let builder = builder
327+
.with_target_output_rows(
328+
self.target_output_rows
329+
.unwrap_or(DEFAULT_TARGET_OUTPUT_ROWS_HINT),
330+
)
331+
.with_target_output_bytes(
332+
self.target_output_bytes
333+
.unwrap_or(DEFAULT_TARGET_OUTPUT_BYTES_HINT),
334+
);
335+
let builder = builder.with_some_metrics_registry(self.metrics_registry);
317336

318337
let dtype = builder.dtype()?;
319338
// Use into_stream() which creates a LazyScanStream that spawns individual I/O

vortex-scan/src/multi.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ use vortex_mask::Mask;
4848
use vortex_session::VortexSession;
4949

5050
use crate::ScanBuilder;
51+
use crate::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT;
52+
use crate::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT;
5153
use crate::api::DataSource;
5254
use crate::api::DataSourceScan;
5355
use crate::api::DataSourceScanRef;
@@ -397,7 +399,17 @@ impl Partition for MultiLayoutPartition {
397399
.with_projection(request.projection)
398400
.with_some_filter(request.filter)
399401
.with_some_limit(request.limit)
400-
.with_ordered(request.ordered);
402+
.with_ordered(request.ordered)
403+
.with_target_output_rows(
404+
request
405+
.target_output_rows
406+
.unwrap_or(DEFAULT_TARGET_OUTPUT_ROWS_HINT),
407+
)
408+
.with_target_output_bytes(
409+
request
410+
.target_output_bytes
411+
.unwrap_or(DEFAULT_TARGET_OUTPUT_BYTES_HINT),
412+
);
401413

402414
if let Some(row_range) = request.row_range {
403415
builder = builder.with_row_range(row_range);

0 commit comments

Comments
 (0)