Skip to content

Commit 7230595

Browse files
committed
improve: read cache after write corner case for missed delete event
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 78eecc0 commit 7230595

File tree

2 files changed

+38
-4
lines changed

2 files changed

+38
-4
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import java.util.HashMap;
1919
import java.util.Map;
2020
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentSkipListMap;
2224

2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
@@ -55,9 +57,12 @@ public class TemporaryResourceCache<T extends HasMetadata> {
5557

5658
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
5759

60+
private final ConcurrentSkipListMap<Long, ResourceID> cachedVersions =
61+
new ConcurrentSkipListMap<>();
62+
5863
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
5964
private final boolean comparableResourceVersions;
60-
private String latestResourceVersion;
65+
private volatile String latestResourceVersion;
6166

6267
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
6368

@@ -139,7 +144,7 @@ private synchronized EventHandling onEvent(
139144
"Removing resource from temp cache. comparison: {} unknown state: {}",
140145
comp,
141146
unknownState);
142-
cache.remove(resourceId);
147+
cacheRemove(resourceId, cached.getMetadata().getResourceVersion());
143148
// we propagate event only for our update or newer other can be discarded since we know we
144149
// will receive
145150
// additional event
@@ -155,12 +160,25 @@ private synchronized EventHandling onEvent(
155160
delete
156161
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
157162
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
163+
checkStaleResources();
158164
return EventHandling.DEFER;
159165
} else {
160166
return result;
161167
}
162168
}
163169

170+
private void checkStaleResources() {
171+
CompletableFuture.runAsync(
172+
() -> {
173+
var longLatest = Long.parseLong(latestResourceVersion);
174+
var head = cachedVersions.headMap(longLatest);
175+
for (var entry : head.entrySet()) {
176+
cache.remove(entry.getValue());
177+
cachedVersions.remove(entry.getKey());
178+
}
179+
});
180+
}
181+
164182
/** put the item into the cache if it's for a later state than what has already been observed. */
165183
public synchronized void putResource(T newResource) {
166184
if (!comparableResourceVersions) {
@@ -204,11 +222,27 @@ public synchronized void putResource(T newResource) {
204222
"Temporarily moving ahead to target version {} for resource id: {}",
205223
newResource.getMetadata().getResourceVersion(),
206224
resourceId);
207-
cache.put(resourceId, newResource);
225+
cacheResource(resourceId, newResource);
208226
}
209227
}
210228

211229
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
212230
return Optional.ofNullable(cache.get(resourceID));
213231
}
232+
233+
private void cacheResource(ResourceID resourceId, T resource) {
234+
var actualCached = cache.get(resourceId);
235+
cache.put(resourceId, resource);
236+
CompletableFuture.runAsync(
237+
() -> {
238+
cachedVersions.remove(Long.parseLong(actualCached.getMetadata().getResourceVersion()));
239+
cachedVersions.put(
240+
Long.parseLong(resource.getMetadata().getResourceVersion()), resourceId);
241+
});
242+
}
243+
244+
private void cacheRemove(ResourceID resourceId, String cachedResourceVersion) {
245+
cache.remove(resourceId);
246+
CompletableFuture.runAsync(() -> cachedVersions.remove(Long.parseLong(cachedResourceVersion)));
247+
}
214248
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java renamed to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static org.assertj.core.api.Assertions.assertThat;
3131
import static org.junit.jupiter.api.Assertions.assertTrue;
3232

33-
class TemporaryPrimaryResourceCacheTest {
33+
class TemporaryResourceCacheTest {
3434

3535
public static final String RESOURCE_VERSION = "2";
3636

0 commit comments

Comments
 (0)