Skip to content

Commit 004c5f9

Browse files
committed
Read channel capacity from BallistaConfig in create_query_stage_exec
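Move channel_capacity extraction out of serde deserialization (BallistaPhysicalExtensionCodec) and into the executor's create_query_stage_exec, which now has access to the SessionConfig (upstream #1542). The executor reads shuffle_writer_channel_capacity from the BallistaConfig extension, falls back to DEFAULT_SHUFFLE_CHANNEL_CAPACITY when that extension is not registered, and clamps the result to a minimum of 1. The executor is the clean place for this late binding: it already rebuilds the ShuffleWriterExec there to fill in the work directory that deserialization leaves empty. This commit also drops the "fs" feature from the tokio dependency in ballista/core/Cargo.toml.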
1 parent 0530399 commit 004c5f9

3 files changed

Lines changed: 23 additions & 24 deletions

File tree

ballista/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ prost = { workspace = true }
6464
prost-types = { workspace = true }
6565
rand = { workspace = true }
6666
serde = { workspace = true, features = ["derive"] }
67-
tokio = { workspace = true, features = ["fs", "rt-multi-thread"] }
67+
tokio = { workspace = true, features = ["rt-multi-thread"] }
6868
tokio-stream = { workspace = true, features = ["net"] }
6969
tonic = { workspace = true }
7070
tonic-prost = { workspace = true }

ballista/core/src/serde/mod.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! This crate contains code generated from the Ballista Protocol Buffer Definition as well
1919
//! as convenience code for interacting with the generated code.
2020
21-
use crate::config::BallistaConfig;
2221
use crate::extension::BallistaCacheNode;
2322
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
2423

@@ -54,7 +53,7 @@ use std::{convert::TryInto, io::Cursor};
5453

5554
use crate::execution_plans::sort_shuffle::SortShuffleConfig;
5655
use crate::execution_plans::{
57-
DEFAULT_SHUFFLE_CHANNEL_CAPACITY, ShuffleReaderExec, ShuffleWriterExec,
56+
ShuffleReaderExec, ShuffleWriterExec,
5857
SortShuffleWriterExec, UnresolvedShuffleExec,
5958
};
6059
use crate::serde::protobuf::{
@@ -353,24 +352,13 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
353352
&converter,
354353
)?;
355354

356-
let channel_capacity = ctx
357-
.session_config()
358-
.options()
359-
.extensions
360-
.get::<BallistaConfig>()
361-
.map(|c| c.shuffle_writer_channel_capacity())
362-
.unwrap_or(DEFAULT_SHUFFLE_CHANNEL_CAPACITY);
363-
364-
Ok(Arc::new(
365-
ShuffleWriterExec::try_new(
366-
shuffle_writer.job_id.clone(),
367-
shuffle_writer.stage_id as usize,
368-
input,
369-
"".to_string(), // this is intentional but hacky - the executor will fill this in
370-
shuffle_output_partitioning,
371-
)?
372-
.with_channel_capacity(channel_capacity),
373-
))
355+
Ok(Arc::new(ShuffleWriterExec::try_new(
356+
shuffle_writer.job_id.clone(),
357+
shuffle_writer.stage_id as usize,
358+
input,
359+
"".to_string(), // this is intentional but hacky - the executor will fill this in
360+
shuffle_output_partitioning,
361+
)?))
374362
}
375363
PhysicalPlanType::SortShuffleWriter(sort_shuffle_writer) => {
376364
let input = inputs[0].clone();

ballista/executor/src/execution_engine.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
//! for creating query stage executors from physical plans.
2323
2424
use async_trait::async_trait;
25+
use ballista_core::config::BallistaConfig;
2526
use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
26-
use ballista_core::execution_plans::{ShuffleReaderExec, ShuffleWriterExec};
27+
use ballista_core::execution_plans::{
28+
ShuffleReaderExec, ShuffleWriterExec, DEFAULT_SHUFFLE_CHANNEL_CAPACITY,
29+
};
2730
use ballista_core::serde::protobuf::ShuffleWritePartition;
2831
use ballista_core::utils;
2932
use datafusion::common::tree_node::{Transformed, TreeNode};
@@ -91,7 +94,7 @@ impl ExecutionEngine for DefaultExecutionEngine {
9194
stage_id: usize,
9295
plan: Arc<dyn ExecutionPlan>,
9396
work_dir: &str,
94-
_config: &SessionConfig,
97+
config: &SessionConfig,
9598
) -> Result<Arc<dyn QueryStageExecutor>> {
9699
let plan = plan
97100
.transform(|p| {
@@ -104,6 +107,14 @@ impl ExecutionEngine for DefaultExecutionEngine {
104107
})?
105108
.data;
106109

110+
let channel_capacity = config
111+
.options()
112+
.extensions
113+
.get::<BallistaConfig>()
114+
.map(|c| c.shuffle_writer_channel_capacity())
115+
.unwrap_or(DEFAULT_SHUFFLE_CHANNEL_CAPACITY)
116+
.max(1);
117+
107118
// the query plan created by the scheduler always starts with a shuffle writer
108119
// (either ShuffleWriterExec or SortShuffleWriterExec)
109120
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
@@ -115,7 +126,7 @@ impl ExecutionEngine for DefaultExecutionEngine {
115126
work_dir.to_string(),
116127
shuffle_writer.shuffle_output_partitioning().cloned(),
117128
)?
118-
.with_channel_capacity(shuffle_writer.channel_capacity());
129+
.with_channel_capacity(channel_capacity);
119130
Ok(Arc::new(DefaultQueryStageExec::new(
120131
ShuffleWriterVariant::Hash(exec),
121132
)))

0 commit comments

Comments
 (0)