Skip to content

Commit 8200d40

Browse files
authored
feat: add Audit marker foundation and sql (#895)
1 parent 5722899 commit 8200d40

File tree

18 files changed

+378
-57
lines changed

18 files changed

+378
-57
lines changed

cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudAuditStoreImpl.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
3737
import io.flamingock.cloud.planner.client.HttpExecutionPlannerClient;
3838
import io.flamingock.internal.core.external.store.audit.LifecycleAuditWriter;
39+
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
40+
import io.flamingock.internal.core.external.targets.TargetSystemManager;
41+
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
3942
import io.flamingock.internal.core.plan.ExecutionPlanner;
4043
import io.flamingock.internal.common.core.context.ContextResolver;
4144
import org.apache.http.impl.client.HttpClients;
@@ -44,6 +47,8 @@
4447
import org.slf4j.Logger;
4548

4649
import java.io.IOException;
50+
import java.util.List;
51+
import java.util.stream.Collectors;
4752

4853
public class CloudAuditStoreImpl implements CloudAuditStore {
4954
private static final Logger logger = FlamingockLoggerFactory.getLogger("CloudAuditStore");
@@ -61,6 +66,12 @@ public void initialize(ContextResolver baseContext) {
6166
CoreConfigurable coreConfiguration = baseContext.getRequiredDependencyValue(CoreConfigurable.class);
6267
CloudConfigurable cloudConfiguration = baseContext.getRequiredDependencyValue(CloudConfigurable.class);
6368

69+
TargetSystemManager targetSystemManager = baseContext.getRequiredDependencyValue(TargetSystemManager.class);
70+
List<TargetSystemAuditMarker> auditMarkers = targetSystemManager.getTransactionalTargetSystems().stream()
71+
.filter(TransactionalTargetSystem::hasMarker)
72+
.map(TransactionalTargetSystem::getAuditMarker)
73+
.collect(Collectors.toList());
74+
6475
Http.RequestBuilderFactory requestBuilderFactory =
6576
Http.builderFactory(HttpClients.createDefault(), JsonObjectMapper.DEFAULT_INSTANCE);
6677

@@ -69,7 +80,8 @@ public void initialize(ContextResolver baseContext) {
6980
runnerId,
7081
coreConfiguration,
7182
cloudConfiguration,
72-
requestBuilderFactory
83+
requestBuilderFactory,
84+
auditMarkers
7385
);
7486
}
7587
}
@@ -83,7 +95,8 @@ public CloudAuditPersistenceImpl getPersistence() {
8395
private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId,
8496
CoreConfigurable coreConfiguration,
8597
CloudConfigurable cloudConfiguration,
86-
Http.RequestBuilderFactory requestBuilderFactory) {
98+
Http.RequestBuilderFactory requestBuilderFactory,
99+
List<TargetSystemAuditMarker> auditMarkers) {
87100
AuthManager authManager = new AuthManager(
88101
cloudConfiguration.getApiToken(),
89102
cloudConfiguration.getServiceName(),
@@ -111,7 +124,8 @@ private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId,
111124
requestBuilderFactory,
112125
authManager,
113126
environmentId,
114-
serviceId);
127+
serviceId,
128+
auditMarkers);
115129

116130
return new CloudAuditPersistenceImpl(
117131
environmentId,
@@ -139,7 +153,8 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId,
139153
Http.RequestBuilderFactory requestBuilderFactory,
140154
AuthManager authManager,
141155
EnvironmentId environmentId,
142-
ServiceId serviceId) {
156+
ServiceId serviceId,
157+
List<TargetSystemAuditMarker> auditMarkers) {
143158
LockServiceClient lockClient = new HttpLockServiceClient(
144159
cloudConfiguration.getHost(),
145160
cloudConfiguration.getApiVersion(),
@@ -162,7 +177,7 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId,
162177
executionPlannerClient,
163178
coreConfiguration,
164179
new CloudLockService(lockClient),
165-
null,
180+
auditMarkers,
166181
TimeService.getDefault()
167182
);
168183
}

cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Collections;
4141
import java.util.List;
4242
import java.util.Map;
43+
import java.util.Set;
4344
import java.util.stream.Collectors;
4445

4546
public class CloudExecutionPlanner extends ExecutionPlanner {
@@ -56,19 +57,19 @@ public class CloudExecutionPlanner extends ExecutionPlanner {
5657

5758
private final ExecutionPlannerClient client;
5859

59-
private final TargetSystemAuditMarker ongoingStatusRepository;
60+
private final List<TargetSystemAuditMarker> auditMarkers;
6061

6162
public CloudExecutionPlanner(RunnerId runnerId,
6263
ExecutionPlannerClient client,
6364
CoreConfigurable coreConfiguration,
6465
CloudLockService lockService,
65-
TargetSystemAuditMarker ongoingStatusRepository,
66+
List<TargetSystemAuditMarker> auditMarkers,
6667
TimeService timeService) {
6768
this.client = client;
6869
this.runnerId = runnerId;
6970
this.coreConfiguration = coreConfiguration;
7071
this.lockService = lockService;
71-
this.ongoingStatusRepository = ongoingStatusRepository;
72+
this.auditMarkers = auditMarkers;
7273
this.timeService = timeService;
7374
}
7475

@@ -135,7 +136,7 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th
135136

136137
private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedStages, String lastAcquisitionId, long elapsedMillis) {
137138

138-
Map<String, TargetSystemAuditMarkType> auditMarks = getOngoingStatuses()
139+
Map<String, TargetSystemAuditMarkType> auditMarks = getAuditMarkers()
139140
.stream()
140141
.collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, TargetSystemAuditMark::getOperation));
141142

@@ -149,8 +150,14 @@ private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedSt
149150
return responsePlan;
150151
}
151152

