Skip to content

Commit 69d0f44

Browse files
authored
Support reading/parsing JSON array format for DataFusion (#19924)
## Which issue does this PR close? Closes #19920 ## Rationale for this change DataFusion currently only supports line-delimited JSON (NDJSON) format. Many data sources provide JSON in array format `[{...}, {...}]`, which cannot be parsed by the existing implementation. ## What changes are included in this PR? - Add `newline_delimited` option to `JsonOptions` (default `true` for backward compatibility) - Implement streaming JSON array to NDJSON conversion via `JsonArrayToNdjsonReader` - Support both file-based and stream-based (e.g., S3) reading with memory-efficient streaming - Add `ChannelReader` for async-to-sync byte transfer in object store streaming scenarios - Add protobuf serialization support for the new option - Rename `NdJsonReadOptions` to `JsonReadOptions` (with deprecation alias) - SQL support via `OPTIONS ('format.newline_delimited' 'false')` ### Architecture ```text JSON Array File (e.g., 33GB) │ ▼ read chunks via ChannelReader (for streams) or BufReader (for files) ┌───────────────────┐ │ JsonArrayToNdjson │ ← streaming character substitution: │ Reader │ '[' skip, ',' → '\n', ']' stop └───────────────────┘ │ ▼ outputs NDJSON format ┌───────────────────┐ │ Arrow Reader │ ← batch parsing └───────────────────┘ │ ▼ RecordBatch ``` ### Memory Efficiency | Approach | Memory for 33GB file | Parse count | |----------|---------------------|-------------| | Load entire file + serde_json | ~100GB+ | 3x | | Streaming with JsonArrayToNdjsonReader | ~32MB | 1x | ## Are these changes tested? Yes: - Unit tests for `JsonArrayToNdjsonReader` (nested objects, escaped strings, empty arrays, buffer boundaries) - Unit tests for `ChannelReader` - Integration tests for `JsonOpener` (file-based, stream-based, large files, cancellation) - Schema inference tests (normal, empty, nested struct, list types) - End-to-end query tests with SQL - SQLLogicTest for SQL validation ## Are there any user-facing changes? Yes. 
Users can now read JSON array format files: **Via SQL:** ```sql CREATE EXTERNAL TABLE my_table STORED AS JSON OPTIONS ('format.newline_delimited' 'false') LOCATION 'path/to/array.json'; ``` **Via API:** ```rust let options = JsonReadOptions::default().newline_delimited(false); ctx.register_json("my_table", "path/to/array.json", options).await?; ``` **Note:** `NdJsonReadOptions` is deprecated in favor of `JsonReadOptions`. **Limitation:** JSON array format does not support range-based file scanning (`repartition_file_scans`). Users will see a clear error message if this is attempted.
1 parent dff1cad commit 69d0f44

File tree

28 files changed

+1847
-85
lines changed

28 files changed

+1847
-85
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ strum_macros = "0.27.2"
188188
tempfile = "3"
189189
testcontainers-modules = { version = "0.14" }
190190
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
191+
tokio-stream = "0.1"
192+
tokio-util = "0.7"
191193
url = "2.5.7"
192194
uuid = "1.20"
193195
zstd = { version = "0.13", default-features = false }

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
125125
projected,
126126
FileCompressionType::UNCOMPRESSED,
127127
Arc::new(object_store),
128+
true,
128129
);
129130

