Commit 0560d84

Mohammad Linjawi authored and committed
Fix Delta 4.0 Spark 4.1 package build
1 parent ea4d893 commit 0560d84

5 files changed

Lines changed: 146 additions & 10 deletions

.github/workflows/velox_backend_x86.yml

Lines changed: 2 additions & 1 deletion
@@ -1227,10 +1227,11 @@ jobs:
     container: apache/gluten:centos-8-jdk17
     steps:
       - uses: actions/checkout@v4
-      - name: Build with fast-build profile (Spark 4.0, Java 17)
+      - name: Build with fast-build profile (Spark 4.0/4.1, Java 17)
         run: |
           cd $GITHUB_WORKSPACE/
           $MVN_CMD clean test-compile -Pspark-4.0 -Pscala-2.13 -Pbackends-velox -Pspark-ut -Piceberg,iceberg-test,delta,paimon -Pfast-build
+          $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut -Pdelta -Pfast-build
 
   spark-test-spark40:
     needs: build-native-lib-centos-7

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala

Lines changed: 2 additions & 2 deletions
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{LoggingShims, MDC => SparkMDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
       case AlwaysTrue() => Some(AlwaysTrue())
       case AlwaysFalse() => Some(AlwaysFalse())
       case _ =>
-        logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
+        logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
         None
     }
   }

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala

Lines changed: 7 additions & 7 deletions
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
 
 import org.apache.spark._
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{LoggingShims, MDC => SparkMDC}
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.SparkSession
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       val ret = f
       val commitMsgs = ret.map(_.commitMsg)
 
-      logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+      logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
       val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
-      logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
-        log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+      logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
+        log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
 
       processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
       logInfo(log"Finished processing stats for write job " +
-        log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+        log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
 
       // return a set of all the partition paths that were updated during this job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
     } catch {
       case cause: Throwable =>
-        logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
+        logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
         committer.abortJob(job)
         throw cause
     }
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
     })(catchBlock = {
       // If there is an error, abort the task
       dataWriter.abort()
-      logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+      logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
     }, finallyBlock = {
       dataWriter.close()
     })

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter}

import java.io.OutputStream

/**
 * Binary compatibility shim for Delta 4.0, which was compiled against Spark 4.0's
 * CheckpointFileManager package before Spark 4.1 moved it under streaming.checkpointing.
 */
trait CheckpointFileManager {
  def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream

  def open(path: Path): FSDataInputStream

  def list(path: Path, filter: PathFilter): Array[FileStatus]

  def list(path: Path): Array[FileStatus] = {
    list(
      path,
      new PathFilter {
        override def accept(path: Path): Boolean = true
      })
  }

  def mkdirs(path: Path): Unit

  def exists(path: Path): Boolean

  def delete(path: Path): Unit

  def isLocal: Boolean

  def createCheckpointDirectory(): Path
}

object CheckpointFileManager {
  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
    new Spark41CheckpointFileManagerAdapter(
      Spark41CheckpointFileManager.create(path, hadoopConf))
  }

  abstract class CancellableFSDataOutputStream(outputStream: OutputStream)
    extends org.apache.hadoop.fs.FSDataOutputStream(
      outputStream,
      null.asInstanceOf[FileSystem.Statistics]) {
    def cancel(): Unit
  }

  private class Spark41CheckpointFileManagerAdapter(
      delegate: Spark41CheckpointFileManager)
    extends CheckpointFileManager {
    override def createAtomic(
        path: Path,
        overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
      new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible))
    }

    override def open(path: Path): FSDataInputStream = delegate.open(path)

    override def list(path: Path, filter: PathFilter): Array[FileStatus] =
      delegate.list(path, filter)

    override def mkdirs(path: Path): Unit = delegate.mkdirs(path)

    override def exists(path: Path): Boolean = delegate.exists(path)

    override def delete(path: Path): Unit = delegate.delete(path)

    override def isLocal: Boolean = delegate.isLocal

    override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory()
  }

  private class CancellableFSDataOutputStreamAdapter(
      delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream)
    extends CancellableFSDataOutputStream(delegate) {
    override def close(): Unit = delegate.close()

    override def cancel(): Unit = delegate.cancel()
  }
}
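
For context, a minimal usage sketch of how Delta 4.0 code compiled against the old org.apache.spark.sql.execution.streaming.CheckpointFileManager location keeps working on Spark 4.1: create hands back the adapter, which delegates every call to the relocated streaming.checkpointing implementation, and closing the cancellable stream commits the atomic write. The object name, helper method, and file names below are hypothetical illustrations, not part of the commit.

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager

object CheckpointShimSketch {
  // Hypothetical helper: writes a small checkpoint file through the shim.
  def writeAtomically(checkpointDir: String, hadoopConf: Configuration): Unit = {
    // Resolves to the shim above, which wraps Spark 4.1's checkpointing manager.
    val manager = CheckpointFileManager.create(new Path(checkpointDir), hadoopConf)
    val out = manager.createAtomic(new Path(checkpointDir, "metadata"), overwriteIfPossible = true)
    try {
      out.write("v1".getBytes(StandardCharsets.UTF_8))
      out.close() // closing the cancellable stream commits the atomic write
    } catch {
      case t: Throwable =>
        out.cancel() // abandon the partially written file on failure
        throw t
    }
  }
}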
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}

// Binary compatibility shim for Delta 4.0 code that references MemoryStream at its
// Spark 4.0 location; Spark 4.1 moved the implementation under streaming.runtime.
object MemoryStream {
  def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
    RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext)
  }

  def apply[A: Encoder](
      numPartitions: Int)(
      implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
    RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
  }
}
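
Similarly, a hedged test-style sketch showing that code importing the old MemoryStream path receives a RuntimeMemoryStream from the shim's apply. The object name and query name are hypothetical, and it assumes Spark 4.1's runtime.MemoryStream still exposes the familiar addData and toDS methods.

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamShimSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("memory-stream-shim").getOrCreate()
    import spark.implicits._ // provides Encoder[Int]
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int] // resolves to the shim, returning a RuntimeMemoryStream[Int]
    input.addData(1, 2, 3)        // assumes runtime.MemoryStream keeps the addData API
    val query = input.toDS().writeStream.format("memory").queryName("shim_demo").start()
    query.processAllAvailable()
    spark.table("shim_demo").show()
    query.stop()
    spark.stop()
  }
}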
