Skip to content

Commit f111d80

Browse files
committed
[SPARK-57626][SQL] Share repeated nested JSON path parsing
### What changes were proposed in this pull request? This PR extends the internal shared parser introduced by [SPARK-47670](https://issues.apache.org/jira/browse/SPARK-47670) and #56547 from simple top-level fields to repeated literal named object paths. The proposed changes: - Parse literal named JSON paths into field-name segments. Both dot notation such as `$.payload.user.id` and quoted names such as `$['payload']['user.name']` are supported. - Build a runtime path trie so multiple nested fields can be extracted in one streaming JSON scan. - Preserve the first non-null duplicate-key match independently for each requested path, including duplicate parent objects and non-object intermediate values. - Keep ancestor and descendant paths in separate shared parses. For example, `$.a` never shares a parse with `$.a.b`. - Build all greedy prefix-free groups in one optimizer invocation, avoiding repeated fixed-point iterations for parallel prefix chains. - Retain the existing top-level fast path so top-level sharing does not pay for runtime trie traversal. - Leave dynamic paths, wildcards, array subscripts, and paths deeper than 64 named fields on their existing independent `GetJsonObject` evaluation. - Continue using the existing internal, default-disabled `spark.sql.optimizer.getJsonObjectSharedParsing.enabled` configuration. The existing JSON expression optimization must also be enabled. - Extend optimizer, runtime, code-generation, malformed-input, and microbenchmark coverage for nested paths. For example, these prefix-free paths share one parse: ```text $.payload.user.id $.payload.user.name $.payload.request_id ``` For this ordered set of ancestor/descendant paths: ```text $.a $.a.b $.x $.x.y ``` the optimizer creates two prefix-free groups in one invocation: ```text group 1: $.a, $.x group 2: $.a.b, $.x.y ``` Unsupported forms such as `$.items[0].id`, `$.payload.*`, and paths supplied by another column remain unchanged. ### Why are the changes needed? [SPARK-57626](https://issues.apache.org/jira/browse/SPARK-57626) follows up on the initial shared-parsing optimization. The initial implementation intentionally supports only simple top-level fields, so repeated literal nested paths still parse the same JSON independently. For example, consider an `events` table whose `json` column contains: ```json {"payload":{"user":{"id":123,"name":"alice"},"request_id":"r-1"}} ``` An existing query might run: ```sql SELECT get_json_object(json, '$.payload.user.id') AS user_id, get_json_object(json, '$.payload.user.name') AS user_name, get_json_object(json, '$.payload.request_id') AS request_id FROM events; ``` Before this PR, Spark parses each row's `json` value independently for all three nested extractions. With shared parsing enabled, Catalyst rewrites them to one internal `MultiGetJsonObject`, so the input is scanned once and all three values are returned from that scan. The SQL and its results do not change. ### Does this PR introduce _any_ user-facing change? Yes, within the unreleased `master` branch only. When the existing internal, default-disabled `spark.sql.optimizer.getJsonObjectSharedParsing.enabled` configuration is enabled, eligible repeated simple nested `get_json_object` paths now use shared parsing; previously only top-level paths were eligible. There is no new API, expression, configuration, or query migration. Result semantics remain unchanged for malformed input, duplicate keys, nulls, non-object intermediate values, and rendering failures. With the flag disabled, existing analyzed and optimized plans remain unchanged. Released Spark versions are unaffected. ### How was this patch tested? The following compilation, suites, and style checks passed on JDK 17: ```bash build/sbt "catalyst/compile" "catalyst/Test/compile" "sql/Test/compile" build/sbt "catalyst/testOnly org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprsSuite" build/sbt "sql/testOnly org.apache.spark.sql.JsonFunctionsSuite" build/sbt "hive/Test/testOnly org.apache.spark.sql.configaudit.SparkConfigBindingPolicySuite" build/sbt "catalyst/scalastyle" "catalyst/Test/scalastyle" "sql/Test/scalastyle" git diff --check ``` The complete `OptimizeJsonExprsSuite` passed 24 tests, the complete `JsonFunctionsSuite` passed 106 tests, and `SparkConfigBindingPolicySuite` passed 3 tests. The coverage includes nested sharing, both prefix-conflict directions, one-pass grouping of parallel prefix chains, a 2,000-path projection, unsupported paths, the depth limit, default-off plan equivalence, duplicate keys, nulls, malformed and single-quoted JSON, non-object intermediate values, rendering failures, and whole-stage code generation. The microbenchmark and its committed result files were regenerated with Spark's `Run benchmarks` GitHub Actions workflow for [JDK 17](https://github.com/sunchao/spark/actions/runs/28035861052), [JDK 21](https://github.com/sunchao/spark/actions/runs/28035861792), and [JDK 25](https://github.com/sunchao/spark/actions/runs/28035859413). All three runs passed and committed their JDK-specific results to this branch. Best JDK 17 times for 200,000 cached rows with 32 fields under a nested `payload` object: | Nested paths extracted | Shared parsing off | Shared parsing on | Relative speedup | | ---: | ---: | ---: | ---: | | 2 | 1,498 ms | 894 ms | 1.7x | | 4 | 2,969 ms | 1,129 ms | 2.6x | | 8 | 5,841 ms | 1,378 ms | 4.2x | | 16 | 11,856 ms | 1,926 ms | 6.2x | These are GitHub-hosted expression-scaling measurements, not end-to-end production-job results. The complete output, including the JDK 21 and JDK 25 runs, is recorded in the three `SharedJsonParseBenchmark` result files. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: OpenAI Codex (GPT-5) Closes #56685 from sunchao/codex/SPARK-57626-shared-nested-json-paths. Lead-authored-by: Chao Sun <chao@openai.com> Co-authored-by: sunchao <sunchao@users.noreply.github.com> Signed-off-by: Chao Sun <chao@openai.com>
1 parent eb69a3e commit f111d80

