Skip to content

Commit 1841eaf

Browse files
luxldet101
authored and committed
fix(api): isolate visited codes per parallel backfill chunk
Ensure dependent backfill triggering is evaluated per parallel date chunk, and batch-load downstream workflow definitions to avoid N+1 queries. Add regression coverage for parallel-mode visited-code isolation.
Made-with: Cursor
1 parent 29905e8 commit 1841eaf

File tree

2 files changed

+81
-18
lines changed

2 files changed

+81
-18
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,9 @@
4242
import java.util.ArrayList;
4343
import java.util.Collections;
4444
import java.util.HashSet;
45+
import java.util.LinkedHashSet;
4546
import java.util.List;
47+
import java.util.Map;
4648
import java.util.Set;
4749
import java.util.stream.Collectors;
4850

@@ -118,11 +120,14 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
118120

119121
log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
120122
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
123+
final Set<Long> baseVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
121124
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
125+
// Each parallel chunk should keep its own traversal context to avoid cross-chunk pollution.
126+
final Set<Long> chunkVisitedCodes = new HashSet<>(baseVisitedCodes);
122127
final Integer workflowInstanceId = doBackfillWorkflow(
123128
backfillWorkflowDTO,
124129
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
125-
visitedCodes);
130+
chunkVisitedCodes);
126131
workflowInstanceIdList.add(workflowInstanceId);
127132
}
128133
return workflowInstanceIdList;
@@ -190,11 +195,17 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
190195
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
191196
final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
192197
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
193-
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, effectiveVisitedCodes);
198+
doBackfillDependentWorkflowForTesting(backfillWorkflowDTO, backfillTimeList, effectiveVisitedCodes);
194199
}
195200
return backfillTriggerResponse.getWorkflowInstanceId();
196201
}
197202

203+
void doBackfillDependentWorkflowForTesting(final BackfillWorkflowDTO backfillWorkflowDTO,
204+
final List<String> backfillTimeList,
205+
final Set<Long> visitedCodes) {
206+
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes);
207+
}
208+
198209
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
199210
final List<String> backfillTimeList,
200211
final Set<Long> visitedCodes) {
@@ -209,6 +220,12 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
209220
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
210221
return;
211222
}
223+
final Set<Long> downstreamCodes = downstreamDefinitions.stream()
224+
.map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
225+
.collect(Collectors.toCollection(LinkedHashSet::new));
226+
final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
227+
final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
228+
.collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));
212229

213230
// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
214231
// backfill
@@ -226,8 +243,7 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
226243
continue;
227244
}
228245

229-
WorkflowDefinition downstreamWorkflow =
230-
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
246+
WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
231247
if (downstreamWorkflow == null) {
232248
log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
233249
continue;

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java

Lines changed: 61 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
package org.apache.dolphinscheduler.api.executor.workflow;
1919

2020
import static org.mockito.ArgumentMatchers.any;
21-
import static org.mockito.ArgumentMatchers.anyLong;
21+
import static org.mockito.Mockito.doAnswer;
2222
import static org.mockito.Mockito.doReturn;
2323
import static org.mockito.Mockito.never;
2424
import static org.mockito.Mockito.times;
@@ -40,8 +40,8 @@
4040
import java.util.Arrays;
4141
import java.util.Collections;
4242
import java.util.HashSet;
43+
import java.util.LinkedHashSet;
4344
import java.util.List;
44-
import java.util.Optional;
4545
import java.util.Set;
4646
import java.util.stream.Collectors;
4747

@@ -102,7 +102,7 @@ public void testDoBackfillDependentWorkflow_NoDownstreamDefinitions() throws Exc
102102
visitedCodes.add(dto.getWorkflowDefinition().getCode());
103103
method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes);
104104

105-
verify(workflowDefinitionDao, never()).queryByCode(anyLong());
105+
verify(workflowDefinitionDao, never()).queryByCodes(any());
106106
}
107107

108108
@Test
@@ -145,8 +145,8 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t
145145

146146
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
147147
.thenReturn(Arrays.asList(selfDependent, validDependent));
148-
when(workflowDefinitionDao.queryByCode(downstreamCode))
149-
.thenReturn(Optional.of(downstreamWorkflow));
148+
when(workflowDefinitionDao.queryByCodes(new LinkedHashSet<>(Arrays.asList(upstreamCode, downstreamCode))))
149+
.thenReturn(Collections.singletonList(downstreamWorkflow));
150150

151151
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
152152
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
@@ -164,7 +164,8 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t
164164
visitedCodes.add(dto.getWorkflowDefinition().getCode());
165165
method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes);
166166

167-
verify(workflowDefinitionDao).queryByCode(downstreamCode);
167+
verify(workflowDefinitionDao)
168+
.queryByCodes(new LinkedHashSet<>(Arrays.asList(upstreamCode, downstreamCode)));
168169

