diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java index f904ff250..74c213a03 100644 --- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java @@ -95,6 +95,10 @@ public boolean isAwait() { return action == CloudExecutionAction.AWAIT; } + public boolean isAbort() { + return action == CloudExecutionAction.ABORT; + } + public void validate() { if (isExecute() && executionId == null) { throw new RuntimeException("ExecutionPlan must contain a valid executionId"); diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java index bd93f2a6a..dc3d3805e 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java @@ -116,8 +116,12 @@ public ExecutionPlan getNextExecution(List loadedStages) th Math.min(remainingTimeForSameGuid, coreConfiguration.getLockTryFrequencyMillis()) ); + } else if (response.isAbort()) { + List stages = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages); + return ExecutionPlan.ABORT(stages); + } else { - throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT)"); + throw new RuntimeException("Unrecognized action from response. Not within(CONTINUE, EXECUTE, AWAIT, ABORT)"); } } catch (FlamingockException ex) { diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java index 0820cbbdb..60705f574 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java @@ -165,6 +165,29 @@ void shouldThrowManualInterventionOnValidate() { assertThrows(ManualInterventionRequiredException.class, plan::validate); } + @Test + @DisplayName("Should build stages from ABORT response preserving MANUAL_INTERVENTION actions") + void shouldBuildAbortPlanFromAbortResponse() { + List loadedStages = Arrays.asList(buildStage("stage-1", change1, change2)); + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.ABORT, "exec-1", null, + Arrays.asList(buildStageResponse("stage-1", 0, + changeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION), + changeResponse(change2.getId(), CloudChangeAction.APPLY))) + ); + + List result = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages); + ExecutionPlan plan = ExecutionPlan.ABORT(result); + + assertTrue(plan.isAborted()); + assertFalse(plan.isExecutionRequired()); + + Map actions = result.get(0).getChanges().stream() + .collect(Collectors.toMap(ExecutableChange::getId, ExecutableChange::getAction)); + assertEquals(ChangeAction.MANUAL_INTERVENTION, actions.get(change1.getId())); + assertEquals(ChangeAction.APPLY, actions.get(change2.getId())); + } + private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) { return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes)); } diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java new file mode 100644 index 000000000..bf3e29fa3 --- /dev/null +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.planner; + +import io.flamingock.api.StageType; +import io.flamingock.cloud.api.response.ChangeResponse; +import io.flamingock.cloud.api.response.ExecutionPlanResponse; +import io.flamingock.cloud.api.response.StageResponse; +import io.flamingock.cloud.api.vo.CloudChangeAction; +import io.flamingock.cloud.api.vo.CloudExecutionAction; +import io.flamingock.cloud.lock.CloudLockService; +import io.flamingock.cloud.planner.client.ExecutionPlannerClient; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; +import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder; +import io.flamingock.internal.core.configuration.core.CoreConfigurable; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker; +import io.flamingock.internal.core.plan.ExecutionPlan; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage; +import io.flamingock.internal.util.TimeService; +import io.flamingock.internal.util.id.RunnerId; +import io.flamingock.core.cloud.changes._001__CloudChange1; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CloudExecutionPlannerTest { + + private static AbstractLoadedChange change1; + + private ExecutionPlannerClient client; + private CloudExecutionPlanner planner; + + @BeforeAll + static void setupChanges() { + change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build(); + } + + @BeforeEach + void setup() { + client = mock(ExecutionPlannerClient.class); + CoreConfigurable config = mock(CoreConfigurable.class); + when(config.getLockAcquiredForMillis()).thenReturn(60000L); + when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L); + when(config.getLockTryFrequencyMillis()).thenReturn(1000L); + + TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class); + when(auditMarker.listAll()).thenReturn(Collections.emptySet()); + + planner = new CloudExecutionPlanner( + RunnerId.fromString("test-runner"), + client, + config, + mock(CloudLockService.class), + auditMarker, + TimeService.getDefault() + ); + } + + @Test + @DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes") + void shouldReturnAbortPlanWhenServerReturnsAbort() { + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.ABORT, + "exec-1", + null, + Collections.singletonList(new StageResponse("stage-1", 0, + Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION)))) + ); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); + + ExecutionPlan plan = planner.getNextExecution(stages); + + assertTrue(plan.isAborted()); + assertFalse(plan.isExecutionRequired()); + assertThrows(ManualInterventionRequiredException.class, plan::validate); + } + + @Test + @DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes") + void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() { + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.ABORT, + "exec-1", + null, + Collections.singletonList(new StageResponse("stage-1", 0, + Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY)))) + ); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); + + ExecutionPlan plan = planner.getNextExecution(stages); + + assertTrue(plan.isAborted()); + assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate); + } +} diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMap.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMap.java index 4486a85a1..1fbf34191 100644 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMap.java +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMap.java @@ -43,9 +43,16 @@ public ChangeAction getActionFor(String changeId) { } + /** + * Returns true if any change in this action map requires manual intervention. + */ + public boolean hasManualInterventionActions() { + return actionMap.containsValue(ChangeAction.MANUAL_INTERVENTION); + } + /** * Returns true if the action plan is empty (no actions specified). - * + * * @return true if the action plan is empty */ public boolean isEmpty() { diff --git a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMapTest.java b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMapTest.java new file mode 100644 index 000000000..eabe95862 --- /dev/null +++ b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/recovery/action/ChangeActionMapTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.common.core.recovery.action; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +class ChangeActionMapTest { + + @Test + @DisplayName("Should return true when map contains MANUAL_INTERVENTION") + void shouldReturnTrueWhenMapContainsManualIntervention() { + Map map = new HashMap<>(); + map.put("change-1", ChangeAction.MANUAL_INTERVENTION); + assertTrue(new ChangeActionMap(map).hasManualInterventionActions()); + } + + @Test + @DisplayName("Should return false when map has only APPLY and SKIP") + void shouldReturnFalseWhenMapHasOnlyApplyAndSkip() { + Map map = new HashMap<>(); + map.put("change-1", ChangeAction.APPLY); + map.put("change-2", ChangeAction.SKIP); + assertFalse(new ChangeActionMap(map).hasManualInterventionActions()); + } + + @Test + @DisplayName("Should return false when map is empty") + void shouldReturnFalseWhenMapIsEmpty() { + assertFalse(new ChangeActionMap(Collections.emptyMap()).hasManualInterventionActions()); + } + + @Test + @DisplayName("Should return true when multiple changes and one is MANUAL_INTERVENTION") + void shouldReturnTrueWhenMultipleChangesAndOneIsManualIntervention() { + Map map = new HashMap<>(); + map.put("change-1", ChangeAction.APPLY); + map.put("change-2", ChangeAction.SKIP); + map.put("change-3", ChangeAction.MANUAL_INTERVENTION); + assertTrue(new ChangeActionMap(map).hasManualInterventionActions()); + } +} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java index 8db9f3847..cdec7078a 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java @@ -16,6 +16,7 @@ package io.flamingock.internal.core.operation; import io.flamingock.internal.common.core.error.FlamingockException; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; import io.flamingock.internal.common.core.error.PendingChangesException; import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.ExecuteResponseData; @@ -207,7 +208,12 @@ private FlamingockException processAndGetFlamingockException(Throwable exception } else { flamingockException = new FlamingockException(exception); } - logger.debug("Error executing the process. ABORTED OPERATION", exception); + if (flamingockException instanceof ManualInterventionRequiredException) { + ManualInterventionRequiredException miException = (ManualInterventionRequiredException) flamingockException; + logger.error("ABORTED OPERATION - Manual intervention required for changes: [{}]", miException.getConflictingSummary()); + } else { + logger.debug("Error executing the process. ABORTED OPERATION", exception); + } eventPublisher.publish(new StageFailedEvent(flamingockException)); eventPublisher.publish(new PipelineFailedEvent(flamingockException)); return flamingockException; diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java index ec3a99894..996fe4051 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java @@ -20,6 +20,7 @@ import io.flamingock.internal.core.pipeline.execution.ExecutablePipeline; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.change.executable.ExecutableChange; +import io.flamingock.internal.common.core.error.FlamingockException; import io.flamingock.internal.common.core.recovery.action.ChangeAction; import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; import io.flamingock.internal.common.core.recovery.RecoveryIssue; @@ -37,7 +38,11 @@ public static ExecutionPlan newExecution(String executionId, } public static ExecutionPlan CONTINUE(List stages) { - return new ExecutionPlan(stages); + return new ExecutionPlan(false, stages); + } + + public static ExecutionPlan ABORT(List stages) { + return new ExecutionPlan(true, stages); } private final String executionId; @@ -46,18 +51,29 @@ public static ExecutionPlan CONTINUE(List stages) { private final ExecutablePipeline pipeline; - private ExecutionPlan(List stages) { - this(null, null, stages); + private final boolean aborted; + + private ExecutionPlan(boolean aborted, List stages) { + this(null, null, aborted, stages); } private ExecutionPlan(String executionId, Lock lock, List stages) { + this(executionId, lock, false, stages); + } + + private ExecutionPlan(String executionId, Lock lock, boolean aborted, List stages) { this.executionId = executionId; this.lock = lock; + this.aborted = aborted; this.pipeline = new ExecutablePipeline(stages); } + public boolean isAborted() { + return aborted; + } + public boolean isExecutionRequired() { - return pipeline.isExecutionRequired(); + return !aborted && pipeline.isExecutionRequired(); } public ExecutablePipeline getPipeline() { @@ -72,37 +88,43 @@ public void applyOnEach(TriConsumer consumer) { } /** - * Validates the execution plan for manual intervention requirements. - * This method analyzes all executable stages and their changes to identify - * any that require manual intervention, throwing an exception if found. + * Validates the execution plan. *

