Skip to content

Commit d5c4c5f

Browse files
committed
Add Dynamic work scheduling in FileStream
1 parent 73351c3 commit d5c4c5f

File tree

7 files changed

+484
-52
lines changed

7 files changed

+484
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ itertools = { workspace = true }
6464
liblzma = { workspace = true, optional = true }
6565
log = { workspace = true }
6666
object_store = { workspace = true }
67+
parking_lot = { workspace = true }
6768
rand = { workspace = true }
6869
tempfile = { workspace = true, optional = true }
6970
tokio = { workspace = true }

datafusion/datasource/src/file_scan_config.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::file_groups::FileGroup;
2222
use crate::{
2323
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
2424
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
25-
source::DataSource, statistics::MinMaxStatistics,
25+
file_stream::work_source::SharedWorkSource, source::DataSource,
26+
statistics::MinMaxStatistics,
2627
};
2728
use arrow::datatypes::FieldRef;
2829
use arrow::datatypes::{DataType, Schema, SchemaRef};
@@ -54,7 +55,13 @@ use datafusion_physical_plan::{
5455
metrics::ExecutionPlanMetricsSet,
5556
};
5657
use log::{debug, warn};
57-
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
58+
use std::{
59+
any::Any,
60+
fmt::Debug,
61+
fmt::Formatter,
62+
fmt::Result as FmtResult,
63+
sync::{Arc, OnceLock},
64+
};
5865

5966
/// [`FileScanConfig`] represents scanning data from a group of files
6067
///
@@ -208,6 +215,8 @@ pub struct FileScanConfig {
208215
/// If the number of file partitions > target_partitions, the file partitions will be grouped
209216
/// in a round-robin fashion such that number of file partitions = target_partitions.
210217
pub partitioned_by_file_group: bool,
218+
/// Shared queue of unopened files for sibling streams in this scan.
219+
pub(crate) shared_work_source: Arc<OnceLock<SharedWorkSource>>,
211220
}
212221

213222
/// A builder for [`FileScanConfig`]'s.
@@ -550,6 +559,7 @@ impl FileScanConfigBuilder {
550559
expr_adapter_factory: expr_adapter,
551560
statistics,
552561
partitioned_by_file_group,
562+
shared_work_source: Arc::new(OnceLock::new()),
553563
}
554564
}
555565
}

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,56 @@ use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
2121
use crate::file_stream::scan_state::ScanState;
22+
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
2223
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2324
use datafusion_common::{Result, internal_err};
2425
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2526

2627
use super::{FileOpener, FileStream, FileStreamMetrics, FileStreamState, OnError};
2728

29+
/// Whether this stream may reorder work across sibling `FileStream`s.
30+
enum Reorderable {
31+
/// This stream may reorder work, optionally using a shared file queue.
32+
Yes {
33+
shared_work_source: Option<SharedWorkSource>,
34+
},
35+
/// This stream may not reorder work.
36+
No,
37+
}
38+
2839
/// Builder for constructing a [`FileStream`].
2940
pub struct FileStreamBuilder<'a> {
3041
config: &'a FileScanConfig,
3142
partition: Option<usize>,
3243
morselizer: Option<Box<dyn Morselizer>>,
3344
metrics: Option<&'a ExecutionPlanMetricsSet>,
3445
on_error: OnError,
46+
reorderable: Reorderable,
3547
}
3648

3749
impl<'a> FileStreamBuilder<'a> {
3850
/// Create a new builder.
3951
pub fn new(config: &'a FileScanConfig) -> Self {
52+
let reorderable = if config.preserve_order || config.partitioned_by_file_group {
53+
Reorderable::No
54+
} else {
55+
Reorderable::Yes {
56+
shared_work_source: Some(
57+
config
58+
.shared_work_source
59+
.get_or_init(SharedWorkSource::new)
60+
.clone(),
61+
),
62+
}
63+
};
64+
4065
Self {
4166
config,
4267
partition: None,
4368
morselizer: None,
4469
metrics: None,
4570
on_error: OnError::Fail,
71+
reorderable,
4672
}
4773
}
4874

@@ -88,6 +114,7 @@ impl<'a> FileStreamBuilder<'a> {
88114
morselizer,
89115
metrics,
90116
on_error,
117+
reorderable,
91118
} = self;
92119

93120
let Some(partition) = partition else {
@@ -105,10 +132,23 @@ impl<'a> FileStreamBuilder<'a> {
105132
"FileStreamBuilder invalid partition index: {partition}"
106133
);
107134
};
135+
let files = file_group.into_inner();
136+
let work_source = match reorderable {
137+
Reorderable::Yes { shared_work_source } => {
138+
if let Some(shared) = shared_work_source {
139+
shared.register_stream();
140+
shared.push_files(files);
141+
WorkSource::Shared(shared)
142+
} else {
143+
WorkSource::Local(files.into())
144+
}
145+
}
146+
Reorderable::No => WorkSource::Local(files.into()),
147+
};
108148

109149
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
110150
let scan_state = Box::new(ScanState::new(
111-
file_group.into_inner(),
151+
work_source,
112152
config.limit,
113153
morselizer,
114154
on_error,

0 commit comments

Comments
 (0)