|
19 | 19 | import java.util.Optional; |
20 | 20 | import java.util.Set; |
21 | 21 | import java.util.concurrent.CountDownLatch; |
| 22 | +import java.util.concurrent.Executors; |
22 | 23 | import java.util.concurrent.ScheduledExecutorService; |
23 | 24 |
|
24 | 25 | import org.junit.jupiter.api.BeforeEach; |
@@ -339,6 +340,116 @@ void multipleCachingFilteringUpdates_variant4() { |
339 | 340 | assertNoEventProduced(); |
340 | 341 | } |
341 | 342 |
|
| 343 | + @Test |
| 344 | + void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { |
| 345 | + var mes = mock(ManagedInformerEventSource.class); |
| 346 | + var mim = mock(InformerManager.class); |
| 347 | + when(mes.manager()).thenReturn(mim); |
| 348 | + when(mim.isWatchingNamespace(any())).thenReturn(true); |
| 349 | + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); |
| 350 | + when(mim.get(any())).thenReturn(Optional.empty()); |
| 351 | + |
| 352 | + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); |
| 353 | + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); |
| 354 | + informerEventSource.setTemporalResourceCache(temporaryResourceCache); |
| 355 | + |
| 356 | + // put resource in cache and start a filtering update |
| 357 | + var deployment = deploymentWithResourceVersion(2); |
| 358 | + temporaryResourceCache.putResource(deployment); |
| 359 | + var resourceId = ResourceID.fromResource(deployment); |
| 360 | + temporaryResourceCache.startEventFilteringModify(resourceId); |
| 361 | + |
| 362 | + // advance sync version so ghost check considers the cached resource outdated |
| 363 | + when(mim.lastSyncResourceVersion(any())).thenReturn("3"); |
| 364 | + |
| 365 | + // ghost check should remove the cached resource |
| 366 | + await() |
| 367 | + .untilAsserted( |
| 368 | + () -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty()); |
| 369 | + |
| 370 | + // complete the filtering update - the resource should not reappear |
| 371 | + temporaryResourceCache.doneEventFilterModify(resourceId, "2"); |
| 372 | + assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); |
| 373 | + |
| 374 | + ghostCheckExecutor.shutdownNow(); |
| 375 | + } |
| 376 | + |
| 377 | + @Test |
| 378 | + void ghostCheckRunsConcurrentlyWithPutResource() { |
| 379 | + var mes = mock(ManagedInformerEventSource.class); |
| 380 | + var mim = mock(InformerManager.class); |
| 381 | + when(mes.manager()).thenReturn(mim); |
| 382 | + when(mim.isWatchingNamespace(any())).thenReturn(true); |
| 383 | + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); |
| 384 | + when(mim.get(any())).thenReturn(Optional.empty()); |
| 385 | + |
| 386 | + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); |
| 387 | + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); |
| 388 | + informerEventSource.setTemporalResourceCache(temporaryResourceCache); |
| 389 | + |
| 390 | + // put a resource that will become a ghost |
| 391 | + var deployment = deploymentWithResourceVersion(2); |
| 392 | + temporaryResourceCache.putResource(deployment); |
| 393 | + |
| 394 | + // advance sync version so ghost check removes it |
| 395 | + when(mim.lastSyncResourceVersion(any())).thenReturn("3"); |
| 396 | + |
| 397 | + await() |
| 398 | + .untilAsserted( |
| 399 | + () -> |
| 400 | + assertThat( |
| 401 | + temporaryResourceCache.getResourceFromCache( |
| 402 | + ResourceID.fromResource(deployment))) |
| 403 | + .isEmpty()); |
| 404 | + |
| 405 | + // now put a newer resource - should succeed even after ghost removal |
| 406 | + var newerDeployment = deploymentWithResourceVersion(4); |
| 407 | + temporaryResourceCache.putResource(newerDeployment); |
| 408 | + assertThat( |
| 409 | + temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(newerDeployment))) |
| 410 | + .isPresent(); |
| 411 | + |
| 412 | + ghostCheckExecutor.shutdownNow(); |
| 413 | + } |
| 414 | + |
| 415 | + @Test |
| 416 | + void filteringUpdateAndGhostCheckWithNamespaceChange() { |
| 417 | + var mes = mock(ManagedInformerEventSource.class); |
| 418 | + var mim = mock(InformerManager.class); |
| 419 | + when(mes.manager()).thenReturn(mim); |
| 420 | + when(mim.isWatchingNamespace(any())).thenReturn(true); |
| 421 | + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); |
| 422 | + when(mim.get(any())).thenReturn(Optional.empty()); |
| 423 | + |
| 424 | + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); |
| 425 | + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); |
| 426 | + informerEventSource.setTemporalResourceCache(temporaryResourceCache); |
| 427 | + |
| 428 | + // start filtering update and put resource |
| 429 | + var deployment = deploymentWithResourceVersion(2); |
| 430 | + var resourceId = ResourceID.fromResource(deployment); |
| 431 | + temporaryResourceCache.startEventFilteringModify(resourceId); |
| 432 | + temporaryResourceCache.putResource(deployment); |
| 433 | + |
| 434 | + // namespace becomes unwatched - ghost check should clean up |
| 435 | + when(mim.isWatchingNamespace(any())).thenReturn(false); |
| 436 | + |
| 437 | + await() |
| 438 | + .untilAsserted( |
| 439 | + () -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty()); |
| 440 | + |
| 441 | + // complete the filtering update |
| 442 | + var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2"); |
| 443 | + // resource was already cleaned by ghost check, so no deferred event |
| 444 | + assertThat(doneResult).isEmpty(); |
| 445 | + |
| 446 | + // put should be rejected since namespace is no longer watched |
| 447 | + temporaryResourceCache.putResource(deploymentWithResourceVersion(3)); |
| 448 | + assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); |
| 449 | + |
| 450 | + ghostCheckExecutor.shutdownNow(); |
| 451 | + } |
| 452 | + |
342 | 453 | private void assertNoEventProduced() { |
343 | 454 | await() |
344 | 455 | .pollDelay(Duration.ofMillis(50)) |
|
0 commit comments