Skip to content

Commit 1df9f66

Browse files
authored
[DSIP-95][API] Complete the functionality of using dependencies in the complement data (#18003)
1 parent 5314ac0 commit 1df9f66

File tree

5 files changed

+733
-19
lines changed

5 files changed

+733
-19
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 114 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.api.executor.workflow;
1919

2020
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
21+
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
2122
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
2223
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
2324
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
@@ -49,17 +50,57 @@
4950
@Component
5051
public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<BackfillWorkflowDTO, List<Integer>> {
5152

53+
@Autowired
54+
private WorkflowLineageService workflowLineageService;
55+
5256
@Autowired
5357
private RegistryClient registryClient;
5458

5559
/**
 * Entry point of the backfill executor: hands the request to
 * {@link #executeWithDependentExpansion(BackfillWorkflowDTO)} and returns its result unchanged.
 *
 * @param backfillWorkflowDTO the backfill request
 * @return the list returned by {@code executeWithDependentExpansion} for this request
 */
@Override
5660
public List<Integer> execute(final BackfillWorkflowDTO backfillWorkflowDTO) {
61+
return executeWithDependentExpansion(backfillWorkflowDTO);
62+
}
63+
64+
/**
65+
* Expands optional downstream workflows, then submits root and each downstream in list order.
66+
* <p>
67+
* {@link RunMode} (serial vs parallel date sharding) is taken only from the <strong>root</strong>
68+
* {@code backfillWorkflowDTO}'s {@link BackfillWorkflowDTO.BackfillParamsDTO#getRunMode()}; downstream DTOs
69+
* mirror the same mode in their params for consistency.
70+
*/
71+
List<Integer> executeWithDependentExpansion(final BackfillWorkflowDTO backfillWorkflowDTO) {
5772
// todo: directly call the master api to do backfill
73+
List<BackfillWorkflowDTO> dependentBackfillDtos = new ArrayList<>();
74+
dependentBackfillDtos.add(backfillWorkflowDTO);
75+
if (backfillWorkflowDTO.getBackfillParams()
76+
.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
77+
78+
List<WorkflowDefinition> downstreamWorkflowList =
79+
workflowLineageService.resolveDownstreamWorkflowDefinitionCodes(
80+
backfillWorkflowDTO.getWorkflowDefinition().getCode(),
81+
backfillWorkflowDTO.getBackfillParams().isAllLevelDependent(),
82+
true);
83+
if (downstreamWorkflowList.isEmpty()) {
84+
log.info("No downstream dependent workflows found for workflow code {}",
85+
backfillWorkflowDTO.getWorkflowDefinition().getCode());
86+
} else {
87+
dependentBackfillDtos.addAll(buildResolvedDownstreamBackfillDtos(backfillWorkflowDTO,
88+
backfillWorkflowDTO.getBackfillParams().getBackfillDateList(),
89+
downstreamWorkflowList));
90+
}
91+
}
92+
List<Integer> workflowInstanceIdList = new ArrayList<>();
93+
// RunMode is defined by the root request only (not per downstream DTO).
5894
if (backfillWorkflowDTO.getBackfillParams().getRunMode() == RunMode.RUN_MODE_SERIAL) {
59-
return doSerialBackfillWorkflow(backfillWorkflowDTO);
95+
for (BackfillWorkflowDTO dependentDto : dependentBackfillDtos) {
96+
workflowInstanceIdList.addAll(doSerialBackfillWorkflow(dependentDto));
97+
}
6098
} else {
61-
return doParallelBackfillWorkflow(backfillWorkflowDTO);
99+
for (BackfillWorkflowDTO dependentDto : dependentBackfillDtos) {
100+
workflowInstanceIdList.addAll(doParallelBackfillWorkflow(dependentDto));
101+
}
62102
}
103+
return workflowInstanceIdList;
63104
}
64105

65106
private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
@@ -71,9 +112,7 @@ private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfil
71112
Collections.sort(backfillTimeList);
72113
}
73114

74-
final Integer workflowInstanceId = doBackfillWorkflow(
75-
backfillWorkflowDTO,
76-
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
115+
final Integer workflowInstanceId = doBackfillWorkflow(backfillWorkflowDTO, backfillTimeList);
77116
return Lists.newArrayList(workflowInstanceId);
78117
}
79118

@@ -92,8 +131,7 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
92131
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
93132
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
94133
final Integer workflowInstanceId = doBackfillWorkflow(
95-
backfillWorkflowDTO,
96-
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
134+
backfillWorkflowDTO, stringDate);
97135
workflowInstanceIdList.add(workflowInstanceId);
98136
}
99137
return workflowInstanceIdList;
@@ -124,12 +162,15 @@ private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> dateTimeList
124162
}
125163

