Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions pkg/client/resource_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package client

import (
"fmt"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
)

type ResourceCache struct {
mutex sync.Mutex // guard the initialization ofthe resourceLists
resourceLists []*metav1.APIResourceList // All available API in the cluster
discoveryClient discovery.ServerResourcesInterface
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// NewResourceCache creates a new ResourceCache with the provided discovery client.
// The discovery client is used to fetch available API resources.
func NewResourceCache(discoveryClient discovery.ServerResourcesInterface) *ResourceCache {
return &ResourceCache{
discoveryClient: discoveryClient,
}
}

// GVRForKind returns a group-resource-version for the supplied kind and api version.
func (rc *ResourceCache) GVRForKind(kind, apiVersion string) (gvr schema.GroupVersionResource, found bool, namespaced bool, err error) {
if err = rc.ensureResourceList(); err != nil {
return
}

// Parse the group and version from the APIVersion (e.g., "apps/v1" -> group: "apps", version: "v1")
var gv schema.GroupVersion
gv, err = schema.ParseGroupVersion(apiVersion)
if err != nil {
err = fmt.Errorf("failed to parse APIVersion %s: %w", apiVersion, err)
return
}

// Look for a matching resource
for _, resourceList := range rc.resourceLists {
if resourceList.GroupVersion == apiVersion {
for _, apiResource := range resourceList.APIResources {
if apiResource.Kind == kind {
// Construct the GVR
found = true
gvr = schema.GroupVersionResource{
Group: gv.Group,
Version: gv.Version,
Resource: apiResource.Name,
}
namespaced = apiResource.Namespaced

return
}
}
}
}

return
}

// GVKForGR given the group-resource, returns the first matching GVK for it.
func (rc *ResourceCache) GVKForGR(gr schema.GroupResource) (gvk schema.GroupVersionKind, found bool, err error) {
if err = rc.ensureResourceList(); err != nil {
return
}

for _, resourceList := range rc.resourceLists {
var gv schema.GroupVersion
gv, err = schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
err = fmt.Errorf("failed to parse GroupVersion %s: %w", resourceList.GroupVersion, err)
return
}
if gv.Group != gr.Group {
continue
}
for _, res := range resourceList.APIResources {
if res.Name == gr.Resource {
gvk = schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: res.Kind}
found = true
return
}
}
}

return
}

func (rc *ResourceCache) ensureResourceList() error {
rc.mutex.Lock()
defer rc.mutex.Unlock()
Comment thread
metlos marked this conversation as resolved.

if rc.resourceLists == nil {
// Get all API resources from the cluster using the discovery client. We need it for constructing GVRs for unstructured objects.
// Do it here once, so we do not have to list it multiple times before listing/getting every unstructured resource.
//
// The ServerPreferredResources() method is meant to return partial results on failure.
// We ignore them here for the sake of simplicity. Let's just retry to get the full results the next time.
resourceList, err := rc.discoveryClient.ServerPreferredResources()
if err != nil {
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

rc.resourceLists = resourceList
}

return nil
}
241 changes: 241 additions & 0 deletions pkg/client/resource_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package client

import (
"errors"
"fmt"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type fakeDiscoveryClient struct {
resources []*metav1.APIResourceList
err error
calls atomic.Int32
}

func (f *fakeDiscoveryClient) ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error) {
return nil, nil
}

func (f *fakeDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return nil, nil, nil
}

func (f *fakeDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
f.calls.Add(1)
return f.resources, f.err
}

func (f *fakeDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}

func TestGVRForKind(t *testing.T) {
resources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Kind: "Pod", Namespaced: true},
{Name: "nodes", Kind: "Node", Namespaced: false},
},
},
{
GroupVersion: "apps/v1",
APIResources: []metav1.APIResource{
{Name: "deployments", Kind: "Deployment", Namespaced: true},
{Name: "daemonsets", Kind: "DaemonSet", Namespaced: true},
},
},
}

t.Run("finds namespaced core resource", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvr, found, namespaced, err := rc.GVRForKind("Pod", "v1")

require.NoError(t, err)
assert.True(t, found)
assert.True(t, namespaced)
assert.Equal(t, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}, gvr)
})

