[SPARK-57678][SQL] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays.#56756
[SPARK-57678][SQL] Add explodeEmbeddedArray JSON option for streaming scans of embedded arrays.#56756chenhao-db wants to merge 3 commits into
Conversation
|
@cloud-fan could you help review it? Thanks! |
HyukjinKwon
left a comment
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 0 nits.
High-quality, well-tested addition. The hand-written streaming splitter is the correctness-critical surface, and it holds up across the JSON-structure edge cases that matter; the unchanged parser is reused, and validation/error classes are thorough and well-formed.
Correctness (1)
- EmbeddedArrayJsonDataSource.scala:78: charset is forced to
encoding.getOrElse(UTF_8), which may skip the regular reader's charset detection for non-UTF-8 files — see inline
Verification
Traced EmbeddedArraySplitter across the JSON state space: string escapes (consumeString consumes the char after \ unconditionally, so \"/\\ don't falsely close or nest), braces/brackets inside strings ignored for depth, scalars, nested-array records, EOF/partial records (deferred to the parser as corrupt), top-level-only key matching, BOM, stray commas, and records > 64KB (handled across buffer refills). The suite tests exactly these ("}]ignore me[{" inside a string, \"/\\, a >64KB record, escaped keys not matching). Mutual exclusion with singleVariantColumn and schema constraints are validated with dedicated, correctly-ordered error classes.
| stream: InputStream, | ||
| parser: JacksonParser, | ||
| schema: StructType): Iterator[InternalRow] = { | ||
| val encoding = parser.options.encoding.getOrElse(StandardCharsets.UTF_8.name()) |
There was a problem hiding this comment.
parseStream resolves the charset as parser.options.encoding.getOrElse(UTF_8) and wraps the stream in an InputStreamReader with it. For a non-UTF-8 JSON file where the user didn't set the encoding option, this forces UTF-8 rather than using the charset detection the regular (multiline) JSON reader applies.
Question: is encoding auto-detection intended to be supported with explodeEmbeddedArray? If so, this path would need to mirror it; if not (explicit encoding required for non-UTF-8), a line in the option docs would set expectations. The explicit \uFEFF BOM skip in findEmbeddedArray suggests encoding was considered, hence a question rather than an assertion. Non-blocking.
There was a problem hiding this comment.
It is a bit difficult to support auto-detection here, so I choose to keep code unchanged and explicitly document it.
The reason it differs from the other readers: every other JSON path hands the raw byte stream to Jackson (createParser(InputStream)), and Jackson's ByteSourceJsonBootstrapper does the BOM + byte-pattern charset detection internally — Spark never sees that logic. EmbeddedArrayJsonDataSource is the exception because the EmbeddedArraySplitter scans characters to find record boundaries, so it needs a decoded Reader and has to choose the charset before Jackson is involved. Mirroring auto-detection would mean reimplementing Jackson's internal detection in Spark, which isn't worth it for this, and Jackson's bootstrapper isn't reusable as stable API.
uros-b
left a comment
There was a problem hiding this comment.
@chenhao-db Please mention a component in the PR title.
|
@HyukjinKwon could you take another look? Thanks! |
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 0 nits.
Clean, well-structured, thoroughly tested addition. The hand-written EmbeddedArraySplitter (the correctness-critical surface) holds up across the JSON-structure state space and is exercised by an independent-fixture unit suite; the reused parser, the validation helper kept in sync with singleVariantColumn, and v1/v2 parity all check out.
Design / architecture (1)
- The new user-facing
explodeEmbeddedArrayoption isn't added todocs/sql-data-sources-json.md, whereas its analoguesingleVariantColumnis (line 211). Worth documenting it there so the option is discoverable alongside its peers. Non-blocking.
Verification
Traced EmbeddedArraySplitter over the java.io.Reader.read state space across buffer refills and EOF: a normal >0 read advances, -1 (EOF) propagates as the -1 sentinel through every consumer, and a mid-value EOF returns the partial record for the JSON parser to reject as corrupt. String escapes (consumeString consumes the char after \ unconditionally), braces/brackets inside strings, scalars, nested-array records, BOM, stray commas, and records larger than the 64KB buffer are all handled, and the unit suite feeds these via raw hand-written input (an independent producer). The prior encoding-detection question is already resolved in-thread, so not re-raised.
|
@cloud-fan Thanks! doc is updated. |
What changes were proposed in this pull request?
Adds a new JSON reader option
explodeEmbeddedArrayfor reading JSON files that are object documents with a top-level array-valued field, e.g.{..., "Records": [record1, record2, ...], ...}. The option value names the array field to explode; each element of that array becomes one row, stored as a single VARIANT column (likesingleVariantColumn, with which it is mutually exclusive). The inferred schema names the variant column after the array field, and a user-specified schema may give the column any name.The records are split out of the array by a new streaming
EmbeddedArraySplitter, and each record is then parsed by the unchanged JSON parsing logic, so neither the whole array text nor the result rows are ever buffered in memory. The scan is whole-file: a newEmbeddedArrayJsonDataSourcereportsisSplitable = false, which both the V1JsonFileFormatand the V2JsonScanalready consult, so the option works in both v1 and v2 scans (including compressed files, partitions, and streaming reads). The option is rejected infrom_jsonandjson(Dataset[String]), where each input is already a single document.It is an intentional design decision that anything outside the array is ignored, because:
Why are the changes needed?
Some common JSON formats (e.g. AWS CloudTrail) wrap their records in a single top-level array inside one large object document. Today, ingesting such files requires materializing the whole array, which requires all elements to stay in memory at the same time and can easily cause OOM. This option makes that ingestion efficient by streaming the records out one at a time as if they were independent input records, never buffering the array.
Does this PR introduce any user-facing change?
Yes — a new opt-in JSON reader option
explodeEmbeddedArray. There is no behavior change unless the option is set. Example:It also adds three new error conditions:
EXPLODE_EMBEDDED_ARRAY_CONFLICTING_OPTION,EXPLODE_EMBEDDED_ARRAY_UNSUPPORTED_USAGE, andINVALID_EXPLODE_EMBEDDED_ARRAY_SCHEMA.How was this patch tested?
New
EmbeddedArraySplitterSuite(splitter unit tests, including custom array field names, nesting/escapes, scalars, BOM, truncated input, and inputs larger than the internal buffer) andExplodeEmbeddedArrayJsonV1Suite/ExplodeEmbeddedArrayJsonV2Suite(end-to-end: schema inference/validation, array field names other thanRecords, user schemas that rename the variant column, malformed records in all parse modes, truncated/compressed/partitioned/streaming inputs, option conflicts, thefrom_json/json(Dataset[String])rejections, and equivalence with reading the records as ndjson withsingleVariantColumn).JsonSuite's option-validation test was updated for the new option. All pass.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)