Skip to content

Commit e5cb977

Browse files
committed
Allow configuring the buffer in the worker connection
1 parent 6e962e3 commit e5cb977

3 files changed

Lines changed: 56 additions & 7 deletions

File tree

src/distributed_ext.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,22 @@ pub trait DistributedExt: Sized {
541541
/// Same as [DistributedExt::with_distributed_partial_reduce] but with an in-place mutation.
542542
fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
543543

544+
/// Sets the soft byte budget that each per-worker connection will buffer in memory before
545+
/// pausing the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
546+
/// head-of-line blocking between sibling partitions), so backpressure is enforced globally
547+
/// per worker connection using this budget.
548+
fn with_distributed_worker_connection_buffer_budget_bytes(
549+
self,
550+
budget_bytes: usize,
551+
) -> Result<Self, DataFusionError>;
552+
553+
/// Same as [DistributedExt::with_distributed_worker_connection_buffer_budget_bytes] but with
554+
/// an in-place mutation.
555+
fn set_distributed_worker_connection_buffer_budget_bytes(
556+
&mut self,
557+
budget_bytes: usize,
558+
) -> Result<(), DataFusionError>;
559+
544560
/// Registers a [WorkUnitFeed] so that Distributed DataFusion can discover it while traversing
545561
/// plans. For more info, refer to [WorkUnitFeed] docs.
546562
///
@@ -693,6 +709,15 @@ impl DistributedExt for SessionConfig {
693709
Ok(())
694710
}
695711

712+
fn set_distributed_worker_connection_buffer_budget_bytes(
713+
&mut self,
714+
budget_bytes: usize,
715+
) -> Result<(), DataFusionError> {
716+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
717+
d_cfg.worker_connection_buffer_budget_bytes = budget_bytes;
718+
Ok(())
719+
}
720+
696721
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
697722
where
698723
T: ExecutionPlan + 'static,
@@ -775,6 +800,10 @@ impl DistributedExt for SessionConfig {
775800
#[expr($?;Ok(self))]
776801
fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
777802

803+
#[call(set_distributed_worker_connection_buffer_budget_bytes)]
804+
#[expr($?;Ok(self))]
805+
fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
806+
778807
#[call(set_distributed_work_unit_feed)]
779808
#[expr($;self)]
780809
fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
@@ -875,6 +904,11 @@ impl DistributedExt for SessionStateBuilder {
875904
#[expr($?;Ok(self))]
876905
fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
877906

907+
fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
908+
#[call(set_distributed_worker_connection_buffer_budget_bytes)]
909+
#[expr($?;Ok(self))]
910+
fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
911+
878912
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
879913
where
880914
T: ExecutionPlan + 'static,
@@ -981,6 +1015,11 @@ impl DistributedExt for SessionState {
9811015
#[expr($?;Ok(self))]
9821016
fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
9831017

1018+
fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1019+
#[call(set_distributed_worker_connection_buffer_budget_bytes)]
1020+
#[expr($?;Ok(self))]
1021+
fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1022+
9841023
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
9851024
where
9861025
T: ExecutionPlan + 'static,
@@ -1087,6 +1126,11 @@ impl DistributedExt for SessionContext {
10871126
#[expr($?;Ok(self))]
10881127
fn with_distributed_partial_reduce(self, enabled: bool) -> Result<Self, DataFusionError>;
10891128

1129+
fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1130+
#[call(set_distributed_worker_connection_buffer_budget_bytes)]
1131+
#[expr($?;Ok(self))]
1132+
fn with_distributed_worker_connection_buffer_budget_bytes(self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1133+
10901134
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
10911135
where
10921136
T: ExecutionPlan + 'static,

src/distributed_planner/distributed_config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ extensions_options! {
5858
/// Disabled by default because its effectiveness is workload-dependent: it helps when
5959
/// aggregation significantly reduces cardinality, but adds overhead when it does not.
6060
pub partial_reduce: bool, default = false
61+
/// Soft byte budget that each per-worker connection will buffer in memory before pausing
62+
/// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
63+
/// head-of-line blocking between sibling partitions), so backpressure is enforced
64+
/// globally per [WorkerConnection] using this budget. A single message larger than this
65+
/// budget will still be admitted (otherwise we would livelock), so the actual peak per
66+
/// connection is `worker_connection_buffer_budget_bytes + max_message_size`.
67+
pub worker_connection_buffer_budget_bytes: usize, default = 64 * 1024 * 1024
6168
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
6269
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
6370
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()

src/worker/worker_connection_pool.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::passthrough_headers::get_passthrough_headers;
55
use crate::protobuf::{datafusion_error_to_tonic_status, map_flight_to_datafusion_error};
66
use crate::worker::generated::worker::FlightAppMetadata;
77
use crate::worker::generated::worker::{ExecuteTaskRequest, TaskKey};
8-
use crate::{BytesMetricExt, ChannelResolver, Stage};
8+
use crate::{BytesMetricExt, ChannelResolver, DistributedConfig, Stage};
99
use arrow_flight::FlightData;
1010
use arrow_flight::decode::FlightRecordBatchStream;
1111
use arrow_flight::error::FlightError;
@@ -99,11 +99,6 @@ impl WorkerConnectionPool {
9999

100100
type WorkerMsg = Result<(FlightData, FlightAppMetadata), Status>;
101101

102-
/// Soft byte budget the demux task will buffer in memory before pausing the gRPC
103-
/// pull. Per-partition channels are unbounded (to avoid head-of-line blocking
104-
/// between sibling partitions), so backpressure is enforced globally here instead.
105-
const PER_CONNECTION_BUFFER_BUDGET_BYTES: usize = 64 * 1024 * 1024;
106-
107102
/// Represents a connection to one [Worker]. Network boundaries will use this for streaming
108103
/// data from single partitions while the actual network communication is handling all the partitions
109104
/// under the hood.
@@ -138,6 +133,9 @@ impl WorkerConnection {
138133
metrics: &ExecutionPlanMetricsSet,
139134
) -> Result<Self> {
140135
let channel_resolver = get_distributed_channel_resolver(ctx.as_ref());
136+
let buffer_budget_bytes =
137+
DistributedConfig::from_config_options(ctx.session_config().options())?
138+
.worker_connection_buffer_budget_bytes;
141139
// We are retaining record batches in memory until they are consumed, so we need to account
142140
// for them in the memory pool.
143141
let memory_reservation =
@@ -235,7 +233,7 @@ impl WorkerConnection {
235233
// the way back to the worker without coupling sibling partitions.
236234
// We always allow a message through when reservation == 0 to avoid
237235
// livelock if a single message is larger than the budget.
238-
while memory_reservation.size() >= PER_CONNECTION_BUFFER_BUDGET_BYTES {
236+
while memory_reservation.size() >= buffer_budget_bytes {
239237
tokio::select! {
240238
biased;
241239
_ = cancel.cancelled() => return,

0 commit comments

Comments
 (0)