From 03a002ab4ae21a0d552afede1b56bec5f34b8542 Mon Sep 17 00:00:00 2001 From: Monu Singh Date: Tue, 28 Apr 2026 12:49:14 +0530 Subject: [PATCH] Skip syncing number_of_replicas when follower has auto_expand_replicas active Fixes #1661. When the follower index has `index.auto_expand_replicas` active (any value other than the literal "false"), the follower's local OpenSearch is responsible for deriving `index.number_of_replicas` from its own data-node count. CCR's 60s metadata sync was blindly copying the leader's `number_of_replicas` onto the follower, which in topologies where the leader has fewer data nodes than the follower caused a destructive cycle: 1. CCR pushes leader's lower `number_of_replicas` -> STARTED replica shards are destroyed on the follower. 2. OpenSearch's `adaptAutoExpandReplicas()` immediately corrects the count back up -> new UNASSIGNED shards enter peer recovery -> cluster goes YELLOW. 3. 60s later CCR syncs again -> destroys the recovering shards before recovery completes -> cycle repeats forever. Fix: in `IndexReplicationTask.pollForMetadata()`, after computing the desired settings from leader+overrides and before diffing against the follower, if the follower's `auto_expand_replicas` is active, strip `number_of_replicas` from BOTH the desired and follower settings so the diff loops neither add nor remove it. The leader's `auto_expand_replicas` itself continues to be synced, so once the user disables auto-expand on the leader the next sync cycle resumes syncing `number_of_replicas`. Any explicit `number_of_replicas` override set via the replication metadata is also suppressed while auto-expand is active on the follower; this is intentional because a fixed replica count and auto-expand are contradictory and the follower's active auto-expand takes precedence. 
Added unit tests for the two new companion-object helpers `isAutoExpandReplicasActive` and `filterOutNumberOfReplicas` covering: numeric range, "0-all", "false"/"False"/absent, key removal, no-op when absent, and empty settings. Signed-off-by: Monu Singh --- .../task/index/IndexReplicationTask.kt | 43 ++++++++++- .../task/index/IndexReplicationTaskTests.kt | 73 ++++++++++++++++++- 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 6125619b5..3bacfd25e 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -166,6 +166,32 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: " const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user" + /** + * Returns true if the given settings have `index.auto_expand_replicas` set to anything other + * than "false" (the OpenSearch sentinel that disables auto-expand). + * + * When this returns true, CCR must NOT sync `index.number_of_replicas` from the leader to the + * follower: the follower's local OpenSearch is responsible for computing `number_of_replicas` + * from its own data-node count. See issue #1661. + */ + internal fun isAutoExpandReplicasActive(settings: Settings): Boolean { + val value = settings.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key) ?: return false + // OpenSearch stores the disabled sentinel as the literal string "false". + return !value.equals("false", ignoreCase = true) + } + + /** + * Returns the given settings without `index.number_of_replicas` (the same instance if the key is already absent). 
Used to + * prevent CCR from syncing `number_of_replicas` to a follower whose auto-expand is active + * (see issue #1661). + */ + internal fun filterOutNumberOfReplicas(settings: Settings): Settings { + if (settings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) == null) { + return settings + } + return settings.filter { k: String -> k != IndexMetadata.SETTING_NUMBER_OF_REPLICAS } + } + } //only for testing @@ -523,7 +549,22 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } } - val desiredSettings = desiredSettingsBuilder.build() + var desiredSettings = desiredSettingsBuilder.build() + + // Issue #1661: When the follower has `index.auto_expand_replicas` active (any value other + // than "false"), the follower cluster independently manages `index.number_of_replicas` based + // on its local node count. Syncing `number_of_replicas` from the leader in that case destroys + // STARTED replica shards and triggers perpetual YELLOW state as `adaptAutoExpandReplicas()` + // re-creates them only for the next sync to destroy them again. + // + // Note: any explicit `number_of_replicas` the user set in the replication metadata overrides + // is also suppressed here. This is intentional — `auto_expand_replicas` and a fixed + // `number_of_replicas` are contradictory settings, and the follower's active auto-expand + // takes precedence until it is disabled. 
+ if (isAutoExpandReplicasActive(followerSettings)) { + desiredSettings = filterOutNumberOfReplicas(desiredSettings) + followerSettings = filterOutNumberOfReplicas(followerSettings) + } val changedSettingsBuilder = Settings.builder() for(key in desiredSettings.keySet()) { diff --git a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt index f0e4f56a3..809abe6f8 100644 --- a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt @@ -386,4 +386,75 @@ class IndexReplicationTaskTests : OpenSearchTestCase() { val isMissingResult = replicationTask.isIndexClosed("non-existent-index") assertThat(isMissingResult).isTrue() // Should default to true for safety } -} \ No newline at end of file + + // --- Issue #1661: number_of_replicas must not be synced when follower has auto_expand_replicas active --- + + fun testIsAutoExpandReplicasActive_trueForNumericRange() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-1") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue() + } + + fun testIsAutoExpandReplicasActive_trueForAllRange() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue() + } + + fun testIsAutoExpandReplicasActive_falseWhenDisabled() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "false") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testIsAutoExpandReplicasActive_falseWhenDisabledCaseInsensitive() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, 
"False") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testIsAutoExpandReplicasActive_falseWhenAbsent() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testFilterOutNumberOfReplicas_removesKey() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3") + .build() + + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings) + + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)).isNull() + // Other keys are preserved. + assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all") + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3") + } + + fun testFilterOutNumberOfReplicas_noopWhenKeyAbsent() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3") + .build() + + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings) + + // Same keys, same values. + assertThat(filtered.keySet()).isEqualTo(settings.keySet()) + assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all") + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3") + } + + fun testFilterOutNumberOfReplicas_emptySettings() { + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(Settings.EMPTY) + assertThat(filtered.keySet()).isEmpty() + } +}