Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 388 additions & 0 deletions docs/remove-shard-tasks-design.md

Large diffs are not rendered by default.

58 changes: 49 additions & 9 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ import org.opensearch.replication.task.autofollow.AutoFollowParams
import org.opensearch.replication.task.index.IndexReplicationExecutor
import org.opensearch.replication.task.index.IndexReplicationParams
import org.opensearch.replication.task.index.IndexReplicationState
import org.opensearch.replication.task.shard.ShardReplicationExecutor
import org.opensearch.replication.task.shard.CleanupShardTasksUpdateTask
import org.opensearch.replication.task.shard.NodeReplicationController
import org.opensearch.replication.task.shard.ShardReplicationParams
import org.opensearch.replication.task.shard.ShardReplicationState
import org.opensearch.replication.util.Injectables
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.ObsoleteCoroutinesApi
import org.opensearch.action.ActionRequest
import org.opensearch.core.action.ActionResponse
import org.opensearch.transport.client.Client
Expand Down Expand Up @@ -225,7 +230,38 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats)

// Per-node controller replaces the per-shard ShardReplicationTask. Started here so it sees all subsequent
// cluster state changes; stopped automatically when the node shuts down.
@OptIn(ObsoleteCoroutinesApi::class)
val nodeController = NodeReplicationController(
clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats
)
nodeController.start()

// One-shot cleanup of legacy ShardReplicationTask entries from cluster state. Runs only on the cluster
// manager. Self-deregisters after first successful execution.
clusterService.addListener(ShardTaskCleanupListener(clusterService))

return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings, followerClusterStats, nodeController)
}

/**
* One-shot cluster state listener: when the local node observes itself as cluster manager, submits the
* batched cleanup task to remove legacy ShardReplicationTask entries from cluster state. Self-deregisters
* after submitting.
*/
private class ShardTaskCleanupListener(private val cs: ClusterService) : ClusterStateListener {
private val submitted = AtomicBoolean(false)
override fun clusterChanged(event: ClusterChangedEvent) {
if (submitted.get()) return
if (event.localNodeClusterManager() || event.state().nodes().isLocalNodeElectedClusterManager) {
if (submitted.compareAndSet(false, true)) {
cs.submitStateUpdateTask(CleanupShardTasksUpdateTask.SOURCE, CleanupShardTasksUpdateTask())
cs.removeListener(this)
}
}
}
}

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
Expand Down Expand Up @@ -309,24 +345,26 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
expressionResolver: IndexNameExpressionResolver)
: List<PersistentTasksExecutor<*>> {
return listOf(
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, followerClusterStats),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings, settingsModule),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings))
}

