Skip to content

Commit 24720f8

Browse files
authored
Fixes the lifecycle of the PodWatcher used by AttachedConnectors (skupperproject#2321)
* Fixes the lifecycle of the PodWatcher used by AttachedConnectors Fixes skupperproject#2320. * Add unit tests * Stopping watcher when binding is deleted * Unit tests to validate podwatcher stopped when attached connector or binding deleted
1 parent 97ad5de commit 24720f8

11 files changed

Lines changed: 402 additions & 38 deletions

File tree

internal/kube/controller/controller.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,11 @@ func labelling() internalinterfaces.TweakListOptionsFunc {
8383
}
8484
}
8585

86+
var eventProcessorCustomizers []watchers.EventProcessorCustomizer
87+
8688
func NewController(cli internalclient.Clients, config *Config) (*Controller, error) {
8789
controller := &Controller{
88-
eventProcessor: watchers.NewEventProcessor("Controller", cli),
90+
eventProcessor: watchers.NewEventProcessor("Controller", cli, eventProcessorCustomizers...),
8991
sites: map[string]*site.Site{},
9092
siteSizing: sizing.NewRegistry(),
9193
labelling: labels.NewLabelsAndAnnotations(config.Namespace),
@@ -448,12 +450,29 @@ func (c *Controller) checkAttachedConnectorBinding(key string, binding *skupperv
448450
func (c *Controller) checkAttachedConnector(key string, connector *skupperv2alpha1.AttachedConnector) error {
449451
if connector == nil {
450452
if previous, ok := c.attachableConnectors[key]; ok {
453+
c.log.Info("AttachedConnector deleted", slog.String("key", key))
451454
delete(c.attachableConnectors, key)
452455
return c.getSite(previous.Spec.SiteNamespace).AttachedConnectorDeleted(previous.Namespace, previous.Name)
453456
} else {
454457
return nil
455458
}
456459
} else {
460+
if previous, ok := c.attachableConnectors[key]; ok {
461+
if previous.Spec.SiteNamespace != connector.Spec.SiteNamespace {
462+
c.log.Info("AttachedConnector site namespace has changed",
463+
slog.String("key", key),
464+
slog.String("from", previous.Spec.SiteNamespace),
465+
slog.String("to", connector.Spec.SiteNamespace),
466+
)
467+
err := c.getSite(previous.Spec.SiteNamespace).AttachedConnectorUnreferenced(previous)
468+
if err != nil {
469+
c.log.Error("Error removing AttachedConnector reference from previous namespace",
470+
slog.String("key", key),
471+
slog.String("previous", previous.Spec.SiteNamespace))
472+
}
473+
}
474+
}
475+
c.attachableConnectors[key] = connector
457476
return c.getSite(connector.Spec.SiteNamespace).AttachedConnectorUpdated(connector)
458477
}
459478
}

internal/kube/controller/controller_test.go

Lines changed: 227 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"reflect"
99
"strings"
1010
"testing"
11+
"time"
1112

1213
"github.com/google/uuid"
14+
"github.com/skupperproject/skupper/internal/kube/watchers"
1315
"gotest.tools/v3/assert"
1416

1517
appsv1 "k8s.io/api/apps/v1"
@@ -37,6 +39,7 @@ import (
3739
)
3840

3941
type WaitFunction func(t *testing.T, clients internalclient.Clients) bool
42+
type ControllerFunction func(t *testing.T, controller *Controller) bool
4043

4144
func TestGeneral(t *testing.T) {
4245
fakeNetworkStatus := f.fakeNetworkStatusInfo("test")
@@ -605,6 +608,7 @@ func TestUpdate(t *testing.T) {
605608
k8sObjects []runtime.Object
606609
skupperObjects []runtime.Object
607610
functions []WaitFunction
611+
postFunctions []ControllerFunction
608612
}{
609613
{
610614
name: "change listener host",
@@ -640,8 +644,126 @@ func TestUpdate(t *testing.T) {
640644
deleteTargetPod("mypod-1", "test"),
641645
serviceCheck("mypod-1", "test").checkAbsent,
642646
},
647+
}, {
648+
name: "unreferenced attached connector",
649+
skupperObjects: []runtime.Object{
650+
f.site("mysite", "test", "", false, false),
651+
f.attachedConnectorBinding("myconnector", "test", "test2"),
652+
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
653+
f.listener("mylistener", "test", "mysvc", 8080),
654+
},
655+
k8sObjects: []runtime.Object{
656+
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
657+
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
658+
},
659+
functions: []WaitFunction{
660+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
661+
serviceCheck("mysvc", "test").check,
662+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
663+
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
664+
updateAttachedConnectorSiteNamespace("myconnector", "test2", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
665+
},
666+
postFunctions: []ControllerFunction{
667+
podWatchers(1, 1),
668+
},
669+
}, {
670+
name: "deleted attached connector",
671+
skupperObjects: []runtime.Object{
672+
f.site("mysite", "test", "", false, false),
673+
f.attachedConnectorBinding("myconnector", "test", "test2"),
674+
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
675+
f.listener("mylistener", "test", "mysvc", 8080),
676+
},
677+
k8sObjects: []runtime.Object{
678+
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
679+
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
680+
},
681+
functions: []WaitFunction{
682+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
683+
serviceCheck("mysvc", "test").check,
684+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
685+
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
686+
deleteAttachedConnector("myconnector", "test2"),
687+
},
688+
postFunctions: []ControllerFunction{
689+
podWatchers(1, 1),
690+
},
691+
}, {
692+
name: "unreferenced attached connector binding",
693+
skupperObjects: []runtime.Object{
694+
f.site("mysite", "test", "", false, false),
695+
f.attachedConnectorBinding("myconnector", "test", "test2"),
696+
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
697+
f.listener("mylistener", "test", "mysvc", 8080),
698+
},
699+
k8sObjects: []runtime.Object{
700+
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
701+
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
702+
},
703+
functions: []WaitFunction{
704+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
705+
serviceCheck("mysvc", "test").check,
706+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
707+
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
708+
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
709+
updateAttachedConnectorBindingConnectorNamespace("myconnector", "test", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
710+
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
711+
},
712+
postFunctions: []ControllerFunction{
713+
podWatchers(1, 1),
714+
},
715+
}, {
716+
name: "deleted attached connector binding",
717+
skupperObjects: []runtime.Object{
718+
f.site("mysite", "test", "", false, false),
719+
f.attachedConnectorBinding("myconnector", "test", "test2"),
720+
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
721+
f.listener("mylistener", "test", "mysvc", 8080),
722+
},
723+
k8sObjects: []runtime.Object{
724+
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
725+
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
726+
},
727+
functions: []WaitFunction{
728+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
729+
serviceCheck("mysvc", "test").check,
730+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
731+
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
732+
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
733+
deleteAttachedConnectorBinding("myconnector", "test"),
734+
isAttachedConnectorStatusCondition("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
735+
},
736+
postFunctions: []ControllerFunction{
737+
podWatchers(1, 1),
738+
},
739+
}, {
740+
name: "site deleted",
741+
skupperObjects: []runtime.Object{
742+
f.addUID(f.site("mysite", "test", "", false, false), "49b03ad4-d414-42be-bbb5-b32d7d4ca503"),
743+
f.attachedConnectorBinding("myconnector", "test", "test2"),
744+
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
745+
f.listener("mylistener", "test", "mysvc", 8080),
746+
},
747+
k8sObjects: []runtime.Object{
748+
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
749+
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
750+
},
751+
functions: []WaitFunction{
752+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
753+
serviceCheck("mysvc", "test").check,
754+
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
755+
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
756+
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
757+
deleteSite("mysite", "test"),
758+
},
759+
postFunctions: []ControllerFunction{
760+
podWatchers(1, 1),
761+
},
643762
},
644763
}
764+
defer func() {
765+
eventProcessorCustomizers = nil
766+
}()
645767
for _, tt := range testTable {
646768
t.Run(tt.name, func(t *testing.T) {
647769
flags := &flag.FlagSet{}
@@ -654,6 +776,11 @@ func TestUpdate(t *testing.T) {
654776
clients, err := fakeclient.NewFakeClient(config.Namespace, tt.k8sObjects, tt.skupperObjects, "")
655777
assert.Assert(t, err)
656778
enableSSA(clients.GetDynamicClient())
779+
eventProcessorCustomizers = []watchers.EventProcessorCustomizer{
780+
func(e *watchers.EventProcessor) {
781+
e.SetResyncShort(time.Second)
782+
},
783+
}
657784
controller, err := NewController(clients, config)
658785
assert.Assert(t, err)
659786
stopCh := make(chan struct{})
@@ -668,10 +795,31 @@ func TestUpdate(t *testing.T) {
668795
controller.eventProcessor.TestProcess()
669796
}
670797
}
798+
for _, f := range tt.postFunctions {
799+
for !f(t, controller) {
800+
controller.eventProcessor.TestProcess()
801+
}
802+
}
671803
})
672804
}
673805
}
674806

807+
func deleteAttachedConnector(name string, namespace string) WaitFunction {
808+
return func(t *testing.T, clients internalclient.Clients) bool {
809+
err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
810+
assert.Assert(t, err)
811+
return true
812+
}
813+
}
814+
815+
func deleteAttachedConnectorBinding(name string, namespace string) WaitFunction {
816+
return func(t *testing.T, clients internalclient.Clients) bool {
817+
err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
818+
assert.Assert(t, err)
819+
return true
820+
}
821+
}
822+
675823
func verifyStatus(t *testing.T, expected skupperv2alpha1.Status, actual skupperv2alpha1.Status) {
676824
t.Helper()
677825
assert.Equal(t, expected.StatusType, actual.StatusType, actual.Message)
@@ -1643,11 +1791,89 @@ func isListenerStatusConditionTrue(name string, namespace string, condition stri
16431791
}
16441792
}
16451793

1794+
func updateAttachedConnectorSiteNamespace(name string, namespace string, siteNamespace string, condition string, conditionStatus metav1.ConditionStatus) WaitFunction {
1795+
return func(t *testing.T, clients internalclient.Clients) bool {
1796+
cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace)
1797+
connector, err := cli.Get(context.Background(), name, metav1.GetOptions{})
1798+
assert.Assert(t, err)
1799+
if connector.Spec.SiteNamespace != siteNamespace {
1800+
t.Logf("updating siteNamespace")
1801+
connector.Spec.SiteNamespace = siteNamespace
1802+
_, err = cli.Update(context.Background(), connector, metav1.UpdateOptions{})
1803+
assert.Assert(t, err)
1804+
return false
1805+
}
1806+
return isConditionUpToDate(connector.Status.Conditions, condition, connector.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, conditionStatus)
1807+
}
1808+
}
1809+
1810+
func updateAttachedConnectorBindingConnectorNamespace(name string, namespace string, connectorNamespace string, condition string, status metav1.ConditionStatus) WaitFunction {
1811+
return func(t *testing.T, clients internalclient.Clients) bool {
1812+
cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace)
1813+
binding, err := cli.Get(context.Background(), name, metav1.GetOptions{})
1814+
assert.Assert(t, err)
1815+
if binding.Spec.ConnectorNamespace != connectorNamespace {
1816+
t.Logf("updating connectorNamespace")
1817+
binding.Spec.ConnectorNamespace = connectorNamespace
1818+
_, err = cli.Update(context.Background(), binding, metav1.UpdateOptions{})
1819+
assert.Assert(t, err)
1820+
return false
1821+
}
1822+
return isConditionUpToDate(binding.Status.Conditions, condition, binding.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status)
1823+
}
1824+
}
1825+
16461826
func isAttachedConnectorStatusConditionTrue(name string, namespace string, condition string) WaitFunction {
1827+
return isAttachedConnectorStatusCondition(name, namespace, condition, metav1.ConditionTrue)
1828+
}
1829+
1830+
func isAttachedConnectorStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction {
16471831
return func(t *testing.T, clients internalclient.Clients) bool {
16481832
connector, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Get(context.Background(), name, metav1.GetOptions{})
16491833
assert.Assert(t, err)
1650-
return meta.IsStatusConditionTrue(connector.Status.Conditions, condition)
1834+
return meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, status)
1835+
}
1836+
}
1837+
1838+
func isAttachedConnectorBindingStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction {
1839+
return func(t *testing.T, clients internalclient.Clients) bool {
1840+
binding, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Get(context.Background(), name, metav1.GetOptions{})
1841+
assert.Assert(t, err)
1842+
return meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status)
1843+
}
1844+
}
1845+
1846+
func podWatchers(expectedRunning int, expectedStopped int) ControllerFunction {
1847+
return func(t *testing.T, controller *Controller) bool {
1848+
for {
1849+
var running int
1850+
var stopped int
1851+
for _, w := range controller.eventProcessor.GetWatchers() {
1852+
if rw, ok := w.(*watchers.ResourceWatcher[*corev1.Pod]); ok {
1853+
if rw.IsStopped() {
1854+
stopped++
1855+
} else {
1856+
running++
1857+
}
1858+
}
1859+
}
1860+
if expectedRunning == running && expectedStopped == stopped {
1861+
return true
1862+
}
1863+
t.Logf("PodWatchers count do not match - Expected Running %d/%d - Expected Stopped %d/%d", running, expectedRunning, stopped, expectedStopped)
1864+
return false
1865+
}
1866+
}
1867+
}
1868+
1869+
func deleteSite(name string, namespace string) WaitFunction {
1870+
return func(t *testing.T, clients internalclient.Clients) bool {
1871+
site, err := clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Get(context.Background(), name, metav1.GetOptions{})
1872+
if err == nil && site != nil {
1873+
_ = clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
1874+
return false
1875+
}
1876+
return true
16511877
}
16521878
}
16531879

0 commit comments

Comments
 (0)