Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! Ballista configuration

use crate::error::{BallistaError, Result};
use crate::execution_plans::DEFAULT_SHUFFLE_CHANNEL_CAPACITY;
use datafusion::{
arrow::datatypes::DataType, common::config_err, config::ConfigExtension,
};
Expand Down Expand Up @@ -75,6 +76,9 @@ pub const BALLISTA_SHUFFLE_SORT_BASED_ENABLED: &str =
/// Configuration key for sort shuffle target batch size in rows.
pub const BALLISTA_SHUFFLE_SORT_BASED_BATCH_SIZE: &str =
"ballista.shuffle.sort_based.batch_size";
/// Configuration key for shuffle writer bounded-channel capacity.
pub const BALLISTA_SHUFFLE_WRITER_CHANNEL_CAPACITY: &str =
"ballista.shuffle.writer_channel_capacity";

/// Result type for configuration parsing operations.
pub type ParseResult<T> = result::Result<T, String>;
Expand Down Expand Up @@ -140,6 +144,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Target batch size in rows for coalescing small batches in sort shuffle".to_string(),
DataType::UInt64,
Some((8192).to_string())),
ConfigEntry::new(BALLISTA_SHUFFLE_WRITER_CHANNEL_CAPACITY.to_string(),
"Bounded channel capacity for async-to-blocking I/O bridge in shuffle writer".to_string(),
DataType::UInt32,
Some(DEFAULT_SHUFFLE_CHANNEL_CAPACITY.to_string())),
ConfigEntry::new(BALLISTA_CLIENT_PULL.to_string(),
"Should client employ pull or push job tracking. In pull mode client will make a request to server in the loop, until job finishes. Pull mode is kept for legacy clients.".to_string(),
DataType::Boolean,
Expand Down Expand Up @@ -346,6 +354,11 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_SHUFFLE_SORT_BASED_BATCH_SIZE)
}

/// Returns the bounded-channel capacity for the shuffle writer I/O bridge.
pub fn shuffle_writer_channel_capacity(&self) -> usize {
self.get_usize_setting(BALLISTA_SHUFFLE_WRITER_CHANNEL_CAPACITY)
}

/// Should client employ pull or push job tracking strategy
pub fn client_pull(&self) -> bool {
self.get_bool_setting(BALLISTA_CLIENT_PULL)
Expand Down
1 change: 1 addition & 0 deletions ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use distributed_explain_analyze::DistributedExplainAnalyzeExec;
pub use distributed_query::DistributedQueryExec;
pub use shuffle_reader::ShuffleReaderExec;
pub use shuffle_reader::{stats_for_partition, stats_for_partitions};
pub use shuffle_writer::DEFAULT_SHUFFLE_CHANNEL_CAPACITY;
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer_trait::ShuffleWriter;
pub use sort_shuffle::SortShuffleWriterExec;
Expand Down
Loading
Loading