Skip to content

Commit 8eae000

Browse files
committed
[DSIP-99] [DSIP-99] Narrow task log APIs with logType-based access and use a shared task logs root path
1 parent 08648f2 commit 8eae000

46 files changed

Lines changed: 460 additions & 583 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_LOG_ERROR;
2222

2323
import org.apache.dolphinscheduler.api.exceptions.ApiException;
24+
import org.apache.dolphinscheduler.api.executor.logging.TaskLogType;
2425
import org.apache.dolphinscheduler.api.service.LoggerService;
2526
import org.apache.dolphinscheduler.api.utils.Result;
2627
import org.apache.dolphinscheduler.common.constants.Constants;
@@ -74,33 +75,9 @@ public class LoggerController extends BaseController {
7475
public Result<ResponseTaskLog> queryTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
7576
@RequestParam(value = "taskInstanceId") int taskInstanceId,
7677
@RequestParam(value = "skipLineNum") int skipNum,
77-
@RequestParam(value = "limit") int limit) {
78-
return loggerService.queryTaskLog(loginUser, taskInstanceId, skipNum, limit);
79-
}
80-
81-
/**
82-
* query task output
83-
*
84-
* @param loginUser login user
85-
* @param taskInstanceId task instance id
86-
* @param skipNum skip number
87-
* @param limit limit
88-
* @return task log content
89-
*/
90-
@Operation(summary = "queryOutput", description = "QUERY_TASK_INSTANCE_OUTPUT_NOTES")
91-
@Parameters({
92-
@Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
93-
@Parameter(name = "skipLineNum", description = "SKIP_LINE_NUM", required = true, schema = @Schema(implementation = int.class, example = "100")),
94-
@Parameter(name = "limit", description = "LIMIT", required = true, schema = @Schema(implementation = int.class, example = "100"))
95-
})
96-
@GetMapping(value = "/output_detail")
97-
@ResponseStatus(HttpStatus.OK)
98-
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
99-
public Result<ResponseTaskLog> queryTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
100-
@RequestParam(value = "taskInstanceId") int taskInstanceId,
101-
@RequestParam(value = "skipLineNum") int skipNum,
102-
@RequestParam(value = "limit") int limit) {
103-
return loggerService.queryTaskOutput(loginUser, taskInstanceId, skipNum, limit);
78+
@RequestParam(value = "limit") int limit,
79+
@RequestParam(value = "logType", defaultValue = "LOG") TaskLogType logType) {
80+
return loggerService.queryLog(loginUser, taskInstanceId, skipNum, limit, logType);
10481
}
10582

10683
/**
@@ -118,37 +95,15 @@ public Result<ResponseTaskLog> queryTaskOutput(@Parameter(hidden = true) @Reques
11895
@ResponseBody
11996
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
12097
public ResponseEntity downloadTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
121-
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
122-
byte[] logBytes = loggerService.getTaskLogBytes(loginUser, taskInstanceId);
98+
@RequestParam(value = "taskInstanceId") int taskInstanceId,
99+
@RequestParam(value = "logType", defaultValue = "LOG") TaskLogType logType) {
100+
byte[] logBytes = loggerService.getLogBytes(loginUser, taskInstanceId, logType);
101+
String fileName = logType == TaskLogType.LOG ? "task.log" : "task.out";
123102
return ResponseEntity
124103
.ok()
125104
.header(HttpHeaders.CONTENT_DISPOSITION,
126-
"attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"")
105+
"attachment; filename=\"" + fileName + "\"")
127106
.body(logBytes);
128107
}
129108

130-
/**
131-
* download task output file
132-
*
133-
* @param loginUser login user
134-
* @param taskInstanceId task instance id
135-
* @return task output file content
136-
*/
137-
@Operation(summary = "downloadTaskOutput", description = "DOWNLOAD_TASK_INSTANCE_OUTPUT_NOTES")
138-
@Parameters({
139-
@Parameter(name = "taskInstanceId", description = "TASK_ID", required = true, schema = @Schema(implementation = int.class, example = "100"))
140-
})
141-
@GetMapping(value = "/download-output")
142-
@ResponseBody
143-
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
144-
public ResponseEntity downloadTaskOutput(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
145-
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
146-
byte[] outputBytes = loggerService.getTaskOutputBytes(loginUser, taskInstanceId);
147-
return ResponseEntity
148-
.ok()
149-
.header(HttpHeaders.CONTENT_DISPOSITION,
150-
"attachment; filename=\"" + System.currentTimeMillis() + ".output.log" + "\"")
151-
.body(outputBytes);
152-
}
153-
154109
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
2525
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
2626
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
27+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider;
2728

2829
import lombok.extern.slf4j.Slf4j;
2930

