Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public class LanceScanBuilder
private static final Logger LOG = LoggerFactory.getLogger(LanceScanBuilder.class);

/** Options configuring how this scan reads the table; fixed at construction (constructor not shown). */
private final LanceSparkReadOptions readOptions;

/** Full table schema before column pruning; used to widen nested structs for vectorized reads. */
private final StructType fullSchema;

/** Current effective read schema; reassigned by {@link #pruneColumns(StructType)}. */
private StructType schema;

/** Filters accepted for push-down; empty until Spark invokes the push-down hook. */
private Filter[] pushedFilters = new Filter[0];
Expand Down Expand Up @@ -234,7 +237,7 @@ public Scan build() {

@Override
public void pruneColumns(StructType requiredSchema) {
// Widen pruned nested structs back to the full table definition so Arrow child
// ordinals stay aligned with Catalyst during vectorized reads. (The plain
// `this.schema = requiredSchema;` assignment was a dead store immediately
// overwritten by the widened assignment and has been removed.)
this.schema = ReadSchemaNestedStructWidening.widenRequiredSchema(requiredSchema, fullSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.read;

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.Map;

/**
 * Reconciles Spark's column-pruned read schema with the full Arrow batches returned by Lance.
 *
 * <p>After {@link org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns#pruneColumns}
 * trims a nested {@link StructType} down to a subset of its fields, vectorized execution keeps
 * addressing children by ordinal within that pruned struct. Lance, however, hands back the whole
 * struct column from the scanner, and {@link org.lance.spark.vectorized.LanceStructAccessor}
 * resolves Arrow children by their ordinal in the full struct. Replacing each pruned nested struct
 * with the table's complete struct definition keeps Catalyst and Arrow ordinals in agreement.
 */
final class ReadSchemaNestedStructWidening {

  private ReadSchemaNestedStructWidening() {}

  /**
   * Returns {@code required} with every top-level field whose type is a pruned nested struct
   * widened back to the matching definition in {@code tableFull}. Top-level columns that were
   * pruned away are never restored; null inputs are passed through unchanged.
   */
  static StructType widenRequiredSchema(StructType required, StructType tableFull) {
    if (required == null || tableFull == null) {
      return required;
    }
    Map<String, StructField> fullFieldsByName = indexByName(tableFull);
    StructField[] result = new StructField[required.length()];
    int ordinal = 0;
    for (StructField prunedField : required.fields()) {
      StructField fullField = fullFieldsByName.get(prunedField.name());
      result[ordinal++] = (fullField == null) ? prunedField : widenField(prunedField, fullField);
    }
    return new StructType(result);
  }

  /** Widens the field's data type while keeping the required field's name, nullability, and metadata. */
  private static StructField widenField(StructField required, StructField tableFull) {
    DataType widenedType = widenDataType(required.dataType(), tableFull.dataType());
    return new StructField(required.name(), widenedType, required.nullable(), required.metadata());
  }

  /** Recurses through matching struct/array/map pairs; any other combination yields {@code required}. */
  private static DataType widenDataType(DataType required, DataType tableFull) {
    if (required instanceof StructType && tableFull instanceof StructType) {
      return widenStructType((StructType) required, (StructType) tableFull);
    }
    if (required instanceof ArrayType && tableFull instanceof ArrayType) {
      return widenArrayType((ArrayType) required, (ArrayType) tableFull);
    }
    if (required instanceof MapType && tableFull instanceof MapType) {
      return widenMapType((MapType) required, (MapType) tableFull);
    }
    return required;
  }

  /**
   * Widens a pruned nested struct back to the full table struct's field set and field order.
   *
   * <p>Widening only happens when {@code requiredStruct} is a name-subset of {@code tableStruct} —
   * every required field must also exist in the table struct. Table fields missing from the
   * required schema are carried over verbatim from the table definition, which preserves the Arrow
   * child ordinals that {@link org.lance.spark.vectorized.LanceStructAccessor} depends on.
   */
  private static DataType widenStructType(StructType requiredStruct, StructType tableStruct) {
    if (tableStruct.size() < requiredStruct.size()) {
      return requiredStruct;
    }
    Map<String, StructField> prunedByName = indexByName(requiredStruct);

    // One sweep over the table fields: widen where a required counterpart exists, copy the table
    // field otherwise, and count the matches so subset-ness can be verified afterwards.
    StructField[] merged = new StructField[tableStruct.size()];
    int matches = 0;
    int slot = 0;
    for (StructField tableField : tableStruct.fields()) {
      StructField prunedField = prunedByName.get(tableField.name());
      if (prunedField == null) {
        merged[slot] = tableField;
      } else {
        merged[slot] = widenField(prunedField, tableField);
        matches++;
      }
      slot++;
    }
    // Some required field never matched a table field: the schemas diverge, so leave it alone.
    return (matches == prunedByName.size()) ? new StructType(merged) : requiredStruct;
  }

  /** Widens the element type, keeping the required array's containsNull flag. */
  private static DataType widenArrayType(ArrayType requiredArray, ArrayType tableArray) {
    DataType element = widenDataType(requiredArray.elementType(), tableArray.elementType());
    return new ArrayType(element, requiredArray.containsNull());
  }

  /** Widens key and value types, keeping the required map's valueContainsNull flag. */
  private static DataType widenMapType(MapType requiredMap, MapType tableMap) {
    DataType key = widenDataType(requiredMap.keyType(), tableMap.keyType());
    DataType value = widenDataType(requiredMap.valueType(), tableMap.valueType());
    return new MapType(key, value, requiredMap.valueContainsNull());
  }

  /** Builds a name-to-field lookup for the struct; later duplicates overwrite earlier ones. */
  private static Map<String, StructField> indexByName(StructType struct) {
    Map<String, StructField> byName = new HashMap<>(struct.size() * 2);
    for (StructField field : struct.fields()) {
      byName.put(field.name(), field);
    }
    return byName;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.read;

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ReadSchemaNestedStructWideningTest {

  /** Shorthand for assembling a {@link StructType} from an explicit list of fields. */
  private static StructType structOf(StructField... fields) {
    return new StructType(fields);
  }

  /**
   * Selecting only the second nested field of a struct must come back widened to both fields in
   * table order, so {@code LanceStructAccessor} child ordinals stay correct.
   */
  @Test
  public void widensSubsetNestedStructToFullTableFieldOrderAndFields() {
    StructType fullMetadata =
        structOf(
            DataTypes.createStructField("first", DataTypes.StringType, true),
            DataTypes.createStructField("second", DataTypes.LongType, true));
    StructType fullTable = structOf(DataTypes.createStructField("metadata", fullMetadata, true));

    StructType prunedMetadata =
        structOf(DataTypes.createStructField("second", DataTypes.LongType, true));
    StructType prunedTable =
        structOf(DataTypes.createStructField("metadata", prunedMetadata, true));

    StructType widened =
        ReadSchemaNestedStructWidening.widenRequiredSchema(prunedTable, fullTable);

    assertEquals(fullTable, widened);
  }

  /**
   * Widening must recurse through array element types: a pruned struct inside an array column is
   * restored to the full struct so Arrow child ordinals are preserved.
   */
  @Test
  public void widensSubsetStructNestedInsideArrayElementType() {
    StructType fullEvent =
        structOf(
            DataTypes.createStructField("eventName", DataTypes.StringType, true),
            DataTypes.createStructField("eventTimestamp", DataTypes.LongType, true));
    StructType prunedEvent =
        structOf(DataTypes.createStructField("eventTimestamp", DataTypes.LongType, true));

    StructType fullTable =
        structOf(DataTypes.createStructField("events", new ArrayType(fullEvent, true), true));
    StructType prunedTable =
        structOf(DataTypes.createStructField("events", new ArrayType(prunedEvent, true), true));

    StructType widened =
        ReadSchemaNestedStructWidening.widenRequiredSchema(prunedTable, fullTable);

    assertEquals(fullTable, widened);
  }

  /**
   * Dropping an entire top-level column must not be undone: selecting only {@code userId} from a
   * table with {@code userId} and {@code sessionId} still yields only {@code userId}, since Lance
   * handles top-level projection natively.
   */
  @Test
  public void doesNotWidenTopLevelColumnPruning() {
    StructType fullTable =
        structOf(
            DataTypes.createStructField("userId", DataTypes.LongType, true),
            DataTypes.createStructField("sessionId", DataTypes.LongType, true));
    StructType prunedTable =
        structOf(DataTypes.createStructField("userId", DataTypes.LongType, true));

    StructType widened =
        ReadSchemaNestedStructWidening.widenRequiredSchema(prunedTable, fullTable);

    assertEquals(prunedTable, widened);
  }

  /**
   * A required field missing from the table schema (e.g. a diverged name after schema evolution)
   * must leave the nested struct untouched rather than widen it, avoiding silent data corruption.
   */
  @Test
  public void doesNotWidenWhenRequiredFieldIsAbsentFromTableSchema() {
    StructType fullMetadata =
        structOf(
            DataTypes.createStructField("first", DataTypes.StringType, true),
            DataTypes.createStructField("second", DataTypes.LongType, true));
    StructType fullTable = structOf(DataTypes.createStructField("metadata", fullMetadata, true));

    // "unknownField" has no counterpart in the table struct.
    StructType divergedMetadata =
        structOf(DataTypes.createStructField("unknownField", DataTypes.StringType, true));
    StructType divergedTable =
        structOf(DataTypes.createStructField("metadata", divergedMetadata, true));

    StructType widened =
        ReadSchemaNestedStructWidening.widenRequiredSchema(divergedTable, fullTable);

    assertEquals(divergedTable, widened);
  }
}
Loading