- * This centralized validation follows DDD principles by keeping validation - * logic at the appropriate architectural layer (ExecutionPlan domain). - *

- * + * Checks two conditions: + *
    + *
  1. If any changes require manual intervention, throws {@link ManualInterventionRequiredException}
  2. + *
  3. If the plan is aborted (even without MI changes), throws {@link FlamingockException} + * — the execution planner decided to abort for reasons beyond individual change state
  4. + *
+ * * @throws ManualInterventionRequiredException if any changes require manual intervention + * @throws FlamingockException if the plan is aborted without specific MI changes */ public void validate() { List recoveryIssues = new ArrayList<>(); String firstStageName = "unknown"; boolean hasStages = false; - + for (ExecutableStage stage : pipeline.getExecutableStages()) { if (!hasStages) { firstStageName = stage.getName(); hasStages = true; } - + for (ExecutableChange change : stage.getChanges()) { if (change.getAction() == ChangeAction.MANUAL_INTERVENTION) { recoveryIssues.add(new RecoveryIssue(change.getId())); } } } - + if (!recoveryIssues.isEmpty()) { throw new ManualInterventionRequiredException(recoveryIssues, firstStageName); } + + if (aborted) { + throw new FlamingockException("Execution aborted by the execution planner"); + } } @Override diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java index 8fe1aa240..42c1b9a55 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java @@ -17,6 +17,7 @@ import io.flamingock.internal.common.core.audit.AuditEntry; +import io.flamingock.internal.common.core.recovery.action.ChangeAction; import io.flamingock.internal.common.core.recovery.action.ChangeActionMap; import io.flamingock.internal.core.plan.ExecutionId; import io.flamingock.internal.core.external.store.lock.community.CommunityLock; @@ -124,6 +125,10 @@ public ExecutionPlan getNextExecution(List loadedStages) th List initialStages = buildExecutableStages(loadedStages, initialSnapshot); + if (hasManualInterventionChanges(initialStages)) { + return ExecutionPlan.ABORT(initialStages); + } + if (!hasExecutableStages(initialStages)) { return ExecutionPlan.CONTINUE(initialStages); } @@ -191,7 +196,7 @@ private List buildExecutableStages( return loadedStages.stream() .map(loadedStage -> { - ChangeActionMap changeActionMap = CommunityChangeActionBuilder.build( + ChangeActionMap changeActionMap = CommunityChangeActionBuilder.build( loadedStage.getChanges(), auditSnapshot ); @@ -200,6 +205,12 @@ private List buildExecutableStages( .collect(Collectors.toList()); } + private boolean hasManualInterventionChanges(List stages) { + return stages.stream() + .flatMap(stage -> stage.getChanges().stream()) + .anyMatch(change -> change.getAction() == ChangeAction.MANUAL_INTERVENTION); + } + /** * Checks if any stage requires execution. * diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java new file mode 100644 index 000000000..c55c38003 --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.operation; + +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.recovery.action.ChangeAction; +import io.flamingock.internal.core.change.executable.ExecutableChange; +import io.flamingock.internal.core.event.EventPublisher; +import io.flamingock.internal.core.operation.execute.ExecuteApplyOperation; +import io.flamingock.internal.core.operation.execute.ExecuteArgs; +import io.flamingock.internal.core.pipeline.execution.ExecutableStage; +import io.flamingock.internal.core.pipeline.execution.OrphanExecutionContext; +import io.flamingock.internal.core.pipeline.execution.StageExecutor; +import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.plan.ExecutionPlan; +import io.flamingock.internal.core.plan.ExecutionPlanner; +import io.flamingock.internal.util.id.RunnerId; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.*; + +class AbstractPipelineTraverseOperationTest { + + @Test + @DisplayName("Should throw ManualInterventionRequiredException when planner returns ABORT with MI changes") + void shouldThrowManualInterventionWhenPlannerReturnsAbort() { + ExecutableChange miChange = mockChange("change-1", ChangeAction.MANUAL_INTERVENTION); + ExecutableStage stage = new ExecutableStage("stage-1", Collections.singletonList(miChange)); + ExecutionPlan abortPlan = ExecutionPlan.ABORT(Collections.singletonList(stage)); + + ExecutionPlanner planner = mock(ExecutionPlanner.class); + when(planner.getNextExecution(anyList())).thenReturn(abortPlan); + + LoadedPipeline pipeline = mockPipeline(); + ExecuteApplyOperation operation = buildOperation(planner); + + ManualInterventionRequiredException ex = assertThrows( + ManualInterventionRequiredException.class, + () -> operation.execute(new ExecuteArgs(pipeline))); + assertTrue(ex.getConflictingSummary().contains("change-1")); + } + + @Test + @DisplayName("Should throw FlamingockException and not execute changes when plan is ABORT without MI changes") + void shouldNotExecuteChangesWhenPlanIsAbort() { + ExecutableChange change = mockChange("change-1", ChangeAction.APPLY); + ExecutableStage stage = new ExecutableStage("stage-1", Collections.singletonList(change)); + ExecutionPlan abortPlan = ExecutionPlan.ABORT(Collections.singletonList(stage)); + + ExecutionPlanner planner = mock(ExecutionPlanner.class); + when(planner.getNextExecution(anyList())).thenReturn(abortPlan); + + StageExecutor stageExecutor = mock(StageExecutor.class); + LoadedPipeline pipeline = mockPipeline(); + ExecuteApplyOperation operation = buildOperation(planner, stageExecutor); + + assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, + () -> operation.execute(new ExecuteArgs(pipeline))); + verify(stageExecutor, never()).executeStage(any(), any(), any()); + } + + private static ExecuteApplyOperation buildOperation(ExecutionPlanner planner) { + return buildOperation(planner, mock(StageExecutor.class)); + } + + private static ExecuteApplyOperation buildOperation(ExecutionPlanner planner, StageExecutor stageExecutor) { + return new ExecuteApplyOperation( + RunnerId.fromString("test"), + planner, + stageExecutor, + new OrphanExecutionContext("localhost", null), + mock(EventPublisher.class), + true, + () -> {} + ); + } + + private static LoadedPipeline mockPipeline() { + LoadedPipeline pipeline = mock(LoadedPipeline.class); + when(pipeline.getStages()).thenReturn(Collections.singletonList(mock(AbstractLoadedStage.class))); + when(pipeline.getSystemStage()).thenReturn(java.util.Optional.empty()); + doNothing().when(pipeline).validate(); + return pipeline; + } + + private static ExecutableChange mockChange(String id, ChangeAction action) { + ExecutableChange change = mock(ExecutableChange.class); + when(change.getId()).thenReturn(id); + when(change.getAction()).thenReturn(action); + when(change.isAlreadyApplied()).thenReturn(action == ChangeAction.SKIP); + return change; + } +} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/ExecutionPlanTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/ExecutionPlanTest.java new file mode 100644 index 000000000..05941d3f6 --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/ExecutionPlanTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.plan; + +import io.flamingock.internal.common.core.error.FlamingockException; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.recovery.action.ChangeAction; +import io.flamingock.internal.core.change.executable.ExecutableChange; +import io.flamingock.internal.core.pipeline.execution.ExecutableStage; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ExecutionPlanTest { + + @Test + @DisplayName("ABORT plan should not require execution") + void abortPlanShouldNotRequireExecution() { + ExecutionPlan plan = ExecutionPlan.ABORT(Collections.singletonList( + stageWith(mockChange("change-1", ChangeAction.MANUAL_INTERVENTION)) + )); + assertFalse(plan.isExecutionRequired()); + } + + @Test + @DisplayName("ABORT plan should be marked as aborted") + void abortPlanShouldBeAborted() { + ExecutionPlan plan = ExecutionPlan.ABORT(Collections.emptyList()); + assertTrue(plan.isAborted()); + } + + @Test + @DisplayName("ABORT plan with MANUAL_INTERVENTION changes should throw on validate") + void abortPlanShouldThrowOnValidateWhenManualInterventionChanges() { + ExecutionPlan plan = ExecutionPlan.ABORT(Collections.singletonList( + stageWith( + mockChange("change-1", ChangeAction.APPLY), + mockChange("change-2", ChangeAction.MANUAL_INTERVENTION) + ) + )); + ManualInterventionRequiredException ex = assertThrows( + ManualInterventionRequiredException.class, plan::validate); + assertTrue(ex.getConflictingSummary().contains("change-2")); + } + + @Test + @DisplayName("ABORT plan without MANUAL_INTERVENTION changes should throw FlamingockException on validate") + void abortPlanShouldThrowFlamingockExceptionWhenNoManualInterventionChanges() { + ExecutionPlan plan = ExecutionPlan.ABORT(Collections.singletonList( + stageWith(mockChange("change-1", ChangeAction.APPLY)) + )); + FlamingockException ex = assertThrows(FlamingockException.class, plan::validate); + assertFalse(ex instanceof ManualInterventionRequiredException); + } + + @Test + @DisplayName("newExecution plan should not be aborted") + void newExecutionPlanShouldNotBeAborted() { + ExecutionPlan plan = ExecutionPlan.newExecution("exec-1", null, + Collections.singletonList(stageWith(mockChange("c1", ChangeAction.APPLY)))); + assertFalse(plan.isAborted()); + } + + @Test + @DisplayName("CONTINUE plan should not be aborted") + void continuePlanShouldNotBeAborted() { + ExecutionPlan plan = ExecutionPlan.CONTINUE(Collections.emptyList()); + assertFalse(plan.isAborted()); + } + + private static ExecutableStage stageWith(ExecutableChange... changes) { + return new ExecutableStage("test-stage", Arrays.asList(changes)); + } + + private static ExecutableChange mockChange(String id, ChangeAction action) { + ExecutableChange change = mock(ExecutableChange.class); + when(change.getId()).thenReturn(id); + when(change.getAction()).thenReturn(action); + when(change.isAlreadyApplied()).thenReturn(action == ChangeAction.SKIP); + return change; + } +} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java new file mode 100644 index 000000000..d2fbe0cae --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java @@ -0,0 +1,206 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.plan.community; + +import io.flamingock.internal.common.core.audit.AuditEntry; +import io.flamingock.internal.common.core.audit.AuditTxType; +import io.flamingock.api.RecoveryStrategy; +import io.flamingock.internal.common.core.change.RecoveryDescriptor; +import io.flamingock.internal.core.configuration.core.CoreConfigurable; +import io.flamingock.internal.core.external.store.audit.community.CommunityAuditReader; +import io.flamingock.internal.core.external.store.lock.LockAcquisition; +import io.flamingock.internal.core.external.store.lock.community.CommunityLockService; +import io.flamingock.internal.core.plan.ExecutionPlan; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; +import io.flamingock.internal.util.id.RunnerId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.*; + +class CommunityExecutionPlannerTest { + + private CommunityAuditReader auditReader; + private CommunityLockService lockService; + private CoreConfigurable configuration; + private CommunityExecutionPlanner planner; + + @BeforeEach + void setup() { + auditReader = mock(CommunityAuditReader.class); + lockService = mock(CommunityLockService.class); + configuration = mock(CoreConfigurable.class); + + when(configuration.getLockAcquiredForMillis()).thenReturn(60000L); + when(configuration.getLockQuitTryingAfterMillis()).thenReturn(30000L); + when(configuration.getLockTryFrequencyMillis()).thenReturn(1000L); + when(configuration.isEnableRefreshDaemon()).thenReturn(false); + + planner = new CommunityExecutionPlanner( + RunnerId.fromString("test-runner"), + lockService, + auditReader, + configuration + ); + } + + @Test + @DisplayName("Should return ABORT without acquiring lock when manual intervention is required") + void shouldReturnAbortWithoutAcquiringLockWhenManualInterventionRequired() { + AbstractLoadedChange change = mockLoadedChange("change-1"); + AbstractLoadedStage stage = mockStage("stage-1", change); + + Map snapshot = new HashMap<>(); + snapshot.put("change-1", buildAuditEntry("change-1", AuditEntry.Status.FAILED, AuditTxType.NON_TX)); + when(auditReader.getAuditSnapshotByChangeId()).thenReturn(snapshot); + + ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + + assertTrue(plan.isAborted()); + verify(lockService, never()).upsert(any(), any(), anyLong()); + } + + @Test + @DisplayName("Should acquire lock and return execution plan when no manual intervention") + void shouldAcquireLockAndReturnExecutionPlanWhenNoManualIntervention() { + AbstractLoadedChange change = mockLoadedChange("change-1"); + AbstractLoadedStage stage = mockStage("stage-1", change); + + when(auditReader.getAuditSnapshotByChangeId()).thenReturn(Collections.emptyMap()); + when(lockService.upsert(any(), any(), anyLong())) + .thenReturn(new LockAcquisition(RunnerId.fromString("test-runner"), 60000L)); + + ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + + assertFalse(plan.isAborted()); + assertTrue(plan.isExecutionRequired()); + verify(lockService).upsert(any(), any(), anyLong()); + } + + @Test + @DisplayName("Should return CONTINUE without lock when all changes are already applied") + void shouldReturnContinueWithoutLockWhenAllChangesApplied() { + AbstractLoadedChange change = mockLoadedChange("change-1"); + AbstractLoadedStage stage = mockStage("stage-1", change); + + Map snapshot = new HashMap<>(); + snapshot.put("change-1", buildAuditEntry("change-1", AuditEntry.Status.APPLIED, null)); + when(auditReader.getAuditSnapshotByChangeId()).thenReturn(snapshot); + + ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + + assertFalse(plan.isAborted()); + assertFalse(plan.isExecutionRequired()); + verify(lockService, never()).upsert(any(), any(), anyLong()); + } + + private static AbstractLoadedChange mockLoadedChange(String id) { + AbstractLoadedChange change = mock(AbstractLoadedChange.class); + when(change.getId()).thenReturn(id); + when(change.isRunAlways()).thenReturn(false); + when(change.isStandard()).thenReturn(true); + when(change.getRecovery()).thenReturn(RecoveryDescriptor.getDefault()); + when(change.isTransactional()).thenReturn(false); + return change; + } + + private static AbstractLoadedStage mockStage(String name, AbstractLoadedChange... changes) { + AbstractLoadedStage stage = mock(AbstractLoadedStage.class); + when(stage.getName()).thenReturn(name); + List changeList = java.util.Arrays.asList(changes); + when(stage.getChanges()).thenReturn(changeList); + + when(stage.applyActions(any())).thenAnswer(invocation -> { + io.flamingock.internal.common.core.recovery.action.ChangeActionMap actionMap = invocation.getArgument(0); + List execChanges = new java.util.ArrayList<>(); + for (AbstractLoadedChange c : changeList) { + io.flamingock.internal.common.core.recovery.action.ChangeAction action = actionMap.getActionFor(c.getId()); + execChanges.add(new StubExecutableChange(c.getId(), action)); + } + return new io.flamingock.internal.core.pipeline.execution.ExecutableStage(name, execChanges); + }); + return stage; + } + + private static AuditEntry buildAuditEntry(String changeId, AuditEntry.Status status, AuditTxType txType) { + return new AuditEntry( + "exec-1", + "stage-1", + changeId, + "test-author", + LocalDateTime.now(), + status, + AuditEntry.ChangeType.STANDARD_CODE, + "io.flamingock.test.TestChange", + "apply", + null, + 100L, + "localhost", + null, + false, + null, + txType, + null, + null, + RecoveryStrategy.MANUAL_INTERVENTION, + null + ); + } + + private static class StubExecutableChange implements io.flamingock.internal.core.change.executable.ExecutableChange { + private final String id; + private final io.flamingock.internal.common.core.recovery.action.ChangeAction action; + + StubExecutableChange(String id, io.flamingock.internal.common.core.recovery.action.ChangeAction action) { + this.id = id; + this.action = action; + } + + @Override public String getId() { return id; } + @Override public io.flamingock.internal.common.core.recovery.action.ChangeAction getAction() { return action; } + @Override public boolean isAlreadyApplied() { return action == io.flamingock.internal.common.core.recovery.action.ChangeAction.SKIP; } + @Override public boolean isTransactional() { return false; } + @Override public io.flamingock.internal.common.core.change.ChangeDescriptor getLoadedChange() { return null; } + @Override public String getStageName() { return "test"; } + @Override public void apply(io.flamingock.internal.core.runtime.ExecutionRuntime rt) {} + @Override public String getApplyMethodName() { return "apply"; } + @Override public void rollback(io.flamingock.internal.core.runtime.ExecutionRuntime rt) {} + @Override public String getRollbackMethodName() { return null; } + @Override public java.util.Optional getOrder() { return java.util.Optional.of("001"); } + @Override public String getAuthor() { return "test"; } + @Override public String getSource() { return "Test"; } + @Override public String getSourceFile() { return null; } + @Override public boolean isRunAlways() { return false; } + @Override public java.util.Optional getTransactionalFlag() { return java.util.Optional.empty(); } + @Override public boolean isStandard() { return true; } + @Override public boolean isSystem() { return false; } + @Override public io.flamingock.internal.common.core.change.TargetSystemDescriptor getTargetSystem() { return null; } + @Override public io.flamingock.internal.common.core.change.RecoveryDescriptor getRecovery() { return RecoveryDescriptor.getDefault(); } + @Override public boolean isLegacy() { return false; } + @Override public boolean isSortable() { return true; } + } +}