Skip to content

Commit 1f98b60

Browse files
committed
latest resource version by informers
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent cfe42bb commit 1f98b60

File tree

4 files changed

+42
-10
lines changed

4 files changed

+42
-10
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,11 @@ public synchronized void start() {
153153
if (isRunning()) {
154154
return;
155155
}
156-
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
156+
var watchesOnlyOneNamespace =
157+
configuration.getInformerConfig().watchCurrentNamespace()
158+
|| configuration.getInformerConfig().watchAllNamespaces();
159+
temporaryResourceCache =
160+
new TemporaryResourceCache<>(comparableResourceVersions, !watchesOnlyOneNamespace, this);
157161
this.cache = new InformerManager<>(client, configuration, this);
158162
cache.setControllerConfiguration(controllerConfiguration);
159163
cache.addIndexers(indexers);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public class TemporaryResourceCache<T extends HasMetadata> {
5959
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
6060
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
6161
private final boolean comparableResourceVersions;
62-
private String latestResourceVersion;
62+
private final ConcurrentHashMap<String, String> perNamespaceLatestResourceVersion;
63+
private volatile String latestResourceVersion;
6364

6465
private final long obsoleteResourceCheckInterval;
6566
private volatile long lastObsoleteResourceCheck = System.currentTimeMillis();
@@ -73,20 +74,28 @@ public enum EventHandling {
7374

7475
public TemporaryResourceCache(
7576
boolean comparableResourceVersions,
77+
boolean wrapsMultipleInformers,
7678
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
7779
this(
7880
comparableResourceVersions,
81+
wrapsMultipleInformers,
7982
DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL,
8083
managedInformerEventSource);
8184
}
8285

8386
TemporaryResourceCache(
8487
boolean comparableResourceVersions,
88+
boolean wrapsMultipleInformers,
8589
long obsoleteResourceCheckInterval,
8690
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
8791
this.comparableResourceVersions = comparableResourceVersions;
8892
this.obsoleteResourceCheckInterval = obsoleteResourceCheckInterval;
8993
this.managedInformerEventSource = managedInformerEventSource;
94+
if (wrapsMultipleInformers || !comparableResourceVersions) {
95+
perNamespaceLatestResourceVersion = new ConcurrentHashMap<>();
96+
} else {
97+
perNamespaceLatestResourceVersion = null;
98+
}
9099
}
91100

92101
public synchronized void startEventFilteringModify(ResourceID resourceID) {
@@ -145,8 +154,7 @@ private synchronized EventHandling onEvent(
145154
log.debug("Processing event");
146155
}
147156
if (!unknownState) {
148-
latestResourceVersion = resource.getMetadata().getResourceVersion();
149-
log.debug("Setting latest resource version to: {}", latestResourceVersion);
157+
setLatestResourceVersion(resource);
150158
}
151159
var cached = cache.get(resourceId);
152160
EventHandling result = EventHandling.NEW;
@@ -202,15 +210,16 @@ public synchronized void putResource(T newResource) {
202210
//
203211
// this also prevents resurrecting recently deleted entities for which the delete event
204212
// has already been processed
205-
if (latestResourceVersion != null
213+
var latestRV = getLatestResourceVersion(newResource.getMetadata().getNamespace());
214+
if (latestRV != null
206215
&& ReconcilerUtilsInternal.compareResourceVersions(
207-
latestResourceVersion, newResource.getMetadata().getResourceVersion())
216+
latestRV, newResource.getMetadata().getResourceVersion())
208217
> 0) {
209218
log.debug(
210219
"Resource {}: resourceVersion {} is not later than latest {}",
211220
resourceId,
212221
newResource.getMetadata().getResourceVersion(),
213-
latestResourceVersion);
222+
latestRV);
214223
return;
215224
}
216225

@@ -246,4 +255,23 @@ void checkObsoleteResources() {
246255
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
247256
return Optional.ofNullable(cache.get(resourceID));
248257
}
258+
259+
private void setLatestResourceVersion(T resource) {
260+
if (perNamespaceLatestResourceVersion == null) {
261+
latestResourceVersion = resource.getMetadata().getResourceVersion();
262+
log.debug("Setting latest resource version to: {}", latestResourceVersion);
263+
} else {
264+
perNamespaceLatestResourceVersion.put(
265+
resource.getMetadata().getNamespace(), resource.getMetadata().getResourceVersion());
266+
log.debug("Setting latest resource version to: {} for namesoace", latestResourceVersion);
267+
}
268+
}
269+
270+
public String getLatestResourceVersion(String namespace) {
271+
if (perNamespaceLatestResourceVersion == null) {
272+
return latestResourceVersion;
273+
} else {
274+
return perNamespaceLatestResourceVersion.get(namespace);
275+
}
276+
}
249277
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re
377377

378378
private void withRealTemporaryResourceCache() {
379379
temporaryResourceCache =
380-
spy(new TemporaryResourceCache<>(true, mock(ManagedInformerEventSource.class)));
380+
spy(new TemporaryResourceCache<>(true, false, mock(ManagedInformerEventSource.class)));
381381
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
382382
}
383383

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class TemporaryPrimaryResourceCacheTest {
4040
@BeforeEach
4141
void setup() {
4242
temporaryResourceCache =
43-
new TemporaryResourceCache<>(true, mock(ManagedInformerEventSource.class));
43+
new TemporaryResourceCache<>(true, false, mock(ManagedInformerEventSource.class));
4444
}
4545

4646
@Test
@@ -117,7 +117,7 @@ void removesResourceFromCache() {
117117
@Test
118118
void nonComparableResourceVersionsDisables() {
119119
this.temporaryResourceCache =
120-
new TemporaryResourceCache<>(false, mock(ManagedInformerEventSource.class));
120+
new TemporaryResourceCache<>(false, false, mock(ManagedInformerEventSource.class));
121121

122122
this.temporaryResourceCache.putResource(testResource());
123123

0 commit comments

Comments
 (0)