Skip to content

Commit fe92488

Browse files
committed
Add ProducerHead for ensuring that the appropriate node is placed at the head of stages
1 parent d33afcc commit fe92488

20 files changed

Lines changed: 290 additions & 116 deletions

src/distributed_planner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod task_estimator;
1111

1212
pub use distributed_config::DistributedConfig;
1313
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
14+
pub(crate) use network_boundary::{ProducerHead, insert_producer_head};
1415
pub use session_state_builder_ext::SessionStateBuilderExt;
1516
pub(crate) use task_estimator::set_distributed_task_estimator;
1617
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext};
Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use crate::{NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
1+
use crate::{BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
22
use datafusion::common::Result;
3-
use datafusion::physical_plan::ExecutionPlan;
3+
use datafusion::physical_expr::Partitioning;
4+
use datafusion::physical_plan::repartition::RepartitionExec;
5+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
46
use std::sync::Arc;
57

68
/// This trait represents a node that introduces the necessity of a network boundary in the plan.
@@ -15,6 +17,22 @@ pub trait NetworkBoundary: ExecutionPlan {
1517

1618
/// Returns the assigned input [Stage], if any.
1719
fn input_stage(&self) -> &Stage;
20+
21+
/// Defines what head node should the producer stage feeding this [NetworkBoundary]
22+
/// implementation have. This information is used during planning an executing for ensuring
23+
/// the head of a stage has the appropriate shape for consumption.
24+
fn producer_head(&self, consumer_tasks: usize) -> ProducerHead;
25+
}
26+
27+
/// Defines what shape should the head node of a stage have upon getting executed. Depending
28+
/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.
29+
pub enum ProducerHead {
30+
/// No specific head node is necessary.
31+
None,
32+
/// The head node should be a [BroadcastExec].
33+
BroadcastExec { output_partitions: usize },
34+
/// The head node should be a [RepartitionExec].
35+
RepartitionExec { partitioning: Partitioning },
1836
}
1937

2038
/// Extension trait for downcasting dynamic types to [NetworkBoundary].
@@ -41,38 +59,28 @@ impl NetworkBoundaryExt for dyn ExecutionPlan {
4159
}
4260
}
4361

44-
/// Scales up the head node of the input stage of a network boundary. Different network boundaries
45-
/// have different needs for scaling up their input, like for example, scaling up a RepartitionExec
46-
/// during shuffles.
47-
pub(crate) fn network_boundary_scale_input(
62+
/// Ensures the head of the provided plan complies with the passed [ProducerHead] definition. This
63+
/// can be called both during planning and lazily at runtime.
64+
pub(crate) fn insert_producer_head(
4865
input: Arc<dyn ExecutionPlan>,
49-
consumer_partitions: usize,
50-
consumer_task_count: usize,
66+
head: ProducerHead,
5167
) -> Result<Arc<dyn ExecutionPlan>> {
52-
let transformed = NetworkShuffleExec::scale_input(
53-
Arc::clone(&input),
54-
consumer_partitions,
55-
consumer_task_count,
56-
)?;
57-
if transformed.transformed {
58-
return Ok(transformed.data);
59-
}
60-
let transformed = NetworkBroadcastExec::scale_input(
61-
Arc::clone(&input),
62-
consumer_partitions,
63-
consumer_task_count,
64-
)?;
65-
if transformed.transformed {
66-
return Ok(transformed.data);
67-
}
68-
let transformed = NetworkCoalesceExec::scale_input(
69-
Arc::clone(&input),
70-
consumer_partitions,
71-
consumer_task_count,
72-
)?;
73-
if transformed.transformed {
74-
return Ok(transformed.data);
75-
}
76-
77-
Ok(input)
68+
let input = if let Some(r_exec) = input.as_any().downcast_ref::<RepartitionExec>() {
69+
Arc::clone(r_exec.input())
70+
} else if let Some(b_exec) = input.as_any().downcast_ref::<BroadcastExec>() {
71+
Arc::clone(b_exec.input())
72+
} else {
73+
input
74+
};
75+
let plan = match head {
76+
ProducerHead::None => input,
77+
ProducerHead::BroadcastExec { output_partitions } => {
78+
let partitions = input.output_partitioning().partition_count();
79+
Arc::new(BroadcastExec::new(input, output_partitions / partitions))
80+
}
81+
ProducerHead::RepartitionExec { partitioning } => {
82+
Arc::new(RepartitionExec::try_new(input, partitioning)?)
83+
}
84+
};
85+
Ok(plan)
7886
}

src/distributed_planner/prepare_network_boundaries.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::common::TreeNodeExt;
2-
use crate::distributed_planner::network_boundary::network_boundary_scale_input;
2+
use crate::distributed_planner::network_boundary::insert_producer_head;
33
use crate::stage::LocalStage;
44
use crate::{NetworkBoundaryExt, Stage};
55
use datafusion::common::Result;
@@ -35,11 +35,8 @@ pub(crate) fn prepare_network_boundaries(
3535

3636
// 2) Scale up the head node of the input stage in order to account for the amount of partition
3737
// and consumer count above it.
38-
let plan = network_boundary_scale_input(
39-
Arc::clone(&input_stage.plan),
40-
nb.properties().partitioning.partition_count(),
41-
task_count,
42-
)?;
38+
let plan =
39+
insert_producer_head(Arc::clone(&input_stage.plan), nb.producer_head(task_count))?;
4340

4441
// 3) Make sure the input stage can be uniquely identified with a stage index and query id.
4542
// If there were already some `query_id` and `num` that's fine.

src/execution_plans/benchmarks/shuffle_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl ShuffleFixture {
223223
let shuffle = NetworkShuffleExec {
224224
properties: Arc::new(PlanProperties::new(
225225
EquivalenceProperties::new(Arc::clone(&self.schema)),
226-
Partitioning::UnknownPartitioning(self.bench.partitions),
226+
Partitioning::Hash(vec![Arc::new(Column::new("id", 0))], self.bench.partitions),
227227
EmissionType::Incremental,
228228
Boundedness::Bounded,
229229
)),

src/execution_plans/benchmarks/transport_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl TransportFixture {
273273
let shuffle = NetworkShuffleExec {
274274
properties: Arc::new(PlanProperties::new(
275275
EquivalenceProperties::new(Arc::clone(&self.schema)),
276-
Partitioning::UnknownPartitioning(self.bench.partitions),
276+
Partitioning::RoundRobinBatch(self.bench.partitions),
277277
EmissionType::Incremental,
278278
Boundedness::Bounded,
279279
)),

src/execution_plans/broadcast.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ impl BroadcastExec {
105105
pub fn consumer_task_count(&self) -> usize {
106106
self.consumer_task_count
107107
}
108+
109+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
110+
&self.input
111+
}
108112
}
109113

110114
impl DisplayAs for BroadcastExec {

src/execution_plans/network_broadcast.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::common::require_one_child;
2-
use crate::distributed_planner::NetworkBoundary;
2+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
33
use crate::stage::{LocalStage, Stage};
44
use crate::worker::WorkerConnectionPool;
55
use crate::{BroadcastExec, DistributedTaskContext};
6-
use datafusion::common::tree_node::Transformed;
76
use datafusion::common::{Result, not_impl_err, plan_err};
87
use datafusion::error::DataFusionError;
98
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -124,21 +123,6 @@ pub struct NetworkBroadcastExec {
124123
}
125124

126125
impl NetworkBroadcastExec {
127-
pub(crate) fn scale_input(
128-
plan: Arc<dyn ExecutionPlan>,
129-
_consumer_partitions: usize,
130-
consumer_task_count: usize,
131-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
132-
let Some(broadcast) = plan.as_any().downcast_ref::<BroadcastExec>() else {
133-
return Ok(Transformed::no(plan));
134-
};
135-
136-
Ok(Transformed::yes(Arc::new(BroadcastExec::new(
137-
require_one_child(broadcast.children())?,
138-
consumer_task_count,
139-
))))
140-
}
141-
142126
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
143127
let input_partition_count = input_properties.partitioning.partition_count();
144128
let properties = Arc::new(
@@ -187,6 +171,13 @@ impl NetworkBoundary for NetworkBroadcastExec {
187171
fn input_stage(&self) -> &Stage {
188172
&self.input_stage
189173
}
174+
175+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
176+
let partition_count = self.properties.output_partitioning().partition_count();
177+
ProducerHead::BroadcastExec {
178+
output_partitions: partition_count * consumer_task_count,
179+
}
180+
}
190181
}
191182

192183
impl DisplayAs for NetworkBroadcastExec {
@@ -252,14 +243,16 @@ impl ExecutionPlan for NetworkBroadcastExec {
252243
};
253244

254245
let task_context = DistributedTaskContext::from_ctx(&context);
255-
let off = self.properties.partitioning.partition_count() * task_context.task_index;
246+
let out_partitions = self.properties.partitioning.partition_count();
247+
let off = out_partitions * task_context.task_index;
256248
let mut streams = Vec::with_capacity(self.input_stage.task_count());
257249

258250
for input_task_index in 0..self.input_stage.task_count() {
259251
let worker_connection = self.worker_connections.get_or_init_worker_connection(
260252
remote_stage,
261253
off..(off + self.properties.partitioning.partition_count()),
262254
input_task_index,
255+
self.producer_head(task_context.task_count),
263256
&context,
264257
)?;
265258

src/execution_plans/network_coalesce.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::DistributedTaskContext;
22
use crate::common::require_one_child;
3-
use crate::distributed_planner::NetworkBoundary;
3+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
44
use crate::execution_plans::common::scale_partitioning_props;
55
use crate::stage::{LocalStage, Stage};
66
use crate::worker::WorkerConnectionPool;
7-
use datafusion::common::tree_node::Transformed;
87
use datafusion::common::{exec_err, not_impl_err, plan_err};
98
use datafusion::error::Result;
109
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -82,16 +81,6 @@ pub struct NetworkCoalesceExec {
8281
}
8382

8483
impl NetworkCoalesceExec {
85-
/// Does nothing, but it's here for explicitly stating that this network boundary does not
86-
/// need to mutate the input plan in other to account for more consumer tasks.
87-
pub(crate) fn scale_input(
88-
plan: Arc<dyn ExecutionPlan>,
89-
_consumer_partitions: usize,
90-
_consumer_task_count: usize,
91-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
92-
Ok(Transformed::no(plan))
93-
}
94-
9584
pub(crate) fn from_stage(
9685
input_stage: Stage,
9786
input_properties: Arc<PlanProperties>,
@@ -188,6 +177,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
188177
self_clone.input_stage = input_stage;
189178
Ok(Arc::new(self_clone))
190179
}
180+
181+
fn producer_head(&self, _consumer_task_count: usize) -> ProducerHead {
182+
ProducerHead::None
183+
}
191184
}
192185

193186
impl DisplayAs for NetworkCoalesceExec {
@@ -304,6 +297,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
304297
remote_stage,
305298
0..partitions_per_task,
306299
target_task,
300+
self.producer_head(task_context.task_count),
307301
&context,
308302
)?;
309303

src/execution_plans/network_shuffle.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::common::require_one_child;
2+
use crate::distributed_planner::ProducerHead;
23
use crate::execution_plans::common::scale_partitioning;
34
use crate::stage::{LocalStage, Stage};
45
use crate::worker::WorkerConnectionPool;
56
use crate::{DistributedTaskContext, NetworkBoundary};
6-
use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
77
use datafusion::common::{Result, not_impl_err, plan_err};
88
use datafusion::error::DataFusionError;
99
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -106,26 +106,6 @@ pub struct NetworkShuffleExec {
106106
}
107107

108108
impl NetworkShuffleExec {
109-
pub(crate) fn scale_input(
110-
plan: Arc<dyn ExecutionPlan>,
111-
consumer_partitions: usize,
112-
consumer_task_count: usize,
113-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
114-
let Some(repartition_exec) = plan.as_any().downcast_ref::<RepartitionExec>() else {
115-
return Ok(Transformed::no(plan));
116-
};
117-
118-
let child = require_one_child(repartition_exec.children())?;
119-
let partitioning = scale_partitioning(repartition_exec.partitioning(), |_| {
120-
consumer_partitions * consumer_task_count
121-
});
122-
123-
// Scale the input RepartitionExec to account for all the tasks to which it will
124-
// need to fan data out.
125-
let scaled = Arc::new(RepartitionExec::try_new(child, partitioning)?);
126-
Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
127-
}
128-
129109
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
130110
Self {
131111
properties: input_properties,
@@ -171,6 +151,14 @@ impl NetworkBoundary for NetworkShuffleExec {
171151
self_clone.input_stage = input_stage;
172152
Ok(Arc::new(self_clone))
173153
}
154+
155+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
156+
ProducerHead::RepartitionExec {
157+
partitioning: scale_partitioning(&self.properties.partitioning, |prev| {
158+
prev * consumer_task_count
159+
}),
160+
}
161+
}
174162
}
175163

176164
impl DisplayAs for NetworkShuffleExec {
@@ -238,6 +226,7 @@ impl ExecutionPlan for NetworkShuffleExec {
238226
remote_stage,
239227
off..(off + self.properties.partitioning.partition_count()),
240228
input_task_index,
229+
self.producer_head(task_context.task_count),
241230
&context,
242231
)?;
243232

src/observability/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl ObservabilityService for ObservabilityServiceImpl {
9696
let total_partitions = task_data.total_partitions() as u64;
9797
let remaining = task_data.num_partitions_remaining() as u64;
9898
let completed_partitions = total_partitions.saturating_sub(remaining);
99-
let output_rows = output_rows_from_plan(&task_data.plan);
99+
let output_rows = output_rows_from_plan(&task_data.base_plan);
100100

101101
tasks.push(TaskProgress {
102102
task_key: Some((*internal_key).clone()),

0 commit comments

Comments
 (0)