
Commit e6dc9d7

infvg authored and zhouyuan committed
Enable enhanced tests for spark 4.0 & fix failures
Co-authored-by: Yuan <yuanzhou@apache.org>
1 parent 999ba5f commit e6dc9d7

5 files changed: 113 additions & 5 deletions


.github/workflows/velox_backend_enhanced.yml

Lines changed: 1 addition & 1 deletion

@@ -298,7 +298,7 @@ jobs:
           java -version
           $MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Piceberg \
             -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-            -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
+            -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
         uses: actions/upload-artifact@v4

backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala

Lines changed: 4 additions & 0 deletions

@@ -44,6 +44,10 @@ case class IcebergColumnarBatchDataWriter(
   }
 
   override def write(batch: ColumnarBatch): Unit = {
+    // Pass the original batch to native code.
+    // The native code will use the schema (writeSchema) we provided during initialization
+    // to determine which columns to write, effectively filtering out metadata columns
+    // like __row_operation, _file, _pos that Spark 4.0 adds.
     val batchHandle = ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
     jniWrapper.write(writer, batchHandle)
   }
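
For context on the comment above, here is a hedged illustration of what the batch schema handed to write() can look like during a Spark 4.0 UPDATE on a copy-on-write table. The metadata column names are taken from the comment; the exact set depends on the operation and is not verified against a live plan:

import org.apache.spark.sql.types._

// Hypothetical shape of an UPDATE batch schema: metadata columns
// precede the actual table columns.
val batchSchema = StructType(Seq(
  StructField("__row_operation", IntegerType), // row-level operation marker
  StructField("_file", StringType),            // source data file of the row
  StructField("_pos", LongType),               // row position within that file
  StructField("id", IntegerType),              // table columns follow
  StructField("name", StringType),
  StructField("age", IntegerType)))
// Only id, name, age appear in the writeSchema passed at initialization,
// so the native writer drops the three leading metadata columns.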

backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala

Lines changed: 10 additions & 1 deletion

@@ -24,14 +24,23 @@ import org.apache.spark.sql.types.StructType
 import org.apache.iceberg.spark.source.IcebergWriteUtil
 import org.apache.iceberg.types.TypeUtil
 
+import scala.collection.JavaConverters._
+
 abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
 
   // the writer factory works for both batch and streaming
   private def createIcebergDataWriteFactory(schema: StructType): IcebergDataWriteFactory = {
     val writeSchema = IcebergWriteUtil.getWriteSchema(write)
     val nestedField = TypeUtil.visit(writeSchema, new IcebergNestedFieldVisitor)
+    // Filter out metadata columns from the Spark output schema and reorder to match the
+    // Iceberg schema. Spark 4.0 may include metadata columns in the output schema during
+    // UPDATE operations, but these should not be written to the Iceberg table.
+    val schemaFieldMap = schema.fields.map(f => f.name -> f).toMap
+    val filteredFields =
+      writeSchema.columns().asScala.flatMap(icebergCol => schemaFieldMap.get(icebergCol.name()))
+    val filteredSchema = StructType(filteredFields.toArray)
     IcebergDataWriteFactory(
-      schema,
+      filteredSchema,
       getFileFormat(IcebergWriteUtil.getFileFormat(write)),
       IcebergWriteUtil.getDirectory(write),
       getCodec,
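
The filtering added above can be read as a small standalone function: build a name-to-field map from the Spark output schema, then walk the Iceberg write schema in order, keeping only matching fields. A minimal sketch (the object name and example columns are illustrative, not from the patch):

import org.apache.spark.sql.types._

object FilterSchemaSketch {
  // Keep only the Spark fields whose names appear in the Iceberg write schema,
  // in the Iceberg schema's column order; unmatched (metadata) fields are dropped.
  def filter(sparkSchema: StructType, icebergColumnNames: Seq[String]): StructType = {
    val byName = sparkSchema.fields.map(f => f.name -> f).toMap
    StructType(icebergColumnNames.flatMap(byName.get))
  }

  def main(args: Array[String]): Unit = {
    val sparkSchema = StructType(Seq(
      StructField("__row_operation", IntegerType), // metadata column added by Spark 4.0
      StructField("id", IntegerType),
      StructField("name", StringType)))
    println(filter(sparkSchema, Seq("id", "name")))
    // StructType(StructField(id,IntegerType,true),StructField(name,StringType,true))
  }
}

Note that flatMap over Map.get silently drops any Iceberg column missing from the Spark schema; the patch relies on the native side (see IcebergWriter.cc below) to validate the final alignment.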

backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala

Lines changed: 55 additions & 0 deletions

@@ -383,4 +383,59 @@ class VeloxIcebergSuite extends IcebergSuite {
       }
     }
   }
+
+  test("iceberg read cow table - update after schema evolution") {
+    withTable("iceberg_cow_update_evolved_tb") {
+      spark.sql("""
+                  |create table iceberg_cow_update_evolved_tb (
+                  |  id int,
+                  |  name string,
+                  |  age int
+                  |) using iceberg
+                  |tblproperties (
+                  |  'format-version' = '2',
+                  |  'write.delete.mode' = 'copy-on-write',
+                  |  'write.update.mode' = 'copy-on-write',
+                  |  'write.merge.mode' = 'copy-on-write'
+                  |)
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |alter table iceberg_cow_update_evolved_tb
+                  |add columns (salary decimal(10, 2))
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |insert into table iceberg_cow_update_evolved_tb values
+                  | (1, 'Name1', 23, 3400.00),
+                  | (2, 'Name2', 30, 5500.00),
+                  | (3, 'Name3', 35, 6500.00)
+                  |""".stripMargin)
+
+      val df = spark.sql("""
+                           |update iceberg_cow_update_evolved_tb
+                           |set name = 'Name4'
+                           |where id = 1
+                           |""".stripMargin)
+
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergReplaceDataExec])
+
+      checkAnswer(
+        spark.sql("""
+                    |select id, name, age, salary
+                    |from iceberg_cow_update_evolved_tb
+                    |order by id
+                    |""".stripMargin),
+        Seq(
+          Row(1, "Name4", 23, new java.math.BigDecimal("3400.00")),
+          Row(2, "Name2", 30, new java.math.BigDecimal("5500.00")),
+          Row(3, "Name3", 35, new java.math.BigDecimal("6500.00"))
+        )
+      )
+    }
+  }
 }

cpp/velox/compute/iceberg/IcebergWriter.cc

Lines changed: 43 additions & 3 deletions

@@ -135,6 +135,15 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
   for (const auto& field : spec->fields) {
     partitionColumns.push_back(field.name);
   }
+
+  // Validate that nestedField.children size matches columnNames size.
+  VELOX_CHECK_EQ(
+      nestedField.children.size(),
+      columnNames.size(),
+      "Mismatch between nestedField children size ({}) and column names size ({})",
+      nestedField.children.size(),
+      columnNames.size());
+
   for (auto i = 0; i < columnNames.size(); ++i) {
     if (std::find(partitionColumns.begin(), partitionColumns.end(), columnNames[i]) != partitionColumns.end()) {
       columnHandles.push_back(
@@ -154,10 +163,10 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
           nestedField.children[i]));
     }
   }
-
+
   auto fileNameGenerator = std::make_shared<const GlutenIcebergFileNameGenerator>(
       partitionId, taskId, operationId, fileFormat);
-
+
   std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
       std::make_shared<connector::hive::LocationHandle>(
           outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
@@ -212,7 +221,38 @@ IcebergWriter::IcebergWriter(
 }
 
 void IcebergWriter::write(const VeloxColumnarBatch& batch) {
-  dataSink_->appendData(batch.getRowVector());
+  auto inputRowVector = batch.getRowVector();
+  auto inputRowType = asRowType(inputRowVector->type());
+
+  if (inputRowType->size() != rowType_->size()) {
+    VELOX_CHECK_GT(
+        inputRowType->size(), rowType_->size(), "Input schema has fewer columns than expected output schema");
+
+    std::vector<VectorPtr> filteredChildren;
+    filteredChildren.reserve(rowType_->size());
+    const size_t offset = inputRowType->size() - rowType_->size();
+
+    for (size_t i = 0; i < rowType_->size(); i++) {
+      VELOX_CHECK_EQ(
+          inputRowType->nameOf(i + offset),
+          rowType_->nameOf(i),
+          "Column name mismatch at position {}: expected '{}', got '{}'. "
+          "Metadata columns should be at the beginning.",
+          i + offset,
+          rowType_->nameOf(i),
+          inputRowType->nameOf(i + offset));
+      filteredChildren.push_back(inputRowVector->childAt(i + offset));
+    }
+
+    // Create a new RowVector with filtered columns.
+    auto filteredRowVector = std::make_shared<RowVector>(
+        pool_.get(), rowType_, inputRowVector->nulls(), inputRowVector->size(), std::move(filteredChildren));
+
+    dataSink_->appendData(filteredRowVector);
+  } else {
+    // No filtering needed, schemas match.
+    dataSink_->appendData(inputRowVector);
+  }
 }
 
 std::vector<std::string> IcebergWriter::commit() {
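
The new write() path assumes metadata columns are prepended, so it aligns the expected output columns against the tail of the input and verifies names position by position. A minimal Scala mirror of that check, for illustration only (the authoritative logic is the C++ above):

// Returns, for each expected column, the index of the input column that is kept.
def alignTrailing(input: Seq[String], expected: Seq[String]): Seq[Int] = {
  require(input.size >= expected.size, "Input schema has fewer columns than expected")
  val offset = input.size - expected.size
  expected.indices.map { i =>
    require(
      input(i + offset) == expected(i),
      s"Column name mismatch at position ${i + offset}: " +
        s"expected '${expected(i)}', got '${input(i + offset)}'")
    i + offset
  }
}

// alignTrailing(Seq("__row_operation", "_file", "id", "name"), Seq("id", "name"))
// returns Vector(2, 3): only the trailing table columns are appended to the sink.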
