Skip to content

Commit 1875918

Browse files
authored
feat: use hard file link instead of copying file (#433)
1 parent bca3b07 commit 1875918

File tree

3 files changed

+118
-9
lines changed

3 files changed

+118
-9
lines changed

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,10 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
285285
if (CollectionUtils.isEmpty(filePaths)) {
286286
return;
287287
}
288-
datasetFileApplicationService.addFilesToDataset(datasetId, new AddFilesRequest(filePaths));
288+
// 创建请求并启用硬链接(softAdd=true)
289+
AddFilesRequest addFilesRequest = new AddFilesRequest(filePaths);
290+
addFilesRequest.setSoftAdd(true);
291+
datasetFileApplicationService.addFilesToDataset(datasetId, addFilesRequest);
289292
log.info("Success file scan, total files: {}", filePaths.size());
290293
} catch (Exception e) {
291294
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);

runtime/datamate-python/app/module/collection/interface/collection.py

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import uuid
44
import shutil
55
import os
6+
import asyncio
67
from typing import Optional
78

89
from fastapi import APIRouter, Depends, Query
@@ -11,7 +12,7 @@
1112

1213
from app.core.exception import ErrorCodes, BusinessError, SuccessResponse, transaction
1314
from app.core.logging import get_logger
14-
from app.db.models import Dataset
15+
from app.db.models import Dataset, DatasetFiles
1516
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
1617
from app.db.session import get_db
1718
from app.module.collection.client.datax_client import DataxClient
@@ -28,6 +29,92 @@
2829
logger = get_logger(__name__)
2930

3031

32+
async def is_hard_link(file_path: str) -> bool:
    """Return True when ``file_path`` is a regular file with more than one hard link.

    The stat call runs in a worker thread so the event loop is not blocked.
    ``os.stat`` follows symbolic links, so a symlink is judged by its target.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True if the path resolves to a regular file whose link count is
        greater than one; False otherwise, including when the path cannot
        be stat'ed (missing file, permission error, ...).
    """
    # Local import keeps this block self-contained; ``stat`` is stdlib.
    import stat

    try:
        stat_info = await asyncio.to_thread(os.stat, file_path)
    except OSError:
        return False

    # Directories commonly report st_nlink >= 2 ("." plus the parent entry),
    # so the link count alone is only meaningful for regular files.
    return stat.S_ISREG(stat_info.st_mode) and stat_info.st_nlink > 1
40+
41+
42+
async def convert_hardlink_to_real_file(file_path: str) -> bool:
    """Turn a hard-linked file into an independent copy at the same path.

    The content (and metadata, via ``shutil.copy2``) is copied to a uniquely
    named temporary file next to the original, which is then moved over
    ``file_path`` with ``os.replace``. Other links to the old inode are left
    untouched.

    Args:
        file_path: Path of the hard-linked file to materialise.

    Returns:
        True when the path now refers to a fresh, independent file; False if
        any filesystem operation failed (the failure is logged and the
        temporary file is cleaned up on a best-effort basis).
    """
    # A unique suffix avoids clobbering an unrelated "<file>.tmp" and keeps
    # concurrent conversions of the same path from sharing a temp file.
    temp_path = f"{file_path}.{uuid.uuid4().hex}.tmp"
    try:
        # Copy first, so the original stays in place until the copy is complete.
        await asyncio.to_thread(shutil.copy2, file_path, temp_path)
        # os.replace overwrites the destination in one step; unlinking the
        # original beforehand would leave a window with no file at file_path
        # if the rename then failed.
        await asyncio.to_thread(os.replace, temp_path, file_path)
        return True
    except OSError as e:
        logger.warning(f"Failed to convert hard link to real file {file_path}: {e}")
        # Best-effort cleanup of the temporary copy.
        try:
            await asyncio.to_thread(os.remove, temp_path)
        except FileNotFoundError:
            # The copy never got far enough to create the temp file.
            pass
        except OSError as cleanup_error:
            logger.warning(f"Failed to remove temporary file {temp_path}: {cleanup_error}")
        return False
