Skip to content

Commit deb1dc0

Browse files
committed
fixes
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 3a91217 commit deb1dc0

7 files changed

Lines changed: 108 additions & 112 deletions

File tree

java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.vortex.jni.NativeLoader;
1212
import java.io.IOException;
1313
import java.math.BigDecimal;
14+
import java.nio.file.Files;
1415
import java.nio.file.Path;
1516
import java.util.ArrayList;
1617
import java.util.HashMap;
@@ -132,6 +133,7 @@ public void testFullScan() throws Exception {
132133
DataSource ds = DataSource.open(session, writePath);
133134

134135
assertEquals(new DataSource.RowCount.Exact(10L), ds.rowCount());
136+
assertEquals(new DataSource.ByteSize.Exact(Files.size(tempDir.resolve("minimal.vortex"))), ds.byteSize());
135137

136138
var schema = ds.arrowSchema(allocator);
137139
assertEquals(

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package dev.vortex.spark.read;
55

66
import com.google.common.collect.ImmutableMap;
7+
import dev.vortex.api.Session;
78
import dev.vortex.jni.NativeFiles;
89
import dev.vortex.spark.VortexFilePartition;
910
import dev.vortex.spark.VortexSparkSession;
@@ -76,14 +77,19 @@ public PartitionReaderFactory createReaderFactory() {
7677
}
7778

7879
private List<String> resolvePaths() {
79-
var session = VortexSparkSession.get(formatOptions);
80+
return resolveVortexPaths(VortexSparkSession.get(formatOptions), paths, formatOptions);
81+
}
82+
83+
/**
84+
* Expands directory-like entries to concrete {@code .vortex} files; entries that already name a {@code .vortex}
85+
* file are kept as-is. Shared with {@link VortexScan#estimateStatistics()} so planning and execution resolve paths
86+
* identically.
87+
*/
88+
static List<String> resolveVortexPaths(Session session, List<String> paths, Map<String, String> formatOptions) {
8089
return paths.stream()
81-
.flatMap(path -> {
82-
if (path.endsWith(".vortex")) {
83-
return Stream.of(path);
84-
}
85-
return NativeFiles.listFiles(session, path, formatOptions).stream();
86-
})
90+
.flatMap(path -> path.endsWith(".vortex")
91+
? Stream.of(path)
92+
: NativeFiles.listFiles(session, path, formatOptions).stream())
8793
.collect(Collectors.toList());
8894
}
8995

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55

66
import dev.vortex.api.DataSource;
77
import dev.vortex.api.Session;
8-
import dev.vortex.jni.NativeFiles;
98
import dev.vortex.spark.VortexSparkSession;
109
import java.util.Arrays;
11-
import java.util.HashMap;
1210
import java.util.List;
1311
import java.util.Map;
1412
import java.util.OptionalLong;
15-
import java.util.stream.Collectors;
16-
import java.util.stream.Stream;
1713
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
1814
import org.apache.spark.sql.connector.catalog.Column;
1915
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -140,14 +136,7 @@ public Statistics estimateStatistics() {
140136

141137
private Statistics computeStatistics() {
142138
Session session = VortexSparkSession.get(formatOptions);
143-
// Expand directory paths to concrete files the way VortexBatchExec does, so we use the
144-
// same per-path resolution end-to-end.
145-
List<String> resolvedPaths = paths.stream()
146-
.flatMap(path -> path.endsWith(".vortex")
147-
? Stream.of(path)
148-
: NativeFiles.listFiles(session, path, formatOptions).stream())
149-
.collect(Collectors.toList());
150-
139+
List<String> resolvedPaths = VortexBatchExec.resolveVortexPaths(session, paths, formatOptions);
151140
if (resolvedPaths.isEmpty()) {
152141
return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty());
153142
}
@@ -181,7 +170,7 @@ private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes)
181170

182171
@Override
183172
public Map<NamedReference, ColumnStatistics> columnStats() {
184-
return new HashMap<>();
173+
return Map.of();
185174
}
186175
}
187176
}

