Skip to content

Commit d972b78

Browse files
authored
Add FileStreamBuilder for creating FileStreams (#21261)
## Which issue does this PR close? - part of #20529 - Broken out of #20820 ## Rationale for this change @Dandandan and I are in the process of significantly reworking how FileStream works internally (morsels!) To support this, I want to get the code ready for a reorganization and minimize the diffs ## What changes are included in this PR? 1. Add `FileStreamBuilder` 2. Deprecate FileStream::new 3. Promote file_stream to a module ## Are these changes tested? Yes by existing CI. ## Are there any user-facing changes? Certain user facing API code that uses FileStream will need to use `FileStreamBuilder` rather than directly construct a `FileStream`
1 parent 537e84f commit d972b78

File tree

5 files changed

+200
-35
lines changed

5 files changed

+200
-35
lines changed

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use datafusion::{
2727
file_format::file_compression_type::FileCompressionType,
2828
listing::PartitionedFile,
2929
object_store::ObjectStoreUrl,
30-
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
30+
physical_plan::{
31+
CsvSource, FileSource, FileStreamBuilder, JsonOpener, JsonSource,
32+
},
3133
},
3234
error::Result,
3335
physical_plan::metrics::ExecutionPlanMetricsSet,
@@ -80,8 +82,12 @@ async fn csv_opener() -> Result<()> {
8082
.create_file_opener(object_store, &scan_config, 0)?;
8183

8284
let mut result = vec![];
83-
let mut stream =
84-
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
85+
let metrics = ExecutionPlanMetricsSet::new();
86+
let mut stream = FileStreamBuilder::new(&scan_config)
87+
.with_partition(0)
88+
.with_file_opener(opener)
89+
.with_metrics(&metrics)
90+
.build()?;
8591
while let Some(batch) = stream.next().await.transpose()? {
8692
result.push(batch);
8793
}
@@ -137,12 +143,12 @@ async fn json_opener() -> Result<()> {
137143
.with_file(PartitionedFile::new(path.to_string(), 10))
138144
.build();
139145

140-
let mut stream = FileStream::new(
141-
&scan_config,
142-
0,
143-
Arc::new(opener),
144-
&ExecutionPlanMetricsSet::new(),
145-
)?;
146+
let metrics = ExecutionPlanMetricsSet::new();
147+
let mut stream = FileStreamBuilder::new(&scan_config)
148+
.with_partition(0)
149+
.with_file_opener(Arc::new(opener))
150+
.with_metrics(&metrics)
151+
.build()?;
146152
let mut result = vec![];
147153
while let Some(batch) = stream.next().await.transpose()? {
148154
result.push(batch);

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,5 @@ pub use datafusion_datasource::file_scan_config::{
4949
pub use datafusion_datasource::file_sink_config::*;
5050

5151
pub use datafusion_datasource::file_stream::{
52-
FileOpenFuture, FileOpener, FileStream, OnError,
52+
FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError,
5353
};

datafusion/datasource/src/file_scan_config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
use crate::file_groups::FileGroup;
2222
use crate::{
2323
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24-
file_compression_type::FileCompressionType, file_stream::FileStream,
24+
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
2525
source::DataSource, statistics::MinMaxStatistics,
2626
};
2727
use arrow::datatypes::FieldRef;
@@ -588,7 +588,11 @@ impl DataSource for FileScanConfig {
588588

589589
let opener = source.create_file_opener(object_store, self, partition)?;
590590

591-
let stream = FileStream::new(self, partition, opener, source.metrics())?;
591+
let stream = FileStreamBuilder::new(self)
592+
.with_partition(partition)
593+
.with_file_opener(opener)
594+
.with_metrics(source.metrics())
595+
.build()?;
592596
Ok(Box::pin(cooperative(stream)))
593597
}
594598

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use crate::file_scan_config::FileScanConfig;
21+
use datafusion_common::{Result, internal_err};
22+
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
23+
24+
use super::{FileOpener, FileStream, FileStreamMetrics, FileStreamState, OnError};
25+
26+
/// Builder for constructing a [`FileStream`].
27+
pub struct FileStreamBuilder<'a> {
28+
config: &'a FileScanConfig,
29+
partition: Option<usize>,
30+
file_opener: Option<Arc<dyn FileOpener>>,
31+
metrics: Option<&'a ExecutionPlanMetricsSet>,
32+
on_error: OnError,
33+
}
34+
35+
impl<'a> FileStreamBuilder<'a> {
36+
/// Create a new builder.
37+
pub fn new(config: &'a FileScanConfig) -> Self {
38+
Self {
39+
config,
40+
partition: None,
41+
file_opener: None,
42+
metrics: None,
43+
on_error: OnError::Fail,
44+
}
45+
}
46+
47+
/// Configure the partition to scan.
48+
pub fn with_partition(mut self, partition: usize) -> Self {
49+
self.partition = Some(partition);
50+
self
51+
}
52+
53+
/// Configure the [`FileOpener`] used to open files.
54+
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
55+
self.file_opener = Some(file_opener);
56+
self
57+
}
58+
59+
/// Configure the metrics set used by the stream.
60+
pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self {
61+
self.metrics = Some(metrics);
62+
self
63+
}
64+
65+
/// Configure the behavior when opening or scanning a file fails.
66+
pub fn with_on_error(mut self, on_error: OnError) -> Self {
67+
self.on_error = on_error;
68+
self
69+
}
70+
71+
/// Build the configured [`FileStream`].
72+
pub fn build(self) -> Result<FileStream> {
73+
let Self {
74+
config,
75+
partition,
76+
file_opener,
77+
metrics,
78+
on_error,
79+
} = self;
80+
81+
let Some(partition) = partition else {
82+
return internal_err!("FileStreamBuilder missing required partition");
83+
};
84+
let Some(file_opener) = file_opener else {
85+
return internal_err!("FileStreamBuilder missing required file_opener");
86+
};
87+
let Some(metrics) = metrics else {
88+
return internal_err!("FileStreamBuilder missing required metrics");
89+
};
90+
let projected_schema = config.projected_schema()?;
91+
let Some(file_group) = config.file_groups.get(partition).cloned() else {
92+
return internal_err!(
93+
"FileStreamBuilder invalid partition index: {partition}"
94+
);
95+
};
96+
97+
Ok(FileStream {
98+
file_iter: file_group.into_inner().into_iter().collect(),
99+
projected_schema,
100+
remain: config.limit,
101+
file_opener,
102+
state: FileStreamState::Idle,
103+
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
104+
baseline_metrics: BaselineMetrics::new(metrics, partition),
105+
on_error,
106+
})
107+
}
108+
}

datafusion/datasource/src/file_stream.rs renamed to datafusion/datasource/src/file_stream/mod.rs

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
//! Note: Most traits here need to be marked `Sync + Send` to be
2222
//! compliant with the `SendableRecordBatchStream` trait.
2323
24+
mod builder;
25+
2426
use std::collections::VecDeque;
2527
use std::pin::Pin;
2628
use std::sync::Arc;
@@ -29,7 +31,7 @@ use std::task::{Context, Poll};
2931
use crate::PartitionedFile;
3032
use crate::file_scan_config::FileScanConfig;
3133
use arrow::datatypes::SchemaRef;
32-
use datafusion_common::error::Result;
34+
use datafusion_common::Result;
3335
use datafusion_execution::RecordBatchStream;
3436
use datafusion_physical_plan::metrics::{
3537
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time,
@@ -42,6 +44,8 @@ use futures::future::BoxFuture;
4244
use futures::stream::BoxStream;
4345
use futures::{FutureExt as _, Stream, StreamExt as _, ready};
4446

47+
pub use builder::FileStreamBuilder;
48+
4549
/// A stream that iterates record batch by record batch, file over file.
4650
pub struct FileStream {
4751
/// An iterator over input files.
@@ -66,26 +70,18 @@ pub struct FileStream {
6670

6771
impl FileStream {
6872
/// Create a new `FileStream` using the give `FileOpener` to scan underlying files
73+
#[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")]
6974
pub fn new(
7075
config: &FileScanConfig,
7176
partition: usize,
7277
file_opener: Arc<dyn FileOpener>,
7378
metrics: &ExecutionPlanMetricsSet,
7479
) -> Result<Self> {
75-
let projected_schema = config.projected_schema()?;
76-
77-
let file_group = config.file_groups[partition].clone();
78-
79-
Ok(Self {
80-
file_iter: file_group.into_inner().into_iter().collect(),
81-
projected_schema,
82-
remain: config.limit,
83-
file_opener,
84-
state: FileStreamState::Idle,
85-
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
86-
baseline_metrics: BaselineMetrics::new(metrics, partition),
87-
on_error: OnError::Fail,
88-
})
80+
FileStreamBuilder::new(config)
81+
.with_partition(partition)
82+
.with_file_opener(file_opener)
83+
.with_metrics(metrics)
84+
.build()
8985
}
9086

9187
/// Specify the behavior when an error occurs opening or scanning a file
@@ -400,17 +396,17 @@ impl FileStreamMetrics {
400396

401397
#[cfg(test)]
402398
mod tests {
403-
use crate::PartitionedFile;
404-
use crate::file_scan_config::FileScanConfigBuilder;
399+
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
405400
use crate::tests::make_partition;
401+
use crate::{PartitionedFile, TableSchema};
406402
use datafusion_common::error::Result;
407403
use datafusion_execution::object_store::ObjectStoreUrl;
408404
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
409405
use futures::{FutureExt as _, StreamExt as _};
410406
use std::sync::Arc;
411407
use std::sync::atomic::{AtomicUsize, Ordering};
412408

413-
use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
409+
use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError};
414410
use crate::test_util::MockSource;
415411
use arrow::array::RecordBatch;
416412
use arrow::datatypes::Schema;
@@ -530,7 +526,7 @@ mod tests {
530526

531527
let on_error = self.on_error;
532528

533-
let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]);
529+
let table_schema = TableSchema::new(file_schema, vec![]);
534530
let config = FileScanConfigBuilder::new(
535531
ObjectStoreUrl::parse("test:///").unwrap(),
536532
Arc::new(MockSource::new(table_schema)),
@@ -539,10 +535,12 @@ mod tests {
539535
.with_limit(self.limit)
540536
.build();
541537
let metrics_set = ExecutionPlanMetricsSet::new();
542-
let file_stream =
543-
FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
544-
.unwrap()
545-
.with_on_error(on_error);
538+
let file_stream = FileStreamBuilder::new(&config)
539+
.with_partition(0)
540+
.with_file_opener(Arc::new(self.opener))
541+
.with_metrics(&metrics_set)
542+
.with_on_error(on_error)
543+
.build()?;
546544

547545
file_stream
548546
.collect::<Vec<_>>()
@@ -563,6 +561,23 @@ mod tests {
563561
.expect("error executing stream")
564562
}
565563

564+
/// Create the smallest valid file scan config for builder validation tests.
565+
fn builder_test_config() -> FileScanConfig {
566+
let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]);
567+
FileScanConfigBuilder::new(
568+
ObjectStoreUrl::parse("test:///").unwrap(),
569+
Arc::new(MockSource::new(table_schema)),
570+
)
571+
.with_file(PartitionedFile::new("mock_file", 10))
572+
.build()
573+
}
574+
575+
/// Convenience helper to keep builder error assertions focused on the
576+
/// specific missing or invalid input under test.
577+
fn builder_error(builder: FileStreamBuilder<'_>) -> String {
578+
builder.build().err().unwrap().to_string()
579+
}
580+
566581
#[tokio::test]
567582
async fn on_error_opening() -> Result<()> {
568583
let batches = FileStreamTest::new()
@@ -857,4 +872,36 @@ mod tests {
857872

858873
Ok(())
859874
}
875+
876+
#[test]
877+
fn builder_requires_partition_file_opener_and_metrics() {
878+
let config = builder_test_config();
879+
880+
let err = builder_error(FileStreamBuilder::new(&config));
881+
assert!(err.contains("FileStreamBuilder missing required partition"));
882+
883+
let err = builder_error(FileStreamBuilder::new(&config).with_partition(0));
884+
assert!(err.contains("FileStreamBuilder missing required file_opener"));
885+
886+
let err = builder_error(
887+
FileStreamBuilder::new(&config)
888+
.with_partition(0)
889+
.with_file_opener(Arc::new(TestOpener::default())),
890+
);
891+
assert!(err.contains("FileStreamBuilder missing required metrics"));
892+
}
893+
894+
#[test]
895+
fn builder_errors_on_invalid_partition() {
896+
let config = builder_test_config();
897+
let metrics = ExecutionPlanMetricsSet::new();
898+
899+
let err = builder_error(
900+
FileStreamBuilder::new(&config)
901+
.with_partition(1)
902+
.with_file_opener(Arc::new(TestOpener::default()))
903+
.with_metrics(&metrics),
904+
);
905+
assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
906+
}
860907
}

0 commit comments

Comments
 (0)