Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ endef
# ========== Build Targets ==========

# Valid build targets
VALID_BUILD_TARGETS := backend database frontend runtime backend-python deer-flow mineru mineru-npu
VALID_BUILD_TARGETS := backend database frontend runtime backend-python deer-flow mineru mineru-npu gateway

# Generic docker build target with service name as parameter
# Automatically prefixes image names with "datamate-" unless it's deer-flow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public void deleteOperator(String id) {
if (operatorRepo.operatorInTemplateOrRunning(id)) {
throw BusinessException.of(OperatorErrorCode.OPERATOR_IN_INSTANCE);
}
if (relationRepo.operatorIsPredefined(id)) {
throw BusinessException.of(OperatorErrorCode.CANT_DELETE_PREDEFINED_OPERATOR);
}
operatorRepo.deleteOperator(id);
relationRepo.deleteByOperatorId(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class OperatorConstant {

public static String CATEGORY_STAR_ID = "51847c24-bba9-11f0-888b-5b143cb738aa";

public static String CATEGORY_PREDEFINED_ID = "96a3b07a-3439-4557-a835-525faad60ca3";

public static Map<String, String> CATEGORY_MAP = new HashMap<>();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface CategoryRelationRepository extends IRepository<CategoryRelation
void batchUpdate(String operatorId, List<String> categories);

void deleteByOperatorId(String operatorId);

boolean operatorIsPredefined(String operatorId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public enum OperatorErrorCode implements ErrorCode {

SETTINGS_PARSE_FAILED("op.0004", "settings字段解析失败"),

OPERATOR_IN_INSTANCE("op.0005", "算子已被编排在模板或未完成的任务中");
OPERATOR_IN_INSTANCE("op.0005", "算子已被编排在模板或未完成的任务中"),

CANT_DELETE_PREDEFINED_OPERATOR("op.0006", "预置算子无法删除");

private final String code;
private final String message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.operator.domain.contants.OperatorConstant;
import com.datamate.operator.domain.model.CategoryRelation;
import com.datamate.operator.domain.repository.CategoryRelationRepository;
import com.datamate.operator.infrastructure.converter.CategoryRelationConverter;
Expand Down Expand Up @@ -48,4 +49,12 @@ public void deleteByOperatorId(String operatorId) {
queryWrapper.eq(CategoryRelation::getOperatorId, operatorId);
mapper.delete(queryWrapper);
}

@Override
public boolean operatorIsPredefined(String operatorId) {
LambdaQueryWrapper<CategoryRelation> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CategoryRelation::getOperatorId, operatorId)
.eq(CategoryRelation::getCategoryId, OperatorConstant.CATEGORY_PREDEFINED_ID);
return this.exists(queryWrapper);
}
}
6 changes: 6 additions & 0 deletions backend/services/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>${spring-boot.version}</version>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/components/ActionDropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const ActionDropdown = ({
<Button
type="text"
size="small"
disabled={action.disabled || false}
className="w-full text-left"
danger={action.danger}
icon={action.icon}
Expand All @@ -84,6 +85,7 @@ const ActionDropdown = ({
className="w-full"
size="small"
type="text"
disabled={action.disabled || false}
danger={action.danger}
icon={action.icon}
onClick={() => handleActionClick(action)}
Expand Down
38 changes: 38 additions & 0 deletions frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export default function FileTable({result, fetchTaskResult}) {
title: "文件名",
dataIndex: "srcName",
key: "srcName",
width: 200,
filterDropdown: ({
setSelectedKeys,
selectedKeys,
Expand Down Expand Up @@ -110,6 +111,43 @@ export default function FileTable({result, fetchTaskResult}) {
<span>{text?.replace(/\.[^/.]+$/, "")}</span>
),
},
{
title: "清洗后文件名",
dataIndex: "destName",
key: "destName",
width: 200,
filterDropdown: ({
setSelectedKeys,
selectedKeys,
confirm,
clearFilters,
}: any) => (
<div className="p-4 w-64">
<Input
placeholder="搜索文件名"
value={selectedKeys[0]}
onChange={(e) =>
setSelectedKeys(e.target.value ? [e.target.value] : [])
}
onPressEnter={() => confirm()}
className="mb-2"
/>
<div className="flex gap-2">
<Button size="small" onClick={() => confirm()}>
搜索
</Button>
<Button size="small" onClick={() => clearFilters()}>
重置
</Button>
</div>
</div>
),
onFilter: (value: string, record: any) =>
record.destName.toLowerCase().includes(value.toLowerCase()),
render: (text: string) => (
<span>{text?.replace(/\.[^/.]+$/, "")}</span>
),
},
{
title: "文件类型",
dataIndex: "srcType",
Expand Down
45 changes: 2 additions & 43 deletions frontend/src/pages/OperatorMarket/Home/components/List.tsx
Original file line number Diff line number Diff line change
@@ -1,54 +1,13 @@
import { Button, List, Tag, Badge } from "antd";
import { StarFilled } from "@ant-design/icons";
import { Zap, Settings, X } from "lucide-react";
import { useState } from "react";
import { Button, List, Tag } from "antd";
import { useNavigate } from "react-router";
import { Operator } from "../../operator.model";

export function ListView({ operators = [], pagination, operations }) {
const navigate = useNavigate();
const [favoriteOperators, setFavoriteOperators] = useState<Set<number>>(
new Set([1, 3, 6])
);
const handleUpdateOperator = (operator: Operator) => {
navigate(`/data/operator-market/create/${operator.id}`);
};

const handleViewOperator = (operator: Operator) => {
navigate(`/data/operator-market/plugin-detail/${operator.id}`);
};
const handleToggleFavorite = (operatorId: number) => {
setFavoriteOperators((prev) => {
const newFavorites = new Set(prev);
if (newFavorites.has(operatorId)) {
newFavorites.delete(operatorId);
} else {
newFavorites.add(operatorId);
}
return newFavorites;
});
};
const getStatusBadge = (status: string) => {
const statusConfig = {
active: {
label: "活跃",
color: "green",
icon: <Zap className="w-3 h-3" />,
},
beta: {
label: "测试版",
color: "blue",
icon: <Settings className="w-3 h-3" />,
},
deprecated: {
label: "已弃用",
color: "gray",
icon: <X className="w-3 h-3" />,
},
};
return (
statusConfig[status as keyof typeof statusConfig] || statusConfig.active
);
};

return (
<List
Expand Down
2 changes: 1 addition & 1 deletion runtime/datamate-python/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Config:
# Log
log_level: str = "INFO"
debug: bool = True
log_file_dir: str = "/var/log/datamate"
log_file_dir: str = "/var/log/datamate/backend-python"

# Database
mysql_host: str = "datamate-database"
Expand Down
2 changes: 1 addition & 1 deletion runtime/ops/filter/remove_duplicate_file/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def has_similar_text(self, connection, file_name, text_minhash, total_count) ->
query_sql = self.sql_dict.get("query_sql")
for i in range(0, total_count, self.page_size):
rows = connection.execute(
text(query_sql), {"task_uuid": self.task_uuid, "ge": self.page_size, "le": i}).fetchall()
text(query_sql), {"task_uuid": self.task_uuid, "file_name": file_name, "ge": self.page_size, "le": i}).fetchall()
# 对应任务uuid,最后一页没有数据,跳出循环
if not rows:
break
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"query_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = :task_uuid ORDER BY timestamp LIMIT :ge OFFSET :le",
"query_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = :task_uuid AND file_name != :file_name ORDER BY timestamp LIMIT :ge OFFSET :le",
"create_tables_sql": "CREATE TABLE IF NOT EXISTS operators_similar_text_features (id INT AUTO_INCREMENT PRIMARY KEY, task_uuid VARCHAR(255),file_feature TEXT,file_name TEXT,timestamp DATETIME);",
"insert_sql": "INSERT INTO operators_similar_text_features (task_uuid, file_feature, file_name, timestamp) VALUES (:task_uuid, :file_feature, :file_name, :timestamp)",
"query_task_uuid_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = :task_uuid"
Expand Down
31 changes: 24 additions & 7 deletions runtime/python-executor/datamate/core/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import traceback
import uuid
from pathlib import Path
from typing import List, Dict, Any, Tuple

import cv2
Expand Down Expand Up @@ -445,14 +446,15 @@ def execute(self, sample: Dict[str, Any]):
return False

if save_path:
self.save_file(sample, save_path)
save_path = self.save_file(sample, save_path)
sample[self.text_key] = ''
sample[self.data_key] = b''
sample[Fields.result] = True

file_type = save_path.split('.')[-1]
sample[self.filetype_key] = file_type

file_name = os.path.basename(save_path)
base_name, _ = os.path.splitext(file_name)
new_file_name = base_name + '.' + file_type
sample[self.filename_key] = new_file_name
Expand Down Expand Up @@ -516,13 +518,28 @@ def get_medicalfile_handler(self, sample: Dict[str, Any]):
def save_file(self, sample, save_path):
# 以二进制格式保存文件
file_sample = sample[self.text_key].encode('utf-8') if sample[self.text_key] else sample[self.data_key]
with open(save_path, 'wb') as f:
f.write(file_sample)
# 获取父目录路径

parent_dir = os.path.dirname(save_path)
path_obj = Path(save_path).resolve()
parent_dir = path_obj.parent
stem = path_obj.stem # 文件名不含后缀
suffix = path_obj.suffix # 后缀 (.txt)

counter = 0
current_path = path_obj
while True:
try:
# x 模式保证:如果文件存在则报错,如果不存在则创建。
# 这个检查+创建的过程是操作系统级的原子操作,没有竞态条件。
with open(current_path, 'xb') as f:
f.write(file_sample)
break
except FileExistsError:
# 文件已存在(被其他线程/进程抢占),更新文件名重试
counter += 1
new_filename = f"{stem}_{counter}{suffix}"
current_path = parent_dir / new_filename
os.chmod(parent_dir, 0o770)
os.chmod(save_path, 0o640)
os.chmod(current_path, 0o640)
return str(current_path)

def _get_from_data(self, sample: Dict[str, Any]) -> Dict[str, Any]:
sample[self.data_key] = bytes(sample[self.data_key])
Expand Down
5 changes: 4 additions & 1 deletion runtime/python-executor/datamate/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import importlib
import sys
import uuid
from abc import ABC, abstractmethod

import pyarrow as pa
Expand Down Expand Up @@ -119,12 +120,14 @@ def process(self,

# 加载Ops module
temp_ops = self.load_ops_module(op_name)
operators_cls_list.append(temp_ops)

if index == 0:
init_kwargs["is_first_op"] = True

if index == len(cfg_process) - 1:
init_kwargs["is_last_op"] = True
operators_cls_list.append(temp_ops)
init_kwargs["instance_id"] = kwargs.get("instance_id", str(uuid.uuid4()))
init_kwargs_list.append(init_kwargs)

for cls_id, operators_cls in enumerate(operators_cls_list):
Expand Down