Skip to content

Commit 0a14502

Browse files
authored
feat: blob v2 write support (#560)
1 parent c1aa981 commit 0a14502

13 files changed

Lines changed: 1215 additions & 33 deletions

File tree

docs/src/config.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,4 +514,48 @@ A column is treated as blob v2 when the Arrow field carries `ARROW:extension:nam
514514

515515
Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs.
516516

517-
The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only.
517+
The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only. See [Blob v2 Writes](#blob-v2-writes) below for the write path.
518+
519+
## Blob v2 Writes
520+
521+
To write blob v2 columns, set `file_format_version` to `2.2` or higher and set
522+
`<column>.lance.encoding = blob` in `TBLPROPERTIES`.
523+
524+
Spark still sees the column as `BINARY` when writing. The connector converts that binary
525+
value into the Arrow blob write struct during encoding.
526+
527+
On reads, blob v2 columns are exposed as descriptor structs. See
528+
[Blob v2 Reads](#blob-v2-reads). For writes, `INSERT` and DataFrame append still take
529+
`BINARY`.
530+
531+
```sql
532+
CREATE TABLE lance.mydb.users (
533+
id INT NOT NULL,
534+
content BINARY
535+
) USING lance
536+
TBLPROPERTIES (
537+
'content.lance.encoding' = 'blob',
538+
'file_format_version' = '2.2'
539+
);
540+
```
541+
542+
With `file_format_version = '2.2'` or higher, blob columns are written using blob v2
543+
encoding and `ARROW:extension:name = lance.blob.v2 metadata`.
544+
545+
With an older version, or when `file_format_version` is not set, blob columns use the
546+
legacy v1 encoding with `lance-encoding:blob = true` metadata.
547+
548+
Blob encoding requires a numeric `file_format_version`, such as `2.2`.
549+
550+
Blob v2 writes must go through the catalog path. Use SQL DDL with `TBLPROPERTIES`, as
551+
shown above, or use the `DataFrameWriterV2` API:
552+
553+
```python
554+
df.writeTo("lance.ns.users") \
555+
.tableProperty("content.lance.encoding", "blob") \
556+
.tableProperty("file_format_version", "2.2") \
557+
.create()
558+
```
559+
560+
Setting only `file_format_version` does not enable blob encoding. Without
561+
`<column>.lance.encoding = blob`, the column is written as plain `BINARY`.

docs/src/operations/ddl/create-table.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,33 @@ To create a table with blob columns, use the table property pattern `<column_nam
338338
.createOrReplace();
339339
```
340340

341+
### Blob v2 Columns
342+
343+
To create blob v2 columns, set the blob encoding property and use `file_format_version = '2.2'` or higher.
344+
345+
Spark writes blob v2 columns as `BINARY`. On reads, the same columns are exposed as
346+
descriptor structs. See [Blob v2 Reads](../../config.md#blob-v2-reads).
347+
348+
=== "SQL"
349+
```sql
350+
CREATE TABLE documents (
351+
id INT NOT NULL,
352+
content BINARY
353+
) USING lance
354+
TBLPROPERTIES (
355+
'content.lance.encoding' = 'blob',
356+
'file_format_version' = '2.2'
357+
);
358+
```
359+
360+
=== "Python"
361+
```python
362+
df.writeTo("documents") \
363+
.tableProperty("content.lance.encoding", "blob") \
364+
.tableProperty("file_format_version", "2.2") \
365+
.createOrReplace()
366+
```
367+
341368
## Large String Columns
342369

343370
Lance supports large string columns for storing very large text data. By default, Arrow uses `Utf8` (VarChar) type with 32-bit offsets, which limits total string data to 2GB per batch. For columns containing very large strings (e.g., document content, base64-encoded data), you can use `LargeUtf8` (LargeVarChar) with 64-bit offsets.

integration-tests/test_lance_spark.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def _lance_storage_options(spark):
4646
}
4747

4848

49+
def _sql_binary_literal(data: bytes) -> str:
50+
return "X'" + data.hex().upper() + "'"
51+
52+
4953
def _table_location(spark, table_name):
5054
rows = (
5155
spark.sql(f"DESCRIBE EXTENDED {table_name}")
@@ -705,6 +709,72 @@ def test_compression_metadata_reaches_lance_file(self, spark):
705709
assert b"lance-encoding:compression" not in (id_meta or {})
706710

707711

712+
class TestDDLBlobV2:
713+
def test_blob_v2_table_reads_content_as_descriptor(self, spark):
714+
spark.sql("""
715+
CREATE TABLE default.test_blob_v2 (
716+
id INT NOT NULL,
717+
content BINARY
718+
) USING lance
719+
TBLPROPERTIES (
720+
'content.lance.encoding' = 'blob',
721+
'file_format_version' = '2.2'
722+
)
723+
""")
724+
725+
first_content = b"SQL insert content 1"
726+
second_content = b"SQL insert content 2"
727+
728+
spark.sql(
729+
f"INSERT INTO default.test_blob_v2 VALUES (1, {_sql_binary_literal(first_content)})"
730+
)
731+
spark.sql(
732+
f"INSERT INTO default.test_blob_v2 VALUES (2, {_sql_binary_literal(second_content)})"
733+
)
734+
735+
describe_rows = spark.sql("DESCRIBE default.test_blob_v2").collect()
736+
content_field = next(row for row in describe_rows if row.col_name == "content")
737+
content_type = content_field.data_type.lower()
738+
739+
assert "struct" in content_type
740+
assert "kind" in content_type
741+
assert "blob_uri" in content_type
742+
743+
rows = spark.sql("""
744+
SELECT id, content.size, content.kind, content.blob_id, content.blob_uri
745+
FROM default.test_blob_v2
746+
ORDER BY id
747+
""").collect()
748+
749+
assert len(rows) == 2
750+
751+
assert rows[0].id == 1
752+
assert rows[0].size == len(first_content)
753+
assert rows[0].kind == 0
754+
755+
assert rows[1].id == 2
756+
assert rows[1].size == len(second_content)
757+
assert rows[1].kind == 0
758+
759+
def test_blob_v2_insert_rejects_non_binary_content(self, spark):
760+
spark.sql("""
761+
CREATE TABLE default.test_blob_v2_bad_insert (
762+
id INT NOT NULL,
763+
content BINARY
764+
) USING lance
765+
TBLPROPERTIES (
766+
'content.lance.encoding' = 'blob',
767+
'file_format_version' = '2.2'
768+
)
769+
""")
770+
771+
with pytest.raises(Exception, match="got string"):
772+
spark.sql("""
773+
INSERT INTO default.test_blob_v2_bad_insert
774+
VALUES (1, 'not-binary')
775+
""")
776+
777+
708778
class TestDDLIndex:
709779
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""
710780

lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,9 @@ public Table createTable(
599599
// Build the table ID for credential vending
600600
List<String> tableIdList = buildTableId(actualIdent);
601601

602-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
602+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
603+
StructType processedSchema =
604+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
603605

604606
// Create dataset using namespace - WriteDatasetBuilder handles declareTable internally
605607
// and properly leverages namespace client for credential vending
@@ -613,7 +615,6 @@ public Table createTable(
613615
.mode(WriteParams.WriteMode.CREATE)
614616
.enableStableRowIds(catalogConfig.isEnableStableRowIds(properties))
615617
.storageOptions(catalogConfig.getStorageOptions());
616-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
617618
if (fileFormatVersion != null) {
618619
writeBuilder.dataStorageVersion(fileFormatVersion);
619620
}
@@ -672,12 +673,13 @@ private Table createTableAtPath(
672673
throws TableAlreadyExistsException {
673674
String datasetUri = getDatasetUri(ident);
674675

675-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
676+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
677+
StructType processedSchema =
678+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
676679
LanceSparkReadOptions readOptions =
677680
createReadOptions(
678681
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);
679682

680-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
681683
Map<String, String> tableProperties = copyUserTableProperties(properties);
682684
try {
683685
WriteDatasetBuilder writeBuilder =
@@ -895,7 +897,9 @@ public StagedTable stageCreate(
895897

896898
Identifier actualIdent = transformIdentifierForApi(ident);
897899
List<String> tableIdList = buildTableId(actualIdent);
898-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
900+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
901+
StructType processedSchema =
902+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
899903

900904
DeclareTableRequest declareRequest = new DeclareTableRequest();
901905
tableIdList.forEach(declareRequest::addIdItem);
@@ -925,7 +929,6 @@ public StagedTable stageCreate(
925929
managedVersioning);
926930
StagedCommit stagedCommit = StagedCommit.forNewTable(arrowSchema, location, commitOptions);
927931
stagedCommit.setShardingSpec(shardingSpec);
928-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
929932
return createStagedDataset(
930933
readOptions,
931934
processedSchema,
@@ -946,7 +949,9 @@ private StagedTable stageCreateAtPath(
946949
Map<String, String> properties,
947950
ShardingSpec shardingSpec) {
948951
String datasetUri = getDatasetUri(ident);
949-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
952+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
953+
StructType processedSchema =
954+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
950955

951956
LanceSparkReadOptions readOptions =
952957
createReadOptions(
@@ -958,7 +963,6 @@ private StagedTable stageCreateAtPath(
958963
catalogConfig.getStorageOptions(), catalogConfig.isEnableStableRowIds(properties));
959964
StagedCommit stagedCommit = StagedCommit.forNewTable(arrowSchema, datasetUri, commitOptions);
960965
stagedCommit.setShardingSpec(shardingSpec);
961-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
962966
return createStagedDataset(
963967
readOptions,
964968
processedSchema,
@@ -986,7 +990,9 @@ public StagedTable stageReplace(
986990

987991
ResolvedTable resolved = resolveIdentifier(ident);
988992
DescribeTableResponse describeResponse = resolved.describeResponse;
989-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
993+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
994+
StructType processedSchema =
995+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
990996
Map<String, String> initialStorageOptions = describeResponse.getStorageOptions();
991997
boolean managedVersioning = Boolean.TRUE.equals(describeResponse.getManagedVersioning());
992998

@@ -1004,7 +1010,6 @@ public StagedTable stageReplace(
10041010
StagedCommit stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);
10051011
stagedCommit.setShardingSpec(shardingSpec);
10061012
// Use specified file format version, or fall back to existing table's version
1007-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
10081013
if (fileFormatVersion == null) {
10091014
fileFormatVersion = ds.getLanceFileFormatVersion();
10101015
}
@@ -1029,7 +1034,9 @@ private StagedTable stageReplaceAtPath(
10291034
ShardingSpec shardingSpec)
10301035
throws NoSuchTableException {
10311036
String datasetUri = getDatasetUri(ident);
1032-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
1037+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
1038+
StructType processedSchema =
1039+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
10331040

10341041
LanceSparkReadOptions readOptions =
10351042
createReadOptions(
@@ -1049,7 +1056,6 @@ private StagedTable stageReplaceAtPath(
10491056
StagedCommit stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);
10501057
stagedCommit.setShardingSpec(shardingSpec);
10511058
// Use specified file format version, or fall back to existing table's version
1052-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
10531059
if (fileFormatVersion == null) {
10541060
fileFormatVersion = ds.getLanceFileFormatVersion();
10551061
}
@@ -1086,7 +1092,9 @@ public StagedTable stageCreateOrReplace(
10861092

10871093
Identifier actualIdent = transformIdentifierForApi(ident);
10881094
List<String> tableIdList = buildTableId(actualIdent);
1089-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
1095+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
1096+
StructType processedSchema =
1097+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
10901098

10911099
boolean exists = tableExists(ident);
10921100
String location;
@@ -1120,7 +1128,6 @@ public StagedTable stageCreateOrReplace(
11201128

11211129
Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true);
11221130
// Use specified file format version, or fall back to existing table's version
1123-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
11241131
Map<String, String> merged =
11251132
LanceRuntime.mergeStorageOptions(catalogConfig.getStorageOptions(), initialStorageOptions);
11261133
final StagedCommitOptions commitOptions =
@@ -1161,7 +1168,9 @@ private StagedTable stageCreateOrReplaceAtPath(
11611168
Map<String, String> properties,
11621169
ShardingSpec shardingSpec) {
11631170
String datasetUri = getDatasetUri(ident);
1164-
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
1171+
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
1172+
StructType processedSchema =
1173+
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
11651174

11661175
LanceSparkReadOptions readOptions =
11671176
createReadOptions(
@@ -1174,7 +1183,6 @@ private StagedTable stageCreateOrReplaceAtPath(
11741183
catalogConfig.getStorageOptions(), catalogConfig.isEnableStableRowIds(properties));
11751184
StagedCommit stagedCommit;
11761185
// Use specified file format version, or fall back to existing table's version
1177-
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
11781186
if (exists) {
11791187
Dataset ds = Utils.openDatasetBuilder(readOptions).build();
11801188
stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);

lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.lance.spark.utils.BlobSourceContext;
1919
import org.lance.spark.utils.BlobUtils;
2020
import org.lance.spark.write.AddColumnsBackfillWrite;
21+
import org.lance.spark.write.LanceWriteSchemaValidator;
2122
import org.lance.spark.write.SparkWrite;
2223
import org.lance.spark.write.StagedCommit;
2324
import org.lance.spark.write.UpdateColumnsBackfillWrite;
@@ -60,6 +61,14 @@ public class LanceDataset
6061
ImmutableSet.of(
6162
TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE);
6263

64+
// Blob v2 is read as descriptor structs, but written as BINARY from sparkSchema.
65+
private static final Set<TableCapability> CAPABILITIES_WITH_BLOB_V2 =
66+
ImmutableSet.of(
67+
TableCapability.BATCH_READ,
68+
TableCapability.BATCH_WRITE,
69+
TableCapability.TRUNCATE,
70+
TableCapability.ACCEPT_ANY_SCHEMA);
71+
6372
public static final MetadataColumn FRAGMENT_ID_COLUMN =
6473
new MetadataColumn() {
6574
@Override
@@ -318,11 +327,14 @@ public Map<String, String> properties() {
318327

319328
@Override
320329
public Set<TableCapability> capabilities() {
321-
return CAPABILITIES;
330+
return BlobUtils.hasBlobV2Fields(sparkSchema) ? CAPABILITIES_WITH_BLOB_V2 : CAPABILITIES;
322331
}
323332

324333
@Override
325334
public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
335+
if (capabilities().contains(TableCapability.ACCEPT_ANY_SCHEMA)) {
336+
LanceWriteSchemaValidator.validate(sparkSchema, logicalWriteInfo.schema());
337+
}
326338
// Merge write-time options with the base options from read options
327339
CaseInsensitiveStringMap sparkWriteOptions = logicalWriteInfo.options();
328340
Map<String, BlobSourceContext> blobSourceContexts = decodeBlobSourceContexts(sparkWriteOptions);

0 commit comments

Comments
 (0)