Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions fuzz/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ use vortex_array::scalar_fn::fns::operators::Operator;
use vortex_array::search_sorted::SearchResult;
use vortex_array::search_sorted::SearchSorted;
use vortex_array::search_sorted::SearchSortedSide;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::FloatCode;
use vortex_btrblocks::IntCode;
use vortex_btrblocks::StringCode;
use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, FloatCode, IntCode, StringCode};
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
Expand Down
2 changes: 2 additions & 0 deletions vortex-datafusion/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled:

pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self

pub fn vortex_datafusion::VortexSource::with_segment_cache_builder(self, builder: alloc::sync::Arc<dyn vortex_layout::segments::cache::SegmentCacheBuilder>) -> Self

pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc<dyn vortex_datafusion::reader::VortexReaderFactory>) -> Self

impl core::clone::Clone for vortex_datafusion::VortexSource
Expand Down
30 changes: 30 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use object_store::ObjectMeta;
use object_store::path::Path;
use tracing::Instrument;
use vortex::array::ArrayRef;
Expand All @@ -53,6 +54,10 @@ use crate::convert::exprs::make_vortex_predicate;
use crate::convert::schema::calculate_physical_schema;
use crate::metrics::PARTITION_LABEL;
use crate::metrics::PATH_LABEL;
use vortex::layout::segments::FileIdentity;
use vortex::layout::segments::FileVersion;
use vortex::layout::segments::SegmentCacheBuilder;

use crate::persistent::cache::CachedVortexMetadata;
use crate::persistent::reader::VortexReaderFactory;
use crate::persistent::stream::PrunableStream;
Expand Down Expand Up @@ -93,6 +98,7 @@ pub(crate) struct VortexOpener {

pub expression_convertor: Arc<dyn ExpressionConvertor>,
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
pub segment_cache_builder: Option<Arc<dyn SegmentCacheBuilder>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
pub projection_pushdown: bool,
pub scan_concurrency: Option<usize>,
Expand All @@ -118,6 +124,7 @@ impl FileOpener for VortexOpener {
let file_pruning_predicate = self.file_pruning_predicate.clone();
let expr_adapter_factory = self.expr_adapter_factory.clone();
let file_metadata_cache = self.file_metadata_cache.clone();
let segment_cache_builder = self.segment_cache_builder.clone();

let unified_file_schema = self.table_schema.file_schema().clone();
let batch_size = self.batch_size;
Expand Down Expand Up @@ -193,6 +200,11 @@ impl FileOpener for VortexOpener {
open_opts = open_opts.with_footer(vortex_metadata.footer().clone());
}

if let Some(builder) = segment_cache_builder {
let identity = file_identity(&file.object_meta);
open_opts = open_opts.with_segment_cache(builder.cache_for(&identity));
}

let vxf = open_opts
.open_read(reader)
.await
Expand Down Expand Up @@ -413,6 +425,18 @@ impl FileOpener for VortexOpener {
}
}

/// Derives a [`FileIdentity`] for a file from its object store metadata.
///
/// The identity pairs the object's location with a version marker: the etag when the
/// metadata carries one, otherwise the `(size, last_modified)` pair.
fn file_identity(meta: &ObjectMeta) -> FileIdentity {
    let version = match meta.e_tag.as_deref() {
        Some(tag) => FileVersion::Etag(Arc::from(tag)),
        // No etag available: fall back to the size plus modification timestamp.
        // NOTE(review): `timestamp()` looks like whole seconds — two same-size rewrites
        // within one second would produce the same version; confirm that is acceptable.
        None => FileVersion::SizeMtime(meta.size, meta.last_modified.timestamp()),
    };
    FileIdentity {
        path: Arc::from(meta.location.as_ref()),
        version,
    }
}

/// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
fn apply_byte_range(
file_range: FileRange,
Expand Down Expand Up @@ -569,6 +593,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
}
Expand Down Expand Up @@ -663,6 +688,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -749,6 +775,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -903,6 +930,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -962,6 +990,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
}
Expand Down Expand Up @@ -1163,6 +1192,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache_builder: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down
24 changes: 23 additions & 1 deletion vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use object_store::path::Path;
use vortex::error::VortexExpect;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::layout::LayoutReader;
use vortex::layout::segments::SegmentCacheBuilder;
use vortex::metrics::DefaultMetricsRegistry;
use vortex::metrics::MetricsRegistry;
use vortex::session::VortexSession;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub struct VortexSource {
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
segment_cache_builder: Option<Arc<dyn SegmentCacheBuilder>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
options: VortexTableOptions,
}
Expand Down Expand Up @@ -92,6 +94,7 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
segment_cache_builder: None,
options: VortexTableOptions::default(),
}
}
Expand Down Expand Up @@ -136,7 +139,25 @@ impl VortexSource {
self
}

/// Set the underlying scan concurrency. This limit is used per Vortex scan operations.
/// Sets a [`SegmentCacheBuilder`] to reuse segment bytes across scans of the same files.
///
/// Without a builder every query re-reads zone map and data segments from object storage.
/// The builder is invoked once per opened file with that file's
/// [`FileIdentity`](vortex::layout::segments::FileIdentity); the returned per-file
/// [`SegmentCache`](vortex::layout::segments::SegmentCache) is wired into the file open
/// path. Use
/// [`NamespacedMokaSegmentCacheBuilder`](vortex::layout::segments::NamespacedMokaSegmentCacheBuilder)
/// for cross-query reuse with a global memory budget, optionally wrapped in
/// [`InstrumentedSegmentCacheBuilder`](vortex::layout::segments::InstrumentedSegmentCacheBuilder)
/// for hit/miss metrics.
pub fn with_segment_cache_builder(mut self, builder: Arc<dyn SegmentCacheBuilder>) -> Self {
self.segment_cache_builder = Some(builder);
self
}

