Skip to content

Commit 5a81a90

Browse files
refactor(vm): move watchers out into a separate package (#952)
move watchers out into a separate package Signed-off-by: Yaroslav Borbat <yaroslav.borbat@flant.com>
1 parent 4a3d68c commit 5a81a90

8 files changed

Lines changed: 528 additions & 241 deletions

File tree

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2025 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watcher
18+
19+
import (
20+
"fmt"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/controller"
23+
"sigs.k8s.io/controller-runtime/pkg/event"
24+
"sigs.k8s.io/controller-runtime/pkg/handler"
25+
"sigs.k8s.io/controller-runtime/pkg/manager"
26+
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
"sigs.k8s.io/controller-runtime/pkg/source"
28+
29+
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
30+
)
31+
32+
func NewClusterVirtualImageWatcher() *CLusterVirtualImageWatcher {
33+
return &CLusterVirtualImageWatcher{}
34+
}
35+
36+
type CLusterVirtualImageWatcher struct{}
37+
38+
func (w *CLusterVirtualImageWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error {
39+
if err := ctr.Watch(
40+
source.Kind(mgr.GetCache(), &virtv2.ClusterVirtualImage{}),
41+
handler.EnqueueRequestsFromMapFunc(enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.ClusterImageDevice)),
42+
predicate.Funcs{
43+
CreateFunc: func(e event.CreateEvent) bool { return true },
44+
DeleteFunc: func(e event.DeleteEvent) bool { return true },
45+
UpdateFunc: func(e event.UpdateEvent) bool {
46+
oldCvi, oldOk := e.ObjectOld.(*virtv2.ClusterVirtualImage)
47+
newCvi, newOk := e.ObjectNew.(*virtv2.ClusterVirtualImage)
48+
if !oldOk || !newOk {
49+
return false
50+
}
51+
return oldCvi.Status.Phase != newCvi.Status.Phase
52+
},
53+
},
54+
); err != nil {
55+
return fmt.Errorf("error setting watch on ClusterVirtualImage: %w", err)
56+
}
57+
return nil
58+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2025 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watcher
18+
19+
import (
20+
"fmt"
21+
22+
virtv1 "kubevirt.io/api/core/v1"
23+
"sigs.k8s.io/controller-runtime/pkg/controller"
24+
"sigs.k8s.io/controller-runtime/pkg/event"
25+
"sigs.k8s.io/controller-runtime/pkg/handler"
26+
"sigs.k8s.io/controller-runtime/pkg/manager"
27+
"sigs.k8s.io/controller-runtime/pkg/predicate"
28+
"sigs.k8s.io/controller-runtime/pkg/source"
29+
30+
"github.com/deckhouse/virtualization-controller/pkg/common/annotations"
31+
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
32+
)
33+
34+
func NewKVVMWatcher() *KVVMWatcher {
35+
return &KVVMWatcher{}
36+
}
37+
38+
type KVVMWatcher struct{}
39+
40+
func (w *KVVMWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error {
41+
if err := ctr.Watch(
42+
source.Kind(mgr.GetCache(), &virtv1.VirtualMachine{}),
43+
handler.EnqueueRequestForOwner(
44+
mgr.GetScheme(),
45+
mgr.GetRESTMapper(),
46+
&virtv2.VirtualMachine{},
47+
handler.OnlyControllerOwner(),
48+
),
49+
predicate.Funcs{
50+
CreateFunc: func(e event.CreateEvent) bool { return true },
51+
DeleteFunc: func(e event.DeleteEvent) bool { return true },
52+
UpdateFunc: func(e event.UpdateEvent) bool {
53+
oldVM := e.ObjectOld.(*virtv1.VirtualMachine)
54+
newVM := e.ObjectNew.(*virtv1.VirtualMachine)
55+
return oldVM.Status.PrintableStatus != newVM.Status.PrintableStatus ||
56+
oldVM.Status.Ready != newVM.Status.Ready ||
57+
oldVM.Annotations[annotations.AnnVmStartRequested] != newVM.Annotations[annotations.AnnVmStartRequested] ||
58+
oldVM.Annotations[annotations.AnnVmRestartRequested] != newVM.Annotations[annotations.AnnVmRestartRequested]
59+
},
60+
},
61+
); err != nil {
62+
return fmt.Errorf("error setting watch on VirtualMachine: %w", err)
63+
}
64+
return nil
65+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
Copyright 2025 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watcher
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"reflect"
23+
24+
"k8s.io/apimachinery/pkg/types"
25+
virtv1 "kubevirt.io/api/core/v1"
26+
"sigs.k8s.io/controller-runtime/pkg/client"
27+
"sigs.k8s.io/controller-runtime/pkg/controller"
28+
"sigs.k8s.io/controller-runtime/pkg/event"
29+
"sigs.k8s.io/controller-runtime/pkg/handler"
30+
"sigs.k8s.io/controller-runtime/pkg/manager"
31+
"sigs.k8s.io/controller-runtime/pkg/predicate"
32+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
33+
"sigs.k8s.io/controller-runtime/pkg/source"
34+
)
35+
36+
func NewKVVMIWatcher() *KVVMIWatcher {
37+
return &KVVMIWatcher{}
38+
}
39+
40+
type KVVMIWatcher struct{}
41+
42+
func (w *KVVMIWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error {
43+
// Subscribe on Kubevirt VirtualMachineInstances to update our VM status.
44+
if err := ctr.Watch(
45+
source.Kind(mgr.GetCache(), &virtv1.VirtualMachineInstance{}),
46+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, vmi client.Object) []reconcile.Request {
47+
return []reconcile.Request{
48+
{
49+
NamespacedName: types.NamespacedName{
50+
Name: vmi.GetName(),
51+
Namespace: vmi.GetNamespace(),
52+
},
53+
},
54+
}
55+
}),
56+
predicate.Funcs{
57+
CreateFunc: func(e event.CreateEvent) bool { return true },
58+
DeleteFunc: func(e event.DeleteEvent) bool { return true },
59+
UpdateFunc: func(e event.UpdateEvent) bool {
60+
oldVM := e.ObjectOld.(*virtv1.VirtualMachineInstance)
61+
newVM := e.ObjectNew.(*virtv1.VirtualMachineInstance)
62+
return !reflect.DeepEqual(oldVM.Status, newVM.Status)
63+
},
64+
},
65+
); err != nil {
66+
return fmt.Errorf("error setting watch on VirtualMachine: %w", err)
67+
}
68+
return nil
69+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2025 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watcher
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
corev1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/types"
25+
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/controller"
27+
"sigs.k8s.io/controller-runtime/pkg/event"
28+
"sigs.k8s.io/controller-runtime/pkg/handler"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/controller-runtime/pkg/predicate"
31+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
32+
"sigs.k8s.io/controller-runtime/pkg/source"
33+
)
34+
35+
func NewPodWatcher() *PodWatcher {
36+
return &PodWatcher{}
37+
}
38+
39+
type PodWatcher struct{}
40+
41+
func (w *PodWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error {
42+
// Watch for Pods created on behalf of VMs. Handle only changes in status.phase.
43+
// Pod tracking is required to detect when Pod becomes Completed after guest initiated reset or shutdown.
44+
if err := ctr.Watch(
45+
source.Kind(mgr.GetCache(), &corev1.Pod{}),
46+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, pod client.Object) []reconcile.Request {
47+
vmName, hasLabel := pod.GetLabels()["vm.kubevirt.io/name"]
48+
if !hasLabel {
49+
return nil
50+
}
51+
52+
return []reconcile.Request{
53+
{
54+
NamespacedName: types.NamespacedName{
55+
Name: vmName,
56+
Namespace: pod.GetNamespace(),
57+
},
58+
},
59+
}
60+
}),
61+
predicate.Funcs{
62+
CreateFunc: func(e event.CreateEvent) bool { return true },
63+
DeleteFunc: func(e event.DeleteEvent) bool { return true },
64+
UpdateFunc: func(e event.UpdateEvent) bool {
65+
oldPod := e.ObjectOld.(*corev1.Pod)
66+
newPod := e.ObjectNew.(*corev1.Pod)
67+
return oldPod.Status.Phase != newPod.Status.Phase
68+
},
69+
},
70+
); err != nil {
71+
return fmt.Errorf("error setting watch on Pod: %w", err)
72+
}
73+
return nil
74+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
Copyright 2025 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watcher
18+
19+
import (
20+
"fmt"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/controller"
23+
"sigs.k8s.io/controller-runtime/pkg/event"
24+
"sigs.k8s.io/controller-runtime/pkg/handler"
25+
"sigs.k8s.io/controller-runtime/pkg/manager"
26+
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
"sigs.k8s.io/controller-runtime/pkg/source"
28+
29+
"github.com/deckhouse/virtualization-controller/pkg/controller/conditions"
30+
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
31+
"github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition"
32+
)
33+
34+
func NewVirtualDiskWatcher() *VirtualDiskWatcher {
35+
return &VirtualDiskWatcher{}
36+
}
37+
38+
type VirtualDiskWatcher struct{}
39+
40+
func (w *VirtualDiskWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error {
41+
if err := ctr.Watch(
42+
source.Kind(mgr.GetCache(), &virtv2.VirtualDisk{}),
43+
handler.EnqueueRequestsFromMapFunc(enqueueRequestsBlockDevice(mgr.GetClient(), virtv2.DiskDevice)),
44+
predicate.Funcs{
45+
CreateFunc: func(e event.CreateEvent) bool { return true },
46+
DeleteFunc: func(e event.DeleteEvent) bool { return true },
47+
UpdateFunc: func(e event.UpdateEvent) bool {
48+
oldVd, oldOk := e.ObjectOld.(*virtv2.VirtualDisk)
49+
newVd, newOk := e.ObjectNew.(*virtv2.VirtualDisk)
50+
if !oldOk || !newOk {
51+
return false
52+
}
53+
54+
oldInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, oldVd.Status.Conditions)
55+
newInUseCondition, _ := conditions.GetCondition(vdcondition.InUseType, newVd.Status.Conditions)
56+
57+
if oldVd.Status.Phase != newVd.Status.Phase || oldInUseCondition != newInUseCondition {
58+
return true
59+
}
60+
61+
return false
62+
},
63+
},
64+
); err != nil {
65+
return fmt.Errorf("error setting watch on VirtualDisk: %w", err)
66+
}
67+
return nil
68+
}

0 commit comments

Comments
 (0)