| 1 | +---
| 2 | +name: wire-datafusion-function
| 3 | +description: Use when wiring an existing DataFusion or datafusion-spark function into Comet for a Spark expression. Identifies the right wiring pattern (one-line passthrough, datafusion-spark UDF registration, or custom serde with input massaging / restrictions), applies the Scala serde, registers the UDF in jni_api when needed, and adds SQL file tests. Assumes the function already exists upstream; if not, switch to `implement-comet-expression`.
| 4 | +argument-hint: <expression-name>
| 5 | +---
| 6 | + |
| 7 | +Wire Comet support for the `$ARGUMENTS` Spark expression by reusing an existing DataFusion or `datafusion-spark` function. **No native Rust is written here** — if upstream coverage is missing, stop and run `implement-comet-expression`. |
| 8 | + |
| 9 | +## Wiring patterns |
| 10 | + |
| 11 | +Pick the lightest one that satisfies the Spark contract. |
| 12 | + |
| 13 | +| Pattern | When | What to change | |
| 14 | +| ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| 15 | +| **A** — passthrough | DF built-in (e.g. `acos`), or `datafusion-spark` UDF already registered in `register_datafusion_spark_function` | one line in the right map of `QueryPlanSerde.scala`: `classOf[Foo] -> CometScalarFunction("foo")` | |
| 16 | +| **B** — register + passthrough | `datafusion-spark` UDF, _not_ yet registered, semantics already match Spark | Pattern A line **plus** `session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFoo::default()));` in `native/core/src/execution/jni_api.rs::register_datafusion_spark_function` | |
| 17 | +| **C** — custom serde | Inputs need preprocessing (cast, `nullIfNegative`, `+0.0` flip), or you need to set return type / `failOnError`, restrict input types via `getSupportLevel`, enforce foldable-only args, or attach `getCompatibleNotes`/`getIncompatibleReasons`/`getUnsupportedReasons` | new `CometXxx` object in the topic file (`math.scala`, `strings.scala`, …); see `CometCeil`, `CometAtan2`, `CometLog`, `CometSha2`, `CometAbs` | |
| 18 | + |
| 19 | +Function names must match: the string passed to `CometScalarFunction("xyz")` must equal the name returned by `SparkXyz::name()` upstream.
| 20 | + |
| 21 | +## Workflow |
| 22 | + |
| 23 | +### 1. Study the Spark contract |
| 24 | + |
| 25 | +Find the `case class $ARGUMENTS` (PascalCase). Prefer the user's local Spark clone (`CLAUDE.md` / project memory); fall back to `git clone --depth 1 https://github.com/apache/spark.git /tmp/spark-master`. If the clone is sandbox-blocked and no local clone is found, **stop and ask** — Spark is the ground truth. |
| 26 | + |
| 27 | +Note `inputTypes`, `dataType`, `eval` / `nullSafeEval`, `require` guards, ANSI mode branches, and foldable-only arguments. These define what Comet must reproduce. |
| 28 | + |
| 29 | +### 2. Find the upstream function (at the pinned version) |
| 30 | + |
| 31 | +The pinned versions in `native/Cargo.toml` are the ground truth — a function on upstream `main` may not exist in the version Comet depends on. Resolve versions first, then search sources matching those exact versions. **Pick one source path; do not mix.** |
| 32 | + |
| 33 | +```bash |
| 34 | +REPO_ROOT=$(git rev-parse --show-toplevel) |
| 35 | +# Portable regex — BSD awk on macOS does not support \b. |
| 36 | +DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark[ =]/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml") |
| 37 | +DF_FUNCS_VER=$(awk -F'"' '/^datafusion[ =]/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml") |
| 38 | +if [ -z "$DF_SPARK_VER" ] || [ -z "$DF_FUNCS_VER" ]; then echo "ERROR: version extraction failed" >&2; exit 1; fi
| 39 | +EXPR='$ARGUMENTS' |
| 40 | +``` |
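To see what the two `awk` extractions above pick up, here is a minimal sketch run against a throwaway manifest. The manifest contents and the version numbers `1.2.3` / `1.2.4` are made up for illustration, and the `TMP_TOML` / `DEMO_*` names are hypothetical; the real input is `native/Cargo.toml`.

```bash
# Hypothetical manifest for illustration only; versions are invented.
TMP_TOML=$(mktemp)
cat > "$TMP_TOML" <<'EOF'
[workspace.dependencies]
datafusion-spark = { version = "1.2.4" }
datafusion = { version = "1.2.3", default-features = false }
EOF

# -F'"' splits each line on double quotes, so $2 is the first quoted string.
# The [ =] character class keeps the `datafusion` pattern from matching the
# `datafusion-spark` line, even though that line comes first in the file.
DEMO_FUNCS_VER=$(awk -F'"' '/^datafusion[ =]/ {print $2; exit}' "$TMP_TOML")
DEMO_SPARK_VER=$(awk -F'"' '/^datafusion-spark[ =]/ {print $2; exit}' "$TMP_TOML")

echo "datafusion=$DEMO_FUNCS_VER datafusion-spark=$DEMO_SPARK_VER"
rm -f "$TMP_TOML"
```

Because `$2` is simply the first double-quoted string on the matching line, this approach assumes the version is the first quoted value in the entry. An entry whose first quoted value is something else (for example a `git` URL) would need a different extraction.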
| 41 | + |
| 42 | +**Option A — cargo registry (preferred):** |
| 43 | + |
| 44 | +```bash |
| 45 | +DF_SPARK=$(ls -d ~/.cargo/registry/src/*/datafusion-spark-${DF_SPARK_VER}/ 2>/dev/null | head -1) |
| 46 | +DF_FUNCS=$(ls -d ~/.cargo/registry/src/*/datafusion-functions-${DF_FUNCS_VER}/ 2>/dev/null | head -1) |
| 47 | +# If empty, run `cargo fetch` from native/. |
| 48 | +grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$EXPR" |
| 49 | +grep -rin "fn name" "$DF_FUNCS/src/" 2>/dev/null | grep -i "$EXPR" |
| 50 | +``` |
| 51 | + |
| 52 | +**Option B — local DataFusion clone** (only if `CLAUDE.md` / memory points at one). Grep at the pinned tag — DataFusion uses lightweight tags (`<version>`, no `v` prefix), so use `tag -l` not `rev-parse --verify ^{tag}`: |
| 53 | + |
| 54 | +```bash |
| 55 | +DF_CLONE=<path from memory> |
| 56 | +[ -z "$(git -C "$DF_CLONE" tag -l "$DF_SPARK_VER")" ] && echo "WARNING: tag missing — git fetch --tags or use Option A" |
| 57 | +git -C "$DF_CLONE" grep -in "fn name" "$DF_SPARK_VER" -- 'datafusion/spark/src/function/' | grep -i "$EXPR" |
| 58 | +git -C "$DF_CLONE" grep -in "fn name" "$DF_FUNCS_VER" -- 'datafusion/functions/src/' | grep -i "$EXPR" |
| 59 | +``` |
| 60 | + |
| 61 | +Never grep the clone's working tree directly — it may be on `main`. |
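The difference between searching a tag and searching the working tree can be reproduced in a scratch repository. This is a sketch under stated assumptions: the repository, the file `src/lib.rs`, the tag `1.0.0`, and the function names `old_fn` / `new_fn` are all hypothetical stand-ins, not DataFusion content.

```bash
# Scratch repository; every name below is a hypothetical stand-in.
DEMO_REPO=$(mktemp -d)
git -C "$DEMO_REPO" init -q
mkdir -p "$DEMO_REPO/src"

# Commit one state and mark it with a lightweight tag (no `v` prefix).
printf 'fn name(&self) -> &str { "old_fn" }\n' > "$DEMO_REPO/src/lib.rs"
git -C "$DEMO_REPO" add src/lib.rs
git -C "$DEMO_REPO" -c user.name=demo -c user.email=demo@example.com \
  -c commit.gpgsign=false commit -q -m "tagged state"
git -C "$DEMO_REPO" tag 1.0.0

# The working tree then moves on, as a clone sitting on main would.
printf 'fn name(&self) -> &str { "new_fn" }\n' > "$DEMO_REPO/src/lib.rs"

# `git grep <pattern> <tree-ish> -- <path>` reads the tagged tree;
# plain `grep -r` reads whatever is currently on disk.
TAG_LISTED=$(git -C "$DEMO_REPO" tag -l 1.0.0)
TAGGED_HIT=$(git -C "$DEMO_REPO" grep -in "fn name" 1.0.0 -- 'src/')
TREE_HIT=$(grep -rin "fn name" "$DEMO_REPO/src/")

echo "tag:  $TAGGED_HIT"
echo "tree: $TREE_HIT"
rm -rf "$DEMO_REPO"
```

In this sketch the tagged search still reports `old_fn` while the working-tree search reports `new_fn`, which is the mismatch the pinned-tag grep is meant to avoid.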
| 62 | + |
| 63 | +### 2a. Decision gate — confirm the source crate |
| 64 | + |
| 65 | +- **Found in `datafusion-spark`** → proceed (Spark-tuned by design). If listed in `register_datafusion_spark_function` → Pattern A; otherwise Pattern B. |
| 66 | +- **Found only in `datafusion-functions`** → **STOP and `AskUserQuestion`**. Pure DataFusion follows standard SQL semantics and frequently diverges from Spark (NULL vs error, negatives, overflow, return type, NaN). Surface what you found in each crate and the divergences you've already identified, then offer: |
| 67 | + 1. Wire from `datafusion-functions` with Pattern C bridging — list the divergences and the masking/casting that closes them. |
| 68 | + 2. Stop and switch to `implement-comet-expression` to port to `datafusion-spark` upstream first. |
| 69 | +- **Found in neither** → stop and run `implement-comet-expression`. |
| 70 | + |
| 71 | +If the semantics diverge in a way Pattern C cannot bridge, stop and escalate to the user.
| 72 | + |
| 73 | +### 3. Apply the Scala wiring |
| 74 | + |
| 75 | +Add the entry to the matching map in `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala`: `mathExpressions`, `stringExpressions`, `arrayExpressions`, `mapExpressions`, `structExpressions`, `predicateExpressions`, `conditionalExpressions`, `bitwiseExpressions`, `temporalExpressions`, `hashExpressions`, `conversionExpressions`, `miscExpressions`. |
| 76 | + |
| 77 | +Spark expression classes are in scope via `import org.apache.spark.sql.catalyst.expressions._`. For Pattern C, the topic file uses **explicit** imports — extend the existing `import …expressions.{…}` line and place the new object alongside its peers. |
| 78 | + |
| 79 | +Helpers from `QueryPlanSerde`: |
| 80 | + |
| 81 | +- `scalarFunctionExprToProto(name, args*)` — return type inferred, `failOnError = false`. |
| 82 | +- `scalarFunctionExprToProtoWithReturnType(name, returnType, failOnError, args*)` — explicit type / fail-on-error. |
| 83 | +- `optExprWithInfo(optExpr, expr, children*)` — wrap final result; propagates "why we couldn't convert" tags. |
| 84 | +- `withInfo(expr, "reason")` — tag a fallback when returning `None`. |
| 85 | + |
| 86 | +`getSupportLevel` returning `Incompatible(Some("…"))` gates behind `spark.comet.expr.<name>.allowIncompatible=true`. `Unsupported(…)` always falls back. |
| 87 | + |
| 88 | +### 4. Register the UDF (Pattern B only) |
| 89 | + |
| 90 | +In `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`, add the `use` and the `register_udf` line, grouped with neighbors. Patterns A and C never touch native code. |
| 91 | + |
| 92 | +### 5. Add SQL file tests |
| 93 | + |
| 94 | +Create `spark/src/test/resources/sql-tests/expressions/<category>/$ARGUMENTS.sql`. Mandatory rules: |
| 95 | + |
| 96 | +1. **Test only the directly supported input types** — read `inputTypes` from the Spark source. Do **not** add columns for implicit-cast widening (`ImplicitCastInputTypes`); the cast path has its own tests. E.g. `inputTypes = Seq(IntegerType)` (`Factorial`) → one `int` column, not byte/short/long/float/double/decimal. For `TypeCollection(A, B)`, add one column per member. |
| 97 | +2. **All `SELECT`s read from a parquet table** — no literal-only queries (the constant folder skips the read path and column-vector kernel). |
| 98 | +3. **Mix `NULL` into every column** — exercises the validity bitmap. |
| 99 | +4. **Per type, cover the edge-case set:** |
| 100 | + - Float / Double: `NaN`, `+0.0`, `-0.0`, `±Infinity` via `cast('NaN' as double)` etc. |
| 101 | + - Integer / decimal: each type's max and min (`127`/`-128`, `2147483647`/`-2147483648`, `9223372036854775807`/`-9223372036854775808`, `Decimal(38, 0)` boundary) for overflow / wrap. |
| 102 | + - String / binary: empty, multi-byte UTF-8 (`'é'`, `'日本'`), embedded NULs if relevant. |
| 103 | +5. **Cover the Spark-specific edges** found in step 1: in-range boundaries, just-out-of-range values, ANSI error paths, foldable-only args. |
| 104 | + |
| 105 | +Use `query expect_fallback(...)` for inputs where the serde returns `None`. The file format is documented in `docs/source/contributor-guide/sql-file-tests.md`.
| 106 | + |
| 107 | +### 6. Update docs |
| 108 | + |
| 109 | +Hand-curated (`make` does not regenerate these): |
| 110 | + |
| 111 | +- `docs/source/user-guide/latest/expressions.md` — add `| <ExpressionClass> | \`<sql_name>\` |` to the matching category table, alphabetical. |
| 112 | +- `docs/source/contributor-guide/spark_expressions_support.md` — flip `- [ ] <name>` to `- [x] <name>`. |
| 113 | + |
| 114 | +Per-expression compatibility pages under `compatibility/expressions/*.md` are auto-generated from `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons` — do not hand-edit. |
| 115 | + |
| 116 | +### 7. Build, audit, finalize |
| 117 | + |
| 118 | +```bash |
| 119 | +make |
| 120 | +cd native && cargo clippy --all-targets --workspace -- -D warnings |
| 121 | +``` |
| 122 | + |
| 123 | +Then run `audit-comet-expression` on `$ARGUMENTS` to compare against Spark 3.4.3 / 3.5.8 / 4.0.1 and surface coverage gaps; iterate on tests. |
| 124 | + |
| 125 | +The user generally runs tests themselves. If asked for a smoke test: |
| 126 | + |
| 127 | +```bash |
| 128 | +./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite $ARGUMENTS" -Dtest=none |
| 129 | +``` |
| 130 | + |
| 131 | +PR: fill in `.github/pull_request_template.md`; under "What changes are included", note the skill was used and which upstream function (`datafusion::…` built-in or `datafusion-spark::function::…`) is being wired. |