Skip to content

Commit 25b2d5e

Browse files
committed
latest resync version
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 6a543d2 commit 25b2d5e

File tree

6 files changed

+54
-58
lines changed

6 files changed

+54
-58
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ private Optional<InformerWrapper<R>> getSource(String namespace) {
235235
return Optional.ofNullable(sources.get(namespace));
236236
}
237237

238+
String lastSyncResourceVersion(String namespace) {
239+
return getSource(namespace)
240+
.map(InformerWrapper::getInformer)
241+
.orElseThrow()
242+
.lastSyncResourceVersion();
243+
}
244+
238245
@Override
239246
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
240247
this.indexers.putAll(indexers);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,8 @@ public Status getStatus() {
238238
public String getTargetNamespace() {
239239
return namespaceIdentifier;
240240
}
241+
242+
public SharedIndexInformer<T> getInformer() {
243+
return informer;
244+
}
241245
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,7 @@ public synchronized void start() {
153153
if (isRunning()) {
154154
return;
155155
}
156-
var watchesOnlyOneNamespace =
157-
configuration.getInformerConfig().watchCurrentNamespace()
158-
|| configuration.getInformerConfig().watchAllNamespaces();
159-
temporaryResourceCache =
160-
new TemporaryResourceCache<>(comparableResourceVersions, !watchesOnlyOneNamespace, this);
156+
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
161157
this.cache = new InformerManager<>(client, configuration, this);
162158
cache.setControllerConfiguration(controllerConfiguration);
163159
cache.addIndexers(indexers);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ public class TemporaryResourceCache<T extends HasMetadata> {
5959
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
6060
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
6161
private final boolean comparableResourceVersions;
62-
private final ConcurrentHashMap<String, String> perNamespaceLatestResourceVersion;
63-
private volatile String latestResourceVersion;
6462

6563
private final long obsoleteResourceCheckInterval;
6664
private volatile long lastObsoleteResourceCheck = System.currentTimeMillis();
@@ -74,28 +72,20 @@ public enum EventHandling {
7472

7573
public TemporaryResourceCache(
7674
boolean comparableResourceVersions,
77-
boolean wrapsMultipleInformers,
7875
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
7976
this(
8077
comparableResourceVersions,
81-
wrapsMultipleInformers,
8278
DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL,
8379
managedInformerEventSource);
8480
}
8581

8682
TemporaryResourceCache(
8783
boolean comparableResourceVersions,
88-
boolean wrapsMultipleInformers,
8984
long obsoleteResourceCheckInterval,
9085
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
9186
this.comparableResourceVersions = comparableResourceVersions;
9287
this.obsoleteResourceCheckInterval = obsoleteResourceCheckInterval;
9388
this.managedInformerEventSource = managedInformerEventSource;
94-
if (wrapsMultipleInformers || !comparableResourceVersions) {
95-
perNamespaceLatestResourceVersion = new ConcurrentHashMap<>();
96-
} else {
97-
perNamespaceLatestResourceVersion = null;
98-
}
9989
}
10090

10191
public synchronized void startEventFilteringModify(ResourceID resourceID) {
@@ -153,9 +143,6 @@ private synchronized EventHandling onEvent(
153143
if (log.isDebugEnabled()) {
154144
log.debug("Processing event");
155145
}
156-
if (!unknownState) {
157-
setLatestResourceVersion(resource);
158-
}
159146
var cached = cache.get(resourceId);
160147
EventHandling result = EventHandling.NEW;
161148
if (cached != null) {
@@ -236,6 +223,10 @@ public synchronized void putResource(T newResource) {
236223
}
237224
}
238225

226+
private String getLatestResourceVersion(String namespace) {
227+
return managedInformerEventSource.manager().lastSyncResourceVersion(namespace);
228+
}
229+
239230
void checkObsoleteResources() {
240231
if (System.currentTimeMillis() > lastObsoleteResourceCheck + obsoleteResourceCheckInterval) {
241232
lastObsoleteResourceCheck = System.currentTimeMillis();
@@ -244,7 +235,8 @@ void checkObsoleteResources() {
244235
while (iterator.hasNext()) {
245236
var e = iterator.next();
246237
if (ReconcilerUtilsInternal.compareResourceVersions(
247-
e.getValue().getMetadata().getResourceVersion(), latestResourceVersion)
238+
e.getValue().getMetadata().getResourceVersion(),
239+
getLatestResourceVersion(e.getValue().getMetadata().getNamespace()))
248240
< 0) iterator.remove();
249241
managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true);
250242
log.debug("Removing obsolete resource with ID: {}", e.getKey());
@@ -255,23 +247,4 @@ void checkObsoleteResources() {
255247
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
256248
return Optional.ofNullable(cache.get(resourceID));
257249
}
258-
259-
private void setLatestResourceVersion(T resource) {
260-
if (perNamespaceLatestResourceVersion == null) {
261-
latestResourceVersion = resource.getMetadata().getResourceVersion();
262-
log.debug("Setting latest resource version to: {}", latestResourceVersion);
263-
} else {
264-
perNamespaceLatestResourceVersion.put(
265-
resource.getMetadata().getNamespace(), resource.getMetadata().getResourceVersion());
266-
log.debug("Setting latest resource version to: {} for namespace", latestResourceVersion);
267-
}
268-
}
269-
270-
public String getLatestResourceVersion(String namespace) {
271-
if (perNamespaceLatestResourceVersion == null) {
272-
return latestResourceVersion;
273-
} else {
274-
return perNamespaceLatestResourceVersion.get(namespace);
275-
}
276-
}
277250
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,12 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re
376376
}
377377

378378
private void withRealTemporaryResourceCache() {
379-
temporaryResourceCache =
380-
spy(new TemporaryResourceCache<>(true, false, mock(ManagedInformerEventSource.class)));
379+
var mes = mock(ManagedInformerEventSource.class);
380+
var mim = mock(InformerManager.class);
381+
when(mes.manager()).thenReturn(mim);
382+
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
383+
384+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
381385
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
382386
}
383387

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,25 @@
2929

3030
import static org.assertj.core.api.Assertions.assertThat;
3131
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
import static org.mockito.ArgumentMatchers.any;
3233
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
3335

3436
class TemporaryPrimaryResourceCacheTest {
3537

3638
public static final String RESOURCE_VERSION = "2";
3739

3840
private TemporaryResourceCache<ConfigMap> temporaryResourceCache;
41+
private volatile String latestSyncVersion;
3942

4043
@BeforeEach
4144
void setup() {
42-
temporaryResourceCache =
43-
new TemporaryResourceCache<>(true, false, mock(ManagedInformerEventSource.class));
45+
var mes = mock(ManagedInformerEventSource.class);
46+
var mim = mock(InformerManager.class);
47+
when(mes.manager()).thenReturn(mim);
48+
when(mim.lastSyncResourceVersion(any())).then(a -> latestSyncVersion);
49+
50+
temporaryResourceCache = new TemporaryResourceCache<>(true, mes);
4451
}
4552

4653
@Test
@@ -49,7 +56,7 @@ void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion()
4956
var prevTestResource = testResource();
5057
prevTestResource.getMetadata().setResourceVersion("1");
5158

52-
temporaryResourceCache.putResource(testResource);
59+
putResource(testResource);
5360

5461
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
5562
assertThat(cached).isPresent();
@@ -63,8 +70,8 @@ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() {
6370
ResourceAction.ADDED,
6471
testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build(),
6572
null);
66-
67-
temporaryResourceCache.putResource(testResource);
73+
latestSyncVersion = "3";
74+
putResource(testResource);
6875

6976
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
7077
assertThat(cached).isNotPresent();
@@ -74,7 +81,7 @@ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() {
7481
void addOperationAddsTheResourceIfInformerCacheStillEmpty() {
7582
var testResource = testResource();
7683

77-
temporaryResourceCache.putResource(testResource);
84+
putResource(testResource);
7885

7986
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
8087
assertThat(cached).isPresent();
@@ -84,9 +91,9 @@ void addOperationAddsTheResourceIfInformerCacheStillEmpty() {
8491
void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
8592
var testResource = testResource();
8693

87-
temporaryResourceCache.putResource(testResource);
94+
putResource(testResource);
8895

89-
temporaryResourceCache.putResource(
96+
putResource(
9097
new ConfigMapBuilder(testResource)
9198
.editMetadata()
9299
.withResourceVersion("1")
@@ -109,15 +116,15 @@ void removesResourceFromCache() {
109116
.endMetadata()
110117
.build(),
111118
null);
112-
119+
latestSyncVersion = "3";
113120
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
114121
.isNotPresent();
115122
}
116123

117124
@Test
118125
void nonComparableResourceVersionsDisables() {
119126
this.temporaryResourceCache =
120-
new TemporaryResourceCache<>(false, false, mock(ManagedInformerEventSource.class));
127+
new TemporaryResourceCache<>(false, mock(ManagedInformerEventSource.class));
121128

122129
this.temporaryResourceCache.putResource(testResource());
123130

@@ -131,7 +138,7 @@ void eventReceivedDuringFiltering() {
131138

132139
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
133140

134-
temporaryResourceCache.putResource(testResource);
141+
putResource(testResource);
135142
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
136143
.isPresent();
137144

@@ -153,7 +160,7 @@ void newerEventDuringFiltering() {
153160

154161
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
155162

156-
temporaryResourceCache.putResource(testResource);
163+
putResource(testResource);
157164
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
158165
.isPresent();
159166

@@ -177,7 +184,7 @@ void eventAfterFiltering() {
177184

178185
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
179186

180-
temporaryResourceCache.putResource(testResource);
187+
putResource(testResource);
181188
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
182189
.isPresent();
183190

@@ -204,7 +211,7 @@ void putBeforeEvent() {
204211

205212
var nextResource = testResource();
206213
nextResource.getMetadata().setResourceVersion("3");
207-
temporaryResourceCache.putResource(nextResource);
214+
putResource(nextResource);
208215

209216
// the result is obsolete
210217
result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null);
@@ -225,7 +232,7 @@ void putBeforeEventWithEventFiltering() {
225232
var resourceId = ResourceID.fromResource(testResource);
226233

227234
temporaryResourceCache.startEventFilteringModify(resourceId);
228-
temporaryResourceCache.putResource(nextResource);
235+
putResource(nextResource);
229236
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
230237

231238
// the result is obsolete
@@ -252,7 +259,7 @@ void putAfterEventWithEventFilteringNoPost() {
252259
ResourceAction.UPDATED, nextResource, testResource);
253260
// the result is deferred
254261
assertThat(result).isEqualTo(EventHandling.DEFER);
255-
temporaryResourceCache.putResource(nextResource);
262+
putResource(nextResource);
256263
var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3");
257264

258265
// there is no post event because the done call claimed responsibility for rv 3
@@ -291,15 +298,20 @@ void rapidDeletion() {
291298
.endMetadata()
292299
.build(),
293300
false);
294-
temporaryResourceCache.putResource(testResource);
301+
latestSyncVersion = "3";
302+
putResource(testResource);
295303

296304
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
297305
.isEmpty();
298306
}
299307

308+
private void putResource(ConfigMap testResource) {
309+
temporaryResourceCache.putResource(testResource);
310+
}
311+
300312
private ConfigMap propagateTestResourceToCache() {
301313
var testResource = testResource();
302-
temporaryResourceCache.putResource(testResource);
314+
putResource(testResource);
303315
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
304316
.isPresent();
305317
return testResource;

0 commit comments

Comments
 (0)