Skip to content

Commit 506dfba

Browse files
feat: add couchbase, dynamodb and mongodbspringdata marker (#902)
1 parent 8673961 commit 506dfba

13 files changed

Lines changed: 595 additions & 33 deletions

File tree

core/target-systems/flamingock-couchbase-targetsystem/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
import io.flamingock.internal.common.core.context.ContextResolver;
2626
import io.flamingock.internal.common.core.error.FlamingockException;
2727
import io.flamingock.internal.common.couchbase.CouchbaseUtils;
28+
import io.flamingock.internal.core.builder.FlamingockEdition;
2829
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
2930
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
3031
import io.flamingock.internal.core.transaction.TransactionManager;
3132
import io.flamingock.internal.core.transaction.TransactionWrapper;
33+
import java.util.function.Supplier;
3234

3335
import java.util.Objects;
3436
import java.util.Optional;
3537

3638
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
3739
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
40+
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;
3841

3942
public class CouchbaseTargetSystem extends TransactionalTargetSystem<CouchbaseTargetSystem> implements CouchbaseExternalSystem {
4043

@@ -79,11 +82,18 @@ public void initialize(ContextResolver baseContext) {
7982
targetSystemContext.addDependency(bucket);
8083

8184

82-
TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(null); //TODO: update as needed
85+
Supplier<TransactionAttemptContext> couchbaseTxSupplier = () -> {
86+
throw new FlamingockException(
87+
"Couchbase TransactionAttemptContext can only be obtained inside cluster.transactions().run(); "
88+
+ "the wrapper must register the session via TransactionManager.startSession(sessionId, ctx).");
89+
};
90+
TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(couchbaseTxSupplier);
8391
txWrapper = new CouchbaseTxWrapper(cluster, txManager);
8492

85-
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
86-
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
93+
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
94+
auditMarker = edition == COMMUNITY
95+
? new NoOpTargetSystemAuditMarker(this.getId())
96+
: CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager).build();
8797
}
8898

8999
private void validate() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2026 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.targetsystem.couchbase;
17+
18+
import com.couchbase.client.core.io.CollectionIdentifier;
19+
import com.couchbase.client.java.Bucket;
20+
import com.couchbase.client.java.Cluster;
21+
import com.couchbase.client.java.transactions.TransactionAttemptContext;
22+
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
23+
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
24+
import io.flamingock.internal.core.transaction.TransactionManager;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.Assertions;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.DisplayName;
30+
import org.junit.jupiter.api.Test;
31+
import org.testcontainers.couchbase.BucketDefinition;
32+
import org.testcontainers.couchbase.CouchbaseContainer;
33+
import org.testcontainers.junit.jupiter.Container;
34+
import org.testcontainers.junit.jupiter.Testcontainers;
35+
36+
import java.time.Duration;
37+
import java.util.Map;
38+
import java.util.Set;
39+
import java.util.stream.Collectors;
40+
41+
/**
42+
* Direct write-verification for {@link CouchbaseTargetSystemAuditMarker}. The existing
43+
* {@code CouchbaseTargetSystemTest} only asserts the end-state marker count is zero, which is
44+
* also satisfied when {@code mark()} is a no-op. This test exercises {@code mark()},
45+
* {@code listAll()} and {@code clearMark()} through their production code paths, proving the
46+
* write actually persists.
47+
*
48+
* <p>Couchbase's marker {@code mark()} requires a live {@link TransactionAttemptContext} from
49+
* the {@link TransactionManager}. Each call is therefore wrapped in a real Couchbase transaction
50+
* (`cluster.transactions().run(...)`), registering the attempt context under the changeId before
51+
* invoking the marker and unregistering after.
52+
*/
53+
@Testcontainers
54+
public class CouchbaseTargetSystemAuditMarkerTest {
55+
56+
private static final String BUCKET_NAME = "test";
57+
private static final String SCOPE_NAME = CollectionIdentifier.DEFAULT_SCOPE;
58+
private static final String MARKER_COLLECTION = "flamingockAuditMarkerTest";
59+
60+
@Container
61+
public static final CouchbaseContainer couchbaseContainer = new CouchbaseContainer("couchbase/server:7.2.4")
62+
.withBucket(new BucketDefinition(BUCKET_NAME));
63+
64+
private static Cluster cluster;
65+
private static Bucket bucket;
66+
67+
private TransactionManager<TransactionAttemptContext> txManager;
68+
private CouchbaseTargetSystemAuditMarker marker;
69+
70+
@BeforeAll
71+
static void beforeAll() {
72+
couchbaseContainer.start();
73+
cluster = Cluster.connect(
74+
couchbaseContainer.getConnectionString(),
75+
couchbaseContainer.getUsername(),
76+
couchbaseContainer.getPassword());
77+
78+
bucket = cluster.bucket(BUCKET_NAME);
79+
bucket.waitUntilReady(Duration.ofSeconds(10));
80+
}
81+
82+
@BeforeEach
83+
void beforeEach() {
84+
txManager = new TransactionManager<>(() -> {
85+
throw new UnsupportedOperationException(
86+
"Supplier is unused: Couchbase tests register the TransactionAttemptContext explicitly via startSession(id, ctx)");
87+
});
88+
89+
marker = CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager)
90+
.withScopeName(SCOPE_NAME)
91+
.withCollectionName(MARKER_COLLECTION)
92+
.withAutoCreate(true)
93+
.build();
94+
95+
// Start each test from an empty marker collection.
96+
clearAll();
97+
}
98+
99+
@AfterEach
100+
void afterEach() {
101+
clearAll();
102+
}
103+
104+
@Test
105+
@DisplayName("mark() persists each mark and listAll() returns them with the right contents")
106+
void markPersistsAndIsReadableViaListAll() {
107+
String changeId1 = "change-1";
108+
String changeId2 = "change-2";
109+
110+
markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED);
111+
markInTransaction(changeId2, TargetSystemAuditMarkType.ROLLED_BACK);
112+
113+
Set<TargetSystemAuditMark> marks = marker.listAll();
114+
Assertions.assertEquals(2, marks.size());
115+
116+
Map<String, TargetSystemAuditMarkType> byId = marks.stream()
117+
.collect(Collectors.toMap(TargetSystemAuditMark::getChangeId,
118+
TargetSystemAuditMark::getOperation));
119+
Assertions.assertEquals(TargetSystemAuditMarkType.APPLIED, byId.get(changeId1));
120+
Assertions.assertEquals(TargetSystemAuditMarkType.ROLLED_BACK, byId.get(changeId2));
121+
}
122+
123+
@Test
124+
@DisplayName("clearMark() removes only the targeted mark")
125+
void clearMarkRemovesOnlyTheTargetedMark() {
126+
String changeId1 = "change-1";
127+
String changeId2 = "change-2";
128+
markInTransaction(changeId1, TargetSystemAuditMarkType.APPLIED);
129+
markInTransaction(changeId2, TargetSystemAuditMarkType.APPLIED);
130+
131+
marker.clearMark(changeId1);
132+
133+
Set<TargetSystemAuditMark> marks = marker.listAll();
134+
Assertions.assertEquals(1, marks.size());
135+
Assertions.assertEquals(changeId2, marks.iterator().next().getChangeId());
136+
}
137+
138+
private void markInTransaction(String changeId, TargetSystemAuditMarkType operation) {
139+
cluster.transactions().run(ctx -> {
140+
txManager.startSession(changeId, ctx);
141+
marker.mark(new TargetSystemAuditMark(changeId, operation));
142+
txManager.closeSession(changeId);
143+
});
144+
}
145+
146+
private void clearAll() {
147+
for (TargetSystemAuditMark mark : marker.listAll()) {
148+
marker.clearMark(mark.getChangeId());
149+
}
150+
}
151+
}

