Skip to content

Commit fc261f1

Browse files
committed
Refactor coordinator module and ensure cache invalidation on coordinator->worker stream drop
1 parent 41f359e commit fc261f1

11 files changed

Lines changed: 458 additions & 316 deletions

File tree

src/coordinator/distributed.rs

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::common::{require_one_child, serialize_uuid};
22
use crate::coordinator::metrics_store::MetricsStore;
33
use crate::coordinator::prepare_static_plan::prepare_static_plan;
4+
use crate::coordinator::query_coordinator::QueryCoordinator;
45
use crate::distributed_planner::NetworkBoundaryExt;
56
use crate::worker::generated::worker::TaskKey;
67
use datafusion::common::internal_datafusion_err;
7-
use datafusion::common::runtime::JoinSet;
88
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
99
use datafusion::common::{Result, exec_err};
1010
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -14,8 +14,7 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
1414
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
1515
use futures::StreamExt;
1616
use std::fmt::Formatter;
17-
use std::sync::Arc;
18-
use std::sync::Mutex;
17+
use std::sync::{Arc, Mutex};
1918

2019
/// [ExecutionPlan] that executes the inner plan in distributed mode.
2120
/// Before executing it, two modifications are lazily performed on the plan:
@@ -26,22 +25,39 @@ use std::sync::Mutex;
2625
/// over the wire.
2726
#[derive(Debug)]
2827
pub struct DistributedExec {
29-
plan: Arc<dyn ExecutionPlan>,
30-
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
28+
/// Initial [ExecutionPlan] present before execution.
29+
/// - If the plan was distributed statically, this will be the final distributed plan with all
30+
/// the appropriate network boundaries in it.
31+
/// - If the plan is going to be distributed dynamically during execution, this is the initial
32+
/// non-distributed plan.
33+
base_plan: Arc<dyn ExecutionPlan>,
34+
/// Resulting [ExecutionPlan] after execution ready for visualization purposes.
35+
/// - If the plan was distributed statically, this is equal to the base plan.
36+
/// - If the plan is going to be distributed dynamically during execution, this is the resulting
37+
/// plan re-calculated based on runtime statistics.
38+
plan_for_viz: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
39+
/// The head stage meant to be executed locally on [DistributedExec::execute].
40+
head_stage: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
41+
/// DataFusion metrics.
3142
metrics: ExecutionPlanMetricsSet,
43+
/// Storage where metrics collected from workers at runtime will place their results as they
44+
/// finish their respective remote tasks.
3245
pub(crate) metrics_store: Option<Arc<MetricsStore>>,
3346
}
3447

3548
pub(super) struct PreparedPlan {
49+
/// The head stage meant to be executed locally by the coordinator.
3650
pub(super) head_stage: Arc<dyn ExecutionPlan>,
37-
pub(super) join_set: JoinSet<Result<()>>,
51+
/// A final representation of the plan for visualization purposes.
52+
pub(super) plan_for_viz: Arc<dyn ExecutionPlan>,
3853
}
3954

