-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathencounter.py
More file actions
211 lines (181 loc) · 8.37 KB
/
encounter.py
File metadata and controls
211 lines (181 loc) · 8.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from typing import List, Optional
from pydantic import BaseModel, Field, validator
from linuxforhealth.csvtofhir.fhirutils import csv_record_validator as csvrecord
from linuxforhealth.csvtofhir.fhirutils import fhir_utils
from linuxforhealth.csvtofhir.fhirutils.fhir_constants import SystemConstants, ValueDataAbsentReason
from linuxforhealth.csvtofhir.model.csv.base import CsvBaseModel
# Status assigned to an encounter when the CSV record does not supply one
# (used both as the field default and as the validator fallback in EncounterCsv).
ENCOUNTER_STATUS_DEFAULT_VALUE = "unknown"
# Module-level alias of the encounter class coding system; returned by
# EncounterCsv.validate_encounterClassSystem when no system is provided.
ENCOUNTER_CLASS_CODING_SYSTEM = SystemConstants.ENCOUNTER_CLASS_CODING_SYSTEM
# One entry of an encounter's status history: a status plus the period it applied to.
# NOTE(review): presumably the parsed form of the "status^start_date^end_date"
# strings carried in EncounterCsv.encounterStatusHistory -- confirm against the caller.
class EncounterStatusHistoryEntry(BaseModel):
    # Status value for this history entry.
    status: Optional[str]
    # Start of the period during which the status applied (kept as a string).
    start_time: Optional[str]
    # End of the period during which the status applied (kept as a string).
    end_time: Optional[str]
