Skip to content

Commit 638c765

Browse files
committed
feat: add csv_pipeline to hold handler & processor
1 parent d102736 commit 638c765

1 file changed

Lines changed: 82 additions & 0 deletions

File tree

src/csv_pipeline.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::fs::File;
2+
3+
use csv::Reader;
4+
5+
use crate::config::Config;
6+
pub(crate) use crate::prelude::*;
7+
use crate::processing::{CsvHandler, CsvProcessor};
8+
use crate::retained::RetainedData;
9+
10+
pub struct CsvPipeline {
11+
reader: Reader<File>,
12+
handler: CsvHandler,
13+
processor: CsvProcessor,
14+
}
15+
16+
impl CsvPipeline {
17+
pub fn new(config: &Config, retained_data: &mut RetainedData) -> Result<Self> {
18+
let mut reader = csv::ReaderBuilder::new()
19+
.has_headers(config.has_headers)
20+
.from_path(&config.source)
21+
.map_err(|e| Error::CsvRead(format!("Failed to read CSV file from source provided: {e}")))?;
22+
23+
#[rustfmt::skip]
24+
let handler = CsvHandler::new(
25+
config,
26+
retained_data,
27+
reader.headers().map_err(|e| {
28+
Error::CsvHeaders(e.to_string())
29+
})?,
30+
);
31+
32+
let processor = CsvProcessor::new(config);
33+
34+
Ok(Self {
35+
reader,
36+
handler,
37+
processor,
38+
})
39+
}
40+
41+
/// Processes the CSV data and updates the retained data.
42+
///
43+
/// This function iterates over the records in the CSV reader, applies filters using the `CsvHandler`,
44+
/// and retains the specified columns in the `retained_data`.
45+
///
46+
/// # Arguments
47+
///
48+
/// * `retained_data` - A mutable reference to `RetainedData` to store the processed data.
49+
/// * `handler` - A reference to a `CsvHandler` instance for handling CSV processing.
50+
/// * `rdr` - A mutable reference to a `csv::Reader` instance for reading the CSV data.
51+
///
52+
/// # Returns
53+
///
54+
/// * `Result<()>` - Returns `Ok(())` on success, or an `Error` on failure.
55+
///
56+
/// # Errors
57+
///
58+
/// This function can return errors if reading the CSV records fails.
59+
///
60+
/// # Example
61+
///
62+
/// ```rust
63+
/// let mut rdr = csv::Reader::from_path("data.csv").expect("Failed to open CSV file");
64+
/// processor.process(&mut retained_data, &handler, &mut rdr).expect("Failed to process CSV data");
65+
/// ```
66+
pub fn process(&mut self, retained_data: &mut RetainedData) -> Result<()> {
67+
for record_result in self.reader.records() {
68+
let record = record_result?;
69+
70+
if self.handler.row_passes_filters(&record) {
71+
let retained = self.handler.keep_columns(&record);
72+
retained_data.data.push(retained);
73+
}
74+
}
75+
76+
Ok(())
77+
}
78+
79+
pub fn deduplicate(&mut self, retained_data: &mut RetainedData) {
80+
self.processor.deduplicate(retained_data);
81+
}
82+
}

0 commit comments

Comments
 (0)