diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ec5f21db..a0004140 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -45,5 +45,6 @@ jobs: export GOOS=linux go test \ + --race \ -tags test \ ./cmd/... ./pkg/... ./test/utils diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 3eed31da..f7d6e38e 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -56,7 +56,7 @@ type kubernetesBindingsController struct { } // kubernetesHooksController should implement the KubernetesHooksController -var _ KubernetesBindingsController = &kubernetesBindingsController{} +var _ KubernetesBindingsController = (*kubernetesBindingsController)(nil) // NewKubernetesBindingsController returns an implementation of KubernetesBindingsController var NewKubernetesBindingsController = func(logger *log.Logger) *kubernetesBindingsController { diff --git a/pkg/hook/controller/schedule_bindings_controller.go b/pkg/hook/controller/schedule_bindings_controller.go index 187a19a9..de595358 100644 --- a/pkg/hook/controller/schedule_bindings_controller.go +++ b/pkg/hook/controller/schedule_bindings_controller.go @@ -8,7 +8,7 @@ import ( schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" ) -// A link between a hook and a kube monitor +// ScheduleBindingToCrontabLink a link between a hook and a kube monitor type ScheduleBindingToCrontabLink struct { BindingName string Crontab string @@ -29,7 +29,7 @@ type ScheduleBindingsController interface { HandleEvent(crontab string) []BindingExecutionInfo } -// scheduleHooksController is a main implementation of KubernetesHooksController +// scheduleBindingsController is a main implementation of KubernetesHooksController type scheduleBindingsController struct { // dependencies scheduleManager schedulemanager.ScheduleManager @@ -43,7 +43,7 @@ type scheduleBindingsController struct { } // kubernetesHooksController should implement the KubernetesHooksController -var _ ScheduleBindingsController = &scheduleBindingsController{} +var _ ScheduleBindingsController = (*scheduleBindingsController)(nil) // NewScheduleBindingsController returns an implementation of ScheduleBindingsController var NewScheduleBindingsController = func() *scheduleBindingsController { diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index 35bfd528..a52ebab4 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -49,6 +49,12 @@ func NewFactoryStore() *FactoryStore { } } +func (c *FactoryStore) Reset() { + c.mu.Lock() + defer c.mu.Unlock() + c.data = make(map[FactoryIndex]Factory) +} + func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { ctx, cancel := context.WithCancel(context.Background()) c.data[index] = Factory{ diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index 27aaa469..da11c384 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "sort" + "sync" "github.com/deckhouse/deckhouse/pkg/log" @@ -35,14 +36,14 @@ type monitor struct { // Namespace informer to get new namespaces NamespaceInformer *namespaceInformer // map of dynamically starting informers - VaryingInformers map[string][]*resourceInformer + VaryingInformers varyingInformers eventCb func(kemtypes.KubeEvent) eventsEnabled bool // Index of namespaces statically defined in monitor configuration - staticNamespaces map[string]bool + staticNamespaces sync.Map - cancelForNs map[string]context.CancelFunc + cancelForNs cancelForNs ctx context.Context cancel context.CancelFunc @@ -51,6 +52,85 @@ type monitor struct { logger *log.Logger } +type varyingInformers struct { + value sync.Map +} + +func (v *varyingInformers) Store(key string, value []*resourceInformer) { + v.value.Store(key, value) +} + +func (v *varyingInformers) Load(key string) ([]*resourceInformer, bool) { + value, ok := v.value.Load(key) + if !ok { + return nil, false + } + if value, ok := value.([]*resourceInformer); ok { + return value, true + } + return nil, false +} + +func (v *varyingInformers) Range(f func(key string, value []*resourceInformer) bool) { + v.value.Range(func(key, value any) bool { + if key, ok := key.(string); ok { + if value, ok := value.([]*resourceInformer); ok { + return f(key, value) + } + } + return true + }) +} + +func (v *varyingInformers) RangeValue(f func(value []*resourceInformer)) { + v.value.Range(func(_, value any) bool { + if value, ok := value.([]*resourceInformer); ok { + f(value) + } + return true + }) +} + +func (v *varyingInformers) Delete(key string) { + v.value.Delete(key) +} + +type cancelForNs struct { + value sync.Map +} + +func (c *cancelForNs) Store(key string, value context.CancelFunc) { + c.value.Store(key, value) +} + +func (c *cancelForNs) Load(key string) (context.CancelFunc, bool) { + value, ok := c.value.Load(key) + if !ok { + return nil, false + } + if value, ok := value.(context.CancelFunc); ok { + return value, true + } + return nil, false +} + +func (c *cancelForNs) Range(f func(key string, value context.CancelFunc) bool) { + c.value.Range(func(key, value any) bool { + if key, ok := key.(string); ok { + if value, ok := value.(context.CancelFunc); ok { + return f(key, value) + } + } + return true + }) +} + +func (c *cancelForNs) Delete(key string) { + c.value.Delete(key) +} + +var _ Monitor = (*monitor)(nil) + func NewMonitor(ctx context.Context, client *klient.Client, mstor metric.Storage, config *MonitorConfig, eventCb func(kemtypes.KubeEvent), logger *log.Logger) *monitor { cctx, cancel := context.WithCancel(ctx) @@ -62,9 +142,9 @@ func NewMonitor(ctx context.Context, client *klient.Client, mstor metric.Storage Config: config, eventCb: eventCb, ResourceInformers: make([]*resourceInformer, 0), - VaryingInformers: make(map[string][]*resourceInformer), - cancelForNs: make(map[string]context.CancelFunc), - staticNamespaces: make(map[string]bool), + VaryingInformers: varyingInformers{value: sync.Map{}}, + cancelForNs: cancelForNs{value: sync.Map{}}, + staticNamespaces: sync.Map{}, logger: logger, } } @@ -98,7 +178,7 @@ func (m *monitor) CreateInformers() error { // This list of informers is static. for _, nsName := range nsNames { if nsName != "" { - m.staticNamespaces[nsName] = true + m.staticNamespaces.Store(nsName, true) } informers, err := m.CreateInformersForNamespace(nsName) if err != nil { @@ -115,29 +195,28 @@ func (m *monitor) CreateInformers() error { func(nsName string) { // Added/Modified event: check, create and run informers for Ns // ignore event if namespace is already has static ResourceInformers - if _, ok := m.staticNamespaces[nsName]; ok { + if _, ok := m.staticNamespaces.Load(nsName); ok { return } // ignore already started informers - _, ok := m.VaryingInformers[nsName] - if ok { + if _, ok := m.VaryingInformers.Load(nsName); ok { return } logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName)) - var err error - m.VaryingInformers[nsName], err = m.CreateInformersForNamespace(nsName) + varyingInformers, err := m.CreateInformersForNamespace(nsName) if err != nil { logEntry.Error("create ResourceInformers for ns", slog.String("name", nsName), log.Err(err)) } + m.VaryingInformers.Store(nsName, varyingInformers) - var ctx context.Context - ctx, m.cancelForNs[nsName] = context.WithCancel(m.ctx) + ctx, cancelForNs := context.WithCancel(m.ctx) + m.cancelForNs.Store(nsName, cancelForNs) - for _, informer := range m.VaryingInformers[nsName] { + for _, informer := range varyingInformers { informer.withContext(ctx) if m.eventsEnabled { informer.enableKubeEventCb() @@ -150,22 +229,23 @@ func (m *monitor) CreateInformers() error { logEntry.Info("deleted ns, stop dynamic ResourceInformers", slog.String("name", nsName)) // ignore statically specified namespaces - if _, ok := m.staticNamespaces[nsName]; ok { + if _, ok := m.staticNamespaces.Load(nsName); ok { return } // ignore already stopped informers - _, ok := m.cancelForNs[nsName] - if !ok { + if _, ok := m.cancelForNs.Load(nsName); !ok { return } - m.cancelForNs[nsName]() + if fn, ok := m.cancelForNs.Load(nsName); ok { + fn() + } // TODO wait - delete(m.VaryingInformers, nsName) - delete(m.cancelForNs, nsName) + m.VaryingInformers.Delete(nsName) + m.cancelForNs.Delete(nsName) }, ) if err != nil { @@ -175,17 +255,17 @@ func (m *monitor) CreateInformers() error { logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName)) // ignore event if namespace is already has static ResourceInformers - if _, ok := m.staticNamespaces[nsName]; ok { + if _, ok := m.staticNamespaces.Load(nsName); ok { continue } - var err error - m.VaryingInformers[nsName], err = m.CreateInformersForNamespace(nsName) + varyingInformers, err := m.CreateInformersForNamespace(nsName) if err != nil { logEntry.Error("create ResourceInformers for ns", slog.String("name", nsName), log.Err(err)) } + m.VaryingInformers.Store(nsName, varyingInformers) } } @@ -200,11 +280,11 @@ func (m *monitor) Snapshot() []kemtypes.ObjectAndFilterResult { objects = append(objects, informer.getCachedObjects()...) } - for nsName := range m.VaryingInformers { - for _, informer := range m.VaryingInformers[nsName] { + m.VaryingInformers.RangeValue(func(value []*resourceInformer) { + for _, informer := range value { objects = append(objects, informer.getCachedObjects()...) } - } + }) // Sort objects by namespace and name sort.Sort(kemtypes.ByNamespaceAndName(objects)) @@ -218,11 +298,12 @@ func (m *monitor) EnableKubeEventCb() { for _, informer := range m.ResourceInformers { informer.enableKubeEventCb() } - for nsName := range m.VaryingInformers { - for _, informer := range m.VaryingInformers[nsName] { + // Execute eventCb for events accumulated during "Synchronization" phase. + m.VaryingInformers.RangeValue(func(value []*resourceInformer) { + for _, informer := range value { informer.enableKubeEventCb() } - } + }) // Enable events for future VaryingInformers. m.eventsEnabled = true } @@ -269,14 +350,15 @@ func (m *monitor) Start(parentCtx context.Context) { informer.start() } - for nsName := range m.VaryingInformers { - var ctx context.Context - ctx, m.cancelForNs[nsName] = context.WithCancel(m.ctx) - for _, informer := range m.VaryingInformers[nsName] { + m.VaryingInformers.Range(func(nsName string, value []*resourceInformer) bool { + ctx, cancelForNs := context.WithCancel(m.ctx) + m.cancelForNs.Store(nsName, cancelForNs) + for _, informer := range value { informer.withContext(ctx) informer.start() } - } + return true + }) if m.NamespaceInformer != nil { m.NamespaceInformer.withContext(m.ctx) @@ -299,11 +381,11 @@ func (m *monitor) PauseHandleEvents() { informer.pauseHandleEvents() } - for _, informers := range m.VaryingInformers { - for _, informer := range informers { + m.VaryingInformers.RangeValue(func(value []*resourceInformer) { + for _, informer := range value { informer.pauseHandleEvents() } - } + }) if m.NamespaceInformer != nil { m.NamespaceInformer.pauseHandleEvents() @@ -319,12 +401,12 @@ func (m *monitor) SnapshotOperations() (*CachedObjectsInfo /*total*/, *CachedObj last.add(informer.getCachedObjectsInfoIncrement()) } - for nsName := range m.VaryingInformers { - for _, informer := range m.VaryingInformers[nsName] { + m.VaryingInformers.RangeValue(func(value []*resourceInformer) { + for _, informer := range value { total.add(informer.getCachedObjectsInfo()) last.add(informer.getCachedObjectsInfoIncrement()) } - } + }) return total, last } diff --git a/pkg/kube_events_manager/monitor_test.go b/pkg/kube_events_manager/monitor_test.go index 4cbba90c..c261c0ec 100644 --- a/pkg/kube_events_manager/monitor_test.go +++ b/pkg/kube_events_manager/monitor_test.go @@ -3,6 +3,7 @@ package kubeeventsmanager import ( "context" "fmt" + "sync" "testing" "github.com/deckhouse/deckhouse/pkg/log" @@ -38,6 +39,7 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { }, } objsFromEvents := make([]string, 0) + var objsMutex sync.Mutex metricStorage := metric.NewStorageMock(t) metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { @@ -55,7 +57,9 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { metricStorage.GaugeSetMock.When("{PREFIX}kube_snapshot_objects", 3, map[string]string(nil)).Then() mon := NewMonitor(context.Background(), fc.Client, metricStorage, monitorCfg, func(ev kemtypes.KubeEvent) { + objsMutex.Lock() objsFromEvents = append(objsFromEvents, snapshotResourceIDs(ev.Objects)...) + objsMutex.Unlock() }, log.NewNop()) // Start monitor. @@ -72,8 +76,11 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { createNsWithLabels(fc, "test-ns-1", map[string]string{"test-label": ""}) // Wait until informers appears. - g.Eventually(mon.VaryingInformers, "5s", "10ms"). - Should(HaveKey("test-ns-1"), "Should create informer for new namespace") + g.Eventually(func() bool { + _, ok := mon.VaryingInformers.Load("test-ns-1") + return ok + }, "5s", "10ms"). + Should(BeTrue(), "Should create informer for new namespace") createCM(fc, "test-ns-1", testCM("cm-1")) @@ -92,7 +99,11 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { mon.EnableKubeEventCb() // Should catch 2 events for cm-2 and cm-3. - g.Eventually(func() []string { return objsFromEvents }, "6s", "10ms"). + g.Eventually(func() []string { + objsMutex.Lock() + defer objsMutex.Unlock() + return objsFromEvents + }, "6s", "10ms"). Should(SatisfyAll( ContainElement("test-ns-1/ConfigMap/cm-2"), ContainElement("test-ns-1/ConfigMap/cm-3"), @@ -110,8 +121,11 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { createNsWithLabels(fc, "test-ns-2", map[string]string{"test-label": ""}) // Monitor should create new configmap informer for new namespace. - g.Eventually(mon.VaryingInformers, "5s", "10ms"). - Should(HaveKey("test-ns-2"), "Should create informer for ns/test-ns-2") + g.Eventually(func() bool { + _, ok := mon.VaryingInformers.Load("test-ns-2") + return ok + }, "5s", "10ms"). + Should(BeTrue(), "Should create informer for ns/test-ns-2") // Create new ConfigMap after Synchronization. createCM(fc, "test-ns-2", testCM("cm-2-1")) @@ -122,15 +136,22 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { }, "5s", "10ms").Should(ContainElement("test-ns-2/ConfigMap/cm-2-1"), "Should update snapshot on new ConfigMap after Synchronization") // Should catch event for cm-2-1. - g.Eventually(func() []string { return objsFromEvents }, "5s", "10ms"). - Should(ContainElement("test-ns-2/ConfigMap/cm-2-1"), "Should fire KubeEvent for new ConfigMap after Synchronization", objsFromEvents) + g.Eventually(func() []string { + objsMutex.Lock() + defer objsMutex.Unlock() + return objsFromEvents + }, "5s", "10ms"). + Should(ContainElement("test-ns-2/ConfigMap/cm-2-1"), "Should fire KubeEvent for new ConfigMap after Synchronization") // Add non-matched Namespace. createNsWithLabels(fc, "test-ns-non-matched", map[string]string{"non-matched-label": ""}) // Monitor should create new configmap informer for new namespace. - g.Eventually(mon.VaryingInformers, "5s", "10ms"). - ShouldNot(HaveKey("test-ns-non-matched"), "Should not create informer for non-mathed Namespace") + g.Eventually(func() bool { + _, ok := mon.VaryingInformers.Load("test-ns-non-matched") + return ok + }, "5s", "10ms"). + ShouldNot(BeTrue(), "Should not create informer for non-matched Namespace") } func createNsWithLabels(fc *fake.Cluster, name string, labels map[string]string) { diff --git a/pkg/schedule_manager/schedule_manager.go b/pkg/schedule_manager/schedule_manager.go index b62be1a1..fb765b25 100644 --- a/pkg/schedule_manager/schedule_manager.go +++ b/pkg/schedule_manager/schedule_manager.go @@ -33,7 +33,7 @@ type scheduleManager struct { logger *log.Logger } -var _ ScheduleManager = &scheduleManager{} +var _ ScheduleManager = (*scheduleManager)(nil) func NewScheduleManager(ctx context.Context, logger *log.Logger) *scheduleManager { cctx, cancel := context.WithCancel(ctx) diff --git a/test/hook/context/generator.go b/test/hook/context/generator.go index 3dbe9314..88179259 100644 --- a/test/hook/context/generator.go +++ b/test/hook/context/generator.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/deckhouse/deckhouse/pkg/log" @@ -41,7 +42,7 @@ type BindingContextController struct { fakeCluster *fake.Cluster mu sync.Mutex - started bool + started atomic.Bool logger *log.Logger } @@ -67,7 +68,7 @@ func NewBindingContextController(config string, logger *log.Logger, version ...f b.KubeEventsManager = kubeeventsmanager.NewKubeEventsManager(ctx, b.fakeCluster.Client, b.logger.Named("kube-events-manager")) b.KubeEventsManager.WithMetricStorage(metricstorage.NewMetricStorage(ctx, "metrics-prefix", false, log.NewNop())) // Re-create factory to drop informers created using different b.fakeCluster.Client. - kubeeventsmanager.DefaultFactoryStore = kubeeventsmanager.NewFactoryStore() + kubeeventsmanager.DefaultFactoryStore.Reset() b.ScheduleManager = schedulemanager.NewScheduleManager(ctx, b.logger.Named("schedule-manager")) @@ -94,9 +95,8 @@ func (b *BindingContextController) RegisterCRD(group, version, kind string, name func (b *BindingContextController) Run(initialState string) (GeneratedBindingContexts, error) { b.mu.Lock() defer b.mu.Unlock() - - if b.started { - return GeneratedBindingContexts{}, fmt.Errorf("attempt to runner started runner, it cannot be started twice") + if b.started.Load() { + return GeneratedBindingContexts{}, fmt.Errorf("attempt to start an already started runner, it cannot be started twice") } err := b.Controller.SetInitialState(initialState) @@ -131,7 +131,7 @@ func (b *BindingContextController) Run(initialState string) (GeneratedBindingCon } b.HookCtrl.UnlockKubernetesEvents() - b.started = true + b.started.Store(true) time.Sleep(50 * time.Millisecond) return cc.CombinedAndUpdated(b.HookCtrl)