Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ type Cache interface {

// Informers loads informers and adds field indices.
Informers

// SetMinimumRVForGVKAndKey causes subsequent Get requests for the given
// GVK and key to block until the informer for that GVK has observed a
// resource version >= rv (or the context times out). For List requests
// on the given GVK, it blocks until the highest minimum RV across all
// keys for that GVK has been observed.
//
// TODO: This shouldn't be part of the public interface
SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64)

// AddRequiredDeleteForGVKKeyAndUID causes subsequent Get requests for the
// given GVK and key to block until the UID has been observed as deleted.
// For List requests on the given GVK, it blocks until all pending delete
// UIDs across all keys for that GVK have been observed.
//
//
// An informer for the given object must have been created before the delete
// added here was executed, otherwise this will cause a deadlock.
//
// TODO: This shouldn't be part of the public interface
AddRequiredDeleteForObject(obj client.Object) error

// RemoveRequiredDeleteForObject removes a previously added pending delete.
//
// TODO: This shouldn't be part of the public interface
RemoveRequiredDeleteForObject(obj client.Object) error
}

// Informers knows how to create or fetch informers for different
Expand Down Expand Up @@ -129,6 +155,11 @@ type Informer interface {
HasSynced() bool
// IsStopped returns true if the informer has been stopped.
IsStopped() bool

// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
}

// AllNamespaces should be used as the map key to deliminate namespace settings
Expand Down
21 changes: 21 additions & 0 deletions pkg/cache/delegating_by_gvk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
"context"
"fmt"
"maps"
"slices"
"strings"
Expand Down Expand Up @@ -69,6 +70,26 @@ func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Obj
return cache.GetInformer(ctx, obj, opts...)
}

func (dbt *delegatingByGVKCache) SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) {
dbt.cacheForGVK(gvk).SetMinimumRVForGVKAndKey(gvk, key, rv)
}

func (dbt *delegatingByGVKCache) AddRequiredDeleteForObject(obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return err
}
return cache.AddRequiredDeleteForObject(obj)
}

func (dbt *delegatingByGVKCache) RemoveRequiredDeleteForObject(obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return fmt.Errorf("getting cache for object: %w", err)
}
return cache.RemoveRequiredDeleteForObject(obj)
}

func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
}
Expand Down
49 changes: 47 additions & 2 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
if !started {
return &ErrCacheNotStarted{}
}
return cache.Reader.Get(ctx, key, out, opts...)

return cache.Reader.Get(ctx, key, out, ic.Informers.MinimumRVs.GetForKey(gvk, key), opts...)
}

// List implements Reader.
Expand All @@ -95,7 +96,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return &ErrCacheNotStarted{}
}

return cache.Reader.List(ctx, out, opts...)
return cache.Reader.List(ctx, out, ic.Informers.MinimumRVs.GetMaxForGVK(*gvk), opts...)
}

// objectTypeForListObject tries to find the runtime.Object and associated GVK
Expand Down Expand Up @@ -177,6 +178,50 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return started, cache, nil
}

func (ic *informerCache) SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) {
ic.MinimumRVs.Set(gvk, key, rv)
}

func (ic *informerCache) AddRequiredDeleteForObject(obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}
cache, started, ok := ic.Peek(gvk, obj)
if !ok {
return fmt.Errorf("informer for GVK %v not found in cache", gvk)
}
if !started {
return &ErrCacheNotStarted{}
}
if !cache.Informer.HasSynced() {
return fmt.Errorf("informer for GVK %v is not synced", gvk)
}

cache.Reader.ConsistencyHandler.AddPendingDelete(
client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()},
obj.GetUID(),
)
return nil
}

func (ic *informerCache) RemoveRequiredDeleteForObject(obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for object: %w", err)
}
cache, _, ok := ic.Peek(gvk, obj)
if !ok {
return fmt.Errorf("informer for GVK %v not found in cache", gvk)
}
cache.Reader.ConsistencyHandler.RemovePendingDelete(
client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()},
obj.GetUID(),
)

return nil
}

// RemoveInformer deactivates and removes the informer from the cache.
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
Expand Down
14 changes: 14 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ func (c *FakeInformers) IndexField(ctx context.Context, obj client.Object, field
return nil
}

// SetMinimumRVForGVKAndKey implements Cache.
func (c *FakeInformers) SetMinimumRVForGVKAndKey(_ schema.GroupVersionKind, _ client.ObjectKey, _ int64) {
}

// AddRequiredDeleteForObject implements Cache.
func (c *FakeInformers) AddRequiredDeleteForObject(client.Object) error {
return nil
}

// RemoveRequiredDeleteForObject implements Cache.
func (c *FakeInformers) RemoveRequiredDeleteForObject(client.Object) error {
return nil
}

// Get implements Cache.
func (c *FakeInformers) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
return nil
Expand Down
18 changes: 13 additions & 5 deletions pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache/internal/readerconsistency"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/internal/field/selector"
)

