Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions apis/rabbitmq/v1beta1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
// Status=True means the proxy is active and must be cleared by setting the
// clients-reconfigured annotation. Status=False means no proxy is running.
RabbitMQProxyActiveCondition condition.Type = "RabbitMQProxyActive"

// ClusterAvailableCondition indicates that the RabbitMQ cluster has quorum
// and can serve traffic. True when ReadyCount >= ceil(Replicas/2).
// It never blocks ReadyCondition since quorum is always met before all replicas are ready.
ClusterAvailableCondition condition.Type = "ClusterAvailable"
)

// TransportURL Condition Types used by API objects.
Expand Down Expand Up @@ -58,6 +63,16 @@ const (
// RabbitMQProxyInactiveMessage is the message when the proxy is not active
RabbitMQProxyInactiveMessage = "AMQP proxy sidecar is not active"

//
// ClusterAvailable condition messages
//

// ClusterAvailableMessage
ClusterAvailableMessage = "RabbitMQ cluster has quorum and can serve traffic"

// ClusterNotAvailableMessage
ClusterNotAvailableMessage = "RabbitMQ cluster does not have quorum (%d/%d replicas ready, need %d)"

//
// TransportURLReady condition messages
//
Expand Down
8 changes: 8 additions & 0 deletions apis/rabbitmq/v1beta1/rabbitmq_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ func (instance RabbitMq) IsReady() bool {
return instance.Status.Conditions.IsTrue(condition.ReadyCondition)
}

// IsAvailable - returns true if the cluster has quorum and can serve traffic.
// Unlike IsReady (which requires all replicas), this is true when
// ReadyCount >= ceil(Replicas/2), allowing TransportURLs to remain
// available during rolling restarts.
func (instance RabbitMq) IsAvailable() bool {
return instance.Status.Conditions.IsTrue(ClusterAvailableCondition)
}

// RbacConditionsSet - set the conditions for the rbac object
func (instance RabbitMq) RbacConditionsSet(c *condition.Condition) {
instance.Status.Conditions.Set(c)
Expand Down
44 changes: 0 additions & 44 deletions internal/controller/rabbitmq/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,47 +67,3 @@ func getTLSCACert(ctx context.Context, h *helper.Helper, rabbit *rabbitmqv1.Rabb

return caCert, nil
}

// ClusterReadinessError represents different types of cluster readiness failures
type ClusterReadinessError struct {
ClusterName string
Reason string
IsWaiting bool // true if cluster is starting up, false if it's being deleted
}

func (e *ClusterReadinessError) Error() string {
return e.Reason
}

// checkClusterReadiness validates that a RabbitMQ cluster is ready for operations
func checkClusterReadiness(rabbit *rabbitmqv1.RabbitMq) *ClusterReadinessError {
if !rabbit.DeletionTimestamp.IsZero() {
return &ClusterReadinessError{
ClusterName: rabbit.Name,
Reason: fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name),
IsWaiting: false,
}
}

// Check if cluster is ready by checking ReadyCount
if rabbit.Status.ReadyCount == 0 {
return &ClusterReadinessError{
ClusterName: rabbit.Name,
Reason: fmt.Sprintf("RabbitMQ cluster %s is not ready yet", rabbit.Name),
IsWaiting: true,
}
}

// DefaultUser status is populated by the RabbitMQ controller after the
// admin secret is created. There is a brief window where ReadyCount > 0
// but DefaultUser has not been set yet.
if rabbit.Status.DefaultUser == nil {
return &ClusterReadinessError{
ClusterName: rabbit.Name,
Reason: fmt.Sprintf("RabbitMQ cluster %s admin credentials not yet available", rabbit.Name),
IsWaiting: true,
}
}

return nil
}
18 changes: 18 additions & 0 deletions internal/controller/rabbitmq/rabbitmq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
instance.Status.ReadyCount = sts.Status.ReadyReplicas
clusterReady := common_statefulset.IsReady(*sts)

// ClusterAvailable indicates the cluster has quorum and can serve traffic.
// It is a sub-condition of ReadyCondition but never blocks it: quorum
// (ceil(Replicas/2)) is always satisfied before all replicas are ready.
replicas := ptr.Deref(instance.Spec.Replicas, 1)
quorum := replicas/2 + 1
if instance.Status.ReadyCount >= quorum && instance.Status.DefaultUser != nil {
instance.Status.Conditions.Set(condition.TrueCondition(
rabbitmqv1beta1.ClusterAvailableCondition,
rabbitmqv1beta1.ClusterAvailableMessage))
} else {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.ClusterAvailableCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1beta1.ClusterNotAvailableMessage,
instance.Status.ReadyCount, replicas, quorum))
}

