Skip to content

Commit 40abfa1

Browse files
authored
Fixes #288 - Operator does not revoke permissions when roles get removed from the list (#295)
1 parent 94cc4f1 commit 40abfa1

9 files changed

Lines changed: 225 additions & 20 deletions

File tree

CONTRIBUTING.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,15 @@ Please read through below conventions before contributions.
5555
- Go source files and directories use underscores, not dashes.
5656
- Package directories should generally avoid using separators as much as possible. When package names are multiple words, they usually should be in nested subdirectories.
5757
- Document directories and filenames should use dashes rather than underscores.
58-
- All source files should add a license at the beginning.
58+
- All source files should add a license at the beginning.
59+
60+
61+
### How to work locally
62+
63+
1. Clones this repo
64+
2. Create the cluster `minikube start --memory=8192 --cpus=4`
65+
3. [Deploy Apache Pulsar Standalone](https://pulsar.apache.org/docs/4.0.x/getting-started-helm/#step-1-install-pulsar-helm-chart)
66+
4. Open the minikube tunnel in another terminal `minikube tunnel -c`
67+
5. Apply operator's crds `make install`
68+
6. Executes `go run .` in order to run the operator locally rather than inside the cluster
69+
7. Run tests `~/go/bin/ginkgo ./operator`

pkg/admin/dummy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,16 @@ func (d *DummyPulsarAdmin) RevokePermissions(Permissioner) error {
111111
return nil
112112
}
113113

114+
// GetTopicPermissions is a fake implements of GetTopicPermissions
115+
func (d *DummyPulsarAdmin) GetTopicPermissions(string) (map[string][]utils.AuthAction, error) {
116+
return map[string][]utils.AuthAction{}, nil
117+
}
118+
119+
// GetNamespacePermissions is a fake implements of GetNamespacePermissions
120+
func (d *DummyPulsarAdmin) GetNamespacePermissions(string) (map[string][]utils.AuthAction, error) {
121+
return map[string][]utils.AuthAction{}, nil
122+
}
123+
114124
// GetSchema is a fake implements of GetSchema
115125
func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) {
116126
return nil, nil

pkg/admin/impl.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,24 @@ func (p *PulsarAdminClient) RevokePermissions(permission Permissioner) error {
504504
return nil
505505
}
506506

507+
// GetTopicPermissions retrieve permission by name
508+
func (p *PulsarAdminClient) GetTopicPermissions(topic string) (map[string][]utils.AuthAction, error) {
509+
topicName, err := utils.GetTopicName(topic)
510+
if err != nil {
511+
return nil, err
512+
}
513+
return p.adminClient.Topics().GetPermissions(*topicName)
514+
}
515+
516+
// GetNamespacePermissions retrieve permission by name
517+
func (p *PulsarAdminClient) GetNamespacePermissions(namespaceName string) (map[string][]utils.AuthAction, error) {
518+
namespace, err := utils.GetNamespaceName(namespaceName)
519+
if err != nil {
520+
return nil, err
521+
}
522+
return p.adminClient.Namespaces().GetNamespacePermissions(*namespace)
523+
}
524+
507525
// convertActions converts actions type from string to common.AuthAction
508526
func convertActions(actions []string) ([]utils.AuthAction, error) {
509527
r := make([]utils.AuthAction, 0)

pkg/admin/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ type PulsarAdmin interface {
133133
// it will revoke all actions which granted to a role on a namespace or topic
134134
RevokePermissions(p Permissioner) error
135135

136+
// GetNamespacePermissions get permissions by namespace
137+
GetNamespacePermissions(namespace string) (map[string][]utils2.AuthAction, error)
138+
139+
// GetTopicPermissions get permissions by topic
140+
GetTopicPermissions(topic string) (map[string][]utils2.AuthAction, error)
141+
136142
// Close releases the connection with pulsar admin
137143
Close() error
138144

pkg/connection/reconcile_permission.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@ package connection
1717
import (
1818
"context"
1919
"fmt"
20+
"slices"
2021

22+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2123
"github.com/go-logr/logr"
22-
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
2324
"k8s.io/apimachinery/pkg/api/meta"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2627

2728
resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
2829
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
30+
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
2931
"github.com/streamnative/pulsar-resources-operator/pkg/reconciler"
3032
)
3133

@@ -122,18 +124,66 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
122124
return nil
123125
}
124126

125-
log.V(1).Info("Granting permission", "ResourceName", permission.Spec.ResourceName,
127+
log.Info("Updating permission", "ResourceName", permission.Spec.ResourceName,
126128
"ResourceType", permission.Spec.ResoureType, "Roles", permission.Spec.Roles, "Actions", permission.Spec.Actions)
127-
if err := pulsarAdmin.GrantPermissions(per); err != nil {
128-
log.Error(err, "Grant permission failed")
129+
130+
var currentPermissions map[string][]utils.AuthAction
131+
var err error
132+
133+
if permission.Spec.ResoureType == resourcev1alpha1.PulsarResourceTypeTopic {
134+
currentPermissions, err = pulsarAdmin.GetTopicPermissions(permission.Spec.ResourceName)
135+
} else {
136+
currentPermissions, err = pulsarAdmin.GetNamespacePermissions(permission.Spec.ResourceName)
137+
}
138+
139+
if err != nil {
140+
log.Error(err, "Failed to get current permissions")
129141
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
130142
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
131143
log.Error(err, "Failed to update permission status")
132-
return err
133144
}
134145
return err
135146
}
136147

148+
currentRoles := []string{}
149+
incomingRoles := permission.Spec.Roles
150+
151+
for role := range currentPermissions {
152+
currentRoles = append(currentRoles, role)
153+
}
154+
155+
// revoking roles
156+
for _, role := range currentRoles {
157+
if !slices.Contains(incomingRoles, role) {
158+
permission.Spec.Roles = []string{role}
159+
per := GetPermissioner(permission)
160+
if err := pulsarAdmin.RevokePermissions(per); err != nil {
161+
log.Error(err, "Revoke permission failed")
162+
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
163+
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
164+
log.Error(err, "Failed to update permission status")
165+
return err
166+
}
167+
return err
168+
}
169+
}
170+
}
171+
172+
// granting roles
173+
for _, role := range incomingRoles {
174+
permission.Spec.Roles = []string{role}
175+
per := GetPermissioner(permission)
176+
if err := pulsarAdmin.GrantPermissions(per); err != nil {
177+
log.Error(err, "Grant permission failed")
178+
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
179+
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
180+
log.Error(err, "Failed to update permission status")
181+
return err
182+
}
183+
return err
184+
}
185+
}
186+
137187
permission.Status.ObservedGeneration = permission.Generation
138188
meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation))
139189
if err := r.conn.client.Status().Update(ctx, permission); err != nil {

tests/README.md

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,35 @@ tests is an individul module beside pulsar resources operator
55
`go mod tidy` to download modules for tests
66

77

8-
# Requirements
8+
## Requirements
99
- Pulsar Operator installed
1010
- A pulsar cluster installed without authentication and authorization
1111

1212

13-
# Run tests
13+
## Run tests
1414

15-
`ginkgo --trace --progress ./operator`
15+
`ginkgo --trace --progress ./operator`
16+
17+
Optionally, if you have an external pulsar cluster (e.g. deployed on minikube) and you want to test the operator without deploying it in kubernetes:
18+
19+
1. Run the code in a terminal
20+
21+
```bash
22+
make install
23+
go run .
24+
```
25+
26+
2. In another terminal run
27+
28+
```bash
29+
# your admin service url
30+
export ADMIN_SERVICE_URL=http://localhost:80
31+
# your pulsar namespace
32+
export NAMESPACE=pulsar
33+
# your pulsar broker name
34+
export BROKER_NAME=pulsar-mini
35+
# your pulsar proxy url
36+
export PROXY_URL=http://localhost:80
37+
38+
ginkgo --trace --progress ./operator
39+
```

tests/operator/operator_suite_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ import (
3434
)
3535

3636
var (
37-
namespaceName string = "default"
37+
namespaceName string = utils.GetEnv("NAMESPACE", "default")
3838
k8sClient client.Client
3939
k8sConfig *rest.Config
40-
brokerName string = "test-pulsar"
41-
proxyURL string = fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName)
40+
brokerName string = utils.GetEnv("BROKER_NAME", "test-pulsar")
41+
proxyURL string = utils.GetEnv("PROXY_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName))
4242
pulsarClient pulsar.Client
4343
)
4444

tests/operator/resources_test.go

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ var _ = Describe("Resources", func() {
9595
Expect(feature.SetFeatureGates()).ShouldNot(HaveOccurred())
9696
ctx = context.TODO()
9797
// use ClusterIP svc when run operator in k8s
98-
adminServiceURL := fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName)
98+
adminServiceURL := utils.GetEnv("ADMIN_SERVICE_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName))
9999
// use NodePort svc when cluster is kind cluster and run operator locally, the nodePort need to be setup in kind
100100
// adminServiceURL := fmt.Sprintf("http://127.0.0.1:%d", nodePort)
101101
pconn = utils.MakePulsarConnection(namespaceName, pconnName, adminServiceURL)
@@ -125,7 +125,7 @@ var _ = Describe("Resources", func() {
125125
})
126126

127127
Describe("Basic resource operations", Ordered, func() {
128-
Context("Check pulsar broker", func() {
128+
Context("Check pulsar broker", Label("Permissions"), func() {
129129
It("should create the pulsar broker successfully", func() {
130130
Eventually(func() bool {
131131
statefulset := &v1.StatefulSet{}
@@ -138,14 +138,14 @@ var _ = Describe("Resources", func() {
138138
})
139139
})
140140

141-
Context("PulsarConnection operation", func() {
141+
Context("PulsarConnection operation", Label("Permissions"), func() {
142142
It("should create the pulsarconnection successfully", func() {
143143
err := k8sClient.Create(ctx, pconn)
144144
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
145145
})
146146
})
147147

148-
Context("PulsarTenant operation", func() {
148+
Context("PulsarTenant operation", Label("Permissions"), func() {
149149
It("should create the pulsartenant successfully", func() {
150150
err := k8sClient.Create(ctx, ptenant)
151151
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
@@ -176,7 +176,7 @@ var _ = Describe("Resources", func() {
176176
})
177177
})
178178

179-
Context("PulsarNamespace operation", func() {
179+
Context("PulsarNamespace operation", Label("Permissions"), func() {
180180
It("should create the pulsarnamespace successfully", func() {
181181
err := k8sClient.Create(ctx, pnamespace)
182182
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
@@ -193,7 +193,7 @@ var _ = Describe("Resources", func() {
193193
})
194194

195195
Context("PulsarTopic operation", Ordered, func() {
196-
It("should create the pulsartopic successfully", func() {
196+
It("should create the pulsartopic successfully", Label("Permissions"), func() {
197197
err := k8sClient.Create(ctx, ptopic)
198198
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
199199
err = k8sClient.Create(ctx, ptopic2)
@@ -202,7 +202,7 @@ var _ = Describe("Resources", func() {
202202
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
203203
})
204204

205-
It("should be ready", func() {
205+
It("should be ready", Label("Permissions"), func() {
206206
Eventually(func() bool {
207207
t := &v1alphav1.PulsarTopic{}
208208
tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName}
@@ -291,7 +291,7 @@ var _ = Describe("Resources", func() {
291291

292292
})
293293

294-
Context("PulsarPermission operation", func() {
294+
Context("PulsarPermission operation", Label("Permissions"), func() {
295295
It("should grant the pulsarpermission successfully", func() {
296296
err := k8sClient.Create(ctx, ppermission)
297297
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
@@ -305,6 +305,69 @@ var _ = Describe("Resources", func() {
305305
return v1alphav1.IsPulsarResourceReady(t)
306306
}, "20s", "100ms").Should(BeTrue())
307307
})
308+
309+
It("should add a new role", func() {
310+
t := &v1alphav1.PulsarPermission{}
311+
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
312+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
313+
t.Spec.Roles = append(t.Spec.Roles, "spiderman")
314+
err := k8sClient.Update(ctx, t)
315+
Expect(err).Should(Succeed())
316+
})
317+
318+
It("should be ready", func() {
319+
Eventually(func() bool {
320+
t := &v1alphav1.PulsarPermission{}
321+
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
322+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
323+
return v1alphav1.IsPulsarResourceReady(t)
324+
}, "20s", "100ms").Should(BeTrue())
325+
})
326+
327+
It("spiderman should exists along with ironman", func() {
328+
Eventually(func(g Gomega) {
329+
podName := fmt.Sprintf("%s-broker-0", brokerName)
330+
containerName := fmt.Sprintf("%s-broker", brokerName)
331+
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
332+
"./bin/pulsar-admin topics permissions "+ppermission.Spec.ResourceName)
333+
g.Expect(err).Should(Succeed())
334+
g.Expect(stdout).Should(Not(BeEmpty()))
335+
g.Expect(stdout).Should(ContainSubstring("ironman"))
336+
g.Expect(stdout).Should(ContainSubstring("spiderman"))
337+
}, "20s", "100ms").Should(Succeed())
338+
})
339+
340+
It("should delete a role", func() {
341+
t := &v1alphav1.PulsarPermission{}
342+
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
343+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
344+
// remove spiderman and assign to roles
345+
t.Spec.Roles = []string{"ironman"}
346+
err := k8sClient.Update(ctx, t)
347+
Expect(err).Should(Succeed())
348+
})
349+
350+
It("should be ready", func() {
351+
Eventually(func() bool {
352+
t := &v1alphav1.PulsarPermission{}
353+
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
354+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
355+
return v1alphav1.IsPulsarResourceReady(t)
356+
}, "20s", "100ms").Should(BeTrue())
357+
})
358+
359+
It("spiderman shouldn't exists anymore but ironman", func() {
360+
Eventually(func(g Gomega) {
361+
podName := fmt.Sprintf("%s-broker-0", brokerName)
362+
containerName := fmt.Sprintf("%s-broker", brokerName)
363+
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
364+
"./bin/pulsar-admin topics permissions "+ptopic.Spec.Name)
365+
g.Expect(err).Should(Succeed())
366+
g.Expect(stdout).Should(Not(BeEmpty()))
367+
g.Expect(stdout).Should(ContainSubstring("ironman"))
368+
g.Expect(stdout).Should(Not(ContainSubstring("spiderman")))
369+
}, "20s", "100ms").Should(Succeed())
370+
})
308371
})
309372

310373
Context("PulsarFunction & PulsarPackage operation", func() {
@@ -514,6 +577,20 @@ var _ = Describe("Resources", func() {
514577
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
515578
}).Should(Succeed())
516579

580+
Eventually(func(g Gomega) {
581+
t := &v1alphav1.PulsarTopic{}
582+
tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName2}
583+
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
584+
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
585+
}).Should(Succeed())
586+
587+
Eventually(func(g Gomega) {
588+
t := &v1alphav1.PulsarTopic{}
589+
tns := types.NamespacedName{Namespace: namespaceName, Name: partitionedTopic.Name}
590+
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
591+
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
592+
}).Should(Succeed())
593+
517594
Eventually(func(g Gomega) {
518595
ns := &v1alphav1.PulsarNamespace{}
519596
tns := types.NamespacedName{Namespace: namespaceName, Name: pnamespaceName}

0 commit comments

Comments
 (0)