Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object_store = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
tokio-stream = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
uuid = { workspace = true, features = ["v7"] }
vortex = { workspace = true, features = ["object_store", "tokio", "files"] }
vortex-utils = { workspace = true, features = ["dashmap"] }

Expand Down
82 changes: 82 additions & 0 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,85 @@ where
}
}
}

#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_common::GetExt;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::ObjectStoreWriter;
    use vortex::io::VortexWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexOptions;

    /// Shared Vortex session for the test helpers, constructed lazily on first use.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// Test harness bundling an in-memory object store with a DataFusion
    /// session that has the Vortex file format registered against that store.
    pub struct TestSessionContext {
        /// In-memory object store that test files are written to and read from.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session wired to `store` with the Vortex format factory installed.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Same as [`TestSessionContext::new`] with projection pushdown disabled.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Build a test context, optionally enabling projection pushdown in the
        /// Vortex format options.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());

            // Everything other than projection pushdown stays at its default.
            let format_opts = VortexOptions {
                projection_pushdown,
                ..VortexOptions::default()
            };
            let format_factory = Arc::new(VortexFormatFactory::new().with_options(format_opts));

            // NOTE(review): the table factory is keyed by the upper-cased file
            // extension — presumably matching DataFusion's factory lookup
            // convention; confirm against the DataFusion docs.
            let mut builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    format_factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(&Url::try_from("file://").unwrap(), store.clone());

            // Register the Vortex format so it is discoverable during planning.
            if let Some(formats) = builder.file_formats() {
                formats.push(format_factory as _);
            }

            let session = SessionContext::new_with_state(builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Encode `batch` as a Vortex file and write it to `path` in this
        /// context's in-memory store.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            // Convert the Arrow batch into a Vortex array (non-nullable top level).
            let vortex_array = ArrayRef::from_arrow(batch, false);

            let mut writer = ObjectStoreWriter::new(self.store.clone(), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut writer, vortex_array.to_array_stream())
                .await?;

            // Flush and finalize the object-store upload.
            writer.shutdown().await?;

            Ok(())
        }
    }
}
58 changes: 57 additions & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigField;
use datafusion_common::config_namespace;
use datafusion_common::internal_datafusion_err;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
Expand All @@ -33,6 +34,8 @@ use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use futures::FutureExt;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
Expand Down Expand Up @@ -64,6 +67,7 @@ use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
const DEFAULT_TARGET_FILE_SIZE_MB: usize = 128;

/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
Expand Down Expand Up @@ -96,6 +100,24 @@ config_namespace! {
/// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
/// during footer parsing.
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
/// Target file size in megabytes for written Vortex files.
///
/// When greater than 0 for non-partitioned writes, Vortex bypasses
/// DataFusion's file demuxer and splits output files based on
/// approximate byte size rather than row count.
pub target_file_size_mb: usize, default = DEFAULT_TARGET_FILE_SIZE_MB
/// Whether to enable projection pushdown into the underlying Vortex scan.
///
/// When enabled, projection expressions may be partially evaluated during
/// the scan. When disabled, Vortex reads only the referenced columns and
/// all expressions are evaluated after the scan.
pub projection_pushdown: bool, default = false
/// The intra-partition scan concurrency, controlling the number of row splits to process
/// concurrently per-thread within each file.
///
/// This does not affect the overall parallelism
/// across partitions, which is controlled by DataFusion's execution configuration.
pub scan_concurrency: Option<usize>, default = None
}
}

Expand Down Expand Up @@ -417,8 +439,42 @@ impl FileFormat for VortexFormat {
return not_impl_err!("Overwrites are not implemented yet for Vortex");
}

let target_file_size = (self.opts.target_file_size_mb > 0)
.then(|| {
u64::try_from(self.opts.target_file_size_mb)
.map_err(|e| {
internal_datafusion_err!(
"target_file_size_mb cannot be represented as u64: {e}"
)
})
.map(|v| v.saturating_mul(1024 * 1024).max(1))
})
.transpose()?;

// For non-partitioned writes, force a single input stream so VortexSink
// performs one coordinated write per statement instead of one
// independent write per CPU/input partition.
//
// Use coalescing rather than repartitioning to avoid introducing a
// shuffle/dispatcher step that can interleave batches from different
// input partitions.
//
// For partitioned writes, keep DataFusion's demuxer behavior.
let input: Arc<dyn ExecutionPlan> = if conf.table_partition_cols.is_empty()
&& input.output_partitioning().partition_count() > 1
{
Arc::new(CoalescePartitionsExec::new(input))
} else {
input
};

let schema = conf.output_schema().clone();
let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
let sink = Arc::new(VortexSink::new(
conf,
schema,
self.session.clone(),
target_file_size,
));

Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
Expand Down
Loading
Loading