@@ -13,6 +13,7 @@
*/
package org.lance.spark;

+import org.lance.spark.read.LanceMetadataColumns;
import org.lance.spark.read.LanceScanBuilder;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.write.AddColumnsBackfillWrite;
@@ -57,90 +58,6 @@ public class LanceDataset
ImmutableSet.of(
TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE);

-public static final MetadataColumn FRAGMENT_ID_COLUMN =
-new MetadataColumn() {
-@Override
-public String name() {
-return LanceConstant.FRAGMENT_ID;
-}
-
-@Override
-public DataType dataType() {
-return DataTypes.IntegerType;
-}
-
-@Override
-public boolean isNullable() {
-return false;
-}
-};
-
-public static final MetadataColumn ROW_ID_COLUMN =
-new MetadataColumn() {
-@Override
-public String name() {
-return LanceConstant.ROW_ID;
-}
-
-@Override
-public DataType dataType() {
-return DataTypes.LongType;
-}
-};
-
-public static final MetadataColumn ROW_ADDRESS_COLUMN =
-new MetadataColumn() {
-@Override
-public String name() {
-return LanceConstant.ROW_ADDRESS;
-}
-
-@Override
-public DataType dataType() {
-return DataTypes.LongType;
-}
-
-@Override
-public boolean isNullable() {
-return false;
-}
-};
-
-public static final MetadataColumn ROW_LAST_UPDATED_AT_VERSION_COLUMN =
-new MetadataColumn() {
-@Override
-public String name() {
-return LanceConstant.ROW_LAST_UPDATED_AT_VERSION;
-}
-
-@Override
-public DataType dataType() {
-return DataTypes.LongType;
-}
-};
-
-public static final MetadataColumn ROW_CREATED_AT_VERSION_COLUMN =
-new MetadataColumn() {
-@Override
-public String name() {
-return LanceConstant.ROW_CREATED_AT_VERSION;
-}
-
-@Override
-public DataType dataType() {
-return DataTypes.LongType;
-}
-};
-
-public static final MetadataColumn[] METADATA_COLUMNS =
-new MetadataColumn[] {
-ROW_ID_COLUMN,
-ROW_ADDRESS_COLUMN,
-ROW_LAST_UPDATED_AT_VERSION_COLUMN,
-ROW_CREATED_AT_VERSION_COLUMN,
-FRAGMENT_ID_COLUMN
-};
-
protected final LanceSparkReadOptions readOptions;
protected final StructType sparkSchema;

@@ -384,7 +301,7 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
public MetadataColumn[] metadataColumns() {
// Start with the base metadata columns
List<MetadataColumn> columns = new ArrayList<>();
-for (MetadataColumn col : METADATA_COLUMNS) {
+for (MetadataColumn col : LanceMetadataColumns.ALL) {
columns.add(col);
}

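Reviewer note on the hunk above: because the registry exposes a plain array, the element-by-element copy loop could equally be seeded with Collections.addAll. A minimal sketch, not part of this PR; the class and method names here are hypothetical, and the rest of metadataColumns() (truncated in this hunk) is assumed to append further columns afterwards.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.lance.spark.read.LanceMetadataColumns;

class MetadataColumnsSeedSketch {
  // Equivalent to the copy loop in the hunk above: start the mutable list
  // from the registry before any further (e.g. blob-related) columns are
  // appended by the rest of the method, which this diff does not show.
  static List<MetadataColumn> seed() {
    List<MetadataColumn> columns = new ArrayList<>();
    Collections.addAll(columns, LanceMetadataColumns.ALL);
    return columns;
  }
}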
@@ -21,9 +21,11 @@
import org.lance.spark.LanceRuntime;
import org.lance.spark.LanceSparkReadOptions;
import org.lance.spark.read.LanceInputPartition;
+import org.lance.spark.read.LanceMetadataColumns;
import org.lance.spark.utils.Utils;

import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@@ -183,9 +185,8 @@ public LanceInputPartition getInputPartition() {

/**
* Builds the projection column list for the scanner. Regular data columns come first, followed by
-* special metadata columns in the order matching {@link
-* org.lance.spark.LanceDataset#METADATA_COLUMNS}. All special columns (_rowid, _rowaddr, version
-* columns) go through scanner.project() for consistent output ordering.
+* special metadata columns in the order declared in {@link LanceMetadataColumns#ALL} — that
+* ordering must match the Rust scanner's output so Spark's batch layout lines up.
*/
private static List<String> getColumnNames(StructType schema) {
// Collect all field names in the schema for quick lookup
@@ -200,27 +201,16 @@ private static List<String> getColumnNames(StructType schema) {
.map(StructField::name)
.filter(
name ->
-!name.equals(LanceConstant.FRAGMENT_ID)
-&& !name.equals(LanceConstant.ROW_ID)
-&& !name.equals(LanceConstant.ROW_ADDRESS)
-&& !name.equals(LanceConstant.ROW_CREATED_AT_VERSION)
-&& !name.equals(LanceConstant.ROW_LAST_UPDATED_AT_VERSION)
+!LanceMetadataColumns.allNames().contains(name)
&& !name.endsWith(LanceConstant.BLOB_POSITION_SUFFIX)
&& !name.endsWith(LanceConstant.BLOB_SIZE_SUFFIX))
.collect(Collectors.toList());

-// Append special columns in METADATA_COLUMNS order (must match Rust scanner output order)
-if (schemaFields.contains(LanceConstant.ROW_ID)) {
-columns.add(LanceConstant.ROW_ID);
-}
-if (schemaFields.contains(LanceConstant.ROW_ADDRESS)) {
-columns.add(LanceConstant.ROW_ADDRESS);
-}
-if (schemaFields.contains(LanceConstant.ROW_LAST_UPDATED_AT_VERSION)) {
-columns.add(LanceConstant.ROW_LAST_UPDATED_AT_VERSION);
-}
-if (schemaFields.contains(LanceConstant.ROW_CREATED_AT_VERSION)) {
-columns.add(LanceConstant.ROW_CREATED_AT_VERSION);
+// Append scanner-projectable metadata columns in registry order (matches Rust scanner output).
+for (MetadataColumn col : LanceMetadataColumns.PROJECTABLE) {
+if (schemaFields.contains(col.name())) {
+columns.add(col.name());
+}
}

return columns;
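Reviewer note: to make the ordering contract concrete, here is a self-contained sketch of the same two-pass projection over plain strings, ignoring the blob-suffix filtering. The literal field names are hypothetical stand-ins; the real code reads them from LanceConstant and LanceMetadataColumns.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ProjectionOrderSketch {
  public static void main(String[] args) {
    // Stand-ins for LanceMetadataColumns.allNames() and PROJECTABLE;
    // the literal names are illustrative, not taken from LanceConstant.
    Set<String> metadataNames = Set.of("_rowid", "_rowaddr",
        "_row_last_updated_at_version", "_row_created_at_version", "_fragment_id");
    List<String> projectable = List.of("_rowid", "_rowaddr",
        "_row_last_updated_at_version", "_row_created_at_version");

    List<String> schema = List.of("name", "_fragment_id", "_rowid", "vec", "_rowaddr");

    List<String> columns = new ArrayList<>();
    for (String field : schema) {
      if (!metadataNames.contains(field)) {
        columns.add(field); // pass 1: regular data columns, in schema order
      }
    }
    for (String meta : projectable) {
      if (schema.contains(meta)) {
        columns.add(meta); // pass 2: metadata columns, in registry order
      }
    }
    // Prints [name, vec, _rowid, _rowaddr]; the fragment-id column is never
    // projected through the scanner (it is derived per fragment).
    System.out.println(columns);
  }
}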
@@ -0,0 +1,105 @@
+/*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.lance.spark.read;
+
+import org.lance.spark.LanceConstant;
+
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+* Central registry of the metadata (virtual) columns that Lance surfaces on {@code
+* SupportsMetadataColumns}. The registry has two responsibilities that must stay in lock-step with
+* each other:
+*
+* <ol>
+* <li>declaring the {@link MetadataColumn} instances exposed to Spark's analyzer via {@link
+* #ALL};
+* <li>driving the scanner-level projection in {@code LanceFragmentScanner} — each column must be
+* excluded from the "regular columns" list and appended to the Arrow output list in the same
+* order as {@link #ALL} so Spark's batch layout lines up with the Rust scanner's output.
+* </ol>
+*
+* Adding a new metadata column is a single-line addition below plus an {@link #ALL} entry; callers
+* do not need to touch {@code LanceDataset} or the scanner.
+*/
+public final class LanceMetadataColumns {
+
+private LanceMetadataColumns() {}
+
+public static final MetadataColumn ROW_ID =
+column(LanceConstant.ROW_ID, DataTypes.LongType, true);
+public static final MetadataColumn ROW_ADDRESS =
+column(LanceConstant.ROW_ADDRESS, DataTypes.LongType, false);
+public static final MetadataColumn ROW_LAST_UPDATED_AT_VERSION =
+column(LanceConstant.ROW_LAST_UPDATED_AT_VERSION, DataTypes.LongType, true);
+public static final MetadataColumn ROW_CREATED_AT_VERSION =
+column(LanceConstant.ROW_CREATED_AT_VERSION, DataTypes.LongType, true);
+public static final MetadataColumn FRAGMENT_ID =
+column(LanceConstant.FRAGMENT_ID, DataTypes.IntegerType, false);
+
+/**
+* All metadata columns registered with Spark's analyzer. Used both by {@code
+* LanceDataset.metadataColumns()} and by {@code LanceFragmentScanner} to exclude these names from
+* the regular data-column projection.
+*/
+public static final MetadataColumn[] ALL =
+new MetadataColumn[] {
+ROW_ID, ROW_ADDRESS, ROW_LAST_UPDATED_AT_VERSION, ROW_CREATED_AT_VERSION, FRAGMENT_ID
+};
+
+/**
+* Metadata columns that flow through the native scanner's column-projection list, in the order
+* the Rust scanner emits them. Excludes columns that are computed per-fragment outside the
+* scanner (currently {@link #FRAGMENT_ID}, derived from the partition's fragment id).
+*/
+public static final MetadataColumn[] PROJECTABLE =
+new MetadataColumn[] {
+ROW_ID, ROW_ADDRESS, ROW_LAST_UPDATED_AT_VERSION, ROW_CREATED_AT_VERSION
+};
+
+private static final Set<String> ALL_NAMES =
+Collections.unmodifiableSet(
+Arrays.stream(ALL).map(MetadataColumn::name).collect(Collectors.toSet()));
+
+/** Returns the set of registered metadata column names, for fast membership checks. */
+public static Set<String> allNames() {
+return ALL_NAMES;
+}
+
+private static MetadataColumn column(String name, DataType type, boolean nullable) {
+return new MetadataColumn() {
+@Override
+public String name() {
+return name;
+}
+
+@Override
+public DataType dataType() {
+return type;
+}
+
+@Override
+public boolean isNullable() {
+return nullable;
+}
+};
+}
+}
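Reviewer note on the new file: a quick sketch of how the registry is meant to be consumed. Only allNames(), ALL, and the column constants come from the code above; the wrapping class and the printout are illustrative.

import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.lance.spark.read.LanceMetadataColumns;

class RegistrySketch {
  public static void main(String[] args) {
    // Fast membership test, as used by the scanner's projection filter.
    boolean isMetadata =
        LanceMetadataColumns.allNames().contains(LanceMetadataColumns.ROW_ID.name());

    // Declaration order of ALL is what Spark's analyzer sees via
    // LanceDataset.metadataColumns().
    for (MetadataColumn col : LanceMetadataColumns.ALL) {
      System.out.printf("%s %s nullable=%b%n",
          col.name(), col.dataType().simpleString(), col.isNullable());
    }
    System.out.println("row id is metadata: " + isMetadata);
  }
}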
@@ -15,7 +15,7 @@

import org.lance.Dataset;
import org.lance.Fragment;
-import org.lance.spark.LanceDataset;
+import org.lance.spark.LanceConstant;
import org.lance.spark.LanceRuntime;
import org.lance.spark.LanceSparkWriteOptions;
import org.lance.spark.utils.Utils;
@@ -78,16 +78,15 @@ protected AbstractBackfillWriter(
List<String> tableId) {
this.writeOptions = writeOptions;
this.schema = schema;
-this.fragmentIdField = schema.fieldIndex(LanceDataset.FRAGMENT_ID_COLUMN.name());
+this.fragmentIdField = schema.fieldIndex(LanceConstant.FRAGMENT_ID);
this.initialStorageOptions = initialStorageOptions;
this.namespaceImpl = namespaceImpl;
this.namespaceProperties = namespaceProperties;
this.tableId = tableId;

StructType ws = new StructType();
for (org.apache.spark.sql.types.StructField f : schema.fields()) {
-if (targetColumns.contains(f.name())
-|| f.name().equals(LanceDataset.ROW_ADDRESS_COLUMN.name())) {
+if (targetColumns.contains(f.name()) || f.name().equals(LanceConstant.ROW_ADDRESS)) {
ws = ws.add(f);
}
}
@@ -20,7 +20,7 @@
import org.lance.Transaction;
import org.lance.fragment.FragmentMergeResult;
import org.lance.operation.Merge;
-import org.lance.spark.LanceDataset;
+import org.lance.spark.LanceConstant;
import org.lance.spark.LanceSparkWriteOptions;
import org.lance.spark.utils.Utils;

@@ -184,10 +184,7 @@ public AddColumnsWriter(
@Override
protected void processFragment(Fragment fragment, ArrowArrayStream stream) {
FragmentMergeResult result =
-fragment.mergeColumns(
-stream,
-LanceDataset.ROW_ADDRESS_COLUMN.name(),
-LanceDataset.ROW_ADDRESS_COLUMN.name());
+fragment.mergeColumns(stream, LanceConstant.ROW_ADDRESS, LanceConstant.ROW_ADDRESS);
fragments.add(result.getFragmentMetadata());
mergedSchema = result.getSchema().asArrowSchema();
}
@@ -20,7 +20,7 @@
import org.lance.Transaction;
import org.lance.fragment.FragmentUpdateResult;
import org.lance.operation.Update;
-import org.lance.spark.LanceDataset;
+import org.lance.spark.LanceConstant;
import org.lance.spark.LanceSparkWriteOptions;
import org.lance.spark.utils.Utils;

@@ -191,10 +191,7 @@ public UpdateColumnsWriter(
@Override
protected void processFragment(Fragment fragment, ArrowArrayStream stream) {
FragmentUpdateResult result =
-fragment.updateColumns(
-stream,
-LanceDataset.ROW_ADDRESS_COLUMN.name(),
-LanceDataset.ROW_ADDRESS_COLUMN.name());
+fragment.updateColumns(stream, LanceConstant.ROW_ADDRESS, LanceConstant.ROW_ADDRESS);
updatedFragments.add(result.getUpdatedFragment());
fieldsModified = result.getFieldsModified();
}
@@ -46,8 +46,8 @@ case class AddColumnsBackfillExec(
// Add Project if source relation has more fields
val needFields = query.output.filter(p =>
columnNames.contains(p.name)
-|| LanceDataset.ROW_ADDRESS_COLUMN.name().equals(p.name)
-|| LanceDataset.FRAGMENT_ID_COLUMN.name().equals(p.name))
+|| LanceConstant.ROW_ADDRESS.equals(p.name)
+|| LanceConstant.FRAGMENT_ID.equals(p.name))

val actualQuery = if (needFields.length != query.output.length) {
Project(needFields, query)
@@ -30,7 +30,7 @@ import org.lance.{CommitBuilder, Dataset, Transaction}
import org.lance.index.{Index, IndexOptions, IndexParams, IndexType}
import org.lance.index.scalar.{BTreeIndexParams, ScalarIndexParams}
import org.lance.operation.{CreateIndex => AddIndexOperation}
-import org.lance.spark.{BaseLanceNamespaceSparkCatalog, LanceDataset, LanceRuntime, LanceSparkReadOptions}
+import org.lance.spark.{BaseLanceNamespaceSparkCatalog, LanceConstant, LanceDataset, LanceRuntime, LanceSparkReadOptions}
import org.lance.spark.arrow.LanceArrowWriter
import org.lance.spark.utils.{CloseableUtil, Utils}

@@ -369,7 +369,7 @@ class RangeBasedBTreeIndexJob(
// Read specific column and _rowid from dataset
val df = session.table(fullTableName)
val selectDf =
-df.select(df.col(columns.head).as(VALUE_COLUMN_NAME), df.col(LanceDataset.ROW_ID_COLUMN.name))
+df.select(df.col(columns.head).as(VALUE_COLUMN_NAME), df.col(LanceConstant.ROW_ID))

// Repartition the data to numRanges and sort by indexed column
val rangeDf = selectDf
@@ -54,8 +54,8 @@ case class UpdateColumnsBackfillExec(
// Add Project if source relation has more fields
val needFields = query.output.filter(p =>
columnNames.contains(p.name)
-|| LanceDataset.ROW_ADDRESS_COLUMN.name().equals(p.name)
-|| LanceDataset.FRAGMENT_ID_COLUMN.name().equals(p.name))
+|| LanceConstant.ROW_ADDRESS.equals(p.name)
+|| LanceConstant.FRAGMENT_ID.equals(p.name))

val actualQuery = if (needFields.length != query.output.length) {
Project(needFields, query)
@@ -109,7 +109,7 @@ public void testGetColumnNamesWithVersionColumns() throws Exception {

@Test
public void testGetColumnNamesWithAllMetadataColumns() throws Exception {
-// Test with all metadata columns in the order defined in LanceDataset.METADATA_COLUMNS
+// Test with all metadata columns in the order defined in LanceMetadataColumns.ALL
StructType schema =
new StructType(
new StructField[] {