Removes per-shard ShardReplicationTask from cluster state#1695
Removes per-shard ShardReplicationTask from cluster state#1695jainankitk wants to merge 7 commits into
ShardReplicationTask from cluster state#1695Conversation
Introduce in-memory replacement for ShardReplicationTask without yet wiring it into the plugin. Existing per-shard persistent task code continues to run. - ShardReplicationContext: lifts the replicate loop out of ShardReplicationTask into a plain class managed by the controller. On terminal failure, pauses the index directly via ReplicationMetadataManager rather than via persistent task state transitions. - NodeReplicationController: cluster state listener that reconciles local follower primaries against ReplicationMetadata, starting/stopping contexts as routing or replication state changes. Includes idle-lease renewal sweep. - CleanupShardTasksUpdateTask: single batched cluster state update that removes legacy ShardReplicationExecutor entries from PersistentTasksCustomMetadata. Idempotent. - docs/remove-shard-tasks-design.md: design rationale and migration notes. No behavior change in this commit. Subsequent commit will wire up the new controller, remove the persistent-task plumbing, and run cleanup. Signed-off-by: Ankit Jain <jainankitk@apache.org>
Activate the in-memory shard replication model added in the previous commit. Per-shard persistent tasks are no longer registered, no longer stored in cluster state, and no longer participate in start/stop/pause/resume flows. Plugin (ReplicationPlugin.kt): - Start NodeReplicationController during createComponents. - Register a one-shot ClusterStateListener that submits CleanupShardTasksUpdateTask when this node becomes cluster manager, removing legacy ShardReplicationTask entries in a single batched update. - Drop ShardReplicationExecutor from getPersistentTasksExecutor and the related ShardReplicationParams/ShardReplicationState entries from getNamedWriteables/getNamedXContent. IndexReplicationTask: - Remove startNewOrMissingShardTasks, findAllReplicationFailedShardTasks, pollShardTaskStatus, startReplicationTask. Per-shard lifecycle and failure detection now live on each follower data node's controller. - INIT_FOLLOW state now just adds the index replication block and transitions to FOLLOWING - no per-shard task spawning. - MONITORING state no longer polls shard task status; it just runs the metadata sync. Shard-level fatal failures pause the index directly via ReplicationMetadataManager from the failing context. - Static-settings update path no longer removes per-shard persistent tasks; index close cascades to controller-driven worker stop. IndexReplicationState: - FollowingState is now an object with no payload (was a data class carrying a map of shard task references). - FailedState carries only an errorMsg (was a map plus message). - INIT_FOLLOW enum value and InitFollowState singleton retained for backward serialization compatibility with state persisted by older clusters; deserialized state effectively transitions through it. Deletions: - ShardReplicationTask, ShardReplicationExecutor, ShardReplicationParams, ShardReplicationState (no longer instantiated). - ShardReplicationExecutorTests (tested deleted class). Tests: - ReplicationHelpers: drop ShardReplicationExecutor.TASK_NAME references. waitForReplicationStart/Stop now wait only on the index-level task. waitForShardTaskStart becomes a no-op stub for source compatibility. - IndexReplicationTaskTests: remove testStartNewShardTasks / testStartMissingShardTasks (tested removed method). Trim createIndexReplicationTask helper. - NoOpClient: drop the synthetic ShardReplicationParams response branch. Build: compileKotlin and compileTestKotlin both pass. Signed-off-by: Ankit Jain <jainankitk@apache.org>
Three substantive bug fixes uncovered by integ test runs, plus unit tests
for the new infrastructure, plus removal/refactor of integ tests that
were inherently coupled to per-shard persistent tasks.
Integration fixes:
- ShardReplicationContext (controller plumbing):
* Resolve leader alias and leader Index (name+UUID) from the surviving
IndexReplicationParams persistent task. The previous attempt to read
these from follower index settings was wrong - those settings only
carry the leader index name, not the cluster alias or UUID.
* Use fully-qualified exception class name in pause-reason text so
existing test helpers that grep for FQNs (e.g.
"org.opensearch.indices.IndexClosedException") still match.
* On fatal IndexNotFoundException matching the leader index AND with
plugins.replication.replicate.delete_index enabled, stop replication
instead of pausing it. Preserves the cleanup path that used to live
in IndexReplicationTask.MONITORING.
- NodeReplicationController:
* Defer clusterService.localNode() reference until after cluster state
initializes. The previous logging at start() time threw
AssertionError "initial cluster state not set yet" during
createComponents.
- IndexReplicationTask.MONITORING:
* Restore SLEEP_TIME_BETWEEN_POLL_MS delay between metadata-sync
iterations. Removing pollShardTaskStatus had inadvertently turned
MONITORING into a tight loop that drowned the log with
"Handling dynamic settings change".
Tests:
- New: CleanupShardTasksUpdateTaskTests - verifies the batched
cluster-state cleanup removes only legacy shard-task entries, is
idempotent on already-clean state, and tolerates absent persistent
tasks metadata.
- New: NodeReplicationControllerTests - verifies the pure
computeDesiredShards function over various cluster states (empty,
paused, no replication metadata, no local node id).
- Removed: TaskCancellationIT - both tests asserted on per-shard
persistent task cancellation behavior that no longer exists.
- Refactor: PauseReplicationIT.testPauseReplicationInRestoringState
drops getShardReplicationTasks().isNotEmpty() assertion.
- Refactor: StopReplicationIT.test_stop_replication_with_stale...
drops one getShardReplicationTasks() assertion.
Build: compileKotlin and compileTestKotlin both pass. Unit tests pass.
Integ test results: 120 tests, 5 failures, 28 skipped. Failures are:
- StartReplicationIT.test_that_snapshot_on_leader_does_not_affect_replication...
expects SYNCING, observes PAUSED. Likely a too-eager
pauseIndexOnFatalFailure during bootstrap.
- StartReplicationIT.test_follower_stats - stats counter mismatch.
- StopReplicationIT.test_delete_follower_index_when_leader_index_is_unavailable
leader-deletion auto-cleanup does not trigger; investigation pending.
- StopReplicationIT.test_stop_replication_with_stale_replication_settings...
second getShardReplicationTasks assertion at line ~352, also
needs the same removal as line 315.
- UpdateAutoFollowPatternIT.test_auto_follow_pattern_with_settings -
cleanup error from leftover read-only block on leader. Likely a
pre-existing test isolation issue surfaced by ordering changes.
These should be addressed before the PR lands.
Signed-off-by: Ankit Jain <jainankitk@apache.org>
Signed-off-by: Ankit Jain <jainankitk@apache.org>
PR Reviewer Guide 🔍(Review updated until commit 5e8f01b)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Latest suggestions up to 5e8f01b Explore these optional code suggestions:
Previous suggestionsSuggestions up to commit dbaf1ba
Suggestions up to commit 521067c
|
Signed-off-by: Ankit Jain <jainankitk@apache.org>
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit dbaf1ba.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
|
Persistent review updated to latest commit dbaf1ba |
Signed-off-by: Ankit Jain <jainankitk@apache.org>
|
Persistent review updated to latest commit 5e8f01b |
| object FollowingState : IndexReplicationState(ReplicationState.FOLLOWING) { | ||
| @JvmStatic | ||
| @Suppress("UNUSED_PARAMETER") | ||
| fun fromStream(inp: StreamInput): FollowingState { |
There was a problem hiding this comment.
In IndexReplicationState.kt, the new FollowingState.fromStream(inp) reads inp.readVInt() to get the count of legacy shard entries, but the repeat(count) block is empty — it doesn't actually consume the bytes from the stream
If a legacy FollowingState payload is encountered during a rolling upgrade (before CleanupShardTasksUpdateTask runs), the unread bytes will corrupt the stream position and cause opaque deserialization failures downstream. The automated code analyzer already flagged this. The invoke(inp) operator also discards the stream without consuming it. Both need to actually drain the legacy bytes or the BWC story is broken.
Description
Summary
Removes per-shard
ShardReplicationTaskfrom cluster state. Replication of follower shards now runs as in-memory work managed by a per-nodeNodeReplicationController, reconciled from cluster state on everyclusterChangedevent. Per-indexIndexReplicationTaskis unchanged.Why
ShardReplicationTaskwas registered as an OpenSearch persistent task — one entry per follower primary shard — which means start/stop/pause/resume of replication produced O(shards) cluster state updates serialized through the cluster manager. This dominated cluster state churn on large autofollow setups, contributed to ghosttasks blocking restart, and was the root cause for many customer tickets across several trends (Replication stuck/paused, ghost tasks, autofollow rule failures, metadata inconsistency).The runtime state owned by
ShardReplicationTask(changes tracker, sequencer, reader/writer scopes) didn't actually need to live in cluster state. Moving it to in-memory state on the node hosting the primary shard removes the cluster-state cost without changing replication semantics.What changes
Removed:
ShardReplicationTask,ShardReplicationExecutor,ShardReplicationParams,ShardReplicationStateIndexReplicationTask(
startNewOrMissingShardTasks,pollShardTaskStatus,findAllReplicationFailedShardTasks,startReplicationTask)INIT_FOLLOWstate behavior (enum value retained for backward serializationcompatibility; state machine no longer dwells in it)
Added:
NodeReplicationController— per-nodeClusterStateListenerthat reconcileslocal follower primaries against
ReplicationMetadata, starting/stoppingin-memory
ShardReplicationContextinstances. Includes a periodic idle-shardretention-lease renewal sweep so leases on quiet shards don't expire after the
default 12h.
ShardReplicationContext— in-memory replacement forShardReplicationTask.Owns
ShardReplicationChangesTracker,TranslogSequencer, reader/writercoroutine scope, lease renewal, and circuit-breaker state. On terminal failure,
pauses (or stops, when leader-deletion is detected) the index directly via
ReplicationMetadataManagerrather than via persistent-task state transitions.CleanupShardTasksUpdateTask— single batched cluster state update that removeslegacy
ShardReplicationExecutorentries fromPersistentTasksCustomMetadataduring plugin init on the cluster manager. Idempotent.
NodeReplicationControllerTests,CleanupShardTasksUpdateTaskTests.Unchanged:
GetChangesActionandReplayChangesActiondata-fetch protocol.RemoteClusterRepository)._start,_stop,_pause,_resume,_autofollow).IndexReplicationTaskas a per-index persistent task.Behavior changes worth calling out
controller observing routing changes, not via persistent-task allocation. There
is a brief window during primary relocation where neither the old nor the new
node has an active worker —
GetChangesActionis idempotent, so no data loss.renews retention leases for shards that aren't producing batches. Closes a
failure mode where lease lifecycle was coupled to the fetch loop.
_cat/tasks?actions=*replication*no longershows per-shard rows. A status API extension surfacing per-shard worker state
will land in a follow-up PR.
whole index" policy is unchanged. Per-shard isolation is deferred to the status
API follow-up.
Migration
Single batched cluster-state update on first plugin init removes legacy
ShardReplicationTaskentries. Rolling upgrade is supported with a brief replication lag window per node as each one restarts and its controller picks up local primaries; no operator-mandated pause is required.Test plan
compileKotlin,compileTestKotlinNodeReplicationControllerTestsandCleanupShardTasksUpdateTaskTests)./gradlew integTest— 120 tests, 0 failures, 28 skipped (security tests)Out of scope (follow-up PRs)
_cat/taskspreviouslyprovided.
max_buffered_bytes_per_shard, per-leader-alias semaphore).IndexReplicationTaskas a persistent task (per-index, lowcardinality — not the bottleneck).
Risks
scoped to the legacy task name only. Failure logged but doesn't block plugin
startup.
clusterChangedevents. If events are missed or coalesced,next event reconciles — convergent.
_cat/tasksuntil follow-up PR.Check List
--signoff.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.