Skip to content

Commit dd314a3

Browse files
committed
Adapt planner so that plan distribution can be made async
1 parent f0ae220 commit dd314a3

25 files changed

Lines changed: 217 additions & 182 deletions

benchmarks/cdk/bin/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use datafusion::execution::runtime_env::RuntimeEnv;
1010
use datafusion::physical_plan::execute_stream;
1111
use datafusion::prelude::SessionContext;
1212
use datafusion_distributed::{
13-
ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule,
14-
Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
13+
ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker,
14+
WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
1515
get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics,
1616
};
1717
use futures::{StreamExt, TryFutureExt};
@@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
9494
.with_default_features()
9595
.with_runtime_env(Arc::clone(&runtime_env))
9696
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
97-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
97+
.with_distributed_planner()
9898
.with_distributed_broadcast_joins(cmd.broadcast_joins)?
9999
.build();
100100
let ctx = SessionContext::from(state);

benchmarks/src/run.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
2828
use datafusion::physical_plan::{collect, displayable};
2929
use datafusion::prelude::*;
3030
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
31-
use datafusion_distributed::{
32-
DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt, Worker,
33-
};
31+
use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker};
3432
use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch};
3533
use std::error::Error;
3634
use std::fs;
@@ -178,7 +176,7 @@ impl RunOpt {
178176
.with_default_features()
179177
.with_config(self.config()?)
180178
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
181-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
179+
.with_distributed_planner()
182180
.with_distributed_files_per_task(
183181
self.files_per_task.unwrap_or(get_available_parallelism()),
184182
)?

cli/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_cli::{
3636
use datafusion_distributed::test_utils::in_memory_channel_resolver::{
3737
InMemoryChannelResolver, InMemoryWorkerResolver,
3838
};
39-
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};
39+
use datafusion_distributed::{DistributedExt, SessionStateBuilderExt};
4040
use std::env;
4141
use std::path::Path;
4242
use std::process::ExitCode;
@@ -148,7 +148,7 @@ async fn main_inner() -> Result<()> {
148148
.with_default_features()
149149
.with_config(session_config)
150150
.with_runtime_env(runtime_env)
151-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
151+
.with_distributed_planner()
152152
.with_distributed_worker_resolver(InMemoryWorkerResolver::new(16))
153153
.with_distributed_channel_resolver(InMemoryChannelResolver::default())
154154
.build();

console/examples/console_run.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ use datafusion::common::DataFusionError;
44
use datafusion::execution::SessionStateBuilder;
55
use datafusion::prelude::{ParquetReadOptions, SessionContext};
66
use datafusion_distributed::{
7-
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
7+
DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii,
88
};
99
use futures::TryStreamExt;
1010
use std::error::Error;
11-
use std::sync::Arc;
1211
use structopt::StructOpt;
1312
use url::Url;
1413

@@ -38,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
3837
let state = SessionStateBuilder::new()
3938
.with_default_features()
4039
.with_distributed_worker_resolver(localhost_resolver)
41-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
40+
.with_distributed_planner()
4241
.with_distributed_files_per_task(1)?
4342
.build();
4443

console/examples/tpcds_runner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use datafusion::execution::SessionStateBuilder;
44
use datafusion::physical_plan::execute_stream;
55
use datafusion::prelude::SessionContext;
66
use datafusion_distributed::{
7-
DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, WorkerResolver,
7+
DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, WorkerResolver,
88
display_plan_ascii,
99
};
1010
use datafusion_distributed_benchmarks::datasets::{register_tables, tpcds};
@@ -97,7 +97,7 @@ async fn run_queries(
9797
let state = SessionStateBuilder::new()
9898
.with_default_features()
9999
.with_distributed_worker_resolver(localhost_resolver)
100-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
100+
.with_distributed_planner()
101101
.build();
102102

103103
let ctx = SessionContext::from(state);

docs/source/user-guide/channel-resolver.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async fn main() {
4343
let state = SessionStateBuilder::new()
4444
// these two are mandatory.
4545
.with_distributed_worker_resolver(todo!())
46-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
46+
.with_distributed_planner()
4747
// the CustomChannelResolver needs to be passed here once...
4848
.with_distributed_channel_resolver(channel_resolver.clone())
4949
.build();

docs/source/user-guide/concepts.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ You'll see these concepts mentioned extensively across the documentation and the
2424

2525
Some other more tangible concepts are the structs and traits exposed publicly, the most important are:
2626

27-
## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)
27+
## [SessionStateBuilderExt](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/session_state_builder_ext.rs)
2828

29-
A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
30-
a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.
29+
An extension trait for `SessionStateBuilder` that provides `with_distributed_planner()`. This registers a custom
30+
query planner that transforms single-node DataFusion query plans into distributed query plans after physical planning.
3131

3232
It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
3333
the nodes present in the original plan.

docs/source/user-guide/getting-started.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl WorkerResolver for LocalhostWorkerResolver {
4949
}
5050
```
5151

52-
Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
52+
Register the `WorkerResolver` implementation and the distributed planner in DataFusion's
5353
`SessionStateBuilder` to enable distributed query planning:
5454

5555
```rs
@@ -59,7 +59,7 @@ let localhost_worker_resolver = LocalhostWorkerResolver {
5959

6060
let state = SessionStateBuilder::new()
6161
.with_distributed_worker_resolver(localhost_worker_resolver)
62-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
62+
.with_distributed_planner()
6363
.build();
6464

6565
let ctx = SessionContext::from(state);

docs/source/user-guide/worker-resolver.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl WorkerResolver for CustomWorkerResolver {
2424
async fn main() {
2525
let state = SessionStateBuilder::new()
2626
.with_distributed_worker_resolver(CustomWorkerResolver)
27-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
27+
.with_distributed_planner()
2828
.build();
2929
}
3030
```

examples/custom_execution_plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ use datafusion_distributed::test_utils::in_memory_channel_resolver::{
3939
InMemoryChannelResolver, InMemoryWorkerResolver,
4040
};
4141
use datafusion_distributed::{
42-
DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation,
43-
TaskEstimator, WorkerQueryContext, display_plan_ascii,
42+
DistributedExt, DistributedTaskContext, SessionStateBuilderExt, TaskEstimation, TaskEstimator,
43+
WorkerQueryContext, display_plan_ascii,
4444
};
4545
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
4646
use datafusion_proto::protobuf;
@@ -382,7 +382,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
382382
.with_config(config)
383383
.with_distributed_worker_resolver(worker_resolver)
384384
.with_distributed_channel_resolver(channel_resolver)
385-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
385+
.with_distributed_planner()
386386
.with_distributed_user_codec(NumbersExecCodec)
387387
.with_distributed_task_estimator(NumbersTaskEstimator)
388388
.build();

0 commit comments

Comments
 (0)