Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e408930
Stab at Fixing integ tests
mohitamg Mar 24, 2026
ed33475
Revert "Stab at Fixing integ tests"
mohitamg Mar 24, 2026
e9b381c
Stab at Fixing integ tests
mohitamg Mar 24, 2026
ed525f4
Merge branch 'opensearch-project:main' into main
mohit10011999 Mar 25, 2026
e9a5102
Fixing CVE-2026-25645 and CVE-2026-24400
mohitamg Mar 27, 2026
5cd6265
Merge branch 'opensearch-project:main' into main
mohit10011999 Mar 27, 2026
b557df0
Merge branch 'opensearch-project:main' into main
mohit10011999 Apr 6, 2026
773fe45
Merge branch 'opensearch-project:main' into main
mohit10011999 Apr 15, 2026
65f587b
Merge branch 'opensearch-project:main' into main
mohit10011999 Apr 26, 2026
6fceb22
Merge branch 'opensearch-project:main' into main
mohit10011999 May 7, 2026
4b20aa8
Merge branch 'opensearch-project:main' into main
mohit10011999 May 9, 2026
36bc2af
Merge branch 'opensearch-project:main' into main
mohit10011999 May 10, 2026
5694df0
Add force resume coordinator and update resume replication flow
mohitamg Apr 20, 2026
ea1d75a
Refactor ForceResumeCoordinator to reuse existing infrastructure
mohitamg Apr 20, 2026
148f503
Minor: clean up comment formatting in ForceResumeCoordinator
mohitamg Apr 20, 2026
d77c2c7
Minor: clean up comment formatting in ForceResumeCoordinator
mohitamg Apr 20, 2026
9d5ec94
Adding UTs for force resume feature
mohitamg May 9, 2026
a23f7a8
Improve cleanup logging and error message in ForceResumeCoordinator
mohitamg May 10, 2026
fdf3669
Narrow IndicesStatsRequest to avoid fetching all stats across cluster…
mohitamg May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.resume

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsAction
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.replication.action.index.block.IndexBlockUpdateType
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
import org.opensearch.replication.action.index.block.UpdateIndexBlockRequest
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.task.index.IndexReplicationParams
import org.opensearch.replication.util.suspendExecute
import org.opensearch.replication.util.suspending
import org.opensearch.transport.client.Client

/**
* Coordinates the force resume operation when retention leases have expired.
* Reuses existing infrastructure:
* Block removal/addition: same as TransportStopIndexReplicationAction
* Index deletion: same as IndexReplicationTask.cancelRestore()
* Lease add/renew/remove: existing RemoteClusterRetentionLeaseHelper methods
* The only new logic is acquiring retention leases at leaderGlobalCheckpoint+1
* BEFORE deleting the follower index, which prevents the race condition where
* the leader's translog is purged during the async snapshot restore.
*/
class ForceResumeCoordinator(
private val client: Client,
private val clusterService: ClusterService
) {
companion object {
private val log = LogManager.getLogger(ForceResumeCoordinator::class.java)
}

/**
* Executes the force resume workflow:
* 1. Remove index block (reuses UpdateIndexBlockAction — same as stop action)
* 2. Acquire retention leases at leaderCheckpoint+1 per shard (NEW — race condition fix)
* 3. Delete follower index (reuses same pattern as IndexReplicationTask.cancelRestore)
*/
suspend fun executeForceResume(params: IndexReplicationParams): ForceResumeResult {
val followerIndex = params.followerIndexName
val startTime = System.currentTimeMillis()
val acquiredLeases = mutableMapOf<Int, Long>()

log.info("Starting force resume for index $followerIndex")

// Step 1: Remove index block — same pattern as TransportStopIndexReplicationAction#L103
removeIndexBlock(followerIndex)

// From this point, any failure must clean up leases and re-add the block.
try {
// Step 2: Acquire retention leases on leader BEFORE deleting follower.
// This is the only genuinely new logic — prevents the race condition.
acquirePreRestoreLeases(params, acquiredLeases)

// Step 3: Delete follower index — same pattern as IndexReplicationTask.cancelRestore()
deleteFollowerIndex(followerIndex)
} catch (e: Exception) {
log.error("Force resume failed for $followerIndex, cleaning up", e)
cleanupOnFailure(params, acquiredLeases, followerIndex)
throw e
}

val duration = System.currentTimeMillis() - startTime
log.info("Force resume completed for $followerIndex in ${duration}ms")

return ForceResumeResult(
successful = true,
followerIndex = followerIndex,
leaseAcquiredAtSeqNo = acquiredLeases.toMap(),
durationMillis = duration
)
}

private suspend fun removeIndexBlock(followerIndex: String) {
log.info("Removing index block for $followerIndex")
val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.REMOVE_BLOCK)
client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true)
}

