Skip to content

Commit 183a370

Browse files
committed
Read shuffle writer channel capacity from BallistaConfig instead of protobuf
1 parent 1f7f67f commit 183a370

4 files changed

Lines changed: 15 additions & 20 deletions

File tree

ballista/core/proto/ballista.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ message ShuffleWriterExecNode {
6060
uint32 stage_id = 2;
6161
datafusion.PhysicalPlanNode input = 3;
6262
datafusion.PhysicalHashRepartition output_partitioning = 4;
63-
uint32 channel_capacity = 5;
6463
}
6564

6665
// Sort-based shuffle writer that produces consolidated files with index

ballista/core/src/serde/generated/ballista.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ pub struct ShuffleWriterExecNode {
6565
pub output_partitioning: ::core::option::Option<
6666
::datafusion_proto::protobuf::PhysicalHashRepartition,
6767
>,
68-
#[prost(uint32, tag = "5")]
69-
pub channel_capacity: u32,
7068
}
7169
/// Sort-based shuffle writer that produces consolidated files with index
7270
#[derive(Clone, PartialEq, ::prost::Message)]

ballista/core/src/serde/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This crate contains code generated from the Ballista Protocol Buffer Definition as well
1919
//! as convenience code for interacting with the generated code.
2020
21+
use crate::config::BallistaConfig;
2122
use crate::extension::BallistaCacheNode;
2223
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
2324

@@ -352,12 +353,13 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
352353
&converter,
353354
)?;
354355

355-
let channel_capacity = shuffle_writer.channel_capacity as usize;
356-
let channel_capacity = if channel_capacity == 0 {
357-
DEFAULT_SHUFFLE_CHANNEL_CAPACITY
358-
} else {
359-
channel_capacity
360-
};
356+
let channel_capacity = ctx
357+
.session_config()
358+
.options()
359+
.extensions
360+
.get::<BallistaConfig>()
361+
.map(|c| c.shuffle_writer_channel_capacity())
362+
.unwrap_or(DEFAULT_SHUFFLE_CHANNEL_CAPACITY);
361363

362364
Ok(Arc::new(
363365
ShuffleWriterExec::try_new(
@@ -501,7 +503,6 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
501503
stage_id: exec.stage_id() as u32,
502504
input: None,
503505
output_partitioning,
504-
channel_capacity: exec.channel_capacity() as u32,
505506
},
506507
)),
507508
};

ballista/scheduler/src/planner.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -374,16 +374,13 @@ fn create_shuffle_writer_with_config(
374374
}
375375

376376
// Fall back to standard shuffle writer
377-
Ok(Arc::new(
378-
ShuffleWriterExec::try_new(
379-
job_id.to_owned(),
380-
stage_id,
381-
plan,
382-
"".to_owned(),
383-
partitioning,
384-
)?
385-
.with_channel_capacity(ballista_config.shuffle_writer_channel_capacity()),
386-
))
377+
Ok(Arc::new(ShuffleWriterExec::try_new(
378+
job_id.to_owned(),
379+
stage_id,
380+
plan,
381+
"".to_owned(),
382+
partitioning,
383+
)?))
387384
}
388385

389386
#[cfg(test)]

0 commit comments

Comments
 (0)