Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class Constants {
public static final String MONGOCK_IMPORT_SKIP_PROPERTY_KEY = "internal.mongock.import.skip";
public static final String MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY = "internal.mongock.import.origin";
public static final String MONGOCK_IMPORT_EMPTY_ORIGIN_ALLOWED_PROPERTY_KEY = "internal.mongock.import.emptyOriginAllowed";
public static final String MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY = "internal.mongock.import.ignoreUnknownAuditEntries";

private Constants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,28 @@
package io.flamingock.internal.common.core.pipeline;

import io.flamingock.internal.common.core.audit.AuditEntry;
import org.jetbrains.annotations.NotNull;

import java.util.Optional;

public class PipelineHelper {

public static final String SYSTEM_STAGE_ID = "flamingock-system-stage";
public static final String LEGACY_STAGE_ID = "flamingock-legacy-stage";

private static final String errorTemplate = "importing change with id[%s] from database. It must be imported to a flamingock stage";

private final PipelineDescriptor pipelineDescriptor;

public PipelineHelper(PipelineDescriptor pipelineDescriptor) {
this.pipelineDescriptor = pipelineDescriptor;
}

public String getStageId(AuditEntry auditEntryFromOrigin) {
public Optional<String> findStageId(AuditEntry auditEntryFromOrigin) {
if (Boolean.TRUE.equals(auditEntryFromOrigin.getSystemChange())) {
return LEGACY_STAGE_ID;
} else {
String changeIdInPipeline = getBaseChangeId(auditEntryFromOrigin);
return pipelineDescriptor.getStageByChange(changeIdInPipeline).orElseThrow(() -> generateChangeIdException(changeIdInPipeline));
return Optional.of(LEGACY_STAGE_ID);
}
String changeIdInPipeline = getBaseChangeId(auditEntryFromOrigin);
return pipelineDescriptor.getStageByChange(changeIdInPipeline);
}



public String getBaseChangeId(AuditEntry auditEntry) {
String originalChangeId = auditEntry.getChangeId();
int index = originalChangeId.indexOf("_before");
Expand All @@ -51,9 +47,4 @@ public String getBaseChangeId(AuditEntry auditEntry) {
public String getStorableChangeId(AuditEntry auditEntry) {
return auditEntry.getChangeId();
}

@NotNull
public IllegalArgumentException generateChangeIdException(String changeIdInPipeline) {
return new IllegalArgumentException(String.format(errorTemplate, changeIdInPipeline));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.common.core.pipeline;

import io.flamingock.api.RecoveryStrategy;
import io.flamingock.internal.common.core.audit.AuditEntry;
import io.flamingock.internal.common.core.change.ChangeDescriptor;
import io.flamingock.internal.common.core.context.ContextInjectable;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Optional;

import static io.flamingock.internal.common.core.pipeline.PipelineHelper.LEGACY_STAGE_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PipelineHelperTest {

private final PipelineHelper pipelineHelper = new PipelineHelper(new PipelineDescriptor() {
@Override
public Optional<? extends ChangeDescriptor> getLoadedChange(String changeId) {
return Optional.empty();
}

@Override
public Optional<String> getStageByChange(String changeId) {
return "known-change".equals(changeId) ? Optional.of("user-stage") : Optional.empty();
}

@Override
public void contributeToContext(ContextInjectable contextInjectable) {
// no-op for unit test
}
});

@Test
void shouldReturnLegacyStageForSystemChange() {
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("system-change-1", true));

assertEquals(Optional.of(LEGACY_STAGE_ID), stageId);
}

@Test
void shouldReturnMatchingStageForKnownChange() {
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("known-change", false));

assertEquals(Optional.of("user-stage"), stageId);
}

@Test
void shouldReturnEmptyForUnknownChange() {
Optional<String> stageId = pipelineHelper.findStageId(buildAuditEntry("unknown-change", false));

assertFalse(stageId.isPresent());
}

private static AuditEntry buildAuditEntry(String changeId, boolean systemChange) {
return new AuditEntry(
"exec-1",
null,
changeId,
"author",
LocalDateTime.now(),
AuditEntry.Status.APPLIED,
AuditEntry.ChangeType.MONGOCK_EXECUTION,
"io.example.Change",
"apply",
null,
10L,
"host",
null,
systemChange,
null,
null,
null,
null,
RecoveryStrategy.MANUAL_INTERVENTION,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.flamingock.store.couchbase.CouchbaseAuditStore;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.couchbase.CouchbaseCollectionHelper;
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.core.builder.runner.Runner;
import io.flamingock.internal.core.operation.StagedExecuteOperationException;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
Expand All @@ -53,6 +52,7 @@
import static io.flamingock.core.kit.audit.AuditEntryExpectation.APPLIED;
import static io.flamingock.core.kit.audit.AuditEntryExpectation.STARTED;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_EMPTY_ORIGIN_ALLOWED_PROPERTY_KEY;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_SKIP_PROPERTY_KEY;
import static io.flamingock.internal.util.constants.AuditEntryFieldConstants.KEY_CREATED_AT;
Expand Down Expand Up @@ -320,6 +320,108 @@ void GIVEN_allMongockChangeUnitsAlreadyExecutedAndCustomOriginProvided_WHEN_migr

}

@Test
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
"AND relaxed import flag is not provided " +
"WHEN migrating to Flamingock Community " +
"THEN should fail with the current strict validation")
void GIVEN_unknownAuditEntriesAndImplicitStrictMode_WHEN_migratingToFlamingockCommunity_THEN_shouldFail() {
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);

originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));

CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);

Runner flamingock = testKit.createBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
.build();

StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
assertEquals("Error importing audit entry with changeId[foreign-change-1]: no matching change was found in the current Flamingock pipeline.",
firstFailedStageErrorMessage(ex));
}

@Test
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
"AND relaxed import flag is explicitly disabled " +
"WHEN migrating to Flamingock Community " +
"THEN should fail with the current strict validation")
void GIVEN_unknownAuditEntriesAndExplicitStrictMode_WHEN_migratingToFlamingockCommunity_THEN_shouldFail() {
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);

originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));

CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);

Runner flamingock = testKit.createBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, Boolean.FALSE.toString())
.build();

StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
assertEquals("Error importing audit entry with changeId[foreign-change-1]: no matching change was found in the current Flamingock pipeline.",
firstFailedStageErrorMessage(ex));
}

@Test
@DisplayName("GIVEN Mongock audit history contains unknown entries " +
"AND relaxed import flag is enabled " +
"WHEN migrating to Flamingock Community " +
"THEN should skip the unknown entries and continue")
void GIVEN_unknownAuditEntriesAndRelaxedMode_WHEN_migratingToFlamingockCommunity_THEN_shouldSkipUnknownEntries() {
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);

originCollection.upsert("mongock-change-1", createAuditObject("mongock-change-1"));
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));

CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);

Runner flamingock = testKit.createBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, Boolean.TRUE.toString())
.build();

flamingock.run();

auditHelper.verifyAuditSequenceStrict(
APPLIED("mongock-change-1"),
STARTED("migration-mongock-to-flamingock-community"),
APPLIED("migration-mongock-to-flamingock-community"),
STARTED("mongock-change-2"),
APPLIED("mongock-change-2"),
STARTED("flamingock-change"),
APPLIED("flamingock-change")
);
}

@Test
@DisplayName("GIVEN relaxed import flag with invalid value " +
"WHEN migrating to Flamingock Community " +
"THEN should throw exception")
void GIVEN_relaxedImportFlagWithInvalidValue_WHEN_migratingToFlamingockCommunity_THEN_shouldThrowException() {
Collection originCollection = cluster.bucket(MONGOCK_BUCKET_NAME).scope(MONGOCK_SCOPE_NAME).collection(MONGOCK_COLLECTION_NAME);
originCollection.upsert("foreign-change-1", createAuditObject("foreign-change-1", false, "io.example.foreign.ForeignChangeUnit", "apply"));

final String flagValue = "invalid_value";

CouchbaseTargetSystem targetSystem = new CouchbaseTargetSystem("couchbase-target-system", cluster, FLAMINGOCK_BUCKET_NAME);

Runner flamingock = testKit.createBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
.setProperty(MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY, flagValue)
.build();

StagedExecuteOperationException ex = assertThrows(StagedExecuteOperationException.class, flamingock::run);
assertEquals("Invalid value for " + MONGOCK_IMPORT_IGNORE_UNKNOWN_AUDIT_ENTRIES_PROPERTY_KEY + ": " + flagValue
+ " (expected \"true\" or \"false\" or empty)", firstFailedStageErrorMessage(ex));
}

@Test
@DisplayName("GIVEN skip import flag with invalid value " +
"WHEN migrating to Flamingock Community" +
Expand Down Expand Up @@ -481,20 +583,24 @@ private List<JsonObject> getAuditLog() {
}

private static JsonObject createAuditObject(String value) {
return createAuditObject(value, true, "io.flamingock.changelog.Class1", "method1");
}

private static JsonObject createAuditObject(String value, boolean systemChange, String changeLogClass, String changeSetMethod) {
JsonObject doc = JsonObject.create()
.put("executionId", "exec-1")
.put("changeId", value)
.put("author", "author1")
.put("timestamp", Instant.now().toEpochMilli())
.put("state", "EXECUTED")
.put("type", "EXECUTION")
.put("changeLogClass", "io.flamingock.changelog.Class1")
.put("changeSetMethod", "method1")
.put("changeLogClass", changeLogClass)
.put("changeSetMethod", changeSetMethod)
.putNull("metadata")
.put("executionMillis", 123L)
.put("executionHostName", "host1")
.putNull("errorTrace")
.put("systemChange", true)
.put("systemChange", systemChange)
.put("_doctype", "mongockChangeEntry");
return doc;
}
Expand Down
Loading
Loading