130131
let scan_config = FileScanConfigBuilder::new(

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3065,6 +3065,22 @@ config_namespace! {
30653065
/// If not specified, the default level for the compression algorithm is used.
30663066
pub compression_level: Option<u32>, default = None
30673067
pub schema_infer_max_rec: Option<usize>, default = None
3068+
/// The JSON format to use when reading files.
3069+
///
3070+
/// When `true` (default), expects newline-delimited JSON (NDJSON):
3071+
/// ```text
3072+
/// {"key1": 1, "key2": "val"}
3073+
/// {"key1": 2, "key2": "vals"}
3074+
/// ```
3075+
///
3076+
/// When `false`, expects JSON array format:
3077+
/// ```text
3078+
/// [
3079+
/// {"key1": 1, "key2": "val"},
3080+
/// {"key1": 2, "key2": "vals"}
3081+
/// ]
3082+
/// ```
3083+
pub newline_delimited: bool, default = true
30683084
}
30693085
}
30703086

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ impl DataFrame {
512512
/// # #[tokio::main]
513513
/// # async fn main() -> Result<()> {
514514
/// let ctx = SessionContext::new();
515-
/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
515+
/// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
516516
/// // expand into multiple columns if it's json array, flatten field name if it's nested structure
517517
/// let df = df.unnest_columns(&["b","c","d"])?;
518518
/// let expected = vec![

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 251 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ mod tests {
2525
use super::*;
2626

2727
use crate::datasource::file_format::test_util::scan_format;
28-
use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
28+
use crate::prelude::{SessionConfig, SessionContext};
2929
use crate::test::object_store::local_unpartitioned_file;
3030
use arrow::array::RecordBatch;
3131
use arrow_schema::Schema;
@@ -46,12 +46,54 @@ mod tests {
4646
use datafusion_common::internal_err;
4747
use datafusion_common::stats::Precision;
4848

49+
use crate::execution::options::JsonReadOptions;
4950
use datafusion_common::Result;
51+
use datafusion_datasource::file_compression_type::FileCompressionType;
5052
use futures::StreamExt;
5153
use insta::assert_snapshot;
5254
use object_store::local::LocalFileSystem;
5355
use regex::Regex;
5456
use rstest::rstest;
57+
// ==================== Test Helpers ====================
58+
59+
/// Create a temporary JSON file and return (TempDir, path)
60+
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
61+
let tmp_dir = tempfile::TempDir::new().unwrap();
62+
let path = tmp_dir.path().join("test.json");
63+
std::fs::write(&path, content).unwrap();
64+
(tmp_dir, path.to_string_lossy().to_string())
65+
}
66+
67+
/// Infer schema from JSON array format file
68+
async fn infer_json_array_schema(
69+
content: &str,
70+
) -> Result<arrow::datatypes::SchemaRef> {
71+
let (_tmp_dir, path) = create_temp_json(content);
72+
let session = SessionContext::new();
73+
let ctx = session.state();
74+
let store = Arc::new(LocalFileSystem::new()) as _;
75+
let format = JsonFormat::default().with_newline_delimited(false);
76+
format
77+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
78+
.await
79+
}
80+
81+
/// Register a JSON array table and run a query
82+
async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
83+
let (_tmp_dir, path) = create_temp_json(content);
84+
let ctx = SessionContext::new();
85+
let options = JsonReadOptions::default().newline_delimited(false);
86+
ctx.register_json("test_table", &path, options).await?;
87+
ctx.sql(query).await?.collect().await
88+
}
89+
90+
/// Register a JSON array table and run a query, return formatted string
91+
async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
92+
let result = query_json_array(content, query).await?;
93+
Ok(batches_to_string(&result))
94+
}
95+
96+
// ==================== Existing Tests ====================
5597