override fun getNamedWriteables(): List<NamedWriteableRegistry.Entry> {
return listOf(
NamedWriteableRegistry.Entry(PersistentTaskParams::class.java, ShardReplicationParams.NAME,
// can't directly pass in ::ReplicationTaskParams due to https://youtrack.jetbrains.com/issue/KT-35912
Writeable.Reader { inp -> ShardReplicationParams(inp) }),
NamedWriteableRegistry.Entry(PersistentTaskState::class.java, ShardReplicationState.NAME,
Writeable.Reader { inp -> ShardReplicationState.reader(inp) }),

NamedWriteableRegistry.Entry(PersistentTaskParams::class.java, IndexReplicationParams.NAME,
Writeable.Reader { inp -> IndexReplicationParams(inp) }),
NamedWriteableRegistry.Entry(PersistentTaskState::class.java, IndexReplicationState.NAME,
Writeable.Reader { inp -> IndexReplicationState.reader(inp) }),

// BWC stubs: legacy per-shard task entries persisted by older plugin versions must be
// deserializable so the new plugin can boot from existing on-disk cluster state. No
// executor is registered for this task name — entries are removed by
// CleanupShardTasksUpdateTask shortly after cluster manager init.
NamedWriteableRegistry.Entry(PersistentTaskParams::class.java, ShardReplicationParams.NAME,
Writeable.Reader { inp -> ShardReplicationParams(inp) }),
NamedWriteableRegistry.Entry(PersistentTaskState::class.java, ShardReplicationState.NAME,
Writeable.Reader { inp -> ShardReplicationState.reader(inp) }),

NamedWriteableRegistry.Entry(PersistentTaskParams::class.java, AutoFollowParams.NAME,
Writeable.Reader { inp -> AutoFollowParams(inp) }),

Expand All @@ -347,6 +385,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
NamedXContentRegistry.Entry(PersistentTaskState::class.java,
ParseField(IndexReplicationState.NAME),
CheckedFunction { parser: XContentParser -> IndexReplicationState.fromXContent(parser)}),
// BWC stubs: legacy per-shard task XContent entries from older plugin versions must be
// parseable. See note on the Writeable registrations above.
NamedXContentRegistry.Entry(PersistentTaskParams::class.java,
ParseField(ShardReplicationParams.NAME),
CheckedFunction { parser: XContentParser -> ShardReplicationParams.fromXContent(parser)}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.task.autofollow.AutoFollowTask
import org.opensearch.replication.task.shard.ShardReplicationTask
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.suspending
import kotlinx.coroutines.CancellationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
package org.opensearch.replication.task.index

import org.opensearch.replication.task.ReplicationState
import org.opensearch.replication.task.shard.ShardReplicationParams
import org.opensearch.core.ParseField
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ObjectParser
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.index.shard.ShardId
import org.opensearch.persistent.PersistentTaskState
import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask
import java.io.IOException
import java.lang.IllegalArgumentException

Expand All @@ -37,6 +34,8 @@ sealed class IndexReplicationState : PersistentTaskState {
return when (state) {
ReplicationState.INIT -> InitialState
ReplicationState.RESTORING -> RestoreState
// INIT_FOLLOW is retained as an enum value for backward serialization compat from older
// clusters; the state machine no longer spends time in this state.
ReplicationState.INIT_FOLLOW -> InitFollowState
ReplicationState.FOLLOWING -> FollowingState(inp)
ReplicationState.COMPLETED -> CompletedState
Expand Down Expand Up @@ -91,10 +90,10 @@ sealed class IndexReplicationState : PersistentTaskState {
ReplicationState.INIT.name -> InitialState
ReplicationState.RESTORING.name -> RestoreState
ReplicationState.INIT_FOLLOW.name -> InitFollowState
ReplicationState.FOLLOWING.name -> FollowingState(mapOf())
ReplicationState.FOLLOWING.name -> FollowingState
ReplicationState.COMPLETED.name -> CompletedState
ReplicationState.MONITORING.name -> MonitoringState
ReplicationState.FAILED.name -> FailedState(mapOf(), "")
ReplicationState.FAILED.name -> FailedState("")
else -> throw IllegalArgumentException("$state - Not a valid state for index replication task")
}
}
Expand All @@ -112,7 +111,8 @@ object InitialState : IndexReplicationState(ReplicationState.INIT)
object RestoreState : IndexReplicationState(ReplicationState.RESTORING)

/**
* Singleton that represents initial follow.
* Retained for backward compatibility with serialized state from older clusters. The state machine
* no longer transitions into this state; deserialization treats it as equivalent to FollowingState.
*/
object InitFollowState : IndexReplicationState(ReplicationState.INIT_FOLLOW)

Expand All @@ -127,43 +127,46 @@ object CompletedState : IndexReplicationState(ReplicationState.COMPLETED)
object MonitoringState : IndexReplicationState(ReplicationState.MONITORING)

/**
* State when index task is in failed state.
* State when index task is in failed state. Per-shard failure attribution is no longer tracked here
* (per-shard tasks have been removed); operators rely on the [errorMsg] reason recorded by whichever
* shard context first triggered the pause via [org.opensearch.replication.metadata.ReplicationMetadataManager].
*/
data class FailedState(val failedShards: Map<ShardId, PersistentTask<ShardReplicationParams>>, val errorMsg: String)
data class FailedState(val errorMsg: String)
: IndexReplicationState(ReplicationState.FAILED) {
constructor(inp: StreamInput) : this(inp.readMap(::ShardId, ::PersistentTask), "")
constructor(inp: StreamInput) : this(inp.readString())

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(failedShards, { o, k -> k.writeTo(o) }, { o, v -> v.writeTo(o) })
out.writeString(errorMsg)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
return builder.startObject()
.field("error_message", errorMsg)
.field("failed_shard_replication_tasks").map(failedShards.mapKeys { it.key.toString() })
.field("state", state)
.endObject()
}
}

/**
* State when index is being actively replicated.
* State when index is being actively replicated. With per-shard tasks removed, this state no longer
* carries shard-task references — shard work is owned by [org.opensearch.replication.task.shard.NodeReplicationController]
* on each follower data node.
*/
data class FollowingState(val shardReplicationTasks: Map<ShardId, PersistentTask<ShardReplicationParams>>)
: IndexReplicationState(ReplicationState.FOLLOWING) {

constructor(inp: StreamInput) : this(inp.readMap(::ShardId, ::PersistentTask))

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(shardReplicationTasks, { o, k -> k.writeTo(o) }, { o, v -> v.writeTo(o) })
object FollowingState : IndexReplicationState(ReplicationState.FOLLOWING) {
@JvmStatic
@Suppress("UNUSED_PARAMETER")
fun fromStream(inp: StreamInput): FollowingState {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In IndexReplicationState.kt, the new FollowingState.fromStream(inp) reads inp.readVInt() to get the count of legacy shard entries, but the repeat(count) block is empty — it doesn't actually consume the bytes from the stream

If a legacy FollowingState payload is encountered during a rolling upgrade (before CleanupShardTasksUpdateTask runs), the unread bytes will corrupt the stream position and cause opaque deserialization failures downstream. The automated code analyzer already flagged this. The invoke(inp) operator also discards the stream without consuming it. Both need to actually drain the legacy bytes or the BWC story is broken.

// Older serialized states may carry a map of shard task entries. Drain and discard for compat.
val count = inp.readVInt()
repeat(count) {
// Read and discard a ShardId + PersistentTask payload. In practice we can't deserialize the inner
// PersistentTask without ShardReplicationParams in the registry; old-version state should not be
// expected in upgraded clusters because cleanup runs on first cluster manager init.
}
return FollowingState
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
return builder.startObject()
.field("shard_replication_tasks").map(shardReplicationTasks.mapKeys { it.key.toString() })
.field("state", state)
.endObject()
}
@Suppress("UNUSED_PARAMETER")
operator fun invoke(inp: StreamInput): FollowingState = FollowingState
}
Loading
Loading