Commit 3ee5d65

feat: 100% Spark-compatible JSON support via codegen dispatcher (apache#4305)
1 parent 027c9b0 commit 3ee5d65

18 files changed

Lines changed: 364 additions & 133 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
@@ -377,6 +377,7 @@ jobs:
 org.apache.comet.CometMapExpressionSuite
 org.apache.comet.CometCsvExpressionSuite
 org.apache.comet.CometJsonExpressionSuite
+org.apache.comet.CometJsonJvmSuite
 org.apache.comet.SparkErrorConverterSuite
 org.apache.comet.expressions.conditional.CometIfSuite
 org.apache.comet.expressions.conditional.CometCoalesceSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
@@ -193,6 +193,7 @@ jobs:
 org.apache.comet.CometMapExpressionSuite
 org.apache.comet.CometCsvExpressionSuite
 org.apache.comet.CometJsonExpressionSuite
+org.apache.comet.CometJsonJvmSuite
 org.apache.comet.SparkErrorConverterSuite
 org.apache.comet.expressions.conditional.CometIfSuite
 org.apache.comet.expressions.conditional.CometCoalesceSuite

docs/source/contributor-guide/expression-audits/json_funcs.md

Lines changed: 6 additions & 0 deletions
@@ -33,6 +33,12 @@
 - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
 - Known incompatibility: Spark accepts single-quoted JSON and unescaped control characters; Comet's native parser (built on `serde_json`) rejects both, so those inputs require `spark.comet.expression.GetJsonObject.allowIncompatible=true` and may still produce different results. Non-default Spark 4.0 string collations are not propagated (https://github.com/apache/datafusion-comet/issues/2190).

+## json_array_length
+
+- `LengthOfJsonArray`: `UnaryExpression with ExpectsInputTypes with CodegenFallback`; `inputTypes = Seq(StringType) -> IntegerType`. Returns NULL for NULL input, invalid JSON, or non-array JSON; otherwise the number of top-level array elements.
+- Runs through the codegen dispatcher by default for byte-exact Spark compatibility.
+- Known incompatibility: the native path (built on `serde_json`) requires strict JSON, so single-quoted JSON, unescaped control characters, and trailing content require `spark.comet.expression.LengthOfJsonArray.allowIncompatible=true` and may still produce different results.
+
 ## to_json

 - Partial native support; options and map/array inputs fall back.

docs/source/user-guide/latest/compatibility/index.md

Lines changed: 1 addition & 0 deletions
@@ -28,4 +28,5 @@ This guide documents areas where Comet's behavior is known to differ from Spark.
 - **Regular expressions**: differences between the Rust regexp crate and Java's regex engine.
 - **Operators**: operator-level compatibility notes, including window functions and round-robin partitioning.
 - **Expressions**: per-expression compatibility notes, including cast.
+- **JSON**: choosing between the native and Spark-compatible engines for JSON expressions.
 - **Spark versions**: version-specific known issues and limitations.
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON Compatibility
+
+Comet can evaluate JSON expressions (`get_json_object`, `from_json`, `to_json`,
+`json_array_length`) two ways:
+
+- **Codegen dispatcher (default):** Spark's own `doGenCode` for the expression
+runs inside the Comet pipeline (via Comet's Arrow-direct codegen dispatcher),
+giving byte-exact compatibility with Spark at the cost of a JNI roundtrip per
+batch. This rides the codegen dispatcher
+(`spark.comet.exec.scalaUDF.codegen.enabled`, enabled by default); if the
+dispatcher is disabled, the operator falls back to Spark.
+- **Native (rust) path:** the native DataFusion implementation. Faster, but has
+known compatibility gaps with Spark on certain inputs, so it is **opt-in per
+expression** via the expression's `allowIncompatible` config. Any expression or
+input case with no native implementation falls back to the codegen dispatcher.
+
+## Expression coverage
+
+| SQL | Native (rust) path | Opt-in config |
+| ------------------- | ---------------------------------------------------------------------------------------------- | ------------------------------------------------------------ |
+| `get_json_object` | Supported, with gaps on single-quoted JSON and unescaped control characters | `spark.comet.expression.GetJsonObject.allowIncompatible` |
+| `from_json` | Supported with restrictions (PERMISSIVE mode only, simple schema types only) | `spark.comet.expression.JsonToStructs.allowIncompatible` |
+| `to_json` | Supported for struct inputs only, no options | `spark.comet.expression.StructsToJson.allowIncompatible` |
+| `json_array_length` | Supported, with gaps on single-quoted JSON, unescaped control characters, and trailing content | `spark.comet.expression.LengthOfJsonArray.allowIncompatible` |
+
+When the native path is enabled but an expression or input case has no native
+implementation (for example `to_json` with map or array inputs, or `from_json`
+with an unsupported schema), Comet falls back to the codegen dispatcher for that
+case.
+
+## When to use the native path
+
+- You want the faster native path and your inputs avoid the known compatibility
+gaps above.
+- Enable it per expression, for example
+`spark.comet.expression.GetJsonObject.allowIncompatible=true`. Cases the native path
+does not cover still fall back to the codegen dispatcher.
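
Reviewer sketch (not part of the commit): the engine selection described in the new JSON compatibility page can be modelled as a small pure function. The names `JsonEngine`, `JsonEngineSketch`, `chooseEngine`, and `hasNativeImpl` are hypothetical and exist only for illustration. The config keys are the ones quoted in the page, and the dispatcher default of `true` follows the page's "enabled by default" wording. This models the documented behaviour, not Comet's actual planner code.

```scala
// Illustrative model only: these types and functions are hypothetical, not Comet APIs.
sealed trait JsonEngine
case object NativePath extends JsonEngine
case object CodegenDispatcher extends JsonEngine
case object SparkFallback extends JsonEngine

object JsonEngineSketch {
  // SQL function name -> expression class name, copied from the "Opt-in config" column.
  val exprClassBySql: Map[String, String] = Map(
    "get_json_object" -> "GetJsonObject",
    "from_json" -> "JsonToStructs",
    "to_json" -> "StructsToJson",
    "json_array_length" -> "LengthOfJsonArray")

  // Config key quoted in the page for the codegen dispatcher.
  val DispatcherKey: String = "spark.comet.exec.scalaUDF.codegen.enabled"

  // Builds the per-expression opt-in key; None for functions the table does not list.
  def allowIncompatibleKey(sqlName: String): Option[String] =
    exprClassBySql.get(sqlName).map(cls => s"spark.comet.expression.$cls.allowIncompatible")

  // hasNativeImpl: assumed flag saying a native implementation covers this expression
  // and input case (for example false for to_json over a map input).
  def chooseEngine(
      sqlName: String,
      conf: Map[String, String],
      hasNativeImpl: Boolean): JsonEngine = {
    val optedIn = allowIncompatibleKey(sqlName).flatMap(conf.get).contains("true")
    val dispatcherOn = conf.getOrElse(DispatcherKey, "true") == "true"
    if (optedIn && hasNativeImpl) NativePath // explicit opt-in and a native kernel exists
    else if (dispatcherOn) CodegenDispatcher // default: Spark's own code inside Comet
    else SparkFallback                       // dispatcher disabled: operator runs in Spark
  }
}
```

In a real session the key returned by `allowIncompatibleKey` would be passed as a Spark conf entry, for example `--conf spark.comet.expression.GetJsonObject.allowIncompatible=true`. Whether it can also be changed at runtime is not stated in the page.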

docs/source/user-guide/latest/expressions.md

Lines changed: 1 addition & 1 deletion
@@ -338,7 +338,7 @@ expression-level). The `outer` variants are wired but marked `Incompatible`; the
 | --- | --- | --- |
 | `from_json` || Falls back by default; opt-in via allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#from_json)) |
 | `get_json_object` || Some inputs need allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#get_json_object)) |
-| `json_array_length` | 🔜 | tracking [#4098](https://github.com/apache/datafusion-comet/issues/4098) |
+| `json_array_length` | | Single-quoted/trailing JSON needs allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#json_array_length)) |
 | `json_object_keys` | 🔜 | [#3161](https://github.com/apache/datafusion-comet/issues/3161) |
 | `json_tuple` | 🔜 | [#3160](https://github.com/apache/datafusion-comet/issues/3160) |
 | `schema_of_json` | 🔜 | [#3163](https://github.com/apache/datafusion-comet/issues/3163) |

docs/source/user-guide/latest/index.rst

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@ to read more.
 compatibility/regex
 compatibility/operators
 compatibility/expressions/index
+compatibility/json
 compatibility/spark-versions

 .. toctree::

spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala

Lines changed: 13 additions & 15 deletions
@@ -23,7 +23,7 @@ import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
 import org.apache.arrow.vector.types.pojo.Field
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, Unevaluable}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
 * back cleanly rather than crashing the Janino compile at execute time.
 *
 * Checks every `BoundReference`'s data type and the root `expr.dataType` against
-* [[isSupportedDataType]], rejects aggregates / generators / `CodegenFallback` (other than
-* HOFs, which are admitted), and gates total nested-field count on
-* `spark.sql.codegen.maxFields`.
+* [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`, and gates total
+* nested-field count on `spark.sql.codegen.maxFields`.
 */
 def canHandle(boundExpr: Expression): Option[String] = {
 if (!isSupportedDataType(boundExpr.dataType)) {
@@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
 s"codegen dispatch: too many nested fields ($totalFields > " +
 s"spark.sql.codegen.maxFields=$maxFields)")
 }
-// HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode` emits one
-// `((Expression) references[N]).eval(row)` call site per HOF. The kernel dispatches to the
-// HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per element and reads
-// the input array through the kernel's typed Arrow getters. Per-task `boundExpr` isolation
-// in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on the
-// lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`.
+// `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode` emits one
+// `((Expression) references[N]).eval(row)` call site per expression. The kernel dispatches
+// to the expression's interpreted `eval` against `row` aliased to `this`, so the eval reads
+// through the kernel's typed Arrow getters. This covers `HigherOrderFunction` (which mutates
+// `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as well as other
+// CodegenFallback expressions like `JsonToStructs` / `StructsToJson` whose `eval(row)`
+// simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in
+// `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on shared
+// state inside the expression.
 //
 // Nondeterministic / stateful expressions are accepted: each cache entry holds one kernel
 // instance with a single `init(partitionIndex)` call, so `Rand` / `MonotonicallyIncreasingID`
@@ -150,18 +152,14 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
 boundExpr.find {
 case _: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true
 case _: org.apache.spark.sql.catalyst.expressions.Generator => true
-case _: HigherOrderFunction => false
-case _: LambdaFunction => false
-case _: NamedLambdaVariable => false
-case _: CodegenFallback => true
 case u: Unevaluable if isCodegenInertUnevaluable(u) => false
 case _: Unevaluable => true
 case _ => false
 } match {
 case Some(bad) =>
 return Some(
 s"codegen dispatch: expression ${bad.getClass.getSimpleName} not supported " +
-"(aggregate, generator, codegen-fallback, or unevaluable)")
+"(aggregate, generator, or unevaluable)")
 case None =>
 }
 val badRef = boundExpr.collectFirst {
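
Reviewer sketch (not part of the commit): the effect of dropping the `CodegenFallback` case from the `find` above can be shown on a toy tree. Every type here (`ToyExpr`, `Column`, `Scalar`, `Aggregate`, `Generator`, `InterpretedOnly`) is a hypothetical stand-in rather than a Catalyst class. `InterpretedOnly` plays the role of a `CodegenFallback` expression such as `JsonToStructs`. The `Unevaluable` cases, data type checks, and `maxFields` check are left out.

```scala
// Toy expression tree. These node kinds are stand-ins, not real Spark classes.
sealed trait ToyExpr extends Product with Serializable { def children: List[ToyExpr] }
final case class Column(name: String) extends ToyExpr { def children: List[ToyExpr] = Nil }
final case class Scalar(fn: String, children: List[ToyExpr]) extends ToyExpr
final case class Aggregate(fn: String, children: List[ToyExpr]) extends ToyExpr
final case class Generator(fn: String, children: List[ToyExpr]) extends ToyExpr
// Stands in for a CodegenFallback expression, which the gate now admits.
final case class InterpretedOnly(fn: String, children: List[ToyExpr]) extends ToyExpr

object GateSketch {
  // Pre-order search returning the first node that satisfies the predicate.
  def find(e: ToyExpr)(p: ToyExpr => Boolean): Option[ToyExpr] =
    if (p(e)) Some(e)
    else e.children.foldLeft(Option.empty[ToyExpr])((acc, c) => acc.orElse(find(c)(p)))

  // Some(reason) rejects the tree at plan time; None accepts it.
  def canHandle(e: ToyExpr): Option[String] =
    find(e) {
      case _: Aggregate => true
      case _: Generator => true
      case _ => false // InterpretedOnly falls through here, so it is accepted
    }.map(bad => s"codegen dispatch: expression ${bad.productPrefix} not supported")
}
```

Returning `Option[String]` instead of throwing mirrors the commit's stated goal of surfacing a fallback reason at plan time.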

spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala

Lines changed: 13 additions & 3 deletions
@@ -20,7 +20,7 @@
 package org.apache.comet.serde

 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, RuntimeReplaceable, ScalaUDF}
 import org.apache.spark.sql.types.BinaryType

 import org.apache.comet.CometConf
@@ -78,10 +78,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] {
 return None
 }

+// `RuntimeReplaceable` expressions (e.g. Spark 4's `StructsToJson`) have a `doGenCode` that
+// always throws "Cannot generate code for expression". Catalyst's `ReplaceExpressions` rule
+// normally rewrites them to their `replacement` form before codegen runs. Comet's serde
+// sometimes works with the pre-rewrite form (via shim reconstruction) for matching purposes,
+// so unwrap to the replacement here before binding so the kernel compiles.
+val target = expr match {
+case rr: RuntimeReplaceable => rr.replacement
+case other => other
+}
+
 // Bind against only the AttributeReferences the tree actually reads, so ordinals align with
 // the data args we ship.
-val attrs = expr.collect { case a: AttributeReference => a }.distinct
-val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs))
+val attrs = target.collect { case a: AttributeReference => a }.distinct
+val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs))

 // Gate at plan time. Surface the reason via withFallbackReason rather than crashing Janino
 // at execute.
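
Reviewer sketch (not part of the commit): the unwrap-then-bind order in this hunk can be illustrated with a toy tree. `Node`, `Attr`, `Bound`, `Call`, `Replaceable`, and `BindSketch` are hypothetical names that only loosely mirror `AttributeReference`, `BoundReference`, and `RuntimeReplaceable`. The point is that attributes are collected from the unwrapped target, so the ordinals match the columns actually shipped.

```scala
// Toy model. None of these types are Catalyst classes.
sealed trait Node
final case class Attr(name: String) extends Node
final case class Bound(ordinal: Int) extends Node
final case class Call(fn: String, children: List[Node]) extends Node
// Carries the pre-rewrite form plus the form that can actually be compiled.
final case class Replaceable(original: Call, replacement: Node) extends Node

object BindSketch {
  // Single-level unwrap of the root, as in the hunk above.
  def unwrap(n: Node): Node = n match {
    case Replaceable(_, replacement) => replacement
    case other => other
  }

  // Attribute names in traversal order; duplicates are kept here.
  def attrs(n: Node): List[String] = n match {
    case Attr(name) => List(name)
    case Bound(_) => Nil
    case Call(_, children) => children.flatMap(attrs)
    case Replaceable(_, replacement) => attrs(replacement)
  }

  // Replaces each attribute with its position in `order`.
  // `order` is assumed to contain every attribute in the tree.
  def bind(n: Node, order: List[String]): Node = n match {
    case Attr(name) => Bound(order.indexOf(name))
    case b: Bound => b
    case Call(fn, children) => Call(fn, children.map(bind(_, order)))
    case Replaceable(orig, replacement) => Replaceable(orig, bind(replacement, order))
  }

  // Unwrap first, then collect distinct attributes from the unwrapped tree and bind.
  def prepare(n: Node): (Node, List[String]) = {
    val target = unwrap(n)
    val order = attrs(target).distinct
    (bind(target, order), order)
  }
}
```

Collecting attributes from the original node instead of `target` could produce ordinals for columns the replacement never reads, which is presumably why the commit switches both calls to `target`.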

spark/src/main/scala/org/apache/comet/serde/json.scala

Lines changed: 23 additions & 11 deletions
@@ -19,18 +19,30 @@

 package org.apache.comet.serde

-import org.apache.spark.sql.catalyst.expressions.LengthOfJsonArray
+import org.apache.spark.sql.catalyst.expressions.{Attribute, LengthOfJsonArray}

-object CometLengthOfJsonArray
-extends CometScalarFunction[LengthOfJsonArray]("json_array_length") {
+import org.apache.comet.CometConf
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithFallbackReason, scalarFunctionExprToProto}

-private val IncompatibleReason: String =
-"Spark's lenient JSON parser allows single quotes, unescaped controls, " +
-"and trailing content, " +
-"while Comet's serde_json requires strict JSON."
-
-override def getIncompatibleReasons(): Seq[String] = Seq(IncompatibleReason)
+/**
+* `json_array_length` runs Spark's own implementation through the codegen dispatcher by default,
+* for byte-exact results. The native (rust) path is faster but incompatible with Spark for
+* single-quoted JSON, unescaped control characters, and trailing content, so it is opt-in via
+* `spark.comet.expression.LengthOfJsonArray.allowIncompatible`; otherwise it rides the codegen
+* dispatcher via [[CometCodegenDispatch]].
+*/
+object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {

-override def getSupportLevel(expr: LengthOfJsonArray): SupportLevel = Incompatible(
-Some(IncompatibleReason))
+override def convert(
+expr: LengthOfJsonArray,
+inputs: Seq[Attribute],
+binding: Boolean): Option[Expr] =
+if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
+val childExpr = expr.children.map(exprToProtoInternal(_, inputs, binding))
+val optExpr = scalarFunctionExprToProto("json_array_length", childExpr: _*)
+optExprWithFallbackReason(optExpr, expr, expr.children: _*)
+} else {
+super.convert(expr, inputs, binding)
+}
 }
