Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
import io.flamingock.cloud.planner.client.HttpExecutionPlannerClient;
import io.flamingock.internal.core.external.store.audit.LifecycleAuditWriter;
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
import io.flamingock.internal.core.external.targets.TargetSystemManager;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
import io.flamingock.internal.core.plan.ExecutionPlanner;
import io.flamingock.internal.common.core.context.ContextResolver;
import org.apache.http.impl.client.HttpClients;
Expand All @@ -44,6 +47,8 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class CloudAuditStoreImpl implements CloudAuditStore {
private static final Logger logger = FlamingockLoggerFactory.getLogger("CloudAuditStore");
Expand All @@ -61,6 +66,12 @@ public void initialize(ContextResolver baseContext) {
CoreConfigurable coreConfiguration = baseContext.getRequiredDependencyValue(CoreConfigurable.class);
CloudConfigurable cloudConfiguration = baseContext.getRequiredDependencyValue(CloudConfigurable.class);

TargetSystemManager targetSystemManager = baseContext.getRequiredDependencyValue(TargetSystemManager.class);
List<TargetSystemAuditMarker> auditMarkers = targetSystemManager.getTransactionalTargetSystems().stream()
.filter(TransactionalTargetSystem::hasMarker)
.map(TransactionalTargetSystem::getAuditMarker)
.collect(Collectors.toList());

Http.RequestBuilderFactory requestBuilderFactory =
Http.builderFactory(HttpClients.createDefault(), JsonObjectMapper.DEFAULT_INSTANCE);

Expand All @@ -69,7 +80,8 @@ public void initialize(ContextResolver baseContext) {
runnerId,
coreConfiguration,
cloudConfiguration,
requestBuilderFactory
requestBuilderFactory,
auditMarkers
);
}
}
Expand All @@ -83,7 +95,8 @@ public CloudAuditPersistenceImpl getPersistence() {
private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId,
CoreConfigurable coreConfiguration,
CloudConfigurable cloudConfiguration,
Http.RequestBuilderFactory requestBuilderFactory) {
Http.RequestBuilderFactory requestBuilderFactory,
List<TargetSystemAuditMarker> auditMarkers) {
AuthManager authManager = new AuthManager(
cloudConfiguration.getApiToken(),
cloudConfiguration.getServiceName(),
Expand Down Expand Up @@ -111,7 +124,8 @@ private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId,
requestBuilderFactory,
authManager,
environmentId,
serviceId);
serviceId,
auditMarkers);

return new CloudAuditPersistenceImpl(
environmentId,
Expand Down Expand Up @@ -139,7 +153,8 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId,
Http.RequestBuilderFactory requestBuilderFactory,
AuthManager authManager,
EnvironmentId environmentId,
ServiceId serviceId) {
ServiceId serviceId,
List<TargetSystemAuditMarker> auditMarkers) {
LockServiceClient lockClient = new HttpLockServiceClient(
cloudConfiguration.getHost(),
cloudConfiguration.getApiVersion(),
Expand All @@ -162,7 +177,7 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId,
executionPlannerClient,
coreConfiguration,
new CloudLockService(lockClient),
null,
auditMarkers,
TimeService.getDefault()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CloudExecutionPlanner extends ExecutionPlanner {
Expand All @@ -56,19 +57,19 @@ public class CloudExecutionPlanner extends ExecutionPlanner {

private final ExecutionPlannerClient client;

private final TargetSystemAuditMarker ongoingStatusRepository;
private final List<TargetSystemAuditMarker> auditMarkers;

public CloudExecutionPlanner(RunnerId runnerId,
ExecutionPlannerClient client,
CoreConfigurable coreConfiguration,
CloudLockService lockService,
TargetSystemAuditMarker ongoingStatusRepository,
List<TargetSystemAuditMarker> auditMarkers,
TimeService timeService) {
this.client = client;
this.runnerId = runnerId;
this.coreConfiguration = coreConfiguration;
this.lockService = lockService;
this.ongoingStatusRepository = ongoingStatusRepository;
this.auditMarkers = auditMarkers;
this.timeService = timeService;
}

Expand Down Expand Up @@ -135,7 +136,7 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th

private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedStages, String lastAcquisitionId, long elapsedMillis) {

Map<String, TargetSystemAuditMarkType> auditMarks = getOngoingStatuses()
Map<String, TargetSystemAuditMarkType> auditMarks = getAuditMarkers()
.stream()
.collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, TargetSystemAuditMark::getOperation));

Expand All @@ -149,8 +150,14 @@ private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedSt
return responsePlan;
}

private Collection<TargetSystemAuditMark> getOngoingStatuses() {
return ongoingStatusRepository != null ? ongoingStatusRepository.listAll() : Collections.emptySet();
private Collection<TargetSystemAuditMark> getAuditMarkers() {
if (auditMarkers == null || auditMarkers.isEmpty()) {
return Collections.emptySet();
}
return auditMarkers.stream()
.map(TargetSystemAuditMarker::listAll)
.flatMap(Set::stream)
.collect(Collectors.toSet());
}

private ExecutionPlan buildNextExecutionPlan(List<AbstractLoadedStage> loadedStages, ExecutionPlanResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import io.flamingock.cloud.api.request.ExecutionPlanRequest;
import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -188,6 +194,23 @@ void shouldBuildAbortPlanFromAbortResponse() {
assertEquals(ChangeAction.APPLY, actions.get(change2.getId()));
}

@Test
@DisplayName("Should map ongoing status from audit marks to ChangeRequests in toRequest()")
void shouldMapOngoingStatusFromAuditMarksToChangeRequests() {
List<AbstractLoadedStage> loadedStages = Arrays.asList(buildStage("stage-1", change1, change2));

HashMap<String, TargetSystemAuditMarkType> ongoingStatusesMap = new HashMap<>();
ongoingStatusesMap.put(change1.getId(), TargetSystemAuditMarkType.APPLIED);

ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest(loadedStages, 60000L, ongoingStatusesMap);

Map<String, CloudTargetSystemAuditMarkType> marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream()
.collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus));

assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId()));
assertEquals(CloudTargetSystemAuditMarkType.NONE, marksByChangeId.get(change2.getId()));
}

