Skip to content

Commit a1bf1f1

Browse files
committed
Obsolete resource handling for read-cache-after-write
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 78eecc0 commit a1bf1f1

File tree

4 files changed

+47
-7
lines changed

4 files changed

+47
-7
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public synchronized void start() {
153153
if (isRunning()) {
154154
return;
155155
}
156-
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions);
156+
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
157157
this.cache = new InformerManager<>(client, configuration, this);
158158
cache.setControllerConfiguration(controllerConfiguration);
159159
cache.addIndexers(indexers);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,39 @@
5454
public class TemporaryResourceCache<T extends HasMetadata> {
5555

5656
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
57+
public static final long DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL = 1000 * 60 * 3L;
5758

5859
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
60+
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
5961
private final boolean comparableResourceVersions;
6062
private String latestResourceVersion;
6163

62-
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
64+
private final long obsoleteResourceCheckInterval;
65+
private volatile long lastObsoleteResourceCheck = System.currentTimeMillis();
66+
private ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
6367

6468
public enum EventHandling {
6569
DEFER,
6670
OBSOLETE,
6771
NEW
6872
}
6973

70-
public TemporaryResourceCache(boolean comparableResourceVersions) {
74+
public TemporaryResourceCache(
75+
boolean comparableResourceVersions,
76+
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
77+
this(
78+
comparableResourceVersions,
79+
DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL,
80+
managedInformerEventSource);
81+
}
82+
83+
TemporaryResourceCache(
84+
boolean comparableResourceVersions,
85+
long obsoleteResourceCheckInterval,
86+
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
7187
this.comparableResourceVersions = comparableResourceVersions;
88+
this.obsoleteResourceCheckInterval = obsoleteResourceCheckInterval;
89+
this.managedInformerEventSource = managedInformerEventSource;
7290
}
7391

7492
public synchronized void startEventFilteringModify(ResourceID resourceID) {
@@ -148,6 +166,7 @@ private synchronized EventHandling onEvent(
148166
result = EventHandling.OBSOLETE;
149167
}
150168
}
169+
checkObsoleteResources();
151170
var ed = activeUpdates.get(resourceId);
152171
if (ed != null && result != EventHandling.OBSOLETE) {
153172
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
@@ -208,6 +227,23 @@ public synchronized void putResource(T newResource) {
208227
}
209228
}
210229

230+
void checkObsoleteResources() {
231+
if (System.currentTimeMillis() > lastObsoleteResourceCheck + obsoleteResourceCheckInterval) {
232+
lastObsoleteResourceCheck = System.currentTimeMillis();
233+
log.debug("Checking for obsolete resources.");
234+
var iterator = cache.entrySet().iterator();
235+
while (iterator.hasNext()) {
236+
var e = iterator.next();
237+
if (ReconcilerUtilsInternal.compareResourceVersions(
238+
e.getValue().getMetadata().getResourceVersion(), latestResourceVersion)
239+
< 0) iterator.remove();
240+
// todo propagate event
241+
managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true);
242+
log.debug("Removing obsolete resource with ID: {}", e.getKey());
243+
}
244+
}
245+
}
246+
211247
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
212248
return Optional.ofNullable(cache.get(resourceID));
213249
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,8 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re
376376
}
377377

378378
private void withRealTemporaryResourceCache() {
379-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true));
379+
temporaryResourceCache =
380+
spy(new TemporaryResourceCache<>(true, mock(ManagedInformerEventSource.class)));
380381
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
381382
}
382383

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import static org.assertj.core.api.Assertions.assertThat;
3131
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
import static org.mockito.Mockito.mock;
3233

3334
class TemporaryPrimaryResourceCacheTest {
3435

@@ -38,7 +39,8 @@ class TemporaryPrimaryResourceCacheTest {
3839

3940
@BeforeEach
4041
void setup() {
41-
temporaryResourceCache = new TemporaryResourceCache<>(true);
42+
temporaryResourceCache =
43+
new TemporaryResourceCache<>(true, mock(ManagedInformerEventSource.class));
4244
}
4345

4446
@Test
@@ -114,7 +116,8 @@ void removesResourceFromCache() {
114116

115117
@Test
116118
void nonComparableResourceVersionsDisables() {
117-
this.temporaryResourceCache = new TemporaryResourceCache<>(false);
119+
this.temporaryResourceCache =
120+
new TemporaryResourceCache<>(false, mock(ManagedInformerEventSource.class));
118121

119122
this.temporaryResourceCache.putResource(testResource());
120123

@@ -123,7 +126,7 @@ void nonComparableResourceVersionsDisables() {
123126
}
124127

125128
@Test
126-
void eventReceivedDuringFiltering() throws Exception {
129+
void eventReceivedDuringFiltering() {
127130
var testResource = testResource();
128131

129132
temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource));

0 commit comments

Comments
 (0)