Commit 933605a

[SPARK-56228] Fix StatusRecorder to refresh resourceVersion on 409 Error before retrying
### What changes were proposed in this pull request?

This PR fixes the `StatusRecorder.patchAndStatusWithVersionLocked()` retry loop to refresh the `resourceVersion` from the API server when a 409 Conflict occurs, before retrying the status update.

### Why are the changes needed?

When a 409 Conflict (`"the object has been modified; please apply your changes to the latest version"`) occurs during a status update, the retry loop retries with the same stale `resourceVersion`, causing all subsequent retries to fail with the same conflict error. The fix fetches the latest resource from the API server on 409 to obtain the current `resourceVersion`, following the existing pattern used in `ReconcilerUtils`.

```
26/03/26 02:10:34 ERROR pi default o.a.s.k.o.u.StatusRecorder Error while persisting status to ApplicationStatus(super=BaseStatus(currentState=ApplicationState(super=BaseState(currentStateSummary=ResourceReleased, lastTransitionTime=2026-03-26T02:10:31.642213107Z, message=null), lastObservedDriverStatus=null), stateTransitionHistory={0=ApplicationState(super=BaseState(currentStatio.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://10.43.0.1:443/apis/spark.apache.org/v1/namespaces/default/sparkapplications/pi/status. Message: Operation cannot be fulfilled on sparkapplications.spark.apache.org "pi": the object has been modified; please apply your changes to the latest version and try again.
Received status: Status(apiV
    at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:205)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:507)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:358)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:771)
    at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.update(HasMetadataOperation.java:138)
    at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.update(HasMetadataOperation.java:121)
    at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.updateStatus(HasMetadataOperation.java:126)
    at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.updateStatus(HasMetadataOperation.java:44)
    at org.apache.spark.k8s.operator.utils.StatusRecorder.patchAndStatusWithVersionLocked(StatusRecorder.java:99)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new unit test `refreshesResourceVersionOn409Conflict` was added to `StatusRecorderTest` to verify that the `resourceVersion` is refreshed after a 409 Conflict and the retry succeeds.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

Closes #592 from dongjoon-hyun/SPARK-56228.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
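
As a rough illustration of the failure mode described above (not the operator's actual code), the sketch below models an API server that rejects writes carrying an outdated version. `ConflictRetrySketch`, `FakeServer`, `ApiException`, and `updateWithRetry` are hypothetical names invented for this example, and the sleep between attempts is omitted for brevity. It contrasts retrying with a stale version against refreshing the version after a 409.

```java
/**
 * Illustrative sketch only: an in-memory stand-in for an API server that
 * enforces optimistic concurrency. None of these names come from the operator.
 */
class ConflictRetrySketch {
  static final int HTTP_CONFLICT = 409;

  /** Hypothetical exception carrying an HTTP status code. */
  static class ApiException extends RuntimeException {
    final int code;

    ApiException(int code, String message) {
      super(message);
      this.code = code;
    }
  }

  /** Hypothetical server: accepts a write only if the caller's version is current. */
  static class FakeServer {
    private long currentVersion;

    FakeServer(long currentVersion) {
      this.currentVersion = currentVersion;
    }

    /** Returns the version the server currently holds. */
    long get() {
      return currentVersion;
    }

    /** Bumps and returns the version, or throws a 409 if the caller is out of date. */
    long updateStatus(long callerVersion) {
      if (callerVersion != currentVersion) {
        throw new ApiException(HTTP_CONFLICT, "the object has been modified");
      }
      return ++currentVersion;
    }
  }

  /**
   * Tries the update up to maxAttempts times.
   * Returns the attempt number that succeeded, or -1 if every attempt failed.
   */
  static int updateWithRetry(
      FakeServer server, long localVersion, int maxAttempts, boolean refreshOnConflict) {
    long version = localVersion;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        server.updateStatus(version);
        return attempt;
      } catch (ApiException e) {
        // Without this refresh, every later attempt resends the same stale version.
        if (refreshOnConflict && e.code == HTTP_CONFLICT) {
          version = server.get();
        }
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // The local copy holds version 1, but the server has already moved to version 2.
    System.out.println(updateWithRetry(new FakeServer(2), 1, 3, false)); // prints -1
    System.out.println(updateWithRetry(new FakeServer(2), 1, 3, true)); // prints 2
  }
}
```

In this model the stale retry exhausts all three attempts, while the refreshing retry succeeds on the second attempt, which mirrors the behavior the fix is intended to produce.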
1 parent a2d216e commit 933605a

2 files changed

Lines changed: 44 additions & 0 deletions

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java

Lines changed: 13 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.spark.k8s.operator.utils;
 
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.API_RETRY_ATTEMPT_AFTER_SECONDS;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.API_STATUS_PATCH_MAX_ATTEMPTS;
 
@@ -102,6 +103,18 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient clien
           break;
         } catch (KubernetesClientException e) {
           log.debug("Error while patching status, retrying {}/{}...", i, maxRetry, e);
+          if (e.getCode() == HTTP_CONFLICT) {
+            try {
+              CR latest = client.resource(resource).get();
+              if (latest != null) {
+                resource
+                    .getMetadata()
+                    .setResourceVersion(latest.getMetadata().getResourceVersion());
+              }
+            } catch (KubernetesClientException refreshEx) {
+              log.debug("Failed to refresh resource version", refreshEx);
+            }
+          }
           try {
             Thread.sleep(TimeUnit.SECONDS.toMillis(API_RETRY_ATTEMPT_AFTER_SECONDS.getValue()));
           } catch (InterruptedException ie) {

spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/StatusRecorderTest.java

Lines changed: 31 additions & 0 deletions
@@ -56,6 +56,37 @@ class StatusRecorderTest {
       new StatusRecorder<>(
           List.of(mockStatusListener), ApplicationStatus.class, SparkApplication.class);
 
+  @Test
+  void refreshesResourceVersionOn409Conflict() {
+    var testResource = getSparkApplication("1");
+    var resourceV2 = getSparkApplication("2");
+    var resourceV3 = getSparkApplication("3");
+
+    BaseContext<SparkApplication> context = mock(BaseContext.class);
+    when(context.getResource()).thenReturn(testResource);
+    when(context.getClient()).thenReturn(client);
+    var basePath =
+        "/apis/spark.apache.org/v1/namespaces/"
+            + DEFAULT_NS
+            + "/sparkapplications/"
+            + testResource.getMetadata().getName();
+    var statusPath = basePath + "/status";
+    // First status update returns 409 Conflict
+    server.expect().withPath(statusPath).andReturn(409, null).once();
+    // After 409, GET latest resource with resourceVersion "2"
+    server.expect().withPath(basePath).andReturn(200, resourceV2).once();
+    // Second status update succeeds with refreshed resourceVersion
+    server.expect().withPath(statusPath).andReturn(200, resourceV3).once();
+
+    statusRecorder.persistStatus(context, new ApplicationStatus());
+
+    verify(mockStatusListener, times(1))
+        .listenStatus(
+            assertArg(a -> assertThat(a.getMetadata().getResourceVersion()).isEqualTo("3")),
+            any(),
+            any());
+  }
+
   @Test
   void retriesFailedStatusPatches() {
     var testResource = getSparkApplication("1");
