Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import io.flamingock.internal.common.core.context.ContextResolver;
import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.common.couchbase.CouchbaseUtils;
import io.flamingock.internal.core.builder.FlamingockEdition;
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
import io.flamingock.internal.core.transaction.TransactionManager;
import io.flamingock.internal.core.transaction.TransactionWrapper;
import java.util.function.Supplier;

import java.util.Objects;
import java.util.Optional;
Comment on lines +33 to 36

import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;

public class CouchbaseTargetSystem extends TransactionalTargetSystem<CouchbaseTargetSystem> implements CouchbaseExternalSystem {

Expand Down Expand Up @@ -79,11 +82,18 @@ public void initialize(ContextResolver baseContext) {
targetSystemContext.addDependency(bucket);


TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(null); //TODO: update as needed
Supplier<TransactionAttemptContext> couchbaseTxSupplier = () -> {
throw new FlamingockException(
"Couchbase TransactionAttemptContext can only be obtained inside cluster.transactions().run(); "
+ "the wrapper must register the session via TransactionManager.startSession(sessionId, ctx).");
};
TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(couchbaseTxSupplier);
txWrapper = new CouchbaseTxWrapper(cluster, txManager);

//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
auditMarker = edition == COMMUNITY
? new NoOpTargetSystemAuditMarker(this.getId())
: CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager).build();
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.targetsystem.couchbase;

import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.transactions.TransactionAttemptContext;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
import io.flamingock.internal.core.transaction.TransactionManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.couchbase.BucketDefinition;
import org.testcontainers.couchbase.CouchbaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Direct write-verification for {@link CouchbaseTargetSystemAuditMarker}. The existing
* {@code CouchbaseTargetSystemTest} only asserts the end-state marker count is zero, which is
* also satisfied when {@code mark()} is a no-op. This test exercises {@code mark()},
* {@code listAll()} and {@code clearMark()} through their production code paths, proving the
* write actually persists.
*
* <p>Couchbase's marker {@code mark()} requires a live {@link TransactionAttemptContext} from
* the {@link TransactionManager}. Each call is therefore wrapped in a real Couchbase transaction
* (`cluster.transactions().run(...)`), registering the attempt context under the changeId before
* invoking the marker and unregistering after.
*/
@Testcontainers
public class CouchbaseTargetSystemAuditMarkerTest {

private static final String BUCKET_NAME = "test";
private static final String SCOPE_NAME = CollectionIdentifier.DEFAULT_SCOPE;
private static final String MARKER_COLLECTION = "flamingockAuditMarkerTest";

@Container
public static final CouchbaseContainer couchbaseContainer = new CouchbaseContainer("couchbase/server:7.2.4")
.withBucket(new BucketDefinition(BUCKET_NAME));

private static Cluster cluster;
private static Bucket bucket;

private TransactionManager<TransactionAttemptContext> txManager;
private CouchbaseTargetSystemAuditMarker marker;

@BeforeAll
static void beforeAll() {
couchbaseContainer.start();
cluster = Cluster.connect(
couchbaseContainer.getConnectionString(),
couchbaseContainer.getUsername(),
couchbaseContainer.getPassword());

bucket = cluster.bucket(BUCKET_NAME);
bucket.waitUntilReady(Duration.ofSeconds(10));
}

@BeforeEach
void beforeEach() {
txManager = new TransactionManager<>(() -> {
throw new UnsupportedOperationException(
"Supplier is unused: Couchbase tests register the TransactionAttemptContext explicitly via startSession(id, ctx)");
});

marker = CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager)
.withScopeName(SCOPE_NAME)
.withCollectionName(MARKER_COLLECTION)
.withAutoCreate(true)
.build();

// Start each test from an empty marker collection.
clearAll();
}

@AfterEach
void afterEach() {
clearAll();
}

@Test
@DisplayName("mark() persists each mark and listAll() returns them with the right contents")
void markPersistsAndIsReadableViaListAll() {
String changeId1 = "change-1";
String changeId2 = "change-2";

markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED);
markInTransaction(changeId2, TargetSystemAuditMarkType.ROLLED_BACK);

Set<TargetSystemAuditMark> marks = marker.listAll();
Assertions.assertEquals(2, marks.size());

Map<String, TargetSystemAuditMarkType> byId = marks.stream()
.collect(Collectors.toMap(TargetSystemAuditMark::getChangeId,
TargetSystemAuditMark::getOperation));
Assertions.assertEquals(TargetSystemAuditMarkType.APPLIED, byId.get(changeId1));
Assertions.assertEquals(TargetSystemAuditMarkType.ROLLED_BACK, byId.get(changeId2));
}

