Skip to content

Commit 9789b9f

Browse files
committed
Add Dynamic work scheduling in FileStream
1 parent b2c9bd6 commit 9789b9f

File tree

7 files changed

+484
-52
lines changed

7 files changed

+484
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ itertools = { workspace = true }
6464
liblzma = { workspace = true, optional = true }
6565
log = { workspace = true }
6666
object_store = { workspace = true }
67+
parking_lot = { workspace = true }
6768
rand = { workspace = true }
6869
tempfile = { workspace = true, optional = true }
6970
tokio = { workspace = true }

datafusion/datasource/src/file_scan_config.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::file_groups::FileGroup;
2222
use crate::{
2323
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
2424
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
25-
source::DataSource, statistics::MinMaxStatistics,
25+
file_stream::work_source::SharedWorkSource, source::DataSource,
26+
statistics::MinMaxStatistics,
2627
};
2728
use arrow::datatypes::FieldRef;
2829
use arrow::datatypes::{DataType, Schema, SchemaRef};
@@ -54,7 +55,13 @@ use datafusion_physical_plan::{
5455
metrics::ExecutionPlanMetricsSet,
5556
};
5657
use log::{debug, warn};
57-
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
58+
use std::{
59+
any::Any,
60+
fmt::Debug,
61+
fmt::Formatter,
62+
fmt::Result as FmtResult,
63+
sync::{Arc, OnceLock},
64+
};
5865

5966
/// [`FileScanConfig`] represents scanning data from a group of files
6067
///
@@ -208,6 +215,8 @@ pub struct FileScanConfig {
208215
/// If the number of file partitions > target_partitions, the file partitions will be grouped
209216
/// in a round-robin fashion such that number of file partitions = target_partitions.
210217
pub partitioned_by_file_group: bool,
218+
/// Shared queue of unopened files for sibling streams in this scan.
219+
pub(crate) shared_work_source: Arc<OnceLock<SharedWorkSource>>,
211220
}
212221

213222
/// A builder for [`FileScanConfig`]'s.
@@ -550,6 +559,7 @@ impl FileScanConfigBuilder {
550559
expr_adapter_factory: expr_adapter,
551560
statistics,
552561
partitioned_by_file_group,
562+
shared_work_source: Arc::new(OnceLock::new()),
553563
}
554564
}
555565
}

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,57 @@ use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
2121
use crate::file_stream::scan_state::ScanState;
22+
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
2223
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2324
use datafusion_common::{Result, internal_err};
2425
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2526

2627
use super::metrics::FileStreamMetrics;
2728
use super::{FileOpener, FileStream, FileStreamState, OnError};
2829

30+
/// Whether this stream may reorder work across sibling `FileStream`s.
31+
enum Reorderable {
32+
/// This stream may reorder work, optionally using a shared file queue.
33+
Yes {
34+
shared_work_source: Option<SharedWorkSource>,
35+
},
36+
/// This stream may not reorder work.
37+
No,
38+
}
39+
2940
/// Builder for constructing a [`FileStream`].
3041
pub struct FileStreamBuilder<'a> {
3142
config: &'a FileScanConfig,
3243
partition: Option<usize>,
3344
morselizer: Option<Box<dyn Morselizer>>,
3445
metrics: Option<&'a ExecutionPlanMetricsSet>,
3546
on_error: OnError,
47+
reorderable: Reorderable,
3648
}
3749

3850
impl<'a> FileStreamBuilder<'a> {
3951
/// Create a new builder.
4052
pub fn new(config: &'a FileScanConfig) -> Self {
53+
let reorderable = if config.preserve_order || config.partitioned_by_file_group {
54+
Reorderable::No
55+
} else {
56+
Reorderable::Yes {
57+
shared_work_source: Some(
58+
config
59+
.shared_work_source
60+
.get_or_init(SharedWorkSource::new)
61+
.clone(),
62+
),
63+
}
64+
};
65+
4166
Self {
4267
config,
4368
partition: None,
4469
morselizer: None,
4570
metrics: None,
4671
on_error: OnError::Fail,
72+
reorderable,
4773
}
4874
}
4975

@@ -89,6 +115,7 @@ impl<'a> FileStreamBuilder<'a> {
89115
morselizer,
90116
metrics,
91117
on_error,
118+
reorderable,
92119
} = self;
93120

94121
let Some(partition) = partition else {
@@ -106,10 +133,23 @@ impl<'a> FileStreamBuilder<'a> {
106133
"FileStreamBuilder invalid partition index: {partition}"
107134
);
108135
};
136+
let files = file_group.into_inner();
137+
let work_source = match reorderable {
138+
Reorderable::Yes { shared_work_source } => {
139+
if let Some(shared) = shared_work_source {
140+
shared.register_stream();
141+
shared.push_files(files);
142+
WorkSource::Shared(shared)
143+
} else {
144+
WorkSource::Local(files.into())
145+
}
146+
}
147+
Reorderable::No => WorkSource::Local(files.into()),
148+
};
109149

110150
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
111151
let scan_state = Box::new(ScanState::new(
112-
file_group.into_inner(),
152+
work_source,
113153
config.limit,
114154
morselizer,
115155
on_error,

0 commit comments

Comments
 (0)