Skip to content

Commit 41c0d4f

Browse files
committed
Let spark report statistics
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 012d0ec commit 41c0d4f

11 files changed

Lines changed: 635 additions & 31 deletions

File tree

java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,57 @@ public OptionalLong asOptional() {
128128
}
129129
}
130130

131+
/**
132+
* Sum of the on-storage byte sizes of all files included in this data source along with the precision of that
133+
* estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by {@code DataSource::byte_size}:
134+
* {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return
135+
* sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and
136+
* {@link ByteSize.Exact} when every file has a known size.
137+
*/
138+
public ByteSize byteSize() {
139+
long[] out = new long[2];
140+
NativeDataSource.byteSize(pointer, out);
141+
return switch ((int) out[1]) {
142+
case 1 -> new ByteSize.Estimate(out[0]);
143+
case 2 -> new ByteSize.Exact(out[0]);
144+
default -> ByteSize.Unknown.INSTANCE;
145+
};
146+
}
147+
148+
/**
 * Precision-aware byte size. See {@link #byteSize()}.
 *
 * <p>The hierarchy is closed: a value is exactly one of {@link Unknown}, {@link Estimate} or {@link Exact}.
 */
public sealed interface ByteSize {
    /**
     * Returns the byte size wrapped in an {@link OptionalLong}, or {@code OptionalLong.empty()} when the size is
     * unknown. The precision (estimate vs. exact) is not conveyed by the returned value.
     *
     * @return the byte size if one is available, otherwise an empty optional
     */
    OptionalLong asOptional();

    /** Byte size is not known. */
    final class Unknown implements ByteSize {
        /** The single shared instance; the constructor is private so no other instance can exist. */
        public static final Unknown INSTANCE = new Unknown();

        private Unknown() {}

        @Override
        public OptionalLong asOptional() {
            return OptionalLong.empty();
        }

        /**
         * Returns a stable, value-style description so that logs and debugger output read consistently with the
         * record-generated {@code toString()} of {@link Estimate} and {@link Exact}, instead of printing an
         * identity hash.
         */
        @Override
        public String toString() {
            return "Unknown[]";
        }
    }

    /**
     * Estimated byte size; the actual value may differ.
     *
     * @param value the estimated size in bytes
     */
    record Estimate(long value) implements ByteSize {
        @Override
        public OptionalLong asOptional() {
            return OptionalLong.of(value);
        }
    }

