/**
 * Coordinates the force resume operation when retention leases have expired.
 *
 * Reuses existing infrastructure:
 *  - Block removal/addition: same as TransportStopIndexReplicationAction
 *  - Index deletion: same as IndexReplicationTask.cancelRestore()
 *  - Lease add/renew/remove: existing [RemoteClusterRetentionLeaseHelper] methods
 *
 * The only new logic is acquiring retention leases at leaderGlobalCheckpoint+1
 * BEFORE deleting the follower index, which prevents the race condition where
 * the leader's translog is purged during the async snapshot restore.
 */
class ForceResumeCoordinator(
    private val client: Client,
    private val clusterService: ClusterService
) {

    /**
     * Executes the force resume workflow:
     *  1. Remove index block (reuses UpdateIndexBlockAction — same as stop action)
     *  2. Acquire retention leases at leaderCheckpoint+1 per shard
     *  3. Delete follower index (same pattern as IndexReplicationTask.cancelRestore)
     *
     * A failure in step 1 propagates without any cleanup. A failure in step 2 or 3
     * triggers [cleanupOnFailure] and then rethrows the original exception.
     *
     * @param params replication parameters naming the leader alias, leader index and follower index
     * @return a successful [ForceResumeResult] carrying the per-shard retaining sequence numbers
     * @throws Exception whatever step 1, 2 or 3 threw; cleanup problems are attached as suppressed
     */
    suspend fun executeForceResume(params: IndexReplicationParams): ForceResumeResult {
        val followerIndex = params.followerIndexName
        // Monotonic clock: a wall-clock adjustment must not produce a negative duration.
        val startNanos = System.nanoTime()
        val acquiredLeases = mutableMapOf<Int, Long>()

        log.info("Starting force resume for index $followerIndex")

        // Step 1: Remove index block — same pattern as TransportStopIndexReplicationAction
        removeIndexBlock(followerIndex)

        // From this point, any failure must clean up leases and re-add the block.
        try {
            // Step 2: Acquire retention leases on leader BEFORE deleting follower.
            acquirePreRestoreLeases(params, acquiredLeases)

            // Step 3: Delete follower index — same pattern as IndexReplicationTask.cancelRestore()
            deleteFollowerIndex(followerIndex)
        } catch (e: Exception) {
            log.error("Force resume failed for $followerIndex, cleaning up", e)
            try {
                cleanupOnFailure(params, acquiredLeases, followerIndex)
            } catch (cleanupError: Exception) {
                // Never let a cleanup problem replace the root cause seen by the caller.
                if (cleanupError !== e) {
                    e.addSuppressed(cleanupError)
                }
            }
            throw e
        }

        val duration = (System.nanoTime() - startNanos) / 1_000_000
        log.info("Force resume completed for $followerIndex in ${duration}ms")

        return ForceResumeResult(
            successful = true,
            followerIndex = followerIndex,
            leaseAcquiredAtSeqNo = acquiredLeases.toMap(),
            durationMillis = duration
        )
    }

    /** Removes the replication index block from [followerIndex] via UpdateIndexBlockAction. */
    private suspend fun removeIndexBlock(followerIndex: String) {
        log.info("Removing index block for $followerIndex")
        val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.REMOVE_BLOCK)
        client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true)
    }

    /**
     * Acquires retention leases at leaderGlobalCheckpoint+1 per shard BEFORE
     * deleting the follower. This prevents the race condition where the leader's
     * global checkpoint advances during restore and operations get purged.
     *
     * Every successfully acquired lease is recorded in [acquiredLeases] (follower shard
     * number to retaining sequence number) so the caller can undo a partial acquisition.
     * When the follower index has no routing table entry the method logs a warning and
     * returns without contacting the leader.
     *
     * @throws IllegalStateException if the leader stats contain no primary (or no seqNoStats)
     *         for one of the follower's shard numbers
     */
    private suspend fun acquirePreRestoreLeases(
        params: IndexReplicationParams,
        acquiredLeases: MutableMap<Int, Long>
    ) {
        val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
        val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(
            clusterService.clusterName.value(),
            clusterService.state().metadata.clusterUUID(),
            remoteClient
        )

        val shards = clusterService.state().routingTable
            .indicesRouting().get(params.followerIndexName)?.shards()
        if (shards == null) {
            log.warn("No routing table found for follower index ${params.followerIndexName}, skipping lease acquisition")
            return
        }

        // One stats round-trip for the whole index instead of one per shard. A checkpoint
        // read slightly earlier only makes the lease retain more history, never less.
        val leaderIndexName = params.leaderIndex.name
        val leaderCheckpoints = fetchLeaderGlobalCheckpoints(remoteClient, leaderIndexName)

        shards.forEach { entry ->
            val followerShardId = entry.value.shardId
            val leaderShardId = ShardId(params.leaderIndex, followerShardId.id)
            val leaderCheckpoint = leaderCheckpoints[leaderShardId.id]
                ?: throw IllegalStateException("Primary shard ${leaderShardId.id} not found or seqNoStats unavailable for leader index $leaderIndexName")
            val retainingSeqNo = leaderCheckpoint + 1

            try {
                retentionLeaseHelper.addRetentionLease(
                    leaderShardId, retainingSeqNo, followerShardId
                )
            } catch (e: RetentionLeaseAlreadyExistsException) {
                // A lease from an earlier attempt is still present: move it forward instead.
                retentionLeaseHelper.renewRetentionLease(
                    leaderShardId, retainingSeqNo, followerShardId
                )
            }
            acquiredLeases[followerShardId.id] = retainingSeqNo
            log.info("Acquired pre-restore lease for shard ${followerShardId.id} at seqNo $retainingSeqNo")
        }
    }

    /**
     * Fetches the global checkpoint of every primary shard of [leaderIndexName] with a
     * single IndicesStatsAction call on the leader cluster.
     *
     * @return shard number to global checkpoint; shards whose primary is missing from the
     *         response, or whose seqNoStats is null, are absent from the map
     */
    private suspend fun fetchLeaderGlobalCheckpoints(remoteClient: Client, leaderIndexName: String): Map<Int, Long> {
        val statsRequest = IndicesStatsRequest().indices(leaderIndexName).clear()
        // NOTE(review): the flag is kept from the original request on the assumption that it is
        // what populates seqNoStats — confirm against the IndicesStats API.
        statsRequest.translog(true)
        val statsResponse = remoteClient.suspendExecute(
            IndicesStatsAction.INSTANCE,
            statsRequest,
            injectSecurityContext = true
        )
        val checkpoints = mutableMapOf<Int, Long>()
        for (shardStats in statsResponse.shards) {
            if (!shardStats.shardRouting.primary()) continue
            val globalCheckpoint = shardStats.seqNoStats?.globalCheckpoint ?: continue
            checkpoints[shardStats.shardRouting.shardId().id] = globalCheckpoint
        }
        return checkpoints
    }

    /**
     * Same pattern as IndexReplicationTask.cancelRestore() — deletes the follower index.
     * This makes isResumed() return false, triggering setupAndStartRestore() in the state machine.
     */
    private suspend fun deleteFollowerIndex(followerIndex: String) {
        log.info("Deleting follower index $followerIndex for snapshot bootstrap")
        client.suspending(client.admin().indices()::delete, defaultContext = true)(DeleteIndexRequest(followerIndex))
    }

    /**
     * Cleanup on failure: remove partially acquired leases, then re-add the index block.
     * Uses existing RemoteClusterRetentionLeaseHelper.attemptRetentionLeaseRemoval()
     * and UpdateIndexBlockAction with ADD_BLOCK.
     *
     * Both steps are best-effort: failures are logged and never rethrown, and a failure
     * to remove one shard's lease does not stop the attempt for the remaining shards.
     */
    private suspend fun cleanupOnFailure(
        params: IndexReplicationParams,
        acquiredLeases: Map<Int, Long>,
        followerIndex: String
    ) {
        if (acquiredLeases.isNotEmpty()) {
            try {
                // The follower Index (name + UUID) is needed to rebuild the follower ShardId;
                // look it up once rather than once per shard.
                val indexMetadata = clusterService.state().metadata.index(params.followerIndexName)
                if (indexMetadata == null) {
                    for (shardIdInt in acquiredLeases.keys) {
                        log.warn("Follower index ${params.followerIndexName} not found in metadata during cleanup. " +
                            "Retention leases on leader may be orphaned for shard $shardIdInt.")
                    }
                } else {
                    val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
                    val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(
                        clusterService.clusterName.value(),
                        clusterService.state().metadata.clusterUUID(),
                        remoteClient
                    )
                    for (shardIdInt in acquiredLeases.keys) {
                        try {
                            retentionLeaseHelper.attemptRetentionLeaseRemoval(
                                ShardId(params.leaderIndex, shardIdInt),
                                ShardId(indexMetadata.index, shardIdInt)
                            )
                        } catch (e: Exception) {
                            // Keep going: one stuck shard must not orphan the other leases.
                            log.warn("Best-effort lease cleanup failed for ${params.followerIndexName} shard $shardIdInt", e)
                        }
                    }
                }
            } catch (e: Exception) {
                log.warn("Best-effort lease cleanup failed for ${params.followerIndexName}: ${e.message}", e)
            }
        }

        // Re-add index block — same pattern as IndexReplicationTask.addIndexBlockForReplication()
        try {
            val request = UpdateIndexBlockRequest(followerIndex, IndexBlockUpdateType.ADD_BLOCK)
            client.suspendExecute(UpdateIndexBlockAction.INSTANCE, request, defaultContext = true)
            log.info("Restored index block for $followerIndex")
        } catch (e: Exception) {
            log.error("CRITICAL: Failed to re-add index block for $followerIndex. Manual intervention may be required.", e)
        }
    }

    companion object {
        private val log = LogManager.getLogger(ForceResumeCoordinator::class.java)
    }
}
= null +) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt b/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt index 30413e0b2..b6ba33046 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/ResumeIndexReplicationRequest.kt @@ -15,6 +15,7 @@ import org.opensearch.action.ActionRequestValidationException import org.opensearch.action.IndicesRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.clustermanager.AcknowledgedRequest +import org.opensearch.core.ParseField import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.xcontent.* @@ -22,9 +23,12 @@ import org.opensearch.core.xcontent.* class ResumeIndexReplicationRequest : AcknowledgedRequest, IndicesRequest.Replaceable, ToXContentObject { lateinit var indexName: String + @JvmField + var forceResume: Boolean = false - constructor(indexName: String) { + constructor(indexName: String, forceResume: Boolean = false) { this.indexName = indexName + this.forceResume = forceResume } private constructor() { @@ -32,17 +36,24 @@ class ResumeIndexReplicationRequest : AcknowledgedRequest("ResumeReplicationRequestParser") { ResumeIndexReplicationRequest() } + init { + PARSER.declareBoolean({ req, value -> req.forceResume = value }, ParseField(FORCE_RESUME_FIELD)) + } + fun fromXContent(parser: XContentParser, followerIndex: String): ResumeIndexReplicationRequest { - val ResumeIndexReplicationRequest = PARSER.parse(parser, null) - ResumeIndexReplicationRequest.indexName = followerIndex - return ResumeIndexReplicationRequest + val resumeRequest = PARSER.parse(parser, null) + resumeRequest.indexName = followerIndex + return resumeRequest } } @@ -65,6 +76,7 @@ class ResumeIndexReplicationRequest : 
AcknowledgedRequest + val listener = invocation.getArgument>(1) + listener.onResponse(AcknowledgedResponse(true)) + null + }.whenever(indicesAdminClient).delete(any(), any>()) + + // Wire up clusterService basics + whenever(clusterService.clusterName).thenReturn(ClusterName(CLUSTER_NAME)) + } + + override fun tearDown() { + super.tearDown() + org.opensearch.threadpool.ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS) + } + + private fun buildParams(numShards: Int = 1): IndexReplicationParams { + return IndexReplicationParams(LEADER_ALIAS, leaderIndex, FOLLOWER_INDEX) + } + + // Builds a ClusterState with the follower index having the given number of shards + private fun buildClusterState(numShards: Int): ClusterState { + val indexMetadata = IndexMetadata.builder(FOLLOWER_INDEX) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, FOLLOWER_INDEX_UUID) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .build() + + val metadata = Metadata.builder() + .put(indexMetadata, false) + .clusterUUID(CLUSTER_UUID) + .build() + + val indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.index) + for (i in 0 until numShards) { + val shardId = ShardId(indexMetadata.index, i) + val shardRouting = TestShardRouting.newShardRouting( + shardId, "node-1", true, ShardRoutingState.STARTED + ) + indexRoutingTableBuilder.addShard(shardRouting) + } + + val routingTable = RoutingTable.builder() + .add(indexRoutingTableBuilder.build()) + .build() + + return ClusterState.builder(ClusterName(CLUSTER_NAME)) + .metadata(metadata) + .routingTable(routingTable) + .build() + } + + /** + * Stubs the client.execute() call to respond with AcknowledgedResponse for any action. + * This covers removeIndexBlock, deleteFollowerIndex, and re-add block calls. 
+ */ + private fun stubClientExecuteAck() { + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onResponse(AcknowledgedResponse(true)) + null + }.whenever(client).execute(any(), any(), any>()) + } + + /** + * Stubs the remoteClient.execute() for IndicesStatsAction to return a given global checkpoint + * for each shard. + */ + private fun stubRemoteStatsResponse(numShards: Int, globalCheckpoint: Long) { + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val shardStats = (0 until numShards).map { shardId -> + val shardRouting = TestShardRouting.newShardRouting( + ShardId(leaderIndex, shardId), "leader-node", true, ShardRoutingState.STARTED + ) + val seqNoStats = SeqNoStats(globalCheckpoint, globalCheckpoint, globalCheckpoint) + Mockito.mock(ShardStats::class.java).also { ss -> + whenever(ss.shardRouting).thenReturn(shardRouting) + whenever(ss.seqNoStats).thenReturn(seqNoStats) + } + }.toTypedArray() + + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(shardStats) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + } + + // ======================== Tests ======================== + + fun `test ForceResumeResult is returned on success with single shard`() = runBlocking { + val numShards = 1 + val globalCheckpoint = 42L + val clusterState = buildClusterState(numShards) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(numShards, globalCheckpoint) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.followerIndex).isEqualTo(FOLLOWER_INDEX) + assertThat(result.leaseAcquiredAtSeqNo).hasSize(numShards) + // Lease should be acquired at globalCheckpoint + 1 + 
assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(globalCheckpoint + 1) + assertThat(result.durationMillis).isGreaterThanOrEqualTo(0) + } + + fun `test removeIndexBlock is called with REMOVE_BLOCK`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(1, 10L) + + val coordinator = ForceResumeCoordinator(client, clusterService) + coordinator.executeForceResume(buildParams()) + + // removeIndexBlock uses client.execute (suspendExecute) — at least 1 call + verify(client, Mockito.atLeast(1)).execute(any(), any(), any>()) + // deleteFollowerIndex uses client.admin().indices().delete — verify that path was called + verify(indicesAdminClient, Mockito.atLeast(1)).delete(any(), any>()) + } + + fun `test cleanup re-adds block when lease acquisition fails`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + // First call succeeds (removeIndexBlock), subsequent calls for cleanup also succeed + stubClientExecuteAck() + + // Make remote stats call fail to simulate lease acquisition failure + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onFailure(RuntimeException("Stats call failed")) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Stats call failed") + + // Verify that client.execute was called more than once (block removal + re-add on cleanup) + verify(client, Mockito.atLeast(2)).execute(any(), any(), any>()) + } + + fun `test no shards means no leases acquired`() = runBlocking { + // Build a cluster state where the follower index has no routing entry + val metadata = Metadata.builder() + 
.clusterUUID(CLUSTER_UUID) + .build() + val routingTable = RoutingTable.builder().build() + val clusterState = ClusterState.builder(ClusterName(CLUSTER_NAME)) + .metadata(metadata) + .routingTable(routingTable) + .build() + + whenever(clusterService.state()).thenReturn(clusterState) + stubClientExecuteAck() + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.leaseAcquiredAtSeqNo).isEmpty() + } + + fun `test global checkpoint of zero results in lease at seqNo 1`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + stubRemoteStatsResponse(1, 0L) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(1L) + } + + fun `test removeIndexBlock failure propagates without cleanup`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + // Make the first client.execute call (removeIndexBlock) fail + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + listener.onFailure(RuntimeException("Block removal failed")) + null + }.whenever(client).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Block removal failed") + + // No remote calls should have been made since we failed at step 1 + verify(remoteClient, never()).execute(any(), any(), any>()) + } + + fun `test delete follower index failure triggers cleanup`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) 
+ + // client.execute succeeds (for removeIndexBlock and re-add block in cleanup) + stubClientExecuteAck() + // remote stats succeeds (lease acquisition works) + stubRemoteStatsResponse(1, 50L) + + // But delete follower index fails + doAnswer { invocation -> + val listener = invocation.getArgument>(1) + listener.onFailure(RuntimeException("Delete index failed")) + null + }.whenever(indicesAdminClient).delete(any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(RuntimeException::class.java) + .hasMessageContaining("Delete index failed") + + // Cleanup should have re-added the block (at least 2 client.execute calls: + // 1 for removeIndexBlock, 1 for re-add block in cleanup) + verify(client, Mockito.atLeast(2)).execute(any(), any(), any>()) + } + + fun `test primary shard not found throws IllegalStateException`() = runBlocking { + val clusterState = buildClusterState(1) + whenever(clusterService.state()).thenReturn(clusterState) + + stubClientExecuteAck() + + // Return stats with no matching primary shard + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(emptyArray()) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + + assertThatThrownBy { + runBlocking { coordinator.executeForceResume(buildParams()) } + }.isInstanceOf(IllegalStateException::class.java) + .hasMessageContaining("Primary shard") + .hasMessageContaining("not found") + } + + fun `test multiple shards with different global checkpoints`() = runBlocking { + val numShards = 3 + val clusterState = buildClusterState(numShards) + whenever(clusterService.state()).thenReturn(clusterState) + + 
stubClientExecuteAck() + + // Return different global checkpoints per shard + doAnswer { invocation -> + val listener = invocation.getArgument>(2) + val shardStats = (0 until numShards).map { shardId -> + val shardRouting = TestShardRouting.newShardRouting( + ShardId(leaderIndex, shardId), "leader-node", true, ShardRoutingState.STARTED + ) + val checkpoint = (shardId + 1) * 100L // 100, 200, 300 + val seqNoStats = SeqNoStats(checkpoint, checkpoint, checkpoint) + Mockito.mock(ShardStats::class.java).also { ss -> + whenever(ss.shardRouting).thenReturn(shardRouting) + whenever(ss.seqNoStats).thenReturn(seqNoStats) + } + }.toTypedArray() + + val statsResponse = Mockito.mock(IndicesStatsResponse::class.java) + whenever(statsResponse.shards).thenReturn(shardStats) + listener.onResponse(statsResponse as ActionResponse) + null + }.whenever(remoteClient).execute(any(), any(), any>()) + + val coordinator = ForceResumeCoordinator(client, clusterService) + val result = coordinator.executeForceResume(buildParams()) + + assertThat(result.successful).isTrue() + assertThat(result.leaseAcquiredAtSeqNo).hasSize(numShards) + assertThat(result.leaseAcquiredAtSeqNo[0]).isEqualTo(101L) + assertThat(result.leaseAcquiredAtSeqNo[1]).isEqualTo(201L) + assertThat(result.leaseAcquiredAtSeqNo[2]).isEqualTo(301L) + } + +} diff --git a/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt new file mode 100644 index 000000000..6f23ed2dd --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/action/resume/ForceResumeResultTests.kt @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
/**
 * Unit tests for the [ForceResumeResult] value holder: property access, the default
 * for the optional failure reason, and data-class equality semantics.
 */
