Skip to content

Commit 6b79a6f

Browse files
committed
Add Dynamic work scheduling in FileStream
1 parent 393c03f commit 6b79a6f

File tree

7 files changed

+703
-59
lines changed

7 files changed

+703
-59
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ itertools = { workspace = true }
6464
liblzma = { workspace = true, optional = true }
6565
log = { workspace = true }
6666
object_store = { workspace = true }
67+
parking_lot = { workspace = true }
6768
rand = { workspace = true }
6869
tempfile = { workspace = true, optional = true }
6970
tokio = { workspace = true }

datafusion/datasource/src/file_scan_config.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::file_groups::FileGroup;
2222
use crate::{
2323
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
2424
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
25-
source::DataSource, statistics::MinMaxStatistics,
25+
file_stream::work_source::SharedWorkSource, source::DataSource,
26+
statistics::MinMaxStatistics,
2627
};
2728
use arrow::datatypes::FieldRef;
2829
use arrow::datatypes::{DataType, Schema, SchemaRef};
@@ -55,7 +56,13 @@ use datafusion_physical_plan::{
5556
metrics::ExecutionPlanMetricsSet,
5657
};
5758
use log::{debug, warn};
58-
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
59+
use std::{
60+
any::Any,
61+
fmt::Debug,
62+
fmt::Formatter,
63+
fmt::Result as FmtResult,
64+
sync::{Arc, OnceLock},
65+
};
5966

6067
/// [`FileScanConfig`] represents scanning data from a group of files
6168
///
@@ -209,6 +216,11 @@ pub struct FileScanConfig {
209216
/// If the number of file partitions > target_partitions, the file partitions will be grouped
210217
/// in a round-robin fashion such that number of file partitions = target_partitions.
211218
pub partitioned_by_file_group: bool,
219+
/// Shared queue of unopened files for sibling streams in this scan.
220+
///
221+
/// This is initialized once per `FileScanConfig` and reused by reorderable
222+
/// `FileStream`s created from that config.
223+
pub(crate) shared_work_source: Arc<OnceLock<SharedWorkSource>>,
212224
}
213225

214226
/// A builder for [`FileScanConfig`]'s.
@@ -551,10 +563,34 @@ impl FileScanConfigBuilder {
551563
expr_adapter_factory: expr_adapter,
552564
statistics,
553565
partitioned_by_file_group,
566+
shared_work_source: Arc::new(OnceLock::new()),
554567
}
555568
}
556569
}
557570

571+
impl FileScanConfig {
572+
/// Returns the shared unopened-file queue for reorderable streams in this scan.
573+
///
574+
/// The queue is initialized once from all file groups so sibling streams
575+
/// can begin stealing work immediately, even if they are built or polled
576+
/// before every sibling `FileStream` has been constructed.
577+
pub(crate) fn shared_work_source(&self) -> Option<SharedWorkSource> {
578+
if self.preserve_order || self.partitioned_by_file_group {
579+
return None;
580+
}
581+
582+
Some(
583+
self.shared_work_source
584+
.get_or_init(|| {
585+
SharedWorkSource::new(
586+
self.file_groups.iter().flat_map(FileGroup::iter).cloned(),
587+
)
588+
})
589+
.clone(),
590+
)
591+
}
592+
}
593+
558594
impl From<FileScanConfig> for FileScanConfigBuilder {
559595
fn from(config: FileScanConfig) -> Self {
560596
Self {

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,50 @@ use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
2121
use crate::file_stream::scan_state::ScanState;
22+
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
2223
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2324
use datafusion_common::{Result, internal_err};
2425
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2526

2627
use super::metrics::FileStreamMetrics;
2728
use super::{FileOpener, FileStream, FileStreamState, OnError};
2829

30+
/// Whether this stream may reorder work across sibling `FileStream`s.
31+
///
32+
/// This is derived entirely from [`FileScanConfig`]. Streams that must
33+
/// preserve file order or file-group partition boundaries are not reorderable.
34+
enum Reorderable {
35+
/// This stream may reorder work using a shared queue of unopened files.
36+
Yes(SharedWorkSource),
37+
/// This stream must keep its own local file order.
38+
No,
39+
}
40+
2941
/// Builder for constructing a [`FileStream`].
3042
pub struct FileStreamBuilder<'a> {
3143
config: &'a FileScanConfig,
3244
partition: Option<usize>,
3345
morselizer: Option<Box<dyn Morselizer>>,
3446
metrics: Option<&'a ExecutionPlanMetricsSet>,
3547
on_error: OnError,
48+
reorderable: Reorderable,
3649
}
3750

3851
impl<'a> FileStreamBuilder<'a> {
39-
/// Create a new builder.
52+
/// Create a new builder for [`FileStream`].
4053
pub fn new(config: &'a FileScanConfig) -> Self {
54+
let reorderable = match config.shared_work_source() {
55+
Some(shared_work_source) => Reorderable::Yes(shared_work_source),
56+
None => Reorderable::No,
57+
};
58+
4159
Self {
4260
config,
4361
partition: None,
4462
morselizer: None,
4563
metrics: None,
4664
on_error: OnError::Fail,
65+
reorderable,
4766
}
4867
}
4968

@@ -89,6 +108,7 @@ impl<'a> FileStreamBuilder<'a> {
89108
morselizer,
90109
metrics,
91110
on_error,
111+
reorderable,
92112
} = self;
93113

94114
let Some(partition) = partition else {
@@ -106,10 +126,14 @@ impl<'a> FileStreamBuilder<'a> {
106126
"FileStreamBuilder invalid partition index: {partition}"
107127
);
108128
};
129+
let work_source = match reorderable {
130+
Reorderable::Yes(shared) => WorkSource::Shared(shared),
131+
Reorderable::No => WorkSource::Local(file_group.into_inner().into()),
132+
};
109133

110134
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
111135
let scan_state = Box::new(ScanState::new(
112-
file_group.into_inner(),
136+
work_source,
113137
config.limit,
114138
morselizer,
115139
on_error,

0 commit comments

Comments
 (0)