Skip to content

Commit 1ad838b

Browse files
committed
Merge remote-tracking branch 'apache/main' into java-regexp
# Conflicts:
#   common/src/main/java/org/apache/comet/udf/CometUdfBridge.java
#   common/src/main/scala/org/apache/comet/udf/CometUDF.scala
#   native/jni-bridge/src/lib.rs
#   native/spark-expr/src/jvm_udf/mod.rs
2 parents 6cac094 + 47ec2a6 commit 1ad838b

356 files changed

Lines changed: 3321 additions & 27385 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.asf.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ github:
4141
features:
4242
issues: true
4343
discussions: true
44+
projects: true
4445
protected_branches:
4546
main:
4647
required_pull_request_reviews:

.github/workflows/docker-publish.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,6 @@ jobs:
7474
with:
7575
platforms: linux/amd64,linux/arm64
7676
push: true
77-
tags: ghcr.io/apache/datafusion-comet:spark-3.5-scala-2.12-${{ env.COMET_VERSION }}
77+
tags: ghcr.io/apache/datafusion-comet:spark-4.1-scala-2.13-${{ env.COMET_VERSION }}
7878
file: kube/Dockerfile
7979
no-cache: true

.github/workflows/pr_benchmark_check.yml

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -84,5 +84,9 @@ jobs:
8484
${{ runner.os }}-benchmark-maven-
8585
8686
- name: Check Scala compilation and linting
87+
# Pin to spark-4.0 (Scala 2.13.16) because the default profile is now
88+
# spark-4.1 / Scala 2.13.17, and semanticdb-scalac_2.13.17 is not yet
89+
# published, which breaks `-Psemanticdb`. See pr_build_linux.yml for
90+
# the same exclusion in the main lint matrix.
8791
run: |
88-
./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -DskipTests
92+
./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -Pspark-4.0 -DskipTests

.github/workflows/pr_build_linux.yml

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -355,6 +355,7 @@ jobs:
355355
org.apache.comet.exec.CometWindowExecSuite
356356
org.apache.comet.exec.CometJoinSuite
357357
org.apache.comet.CometNativeSuite
358+
org.apache.comet.CometSetOpWithGroupBySuite
358359
org.apache.comet.CometSparkSessionExtensionsSuite
359360
org.apache.spark.CometPluginsSuite
360361
org.apache.spark.CometPluginsDefaultSuite
@@ -451,6 +452,8 @@ jobs:
451452
runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }}
452453
container:
453454
image: amd64/rust
455+
env:
456+
JAVA_TOOL_OPTIONS: --add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
454457
steps:
455458
- uses: runs-on/action@742bf56072eb4845a0f94b3394673e4903c90ff0 # v2.1.0
456459

@@ -460,7 +463,7 @@ jobs:
460463
uses: ./.github/actions/setup-builder
461464
with:
462465
rust-version: ${{ env.RUST_VERSION }}
463-
jdk-version: 11
466+
jdk-version: 17
464467

465468
- name: Download native library
466469
uses: actions/download-artifact@v8
@@ -505,6 +508,8 @@ jobs:
505508
runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }}
506509
container:
507510
image: amd64/rust
511+
env:
512+
JAVA_TOOL_OPTIONS: --add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
508513
strategy:
509514
matrix:
510515
join: [sort_merge, broadcast, hash]
@@ -518,7 +523,7 @@ jobs:
518523
uses: ./.github/actions/setup-builder
519524
with:
520525
rust-version: ${{ env.RUST_VERSION }}
521-
jdk-version: 11
526+
jdk-version: 17
522527

523528
- name: Download native library
524529
uses: actions/download-artifact@v8

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -194,6 +194,7 @@ jobs:
194194
org.apache.comet.exec.CometWindowExecSuite
195195
org.apache.comet.exec.CometJoinSuite
196196
org.apache.comet.CometNativeSuite
197+
org.apache.comet.CometSetOpWithGroupBySuite
197198
org.apache.comet.CometSparkSessionExtensionsSuite
198199
org.apache.spark.CometPluginsSuite
199200
org.apache.spark.CometPluginsDefaultSuite

common/src/main/java/org/apache/comet/udf/CometUdfBridge.java

Lines changed: 22 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,7 @@
1919

2020
package org.apache.comet.udf;
2121

22-
import java.util.LinkedHashMap;
23-
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
2423