class ForceResumeResultTests : OpenSearchTestCase() {

    // Every constructor argument is readable back; failureReason falls back to null.
    fun `test successful result`() {
        val expectedLeases = mapOf(0 to 100L, 1 to 200L)
        val outcome = ForceResumeResult(
            successful = true,
            followerIndex = "follower-index",
            leaseAcquiredAtSeqNo = expectedLeases,
            durationMillis = 500L
        )

        assertTrue(outcome.successful)
        assertEquals("follower-index", outcome.followerIndex)
        assertEquals(expectedLeases, outcome.leaseAcquiredAtSeqNo)
        assertEquals(500L, outcome.durationMillis)
        assertNull(outcome.failureReason)
    }

    // A failed result keeps its explicit reason and may carry an empty lease map.
    fun `test failed result with reason`() {
        val outcome = ForceResumeResult(
            successful = false,
            followerIndex = "follower-index",
            leaseAcquiredAtSeqNo = emptyMap(),
            durationMillis = 100L,
            failureReason = "Retention lease acquisition failed"
        )

        assertFalse(outcome.successful)
        assertEquals("Retention lease acquisition failed", outcome.failureReason)
        assertTrue(outcome.leaseAcquiredAtSeqNo.isEmpty())
    }

    // Two results built from the same arguments are equal and hash alike.
    fun `test result equality`() {
        val sharedLeases = mapOf(0 to 50L)
        val first = ForceResumeResult(true, "idx", sharedLeases, 10L)
        val second = ForceResumeResult(true, "idx", sharedLeases, 10L)

        assertEquals(first, second)
        assertEquals(first.hashCode(), second.hashCode())
    }

