[SPARK-57678][SQL] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays.#56756

Open
chenhao-db wants to merge 3 commits into
apache:master from
chenhao-db:explodeEmbeddedArray

@chenhao-db

Contributor

What changes were proposed in this pull request?

Adds a new JSON reader option explodeEmbeddedArray for reading JSON files that are object documents with a top-level array-valued field, e.g. {..., "Records": [record1, record2, ...], ...}. The option value names the array field to explode; each element of that array becomes one row, stored as a single VARIANT column (like singleVariantColumn, with which it is mutually exclusive). The inferred schema names the variant column after the array field, and a user-specified schema may give the column any name.

The records are split out of the array by a new streaming EmbeddedArraySplitter, and each record is then parsed by the unchanged JSON parsing logic, so neither the whole array text nor the result rows are ever buffered in memory. The scan is whole-file: a new EmbeddedArrayJsonDataSource reports isSplitable = false, which both the V1 JsonFileFormat and the V2 JsonScan already consult, so the option works in both v1 and v2 scans (including compressed files, partitions, and streaming reads). The option is rejected in from_json and json(Dataset[String]), where each input is already a single document.
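The splitting step described above can be illustrated with a minimal sketch. This is not the PR's EmbeddedArraySplitter: the class name ArrayElementSplitter and the helper SplitterDemo are hypothetical, and the sketch assumes the reader is already positioned just after the array's opening bracket. It leaves out locating the named field, BOM handling, and internal buffering. It only shows how elements can be handed out one at a time by tracking nesting depth and string state, so the full array is never held in memory.

```scala
import java.io.{Reader, StringReader}

// Hypothetical, simplified splitter (not the PR's implementation).
// Assumes `reader` is positioned just after the opening '[' of the array.
final class ArrayElementSplitter(reader: Reader) extends Iterator[String] {
  private var pending: Option[String] = None
  private var exhausted = false

  // Reads until one complete element, the end of the array, or EOF is seen.
  private def readNext(): Option[String] = {
    val sb = new StringBuilder
    var depth = 0        // nesting depth inside the current element
    var inString = false // true while inside a JSON string literal
    var result: Option[String] = None
    while (result.isEmpty && !exhausted) {
      val c = reader.read()
      if (c == -1) {
        exhausted = true
      } else {
        val ch = c.toChar
        if (inString) {
          sb.append(ch)
          if (ch == '\\') {
            // Consume the escaped char unconditionally so \" cannot close the string.
            val escaped = reader.read()
            if (escaped == -1) exhausted = true else sb.append(escaped.toChar)
          } else if (ch == '"') {
            inString = false
          }
        } else if (ch == '"') {
          inString = true
          sb.append(ch)
        } else if (ch == '{' || ch == '[') {
          depth += 1
          sb.append(ch)
        } else if (ch == '}' || (ch == ']' && depth > 0)) {
          depth -= 1
          sb.append(ch)
        } else if (ch == ']') {
          exhausted = true // closing bracket of the enclosing array
        } else if (ch == ',' && depth == 0) {
          val text = sb.toString.trim
          sb.clear()
          if (text.nonEmpty) result = Some(text) // stray commas yield nothing
        } else {
          sb.append(ch)
        }
      }
    }
    // At array end or EOF, emit whatever is left; a truncated record is
    // returned as-is so a downstream JSON parser can reject it as corrupt.
    if (result.isEmpty) {
      val text = sb.toString.trim
      if (text.nonEmpty) result = Some(text)
    }
    result
  }

  override def hasNext: Boolean = {
    if (pending.isEmpty && !exhausted) pending = readNext()
    pending.isDefined
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("no more array elements")
    val record = pending.get
    pending = None
    record
  }
}

object SplitterDemo {
  // Convenience wrapper for trying the sketch on an in-memory string.
  def split(afterOpeningBracket: String): List[String] =
    new ArrayElementSplitter(new StringReader(afterOpeningBracket)).toList
}
```

In this sketch each record's text would then be passed to an ordinary JSON parser, which is where malformed or truncated records surface as parse errors.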

It is an intentional design decision that anything outside the array is ignored, because:

  1. Users only care about the records array in common use cases (e.g. AWS CloudTrail).
  2. Preserving the fields outside the array would be difficult to implement while achieving both goals: avoiding buffering the whole array and scanning the file only once.

Why are the changes needed?

Some common JSON formats (e.g. AWS CloudTrail) wrap their records in a single top-level array inside one large object document. Today, ingesting such files requires materializing the whole array, so all elements must be held in memory at the same time, which can easily cause OOM. This option makes that ingestion efficient by streaming the records out one at a time as if they were independent input records, never buffering the array.

Does this PR introduce any user-facing change?

Yes — a new opt-in JSON reader option explodeEmbeddedArray. There is no behavior change unless the option is set. Example:

// file.json: {"Records": [{"a": 1}, {"b": 2}]}
spark.read.format("json").option("explodeEmbeddedArray", "Records").load(path)
// => 2 rows, one VARIANT column named "Records": {"a":1}, {"b":2}

It also adds three new error conditions: EXPLODE_EMBEDDED_ARRAY_CONFLICTING_OPTION, EXPLODE_EMBEDDED_ARRAY_UNSUPPORTED_USAGE, and INVALID_EXPLODE_EMBEDDED_ARRAY_SCHEMA.

How was this patch tested?

New EmbeddedArraySplitterSuite (splitter unit tests, including custom array field names, nesting/escapes, scalars, BOM, truncated input, and inputs larger than the internal buffer) and ExplodeEmbeddedArrayJsonV1Suite/ExplodeEmbeddedArrayJsonV2Suite (end-to-end: schema inference/validation, array field names other than Records, user schemas that rename the variant column, malformed records in all parse modes, truncated/compressed/partitioned/streaming inputs, option conflicts, the from_json/json(Dataset[String]) rejections, and equivalence with reading the records as ndjson with singleVariantColumn). JsonSuite's option-validation test was updated for the new option. All pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

@chenhao-db chenhao-db changed the title Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays. [SPARK-57678] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays. Jun 24, 2026
@chenhao-db

Contributor Author

@cloud-fan could you help review it? Thanks!

@HyukjinKwon HyukjinKwon left a comment

Member

0 blocking, 1 non-blocking, 0 nits.
High-quality, well-tested addition. The hand-written streaming splitter is the correctness-critical surface, and it holds up across the JSON-structure edge cases that matter; the unchanged parser is reused, and validation/error classes are thorough and well-formed.

Correctness (1)

  • EmbeddedArrayJsonDataSource.scala:78: charset is forced to encoding.getOrElse(UTF_8), which may skip the regular reader's charset detection for non-UTF-8 files — see inline

Verification

Traced EmbeddedArraySplitter across the JSON state space: string escapes (consumeString consumes the char after \ unconditionally, so \"/\\ don't falsely close or nest), braces/brackets inside strings ignored for depth, scalars, nested-array records, EOF/partial records (deferred to the parser as corrupt), top-level-only key matching, BOM, stray commas, and records > 64KB (handled across buffer refills). The suite tests exactly these ("}]ignore me[{" inside a string, \"/\\, a >64KB record, escaped keys not matching). Mutual exclusion with singleVariantColumn and schema constraints are validated with dedicated, correctly-ordered error classes.

stream: InputStream,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
val encoding = parser.options.encoding.getOrElse(StandardCharsets.UTF_8.name())

Member

parseStream resolves the charset as parser.options.encoding.getOrElse(UTF_8) and wraps the stream in an InputStreamReader with it. For a non-UTF-8 JSON file where the user didn't set the encoding option, this forces UTF-8 rather than using the charset detection the regular (multiline) JSON reader applies.

Question: is encoding auto-detection intended to be supported with explodeEmbeddedArray? If so, this path would need to mirror it; if not (explicit encoding required for non-UTF-8), a line in the option docs would set expectations. The explicit \uFEFF BOM skip in findEmbeddedArray suggests encoding was considered, hence a question rather than an assertion. Non-blocking.

Contributor Author

It is a bit difficult to support auto-detection here, so I chose to keep the code unchanged and document the limitation explicitly.

The reason it differs from the other readers: every other JSON path hands the raw byte stream to Jackson (createParser(InputStream)), and Jackson's ByteSourceJsonBootstrapper does the BOM + byte-pattern charset detection internally — Spark never sees that logic. EmbeddedArrayJsonDataSource is the exception because the EmbeddedArraySplitter scans characters to find record boundaries, so it needs a decoded Reader and has to choose the charset before Jackson is involved. Mirroring auto-detection would mean reimplementing Jackson's internal detection in Spark, which isn't worth it for this, and Jackson's bootstrapper isn't reusable as stable API.
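A minimal sketch of the consequence, under stated assumptions: CharsetChoiceDemo, openReader, and readAll are hypothetical names, not code from the PR. The sketch only mirrors the `encoding.getOrElse(UTF_8)` choice quoted in the review and shows that, once a Reader has to be created before any parser sees the bytes, a non-UTF-8 file decodes correctly only when the encoding is given explicitly.

```scala
import java.io.{ByteArrayInputStream, InputStreamReader, Reader}
import java.nio.charset.StandardCharsets

object CharsetChoiceDemo {
  // Mirrors the quoted snippet: use the explicit encoding if set, else UTF-8.
  // The charset must be fixed here, before any JSON parser is involved.
  def openReader(bytes: Array[Byte], encoding: Option[String]): Reader = {
    val charsetName = encoding.getOrElse(StandardCharsets.UTF_8.name())
    new InputStreamReader(new ByteArrayInputStream(bytes), charsetName)
  }

  // Drains a Reader into a String, one character at a time.
  def readAll(reader: Reader): String = {
    val sb = new StringBuilder
    var c = reader.read()
    while (c != -1) {
      sb.append(c.toChar)
      c = reader.read()
    }
    sb.toString
  }
}
```

With UTF-16LE bytes, for example, `openReader(bytes, Some("UTF-16LE"))` recovers the original text, while `openReader(bytes, None)` decodes the same bytes as UTF-8 and yields different characters, which is the behavior the option docs would need to call out.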

@chenhao-db chenhao-db requested a review from HyukjinKwon June 26, 2026 00:16

@uros-b uros-b left a comment

Member

@chenhao-db Please mention a component in the PR title.

@chenhao-db chenhao-db changed the title [SPARK-57678] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays. [SPARK-57678][SQL] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays. Jun 28, 2026
@chenhao-db

Contributor Author

@HyukjinKwon could you take another look? Thanks!

@cloud-fan cloud-fan left a comment

Contributor

0 blocking, 1 non-blocking, 0 nits.
Clean, well-structured, thoroughly tested addition. The hand-written EmbeddedArraySplitter (the correctness-critical surface) holds up across the JSON-structure state space and is exercised by an independent-fixture unit suite; the reused parser, the validation helper kept in sync with singleVariantColumn, and v1/v2 parity all check out.

Design / architecture (1)

  • The new user-facing explodeEmbeddedArray option isn't added to docs/sql-data-sources-json.md, whereas its analogue singleVariantColumn is (line 211). Worth documenting it there so the option is discoverable alongside its peers. Non-blocking.

Verification

Traced EmbeddedArraySplitter over the java.io.Reader.read state space across buffer refills and EOF: a normal >0 read advances, -1 (EOF) propagates as the -1 sentinel through every consumer, and a mid-value EOF returns the partial record for the JSON parser to reject as corrupt. String escapes (consumeString consumes the char after \ unconditionally), braces/brackets inside strings, scalars, nested-array records, BOM, stray commas, and records larger than the 64KB buffer are all handled, and the unit suite feeds these via raw hand-written input (an independent producer). The prior encoding-detection question is already resolved in-thread, so not re-raised.
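The refill and EOF behavior traced above can be pictured with a small sketch. RefillingCharSource and RefillDemo are hypothetical names and the code is an assumption about the general shape, not the PR's implementation. It reads from a java.io.Reader through a fixed-size buffer, refills when the buffer is drained, and returns -1 as the end-of-input sentinel on every call after EOF.

```scala
import java.io.{Reader, StringReader}

// Hypothetical buffered character source; the buffer size is a parameter here
// so that refills can be exercised with tiny inputs.
final class RefillingCharSource(reader: Reader, bufferSize: Int) {
  require(bufferSize > 0, "bufferSize must be positive")

  private val buf = new Array[Char](bufferSize)
  private var pos = 0     // index of the next unread char in buf
  private var limit = 0   // number of valid chars currently in buf
  private var eof = false // set once the underlying reader reports -1

  /** Returns the next char as an Int, or -1 once the input is exhausted. */
  def read(): Int = {
    if (pos == limit && !eof) refill()
    if (pos < limit) {
      val c = buf(pos)
      pos += 1
      c.toInt
    } else {
      -1
    }
  }

  private def refill(): Unit = {
    var n = reader.read(buf, 0, buf.length)
    // Defensive: retry if a reader ever reports zero chars for a non-empty request.
    while (n == 0) n = reader.read(buf, 0, buf.length)
    if (n == -1) {
      eof = true
    } else {
      pos = 0
      limit = n
    }
  }
}

object RefillDemo {
  // Drains the source; the input may be longer than the buffer.
  def readAll(text: String, bufferSize: Int): String = {
    val source = new RefillingCharSource(new StringReader(text), bufferSize)
    val sb = new StringBuilder
    var c = source.read()
    while (c != -1) {
      sb.append(c.toChar)
      c = source.read()
    }
    sb.toString
  }
}
```

A consumer built on such a source sees one uniform stream regardless of where the buffer boundaries fall, which is why a record larger than the buffer needs no special handling by the caller.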

@chenhao-db

Contributor Author

@cloud-fan Thanks! The doc is updated.

4 participants