diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/clustervirtualimage_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/clustervirtualimage_watcher.go new file mode 100644 index 0000000000..a11bf2f559 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/clustervirtualimage_watcher.go @@ -0,0 +1,58 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func NewClusterVirtualImageWatcher() *CLusterVirtualImageWatcher { + return &CLusterVirtualImageWatcher{} +} + +type CLusterVirtualImageWatcher struct{} + +func (w *CLusterVirtualImageWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv2.ClusterVirtualImage{}), + handler.EnqueueRequestsFromMapFunc(enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.ClusterImageDevice)), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldCvi, oldOk := e.ObjectOld.(*virtv2.ClusterVirtualImage) + newCvi, newOk := e.ObjectNew.(*virtv2.ClusterVirtualImage) + if !oldOk || !newOk { + return false + } + return oldCvi.Status.Phase != newCvi.Status.Phase + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on ClusterVirtualImage: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvm_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvm_watcher.go new file mode 100644 index 0000000000..1e61a611b7 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvm_watcher.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "fmt" + + virtv1 "kubevirt.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func NewKVVMWatcher() *KVVMWatcher { + return &KVVMWatcher{} +} + +type KVVMWatcher struct{} + +func (w *KVVMWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv1.VirtualMachine{}), + handler.EnqueueRequestForOwner( + mgr.GetScheme(), + mgr.GetRESTMapper(), + &virtv2.VirtualMachine{}, + handler.OnlyControllerOwner(), + ), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldVM := e.ObjectOld.(*virtv1.VirtualMachine) + newVM := e.ObjectNew.(*virtv1.VirtualMachine) + return oldVM.Status.PrintableStatus != newVM.Status.PrintableStatus || + oldVM.Status.Ready != newVM.Status.Ready || + oldVM.Annotations[annotations.AnnVmStartRequested] != newVM.Annotations[annotations.AnnVmStartRequested] || + oldVM.Annotations[annotations.AnnVmRestartRequested] != newVM.Annotations[annotations.AnnVmRestartRequested] + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on VirtualMachine: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvmi_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvmi_watcher.go new file mode 100644 index 0000000000..d6e637f45d --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/kvvmi_watcher.go @@ -0,0 +1,69 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "context" + "fmt" + "reflect" + + "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func NewKVVMIWatcher() *KVVMIWatcher { + return &KVVMIWatcher{} +} + +type KVVMIWatcher struct{} + +func (w *KVVMIWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + // Subscribe on Kubevirt VirtualMachineInstances to update our VM status. + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv1.VirtualMachineInstance{}), + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, vmi client.Object) []reconcile.Request { + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: vmi.GetName(), + Namespace: vmi.GetNamespace(), + }, + }, + } + }), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldVM := e.ObjectOld.(*virtv1.VirtualMachineInstance) + newVM := e.ObjectNew.(*virtv1.VirtualMachineInstance) + return !reflect.DeepEqual(oldVM.Status, newVM.Status) + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on VirtualMachine: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/pod_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/pod_watcher.go new file mode 100644 index 0000000000..071413d009 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/pod_watcher.go @@ -0,0 +1,74 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func NewPodWatcher() *PodWatcher { + return &PodWatcher{} +} + +type PodWatcher struct{} + +func (w *PodWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + // Watch for Pods created on behalf of VMs. Handle only changes in status.phase. + // Pod tracking is required to detect when Pod becomes Completed after guest initiated reset or shutdown. + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &corev1.Pod{}), + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, pod client.Object) []reconcile.Request { + vmName, hasLabel := pod.GetLabels()["vm.kubevirt.io/name"] + if !hasLabel { + return nil + } + + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: vmName, + Namespace: pod.GetNamespace(), + }, + }, + } + }), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldPod := e.ObjectOld.(*corev1.Pod) + newPod := e.ObjectNew.(*corev1.Pod) + return oldPod.Status.Phase != newPod.Status.Phase + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on Pod: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualdisk_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualdisk_watcher.go new file mode 100644 index 0000000000..9ec057ecb4 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualdisk_watcher.go @@ -0,0 +1,68 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" + "github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition" +) + +func NewVirtualDiskWatcher() *VirtualDiskWatcher { + return &VirtualDiskWatcher{} +} + +type VirtualDiskWatcher struct{} + +func (w *VirtualDiskWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv2.VirtualDisk{}), + handler.EnqueueRequestsFromMapFunc(enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.DiskDevice)), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldVd, oldOk := e.ObjectOld.(*virtv2.VirtualDisk) + newVd, newOk := e.ObjectNew.(*virtv2.VirtualDisk) + if !oldOk || !newOk { + return false + } + + oldInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, oldVd.Status.Conditions) + newInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, newVd.Status.Conditions) + + if oldVd.Status.Phase != newVd.Status.Phase || oldInUseCondition != newInUseCondition { + return true + } + + return false + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on VirtualDisk: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualimage_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualimage_watcher.go new file mode 100644 index 0000000000..73b24785f2 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/virtualimage_watcher.go @@ -0,0 +1,110 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func NewVirtualImageWatcher() *VirtualImageWatcher { + return &VirtualImageWatcher{} +} + +type VirtualImageWatcher struct{} + +func (w *VirtualImageWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv2.VirtualImage{}), + handler.EnqueueRequestsFromMapFunc(enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.ImageDevice)), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldVi, oldOk := e.ObjectOld.(*virtv2.VirtualImage) + newVi, newOk := e.ObjectNew.(*virtv2.VirtualImage) + if !oldOk || !newOk { + return false + } + return oldVi.Status.Phase != newVi.Status.Phase + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on VirtualImage: %w", err) + } + return nil +} + +func enqueueRequestsBlockDevice(cl client.Client, kind virtv2.BlockDeviceKind) func(ctx context.Context, obj client.Object) []reconcile.Request { + return func(ctx context.Context, obj client.Object) []reconcile.Request { + var opts []client.ListOption + switch kind { + case virtv2.ImageDevice: + if _, ok := obj.(*virtv2.VirtualImage); !ok { + return nil + } + opts = append(opts, + client.InNamespace(obj.GetNamespace()), + client.MatchingFields{indexer.IndexFieldVMByVI: obj.GetName()}, + ) + case virtv2.ClusterImageDevice: + if _, ok := obj.(*virtv2.ClusterVirtualImage); !ok { + return nil + } + opts = append(opts, + client.MatchingFields{indexer.IndexFieldVMByCVI: obj.GetName()}, + ) + case virtv2.DiskDevice: + if _, ok := obj.(*virtv2.VirtualDisk); !ok { + return nil + } + opts = append(opts, + client.InNamespace(obj.GetNamespace()), + client.MatchingFields{indexer.IndexFieldVMByVD: obj.GetName()}, + ) + default: + return nil + } + var vms virtv2.VirtualMachineList + if err := cl.List(ctx, &vms, opts...); err != nil { + return nil + } + var result []reconcile.Request + for _, vm := range vms.Items { + result = append(result, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: vm.GetName(), + Namespace: vm.GetNamespace(), + }, + }) + } + return result + } +} diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/watcher/vmip_watcher.go b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/vmip_watcher.go new file mode 100644 index 0000000000..665442ca0d --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/watcher/vmip_watcher.go @@ -0,0 +1,77 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func NewVMIPWatcher() *VMIPWatcher { + return &VMIPWatcher{} +} + +type VMIPWatcher struct{} + +func (w *VMIPWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + if err := ctr.Watch( + source.Kind(mgr.GetCache(), &virtv2.VirtualMachineIPAddress{}), + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { + vmip, ok := obj.(*virtv2.VirtualMachineIPAddress) + if !ok { + return nil + } + name := vmip.Status.VirtualMachine + if name == "" { + return nil + } + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: name, + Namespace: vmip.GetNamespace(), + }, + }, + } + }), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldVmip := e.ObjectOld.(*virtv2.VirtualMachineIPAddress) + newVmip := e.ObjectNew.(*virtv2.VirtualMachineIPAddress) + return oldVmip.Status.Phase != newVmip.Status.Phase || + oldVmip.Status.VirtualMachine != newVmip.Status.VirtualMachine + }, + }, + ); err != nil { + return fmt.Errorf("error setting watch on VirtualMachineIpAddress: %w", err) + } + return nil +} diff --git a/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go b/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go index 0525b0d325..778b4b9c63 100644 --- a/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go +++ b/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go @@ -21,27 +21,18 @@ import ( "fmt" "reflect" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - virtv1 "kubevirt.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - "github.com/deckhouse/virtualization-controller/pkg/common/annotations" - "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" - "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" "github.com/deckhouse/virtualization-controller/pkg/controller/reconciler" "github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/state" "github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/watcher" "github.com/deckhouse/virtualization-controller/pkg/logger" virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" - "github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition" ) type Handler interface { @@ -70,192 +61,14 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr return fmt.Errorf("error setting watch on VM: %w", err) } - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv1.VirtualMachine{}), - handler.EnqueueRequestForOwner( - mgr.GetScheme(), - mgr.GetRESTMapper(), - &virtv2.VirtualMachine{}, - handler.OnlyControllerOwner(), - ), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldVM := e.ObjectOld.(*virtv1.VirtualMachine) - newVM := e.ObjectNew.(*virtv1.VirtualMachine) - return oldVM.Status.PrintableStatus != newVM.Status.PrintableStatus || - oldVM.Status.Ready != newVM.Status.Ready || - oldVM.Annotations[annotations.AnnVmStartRequested] != newVM.Annotations[annotations.AnnVmStartRequested] || - oldVM.Annotations[annotations.AnnVmRestartRequested] != newVM.Annotations[annotations.AnnVmRestartRequested] - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VirtualMachine: %w", err) - } - - // Subscribe on Kubevirt VirtualMachineInstances to update our VM status. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv1.VirtualMachineInstance{}), - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, vmi client.Object) []reconcile.Request { - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Name: vmi.GetName(), - Namespace: vmi.GetNamespace(), - }, - }, - } - }), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldVM := e.ObjectOld.(*virtv1.VirtualMachineInstance) - newVM := e.ObjectNew.(*virtv1.VirtualMachineInstance) - return !reflect.DeepEqual(oldVM.Status, newVM.Status) - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VirtualMachine: %w", err) - } - - // Watch for Pods created on behalf of VMs. Handle only changes in status.phase. - // Pod tracking is required to detect when Pod becomes Completed after guest initiated reset or shutdown. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &corev1.Pod{}), - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, pod client.Object) []reconcile.Request { - vmName, hasLabel := pod.GetLabels()["vm.kubevirt.io/name"] - if !hasLabel { - return nil - } - - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Name: vmName, - Namespace: pod.GetNamespace(), - }, - }, - } - }), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldPod := e.ObjectOld.(*corev1.Pod) - newPod := e.ObjectNew.(*corev1.Pod) - return oldPod.Status.Phase != newPod.Status.Phase - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on Pod: %w", err) - } - - // Subscribe on VirtualMachineIpAddress. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv2.VirtualMachineIPAddress{}), - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { - vmip, ok := obj.(*virtv2.VirtualMachineIPAddress) - if !ok { - return nil - } - name := vmip.Status.VirtualMachine - if name == "" { - return nil - } - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Name: name, - Namespace: vmip.GetNamespace(), - }, - }, - } - }), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldVmip := e.ObjectOld.(*virtv2.VirtualMachineIPAddress) - newVmip := e.ObjectNew.(*virtv2.VirtualMachineIPAddress) - return oldVmip.Status.Phase != newVmip.Status.Phase || - oldVmip.Status.VirtualMachine != newVmip.Status.VirtualMachine - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VirtualMachineIpAddress: %w", err) - } - - // Subscribe on VirtualImage. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv2.VirtualImage{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.ImageDevice)), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldVi, oldOk := e.ObjectOld.(*virtv2.VirtualImage) - newVi, newOk := e.ObjectNew.(*virtv2.VirtualImage) - if !oldOk || !newOk { - return false - } - return oldVi.Status.Phase != newVi.Status.Phase - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VirtualImage: %w", err) - } - - // Subscribe on VirtualDisk. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv2.VirtualDisk{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.DiskDevice)), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldVd, oldOk := e.ObjectOld.(*virtv2.VirtualDisk) - newVd, newOk := e.ObjectNew.(*virtv2.VirtualDisk) - if !oldOk || !newOk { - return false - } - - oldInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, oldVd.Status.Conditions) - newInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, newVd.Status.Conditions) - - if oldVd.Status.Phase != newVd.Status.Phase || oldInUseCondition != newInUseCondition { - return true - } - - return false - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VirtualDisk: %w", err) - } - - // Subscribe on ClusterVirtualImage. - if err := ctr.Watch( - source.Kind(mgr.GetCache(), &virtv2.ClusterVirtualImage{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.ClusterImageDevice)), - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldCvi, oldOk := e.ObjectOld.(*virtv2.ClusterVirtualImage) - newCvi, newOk := e.ObjectNew.(*virtv2.ClusterVirtualImage) - if !oldOk || !newOk { - return false - } - return oldCvi.Status.Phase != newCvi.Status.Phase - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on ClusterVirtualImage: %w", err) - } - for _, w := range []Watcher{ + watcher.NewKVVMWatcher(), + watcher.NewKVVMIWatcher(), + watcher.NewPodWatcher(), + watcher.NewVirtualImageWatcher(), + watcher.NewClusterVirtualImageWatcher(), + watcher.NewVirtualDiskWatcher(), + watcher.NewVMIPWatcher(), watcher.NewVirtualMachineClassWatcher(), watcher.NewVirtualMachineSnapshotWatcher(mgr.GetClient()), } { @@ -268,53 +81,6 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr return nil } -func (r *Reconciler) enqueueRequestsBlockDevice(cl client.Client, kind virtv2.BlockDeviceKind) func(ctx context.Context, obj client.Object) []reconcile.Request { - return func(ctx context.Context, obj client.Object) []reconcile.Request { - var opts []client.ListOption - switch kind { - case virtv2.ImageDevice: - if _, ok := obj.(*virtv2.VirtualImage); !ok { - return nil - } - opts = append(opts, - client.InNamespace(obj.GetNamespace()), - client.MatchingFields{indexer.IndexFieldVMByVI: obj.GetName()}, - ) - case virtv2.ClusterImageDevice: - if _, ok := obj.(*virtv2.ClusterVirtualImage); !ok { - return nil - } - opts = append(opts, - client.MatchingFields{indexer.IndexFieldVMByCVI: obj.GetName()}, - ) - case virtv2.DiskDevice: - if _, ok := obj.(*virtv2.VirtualDisk); !ok { - return nil - } - opts = append(opts, - client.InNamespace(obj.GetNamespace()), - client.MatchingFields{indexer.IndexFieldVMByVD: obj.GetName()}, - ) - default: - return nil - } - var vms virtv2.VirtualMachineList - if err := cl.List(ctx, &vms, opts...); err != nil { - return nil - } - var result []reconcile.Request - for _, vm := range vms.Items { - result = append(result, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: vm.GetName(), - Namespace: vm.GetNamespace(), - }, - }) - } - return result - } -} - func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := logger.FromContext(ctx)