diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 6125619b5..3bacfd25e 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -166,6 +166,32 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: " const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user" + /** + * Returns true if the given settings have `index.auto_expand_replicas` set to anything other + * than "false" (the OpenSearch sentinel that disables auto-expand). + * + * When this returns true, CCR must NOT sync `index.number_of_replicas` from the leader to the + * follower: the follower's local OpenSearch is responsible for computing `number_of_replicas` + * from its own data-node count. See issue #1661. + */ + internal fun isAutoExpandReplicasActive(settings: Settings): Boolean { + val value = settings.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key) ?: return false + // OpenSearch stores the disabled sentinel as the literal string "false". + return !value.equals("false", ignoreCase = true) + } + + /** + * Returns the given settings with `index.number_of_replicas` removed; when the key is absent the same instance is returned unchanged. Used to + * prevent CCR from syncing `number_of_replicas` to a follower whose auto-expand is active + * (see issue #1661). 
+ */ + internal fun filterOutNumberOfReplicas(settings: Settings): Settings { + if (settings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) == null) { + return settings + } + return settings.filter { k: String -> k != IndexMetadata.SETTING_NUMBER_OF_REPLICAS } + } + } //only for testing @@ -523,7 +549,22 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } } - val desiredSettings = desiredSettingsBuilder.build() + var desiredSettings = desiredSettingsBuilder.build() + + // Issue #1661: When the follower has `index.auto_expand_replicas` active (any value other + // than "false"), the follower cluster independently manages `index.number_of_replicas` based + // on its local node count. Syncing `number_of_replicas` from the leader in that case destroys + // STARTED replica shards and triggers perpetual YELLOW state as `adaptAutoExpandReplicas()` + // re-creates them only for the next sync to destroy them again. + // + // Note: any explicit `number_of_replicas` the user set in the replication metadata overrides + // is also suppressed here. This is intentional — `auto_expand_replicas` and a fixed + // `number_of_replicas` are contradictory settings, and the follower's active auto-expand + // takes precedence until it is disabled. 
+ if (isAutoExpandReplicasActive(followerSettings)) { + desiredSettings = filterOutNumberOfReplicas(desiredSettings) + followerSettings = filterOutNumberOfReplicas(followerSettings) + } val changedSettingsBuilder = Settings.builder() for(key in desiredSettings.keySet()) { diff --git a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt index f0e4f56a3..809abe6f8 100644 --- a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt @@ -386,4 +386,75 @@ class IndexReplicationTaskTests : OpenSearchTestCase() { val isMissingResult = replicationTask.isIndexClosed("non-existent-index") assertThat(isMissingResult).isTrue() // Should default to true for safety } -} \ No newline at end of file + + // --- Issue #1661: number_of_replicas must not be synced when follower has auto_expand_replicas active --- + + fun testIsAutoExpandReplicasActive_trueForNumericRange() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-1") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue() + } + + fun testIsAutoExpandReplicasActive_trueForAllRange() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue() + } + + fun testIsAutoExpandReplicasActive_falseWhenDisabled() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "false") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testIsAutoExpandReplicasActive_falseWhenDisabledCaseInsensitive() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, 
+ "False") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testIsAutoExpandReplicasActive_falseWhenAbsent() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .build() + assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse() + } + + fun testFilterOutNumberOfReplicas_removesKey() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3") + .build() + + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings) + + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)).isNull() + // Other keys are preserved. + assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all") + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3") + } + + fun testFilterOutNumberOfReplicas_noopWhenKeyAbsent() { + val settings = Settings.builder() + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3") + .build() + + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings) + + // Same keys, same values. + assertThat(filtered.keySet()).isEqualTo(settings.keySet()) + assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all") + assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3") + } + + fun testFilterOutNumberOfReplicas_emptySettings() { + val filtered = IndexReplicationTask.filterOutNumberOfReplicas(Settings.EMPTY) + assertThat(filtered.keySet()).isEmpty() + } +}