Removes per-shard ShardReplicationTask from cluster state #1695

Open
jainankitk wants to merge 7 commits into opensearch-project:main from jainankitk:remove-shard-tasks

@jainankitk

Description

Summary

Removes per-shard ShardReplicationTask from cluster state. Replication of follower shards now runs as in-memory work managed by a per-node NodeReplicationController, reconciled from cluster state on every clusterChanged event. Per-index IndexReplicationTask is unchanged.

Why

ShardReplicationTask was registered as an OpenSearch persistent task, one entry per follower primary shard, so starting, stopping, pausing, or resuming replication produced O(shards) cluster state updates serialized through the cluster manager. This dominated cluster state churn on large autofollow setups, contributed to ghost tasks blocking restart, and was the root cause of many customer tickets across several recurring themes (replication stuck or paused, ghost tasks, autofollow rule failures, metadata inconsistency).

The runtime state owned by ShardReplicationTask (changes tracker, sequencer, reader/writer scopes) didn't actually need to live in cluster state. Moving it to in-memory state on the node hosting the primary shard removes the cluster-state cost without changing replication semantics.

What changes

Removed:

  • ShardReplicationTask, ShardReplicationExecutor, ShardReplicationParams,
    ShardReplicationState
  • Per-shard task spawning/polling logic in IndexReplicationTask
    (startNewOrMissingShardTasks, pollShardTaskStatus,
    findAllReplicationFailedShardTasks, startReplicationTask)
  • INIT_FOLLOW state behavior (enum value retained for backward serialization
    compatibility; state machine no longer dwells in it)

Added:

  • NodeReplicationController — per-node ClusterStateListener that reconciles
    local follower primaries against ReplicationMetadata, starting/stopping
    in-memory ShardReplicationContext instances. Includes a periodic idle-shard
    retention-lease renewal sweep so leases on quiet shards don't expire after the
    default 12h.
  • ShardReplicationContext — in-memory replacement for ShardReplicationTask.
    Owns ShardReplicationChangesTracker, TranslogSequencer, reader/writer
    coroutine scope, lease renewal, and circuit-breaker state. On terminal failure,
    pauses (or stops, when leader-deletion is detected) the index directly via
    ReplicationMetadataManager rather than via persistent-task state transitions.
  • CleanupShardTasksUpdateTask — single batched cluster state update that removes
    legacy ShardReplicationExecutor entries from PersistentTasksCustomMetadata
    during plugin init on the cluster manager. Idempotent.
  • Unit tests: NodeReplicationControllerTests,
    CleanupShardTasksUpdateTaskTests.
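
The reconcile step described for NodeReplicationController can be pictured as a set difference between the shards that should be replicating locally and the contexts currently running. This is a minimal sketch, assuming hypothetical stand-in types (`ShardKey`, `Worker`, `Reconciler`) rather than the plugin's actual classes:

```kotlin
// Hypothetical stand-ins for illustration; not the plugin's real types.
data class ShardKey(val index: String, val shard: Int)

class Worker(val key: ShardKey) {
    var running = false
        private set
    fun start() { running = true }
    fun stop() { running = false }
}

class Reconciler {
    private val workers = mutableMapOf<ShardKey, Worker>()

    // Brings the running set in line with the desired set. Calling it again
    // with the same input changes nothing, so repeated events converge.
    @Synchronized
    fun reconcile(desired: Set<ShardKey>) {
        // Stop workers whose shard is no longer a local, replicating primary.
        for (key in workers.keys - desired) {
            workers.remove(key)?.stop()
        }
        // Start workers for newly eligible shards.
        for (key in desired) {
            if (key !in workers) {
                workers[key] = Worker(key).also { it.start() }
            }
        }
    }

    @Synchronized
    fun active(): Set<ShardKey> = workers.keys.toSet()
}
```

Because the function depends only on the desired set and the current map, a missed event would be absorbed by whichever event arrives next.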

Unchanged:

  • GetChangesAction and ReplayChangesAction data-fetch protocol.
  • Retention lease semantics.
  • Bootstrap flow (snapshot/restore via RemoteClusterRepository).
  • REST API surface (_start, _stop, _pause, _resume, _autofollow).
  • IndexReplicationTask as a per-index persistent task.

Behavior changes worth calling out

  • Cluster state updates on autofollow / start replication: O(shards) → O(1).
  • Worker placement: A worker now follows its primary by virtue of the
    controller observing routing changes, not via persistent-task allocation. There
    is a brief window during primary relocation where neither the old nor the new
    node has an active worker — GetChangesAction is idempotent, so no data loss.
  • Idle shards: A periodic timer (default 30m sweep, 2h staleness threshold)
    renews retention leases for shards that aren't producing batches. Closes a
    failure mode where lease lifecycle was coupled to the fetch loop.
  • Per-shard task visibility: _cat/tasks?actions=*replication* no longer
    shows per-shard rows. A status API extension surfacing per-shard worker state
    will land in a follow-up PR.
  • Coarse autopause preserved: Today's "any terminal shard failure pauses the
    whole index" policy is unchanged. Per-shard isolation is deferred to the status
    API follow-up.
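
The idle-shard sweep can be thought of as a filter over last-renewal timestamps. The sketch below uses the defaults quoted above (30m sweep, 2h staleness threshold); the function, constant names, and the string shard labels are illustrative assumptions, not identifiers from the PR:

```kotlin
import java.time.Duration

// Defaults quoted in the PR description; the names themselves are hypothetical.
val SWEEP_INTERVAL: Duration = Duration.ofMinutes(30)
val STALENESS_THRESHOLD: Duration = Duration.ofHours(2)

// Returns the shards whose lease was last renewed longer ago than the threshold.
// lastRenewalMillis maps a shard label to the epoch-millis of its last renewal.
fun staleShards(
    lastRenewalMillis: Map<String, Long>,
    nowMillis: Long,
    threshold: Duration = STALENESS_THRESHOLD
): Set<String> =
    lastRenewalMillis
        .filterValues { nowMillis - it > threshold.toMillis() }
        .keys
```

With a 2h threshold against the default 12h lease expiry mentioned earlier, a quiet shard would be picked up for renewal well before its lease lapses.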

Migration

A single batched cluster-state update on first plugin init removes legacy ShardReplicationTask entries. Rolling upgrade is supported with a brief replication lag window per node as each one restarts and its controller picks up local primaries; no operator-mandated pause is required.
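
The idempotence of the cleanup can be illustrated with a pure filter over task entries. This is a sketch only: `TaskEntry` and the task-name value are hypothetical placeholders, since the description does not give the real legacy task name:

```kotlin
// Hypothetical stand-in for a persistent-task entry; not the OpenSearch type.
data class TaskEntry(val id: String, val taskName: String)

// Placeholder value; the real legacy task name is not stated in the description.
val LEGACY_SHARD_TASK_NAME = "legacy-shard-replication-task"

// Returns the input list itself when nothing needs removing, so a caller can
// detect "no change" by reference and skip publishing a new cluster state.
fun removeLegacyShardTasks(tasks: List<TaskEntry>): List<TaskEntry> {
    val kept = tasks.filter { it.taskName != LEGACY_SHARD_TASK_NAME }
    return if (kept.size == tasks.size) tasks else kept
}
```

Running the function a second time on its own output returns the same list, which is the property that makes an unconditional cleanup safe to repeat.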

Test plan

  • compileKotlin, compileTestKotlin
  • Unit tests pass (including new NodeReplicationControllerTests and
    CleanupShardTasksUpdateTaskTests)
  • ./gradlew integTest — 120 tests, 0 failures, 28 skipped (security tests)

Out of scope (follow-up PRs)

  • Status API to restore per-shard observability that _cat/tasks previously
    provided.
  • Throttling controls (max_buffered_bytes_per_shard, per-leader-alias semaphore).
  • Removal of IndexReplicationTask as a persistent task (per-index, low
    cardinality — not the bottleneck).

Risks

  • Migration cleanup runs unconditionally on cluster-manager init. It is idempotent
    and scoped to the legacy task name only. A failure is logged but does not block
    plugin startup.
  • Reconciler runs on clusterChanged events. If events are missed or coalesced, the
    next event reconciles from the current cluster state, so the controller converges.
  • Per-shard task visibility regression in _cat/tasks until follow-up PR.

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Introduce in-memory replacement for ShardReplicationTask without yet wiring it
into the plugin. Existing per-shard persistent task code continues to run.

- ShardReplicationContext: lifts the replicate loop out of ShardReplicationTask
  into a plain class managed by the controller. On terminal failure, pauses the
  index directly via ReplicationMetadataManager rather than via persistent task
  state transitions.
- NodeReplicationController: cluster state listener that reconciles local
  follower primaries against ReplicationMetadata, starting/stopping contexts
  as routing or replication state changes. Includes idle-lease renewal sweep.
- CleanupShardTasksUpdateTask: single batched cluster state update that
  removes legacy ShardReplicationExecutor entries from
  PersistentTasksCustomMetadata. Idempotent.
- docs/remove-shard-tasks-design.md: design rationale and migration notes.

No behavior change in this commit. Subsequent commit will wire up the new
controller, remove the persistent-task plumbing, and run cleanup.

Signed-off-by: Ankit Jain <jainankitk@apache.org>

Activate the in-memory shard replication model added in the previous commit.
Per-shard persistent tasks are no longer registered, no longer stored in
cluster state, and no longer participate in start/stop/pause/resume flows.

Plugin (ReplicationPlugin.kt):
- Start NodeReplicationController during createComponents.
- Register a one-shot ClusterStateListener that submits
  CleanupShardTasksUpdateTask when this node becomes cluster manager,
  removing legacy ShardReplicationTask entries in a single batched update.
- Drop ShardReplicationExecutor from getPersistentTasksExecutor and the
  related ShardReplicationParams/ShardReplicationState entries from
  getNamedWriteables/getNamedXContent.

IndexReplicationTask:
- Remove startNewOrMissingShardTasks, findAllReplicationFailedShardTasks,
  pollShardTaskStatus, startReplicationTask. Per-shard lifecycle and
  failure detection now live on each follower data node's controller.
- INIT_FOLLOW state now just adds the index replication block and
  transitions to FOLLOWING - no per-shard task spawning.
- MONITORING state no longer polls shard task status; it just runs the
  metadata sync. Shard-level fatal failures pause the index directly via
  ReplicationMetadataManager from the failing context.
- Static-settings update path no longer removes per-shard persistent
  tasks; index close cascades to controller-driven worker stop.

IndexReplicationState:
- FollowingState is now an object with no payload (was a data class
  carrying a map of shard task references).
- FailedState carries only an errorMsg (was a map plus message).
- INIT_FOLLOW enum value and InitFollowState singleton retained for
  backward serialization compatibility with state persisted by older
  clusters; deserialized state effectively transitions through it.

Deletions:
- ShardReplicationTask, ShardReplicationExecutor, ShardReplicationParams,
  ShardReplicationState (no longer instantiated).
- ShardReplicationExecutorTests (tested deleted class).

Tests:
- ReplicationHelpers: drop ShardReplicationExecutor.TASK_NAME references.
  waitForReplicationStart/Stop now wait only on the index-level task.
  waitForShardTaskStart becomes a no-op stub for source compatibility.
- IndexReplicationTaskTests: remove testStartNewShardTasks /
  testStartMissingShardTasks (tested removed method). Trim
  createIndexReplicationTask helper.
- NoOpClient: drop the synthetic ShardReplicationParams response branch.

Build: compileKotlin and compileTestKotlin both pass.
Signed-off-by: Ankit Jain <jainankitk@apache.org>

Three substantive bug fixes uncovered by integ test runs, plus unit tests
for the new infrastructure, plus removal/refactor of integ tests that
were inherently coupled to per-shard persistent tasks.

Integration fixes:

- ShardReplicationContext (controller plumbing):
  * Resolve leader alias and leader Index (name+UUID) from the surviving
    IndexReplicationParams persistent task. The previous attempt to read
    these from follower index settings was wrong - those settings only
    carry the leader index name, not the cluster alias or UUID.
  * Use fully-qualified exception class name in pause-reason text so
    existing test helpers that grep for FQNs (e.g.
    "org.opensearch.indices.IndexClosedException") still match.
  * On fatal IndexNotFoundException matching the leader index AND with
    plugins.replication.replicate.delete_index enabled, stop replication
    instead of pausing it. Preserves the cleanup path that used to live
    in IndexReplicationTask.MONITORING.

- NodeReplicationController:
  * Defer clusterService.localNode() reference until after cluster state
    initializes. The previous logging at start() time threw
    AssertionError "initial cluster state not set yet" during
    createComponents.

- IndexReplicationTask.MONITORING:
  * Restore SLEEP_TIME_BETWEEN_POLL_MS delay between metadata-sync
    iterations. Removing pollShardTaskStatus had inadvertently turned
    MONITORING into a tight loop that drowned the log with
    "Handling dynamic settings change".

Tests:

- New: CleanupShardTasksUpdateTaskTests - verifies the batched
  cluster-state cleanup removes only legacy shard-task entries, is
  idempotent on already-clean state, and tolerates absent persistent
  tasks metadata.
- New: NodeReplicationControllerTests - verifies the pure
  computeDesiredShards function over various cluster states (empty,
  paused, no replication metadata, no local node id).
- Removed: TaskCancellationIT - both tests asserted on per-shard
  persistent task cancellation behavior that no longer exists.
- Refactor: PauseReplicationIT.testPauseReplicationInRestoringState
  drops getShardReplicationTasks().isNotEmpty() assertion.
- Refactor: StopReplicationIT.test_stop_replication_with_stale...
  drops one getShardReplicationTasks() assertion.

Build: compileKotlin and compileTestKotlin both pass. Unit tests pass.
Integ test results: 120 tests, 5 failures, 28 skipped. Failures are:
  - StartReplicationIT.test_that_snapshot_on_leader_does_not_affect_replication...
    expects SYNCING, observes PAUSED. Likely a too-eager
    pauseIndexOnFatalFailure during bootstrap.
  - StartReplicationIT.test_follower_stats - stats counter mismatch.
  - StopReplicationIT.test_delete_follower_index_when_leader_index_is_unavailable
    leader-deletion auto-cleanup does not trigger; investigation pending.
  - StopReplicationIT.test_stop_replication_with_stale_replication_settings...
    second getShardReplicationTasks assertion at line ~352, also
    needs the same removal as line 315.
  - UpdateAutoFollowPatternIT.test_auto_follow_pattern_with_settings -
    cleanup error from leftover read-only block on leader. Likely a
    pre-existing test isolation issue surfaced by ordering changes.

These should be addressed before the PR lands.

Signed-off-by: Ankit Jain <jainankitk@apache.org>
Signed-off-by: Ankit Jain <jainankitk@apache.org>
@github-actions

github-actions Bot commented May 22, 2026

PR Reviewer Guide 🔍

(Review updated until commit 5e8f01b)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Race Condition

The ShardTaskCleanupListener uses AtomicBoolean.compareAndSet to ensure single submission, but checks two different conditions for cluster manager status (event.localNodeClusterManager() OR event.state().nodes().isLocalNodeElectedClusterManager). If both conditions can be true simultaneously or change between the check and submission, the listener might submit twice or miss the cleanup window. The OR logic should be clarified or one condition removed.

override fun clusterChanged(event: ClusterChangedEvent) {
    if (submitted.get()) return
    if (event.localNodeClusterManager() || event.state().nodes().isLocalNodeElectedClusterManager) {
        if (submitted.compareAndSet(false, true)) {
            cs.submitStateUpdateTask(CleanupShardTasksUpdateTask.SOURCE, CleanupShardTasksUpdateTask())
            cs.removeListener(this)
        }
    }
}
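
For reference, the one-shot behaviour in the listener rests on `AtomicBoolean.compareAndSet`, which lets exactly one caller win the false-to-true transition. A small self-contained sketch follows; the `OneShot` class is hypothetical and not code from this PR:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: runs the action for the first caller only.
class OneShot(private val action: () -> Unit) {
    private val fired = AtomicBoolean(false)

    // Returns true only for the single caller that wins the false -> true transition.
    fun fire(): Boolean {
        if (!fired.compareAndSet(false, true)) return false
        action()
        return true
    }
}
```

Under this pattern the condition that leads to `fire()` can be evaluated any number of times, on any number of threads, and the action still runs at most once.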
Missing Null Check

In computeDesiredShards, the code retrieves IndexReplicationParams from persistent tasks and maps them by follower index name. If a task's params field is null (which the cast allows), the mapNotNull filter will skip it, but the subsequent indexParamsByFollower[indexName] lookup returns null when no matching task exists. The code then uses 'continue' to skip that index. However, if the persistent task exists but params is unexpectedly null due to deserialization issues during migration, the entry is silently dropped without logging. This could mask migration problems where legacy state is partially corrupt.

val indexParamsByFollower: Map<String, IndexReplicationParams> = persistentTasks?.tasks()
    ?.filter { it.taskName == IndexReplicationExecutor.TASK_NAME }
    ?.mapNotNull { task ->
        @Suppress("UNCHECKED_CAST")
        val params = task.params as? IndexReplicationParams ?: return@mapNotNull null
        params.followerIndexName to params
    }
    ?.toMap()
    ?: emptyMap()
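
One way to surface the silently dropped entries is to report them while building the map. The sketch below uses hypothetical stand-ins (`TaskSketch`, `ParamsSketch`, and an injected `warn` callback) in place of the real OpenSearch and plugin types:

```kotlin
// Hypothetical stand-ins; the real task and params types live in OpenSearch and the plugin.
data class ParamsSketch(val followerIndexName: String)
data class TaskSketch(val id: String, val params: Any?)

// Builds the follower-index lookup and reports every task whose params could
// not be interpreted, instead of dropping it without a trace.
fun paramsByFollower(
    tasks: List<TaskSketch>,
    warn: (String) -> Unit
): Map<String, ParamsSketch> =
    tasks.mapNotNull { task ->
        val params = task.params as? ParamsSketch
        if (params == null) {
            warn("Skipping task ${task.id}: params missing or of unexpected type")
            null
        } else {
            params.followerIndexName to params
        }
    }.toMap()
```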
Lease Renewal Failure

In renewLeaseIfStale, if indicesService.indexServiceSafe throws an exception (e.g., index deleted between the threshold check and the renewal attempt), the catch block logs a warning but does not update lastLeaseRenewalMillis. Subsequent idle-sweep calls will retry immediately on every sweep cycle, flooding logs. The method should either update the timestamp on failure to implement backoff, or the controller should track per-shard failure counts to avoid repeated attempts on persistently-failed shards.

suspend fun renewLeaseIfStale(thresholdMillis: Long) {
    if (System.currentTimeMillis() - lastLeaseRenewalMillis <= thresholdMillis) return
    try {
        val indexShard = indicesService.indexServiceSafe(followerShardId.index).getShard(followerShardId.id)
        retentionLeaseHelper.renewRetentionLease(
            leaderShardId,
            indexShard.lastSyncedGlobalCheckpoint + 1,
            followerShardId
        )
        lastLeaseRenewalMillis = System.currentTimeMillis()
        log.info("Idle-shard lease renewed for $followerShardId at seqNo ${indexShard.lastSyncedGlobalCheckpoint + 1}")
    } catch (e: Exception) {
        log.warn("Idle-shard lease renewal failed for $followerShardId: ${e.message}")
    }
}
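
The backoff option mentioned above could look roughly like the following. This is a sketch under stated assumptions: `LeaseRenewer` is hypothetical, and the clock and the renewal call are injected so the timing logic can be exercised in isolation:

```kotlin
// Hypothetical sketch: clock and renew are injected; neither is a plugin API.
class LeaseRenewer(
    private val clock: () -> Long,
    private val renew: () -> Unit
) {
    private var lastAttemptMillis = 0L

    // Returns true when a renewal succeeded. A failed attempt still advances
    // the timestamp, so the next retry waits a full threshold instead of
    // firing on every sweep.
    fun renewIfStale(thresholdMillis: Long): Boolean {
        val now = clock()
        if (now - lastAttemptMillis <= thresholdMillis) return false
        lastAttemptMillis = now
        return try {
            renew()
            true
        } catch (e: Exception) {
            false
        }
    }
}
```

The trade-off is that a transient failure also delays the retry by the full threshold; a separate, shorter failure backoff might suit a lease that is close to expiry.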
Unhandled Exception

In pauseIndexOnFatalFailure, if the leader index is not found and replicate-delete is enabled, the code attempts to stop replication and then delete the follower index. The delete operation is wrapped in a try-catch that handles IndexNotFoundException as benign. However, if the stop operation itself throws an exception (caught in the outer catch at line 390), the code falls through to the pause path. If the pause path also fails (line 402 catch), the method logs and returns, but the context remains in a failed state with no retry mechanism. The supervisor job in start() will have already exited, so the context is effectively dead. Operators have no automated recovery path short of manual intervention.

private suspend fun pauseIndexOnFatalFailure(cause: Throwable) {
    // Use the fully-qualified class name in the reason so existing helpers/tests that grep for FQNs (e.g.
    // "org.opensearch.indices.IndexClosedException") continue to match.
    val reason = "Shard $followerShardId failed: ${cause.javaClass.name} - ${cause.message ?: "no message"}"

    // If the failure is because the leader index no longer exists AND replicate-index-deletion is enabled,
    // stop replication entirely (which cascades to follower index deletion via IndexReplicationTask). This
    // preserves the cleanup path that used to live in IndexReplicationTask.MONITORING.
    val replicateDelete = clusterService.clusterSettings.get(
        org.opensearch.replication.ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION
    )
    if (replicateDelete && cause is IndexNotFoundException && cause.message?.contains(leaderShardId.indexName) == true) {
        try {
            log.warn("Leader index ${leaderShardId.indexName} unavailable; stopping replication on $followerIndexName")
            client.execute(
                INTERNAL_STOP_REPLICATION_ACTION_TYPE,
                StopIndexReplicationRequest(followerIndexName)
            ).actionGet()
            // The legacy IndexReplicationTask cleanup path used `isLeaderIndexDeleted` to drive a follow-on
            // DeleteIndex when replicate-delete was enabled. With per-shard tasks gone, that flag never gets
            // set, so we issue the delete here. Idempotent — a concurrent context on another shard may have
            // already deleted the index, in which case IndexNotFoundException is benign.
            try {
                val storedContext = client.threadPool().threadContext.stashContext()
                try {
                    client.execute(DeleteIndexAction.INSTANCE, DeleteIndexRequest(followerIndexName)).actionGet()
                } finally {
                    storedContext.close()
                }
            } catch (e: IndexNotFoundException) {
                log.debug("Follower index $followerIndexName already deleted")
            } catch (e: Exception) {
                log.warn("Failed to delete follower index $followerIndexName after leader-deletion stop: ${e.message}")
            }
            return
        } catch (e: Exception) {
            log.warn("Stop-on-leader-deletion failed for $followerIndexName, falling through to pause: ${e.message}")
        }
    }

    try {
        log.warn("Pausing replication for index $followerIndexName due to fatal shard failure: $reason")
        replicationMetadataManager.updateIndexReplicationState(
            followerIndexName,
            ReplicationOverallState.PAUSED,
            reason
        )
    } catch (e: Exception) {
        // Update may have already been done by another shard's context. Idempotent — log and move on.
        log.warn("Could not update replication state to PAUSED for $followerIndexName (may already be paused): ${e.message}")
    }
}
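
A bounded retry around the pause call is one possible shape for the missing recovery path. The `retrying` helper below is hypothetical and uses `Thread.sleep` for brevity; code running inside a coroutine would more likely use a suspending delay and rethrow cancellation:

```kotlin
// Hypothetical helper: retries a failing action a bounded number of times,
// pausing a fixed interval between attempts, then rethrows the last failure.
fun <T> retrying(maxAttempts: Int, pauseMillis: Long, action: (attempt: Int) -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var last: Exception? = null
    for (attempt in 1..maxAttempts) {
        try {
            return action(attempt)
        } catch (e: Exception) {
            last = e
            if (attempt < maxAttempts) Thread.sleep(pauseMillis)
        }
    }
    throw last!!
}
```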

@github-actions

github-actions Bot commented May 22, 2026

PR Code Suggestions ✨

Latest suggestions up to 5e8f01b

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Ensure idempotent start behavior

The start() method does not check if the controller is already started, which could
lead to duplicate listeners and multiple scheduled tasks if called multiple times.
Add a guard to ensure idempotent behavior and prevent resource leaks from duplicate
registrations.

src/main/kotlin/org/opensearch/replication/task/shard/NodeReplicationController.kt [82-93]

+private val started = AtomicBoolean(false)
+
 fun start() {
+    if (!started.compareAndSet(false, true)) {
+        log.debug("NodeReplicationController already started")
+        return
+    }
     clusterService.addListener(this)
     idleLeaseSweep = threadPool.scheduleWithFixedDelay(
         { runIdleLeaseSweep() },
         org.opensearch.common.unit.TimeValue.timeValueMillis(IDLE_LEASE_SWEEP_INTERVAL),
         ThreadPool.Names.GENERIC
     )
     log.info("NodeReplicationController started")
 }
Suggestion importance[1-10]: 8

Why: The start() method lacks idempotency guards, which could lead to duplicate listener registrations and multiple scheduled tasks if called multiple times. This is a real risk in plugin lifecycle management and could cause resource leaks. The suggested AtomicBoolean guard is a standard pattern for ensuring single initialization.

Medium
Add exception handling for cleanup submission

The cleanup listener checks both event.localNodeClusterManager() and
event.state().nodes().isLocalNodeElectedClusterManager which may be redundant.
Verify that both checks are necessary, or simplify to use only one condition.
Additionally, consider handling potential exceptions from submitStateUpdateTask to
prevent silent failures during cleanup submission.

src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt [254-264]

 private class ShardTaskCleanupListener(private val cs: ClusterService) : ClusterStateListener {
     private val submitted = AtomicBoolean(false)
     override fun clusterChanged(event: ClusterChangedEvent) {
         if (submitted.get()) return
-        if (event.localNodeClusterManager() || event.state().nodes().isLocalNodeElectedClusterManager) {
+        if (event.state().nodes().isLocalNodeElectedClusterManager) {
             if (submitted.compareAndSet(false, true)) {
-                cs.submitStateUpdateTask(CleanupShardTasksUpdateTask.SOURCE, CleanupShardTasksUpdateTask())
+                try {
+                    cs.submitStateUpdateTask(CleanupShardTasksUpdateTask.SOURCE, CleanupShardTasksUpdateTask())
+                } catch (e: Exception) {
+                    log.error("Failed to submit cleanup task: ${e.message}")
+                    submitted.set(false)
+                    return
+                }
                 cs.removeListener(this)
             }
         }
     }
 }
Suggestion importance[1-10]: 7

Why: Adding exception handling around submitStateUpdateTask prevents silent failures during cleanup submission. The suggestion also simplifies the redundant cluster manager check. However, the original code's dual check may be intentional for compatibility, and the error recovery logic (resetting submitted to false) could lead to retry loops if the underlying issue persists.

Medium
General
Handle pause-operation failures gracefully

The pauseIndexOnFatalFailure call after catching Throwable may fail or throw
exceptions itself, but there's no error handling around it. If pausing fails, the
context terminates silently without recording the failure. Wrap the pause call in a
try-catch to ensure the original error is logged even if the pause operation fails.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [113-128]

 fun start(parentScope: CoroutineScope) {
     if (!started.compareAndSet(false, true)) {
         log.debug("ShardReplicationContext already started for $followerShardId")
         return
     }
     supervisor = parentScope.launch {
         try {
             runReplicateLoop()
         } catch (e: CancellationException) {
             log.info("ShardReplicationContext for $followerShardId cancelled")
         } catch (e: Throwable) {
             log.error("ShardReplicationContext for $followerShardId terminated with error: ${e.stackTraceToString()}")
-            pauseIndexOnFatalFailure(e)
+            try {
+                pauseIndexOnFatalFailure(e)
+            } catch (pauseError: Exception) {
+                log.error("Failed to pause index after fatal failure: ${pauseError.message}")
+            }
         }
     }
 }
Suggestion importance[1-10]: 6

Why: Wrapping pauseIndexOnFatalFailure in a try-catch ensures that if the pause operation itself fails, the original error is still logged and the context terminates cleanly. This improves observability. However, the impact is moderate since the outer catch already logs the original error before calling pause.

Low
Clarify shard worker startup responsibility

The transition from INIT_FOLLOW to FOLLOWING now happens immediately after adding
the index block, without verifying that shard workers have actually started on data
nodes. If the NodeReplicationController on data nodes is slow to reconcile or fails
to start contexts, replication may appear to be in FOLLOWING state while no actual
work is happening. Consider adding a verification step or logging to confirm that at
least one shard context has started before transitioning.

src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt [211-217]

 ReplicationState.INIT_FOLLOW -> {
     log.info("Adding index replication block and transitioning to FOLLOWING")
     addIndexBlockForReplication()
+    log.info("Index block added for $followerIndexName. Shard workers will be started by NodeReplicationController on data nodes.")
     FollowingState
 }
Suggestion importance[1-10]: 4

Why: The suggestion adds a clarifying log message but does not change behavior. The concern about verifying shard worker startup is valid, but the improved code only adds logging without addressing the verification gap. The existing code is correct per the design (workers start asynchronously via NodeReplicationController), so this is a minor documentation improvement rather than a functional fix.

Low

Previous suggestions

Suggestions up to commit dbaf1ba
Category | Suggestion | Impact
Possible issue
Wrap suspend call in coroutine scope

The pauseIndexOnFatalFailure call in the catch block is a suspend function but is
invoked from a non-suspend context. This will fail to compile or cause runtime
issues. Wrap it in a coroutine launch or use runBlocking with proper cancellation
handling.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [118-127]

 fun start(parentScope: CoroutineScope) {
     if (!started.compareAndSet(false, true)) {
         log.debug("ShardReplicationContext already started for $followerShardId")
         return
     }
     supervisor = parentScope.launch {
         try {
             runReplicateLoop()
         } catch (e: CancellationException) {
             log.info("ShardReplicationContext for $followerShardId cancelled")
         } catch (e: Throwable) {
             log.error("ShardReplicationContext for $followerShardId terminated with error: ${e.stackTraceToString()}")
-            pauseIndexOnFatalFailure(e)
+            launch(NonCancellable) {
+                pauseIndexOnFatalFailure(e)
+            }
         }
     }
 }
Suggestion importance[1-10]: 9

Why: The pauseIndexOnFatalFailure is a suspend function but is called from a non-suspend catch block. This will cause a compilation error or runtime failure. Wrapping it in a coroutine launch with NonCancellable ensures the cleanup logic runs even if the parent scope is cancelled.

High
Replace blocking call with suspend function

The actionGet() call blocks the coroutine thread, defeating the purpose of using
suspend functions. Replace with suspendExecute or similar non-blocking alternative
to maintain proper coroutine semantics and avoid thread pool exhaustion under load.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [369-372]

 private suspend fun pauseIndexOnFatalFailure(cause: Throwable) {
     val reason = "Shard $followerShardId failed: ${cause.javaClass.name} - ${cause.message ?: "no message"}"
     ...
     if (replicateDelete && cause is IndexNotFoundException && cause.message?.contains(leaderShardId.indexName) == true) {
         try {
             log.warn("Leader index ${leaderShardId.indexName} unavailable; stopping replication on $followerIndexName")
-            client.execute(
+            client.suspendExecute(
                 INTERNAL_STOP_REPLICATION_ACTION_TYPE,
                 StopIndexReplicationRequest(followerIndexName)
-            ).actionGet()
+            )
             ...
         } catch (e: Exception) {
             log.warn("Stop-on-leader-deletion failed for $followerIndexName, falling through to pause: ${e.message}")
         }
     }
Suggestion importance[1-10]: 8

Why: The actionGet() call blocks the coroutine thread, which can lead to thread pool exhaustion under load. Using suspendExecute maintains proper coroutine semantics and is critical for scalability in a high-throughput replication scenario.

Medium
Use coroutine Mutex instead of synchronized

The @Synchronized annotation on a suspend-calling function can cause deadlocks or
thread starvation. Since reconcile is launched in a coroutine scope, use a Mutex
instead to ensure proper coroutine-based synchronization without blocking threads.

src/main/kotlin/org/opensearch/replication/task/shard/NodeReplicationController.kt [115-144]

-@Synchronized
-private fun reconcile(event: ClusterChangedEvent) {
-    val desired = computeDesiredShards(event.state())
+private val reconcileMutex = Mutex()
 
-    val toStop = contexts.keys - desired.keys
-    for (shardId in toStop) {
-        val ctx = contexts.remove(shardId) ?: continue
-        log.info("Stopping replication context for $shardId (no longer eligible)")
-        ctx.stop()
-    }
+private suspend fun reconcile(event: ClusterChangedEvent) {
+    reconcileMutex.withLock {
+        val desired = computeDesiredShards(event.state())
 
-    for ((shardId, params) in desired) {
-        if (contexts.containsKey(shardId)) continue
-        val ctx = ShardReplicationContext(...)
-        ctx.start(controllerScope)
-        contexts[shardId] = ctx
-        log.info("Started replication context for $shardId (leader=${params.leaderAlias}${params.leaderShardId})")
+        val toStop = contexts.keys - desired.keys
+        for (shardId in toStop) {
+            val ctx = contexts.remove(shardId) ?: continue
+            log.info("Stopping replication context for $shardId (no longer eligible)")
+            ctx.stop()
+        }
+
+        for ((shardId, params) in desired) {
+            if (contexts.containsKey(shardId)) continue
+            val ctx = ShardReplicationContext(...)
+            ctx.start(controllerScope)
+            contexts[shardId] = ctx
+            log.info("Started replication context for $shardId (leader=${params.leaderAlias}${params.leaderShardId})")
+        }
     }
 }
Suggestion importance[1-10]: 7

Why: The @Synchronized annotation on a function that is invoked from a coroutine context can cause thread blocking and potential deadlocks. Using a coroutine Mutex is the idiomatic approach for coroutine-based synchronization and avoids blocking threads.
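
A minimal sketch of the Mutex approach, assuming kotlinx.coroutines is on the classpath. `ReconcileSketch`, its string shard IDs, and the `delay` that stands in for real work are hypothetical and are not the plugin's types. A JVM monitor is tied to a thread, so it is not suited to guarding code that suspends, whereas `Mutex.withLock` may be held across a suspension point.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the controller; shard IDs are plain strings here.
class ReconcileSketch {
    private val reconcileMutex = Mutex()
    private val contexts = mutableSetOf<String>()
    private var active = 0

    // Highest number of reconcile bodies observed running at the same time.
    var maxConcurrent = 0
        private set

    suspend fun reconcile(desired: Set<String>) {
        reconcileMutex.withLock {
            active++
            maxConcurrent = maxOf(maxConcurrent, active)
            delay(5)                      // suspension inside the critical section
            contexts.retainAll(desired)   // "stop" contexts that are no longer desired
            contexts.addAll(desired)      // "start" contexts that are missing
            active--
        }
    }
}

// Fires several reconciles at once on a multi-threaded dispatcher and
// reports how many ever overlapped.
fun runReconcileSketch(): Int = runBlocking {
    val sketch = ReconcileSketch()
    val events = listOf(setOf("s0", "s1"), setOf("s1"), setOf("s1", "s2"))
    val jobs = events.map { desired ->
        launch(Dispatchers.Default) { sketch.reconcile(desired) }
    }
    jobs.forEach { it.join() }
    sketch.maxConcurrent
}
```

In this sketch the order in which the three events are applied is not fixed; the mutex only guarantees that their bodies never overlap.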

Medium
General
Add idempotent pause state update

Multiple concurrent shard failures can race to update the same index to PAUSED
state, potentially causing cluster state contention or lost error messages. Consider
using a cluster-wide lock or idempotent update mechanism to ensure only one context
successfully pauses the index while preserving all failure reasons.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [395-405]

 private suspend fun pauseIndexOnFatalFailure(cause: Throwable) {
     ...
     try {
         log.warn("Pausing replication for index $followerIndexName due to fatal shard failure: $reason")
-        replicationMetadataManager.updateIndexReplicationState(
+        val updated = replicationMetadataManager.updateIndexReplicationStateIfNotPaused(
             followerIndexName,
             ReplicationOverallState.PAUSED,
             reason
         )
+        if (!updated) {
+            log.info("Index $followerIndexName already paused by another shard context")
+        }
     } catch (e: Exception) {
-        log.warn("Could not update replication state to PAUSED for $followerIndexName (may already be paused): ${e.message}")
+        log.warn("Could not update replication state to PAUSED for $followerIndexName: ${e.message}")
     }
 }
Suggestion importance[1-10]: 6

Why: Multiple concurrent shard failures can race to update the same index to PAUSED, potentially causing cluster state contention. An idempotent update mechanism (e.g., updateIndexReplicationStateIfNotPaused) would reduce contention and improve clarity, though the current implementation is acceptable given the design acknowledges this race condition.
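
The idea of an idempotent pause can be illustrated in memory with a compare-and-set loop. This is only a sketch: `PauseOnceSketch`, `OverallState`, and `IndexReplicationStatus` are hypothetical names, and the real update would go through the replication metadata in cluster state rather than an `AtomicReference`.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, in-memory stand-in for the replication status of one index.
enum class OverallState { RUNNING, PAUSED }
data class IndexReplicationStatus(val state: OverallState, val reason: String?)

class PauseOnceSketch {
    private val status =
        AtomicReference(IndexReplicationStatus(OverallState.RUNNING, null))

    // Returns true only for the caller that performs the RUNNING -> PAUSED
    // transition; later callers get false and the first reason is preserved.
    fun pauseIfRunning(reason: String): Boolean {
        while (true) {
            val current = status.get()
            if (current.state == OverallState.PAUSED) return false
            val next = IndexReplicationStatus(OverallState.PAUSED, reason)
            if (status.compareAndSet(current, next)) return true
            // Another caller changed the status in between; re-read and retry.
        }
    }

    fun current(): IndexReplicationStatus = status.get()
}
```

With this shape, a second shard failure calling `pauseIfRunning` gets `false` back and can log that the index is already paused instead of overwriting the recorded reason.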

Low
Suggestions up to commit 521067c
Category | Suggestion | Impact
Possible issue
Serialize reconcile invocations to prevent races

Launching reconcile asynchronously without coordination can cause race conditions
when multiple cluster state events arrive rapidly. Consider using a single-threaded
dispatcher or an actor pattern to serialize reconcile invocations and prevent
concurrent modifications to the contexts map.

src/main/kotlin/org/opensearch/replication/task/shard/NodeReplicationController.kt [103-112]

+private val reconcileDispatcher = Dispatchers.Default.limitedParallelism(1)
+
 override fun clusterChanged(event: ClusterChangedEvent) {
-    // Reconcile asynchronously to avoid holding up the cluster state applier thread.
-    controllerScope.launch {
+    // Reconcile asynchronously on a single-threaded dispatcher to serialize events
+    controllerScope.launch(reconcileDispatcher) {
         try {
             reconcile(event)
         } catch (e: Throwable) {
             log.warn("NodeReplicationController reconcile failed: ${e.stackTraceToString()}")
         }
     }
 }
Suggestion importance[1-10]: 8

Why: Valid concern about concurrent reconcile invocations racing on the contexts map. The suggestion to use a single-threaded dispatcher is a sound mitigation. The design doc mentions synchronization as an open question, and this addresses it directly with a practical solution.

Medium
Fix race between job assignment and stop

The supervisor job is assigned after the coroutine is launched, creating a window
where stop() could be called before supervisor is set, resulting in a no-op stop.
Assign the job reference before launching or use a lateinit property with proper
synchronization to ensure stop() can always cancel the job.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [113-128]

 fun start(parentScope: CoroutineScope) {
     if (!started.compareAndSet(false, true)) {
         log.debug("ShardReplicationContext already started for $followerShardId")
         return
     }
-    supervisor = parentScope.launch {
+    val job = parentScope.launch {
         try {
             runReplicateLoop()
         } catch (e: CancellationException) {
             log.info("ShardReplicationContext for $followerShardId cancelled")
         } catch (e: Throwable) {
             log.error("ShardReplicationContext for $followerShardId terminated with error: ${e.stackTraceToString()}")
             pauseIndexOnFatalFailure(e)
         }
     }
+    supervisor = job
 }
Suggestion importance[1-10]: 7

Why: Identifies a real race condition where stop() could be called before supervisor is assigned. The improved code assigns the job reference immediately, closing the window. This is a legitimate concurrency issue that could cause stop() to silently fail.
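
Note that binding the result of `launch` to a local `val` before assigning the field does not, on its own, change when the field becomes visible to `stop()`. One possible way to make a stop request stick regardless of ordering is sketched below, assuming kotlinx.coroutines. `SupervisorSketch` and the `stopRequested` flag are hypothetical and are not taken from the PR: the job is created with `CoroutineStart.LAZY`, published, and only then started unless a stop has already been requested.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the shard context's start/stop pair.
class SupervisorSketch {
    private val started = AtomicBoolean(false)
    @Volatile private var supervisor: Job? = null
    @Volatile private var stopRequested = false

    fun start(parentScope: CoroutineScope, body: suspend () -> Unit) {
        if (!started.compareAndSet(false, true)) return
        // LAZY: the coroutine exists but does not run until start() is called.
        val job = parentScope.launch(start = CoroutineStart.LAZY) { body() }
        supervisor = job
        // If stop() ran before the field was published, honor it here.
        if (stopRequested) job.cancel() else job.start()
    }

    fun stop() {
        stopRequested = true
        supervisor?.cancel()
    }

    suspend fun join() {
        supervisor?.join()
    }
}

// Returns (body ran when started normally, body ran when stop() came first).
fun runSupervisorSketch(): Pair<Boolean, Boolean> = runBlocking {
    val ranNormally = AtomicBoolean(false)
    val ranAfterStop = AtomicBoolean(false)

    val normal = SupervisorSketch()
    normal.start(this) { ranNormally.set(true) }
    normal.join()

    val stoppedFirst = SupervisorSketch()
    stoppedFirst.stop()
    stoppedFirst.start(this) { ranAfterStop.set(true) }
    stoppedFirst.join()

    ranNormally.get() to ranAfterStop.get()
}
```

Because both fields are volatile, either `stop()` sees the published job and cancels it, or `start()` sees `stopRequested` and cancels the job itself; calling `start()` on an already cancelled job is a no-op.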

Medium
General
Prevent concurrent pause-state update races

Multiple concurrent shard failures can race to update ReplicationMetadata to PAUSED,
potentially causing contention or lost error messages. Consider using a cluster-wide
lock or a dedicated single-writer actor to serialize pause requests and preserve the
first failure reason for operator visibility.

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt [355-406]

 private suspend fun pauseIndexOnFatalFailure(cause: Throwable) {
     val reason = "Shard $followerShardId failed: ${cause.javaClass.name} - ${cause.message ?: "no message"}"
     ...
     try {
         log.warn("Pausing replication for index $followerIndexName due to fatal shard failure: $reason")
-        replicationMetadataManager.updateIndexReplicationState(
+        // Use optimistic concurrency control: only update if current state is RUNNING
+        replicationMetadataManager.updateIndexReplicationStateIfRunning(
             followerIndexName,
             ReplicationOverallState.PAUSED,
             reason
         )
     } catch (e: Exception) {
         log.warn("Could not update replication state to PAUSED for $followerIndexName (may already be paused): ${e.message}")
     }
 }
Suggestion importance[1-10]: 5

Why: The suggestion raises a valid concern about concurrent pause updates. However, the proposed updateIndexReplicationStateIfRunning method does not exist in the codebase, and the design doc explicitly acknowledges this race as acceptable (last-write-wins for the reason field). The suggestion is conceptually sound but requires API changes not in scope for this PR.

Low
Remove redundant atomic boolean check

The double-check pattern with submitted.get() followed by compareAndSet is
redundant. The compareAndSet alone provides atomic check-and-set semantics. Remove
the initial if (submitted.get()) return guard to simplify the logic without changing
behavior.

src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt [254-262]

 private class ShardTaskCleanupListener(private val cs: ClusterService) : ClusterStateListener {
     private val submitted = AtomicBoolean(false)
     override fun clusterChanged(event: ClusterChangedEvent) {
-        if (submitted.get()) return
         if (event.localNodeClusterManager() || event.state().nodes().isLocalNodeElectedClusterManager) {
             if (submitted.compareAndSet(false, true)) {
                 cs.submitStateUpdateTask(CleanupShardTasksUpdateTask.SOURCE, CleanupShardTasksUpdateTask())
                 cs.removeListener(this)
             }
         }
     }
 }
Suggestion importance[1-10]: 4

Why: The suggestion correctly identifies a redundant check pattern. However, the initial submitted.get() guard is a common optimization to avoid the more expensive compareAndSet call when already submitted. Removing it doesn't change correctness but may slightly reduce performance in high-frequency scenarios.
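
A small sketch of the once-only pattern under discussion, using only the JDK and the Kotlin standard library. `SubmitOnceSketch` and its counter are hypothetical; the counter stands in for the cluster state update task being submitted. The `get()` guard is purely a fast path, and `compareAndSet` is what guarantees a single winner.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical stand-in for the listener: counts how often the one-shot action fires.
class SubmitOnceSketch {
    private val submitted = AtomicBoolean(false)
    val submissions = AtomicInteger(0)

    fun onEvent() {
        // Cheap volatile read; skips the CAS once the action has already fired.
        if (submitted.get()) return
        // The CAS alone decides the winner: exactly one caller sees `true`.
        if (submitted.compareAndSet(false, true)) {
            submissions.incrementAndGet()
        }
    }
}

// Hammers onEvent() from several threads and returns how many submissions happened.
fun runSubmitOnceSketch(threads: Int = 8, eventsPerThread: Int = 1_000): Int {
    val sketch = SubmitOnceSketch()
    val startGate = CountDownLatch(1)
    val workers = List(threads) {
        thread {
            startGate.await()
            repeat(eventsPerThread) { sketch.onEvent() }
        }
    }
    startGate.countDown()
    workers.forEach { it.join() }
    return sketch.submissions.get()
}
```

Removing the `get()` line from `onEvent()` leaves the result unchanged, which is the sense in which the guard is redundant for correctness.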

Low

@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit dbaf1ba.

| Path | Line | Severity | Description |
|---|---|---|---|
| src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationContext.kt | 356 | medium | stashContext() is used before executing DeleteIndexRequest on the follower index, bypassing the current security/thread context. In OpenSearch Security plugin deployments this allows follower index deletion to run with elevated internal privileges, skipping normal ACL checks. This is a new code path not present in the deleted ShardReplicationTask; it triggers automatically when replicateIndexDeletion is enabled and an IndexNotFoundException is thrown, including potential edge cases where a transient connectivity failure is misclassified as a leader-deleted event. |
| src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationState.kt | 152 | low | FollowingState.invoke(inp: StreamInput) and fromStream(inp: StreamInput) both fail to consume bytes from the StreamInput when deserializing legacy FollowingState payloads. The old implementation wrote a length-prefixed ShardId->PersistentTask map; the new code reads neither the entry count nor any entries. If a legacy payload is encountered during rolling upgrade before CleanupShardTasksUpdateTask runs, the unread bytes corrupt the stream position and will cause deserialization of the next field to fail with an opaque error, potentially destabilizing cluster state reads. |

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 1 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

Persistent review updated to latest commit dbaf1ba

Signed-off-by: Ankit Jain <jainankitk@apache.org>
@github-actions

Persistent review updated to latest commit 5e8f01b

object FollowingState : IndexReplicationState(ReplicationState.FOLLOWING) {
@JvmStatic
@Suppress("UNUSED_PARAMETER")
fun fromStream(inp: StreamInput): FollowingState {
Contributor

In IndexReplicationState.kt, the new FollowingState.fromStream(inp) reads inp.readVInt() to get the count of legacy shard entries, but the repeat(count) block is empty, so it doesn't actually consume the entry bytes from the stream.

If a legacy FollowingState payload is encountered during a rolling upgrade (before CleanupShardTasksUpdateTask runs), the unread bytes will corrupt the stream position and cause opaque deserialization failures downstream. The automated code analyzer already flagged this. The invoke(inp) operator also discards the stream without consuming it. Both need to actually drain the legacy bytes or the BWC story is broken.
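
To illustrate why the legacy bytes have to be drained, here is a self-contained sketch that uses `java.io.DataInputStream` as a stand-in for OpenSearch's `StreamInput`. The payload layout (an int count, then pairs of UTF strings, then a trailing int) and all function names are made up for the example and do not reflect the real wire format.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Writes a made-up "legacy" payload: entry count, (shardId, taskId) pairs, then a trailing field.
fun writeLegacyPayload(entries: Map<String, String>, trailingField: Int): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeInt(entries.size)
        for ((shardId, taskId) in entries) {
            out.writeUTF(shardId)
            out.writeUTF(taskId)
        }
        out.writeInt(trailingField)
    }
    return bytes.toByteArray()
}

// Reads the count and every entry, discarding them, so the stream ends up at the next field.
fun drainLegacyEntries(inp: DataInputStream) {
    val count = inp.readInt()
    repeat(count) {
        inp.readUTF() // legacy shard id, no longer used
        inp.readUTF() // legacy task id, no longer used
    }
}

fun readTrailingFieldAfterDrain(payload: ByteArray): Int =
    DataInputStream(ByteArrayInputStream(payload)).use { inp ->
        drainLegacyEntries(inp)
        inp.readInt()
    }

// Mirrors the reported bug: the count is read, but the entries are left in the stream,
// so the next read lands in the middle of the first entry.
fun readTrailingFieldWithoutDrain(payload: ByteArray): Int =
    DataInputStream(ByteArrayInputStream(payload)).use { inp ->
        inp.readInt()
        inp.readInt()
    }
```

In the sketch, the drained reader recovers the trailing field exactly, while the non-draining reader interprets part of the first entry as that field.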
