Skip to content

Commit 2f75468

Browse files
authored
Lazily set the producer head at execution time (#478)
This is one PR from the following stack of PRs: - #477 - #463 - #464 - #478 <- you are here - #479 - #486 - #432 --- Introduces the `ProducerHead` type: ```rust pub enum ProducerHead { /// No specific head node is necessary. None, /// The head node should be a [BroadcastExec]. BroadcastExec { output_partitions: usize }, /// The head node should be a [RepartitionExec]. RepartitionExec { partitioning: Partitioning }, } ``` Which is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage. Today, this is a noop because the right head node in stages is ensured statically at planning time, but in follow up PRs, network boundaries can get swamped and reorganized arbitrarily. One example that happens in AQE: 1. A JOIN is planned as a CollectLeft ```js HashJoinExec: mode=CollectLeft CoalescePartitionsExec: [Stage 1] => NetworkBroadcastExec BroadcastExec DistributedLeafExec: unknown size DistributedLeafExec: unknown size ``` 2. While collecting runtime statistics, it happens that `Stage 1` is huge, and during AQE the JOINs are swapped ```js HashJoinExec: mode=CollectLeft DistributedLeafExec: small size CoalescePartitionsExec: [Stage 1] => NetworkBroadcastExec BroadcastExec DistributedLeafExec: big size ``` 3. The `Stage 1` is now on the probe side, so it needs to be rewritten to a `NetworkShuffleExec`, otherwise duplicate data will be returned: ```js HashJoinExec: mode=CollectLeft DistributedLeafExec: small size CoalescePartitionsExec: [Stage 1] => NetworkShuffleExec RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead` DistributedLeafExec: big size ``` Passing a `ProducerHead` at execution time unlocks two things: 1. dynamically set the fanout width accounting for a dynamically scaled upper stage 2. dynamically set the correct operator `BroadcastExec` or `RepartitionExec` based on the network boundary above, which might have changed because of AQE
1 parent d2ec4fd commit 2f75468

20 files changed

Lines changed: 290 additions & 116 deletions

src/distributed_planner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod task_estimator;
1111

1212
pub use distributed_config::DistributedConfig;
1313
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
14+
pub(crate) use network_boundary::{ProducerHead, insert_producer_head};
1415
pub use session_state_builder_ext::SessionStateBuilderExt;
1516
pub(crate) use task_estimator::set_distributed_task_estimator;
1617
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext};
Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use crate::{NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
1+
use crate::{BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
22
use datafusion::common::Result;
3-
use datafusion::physical_plan::ExecutionPlan;
3+
use datafusion::physical_expr::Partitioning;
4+
use datafusion::physical_plan::repartition::RepartitionExec;
5+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
46
use std::sync::Arc;
57

68
/// This trait represents a node that introduces the necessity of a network boundary in the plan.
@@ -15,6 +17,22 @@ pub trait NetworkBoundary: ExecutionPlan {
1517

1618
/// Returns the assigned input [Stage], if any.
1719
fn input_stage(&self) -> &Stage;
20+
21+
/// Defines what head node should the producer stage feeding this [NetworkBoundary]
22+
/// implementation have. This information is used during planning an executing for ensuring
23+
/// the head of a stage has the appropriate shape for consumption.
24+
fn producer_head(&self, consumer_tasks: usize) -> ProducerHead;
25+
}
26+
27+
/// Defines what shape should the head node of a stage have upon getting executed. Depending
28+
/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.
29+
pub enum ProducerHead {
30+
/// No specific head node is necessary.
31+
None,
32+
/// The head node should be a [BroadcastExec].
33+
BroadcastExec { output_partitions: usize },
34+
/// The head node should be a [RepartitionExec].
35+
RepartitionExec { partitioning: Partitioning },
1836
}
1937

2038
/// Extension trait for downcasting dynamic types to [NetworkBoundary].
@@ -41,38 +59,28 @@ impl NetworkBoundaryExt for dyn ExecutionPlan {
4159
}
4260
}
4361

44-
/// Scales up the head node of the input stage of a network boundary. Different network boundaries
45-
/// have different needs for scaling up their input, like for example, scaling up a RepartitionExec
46-
/// during shuffles.
47-
pub(crate) fn network_boundary_scale_input(
62+
/// Ensures the head of the provided plan complies with the passed [ProducerHead] definition. This
63+
/// can be called both during planning and lazily at runtime.
64+
pub(crate) fn insert_producer_head(
4865
input: Arc<dyn ExecutionPlan>,
49-
consumer_partitions: usize,
50-
consumer_task_count: usize,
66+
head: ProducerHead,
5167
) -> Result<Arc<dyn ExecutionPlan>> {
52-
let transformed = NetworkShuffleExec::scale_input(
53-
Arc::clone(&input),
54-
consumer_partitions,
55-
consumer_task_count,
56-
)?;
57-
if transformed.transformed {
58-
return Ok(transformed.data);
59-
}
60-
let transformed = NetworkBroadcastExec::scale_input(
61-
Arc::clone(&input),
62-
consumer_partitions,
63-
consumer_task_count,
64-
)?;
65-
if transformed.transformed {
66-
return Ok(transformed.data);
67-
}
68-
let transformed = NetworkCoalesceExec::scale_input(
69-
Arc::clone(&input),
70-
consumer_partitions,
71-
consumer_task_count,
72-
)?;
73-
if transformed.transformed {
74-
return Ok(transformed.data);
75-
}
76-
77-
Ok(input)
68+
let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() {
69+
Arc::clone(r_exec.input())
70+
} else if let Some(b_exec) = input.downcast_ref::<BroadcastExec>() {
71+
Arc::clone(b_exec.input())
72+
} else {
73+
input
74+
};
75+
let plan = match head {
76+
ProducerHead::None => input,
77+
ProducerHead::BroadcastExec { output_partitions } => {
78+
let partitions = input.output_partitioning().partition_count();
79+
Arc::new(BroadcastExec::new(input, output_partitions / partitions))
80+
}
81+
ProducerHead::RepartitionExec { partitioning } => {
82+
Arc::new(RepartitionExec::try_new(input, partitioning)?)
83+
}
84+
};
85+
Ok(plan)
7886
}

src/distributed_planner/prepare_network_boundaries.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::common::TreeNodeExt;
2-
use crate::distributed_planner::network_boundary::network_boundary_scale_input;
2+
use crate::distributed_planner::network_boundary::insert_producer_head;
33
use crate::stage::LocalStage;
44
use crate::{NetworkBoundaryExt, Stage};
55
use datafusion::common::Result;
@@ -35,11 +35,8 @@ pub(crate) fn prepare_network_boundaries(
3535

3636
// 2) Scale up the head node of the input stage in order to account for the amount of partition
3737
// and consumer count above it.
38-
let plan = network_boundary_scale_input(
39-
Arc::clone(&input_stage.plan),
40-
nb.properties().partitioning.partition_count(),
41-
task_count,
42-
)?;
38+
let plan =
39+
insert_producer_head(Arc::clone(&input_stage.plan), nb.producer_head(task_count))?;
4340

4441
// 3) Make sure the input stage can be uniquely identified with a stage index and query id.
4542
// If there were already some `query_id` and `num` that's fine.

src/execution_plans/benchmarks/shuffle_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl ShuffleFixture {
223223
let shuffle = NetworkShuffleExec {
224224
properties: Arc::new(PlanProperties::new(
225225
EquivalenceProperties::new(Arc::clone(&self.schema)),
226-
Partitioning::UnknownPartitioning(self.bench.partitions),
226+
Partitioning::Hash(vec![Arc::new(Column::new("id", 0))], self.bench.partitions),
227227
EmissionType::Incremental,
228228
Boundedness::Bounded,
229229
)),

src/execution_plans/benchmarks/transport_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl TransportFixture {
273273
let shuffle = NetworkShuffleExec {
274274
properties: Arc::new(PlanProperties::new(
275275
EquivalenceProperties::new(Arc::clone(&self.schema)),
276-
Partitioning::UnknownPartitioning(self.bench.partitions),
276+
Partitioning::RoundRobinBatch(self.bench.partitions),
277277
EmissionType::Incremental,
278278
Boundedness::Bounded,
279279
)),

src/execution_plans/broadcast.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ impl BroadcastExec {
104104
pub fn consumer_task_count(&self) -> usize {
105105
self.consumer_task_count
106106
}
107+
108+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
109+
&self.input
110+
}
107111
}
108112

109113
impl DisplayAs for BroadcastExec {

src/execution_plans/network_broadcast.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::common::require_one_child;
2-
use crate::distributed_planner::NetworkBoundary;
2+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
33
use crate::stage::{LocalStage, Stage};
44
use crate::worker::WorkerConnectionPool;
55
use crate::{BroadcastExec, DistributedTaskContext};
6-
use datafusion::common::tree_node::Transformed;
76
use datafusion::common::{Result, not_impl_err, plan_err};
87
use datafusion::error::DataFusionError;
98
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -123,21 +122,6 @@ pub struct NetworkBroadcastExec {
123122
}
124123

125124
impl NetworkBroadcastExec {
126-
pub(crate) fn scale_input(
127-
plan: Arc<dyn ExecutionPlan>,
128-
_consumer_partitions: usize,
129-
consumer_task_count: usize,
130-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
131-
let Some(broadcast) = plan.downcast_ref::<BroadcastExec>() else {
132-
return Ok(Transformed::no(plan));
133-
};
134-
135-
Ok(Transformed::yes(Arc::new(BroadcastExec::new(
136-
require_one_child(broadcast.children())?,
137-
consumer_task_count,
138-
))))
139-
}
140-
141125
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
142126
let input_partition_count = input_properties.partitioning.partition_count();
143127
let properties = Arc::new(
@@ -186,6 +170,13 @@ impl NetworkBoundary for NetworkBroadcastExec {
186170
fn input_stage(&self) -> &Stage {
187171
&self.input_stage
188172
}
173+
174+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
175+
let partition_count = self.properties.output_partitioning().partition_count();
176+
ProducerHead::BroadcastExec {
177+
output_partitions: partition_count * consumer_task_count,
178+
}
179+
}
189180
}
190181

191182
impl DisplayAs for NetworkBroadcastExec {
@@ -247,14 +238,16 @@ impl ExecutionPlan for NetworkBroadcastExec {
247238
};
248239

249240
let task_context = DistributedTaskContext::from_ctx(&context);
250-
let off = self.properties.partitioning.partition_count() * task_context.task_index;
241+
let out_partitions = self.properties.partitioning.partition_count();
242+
let off = out_partitions * task_context.task_index;
251243
let mut streams = Vec::with_capacity(self.input_stage.task_count());
252244

253245
for input_task_index in 0..self.input_stage.task_count() {
254246
let worker_connection = self.worker_connections.get_or_init_worker_connection(
255247
remote_stage,
256248
off..(off + self.properties.partitioning.partition_count()),
257249
input_task_index,
250+
self.producer_head(task_context.task_count),
258251
&context,
259252
)?;
260253

src/execution_plans/network_coalesce.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::DistributedTaskContext;
22
use crate::common::require_one_child;
3-
use crate::distributed_planner::NetworkBoundary;
3+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
44
use crate::execution_plans::common::scale_partitioning_props;
55
use crate::stage::{LocalStage, Stage};
66
use crate::worker::WorkerConnectionPool;
7-
use datafusion::common::tree_node::Transformed;
87
use datafusion::common::{exec_err, not_impl_err, plan_err};
98
use datafusion::error::Result;
109
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -81,16 +80,6 @@ pub struct NetworkCoalesceExec {
8180
}
8281

8382
impl NetworkCoalesceExec {
84-
/// Does nothing, but it's here for explicitly stating that this network boundary does not
85-
/// need to mutate the input plan in other to account for more consumer tasks.
86-
pub(crate) fn scale_input(
87-
plan: Arc<dyn ExecutionPlan>,
88-
_consumer_partitions: usize,
89-
_consumer_task_count: usize,
90-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
91-
Ok(Transformed::no(plan))
92-
}
93-
9483
pub(crate) fn from_stage(
9584
input_stage: Stage,
9685
input_properties: Arc<PlanProperties>,
@@ -187,6 +176,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
187176
self_clone.input_stage = input_stage;
188177
Ok(Arc::new(self_clone))
189178
}
179+
180+
fn producer_head(&self, _consumer_task_count: usize) -> ProducerHead {
181+
ProducerHead::None
182+
}
190183
}
191184

192185
impl DisplayAs for NetworkCoalesceExec {
@@ -299,6 +292,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
299292
remote_stage,
300293
0..partitions_per_task,
301294
target_task,
295+
self.producer_head(task_context.task_count),
302296
&context,
303297
)?;
304298

src/execution_plans/network_shuffle.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::common::require_one_child;
2+
use crate::distributed_planner::ProducerHead;
23
use crate::execution_plans::common::scale_partitioning;
34
use crate::stage::{LocalStage, Stage};
45
use crate::worker::WorkerConnectionPool;
56
use crate::{DistributedTaskContext, NetworkBoundary};
6-
use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
77
use datafusion::common::{Result, not_impl_err, plan_err};
88
use datafusion::error::DataFusionError;
99
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -105,26 +105,6 @@ pub struct NetworkShuffleExec {
105105
}
106106

107107
impl NetworkShuffleExec {
108-
pub(crate) fn scale_input(
109-
plan: Arc<dyn ExecutionPlan>,
110-
consumer_partitions: usize,
111-
consumer_task_count: usize,
112-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
113-
let Some(repartition_exec) = plan.downcast_ref::<RepartitionExec>() else {
114-
return Ok(Transformed::no(plan));
115-
};
116-
117-
let child = require_one_child(repartition_exec.children())?;
118-
let partitioning = scale_partitioning(repartition_exec.partitioning(), |_| {
119-
consumer_partitions * consumer_task_count
120-
});
121-
122-
// Scale the input RepartitionExec to account for all the tasks to which it will
123-
// need to fan data out.
124-
let scaled = Arc::new(RepartitionExec::try_new(child, partitioning)?);
125-
Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
126-
}
127-
128108
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
129109
Self {
130110
properties: input_properties,
@@ -170,6 +150,14 @@ impl NetworkBoundary for NetworkShuffleExec {
170150
self_clone.input_stage = input_stage;
171151
Ok(Arc::new(self_clone))
172152
}
153+
154+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
155+
ProducerHead::RepartitionExec {
156+
partitioning: scale_partitioning(&self.properties.partitioning, |prev| {
157+
prev * consumer_task_count
158+
}),
159+
}
160+
}
173161
}
174162

175163
impl DisplayAs for NetworkShuffleExec {
@@ -233,6 +221,7 @@ impl ExecutionPlan for NetworkShuffleExec {
233221
remote_stage,
234222
off..(off + self.properties.partitioning.partition_count()),
235223
input_task_index,
224+
self.producer_head(task_context.task_count),
236225
&context,
237226
)?;
238227

src/observability/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl ObservabilityService for ObservabilityServiceImpl {
9696
let total_partitions = task_data.total_partitions() as u64;
9797
let remaining = task_data.num_partitions_remaining() as u64;
9898
let completed_partitions = total_partitions.saturating_sub(remaining);
99-
let output_rows = output_rows_from_plan(&task_data.plan);
99+
let output_rows = output_rows_from_plan(&task_data.base_plan);
100100

101101
tasks.push(TaskProgress {
102102
task_key: Some((*internal_key).clone()),

0 commit comments

Comments
 (0)