-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathcodelist_terms.py
More file actions
174 lines (168 loc) · 6.81 KB
/
codelist_terms.py
File metadata and controls
174 lines (168 loc) · 6.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from numpy import nan
import pandas as pd
from cdisc_rules_engine.operations.base_operation import BaseOperation
from cdisc_rules_engine.exceptions.custom_exceptions import (
MissingDataError,
RuleExecutionError,
)
from cdisc_rules_engine.services import logger
class CodelistTerms(BaseOperation):
    """Look up controlled-terminology (CT) codelist terms.

    Two modes are supported, selected from the operation parameters:

    * multiple versions -- each dataset row names its own CT version (and
      optionally its own codelist code) and one term attribute is translated
      into another by joining against the CT term table;
    * single version -- the values of the named codelists (or of their
      terms) are collected from one loaded CT package.
    """

    def _execute_operation(self) -> pd.Series:
        """Dispatch to the multi-version or single-version lookup.

        Returns:
            The per-row Series produced by ``_handle_multiple_versions``, the
            list produced by ``_handle_single_version``, or ``None`` when the
            parameters select neither mode.
        """
        if (
            self.params.ct_package_type
            and self.params.ct_version
            and self.params.codelist_code
            and self.params.ct_version in self.evaluation_dataset
            and (
                self.params.term_code
                or self.params.term_value
                or self.params.term_pref_term
            )
        ):
            return self._handle_multiple_versions()
        if self.params.codelists:
            return self._handle_single_version()
        # Neither mode applies; the original fell through with an implicit
        # None, which is kept so callers see the same result.
        return None

    def _handle_multiple_versions(self) -> pd.Series:
        """Translate a term attribute per row, honouring each row's CT version.

        Exactly one of ``term_code``, ``term_value`` and ``term_pref_term``
        must be set; it names the dataset column to match against the CT
        column of the same name. ``returntype`` picks the CT attribute that is
        returned (default ``value`` when matching on code, ``code`` otherwise).

        Returns:
            The ``term_<returntype>`` column of the left-joined result, with
            NaN replaced by ``None`` for rows without a match.

        Raises:
            RuleExecutionError: if more than one, or none, of the term
                parameters is specified.
        """
        params = self.params
        specified = [
            x
            for x in (params.term_code, params.term_value, params.term_pref_term)
            if x
        ]
        if len(specified) > 1:
            raise RuleExecutionError(
                "More than one of term_code, term_pref_term and term_value cannot be specified at the same time."
            )
        if params.term_code:
            left_on = params.term_code
            right_on = "term_code"
            target = f"term_{params.returntype or 'value'}"
        elif params.term_value:
            left_on = params.term_value
            right_on = "term_value"
            target = f"term_{params.returntype or 'code'}"
        elif params.term_pref_term:
            left_on = params.term_pref_term  # column from dataset
            right_on = "term_pref_term"  # column from lib metadata
            target = f"term_{params.returntype or 'code'}"
        else:
            # Without this guard the join keys below would be unbound names.
            raise RuleExecutionError(
                "One of term_code, term_pref_term and term_value must be specified."
            )

        # Only fetch CT terms for the versions that actually occur in the data.
        ct_versions = self.evaluation_dataset[params.ct_version]
        unique_ct_versions = ct_versions.unique()
        ct_data = self.library_metadata.build_ct_terms(
            params.ct_package_type, unique_ct_versions
        )
        # Build the CT table with the same dataset implementation as the
        # evaluation dataset so both sides of the merge are compatible.
        ct_df = self.evaluation_dataset.__class__.from_dict(ct_data)
        ct_df = ct_df.astype(
            {
                "version": "string",
                "codelist_code": "string",
            }
        )

        # Join keys are cast to the same string dtype on both sides.
        codelist_is_column = params.codelist_code in self.evaluation_dataset.columns
        cast_cols = {params.ct_version: "string"}
        if codelist_is_column:
            cast_cols[params.codelist_code] = "string"
        self.evaluation_dataset = self.evaluation_dataset.astype(cast_cols)

        # Terms are compared case-insensitively: both sides are lower-cased
        # and passed to the merge as key arrays rather than column names.
        dataset_terms = self.evaluation_dataset[left_on].astype(str).str.lower()
        if codelist_is_column:
            # codelist_code names a dataset column: the codelist varies per row
            # and therefore takes part in the join.
            result = self.evaluation_dataset.merge(
                ct_df.data,
                left_on=(
                    params.ct_version,
                    params.codelist_code,
                    dataset_terms,
                ),
                right_on=(
                    "version",
                    "codelist_code",
                    ct_df[right_on].astype(str).str.lower(),
                ),
                how="left",
            ).replace(nan, None)
        else:
            # codelist_code is a literal code: restrict the CT table to that
            # codelist first, then join on version and term only.
            codelist = ct_df[ct_df["codelist_code"] == params.codelist_code]
            result = self.evaluation_dataset.merge(
                codelist,
                left_on=(
                    params.ct_version,
                    dataset_terms,
                ),
                right_on=("version", codelist[right_on].astype(str).str.lower()),
                how="left",
            ).replace(nan, None)
        return result[target]

    def _handle_single_version(self) -> list:
        """Collect codelist or term values from a single CT package.

        ``params.codelists`` holds codelist submission values (matched
        case-insensitively), ``params.level`` selects codelist-level or
        term-level values and ``params.returntype`` selects which attribute is
        extracted (see ``_get_codelist_values``).

        Returns:
            A flat list of the extracted values, in the order the codelists
            were requested.

        Raises:
            MissingDataError: if no usable CT package is loaded or a requested
                codelist is not present in the package.
        """
        codelist_names = self.params.codelists
        codelist_level = self.params.level
        check = self.params.returntype

        ct_package_data = self._get_ct_package_data()
        submission_lookup = {
            codelist["submissionValue"].lower(): codelist
            for codelist in ct_package_data.get("codelists", [])
            if "submissionValue" in codelist
        }

        # Resolve every requested codelist before extracting anything so a
        # missing codelist is reported up front.
        codelists = []
        for codelist_name in codelist_names:
            code_obj = submission_lookup.get(codelist_name.lower())
            if code_obj is None:
                raise MissingDataError(
                    f"Codelist '{codelist_name}' not found in metadata"
                )
            codelists.append(code_obj)

        return [
            value
            for codelist in codelists
            for value in self._get_codelist_values(codelist, codelist_level, check)
        ]

    def _get_ct_package_data(self) -> dict:
        """Pick the CT package that single-version lookups read from.

        The merged define-XML package is preferred; otherwise the first loaded
        package that is neither ``extensible`` nor a ``define-xml*`` package is
        used.

        Raises:
            MissingDataError: if the package metadata is missing, empty, or
                contains no usable package.
        """
        not_populated = (
            "CT package data is not populated. "
            "A valid define.xml file or -ct command is required to execute."
        )
        try:
            ct_packages = self.library_metadata._ct_package_metadata
            if "define_XML_merged_CT" in ct_packages:
                return ct_packages["define_XML_merged_CT"]
            if not ct_packages:
                raise MissingDataError(not_populated)
            # A default avoids leaking StopIteration when nothing qualifies.
            ct_package_data = next(
                (
                    pkg
                    for name, pkg in ct_packages.items()
                    if name != "extensible" and not name.startswith("define-xml")
                ),
                None,
            )
        except (AttributeError, TypeError) as e:
            # Missing or None metadata: previously this was only logged and the
            # method then failed on an unbound local variable.
            logger.warning(
                "CT package data is not populated: %s "
                "-- a valid define.xml file or -ct command is required to execute",
                e,
            )
            raise MissingDataError(not_populated) from e
        if ct_package_data is None:
            raise MissingDataError(not_populated)
        return ct_package_data

    def _get_codelist_values(
        self, codelist: dict, codelist_level: str, check: str
    ) -> list:
        """Extract values from a codelist based on level and check type.

        Args:
            codelist: A codelist entry from the CT package.
            codelist_level: ``"codelist"`` to read the codelist's own
                attribute, ``"term"`` to read the attribute of each term.
            check: Attribute selector. At codelist level ``"code"`` and
                ``"pref_term"`` are recognised and anything else yields the
                submission value; at term level ``"value"`` and ``"pref_term"``
                are recognised and anything else yields the concept id.

        Returns:
            A one-element list at codelist level, one element per term at term
            level, or an empty list for any other level.
        """
        if codelist_level == "codelist":
            key = {"code": "conceptId", "pref_term": "preferredTerm"}.get(
                check, "submissionValue"
            )
            return [codelist[key]]
        if codelist_level == "term":
            key = {"value": "submissionValue", "pref_term": "preferredTerm"}.get(
                check, "conceptId"
            )
            return [term[key] for term in codelist.get("terms", [])]
        return []