-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathVC_BC04_operateType.py
More file actions
375 lines (314 loc) · 14.3 KB
/
VC_BC04_operateType.py
File metadata and controls
375 lines (314 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
"""
VAPORCONE 项目操作类型处理模块
优化前参考:
VC_BC04_operateType20250827.py
该模块实现了数据转换过程中的各种操作类型,包括:
- 单表操作
- 多表联合操作
- 字段映射处理
- 特殊操作类型处理
- 优化的向量化操作处理
"""
import re
import numpy as np
import pandas as pd
from VC_BC03_fetchConfig import *
sys.path.append(SPECIFIC_PATH)
from VC_BC05_studyFunctions import * # type: ignore
from VC_BC06_operateTypeFunctions import get_opertype_function
# Regex for the cycle notation, compiled once at import time so that callers
# reuse the compiled object (PATTERN_CYCLE_PRA is not defined in this file;
# presumably it comes from the VC_BC03_fetchConfig star-import — confirm).
COMPILED_CYCLE_PATTERN = re.compile(PATTERN_CYCLE_PRA)
# Module-level cache of parsed CSV frames.
# Key: (file path, sorted tuple of requested columns, or None for "all columns").
csv_cache = {}


def get_cached_csv(file_path, needed_columns=None, error_callback=None):
    """
    Read a CSV file as an all-string DataFrame, memoising the result.

    The cache key combines the path with the sorted requested columns, so the
    same file requested with a different column subset is read again. The
    cached DataFrame object itself is returned (not a copy). Failed reads are
    not cached, so a later call retries the read.

    Args:
        file_path (str): Path of the CSV file.
        needed_columns (list, optional): Column names to load. When empty or
            None every column is loaded.
        error_callback (callable, optional): Called with the keyword
            arguments ``message``, ``stage`` and ``detail`` when the read
            raises.

    Returns:
        DataFrame | None: The parsed frame, or None when reading failed.
    """
    column_key = tuple(sorted(needed_columns)) if needed_columns else None
    key = (file_path, column_key)

    # Cache hit: hand back the stored frame without touching the disk.
    if key in csv_cache:
        return csv_cache[key]

    # dtype=str with na_filter=False keeps every cell as text and leaves
    # empty cells as '' instead of NaN.
    options = dict(dtype=str, na_filter=False, engine='c', low_memory=False)
    if needed_columns:
        options['usecols'] = needed_columns

    try:
        frame = pd.read_csv(file_path, **options)
    except Exception as e:
        message = f"读取CSV文件失败: {file_path}, 错误: {e}"
        print(message)
        if error_callback:
            error_callback(message=message, stage='CSV读取', detail=str(e))
        return None

    csv_cache[key] = frame
    return frame
def singleTable(table):
    """
    Fetch a single table and return it with every column cast to ``str``.

    Args:
        table (str): Table name; passed to ``getFormatDataset`` and then used
            as the key into the mapping that call returns.

    Returns:
        DataFrame: The table's data converted with ``astype(str)``.
    """
    datasets = getFormatDataset(table)
    return datasets[table].astype(str)
def tableJoinType1(*tableList):
    """
    Outer-join several tables on ``SUBJID`` and return the result as strings.

    Tables are merged left to right in the order given. After each merge the
    gaps introduced by the outer join are filled with ''.

    Args:
        *tableList: Table names; passed to ``getFormatDataset`` and then used
            as keys into the mapping that call returns.

    Returns:
        DataFrame: The joined data converted with ``astype(str)``. With no
        table names an empty DataFrame is returned.
    """
    format_dataset = getFormatDataset(*tableList)

    # Use the module's own `pd` alias: this file imports `pandas as pd` only,
    # so the bare name `pandas` is not guaranteed to be in scope here.
    merged = pd.DataFrame()
    for table_name in tableList:
        table_df = format_dataset[table_name]
        if merged.empty:
            # NOTE(review): an accumulated frame with no rows is replaced
            # rather than merged, so any columns it had are dropped —
            # confirm this is intended.
            merged = table_df
        else:
            merged = pd.merge(
                merged,
                table_df,
                left_on='SUBJID',
                right_on='SUBJID',
                how='outer'
            ).fillna('')
    return merged.astype(str)
def precompute_mapping_rules(domain_param, definition_merge_rule):
    """
    Expand the per-field mapping definitions into per-cycle lookup tables.

    Rows of ``domain_param`` that have no entry in ``definition_merge_rule``,
    or whose merge-rule value is empty, are skipped.

    Args:
        domain_param (dict): ``{definition_row_num: {standard_field: spec}}``
            where each spec is indexed by COL_FIELDNAME, COL_PARAMETER,
            COL_OPERTYPE and COL_NDKEY.
        definition_merge_rule (dict): ``{definition_row_num: entry}`` where
            each entry is indexed by COL_MERGERULE and COL_DEFINITION (the
            latter is used as the cycle count).

    Returns:
        dict: ``{definition_row_num: {'combo_file_name', 'cycle_time',
        'field_rules', 'needed_columns'}}``. ``field_rules`` maps each
        standard field to its ``fieldname_cycles``, ``parameter_cycles``,
        ``opertype`` and ``ndkey``; ``needed_columns`` lists every source
        column referenced by the row (always including 'SUBJID').
    """
    rules_by_row = {}

    for row_num, row_fields in domain_param.items():
        if row_num not in definition_merge_rule:
            continue
        source_file = definition_merge_rule[row_num][COL_MERGERULE]
        if not source_file:
            continue
        n_cycles = definition_merge_rule[row_num][COL_DEFINITION]

        rules_for_row = {}
        columns_to_load = {'SUBJID'}

        for target_field, spec in row_fields.items():
            raw_fieldname = spec[COL_FIELDNAME]
            raw_parameter = spec[COL_PARAMETER]
            oper = spec[COL_OPERTYPE]

            names_per_cycle = []
            params_per_cycle = []

            for cycle_no in range(n_cycles):
                # Source columns: a cycle-notation value contributes only its
                # cycle_no-th entry (nothing once the list is exhausted);
                # otherwise every non-empty entry applies to every cycle.
                if COMPILED_CYCLE_PATTERN.match(raw_fieldname):
                    unwrapped = COMPILED_CYCLE_PATTERN.sub(r"\1", raw_fieldname)
                    cycle_names = unwrapped.split(MARK_DOLLAR)[cycle_no:cycle_no + 1]
                else:
                    cycle_names = [part for part in raw_fieldname.split(MARK_DOLLAR) if part]
                names_per_cycle.append(cycle_names)
                columns_to_load.update(cycle_names)

                # Parameter: same selection rule, falling back to "" when the
                # cycle list is shorter than the cycle count.
                if COMPILED_CYCLE_PATTERN.match(raw_parameter):
                    unwrapped = COMPILED_CYCLE_PATTERN.sub(r"\1", raw_parameter)
                    choices = unwrapped.split(MARK_DOLLAR)
                    cycle_param = choices[cycle_no] if cycle_no < len(choices) else ""
                else:
                    cycle_param = raw_parameter
                params_per_cycle.append(cycle_param)

                # Conditional operations name an extra flag column before the
                # colon; it has to be loaded as well.
                if not cycle_param:
                    continue
                if oper == OPERTYPE_IIF:
                    columns_to_load.update(
                        record.split(MARK_COLON)[0]
                        for record in cycle_param.split(MARK_DOLLAR)
                        if MARK_COLON in record
                    )
                elif oper == OPERTYPE_SEL:
                    if MARK_COLON in cycle_param:
                        columns_to_load.add(cycle_param.split(MARK_COLON)[0])

            rules_for_row[target_field] = {
                'fieldname_cycles': names_per_cycle,
                'parameter_cycles': params_per_cycle,
                'opertype': oper,
                'ndkey': spec[COL_NDKEY]
            }

        rules_by_row[row_num] = {
            'combo_file_name': source_file,
            'cycle_time': n_cycles,
            'field_rules': rules_for_row,
            'needed_columns': list(columns_to_load)
        }

    return rules_by_row
def prepare_epoch_sort(df, sort_keys):
    """
    Replace the 'EPOCH' sort key with a numeric helper column.

    When 'EPOCH' is both a sort key and a column of ``df``, a temporary
    int32 column '_EPOCH_SORT' is added to ``df`` IN PLACE. Its value is the
    EPOCH text with 'TREATMENT' removed and parsed as a number (for example
    'TREATMENT2' -> 2); anything that does not parse becomes 0. Only the
    first 'EPOCH' entry in the key list is substituted.

    Args:
        df (DataFrame): Frame to be sorted; mutated when a helper column is
            added.
        sort_keys (list): Sort key names; not modified.

    Returns:
        tuple: ``(actual_sort_keys, epoch_col_name)``, a new key list with
        'EPOCH' swapped for the helper column, and the helper column name
        (None when no helper column was created).
    """
    resolved_keys = list(sort_keys)
    if 'EPOCH' not in resolved_keys or 'EPOCH' not in df.columns:
        return resolved_keys, None

    helper_name = '_EPOCH_SORT'
    stripped = (
        df['EPOCH']
        .fillna('')
        .astype(str)
        .str.replace('TREATMENT', '', regex=False)
    )
    df[helper_name] = pd.to_numeric(stripped, errors='coerce').fillna(0).astype('int32')

    resolved_keys[resolved_keys.index('EPOCH')] = helper_name
    return resolved_keys, helper_name
def ultra_fast_sequence_generation(df, seq_field, sort_keys, domain_key, sequenceDict):
    """
    Sort the records and assign each one the next sequence number of its subject.

    Numbers continue from, and are written back to, ``sequenceDict`` so that
    numbering stays continuous across successive calls for the same subject
    and domain.

    Args:
        df (DataFrame): Records to number. Not modified; a sorted copy is
            returned.
        seq_field (str): Name of the sequence column to fill. If ``df`` does
            not already contain it, ``df`` is returned untouched.
        sort_keys (list): Column names to sort by. An 'EPOCH' key is sorted
            through the numeric helper column built by ``prepare_epoch_sort``.
        domain_key (str): Key of the counter inside each subject's entry.
        sequenceDict (dict): ``{usubjid: {domain_key: next_number}}``;
            updated in place. Every subject in ``df`` must already have an
            entry for ``domain_key``.

    Returns:
        DataFrame: ``df`` itself when it is empty or lacks ``seq_field``;
        otherwise a sorted copy whose ``seq_field`` holds the numbers as
        strings.

    Raises:
        KeyError: If a subject or ``domain_key`` is missing from
            ``sequenceDict``.
    """
    if df.empty or seq_field not in df.columns:
        return df

    # Work on a copy: prepare_epoch_sort adds its helper column in place.
    sort_df = df.copy()
    actual_sort_keys, epoch_col = prepare_epoch_sort(sort_df, sort_keys)

    # mergesort is stable, so records with equal keys keep their input order.
    sort_df = sort_df.sort_values(actual_sort_keys, kind='mergesort')

    # Only the subject column is needed, so iterate it directly instead of
    # materialising a full row Series per record with iterrows(). The counter
    # is read from and written back to sequenceDict for every record, which
    # gives the same numbering as tracking the previous subject.
    sequences = []
    for usubjid in sort_df[VARIABLE_USUBJID].tolist():
        subject_counters = sequenceDict[usubjid]
        next_seq = subject_counters[domain_key]
        sequences.append(str(next_seq))
        subject_counters[domain_key] = next_seq + 1

    sort_df[seq_field] = sequences

    # Drop the temporary EPOCH helper column, if one was created.
    if epoch_col and epoch_col in sort_df.columns:
        sort_df = sort_df.drop(epoch_col, axis=1)

    return sort_df
def vectorized_field_mapping(result_df, be_converted_df, standard_field, field_rule, cycle_idx, codeDict, definition_row_num=None, error_callback=None):
    """
    Fill one standard field for one cycle, dispatching on the rule's operation type.

    The handler returned by ``get_opertype_function`` is used when there is
    one. Otherwise, for a non-empty operation type, each record is passed to
    ``specialType`` individually. Exceptions raised during the mapping are
    printed and reported through ``error_callback`` instead of propagating.

    Args:
        result_df (DataFrame): Target frame; the fallback path writes into it
            in place.
        be_converted_df (DataFrame): Source frame, read by position in step
            with ``result_df``.
        standard_field (str): Target column being filled.
        field_rule (dict): Rule with the keys 'fieldname_cycles',
            'parameter_cycles' and 'opertype'.
        cycle_idx (int): Index into the per-cycle lists; an index past the
            end yields no source columns / an empty parameter.
        codeDict (dict): Code dictionary forwarded to the handlers.
        definition_row_num (int, optional): Definition row number, used only
            in the printed diagnostics.
        error_callback (callable, optional): Called with keyword arguments
            (``message``, ``stage``, ``field`` and, for some errors,
            ``detail``) when a problem is reported.

    Returns:
        tuple: ``(result_df, continue_flags)`` where ``continue_flags`` is a
        boolean array with one entry per row of ``result_df``.
    """
    fieldname_cycles = field_rule['fieldname_cycles']
    parameter_cycles = field_rule['parameter_cycles']
    fieldname_cycle = fieldname_cycles[cycle_idx] if cycle_idx < len(fieldname_cycles) else []
    parameter_cycle = parameter_cycles[cycle_idx] if cycle_idx < len(parameter_cycles) else ""
    opertype = field_rule['opertype']
    continue_flags = np.zeros(len(result_df), dtype=bool)

    # Only the DEF operation can run without any source column.
    if not fieldname_cycle and opertype != OPERTYPE_DEF:
        return result_df, continue_flags

    try:
        opertype_func = get_opertype_function(opertype)
        if opertype_func:
            # Regular operation types: one call handles the whole frame.
            result_df, continue_flags = opertype_func(
                result_df=result_df,
                be_converted_df=be_converted_df,
                standard_field=standard_field,
                fieldname_cycle=fieldname_cycle,
                parameter_cycle=parameter_cycle,
                codeDict=codeDict
            )
        elif opertype:
            # Unregistered operation types: fall back to specialType, record
            # by record.
            try:
                # The callback is invoked at most once per call, however many
                # records fail.
                special_error_logged = False
                for idx in range(len(be_converted_df)):
                    be_converted_row = be_converted_df.iloc[idx]
                    # Build the row Series once; indexing result_df.iloc[idx]
                    # inside the comprehension would rebuild it per column.
                    result_row = result_df.iloc[idx]
                    domain_row = {col: result_row[col] for col in result_df.columns}
                    row_continue_flg = False
                    try:
                        domain_row, row_continue_flg = specialType(  # type: ignore
                            domain_row, standard_field, opertype, parameter_cycle,
                            be_converted_row, fieldname_cycle, codeDict, False
                        )
                        result_df.iloc[idx, result_df.columns.get_loc(standard_field)] = domain_row[standard_field]
                        if row_continue_flg:
                            continue_flags[idx] = True
                    except NameError:
                        # Treated as "specialType is not defined": no record
                        # can be processed, so stop after the first one.
                        warn_message = f"警告: 特殊操作类型 '{opertype}' 无法处理,specialType函数未定义"
                        print(warn_message)
                        if definition_row_num:
                            print(f"警告发生在Excel的第 {definition_row_num} 行")
                        if error_callback and not special_error_logged:
                            error_callback(message=warn_message, stage='特殊操作类型', field=standard_field)
                            special_error_logged = True
                        break
                    except Exception as e:
                        # A failure on one record does not stop the others.
                        warn_message = f"警告: 特殊操作类型 '{opertype}' 处理失败: {str(e)}"
                        print(warn_message)
                        if definition_row_num:
                            print(f"警告发生在Excel的第 {definition_row_num} 行")
                        if error_callback and not special_error_logged:
                            error_callback(message=warn_message, stage='特殊操作类型', field=standard_field, detail=str(e))
                            special_error_logged = True
                        continue
            except Exception as e:
                # Failures outside the per-record call (e.g. positional
                # access into the frames) abort the fallback loop.
                err_message = f"处理特殊操作类型时发生错误: {str(e)}"
                print(err_message)
                if definition_row_num:
                    print(f"错误发生在Excel的第 {definition_row_num} 行")
                if error_callback:
                    error_callback(message=err_message, stage='特殊操作类型', field=standard_field, detail=str(e))
    except KeyError as e:
        # KeyError gets a more detailed diagnostic than other exceptions.
        print(f'KeyError: 字段 {standard_field} 处理出错')
        print(f'KeyError: 操作类型 {opertype} 处理出错')
        print(f'KeyError: 参数 {parameter_cycle} 处理出错')
        print(f'KeyError: 详细错误信息: {str(e)}')
        if definition_row_num:
            print(f'错误发生在Excel的第 {definition_row_num} 行')
        if error_callback:
            error_callback(
                message=f"字段 {standard_field} 映射发生 KeyError: {str(e)}",
                stage='字段映射',
                field=standard_field,
                detail=f"操作类型 {opertype}, 参数 {parameter_cycle}"
            )
    except Exception as e:
        err_message = f"字段映射处理时发生错误: {str(e)}"
        print(err_message)
        print(f"处理字段: {standard_field}, 操作类型: {opertype}, 参数: {parameter_cycle}")
        if definition_row_num:
            print(f"错误发生在Excel的第 {definition_row_num} 行")
        if error_callback:
            error_callback(
                message=err_message,
                stage='字段映射',
                field=standard_field,
                detail=f"操作类型 {opertype}, 参数 {parameter_cycle}"
            )

    return result_df, continue_flags