5698
#[tokio::test]
5799
async fn read_small_batches() -> Result<()> {
@@ -208,7 +250,7 @@ mod tests {
208250
let ctx = SessionContext::new_with_config(config);
209251

210252
let table_path = "tests/data/1.json";
211-
let options = NdJsonReadOptions::default();
253+
let options = JsonReadOptions::default();
212254

213255
ctx.register_json("json_parallel", table_path, options)
214256
.await?;
@@ -240,7 +282,7 @@ mod tests {
240282
let ctx = SessionContext::new_with_config(config);
241283

242284
let table_path = "tests/data/empty.json";
243-
let options = NdJsonReadOptions::default();
285+
let options = JsonReadOptions::default();
244286

245287
ctx.register_json("json_parallel_empty", table_path, options)
246288
.await?;
@@ -314,7 +356,6 @@ mod tests {
314356
.digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());
315357

316358
let mut all_batches = RecordBatch::new_empty(schema.clone());
317-
// We get RequiresMoreData after 2 batches because of how json::Decoder works
318359
for _ in 0..2 {
319360
let output = deserializer.next()?;
320361
let DeserializerOutput::RecordBatch(batch) = output else {
@@ -354,11 +395,11 @@ mod tests {
354395
async fn test_write_empty_json_from_sql() -> Result<()> {
355396
let ctx = SessionContext::new();
356397
let tmp_dir = tempfile::TempDir::new()?;
357-
let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
398+
let path = tmp_dir.path().join("empty_sql.json");
399+
let path = path.to_string_lossy().to_string();
358400
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
359401
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
360402
.await?;
361-
// Expected the file to exist and be empty
362403
assert!(std::path::Path::new(&path).exists());
363404
let metadata = std::fs::metadata(&path)?;
364405
assert_eq!(metadata.len(), 0);
@@ -381,14 +422,216 @@ mod tests {
381422
)?;
382423

383424
let tmp_dir = tempfile::TempDir::new()?;
384-
let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
425+
let path = tmp_dir.path().join("empty_batch.json");
426+
let path = path.to_string_lossy().to_string();
385427
let df = ctx.read_batch(empty_batch.clone())?;
386428
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
387429
.await?;
388-
// Expected the file to exist and be empty
389430
assert!(std::path::Path::new(&path).exists());
390431
let metadata = std::fs::metadata(&path)?;
391432
assert_eq!(metadata.len(), 0);
392433
Ok(())
393434
}
435+
436+
// ==================== JSON Array Format Tests ====================
437+
438+
#[tokio::test]
439+
async fn test_json_array_schema_inference() -> Result<()> {
440+
let schema = infer_json_array_schema(
441+
r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
442+
)
443+
.await?;
444+
445+
let fields: Vec<_> = schema
446+
.fields()
447+
.iter()
448+
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
449+
.collect();
450+
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
451+
Ok(())
452+
}
453+
454+
#[tokio::test]
455+
async fn test_json_array_empty() -> Result<()> {
456+
let schema = infer_json_array_schema("[]").await?;
457+
assert_eq!(schema.fields().len(), 0);
458+
Ok(())
459+
}
460+
461+
#[tokio::test]
462+
async fn test_json_array_nested_struct() -> Result<()> {
463+
let schema = infer_json_array_schema(
464+
r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
465+
)
466+
.await?;
467+
468+
let info_field = schema.field_with_name("info").unwrap();
469+
assert!(matches!(info_field.data_type(), DataType::Struct(_)));
470+
Ok(())
471+
}
472+
473+
#[tokio::test]
474+
async fn test_json_array_list_type() -> Result<()> {
475+
let schema =
476+
infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?;
477+
478+
let tags_field = schema.field_with_name("tags").unwrap();
479+
assert!(matches!(tags_field.data_type(), DataType::List(_)));
480+
Ok(())
481+
}
482+
483+
#[tokio::test]
484+
async fn test_json_array_basic_query() -> Result<()> {
485+
let result = query_json_array_str(
486+
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#,
487+
"SELECT a, b FROM test_table ORDER BY a",
488+
)
489+
.await?;
490+
491+
assert_snapshot!(result, @r"
492+
+---+-------+
493+
| a | b |
494+
+---+-------+
495+
| 1 | hello |
496+
| 2 | world |
497+
| 3 | test |
498+
+---+-------+
499+
");
500+
Ok(())
501+
}
502+
503+
#[tokio::test]
504+
async fn test_json_array_with_nulls() -> Result<()> {
505+
let result = query_json_array_str(
506+
r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#,
507+
"SELECT id, name FROM test_table ORDER BY id",
508+
)
509+
.await?;
510+
511+
assert_snapshot!(result, @r"
512+
+----+---------+
513+
| id | name |
514+
+----+---------+
515+
| 1 | Alice |
516+
| 2 | |
517+
| 3 | Charlie |
518+
+----+---------+
519+
");
520+
Ok(())
521+
}
522+
523+
#[tokio::test]
524+
async fn test_json_array_unnest() -> Result<()> {
525+
let result = query_json_array_str(
526+
r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#,
527+
"SELECT id, unnest(values) as value FROM test_table ORDER BY id, value",
528+
)
529+
.await?;
530+
531+
assert_snapshot!(result, @r"
532+
+----+-------+
533+
| id | value |
534+
+----+-------+
535+
| 1 | 10 |
536+
| 1 | 20 |
537+
| 1 | 30 |
538+
| 2 | 40 |
539+
| 2 | 50 |
540+
+----+-------+
541+
");
542+
Ok(())
543+
}
544+
545+
#[tokio::test]
546+
async fn test_json_array_unnest_struct() -> Result<()> {
547+
let result = query_json_array_str(
548+
r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
549+
"SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
550+
)
551+
.await?;
552+
553+
assert_snapshot!(result, @r"
554+
+----+---------+-----+
555+
| id | product | qty |
556+
+----+---------+-----+
557+
| 1 | A | 2 |
558+
| 1 | B | 3 |
559+
| 2 | C | 1 |
560+
+----+---------+-----+
561+
");
562+
Ok(())
563+
}
564+
565+
#[tokio::test]
566+
async fn test_json_array_nested_struct_access() -> Result<()> {
567+
let result = query_json_array_str(
568+
r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
569+
"SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id",
570+
)
571+
.await?;
572+
573+
assert_snapshot!(result, @r"
574+
+----+-------------+-------+
575+
| id | dept_name | head |
576+
+----+-------------+-------+
577+
| 1 | Engineering | Alice |
578+
| 2 | Sales | Bob |
579+
+----+-------------+-------+
580+
");
581+
Ok(())
582+
}
583+
584+
#[tokio::test]
585+
async fn test_json_array_with_compression() -> Result<()> {
586+
use flate2::Compression;
587+
use flate2::write::GzEncoder;
588+
use std::io::Write;
589+
590+
let tmp_dir = tempfile::TempDir::new()?;
591+
let path = tmp_dir.path().join("array.json.gz");
592+
let path = path.to_string_lossy().to_string();
593+
594+
let file = std::fs::File::create(&path)?;
595+
let mut encoder = GzEncoder::new(file, Compression::default());
596+
encoder.write_all(
597+
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
598+
)?;
599+
encoder.finish()?;
600+
601+
let ctx = SessionContext::new();
602+
let options = JsonReadOptions::default()
603+
.newline_delimited(false)
604+
.file_compression_type(FileCompressionType::GZIP)
605+
.file_extension(".json.gz");
606+
607+
ctx.register_json("test_table", &path, options).await?;
608+
let result = ctx
609+
.sql("SELECT a, b FROM test_table ORDER BY a")
610+
.await?
611+
.collect()
612+
.await?;
613+
614+
assert_snapshot!(batches_to_string(&result), @r"
615+
+---+-------+
616+
| a | b |
617+
+---+-------+
618+
| 1 | hello |
619+
| 2 | world |
620+
+---+-------+
621+
");
622+
Ok(())
623+
}
624+
625+
#[tokio::test]
626+
async fn test_json_array_list_of_structs() -> Result<()> {
627+
let batches = query_json_array(
628+
r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#,
629+
"SELECT id, items FROM test_table ORDER BY id",
630+
)
631+
.await?;
632+
633+
assert_eq!(1, batches.len());
634+
assert_eq!(2, batches[0].num_rows());
635+
Ok(())
636+
}
394637
}

0 commit comments

Comments (0)