Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public boolean isAwait() {
return action == CloudExecutionAction.AWAIT;
}

public boolean isAbort() {
return action == CloudExecutionAction.ABORT;
}

public void validate() {
if (isExecute() && executionId == null) {
throw new RuntimeException("ExecutionPlan must contain a valid executionId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th
Math.min(remainingTimeForSameGuid, coreConfiguration.getLockTryFrequencyMillis())
);

} else if (response.isAbort()) {
List<ExecutableStage> stages = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages);
return ExecutionPlan.ABORT(stages);

} else {
throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT)");
throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT, ABORT)");
}

} catch (FlamingockException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ void shouldThrowManualInterventionOnValidate() {
assertThrows(ManualInterventionRequiredException.class, plan::validate);
}

@Test
@DisplayName("Should build stages from ABORT response preserving MANUAL_INTERVENTION actions")
void shouldBuildAbortPlanFromAbortResponse() {
List<AbstractLoadedStage> loadedStages = Arrays.asList(buildStage("stage-1", change1, change2));
ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.ABORT, "exec-1", null,
Arrays.asList(buildStageResponse("stage-1", 0,
changeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION),
changeResponse(change2.getId(), CloudChangeAction.APPLY)))
);

List<ExecutableStage> result = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages);
ExecutionPlan plan = ExecutionPlan.ABORT(result);

assertTrue(plan.isAborted());
assertFalse(plan.isExecutionRequired());

Map<String, ChangeAction> actions = result.get(0).getChanges().stream()
.collect(Collectors.toMap(ExecutableChange::getId, ExecutableChange::getAction));
assertEquals(ChangeAction.MANUAL_INTERVENTION, actions.get(change1.getId()));
assertEquals(ChangeAction.APPLY, actions.get(change2.getId()));
}

private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) {
return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.cloud.planner;

import io.flamingock.api.StageType;
import io.flamingock.cloud.api.response.ChangeResponse;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
import io.flamingock.cloud.api.response.StageResponse;
import io.flamingock.cloud.api.vo.CloudChangeAction;
import io.flamingock.cloud.api.vo.CloudExecutionAction;
import io.flamingock.cloud.lock.CloudLockService;
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder;
import io.flamingock.internal.core.configuration.core.CoreConfigurable;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
import io.flamingock.internal.core.plan.ExecutionPlan;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage;
import io.flamingock.internal.util.TimeService;
import io.flamingock.internal.util.id.RunnerId;
import io.flamingock.core.cloud.changes._001__CloudChange1;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class CloudExecutionPlannerTest {

private static AbstractLoadedChange change1;

private ExecutionPlannerClient client;
private CloudExecutionPlanner planner;

@BeforeAll
static void setupChanges() {
change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build();
}

@BeforeEach
void setup() {
client = mock(ExecutionPlannerClient.class);
CoreConfigurable config = mock(CoreConfigurable.class);
when(config.getLockAcquiredForMillis()).thenReturn(60000L);
when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L);
when(config.getLockTryFrequencyMillis()).thenReturn(1000L);

TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class);
when(auditMarker.listAll()).thenReturn(Collections.emptySet());

planner = new CloudExecutionPlanner(
RunnerId.fromString("test-runner"),
client,
config,
mock(CloudLockService.class),
auditMarker,
TimeService.getDefault()
);
}

@Test
@DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes")
void shouldReturnAbortPlanWhenServerReturnsAbort() {
ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.ABORT,
"exec-1",
null,
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION))))
);
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);

List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

ExecutionPlan plan = planner.getNextExecution(stages);

assertTrue(plan.isAborted());
assertFalse(plan.isExecutionRequired());
assertThrows(ManualInterventionRequiredException.class, plan::validate);
}

@Test
@DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes")
void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.ABORT,
"exec-1",
null,
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY))))
);
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);

List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

ExecutionPlan plan = planner.getNextExecution(stages);

assertTrue(plan.isAborted());
assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ public ChangeAction getActionFor(String changeId) {
}


/**
* Returns true if any change in this action map requires manual intervention.
*/
public boolean hasManualInterventionActions() {
return actionMap.containsValue(ChangeAction.MANUAL_INTERVENTION);
}

