Skip to content

Commit d9ccfb9

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 4d6f4b0 commit d9ccfb9

File tree

9 files changed

+77
-22
lines changed

9 files changed

+77
-22
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
2828
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
2929
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
30+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;
3031

3132
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3233
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
34+
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL_MILLIS;
3335
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_LONG_VALUE_SET;
3436
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;
3537

@@ -139,4 +141,13 @@
139141
* @since 5.3.0
140142
*/
141143
boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;
144+
145+
/**
146+
* For read-cache-after-write consistency there are some corner cases where we need to check the
147+
* caches see {@link TemporaryResourceCache#checkObsoleteResources()} periodically. This is the
148+
* period in milliseconds. Applicable only if {@link #comparableResourceVersions()}} is true.
149+
*
150+
* @since 5.3.0
151+
*/
152+
long obsoleteResourceCacheCheckInterval() default DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL_MILLIS;
142153
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.config.informer;
1717

18+
import java.time.Duration;
1819
import java.util.Arrays;
1920
import java.util.Collection;
2021
import java.util.Collections;
@@ -54,6 +55,7 @@ public class InformerConfiguration<R extends HasMetadata> {
5455
private Long informerListLimit;
5556
private FieldSelector fieldSelector;
5657
private boolean comparableResourceVersions;
58+
private Duration obsoleteResourceCacheCheckInterval;
5759

5860
protected InformerConfiguration(
5961
Class<R> resourceClass,
@@ -68,7 +70,8 @@ protected InformerConfiguration(
6870
ItemStore<R> itemStore,
6971
Long informerListLimit,
7072
FieldSelector fieldSelector,
71-
boolean comparableResourceVersions) {
73+
boolean comparableResourceVersions,
74+
Duration obsoleteResourceCacheCheckInterval) {
7275
this(resourceClass);
7376
this.name = name;
7477
this.namespaces = namespaces;
@@ -82,6 +85,7 @@ protected InformerConfiguration(
8285
this.informerListLimit = informerListLimit;
8386
this.fieldSelector = fieldSelector;
8487
this.comparableResourceVersions = comparableResourceVersions;
88+
this.obsoleteResourceCacheCheckInterval = obsoleteResourceCacheCheckInterval;
8589
}
8690

8791
private InformerConfiguration(Class<R> resourceClass) {
@@ -117,7 +121,8 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
117121
original.itemStore,
118122
original.informerListLimit,
119123
original.fieldSelector,
120-
original.comparableResourceVersions)
124+
original.comparableResourceVersions,
125+
original.obsoleteResourceCacheCheckInterval)
121126
.builder;
122127
}
123128

@@ -296,6 +301,10 @@ public boolean isComparableResourceVersions() {
296301
return comparableResourceVersions;
297302
}
298303

304+
public Duration getObsoleteResourceCacheCheckInterval() {
305+
return obsoleteResourceCacheCheckInterval;
306+
}
307+
299308
@SuppressWarnings("UnusedReturnValue")
300309
public class Builder {
301310

@@ -368,6 +377,8 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
368377
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
369378
.toList()));
370379
withComparableResourceVersions(informerConfig.comparableResourceVersions());
380+
withObsoleteResourceCacheCheckInterval(
381+
Duration.ofMillis(informerConfig.obsoleteResourceCacheCheckInterval()));
371382
}
372383
return this;
373384
}
@@ -473,5 +484,12 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions
473484
InformerConfiguration.this.comparableResourceVersions = comparableResourceVersions;
474485
return this;
475486
}
487+
488+
public Builder withObsoleteResourceCacheCheckInterval(
489+
Duration obsoleteResourceCacheCheckInterval) {
490+
InformerConfiguration.this.obsoleteResourceCacheCheckInterval =
491+
obsoleteResourceCacheCheckInterval;
492+
return this;
493+
}
476494
}
477495
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.config.informer;
1717

18+
import java.time.Duration;
1819
import java.util.Objects;
1920
import java.util.Optional;
2021
import java.util.Set;
@@ -34,6 +35,7 @@
3435
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
3536

