Skip to content

Commit 29eeae8

Browse files
committed
Added an in-process InMemoryWorkerTransport.
This PR adds a `WorkerTransport` that hosts its workers in the current process: plans are delivered with a direct `Worker::set_task_plan` call and partitions are read straight from the local task registry, with no gRPC underneath. It is the reference implementation for the transport extension points and the basis for running distributed plans without the Flight stack. To keep the dispatch paths from drifting, plan delivery is factored into transport-neutral pieces both transports share: `encode_task_plan` (task specialization + codec), the `Worker::set_task_plan` core that the Flight coordinator stream now wraps, and `collect_task_work_unit_feeds`. The in-memory read side runs one `execute_local_task` over the whole partition range and pumps each partition into a buffer, so a consumer that interleaves partition polls of a partitioned join can't leave partitions empty.
1 parent 83dcfc2 commit 29eeae8

11 files changed

Lines changed: 753 additions & 172 deletions

File tree

src/coordinator/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
mod distributed;
22
mod latency_metric;
33
mod metrics_store;
4+
mod plan_encoding;
45
mod prepare_static_plan;
56
mod query_coordinator;
67

78
pub use distributed::DistributedExec;
89
pub use latency_metric::LatencyMetric;
910
pub use metrics_store::MetricsStore;
10-
pub(crate) use query_coordinator::FlightWorkerDispatch;
11+
pub use plan_encoding::{EncodedTaskPlan, encode_task_plan};
12+
pub(crate) use query_coordinator::{CoordinatorToWorkerMetrics, FlightWorkerDispatch};

src/coordinator/plan_encoding.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use crate::common::{TreeNodeExt, serialize_uuid};
2+
use crate::execution_plans::{ChildrenIsolatorUnionExec, DistributedLeafExec};
3+
use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration;
4+
use crate::{DistributedCodec, DistributedConfig, DistributedTaskContext};
5+
use datafusion::common::Result;
6+
use datafusion::common::tree_node::Transformed;
7+
use datafusion::physical_plan::ExecutionPlan;
8+
use datafusion::prelude::SessionConfig;
9+
use datafusion_proto::physical_plan::AsExecutionPlan;
10+
use datafusion_proto::protobuf::PhysicalPlanNode;
11+
use prost::Message;
12+
use std::sync::Arc;
13+
14+
/// A stage plan specialized for one task and encoded for delivery, together with the work-unit
15+
/// feeds it declares.
16+
pub struct EncodedTaskPlan {
17+
/// Protobuf-encoded bytes of the task-specialized physical plan.
pub plan_proto: Vec<u8>,
18+
/// One declaration per plan node that has a registered work-unit feed, carrying the feed id
/// and that node's partition count.
pub feed_declarations: Vec<WorkUnitFeedDeclaration>,
19+
/// Output partitions of the specialized plan. Task-isolated nodes (a union running one child
20+
/// per task) make this differ from the unspecialized stage plan, so a push-based transport
21+
/// sizes its per-task sinks from this. The pull transports ignore it; a push transport (such
22+
/// as the shared-memory one) reads it.
23+
pub partitions: usize,
24+
}
25+
26+
/// Specializes `plan` for `task_index` and encodes it with the session's combined codec.
27+
///
28+
/// Shared by every transport's dispatch path so task specialization and codec selection cannot
29+
/// drift between transports: each task must see its own slice of task-isolated nodes (a union
30+
/// executing one child per task, an unexecuted [DistributedLeafExec] variant), and the worker
31+
/// decodes with the same codec stack the coordinator encoded with.
///
/// # Errors
///
/// Returns an error if the distributed config cannot be read from `cfg`, if the plan traversal
/// fails, or if the specialized plan cannot be converted to its protobuf representation.
32+
pub fn encode_task_plan(
33+
plan: &Arc<dyn ExecutionPlan>,
34+
task_index: usize,
35+
task_count: usize,
36+
cfg: &SessionConfig,
37+
) -> Result<EncodedTaskPlan> {
38+
let d_cfg = DistributedConfig::from_config_options(cfg.options())?;
39+
let wuf_registry = &d_cfg.__private_work_unit_feed_registry;
40+
41+
// Filled in by the traversal below as a side effect of visiting each node.
let mut feed_declarations = vec![];
42+
let d_ctx = DistributedTaskContext {
43+
task_index,
44+
task_count,
45+
};
46+
47+
let specialized = Arc::clone(plan).transform_down_with_dt_ctx(d_ctx, |plan, d_ctx| {
48+
// Record a declaration (feed id + this node's partition count) for every node that has a
// registered work-unit feed. This does not stop the node from also being specialized below.
if let Some(wuf) = wuf_registry.get_work_unit_feed(&plan) {
49+
feed_declarations.push(WorkUnitFeedDeclaration {
50+
id: serialize_uuid(&wuf.id()),
51+
partitions: plan.properties().partitioning.partition_count() as u64,
52+
});
53+
};
54+
55+
// Task-isolated nodes are replaced by their variant for the current task index.
if let Some(ciu) = plan.downcast_ref::<ChildrenIsolatorUnionExec>() {
56+
let ciu = ciu.to_task_specialized(d_ctx.task_index);
57+
return Ok(Transformed::yes(Arc::new(ciu)));
58+
};
59+
60+
if let Some(dle) = plan.downcast_ref::<DistributedLeafExec>() {
61+
let specialized = dle.to_task_specialized(d_ctx.task_index);
62+
return Ok(Transformed::yes(specialized));
63+
}
64+
65+
// Every other node is passed through unchanged.
Ok(Transformed::no(plan))
66+
})?;
67+
68+
let codec = DistributedCodec::new_combined_with_user(cfg);
69+
// Read the partition count first: `specialized.data` is moved into the encoder right after.
let partitions = specialized.data.properties().partitioning.partition_count();
70+
let plan_proto =
71+
PhysicalPlanNode::try_from_physical_plan(specialized.data, &codec)?.encode_to_vec();
72+
73+
Ok(EncodedTaskPlan {
74+
plan_proto,
75+
feed_declarations,
76+
partitions,
77+
})
78+
}

