Skip to content

Commit 1f819d6

Browse files
committed
feat: blob v2 descriptor read support
1 parent b5439c2 commit 1f819d6

8 files changed

Lines changed: 455 additions & 10 deletions

File tree

docs/src/config.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,4 +499,19 @@ Lance Spark maintains index and metadata caches to minimize redundant I/O. Cache
499499
| `LANCE_INDEX_CACHE_SIZE` | 6GB | Index cache size in bytes. |
500500
| `LANCE_METADATA_CACHE_SIZE`| 1GB | Metadata cache size in bytes. |
501501

502-
For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching).
502+
For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching).
503+
504+
## Blob v2 Reads
505+
506+
Lance datasets that contain a blob v2 column expose that column to Spark as the native 5-field descriptor struct: `struct<kind:short, position:long, size:long, blob_id:long, blob_uri:string>`. Querying the descriptor never fetches the blob bytes, so `SELECT payload.size` and `SELECT payload.blob_uri` are cheap.
507+
508+
```sql
509+
-- Query metadata only (no byte fetch):
510+
SELECT id, payload.size, payload.kind FROM lance.ns.tbl;
511+
```
512+
513+
A column is treated as blob v2 when the Arrow field carries `ARROW:extension:name = lance.blob.v2`, matching lance-core's blob v2 extension type.
514+
515+
Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs.
516+
517+
The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only.

lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ public String name() {
303303

304304
@Override
305305
public StructType schema() {
306-
return sparkSchema;
306+
return BlobUtils.applyBlobV2DescriptorSchema(sparkSchema);
307307
}
308308

309309
@Override

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.lance.schema.LanceSchema;
2727
import org.lance.spark.LanceSparkReadOptions;
2828
import org.lance.spark.sharding.SparkLanceShardingUtils;
29+
import org.lance.spark.utils.BlobUtils;
2930
import org.lance.spark.utils.Optional;
3031
import org.lance.spark.utils.Utils;
3132

@@ -117,8 +118,8 @@ public LanceScanBuilder(
117118
String namespaceImpl,
118119
java.util.Map<String, String> namespaceProperties,
119120
ShardingSpec shardingSpec) {
120-
this.fullSchema = schema;
121-
this.schema = schema;
121+
this.fullSchema = BlobUtils.applyBlobV2DescriptorSchema(schema);
122+
this.schema = this.fullSchema;
122123
this.readOptions = readOptions;
123124
this.initialStorageOptions = initialStorageOptions;
124125
this.namespaceImpl = namespaceImpl;
@@ -236,7 +237,9 @@ public Scan build() {
236237
closeLazyDataset();
237238

238239
Optional<String> whereCondition =
239-
FilterPushDown.compileFiltersToSqlWhereClause(pushedPredicates);
240+
BlobUtils.allowsFilterPushdown(fullSchema)
241+
? FilterPushDown.compileFiltersToSqlWhereClause(pushedPredicates)
242+
: Optional.empty();
240243
return new LanceScan(
241244
schema,
242245
readOptions,
@@ -268,6 +271,12 @@ public Predicate[] pushPredicates(Predicate[] predicates) {
268271
}
269272
Predicate[][] processed = FilterPushDown.processPredicates(predicates);
270273
pushedPredicates = processed[0];
274+
// Blob v2 is not safe with Lance SQL filter pushdown yet. Return predicates as
275+
// unhandled so Spark evaluates them post-scan, while still retaining
276+
// pushedPredicates for connector-side zonemap fragment pruning.
277+
if (!BlobUtils.allowsFilterPushdown(fullSchema)) {
278+
return predicates;
279+
}
271280
return processed[1];
272281
}
273282

lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,36 @@
1313
*/
1414
package org.lance.spark.utils;
1515

16+
import org.apache.arrow.vector.types.pojo.ArrowType;
17+
import org.apache.arrow.vector.types.pojo.Field;
18+
import org.apache.spark.sql.types.DataTypes;
19+
import org.apache.spark.sql.types.Metadata;
1620
import org.apache.spark.sql.types.StructField;
21+
import org.apache.spark.sql.types.StructType;
22+
23+
import java.util.List;
24+
import java.util.Map;
1725

1826
public class BlobUtils {
1927

2028
public static final String LANCE_ENCODING_BLOB_KEY = "lance-encoding:blob";
2129
public static final String LANCE_ENCODING_BLOB_VALUE = "true";
2230

31+
public static final String ARROW_EXTENSION_NAME_KEY = "ARROW:extension:name";
32+
public static final String ARROW_EXTENSION_BLOB_V2 = "lance.blob.v2";
33+
34+
/**
35+
* Spark struct type for a Lance blob v2 descriptor: {@code kind, position, size, blob_id,
36+
* blob_uri}.
37+
*/
38+
public static final StructType BLOB_DESCRIPTOR_STRUCT =
39+
new StructType()
40+
.add("kind", DataTypes.ShortType)
41+
.add("position", DataTypes.LongType)
42+
.add("size", DataTypes.LongType)
43+
.add("blob_id", DataTypes.LongType)
44+
.add("blob_uri", DataTypes.StringType);
45+
2346
/**
2447
* Check if a Spark field is a blob field based on its metadata.
2548
*
@@ -40,7 +63,7 @@ public static boolean isBlobSparkField(StructField field) {
4063
}
4164

4265
String value = field.metadata().getString(LANCE_ENCODING_BLOB_KEY);
43-
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value);
66+
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value) && !isBlobV2SparkField(field);
4467
}
4568

4669
/**
@@ -64,6 +87,93 @@ public static boolean isBlobArrowField(org.apache.arrow.vector.types.pojo.Field
6487
}
6588

6689
String value = metadata.get(LANCE_ENCODING_BLOB_KEY);
67-
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value);
90+
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value) && !isBlobV2ArrowField(field);
91+
}
92+
93+
/** Returns true when a Spark field carries the lance-core blob v2 Arrow extension. */
94+
public static boolean isBlobV2SparkField(StructField field) {
95+
return field != null && isBlobV2SparkMetadata(field.metadata());
96+
}
97+
98+
public static boolean isBlobV2SparkMetadata(Metadata metadata) {
99+
if (metadata == null) {
100+
return false;
101+
}
102+
103+
return metadata.contains(ARROW_EXTENSION_NAME_KEY)
104+
&& ARROW_EXTENSION_BLOB_V2.equals(metadata.getString(ARROW_EXTENSION_NAME_KEY));
105+
}
106+
107+
/**
108+
* Arrow-side counterpart of {@link #isBlobV2SparkField} used inside the columnar batch scanner.
109+
*/
110+
public static boolean isBlobV2ArrowField(Field field) {
111+
if (field == null) {
112+
return false;
113+
}
114+
115+
Map<String, String> metadata = field.getMetadata();
116+
if (metadata == null) {
117+
return false;
118+
}
119+
120+
if (ARROW_EXTENSION_BLOB_V2.equals(metadata.get(ARROW_EXTENSION_NAME_KEY))) {
121+
return true;
122+
}
123+
124+
// lance-core scan batches expose the unloaded descriptor struct (no extension metadata).
125+
return isBlobV2DescriptorArrowField(field);
126+
}
127+
128+
private static boolean isBlobV2DescriptorArrowField(Field field) {
129+
if (!(field.getType() instanceof ArrowType.Struct)) {
130+
return false;
131+
}
132+
List<Field> children = field.getChildren();
133+
if (children == null || children.size() != BLOB_DESCRIPTOR_STRUCT.fields().length) {
134+
return false;
135+
}
136+
StructField[] expected = BLOB_DESCRIPTOR_STRUCT.fields();
137+
for (int i = 0; i < expected.length; i++) {
138+
if (!expected[i].name().equals(children.get(i).getName())) {
139+
return false;
140+
}
141+
}
142+
return true;
143+
}
144+
145+
/** Returns true if any field in {@code schema} is a blob v2 column. */
146+
public static boolean hasBlobV2Fields(StructType schema) {
147+
for (StructField field : schema.fields()) {
148+
if (isBlobV2SparkField(field)) {
149+
return true;
150+
}
151+
}
152+
153+
return false;
154+
}
155+
156+
/** Returns true unless {@code schema} contains a blob v2 column. */
157+
public static boolean allowsFilterPushdown(StructType schema) {
158+
return !hasBlobV2Fields(schema);
159+
}
160+
161+
/** Rewrites blob v2 columns to the descriptor struct returned by Lance. */
162+
public static StructType applyBlobV2DescriptorSchema(StructType schema) {
163+
StructField[] fields = new StructField[schema.fields().length];
164+
boolean changed = false;
165+
for (int i = 0; i < schema.fields().length; i++) {
166+
StructField field = schema.fields()[i];
167+
if (!isBlobV2SparkField(field)) {
168+
fields[i] = field;
169+
continue;
170+
}
171+
172+
fields[i] =
173+
new StructField(field.name(), BLOB_DESCRIPTOR_STRUCT, field.nullable(), field.metadata());
174+
changed = true;
175+
}
176+
177+
return changed ? new StructType(fields) : schema;
68178
}
69179
}

lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.arrow.vector.complex.ListVector;
3535
import org.apache.arrow.vector.complex.MapVector;
3636
import org.apache.arrow.vector.complex.StructVector;
37+
import org.apache.spark.sql.types.DataType;
3738
import org.apache.spark.sql.types.Decimal;
3839
import org.apache.spark.sql.util.LanceArrowUtils;
3940
import org.apache.spark.sql.vectorized.ArrowColumnVector;
@@ -66,7 +67,7 @@ public LanceArrowColumnVector(ValueVector vector) {
6667
}
6768

6869
public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
69-
super(LanceArrowUtils.fromArrowField(vector.getField()));
70+
super(computeDataType(vector));
7071
this.closeVectorOnClose = closeVectorOnClose;
7172

7273
if (vector instanceof UInt1Vector) {
@@ -81,6 +82,8 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
8182
fixedSizeBinaryAccessor = new FixedSizeBinaryAccessor((FixedSizeBinaryVector) vector);
8283
} else if (vector instanceof FixedSizeListVector) {
8384
fixedSizeListAccessor = new FixedSizeListAccessor((FixedSizeListVector) vector);
85+
} else if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) {
86+
structAccessor = new LanceStructAccessor((StructVector) vector);
8487
} else if (vector instanceof StructVector && BlobUtils.isBlobArrowField(vector.getField())) {
8588
blobStructAccessor = new BlobStructAccessor((StructVector) vector);
8689
} else if (vector instanceof StructVector) {
@@ -490,4 +493,11 @@ public ColumnVector getChild(int ordinal) {
490493
public BlobStructAccessor getBlobStructAccessor() {
491494
return blobStructAccessor;
492495
}
496+
497+
private static DataType computeDataType(ValueVector vector) {
498+
if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) {
499+
return BlobUtils.BLOB_DESCRIPTOR_STRUCT;
500+
}
501+
return LanceArrowUtils.fromArrowField(vector.getField());
502+
}
493503
}

lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ object LanceArrowUtils {
4444
val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY
4545
val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY
4646
val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY
47+
val ARROW_EXT_NAME_KEY = BlobUtils.ARROW_EXTENSION_NAME_KEY
48+
val BLOB_V2_EXT_NAME = BlobUtils.ARROW_EXTENSION_BLOB_V2
4749
val ARROW_LARGE_VAR_CHAR_KEY = LargeVarCharUtils.ARROW_LARGE_VAR_CHAR_KEY
4850
val ARROW_DATE_MILLISECOND_KEY = DateMilliUtils.ARROW_DATE_MILLISECOND_KEY
4951

@@ -82,6 +84,8 @@ object LanceArrowUtils {
8284
val elementType = fromArrowField(elementField)
8385
val containsNull = elementField.isNullable
8486
ArrayType(elementType, containsNull)
87+
case _: ArrowType.Struct if isBlobField(field) =>
88+
BinaryType
8589
case _: ArrowType.Struct =>
8690
// Always recurse through LanceArrowUtils for struct children so special cases
8791
// like Date(MILLISECOND), FixedSizeBinary, etc. are applied in nested schemas too.
@@ -519,7 +523,9 @@ object LanceArrowUtils {
519523

520524
private def isBlobField(field: Field): Boolean = {
521525
val metadata = field.getMetadata
522-
metadata != null && metadata.containsKey(ENCODING_BLOB) &&
523-
"true".equalsIgnoreCase(metadata.get(ENCODING_BLOB))
526+
if (metadata == null) return false
527+
(metadata.containsKey(ENCODING_BLOB) &&
528+
"true".equalsIgnoreCase(metadata.get(ENCODING_BLOB))) ||
529+
BLOB_V2_EXT_NAME.equals(metadata.get(ARROW_EXT_NAME_KEY))
524530
}
525531
}

0 commit comments

Comments
 (0)