Skip to content

Commit 2cd8627

Browse files
beinanclaude
andcommitted
feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO
When blob columns flow through Spark's shuffle (e.g., INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost. This PR introduces a blob reference mechanism that preserves blob data through shuffle without materializing the full blob bytes. Read side: blob columns serialize compact ~100-byte BlobReference descriptors (LANCEREF magic + dataset URI + column name + row address) instead of empty bytes. The scanner requests _rowaddr when blob columns are present and strips it from the output. Write side: LargeBinaryWriter detects BlobReference headers, buffers them during setValue(), then batch-resolves all references in finish() via a single takeBlobs() call per (dataset, column) group. Dataset instances are cached across batches for the task lifetime. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d813238 commit 2cd8627

12 files changed

Lines changed: 1092 additions & 41 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.lance.spark;
15+
16+
/** Concrete implementation of BaseBlobJoinTest for Spark 3.4. */
17+
public class BlobJoinTest extends BaseBlobJoinTest {
18+
// All test methods are inherited from BaseBlobJoinTest
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.lance.spark;
15+
16+
/** Concrete implementation of BaseBlobJoinTest for Spark 3.5. */
17+
public class BlobJoinTest extends BaseBlobJoinTest {
18+
// All test methods are inherited from BaseBlobJoinTest
19+
}

lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.lance.spark.vectorized.LanceArrowColumnVector;
2020

2121
import org.apache.arrow.vector.FieldVector;
22+
import org.apache.arrow.vector.UInt8Vector;
2223
import org.apache.arrow.vector.VectorSchemaRoot;
2324
import org.apache.arrow.vector.complex.StructVector;
2425
import org.apache.arrow.vector.ipc.ArrowReader;
@@ -36,6 +37,7 @@
3637
import java.util.HashMap;
3738
import java.util.List;
3839
import java.util.Map;
40+
import java.util.Set;
3941

4042
public class LanceFragmentColumnarBatchScanner implements AutoCloseable {
4143
private final LanceFragmentScanner fragmentScanner;
@@ -117,6 +119,10 @@ private List<ColumnVector> buildSparkOrderedVectors(
117119
actualFields.put(rootVectors.get(i).getField().getName(), rootVectors.get(i));
118120
}
119121

122+
// Extract row addresses for blob reference support
123+
Set<String> blobColumnNames = fragmentScanner.getBlobColumnNames();
124+
long[] rowAddresses = extractRowAddresses(rootVectors, blobColumnNames, root.getRowCount());
125+
120126
List<ColumnVector> fieldVectors = new ArrayList<>(schema.size());
121127
StructField[] fields = schema.fields();
122128
for (StructField field : fields) {
@@ -150,12 +156,46 @@ private List<ColumnVector> buildSparkOrderedVectors(
150156
throw new IllegalStateException(
151157
"Lance scan did not return expected field '" + fieldName + "'");
152158
}
153-
fieldVectors.add(new LanceArrowColumnVector(vector));
159+
LanceArrowColumnVector colVec = new LanceArrowColumnVector(vector);
160+
161+
// Set blob reference context so getBinary() produces blob references
162+
if (rowAddresses != null && blobColumnNames.contains(fieldName)) {
163+
BlobStructAccessor blobAccessor = colVec.getBlobStructAccessor();
164+
if (blobAccessor != null) {
165+
blobAccessor.setBlobReferenceContext(
166+
fragmentScanner.getDatasetUri(), fieldName, rowAddresses);
167+
}
168+
}
169+
170+
fieldVectors.add(colVec);
154171
}
155172
}
156173
return fieldVectors;
157174
}
158175

