@@ -114,9 +114,8 @@ const (
114114
115115// RabbitMQ version upgrade constants
116116const (
117- // DefaultRabbitMQVersion is the default RabbitMQ version when Spec.Version is not set
118- // New instances default to 4.0, but this constant is used for Status.CurrentVersion
119- // initialization to maintain backwards compatibility with existing instances
117+ // DefaultRabbitMQVersion is the default RabbitMQ version when the target-version annotation is not set
118+ // This constant is used for Status.CurrentVersion initialization to maintain backwards compatibility
120119 DefaultRabbitMQVersion = "4.0"
121120 // UpgradeCheckInterval is how often to check upgrade progress
122121 UpgradeCheckInterval = 2 * time .Second
@@ -193,14 +192,38 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
193192
194193 // Initialize RabbitMQ version in Status if it doesn't exist
195194 // Status is controller-managed and can't be directly modified by users
196- // We initialize it to match Spec.Version to avoid triggering an immediate upgrade
195+ // We initialize it to match the target-version annotation to avoid triggering an immediate upgrade
197196 if instance .Status .CurrentVersion == "" {
198- // Use Spec.Version if set (webhook will have defaulted it appropriately)
199- // Otherwise fall back to the constant for backwards compatibility
197+ // Priority order for determining initial version:
198+ // 1. Use annotation if set by openstack-operator
199+ // 2. Check if RabbitMQCluster exists (upgrade scenario - infer we're on 3.9 for backwards compat)
200+ // 3. Fall back to DefaultRabbitMQVersion for new deployments
200201 initialVersion := DefaultRabbitMQVersion
201- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
202- initialVersion = * instance .Spec .Version
202+
203+ if instance .Annotations != nil {
204+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
205+ initialVersion = targetVersion
206+ }
207+ } else {
208+ // No annotation - check if this is an existing cluster
209+ existingCluster := & rabbitmqv2.RabbitmqCluster {}
210+ clusterKey := types.NamespacedName {Name : instance .Name , Namespace : instance .Namespace }
211+ err := r .Get (ctx , clusterKey , existingCluster )
212+
213+ if err == nil && ! existingCluster .CreationTimestamp .IsZero () {
214+ // RabbitMQCluster exists but no annotation - this is an upgrade from old version
215+ // Assume 3.9 for backwards compatibility (old deployments default to 3.9)
216+ initialVersion = "3.9"
217+ Log .Info ("Existing RabbitMQCluster found without version annotation, defaulting to 3.9 for backwards compatibility" ,
218+ "cluster" , clusterKey ,
219+ "clusterCreated" , existingCluster .CreationTimestamp )
220+ } else if err != nil && ! k8s_errors .IsNotFound (err ) {
221+ // Error checking for cluster (not NotFound) - log but continue with default
222+ Log .Error (err , "Failed to check for existing RabbitMQCluster during version initialization" )
223+ }
224+ // If cluster doesn't exist (NotFound), use DefaultRabbitMQVersion (4.0) for new deployments
203225 }
226+
204227 instance .Status .CurrentVersion = initialVersion
205228 Log .Info ("Initialized RabbitMQ current version in status" , "version" , initialVersion )
206229 // Persist the status update
@@ -218,46 +241,47 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
218241
219242 // Check for version upgrade requiring wipe
220243 // Current version comes from Status (controller-managed)
221- // Target version comes from Spec (user-specified via openstack-operator)
244+ // Target version comes from annotation (set by openstack-operator)
222245 currentVersion := instance .Status .CurrentVersion
223- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
224- targetVersion := * instance .Spec .Version
225- needsWipe , err := requiresStorageWipe (currentVersion , targetVersion )
226- if err != nil {
227- Log .Error (err , "Failed to determine upgrade compatibility" ,
228- "currentVersion" , currentVersion ,
229- "targetVersion" , targetVersion )
230- return ctrl.Result {}, fmt .Errorf ("failed to check upgrade compatibility: %w" , err )
231- }
232- if needsWipe {
233- requiresWipe = true
234- wipeReason = "version upgrade"
235- Log .Info ("RabbitMQ upgrade requires storage wipe (no direct upgrade path)" ,
236- "currentVersion" , currentVersion ,
237- "targetVersion" , targetVersion )
238-
239- // Automatically migrate from Mirrored to Quorum when upgrading from 3.x to 4.x
240- // Mirrored queues are deprecated in RabbitMQ 4.0+
241- currentVer , err := rabbitmq .ParseRabbitMQVersion (currentVersion )
242- if err == nil {
243- targetVer , err := rabbitmq .ParseRabbitMQVersion (targetVersion )
244- if err == nil && currentVer .Major == 3 && targetVer .Major >= 4 {
245- // Upgrading from 3.x to 4.x - automatically enforce Quorum queues
246- if instance .Spec .QueueType == nil || * instance .Spec .QueueType != rabbitmqv1beta1 .QueueTypeQuorum {
247- Log .Info ("Upgrading from RabbitMQ 3.x to 4.x - automatically migrating to Quorum queues" ,
248- "currentVersion" , currentVersion ,
249- "targetVersion" , targetVersion )
250- queueType := rabbitmqv1beta1 .QueueTypeQuorum
251- instance .Spec .QueueType = & queueType
252-
253- // Patch the instance to persist the queue type change
254- if err := helper .PatchInstance (ctx , instance ); err != nil {
255- Log .Error (err , "Failed to patch instance with Quorum queue type" )
256- return ctrl.Result {}, fmt .Errorf ("failed to update queue type during upgrade: %w" , err )
246+ if instance .Annotations != nil {
247+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
248+ needsWipe , err := requiresStorageWipe (currentVersion , targetVersion )
249+ if err != nil {
250+ Log .Error (err , "Failed to determine upgrade compatibility" ,
251+ "currentVersion" , currentVersion ,
252+ "targetVersion" , targetVersion )
253+ return ctrl.Result {}, fmt .Errorf ("failed to check upgrade compatibility: %w" , err )
254+ }
255+ if needsWipe {
256+ requiresWipe = true
257+ wipeReason = "version upgrade"
258+ Log .Info ("RabbitMQ upgrade requires storage wipe (no direct upgrade path)" ,
259+ "currentVersion" , currentVersion ,
260+ "targetVersion" , targetVersion )
261+
262+ // Automatically migrate from Mirrored to Quorum when upgrading from 3.x to 4.x
263+ // Mirrored queues are deprecated in RabbitMQ 4.0+
264+ currentVer , err := rabbitmq .ParseRabbitMQVersion (currentVersion )
265+ if err == nil {
266+ targetVer , err := rabbitmq .ParseRabbitMQVersion (targetVersion )
267+ if err == nil && currentVer .Major == 3 && targetVer .Major >= 4 {
268+ // Upgrading from 3.x to 4.x - automatically enforce Quorum queues
269+ if instance .Spec .QueueType == nil || * instance .Spec .QueueType != rabbitmqv1beta1 .QueueTypeQuorum {
270+ Log .Info ("Upgrading from RabbitMQ 3.x to 4.x - automatically migrating to Quorum queues" ,
271+ "currentVersion" , currentVersion ,
272+ "targetVersion" , targetVersion )
273+ queueType := rabbitmqv1beta1 .QueueTypeQuorum
274+ instance .Spec .QueueType = & queueType
275+
276+ // Patch the instance to persist the queue type change
277+ if err := helper .PatchInstance (ctx , instance ); err != nil {
278+ Log .Error (err , "Failed to patch instance with Quorum queue type" )
279+ return ctrl.Result {}, fmt .Errorf ("failed to update queue type during upgrade: %w" , err )
280+ }
281+ Log .Info ("Updated spec.queueType to Quorum for RabbitMQ 4.x compatibility" )
282+ // Requeue to let the change propagate
283+ return ctrl.Result {Requeue : true }, nil
257284 }
258- Log .Info ("Updated spec.queueType to Quorum for RabbitMQ 4.x compatibility" )
259- // Requeue to let the change propagate
260- return ctrl.Result {Requeue : true }, nil
261285 }
262286 }
263287 }
@@ -538,10 +562,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
538562
539563 // Emit event for observability
540564 if r .Recorder != nil {
541- if wipeReason == "version upgrade" && instance .Spec .Version != nil && * instance .Spec .Version != "" {
542- r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "UpgradeStarted" ,
543- "Starting RabbitMQ upgrade from %s to %s (requires storage wipe)" ,
544- instance .Status .CurrentVersion , * instance .Spec .Version )
565+ if wipeReason == "version upgrade" && instance .Annotations != nil {
566+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
567+ r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "UpgradeStarted" ,
568+ "Starting RabbitMQ upgrade from %s to %s (requires storage wipe)" ,
569+ instance .Status .CurrentVersion , targetVersion )
570+ }
545571 } else {
546572 r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "MigrationStarted" ,
547573 "Starting queue type migration (requires storage wipe)" )
@@ -599,27 +625,29 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
599625
600626 // Storage wipe complete - update Status.CurrentVersion for version upgrades
601627 if wipeReason == "version upgrade" {
602- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
603- instance .Status .CurrentVersion = * instance .Spec .Version
604- // Clear the upgrade phase and timestamp
605- instance .Status .UpgradePhase = ""
606- instance .Status .StorageWipeStartedAt = nil
607-
608- // If queue type changed during upgrade, update Status.QueueType to prevent
609- // triggering another wipe for "queue type migration"
610- if instance .Spec .QueueType != nil && * instance .Spec .QueueType == rabbitmqv1beta1 .QueueTypeQuorum {
611- if instance .Status .QueueType == rabbitmqv1beta1 .QueueTypeMirrored {
612- instance .Status .QueueType = rabbitmqv1beta1 .QueueTypeQuorum
613- Log .Info ("Updated Status.QueueType during version upgrade" , "queueType" , "Quorum" )
628+ if instance .Annotations != nil {
629+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
630+ instance .Status .CurrentVersion = targetVersion
631+ // Clear the upgrade phase and timestamp
632+ instance .Status .UpgradePhase = ""
633+ instance .Status .StorageWipeStartedAt = nil
634+
635+ // If queue type changed during upgrade, update Status.QueueType to prevent
636+ // triggering another wipe for "queue type migration"
637+ if instance .Spec .QueueType != nil && * instance .Spec .QueueType == rabbitmqv1beta1 .QueueTypeQuorum {
638+ if instance .Status .QueueType == rabbitmqv1beta1 .QueueTypeMirrored {
639+ instance .Status .QueueType = rabbitmqv1beta1 .QueueTypeQuorum
640+ Log .Info ("Updated Status.QueueType during version upgrade" , "queueType" , "Quorum" )
641+ }
614642 }
615- }
616643
617- Log .Info ("Storage cleanup complete - updated current version in status" , "version" , * instance . Spec . Version )
644+ Log .Info ("Storage cleanup complete - updated current version in status" , "version" , targetVersion )
618645
619- // Emit event for observability
620- if r .Recorder != nil {
621- r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "StorageWipeComplete" ,
622- "Storage successfully wiped, recreating cluster with RabbitMQ %s" , * instance .Spec .Version )
646+ // Emit event for observability
647+ if r .Recorder != nil {
648+ r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "StorageWipeComplete" ,
649+ "Storage successfully wiped, recreating cluster with RabbitMQ %s" , targetVersion )
650+ }
623651 }
624652 }
625653 } else {
0 commit comments