Skip to content

Commit 0f2140c

Browse files
authored
Merge branch 'main' into fix/sort-merge-reservation-starvation
2 parents af78770 + 95de1bf commit 0f2140c

File tree

3 files changed

+176
-3
lines changed

3 files changed

+176
-3
lines changed

datafusion/core/src/dataframe/parquet.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,19 @@ mod tests {
127127
use tempfile::TempDir;
128128
use url::Url;
129129

130+
/// Helper to extract a metric value by name from aggregated metrics.
131+
fn metric_usize(
132+
aggregated: &datafusion_physical_expr_common::metrics::MetricsSet,
133+
name: &str,
134+
) -> usize {
135+
aggregated
136+
.iter()
137+
.find(|m| m.value().name() == name)
138+
.unwrap_or_else(|| panic!("should have {name} metric"))
139+
.value()
140+
.as_usize()
141+
}
142+
130143
#[tokio::test]
131144
async fn filter_pushdown_dataframe() -> Result<()> {
132145
let ctx = SessionContext::new();
@@ -430,6 +443,126 @@ mod tests {
430443
Ok(())
431444
}
432445

446+
/// Test that ParquetSink exposes rows_written, bytes_written, and
447+
/// elapsed_compute metrics via DataSinkExec.
448+
#[tokio::test]
449+
async fn test_parquet_sink_metrics() -> Result<()> {
450+
use arrow::array::Int32Array;
451+
use arrow::datatypes::{DataType, Field, Schema};
452+
use arrow::record_batch::RecordBatch;
453+
use datafusion_execution::TaskContext;
454+
455+
use futures::TryStreamExt;
456+
457+
let ctx = SessionContext::new();
458+
let tmp_dir = TempDir::new()?;
459+
let output_path = tmp_dir.path().join("metrics_test.parquet");
460+
let output_path_str = output_path.to_str().unwrap();
461+
462+
// Register a table with 100 rows
463+
let schema = Arc::new(Schema::new(vec![
464+
Field::new("id", DataType::Int32, false),
465+
Field::new("val", DataType::Int32, false),
466+
]));
467+
let ids: Vec<i32> = (0..100).collect();
468+
let vals: Vec<i32> = (100..200).collect();
469+
let batch = RecordBatch::try_new(
470+
Arc::clone(&schema),
471+
vec![
472+
Arc::new(Int32Array::from(ids)),
473+
Arc::new(Int32Array::from(vals)),
474+
],
475+
)?;
476+
ctx.register_batch("source", batch)?;
477+
478+
// Create the physical plan for COPY TO
479+
let df = ctx
480+
.sql(&format!(
481+
"COPY source TO '{output_path_str}' STORED AS PARQUET"
482+
))
483+
.await?;
484+
let plan = df.create_physical_plan().await?;
485+
486+
// Execute the plan
487+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
488+
let stream = plan.execute(0, task_ctx)?;
489+
let _batches: Vec<_> = stream.try_collect().await?;
490+
491+
// Check metrics on the DataSinkExec (top-level plan)
492+
let metrics = plan
493+
.metrics()
494+
.expect("DataSinkExec should return metrics from ParquetSink");
495+
let aggregated = metrics.aggregate_by_name();
496+
497+
// rows_written should be 100
498+
assert_eq!(
499+
metric_usize(&aggregated, "rows_written"),
500+
100,
501+
"expected 100 rows written"
502+
);
503+
504+
// bytes_written should be > 0
505+
let bytes_written = metric_usize(&aggregated, "bytes_written");
506+
assert!(
507+
bytes_written > 0,
508+
"expected bytes_written > 0, got {bytes_written}"
509+
);
510+
511+
// elapsed_compute should be > 0
512+
let elapsed = metric_usize(&aggregated, "elapsed_compute");
513+
assert!(elapsed > 0, "expected elapsed_compute > 0");
514+
515+
Ok(())
516+
}
517+
518+
/// Test that ParquetSink metrics work with single_file_parallelism enabled.
519+
#[tokio::test]
520+
async fn test_parquet_sink_metrics_parallel() -> Result<()> {
521+
use arrow::array::Int32Array;
522+
use arrow::datatypes::{DataType, Field, Schema};
523+
use arrow::record_batch::RecordBatch;
524+
use datafusion_execution::TaskContext;
525+
526+
use futures::TryStreamExt;
527+
528+
let ctx = SessionContext::new();
529+
ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
530+
.await?
531+
.collect()
532+
.await?;
533+
534+
let tmp_dir = TempDir::new()?;
535+
let output_path = tmp_dir.path().join("metrics_parallel.parquet");
536+
let output_path_str = output_path.to_str().unwrap();
537+
538+
let schema =
539+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
540+
let ids: Vec<i32> = (0..50).collect();
541+
let batch = RecordBatch::try_new(
542+
Arc::clone(&schema),
543+
vec![Arc::new(Int32Array::from(ids))],
544+
)?;
545+
ctx.register_batch("source2", batch)?;
546+
547+
let df = ctx
548+
.sql(&format!(
549+
"COPY source2 TO '{output_path_str}' STORED AS PARQUET"
550+
))
551+
.await?;
552+
let plan = df.create_physical_plan().await?;
553+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
554+
let stream = plan.execute(0, task_ctx)?;
555+
let _batches: Vec<_> = stream.try_collect().await?;
556+
557+
let metrics = plan.metrics().expect("DataSinkExec should return metrics");
558+
let aggregated = metrics.aggregate_by_name();
559+
560+
assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
561+
assert!(metric_usize(&aggregated, "bytes_written") > 0);
562+
563+
Ok(())
564+
}
565+
433566
/// Test FileOutputMode::Directory - explicitly request directory output
434567
/// even for paths WITH file extensions.
435568
#[tokio::test]

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv
5555
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5656
use datafusion_expr::dml::InsertOp;
5757
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
58+
use datafusion_physical_plan::metrics::{
59+
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
60+
};
5861
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
5962
use datafusion_session::Session;
6063

@@ -1156,6 +1159,8 @@ pub struct ParquetSink {
11561159
written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
11571160
/// Optional sorting columns to write to Parquet metadata
11581161
sorting_columns: Option<Vec<SortingColumn>>,
1162+
/// Metrics for tracking write operations
1163+
metrics: ExecutionPlanMetricsSet,
11591164
}
11601165

11611166
impl Debug for ParquetSink {
@@ -1188,6 +1193,7 @@ impl ParquetSink {
11881193
parquet_options,
11891194
written: Default::default(),
11901195
sorting_columns: None,
1196+
metrics: ExecutionPlanMetricsSet::new(),
11911197
}
11921198
}
11931199

@@ -1333,6 +1339,17 @@ impl FileSink for ParquetSink {
13331339
mut file_stream_rx: DemuxedStreamReceiver,
13341340
object_store: Arc<dyn ObjectStore>,
13351341
) -> Result<u64> {
1342+
let rows_written_counter =
1343+
MetricBuilder::new(&self.metrics).global_counter("rows_written");
1344+
// Note: bytes_written is the sum of compressed row group sizes, which
1345+
// may differ slightly from the actual on-disk file size (excludes footer,
1346+
// page indexes, and other Parquet metadata overhead).
1347+
let bytes_written_counter =
1348+
MetricBuilder::new(&self.metrics).global_counter("bytes_written");
1349+
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
1350+
1351+
let write_start = datafusion_common::instant::Instant::now();
1352+
13361353
let parquet_opts = &self.parquet_options;
13371354

13381355
let mut file_write_tasks: JoinSet<
@@ -1410,12 +1427,18 @@ impl FileSink for ParquetSink {
14101427
}
14111428
}
14121429

1413-
let mut row_count = 0;
14141430
while let Some(result) = file_write_tasks.join_next().await {
14151431
match result {
14161432
Ok(r) => {
14171433
let (path, parquet_meta_data) = r?;
1418-
row_count += parquet_meta_data.file_metadata().num_rows();
1434+
let file_rows = parquet_meta_data.file_metadata().num_rows() as usize;
1435+
let file_bytes: usize = parquet_meta_data
1436+
.row_groups()
1437+
.iter()
1438+
.map(|rg| rg.compressed_size() as usize)
1439+
.sum();
1440+
rows_written_counter.add(file_rows);
1441+
bytes_written_counter.add(file_bytes);
14191442
let mut written_files = self.written.lock();
14201443
written_files
14211444
.try_insert(path.clone(), parquet_meta_data)
@@ -1437,7 +1460,9 @@ impl FileSink for ParquetSink {
14371460
.await
14381461
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
14391462

1440-
Ok(row_count as u64)
1463+
elapsed_compute.add_elapsed(write_start);
1464+
1465+
Ok(rows_written_counter.value() as u64)
14411466
}
14421467
}
14431468

@@ -1447,6 +1472,10 @@ impl DataSink for ParquetSink {
14471472
self
14481473
}
14491474

1475+
fn metrics(&self) -> Option<MetricsSet> {
1476+
Some(self.metrics.clone_inner())
1477+
}
1478+
14501479
fn schema(&self) -> &SchemaRef {
14511480
self.config.output_schema()
14521481
}

datafusion/sqllogictest/test_files/copy.slt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ physical_plan
200200
01)DataSinkExec: sink=ParquetSink(file_groups=[])
201201
02)--DataSourceExec: partitions=1, partition_sizes=[1]
202202

203+
# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics
204+
# Use a query with Sort and Projection to verify metrics across all operators
205+
query TT
206+
EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET;
207+
----
208+
Plan with Metrics
209+
01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=<slt:ignore>, bytes_written=<slt:ignore>, rows_written=2]
210+
02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=<slt:ignore>, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
211+
03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=1, expr_0_eval_time=<slt:ignore>, expr_1_eval_time=<slt:ignore>]
212+
04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
213+
203214
# Copy to directory as partitioned files with keep_partition_by_columns enabled
204215
query I
205216
COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)

0 commit comments

Comments (0)