Skip to content

Commit 67efa66

Browse files
committed
fix: re-fetch resources before status patch to avoid 409 Conflict
When running operator ITs against a remote cluster, the operator can reconcile between create() and patchStatus(), bumping resourceVersion and making the held object stale. Apply the same re-fetch pattern (already used for KafkaService in KafkaProxyReconcilerIT) to all remaining updateStatusObservedGeneration overloads in both KafkaProxyReconcilerIT and VirtualKafkaClusterReconcilerIT. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 67e16f2 commit 67efa66

2 files changed

Lines changed: 34 additions & 21 deletions

File tree

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/reconciler/kafkaproxy/KafkaProxyReconcilerIT.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
import java.util.function.Predicate;
2020
import java.util.stream.Stream;
2121

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2522
import org.assertj.core.api.AbstractStringAssert;
2623
import org.assertj.core.api.InstanceOfAssertFactories;
2724
import org.awaitility.core.ConditionFactory;
@@ -31,6 +28,8 @@
3128
import org.junit.jupiter.params.ParameterizedTest;
3229
import org.junit.jupiter.params.provider.Arguments;
3330
import org.junit.jupiter.params.provider.MethodSource;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3433

3534
import com.fasterxml.jackson.core.JsonProcessingException;
3635

@@ -1459,30 +1458,36 @@ void deleteAndRestoreADependency() {
14591458

14601459
// the KafkaProxyReconciler only operates on Clusters that have been reconciled, ie metadata.status == status.observedGeneration
14611460
private VirtualKafkaCluster updateStatusObservedGeneration(VirtualKafkaCluster clusterBar) {
1462-
clusterBar.setStatus(new VirtualKafkaClusterStatusBuilder().withObservedGeneration(generation(clusterBar)).build());
1463-
return operator.patchStatus(clusterBar);
1461+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
1462+
VirtualKafkaCluster fresh = Objects.requireNonNull(operator.get(VirtualKafkaCluster.class, name(clusterBar)));
1463+
fresh.setStatus(new VirtualKafkaClusterStatusBuilder().withObservedGeneration(generation(fresh)).build());
1464+
return operator.patchStatus(fresh);
14641465
}
14651466

14661467
// the KafkaProxyReconciler only operates on KafkaProtocolFilters that have been reconciled, ie metadata.status == status.observedGeneration
14671468
private KafkaProtocolFilter updateStatusObservedGeneration(KafkaProtocolFilter filter) {
1468-
filter.setStatus(new KafkaProtocolFilterStatusBuilder().withObservedGeneration(generation(filter)).build());
1469-
return operator.patchStatus(filter);
1469+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
1470+
KafkaProtocolFilter fresh = Objects.requireNonNull(operator.get(KafkaProtocolFilter.class, name(filter)));
1471+
fresh.setStatus(new KafkaProtocolFilterStatusBuilder().withObservedGeneration(generation(fresh)).build());
1472+
return operator.patchStatus(fresh);
14701473
}
14711474

14721475
// the KafkaProxyReconciler only operates on KafkaServices that have been reconciled, ie metadata.status == status.observedGeneration
14731476
private KafkaService updateStatusObservedGeneration(KafkaService service, String bootstrapServers) {
14741477
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
1475-
KafkaService fresh = operator.get(KafkaService.class, name(service));
1478+
KafkaService fresh = Objects.requireNonNull(operator.get(KafkaService.class, name(service)));
14761479
fresh.setStatus(new KafkaServiceStatusBuilder().withObservedGeneration(generation(fresh))
14771480
.withBootstrapServers(bootstrapServers)
14781481
.build());
14791482
return operator.patchStatus(fresh);
14801483
}
14811484

1482-
// the KafkaProxyReconciler only operates on KafkaServices that have been reconciled, ie metadata.status == status.observedGeneration
1485+
// the KafkaProxyReconciler only operates on KafkaProxyIngresses that have been reconciled, ie metadata.status == status.observedGeneration
14831486
private KafkaProxyIngress updateStatusObservedGeneration(KafkaProxyIngress ingress) {
1484-
ingress.setStatus(new KafkaProxyIngressStatusBuilder().withObservedGeneration(generation(ingress)).build());
1485-
return operator.patchStatus(ingress);
1487+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
1488+
KafkaProxyIngress fresh = Objects.requireNonNull(operator.get(KafkaProxyIngress.class, name(ingress)));
1489+
fresh.setStatus(new KafkaProxyIngressStatusBuilder().withObservedGeneration(generation(fresh)).build());
1490+
return operator.patchStatus(fresh);
14861491
}
14871492

14881493
private void assertDeploymentIsRemoved(KafkaProxy proxy) {

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/reconciler/virtualkafkacluster/VirtualKafkaClusterReconcilerIT.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.time.Duration;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.Objects;
1314

1415
import org.assertj.core.api.InstanceOfAssertFactories;
1516
import org.awaitility.core.ConditionFactory;
@@ -53,6 +54,7 @@
5354
import static io.kroxylicious.kubernetes.api.common.Protocol.TCP;
5455
import static io.kroxylicious.kubernetes.api.common.Protocol.TLS;
5556
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.generation;
57+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
5658
import static org.assertj.core.api.Assertions.assertThat;
5759
import static org.awaitility.Awaitility.await;
5860

@@ -618,24 +620,30 @@ private static KafkaProtocolFilter filter(String name) {
618620
// @formatter:on
619621
}
620622

621-
// the KafkaProxyReconciler only operates on KafkaProtocolFilters that have been reconciled, ie metadata.status == status.observedGeneration
623+
// the VirtualKafkaClusterReconciler only operates on KafkaProtocolFilters that have been reconciled, ie metadata.status == status.observedGeneration
622624
private KafkaProtocolFilter updateStatusObservedGeneration(KafkaProtocolFilter filter) {
623-
filter.setStatus(new KafkaProtocolFilterStatusBuilder().withObservedGeneration(generation(filter)).build());
624-
return operator.patchStatus(filter);
625+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
626+
KafkaProtocolFilter fresh = Objects.requireNonNull(operator.get(KafkaProtocolFilter.class, name(filter)));
627+
fresh.setStatus(new KafkaProtocolFilterStatusBuilder().withObservedGeneration(generation(fresh)).build());
628+
return operator.patchStatus(fresh);
625629
}
626630

627-
// the KafkaProxyReconciler only operates on KafkaServices that have been reconciled, ie metadata.status == status.observedGeneration
628-
private KafkaService updateStatusObservedGeneration(KafkaService filter, String bootstrapServers) {
629-
filter.setStatus(new KafkaServiceStatusBuilder().withObservedGeneration(generation(filter))
631+
// the VirtualKafkaClusterReconciler only operates on KafkaServices that have been reconciled, ie metadata.status == status.observedGeneration
632+
private KafkaService updateStatusObservedGeneration(KafkaService service, String bootstrapServers) {
633+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
634+
KafkaService fresh = Objects.requireNonNull(operator.get(KafkaService.class, name(service)));
635+
fresh.setStatus(new KafkaServiceStatusBuilder().withObservedGeneration(generation(fresh))
630636
.withBootstrapServers(bootstrapServers)
631637
.build());
632-
return operator.patchStatus(filter);
638+
return operator.patchStatus(fresh);
633639
}
634640

635-
// the KafkaProxyReconciler only operates on KafkaServices that have been reconciled, ie metadata.status == status.observedGeneration
641+
// the VirtualKafkaClusterReconciler only operates on KafkaProxyIngresses that have been reconciled, ie metadata.status == status.observedGeneration
636642
private KafkaProxyIngress updateStatusObservedGeneration(KafkaProxyIngress ingress) {
637-
ingress.setStatus(new KafkaProxyIngressStatusBuilder().withObservedGeneration(generation(ingress)).build());
638-
return operator.patchStatus(ingress);
643+
// Re-fetch to get the latest resourceVersion - the operator may have reconciled since we created it
644+
KafkaProxyIngress fresh = Objects.requireNonNull(operator.get(KafkaProxyIngress.class, name(ingress)));
645+
fresh.setStatus(new KafkaProxyIngressStatusBuilder().withObservedGeneration(generation(fresh)).build());
646+
return operator.patchStatus(fresh);
639647
}
640648

641649
}

0 commit comments

Comments
 (0)