Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark;

/**
 * Concrete implementation of {@link BaseBlobJoinTest} for Spark 3.4.
 *
 * <p>The class body is intentionally empty: it declares no fields or methods of its own.
 */
public class BlobJoinTest extends BaseBlobJoinTest {
// All test methods are inherited from BaseBlobJoinTest; nothing is overridden here.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark;

/**
 * Concrete implementation of {@link BaseBlobJoinTest} for Spark 3.5.
 *
 * <p>The class body is intentionally empty: it declares no fields or methods of its own.
 */
public class BlobJoinTest extends BaseBlobJoinTest {
// All test methods are inherited from BaseBlobJoinTest; nothing is overridden here.
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.lance.spark.vectorized.LanceArrowColumnVector;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowReader;
Expand All @@ -32,10 +33,11 @@
import org.apache.spark.sql.vectorized.ColumnarMap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;

public class LanceFragmentColumnarBatchScanner implements AutoCloseable {
private final LanceFragmentScanner fragmentScanner;
Expand All @@ -62,29 +64,90 @@ public boolean loadNextBatch() throws IOException {

if (hasNext) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
List<FieldVector> rootVectors = root.getFieldVectors();
int rowCount = root.getRowCount();

Set<String> blobColumnNames = fragmentScanner.getBlobColumnNames();
long[] rowAddresses = extractRowAddresses(rootVectors, blobColumnNames, rowCount);

List<ColumnVector> fieldVectors =
root.getFieldVectors().stream()
.map(LanceArrowColumnVector::new)
.collect(Collectors.toList());
buildColumnVectors(rootVectors, blobColumnNames, rowAddresses);

// Add virtual columns for blob metadata
addBlobVirtualColumns(fieldVectors, root, fragmentScanner.getInputPartition());

if (fragmentScanner.withFragemtId()) {
if (fragmentScanner.withFragmentId()) {
ConstantColumnVector fragmentVector =
new ConstantColumnVector(root.getRowCount(), DataTypes.IntegerType);
new ConstantColumnVector(rowCount, DataTypes.IntegerType);
fragmentVector.setInt(fragmentScanner.fragmentId());
fieldVectors.add(fragmentVector);
}

currentColumnarBatch =
new ColumnarBatch(fieldVectors.toArray(new ColumnVector[] {}), root.getRowCount());
new ColumnarBatch(fieldVectors.toArray(new ColumnVector[] {}), rowCount);
return true;
}
return false;
}

/**
 * Extracts row addresses from the {@code _rowaddr} column appended by the native scanner. Row
 * addresses are needed to construct blob references that allow the write side to fetch actual
 * blob bytes from the source dataset.
 *
 * @param rootVectors the Arrow field vectors of the current batch, searched in order
 * @param blobColumnNames names of the projected blob columns; when empty nothing is extracted
 * @param rowCount number of rows to copy out of the row address vector
 * @return one row address per row, or {@code null} when {@code blobColumnNames} is empty or no
 *     {@link UInt8Vector} named {@link LanceConstant#ROW_ADDRESS} is present in {@code
 *     rootVectors}
 */
private long[] extractRowAddresses(
List<FieldVector> rootVectors, Set<String> blobColumnNames, int rowCount) {
// Without blob columns there are no blob references to build, so skip the copy entirely.
if (blobColumnNames.isEmpty()) {
return null;
}
// The first vector that matches both the name and the UInt8Vector type is used.
for (FieldVector fv : rootVectors) {
if (LanceConstant.ROW_ADDRESS.equals(fv.getField().getName()) && fv instanceof UInt8Vector) {
UInt8Vector rowAddrVector = (UInt8Vector) fv;
long[] rowAddresses = new long[rowCount];
for (int i = 0; i < rowCount; i++) {
rowAddresses[i] = rowAddrVector.get(i);
}
Comment on lines +107 to +109
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-row BigInteger allocation extracting row addresses: extractRowAddresses() runs once per Arrow batch and loops over every row calling rowAddrVector.getObjectNoOverflow(i), which returns a freshly allocated BigInteger that is immediately reduced to a long. For a multi-million-row scan with a blob column this is millions of throwaway BigInteger allocations purely to read a 64-bit value, adding avoidable GC pressure on the scan hot path.

Fix: Use UInt8Vector.get(i), which returns a primitive long directly, instead of getObjectNoOverflow(i).longValue().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — now uses UInt8Vector.get(i) which returns a primitive long directly, avoiding per-row BigInteger allocation.

return rowAddresses;
}
}
// No usable row address vector in this batch; callers must handle the null result.
return null;
}

/**
 * Wraps every Arrow field vector of the batch in a Spark {@link ColumnVector}.
 *
 * <p>Two adjustments are made along the way:
 *
 * <ul>
 *   <li>When the scanner reports that it requested the row address column on behalf of blob
 *       support, a trailing vector named {@link LanceConstant#ROW_ADDRESS} is left out of the
 *       result.
 *   <li>For blob columns, the blob reference context (dataset URI, column name, row addresses) is
 *       set on the column's blob accessor so that {@code getBinary()} returns a compact blob
 *       reference instead of empty bytes.
 * </ul>
 *
 * @param rootVectors the Arrow field vectors of the current batch, in batch order
 * @param blobColumnNames names of the projected blob columns
 * @param rowAddresses per-row addresses, or {@code null} to skip setting any blob context
 * @return the Spark column vectors, in the same relative order as {@code rootVectors}
 */
private List<ColumnVector> buildColumnVectors(
    List<FieldVector> rootVectors, Set<String> blobColumnNames, long[] rowAddresses) {
  final boolean dropTrailingRowAddr = fragmentScanner.isWithRowAddrForBlobs();
  final int vectorCount = rootVectors.size();
  final int lastIndex = vectorCount - 1;
  final List<ColumnVector> sparkVectors = new ArrayList<>(vectorCount);

  for (int index = 0; index < vectorCount; index++) {
    FieldVector arrowVector = rootVectors.get(index);
    String columnName = arrowVector.getField().getName();

    // Only the last vector can be the implicitly requested row address column.
    boolean isImplicitRowAddr =
        dropTrailingRowAddr
            && index == lastIndex
            && LanceConstant.ROW_ADDRESS.equals(columnName);

    if (!isImplicitRowAddr) {
      LanceArrowColumnVector sparkVector = new LanceArrowColumnVector(arrowVector);
      boolean needsBlobContext = rowAddresses != null && blobColumnNames.contains(columnName);

      if (needsBlobContext) {
        BlobStructAccessor accessor = sparkVector.getBlobStructAccessor();
        // A null accessor means the wrapper exposes no blob struct for this column; leave it.
        if (accessor != null) {
          accessor.setBlobReferenceContext(
              fragmentScanner.getDatasetUri(), columnName, rowAddresses);
        }
      }

      sparkVectors.add(sparkVector);
    }
  }

  return sparkVectors;
}

/**
* @return the current batch, the caller responsible for closing the batch
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.lance.spark.LanceRuntime;
import org.lance.spark.LanceSparkReadOptions;
import org.lance.spark.read.LanceInputPartition;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.utils.Utils;

import org.apache.arrow.vector.ipc.ArrowReader;
Expand All @@ -29,51 +30,55 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class LanceFragmentScanner implements AutoCloseable {
private final Dataset dataset;
private final LanceScanner scanner;
private final int fragmentId;
private final boolean withFragemtId;
private final boolean withFragmentId;
private final LanceInputPartition inputPartition;
private final long datasetOpenTimeNs;
private final long scannerCreateTimeNs;

/**
* Whether the scanner requested _rowaddr for blob reference support. When true, the last column
* in the Arrow batch is the row address column (appended by the native scanner).
*/
private final boolean withRowAddrForBlobs;

/** The names of blob columns in the projected schema. */
private final Set<String> blobColumnNames;

/**
 * Private constructor that stores each argument in the field of the same purpose. No argument is
 * copied or validated here; in particular the blob column name set is kept by reference.
 *
 * @param withRowAddrForBlobs whether the scanner requested _rowaddr for blob reference support
 * @param blobColumnNames the names of blob columns in the projected schema
 */
private LanceFragmentScanner(
Dataset dataset,
LanceScanner scanner,
int fragmentId,
boolean withFragmentId,
LanceInputPartition inputPartition,
long datasetOpenTimeNs,
long scannerCreateTimeNs) {
long scannerCreateTimeNs,
boolean withRowAddrForBlobs,
Set<String> blobColumnNames) {
this.dataset = dataset;
this.scanner = scanner;
this.fragmentId = fragmentId;
this.withFragemtId = withFragmentId;
this.withFragmentId = withFragmentId;
this.inputPartition = inputPartition;
this.datasetOpenTimeNs = datasetOpenTimeNs;
this.scannerCreateTimeNs = scannerCreateTimeNs;
this.withRowAddrForBlobs = withRowAddrForBlobs;
this.blobColumnNames = blobColumnNames;
}

public static LanceFragmentScanner create(int fragmentId, LanceInputPartition inputPartition) {
Dataset dataset = null;
LanceScanner lanceScanner = null;
try {
LanceSparkReadOptions readOptions = inputPartition.getReadOptions();
// Optionally rebuild the namespace client on the executor so the dataset open routes through
// Utils.OpenDatasetBuilder's namespaceClient branch. This preserves the storage options
// provider on the Rust side, which refreshes short-lived vended credentials (e.g. STS
// tokens) during long-running scans. The price is an eager describeTable() RPC against the
// namespace on every fragment open.
//
// For catalogs whose backing service authenticates per-call (e.g. Hive Metastore over
// Kerberos) executors typically lack a TGT and that RPC fails with "GSS initiate failed".
// Setting LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH=false makes executors
// skip the rebuild and open the dataset by URI using the initialStorageOptions the driver
// already obtained, at the cost of losing the Rust-side credential refresh callback.
if (inputPartition.getNamespaceImpl() != null && readOptions.isExecutorCredentialRefresh()) {
if (LanceRuntime.useNamespaceOnWorkers(inputPartition.getNamespaceImpl())) {
readOptions.setNamespace(
Expand All @@ -93,18 +98,17 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
if (fragment == null) {
throw new IllegalStateException(
String.format(
"Fragment %d not found in dataset at %s (version=%s)",
"Fragment %d not found in dataset at %s " + "(version=%s)",
fragmentId, readOptions.getDatasetUri(), readOptions.getVersion()));
}
ScanOptions.Builder scanOptions = new ScanOptions.Builder();

// Detect blob columns in the schema
Set<String> blobColumnNames = getBlobColumnNames(inputPartition.getSchema());
boolean hasBlobColumns = !blobColumnNames.isEmpty();

List<String> projectedColumns = getColumnNames(inputPartition.getSchema());
if (projectedColumns.isEmpty() && inputPartition.getSchema().isEmpty()) {
// Lance requires at least one projected column. Use _rowid as a lightweight
// sentinel so the scanner still returns the correct row count (e.g. SELECT 1).
// Only do this when the schema is truly empty; when the schema contains virtual
// columns (e.g. _fragid, blob position/size) that are not passed to the scanner
// but added later by the batch scanner, adding _rowid here would shift column
// indices and cause Spark to read wrong data.
scanOptions.withRowId(true);
}
scanOptions.columns(projectedColumns);
Expand All @@ -114,12 +118,6 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
scanOptions.batchSize(readOptions.getBatchSize());
if (readOptions.getNearest() != null) {
scanOptions.nearest(readOptions.getNearest());
// We strictly set `prefilter = true` here to ensure query correctness.
// This is necessary due to the combination of two factors:
// 1. Spark currently performs the vector search by individually scanning each fragment.
// 2. Lance mandates that `prefilter` must be enabled for fragmented vector queries.
// If Spark's execution model or Lance's search functionality changes in the future,
// we need to revisit this.
scanOptions.prefilter(true);
}
if (inputPartition.getLimit().isPresent()) {
Expand All @@ -131,6 +129,14 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
if (inputPartition.getTopNSortOrders().isPresent()) {
scanOptions.setColumnOrderings(inputPartition.getTopNSortOrders().get());
}

boolean userRequestedRowAddr =
inputPartition.getSchema().getFieldIndex(LanceConstant.ROW_ADDRESS).nonEmpty();
boolean withRowAddrForBlobs = hasBlobColumns && !userRequestedRowAddr;
if (hasBlobColumns || userRequestedRowAddr) {
scanOptions.withRowAddress(true);
}

boolean withFragmentId =
inputPartition.getSchema().getFieldIndex(LanceConstant.FRAGMENT_ID).nonEmpty();
long scanCreateStart = System.nanoTime();
Expand All @@ -143,7 +149,9 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
withFragmentId,
inputPartition,
dsOpenTimeNs,
scanCreateTimeNs);
scanCreateTimeNs,
withRowAddrForBlobs,
blobColumnNames);
} catch (Throwable throwable) {
if (lanceScanner != null) {
try {
Expand Down Expand Up @@ -209,8 +217,8 @@ public int fragmentId() {
return fragmentId;
}

public boolean withFragemtId() {
return withFragemtId;
public boolean withFragmentId() {
return withFragmentId;
}

public LanceInputPartition getInputPartition() {
Expand All @@ -225,20 +233,37 @@ public long getScannerCreateTimeNs() {
return scannerCreateTimeNs;
}

/**
* Builds the projection column list for the scanner. Regular data columns come first, followed by
* special metadata columns in the order matching {@link
* org.lance.spark.LanceDataset#METADATA_COLUMNS}. All special columns (_rowid, _rowaddr, version
* columns) go through scanner.project() for consistent output ordering.
*/
/**
 * Whether the scanner implicitly requested _rowaddr for blob reference support.
 *
 * @return the {@code withRowAddrForBlobs} flag held by this scanner
 */
public boolean isWithRowAddrForBlobs() {
return withRowAddrForBlobs;
}

/**
 * Returns the blob column names in the projected schema.
 *
 * <p>The set held by this scanner is returned directly, without a defensive copy.
 *
 * @return the {@code blobColumnNames} set held by this scanner
 */
public Set<String> getBlobColumnNames() {
return blobColumnNames;
}

/**
 * Returns the dataset URI for blob references.
 *
 * @return the dataset URI read from the input partition's read options
 */
public String getDatasetUri() {
return inputPartition.getReadOptions().getDatasetUri();
}

/**
 * Collects the names of all fields in {@code schema} that {@link BlobUtils#isBlobSparkField}
 * identifies as blob fields.
 *
 * @param schema the Spark schema whose top-level fields are inspected
 * @return a new mutable {@link HashSet} of blob field names; empty when no field qualifies
 */
private static Set<String> getBlobColumnNames(StructType schema) {
  return Arrays.stream(schema.fields())
      .filter(BlobUtils::isBlobSparkField)
      .map(StructField::name)
      .collect(Collectors.toCollection(HashSet::new));
}

private static List<String> getColumnNames(StructType schema) {
// Collect all field names in the schema for quick lookup
java.util.Set<String> schemaFields = new java.util.HashSet<>();
for (StructField field : schema.fields()) {
schemaFields.add(field.name());
}

// Regular data columns (exclude all special/metadata columns)
List<String> columns =
Arrays.stream(schema.fields())
.map(StructField::name)
Expand All @@ -253,7 +278,6 @@ private static List<String> getColumnNames(StructType schema) {
&& !name.endsWith(LanceConstant.BLOB_SIZE_SUFFIX))
.collect(Collectors.toList());

// Append special columns in METADATA_COLUMNS order (must match Rust scanner output order)
if (schemaFields.contains(LanceConstant.ROW_ID)) {
columns.add(LanceConstant.ROW_ID);
}
Expand Down
Loading
Loading