Skip to content

Commit 42a3685

Browse files
committed
Allow setting via environment variable
1 parent bcc7d6d commit 42a3685

1 file changed

Lines changed: 21 additions & 4 deletions

File tree

datafusion/datasource/src/file_stream.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,12 @@ use futures::{FutureExt, Stream, StreamExt as _};
5656
/// However, setting this too high may lead to more memory buffering and
5757
/// resource contention if there are too many concurrent IOs.
5858
///
59-
/// TODO make this a config option
60-
const TARGET_CONCURRENT_PLANNERS: usize = 2;
59+
/// Default target number of planners that may be active at once for a single
60+
/// `FileStream`.
61+
///
62+
/// This can be overridden temporarily for testing with the
63+
/// `DATAFUSION_FILESTREAM_TARGET_CONCURRENT_PLANNERS` environment variable.
64+
const DEFAULT_TARGET_CONCURRENT_PLANNERS: usize = 2;
6165

6266
/// Keep at most this many morsels buffered before pausing additional planning.
6367
///
@@ -72,6 +76,14 @@ fn max_buffered_morsels() -> usize {
7276
.unwrap_or(1)
7377
}
7478

79+
fn target_concurrent_planners() -> usize {
80+
std::env::var("DATAFUSION_FILESTREAM_TARGET_CONCURRENT_PLANNERS")
81+
.ok()
82+
.and_then(|value| value.parse::<usize>().ok())
83+
.filter(|value| *value > 0)
84+
.unwrap_or(DEFAULT_TARGET_CONCURRENT_PLANNERS)
85+
}
86+
7587
/// A stream that iterates record batch by record batch, file over file.
7688
///
7789
/// When running, a FileStream has some number of waiting planners (that are
@@ -128,6 +140,10 @@ pub struct FileStream {
128140
///
129141
/// [`MorselPlan`]: crate::morsel::MorselPlan
130142
preserve_order: bool,
143+
/// Target number of planners that may be active concurrently for this
144+
/// stream. This is resolved once during construction so the scheduler
145+
/// doesn't repeatedly consult the environment during polling.
146+
target_concurrent_planners: usize,
131147
/// Is the stream complete?
132148
state: StreamState,
133149
}
@@ -181,6 +197,7 @@ impl FileStream {
181197
baseline_metrics: BaselineMetrics::new(metrics, partition),
182198
on_error: OnError::Fail,
183199
preserve_order: false,
200+
target_concurrent_planners: target_concurrent_planners(),
184201
state: StreamState::Active,
185202
})
186203
}
@@ -208,7 +225,7 @@ impl FileStream {
208225
/// parent plan can fan out into arbitrarily many waiting I/O futures and
209226
/// bypass `TARGET_CONCURRENT_PLANNERS`.
210227
fn planner_io_at_capacity(&self) -> bool {
211-
self.waiting_planners.len() >= TARGET_CONCURRENT_PLANNERS
228+
self.waiting_planners.len() >= self.target_concurrent_planners
212229
}
213230

214231
/// Run a planner on CPU until it either needs I/O or fully completes.
@@ -252,7 +269,7 @@ impl FileStream {
252269
fn start_next_files(&mut self) -> Result<()> {
253270
let max_buffered_morsels = max_buffered_morsels();
254271
while (self.waiting_planners.len() + self.ready_planners.len())
255-
< TARGET_CONCURRENT_PLANNERS
272+
< self.target_concurrent_planners
256273
{
257274
// In ordered mode, do not admit later files while there is any
258275
// earlier file work still buffered, waiting on I/O, or actively

0 commit comments

Comments
 (0)