Skip to content

Commit dd10c98

Browse files
committed
Merge remote-tracking branch 'apache/main' into user-jvm-udf
# Conflicts:
#   common/src/main/java/org/apache/comet/udf/CometUdfBridge.java
#   common/src/main/scala/org/apache/comet/udf/CometUDF.scala
2 parents 24b9361 + 47ec2a6 commit dd10c98

48 files changed

Lines changed: 842 additions & 511 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/pr_benchmark_check.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,9 @@ jobs:
8484
${{ runner.os }}-benchmark-maven-
8585
8686
- name: Check Scala compilation and linting
87+
# Pin to spark-4.0 (Scala 2.13.16) because the default profile is now
88+
# spark-4.1 / Scala 2.13.17, and semanticdb-scalac_2.13.17 is not yet
89+
# published, which breaks `-Psemanticdb`. See pr_build_linux.yml for
90+
# the same exclusion in the main lint matrix.
8791
run: |
88-
./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -DskipTests
92+
./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -Pspark-4.0 -DskipTests

common/src/main/java/org/apache/comet/udf/CometUdfBridge.java

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919

2020
package org.apache.comet.udf;
2121

22-
import java.util.LinkedHashMap;
23-
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
2423

2524
import org.apache.arrow.c.ArrowArray;
2625
import org.apache.arrow.c.ArrowSchema;
@@ -36,23 +35,10 @@
3635
*/
3736
public class CometUdfBridge {
3837

39-
// Per-thread, bounded LRU of UDF instances keyed by class name. Comet
40-
// native execution threads (Tokio/DataFusion worker pool) are reused
41-
// across tasks within an executor, so the effective lifetime of cached
42-
// entries is the worker thread (i.e. the executor JVM). This is fine for
43-
// stateless UDFs like ArrayExistsUDF; future stateful UDFs would need
44-
// explicit per-task isolation.
45-
private static final int CACHE_CAPACITY = 64;
46-
47-
private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
48-
ThreadLocal.withInitial(
49-
() ->
50-
new LinkedHashMap<String, CometUDF>(CACHE_CAPACITY, 0.75f, true) {
51-
@Override
52-
protected boolean removeEldestEntry(Map.Entry<String, CometUDF> eldest) {
53-
return size() > CACHE_CAPACITY;
54-
}
55-
});
38+
// Process-wide cache of UDF instances keyed by class name. CometUDF
39+
// implementations are required to be stateless (see CometUDF), so a
40+
// single shared instance per class is safe across native worker threads.
41+
private static final ConcurrentHashMap<String, CometUDF> INSTANCES = new ConcurrentHashMap<>();
5642

5743
/**
5844
* Called from native via JNI.
@@ -69,23 +55,23 @@ public static void evaluate(
6955
long[] inputSchemaPtrs,
7056
long outArrayPtr,
7157
long outSchemaPtr) {
72-
LinkedHashMap<String, CometUDF> cache = INSTANCES.get();
73-
CometUDF udf = cache.get(udfClassName);
74-
if (udf == null) {
75-
try {
76-
// Resolve via the executor's context classloader so user-supplied UDF jars
77-
// (added via spark.jars / --jars) are visible.
78-
ClassLoader cl = Thread.currentThread().getContextClassLoader();
79-
if (cl == null) {
80-
cl = CometUdfBridge.class.getClassLoader();
81-
}
82-
udf =
83-
(CometUDF) Class.forName(udfClassName, true, cl).getDeclaredConstructor().newInstance();
84-
} catch (ReflectiveOperationException e) {
85-
throw new RuntimeException("Failed to instantiate CometUDF: " + udfClassName, e);
86-
}
87-
cache.put(udfClassName, udf);
88-
}
58+
CometUDF udf =
59+
INSTANCES.computeIfAbsent(
60+
udfClassName,
61+
name -> {
62+
try {
63+
// Resolve via the executor's context classloader so user-supplied UDF jars
64+
// (added via spark.jars / --jars) are visible.
65+
ClassLoader cl = Thread.currentThread().getContextClassLoader();
66+
if (cl == null) {
67+
cl = CometUdfBridge.class.getClassLoader();
68+
}
69+
return (CometUDF)
70+
Class.forName(name, true, cl).getDeclaredConstructor().newInstance();
71+
} catch (ReflectiveOperationException e) {
72+
throw new RuntimeException("Failed to instantiate CometUDF: " + name, e);
73+
}
74+
});
8975

9076
BufferAllocator allocator = org.apache.comet.package$.MODULE$.CometArrowAllocator();
9177

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,9 @@ object CometConf extends ShimCometConf {
9494
.createWithEnvVarOrDefault("ENABLE_COMET", true)
9595

9696
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
97-
.category(CATEGORY_SCAN)
98-
.doc(
99-
"Whether to enable native scans. When this is turned on, Spark will use Comet to " +
100-
"read supported data sources (currently only Parquet is supported natively). Note " +
101-
"that to enable native vectorized execution, both this config and " +
102-
"`spark.comet.exec.enabled` need to be enabled.")
97+
.category(CATEGORY_TESTING)
98+
.doc("Whether to enable native scans. Intended for use in Comet's own test suites to " +
99+
"selectively disable native scans; not intended for production use.")
103100
.booleanConf
104101
.createWithDefault(true)
105102

common/src/main/scala/org/apache/comet/udf/CometUDF.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import org.apache.spark.sql.types.DataType
3030
* - Scalar (literal-folded) arguments arrive as length-1 vectors and must be read at index 0.
3131
* - The returned vector's length must match the longest input.
3232
*
33-
* Implementations must have a public no-arg constructor and should be stateless: instances are
34-
* cached per executor thread for the lifetime of the JVM.
33+
* Implementations must have a public no-arg constructor and must be stateless: a single instance
34+
* per class is cached and shared across native worker threads for the lifetime of the JVM.
3535
*/
3636
trait CometUDF {
3737

dev/diffs/3.4.3.diff

Lines changed: 18 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,9 +1998,9 @@ index 104b4e416cd..b8af360fa14 100644
19981998
// Note that, if record level filtering is enabled, it should be a single record.
19991999
// If no filter is pushed down to Parquet, it should be the total length of data.
20002000
- assert(actual > 1 && actual < data.length)
2001-
+ // Only enable Comet test iff it's scan only, since with native execution
2001+
+ // Skip when Comet is enabled since with native execution
20022002
+ // `stripSparkFilter` can't remove the native filter
2003-
+ if (!isCometEnabled || isCometScanOnly) {
2003+
+ if (!isCometEnabled) {
20042004
+ assert(actual > 1 && actual < data.length)
20052005
+ }
20062006
}
@@ -2011,9 +2011,9 @@ index 104b4e416cd..b8af360fa14 100644
20112011
// Note that, if record level filtering is enabled, it should be a single record.
20122012
// If no filter is pushed down to Parquet, it should be the total length of data.
20132013
- assert(actual > 1 && actual < data.length)
2014-
+ // Only enable Comet test iff it's scan only, since with native execution
2014+
+ // Skip when Comet is enabled since with native execution
20152015
+ // `stripSparkFilter` can't remove the native filter
2016-
+ if (!isCometEnabled || isCometScanOnly) {
2016+
+ if (!isCometEnabled) {
20172017
+ assert(actual > 1 && actual < data.length)
20182018
+ }
20192019
}
@@ -2926,32 +2926,14 @@ index dd55fcfe42c..99bc018008a 100644
29262926
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
29272927
super.test(testName, testTags: _*) {
29282928
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2929-
@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
2929+
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
29302930
protected override def _sqlContext: SQLContext = self.spark.sqlContext
29312931
}
29322932

29332933
+ /**
29342934
+ * Whether Comet extension is enabled
29352935
+ */
29362936
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
2937-
+
2938-
+ /**
2939-
+ * Whether to enable ansi mode This is only effective when
2940-
+ * [[isCometEnabled]] returns true.
2941-
+ */
2942-
+ protected def enableCometAnsiMode: Boolean = {
2943-
+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
2944-
+ v != null && v.toBoolean
2945-
+ }
2946-
+
2947-
+ /**
2948-
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
2949-
+ * [[isCometEnabled]] returns true.
2950-
+ */
2951-
+ protected def isCometScanOnly: Boolean = {
2952-
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
2953-
+ v != null && v.toBoolean
2954-
+ }
29552937
+
29562938
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
29572939
SparkSession.setActiveSession(spark)
@@ -2969,7 +2951,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSessio
29692951
index ed2e309fa07..a5ea58146ad 100644
29702952
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
29712953
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
2972-
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
2954+
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
29732955
// this rule may potentially block testing of other optimization rules such as
29742956
// ConstantPropagation etc.
29752957
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2980,23 +2962,12 @@ index ed2e309fa07..a5ea58146ad 100644
29802962
+ .set("spark.comet.enabled", "true")
29812963
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
29822964
+
2983-
+ if (!isCometScanOnly) {
2984-
+ conf
2985-
+ .set("spark.comet.exec.enabled", "true")
2986-
+ .set("spark.shuffle.manager",
2987-
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
2988-
+ .set("spark.comet.exec.shuffle.enabled", "true")
2989-
+ .set("spark.comet.memoryOverhead", "10g")
2990-
+ } else {
2991-
+ conf
2992-
+ .set("spark.comet.exec.enabled", "false")
2993-
+ .set("spark.comet.exec.shuffle.enabled", "false")
2994-
+ }
2995-
+
2996-
+ if (enableCometAnsiMode) {
2997-
+ conf
2998-
+ .set("spark.sql.ansi.enabled", "true")
2999-
+ }
2965+
+ conf
2966+
+ .set("spark.comet.exec.enabled", "true")
2967+
+ .set("spark.shuffle.manager",
2968+
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
2969+
+ .set("spark.comet.exec.shuffle.enabled", "true")
2970+
+ .set("spark.comet.memoryOverhead", "10g")
30002971
+ }
30012972
conf.set(
30022973
StaticSQLConf.WAREHOUSE_PATH,
@@ -3093,7 +3064,7 @@ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.sca
30933064
index 07361cfdce9..97dab2a3506 100644
30943065
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
30953066
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
3096-
@@ -55,25 +55,54 @@ object TestHive
3067+
@@ -55,25 +55,41 @@ object TestHive
30973068
new SparkContext(
30983069
System.getProperty("spark.sql.test.master", "local[1]"),
30993070
"TestSQLContext",
@@ -3140,24 +3111,11 @@ index 07361cfdce9..97dab2a3506 100644
31403111
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
31413112
+ .set("spark.comet.enabled", "true")
31423113
+
3143-
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
3144-
+ if (v == null || !v.toBoolean) {
3145-
+ conf
3146-
+ .set("spark.comet.exec.enabled", "true")
3147-
+ .set("spark.shuffle.manager",
3148-
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
3149-
+ .set("spark.comet.exec.shuffle.enabled", "true")
3150-
+ } else {
3151-
+ conf
3152-
+ .set("spark.comet.exec.enabled", "false")
3153-
+ .set("spark.comet.exec.shuffle.enabled", "false")
3154-
+ }
3155-
+
3156-
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
3157-
+ if (a != null && a.toBoolean) {
3158-
+ conf
3159-
+ .set("spark.sql.ansi.enabled", "true")
3160-
+ }
3114+
+ conf
3115+
+ .set("spark.comet.exec.enabled", "true")
3116+
+ .set("spark.shuffle.manager",
3117+
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
3118+
+ .set("spark.comet.exec.shuffle.enabled", "true")
31613119
+ }
31623120

31633121
+ conf

dev/diffs/3.5.8.diff

Lines changed: 18 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1979,9 +1979,9 @@ index 8e88049f51e..20d7ef7b1bc 100644
19791979
// Note that, if record level filtering is enabled, it should be a single record.
19801980
// If no filter is pushed down to Parquet, it should be the total length of data.
19811981
- assert(actual > 1 && actual < data.length)
1982-
+ // Only enable Comet test iff it's scan only, since with native execution
1982+
+ // Skip when Comet is enabled since with native execution
19831983
+ // `stripSparkFilter` can't remove the native filter
1984-
+ if (!isCometEnabled || isCometScanOnly) {
1984+
+ if (!isCometEnabled) {
19851985
+ assert(actual > 1 && actual < data.length)
19861986
+ }
19871987
}
@@ -1992,9 +1992,9 @@ index 8e88049f51e..20d7ef7b1bc 100644
19921992
// Note that, if record level filtering is enabled, it should be a single record.
19931993
// If no filter is pushed down to Parquet, it should be the total length of data.
19941994
- assert(actual > 1 && actual < data.length)
1995-
+ // Only enable Comet test iff it's scan only, since with native execution
1995+
+ // Skip when Comet is enabled since with native execution
19961996
+ // `stripSparkFilter` can't remove the native filter
1997-
+ if (!isCometEnabled || isCometScanOnly) {
1997+
+ if (!isCometEnabled) {
19981998
+ assert(actual > 1 && actual < data.length)
19991999
+ }
20002000
}
@@ -2878,32 +2878,14 @@ index e937173a590..7d20538bc68 100644
28782878
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
28792879
super.test(testName, testTags: _*) {
28802880
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2881-
@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
2881+
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
28822882
protected override def _sqlContext: SQLContext = self.spark.sqlContext
28832883
}
28842884

28852885
+ /**
28862886
+ * Whether Comet extension is enabled
28872887
+ */
28882888
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
2889-
+
2890-
+ /**
2891-
+ * Whether to enable ansi mode This is only effective when
2892-
+ * [[isCometEnabled]] returns true.
2893-
+ */
2894-
+ protected def enableCometAnsiMode: Boolean = {
2895-
+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
2896-
+ v != null && v.toBoolean
2897-
+ }
2898-
+
2899-
+ /**
2900-
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
2901-
+ * [[isCometEnabled]] returns true.
2902-
+ */
2903-
+ protected def isCometScanOnly: Boolean = {
2904-
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
2905-
+ v != null && v.toBoolean
2906-
+ }
29072889
+
29082890
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
29092891
SparkSession.setActiveSession(spark)
@@ -2921,7 +2903,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSessio
29212903
index ed2e309fa07..a5ea58146ad 100644
29222904
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
29232905
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
2924-
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
2906+
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
29252907
// this rule may potentially block testing of other optimization rules such as
29262908
// ConstantPropagation etc.
29272909
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2932,23 +2914,12 @@ index ed2e309fa07..a5ea58146ad 100644
29322914
+ .set("spark.comet.enabled", "true")
29332915
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
29342916
+
2935-
+ if (!isCometScanOnly) {
2936-
+ conf
2937-
+ .set("spark.comet.exec.enabled", "true")
2938-
+ .set("spark.shuffle.manager",
2939-
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
2940-
+ .set("spark.comet.exec.shuffle.enabled", "true")
2941-
+ .set("spark.comet.memoryOverhead", "10g")
2942-
+ } else {
2943-
+ conf
2944-
+ .set("spark.comet.exec.enabled", "false")
2945-
+ .set("spark.comet.exec.shuffle.enabled", "false")
2946-
+ }
2947-
+
2948-
+ if (enableCometAnsiMode) {
2949-
+ conf
2950-
+ .set("spark.sql.ansi.enabled", "true")
2951-
+ }
2917+
+ conf
2918+
+ .set("spark.comet.exec.enabled", "true")
2919+
+ .set("spark.shuffle.manager",
2920+
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
2921+
+ .set("spark.comet.exec.shuffle.enabled", "true")
2922+
+ .set("spark.comet.memoryOverhead", "10g")
29522923
+ }
29532924
conf.set(
29542925
StaticSQLConf.WAREHOUSE_PATH,
@@ -3045,7 +3016,7 @@ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.sca
30453016
index 1d646f40b3e..5babe505301 100644
30463017
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
30473018
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
3048-
@@ -53,25 +53,54 @@ object TestHive
3019+
@@ -53,25 +53,41 @@ object TestHive
30493020
new SparkContext(
30503021
System.getProperty("spark.sql.test.master", "local[1]"),
30513022
"TestSQLContext",
@@ -3092,24 +3063,11 @@ index 1d646f40b3e..5babe505301 100644
30923063
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
30933064
+ .set("spark.comet.enabled", "true")
30943065
+
3095-
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
3096-
+ if (v == null || !v.toBoolean) {
3097-
+ conf
3098-
+ .set("spark.comet.exec.enabled", "true")
3099-
+ .set("spark.shuffle.manager",
3100-
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
3101-
+ .set("spark.comet.exec.shuffle.enabled", "true")
3102-
+ } else {
3103-
+ conf
3104-
+ .set("spark.comet.exec.enabled", "false")
3105-
+ .set("spark.comet.exec.shuffle.enabled", "false")
3106-
+ }
3107-
+
3108-
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
3109-
+ if (a != null && a.toBoolean) {
3110-
+ conf
3111-
+ .set("spark.sql.ansi.enabled", "true")
3112-
+ }
3066+
+ conf
3067+
+ .set("spark.comet.exec.enabled", "true")
3068+
+ .set("spark.shuffle.manager",
3069+
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
3070+
+ .set("spark.comet.exec.shuffle.enabled", "true")
31133071
+ }
31143072

31153073
+ conf

0 commit comments

Comments
 (0)