java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,26 @@ public void testEstimateStatisticsAcrossMultipleFiles() throws IOException {
9898
public void testEstimateStatisticsReportsSizeInBytes() throws IOException {
9999
Path outputPath = writeRows(120, "with_size", 3);
100100

101-
long expectedTotalBytes = totalVortexFileBytes(outputPath);
102-
assertTrue(expectedTotalBytes > 0, "Test setup should produce at least one non-empty .vortex file");
101+
long fileBytes = totalVortexFileBytes(outputPath);
102+
assertTrue(fileBytes > 0, "Test setup should produce at least one non-empty .vortex file");
103103

104104
VortexScan scan = buildScan(outputPath);
105105
Statistics stats = scan.estimateStatistics();
106106

107107
assertTrue(
108108
stats.sizeInBytes().isPresent(),
109109
"VortexScan should surface a sizeInBytes when the filesystem listing reports file sizes");
110+
// Mirror the scan's Spark-convention scaling (factor 1.0, unpruned schema), which divides and
111+
// re-multiplies by the schema default size in double arithmetic before truncating; asserting
112+
// against the raw byte sum would be sensitive to the floating-point round trip.
113+
StructType schema = spark.read()
114+
.format("vortex")
115+
.option("path", outputPath.toUri().toString())
116+
.load()
117+
.schema();
118+
long expectedSize = (long) (1.0 * fileBytes / schema.defaultSize() * schema.defaultSize());
110119
assertEquals(
111-
expectedTotalBytes,
120+
expectedSize,
112121
stats.sizeInBytes().getAsLong(),
113122
"sizeInBytes should equal the sum of on-storage .vortex file sizes");
114123
}

vortex-file/src/multi/mod.rs

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ mod session;
88
use std::sync::Arc;
99

1010
use async_trait::async_trait;
11+
use futures::StreamExt;
1112
use futures::TryStreamExt;
13+
use futures::stream;
1214
use session::MultiFileSessionExt;
1315
use tracing::debug;
16+
use vortex_error::VortexError;
1417
use vortex_error::VortexResult;
1518
use vortex_error::vortex_bail;
1619
use vortex_io::filesystem::FileListing;
@@ -60,19 +63,20 @@ pub struct MultiFileDataSource {
6063
/// List of (glob, optional filesystem) pairs to resolve.
6164
/// When the filesystem is None, a local filesystem will be created in build().
6265
glob_sources: Vec<(String, Option<FileSystemRef>)>,
63-
/// Pre-resolved file listings that skip glob expansion. The caller is responsible for
64-
/// supplying the [`FileListing::size`] when stats reporting matters.
65-
listing_sources: Vec<(FileListing, FileSystemRef)>,
6666
open_options_fn: Arc<dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync>,
6767
}
6868

69+
/// In-flight glob resolutions in [`MultiFileDataSource::build`]. Callers like the JNI data
70+
/// source add one exact path per glob source, where each resolution is a single remote
71+
/// metadata lookup; resolving them concurrently avoids one round trip of latency per file.
72+
const GLOB_RESOLUTION_CONCURRENCY: usize = 16;
73+
6974
impl MultiFileDataSource {
7075
/// Create a new [`MultiFileDataSource`] builder.
7176
pub fn new(session: VortexSession) -> Self {
7277
Self {
7378
session,
7479
glob_sources: Vec::new(),
75-
listing_sources: Vec::new(),
7680
open_options_fn: Arc::new(|opts| opts),
7781
}
7882
}
@@ -98,23 +102,6 @@ impl MultiFileDataSource {
98102
self
99103
}
100104

101-
/// Add a pre-resolved file listing.
102-
///
103-
/// Use this when the caller already knows the exact file path and (optionally) its size,
104-
/// avoiding the glob expansion done by [`Self::with_glob`]. Supplying
105-
/// [`FileListing::size`] is required for [`DataSource::byte_size`] to surface a contribution
106-
/// from this file; otherwise the source size remains unknown for this file and the
107-
/// data-source-level total is extrapolated from the files that do report a size.
108-
pub fn with_listing(mut self, listing: FileListing, fs: FileSystemRef) -> Self {
109-
let FileListing { path, size } = listing;
110-
let listing = FileListing {
111-
path: path.trim_start_matches('/').to_string(),
112-
size,
113-
};
114-
self.listing_sources.push((listing, fs));
115-
self
116-
}
117-
118105
/// Customize [`VortexOpenOptions`] applied to each file.
119106
///
120107
/// Use this to configure segment caches, metrics registries, or other per-file options.
@@ -131,10 +118,8 @@ impl MultiFileDataSource {
131118
/// Discovers files via glob, opens the first file eagerly to determine the schema,
132119
/// and creates lazy factories for the remaining files.
133120
pub async fn build(self) -> VortexResult<MultiLayoutDataSource> {
134-
if self.glob_sources.is_empty() && self.listing_sources.is_empty() {
135-
vortex_bail!(
136-
"MultiFileDataSource requires at least one glob pattern or pre-resolved listing"
137-
);
121+
if self.glob_sources.is_empty() {
122+
vortex_bail!("MultiFileDataSource requires at least one glob pattern");
138123
}
139124

140125
// Create local filesystem lazily if needed (only if any glob lacks a filesystem).
@@ -145,32 +130,39 @@ impl MultiFileDataSource {
145130
.then(|| create_local_filesystem(&self.session))
146131
.transpose()?;
147132

148-
// Collect files from all glob sources.
149-
let mut all_files: Vec<(FileListing, FileSystemRef)> = Vec::new();
150-
for (glob, maybe_fs) in &self.glob_sources {
151-
// Use the provided filesystem, or fall back to the local filesystem.
152-
// We know local_fs is Some when maybe_fs is None (by construction above).
153-
let fs = maybe_fs
154-
.as_ref()
155-
.or(local_fs.as_ref())
156-
.map(Arc::clone)
157-
.unwrap_or_else(|| {
158-
unreachable!("local_fs is set when any glob lacks a filesystem")
159-
});
160-
let files: Vec<FileListing> = fs.glob(glob)?.try_collect().await?;
161-
for file in files {
162-
all_files.push((file, Arc::clone(&fs)));
163-
}
164-
}
165-
all_files.extend(self.listing_sources);
133+
let globs: Vec<String> = self.glob_sources.iter().map(|(g, _)| g.clone()).collect();
134+
135+
// Resolve glob sources concurrently while preserving their order, since the order
136+
// determines partition indices and which file is opened eagerly for the schema.
137+
let resolved: Vec<Vec<(FileListing, FileSystemRef)>> =
138+
stream::iter(self.glob_sources.into_iter().map(|(glob, maybe_fs)| {
139+
// Use the provided filesystem, or fall back to the local filesystem.
140+
// We know local_fs is Some when maybe_fs is None (by construction above).
141+
let fs = maybe_fs
142+
.or_else(|| local_fs.as_ref().map(Arc::clone))
143+
.unwrap_or_else(|| {
144+
unreachable!("local_fs is set when any glob lacks a filesystem")
145+
});
146+
async move {
147+
let files: Vec<FileListing> = fs.glob(&glob)?.try_collect().await?;
148+
Ok::<_, VortexError>(
149+
files
150+
.into_iter()
151+
.map(|file| (file, Arc::clone(&fs)))
152+
.collect(),
153+
)
154+
}
155+
}))
156+
.buffered(GLOB_RESOLUTION_CONCURRENCY)
157+
.try_collect()
158+
.await?;
159+
let all_files: Vec<(FileListing, FileSystemRef)> = resolved.into_iter().flatten().collect();
166160

167161
if all_files.is_empty() {
168-
let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect();
169162
vortex_bail!("No files matched the glob pattern(s): {:?}", globs);
170163
}
171164

172165
let file_count = all_files.len();
173-
let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect();
174166
debug!(file_count, glob = ?globs, "discovered files");
175167

176168
// Open first file eagerly for dtype.

vortex-jni/src/data_source.rs

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use std::path::PathBuf;
1616
use std::path::absolute;
1717
use std::sync::Arc;
1818

19-
use futures::StreamExt;
20-
use futures::stream;
2119
use jni::EnvUnowned;
2220
use jni::objects::JClass;
2321
use jni::objects::JLongArray;
@@ -30,7 +28,6 @@ use vortex::error::VortexResult;
3028
use vortex::error::vortex_err;
3129
use vortex::expr::stats::Precision;
3230
use vortex::file::multi::MultiFileDataSource;
33-
use vortex::io::filesystem::FileListing;
3431
use vortex::io::filesystem::FileSystemRef;
3532
use vortex::io::runtime::BlockingRuntime;
3633
use vortex::io::session::RuntimeSessionExt;
@@ -44,10 +41,6 @@ use crate::file::extract_properties;
4441
use crate::object_store::object_store_fs;
4542
use crate::session::session_ref;
4643

47-
/// In-flight size lookups while resolving exact paths to file listings. Balances HEAD
48-
/// throughput on remote stores against connection overhead.
49-
const SIZE_LOOKUP_CONCURRENCY: usize = 16;
50-
5144
/// Wraps an `Arc<dyn DataSource>` behind a single pointer.
5245
pub(crate) struct NativeDataSource {
5346
inner: DataSourceRef,
@@ -110,49 +103,14 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_open(
110103
}
111104
}
112105

113-
// Split inputs into glob patterns (which fs.glob() expands via list(), capturing sizes
114-
// automatically) and exact paths (which are resolved one-by-one with a HEAD-style size
115-
// lookup so the data source can report total bytes for Spark-style stats).
116-
let mut glob_inputs: Vec<(String, FileSystemRef)> = Vec::new();
117-
let mut exact_inputs: Vec<(String, FileSystemRef)> = Vec::new();
106+
let mut builder = MultiFileDataSource::new(session.clone());
118107
for glob_url in &glob_urls {
119108
let base = base_url(glob_url);
120109
let fs = fs_cache
121110
.get(&base)
122111
.cloned()
123112
.unwrap_or_else(|| unreachable!("fs cached for every base url"));
124-
let path = glob_url.path().to_string();
125-
if path.contains(['*', '?', '[']) {
126-
glob_inputs.push((path, fs));
127-
} else {
128-
exact_inputs.push((path, fs));
129-
}
130-
}
131-
132-
let resolved_listings: Vec<(FileListing, FileSystemRef)> = if exact_inputs.is_empty() {
133-
Vec::new()
134-
} else {
135-
RUNTIME.block_on(async {
136-
stream::iter(exact_inputs)
137-
.map(|(path, fs)| async move {
138-
let size = match fs.open_read(&path).await {
139-
Ok(source) => source.size().await.ok(),
140-
Err(_) => None,
141-
};
142-
(FileListing { path, size }, fs)
143-
})
144-
.buffer_unordered(SIZE_LOOKUP_CONCURRENCY)
145-
.collect::<Vec<_>>()
146-
.await
147-
})
148-
};
149-
150-
let mut builder = MultiFileDataSource::new(session.clone());
151-
for (glob, fs) in glob_inputs {
152-
builder = builder.with_glob(glob, Some(fs));
153-
}
154-
for (listing, fs) in resolved_listings {
155-
builder = builder.with_listing(listing, fs);
113+
builder = builder.with_glob(glob_url.path(), Some(fs));
156114
}
157115

158116
let inner = RUNTIME

vortex-layout/src/scan/multi.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,43 @@ impl Partition for MultiLayoutPartition {
530530
)))
531531
}
532532
}
533+
534+
#[cfg(test)]
535+
mod tests {
536+
use rstest::rstest;
537+
use vortex_array::dtype::Nullability;
538+
539+
use super::*;
540+
use crate::scan::test::new_session;
541+
542+
struct NeverOpened;
543+
544+
#[async_trait]
545+
impl LayoutReaderFactory for NeverOpened {
546+
async fn open(&self) -> VortexResult<Option<LayoutReaderRef>> {
547+
unreachable!("byte_size must not open readers")
548+
}
549+
}
550+
551+
fn deferred_source(byte_sizes: Vec<Option<u64>>) -> MultiLayoutDataSource {
552+
let factories: Vec<Arc<dyn LayoutReaderFactory>> = byte_sizes
553+
.iter()
554+
.map(|_| Arc::new(NeverOpened) as _)
555+
.collect();
556+
MultiLayoutDataSource::new_deferred(
557+
DType::Bool(Nullability::NonNullable),
558+
factories,
559+
byte_sizes,
560+
&new_session(),
561+
)
562+
}
563+
564+
#[rstest]
565+
#[case::all_known(vec![Some(10), Some(20), Some(30)], Precision::exact(60u64))]
566+
#[case::some_known_extrapolates(vec![Some(10), None, Some(30)], Precision::inexact(60u64))]
567+
#[case::none_known(vec![None, None], Precision::Absent)]
568+
#[case::no_children(vec![], Precision::exact(0u64))]
569+
fn byte_size_precision(#[case] sizes: Vec<Option<u64>>, #[case] expected: Precision<u64>) {
570+
assert_eq!(deferred_source(sizes).byte_size(), expected);
571+
}
572+
}

0 commit comments

Comments
 (0)