/**
Acquires retention leases at leaderGlobalCheckpoint+1 per shard BEFORE
* deleting the follower. This prevents the race condition where the leader's
* global checkpoint advances during restore and operations get purged.
*/
private suspend fun acquirePreRestoreLeases(
params: IndexReplicationParams,
acquiredLeases: MutableMap<Int, Long>
) {
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(
clusterService.clusterName.value(),
clusterService.state().metadata.clusterUUID(),
remoteClient
)

val shards = clusterService.state().routingTable
.indicesRouting().get(params.followerIndexName)?.shards()
if (shards == null) {
log.warn("No routing table found for follower index ${params.followerIndexName}, skipping lease acquisition")
return
}

shards.forEach { entry ->
val followerShardId = entry.value.shardId
val leaderShardId = ShardId(params.leaderIndex, followerShardId.id)
val retainingSeqNo = getLeaderGlobalCheckpoint(remoteClient, params.leaderIndex.name, leaderShardId.id) + 1

try {
retentionLeaseHelper.addRetentionLease(
leaderShardId, retainingSeqNo, followerShardId
)
} catch (e: RetentionLeaseAlreadyExistsException) {
retentionLeaseHelper.renewRetentionLease(
leaderShardId, retainingSeqNo, followerShardId
)
}
acquiredLeases[followerShardId.id] = retainingSeqNo
log.info("Acquired pre-restore lease for shard ${followerShardId.id} at seqNo $retainingSeqNo")
}
}

// Fetches the leader shard's global checkpoint via IndicesStatsAction.
private suspend fun getLeaderGlobalCheckpoint(remoteClient: Client, leaderIndexName: String, shardId: Int): Long {
val statsRequest = IndicesStatsRequest().indices(leaderIndexName).clear()
statsRequest.translog(true) // translog stats include seqNoStats/globalCheckpoint
val statsResponse = remoteClient.suspendExecute(
IndicesStatsAction.INSTANCE,
statsRequest,
injectSecurityContext = true
)
return statsResponse.shards
.firstOrNull { it.shardRouting.shardId().id == shardId && it.shardRouting.primary() }
?.seqNoStats?.globalCheckpoint
?: throw IllegalStateException("Primary shard $shardId not found or seqNoStats unavailable for leader index $leaderIndexName")
}

/**
* Same pattern as IndexReplicationTask.cancelRestore() — deletes the follower index.
* This makes isResumed() return false, triggering setupAndStartRestore() in the state machine.
*/
private suspend fun deleteFollowerIndex(followerIndex: String) {
log.info("Deleting follower index $followerIndex for snapshot bootstrap")
client.suspending(client.admin().indices()::delete, defaultContext = true)(DeleteIndexRequest(followerIndex))
}

/**
* Cleanup on failure: remove partially acquired leases + re-add index block.
* Uses existing RemoteClusterRetentionLeaseHelper.attemptRetentionLeaseRemoval()
* and UpdateIndexBlockAction with ADD_BLOCK.
*/
private suspend fun cleanupOnFailure(
params: IndexReplicationParams,
acquiredLeases: Map<Int, Long>,
followerIndex: String
) {
// Clean up partially acquired leases — uses existing attemptRetentionLeaseRemoval()
if (acquiredLeases.isNotEmpty()) {
try {
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(
clusterService.clusterName.value(),
clusterService.state().metadata.clusterUUID(),
remoteClient
)
for (shardIdInt in acquiredLeases.keys) {
val indexMetadata = clusterService.state().metadata.index(params.followerIndexName)
if (indexMetadata == null) {
log.warn("Follower index ${params.followerIndexName} not found in metadata during cleanup. " +
"Retention leases on leader may be orphaned for shard $shardIdInt.")
continue
}
val followerShardId = ShardId(indexMetadata.index, shardIdInt)
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, shardIdInt), followerShardId)
}
} catch (e: Exception) {
log.warn("Best-effort lease cleanup failed for ${params.followerIndexName}: ${e.message}")
}
}

