Skip to content

Commit 84a22ea

Browse files
xudong963alamb
andauthored
Wrap Arc to Statistics for partition_statistics API (#20570)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Part of #20184 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 44dfa7b commit 84a22ea

File tree

38 files changed

+283
-208
lines changed

38 files changed

+283
-208
lines changed

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec {
727727
Some(self.metrics.clone_inner())
728728
}
729729

730-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
731-
let mut stats = self.input.partition_statistics(partition)?;
730+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
731+
let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
732732
let ratio = self.upper_bound - self.lower_bound;
733733

734734
// Scale statistics by sampling ratio (inexact due to randomness)
@@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec {
741741
.map(|n| (n as f64 * ratio) as usize)
742742
.to_inexact();
743743

744-
Ok(stats)
744+
Ok(Arc::new(stats))
745745
}
746746

747747
fn apply_expressions(

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,12 @@ impl ExecutionPlan for CustomExecutionPlan {
184184
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
185185
}
186186

187-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
187+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
188188
if partition.is_some() {
189-
return Ok(Statistics::new_unknown(&self.schema()));
189+
return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
190190
}
191191
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
192-
Ok(Statistics {
192+
Ok(Arc::new(Statistics {
193193
num_rows: Precision::Exact(batch.num_rows()),
194194
total_byte_size: Precision::Absent,
195195
column_statistics: self
@@ -208,7 +208,7 @@ impl ExecutionPlan for CustomExecutionPlan {
208208
..Default::default()
209209
})
210210
.collect(),
211-
})
211+
}))
212212
}
213213

214214
fn apply_expressions(

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ impl ExecutionPlan for StatisticsValidation {
182182
unimplemented!("This plan only serves for testing statistics")
183183
}
184184

185-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
185+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
186186
if partition.is_some() {
187-
Ok(Statistics::new_unknown(&self.schema))
187+
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
188188
} else {
189-
Ok(self.stats.clone())
189+
Ok(Arc::new(self.stats.clone()))
190190
}
191191
}
192192

@@ -255,7 +255,7 @@ async fn sql_basic() -> Result<()> {
255255
let physical_plan = df.create_physical_plan().await.unwrap();
256256

257257
// the statistics should be those of the source
258-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
258+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
259259

260260
Ok(())
261261
}
@@ -295,7 +295,7 @@ async fn sql_limit() -> Result<()> {
295295
.collect(),
296296
total_byte_size: Precision::Absent
297297
},
298-
physical_plan.partition_statistics(None)?
298+
*physical_plan.partition_statistics(None)?
299299
);
300300

301301
let df = ctx
@@ -304,7 +304,7 @@ async fn sql_limit() -> Result<()> {
304304
.unwrap();
305305
let physical_plan = df.create_physical_plan().await.unwrap();
306306
// when the limit is larger than the original number of lines, statistics remain unchanged
307-
assert_eq!(stats, physical_plan.partition_statistics(None)?);
307+
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
308308

309309
Ok(())
310310
}
@@ -324,7 +324,7 @@ async fn sql_window() -> Result<()> {
324324
let result = physical_plan.partition_statistics(None)?;
325325

326326
assert_eq!(stats.num_rows, result.num_rows);
327-
let col_stats = result.column_statistics;
327+
let col_stats = &result.column_statistics;
328328
assert_eq!(2, col_stats.len());
329329
assert_eq!(stats.column_statistics[1], col_stats[0]);
330330

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,12 +1191,12 @@ impl ExecutionPlan for StatisticsExec {
11911191
unimplemented!("This plan only serves for testing statistics")
11921192
}
11931193

1194-
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1195-
Ok(if partition.is_some() {
1194+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1195+
Ok(Arc::new(if partition.is_some() {
11961196
Statistics::new_unknown(&self.schema)
11971197
} else {
11981198
self.stats.clone()
1199-
})
1199+
}))
12001200
}
12011201

12021202
fn apply_expressions(

0 commit comments

Comments
 (0)