Skip to content

Commit 61a6731

Browse files
authored
[SANDBOX-1788] refactor the resource cache out of the OwnerFetcher (#525)
1 parent 1fd670b commit 61a6731

4 files changed

Lines changed: 371 additions & 111 deletions

File tree

pkg/client/resource_cache.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/apimachinery/pkg/runtime/schema"
9+
"k8s.io/client-go/discovery"
10+
)
11+
12+
type ResourceCache struct {
13+
mutex sync.Mutex // guard the initialization ofthe resourceLists
14+
resourceLists []*metav1.APIResourceList // All available API in the cluster
15+
discoveryClient discovery.ServerResourcesInterface
16+
}
17+
18+
// NewResourceCache creates a new ResourceCache with the provided discovery client.
19+
// The discovery client is used to fetch available API resources.
20+
func NewResourceCache(discoveryClient discovery.ServerResourcesInterface) *ResourceCache {
21+
return &ResourceCache{
22+
discoveryClient: discoveryClient,
23+
}
24+
}
25+
26+
// GVRForKind returns a group-resource-version for the supplied kind and api version.
27+
func (rc *ResourceCache) GVRForKind(kind, apiVersion string) (gvr schema.GroupVersionResource, found bool, namespaced bool, err error) {
28+
if err = rc.ensureResourceList(); err != nil {
29+
return
30+
}
31+
32+
// Parse the group and version from the APIVersion (e.g., "apps/v1" -> group: "apps", version: "v1")
33+
var gv schema.GroupVersion
34+
gv, err = schema.ParseGroupVersion(apiVersion)
35+
if err != nil {
36+
err = fmt.Errorf("failed to parse APIVersion %s: %w", apiVersion, err)
37+
return
38+
}
39+
40+
// Look for a matching resource
41+
for _, resourceList := range rc.resourceLists {
42+
if resourceList.GroupVersion == apiVersion {
43+
for _, apiResource := range resourceList.APIResources {
44+
if apiResource.Kind == kind {
45+
// Construct the GVR
46+
found = true
47+
gvr = schema.GroupVersionResource{
48+
Group: gv.Group,
49+
Version: gv.Version,
50+
Resource: apiResource.Name,
51+
}
52+
namespaced = apiResource.Namespaced
53+
54+
return
55+
}
56+
}
57+
}
58+
}
59+
60+
return
61+
}
62+
63+
// GVKForGR given the group-resource, returns the first matching GVK for it.
64+
func (rc *ResourceCache) GVKForGR(gr schema.GroupResource) (gvk schema.GroupVersionKind, found bool, err error) {
65+
if err = rc.ensureResourceList(); err != nil {
66+
return
67+
}
68+
69+
for _, resourceList := range rc.resourceLists {
70+
var gv schema.GroupVersion
71+
gv, err = schema.ParseGroupVersion(resourceList.GroupVersion)
72+
if err != nil {
73+
err = fmt.Errorf("failed to parse GroupVersion %s: %w", resourceList.GroupVersion, err)
74+
return
75+
}
76+
if gv.Group != gr.Group {
77+
continue
78+
}
79+
for _, res := range resourceList.APIResources {
80+
if res.Name == gr.Resource {
81+
gvk = schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: res.Kind}
82+
found = true
83+
return
84+
}
85+
}
86+
}
87+
88+
return
89+
}
90+
91+
func (rc *ResourceCache) ensureResourceList() error {
92+
rc.mutex.Lock()
93+
defer rc.mutex.Unlock()
94+
95+
if rc.resourceLists == nil {
96+
// Get all API resources from the cluster using the discovery client. We need it for constructing GVRs for unstructured objects.
97+
// Do it here once, so we do not have to list it multiple times before listing/getting every unstructured resource.
98+
//
99+
// The ServerPreferredResources() method is meant to return partial results on failure.
100+
// We ignore them here for the sake of simplicity. Let's just retry to get the full results the next time.
101+
resourceList, err := rc.discoveryClient.ServerPreferredResources()
102+
if err != nil {
103+
return err
104+
}
105+
106+
rc.resourceLists = resourceList
107+
}
108+
109+
return nil
110+
}

