@@ -114,9 +114,8 @@ const (
114114
115115// RabbitMQ version upgrade constants
116116const (
117- // DefaultRabbitMQVersion is the default RabbitMQ version when Spec.Version is not set
118- // New instances default to 4.0, but this constant is used for Status.CurrentVersion
119- // initialization to maintain backwards compatibility with existing instances
117+ // DefaultRabbitMQVersion is the default RabbitMQ version when the target-version annotation is not set
118+ // This constant is used for Status.CurrentVersion initialization to maintain backwards compatibility
120119 DefaultRabbitMQVersion = "4.0"
121120 // UpgradeCheckInterval is how often to check upgrade progress
122121 UpgradeCheckInterval = 2 * time .Second
@@ -193,13 +192,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
193192
194193 // Initialize RabbitMQ version in Status if it doesn't exist
195194 // Status is controller-managed and can't be directly modified by users
196- // We initialize it to match Spec.Version to avoid triggering an immediate upgrade
195+ // We initialize it to match the target-version annotation to avoid triggering an immediate upgrade
197196 if instance .Status .CurrentVersion == "" {
198- // Use Spec.Version if set (webhook will have defaulted it appropriately)
197+ // Use annotation if set by openstack-operator
199198 // Otherwise fall back to the constant for backwards compatibility
200199 initialVersion := DefaultRabbitMQVersion
201- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
202- initialVersion = * instance .Spec .Version
200+ if instance .Annotations != nil {
201+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
202+ initialVersion = targetVersion
203+ }
203204 }
204205 instance .Status .CurrentVersion = initialVersion
205206 Log .Info ("Initialized RabbitMQ current version in status" , "version" , initialVersion )
@@ -218,46 +219,47 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
218219
219220 // Check for version upgrade requiring wipe
220221 // Current version comes from Status (controller-managed)
221- // Target version comes from Spec (user-specified via openstack-operator)
222+ // Target version comes from annotation (set by openstack-operator)
222223 currentVersion := instance .Status .CurrentVersion
223- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
224- targetVersion := * instance .Spec .Version
225- needsWipe , err := requiresStorageWipe (currentVersion , targetVersion )
226- if err != nil {
227- Log .Error (err , "Failed to determine upgrade compatibility" ,
228- "currentVersion" , currentVersion ,
229- "targetVersion" , targetVersion )
230- return ctrl.Result {}, fmt .Errorf ("failed to check upgrade compatibility: %w" , err )
231- }
232- if needsWipe {
233- requiresWipe = true
234- wipeReason = "version upgrade"
235- Log .Info ("RabbitMQ upgrade requires storage wipe (no direct upgrade path)" ,
236- "currentVersion" , currentVersion ,
237- "targetVersion" , targetVersion )
238-
239- // Automatically migrate from Mirrored to Quorum when upgrading from 3.x to 4.x
240- // Mirrored queues are deprecated in RabbitMQ 4.0+
241- currentVer , err := rabbitmq .ParseRabbitMQVersion (currentVersion )
242- if err == nil {
243- targetVer , err := rabbitmq .ParseRabbitMQVersion (targetVersion )
244- if err == nil && currentVer .Major == 3 && targetVer .Major >= 4 {
245- // Upgrading from 3.x to 4.x - automatically enforce Quorum queues
246- if instance .Spec .QueueType == nil || * instance .Spec .QueueType != rabbitmqv1beta1 .QueueTypeQuorum {
247- Log .Info ("Upgrading from RabbitMQ 3.x to 4.x - automatically migrating to Quorum queues" ,
248- "currentVersion" , currentVersion ,
249- "targetVersion" , targetVersion )
250- queueType := rabbitmqv1beta1 .QueueTypeQuorum
251- instance .Spec .QueueType = & queueType
252-
253- // Patch the instance to persist the queue type change
254- if err := helper .PatchInstance (ctx , instance ); err != nil {
255- Log .Error (err , "Failed to patch instance with Quorum queue type" )
256- return ctrl.Result {}, fmt .Errorf ("failed to update queue type during upgrade: %w" , err )
224+ if instance .Annotations != nil {
225+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
226+ needsWipe , err := requiresStorageWipe (currentVersion , targetVersion )
227+ if err != nil {
228+ Log .Error (err , "Failed to determine upgrade compatibility" ,
229+ "currentVersion" , currentVersion ,
230+ "targetVersion" , targetVersion )
231+ return ctrl.Result {}, fmt .Errorf ("failed to check upgrade compatibility: %w" , err )
232+ }
233+ if needsWipe {
234+ requiresWipe = true
235+ wipeReason = "version upgrade"
236+ Log .Info ("RabbitMQ upgrade requires storage wipe (no direct upgrade path)" ,
237+ "currentVersion" , currentVersion ,
238+ "targetVersion" , targetVersion )
239+
240+ // Automatically migrate from Mirrored to Quorum when upgrading from 3.x to 4.x
241+ // Mirrored queues are deprecated in RabbitMQ 4.0+
242+ currentVer , err := rabbitmq .ParseRabbitMQVersion (currentVersion )
243+ if err == nil {
244+ targetVer , err := rabbitmq .ParseRabbitMQVersion (targetVersion )
245+ if err == nil && currentVer .Major == 3 && targetVer .Major >= 4 {
246+ // Upgrading from 3.x to 4.x - automatically enforce Quorum queues
247+ if instance .Spec .QueueType == nil || * instance .Spec .QueueType != rabbitmqv1beta1 .QueueTypeQuorum {
248+ Log .Info ("Upgrading from RabbitMQ 3.x to 4.x - automatically migrating to Quorum queues" ,
249+ "currentVersion" , currentVersion ,
250+ "targetVersion" , targetVersion )
251+ queueType := rabbitmqv1beta1 .QueueTypeQuorum
252+ instance .Spec .QueueType = & queueType
253+
254+ // Patch the instance to persist the queue type change
255+ if err := helper .PatchInstance (ctx , instance ); err != nil {
256+ Log .Error (err , "Failed to patch instance with Quorum queue type" )
257+ return ctrl.Result {}, fmt .Errorf ("failed to update queue type during upgrade: %w" , err )
258+ }
259+ Log .Info ("Updated spec.queueType to Quorum for RabbitMQ 4.x compatibility" )
260+ // Requeue to let the change propagate
261+ return ctrl.Result {Requeue : true }, nil
257262 }
258- Log .Info ("Updated spec.queueType to Quorum for RabbitMQ 4.x compatibility" )
259- // Requeue to let the change propagate
260- return ctrl.Result {Requeue : true }, nil
261263 }
262264 }
263265 }
@@ -538,10 +540,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
538540
539541 // Emit event for observability
540542 if r .Recorder != nil {
541- if wipeReason == "version upgrade" && instance .Spec .Version != nil && * instance .Spec .Version != "" {
542- r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "UpgradeStarted" ,
543- "Starting RabbitMQ upgrade from %s to %s (requires storage wipe)" ,
544- instance .Status .CurrentVersion , * instance .Spec .Version )
543+ if wipeReason == "version upgrade" && instance .Annotations != nil {
544+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
545+ r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "UpgradeStarted" ,
546+ "Starting RabbitMQ upgrade from %s to %s (requires storage wipe)" ,
547+ instance .Status .CurrentVersion , targetVersion )
548+ }
545549 } else {
546550 r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "MigrationStarted" ,
547551 "Starting queue type migration (requires storage wipe)" )
@@ -599,27 +603,29 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
599603
600604 // Storage wipe complete - update Status.CurrentVersion for version upgrades
601605 if wipeReason == "version upgrade" {
602- if instance .Spec .Version != nil && * instance .Spec .Version != "" {
603- instance .Status .CurrentVersion = * instance .Spec .Version
604- // Clear the upgrade phase and timestamp
605- instance .Status .UpgradePhase = ""
606- instance .Status .StorageWipeStartedAt = nil
607-
608- // If queue type changed during upgrade, update Status.QueueType to prevent
609- // triggering another wipe for "queue type migration"
610- if instance .Spec .QueueType != nil && * instance .Spec .QueueType == rabbitmqv1beta1 .QueueTypeQuorum {
611- if instance .Status .QueueType == rabbitmqv1beta1 .QueueTypeMirrored {
612- instance .Status .QueueType = rabbitmqv1beta1 .QueueTypeQuorum
613- Log .Info ("Updated Status.QueueType during version upgrade" , "queueType" , "Quorum" )
606+ if instance .Annotations != nil {
607+ if targetVersion , hasTarget := instance .Annotations [rabbitmqv1beta1 .AnnotationTargetVersion ]; hasTarget && targetVersion != "" {
608+ instance .Status .CurrentVersion = targetVersion
609+ // Clear the upgrade phase and timestamp
610+ instance .Status .UpgradePhase = ""
611+ instance .Status .StorageWipeStartedAt = nil
612+
613+ // If queue type changed during upgrade, update Status.QueueType to prevent
614+ // triggering another wipe for "queue type migration"
615+ if instance .Spec .QueueType != nil && * instance .Spec .QueueType == rabbitmqv1beta1 .QueueTypeQuorum {
616+ if instance .Status .QueueType == rabbitmqv1beta1 .QueueTypeMirrored {
617+ instance .Status .QueueType = rabbitmqv1beta1 .QueueTypeQuorum
618+ Log .Info ("Updated Status.QueueType during version upgrade" , "queueType" , "Quorum" )
619+ }
614620 }
615- }
616621
617- Log .Info ("Storage cleanup complete - updated current version in status" , "version" , * instance . Spec . Version )
622+ Log .Info ("Storage cleanup complete - updated current version in status" , "version" , targetVersion )
618623
619- // Emit event for observability
620- if r .Recorder != nil {
621- r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "StorageWipeComplete" ,
622- "Storage successfully wiped, recreating cluster with RabbitMQ %s" , * instance .Spec .Version )
624+ // Emit event for observability
625+ if r .Recorder != nil {
626+ r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "StorageWipeComplete" ,
627+ "Storage successfully wiped, recreating cluster with RabbitMQ %s" , targetVersion )
628+ }
623629 }
624630 }
625631 } else {
0 commit comments