Skip to content

Commit 550fc20

Browse files
committed
Add global state ownership renewal during partition creation
Signed-off-by: Siqi Ding <dingdd@amazon.com>
1 parent 359b7d0 commit 550fc20

1 file changed

Lines changed: 25 additions & 0 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
6060
static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors";
6161
static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors";
6262
static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10);
63+
static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3);
6364

6465
private static final String hostName;
6566
static final String PARTITION_TYPE = "PARTITION";
@@ -204,7 +205,17 @@ private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map
204205

205206
@Override
206207
public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
208+
Instant lastOwnershipRenewal = Instant.now();
209+
207210
for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
211+
if (Instant.now().isAfter(lastOwnershipRenewal.plus(DEFAULT_RENEW_TIMEOUT))) {
212+
LOG.info("Renewing global state ownership for partition creation");
213+
renewGlobalStateOwnershipForPartitionCreation();
214+
lastOwnershipRenewal = Instant.now();
215+
}
216+
217+
218+
208219
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey());
209220

210221
if (optionalPartitionItem.isPresent()) {
@@ -515,6 +526,20 @@ private Optional<SourcePartitionStoreItem> acquireGlobalStateForPartitionCreatio
515526
return globalStateItemForPartitionCreation;
516527
}
517528

529+
private void renewGlobalStateOwnershipForPartitionCreation() {
530+
try {
531+
final Optional<SourcePartitionStoreItem> globalStateItem = sourceCoordinationStore.getSourcePartitionItem(
532+
sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS);
533+
534+
if (globalStateItem.isPresent() && ownerId.equals(globalStateItem.get().getPartitionOwner())) {
535+
globalStateItem.get().setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
536+
sourceCoordinationStore.tryUpdateSourcePartitionItem(globalStateItem.get());
537+
}
538+
} catch (final Exception e) {
539+
LOG.warn("Failed to renew global state ownership for partition creation", e);
540+
}
541+
}
542+
518543
private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map<String, Object> globalStateMap) {
519544

520545
globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);

0 commit comments

Comments
 (0)