152-
private Collection<TargetSystemAuditMark> getOngoingStatuses() {
153-
return ongoingStatusRepository != null ? ongoingStatusRepository.listAll() : Collections.emptySet();
153+
private Collection<TargetSystemAuditMark> getAuditMarkers() {
154+
if (auditMarkers == null || auditMarkers.isEmpty()) {
155+
return Collections.emptySet();
156+
}
157+
return auditMarkers.stream()
158+
.map(TargetSystemAuditMarker::listAll)
159+
.flatMap(Set::stream)
160+
.collect(Collectors.toSet());
154161
}
155162

156163
private ExecutionPlan buildNextExecutionPlan(List<AbstractLoadedStage> loadedStages, ExecutionPlanResponse response) {

cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,13 @@
3636
import org.junit.jupiter.api.DisplayName;
3737
import org.junit.jupiter.api.Test;
3838

39+
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
40+
import io.flamingock.cloud.api.request.ChangeRequest;
41+
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
42+
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
43+
3944
import java.util.Arrays;
45+
import java.util.HashMap;
4046
import java.util.Collections;
4147
import java.util.List;
4248
import java.util.Map;
@@ -188,6 +194,23 @@ void shouldBuildAbortPlanFromAbortResponse() {
188194
assertEquals(ChangeAction.APPLY, actions.get(change2.getId()));
189195
}
190196

197+
@Test
198+
@DisplayName("Should map ongoing status from audit marks to ChangeRequests in toRequest()")
199+
void shouldMapOngoingStatusFromAuditMarksToChangeRequests() {
200+
List<AbstractLoadedStage> loadedStages = Arrays.asList(buildStage("stage-1", change1, change2));
201+
202+
HashMap<String, TargetSystemAuditMarkType> ongoingStatusesMap = new HashMap<>();
203+
ongoingStatusesMap.put(change1.getId(), TargetSystemAuditMarkType.APPLIED);
204+
205+
ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest(loadedStages, 60000L, ongoingStatusesMap);
206+
207+
Map<String, CloudTargetSystemAuditMarkType> marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream()
208+
.collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus));
209+
210+
assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId()));
211+
assertEquals(CloudTargetSystemAuditMarkType.NONE, marksByChangeId.get(change2.getId()));
212+
}
213+
191214
private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) {
192215
return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes));
193216
}

cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,80 +16,91 @@
1616
package io.flamingock.cloud.planner;
1717

