Skip to content

Commit 2ce4111

Browse files
freeznetclaude
andcommitted
fix(connection): use MergeFrom patch for finalizer writes to avoid 409 conflicts
Sub-resource reconcilers added and removed the resource finalizer with a full-object Update. The object is loaded once from the informer cache, so the Update carries a stale resourceVersion as an optimistic-lock precondition. When the server copy moves ahead — concurrent writers such as GitOps controllers, or informer-cache lag — the API server rejects the write with a 409 Conflict ("the object has been modified") and the finalizer is never removed. The object is then stuck Terminating while the connection requeues hot, and it only clears once the cache happens to catch up. Replace each finalizer Update with a client.MergeFrom patch, guarded by the boolean return of Add/RemoveFinalizer. The patch sends only the finalizer diff and carries no resourceVersion precondition, so it always applies to the live object regardless of cache lag or concurrent writers; the guard avoids an API call when the finalizer set is unchanged. Applied across all PulsarConnection sub-reconcilers (tenant, namespace, topic, source, sink, function, package, permission, geo-replication, ns-isolation-policy) and the PulsarConnection reconciler itself. Add a regression test that removes a PulsarSource finalizer from a stale-resourceVersion copy and asserts there is no conflict, that the write uses Patch rather than a full-object Update, and that the object is garbage-collected promptly. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent b8f7a8b commit 2ce4111

12 files changed

Lines changed: 308 additions & 85 deletions

pkg/connection/reconcile_function.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,24 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar
101101
}
102102

103103
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
104-
controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName)
105-
if err := r.conn.client.Update(ctx, instance); err != nil {
106-
log.Error(err, "Failed to remove finalizer")
107-
return err
104+
patch := client.MergeFrom(instance.DeepCopy())
105+
if controllerutil.RemoveFinalizer(instance, resourcev1alpha1.FinalizerName) {
106+
if err := r.conn.client.Patch(ctx, instance, patch); err != nil {
107+
log.Error(err, "Failed to remove finalizer")
108+
return err
109+
}
108110
}
109111
return nil
110112
}
111113

112114
if instance.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
113115
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
114-
controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName)
115-
if err := r.conn.client.Update(ctx, instance); err != nil {
116-
log.Error(err, "Failed to add finalizer")
117-
return err
116+
patch := client.MergeFrom(instance.DeepCopy())
117+
if controllerutil.AddFinalizer(instance, resourcev1alpha1.FinalizerName) {
118+
if err := r.conn.client.Patch(ctx, instance, patch); err != nil {
119+
log.Error(err, "Failed to add finalizer")
120+
return err
121+
}
118122
}
119123
}
120124

pkg/connection/reconcile_geo_replication.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,21 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con
168168
}
169169
}
170170
}
171-
controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName)
172-
if err := r.conn.client.Update(ctx, geoReplication); err != nil {
173-
log.Error(err, "Failed to remove finalizer")
174-
return err
171+
patch := client.MergeFrom(geoReplication.DeepCopy())
172+
if controllerutil.RemoveFinalizer(geoReplication, resourcev1alpha1.FinalizerName) {
173+
if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil {
174+
log.Error(err, "Failed to remove finalizer")
175+
return err
176+
}
175177
}
176178
}
177179
}
178-
controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName)
179-
if err := r.conn.client.Update(ctx, geoReplication); err != nil {
180-
log.Error(err, "Failed to add finalizer")
181-
return err
180+
patch := client.MergeFrom(geoReplication.DeepCopy())
181+
if controllerutil.AddFinalizer(geoReplication, resourcev1alpha1.FinalizerName) {
182+
if err := r.conn.client.Patch(ctx, geoReplication, patch); err != nil {
183+
log.Error(err, "Failed to add finalizer")
184+
return err
185+
}
182186
}
183187

184188
secretUpdated, err := r.checkSecretRefUpdate(*destConnection, geoReplication.Spec.ClusterParamsOverride)

pkg/connection/reconcile_namespace.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,21 +124,25 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls
124124
}
125125

126126
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
127-
controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName)
128-
if err := r.conn.client.Update(ctx, namespace); err != nil {
129-
log.Error(err, "Failed to remove finalizer")
130-
return err
127+
patch := client.MergeFrom(namespace.DeepCopy())
128+
if controllerutil.RemoveFinalizer(namespace, resourcev1alpha1.FinalizerName) {
129+
if err := r.conn.client.Patch(ctx, namespace, patch); err != nil {
130+
log.Error(err, "Failed to remove finalizer")
131+
return err
132+
}
131133
}
132134

133135
return nil
134136
}
135137

136138
if namespace.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
137139
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
138-
controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName)
139-
if err := r.conn.client.Update(ctx, namespace); err != nil {
140-
log.Error(err, "Failed to add finalizer")
141-
return err
140+
patch := client.MergeFrom(namespace.DeepCopy())
141+
if controllerutil.AddFinalizer(namespace, resourcev1alpha1.FinalizerName) {
142+
if err := r.conn.client.Patch(ctx, namespace, patch); err != nil {
143+
log.Error(err, "Failed to add finalizer")
144+
return err
145+
}
142146
}
143147
}
144148

pkg/connection/reconcile_nsisolationpolicy.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,24 @@ func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context,
105105
}
106106

107107
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
108-
controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName)
109-
if err := r.conn.client.Update(ctx, policy); err != nil {
110-
log.Error(err, "Failed to remove finalizer")
111-
return err
108+
patch := client.MergeFrom(policy.DeepCopy())
109+
if controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) {
110+
if err := r.conn.client.Patch(ctx, policy, patch); err != nil {
111+
log.Error(err, "Failed to remove finalizer")
112+
return err
113+
}
112114
}
113115