    // Changing only the follower index name breaks equality.
    fun `test result inequality on different follower index`() {
        val forIndexA = ForceResumeResult(true, "idx-a", emptyMap(), 10L)
        val forIndexB = ForceResumeResult(true, "idx-b", emptyMap(), 10L)

        assertNotEquals(forIndexA, forIndexB)
    }

}
/**
 * Unit tests for [ResumeIndexReplicationRequest]: transport serialization, XContent
 * parsing/rendering of the force_resume flag, and the IndicesRequest accessors.
 */
class ResumeIndexReplicationRequestTests : OpenSearchTestCase() {

    // Writes the request to an in-memory stream and reads it back, closing both streams.
    private fun roundTrip(request: ResumeIndexReplicationRequest): ResumeIndexReplicationRequest =
        BytesStreamOutput().use { out ->
            request.writeTo(out)
            val inp: StreamInput = out.bytes().streamInput()
            inp.use { ResumeIndexReplicationRequest(it) }
        }

    // Parses a JSON request body for the given follower index, closing the parser afterwards.
    private fun parseRequest(json: String, followerIndex: String = "follower-index"): ResumeIndexReplicationRequest =
        createParser(XContentType.JSON.xContent(), json).use { parser ->
            ResumeIndexReplicationRequest.fromXContent(parser, followerIndex)
        }

