Skip to content

Commit d202d96

Browse files
authored
refactor: clear temporal marks after server being synchronised (#896)
1 parent f3d22fc commit d202d96

4 files changed

Lines changed: 185 additions & 13 deletions

File tree

cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,16 @@
2222

2323
public class ExecutionPlanResponse {
2424

25+
private String executionId;
2526

2627
private CloudExecutionAction action;
2728

28-
private String executionId;
29-
3029
private LockInfoResponse lock;
3130

3231
private List<StageResponse> stages;
3332

33+
private boolean synchronizedMarks;
34+
3435

3536
public ExecutionPlanResponse() {
3637
}
@@ -45,10 +46,19 @@ public ExecutionPlanResponse(CloudExecutionAction action,
4546
String executionId,
4647
LockInfoResponse lock,
4748
List<StageResponse> stages) {
49+
this(action, executionId, lock, stages, false);
50+
}
51+
52+
public ExecutionPlanResponse(CloudExecutionAction action,
53+
String executionId,
54+
LockInfoResponse lock,
55+
List<StageResponse> stages,
56+
boolean synchronizedMarks) {
4857
this.action = action;
4958
this.executionId = executionId;
5059
this.lock = lock;
5160
this.stages = stages;
61+
this.synchronizedMarks = synchronizedMarks;
5262
}
5363

5464
public void setAction(CloudExecutionAction action) {
@@ -99,6 +109,14 @@ public boolean isAbort() {
99109
return action == CloudExecutionAction.ABORT;
100110
}
101111

112+
public boolean isSynchronizedMarks() {
113+
return synchronizedMarks;
114+
}
115+
116+
public void setSynchronizedMarks(boolean synchronizedMarks) {
117+
this.synchronizedMarks = synchronizedMarks;
118+
}
119+
102120
public void validate() {
103121
if (isExecute() && executionId == null) {
104122
throw new RuntimeException("ExecutionPlan must contain a valid executionId");

cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838

3939
import java.util.Collection;
4040
import java.util.Collections;
41+
import java.util.HashMap;
42+
import java.util.HashSet;
4143
import java.util.List;
4244
import java.util.Map;
4345
import java.util.Set;
@@ -76,6 +78,7 @@ public CloudExecutionPlanner(RunnerId runnerId,
7678
@Override
7779
public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) throws LockException {
7880

81+
AuditMarkSnapshot snapshot = buildAuditMarkSnapshot();
7982

8083
//In every execution, as it start a stopwatch
8184
ThreadSleeper lockThreadSleeper = new ThreadSleeper(
@@ -87,8 +90,13 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th
8790
do {
8891
try {
8992
logger.info("Requesting cloud execution plan - elapsed[{}ms]", counterPerGuid.getElapsed());
90-
ExecutionPlanResponse response = createExecution(loadedStages, lastOwnerGuid, counterPerGuid.getElapsed());
93+
ExecutionPlanResponse response = createExecution(loadedStages, snapshot.getMarks(), lastOwnerGuid, counterPerGuid.getElapsed());
9194
logger.info("Obtained cloud execution plan: {}", response.getAction());
95+
96+
if (response.isSynchronizedMarks()) {
97+
snapshot = clearSynchronizedMarks(snapshot);
98+
}
99+
92100
if (response.isContinue()) {
93101
List<ExecutableStage> executableStages = CloudExecutionPlanMapper.getExecutableStages(response, loadedStages);
94102
return ExecutionPlan.CONTINUE(executableStages);
@@ -134,30 +142,49 @@ public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) th
134142
} while (true);
135143
}
136144

137-
private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedStages, String lastAcquisitionId, long elapsedMillis) {
145+
private ExecutionPlanResponse createExecution(List<AbstractLoadedStage> loadedStages,
146+
Collection<TargetSystemAuditMark> auditMarks,
147+
String lastAcquisitionId,
148+
long elapsedMillis) {
138149

139-
Map<String, TargetSystemAuditMarkType> auditMarks = getAuditMarkers()
150+
Map<String, TargetSystemAuditMarkType> auditMarksMap = auditMarks
140151
.stream()
141152
.collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, TargetSystemAuditMark::getOperation));
142153

143154
ExecutionPlanRequest requestBody = CloudExecutionPlanMapper.toRequest(
144155
loadedStages,
145156
coreConfiguration.getLockAcquiredForMillis(),
146-
auditMarks);
157+
auditMarksMap);
147158

148159
ExecutionPlanResponse responsePlan = client.createExecution(requestBody, lastAcquisitionId, elapsedMillis);
149160
responsePlan.validate();
150161
return responsePlan;
151162
}
152163

153-
private Collection<TargetSystemAuditMark> getAuditMarkers() {
164+
private AuditMarkSnapshot buildAuditMarkSnapshot() {
154165
if (auditMarkers == null || auditMarkers.isEmpty()) {
155-
return Collections.emptySet();
166+
return AuditMarkSnapshot.empty();
167+
}
168+
Set<TargetSystemAuditMark> allMarks = new HashSet<>();
169+
Map<String, TargetSystemAuditMarker> markerByChangeId = new HashMap<>();
170+
for (TargetSystemAuditMarker marker : auditMarkers) {
171+
for (TargetSystemAuditMark mark : marker.listAll()) {
172+
allMarks.add(mark);
173+
markerByChangeId.put(mark.getChangeId(), marker);
174+
}
175+
}
176+
return new AuditMarkSnapshot(allMarks, markerByChangeId);
177+
}
178+
179+
private AuditMarkSnapshot clearSynchronizedMarks(AuditMarkSnapshot snapshot) {
180+
Map<String, TargetSystemAuditMarker> markerByChangeId = snapshot.getMarkerByChangeId();
181+
for (Map.Entry<String, TargetSystemAuditMarker> entry : markerByChangeId.entrySet()) {
182+
entry.getValue().clearMark(entry.getKey());
183+
}
184+
if (!markerByChangeId.isEmpty()) {
185+
logger.info("Cleared {} synchronized audit marks", markerByChangeId.size());
156186
}
157-
return auditMarkers.stream()
158-
.map(TargetSystemAuditMarker::listAll)
159-
.flatMap(Set::stream)
160-
.collect(Collectors.toSet());
187+
return AuditMarkSnapshot.empty();
161188
}
162189

163190
private ExecutionPlan buildNextExecutionPlan(List<AbstractLoadedStage> loadedStages, ExecutionPlanResponse response) {
@@ -168,4 +195,25 @@ private ExecutionPlan buildNextExecutionPlan(List<AbstractLoadedStage> loadedSta
168195
);
169196
}
170197

198+
static class AuditMarkSnapshot {
199+
private final Collection<TargetSystemAuditMark> marks;
200+
private final Map<String, TargetSystemAuditMarker> markerByChangeId;
201+
202+
static AuditMarkSnapshot empty() {
203+
return new AuditMarkSnapshot(Collections.emptySet(), Collections.emptyMap());
204+
}
205+
206+
AuditMarkSnapshot(Collection<TargetSystemAuditMark> marks, Map<String, TargetSystemAuditMarker> markerByChangeId) {
207+
this.marks = marks;
208+
this.markerByChangeId = markerByChangeId;
209+
}
210+
211+
Collection<TargetSystemAuditMark> getMarks() {
212+
return marks;
213+
}
214+
215+
Map<String, TargetSystemAuditMarker> getMarkerByChangeId() {
216+
return markerByChangeId;
217+
}
218+
}
171219
}

cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.mockito.ArgumentMatchers.any;
5858
import static org.mockito.ArgumentMatchers.anyLong;
5959
import static org.mockito.Mockito.mock;
60+
import static org.mockito.Mockito.never;
6061
import static org.mockito.Mockito.verify;
6162
import static org.mockito.Mockito.when;
6263

@@ -198,4 +199,109 @@ void shouldSendNoneStatusWhenNoMarks() {
198199
ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getStages().get(0).getChanges().get(0);
199200
assertEquals(CloudTargetSystemAuditMarkType.NONE, changeRequest.getOngoingStatus());
200201
}
202+
203+
@Test
204+
@DisplayName("Should clear marks when response has synchronizedMarks=true")
205+
void shouldClearMarksWhenResponseHasSynchronizedMarksTrue() {
206+
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
207+
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
208+
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
209+
)));
210+
211+
TargetSystemAuditMarker marker2 = mock(TargetSystemAuditMarker.class);
212+
when(marker2.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
213+
new TargetSystemAuditMark(change2.getId(), TargetSystemAuditMarkType.ROLLED_BACK)
214+
)));
215+
216+
CloudExecutionPlanner planner = buildPlanner(Arrays.asList(marker1, marker2));
217+
218+
ExecutionPlanResponse response = buildSyncResponse(CloudExecutionAction.CONTINUE, true);
219+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
220+
221+
List<AbstractLoadedStage> stages = Collections.singletonList(
222+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));
223+
224+
planner.getNextExecution(stages);
225+
226+
verify(marker1).clearMark(change1.getId());
227+
verify(marker2).clearMark(change2.getId());
228+
}
229+
230+
@Test
231+
@DisplayName("Should not clear marks when response has synchronizedMarks=false")
232+
void shouldNotClearMarksWhenResponseHasSynchronizedMarksFalse() {
233+
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
234+
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
235+
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
236+
)));
237+
238+
CloudExecutionPlanner planner = buildPlanner(Collections.singletonList(marker1));
239+
240+
ExecutionPlanResponse response = buildSyncResponse(CloudExecutionAction.CONTINUE, false);
241+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
242+
243+
List<AbstractLoadedStage> stages = Collections.singletonList(
244+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));
245+
246+
planner.getNextExecution(stages);
247+
248+
verify(marker1, never()).clearMark(any());
249+
}
250+
251+
@Test
252+
@DisplayName("Should clear marks regardless of response action (ABORT)")
253+
void shouldClearMarksRegardlessOfResponseAction() {
254+
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
255+
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
256+
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
257+
)));
258+
259+
CloudExecutionPlanner planner = buildPlanner(Collections.singletonList(marker1));
260+
261+
ExecutionPlanResponse response = buildSyncResponse(CloudExecutionAction.ABORT, true);
262+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
263+
264+
List<AbstractLoadedStage> stages = Collections.singletonList(
265+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));
266+
267+
planner.getNextExecution(stages);
268+
269+
verify(marker1).clearMark(change1.getId());
270+
}
271+
272+
@Test
273+
@DisplayName("Should only clear marks that were captured in the snapshot, not new ones")
274+
void shouldNotClearNewMarksWrittenAfterRequest() {
275+
TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class);
276+
// At snapshot time, only change1 has a mark
277+
when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList(
278+
new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED)
279+
)));
280+
281+
CloudExecutionPlanner planner = buildPlanner(Collections.singletonList(marker1));
282+
283+
ExecutionPlanResponse response = buildSyncResponse(CloudExecutionAction.CONTINUE, true);
284+
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
285+
286+
List<AbstractLoadedStage> stages = Collections.singletonList(
287+
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));
288+
289+
planner.getNextExecution(stages);
290+
291+
// Only change1 should be cleared (was in snapshot), not change2
292+
verify(marker1).clearMark(change1.getId());
293+
verify(marker1, never()).clearMark(change2.getId());
294+
}
295+
296+
private ExecutionPlanResponse buildSyncResponse(CloudExecutionAction action, boolean synchronizedMarks) {
297+
ExecutionPlanResponse response = new ExecutionPlanResponse(
298+
action, "exec-1", null,
299+
Collections.singletonList(new StageResponse("stage-1", 0,
300+
Arrays.asList(
301+
new ChangeResponse(change1.getId(), CloudChangeAction.SKIP),
302+
new ChangeResponse(change2.getId(), CloudChangeAction.SKIP))))
303+
);
304+
response.setSynchronizedMarks(synchronizedMarks);
305+
return response;
306+
}
201307
}

core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystemAuditMarker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public static class Builder {
107107
private final DataSource dataSource;
108108
private final TransactionManager<Connection> txManager;
109109
private SqlAuditMarkerDialectHelper dialectHelper;
110-
private String tableName = "FLAMINGOCK_ONGOING_CHANGES";
110+
private String tableName = "flamingock_audit_marks";
111111
private boolean autoCreate = true;
112112

113113
public Builder(DataSource dataSource, TransactionManager<Connection> txManager) {

0 commit comments

Comments
 (0)