Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d9ce4c5
refactor: replace map with sync.Map for VaryingInformers in monitor
Apr 8, 2025
8b8fb60
fix: correct locking mechanism in BindingContextController to prevent…
Apr 8, 2025
5e9bf83
fix: add mutex for thread-safe access to objsFromEvents in monitor tests
Apr 8, 2025
0873339
refactor: simplify Range function parameters in VaryingInformers
Apr 8, 2025
163443f
refactor: remove duplicate log import in monitor_test.go
Apr 8, 2025
78f56f4
refactor: change return type of NewKubeEventsManager and NewMonitor t…
Apr 8, 2025
7531e0b
refactor: change KubeEventsManager fields to pointers for consistency…
Apr 8, 2025
eaa1dd0
refactor: change ScheduleManager fields to pointers for consistency a…
Apr 8, 2025
0084430
refactor: rename resourceInformer to ResourceInformer for consistency
Apr 8, 2025
764fe20
test: enable race detection in test execution
Apr 9, 2025
9feaca0
refactor: replace boolean flag with atomic.Bool for thread-safe state…
Apr 9, 2025
dbb7344
refactor: initialize ScheduleManager and KubeEventsManager as pointer…
Apr 9, 2025
a488189
refactor: replace Swap with Store for boolean state management and ad…
Apr 9, 2025
7a10505
refactor: change pointer receivers to value receivers for KubeEventsM…
Apr 9, 2025
933ef2d
refactor: replace assert with require in tests for improved error han…
Apr 9, 2025
7a5d343
refactor: change function return types to pointers for KubeEventsMana…
Apr 9, 2025
3881967
refactor: add integration test cases for namespace informers in monit…
Apr 9, 2025
3e00eac
refactor: update informer checks in monitor tests for improved clarit…
Apr 9, 2025
4484a57
refactor: reset factory store in KubeEventsManager to ensure consiste…
Apr 9, 2025
0c351b0
refactor: replace sync.Map with custom types for varying informers an…
Apr 9, 2025
6d18229
refactor: introduce RangeValue method for varyingInformers to simplif…
Apr 9, 2025
6c02343
refactor: improve cancellation function handling in monitor by checki…
Apr 10, 2025
f27e013
refactor: remove global factory store initialization and use instance…
Apr 10, 2025
6596696
refactor: initialize default factory store and reset in KubeEventsMan…
Apr 10, 2025
ebc9ab1
Update pkg/kube_events_manager/monitor_test.go
yalosev Apr 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ jobs:
export GOOS=linux

go test \
--race \
-tags test \
./cmd/... ./pkg/... ./test/utils
2 changes: 1 addition & 1 deletion pkg/hook/controller/kubernetes_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type kubernetesBindingsController struct {
}

// kubernetesHooksController should implement the KubernetesHooksController
var _ KubernetesBindingsController = &kubernetesBindingsController{}
var _ KubernetesBindingsController = (*kubernetesBindingsController)(nil)

// NewKubernetesBindingsController returns an implementation of KubernetesBindingsController
var NewKubernetesBindingsController = func(logger *log.Logger) *kubernetesBindingsController {
Expand Down
6 changes: 3 additions & 3 deletions pkg/hook/controller/schedule_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager"
)

