Skip to content

Commit 8827ebf

Browse files
authored
Add ExportScheduler and LeaderScheduler for MongoDB/DocumentDB source (#4277)
* Add ExportScheduler and LeaderScheduler for MongoDB/DocumentDB source Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Update access modifier for static field Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Unit test updates Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Add MongoDB Export Partition supplier Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Logging collection when creating scheduler Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent c705c09 commit 8827ebf

9 files changed

Lines changed: 902 additions & 4 deletions

File tree

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/ExportPartition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ public ExportPartition(final SourcePartitionStoreItem sourcePartitionStoreItem)
3939
}
4040

4141
public ExportPartition(final String collection, final int partitionSize, final Instant exportTime,
42-
final Optional<ExportProgressState> state) {
42+
final ExportProgressState state) {
4343
this.collection = collection;
4444
this.partitionSize = partitionSize;
4545
this.exportTime = exportTime;
46-
this.state = state.orElse(null);
46+
this.state = state;
4747

4848
}
4949

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/GlobalState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) {
2929
this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState());
3030
}
3131

32-
public GlobalState(String stateName, Optional<Map<String, Object>> state) {
32+
public GlobalState(String stateName, Map<String, Object> state) {
3333
this.stateName = stateName;
34-
this.state = state.orElse(null);
34+
this.state = state;
3535

3636
}
3737

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package org.opensearch.dataprepper.plugins.mongo.export;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import org.opensearch.dataprepper.metrics.PluginMetrics;
5+
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
6+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
7+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
8+
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
9+
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
10+
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
11+
import org.opensearch.dataprepper.plugins.mongo.coordination.state.DataQueryProgressState;
12+
import org.opensearch.dataprepper.plugins.mongo.model.LoadStatus;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.time.Instant;
17+
import java.util.List;
18+
import java.util.Optional;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
public class ExportScheduler implements Runnable {
22+
public static final String EXPORT_PREFIX = "EXPORT-";
23+
private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
24+
private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
25+
static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
26+
static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
27+
static final String EXPORT_PARTITION_QUERY_TOTAL_COUNT = "exportPartitionQueryTotal";
28+
static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
29+
private final PluginMetrics pluginMetrics;
30+
private final EnhancedSourceCoordinator enhancedSourceCoordinator;
31+
private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
32+
private final Counter exportJobSuccessCounter;
33+
private final Counter exportJobFailureCounter;
34+
35+
private final Counter exportPartitionTotalCounter;
36+
private final Counter exportRecordsTotalCounter;
37+
38+
public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator,
39+
final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier,
40+
final PluginMetrics pluginMetrics) {
41+
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
42+
this.mongoDBExportPartitionSupplier = mongoDBExportPartitionSupplier;
43+
this.pluginMetrics = pluginMetrics;
44+
45+
exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT);
46+
exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT);
47+
exportPartitionTotalCounter = pluginMetrics.counter(EXPORT_PARTITION_QUERY_TOTAL_COUNT);
48+
exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
49+
}
50+
51+
@Override
52+
public void run() {
53+
LOG.info("Start running Export Scheduler");
54+
while (!Thread.currentThread().isInterrupted()) {
55+
try {
56+
final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);
57+
if (sourcePartition.isPresent()) {
58+
final ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
59+
LOG.info("Acquired an export partition: {}", exportPartition.getPartitionKey());
60+
61+
final List<PartitionIdentifier> partitionIdentifiers = mongoDBExportPartitionSupplier.apply(exportPartition);
62+
63+
createDataQueryPartitions(exportPartition.getPartitionKey(), Instant.now(), partitionIdentifiers);
64+
}
65+
try {
66+
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
67+
} catch (final InterruptedException e) {
68+
LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
69+
break;
70+
}
71+
} catch (final Exception e) {
72+
LOG.error("Received an exception during export from DocumentDB, backing off and retrying", e);
73+
try {
74+
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
75+
} catch (final InterruptedException ex) {
76+
LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
77+
break;
78+
}
79+
}
80+
}
81+
LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
82+
}
83+
84+
private void createDataQueryPartitions(final String collection,
85+
final Instant exportTime,
86+
final List<PartitionIdentifier> partitionIdentifiers) {
87+
AtomicInteger totalQueries = new AtomicInteger();
88+
partitionIdentifiers.forEach(partitionIdentifier -> {
89+
final DataQueryProgressState progressState = new DataQueryProgressState();
90+
progressState.setExecutedQueries(0);
91+
progressState.setLoadedRecords(0);
92+
progressState.setStartTime(exportTime.toEpochMilli());
93+
94+
totalQueries.getAndIncrement();
95+
final DataQueryPartition partition = new DataQueryPartition(partitionIdentifier.getPartitionKey(), progressState);
96+
enhancedSourceCoordinator.createPartition(partition);
97+
});
98+
99+
exportPartitionTotalCounter.increment(totalQueries.get());
100+
101+
// Currently, we need to maintain a global state to track the overall progress.
102+
// So that we can easily tell if all the export files are loaded
103+
final LoadStatus loadStatus = new LoadStatus(totalQueries.get(), 0);
104+
enhancedSourceCoordinator.createPartition(new GlobalState(EXPORT_PREFIX + collection, loadStatus.toMap()));
105+
}
106+
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.mongo.export;
7+
8+
import com.mongodb.client.FindIterable;
9+
import com.mongodb.client.MongoClient;
10+
import com.mongodb.client.MongoCollection;
11+
import com.mongodb.client.MongoCursor;
12+
import com.mongodb.client.MongoDatabase;
13+
import com.mongodb.client.model.Filters;
14+
import org.bson.Document;
15+
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
16+
import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection;
17+
import org.opensearch.dataprepper.plugins.mongo.client.BsonHelper;
18+
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
19+
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.function.Function;
26+
27+
public class MongoDBExportPartitionSupplier implements Function<ExportPartition, List<PartitionIdentifier>> {
28+
private static final Logger LOG = LoggerFactory.getLogger(MongoDBExportPartitionSupplier.class);
29+
private static final String MONGODB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: <db.collection>|<gte>|<lt>|<className>
30+
private static final String COLLECTION_SPLITTER = "\\.";
31+
32+
private final MongoDBSourceConfig sourceConfig;
33+
34+
public MongoDBExportPartitionSupplier(final MongoDBSourceConfig sourceConfig) {
35+
this.sourceConfig = sourceConfig;
36+
}
37+
38+
private List<PartitionIdentifier> buildPartitions(final ExportPartition exportPartition) {
39+
final List<PartitionIdentifier> collectionPartitions = new ArrayList<>();
40+
final String collectionDbName = exportPartition.getCollection();
41+
List<String> collection = List.of(collectionDbName.split(COLLECTION_SPLITTER));
42+
if (collection.size() < 2) {
43+
throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format");
44+
}
45+
try (MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) {
46+
final MongoDatabase db = mongoClient.getDatabase(collection.get(0));
47+
final MongoCollection<Document> col = db.getCollection(collection.get(1));
48+
final int partitionSize = exportPartition.getPartitionSize();
49+
FindIterable<Document> startIterable = col.find()
50+
.projection(new Document("_id", 1))
51+
.sort(new Document("_id", 1))
52+
.limit(1);
53+
while (!Thread.currentThread().isInterrupted()) {
54+
try (final MongoCursor<Document> startCursor = startIterable.iterator()) {
55+
if (!startCursor.hasNext()) {
56+
break;
57+
}
58+
final Document startDoc = startCursor.next();
59+
final Object gteValue = startDoc.get("_id");
60+
final String className = gteValue.getClass().getName();
61+
62+
// Get end doc
63+
Document endDoc = startIterable.skip(partitionSize - 1).limit(1).first();
64+
if (endDoc == null) {
65+
// this means we have reached the end of the doc
66+
endDoc = col.find()
67+
.projection(new Document("_id", 1))
68+
.sort(new Document("_id", -1))
69+
.limit(1)
70+
.first();
71+
}
72+
if (endDoc == null) {
73+
break;
74+
}
75+
76+
final Object lteValue = endDoc.get("_id");
77+
final String gteValueString = BsonHelper.getPartitionStringFromMongoDBId(gteValue, className);
78+
final String lteValueString = BsonHelper.getPartitionStringFromMongoDBId(lteValue, className);
79+
LOG.info("Partition of " + collectionDbName + ": {gte: " + gteValueString + ", lte: " + lteValueString + "}");
80+
collectionPartitions.add(
81+
PartitionIdentifier
82+
.builder()
83+
.withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionDbName, gteValueString, lteValueString, className))
84+
.build());
85+
86+
startIterable = col.find(Filters.gt("_id", lteValue))
87+
.projection(new Document("_id", 1))
88+
.sort(new Document("_id", 1))
89+
.limit(1);
90+
} catch (Exception e) {
91+
LOG.error("Failed to read start cursor when build partitions", e);
92+
throw new RuntimeException(e);
93+
}
94+
}
95+
}
96+
return collectionPartitions;
97+
}
98+
99+
@Override
100+
public List<PartitionIdentifier> apply(final ExportPartition exportPartition) {
101+
return buildPartitions(exportPartition);
102+
}
103+
}

0 commit comments

Comments
 (0)