Skip to content

Commit 21f350e

Browse files
authored
feat: Stateful Resource Reconciler (#320)
* fix permission * make StatefulResource * go fmt * fix lint * add tests
1 parent 5d2d67c commit 21f350e

3 files changed

Lines changed: 684 additions & 17 deletions

File tree

pkg/connection/reconcile_permission.go

Lines changed: 200 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package connection
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"fmt"
2021
"slices"
2122

@@ -145,36 +146,75 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
145146
return err
146147
}
147148

148-
currentRoles := []string{}
149+
// Extract current desired state
150+
currentState := r.extractCurrentState(permission)
151+
152+
// Get previous state from annotation
153+
previousState := r.getPreviousState(permission)
154+
155+
// Check for context changes (ResourceType or ResourceName)
156+
contextChanged := false
157+
if previousState != nil {
158+
contextChanged = previousState.ResourceType != currentState.ResourceType ||
159+
previousState.ResourceName != currentState.ResourceName
160+
}
161+
162+
if contextChanged {
163+
log.Info("Context change detected, cleaning up previous permissions",
164+
"previousResourceType", previousState.ResourceType,
165+
"currentResourceType", currentState.ResourceType,
166+
"previousResourceName", previousState.ResourceName,
167+
"currentResourceName", currentState.ResourceName)
168+
169+
// Clean up previous context
170+
if err := r.cleanupPreviousContext(permission, *previousState); err != nil {
171+
log.Error(err, "Failed to cleanup previous context, continuing with current operations")
172+
}
173+
}
174+
175+
// Determine roles to manage
176+
var previouslyManagedRoles []string
177+
if previousState != nil && !contextChanged {
178+
previouslyManagedRoles = previousState.Roles
179+
}
180+
181+
currentRoles := make([]string, 0, len(currentPermissions))
149182
incomingRoles := permission.Spec.Roles
150183

151184
for role := range currentPermissions {
152185
currentRoles = append(currentRoles, role)
153186
}
154187