t.Run("finds cluster-scoped core resource", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvr, found, namespaced, err := rc.GVRForKind("Node", "v1")

require.NoError(t, err)
assert.True(t, found)
assert.False(t, namespaced)
assert.Equal(t, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}, gvr)
})

t.Run("finds resource in non-core group", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvr, found, namespaced, err := rc.GVRForKind("Deployment", "apps/v1")

require.NoError(t, err)
assert.True(t, found)
assert.True(t, namespaced)
assert.Equal(t, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, gvr)
})

t.Run("not found", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

_, found, _, err := rc.GVRForKind("StatefulSet", "apps/v1")

require.NoError(t, err)
assert.False(t, found)
assert.NoError(t, err)
})

t.Run("not found in wrong api version", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

_, found, _, err := rc.GVRForKind("Deployment", "apps/v1beta1")

require.NoError(t, err)
assert.False(t, found)
assert.NoError(t, err)
})

t.Run("invalid api version", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

_, _, _, err := rc.GVRForKind("Pod", "a/b/c")

require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse APIVersion")
})

t.Run("discovery error", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{err: errors.New("discovery failed")})

_, _, _, err := rc.GVRForKind("Pod", "v1")

require.EqualError(t, err, "discovery failed")
})

t.Run("caches discovery results", func(t *testing.T) {
dc := &fakeDiscoveryClient{resources: resources}
rc := NewResourceCache(dc)

_, _, _, err := rc.GVRForKind("Pod", "v1")
require.NoError(t, err)

_, _, _, err = rc.GVRForKind("Deployment", "apps/v1")
require.NoError(t, err)

assert.Equal(t, int32(1), dc.calls.Load(), "discovery should have been called only once")
})
}

func TestGVKForGR(t *testing.T) {
resources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Kind: "Pod", Namespaced: true},
{Name: "services", Kind: "Service", Namespaced: true},
},
},
{
GroupVersion: "apps/v1",
APIResources: []metav1.APIResource{
{Name: "deployments", Kind: "Deployment", Namespaced: true},
{Name: "replicasets", Kind: "ReplicaSet", Namespaced: true},
},
},
{
GroupVersion: "kubevirt.io/v1",
APIResources: []metav1.APIResource{
{Name: "virtualmachines", Kind: "VirtualMachine", Namespaced: true},
},
},
}

t.Run("finds core group resource", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "", Resource: "pods"})

require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, gvk)
})

t.Run("finds non-core group resource", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "deployments"})

require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, gvk)
})

t.Run("finds resource with dotted group", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

gvk, found, err := rc.GVKForGR(schema.GroupResource{Group: "kubevirt.io", Resource: "virtualmachines"})

require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, schema.GroupVersionKind{Group: "kubevirt.io", Version: "v1", Kind: "VirtualMachine"}, gvk)
})

t.Run("not found", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

_, found, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "statefulsets"})

require.NoError(t, err)
assert.False(t, found)
assert.NoError(t, err)
})

t.Run("wrong group", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{resources: resources})

_, found, err := rc.GVKForGR(schema.GroupResource{Group: "extensions", Resource: "deployments"})

require.NoError(t, err)
assert.False(t, found)
assert.NoError(t, err)
})

t.Run("discovery error", func(t *testing.T) {
rc := NewResourceCache(&fakeDiscoveryClient{err: errors.New("discovery failed")})

_, _, err := rc.GVKForGR(schema.GroupResource{Group: "apps", Resource: "deployments"})

require.EqualError(t, err, "discovery failed")
})
}

func TestThreadSafety(t *testing.T) {
// given
cl := &fakeDiscoveryClient{resources: []*metav1.APIResourceList{}}
rc := NewResourceCache(cl)

// when
t.Run("wrap", func(t *testing.T) { // wrap so that the parallel inner tests complete before we check in the then.
for i := range 20 {
t.Run(fmt.Sprintf("concurrent-%d", i), func(t *testing.T) {
t.Parallel()
_, _, _, err := rc.GVRForKind("SomeKind", "v1")
assert.NoError(t, err)
})
}
})

// then
assert.Equal(t, int32(1), cl.calls.Load())
}
Loading
Loading