core/target-systems/flamingock-couchbase-targetsystem/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ void afterEach() throws Exception {
136136
//tear down
137137
mockRunnerServer.stop();
138138
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CLIENTS_COLLECTION);
139+
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME);
139140
}
140141

141142
@Test
@@ -182,14 +183,12 @@ void happyPath() {
182183

183184
// check clients changes
184185
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 1);
185-
//TODO add when cloud added
186186
// check ongoing status
187-
// couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
187+
couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
188188
}
189189
}
190190

191191
@Test
192-
@Disabled("adapt when adding cloud support")
193192
@DisplayName("Should rollback the ongoing deletion when a change fails")
194193
void failedChanges() {
195194
String executionId = "execution-1";
@@ -230,16 +229,15 @@ void failedChanges() {
230229
.build();
231230

232231
//THEN
233-
mockRunnerServer.verifyAllCalls();
234-
235232
OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);
236233

234+
mockRunnerServer.verifyAllCalls();
235+
237236
// check clients changes
238237
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 0);
239238

240-
//TODO when cloud enabled
241239
// check ongoing status
242-
// couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
240+
couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
243241
}
244242
}
245243

core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
3636
import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN;
3737
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
38+
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;
3839

3940
public class DynamoDBTargetSystem extends TransactionalTargetSystem<DynamoDBTargetSystem> implements DynamoDBExternalSystem {
4041

@@ -63,14 +64,13 @@ public void initialize(ContextResolver baseContext) {
6364
this.validate();
6465
targetSystemContext.addDependency(client);
6566

66-
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class)
67-
.orElse(FlamingockEdition.CLOUD);
68-
6967
TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder);
7068
txWrapper = new DynamoDBTxWrapper(client, txManager);
7169

