Skip to content

Commit 250b469

Browse files
committed
fix: use ConcurrentHashMap for pattern cache in regexp UDFs
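The bridge now caches a single shared CometUDF instance per class across native worker threads, so UDF instance state must be thread-safe. Replace the per-instance LinkedHashMap LRU (which is not thread-safe and, with accessOrder set to true, structurally mutates the map even on get) with ConcurrentHashMap. Note that this also removes the 128-entry PatternCacheCapacity bound and its LRU eviction, so each pattern cache is now unbounded and retains every distinct pattern it compiles for the lifetime of the shared UDF instance.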
1 parent 1ad838b commit 250b469

6 files changed

Lines changed: 12 additions & 65 deletions

File tree

common/src/main/scala/org/apache/comet/udf/RegExpExtractAllUDF.scala

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.{IntVector, ValueVector, VarCharVector}
@@ -44,14 +44,7 @@ import org.apache.comet.CometArrowAllocator
4444
*/
4545
class RegExpExtractAllUDF extends CometUDF {
4646

47-
private val patternCache =
48-
new util.LinkedHashMap[String, Pattern](
49-
RegExpExtractAllUDF.PatternCacheCapacity,
50-
0.75f,
51-
true) {
52-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
53-
size() > RegExpExtractAllUDF.PatternCacheCapacity
54-
}
47+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
5548

5649
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5750
require(inputs.length == 3, s"RegExpExtractAllUDF expects 3 inputs, got ${inputs.length}")
@@ -117,7 +110,3 @@ class RegExpExtractAllUDF extends CometUDF {
117110
out
118111
}
119112
}
120-
121-
object RegExpExtractAllUDF {
122-
private val PatternCacheCapacity: Int = 128
123-
}

common/src/main/scala/org/apache/comet/udf/RegExpExtractUDF.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.{IntVector, ValueVector, VarCharVector}
@@ -42,11 +42,7 @@ import org.apache.comet.CometArrowAllocator
4242
*/
4343
class RegExpExtractUDF extends CometUDF {
4444

45-
private val patternCache =
46-
new util.LinkedHashMap[String, Pattern](RegExpExtractUDF.PatternCacheCapacity, 0.75f, true) {
47-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
48-
size() > RegExpExtractUDF.PatternCacheCapacity
49-
}
45+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
5046

5147
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5248
require(inputs.length == 3, s"RegExpExtractUDF expects 3 inputs, got ${inputs.length}")
@@ -100,7 +96,3 @@ class RegExpExtractUDF extends CometUDF {
10096
out
10197
}
10298
}
103-
104-
object RegExpExtractUDF {
105-
private val PatternCacheCapacity: Int = 128
106-
}

common/src/main/scala/org/apache/comet/udf/RegExpInStrUDF.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.{IntVector, ValueVector, VarCharVector}
@@ -42,11 +42,7 @@ import org.apache.comet.CometArrowAllocator
4242
*/
4343
class RegExpInStrUDF extends CometUDF {
4444

45-
private val patternCache =
46-
new util.LinkedHashMap[String, Pattern](RegExpInStrUDF.PatternCacheCapacity, 0.75f, true) {
47-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
48-
size() > RegExpInStrUDF.PatternCacheCapacity
49-
}
45+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
5046

5147
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5248
require(inputs.length == 3, s"RegExpInStrUDF expects 3 inputs, got ${inputs.length}")
@@ -96,7 +92,3 @@ class RegExpInStrUDF extends CometUDF {
9692
out
9793
}
9894
}
99-
100-
object RegExpInStrUDF {
101-
private val PatternCacheCapacity: Int = 128
102-
}

common/src/main/scala/org/apache/comet/udf/RegExpLikeUDF.scala

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.{BitVector, ValueVector, VarCharVector}
@@ -38,13 +38,7 @@ import org.apache.comet.CometArrowAllocator
3838
*/
3939
class RegExpLikeUDF extends CometUDF {
4040

41-
// Bounded LRU so a workload with many distinct patterns does not retain
42-
// Pattern objects for the executor's lifetime.
43-
private val patternCache =
44-
new util.LinkedHashMap[String, Pattern](RegExpLikeUDF.PatternCacheCapacity, 0.75f, true) {
45-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
46-
size() > RegExpLikeUDF.PatternCacheCapacity
47-
}
41+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
4842

4943
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5044
require(inputs.length == 2, s"RegExpLikeUDF expects 2 inputs, got ${inputs.length}")
@@ -83,7 +77,3 @@ class RegExpLikeUDF extends CometUDF {
8377
out
8478
}
8579
}
86-
87-
object RegExpLikeUDF {
88-
private val PatternCacheCapacity: Int = 128
89-
}

common/src/main/scala/org/apache/comet/udf/RegExpReplaceUDF.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.{ValueVector, VarCharVector}
@@ -41,11 +41,7 @@ import org.apache.comet.CometArrowAllocator
4141
*/
4242
class RegExpReplaceUDF extends CometUDF {
4343

44-
private val patternCache =
45-
new util.LinkedHashMap[String, Pattern](RegExpReplaceUDF.PatternCacheCapacity, 0.75f, true) {
46-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
47-
size() > RegExpReplaceUDF.PatternCacheCapacity
48-
}
44+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
4945

5046
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5147
require(inputs.length == 3, s"RegExpReplaceUDF expects 3 inputs, got ${inputs.length}")
@@ -90,7 +86,3 @@ class RegExpReplaceUDF extends CometUDF {
9086
out
9187
}
9288
}
93-
94-
object RegExpReplaceUDF {
95-
private val PatternCacheCapacity: Int = 128
96-
}

common/src/main/scala/org/apache/comet/udf/StringSplitUDF.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.udf
2121

2222
import java.nio.charset.StandardCharsets
23-
import java.util
23+
import java.util.concurrent.ConcurrentHashMap
2424
import java.util.regex.Pattern
2525

2626
import org.apache.arrow.vector.ValueVector
@@ -44,11 +44,7 @@ import org.apache.comet.CometArrowAllocator
4444
*/
4545
class StringSplitUDF extends CometUDF {
4646

47-
private val patternCache =
48-
new util.LinkedHashMap[String, Pattern](StringSplitUDF.PatternCacheCapacity, 0.75f, true) {
49-
override def removeEldestEntry(eldest: util.Map.Entry[String, Pattern]): Boolean =
50-
size() > StringSplitUDF.PatternCacheCapacity
51-
}
47+
private val patternCache = new ConcurrentHashMap[String, Pattern]()
5248

5349
override def evaluate(inputs: Array[ValueVector]): ValueVector = {
5450
require(inputs.length == 3, s"StringSplitUDF expects 3 inputs, got ${inputs.length}")
@@ -106,7 +102,3 @@ class StringSplitUDF extends CometUDF {
106102
out
107103
}
108104
}
109-
110-
object StringSplitUDF {
111-
private val PatternCacheCapacity: Int = 128
112-
}

0 commit comments

Comments
 (0)