
Commit c60ae86

Merge branch 'main' into users-cluster-init
2 parents 09cc75c + 8d9217d

6 files changed: 464 additions & 192 deletions


README.md

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,10 @@ Valkey Operator is a Kubernetes operator that automates the deployment and manag
 > - 🏗️ [Participate in design discussions](https://github.com/valkey-io/valkey-operator/discussions/categories/design)
 > - 🙏 [Ask questions](https://github.com/valkey-io/valkey-operator/discussions/categories/q-a)
 > - 🐛 [Report bugs](https://github.com/valkey-io/valkey-operator/issues)
+>
+> Want to discuss the operator development? Join the [tech call every Friday at 11:00-11:30 US Eastern](https://zoom-lfx.platform.linuxfoundation.org/meeting/99658320446?password=2eae4006-633e-4fed-aa93-631ab2101421).
+
 
 ## Getting Started
 
internal/controller/utils.go

Lines changed: 50 additions & 24 deletions
@@ -48,11 +48,12 @@ const appName = "valkey"
 //
 // The reconciler reads pod labels (via podRoleAndShard) to decide whether
 // to assign slots (node 0 = initial primary) or issue CLUSTER REPLICATE
-// (node 1+ = initial replica), and for which shard. This convention only
-// applies during initial cluster creation; after a failover, Valkey may
-// promote a replica to primary, making node 0 a replica. The labels are
-// not updated to reflect this — the live role is always read from
-// CLUSTER NODES, not from the labels.
+// (node 1+ = initial replica), and for which shard. After a failover,
+// Valkey may promote a replica to primary, making node 0 a replica. The
+// reconciler detects this via shardExistsInTopology: if the shard already
+// has members in the cluster topology, the replacement node-index=0 pod
+// joins as a replica instead of trying to claim slots. The labels themselves are not
+// updated — the live role is always read from CLUSTER NODES.
 //
 // Names are set by deploymentName, labels by createClusterDeployment.
 const (
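
A minimal sketch of the dispatch this comment describes, under assumptions: assignSlotsToNewPrimary and replicateToShardPrimary are named in the doc comments, but their callers and signatures are not part of this commit, so the function name and signatures below are hypothetical.

// Sketch only, not code from this commit. Shows how a node-index=0 pod
// could be routed per the convention above; the handler name and the
// signatures of assignSlotsToNewPrimary / replicateToShardPrimary are assumed.
func handleNodeZero(state *valkey.ClusterState, shardIndex int, pods *corev1.PodList, pod *corev1.Pod) error {
	if shardExistsInTopology(state, shardIndex, pods) {
		// The shard already has members in CLUSTER NODES: a failover
		// completed (or is in progress), so join as a replica. Mid-failover
		// this returns an error and the next reconcile retries.
		return replicateToShardPrimary(state, shardIndex, pods, pod)
	}
	// Fresh shard during initial cluster creation: claim the slot ranges.
	return assignSlotsToNewPrimary(pod, shardIndex)
}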
@@ -138,29 +139,54 @@ func podRoleAndShard(address string, pods *corev1.PodList) (string, int) {
 	return RoleReplica, shardIndex
 }
 
-// primaryPodIP finds the IP of the node-0 (primary) pod for a given shard by
-// reading pod labels. Returns "" if not found.
-func primaryPodIP(pods *corev1.PodList, shardIndex int) string {
+// shardExistsInTopology reports whether another pod in the same shard (same
+// shard-index label) already exists as a member of any shard in the Valkey
+// cluster topology. This covers two cases:
+//
+//  1. Post-failover (completed): a promoted replica is the primary.
+//  2. Mid-failover (in progress): the replica exists but hasn't been promoted
+//     yet — the operator must wait rather than trying to assign new slots.
+//
+// In both cases the replacement node-index=0 pod must NOT call
+// assignSlotsToNewPrimary. Instead it should fall through to
+// replicateToShardPrimary, which will either succeed (case 1) or return an
+// error and retry on the next reconcile (case 2).
+func shardExistsInTopology(state *valkey.ClusterState, shardIndex int, pods *corev1.PodList) bool {
 	si := strconv.Itoa(shardIndex)
-	idx := slices.IndexFunc(pods.Items, func(p corev1.Pod) bool {
-		return p.Labels[LabelShardIndex] == si && p.Labels[LabelNodeIndex] == "0"
-	})
-	if idx == -1 {
-		return ""
+	for i := range pods.Items {
+		p := &pods.Items[i]
+		if p.Labels[LabelShardIndex] != si || p.Status.PodIP == "" {
+			continue
+		}
+		for _, shard := range state.Shards {
+			for _, node := range shard.Nodes {
+				if node.Address == p.Status.PodIP {
+					return true
+				}
+			}
+		}
 	}
-	return pods.Items[idx].Status.PodIP
+	return false
 }
 
-// pickPendingNode selects the next pending node to process, prioritizing
-// primaries (node index 0) over replicas. This ensures primaries get slots
-// before replicas try to attach, since CLUSTER REPLICATE needs the primary
-// to already be in state.Shards.
-func pickPendingNode(nodes []*valkey.NodeState, pods *corev1.PodList) *valkey.NodeState {
-	for _, n := range nodes {
-		role, _ := podRoleAndShard(n.Address, pods)
-		if role == RolePrimary {
-			return n
+// findShardPrimary scans all pods with the given shard-index label and returns
+// the Valkey node ID + IP of whichever pod is currently the slot-bearing
+// primary, regardless of its node-index label. This handles the post-failover
+// case where node-index=1 (or higher) was promoted by Valkey.
+// Returns ("", "") if no primary is found.
+func findShardPrimary(state *valkey.ClusterState, shardIndex int, pods *corev1.PodList) (nodeID, ip string) {
+	si := strconv.Itoa(shardIndex)
+	for i := range pods.Items {
+		p := &pods.Items[i]
+		if p.Labels[LabelShardIndex] != si || p.Status.PodIP == "" {
+			continue
+		}
+		for _, shard := range state.Shards {
+			primary := shard.GetPrimaryNode()
+			if primary != nil && primary.Address == p.Status.PodIP {
+				return primary.Id, p.Status.PodIP
+			}
 		}
 	}
-	return nodes[0]
+	return "", ""
 }

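For context, a sketch of how findShardPrimary's result could feed CLUSTER REPLICATE inside replicateToShardPrimary, which this commit references but does not show. Everything here is an assumption for illustration: the valkeyClient interface, the function's signature, and the error handling are stand-ins, not the operator's actual API; it also assumes the package's existing imports plus context and fmt.

// Hypothetical sketch; vc is an assumed client abstraction, not a real
// package API. findShardPrimary is the function added in this commit.
type valkeyClient interface {
	// ClusterReplicate runs CLUSTER REPLICATE <primaryID> on the node at addr.
	ClusterReplicate(ctx context.Context, addr, primaryID string) error
}

func replicateToShardPrimarySketch(ctx context.Context, vc valkeyClient, state *valkey.ClusterState, shardIndex int, pods *corev1.PodList, pod *corev1.Pod) error {
	primaryID, primaryIP := findShardPrimary(state, shardIndex, pods)
	if primaryID == "" {
		// Mid-failover: no slot-bearing primary yet. Erroring out requeues
		// the reconcile, which retries after promotion completes.
		return fmt.Errorf("shard %d: no primary in topology yet (pod %s)", shardIndex, pod.Name)
	}
	_ = primaryIP // available if the new pod must CLUSTER MEET the primary first
	return vc.ClusterReplicate(ctx, pod.Status.PodIP, primaryID)
}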