Skip to content

Commit bda1c2d

Browse files
committed
Refactor coordinator module and ensure cache invalidation on coordinator->worker stream drop
1 parent 431d5a2 commit bda1c2d

10 files changed

Lines changed: 461 additions & 307 deletions

File tree

src/coordinator/distributed.rs

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::common::{require_one_child, serialize_uuid};
22
use crate::coordinator::metrics_store::MetricsStore;
33
use crate::coordinator::prepare_static_plan::prepare_static_plan;
4+
use crate::coordinator::query_coordinator::QueryCoordinator;
45
use crate::distributed_planner::NetworkBoundaryExt;
56
use crate::worker::generated::worker::TaskKey;
67
use datafusion::common::internal_datafusion_err;
7-
use datafusion::common::runtime::JoinSet;
88
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
99
use datafusion::common::{Result, exec_err};
1010
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -15,8 +15,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Pla
1515
use futures::StreamExt;
1616
use std::any::Any;
1717
use std::fmt::Formatter;
18-
use std::sync::Arc;
19-
use std::sync::Mutex;
18+
use std::sync::{Arc, Mutex};
2019

2120
/// [ExecutionPlan] that executes the inner plan in distributed mode.
2221
/// Before executing it, two modifications are lazily performed on the plan:
@@ -27,22 +26,39 @@ use std::sync::Mutex;
2726
/// over the wire.
2827
#[derive(Debug)]
2928
pub struct DistributedExec {
30-
plan: Arc<dyn ExecutionPlan>,
31-
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
29+
/// Initial [ExecutionPlan] present before execution.
30+
/// - If the plan was distributed statically, this will be the final distributed plan with all
31+
/// the appropriate network boundaries in it.
32+
/// - If the plan is going to be distributed dynamically during execution, this is the initial
33+
/// non-distributed plan.
34+
base_plan: Arc<dyn ExecutionPlan>,
35+
/// Resulting [ExecutionPlan] after execution ready for visualization purposes.
36+
/// - If the plan was distributed statically, this is equal to the base plan.
37+
/// - If the plan is going to be distributed dynamically during execution, this is the resulting
38+
/// plan re-calculated based on runtime statistics.
39+
plan_for_viz: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
40+
/// The head stage meant to be executed locally on [DistributedExec::execute].
41+
head_stage: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
42+
/// DataFusion metrics.
3243
metrics: ExecutionPlanMetricsSet,
44+
/// Storage where metrics collected from workers at runtime will place their results as they
45+
/// finish their respective remote tasks.
3346
pub(crate) metrics_store: Option<Arc<MetricsStore>>,
3447
}
3548

3649
pub(super) struct PreparedPlan {
50+
/// The head stage meant to be executed locally by the coordinator.
3751
pub(super) head_stage: Arc<dyn ExecutionPlan>,
38-
pub(super) join_set: JoinSet<Result<()>>,
52+
/// A final representation of the plan for visualization purposes.
53+
pub(super) plan_for_viz: Arc<dyn ExecutionPlan>,
3954
}
4055

