-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathdatasetxpt_metadata_reader.py
More file actions
190 lines (176 loc) · 7.22 KB
/
datasetxpt_metadata_reader.py
File metadata and controls
190 lines (176 loc) · 7.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import pyreadstat
from cdisc_rules_engine.services import logger
from cdisc_rules_engine.config import config
from cdisc_rules_engine.services.adam_variable_reader import AdamVariableReader
from cdisc_rules_engine.constants import DEFAULT_ENCODING
import os
class DatasetXPTMetadataReader:
    """
    Responsibility of the class is to read metadata
    from .xpt file.

    For files larger than the configured dataset size threshold only one
    row is loaded and the number of records is estimated from the file
    size instead of being read from the file.
    """

    # TODO. Maybe in future it is worth having multiple constructors
    # like from_bytes, from_file etc. But now there is no immediate need for that.
    def __init__(
        self, file_path: str, file_name: str, encoding: str = DEFAULT_ENCODING, **kwargs
    ):
        """
        :param file_path: path of the .xpt file on disk.
        :param file_name: file name; the part before the first "." is
            upper-cased and used as the dataset name.
        :param encoding: encoding passed to pyreadstat when reading.
        :param kwargs: accepted for call compatibility, not used here.
        """
        file_size = os.path.getsize(file_path)
        if file_size > config.get_dataset_size_threshold():
            # Large file: read a single row and estimate the length later.
            self._estimate_dataset_length = True
            self.row_limit = 1
        else:
            self._estimate_dataset_length = False
            self.row_limit = 0
        self._metadata_container = {}
        self._first_record = None
        self._dataset_name = file_name.split(".")[0].upper()
        self._file_path = file_path
        self.encoding = encoding

    def read(self) -> dict:
        """
        Extracts metadata from binary contents of .xpt file.

        :return: dictionary with variable names/labels/formats, type and
            size maps, dataset label/length/name, first record,
            modification date and ADaM info. If the file cannot be read
            or decoded, a dictionary with empty values is returned.
        """
        encoding = self.encoding or DEFAULT_ENCODING
        try:
            dataset, metadata = pyreadstat.read_xport(
                self._file_path, encoding=encoding, row_limit=self.row_limit
            )
        except (pyreadstat.ReadstatError, UnicodeDecodeError):
            # Deliberate best-effort: an unreadable file yields empty metadata.
            # NOTE(review): unlike the success path, this result has no
            # "adam_info" key - confirm callers do not rely on it.
            return {
                "variable_labels": [],
                "variable_names": [],
                "variable_formats": [],
                "variable_name_to_label_map": {},
                "variable_name_to_data_type_map": {},
                "variable_name_to_size_map": {},
                "number_of_variables": 0,
                "dataset_label": "",
                "dataset_length": 0,
                "first_record": {},
                "dataset_name": "",
                "dataset_modification_date": "",
            }
        self._first_record = self._extract_first_record(dataset)
        modification_time = metadata.modification_time
        self._metadata_container = {
            "variable_labels": list(metadata.column_labels),
            "variable_names": list(metadata.column_names),
            "variable_formats": [
                "" if (data_type == "NULL" or data_type is None) else data_type
                for data_type in metadata.original_variable_types.values()
            ],
            "variable_name_to_label_map": metadata.column_names_to_labels,
            "variable_name_to_data_type_map": metadata.readstat_variable_types,
            "variable_name_to_size_map": metadata.variable_storage_width,
            "number_of_variables": metadata.number_columns,
            "dataset_label": metadata.file_label,
            "dataset_length": metadata.number_rows,
            "first_record": self._first_record,
            "dataset_name": self._dataset_name,
            # Fall back to "" (same value as the error path) when the
            # modification time is not available.
            "dataset_modification_date": (
                modification_time.isoformat()
                if modification_time is not None
                else ""
            ),
        }

        if self._estimate_dataset_length:
            # Only one row was loaded, so the row count has to be estimated.
            try:
                dataset_length: int = self._calculate_dataset_length()
            except ValueError:
                dataset_length = 0
            self._metadata_container["dataset_length"] = dataset_length
        self._convert_variable_types()
        self._metadata_container["adam_info"] = self._extract_adam_info(
            self._metadata_container["variable_names"]
        )
        logger.info(f"Extracted dataset metadata. metadata={self._metadata_container}")
        return self._metadata_container

    def _extract_first_record(self, df):
        """
        Returns the first row of the dataframe as a {column: str} mapping,
        or None when the dataframe has no rows. Bytes values are decoded
        as UTF-8, everything else is converted with str().
        """
        if len(df.index) == 0:
            return None
        return {
            name: value.decode("utf-8") if isinstance(value, bytes) else str(value)
            for name, value in df.iloc[0].items()
        }

    def _calculate_dataset_length(self):
        """
        Estimates the number of records from the file size.

        The size of the observation section (file size minus the offset
        returned by _read_header) is divided by the record width, which is
        the sum of the variable storage widths.

        :raises ValueError: if the record width is zero or the observation
            header cannot be found.
        """
        encoding = self.encoding or DEFAULT_ENCODING
        _, meta = pyreadstat.read_xport(
            self._file_path, encoding=encoding, metadataonly=True
        )
        row_size = sum(meta.variable_storage_width.values())
        if row_size <= 0:
            # Avoids a ZeroDivisionError; read() already handles ValueError.
            raise ValueError("Unable to estimate dataset length: record width is 0.")
        total_size = os.path.getsize(self._file_path)
        start = self._read_header(self._file_path)
        data_size = max(total_size - start, 0)
        if row_size < 80:
            # Records narrower than 80 bytes: the blank fill at the end of
            # the file can span whole records, so it has to be removed first.
            padding = self._count_trailing_padding(self._file_path, row_size)
            payload = max(data_size - padding, 0)
            # Trailing blanks may also belong to the last record (blank-padded
            # character value), so round up instead of down: a partially
            # "consumed" last record still counts as a record.
            return -(-payload // row_size)
        # Records of 80 bytes or more: the blank fill is always shorter than
        # one record, so floor division drops it.
        return data_size // row_size

    def _count_trailing_padding(self, file_path, row_size):
        """
        reads the file from the end in chunks of 80 bytes and counts
        the total number of trailing blank (space) bytes.

        :param row_size: kept for call compatibility, not used.
        :return: number of consecutive b" " bytes at the end of the file
            (0 for an empty file).
        """
        chunk_size = 80
        padding_size = 0
        with open(file_path, "rb") as file:
            position = file.seek(0, os.SEEK_END)
            while position > 0:
                read_size = min(chunk_size, position)
                position -= read_size
                file.seek(position)
                chunk = file.read(read_size)
                kept = len(chunk.rstrip(b" "))
                padding_size += len(chunk) - kept
                if kept:
                    # Found a non-blank byte: the trailing run ends here.
                    break
        return padding_size

    def _read_header(self, file_path):
        """
        reads the file in 1024-byte chunks until the observation header
        record is found and returns the offset at which the data section
        starts.

        :raises ValueError: if the observation header is not present.
        """
        # "OBS" is blank-padded to 8 characters in the header record; the
        # marker is built from pieces so the run of blanks stays explicit.
        marker = (
            b"HEADER RECORD*******OBS"
            + b" " * 5
            + b"HEADER RECORD!!!!!!!"
            + b"0" * 30
        )
        # The header record is a fixed 80 bytes (marker plus blank fill), so
        # data starts exactly one record after the marker position. Skipping
        # blank/null bytes instead would also swallow the beginning of a
        # first observation that starts with such bytes.
        record_size = 80
        chunk_size = 1024
        read_header = bytearray()
        with open(file_path, "rb") as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                # Re-scan only the tail that could hold a marker split
                # across two chunks.
                search_from = max(len(read_header) - len(marker) + 1, 0)
                read_header += chunk
                position = read_header.find(marker, search_from)
                if position != -1:
                    return position + record_size
        raise ValueError("End descriptor not found in the file header.")

    def _convert_variable_types(self):
        """
        Converts variable types to the format that
        rule authors use.

        A new mapping is stored in the metadata container, so the dictionary
        obtained from pyreadstat is not modified. Types without a known
        mapping are kept unchanged.
        """
        rule_author_type_map: dict = {
            "string": "Char",
            "double": "Num",
            "Character": "Char",
            "Numeric": "Num",
        }
        source_types = self._metadata_container["variable_name_to_data_type_map"]
        self._metadata_container["variable_name_to_data_type_map"] = {
            name: rule_author_type_map.get(data_type, data_type)
            for name, data_type in source_types.items()
        }

    def _extract_adam_info(self, variable_names):
        """
        Runs AdamVariableReader checks over the columns it extracts from
        the given variable names and returns the collected attributes.
        """
        ad = AdamVariableReader()
        adam_columns = ad.extract_columns(variable_names)
        for column in adam_columns:
            ad.check_y(column)
            ad.check_w(column)
            ad.check_xx_zz(column)
        adam_info_dict = {
            "categorization_scheme": ad.categorization_scheme,
            "w_indexes": ad.w_indexes,
            "period": ad.period,
            "selection_algorithm": ad.selection_algorithm,
        }
        return adam_info_dict