Skip to content

Commit 6bb6bdc

Browse files
committed
🐛 could cause task updates to fail
1 parent 7e26aaa commit 6bb6bdc

3 files changed

Lines changed: 26 additions & 28 deletions

File tree

runtime/datamate-python/app/core/security.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,18 @@ def preserve_sensitive_values(
6969
original_nested = original_config.get(key, {})
7070
preserved_config[key] = preserve_sensitive_values(new_value, original_nested)
7171
elif isinstance(new_value, list):
72-
# Process list items (preserve original if matching structure)
7372
original_list = original_config.get(key, [])
74-
preserved_config[key] = [
75-
preserve_sensitive_values(new_item, orig_item)
76-
if isinstance(new_item, dict) and isinstance(orig_item, dict)
77-
else new_item
78-
for new_item, orig_item in zip(new_value, original_list)
79-
] if len(new_value) == len(original_list) else new_value
73+
preserved_list = []
74+
for i, new_item in enumerate(new_value):
75+
if i < len(original_list):
76+
orig_item = original_list[i]
77+
if isinstance(new_item, dict) and isinstance(orig_item, dict):
78+
preserved_list.append(preserve_sensitive_values(new_item, orig_item))
79+
else:
80+
preserved_list.append(new_item)
81+
else:
82+
preserved_list.append(new_item)
83+
preserved_config[key] = preserved_list
8084
else:
8185
# Keep non-sensitive/non-masked values
8286
preserved_config[key] = new_value
@@ -139,18 +143,17 @@ def mask_sensitive_info(text: str) -> str:
139143

140144
for field in SENSITIVE_FIELDS:
141145
patterns = [
142-
rf'"{field}"\s*:\s*"[^"]*"',
143146
rf'"{field}"\s*:\s*"[^"]*"',
144147
rf'{field}\s*=\s*[^\s,\]]+',
145148
rf"'{field}'\s*:\s*'[^']*'",
146149
]
147150

148151
for pattern in patterns:
149152
if '"' in pattern:
150-
masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text)
153+
masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text, flags=re.IGNORECASE)
151154
elif "'" in pattern:
152-
masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text)
155+
masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text, flags=re.IGNORECASE)
153156
else:
154-
masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text)
157+
masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text, flags=re.IGNORECASE)
155158

156159
return masked_text

runtime/datamate-python/app/module/collection/client/datax_client.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,25 +119,19 @@ def run_datax_job(self):
119119
Returns:
120120
执行结果字典
121121
"""
122-
# 创建配置文件
123-
self.create__config_file()
122+
if not self.execution.started_at:
123+
self.execution.started_at = datetime.now()
124+
124125
try:
125-
# 构建命令
126+
self.create__config_file()
126127
cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)]
127128
cmd_str = ' '.join(cmd)
128129
logger.info(f"执行命令: {cmd_str}")
129-
if not self.execution.started_at:
130-
self.execution.started_at = datetime.now()
131-
# 执行命令并写入日志
132130
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
133-
# 写入头信息
134131
self.write_header_log(cmd_str, log_f)
135-
# 启动datax进程
136132
exit_code = self._run_process(cmd, log_f)
137-
# 记录结束时间
138133
self.execution.completed_at = datetime.now()
139134
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
140-
# 写入结束信息
141135
self.write_tail_log(exit_code, log_f)
142136
if exit_code == 0:
143137
logger.info(f"DataX 任务执行成功: {self.execution.id}")
@@ -150,17 +144,17 @@ def run_datax_job(self):
150144
logger.error(self.execution.error_message)
151145
except Exception as e:
152146
self.execution.completed_at = datetime.now()
153-
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
147+
if self.execution.started_at:
148+
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
154149
self.execution.error_message = f"执行异常: {e}"
155150
self.execution.status = TaskStatus.FAILED.name
156151
logger.error(f"执行异常: {e}", exc_info=True)
152+
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
153+
log_f.write(f"任务执行失败: {e}\n")
157154

158-
# 根据同步模式更新任务状态
159155
if self.task.sync_mode == SyncMode.ONCE:
160-
# 一次性任务:使用执行结果作为最终状态
161156
self.task.status = self.execution.status
162157
else:
163-
# 定时任务:恢复为 PENDING 状态,等待下次执行
164158
self.task.status = TaskStatus.PENDING.name
165159

166160
def rename_collection_result(self):

runtime/datamate-python/app/module/collection/interface/collection.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,13 +315,14 @@ async def update_task(
315315
template = template.scalar_one_or_none()
316316
if template:
317317
# Use preserved config to generate DataX config
318+
config_obj = CollectionConfig(**preserved_config)
318319
DataxClient.generate_datx_config(
319-
CollectionConfig(**preserved_config),
320+
config_obj,
320321
template,
321322
task.target_path
322323
)
323-
# Save preserved config to database
324-
task.config = json.dumps(preserved_config)
324+
# Save the modified config (with regenerated job) to database
325+
task.config = json.dumps(config_obj.model_dump())
325326

326327
# 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行
327328
if task.status == TaskStatus.FAILED.name:

0 commit comments

Comments
 (0)