Skip to content

Commit 56327ed

Browse files
committed
fix: ensure UDF bridge inputs/result close on every path and resolve UDF class via context classloader
Wrap the JNI body in try/finally so input ValueVectors and the result vector are always closed, even when the UDF or arrow export throws. Resolve the CometUDF class through the thread context classloader so user-supplied UDF jars (added via spark.jars) are visible from the bridge.
1 parent 1c66f44 commit 56327ed

1 file changed

Lines changed: 40 additions & 18 deletions

File tree

common/src/main/java/org/apache/comet/udf/CometUdfBridge.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ public static void evaluate(
6363
CometUDF udf = cache.get(udfClassName);
6464
if (udf == null) {
6565
try {
66-
udf = (CometUDF) Class.forName(udfClassName).getDeclaredConstructor().newInstance();
66+
// Resolve via the executor's context classloader so user-supplied UDF jars
67+
// (added via spark.jars / --jars) are visible.
68+
ClassLoader cl = Thread.currentThread().getContextClassLoader();
69+
if (cl == null) {
70+
cl = CometUdfBridge.class.getClassLoader();
71+
}
72+
udf =
73+
(CometUDF) Class.forName(udfClassName, true, cl).getDeclaredConstructor().newInstance();
6774
} catch (ReflectiveOperationException e) {
6875
throw new RuntimeException("Failed to instantiate CometUDF: " + udfClassName, e);
6976
}
@@ -73,24 +80,39 @@ public static void evaluate(
7380
BufferAllocator allocator = org.apache.comet.package$.MODULE$.CometArrowAllocator();
7481

7582
ValueVector[] inputs = new ValueVector[inputArrayPtrs.length];
76-
for (int i = 0; i < inputArrayPtrs.length; i++) {
77-
ArrowArray inArr = ArrowArray.wrap(inputArrayPtrs[i]);
78-
ArrowSchema inSch = ArrowSchema.wrap(inputSchemaPtrs[i]);
79-
inputs[i] = Data.importVector(allocator, inArr, inSch, null);
80-
}
81-
82-
ValueVector result = udf.evaluate(inputs);
83-
if (!(result instanceof FieldVector)) {
84-
throw new RuntimeException(
85-
"CometUDF.evaluate() must return a FieldVector, got: " + result.getClass().getName());
86-
}
87-
ArrowArray outArr = ArrowArray.wrap(outArrayPtr);
88-
ArrowSchema outSch = ArrowSchema.wrap(outSchemaPtr);
89-
Data.exportVector(allocator, (FieldVector) result, null, outArr, outSch);
83+
ValueVector result = null;
84+
try {
85+
for (int i = 0; i < inputArrayPtrs.length; i++) {
86+
ArrowArray inArr = ArrowArray.wrap(inputArrayPtrs[i]);
87+
ArrowSchema inSch = ArrowSchema.wrap(inputSchemaPtrs[i]);
88+
inputs[i] = Data.importVector(allocator, inArr, inSch, null);
89+
}
9090

91-
for (ValueVector v : inputs) {
92-
v.close();
91+
result = udf.evaluate(inputs);
92+
if (!(result instanceof FieldVector)) {
93+
throw new RuntimeException(
94+
"CometUDF.evaluate() must return a FieldVector, got: " + result.getClass().getName());
95+
}
96+
ArrowArray outArr = ArrowArray.wrap(outArrayPtr);
97+
ArrowSchema outSch = ArrowSchema.wrap(outSchemaPtr);
98+
Data.exportVector(allocator, (FieldVector) result, null, outArr, outSch);
99+
} finally {
100+
for (ValueVector v : inputs) {
101+
if (v != null) {
102+
try {
103+
v.close();
104+
} catch (RuntimeException ignored) {
105+
// do not mask the original throwable
106+
}
107+
}
108+
}
109+
if (result != null) {
110+
try {
111+
result.close();
112+
} catch (RuntimeException ignored) {
113+
// do not mask the original throwable
114+
}
115+
}
93116
}
94-
result.close();
95117
}
96118
}

0 commit comments

Comments
 (0)