-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathVC_OP03_insertMetadata.py
More file actions
321 lines (269 loc) · 12.6 KB
/
Copy pathVC_OP03_insertMetadata.py
File metadata and controls
321 lines (269 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""
VAPORCONE 项目元数据插入模块
该模块负责将处理后的数据作为元数据插入到数据库中,包括:
- 读取清洗后的数据文件
- 判断字段类型(日期类型等)
- 格式化字段值
- 插入元数据表
"""
from VC_BC03_fetchConfig import *
import time # 性能计时
STEP_ID = 'OP03'
STEP_NAME = 'InsertMetadata'
def main():
"""
主函数,执行元数据插入流程
"""
logger = create_logger(
os.path.join(SPECIFIC_PATH, 'log_file.log'),
log_level=logging.DEBUG
)
# 获取配置信息
workbook = load_workbook(filename=os.path.join(SPECIFIC_PATH, CONFIG_NAME))
sheetSetting = getSheetSetting(workbook)
# 获取相关字典和配置
caseDict = getCaseDict(workbook, sheetSetting)
fileDict = getFileDict(workbook, sheetSetting)
# 注意:已移除4OTHER功能,不再需要codeDict4other
codeDict, _ = getCodeListInfo(workbook, sheetSetting)
_, transFieldDict, _, _ = getProcess(workbook, sheetSetting)
# 根据mapping定义的SDTM字段判断是否为日期型
# SDTM字段以DTC结尾则说明mapping至该字段的原文件字段一定为日期型,否则则一定不是
dateTypeDict = {}
mappingDict, _ = getMapping(workbook, sheetSetting)
for domain_val in mappingDict.values():
for row_param in domain_val.values():
for variable in row_param.keys():
if variable in TIME_VARIABLE or row_param[variable]['SUPPTIMEFLG']:
rFieldName = row_param[variable][COL_FIELDNAME]
if re.match(PATTERN_CYCLE_PRA, rFieldName):
rFieldName = re.sub(PATTERN_CYCLE_PRA, r"\1", rFieldName)
fieldname_list = rFieldName.split(MARK_DOLLAR)
for fieldname in fieldname_list:
dateTypeDict[fieldname] = True
# 总体计时开始
t_total_start = time.perf_counter()
db = DatabaseManager()
db.connect()
try:
# 创建正式表
db.create_metadata_table(METADATA_TABLE_NAME)
# 创建暂存表(使用AUTO_INCREMENT主键,无二级索引)
staging_table_name = f"{METADATA_TABLE_NAME}_STAGING"
staging_sql = f"""
CREATE TABLE IF NOT EXISTS {staging_table_name} (
No INT AUTO_INCREMENT PRIMARY KEY,
FILENAME VARCHAR(100),
ROWNUM INT,
USUBJID VARCHAR(50),
SUBJID VARCHAR(50),
FIELDLBL VARCHAR(200),
FIELDID VARCHAR(100),
METAVAL TEXT,
FORMVAL TEXT,
DATETYPE BOOLEAN,
CODELISTID VARCHAR(100),
CHKFIELDID VARCHAR(100)
) ENGINE=InnoDB
"""
db.cursor.execute(f"DROP TABLE IF EXISTS {staging_table_name}")
db.cursor.execute(staging_sql)
# 暂存表已创建
data = []
# 统计与计时变量
total_files_processed = 0
total_records_to_insert = 0
file_summary = [] # 按文件统计
build_start = time.perf_counter()
# 🆕 动态获取最新的清洗数据文件夹路径
actual_cleaning_path = find_latest_timestamped_path(CLEANINGSTEP_PATH, 'cleaning_dataset')
# 使用最新的清洗数据路径
# 获取清洗后的数据文件列表
all_files = os.listdir(actual_cleaning_path)
files_only = [
file for file in all_files
if os.path.isfile(os.path.join(actual_cleaning_path, file))
]
eligible_files = [f for f in fileDict.keys() if f in transFieldDict]
progress_build = ProgressReporter(total=len(eligible_files), desc='Build')
for fileName in fileDict.keys():
if fileName not in transFieldDict:
continue
# 查找对应的清洗文件
full_name = next((
file_name for file_name in files_only
if f'C-{fileName}{EXTENSION}' == file_name
), None)
if not full_name:
full_name = next((
file_name for file_name in files_only
if f'C-{fileName}{EXTENSION}' in file_name
), None)
if not full_name:
print(f'{fileName}{EXTENSION} is undefined')
sys.exit()
subjectId_fieldID = fileDict[fileName][COL_SUBJIDFIELDID]
file_param = transFieldDict[fileName]
file_record_count = 0
with open(os.path.join(actual_cleaning_path, full_name), 'r', newline=MARK_BLANK, encoding='utf-8-sig') as read_file:
dict_result = csv.DictReader(read_file)
tROWNUM = 0
for row in dict_result:
tROWNUM += 1
tSUBJID = row[subjectId_fieldID]
tUSUBJID = caseDict[tSUBJID]
for tFIELDID, field_param in file_param.items():
if tFIELDID == subjectId_fieldID:
continue
if tFIELDID not in row:
continue
tMETAVAL = row[tFIELDID].strip()
if not tMETAVAL:
continue
tFIELDLBL = field_param[COL_LABEL]
tCODELISTID = field_param[COL_CODELISTNAME]
tCHKFIELDID = field_param[COL_CHKTYPE]
tDATETYPE = dateTypeDict[tFIELDID] if tFIELDID in dateTypeDict else False
tFORMVAL = make_format_value(tMETAVAL, tDATETYPE)
data.append((fileName, tROWNUM, tUSUBJID, tSUBJID, tFIELDLBL, tFIELDID, tMETAVAL, tFORMVAL, tDATETYPE, tCODELISTID, tCHKFIELDID))
file_record_count += 1
file_summary.append({
'file': fileName, 'rows': tROWNUM,
'fields': len(file_param), 'records': file_record_count,
})
total_files_processed += 1
progress_build.update()
progress_build.finish()
build_elapsed = time.perf_counter() - build_start
total_records_to_insert = len(data)
# 生成CSV文件用于LOAD DATA
csv_file_path = os.path.join(SPECIFIC_PATH, f"{METADATA_TABLE_NAME}_staging.csv")
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
# 不写入表头,直接写入数据行(对应暂存表除了AUTO_INCREMENT主键外的所有列)
for row in data:
# 转换DATETYPE为1/0(MySQL BOOLEAN)
row_list = list(row)
row_list[8] = 1 if row_list[8] else 0 # DATETYPE列
csv_writer.writerow(row_list)
# CSV文件生成完成
# 使用LOAD DATA进行批量导入
load_data_start = time.perf_counter()
# 性能优化设置(导入窗口内临时使用)
# 设置性能优化参数
optimization_settings = [
"SET SESSION foreign_key_checks = 0",
"SET SESSION unique_checks = 0",
"SET SESSION sql_log_bin = 0", # 如果不需要复制到从库
]
for setting in optimization_settings:
try:
db.cursor.execute(setting)
except Exception as e:
print(f"⚠ 设置跳过: {setting}, 错误: {e}")
# 执行LOAD DATA INFILE
# 使用绝对路径并处理Windows路径分隔符
csv_file_path_normalized = csv_file_path.replace('\\', '/')
load_data_sql = f"""
LOAD DATA LOCAL INFILE '{csv_file_path_normalized}'
INTO TABLE {staging_table_name}
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\\n'
(FILENAME, ROWNUM, USUBJID, SUBJID, FIELDLBL, FIELDID, METAVAL, FORMVAL, DATETYPE, CODELISTID, CHKFIELDID)
"""
db.cursor.execute(load_data_sql)
db.connection.commit()
# 检查导入结果
db.cursor.execute(f"SELECT COUNT(*) FROM {staging_table_name}")
imported_count = db.cursor.fetchone()[0]
load_data_elapsed = time.perf_counter() - load_data_start
print(f"LOAD DATA完成: {imported_count} 条记录, 耗时: {load_data_elapsed:.3f}s")
# 从暂存表转移数据到正式表(保持原有表结构和索引)
transfer_start = time.perf_counter()
print("从暂存表转移数据到正式表...")
# 清空正式表(如果需要)
db.cursor.execute(f"TRUNCATE TABLE {METADATA_TABLE_NAME}")
# 使用INSERT INTO ... SELECT 转移数据,手动分配顺序主键No
# 分批转移数据以显示进度(同时手动分配No)
db.cursor.execute(f"SELECT COUNT(*) FROM {staging_table_name}")
total_staging_records = db.cursor.fetchone()[0]
batch_size = 50000
current_no = 1
total_batches = (total_staging_records + batch_size - 1) // batch_size
progress_transfer = ProgressReporter(total=total_batches, desc='Transfer')
for offset in range(0, total_staging_records, batch_size):
batch_sql = f"""
INSERT INTO {METADATA_TABLE_NAME}
(No, FILENAME, ROWNUM, USUBJID, SUBJID, FIELDLBL, FIELDID, METAVAL, FORMVAL, DATETYPE, CODELISTID, CHKFIELDID)
SELECT
@row_number := @row_number + 1 as No,
FILENAME, ROWNUM, USUBJID, SUBJID, FIELDLBL, FIELDID, METAVAL, FORMVAL, DATETYPE, CODELISTID, CHKFIELDID
FROM (SELECT @row_number := {current_no - 1}) r,
(SELECT * FROM {staging_table_name} ORDER BY No LIMIT {batch_size} OFFSET {offset}) s
"""
db.cursor.execute(batch_sql)
current_no += batch_size
progress_transfer.update()
progress_transfer.finish()
db.connection.commit()
# 验证转移结果
db.cursor.execute(f"SELECT COUNT(*) FROM {METADATA_TABLE_NAME}")
final_count = db.cursor.fetchone()[0]
transfer_elapsed = time.perf_counter() - transfer_start
print(f"数据转移完成: {final_count} 条记录, 耗时: {transfer_elapsed:.3f}s")
# 清理暂存表和CSV文件
try:
db.cursor.execute(f"DROP TABLE {staging_table_name}")
os.remove(csv_file_path)
except Exception as e:
print(f"⚠ 清理时出现警告: {e}")
# 恢复优化设置
restore_settings = [
"SET SESSION foreign_key_checks = 1",
"SET SESSION unique_checks = 1",
"SET SESSION sql_log_bin = 1",
]
for setting in restore_settings:
try:
db.cursor.execute(setting)
except Exception as e:
print(f"⚠ 恢复设置跳过: {setting}, 错误: {e}")
# 处理摘要
t_total_elapsed = time.perf_counter() - t_total_start
overall_rps = (total_records_to_insert / t_total_elapsed) if t_total_elapsed > 0 else 0.0
TW = [14, 8, 8, 10]
tcols = ['文件', '数据行', '字段数', '元数据条数']
print_summary_header(f'处理摘要 - {STEP_NAME}')
print(
cjk_ljust(tcols[0], TW[0]) + ' '
+ ' '.join(cjk_rjust(c, w) for c, w in zip(tcols[1:], TW[1:]))
)
print_summary_sep()
for s in file_summary:
print(
f'{s["file"]:<{TW[0]}} {s["rows"]:>{TW[1]}} {s["fields"]:>{TW[2]}} '
f'{s["records"]:>{TW[3]}}'
)
print_summary_sep()
print_summary_kv('处理文件数', total_files_processed)
print_summary_kv('准备插入记录数', total_records_to_insert)
print_summary_kv('实际插入记录数', final_count)
print_summary_kv('总耗时', f'{t_total_elapsed:.3f}s')
print_summary_kv('总体吞吐', f'{overall_rps:.1f} rec/s')
logger.info(
f'Metadata插入完成: 文件={total_files_processed}, '
f'记录={final_count}, 耗时={t_total_elapsed:.3f}s'
)
except Exception as e:
log_and_print(logger, 'ERROR', f'Metadata插入失败: {e}')
traceback.print_exc()
finally:
if db.cursor:
db.cursor.close()
if db.connection and db.connection.is_connected():
db.disconnect()
if __name__ == '__main__':
print_step_header(STEP_ID, STEP_NAME)
main()
print_step_footer(STEP_ID, STEP_NAME)