1515// specific language governing permissions and limitations
1616// under the License.
1717
18- //! The `InsertYieldExec` optimizer rule inspects the physical plan to look for
19- //! tight-looping operators and inserts explicit yielding mechanisms (whether
20- //! as a separate operator, or via a yielding variant) at leaf nodes to make
21- //! the plan cancellation friendly.
18+ //! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf
19+ //! nodes corresponding to tight-looping operators. It first attempts to replace
20+ //! each leaf with a cooperative-yielding variant via `with_cooperative_yields`,
21+ //! and only if no built-in variant exists does it wrap the node in a
22+ //! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan
23+ //! remains cancellation-friendly.
2224
2325use std:: fmt:: { Debug , Formatter } ;
2426use std:: sync:: Arc ;
@@ -32,9 +34,10 @@ use datafusion_physical_plan::yield_stream::YieldStreamExec;
3234use datafusion_physical_plan:: ExecutionPlan ;
3335
3436/// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in
35- /// the plan, and replaces it with a variant that cooperatively yields
36- /// either using the its yielding variant given by `with_cooperative_yields`,
37- /// or, if none exists, by inserting a [`YieldStreamExec`] operator as a parent.
37+ /// the plan and replaces it with a variant that yields cooperatively if supported.
38+ /// If the node does not provide a built-in yielding variant via
39+ /// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to
40+ /// enforce a configured yield frequency.
3841pub struct InsertYieldExec { }
3942
4043impl InsertYieldExec {
@@ -73,10 +76,11 @@ impl PhysicalOptimizerRule for InsertYieldExec {
7376 // Not a leaf, keep recursing down.
7477 return Ok ( Transformed :: no ( plan) ) ;
7578 }
79+ // For leaf nodes, try to get a built-in cooperative-yielding variant.
7680 let new_plan = Arc :: clone ( & plan)
7781 . with_cooperative_yields ( )
7882 . unwrap_or_else ( || {
79- // Otherwise , insert a `YieldStreamExec` to enforce periodic yielding .
83+ // Only if no built-in variant exists , insert a `YieldStreamExec`.
8084 Arc :: new ( YieldStreamExec :: new ( plan, yield_period) )
8185 } ) ;
8286 Ok ( Transformed :: new ( new_plan, true , TreeNodeRecursion :: Jump ) )
@@ -92,3 +96,27 @@ impl PhysicalOptimizerRule for InsertYieldExec {
9296 true
9397 }
9498}
99+
100+ #[ cfg( test) ]
101+ mod tests {
102+ use super :: * ;
103+ use datafusion_common:: config:: ConfigOptions ;
104+ use datafusion_physical_plan:: { displayable, test:: scan_partitioned} ;
105+ use insta:: assert_snapshot;
106+
107+ #[ tokio:: test]
108+ async fn test_yield_stream_exec_for_custom_exec ( ) {
109+ let test_custom_exec = scan_partitioned ( 1 ) ;
110+ let config = ConfigOptions :: new ( ) ;
111+ let optimized = InsertYieldExec :: new ( )
112+ . optimize ( test_custom_exec, & config)
113+ . unwrap ( ) ;
114+
115+ let display = displayable ( optimized. as_ref ( ) ) . indent ( true ) . to_string ( ) ;
116+ // Use insta snapshot to ensure full plan structure
117+ assert_snapshot ! ( display, @r###"
118+ YieldStreamExec frequency=64
119+ DataSourceExec: partitions=1, partition_sizes=[1]
120+ "### ) ;
121+ }
122+ }
0 commit comments