    fun `test serialization roundtrip with forceResume false`() {
        val original = ResumeIndexReplicationRequest("my-index", forceResume = false)
        val deserialized = roundTrip(original)

        assertEquals(original.indexName, deserialized.indexName)
        assertEquals(original.forceResume, deserialized.forceResume)
    }

    fun `test serialization roundtrip with forceResume true`() {
        val original = ResumeIndexReplicationRequest("my-index", forceResume = true)
        val deserialized = roundTrip(original)

        assertEquals("my-index", deserialized.indexName)
        assertTrue(deserialized.forceResume)
    }

    fun `test indices returns index name`() {
        val request = ResumeIndexReplicationRequest("follower-idx")
        val indices = request.indices()
        assertEquals(1, indices.size)
        assertEquals("follower-idx", indices[0])
    }

    fun `test validate returns null`() {
        val request = ResumeIndexReplicationRequest("test-index")
        assertNull(request.validate())
    }

    fun `test fromXContent with force_resume true`() {
        val request = parseRequest("""{"force_resume": true}""")
        assertEquals("follower-index", request.indexName)
        assertTrue(request.forceResume)
    }

    fun `test fromXContent with force_resume false`() {
        val request = parseRequest("""{"force_resume": false}""")
        assertFalse(request.forceResume)
    }

    fun `test fromXContent with empty body defaults forceResume to false`() {
        val request = parseRequest("""{}""")
        assertEquals("follower-index", request.indexName)
        assertFalse(request.forceResume)
    }

    fun `test toXContent includes force_resume field`() {
        val request = ResumeIndexReplicationRequest("test-idx", forceResume = true)
        val builder = org.opensearch.common.xcontent.XContentFactory.jsonBuilder()
        request.toXContent(builder, org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS)
        val xContentString = builder.toString()
        assertTrue(xContentString.contains("\"force_resume\":true"))
        assertTrue(xContentString.contains("\"indexName\":\"test-idx\""))
    }

    fun `test indicesOptions is strict single index`() {
        val request = ResumeIndexReplicationRequest("test-index")
        val options = request.indicesOptions()
        assertFalse(options.allowNoIndices())
    }
}