From e408930510d776be2c069462e3227d485c38a18b Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Tue, 24 Mar 2026 16:57:25 +0530 Subject: [PATCH 01/11] Stab at Fixing integ tests Signed-off-by: Mohit Kumar --- .../replication/ReplicationEngine.kt | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt index 94a284b21..84f0424be 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt @@ -11,32 +11,40 @@ package org.opensearch.replication +import org.opensearch.index.engine.DeletionStrategy +import org.opensearch.index.engine.Engine import org.opensearch.index.engine.EngineConfig +import org.opensearch.index.engine.IndexingStrategy import org.opensearch.index.engine.InternalEngine import org.opensearch.index.seqno.SequenceNumbers class ReplicationEngine(config: EngineConfig) : InternalEngine(config) { - override fun assertPrimaryIncomingSequenceNumber(origin: Operation.Origin, seqNo: Long): Boolean { - assert(origin == Operation.Origin.PRIMARY) { "Expected origin PRIMARY for replicated ops but was $origin" } - assert(seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicated op but was unassigned" } - return true + /** + * Replicated operations arrive with Origin.PRIMARY but must be planned using the + * non-primary path to avoid assertions in the primary planning flow (e.g. + * tryAcquireInFlightDocs) that don't apply to cross-cluster replicated ops. + */ + override fun indexingStrategyForOperation(index: Engine.Index): IndexingStrategy { + return planIndexingAsNonPrimary(index) } - override fun generateSeqNoForOperationOnPrimary(operation: Operation): Long { - check(operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicate op but was unassigned"} - return operation.seqNo() + override fun deletionStrategyForOperation(delete: Engine.Delete): DeletionStrategy { + return planDeletionAsNonPrimary(delete) } - override fun indexingStrategyForOperation(index: Index): IndexingStrategy { - return planIndexingAsNonPrimary(index) + override fun assertPrimaryIncomingSequenceNumber(origin: Engine.Operation.Origin, seqNo: Long): Boolean { + assert(origin == Engine.Operation.Origin.PRIMARY) { "Expected origin PRIMARY for replicated ops but was $origin" } + assert(seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicated op but was unassigned" } + return true } - override fun deletionStrategyForOperation(delete: Delete): DeletionStrategy { - return planDeletionAsNonPrimary(delete) + override fun generateSeqNoForOperationOnPrimary(operation: Engine.Operation): Long { + check(operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicate op but was unassigned"} + return operation.seqNo() } - override fun assertNonPrimaryOrigin(operation: Operation): Boolean { + override fun assertNonPrimaryOrigin(operation: Engine.Operation): Boolean { return true } } From ed33475178b74beee0204f579f748c512262cb54 Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Tue, 24 Mar 2026 17:43:31 +0530 Subject: [PATCH 02/11] Revert "Stab at Fixing integ tests" This reverts commit 0e4b126b6adbdb4558046c3b50cf746bc5293da7. Signed-off-by: Mohit Kumar --- .../replication/ReplicationEngine.kt | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt index 84f0424be..94a284b21 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt @@ -11,40 +11,32 @@ package org.opensearch.replication -import org.opensearch.index.engine.DeletionStrategy -import org.opensearch.index.engine.Engine import org.opensearch.index.engine.EngineConfig -import org.opensearch.index.engine.IndexingStrategy import org.opensearch.index.engine.InternalEngine import org.opensearch.index.seqno.SequenceNumbers class ReplicationEngine(config: EngineConfig) : InternalEngine(config) { - /** - * Replicated operations arrive with Origin.PRIMARY but must be planned using the - * non-primary path to avoid assertions in the primary planning flow (e.g. - * tryAcquireInFlightDocs) that don't apply to cross-cluster replicated ops. - */ - override fun indexingStrategyForOperation(index: Engine.Index): IndexingStrategy { - return planIndexingAsNonPrimary(index) - } - - override fun deletionStrategyForOperation(delete: Engine.Delete): DeletionStrategy { - return planDeletionAsNonPrimary(delete) - } - - override fun assertPrimaryIncomingSequenceNumber(origin: Engine.Operation.Origin, seqNo: Long): Boolean { - assert(origin == Engine.Operation.Origin.PRIMARY) { "Expected origin PRIMARY for replicated ops but was $origin" } + override fun assertPrimaryIncomingSequenceNumber(origin: Operation.Origin, seqNo: Long): Boolean { + assert(origin == Operation.Origin.PRIMARY) { "Expected origin PRIMARY for replicated ops but was $origin" } assert(seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicated op but was unassigned" } return true } - override fun generateSeqNoForOperationOnPrimary(operation: Engine.Operation): Long { + override fun generateSeqNoForOperationOnPrimary(operation: Operation): Long { check(operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicate op but was unassigned"} return operation.seqNo() } - override fun assertNonPrimaryOrigin(operation: Engine.Operation): Boolean { + override fun indexingStrategyForOperation(index: Index): IndexingStrategy { + return planIndexingAsNonPrimary(index) + } + + override fun deletionStrategyForOperation(delete: Delete): DeletionStrategy { + return planDeletionAsNonPrimary(delete) + } + + override fun assertNonPrimaryOrigin(operation: Operation): Boolean { return true } } From e9b381cd9fd0ff4ded56877133101b00d60e316f Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Tue, 24 Mar 2026 20:31:44 +0530 Subject: [PATCH 03/11] Stab at Fixing integ tests Signed-off-by: Mohit Kumar --- .../replication/ReplicationEngine.kt | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt index 94a284b21..f013828a2 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt @@ -11,7 +11,9 @@ package org.opensearch.replication +import org.opensearch.index.engine.DeletionStrategy import org.opensearch.index.engine.EngineConfig +import org.opensearch.index.engine.IndexingStrategy import org.opensearch.index.engine.InternalEngine import org.opensearch.index.seqno.SequenceNumbers @@ -28,12 +30,30 @@ class ReplicationEngine(config: EngineConfig) : InternalEngine(config) { return operation.seqNo() } + /** + * Route all index operations through the non-primary planning path. CCR replays operations with + * PRIMARY origin but they need non-primary planning to avoid version conflict checks and in-flight + * doc acquisition. We create a copy with REPLICA origin to satisfy the assertions in both the engine + * and the new OperationStrategyPlanner. + */ override fun indexingStrategyForOperation(index: Index): IndexingStrategy { - return planIndexingAsNonPrimary(index) + val replicaIndex = Index( + index.uid(), index.parsedDoc(), index.seqNo(), index.primaryTerm(), + index.version(), null, Operation.Origin.REPLICA, + index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry, + SequenceNumbers.UNASSIGNED_SEQ_NO, 0 + ) + return planIndexingAsNonPrimary(replicaIndex) } + // Same as above for delete operations. override fun deletionStrategyForOperation(delete: Delete): DeletionStrategy { - return planDeletionAsNonPrimary(delete) + val replicaDelete = Delete( + delete.id(), delete.uid(), delete.seqNo(), delete.primaryTerm(), + delete.version(), null, Operation.Origin.REPLICA, + delete.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0 + ) + return planDeletionAsNonPrimary(replicaDelete) } override fun assertNonPrimaryOrigin(operation: Operation): Boolean { From e9a5102faaee57e6c14c2dbc74ee4605e3f84733 Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Fri, 27 Mar 2026 14:30:58 +0530 Subject: [PATCH 04/11] Fixing CVE-2026-25645 and CVE-2026-24400 Signed-off-by: Mohit Kumar --- build.gradle | 2 +- perf_workflow/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 1abef3981..9abcefb12 100644 --- a/build.gradle +++ b/build.gradle @@ -157,7 +157,7 @@ dependencies { implementation "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}" testImplementation "org.opensearch.test:framework:${opensearch_version}" - testImplementation "org.assertj:assertj-core:3.17.2" + testImplementation "org.assertj:assertj-core:3.27.7" testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}" testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.0" // Moving away from kotlin_version testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/perf_workflow/requirements.txt b/perf_workflow/requirements.txt index 7d82da6d7..c51f89513 100644 --- a/perf_workflow/requirements.txt +++ b/perf_workflow/requirements.txt @@ -4,7 +4,7 @@ yamlfix cerberus pipenv filelock>=3.20.3 -requests~=2.32.4 +requests>=2.33.0 retry2 ndg-httpsclient psutil From 5694df0a6a23679e2e01c7e67b9881c28042e87a Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Mon, 20 Apr 2026 13:41:50 +0530 Subject: [PATCH 05/11] Add force resume coordinator and update resume replication flow Signed-off-by: Mohit Kumar --- .../action/resume/ForceResumeCoordinator.kt | 293 ++++++++++++++++++ .../action/resume/ForceResumeResult.kt | 29 ++ .../resume/ResumeIndexReplicationRequest.kt | 21 +- .../TransportResumeIndexReplicationAction.kt | 26 +- .../RemoteClusterRetentionLeaseHelper.kt | 37 +++ 5 files changed, 400 insertions(+), 6 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt create mode 100644 src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeResult.kt diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt new file mode 100644 index 000000000..c14e8f8fa --- /dev/null +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -0,0 +1,293 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.action.resume + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.close.CloseIndexRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsAction +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest +import org.opensearch.cluster.service.ClusterService +import org.opensearch.core.index.shard.ShardId +import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException +import org.opensearch.replication.action.index.block.IndexBlockUpdateType +import org.opensearch.replication.action.index.block.UpdateIndexBlockAction +import org.opensearch.replication.action.index.block.UpdateIndexBlockRequest +import org.opensearch.replication.metadata.ReplicationMetadataManager +import org.opensearch.replication.repository.RemoteClusterRepository +import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper +import org.opensearch.replication.task.index.IndexReplicationParams +import org.opensearch.replication.util.suspendExecute +import org.opensearch.replication.util.suspending +import org.opensearch.transport.client.Client + +/** + * Coordinates the force resume operation when retention leases have expired. + * + * The flow is: + * 1. Validate replication is PAUSED + * 2. Remove the index block on the follower so we can operate on it + * 3. Acquire retention leases on the leader BEFORE deleting the follower (prevents race condition) + * 4. Close and delete the follower index + * 5. On any failure after block removal: clean up leases and re-add the block + * + * After this coordinator completes, the caller starts an IndexReplicationTask. + * Since the follower index no longer exists, isResumed() returns false, + * which triggers setupAndStartRestore() -> snapshot bootstrap from leader. + */ +class ForceResumeCoordinator( + private val client: Client, + private val clusterService: ClusterService, + private val replicationMetadataManager: ReplicationMetadataManager +) { + companion object { + private val log = LogManager.getLogger(ForceResumeCoordinator::class.java) + } + + /** + * Executes the force resume workflow. Returns a [ForceResumeResult] with details. + * Throws on unrecoverable failure (after cleaning up). + */ + suspend fun executeForceResume(params: IndexReplicationParams): ForceResumeResult { + val followerIndex = params.followerIndexName + val startTime = System.currentTimeMillis() + val acquiredLeases = mutableMapOf() + + log.info("Starting force resume for index $followerIndex") + + // Step 1: Remove the replication block so we can operate on the follower index. + // If this fails, nothing has been modified — safe to propagate directly. + removeIndexBlock(followerIndex) + + // From this point, any failure must clean up acquired leases and re-add the block. + try { + // Step 2: Acquire retention leases on the leader BEFORE restore. + // This prevents the race condition where the leader's global checkpoint + // advances past the snapshot point before shard tasks can establish leases. + acquirePreRestoreRetentionLeases(params, acquiredLeases) + + // Step 3: Close and delete the follower index. + // This makes isResumed() return false so IndexReplicationTask + // goes through INIT -> setupAndStartRestore() -> RESTORING -> ... + prepareSnapshotBootstrap(followerIndex) + } catch (e: Exception) { + log.error("Force resume failed for $followerIndex, cleaning up", e) + cleanupOnFailure(params, acquiredLeases, followerIndex) + throw e + } + + val duration = System.currentTimeMillis() - startTime + log.info("Force resume coordinator completed for $followerIndex in ${duration}ms. " + + "Leases acquired for shards: ${acquiredLeases.keys}") + + return ForceResumeResult( + successful = true, + followerIndex = followerIndex, + leaseAcquiredAtSeqNo = acquiredLeases.toMap(), + durationMillis = duration + ) + } + + // Removes the replication index block so we can close/delete the follower. + private suspend fun removeIndexBlock(followerIndex: String) { + log.info("Removing index block for force resume on $followerIndex") + try { + val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.REMOVE_BLOCK) + client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true) + log.info("Removed index block for $followerIndex") + } catch (e: Exception) { + log.error("Failed to remove index block for $followerIndex during force resume", e) + throw IllegalStateException( + "Force resume failed: unable to remove index block for $followerIndex. " + + "Index remains in PAUSED state. Error: ${e.message}", e + ) + } + } + + /** + * Acquires retention leases on the leader for each shard BEFORE deleting the follower. + * Tracks which shards got leases in [acquiredLeases] for cleanup on partial failure. + */ + private suspend fun acquirePreRestoreRetentionLeases( + params: IndexReplicationParams, + acquiredLeases: MutableMap + ) { + val remoteClient = client.getRemoteClusterClient(params.leaderAlias) + val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper( + clusterService.clusterName.value(), + clusterService.state().metadata.clusterUUID(), + remoteClient + ) + + val shards = clusterService.state().routingTable + .indicesRouting().get(params.followerIndexName)?.shards() + + if (shards == null || shards.isEmpty()) { + log.warn("No shards found for follower index ${params.followerIndexName}") + return + } + + shards.forEach { entry -> + val followerShardId = entry.value.shardId + val leaderShardId = ShardId(params.leaderIndex, followerShardId.id) + + // Get the leader's current global checkpoint for this shard + val leaderCheckpoint = getLeaderGlobalCheckpoint( + remoteClient, params.leaderIndex.name, leaderShardId.id + ) + + // Acquire lease at leader's current checkpoint + 1 + // This ensures all operations from this point forward are retained + val retainingSeqNo = leaderCheckpoint + 1 + + try { + retentionLeaseHelper.addRetentionLease( + leaderShardId, retainingSeqNo, followerShardId, + RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC + ) + acquiredLeases[followerShardId.id] = retainingSeqNo + log.info("Acquired pre-restore retention lease for shard ${followerShardId.id} " + + "at seqNo $retainingSeqNo") + } catch (e: RetentionLeaseAlreadyExistsException) { + // Lease already exists (maybe from a previous attempt), renew it + retentionLeaseHelper.renewRetentionLease( + leaderShardId, retainingSeqNo, followerShardId, + RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC + ) + acquiredLeases[followerShardId.id] = retainingSeqNo + log.info("Renewed pre-restore retention lease for shard ${followerShardId.id} " + + "at seqNo $retainingSeqNo") + } + } + + log.info("Acquired pre-restore retention leases for all ${acquiredLeases.size} shards " + + "of ${params.followerIndexName}") + } + + // Retrieves the leader shard's global checkpoint via IndicesStats. + private suspend fun getLeaderGlobalCheckpoint( + remoteClient: Client, + leaderIndexName: String, + shardId: Int + ): Long { + val statsRequest = IndicesStatsRequest().all().indices(leaderIndexName) + val statsResponse = remoteClient.suspendExecute( + IndicesStatsAction.INSTANCE, statsRequest, injectSecurityContext = true + ) + + for (shardStats in statsResponse.shards) { + if (shardStats.shardRouting.shardId().id == shardId && shardStats.shardRouting.primary()) { + val globalCheckpoint = shardStats.seqNoStats?.globalCheckpoint + ?: throw IllegalStateException( + "Unable to get global checkpoint for leader shard $leaderIndexName[$shardId]" + ) + log.debug("Leader shard $leaderIndexName[$shardId] global checkpoint: $globalCheckpoint") + return globalCheckpoint + } + } + + throw IllegalStateException( + "Primary shard $shardId not found for leader index $leaderIndexName" + ) + } + + // Closes and deletes the follower index to trigger snapshot bootstrap. + private suspend fun prepareSnapshotBootstrap(followerIndex: String) { + log.info("Preparing snapshot bootstrap: closing and deleting follower index $followerIndex") + + // Close the index first (best-effort — it may already be closed) + try { + val closeRequest = CloseIndexRequest(followerIndex) + client.suspending(client.admin().indices()::close, defaultContext = true)(closeRequest) + log.info("Closed follower index $followerIndex") + } catch (e: Exception) { + log.warn("Failed to close follower index $followerIndex " + + "(may already be closed): ${e.message}") + } + + // Delete the follower index — this makes isResumed() return false + val deleteRequest = DeleteIndexRequest(followerIndex) + client.suspending( + client.admin().indices()::delete, defaultContext = true + )(deleteRequest) + log.info("Deleted follower index $followerIndex — " + + "IndexReplicationTask will restore from snapshot") + } + + // Cleans up on failure: removes any partially acquired leases and re-adds the index block. + private suspend fun cleanupOnFailure( + params: IndexReplicationParams, + acquiredLeases: Map, + followerIndex: String + ) { + // Clean up any partially acquired retention leases + if (acquiredLeases.isNotEmpty()) { + cleanupAcquiredLeases(params, acquiredLeases) + } + + // Re-add the index block to preserve follower state + restoreBlockOnFailure(followerIndex) + } + + // Removes retention leases that were acquired during a failed force resume attempt. + private suspend fun cleanupAcquiredLeases( + params: IndexReplicationParams, + acquiredLeases: Map + ) { + log.info("Cleaning up ${acquiredLeases.size} partially acquired retention leases " + + "for ${params.followerIndexName}") + try { + val remoteClient = client.getRemoteClusterClient(params.leaderAlias) + val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper( + clusterService.clusterName.value(), + clusterService.state().metadata.clusterUUID(), + remoteClient + ) + + for ((shardIdInt, _) in acquiredLeases) { + val followerShardId = ShardId( + clusterService.state().metadata.index(params.followerIndexName)?.index + ?: continue, + shardIdInt + ) + val leaderShardId = ShardId(params.leaderIndex, shardIdInt) + try { + retentionLeaseHelper.attemptRetentionLeaseRemoval(leaderShardId, followerShardId) + log.info("Cleaned up retention lease for shard $shardIdInt") + } catch (e: Exception) { + // Best-effort cleanup — lease will expire naturally in 12h + log.warn("Failed to clean up retention lease for shard $shardIdInt: ${e.message}") + } + } + } catch (e: Exception) { + log.warn("Failed to clean up retention leases for ${params.followerIndexName}: ${e.message}") + } + } + + /** + * Re-adds the replication index block after a failed force resume attempt. + * This preserves the follower index in a consistent PAUSED state. + */ + private suspend fun restoreBlockOnFailure(followerIndex: String) { + try { + log.info("Re-adding index block for $followerIndex after force resume failure") + val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.ADD_BLOCK) + client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true) + log.info("Restored index block for $followerIndex") + } catch (e: Exception) { + // This is a serious situation — the follower is unblocked but force resume failed. + // Log at error level so operators can investigate. + log.error("CRITICAL: Failed to re-add index block for $followerIndex after force resume failure. " + + "The index may be in an inconsistent state. Manual intervention may be required.", e) + } + } +} diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeResult.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeResult.kt new file mode 100644 index 000000000..742c97332 --- /dev/null +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeResult.kt @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.action.resume + +/** + * Captures the result of a force resume operation for logging and diagnostics. + * + * @param successful Whether the force resume coordinator completed successfully + * @param followerIndex The name of the follower index being force-resumed + * @param leaseAcquiredAtSeqNo Map of shardId to the retaining sequence number where the lease was acquired + * @param durationMillis How long the force resume coordinator took + * @param failureReason Description of the failure if unsuccessful + */ +data class ForceResumeResult( + val successful: Boolean, + val followerIndex: String, + val leaseAcquiredAtSeqNo: Map, + val durationMillis: Long, + val failureReason: String? = null +) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt index 30413e0b2..b6ba33046 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt @@ -15,6 +15,7 @@ import org.opensearch.action.ActionRequestValidationException import org.opensearch.action.IndicesRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.clustermanager.AcknowledgedRequest +import org.opensearch.core.ParseField import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.xcontent.* @@ -22,9 +23,12 @@ import org.opensearch.core.xcontent.* class ResumeIndexReplicationRequest : AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { lateinit var indexName: String + @JvmField + var forceResume: Boolean = false - constructor(indexName: String) { + constructor(indexName: String, forceResume: Boolean = false) { this.indexName = indexName + this.forceResume = forceResume } private constructor() { @@ -32,17 +36,24 @@ class ResumeIndexReplicationRequest : AcknowledgedRequest("ResumeReplicationRequestParser") { ResumeIndexReplicationRequest() } + init { + PARSER.declareBoolean({ req, value -> req.forceResume = value }, ParseField(FORCE_RESUME_FIELD)) + } + fun fromXContent(parser: XContentParser, followerIndex: String): ResumeIndexReplicationRequest { - val ResumeIndexReplicationRequest = PARSER.parse(parser, null) - ResumeIndexReplicationRequest.indexName = followerIndex - return ResumeIndexReplicationRequest + val resumeRequest = PARSER.parse(parser, null) + resumeRequest.indexName = followerIndex + return resumeRequest } } @@ -65,6 +76,7 @@ class ResumeIndexReplicationRequest : AcknowledgedRequest= maxAttempts) throw e + log.warn("Retention lease acquisition attempt $attempt/$maxAttempts failed " + + "for shard ${followerShardId.id}: ${e.message}. Retrying in ${backoff}ms...") + kotlinx.coroutines.delay(backoff) + backoff = (backoff * 2).coerceAtMost(30000L) + } + } + return false + } + /** * Remove these once the callers are moved to above APIs */ From ea1d75a70bb7c26414f5afbcd0525514ff6c3f2a Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Mon, 20 Apr 2026 14:35:39 +0530 Subject: [PATCH 06/11] Refactor ForceResumeCoordinator to reuse existing infrastructure - Removed reimplemented block removal/addition (reuses UpdateIndexBlockAction, same as stop action) - Removed reimplemented index deletion (reuses same pattern as IndexReplicationTask.cancelRestore) - Removed reimplemented lease cleanup (reuses existing RemoteClusterRetentionLeaseHelper methods) - Removed ReplicationMetadataManager dependency from coordinator (not needed) - Simplified getLeaderGlobalCheckpoint using firstOrNull (same API as RemoteClusterRepository) - Only genuinely new logic retained: pre-restore lease acquisition at leaderCheckpoint+1 Signed-off-by: Mohit Kumar --- .../action/resume/ForceResumeCoordinator.kt | 232 ++++++------------ .../TransportResumeIndexReplicationAction.kt | 2 +- 2 files changed, 71 insertions(+), 163 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt index c14e8f8fa..85eaf72c2 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -12,7 +12,6 @@ package org.opensearch.replication.action.resume import org.apache.logging.log4j.LogManager -import org.opensearch.action.admin.indices.close.CloseIndexRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.stats.IndicesStatsAction import org.opensearch.action.admin.indices.stats.IndicesStatsRequest @@ -22,7 +21,6 @@ import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException import org.opensearch.replication.action.index.block.IndexBlockUpdateType import org.opensearch.replication.action.index.block.UpdateIndexBlockAction import org.opensearch.replication.action.index.block.UpdateIndexBlockRequest -import org.opensearch.replication.metadata.ReplicationMetadataManager import org.opensearch.replication.repository.RemoteClusterRepository import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper import org.opensearch.replication.task.index.IndexReplicationParams @@ -33,29 +31,28 @@ import org.opensearch.transport.client.Client /** * Coordinates the force resume operation when retention leases have expired. * - * The flow is: - * 1. Validate replication is PAUSED - * 2. Remove the index block on the follower so we can operate on it - * 3. Acquire retention leases on the leader BEFORE deleting the follower (prevents race condition) - * 4. Close and delete the follower index - * 5. On any failure after block removal: clean up leases and re-add the block + * Reuses existing infrastructure: + * - Block removal/addition: same as TransportStopIndexReplicationAction + * - Index deletion: same as IndexReplicationTask.cancelRestore() + * - Lease add/renew/remove: existing RemoteClusterRetentionLeaseHelper methods * - * After this coordinator completes, the caller starts an IndexReplicationTask. - * Since the follower index no longer exists, isResumed() returns false, - * which triggers setupAndStartRestore() -> snapshot bootstrap from leader. + * The only new logic is acquiring retention leases at leaderGlobalCheckpoint+1 + * BEFORE deleting the follower index, which prevents the race condition where + * the leader's translog is purged during the async snapshot restore. */ class ForceResumeCoordinator( private val client: Client, - private val clusterService: ClusterService, - private val replicationMetadataManager: ReplicationMetadataManager + private val clusterService: ClusterService ) { companion object { private val log = LogManager.getLogger(ForceResumeCoordinator::class.java) } /** - * Executes the force resume workflow. Returns a [ForceResumeResult] with details. - * Throws on unrecoverable failure (after cleaning up). + * Executes the force resume workflow: + * 1. Remove index block (reuses UpdateIndexBlockAction — same as stop action) + * 2. Acquire retention leases at leaderCheckpoint+1 per shard (NEW — race condition fix) + * 3. Delete follower index (reuses same pattern as IndexReplicationTask.cancelRestore) */ suspend fun executeForceResume(params: IndexReplicationParams): ForceResumeResult { val followerIndex = params.followerIndexName @@ -64,21 +61,17 @@ class ForceResumeCoordinator( log.info("Starting force resume for index $followerIndex") - // Step 1: Remove the replication block so we can operate on the follower index. - // If this fails, nothing has been modified — safe to propagate directly. + // Step 1: Remove index block — same pattern as TransportStopIndexReplicationAction#L103 removeIndexBlock(followerIndex) - // From this point, any failure must clean up acquired leases and re-add the block. + // From this point, any failure must clean up leases and re-add the block. try { - // Step 2: Acquire retention leases on the leader BEFORE restore. - // This prevents the race condition where the leader's global checkpoint - // advances past the snapshot point before shard tasks can establish leases. - acquirePreRestoreRetentionLeases(params, acquiredLeases) + // Step 2: Acquire retention leases on leader BEFORE deleting follower. + // This is the only genuinely new logic — prevents the race condition. + acquirePreRestoreLeases(params, acquiredLeases) - // Step 3: Close and delete the follower index. - // This makes isResumed() return false so IndexReplicationTask - // goes through INIT -> setupAndStartRestore() -> RESTORING -> ... - prepareSnapshotBootstrap(followerIndex) + // Step 3: Delete follower index — same pattern as IndexReplicationTask.cancelRestore() + deleteFollowerIndex(followerIndex) } catch (e: Exception) { log.error("Force resume failed for $followerIndex, cleaning up", e) cleanupOnFailure(params, acquiredLeases, followerIndex) @@ -86,8 +79,7 @@ class ForceResumeCoordinator( } val duration = System.currentTimeMillis() - startTime - log.info("Force resume coordinator completed for $followerIndex in ${duration}ms. " + - "Leases acquired for shards: ${acquiredLeases.keys}") + log.info("Force resume completed for $followerIndex in ${duration}ms") return ForceResumeResult( successful = true, @@ -97,27 +89,18 @@ class ForceResumeCoordinator( ) } - // Removes the replication index block so we can close/delete the follower. private suspend fun removeIndexBlock(followerIndex: String) { - log.info("Removing index block for force resume on $followerIndex") - try { - val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.REMOVE_BLOCK) - client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true) - log.info("Removed index block for $followerIndex") - } catch (e: Exception) { - log.error("Failed to remove index block for $followerIndex during force resume", e) - throw IllegalStateException( - "Force resume failed: unable to remove index block for $followerIndex. " + - "Index remains in PAUSED state. Error: ${e.message}", e - ) - } + log.info("Removing index block for $followerIndex") + val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.REMOVE_BLOCK) + client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true) } /** - * Acquires retention leases on the leader for each shard BEFORE deleting the follower. - * Tracks which shards got leases in [acquiredLeases] for cleanup on partial failure. + Acquires retention leases at leaderGlobalCheckpoint+1 per shard BEFORE + * deleting the follower. This prevents the race condition where the leader's + * global checkpoint advances during restore and operations get purged. */ - private suspend fun acquirePreRestoreRetentionLeases( + private suspend fun acquirePreRestoreLeases( params: IndexReplicationParams, acquiredLeases: MutableMap ) { @@ -130,164 +113,89 @@ class ForceResumeCoordinator( val shards = clusterService.state().routingTable .indicesRouting().get(params.followerIndexName)?.shards() - - if (shards == null || shards.isEmpty()) { - log.warn("No shards found for follower index ${params.followerIndexName}") - return - } + ?: return shards.forEach { entry -> val followerShardId = entry.value.shardId val leaderShardId = ShardId(params.leaderIndex, followerShardId.id) - - // Get the leader's current global checkpoint for this shard - val leaderCheckpoint = getLeaderGlobalCheckpoint( - remoteClient, params.leaderIndex.name, leaderShardId.id - ) - - // Acquire lease at leader's current checkpoint + 1 - // This ensures all operations from this point forward are retained - val retainingSeqNo = leaderCheckpoint + 1 + val retainingSeqNo = getLeaderGlobalCheckpoint(remoteClient, params.leaderIndex.name, leaderShardId.id) + 1 try { retentionLeaseHelper.addRetentionLease( leaderShardId, retainingSeqNo, followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC ) - acquiredLeases[followerShardId.id] = retainingSeqNo - log.info("Acquired pre-restore retention lease for shard ${followerShardId.id} " + - "at seqNo $retainingSeqNo") } catch (e: RetentionLeaseAlreadyExistsException) { - // Lease already exists (maybe from a previous attempt), renew it retentionLeaseHelper.renewRetentionLease( leaderShardId, retainingSeqNo, followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC ) - acquiredLeases[followerShardId.id] = retainingSeqNo - log.info("Renewed pre-restore retention lease for shard ${followerShardId.id} " + - "at seqNo $retainingSeqNo") } + acquiredLeases[followerShardId.id] = retainingSeqNo + log.info("Acquired pre-restore lease for shard ${followerShardId.id} at seqNo $retainingSeqNo") } - - log.info("Acquired pre-restore retention leases for all ${acquiredLeases.size} shards " + - "of ${params.followerIndexName}") } - - // Retrieves the leader shard's global checkpoint via IndicesStats. - private suspend fun getLeaderGlobalCheckpoint( - remoteClient: Client, - leaderIndexName: String, - shardId: Int - ): Long { - val statsRequest = IndicesStatsRequest().all().indices(leaderIndexName) + + // Fetches the leader shard's global checkpoint via IndicesStatsAction. + private suspend fun getLeaderGlobalCheckpoint(remoteClient: Client, leaderIndexName: String, shardId: Int): Long { val statsResponse = remoteClient.suspendExecute( - IndicesStatsAction.INSTANCE, statsRequest, injectSecurityContext = true - ) - - for (shardStats in statsResponse.shards) { - if (shardStats.shardRouting.shardId().id == shardId && shardStats.shardRouting.primary()) { - val globalCheckpoint = shardStats.seqNoStats?.globalCheckpoint - ?: throw IllegalStateException( - "Unable to get global checkpoint for leader shard $leaderIndexName[$shardId]" - ) - log.debug("Leader shard $leaderIndexName[$shardId] global checkpoint: $globalCheckpoint") - return globalCheckpoint - } - } - - throw IllegalStateException( - "Primary shard $shardId not found for leader index $leaderIndexName" + IndicesStatsAction.INSTANCE, + IndicesStatsRequest().all().indices(leaderIndexName), + injectSecurityContext = true ) + return statsResponse.shards + .firstOrNull { it.shardRouting.shardId().id == shardId && it.shardRouting.primary() } + ?.seqNoStats?.globalCheckpoint + ?: throw IllegalStateException("Primary shard $shardId not found for leader index $leaderIndexName") } - // Closes and deletes the follower index to trigger snapshot bootstrap. - private suspend fun prepareSnapshotBootstrap(followerIndex: String) { - log.info("Preparing snapshot bootstrap: closing and deleting follower index $followerIndex") - - // Close the index first (best-effort — it may already be closed) - try { - val closeRequest = CloseIndexRequest(followerIndex) - client.suspending(client.admin().indices()::close, defaultContext = true)(closeRequest) - log.info("Closed follower index $followerIndex") - } catch (e: Exception) { - log.warn("Failed to close follower index $followerIndex " + - "(may already be closed): ${e.message}") - } - - // Delete the follower index — this makes isResumed() return false - val deleteRequest = DeleteIndexRequest(followerIndex) - client.suspending( - client.admin().indices()::delete, defaultContext = true - )(deleteRequest) - log.info("Deleted follower index $followerIndex — " + - "IndexReplicationTask will restore from snapshot") + /** + * Same pattern as IndexReplicationTask.cancelRestore() — deletes the follower index. + * This makes isResumed() return false, triggering setupAndStartRestore() in the state machine. + */ + private suspend fun deleteFollowerIndex(followerIndex: String) { + log.info("Deleting follower index $followerIndex for snapshot bootstrap") + client.suspending(client.admin().indices()::delete, defaultContext = true)(DeleteIndexRequest(followerIndex)) } - // Cleans up on failure: removes any partially acquired leases and re-adds the index block. + /** + * Cleanup on failure: remove partially acquired leases + re-add index block. + * Uses existing RemoteClusterRetentionLeaseHelper.attemptRetentionLeaseRemoval() + * and UpdateIndexBlockAction with ADD_BLOCK. + */ private suspend fun cleanupOnFailure( params: IndexReplicationParams, acquiredLeases: Map, followerIndex: String ) { - // Clean up any partially acquired retention leases + // Clean up partially acquired leases — uses existing attemptRetentionLeaseRemoval() if (acquiredLeases.isNotEmpty()) { - cleanupAcquiredLeases(params, acquiredLeases) - } - - // Re-add the index block to preserve follower state - restoreBlockOnFailure(followerIndex) - } - - // Removes retention leases that were acquired during a failed force resume attempt. - private suspend fun cleanupAcquiredLeases( - params: IndexReplicationParams, - acquiredLeases: Map - ) { - log.info("Cleaning up ${acquiredLeases.size} partially acquired retention leases " + - "for ${params.followerIndexName}") - try { - val remoteClient = client.getRemoteClusterClient(params.leaderAlias) - val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper( - clusterService.clusterName.value(), - clusterService.state().metadata.clusterUUID(), - remoteClient - ) - - for ((shardIdInt, _) in acquiredLeases) { - val followerShardId = ShardId( - clusterService.state().metadata.index(params.followerIndexName)?.index - ?: continue, - shardIdInt + try { + val remoteClient = client.getRemoteClusterClient(params.leaderAlias) + val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper( + clusterService.clusterName.value(), + clusterService.state().metadata.clusterUUID(), + remoteClient ) - val leaderShardId = ShardId(params.leaderIndex, shardIdInt) - try { - retentionLeaseHelper.attemptRetentionLeaseRemoval(leaderShardId, followerShardId) - log.info("Cleaned up retention lease for shard $shardIdInt") - } catch (e: Exception) { - // Best-effort cleanup — lease will expire naturally in 12h - log.warn("Failed to clean up retention lease for shard $shardIdInt: ${e.message}") + for (shardIdInt in acquiredLeases.keys) { + val followerShardId = ShardId( + clusterService.state().metadata.index(params.followerIndexName)?.index ?: continue, + shardIdInt + ) + retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, shardIdInt), followerShardId) } + } catch (e: Exception) { + log.warn("Best-effort lease cleanup failed for ${params.followerIndexName}: ${e.message}") } - } catch (e: Exception) { - log.warn("Failed to clean up retention leases for ${params.followerIndexName}: ${e.message}") } - } - /** - * Re-adds the replication index block after a failed force resume attempt. - * This preserves the follower index in a consistent PAUSED state. - */ - private suspend fun restoreBlockOnFailure(followerIndex: String) { + // Re-add index block — same pattern as IndexReplicationTask.addIndexBlockForReplication() try { - log.info("Re-adding index block for $followerIndex after force resume failure") val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.ADD_BLOCK) client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true) log.info("Restored index block for $followerIndex") } catch (e: Exception) { - // This is a serious situation — the follower is unblocked but force resume failed. - // Log at error level so operators can investigate. - log.error("CRITICAL: Failed to re-add index block for $followerIndex after force resume failure. " + - "The index may be in an inconsistent state. Manual intervention may be required.", e) + log.error("CRITICAL: Failed to re-add index block for $followerIndex. Manual intervention may be required.", e) } } } diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt index 3495ca506..f2c019a14 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt @@ -104,7 +104,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService // Delegate to force resume coordinator (snapshot bootstrap path) log.info("Retention lease expired for ${request.indexName}. " + "Force resume requested — initiating snapshot bootstrap.") - val coordinator = ForceResumeCoordinator(client, clusterService, replicationMetadataManager) + val coordinator = ForceResumeCoordinator(client, clusterService) val result = coordinator.executeForceResume(params) log.info("Force resume coordinator completed for ${request.indexName}: " + "leases acquired at ${result.leaseAcquiredAtSeqNo}, " + From 148f50350925692d20e08e71b6752e7cc7970225 Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Mon, 20 Apr 2026 14:41:30 +0530 Subject: [PATCH 07/11] Minor: clean up comment formatting in ForceResumeCoordinator Signed-off-by: Mohit Kumar --- .../action/resume/ForceResumeCoordinator.kt | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt index 85eaf72c2..8d472e4a0 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -30,12 +30,10 @@ import org.opensearch.transport.client.Client /** * Coordinates the force resume operation when retention leases have expired. - * * Reuses existing infrastructure: - * - Block removal/addition: same as TransportStopIndexReplicationAction - * - Index deletion: same as IndexReplicationTask.cancelRestore() - * - Lease add/renew/remove: existing RemoteClusterRetentionLeaseHelper methods - * + * Block removal/addition: same as TransportStopIndexReplicationAction + * Index deletion: same as IndexReplicationTask.cancelRestore() + * Lease add/renew/remove: existing RemoteClusterRetentionLeaseHelper methods * The only new logic is acquiring retention leases at leaderGlobalCheckpoint+1 * BEFORE deleting the follower index, which prevents the race condition where * the leader's translog is purged during the async snapshot restore. @@ -135,7 +133,7 @@ class ForceResumeCoordinator( log.info("Acquired pre-restore lease for shard ${followerShardId.id} at seqNo $retainingSeqNo") } } - + // Fetches the leader shard's global checkpoint via IndicesStatsAction. private suspend fun getLeaderGlobalCheckpoint(remoteClient: Client, leaderIndexName: String, shardId: Int): Long { val statsResponse = remoteClient.suspendExecute( From d77c2c708410e2366c9a9789a076e8614f7b875c Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Mon, 20 Apr 2026 14:41:30 +0530 Subject: [PATCH 08/11] Minor: clean up comment formatting in ForceResumeCoordinator Signed-off-by: Mohit Kumar --- .../action/resume/ForceResumeCoordinator.kt | 7 +-- .../RemoteClusterRetentionLeaseHelper.kt | 52 +++++-------------- 2 files changed, 14 insertions(+), 45 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt index 8d472e4a0..e3d8a2ba2 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -21,7 +21,6 @@ import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException import org.opensearch.replication.action.index.block.IndexBlockUpdateType import org.opensearch.replication.action.index.block.UpdateIndexBlockAction import org.opensearch.replication.action.index.block.UpdateIndexBlockRequest -import org.opensearch.replication.repository.RemoteClusterRepository import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper import org.opensearch.replication.task.index.IndexReplicationParams import org.opensearch.replication.util.suspendExecute @@ -120,13 +119,11 @@ class ForceResumeCoordinator( try { retentionLeaseHelper.addRetentionLease( - leaderShardId, retainingSeqNo, followerShardId, - RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC + leaderShardId, retainingSeqNo, followerShardId ) } catch (e: RetentionLeaseAlreadyExistsException) { retentionLeaseHelper.renewRetentionLease( - leaderShardId, retainingSeqNo, followerShardId, - RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC + leaderShardId, retainingSeqNo, followerShardId ) } acquiredLeases[followerShardId.id] = retainingSeqNo diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index 084c739b2..1cf8a67f2 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -115,6 +115,18 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } } + /** + * Suspend (non-blocking) version of addRetentionLease for use in coroutine contexts + * (e.g., transport thread). The blocking overload below uses actionGet() and must NOT + * be called from transport threads. + */ + public suspend fun addRetentionLease(leaderShardId: ShardId, seqNo: Long, followerShardId: ShardId) { + val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) + val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource) + log.info("Adding retention lease $retentionLeaseId (async)") + client.suspendExecute(RetentionLeaseActions.Add.INSTANCE, request) + } + public suspend fun renewRetentionLease(leaderShardId: ShardId, seqNo: Long, followerShardId: ShardId) { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) val request = RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource) @@ -194,46 +206,6 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } - /** - * Acquires a retention lease with exponential backoff retry logic. - * On [RetentionLeaseAlreadyExistsException], renews the existing lease instead. - * Used by force resume to reliably acquire pre-restore leases. - * - * @return true if the lease was successfully acquired or renewed - * @throws Exception if all retry attempts are exhausted - */ - public suspend fun acquireLeaseWithRetry( - leaderShardId: ShardId, - retainingSeqNo: Long, - followerShardId: ShardId, - timeout: Long, - maxAttempts: Int = 5 - ): Boolean { - var attempt = 0 - var backoff = 1000L // 1 second initial backoff - - while (attempt < maxAttempts) { - try { - addRetentionLease(leaderShardId, retainingSeqNo, followerShardId, timeout) - return true - } catch (e: RetentionLeaseAlreadyExistsException) { - renewRetentionLease(leaderShardId, retainingSeqNo, followerShardId, timeout) - return true - } catch (e: Exception) { - attempt++ - if (attempt >= maxAttempts) throw e - log.warn("Retention lease acquisition attempt $attempt/$maxAttempts failed " + - "for shard ${followerShardId.id}: ${e.message}. Retrying in ${backoff}ms...") - kotlinx.coroutines.delay(backoff) - backoff = (backoff * 2).coerceAtMost(30000L) - } - } - return false - } - - /** - * Remove these once the callers are moved to above APIs - */ public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long, followerShardId: ShardId, timeout: Long) { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) From 9d5ec94aadba4f3b1d12d2b5256afcb8837fc122 Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Sat, 9 May 2026 18:27:07 +0530 Subject: [PATCH 09/11] Adding UTs for force resume feature Signed-off-by: Mohit Kumar --- .../resume/ForceResumeCoordinatorTests.kt | 413 ++++++++++++++++++ .../action/resume/ForceResumeResultTests.kt | 60 +++ .../ResumeIndexReplicationRequestTests.kt | 95 ++++ 3 files changed, 568 insertions(+) create mode 100644 src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinatorTests.kt create mode 100644 src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt create mode 100644 src/test/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequestTests.kt diff --git a/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinatorTests.kt b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinatorTests.kt new file mode 100644 index 000000000..b30811871 --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinatorTests.kt @@ -0,0 +1,413 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.action.resume + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.never +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito +import org.opensearch.Version +import org.opensearch.action.admin.indices.stats.IndexShardStats +import org.opensearch.action.admin.indices.stats.IndicesStatsAction +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.action.admin.indices.stats.ShardStats +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.cluster.ClusterName +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.metadata.Metadata +import org.opensearch.cluster.routing.IndexRoutingTable +import org.opensearch.cluster.routing.IndexShardRoutingTable +import org.opensearch.cluster.routing.RoutingTable +import org.opensearch.cluster.routing.ShardRouting +import org.opensearch.cluster.routing.ShardRoutingState +import org.opensearch.cluster.routing.TestShardRouting +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.index.Index +import org.opensearch.core.index.shard.ShardId +import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException +import org.opensearch.index.seqno.SeqNoStats +import org.opensearch.replication.action.index.block.IndexBlockUpdateType +import org.opensearch.replication.action.index.block.UpdateIndexBlockAction +import org.opensearch.replication.task.index.IndexReplicationParams +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.threadpool.TestThreadPool +import org.opensearch.transport.client.AdminClient +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.IndicesAdminClient +import java.util.concurrent.TimeUnit + +/** + * Unit tests for [ForceResumeCoordinator]. + * The coordinator orchestrates: remove block → acquire leases → delete follower. + * On failure it must clean up leases and re-add the block. + */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +class ForceResumeCoordinatorTests : OpenSearchTestCase() { + + companion object { + private const val FOLLOWER_INDEX = "follower-index" + private const val LEADER_INDEX = "leader-index" + private const val LEADER_ALIAS = "leader-cluster" + private const val LEADER_INDEX_UUID = "leader-uuid" + private const val FOLLOWER_INDEX_UUID = "follower-uuid" + private const val CLUSTER_NAME = "follower-cluster" + private const val CLUSTER_UUID = "follower-cluster-uuid" + } + + private lateinit var threadPool: TestThreadPool + private lateinit var client: Client + private lateinit var remoteClient: Client + private lateinit var clusterService: ClusterService + private lateinit var adminClient: AdminClient + private lateinit var indicesAdminClient: IndicesAdminClient + + private val leaderIndex = Index(LEADER_INDEX, LEADER_INDEX_UUID) + private val followerIndex = Index(FOLLOWER_INDEX, FOLLOWER_INDEX_UUID) + + override fun setUp() { + super.setUp() + threadPool = TestThreadPool("ForceResumeCoordinatorTests") + client = Mockito.mock(Client::class.java) + remoteClient = Mockito.mock(Client::class.java) + clusterService = Mockito.mock(ClusterService::class.java) + adminClient = Mockito.mock(AdminClient::class.java) + indicesAdminClient = Mockito.mock(IndicesAdminClient::class.java) + + // Wire up client.getRemoteClusterClient + whenever(client.getRemoteClusterClient(anyString())).thenReturn(remoteClient) + + // Wire up threadPool for both clients + whenever(client.threadPool()).thenReturn(threadPool) + whenever(remoteClient.threadPool()).thenReturn(threadPool) + + // Wire up admin client chain for deleteFollowerIndex + whenever(client.admin()).thenReturn(adminClient) + whenever(adminClient.indices()).thenReturn(indicesAdminClient) + + // Stub indices delete to call the listener with acknowledged response + doAnswer { invocation -> + val listener = invocation.getArgument>(1) + listener.onResponse(AcknowledgedResponse(true)) + null + }.whenever(indicesAdminClient).delete(any(), any>()) + + // Wire up clusterService basics + whenever(clusterService.clusterName).thenReturn(ClusterName(CLUSTER_NAME)) + } + + override fun tearDown() { + super.tearDown() + org.opensearch.threadpool.ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS) + } + + private fun buildParams(numShards: Int = 1): IndexReplicationParams { + return IndexReplicationParams(LEADER_ALIAS, leaderIndex, FOLLOWER_INDEX) + } + + // Builds a ClusterState with the follower index having the given number of shards + private fun buildClusterState(numShards: Int): ClusterState { + val indexMetadata = IndexMetadata.builder(FOLLOWER_INDEX) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, FOLLOWER_INDEX_UUID) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .build() + + val metadata = Metadata.builder() + .put(indexMetadata, false) + .clusterUUID(CLUSTER_UUID) + .build() + + val indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.index) + for (i in 0 until numShards) { + val shardId = ShardId(indexMetadata.index, i) + val shardRouting = TestShardRouting.newShardRouting( + shardId, "node-1", true, ShardRoutingState.STARTED + ) + indexRoutingTableBuilder.addShard(shardRouting) + } + + val routingTable = RoutingTable.builder() + .add(indexRoutingTableBuilder.build()) + .build() + + return ClusterState.builder(ClusterName(CLUSTER_NAME)) + .metadata(metadata) + .routingTable(routingTable) + .build() + } + + /** + * Stubs the client.execute() call to respond with AcknowledgedResponse for any action. + * This covers removeIndexBlock, deleteFollowerIndex, and re-add block calls. + */ + private fun stubClientExecuteAck() { + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onResponse(AcknowledgedResponse(true)) + null + }.whenever(client).execute(any(), any(), any>()) + } + + /** + * Stubs the remoteClient.execute() for IndicesStatsAction to return a given global checkpoint + * for each shard. + */ + private fun stubRemoteStatsResponse(numShards: Int, globalCheckpoint: Long) { + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val shardStats = (0 until numShards).map { shardId -> + val shardRouting = TestShardRouting.newShardRouting( + ShardId(leaderIndex, shardId), "leader-node", true, ShardRoutingState.STARTED + ) + val seqNoStats = SeqNoStats(globalCheckpoint, globalCheckpoint, globalCheckpoint) + Mockito.mock(ShardStats::class.java).also { ss -> + whenever(ss.shardRouting).thenReturn(shardRouting) + whenever(ss.seqNoStats).thenReturn(seqNoStats) + } + }.toTypedArray() + + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(shardStats) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + } + + // ======================== Tests ======================== + + fun `test ForceResumeResult is returned on success with single shard`() = runBlocking { + val numShards = 1 + val globalCheckpoint = 42L + val clusterState = buildClusterState(numShards) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(numShards, globalCheckpoint) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.followerIndex).isEqualTo(FOLLOWER_INDEX) + assertThat(result.leaseAcquiredAtSeqNo).hasSize(numShards) + // Lease should be acquired at globalCheckpoint + 1 + assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(globalCheckpoint + 1) + assertThat(result.durationMillis).isGreaterThanOrEqualTo(0) + } + + fun `test removeIndexBlock is called with REMOVE_BLOCK`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(1, 10L) + + val coordinator = ForceResumeCoordinator(client, clusterService) + coordinator.executeForceResume(buildParams()) + + // removeIndexBlock uses client.execute (suspendExecute) — at least 1 call + verify(client, Mockito.atLeast(1)).execute(any(), any(), any>()) + // deleteFollowerIndex uses client.admin().indices().delete — verify that path was called + verify(indicesAdminClient, Mockito.atLeast(1)).delete(any(), any>()) + } + + fun `test cleanup re-adds block when lease acquisition fails`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + // First call succeeds (removeIndexBlock), subsequent calls for cleanup also succeed + stubClientExecuteAck() + + // Make remote stats call fail to simulate lease acquisition failure + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onFailure(RuntimeException("Stats call failed")) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Stats call failed") + + // Verify that client.execute was called more than once (block removal + re-add on cleanup) + verify(client, Mockito.atLeast(2)).execute(any(), any(), any>()) + } + + fun `test no shards means no leases acquired`() = runBlocking { + // Build a cluster state where the follower index has no routing entry + val metadata = Metadata.builder() + .clusterUUID(CLUSTER_UUID) + .build() + val routingTable = RoutingTable.builder().build() + val clusterState = ClusterState.builder(ClusterName(CLUSTER_NAME)) + .metadata(metadata) + .routingTable(routingTable) + .build() + + whenever(clusterService.state()).thenReturn(clusterState) + stubClientExecuteAck() + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.leaseAcquiredAtSeqNo).isEmpty() + } + + fun `test global checkpoint of zero results in lease at seqNo 1`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(1, 0L) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(1L) + } + + fun `test removeIndexBlock failure propagates without cleanup`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + // Make the first client.execute call (removeIndexBlock) fail + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onFailure(RuntimeException("Block removal failed")) + null + }.whenever(client).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Block removal failed") + + // No remote calls should have been made since we failed at step 1 + verify(remoteClient, never()).execute(any(), any(), any>()) + } + + fun `test delete follower index failure triggers cleanup`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + // client.execute succeeds (for removeIndexBlock and re-add block in cleanup) + stubClientExecuteAck() + // remote stats succeeds (lease acquisition works) + stubRemoteStatsResponse(1, 50L) + + // But delete follower index fails + doAnswer { invocation -> + val listener = invocation.getArgument>(1) + listener.onFailure(RuntimeException("Delete index failed")) + null + }.whenever(indicesAdminClient).delete(any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Delete index failed") + + // Cleanup should have re-added the block (at least 2 client.execute calls: + // 1 for removeIndexBlock, 1 for re-add block in cleanup) + verify(client, Mockito.atLeast(2)).execute(any(), any(), any>()) + } + + fun `test primary shard not found throws IllegalStateException`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + + // Return stats with no matching primary shard + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(emptyArray()) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(IllegalStateException::class.java) + .hasMessageContaining("Primary shard") + .hasMessageContaining("not found") + } + + fun `test multiple shards with different global checkpoints`() = runBlocking { + val numShards = 3 + val clusterState = buildClusterState(numShards) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + + // Return different global checkpoints per shard + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val shardStats = (0 until numShards).map { shardId -> + val shardRouting = TestShardRouting.newShardRouting( + ShardId(leaderIndex, shardId), "leader-node", true, ShardRoutingState.STARTED + ) + val checkpoint = (shardId + 1) * 100L // 100, 200, 300 + val seqNoStats = SeqNoStats(checkpoint, checkpoint, checkpoint) + Mockito.mock(ShardStats::class.java).also { ss -> + whenever(ss.shardRouting).thenReturn(shardRouting) + whenever(ss.seqNoStats).thenReturn(seqNoStats) + } + }.toTypedArray() + + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(shardStats) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.leaseAcquiredAtSeqNo).hasSize(numShards) + assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(101L) + assertThat(result.leaseAcquiredAtSeqNo[1]).isEqualTo(201L) + assertThat(result.leaseAcquiredAtSeqNo[2]).isEqualTo(301L) + } + +} diff --git a/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt new file mode 100644 index 000000000..6f23ed2dd --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.action.resume + +import org.opensearch.test.OpenSearchTestCase + +class ForceResumeResultTests : OpenSearchTestCase() { + + fun `test successful result`() { + val leases = mapOf(0 to 100L, 1 to 200L) + val result = ForceResumeResult( + successful = true, + followerIndex = "follower-index", + leaseAcquiredAtSeqNo = leases, + durationMillis = 500L + ) + assertTrue(result.successful) + assertEquals("follower-index", result.followerIndex) + assertEquals(leases, result.leaseAcquiredAtSeqNo) + assertEquals(500L, result.durationMillis) + assertNull(result.failureReason) + } + + fun `test failed result with reason`() { + val result = ForceResumeResult( + successful = false, + followerIndex = "follower-index", + leaseAcquiredAtSeqNo = emptyMap(), + durationMillis = 100L, + failureReason = "Retention lease acquisition failed" + ) + assertFalse(result.successful) + assertEquals("Retention lease acquisition failed", result.failureReason) + assertTrue(result.leaseAcquiredAtSeqNo.isEmpty()) + } + + fun `test result equality`() { + val leases = mapOf(0 to 50L) + val result1 = ForceResumeResult(true, "idx", leases, 10L) + val result2 = ForceResumeResult(true, "idx", leases, 10L) + assertEquals(result1, result2) + assertEquals(result1.hashCode(), result2.hashCode()) + } + + fun `test result inequality on different follower index`() { + val result1 = ForceResumeResult(true, "idx-a", emptyMap(), 10L) + val result2 = ForceResumeResult(true, "idx-b", emptyMap(), 10L) + assertNotEquals(result1, result2) + } + +} diff --git a/src/test/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequestTests.kt b/src/test/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequestTests.kt new file mode 100644 index 000000000..43e955b77 --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequestTests.kt @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.action.resume + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.test.OpenSearchTestCase + +class ResumeIndexReplicationRequestTests : OpenSearchTestCase() { + + fun `test serialization roundtrip with forceResume false`() { + val original = ResumeIndexReplicationRequest("my-index", forceResume = false) + val out = BytesStreamOutput() + original.writeTo(out) + + val inp: StreamInput = out.bytes().streamInput() + val deserialized = ResumeIndexReplicationRequest(inp) + + assertEquals(original.indexName, deserialized.indexName) + assertEquals(original.forceResume, deserialized.forceResume) + } + + fun `test serialization roundtrip with forceResume true`() { + val original = ResumeIndexReplicationRequest("my-index", forceResume = true) + val out = BytesStreamOutput() + original.writeTo(out) + + val inp: StreamInput = out.bytes().streamInput() + val deserialized = ResumeIndexReplicationRequest(inp) + + assertEquals("my-index", deserialized.indexName) + assertTrue(deserialized.forceResume) + } + + fun `test indices returns index name`() { + val request = ResumeIndexReplicationRequest("follower-idx") + val indices = request.indices() + assertEquals(1, indices.size) + assertEquals("follower-idx", indices[0]) + } + + fun `test validate returns null`() { + val request = ResumeIndexReplicationRequest("test-index") + assertNull(request.validate()) + } + + fun `test fromXContent with force_resume true`() { + val json = """{"force_resume": true}""" + val parser = createParser(XContentType.JSON.xContent(), json) + val request = ResumeIndexReplicationRequest.fromXContent(parser, "follower-index") + assertEquals("follower-index", request.indexName) + assertTrue(request.forceResume) + } + + fun `test fromXContent with force_resume false`() { + val json = """{"force_resume": false}""" + val parser = createParser(XContentType.JSON.xContent(), json) + val request = ResumeIndexReplicationRequest.fromXContent(parser, "follower-index") + assertFalse(request.forceResume) + } + + fun `test fromXContent with empty body defaults forceResume to false`() { + val json = """{}""" + val parser = createParser(XContentType.JSON.xContent(), json) + val request = ResumeIndexReplicationRequest.fromXContent(parser, "follower-index") + assertEquals("follower-index", request.indexName) + assertFalse(request.forceResume) + } + + fun `test toXContent includes force_resume field`() { + val request = ResumeIndexReplicationRequest("test-idx", forceResume = true) + val builder = org.opensearch.common.xcontent.XContentFactory.jsonBuilder() + request.toXContent(builder, org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS) + val xContentString = builder.toString() + assertTrue(xContentString.contains("\"force_resume\":true")) + assertTrue(xContentString.contains("\"indexName\":\"test-idx\"")) + } + + fun `test indicesOptions is strict single index`() { + val request = ResumeIndexReplicationRequest("test-index") + val options = request.indicesOptions() + assertFalse(options.allowNoIndices()) + } +} From a23f7a8ff2ad217a7b260ba14521f4b075b666e5 Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Sun, 10 May 2026 20:30:51 +0530 Subject: [PATCH 10/11] Improve cleanup logging and error message in ForceResumeCoordinator Signed-off-by: Mohit Kumar --- .../action/resume/ForceResumeCoordinator.kt | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt index e3d8a2ba2..cbfacccfa 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -110,7 +110,10 @@ class ForceResumeCoordinator( val shards = clusterService.state().routingTable .indicesRouting().get(params.followerIndexName)?.shards() - ?: return + if (shards == null) { + log.warn("No routing table found for follower index ${params.followerIndexName}, skipping lease acquisition") + return + } shards.forEach { entry -> val followerShardId = entry.value.shardId @@ -141,7 +144,7 @@ class ForceResumeCoordinator( return statsResponse.shards .firstOrNull { it.shardRouting.shardId().id == shardId && it.shardRouting.primary() } ?.seqNoStats?.globalCheckpoint - ?: throw IllegalStateException("Primary shard $shardId not found for leader index $leaderIndexName") + ?: throw IllegalStateException("Primary shard $shardId not found or seqNoStats unavailable for leader index $leaderIndexName") } /** @@ -173,10 +176,13 @@ class ForceResumeCoordinator( remoteClient ) for (shardIdInt in acquiredLeases.keys) { - val followerShardId = ShardId( - clusterService.state().metadata.index(params.followerIndexName)?.index ?: continue, - shardIdInt - ) + val indexMetadata = clusterService.state().metadata.index(params.followerIndexName) + if (indexMetadata == null) { + log.warn("Follower index ${params.followerIndexName} not found in metadata during cleanup. " + + "Retention leases on leader may be orphaned for shard $shardIdInt.") + continue + } + val followerShardId = ShardId(indexMetadata.index, shardIdInt) retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, shardIdInt), followerShardId) } } catch (e: Exception) { From fdf3669dd6d127056a111a772ad864a8eb5306aa Mon Sep 17 00:00:00 2001 From: Mohit Kumar Date: Mon, 11 May 2026 10:42:01 +0530 Subject: [PATCH 11/11] Narrow IndicesStatsRequest to avoid fetching all stats across cluster boundary Signed-off-by: Mohit Kumar --- .../replication/action/resume/ForceResumeCoordinator.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt index cbfacccfa..c13f872e5 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ForceResumeCoordinator.kt @@ -136,9 +136,11 @@ class ForceResumeCoordinator( // Fetches the leader shard's global checkpoint via IndicesStatsAction. private suspend fun getLeaderGlobalCheckpoint(remoteClient: Client, leaderIndexName: String, shardId: Int): Long { + val statsRequest = IndicesStatsRequest().indices(leaderIndexName).clear() + statsRequest.translog(true) // translog stats include seqNoStats/globalCheckpoint val statsResponse = remoteClient.suspendExecute( IndicesStatsAction.INSTANCE, - IndicesStatsRequest().all().indices(leaderIndexName), + statsRequest, injectSecurityContext = true ) return statsResponse.shards