pkg/client/resource_cache_test.go

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package client
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"sync/atomic"
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
)
14+
15+
type fakeDiscoveryClient struct {
16+
resources []*metav1.APIResourceList
17+
err error
18+
calls atomic.Int32
19+
}
20+
21+
func (f *fakeDiscoveryClient) ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error) {
22+
return nil, nil
23+
}
24+
25+
func (f *fakeDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
26+
return nil, nil, nil
27+
}
28+
29+
func (f *fakeDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
30+
f.calls.Add(1)
31+
return f.resources, f.err
32+
}
33+
34+
func (f *fakeDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
35+
return nil, nil
36+
}
37+
38+
func TestGVRForKind(t *testing.T) {
39+
resources := []*metav1.APIResourceList{
40+
{
41+
GroupVersion: "v1",
42+
APIResources: []metav1.APIResource{
43+
{Name: "pods", Kind: "Pod", Namespaced: true},
44+
{Name: "nodes", Kind: "Node", Namespaced: false},
45+
},
46+
},
47+
{
48+
GroupVersion: "apps/v1",
49+
APIResources: []metav1.APIResource{
50+
{Name: "deployments", Kind: "Deployment", Namespaced: true},
51+
{Name: "daemonsets", Kind: "DaemonSet", Namespaced: true},
52+
},
53+
},
54+
}
55+
56+
t.Run("finds namespaced core resource", func(t *testing.T) {
57+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
58+
59+
gvr, found, namespaced, err := rc.GVRForKind("Pod", "v1")
60+
61+
require.NoError(t, err)
62+
assert.True(t, found)
63+
assert.True(t, namespaced)
64+
assert.Equal(t, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}, gvr)
65+
})
66+
67+
t.Run("finds cluster-scoped core resource", func(t *testing.T) {
68+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
69+
70+
gvr, found, namespaced, err := rc.GVRForKind("Node", "v1")
71+
72+
require.NoError(t, err)
73+
assert.True(t, found)
74+
assert.False(t, namespaced)
75+
assert.Equal(t, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}, gvr)
76+
})
77+
78+
t.Run("finds resource in non-core group", func(t *testing.T) {
79+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
80+
81+
gvr, found, namespaced, err := rc.GVRForKind("Deployment", "apps/v1")
82+
83+
require.NoError(t, err)
84+
assert.True(t, found)
85+
assert.True(t, namespaced)
86+
assert.Equal(t, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, gvr)
87+
})
88+
89+
t.Run("not found", func(t *testing.T) {
90+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
91+
92+
_, found, _, err := rc.GVRForKind("StatefulSet", "apps/v1")
93+
94+
require.NoError(t, err)
95+
assert.False(t, found)
96+
assert.NoError(t, err)
97+
})
98+
99+
t.Run("not found in wrong api version", func(t *testing.T) {
100+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
101+
102+
_, found, _, err := rc.GVRForKind("Deployment", "apps/v1beta1")
103+
104+
require.NoError(t, err)
105+
assert.False(t, found)
106+
assert.NoError(t, err)
107+
})
108+
109+
t.Run("invalid api version", func(t *testing.T) {
110+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
111+
112+
_, _, _, err := rc.GVRForKind("Pod", "a/b/c")
113+
114+
require.Error(t, err)
115+
assert.Contains(t, err.Error(), "failed to parse APIVersion")
116+
})
117+
118+
t.Run("discovery error", func(t *testing.T) {
119+
rc := NewResourceCache(&fakeDiscoveryClient{err: errors.New("discovery failed")})
120+
121+
_, _, _, err := rc.GVRForKind("Pod", "v1")
122+
123+
require.EqualError(t, err, "discovery failed")
124+
})
125+
126+
t.Run("caches discovery results", func(t *testing.T) {
127+
dc := &fakeDiscoveryClient{resources: resources}
128+
rc := NewResourceCache(dc)
129+
130+
_, _, _, err := rc.GVRForKind("Pod", "v1")
131+
require.NoError(t, err)
132+
133+
_, _, _, err = rc.GVRForKind("Deployment", "apps/v1")
134+
require.NoError(t, err)
135+
136+
assert.Equal(t, int32(1), dc.calls.Load(), "discovery should have been called only once")
137+
})
138+
}
139+
140+
func TestGVKForGR(t *testing.T) {
141+
resources := []*metav1.APIResourceList{
142+
{
143+
GroupVersion: "v1",
144+
APIResources: []metav1.APIResource{
145+
{Name: "pods", Kind: "Pod", Namespaced: true},
146+
{Name: "services", Kind: "Service", Namespaced: true},
147+
},
148+
},
149+
{
150+
GroupVersion: "apps/v1",
151+
APIResources: []metav1.APIResource{
152+
{Name: "deployments", Kind: "Deployment", Namespaced: true},
153+
{Name: "replicasets", Kind: "ReplicaSet", Namespaced: true},
154+
},
155+
},
156+
{
157+
GroupVersion: "kubevirt.io/v1",
158+
APIResources: []metav1.APIResource{
159+
{Name: "virtualmachines", Kind: "VirtualMachine", Namespaced: true},
160+
},
161+
},
162+
}
163+
164+
t.Run("finds core group resource", func(t *testing.T) {
165+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
166+
167+
gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "", Resource: "pods"})
168+
169+
require.NoError(t, err)
170+
assert.True(t, found)
171+
assert.Equal(t, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, gvk)
172+
})
173+
174+
t.Run("finds non-core group resource", func(t *testing.T) {
175+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
176+
177+
gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "deployments"})
178+
179+
require.NoError(t, err)
180+
assert.True(t, found)
181+
assert.Equal(t, schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, gvk)
182+
})
183+
184+
t.Run("finds resource with dotted group", func(t *testing.T) {
185+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
186+
187+
gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "kubevirt.io", Resource: "virtualmachines"})
188+
189+
require.NoError(t, err)
190+
assert.True(t, found)
191+
assert.Equal(t, schema.GroupVersionKind{Group: "kubevirt.io", Version: "v1", Kind: "VirtualMachine"}, gvk)
192+
})
193+
194+
t.Run("not found", func(t *testing.T) {
195+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
196+
197+
_, found, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "statefulsets"})
198+
199+
require.NoError(t, err)
200+
assert.False(t, found)
201+
assert.NoError(t, err)
202+
})
203+
204+
t.Run("wrong group", func(t *testing.T) {
205+
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})
206+
207+
_, found, err := rc.GVKForGR(schema.GroupResource{Group: "extensions", Resource: "deployments"})
208+
209+
require.NoError(t, err)
210+
assert.False(t, found)
211+
assert.NoError(t, err)
212+
})
213+
214+
t.Run("discovery error", func(t *testing.T) {
215+
rc := NewResourceCache(&fakeDiscoveryClient{err: errors.New("discovery failed")})
216+
217+
_, _, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "deployments"})
218+
219+
require.EqualError(t, err, "discovery failed")
220+
})
221+
}
222+
223+
func TestThreadSafety(t *testing.T) {
224+
// given
225+
cl := &fakeDiscoveryClient{resources: []*metav1.APIResourceList{}}
226+
rc := NewResourceCache(cl)
227+
228+
// when
229+
t.Run("wrap", func(t *testing.T) { // wrap so that the parallel inner tests complete before we check in the then.
230+
for i := range 20 {
231+
t.Run(fmt.Sprintf("concurrent-%d", i), func(t *testing.T) {
232+
t.Parallel()
233+
_, _, _, err := rc.GVRForKind("SomeKind", "v1")
234+
assert.NoError(t, err)
235+
})
236+
}
237+
})
238+
239+
// then
240+
assert.Equal(t, int32(1), cl.calls.Load())
241+
}

0 commit comments

Comments
 (0)