@@ -41,12 +42,8 @@ public class LocalLogClient {
4142
* @param taskInstance The task instance object, containing information needed to retrieve the log.
4243
* @return The complete log file download response of the task instance, including log content and metadata.
4344
*/
44-
public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance) {
45-
return getLocalWholeLog(taskInstance, TaskLogType.LOG);
46-
}
47-
48-
public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstance) {
49-
return getLocalWholeLog(taskInstance, TaskLogType.OUTPUT);
45+
public TaskInstanceLogFileDownloadResponse getLog(TaskInstance taskInstance, TaskLogType taskLogType) {
46+
return getLocalWholeLog(taskInstance, taskLogType);
5047
}
5148

5249
/**
@@ -59,24 +56,23 @@ public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstan
5956
* @param limit The maximum number of lines to read, indicating the maximum number of lines to retrieve in this query.
6057
* @return The partial log query response, including log content within the specified range and metadata.
6158
*/
62-
public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit) {
63-
return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG);
64-
}
65-
66-
public TaskInstanceLogPageQueryResponse getTaskOutput(TaskInstance taskInstance, int skipLineNum, int limit) {
67-
return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
59+
public TaskInstanceLogPageQueryResponse getLog(TaskInstance taskInstance, int skipLineNum, int limit,
60+
TaskLogType taskLogType) {
61+
return getLocalPartLog(taskInstance, skipLineNum, limit, taskLogType);
6862
}
6963

7064
private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) {
7165
TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest(
7266
taskInstance.getId(),
73-
taskLogType.getLogPath(taskInstance));
67+
TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(),
68+
TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)));
7469
return getProxyLogService(taskInstance).getTaskInstanceWholeLogFileBytes(request);
7570
}
7671

7772
private TaskInstanceLogPageQueryResponse getLocalPartLog(TaskInstance taskInstance, int skipLineNum,
7873
int limit, TaskLogType taskLogType) {
79-
String logFilePath = taskLogType.getLogPath(taskInstance);
74+
String logFilePath = TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(),
75+
TaskLogFileTypeMapping.toTaskLogFileType(taskLogType));
8076
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest
8177
.builder()
8278
.taskInstanceId(taskInstance.getId())

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -51,34 +51,20 @@ public class LogClientDelegate {
5151
* @return A string containing the specified portion of the log.
5252
*/
5353

54-
public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) {
55-
return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.LOG);
56-
}
57-
58-
public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) {
59-
return getPartLogString(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
60-
}
61-
62-
private String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
54+
public String getLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
6355
checkArgs(taskInstance);
6456
if (checkNodeExists(taskInstance)) {
65-
TaskInstanceLogPageQueryResponse response =
66-
taskLogType == TaskLogType.LOG
67-
? localLogClient.getTaskLog(taskInstance, skipLineNum, limit)
68-
: localLogClient.getTaskOutput(taskInstance, skipLineNum, limit);
57+
TaskInstanceLogPageQueryResponse response = localLogClient.getLog(taskInstance, skipLineNum, limit,
58+
taskLogType);
6959
if (response.getCode() == LogResponseStatus.SUCCESS) {
7060
return response.getLogContent();
7161
} else {
7262
log.warn("get part log string is not success for task instance {}; reason :{}",
7363
taskInstance.getId(), response.getMessage());
74-
return taskLogType == TaskLogType.LOG
75-
? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit)
76-
: remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit);
64+
return remoteLogClient.getLogString(taskInstance, skipLineNum, limit, taskLogType);
7765
}
7866
} else {
79-
return taskLogType == TaskLogType.LOG
80-
? remoteLogClient.getTaskLogString(taskInstance, skipLineNum, limit)
81-
: remoteLogClient.getTaskOutputString(taskInstance, skipLineNum, limit);
67+
return remoteLogClient.getLogString(taskInstance, skipLineNum, limit, taskLogType);
8268
}
8369
}
8470

@@ -89,34 +75,19 @@ private String getPartLogString(TaskInstance taskInstance, int skipLineNum, int
8975
* @param taskInstance The task instance object, containing information needed for log retrieval.
9076
* @return A byte array containing the complete log content.
9177
*/
92-
public byte[] getTaskLogBytes(TaskInstance taskInstance) {
93-
return getWholeLogBytes(taskInstance, TaskLogType.LOG);
94-
}
95-
96-
public byte[] getTaskOutputBytes(TaskInstance taskInstance) {
97-
return getWholeLogBytes(taskInstance, TaskLogType.OUTPUT);
98-
}
99-
100-
private byte[] getWholeLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) {
78+
public byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) {
10179
checkArgs(taskInstance);
10280
if (checkNodeExists(taskInstance)) {
103-
TaskInstanceLogFileDownloadResponse response =
104-
taskLogType == TaskLogType.LOG
105-
? localLogClient.getTaskLog(taskInstance)
106-
: localLogClient.getTaskOutput(taskInstance);
81+
TaskInstanceLogFileDownloadResponse response = localLogClient.getLog(taskInstance, taskLogType);
10782
if (response.getCode() == LogResponseStatus.SUCCESS) {
10883
return response.getLogBytes();
10984
} else {
11085
log.warn("get whole log bytes is not success for task instance {}; reason :{}", taskInstance.getId(),
11186
response.getMessage());
112-
return taskLogType == TaskLogType.LOG
113-
? remoteLogClient.getTaskLogBytes(taskInstance)
114-
: remoteLogClient.getTaskOutputBytes(taskInstance);
87+
return remoteLogClient.getLogBytes(taskInstance, taskLogType);
11588
}
11689
} else {
117-
return taskLogType == TaskLogType.LOG
118-
? remoteLogClient.getTaskLogBytes(taskInstance)
119-
: remoteLogClient.getTaskOutputBytes(taskInstance);
90+
return remoteLogClient.getLogBytes(taskInstance, taskLogType);
12091
}
12192
}
12293

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.dolphinscheduler.common.utils.LogUtils;
2121
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
22+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider;
2223

