Skip to content

Commit 230014c

Browse files
Merge branch 'main' into improve_parse_duration_api
2 parents 669fb9c + fd97799 commit 230014c

File tree

65 files changed

+2277
-485
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2277
-485
lines changed

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec {
727727
Some(self.metrics.clone_inner())
728728
}
729729

730-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
731-
let mut stats = self.input.partition_statistics(partition)?;
730+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
731+
let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
732732
let ratio = self.upper_bound - self.lower_bound;
733733

734734
// Scale statistics by sampling ratio (inexact due to randomness)
@@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec {
741741
.map(|n| (n as f64 * ratio) as usize)
742742
.to_inexact();
743743

744-
Ok(stats)
744+
Ok(Arc::new(stats))
745745
}
746746

747747
fn apply_expressions(

datafusion/core/src/execution/context/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2226,7 +2226,9 @@ mod tests {
22262226
use crate::test;
22272227
use crate::test_util::{plan_and_collect, populate_csv_partitions};
22282228
use arrow::datatypes::{DataType, TimeUnit};
2229+
use arrow_schema::FieldRef;
22292230
use datafusion_common::DataFusionError;
2231+
use datafusion_common::datatype::DataTypeExt;
22302232
use std::error::Error;
22312233
use std::path::PathBuf;
22322234

@@ -2743,7 +2745,7 @@ mod tests {
27432745
struct MyTypePlanner {}
27442746

27452747
impl TypePlanner for MyTypePlanner {
2746-
fn plan_type(&self, sql_type: &ast::DataType) -> Result<Option<DataType>> {
2748+
fn plan_type_field(&self, sql_type: &ast::DataType) -> Result<Option<FieldRef>> {
27472749
match sql_type {
27482750
ast::DataType::Datetime(precision) => {
27492751
let precision = match precision {
@@ -2753,7 +2755,9 @@ mod tests {
27532755
None | Some(9) => TimeUnit::Nanosecond,
27542756
_ => unreachable!(),
27552757
};
2756-
Ok(Some(DataType::Timestamp(precision, None)))
2758+
Ok(Some(
2759+
DataType::Timestamp(precision, None).into_nullable_field_ref(),
2760+
))
27572761
}
27582762
_ => Ok(None),
27592763
}

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,12 @@ impl ExecutionPlan for CustomExecutionPlan {
184184
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
185185
}
186186

187-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
187+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
188188
if partition.is_some() {
189-
return Ok(Statistics::new_unknown(&self.schema()));
189+
return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
190190
}
191191
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
192-
Ok(Statistics {
192+
Ok(Arc::new(Statistics {
193193
num_rows: Precision::Exact(batch.num_rows()),
194194
total_byte_size: Precision::Absent,
195195
column_statistics: self
@@ -208,7 +208,7 @@ impl ExecutionPlan for CustomExecutionPlan {
208208
..Default::default()
209209
})
210210
.collect(),
211-
})
211+
}))
212212
}
213213

214214
fn apply_expressions(

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ impl ExecutionPlan for StatisticsValidation {
182182
unimplemented!("This plan only serves for testing statistics")
183183
}
184184

185-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
185+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
186186
if partition.is_some() {
187-
Ok(Statistics::new_unknown(&self.schema))
187+
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
188188
} else {
189-
Ok(self.stats.clone())
189+
Ok(Arc::new(self.stats.clone()))
190190
}
191191
}
192192

@@ -255,7 +255,7 @@ async fn sql_basic() -> Result<()> {
255255
let physical_plan = df.create_physical_plan().await.unwrap();
256256

257257
// the statistics should be those of the source
258-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
258+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
259259

260260
Ok(())
261261
}
@@ -295,7 +295,7 @@ async fn sql_limit() -> Result<()> {
295295
.collect(),
296296
total_byte_size: Precision::Absent
297297
},
298-
physical_plan.partition_statistics(None)?
298+
*physical_plan.partition_statistics(None)?
299299
);
300300

301301
let df = ctx
@@ -304,7 +304,7 @@ async fn sql_limit() -> Result<()> {
304304
.unwrap();
305305
let physical_plan = df.create_physical_plan().await.unwrap();
306306
// when the limit is larger than the original number of lines, statistics remain unchanged
307-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
307+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
308308

309309
Ok(())
310310
}
@@ -324,7 +324,7 @@ async fn sql_window() -> Result<()> {
324324
let result = physical_plan.partition_statistics(None)?;
325325

326326
assert_eq!(stats.num_rows, result.num_rows);
327-
let col_stats = result.column_statistics;
327+
let col_stats = &result.column_statistics;
328328
assert_eq!(2, col_stats.len());
329329
assert_eq!(stats.column_statistics[1], col_stats[0]);
330330

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,12 +1191,12 @@ impl ExecutionPlan for StatisticsExec {
11911191
unimplemented!("This plan only serves for testing statistics")
11921192
}
11931193

1194-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1195-
Ok(if partition.is_some() {
1194+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1195+
Ok(Arc::new(if partition.is_some() {
11961196
Statistics::new_unknown(&self.schema)
11971197
} else {
11981198
self.stats.clone()
1199-
})
1199+
}))
12001200
}
12011201

12021202
fn apply_expressions(

0 commit comments

Comments
 (0)