|
65 | 65 | //! The optimizer rule currently checks the plan for exchange-like operators and leave operators |
66 | 66 | //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). |
67 | 67 |
|
68 | | -#[cfg(any( |
69 | | - datafusion_coop = "tokio_fallback", |
70 | | - not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream")) |
71 | | -))] |
| 68 | +use datafusion_common::config::ConfigOptions; |
| 69 | +use datafusion_physical_expr::PhysicalExpr; |
| 70 | +#[cfg(datafusion_coop = "tokio_fallback")] |
72 | 71 | use futures::Future; |
73 | 72 | use std::any::Any; |
74 | 73 | use std::pin::Pin; |
75 | 74 | use std::sync::Arc; |
76 | 75 | use std::task::{Context, Poll}; |
77 | 76 |
|
78 | 77 | use crate::execution_plan::CardinalityEffect::{self, Equal}; |
| 78 | +use crate::filter_pushdown::{ |
| 79 | + ChildPushdownResult, FilterDescription, FilterPushdownPhase, |
| 80 | + FilterPushdownPropagation, |
| 81 | +}; |
79 | 82 | use crate::{ |
80 | 83 | DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, |
81 | 84 | SendableRecordBatchStream, |
@@ -164,6 +167,8 @@ where |
164 | 167 | // after the work has been done and just assume that that succeeded. |
165 | 168 | // The poll result is ignored because we don't want to discard |
166 | 169 | // or buffer the Ready result we got from the inner stream. |
| 170 | + |
| 171 | + use std::future::Future; |
167 | 172 | let consume = tokio::task::coop::consume_budget(); |
168 | 173 | let consume_ref = std::pin::pin!(consume); |
169 | 174 | let _ = consume_ref.poll(cx); |
@@ -291,6 +296,24 @@ impl ExecutionPlan for CooperativeExec { |
291 | 296 | fn cardinality_effect(&self) -> CardinalityEffect { |
292 | 297 | Equal |
293 | 298 | } |
| 299 | + |
| 300 | + fn gather_filters_for_pushdown( |
| 301 | + &self, |
| 302 | + _phase: FilterPushdownPhase, |
| 303 | + parent_filters: Vec<Arc<dyn PhysicalExpr>>, |
| 304 | + _config: &ConfigOptions, |
| 305 | + ) -> Result<FilterDescription> { |
| 306 | + FilterDescription::from_children(parent_filters, &self.children()) |
| 307 | + } |
| 308 | + |
| 309 | + fn handle_child_pushdown_result( |
| 310 | + &self, |
| 311 | + _phase: FilterPushdownPhase, |
| 312 | + child_pushdown_result: ChildPushdownResult, |
| 313 | + _config: &ConfigOptions, |
| 314 | + ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { |
| 315 | + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) |
| 316 | + } |
294 | 317 | } |
295 | 318 |
|
296 | 319 | /// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`]. |
|
0 commit comments