Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.replication.task.ReplicationState
import org.opensearch.replication.task.index.IndexReplicationExecutor
import org.opensearch.replication.task.index.IndexReplicationParams
import org.opensearch.replication.task.index.IndexReplicationState
import org.opensearch.replication.util.StaleTaskUtils
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.startTask
import org.opensearch.replication.util.suspending
Expand Down Expand Up @@ -106,6 +107,12 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
"Delete the index:${replicateIndexReq.followerIndex}")
}

// Remove all replication tasks before creating new ones
val removedTaskCount = StaleTaskUtils.removeAllTasksForIndex(clusterService, nodeClient, replicateIndexReq.followerIndex)
if (removedTaskCount > 0) {
log.info("Cleaned up $removedTaskCount tasks for ${replicateIndexReq.followerIndex}")
}

indexScopedSettings.validate(replicateIndexReq.settings,
false,
false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService
listener.completeWith {
log.info("Resuming index replication on index:" + request.indexName)
validateResumeReplicationRequest(request)

// Remove all existing tasks before resuming to ensure a clean slate
StaleTaskUtils.removeAllTasksForIndex(clusterService, client, request.indexName)

val replMetdata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
val remoteMetadata = getLeaderIndexMetadata(replMetdata.connectionName, replMetdata.leaderContext.resource)
val params = IndexReplicationParams(replMetdata.connectionName, remoteMetadata.index, request.indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
import org.opensearch.replication.action.index.block.UpdateIndexBlockRequest
import org.opensearch.replication.metadata.INDEX_REPLICATION_BLOCK
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.UpdateMetadataAction
import org.opensearch.replication.metadata.UpdateMetadataRequest
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.util.StaleTaskUtils
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.suspendExecute
import org.opensearch.replication.util.suspending
Expand All @@ -35,6 +33,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchException
import org.opensearch.ResourceNotFoundException
import org.opensearch.core.action.ActionListener
import org.opensearch.action.admin.indices.open.OpenIndexRequest
import org.opensearch.action.support.ActionFilters
Expand All @@ -56,8 +55,6 @@ import org.opensearch.common.inject.Inject
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.persistent.PersistentTasksCustomMetadata
import org.opensearch.persistent.RemovePersistentTaskAction
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.io.IOException
Expand Down Expand Up @@ -106,8 +103,6 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Failed to remove index block on ${request.indexName}")
}

validateReplicationStateOfIndex(request)

// Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen
val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry ->
entry.indices().any { it == request.indexName }
Expand Down Expand Up @@ -148,8 +143,17 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Failed to reopen index: ${request.indexName}")
}
}
replicationMetadataManager.deleteIndexReplicationMetadata(request.indexName)
removeStaleReplicationTasksFromClusterState(request)

// Remove stale persistent replication tasks from cluster state.
// Unassigned tasks are always removed. Assigned tasks are removed only if
// the task is not actually running in the task manager or the node is invalid.
StaleTaskUtils.removeStaleTasksForIndex(clusterService, client, request.indexName)
try {
replicationMetadataManager.deleteIndexReplicationMetadata(request.indexName)
} catch (e: ResourceNotFoundException) {
throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
}

listener.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Stop replication failed for index[${request.indexName}] with error ${e.stackTraceToString()}")
Expand All @@ -158,62 +162,6 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
}
}

/**
 * Best-effort removal of unassigned persistent replication tasks for the index named in [request].
 *
 * Reads the persistent-task custom metadata from the current cluster state and issues a
 * [RemovePersistentTaskAction] request for every task that [isReplicationTask] matches and that is
 * not assigned. Failures are logged and not propagated, so the caller can carry on with the rest of
 * its clean-up; coroutine cancellation is the one exception and is always rethrown.
 *
 * @param request the stop-replication request whose `indexName` selects the tasks to remove
 */
