Skip to content

Commit acde88a

Browse files
committed
Add Dynamic work scheduling in FileStream
1 parent 393c03f commit acde88a

File tree

7 files changed, with 578 additions and 53 deletions.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ itertools = { workspace = true }
6464
liblzma = { workspace = true, optional = true }
6565
log = { workspace = true }
6666
object_store = { workspace = true }
67+
parking_lot = { workspace = true }
6768
rand = { workspace = true }
6869
tempfile = { workspace = true, optional = true }
6970
tokio = { workspace = true }

datafusion/datasource/src/file_scan_config.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::file_groups::FileGroup;
2222
use crate::{
2323
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
2424
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
25-
source::DataSource, statistics::MinMaxStatistics,
25+
file_stream::work_source::SharedWorkSource, source::DataSource,
26+
statistics::MinMaxStatistics,
2627
};
2728
use arrow::datatypes::FieldRef;
2829
use arrow::datatypes::{DataType, Schema, SchemaRef};
@@ -55,7 +56,13 @@ use datafusion_physical_plan::{
5556
metrics::ExecutionPlanMetricsSet,
5657
};
5758
use log::{debug, warn};
58-
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
59+
use std::{
60+
any::Any,
61+
fmt::Debug,
62+
fmt::Formatter,
63+
fmt::Result as FmtResult,
64+
sync::{Arc, OnceLock},
65+
};
5966

6067
/// [`FileScanConfig`] represents scanning data from a group of files
6168
///
@@ -209,6 +216,8 @@ pub struct FileScanConfig {
209216
/// If the number of file partitions > target_partitions, the file partitions will be grouped
210217
/// in a round-robin fashion such that number of file partitions = target_partitions.
211218
pub partitioned_by_file_group: bool,
219+
/// Shared queue of unopened files for sibling streams in this scan.
220+
pub(crate) shared_work_source: Arc<OnceLock<SharedWorkSource>>,
212221
}
213222

214223
/// A builder for [`FileScanConfig`].
@@ -551,6 +560,7 @@ impl FileScanConfigBuilder {
551560
expr_adapter_factory: expr_adapter,
552561
statistics,
553562
partitioned_by_file_group,
563+
shared_work_source: Arc::new(OnceLock::new()),
554564
}
555565
}
556566
}

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,57 @@ use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
2121
use crate::file_stream::scan_state::ScanState;
22+
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
2223
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2324
use datafusion_common::{Result, internal_err};
2425
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2526

2627
use super::metrics::FileStreamMetrics;
2728
use super::{FileOpener, FileStream, FileStreamState, OnError};
2829

30+
/// Whether this stream may reorder work across sibling `FileStream`s.
///
/// The builder's constructor selects `No` when the scan config sets
/// `preserve_order` or `partitioned_by_file_group`, and `Yes` otherwise.
31+
enum Reorderable {
32+
/// This stream may reorder work, optionally using a shared file queue.
///
/// With `Some(shared)` the stream registers itself on the shared source,
/// pushes its files into it and runs on `WorkSource::Shared`; with `None`
/// it keeps its own files in a `WorkSource::Local`.
33+
Yes {
34+
shared_work_source: Option<SharedWorkSource>,
35+
},
36+
/// This stream may not reorder work; its files stay in a
/// `WorkSource::Local`.
37+
No,
38+
}
39+
2940
/// Builder for constructing a [`FileStream`].
///
/// Created from a borrowed [`FileScanConfig`]; the optional fields start
/// unset and are destructured together when the stream is built.
3041
pub struct FileStreamBuilder<'a> {
/// Scan configuration the stream is built from, borrowed for `'a`.
3142
config: &'a FileScanConfig,
/// Partition index for this stream; `None` until one is supplied.
/// NOTE(review): appears to select one of the config's file groups — confirm.
3243
partition: Option<usize>,
/// Optional boxed [`Morselizer`]; `None` until one is supplied.
3344
morselizer: Option<Box<dyn Morselizer>>,
/// Optional metrics set, used when creating `FileStreamMetrics`.
3445
metrics: Option<&'a ExecutionPlanMetricsSet>,
/// Error-handling policy handed to the scan state; the constructor sets
/// it to `OnError::Fail`.
3546
on_error: OnError,
/// Whether work may be reordered across sibling streams; computed from
/// `config` by the constructor.
47+
reorderable: Reorderable,
3648
}
3749

3850
impl<'a> FileStreamBuilder<'a> {
3951
/// Create a new builder.
///
/// The stream is marked non-reorderable when `config.preserve_order` or
/// `config.partitioned_by_file_group` is set. Otherwise the builder takes a
/// clone of the `SharedWorkSource` held in `config.shared_work_source`,
/// initializing it with `SharedWorkSource::new` if no earlier call has.
///
/// `partition`, `morselizer` and `metrics` start as `None`, and `on_error`
/// starts as `OnError::Fail`.
4052
pub fn new(config: &'a FileScanConfig) -> Self {
53+
let reorderable = if config.preserve_order || config.partitioned_by_file_group {
54+
Reorderable::No
55+
} else {
56+
Reorderable::Yes {
57+
shared_work_source: Some(
58+
config
59+
.shared_work_source
// `OnceLock::get_or_init` runs the initializer at most once, so every
// builder created from this config sees the same stored value.
// NOTE(review): presumably clones of `SharedWorkSource` share one
// underlying queue — confirm against its definition.
60+
.get_or_init(SharedWorkSource::new)
61+
.clone(),
62+
),
63+
}
64+
};
65+
4166
Self {
4267
config,
4368
partition: None,
4469
morselizer: None,
4570
metrics: None,
4671
on_error: OnError::Fail,
72+
reorderable,
4773
}
4874
}
4975

@@ -89,6 +115,7 @@ impl<'a> FileStreamBuilder<'a> {
89115
morselizer,
90116
metrics,
91117
on_error,
118+
reorderable,
92119
} = self;
93120

94121
let Some(partition) = partition else {
@@ -106,10 +133,23 @@ impl<'a> FileStreamBuilder<'a> {
106133
"FileStreamBuilder invalid partition index: {partition}"
107134
);
108135
};
136+
let files = file_group.into_inner();
137+
let work_source = match reorderable {
138+
Reorderable::Yes { shared_work_source } => {
139+
if let Some(shared) = shared_work_source {
140+
shared.register_stream();
141+
shared.push_files(files);
142+
WorkSource::Shared(shared)
143+
} else {
144+
WorkSource::Local(files.into())
145+
}
146+
}
147+
Reorderable::No => WorkSource::Local(files.into()),
148+
};
109149

110150
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
111151
let scan_state = Box::new(ScanState::new(
112-
file_group.into_inner(),
152+
work_source,
113153
config.limit,
114154
morselizer,
115155
on_error,

0 commit comments

Comments
 (0)