Skip to content

Commit c1aa981

Browse files
authored
feat: blob v2 descriptor read support (#548)
1 parent 1e862f9 commit c1aa981

8 files changed

Lines changed: 413 additions & 6 deletions

File tree

docs/src/config.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,3 +500,18 @@ Lance Spark maintains index and metadata caches to minimize redundant I/O. Cache
500500
| `LANCE_METADATA_CACHE_SIZE`| 1GB | Metadata cache size in bytes. |
501501

502502
For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching).
503+
504+
## Blob v2 Reads
505+
506+
Lance datasets that contain a blob v2 column expose that column to Spark as the native 5-field descriptor struct: `struct<kind:short, position:long, size:long, blob_id:long, blob_uri:string>`. Querying the descriptor never fetches the blob bytes, so `SELECT payload.size` and `SELECT payload.blob_uri` are cheap.
507+
508+
```sql
509+
-- Query metadata only (no byte fetch):
510+
SELECT id, payload.size, payload.kind FROM lance.ns.tbl;
511+
```
512+
513+
A column is treated as blob v2 when the Arrow field carries `ARROW:extension:name = lance.blob.v2`, matching lance-core's blob v2 extension type.
514+
515+
Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs.
516+
517+
The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only.

lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ public String name() {
303303

304304
@Override
305305
public StructType schema() {
306-
return sparkSchema;
306+
return BlobUtils.applyBlobV2DescriptorSchema(sparkSchema);
307307
}
308308

309309
@Override

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.lance.schema.LanceSchema;
2727
import org.lance.spark.LanceSparkReadOptions;
2828
import org.lance.spark.sharding.SparkLanceShardingUtils;
29+
import org.lance.spark.utils.BlobUtils;
2930
import org.lance.spark.utils.Optional;
3031
import org.lance.spark.utils.Utils;
3132

@@ -117,8 +118,8 @@ public LanceScanBuilder(
117118
String namespaceImpl,
118119
java.util.Map<String, String> namespaceProperties,
119120
ShardingSpec shardingSpec) {
120-
this.fullSchema = schema;
121-
this.schema = schema;
121+
this.fullSchema = BlobUtils.applyBlobV2DescriptorSchema(schema);
122+
this.schema = this.fullSchema;
122123
this.readOptions = readOptions;
123124
this.initialStorageOptions = initialStorageOptions;
124125
this.namespaceImpl = namespaceImpl;

lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,36 @@
1313
*/
1414
package org.lance.spark.utils;
1515

16+
import org.apache.arrow.vector.types.pojo.ArrowType;
17+
import org.apache.arrow.vector.types.pojo.Field;
18+
import org.apache.spark.sql.types.DataTypes;
19+
import org.apache.spark.sql.types.Metadata;
1620
import org.apache.spark.sql.types.StructField;
21+
import org.apache.spark.sql.types.StructType;
22+
23+
import java.util.List;
24+
import java.util.Map;
1725

1826
public class BlobUtils {
1927

2028
public static final String LANCE_ENCODING_BLOB_KEY = "lance-encoding:blob";
2129
public static final String LANCE_ENCODING_BLOB_VALUE = "true";
2230

31+
public static final String ARROW_EXTENSION_NAME_KEY = "ARROW:extension:name";
32+
public static final String ARROW_EXTENSION_BLOB_V2 = "lance.blob.v2";
33+
34+
/**
35+
* Spark struct type for a Lance blob v2 descriptor: {@code kind, position, size, blob_id,
36+
* blob_uri}.
37+
*/
38+
public static final StructType BLOB_DESCRIPTOR_STRUCT =
39+
new StructType()
40+
.add("kind", DataTypes.ShortType)
41+
.add("position", DataTypes.LongType)
42+
.add("size", DataTypes.LongType)
43+
.add("blob_id", DataTypes.LongType)
44+
.add("blob_uri", DataTypes.StringType);
45+
2346
/**
2447
* Check if a Spark field is a blob field based on its metadata.
2548
*
@@ -66,4 +89,83 @@ public static boolean isBlobArrowField(org.apache.arrow.vector.types.pojo.Field
6689
String value = metadata.get(LANCE_ENCODING_BLOB_KEY);
6790
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value);
6891
}
92+
93+
/** Returns true when a Spark field carries the lance-core blob v2 Arrow extension. */
94+
public static boolean isBlobV2SparkField(StructField field) {
95+
return field != null && isBlobV2SparkMetadata(field.metadata());
96+
}
97+
98+
public static boolean isBlobV2SparkMetadata(Metadata metadata) {
99+
if (metadata == null) {
100+
return false;
101+
}
102+
103+
return metadata.contains(ARROW_EXTENSION_NAME_KEY)
104+
&& ARROW_EXTENSION_BLOB_V2.equals(metadata.getString(ARROW_EXTENSION_NAME_KEY));
105+
}
106+
107+
/**
108+
* Arrow-side counterpart of {@link #isBlobV2SparkField} used inside the columnar batch scanner.
109+
*/
110+
public static boolean isBlobV2ArrowField(Field field) {
111+
if (field == null) {
112+
return false;
113+
}
114+
115+
Map<String, String> metadata = field.getMetadata();
116+
if (metadata != null
117+
&& ARROW_EXTENSION_BLOB_V2.equals(metadata.get(ARROW_EXTENSION_NAME_KEY))) {
118+
return true;
119+
}
120+
121+
// lance-core scan batches expose the unloaded descriptor struct (no extension metadata).
122+
return isBlobV2DescriptorArrowField(field);
123+
}
124+
125+
private static boolean isBlobV2DescriptorArrowField(Field field) {
126+
if (!(field.getType() instanceof ArrowType.Struct)) {
127+
return false;
128+
}
129+
List<Field> children = field.getChildren();
130+
if (children == null || children.size() != BLOB_DESCRIPTOR_STRUCT.fields().length) {
131+
return false;
132+
}
133+
StructField[] expected = BLOB_DESCRIPTOR_STRUCT.fields();
134+
for (int i = 0; i < expected.length; i++) {
135+
if (!expected[i].name().equals(children.get(i).getName())) {
136+
return false;
137+
}
138+
}
139+
return true;
140+
}
141+
142+
/** Returns true if any field in {@code schema} is a blob v2 column. */
143+
public static boolean hasBlobV2Fields(StructType schema) {
144+
for (StructField field : schema.fields()) {
145+
if (isBlobV2SparkField(field)) {
146+
return true;
147+
}
148+
}
149+
150+
return false;
151+
}
152+
153+
/** Rewrites blob v2 columns to the descriptor struct returned by Lance. */
154+
public static StructType applyBlobV2DescriptorSchema(StructType schema) {
155+
StructField[] fields = new StructField[schema.fields().length];
156+
boolean changed = false;
157+
for (int i = 0; i < schema.fields().length; i++) {
158+
StructField field = schema.fields()[i];
159+
if (!isBlobV2SparkField(field)) {
160+
fields[i] = field;
161+
continue;
162+
}
163+
164+
fields[i] =
165+
new StructField(field.name(), BLOB_DESCRIPTOR_STRUCT, field.nullable(), field.metadata());
166+
changed = true;
167+
}
168+
169+
return changed ? new StructType(fields) : schema;
170+
}
69171
}

lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.arrow.vector.complex.ListVector;
3939
import org.apache.arrow.vector.complex.MapVector;
4040
import org.apache.arrow.vector.complex.StructVector;
41+
import org.apache.spark.sql.types.DataType;
4142
import org.apache.spark.sql.types.Decimal;
4243
import org.apache.spark.sql.util.LanceArrowUtils;
4344
import org.apache.spark.sql.vectorized.ArrowColumnVector;
@@ -71,7 +72,7 @@ public LanceArrowColumnVector(ValueVector vector) {
7172
}
7273

7374
public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
74-
super(LanceArrowUtils.fromArrowField(vector.getField()));
75+
super(computeDataType(vector));
7576
this.closeVectorOnClose = closeVectorOnClose;
7677

7778
if (vector instanceof UInt1Vector) {
@@ -86,6 +87,8 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
8687
fixedSizeBinaryAccessor = new FixedSizeBinaryAccessor((FixedSizeBinaryVector) vector);
8788
} else if (vector instanceof FixedSizeListVector) {
8889
fixedSizeListAccessor = new FixedSizeListAccessor((FixedSizeListVector) vector);
90+
} else if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) {
91+
structAccessor = new LanceStructAccessor((StructVector) vector);
8992
} else if (vector instanceof StructVector && BlobUtils.isBlobArrowField(vector.getField())) {
9093
blobStructAccessor = new BlobStructAccessor((StructVector) vector);
9194
} else if (vector instanceof StructVector) {
@@ -522,4 +525,11 @@ public ColumnVector getChild(int ordinal) {
522525
public BlobStructAccessor getBlobStructAccessor() {
523526
return blobStructAccessor;
524527
}
528+
529+
private static DataType computeDataType(ValueVector vector) {
530+
if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) {
531+
return BlobUtils.BLOB_DESCRIPTOR_STRUCT;
532+
}
533+
return LanceArrowUtils.fromArrowField(vector.getField());
534+
}
525535
}

lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ object LanceArrowUtils {
4444
val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY
4545
val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY
4646
val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY
47+
val ARROW_EXT_NAME_KEY = BlobUtils.ARROW_EXTENSION_NAME_KEY
48+
val BLOB_V2_EXT_NAME = BlobUtils.ARROW_EXTENSION_BLOB_V2
4749
val ARROW_LARGE_VAR_CHAR_KEY = LargeVarCharUtils.ARROW_LARGE_VAR_CHAR_KEY
4850
val ARROW_DATE_MILLISECOND_KEY = DateMilliUtils.ARROW_DATE_MILLISECOND_KEY
4951
val ARROW_FIXED_SIZE_BINARY_BYTE_WIDTH_KEY =
@@ -108,6 +110,8 @@ object LanceArrowUtils {
108110
val elementType = fromArrowField(elementField)
109111
val containsNull = elementField.isNullable
110112
ArrayType(elementType, containsNull)
113+
case _: ArrowType.Struct if isBlobField(field) =>
114+
BinaryType
111115
case _: ArrowType.Struct =>
112116
// Always recurse through LanceArrowUtils for struct children so special cases
113117
// like Date(MILLISECOND), FixedSizeBinary, etc. are applied in nested schemas too.
@@ -646,8 +650,10 @@ object LanceArrowUtils {
646650

647651
private def isBlobField(field: Field): Boolean = {
648652
val metadata = field.getMetadata
649-
metadata != null && metadata.containsKey(ENCODING_BLOB) &&
650-
"true".equalsIgnoreCase(metadata.get(ENCODING_BLOB))
653+
if (metadata == null) return false
654+
(metadata.containsKey(ENCODING_BLOB) &&
655+
"true".equalsIgnoreCase(metadata.get(ENCODING_BLOB))) ||
656+
BLOB_V2_EXT_NAME.equals(metadata.get(ARROW_EXT_NAME_KEY))
651657
}
652658
}
653659

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.lance.spark.utils;
15+
16+
import org.apache.arrow.vector.types.pojo.ArrowType;
17+
import org.apache.arrow.vector.types.pojo.Field;
18+
import org.apache.arrow.vector.types.pojo.FieldType;
19+
import org.apache.spark.sql.types.DataTypes;
20+
import org.apache.spark.sql.types.Metadata;
21+
import org.apache.spark.sql.types.MetadataBuilder;
22+
import org.apache.spark.sql.types.StructField;
23+
import org.apache.spark.sql.types.StructType;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.Arrays;
27+
import java.util.Collections;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertFalse;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
public class BlobUtilsTest {
34+
35+
@Test
36+
public void testBlobV2FieldWithArrowExtensionName() {
37+
assertTrue(BlobUtils.isBlobV2SparkField(blobV2Field()));
38+
}
39+
40+
@Test
41+
public void testBlobV2FieldNullSafety() {
42+
assertFalse(BlobUtils.isBlobV2SparkField(null));
43+
}
44+
45+
@Test
46+
public void testV1Field() {
47+
assertTrue(BlobUtils.isBlobSparkField(blobV1Field()));
48+
}
49+
50+
@Test
51+
public void testBlobV2ArrowFieldRejectsUnrelated() {
52+
Field f =
53+
new Field(
54+
"payload",
55+
new FieldType(true, ArrowType.Binary.INSTANCE, null, Collections.emptyMap()),
56+
null);
57+
assertFalse(BlobUtils.isBlobV2ArrowField(f));
58+
assertFalse(BlobUtils.isBlobV2ArrowField(null));
59+
}
60+
61+
@Test
62+
public void testHasBlobV2FieldsInSchema() {
63+
StructType schema =
64+
new StructType(
65+
new StructField[] {
66+
field("id", DataTypes.IntegerType), blobV2Field(),
67+
});
68+
assertTrue(BlobUtils.hasBlobV2Fields(schema));
69+
}
70+
71+
@Test
72+
public void testDescriptorStructShape() {
73+
StructType s = BlobUtils.BLOB_DESCRIPTOR_STRUCT;
74+
assertEquals(5, s.fields().length);
75+
assertEquals(DataTypes.ShortType, s.apply("kind").dataType());
76+
assertEquals(DataTypes.LongType, s.apply("position").dataType());
77+
assertEquals(DataTypes.LongType, s.apply("size").dataType());
78+
assertEquals(DataTypes.LongType, s.apply("blob_id").dataType());
79+
assertEquals(DataTypes.StringType, s.apply("blob_uri").dataType());
80+
}
81+
82+
@Test
83+
public void testBlobV2DescriptorSchemaRewrite() {
84+
StructType schema =
85+
new StructType(
86+
new StructField[] {
87+
field("id", DataTypes.IntegerType), blobV2Field(),
88+
});
89+
StructType rewritten = BlobUtils.applyBlobV2DescriptorSchema(schema);
90+
assertEquals(DataTypes.IntegerType, rewritten.apply("id").dataType());
91+
assertEquals(BlobUtils.BLOB_DESCRIPTOR_STRUCT, rewritten.apply("payload").dataType());
92+
}
93+
94+
@Test
95+
public void testV1FieldsPreservedInRewrite() {
96+
StructType schema =
97+
new StructType(
98+
new StructField[] {
99+
field("id", DataTypes.IntegerType), blobV1Field(),
100+
});
101+
StructType rewritten = BlobUtils.applyBlobV2DescriptorSchema(schema);
102+
assertEquals(DataTypes.BinaryType, rewritten.apply("payload").dataType());
103+
}
104+
105+
@Test
106+
public void testUnloadedDescriptorStructRecognizedAsBlobV2() {
107+
Field f =
108+
new Field(
109+
"payload",
110+
new FieldType(true, ArrowType.Struct.INSTANCE, null, Collections.emptyMap()),
111+
Arrays.asList(
112+
intChild("kind"),
113+
intChild("position"),
114+
intChild("size"),
115+
intChild("blob_id"),
116+
utf8Child("blob_uri")));
117+
assertTrue(BlobUtils.isBlobV2ArrowField(f));
118+
assertFalse(BlobUtils.isBlobArrowField(f));
119+
}
120+
121+
private static Field intChild(String name) {
122+
return new Field(
123+
name,
124+
new FieldType(true, new ArrowType.Int(64, false), null, Collections.emptyMap()),
125+
null);
126+
}
127+
128+
private static Field utf8Child(String name) {
129+
return new Field(
130+
name, new FieldType(true, ArrowType.Utf8.INSTANCE, null, Collections.emptyMap()), null);
131+
}
132+
133+
private static StructField field(String name, org.apache.spark.sql.types.DataType dt) {
134+
return new StructField(name, dt, true, Metadata.empty());
135+
}
136+
137+
private static StructField blobV2Field() {
138+
Metadata md =
139+
new MetadataBuilder()
140+
.putString(BlobUtils.ARROW_EXTENSION_NAME_KEY, BlobUtils.ARROW_EXTENSION_BLOB_V2)
141+
.build();
142+
return new StructField("payload", DataTypes.BinaryType, true, md);
143+
}
144+
145+
private static StructField blobV1Field() {
146+
Metadata md =
147+
new MetadataBuilder()
148+
.putString(BlobUtils.LANCE_ENCODING_BLOB_KEY, BlobUtils.LANCE_ENCODING_BLOB_VALUE)
149+
.build();
150+
return new StructField("payload", DataTypes.BinaryType, true, md);
151+
}
152+
}

0 commit comments

Comments
 (0)