Skip to content

Commit e444135

Browse files
authored
Merge branch 'main' into union-propagation-distinct-count
2 parents 5d01e40 + 39226c3 commit e444135

73 files changed

Lines changed: 2832 additions & 554 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 134 additions & 75 deletions
Large diffs are not rendered by default.

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec {
727727
Some(self.metrics.clone_inner())
728728
}
729729

730-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
731-
let mut stats = self.input.partition_statistics(partition)?;
730+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
731+
let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
732732
let ratio = self.upper_bound - self.lower_bound;
733733

734734
// Scale statistics by sampling ratio (inexact due to randomness)
@@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec {
741741
.map(|n| (n as f64 * ratio) as usize)
742742
.to_inexact();
743743

744-
Ok(stats)
744+
Ok(Arc::new(stats))
745745
}
746746

747747
fn apply_expressions(

datafusion/core/src/execution/context/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2218,7 +2218,9 @@ mod tests {
22182218
use crate::test;
22192219
use crate::test_util::{plan_and_collect, populate_csv_partitions};
22202220
use arrow::datatypes::{DataType, TimeUnit};
2221+
use arrow_schema::FieldRef;
22212222
use datafusion_common::DataFusionError;
2223+
use datafusion_common::datatype::DataTypeExt;
22222224
use std::error::Error;
22232225
use std::path::PathBuf;
22242226

@@ -2735,7 +2737,7 @@ mod tests {
27352737
struct MyTypePlanner {}
27362738

27372739
impl TypePlanner for MyTypePlanner {
2738-
fn plan_type(&self, sql_type: &ast::DataType) -> Result<Option<DataType>> {
2740+
fn plan_type_field(&self, sql_type: &ast::DataType) -> Result<Option<FieldRef>> {
27392741
match sql_type {
27402742
ast::DataType::Datetime(precision) => {
27412743
let precision = match precision {
@@ -2745,7 +2747,9 @@ mod tests {
27452747
None | Some(9) => TimeUnit::Nanosecond,
27462748
_ => unreachable!(),
27472749
};
2748-
Ok(Some(DataType::Timestamp(precision, None)))
2750+
Ok(Some(
2751+
DataType::Timestamp(precision, None).into_nullable_field_ref(),
2752+
))
27492753
}
27502754
_ => Ok(None),
27512755
}

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,12 @@ impl ExecutionPlan for CustomExecutionPlan {
184184
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
185185
}
186186

187-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
187+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
188188
if partition.is_some() {
189-
return Ok(Statistics::new_unknown(&self.schema()));
189+
return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
190190
}
191191
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
192-
Ok(Statistics {
192+
Ok(Arc::new(Statistics {
193193
num_rows: Precision::Exact(batch.num_rows()),
194194
total_byte_size: Precision::Absent,
195195
column_statistics: self
@@ -208,7 +208,7 @@ impl ExecutionPlan for CustomExecutionPlan {
208208
..Default::default()
209209
})
210210
.collect(),
211-
})
211+
}))
212212
}
213213

214214
fn apply_expressions(

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ impl ExecutionPlan for StatisticsValidation {
182182
unimplemented!("This plan only serves for testing statistics")
183183
}
184184

185-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
185+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
186186
if partition.is_some() {
187-
Ok(Statistics::new_unknown(&self.schema))
187+
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
188188
} else {
189-
Ok(self.stats.clone())
189+
Ok(Arc::new(self.stats.clone()))
190190
}
191191
}
192192

@@ -255,7 +255,7 @@ async fn sql_basic() -> Result<()> {
255255
let physical_plan = df.create_physical_plan().await.unwrap();
256256

257257
// the statistics should be those of the source
258-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
258+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
259259

260260
Ok(())
261261
}
@@ -295,7 +295,7 @@ async fn sql_limit() -> Result<()> {
295295
.collect(),
296296
total_byte_size: Precision::Absent
297297
},
298-
physical_plan.partition_statistics(None)?
298+
*physical_plan.partition_statistics(None)?
299299
);
300300

301301
let df = ctx
@@ -304,7 +304,7 @@ async fn sql_limit() -> Result<()> {
304304
.unwrap();
305305
let physical_plan = df.create_physical_plan().await.unwrap();
306306
// when the limit is larger than the original number of lines, statistics remain unchanged
307-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
307+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
308308

309309
Ok(())
310310
}
@@ -324,7 +324,7 @@ async fn sql_window() -> Result<()> {
324324
let result = physical_plan.partition_statistics(None)?;
325325

326326
assert_eq!(stats.num_rows, result.num_rows);
327-
let col_stats = result.column_statistics;
327+
let col_stats = &result.column_statistics;
328328
assert_eq!(2, col_stats.len());
329329
assert_eq!(stats.column_statistics[1], col_stats[0]);
330330

datafusion/core/tests/parquet/expr_adapter.rs

Lines changed: 144 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow::array::{RecordBatch, record_batch};
21-
use arrow_schema::{DataType, Field, Schema, SchemaRef};
20+
use arrow::array::{
21+
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray,
22+
StructArray, record_batch,
23+
};
24+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
2225
use bytes::{BufMut, BytesMut};
2326
use datafusion::assert_batches_eq;
2427
use datafusion::common::Result;
@@ -320,6 +323,145 @@ async fn test_physical_expr_adapter_with_non_null_defaults() {
320323
assert_batches_eq!(expected, &batches);
321324
}
322325

326+
#[tokio::test]
327+
async fn test_struct_schema_evolution_projection_and_filter() -> Result<()> {
328+
use std::collections::HashMap;
329+
330+
// Physical struct: {id: Int32, name: Utf8}
331+
let physical_struct_fields: Fields = vec![
332+
Arc::new(Field::new("id", DataType::Int32, false)),
333+
Arc::new(Field::new("name", DataType::Utf8, true)),
334+
]
335+
.into();
336+
337+
let struct_array = StructArray::new(
338+
physical_struct_fields.clone(),
339+
vec![
340+
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
341+
Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
342+
],
343+
None,
344+
);
345+
346+
let physical_schema = Arc::new(Schema::new(vec![Field::new(
347+
"s",
348+
DataType::Struct(physical_struct_fields),
349+
true,
350+
)]));
351+
352+
let batch =
353+
RecordBatch::try_new(Arc::clone(&physical_schema), vec![Arc::new(struct_array)])?;
354+
355+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
356+
let store_url = ObjectStoreUrl::parse("memory://").unwrap();
357+
write_parquet(batch, store.clone(), "struct_evolution.parquet").await;
358+
359+
// Logical struct: {id: Int64?, name: Utf8?, extra: Boolean?} + metadata
360+
let logical_struct_fields: Fields = vec![
361+
Arc::new(Field::new("id", DataType::Int64, true)),
362+
Arc::new(Field::new("name", DataType::Utf8, true)),
363+
Arc::new(Field::new("extra", DataType::Boolean, true).with_metadata(
364+
HashMap::from([("nested_meta".to_string(), "1".to_string())]),
365+
)),
366+
]
367+
.into();
368+
369+
let table_schema = Arc::new(Schema::new(vec![
370+
Field::new("s", DataType::Struct(logical_struct_fields), false)
371+
.with_metadata(HashMap::from([("top_meta".to_string(), "1".to_string())])),
372+
]));
373+
374+
let mut cfg = SessionConfig::new()
375+
.with_collect_statistics(false)
376+
.with_parquet_pruning(false)
377+
.with_parquet_page_index_pruning(false);
378+
cfg.options_mut().execution.parquet.pushdown_filters = true;
379+
380+
let ctx = SessionContext::new_with_config(cfg);
381+
ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
382+
383+
let listing_table_config =
384+
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
385+
.infer_options(&ctx.state())
386+
.await
387+
.unwrap()
388+
.with_schema(table_schema.clone())
389+
.with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
390+
391+
let table = ListingTable::try_new(listing_table_config).unwrap();
392+
ctx.register_table("t", Arc::new(table)).unwrap();
393+
394+
let batches = ctx
395+
.sql("SELECT s FROM t")
396+
.await
397+
.unwrap()
398+
.collect()
399+
.await
400+
.unwrap();
401+
assert_eq!(batches.len(), 1);
402+
403+
// Verify top-level metadata propagation
404+
let output_schema = batches[0].schema();
405+
let s_field = output_schema.field_with_name("s").unwrap();
406+
assert_eq!(
407+
s_field.metadata().get("top_meta").map(String::as_str),
408+
Some("1")
409+
);
410+
411+
// Verify nested struct type/field propagation + values
412+
let s_array = batches[0]
413+
.column(0)
414+
.as_any()
415+
.downcast_ref::<StructArray>()
416+
.expect("expected struct array");
417+
418+
let id_array = s_array
419+
.column_by_name("id")
420+
.expect("id column")
421+
.as_any()
422+
.downcast_ref::<Int64Array>()
423+
.expect("id should be cast to Int64");
424+
assert_eq!(id_array.values(), &[1, 2, 3]);
425+
426+
let extra_array = s_array.column_by_name("extra").expect("extra column");
427+
assert_eq!(extra_array.null_count(), 3);
428+
429+
// Verify nested field metadata propagation
430+
let extra_field = match s_field.data_type() {
431+
DataType::Struct(fields) => fields
432+
.iter()
433+
.find(|f| f.name() == "extra")
434+
.expect("extra field"),
435+
other => panic!("expected struct type for s, got {other:?}"),
436+
};
437+
assert_eq!(
438+
extra_field
439+
.metadata()
440+
.get("nested_meta")
441+
.map(String::as_str),
442+
Some("1")
443+
);
444+
445+
// Smoke test: filtering on a missing nested field evaluates correctly
446+
let filtered = ctx
447+
.sql("SELECT get_field(s, 'extra') AS extra FROM t WHERE get_field(s, 'extra') IS NULL")
448+
.await
449+
.unwrap()
450+
.collect()
451+
.await
452+
.unwrap();
453+
assert_eq!(filtered.len(), 1);
454+
assert_eq!(filtered[0].num_rows(), 3);
455+
let extra = filtered[0]
456+
.column(0)
457+
.as_any()
458+
.downcast_ref::<BooleanArray>()
459+
.expect("extra should be a boolean array");
460+
assert_eq!(extra.null_count(), 3);
461+
462+
Ok(())
463+
}
464+
323465
/// Test demonstrating that a single PhysicalExprAdapterFactory instance can be
324466
/// reused across multiple ListingTable instances.
325467
///

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,12 +1191,12 @@ impl ExecutionPlan for StatisticsExec {
11911191
unimplemented!("This plan only serves for testing statistics")
11921192
}
11931193

1194-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1195-
Ok(if partition.is_some() {
1194+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1195+
Ok(Arc::new(if partition.is_some() {
11961196
Statistics::new_unknown(&self.schema)
11971197
} else {
11981198
self.stats.clone()
1199-
})
1199+
}))
12001200
}
12011201

12021202
fn apply_expressions(

0 commit comments

Comments
 (0)