Skip to content

Commit 14121eb

Browse files
committed
Let spark report statistics
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 76b0ad8 commit 14121eb

10 files changed

Lines changed: 623 additions & 33 deletions

File tree

java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,57 @@ public OptionalLong asOptional() {
128128
}
129129
}
130130

131+
/**
132+
* Sum of the on-storage byte sizes of all files included in this data source along with the precision of that
133+
* estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by {@code DataSource::byte_size}:
134+
* {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return
135+
* sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and
136+
* {@link ByteSize.Exact} when every file has a known size.
137+
*/
138+
public ByteSize byteSize() {
139+
long[] out = new long[2];
140+
NativeDataSource.byteSize(pointer, out);
141+
return switch ((int) out[1]) {
142+
case 1 -> new ByteSize.Estimate(out[0]);
143+
case 2 -> new ByteSize.Exact(out[0]);
144+
default -> ByteSize.Unknown.INSTANCE;
145+
};
146+
}
147+
148+
/**
 * Precision-aware byte size. See {@link #byteSize()}.
 *
 * <p>A closed hierarchy with exactly three variants: {@link Unknown}, {@link Estimate} and {@link Exact}.
 */
public sealed interface ByteSize {
    /**
     * Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown.
     *
     * @return the size for {@link Estimate} and {@link Exact}; empty for {@link Unknown}
     */
    OptionalLong asOptional();

    /** Byte size is not known. Stateless; the only instance is {@link #INSTANCE}. */
    final class Unknown implements ByteSize {
        public static final Unknown INSTANCE = new Unknown();

        private Unknown() {}

        @Override
        public OptionalLong asOptional() {
            return OptionalLong.empty();
        }

        /**
         * Returns a fixed, readable name. Without this override the variant would print with an identity hash,
         * unlike the record variants, which print their components.
         */
        @Override
        public String toString() {
            return "Unknown";
        }
    }

    /**
     * Estimated byte size; the actual value may differ.
     *
     * @param value the estimated size in bytes
     */
    record Estimate(long value) implements ByteSize {
        @Override
        public OptionalLong asOptional() {
            return OptionalLong.of(value);
        }
    }

