Skip to content

Commit 4b0c14b

Browse files
committed
more
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 14bd56e commit 4b0c14b

6 files changed

Lines changed: 282 additions & 18 deletions

File tree

java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
import dev.vortex.api.Files;
1212
import dev.vortex.jni.NativeFileMethods;
1313
import dev.vortex.spark.config.HadoopUtils;
14+
import dev.vortex.spark.read.PartitionPathUtils;
1415
import java.util.Map;
1516
import java.util.Objects;
1617
import java.util.Optional;
18+
import java.util.Set;
19+
import java.util.stream.Collectors;
20+
import java.util.stream.Stream;
1721
import org.apache.spark.sql.SparkSession;
1822
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
1923
import org.apache.spark.sql.connector.catalog.Table;
2024
import org.apache.spark.sql.connector.catalog.TableProvider;
2125
import org.apache.spark.sql.connector.expressions.Transform;
2226
import org.apache.spark.sql.sources.DataSourceRegister;
27+
import org.apache.spark.sql.types.DataType;
2328
import org.apache.spark.sql.types.StructType;
2429
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
2530
import scala.Option;
@@ -81,18 +86,31 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
8186
.findFirst();
8287

8388
if (firstFile.isEmpty()) {
84-
// Return empty struct if no files found
85-
// TODO(aduffy): how does Parquet handle this?
8689
return new StructType();
8790
} else {
8891
pathToInfer = firstFile.get();
8992
}
9093
}
9194

95+
StructType dataSchema;
9296
try (File file = Files.open(pathToInfer, formatOptions)) {
9397
var columns = SparkTypes.toColumns(file.getDType());
94-
return CatalogV2Util.v2ColumnsToStructType(columns);
98+
dataSchema = CatalogV2Util.v2ColumnsToStructType(columns);
9599
}
100+
101+
// Discover partition columns from Hive-style directory paths and append them.
102+
Map<String, String> partitionValues = PartitionPathUtils.parsePartitionValues(pathToInfer);
103+
if (!partitionValues.isEmpty()) {
104+
Set<String> dataColumnNames = Stream.of(dataSchema.fieldNames()).collect(Collectors.toSet());
105+
for (Map.Entry<String, String> entry : partitionValues.entrySet()) {
106+
if (!dataColumnNames.contains(entry.getKey())) {
107+
DataType type = PartitionPathUtils.inferPartitionColumnType(entry.getValue());
108+
dataSchema = dataSchema.add(entry.getKey(), type, true);
109+
}
110+
}
111+
}
112+
113+
return dataSchema;
96114
}
97115

98116
/**

java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,25 @@ public final class VortexFilePartition implements InputPartition, Serializable {
2121
private final String path;
2222
private final ImmutableList<Column> columns;
2323
private final ImmutableMap<String, String> formatOptions;
24+
private final ImmutableMap<String, String> partitionValues;
2425

2526
/**
 * Creates a new Vortex file partition.
 *
 * @param path the file system path to the Vortex file
 * @param columns the list of columns to read from the file
 * @param formatOptions options for accessing the file (S3/Azure credentials, etc.)
 * @param partitionValues Hive-style partition column values extracted from the file path;
 *     empty when the file does not live under {@code key=value} directories
 */
public VortexFilePartition(
        String path,
        ImmutableList<Column> columns,
        ImmutableMap<String, String> formatOptions,
        ImmutableMap<String, String> partitionValues) {
    this.path = path;
    this.columns = columns;
    this.formatOptions = formatOptions;
    this.partitionValues = partitionValues;
}
3644

3745
/**
@@ -55,4 +63,14 @@ public ImmutableList<Column> getColumns() {
5563
public Map<String, String> getFormatOptions() {
5664
return formatOptions;
5765
}
66+
67+
/**
 * Returns the partition column values parsed from this file's Hive-style directory path.
 * Keys are column names, values are the string-encoded partition values.
 *
 * @return the partition values, empty if the file is not in a partitioned directory
 */
