@@ -60,6 +60,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
6060 static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors" ;
6161 static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors" ;
6262 static final Duration DEFAULT_LEASE_TIMEOUT = Duration .ofMinutes (10 );
63+ static final Duration DEFAULT_RENEW_TIMEOUT = Duration .ofMinutes (3 );
6364
6465 private static final String hostName ;
6566 static final String PARTITION_TYPE = "PARTITION" ;
@@ -204,7 +205,17 @@ private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map
204205
205206 @ Override
206207 public void createPartitions (final List <PartitionIdentifier > partitionIdentifiers ) {
208+ Instant lastOwnershipRenewal = Instant .now ();
209+
207210 for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers ) {
211+ if (Instant .now ().isAfter (lastOwnershipRenewal .plus (DEFAULT_RENEW_TIMEOUT ))) {
212+ LOG .info ("Renewing global state ownership for partition creation" );
213+ renewGlobalStateOwnershipForPartitionCreation ();
214+ lastOwnershipRenewal = Instant .now ();
215+ }
216+
217+
218+
208219 final Optional <SourcePartitionStoreItem > optionalPartitionItem = sourceCoordinationStore .getSourcePartitionItem (sourceIdentifierWithPartitionType , partitionIdentifier .getPartitionKey ());
209220
210221 if (optionalPartitionItem .isPresent ()) {
@@ -515,6 +526,20 @@ private Optional<SourcePartitionStoreItem> acquireGlobalStateForPartitionCreatio
515526 return globalStateItemForPartitionCreation ;
516527 }
517528
529+ private void renewGlobalStateOwnershipForPartitionCreation () {
530+ try {
531+ final Optional <SourcePartitionStoreItem > globalStateItem = sourceCoordinationStore .getSourcePartitionItem (
532+ sourceIdentifierWithGlobalStateType , GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS );
533+
534+ if (globalStateItem .isPresent () && ownerId .equals (globalStateItem .get ().getPartitionOwner ())) {
535+ globalStateItem .get ().setPartitionOwnershipTimeout (Instant .now ().plus (DEFAULT_LEASE_TIMEOUT ));
536+ sourceCoordinationStore .tryUpdateSourcePartitionItem (globalStateItem .get ());
537+ }
538+ } catch (final Exception e ) {
539+ LOG .warn ("Failed to renew global state ownership for partition creation" , e );
540+ }
541+ }
542+
518543 private void giveUpAndSaveGlobalStateForPartitionCreation (final SourcePartitionStoreItem globalStateItemForPartitionCreation , final Map <String , Object > globalStateMap ) {
519544
520545 globalStateItemForPartitionCreation .setSourcePartitionStatus (SourcePartitionStatus .UNASSIGNED );
0 commit comments