126164
private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
127-
final List<String> backfillTimeList) {
165+
final List<ZonedDateTime> backfillDateTimes) {
128166
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
129167
if (masterServer == null) {
130168
throw new ServiceException("no master server available");
131169
}
132170

171+
final List<String> backfillTimeList =
172+
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList());
173+
133174
final WorkflowDefinition workflowDefinition = backfillWorkflowDTO.getWorkflowDefinition();
134175
final WorkflowBackfillTriggerRequest backfillTriggerRequest = WorkflowBackfillTriggerRequest.builder()
135176
.userId(backfillWorkflowDTO.getLoginUser().getId())
@@ -149,22 +190,76 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
149190
.dryRun(backfillWorkflowDTO.getDryRun())
150191
.build();
151192

152-
final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
153-
.withService(IWorkflowControlClient.class)
154-
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
155-
.backfillTriggerWorkflow(backfillTriggerRequest);
193+
final WorkflowBackfillTriggerResponse backfillTriggerResponse =
194+
triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
156195
if (!backfillTriggerResponse.isSuccess()) {
157196
throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
158197
}
159-
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
160-
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
161-
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
162-
}
163198
return backfillTriggerResponse.getWorkflowInstanceId();
164199
}
165200

166-
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
167-
final List<String> backfillTimeList) {
168-
// todo:
201+
protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
202+
final Server masterServer) {
203+
return Clients
204+
.withService(IWorkflowControlClient.class)
205+
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
206+
.backfillTriggerWorkflow(request);
207+
}
208+
209+
/**
210+
* Builds {@link BackfillWorkflowDTO} list for resolved downstream workflows.
211+
* {@link RunMode} in each downstream {@link BackfillWorkflowDTO.BackfillParamsDTO} matches the root (see
212+
* {@link #executeWithDependentExpansion(BackfillWorkflowDTO)}).
213+
*/
214+
private List<BackfillWorkflowDTO> buildResolvedDownstreamBackfillDtos(final BackfillWorkflowDTO backfillWorkflowDTO,
215+
final List<ZonedDateTime> backfillDateTimes,
216+
final List<WorkflowDefinition> downstreamWorkflows) {
217+
final long upstreamWorkflowCode = backfillWorkflowDTO.getWorkflowDefinition().getCode();
218+
final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
219+
final BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
220+
final boolean allLevelDependent = originalParams.isAllLevelDependent();
221+
222+
final List<BackfillWorkflowDTO> result = new ArrayList<>();
223+
for (WorkflowDefinition downstreamWorkflow : downstreamWorkflows) {
224+
final long downstreamCode = downstreamWorkflow.getCode();
225+
226+
final BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
227+
BackfillWorkflowDTO.BackfillParamsDTO.builder()
228+
// Same as root; executor also branches on root RunMode only.
229+
.runMode(originalParams.getRunMode())
230+
.backfillDateList(upstreamBackfillDates)
231+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
232+
// Downstream expansion has already been decided in resolution stage.
233+
.backfillDependentMode(ComplementDependentMode.OFF_MODE)
234+
.allLevelDependent(allLevelDependent)
235+
.executionOrder(originalParams.getExecutionOrder())
236+
.build();
237+
238+
final BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
239+
.loginUser(backfillWorkflowDTO.getLoginUser())
240+
.workflowDefinition(downstreamWorkflow)
241+
.startNodes(null)
242+
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
243+
.taskDependType(backfillWorkflowDTO.getTaskDependType())
244+
.execType(backfillWorkflowDTO.getExecType())
245+
.warningType(backfillWorkflowDTO.getWarningType())
246+
.warningGroupId(downstreamWorkflow.getWarningGroupId())
247+
.runMode(dependentParams.getRunMode())
248+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
249+
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
250+
.tenantCode(backfillWorkflowDTO.getTenantCode())
251+
.environmentCode(backfillWorkflowDTO.getEnvironmentCode())
252+
.startParamList(backfillWorkflowDTO.getStartParamList())
253+
.dryRun(backfillWorkflowDTO.getDryRun())
254+
.backfillParams(dependentParams)
255+
.build();
256+
257+
log.info("Built dependent backfill DTO for workflow {} (upstream {}) with backfill dates {}",
258+
downstreamCode, upstreamWorkflowCode,
259+
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
260+
261+
result.add(dependentBackfillDTO);
262+
}
263+
return result;
169264
}
170265
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
2222
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
2323
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
24+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2425
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;
2526

2627
import java.util.List;
@@ -42,6 +43,37 @@ public interface WorkflowLineageService {
4243
*/
4344
List<DependentWorkflowDefinition> queryDownstreamDependentWorkflowDefinitions(Long workflowDefinitionCode);
4445

46+
/**
47+
* Resolve downstream workflow definitions that depend on the given root workflow, using stored lineage.
48+
* <ul>
49+
* <li>{@code allLevelDependent == false}: only direct dependents of the root.</li>
50+
* <li>{@code allLevelDependent == true}: transitive dependents (BFS over direct dependents), skipping the root
51+
* when it reappears as an edge target (cycle back to root).</li>
52+
* </ul>
* <p>
* Convenience overload: delegates to the three-argument variant with {@code filterOfflineWorkflow} set to
* {@code false}, so offline workflows are not filtered out. Despite the "Codes" suffix in the method name, the
* result holds {@link WorkflowDefinition} objects, not codes.
53+
*
54+
* @param rootWorkflowDefinitionCode workflow to start from (not included in the result)
55+
* @param allLevelDependent {@code true} for transitive closure, {@code false} for one hop only
56+
* @return ordered distinct downstream workflow definitions (stable order: BFS / insertion order)
57+
*/
58+
default List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,
59+
boolean allLevelDependent) {
60+
return resolveDownstreamWorkflowDefinitionCodes(rootWorkflowDefinitionCode, allLevelDependent, false);
61+
}
62+
63+
/**
64+
* Resolve downstream workflow definitions and optionally filter offline workflows.
65+
* When {@code filterOfflineWorkflow} is true, offline workflow definitions are excluded from the result and are
66+
* not expanded further during transitive traversal.
* When {@code filterOfflineWorkflow} is false, the release state is not taken into account.
* <p>
* Despite the "Codes" suffix in the method name, the result holds {@link WorkflowDefinition} objects, not codes.
67+
*
68+
* @param rootWorkflowDefinitionCode workflow to start from (not included in the result)
69+
* @param allLevelDependent {@code true} for transitive closure, {@code false} for one hop only
70+
* @param filterOfflineWorkflow whether offline workflows should be filtered out during traversal
71+
* @return ordered distinct downstream workflow definitions (stable order: BFS / insertion order)
72+
*/
73+
List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,
74+
boolean allLevelDependent,
75+
boolean filterOfflineWorkflow);
76+
4577
/**
4678
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
4779
*

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
2222
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
2323
import org.apache.dolphinscheduler.common.constants.Constants;
24+
import org.apache.dolphinscheduler.common.enums.ReleaseState;
2425
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
2526
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
2627
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -37,9 +38,15 @@
3738

3839
import java.text.MessageFormat;
3940
import java.util.ArrayList;
41+
import java.util.Collection;
4042
import java.util.Collections;
43+
import java.util.HashMap;
44+
import java.util.HashSet;
45+
import java.util.LinkedHashSet;
4146
import java.util.List;
47+
import java.util.Map;
4248
import java.util.Optional;
49+
import java.util.Set;
4350
import java.util.stream.Collectors;
4451
import java.util.stream.Stream;
4552

@@ -319,6 +326,112 @@ public List<DependentLineageTask> queryDependentWorkflowDefinitions(long project
319326
return dependentLineageTaskList;
320327
}
321328

329+
@Override
330+
public List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,
331+
boolean allLevelDependent,
332+
boolean filterOfflineWorkflow) {
333+
334+
Set<Long> resultCodes = new LinkedHashSet<>();
335+
Set<Long> visitedWorkflowCodes = new HashSet<>();
336+
List<Long> frontier = Collections.singletonList(rootWorkflowDefinitionCode);
337+
Map<Long, WorkflowDefinition> workflowDefinitionCache = new HashMap<>();
338+
339+
while (!CollectionUtils.isEmpty(frontier)) {
340+
List<Long> currentLevel = new ArrayList<>(frontier);
341+
frontier = new ArrayList<>();
342+
343+
for (Long upstreamCode : currentLevel) {
344+
if (!visitedWorkflowCodes.add(upstreamCode)) {
345+
continue;
346+
}
347+
List<Long> directDownstreamCodes = queryDirectDownstreamWorkflowCodes(upstreamCode);
348+
if (CollectionUtils.isEmpty(directDownstreamCodes)) {
349+
continue;
350+
}
351+
352+
cacheWorkflowDefinitions(directDownstreamCodes, workflowDefinitionCache);
353+
for (Long downstreamCode : directDownstreamCodes) {
354+
if (downstreamCode == rootWorkflowDefinitionCode) {
355+
// Skip cycle edge back to root; root should never appear in downstream result.
356+
continue;
357+
}
358+
WorkflowDefinition downstreamWorkflow = workflowDefinitionCache.get(downstreamCode);
359+
if (downstreamWorkflow == null) {
360+
continue;
361+
}
362+
if (!resultCodes.add(downstreamCode)) {
363+
continue;
364+
}
365+
if (allLevelDependent && (!filterOfflineWorkflow
366+
|| downstreamWorkflow.getReleaseState() != ReleaseState.OFFLINE)) {
367+
frontier.add(downstreamCode);
368+
}
369+
}
370+
}
371+
372+
if (!allLevelDependent) {
373+
break;
374+
}
375+
}
376+
377+
if (resultCodes.isEmpty()) {
378+
return Collections.emptyList();
379+
}
380+
381+
if (filterOfflineWorkflow) {
382+
resultCodes = resultCodes.stream()
383+
.filter(code -> {
384+
WorkflowDefinition definition = workflowDefinitionCache.get(code);
385+
return definition != null
386+
&& definition.getReleaseState() != ReleaseState.OFFLINE;
387+
})
388+
.collect(Collectors.toCollection(LinkedHashSet::new));
389+
if (resultCodes.isEmpty()) {
390+
return Collections.emptyList();
391+
}
392+
}
393+
394+
List<WorkflowDefinition> orderedDefinitions = new ArrayList<>();
395+
for (Long code : resultCodes) {
396+
WorkflowDefinition definition = workflowDefinitionCache.get(code);
397+
if (definition != null) {
398+
orderedDefinitions.add(definition);
399+
}
400+
}
401+
return orderedDefinitions;
402+
}
403+
404+
private List<Long> queryDirectDownstreamWorkflowCodes(long upstreamWorkflowDefinitionCode) {
405+
List<WorkflowTaskLineage> workflowTaskLineageList =
406+
workflowTaskLineageDao.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE,
407+
upstreamWorkflowDefinitionCode,
408+
Constants.DEPENDENT_ALL_TASK);
409+
if (CollectionUtils.isEmpty(workflowTaskLineageList)) {
410+
return Collections.emptyList();
411+
}
412+
return workflowTaskLineageList.stream()
413+
.map(WorkflowTaskLineage::getWorkflowDefinitionCode)
414+
.distinct()
415+
.collect(Collectors.toList());
416+
}
417+
418+
private void cacheWorkflowDefinitions(Collection<Long> workflowDefinitionCodes,
419+
Map<Long, WorkflowDefinition> workflowDefinitionCache) {
420+
List<Long> missingCodes = workflowDefinitionCodes.stream()
421+
.filter(code -> !workflowDefinitionCache.containsKey(code))
422+
.collect(Collectors.toList());
423+
if (CollectionUtils.isEmpty(missingCodes)) {
424+
return;
425+
}
426+
List<WorkflowDefinition> workflowDefinitions = workflowDefinitionMapper.queryByCodes(missingCodes);
427+
if (CollectionUtils.isEmpty(workflowDefinitions)) {
428+
return;
429+
}
430+
for (WorkflowDefinition workflowDefinition : workflowDefinitions) {
431+
workflowDefinitionCache.put(workflowDefinition.getCode(), workflowDefinition);
432+
}
433+
}
434+
322435
@Override
323436
public int updateWorkflowLineage(long workflowDefinitionCode, List<WorkflowTaskLineage> workflowTaskLineages) {
324437
// Remove existing lineage first to keep data consistent

0 commit comments

Comments
 (0)