176+
/**
177+
* Extracts row addresses from the {@code _rowaddr} column appended by the native scanner. Row
178+
* addresses are needed to construct blob references that allow the write side to fetch actual
179+
* blob bytes from the source dataset.
180+
*/
181+
private long[] extractRowAddresses(
182+
List<FieldVector> rootVectors, Set<String> blobColumnNames, int rowCount) {
183+
if (blobColumnNames.isEmpty()) {
184+
return null;
185+
}
186+
for (FieldVector fv : rootVectors) {
187+
if (LanceConstant.ROW_ADDRESS.equals(fv.getField().getName()) && fv instanceof UInt8Vector) {
188+
UInt8Vector rowAddrVector = (UInt8Vector) fv;
189+
long[] rowAddresses = new long[rowCount];
190+
for (int i = 0; i < rowCount; i++) {
191+
rowAddresses[i] = rowAddrVector.get(i);
192+
}
193+
return rowAddresses;
194+
}
195+
}
196+
return null;
197+
}
198+
159199
// Virtual column vector for blob position
160200
private static class BlobPositionColumnVector extends ColumnVector {
161201
private final BlobStructAccessor accessor;

lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.lance.spark.LanceRuntime;
2222
import org.lance.spark.LanceSparkReadOptions;
2323
import org.lance.spark.read.LanceInputPartition;
24+
import org.lance.spark.utils.BlobUtils;
2425
import org.lance.spark.utils.Utils;
2526

2627
import org.apache.arrow.vector.ipc.ArrowReader;
@@ -29,51 +30,55 @@
2930

3031
import java.io.IOException;
3132
import java.util.Arrays;
33+
import java.util.HashSet;
3234
import java.util.List;
35+
import java.util.Set;
3336
import java.util.stream.Collectors;
3437

3538
public class LanceFragmentScanner implements AutoCloseable {
3639
private final Dataset dataset;
3740
private final LanceScanner scanner;
3841
private final int fragmentId;
39-
private final boolean withFragemtId;
42+
private final boolean withFragmentId;
4043
private final LanceInputPartition inputPartition;
4144
private final long datasetOpenTimeNs;
4245
private final long scannerCreateTimeNs;
4346

47+
/**
48+
* Whether the scanner requested _rowaddr for blob reference support. When true, the _rowaddr
49+
* column in the Arrow batch was implicitly added and should be stripped from user-visible output.
50+
*/
51+
private final boolean withRowAddrForBlobs;
52+
53+
/** The names of blob columns in the projected schema. */
54+
private final Set<String> blobColumnNames;
55+
4456
private LanceFragmentScanner(
4557
Dataset dataset,
4658
LanceScanner scanner,
4759
int fragmentId,
4860
boolean withFragmentId,
4961
LanceInputPartition inputPartition,
5062
long datasetOpenTimeNs,
51-
long scannerCreateTimeNs) {
63+
long scannerCreateTimeNs,
64+
boolean withRowAddrForBlobs,
65+
Set<String> blobColumnNames) {
5266
this.dataset = dataset;
5367
this.scanner = scanner;
5468
this.fragmentId = fragmentId;
55-
this.withFragemtId = withFragmentId;
69+
this.withFragmentId = withFragmentId;
5670
this.inputPartition = inputPartition;
5771
this.datasetOpenTimeNs = datasetOpenTimeNs;
5872
this.scannerCreateTimeNs = scannerCreateTimeNs;
73+
this.withRowAddrForBlobs = withRowAddrForBlobs;
74+
this.blobColumnNames = blobColumnNames;
5975
}
6076

6177
public static LanceFragmentScanner create(int fragmentId, LanceInputPartition inputPartition) {
6278
Dataset dataset = null;
6379
LanceScanner lanceScanner = null;
6480
try {
6581
LanceSparkReadOptions readOptions = inputPartition.getReadOptions();
66-
// Optionally rebuild the namespace client on the executor so the dataset open routes through
67-
// Utils.OpenDatasetBuilder's namespaceClient branch. This preserves the storage options
68-
// provider on the Rust side, which refreshes short-lived vended credentials (e.g. STS
69-
// tokens) during long-running scans. The price is an eager describeTable() RPC against the
70-
// namespace on every fragment open.
71-
//
72-
// For catalogs whose backing service authenticates per-call (e.g. Hive Metastore over
73-
// Kerberos) executors typically lack a TGT and that RPC fails with "GSS initiate failed".
74-
// Setting LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH=false makes executors
75-
// skip the rebuild and open the dataset by URI using the initialStorageOptions the driver
76-
// already obtained, at the cost of losing the Rust-side credential refresh callback.
7782
if (inputPartition.getNamespaceImpl() != null && readOptions.isExecutorCredentialRefresh()) {
7883
if (LanceRuntime.useNamespaceOnWorkers(inputPartition.getNamespaceImpl())) {
7984
readOptions.setNamespace(
@@ -97,31 +102,34 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
97102
fragmentId, readOptions.getDatasetUri(), readOptions.getVersion()));
98103
}
99104
ScanOptions.Builder scanOptions = new ScanOptions.Builder();
105+
106+
// Detect blob columns in the schema
107+
Set<String> blobColumnNames = getBlobColumnNames(inputPartition.getSchema());
108+
boolean hasBlobColumns = !blobColumnNames.isEmpty();
109+
100110
List<String> projectedColumns = getColumnNames(inputPartition.getSchema());
101111
if (projectedColumns.isEmpty() && inputPartition.getSchema().isEmpty()) {
102-
// Lance requires at least one projected column. Use _rowid as a lightweight
103-
// sentinel so the scanner still returns the correct row count (e.g. SELECT 1).
104112
scanOptions.withRowId(true);
105113
}
106114
if (hasField(inputPartition.getSchema(), LanceConstant.ROW_ID)) {
107115
scanOptions.withRowId(true);
108116
}
109-
if (hasField(inputPartition.getSchema(), LanceConstant.ROW_ADDRESS)) {
117+
118+
// Request _rowaddr when blob columns are present so we can build blob references.
119+
boolean userRequestedRowAddr =
120+
hasField(inputPartition.getSchema(), LanceConstant.ROW_ADDRESS);
121+
boolean withRowAddrForBlobs = hasBlobColumns && !userRequestedRowAddr;
122+
if (hasBlobColumns || userRequestedRowAddr) {
110123
scanOptions.withRowAddress(true);
111124
}
125+
112126
scanOptions.columns(projectedColumns);
113127
if (inputPartition.getWhereCondition().isPresent()) {
114128
scanOptions.filter(inputPartition.getWhereCondition().get());
115129
}
116130
scanOptions.batchSize(readOptions.getBatchSize());
117131
if (readOptions.getNearest() != null) {
118132
scanOptions.nearest(readOptions.getNearest());
119-
// We strictly set `prefilter = true` here to ensure query correctness.
120-
// This is necessary due to the combination of two factors:
121-
// 1. Spark currently performs the vector search by individually scanning each fragment.
122-
// 2. Lance mandates that `prefilter` must be enabled for fragmented vector queries.
123-
// If Spark's execution model or Lance's search functionality changes in the future,
124-
// we need to revisit this.
125133
scanOptions.prefilter(true);
126134
}
127135
if (inputPartition.getLimit().isPresent()) {
@@ -145,7 +153,9 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
145153
withFragmentId,
146154
inputPartition,
147155
dsOpenTimeNs,
148-
scanCreateTimeNs);
156+
scanCreateTimeNs,
157+
withRowAddrForBlobs,
158+
blobColumnNames);
149159
} catch (Throwable throwable) {
150160
if (lanceScanner != null) {
151161
try {
@@ -211,8 +221,8 @@ public int fragmentId() {
211221
return fragmentId;
212222
}
213223

214-
public boolean withFragemtId() {
215-
return withFragemtId;
224+
public boolean withFragmentId() {
225+
return withFragmentId;
216226
}
217227

218228
public LanceInputPartition getInputPartition() {
@@ -227,19 +237,37 @@ public long getScannerCreateTimeNs() {
227237
return scannerCreateTimeNs;
228238
}
229239

230-
/**
231-
* Builds the projection column list for the scanner. Row ID and row address are requested through
232-
* explicit scan flags so Lance computes them from the active fragment metadata instead of reading
233-
* them as regular columns.
234-
*/
240+
/** Whether the scanner implicitly requested _rowaddr for blob reference support. */
241+
public boolean isWithRowAddrForBlobs() {
242+
return withRowAddrForBlobs;
243+
}
244+
245+
/** Returns the blob column names in the projected schema. */
246+
public Set<String> getBlobColumnNames() {
247+
return blobColumnNames;
248+
}
249+
250+
/** Returns the dataset URI for blob references. */
251+
public String getDatasetUri() {
252+
return inputPartition.getReadOptions().getDatasetUri();
253+
}
254+
255+
private static Set<String> getBlobColumnNames(StructType schema) {
256+
Set<String> blobColumns = new HashSet<>();
257+
for (StructField field : schema.fields()) {
258+
if (BlobUtils.isBlobSparkField(field)) {
259+
blobColumns.add(field.name());
260+
}
261+
}
262+
return blobColumns;
263+
}
264+
235265
private static List<String> getColumnNames(StructType schema) {
236-
// Collect all field names in the schema for quick lookup
237266
java.util.Set<String> schemaFields = new java.util.HashSet<>();
238267
for (StructField field : schema.fields()) {
239268
schemaFields.add(field.name());
240269
}
241270

242-
// Regular data columns (exclude all special/metadata columns)
243271
List<String> columns =
244272
Arrays.stream(schema.fields())
245273
.map(StructField::name)

0 commit comments

Comments
 (0)