155-
// revoking roles
156-
for _, role := range currentRoles {
157-
if !slices.Contains(incomingRoles, role) {
158-
permission.Spec.Roles = []string{role}
159-
per := GetPermissioner(permission)
160-
if err := pulsarAdmin.RevokePermissions(per); err != nil {
161-
log.Error(err, "Revoke permission failed")
162-
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
163-
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
164-
log.Error(err, "Failed to update permission status")
165-
return err
166-
}
188+
// Only revoke roles that were previously managed by this PulsarPermission resource
189+
// This prevents conflicts with other PulsarPermission resources managing the same target
190+
for _, role := range previouslyManagedRoles {
191+
// If this role is still in incoming roles OR doesn't exist currently, skip it
192+
if slices.Contains(incomingRoles, role) || !slices.Contains(currentRoles, role) {
193+
continue
194+
}
195+
196+
log.Info("Revoking previously managed role", "role", role)
197+
tempPermission := permission.DeepCopy()
198+
tempPermission.Spec.Roles = []string{role}
199+
per := GetPermissioner(tempPermission)
200+
if err := pulsarAdmin.RevokePermissions(per); err != nil {
201+
log.Error(err, "Revoke permission failed", "role", role)
202+
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
203+
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
204+
log.Error(err, "Failed to update permission status")
167205
return err
168206
}
207+
return err
169208
}
170209
}
171210

172-
// granting roles
211+
// Grant permissions for all incoming roles
173212
for _, role := range incomingRoles {
174-
permission.Spec.Roles = []string{role}
175-
per := GetPermissioner(permission)
213+
tempPermission := permission.DeepCopy()
214+
tempPermission.Spec.Roles = []string{role}
215+
per := GetPermissioner(tempPermission)
176216
if err := pulsarAdmin.GrantPermissions(per); err != nil {
177-
log.Error(err, "Grant permission failed")
217+
log.Error(err, "Grant permission failed", "role", role)
178218
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
179219
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
180220
log.Error(err, "Failed to update permission status")
@@ -184,6 +224,18 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
184224
}
185225
}
186226

227+
// Update the state annotation
228+
if err := r.updateStateAnnotation(permission, currentState); err != nil {
229+
log.Error(err, "Failed to update state annotation")
230+
return err
231+
}
232+
233+
// Update the resource with new annotations
234+
if err := r.conn.client.Update(ctx, permission); err != nil {
235+
log.Error(err, "Failed to update permission annotations")
236+
return err
237+
}
238+
187239
permission.Status.ObservedGeneration = permission.Generation
188240
meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation))
189241
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
@@ -216,3 +268,134 @@ func GetPermissioner(p *resourcev1alpha1.PulsarPermission) admin.Permissioner {
216268
}
217269
return nil
218270
}
271+
272+
const (
273+
// PulsarPermissionStateAnnotation is the annotation key used to store the previous state
274+
// of PulsarPermission resources for stateful reconciliation
275+
PulsarPermissionStateAnnotation = "pulsarpermissions.resource.streamnative.io/managed-state"
276+
)
277+
278+
// PulsarPermissionState represents the state that needs to be tracked for PulsarPermission resources
279+
type PulsarPermissionState struct {
280+
ResourceType string `json:"resourceType"`
281+
ResourceName string `json:"resourceName"`
282+
Roles []string `json:"roles"`
283+
Actions []string `json:"actions"`
284+
}
285+
286+
// extractCurrentState extracts the current desired state from the PulsarPermission spec
287+
func (r *PulsarPermissionReconciler) extractCurrentState(permission *resourcev1alpha1.PulsarPermission) PulsarPermissionState {
288+
// Sort roles and actions for consistent comparison
289+
roles := make([]string, len(permission.Spec.Roles))
290+
copy(roles, permission.Spec.Roles)
291+
slices.Sort(roles)
292+
293+
actions := make([]string, len(permission.Spec.Actions))
294+
copy(actions, permission.Spec.Actions)
295+
slices.Sort(actions)
296+
297+
return PulsarPermissionState{
298+
ResourceType: string(permission.Spec.ResoureType),
299+
ResourceName: permission.Spec.ResourceName,
300+
Roles: roles,
301+
Actions: actions,
302+
}
303+
}
304+
305+
// getPreviousState retrieves the previous state from the resource annotation
306+
func (r *PulsarPermissionReconciler) getPreviousState(permission *resourcev1alpha1.PulsarPermission) *PulsarPermissionState {
307+
annotations := permission.GetAnnotations()
308+
if annotations == nil {
309+
r.log.V(1).Info("No annotations found, treating as first reconciliation")
310+
return nil
311+
}
312+
313+
stateJSON, exists := annotations[PulsarPermissionStateAnnotation]
314+
if !exists {
315+
r.log.V(1).Info("No previous state annotation found, treating as first reconciliation")
316+
return nil
317+
}
318+
319+
// Try to unmarshal as PulsarPermissionState
320+
var previousState PulsarPermissionState
321+
if err := json.Unmarshal([]byte(stateJSON), &previousState); err != nil {
322+
r.log.Error(err, "Failed to unmarshal previous state annotation, treating as first reconciliation",
323+
"annotation", stateJSON)
324+
return nil
325+
}
326+
327+
return &previousState
328+
}
329+
330+
// updateStateAnnotation updates the annotation with the current state after successful reconciliation
331+
func (r *PulsarPermissionReconciler) updateStateAnnotation(permission *resourcev1alpha1.PulsarPermission, currentState PulsarPermissionState) error {
332+
stateJSON, err := json.Marshal(currentState)
333+
if err != nil {
334+
return err
335+
}
336+
337+
annotations := permission.GetAnnotations()
338+
if annotations == nil {
339+
annotations = make(map[string]string)
340+
permission.SetAnnotations(annotations)
341+
}
342+
343+
// Only update if the value has changed
344+
currentValue := annotations[PulsarPermissionStateAnnotation]
345+
newValue := string(stateJSON)
346+
347+
if currentValue != newValue {
348+
annotations[PulsarPermissionStateAnnotation] = newValue
349+
r.log.V(1).Info("Updated state annotation", "state", currentState)
350+
}
351+
352+
return nil
353+
}
354+
355+
// cleanupPreviousContext cleans up permissions from the previous resource context
356+
func (r *PulsarPermissionReconciler) cleanupPreviousContext(permission *resourcev1alpha1.PulsarPermission, prevState PulsarPermissionState) error {
357+
if len(prevState.Roles) == 0 {
358+
return nil
359+
}
360+
361+
r.log.Info("Cleaning up permissions from previous context",
362+
"previousResourceType", prevState.ResourceType,
363+
"previousResourceName", prevState.ResourceName,
364+
"rolesToCleanup", prevState.Roles)
365+
366+
// Create a temporary permission resource for the previous context
367+
tempPermission := permission.DeepCopy()
368+
tempPermission.Spec.ResourceName = prevState.ResourceName
369+
tempPermission.Spec.ResoureType = resourcev1alpha1.PulsarResourceType(prevState.ResourceType)
370+
tempPermission.Spec.Roles = prevState.Roles
371+
tempPermission.Spec.Actions = prevState.Actions
372+
373+
// Get permissioner for the previous context
374+
permissioner := GetPermissioner(tempPermission)
375+
if permissioner == nil {
376+
return fmt.Errorf("failed to get permissioner for previous context")
377+
}
378+
379+
// Get the pulsar admin instance
380+
pulsarAdmin := r.conn.pulsarAdmin
381+
382+
// Revoke all roles from the previous context
383+
for _, role := range prevState.Roles {
384+
r.log.Info("Revoking permission from previous context", "role", role)
385+
386+
// Create a temporary permission for this specific role
387+
rolePermission := tempPermission.DeepCopy()
388+
rolePermission.Spec.Roles = []string{role}
389+
rolePermissioner := GetPermissioner(rolePermission)
390+
391+
if err := pulsarAdmin.RevokePermissions(rolePermissioner); err != nil {
392+
r.log.Error(err, "Failed to revoke permission from previous context, continuing",
393+
"role", role,
394+
"previousResourceType", prevState.ResourceType,
395+
"previousResourceName", prevState.ResourceName)
396+
// Continue with other roles even if one fails
397+
}
398+
}
399+
400+
return nil
401+
}

0 commit comments

Comments
 (0)