Skip to content

Commit fff3bbf

Browse files
lmicciniclaude
andcommitted
Add quorum-based ClusterAvailableCondition for RabbitMQ availability
Replace the raw ReadyCount > 0 check (checkClusterReadiness) with a quorum-aware ClusterAvailableCondition. The new condition is True when ReadyCount >= ceil(Replicas/2), ensuring the cluster has quorum before allowing TransportURL, Policy, Vhost, and User operations. ClusterAvailableCondition is a sub-condition of ReadyCondition but never blocks it: quorum is always satisfied before all replicas are ready. IsReady() still means "fully reconciled" while IsAvailable() means "can serve traffic". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 316b4d3 commit fff3bbf

8 files changed

Lines changed: 97 additions & 116 deletions

File tree

apis/rabbitmq/v1beta1/conditions.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ const (
2525
// Status=True means the proxy is active and must be cleared by setting the
2626
// clients-reconfigured annotation. Status=False means no proxy is running.
2727
RabbitMQProxyActiveCondition condition.Type = "RabbitMQProxyActive"
28+
29+
// ClusterAvailableCondition indicates that the RabbitMQ cluster has quorum
30+
// and can serve traffic. True when ReadyCount >= ceil(Replicas/2).
31+
// It never blocks ReadyCondition since quorum is always met before all replicas are ready.
32+
ClusterAvailableCondition condition.Type = "ClusterAvailable"
2833
)
2934