1818
import io.flamingock.api.StageType;
19+
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
20+
import io.flamingock.cloud.api.request.ChangeRequest;
1921
import io.flamingock.cloud.api.response.ChangeResponse;
2022
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
2123
import io.flamingock.cloud.api.response.StageResponse;
2224
import io.flamingock.cloud.api.vo.CloudChangeAction;
2325
import io.flamingock.cloud.api.vo.CloudExecutionAction;
26+
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
2427
import io.flamingock.cloud.lock.CloudLockService;
2528
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
2629
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
30+
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
2731
import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
2832
import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder;
2933
import io.flamingock.internal.core.configuration.core.CoreConfigurable;
34+
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
3035
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
3136
import io.flamingock.internal.core.plan.ExecutionPlan;
3237
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
3338
import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage;
3439
import io.flamingock.internal.util.TimeService;
3540
import io.flamingock.internal.util.id.RunnerId;
3641
import io.flamingock.core.cloud.changes._001__CloudChange1;
42+
import io.flamingock.core.cloud.changes._002__CloudChange2;
3743
import org.junit.jupiter.api.BeforeAll;
3844
import org.junit.jupiter.api.BeforeEach;
3945
import org.junit.jupiter.api.DisplayName;
4046
import org.junit.jupiter.api.Test;
47+
import org.mockito.ArgumentCaptor;
4148

4249
import java.util.Arrays;
4350
import java.util.Collections;
51+
import java.util.HashSet;
4452
import java.util.List;
53+
import java.util.Map;
54+
import java.util.stream.Collectors;
4555

4656
import static org.junit.jupiter.api.Assertions.*;
4757
import static org.mockito.ArgumentMatchers.any;
4858
import static org.mockito.ArgumentMatchers.anyLong;
49-
import static org.mockito.ArgumentMatchers.anyString;
5059
import static org.mockito.Mockito.mock;
60+
import static org.mockito.Mockito.verify;
5161
import static org.mockito.Mockito.when;
5262

5363
class CloudExecutionPlannerTest {
5464

5565
private static AbstractLoadedChange change1;
66+
private static AbstractLoadedChange change2;
5667

5768
private ExecutionPlannerClient client;
58-
private CloudExecutionPlanner planner;
69+
private CoreConfigurable config;
5970

6071
@BeforeAll
6172
static void setupChanges() {
6273
change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build();
74+
change2 = LoadedChangeBuilder.getCodeBuilderInstance(_002__CloudChange2.class).build();
6375
}
6476

6577
@BeforeEach
6678
void setup() {
6779
client = mock(ExecutionPlannerClient.class);
68-
CoreConfigurable config = mock(CoreConfigurable.class);
80+
config = mock(CoreConfigurable.class);
6981
when(config.getLockAcquiredForMillis()).thenReturn(60000L);
7082
when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L);
7183
when(config.getLockTryFrequencyMillis()).thenReturn(1000L);
84+
}
7285

73-
TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class);
74-
when(auditMarker.listAll()).thenReturn(Collections.emptySet());
75-
76-
planner = new CloudExecutionPlanner(
86+
private CloudExecutionPlanner buildPlanner(List<TargetSystemAuditMarker> auditMarkers) {
87+
return new CloudExecutionPlanner(
7788
RunnerId.fromString("test-runner"),
7889
client,
7990
config,
8091
mock(CloudLockService.class),
81-
auditMarker,
92+
auditMarkers,
8293
TimeService.getDefault()
8394
);
8495
}
8596

8697
@Test
8798
@DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes")
8899
void shouldReturnAbortPlanWhenServerReturnsAbort() {
100+
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
101+
89102
ExecutionPlanResponse response = new ExecutionPlanResponse(
90-
CloudExecutionAction.ABORT,
91-
"exec-1",
92-
null,
103+
CloudExecutionAction.ABORT, "exec-1", null,
93104
Collections.singletonList(new StageResponse("stage-1", 0,
94105
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION))))
95106
);
@@ -101,17 +112,16 @@ void shouldReturnAbortPlanWhenServerReturnsAbort() {
101112
ExecutionPlan plan = planner.getNextExecution(stages);
102113

103114
assertTrue(plan.isAborted());
104-
assertFalse(plan.isExecutionRequired());
105115
assertThrows(ManualInterventionRequiredException.class, plan::validate);
106116
}
107117