// Re-add index block — same pattern as IndexReplicationTask.addIndexBlockForReplication()
try {
val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.ADD_BLOCK)
client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true)
log.info("Restored index block for $followerIndex")
} catch (e: Exception) {
log.error("CRITICAL: Failed to re-add index block for $followerIndex. Manual intervention may be required.", e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.resume

/**
* Captures the result of a force resume operation for logging and diagnostics.
*
* @param successful Whether the force resume coordinator completed successfully
* @param followerIndex The name of the follower index being force-resumed
* @param leaseAcquiredAtSeqNo Map of shardId to the retaining sequence number where the lease was acquired
* @param durationMillis How long the force resume coordinator took
* @param failureReason Description of the failure if unsuccessful
*/
data class ForceResumeResult(
val successful: Boolean,
val followerIndex: String,
val leaseAcquiredAtSeqNo: Map<Int, Long>,
val durationMillis: Long,
val failureReason: String? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,45 @@ import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.clustermanager.AcknowledgedRequest
import org.opensearch.core.ParseField
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.*

class ResumeIndexReplicationRequest : AcknowledgedRequest<ResumeIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {

lateinit var indexName: String
@JvmField
var forceResume: Boolean = false

constructor(indexName: String) {
constructor(indexName: String, forceResume: Boolean = false) {
this.indexName = indexName
this.forceResume = forceResume
}

private constructor() {
}

constructor(inp: StreamInput): super(inp) {
indexName = inp.readString()
forceResume = inp.readBoolean()
}

companion object {
private const val FORCE_RESUME_FIELD = "force_resume"

private val PARSER = ObjectParser<ResumeIndexReplicationRequest, Void>("ResumeReplicationRequestParser") {
ResumeIndexReplicationRequest()
}

init {
PARSER.declareBoolean({ req, value -> req.forceResume = value }, ParseField(FORCE_RESUME_FIELD))
}

fun fromXContent(parser: XContentParser, followerIndex: String): ResumeIndexReplicationRequest {
val ResumeIndexReplicationRequest = PARSER.parse(parser, null)
ResumeIndexReplicationRequest.indexName = followerIndex
return ResumeIndexReplicationRequest
val resumeRequest = PARSER.parse(parser, null)
resumeRequest.indexName = followerIndex
return resumeRequest
}
}

Expand All @@ -65,13 +76,15 @@ class ResumeIndexReplicationRequest : AcknowledgedRequest<ResumeIndexReplication
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.field("force_resume", forceResume)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
out.writeBoolean(forceResume)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,32 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService
val replMetdata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
val remoteMetadata = getLeaderIndexMetadata(replMetdata.connectionName, replMetdata.leaderContext.resource)
val params = IndexReplicationParams(replMetdata.connectionName, remoteMetadata.index, request.indexName)
if (!isResumable(params)) {
throw ResourceNotFoundException("Retention lease doesn't exist. Replication can't be resumed for ${request.indexName}")
val resumable = isResumable(params)

if (!resumable && !request.forceResume) {
throw ResourceNotFoundException(
"Retention lease doesn't exist. Replication can't be resumed for ${request.indexName}. " +
"Use force_resume=true to restore from snapshot."
)
}

if (!resumable && request.forceResume) {
// Delegate to force resume coordinator (snapshot bootstrap path)
log.info("Retention lease expired for ${request.indexName}. " +
"Force resume requested — initiating snapshot bootstrap.")
val coordinator = ForceResumeCoordinator(client, clusterService)
val result = coordinator.executeForceResume(params)
log.info("Force resume coordinator completed for ${request.indexName}: " +
"leases acquired at ${result.leaseAcquiredAtSeqNo}, " +
"duration=${result.durationMillis}ms")
// After coordinator completes, fall through to start IndexReplicationTask.
// Since the follower index was deleted, isResumed() returns false,
// which triggers setupAndStartRestore() in the state machine.
}

// For force resume with valid leases, proceed with normal resume
// (forceResume=true but leases exist → no-op, just resume normally)

val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val getSettingsRequest = GetSettingsRequest().includeDefaults(false).indices(params.leaderIndex.name)
val settingsResponse = remoteClient.suspending(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}
}

/**
* Suspend (non-blocking) version of addRetentionLease for use in coroutine contexts
* (e.g., transport thread). The blocking overload below uses actionGet() and must NOT
* be called from transport threads.
*/
public suspend fun addRetentionLease(leaderShardId: ShardId, seqNo: Long, followerShardId: ShardId) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
log.info("Adding retention lease $retentionLeaseId (async)")
client.suspendExecute(RetentionLeaseActions.Add.INSTANCE, request)
}

public suspend fun renewRetentionLease(leaderShardId: ShardId, seqNo: Long, followerShardId: ShardId) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
Expand Down Expand Up @@ -194,9 +206,6 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}


/**
* Remove these once the callers are moved to above APIs
*/
public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
Expand Down
Loading
Loading