3035
// TransportURL Condition Types used by API objects.
@@ -58,6 +63,16 @@ const (
5863
// RabbitMQProxyInactiveMessage is the message when the proxy is not active
5964
RabbitMQProxyInactiveMessage = "AMQP proxy sidecar is not active"
6065

66+
//
67+
// ClusterAvailable condition messages
68+
//
69+
70+
// ClusterAvailableMessage
71+
ClusterAvailableMessage = "RabbitMQ cluster has quorum and can serve traffic"
72+
73+
// ClusterNotAvailableMessage
74+
ClusterNotAvailableMessage = "RabbitMQ cluster does not have quorum (%d/%d replicas ready, need %d)"
75+
6176
//
6277
// TransportURLReady condition messages
6378
//

apis/rabbitmq/v1beta1/rabbitmq_types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,14 @@ func (instance RabbitMq) IsReady() bool {
394394
return instance.Status.Conditions.IsTrue(condition.ReadyCondition)
395395
}
396396

397+
// IsAvailable - returns true if the cluster has quorum and can serve traffic.
398+
// Unlike IsReady (which requires all replicas), this is true when
399+
// ReadyCount >= ceil(Replicas/2), allowing TransportURLs to remain
400+
// available during rolling restarts.
401+
func (instance RabbitMq) IsAvailable() bool {
402+
return instance.Status.Conditions.IsTrue(ClusterAvailableCondition)
403+
}
404+
397405
// RbacConditionsSet - set the conditions for the rbac object
398406
func (instance RabbitMq) RbacConditionsSet(c *condition.Condition) {
399407
instance.Status.Conditions.Set(c)

internal/controller/rabbitmq/helpers.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -67,47 +67,3 @@ func getTLSCACert(ctx context.Context, h *helper.Helper, rabbit *rabbitmqv1.Rabb
6767

6868
return caCert, nil
6969
}
70-
71-
// ClusterReadinessError represents different types of cluster readiness failures
72-
type ClusterReadinessError struct {
73-
ClusterName string
74-
Reason string
75-
IsWaiting bool // true if cluster is starting up, false if it's being deleted
76-
}
77-
78-
func (e *ClusterReadinessError) Error() string {
79-
return e.Reason
80-
}
81-
82-
// checkClusterReadiness validates that a RabbitMQ cluster is ready for operations
83-
func checkClusterReadiness(rabbit *rabbitmqv1.RabbitMq) *ClusterReadinessError {
84-
if !rabbit.DeletionTimestamp.IsZero() {
85-
return &ClusterReadinessError{
86-
ClusterName: rabbit.Name,
87-
Reason: fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name),
88-
IsWaiting: false,
89-
}
90-
}
91-
92-
// Check if cluster is ready by checking ReadyCount
93-
if rabbit.Status.ReadyCount == 0 {
94-
return &ClusterReadinessError{
95-
ClusterName: rabbit.Name,
96-
Reason: fmt.Sprintf("RabbitMQ cluster %s is not ready yet", rabbit.Name),
97-
IsWaiting: true,
98-
}
99-
}
100-
101-
// DefaultUser status is populated by the RabbitMQ controller after the
102-
// admin secret is created. There is a brief window where ReadyCount > 0
103-
// but DefaultUser has not been set yet.
104-
if rabbit.Status.DefaultUser == nil {
105-
return &ClusterReadinessError{
106-
ClusterName: rabbit.Name,
107-
Reason: fmt.Sprintf("RabbitMQ cluster %s admin credentials not yet available", rabbit.Name),
108-
IsWaiting: true,
109-
}
110-
}
111-
112-
return nil
113-
}

internal/controller/rabbitmq/rabbitmq_controller.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
868868
instance.Status.ReadyCount = sts.Status.ReadyReplicas
869869
clusterReady := common_statefulset.IsReady(*sts)
870870

871+
// ClusterAvailable indicates the cluster has quorum and can serve traffic.
872+
// It is a sub-condition of ReadyCondition but never blocks it: quorum
873+
// (ceil(Replicas/2)) is always satisfied before all replicas are ready.
874+
replicas := ptr.Deref(instance.Spec.Replicas, 1)
875+
quorum := replicas/2 + 1
876+
if instance.Status.ReadyCount >= quorum && instance.Status.DefaultUser != nil {
877+
instance.Status.Conditions.Set(condition.TrueCondition(
878+
rabbitmqv1beta1.ClusterAvailableCondition,
879+
rabbitmqv1beta1.ClusterAvailableMessage))
880+
} else {
881+
instance.Status.Conditions.Set(condition.FalseCondition(
882+
rabbitmqv1beta1.ClusterAvailableCondition,
883+
condition.RequestedReason,
884+
condition.SeverityInfo,
885+
rabbitmqv1beta1.ClusterNotAvailableMessage,
886+
instance.Status.ReadyCount, replicas, quorum))
887+
}
888+
871889
if clusterReady {
872890
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
873891

internal/controller/rabbitmq/rabbitmqpolicy_controller.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,26 +136,24 @@ func (r *RabbitMQPolicyReconciler) reconcileNormal(ctx context.Context, instance
136136
return ctrl.Result{}, err
137137
}
138138

139-
// Check if cluster is ready for operations
140-
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
141-
if readinessErr.IsWaiting {
142-
// Cluster is starting up - set waiting condition
143-
instance.Status.Conditions.Set(condition.FalseCondition(
144-
rabbitmqv1.RabbitMQPolicyReadyCondition,
145-
condition.RequestedReason,
146-
condition.SeverityInfo,
147-
"RabbitMQ policy waiting for dependencies %s",
148-
readinessErr.Reason))
149-
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
150-
} else {
151-
// Cluster is being deleted - set error condition
152-
instance.Status.Conditions.Set(condition.FalseCondition(
153-
rabbitmqv1.RabbitMQPolicyReadyCondition,
154-
condition.ErrorReason,
155-
condition.SeverityWarning,
156-
rabbitmqv1.RabbitMQPolicyReadyErrorMessage,
157-
readinessErr.Reason))
158-
}
139+
// Check if cluster is available for operations
140+
if !rabbit.DeletionTimestamp.IsZero() {
141+
instance.Status.Conditions.Set(condition.FalseCondition(
142+
rabbitmqv1.RabbitMQPolicyReadyCondition,
143+
condition.ErrorReason,
144+
condition.SeverityWarning,
145+
rabbitmqv1.RabbitMQPolicyReadyErrorMessage,
146+
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
147+
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
148+
}
149+
if !rabbit.IsAvailable() {
150+
instance.Status.Conditions.Set(condition.FalseCondition(
151+
rabbitmqv1.RabbitMQPolicyReadyCondition,
152+
condition.RequestedReason,
153+
condition.SeverityInfo,
154+
"RabbitMQ policy waiting for cluster %s to have quorum",
155+
rabbit.Name))
156+
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
159157
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
160158
}
161159

internal/controller/rabbitmq/rabbitmquser_controller.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -234,26 +234,24 @@ func (r *RabbitMQUserReconciler) reconcileNormal(ctx context.Context, instance *
234234
return ctrl.Result{}, err
235235
}
236236

237-
// Check if cluster is ready for operations
238-
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
239-
if readinessErr.IsWaiting {
240-
// Cluster is starting up - set waiting condition
241-
instance.Status.Conditions.Set(condition.FalseCondition(
242-
rabbitmqv1.RabbitMQUserReadyCondition,
243-
condition.RequestedReason,
244-
condition.SeverityInfo,
245-
rabbitmqv1.RabbitMQUserReadyWaitingMessage,
246-
readinessErr.Reason))
247-
Log.Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
248-
} else {
249-
// Cluster is being deleted - set error condition
250-
instance.Status.Conditions.Set(condition.FalseCondition(
251-
rabbitmqv1.RabbitMQUserReadyCondition,
252-
condition.ErrorReason,
253-
condition.SeverityWarning,
254-
rabbitmqv1.RabbitMQUserReadyErrorMessage,
255-
readinessErr.Reason))
256-
}
237+
// Check if cluster is available for operations
238+
if !rabbit.DeletionTimestamp.IsZero() {
239+
instance.Status.Conditions.Set(condition.FalseCondition(
240+
rabbitmqv1.RabbitMQUserReadyCondition,
241+
condition.ErrorReason,
242+
condition.SeverityWarning,
243+
rabbitmqv1.RabbitMQUserReadyErrorMessage,
244+
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
245+
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
246+
}
247+
if !rabbit.IsAvailable() {
248+
instance.Status.Conditions.Set(condition.FalseCondition(
249+
rabbitmqv1.RabbitMQUserReadyCondition,
250+
condition.RequestedReason,
251+
condition.SeverityInfo,
252+
rabbitmqv1.RabbitMQUserReadyWaitingMessage,
253+
fmt.Sprintf("cluster %s does not have quorum", rabbit.Name)))
254+
Log.Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
257255
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
258256
}
259257

internal/controller/rabbitmq/rabbitmqvhost_controller.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,26 +128,24 @@ func (r *RabbitMQVhostReconciler) reconcileNormal(ctx context.Context, instance
128128
return ctrl.Result{}, err
129129
}
130130

131-
// Check if cluster is ready for operations
132-
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
133-
if readinessErr.IsWaiting {
134-
// Cluster is starting up - set waiting condition
135-
instance.Status.Conditions.Set(condition.FalseCondition(
136-
rabbitmqv1.RabbitMQVhostReadyCondition,
137-
condition.RequestedReason,
138-
condition.SeverityInfo,
139-
"RabbitMQ vhost waiting for dependencies %s",
140-
readinessErr.Reason))
141-
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
142-
} else {
143-
// Cluster is being deleted - set error condition
144-
instance.Status.Conditions.Set(condition.FalseCondition(
145-
rabbitmqv1.RabbitMQVhostReadyCondition,
146-
condition.ErrorReason,
147-
condition.SeverityWarning,
148-
rabbitmqv1.RabbitMQVhostReadyErrorMessage,
149-
readinessErr.Reason))
150-
}
131+
// Check if cluster is available for operations
132+
if !rabbit.DeletionTimestamp.IsZero() {
133+
instance.Status.Conditions.Set(condition.FalseCondition(
134+
rabbitmqv1.RabbitMQVhostReadyCondition,
135+
condition.ErrorReason,
136+
condition.SeverityWarning,
137+
rabbitmqv1.RabbitMQVhostReadyErrorMessage,
138+
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
139+
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
140+
}
141+
if !rabbit.IsAvailable() {
142+
instance.Status.Conditions.Set(condition.FalseCondition(
143+
rabbitmqv1.RabbitMQVhostReadyCondition,
144+
condition.RequestedReason,
145+
condition.SeverityInfo,
146+
"RabbitMQ vhost waiting for cluster %s to have quorum",
147+
rabbit.Name))
148+
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
151149
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
152150
}
153151

internal/controller/rabbitmq/transporturl_controller.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -508,18 +508,8 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
508508
return ctrl.Result{}, err
509509
}
510510

511-
// Wait for RabbitMQ cluster to be ready
512-
if err := checkClusterReadiness(rabbit); err != nil {
513-
instance.Status.Conditions.Set(condition.FalseCondition(
514-
rabbitmqv1.TransportURLReadyCondition,
515-
condition.RequestedReason,
516-
condition.SeverityInfo,
517-
rabbitmqv1.TransportURLInProgressMessage))
518-
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
519-
}
520-
521-
// Get cluster admin secret for connection details
522-
if rabbit.Status.DefaultUser == nil {
511+
// Wait for RabbitMQ cluster to be available (quorum maintained)
512+
if !rabbit.DeletionTimestamp.IsZero() || !rabbit.IsAvailable() {
523513
instance.Status.Conditions.Set(condition.FalseCondition(
524514
rabbitmqv1.TransportURLReadyCondition,
525515
condition.RequestedReason,

0 commit comments

Comments
 (0)