From ee71fb12cb5fd5285905b14b041a4b3a09a72ff2 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 22 Jun 2026 17:32:25 -0700 Subject: [PATCH 1/7] [SPARK-57626][SQL] Share repeated nested JSON path parsing --- .../json/JsonExpressionEvalUtils.scala | 163 +++++++++++++++--- .../expressions/jsonExpressions.scala | 29 +++- .../optimizer/OptimizeCsvJsonExprs.scala | 130 +++++++++++--- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../optimizer/OptimizeJsonExprsSuite.scala | 116 ++++++++++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 78 ++++++++- .../json/SharedJsonParseBenchmark.scala | 36 +++- 7 files changed, 479 insertions(+), 75 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index c0203dd762f7d..e02497b922b46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -575,20 +575,30 @@ case class GetJsonObjectEvaluator(cachedPath: UTF8String) { } /** - * Evaluates multiple simple top-level JSON fields in one parse. + * Evaluates multiple simple named JSON paths in one parse. */ case class MultiGetJsonObjectEvaluator( fieldNames: Seq[String], - fallbackPaths: Seq[UTF8String]) { + fallbackPaths: Seq[UTF8String], + namedPaths: Seq[Seq[String]]) { import SharedFactory._ require( fieldNames.nonEmpty && - fieldNames.distinct.length == fieldNames.length && - fallbackPaths.length == fieldNames.length) + fallbackPaths.length == fieldNames.length && + namedPaths.length == fieldNames.length) @transient - private lazy val fieldToOrdinal: Map[String, Int] = fieldNames.zipWithIndex.toMap + private lazy val useTopLevelFastPath: Boolean = + namedPaths.forall(_.length == 1) && namedPaths.distinct.length == namedPaths.length + + @transient + private lazy val topLevelFieldToOrdinal: Map[String, Int] = + namedPaths.zipWithIndex.map { case (path, ordinal) => path.head -> ordinal }.toMap + + @transient + private lazy val pathTrie: MultiGetJsonObjectEvaluator.PathTrieNode = + MultiGetJsonObjectEvaluator.buildPathTrie(namedPaths) @transient private lazy val nullRow: InternalRow = @@ -619,26 +629,10 @@ case class MultiGetJsonObjectEvaluator( CreateJacksonParser.utf8String(jsonFactory, json)) { parser => if (parser.nextToken() != JsonToken.START_OBJECT) { false + } else if (useTopLevelFastPath) { + extractTopLevelObject(parser, values, matched) } else { - var token = parser.nextToken() - while (token != null && token != JsonToken.END_OBJECT) { - if (token == JsonToken.FIELD_NAME) { - val fieldName = parser.currentName - val ordinal = fieldToOrdinal.get(fieldName).filter(!matched(_)) - val valueToken = parser.nextToken() - if (ordinal.nonEmpty && valueToken != JsonToken.VALUE_NULL) { - val index = ordinal.get - matched(index) = true - copyCurrentStructure(parser).foreach(value => values(index) = value) - } else { - parser.skipChildren() - } - } else { - parser.skipChildren() - } - token = parser.nextToken() - } - token == JsonToken.END_OBJECT + extractObject(parser, pathTrie, values, matched) } } if (validObject) { @@ -647,15 +641,86 @@ case class MultiGetJsonObjectEvaluator( nullRow } } catch { - // Every simple top-level legacy extraction scans through the root object's closing token, - // so a syntax failure makes every sibling null without needing per-path reparsing. + // Every simple named legacy extraction scans through the root object's closing token, so a + // syntax failure makes every sibling null without needing per-path reparsing. case _: JsonParseException => nullRow - // A parser-side rendering failure can leave the shared token stream unusable. Reparse each - // path with the legacy evaluator so one bad selected value cannot erase sibling results. + // A parser-side rendering failure, such as a string-length constraint violation, can leave + // the shared token stream unusable. Reparse each path with the legacy evaluator so one bad + // selected value cannot erase independent sibling results. case _: JsonProcessingException => fallback(json) } } + private def extractTopLevelObject( + parser: JsonParser, + values: Array[Any], + matched: Array[Boolean]): Boolean = { + var token = parser.nextToken() + while (token != null && token != JsonToken.END_OBJECT) { + if (token == JsonToken.FIELD_NAME) { + val ordinal = topLevelFieldToOrdinal.get(parser.currentName).filter(!matched(_)) + val valueToken = parser.nextToken() + if (ordinal.nonEmpty && valueToken != JsonToken.VALUE_NULL) { + val index = ordinal.get + matched(index) = true + copyCurrentStructure(parser).foreach(value => values(index) = value) + } else { + parser.skipChildren() + } + } else { + parser.skipChildren() + } + token = parser.nextToken() + } + token == JsonToken.END_OBJECT + } + + private def extractObject( + parser: JsonParser, + node: MultiGetJsonObjectEvaluator.PathTrieNode, + values: Array[Any], + matched: Array[Boolean]): Boolean = { + var valid = true + var token = parser.nextToken() + while (valid && token != null && token != JsonToken.END_OBJECT) { + if (token == JsonToken.FIELD_NAME) { + val child = node.children.get(parser.currentName).filter(_.hasUnmatched(matched)) + val valueToken = parser.nextToken() + if (child.nonEmpty && valueToken != JsonToken.VALUE_NULL) { + valid = extractValue(parser, child.get, values, matched) + } else { + parser.skipChildren() + } + } else { + parser.skipChildren() + } + if (valid) { + token = parser.nextToken() + } + } + valid && token == JsonToken.END_OBJECT + } + + private def extractValue( + parser: JsonParser, + node: MultiGetJsonObjectEvaluator.PathTrieNode, + values: Array[Any], + matched: Array[Boolean]): Boolean = { + if (node.terminalOrdinals.nonEmpty) { + node.terminalOrdinals.foreach { ordinal => matched(ordinal) = true } + val value = copyCurrentStructure(parser) + value.foreach { result => + node.terminalOrdinals.foreach { ordinal => values(ordinal) = result } + } + true + } else if (parser.currentToken == JsonToken.START_OBJECT) { + extractObject(parser, node, values, matched) + } else { + parser.skipChildren() + true + } + } + private def copyCurrentStructure(parser: JsonParser): Option[UTF8String] = { outputBuffer.reset() var renderingFailed = false @@ -726,3 +791,45 @@ case class MultiGetJsonObjectEvaluator( if (renderingFailed) None else Some(UTF8String.fromBytes(outputBuffer.toByteArray)) } } + +object MultiGetJsonObjectEvaluator { + private final class MutablePathTrieNode { + val terminalOrdinals: scala.collection.mutable.ArrayBuffer[Int] = + scala.collection.mutable.ArrayBuffer.empty + val children: scala.collection.mutable.LinkedHashMap[String, MutablePathTrieNode] = + scala.collection.mutable.LinkedHashMap.empty + + def freeze(): PathTrieNode = { + require( + terminalOrdinals.isEmpty || children.isEmpty, + "Shared JSON paths must not be prefixes of one another") + val frozenChildren = children.iterator.map { case (name, child) => + name -> child.freeze() + }.toMap + val ordinals = (terminalOrdinals.iterator ++ + frozenChildren.valuesIterator.flatMap(_.descendantOrdinals.iterator)).toArray + PathTrieNode(terminalOrdinals.toArray, frozenChildren, ordinals) + } + } + + private case class PathTrieNode( + terminalOrdinals: Array[Int], + children: Map[String, PathTrieNode], + descendantOrdinals: Array[Int]) { + def hasUnmatched(matched: Array[Boolean]): Boolean = { + descendantOrdinals.exists(index => !matched(index)) + } + } + + private def buildPathTrie(paths: Seq[Seq[String]]): PathTrieNode = { + val root = new MutablePathTrieNode + paths.zipWithIndex.foreach { case (path, ordinal) => + var node = root + path.foreach { fieldName => + node = node.children.getOrElseUpdate(fieldName, new MutablePathTrieNode) + } + node.terminalOrdinals += ordinal + } + root.freeze() + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 76e94c3f5975c..864428d6a261a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -142,10 +142,16 @@ case class GetJsonObject(json: Expression, path: Expression) } object GetJsonObject { - private[sql] def simpleTopLevelField(path: UTF8String): Option[String] = { + import PathInstruction._ + + private[sql] def simpleNamedPath(path: UTF8String): Option[Seq[String]] = { try { - Option(path).flatMap(value => JsonPathParser.parse(value.toString)).collect { - case List(PathInstruction.Key, PathInstruction.Named(fieldName)) => fieldName + Option(path).flatMap(value => JsonPathParser.parse(value.toString)).flatMap { instructions => + val names = instructions.grouped(2).map { + case List(Key, Named(fieldName)) => Some(fieldName) + case _ => None + }.toSeq + if (names.nonEmpty && names.forall(_.isDefined)) Some(names.flatten) else None } } catch { // Numeric subscripts are parsed as Long and can overflow before the parser returns None. @@ -155,9 +161,9 @@ object GetJsonObject { } /** - * Extracts multiple simple top-level fields from a JSON string in one parse. This is an internal - * expression used to share sibling [[GetJsonObject]] expressions; unsupported JSON paths remain - * as independent GetJsonObject expressions. + * Extracts multiple simple named paths from a JSON string in one parse. This is an internal + * expression used to share sibling [[GetJsonObject]] expressions; unsupported and + * prefix-conflicting JSON paths remain as independent GetJsonObject expressions. */ case class MultiGetJsonObject( json: Expression, @@ -168,7 +174,6 @@ case class MultiGetJsonObject( require( fieldNames.nonEmpty && - fieldNames.distinct.length == fieldNames.length && fallbackPaths.length == fieldNames.length) override def child: Expression = json @@ -189,10 +194,18 @@ case class MultiGetJsonObject( final override val nodePatterns: Seq[TreePattern] = Seq(GET_JSON_OBJECT) + @transient + private lazy val namedPaths = fallbackPaths.map { path => + GetJsonObject.simpleNamedPath(UTF8String.fromString(path)).getOrElse { + throw new IllegalArgumentException(s"Unsupported shared JSON path: $path") + } + } + @transient private lazy val evaluator = MultiGetJsonObjectEvaluator( fieldNames, - fallbackPaths.map(UTF8String.fromString)) + fallbackPaths.map(UTF8String.fromString), + namedPaths) override def eval(input: InternalRow): Any = { evaluator.evaluate(json.eval(input).asInstanceOf[UTF8String]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala index 29d4073916f56..162f377a77db9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala @@ -46,9 +46,78 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] { private case class SharedJsonFields( json: Expression, - fieldNames: Seq[String], + paths: Seq[Seq[String]], alias: Alias) { - val ordinalMapping: Map[String, Int] = fieldNames.zipWithIndex.toMap + val ordinalMapping: Map[Seq[String], Int] = paths.zipWithIndex.toMap + } + + private final class SelectedJsonPathTrieNode { + var isTerminal: Boolean = false + var hasSelectedPathInSubtree: Boolean = false + val children: mutable.HashMap[String, SelectedJsonPathTrieNode] = mutable.HashMap.empty + } + + private final class SelectedJsonPathGroup { + val root = new SelectedJsonPathTrieNode + val paths = mutable.ArrayBuffer.empty[(Seq[String], String)] + + def tryAdd(candidate: (Seq[String], String)): Boolean = { + val added = addIfNoPrefixConflict(root, candidate._1) + if (added) { + paths += candidate + } + added + } + } + + // Keep the recursive shared-path traversal comfortably below executor stack limits. Deeper + // paths retain their existing independent GetJsonObject evaluation. + private val maxSharedJsonPathDepth = 64 + + /** Inserts a path unless a previously selected path is its ancestor or descendant. */ + private def addIfNoPrefixConflict( + root: SelectedJsonPathTrieNode, + path: Seq[String]): Boolean = { + var node = root + val visited = mutable.ArrayBuffer(root) + var index = 0 + var hasSelectedPrefix = false + while (index < path.length && !hasSelectedPrefix) { + if (node.isTerminal) { + hasSelectedPrefix = true + } else { + node = node.children.getOrElseUpdate(path(index), new SelectedJsonPathTrieNode) + visited += node + index += 1 + } + } + + val conflicts = hasSelectedPrefix || node.hasSelectedPathInSubtree + if (!conflicts) { + node.isTerminal = true + visited.foreach(_.hasSelectedPathInSubtree = true) + } + !conflicts + } + + // First-fit produces the same groups as repeated greedy optimizer passes, but does so in one + // invocation so parallel prefix chains do not consume one fixed-point iteration per depth. + private def groupNonConflictingPaths( + paths: Iterable[(Seq[String], String)]): Seq[Seq[(Seq[String], String)]] = { + val groups = mutable.ArrayBuffer.empty[SelectedJsonPathGroup] + paths.foreach { candidate => + var added = false + val iterator = groups.iterator + while (!added && iterator.hasNext) { + added = iterator.next().tryAdd(candidate) + } + if (!added) { + val group = new SelectedJsonPathGroup + require(group.tryAdd(candidate)) + groups += group + } + } + groups.map(_.paths.toSeq).toSeq } private def evaluatesLeftFirst(binary: BinaryArithmetic): Boolean = binary match { @@ -85,42 +154,48 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] { } /** - * Share simple top-level GetJsonObject paths without changing the Hive-compatible semantics of - * nested paths, wildcards, or array subscripts. [[MultiGetJsonObject]] preserves the first - * non-null duplicate-key match used by GetJsonObject, unlike JsonTuple. + * Share simple named GetJsonObject paths without changing the Hive-compatible semantics of + * wildcards or array subscripts. [[MultiGetJsonObject]] preserves the first non-null + * duplicate-key match used by GetJsonObject, unlike JsonTuple. Prefix-conflicting paths are + * placed in separate shared parses so each path retains independent legacy evaluation. */ private def shareGetJsonObjects(project: Project): Project = { val candidates = project.projectList.flatMap(collectGetJsonObjectFields) val groups = mutable.ArrayBuffer.empty[ - (Expression, mutable.ArrayBuffer[(String, String)])] + (Expression, mutable.ArrayBuffer[(Seq[String], String)])] val groupsByHash = mutable.HashMap.empty[ - Int, mutable.ArrayBuffer[(Expression, mutable.ArrayBuffer[(String, String)])]] + Int, mutable.ArrayBuffer[(Expression, mutable.ArrayBuffer[(Seq[String], String)])]] - candidates.foreach { case (getJsonObject, fieldName, path) => + candidates.foreach { case (getJsonObject, pathSegments, path) => val bucket = groupsByHash.getOrElseUpdate( getJsonObject.json.semanticHash(), mutable.ArrayBuffer.empty) bucket.find(_._1.semanticEquals(getJsonObject.json)) match { - case Some((_, fields)) => fields += fieldName -> path + case Some((_, fields)) => fields += pathSegments -> path case None => - val group = getJsonObject.json -> mutable.ArrayBuffer(fieldName -> path) + val group = getJsonObject.json -> mutable.ArrayBuffer(pathSegments -> path) bucket += group groups += group } } val sharedFields = groups.flatMap { case (json, requestedFields) => - val fieldsByName = mutable.LinkedHashMap.empty[String, String] - requestedFields.foreach { case (fieldName, path) => - fieldsByName.getOrElseUpdate(fieldName, path) + val paths = mutable.LinkedHashMap.empty[Seq[String], String] + requestedFields.foreach { case (pathSegments, path) => + paths.getOrElseUpdate(pathSegments, path) } - val fieldNames = fieldsByName.keys.toSeq - if (fieldNames.length > 1) { - val alias = Alias( - MultiGetJsonObject(json, fieldNames, fieldsByName.values.toSeq), - "_shared_json_paths")() - Some(SharedJsonFields(json, fieldNames, alias)) - } else { - None + groupNonConflictingPaths(paths).flatMap { nonConflictingPaths => + if (nonConflictingPaths.length > 1) { + val pathSegments = nonConflictingPaths.map(_._1) + val alias = Alias( + MultiGetJsonObject( + json, + pathSegments.map(_.last), + nonConflictingPaths.map(_._2)), + "_shared_json_paths")() + Some(SharedJsonFields(json, pathSegments, alias)) + } else { + None + } } }.toSeq @@ -138,12 +213,13 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] { } private def collectGetJsonObjectFields( - expression: Expression): Seq[(GetJsonObject, String, String)] = { + expression: Expression): Seq[(GetJsonObject, Seq[String], String)] = { expression match { case getJsonObject @ GetJsonObject(_: Attribute, Literal(path: UTF8String, StringType)) if getJsonObject.deterministic => - GetJsonObject.simpleTopLevelField(path) - .map(fieldName => (getJsonObject, fieldName, path.toString)).toSeq + GetJsonObject.simpleNamedPath(path) + .filter(_.length <= maxSharedJsonPathDepth) + .map(pathSegments => (getJsonObject, pathSegments, path.toString)).toSeq case _: GetJsonObject => Nil @@ -159,11 +235,11 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] { expression match { case getJsonObject @ GetJsonObject(json, Literal(path: UTF8String, StringType)) => val replacement = for { - fieldName <- GetJsonObject.simpleTopLevelField(path) + pathSegments <- GetJsonObject.simpleNamedPath(path) shared <- sharedFieldsByHash.getOrElse(json.semanticHash(), Nil).find { candidate => - candidate.json.semanticEquals(json) && candidate.ordinalMapping.contains(fieldName) + candidate.json.semanticEquals(json) && candidate.ordinalMapping.contains(pathSegments) } - } yield GetStructField(shared.alias.toAttribute, shared.ordinalMapping(fieldName)) + } yield GetStructField(shared.alias.toAttribute, shared.ordinalMapping(pathSegments)) replacement.getOrElse(getJsonObject) case _: GetJsonObject => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index db74d0378fc19..5e8e7c18f5e22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3869,7 +3869,7 @@ object SQLConf { buildConf("spark.sql.optimizer.getJsonObjectSharedParsing.enabled") .internal() .doc(s"When true and '${JSON_EXPRESSION_OPTIMIZATION.key}' is also true, the optimizer " + - "replaces repeated simple top-level get_json_object expressions over the same input " + + "replaces repeated simple named get_json_object paths over the same input " + "with one shared parse.") .version("4.3.0") .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala index 85d621d2bc798..914db4f9f8e1a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.DateTimeUtils.getZoneId import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { @@ -243,12 +244,68 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { comparePlans(Optimizer.execute(query.analyze), query.analyze) } - test("SPARK-47670: do not combine nested get_json_object paths") { - val nested = GetJsonObject($"json", Literal("$.a.b")) + test("SPARK-57626: share simple nested get_json_object paths") { val query = testRelation2.select( + GetJsonObject($"json", Literal("$.a.b")).as("b"), + GetJsonObject($"json", Literal("$['a']['c.d']")).as("c"), + GetJsonObject($"json", Literal("$.e")).as("e"), + GetJsonObject($"json", Literal("$.f.b")).as("other_b")) + val optimized = Optimizer.execute(query.analyze) + + optimized match { + case Project(projectList, Project(innerProjectList, _: LocalRelation)) => + val sharedAlias = innerProjectList.collectFirst { + case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias + }.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) + val shared = sharedAlias.child.asInstanceOf[MultiGetJsonObject] + assert(shared.fieldNames == Seq("b", "c.d", "e", "b")) + assert(shared.fallbackPaths == Seq("$.a.b", "$['a']['c.d']", "$.e", "$.f.b")) + + val sharedAttr = sharedAlias.toAttribute + val extractedFields = projectList.flatMap(_.collect { + case getStructField: GetStructField + if getStructField.child.semanticEquals(sharedAttr) => getStructField + }) + assert(extractedFields.map(_.ordinal) == Seq(0, 1, 2, 3)) + + case _ => + fail(s"Expected shared nested JSON paths below the project, but found:\n$optimized") + } + } + + test("SPARK-57626: leave prefix-conflicting and unsupported paths independent") { + val deepPath = (1 to 65).map(index => s"field$index").mkString("$.", ".", "") + val legacyPaths = Seq("$.a.b", "$.items[0].id", "$.a.*", deepPath) + val query = testRelation2.select( + GetJsonObject($"json", Literal("$.a")).as("a"), + GetJsonObject($"json", Literal(legacyPaths(0))).as("nested"), + GetJsonObject($"json", Literal("$.c.d")).as("d"), + GetJsonObject($"json", Literal(legacyPaths(1))).as("array"), + GetJsonObject($"json", Literal(legacyPaths(2))).as("wildcard"), + GetJsonObject($"json", Literal(legacyPaths(3))).as("deep"), + GetJsonObject($"json", Literal("$.e")).as("e")) + val optimized = Optimizer.execute(query.analyze) + + val shared = optimized.collect { + case Project(projectList, _) => projectList.collectFirst { + case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias + } + }.flatten.headOption.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) + .child.asInstanceOf[MultiGetJsonObject] + assert(shared.fallbackPaths == Seq("$.a", "$.c.d", "$.e")) + + val remainingPaths = optimized.expressions.flatMap(_.collect { + case GetJsonObject(_, Literal(path: UTF8String, StringType)) => path.toString + }) + assert(legacyPaths.forall(remainingPaths.contains)) + } + + test("SPARK-57626: keep a later prefix-conflicting path independent") { + val query = testRelation2.select( + GetJsonObject($"json", Literal("$.a.b")).as("b"), GetJsonObject($"json", Literal("$.a")).as("a"), - nested.as("nested"), - GetJsonObject($"json", Literal("$.c")).as("c")) + GetJsonObject($"json", Literal("$.a.c")).as("c"), + GetJsonObject($"json", Literal("$.d")).as("d")) val optimized = Optimizer.execute(query.analyze) val shared = optimized.collect { @@ -256,13 +313,60 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias } }.flatten.headOption.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) - assert(shared.child.asInstanceOf[MultiGetJsonObject].fieldNames == Seq("a", "c")) + .child.asInstanceOf[MultiGetJsonObject] + assert(shared.fallbackPaths == Seq("$.a.b", "$.a.c", "$.d")) assert(optimized.expressions.exists(_.exists { - case GetJsonObject(_, Literal(path, StringType)) => path.toString == "$.a.b" + case GetJsonObject(_, Literal(path: UTF8String, StringType)) => path.toString == "$.a" case _ => false })) } + test("SPARK-57626: share parallel prefix chains in one optimizer invocation") { + def prefixChain(root: String): Seq[String] = (1 to 9).map { depth => + (1 to depth).map(index => s"$root$index").mkString("$.", ".", "") + } + + val leftPaths = prefixChain("a") + val rightPaths = prefixChain("x") + val query = testRelation2.select((leftPaths ++ rightPaths).zipWithIndex.map { + case (path, index) => GetJsonObject($"json", Literal(path)).as(s"field_$index") + }: _*) + + val expectedSharedPaths = leftPaths.zip(rightPaths).map { + case (left, right) => Seq(left, right) + } + def assertSharedPaths(optimized: LogicalPlan): Unit = { + val sharedPaths = optimized.collect { + case Project(projectList, _) => projectList.collect { + case Alias(shared: MultiGetJsonObject, "_shared_json_paths") => shared.fallbackPaths + } + }.flatten + assert(sharedPaths == expectedSharedPaths) + } + + assertSharedPaths(OptimizeCsvJsonExprs(query.analyze)) + assertSharedPaths(Optimizer.execute(query.analyze)) + assertSharedPaths(OptimizerWithCollapseProject.execute(query.analyze)) + } + + test("SPARK-57626: share a wide set of simple paths") { + val pathCount = 2000 + val query = testRelation2.select((0 until pathCount).map { index => + GetJsonObject($"json", Literal(s"$$.field_$index")).as(s"field_$index") + }: _*) + val optimized = Optimizer.execute(query.analyze) + + val shared = optimized.collect { + case Project(projectList, _) => projectList.collectFirst { + case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias + } + }.flatten.headOption.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) + .child.asInstanceOf[MultiGetJsonObject] + assert(shared.fallbackPaths.length == pathCount) + assert(shared.fallbackPaths.head == "$.field_0") + assert(shared.fallbackPaths.last == s"$$.field_${pathCount - 1}") + } + test("SPARK-47670: shared get_json_object paths survive project collapsing") { val query = testRelation2.select( GetJsonObject($"json", Literal("$.a")).as("a"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index a5f5a9a0e9171..f42442cacf763 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -120,6 +120,76 @@ class JsonFunctionsSuite extends SharedSparkSession { assert(result(jsonOptimization = true, sharedParsing = true) == legacy) } + test("SPARK-57626: share simple nested get_json_object paths") { + val malformed = """{"a":{"b":1,"c":"\q"},"d":2}""" + val input = Seq[String]( + """{"a":{"b":1,"c":"x"},"a.b":{"c.d":"dot"},"d":2}""", + """{"a":{"b":"first"},"a":{"b":"second","c":"later"},"d":3}""", + """{"a":null,"a":{"b":"after-null","c":null,"c":"after-c-null"},""" + + """"a.b":{"c.d":4},"d":5}""", + """{"a":"not-object","a":{"b":{"nested":1},"c":[1,2]},""" + + """"a.b":{"c.d":"dot"},"d":null,"d":6}""", + malformed, + """{'a':{'b':'single','c':7},'a.b':{'c.d':8},'d':9}""", + """[1,2,3]""", + """{}""", + null) + + def result(jsonOptimization: Boolean, sharedParsing: Boolean): Seq[Row] = { + var rows = Seq.empty[Row] + withSQLConf( + SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> jsonOptimization.toString, + SQLConf.GET_JSON_OBJECT_SHARED_PARSING_ENABLED.key -> sharedParsing.toString) { + val query = input.toDF("json").select( + get_json_object($"json", "$.a.b"), + get_json_object($"json", "$['a']['c']"), + get_json_object($"json", "$['a.b']['c.d']"), + get_json_object($"json", "$.d")) + if (jsonOptimization && sharedParsing) { + assert(query.queryExecution.optimizedPlan.exists { plan => + plan.expressions.exists(_.exists(_.isInstanceOf[MultiGetJsonObject])) + }) + } + rows = query.collect().toSeq + } + rows + } + + val legacy = result(jsonOptimization = false, sharedParsing = false) + assert(result(jsonOptimization = true, sharedParsing = false) == legacy) + assert(result(jsonOptimization = true, sharedParsing = true) == legacy) + assert(legacy.take(6) == Seq( + Row("1", "x", "dot", "2"), + Row("first", "later", null, "3"), + Row("after-null", "after-c-null", "4", "5"), + Row("{\"nested\":1}", "[1,2]", "dot", "6"), + Row(null, null, null, null), + Row("single", "7", "8", "9"))) + } + + test("SPARK-57626: shared nested get_json_object isolates value rendering failures") { + val invalidSurrogate = "\\" + "uD800" + val input = Seq( + s"""{"a":{"b":"before","c":"$invalidSurrogate","d":"after"},"z":"root"}""") + + def result(jsonOptimization: Boolean): Seq[Row] = { + var rows = Seq.empty[Row] + withSQLConf( + SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> jsonOptimization.toString, + SQLConf.GET_JSON_OBJECT_SHARED_PARSING_ENABLED.key -> "true") { + rows = input.toDF("json").select( + get_json_object($"json", "$.a.b"), + get_json_object($"json", "$.a.c"), + get_json_object($"json", "$.a.d"), + get_json_object($"json", "$.z")).collect().toSeq + } + rows + } + + assert(result(jsonOptimization = true) == result(jsonOptimization = false)) + assert(result(jsonOptimization = true) == Seq(Row("before", null, "after", "root"))) + } + test("SPARK-47670: shared get_json_object isolates value rendering failures") { val invalidSurrogate = "\\" + "uD800" val input = Seq( @@ -233,11 +303,11 @@ class JsonFunctionsSuite extends SharedSparkSession { } } - test("SPARK-47670: shared get_json_object supports project code generation") { + test("SPARK-57626: shared nested get_json_object supports project code generation") { withSQLConf(SQLConf.GET_JSON_OBJECT_SHARED_PARSING_ENABLED.key -> "true") { - val df = Seq("""{"a":1,"b":2}""").toDF("json").select( - get_json_object($"json", "$.a"), - get_json_object($"json", "$.b")) + val df = Seq("""{"a":{"x":1,"y":2}}""").toDF("json").select( + get_json_object($"json", "$.a.x"), + get_json_object($"json", "$.a.y")) checkAnswer(df, Row("1", "2")) def containsSharedExtraction(plan: SparkPlan): Boolean = plan match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/SharedJsonParseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/SharedJsonParseBenchmark.scala index 7d2f74b43a0c8..48aa5842858b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/SharedJsonParseBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/SharedJsonParseBenchmark.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType /** - * Benchmarks sharing repeated simple top-level get_json_object parsing. + * Benchmarks sharing repeated simple get_json_object parsing. * * To run this benchmark: * {{{ @@ -75,6 +75,40 @@ object SharedJsonParseBenchmark extends SqlBasedBenchmark { } data.unpersist() + + val nestedData = spark.range(0, rows, 1, 4) + .select(to_json(struct(struct(Seq.tabulate(fieldCount) { index => + fieldValue.as(s"field_$index") + }: _*).as("payload"))).as("json")) + .cache() + nestedData.count() + + Seq(2, 4, 8, 16).foreach { selectedFieldCount => + val pathBenchmark = new Benchmark( + s"get_json_object extracting $selectedFieldCount of $fieldCount nested fields", + rows, + output = output) + + def extractPaths(sharedParsing: Boolean): Unit = { + withSQLConf( + SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> "true", + SQLConf.GET_JSON_OBJECT_SHARED_PARSING_ENABLED.key -> sharedParsing.toString) { + nestedData.select(Seq.tabulate(selectedFieldCount) { index => + get_json_object($"json", s"$$.payload.field_$index") + }: _*).noop() + } + } + + pathBenchmark.addCase("shared parsing off", 3) { _ => + extractPaths(sharedParsing = false) + } + pathBenchmark.addCase("shared parsing on", 3) { _ => + extractPaths(sharedParsing = true) + } + pathBenchmark.run() + } + + nestedData.unpersist() } } } From bb0fff01e3edd572893df6f428c221901364c751 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 23 Jun 2026 08:08:54 -0700 Subject: [PATCH 2/7] [SPARK-57626][SQL] Remove redundant shared JSON field names --- .../expressions/json/JsonExpressionEvalUtils.scala | 12 +++++------- .../sql/catalyst/expressions/jsonExpressions.scala | 8 ++------ .../catalyst/optimizer/OptimizeCsvJsonExprs.scala | 1 - .../catalyst/optimizer/OptimizeJsonExprsSuite.scala | 2 -- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 5 files changed, 8 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index e02497b922b46..a24541dfcd011 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -578,15 +578,13 @@ case class GetJsonObjectEvaluator(cachedPath: UTF8String) { * Evaluates multiple simple named JSON paths in one parse. */ case class MultiGetJsonObjectEvaluator( - fieldNames: Seq[String], fallbackPaths: Seq[UTF8String], namedPaths: Seq[Seq[String]]) { import SharedFactory._ require( - fieldNames.nonEmpty && - fallbackPaths.length == fieldNames.length && - namedPaths.length == fieldNames.length) + fallbackPaths.nonEmpty && + namedPaths.length == fallbackPaths.length) @transient private lazy val useTopLevelFastPath: Boolean = @@ -602,7 +600,7 @@ case class MultiGetJsonObjectEvaluator( @transient private lazy val nullRow: InternalRow = - new GenericInternalRow(Array.ofDim[Any](fieldNames.length)) + new GenericInternalRow(Array.ofDim[Any](fallbackPaths.length)) @transient private lazy val fallbackEvaluators: Seq[GetJsonObjectEvaluator] = @@ -621,8 +619,8 @@ case class MultiGetJsonObjectEvaluator( def evaluate(json: UTF8String): InternalRow = { if (json == null) return null - val values = Array.ofDim[Any](fieldNames.length) - val matched = Array.ofDim[Boolean](fieldNames.length) + val values = Array.ofDim[Any](fallbackPaths.length) + val matched = Array.ofDim[Boolean](fallbackPaths.length) try { val validObject = Utils.tryWithResource( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 864428d6a261a..7aff36c21c55b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -167,21 +167,18 @@ object GetJsonObject { */ case class MultiGetJsonObject( json: Expression, - fieldNames: Seq[String], fallbackPaths: Seq[String]) extends UnaryExpression with ExpectsInputTypes { - require( - fieldNames.nonEmpty && - fallbackPaths.length == fieldNames.length) + require(fallbackPaths.nonEmpty) override def child: Expression = json override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeWithCollation(supportsTrimCollation = true)) - override lazy val dataType: DataType = StructType(fieldNames.indices.map { index => + override lazy val dataType: DataType = StructType(fallbackPaths.indices.map { index => StructField(s"_$index", StringType, nullable = true) }) @@ -203,7 +200,6 @@ case class MultiGetJsonObject( @transient private lazy val evaluator = MultiGetJsonObjectEvaluator( - fieldNames, fallbackPaths.map(UTF8String.fromString), namedPaths) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala index 162f377a77db9..a95c6807bc2ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala @@ -189,7 +189,6 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] { val alias = Alias( MultiGetJsonObject( json, - pathSegments.map(_.last), nonConflictingPaths.map(_._2)), "_shared_json_paths")() Some(SharedJsonFields(json, pathSegments, alias)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala index 914db4f9f8e1a..cb551b772ef31 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala @@ -210,7 +210,6 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias }.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) val shared = sharedAlias.child.asInstanceOf[MultiGetJsonObject] - assert(shared.fieldNames == Seq("b", "a")) assert(shared.fallbackPaths == Seq("$.b", "$['a']")) val sharedAttr = sharedAlias.toAttribute @@ -258,7 +257,6 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { case alias @ Alias(_: MultiGetJsonObject, "_shared_json_paths") => alias }.getOrElse(fail(s"Missing shared JSON paths in plan:\n$optimized")) val shared = sharedAlias.child.asInstanceOf[MultiGetJsonObject] - assert(shared.fieldNames == Seq("b", "c.d", "e", "b")) assert(shared.fallbackPaths == Seq("$.a.b", "$['a']['c.d']", "$.e", "$.f.b")) val sharedAttr = sharedAlias.toAttribute diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index f42442cacf763..20deafb8b386b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -275,7 +275,7 @@ class JsonFunctionsSuite extends SharedSparkSession { val depth = 999 val nested = "[" * depth + "1" + "]" * depth val expression = MultiGetJsonObject( - Literal(s"""{"a":$nested,"b":2}"""), Seq("a", "b"), Seq("$.a", "$.b")) + Literal(s"""{"a":$nested,"b":2}"""), Seq("$.a", "$.b")) val result = Array.ofDim[Any](1) val thread = new Thread( null, From 8c97ee4d0faa6ab3e77ccb57183bf3a340d96cac Mon Sep 17 00:00:00 2001 From: sunchao Date: Tue, 23 Jun 2026 15:25:50 +0000 Subject: [PATCH 3/7] Benchmark results for org.apache.spark.sql.execution.datasources.json.SharedJsonParseBenchmark (JDK 21, Scala 2.13, split 1 of 1) --- ...SharedJsonParseBenchmark-jdk21-results.txt | 44 +++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/sql/core/benchmarks/SharedJsonParseBenchmark-jdk21-results.txt b/sql/core/benchmarks/SharedJsonParseBenchmark-jdk21-results.txt index dcda87f622937..f4e2d3112fbbd 100644 --- a/sql/core/benchmarks/SharedJsonParseBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/SharedJsonParseBenchmark-jdk21-results.txt @@ -6,28 +6,56 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 2 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 1686 1712 25 0.1 8429.2 1.0X -shared parsing on 979 1004 23 0.2 4892.7 1.7X +shared parsing off 1625 1667 46 0.1 8125.0 1.0X +shared parsing on 910 933 20 0.2 4552.2 1.8X OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 4 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 3273 3279 6 0.1 16365.1 1.0X -shared parsing on 1242 1250 14 0.2 6209.0 2.6X +shared parsing off 3154 3178 21 0.1 15770.7 1.0X +shared parsing on 1130 1178 45 0.2 5649.4 2.8X OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 8 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 6449 6465 16 0.0 32247.0 1.0X -shared parsing on 1317 1331 12 0.2 6586.7 4.9X +shared parsing off 6067 6175 93 0.0 30333.8 1.0X +shared parsing on 1234 1265 27 0.2 6170.1 4.9X OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 16 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -shared parsing off 13118 13134 15 0.0 65592.1 1.0X -shared parsing on 1823 1830 9 0.1 9113.0 7.2X +shared parsing off 11938 11946 13 0.0 59690.5 1.0X +shared parsing on 1699 1709 15 0.1 8493.5 7.0X + +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 2 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 1600 1607 10 0.1 8001.8 1.0X +shared parsing on 873 900 25 0.2 4367.2 1.8X + +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 4 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 3120 3156 35 0.1 15600.5 1.0X +shared parsing on 1102 1115 18 0.2 5509.8 2.8X + +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 8 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 6362 6408 46 0.0 31809.2 1.0X +shared parsing on 1347 1367 19 0.1 6733.2 4.7X + +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 16 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 12772 12831 54 0.0 63861.1 1.0X +shared parsing on 1852 1868 15 0.1 9260.4 6.9X From 6cff419d7fba965ca162055961d9e0f2a745c3a6 Mon Sep 17 00:00:00 2001 From: sunchao Date: Tue, 23 Jun 2026 15:26:00 +0000 Subject: [PATCH 4/7] Benchmark results for org.apache.spark.sql.execution.datasources.json.SharedJsonParseBenchmark (JDK 25, Scala 2.13, split 1 of 1) --- ...SharedJsonParseBenchmark-jdk25-results.txt | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/sql/core/benchmarks/SharedJsonParseBenchmark-jdk25-results.txt b/sql/core/benchmarks/SharedJsonParseBenchmark-jdk25-results.txt index b09050a476f3b..5ab0fd777a7f0 100644 --- a/sql/core/benchmarks/SharedJsonParseBenchmark-jdk25-results.txt +++ b/sql/core/benchmarks/SharedJsonParseBenchmark-jdk25-results.txt @@ -3,31 +3,59 @@ Benchmark for sharing repeated get_json_object parsing ================================================================================================ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure -AMD EPYC 7763 64-Core Processor +AMD EPYC 9V74 80-Core Processor get_json_object extracting 2 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 1663 1680 26 0.1 8315.1 1.0X -shared parsing on 964 980 14 0.2 4818.8 1.7X +shared parsing off 1567 1580 18 0.1 7835.8 1.0X +shared parsing on 927 933 6 0.2 4634.1 1.7X OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure -AMD EPYC 7763 64-Core Processor +AMD EPYC 9V74 80-Core Processor get_json_object extracting 4 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 3200 3215 14 0.1 15998.6 1.0X -shared parsing on 1096 1111 14 0.2 5478.2 2.9X +shared parsing off 3002 3015 14 0.1 15008.5 1.0X +shared parsing on 1187 1218 28 0.2 5933.7 2.5X OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure -AMD EPYC 7763 64-Core Processor +AMD EPYC 9V74 80-Core Processor get_json_object extracting 8 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 6233 6248 13 0.0 31163.9 1.0X -shared parsing on 1304 1319 14 0.2 6519.4 4.8X +shared parsing off 6027 6030 2 0.0 30136.0 1.0X +shared parsing on 1290 1305 16 0.2 6449.2 4.7X OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure -AMD EPYC 7763 64-Core Processor +AMD EPYC 9V74 80-Core Processor get_json_object extracting 16 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -shared parsing off 12453 12474 19 0.0 62264.1 1.0X -shared parsing on 1776 1785 9 0.1 8879.8 7.0X +shared parsing off 12003 12020 25 0.0 60015.7 1.0X +shared parsing on 1724 1728 5 0.1 8620.3 7.0X + +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure +AMD EPYC 9V74 80-Core Processor +get_json_object extracting 2 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 1620 1637 16 0.1 8098.1 1.0X +shared parsing on 966 968 2 0.2 4831.3 1.7X + +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure +AMD EPYC 9V74 80-Core Processor +get_json_object extracting 4 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 3107 3136 28 0.1 15532.8 1.0X +shared parsing on 1185 1193 7 0.2 5925.4 2.6X + +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure +AMD EPYC 9V74 80-Core Processor +get_json_object extracting 8 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 6162 6178 21 0.0 30809.9 1.0X +shared parsing on 1405 1421 14 0.1 7025.9 4.4X + +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure +AMD EPYC 9V74 80-Core Processor +get_json_object extracting 16 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 12341 12350 12 0.0 61704.0 1.0X +shared parsing on 1834 1846 10 0.1 9171.9 6.7X From 152293f5cda1274f55a926737c6dbe4100f20c9a Mon Sep 17 00:00:00 2001 From: sunchao Date: Tue, 23 Jun 2026 15:27:13 +0000 Subject: [PATCH 5/7] Benchmark results for org.apache.spark.sql.execution.datasources.json.SharedJsonParseBenchmark (JDK 17, Scala 2.13, split 1 of 1) --- .../SharedJsonParseBenchmark-results.txt | 44 +++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/sql/core/benchmarks/SharedJsonParseBenchmark-results.txt b/sql/core/benchmarks/SharedJsonParseBenchmark-results.txt index fdee387321846..6c90a4479c3a3 100644 --- a/sql/core/benchmarks/SharedJsonParseBenchmark-results.txt +++ b/sql/core/benchmarks/SharedJsonParseBenchmark-results.txt @@ -6,28 +6,56 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 2 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 1526 1558 35 0.1 7629.3 1.0X -shared parsing on 902 907 6 0.2 4508.0 1.7X +shared parsing off 1577 1587 15 0.1 7883.7 1.0X +shared parsing on 932 946 16 0.2 4661.0 1.7X OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 4 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 2978 2999 25 0.1 14890.0 1.0X -shared parsing on 1027 1029 1 0.2 5136.9 2.9X +shared parsing off 3005 3052 41 0.1 15025.5 1.0X +shared parsing on 1074 1075 2 0.2 5370.1 2.8X OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 8 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -shared parsing off 5871 5889 16 0.0 29355.9 1.0X -shared parsing on 1245 1251 6 0.2 6226.5 4.7X +shared parsing off 6090 6104 12 0.0 30448.1 1.0X +shared parsing on 1315 1321 8 0.2 6575.3 4.6X OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor get_json_object extracting 16 of 32 fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -shared parsing off 11620 11679 54 0.0 58098.9 1.0X -shared parsing on 1704 1707 3 0.1 8519.4 6.8X +shared parsing off 12144 12159 17 0.0 60719.4 1.0X +shared parsing on 1840 1848 7 0.1 9201.7 6.6X + +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 2 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 1498 1509 9 0.1 7492.3 1.0X +shared parsing on 894 895 1 0.2 4471.6 1.7X + +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 4 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 2969 2978 8 0.1 14845.4 1.0X +shared parsing on 1129 1131 4 0.2 5644.8 2.6X + +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 8 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 5841 5847 9 0.0 29206.9 1.0X +shared parsing on 1378 1384 7 0.1 6889.6 4.2X + +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure +AMD EPYC 7763 64-Core Processor +get_json_object extracting 16 of 32 nested fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +shared parsing off 11856 11904 64 0.0 59278.4 1.0X +shared parsing on 1926 1936 12 0.1 9630.4 6.2X From 917b893e784fc74bdc70da2c250c3ed2cd141c07 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 23 Jun 2026 09:14:14 -0700 Subject: [PATCH 6/7] [SPARK-57626][SQL] Clarify shared JSON path invariants --- .../sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala | 2 ++ .../apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 1 + 2 files changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index a24541dfcd011..fc34d8fecee81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -704,6 +704,8 @@ case class MultiGetJsonObjectEvaluator( node: MultiGetJsonObjectEvaluator.PathTrieNode, values: Array[Any], matched: Array[Boolean]): Boolean = { + // Optimizer-generated paths are deduplicated. Multiple ordinals defensively support + // directly constructed internal expressions with duplicate paths. if (node.terminalOrdinals.nonEmpty) { node.terminalOrdinals.foreach { ordinal => matched(ordinal) = true } val value = copyCurrentStructure(parser) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 7aff36c21c55b..b82ed39824f9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -171,6 +171,7 @@ case class MultiGetJsonObject( extends UnaryExpression with ExpectsInputTypes { + // OptimizeCsvJsonExprs caps shared path depth to keep evaluator recursion stack-safe. require(fallbackPaths.nonEmpty) override def child: Expression = json From 6754fbf286fa733384826f8e9752a9de0f12881c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 23 Jun 2026 12:15:27 -0700 Subject: [PATCH 7/7] [SPARK-57626][SQL] Address review nits --- .../expressions/json/JsonExpressionEvalUtils.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index fc34d8fecee81..b5d31af455851 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.json import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter} +import scala.collection.mutable import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ @@ -582,9 +583,7 @@ case class MultiGetJsonObjectEvaluator( namedPaths: Seq[Seq[String]]) { import SharedFactory._ - require( - fallbackPaths.nonEmpty && - namedPaths.length == fallbackPaths.length) + require(fallbackPaths.nonEmpty && namedPaths.length == fallbackPaths.length) @transient private lazy val useTopLevelFastPath: Boolean = @@ -794,10 +793,8 @@ case class MultiGetJsonObjectEvaluator( object MultiGetJsonObjectEvaluator { private final class MutablePathTrieNode { - val terminalOrdinals: scala.collection.mutable.ArrayBuffer[Int] = - scala.collection.mutable.ArrayBuffer.empty - val children: scala.collection.mutable.LinkedHashMap[String, MutablePathTrieNode] = - scala.collection.mutable.LinkedHashMap.empty + val terminalOrdinals: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty + val children: mutable.LinkedHashMap[String, MutablePathTrieNode] = mutable.LinkedHashMap.empty def freeze(): PathTrieNode = { require(