diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudAuditStoreImpl.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudAuditStoreImpl.java index 8825d5696..6f5eccd1f 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudAuditStoreImpl.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudAuditStoreImpl.java @@ -36,6 +36,9 @@ import io.flamingock.cloud.planner.client.ExecutionPlannerClient; import io.flamingock.cloud.planner.client.HttpExecutionPlannerClient; import io.flamingock.internal.core.external.store.audit.LifecycleAuditWriter; +import io.flamingock.internal.core.external.targets.TransactionalTargetSystem; +import io.flamingock.internal.core.external.targets.TargetSystemManager; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker; import io.flamingock.internal.core.plan.ExecutionPlanner; import io.flamingock.internal.common.core.context.ContextResolver; import org.apache.http.impl.client.HttpClients; @@ -44,6 +47,8 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; public class CloudAuditStoreImpl implements CloudAuditStore { private static final Logger logger = FlamingockLoggerFactory.getLogger("CloudAuditStore"); @@ -61,6 +66,12 @@ public void initialize(ContextResolver baseContext) { CoreConfigurable coreConfiguration = baseContext.getRequiredDependencyValue(CoreConfigurable.class); CloudConfigurable cloudConfiguration = baseContext.getRequiredDependencyValue(CloudConfigurable.class); + TargetSystemManager targetSystemManager = baseContext.getRequiredDependencyValue(TargetSystemManager.class); + List auditMarkers = targetSystemManager.getTransactionalTargetSystems().stream() + .filter(TransactionalTargetSystem::hasMarker) + .map(TransactionalTargetSystem::getAuditMarker) + .collect(Collectors.toList()); + Http.RequestBuilderFactory requestBuilderFactory = Http.builderFactory(HttpClients.createDefault(), JsonObjectMapper.DEFAULT_INSTANCE); @@ -69,7 +80,8 @@ public void initialize(ContextResolver baseContext) { runnerId, coreConfiguration, cloudConfiguration, - requestBuilderFactory + requestBuilderFactory, + auditMarkers ); } } @@ -83,7 +95,8 @@ public CloudAuditPersistenceImpl getPersistence() { private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId, CoreConfigurable coreConfiguration, CloudConfigurable cloudConfiguration, - Http.RequestBuilderFactory requestBuilderFactory) { + Http.RequestBuilderFactory requestBuilderFactory, + List auditMarkers) { AuthManager authManager = new AuthManager( cloudConfiguration.getApiToken(), cloudConfiguration.getServiceName(), @@ -111,7 +124,8 @@ private CloudAuditPersistenceImpl buildPersistence(RunnerId runnerId, requestBuilderFactory, authManager, environmentId, - serviceId); + serviceId, + auditMarkers); return new CloudAuditPersistenceImpl( environmentId, @@ -139,7 +153,8 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId, Http.RequestBuilderFactory requestBuilderFactory, AuthManager authManager, EnvironmentId environmentId, - ServiceId serviceId) { + ServiceId serviceId, + List auditMarkers) { LockServiceClient lockClient = new HttpLockServiceClient( cloudConfiguration.getHost(), cloudConfiguration.getApiVersion(), @@ -162,7 +177,7 @@ private ExecutionPlanner getExecutionPlanner(RunnerId runnerId, executionPlannerClient, coreConfiguration, new CloudLockService(lockClient), - null, + auditMarkers, TimeService.getDefault() ); } diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java index dc3d3805e..3db6b3cbe 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class CloudExecutionPlanner extends ExecutionPlanner { @@ -56,19 +57,19 @@ public class CloudExecutionPlanner extends ExecutionPlanner { private final ExecutionPlannerClient client; - private final TargetSystemAuditMarker ongoingStatusRepository; + private final List auditMarkers; public CloudExecutionPlanner(RunnerId runnerId, ExecutionPlannerClient client, CoreConfigurable coreConfiguration, CloudLockService lockService, - TargetSystemAuditMarker ongoingStatusRepository, + List auditMarkers, TimeService timeService) { this.client = client; this.runnerId = runnerId; this.coreConfiguration = coreConfiguration; this.lockService = lockService; - this.ongoingStatusRepository = ongoingStatusRepository; + this.auditMarkers = auditMarkers; this.timeService = timeService; } @@ -135,7 +136,7 @@ public ExecutionPlan getNextExecution(List loadedStages) th private ExecutionPlanResponse createExecution(List loadedStages, String lastAcquisitionId, long elapsedMillis) { - Map auditMarks = getOngoingStatuses() + Map auditMarks = getAuditMarkers() .stream() .collect(Collectors.toMap(TargetSystemAuditMark::getChangeId, TargetSystemAuditMark::getOperation)); @@ -149,8 +150,14 @@ private ExecutionPlanResponse createExecution(List loadedSt return responsePlan; } - private Collection getOngoingStatuses() { - return ongoingStatusRepository != null ? ongoingStatusRepository.listAll() : Collections.emptySet(); + private Collection getAuditMarkers() { + if (auditMarkers == null || auditMarkers.isEmpty()) { + return Collections.emptySet(); + } + return auditMarkers.stream() + .map(TargetSystemAuditMarker::listAll) + .flatMap(Set::stream) + .collect(Collectors.toSet()); } private ExecutionPlan buildNextExecutionPlan(List loadedStages, ExecutionPlanResponse response) { diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java index 60705f574..05bb2a157 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java @@ -36,7 +36,13 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import io.flamingock.cloud.api.request.ExecutionPlanRequest; +import io.flamingock.cloud.api.request.ChangeRequest; +import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; + import java.util.Arrays; +import java.util.HashMap; import java.util.Collections; import java.util.List; import java.util.Map; @@ -188,6 +194,23 @@ void shouldBuildAbortPlanFromAbortResponse() { assertEquals(ChangeAction.APPLY, actions.get(change2.getId())); } + @Test + @DisplayName("Should map ongoing status from audit marks to ChangeRequests in toRequest()") + void shouldMapOngoingStatusFromAuditMarksToChangeRequests() { + List loadedStages = Arrays.asList(buildStage("stage-1", change1, change2)); + + HashMap ongoingStatusesMap = new HashMap<>(); + ongoingStatusesMap.put(change1.getId(), TargetSystemAuditMarkType.APPLIED); + + ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest(loadedStages, 60000L, ongoingStatusesMap); + + Map marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream() + .collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus)); + + assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId())); + assertEquals(CloudTargetSystemAuditMarkType.NONE, marksByChangeId.get(change2.getId())); + } + private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) { return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes)); } diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java index bf3e29fa3..309a043f5 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java @@ -16,17 +16,22 @@ package io.flamingock.cloud.planner; import io.flamingock.api.StageType; +import io.flamingock.cloud.api.request.ExecutionPlanRequest; +import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.response.ChangeResponse; import io.flamingock.cloud.api.response.ExecutionPlanResponse; import io.flamingock.cloud.api.response.StageResponse; import io.flamingock.cloud.api.vo.CloudChangeAction; import io.flamingock.cloud.api.vo.CloudExecutionAction; +import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; import io.flamingock.cloud.lock.CloudLockService; import io.flamingock.cloud.planner.client.ExecutionPlannerClient; import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.core.change.loaded.LoadedChangeBuilder; import io.flamingock.internal.core.configuration.core.CoreConfigurable; +import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker; import io.flamingock.internal.core.plan.ExecutionPlan; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; @@ -34,51 +39,57 @@ import io.flamingock.internal.util.TimeService; import io.flamingock.internal.util.id.RunnerId; import io.flamingock.core.cloud.changes._001__CloudChange1; +import io.flamingock.core.cloud.changes._002__CloudChange2; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class CloudExecutionPlannerTest { private static AbstractLoadedChange change1; + private static AbstractLoadedChange change2; private ExecutionPlannerClient client; - private CloudExecutionPlanner planner; + private CoreConfigurable config; @BeforeAll static void setupChanges() { change1 = LoadedChangeBuilder.getCodeBuilderInstance(_001__CloudChange1.class).build(); + change2 = LoadedChangeBuilder.getCodeBuilderInstance(_002__CloudChange2.class).build(); } @BeforeEach void setup() { client = mock(ExecutionPlannerClient.class); - CoreConfigurable config = mock(CoreConfigurable.class); + config = mock(CoreConfigurable.class); when(config.getLockAcquiredForMillis()).thenReturn(60000L); when(config.getLockQuitTryingAfterMillis()).thenReturn(30000L); when(config.getLockTryFrequencyMillis()).thenReturn(1000L); + } - TargetSystemAuditMarker auditMarker = mock(TargetSystemAuditMarker.class); - when(auditMarker.listAll()).thenReturn(Collections.emptySet()); - - planner = new CloudExecutionPlanner( + private CloudExecutionPlanner buildPlanner(List auditMarkers) { + return new CloudExecutionPlanner( RunnerId.fromString("test-runner"), client, config, mock(CloudLockService.class), - auditMarker, + auditMarkers, TimeService.getDefault() ); } @@ -86,10 +97,10 @@ void setup() { @Test @DisplayName("Should return ABORT plan when server returns ABORT with MANUAL_INTERVENTION changes") void shouldReturnAbortPlanWhenServerReturnsAbort() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + ExecutionPlanResponse response = new ExecutionPlanResponse( - CloudExecutionAction.ABORT, - "exec-1", - null, + CloudExecutionAction.ABORT, "exec-1", null, Collections.singletonList(new StageResponse("stage-1", 0, Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.MANUAL_INTERVENTION)))) ); @@ -101,17 +112,16 @@ void shouldReturnAbortPlanWhenServerReturnsAbort() { ExecutionPlan plan = planner.getNextExecution(stages); assertTrue(plan.isAborted()); - assertFalse(plan.isExecutionRequired()); assertThrows(ManualInterventionRequiredException.class, plan::validate); } @Test @DisplayName("Should return ABORT plan that throws FlamingockException when server returns ABORT but no MI changes") void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + ExecutionPlanResponse response = new ExecutionPlanResponse( - CloudExecutionAction.ABORT, - "exec-1", - null, + CloudExecutionAction.ABORT, "exec-1", null, Collections.singletonList(new StageResponse("stage-1", 0, Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.APPLY)))) ); @@ -125,4 +135,67 @@ void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() { assertTrue(plan.isAborted()); assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate); } + + @Test + @DisplayName("Should include audit marks from multiple target systems in the execution request") + void shouldIncludeAuditMarksInExecutionRequest() { + TargetSystemAuditMarker marker1 = mock(TargetSystemAuditMarker.class); + when(marker1.listAll()).thenReturn(new HashSet<>(Collections.singletonList( + new TargetSystemAuditMark(change1.getId(), TargetSystemAuditMarkType.APPLIED) + ))); + + TargetSystemAuditMarker marker2 = mock(TargetSystemAuditMarker.class); + when(marker2.listAll()).thenReturn(new HashSet<>(Collections.singletonList( + new TargetSystemAuditMark(change2.getId(), TargetSystemAuditMarkType.ROLLED_BACK) + ))); + + CloudExecutionPlanner planner = buildPlanner(Arrays.asList(marker1, marker2)); + + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.CONTINUE, "exec-1", null, + Collections.singletonList(new StageResponse("stage-1", 0, Arrays.asList( + new ChangeResponse(change1.getId(), CloudChangeAction.SKIP), + new ChangeResponse(change2.getId(), CloudChangeAction.APPLY)))) + ); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2))); + + planner.getNextExecution(stages); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); + verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); + + ExecutionPlanRequest request = requestCaptor.getValue(); + Map marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream() + .collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus)); + + assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId())); + assertEquals(CloudTargetSystemAuditMarkType.ROLLED_BACK, marksByChangeId.get(change2.getId())); + } + + @Test + @DisplayName("Should send NONE status when no audit marks exist for a change") + void shouldSendNoneStatusWhenNoMarks() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.CONTINUE, "exec-1", null, + Collections.singletonList(new StageResponse("stage-1", 0, + Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.SKIP)))) + ); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); + + planner.getNextExecution(stages); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); + verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); + + ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getStages().get(0).getChanges().get(0); + assertEquals(CloudTargetSystemAuditMarkType.NONE, changeRequest.getOngoingStatus()); + } } diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/CloudTransactionTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/CloudTransactionTest.java index ce6df1122..a66e0acd7 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/CloudTransactionTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/CloudTransactionTest.java @@ -39,7 +39,6 @@ import java.util.LinkedList; import java.util.List; -import java.util.UUID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -142,12 +141,12 @@ void happyPath() { runner.execute(); //THEN - verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).listAll(); - verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED)); + verify(cloudTargetSystem.getAuditMarker(), new Times(2)).listAll(); + verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template", TargetSystemAuditMarkType.APPLIED)); ArgumentCaptor changeIdValuesCaptor = ArgumentCaptor.forClass(String.class); - verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED)); - verify(cloudTargetSystem.getOnGoingChangeStatusRepository(), new Times(2)).clearMark(changeIdValuesCaptor.capture()); + verify(cloudTargetSystem.getAuditMarker(), new Times(1)).mark(new TargetSystemAuditMark("create-persons-table-from-template-2", TargetSystemAuditMarkType.APPLIED)); + verify(cloudTargetSystem.getAuditMarker(), new Times(2)).clearMark(changeIdValuesCaptor.capture()); List allValues = changeIdValuesCaptor.getAllValues(); Assertions.assertEquals("create-persons-table-from-template", allValues.get(0)); diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/utils/TestCloudTargetSystem.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/utils/TestCloudTargetSystem.java index 656cd63e8..f478e8aa5 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/utils/TestCloudTargetSystem.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/core/cloud/utils/TestCloudTargetSystem.java @@ -40,7 +40,7 @@ public TestCloudTargetSystem(String id, TargetSystemAuditMark... statuses) { this.txWrapper = Mockito.spy(new TestCloudTxWrapper()); } - public TargetSystemAuditMarker getOnGoingChangeStatusRepository() { + public TargetSystemAuditMarker getAuditMarker() { return ongoignRepo; } diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TargetSystemManager.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TargetSystemManager.java index d3ca3a764..a1e472cda 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TargetSystemManager.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TargetSystemManager.java @@ -28,7 +28,9 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Internal registry for managing {@link TargetSystem} instances. @@ -107,6 +109,16 @@ public TargetSystemOps getTargetSystem(String id) { } + /** + * Returns all registered {@link TransactionalTargetSystem} instances. + */ + public List> getTransactionalTargetSystems() { + return targetSystemMap.values().stream() + .filter(ts -> ts instanceof TransactionalTargetSystem) + .map(ts -> (TransactionalTargetSystem) ts) + .collect(Collectors.toList()); + } + /** * Validates that the target system is non-null and has a valid ID. * diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TransactionalTargetSystem.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TransactionalTargetSystem.java index fae2195bc..14212d795 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TransactionalTargetSystem.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/TransactionalTargetSystem.java @@ -22,7 +22,6 @@ import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker; import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker; import io.flamingock.internal.core.transaction.TransactionWrapper; -import io.flamingock.internal.util.constants.CommunityPersistenceConstants; import java.util.Optional; import java.util.function.Function; @@ -45,16 +44,15 @@ public abstract class TransactionalTargetSystem implements ContextInitializable { - protected String onGoingChangesRepositoryName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME; protected boolean autoCreate = true; - protected TargetSystemAuditMarker markerRepository; + protected TargetSystemAuditMarker auditMarker; public TransactionalTargetSystem(String id) { super(id); } public boolean hasMarker() { - TargetSystemAuditMarker onGoingChangeStatusRepository = getOnGoingChangeStatusRepository(); + TargetSystemAuditMarker onGoingChangeStatusRepository = getAuditMarker(); return onGoingChangeStatusRepository != null && !(onGoingChangeStatusRepository instanceof NoOpTargetSystemAuditMarker); } @@ -87,8 +85,8 @@ public final T applyChangeTransactional(Function change * * @return the audit marker instance */ - public TargetSystemAuditMarker getOnGoingChangeStatusRepository() { - return markerRepository; + public TargetSystemAuditMarker getAuditMarker() { + return auditMarker; } /** diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/mark/TargetSystemAuditMarker.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/mark/TargetSystemAuditMarker.java index 9b857d4ec..953634992 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/mark/TargetSystemAuditMarker.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/mark/TargetSystemAuditMarker.java @@ -67,7 +67,9 @@ public interface TargetSystemAuditMarker { * Creates or updates a local audit mark. *

* Implementations should ensure this operation is idempotent. The write operation must - * participate in the same transaction as the Target System operation that is being confirmed. + * participate in the same transaction as the Target System operation that is being confirmed. THis is achieved + * by using the same Tx sessionId, which is supposed to be the changeId, encapsulated in the + * ExecutionRuntime.getSessionId() when the Target System operation performed. * * @param auditMark the mark to persist. * @throws FlamingockException if the operation fails (e.g., storage unavailable). diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/operations/TransactionalTargetSystemOpsImpl.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/operations/TransactionalTargetSystemOpsImpl.java index 6b239518f..1d8e5bc02 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/operations/TransactionalTargetSystemOpsImpl.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/external/targets/operations/TransactionalTargetSystemOpsImpl.java @@ -65,17 +65,17 @@ public String getId() { @Override public Set listAll() { - return targetSystem.getOnGoingChangeStatusRepository().listAll(); + return targetSystem.getAuditMarker().listAll(); } @Override public void clearMark(String changeId) { - targetSystem.getOnGoingChangeStatusRepository().clearMark(changeId); + targetSystem.getAuditMarker().clearMark(changeId); } @Override public void mark(TargetSystemAuditMark auditMark) { - targetSystem.getOnGoingChangeStatusRepository().mark(auditMark); + targetSystem.getAuditMarker().mark(auditMark); } private OperationType internalGetOperationType(AbstractTargetSystem auditStoreTargetSystem) { diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/transaction/TransactionWrapper.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/transaction/TransactionWrapper.java index 545dd2646..84e1a7d12 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/transaction/TransactionWrapper.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/transaction/TransactionWrapper.java @@ -42,7 +42,7 @@ public interface TransactionWrapper { *

* The implementation should follow this pattern: *

    - *
  1. Start a transaction
  2. + *
  3. Start a transaction using the changeId as sessionId(executionRuntime.getSessionId() should contain the changeId)
  4. *
  5. Inject transaction-scoped dependencies into the runtime
  6. *
  7. Execute the operation
  8. *
  9. Commit on success or rollback on failure
  10. diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/external/targets/TargetSystemManagerTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/external/targets/TargetSystemManagerTest.java new file mode 100644 index 000000000..b32a84e3e --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/external/targets/TargetSystemManagerTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.external.targets; + +import io.flamingock.internal.core.runtime.ExecutionRuntime; +import io.flamingock.internal.core.transaction.TransactionWrapper; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class TargetSystemManagerTest { + + @Test + @DisplayName("Should return empty list when no target systems are registered") + void shouldReturnEmptyListWhenNoTargetSystemsRegistered() { + TargetSystemManager manager = new TargetSystemManager(); + assertTrue(manager.getTransactionalTargetSystems().isEmpty()); + } + + @Test + @DisplayName("Should return empty list when only non-transactional target systems are registered") + void shouldReturnEmptyListWhenOnlyNonTransactionalTargetSystems() { + TargetSystemManager manager = new TargetSystemManager(); + manager.add(new StubNonTransactionalTargetSystem("non-tx-1")); + assertTrue(manager.getTransactionalTargetSystems().isEmpty()); + } + + @Test + @DisplayName("Should return only transactional target systems from mixed registrations") + void shouldReturnTransactionalTargetSystems() { + TargetSystemManager manager = new TargetSystemManager(); + manager.add(new StubNonTransactionalTargetSystem("non-tx-1")); + manager.add(new StubTransactionalTargetSystem("tx-1")); + + List> result = manager.getTransactionalTargetSystems(); + + assertEquals(1, result.size()); + assertEquals("tx-1", result.get(0).getId()); + } + + @Test + @DisplayName("Should return all transactional target systems when multiple are registered") + void shouldReturnAllTransactionalTargetSystems() { + TargetSystemManager manager = new TargetSystemManager(); + manager.add(new StubTransactionalTargetSystem("tx-1")); + manager.add(new StubTransactionalTargetSystem("tx-2")); + manager.add(new StubNonTransactionalTargetSystem("non-tx-1")); + + List> result = manager.getTransactionalTargetSystems(); + + assertEquals(2, result.size()); + } + + private static class StubNonTransactionalTargetSystem extends AbstractTargetSystem { + StubNonTransactionalTargetSystem(String id) { super(id); } + @Override protected StubNonTransactionalTargetSystem getSelf() { return this; } + @Override protected void enhanceExecutionRuntime(ExecutionRuntime rt, boolean tx) {} + } + + private static class StubTransactionalTargetSystem extends TransactionalTargetSystem { + StubTransactionalTargetSystem(String id) { super(id); } + @Override protected StubTransactionalTargetSystem getSelf() { return this; } + @Override public TransactionWrapper getTxWrapper() { return null; } + @Override protected void enhanceExecutionRuntime(ExecutionRuntime rt, boolean tx) {} + @Override public void initialize(io.flamingock.internal.common.core.context.ContextResolver ctx) {} + } +} diff --git a/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java b/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java index bf7ac4403..29dd4bebe 100644 --- a/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java +++ b/core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java @@ -83,7 +83,7 @@ public void initialize(ContextResolver baseContext) { txWrapper = new CouchbaseTxWrapper(cluster, txManager); //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - markerRepository = new NoOpTargetSystemAuditMarker(this.getId()); + auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); } private void validate() { diff --git a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java index bdb391e7d..c8ad4c9e7 100644 --- a/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java +++ b/core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java @@ -70,7 +70,7 @@ public void initialize(ContextResolver baseContext) { txWrapper = new DynamoDBTxWrapper(client, txManager); //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - markerRepository = new NoOpTargetSystemAuditMarker(this.getId()); + auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); } private void validate() { diff --git a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java index ff4505188..6ff74508a 100644 --- a/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java +++ b/core/target-systems/flamingock-mongodb-springdata-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java @@ -107,7 +107,7 @@ public void initialize(ContextResolver baseContext) { .build(); //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - markerRepository = new NoOpTargetSystemAuditMarker(this.getId()); + auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); } private void validate() { diff --git a/core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystem.java b/core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystem.java index 20a8fe173..ce192f020 100644 --- a/core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystem.java +++ b/core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystem.java @@ -119,7 +119,7 @@ public void initialize(ContextResolver baseContext) { txWrapper = new MongoDBSyncTxWrapper(txManager); //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - markerRepository = new NoOpTargetSystemAuditMarker(this.getId()); + auditMarker = new NoOpTargetSystemAuditMarker(this.getId()); } private void validate() { diff --git a/core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java b/core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java index b54448dfa..98ee32468 100644 --- a/core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java +++ b/core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java @@ -18,18 +18,23 @@ import io.flamingock.externalsystem.sql.api.SqlExternalSystem; import io.flamingock.internal.common.core.context.ContextResolver; import io.flamingock.internal.common.core.error.FlamingockException; -import io.flamingock.internal.core.transaction.TransactionManager; -import io.flamingock.internal.core.runtime.ExecutionRuntime; -import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker; +import io.flamingock.internal.core.builder.FlamingockEdition; import io.flamingock.internal.core.external.targets.TransactionalTargetSystem; +import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker; +import io.flamingock.internal.core.runtime.ExecutionRuntime; +import io.flamingock.internal.core.transaction.TransactionManager; import io.flamingock.internal.core.transaction.TransactionWrapper; import javax.sql.DataSource; +import java.sql.Connection; import java.sql.SQLException; +import static io.flamingock.internal.core.builder.FlamingockEdition.CLOUD; +import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY; + public class SqlTargetSystem extends TransactionalTargetSystem implements SqlExternalSystem { - private DataSource dataSource; + private final DataSource dataSource; private SqlTxWrapper txWrapper; @@ -48,10 +53,15 @@ public void initialize(ContextResolver baseContext) { this.validate(); targetSystemContext.addDependency(dataSource); - txWrapper = createTxWrapper(); + TransactionManager txManager = getTxManager(); + txWrapper = createTxWrapper(txManager); //TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class)) - markerRepository = new NoOpTargetSystemAuditMarker(this.getId()); + FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY); + auditMarker = edition == CLOUD + ? SqlTargetSystemAuditMarker.builder(dataSource, txManager).build() + : new NoOpTargetSystemAuditMarker(this.getId()); + } private void validate() { @@ -73,7 +83,7 @@ public TransactionWrapper getTxWrapper() { @Override protected void enhanceExecutionRuntime(ExecutionRuntime executionRuntime, boolean isTransactional) { //if transactional, the connection is injected in the wrapInTransaction - if(!isTransactional) { + if (!isTransactional) { try { executionRuntime.addDependency(dataSource.getConnection()); } catch (SQLException e) { @@ -83,13 +93,17 @@ protected void enhanceExecutionRuntime(ExecutionRuntime executionRuntime, boolea } - private SqlTxWrapper createTxWrapper() { - return new SqlTxWrapper(new TransactionManager<>(() -> { + private SqlTxWrapper createTxWrapper(TransactionManager txManager) { + return new SqlTxWrapper(txManager); + } + + private TransactionManager getTxManager() { + return new TransactionManager<>(() -> { try { return dataSource.getConnection(); } catch (SQLException e) { throw new RuntimeException(e); } - })); + }); } } diff --git a/core/target-systems/flamingock-sql-targetsystem/src/test/java/io/flamingock/targetsystem/sql/SqlTargetSystemSharedTxManagerTest.java b/core/target-systems/flamingock-sql-targetsystem/src/test/java/io/flamingock/targetsystem/sql/SqlTargetSystemSharedTxManagerTest.java new file mode 100644 index 000000000..4a940f4eb --- /dev/null +++ b/core/target-systems/flamingock-sql-targetsystem/src/test/java/io/flamingock/targetsystem/sql/SqlTargetSystemSharedTxManagerTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.targetsystem.sql; + +import io.flamingock.internal.common.core.context.ContextResolver; +import io.flamingock.internal.core.builder.FlamingockEdition; +import io.flamingock.internal.core.transaction.TransactionManager; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import javax.sql.DataSource; +import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class SqlTargetSystemSharedTxManagerTest { + + @Test + @DisplayName("SqlTxWrapper and SqlTargetSystemAuditMarker should share the same TransactionManager instance") + void txWrapperAndAuditMarkerShouldShareSameTxManager() throws Exception { + DataSource dataSource = mockDataSource(); + + ContextResolver contextResolver = mock(ContextResolver.class); + when(contextResolver.getDependencyValue(FlamingockEdition.class)) + .thenReturn(Optional.of(FlamingockEdition.CLOUD)); + + SqlTargetSystem targetSystem = new SqlTargetSystem("test-sql", dataSource); + targetSystem.initialize(contextResolver); + + TransactionManager txManagerFromWrapper = extractField(targetSystem.getTxWrapper(), "txManager"); + TransactionManager txManagerFromMarker = extractField(targetSystem.getAuditMarker(), "txManager"); + + assertNotNull(txManagerFromWrapper); + assertNotNull(txManagerFromMarker); + assertSame(txManagerFromWrapper, txManagerFromMarker, + "SqlTxWrapper and SqlTargetSystemAuditMarker must share the same TransactionManager instance"); + } + + private static DataSource mockDataSource() throws Exception { + ResultSet emptyResultSet = mock(ResultSet.class); + when(emptyResultSet.next()).thenReturn(false); + + DatabaseMetaData metadata = mock(DatabaseMetaData.class); + when(metadata.getDatabaseProductName()).thenReturn("MySQL"); + when(metadata.getTables(any(), any(), anyString(), any(String[].class))).thenReturn(emptyResultSet); + + Statement statement = mock(Statement.class); + when(statement.execute(anyString())).thenReturn(false); + + Connection connection = mock(Connection.class); + when(connection.getMetaData()).thenReturn(metadata); + when(connection.createStatement()).thenReturn(statement); + + DataSource dataSource = mock(DataSource.class); + when(dataSource.getConnection()).thenReturn(connection); + return dataSource; + } + + @SuppressWarnings("unchecked") + private static T extractField(Object target, String fieldName) throws Exception { + Class current = target.getClass(); + while (current != null) { + try { + Field field = current.getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(target); + } catch (NoSuchFieldException e) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName + " not found in " + target.getClass().getName()); + } +}