Skip to content

Commit a067b8c

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 4fad756 commit a067b8c

8 files changed

Lines changed: 298 additions & 20 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected synchronized void handleEvent(
8484
try {
8585
if (log.isDebugEnabled()) {
8686
log.debug("Event received with action: {}", action);
87-
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
87+
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
8888
}
8989
MDCUtils.addResourceInfo(resource);
9090
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
class EventFilterDetails {
2828

2929
private int activeUpdates = 0;
30-
private ResourceEvent lastEvent;
30+
private ResourceEvent lastRelevantEvent;
3131
private String lastOwnUpdatedResourceVersion;
3232
private Set<String> allOwnResourceVersions = new HashSet<>();
33+
private Set<String> uncertainEvents = new HashSet<>();
3334

3435
public void increaseActiveUpdates() {
3536
activeUpdates = activeUpdates + 1;
@@ -53,18 +54,23 @@ public boolean decreaseActiveUpdates(String updatedResourceVersion) {
5354
return activeUpdates == 0;
5455
}
5556

56-
public void setLastEvent(ResourceEvent event) {
57-
lastEvent = event;
57+
/**
 * Stores the given event as the last relevant event, replacing any previously stored one.
 *
 * @param event the event to remember
 */
public void setLastRelevantEvent(ResourceEvent event) {
58+
lastRelevantEvent = event;
5859
}
5960

60-
public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
61-
if (lastEvent != null
62-
&& (lastOwnUpdatedResourceVersion == null
63-
|| ReconcilerUtilsInternal.compareResourceVersions(
64-
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
65-
lastOwnUpdatedResourceVersion)
66-
> 0)) {
67-
return Optional.of(lastEvent);
61+
public Optional<ResourceEvent> getRelevantEventToPropagate() {
62+
if (lastRelevantEvent != null
63+
&& (lastOwnUpdatedResourceVersion == null
64+
|| ReconcilerUtilsInternal.compareResourceVersions(
65+
lastRelevantEvent
66+
.getResource()
67+
.orElseThrow()
68+
.getMetadata()
69+
.getResourceVersion(),
70+
lastOwnUpdatedResourceVersion)
71+
> 0)
72+
|| allOwnResourceVersions.containsAll(uncertainEvents)) {
73+
return Optional.of(lastRelevantEvent);
6874
}
6975
return Optional.empty();
7076
}
@@ -80,4 +86,8 @@ void addToOwnResourceVersions(String updateVersion) {
8086
/**
 * Tells whether the given resource version has been recorded in {@code allOwnResourceVersions}.
 *
 * @param resourceVersion the resource version to look up
 * @return {@code true} if the version is contained in the set of own resource versions
 */
public boolean isOwnResourceVersions(String resourceVersion) {
8187
return allOwnResourceVersions.contains(resourceVersion);
8288
}
89+
90+
/**
 * Records a resource version in the {@code uncertainEvents} set.
 *
 * @param resourceVersion the resource version to add; adding the same version again has no
 *     further effect because the backing collection is a set
 */
public void addUncertainResourceVersion(String resourceVersion) {
91+
uncertainEvents.add(resourceVersion);
92+
}
8393
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
112112
var updatedForLambda = updatedResource;
113113
res.ifPresentOrElse(
114114
r -> {
115-
R latestResource = (R) r.getResource().orElseThrow();
116115
// as previous resource version we use the one from successful update, since
117116
// we process new event here only if that is more recent than the event from our update.
118117
// Note that this is equivalent with the scenario when an informer watch connection
@@ -123,8 +122,25 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
123122
(r instanceof ExtendedResourceEvent)
124123
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
125124
: null;
126-
R prevVersionOfResource =
127-
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
125+
R prevVersionOfResource = null;
126+
R latestResource = null;
127+
if (updatedForLambda != null) {
128+
var updatedNewerThanRelated =
129+
ReconcilerUtilsInternal.compareResourceVersions(
130+
updatedForLambda, r.getResource().orElseThrow())
131+
> 0;
132+
prevVersionOfResource =
133+
updatedNewerThanRelated
134+
? (extendedResourcePrevVersion != null
135+
? extendedResourcePrevVersion
136+
: prevVersionOfResource)
137+
: updatedForLambda;
138+
latestResource = updatedForLambda;
139+
} else {
140+
prevVersionOfResource = extendedResourcePrevVersion;
141+
latestResource = (R) r.getResource().orElseThrow();
142+
}
143+
128144
if (log.isDebugEnabled()) {
129145
log.debug(
130146
"Previous resource version: {} resource from update present: {}"

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public synchronized Optional<ResourceEvent> doneEventFilterModify(
9898
return Optional.empty();
9999
}
100100
activeUpdates.remove(resourceID);
101-
var res = ed.getLatestEventAfterLastUpdateEvent();
101+
var res = ed.getRelevantEventToPropagate();
102102
log.debug(
103103
"Zero active updates for resource id: {}; event after update event: {}; updated resource"
104104
+ " version: {}",
@@ -156,13 +156,21 @@ private synchronized EventHandling onEvent(
156156
var au = activeUpdates.get(resourceId);
157157
if (au != null) {
158158
if (result == EventHandling.INTERMEDIATE) {
159-
return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion())
160-
? EventHandling.DEFER
161-
: EventHandling.INTERMEDIATE;
159+
var ownResourceVersion =
160+
au.isOwnResourceVersions(resource.getMetadata().getResourceVersion());
161+
log.debug("Handling intermediate event. Own resource version: {}", ownResourceVersion);
162+
return ownResourceVersion ? EventHandling.DEFER : EventHandling.INTERMEDIATE;
162163
}
163164
if (result == EventHandling.NEW) {
165+
if (cached == null) {
166+
// this covers the case when the temp cache holds nothing for the resource and we receive
167+
// an event while a filtering-caching update is ongoing; at this point we cannot tell
168+
// whether that event comes from our own update
169+
log.debug("Setting uncertain resource version.");
170+
au.addUncertainResourceVersion(resource.getMetadata().getResourceVersion());
171+
}
164172
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
165-
au.setLastEvent(
173+
au.setLastRelevantEvent(
166174
delete
167175
? new ResourceDeleteEvent(
168176
ResourceAction.DELETED, resourceId, resource, unknownState)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate;
17+
18+
import io.fabric8.kubernetes.api.model.Namespaced;
19+
import io.fabric8.kubernetes.client.CustomResource;
20+
import io.fabric8.kubernetes.model.annotation.Group;
21+
import io.fabric8.kubernetes.model.annotation.ShortNames;
22+
import io.fabric8.kubernetes.model.annotation.Version;
23+
24+
/**
 * Namespaced custom resource used by the deletion-during-status-update test. It is registered
 * under group {@code sample.javaoperatorsdk}, version {@code v1}, short name {@code ddsu}, and
 * is parameterized with {@code Void} and {@link DeletionDuringStatusUpdateStatus}.
 */
@Group("sample.javaoperatorsdk")
25+
@Version("v1")
26+
@ShortNames("ddsu")
27+
public class DeletionDuringStatusUpdateCustomResource
28+
extends CustomResource<Void, DeletionDuringStatusUpdateStatus> implements Namespaced {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate;
17+
18+
import java.time.Duration;
19+
import java.util.Collections;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.extension.RegisterExtension;
25+
26+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
27+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.awaitility.Awaitility.await;
31+
32+
/**
33+
* Regression test for: deletion event dropped when resource is deleted concurrently with a status
34+
* update.
35+
*/
36+
class DeletionDuringStatusUpdateIT {
37+
38+
static final String RESOURCE_NAME = "test-resource";
39+
40+
@RegisterExtension
41+
LocallyRunOperatorExtension extension =
42+
LocallyRunOperatorExtension.builder()
43+
.withReconciler(new DeletionDuringStatusUpdateReconciler())
44+
.build();
45+
46+
@AfterEach
47+
void forceCleanup() {
48+
// If the test failed, remove the finalizer so the resource can be deleted
49+
var res = extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME);
50+
if (res != null) {
51+
res.getMetadata().setFinalizers(Collections.emptyList());
52+
extension.replace(res);
53+
extension.delete(res);
54+
}
55+
}
56+
57+
@Test
58+
void deletionDuringStatusUpdateTriggersCleanup() throws InterruptedException {
59+
var reconciler = extension.getReconcilerOfType(DeletionDuringStatusUpdateReconciler.class);
60+
61+
extension.create(testResource());
62+
63+
// Wait until the reconciler is inside the update operation (active-update window is open)
64+
assertThat(reconciler.patchStartedLatch.await(30, TimeUnit.SECONDS))
65+
.as("reconciler should enter the patch update operation")
66+
.isTrue();
67+
68+
// Issue delete — K8s sets deletionTimestamp while the active-update window is open
69+
extension.delete(testResource());
70+
71+
// Wait for deletionTimestamp to be confirmed on the resource in K8s
72+
await()
73+
.atMost(Duration.ofSeconds(30))
74+
.until(
75+
() -> {
76+
var res =
77+
extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME);
78+
return res != null && res.isMarkedForDeletion();
79+
});
80+
81+
// Signal the reconciler to proceed with the actual PATCH. K8s will merge deletionTimestamp
82+
// into the response - the deletion event (lower RV) is now deferred and will be dropped
83+
// without the fix.
84+
reconciler.deleteConfirmedLatch.countDown();
85+
86+
// cleanup() must be called — the deletion must not be silently lost
87+
assertThat(reconciler.cleanupCalledLatch.await(30, TimeUnit.SECONDS))
88+
.as("cleanup() must be called after the status update that races with the delete")
89+
.isTrue();
90+
91+
// Resource must eventually disappear (finalizer removed)
92+
await()
93+
.atMost(Duration.ofSeconds(30))
94+
.untilAsserted(
95+
() ->
96+
assertThat(
97+
extension.get(
98+
DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME))
99+
.isNull());
100+
}
101+
102+
DeletionDuringStatusUpdateCustomResource testResource() {
103+
var resource = new DeletionDuringStatusUpdateCustomResource();
104+
resource.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
105+
return resource;
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
import java.util.concurrent.TimeUnit;
20+
21+
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
22+
import io.javaoperatorsdk.operator.api.reconciler.Context;
23+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
24+
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
25+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
26+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
27+
28+
@ControllerConfiguration
29+
public class DeletionDuringStatusUpdateReconciler
30+
implements Reconciler<DeletionDuringStatusUpdateCustomResource>,
31+
Cleaner<DeletionDuringStatusUpdateCustomResource> {
32+
33+
final CountDownLatch patchStartedLatch = new CountDownLatch(1);
34+
final CountDownLatch deleteConfirmedLatch = new CountDownLatch(1);
35+
final CountDownLatch cleanupCalledLatch = new CountDownLatch(1);
36+
37+
@Override
38+
public UpdateControl<DeletionDuringStatusUpdateCustomResource> reconcile(
39+
DeletionDuringStatusUpdateCustomResource resource,
40+
Context<DeletionDuringStatusUpdateCustomResource> context)
41+
throws InterruptedException {
42+
if (resource.isMarkedForDeletion()) {
43+
return UpdateControl.noUpdate();
44+
}
45+
46+
var status = new DeletionDuringStatusUpdateStatus();
47+
status.setReady(true);
48+
resource.setStatus(status);
49+
50+
context
51+
.resourceOperations()
52+
.resourcePatch(
53+
resource,
54+
r -> {
55+
patchStartedLatch.countDown();
56+
try {
57+
if (!deleteConfirmedLatch.await(30, TimeUnit.SECONDS)) {
58+
throw new RuntimeException("Timed out waiting for delete confirmation");
59+
}
60+
} catch (InterruptedException e) {
61+
Thread.currentThread().interrupt();
62+
throw new RuntimeException(e);
63+
}
64+
r.getMetadata().setResourceVersion(null);
65+
return context.getClient().resource(r).patchStatus();
66+
},
67+
context.eventSourceRetriever().getControllerEventSource());
68+
69+
return UpdateControl.noUpdate();
70+
}
71+
72+
@Override
73+
public DeleteControl cleanup(
74+
DeletionDuringStatusUpdateCustomResource resource,
75+
Context<DeletionDuringStatusUpdateCustomResource> context) {
76+
System.out.println("DeletionDuringStatusUpdateReconciler.cleanup");
77+
cleanupCalledLatch.countDown();
78+
return DeleteControl.defaultDelete();
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate;
17+
18+
/** Status object of {@code DeletionDuringStatusUpdateCustomResource}; holds one ready flag. */
public class DeletionDuringStatusUpdateStatus {
19+
20+
// Flag exposed through isReady/setReady; false until a caller sets it.
private boolean ready;
21+
22+
/**
 * @return the current value of the ready flag
 */
public boolean isReady() {
23+
return ready;
24+
}
25+
26+
/**
 * @param ready the new value of the ready flag
 */
public void setReady(boolean ready) {
27+
this.ready = ready;
28+
}
29+
}

0 commit comments

Comments
 (0)