11use crate :: common:: { require_one_child, serialize_uuid} ;
22use crate :: coordinator:: metrics_store:: MetricsStore ;
33use crate :: coordinator:: prepare_static_plan:: prepare_static_plan;
4+ use crate :: coordinator:: query_coordinator:: QueryCoordinator ;
45use crate :: distributed_planner:: NetworkBoundaryExt ;
56use crate :: worker:: generated:: worker:: TaskKey ;
67use datafusion:: common:: internal_datafusion_err;
7- use datafusion:: common:: runtime:: JoinSet ;
88use datafusion:: common:: tree_node:: { TreeNode , TreeNodeRecursion } ;
99use datafusion:: common:: { Result , exec_err} ;
1010use datafusion:: execution:: { SendableRecordBatchStream , TaskContext } ;
@@ -14,8 +14,7 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
1414use datafusion:: physical_plan:: { DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties } ;
1515use futures:: StreamExt ;
1616use std:: fmt:: Formatter ;
17- use std:: sync:: Arc ;
18- use std:: sync:: Mutex ;
17+ use std:: sync:: { Arc , Mutex } ;
1918
2019/// [ExecutionPlan] that executes the inner plan in distributed mode.
2120/// Before executing it, two modifications are lazily performed on the plan:
@@ -26,22 +25,39 @@ use std::sync::Mutex;
2625/// over the wire.
2726#[ derive( Debug ) ]
2827pub struct DistributedExec {
29- plan : Arc < dyn ExecutionPlan > ,
30- prepared_plan : Arc < Mutex < Option < Arc < dyn ExecutionPlan > > > > ,
28+ /// Initial [ExecutionPlan] present before execution.
29+ /// - If the plan was distributed statically, this will be the final distributed plan with all
30+ /// the appropriate network boundaries in it.
31+ /// - If the plan is going to be distributed dynamically during execution, this is the initial
32+ /// non-distributed plan.
33+ base_plan : Arc < dyn ExecutionPlan > ,
34+ /// Resulting [ExecutionPlan] after execution ready for visualization purposes.
35+ /// - If the plan was distributed statically, this is equal to the base plan.
36+ /// - If the plan is going to be distributed dynamically during execution, this is the resulting
37+ /// plan re-calculated based on runtime statistics.
38+ plan_for_viz : Arc < Mutex < Option < Arc < dyn ExecutionPlan > > > > ,
39+ /// The head stage meant to be executed locally on [DistributedExec::execute].
40+ head_stage : Arc < Mutex < Option < Arc < dyn ExecutionPlan > > > > ,
41+ /// DataFusion metrics.
3142 metrics : ExecutionPlanMetricsSet ,
43+ /// Storage where metrics collected from workers at runtime will place their results as they
44+ /// finish their respective remote tasks.
3245 pub ( crate ) metrics_store : Option < Arc < MetricsStore > > ,
3346}
3447
3548pub ( super ) struct PreparedPlan {
49+ /// The head stage meant to be executed locally by the coordinator.
3650 pub ( super ) head_stage : Arc < dyn ExecutionPlan > ,
37- pub ( super ) join_set : JoinSet < Result < ( ) > > ,
51+ /// A final representation of the plan for visualization purposes.
52+ pub ( super ) plan_for_viz : Arc < dyn ExecutionPlan > ,
3853}
3954
4055impl DistributedExec {
41- pub fn new ( plan : Arc < dyn ExecutionPlan > ) -> Self {
56+ pub fn new ( base_plan : Arc < dyn ExecutionPlan > ) -> Self {
4257 Self {
43- plan,
44- prepared_plan : Arc :: new ( Mutex :: new ( None ) ) ,
58+ base_plan,
59+ plan_for_viz : Arc :: new ( Mutex :: new ( None ) ) ,
60+ head_stage : Arc :: new ( Mutex :: new ( None ) ) ,
4561 metrics : ExecutionPlanMetricsSet :: new ( ) ,
4662 metrics_store : None ,
4763 }
@@ -68,7 +84,10 @@ impl DistributedExec {
6884 let Some ( task_metrics) = & self . metrics_store else {
6985 return ;
7086 } ;
71- let _ = self . plan . apply ( |plan| {
87+ let Some ( plan) = self . plan_for_viz . lock ( ) . unwrap ( ) . as_ref ( ) . cloned ( ) else {
88+ return ;
89+ } ;
90+ let _ = plan. apply ( |plan| {
7291 if let Some ( boundary) = plan. as_network_boundary ( ) {
7392 let stage = boundary. input_stage ( ) ;
7493 for i in 0 ..stage. task_count ( ) {
@@ -93,15 +112,27 @@ impl DistributedExec {
93112 /// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
94113 /// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
95114 /// called.
96- pub ( crate ) fn prepared_plan ( & self ) -> Result < Arc < dyn ExecutionPlan > > {
97- self . prepared_plan
115+ pub ( crate ) fn plan_for_viz ( & self ) -> Result < Arc < dyn ExecutionPlan > > {
116+ self . plan_for_viz
98117 . lock ( )
99118 . map_err ( |e| internal_datafusion_err ! ( "Failed to lock prepared plan: {}" , e) ) ?
100119 . clone ( )
101120 . ok_or_else ( || {
102121 internal_datafusion_err ! ( "No prepared plan found. Was execute() called?" )
103122 } )
104123 }
124+
125+ /// Returns the head stage that was actually executed. Unlike [`Self::plan_for_viz`] (which is
126+ /// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor
127+ /// `Arc`s), this returns the original `Arc` instances whose metrics were populated during
128+ /// execution.
129+ pub ( crate ) fn head_stage ( & self ) -> Result < Arc < dyn ExecutionPlan > > {
130+ self . head_stage
131+ . lock ( )
132+ . map_err ( |e| internal_datafusion_err ! ( "Failed to lock head stage: {}" , e) ) ?
133+ . clone ( )
134+ . ok_or_else ( || internal_datafusion_err ! ( "No head stage found. Was execute() called?" ) )
135+ }
105136}
106137
107138impl DisplayAs for DistributedExec {
@@ -116,20 +147,21 @@ impl ExecutionPlan for DistributedExec {
116147 }
117148
118149 fn properties ( & self ) -> & Arc < PlanProperties > {
119- self . plan . properties ( )
150+ self . base_plan . properties ( )
120151 }
121152
122153 fn children ( & self ) -> Vec < & Arc < dyn ExecutionPlan > > {
123- vec ! [ & self . plan ]
154+ vec ! [ & self . base_plan ]
124155 }
125156
126157 fn with_new_children (
127158 self : Arc < Self > ,
128159 children : Vec < Arc < dyn ExecutionPlan > > ,
129160 ) -> Result < Arc < dyn ExecutionPlan > > {
130161 Ok ( Arc :: new ( DistributedExec {
131- plan : require_one_child ( & children) ?,
132- prepared_plan : self . prepared_plan . clone ( ) ,
162+ base_plan : require_one_child ( & children) ?,
163+ plan_for_viz : Arc :: new ( Mutex :: new ( None ) ) ,
164+ head_stage : Arc :: new ( Mutex :: new ( None ) ) ,
133165 metrics : self . metrics . clone ( ) ,
134166 metrics_store : self . metrics_store . clone ( ) ,
135167 } ) )
@@ -150,36 +182,43 @@ impl ExecutionPlan for DistributedExec {
150182 ) ;
151183 }
152184
153- let PreparedPlan {
154- head_stage,
155- join_set,
156- } = prepare_static_plan ( & self . plan , & self . metrics , & self . metrics_store , & context) ?;
157- {
158- let mut guard = self
159- . prepared_plan
160- . lock ( )
161- . map_err ( |e| internal_datafusion_err ! ( "Failed to lock prepared plan: {e}" ) ) ?;
162- * guard = Some ( head_stage. clone ( ) ) ;
163- }
185+ let base_plan = Arc :: clone ( & self . base_plan ) ;
186+ let plan_for_viz = Arc :: clone ( & self . plan_for_viz ) ;
187+ let head_stage = Arc :: clone ( & self . head_stage ) ;
188+
189+ let query_coordinator = QueryCoordinator :: new (
190+ Arc :: clone ( & context) ,
191+ & self . metrics ,
192+ self . metrics_store . clone ( ) ,
193+ ) ;
194+
164195 let mut builder = RecordBatchReceiverStreamBuilder :: new ( self . schema ( ) , 1 ) ;
165196 let tx = builder. tx ( ) ;
166- // Spawn the task that pulls data from child...
197+
167198 builder. spawn ( async move {
168- let mut stream = head_stage. execute ( partition, context) ?;
199+ let _guard = query_coordinator. end_query_guard ( ) ;
200+
201+ let result = prepare_static_plan ( & query_coordinator, & base_plan) ?;
202+
203+ plan_for_viz
204+ . lock ( )
205+ . expect ( "poisoned lock" )
206+ . replace ( result. plan_for_viz ) ;
207+ head_stage
208+ . lock ( )
209+ . expect ( "poisoned lock" )
210+ . replace ( Arc :: clone ( & result. head_stage ) ) ;
211+ let mut stream = result. head_stage . execute ( partition, context) ?;
169212 while let Some ( msg) = stream. next ( ) . await {
170213 if tx. send ( msg) . await . is_err ( ) {
171214 break ; // channel closed
172215 }
173216 }
217+ drop ( tx) ;
218+ query_coordinator. drain_pending_tasks ( ) . await ?;
174219 Ok ( ( ) )
175220 } ) ;
176- // ...in parallel to the one that feeds the plan to workers.
177- builder. spawn ( async move {
178- for res in join_set. join_all ( ) . await {
179- res?;
180- }
181- Ok ( ( ) )
182- } ) ;
221+
183222 Ok ( builder. build ( ) )
184223 }
185224
0 commit comments