Skip to content

Commit 5722899

Browse files
authored
feat: abort cloud execution (#893)
1 parent a589f61 commit 5722899

12 files changed

Lines changed: 703 additions & 18 deletions

File tree

cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public boolean isAwait() {
9595
return action == CloudExecutionAction.AWAIT;
9696
}
9797

98+
public boolean isAbort() {
99+
return action == CloudExecutionAction.ABORT;
100+
}
101+
98102
public void validate() {
99103
if (isExecute() && executionId == null) {
100104
throw new RuntimeException("ExecutionPlan must contain a valid executionId");

cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,12 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th
116116
Math.min(remainingTimeForSameGuid, coreConfiguration.getLockTryFrequencyMillis())
117117
);
118118

119+
} else if (response.isAbort()) {
120+
List<ExecutableStage> stages = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages);
121+
return ExecutionPlan.ABORT(stages);
122+
119123
} else {
120-
throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT)");
124+
throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT, ABORT)");
121125
}
122126

123127
} catch (FlamingockException ex) {

cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,29 @@ void shouldThrowManualInterventionOnValidate() {
165165
assertThrows(ManualInterventionRequiredException.class, plan::validate);
166166
}
167167

168+
@Test
169+
@DisplayName("Should build stages from ABORT response preserving MANUAL_INTERVENTION actions")
170+
void shouldBuildAbortPlanFromAbortResponse() {
171+
List<AbstractLoadedStage> loadedStages = Arrays.asList(buildStage("stage-1", change1, change2));
172+
ExecutionPlanResponse response = new ExecutionPlanResponse(
173+
CloudExecutionAction.ABORT, "exec-1", null,
174+
Arrays.asList(buildStageResponse("stage-1", 0,
175+
changeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION),
176+
changeResponse(change2.getId(), CloudChangeAction.APPLY)))
177+
);
178+
179+
List<ExecutableStage> result = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages);
180+
ExecutionPlan plan = ExecutionPlan.ABORT(result);
181+
182+
assertTrue(plan.isAborted());
183+
assertFalse(plan.isExecutionRequired());
184+
185+
Map<String, ChangeAction> actions = result.get(0).getChanges().stream()
186+
.collect(Collectors.toMap(ExecutableChange::getId, ExecutableChange::getAction));
187+
assertEquals(ChangeAction.MANUAL_INTERVENTION, actions.get(change1.getId()));
188+
assertEquals(ChangeAction.APPLY, actions.get(change2.getId()));
189+
}
190+
168191
private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) {
169192
return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes));
170193
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2026 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.cloud.planner;
17+
18+
import io.flamingock.api.StageType;
19+
import io.flamingock.cloud.api.response.ChangeResponse;
20+
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
21+
import io.flamingock.cloud.api.response.StageResponse;
22+
import io.flamingock.cloud.api.vo.CloudChangeAction;
23+
import io.flamingock.cloud.api.vo.CloudExecutionAction;
24+
import io.flamingock.cloud.lock.CloudLockService;
25+
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
26+
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
27+
import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
28+
import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder;
29+
import io.flamingock.internal.core.configuration.core.CoreConfigurable;
30+
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
31+
import io.flamingock.internal.core.plan.ExecutionPlan;
32+
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
33+
import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage;
34+
import io.flamingock.internal.util.TimeService;
35+
import io.flamingock.internal.util.id.RunnerId;
36+
import io.flamingock.core.cloud.changes._001__CloudChange1;
37+
import org.junit.jupiter.api.BeforeAll;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.DisplayName;
40+
import org.junit.jupiter.api.Test;
41+
42+
import java.util.Arrays;
43+
import java.util.Collections;
44+
import java.util.List;
45+
46+
import static org.junit.jupiter.api.Assertions.*;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.anyLong;
49+
import static org.mockito.ArgumentMatchers.anyString;
50+
import static org.mockito.Mockito.mock;
51+
import static org.mockito.Mockito.when;
52+
53+
class CloudExecutionPlannerTest {
54+
55+
private static AbstractLoadedChange change1;
56+
57+
private ExecutionPlannerClient client;
58+
private CloudExecutionPlanner planner;
59+
60+
@BeforeAll
61+
static void setupChanges() {
62+
change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build();
63+
}
64+
65+
@BeforeEach
66+
void setup() {
67+
client = mock(ExecutionPlannerClient.class);
68+
CoreConfigurable config = mock(CoreConfigurable.class);
69+
when(config.getLockAcquiredForMillis()).thenReturn(60000L);
70+
when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L);
71+
when(config.getLockTryFrequencyMillis()).thenReturn(1000L);
72+
73+
TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class);
74+
when(auditMarker.listAll()).thenReturn(Collections.emptySet());
75+
76+
planner = new CloudExecutionPlanner(
77+
RunnerId.fromString("test-runner"),
78+
client,
79+
config,
80+
mock(CloudLockService.class),
81+
auditMarker,
82+
TimeService.getDefault()
83+
);
84+
}
85+
86+
@Test
87+
@DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes")
88+
void shouldReturnAbortPlanWhenServerReturnsAbort() {
89+
ExecutionPlanResponse response = new ExecutionPlanResponse(
90+
CloudExecutionAction.ABORT,
91+
"exec-1",
92+
null,
93+
Collections.singletonList(new StageResponse("stage-1", 0,
94+
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION))))
95+
);
96+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
97+
98+
List<AbstractLoadedStage> stages = Collections.singletonList(
99+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));
100+
101+
ExecutionPlan plan = planner.getNextExecution(stages);
102+
103+
assertTrue(plan.isAborted());
104+
assertFalse(plan.isExecutionRequired());
105+
assertThrows(ManualInterventionRequiredException.class, plan::validate);
106+
}
107+
108+
@Test
109+
@DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes")
110+
void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
111+
ExecutionPlanResponse response = new ExecutionPlanResponse(
112+
CloudExecutionAction.ABORT,
113+
"exec-1",
114+
null,
115+
Collections.singletonList(new StageResponse("stage-1", 0,
116+
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY))))
117+
);
118+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
119+
120+
List<AbstractLoadedStage> stages = Collections.singletonList(
121+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));
122+
123+
ExecutionPlan plan = planner.getNextExecution(stages);
124+
125+
assertTrue(plan.isAborted());
126+
assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate);
127+
}
128+
}