@Test
@DisplayName("clearMark() removes only the targeted mark")
void clearMarkRemovesOnlyTheTargetedMark() {
String changeId1 = "change-1";
String changeId2 = "change-2";
markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED);
markInTransaction(changeId2, TargetSystemAuditMarkType.APPLIED);

marker.clearMark(changeId1);

Set<TargetSystemAuditMark> marks = marker.listAll();
Assertions.assertEquals(1, marks.size());
Assertions.assertEquals(changeId2, marks.iterator().next().getChangeId());
}

private void markInTransaction(String changeId, TargetSystemAuditMarkType operation) {
cluster.transactions().run(ctx -> {
txManager.startSession(changeId, ctx);
marker.mark(new TargetSystemAuditMark(changeId, operation));
txManager.closeSession(changeId);
});
}

private void clearAll() {
for (TargetSystemAuditMark mark : marker.listAll()) {
marker.clearMark(mark.getChangeId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void afterEach() throws Exception {
//tear down
mockRunnerServer.stop();
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CLIENTS_COLLECTION);
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME);
}

@Test
Expand Down Expand Up @@ -182,14 +183,12 @@ void happyPath() {

// check clients changes
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 1);
//TODO add when cloud added
// check ongoing status
// couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
}
}

@Test
@Disabled("adapt when adding cloud support")
@DisplayName("Should rollback the ongoing deletion when a change fails")
void failedChanges() {
String executionId = "execution-1";
Expand Down Expand Up @@ -230,16 +229,15 @@ void failedChanges() {
.build();

//THEN
mockRunnerServer.verifyAllCalls();

OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);

mockRunnerServer.verifyAllCalls();

// check clients changes
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 0);

//TODO when cloud enabled
// check ongoing status
// couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;

public class DynamoDBTargetSystem extends TransactionalTargetSystem<DynamoDBTargetSystem> implements DynamoDBExternalSystem {

Expand Down Expand Up @@ -63,14 +64,13 @@ public void initialize(ContextResolver baseContext) {
this.validate();
targetSystemContext.addDependency(client);

FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class)
.orElse(FlamingockEdition.CLOUD);

TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder);
txWrapper = new DynamoDBTxWrapper(client, txManager);

//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
auditMarker = edition == COMMUNITY
? new NoOpTargetSystemAuditMarker(this.getId())
: DynamoDBTargetSystemAuditMarker.builder(client, txManager).build();
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void mark(TargetSystemAuditMark auditMark) {


public static class Builder {
protected static DynamoDBUtil dynamoDBUtil;
private final DynamoDBUtil dynamoDBUtil;
private final TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager;
private String tableName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME;
private boolean autoCreate = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ void afterEach() throws Exception {
//tear down
logger.info("Stopping Mock Server...");
mockRunnerServer.stop();
dynamoDBTestHelper.dropTable(UserEntity.tableName);
dynamoDBTestHelper.dropTable(dynamoDBTestHelper.tableName);
}

@Test
Expand Down Expand Up @@ -168,14 +170,12 @@ void happyPath() {
.table(UserEntity.tableName, TableSchema.fromBean(UserEntity.class)),
1);

//TODO when cloud enabled
// check ongoing status
// dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
}
}

@Test
@Disabled("adapt when adding cloud support")
@DisplayName("Should rollback the ongoing deletion when a change fails")
void failedChanges() {
String executionId = "execution-1";
Expand Down Expand Up @@ -216,10 +216,10 @@ void failedChanges() {
.build();

//THEN
mockRunnerServer.verifyAllCalls();

OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);

mockRunnerServer.verifyAllCalls();

// check clients changes
dynamoDBTestHelper.checkCount(
DynamoDbEnhancedClient.builder()
Expand Down
Loading
Loading