diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java index 24780785274..93c5df2c61a 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java @@ -128,6 +128,57 @@ public OptionalLong asOptional() { } } + /** + * Sum of the on-storage byte sizes of all files included in this data source along with the precision of that + * estimate. Mirrors the Rust {@code Option>} returned by {@code DataSource::byte_size}: + * {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return + * sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and + * {@link ByteSize.Exact} when every file has a known size. + */ + public ByteSize byteSize() { + long[] out = new long[2]; + NativeDataSource.byteSize(pointer, out); + return switch ((int) out[1]) { + case 1 -> new ByteSize.Estimate(out[0]); + case 2 -> new ByteSize.Exact(out[0]); + default -> ByteSize.Unknown.INSTANCE; + }; + } + + /** Precision-aware byte size. See {@link #byteSize()}. */ + public sealed interface ByteSize { + /** Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown. */ + OptionalLong asOptional(); + + /** Byte size is not known. */ + final class Unknown implements ByteSize { + public static final Unknown INSTANCE = new Unknown(); + + private Unknown() {} + + @Override + public OptionalLong asOptional() { + return OptionalLong.empty(); + } + } + + /** Estimated byte size; the actual value may differ. */ + record Estimate(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + + /** Exact byte size. */ + record Exact(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + } + /** Submit a scan. */ public Scan scan(ScanOptions options) { Objects.requireNonNull(options, "options"); diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java index b7e58d2dc21..cc2aa163cee 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java @@ -33,4 +33,10 @@ private NativeDataSource() {} * {@code 1=estimate}, {@code 2=exact}. */ public static native void rowCount(long pointer, long[] out); + + /** + * Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source. + * Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}. + */ + public static native void byteSize(long pointer, long[] out); } diff --git a/java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java b/java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java index 322dc5522c3..aeee8febcf4 100644 --- a/java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java +++ b/java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java @@ -11,6 +11,7 @@ import dev.vortex.jni.NativeLoader; import java.io.IOException; import java.math.BigDecimal; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -132,6 +133,7 @@ public void testFullScan() throws Exception { DataSource ds = DataSource.open(session, writePath); assertEquals(new DataSource.RowCount.Exact(10L), ds.rowCount()); + assertEquals(new DataSource.ByteSize.Exact(Files.size(tempDir.resolve("minimal.vortex"))), ds.byteSize()); var schema = ds.arrowSchema(allocator); assertEquals( diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java index 8df7ce8e1db..198dfd6c77c 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java @@ -4,6 +4,7 @@ package dev.vortex.spark.read; import com.google.common.collect.ImmutableMap; +import dev.vortex.api.Session; import dev.vortex.jni.NativeFiles; import dev.vortex.spark.VortexFilePartition; import dev.vortex.spark.VortexSparkSession; @@ -76,14 +77,19 @@ public PartitionReaderFactory createReaderFactory() { } private List resolvePaths() { - var session = VortexSparkSession.get(formatOptions); + return resolveVortexPaths(VortexSparkSession.get(formatOptions), paths, formatOptions); + } + + /** + * Expands directory-like entries to concrete {@code .vortex} files; entries that already name a {@code .vortex} + * file are kept as-is. Shared with {@link VortexScan#estimateStatistics()} so planning and execution resolve paths + * identically. + */ + static List resolveVortexPaths(Session session, List paths, Map formatOptions) { return paths.stream() - .flatMap(path -> { - if (path.endsWith(".vortex")) { - return Stream.of(path); - } - return NativeFiles.listFiles(session, path, formatOptions).stream(); - }) + .flatMap(path -> path.endsWith(".vortex") + ? Stream.of(path) + : NativeFiles.listFiles(session, path, formatOptions).stream()) .collect(Collectors.toList()); } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java index d5949b57a4d..02a7563f925 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java @@ -3,38 +3,62 @@ package dev.vortex.spark.read; +import dev.vortex.api.DataSource; +import dev.vortex.api.Session; +import dev.vortex.spark.VortexSparkSession; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Column; +import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.connector.read.SupportsReportStatistics; +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; -/** Spark V2 {@link Scan} over a table of Vortex files. */ -public final class VortexScan implements Scan { +/** + * Spark V2 {@link Scan} over a table of Vortex files. + * + *

Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a + * Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by + * {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor + * and scaling by the pushed read schema's default size relative to the full table schema's default size. When the + * listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is + * applied. + */ +public final class VortexScan implements Scan, SupportsReportStatistics { private final List paths; + private final List tableColumns; private final List readColumns; private final Map formatOptions; private final Predicate[] pushedPredicates; + private volatile Statistics cachedStatistics; + /** * Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing * immutable collections; the constructor does not copy. * * @param paths the list of Vortex file paths to scan + * @param tableColumns the full table columns before projection pushdown * @param readColumns the list of columns to read from the files * @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown */ public VortexScan( List paths, + List tableColumns, List readColumns, - Map formatOptions, - Predicate[] pushedPredicates) { + Predicate[] pushedPredicates, + Map formatOptions) { this.paths = paths; + this.tableColumns = tableColumns; this.readColumns = readColumns; this.formatOptions = formatOptions; this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone(); @@ -83,4 +107,70 @@ public Batch toBatch() { public ColumnarSupportMode columnarSupportMode() { return ColumnarSupportMode.SUPPORTED; } + + /** + * Returns statistics for this scan. + * + *

Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the + * data source (sum of file-footer row counts; extrapolated from the first opened file when other files are + * deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem + * listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full + * table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no + * file size is known at all the value is left empty so Spark falls back to its default heuristic. + * + * @return statistics with row-count and Spark scan-size estimates + */ + @Override + public Statistics estimateStatistics() { + Statistics local = cachedStatistics; + if (local != null) { + return local; + } + synchronized (this) { + if (cachedStatistics == null) { + cachedStatistics = computeStatistics(); + } + return cachedStatistics; + } + } + + private Statistics computeStatistics() { + Session session = VortexSparkSession.get(formatOptions); + List resolvedPaths = VortexBatchExec.resolveVortexPaths(session, paths, formatOptions); + if (resolvedPaths.isEmpty()) { + return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty()); + } + + DataSource source = DataSource.open(session, resolvedPaths, formatOptions); + return new VortexStatistics( + source.rowCount().asOptional(), + scaleSizeInBytes(source.byteSize().asOptional())); + } + + private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) { + if (fileBytes.isEmpty()) { + return OptionalLong.empty(); + } + + StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0])); + StructType readSchema = readSchema(); + int tableDefaultSize = tableSchema.defaultSize(); + if (tableDefaultSize <= 0) { + return fileBytes; + } + + double scaled = SQLConf.get().fileCompressionFactor() + * fileBytes.getAsLong() + / tableDefaultSize + * readSchema.defaultSize(); + return OptionalLong.of((long) scaled); + } + + private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics { + + @Override + public Map columnStats() { + return Map.of(); + } + } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java index 94990432b45..107d22b30f6 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java @@ -31,7 +31,8 @@ public final class VortexScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters { private final ImmutableList.Builder paths; - private final List columns; + private final List tableColumns; + private final List readColumns; private final Map formatOptions; private final Set partitionColumnNames; private Predicate[] pushedPredicates = new Predicate[0]; @@ -48,10 +49,11 @@ public VortexScanBuilder(Map formatOptions) { */ public VortexScanBuilder(Map formatOptions, Transform[] partitionTransforms) { this.paths = ImmutableList.builder(); - this.columns = new ArrayList<>(); Map options = Maps.newHashMap(); options.put("vortex.workerThreads", "4"); options.putAll(formatOptions); + this.tableColumns = new ArrayList<>(); + this.readColumns = new ArrayList<>(); this.formatOptions = options; this.partitionColumnNames = collectPartitionColumnNames(partitionTransforms); } @@ -74,7 +76,8 @@ public VortexScanBuilder addPath(String path) { * @return this builder for method chaining */ public VortexScanBuilder addColumn(Column column) { - this.columns.add(column); + this.tableColumns.add(column); + this.readColumns.add(column); return this; } @@ -97,7 +100,7 @@ public VortexScanBuilder addAllPaths(Iterable paths) { */ public VortexScanBuilder addAllColumns(Iterable columns) { for (Column column : columns) { - this.columns.add(column); + addColumn(column); } return this; } @@ -116,7 +119,12 @@ public Scan build() { // Allow empty columns for operations like count() that don't need actual column data // If no columns are specified, we'll read the minimal schema needed - return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions, pushedPredicates); + return new VortexScan( + paths, + List.copyOf(this.tableColumns), + List.copyOf(this.readColumns), + pushedPredicates, + this.formatOptions); } /** @@ -129,8 +137,8 @@ public Scan build() { */ @Override public void pruneColumns(StructType requiredSchema) { - columns.clear(); - columns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema))); + readColumns.clear(); + readColumns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema))); } /** @@ -145,7 +153,7 @@ public void pruneColumns(StructType requiredSchema) { @Override public Predicate[] pushPredicates(Predicate[] predicates) { Map dataColumnTypes = new HashMap<>(); - for (Column column : columns) { + for (Column column : readColumns) { if (!partitionColumnNames.contains(column.name())) { dataColumnTypes.put(column.name(), column.dataType()); } diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java new file mode 100644 index 00000000000..0595349a49a --- /dev/null +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.vortex.spark.read.VortexScan; +import dev.vortex.spark.read.VortexScanBuilder; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Column; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; + +/** + * Integration tests for {@link VortexScan#estimateStatistics()}. + * + *

Verifies that the Spark V2 scan surfaces both the row count Vortex stores in each file footer and the sum of the + * on-storage file sizes reported by the filesystem listing. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public final class VortexDataSourceStatsTest { + private static final String FILE_COMPRESSION_FACTOR_KEY = "spark.sql.sources.fileCompressionFactor"; + + private SparkSession spark; + + @TempDir + Path tempDir; + + @BeforeAll + public void setUp() { + spark = SparkSession.builder() + .appName("VortexStatsTest") + .master("local[2]") + .config("spark.driver.host", "127.0.0.1") + .config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + .getOrCreate(); + } + + @AfterAll + public void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + @Test + @DisplayName("VortexScan reports exact row count for single-file scans") + public void testEstimateStatisticsReportsRowCount() throws IOException { + int numRows = 250; + Path outputPath = writeRows(numRows, "single_file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.numRows().isPresent(), + "VortexScan should report a row count for a Vortex dataset with a populated footer"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should match the rows we wrote"); + } + + @Test + @DisplayName("VortexScan reports aggregate row count across multi-file scans") + public void testEstimateStatisticsAcrossMultipleFiles() throws IOException { + int numRows = 400; + Path outputPath = writeRows(numRows, "multi_file", 4); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue(stats.numRows().isPresent(), "Row count should be reported for multi-file Vortex datasets"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should sum across all files"); + } + + @Test + @DisplayName("VortexScan reports sizeInBytes equal to the sum of on-storage file sizes") + public void testEstimateStatisticsReportsSizeInBytes() throws IOException { + Path outputPath = writeRows(120, "with_size", 3); + + long fileBytes = totalVortexFileBytes(outputPath); + assertTrue(fileBytes > 0, "Test setup should produce at least one non-empty .vortex file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.sizeInBytes().isPresent(), + "VortexScan should surface a sizeInBytes when the filesystem listing reports file sizes"); + // Mirror the scan's Spark-convention scaling (factor 1.0, unpruned schema), which divides and + // re-multiplies by the schema default size in double arithmetic before truncating; asserting + // against the raw byte sum would be sensitive to the floating-point round trip. + StructType schema = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load() + .schema(); + long expectedSize = (long) (1.0 * fileBytes / schema.defaultSize() * schema.defaultSize()); + assertEquals( + expectedSize, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should equal the sum of on-storage .vortex file sizes"); + } + + @Test + @DisplayName("VortexScan scales sizeInBytes by the pushed read schema") + public void testEstimateStatisticsScalesSizeInBytesForProjection() throws IOException { + Path outputPath = writeRows(120, "projected_size", 3); + long fileBytes = totalVortexFileBytes(outputPath); + + StructType fullSchema = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load() + .schema(); + StructType idOnlySchema = new StructType(new StructField[] {fullSchema.fields()[0]}); + + String previousCompressionFactor = spark.conf().get(FILE_COMPRESSION_FACTOR_KEY); + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, "0.5"); + try { + VortexScan scan = buildScan(outputPath, idOnlySchema); + Statistics stats = scan.estimateStatistics(); + + long expectedSize = (long) (0.5 * fileBytes / fullSchema.defaultSize() * idOnlySchema.defaultSize()); + assertTrue(stats.sizeInBytes().isPresent(), "Projected scans should still surface sizeInBytes"); + assertEquals( + expectedSize, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should follow Spark FileScan's compression and schema-width scaling"); + assertTrue( + stats.sizeInBytes().getAsLong() < fileBytes, + "Projected scan stats should be smaller than full file bytes"); + } finally { + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, previousCompressionFactor); + } + } + + @Test + @DisplayName("VortexScan caches statistics across repeated calls") + public void testEstimateStatisticsIsCached() throws IOException { + Path outputPath = writeRows(50, "cached", 1); + + VortexScan scan = buildScan(outputPath); + Statistics first = scan.estimateStatistics(); + Statistics second = scan.estimateStatistics(); + + // Same instance returned -- the second call hits the cached value. + assertEquals(first, second, "estimateStatistics() should return the same Statistics object on repeat calls"); + assertInstanceOf(Statistics.class, first); + } + + private VortexScan buildScan(Path outputPath) { + return buildScan(outputPath, null); + } + + private VortexScan buildScan(Path outputPath, StructType requiredSchema) { + Dataset readDf = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load(); + StructType readSchema = readDf.schema(); + + VortexScanBuilder builder = new VortexScanBuilder(Map.of()); + builder.addPath(outputPath.toUri().toString()); + for (StructField field : readSchema.fields()) { + builder.addColumn(Column.create(field.name(), field.dataType())); + } + if (requiredSchema != null) { + builder.pruneColumns(requiredSchema); + } + return (VortexScan) builder.build(); + } + + private Path writeRows(int numRows, String name) throws IOException { + return writeRows(numRows, name, 1); + } + + private long totalVortexFileBytes(Path outputPath) throws IOException { + try (Stream paths = Files.walk(outputPath)) { + return paths.filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().endsWith(".vortex")) + .mapToLong(path -> { + try { + return Files.size(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .sum(); + } + } + + private Path writeRows(int numRows, String name, int partitions) throws IOException { + Path outputPath = tempDir.resolve(name); + Dataset df = spark.range(0, numRows) + .selectExpr("cast(id as int) as id", "concat('value_', cast(id as string)) as value"); + + df.repartition(partitions) + .write() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .mode(SaveMode.Overwrite) + .save(); + return outputPath; + } + + @AfterEach + public void cleanupTempFiles() throws IOException { + if (tempDir != null && Files.exists(tempDir)) { + try (Stream paths = Files.walk(tempDir)) { + paths.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + System.err.println("Failed to delete: " + path); + } + }); + } + } + } +} diff --git a/vortex-duckdb/src/table_function.rs b/vortex-duckdb/src/table_function.rs index 11c5851af27..7b34725b123 100644 --- a/vortex-duckdb/src/table_function.rs +++ b/vortex-duckdb/src/table_function.rs @@ -439,7 +439,7 @@ pub fn statistics(bind_data: &TableFunctionBind, column_index: usize) -> Option< if children.len() != 1 { return None; } - let MultiLayoutChild::Opened(reader) = &children[0] else { + let MultiLayoutChild::Opened { reader, .. } = &children[0] else { return None; }; let stats_sets = match reader.as_any().downcast_ref::() { diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 3abb4ebea2a..2baf53ff845 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -8,9 +8,12 @@ mod session; use std::sync::Arc; use async_trait::async_trait; +use futures::StreamExt; use futures::TryStreamExt; +use futures::stream; use session::MultiFileSessionExt; use tracing::debug; +use vortex_error::VortexError; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::filesystem::FileListing; @@ -63,6 +66,11 @@ pub struct MultiFileDataSource { open_options_fn: Arc VortexOpenOptions + Send + Sync>, } +/// In-flight glob resolutions in [`MultiFileDataSource::build`]. Callers like the JNI data +/// source add one exact path per glob source, where each resolution is a single remote +/// metadata lookup; resolving them concurrently avoids one round trip of latency per file. +const GLOB_RESOLUTION_CONCURRENCY: usize = 16; + impl MultiFileDataSource { /// Create a new [`MultiFileDataSource`] builder. pub fn new(session: VortexSession) -> Self { @@ -122,31 +130,39 @@ impl MultiFileDataSource { .then(|| create_local_filesystem(&self.session)) .transpose()?; - // Collect files from all glob sources. - let mut all_files: Vec<(FileListing, FileSystemRef)> = Vec::new(); - for (glob, maybe_fs) in &self.glob_sources { - // Use the provided filesystem, or fall back to the local filesystem. - // We know local_fs is Some when maybe_fs is None (by construction above). - let fs = maybe_fs - .as_ref() - .or(local_fs.as_ref()) - .map(Arc::clone) - .unwrap_or_else(|| { - unreachable!("local_fs is set when any glob lacks a filesystem") - }); - let files: Vec = fs.glob(glob)?.try_collect().await?; - for file in files { - all_files.push((file, Arc::clone(&fs))); - } - } + let globs: Vec = self.glob_sources.iter().map(|(g, _)| g.clone()).collect(); + + // Resolve glob sources concurrently while preserving their order, since the order + // determines partition indices and which file is opened eagerly for the schema. + let resolved: Vec> = + stream::iter(self.glob_sources.into_iter().map(|(glob, maybe_fs)| { + // Use the provided filesystem, or fall back to the local filesystem. + // We know local_fs is Some when maybe_fs is None (by construction above). + let fs = maybe_fs + .or_else(|| local_fs.as_ref().map(Arc::clone)) + .unwrap_or_else(|| { + unreachable!("local_fs is set when any glob lacks a filesystem") + }); + async move { + let files: Vec = fs.glob(&glob)?.try_collect().await?; + Ok::<_, VortexError>( + files + .into_iter() + .map(|file| (file, Arc::clone(&fs))) + .collect(), + ) + } + })) + .buffered(GLOB_RESOLUTION_CONCURRENCY) + .try_collect() + .await?; + let all_files: Vec<(FileListing, FileSystemRef)> = resolved.into_iter().flatten().collect(); if all_files.is_empty() { - let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect(); vortex_bail!("No files matched the glob pattern(s): {:?}", globs); } let file_count = all_files.len(); - let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect(); debug!(file_count, glob = ?globs, "discovered files"); // Open first file eagerly for dtype. @@ -155,6 +171,8 @@ impl MultiFileDataSource { let first_file = open_file(first_fs, first_file_listing, &self.session, open_fn).await?; let first_reader = first_file.layout_reader()?; + let byte_sizes: Vec> = all_files.iter().map(|(file, _)| file.size).collect(); + let factories: Vec> = all_files[1..] .iter() .map(|(file, fs)| { @@ -167,7 +185,12 @@ impl MultiFileDataSource { }) .collect(); - let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session); + let inner = MultiLayoutDataSource::new_with_first( + first_reader, + factories, + byte_sizes, + &self.session, + ); debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); diff --git a/vortex-jni/src/data_source.rs b/vortex-jni/src/data_source.rs index d733c2db48f..5f244998f67 100644 --- a/vortex-jni/src/data_source.rs +++ b/vortex-jni/src/data_source.rs @@ -211,6 +211,27 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_rowCount( }); } +/// Write the byte size into the two-slot jlong pair `out`: +/// `out[0]` receives the size in bytes (0 when unknown), `out[1]` the precision (0=unknown, 1=estimate, 2=exact). +#[unsafe(no_mangle)] +pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_byteSize( + mut env: EnvUnowned, + _class: JClass, + pointer: jlong, + out: JLongArray, +) { + try_or_throw(&mut env, |env| { + let ds = unsafe { NativeDataSource::from_ptr(pointer) }; + let (bytes, precision) = match ds.inner.byte_size() { + Precision::Exact(b) => (b as jlong, 2), + Precision::Inexact(b) => (b as jlong, 1), + Precision::Absent => (0, 0), + }; + out.set_region(env, 0, &[bytes, precision])?; + Ok(()) + }); +} + #[cfg(test)] mod tests { use super::*; diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 9442b72cf7a..577271da844 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -32,6 +32,7 @@ use async_trait::async_trait; use futures::FutureExt; use futures::StreamExt; use futures::stream; +use itertools::Itertools; use tracing::Instrument; use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; @@ -87,26 +88,68 @@ pub struct MultiLayoutDataSource { } pub enum MultiLayoutChild { - Opened(LayoutReaderRef), - Deferred(Arc), + Opened { + reader: LayoutReaderRef, + /// On-storage file size in bytes, if known from the listing metadata. + byte_size: Option, + }, + Deferred { + factory: Arc, + /// On-storage file size in bytes, if known from the listing metadata. + byte_size: Option, + }, +} + +impl MultiLayoutChild { + /// On-storage file size in bytes for this child, if known. + pub fn byte_size(&self) -> Option { + match self { + MultiLayoutChild::Opened { byte_size, .. } => *byte_size, + MultiLayoutChild::Deferred { byte_size, .. } => *byte_size, + } + } } impl MultiLayoutDataSource { /// Creates a multi-layout data source with the first reader pre-opened. /// /// The first reader determines the dtype. Remaining readers are opened lazily during - /// scanning via their factories. + /// scanning via their factories. `byte_sizes` carries the on-storage file size in bytes for + /// each child (first followed by remaining); pass `None` for entries where the size is + /// unknown. Must be empty or have length `1 + remaining.len()`. pub fn new_with_first( first: LayoutReaderRef, remaining: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let dtype = first.dtype().clone(); let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); - let mut children = Vec::with_capacity(1 + remaining.len()); - children.push(MultiLayoutChild::Opened(first)); - children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred)); + let total = 1 + remaining.len(); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; total]; + } + debug_assert_eq!( + sizes.len(), + total, + "byte_sizes length must match the number of children" + ); + + let mut children = Vec::with_capacity(total); + let mut sizes_iter = sizes.into_iter(); + let first_size = sizes_iter.next().unwrap_or(None); + children.push(MultiLayoutChild::Opened { + reader: first, + byte_size: first_size, + }); + children.extend( + remaining + .into_iter() + .zip_eq(sizes_iter) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }), + ); Self { dtype, @@ -120,20 +163,34 @@ impl MultiLayoutDataSource { /// /// The dtype must be provided externally since there is no pre-opened reader to infer it /// from. This avoids eagerly opening any file when the schema is already known (e.g. from - /// a catalog or a prior scan). + /// a catalog or a prior scan). `byte_sizes` carries the on-storage file size in bytes for + /// each factory; pass `None` for entries where the size is unknown. Must be empty or have + /// the same length as `factories`. pub fn new_deferred( dtype: DType, factories: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; factories.len()]; + } + debug_assert_eq!( + sizes.len(), + factories.len(), + "byte_sizes length must match the number of factories" + ); + Self { dtype, session: session.clone(), children: factories .into_iter() - .map(MultiLayoutChild::Deferred) + .zip_eq(sizes) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }) .collect(), concurrency, } @@ -166,11 +223,11 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => { + MultiLayoutChild::Opened { reader, .. } => { opened_count += 1; sum = sum.saturating_add(reader.row_count()); } - MultiLayoutChild::Deferred(_) => { + MultiLayoutChild::Deferred { .. } => { deferred_count += 1; } } @@ -192,6 +249,34 @@ impl DataSource for MultiLayoutDataSource { } } + fn byte_size(&self) -> Precision { + let total_count = self.children.len() as u64; + if total_count == 0 { + return Precision::exact(0u64); + } + + let mut sum: u64 = 0; + let mut known_count: u64 = 0; + for child in &self.children { + if let Some(size) = child.byte_size() { + sum = sum.saturating_add(size); + known_count += 1; + } + } + + if known_count == 0 { + return Precision::Absent; + } + + if known_count == total_count { + Precision::exact(sum) + } else { + let avg = sum / known_count; + let extrapolated = avg.saturating_mul(total_count); + Precision::inexact(extrapolated) + } + } + fn deserialize_partition( &self, _data: &[u8], @@ -206,8 +291,10 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => ready.push_back(Arc::clone(reader)), - MultiLayoutChild::Deferred(factory) => deferred.push_back(Arc::clone(factory)), + MultiLayoutChild::Opened { reader, .. } => ready.push_back(Arc::clone(reader)), + MultiLayoutChild::Deferred { factory, .. } => { + deferred.push_back(Arc::clone(factory)) + } } } @@ -443,3 +530,43 @@ impl Partition for MultiLayoutPartition { ))) } } + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex_array::dtype::Nullability; + + use super::*; + use crate::scan::test::new_session; + + struct NeverOpened; + + #[async_trait] + impl LayoutReaderFactory for NeverOpened { + async fn open(&self) -> VortexResult> { + unreachable!("byte_size must not open readers") + } + } + + fn deferred_source(byte_sizes: Vec>) -> MultiLayoutDataSource { + let factories: Vec> = byte_sizes + .iter() + .map(|_| Arc::new(NeverOpened) as _) + .collect(); + MultiLayoutDataSource::new_deferred( + DType::Bool(Nullability::NonNullable), + factories, + byte_sizes, + &new_session(), + ) + } + + #[rstest] + #[case::all_known(vec![Some(10), Some(20), Some(30)], Precision::exact(60u64))] + #[case::some_known_extrapolates(vec![Some(10), None, Some(30)], Precision::inexact(60u64))] + #[case::none_known(vec![None, None], Precision::Absent)] + #[case::no_children(vec![], Precision::exact(0u64))] + fn byte_size_precision(#[case] sizes: Vec>, #[case] expected: Precision) { + assert_eq!(deferred_source(sizes).byte_size(), expected); + } +}