// A link between a hook and a kube monitor
// ScheduleBindingToCrontabLink a link between a hook and a kube monitor
type ScheduleBindingToCrontabLink struct {
BindingName string
Crontab string
Expand All @@ -29,7 +29,7 @@ type ScheduleBindingsController interface {
HandleEvent(crontab string) []BindingExecutionInfo
}

// scheduleHooksController is a main implementation of KubernetesHooksController
// scheduleBindingsController is a main implementation of KubernetesHooksController
type scheduleBindingsController struct {
// dependencies
scheduleManager schedulemanager.ScheduleManager
Expand All @@ -43,7 +43,7 @@ type scheduleBindingsController struct {
}

// kubernetesHooksController should implement the KubernetesHooksController
var _ ScheduleBindingsController = &scheduleBindingsController{}
var _ ScheduleBindingsController = (*scheduleBindingsController)(nil)

// NewScheduleBindingsController returns an implementation of ScheduleBindingsController
var NewScheduleBindingsController = func() *scheduleBindingsController {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kube_events_manager/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func NewFactoryStore() *FactoryStore {
}
}

func (c *FactoryStore) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.data = make(map[FactoryIndex]Factory)
}

func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
ctx, cancel := context.WithCancel(context.Background())
c.data[index] = Factory{
Expand Down
164 changes: 123 additions & 41 deletions pkg/kube_events_manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"sort"
"sync"

"github.com/deckhouse/deckhouse/pkg/log"

Expand Down Expand Up @@ -35,14 +36,14 @@ type monitor struct {
// Namespace informer to get new namespaces
NamespaceInformer *namespaceInformer
// map of dynamically starting informers
VaryingInformers map[string][]*resourceInformer
VaryingInformers varyingInformers

eventCb func(kemtypes.KubeEvent)
eventsEnabled bool
// Index of namespaces statically defined in monitor configuration
staticNamespaces map[string]bool
staticNamespaces sync.Map

cancelForNs map[string]context.CancelFunc
cancelForNs cancelForNs

ctx context.Context
cancel context.CancelFunc
Expand All @@ -51,6 +52,85 @@ type monitor struct {
logger *log.Logger
}

type varyingInformers struct {
value sync.Map
}

func (v *varyingInformers) Store(key string, value []*resourceInformer) {
v.value.Store(key, value)
}

func (v *varyingInformers) Load(key string) ([]*resourceInformer, bool) {
value, ok := v.value.Load(key)
if !ok {
return nil, false
}
if value, ok := value.([]*resourceInformer); ok {
return value, true
}
return nil, false
}

func (v *varyingInformers) Range(f func(key string, value []*resourceInformer) bool) {
v.value.Range(func(key, value any) bool {
if key, ok := key.(string); ok {
if value, ok := value.([]*resourceInformer); ok {
return f(key, value)
}
}
return true
})
}

func (v *varyingInformers) RangeValue(f func(value []*resourceInformer)) {
v.value.Range(func(_, value any) bool {
if value, ok := value.([]*resourceInformer); ok {
f(value)
}
return true
})
}

func (v *varyingInformers) Delete(key string) {
v.value.Delete(key)
}

type cancelForNs struct {
value sync.Map
}

func (c *cancelForNs) Store(key string, value context.CancelFunc) {
c.value.Store(key, value)
}

func (c *cancelForNs) Load(key string) (context.CancelFunc, bool) {
value, ok := c.value.Load(key)
if !ok {
return nil, false
}
if value, ok := value.(context.CancelFunc); ok {
return value, true
}
return nil, false
}

func (c *cancelForNs) Range(f func(key string, value context.CancelFunc) bool) {
c.value.Range(func(key, value any) bool {
if key, ok := key.(string); ok {
if value, ok := value.(context.CancelFunc); ok {
return f(key, value)
}
}
return true
})
}

func (c *cancelForNs) Delete(key string) {
c.value.Delete(key)
}

var _ Monitor = (*monitor)(nil)

func NewMonitor(ctx context.Context, client *klient.Client, mstor metric.Storage, config *MonitorConfig, eventCb func(kemtypes.KubeEvent), logger *log.Logger) *monitor {
cctx, cancel := context.WithCancel(ctx)

Expand All @@ -62,9 +142,9 @@ func NewMonitor(ctx context.Context, client *klient.Client, mstor metric.Storage
Config: config,
eventCb: eventCb,
ResourceInformers: make([]*resourceInformer, 0),
VaryingInformers: make(map[string][]*resourceInformer),
cancelForNs: make(map[string]context.CancelFunc),
staticNamespaces: make(map[string]bool),
VaryingInformers: varyingInformers{value: sync.Map{}},
cancelForNs: cancelForNs{value: sync.Map{}},
staticNamespaces: sync.Map{},
logger: logger,
}
}
Expand Down Expand Up @@ -98,7 +178,7 @@ func (m *monitor) CreateInformers() error {
// This list of informers is static.
for _, nsName := range nsNames {
if nsName != "" {
m.staticNamespaces[nsName] = true
m.staticNamespaces.Store(nsName, true)
}
informers, err := m.CreateInformersForNamespace(nsName)
if err != nil {
Expand All @@ -115,29 +195,28 @@ func (m *monitor) CreateInformers() error {
func(nsName string) {
// Added/Modified event: check, create and run informers for Ns
// ignore event if namespace is already has static ResourceInformers
if _, ok := m.staticNamespaces[nsName]; ok {
if _, ok := m.staticNamespaces.Load(nsName); ok {
return
}
// ignore already started informers
_, ok := m.VaryingInformers[nsName]
if ok {
if _, ok := m.VaryingInformers.Load(nsName); ok {
return
}

logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName))

var err error
m.VaryingInformers[nsName], err = m.CreateInformersForNamespace(nsName)
varyingInformers, err := m.CreateInformersForNamespace(nsName)
if err != nil {
logEntry.Error("create ResourceInformers for ns",
slog.String("name", nsName),
log.Err(err))
}
m.VaryingInformers.Store(nsName, varyingInformers)

var ctx context.Context
ctx, m.cancelForNs[nsName] = context.WithCancel(m.ctx)
ctx, cancelForNs := context.WithCancel(m.ctx)
m.cancelForNs.Store(nsName, cancelForNs)

for _, informer := range m.VaryingInformers[nsName] {
for _, informer := range varyingInformers {
informer.withContext(ctx)
if m.eventsEnabled {
informer.enableKubeEventCb()
Expand All @@ -150,22 +229,23 @@ func (m *monitor) CreateInformers() error {
logEntry.Info("deleted ns, stop dynamic ResourceInformers", slog.String("name", nsName))

// ignore statically specified namespaces
if _, ok := m.staticNamespaces[nsName]; ok {
if _, ok := m.staticNamespaces.Load(nsName); ok {
return
}

// ignore already stopped informers
_, ok := m.cancelForNs[nsName]
if !ok {
if _, ok := m.cancelForNs.Load(nsName); !ok {
return
}

m.cancelForNs[nsName]()
if fn, ok := m.cancelForNs.Load(nsName); ok {
fn()
}

// TODO wait

delete(m.VaryingInformers, nsName)
delete(m.cancelForNs, nsName)
m.VaryingInformers.Delete(nsName)
m.cancelForNs.Delete(nsName)
},
)
if err != nil {
Expand All @@ -175,17 +255,17 @@ func (m *monitor) CreateInformers() error {
logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName))

// ignore event if namespace is already has static ResourceInformers
if _, ok := m.staticNamespaces[nsName]; ok {
if _, ok := m.staticNamespaces.Load(nsName); ok {
continue
}

var err error
m.VaryingInformers[nsName], err = m.CreateInformersForNamespace(nsName)
varyingInformers, err := m.CreateInformersForNamespace(nsName)
if err != nil {
logEntry.Error("create ResourceInformers for ns",
slog.String("name", nsName),
log.Err(err))
}
m.VaryingInformers.Store(nsName, varyingInformers)
}
}

Expand All @@ -200,11 +280,11 @@ func (m *monitor) Snapshot() []kemtypes.ObjectAndFilterResult {
objects = append(objects, informer.getCachedObjects()...)
}

for nsName := range m.VaryingInformers {
for _, informer := range m.VaryingInformers[nsName] {
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
objects = append(objects, informer.getCachedObjects()...)
}
}
})

// Sort objects by namespace and name
sort.Sort(kemtypes.ByNamespaceAndName(objects))
Expand All @@ -218,11 +298,12 @@ func (m *monitor) EnableKubeEventCb() {
for _, informer := range m.ResourceInformers {
informer.enableKubeEventCb()
}
for nsName := range m.VaryingInformers {
for _, informer := range m.VaryingInformers[nsName] {
// Execute eventCb for events accumulated during "Synchronization" phase.
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
informer.enableKubeEventCb()
}
}
})
// Enable events for future VaryingInformers.
m.eventsEnabled = true
}
Expand Down Expand Up @@ -269,14 +350,15 @@ func (m *monitor) Start(parentCtx context.Context) {
informer.start()
}

for nsName := range m.VaryingInformers {
var ctx context.Context
ctx, m.cancelForNs[nsName] = context.WithCancel(m.ctx)
for _, informer := range m.VaryingInformers[nsName] {
m.VaryingInformers.Range(func(nsName string, value []*resourceInformer) bool {
ctx, cancelForNs := context.WithCancel(m.ctx)
m.cancelForNs.Store(nsName, cancelForNs)
for _, informer := range value {
informer.withContext(ctx)
informer.start()
}
}
return true
})

if m.NamespaceInformer != nil {
m.NamespaceInformer.withContext(m.ctx)
Expand All @@ -299,11 +381,11 @@ func (m *monitor) PauseHandleEvents() {
informer.pauseHandleEvents()
}

for _, informers := range m.VaryingInformers {
for _, informer := range informers {
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
informer.pauseHandleEvents()
}
}
})

if m.NamespaceInformer != nil {
m.NamespaceInformer.pauseHandleEvents()
Expand All @@ -319,12 +401,12 @@ func (m *monitor) SnapshotOperations() (*CachedObjectsInfo /*total*/, *CachedObj
last.add(informer.getCachedObjectsInfoIncrement())
}

for nsName := range m.VaryingInformers {
for _, informer := range m.VaryingInformers[nsName] {
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
for _, informer := range value {
total.add(informer.getCachedObjectsInfo())
last.add(informer.getCachedObjectsInfoIncrement())
}
}
})

return total, last
}
Loading
Loading