-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathcsv_reader.py
More file actions
64 lines (51 loc) · 2.25 KB
/
csv_reader.py
File metadata and controls
64 lines (51 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import tempfile
import dask.dataframe as dd
from cdisc_rules_engine.exceptions.custom_exceptions import InvalidCSVFile
from cdisc_rules_engine.interfaces import DataReaderInterface
import pandas as pd
from cdisc_rules_engine.models.dataset import PandasDataset, DaskDataset
class CSVReader(DataReaderInterface):
def read(self, data):
"""
Function for reading data from a specific file type and returning a
pandas dataframe of the data.
"""
raise NotImplementedError
def from_file(self, file_path):
try:
with open(file_path, "r", encoding=self.encoding) as fp:
data = pd.read_csv(fp, sep=",", header=0, index_col=False)
data = data.where(data.notna(), None)
if self.dataset_implementation == PandasDataset:
return PandasDataset(data)
else:
return DaskDataset(
dd.from_pandas(data, npartitions=4), length=len(data.index)
)
except (UnicodeDecodeError, UnicodeError) as e:
raise InvalidCSVFile(
f"\n Error reading CSV from: {file_path}"
f"\n Failed to decode with {self.encoding} encoding: {e}"
f"\n Please specify the correct encoding using the -e flag."
)
except Exception as e:
raise InvalidCSVFile(
f"\n Error reading CSV from: {file_path}"
f"\n {type(e).__name__}: {e}"
)
def to_parquet(self, file_path: str) -> tuple[int, str]:
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
dataset = pd.read_csv(file_path, chunksize=20000, encoding=self.encoding)
created = False
num_rows = 0
for chunk in dataset:
num_rows += len(chunk)
if not created:
chunk.to_parquet(temp_file.name, engine="fastparquet")
created = True
else:
chunk.to_parquet(temp_file.name, engine="fastparquet", append=True)
if not created:
empty_df = pd.read_csv(file_path, nrows=0, encoding=self.encoding)
empty_df.to_parquet(temp_file.name, engine="fastparquet")
return num_rows, temp_file.name