    /**
     * Exact byte size.
     *
     * @param value the exact size in bytes
     */
    record Exact(long value) implements ByteSize {
        @Override
        public OptionalLong asOptional() {
            return OptionalLong.of(value);
        }
    }
}
181+
131182
/** Submit a scan. */
132183
public Scan scan(ScanOptions options) {
133184
Objects.requireNonNull(options, "options");

java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ private NativeDataSource() {}
3333
* {@code 1=estimate}, {@code 2=exact}.
3434
*/
3535
public static native void rowCount(long pointer, long[] out);
36+
37+
/**
38+
* Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source.
39+
* Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}.
40+
*/
41+
public static native void byteSize(long pointer, long[] out);
3642
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,46 @@
33

44
package dev.vortex.spark.read;
55

6+
import dev.vortex.api.DataSource;
7+
import dev.vortex.api.Session;
8+
import dev.vortex.jni.NativeFiles;
9+
import dev.vortex.spark.VortexSparkSession;
10+
import java.util.HashMap;
611
import java.util.List;
712
import java.util.Map;
13+
import java.util.OptionalLong;
14+
import java.util.stream.Collectors;
15+
import java.util.stream.Stream;
816
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
917
import org.apache.spark.sql.connector.catalog.Column;
18+
import org.apache.spark.sql.connector.expressions.NamedReference;
1019
import org.apache.spark.sql.connector.read.Batch;
1120
import org.apache.spark.sql.connector.read.Scan;
21+
import org.apache.spark.sql.connector.read.Statistics;
22+
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
23+
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
24+
import org.apache.spark.sql.internal.SQLConf;
1225
import org.apache.spark.sql.types.StructType;
1326

14-
/** Spark V2 {@link Scan} over a table of Vortex files. */
15-
public final class VortexScan implements Scan {
27+
/**
28+
* Spark V2 {@link Scan} over a table of Vortex files.
29+
*
30+
* <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a
31+
* Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by
32+
* {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor
33+
* and scaling by the pushed read schema's default size relative to the full table schema's default size. When the
34+
* listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is
35+
* applied.
36+
*/
37+
public final class VortexScan implements Scan, SupportsReportStatistics {
1638

1739
private final List<String> paths;
40+
private final List<Column> tableColumns;
1841
private final List<Column> readColumns;
1942
private final Map<String, String> formatOptions;
2043

44+
private volatile Statistics cachedStatistics;
45+
2146
/**
2247
* Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing
2348
* immutable collections; the constructor does not copy.
@@ -26,7 +51,24 @@ public final class VortexScan implements Scan {
2651
* @param readColumns the list of columns to read from the files
2752
*/
2853
public VortexScan(List<String> paths, List<Column> readColumns, Map<String, String> formatOptions) {
    // No separate table schema was supplied, so the read columns are passed as the table columns too.
    this(
            paths,
            readColumns,
            readColumns,
            formatOptions);
}
56+
57+
/**
 * Builds a scan over the given Vortex file paths, keeping both the full table columns and the projected read
 * columns. None of the arguments are copied, so callers must hand in immutable collections.
 *
 * @param paths the list of Vortex file paths to scan
 * @param tableColumns the full table columns before projection pushdown
 * @param readColumns the columns Spark requested after projection pushdown
 * @param formatOptions the format options; they are forwarded to the Vortex session lookup, the file listing and
 *     the data-source open call when statistics are computed
 */
public VortexScan(
        List<String> paths,
        List<Column> tableColumns,
        List<Column> readColumns,
        Map<String, String> formatOptions) {
    // Plain field stores only; the references are shared with the caller.
    this.formatOptions = formatOptions;
    this.readColumns = readColumns;
    this.tableColumns = tableColumns;
    this.paths = paths;
}
@@ -72,4 +114,77 @@ public Batch toBatch() {
72114
public ColumnarSupportMode columnarSupportMode() {
73115
return ColumnarSupportMode.SUPPORTED;
74116
}
117+
118+
/**
119+
* Returns statistics for this scan.
120+
*
121+
* <p>Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the
122+
* data source (sum of file-footer row counts; extrapolated from the first opened file when other files are
123+
* deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem
124+
* listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full
125+
* table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no
126+
* file size is known at all the value is left empty so Spark falls back to its default heuristic.
127+
*
128+
* @return statistics with row-count and Spark scan-size estimates
129+
*/
130+
@Override
131+
public Statistics estimateStatistics() {
132+
Statistics local = cachedStatistics;
133+
if (local != null) {
134+
return local;
135+
}
136+
synchronized (this) {
137+
if (cachedStatistics == null) {
138+
cachedStatistics = computeStatistics();
139+
}
140+
return cachedStatistics;
141+
}
142+
}
143+
144+
private Statistics computeStatistics() {
145+
Session session = VortexSparkSession.get(formatOptions);
146+
// Expand directory paths to concrete files the way VortexBatchExec does, so we use the
147+
// same per-path resolution end-to-end.
148+
List<String> resolvedPaths = paths.stream()
149+
.flatMap(path -> path.endsWith(".vortex")
150+
? Stream.of(path)
151+
: NativeFiles.listFiles(session, path, formatOptions).stream())
152+
.collect(Collectors.toList());
153+
154+
if (resolvedPaths.isEmpty()) {
155+
return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty());
156+
}
157+
158+
DataSource source = DataSource.open(session, resolvedPaths, formatOptions);
159+
return new VortexStatistics(
160+
source.rowCount().asOptional(),
161+
scaleSizeInBytes(source.byteSize().asOptional()));
162+
}
163+
164+
private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) {
165+
if (fileBytes.isEmpty()) {
166+
return OptionalLong.empty();
167+
}
168+
169+
StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0]));
170+
StructType readSchema = readSchema();
171+
int tableDefaultSize = tableSchema.defaultSize();
172+
if (tableDefaultSize <= 0) {
173+
return fileBytes;
174+
}
175+
176+
double scaled = SQLConf.get().fileCompressionFactor()
177+
* fileBytes.getAsLong()
178+
/ tableDefaultSize
179+
* readSchema.defaultSize();
180+
return OptionalLong.of((long) scaled);
181+
}
182+
183+
private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics {
184+
185+
@Override
186+
public Map<NamedReference, ColumnStatistics> columnStats() {
187+
return new HashMap<>();
188+
}
189+
}
75190
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
/** Spark V2 {@link ScanBuilder} for table scans over Vortex files. */
2020
public final class VortexScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
2121
private final ImmutableList.Builder<String> paths;
22-
private final List<Column> columns;
22+
private final List<Column> tableColumns;
23+
private final List<Column> readColumns;
2324
private final Map<String, String> formatOptions;
2425

2526
/** Creates a new VortexScanBuilder with empty paths and columns. */
2627
public VortexScanBuilder(Map<String, String> formatOptions) {
2728
this.paths = ImmutableList.builder();
28-
this.columns = new ArrayList<>();
29+
this.tableColumns = new ArrayList<>();
30+
this.readColumns = new ArrayList<>();
2931
this.formatOptions = Map.copyOf(formatOptions);
3032
}
3133

@@ -47,7 +49,8 @@ public VortexScanBuilder addPath(String path) {
4749
* @return this builder for method chaining
4850
*/
4951
public VortexScanBuilder addColumn(Column column) {
50-
this.columns.add(column);
52+
this.tableColumns.add(column);
53+
this.readColumns.add(column);
5154
return this;
5255
}
5356

@@ -70,7 +73,7 @@ public VortexScanBuilder addAllPaths(Iterable<String> paths) {
7073
*/
7174
public VortexScanBuilder addAllColumns(Iterable<Column> columns) {
7275
for (Column column : columns) {
73-
this.columns.add(column);
76+
addColumn(column);
7477
}
7578
return this;
7679
}
@@ -89,7 +92,7 @@ public Scan build() {
8992
// Allow empty columns for operations like count() that don't need actual column data
9093
// If no columns are specified, we'll read the minimal schema needed
9194

92-
return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions);
95+
return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), this.formatOptions);
9396
}
9497

9598
/**
@@ -103,9 +106,9 @@ public Scan build() {
103106
@Override
104107
public void pruneColumns(StructType requiredSchema) {
105108
// TODO(aduffy): support deeply nested schema prunes
106-
columns.clear();
109+
readColumns.clear();
107110
for (StructField field : requiredSchema.fields()) {
108-
columns.add(Column.create(field.name(), field.dataType()));
111+
readColumns.add(Column.create(field.name(), field.dataType()));
109112
}
110113
}
111114
}

0 commit comments

Comments
 (0)