66+
67+
68+
async def convert_dataset_hardlinks_before_delete(task_id: str, db: AsyncSession) -> int:
    """Materialise hard-linked dataset files before a collection task is deleted.

    Every ACTIVE ``DatasetFiles`` row whose ``file_path`` contains
    ``/dataset/`` is checked; files that are hard links are rewritten as
    independent copies so they survive removal of the task's source files.

    NOTE(review): the query is not scoped to ``task_id`` (it is only used in
    log messages), so hard-linked files of every dataset are converted, not
    just those originating from this task. Confirm whether that is intended.

    Args:
        task_id: ID of the collection task being deleted.
        db: Database session used to look up dataset files.

    Returns:
        Number of files successfully converted. Returns 0 if an unexpected
        error occurs; the error is logged rather than propagated so the
        caller's delete flow is not interrupted.
    """
    try:
        # Candidate rows: any active dataset file stored under a "/dataset/" path.
        result = await db.execute(
            select(DatasetFiles).where(
                DatasetFiles.file_path.like("%/dataset/%"),
                DatasetFiles.status == "ACTIVE",
            )
        )
        dataset_files = result.scalars().all()

        converted_count = 0
        for dataset_file in dataset_files:
            file_path = dataset_file.file_path
            if not file_path:
                continue

            # Only files that still share an inode with another path need work.
            if not await is_hard_link(file_path):
                continue

            logger.info(f"Converting hard link to real file: {file_path}")
            if await convert_hardlink_to_real_file(file_path):
                converted_count += 1
            else:
                logger.warning(f"Failed to convert hard link: {file_path}")

        if converted_count > 0:
            logger.info(f"Converted {converted_count} hard link(s) to real file(s) for task {task_id}")

        return converted_count
    except Exception as e:
        # Best-effort step: log and report zero conversions instead of
        # aborting the surrounding delete operation.
        logger.error(f"Error converting hard links for task {task_id}: {e}", exc_info=True)
        return 0
116+
117+
31118
@router.post("", response_model=StandardResponse[CollectionTaskBase], operation_id="create_collect_task", tags=["mcp"])
32119
async def create_task(
33120
request: CollectionTaskCreate,
@@ -263,7 +350,10 @@ async def delete_collection_tasks(
263350
# 删除任务
264351
await db.delete(task)
265352

266-
# 事务提交后,删除文件系统和调度
353+
# 事务提交后,先转换硬链接,再删除文件系统和调度
354+
logger.info(f"Converting hard links before deleting task {task_id}")
355+
await convert_dataset_hardlinks_before_delete(task_id, db)
356+
267357
remove_collection_task(task_id)
268358

269359
target_path = f"/dataset/local/{task_id}"

runtime/datamate-python/app/module/dataset/service/service.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,19 @@ async def add_files_to_dataset_subdir(self, dataset_id: str, source_paths: List[
582582
dataset.updated_at = datetime.now()
583583
dataset.status = 'ACTIVE'
584584

585-
# 复制物理文件到目标路径
586-
logger.info(f"copy file {source_path} to {target_path}")
585+
# 创建硬链接(如果跨设备则回退到符号链接)
586+
logger.info(f"creating hard link from {source_path} to {target_path}")
587587
dst_dir = os.path.dirname(target_path)
588588
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
589-
await asyncio.to_thread(shutil.copy2, source_path, target_path)
589+
try:
590+
# Try to create hard link first
591+
await asyncio.to_thread(os.link, source_path, target_path)
592+
logger.info(f"hard link created successfully")
593+
except OSError as e:
594+
# Hard link may fail due to cross-device link error, fall back to symbolic link
595+
logger.warning(f"failed to create hard link from {source_path} to {target_path}: {e}, falling back to symbolic link")
596+
await asyncio.to_thread(os.symlink, source_path, target_path)
597+
logger.info(f"symbolic link created successfully")
590598

591599
await self.db.commit()
592600

@@ -621,11 +629,19 @@ async def handle_dataset_file(self, dataset, existing_files_map: dict[Any, Any],
621629
dataset.size_bytes = dataset.size_bytes + file_record.file_size
622630
dataset.updated_at = datetime.now()
623631
dataset.status = 'ACTIVE'
624-
# Copy file
625-
logger.info(f"copy file {source_path} to {target_path}")
632+
# Create hard link (fallback to symbolic link if cross-device)
633+
logger.info(f"creating hard link from {source_path} to {target_path}")
626634
dst_dir = os.path.dirname(target_path)
627635
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
628-
await asyncio.to_thread(shutil.copy2, source_path, target_path)
636+
try:
637+
# Try to create hard link first
638+
await asyncio.to_thread(os.link, source_path, target_path)
639+
logger.info(f"hard link created successfully")
640+
except OSError as e:
641+
# Hard link may fail due to cross-device link error, fall back to symbolic link
642+
logger.warning(f"failed to create hard link from {source_path} to {target_path}: {e}, falling back to symbolic link")
643+
await asyncio.to_thread(os.symlink, source_path, target_path)
644+
logger.info(f"symbolic link created successfully")
629645
await self.db.commit()
630646

631647
@staticmethod

0 commit comments

Comments
 (0)