2324
import org.springframework.stereotype.Component;
2425

@@ -32,12 +33,8 @@ public class RemoteLogClient {
3233
* @param taskInstance The task instance object, containing information such as the task ID and log path.
3334
* @return Returns the log content in byte array format.
3435
*/
35-
public byte[] getTaskLogBytes(TaskInstance taskInstance) {
36-
return getWholeLog(taskInstance, TaskLogType.LOG);
37-
}
38-
39-
public byte[] getTaskOutputBytes(TaskInstance taskInstance) {
40-
return getWholeLog(taskInstance, TaskLogType.OUTPUT);
36+
public byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) {
37+
return getWholeLog(taskInstance, taskLogType);
4138
}
4239

4340
/**
@@ -50,23 +47,22 @@ public byte[] getTaskOutputBytes(TaskInstance taskInstance) {
5047
* @return Returns the specified part of the log content in string format.
5148
*/
5249

53-
public String getTaskLogString(TaskInstance taskInstance, int skipLineNum, int limit) {
54-
return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG);
55-
}
56-
57-
public String getTaskOutputString(TaskInstance taskInstance, int skipLineNum, int limit) {
58-
return getPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT);
50+
public String getLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
51+
return getPartLog(taskInstance, skipLineNum, limit, taskLogType);
5952
}
6053

6154
private byte[] getWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) {
62-
return LogUtils.getFileContentBytesFromRemote(taskLogType.getLogPath(taskInstance));
55+
return LogUtils
56+
.getFileContentBytesFromRemote(TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(),
57+
TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)));
6358
}
6459

6560
private String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) {
6661
// todo We can optimize requests by the actual range, reducing disk usage and network traffic.
6762
return LogUtils.rollViewLogLines(
6863
LogUtils.readPartFileContentFromRemote(
69-
taskLogType.getLogPath(taskInstance),
64+
TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(),
65+
TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)),
7066
skipLineNum, limit));
7167
}
7268

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.api.executor.logging;
1919

20-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
21-
2220
public enum TaskLogType {
23-
24-
LOG {
25-
26-
@Override
27-
public String getLogPath(TaskInstance taskInstance) {
28-
return taskInstance.getLogPath();
29-
}
30-
},
31-
OUTPUT {
32-
33-
@Override
34-
public String getLogPath(TaskInstance taskInstance) {
35-
return taskInstance.getTaskOutputLogPath();
36-
}
37-
};
38-
39-
public abstract String getLogPath(TaskInstance taskInstance);
21+
LOG,
22+
OUTPUT
4023
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.api.service;
1919

20+
import org.apache.dolphinscheduler.api.executor.logging.TaskLogType;
2021
import org.apache.dolphinscheduler.api.utils.Result;
2122
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
2223
import org.apache.dolphinscheduler.dao.entity.User;
@@ -32,9 +33,8 @@ public interface LoggerService {
3233
* @param limit limit
3334
* @return log string data
3435
*/
35-
Result<ResponseTaskLog> queryTaskLog(User loginUser, int taskInstId, int skipLineNum, int limit);
36-
37-
Result<ResponseTaskLog> queryTaskOutput(User loginUser, int taskInstId, int skipLineNum, int limit);
36+
Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skipLineNum, int limit,
37+
TaskLogType taskLogType);
3838

3939
/**
4040
* get log size
@@ -43,8 +43,6 @@ public interface LoggerService {
4343
* @param taskInstId task instance id
4444
* @return log byte array
4545
*/
46-
byte[] getTaskLogBytes(User loginUser, int taskInstId);
47-
48-
byte[] getTaskOutputBytes(User loginUser, int taskInstId);
46+
byte[] getLogBytes(User loginUser, int taskInstId, TaskLogType taskLogType);
4947

5048
}

0 commit comments

Comments
 (0)