// CacheReader is a client.Reader.
var _ client.Reader = &CacheReader{}

// CacheReader wraps a cache.Index to implement the client.Reader interface for a single type.
type CacheReader struct {
// indexer is the underlying indexer wrapped by this cache.
Expand All @@ -52,10 +50,12 @@ type CacheReader struct {
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
// otherwise you will mutate the object in the cache.
disableDeepCopy bool

*readerconsistency.ConsistencyHandler
}

// Get checks the indexer for the object and writes a copy of it if found.
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {
func (c *CacheReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, minRV int64, opts ...client.GetOption) error {
getOpts := client.GetOptions{}
getOpts.ApplyOptions(opts)

Expand All @@ -64,6 +64,10 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
}
storeKey := objectKeyToStoreKey(key)

if err := c.ConsistencyHandler.WaitForGet(ctx, key, minRV); err != nil {
return err
}

// Lookup the object from the indexer cache
obj, exists, err := c.indexer.GetByKey(storeKey)
if err != nil {
Expand Down Expand Up @@ -109,7 +113,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
}

// List lists items out of the indexer and writes them to out.
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
func (c *CacheReader) List(ctx context.Context, out client.ObjectList, minRV int64, opts ...client.ListOption) error {
var objs []any
var err error

Expand All @@ -120,6 +124,10 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
return fmt.Errorf("continue list option is not supported by the cache")
}

if err := c.ConsistencyHandler.WaitForList(ctx, minRV); err != nil {
return err
}

switch {
case listOpts.FieldSelector != nil:
requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
Expand Down
89 changes: 85 additions & 4 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/internal/readerconsistency"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
Expand Down Expand Up @@ -101,6 +103,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
enableWatchBookmarks: options.EnableWatchBookmarks,
newInformer: newInformer,
watchErrorHandler: options.WatchErrorHandler,
MinimumRVs: newMinimumRVStore(),
}
}

Expand Down Expand Up @@ -205,6 +208,12 @@ type Informers struct {
// watchErrorHandler to be set by overriding the options
// or to use the default watchErrorHandler
watchErrorHandler cache.WatchErrorHandlerWithContext

// MinimumRVs stores the minimum RVs we must have seen before returning reads. Due to RV
// being just a version, we can store this before we have an informer. This is
// different from deletes, where we can only observe deletes once we do have an informer,
// so we only allow storing required deletes as part of the informer.
MinimumRVs *minimumRVStore
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -412,14 +421,22 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
return nil, false, err
}

consistencyHandler := readerconsistency.NewHandler(func(currentRV int64) {
ip.MinimumRVs.Cleanup(gvk, currentRV)
})
if _, err := sharedIndexInformer.AddEventHandler(consistencyHandler); err != nil {
return nil, false, fmt.Errorf("failed to add readerconsistency handler: %w", err)
}

// Create the new entry and set it in the map.
i := &Cache{
Informer: sharedIndexInformer,
Reader: CacheReader{
indexer: sharedIndexInformer.GetIndexer(),
groupVersionKind: gvk,
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
indexer: sharedIndexInformer.GetIndexer(),
groupVersionKind: gvk,
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
ConsistencyHandler: consistencyHandler,
},
stop: make(chan struct{}),
}
Expand Down Expand Up @@ -629,3 +646,67 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
}
return ""
}

type minimumRVStore struct {
mu sync.Mutex
minimums map[schema.GroupVersionKind]map[client.ObjectKey]int64
}

func newMinimumRVStore() *minimumRVStore {
return &minimumRVStore{
minimums: make(map[schema.GroupVersionKind]map[client.ObjectKey]int64),
}
}

func (s *minimumRVStore) Set(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) {
s.mu.Lock()
defer s.mu.Unlock()

if s.minimums[gvk] == nil {
s.minimums[gvk] = make(map[client.ObjectKey]int64)
}
s.minimums[gvk][key] = rv
}

func (s *minimumRVStore) GetForKey(gvk schema.GroupVersionKind, key client.ObjectKey) int64 {
s.mu.Lock()
defer s.mu.Unlock()

keys, ok := s.minimums[gvk]
if !ok {
return 0
}
return keys[key]
}

func (s *minimumRVStore) GetMaxForGVK(gvk schema.GroupVersionKind) int64 {
s.mu.Lock()
defer s.mu.Unlock()

keys, ok := s.minimums[gvk]
if !ok || len(keys) == 0 {
return 0
}
var maxRV int64
for _, rv := range keys {
if rv > maxRV {
maxRV = rv
}
}
return maxRV
}

func (s *minimumRVStore) Cleanup(gvk schema.GroupVersionKind, currentRV int64) {
s.mu.Lock()
defer s.mu.Unlock()

keys, ok := s.minimums[gvk]
if !ok {
return
}
for key, rv := range keys {
if rv <= currentRV {
delete(keys, key)
}
}
}
Loading
Loading