Skip to content

Commit f3e8c45

Browse files
author
Evsyukov Denis
committed
refactor: initialize default factory store and reset in KubeEventsManager for consistent informer management
Signed-off-by: Evsyukov Denis <denis.evsyukov@flant.com>
1 parent f27e013 commit f3e8c45

3 files changed

Lines changed: 20 additions & 6 deletions

File tree

pkg/kube_events_manager/factory.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,24 @@ import (
66
"sync"
77
"time"
88

9-
"github.com/deckhouse/deckhouse/pkg/log"
109
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1110
"k8s.io/apimachinery/pkg/runtime/schema"
1211
"k8s.io/apimachinery/pkg/util/wait"
1312
"k8s.io/client-go/dynamic"
1413
"k8s.io/client-go/dynamic/dynamicinformer"
1514
"k8s.io/client-go/tools/cache"
15+
16+
"github.com/deckhouse/deckhouse/pkg/log"
17+
)
18+
19+
var (
20+
DefaultFactoryStore *FactoryStore
21+
DefaultSyncTime = 100 * time.Millisecond
1622
)
1723

18-
var DefaultSyncTime = 100 * time.Millisecond
24+
func init() {
25+
DefaultFactoryStore = NewFactoryStore()
26+
}
1927

2028
type FactoryIndex struct {
2129
GVR schema.GroupVersionResource
@@ -42,6 +50,12 @@ func NewFactoryStore() *FactoryStore {
4250
}
4351
}
4452

53+
func (c *FactoryStore) Reset() {
54+
c.mu.Lock()
55+
defer c.mu.Unlock()
56+
c.data = make(map[FactoryIndex]Factory)
57+
}
58+
4559
func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
4660
ctx, cancel := context.WithCancel(context.Background())
4761
c.data[index] = Factory{

pkg/kube_events_manager/resource_informer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ type resourceInformer struct {
3737
GroupVersionResource schema.GroupVersionResource
3838
ListOptions metav1.ListOptions
3939

40-
defaultFactoryStore *FactoryStore
4140
// A cache of objects and filterResults. It is a part of the Monitor's snapshot.
4241
cachedObjects map[string]*kemtypes.ObjectAndFilterResult
4342
cacheLock sync.RWMutex
@@ -92,7 +91,6 @@ func newResourceInformer(ns, name string, cfg *resourceInformerConfig) *resource
9291
cachedObjectsInfo: &CachedObjectsInfo{},
9392
cachedObjectsIncrement: &CachedObjectsInfo{},
9493
logger: cfg.logger,
95-
defaultFactoryStore: NewFactoryStore(),
9694
}
9795
return informer
9896
}
@@ -457,13 +455,13 @@ func (ei *resourceInformer) start() {
457455
go func() {
458456
if ei.ctx != nil {
459457
<-ei.ctx.Done()
460-
ei.defaultFactoryStore.Stop(ei.id, ei.FactoryIndex)
458+
DefaultFactoryStore.Stop(ei.id, ei.FactoryIndex)
461459
}
462460
}()
463461

464462
// TODO: separate handler and informer
465463
errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler"))
466-
err := ei.defaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
464+
err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
467465
if err != nil {
468466
ei.Monitor.Logger.Error("cache is not synced for informer", slog.String("debugName", ei.Monitor.Metadata.DebugName))
469467
return

test/hook/context/generator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func NewBindingContextController(config string, logger *log.Logger, version ...f
6767

6868
b.KubeEventsManager = kubeeventsmanager.NewKubeEventsManager(ctx, b.fakeCluster.Client, b.logger.Named("kube-events-manager"))
6969
b.KubeEventsManager.WithMetricStorage(metricstorage.NewMetricStorage(ctx, "metrics-prefix", false, log.NewNop()))
70+
// Re-create factory to drop informers created using different b.fakeCluster.Client.
71+
kubeeventsmanager.DefaultFactoryStore.Reset()
7072

7173
b.ScheduleManager = schedulemanager.NewScheduleManager(ctx, b.logger.Named("schedule-manager"))
7274

0 commit comments

Comments
 (0)