|
4 | 4 | package dev.vortex.spark.read; |
5 | 5 |
|
6 | 6 | import java.util.Arrays; |
| 7 | +import dev.vortex.api.DataSource; |
| 8 | +import dev.vortex.api.Session; |
| 9 | +import dev.vortex.jni.NativeFiles; |
| 10 | +import dev.vortex.spark.VortexSparkSession; |
| 11 | +import java.util.HashMap; |
7 | 12 | import java.util.List; |
8 | 13 | import java.util.Map; |
| 14 | +import java.util.OptionalLong; |
| 15 | +import java.util.stream.Collectors; |
| 16 | +import java.util.stream.Stream; |
9 | 17 | import org.apache.spark.sql.connector.catalog.CatalogV2Util; |
10 | 18 | import org.apache.spark.sql.connector.catalog.Column; |
11 | 19 | import org.apache.spark.sql.connector.expressions.filter.Predicate; |
| 20 | +import org.apache.spark.sql.connector.expressions.NamedReference; |
12 | 21 | import org.apache.spark.sql.connector.read.Batch; |
13 | 22 | import org.apache.spark.sql.connector.read.Scan; |
| 23 | +import org.apache.spark.sql.connector.read.Statistics; |
| 24 | +import org.apache.spark.sql.connector.read.SupportsReportStatistics; |
| 25 | +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics; |
| 26 | +import org.apache.spark.sql.internal.SQLConf; |
14 | 27 | import org.apache.spark.sql.types.StructType; |
15 | 28 |
|
16 | | -/** Spark V2 {@link Scan} over a table of Vortex files. */ |
17 | | -public final class VortexScan implements Scan { |
| 29 | +/** |
| 30 | + * Spark V2 {@link Scan} over a table of Vortex files. |
| 31 | + * |
| 32 | + * <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a |
| 33 | + * Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by |
| 34 | + * {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor |
| 35 | + * and scaling by the pushed read schema's default size relative to the full table schema's default size. When the |
| 36 | + * listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is |
| 37 | + * applied. |
| 38 | + */ |
| 39 | +public final class VortexScan implements Scan, SupportsReportStatistics { |
18 | 40 |
|
19 | 41 | private final List<String> paths; |
| 42 | + private final List<Column> tableColumns; |
20 | 43 | private final List<Column> readColumns; |
21 | 44 | private final Map<String, String> formatOptions; |
22 | 45 | private final Predicate[] pushedPredicates; |
23 | 46 |
|
| 47 | + private volatile Statistics cachedStatistics; |
| 48 | + |
24 | 49 | /** |
25 | 50 | * Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing |
26 | 51 | * immutable collections; the constructor does not copy. |
27 | 52 | * |
28 | 53 | * @param paths the list of Vortex file paths to scan |
| 54 | + * @param tableColumns the full table columns before projection pushdown |
29 | 55 | * @param readColumns the list of columns to read from the files |
30 | 56 | * @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown |
31 | 57 | */ |
32 | 58 | public VortexScan( |
33 | 59 | List<String> paths, |
| 60 | + List<Column> tableColumns, |
34 | 61 | List<Column> readColumns, |
35 | | - Map<String, String> formatOptions, |
36 | | - Predicate[] pushedPredicates) { |
| 62 | + Predicate[] pushedPredicates, |
| 63 | + Map<String, String> formatOptions) { |
37 | 64 | this.paths = paths; |
| 65 | + this.tableColumns = tableColumns; |
38 | 66 | this.readColumns = readColumns; |
39 | 67 | this.formatOptions = formatOptions; |
40 | 68 | this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone(); |
@@ -83,4 +111,77 @@ public Batch toBatch() { |
83 | 111 | public ColumnarSupportMode columnarSupportMode() { |
84 | 112 | return ColumnarSupportMode.SUPPORTED; |
85 | 113 | } |
| 114 | + |
| 115 | + /** |
| 116 | + * Returns statistics for this scan. |
| 117 | + * |
| 118 | + * <p>Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the |
| 119 | + * data source (sum of file-footer row counts; extrapolated from the first opened file when other files are |
| 120 | + * deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem |
| 121 | + * listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full |
| 122 | + * table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no |
| 123 | + * file size is known at all the value is left empty so Spark falls back to its default heuristic. |
| 124 | + * |
| 125 | + * @return statistics with row-count and Spark scan-size estimates |
| 126 | + */ |
| 127 | + @Override |
| 128 | + public Statistics estimateStatistics() { |
| 129 | + Statistics local = cachedStatistics; |
| 130 | + if (local != null) { |
| 131 | + return local; |
| 132 | + } |
| 133 | + synchronized (this) { |
| 134 | + if (cachedStatistics == null) { |
| 135 | + cachedStatistics = computeStatistics(); |
| 136 | + } |
| 137 | + return cachedStatistics; |
| 138 | + } |
| 139 | + } |
| 140 | + |
| 141 | + private Statistics computeStatistics() { |
| 142 | + Session session = VortexSparkSession.get(formatOptions); |
| 143 | + // Expand directory paths to concrete files the way VortexBatchExec does, so we use the |
| 144 | + // same per-path resolution end-to-end. |
| 145 | + List<String> resolvedPaths = paths.stream() |
| 146 | + .flatMap(path -> path.endsWith(".vortex") |
| 147 | + ? Stream.of(path) |
| 148 | + : NativeFiles.listFiles(session, path, formatOptions).stream()) |
| 149 | + .collect(Collectors.toList()); |
| 150 | + |
| 151 | + if (resolvedPaths.isEmpty()) { |
| 152 | + return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty()); |
| 153 | + } |
| 154 | + |
| 155 | + DataSource source = DataSource.open(session, resolvedPaths, formatOptions); |
| 156 | + return new VortexStatistics( |
| 157 | + source.rowCount().asOptional(), |
| 158 | + scaleSizeInBytes(source.byteSize().asOptional())); |
| 159 | + } |
| 160 | + |
| 161 | + private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) { |
| 162 | + if (fileBytes.isEmpty()) { |
| 163 | + return OptionalLong.empty(); |
| 164 | + } |
| 165 | + |
| 166 | + StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0])); |
| 167 | + StructType readSchema = readSchema(); |
| 168 | + int tableDefaultSize = tableSchema.defaultSize(); |
| 169 | + if (tableDefaultSize <= 0) { |
| 170 | + return fileBytes; |
| 171 | + } |
| 172 | + |
| 173 | + double scaled = SQLConf.get().fileCompressionFactor() |
| 174 | + * fileBytes.getAsLong() |
| 175 | + / tableDefaultSize |
| 176 | + * readSchema.defaultSize(); |
| 177 | + return OptionalLong.of((long) scaled); |
| 178 | + } |
| 179 | + |
| 180 | + private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics { |
| 181 | + |
| 182 | + @Override |
| 183 | + public Map<NamedReference, ColumnStatistics> columnStats() { |
| 184 | + return new HashMap<>(); |
| 185 | + } |
| 186 | + } |
86 | 187 | } |
0 commit comments