3637
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
38+
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL;
3739
import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET;
3840
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET;
3941
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET;
@@ -90,6 +92,10 @@ default Optional<KubernetesClient> getKubernetesClient() {
9092
return Optional.empty();
9193
}
9294

95+
boolean comparableResourceVersion();
96+
97+
Duration getObsoleteResourceCacheCheckInterval();
98+
9399
class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
94100
implements InformerEventSourceConfiguration<R> {
95101
private final PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
@@ -98,20 +104,23 @@ class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
98104
private final InformerConfiguration<R> informerConfig;
99105
private final KubernetesClient kubernetesClient;
100106
private final boolean comparableResourceVersion;
107+
private final Duration obsoleteResourceCacheCheckInterval;
101108

102109
protected DefaultInformerEventSourceConfiguration(
103110
GroupVersionKind groupVersionKind,
104111
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
105112
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
106113
InformerConfiguration<R> informerConfig,
107114
KubernetesClient kubernetesClient,
108-
boolean comparableResourceVersion) {
115+
boolean comparableResourceVersion,
116+
Duration obsoleteResourceCacheCheckInterval) {
109117
this.informerConfig = Objects.requireNonNull(informerConfig);
110118
this.groupVersionKind = groupVersionKind;
111119
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
112120
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
113121
this.kubernetesClient = kubernetesClient;
114122
this.comparableResourceVersion = comparableResourceVersion;
123+
this.obsoleteResourceCacheCheckInterval = obsoleteResourceCacheCheckInterval;
115124
}
116125

117126
@Override
@@ -144,6 +153,11 @@ public Optional<KubernetesClient> getKubernetesClient() {
144153
public boolean comparableResourceVersion() {
145154
return this.comparableResourceVersion;
146155
}
156+
157+
@Override
158+
public Duration getObsoleteResourceCacheCheckInterval() {
159+
return obsoleteResourceCacheCheckInterval;
160+
}
147161
}
148162

149163
@SuppressWarnings({"unused", "UnusedReturnValue"})
@@ -158,6 +172,7 @@ class Builder<R extends HasMetadata> {
158172
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
159173
private KubernetesClient kubernetesClient;
160174
private boolean comparableResourceVersion = DEFAULT_COMPARABLE_RESOURCE_VERSION;
175+
private Duration obsoleteResourceCacheCheckInterval = DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL;
161176

162177
private Builder(Class<R> resourceClass, Class<? extends HasMetadata> primaryResourceClass) {
163178
this(resourceClass, primaryResourceClass, null);
@@ -300,6 +315,12 @@ public Builder<R> withComparableResourceVersion(boolean comparableResourceVersio
300315
return this;
301316
}
302317

318+
public Builder<R> withObsoleteResourceCacheCheckInterval(
319+
Duration obsoleteResourceCacheCheckInterval) {
320+
this.obsoleteResourceCacheCheckInterval = obsoleteResourceCacheCheckInterval;
321+
return this;
322+
}
323+
303324
public void updateFrom(InformerConfiguration<R> informerConfig) {
304325
if (informerConfig != null) {
305326
final var informerConfigName = informerConfig.getName();
@@ -340,9 +361,8 @@ public InformerEventSourceConfiguration<R> build() {
340361
false)),
341362
config.build(),
342363
kubernetesClient,
343-
comparableResourceVersion);
364+
comparableResourceVersion,
365+
obsoleteResourceCacheCheckInterval);
344366
}
345367
}
346-
347-
boolean comparableResourceVersion();
348368
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

18+
import java.time.Duration;
1819
import java.util.Collections;
1920
import java.util.Set;
2021

