Skip to content

Commit 509c682

Browse files
committed
fix: ship lambda expression in proto instead of via process-local registry
The previous CometLambdaRegistry approach registered the ArrayExists expression in a JVM-local ConcurrentHashMap on the driver and looked it up by UUID on the executor. In a real cluster the executor JVM never sees the driver's registry and the lookup fails. Local-mode tests masked this because driver and executor share the same JVM. Serialize the ArrayExists Catalyst expression to bytes in the serde layer and embed it in the proto as a BinaryType literal arg. The UDF deserializes on each invocation. CometLambdaRegistry is no longer referenced and is removed.
1 parent 2c904fc commit 509c682

3 files changed

Lines changed: 29 additions & 78 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import org.apache.comet.CometConf
3131
import org.apache.comet.CometSparkSessionExtensions.withInfo
3232
import org.apache.comet.serde.QueryPlanSerde._
3333
import org.apache.comet.shims.{CometExprShim, CometTypeShim}
34-
import org.apache.comet.udf.CometLambdaRegistry
3534

3635
object CometArrayRemove
3736
extends CometExpressionSerde[ArrayRemove]
@@ -852,25 +851,31 @@ object CometArrayExists extends CometExpressionSerde[ArrayExists] {
852851
return None
853852
}
854853

855-
val registryKey = CometLambdaRegistry.register(expr)
856-
val keyLiteral = Literal(registryKey)
857-
val keyProto = exprToProtoInternal(keyLiteral, inputs, binding)
858-
if (keyProto.isEmpty) {
859-
CometLambdaRegistry.remove(registryKey)
860-
withInfo(expr, "Failed to serialize registry key literal")
854+
// The lambda body is evaluated on the executor inside ArrayExistsUDF, so the entire
855+
// ArrayExists Catalyst expression must travel with the plan. The driver-side
856+
// CometLambdaRegistry approach broke under real distributed execution because the
857+
// executor JVM doesn't share the driver's map. Serializing the expression as bytes
858+
// and shipping it through the proto as a Literal arg keeps the lambda self-contained.
859+
val baos = new java.io.ByteArrayOutputStream()
860+
val oos = new java.io.ObjectOutputStream(baos)
861+
try {
862+
oos.writeObject(expr)
863+
} finally {
864+
oos.close()
865+
}
866+
val payloadProto = exprToProtoInternal(Literal(baos.toByteArray, BinaryType), inputs, binding)
867+
if (payloadProto.isEmpty) {
868+
withInfo(expr, "Failed to serialize lambda expression payload")
861869
return None
862870
}
863871

864-
val returnType = serializeDataType(BooleanType).getOrElse {
865-
CometLambdaRegistry.remove(registryKey)
866-
return None
867-
}
872+
val returnType = serializeDataType(BooleanType).getOrElse(return None)
868873

869874
val udfBuilder = ExprOuterClass.JvmScalarUdf
870875
.newBuilder()
871876
.setClassName("org.apache.comet.udf.ArrayExistsUDF")
872877
.addArgs(arrayProto.get)
873-
.addArgs(keyProto.get)
878+
.addArgs(payloadProto.get)
874879
.setReturnType(returnType)
875880
.setReturnNullable(expr.nullable)
876881

spark/src/main/scala/org/apache/comet/udf/ArrayExistsUDF.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.comet.udf
2121

22-
import java.nio.charset.StandardCharsets
23-
2422
import org.apache.arrow.vector._
2523
import org.apache.arrow.vector.complex.ListVector
2624
import org.apache.spark.sql.catalyst.expressions.{ArrayExists, LambdaFunction, NamedLambdaVariable}
@@ -34,7 +32,9 @@ import org.apache.comet.CometArrowAllocator
3432
*
3533
* Inputs:
3634
* - inputs(0): ListVector (the array column)
37-
* - inputs(1): VarCharVector length-1 scalar (registry key for the lambda expression)
35+
* - inputs(1): VarBinaryVector length-1 scalar containing the Java-serialized [[ArrayExists]]
36+
* Catalyst expression. Shipping the expression in the proto avoids the driver-vs-executor
37+
* mismatch a process-local registry would suffer.
3838
*
3939
* Output: BitVector (nullable boolean), same length as the input array vector.
4040
*
@@ -48,13 +48,17 @@ class ArrayExistsUDF extends CometUDF {
4848
override def evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector = {
4949
require(inputs.length == 2, s"ArrayExistsUDF expects 2 inputs, got ${inputs.length}")
5050
val listVec = inputs(0).asInstanceOf[ListVector]
51-
val keyVec = inputs(1).asInstanceOf[VarCharVector]
51+
val payloadVec = inputs(1).asInstanceOf[VarBinaryVector]
5252
require(
53-
keyVec.getValueCount >= 1 && !keyVec.isNull(0),
54-
"ArrayExistsUDF requires a non-null scalar registry key")
53+
payloadVec.getValueCount >= 1 && !payloadVec.isNull(0),
54+
"ArrayExistsUDF requires a non-null scalar payload")
5555

56-
val registryKey = new String(keyVec.get(0), StandardCharsets.UTF_8)
57-
val arrayExistsExpr = CometLambdaRegistry.get(registryKey).asInstanceOf[ArrayExists]
56+
val payloadBytes = payloadVec.get(0)
57+
val bais = new java.io.ByteArrayInputStream(payloadBytes)
58+
val ois = new java.io.ObjectInputStream(bais)
59+
val arrayExistsExpr =
60+
try ois.readObject().asInstanceOf[ArrayExists]
61+
finally ois.close()
5862

5963
val LambdaFunction(_, Seq(elementVar: NamedLambdaVariable), _) = arrayExistsExpr.function
6064
val body = arrayExistsExpr.functionForEval

spark/src/main/scala/org/apache/comet/udf/CometLambdaRegistry.scala

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)