Skip to content

Commit 413171e

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 51d7087 commit 413171e

5 files changed

Lines changed: 128 additions & 7 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private void handleOnAddOrUpdate(
141141
ResourceAction action, T oldCustomResource, T newCustomResource) {
142142
var handling =
143143
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
144-
if (handling == EventHandling.NEW || handling == EventHandling.IN_BETWEEN) {
144+
if (handling == EventHandling.NEW || handling == EventHandling.INTERMEDIATE) {
145145
handleEvent(action, newCustomResource, oldCustomResource, null);
146146
} else if (log.isDebugEnabled()) {
147147
log.debug("{} event propagation for action: {}", handling, action);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
154154

155155
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
156156

157-
if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.IN_BETWEEN) {
157+
if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.INTERMEDIATE) {
158158
log.debug(
159159
"{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping");
160160
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class TemporaryResourceCache<T extends HasMetadata> {
6565
public enum EventHandling {
6666
DEFER,
6767
OBSOLETE,
68-
IN_BETWEEN,
68+
INTERMEDIATE,
6969
NEW
7070
}
7171

@@ -149,16 +149,16 @@ private synchronized EventHandling onEvent(
149149
// in this case we received and event that might be in some edge case that was
150150
// already used in reconciler or after that, but before our updated resource version.
151151
// That would be hard to distinguish, so for those we are propagating the event further.
152-
log.debug("Received in between event.");
153-
result = EventHandling.IN_BETWEEN;
152+
log.debug("Received intermediate event.");
153+
result = EventHandling.INTERMEDIATE;
154154
}
155155
}
156156
var au = activeUpdates.get(resourceId);
157157
if (au != null) {
158-
if (result == EventHandling.IN_BETWEEN) {
158+
if (result == EventHandling.INTERMEDIATE) {
159159
return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion())
160160
? EventHandling.DEFER
161-
: EventHandling.IN_BETWEEN;
161+
: EventHandling.INTERMEDIATE;
162162
}
163163
if (result == EventHandling.NEW) {
164164
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@ void processEventPropagationWithIncorrectAnnotation() {
149149
verify(eventHandlerMock, times(1)).handleEvent(any());
150150
}
151151

152+
@Test
153+
void propagatesIntermediateEventHandling() {
154+
when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any()))
155+
.thenReturn(EventHandling.INTERMEDIATE);
156+
informerEventSource.onUpdate(testDeployment(), testDeployment());
157+
158+
verify(eventHandlerMock, times(1)).handleEvent(any());
159+
}
160+
152161
@Test
153162
void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
154163
withRealTemporaryResourceCache();
@@ -439,6 +448,56 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
439448
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
440449
}
441450

451+
@Test
452+
void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
453+
// Causal-dependency fix: another controller updated the resource between our read
454+
// and our write. The informer delivers that update during our active filter; since
455+
// its resource version is NOT one of our own writes, it must be propagated.
456+
var realCache = realCacheWithWatchedNamespace();
457+
var resourceId = ResourceID.fromResource(testDeployment());
458+
459+
realCache.startEventFilteringModify(resourceId);
460+
realCache.putResource(deploymentWithResourceVersion(4));
461+
462+
informerEventSource.onUpdate(
463+
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
464+
465+
verify(eventHandlerMock, times(1)).handleEvent(any());
466+
467+
realCache.doneEventFilterModify(resourceId, "4");
468+
}
469+
470+
@Test
471+
void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
472+
// Two consecutive own writes within a single filter window: the older one's event
473+
// arrives after the newer one has been cached. Because the version is recorded as
474+
// our own, the event must be deferred (not propagated).
475+
var realCache = realCacheWithWatchedNamespace();
476+
var resourceId = ResourceID.fromResource(testDeployment());
477+
478+
realCache.startEventFilteringModify(resourceId);
479+
realCache.putResource(deploymentWithResourceVersion(3));
480+
realCache.putResource(deploymentWithResourceVersion(4));
481+
482+
informerEventSource.onUpdate(
483+
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
484+
485+
verify(eventHandlerMock, never()).handleEvent(any());
486+
487+
realCache.doneEventFilterModify(resourceId, "4");
488+
}
489+
490+
private TemporaryResourceCache<Deployment> realCacheWithWatchedNamespace() {
491+
var mes = mock(ManagedInformerEventSource.class);
492+
var mim = mock(InformerManager.class);
493+
when(mes.manager()).thenReturn(mim);
494+
when(mim.isWatchingNamespace(any())).thenReturn(true);
495+
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
496+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
497+
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
498+
return temporaryResourceCache;
499+
}
500+
442501
private void assertNoEventProduced() {
443502
await()
444503
.pollDelay(Duration.ofMillis(50))

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,68 @@ void putAfterEventWithEventFilteringWithPost() {
294294
assertTrue(postEvent.isPresent());
295295
}
296296

297+
@Test
298+
void intermediateEventPropagatedWhenNoActiveUpdate() {
299+
// Cache holds a newer version from a prior own write; no active filter is in progress.
300+
// An older event arriving used to be OBSOLETE; now it must be propagated as INTERMEDIATE
301+
// so callers can react to changes that happened between read and write.
302+
var olderEvent = testResource();
303+
var newer = testResource();
304+
newer.getMetadata().setResourceVersion("3");
305+
306+
temporaryResourceCache.putResource(newer);
307+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(olderEvent)))
308+
.isPresent();
309+
310+
var result =
311+
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, olderEvent, null);
312+
313+
assertThat(result).isEqualTo(EventHandling.INTERMEDIATE);
314+
}
315+
316+
@Test
317+
void intermediateEventPropagatedWhenNotOurOwnUpdate() {
318+
// Causal-dependency scenario: a third party updated the resource between our read and
319+
// our write. Its version arrives as an event but is NOT in our own resource versions,
320+
// so it must be propagated (INTERMEDIATE), not deferred.
321+
var external = testResource(); // rv=2 — written by another controller
322+
var resourceId = ResourceID.fromResource(external);
323+
324+
temporaryResourceCache.startEventFilteringModify(resourceId);
325+
326+
var ourUpdate = testResource();
327+
ourUpdate.getMetadata().setResourceVersion("3");
328+
temporaryResourceCache.putResource(ourUpdate);
329+
330+
var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null);
331+
332+
assertThat(result).isEqualTo(EventHandling.INTERMEDIATE);
333+
}
334+
335+
@Test
336+
void intermediateEventDeferredWhenItIsOurOwnIntermediateUpdate() {
337+
// Two consecutive own writes within the same filter window: the older one's event
338+
// arrives after the newer one is cached. Because the version is recorded as our own,
339+
// the event must be DEFERred rather than propagated.
340+
var testResource = testResource();
341+
var resourceId = ResourceID.fromResource(testResource);
342+
343+
temporaryResourceCache.startEventFilteringModify(resourceId);
344+
345+
var ourFirst = testResource(); // rv=2
346+
temporaryResourceCache.putResource(ourFirst);
347+
348+
var ourSecond = testResource();
349+
ourSecond.getMetadata().setResourceVersion("3");
350+
351+
temporaryResourceCache.startEventFilteringModify(resourceId);
352+
temporaryResourceCache.putResource(ourSecond);
353+
354+
var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, ourFirst, null);
355+
356+
assertThat(result).isEqualTo(EventHandling.DEFER);
357+
}
358+
297359
@Test
298360
void rapidDeletion() {
299361
var testResource = testResource();

0 commit comments

Comments
 (0)