-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathdataset_ndjson_reader.py
More file actions
69 lines (56 loc) · 2.39 KB
/
dataset_ndjson_reader.py
File metadata and controls
69 lines (56 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pandas as pd
import dask.dataframe as dd
import os
import json
import jsonschema
from cdisc_rules_engine.interfaces import (
DataReaderInterface,
)
from cdisc_rules_engine.models.dataset.dask_dataset import DaskDataset
from cdisc_rules_engine.models.dataset.pandas_dataset import PandasDataset
import tempfile
from cdisc_rules_engine.services.data_readers.json_reader import JSONReader
class DatasetNDJSONReader(DataReaderInterface):
    """Read Dataset-NDJSON files into rules-engine dataset objects.

    The reader expects the first (non-blank) line of the file to be a JSON
    metadata object, validated against the bundled Dataset-NDJSON schema,
    whose ``columns`` entries supply the column names. Every following
    non-blank line is decoded as one data row.
    """

    def get_schema(self) -> dict:
        """Load the Dataset-NDJSON JSON schema.

        The schema path is relative, so it is resolved against the current
        working directory.
        """
        return JSONReader(encoding="utf-8").from_file(
            os.path.join("resources", "schema", "dataset-ndjson-schema.json")
        )

    def read_json_file(self, file_path: str) -> tuple[dict, list]:
        """Parse an NDJSON file into its metadata line and its data rows.

        :param file_path: Path to the NDJSON file.
        :returns: ``(metadata, rows)``. ``metadata`` is the decoded first
            non-blank line. ``rows`` is a list with one decoded value per
            following non-blank line.
        :raises ValueError: If the file cannot be decoded with
            ``self.encoding``, or if it contains no non-blank lines.
        """
        try:
            with open(file_path, "r", encoding=self.encoding) as file:
                # Iterate the file lazily instead of readlines(). Skip
                # whitespace-only lines (e.g. extra trailing newlines), which
                # json.loads would otherwise reject.
                records = [json.loads(line) for line in file if line.strip()]
        except (UnicodeDecodeError, UnicodeError) as e:
            raise ValueError(
                f"Could not decode NDJSON file {file_path} with {self.encoding} encoding: {e}. "
                f"Please specify the correct encoding using the -e flag."
            ) from e
        if not records:
            # Previously an accidental IndexError on lines[0].
            raise ValueError(f"NDJSON file {file_path} is empty.")
        return records[0], records[1:]

    def _raw_dataset_from_file(self, file_path) -> pd.DataFrame:
        """Read, validate and tabulate a Dataset-NDJSON file.

        :param file_path: Path to the NDJSON file.
        :returns: A pandas DataFrame whose columns are the ``name`` of each
            entry in the metadata's ``columns`` list.
        :raises jsonschema.exceptions.ValidationError: If the metadata line
            does not conform to the Dataset-NDJSON schema.
        """
        schema = self.get_schema()
        metadata, rows = self.read_json_file(file_path)
        # Only the metadata line is validated against the schema; the data
        # rows are passed to pandas unchecked.
        jsonschema.validate(metadata, schema)
        column_names = [column["name"] for column in metadata.get("columns", [])]
        df = pd.DataFrame(rows, columns=column_names)
        # Round every float cell to 15 decimal places; presumably to smooth
        # out binary floating-point noise before rule comparisons (confirm).
        return df.map(lambda x: round(x, 15) if isinstance(x, float) else x)

    def from_file(self, file_path):
        """Load a Dataset-NDJSON file as the configured dataset implementation.

        :param file_path: Path to the NDJSON file.
        :returns: A ``PandasDataset`` when ``self.dataset_implementation`` is
            ``PandasDataset``; otherwise a ``DaskDataset`` with 4 partitions.
            If the metadata fails schema validation, an empty
            ``PandasDataset`` is returned regardless of the configured
            implementation.
        """
        try:
            df = self._raw_dataset_from_file(file_path)
            if self.dataset_implementation == PandasDataset:
                return PandasDataset(df)
            else:
                return DaskDataset(
                    dd.from_pandas(df, npartitions=4), length=len(df.index)
                )
        except jsonschema.exceptions.ValidationError:
            return PandasDataset(pd.DataFrame())

    def to_parquet(self, file_path: str) -> tuple[int, str]:
        """Convert a Dataset-NDJSON file into a temporary Parquet file.

        :param file_path: Path to the NDJSON file.
        :returns: ``(row_count, parquet_path)``. The Parquet file is created
            with ``delete=False``, so the caller is responsible for removing
            it.
        """
        # Read first, so that a read or validation failure does not leave an
        # orphaned temp file behind.
        df = self._raw_dataset_from_file(file_path)
        # Close our handle before pandas writes to the path. An open handle
        # leaks a descriptor and prevents re-opening the file on Windows.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as temp_file:
            parquet_path = temp_file.name
        try:
            df.to_parquet(parquet_path)
        except Exception:
            # Do not leave a partially written file behind on failure.
            os.remove(parquet_path)
            raise
        return len(df.index), parquet_path

    def read(self, data):
        """Not implemented for NDJSON input; always returns None."""