private suspend fun removeStaleReplicationTasksFromClusterState(request: StopIndexReplicationRequest) {
    try {
        // Treated as nullable, as validateReplicationStateOfIndex does for the same lookup:
        // a missing custom section simply means there is nothing to remove.
        val allTasks: PersistentTasksCustomMetadata? =
            clusterService.state().metadata().custom(PersistentTasksCustomMetadata.TYPE)
        val staleTasks = allTasks?.tasks()
            ?.filter { isReplicationTask(it, request) && !it.isAssigned }
            .orEmpty()
        for (staleTask in staleTasks) {
            log.info("Removing task: ${staleTask.id} from cluster state")
            val removeRequest = RemovePersistentTaskAction.Request(staleTask.id)
            client.suspendExecute(RemovePersistentTaskAction.INSTANCE, removeRequest)
        }
    } catch (e: java.util.concurrent.CancellationException) {
        // Cancellation must reach the enclosing coroutine; swallowing it would keep a cancelled job running.
        throw e
    } catch (e: Exception) {
        // Deliberately non-fatal, but keep the cause so the failure can be diagnosed.
        log.warn("Could not update cluster state", e)
    }
}

/**
 * Returns true when [singleTask] is a persistent replication task belonging to the index in [request].
 *
 * Two task-id formats are recognised:
 *  - index replication task, e.g. `replication:index:fruit-1`
 *  - shard replication task, e.g. `replication:[fruit-1][0]`
 *
 * The index name is matched exactly rather than as a substring, so a request for `fruit` does not
 * match tasks of `fruit-1`, and an index whose name is a substring of `index` does not match every
 * index replication task.
 *
 * NOTE(review): assumes shard task ids are always `replication:[<index>][<shard>]` as in the example
 * above — confirm against the code that builds the task ids.
 */
private fun isReplicationTask(
    singleTask: PersistentTasksCustomMetadata.PersistentTask<*>,
    request: StopIndexReplicationRequest
): Boolean {
    val taskId = singleTask.id
    val indexName = request.indexName
    return taskId == "replication:index:${indexName}" ||
        taskId.startsWith("replication:[${indexName}][")
}


/**
 * Validates that a stop-replication request for the index in [request] may proceed.
 *
 * Returns normally when any of the following holds:
 *  - the index carries the replication block or the replicated-index setting,
 *  - an unassigned replication task for the index is present in the persistent-task metadata,
 *  - the last known overall replication state is RUNNING, STOPPED, FAILED or PAUSED.
 *
 * @throws IllegalArgumentException when no replication state params exist for the index
 * @throws IllegalStateException when the last known overall state is none of the values above
 */
private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) {
    val followerIndex = request.indexName

    // Replication blocks/settings may still be present when the follower index settings were carried
    // over in a snapshot and the index was restored from it; Stop should then proceed with the clean-up.
    if (clusterService.state().blocks.hasIndexBlock(followerIndex, INDEX_REPLICATION_BLOCK)) {
        return
    }
    if (clusterService.state().metadata.index(followerIndex)?.settings?.get(REPLICATED_INDEX_SETTING.key) != null) {
        return
    }

    // A stale (unassigned) replication task is also a reason to let the clean-up run.
    val persistentTasks: PersistentTasksCustomMetadata? =
        clusterService.state()?.metadata()?.custom(PersistentTasksCustomMetadata.TYPE)
    val hasStaleTask = persistentTasks?.tasks()?.any { isReplicationTask(it, request) && !it.isAssigned } ?: false
    if (hasStaleTask) {
        return
    }

    val stateParams = getReplicationStateParamsForIndex(clusterService, followerIndex)
        ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
    when (val replicationOverallState = stateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE]) {
        ReplicationOverallState.RUNNING.name,
        ReplicationOverallState.STOPPED.name,
        ReplicationOverallState.FAILED.name,
        ReplicationOverallState.PAUSED.name -> return
        else -> throw IllegalStateException("Unknown value of replication state:$replicationOverallState")
    }
}

/** Names the thread pool this action runs on: [ThreadPool.Names.SAME]. */
override fun executor(): String = ThreadPool.Names.SAME
Expand Down Expand Up @@ -253,4 +201,4 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:

override fun newResponse(acknowledged: Boolean) = AcknowledgedResponse(acknowledged)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl
private suspend fun deleteMetadata(deleteReq: DeleteReplicationMetadataRequest) {
executeAndWrapExceptionIfAny({
val delRes = replicaionMetadataStore.deleteMetadata(deleteReq)
if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) {
if(delRes.result == DocWriteResponse.Result.NOT_FOUND) {
throw ResourceNotFoundException("Metadata for ${deleteReq.resourceName} doesn't exist")
}
if(delRes.result != DocWriteResponse.Result.DELETED) {
log.error("Encountered error with result - ${delRes.result}, while deleting metadata")
throw ReplicationException("Error deleting replication metadata")
}
Expand Down
Loading
Loading