4055
impl DistributedExec {
41-
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
56+
pub fn new(base_plan: Arc<dyn ExecutionPlan>) -> Self {
4257
Self {
43-
plan,
44-
prepared_plan: Arc::new(Mutex::new(None)),
58+
base_plan,
59+
plan_for_viz: Arc::new(Mutex::new(None)),
60+
head_stage: Arc::new(Mutex::new(None)),
4561
metrics: ExecutionPlanMetricsSet::new(),
4662
metrics_store: None,
4763
}
@@ -68,7 +84,10 @@ impl DistributedExec {
6884
let Some(task_metrics) = &self.metrics_store else {
6985
return;
7086
};
71-
let _ = self.plan.apply(|plan| {
87+
let Some(plan) = self.plan_for_viz.lock().unwrap().as_ref().cloned() else {
88+
return;
89+
};
90+
let _ = plan.apply(|plan| {
7291
if let Some(boundary) = plan.as_network_boundary() {
7392
let stage = boundary.input_stage();
7493
for i in 0..stage.task_count() {
@@ -93,15 +112,27 @@ impl DistributedExec {
93112
/// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
94113
/// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
95114
/// called.
96-
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
97-
self.prepared_plan
115+
pub(crate) fn plan_for_viz(&self) -> Result<Arc<dyn ExecutionPlan>> {
116+
self.plan_for_viz
98117
.lock()
99118
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
100119
.clone()
101120
.ok_or_else(|| {
102121
internal_datafusion_err!("No prepared plan found. Was execute() called?")
103122
})
104123
}
124+
125+
/// Returns the head stage that was actually executed. Unlike [`Self::plan_for_viz`] (which is
126+
/// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor
127+
/// `Arc`s), this returns the original `Arc` instances whose metrics were populated during
128+
/// execution.
129+
pub(crate) fn head_stage(&self) -> Result<Arc<dyn ExecutionPlan>> {
130+
self.head_stage
131+
.lock()
132+
.map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))?
133+
.clone()
134+
.ok_or_else(|| internal_datafusion_err!("No head stage found. Was execute() called?"))
135+
}
105136
}
106137

107138
impl DisplayAs for DistributedExec {
@@ -116,20 +147,21 @@ impl ExecutionPlan for DistributedExec {
116147
}
117148

118149
fn properties(&self) -> &Arc<PlanProperties> {
119-
self.plan.properties()
150+
self.base_plan.properties()
120151
}
121152

122153
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123-
vec![&self.plan]
154+
vec![&self.base_plan]
124155
}
125156

126157
fn with_new_children(
127158
self: Arc<Self>,
128159
children: Vec<Arc<dyn ExecutionPlan>>,
129160
) -> Result<Arc<dyn ExecutionPlan>> {
130161
Ok(Arc::new(DistributedExec {
131-
plan: require_one_child(&children)?,
132-
prepared_plan: self.prepared_plan.clone(),
162+
base_plan: require_one_child(&children)?,
163+
plan_for_viz: Arc::new(Mutex::new(None)),
164+
head_stage: Arc::new(Mutex::new(None)),
133165
metrics: self.metrics.clone(),
134166
metrics_store: self.metrics_store.clone(),
135167
}))
@@ -150,36 +182,43 @@ impl ExecutionPlan for DistributedExec {
150182
);
151183
}
152184

153-
let PreparedPlan {
154-
head_stage,
155-
join_set,
156-
} = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
157-
{
158-
let mut guard = self
159-
.prepared_plan
160-
.lock()
161-
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
162-
*guard = Some(head_stage.clone());
163-
}
185+
let base_plan = Arc::clone(&self.base_plan);
186+
let plan_for_viz = Arc::clone(&self.plan_for_viz);
187+
let head_stage = Arc::clone(&self.head_stage);
188+
189+
let query_coordinator = QueryCoordinator::new(
190+
Arc::clone(&context),
191+
&self.metrics,
192+
self.metrics_store.clone(),
193+
);
194+
164195
let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
165196
let tx = builder.tx();
166-
// Spawn the task that pulls data from child...
197+
167198
builder.spawn(async move {
168-
let mut stream = head_stage.execute(partition, context)?;
199+
let _guard = query_coordinator.end_query_guard();
200+
201+
let result = prepare_static_plan(&query_coordinator, &base_plan)?;
202+
203+
plan_for_viz
204+
.lock()
205+
.expect("poisoned lock")
206+
.replace(result.plan_for_viz);
207+
head_stage
208+
.lock()
209+
.expect("poisoned lock")
210+
.replace(Arc::clone(&result.head_stage));
211+
let mut stream = result.head_stage.execute(partition, context)?;
169212
while let Some(msg) = stream.next().await {
170213
if tx.send(msg).await.is_err() {
171214
break; // channel closed
172215
}
173216
}
217+
drop(tx);
218+
query_coordinator.drain_pending_tasks().await?;
174219
Ok(())
175220
});
176-
// ...in parallel to the one that feeds the plan to workers.
177-
builder.spawn(async move {
178-
for res in join_set.join_all().await {
179-
res?;
180-
}
181-
Ok(())
182-
});
221+
183222
Ok(builder.build())
184223
}
185224

src/coordinator/latency_metric.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use datafusion::common::instant::Instant;
2+
use datafusion::physical_expr_common::metrics::{
3+
ExecutionPlanMetricsSet, MetricBuilder, MetricValue, Time,
4+
};
5+
use std::fmt::Display;
6+
use std::sync::atomic::{AtomicU64, Ordering};
7+
use std::time::Duration;
8+
9+
/// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct
10+
/// bridges the gaps that are not satisfied by upstream API for measuring latency.
11+
pub(super) struct LatencyMetric {
12+
max: Time,
13+
avg: Time,
14+
max_latency_micros: AtomicU64,
15+
sum_latency_micros: AtomicU64,
16+
count_latency_micros: AtomicU64,
17+
}
18+
19+
impl Drop for LatencyMetric {
20+
fn drop(&mut self) {
21+
self.max.add_duration(Duration::from_micros(
22+
self.max_latency_micros.load(Ordering::Relaxed),
23+
));
24+
self.avg.add_duration(Duration::from_micros(
25+
self.sum_latency_micros.load(Ordering::Relaxed)
26+
/ self.count_latency_micros.load(Ordering::Relaxed).max(1),
27+
));
28+
}
29+
}
30+
31+
impl LatencyMetric {
32+
pub(super) fn new(
33+
name: impl Display,
34+
builder: impl Fn(MetricBuilder) -> MetricBuilder,
35+
metrics: &ExecutionPlanMetricsSet,
36+
) -> Self {
37+
let max = Time::new();
38+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
39+
name: format!("{name}_max").into(),
40+
time: max.clone(),
41+
});
42+
let avg = Time::new();
43+
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
44+
name: format!("{name}_avg").into(),
45+
time: avg.clone(),
46+
});
47+
Self {
48+
max,
49+
avg,
50+
max_latency_micros: AtomicU64::new(0),
51+
sum_latency_micros: AtomicU64::new(0),
52+
count_latency_micros: AtomicU64::new(0),
53+
}
54+
}
55+
56+
pub(super) fn record(&self, start: &Instant) {
57+
let micros = start.elapsed().as_micros() as u64;
58+
self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
59+
self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
60+
self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
61+
}
62+
}

src/coordinator/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
mod distributed;
2+
mod latency_metric;
23
mod metrics_store;
34
mod prepare_static_plan;
4-
mod task_spawner;
5+
mod query_coordinator;
56

67
pub use distributed::DistributedExec;
78
pub(crate) use metrics_store::MetricsStore;
Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,10 @@
1-
use crate::coordinator::MetricsStore;
21
use crate::coordinator::distributed::PreparedPlan;
3-
use crate::coordinator::task_spawner::{
4-
CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
5-
};
2+
use crate::coordinator::query_coordinator::QueryCoordinator;
63
use crate::stage::RemoteStage;
7-
use crate::{
8-
DistributedConfig, NetworkBoundaryExt, Stage, TaskEstimator, TaskRoutingContext,
9-
get_distributed_worker_resolver,
10-
};
11-
use datafusion::common::runtime::JoinSet;
4+
use crate::{NetworkBoundaryExt, Stage};
125
use datafusion::common::tree_node::{Transformed, TreeNode};
136
use datafusion::common::{Result, exec_err};
14-
use datafusion::execution::TaskContext;
157
use datafusion::physical_plan::ExecutionPlan;
16-
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17-
use rand::Rng;
188
use std::sync::Arc;
199

2010
/// Prepares the distributed plan for execution, which implies:
@@ -28,18 +18,9 @@ use std::sync::Arc;
2818
/// 4. Spawn a background task per worker that waits for the worker to finish and collects
2919
/// its metrics into [DistributedExec::task_metrics] via the coordinator channel.
3020
pub(super) fn prepare_static_plan(
21+
query_coordinator: &QueryCoordinator,
3122
base_plan: &Arc<dyn ExecutionPlan>,
32-
metrics: &ExecutionPlanMetricsSet,
33-
task_metrics: &Option<Arc<MetricsStore>>,
34-
ctx: &Arc<TaskContext>,
3523
) -> Result<PreparedPlan> {
36-
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
37-
38-
let available_urls = worker_resolver.get_urls()?;
39-
40-
let metrics = CoordinatorToWorkerMetrics::new(metrics);
41-
42-
let mut join_set = JoinSet::new();
4324
let prepared = Arc::clone(base_plan).transform_up(|plan| {
4425
// The following logic is just applied on network boundaries.
4526
let Some(plan) = plan.as_network_boundary() else {
@@ -50,46 +31,18 @@ pub(super) fn prepare_static_plan(
5031
return exec_err!("Input stage from network boundary was not in Local state");
5132
};
5233

53-
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
54-
let task_estimator = &d_cfg.__private_task_estimator;
55-
56-
let mut spawner =
57-
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, task_metrics, ctx, &mut join_set)?;
34+
let mut stage_coordinator = query_coordinator.stage_coordinator(stage);
5835

59-
let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
60-
task_ctx: Arc::clone(ctx),
61-
plan: &stage.plan,
62-
task_count: stage.tasks,
63-
available_urls: &available_urls,
64-
}) {
65-
Ok(Some(routed_urls)) => routed_urls,
66-
// If the user has not defined custom routing with a `route_tasks` implementation, we
67-
// default to round-robin task assignation from a randomized starting point.
68-
Ok(None) => {
69-
let start_idx = rand::rng().random_range(0..available_urls.len());
70-
(0..stage.tasks)
71-
.map(|i| available_urls[(start_idx + i) % available_urls.len()].clone())
72-
.collect()
73-
}
74-
Err(e) => return exec_err!("error routing tasks to workers: {e}"),
75-
};
76-
77-
if routed_urls.len() != stage.tasks {
78-
return exec_err!(
79-
"number of tasks ({}) was not equal to number of urls ({}) at execution time",
80-
stage.tasks,
81-
routed_urls.len()
82-
);
83-
}
36+
let routed_urls = stage_coordinator.routed_urls()?;
8437

8538
let mut workers = Vec::with_capacity(stage.tasks);
8639
for (i, routed_url) in routed_urls.into_iter().enumerate() {
8740
workers.push(routed_url.clone());
8841
// Spawn a task that sends the subplan to the chosen URL.
8942
// There will be as many spawned tasks as workers.
90-
let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?;
91-
spawner.metrics_collection_task(i, worker_rx);
92-
spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?;
43+
let (worker_tx, worker_rx) = stage_coordinator.send_plan_task(i, routed_url)?;
44+
stage_coordinator.worker_to_coordinator_task(i, worker_rx);
45+
stage_coordinator.coordinator_to_worker_task(i, worker_tx)?;
9346
}
9447

9548
Ok(Transformed::yes(plan.with_input_stage(Stage::Remote(
@@ -102,6 +55,8 @@ pub(super) fn prepare_static_plan(
10255
})?;
10356
Ok(PreparedPlan {
10457
head_stage: prepared.data,
105-
join_set,
58+
// If the plan was statically planned, the base plan is the same one that will be used for
59+
// visualization.
60+
plan_for_viz: Arc::clone(base_plan),
10661
})
10762
}

0 commit comments

Comments
 (0)