Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions resourcetopo/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ type nodeInfo struct {
// will be updated to true after the object added to cache or the manager is of virtual type.
objectExisted bool

relationsLock sync.RWMutex
relations []ResourceRelation
// always call relationsLock.Lock() before relation change, so when locked, the relation is stable.
// kind like a lock.RLock() in relation change scenario.
relationsLock sync.RWMutex
labelRelations []ResourceRelation
}

func newNode(s *nodeStorage, cluster, namespace, name string) *nodeInfo {
Expand Down Expand Up @@ -129,10 +131,10 @@ func (node *nodeInfo) checkLabelUpdateForPostNode(postNode *nodeInfo) {

node.relationsLock.RLock()
defer node.relationsLock.RUnlock()
if len(node.relations) == 0 {
if len(node.labelRelations) == 0 {
return
}
for _, relation := range node.relations {
for _, relation := range node.labelRelations {
if !typeEqual(relation.PostMeta, postNode.storageRef.meta) {
return
}
Expand Down Expand Up @@ -181,13 +183,10 @@ func (n *nodeInfo) updateNodeMeta(obj Object) {
func (n *nodeInfo) objectDeleted() {
n.metaLock.Lock()
defer n.metaLock.Unlock()
n.relationsLock.Lock()
defer n.relationsLock.Unlock()

n.objectExisted = false
n.labels = nil
n.ownerNodes = nil
n.relations = nil
}

func (n *nodeInfo) matched(selector labels.Selector) bool {
Expand Down
7 changes: 7 additions & 0 deletions resourcetopo/node_relation_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func deleteAllRelation(node *nodeInfo) {
// we always lock preOrder before postOrder, expect in this function.
// but to keep transition atomic, and in object deletion scenario,
// this is okay to hold the lock in whole process, and lock preOrderNode as needed.
// node.metaLock.Lock()
// defer node.metaLock.Unlock()
node.relationsLock.Lock()
node.objectDeleted()
node.labelRelations = nil
defer node.relationsLock.Unlock()

node.lock.Lock()
defer node.lock.Unlock()

Expand Down
128 changes: 96 additions & 32 deletions resourcetopo/node_topology_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"k8s.io/klog/v2"
)

type nodeMeta struct {
apiVersion, kind string
cluster, namespace, name string
}

var _ cache.ResourceEventHandler = &nodeStorage{}

func (s *nodeStorage) OnAdd(obj interface{}) {
Expand Down Expand Up @@ -63,23 +68,40 @@ func (s *nodeStorage) OnUpdate(oldObj, newObj interface{}) {
return
}

var resolvedRelations []ResourceRelation
var resolvedLabelRelations []ResourceRelation
resolvedDirectRelations := make(map[nodeMeta]interface{})
for _, resolver := range s.resolvers {
relations := resolver.Resolve(newTopoObj)
resolvedRelations = append(resolvedRelations, relations...)
for _, relation := range relations {
if relation.LabelSelector != nil {
resolvedLabelRelations = append(resolvedLabelRelations, relation)
} else {
for _, directRef := range relation.DirectRefs {
resolvedDirectRelations[nodeMeta{
apiVersion: relation.PostMeta.APIVersion,
kind: relation.PostMeta.Kind,
cluster: relation.Cluster,
namespace: directRef.Namespace,
name: directRef.Name,
}] = nil
}
}
}
}

slices.SortFunc(resolvedRelations, compareResourceRelation)
slices.SortFunc(resolvedLabelRelations, compareLabelResourceRelation)
node.relationsLock.Lock()
sortedSlicesCompare(node.relations, resolvedRelations,
sortedSlicesCompare(node.labelRelations, resolvedLabelRelations,
func(relation ResourceRelation) {
s.removeResourceRelation(node, &relation)
s.removeLabelResourceRelation(node, &relation)
},
func(relation ResourceRelation) {
s.addResourceRelation(node, &relation)
s.addLabelResourceRelation(node, &relation)
},
compareResourceRelation)
node.relations = resolvedRelations
compareLabelResourceRelation)
node.labelRelations = resolvedLabelRelations

s.setDirectResourceRelation(node, resolvedDirectRelations)
node.relationsLock.Unlock()

if !node.labelEqualed(newTopoObj.GetLabels()) ||
Expand Down Expand Up @@ -132,7 +154,6 @@ func (s *nodeStorage) OnDelete(obj interface{}) {
return
}

node.objectDeleted()
deleteAllRelation(node)
node.checkGC()

Expand All @@ -142,18 +163,24 @@ func (s *nodeStorage) OnDelete(obj interface{}) {

func (s *nodeStorage) addNode(obj Object, node *nodeInfo) {
node.updateNodeMeta(obj)
if len(node.relations) != 0 {
klog.Warningf("unexpected relations {%v}", node.relations)
node.relations = nil
if len(node.labelRelations) != 0 {
klog.Warningf("unexpected labelRelations {%v}", node.labelRelations)
node.labelRelations = nil
}
node.relationsLock.Lock()
for _, resolver := range s.resolvers {
relations := resolver.Resolve(obj)
node.relations = append(node.relations, relations...)
for _, relation := range relations {
if relation.LabelSelector != nil {
node.labelRelations = append(node.labelRelations, relation)
} else {
s.addDirectResourceRelation(node, &relation)
}
}
}
slices.SortFunc(node.relations, compareResourceRelation)
for _, relation := range node.relations {
s.addResourceRelation(node, &relation)
slices.SortFunc(node.labelRelations, compareLabelResourceRelation)
for _, relation := range node.labelRelations {
s.addLabelResourceRelation(node, &relation)
}
node.relationsLock.Unlock()

Expand All @@ -171,20 +198,14 @@ func (s *nodeStorage) addNode(obj Object, node *nodeInfo) {
}
}

func (s *nodeStorage) addResourceRelation(node *nodeInfo, relation *ResourceRelation) {
func (s *nodeStorage) addLabelResourceRelation(node *nodeInfo, relation *ResourceRelation) {
postMeta := relation.PostMeta
postMetaKey := generateMetaKey(postMeta)
postStorage := s.manager.getStorage(postMeta)
if postStorage == nil {
klog.Errorf("Failed to get node storage by meta %s, ignore this relation", postMetaKey)
return
}
if len(relation.DirectRefs) > 0 {
for _, ref := range relation.DirectRefs {
postNode := postStorage.getOrCreateNode(relation.Cluster, ref.Namespace, ref.Name)
rangeAndSetDirectRefRelation(node, postNode, s.manager)
}
}

if relation.LabelSelector != nil {
var postNodes []*nodeInfo
Expand All @@ -199,22 +220,65 @@ func (s *nodeStorage) addResourceRelation(node *nodeInfo, relation *ResourceRela
}
}

func (s *nodeStorage) removeResourceRelation(node *nodeInfo, relation *ResourceRelation) {
postStorage := s.manager.getStorage(relation.PostMeta)
func (s *nodeStorage) setDirectResourceRelation(node *nodeInfo, directRefs map[nodeMeta]interface{}) {
var toDel []*nodeInfo
rangeNodeList(node.directReferredPostOrders, func(postNode *nodeInfo) {
postNodeMeta := nodeMeta{
apiVersion: postNode.storageRef.meta.APIVersion,
kind: postNode.storageRef.meta.Kind,
cluster: postNode.cluster,
namespace: postNode.namespace,
name: postNode.name,
}
if _, ok := directRefs[postNodeMeta]; !ok {
toDel = append(toDel, postNode)
} else {
delete(directRefs, postNodeMeta)
}
})

for _, postNode := range toDel {
if deleteDirectRelation(node, postNode) {
node.noticePostOrderRelationDeleted(postNode)
postNode.checkGC()
}
}

for postNodeMeta := range directRefs {
postStorage := s.manager.getStorage(metav1.TypeMeta{APIVersion: postNodeMeta.apiVersion, Kind: postNodeMeta.kind})
if postStorage == nil {
klog.Errorf("Failed to get node storage by meta %s, ignore this relation", generateKey(postNodeMeta.apiVersion, postNodeMeta.kind))
continue
}
postNode := postStorage.getOrCreateNode(postNodeMeta.cluster, postNodeMeta.namespace, postNodeMeta.name)
rangeAndSetDirectRefRelation(node, postNode, s.manager)
}
}

func (s *nodeStorage) addDirectResourceRelation(node *nodeInfo, relation *ResourceRelation) {
postMeta := relation.PostMeta
postMetaKey := generateMetaKey(postMeta)
postStorage := s.manager.getStorage(postMeta)
if postStorage == nil {
klog.Error("Failed to get node Storage by %s, ignore this delete request",
generateMetaKey(relation.PostMeta))
klog.Errorf("Failed to get node storage by meta %s, ignore this relation", postMetaKey)
return
}

if len(relation.DirectRefs) > 0 {
for _, ref := range relation.DirectRefs {
postNode := postStorage.getNode(relation.Cluster, ref.Namespace, ref.Name)
if deleteDirectRelation(node, postNode) {
node.noticePostOrderRelationDeleted(postNode)
postNode.checkGC()
}
postNode := postStorage.getOrCreateNode(relation.Cluster, ref.Namespace, ref.Name)
rangeAndSetDirectRefRelation(node, postNode, s.manager)
}
}
}

func (s *nodeStorage) removeLabelResourceRelation(node *nodeInfo, relation *ResourceRelation) {
postStorage := s.manager.getStorage(relation.PostMeta)
if postStorage == nil {
klog.Error("Failed to get node Storage by %s, ignore this delete request",
generateMetaKey(relation.PostMeta))
return
}

if relation.LabelSelector != nil {
postNodes := postStorage.getMatchedNodeList(node.cluster, node.namespace, relation.LabelSelector)
Expand Down
45 changes: 37 additions & 8 deletions resourcetopo/resourcetopo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("test suite with ists config(label selector and virtual rersour
Expect(podStorage).NotTo(BeNil())
Expect(istsStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
podHandler = &objecthandler{}
Expect(manager.AddNodeHandler(PodMeta, podHandler)).Should(Succeed())
stsHandler = &objecthandler{}
Expand Down Expand Up @@ -471,7 +471,7 @@ var _ = Describe("test suite with cluster role config(cluster role and direct re
Expect(clusterroleStorage).NotTo(BeNil())
Expect(saStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
clusterRoleBindingHandler = &objecthandler{}
Expect(manager.AddNodeHandler(ClusterRoleBindingMeta, clusterRoleBindingHandler)).NotTo(HaveOccurred())
clusterRoleHandler = &objecthandler{}
Expand Down Expand Up @@ -804,7 +804,7 @@ var _ = Describe("test suite with svc and pod config(label selector and reverse
svcStorage, _ = manager.GetTopoNodeStorage(ServiceMeta)
Expect(svcStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
podHandler = &objecthandler{}
Expect(manager.AddNodeHandler(PodMeta, podHandler)).To(Succeed())
svcHandler = &objecthandler{}
Expand Down Expand Up @@ -1143,7 +1143,7 @@ var _ = Describe("test suite with deploy config(label selector and owner referen
Expect(podStorage).NotTo(BeNil())
Expect(deployStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
podHandler = &objecthandler{}
Expect(manager.AddNodeHandler(PodMeta, podHandler)).Should(Succeed())
replicasetHandler = &objecthandler{}
Expand Down Expand Up @@ -1356,7 +1356,7 @@ var _ = Describe("test suite with deploy config(label selector and owner referen
})
})

var _ = Describe("test suite with relations update", func() {
var _ = Describe("test suite with labelRelations update", func() {
var manager Manager
var fakeClient *fake.Clientset
var clusterRoleBindingHandler, clusterRoleHandler, saHandler *objecthandler
Expand Down Expand Up @@ -1389,7 +1389,7 @@ var _ = Describe("test suite with relations update", func() {
Expect(clusterroleStorage).NotTo(BeNil())
Expect(saStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
clusterRoleBindingHandler = &objecthandler{}
Expect(manager.AddNodeHandler(ClusterRoleBindingMeta, clusterRoleBindingHandler)).NotTo(HaveOccurred())
clusterRoleHandler = &objecthandler{}
Expand Down Expand Up @@ -1552,6 +1552,35 @@ var _ = Describe("test suite with relations update", func() {
Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, crb, metav1.CreateOptions{})).NotTo(BeNil())
syncStatus(checkAll)
})
It("create and update clusterRoleBinding with objects not existed", func() {
ns := "testclusterresource"
crbName := "crbtest"
crName := "crName"
saName := "saName"
saName2 := "saName2"

clusterRoleBindingHandler.addCallExpected()
crb := newClusterRoleBinding(crbName, crName,
[]types.NamespacedName{{Name: saName, Namespace: ns}})
Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, crb, metav1.CreateOptions{})).NotTo(BeNil())
syncStatus(checkAll)

clusterRoleBindingHandler.updateCallExpected()
crb = newClusterRoleBinding(crbName, crName,
[]types.NamespacedName{{Name: saName, Namespace: ns}, {Name: saName2, Namespace: ns}})
Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, crb, metav1.UpdateOptions{})).NotTo(BeNil())
syncStatus(checkAll)

clusterRoleBindingHandler.updateCallExpected()
crb = newClusterRoleBinding(crbName, crName, []types.NamespacedName{{Name: saName2, Namespace: ns}})
Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, crb, metav1.UpdateOptions{})).NotTo(BeNil())
syncStatus(checkAll)

clusterRoleBindingHandler.updateCallExpected()
crb = newClusterRoleBinding(crbName, crName, []types.NamespacedName{})
Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, crb, metav1.UpdateOptions{})).NotTo(BeNil())
syncStatus(checkAll)
})
})

var _ = Describe("test suite with mock relation for fed namespaces and local cluster pods", func() {
Expand Down Expand Up @@ -1579,7 +1608,7 @@ var _ = Describe("test suite with mock relation for fed namespaces and local clu
nsStorage, _ = manager.GetTopoNodeStorage(NamespaceMeta)
Expect(nsStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
nsHandler = &objecthandler{}
Expect(manager.AddNodeHandler(NamespaceMeta, nsHandler)).NotTo(HaveOccurred())

Expand Down Expand Up @@ -1780,7 +1809,7 @@ var _ = Describe("test suite for multi routine", func() {
Expect(deployStorage).NotTo(BeNil())
Expect(inspectDeployStorage).NotTo(BeNil())

ctx, cancel = context.WithCancel((context.Background()))
ctx, cancel = context.WithCancel(context.Background())
podHandler = &objecthandler{needRangePostOrder: true, needRangePreOrder: true}
Expect(manager.AddNodeHandler(PodMeta, podHandler)).Should(Succeed())
replicasetHandler = &objecthandler{needRangePostOrder: true, needRangePreOrder: true}
Expand Down
1 change: 1 addition & 0 deletions resourcetopo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type ResourceRelation struct {
DirectRefs []types.NamespacedName

// LabelSelector offers a kubernetes label-selector way for svc-pod type relation, could be nil if it's empty.
// If Set, will ignore DirectRefs.
LabelSelector *metav1.LabelSelector
}

Expand Down
Loading
Loading