Skip to content

Commit 83021a8

Browse files
committed
feat: added flag to ignore unknown changes when migrating from Mongock
Add a new @MongockSupport flag, ignoreUnknownAuditEntries, to relax legacy Mongock audit import when the origin contains entries that do not match any change in the current Flamingock pipeline. - add the new annotation field and internal property key - propagate the flag through the Mongock annotation processor - resolve unknown legacy entries through PipelineHelper.findStageId - ignore unmatched entries in relaxed mode with a warning - fail in strict mode with clearer import-specific messaging - add unit and importer test coverage for strict and relaxed scenarios
1 parent 506dfba commit 83021a8

10 files changed

Lines changed: 478 additions & 30 deletions

File tree

core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/metadata/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public final class Constants {
2828
public static final String MONGOCK_IMPORT_SKIP_PROPERTY_KEY = "internal.mongock.import.skip";
2929
public static final String MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY = "internal.mongock.import.origin";
3030
public static final String MONGOCK_IMPORT_EMPTY_ORIGIN_ALLOWED_PROPERTY_KEY = "internal.mongock.import.emptyOriginAllowed";
31+
public static final String MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY = "internal.mongock.import.ignoreUnknownAuditEntries";
3132

3233
private Constants() {}
3334

core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/pipeline/PipelineHelper.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,28 @@
1616
package io.flamingock.internal.common.core.pipeline;
1717

1818
import io.flamingock.internal.common.core.audit.AuditEntry;
19-
import org.jetbrains.annotations.NotNull;
19+
20+
import java.util.Optional;
2021

2122
public class PipelineHelper {
2223

2324
public static final String SYSTEM_STAGE_ID = "flamingock-system-stage";
2425
public static final String LEGACY_STAGE_ID = "flamingock-legacy-stage";
2526

26-
private static final String errorTemplate = "importing change with id[%s] from database. It must be imported to a flamingock stage";
27-
2827
private final PipelineDescriptor pipelineDescriptor;
2928

3029
public PipelineHelper(PipelineDescriptor pipelineDescriptor) {
3130
this.pipelineDescriptor = pipelineDescriptor;
3231
}
3332

34-
public String getStageId(AuditEntry auditEntryFromOrigin) {
33+
public Optional<String> findStageId(AuditEntry auditEntryFromOrigin) {
3534
if (Boolean.TRUE.equals(auditEntryFromOrigin.getSystemChange())) {
36-
return LEGACY_STAGE_ID;
37-
} else {
38-
String changeIdInPipeline = getBaseChangeId(auditEntryFromOrigin);
39-
return pipelineDescriptor.getStageByChange(changeIdInPipeline).orElseThrow(() -> generateChangeIdException(changeIdInPipeline));
35+
return Optional.of(LEGACY_STAGE_ID);
4036
}
37+
String changeIdInPipeline = getBaseChangeId(auditEntryFromOrigin);
38+
return pipelineDescriptor.getStageByChange(changeIdInPipeline);
4139
}
4240

43-
44-
4541
public String getBaseChangeId(AuditEntry auditEntry) {
4642
String originalChangeId = auditEntry.getChangeId();
4743
int index = originalChangeId.indexOf("_before");
@@ -51,9 +47,4 @@ public String getBaseChangeId(AuditEntry auditEntry) {
5147
public String getStorableChangeId(AuditEntry auditEntry) {
5248
return auditEntry.getChangeId();
5349
}
54-
55-
@NotNull
56-
public IllegalArgumentException generateChangeIdException(String changeIdInPipeline) {
57-
return new IllegalArgumentException(String.format(errorTemplate, changeIdInPipeline));
58-
}
5950
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2026 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.internal.common.core.pipeline;
17+
18+
import io.flamingock.api.RecoveryStrategy;
19+
import io.flamingock.internal.common.core.audit.AuditEntry;
20+
import io.flamingock.internal.common.core.change.ChangeDescriptor;
21+
import io.flamingock.internal.common.core.context.ContextInjectable;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.time.LocalDateTime;
25+
import java.util.Optional;
26+
27+
import static io.flamingock.internal.common.core.pipeline.PipelineHelper.LEGACY_STAGE_ID;
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
32+
class PipelineHelperTest {
33+
34+
private final PipelineHelper pipelineHelper = new PipelineHelper(new PipelineDescriptor() {
35+
@Override
36+
public Optional<? extends ChangeDescriptor> getLoadedChange(String changeId) {
37+
return Optional.empty();
38+
}
39+
40+
@Override
41+
public Optional<String> getStageByChange(String changeId) {
42+
return "known-change".equals(changeId) ? Optional.of("user-stage") : Optional.empty();
43+
}
44+
45+
@Override
46+
public void contributeToContext(ContextInjectable contextInjectable) {
47+
// no-op for unit test
48+
}
49+
});
50+
51+
@Test
52+
void shouldReturnLegacyStageForSystemChange() {
53+
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("system-change-1", true));
54+
55+
assertEquals(Optional.of(LEGACY_STAGE_ID), stageId);
56+
}
57+
58+
@Test
59+
void shouldReturnMatchingStageForKnownChange() {
60+
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("known-change", false));
61+
62+
assertEquals(Optional.of("user-stage"), stageId);
63+
}
64+
65+
@Test
66+
void shouldReturnEmptyForUnknownChange() {
67+
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("unknown-change", false));
68+
69+
assertFalse(stageId.isPresent());
70+
}
71+
72+
private static AuditEntry buildAuditEntry(String changeId, boolean systemChange) {
73+
return new AuditEntry(
74+
"exec-1",
75+
null,
76+
changeId,
77+
"author",
78+
LocalDateTime.now(),
79+
AuditEntry.Status.APPLIED,
80+
AuditEntry.ChangeType.MONGOCK_EXECUTION,
81+
"io.example.Change",
82+
"apply",
83+
null,
84+
10L,
85+
"host",
86+
null,
87+
systemChange,
88+
null,
89+
null,
90+
null,
91+
null,
92+
RecoveryStrategy.MANUAL_INTERVENTION,
93+
null
94+
);
95+
}
96+
}

legacy/mongock-importer-couchbase/src/test/java/io/flamingock/importer/mongock/couchbase/CouchbaseImporterTest.java

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.flamingock.store.couchbase.CouchbaseAuditStore;
3131
import io.flamingock.internal.common.core.response.data.ErrorInfo;
3232
import io.flamingock.internal.common.couchbase.CouchbaseCollectionHelper;
33-
import io.flamingock.internal.core.builder.FlamingockFactory;
3433
import io.flamingock.internal.core.builder.runner.Runner;
3534
import io.flamingock.internal.core.operation.StagedExecuteOperationException;
3635
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
@@ -53,6 +52,7 @@
5352
import static io.flamingock.core.kit.audit.AuditEntryExpectation.APPLIED;
5453
import static io.flamingock.core.kit.audit.AuditEntryExpectation.STARTED;
5554
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_EMPTY_ORIGIN_ALLOWED_PROPERTY_KEY;
55+
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY;
5656
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
5757
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_SKIP_PROPERTY_KEY;
5858
import static io.flamingock.internal.util.constants.AuditEntryFieldConstants.KEY_CREATED_AT;
@@ -320,6 +320,108 @@ void GIVEN_allMongockChangeUnitsAlreadyExecutedAndCustomOriginProvided_WHEN_migr
320320

321321
}
322322

323+
@Test
324+
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
325+
"AND relaxed import flag is not provided " +
326+
"WHEN migrating to Flamingock Community " +
327+
"THEN should fail with the current strict validation")
328+
void GIVEN_unknownAuditEntriesAndImplicitStrictMode_WHEN_migratingToFlamingockCommunity_THEN_shouldFail() {
329+
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);
330+
331+
originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
332+
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));
333+
334+
CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);
335+
336+
Runner flamingock = testKit.createBuilder()
337+
.setAuditStore(auditStore)
338+
.addTargetSystem(targetSystem)
339+
.build();
340+
341+
StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
342+
assertEquals("Error importing audit entry with changeId[foreign-change-1]: no matching change was found in the current Flamingock pipeline.",
343+
firstFailedStageErrorMessage(ex));
344+
}
345+
346+
@Test
347+
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
348+
"AND relaxed import flag is explicitly disabled " +
349+
"WHEN migrating to Flamingock Community " +
350+
"THEN should fail with the current strict validation")
351+
void GIVEN_unknownAuditEntriesAndExplicitStrictMode_WHEN_migratingToFlamingockCommunity_THEN_shouldFail() {
352+
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);
353+
354+
originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
355+
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));
356+
357+
CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);
358+
359+
Runner flamingock = testKit.createBuilder()
360+
.setAuditStore(auditStore)
361+
.addTargetSystem(targetSystem)
362+
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, Boolean.FALSE.toString())
363+
.build();
364+
365+
StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
366+
assertEquals("Error importing audit entry with changeId[foreign-change-1]: no matching change was found in the current Flamingock pipeline.",
367+
firstFailedStageErrorMessage(ex));
368+
}
369+
370+
@Test
371+
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
372+
"AND relaxed import flag is enabled " +
373+
"WHEN migrating to Flamingock Community " +
374+
"THEN should skip the unknown entries and continue")
375+
void GIVEN_unknownAuditEntriesAndRelaxedMode_WHEN_migratingToFlamingockCommunity_THEN_shouldSkipUnknownEntries() {
376+
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);
377+
378+
originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
379+
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));
380+
381+
CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);
382+
383+
Runner flamingock = testKit.createBuilder()
384+
.setAuditStore(auditStore)
385+
.addTargetSystem(targetSystem)
386+
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, Boolean.TRUE.toString())
387+
.build();
388+
389+
flamingock.run();
390+
391+
auditHelper.verifyAuditSequenceStrict(
392+
APPLIED("mongock-change-1"),
393+
STARTED("migration-mongock-to-flamingock-community"),
394+
APPLIED("migration-mongock-to-flamingock-community"),
395+
STARTED("mongock-change-2"),
396+
APPLIED("mongock-change-2"),
397+
STARTED("flamingock-change"),
398+
APPLIED("flamingock-change")
399+
);
400+
}
401+
402+
@Test
403+
@DisplayName("GIVEN relaxed import flag with invalid value " +
404+
"WHEN migrating to Flamingock Community " +
405+
"THEN should throw exception")
406+
void GIVEN_relaxedImportFlagWithInvalidValue_WHEN_migratingToFlamingockCommunity_THEN_shouldThrowException() {
407+
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);
408+
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));
409+
410+
final String flagValue = "invalid_value";
411+
412+
CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);
413+
414+
Runner flamingock = testKit.createBuilder()
415+
.setAuditStore(auditStore)
416+
.addTargetSystem(targetSystem)
417+
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, flagValue)
418+
.build();
419+
420+
StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
421+
assertEquals("Invalid value for " + MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY + ": " + flagValue
422+
+ " (expected \"true\" or \"false\" or empty)", firstFailedStageErrorMessage(ex));
423+
}
424+
323425
@Test
324426
@DisplayName("GIVEN skip import flag with invalid value " +
325427
"WHEN migrating to Flamingock Community" +
@@ -481,20 +583,24 @@ private List<JsonObject> getAuditLog() {
481583
}
482584

483585
private static JsonObject createAuditObject(String value) {
586+
return createAuditObject(value, true, "io.flamingock.changelog.Class1", "method1");
587+
}
588+
589+
private static JsonObject createAuditObject(String value, boolean systemChange, String changeLogClass, String changeSetMethod) {
484590
JsonObject doc = JsonObject.create()
485591
.put("executionId", "exec-1")
486592
.put("changeId", value)
487593
.put("author", "author1")
488594
.put("timestamp", Instant.now().toEpochMilli())
489595
.put("state", "EXECUTED")
490596
.put("type", "EXECUTION")
491-
.put("changeLogClass", "io.flamingock.changelog.Class1")
492-
.put("changeSetMethod", "method1")
597+
.put("changeLogClass", changeLogClass)
598+
.put("changeSetMethod", changeSetMethod)
493599
.putNull("metadata")
494600
.put("executionMillis", 123L)
495601
.put("executionHostName", "host1")
496602
.putNull("errorTrace")
497-
.put("systemChange", true)
603+
.put("systemChange", systemChange)
498604
.put("_doctype", "mongockChangeEntry");
499605
return doc;
500606
}

0 commit comments

Comments
 (0)