    /**
     * Exact byte size.
     *
     * @param value the size in bytes
     */
    record Exact(long value) implements ByteSize {
        @Override
        public OptionalLong asOptional() {
            return OptionalLong.of(value);
        }
    }
}
181+
131182
/** Submit a scan. */
132183
public Scan scan(ScanOptions options) {
133184
Objects.requireNonNull(options, "options");

java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ private NativeDataSource() {}
3333
* {@code 1=estimate}, {@code 2=exact}.
3434
*/
3535
public static native void rowCount(long pointer, long[] out);
36+
37+
/**
38+
* Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source.
39+
* Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}.
40+
*/
41+
public static native void byteSize(long pointer, long[] out);
3642
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,65 @@
44
package dev.vortex.spark.read;
55

66
import java.util.Arrays;
7+
import dev.vortex.api.DataSource;
8+
import dev.vortex.api.Session;
9+
import dev.vortex.jni.NativeFiles;
10+
import dev.vortex.spark.VortexSparkSession;
11+
import java.util.HashMap;
712
import java.util.List;
813
import java.util.Map;
14+
import java.util.OptionalLong;
15+
import java.util.stream.Collectors;
16+
import java.util.stream.Stream;
917
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
1018
import org.apache.spark.sql.connector.catalog.Column;
1119
import org.apache.spark.sql.connector.expressions.filter.Predicate;
20+
import org.apache.spark.sql.connector.expressions.NamedReference;
1221
import org.apache.spark.sql.connector.read.Batch;
1322
import org.apache.spark.sql.connector.read.Scan;
23+
import org.apache.spark.sql.connector.read.Statistics;
24+
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
25+
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
26+
import org.apache.spark.sql.internal.SQLConf;
1427
import org.apache.spark.sql.types.StructType;
1528

16-
/** Spark V2 {@link Scan} over a table of Vortex files. */
17-
public final class VortexScan implements Scan {
29+
/**
30+
* Spark V2 {@link Scan} over a table of Vortex files.
31+
*
32+
* <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a
33+
* Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by
34+
* {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor
35+
* and scaling by the pushed read schema's default size relative to the full table schema's default size. When the
36+
* listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is
37+
* applied.
38+
*/
39+
public final class VortexScan implements Scan, SupportsReportStatistics {
1840

1941
private final List<String> paths;
42+
private final List<Column> tableColumns;
2043
private final List<Column> readColumns;
2144
private final Map<String, String> formatOptions;
2245
private final Predicate[] pushedPredicates;
2346

47+
private volatile Statistics cachedStatistics;
48+
2449
/**
2550
* Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing
2651
* immutable collections; the constructor does not copy.
2752
*
2853
* @param paths the list of Vortex file paths to scan
54+
* @param tableColumns the full table columns before projection pushdown
2955
* @param readColumns the list of columns to read from the files
3056
* @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown
3157
*/
3258
public VortexScan(
3359
List<String> paths,
60+
List<Column> tableColumns,
3461
List<Column> readColumns,
35-
Map<String, String> formatOptions,
36-
Predicate[] pushedPredicates) {
62+
Predicate[] pushedPredicates,
63+
Map<String, String> formatOptions) {
3764
this.paths = paths;
65+
this.tableColumns = tableColumns;
3866
this.readColumns = readColumns;
3967
this.formatOptions = formatOptions;
4068
this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone();
@@ -83,4 +111,77 @@ public Batch toBatch() {
83111
public ColumnarSupportMode columnarSupportMode() {
84112
return ColumnarSupportMode.SUPPORTED;
85113
}
114+
115+
/**
116+
* Returns statistics for this scan.
117+
*
118+
* <p>Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the
119+
* data source (sum of file-footer row counts; extrapolated from the first opened file when other files are
120+
* deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem
121+
* listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full
122+
* table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no
123+
* file size is known at all the value is left empty so Spark falls back to its default heuristic.
124+
*
125+
* @return statistics with row-count and Spark scan-size estimates
126+
*/
127+
@Override
128+
public Statistics estimateStatistics() {
129+
Statistics local = cachedStatistics;
130+
if (local != null) {
131+
return local;
132+
}
133+
synchronized (this) {
134+
if (cachedStatistics == null) {
135+
cachedStatistics = computeStatistics();
136+
}
137+
return cachedStatistics;
138+
}
139+
}
140+
141+
private Statistics computeStatistics() {
142+
Session session = VortexSparkSession.get(formatOptions);
143+
// Expand directory paths to concrete files the way VortexBatchExec does, so we use the
144+
// same per-path resolution end-to-end.
145+
List<String> resolvedPaths = paths.stream()
146+
.flatMap(path -> path.endsWith(".vortex")
147+
? Stream.of(path)
148+
: NativeFiles.listFiles(session, path, formatOptions).stream())
149+
.collect(Collectors.toList());
150+
151+
if (resolvedPaths.isEmpty()) {
152+
return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty());
153+
}
154+
155+
DataSource source = DataSource.open(session, resolvedPaths, formatOptions);
156+
return new VortexStatistics(
157+
source.rowCount().asOptional(),
158+
scaleSizeInBytes(source.byteSize().asOptional()));
159+
}
160+
161+
private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) {
162+
if (fileBytes.isEmpty()) {
163+
return OptionalLong.empty();
164+
}
165+
166+
StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0]));
167+
StructType readSchema = readSchema();
168+
int tableDefaultSize = tableSchema.defaultSize();
169+
if (tableDefaultSize <= 0) {
170+
return fileBytes;
171+
}
172+
173+
double scaled = SQLConf.get().fileCompressionFactor()
174+
* fileBytes.getAsLong()
175+
/ tableDefaultSize
176+
* readSchema.defaultSize();
177+
return OptionalLong.of((long) scaled);
178+
}
179+
180+
private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics {
181+
182+
@Override
183+
public Map<NamedReference, ColumnStatistics> columnStats() {
184+
return new HashMap<>();
185+
}
186+
}
86187
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
public final class VortexScanBuilder
3232
implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters {
3333
private final ImmutableList.Builder<String> paths;
34-
private final List<Column> columns;
34+
private final List<Column> tableColumns;
35+
private final List<Column> readColumns;
3536
private final Map<String, String> formatOptions;
3637
private final Set<String> partitionColumnNames;
3738
private Predicate[] pushedPredicates = new Predicate[0];
@@ -48,10 +49,11 @@ public VortexScanBuilder(Map<String, String> formatOptions) {
4849
*/
4950
public VortexScanBuilder(Map<String, String> formatOptions, Transform[] partitionTransforms) {
5051
this.paths = ImmutableList.builder();
51-
this.columns = new ArrayList<>();
5252
Map<String, String> options = Maps.newHashMap();
5353
options.put("vortex.workerThreads", "4");
5454
options.putAll(formatOptions);
55+
this.tableColumns = new ArrayList<>();
56+
this.readColumns = new ArrayList<>();
5557
this.formatOptions = options;
5658
this.partitionColumnNames = collectPartitionColumnNames(partitionTransforms);
5759
}
@@ -74,7 +76,8 @@ public VortexScanBuilder addPath(String path) {
7476
* @return this builder for method chaining
7577
*/
7678
public VortexScanBuilder addColumn(Column column) {
77-
this.columns.add(column);
79+
this.tableColumns.add(column);
80+
this.readColumns.add(column);
7881
return this;
7982
}
8083

@@ -97,7 +100,7 @@ public VortexScanBuilder addAllPaths(Iterable<String> paths) {
97100
*/
98101
public VortexScanBuilder addAllColumns(Iterable<Column> columns) {
99102
for (Column column : columns) {
100-
this.columns.add(column);
103+
addColumn(column);
101104
}
102105
return this;
103106
}
@@ -116,7 +119,7 @@ public Scan build() {
116119
// Allow empty columns for operations like count() that don't need actual column data
117120
// If no columns are specified, we'll read the minimal schema needed
118121

119-
return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions, pushedPredicates);
122+
return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), pushedPredicates, this.formatOptions);
120123
}
121124

122125
/**
@@ -129,8 +132,8 @@ public Scan build() {
129132
*/
130133
@Override
131134
public void pruneColumns(StructType requiredSchema) {
132-
columns.clear();
133-
columns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema)));
135+
readColumns.clear();
136+
readColumns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema)));
134137
}
135138

136139
/**
@@ -145,7 +148,7 @@ public void pruneColumns(StructType requiredSchema) {
145148
@Override
146149
public Predicate[] pushPredicates(Predicate[] predicates) {
147150
Map<String, DataType> dataColumnTypes = new HashMap<>();
148-
for (Column column : columns) {
151+
for (Column column : readColumns) {
149152
if (!partitionColumnNames.contains(column.name())) {
150153
dataColumnTypes.put(column.name(), column.dataType());
151154
}

0 commit comments

Comments
 (0)