72-
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
73-
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
70+
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
71+
auditMarker = edition == COMMUNITY
72+
? new NoOpTargetSystemAuditMarker(this.getId())
73+
: DynamoDBTargetSystemAuditMarker.builder(client, txManager).build();
7474
}
7575

7676
private void validate() {

core/target-systems/flamingock-dynamodb-targetsystem/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystemAuditMarker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void mark(TargetSystemAuditMark auditMark) {
9494

9595

9696
public static class Builder {
97-
protected static DynamoDBUtil dynamoDBUtil;
97+
private final DynamoDBUtil dynamoDBUtil;
9898
private final TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager;
9999
private String tableName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME;
100100
private boolean autoCreate = true;

core/target-systems/flamingock-dynamodb-targetsystem/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ void afterEach() throws Exception {
115115
//tear down
116116
logger.info("Stopping Mock Server...");
117117
mockRunnerServer.stop();
118+
dynamoDBTestHelper.dropTable(UserEntity.tableName);
119+
dynamoDBTestHelper.dropTable(dynamoDBTestHelper.tableName);
118120
}
119121

120122
@Test
@@ -168,14 +170,12 @@ void happyPath() {
168170
.table(UserEntity.tableName, TableSchema.fromBean(UserEntity.class)),
169171
1);
170172

171-
//TODO when cloud enabled
172173
// check ongoing status
173-
// dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
174+
dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
174175
}
175176
}
176177

177178
@Test
178-
@Disabled("adapt when adding cloud support")
179179
@DisplayName("Should rollback the ongoing deletion when a change fails")
180180
void failedChanges() {
181181
String executionId = "execution-1";
@@ -216,10 +216,10 @@ void failedChanges() {
216216
.build();
217217

218218
//THEN
219-
mockRunnerServer.verifyAllCalls();
220-
221219
OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);
222220

221+
mockRunnerServer.verifyAllCalls();
222+
223223
// check clients changes
224224
dynamoDBTestHelper.checkCount(
225225
DynamoDbEnhancedClient.builder()

0 commit comments

Comments
 (0)