
Commit f6b4096

Merge branch 'main' of github.com:apache/datafusion-comet into worktree-pr-4239
# Conflicts:
#   spark/src/main/scala/org/apache/comet/udf/RegExpExtractAllUDF.scala
#   spark/src/main/scala/org/apache/comet/udf/RegExpExtractUDF.scala
#   spark/src/main/scala/org/apache/comet/udf/RegExpInStrUDF.scala
#   spark/src/main/scala/org/apache/comet/udf/RegExpLikeUDF.scala
#   spark/src/main/scala/org/apache/comet/udf/RegExpReplaceUDF.scala
#   spark/src/main/scala/org/apache/comet/udf/StringSplitUDF.scala
2 parents 0fa237f + fbc3d2f commit f6b4096

540 files changed

Lines changed: 4418 additions & 20711 deletions

Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
1+
---
2+
name: wire-datafusion-function
3+
description: Use when wiring an existing DataFusion or datafusion-spark function into Comet for a Spark expression. Identifies the right wiring pattern (one-line passthrough, datafusion-spark UDF registration, or custom serde with input massaging / restrictions), applies the Scala serde, registers the UDF in jni_api when needed, and adds SQL file tests. Assumes the function already exists upstream — if not, switch to `implement-comet-expression`.
4+
argument-hint: <expression-name>
5+
---
6+
7+
Wire Comet support for the `$ARGUMENTS` Spark expression by reusing an existing DataFusion or `datafusion-spark` function. **No native Rust implementation is written by this skill** — if upstream coverage is missing, stop and run `implement-comet-expression` instead.
8+
9+
## Background reading
10+
11+
- `docs/source/contributor-guide/adding_a_new_expression.md` — Scala serde, protobuf, support levels, SQL tests.
12+
- `docs/source/contributor-guide/sql-file-tests.md` — Comet SQL Tests format.
13+
- `spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala` — trait every serde implements.
14+
- `spark/src/main/scala/org/apache/comet/serde/CometScalarFunction.scala` — the one-line passthrough wrapper.
15+
- `spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala`: `Compatible` / `Incompatible` / `Unsupported`.
16+
17+
## Wiring patterns
18+
19+
Pick the lightest pattern that satisfies the Spark contract. Steps below tell you which one to use.
20+
21+
### Pattern A — One-line passthrough (cleanest)
22+
23+
Spark expression maps directly to a function name resolvable at runtime. The function lives in either `datafusion-functions` (registered by default in `SessionContext`) or in `datafusion-spark` and is **already** registered in `register_datafusion_spark_function`. Single line in the right map of `QueryPlanSerde.scala`:
24+
25+
```scala
26+
classOf[Acos] -> CometScalarFunction("acos"), // datafusion-functions built-in
27+
classOf[Crc32] -> CometScalarFunction("crc32"), // datafusion-spark SparkCrc32 (already registered)
28+
classOf[DateAdd] -> CometScalarFunction[DateAdd]("date_add"), // see datetime.scala
29+
```
30+
31+
### Pattern B — Pattern A plus a one-line UDF registration
32+
33+
Same one-liner in `QueryPlanSerde.scala`, **plus** registering the new `datafusion-spark` UDF in `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`:
34+
35+
```rust
36+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkXyz::default()));
37+
```
38+
39+
Function names must match — the Scala name passed to `CometScalarFunction("xyz")` must equal `SparkXyz::name()` in the upstream crate.
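
The name check above can be sketched as a small shell helper. This is a minimal sketch, not part of the skill: `upstream_udf_name` and `names_match` are hypothetical names, and it assumes the upstream source returns the name as the first string literal within two lines of `fn name`.

```bash
# Hypothetical helper: print the first string literal found at or just after
# `fn name` in a Rust source file (assumes the literal is within two lines).
upstream_udf_name() {
  grep -A2 'fn name' "$1" | grep -o '"[^"]*"' | head -n 1 | tr -d '"'
}

# Hypothetical helper: succeed only if the Scala-side name ($1) equals the
# name reported by the upstream Rust file ($2).
names_match() {
  [ "$(upstream_udf_name "$2")" = "$1" ]
}
```

For example, `names_match xyz path/to/xyz.rs` would succeed only when the file's `fn name` body returns `"xyz"`. A file that defines several UDFs would need a narrower search than this sketch performs.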
40+
41+
### Pattern C — Custom `CometExpressionSerde`
42+
43+
Use when the Spark contract requires any of:
44+
45+
- preprocessing inputs (e.g. wrap with `nullIfNegative`, add `+0.0` to flip `-0.0`, cast),
46+
- setting an explicit return type or `fail_on_error` flag → use `scalarFunctionExprToProtoWithReturnType`,
47+
- restricting supported input types → implement `getSupportLevel` returning `Unsupported(notes)` / `Incompatible(notes)`,
48+
- foldable / literal-only argument checks (e.g. `Sha2.numBits` must be a literal),
49+
- documenting always-on differences via `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`.
50+
51+
Examples to study before writing one: `CometCeil` / `CometFloor` (decimal scale fallback), `CometAtan2` (input massaging), `CometLog` (null-if-negative), `CometSha2` (literal-only argument), `CometAbs` (`getSupportLevel`).
52+
53+
## Workflow
54+
55+
### 1. Study the Spark master implementation
56+
57+
Same as step 1 of `implement-comet-expression`. Shallow-clone if missing, then grep for the expression class. The class name is PascalCase (e.g. `Csc`, not `csc`); capitalize `$ARGUMENTS` before grepping, or use `-i`:
58+
59+
```bash
60+
if [ ! -d /tmp/spark-master ]; then
61+
git clone --depth 1 https://github.com/apache/spark.git /tmp/spark-master
62+
fi
63+
64+
CLASS_NAME="$(printf '%s' "$ARGUMENTS" | awk '{print toupper(substr($0,1,1)) substr($0,2)}')"
65+
find /tmp/spark-master/sql -name "*.scala" | \
66+
xargs grep -l "case class ${CLASS_NAME}\b\|object ${CLASS_NAME}\b" 2>/dev/null
67+
```
68+
69+
If the `git clone` is blocked (e.g. sandboxed environment), **stop and ask the user for an alternative source** — typically a local Spark clone path. Then point grep at that path instead. Do not silently skip Spark study — the Spark contract is the ground truth.
70+
71+
Note `inputTypes`, `dataType`, `eval` / `nullSafeEval`, ANSI mode branches, `require` guards, and any foldable-only arguments. These define the Spark contract Comet must match.
72+
73+
### 2. Find the upstream function
74+
75+
Read the pinned `datafusion-spark` version from `native/Cargo.toml` rather than picking the first cached copy — `head -1` can land on an older release that lacks the function:
76+
77+
```bash
78+
REPO_ROOT=$(git rev-parse --show-toplevel)
79+
DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark[[:space:]]*=/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
80+
DF_SPARK=$(ls -d ~/.cargo/registry/src/*/datafusion-spark-${DF_SPARK_VER}/ 2>/dev/null | head -1)
81+
echo "Using datafusion-spark $DF_SPARK_VER at $DF_SPARK"
82+
83+
DF_FUNCS_VER=$(awk -F'"' '/^datafusion = / {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
84+
DF_FUNCS=$(ls -d ~/.cargo/registry/src/*/datafusion-functions-${DF_FUNCS_VER}/ 2>/dev/null | head -1)
85+
86+
grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$ARGUMENTS"
87+
grep -rin "fn name" "$DF_FUNCS/src/" 2>/dev/null | grep -i "$ARGUMENTS"
88+
```
89+
90+
If the cached crate directory does not exist (fresh checkout), run `cargo fetch` from `native/` first, or fall back to the latest cached version and verify the function exists in the pinned version's git tag before relying on it.
91+
92+
Verify that the candidate function's `Signature`, return type, and behavior match Spark across all relevant edge cases (NULL, overflow, non-finite floats, decimal scale, locale, ANSI mode). If semantics diverge in a way the Scala serde can't bridge with preprocessing or restrictions → **stop and run `implement-comet-expression`** instead.
93+
94+
For datafusion-spark candidates, also check whether the UDF is already pre-registered: see `register_datafusion_spark_function` in `native/core/src/execution/jni_api.rs`. If listed, you only need Pattern A. If missing, you need Pattern B.
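
The registration check can be approximated with a fixed-string grep. This is a rough sketch under stated assumptions: `is_registered` and `suggest_pattern` are hypothetical helpers, and the match assumes registrations are written as `SparkXyz::default()`, as in the Pattern B example. It only distinguishes Pattern A from Pattern B; whether Pattern C is needed still depends on the Spark contract.

```bash
# Hypothetical helper: succeed if the UDF struct ($1) appears as `Struct::`
# in the given jni_api.rs ($2). Matching the trailing `::` avoids counting a
# bare `use ...::SparkXyz;` import as a registration.
is_registered() {
  grep -qF "${1}::" "$2"
}

# Hypothetical helper: print "A" when already registered, otherwise "B".
suggest_pattern() {
  if is_registered "$1" "$2"; then
    echo "A"
  else
    echo "B"
  fi
}
```

A call such as `suggest_pattern SparkCrc32 native/core/src/execution/jni_api.rs` would print `A` if the struct is registered in that form. Treat the result as a hint and confirm by reading the function body.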
95+
96+
### 3. Pick the wiring pattern
97+
98+
| Situation | Pattern |
99+
| ----------------------------------------------------------------------------------------------------- | ------- |
100+
| DF built-in (e.g. `acos`, `md5`, `replace`) matches Spark; or datafusion-spark UDF already registered | **A** |
101+
| New `datafusion-spark` UDF, semantics already match Spark | **B** |
102+
| Inputs need preprocessing, restrictions, or explicit return type / failOnError | **C** |
103+
104+
When in doubt, start with the lightest pattern and escalate only if the Spark contract demands it.
105+
106+
### 4. Apply the Scala wiring
107+
108+
Add the entry to the matching map in `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala`:
109+
110+
- `mathExpressions`, `stringExpressions`, `arrayExpressions`, `mapExpressions`, `structExpressions`, `predicateExpressions`, `conditionalExpressions`, `bitwiseExpressions`, `temporalExpressions`, `hashExpressions`, `conversionExpressions`, `miscExpressions`.
111+
112+
Confirm the Spark expression class is in scope. `QueryPlanSerde.scala` already imports `org.apache.spark.sql.catalyst.expressions._` (wildcard), so most expressions are picked up for free. For Pattern C, the topic file (`math.scala`, `strings.scala`, `datetime.scala`, `hash.scala`, `arrays.scala`, etc.) uses **explicit** imports — add the new class name to the existing `import org.apache.spark.sql.catalyst.expressions.{...}` line, and place the serde object alongside its peers.
113+
114+
Helpers from `QueryPlanSerde`:
115+
116+
- `scalarFunctionExprToProto(funcName, args*)` — basic call, `failOnError = false`, return type inferred from args.
117+
- `scalarFunctionExprToProtoWithReturnType(funcName, returnType, failOnError, args*)` — set explicit return type and/or `fail_on_error`.
118+
- `optExprWithInfo(optExpr, expr, children*)` — attaches the "why we couldn't convert" tag if any child failed; always wrap your final result.
119+
- `withInfo(expr, "reason")` — tag a fallback reason when returning `None`.
120+
121+
For `getSupportLevel`, returning `Incompatible(Some("..."))` gates the function behind `spark.comet.expr.<exprConfigName>.allowIncompatible=true`. `Unsupported(notes)` always falls back to Spark.
122+
123+
### 5. Apply the native registration (Pattern B only)
124+
125+
In `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`, add one line with the upstream import already in scope (or extend the imports). Keep the list grouped logically with neighboring entries.
126+
127+
Pattern A and Pattern C **never** touch native code.
128+
129+
### 6. Build and smoke-test
130+
131+
Build the project (the user generally runs tests themselves — do not run them proactively unless asked):
132+
133+
```bash
134+
make
135+
```
136+
137+
If the user asks for a smoke test:
138+
139+
```bash
140+
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite $ARGUMENTS" -Dtest=none
141+
```
142+
143+
### 7. Add SQL file tests
144+
145+
Create `spark/src/test/resources/sql-tests/expressions/<category>/$ARGUMENTS.sql`. Coverage requirements (mandatory — do not skip):
146+
147+
1. **Inspect the Spark source for accepted input datatypes.** Look at `inputTypes`, the parent class (`UnaryMathExpression`, `BinaryExpression`, `ImplicitCastInputTypes`, `ExpectsInputTypes`), and any `dataType` overrides. Add one column per supported type — e.g. for a numeric function: `ByteType`, `ShortType`, `IntegerType`, `LongType`, `FloatType`, `DoubleType`, `DecimalType` (varying precision/scale). For a string function: also cover `BinaryType` if accepted.
148+
2. **All `SELECT`s must read from a parquet table.** Do not write literal-only queries like `SELECT f(1.0), f(NULL)` — encode the values you want to test as rows in a parquet-backed table and query that. This exercises the real read path, not just the constant-folder.
149+
3. **Mix `NULL` into every column** so the kernel exercises the validity bitmap, not just the all-valid fast path.
150+
4. **Floating-point (`FloatType` / `DoubleType`)** columns must include: `NaN`, `+0.0`, `-0.0`, `-Infinity`, `+Infinity`. Use `cast('NaN' as double)`, `cast('Infinity' as double)`, `cast('-Infinity' as double)`.
151+
5. **Integer / decimal** columns must include each type's max and min (e.g. `127` / `-128` for byte, `2147483647` / `-2147483648` for int, `9223372036854775807` / `-9223372036854775808` for long, `Decimal(38, 0)` boundary for decimal) so overflow / wrap behavior is exercised.
152+
6. Cover the **Spark-specific edge cases identified in step 1** (negative scale, empty string, ANSI-mode error paths, etc.) — again, as rows in the parquet table.
153+
154+
Use `query expect_fallback(...)` for inputs where the serde intentionally returns `None`. Format described in `docs/source/contributor-guide/sql-file-tests.md`.
155+
156+
### 8. Update the documentation
157+
158+
Two files are hand-curated (not regenerated by `make`) and must be updated for every newly wired expression:
159+
160+
- `docs/source/user-guide/latest/expressions.md` — add a row to the matching category table (`## Math Expressions`, `## String Expressions`, etc.) in alphabetical order: `| <ExpressionClass> | \`<sql_name>\` |`.
161+
- `docs/source/contributor-guide/spark_expressions_support.md` — flip the entry from `- [ ] <name>` to `- [x] <name>` in the matching category section.
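
Flipping the checklist entry can be scripted. The following is a minimal sketch, assuming entries have exactly the form `- [ ] <name>` (optionally indented) and that the name contains no regex metacharacters. `mark_supported` is a hypothetical helper, not an existing project script.

```bash
# Hypothetical helper: rewrite "- [ ] <name>" to "- [x] <name>" in a file.
# Uses a temp file instead of `sed -i`, whose syntax differs between GNU and BSD sed.
mark_supported() {
  local name="$1" file="$2" tmp
  tmp="$(mktemp)"
  # The end-of-line anchor keeps "acos" from also matching "acosh".
  sed "s/^\([[:space:]]*\)- \[ \] ${name}[[:space:]]*\$/\1- [x] ${name}/" "$file" > "$tmp" \
    && mv "$tmp" "$file"
}
```

For instance, `mark_supported acos docs/source/contributor-guide/spark_expressions_support.md` would tick only the exact `acos` entry. Review the resulting diff before committing, since the real file layout may differ from the assumed form.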
162+
163+
Per-category compatibility pages under `docs/source/user-guide/latest/compatibility/expressions/*.md` are auto-generated by `GenerateDocs.scala` from `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`. Do not hand-edit those — `make` regenerates them.
164+
165+
### 9. Run the audit skill
166+
167+
Run `audit-comet-expression` on `$ARGUMENTS`. It compares against Spark 3.4.3, 3.5.8, and 4.0.1 and produces a prioritized gap list — usually missing test coverage. Iterate on tests until the user is satisfied.
168+
169+
### 10. Final checks
170+
171+
```bash
172+
make format
173+
cd native && cargo clippy --all-targets --workspace -- -D warnings
174+
```
175+
176+
### 11. Open the PR
177+
178+
Fill in every section of `.github/pull_request_template.md`. In "What changes are included in this PR?", note that the `wire-datafusion-function` skill was used and call out which upstream function (DataFusion built-in or `datafusion-spark::function::...`) is being wired.

.github/actions/java-test/action.yaml

Lines changed: 0 additions & 6 deletions
@@ -29,10 +29,6 @@ inputs:
2929
description: 'Maven options passed to the mvn command'
3030
required: false
3131
default: ''
32-
scan_impl:
33-
description: 'The default Parquet scan implementation'
34-
required: false
35-
default: 'auto'
3632
upload-test-reports:
3733
description: 'Whether to upload test results including coverage to GitHub'
3834
required: false
@@ -72,7 +68,6 @@ runs:
7268
shell: bash
7369
if: ${{ inputs.suites == '' }}
7470
env:
75-
COMET_PARQUET_SCAN_IMPL: ${{ inputs.scan_impl }}
7671
SPARK_LOCAL_HOSTNAME: "localhost"
7772
SPARK_LOCAL_IP: "127.0.0.1"
7873
run: |
@@ -81,7 +76,6 @@ runs:
8176
shell: bash
8277
if: ${{ inputs.suites != '' }}
8378
env:
84-
COMET_PARQUET_SCAN_IMPL: ${{ inputs.scan_impl }}
8579
SPARK_LOCAL_HOSTNAME: "localhost"
8680
SPARK_LOCAL_IP: "127.0.0.1"
8781
run: |

.github/actions/setup-spark-builder/action.yaml

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ runs:
5151
path: |
5252
~/.m2/repository
5353
/root/.m2/repository
54-
key: ${{ runner.os }}-spark-sql-${{ hashFiles('spark/**/pom.xml', 'common/**/pom.xml') }}
54+
key: ${{ runner.os }}-spark-sql-${{ hashFiles('common/**/pom.xml', 'spark/**/pom.xml') }}
5555
restore-keys: |
5656
${{ runner.os }}-spark-sql-
5757

.github/workflows/codeql.yml

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ permissions:
3636
jobs:
3737
analyze:
3838
name: Analyze Actions
39-
runs-on: ubuntu-24.04
39+
runs-on: ubuntu-slim
4040
permissions:
4141
contents: read
4242
security-events: write

.github/workflows/iceberg_spark_test.yml

Lines changed: 0 additions & 4 deletions
@@ -30,11 +30,9 @@ on:
3030
- "doc/**"
3131
- "docs/**"
3232
- "**.md"
33-
- "dev/changelog/*.md"
3433
- "native/core/benches/**"
3534
- "native/spark-expr/benches/**"
3635
- "spark/src/test/**"
37-
- "common/src/test/**"
3836
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
3937
- "spark-integration/**"
4038
pull_request:
@@ -43,11 +41,9 @@ on:
4341
- "doc/**"
4442
- "docs/**"
4543
- "**.md"
46-
- "dev/changelog/*.md"
4744
- "native/core/benches/**"
4845
- "native/spark-expr/benches/**"
4946
- "spark/src/test/**"
50-
- "common/src/test/**"
5147
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
5248
- "spark-integration/**"
5349
# manual trigger

.github/workflows/label_new_issues.yml

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ permissions:
2626

2727
jobs:
2828
add-triage-label:
29-
runs-on: ubuntu-latest
29+
runs-on: ubuntu-slim
3030
steps:
3131
- uses: actions/github-script@v9
3232
with:

.github/workflows/miri.yml

Lines changed: 0 additions & 2 deletions
@@ -29,7 +29,6 @@ on:
2929
- "doc/**"
3030
- "docs/**"
3131
- "**.md"
32-
- "dev/changelog/*.md"
3332
- "native/core/benches/**"
3433
- "native/spark-expr/benches/**"
3534
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -39,7 +38,6 @@ on:
3938
- "doc/**"
4039
- "docs/**"
4140
- "**.md"
42-
- "dev/changelog/*.md"
4341
- "native/core/benches/**"
4442
- "native/spark-expr/benches/**"
4543
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"

.github/workflows/pr_build_linux.yml

Lines changed: 3 additions & 11 deletions
@@ -30,7 +30,6 @@ on:
3030
- "doc/**"
3131
- "docs/**"
3232
- "**.md"
33-
- "dev/changelog/*.md"
3433
- "native/core/benches/**"
3534
- "native/spark-expr/benches/**"
3635
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -41,7 +40,6 @@ on:
4140
- "doc/**"
4241
- "docs/**"
4342
- "**.md"
44-
- "dev/changelog/*.md"
4543
- "native/core/benches/**"
4644
- "native/spark-expr/benches/**"
4745
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -191,7 +189,7 @@ jobs:
191189
uses: ./.github/actions/setup-builder
192190
with:
193191
rust-version: ${{ env.RUST_VERSION }}
194-
jdk-version: 17 # JDK only needed for common module proto generation
192+
jdk-version: 17 # JDK only needed for JVM module proto generation
195193

196194
- name: Restore Cargo cache
197195
uses: actions/cache/restore@v5
@@ -281,27 +279,22 @@ jobs:
281279
- name: "Spark 3.4, JDK 11, Scala 2.12"
282280
java_version: "11"
283281
maven_opts: "-Pspark-3.4 -Pscala-2.12"
284-
scan_impl: "auto"
285282

286283
- name: "Spark 3.5, JDK 17, Scala 2.13"
287284
java_version: "17"
288285
maven_opts: "-Pspark-3.5 -Pscala-2.13"
289-
scan_impl: "native_iceberg_compat"
290286

291287
- name: "Spark 4.0, JDK 21"
292288
java_version: "21"
293289
maven_opts: "-Pspark-4.0"
294-
scan_impl: "auto"
295290

296291
- name: "Spark 4.1, JDK 17"
297292
java_version: "17"
298293
maven_opts: "-Pspark-4.1"
299-
scan_impl: "auto"
300294

301295
- name: "Spark 4.2, JDK 17"
302296
java_version: "17"
303297
maven_opts: "-Pspark-4.2"
304-
scan_impl: "auto"
305298
suite:
306299
- name: "fuzz"
307300
value: |
@@ -393,7 +386,7 @@ jobs:
393386
org.apache.spark.sql.CometToPrettyStringSuite
394387
org.apache.spark.sql.CometCollationSuite
395388
fail-fast: false
396-
name: ${{ matrix.profile.name }}/${{ matrix.profile.scan_impl }} [${{ matrix.suite.name }}]
389+
name: ${{ matrix.profile.name }} [${{ matrix.suite.name }}]
397390
runs-on: ubuntu-24.04
398391
container:
399392
image: amd64/rust
@@ -430,10 +423,9 @@ jobs:
430423
- name: Java test steps
431424
uses: ./.github/actions/java-test
432425
with:
433-
artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }}
426+
artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
434427
suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }}
435428
maven_opts: ${{ matrix.profile.maven_opts }}
436-
scan_impl: ${{ matrix.profile.scan_impl }}
437429
upload-test-reports: true
438430
skip-native-build: true
439431

.github/workflows/pr_build_macos.yml

Lines changed: 0 additions & 2 deletions
@@ -30,7 +30,6 @@ on:
3030
- "doc/**"
3131
- "docs/**"
3232
- "**.md"
33-
- "dev/changelog/*.md"
3433
- "native/core/benches/**"
3534
- "native/spark-expr/benches/**"
3635
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -41,7 +40,6 @@ on:
4140
- "doc/**"
4241
- "docs/**"
4342
- "**.md"
44-
- "dev/changelog/*.md"
4543
- "native/core/benches/**"
4644
- "native/spark-expr/benches/**"
4745
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"

.github/workflows/pr_markdown_format.yml

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ on:
2828

2929
jobs:
3030
prettier-check:
31-
runs-on: ubuntu-24.04
31+
runs-on: ubuntu-slim
3232
steps:
3333
- uses: actions/checkout@v6
3434