# CSV record model for an Encounter row.
#
# Every column is optional. Code-system columns that have a well-known default
# are back-filled by the validators below whenever the CSV value is missing or
# empty. The helper methods at the bottom expose derived values (insured entry
# id, encounter class coding, hospitalization presence) computed from the fields.
class EncounterCsv(CsvBaseModel):
    # --- patient / encounter identifiers ---
    patientInternalId: Optional[str] = Field(
        description="Patient Internal Id within the source system. Added to resource.identifier as type 'PI'. "
        "Patient Internal id will be also used as (resource.id). "
        "Restriction on characters: [A-Za-z0-9\\-\\.]{1,64}. "
        "All illegal characters will be replaced with '-' (example Hello!Id -> Hello-Id). "
        "Specified AssigningAuthority field will be added to this identifier."
    )
    accountNumber: Optional[str] = Field(
        description="Patient Account Number. Should be unique for patient. "
        "Added to resource.identifier as type 'AN'. "
        "It is expected that patient might have more than one account number associated with their identity. "
        "Specified AssigningAuthority field will be added to this identifier."
    )
    ssn: Optional[str] = Field(
        description="Patient Social Security Number. Should be unique for a patient. "
        "Added to resource.identifier as type 'SS'. "
        "Dashes will be removed. Must contain 9 integers. "
        "System source: ssnSystem added in data-contract is used, None if not provided. "
        "Social Security numbers that repeat the same number will be set to None (ex. 000-00-0000, 999-99-9999)."
    )
    ssnSystem: Optional[str]
    mrn: Optional[str]
    encounterNumber: Optional[str] = Field(
        description="Encounter or Visit Number. "
        "Combined with AssigningAuthority column to create a unique encounter within the system"
    )
    resourceInternalId: Optional[str]
    assigningAuthority: Optional[str]
    encounterSourceRecordId: Optional[str]

    # --- core encounter attributes ---
    encounterStatus: Optional[str] = ENCOUNTER_STATUS_DEFAULT_VALUE
    encounterClassCode: Optional[str]
    encounterClassText: Optional[str]
    # Same value as SystemConstants.ENCOUNTER_CLASS_CODING_SYSTEM; the module
    # constant is used so the default and the validator fallback cannot drift.
    encounterClassSystem: Optional[str] = ENCOUNTER_CLASS_CODING_SYSTEM
    encounterPriorityCode: Optional[str]
    encounterPriorityText: Optional[str]
    encounterPriorityCodeSystem: Optional[str] = SystemConstants.PRIORITY_SYSTEM
    encounterStartDateTime: Optional[str]
    encounterEndDateTime: Optional[str]
    encounterLengthValue: Optional[str]
    encounterLengthUnits: Optional[str]
    encounterReasonCode: Optional[str]
    encounterReasonCodeSystem: Optional[str]
    encounterReasonCodeText: Optional[str]

    # --- hospitalization section within Encounter ---
    hospitalizationAdmitSourceCode: Optional[str]
    hospitalizationAdmitSourceCodeText: Optional[str]
    hospitalizationAdmitSourceCodeSystem: Optional[str] = SystemConstants.ADMISSION_SOURCE_SYSTEM
    hospitalizationReAdmissionCode: Optional[str]
    hospitalizationReAdmissionCodeText: Optional[str]
    hospitalizationReAdmissionCodeSystem: Optional[str]
    hospitalizationDischargeDispositionCode: Optional[str]
    hospitalizationDischargeDispositionCodeText: Optional[str]
    hospitalizationDischargeDispositionCodeSystem: Optional[str] = SystemConstants.DISCHARGE_DISPOSITION_SYSTEM

    # --- participant section within Encounter ---
    encounterParticipantSequenceId: Optional[str]
    encounterParticipantTypeCode: Optional[str]
    encounterParticipantTypeText: Optional[str]
    encounterParticipantTypeCodeSystem: Optional[str] = SystemConstants.PARTICIPANT_TYPE_SYSTEM
    practitionerInternalId: Optional[str]
    practitionerNPI: Optional[str]
    practitionerNameLast: Optional[str]
    practitionerNameFirst: Optional[str]
    practitionerGender: Optional[str]
    practitionerRoleText: Optional[str]
    practitionerRoleCodes: Optional[str]
    practitionerRoleCodesSystem: Optional[str]
    practitionerSpecialtyCodes: Optional[str]
    practitionerSpecialtyCodesSystem: Optional[str]
    practitionerSpecialtyText: Optional[str]

    # --- location section within Encounter ---
    encounterLocationSequenceId: Optional[str]
    encounterLocationPeriodStart: Optional[str]
    encounterLocationPeriodEnd: Optional[str]
    locationResourceInternalId: Optional[str]
    locationName: Optional[str]
    locationTypeCode: Optional[str]
    locationTypeText: Optional[str]
    locationTypeCodeSystem: Optional[str]

    # Status history is a list of strings, each one encoding a single history
    # entry in the form status^start_date^end_date.
    encounterStatusHistory: Optional[List[str]]

    # --- Extension: Insured ---
    encounterInsuredEntryId: Optional[str]
    encounterInsuredRank: Optional[int]
    encounterInsuredCategoryCode: Optional[str]
    encounterInsuredCategorySystem: Optional[str]
    encounterInsuredCategoryText: Optional[str]

    # --- Extension: Claim-type ---
    encounterClaimType: Optional[str]

    # --- Extension: drgCode ---
    encounterDrgCode: Optional[str]

    @validator("ssn")
    def validate_ssn(cls, v):
        """Delegate SSN validation/normalisation to the shared CSV record validator."""
        return csvrecord.validate_ssn(v)

    # The validators below run with always=True so that they also fire when the
    # column is absent; each one replaces a missing/empty value with its default.

    @validator("encounterStatus", always=True)
    def validate_encounterStatus(cls, v):
        """Return the given status, or the default status when it is empty."""
        return v or ENCOUNTER_STATUS_DEFAULT_VALUE

    @validator("encounterClassSystem", always=True)
    def validate_encounterClassSystem(cls, v):
        """Return the given class system, or the default encounter class system."""
        return v or ENCOUNTER_CLASS_CODING_SYSTEM

    @validator("hospitalizationAdmitSourceCodeSystem", always=True)
    def validate_hospitalizationAdmitSourceCodeSystem(cls, v):
        """Return the given admit-source system, or the default admission source system."""
        return v or SystemConstants.ADMISSION_SOURCE_SYSTEM

    @validator("hospitalizationDischargeDispositionCodeSystem", always=True)
    def validate_hospitalizationDischargeDispositionCodeSystem(cls, v):
        """Return the given discharge-disposition system, or the default one."""
        return v or SystemConstants.DISCHARGE_DISPOSITION_SYSTEM

    @validator("encounterParticipantTypeCodeSystem", always=True)
    def validate_encounterParticipantTypeCodeSystem(cls, v):
        """Return the given participant-type system, or the default one."""
        return v or SystemConstants.PARTICIPANT_TYPE_SYSTEM

    @validator("encounterPriorityCodeSystem", always=True)
    def validate_encounterPriorityCodeSystem(cls, v):
        """Return the given priority system, or the default priority system."""
        return v or SystemConstants.PRIORITY_SYSTEM

    def get_insured_entry_id(self):
        """
        Return the id used to key the insured extension entry.

        Order indicates Primary, Secondary, Tertiary, etc., so the rank is a good
        entry id to allow overwriting values; however, the user has the option to
        provide a different id. The first non-empty value of the following wins:
        explicit entry id, rank, category code, category text. When all of them
        are empty the (empty) category text is returned.

        NOTE(review): selection is truthiness-based, so a rank of 0 is treated as
        "not provided" -- confirm ranks are 1-based.
        """
        return (
            self.encounterInsuredEntryId
            or self.encounterInsuredRank
            or self.encounterInsuredCategoryCode
            or self.encounterInsuredCategoryText
        )

    def get_drg_code(self):
        """Return the DRG code extension value as provided in the record."""
        return self.encounterDrgCode

    def get_required_field_encounter_class(self):
        """
        Build the coding for the encounter class.

        Uses the record's class code/system/text when a class code is set;
        otherwise falls back to the "temporarily unknown" data-absent-reason
        coding so a value is always returned.
        """
        if not self.encounterClassCode:
            # No class supplied: substitute the temporarily-unknown coding.
            unknown_code, unknown_display = (
                ValueDataAbsentReason.CODE_TEMPORARILY_UNKNOWN[0],
                ValueDataAbsentReason.CODE_TEMPORARILY_UNKNOWN[1],
            )
            return fhir_utils.get_coding(
                unknown_code,
                ValueDataAbsentReason.SYSTEM,
                unknown_display
            )
        return fhir_utils.get_coding(
            self.encounterClassCode,
            self.encounterClassSystem,
            self.encounterClassText
        )

    def contains_hospitalization_entries(self) -> bool:
        """
        Return True if any of the hospitalization code/text fields are set,
        otherwise return False so the hospitalization segment of the encounter
        stays as None.

        The *CodeSystem fields are not part of the check.
        """
        hospitalization_values = (
            self.hospitalizationAdmitSourceCode,
            self.hospitalizationAdmitSourceCodeText,
            self.hospitalizationReAdmissionCode,
            self.hospitalizationReAdmissionCodeText,
            self.hospitalizationDischargeDispositionCode,
            self.hospitalizationDischargeDispositionCodeText,
        )
        return any(hospitalization_values)