Skip to content

Commit 9cd0824

Browse files
authored
[X-2182] Upgrade DF to 52.3 (#34)
1 parent 28d012a commit 9cd0824

99 files changed

Lines changed: 4254 additions & 324 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ arrow-flight = { version = "57.1.0", features = [
101101
] }
102102
arrow-ipc = { version = "57.1.0", default-features = false, features = [
103103
"lz4",
104+
"zstd",
104105
] }
105106
arrow-ord = { version = "57.1.0", default-features = false }
106107
arrow-schema = { version = "57.1.0", default-features = false }
@@ -187,6 +188,7 @@ strum_macros = "0.27.2"
187188
tempfile = "3"
188189
testcontainers-modules = { version = "0.14" }
189190
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
191+
tokio-stream = "0.1"
190192
url = "2.5.7"
191193
zstd = { version = "0.13", default-features = false }
192194

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ async fn json_opener() -> Result<()> {
127127
projected,
128128
FileCompressionType::UNCOMPRESSED,
129129
Arc::new(object_store),
130+
true,
130131
);
131132

132133
let scan_config = FileScanConfigBuilder::new(

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3046,6 +3046,22 @@ config_namespace! {
30463046
/// If not specified, the default level for the compression algorithm is used.
30473047
pub compression_level: Option<u32>, default = None
30483048
pub schema_infer_max_rec: Option<usize>, default = None
3049+
/// Whether JSON files are read as newline-delimited JSON (NDJSON) or as a single JSON array.
3050+
///
3051+
/// When `true` (default), expects newline-delimited JSON (NDJSON):
3052+
/// ```text
3053+
/// {"key1": 1, "key2": "val"}
3054+
/// {"key1": 2, "key2": "vals"}
3055+
/// ```
3056+
///
3057+
/// When `false`, expects JSON array format:
3058+
/// ```text
3059+
/// [
3060+
/// {"key1": 1, "key2": "val"},
3061+
/// {"key1": 2, "key2": "vals"}
3062+
/// ]
3063+
/// ```
3064+
pub newline_delimited: bool, default = true
30493065
}
30503066
}
30513067

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ impl DataFrame {
481481
/// # #[tokio::main]
482482
/// # async fn main() -> Result<()> {
483483
/// let ctx = SessionContext::new();
484-
/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
484+
/// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
485485
/// // expand into multiple columns if it's json array, flatten field name if it's nested structure
486486
/// let df = df.unnest_columns(&["b","c","d"])?;
487487
/// let expected = vec![

datafusion/core/src/dataframe/parquet.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,4 +324,147 @@ mod tests {
324324

325325
Ok(())
326326
}
327+
328+
/// Runs `COPY source TO ... STORED AS PARQUET` and checks that the executed plan reports
329+
/// `rows_written`, `bytes_written`, and `elapsed_compute` metrics (expected via DataSinkExec / ParquetSink).
330+
#[tokio::test]
331+
async fn test_parquet_sink_metrics() -> Result<()> {
332+
use arrow::array::Int32Array;
333+
use arrow::datatypes::{DataType, Field, Schema};
334+
use arrow::record_batch::RecordBatch;
335+
use datafusion_execution::TaskContext;
336+
337+
use futures::TryStreamExt;
338+
339+
let ctx = SessionContext::new();
340+
let tmp_dir = TempDir::new()?;
341+
let output_path = tmp_dir.path().join("metrics_test.parquet");
342+
let output_path_str = output_path.to_str().unwrap();
343+
344+
// Register an in-memory table "source" with 100 rows: id = 0..100, val = 100..200
345+
let schema = Arc::new(Schema::new(vec![
346+
Field::new("id", DataType::Int32, false),
347+
Field::new("val", DataType::Int32, false),
348+
]));
349+
let ids: Vec<i32> = (0..100).collect();
350+
let vals: Vec<i32> = (100..200).collect();
351+
let batch = RecordBatch::try_new(
352+
Arc::clone(&schema),
353+
vec![
354+
Arc::new(Int32Array::from(ids)),
355+
Arc::new(Int32Array::from(vals)),
356+
],
357+
)?;
358+
ctx.register_batch("source", batch)?;
359+
360+
// Build the physical plan for COPY TO; the target file lives inside the temp dir
361+
let df = ctx
362+
.sql(&format!(
363+
"COPY source TO '{output_path_str}' STORED AS PARQUET"
364+
))
365+
.await?;
366+
let plan = df.create_physical_plan().await?;
367+
368+
// Execute partition 0 and drain the stream; the returned batches themselves are not inspected
369+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
370+
let stream = plan.execute(0, task_ctx)?;
371+
let _batches: Vec<_> = stream.try_collect().await?;
372+
373+
// Read metrics from the top-level plan (the expect message assumes it is a DataSinkExec)
374+
let metrics = plan
375+
.metrics()
376+
.expect("DataSinkExec should return metrics from ParquetSink");
377+
let aggregated = metrics.aggregate_by_name();
378+
379+
// rows_written must equal the 100 rows registered above
380+
let rows_written = aggregated
381+
.iter()
382+
.find(|m| m.value().name() == "rows_written")
383+
.expect("should have rows_written metric");
384+
assert_eq!(
385+
rows_written.value().as_usize(),
386+
100,
387+
"expected 100 rows written"
388+
);
389+
390+
// bytes_written must be non-zero (the exact file size is not pinned)
391+
let bytes_written = aggregated
392+
.iter()
393+
.find(|m| m.value().name() == "bytes_written")
394+
.expect("should have bytes_written metric");
395+
assert!(
396+
bytes_written.value().as_usize() > 0,
397+
"expected bytes_written > 0, got {}",
398+
bytes_written.value().as_usize()
399+
);
400+
401+
// elapsed_compute must be non-zero
402+
let elapsed = aggregated
403+
.iter()
404+
.find(|m| m.value().name() == "elapsed_compute")
405+
.expect("should have elapsed_compute metric");
406+
assert!(
407+
elapsed.value().as_usize() > 0,
408+
"expected elapsed_compute > 0"
409+
);
410+
411+
Ok(())
412+
}
413+
414+
/// Test that ParquetSink metrics are reported when `allow_single_file_parallelism` is set to true.
415+
#[tokio::test]
416+
async fn test_parquet_sink_metrics_parallel() -> Result<()> {
417+
use arrow::array::Int32Array;
418+
use arrow::datatypes::{DataType, Field, Schema};
419+
use arrow::record_batch::RecordBatch;
420+
use datafusion_execution::TaskContext;
421+
422+
use futures::TryStreamExt;
423+
// Set datafusion.execution.parquet.allow_single_file_parallelism = true on this context before planning
424+
let ctx = SessionContext::new();
425+
ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
426+
.await?
427+
.collect()
428+
.await?;
429+
430+
let tmp_dir = TempDir::new()?;
431+
let output_path = tmp_dir.path().join("metrics_parallel.parquet");
432+
let output_path_str = output_path.to_str().unwrap();
433+
// Register a single-column in-memory table "source2" with 50 rows: id = 0..50
434+
let schema =
435+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
436+
let ids: Vec<i32> = (0..50).collect();
437+
let batch = RecordBatch::try_new(
438+
Arc::clone(&schema),
439+
vec![Arc::new(Int32Array::from(ids))],
440+
)?;
441+
ctx.register_batch("source2", batch)?;
442+
// Plan the COPY TO, then execute partition 0 and drain the stream; batches are not inspected
443+
let df = ctx
444+
.sql(&format!(
445+
"COPY source2 TO '{output_path_str}' STORED AS PARQUET"
446+
))
447+
.await?;
448+
let plan = df.create_physical_plan().await?;
449+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
450+
let stream = plan.execute(0, task_ctx)?;
451+
let _batches: Vec<_> = stream.try_collect().await?;
452+
// Only rows_written and bytes_written are asserted below; elapsed_compute is not checked in this test
453+
let metrics = plan.metrics().expect("DataSinkExec should return metrics");
454+
let aggregated = metrics.aggregate_by_name();
455+
456+
let rows_written = aggregated
457+
.iter()
458+
.find(|m| m.value().name() == "rows_written")
459+
.expect("should have rows_written metric");
460+
assert_eq!(rows_written.value().as_usize(), 50);
461+
462+
let bytes_written = aggregated
463+
.iter()
464+
.find(|m| m.value().name() == "bytes_written")
465+
.expect("should have bytes_written metric");
466+
assert!(bytes_written.value().as_usize() > 0);
467+
468+
Ok(())
469+
}
327470
}

0 commit comments

Comments
 (0)