private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) {
return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,80 +16,91 @@
package io.flamingock.cloud.planner;

import io.flamingock.api.StageType;
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.cloud.api.response.ChangeResponse;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
import io.flamingock.cloud.api.response.StageResponse;
import io.flamingock.cloud.api.vo.CloudChangeAction;
import io.flamingock.cloud.api.vo.CloudExecutionAction;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
import io.flamingock.cloud.lock.CloudLockService;
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder;
import io.flamingock.internal.core.configuration.core.CoreConfigurable;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
import io.flamingock.internal.core.plan.ExecutionPlan;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage;
import io.flamingock.internal.util.TimeService;
import io.flamingock.internal.util.id.RunnerId;
import io.flamingock.core.cloud.changes._001__CloudChange1;
import io.flamingock.core.cloud.changes._002__CloudChange2;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class CloudExecutionPlannerTest {

private static AbstractLoadedChange change1;
private static AbstractLoadedChange change2;

private ExecutionPlannerClient client;
private CloudExecutionPlanner planner;
private CoreConfigurable config;

@BeforeAll
static void setupChanges() {
change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build();
change2 = LoadedChangeBuilder.getCodeBuilderInstance(_002__CloudChange2.class).build();
}

@BeforeEach
void setup() {
client = mock(ExecutionPlannerClient.class);
CoreConfigurable config = mock(CoreConfigurable.class);
config = mock(CoreConfigurable.class);
when(config.getLockAcquiredForMillis()).thenReturn(60000L);
when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L);
when(config.getLockTryFrequencyMillis()).thenReturn(1000L);
}

TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class);
when(auditMarker.listAll()).thenReturn(Collections.emptySet());

planner = new CloudExecutionPlanner(
private CloudExecutionPlanner buildPlanner(List<TargetSystemAuditMarker> auditMarkers) {
return new CloudExecutionPlanner(
RunnerId.fromString("test-runner"),
client,
config,
mock(CloudLockService.class),
auditMarker,
auditMarkers,
TimeService.getDefault()
);
}

@Test
@DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes")
void shouldReturnAbortPlanWhenServerReturnsAbort() {
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());

ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.ABORT,
"exec-1",
null,
CloudExecutionAction.ABORT, "exec-1", null,
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION))))
);
Expand All @@ -101,17 +112,16 @@ void shouldReturnAbortPlanWhenServerReturnsAbort() {
ExecutionPlan plan = planner.getNextExecution(stages);

assertTrue(plan.isAborted());
assertFalse(plan.isExecutionRequired());
assertThrows(ManualInterventionRequiredException.class, plan::validate);
}

@Test
@DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes")
void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());

ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.ABORT,
"exec-1",
null,
CloudExecutionAction.ABORT, "exec-1", null,
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY))))
);
Expand All @@ -125,4 +135,67 @@ void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
assertTrue(plan.isAborted());
assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate);
}

@Test
@DisplayName("Should include audit marks from multiple target systems in the execution request")
void shouldIncludeAuditMarksInExecutionRequest() {
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
)));

TargetSystemAuditMarker marker2 = mock(TargetSystemAuditMarker.class);
when(marker2.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
new TargetSystemAuditMark(change2.getId(), TargetSystemAuditMarkType.ROLLED_BACK)
)));

CloudExecutionPlanner planner = buildPlanner(Arrays.asList(marker1, marker2));

ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.CONTINUE, "exec-1", null,
Collections.singletonList(new StageResponse("stage-1", 0, Arrays.asList(
new ChangeResponse(change1.getId(), CloudChangeAction.SKIP),
new ChangeResponse(change2.getId(), CloudChangeAction.APPLY))))
);
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);

List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));

planner.getNextExecution(stages);

ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());

ExecutionPlanRequest request = requestCaptor.getValue();
Map<String, CloudTargetSystemAuditMarkType> marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream()
.collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus));

assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId()));
assertEquals(CloudTargetSystemAuditMarkType.ROLLED_BACK, marksByChangeId.get(change2.getId()));
}

@Test
@DisplayName("Should send NONE status when no audit marks exist for a change")
void shouldSendNoneStatusWhenNoMarks() {
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());

ExecutionPlanResponse response = new ExecutionPlanResponse(
CloudExecutionAction.CONTINUE, "exec-1", null,
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.SKIP))))
);
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);

List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

planner.getNextExecution(stages);

ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());

ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getStages().get(0).getChanges().get(0);
assertEquals(CloudTargetSystemAuditMarkType.NONE, changeRequest.getOngoingStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -142,12 +141,12 @@ void happyPath() {
runner.execute();

//THEN
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).listAll();
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED));
verify(cloudTargetSystem.getAuditMarker(), new Times(2)).listAll();
verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED));

ArgumentCaptor<String> changeIdValuesCaptor = ArgumentCaptor.forClass(String.class);
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED));
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).clearMark(changeIdValuesCaptor.capture());
verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED));
verify(cloudTargetSystem.getAuditMarker(), new Times(2)).clearMark(changeIdValuesCaptor.capture());
List<String> allValues = changeIdValuesCaptor.getAllValues();

Assertions.assertEquals("create-persons-table-from-template", allValues.get(0));
Expand Down
Loading
Loading