Skip to content

Commit 51d7087

Browse files
committed
improve: filter only own updates for read-after-write-conistency
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent e873969 commit 51d7087

4 files changed

Lines changed: 41 additions & 12 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private void handleOnAddOrUpdate(
141141
ResourceAction action, T oldCustomResource, T newCustomResource) {
142142
var handling =
143143
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
144-
if (handling == EventHandling.NEW) {
144+
if (handling == EventHandling.NEW || handling == EventHandling.IN_BETWEEN) {
145145
handleEvent(action, newCustomResource, oldCustomResource, null);
146146
} else if (log.isDebugEnabled()) {
147147
log.debug("{} event propagation for action: {}", handling, action);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18+
import java.util.HashSet;
1819
import java.util.Optional;
20+
import java.util.Set;
1921
import java.util.function.UnaryOperator;
2022

2123
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -27,6 +29,7 @@ class EventFilterDetails {
2729
private int activeUpdates = 0;
2830
private ResourceEvent lastEvent;
2931
private String lastOwnUpdatedResourceVersion;
32+
private Set<String> allOwnResourceVersions = new HashSet<>();
3033

3134
public void increaseActiveUpdates() {
3235
activeUpdates = activeUpdates + 1;
@@ -69,4 +72,12 @@ public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
6972
public int getActiveUpdates() {
7073
return activeUpdates;
7174
}
75+
76+
void addToOwnResourceVersions(String updateVersion) {
77+
allOwnResourceVersions.add(updateVersion);
78+
}
79+
80+
public boolean isOwnResourceVersions(String resourceVersion) {
81+
return allOwnResourceVersions.contains(resourceVersion);
82+
}
7283
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,12 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
154154

155155
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
156156

157-
if (eventHandling != EventHandling.NEW) {
157+
if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.IN_BETWEEN) {
158158
log.debug(
159159
"{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping");
160160
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
161161
log.debug(
162-
"Propagating event for {}, resource with same version not result of a reconciliation.",
162+
"Propagating event for {}, resource with same version not result of a our update.",
163163
action);
164164
propagateEvent(newObject);
165165
} else {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class TemporaryResourceCache<T extends HasMetadata> {
6565
public enum EventHandling {
6666
DEFER,
6767
OBSOLETE,
68+
IN_BETWEEN,
6869
NEW
6970
}
7071

@@ -145,17 +146,30 @@ private synchronized EventHandling onEvent(
145146
// additional event
146147
result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW;
147148
} else {
148-
result = EventHandling.OBSOLETE;
149+
// in this case we received and event that might be in some edge case that was
150+
// already used in reconciler or after that, but before our updated resource version.
151+
// That would be hard to distinguish, so for those we are propagating the event further.
152+
log.debug("Received in between event.");
153+
result = EventHandling.IN_BETWEEN;
149154
}
150155
}
151-
var ed = activeUpdates.get(resourceId);
152-
if (ed != null && result != EventHandling.OBSOLETE) {
153-
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
154-
ed.setLastEvent(
155-
delete
156-
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
157-
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
158-
return EventHandling.DEFER;
156+
var au = activeUpdates.get(resourceId);
157+
if (au != null) {
158+
if (result == EventHandling.IN_BETWEEN) {
159+
return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion())
160+
? EventHandling.DEFER
161+
: EventHandling.IN_BETWEEN;
162+
}
163+
if (result == EventHandling.NEW) {
164+
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
165+
au.setLastEvent(
166+
delete
167+
? new ResourceDeleteEvent(
168+
ResourceAction.DELETED, resourceId, resource, unknownState)
169+
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
170+
return EventHandling.DEFER;
171+
}
172+
return result;
159173
} else {
160174
return result;
161175
}
@@ -216,6 +230,10 @@ public synchronized void putResource(T newResource) {
216230
newResource.getMetadata().getResourceVersion(),
217231
resourceId);
218232
cache.put(resourceId, newResource);
233+
var au = activeUpdates.get(resourceId);
234+
if (au != null) {
235+
au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion());
236+
}
219237
}
220238
}
221239

0 commit comments

Comments
 (0)