Skip to content

Commit 32e15ad

Browse files
committed
refactor: update dataset handling and clean up unused parameters
1 parent 5363927 commit 32e15ad

8 files changed

Lines changed: 25 additions & 25 deletions

File tree

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/httpclient/DatasetClient.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ public class DatasetClient {
3737
public static DatasetResponse createDataset(String name, String type) {
3838
CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
3939
createDatasetRequest.setName(name);
40-
createDatasetRequest.setType(type);
40+
createDatasetRequest.setDatasetType(type);
4141

4242

4343
String jsonPayload;

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/service/CleaningTaskService.java

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929
import java.io.File;
3030
import java.io.FileWriter;
3131
import java.io.IOException;
32-
import java.time.ZoneId;
3332
import java.util.List;
3433
import java.util.Map;
3534
import java.util.UUID;
@@ -56,7 +55,8 @@ public List<CleaningTask> getTasks(String status, String keywords, Integer page,
5655

5756
@Transactional
5857
public CleaningTask createTask(CreateCleaningTaskRequest request) {
59-
DatasetResponse datasetResponse = DatasetClient.createDataset(request.getDestDatasetName(), request.getDestDatasetType());
58+
DatasetResponse datasetResponse = DatasetClient.createDataset(request.getDestDatasetName(),
59+
request.getDestDatasetType());
6060

6161
CleaningTask task = new CleaningTask();
6262
task.setName(request.getName());
@@ -74,7 +74,7 @@ public CleaningTask createTask(CreateCleaningTaskRequest request) {
7474
.map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
7575
operatorInstanceMapper.insertInstance(taskId, instancePos);
7676

77-
taskExecutor.submit(() -> executeTask(task, request, datasetResponse.getId()));
77+
taskExecutor.submit(() -> executeTask(task, request));
7878
return task;
7979
}
8080

@@ -87,7 +87,7 @@ public void deleteTask(String taskId) {
8787
cleaningTaskMapper.deleteTask(taskId);
8888
}
8989

90-
private void executeTask(CleaningTask task, CreateCleaningTaskRequest request, String destDatasetId) {
90+
private void executeTask(CleaningTask task, CreateCleaningTaskRequest request) {
9191
task.setStatus(CleaningTask.StatusEnum.RUNNING);
9292
cleaningTaskMapper.updateTaskStatus(task);
9393
prepareTask(task, request.getInstance());
@@ -139,9 +139,7 @@ private void scanDataset(String taskId, String srcDatasetId) {
139139
"fileSize", content.getSize() + "B",
140140
"filePath", content.getFilePath(),
141141
"fileType", content.getFileType(),
142-
"fileId", content.getId(),
143-
"sourceFileModifyTime",
144-
content.getLastAccessTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()))
142+
"fileId", content.getId()))
145143
.toList();
146144
writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
147145
pageNumber += 1;

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/domain/model/CreateDatasetRequest.java

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -12,15 +12,16 @@
1212
@Setter
1313
@NoArgsConstructor
1414
public class CreateDatasetRequest {
15-
15+
/** 数据集名称 */
1616
private String name;
17-
17+
/** 数据集描述 */
1818
private String description;
19-
20-
private String type;
21-
22-
@Valid
19+
/** 数据集类型 */
20+
private String datasetType;
21+
/** 标签列表 */
2322
private List<String> tags;
24-
23+
/** 数据源 */
2524
private String dataSource;
25+
/** 目标位置 */
26+
private String targetLocation;
2627
}

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/domain/model/DatasetResponse.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
import lombok.Setter;
77

88
import java.time.LocalDateTime;
9+
import java.util.List;
910

1011
/**
1112
* 数据集实体(与数据库表 t_dm_datasets 对齐)
@@ -22,7 +23,7 @@ public class DatasetResponse {
2223
/** 数据集描述 */
2324
private String description;
2425
/** 数据集类型 */
25-
private DatasetTypeResponse type;
26+
private String datasetType;
2627
/** 数据集状态 */
2728
private String status;
2829
/** 数据源 */

deployment/helm/ray/ray-cluster/values.yaml

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,7 @@ head:
117117
volumeMounts:
118118
- mountPath: /tmp/ray
119119
name: log-volume
120-
subPath: ray
120+
subPath: ray/head
121121
- mountPath: /dataset
122122
name: dataset-volume
123123
- mountPath: /flow
@@ -137,6 +137,9 @@ head:
137137
ports:
138138
- containerPort: 8080
139139
volumeMounts:
140+
- mountPath: /tmp/ray
141+
name: log-volume
142+
subPath: ray/head
140143
- mountPath: /var/log/data-platform
141144
name: log-volume
142145
- mountPath: /dataset
@@ -237,7 +240,7 @@ worker:
237240
volumeMounts:
238241
- mountPath: /tmp/ray
239242
name: log-volume
240-
subPath: ray
243+
subPath: ray/worker
241244
- mountPath: /dataset
242245
name: dataset-volume
243246
- mountPath: /flow

runtime/python-executor/data_platform/sqlite_manager/persistence_atction.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@ def persistence_task_info(self, sample: Dict[str, Any]):
4141
file_type = str(sample.get("fileType"))
4242
file_name = str(sample.get("fileName"))
4343
file_path = str(sample.get("filePath"))
44-
source_file_modify_time = int(sample.get("sourceFileModifyTime") if sample.get("sourceFileModifyTime") else "0")
4544
status = int(sample.get("execute_status"))
4645
failed_reason = sample.get("failed_reason")
4746
operator_id = str(failed_reason.get("op_name")) if failed_reason else ""
@@ -50,8 +49,8 @@ def persistence_task_info(self, sample: Dict[str, Any]):
5049
child_id = sample.get("childId")
5150
slice_num = sample.get('slice_num', 0)
5251
insert_data = [instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size,
53-
file_type, file_name, file_path, source_file_modify_time, status, operator_id, error_code,
54-
incremental, child_id, slice_num]
52+
file_type, file_name, file_path, status, operator_id, error_code, incremental, child_id,
53+
slice_num]
5554
self.insert_clean_result(insert_data, instance_id)
5655

5756
def insert_clean_result(self, insert_data, instance_id):
Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
{
22
"query_sql": "SELECT * FROM task_instance_info WHERE instance_id = ?",
33
"db_path": "/flow/sqlite.db",
4-
"insert_sql": "INSERT INTO task_instance_info (instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, source_file_modify_time, status, operator_id, error_code, incremental, childId, slice_num) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
5-
"create_tables_sql": "CREATE TABLE IF NOT EXISTS task_instance_info (instance_id TEXT, meta_file_name TEXT, meta_file_type TEXT, meta_file_id TEXT, meta_file_size TEXT, file_id TEXT, file_size TEXT, file_type TEXT, file_name TEXT, file_path TEXT, source_file_modify_time INTEGER, status INTEGER, operator_id TEXT, error_code TEXT, incremental TEXT, childId INTEGER, slice_num INTEGER DEFAULT 0)",
4+
"insert_sql": "INSERT INTO task_instance_info (instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, status, operator_id, error_code, incremental, childId, slice_num) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
5+
"create_tables_sql": "CREATE TABLE IF NOT EXISTS task_instance_info (instance_id TEXT, meta_file_name TEXT, meta_file_type TEXT, meta_file_id TEXT, meta_file_size TEXT, file_id TEXT, file_size TEXT, file_type TEXT, file_name TEXT, file_path TEXT, status INTEGER, operator_id TEXT, error_code TEXT, incremental TEXT, childId INTEGER, slice_num INTEGER DEFAULT 0)",
66
"delete_task_instance_sql": "DELETE FROM task_instance_info WHERE instance_id = ?"
77
}

runtime/python-executor/data_platform/wrappers/data_platform_executor.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,8 +55,6 @@ def load_meta(line):
5555
meta["extraFilePath"] = None
5656
if not meta.get("extraFileType"):
5757
meta["extraFileType"] = None
58-
if not meta.get("sourceFileModifyTime"):
59-
meta["sourceFileModifyTime"] = None
6058
return meta
6159

6260
def run(self):

0 commit comments

Comments
 (0)