Closed
32 commits
43d5fb9
feat: Support Spark Expression Decode (#4284)
YutaLin May 15, 2026
9c76e87
feat: support stateful CometUDFs (#4345)
mbutrovich May 15, 2026
56aa4ca
feat: Wire DataFusion function Claude skill and `csc` implemented usi…
comphead May 15, 2026
526d216
docs: move changelogs from dev/ to docs/source/changelog/ (#4330)
andygrove May 15, 2026
323b6d7
feat: support current timezone (#4348)
YutaLin May 15, 2026
1deb1d2
deps: bump arrow and parquet to 58.3.0 (#4346)
mbutrovich May 15, 2026
32126c9
docs: add versioning policy (#4324)
andygrove May 16, 2026
dc08a96
fix: complete native_datafusion Parquet schema-mismatch rejections (#…
andygrove May 17, 2026
3209b98
feat: verify local_timestamp (#4331)
YutaLin May 17, 2026
b1c586a
feat: add from_utc_timestamp and to_utc_timestamp expressions (#4308)
andygrove May 17, 2026
0cec8f8
fix: configurable fallback when parquet vectorized reader is disabled…
andygrove May 18, 2026
ffcebaa
ci: fix Spark 4.0.2/JDK 21 flake by enabling per-suite dedicated JVMs…
andygrove May 18, 2026
64b5ac3
refactor: Move most of `comet-common` module into `comet-spark` (#4325)
andygrove May 18, 2026
0ca37e1
feat: add support for `posexplode` and `posexplode_outer` (#4270)
andygrove May 18, 2026
7203fb5
chore: Remove config option for `native_iceberg_compat` (#4019)
andygrove May 19, 2026
fbc3d2f
docs: remove references to native_datafusion and native_iceberg_compa…
andygrove May 19, 2026
0fd52b7
chore: remove dead native_iceberg_compat code path (#4363)
andygrove May 19, 2026
b7df937
feat: disable Comet by default when CometShuffleManager is not regist…
andygrove May 19, 2026
50791de
feat: add GroupsAccumulator for variance, stddev, covariance, correla…
andygrove May 19, 2026
9e5af5a
test: let Spark 4 tests use the Spark-default ANSI mode (#4370)
andygrove May 20, 2026
b993c0f
feat: wire `factorial` and update wire skill (#4349)
comphead May 20, 2026
2e8cc34
feat: Support Spark expression: convert_timezone (#4369)
YutaLin May 20, 2026
b23b760
feat: implement make_time and to_time (#4256)
parthchandra May 20, 2026
41d2e0c
feat: adding math sec expression (#4371)
athlcode May 20, 2026
c7cee9b
test: enable nested array cast coverage (#4278)
manuzhang May 20, 2026
271a4f4
chore(deps): bump the all-other-cargo-deps group across 1 directory w…
dependabot[bot] May 21, 2026
ce1b9d4
feat: implement parse_url (#4350)
parthchandra May 21, 2026
f3387fe
feat(experimental): ScalaUDF and Java UDF support via Janino codegen …
mbutrovich May 21, 2026
e11fdbe
fix: allow safe mixed Spark/Comet partial/final aggregate execution (…
andygrove May 21, 2026
a675092
chore: wire `rint` (#4372)
comphead May 21, 2026
184a883
test: fix merge conflicts in CodegenFuzzSuite and CodegenSuite (#4388)
mbutrovich May 21, 2026
8d59cb4
chore: DF54 upgrade
comphead May 21, 2026
131 changes: 131 additions & 0 deletions .claude/skills/wire-datafusion-function/SKILL.md
@@ -0,0 +1,131 @@
---
name: wire-datafusion-function
description: Use when wiring an existing DataFusion or datafusion-spark function into Comet for a Spark expression. Identifies the right wiring pattern (one-line passthrough, datafusion-spark UDF registration, or custom serde with input massaging / restrictions), applies the Scala serde, registers the UDF in jni_api when needed, and adds SQL file tests. Assumes the function already exists upstream — if not, switch to `implement-comet-expression`.
argument-hint: <expression-name>
---

Wire Comet support for the `$ARGUMENTS` Spark expression by reusing an existing DataFusion or `datafusion-spark` function. **No native Rust is written here** — if upstream coverage is missing, stop and run `implement-comet-expression`.

## Wiring patterns

Pick the lightest one that satisfies the Spark contract.

| Pattern | When | What to change |
| ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **A** — passthrough | DF built-in (e.g. `acos`), or `datafusion-spark` UDF already registered in `register_datafusion_spark_function` | one line in the right map of `QueryPlanSerde.scala`: `classOf[Foo] -> CometScalarFunction("foo")` |
| **B** — register + passthrough | `datafusion-spark` UDF, _not_ yet registered, semantics already match Spark | Pattern A line **plus** `session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFoo::default()));` in `native/core/src/execution/jni_api.rs::register_datafusion_spark_function` |
| **C** — custom serde | Inputs need preprocessing (cast, `nullIfNegative`, `+0.0` flip), or you need to set return type / `failOnError`, restrict input types via `getSupportLevel`, enforce foldable-only args, or attach `getCompatibleNotes`/`getIncompatibleReasons`/`getUnsupportedReasons` | new `CometXxx` object in the topic file (`math.scala`, `strings.scala`, …); see `CometCeil`, `CometAtan2`, `CometLog`, `CometSha2`, `CometAbs` |

Function names must match: the string passed to `CometScalarFunction("xyz")` equals `SparkXyz::name()` upstream.

## Workflow

### 1. Study the Spark contract

Find the `case class $ARGUMENTS` (PascalCase). Prefer the user's local Spark clone (`CLAUDE.md` / project memory); fall back to `git clone --depth 1 https://github.com/apache/spark.git /tmp/spark-master`. If the clone is sandbox-blocked and no local clone is found, **stop and ask** — Spark is the ground truth.

Note `inputTypes`, `dataType`, `eval` / `nullSafeEval`, `require` guards, ANSI mode branches, and foldable-only arguments. These define what Comet must reproduce.

### 2. Find the upstream function (at the pinned version)

The pinned versions in `native/Cargo.toml` are the ground truth — a function on upstream `main` may not exist in the version Comet depends on. Resolve versions first, then search sources matching those exact versions. **Pick one source path; do not mix.**

```bash
REPO_ROOT=$(git rev-parse --show-toplevel)
# Portable regex — BSD awk on macOS does not support \b.
DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark[ =]/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
DF_FUNCS_VER=$(awk -F'"' '/^datafusion[ =]/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
[ -z "$DF_SPARK_VER" ] || [ -z "$DF_FUNCS_VER" ] && { echo "ERROR: version extraction failed"; exit 1; }
EXPR='$ARGUMENTS'
```
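
As a quick sanity check of the extraction above, the sketch below runs the same two `awk` programs against a throwaway file. The `Cargo.toml` fragment and its version numbers are invented for illustration and are not the versions Comet pins.

```bash
# Hypothetical Cargo.toml fragment; the version numbers are made up for illustration.
TMP_TOML=$(mktemp)
cat > "$TMP_TOML" <<'EOF'
[workspace.dependencies]
datafusion = { version = "54.0.0", default-features = false }
datafusion-spark = { version = "54.1.0" }
EOF

# Same awk programs as above: split each line on double quotes and print
# the first quoted value of the first matching dependency line.
DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark[ =]/ {print $2; exit}' "$TMP_TOML")
DF_FUNCS_VER=$(awk -F'"' '/^datafusion[ =]/ {print $2; exit}' "$TMP_TOML")

echo "datafusion-spark: $DF_SPARK_VER"
echo "datafusion:       $DF_FUNCS_VER"
rm -f "$TMP_TOML"
```

The `[ =]` after the crate name is what keeps the `datafusion` pattern from also matching the `datafusion-spark` line. Because the program prints the first quoted string on the line, a dependency declared with another quoted field before `version` (for example a `git = "..."` URL) would yield that field instead, so it is worth eyeballing the two values before searching.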

**Option A — cargo registry (preferred):**

```bash
DF_SPARK=$(ls -d ~/.cargo/registry/src/*/datafusion-spark-${DF_SPARK_VER}/ 2>/dev/null | head -1)
DF_FUNCS=$(ls -d ~/.cargo/registry/src/*/datafusion-functions-${DF_FUNCS_VER}/ 2>/dev/null | head -1)
# If empty, run `cargo fetch` from native/.
grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$EXPR"
grep -rin "fn name" "$DF_FUNCS/src/" 2>/dev/null | grep -i "$EXPR"
```
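
To see what a hit from the Option A pipeline looks like, here is a minimal sketch against a mock crate directory. The directory layout, the `SparkFactorial` type, and the file contents are hypothetical stand-ins, not the real `datafusion-spark` sources.

```bash
# Hypothetical stand-in for an unpacked datafusion-spark crate; paths and
# contents are illustrative only.
DF_SPARK=$(mktemp -d)
mkdir -p "$DF_SPARK/src/function/math"
cat > "$DF_SPARK/src/function/math/factorial.rs" <<'EOF'
impl ScalarUDFImpl for SparkFactorial {
    fn name(&self) -> &str {
        "factorial"
    }
}
EOF

EXPR='factorial'
# Same pipeline as Option A: the first grep lists `fn name` lines prefixed with
# their file path and line number, the second keeps hits mentioning the expression.
MATCHES=$(grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$EXPR")
echo "$MATCHES"
rm -rf "$DF_SPARK"
```

In this mock the name string sits on the line after `fn name`, so the second `grep` matches on the file path rather than on the string literal. If a function lived in a file named differently from the expression, the pipeline would come back empty; in that case it may help to grep for the quoted name directly before concluding the function is missing.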

**Option B — local DataFusion clone** (only if `CLAUDE.md` / memory points at one). Grep at the pinned tag — DataFusion uses lightweight tags (`<version>`, no `v` prefix), so use `tag -l` not `rev-parse --verify ^{tag}`:

```bash
DF_CLONE=<path from memory>
[ -z "$(git -C "$DF_CLONE" tag -l "$DF_SPARK_VER")" ] && echo "WARNING: tag missing — git fetch --tags or use Option A"
git -C "$DF_CLONE" grep -in "fn name" "$DF_SPARK_VER" -- 'datafusion/spark/src/function/' | grep -i "$EXPR"
git -C "$DF_CLONE" grep -in "fn name" "$DF_FUNCS_VER" -- 'datafusion/functions/src/' | grep -i "$EXPR"
```

Never grep the clone's working tree directly — it may be on `main`.

### 2a. Decision gate — confirm the source crate

- **Found in `datafusion-spark`** → proceed (Spark-tuned by design). If listed in `register_datafusion_spark_function` → Pattern A; otherwise Pattern B.
- **Found only in `datafusion-functions`** → **STOP and `AskUserQuestion`**. Pure DataFusion follows standard SQL semantics and frequently diverges from Spark (NULL vs error, negatives, overflow, return type, NaN). Surface what you found in each crate and the divergences you've already identified, then offer:
1. Wire from `datafusion-functions` with Pattern C bridging — list the divergences and the masking/casting that closes them.
2. Stop and switch to `implement-comet-expression` to port to `datafusion-spark` upstream first.
- **Found in neither** → stop and run `implement-comet-expression`.

If semantics diverge in a way Pattern C can't bridge, escalate.
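
The gate above can be summarized as a small lookup. The helper below is a hypothetical sketch (the function name and its yes/no arguments are not part of Comet); it only encodes the branching and does not inspect any sources.

```bash
# choose_pattern <in_datafusion_spark> <already_registered> <in_datafusion_functions>
# Each argument is "yes" or "no". Hypothetical helper for illustration only.
choose_pattern() {
  in_spark=$1
  registered=$2
  in_funcs=$3
  if [ "$in_spark" = yes ]; then
    if [ "$registered" = yes ]; then
      echo "Pattern A"
    else
      echo "Pattern B"
    fi
  elif [ "$in_funcs" = yes ]; then
    echo "STOP: ask the user (Pattern C bridging or implement-comet-expression)"
  else
    echo "STOP: run implement-comet-expression"
  fi
}

choose_pattern yes yes no
choose_pattern yes no  no
choose_pattern no  no  yes
choose_pattern no  no  no
```

Note that `datafusion-spark` takes priority: a function present in both crates is wired from `datafusion-spark`, and the outcome here is only a starting point, since input preprocessing or type restrictions can still push an A or B candidate to Pattern C.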

### 3. Apply the Scala wiring

Add the entry to the matching map in `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala`: `mathExpressions`, `stringExpressions`, `arrayExpressions`, `mapExpressions`, `structExpressions`, `predicateExpressions`, `conditionalExpressions`, `bitwiseExpressions`, `temporalExpressions`, `hashExpressions`, `conversionExpressions`, `miscExpressions`.

Spark expression classes are in scope via `import org.apache.spark.sql.catalyst.expressions._`. For Pattern C, the topic file uses **explicit** imports — extend the existing `import …expressions.{…}` line and place the new object alongside its peers.

Helpers from `QueryPlanSerde`:

- `scalarFunctionExprToProto(name, args*)` — return type inferred, `failOnError = false`.
- `scalarFunctionExprToProtoWithReturnType(name, returnType, failOnError, args*)` — explicit type / fail-on-error.
- `optExprWithInfo(optExpr, expr, children*)` — wrap final result; propagates "why we couldn't convert" tags.
- `withInfo(expr, "reason")` — tag a fallback when returning `None`.

`getSupportLevel` returning `Incompatible(Some("…"))` gates behind `spark.comet.expr.<name>.allowIncompatible=true`. `Unsupported(…)` always falls back.
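
For manual testing of an `Incompatible` expression, the opt-in key can be assembled and passed with Spark's standard `--conf key=value` option. The expression name `Foo` below is a placeholder; substitute whatever the `<name>` segment is for the expression being wired.

```bash
# Placeholder expression name; replace with the <name> segment for your expression.
EXPR_NAME='Foo'
CONF_KEY="spark.comet.expr.${EXPR_NAME}.allowIncompatible"
# Print the flag in the form spark-shell / spark-submit accept.
echo "--conf ${CONF_KEY}=true"
```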

### 4. Register the UDF (Pattern B only)

In `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`, add the `use` and the `register_udf` line, grouped with neighbors. Patterns A and C never touch native code.

### 5. Add SQL file tests

Create `spark/src/test/resources/sql-tests/expressions/<category>/$ARGUMENTS.sql`. Mandatory rules:

1. **Test only the directly supported input types** — read `inputTypes` from the Spark source. Do **not** add columns for implicit-cast widening (`ImplicitCastInputTypes`); the cast path has its own tests. E.g. `inputTypes = Seq(IntegerType)` (`Factorial`) → one `int` column, not byte/short/long/float/double/decimal. For `TypeCollection(A, B)`, add one column per member.
2. **All `SELECT`s read from a parquet table** — no literal-only queries (the constant folder skips the read path and column-vector kernel).
3. **Mix `NULL` into every column** — exercises the validity bitmap.
4. **Per type, cover the edge-case set:**
- Float / Double: `NaN`, `+0.0`, `-0.0`, `±Infinity` via `cast('NaN' as double)` etc.
- Integer / decimal: each type's max and min (`127`/`-128`, `2147483647`/`-2147483648`, `9223372036854775807`/`-9223372036854775808`, `Decimal(38, 0)` boundary) for overflow / wrap.
- String / binary: empty, multi-byte UTF-8 (`'é'`, `'日本'`), embedded NULs if relevant.
5. **Cover the Spark-specific edges** found in step 1: in-range boundaries, just-out-of-range values, ANSI error paths, foldable-only args.

Use `query expect_fallback(...)` for inputs where the serde returns `None`. Format: `docs/source/contributor-guide/sql-file-tests.md`.
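
Rule 2 is easy to violate by accident, so a rough lint can help before running the suite. The sketch below writes a hypothetical test file and flags `SELECT` lines that never read from a table. The `statement` / `query` directives and the table layout are assumptions made for illustration; `sql-file-tests.md` remains the reference for the real format.

```bash
# Hypothetical test file; directives and table layout are assumptions,
# see sql-file-tests.md for the actual format.
SQL_FILE=$(mktemp)
cat > "$SQL_FILE" <<'EOF'
statement
CREATE TABLE test_factorial(c int) USING parquet

statement
INSERT INTO test_factorial VALUES (0), (5), (20), (21), (-1), (NULL)

query
SELECT factorial(c) FROM test_factorial

query
SELECT factorial(5)
EOF

# Flag single-line SELECTs that contain no FROM clause (rule 2).
LITERAL_ONLY=$(grep -in '^select' "$SQL_FILE" | grep -iv ' from ')
if [ -n "$LITERAL_ONLY" ]; then
  echo "literal-only queries found:"
  echo "$LITERAL_ONLY"
fi
rm -f "$SQL_FILE"
```

The check is line-based, so a query whose `FROM` sits on a later line would be reported as a false positive; treat the output as a prompt to look, not as a verdict.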

### 6. Update docs

Hand-curated (`make` does not regenerate these):

- `docs/source/user-guide/latest/expressions.md` — add `| <ExpressionClass> | \`<sql_name>\` |` to the matching category table, alphabetical.
- `docs/source/contributor-guide/spark_expressions_support.md` — flip `- [ ] <name>` to `- [x] <name>`.

Per-expression compatibility pages under `compatibility/expressions/*.md` are auto-generated from `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons` — do not hand-edit.
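
The checklist flip in `spark_expressions_support.md` can be scripted. The sketch below operates on a made-up three-line excerpt rather than the real file, and writes through a temporary file because `sed -i` behaves differently on GNU and BSD.

```bash
# Hypothetical excerpt of spark_expressions_support.md; entries are illustrative.
DOC=$(mktemp)
printf '%s\n' '- [x] exp' '- [ ] factorial' '- [ ] floor' > "$DOC"

NAME='factorial'
# Rewrite only the unchecked entry whose name matches exactly (anchored at end of line).
sed "s/- \[ ] ${NAME}\$/- [x] ${NAME}/" "$DOC" > "$DOC.new" && mv "$DOC.new" "$DOC"

UPDATED=$(cat "$DOC")
echo "$UPDATED"
rm -f "$DOC"
```

Anchoring on the end of the line keeps a name that is a prefix of another entry from flipping the wrong box. The `expressions.md` table row still needs to be placed by hand so the alphabetical order is preserved.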

### 7. Build, audit, finalize

```bash
make
cd native && cargo clippy --all-targets --workspace -- -D warnings
```

Then run `audit-comet-expression` on `$ARGUMENTS` to compare against Spark 3.4.3 / 3.5.8 / 4.0.1 and surface coverage gaps; iterate on tests.

The user generally runs tests themselves. If asked for a smoke test:

```bash
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite $ARGUMENTS" -Dtest=none
```

PR: fill in `.github/pull_request_template.md`; under "What changes are included", note the skill was used and which upstream function (`datafusion::…` built-in or `datafusion-spark::function::…`) is being wired.
6 changes: 0 additions & 6 deletions .github/actions/java-test/action.yaml
@@ -29,10 +29,6 @@ inputs:
  description: 'Maven options passed to the mvn command'
  required: false
  default: ''
- scan_impl:
- description: 'The default Parquet scan implementation'
- required: false
- default: 'auto'
  upload-test-reports:
  description: 'Whether to upload test results including coverage to GitHub'
  required: false
@@ -72,7 +68,6 @@ runs:
  shell: bash
  if: ${{ inputs.suites == '' }}
  env:
- COMET_PARQUET_SCAN_IMPL: ${{ inputs.scan_impl }}
  SPARK_LOCAL_HOSTNAME: "localhost"
  SPARK_LOCAL_IP: "127.0.0.1"
  run: |
@@ -81,7 +76,6 @@ runs:
  shell: bash
  if: ${{ inputs.suites != '' }}
  env:
- COMET_PARQUET_SCAN_IMPL: ${{ inputs.scan_impl }}
  SPARK_LOCAL_HOSTNAME: "localhost"
  SPARK_LOCAL_IP: "127.0.0.1"
  run: |
2 changes: 1 addition & 1 deletion .github/actions/setup-spark-builder/action.yaml
@@ -51,7 +51,7 @@ runs:
  path: |
  ~/.m2/repository
  /root/.m2/repository
- key: ${{ runner.os }}-spark-sql-${{ hashFiles('spark/**/pom.xml', 'common/**/pom.xml') }}
+ key: ${{ runner.os }}-spark-sql-${{ hashFiles('common/**/pom.xml', 'spark/**/pom.xml') }}
  restore-keys: |
  ${{ runner.os }}-spark-sql-

4 changes: 0 additions & 4 deletions .github/workflows/iceberg_spark_test.yml
@@ -30,11 +30,9 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/**"
- - "common/src/test/**"
  - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  - "spark-integration/**"
  pull_request:
@@ -43,11 +41,9 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/**"
- - "common/src/test/**"
  - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  - "spark-integration/**"
  # manual trigger
2 changes: 0 additions & 2 deletions .github/workflows/miri.yml
@@ -29,7 +29,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -39,7 +38,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
18 changes: 7 additions & 11 deletions .github/workflows/pr_build_linux.yml
@@ -30,7 +30,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -41,7 +40,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -191,7 +189,7 @@ jobs:
  uses: ./.github/actions/setup-builder
  with:
  rust-version: ${{ env.RUST_VERSION }}
- jdk-version: 17 # JDK only needed for common module proto generation
+ jdk-version: 17 # JDK only needed for JVM module proto generation

  - name: Restore Cargo cache
  uses: actions/cache/restore@v5
@@ -281,34 +279,30 @@ jobs:
  - name: "Spark 3.4, JDK 11, Scala 2.12"
  java_version: "11"
  maven_opts: "-Pspark-3.4 -Pscala-2.12"
- scan_impl: "auto"

  - name: "Spark 3.5, JDK 17, Scala 2.13"
  java_version: "17"
  maven_opts: "-Pspark-3.5 -Pscala-2.13"
- scan_impl: "native_iceberg_compat"

  - name: "Spark 4.0, JDK 21"
  java_version: "21"
  maven_opts: "-Pspark-4.0"
- scan_impl: "auto"

  - name: "Spark 4.1, JDK 17"
  java_version: "17"
  maven_opts: "-Pspark-4.1"
- scan_impl: "auto"

  - name: "Spark 4.2, JDK 17"
  java_version: "17"
  maven_opts: "-Pspark-4.2"
- scan_impl: "auto"
  suite:
  - name: "fuzz"
  value: |
  org.apache.comet.CometFuzzTestSuite
  org.apache.comet.CometFuzzAggregateSuite
  org.apache.comet.CometFuzzIcebergSuite
  org.apache.comet.CometFuzzMathSuite
+ org.apache.comet.CometCodegenFuzzSuite
  org.apache.comet.DataGeneratorSuite
  - name: "shuffle"
  value: |
@@ -387,12 +381,15 @@ jobs:
  org.apache.comet.expressions.conditional.CometIfSuite
  org.apache.comet.expressions.conditional.CometCoalesceSuite
  org.apache.comet.expressions.conditional.CometCaseWhenSuite
+ org.apache.comet.CometCodegenSuite
+ org.apache.comet.CometCodegenSourceSuite
+ org.apache.comet.CometCodegenHOFSuite
  - name: "sql"
  value: |
  org.apache.spark.sql.CometToPrettyStringSuite
  org.apache.spark.sql.CometCollationSuite
  fail-fast: false
- name: ${{ matrix.profile.name }}/${{ matrix.profile.scan_impl }} [${{ matrix.suite.name }}]
+ name: ${{ matrix.profile.name }} [${{ matrix.suite.name }}]
  runs-on: ubuntu-24.04
  container:
  image: amd64/rust
@@ -429,10 +426,9 @@ jobs:
  - name: Java test steps
  uses: ./.github/actions/java-test
  with:
- artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }}
+ artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
  suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }}
  maven_opts: ${{ matrix.profile.maven_opts }}
- scan_impl: ${{ matrix.profile.scan_impl }}
  upload-test-reports: true
  skip-native-build: true

6 changes: 4 additions & 2 deletions .github/workflows/pr_build_macos.yml
@@ -30,7 +30,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -41,7 +40,6 @@ on:
  - "doc/**"
  - "docs/**"
  - "**.md"
- - "dev/changelog/*.md"
  - "native/core/benches/**"
  - "native/spark-expr/benches/**"
  - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
@@ -157,6 +155,7 @@ jobs:
  org.apache.comet.CometFuzzAggregateSuite
  org.apache.comet.CometFuzzIcebergSuite
  org.apache.comet.CometFuzzMathSuite
+ org.apache.comet.CometCodegenFuzzSuite
  org.apache.comet.DataGeneratorSuite
  - name: "shuffle"
  value: |
@@ -234,6 +233,9 @@ jobs:
  org.apache.comet.expressions.conditional.CometIfSuite
  org.apache.comet.expressions.conditional.CometCoalesceSuite
  org.apache.comet.expressions.conditional.CometCaseWhenSuite
+ org.apache.comet.CometCodegenSuite
+ org.apache.comet.CometCodegenSourceSuite
+ org.apache.comet.CometCodegenHOFSuite
  - name: "sql"
  value: |
  org.apache.spark.sql.CometToPrettyStringSuite