2524
import org.apache.arrow.c.ArrowArray;
2625
import org.apache.arrow.c.ArrowSchema;
@@ -36,23 +35,10 @@
3635
*/
3736
public class CometUdfBridge {
3837

39-
// Per-thread, bounded LRU of UDF instances keyed by class name. Comet
40-
// native execution threads (Tokio/DataFusion worker pool) are reused
41-
// across tasks within an executor, so the effective lifetime of cached
42-
// entries is the worker thread (i.e. the executor JVM). This is fine for
43-
// stateless UDFs like RegExpLikeUDF; future stateful UDFs would need
44-
// explicit per-task isolation.
45-
private static final int CACHE_CAPACITY = 64;
46-
47-
private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
48-
ThreadLocal.withInitial(
49-
() ->
50-
new LinkedHashMap<String, CometUDF>(CACHE_CAPACITY, 0.75f, true) {
51-
@Override
52-
protected boolean removeEldestEntry(Map.Entry<String, CometUDF> eldest) {
53-
return size() > CACHE_CAPACITY;
54-
}
55-
});
38+
// Process-wide cache of UDF instances keyed by class name. CometUDF
39+
// implementations are required to be stateless (see CometUDF), so a
40+
// single shared instance per class is safe across native worker threads.
41+
private static final ConcurrentHashMap<String, CometUDF> INSTANCES = new ConcurrentHashMap<>();
5642

5743
/**
5844
* Called from native via JNI.
@@ -69,23 +55,23 @@ public static void evaluate(
6955
long[] inputSchemaPtrs,
7056
long outArrayPtr,
7157
long outSchemaPtr) {
72-
LinkedHashMap<String, CometUDF> cache = INSTANCES.get();
73-
CometUDF udf = cache.get(udfClassName);
74-
if (udf == null) {
75-
try {
76-
// Resolve via the executor's context classloader so user-supplied UDF jars
77-
// (added via spark.jars / --jars) are visible.
78-
ClassLoader cl = Thread.currentThread().getContextClassLoader();
79-
if (cl == null) {
80-
cl = CometUdfBridge.class.getClassLoader();
81-
}
82-
udf =
83-
(CometUDF) Class.forName(udfClassName, true, cl).getDeclaredConstructor().newInstance();
84-
} catch (ReflectiveOperationException e) {
85-
throw new RuntimeException("Failed to instantiate CometUDF: " + udfClassName, e);
86-
}
87-
cache.put(udfClassName, udf);
88-
}
58+
CometUDF udf =
59+
INSTANCES.computeIfAbsent(
60+
udfClassName,
61+
name -> {
62+
try {
63+
// Resolve via the executor's context classloader so user-supplied UDF jars
64+
// (added via spark.jars / --jars) are visible.
65+
ClassLoader cl = Thread.currentThread().getContextClassLoader();
66+
if (cl == null) {
67+
cl = CometUdfBridge.class.getClassLoader();
68+
}
69+
return (CometUDF)
70+
Class.forName(name, true, cl).getDeclaredConstructor().newInstance();
71+
} catch (ReflectiveOperationException e) {
72+
throw new RuntimeException("Failed to instantiate CometUDF: " + name, e);
73+
}
74+
});
8975

9076
BufferAllocator allocator = org.apache.comet.package$.MODULE$.CometArrowAllocator();
9177

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -94,12 +94,9 @@ object CometConf extends ShimCometConf {
9494
.createWithEnvVarOrDefault("ENABLE_COMET", true)
9595

9696
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
97-
.category(CATEGORY_SCAN)
98-
.doc(
99-
"Whether to enable native scans. When this is turned on, Spark will use Comet to " +
100-
"read supported data sources (currently only Parquet is supported natively). Note " +
101-
"that to enable native vectorized execution, both this config and " +
102-
"`spark.comet.exec.enabled` need to be enabled.")
97+
.category(CATEGORY_TESTING)
98+
.doc("Whether to enable native scans. Intended for use in Comet's own test suites to " +
99+
"selectively disable native scans; not intended for production use.")
103100
.booleanConf
104101
.createWithDefault(true)
105102

common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,12 @@ object CometParquetUtils {
2929
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
3030
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
3131

32+
// Field-metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow schema
33+
// (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's local key for the same concept is
34+
// `parquet.field.id` (`ParquetUtils.FIELD_ID_METADATA_KEY`). The serde translates at the proto
35+
// boundary so the native side can match the same key it gets from arrow-rs.
36+
val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id"
37+
3238
// Map of encryption configuration key-value pairs that, if present, are only supported with
3339
// these specific values. Generally, these are the default values that won't be present,
3440
// but if they are present we want to check them.
Lines changed: 58 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.udf
21+
22+
import java.util.UUID
23+
import java.util.concurrent.ConcurrentHashMap
24+
25+
import org.apache.spark.sql.catalyst.expressions.Expression
26+
27+
/**
28+
* Thread-safe registry bridging plan-time Spark expressions to execution-time UDF lookup. At plan
29+
* time the serde layer registers a lambda expression under a unique key; at execution time the
30+
* UDF retrieves it by that key (passed as a scalar argument).
31+
*/
32+
object CometLambdaRegistry {
33+
34+
private val registry = new ConcurrentHashMap[String, Expression]()
35+
36+
def register(expression: Expression): String = {
37+
val key = UUID.randomUUID().toString
38+
registry.put(key, expression)
39+
key
40+
}
41+
42+
def get(key: String): Expression = {
43+
val expr = registry.get(key)
44+
if (expr == null) {
45+
throw new IllegalStateException(
46+
s"Lambda expression not found in registry for key: $key. " +
47+
"This indicates a lifecycle issue between plan creation and execution.")
48+
}
49+
expr
50+
}
51+
52+
def remove(key: String): Unit = {
53+
registry.remove(key)
54+
}
55+
56+
// Visible for testing
57+
def size(): Int = registry.size()
58+
}

common/src/main/scala/org/apache/comet/udf/CometUDF.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,8 @@ import org.apache.arrow.vector.ValueVector
2929
* - Scalar (literal-folded) arguments arrive as length-1 vectors and must be read at index 0.
3030
* - The returned vector's length must match the longest input.
3131
*
32-
* Implementations must have a public no-arg constructor and should be stateless: instances are
33-
* cached per executor thread for the lifetime of the JVM.
32+
* Implementations must have a public no-arg constructor and must be stateless: a single instance
33+
* per class is cached and shared across native worker threads for the lifetime of the JVM.
3434
*/
3535
trait CometUDF {
3636
def evaluate(inputs: Array[ValueVector]): ValueVector

0 commit comments

Comments
 (0)