|
| 1 | +--- |
| 2 | +name: wire-datafusion-function |
| 3 | +description: Use when wiring an existing DataFusion or datafusion-spark function into Comet for a Spark expression. Identifies the right wiring pattern (one-line passthrough, datafusion-spark UDF registration, or custom serde with input massaging / restrictions), applies the Scala serde, registers the UDF in jni_api when needed, and adds SQL file tests. Assumes the function already exists upstream — if not, switch to `implement-comet-expression`. |
| 4 | +argument-hint: <expression-name> |
| 5 | +--- |
| 6 | + |
| 7 | +Wire Comet support for the `$ARGUMENTS` Spark expression by reusing an existing DataFusion or `datafusion-spark` function. **No native Rust implementation is written by this skill** — if upstream coverage is missing, stop and run `implement-comet-expression` instead. |
| 8 | + |
| 9 | +## Background reading |
| 10 | + |
| 11 | +- `docs/source/contributor-guide/adding_a_new_expression.md` — Scala serde, protobuf, support levels, SQL tests. |
| 12 | +- `docs/source/contributor-guide/sql-file-tests.md` — Comet SQL Tests format. |
| 13 | +- `spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala` — trait every serde implements. |
| 14 | +- `spark/src/main/scala/org/apache/comet/serde/CometScalarFunction.scala` — the one-line passthrough wrapper. |
| 15 | +- `spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala` — `Compatible` / `Incompatible` / `Unsupported`. |
| 16 | + |
| 17 | +## Wiring patterns |
| 18 | + |
| 19 | +Pick the lightest pattern that satisfies the Spark contract. Steps below tell you which one to use. |
| 20 | + |
| 21 | +### Pattern A — One-line passthrough (cleanest) |
| 22 | + |
| 23 | +Spark expression maps directly to a function name resolvable at runtime. The function lives in either `datafusion-functions` (registered by default in `SessionContext`) or in `datafusion-spark` and is **already** registered in `register_datafusion_spark_function`. Single line in the right map of `QueryPlanSerde.scala`: |
| 24 | + |
| 25 | +```scala |
| 26 | +classOf[Acos] -> CometScalarFunction("acos"), // datafusion-functions built-in |
| 27 | +classOf[Crc32] -> CometScalarFunction("crc32"), // datafusion-spark SparkCrc32 (already registered) |
| 28 | +classOf[DateAdd] -> CometScalarFunction[DateAdd]("date_add"), // see datetime.scala |
| 29 | +``` |
| 30 | + |
| 31 | +### Pattern B — Pattern A plus a one-line UDF registration |
| 32 | + |
| 33 | +Same one-liner in `QueryPlanSerde.scala`, **plus** registering the new `datafusion-spark` UDF in `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`: |
| 34 | + |
| 35 | +```rust |
| 36 | +session_ctx.register_udf(ScalarUDF::new_from_impl(SparkXyz::default())); |
| 37 | +``` |
| 38 | + |
| 39 | +Function names must match — the Scala name passed to `CometScalarFunction("xyz")` must equal `SparkXyz::name()` in the upstream crate. |
| 40 | + |
| 41 | +### Pattern C — Custom `CometExpressionSerde` |
| 42 | + |
| 43 | +Use when the Spark contract requires any of: |
| 44 | + |
| 45 | +- preprocessing inputs (e.g. wrap with `nullIfNegative`, add `+0.0` to flip `-0.0`, cast), |
| 46 | +- setting an explicit return type or `fail_on_error` flag → use `scalarFunctionExprToProtoWithReturnType`, |
| 47 | +- restricting supported input types → implement `getSupportLevel` returning `Unsupported(notes)` / `Incompatible(notes)`, |
| 48 | +- foldable / literal-only argument checks (e.g. `Sha2.numBits` must be a literal), |
| 49 | +- documenting always-on differences via `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`. |
| 50 | + |
| 51 | +Examples to study before writing one: `CometCeil` / `CometFloor` (decimal scale fallback), `CometAtan2` (input massaging), `CometLog` (null-if-negative), `CometSha2` (literal-only argument), `CometAbs` (`getSupportLevel`). |
| 52 | + |
| 53 | +## Workflow |
| 54 | + |
| 55 | +### 1. Study the Spark master implementation |
| 56 | + |
| 57 | +Same as step 1 of `implement-comet-expression`. Shallow-clone if missing, then grep for the expression class. The class name is PascalCase (e.g. `Csc`, not `csc`); capitalize `$ARGUMENTS` before grepping, or use `-i`: |
| 58 | + |
| 59 | +```bash |
| 60 | +if [ ! -d /tmp/spark-master ]; then |
| 61 | + git clone --depth 1 https://github.com/apache/spark.git /tmp/spark-master |
| 62 | +fi |
| 63 | + |
| 64 | +CLASS_NAME="$(printf '%s' "$ARGUMENTS" | awk '{print toupper(substr($0,1,1)) substr($0,2)}')" |
| 65 | +find /tmp/spark-master/sql -name "*.scala" | \ |
| 66 | + xargs grep -l "case class ${CLASS_NAME}\b\|object ${CLASS_NAME}\b" 2>/dev/null |
| 67 | +``` |
| 68 | + |
| 69 | +If the `git clone` is blocked (e.g. sandboxed environment), **stop and ask the user for an alternative source** — typically a local Spark clone path. Then point grep at that path instead. Do not silently skip Spark study — the Spark contract is the ground truth. |
| 70 | + |
| 71 | +Note `inputTypes`, `dataType`, `eval` / `nullSafeEval`, ANSI mode branches, `require` guards, and any foldable-only arguments. These define the Spark contract Comet must match. |
| 72 | + |
| 73 | +### 2. Find the upstream function |
| 74 | + |
| 75 | +Read the pinned `datafusion-spark` version from `native/Cargo.toml` rather than picking the first cached copy — `head -1` can land on an older release that lacks the function: |
| 76 | + |
| 77 | +```bash |
| 78 | +REPO_ROOT=$(git rev-parse --show-toplevel) |
| 79 | +DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark\b/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml") |
| 80 | +DF_SPARK=$(ls -d ~/.cargo/registry/src/*/datafusion-spark-${DF_SPARK_VER}/ 2>/dev/null | head -1) |
| 81 | +echo "Using datafusion-spark $DF_SPARK_VER at $DF_SPARK" |
| 82 | + |
| 83 | +DF_FUNCS_VER=$(awk -F'"' '/^datafusion = / {print $2; exit}' "$REPO_ROOT/native/Cargo.toml") |
| 84 | +DF_FUNCS=$(ls -d ~/.cargo/registry/src/*/datafusion-functions-${DF_FUNCS_VER}/ 2>/dev/null | head -1) |
| 85 | + |
| 86 | +grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$ARGUMENTS" |
| 87 | +grep -rin "fn name" "$DF_FUNCS/src/" 2>/dev/null | grep -i "$ARGUMENTS" |
| 88 | +``` |
| 89 | + |
| 90 | +If the cached crate directory does not exist (fresh checkout), run `cargo fetch` from `native/` first, or fall back to the latest cached version and verify the function exists in the pinned version's git tag before relying on it. |
| 91 | + |
| 92 | +Verify the candidate function's `Signature`, return type, and behavior matches Spark across all relevant edge cases (NULL, overflow, non-finite floats, decimal scale, locale, ANSI mode). If semantics diverge in a way the Scala serde can't bridge with preprocessing or restrictions → **stop and run `implement-comet-expression`** instead. |
| 93 | + |
| 94 | +For datafusion-spark candidates, also check whether the UDF is already pre-registered: see `register_datafusion_spark_function` in `native/core/src/execution/jni_api.rs`. If listed, you only need Pattern A. If missing, you need Pattern B. |
| 95 | + |
| 96 | +### 3. Pick the wiring pattern |
| 97 | + |
| 98 | +| Situation | Pattern | |
| 99 | +| ----------------------------------------------------------------------------------------------------- | ------- | |
| 100 | +| DF built-in (e.g. `acos`, `md5`, `replace`) matches Spark; or datafusion-spark UDF already registered | **A** | |
| 101 | +| New `datafusion-spark` UDF, semantics already match Spark | **B** | |
| 102 | +| Inputs need preprocessing, restrictions, or explicit return type / failOnError | **C** | |
| 103 | + |
| 104 | +When in doubt, start with the lightest pattern and escalate only if the Spark contract demands it. |
| 105 | + |
| 106 | +### 4. Apply the Scala wiring |
| 107 | + |
| 108 | +Add the entry to the matching map in `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala`: |
| 109 | + |
| 110 | +- `mathExpressions`, `stringExpressions`, `arrayExpressions`, `mapExpressions`, `structExpressions`, `predicateExpressions`, `conditionalExpressions`, `bitwiseExpressions`, `temporalExpressions`, `hashExpressions`, `conversionExpressions`, `miscExpressions`. |
| 111 | + |
| 112 | +Confirm the Spark expression class is in scope. `QueryPlanSerde.scala` already imports `org.apache.spark.sql.catalyst.expressions._` (wildcard), so most expressions are picked up for free. For Pattern C, the topic file (`math.scala`, `strings.scala`, `datetime.scala`, `hash.scala`, `arrays.scala`, etc.) uses **explicit** imports — add the new class name to the existing `import org.apache.spark.sql.catalyst.expressions.{...}` line, and place the serde object alongside its peers. |
| 113 | + |
| 114 | +Helpers from `QueryPlanSerde`: |
| 115 | + |
| 116 | +- `scalarFunctionExprToProto(funcName, args*)` — basic call, `failOnError = false`, return type inferred from args. |
| 117 | +- `scalarFunctionExprToProtoWithReturnType(funcName, returnType, failOnError, args*)` — set explicit return type and/or `fail_on_error`. |
| 118 | +- `optExprWithInfo(optExpr, expr, children*)` — attaches the "why we couldn't convert" tag if any child failed; always wrap your final result. |
| 119 | +- `withInfo(expr, "reason")` — tag a fallback reason when returning `None`. |
| 120 | + |
| 121 | +For `getSupportLevel`, returning `Incompatible(Some("..."))` gates the function behind `spark.comet.expr.<exprConfigName>.allowIncompatible=true`. `Unsupported(notes)` always falls back to Spark. |
| 122 | + |
| 123 | +### 5. Apply the native registration (Pattern B only) |
| 124 | + |
| 125 | +In `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`, add one line with the upstream import already in scope (or extend the imports). Keep the list grouped logically with neighboring entries. |
| 126 | + |
| 127 | +Pattern A and Pattern C **never** touch native code. |
| 128 | + |
| 129 | +### 6. Build and smoke-test |
| 130 | + |
| 131 | +Build the project (the user generally runs tests themselves — do not run them proactively unless asked): |
| 132 | + |
| 133 | +```bash |
| 134 | +make |
| 135 | +``` |
| 136 | + |
| 137 | +If the user asks for a smoke test: |
| 138 | + |
| 139 | +```bash |
| 140 | +./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite $ARGUMENTS" -Dtest=none |
| 141 | +``` |
| 142 | + |
| 143 | +### 7. Add SQL file tests |
| 144 | + |
| 145 | +Create `spark/src/test/resources/sql-tests/expressions/<category>/$ARGUMENTS.sql`. Coverage requirements (mandatory — do not skip): |
| 146 | + |
| 147 | +1. **Inspect the Spark source for accepted input datatypes.** Look at `inputTypes`, the parent class (`UnaryMathExpression`, `BinaryExpression`, `ImplicitCastInputTypes`, `ExpectsInputTypes`), and any `dataType` overrides. Add one column per supported type — e.g. for a numeric function: `ByteType`, `ShortType`, `IntegerType`, `LongType`, `FloatType`, `DoubleType`, `DecimalType` (varying precision/scale). For a string function: also cover `BinaryType` if accepted. |
| 148 | +2. **All `SELECT`s must read from a parquet table.** Do not write literal-only queries like `SELECT f(1.0), f(NULL)` — encode the values you want to test as rows in a parquet-backed table and query that. This exercises the real read path, not just the constant-folder. |
| 149 | +3. **Mix `NULL` into every column** so the kernel exercises the validity bitmap, not just the all-valid fast path. |
| 150 | +4. **Floating-point (`FloatType` / `DoubleType`)** columns must include: `NaN`, `+0.0`, `-0.0`, `-Infinity`, `+Infinity`. Use `cast('NaN' as double)`, `cast('Infinity' as double)`, `cast('-Infinity' as double)`. |
| 151 | +5. **Integer / decimal** columns must include each type's max and min (e.g. `127` / `-128` for byte, `2147483647` / `-2147483648` for int, `9223372036854775807` / `-9223372036854775808` for long, `Decimal(38, 0)` boundary for decimal) so overflow / wrap behavior is exercised. |
| 152 | +6. Cover the **Spark-specific edge cases identified in step 1** (negative scale, empty string, ANSI-mode error paths, etc.) — again, as rows in the parquet table. |
| 153 | + |
| 154 | +Use `query expect_fallback(...)` for inputs where the serde intentionally returns `None`. Format described in `docs/source/contributor-guide/sql-file-tests.md`. |
| 155 | + |
| 156 | +### 8. Update the documentation |
| 157 | + |
| 158 | +Two files are hand-curated (not regenerated by `make`) and must be updated for every newly wired expression: |
| 159 | + |
| 160 | +- `docs/source/user-guide/latest/expressions.md` — add a row to the matching category table (`## Math Expressions`, `## String Expressions`, etc.) in alphabetical order: `| <ExpressionClass> | \`<sql_name>\` |`. |
| 161 | +- `docs/source/contributor-guide/spark_expressions_support.md` — flip the entry from `- [ ] <name>` to `- [x] <name>` in the matching category section. |
| 162 | + |
| 163 | +Per-category compatibility pages under `docs/source/user-guide/latest/compatibility/expressions/*.md` are auto-generated by `GenerateDocs.scala` from `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`. Do not hand-edit those — `make` regenerates them. |
| 164 | + |
| 165 | +### 9. Run the audit skill |
| 166 | + |
| 167 | +Run `audit-comet-expression` on `$ARGUMENTS`. It compares against Spark 3.4.3, 3.5.8, and 4.0.1 and produces a prioritized gap list — usually missing test coverage. Iterate on tests until the user is satisfied. |
| 168 | + |
| 169 | +### 10. Final checks |
| 170 | + |
| 171 | +```bash |
| 172 | +make format |
| 173 | +cd native && cargo clippy --all-targets --workspace -- -D warnings |
| 174 | +``` |
| 175 | + |
| 176 | +### 11. Open the PR |
| 177 | + |
| 178 | +Fill in every section of `.github/pull_request_template.md`. In "What changes are included in this PR?", note that the `wire-datafusion-function` skill was used and call out which upstream function (DataFusion built-in or `datafusion-spark::function::...`) is being wired. |
0 commit comments