if clusterReady {
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)

Expand Down
38 changes: 18 additions & 20 deletions internal/controller/rabbitmq/rabbitmqpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,24 @@ func (r *RabbitMQPolicyReconciler) reconcileNormal(ctx context.Context, instance
return ctrl.Result{}, err
}

// Check if cluster is ready for operations
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
if readinessErr.IsWaiting {
// Cluster is starting up - set waiting condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQPolicyReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
"RabbitMQ policy waiting for dependencies %s",
readinessErr.Reason))
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
} else {
// Cluster is being deleted - set error condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQPolicyReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQPolicyReadyErrorMessage,
readinessErr.Reason))
}
// Check if cluster is available for operations
if !rabbit.DeletionTimestamp.IsZero() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQPolicyReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQPolicyReadyErrorMessage,
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}
if !rabbit.IsAvailable() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQPolicyReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
"RabbitMQ policy waiting for cluster %s to have quorum",
rabbit.Name))
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}

Expand Down
38 changes: 18 additions & 20 deletions internal/controller/rabbitmq/rabbitmquser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,26 +234,24 @@ func (r *RabbitMQUserReconciler) reconcileNormal(ctx context.Context, instance *
return ctrl.Result{}, err
}

// Check if cluster is ready for operations
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
if readinessErr.IsWaiting {
// Cluster is starting up - set waiting condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQUserReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1.RabbitMQUserReadyWaitingMessage,
readinessErr.Reason))
Log.Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
} else {
// Cluster is being deleted - set error condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQUserReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQUserReadyErrorMessage,
readinessErr.Reason))
}
// Check if cluster is available for operations
if !rabbit.DeletionTimestamp.IsZero() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQUserReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQUserReadyErrorMessage,
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}
if !rabbit.IsAvailable() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQUserReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1.RabbitMQUserReadyWaitingMessage,
fmt.Sprintf("cluster %s does not have quorum", rabbit.Name)))
Log.Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}

Expand Down
38 changes: 18 additions & 20 deletions internal/controller/rabbitmq/rabbitmqvhost_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,26 +128,24 @@ func (r *RabbitMQVhostReconciler) reconcileNormal(ctx context.Context, instance
return ctrl.Result{}, err
}

// Check if cluster is ready for operations
if readinessErr := checkClusterReadiness(rabbit); readinessErr != nil {
if readinessErr.IsWaiting {
// Cluster is starting up - set waiting condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQVhostReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
"RabbitMQ vhost waiting for dependencies %s",
readinessErr.Reason))
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be ready", "cluster", instance.Spec.RabbitmqClusterName)
} else {
// Cluster is being deleted - set error condition
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQVhostReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQVhostReadyErrorMessage,
readinessErr.Reason))
}
// Check if cluster is available for operations
if !rabbit.DeletionTimestamp.IsZero() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQVhostReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1.RabbitMQVhostReadyErrorMessage,
fmt.Sprintf("RabbitMQ cluster %s is being deleted", rabbit.Name)))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}
if !rabbit.IsAvailable() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.RabbitMQVhostReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
"RabbitMQ vhost waiting for cluster %s to have quorum",
rabbit.Name))
log.FromContext(ctx).Info("Waiting for RabbitMQ cluster to be available", "cluster", instance.Spec.RabbitmqClusterName)
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}

Expand Down
14 changes: 2 additions & 12 deletions internal/controller/rabbitmq/transporturl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,18 +508,8 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
return ctrl.Result{}, err
}

// Wait for RabbitMQ cluster to be ready
if err := checkClusterReadiness(rabbit); err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.TransportURLReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1.TransportURLInProgressMessage))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}

// Get cluster admin secret for connection details
if rabbit.Status.DefaultUser == nil {
// Wait for RabbitMQ cluster to be available (quorum maintained)
if !rabbit.DeletionTimestamp.IsZero() || !rabbit.IsAvailable() {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1.TransportURLReadyCondition,
condition.RequestedReason,
Expand Down
Loading