Skip to content

Commit ba0743a

Browse files
committed
delete handling improvements and test improvements
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 21978a4 commit ba0743a

3 files changed

Lines changed: 45 additions & 4 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,15 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
122122
log.debug(
123123
"On delete event received. deletedFinalStateUnknown: {}", deletedFinalStateUnknown);
124124
}
125+
var resultEvent =
126+
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
127+
if (resultEvent.isEmpty()) {
128+
return;
129+
}
130+
if (resultEvent.orElseThrow().getAction() != ResourceAction.DELETED) {
131+
log.warn("Non delete event received on onDelete handling. This should not happen.");
132+
}
125133
primaryToSecondaryIndex.onDelete(resource);
126-
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
127134
if (acceptedByDeleteFilters(resource, deletedFinalStateUnknown)) {
128135
propagateEvent(resource);
129136
}
@@ -151,15 +158,15 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
151158
primaryToSecondaryIndex.onAddOrUpdate(newObject);
152159
var resourceID = ResourceID.fromResource(newObject);
153160

154-
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
161+
var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
155162

156-
if (eventHandling.isEmpty()) {
163+
if (resultEvent.isEmpty()) {
157164
log.debug("Deferring event propagation");
158165
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
159166
log.debug(
160167
"Propagating event for {}, resource with same version not result of a our update.",
161168
action);
162-
var event = eventHandling.get();
169+
var event = resultEvent.get();
163170
handleEvent(
164171
event.getAction(),
165172
(R) event.getResource().orElseThrow(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@
4848
*
4949
* <p>If comparable resource versions are disabled, then this cache is effectively disabled.
5050
*
51+
* <p>Some principles to realize with the current filtering algorithm:
52+
*
53+
* <ul>
54+
* <li>We propagate events only if we received an event that has the same resourceVersion or newer
55+
* than resource version from update
56+
* <li>The propagated event should correspond to a possible real world scenario - considering also
57+
* ones that could happen is the Informer does a re-list.
58+
* </ul>
59+
*
5160
* @param <T> resource to cache.
5261
*/
5362
public class TemporaryResourceCache<T extends HasMetadata> {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ void handlesPrevResourceVersionForUpdate() {
180180
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
181181

182182
expectHandleAddEvent(3, 1);
183+
expectNoActiveUpdates();
183184
}
184185

185186
@RepeatedTest(REPEAT_COUNT)
@@ -198,6 +199,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() {
198199
latch.countDown();
199200

200201
expectHandleAddEvent(2, 1);
202+
expectNoActiveUpdates();
201203
}
202204

203205
@RepeatedTest(REPEAT_COUNT)
@@ -213,6 +215,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
213215
latch.countDown();
214216

215217
expectHandleAddEvent(4, 2);
218+
expectNoActiveUpdates();
216219
}
217220

218221
@RepeatedTest(REPEAT_COUNT)
@@ -225,6 +228,7 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() {
225228
latch.countDown();
226229

227230
assertNoEventProduced();
231+
expectNoActiveUpdates();
228232
}
229233

230234
@RepeatedTest(REPEAT_COUNT)
@@ -242,6 +246,7 @@ void multipleCachingFilteringUpdates() {
242246
deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
243247

244248
assertNoEventProduced();
249+
expectNoActiveUpdates();
245250
}
246251

247252
@RepeatedTest(REPEAT_COUNT)
@@ -260,6 +265,7 @@ void multipleCachingFilteringUpdates_variant2() {
260265
latch2.countDown();
261266

262267
assertNoEventProduced();
268+
expectNoActiveUpdates();
263269
}
264270

265271
@RepeatedTest(REPEAT_COUNT)
@@ -278,6 +284,7 @@ void multipleCachingFilteringUpdates_variant3() {
278284
latch2.countDown();
279285

280286
assertNoEventProduced();
287+
expectNoActiveUpdates();
281288
}
282289

283290
@RepeatedTest(REPEAT_COUNT)
@@ -296,6 +303,7 @@ void multipleCachingFilteringUpdates_variant4() {
296303
latch2.countDown();
297304

298305
assertNoEventProduced();
306+
expectNoActiveUpdates();
299307
}
300308

301309
@RepeatedTest(REPEAT_COUNT)
@@ -314,6 +322,7 @@ void multipleCachingFilteringUpdates_variant5() {
314322
deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
315323

316324
assertNoEventProduced();
325+
expectNoActiveUpdates();
317326
}
318327

319328
@RepeatedTest(REPEAT_COUNT)
@@ -436,6 +445,7 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
436445
deploymentWithResourceVersion(4), deploymentWithResourceVersion(5));
437446

438447
expectHandleAddEvent(5, 2);
448+
expectNoActiveUpdates();
439449
}
440450

441451
@RepeatedTest(REPEAT_COUNT)
@@ -464,6 +474,14 @@ void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
464474
verify(eventHandlerMock, never()).handleEvent(any());
465475

466476
latch3.countDown();
477+
awaitCachedResourceVersion(resourceId, "5");
478+
// drain the filter with the event for our own rv 5 — all events are now own,
479+
// summary must be empty and no event propagated.
480+
informerEventSource.onUpdate(
481+
deploymentWithResourceVersion(4), deploymentWithResourceVersion(5));
482+
483+
assertNoEventProduced();
484+
expectNoActiveUpdates();
467485
}
468486

469487
@RepeatedTest(REPEAT_COUNT)
@@ -481,6 +499,7 @@ void deleteEventPropagatedIfItWasTheLastEvent() {
481499
latch.countDown();
482500

483501
expectHandleDeleteEvent(5);
502+
expectNoActiveUpdates();
484503
}
485504

486505
private void awaitCachedResourceVersion(ResourceID resourceId, String resourceVersion) {
@@ -501,6 +520,12 @@ private void assertNoEventProduced() {
501520
.untilAsserted(() -> verify(informerEventSource, never()).propagateEvent(any()));
502521
}
503522

523+
private void expectNoActiveUpdates() {
524+
await()
525+
.atMost(Duration.ofSeconds(1))
526+
.untilAsserted(() -> assertThat(temporaryResourceCache.getActiveUpdates()).isEmpty());
527+
}
528+
504529
private void expectHandleAddEvent(int newResourceVersion) {
505530
await()
506531
.atMost(Duration.ofSeconds(1))

0 commit comments

Comments
 (0)