Skip to content

Commit 4fad756

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 6018bec commit 4fad756

2 files changed

Lines changed: 118 additions & 21 deletions

File tree

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import io.javaoperatorsdk.operator.processing.Controller;
3636
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3737
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
38+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3839
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
3940
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
4041
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4142
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
4243
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
4344
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
45+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;
4446
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
4547

4648
import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion;
@@ -227,6 +229,74 @@ void eventFilteringExceptionDuringUpdate() {
227229
expectHandleEvent(2, 1);
228230
}
229231

232+
@Test
233+
void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
234+
// Causal-dependency scenario: a third party updated the resource between our read and
235+
// our write. The informer delivers that update during our active filter; since its
236+
// resource version is NOT one of our own writes, it must be propagated.
237+
var src = new TestableControllerEventSource(new TestController(null, null, null));
238+
setUpSource(src, true, controllerConfig);
239+
240+
var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1());
241+
242+
// first filter writes rv 4 (our own); a second concurrent filter keeps the
243+
// active-updates window open while the event below is processed
244+
var latch1 = sendForEventFilteringUpdate(4);
245+
var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5);
246+
247+
latch1.countDown();
248+
awaitCachedResourceVersion(src.tempCache(), resourceId, "4");
249+
250+
// external update with rv 3 (older than our cached rv 4) — must propagate
251+
source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
252+
253+
verify(eventHandler, times(1)).handleEvent(any());
254+
255+
latch2.countDown();
256+
}
257+
258+
@Test
259+
void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
260+
// Two consecutive own writes (rv 3 then rv 4) within an open filter window: an event
261+
// for the older own version must be deferred since it's recognized as our own. A
262+
// third concurrent filter keeps the active-updates window open while the event below
263+
// is processed.
264+
var src = new TestableControllerEventSource(new TestController(null, null, null));
265+
setUpSource(src, true, controllerConfig);
266+
267+
var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1());
268+
269+
var latch1 = sendForEventFilteringUpdate(3);
270+
var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(3), 4);
271+
var latch3 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5);
272+
273+
latch1.countDown();
274+
awaitCachedResourceVersion(src.tempCache(), resourceId, "3");
275+
latch2.countDown();
276+
awaitCachedResourceVersion(src.tempCache(), resourceId, "4");
277+
278+
// event for our own rv 3 (older than cached rv 4) — must be deferred
279+
source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
280+
281+
verify(eventHandler, never()).handleEvent(any());
282+
283+
latch3.countDown();
284+
}
285+
286+
private void awaitCachedResourceVersion(
287+
TemporaryResourceCache<TestCustomResource> cache,
288+
ResourceID resourceId,
289+
String resourceVersion) {
290+
await()
291+
.untilAsserted(
292+
() ->
293+
assertThat(
294+
cache
295+
.getResourceFromCache(resourceId)
296+
.map(r -> r.getMetadata().getResourceVersion()))
297+
.hasValue(resourceVersion));
298+
}
299+
230300
private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
231301
await()
232302
.untilAsserted(
@@ -330,4 +400,15 @@ public TestConfiguration(
330400
false);
331401
}
332402
}
403+
404+
private static class TestableControllerEventSource
405+
extends ControllerEventSource<TestCustomResource> {
406+
TestableControllerEventSource(Controller<TestCustomResource> controller) {
407+
super(controller);
408+
}
409+
410+
TemporaryResourceCache<TestCustomResource> tempCache() {
411+
return temporaryResourceCache;
412+
}
413+
}
333414
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -453,49 +453,64 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
453453
// Causal-dependency fix: another controller updated the resource between our read
454454
// and our write. The informer delivers that update during our active filter; since
455455
// its resource version is NOT one of our own writes, it must be propagated.
456-
var realCache = realCacheWithWatchedNamespace();
456+
withRealTemporaryResourceCache();
457+
457458
var resourceId = ResourceID.fromResource(testDeployment());
458459

459-
realCache.startEventFilteringModify(resourceId);
460-
realCache.putResource(deploymentWithResourceVersion(4));
460+
// first filter writes rv 4 (our own); a second concurrent filter keeps the
461+
// active-updates window open so the event below hits the active path
462+
var latch1 = sendForEventFilteringUpdate(4);
463+
var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5);
464+
465+
latch1.countDown();
466+
awaitCachedResourceVersion(resourceId, "4");
461467

468+
// external update with rv 3 (older than our cached rv 4) — must propagate
462469
informerEventSource.onUpdate(
463470
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
464471

465472
verify(eventHandlerMock, times(1)).handleEvent(any());
466473

467-
realCache.doneEventFilterModify(resourceId, "4");
474+
latch2.countDown();
468475
}
469476

470477
@Test
471478
void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
472-
// Two consecutive own writes within a single filter window: the older one's event
473-
// arrives after the newer one has been cached. Because the version is recorded as
474-
// our own, the event must be deferred (not propagated).
475-
var realCache = realCacheWithWatchedNamespace();
479+
// Two consecutive own writes (rv 3 then rv 4) within an open filter window: an
480+
// event for the older own version must be deferred since it's recognized as our own.
481+
// A third concurrent filter keeps the active-updates window open while the event
482+
// below is processed.
483+
withRealTemporaryResourceCache();
484+
476485
var resourceId = ResourceID.fromResource(testDeployment());
477486

478-
realCache.startEventFilteringModify(resourceId);
479-
realCache.putResource(deploymentWithResourceVersion(3));
480-
realCache.putResource(deploymentWithResourceVersion(4));
487+
var latch1 = sendForEventFilteringUpdate(3);
488+
var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(3), 4);
489+
var latch3 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5);
490+
491+
latch1.countDown();
492+
awaitCachedResourceVersion(resourceId, "3");
493+
latch2.countDown();
494+
awaitCachedResourceVersion(resourceId, "4");
481495

496+
// event for our own rv 3 (older than cached rv 4) — must be deferred
482497
informerEventSource.onUpdate(
483498
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
484499

485500
verify(eventHandlerMock, never()).handleEvent(any());
486501

487-
realCache.doneEventFilterModify(resourceId, "4");
502+
latch3.countDown();
488503
}
489504

490-
private TemporaryResourceCache<Deployment> realCacheWithWatchedNamespace() {
491-
var mes = mock(ManagedInformerEventSource.class);
492-
var mim = mock(InformerManager.class);
493-
when(mes.manager()).thenReturn(mim);
494-
when(mim.isWatchingNamespace(any())).thenReturn(true);
495-
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
496-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
497-
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
498-
return temporaryResourceCache;
505+
private void awaitCachedResourceVersion(ResourceID resourceId, String resourceVersion) {
506+
await()
507+
.untilAsserted(
508+
() ->
509+
assertThat(
510+
temporaryResourceCache
511+
.getResourceFromCache(resourceId)
512+
.map(d -> d.getMetadata().getResourceVersion()))
513+
.hasValue(resourceVersion));
499514
}
500515

501516
private void assertNoEventProduced() {
@@ -542,6 +557,7 @@ private void withRealTemporaryResourceCache() {
542557
var mes = mock(ManagedInformerEventSource.class);
543558
var mim = mock(InformerManager.class);
544559
when(mes.manager()).thenReturn(mim);
560+
when(mim.isWatchingNamespace(any())).thenReturn(true);
545561
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
546562

547563
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));

0 commit comments

Comments
 (0)