10 files changed

Lines changed: 596 additions & 117 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala

Lines changed: 138 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.json
1818

1919
import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
2020

21+
import scala.collection.mutable
2122
import scala.util.parsing.combinator.RegexParsers
2223

2324
import com.fasterxml.jackson.core._
@@ -575,24 +576,30 @@ case class GetJsonObjectEvaluator(cachedPath: UTF8String) {
575576
}
576577

577578
/**
578-
* Evaluates multiple simple top-level JSON fields in one parse.
579+
* Evaluates multiple simple named JSON paths in one parse.
579580
*/
580581
case class MultiGetJsonObjectEvaluator(
581-
fieldNames: Seq[String],
582-
fallbackPaths: Seq[UTF8String]) {
582+
fallbackPaths: Seq[UTF8String],
583+
namedPaths: Seq[Seq[String]]) {
583584
import SharedFactory._
584585

585-
require(
586-
fieldNames.nonEmpty &&
587-
fieldNames.distinct.length == fieldNames.length &&
588-
fallbackPaths.length == fieldNames.length)
586+
require(fallbackPaths.nonEmpty && namedPaths.length == fallbackPaths.length)
589587

590588
@transient
591-
private lazy val fieldToOrdinal: Map[String, Int] = fieldNames.zipWithIndex.toMap
589+
private lazy val useTopLevelFastPath: Boolean =
590+
namedPaths.forall(_.length == 1) && namedPaths.distinct.length == namedPaths.length
591+
592+
@transient
593+
private lazy val topLevelFieldToOrdinal: Map[String, Int] =
594+
namedPaths.zipWithIndex.map { case (path, ordinal) => path.head -> ordinal }.toMap
595+
596+
@transient
597+
private lazy val pathTrie: MultiGetJsonObjectEvaluator.PathTrieNode =
598+
MultiGetJsonObjectEvaluator.buildPathTrie(namedPaths)
592599

593600
@transient
594601
private lazy val nullRow: InternalRow =
595-
new GenericInternalRow(Array.ofDim[Any](fieldNames.length))
602+
new GenericInternalRow(Array.ofDim[Any](fallbackPaths.length))
596603

597604
@transient
598605
private lazy val fallbackEvaluators: Seq[GetJsonObjectEvaluator] =
@@ -611,34 +618,18 @@ case class MultiGetJsonObjectEvaluator(
611618
def evaluate(json: UTF8String): InternalRow = {
612619
if (json == null) return null
613620

614-
val values = Array.ofDim[Any](fieldNames.length)
615-
val matched = Array.ofDim[Boolean](fieldNames.length)
621+
val values = Array.ofDim[Any](fallbackPaths.length)
622+
val matched = Array.ofDim[Boolean](fallbackPaths.length)
616623

617624
try {
618625
val validObject = Utils.tryWithResource(
619626
CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
620627
if (parser.nextToken() != JsonToken.START_OBJECT) {
621628
false
629+
} else if (useTopLevelFastPath) {
630+
extractTopLevelObject(parser, values, matched)
622631
} else {
623-
var token = parser.nextToken()
624-
while (token != null && token != JsonToken.END_OBJECT) {
625-
if (token == JsonToken.FIELD_NAME) {
626-
val fieldName = parser.currentName
627-
val ordinal = fieldToOrdinal.get(fieldName).filter(!matched(_))
628-
val valueToken = parser.nextToken()
629-
if (ordinal.nonEmpty && valueToken != JsonToken.VALUE_NULL) {
630-
val index = ordinal.get
631-
matched(index) = true
632-
copyCurrentStructure(parser).foreach(value => values(index) = value)
633-
} else {
634-
parser.skipChildren()
635-
}
636-
} else {
637-
parser.skipChildren()
638-
}
639-
token = parser.nextToken()
640-
}
641-
token == JsonToken.END_OBJECT
632+
extractObject(parser, pathTrie, values, matched)
642633
}
643634
}
644635
if (validObject) {
@@ -647,15 +638,88 @@ case class MultiGetJsonObjectEvaluator(
647638
nullRow
648639
}
649640
} catch {
650-
// Every simple top-level legacy extraction scans through the root object's closing token,
651-
// so a syntax failure makes every sibling null without needing per-path reparsing.
641+
// Every simple named legacy extraction scans through the root object's closing token, so a
642+
// syntax failure makes every sibling null without needing per-path reparsing.
652643
case _: JsonParseException => nullRow
653-
// A parser-side rendering failure can leave the shared token stream unusable. Reparse each
654-
// path with the legacy evaluator so one bad selected value cannot erase sibling results.
644+
// A parser-side rendering failure, such as a string-length constraint violation, can leave
645+
// the shared token stream unusable. Reparse each path with the legacy evaluator so one bad
646+
// selected value cannot erase independent sibling results.
655647
case _: JsonProcessingException => fallback(json)
656648
}
657649
}
658650

651+
private def extractTopLevelObject(
652+
parser: JsonParser,
653+
values: Array[Any],
654+
matched: Array[Boolean]): Boolean = {
655+
var token = parser.nextToken()
656+
while (token != null && token != JsonToken.END_OBJECT) {
657+
if (token == JsonToken.FIELD_NAME) {
658+
val ordinal = topLevelFieldToOrdinal.get(parser.currentName).filter(!matched(_))
659+
val valueToken = parser.nextToken()
660+
if (ordinal.nonEmpty && valueToken != JsonToken.VALUE_NULL) {
661+
val index = ordinal.get
662+
matched(index) = true
663+
copyCurrentStructure(parser).foreach(value => values(index) = value)
664+
} else {
665+
parser.skipChildren()
666+
}
667+
} else {
668+
parser.skipChildren()
669+
}
670+
token = parser.nextToken()
671+
}
672+
token == JsonToken.END_OBJECT
673+
}
674+
675+
private def extractObject(
676+
parser: JsonParser,
677+
node: MultiGetJsonObjectEvaluator.PathTrieNode,
678+
values: Array[Any],
679+
matched: Array[Boolean]): Boolean = {
680+
var valid = true
681+
var token = parser.nextToken()
682+
while (valid && token != null && token != JsonToken.END_OBJECT) {
683+
if (token == JsonToken.FIELD_NAME) {
684+
val child = node.children.get(parser.currentName).filter(_.hasUnmatched(matched))
685+
val valueToken = parser.nextToken()
686+
if (child.nonEmpty && valueToken != JsonToken.VALUE_NULL) {
687+
valid = extractValue(parser, child.get, values, matched)
688+
} else {
689+
parser.skipChildren()
690+
}
691+
} else {
692+
parser.skipChildren()
693+
}
694+
if (valid) {
695+
token = parser.nextToken()
696+
}
697+
}
698+
valid && token == JsonToken.END_OBJECT
699+
}
700+
701+
private def extractValue(
702+
parser: JsonParser,
703+
node: MultiGetJsonObjectEvaluator.PathTrieNode,
704+
values: Array[Any],
705+
matched: Array[Boolean]): Boolean = {
706+
// Optimizer-generated paths are deduplicated. Multiple ordinals defensively support
707+
// directly constructed internal expressions with duplicate paths.
708+
if (node.terminalOrdinals.nonEmpty) {
709+
node.terminalOrdinals.foreach { ordinal => matched(ordinal) = true }
710+
val value = copyCurrentStructure(parser)
711+
value.foreach { result =>
712+
node.terminalOrdinals.foreach { ordinal => values(ordinal) = result }
713+
}
714+
true
715+
} else if (parser.currentToken == JsonToken.START_OBJECT) {
716+
extractObject(parser, node, values, matched)
717+
} else {
718+
parser.skipChildren()
719+
true
720+
}
721+
}
722+
659723
private def copyCurrentStructure(parser: JsonParser): Option[UTF8String] = {
660724
outputBuffer.reset()
661725
var renderingFailed = false
@@ -726,3 +790,43 @@ case class MultiGetJsonObjectEvaluator(
726790
if (renderingFailed) None else Some(UTF8String.fromBytes(outputBuffer.toByteArray))
727791
}
728792
}
793+
794+
object MultiGetJsonObjectEvaluator {
795+
private final class MutablePathTrieNode {
796+
val terminalOrdinals: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty
797+
val children: mutable.LinkedHashMap[String, MutablePathTrieNode] = mutable.LinkedHashMap.empty
798+
799+
def freeze(): PathTrieNode = {
800+
require(
801+
terminalOrdinals.isEmpty || children.isEmpty,
802+
"Shared JSON paths must not be prefixes of one another")
803+
val frozenChildren = children.iterator.map { case (name, child) =>
804+
name -> child.freeze()
805+
}.toMap
806+
val ordinals = (terminalOrdinals.iterator ++
807+
frozenChildren.valuesIterator.flatMap(_.descendantOrdinals.iterator)).toArray
808+
PathTrieNode(terminalOrdinals.toArray, frozenChildren, ordinals)
809+
}
810+
}
811+
812+
private case class PathTrieNode(
813+
terminalOrdinals: Array[Int],
814+
children: Map[String, PathTrieNode],
815+
descendantOrdinals: Array[Int]) {
816+
def hasUnmatched(matched: Array[Boolean]): Boolean = {
817+
descendantOrdinals.exists(index => !matched(index))
818+
}
819+
}
820+
821+
private def buildPathTrie(paths: Seq[Seq[String]]): PathTrieNode = {
822+
val root = new MutablePathTrieNode
823+
paths.zipWithIndex.foreach { case (path, ordinal) =>
824+
var node = root
825+
path.foreach { fieldName =>
826+
node = node.children.getOrElseUpdate(fieldName, new MutablePathTrieNode)
827+
}
828+
node.terminalOrdinals += ordinal
829+
}
830+
root.freeze()
831+
}
832+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,16 @@ case class GetJsonObject(json: Expression, path: Expression)
142142
}
143143

144144
object GetJsonObject {
145-
private[sql] def simpleTopLevelField(path: UTF8String): Option[String] = {
145+
import PathInstruction._
146+
147+
private[sql] def simpleNamedPath(path: UTF8String): Option[Seq[String]] = {
146148
try {
147-
Option(path).flatMap(value => JsonPathParser.parse(value.toString)).collect {
148-
case List(PathInstruction.Key, PathInstruction.Named(fieldName)) => fieldName
149+
Option(path).flatMap(value => JsonPathParser.parse(value.toString)).flatMap { instructions =>
150+
val names = instructions.grouped(2).map {
151+
case List(Key, Named(fieldName)) => Some(fieldName)
152+
case _ => None
153+
}.toSeq
154+
if (names.nonEmpty && names.forall(_.isDefined)) Some(names.flatten) else None
149155
}
150156
} catch {
151157
// Numeric subscripts are parsed as Long and can overflow before the parser returns None.
@@ -155,28 +161,25 @@ object GetJsonObject {
155161
}
156162

157163
/**
158-
* Extracts multiple simple top-level fields from a JSON string in one parse. This is an internal
159-
* expression used to share sibling [[GetJsonObject]] expressions; unsupported JSON paths remain
160-
* as independent GetJsonObject expressions.
164+
* Extracts multiple simple named paths from a JSON string in one parse. This is an internal
165+
* expression used to share sibling [[GetJsonObject]] expressions; unsupported and
166+
* prefix-conflicting JSON paths remain as independent GetJsonObject expressions.
161167
*/
162168
case class MultiGetJsonObject(
163169
json: Expression,
164-
fieldNames: Seq[String],
165170
fallbackPaths: Seq[String])
166171
extends UnaryExpression
167172
with ExpectsInputTypes {
168173

169-
require(
170-
fieldNames.nonEmpty &&
171-
fieldNames.distinct.length == fieldNames.length &&
172-
fallbackPaths.length == fieldNames.length)
174+
// OptimizeCsvJsonExprs caps shared path depth to keep evaluator recursion stack-safe.
175+
require(fallbackPaths.nonEmpty)
173176

174177
override def child: Expression = json
175178

176179
override def inputTypes: Seq[AbstractDataType] =
177180
Seq(StringTypeWithCollation(supportsTrimCollation = true))
178181

179-
override lazy val dataType: DataType = StructType(fieldNames.indices.map { index =>
182+
override lazy val dataType: DataType = StructType(fallbackPaths.indices.map { index =>
180183
StructField(s"_$index", StringType, nullable = true)
181184
})
182185

@@ -189,10 +192,17 @@ case class MultiGetJsonObject(
189192

190193
final override val nodePatterns: Seq[TreePattern] = Seq(GET_JSON_OBJECT)
191194

195+
@transient
196+
private lazy val namedPaths = fallbackPaths.map { path =>
197+
GetJsonObject.simpleNamedPath(UTF8String.fromString(path)).getOrElse {
198+
throw new IllegalArgumentException(s"Unsupported shared JSON path: $path")
199+
}
200+
}
201+
192202
@transient
193203
private lazy val evaluator = MultiGetJsonObjectEvaluator(
194-
fieldNames,
195-
fallbackPaths.map(UTF8String.fromString))
204+
fallbackPaths.map(UTF8String.fromString),
205+
namedPaths)
196206

197207
override def eval(input: InternalRow): Any = {
198208
evaluator.evaluate(json.eval(input).asInstanceOf[UTF8String])

0 commit comments

Comments
 (0)