77import org .slf4j .LoggerFactory ;
88
99import io .fabric8 .kubernetes .api .model .HasMetadata ;
10+ import io .fabric8 .kubernetes .client .KubernetesClientException ;
1011import io .fabric8 .kubernetes .client .dsl .base .PatchContext ;
1112import io .fabric8 .kubernetes .client .dsl .base .PatchType ;
13+ import io .javaoperatorsdk .operator .OperatorException ;
1214import io .javaoperatorsdk .operator .api .reconciler .support .PrimaryResourceCache ;
1315import io .javaoperatorsdk .operator .processing .event .ResourceID ;
1416
2022 */
2123public class PrimaryUpdateAndCacheUtils {
2224
25+ public static final int DEFAULT_MAX_RETRY = 3 ;
26+
2327 private PrimaryUpdateAndCacheUtils () {}
2428
2529 private static final Logger log = LoggerFactory .getLogger (PrimaryUpdateAndCacheUtils .class );
@@ -40,6 +44,15 @@ public static <P extends HasMetadata> P updateStatusAndCacheResource(
4044 primary , context , () -> context .getClient ().resource (primary ).updateStatus ());
4145 }
4246
47+ public static <P extends HasMetadata > P updateStatusAndCacheResourceWithLock (
48+ P primary , Context <P > context , UnaryOperator <P > modificationFunction ) {
49+ return updateAndCacheResourceWithLock (
50+ primary ,
51+ context ,
52+ modificationFunction ,
53+ r -> context .getClient ().resource (r ).updateStatus ());
54+ }
55+
4356 /**
4457 * Patches status with and makes sure that the up-to-date primary resource will be present during
4558 * the next reconciliation. Using JSON Merge patch.
@@ -56,6 +69,12 @@ public static <P extends HasMetadata> P patchStatusAndCacheResource(
5669 primary , context , () -> context .getClient ().resource (primary ).patchStatus ());
5770 }
5871
72+ public static <P extends HasMetadata > P patchStatusAndCacheResourceWithLock (
73+ P primary , Context <P > context , UnaryOperator <P > modificationFunction ) {
74+ return updateAndCacheResourceWithLock (
75+ primary , context , modificationFunction , r -> context .getClient ().resource (r ).patchStatus ());
76+ }
77+
5978 /**
6079 * Patches status and makes sure that the up-to-date primary resource will be present during the
6180 * next reconciliation. Using JSON Patch.
@@ -72,6 +91,15 @@ public static <P extends HasMetadata> P editStatusAndCacheResource(
7291 primary , context , () -> context .getClient ().resource (primary ).editStatus (operation ));
7392 }
7493
94+ public static <P extends HasMetadata > P editStatusAndCacheResourceWithLock (
95+ P primary , Context <P > context , UnaryOperator <P > modificationFunction ) {
96+ return updateAndCacheResourceWithLock (
97+ primary ,
98+ context ,
99+ UnaryOperator .identity (),
100+ r -> context .getClient ().resource (r ).editStatus (modificationFunction ));
101+ }
102+
75103 /**
76104 * Patches the resource with supplied method and makes sure that the up-to-date primary resource
77105 * will be present during the next reconciliation.
@@ -124,6 +152,25 @@ public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
124152 return res ;
125153 }
126154
155+ public static <P extends HasMetadata > P ssaPatchStatusAndCacheResourceWithLock (
156+ P primary , P freshResourceWithStatus , Context <P > context ) {
157+ return updateAndCacheResourceWithLock (
158+ primary ,
159+ context ,
160+ r -> freshResourceWithStatus ,
161+ r ->
162+ context
163+ .getClient ()
164+ .resource (r )
165+ .subresource ("status" )
166+ .patch (
167+ new PatchContext .Builder ()
168+ .withForce (true )
169+ .withFieldManager (context .getControllerConfiguration ().fieldManager ())
170+ .withPatchType (PatchType .SERVER_SIDE_APPLY )
171+ .build ()));
172+ }
173+
127174 /**
128175 * Patches the resource status and caches the response in provided {@link PrimaryResourceCache}.
129176 * Uses Server Side Apply.
@@ -228,4 +275,71 @@ private static <P extends HasMetadata> void logWarnIfResourceVersionPresent(P pr
228275 + "using optimistic locking is discouraged for this purpose. " );
229276 }
230277 }
278+
279+ public static <P extends HasMetadata > P updateAndCacheResourceWithLock (
280+ P primary ,
281+ Context <P > context ,
282+ UnaryOperator <P > modificationFunction ,
283+ UnaryOperator <P > updateMethod ) {
284+ return updateAndCacheResourceWithLock (
285+ primary , context , modificationFunction , updateMethod , DEFAULT_MAX_RETRY );
286+ }
287+
288+ @ SuppressWarnings ("unchecked" )
289+ public static <P extends HasMetadata > P updateAndCacheResourceWithLock (
290+ P primary ,
291+ Context <P > context ,
292+ UnaryOperator <P > modificationFunction ,
293+ UnaryOperator <P > updateMethod ,
294+ int maxRetry ) {
295+
296+ if (log .isDebugEnabled ()) {
297+ log .debug ("Conflict retrying update for: {}" , ResourceID .fromResource (primary ));
298+ }
299+ int retryIndex = 0 ;
300+ while (true ) {
301+ try {
302+ var modified = modificationFunction .apply (primary );
303+ modified .getMetadata ().setResourceVersion (primary .getMetadata ().getResourceVersion ());
304+ var updated = updateMethod .apply (modified );
305+ context
306+ .eventSourceRetriever ()
307+ .getControllerEventSource ()
308+ .handleRecentResourceUpdate (ResourceID .fromResource (primary ), updated , primary );
309+ return updated ;
310+ } catch (KubernetesClientException e ) {
311+ log .trace ("Exception during patch for resource: {}" , primary );
312+ retryIndex ++;
313+ // only retry on conflict (409) and unprocessable content (422) which
314+ // can happen if JSON Patch is not a valid request since there was
315+ // a concurrent request which already removed another finalizer:
316+ // List element removal from a list is by index in JSON Patch
317+ // so if addressing a second finalizer but first is meanwhile removed
318+ // it is a wrong request.
319+ if (e .getCode () != 409 && e .getCode () != 422 ) {
320+ throw e ;
321+ }
322+ if (retryIndex >= maxRetry ) {
323+ throw new OperatorException (
324+ "Exceeded maximum ("
325+ + maxRetry
326+ + ") retry attempts to patch resource: "
327+ + ResourceID .fromResource (primary ));
328+ }
329+ log .debug (
330+ "Retrying patch for resource name: {}, namespace: {}; HTTP code: {}" ,
331+ primary .getMetadata ().getName (),
332+ primary .getMetadata ().getNamespace (),
333+ e .getCode ());
334+ primary =
335+ (P )
336+ context
337+ .getClient ()
338+ .resources (primary .getClass ())
339+ .inNamespace (primary .getMetadata ().getNamespace ())
340+ .withName (primary .getMetadata ().getName ())
341+ .get ();
342+ }
343+ }
344+ }
231345}
0 commit comments