core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMap.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,16 @@ public ChangeAction getActionFor(String changeId) {
4343
}
4444

4545

46+
/**
47+
* Returns true if any change in this action map requires manual intervention.
48+
*/
49+
public boolean hasManualInterventionActions() {
50+
return actionMap.containsValue(ChangeAction.MANUAL_INTERVENTION);
51+
}
52+
4653
/**
4754
* Returns true if the action plan is empty (no actions specified).
48-
*
55+
*
4956
* @return true if the action plan is empty
5057
*/
5158
public boolean isEmpty() {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2026 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.internal.common.core.recovery.action;
17+
18+
import org.junit.jupiter.api.DisplayName;
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import static org.junit.jupiter.api.Assertions.*;
26+
27+
class ChangeActionMapTest {
28+
29+
@Test
30+
@DisplayName("Should return true when map contains MANUAL_INTERVENTION")
31+
void shouldReturnTrueWhenMapContainsManualIntervention() {
32+
Map<String, ChangeAction> map = new HashMap<>();
33+
map.put("change-1", ChangeAction.MANUAL_INTERVENTION);
34+
assertTrue(new ChangeActionMap(map).hasManualInterventionActions());
35+
}
36+
37+
@Test
38+
@DisplayName("Should return false when map has only APPLY and SKIP")
39+
void shouldReturnFalseWhenMapHasOnlyApplyAndSkip() {
40+
Map<String, ChangeAction> map = new HashMap<>();
41+
map.put("change-1", ChangeAction.APPLY);
42+
map.put("change-2", ChangeAction.SKIP);
43+
assertFalse(new ChangeActionMap(map).hasManualInterventionActions());
44+
}
45+
46+
@Test
47+
@DisplayName("Should return false when map is empty")
48+
void shouldReturnFalseWhenMapIsEmpty() {
49+
assertFalse(new ChangeActionMap(Collections.emptyMap()).hasManualInterventionActions());
50+
}
51+
52+
@Test
53+
@DisplayName("Should return true when multiple changes and one is MANUAL_INTERVENTION")
54+
void shouldReturnTrueWhenMultipleChangesAndOneIsManualIntervention() {
55+
Map<String, ChangeAction> map = new HashMap<>();
56+
map.put("change-1", ChangeAction.APPLY);
57+
map.put("change-2", ChangeAction.SKIP);
58+
map.put("change-3", ChangeAction.MANUAL_INTERVENTION);
59+
assertTrue(new ChangeActionMap(map).hasManualInterventionActions());
60+
}
61+
}

core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.flamingock.internal.core.operation;
1717

1818
import io.flamingock.internal.common.core.error.FlamingockException;
19+
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
1920
import io.flamingock.internal.common.core.error.PendingChangesException;
2021
import io.flamingock.internal.common.core.response.data.ErrorInfo;
2122
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
@@ -207,7 +208,12 @@ private FlamingockException processAndGetFlamingockException(Throwable exception
207208
} else {
208209
flamingockException = new FlamingockException(exception);
209210
}
210-
logger.debug("Error executing the process. ABORTED OPERATION", exception);
211+
if (flamingockException instanceof ManualInterventionRequiredException) {
212+
ManualInterventionRequiredException miException = (ManualInterventionRequiredException) flamingockException;
213+
logger.error("ABORTED OPERATION - Manual intervention required for changes: [{}]", miException.getConflictingSummary());
214+
} else {
215+
logger.debug("Error executing the process. ABORTED OPERATION", exception);
216+
}
211217
eventPublisher.publish(new StageFailedEvent(flamingockException));
212218
eventPublisher.publish(new PipelineFailedEvent(flamingockException));
213219
return flamingockException;

core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.flamingock.internal.core.pipeline.execution.ExecutablePipeline;
2121
import io.flamingock.internal.core.pipeline.execution.ExecutableStage;
2222
import io.flamingock.internal.core.change.executable.ExecutableChange;
23+
import io.flamingock.internal.common.core.error.FlamingockException;
2324
import io.flamingock.internal.common.core.recovery.action.ChangeAction;
2425
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
2526
import io.flamingock.internal.common.core.recovery.RecoveryIssue;
@@ -37,7 +38,11 @@ public static ExecutionPlan newExecution(String executionId,
3738
}
3839

3940
public static ExecutionPlan CONTINUE(List<ExecutableStage> stages) {
40-
return new ExecutionPlan(stages);
41+
return new ExecutionPlan(false, stages);
42+
}
43+
44+
public static ExecutionPlan ABORT(List<ExecutableStage> stages) {
45+
return new ExecutionPlan(true, stages);
4146
}
4247

4348
private final String executionId;
@@ -46,18 +51,29 @@ public static ExecutionPlan CONTINUE(List<ExecutableStage> stages) {
4651

4752
private final ExecutablePipeline pipeline;
4853

49-
private ExecutionPlan(List<ExecutableStage> stages) {
50-
this(null, null, stages);
54+
private final boolean aborted;
55+
56+
private ExecutionPlan(boolean aborted, List<ExecutableStage> stages) {
57+
this(null, null, aborted, stages);
5158
}
5259

5360
private ExecutionPlan(String executionId, Lock lock, List<ExecutableStage> stages) {
61+
this(executionId, lock, false, stages);
62+
}
63+
64+
private ExecutionPlan(String executionId, Lock lock, boolean aborted, List<ExecutableStage> stages) {
5465
this.executionId = executionId;
5566
this.lock = lock;
67+
this.aborted = aborted;
5668
this.pipeline = new ExecutablePipeline(stages);
5769
}
5870

71+
public boolean isAborted() {
72+
return aborted;
73+
}
74+
5975
public boolean isExecutionRequired() {
60-
return pipeline.isExecutionRequired();
76+
return !aborted && pipeline.isExecutionRequired();
6177
}
6278

6379
public ExecutablePipeline getPipeline() {
@@ -72,37 +88,43 @@ public void applyOnEach(TriConsumer<String, Lock, ExecutableStage> consumer) {
7288
}
7389

7490
/**
75-
* Validates the execution plan for manual intervention requirements.
76-
* This method analyzes all executable stages and their changes to identify
77-
* any that require manual intervention, throwing an exception if found.
91+
* Validates the execution plan.
7892
* <p>
79-
* This centralized validation follows DDD principles by keeping validation
80-
* logic at the appropriate architectural layer (ExecutionPlan domain).
81-
* </p>
82-
*
93+
* Checks two conditions:
94+
* <ol>
95+
* <li>If any changes require manual intervention, throws {@link ManualInterventionRequiredException}</li>
96+
* <li>If the plan is aborted (even without MI changes), throws {@link FlamingockException}
97+
* — the execution planner decided to abort for reasons beyond individual change state</li>
98+
* </ol>
99+
*
83100
* @throws ManualInterventionRequiredException if any changes require manual intervention
101+
* @throws FlamingockException if the plan is aborted without specific MI changes
84102
*/
85103
public void validate() {
86104
List<RecoveryIssue> recoveryIssues = new ArrayList<>();
87105
String firstStageName = "unknown";
88106
boolean hasStages = false;
89-
107+
90108
for (ExecutableStage stage : pipeline.getExecutableStages()) {
91109
if (!hasStages) {
92110
firstStageName = stage.getName();
93111
hasStages = true;
94112
}
95-
113+
96114
for (ExecutableChange change : stage.getChanges()) {
97115
if (change.getAction() == ChangeAction.MANUAL_INTERVENTION) {
98116
recoveryIssues.add(new RecoveryIssue(change.getId()));
99117
}
100118
}
101119
}
102-
120+
103121
if (!recoveryIssues.isEmpty()) {
104122
throw new ManualInterventionRequiredException(recoveryIssues, firstStageName);
105123
}
124+
125+
if (aborted) {
126+
throw new FlamingockException("Execution aborted by the execution planner");
127+
}
106128
}
107129

108130
@Override

0 commit comments

Comments
 (0)