Skip to content

Commit 583b6b9

Browse files
authored
Refactor task spawner into QueryCoordinator and StageCoordinator (#479)
This is one PR from the following stack of PRs: - #477 - #463 - #464 - #478 - #479 <- you are here - #486 - #432 --- This is a pure refactor PR with a couple of implementation detail changes: # Reducing the bloat of complex functions in already complicated parts of the codebase. For this, two key structs are introduced: - `QueryCoordinator`: scoped to a whole distributed query, it handles references to pieces of data global to a query's lifetime, like the `TaskContext`, the metrics, the `JoinSet` used for spawning tasks, etc... it's in charge also of building `StageCoordinator` instances, which are scoped per-stage instead of per-query. - `StageCoordinator`: this is the old `CoordinatorToWorkerTaskSpawner`, but with some more methods that allow reusability and some better naming. It handles all the comms between workers and coordinator needed for driving a stage forward. This allows reducing the bloat in `prepare_static_plan` and the future `prepare_dynamic_plan` functions. # Ensuring a coordinator->worker channel is held active for as long as the `DistributedExec` node is executing the query on the coordinator. For the static planner, this is a noop, as the previous model worked fine before, but this will become important in the future for the dynamic planner. In the dynamic planner, the plan can be set for some stages, but they might never reach execution, so instead of coupling the task entry cache invalidation to the task execution finish, it's coupled instead of the coordinator->channel lifetime. This has one collateral effect: `WorkUnit` feeds can no longer rely on the global `coordinator->worker` EOS signal for ensuring that no further `WorkUnit` feed is going to be sent by the coordinator, so they need an explicit EOS message that signals that no further `WorkUnit`s will be received, even though the `coordinator->worker` channel will still be alive for a while. # Add a `plan_for_viz` field in `DistributedExec`. This is a new slot in `DistributedExec` that holds a reference of the plan that is supposed to be rewritten with metrics for visualization purposes. Again, for the static planner this is a noop, because the plan meant for visualization is equal to the plan that arrived as child to `DistributedExec` on the first place. However, during dynamic planning, the plan that arrives to `DistributedExec` is not going to be the same as the final one after execution, so we need a slot for storing that final plan.
1 parent 579f3ac commit 583b6b9

11 files changed

Lines changed: 460 additions & 314 deletions

File tree

src/coordinator/distributed.rs

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::common::{require_one_child, serialize_uuid};
22
use crate::coordinator::metrics_store::MetricsStore;
33
use crate::coordinator::prepare_static_plan::prepare_static_plan;
4+
use crate::coordinator::query_coordinator::QueryCoordinator;
45
use crate::distributed_planner::NetworkBoundaryExt;
56
use crate::worker::generated::worker::TaskKey;
67
use datafusion::common::internal_datafusion_err;
7-
use datafusion::common::runtime::JoinSet;
88
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
99
use datafusion::common::{Result, exec_err};
1010
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -14,8 +14,7 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
1414
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
1515
use futures::StreamExt;
1616
use std::fmt::Formatter;
17-
use std::sync::Arc;
18-
use std::sync::Mutex;
17+
use std::sync::{Arc, Mutex};
1918

2019
/// [ExecutionPlan] that executes the inner plan in distributed mode.
2120
/// Before executing it, two modifications are lazily performed on the plan:
@@ -26,22 +25,39 @@ use std::sync::Mutex;
2625
/// over the wire.
2726
#[derive(Debug)]
2827
pub struct DistributedExec {
29-
plan: Arc<dyn ExecutionPlan>,
30-
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
28+
/// Initial [ExecutionPlan] present before execution.
29+
/// - If the plan was distributed statically, this will be the final distributed plan with all
30+
/// the appropriate network boundaries in it.
31+
/// - If the plan is going to be distributed dynamically during execution, this is the initial
32+
/// non-distributed plan.
33+
base_plan: Arc<dyn ExecutionPlan>,
34+
/// Resulting [ExecutionPlan] after execution ready for visualization purposes.
35+
/// - If the plan was distributed statically, this is equal to the base plan.
36+
/// - If the plan is going to be distributed dynamically during execution, this is the resulting
37+
/// plan re-calculated based on runtime statistics.
38+
plan_for_viz: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
39+
/// The head stage meant to be executed locally on [DistributedExec::execute].
40+
head_stage: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
41+
/// DataFusion metrics.
3142
metrics: ExecutionPlanMetricsSet,
43+
/// Storage where metrics collected from workers at runtime will place their results as they
44+
/// finish their respective remote tasks.
3245
pub(crate) metrics_store: Option<Arc<MetricsStore>>,
3346
}
3447

3548
pub(super) struct PreparedPlan {
49+
/// The head stage meant to be executed locally by the coordinator.
3650
pub(super) head_stage: Arc<dyn ExecutionPlan>,
37-
pub(super) join_set: JoinSet<Result<()>>,
51+
/// A final representation of the plan for visualization purposes.
52+
pub(super) plan_for_viz: Arc<dyn ExecutionPlan>,
3853
}
3954

4055
impl DistributedExec {
41-
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
56+
pub fn new(base_plan: Arc<dyn ExecutionPlan>) -> Self {
4257
Self {
43-
plan,
44-
prepared_plan: Arc::new(Mutex::new(None)),
58+
base_plan,
59+
plan_for_viz: Arc::new(Mutex::new(None)),
60+
head_stage: Arc::new(Mutex::new(None)),
4561
metrics: ExecutionPlanMetricsSet::new(),
4662
metrics_store: None,
4763
}
@@ -68,7 +84,10 @@ impl DistributedExec {
6884
let Some(task_metrics) = &self.metrics_store else {
6985
return;
7086
};
71-
let _ = self.plan.apply(|plan| {
87+
let Some(plan) = self.plan_for_viz.lock().unwrap().as_ref().cloned() else {
88+
return;
89+
};
90+
let _ = plan.apply(|plan| {
7291
if let Some(boundary) = plan.as_network_boundary() {
7392
let stage = boundary.input_stage();
7493
for i in 0..stage.task_count() {
@@ -93,15 +112,27 @@ impl DistributedExec {
93112
/// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
94113
/// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
95114
/// called.
96-
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
97-
self.prepared_plan
115+
pub(crate) fn plan_for_viz(&self) -> Result<Arc<dyn ExecutionPlan>> {
116+
self.plan_for_viz
98117
.lock()
99118
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
100119
.clone()
101120
.ok_or_else(|| {
102121
internal_datafusion_err!("No prepared plan found. Was execute() called?")
103122
})
104123
}
124+
125+
/// Returns the head stage that was actually executed. Unlike [`Self::plan_for_viz`] (which is
126+
/// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor
127+
/// `Arc`s), this returns the original `Arc` instances whose metrics were populated during
128+
/// execution.
129+
pub(crate) fn head_stage(&self) -> Result<Arc<dyn ExecutionPlan>> {
130+
self.head_stage
131+
.lock()
132+
.map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))?
133+
.clone()
134+
.ok_or_else(|| internal_datafusion_err!("No head stage found. Was execute() called?"))
135+
}
105136
}
106137

107138
impl DisplayAs for DistributedExec {
@@ -116,20 +147,21 @@ impl ExecutionPlan for DistributedExec {
116147
}
117148

118149
fn properties(&self) -> &Arc<PlanProperties> {
119-
self.plan.properties()
150+
self.base_plan.properties()
120151
}
121152

122153
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123-
vec![&self.plan]
154+
vec![&self.base_plan]
124155
}
125156

126157
fn with_new_children(
127158
self: Arc<Self>,
128159
children: Vec<Arc<dyn ExecutionPlan>>,
129160
) -> Result<Arc<dyn ExecutionPlan>> {
130161
Ok(Arc::new(DistributedExec {
131-
plan: require_one_child(&children)?,
132-
prepared_plan: self.prepared_plan.clone(),
162+
base_plan: require_one_child(&children)?,
163+
plan_for_viz: Arc::new(Mutex::new(None)),
164+
head_stage: Arc::new(Mutex::new(None)),
133165
metrics: self.metrics.clone(),
134166
metrics_store: self.metrics_store.clone(),
135167
}))
@@ -150,36 +182,43 @@ impl ExecutionPlan for DistributedExec {
150182
);
151183
}
152184

153-
let PreparedPlan {
154-
head_stage,
155-
join_set,
156-
} = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
157-
{
158-
let mut guard = self
159-
.prepared_plan
160-
.lock()
161-
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
162-
*guard = Some(head_stage.clone());
163-
}
185+
let base_plan = Arc::clone(&self.base_plan);
186+
let plan_for_viz = Arc::clone(&self.plan_for_viz);
187+
let head_stage = Arc::clone(&self.head_stage);
188+
189+
let query_coordinator = QueryCoordinator::new(
190+
Arc::clone(&context),
191+
&self.metrics,
192+
self.metrics_store.clone(),
193+
);
194+
164195
let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
165196
let tx = builder.tx();
166-
// Spawn the task that pulls data from child...
197+
167198
builder.spawn(async move {
168-
let mut stream = head_stage.execute(partition, context)?;
199+
let _guard = query_coordinator.end_query_guard();
200+
201+
let result = prepare_static_plan(&query_coordinator, &base_plan)?;
202+
203+
plan_for_viz
204+
.lock()
205+
.expect("poisoned lock")
206+
.replace(result.plan_for_viz);
207+
head_stage
208+
.lock()
209+
.expect("poisoned lock")
210+
.replace(Arc::clone(&result.head_stage));
211+
let mut stream = result.head_stage.execute(partition, context)?;
169212
while let Some(msg) = stream.next().await {
170213
if tx.send(msg).await.is_err() {
171214
break; // channel closed
172215
}
173216
}
217+
drop(tx);
218+
query_coordinator.drain_pending_tasks().await?;
174219
Ok(())
175220
});
176-
// ...in parallel to the one that feeds the plan to workers.
177-
builder.spawn(async move {
178-
for res in join_set.join_all().await {
179-
res?;
180-
}
181-
Ok(())
182-
});
221+
183222
Ok(builder.build())
184223
}
185224

src/coordinator/latency_metric.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use datafusion::common::instant::Instant;
2+
use datafusion::physical_expr_common::metrics::{
3+
ExecutionPlanMetricsSet, MetricBuilder, MetricValue, Time,
4+
};
5+
use std::fmt::Display;
6+
use std::sync::atomic::{AtomicU64, Ordering};
7+
use std::time::Duration;
8+
9+
/// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct
10+
/// bridges the gaps that are not satisfied by upstream API for measuring latency.
11+
pub(super) struct LatencyMetric {
12+
max: Time,
13+
avg: Time,
14+
max_latency_micros: AtomicU64,
15+
sum_latency_micros: AtomicU64,
16+
count_latency_micros: AtomicU64,
17+
}
18+
19+
impl Drop for LatencyMetric {
20+
fn drop(&mut self) {
21+
self.max.add_duration(Duration::from_micros(
22+
self.max_latency_micros.load(Ordering::Relaxed),
23+
));
24+
self.avg.add_duration(Duration::from_micros(
25+
self.sum_latency_micros.load(Ordering::Relaxed)
26+
/ self.count_latency_micros.load(Ordering::Relaxed).max(1),
27+
));
28+
}
29+
}
30+
31+
impl LatencyMetric {
32+
pub(super) fn new(
33+
name: impl Display,
34+
builder: impl Fn(MetricBuilder) -> MetricBuilder,
35+
metrics: &ExecutionPlanMetricsSet,
36+
) -> Self {
37+
let max = Time::new();
38+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
39+
name: format!("{name}_max").into(),
40+
time: max.clone(),
41+
});
42+
let avg = Time::new();
43+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
44+
name: format!("{name}_avg").into(),
45+
time: avg.clone(),
46+
});
47+
Self {
48+
max,
49+
avg,
50+
max_latency_micros: AtomicU64::new(0),
51+
sum_latency_micros: AtomicU64::new(0),
52+
count_latency_micros: AtomicU64::new(0),
53+
}
54+
}
55+
56+
pub(super) fn record(&self, start: &Instant) {
57+
let micros = start.elapsed().as_micros() as u64;
58+
self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
59+
self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
60+
self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
61+
}
62+
}

src/coordinator/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
mod distributed;
2+
mod latency_metric;
23
mod metrics_store;
34
mod prepare_static_plan;
4-
mod task_spawner;
5+
mod query_coordinator;
56

67
pub use distributed::DistributedExec;
78
pub(crate) use metrics_store::MetricsStore;
Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,10 @@
1-
use crate::coordinator::MetricsStore;
21
use crate::coordinator::distributed::PreparedPlan;
3-
use crate::coordinator::task_spawner::{
4-
CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
5-
};
2+
use crate::coordinator::query_coordinator::QueryCoordinator;
63
use crate::stage::RemoteStage;
7-
use crate::{
8-
DistributedConfig, NetworkBoundaryExt, Stage, TaskEstimator, TaskRoutingContext,
9-
get_distributed_worker_resolver,
10-
};
11-
use datafusion::common::runtime::JoinSet;
4+
use crate::{NetworkBoundaryExt, Stage};
125
use datafusion::common::tree_node::{Transformed, TreeNode};
136
use datafusion::common::{Result, exec_err};
14-
use datafusion::execution::TaskContext;
157
use datafusion::physical_plan::ExecutionPlan;
16-
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17-
use rand::Rng;
188
use std::sync::Arc;
199

2010
/// Prepares the distributed plan for execution, which implies:
@@ -28,18 +18,9 @@ use std::sync::Arc;
2818
/// 4. Spawn a background task per worker that waits for the worker to finish and collects
2919
/// its metrics into [DistributedExec::task_metrics] via the coordinator channel.
3020
pub(super) fn prepare_static_plan(
21+
query_coordinator: &QueryCoordinator,
3122
base_plan: &Arc<dyn ExecutionPlan>,
32-
metrics: &ExecutionPlanMetricsSet,
33-
task_metrics: &Option<Arc<MetricsStore>>,
34-
ctx: &Arc<TaskContext>,
3523
) -> Result<PreparedPlan> {
36-
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
37-
38-
let available_urls = worker_resolver.get_urls()?;
39-
40-
let metrics = CoordinatorToWorkerMetrics::new(metrics);
41-
42-
let mut join_set = JoinSet::new();
4324
let prepared = Arc::clone(base_plan).transform_up(|plan| {
4425
// The following logic is just applied on network boundaries.
4526
let Some(plan) = plan.as_network_boundary() else {
@@ -50,46 +31,18 @@ pub(super) fn prepare_static_plan(
5031
return exec_err!("Input stage from network boundary was not in Local state");
5132
};
5233

53-
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
54-
let task_estimator = &d_cfg.__private_task_estimator;
55-
56-
let mut spawner =
57-
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, task_metrics, ctx, &mut join_set)?;
34+
let mut stage_coordinator = query_coordinator.stage_coordinator(stage);
5835

59-
let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
60-
task_ctx: Arc::clone(ctx),
61-
plan: &stage.plan,
62-
task_count: stage.tasks,
63-
available_urls: &available_urls,
64-
}) {
65-
Ok(Some(routed_urls)) => routed_urls,
66-
// If the user has not defined custom routing with a `route_tasks` implementation, we
67-
// default to round-robin task assignation from a randomized starting point.
68-
Ok(None) => {
69-
let start_idx = rand::rng().random_range(0..available_urls.len());
70-
(0..stage.tasks)
71-
.map(|i| available_urls[(start_idx + i) % available_urls.len()].clone())
72-
.collect()
73-
}
74-
Err(e) => return exec_err!("error routing tasks to workers: {e}"),
75-
};
76-
77-
if routed_urls.len() != stage.tasks {
78-
return exec_err!(
79-
"number of tasks ({}) was not equal to number of urls ({}) at execution time",
80-
stage.tasks,
81-
routed_urls.len()
82-
);
83-
}
36+
let routed_urls = stage_coordinator.routed_urls()?;
8437

8538
let mut workers = Vec::with_capacity(stage.tasks);
8639
for (i, routed_url) in routed_urls.into_iter().enumerate() {
8740
workers.push(routed_url.clone());
8841
// Spawn a task that sends the subplan to the chosen URL.
8942
// There will be as many spawned tasks as workers.
90-
let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?;
91-
spawner.metrics_collection_task(i, worker_rx);
92-
spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?;
43+
let (worker_tx, worker_rx) = stage_coordinator.send_plan_task(i, routed_url)?;
44+
stage_coordinator.worker_to_coordinator_task(i, worker_rx);
45+
stage_coordinator.coordinator_to_worker_task(i, worker_tx)?;
9346
}
9447

9548
Ok(Transformed::yes(plan.with_input_stage(Stage::Remote(
@@ -102,6 +55,8 @@ pub(super) fn prepare_static_plan(
10255
})?;
10356
Ok(PreparedPlan {
10457
head_stage: prepared.data,
105-
join_set,
58+
// If the plan was statically planned, the base plan is the same one that will be used for
59+
// visualization.
60+
plan_for_viz: Arc::clone(base_plan),
10661
})
10762
}

0 commit comments

Comments
 (0)