Skip to content

Commit 076611f

Browse files
committed
scheduled cleanup of obsolete resources, improved get
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 25b2d5e commit 076611f

File tree

5 files changed

+37
-13
lines changed

5 files changed

+37
-13
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ExecutorService;
2424
import java.util.concurrent.Executors;
2525
import java.util.concurrent.Future;
26+
import java.util.concurrent.ScheduledExecutorService;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
2829
import java.util.function.Function;
@@ -39,7 +40,7 @@ public class ExecutorServiceManager {
3940
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
4041
private ExecutorService executor;
4142
private ExecutorService workflowExecutor;
42-
private ExecutorService cachingExecutorService;
43+
private ScheduledExecutorService cachingExecutorService;
4344
private boolean started;
4445
private ConfigurationService configurationService;
4546

@@ -122,14 +123,14 @@ private synchronized void lazyInitWorkflowExecutorService() {
122123
}
123124
}
124125

125-
public ExecutorService cachingExecutorService() {
126+
public ScheduledExecutorService cachingExecutorService() {
126127
return cachingExecutorService;
127128
}
128129

129130
public void start(ConfigurationService configurationService) {
130131
if (!started) {
131132
this.configurationService = configurationService; // used to lazy init workflow executor
132-
this.cachingExecutorService = Executors.newCachedThreadPool();
133+
this.cachingExecutorService = Executors.newScheduledThreadPool(0);
133134
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
134135
started = true;
135136
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,14 @@ public synchronized void start() {
153153
if (isRunning()) {
154154
return;
155155
}
156-
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
156+
temporaryResourceCache =
157+
new TemporaryResourceCache<>(
158+
comparableResourceVersions,
159+
controllerConfiguration
160+
.getConfigurationService()
161+
.getExecutorServiceManager()
162+
.cachingExecutorService(),
163+
this);
157164
this.cache = new InformerManager<>(client, configuration, this);
158165
cache.setControllerConfiguration(controllerConfiguration);
159166
cache.addIndexers(indexers);
@@ -192,9 +199,10 @@ public Optional<R> get(ResourceID resourceID) {
192199
var res = cache.get(resourceID);
193200
if (comparableResourceVersions
194201
&& resource.isPresent()
195-
&& res.filter(
196-
r -> ReconcilerUtilsInternal.compareResourceVersions(r, resource.orElseThrow()) > 0)
197-
.isEmpty()) {
202+
&& ReconcilerUtilsInternal.compareResourceVersions(
203+
resource.get().getMetadata().getResourceVersion(),
204+
manager().lastSyncResourceVersion(resource.get().getMetadata().getNamespace()))
205+
> 0) {
198206
log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
199207
return resource;
200208
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.Map;
2020
import java.util.Optional;
2121
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.TimeUnit;
2224

2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
@@ -72,20 +74,30 @@ public enum EventHandling {
7274

7375
public TemporaryResourceCache(
7476
boolean comparableResourceVersions,
77+
ScheduledExecutorService obsoleteCheckExecutor,
7578
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
7679
this(
7780
comparableResourceVersions,
7881
DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL,
82+
obsoleteCheckExecutor,
7983
managedInformerEventSource);
8084
}
8185

8286
TemporaryResourceCache(
8387
boolean comparableResourceVersions,
8488
long obsoleteResourceCheckInterval,
89+
ScheduledExecutorService obsoleteCheckExecutor,
8590
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
8691
this.comparableResourceVersions = comparableResourceVersions;
8792
this.obsoleteResourceCheckInterval = obsoleteResourceCheckInterval;
8893
this.managedInformerEventSource = managedInformerEventSource;
94+
if (comparableResourceVersions) {
95+
obsoleteCheckExecutor.scheduleWithFixedDelay(
96+
this::checkObsoleteResources,
97+
obsoleteResourceCheckInterval,
98+
obsoleteResourceCheckInterval,
99+
TimeUnit.MILLISECONDS);
100+
}
89101
}
90102

91103
public synchronized void startEventFilteringModify(ResourceID resourceID) {
@@ -161,7 +173,6 @@ private synchronized EventHandling onEvent(
161173
result = EventHandling.OBSOLETE;
162174
}
163175
}
164-
checkObsoleteResources();
165176
var ed = activeUpdates.get(resourceId);
166177
if (ed != null && result != EventHandling.OBSOLETE) {
167178
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
@@ -227,7 +238,7 @@ private String getLatestResourceVersion(String namespace) {
227238
return managedInformerEventSource.manager().lastSyncResourceVersion(namespace);
228239
}
229240

230-
void checkObsoleteResources() {
241+
private void checkObsoleteResources() {
231242
if (System.currentTimeMillis() > lastObsoleteResourceCheck + obsoleteResourceCheckInterval) {
232243
lastObsoleteResourceCheck = System.currentTimeMillis();
233244
log.debug("Checking for obsolete resources.");

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Optional;
2020
import java.util.Set;
2121
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.ScheduledExecutorService;
2223

2324
import org.junit.jupiter.api.BeforeEach;
2425
import org.junit.jupiter.api.Test;
@@ -381,7 +382,8 @@ private void withRealTemporaryResourceCache() {
381382
when(mes.manager()).thenReturn(mim);
382383
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
383384

384-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
385+
temporaryResourceCache =
386+
spy(new TemporaryResourceCache<>(true, mock(ScheduledExecutorService.class), mes));
385387
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
386388
}
387389

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

1818
import java.util.Map;
19+
import java.util.concurrent.ScheduledExecutorService;
1920

2021
import org.junit.jupiter.api.BeforeEach;
2122
import org.junit.jupiter.api.Test;
@@ -46,8 +47,8 @@ void setup() {
4647
var mim = mock(InformerManager.class);
4748
when(mes.manager()).thenReturn(mim);
4849
when(mim.lastSyncResourceVersion(any())).then(a -> latestSyncVersion);
49-
50-
temporaryResourceCache = new TemporaryResourceCache<>(true, mes);
50+
temporaryResourceCache =
51+
new TemporaryResourceCache<>(true, mock(ScheduledExecutorService.class), mes);
5152
}
5253

5354
@Test
@@ -124,7 +125,8 @@ void removesResourceFromCache() {
124125
@Test
125126
void nonComparableResourceVersionsDisables() {
126127
this.temporaryResourceCache =
127-
new TemporaryResourceCache<>(false, mock(ManagedInformerEventSource.class));
128+
new TemporaryResourceCache<>(
129+
false, mock(ScheduledExecutorService.class), mock(ManagedInformerEventSource.class));
128130

129131
this.temporaryResourceCache.putResource(testResource());
130132

0 commit comments

Comments
 (0)