@@ -43,5 +44,9 @@ public final class Constants {
4344
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
4445
public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;
4546

47+
public static final long DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000;
48+
public static final Duration DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL =
49+
Duration.ofMillis(DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL_MILLIS);
50+
4651
private Constants() {}
4752
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public synchronized void start() {
156156
temporaryResourceCache =
157157
new TemporaryResourceCache<>(
158158
comparableResourceVersions,
159+
configuration.getInformerConfig().getObsoleteResourceCacheCheckInterval().toMillis(),
159160
controllerConfiguration
160161
.getConfigurationService()
161162
.getExecutorServiceManager()

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
public class TemporaryResourceCache<T extends HasMetadata> {
5757

5858
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
59-
public static final long DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL = 1000 * 60 * 3L;
6059

6160
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
6261
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
@@ -73,17 +72,6 @@ public enum EventHandling {
7372
}
7473

7574
public TemporaryResourceCache(
76-
boolean comparableResourceVersions,
77-
ScheduledExecutorService obsoleteCheckExecutor,
78-
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
79-
this(
80-
comparableResourceVersions,
81-
DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL,
82-
obsoleteCheckExecutor,
83-
managedInformerEventSource);
84-
}
85-
86-
TemporaryResourceCache(
8775
boolean comparableResourceVersions,
8876
long obsoleteResourceCheckInterval,
8977
ScheduledExecutorService obsoleteCheckExecutor,
@@ -238,6 +226,13 @@ private String getLatestResourceVersion(String namespace) {
238226
return managedInformerEventSource.manager().lastSyncResourceVersion(namespace);
239227
}
240228

229+
/**
230+
* There are (probably) extremely rare circumstances, when we can miss a delete event related to a
231+
* resources: when we create a resource that is deleted right after by third party and the related
232+
* informer have a disconnected watch and this watch needs to do a re-list when connected again.
233+
* In this case neither the ADD nor DELETE event will be propagated to the informer, but we
234+
* explicitly add resources to this cache. Those are cleaned up by this check.
235+
*/
241236
private void checkObsoleteResources() {
242237
if (System.currentTimeMillis() >= lastObsoleteResourceCheck + obsoleteResourceCheckInterval) {
243238
lastObsoleteResourceCheck = System.currentTimeMillis();

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3131
import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration;
3232
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
33+
import io.javaoperatorsdk.operator.api.reconciler.Constants;
3334
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3435
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
3536
import io.javaoperatorsdk.operator.processing.Controller;
@@ -60,8 +61,11 @@ class ControllerEventSourceTest
6061

6162
@BeforeEach
6263
public void setup() {
63-
6464
when(controllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService());
65+
var ic = mock(InformerConfiguration.class);
66+
when(controllerConfig.getInformerConfig()).thenReturn(ic);
67+
when(ic.getObsoleteResourceCacheCheckInterval())
68+
.thenReturn(Constants.DEFAULT_OBSOLETE_RESOURCE_CHECK_INTERVAL);
6569

6670
setUpSource(new ControllerEventSource<>(testController), true, controllerConfig);
6771
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ private void withRealTemporaryResourceCache() {
383383
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
384384

385385
temporaryResourceCache =
386-
spy(new TemporaryResourceCache<>(true, mock(ScheduledExecutorService.class), mes));
386+
spy(new TemporaryResourceCache<>(true, 0, mock(ScheduledExecutorService.class), mes));
387387
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
388388
}
389389

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.mockito.Mockito.mock;
3838
import static org.mockito.Mockito.when;
3939

40+
@SuppressWarnings({"unchecked", "rawtypes"})
4041
class TemporaryPrimaryResourceCacheTest {
4142

4243
public static final String RESOURCE_VERSION = "2";
@@ -56,7 +57,7 @@ void setup() {
5657
when(mim.lastSyncResourceVersion(any())).then(a -> latestSyncVersion);
5758
temporaryResourceCache =
5859
new TemporaryResourceCache<>(
59-
true, mock(ScheduledExecutorService.class), managedInformerEventSource);
60+
true, 0, mock(ScheduledExecutorService.class), managedInformerEventSource);
6061
}
6162

6263
@Test
@@ -134,7 +135,7 @@ void removesResourceFromCache() {
134135
void nonComparableResourceVersionsDisables() {
135136
this.temporaryResourceCache =
136137
new TemporaryResourceCache<>(
137-
false, mock(ScheduledExecutorService.class), mock(ManagedInformerEventSource.class));
138+
false, 0, mock(ScheduledExecutorService.class), mock(ManagedInformerEventSource.class));
138139

139140
this.temporaryResourceCache.putResource(testResource());
140141

0 commit comments

Comments
 (0)