114116
return nil
115117
}
116118

117119
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
118-
controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName)
119-
if err := r.conn.client.Update(ctx, policy); err != nil {
120-
log.Error(err, "Failed to add finalizer")
121-
return err
120+
patch := client.MergeFrom(policy.DeepCopy())
121+
if controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) {
122+
if err := r.conn.client.Patch(ctx, policy, patch); err != nil {
123+
log.Error(err, "Failed to add finalizer")
124+
return err
125+
}
122126
}
123127

124128
if resourcev1alpha1.IsPulsarResourceReady(policy) &&

pkg/connection/reconcile_package.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,23 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd
154154
}
155155
}
156156

157-
controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName)
158-
if err := r.conn.client.Update(ctx, pkg); err != nil {
159-
log.Error(err, "Failed to remove finalizer")
160-
return err
157+
patch := client.MergeFrom(pkg.DeepCopy())
158+
if controllerutil.RemoveFinalizer(pkg, resourcev1alpha1.FinalizerName) {
159+
if err := r.conn.client.Patch(ctx, pkg, patch); err != nil {
160+
log.Error(err, "Failed to remove finalizer")
161+
return err
162+
}
161163
}
162164
return nil
163165
}
164166

165167
if pkg.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
166-
controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName)
167-
if err := r.conn.client.Update(ctx, pkg); err != nil {
168-
log.Error(err, "Failed to add finalizer")
169-
return err
168+
patch := client.MergeFrom(pkg.DeepCopy())
169+
if controllerutil.AddFinalizer(pkg, resourcev1alpha1.FinalizerName) {
170+
if err := r.conn.client.Patch(ctx, pkg, patch); err != nil {
171+
log.Error(err, "Failed to add finalizer")
172+
return err
173+
}
170174
}
171175
}
172176

pkg/connection/reconcile_permission.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,20 +102,24 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
102102
}
103103
}
104104
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
105-
controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName)
106-
if err := r.conn.client.Update(ctx, permission); err != nil {
107-
log.Error(err, "Failed to remove finalizer")
108-
return err
105+
patch := client.MergeFrom(permission.DeepCopy())
106+
if controllerutil.RemoveFinalizer(permission, resourcev1alpha1.FinalizerName) {
107+
if err := r.conn.client.Patch(ctx, permission, patch); err != nil {
108+
log.Error(err, "Failed to remove finalizer")
109+
return err
110+
}
109111
}
110112
return nil
111113
}
112114

113115
if permission.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
114116
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
115-
controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName)
116-
if err := r.conn.client.Update(ctx, permission); err != nil {
117-
log.Error(err, "Failed to add finalizer")
118-
return err
117+
patch := client.MergeFrom(permission.DeepCopy())
118+
if controllerutil.AddFinalizer(permission, resourcev1alpha1.FinalizerName) {
119+
if err := r.conn.client.Patch(ctx, permission, patch); err != nil {
120+
log.Error(err, "Failed to add finalizer")
121+
return err
122+
}
119123
}
120124
}
121125

pkg/connection/reconcile_sink.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,20 +99,24 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad
9999
}
100100

101101
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
102-
controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName)
103-
if err := r.conn.client.Update(ctx, sink); err != nil {
104-
log.Error(err, "Failed to remove finalizer")
105-
return err
102+
patch := client.MergeFrom(sink.DeepCopy())
103+
if controllerutil.RemoveFinalizer(sink, resourcev1alpha1.FinalizerName) {
104+
if err := r.conn.client.Patch(ctx, sink, patch); err != nil {
105+
log.Error(err, "Failed to remove finalizer")
106+
return err
107+
}
106108
}
107109
return nil
108110
}
109111

110112
if sink.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
111113
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
112-
controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName)
113-
if err := r.conn.client.Update(ctx, sink); err != nil {
114-
log.Error(err, "Failed to add finalizer")
115-
return err
114+
patch := client.MergeFrom(sink.DeepCopy())
115+
if controllerutil.AddFinalizer(sink, resourcev1alpha1.FinalizerName) {
116+
if err := r.conn.client.Patch(ctx, sink, patch); err != nil {
117+
log.Error(err, "Failed to add finalizer")
118+
return err
119+
}
116120
}
117121
}
118122

pkg/connection/reconcile_source.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,24 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi
100100
}
101101

102102
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
103-
controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName)
104-
if err := r.conn.client.Update(ctx, source); err != nil {
105-
log.Error(err, "Failed to remove finalizer")
106-
return err
103+
patch := client.MergeFrom(source.DeepCopy())
104+
if controllerutil.RemoveFinalizer(source, resourcev1alpha1.FinalizerName) {
105+
if err := r.conn.client.Patch(ctx, source, patch); err != nil {
106+
log.Error(err, "Failed to remove finalizer")
107+
return err
108+
}
107109
}
108110
return nil
109111
}
110112

111113
if source.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion {
112114
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
113-
controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName)
114-
if err := r.conn.client.Update(ctx, source); err != nil {
115-
log.Error(err, "Failed to add finalizer")
116-
return err
115+
patch := client.MergeFrom(source.DeepCopy())
116+
if controllerutil.AddFinalizer(source, resourcev1alpha1.FinalizerName) {
117+
if err := r.conn.client.Patch(ctx, source, patch); err != nil {
118+
log.Error(err, "Failed to add finalizer")
119+
return err
120+
}
117121
}
118122
}
119123

0 commit comments

Comments
 (0)