169170
BackfillWorkflowDTO captured = captor.getValue();
170171
Assertions.assertNotNull(captured);
@@ -219,8 +220,8 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent(
219220

220221
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
221222
.thenReturn(Collections.singletonList(validDependent));
222-
when(workflowDefinitionDao.queryByCode(downstreamCode))
223-
.thenReturn(Optional.of(downstreamWorkflow));
223+
when(workflowDefinitionDao.queryByCodes(Collections.singleton(downstreamCode)))
224+
.thenReturn(Collections.singletonList(downstreamWorkflow));
224225

225226
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
226227
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
@@ -236,7 +237,7 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent(
236237
visitedCodes.add(dto.getWorkflowDefinition().getCode());
237238
method.invoke(backfillWorkflowExecutorDelegate, dto, backfillTimeList, visitedCodes);
238239

239-
verify(workflowDefinitionDao).queryByCode(downstreamCode);
240+
verify(workflowDefinitionDao).queryByCodes(Collections.singleton(downstreamCode));
240241

241242
BackfillWorkflowDTO captured = captor.getValue();
242243
Assertions.assertNotNull(captured);
@@ -275,7 +276,8 @@ public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Except
275276

276277
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
277278
.thenReturn(Collections.singletonList(dep));
278-
when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.empty());
279+
when(workflowDefinitionDao.queryByCodes(Collections.singleton(downstreamCode)))
280+
.thenReturn(Collections.emptyList());
279281

280282
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
281283
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class);
@@ -318,7 +320,8 @@ public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Excepti
318320

319321
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
320322
.thenReturn(Collections.singletonList(dep));
321-
when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.of(offlineDownstream));
323+
when(workflowDefinitionDao.queryByCodes(Collections.singleton(downstreamCode)))
324+
.thenReturn(Collections.singletonList(offlineDownstream));
322325

323326
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
324327
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class);
@@ -371,8 +374,10 @@ public void testDoBackfillDependentWorkflow_MultiLevelAndCycle() throws Exceptio
371374
.thenReturn(Collections.singletonList(depToB));
372375
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowB))
373376
.thenReturn(Arrays.asList(depToA, depToC));
374-
when(workflowDefinitionDao.queryByCode(workflowB)).thenReturn(Optional.of(downstreamB));
375-
when(workflowDefinitionDao.queryByCode(workflowC)).thenReturn(Optional.of(downstreamC));
377+
when(workflowDefinitionDao.queryByCodes(Collections.singleton(workflowB)))
378+
.thenReturn(Collections.singletonList(downstreamB));
379+
when(workflowDefinitionDao.queryByCodes(new LinkedHashSet<>(Arrays.asList(workflowA, workflowC))))
380+
.thenReturn(Collections.singletonList(downstreamC));
376381

377382
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
378383
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
@@ -394,7 +399,7 @@ public void testDoBackfillDependentWorkflow_MultiLevelAndCycle() throws Exceptio
394399
method.invoke(backfillWorkflowExecutorDelegate, dtoB, backfillTimeList, visitedCodes);
395400

396401
verify(backfillWorkflowExecutorDelegate, times(2)).executeWithVisitedCodes(any(), any());
397-
verify(workflowDefinitionDao, never()).queryByCode(workflowA);
402+
verify(workflowDefinitionDao, times(2)).queryByCodes(any());
398403

399404
List<Long> triggeredCodes = captor.getAllValues().stream()
400405
.map(it -> it.getWorkflowDefinition().getCode())
@@ -403,4 +408,46 @@ public void testDoBackfillDependentWorkflow_MultiLevelAndCycle() throws Exceptio
403408
Assertions.assertTrue(visitedCodes.contains(workflowB));
404409
Assertions.assertTrue(visitedCodes.contains(workflowC));
405410
}
411+
412+
@Test
413+
public void testDoParallelBackfillWorkflow_ShouldIsolateVisitedCodesAcrossChunks() {
414+
long upstreamCode = 500L;
415+
WorkflowDefinition upstreamWorkflow =
416+
WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build();
417+
List<ZonedDateTime> dates = Arrays.asList(
418+
ZonedDateTime.parse("2026-02-01T00:00:00Z"),
419+
ZonedDateTime.parse("2026-02-02T00:00:00Z"),
420+
ZonedDateTime.parse("2026-02-03T00:00:00Z"));
421+
BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
422+
.runMode(RunMode.RUN_MODE_PARALLEL)
423+
.backfillDateList(dates)
424+
.expectedParallelismNumber(2)
425+
.backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
426+
.allLevelDependent(true)
427+
.executionOrder(ExecutionOrder.ASC_ORDER)
428+
.build();
429+
BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder()
430+
.workflowDefinition(upstreamWorkflow)
431+
.backfillParams(params)
432+
.build();
433+
Set<Long> baseVisitedCodes = new HashSet<>(Collections.singleton(upstreamCode));
434+
List<Set<Long>> visitedSnapshotPerChunk = new java.util.ArrayList<>();
435+
436+
doAnswer(invocation -> {
437+
Set<Long> chunkVisited = invocation.getArgument(2);
438+
visitedSnapshotPerChunk.add(new HashSet<>(chunkVisited));
439+
chunkVisited.add(9000L + visitedSnapshotPerChunk.size());
440+
return null;
441+
}).when(backfillWorkflowExecutorDelegate).doBackfillDependentWorkflowForTesting(any(), any(), any());
442+
443+
List<Integer> result = backfillWorkflowExecutorDelegate.executeWithVisitedCodes(dto, baseVisitedCodes);
444+
445+
Assertions.assertEquals(2, result.size());
446+
Assertions.assertEquals(2, visitedSnapshotPerChunk.size());
447+
Assertions.assertEquals(Collections.singleton(upstreamCode), visitedSnapshotPerChunk.get(0));
448+
Assertions.assertEquals(Collections.singleton(upstreamCode), visitedSnapshotPerChunk.get(1));
449+
Assertions.assertEquals(Collections.singleton(upstreamCode), baseVisitedCodes);
450+
verify(backfillWorkflowExecutorDelegate, times(2))
451+
.doBackfillDependentWorkflowForTesting(any(), any(), any());
452+
}
406453
}

0 commit comments

Comments
 (0)