Skip to content

Commit 0f5ec20

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent e23172f commit 0f5ec20

File tree

3 files changed

+90
-33
lines changed

3 files changed

+90
-33
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,8 @@ private synchronized EventHandling onEvent(
127127
}
128128
var cached = cache.get(resourceId);
129129
EventHandling result = EventHandling.NEW;
130-
int comp = 0;
131130
if (cached != null) {
132-
comp = ReconcileUtils.compareResourceVersions(resource, cached);
131+
int comp = ReconcileUtils.compareResourceVersions(resource, cached);
133132
if (comp >= 0 || unknownState) {
134133
cache.remove(resourceId);
135134
// we propagate event only for our update or newer other can be discarded since we know we

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18+
import java.time.Duration;
1819
import java.util.Optional;
1920
import java.util.Set;
2021
import java.util.concurrent.CountDownLatch;
2122

2223
import org.junit.jupiter.api.BeforeEach;
2324
import org.junit.jupiter.api.Test;
24-
import org.junit.jupiter.api.TestInstance;
2525

2626
import io.fabric8.kubernetes.api.model.ObjectMeta;
2727
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -62,7 +62,6 @@
6262
import static org.mockito.Mockito.when;
6363

6464
@SuppressWarnings({"rawtypes", "unchecked"})
65-
@TestInstance(value = TestInstance.Lifecycle.PER_METHOD)
6665
class InformerEventSourceTest {
6766

6867
private static final String PREV_RESOURCE_VERSION = "0";
@@ -146,12 +145,11 @@ void processEventPropagationWithIncorrectAnnotation() {
146145

147146
@Test
148147
void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
148+
withRealTemporaryResourceCache();
149+
149150
Deployment cachedDeployment = testDeployment();
150151
cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION);
151-
when(temporaryResourceCache.getResourceFromCache(any()))
152-
.thenReturn(Optional.of(cachedDeployment));
153-
when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any()))
154-
.thenReturn(EventHandling.NEW);
152+
temporaryResourceCache.putResource(cachedDeployment);
155153

156154
informerEventSource.onUpdate(cachedDeployment, testDeployment());
157155

@@ -247,16 +245,36 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
247245
}
248246

249247
@Test
250-
void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException {
248+
void doesNotPropagateEventIfReceivedBeforeUpdate() {
251249
withRealTemporaryResourceCache();
252250

253251
CountDownLatch latch = sendForEventFilteringUpdate(2);
254252
informerEventSource.onUpdate(
255253
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
256254
latch.countDown();
257255

258-
Thread.sleep(100);
259-
verify(informerEventSource, never()).handleEvent(any(), any(), any(), any());
256+
assertNoEventProduced();
257+
}
258+
259+
@Test
260+
void filterAddEventBeforeUpdate() {
261+
withRealTemporaryResourceCache();
262+
263+
CountDownLatch latch = sendForEventFilteringUpdate(2);
264+
informerEventSource.onAdd(deploymentWithResourceVersion(1));
265+
latch.countDown();
266+
267+
assertNoEventProduced();
268+
}
269+
270+
private void assertNoEventProduced() {
271+
await()
272+
.pollDelay(Duration.ofMillis(50))
273+
.timeout(Duration.ofMillis(51))
274+
.untilAsserted(
275+
() -> {
276+
verify(informerEventSource, never()).handleEvent(any(), any(), any(), any());
277+
});
260278
}
261279

262280
private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
@@ -292,7 +310,7 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re
292310
}
293311

294312
private void withRealTemporaryResourceCache() {
295-
temporaryResourceCache = new TemporaryResourceCache<>(true);
313+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true));
296314
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
297315
}
298316

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,8 @@
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

1818
import java.util.Map;
19-
import java.util.concurrent.ExecutorService;
20-
import java.util.concurrent.Executors;
21-
import java.util.concurrent.TimeUnit;
2219

2320
import org.junit.jupiter.api.BeforeEach;
24-
import org.junit.jupiter.api.Disabled;
2521
import org.junit.jupiter.api.Test;
2622

2723
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -126,28 +122,72 @@ void nonComparableResourceVersionsDisables() {
126122
.isEmpty();
127123
}
128124

129-
@Disabled("todo")
130125
@Test
131-
void lockedEventBeforePut() throws Exception {
126+
void eventReceivedDuringFiltering() throws Exception {
132127
var testResource = testResource();
133128

134129
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
135130

136-
ExecutorService ex = Executors.newSingleThreadExecutor();
137-
try {
138-
var result =
139-
ex.submit(
140-
() ->
141-
temporaryResourceCache.onAddOrUpdateEvent(
142-
ResourceAction.ADDED, testResource, null));
143-
144-
temporaryResourceCache.putResource(testResource);
145-
assertThat(result.isDone()).isFalse();
146-
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3");
147-
assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW);
148-
} finally {
149-
ex.shutdownNow();
150-
}
131+
temporaryResourceCache.putResource(testResource);
132+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
133+
.isPresent();
134+
135+
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null);
136+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
137+
.isEmpty();
138+
139+
var doneRes =
140+
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
141+
142+
assertThat(doneRes).isEmpty();
143+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
144+
.isEmpty();
145+
}
146+
147+
@Test
148+
void newerEventDuringFiltering() {
149+
var testResource = testResource();
150+
151+
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
152+
153+
temporaryResourceCache.putResource(testResource);
154+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
155+
.isPresent();
156+
157+
var testResource2 = testResource();
158+
testResource2.getMetadata().setResourceVersion("3");
159+
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, testResource2, testResource);
160+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
161+
.isEmpty();
162+
163+
var doneRes =
164+
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
165+
166+
assertThat(doneRes).isPresent();
167+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
168+
.isEmpty();
169+
}
170+
171+
@Test
172+
void eventAfterFiltering() {
173+
var testResource = testResource();
174+
175+
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));
176+
177+
temporaryResourceCache.putResource(testResource);
178+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
179+
.isPresent();
180+
181+
var doneRes =
182+
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
183+
184+
assertThat(doneRes).isEmpty();
185+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
186+
.isPresent();
187+
188+
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null);
189+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
190+
.isEmpty();
151191
}
152192

153193
@Test

0 commit comments

Comments
 (0)