Skip to content

Commit faad82c

Browse files
malinjawiMohammad Linjawi
andauthored
[CORE] Fix Delta 4.0 Spark 4.1 package build (#12078)
* Fix Delta 4.0 Spark 4.1 package build * Use Delta 4.1 for Spark 4.1 profile * Align Delta tests with Spark 4.1 errors * Add Spark 4.1 Delta compile coverage * Trigger CI * Address Spark 4.1 Delta review comments * Move Spark 4.1 MemoryStream shim to Delta tests * Move MemoryStream compatibility to Spark 4.1 shim --------- Co-authored-by: Mohammad Linjawi <Mohammad.Linjawi@ibm.com>
1 parent d2c6f38 commit faad82c

6 files changed

Lines changed: 60 additions & 17 deletions

File tree

.github/workflows/velox_backend_x86.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1432,7 +1432,7 @@ jobs:
14321432
export PATH=$JAVA_HOME/bin:$PATH
14331433
java -version
14341434
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
1435-
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
1435+
-Pspark-ut -Pdelta -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
14361436
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowHiveTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
14371437
- name: Upload test report
14381438
if: always()

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.sql.delta
1818

19-
import org.apache.spark.internal.{LoggingShims, MDC}
19+
import org.apache.spark.internal.{Logging, MDC => SparkMDC}
2020
import org.apache.spark.sql.SparkSession
2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
6262
tablePath: Option[String] = None,
6363
isCDCRead: Boolean = false)
6464
extends GlutenParquetFileFormat
65-
with LoggingShims {
65+
with Logging {
6666
// Validate either we have all arguments for DV enabled read or none of them.
6767
if (hasTablePath) {
6868
SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
528528
case AlwaysTrue() => Some(AlwaysTrue())
529529
case AlwaysFalse() => Some(AlwaysFalse())
530530
case _ =>
531-
logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
531+
logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
532532
None
533533
}
534534
}

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
2424
import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
2525

2626
import org.apache.spark._
27-
import org.apache.spark.internal.{LoggingShims, MDC}
27+
import org.apache.spark.internal.{Logging, MDC => SparkMDC}
2828
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
2929
import org.apache.spark.shuffle.FetchFailedException
3030
import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
6363
* values to data files. Specifically L123-126, L132, and L140 where it adds option
6464
* WRITE_PARTITION_COLUMNS
6565
*/
66-
object GlutenDeltaFileFormatWriter extends LoggingShims {
66+
object GlutenDeltaFileFormatWriter extends Logging {
6767

6868
/**
6969
* A variable used in tests to check whether the output ordering of the query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
343343
val ret = f
344344
val commitMsgs = ret.map(_.commitMsg)
345345

346-
logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
346+
logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
347347
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
348-
logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
349-
log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
348+
logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
349+
log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
350350

351351
processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
352352
logInfo(log"Finished processing stats for write job " +
353-
log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
353+
log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
354354

355355
// return a set of all the partition paths that were updated during this job
356356
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
357357
} catch {
358358
case cause: Throwable =>
359-
logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
359+
logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
360360
committer.abortJob(job)
361361
throw cause
362362
}
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
490490
})(catchBlock = {
491491
// If there is an error, abort the task
492492
dataWriter.abort()
493-
logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
493+
logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
494494
}, finallyBlock = {
495495
dataWriter.close()
496496
})

backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,19 @@
1717
package org.apache.spark.sql.delta
1818

1919
object DeltaInsertIntoTableSuiteShims {
20-
val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
20+
private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")
2121

22-
// Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
23-
val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
22+
val INSERT_INTO_TMP_VIEW_ERROR_MSG =
23+
if (isSpark41) {
24+
"[TABLE_OR_VIEW_NOT_FOUND]"
25+
} else {
26+
"[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
27+
}
28+
29+
val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
30+
if (isSpark41) {
31+
"INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
32+
} else {
33+
"INVALID_DEFAULT_VALUE.NOT_CONSTANT"
34+
}
2435
}

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,8 +1351,8 @@
13511351
<sparkshim.artifactId>spark-sql-columnar-shims-spark41</sparkshim.artifactId>
13521352
<spark.version>4.1.1</spark.version>
13531353
<iceberg.version>1.10.0</iceberg.version>
1354-
<delta.package.name>delta-spark</delta.package.name>
1355-
<delta.version>4.0.0</delta.version>
1354+
<delta.package.name>delta-spark_4.1</delta.package.name>
1355+
<delta.version>4.1.0</delta.version>
13561356
<delta.binary.version>40</delta.binary.version>
13571357
<hudi.version>1.1.0</hudi.version>
13581358
<fasterxml.version>2.18.2</fasterxml.version>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.execution.streaming
18+
19+
import org.apache.spark.sql.{Encoder, SQLContext}
20+
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}
21+
22+
object MemoryStream {
23+
def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
24+
RuntimeMemoryStream[A](implicitly[Encoder[A]], sqlContext.sparkSession)
25+
}
26+
27+
def apply[A: Encoder](
28+
numPartitions: Int)(
29+
implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
30+
RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
31+
}
32+
}

0 commit comments

Comments
 (0)