Skip to content

Commit 0ad15bf

Browse files
committed
improve: filter only own updates for read-after-write-conistency with re-list
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 5ea96dc commit 0ad15bf

4 files changed

Lines changed: 80 additions & 17 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ class EventFilterDetails {
3232
private int activeUpdates = 0;
3333
private final List<GenericResourceEvent> relatedEvents = new ArrayList<>(5);
3434
private final Set<String> allOwnResourceVersions = new HashSet<>(5);
35+
private boolean affectedByReList;
36+
private volatile boolean reListSummaryEventSent = false;
37+
38+
public EventFilterDetails(boolean affectedByReList) {
39+
this.affectedByReList = affectedByReList;
40+
}
3541

3642
public void increaseActiveUpdates() {
3743
activeUpdates = activeUpdates + 1;
@@ -63,13 +69,33 @@ public void addRelatedEvent(GenericResourceEvent event) {
6369
relatedEvents.add(event);
6470
}
6571

66-
public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent() {
72+
public Optional<GenericResourceEvent> summaryEventForReList() {
73+
if (!affectedByReList) {
74+
throw new IllegalStateException(
75+
"ReList summary event requested to detail not affected by relist");
76+
}
77+
if (reListSummaryEventSent) {
78+
throw new IllegalStateException("ReList summary event already sent");
79+
}
80+
reListSummaryEventSent = true;
81+
if (relatedEvents.isEmpty()) {
82+
return Optional.empty();
83+
}
84+
return summaryEvent();
85+
}
86+
87+
// todo unit tests for corner cases with empty collections
88+
public Optional<GenericResourceEvent> summaryEvent() {
6789
if (relatedEvents.isEmpty()) {
6890
return Optional.empty();
6991
}
7092
if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) {
7193
return Optional.empty();
7294
}
95+
return summaryEventInternal();
96+
}
97+
98+
private Optional<GenericResourceEvent> summaryEventInternal() {
7399
var deleteEvent =
74100
relatedEvents.stream().filter(e -> e.getAction() == ResourceAction.DELETED).findFirst();
75101
if (deleteEvent.isPresent()) {
@@ -108,4 +134,16 @@ public boolean newerOrEqualEventReceivedForOwnLastUpdate() {
108134
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
109135
.anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0);
110136
}
137+
138+
public boolean isAffectedByReList() {
139+
return affectedByReList;
140+
}
141+
142+
public void affectedByReList() {
143+
this.affectedByReList = true;
144+
}
145+
146+
public boolean isReListSummaryEventSent() {
147+
return reListSummaryEventSent;
148+
}
111149
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,15 @@ public synchronized void stop() {
146146

147147
@Override
148148
public void onList(String resourceVersion, boolean remainedEmpty) {
149+
temporaryResourceCache.setRelistFinished();
149150
temporaryResourceCache.checkGhostResources();
150151
}
151152

153+
@Override
154+
public void onBeforeList(String lastSyncResourceVersion) {
155+
temporaryResourceCache.setOngoingRelist();
156+
}
157+
152158
@Override
153159
public void handleRecentResourceUpdate(
154160
ResourceID resourceID, R resource, R previousVersionOfResource) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ public class TemporaryResourceCache<T extends HasMetadata> {
5555
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
5656

5757
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
58-
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
58+
private final Map<ResourceID, EventFilterDetails> cachingFilteringUpdates = new HashMap<>();
5959
private final boolean comparableResourceVersions;
60+
private boolean informerOngoingRelist = false;
6061

6162
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
6263

@@ -71,26 +72,22 @@ public synchronized void startEventFilteringModify(ResourceID resourceID) {
7172
if (!comparableResourceVersions) {
7273
return;
7374
}
74-
var ed = activeUpdates.computeIfAbsent(resourceID, id -> new EventFilterDetails());
75+
var ed =
76+
cachingFilteringUpdates.computeIfAbsent(
77+
resourceID, id -> new EventFilterDetails(informerOngoingRelist));
7578
ed.increaseActiveUpdates();
7679
}
7780

7881
public synchronized Optional<GenericResourceEvent> doneEventFilterModify(ResourceID resourceID) {
7982
if (!comparableResourceVersions) {
8083
return Optional.empty();
8184
}
82-
var ed = activeUpdates.get(resourceID);
85+
var ed = cachingFilteringUpdates.get(resourceID);
8386
if (!ed.decreaseActiveUpdates()) {
8487
log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID);
8588
return Optional.empty();
8689
}
87-
88-
if (ed.newerOrEqualEventReceivedForOwnLastUpdate()) {
89-
activeUpdates.remove(resourceID);
90-
return ed.prepareSummaryEventIfNotOwnEventsPresent();
91-
} else {
92-
return Optional.empty();
93-
}
90+
return finaleEventHandlingAndCleanup(resourceID, ed);
9491
}
9592

9693
public Optional<GenericResourceEvent> onDeleteEvent(T resource, boolean unknownState) {
@@ -136,21 +133,20 @@ private synchronized Optional<GenericResourceEvent> onEvent(
136133
log.debug("Received intermediate event.");
137134
}
138135
}
139-
var au = activeUpdates.get(resourceId);
136+
var au = cachingFilteringUpdates.get(resourceId);
140137
if (au != null) {
141138
log.debug("Recording relevant event");
142139
au.addRelatedEvent(
143140
new GenericResourceEvent(action, resource, prevResourceVersion, unknownState));
144141
// this is to cover the situation when we finished the filtering and caching update but
145142
// did not receive events for our own updates yet.
146143
if (au.isNoActiveUpdate() && au.newerOrEqualEventReceivedForOwnLastUpdate()) {
147-
activeUpdates.remove(resourceId);
148-
return au.prepareSummaryEventIfNotOwnEventsPresent();
144+
return finaleEventHandlingAndCleanup(resourceId, au);
149145
}
150146
return Optional.empty();
151147
} else {
152148
log.debug("No active recording, event handling: {}", result);
153-
return result;
149+
return informerOngoingRelist ? Optional.of(actualEvent) : result;
154150
}
155151
}
156152

@@ -206,7 +202,7 @@ public synchronized void putResource(T newResource) {
206202

207203
// also make sure that we're later than the existing temporary entry
208204
var cachedResource = getResourceFromCache(resourceId).orElse(null);
209-
Optional.ofNullable(activeUpdates.get(resourceId))
205+
Optional.ofNullable(cachingFilteringUpdates.get(resourceId))
210206
.ifPresent(
211207
au -> au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion()));
212208

@@ -264,6 +260,20 @@ public void checkGhostResources() {
264260
}
265261
}
266262

263+
private Optional<GenericResourceEvent> finaleEventHandlingAndCleanup(
264+
ResourceID resourceID, EventFilterDetails ed) {
265+
if (ed.newerOrEqualEventReceivedForOwnLastUpdate()) {
266+
cachingFilteringUpdates.remove(resourceID);
267+
if (ed.isAffectedByReList()) {
268+
return ed.summaryEventForReList();
269+
} else {
270+
return ed.summaryEvent();
271+
}
272+
} else {
273+
return Optional.empty();
274+
}
275+
}
276+
267277
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
268278
return Optional.ofNullable(cache.get(resourceID));
269279
}
@@ -275,4 +285,13 @@ synchronized boolean isEmpty() {
275285
synchronized Map<ResourceID, T> getResources() {
276286
return Collections.unmodifiableMap(cache);
277287
}
288+
289+
public synchronized void setOngoingRelist() {
290+
this.informerOngoingRelist = true;
291+
cachingFilteringUpdates.values().forEach(EventFilterDetails::affectedByReList);
292+
}
293+
294+
public synchronized void setRelistFinished() {
295+
this.informerOngoingRelist = false;
296+
}
278297
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
7272
<fabric8-httpclient-impl.name>jdk</fabric8-httpclient-impl.name>
7373
<junit.version>6.1.0</junit.version>
74-
<fabric8-client.version>7.7.0</fabric8-client.version>
74+
<fabric8-client.version>999-SNAPSHOT</fabric8-client.version>
7575
<slf4j.version>2.0.18</slf4j.version>
7676
<log4j.version>2.26.0</log4j.version>
7777
<mokito.version>5.23.0</mokito.version>

0 commit comments

Comments
 (0)