Skip to content

Commit 17fa9ff

Browse files
committed
buffered batch
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 0374b7f commit 17fa9ff

3 files changed

Lines changed: 43 additions & 30 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use arrow_schema::Schema;
99
use datafusion_common::DataFusionError;
1010
use datafusion_common::Result as DFResult;
1111
use datafusion_common::ScalarValue;
12+
use datafusion_common::arrow::array::RecordBatch;
1213
use datafusion_common::exec_datafusion_err;
1314
use datafusion_datasource::FileRange;
1415
use datafusion_datasource::PartitionedFile;
@@ -364,6 +365,7 @@ impl FileOpener for VortexOpener {
364365

365366
let stream_schema = Arc::new(stream_schema);
366367
let handle = session.handle();
368+
let file_location = file.object_meta.location.clone();
367369

368370
let stream = scan_builder
369371
.with_metrics_registry(metrics_registry)
@@ -372,7 +374,7 @@ impl FileOpener for VortexOpener {
372374
.with_ordered(has_output_ordering)
373375
.into_stream()
374376
.map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
375-
.then(move |chunk| {
377+
.map(move |chunk| {
376378
let session = session.clone();
377379
let stream_schema = Arc::clone(&stream_schema);
378380
let handle = handle.clone();
@@ -383,33 +385,11 @@ impl FileOpener for VortexOpener {
383385
})
384386
})
385387
})
388+
.buffered(2)
386389
.map_ok(move |rb| {
387-
// We try and slice the stream into respecting datafusion's configured batch size.
388-
stream::iter(
389-
(0..rb.num_rows().div_ceil(batch_size * 2))
390-
.flat_map(move |block_idx| {
391-
let offset = block_idx * batch_size * 2;
392-
393-
// If we have less than two batches worth of rows left, we keep them together as a single batch.
394-
if rb.num_rows() - offset < 2 * batch_size {
395-
let length = rb.num_rows() - offset;
396-
[Some(rb.slice(offset, length)), None].into_iter()
397-
} else {
398-
let first = rb.slice(offset, batch_size);
399-
let second = rb.slice(offset + batch_size, batch_size);
400-
[Some(first), Some(second)].into_iter()
401-
}
402-
})
403-
.flatten()
404-
.map(Ok),
405-
)
406-
})
407-
.map_err(move |e: VortexError| {
408-
DataFusionError::External(Box::new(e.with_context(format!(
409-
"Failed to read Vortex file: {}",
410-
file.object_meta.location
411-
))))
390+
stream::iter(split_record_batch(rb, batch_size).into_iter().map(Ok))
412391
})
392+
.map_err(move |e: VortexError| vortex_file_read_error(&file_location, e))
413393
.try_flatten()
414394
.map(move |batch| {
415395
if projector.projection().as_ref().is_empty() {
@@ -460,6 +440,33 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
460440
start_row..u64::min(row_count, end_row)
461441
}
462442

443+
fn split_record_batch(rb: RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
444+
assert!(batch_size > 0, "batch size must be positive");
445+
446+
let mut batches = Vec::new();
447+
let mut offset = 0;
448+
449+
while offset < rb.num_rows() {
450+
let remaining = rb.num_rows() - offset;
451+
if remaining < 2 * batch_size {
452+
batches.push(rb.slice(offset, remaining));
453+
break;
454+
}
455+
456+
batches.push(rb.slice(offset, batch_size));
457+
batches.push(rb.slice(offset + batch_size, batch_size));
458+
offset += batch_size * 2;
459+
}
460+
461+
batches
462+
}
463+
464+
fn vortex_file_read_error(path: &Path, error: VortexError) -> DataFusionError {
465+
DataFusionError::External(Box::new(
466+
error.with_context(format!("Failed to read Vortex file: {path}")),
467+
))
468+
}
469+
463470
#[cfg(test)]
464471
mod tests {
465472
use std::sync::Arc;
@@ -486,6 +493,7 @@ mod tests {
486493
use datafusion_expr::Operator;
487494
use datafusion_physical_expr::expressions as df_expr;
488495
use datafusion_physical_expr::projection::ProjectionExpr;
496+
use futures::TryStreamExt;
489497
use insta::assert_snapshot;
490498
use itertools::Itertools;
491499
use object_store::ObjectStore;

vortex-datafusion/src/persistent/stream.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ use futures::stream::BoxStream;
1515
/// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression.
1616
pub(crate) struct PrunableStream {
1717
file_pruner: FilePruner,
18-
stream: BoxStream<'static, DFResult<RecordBatch>>,
18+
stream: Option<BoxStream<'static, DFResult<RecordBatch>>>,
1919
}
2020

2121
impl PrunableStream {
2222
pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
2323
Self {
2424
file_pruner,
25-
stream,
25+
stream: Some(stream),
2626
}
2727
}
2828
}
@@ -32,9 +32,13 @@ impl Stream for PrunableStream {
3232

3333
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3434
if self.as_mut().file_pruner.should_prune()? {
35+
self.stream.take();
3536
Poll::Ready(None)
3637
} else {
37-
self.stream.poll_next_unpin(cx)
38+
match self.stream.as_mut() {
39+
Some(stream) => stream.poll_next_unpin(cx),
40+
None => Poll::Ready(None),
41+
}
3842
}
3943
}
4044
}

vortex-datafusion/src/v2/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl DataSource for VortexDataSource {
429429
let handle = session.handle();
430430
let stream = scan_streams
431431
.try_flatten_unordered(Some(num_partitions.get() * 2))
432-
.then(move |result| {
432+
.map(move |result| {
433433
let session = session.clone();
434434
let schema = Arc::clone(&projected_schema);
435435
let handle = handle.clone();
@@ -438,6 +438,7 @@ impl DataSource for VortexDataSource {
438438
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
439439
})
440440
})
441+
.buffered(2)
441442
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
442443

443444
// Apply leftover projection (expressions that couldn't be pushed into Vortex).

0 commit comments

Comments
 (0)