/// Sets the per-file Vortex scan concurrency.
///
/// This is separate from DataFusion's partition-level parallelism.
pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
self.options.scan_concurrency = Some(scan_concurrency);
self
Expand Down Expand Up @@ -191,6 +212,7 @@ impl FileSource for VortexSource {
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: self.file_metadata_cache.clone(),
segment_cache_builder: self.segment_cache_builder.clone(),
projection_pushdown: self.options.projection_pushdown,
scan_concurrency: self.options.scan_concurrency,
};
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn vortex_file::segments::FileSegmentSource::request(&self, id: vortex_layou

pub struct vortex_file::segments::InitialReadSegmentCache

pub vortex_file::segments::InitialReadSegmentCache::fallback: alloc::sync::Arc<dyn vortex_layout::segments::cache::SegmentCache>
pub vortex_file::segments::InitialReadSegmentCache::fallback: vortex_layout::segments::cache::SharedSegmentCache

pub vortex_file::segments::InitialReadSegmentCache::initial: parking_lot::rwlock::RwLock<vortex_utils::aliases::hash_map::HashMap<vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer>>

Expand Down Expand Up @@ -310,7 +310,7 @@ pub fn vortex_file::VortexOpenOptions::with_labels(self, labels: alloc::vec::Vec

pub fn vortex_file::VortexOpenOptions::with_metrics_registry(self, metrics: alloc::sync::Arc<dyn vortex_metrics::MetricsRegistry>) -> Self

pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, segment_cache: alloc::sync::Arc<dyn vortex_layout::segments::cache::SegmentCache>) -> Self
pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, segment_cache: vortex_layout::segments::cache::SharedSegmentCache) -> Self

pub fn vortex_file::VortexOpenOptions::with_some_file_size(self, file_size: core::option::Option<u64>) -> Self

Expand Down
18 changes: 15 additions & 3 deletions vortex-file/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use vortex_io::VortexReadAt;
use vortex_io::session::RuntimeSessionExt;
use vortex_layout::segments::InstrumentedSegmentCache;
use vortex_layout::segments::NoOpSegmentCache;
use vortex_layout::segments::SegmentCache;
use vortex_layout::segments::SegmentCacheSourceAdapter;
use vortex_layout::segments::SegmentId;
use vortex_layout::segments::SharedSegmentCache;
use vortex_layout::segments::SharedSegmentSource;
use vortex_layout::session::LayoutSessionExt;
use vortex_metrics::DefaultMetricsRegistry;
Expand All @@ -44,7 +44,7 @@ pub struct VortexOpenOptions {
/// The session to use for opening the file.
session: VortexSession,
/// Cache to use for file segments.
segment_cache: Option<Arc<dyn SegmentCache>>,
segment_cache: Option<SharedSegmentCache>,
/// The number of bytes to read when parsing the footer.
initial_read_size: usize,
/// An optional, externally provided, file size.
Expand Down Expand Up @@ -87,7 +87,19 @@ impl VortexOpenOptions {
}

/// Configure a custom [`SegmentCache`].
pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
///
/// The supplied cache must be **scoped to a single file**: [`SegmentId`] is a
/// file-local index, so reusing one cache across multiple files will alias entries
/// from different files onto the same key. For cross-file sharing use a
/// [`SegmentCacheBuilder`] (e.g.
/// [`NamespacedMokaSegmentCacheBuilder`](vortex_layout::segments::NamespacedMokaSegmentCacheBuilder))
/// at the layer that opens files, and pass the per-file [`SegmentCache`] it returns
/// here.
///
/// [`SegmentCache`]: vortex_layout::segments::SegmentCache
/// [`SegmentId`]: vortex_layout::segments::SegmentId
/// [`SegmentCacheBuilder`]: vortex_layout::segments::SegmentCacheBuilder
pub fn with_segment_cache(mut self, segment_cache: SharedSegmentCache) -> Self {
self.segment_cache = Some(segment_cache);
self
}
Expand Down
5 changes: 2 additions & 3 deletions vortex-file/src/segments/cache.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::RwLock;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexResult;
use vortex_layout::segments::SegmentCache;
use vortex_layout::segments::SegmentId;
use vortex_layout::segments::SharedSegmentCache;
use vortex_utils::aliases::hash_map::HashMap;

/// Segment cache containing the initial read segments.
///
/// NOTE(review): how lookups are split between `initial` and `fallback` is implemented
/// outside this view — presumably `initial` is checked first; confirm against the
/// `SegmentCache` impl for this type.
pub struct InitialReadSegmentCache {
/// Segment buffers keyed by [`SegmentId`], held behind a read/write lock.
pub initial: RwLock<HashMap<SegmentId, ByteBuffer>>,
pub fallback: Arc<dyn SegmentCache>,
/// Shared handle to another segment cache; presumably consulted for segments that are
/// not present in `initial` — TODO confirm.
pub fallback: SharedSegmentCache,
}

#[async_trait]
Expand Down
Loading
Loading