public ImmutableMap<String, String> getPartitionValues() {
    // ImmutableMap: callers may hold and share this without copying.
    return partitionValues;
}
5876
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
package dev.vortex.spark.read;
5+
6+
import java.net.URLDecoder;
7+
import java.nio.charset.StandardCharsets;
8+
import java.util.LinkedHashMap;
9+
import java.util.Map;
10+
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
11+
import org.apache.spark.sql.types.*;
12+
import org.apache.spark.unsafe.types.UTF8String;
13+
14+
/**
15+
* Utilities for discovering and materializing Hive-style partition columns from file paths.
16+
*/
17+
public final class PartitionPathUtils {
18+
private static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
19+
20+
private PartitionPathUtils() {}
21+
22+
/**
23+
* Parses Hive-style {@code key=value} segments from a file path.
24+
*
25+
* @return an ordered map of partition column names to their string values
26+
*/
27+
public static Map<String, String> parsePartitionValues(String filePath) {
28+
LinkedHashMap<String, String> values = new LinkedHashMap<>();
29+
String[] segments = filePath.split("/");
30+
for (String segment : segments) {
31+
int eqIdx = segment.indexOf('=');
32+
if (eqIdx > 0 && eqIdx < segment.length() - 1) {
33+
String key = URLDecoder.decode(segment.substring(0, eqIdx), StandardCharsets.UTF_8);
34+
String val = URLDecoder.decode(segment.substring(eqIdx + 1), StandardCharsets.UTF_8);
35+
values.put(key, val);
36+
}
37+
}
38+
return values;
39+
}
40+
41+
/**
42+
* Infers a Spark {@link DataType} from a partition value string.
43+
* Tries integer, long, double, boolean, and falls back to string.
44+
*/
45+
public static DataType inferPartitionColumnType(String value) {
46+
if (value == null || HIVE_DEFAULT_PARTITION.equals(value)) {
47+
return DataTypes.StringType;
48+
}
49+
try {
50+
Integer.parseInt(value);
51+
return DataTypes.IntegerType;
52+
} catch (NumberFormatException ignored) {
53+
}
54+
try {
55+
Long.parseLong(value);
56+
return DataTypes.LongType;
57+
} catch (NumberFormatException ignored) {
58+
}
59+
try {
60+
Double.parseDouble(value);
61+
return DataTypes.DoubleType;
62+
} catch (NumberFormatException ignored) {
63+
}
64+
if ("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value)) {
65+
return DataTypes.BooleanType;
66+
}
67+
return DataTypes.StringType;
68+
}
69+
70+
/**
71+
* Creates a Spark {@link ConstantColumnVector} populated with the given partition value,
72+
* parsed according to the target {@link DataType}.
73+
*/
74+
public static ConstantColumnVector createConstantVector(int numRows, DataType type, String value) {
75+
ConstantColumnVector vec = new ConstantColumnVector(numRows, type);
76+
if (value == null || HIVE_DEFAULT_PARTITION.equals(value)) {
77+
vec.setNull();
78+
return vec;
79+
}
80+
vec.setNotNull();
81+
if (type instanceof StringType) {
82+
vec.setUtf8String(UTF8String.fromString(value));
83+
} else if (type instanceof IntegerType || type instanceof DateType) {
84+
vec.setInt(Integer.parseInt(value));
85+
} else if (type instanceof LongType || type instanceof TimestampType || type instanceof TimestampNTZType) {
86+
vec.setLong(Long.parseLong(value));
87+
} else if (type instanceof ShortType) {
88+
vec.setShort(Short.parseShort(value));
89+
} else if (type instanceof ByteType) {
90+
vec.setByte(Byte.parseByte(value));
91+
} else if (type instanceof BooleanType) {
92+
vec.setBoolean(Boolean.parseBoolean(value));
93+
} else if (type instanceof FloatType) {
94+
vec.setFloat(Float.parseFloat(value));
95+
} else if (type instanceof DoubleType) {
96+
vec.setDouble(Double.parseDouble(value));
97+
} else {
98+
vec.setUtf8String(UTF8String.fromString(value));
99+
}
100+
return vec;
101+
}
102+
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.collect.ImmutableMap;
88
import dev.vortex.jni.NativeFileMethods;
99
import dev.vortex.spark.VortexFilePartition;
10+
import java.util.Map;
1011
import java.util.stream.Stream;
1112
import org.apache.spark.sql.connector.catalog.Column;
1213
import org.apache.spark.sql.connector.read.Batch;
@@ -44,17 +45,20 @@ public VortexBatchExec(
4445
*/
4546
@Override
4647
public InputPartition[] planInputPartitions() {
47-
// Scan all paths and assign each file its own partition
48+
// Scan all paths and assign each file its own partition.
49+
// For each discovered file, parse Hive-style partition values from the path.
4850
return paths.stream()
4951
.flatMap(path -> {
5052
if (path.endsWith(".vortex")) {
5153
return Stream.of(path);
5254
} else {
53-
// Scan and return the paths
5455
return NativeFileMethods.listVortexFiles(path, formatOptions).stream();
5556
}
5657
})
57-
.map(path -> new VortexFilePartition(path, columns, formatOptions))
58+
.map(path -> {
59+
Map<String, String> partVals = PartitionPathUtils.parsePartitionValues(path);
60+
return new VortexFilePartition(path, columns, formatOptions, ImmutableMap.copyOf(partVals));
61+
})
5862
.toArray(InputPartition[]::new);
5963
}
6064

java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,33 @@
99
import dev.vortex.api.Files;
1010
import dev.vortex.api.ScanOptions;
1111
import dev.vortex.spark.VortexFilePartition;
12-
import java.util.List;
13-
import java.util.stream.Collectors;
12+
import java.util.*;
1413
import org.apache.spark.sql.connector.catalog.Column;
1514
import org.apache.spark.sql.connector.read.PartitionReader;
15+
import org.apache.spark.sql.vectorized.ColumnVector;
1616
import org.apache.spark.sql.vectorized.ColumnarBatch;
1717

1818
/**
1919
* A {@link PartitionReader} that reads columnar batches out of a Vortex file into
2020
* Vortex memory format.
21+
* <p>
22+
* When reading from partitioned directories, partition column values are extracted from the
23+
* Hive-style file path and materialized as Spark
24+
* {@link org.apache.spark.sql.execution.vectorized.ConstantColumnVector} instances that are
25+
* spliced into each output batch.
2126
*/
2227
final class VortexPartitionReader implements PartitionReader<ColumnarBatch> {
2328
private final VortexFilePartition partition;
2429

2530
private File file;
2631
private VortexColumnarBatchIterator batches;
2732

33+
/** Names of columns whose values come from the partition path rather than the data file. */
34+
private Set<String> partitionColumnNames;
35+
36+
/** Tracks the last data batch so its native memory can be freed properly. */
37+
private ColumnarBatch lastDataBatch;
38+
2839
VortexPartitionReader(VortexFilePartition partition) {
2940
this.partition = partition;
3041
initNativeResources();
@@ -33,29 +44,86 @@ final class VortexPartitionReader implements PartitionReader<ColumnarBatch> {
3344
@Override
public boolean next() {
    // Fails fast if initNativeResources() has not run or the reader was closed.
    checkNotNull(batches, "batches");
    return batches.hasNext();
}
3949

4050
@Override
public ColumnarBatch get() {
    checkNotNull(batches, "closed ArrayStream");

    // Free previous data batch native memory before producing the next one; the
    // caller is assumed to be done with the batch returned by the prior get().
    if (lastDataBatch != null) {
        lastDataBatch.close();
        lastDataBatch = null;
    }

    ColumnarBatch dataBatch = batches.next();

    // Unpartitioned files need no splicing: hand back the native batch directly,
    // and let its own lifecycle apply.
    if (partitionColumnNames.isEmpty()) {
        return dataBatch;
    }

    // Track the data batch for lifecycle management: the combined wrapper built
    // below deliberately does not close its vectors, so this reader retains
    // ownership of the native memory and frees it on the next get() or close().
    lastDataBatch = dataBatch;
    return buildCombinedBatch(dataBatch);
}
70+
71+
/**
72+
* Builds a combined batch with data columns from the file and constant partition columns
73+
* in the order expected by the full table schema.
74+
*/
75+
private ColumnarBatch buildCombinedBatch(ColumnarBatch dataBatch) {
76+
int rowCount = dataBatch.numRows();
77+
Map<String, String> partVals = partition.getPartitionValues();
78+
List<Column> allColumns = partition.getColumns();
79+
ColumnVector[] combined = new ColumnVector[allColumns.size()];
80+
81+
int dataIdx = 0;
82+
for (int i = 0; i < allColumns.size(); i++) {
83+
Column col = allColumns.get(i);
84+
String partValue = partVals.get(col.name());
85+
if (partValue != null) {
86+
combined[i] = PartitionPathUtils.createConstantVector(rowCount, col.dataType(), partValue);
87+
} else {
88+
combined[i] = dataBatch.column(dataIdx++);
89+
}
90+
}
91+
92+
return new CombinedColumnarBatch(combined, rowCount);
4493
}
4594

4695
/**
 * Initialize the Vortex File and ArrayStream resources.
 * <p>
 * Partition columns are identified by matching requested column names against the
 * partition values from the file path. Only non-partition columns are pushed down
 * to the Vortex scan.
 */
void initNativeResources() {
    Map<String, String> partVals = partition.getPartitionValues();
    this.partitionColumnNames = new HashSet<>();

    // Split the requested columns: those present in the path become constants,
    // everything else must be read from the file itself.
    List<String> dataColumnNames = new ArrayList<>();
    for (Column col : partition.getColumns()) {
        if (partVals.containsKey(col.name())) {
            partitionColumnNames.add(col.name());
        } else {
            dataColumnNames.add(col.name());
        }
    }

    // NOTE(review): if every requested column is a partition column, this pushes
    // an empty projection to the scan — confirm the native scan still reports
    // correct row counts in that case.
    file = Files.open(partition.getPath(), partition.getFormatOptions());
    batches = new VortexColumnarBatchIterator(
            file.newScan(ScanOptions.builder().columns(dataColumnNames).build()));
}
56119

57120
@Override
58121
public void close() {
122+
if (lastDataBatch != null) {
123+
lastDataBatch.close();
124+
lastDataBatch = null;
125+
}
126+
59127
checkNotNull(file, "File was closed");
60128
checkNotNull(batches, "ArrayStream was closed");
61129

@@ -65,4 +133,27 @@ public void close() {
65133
file.close();
66134
file = null;
67135
}
136+
137+
/**
 * A ColumnarBatch that does not close its column vectors on {@link #close()}.
 * <p>
 * The data column vectors are owned by the underlying {@link VortexColumnarBatch}
 * (tracked via {@link #lastDataBatch}), and the constant partition vectors have trivial
 * lifecycle. Neither should be closed by this wrapper.
 */
private static final class CombinedColumnarBatch extends ColumnarBatch {
    CombinedColumnarBatch(ColumnVector[] columns, int numRows) {
        super(columns, numRows);
    }

    @Override
    public void close() {
        // Intentionally empty: lifecycle is managed by VortexPartitionReader
    }

    @Override
    public void closeIfFreeable() {
        // Intentionally empty — vectors are shared, never freeable from here.
    }
}
68159
}

0 commit comments

Comments
 (0)