Skip to content

Commit e3569ba

Browse files
Kontinuation authored and hvanhovell committed
[SPARK-52819][SQL] Making KryoSerializationCodec serializable to fix java.io.NotSerializableException errors in various use cases
### What changes were proposed in this pull request? This PR makes `KryoSerializationCodec` implement `java.io.Serializable` to avoid `java.io.NotSerializableException` exceptions when using the Kryo encoder in `Dataset.flatMapGroupsWithState` or `Aggregator.bufferEncoder`. ### Why are the changes needed? See the description in [SPARK-52819](https://issues.apache.org/jira/browse/SPARK-52819) as well as the [minimal repro](https://github.com/Kontinuation/spark-4-kryo-encoder-bug). The problem only happens when using Spark 4.0.0, not when using Spark 3.5.5. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New tests were added to ensure that 1. Both the Kryo encoder and the Java serialization encoder are serializable 2. The Kryo-encoded aggregator buffer use case works ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#51615 from Kontinuation/fix-kryo-codec-serializable. Authored-by: Kristin Cowalcijk <bo@wherobots.com> Signed-off-by: Herman van Hövell <herman@databricks.com>
1 parent 1413dbb commit e3569ba

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/codecs.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ object JavaSerializationCodec extends (() => Codec[Any, Array[Byte]]) {
5555
* server (driver & executors) very tricky. As a workaround a user can define their own Codec
5656
* which internalizes the Kryo configuration.
5757
*/
58-
object KryoSerializationCodec extends (() => Codec[Any, Array[Byte]]) {
58+
object KryoSerializationCodec extends (() => Codec[Any, Array[Byte]]) with Serializable {
5959
private lazy val kryoCodecConstructor: MethodHandle = {
6060
val cls = SparkClassUtils.classForName(
6161
"org.apache.spark.sql.catalyst.encoders.KryoSerializationCodecImpl")

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
612612
provider,
613613
nullable = true))
614614
.resolveAndBind()
615+
assert(encoder.isInstanceOf[Serializable])
615616
assert(encoder.schema == new StructType().add("value", BinaryType))
616617
val toRow = encoder.createSerializer()
617618
val fromRow = encoder.createDeserializer()

sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ private case class FunctionResult(f1: String, f2: String)
5353
private case class LocalDateInstantType(date: LocalDate, instant: Instant)
5454
private case class TimestampInstantType(t: Timestamp, instant: Instant)
5555

56+
private case class KryoEncodedBuf(value: Long)
57+
private case class KryoBufAggregator() extends Aggregator[Long, KryoEncodedBuf, Long] {
58+
override def zero: KryoEncodedBuf = KryoEncodedBuf(0)
59+
override def reduce(b: KryoEncodedBuf, a: Long): KryoEncodedBuf = KryoEncodedBuf(b.value + a)
60+
override def merge(b1: KryoEncodedBuf, b2: KryoEncodedBuf): KryoEncodedBuf =
61+
KryoEncodedBuf(b1.value + b2.value)
62+
override def finish(reduction: KryoEncodedBuf): Long = reduction.value
63+
override def bufferEncoder: Encoder[KryoEncodedBuf] = Encoders.kryo[KryoEncodedBuf]
64+
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
65+
}
66+
5667
class UDFSuite extends QueryTest with SharedSparkSession {
5768
import testImplicits._
5869

@@ -1249,4 +1260,11 @@ class UDFSuite extends QueryTest with SharedSparkSession {
12491260
.select(f($"c").as("f"), f($"f"))
12501261
checkAnswer(df, Seq(Row(2, 3), Row(null, null)))
12511262
}
1263+
1264+
test("SPARK-52819: Support using Kryo to encode BUF in Aggregator") {
1265+
val kryoBufUDAF = udaf(KryoBufAggregator())
1266+
val input = Seq(1L, 2L, 3L).toDF("value")
1267+
val result = input.select(kryoBufUDAF($"value").as("sum"))
1268+
checkAnswer(result, Row(6L) :: Nil)
1269+
}
12521270
}

0 commit comments

Comments (0)