diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index b814170de1..a41b353571 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -68,6 +68,32 @@ type Cache interface { // Informers loads informers and adds field indices. Informers + + // SetMinimumRVForGVKAndKey causes subsequent Get requests for the given + // GVK and key to block until the informer for that GVK has observed a + // resource version >= rv (or the context times out). For List requests + // on the given GVK, it blocks until the highest minimum RV across all + // keys for that GVK has been observed. + // + // TODO: This shouldn't be part of the public interface + SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) + + // AddRequiredDeleteForObject causes subsequent Get requests for the + // given GVK and key to block until the UID has been observed as deleted. + // For List requests on the given GVK, it blocks until all pending delete + // UIDs across all keys for that GVK have been observed. + // + // + // An informer for the given object must have been created before the delete + // added here was executed, otherwise this will cause a deadlock. + // + // TODO: This shouldn't be part of the public interface + AddRequiredDeleteForObject(obj client.Object) error + + // RemoveRequiredDeleteForObject removes a previously added pending delete. + // + // TODO: This shouldn't be part of the public interface + RemoveRequiredDeleteForObject(obj client.Object) error } // Informers knows how to create or fetch informers for different @@ -129,6 +155,11 @@ type Informer interface { HasSynced() bool // IsStopped returns true if the informer has been stopped. IsStopped() bool + + // LastSyncResourceVersion is the resource version observed when last synced with the underlying + // store. The value returned is not synchronized with access to the underlying store and is not + // thread-safe. 
+ LastSyncResourceVersion() string } // AllNamespaces should be used as the map key to deliminate namespace settings diff --git a/pkg/cache/delegating_by_gvk_cache.go b/pkg/cache/delegating_by_gvk_cache.go index adc5d957a4..67e912f2d9 100644 --- a/pkg/cache/delegating_by_gvk_cache.go +++ b/pkg/cache/delegating_by_gvk_cache.go @@ -18,6 +18,7 @@ package cache import ( "context" + "fmt" "maps" "slices" "strings" @@ -69,6 +70,26 @@ func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Obj return cache.GetInformer(ctx, obj, opts...) } +func (dbt *delegatingByGVKCache) SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) { + dbt.cacheForGVK(gvk).SetMinimumRVForGVKAndKey(gvk, key, rv) +} + +func (dbt *delegatingByGVKCache) AddRequiredDeleteForObject(obj client.Object) error { + cache, err := dbt.cacheForObject(obj) + if err != nil { + return err + } + return cache.AddRequiredDeleteForObject(obj) +} + +func (dbt *delegatingByGVKCache) RemoveRequiredDeleteForObject(obj client.Object) error { + cache, err := dbt.cacheForObject(obj) + if err != nil { + return fmt.Errorf("getting cache for object: %w", err) + } + return cache.RemoveRequiredDeleteForObject(obj) +} + func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...) } diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index f8a1faa7b9..28a1100e13 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -76,7 +76,8 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie if !started { return &ErrCacheNotStarted{} } - return cache.Reader.Get(ctx, key, out, opts...) + + return cache.Reader.Get(ctx, key, out, ic.Informers.MinimumRVs.GetForKey(gvk, key), opts...) } // List implements Reader. 
@@ -95,7 +96,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts . return &ErrCacheNotStarted{} } - return cache.Reader.List(ctx, out, opts...) + return cache.Reader.List(ctx, out, ic.Informers.MinimumRVs.GetMaxForGVK(*gvk), opts...) } // objectTypeForListObject tries to find the runtime.Object and associated GVK @@ -177,6 +178,50 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou return started, cache, nil } +func (ic *informerCache) SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) { + ic.MinimumRVs.Set(gvk, key, rv) +} + +func (ic *informerCache) AddRequiredDeleteForObject(obj client.Object) error { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) + if err != nil { + return err + } + cache, started, ok := ic.Peek(gvk, obj) + if !ok { + return fmt.Errorf("informer for GVK %v not found in cache", gvk) + } + if !started { + return &ErrCacheNotStarted{} + } + if !cache.Informer.HasSynced() { + return fmt.Errorf("informer for GVK %v is not synced", gvk) + } + + cache.Reader.ConsistencyHandler.AddPendingDelete( + client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}, + obj.GetUID(), + ) + return nil +} + +func (ic *informerCache) RemoveRequiredDeleteForObject(obj client.Object) error { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for object: %w", err) + } + cache, _, ok := ic.Peek(gvk, obj) + if !ok { + return fmt.Errorf("informer for GVK %v not found in cache", gvk) + } + cache.Reader.ConsistencyHandler.RemovePendingDelete( + client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}, + obj.GetUID(), + ) + + return nil +} + // RemoveInformer deactivates and removes the informer from the cache. 
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error { gvk, err := apiutil.GVKForObject(obj, ic.scheme) diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index 5f907d774b..11fb8280e1 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -131,6 +131,20 @@ func (c *FakeInformers) IndexField(ctx context.Context, obj client.Object, field return nil } +// SetMinimumRVForGVKAndKey implements Cache. +func (c *FakeInformers) SetMinimumRVForGVKAndKey(_ schema.GroupVersionKind, _ client.ObjectKey, _ int64) { +} + +// AddRequiredDeleteForObject implements Cache. +func (c *FakeInformers) AddRequiredDeleteForObject(client.Object) error { + return nil +} + +// RemoveRequiredDeleteForObject implements Cache. +func (c *FakeInformers) RemoveRequiredDeleteForObject(client.Object) error { + return nil +} + // Get implements Cache. func (c *FakeInformers) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { return nil diff --git a/pkg/cache/internal/cache_reader.go b/pkg/cache/internal/cache_reader.go index 624869f590..32c8f8cbb7 100644 --- a/pkg/cache/internal/cache_reader.go +++ b/pkg/cache/internal/cache_reader.go @@ -30,13 +30,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/internal/readerconsistency" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/internal/field/selector" ) -// CacheReader is a client.Reader. -var _ client.Reader = &CacheReader{} - // CacheReader wraps a cache.Index to implement the client.Reader interface for a single type. type CacheReader struct { // indexer is the underlying indexer wrapped by this cache. 
@@ -52,10 +50,12 @@ type CacheReader struct { // Be very careful with this, when enabled you must DeepCopy any object before mutating it, // otherwise you will mutate the object in the cache. disableDeepCopy bool + + *readerconsistency.ConsistencyHandler } // Get checks the indexer for the object and writes a copy of it if found. -func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error { +func (c *CacheReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, minRV int64, opts ...client.GetOption) error { getOpts := client.GetOptions{} getOpts.ApplyOptions(opts) @@ -64,6 +64,10 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob } storeKey := objectKeyToStoreKey(key) + if err := c.ConsistencyHandler.WaitForGet(ctx, key, minRV); err != nil { + return err + } + // Lookup the object from the indexer cache obj, exists, err := c.indexer.GetByKey(storeKey) if err != nil { @@ -109,7 +113,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob } // List lists items out of the indexer and writes them to out. 
-func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error { +func (c *CacheReader) List(ctx context.Context, out client.ObjectList, minRV int64, opts ...client.ListOption) error { var objs []any var err error @@ -120,6 +124,10 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli return fmt.Errorf("continue list option is not supported by the cache") } + if err := c.ConsistencyHandler.WaitForList(ctx, minRV); err != nil { + return err + } + switch { case listOpts.FieldSelector != nil: requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector) diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 619e36abd3..7c34c5ec7e 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -38,6 +38,8 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/internal/readerconsistency" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/internal/syncs" @@ -101,6 +103,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { enableWatchBookmarks: options.EnableWatchBookmarks, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, + MinimumRVs: newMinimumRVStore(), } } @@ -205,6 +208,12 @@ type Informers struct { // watchErrorHandler to be set by overriding the options // or to use the default watchErrorHandler watchErrorHandler cache.WatchErrorHandlerWithContext + + // MinimumRVs stores the minimum RVs we must have seen before returning reads. Due to RV + // being just a version, we can store this before we have an informer. 
This is + // different from deletes, where we can only observe deletes once we do have an informer, + // so we only allow storing required deletes as part of the informer. + MinimumRVs *minimumRVStore } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -412,14 +421,22 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O return nil, false, err } + consistencyHandler := readerconsistency.NewHandler(func(currentRV int64) { + ip.MinimumRVs.Cleanup(gvk, currentRV) + }) + if _, err := sharedIndexInformer.AddEventHandler(consistencyHandler); err != nil { + return nil, false, fmt.Errorf("failed to add readerconsistency handler: %w", err) + } + // Create the new entry and set it in the map. i := &Cache{ Informer: sharedIndexInformer, Reader: CacheReader{ - indexer: sharedIndexInformer.GetIndexer(), - groupVersionKind: gvk, - scopeName: mapping.Scope.Name(), - disableDeepCopy: ip.unsafeDisableDeepCopy, + indexer: sharedIndexInformer.GetIndexer(), + groupVersionKind: gvk, + scopeName: mapping.Scope.Name(), + disableDeepCopy: ip.unsafeDisableDeepCopy, + ConsistencyHandler: consistencyHandler, }, stop: make(chan struct{}), } @@ -629,3 +646,67 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string { } return "" } + +type minimumRVStore struct { + mu sync.Mutex + minimums map[schema.GroupVersionKind]map[client.ObjectKey]int64 +} + +func newMinimumRVStore() *minimumRVStore { + return &minimumRVStore{ + minimums: make(map[schema.GroupVersionKind]map[client.ObjectKey]int64), + } +} + +func (s *minimumRVStore) Set(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.minimums[gvk] == nil { + s.minimums[gvk] = make(map[client.ObjectKey]int64) + } + s.minimums[gvk][key] = rv +} + +func (s *minimumRVStore) GetForKey(gvk schema.GroupVersionKind, key client.ObjectKey) int64 { + s.mu.Lock() + defer s.mu.Unlock() + + keys, ok := 
s.minimums[gvk] + if !ok { + return 0 + } + return keys[key] +} + +func (s *minimumRVStore) GetMaxForGVK(gvk schema.GroupVersionKind) int64 { + s.mu.Lock() + defer s.mu.Unlock() + + keys, ok := s.minimums[gvk] + if !ok || len(keys) == 0 { + return 0 + } + var maxRV int64 + for _, rv := range keys { + if rv > maxRV { + maxRV = rv + } + } + return maxRV +} + +func (s *minimumRVStore) Cleanup(gvk schema.GroupVersionKind, currentRV int64) { + s.mu.Lock() + defer s.mu.Unlock() + + keys, ok := s.minimums[gvk] + if !ok { + return + } + for key, rv := range keys { + if rv <= currentRV { + delete(keys, key) + } + } +} diff --git a/pkg/cache/internal/readerconsistency/readerconsistency.go b/pkg/cache/internal/readerconsistency/readerconsistency.go new file mode 100644 index 0000000000..4f85241b9c --- /dev/null +++ b/pkg/cache/internal/readerconsistency/readerconsistency.go @@ -0,0 +1,236 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package readerconsistency + +import ( + "context" + "fmt" + "maps" + "strconv" + "sync" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewHandler(rvCleanup func(int64)) *ConsistencyHandler { + return &ConsistencyHandler{ + rvCond: sync.NewCond(&sync.Mutex{}), + pendingDeletesCond: sync.NewCond(&sync.Mutex{}), + pendingDeletes: make(map[client.ObjectKey]sets.Set[types.UID]), + rvCleanup: rvCleanup, + } +} + +type ConsistencyHandler struct { + rvCond *sync.Cond + observedRV int64 + + pendingDeletesCond *sync.Cond + // pendingDeletes holds pending deletes. Must only be acquired when holding pendingDeletesCond.L + pendingDeletes map[client.ObjectKey]sets.Set[types.UID] + + rvCleanup func(int64) +} + +func (h *ConsistencyHandler) AddPendingDelete(key client.ObjectKey, uid types.UID) { + h.pendingDeletesCond.L.Lock() + defer h.pendingDeletesCond.L.Unlock() + + if h.pendingDeletes[key] == nil { + h.pendingDeletes[key] = sets.New(uid) + return + } + h.pendingDeletes[key].Insert(uid) +} + +func (h *ConsistencyHandler) RemovePendingDelete(key client.ObjectKey, uid types.UID) { + h.pendingDeletesCond.L.Lock() + defer h.pendingDeletesCond.L.Unlock() + + if h.pendingDeletes[key] != nil { + h.pendingDeletes[key].Delete(uid) + if len(h.pendingDeletes[key]) == 0 { + delete(h.pendingDeletes, key) + } + h.pendingDeletesCond.Broadcast() + } +} + +func (h *ConsistencyHandler) WaitForList(ctx context.Context, minRV int64) error { + if err := h.waitForRV(ctx, minRV); err != nil { + return err + } + + return h.waitAllDeletes(ctx) +} + +func (h *ConsistencyHandler) WaitForGet(ctx context.Context, key client.ObjectKey, minRV int64) error { + if err := h.waitForRV(ctx, minRV); err != nil { + return err + } + + return h.waitDeletesForKey(ctx, key) +} + +// waitDeletesForKey blocks until all pending deletes at the time of calling it were observed or context times out 
+func (h *ConsistencyHandler) waitDeletesForKey(ctx context.Context, key client.ObjectKey) error { + h.pendingDeletesCond.L.Lock() + pendingDeletes := maps.Clone(h.pendingDeletes[key]) + h.pendingDeletesCond.L.Unlock() + + return h.waitDeletes(ctx, pendingDeletes) +} + +// waitAllDeletes blocks until all pending deletes at the time of calling it were observed or context times out +func (h *ConsistencyHandler) waitAllDeletes(ctx context.Context) error { + h.pendingDeletesCond.L.Lock() + pendingDeletes := sets.Set[types.UID]{} + for _, uids := range h.pendingDeletes { + maps.Copy(pendingDeletes, uids) + } + h.pendingDeletesCond.L.Unlock() + + return h.waitDeletes(ctx, pendingDeletes) +} + +func (h *ConsistencyHandler) waitDeletes(ctx context.Context, uids sets.Set[types.UID]) error { + if len(uids) == 0 { + return nil + } + + allDeleted := make(chan struct{}) + go func() { + h.pendingDeletesCond.L.Lock() + for !h.allDeletedLocked(uids) { + if ctx.Err() != nil { + break + } + h.pendingDeletesCond.Wait() + } + h.pendingDeletesCond.L.Unlock() + close(allDeleted) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-allDeleted: + return nil + } +} + +func (h *ConsistencyHandler) allDeletedLocked(uids sets.Set[types.UID]) bool { + for wantDeleted := range uids { + for _, notDeletedUIDs := range h.pendingDeletes { + if notDeletedUIDs.Has(wantDeleted) { + return false + } + } + } + + return true +} + +func (h *ConsistencyHandler) observeDeletion(obj client.Object) { + key := client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()} + h.pendingDeletesCond.L.Lock() + defer h.pendingDeletesCond.L.Unlock() + + if h.pendingDeletes[key].Has(obj.GetUID()) { + h.pendingDeletes[key].Delete(obj.GetUID()) + h.pendingDeletesCond.Broadcast() + } + if len(h.pendingDeletes[key]) == 0 { + delete(h.pendingDeletes, key) + } +} + +func (h *ConsistencyHandler) waitForRV(ctx context.Context, rv int64) error { + observed := make(chan struct{}) + go func() { + 
h.rvCond.L.Lock() + for h.observedRV < rv { + if ctx.Err() != nil { + break + } + h.rvCond.Wait() + } + h.rvCond.L.Unlock() + close(observed) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-observed: + return nil + } +} + +func (h *ConsistencyHandler) observeResourceVersion(rv string) { + parsed, err := strconv.ParseInt(rv, 10, 64) + if err != nil { + fmt.Printf("Failed to parse resource version %s: %v\n", rv, err) + return + } + + h.rvCond.L.Lock() + defer h.rvCond.L.Unlock() + + if parsed > h.observedRV { + h.observedRV = parsed + h.rvCond.Broadcast() + } + + go h.rvCleanup(parsed) +} + +func (h *ConsistencyHandler) OnAdd(raw any, _ bool) { + obj, ok := raw.(client.Object) + if !ok { + // TODO: Should never happen, log an error? + return + } + go func() { h.observeResourceVersion(obj.GetResourceVersion()) }() +} + +func (h *ConsistencyHandler) OnUpdate(_, newObj any) { + obj, ok := newObj.(client.Object) + if !ok { + // TODO: Should never happen, log an error? + return + } + go func() { h.observeResourceVersion(obj.GetResourceVersion()) }() +} + +func (h *ConsistencyHandler) OnDelete(raw any) { + var obj client.Object + switch t := raw.(type) { + case client.Object: + obj = t + case cache.DeletedFinalStateUnknown: + obj = t.Obj.(client.Object) + default: + // TODO: Should never happen, log an error? 
+ return + } + go func() { h.observeResourceVersion(obj.GetResourceVersion()) }() + go func() { h.observeDeletion(obj) }() +} diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index d7d7b0e7c2..a24380eb12 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -223,6 +223,42 @@ func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, return nil } +func (c *multiNamespaceCache) SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key client.ObjectKey, rv int64) { + if key.Namespace == "" { + if c.clusterCache != nil { + c.clusterCache.SetMinimumRVForGVKAndKey(gvk, key, rv) + } + return + } + if cache, ok := c.namespaceToCache[key.Namespace]; ok { + cache.SetMinimumRVForGVKAndKey(gvk, key, rv) + return + } + if global, ok := c.namespaceToCache[metav1.NamespaceAll]; ok { + global.SetMinimumRVForGVKAndKey(gvk, key, rv) + } +} + +func (c *multiNamespaceCache) AddRequiredDeleteForObject(obj client.Object) error { + if ns := obj.GetNamespace(); ns == "" && c.clusterCache != nil { + return c.clusterCache.AddRequiredDeleteForObject(obj) + } else if cache, ok := c.namespaceToCache[ns]; ok { + return cache.AddRequiredDeleteForObject(obj) + } + + return nil +} + +func (c *multiNamespaceCache) RemoveRequiredDeleteForObject(obj client.Object) error { + if ns := obj.GetNamespace(); ns == "" && c.clusterCache != nil { + return c.clusterCache.RemoveRequiredDeleteForObject(obj) + } else if cache, ok := c.namespaceToCache[ns]; ok { + return cache.RemoveRequiredDeleteForObject(obj) + } + + return nil +} + func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper) if err != nil { @@ -445,3 +481,12 @@ func (i *multiNamespaceInformer) IsStopped() bool { } return true } + +// LastSyncResourceVersion returns the resource version from the last 
namespace informer. +func (i *multiNamespaceInformer) LastSyncResourceVersion() string { + var rv string + for _, informer := range i.namespaceToInformer { + rv = informer.LastSyncResourceVersion() + } + return rv +} diff --git a/pkg/client/applyconfigurations.go b/pkg/client/applyconfigurations.go index 97192050f9..3a1864847d 100644 --- a/pkg/client/applyconfigurations.go +++ b/pkg/client/applyconfigurations.go @@ -39,28 +39,6 @@ func ApplyConfigurationFromUnstructured(u *unstructured.Unstructured) runtime.Ap return &unstructuredApplyConfiguration{Unstructured: u} } -type applyconfigurationRuntimeObject struct { - runtime.ApplyConfiguration -} - -func (a *applyconfigurationRuntimeObject) GetObjectKind() schema.ObjectKind { - return a -} - -func (a *applyconfigurationRuntimeObject) GroupVersionKind() schema.GroupVersionKind { - return schema.GroupVersionKind{} -} - -func (a *applyconfigurationRuntimeObject) SetGroupVersionKind(gvk schema.GroupVersionKind) {} - -func (a *applyconfigurationRuntimeObject) DeepCopyObject() runtime.Object { - panic("applyconfigurationRuntimeObject does not support DeepCopyObject") -} - -func runtimeObjectFromApplyConfiguration(ac runtime.ApplyConfiguration) runtime.Object { - return &applyconfigurationRuntimeObject{ApplyConfiguration: ac} -} - func gvkFromApplyConfiguration(ac applyConfiguration) (schema.GroupVersionKind, error) { var gvk schema.GroupVersionKind gv, err := schema.ParseGroupVersion(ptr.Deref(ac.GetAPIVersion(), "")) diff --git a/pkg/client/client.go b/pkg/client/client.go index ad946daeaa..2e8ed9dfeb 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -25,9 +25,11 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" 
"k8s.io/client-go/metadata" "k8s.io/client-go/rest" @@ -85,6 +87,8 @@ type CacheOptions struct { // read unstructured objects or lists from the cache. // If false, unstructured objects will always result in a live lookup. Unstructured bool + + ReadYourOwnWriteConsistencyEnabled bool } // NewClientFunc allows a user to define how to create a client. @@ -128,7 +132,7 @@ func New(config *rest.Config, options Options) (c Client, err error) { return c, err } -func newClient(config *rest.Config, options Options) (*client, error) { +func newClient(config *rest.Config, options Options) (Client, error) { if config == nil { return nil, fmt.Errorf("must provide non-nil rest.Config to client.New") } @@ -219,7 +223,20 @@ func newClient(config *rest.Config, options Options) (*client, error) { } c.uncachedGVKs[gvk] = struct{}{} } - return c, nil + + if !options.Cache.ReadYourOwnWriteConsistencyEnabled { + return c, nil + } + + informerCache, isCache := options.Cache.Reader.(cache) + if !isCache { + return nil, fmt.Errorf("cache reader does not implement %T, can not provide ReadYourOwnWriteConsistency", cache(nil)) + } + return &consistentClient{ + upstream: c, + cache: informerCache, + lockedKeysByGVK: threadSafeMap[schema.GroupVersionKind, *threadSafeMap[types.NamespacedName, *keyLocker]]{}, + }, nil } var _ Client = &client{} @@ -319,11 +336,17 @@ func (c *client) Update(ctx context.Context, obj Object, opts ...UpdateOption) e // Delete implements client.Client. func (c *client) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { + _, err := c.delete(ctx, obj, opts...) + return err +} + +// delete issues a delete call and returns the response or an error. The response +// gets deserialized into an unstructured and is either a metav1.Status if the object +// is gone from storage or the object if it remains, for example because of finalizers. 
+func (c *client) delete(ctx context.Context, obj Object, opts ...DeleteOption) (*unstructured.Unstructured, error) { switch obj.(type) { case runtime.Unstructured: return c.unstructuredClient.Delete(ctx, obj, opts...) - case *metav1.PartialObjectMetadata: - return c.metadataClient.Delete(ctx, obj, opts...) default: return c.typedClient.Delete(ctx, obj, opts...) } diff --git a/pkg/client/consistency.go b/pkg/client/consistency.go new file mode 100644 index 0000000000..e3c7a564a0 --- /dev/null +++ b/pkg/client/consistency.go @@ -0,0 +1,382 @@ +package client + +import ( + "context" + "errors" + "fmt" + "reflect" + "strconv" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +type cache interface { + SetMinimumRVForGVKAndKey(gvk schema.GroupVersionKind, key ObjectKey, rv int64) + AddRequiredDeleteForObject(Object) error + RemoveRequiredDeleteForObject(Object) error +} + +var _ Client = (*consistentClient)(nil) + +type consistentClient struct { + upstream *client + cache cache + + // lockedKeysByGVK maps gvk -> key -> keyLocker + lockedKeysByGVK threadSafeMap[schema.GroupVersionKind, *threadSafeMap[types.NamespacedName, *keyLocker]] +} + +func (c *consistentClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { + gvk, err := apiutil.GVKForObject(obj, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for object %T: %w", obj, err) + } + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(key) + if err := keyLock.wait(ctx); err != nil { + return err + } + + return c.upstream.Get(ctx, key, obj, opts...) 
+} + +func (c *consistentClient) List(ctx context.Context, list ObjectList, opts ...ListOption) error { + gvk, err := apiutil.GVKForObject(list, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for list %T: %w", list, err) + } + + keys := c.lockedKeysByGVK.getOrCreate(gvk).allValues() + for _, keyLock := range keys { + if err := keyLock.wait(ctx); err != nil { + return err + } + } + + return c.upstream.List(ctx, list, opts...) +} + +func (c *consistentClient) Create(ctx context.Context, obj Object, opts ...CreateOption) error { + gvk, err := apiutil.GVKForObject(obj, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for object %v: %w", obj, err) + } + + namespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(namespacedName) + if err := keyLock.lock(ctx); err != nil { + return fmt.Errorf("failed to acquire lock for %s/%s: %w", namespacedName.Namespace, namespacedName.Name, err) + } + defer keyLock.unlock() + + if err := c.upstream.Create(ctx, obj, opts...); err != nil { + return err + } + + rvRaw := obj.GetResourceVersion() + rv, err := strconv.ParseInt(rvRaw, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse resource version %s: %w", rvRaw, err) + } + c.cache.SetMinimumRVForGVKAndKey(gvk, namespacedName, rv) + + return nil +} + +func (c *consistentClient) Update(ctx context.Context, obj Object, opts ...UpdateOption) error { + gvk, err := apiutil.GVKForObject(obj, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for object %v: %w", obj, err) + } + + namespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(namespacedName) + if err := keyLock.lock(ctx); err != nil { + return fmt.Errorf("failed to acquire lock for %s/%s: %w", namespacedName.Namespace, namespacedName.Name, err) + } + 
defer keyLock.unlock() + + if err := c.upstream.Update(ctx, obj, opts...); err != nil { + return err + } + + rvRaw := obj.GetResourceVersion() + rv, err := strconv.ParseInt(rvRaw, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse resource version %s: %w", rvRaw, err) + } + c.cache.SetMinimumRVForGVKAndKey(gvk, namespacedName, rv) + + return nil +} + +func (c *consistentClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error { + gvk, err := apiutil.GVKForObject(obj, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for object %v: %w", obj, err) + } + + namespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(namespacedName) + if err := keyLock.lock(ctx); err != nil { + return fmt.Errorf("failed to acquire lock for %s/%s: %w", namespacedName.Namespace, namespacedName.Name, err) + } + defer keyLock.unlock() + + if err := c.upstream.Patch(ctx, obj, patch, opts...); err != nil { + return err + } + + rvRaw := obj.GetResourceVersion() + rv, err := strconv.ParseInt(rvRaw, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse resource version %s: %w", rvRaw, err) + } + c.cache.SetMinimumRVForGVKAndKey(gvk, namespacedName, rv) + + return nil +} + +func (c *consistentClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + var gvk schema.GroupVersionKind + var namespacedName types.NamespacedName + var getResourceVersion func() (string, error) + switch t := obj.(type) { + case *unstructuredApplyConfiguration: + gvk = t.Unstructured.GroupVersionKind() + namespacedName.Namespace = t.Unstructured.GetNamespace() + namespacedName.Name = t.Unstructured.GetName() + getResourceVersion = func() (string, error) { + return t.Unstructured.GetResourceVersion(), nil + } + case applyConfiguration: + gv, err := schema.ParseGroupVersion(*t.GetAPIVersion()) + if err != nil { + 
return fmt.Errorf("failed to parse group version %s: %w", *t.GetAPIVersion(), err) + } + gvk.Group = gv.Group + gvk.Version = gv.Version + gvk.Kind = *t.GetKind() + namespacedName.Namespace = *t.GetNamespace() + namespacedName.Name = *t.GetName() + getResourceVersion = func() (string, error) { + return resourceVersionFromApplyConfiguration(t) + } + default: + return fmt.Errorf("unsupported type for Apply: %T, must be either %T or %T", obj, &unstructuredApplyConfiguration{}, applyConfiguration(nil)) + } + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(namespacedName) + if err := keyLock.lock(ctx); err != nil { + return fmt.Errorf("failed to acquire lock for %s/%s: %w", namespacedName.Namespace, namespacedName.Name, err) + } + defer keyLock.unlock() + + if err := c.upstream.Apply(ctx, obj, opts...); err != nil { + return err + } + + rvRaw, err := getResourceVersion() + if err != nil { + return fmt.Errorf("failed to get resource version from apply configuration: %w", err) + } + rv, err := strconv.ParseInt(rvRaw, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse resource version %s: %w", rvRaw, err) + } + c.cache.SetMinimumRVForGVKAndKey(gvk, namespacedName, rv) + + return nil +} + +func resourceVersionFromApplyConfiguration(obj applyConfiguration) (string, error) { + v := reflect.ValueOf(obj) + for v.Kind() == reflect.Pointer { + v = v.Elem() + } + if v.Kind() != reflect.Struct { + return "", fmt.Errorf("expected struct, got %s", v.Kind()) + } + rv := v.FieldByName("ResourceVersion") + if !rv.IsValid() { + return "", fmt.Errorf("type %T has no ResourceVersion field", obj) + } + if rv.Kind() != reflect.Ptr || rv.Type().Elem().Kind() != reflect.String { + return "", fmt.Errorf("ResourceVersion field in %T is not *string", obj) + } + if rv.IsNil() { + return "", fmt.Errorf("ResourceVersion field in %T is nil", obj) + } + return rv.Elem().String(), nil +} + +func (c *consistentClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) 
error { + gvk, err := apiutil.GVKForObject(obj, c.upstream.Scheme()) + if err != nil { + return fmt.Errorf("failed to get GVK for object %v: %w", obj, err) + } + + namespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()} + + keyLock := c.lockedKeysByGVK.getOrCreate(gvk).getOrCreate(namespacedName) + if err := keyLock.lock(ctx); err != nil { + return fmt.Errorf("failed to acquire lock for %s/%s: %w", namespacedName.Namespace, namespacedName.Name, err) + } + defer keyLock.unlock() + + // Register the delete before we execute it, otherwise it may be in the cache + // before we register it, causing a deadlock. + if err := c.cache.AddRequiredDeleteForObject(obj); err != nil { + return fmt.Errorf("failed to add required delete for object: %w", err) + } + + response, err := c.upstream.delete(ctx, obj, opts...) + if err != nil { + if removeErr := c.cache.RemoveRequiredDeleteForObject(obj); removeErr != nil { + return errors.Join(err, fmt.Errorf("failed to remove required delete for object after delete error: %w", removeErr)) + } + return err + } + + if rvRaw := response.GetResourceVersion(); rvRaw != "" { + if err := c.cache.RemoveRequiredDeleteForObject(obj); err != nil { + return fmt.Errorf("failed to remove required delete for object after successful delete: %w", err) + } + rv, err := strconv.ParseInt(rvRaw, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse resource version %s: %w", rvRaw, err) + } + c.cache.SetMinimumRVForGVKAndKey(gvk, namespacedName, rv) + } + + return nil +} + +func (c *consistentClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error { + return errors.New("DeleteAllOf is not supported by consistentClient, please use List and Delete instead") +} + +func (c *consistentClient) Status() SubResourceWriter { + return c.SubResource("status") +} + +func (c *consistentClient) SubResource(subResource string) SubResourceClient { + panic("not implemented") +} + +func (c 
*consistentClient) Scheme() *runtime.Scheme { + return c.upstream.Scheme() +} + +func (c *consistentClient) RESTMapper() meta.RESTMapper { + return c.upstream.RESTMapper() +} + +func (c *consistentClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { + return c.upstream.GroupVersionKindFor(obj) +} + +func (c *consistentClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { + return c.upstream.IsObjectNamespaced(obj) +} + +// keyLocker implements a mutex with context support +// that also allows callers to wait for the current lock to +// be released. +// TODO: find a better name +type keyLocker struct { + // mutex must be held to access done + mutex sync.Mutex + // done is nil when no one is holding the lock + done chan struct{} +} + +func (l *keyLocker) lock(ctx context.Context) error { + for { + l.mutex.Lock() + if l.done == nil { + l.done = make(chan struct{}) + l.mutex.Unlock() + return nil + } + + done := l.done + l.mutex.Unlock() + select { + case <-done: // released, try acquire + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (l *keyLocker) unlock() { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.done == nil { + panic("unlock of unlocked mutex") + } + close(l.done) + l.done = nil +} + +// wait waits for the current lock holder, if any, to +// release the lock. 
+func (l *keyLocker) wait(ctx context.Context) error { + l.mutex.Lock() + done := l.done + l.mutex.Unlock() + + if done == nil { + return nil + } + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +type threadSafeMap[k comparable, v any] struct { + lock sync.Mutex + data map[k]v +} + +func (t *threadSafeMap[k, v]) getOrCreate(key k) v { + t.lock.Lock() + defer t.lock.Unlock() + + val, exists := t.data[key] + if !exists { + if t.data == nil { + t.data = make(map[k]v) + } + val = reflect.New(reflect.TypeOf(val).Elem()).Interface().(v) + t.data[key] = val + } + + return val +} + +func (t *threadSafeMap[k, v]) allValues() []v { + t.lock.Lock() + defer t.lock.Unlock() + + result := make([]v, 0, len(t.data)) + for _, val := range t.data { + result = append(result, val) + } + + return result +} diff --git a/pkg/client/consistency_envtest_test.go b/pkg/client/consistency_envtest_test.go new file mode 100644 index 0000000000..17ffc0f851 --- /dev/null +++ b/pkg/client/consistency_envtest_test.go @@ -0,0 +1,257 @@ +package client_test + +import ( + "context" + "fmt" + "sync/atomic" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + kscheme "k8s.io/client-go/kubernetes/scheme" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("ConsistentClient", func() { + var ( + cl client.Client + ctx context.Context + cancel context.CancelFunc + counter uint64 + ) + + BeforeEach(func(specCtx context.Context) { + ctx, cancel = context.WithCancel(specCtx) + + c, err := cache.New(cfg, cache.Options{Scheme: kscheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + + // Set up informers for types used through the consistent client. 
+ _, err = c.GetInformer(ctx, &corev1.Namespace{}) + Expect(err).NotTo(HaveOccurred()) + _, err = c.GetInformer(ctx, &corev1.ConfigMap{}) + Expect(err).NotTo(HaveOccurred()) + + go func() { + defer GinkgoRecover() + Expect(c.Start(ctx)).To(Succeed()) + }() + Expect(c.WaitForCacheSync(ctx)).To(BeTrue()) + + cl, err = client.New(cfg, client.Options{ + Scheme: kscheme.Scheme, + Cache: &client.CacheOptions{ + Reader: c, + ReadYourOwnWriteConsistencyEnabled: true, + }, + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + cancel() + }) + + newConfigMap := func(ns string) *corev1.ConfigMap { + n := atomic.AddUint64(&counter, 1) + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("consistency-test-%d", n), + Namespace: ns, + }, + Data: map[string]string{"key": "value"}, + } + } + + type writeResult struct { + name string + deleted bool + data map[string]string + } + + DescribeTable("write then read", + func(ctx context.Context, write func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error)) { + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("consistency-wtr-%d", atomic.AddUint64(&counter, 1))}} + Expect(cl.Create(ctx, ns)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(client.IgnoreNotFound(cl.Delete(ctx, ns))).To(Succeed()) + }) + + cm := newConfigMap(ns.Name) + Expect(cl.Create(ctx, cm)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(client.IgnoreNotFound(cl.Delete(ctx, cm))).To(Succeed()) + }) + + result, err := write(ctx, cl, cm) + Expect(err).NotTo(HaveOccurred()) + + done := make(chan struct{}) + + go func() { + defer GinkgoRecover() + defer func() { done <- struct{}{} }() + + if result.deleted { + err := cl.Get(ctx, client.ObjectKeyFromObject(cm), &corev1.ConfigMap{}) + Expect(apierrors.IsNotFound(err)).To(BeTrue(), "expected NotFound after delete, got: %v", err) + } else { + got := &corev1.ConfigMap{} + Expect(cl.Get(ctx, 
client.ObjectKeyFromObject(cm), got)).To(Succeed()) + Expect(got.Name).To(Equal(result.name)) + Expect(got.Data).To(Equal(result.data)) + } + }() + + go func() { + defer GinkgoRecover() + defer func() { done <- struct{}{} }() + + list := &corev1.ConfigMapList{} + Expect(cl.List(ctx, list, client.InNamespace(ns.Name))).To(Succeed()) + + if result.deleted { + Expect(list.Items).To(BeEmpty(), "list should be empty after delete") + } else { + Expect(list.Items).To(HaveLen(1), "list should contain exactly one ConfigMap") + Expect(list.Items[0].Name).To(Equal(result.name)) + Expect(list.Items[0].Data).To(Equal(result.data)) + } + }() + + <-done + <-done + }, + + Entry("create", func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error) { + return writeResult{ + name: cm.Name, + data: cm.Data, + }, nil // already created in the setup + }), + + Entry("update", func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error) { + got := &corev1.ConfigMap{} + if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), got); err != nil { + return writeResult{}, err + } + got.Data["key"] = "updated" + if err := cl.Update(ctx, got); err != nil { + return writeResult{}, err + } + return writeResult{ + name: cm.Name, + data: map[string]string{"key": "updated"}, + }, nil + }), + + Entry("patch", func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error) { + got := &corev1.ConfigMap{} + if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), got); err != nil { + return writeResult{}, err + } + patch := client.MergeFrom(got.DeepCopy()) + got.Data["patched"] = "yes" + if err := cl.Patch(ctx, got, patch); err != nil { + return writeResult{}, err + } + return writeResult{ + name: cm.Name, + data: map[string]string{"key": "value", "patched": "yes"}, + }, nil + }), + + Entry("apply", func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error) { + ac := corev1ac.ConfigMap(cm.Name, 
cm.Namespace). + WithData(map[string]string{"key": "applied"}) + if err := cl.Apply(ctx, ac, client.FieldOwner("consistency-test"), client.ForceOwnership); err != nil { + return writeResult{}, err + } + return writeResult{ + name: cm.Name, + data: map[string]string{"key": "applied"}, + }, nil + }), + + Entry("delete", func(ctx context.Context, cl client.Client, cm *corev1.ConfigMap) (writeResult, error) { + if err := cl.Delete(ctx, cm); err != nil { + return writeResult{}, err + } + return writeResult{ + name: cm.Name, + deleted: true, + }, nil + }), + ) + + Describe("Multiple sequential writes then Get", func() { + It("should observe the final state after multiple updates", func() { + cm := newConfigMap("default") + Expect(cl.Create(ctx, cm)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(client.IgnoreNotFound(cl.Delete(ctx, cm))).To(Succeed()) + }) + + for i := range 5 { + got := &corev1.ConfigMap{} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(cm), got)).To(Succeed()) + got.Data["key"] = fmt.Sprintf("iteration-%d", i) + Expect(cl.Update(ctx, got)).To(Succeed()) + } + + final := &corev1.ConfigMap{} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(cm), final)).To(Succeed()) + Expect(final.Data["key"]).To(Equal("iteration-4")) + }) + }) + + Describe("Create with namespace-scoped object", func() { + It("should work across different namespaces", func() { + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("consistency-ns-%d", atomic.AddUint64(&counter, 1))}} + Expect(cl.Create(ctx, ns)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(client.IgnoreNotFound(cl.Delete(ctx, ns))).To(Succeed()) + }) + + cm := newConfigMap(ns.Name) + Expect(cl.Create(ctx, cm)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + Expect(client.IgnoreNotFound(cl.Delete(ctx, cm))).To(Succeed()) + }) + + got := &corev1.ConfigMap{} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(cm), got)).To(Succeed()) + 
Expect(got.Name).To(Equal(cm.Name)) + Expect(got.Namespace).To(Equal(ns.Name)) + }) + }) + + Describe("Delete object with finalizer then Get", func() { + It("should observe the updated object with deletion timestamp after delete", func() { + cm := newConfigMap("default") + cm.Finalizers = []string{"test.io/hold"} + Expect(cl.Create(ctx, cm)).To(Succeed()) + DeferCleanup(func(ctx context.Context) { + got := &corev1.ConfigMap{} + if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), got); err == nil { + got.Finalizers = nil + Expect(cl.Update(ctx, got)).To(Succeed()) + } + }) + + got := &corev1.ConfigMap{} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(cm), got)).To(Succeed()) + + Expect(cl.Delete(ctx, cm)).To(Succeed()) + + afterDelete := &corev1.ConfigMap{} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(cm), afterDelete)).To(Succeed()) + Expect(afterDelete.DeletionTimestamp).NotTo(BeNil(), "should have a deletion timestamp") + }) + }) +}) diff --git a/pkg/client/typed_client.go b/pkg/client/typed_client.go index 66ae2e4a5c..b84b4ff1d2 100644 --- a/pkg/client/typed_client.go +++ b/pkg/client/typed_client.go @@ -18,14 +18,15 @@ package client import ( "context" + "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/util/apply" ) var _ Reader = &typedClient{} -var _ Writer = &typedClient{} type typedClient struct { resources *clientRestResources @@ -72,22 +73,34 @@ func (c *typedClient) Update(ctx context.Context, obj Object, opts ...UpdateOpti } // Delete implements client.Client. -func (c *typedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { +func (c *typedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) (*unstructured.Unstructured, error) { o, err := c.resources.getObjMeta(obj) if err != nil { - return err + return nil, err } deleteOpts := DeleteOptions{} deleteOpts.ApplyOptions(opts) - return o.Delete(). + runtimeObj, err := o.Delete(). 
NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). Name(o.name). Body(deleteOpts.AsDeleteOptions()). Do(ctx). - Error() + Get() + if err != nil { + return nil, err + } + data, err := json.Marshal(runtimeObj) + if err != nil { + return nil, fmt.Errorf("failed to marshal delete response: %w", err) + } + response := &unstructured.Unstructured{} + if err := json.Unmarshal(data, &response.Object); err != nil { + return nil, fmt.Errorf("failed to unmarshal delete response: %w", err) + } + return response, nil } // DeleteAllOf implements client.Client. @@ -146,17 +159,21 @@ func (c *typedClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, applyOpts := &ApplyOptions{} applyOpts.ApplyOptions(opts) - return req. + into := &unstructured.Unstructured{} + if err := req. NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). Name(o.name). VersionedParams(applyOpts.AsPatchOptions(), c.paramCodec). Do(ctx). - // This is hacky, it is required because `Into` takes a `runtime.Object` and - // that is not implemented by the ApplyConfigurations. The generated clients - // don't have this problem because they deserialize into the api type, not the - // apply configuration: https://github.com/kubernetes/kubernetes/blob/22f5e01a37c0bc6a5f494dec14dd4e3688ee1d55/staging/src/k8s.io/client-go/gentype/type.go#L296-L317 - Into(runtimeObjectFromApplyConfiguration(obj)) + Into(into); err != nil { + return err + } + raw, err := json.Marshal(into) + if err != nil { + return err + } + return json.Unmarshal(raw, &obj) } // Get implements client.Client. @@ -324,16 +341,20 @@ func (c *typedClient) ApplySubResource(ctx context.Context, obj runtime.ApplyCon return fmt.Errorf("failed to create apply request: %w", err) } - return req. + into := &unstructured.Unstructured{} + if err := req. NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). Name(o.name). SubResource(subResource). 
VersionedParams(applyOpts.AsPatchOptions(), c.paramCodec). Do(ctx). - // This is hacky, it is required because `Into` takes a `runtime.Object` and - // that is not implemented by the ApplyConfigurations. The generated clients - // don't have this problem because they deserialize into the api type, not the - // apply configuration: https://github.com/kubernetes/kubernetes/blob/22f5e01a37c0bc6a5f494dec14dd4e3688ee1d55/staging/src/k8s.io/client-go/gentype/type.go#L296-L317 - Into(runtimeObjectFromApplyConfiguration(obj)) + Into(into); err != nil { + return err + } + raw, err := json.Marshal(into) + if err != nil { + return err + } + return json.Unmarshal(raw, &obj) } diff --git a/pkg/client/unstructured_client.go b/pkg/client/unstructured_client.go index d2ea6d7a32..31e935e557 100644 --- a/pkg/client/unstructured_client.go +++ b/pkg/client/unstructured_client.go @@ -21,12 +21,12 @@ import ( "fmt" "strings" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/util/apply" ) var _ Reader = &unstructuredClient{} -var _ Writer = &unstructuredClient{} type unstructuredClient struct { resources *clientRestResources @@ -93,26 +93,23 @@ func (uc *unstructuredClient) Update(ctx context.Context, obj Object, opts ...Up } // Delete implements client.Client. -func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { - if _, ok := obj.(runtime.Unstructured); !ok { - return fmt.Errorf("unstructured client did not understand object: %T", obj) - } - +func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) (*unstructured.Unstructured, error) { o, err := uc.resources.getObjMeta(obj) if err != nil { - return err + return nil, err } deleteOpts := DeleteOptions{} deleteOpts.ApplyOptions(opts) - return o.Delete(). + response := &unstructured.Unstructured{} + return response, o.Delete(). NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). 
Name(o.name). Body(deleteOpts.AsDeleteOptions()). Do(ctx). - Error() + Into(response) } // DeleteAllOf implements client.Client. diff --git a/pkg/client/watch.go b/pkg/client/watch.go index 181b22a673..d9d26ab1be 100644 --- a/pkg/client/watch.go +++ b/pkg/client/watch.go @@ -28,11 +28,11 @@ import ( // NewWithWatch returns a new WithWatch. func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) { - client, err := newClient(config, options) + c, err := newClient(config, options) if err != nil { return nil, err } - return &watchingClient{client: client}, nil + return &watchingClient{client: c.(*client)}, nil // TODO: This will panic if consistency is enabled } type watchingClient struct {