Skip to content

Commit 979556f

Browse files
authored
fix: hide sensitive data (#490)
* 🔒 hide privacy message in task log * 🔒 mask sensitive data * 🐛 could cause task updates to fail
1 parent 1eec08c commit 979556f

5 files changed

Lines changed: 212 additions & 23 deletions

File tree

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,3 +195,10 @@ Thumbs.db
195195
# Environment files - ignore local .env, but allow templates
196196
.env
197197
!.env.example
198+
199+
# AI tools/agent
200+
.agents/
201+
.claude/
202+
data/
203+
openspec/
204+
devspace.yaml
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
"""Security utilities for handling sensitive data"""
2+
import re
3+
from typing import Any, Dict
4+
5+
6+
# Key-name fragments that mark a configuration entry as secret.
# The dict helpers in this module compare them case-insensitively against
# keys (substring match); the text helper looks for them as literal keys.
SENSITIVE_FIELDS = [
    "secretKey",
    "accessKey",
    "password",
    "passwd",
    "pwd",
    "secret",
    "token",
    "apiKey",
    "api_key",
    "access_key",
    "secret_key",
]

# Placeholder written in place of every masked value; also used to recognise
# values that were already masked.
MASK_PATTERN = "**********"
21+
22+
23+
def mask_sensitive_value(value: str) -> str:
    """Return the mask placeholder that stands in for a sensitive value.

    The argument is not inspected: every input maps to ``MASK_PATTERN``.

    Args:
        value: The sensitive value to hide (unused).

    Returns:
        The module-level ``MASK_PATTERN`` string.
    """
    placeholder = MASK_PATTERN
    return placeholder
26+
27+
28+
def is_masked_value(value: str) -> bool:
    """Tell whether ``value`` is exactly the mask placeholder.

    Args:
        value: The value to test.

    Returns:
        True when ``value`` equals ``MASK_PATTERN``, False otherwise.
    """
    already_masked = value == MASK_PATTERN
    return already_masked
31+
32+
33+
def preserve_sensitive_values(
    new_config: Dict[str, Any],
    original_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Restore original sensitive values where an update request carries the mask.

    When a client receives masked values and sends them back unchanged in an
    update request, this function detects the mask placeholder under sensitive
    keys and substitutes the value stored in ``original_config``.

    Args:
        new_config: Config from the update request (may contain masked values)
        original_config: Previously stored config (contains the real values)

    Returns:
        A new dict with the same keys as ``new_config``; masked sensitive
        strings are replaced by their original value when one exists.
        If either argument is not a dict, ``new_config`` is returned unchanged.
    """
    if not isinstance(new_config, dict) or not isinstance(original_config, dict):
        return new_config

    # Lowercase the markers once instead of once per key.
    sensitive_markers = [field.lower() for field in SENSITIVE_FIELDS]

    preserved_config = {}
    for key, new_value in new_config.items():
        original_value = original_config.get(key)

        if isinstance(new_value, str):
            # Non-string keys cannot name a sensitive field; never call
            # .lower() on them.
            key_lower = key.lower() if isinstance(key, str) else ""
            is_sensitive = any(marker in key_lower for marker in sensitive_markers)
            # Restore whenever an original exists, even an empty string:
            # otherwise the placeholder itself would be stored as the secret.
            if is_sensitive and is_masked_value(new_value) and original_value is not None:
                preserved_config[key] = original_value
            else:
                preserved_config[key] = new_value
        elif isinstance(new_value, dict):
            # A missing or non-dict original means there is nothing to restore.
            original_nested = original_value if isinstance(original_value, dict) else {}
            preserved_config[key] = preserve_sensitive_values(new_value, original_nested)
        elif isinstance(new_value, list):
            preserved_config[key] = _preserve_list_items(new_value, original_value)
        else:
            # Keep non-string scalars untouched.
            preserved_config[key] = new_value

    return preserved_config


def _preserve_list_items(new_items: list, original_items: Any) -> list:
    """Restore masked values in list items, pairing them with originals by index."""
    # The stored value may be null or a different type; then nothing can be
    # restored and the new items are kept as they are.
    if not isinstance(original_items, list):
        original_items = []

    preserved_items = []
    for index, new_item in enumerate(new_items):
        original_item = original_items[index] if index < len(original_items) else None
        if isinstance(new_item, dict) and isinstance(original_item, dict):
            preserved_items.append(preserve_sensitive_values(new_item, original_item))
        elif isinstance(new_item, list) and isinstance(original_item, list):
            preserved_items.append(_preserve_list_items(new_item, original_item))
        else:
            preserved_items.append(new_item)
    return preserved_items
89+
90+
91+
def mask_sensitive_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Recursively mask sensitive fields in a dictionary.

    A key is sensitive when any entry of ``SENSITIVE_FIELDS`` occurs in it,
    compared case-insensitively. Only string values under sensitive keys are
    replaced; nested dicts and lists (including lists nested in lists) are
    walked so that dictionaries at any depth are masked too.

    Args:
        data: Dictionary that may contain sensitive fields

    Returns:
        A new dictionary with sensitive string values replaced by
        ``MASK_PATTERN``. A non-dict argument is returned unchanged.
    """
    if not isinstance(data, dict):
        return data

    # Lowercase the markers once instead of once per key.
    sensitive_markers = [field.lower() for field in SENSITIVE_FIELDS]

    def mask_items(items: list) -> list:
        """Mask dictionaries found at any depth inside a list."""
        masked_items = []
        for item in items:
            if isinstance(item, dict):
                masked_items.append(mask_sensitive_dict(item))
            elif isinstance(item, list):
                # Lists of lists can still hold dicts with secrets.
                masked_items.append(mask_items(item))
            else:
                masked_items.append(item)
        return masked_items

    masked_data = {}
    for key, value in data.items():
        if isinstance(value, str):
            # Non-string keys cannot name a sensitive field; never call
            # .lower() on them.
            key_lower = key.lower() if isinstance(key, str) else ""
            is_sensitive = any(marker in key_lower for marker in sensitive_markers)
            masked_data[key] = MASK_PATTERN if is_sensitive else value
        elif isinstance(value, dict):
            masked_data[key] = mask_sensitive_dict(value)
        elif isinstance(value, list):
            masked_data[key] = mask_items(value)
        else:
            # Non-string scalars are kept as-is, even under sensitive keys.
            masked_data[key] = value

    return masked_data
130+
131+
132+
def mask_sensitive_info(text: str) -> str:
    """
    Mask sensitive information in free text by replacing values with the mask.

    For every entry of ``SENSITIVE_FIELDS`` three shapes are recognised,
    case-insensitively:

    * ``"field": "value"``  (double-quoted key and value)
    * ``'field': 'value'``  (single-quoted key and value)
    * ``field=value``       (value is a quoted string or a run of characters
      up to whitespace, ``,`` or ``]``)

    The matched key, its original casing and the separator are kept; only the
    value is replaced. Quoted values may contain backslash-escaped quotes.

    Args:
        text: Original text that may contain sensitive information

    Returns:
        Text with sensitive values masked. Empty or ``None`` input is
        returned unchanged.
    """
    if not text:
        return text

    # Quoted strings that tolerate backslash escapes, so an escaped quote
    # inside a secret does not end the match early and leak the remainder.
    double_quoted = r'"(?:[^"\\]|\\.)*"'
    single_quoted = r"'(?:[^'\\]|\\.)*'"
    bare_value = r"[^\s,\]]+"

    masked_text = text
    for field in SENSITIVE_FIELDS:
        name = re.escape(field)
        # Each entry: (pattern with the key part captured in group 1,
        #              text that replaces the value part).
        substitutions = (
            (rf'("{name}"\s*:\s*){double_quoted}', f'"{MASK_PATTERN}"'),
            # Try quoted values first so `key="a b"` is masked as a whole.
            (
                rf'({name}\s*=\s*)(?:{double_quoted}|{single_quoted}|{bare_value})',
                MASK_PATTERN,
            ),
            (rf"('{name}'\s*:\s*){single_quoted}", f"'{MASK_PATTERN}'"),
        )

        for pattern, masked_value in substitutions:
            # A callable replacement avoids any backslash/group interpretation
            # of the mask text and keeps the key exactly as it was written.
            masked_text = re.sub(
                pattern,
                lambda match, masked_value=masked_value: match.group(1) + masked_value,
                masked_text,
                flags=re.IGNORECASE,
            )

    return masked_text

runtime/datamate-python/app/module/collection/client/datax_client.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Dict, Any
77

88
from app.core.logging import get_logger
9+
from app.core.security import mask_sensitive_info
910
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
1011
from app.module.collection.schema.collection import CollectionConfig, SyncMode
1112
from app.module.shared.schema import TaskStatus
@@ -118,25 +119,19 @@ def run_datax_job(self):
118119
Returns:
119120
执行结果字典
120121
"""
121-
# 创建配置文件
122-
self.create__config_file()
122+
if not self.execution.started_at:
123+
self.execution.started_at = datetime.now()
124+
123125
try:
124-
# 构建命令
126+
self.create__config_file()
125127
cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)]
126128
cmd_str = ' '.join(cmd)
127129
logger.info(f"执行命令: {cmd_str}")
128-
if not self.execution.started_at:
129-
self.execution.started_at = datetime.now()
130-
# 执行命令并写入日志
131130
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
132-
# 写入头信息
133131
self.write_header_log(cmd_str, log_f)
134-
# 启动datax进程
135132
exit_code = self._run_process(cmd, log_f)
136-
# 记录结束时间
137133
self.execution.completed_at = datetime.now()
138134
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
139-
# 写入结束信息
140135
self.write_tail_log(exit_code, log_f)
141136
if exit_code == 0:
142137
logger.info(f"DataX 任务执行成功: {self.execution.id}")
@@ -149,17 +144,17 @@ def run_datax_job(self):
149144
logger.error(self.execution.error_message)
150145
except Exception as e:
151146
self.execution.completed_at = datetime.now()
152-
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
147+
if self.execution.started_at:
148+
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
153149
self.execution.error_message = f"执行异常: {e}"
154150
self.execution.status = TaskStatus.FAILED.name
155151
logger.error(f"执行异常: {e}", exc_info=True)
152+
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
153+
log_f.write(f"任务执行失败: {e}\n")
156154

157-
# 根据同步模式更新任务状态
158155
if self.task.sync_mode == SyncMode.ONCE:
159-
# 一次性任务:使用执行结果作为最终状态
160156
self.task.status = self.execution.status
161157
else:
162-
# 定时任务:恢复为 PENDING 状态,等待下次执行
163158
self.task.status = TaskStatus.PENDING.name
164159

165160
def rename_collection_result(self):
@@ -229,10 +224,12 @@ def write_header_log(self, cmd: str, log_f):
229224

230225
@staticmethod
231226
def read_stream(stream, log_f):
232-
"""读取输出流"""
227+
"""读取输出流并屏蔽敏感信息"""
233228
for line in stream:
234229
line = line.rstrip('\n')
235230
if line:
231+
# Mask sensitive information before writing to log
232+
masked_line = mask_sensitive_info(line)
236233
# 写入日志文件
237-
log_f.write(f"{line}\n")
234+
log_f.write(f"{masked_line}\n")
238235
log_f.flush()

runtime/datamate-python/app/module/collection/interface/collection.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212

1313
from app.core.exception import ErrorCodes, BusinessError, SuccessResponse, transaction
1414
from app.core.logging import get_logger
15+
from app.core.security import preserve_sensitive_values
1516
from app.db.models import Dataset, DatasetFiles
1617
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
1718
from app.db.session import get_db
1819
from app.module.collection.client.datax_client import DataxClient
19-
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, converter_to_response, \
20+
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, CollectionConfig, converter_to_response, \
2021
convert_for_create, SyncMode
2122
from app.module.collection.schedule import schedule_collection_task, remove_collection_task
2223
from app.module.collection.service.collection import CollectionTaskService
@@ -300,12 +301,28 @@ async def update_task(
300301
reschedule_collection_task(task_id, task.schedule_expression)
301302

302303
if 'config' in update_data:
303-
# 重新生成任务配置文件
304+
# Get original config from database
305+
original_config = json.loads(task.config) if task.config else {}
306+
307+
# Preserve sensitive values if masked pattern detected
308+
preserved_config = preserve_sensitive_values(
309+
request.config.dict(),
310+
original_config
311+
)
312+
313+
# Regenerate task config file with preserved sensitive values
304314
template = await db.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id))
305315
template = template.scalar_one_or_none()
306316
if template:
307-
DataxClient.generate_datx_config(request.config, template, task.target_path)
308-
task.config = json.dumps(request.config.dict())
317+
# Use preserved config to generate DataX config
318+
config_obj = CollectionConfig(**preserved_config)
319+
DataxClient.generate_datx_config(
320+
config_obj,
321+
template,
322+
task.target_path
323+
)
324+
# Save the modified config (with regenerated job) to database
325+
task.config = json.dumps(config_obj.model_dump())
309326

310327
# 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行
311328
if task.status == TaskStatus.FAILED.name:

runtime/datamate-python/app/module/collection/schema/collection.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import uuid
33
from datetime import datetime
44
from enum import Enum
5-
from typing import Optional
5+
from typing import Optional, Dict, Any
66

7-
from pydantic import BaseModel, Field, validator, ConfigDict, field_validator
7+
from pydantic import BaseModel, Field, validator, ConfigDict, field_validator, field_serializer
88
from pydantic.alias_generators import to_camel
99

10+
from app.core.security import mask_sensitive_dict
1011
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
1112
from app.module.dataset.schema import DatasetTypeResponse
1213
from app.module.dataset.schema.dataset import DatasetType
@@ -23,6 +24,13 @@ class CollectionConfig(BaseModel):
2324
writer: Optional[dict] = Field(None, description="writer参数")
2425
job: Optional[dict] = Field(None, description="任务配置")
2526

27+
@field_serializer('parameter', 'reader', 'writer', 'job', when_used='json')
def mask_sensitive_fields(self, value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Hide sensitive entries of the config sections during JSON serialization.

    Registered for the ``parameter``, ``reader``, ``writer`` and ``job``
    fields with ``when_used='json'``. An unset section (``None``) is passed
    through; any other value is handed to ``mask_sensitive_dict``.
    """
    return None if value is None else mask_sensitive_dict(value)
33+
2634
class CollectionTaskBase(BaseModel):
2735
id: str = Field(..., description="任务id")
2836
name: str = Field(..., description="任务名称")
@@ -93,6 +101,7 @@ def validate_timeout(cls, v):
93101
)
94102

95103
def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
104+
config_dict = json.loads(task.config)
96105
return CollectionTaskBase(
97106
id=task.id,
98107
name=task.name,
@@ -101,7 +110,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
101110
template_id=task.template_id,
102111
template_name=task.template_name,
103112
target_path=task.target_path,
104-
config=json.loads(task.config),
113+
config=CollectionConfig(**config_dict),
105114
schedule_expression=task.schedule_expression,
106115
status=task.status,
107116
retry_count=task.retry_count,

0 commit comments

Comments
 (0)