4156
impl DistributedExec {
42-
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
57+
pub fn new(base_plan: Arc<dyn ExecutionPlan>) -> Self {
4358
Self {
44-
plan,
45-
prepared_plan: Arc::new(Mutex::new(None)),
59+
base_plan,
60+
plan_for_viz: Arc::new(Mutex::new(None)),
61+
head_stage: Arc::new(Mutex::new(None)),
4662
metrics: ExecutionPlanMetricsSet::new(),
4763
metrics_store: None,
4864
}
@@ -69,7 +85,10 @@ impl DistributedExec {
6985
let Some(task_metrics) = &self.metrics_store else {
7086
return;
7187
};
72-
let _ = self.plan.apply(|plan| {
88+
let Some(plan) = self.plan_for_viz.lock().unwrap().as_ref().cloned() else {
89+
return;
90+
};
91+
let _ = plan.apply(|plan| {
7392
if let Some(boundary) = plan.as_network_boundary() {
7493
let stage = boundary.input_stage();
7594
for i in 0..stage.task_count() {
@@ -94,15 +113,27 @@ impl DistributedExec {
94113
/// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
95114
/// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
96115
/// called.
97-
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
98-
self.prepared_plan
116+
pub(crate) fn plan_for_viz(&self) -> Result<Arc<dyn ExecutionPlan>> {
117+
self.plan_for_viz
99118
.lock()
100119
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
101120
.clone()
102121
.ok_or_else(|| {
103122
internal_datafusion_err!("No prepared plan found. Was execute() called?")
104123
})
105124
}
125+
126+
/// Returns the head stage that was actually executed. Unlike [`Self::plan_for_viz`] (which is
127+
/// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor
128+
/// `Arc`s), this returns the original `Arc` instances whose metrics were populated during
129+
/// execution.
130+
pub(crate) fn head_stage(&self) -> Result<Arc<dyn ExecutionPlan>> {
131+
self.head_stage
132+
.lock()
133+
.map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))?
134+
.clone()
135+
.ok_or_else(|| internal_datafusion_err!("No head stage found. Was execute() called?"))
136+
}
106137
}
107138

108139
impl DisplayAs for DistributedExec {
@@ -121,20 +152,21 @@ impl ExecutionPlan for DistributedExec {
121152
}
122153

123154
fn properties(&self) -> &Arc<PlanProperties> {
124-
self.plan.properties()
155+
self.base_plan.properties()
125156
}
126157

127158
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
128-
vec![&self.plan]
159+
vec![&self.base_plan]
129160
}
130161

131162
fn with_new_children(
132163
self: Arc<Self>,
133164
children: Vec<Arc<dyn ExecutionPlan>>,
134165
) -> Result<Arc<dyn ExecutionPlan>> {
135166
Ok(Arc::new(DistributedExec {
136-
plan: require_one_child(&children)?,
137-
prepared_plan: self.prepared_plan.clone(),
167+
base_plan: require_one_child(&children)?,
168+
plan_for_viz: Arc::new(Mutex::new(None)),
169+
head_stage: Arc::new(Mutex::new(None)),
138170
metrics: self.metrics.clone(),
139171
metrics_store: self.metrics_store.clone(),
140172
}))
@@ -155,36 +187,43 @@ impl ExecutionPlan for DistributedExec {
155187
);
156188
}
157189

158-
let PreparedPlan {
159-
head_stage,
160-
join_set,
161-
} = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
162-
{
163-
let mut guard = self
164-
.prepared_plan
165-
.lock()
166-
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
167-
*guard = Some(head_stage.clone());
168-
}
190+
let base_plan = Arc::clone(&self.base_plan);
191+
let plan_for_viz = Arc::clone(&self.plan_for_viz);
192+
let head_stage = Arc::clone(&self.head_stage);
193+
194+
let query_coordinator = QueryCoordinator::new(
195+
Arc::clone(&context),
196+
&self.metrics,
197+
self.metrics_store.clone(),
198+
);
199+
169200
let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
170201
let tx = builder.tx();
171-
// Spawn the task that pulls data from child...
202+
172203
builder.spawn(async move {
173-
let mut stream = head_stage.execute(partition, context)?;
204+
let _guard = query_coordinator.end_query_guard();
205+
206+
let result = prepare_static_plan(&query_coordinator, &base_plan)?;
207+
208+
plan_for_viz
209+
.lock()
210+
.expect("poisoned lock")
211+
.replace(result.plan_for_viz);
212+
head_stage
213+
.lock()
214+
.expect("poisoned lock")
215+
.replace(Arc::clone(&result.head_stage));
216+
let mut stream = result.head_stage.execute(partition, context)?;
174217
while let Some(msg) = stream.next().await {
175218
if tx.send(msg).await.is_err() {
176219
break; // channel closed
177220
}
178221
}
222+
drop(tx);
223+
query_coordinator.drain_pending_tasks().await?;
179224
Ok(())
180225
});
181-
// ...in parallel to the one that feeds the plan to workers.
182-
builder.spawn(async move {
183-
for res in join_set.join_all().await {
184-
res?;
185-
}
186-
Ok(())
187-
});
226+
188227
Ok(builder.build())
189228
}
190229

src/coordinator/latency_metric.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use datafusion::common::instant::Instant;
2+
use datafusion::physical_expr_common::metrics::{
3+
ExecutionPlanMetricsSet, MetricBuilder, MetricValue, Time,
4+
};
5+
use std::fmt::Display;
6+
use std::sync::atomic::{AtomicU64, Ordering};
7+
use std::time::Duration;
8+
9+
/// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct
10+
/// bridges the gaps that are not satisfied by upstream API for measuring latency.
11+
pub(super) struct LatencyMetric {
12+
max: Time,
13+
avg: Time,
14+
max_latency_micros: AtomicU64,
15+
sum_latency_micros: AtomicU64,
16+
count_latency_micros: AtomicU64,
17+
}
18+
19+
impl Drop for LatencyMetric {
20+
fn drop(&mut self) {
21+
self.max.add_duration(Duration::from_micros(
22+
self.max_latency_micros.load(Ordering::Relaxed),
23+
));
24+
self.avg.add_duration(Duration::from_micros(
25+
self.sum_latency_micros.load(Ordering::Relaxed)
26+
/ self.count_latency_micros.load(Ordering::Relaxed).max(1),
27+
));
28+
}
29+
}
30+
31+
impl LatencyMetric {
32+
pub(super) fn new(
33+
name: impl Display,
34+
builder: impl Fn(MetricBuilder) -> MetricBuilder,
35+
metrics: &ExecutionPlanMetricsSet,
36+
) -> Self {
37+
let max = Time::new();
38+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
39+
name: format!("{name}_max").into(),
40+
time: max.clone(),
41+
});
42+
let avg = Time::new();
43+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
44+
name: format!("{name}_avg").into(),
45+
time: avg.clone(),
46+
});
47+
Self {
48+
max,
49+
avg,
50+
max_latency_micros: AtomicU64::new(0),
51+
sum_latency_micros: AtomicU64::new(0),
52+
count_latency_micros: AtomicU64::new(0),
53+
}
54+
}
55+
56+
pub(super) fn record(&self, start: &Instant) {
57+
let micros = start.elapsed().as_micros() as u64;
58+
self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
59+
self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
60+
self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
61+
}
62+
}

src/coordinator/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
mod distributed;
2+
mod latency_metric;
23
mod metrics_store;
34
mod prepare_static_plan;
4-
mod task_spawner;
5+
mod query_coordinator;
56

67
pub use distributed::DistributedExec;
78
pub(crate) use metrics_store::MetricsStore;
Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,10 @@
1-
use crate::coordinator::MetricsStore;
21
use crate::coordinator::distributed::PreparedPlan;
3-
use crate::coordinator::task_spawner::{
4-
CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
5-
};
2+
use crate::coordinator::query_coordinator::QueryCoordinator;
63
use crate::stage::RemoteStage;
7-
use crate::{
8-
DistributedConfig, NetworkBoundaryExt, Stage, TaskEstimator, TaskRoutingContext,
9-
get_distributed_worker_resolver,
10-
};
11-
use datafusion::common::runtime::JoinSet;
4+
use crate::{NetworkBoundaryExt, Stage};
125
use datafusion::common::tree_node::{Transformed, TreeNode};
136
use datafusion::common::{Result, exec_err};
14-
use datafusion::execution::TaskContext;
157
use datafusion::physical_plan::ExecutionPlan;
16-
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17-
use rand::Rng;
188
use std::sync::Arc;
199

2010
/// Prepares the distributed plan for execution, which implies:
@@ -28,18 +18,9 @@ use std::sync::Arc;
2818
/// 4. Spawn a background task per worker that waits for the worker to finish and collects
2919
/// its metrics into [DistributedExec::task_metrics] via the coordinator channel.
3020
pub(super) fn prepare_static_plan(
21+
query_coordinator: &QueryCoordinator,
3122
base_plan: &Arc<dyn ExecutionPlan>,
32-
metrics: &ExecutionPlanMetricsSet,
33-
task_metrics: &Option<Arc<MetricsStore>>,
34-
ctx: &Arc<TaskContext>,
3523
) -> Result<PreparedPlan> {
36-
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
37-
38-
let available_urls = worker_resolver.get_urls()?;
39-
40-
let metrics = CoordinatorToWorkerMetrics::new(metrics);
41-
42-
let mut join_set = JoinSet::new();
4324
let prepared = Arc::clone(base_plan).transform_up(|plan| {
4425
// The following logic is just applied on network boundaries.
4526
let Some(plan) = plan.as_network_boundary() else {
@@ -50,46 +31,18 @@ pub(super) fn prepare_static_plan(
5031
return exec_err!("Input stage from network boundary was not in Local state");
5132
};
5233

53-
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
54-
let task_estimator = &d_cfg.__private_task_estimator;
55-
56-
let mut spawner =
57-
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, task_metrics, ctx, &mut join_set)?;
34+
let mut stage_coordinator = query_coordinator.stage_coordinator(stage);
5835

59-
let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
60-
task_ctx: Arc::clone(ctx),
61-
plan: &stage.plan,
62-
task_count: stage.tasks,
63-
available_urls: &available_urls,
64-
}) {
65-
Ok(Some(routed_urls)) => routed_urls,
66-
// If the user has not defined custom routing with a `route_tasks` implementation, we
67-
// default to round-robin task assignation from a randomized starting point.
68-
Ok(None) => {
69-
let start_idx = rand::rng().random_range(0..available_urls.len());
70-
(0..stage.tasks)
71-
.map(|i| available_urls[(start_idx + i) % available_urls.len()].clone())
72-
.collect()
73-
}
74-
Err(e) => return exec_err!("error routing tasks to workers: {e}"),
75-
};
76-
77-
if routed_urls.len() != stage.tasks {
78-
return exec_err!(
79-
"number of tasks ({}) was not equal to number of urls ({}) at execution time",
80-
stage.tasks,
81-
routed_urls.len()
82-
);
83-
}
36+
let routed_urls = stage_coordinator.routed_urls()?;
8437

8538
let mut workers = Vec::with_capacity(stage.tasks);
8639
for (i, routed_url) in routed_urls.into_iter().enumerate() {
8740
workers.push(routed_url.clone());
8841
// Spawn a task that sends the subplan to the chosen URL.
8942
// There will be as many spawned tasks as workers.
90-
let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?;
91-
spawner.metrics_collection_task(i, worker_rx);
92-
spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?;
43+
let (worker_tx, worker_rx) = stage_coordinator.send_plan_task(i, routed_url)?;
44+
stage_coordinator.worker_to_coordinator_task(i, worker_rx);
45+
stage_coordinator.coordinator_to_worker_task(i, worker_tx)?;
9346
}
9447

9548
Ok(Transformed::yes(plan.with_input_stage(Stage::Remote(
@@ -102,6 +55,8 @@ pub(super) fn prepare_static_plan(
10255
})?;
10356
Ok(PreparedPlan {
10457
head_stage: prepared.data,
105-
join_set,
58+
// If the plan was statically planned, the base plan is the same one that will be used for
59+
// visualization.
60+
plan_for_viz: Arc::clone(base_plan),
10661
})
10762
}

0 commit comments

Comments
 (0)