/**
* Returns true if the action plan is empty (no actions specified).
*
*
* @return true if the action plan is empty
*/
public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.common.core.recovery.action;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class ChangeActionMapTest {

@Test
@DisplayName("Should return true when map contains MANUAL_INTERVENTION")
void shouldReturnTrueWhenMapContainsManualIntervention() {
Map<String, ChangeAction> map = new HashMap<>();
map.put("change-1", ChangeAction.MANUAL_INTERVENTION);
assertTrue(new ChangeActionMap(map).hasManualInterventionActions());
}

@Test
@DisplayName("Should return false when map has only APPLY and SKIP")
void shouldReturnFalseWhenMapHasOnlyApplyAndSkip() {
Map<String, ChangeAction> map = new HashMap<>();
map.put("change-1", ChangeAction.APPLY);
map.put("change-2", ChangeAction.SKIP);
assertFalse(new ChangeActionMap(map).hasManualInterventionActions());
}

@Test
@DisplayName("Should return false when map is empty")
void shouldReturnFalseWhenMapIsEmpty() {
assertFalse(new ChangeActionMap(Collections.emptyMap()).hasManualInterventionActions());
}

@Test
@DisplayName("Should return true when multiple changes and one is MANUAL_INTERVENTION")
void shouldReturnTrueWhenMultipleChangesAndOneIsManualIntervention() {
Map<String, ChangeAction> map = new HashMap<>();
map.put("change-1", ChangeAction.APPLY);
map.put("change-2", ChangeAction.SKIP);
map.put("change-3", ChangeAction.MANUAL_INTERVENTION);
assertTrue(new ChangeActionMap(map).hasManualInterventionActions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.flamingock.internal.core.operation;

import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.common.core.error.PendingChangesException;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
Expand Down Expand Up @@ -207,7 +208,12 @@ private FlamingockException processAndGetFlamingockException(Throwable exception
} else {
flamingockException = new FlamingockException(exception);
}
logger.debug("Error executing the process. ABORTED OPERATION", exception);
if (flamingockException instanceof ManualInterventionRequiredException) {
ManualInterventionRequiredException miException = (ManualInterventionRequiredException) flamingockException;
logger.error("ABORTED OPERATION - Manual intervention required for changes: [{}]", miException.getConflictingSummary());
} else {
logger.debug("Error executing the process. ABORTED OPERATION", exception);
}
eventPublisher.publish(new StageFailedEvent(flamingockException));
eventPublisher.publish(new PipelineFailedEvent(flamingockException));
return flamingockException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.flamingock.internal.core.pipeline.execution.ExecutablePipeline;
import io.flamingock.internal.core.pipeline.execution.ExecutableStage;
import io.flamingock.internal.core.change.executable.ExecutableChange;
import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.common.core.recovery.action.ChangeAction;
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.common.core.recovery.RecoveryIssue;
Expand All @@ -37,7 +38,11 @@ public static ExecutionPlan newExecution(String executionId,
}

public static ExecutionPlan CONTINUE(List<ExecutableStage> stages) {
return new ExecutionPlan(stages);
return new ExecutionPlan(false, stages);
}

public static ExecutionPlan ABORT(List<ExecutableStage> stages) {
return new ExecutionPlan(true, stages);
}

private final String executionId;
Expand All @@ -46,18 +51,29 @@ public static ExecutionPlan CONTINUE(List<ExecutableStage> stages) {

private final ExecutablePipeline pipeline;

private ExecutionPlan(List<ExecutableStage> stages) {
this(null, null, stages);
private final boolean aborted;

private ExecutionPlan(boolean aborted, List<ExecutableStage> stages) {
this(null, null, aborted, stages);
}

private ExecutionPlan(String executionId, Lock lock, List<ExecutableStage> stages) {
this(executionId, lock, false, stages);
}

private ExecutionPlan(String executionId, Lock lock, boolean aborted, List<ExecutableStage> stages) {
this.executionId = executionId;
this.lock = lock;
this.aborted = aborted;
this.pipeline = new ExecutablePipeline(stages);
}

public boolean isAborted() {
return aborted;
}

public boolean isExecutionRequired() {
return pipeline.isExecutionRequired();
return !aborted && pipeline.isExecutionRequired();
}

public ExecutablePipeline getPipeline() {
Expand All @@ -72,37 +88,43 @@ public void applyOnEach(TriConsumer<String, Lock, ExecutableStage> consumer) {
}

/**
* Validates the execution plan for manual intervention requirements.
* This method analyzes all executable stages and their changes to identify
* any that require manual intervention, throwing an exception if found.
* Validates the execution plan.
* <p>
* This centralized validation follows DDD principles by keeping validation
* logic at the appropriate architectural layer (ExecutionPlan domain).
* </p>
*
* Checks two conditions:
* <ol>
* <li>If any changes require manual intervention, throws {@link ManualInterventionRequiredException}</li>
* <li>If the plan is aborted (even without MI changes), throws {@link FlamingockException}
* — the execution planner decided to abort for reasons beyond individual change state</li>
* </ol>
*
* @throws ManualInterventionRequiredException if any changes require manual intervention
* @throws FlamingockException if the plan is aborted without specific MI changes
*/
public void validate() {
List<RecoveryIssue> recoveryIssues = new ArrayList<>();
String firstStageName = "unknown";
boolean hasStages = false;

for (ExecutableStage stage : pipeline.getExecutableStages()) {
if (!hasStages) {
firstStageName = stage.getName();
hasStages = true;
}

for (ExecutableChange change : stage.getChanges()) {
if (change.getAction() == ChangeAction.MANUAL_INTERVENTION) {
recoveryIssues.add(new RecoveryIssue(change.getId()));
}
}
}

if (!recoveryIssues.isEmpty()) {
throw new ManualInterventionRequiredException(recoveryIssues, firstStageName);
}

if (aborted) {
throw new FlamingockException("Execution aborted by the execution planner");
}
}

@Override
Expand Down
Loading
Loading