Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,32 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: "
const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user"

/**
* Returns true if the given settings has `index.auto_expand_replicas` set to anything other
* than "false" (the OpenSearch sentinel that disables auto-expand).
*
* When this returns true, CCR must NOT sync `index.number_of_replicas` from the leader to the
* follower: the follower's local OpenSearch is responsible for computing `number_of_replicas`
* from its own data-node count. See issue #1661.
*/
internal fun isAutoExpandReplicasActive(settings: Settings): Boolean {
val value = settings.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key) ?: return false
// OpenSearch stores the disabled sentinel as the literal string "false".
return !value.equals("false", ignoreCase = true)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldnt it be simpler to check value.equals("true)

}

/**
* Returns a copy of the given settings with `index.number_of_replicas` removed. Used to
* prevent CCR from syncing `number_of_replicas` to a follower whose auto-expand is active
* (see issue #1661).
*/
internal fun filterOutNumberOfReplicas(settings: Settings): Settings {
if (settings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) == null) {
return settings
}
return settings.filter { k: String -> k != IndexMetadata.SETTING_NUMBER_OF_REPLICAS }
}

}

//only for testing
Expand Down Expand Up @@ -523,7 +549,22 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}
}
}
val desiredSettings = desiredSettingsBuilder.build()
var desiredSettings = desiredSettingsBuilder.build()

// Issue #1661: When the follower has `index.auto_expand_replicas` active (any value other
// than "false"), the follower cluster independently manages `index.number_of_replicas` based
// on its local node count. Syncing `number_of_replicas` from the leader in that case destroys
// STARTED replica shards and triggers perpetual YELLOW state as `adaptAutoExpandReplicas()`
// re-creates them only for the next sync to destroy them again.
//
// Note: any explicit `number_of_replicas` the user set in the replication metadata overrides
// is also suppressed here. This is intentional \u2014 `auto_expand_replicas` and a fixed
// `number_of_replicas` are contradictory settings, and the follower's active auto-expand
// takes precedence until it is disabled.
if (isAutoExpandReplicasActive(followerSettings)) {
desiredSettings = filterOutNumberOfReplicas(desiredSettings)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this logic above at line 543 where we are already iterating over the settings ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add it in line 535 when the desiredSettingsBuilder is being initialised. have done the same fix here in this pr #1664

followerSettings = filterOutNumberOfReplicas(followerSettings)
}

val changedSettingsBuilder = Settings.builder()
for(key in desiredSettings.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,75 @@ class IndexReplicationTaskTests : OpenSearchTestCase() {
val isMissingResult = replicationTask.isIndexClosed("non-existent-index")
assertThat(isMissingResult).isTrue() // Should default to true for safety
}
}

// --- Issue #1661: number_of_replicas must not be synced when follower has auto_expand_replicas active ---

fun testIsAutoExpandReplicasActive_trueForNumericRange() {
val settings = Settings.builder()
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-1")
.build()
assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue()
}

fun testIsAutoExpandReplicasActive_trueForAllRange() {
val settings = Settings.builder()
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all")
.build()
assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isTrue()
}

fun testIsAutoExpandReplicasActive_falseWhenDisabled() {
val settings = Settings.builder()
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "false")
.build()
assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse()
}

fun testIsAutoExpandReplicasActive_falseWhenDisabledCaseInsensitive() {
val settings = Settings.builder()
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "False")
.build()
assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse()
}

fun testIsAutoExpandReplicasActive_falseWhenAbsent() {
val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
.build()
assertThat(IndexReplicationTask.isAutoExpandReplicasActive(settings)).isFalse()
}

fun testFilterOutNumberOfReplicas_removesKey() {
val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3")
.build()

val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings)

assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)).isNull()
// Other keys are preserved.
assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all")
assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3")
}

fun testFilterOutNumberOfReplicas_noopWhenKeyAbsent() {
val settings = Settings.builder()
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key, "0-all")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "3")
.build()

val filtered = IndexReplicationTask.filterOutNumberOfReplicas(settings)

// Same keys, same values.
assertThat(filtered.keySet()).isEqualTo(settings.keySet())
assertThat(filtered.get(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.key)).isEqualTo("0-all")
assertThat(filtered.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)).isEqualTo("3")
}

fun testFilterOutNumberOfReplicas_emptySettings() {
val filtered = IndexReplicationTask.filterOutNumberOfReplicas(Settings.EMPTY)
assertThat(filtered.keySet()).isEmpty()
}
}
Loading