Commit 56aa4ca

feat: Wire DataFusion function Claude skill and csc implemented using it (apache#4337)
1 parent 9c76e87 commit 56aa4ca

6 files changed

Lines changed: 208 additions & 1 deletion

Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
---
name: wire-datafusion-function
description: Use when wiring an existing DataFusion or datafusion-spark function into Comet for a Spark expression. Identifies the right wiring pattern (one-line passthrough, datafusion-spark UDF registration, or custom serde with input massaging / restrictions), applies the Scala serde, registers the UDF in jni_api when needed, and adds SQL file tests. Assumes the function already exists upstream — if not, switch to `implement-comet-expression`.
argument-hint: <expression-name>
---

Wire Comet support for the `$ARGUMENTS` Spark expression by reusing an existing DataFusion or `datafusion-spark` function. **No native Rust implementation is written by this skill** — if upstream coverage is missing, stop and run `implement-comet-expression` instead.

## Background reading

- `docs/source/contributor-guide/adding_a_new_expression.md` — Scala serde, protobuf, support levels, SQL tests.
- `docs/source/contributor-guide/sql-file-tests.md` — Comet SQL Tests format.
- `spark/src/main/scala/org/apache/comet/serde/CometExpressionSerde.scala` — trait every serde implements.
- `spark/src/main/scala/org/apache/comet/serde/CometScalarFunction.scala` — the one-line passthrough wrapper.
- `spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala` — `Compatible` / `Incompatible` / `Unsupported`.

## Wiring patterns

Pick the lightest pattern that satisfies the Spark contract. The workflow steps below tell you which one to use.

### Pattern A — One-line passthrough (cleanest)

The Spark expression maps directly to a function name resolvable at runtime. The function lives in either `datafusion-functions` (registered by default in `SessionContext`) or in `datafusion-spark` and is **already** registered in `register_datafusion_spark_function`. Add a single line to the matching map in `QueryPlanSerde.scala`:

```scala
classOf[Acos] -> CometScalarFunction("acos"), // datafusion-functions built-in
classOf[Crc32] -> CometScalarFunction("crc32"), // datafusion-spark SparkCrc32 (already registered)
classOf[DateAdd] -> CometScalarFunction[DateAdd]("date_add"), // see datetime.scala
```

### Pattern B — Pattern A plus a one-line UDF registration

Same one-liner in `QueryPlanSerde.scala`, **plus** registering the new `datafusion-spark` UDF in `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`:

```rust
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkXyz::default()));
```

Function names must match — the Scala name passed to `CometScalarFunction("xyz")` must equal `SparkXyz::name()` in the upstream crate.
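
The name check can be scripted. The following is a minimal sketch, not part of the repository: `upstream_udf_name` and `scala_wired_name` are hypothetical helpers, and the sketch assumes the upstream source declares the UDF as `impl ScalarUDFImpl for <Struct>` and returns a string literal from `fn name`. If the upstream crate builds the name differently, read it by hand instead.

```shell
# Hypothetical helpers (not part of Comet); both rely on simple text matching.

# Print the string literal returned by `fn name` for one UDF struct.
# $1 = Rust source file, $2 = struct name (e.g. SparkCsc)
upstream_udf_name() {
  awk -v s="$2" '
    index($0, "impl ScalarUDFImpl for " s) { in_impl = 1 }
    in_impl && /fn name\(/                 { in_name = 1 }
    in_name && match($0, /"[^"]+"/) {
      print substr($0, RSTART + 1, RLENGTH - 2)
      exit
    }
  ' "$1"
}

# Print the name passed to CometScalarFunction for one Spark expression class.
# $1 = Scala file, $2 = expression class (e.g. Csc)
scala_wired_name() {
  grep -F "classOf[$2]" "$1" |
    sed -n 's/.*CometScalarFunction[^(]*("\([^"]*\)").*/\1/p' |
    head -n 1
}
```

Comparing the two outputs with `[ "$(upstream_udf_name ...)" = "$(scala_wired_name ...)" ]` catches a mismatch before the build does; an empty result from either helper means the text pattern did not match and the check should be done manually.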

### Pattern C — Custom `CometExpressionSerde`

Use when the Spark contract requires any of:

- preprocessing inputs (e.g. wrap with `nullIfNegative`, add `+0.0` to flip `-0.0`, cast),
- setting an explicit return type or `fail_on_error` flag → use `scalarFunctionExprToProtoWithReturnType`,
- restricting supported input types → implement `getSupportLevel` returning `Unsupported(notes)` / `Incompatible(notes)`,
- foldable / literal-only argument checks (e.g. `Sha2.numBits` must be a literal),
- documenting always-on differences via `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`.

Examples to study before writing one: `CometCeil` / `CometFloor` (decimal scale fallback), `CometAtan2` (input massaging), `CometLog` (null-if-negative), `CometSha2` (literal-only argument), `CometAbs` (`getSupportLevel`).

## Workflow

### 1. Study the Spark master implementation

Same as step 1 of `implement-comet-expression`. Shallow-clone if missing, then grep for the expression class. The class name is PascalCase (e.g. `Csc`, not `csc`); capitalize `$ARGUMENTS` before grepping, or use `-i`:

```bash
if [ ! -d /tmp/spark-master ]; then
  git clone --depth 1 https://github.com/apache/spark.git /tmp/spark-master
fi

CLASS_NAME="$(printf '%s' "$ARGUMENTS" | awk '{print toupper(substr($0,1,1)) substr($0,2)}')"
find /tmp/spark-master/sql -name "*.scala" | \
  xargs grep -l "case class ${CLASS_NAME}\b\|object ${CLASS_NAME}\b" 2>/dev/null
```

If the `git clone` is blocked (e.g. sandboxed environment), **stop and ask the user for an alternative source** — typically a local Spark clone path. Then point grep at that path instead. Do not silently skip Spark study — the Spark contract is the ground truth.

Note `inputTypes`, `dataType`, `eval` / `nullSafeEval`, ANSI mode branches, `require` guards, and any foldable-only arguments. These define the Spark contract Comet must match.

### 2. Find the upstream function

Read the pinned `datafusion-spark` version from `native/Cargo.toml` rather than picking the first cached copy — `head -1` can land on an older release that lacks the function:

```bash
REPO_ROOT=$(git rev-parse --show-toplevel)
# Match the key followed by a space or '='; awk treats \b as a backspace, not a word boundary.
DF_SPARK_VER=$(awk -F'"' '/^datafusion-spark[ =]/ {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
DF_SPARK=$(ls -d ~/.cargo/registry/src/*/datafusion-spark-${DF_SPARK_VER}/ 2>/dev/null | head -1)
echo "Using datafusion-spark $DF_SPARK_VER at $DF_SPARK"

DF_FUNCS_VER=$(awk -F'"' '/^datafusion = / {print $2; exit}' "$REPO_ROOT/native/Cargo.toml")
DF_FUNCS=$(ls -d ~/.cargo/registry/src/*/datafusion-functions-${DF_FUNCS_VER}/ 2>/dev/null | head -1)

grep -rin "fn name" "$DF_SPARK/src/function/" 2>/dev/null | grep -i "$ARGUMENTS"
grep -rin "fn name" "$DF_FUNCS/src/" 2>/dev/null | grep -i "$ARGUMENTS"
```

If the cached crate directory does not exist (fresh checkout), run `cargo fetch` from `native/` first, or fall back to the latest cached version and verify the function exists in the pinned version's git tag before relying on it.
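
The "does the cached crate directory exist" check can be made explicit. This is a minimal sketch under stated assumptions: `resolve_crate_dir` is a hypothetical helper, and it assumes crate sources are unpacked under `registry/src/<index>/<name>-<version>` inside `CARGO_HOME` (defaulting to `~/.cargo`), which is the layout the commands above already rely on.

```shell
# Hypothetical helper: print the unpacked source directory of a pinned crate,
# or return non-zero if that exact version is not in the local cargo cache.
# $1 = crate name (e.g. datafusion-spark), $2 = pinned version
resolve_crate_dir() {
  for dir in "${CARGO_HOME:-$HOME/.cargo}"/registry/src/*/"$1-$2"; do
    if [ -d "$dir" ]; then
      printf '%s\n' "$dir"
      return 0
    fi
  done
  return 1
}

# Example use (not executed here):
#   DF_SPARK=$(resolve_crate_dir datafusion-spark "$DF_SPARK_VER") \
#     || { (cd "$REPO_ROOT/native" && cargo fetch); DF_SPARK=$(resolve_crate_dir datafusion-spark "$DF_SPARK_VER"); }
```

Failing loudly on a missing directory avoids grepping an empty path, which would otherwise look the same as "function not found upstream".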

Verify that the candidate function's `Signature`, return type, and behavior match Spark across all relevant edge cases (NULL, overflow, non-finite floats, decimal scale, locale, ANSI mode). If semantics diverge in a way the Scala serde can't bridge with preprocessing or restrictions → **stop and run `implement-comet-expression`** instead.

For datafusion-spark candidates, also check whether the UDF is already pre-registered: see `register_datafusion_spark_function` in `native/core/src/execution/jni_api.rs`. If listed, you only need Pattern A. If missing, you need Pattern B.
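
The registration check is a plain text search. A minimal sketch, assuming registrations keep the `ScalarUDF::new_from_impl(<Struct>::default())` form shown for Pattern B; `udf_is_registered` and `suggest_pattern` are hypothetical names, and the result only distinguishes Pattern A from Pattern B (Pattern C still depends on the Spark contract).

```shell
# Hypothetical helpers: decide between Pattern A and Pattern B for a
# datafusion-spark UDF by looking for its registration line.

# $1 = path to jni_api.rs, $2 = UDF struct name (e.g. SparkCsc)
udf_is_registered() {
  grep -Fq "ScalarUDF::new_from_impl($2::default())" "$1"
}

# Prints "A" if the UDF is already registered, "B" otherwise.
suggest_pattern() {
  if udf_is_registered "$1" "$2"; then
    echo "A"
  else
    echo "B"
  fi
}

# Example use (not executed here):
#   suggest_pattern native/core/src/execution/jni_api.rs SparkCsc
```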

### 3. Pick the wiring pattern

| Situation                                                                                             | Pattern |
| ----------------------------------------------------------------------------------------------------- | ------- |
| DF built-in (e.g. `acos`, `md5`, `replace`) matches Spark; or datafusion-spark UDF already registered | **A**   |
| New `datafusion-spark` UDF, semantics already match Spark                                             | **B**   |
| Inputs need preprocessing, restrictions, or explicit return type / failOnError                        | **C**   |

When in doubt, start with the lightest pattern and escalate only if the Spark contract demands it.

### 4. Apply the Scala wiring

Add the entry to the matching map in `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala`:

- `mathExpressions`, `stringExpressions`, `arrayExpressions`, `mapExpressions`, `structExpressions`, `predicateExpressions`, `conditionalExpressions`, `bitwiseExpressions`, `temporalExpressions`, `hashExpressions`, `conversionExpressions`, `miscExpressions`.

Confirm the Spark expression class is in scope. `QueryPlanSerde.scala` already imports `org.apache.spark.sql.catalyst.expressions._` (wildcard), so most expressions are picked up for free. For Pattern C, the topic file (`math.scala`, `strings.scala`, `datetime.scala`, `hash.scala`, `arrays.scala`, etc.) uses **explicit** imports — add the new class name to the existing `import org.apache.spark.sql.catalyst.expressions.{...}` line, and place the serde object alongside its peers.

Helpers from `QueryPlanSerde`:

- `scalarFunctionExprToProto(funcName, args*)` — basic call, `failOnError = false`, return type inferred from args.
- `scalarFunctionExprToProtoWithReturnType(funcName, returnType, failOnError, args*)` — set explicit return type and/or `fail_on_error`.
- `optExprWithInfo(optExpr, expr, children*)` — attaches the "why we couldn't convert" tag if any child failed; always wrap your final result.
- `withInfo(expr, "reason")` — tag a fallback reason when returning `None`.

For `getSupportLevel`, returning `Incompatible(Some("..."))` gates the function behind `spark.comet.expr.<exprConfigName>.allowIncompatible=true`. `Unsupported(notes)` always falls back to Spark.

### 5. Apply the native registration (Pattern B only)

In `native/core/src/execution/jni_api.rs::register_datafusion_spark_function`, add one line with the upstream import already in scope (or extend the imports). Keep the list grouped logically with neighboring entries.

Pattern A and Pattern C **never** touch native code.

### 6. Build and smoke-test

Build the project (the user generally runs tests themselves — do not run them proactively unless asked):

```bash
make
```

If the user asks for a smoke test:

```bash
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite $ARGUMENTS" -Dtest=none
```

### 7. Add SQL file tests

Create `spark/src/test/resources/sql-tests/expressions/<category>/$ARGUMENTS.sql`. Coverage requirements (mandatory — do not skip):

1. **Inspect the Spark source for accepted input datatypes.** Look at `inputTypes`, the parent class (`UnaryMathExpression`, `BinaryExpression`, `ImplicitCastInputTypes`, `ExpectsInputTypes`), and any `dataType` overrides. Add one column per supported type — e.g. for a numeric function: `ByteType`, `ShortType`, `IntegerType`, `LongType`, `FloatType`, `DoubleType`, `DecimalType` (varying precision/scale). For a string function: also cover `BinaryType` if accepted.
2. **All `SELECT`s must read from a parquet table.** Do not write literal-only queries like `SELECT f(1.0), f(NULL)` — encode the values you want to test as rows in a parquet-backed table and query that. This exercises the real read path, not just the constant-folder.
3. **Mix `NULL` into every column** so the kernel exercises the validity bitmap, not just the all-valid fast path.
4. **Floating-point (`FloatType` / `DoubleType`)** columns must include: `NaN`, `+0.0`, `-0.0`, `-Infinity`, `+Infinity`. Use `cast('NaN' as double)`, `cast('Infinity' as double)`, `cast('-Infinity' as double)`.
5. **Integer / decimal** columns must include each type's max and min (e.g. `127` / `-128` for byte, `2147483647` / `-2147483648` for int, `9223372036854775807` / `-9223372036854775808` for long, `Decimal(38, 0)` boundary for decimal) so overflow / wrap behavior is exercised.
6. Cover the **Spark-specific edge cases identified in step 1** (negative scale, empty string, ANSI-mode error paths, etc.) — again, as rows in the parquet table.

Use `query expect_fallback(...)` for inputs where the serde intentionally returns `None`. Format described in `docs/source/contributor-guide/sql-file-tests.md`.
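
The integer boundary values in item 5 follow from the two's-complement width of each type: for an n-bit signed type, max is 2^(n-1) - 1 and min is -2^(n-1). The sketch below derives them instead of hard-coding them; `int_bounds` is a hypothetical helper, and it assumes a shell with 64-bit arithmetic. The formula is arranged so that no intermediate value exceeds the 64-bit range.

```shell
# Hypothetical helper: print "<min> <max>" for a signed integer type.
# $1 = bit width: 8 (byte), 16 (short), 32 (int) or 64 (long)
int_bounds() {
  # max = 2^(n-1) - 1, computed as (2^(n-2) - 1) * 2 + 1 to avoid overflow at n = 64
  max=$(( ((1 << ($1 - 2)) - 1) * 2 + 1 ))
  min=$(( -max - 1 ))
  echo "$min $max"
}
```

Calling `int_bounds 8` prints `-128 127`, which matches the byte values listed above; the same call with `32` and `64` yields the int and long boundaries. How each value is written as a literal in the `INSERT` statement is a separate concern and should be checked against the column type.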

### 8. Update the documentation

Two files are hand-curated (not regenerated by `make`) and must be updated for every newly wired expression:

- `docs/source/user-guide/latest/expressions.md` — add a row to the matching category table (`## Math Expressions`, `## String Expressions`, etc.) in alphabetical order: `| <ExpressionClass> | \`<sql_name>\` |`.
- `docs/source/contributor-guide/spark_expressions_support.md` — flip the entry from `- [ ] <name>` to `- [x] <name>` in the matching category section.

Per-category compatibility pages under `docs/source/user-guide/latest/compatibility/expressions/*.md` are auto-generated by `GenerateDocs.scala` from `getCompatibleNotes` / `getIncompatibleReasons` / `getUnsupportedReasons`. Do not hand-edit those — `make` regenerates them.
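
Flipping the checkbox in `spark_expressions_support.md` is mechanical and can be sketched as below. `mark_supported` is a hypothetical helper, not an existing script in the repository; it assumes the entry is exactly `- [ ] <name>` on its own line and that the name contains no regex metacharacters.

```shell
# Hypothetical helper: flip "- [ ] <name>" to "- [x] <name>" in a checklist file.
# $1 = path to spark_expressions_support.md, $2 = function name (e.g. csc)
mark_supported() {
  tmp_file=$(mktemp) || return 1
  # [[] and []] are bracket expressions matching a literal "[" and "]".
  sed "s/^- [[] []] $2\$/- [x] $2/" "$1" > "$tmp_file" &&
    cat "$tmp_file" > "$1"   # write back in place, keeping the file's permissions
  rm -f "$tmp_file"
}
```

If the same name appears in more than one category section, every exact match is flipped, so review the resulting diff before committing. The row in `expressions.md` still has to be added by hand in alphabetical order.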

### 9. Run the audit skill

Run `audit-comet-expression` on `$ARGUMENTS`. It compares against Spark 3.4.3, 3.5.8, and 4.0.1 and produces a prioritized gap list — usually missing test coverage. Iterate on tests until the user is satisfied.

### 10. Final checks

```bash
make format
cd native && cargo clippy --all-targets --workspace -- -D warnings
```

### 11. Open the PR

Fill in every section of `.github/pull_request_template.md`. In "What changes are included in this PR?", note that the `wire-datafusion-function` skill was used and call out which upstream function (DataFusion built-in or `datafusion-spark::function::...`) is being wired.

docs/source/contributor-guide/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
@@ -371,7 +371,7 @@
 - [x] cos
 - [x] cosh
 - [x] cot
-- [ ] csc
+- [x] csc
 - [x] degrees
 - [x] div
 - [ ] e

docs/source/user-guide/latest/expressions.md

Lines changed: 1 addition & 0 deletions
@@ -150,6 +150,7 @@ of expressions that be disabled.
 | Cos | `cos` |
 | Cosh | `cosh` |
 | Cot | `cot` |
+| Csc | `csc` |
 | Divide | `/` |
 | Exp | `exp` |
 | Expm1 | `expm1` |

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions
@@ -57,6 +57,7 @@ use datafusion_spark::function::map::map_from_entries::MapFromEntries;
 use datafusion_spark::function::map::str_to_map::SparkStrToMap;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::math::hex::SparkHex;
+use datafusion_spark::function::math::trigonometry::SparkCsc;
 use datafusion_spark::function::math::width_bucket::SparkWidthBucket;
 use datafusion_spark::function::string::char::CharFunc;
 use datafusion_spark::function::string::concat::SparkConcat;
@@ -591,6 +592,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlDecode::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlEncode::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryUrlDecode::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCsc::default()));
 }
 
 /// Prepares arrow arrays for output.

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
@@ -104,6 +104,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
 classOf[Ceil] -> CometCeil,
 classOf[Cos] -> CometScalarFunction("cos"),
 classOf[Cosh] -> CometScalarFunction("cosh"),
+classOf[Csc] -> CometScalarFunction("csc"),
 classOf[Divide] -> CometDivide,
 classOf[Exp] -> CometScalarFunction("exp"),
 classOf[Expm1] -> CometScalarFunction("expm1"),
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

statement
CREATE TABLE test_csc(d double) USING parquet

statement
INSERT INTO test_csc VALUES (0.0), (-0.0), (1.5707963267948966), (-1.5707963267948966), (3.141592653589793), (NULL), (cast('NaN' as double)), (cast('Infinity' as double)), (cast('-Infinity' as double))

query tolerance=1e-6
SELECT csc(d) FROM test_csc
