Skip to content

Commit 423e39a

Browse files
committed
Add ProducerHead for ensuring that the appropriate node is placed at the head of stages
1 parent ce7fb75 commit 423e39a

20 files changed

Lines changed: 290 additions & 116 deletions

src/distributed_planner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod task_estimator;
1111

1212
pub use distributed_config::DistributedConfig;
1313
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
14+
pub(crate) use network_boundary::{ProducerHead, insert_producer_head};
1415
pub use session_state_builder_ext::SessionStateBuilderExt;
1516
pub(crate) use task_estimator::set_distributed_task_estimator;
1617
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext};
Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use crate::{NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
1+
use crate::{BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
22
use datafusion::common::Result;
3-
use datafusion::physical_plan::ExecutionPlan;
3+
use datafusion::physical_expr::Partitioning;
4+
use datafusion::physical_plan::repartition::RepartitionExec;
5+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
46
use std::sync::Arc;
57

68
/// This trait represents a node that introduces the necessity of a network boundary in the plan.
@@ -15,6 +17,22 @@ pub trait NetworkBoundary: ExecutionPlan {
1517

1618
/// Returns the assigned input [Stage], if any.
1719
fn input_stage(&self) -> &Stage;
20+
21+
/// Defines what head node should the producer stage feeding this [NetworkBoundary]
22+
/// implementation have. This information is used during planning an executing for ensuring
23+
/// the head of a stage has the appropriate shape for consumption.
24+
fn producer_head(&self, consumer_tasks: usize) -> ProducerHead;
25+
}
26+
27+
/// Defines what shape should the head node of a stage have upon getting executed. Depending
28+
/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.
29+
pub enum ProducerHead {
30+
/// No specific head node is necessary.
31+
None,
32+
/// The head node should be a [BroadcastExec].
33+
BroadcastExec { output_partitions: usize },
34+
/// The head node should be a [RepartitionExec].
35+
RepartitionExec { partitioning: Partitioning },
1836
}
1937

2038
/// Extension trait for downcasting dynamic types to [NetworkBoundary].
@@ -41,38 +59,28 @@ impl NetworkBoundaryExt for dyn ExecutionPlan {
4159
}
4260
}
4361

44-
/// Scales up the head node of the input stage of a network boundary. Different network boundaries
45-
/// have different needs for scaling up their input, like for example, scaling up a RepartitionExec
46-
/// during shuffles.
47-
pub(crate) fn network_boundary_scale_input(
62+
/// Ensures the head of the provided plan complies with the passed [ProducerHead] definition. This
63+
/// can be called both during planning and lazily at runtime.
64+
pub(crate) fn insert_producer_head(
4865
input: Arc<dyn ExecutionPlan>,
49-
consumer_partitions: usize,
50-
consumer_task_count: usize,
66+
head: ProducerHead,
5167
) -> Result<Arc<dyn ExecutionPlan>> {
52-
let transformed = NetworkShuffleExec::scale_input(
53-
Arc::clone(&input),
54-
consumer_partitions,
55-
consumer_task_count,
56-
)?;
57-
if transformed.transformed {
58-
return Ok(transformed.data);
59-
}
60-
let transformed = NetworkBroadcastExec::scale_input(
61-
Arc::clone(&input),
62-
consumer_partitions,
63-
consumer_task_count,
64-
)?;
65-
if transformed.transformed {
66-
return Ok(transformed.data);
67-
}
68-
let transformed = NetworkCoalesceExec::scale_input(
69-
Arc::clone(&input),
70-
consumer_partitions,
71-
consumer_task_count,
72-
)?;
73-
if transformed.transformed {
74-
return Ok(transformed.data);
75-
}
76-
77-
Ok(input)
68+
let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() {
69+
Arc::clone(r_exec.input())
70+
} else if let Some(b_exec) = input.downcast_ref::<BroadcastExec>() {
71+
Arc::clone(b_exec.input())
72+
} else {
73+
input
74+
};
75+
let plan = match head {
76+
ProducerHead::None => input,
77+
ProducerHead::BroadcastExec { output_partitions } => {
78+
let partitions = input.output_partitioning().partition_count();
79+
Arc::new(BroadcastExec::new(input, output_partitions / partitions))
80+
}
81+
ProducerHead::RepartitionExec { partitioning } => {
82+
Arc::new(RepartitionExec::try_new(input, partitioning)?)
83+
}
84+
};
85+
Ok(plan)
7886
}

src/distributed_planner/prepare_network_boundaries.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::common::TreeNodeExt;
2-
use crate::distributed_planner::network_boundary::network_boundary_scale_input;
2+
use crate::distributed_planner::network_boundary::insert_producer_head;
33
use crate::stage::LocalStage;
44
use crate::{NetworkBoundaryExt, Stage};
55
use datafusion::common::Result;
@@ -35,11 +35,8 @@ pub(crate) fn prepare_network_boundaries(
3535

3636
// 2) Scale up the head node of the input stage in order to account for the amount of partition
3737
// and consumer count above it.
38-
let plan = network_boundary_scale_input(
39-
Arc::clone(&input_stage.plan),
40-
nb.properties().partitioning.partition_count(),
41-
task_count,
42-
)?;
38+
let plan =
39+
insert_producer_head(Arc::clone(&input_stage.plan), nb.producer_head(task_count))?;
4340

4441
// 3) Make sure the input stage can be uniquely identified with a stage index and query id.
4542
// If there were already some `query_id` and `num` that's fine.

src/execution_plans/benchmarks/shuffle_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl ShuffleFixture {
223223
let shuffle = NetworkShuffleExec {
224224
properties: Arc::new(PlanProperties::new(
225225
EquivalenceProperties::new(Arc::clone(&self.schema)),
226-
Partitioning::UnknownPartitioning(self.bench.partitions),
226+
Partitioning::Hash(vec![Arc::new(Column::new("id", 0))], self.bench.partitions),
227227
EmissionType::Incremental,
228228
Boundedness::Bounded,
229229
)),

src/execution_plans/benchmarks/transport_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl TransportFixture {
273273
let shuffle = NetworkShuffleExec {
274274
properties: Arc::new(PlanProperties::new(
275275
EquivalenceProperties::new(Arc::clone(&self.schema)),
276-
Partitioning::UnknownPartitioning(self.bench.partitions),
276+
Partitioning::RoundRobinBatch(self.bench.partitions),
277277
EmissionType::Incremental,
278278
Boundedness::Bounded,
279279
)),

src/execution_plans/broadcast.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ impl BroadcastExec {
104104
pub fn consumer_task_count(&self) -> usize {
105105
self.consumer_task_count
106106
}
107+
108+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
109+
&self.input
110+
}
107111
}
108112

109113
impl DisplayAs for BroadcastExec {

src/execution_plans/network_broadcast.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::common::require_one_child;
2-
use crate::distributed_planner::NetworkBoundary;
2+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
33
use crate::stage::{LocalStage, Stage};
44
use crate::worker::WorkerConnectionPool;
55
use crate::{BroadcastExec, DistributedTaskContext};
6-
use datafusion::common::tree_node::Transformed;
76
use datafusion::common::{Result, not_impl_err, plan_err};
87
use datafusion::error::DataFusionError;
98
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -123,21 +122,6 @@ pub struct NetworkBroadcastExec {
123122
}
124123

125124
impl NetworkBroadcastExec {
126-
pub(crate) fn scale_input(
127-
plan: Arc<dyn ExecutionPlan>,
128-
_consumer_partitions: usize,
129-
consumer_task_count: usize,
130-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
131-
let Some(broadcast) = plan.downcast_ref::<BroadcastExec>() else {
132-
return Ok(Transformed::no(plan));
133-
};
134-
135-
Ok(Transformed::yes(Arc::new(BroadcastExec::new(
136-
require_one_child(broadcast.children())?,
137-
consumer_task_count,
138-
))))
139-
}
140-
141125
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
142126
let input_partition_count = input_properties.partitioning.partition_count();
143127
let properties = Arc::new(
@@ -186,6 +170,13 @@ impl NetworkBoundary for NetworkBroadcastExec {
186170
fn input_stage(&self) -> &Stage {
187171
&self.input_stage
188172
}
173+
174+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
175+
let partition_count = self.properties.output_partitioning().partition_count();
176+
ProducerHead::BroadcastExec {
177+
output_partitions: partition_count * consumer_task_count,
178+
}
179+
}
189180
}
190181

191182
impl DisplayAs for NetworkBroadcastExec {
@@ -247,14 +238,16 @@ impl ExecutionPlan for NetworkBroadcastExec {
247238
};
248239

249240
let task_context = DistributedTaskContext::from_ctx(&context);
250-
let off = self.properties.partitioning.partition_count() * task_context.task_index;
241+
let out_partitions = self.properties.partitioning.partition_count();
242+
let off = out_partitions * task_context.task_index;
251243
let mut streams = Vec::with_capacity(self.input_stage.task_count());
252244

253245
for input_task_index in 0..self.input_stage.task_count() {
254246
let worker_connection = self.worker_connections.get_or_init_worker_connection(
255247
remote_stage,
256248
off..(off + self.properties.partitioning.partition_count()),
257249
input_task_index,
250+
self.producer_head(task_context.task_count),
258251
&context,
259252
)?;
260253

src/execution_plans/network_coalesce.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::DistributedTaskContext;
22
use crate::common::require_one_child;
3-
use crate::distributed_planner::NetworkBoundary;
3+
use crate::distributed_planner::{NetworkBoundary, ProducerHead};
44
use crate::execution_plans::common::scale_partitioning_props;
55
use crate::stage::{LocalStage, Stage};
66
use crate::worker::WorkerConnectionPool;
7-
use datafusion::common::tree_node::Transformed;
87
use datafusion::common::{exec_err, not_impl_err, plan_err};
98
use datafusion::error::Result;
109
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -81,16 +80,6 @@ pub struct NetworkCoalesceExec {
8180
}
8281

8382
impl NetworkCoalesceExec {
84-
/// Does nothing, but it's here for explicitly stating that this network boundary does not
85-
/// need to mutate the input plan in other to account for more consumer tasks.
86-
pub(crate) fn scale_input(
87-
plan: Arc<dyn ExecutionPlan>,
88-
_consumer_partitions: usize,
89-
_consumer_task_count: usize,
90-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
91-
Ok(Transformed::no(plan))
92-
}
93-
9483
pub(crate) fn from_stage(
9584
input_stage: Stage,
9685
input_properties: Arc<PlanProperties>,
@@ -187,6 +176,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
187176
self_clone.input_stage = input_stage;
188177
Ok(Arc::new(self_clone))
189178
}
179+
180+
fn producer_head(&self, _consumer_task_count: usize) -> ProducerHead {
181+
ProducerHead::None
182+
}
190183
}
191184

192185
impl DisplayAs for NetworkCoalesceExec {
@@ -299,6 +292,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
299292
remote_stage,
300293
0..partitions_per_task,
301294
target_task,
295+
self.producer_head(task_context.task_count),
302296
&context,
303297
)?;
304298

src/execution_plans/network_shuffle.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::common::require_one_child;
2+
use crate::distributed_planner::ProducerHead;
23
use crate::execution_plans::common::scale_partitioning;
34
use crate::stage::{LocalStage, Stage};
45
use crate::worker::WorkerConnectionPool;
56
use crate::{DistributedTaskContext, NetworkBoundary};
6-
use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
77
use datafusion::common::{Result, not_impl_err, plan_err};
88
use datafusion::error::DataFusionError;
99
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -105,26 +105,6 @@ pub struct NetworkShuffleExec {
105105
}
106106

107107
impl NetworkShuffleExec {
108-
pub(crate) fn scale_input(
109-
plan: Arc<dyn ExecutionPlan>,
110-
consumer_partitions: usize,
111-
consumer_task_count: usize,
112-
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
113-
let Some(repartition_exec) = plan.downcast_ref::<RepartitionExec>() else {
114-
return Ok(Transformed::no(plan));
115-
};
116-
117-
let child = require_one_child(repartition_exec.children())?;
118-
let partitioning = scale_partitioning(repartition_exec.partitioning(), |_| {
119-
consumer_partitions * consumer_task_count
120-
});
121-
122-
// Scale the input RepartitionExec to account for all the tasks to which it will
123-
// need to fan data out.
124-
let scaled = Arc::new(RepartitionExec::try_new(child, partitioning)?);
125-
Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
126-
}
127-
128108
pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
129109
Self {
130110
properties: input_properties,
@@ -170,6 +150,14 @@ impl NetworkBoundary for NetworkShuffleExec {
170150
self_clone.input_stage = input_stage;
171151
Ok(Arc::new(self_clone))
172152
}
153+
154+
fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
155+
ProducerHead::RepartitionExec {
156+
partitioning: scale_partitioning(&self.properties.partitioning, |prev| {
157+
prev * consumer_task_count
158+
}),
159+
}
160+
}
173161
}
174162

175163
impl DisplayAs for NetworkShuffleExec {
@@ -233,6 +221,7 @@ impl ExecutionPlan for NetworkShuffleExec {
233221
remote_stage,
234222
off..(off + self.properties.partitioning.partition_count()),
235223
input_task_index,
224+
self.producer_head(task_context.task_count),
236225
&context,
237226
)?;
238227

src/observability/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl ObservabilityService for ObservabilityServiceImpl {
9696
let total_partitions = task_data.total_partitions() as u64;
9797
let remaining = task_data.num_partitions_remaining() as u64;
9898
let completed_partitions = total_partitions.saturating_sub(remaining);
99-
let output_rows = output_rows_from_plan(&task_data.plan);
99+
let output_rows = output_rows_from_plan(&task_data.base_plan);
100100

101101
tasks.push(TaskProgress {
102102
task_key: Some((*internal_key).clone()),

0 commit comments

Comments
 (0)