108118
@Test
109119
@DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes")
110120
void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
121+
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
122+
111123
ExecutionPlanResponse response = new ExecutionPlanResponse(
112-
CloudExecutionAction.ABORT,
113-
"exec-1",
114-
null,
124+
CloudExecutionAction.ABORT, "exec-1", null,
115125
Collections.singletonList(new StageResponse("stage-1", 0,
116126
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY))))
117127
);
@@ -125,4 +135,67 @@ void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
125135
assertTrue(plan.isAborted());
126136
assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate);
127137
}
138+
139+
@Test
140+
@DisplayName("Should include audit marks from multiple target systems in the execution request")
141+
void shouldIncludeAuditMarksInExecutionRequest() {
142+
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
143+
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
144+
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
145+
)));
146+
147+
TargetSystemAuditMarker marker2 = mock(TargetSystemAuditMarker.class);
148+
when(marker2.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
149+
new TargetSystemAuditMark(change2.getId(), TargetSystemAuditMarkType.ROLLED_BACK)
150+
)));
151+
152+
CloudExecutionPlanner planner = buildPlanner(Arrays.asList(marker1, marker2));
153+
154+
ExecutionPlanResponse response = new ExecutionPlanResponse(
155+
CloudExecutionAction.CONTINUE, "exec-1", null,
156+
Collections.singletonList(new StageResponse("stage-1", 0, Arrays.asList(
157+
new ChangeResponse(change1.getId(), CloudChangeAction.SKIP),
158+
new ChangeResponse(change2.getId(), CloudChangeAction.APPLY))))
159+
);
160+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
161+
162+
List<AbstractLoadedStage> stages = Collections.singletonList(
163+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));
164+
165+
planner.getNextExecution(stages);
166+
167+
ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
168+
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());
169+
170+
ExecutionPlanRequest request = requestCaptor.getValue();
171+
Map<String, CloudTargetSystemAuditMarkType> marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream()
172+
.collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus));
173+
174+
assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId()));
175+
assertEquals(CloudTargetSystemAuditMarkType.ROLLED_BACK, marksByChangeId.get(change2.getId()));
176+
}
177+
178+
@Test
179+
@DisplayName("Should send NONE status when no audit marks exist for a change")
180+
void shouldSendNoneStatusWhenNoMarks() {
181+
CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
182+
183+
ExecutionPlanResponse response = new ExecutionPlanResponse(
184+
CloudExecutionAction.CONTINUE, "exec-1", null,
185+
Collections.singletonList(new StageResponse("stage-1", 0,
186+
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.SKIP))))
187+
);
188+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
189+
190+
List<AbstractLoadedStage> stages = Collections.singletonList(
191+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));
192+
193+
planner.getNextExecution(stages);
194+
195+
ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
196+
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());
197+
198+
ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getStages().get(0).getChanges().get(0);
199+
assertEquals(CloudTargetSystemAuditMarkType.NONE, changeRequest.getOngoingStatus());
200+
}
128201
}

cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/CloudTransactionTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939

4040
import java.util.LinkedList;
4141
import java.util.List;
42-
import java.util.UUID;
4342

4443
import static org.mockito.ArgumentMatchers.any;
4544
import static org.mockito.Mockito.verify;
@@ -142,12 +141,12 @@ void happyPath() {
142141
runner.execute();
143142

144143
//THEN
145-
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).listAll();
146-
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED));
144+
verify(cloudTargetSystem.getAuditMarker(), new Times(2)).listAll();
145+
verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED));
147146

148147
ArgumentCaptor<String> changeIdValuesCaptor = ArgumentCaptor.forClass(String.class);
149-
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED));
150-
verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).clearMark(changeIdValuesCaptor.capture());
148+
verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED));
149+
verify(cloudTargetSystem.getAuditMarker(), new Times(2)).clearMark(changeIdValuesCaptor.capture());
151150
List<String> allValues = changeIdValuesCaptor.getAllValues();
152151

153152
Assertions.assertEquals("create-persons-table-from-template", allValues.get(0));

0 commit comments

Comments
 (0)