src/coordinator/query_coordinator.rs

Lines changed: 13 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,28 @@ use crate::common::{TreeNodeExt, now_ns, serialize_uuid, task_ctx_with_extension
22
use crate::config_extension_ext::get_config_extension_propagation_headers;
33
use crate::coordinator::MetricsStore;
44
use crate::coordinator::latency_metric::LatencyMetric;
5-
use crate::execution_plans::{ChildrenIsolatorUnionExec, DistributedLeafExec};
5+
use crate::coordinator::plan_encoding::encode_task_plan;
66
use crate::passthrough_headers::get_passthrough_headers;
77
use crate::protobuf::tonic_status_to_datafusion_error;
88
use crate::work_unit_feed::{build_work_unit_batch_msg, set_work_unit_send_time};
99
use crate::worker::generated::worker as pb;
1010
use crate::worker::generated::worker::coordinator_to_worker_msg::Inner;
11-
use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration;
1211
use crate::worker::{WorkerDispatch, WorkerDispatchRequest};
1312
use crate::{
14-
BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec,
15-
DistributedConfig, DistributedTaskContext, DistributedWorkUnitFeedContext, TaskKey,
13+
BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedConfig,
14+
DistributedTaskContext, DistributedWorkUnitFeedContext, TaskKey,
1615
get_distributed_channel_resolver,
1716
};
1817
use datafusion::common::Result;
1918
use datafusion::common::instant::Instant;
2019
use datafusion::common::runtime::JoinSet;
21-
use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
20+
use datafusion::common::tree_node::TreeNodeRecursion;
2221
use datafusion::common::{DataFusionError, exec_datafusion_err};
2322
use datafusion::execution::TaskContext;
2423
use datafusion::physical_expr_common::metrics::{ExecutionPlanMetricsSet, Label, MetricBuilder};
2524
use datafusion::physical_plan::ExecutionPlan;
26-
use datafusion_proto::physical_plan::AsExecutionPlan;
27-
use datafusion_proto::protobuf::PhysicalPlanNode;
2825
use futures::{Stream, StreamExt};
2926
use http::Extensions;
30-
use prost::Message;
3127
use std::sync::{Arc, OnceLock};
3228
use tokio::sync::Notify;
3329
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -128,13 +124,9 @@ impl<'a> StageCoordinator<'a> {
128124
UnboundedReceiver<pb::WorkerToCoordinatorMsg>,
129125
)> {
130126
let session_config = self.task_ctx.session_config();
131-
let codec = DistributedCodec::new_combined_with_user(session_config);
132127

133-
let (specialized, work_unit_feed_declarations) = self.task_specialized_plan(task_i)?;
134-
135-
let plan_proto =
136-
PhysicalPlanNode::try_from_physical_plan(specialized, &codec)?.encode_to_vec();
137-
let plan_size = plan_proto.len();
128+
let encoded = encode_task_plan(self.plan, task_i, self.task_count, session_config)?;
129+
let plan_size = encoded.plan_proto.len();
138130

139131
let task_key = TaskKey {
140132
query_id: serialize_uuid(&self.query_id),
@@ -143,10 +135,10 @@ impl<'a> StageCoordinator<'a> {
143135
};
144136
let msg = pb::CoordinatorToWorkerMsg {
145137
inner: Some(Inner::SetPlanRequest(pb::SetPlanRequest {
146-
plan_proto,
138+
plan_proto: encoded.plan_proto,
147139
task_count: self.task_count as u64,
148140
task_key: Some(task_key.clone()),
149-
work_unit_feed_declarations,
141+
work_unit_feed_declarations: encoded.feed_declarations,
150142
target_worker_url: url.to_string(),
151143
query_start_time_ns: self.metrics.instantiation_time,
152144
})),
@@ -309,48 +301,6 @@ impl<'a> StageCoordinator<'a> {
309301
});
310302
Ok(())
311303
}
312-
313-
/// Specializes the [Arc<dyn ExecutionPlan>] for this stage to the provided task index. This implies
314-
/// trimming down any unnecessary information that the specific `task_i` task is not going to
315-
/// need, like unexecuted branches in [ChildrenIsolatorUnionExec], or unexecuted variants of
316-
/// [DistributedLeafExec].
317-
fn task_specialized_plan(
318-
&self,
319-
task_i: usize,
320-
) -> Result<(Arc<dyn ExecutionPlan>, Vec<WorkUnitFeedDeclaration>)> {
321-
let session_config = self.task_ctx.session_config();
322-
let d_cfg = DistributedConfig::from_config_options(session_config.options())?;
323-
let wuf_registry = &d_cfg.__private_work_unit_feed_registry;
324-
325-
let mut work_unit_feed_declarations = vec![];
326-
let d_ctx = DistributedTaskContext {
327-
task_index: task_i,
328-
task_count: self.task_count,
329-
};
330-
331-
let plan = Arc::clone(self.plan);
332-
let transformed = plan.transform_down_with_dt_ctx(d_ctx, |plan, d_ctx| {
333-
if let Some(wuf) = wuf_registry.get_work_unit_feed(&plan) {
334-
work_unit_feed_declarations.push(WorkUnitFeedDeclaration {
335-
id: serialize_uuid(&wuf.id()),
336-
partitions: plan.properties().partitioning.partition_count() as u64,
337-
});
338-
};
339-
340-
if let Some(ciu) = plan.downcast_ref::<ChildrenIsolatorUnionExec>() {
341-
let ciu = ciu.to_task_specialized(d_ctx.task_index);
342-
return Ok(Transformed::yes(Arc::new(ciu)));
343-
};
344-
345-
if let Some(dle) = plan.downcast_ref::<DistributedLeafExec>() {
346-
let specialized = dle.to_task_specialized(d_ctx.task_index);
347-
return Ok(Transformed::yes(specialized));
348-
}
349-
350-
Ok(Transformed::no(plan))
351-
})?;
352-
Ok((transformed.data, work_unit_feed_declarations))
353-
}
354304
}
355305

356306
fn keep_stream_alive<T: 'static>(notify: Arc<Notify>) -> impl Stream<Item = T> + 'static {
@@ -359,14 +309,14 @@ fn keep_stream_alive<T: 'static>(notify: Arc<Notify>) -> impl Stream<Item = T> +
359309

360310
/// Metrics that measure network details about communications between [DistributedExec] and a worker.
361311
#[derive(Clone)]
362-
pub(super) struct CoordinatorToWorkerMetrics {
363-
pub(super) plan_bytes_sent: BytesCounterMetric,
364-
pub(super) plan_send_latency: Arc<LatencyMetric>,
365-
pub(super) instantiation_time: u64,
312+
pub(crate) struct CoordinatorToWorkerMetrics {
313+
pub(crate) plan_bytes_sent: BytesCounterMetric,
314+
pub(crate) plan_send_latency: Arc<LatencyMetric>,
315+
pub(crate) instantiation_time: u64,
366316
}
367317

368318
impl CoordinatorToWorkerMetrics {
369-
pub(super) fn new(metrics: &ExecutionPlanMetricsSet) -> Self {
319+
pub(crate) fn new(metrics: &ExecutionPlanMetricsSet) -> Self {
370320
Self {
371321
// Metric that measures the total sum of bytes worth of subplans sent.
372322
plan_bytes_sent: MetricBuilder::new(metrics)

src/lib.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ pub use common::{
2424
TreeNodeExt, deserialize_uuid, get_distributed_cancellation_token, serialize_uuid,
2525
};
2626
pub use config_extension_ext::get_config_extension_propagation_headers;
27-
pub use coordinator::{DistributedExec, LatencyMetric, MetricsStore};
27+
pub use coordinator::{
28+
DistributedExec, EncodedTaskPlan, LatencyMetric, MetricsStore, encode_task_plan,
29+
};
2830
pub use distributed_ext::DistributedExt;
2931
pub use distributed_planner::{
3032
DistributedConfig, NetworkBoundary, NetworkBoundaryExt, PartitionRoute, ProducerHead,
@@ -51,15 +53,18 @@ pub use stage::{
5153
explain_analyze,
5254
};
5355
pub use work_unit_feed::{
54-
DistributedWorkUnitFeedContext, WorkUnit, WorkUnitFeed, WorkUnitFeedProto, WorkUnitFeedProvider,
56+
DistributedWorkUnitFeedContext, RemoteWorkUnitFeedRegistry, RemoteWorkUnitFeedRxs,
57+
RemoteWorkUnitFeedTxs, WorkUnit, WorkUnitFeed, WorkUnitFeedProto, WorkUnitFeedProvider,
58+
WorkUnitRx, WorkUnitTx, collect_task_work_unit_feeds, set_received_time, set_sent_time,
5559
};
5660
pub use worker::generated::worker::worker_service_client::WorkerServiceClient;
5761
pub use worker::generated::worker::worker_service_server::WorkerServiceServer;
5862
pub use worker::generated::worker::{GetWorkerInfoRequest, GetWorkerInfoResponse, TaskKey};
5963
pub use worker::{
60-
DefaultSessionBuilder, FlightWorkerTransport, MappedWorkerSessionBuilder,
61-
MappedWorkerSessionBuilderExt, TaskData, Worker, WorkerConnection, WorkerDispatch,
62-
WorkerDispatchRequest, WorkerQueryContext, WorkerSessionBuilder, WorkerTransport,
64+
DefaultSessionBuilder, FlightWorkerTransport, InMemoryWorkerTransport,
65+
MappedWorkerSessionBuilder, MappedWorkerSessionBuilderExt, TaskData, Worker, WorkerConnection,
66+
WorkerDispatch, WorkerDispatchRequest, WorkerQueryContext, WorkerSessionBuilder,
67+
WorkerTransport,
6368
};
6469

6570
pub use observability::{

src/work_unit_feed/coordinator.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use crate::common::{TreeNodeExt, task_ctx_with_extension};
2+
use crate::work_unit_feed::remote_work_unit_feed::build_work_unit;
3+
use crate::worker::generated::worker as pb;
4+
use crate::{DistributedConfig, DistributedTaskContext, DistributedWorkUnitFeedContext};
5+
use datafusion::common::Result;
6+
use datafusion::common::tree_node::TreeNodeRecursion;
7+
use datafusion::execution::TaskContext;
8+
use datafusion::physical_plan::ExecutionPlan;
9+
use futures::StreamExt;
10+
use futures::stream::BoxStream;
11+
use std::sync::Arc;
12+
13+
/// Drives every registered [crate::WorkUnitFeed] in `plan` for one task and returns its
14+
/// per-partition work-unit streams, already encoded.
15+
///
16+
/// Transport-neutral: the Flight dispatch wraps each [pb::WorkUnit] in its envelope and pushes it
17+
/// over the coordinator-to-worker gRPC stream; an in-process transport hands them straight to the
18+
/// worker-side channels. Each unit carries its `(feed id, task-local partition)`; two tasks emit
19+
/// identical pairs, so routing to the right task stays the transport's job, as with Flight's
20+
/// per-task stream. A task owns a non-overlapping window of partition feeds, offset by its task
21+
/// index, and each partition feed can be consumed once.
///
/// The returned vector holds one stream per partition of every node that has a registered feed,
/// appended in the order the traversal visits those nodes.
///
/// # Errors
///
/// Returns an error if the distributed config cannot be read from the session config of `ctx`,
/// if the plan traversal fails, or if a feed fails to open one of its partition streams.
22+
pub fn collect_task_work_unit_feeds(
23+
plan: &Arc<dyn ExecutionPlan>,
24+
ctx: &Arc<TaskContext>,
25+
task_index: usize,
26+
task_count: usize,
27+
) -> Result<Vec<BoxStream<'static, Result<pb::WorkUnit>>>> {
28+
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
29+
let registry = &d_cfg.__private_work_unit_feed_registry;
30+
31+
let d_ctx = DistributedTaskContext {
32+
task_index,
33+
task_count,
34+
};
35+
let mut streams = vec![];
36+
plan.apply_with_dt_ctx(d_ctx, |plan, d_ctx| {
37+
// Nodes without a registered feed contribute no streams; keep walking the plan.
let Some(wuf) = registry.get_work_unit_feed(plan) else {
38+
return Ok(TreeNodeRecursion::Continue);
39+
};
40+
41+
// This task's window of feed indices: `partitions` consecutive indices starting at
// `partitions * task_index`.
let partitions = plan.properties().partitioning.partition_count();
42+
let start_partition = partitions * d_ctx.task_index;
43+
let end_partition = start_partition + partitions;
44+
45+
let dist_feed_ctx = DistributedWorkUnitFeedContext {
46+
fan_out_tasks: d_ctx.task_count,
47+
};
48+
let t_ctx = Arc::new(task_ctx_with_extension(ctx, dist_feed_ctx));
49+
let id = wuf.id();
50+
51+
// `feed_idx` is the index inside the window above and is what the feed is opened with;
// `partition` (0-based within this task) is what gets stamped on every emitted unit.
for (partition, feed_idx) in (start_partition..end_partition).enumerate() {
52+
let stream = wuf
53+
.feed(feed_idx, Arc::clone(&t_ctx))?
54+
.map(move |res| res.map(|wu| build_work_unit(&id, partition, wu)))
55+
.boxed();
56+
streams.push(stream);
57+
}
58+
Ok(TreeNodeRecursion::Continue)
59+
})?;
60+
Ok(streams)
61+
}

src/work_unit_feed/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1+
mod coordinator;
12
mod remote_work_unit_feed;
23
mod work_unit;
34
#[allow(clippy::module_inception)]
45
mod work_unit_feed;
56
mod work_unit_feed_provider;
67
mod work_unit_feed_registry;
78

9+
pub use coordinator::collect_task_work_unit_feeds;
10+
pub use remote_work_unit_feed::{
11+
RemoteWorkUnitFeedRegistry, RemoteWorkUnitFeedRxs, RemoteWorkUnitFeedTxs, WorkUnitRx,
12+
WorkUnitTx, set_received_time, set_sent_time,
13+
};
814
pub(crate) use remote_work_unit_feed::{
9-
RemoteWorkUnitFeedRegistry, build_work_unit_batch_msg, set_work_unit_received_time,
10-
set_work_unit_send_time,
15+
build_work_unit_batch_msg, set_work_unit_received_time, set_work_unit_send_time,
1116
};
1217
pub(crate) use work_unit_feed_registry::{WorkUnitFeedRegistry, set_distributed_work_unit_feed};
1318

0 commit comments

Comments
 (0)