diff --git a/apis/rabbitmq/v1beta1/conditions.go b/apis/rabbitmq/v1beta1/conditions.go index 6662c092..5cbf1a3b 100644 --- a/apis/rabbitmq/v1beta1/conditions.go +++ b/apis/rabbitmq/v1beta1/conditions.go @@ -25,6 +25,11 @@ const ( // Status=True means the proxy is active and must be cleared by setting the // clients-reconfigured annotation. Status=False means no proxy is running. RabbitMQProxyActiveCondition condition.Type = "RabbitMQProxyActive" + + // ClusterAvailableCondition indicates that the RabbitMQ cluster has quorum + // and can serve traffic. True when ReadyCount >= ceil(Replicas/2). + // It never blocks ReadyCondition since quorum is always met before all replicas are ready. + ClusterAvailableCondition condition.Type = "ClusterAvailable" ) // TransportURL Condition Types used by API objects. @@ -58,6 +63,16 @@ const ( // RabbitMQProxyInactiveMessage is the message when the proxy is not active RabbitMQProxyInactiveMessage = "AMQP proxy sidecar is not active" + // + // ClusterAvailable condition messages + // + + // ClusterAvailableMessage + ClusterAvailableMessage = "RabbitMQ cluster has quorum and can serve traffic" + + // ClusterNotAvailableMessage + ClusterNotAvailableMessage = "RabbitMQ cluster does not have quorum (%d/%d replicas ready, need %d)" + // // TransportURLReady condition messages // diff --git a/apis/rabbitmq/v1beta1/rabbitmq_types.go b/apis/rabbitmq/v1beta1/rabbitmq_types.go index 2d8e7055..fe85ebae 100644 --- a/apis/rabbitmq/v1beta1/rabbitmq_types.go +++ b/apis/rabbitmq/v1beta1/rabbitmq_types.go @@ -394,6 +394,14 @@ func (instance RabbitMq) IsReady() bool { return instance.Status.Conditions.IsTrue(condition.ReadyCondition) } +// IsAvailable - returns true if the cluster has quorum and can serve traffic. +// Unlike IsReady (which requires all replicas), this is true when +// ReadyCount >= ceil(Replicas/2), allowing TransportURLs to remain +// available during rolling restarts. +func (instance RabbitMq) IsAvailable() bool { + return instance.Status.Conditions.IsTrue(ClusterAvailableCondition) +} + // RbacConditionsSet - set the conditions for the rbac object func (instance RabbitMq) RbacConditionsSet(c *condition.Condition) { instance.Status.Conditions.Set(c) diff --git a/internal/controller/rabbitmq/helpers.go b/internal/controller/rabbitmq/helpers.go index 3756b2ca..12993aa1 100644 --- a/internal/controller/rabbitmq/helpers.go +++ b/internal/controller/rabbitmq/helpers.go @@ -67,47 +67,3 @@ func getTLSCACert(ctx context.Context, h *helper.Helper, rabbit *rabbitmqv1.Rabb return caCert, nil } - -// ClusterReadinessError represents different types of cluster readiness failures -type ClusterReadinessError struct { - ClusterName string - Reason string - IsWaiting bool // true if cluster is starting up, false if it's being deleted -} - -func (e *ClusterReadinessError) Error() string { - return e.Reason -} - -// checkClusterReadiness validates that a RabbitMQ cluster is ready for operations -func checkClusterReadiness(rabbit *rabbitmqv1.RabbitMq) *ClusterReadinessError { - if !rabbit.DeletionTimestamp.IsZero() { - return &ClusterReadinessError{ - ClusterName: rabbit.Name, - Reason: fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name), - IsWaiting: false, - } - } - - // Check if cluster is ready by checking ReadyCount - if rabbit.Status.ReadyCount == 0 { - return &ClusterReadinessError{ - ClusterName: rabbit.Name, - Reason: fmt.Sprintf("RabbitMQ cluster %s is not ready yet", rabbit.Name), - IsWaiting: true, - } - } - - // DefaultUser status is populated by the RabbitMQ controller after the - // admin secret is created. There is a brief window where ReadyCount > 0 - // but DefaultUser has not been set yet. - if rabbit.Status.DefaultUser == nil { - return &ClusterReadinessError{ - ClusterName: rabbit.Name, - Reason: fmt.Sprintf("RabbitMQ cluster %s admin credentials not yet available", rabbit.Name), - IsWaiting: true, - } - } - - return nil -} diff --git a/internal/controller/rabbitmq/rabbitmq_controller.go b/internal/controller/rabbitmq/rabbitmq_controller.go index 747440ae..2af25bd6 100644 --- a/internal/controller/rabbitmq/rabbitmq_controller.go +++ b/internal/controller/rabbitmq/rabbitmq_controller.go @@ -868,6 +868,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct instance.Status.ReadyCount = sts.Status.ReadyReplicas clusterReady := common_statefulset.IsReady(*sts) + // ClusterAvailable indicates the cluster has quorum and can serve traffic. + // It is a sub-condition of ReadyCondition but never blocks it: quorum + // (ceil(Replicas/2)) is always satisfied before all replicas are ready. + replicas := ptr.Deref(instance.Spec.Replicas, 1) + quorum := replicas/2 + 1 + if instance.Status.ReadyCount >= quorum && instance.Status.DefaultUser != nil { + instance.Status.Conditions.Set(condition.TrueCondition( + rabbitmqv1beta1.ClusterAvailableCondition, + rabbitmqv1beta1.ClusterAvailableMessage)) + } else { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1beta1.ClusterAvailableCondition, + condition.RequestedReason, + condition.SeverityInfo, + rabbitmqv1beta1.ClusterNotAvailableMessage, + instance.Status.ReadyCount, replicas, quorum)) + } + if clusterReady { instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) diff --git a/internal/controller/rabbitmq/rabbitmqpolicy_controller.go b/internal/controller/rabbitmq/rabbitmqpolicy_controller.go index 66355432..ca46f340 100644 --- a/internal/controller/rabbitmq/rabbitmqpolicy_controller.go +++ b/internal/controller/rabbitmq/rabbitmqpolicy_controller.go @@ -136,26 +136,24 @@ func (r *RabbitMQPolicyReconciler) reconcileNormal(ctx context.Context, instance return ctrl.Result{}, err } - // Check if cluster is ready for operations - if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil { - if readinessErr.IsWaiting { - // Cluster is starting up - set waiting condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQPolicyReadyCondition, - condition.RequestedReason, - condition.SeverityInfo, - "RabbitMQ policy waiting for dependencies %s", - readinessErr.Reason)) - log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName) - } else { - // Cluster is being deleted - set error condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQPolicyReadyCondition, - condition.ErrorReason, - condition.SeverityWarning, - rabbitmqv1.RabbitMQPolicyReadyErrorMessage, - readinessErr.Reason)) - } + // Check if cluster is available for operations + if !rabbit.DeletionTimestamp.IsZero() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQPolicyReadyCondition, + condition.ErrorReason, + condition.SeverityWarning, + rabbitmqv1.RabbitMQPolicyReadyErrorMessage, + fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name))) + return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil + } + if !rabbit.IsAvailable() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQPolicyReadyCondition, + condition.RequestedReason, + condition.SeverityInfo, + "RabbitMQ policy waiting for cluster %s to have quorum", + rabbit.Name)) + log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName) return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil } diff --git a/internal/controller/rabbitmq/rabbitmquser_controller.go b/internal/controller/rabbitmq/rabbitmquser_controller.go index 323c27d0..3815e77a 100644 --- a/internal/controller/rabbitmq/rabbitmquser_controller.go +++ b/internal/controller/rabbitmq/rabbitmquser_controller.go @@ -234,26 +234,24 @@ func (r *RabbitMQUserReconciler) reconcileNormal(ctx context.Context, instance * return ctrl.Result{}, err } - // Check if cluster is ready for operations - if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil { - if readinessErr.IsWaiting { - // Cluster is starting up - set waiting condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQUserReadyCondition, - condition.RequestedReason, - condition.SeverityInfo, - rabbitmqv1.RabbitMQUserReadyWaitingMessage, - readinessErr.Reason)) - Log.Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName) - } else { - // Cluster is being deleted - set error condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQUserReadyCondition, - condition.ErrorReason, - condition.SeverityWarning, - rabbitmqv1.RabbitMQUserReadyErrorMessage, - readinessErr.Reason)) - } + // Check if cluster is available for operations + if !rabbit.DeletionTimestamp.IsZero() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQUserReadyCondition, + condition.ErrorReason, + condition.SeverityWarning, + rabbitmqv1.RabbitMQUserReadyErrorMessage, + fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name))) + return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil + } + if !rabbit.IsAvailable() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQUserReadyCondition, + condition.RequestedReason, + condition.SeverityInfo, + rabbitmqv1.RabbitMQUserReadyWaitingMessage, + fmt.Sprintf("cluster %s does not have quorum", rabbit.Name))) + Log.Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName) return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil } diff --git a/internal/controller/rabbitmq/rabbitmqvhost_controller.go b/internal/controller/rabbitmq/rabbitmqvhost_controller.go index 5f6cbeb9..9d7a6b15 100644 --- a/internal/controller/rabbitmq/rabbitmqvhost_controller.go +++ b/internal/controller/rabbitmq/rabbitmqvhost_controller.go @@ -128,26 +128,24 @@ func (r *RabbitMQVhostReconciler) reconcileNormal(ctx context.Context, instance return ctrl.Result{}, err } - // Check if cluster is ready for operations - if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil { - if readinessErr.IsWaiting { - // Cluster is starting up - set waiting condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQVhostReadyCondition, - condition.RequestedReason, - condition.SeverityInfo, - "RabbitMQ vhost waiting for dependencies %s", - readinessErr.Reason)) - log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName) - } else { - // Cluster is being deleted - set error condition - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.RabbitMQVhostReadyCondition, - condition.ErrorReason, - condition.SeverityWarning, - rabbitmqv1.RabbitMQVhostReadyErrorMessage, - readinessErr.Reason)) - } + // Check if cluster is available for operations + if !rabbit.DeletionTimestamp.IsZero() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQVhostReadyCondition, + condition.ErrorReason, + condition.SeverityWarning, + rabbitmqv1.RabbitMQVhostReadyErrorMessage, + fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name))) + return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil + } + if !rabbit.IsAvailable() { + instance.Status.Conditions.Set(condition.FalseCondition( + rabbitmqv1.RabbitMQVhostReadyCondition, + condition.RequestedReason, + condition.SeverityInfo, + "RabbitMQ vhost waiting for cluster %s to have quorum", + rabbit.Name)) + log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName) return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil } diff --git a/internal/controller/rabbitmq/transporturl_controller.go b/internal/controller/rabbitmq/transporturl_controller.go index 2b5fd1bd..b84a7ac2 100644 --- a/internal/controller/rabbitmq/transporturl_controller.go +++ b/internal/controller/rabbitmq/transporturl_controller.go @@ -508,18 +508,8 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance * return ctrl.Result{}, err } - // Wait for RabbitMQ cluster to be ready - if err := checkClusterReadiness(rabbit); err != nil { - instance.Status.Conditions.Set(condition.FalseCondition( - rabbitmqv1.TransportURLReadyCondition, - condition.RequestedReason, - condition.SeverityInfo, - rabbitmqv1.TransportURLInProgressMessage)) - return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil - } - - // Get cluster admin secret for connection details - if rabbit.Status.DefaultUser == nil { + // Wait for RabbitMQ cluster to be available (quorum maintained) + if !rabbit.DeletionTimestamp.IsZero() || !rabbit.IsAvailable() { instance.Status.Conditions.Set(condition.FalseCondition( rabbitmqv1.TransportURLReadyCondition, condition.RequestedReason,