Skip to content

Commit 42462c3

Browse files
committed
fix: bound UDF and pattern caches with LRU eviction
The per-thread CometUDF instance cache (CometUdfBridge.INSTANCES) and the per-instance Pattern cache in RegExpLikeUDF were unbounded, so a workload that registers many UDF classes or evaluates many distinct regex patterns would retain entries for the executor's JVM lifetime. Both caches now use an access-order LinkedHashMap bounded via removeEldestEntry, which evicts the least-recently-used entry once the cap is exceeded (64 UDF instances per thread, 128 patterns per RegExpLikeUDF instance).
1 parent 1dd81fb commit 42462c3

2 files changed

Lines changed: 39 additions & 12 deletions

File tree

common/src/main/java/org/apache/comet/udf/CometUdfBridge.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
package org.apache.comet.udf;
2121

22-
import java.util.HashMap;
22+
import java.util.LinkedHashMap;
23+
import java.util.Map;
2324

2425
import org.apache.arrow.c.ArrowArray;
2526
import org.apache.arrow.c.ArrowSchema;
@@ -35,14 +36,23 @@
3536
*/
3637
public class CometUdfBridge {
3738

38-
// Per-thread cache of UDF instances keyed by class name. Comet native
39-
// execution threads (Tokio/DataFusion worker pool) are reused across
40-
// tasks within an executor, so the effective lifetime of cached entries
41-
// is the worker thread (i.e. the executor JVM). This is fine for
39+
// Per-thread, bounded LRU of UDF instances keyed by class name. Comet
40+
// native execution threads (Tokio/DataFusion worker pool) are reused
41+
// across tasks within an executor, so the effective lifetime of cached
42+
// entries is the worker thread (i.e. the executor JVM). This is fine for
4243
// stateless UDFs like RegExpLikeUDF; future stateful UDFs would need
4344
// explicit per-task isolation.
44-
private static final ThreadLocal<HashMap<String, CometUDF>> INSTANCES =
45-
ThreadLocal.withInitial(HashMap::new);
45+
private static final int CACHE_CAPACITY = 64;
46+
47+
private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
48+
ThreadLocal.withInitial(
49+
() ->
50+
new LinkedHashMap<String, CometUDF>(CACHE_CAPACITY, 0.75f, true) {
51+
@Override
52+
protected boolean removeEldestEntry(Map.Entry<String, CometUDF> eldest) {
53+
return size() > CACHE_CAPACITY;
54+
}
55+
});
4656

4757
/**
4858
* Called from native via JNI.
@@ -59,7 +69,7 @@ public static void evaluate(
5969
long[] inputSchemaPtrs,
6070
long outArrayPtr,
6171
long outSchemaPtr) {
62-
HashMap<String, CometUDF> cache = INSTANCES.get();
72+
LinkedHashMap<String, CometUDF> cache = INSTANCES.get();
6373
CometUDF udf = cache.get(udfClassName);
6474
if (udf == null) {
6575
try {

common/src/main/scala/org/apache/comet/udf/RegExpLikeUDF.scala

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23+
import java.util
2324
import java.util.regex.Pattern
2425

25-
import scala.collection.mutable
26-
2726
import org.apache.arrow.vector.{BitVector, ValueVector, VarCharVector}
2827

2928
import org.apache.comet.CometArrowAllocator
@@ -39,7 +38,13 @@ import org.apache.comet.CometArrowAllocator
3938
*/
4039
class RegExpLikeUDF extends CometUDF {
4140

42-
private val patternCache = mutable.Map.empty[String, Pattern]
41+
// Bounded LRU so a workload with many distinct patterns does not retain
42+
// Pattern objects for the executor's lifetime.
43+
private val patternCache =
44+
new util.LinkedHashMap[String, Pattern](RegExpLikeUDF.PatternCacheCapacity, 0.75f, true) {
45+
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
46+
size() > RegExpLikeUDF.PatternCacheCapacity
47+
}
4348

4449
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
4550
require(inputs.length == 2, s"RegExpLikeUDF expects 2 inputs, got ${inputs.length}")
@@ -50,7 +55,15 @@ class RegExpLikeUDF extends CometUDF {
5055
"RegExpLikeUDF requires a non-null scalar pattern")
5156

5257
val patternStr = new String(patternVec.get(0), StandardCharsets.UTF_8)
53-
val pattern = patternCache.getOrElseUpdate(patternStr, Pattern.compile(patternStr))
58+
val pattern = {
59+
val cached = patternCache.get(patternStr)
60+
if (cached != null) cached
61+
else {
62+
val compiled = Pattern.compile(patternStr)
63+
patternCache.put(patternStr, compiled)
64+
compiled
65+
}
66+
}
5467

5568
val n = subject.getValueCount
5669
val out = new BitVector("rlike_result", CometArrowAllocator)
@@ -70,3 +83,7 @@ class RegExpLikeUDF extends CometUDF {
7083
out
7184
}
7285
}
86+
87+
object RegExpLikeUDF {
88+
private val PatternCacheCapacity: Int = 128
89+
}

0 commit comments

Comments
 (0)