Skip to content

Commit a0b21a1

Browse files
authored
refactor: cloud MongoDB marker (#899)
1 parent d202d96 commit a0b21a1

6 files changed

Lines changed: 47 additions & 45 deletions

File tree

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ allprojects {
2222
extra["templateApiVersion"] = "1.3.2"
2323
extra["coreApiVersion"] = "1.3.1"
2424
extra["sqlVersion"] = "1.2.0"
25-
extra["mongodbTemplateVersion"] = "1.2.0"
25+
extra["mongodbTemplateVersion"] = "1.3.0-SNAPSHOT"
2626

2727
repositories {
2828
mavenLocal()

core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystemAuditMarker.java renamed to core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncAuditMarker.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,46 +22,51 @@
2222
import com.mongodb.client.MongoCollection;
2323
import com.mongodb.client.MongoDatabase;
2424
import com.mongodb.client.model.Filters;
25-
import io.flamingock.internal.core.transaction.TransactionManager;
26-
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
27-
import io.flamingock.internal.common.mongodb.MongoDBSyncCollectionHelper;
28-
import io.flamingock.internal.common.mongodb.MongoDBSyncDocumentHelper;
2925
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
3026
import io.flamingock.internal.common.mongodb.CollectionInitializator;
27+
import io.flamingock.internal.common.mongodb.MongoDBSyncCollectionHelper;
28+
import io.flamingock.internal.common.mongodb.MongoDBSyncDocumentHelper;
3129
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
3230
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
31+
import io.flamingock.internal.core.transaction.TransactionManager;
32+
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
3333
import org.bson.Document;
3434

3535
import java.util.HashSet;
3636
import java.util.Set;
3737

3838

39-
public class MongoDBSyncTargetSystemAuditMarker implements TargetSystemAuditMarker {
39+
public class MongoDBSyncAuditMarker implements TargetSystemAuditMarker {
4040
public static final String OPERATION = "operation";
4141
private static final String CHANGE_ID = "changeId";
42-
private final MongoCollection<Document> onGoingChangeStatusCollection;
42+
private final MongoCollection<Document> auditMarkerCollection;
4343
private final TransactionManager<ClientSession> txManager;
4444

45+
public MongoDBSyncAuditMarker(MongoCollection<Document> auditMarkerCollection,
46+
TransactionManager<ClientSession> txManager) {
47+
this.auditMarkerCollection = auditMarkerCollection;
48+
this.txManager = txManager;
49+
}
50+
4551
public static Builder builder(MongoDatabase mongoDatabase, TransactionManager<ClientSession> txManager) {
4652
return new Builder(mongoDatabase, txManager);
4753
}
4854

49-
public MongoDBSyncTargetSystemAuditMarker(MongoCollection<Document> onGoingChangeStatusCollection,
50-
TransactionManager<ClientSession> txManager) {
51-
this.onGoingChangeStatusCollection = onGoingChangeStatusCollection;
52-
this.txManager = txManager;
55+
public static TargetSystemAuditMark mapToOnGoingStatus(Document document) {
56+
TargetSystemAuditMarkType operation = TargetSystemAuditMarkType.valueOf(document.getString(OPERATION));
57+
return new TargetSystemAuditMark(document.getString(CHANGE_ID), operation);
5358
}
5459

5560
@Override
5661
public Set<TargetSystemAuditMark> listAll() {
57-
return onGoingChangeStatusCollection.find()
58-
.map(MongoDBSyncTargetSystemAuditMarker::mapToOnGoingStatus)
62+
return auditMarkerCollection.find()
63+
.map(MongoDBSyncAuditMarker::mapToOnGoingStatus)
5964
.into(new HashSet<>());
6065
}
6166

6267
@Override
6368
public void clearMark(String changeId) {
64-
onGoingChangeStatusCollection.deleteMany(Filters.eq(CHANGE_ID, changeId));
69+
auditMarkerCollection.deleteMany(Filters.eq(CHANGE_ID, changeId));
6570
}
6671

6772
@Override
@@ -74,19 +79,13 @@ public void mark(TargetSystemAuditMark auditMark) {
7479
.append(OPERATION, auditMark.getOperation().name());
7580

7681
ClientSession clientSession = txManager.getSessionOrThrow(auditMark.getChangeId());
77-
onGoingChangeStatusCollection.updateOne(
82+
auditMarkerCollection.updateOne(
7883
clientSession,
7984
filter,
8085
new Document("$set", newDocument),
8186
new com.mongodb.client.model.UpdateOptions().upsert(true));
8287
}
8388

84-
public static TargetSystemAuditMark mapToOnGoingStatus(Document document) {
85-
TargetSystemAuditMarkType operation = TargetSystemAuditMarkType.valueOf(document.getString(OPERATION));
86-
return new TargetSystemAuditMark(document.getString(CHANGE_ID), operation);
87-
}
88-
89-
9089
public static class Builder {
9190

9291
private final MongoDatabase mongoDatabase;
@@ -127,7 +126,7 @@ public Builder withWriteConcern(WriteConcern writeConcern) {
127126
return this;
128127
}
129128

130-
public MongoDBSyncTargetSystemAuditMarker build() {
129+
public MongoDBSyncAuditMarker build() {
131130
MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName)
132131
// .withReadConcern(readConcern)
133132
// .withReadPreference(readPreference)
@@ -144,7 +143,7 @@ public MongoDBSyncTargetSystemAuditMarker build() {
144143
} else {
145144
initializer.justValidateCollection();
146145
}
147-
return new MongoDBSyncTargetSystemAuditMarker(collection, txManager);
146+
return new MongoDBSyncAuditMarker(collection, txManager);
148147
}
149148
}
150149
}

core/target-systems/flamingock-mongodb-sync-targetsystem/src/main/java/io/flamingock/targetsystem/mongodb/sync/MongoDBSyncTargetSystem.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,25 @@
2121
import com.mongodb.client.ClientSession;
2222
import com.mongodb.client.MongoClient;
2323
import com.mongodb.client.MongoDatabase;
24+
import io.flamingock.externalsystem.mongodb.api.MongoDBExternalSystem;
25+
import io.flamingock.importer.mongock.mongodb.MongockImporterMongoDB;
2426
import io.flamingock.internal.common.core.audit.AuditHistoryReader;
2527
import io.flamingock.internal.common.core.audit.AuditReaderType;
2628
import io.flamingock.internal.common.core.context.ContextResolver;
2729
import io.flamingock.internal.common.core.error.FlamingockException;
28-
import io.flamingock.internal.core.transaction.TransactionManager;
29-
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
30+
import io.flamingock.internal.core.builder.FlamingockEdition;
3031
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
32+
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
33+
import io.flamingock.internal.core.transaction.TransactionManager;
3134
import io.flamingock.internal.core.transaction.TransactionWrapper;
32-
import io.flamingock.importer.mongock.mongodb.MongockImporterMongoDB;
33-
import io.flamingock.externalsystem.mongodb.api.MongoDBExternalSystem;
3435

3536
import java.util.Objects;
3637
import java.util.Optional;
3738

3839
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
3940
import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN;
4041
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
42+
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;
4143

4244
public class MongoDBSyncTargetSystem extends TransactionalTargetSystem<MongoDBSyncTargetSystem> implements MongoDBExternalSystem {
4345

@@ -110,16 +112,18 @@ public void initialize(ContextResolver baseContext) {
110112
this.validate();
111113
targetSystemContext.addDependency(mongoClient);
112114
database = mongoClient.getDatabase(databaseName)
113-
.withReadConcern(readConcern)
114-
.withReadPreference(readPreference)
115-
.withWriteConcern(writeConcern);
115+
.withReadConcern(readConcern)
116+
.withReadPreference(readPreference)
117+
.withWriteConcern(writeConcern);
116118
targetSystemContext.addDependency(database);
117119

118120
TransactionManager<ClientSession> txManager = new TransactionManager<>(mongoClient::startSession);
119121
txWrapper = new MongoDBSyncTxWrapper(txManager);
122+
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
120123

121-
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
122-
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
124+
auditMarker = edition == COMMUNITY
125+
? new NoOpTargetSystemAuditMarker(this.getId())
126+
:MongoDBSyncAuditMarker.builder(database, txManager).build();
123127
}
124128

125129
private void validate() {

core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystemAuditMarker.java renamed to core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlAuditMarker.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.Set;
3434

3535

36-
public class SqlTargetSystemAuditMarker implements TargetSystemAuditMarker {
36+
public class SqlAuditMarker implements TargetSystemAuditMarker {
3737

3838
private final String tableName;
3939
private final DataSource dataSource;
@@ -44,10 +44,10 @@ public static Builder builder(DataSource dataSource, TransactionManager<Connecti
4444
return new Builder(dataSource, txManager);
4545
}
4646

47-
private SqlTargetSystemAuditMarker(DataSource dataSource,
48-
String tableName,
49-
TransactionManager<Connection> txManager,
50-
SqlAuditMarkerDialectHelper dialectHelper) {
47+
private SqlAuditMarker(DataSource dataSource,
48+
String tableName,
49+
TransactionManager<Connection> txManager,
50+
SqlAuditMarkerDialectHelper dialectHelper) {
5151
this.dataSource = dataSource;
5252
this.tableName = tableName;
5353
this.txManager = txManager;
@@ -130,7 +130,7 @@ public Builder withSqlDialect(SqlDialect sqlDialect) {
130130
return this;
131131
}
132132

133-
public SqlTargetSystemAuditMarker build() {
133+
public SqlAuditMarker build() {
134134
try (Connection connection = dataSource.getConnection()) {
135135
this.dialectHelper = new SqlAuditMarkerDialectHelper(connection);
136136
} catch (SQLException ex) {
@@ -139,7 +139,7 @@ public SqlTargetSystemAuditMarker build() {
139139
if (autoCreate) {
140140
createTableIfNotExists();
141141
}
142-
return new SqlTargetSystemAuditMarker(dataSource, tableName, txManager, dialectHelper);
142+
return new SqlAuditMarker(dataSource, tableName, txManager, dialectHelper);
143143
}
144144

145145
private void createTableIfNotExists() {

core/target-systems/flamingock-sql-targetsystem/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.sql.Connection;
3030
import java.sql.SQLException;
3131

32-
import static io.flamingock.internal.core.builder.FlamingockEdition.CLOUD;
3332
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;
3433

3534
public class SqlTargetSystem extends TransactionalTargetSystem<SqlTargetSystem> implements SqlExternalSystem {
@@ -58,9 +57,9 @@ public void initialize(ContextResolver baseContext) {
5857

5958
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
6059
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
61-
auditMarker = edition == CLOUD
62-
? SqlTargetSystemAuditMarker.builder(dataSource, txManager).build()
63-
: new NoOpTargetSystemAuditMarker(this.getId());
60+
auditMarker = edition == COMMUNITY
61+
? new NoOpTargetSystemAuditMarker(this.getId())
62+
: SqlAuditMarker.builder(dataSource, txManager).build();
6463

6564
}
6665

core/target-systems/flamingock-sql-targetsystem/src/test/java/io/flamingock/targetsystem/sql/SqlAuditMarkerDialectHelperTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class SqlAuditMarkerDialectHelperTest {
4545
private static final String ONGOING_CHANGES_TABLE = "FLAMINGOCK_ONGOING_CHANGES";
4646

4747
private DataSource dataSource;
48-
private SqlTargetSystemAuditMarker sqlTargetSystemAuditMarker;
48+
private SqlAuditMarker sqlTargetSystemAuditMarker;
4949
private TransactionManager<Connection> txManager;
5050

5151
private JdbcDatabaseContainer<?> createContainerForDialect(SqlDialect dialect) {
@@ -126,7 +126,7 @@ private void initForDialect(SqlDialect dialect, JdbcDatabaseContainer<?> contain
126126
}
127127
});
128128

129-
sqlTargetSystemAuditMarker = SqlTargetSystemAuditMarker.builder(dataSource, txManager)
129+
sqlTargetSystemAuditMarker = SqlAuditMarker.builder(dataSource, txManager)
130130
.withTableName(ONGOING_CHANGES_TABLE)
131131
.build();
132132
}

0 commit comments

Comments
 (0)