Skip to content

Commit 66cad0a

Browse files
committed
feat: support delete event reconiliation
The idea is that reconcile method will be called on resource delete event. What would allow to clear inMemory caches without using a finalizer (thus Cleaner interface) Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 29cd363 commit 66cad0a

File tree

9 files changed

+105
-26
lines changed

9 files changed

+105
-26
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,8 @@ default String fieldManager() {
9292
}
9393

9494
<C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec);
95+
96+
default boolean reconcileOnPrimaryDelete() {
97+
return false;
98+
}
9599
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
5555
@SuppressWarnings("unused")
5656
IndexedResourceCache<P> getPrimaryCache();
5757

58+
boolean isPrimaryDeleted();
59+
5860
/**
5961
* Determines whether a new reconciliation will be triggered right after the current
6062
* reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,10 @@ MaxReconciliationInterval maxReconciliationInterval() default
7777
* @return the name used as field manager for SSA operations
7878
*/
7979
String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER;
80+
81+
/**
82+
* Will trigger reconciliation on delete event of the primary resource. Can be set to true only if
83+
* the reconciler does not implement {@link Cleaner} interface.
84+
*/
85+
boolean reconcileOnPrimaryDelete() default false;
8086
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ public class DefaultContext<P extends HasMetadata> implements Context<P> {
2424
private final ControllerConfiguration<P> controllerConfiguration;
2525
private final DefaultManagedWorkflowAndDependentResourceContext<P>
2626
defaultManagedDependentResourceContext;
27+
private final boolean isPrimaryDeleted;
2728

28-
public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
29+
public DefaultContext(
30+
RetryInfo retryInfo, Controller<P> controller, P primaryResource, boolean isPrimaryDeleted) {
2931
this.retryInfo = retryInfo;
3032
this.controller = controller;
3133
this.primaryResource = primaryResource;
3234
this.controllerConfiguration = controller.getConfiguration();
35+
this.isPrimaryDeleted = isPrimaryDeleted;
3336
this.defaultManagedDependentResourceContext =
3437
new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
3538
}
@@ -49,6 +52,11 @@ public IndexedResourceCache<P> getPrimaryCache() {
4952
return controller.getEventSourceManager().getControllerEventSource();
5053
}
5154

55+
@Override
56+
public boolean isPrimaryDeleted() {
57+
return isPrimaryDeleted;
58+
}
59+
5260
@Override
5361
public boolean isNextReconciliationImminent() {
5462
return controller

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
4343
private final Cache<P> cache;
4444
private final EventSourceManager<P> eventSourceManager;
4545
private final RateLimiter<? extends RateLimitState> rateLimiter;
46-
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
46+
private final ResourceStateManager<P> resourceStateManager = new ResourceStateManager<>();
4747
private final Map<String, Object> metricsMetadata;
4848
private ExecutorService executor;
4949

@@ -121,19 +121,24 @@ public synchronized void handleEvent(Event event) {
121121
}
122122
}
123123

124-
private void handleMarkedEventForResource(ResourceState state) {
125-
if (state.deleteEventPresent()) {
124+
private void handleMarkedEventForResource(ResourceState<P> state) {
125+
if (skipDeleteEventProcessing(state)) {
126126
cleanupForDeletedEvent(state.getId());
127-
} else if (!state.processedMarkForDeletionPresent()) {
127+
} else if (!state.processedMarkForDeletionPresent()
128+
&& !state.deleteEventReconciliationSubmitted()) {
128129
submitReconciliationExecution(state);
129130
}
130131
}
131132

132-
private void submitReconciliationExecution(ResourceState state) {
133+
private boolean skipDeleteEventProcessing(ResourceState<P> state) {
134+
return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete();
135+
}
136+
137+
private void submitReconciliationExecution(ResourceState<P> state) {
133138
try {
134139
boolean controllerUnderExecution = isControllerUnderExecution(state);
135140
final var resourceID = state.getId();
136-
Optional<P> maybeLatest = cache.get(resourceID);
141+
Optional<P> maybeLatest = getCachedResource(resourceID, state);
137142
maybeLatest.ifPresent(MDCUtils::addResourceInfo);
138143
if (!controllerUnderExecution && maybeLatest.isPresent()) {
139144
var rateLimit = state.getRateLimit();
@@ -148,8 +153,15 @@ private void submitReconciliationExecution(ResourceState state) {
148153
}
149154
state.setUnderProcessing(true);
150155
final var latest = maybeLatest.get();
151-
ExecutionScope<P> executionScope = new ExecutionScope<>(state.getRetry());
152-
state.unMarkEventReceived();
156+
ExecutionScope<P> executionScope =
157+
new ExecutionScope<>(state.getRetry(), state.deleteEventPresent());
158+
159+
if (state.deleteEventPresent()) {
160+
state.markDeleteEventReconciliationSubmitted();
161+
} else {
162+
state.unMarkEventReceived();
163+
}
164+
153165
metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
154166
log.debug("Executing events for custom resource. Scope: {}", executionScope);
155167
executor.execute(new ReconcilerExecutor(resourceID, executionScope));
@@ -164,7 +176,7 @@ private void submitReconciliationExecution(ResourceState state) {
164176
// there can be multiple reasons why the primary resource is not present, one is that the
165177
// informer is currently disconnected from k8s api server, but will eventually receive the
166178
// resource. Other is that simply there is no primary resource present for an event, this
167-
// might indicate issue with the implementation, but could happen also naturally, thus
179+
// might indicate an issue with the implementation, but could happen also naturally, thus
168180
// this is not necessarily a problem.
169181
log.debug("no primary resource found in cache with resource id: {}", resourceID);
170182
}
@@ -174,12 +186,25 @@ private void submitReconciliationExecution(ResourceState state) {
174186
}
175187
}
176188

177-
private void handleEventMarking(Event event, ResourceState state) {
189+
private Optional<P> getCachedResource(ResourceID resourceID, ResourceState<P> state) {
190+
var resource = cache.get(resourceID);
191+
if (resource.isPresent()) {
192+
return resource;
193+
}
194+
if (controllerConfiguration.reconcileOnPrimaryDelete() && state.deleteEventPresent()) {
195+
return Optional.of(state.getDeletedResource());
196+
}
197+
return Optional.empty();
198+
}
199+
200+
@SuppressWarnings("unchecked")
201+
private void handleEventMarking(Event event, ResourceState<P> state) {
178202
final var relatedCustomResourceID = event.getRelatedCustomResourceID();
179203
if (event instanceof ResourceEvent resourceEvent) {
180204
if (resourceEvent.getAction() == ResourceAction.DELETED) {
181205
log.debug("Marking delete event received for: {}", relatedCustomResourceID);
182-
state.markDeleteEventReceived();
206+
// todo check can there be delete event without resource?
207+
state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow());
183208
} else {
184209
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
185210
log.debug(
@@ -196,6 +221,7 @@ private void handleEventMarking(Event event, ResourceState state) {
196221
// event as below.
197222
markEventReceived(state);
198223
}
224+
// todo this if is weird
199225
} else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
200226
markEventReceived(state);
201227
} else if (log.isDebugEnabled()) {
@@ -207,7 +233,7 @@ private void handleEventMarking(Event event, ResourceState state) {
207233
}
208234
}
209235

210-
private void markEventReceived(ResourceState state) {
236+
private void markEventReceived(ResourceState<P> state) {
211237
log.debug("Marking event received for: {}", state.getId());
212238
state.markEventReceived();
213239
}
@@ -251,7 +277,7 @@ synchronized void eventProcessingFinished(
251277
}
252278
cleanupOnSuccessfulExecution(executionScope);
253279
metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata);
254-
if (state.deleteEventPresent()) {
280+
if (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted()) {
255281
cleanupForDeletedEvent(executionScope.getResourceID());
256282
} else if (postExecutionControl.isFinalizerRemoved()) {
257283
state.markProcessedMarkForDeletion();
@@ -383,7 +409,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<P> executionScope) {
383409
retryEventSource().cancelOnceSchedule(executionScope.getResourceID());
384410
}
385411

386-
private ResourceState getOrInitRetryExecution(ExecutionScope<P> executionScope) {
412+
private ResourceState<P> getOrInitRetryExecution(ExecutionScope<P> executionScope) {
387413
final var state = resourceStateManager.getOrCreate(executionScope.getResourceID());
388414
RetryExecution retryExecution = state.getRetry();
389415
if (retryExecution == null) {
@@ -399,7 +425,7 @@ private void cleanupForDeletedEvent(ResourceID resourceID) {
399425
metrics.cleanupDoneFor(resourceID, metricsMetadata);
400426
}
401427

402-
private boolean isControllerUnderExecution(ResourceState state) {
428+
private boolean isControllerUnderExecution(ResourceState<P> state) {
403429
return state.isUnderProcessing();
404430
}
405431

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ class ExecutionScope<R extends HasMetadata> {
88
// the latest custom resource from cache
99
private R resource;
1010
private final RetryInfo retryInfo;
11+
private boolean isPrimaryDeleted = false;
1112

12-
ExecutionScope(RetryInfo retryInfo) {
13+
ExecutionScope(RetryInfo retryInfo, boolean isPrimaryDeleted) {
1314
this.retryInfo = retryInfo;
15+
this.isPrimaryDeleted = isPrimaryDeleted;
1416
}
1517

1618
public ExecutionScope<R> setResource(R resource) {
@@ -42,4 +44,12 @@ public String toString() {
4244
public RetryInfo getRetryInfo() {
4345
return retryInfo;
4446
}
47+
48+
public void setPrimaryDeleted(boolean primaryDeleted) {
49+
isPrimaryDeleted = primaryDeleted;
50+
}
51+
52+
public boolean isPrimaryDeleted() {
53+
return isPrimaryDeleted;
54+
}
4555
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
9090
}
9191

9292
Context<P> context =
93-
new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution);
93+
new DefaultContext<>(
94+
executionScope.getRetryInfo(),
95+
controller,
96+
resourceForExecution,
97+
executionScope.isPrimaryDeleted());
9498
if (markedForDeletion) {
9599
return handleCleanup(resourceForExecution, originalResource, context);
96100
} else {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.api.model.HasMetadata;
34
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
45
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
56

6-
class ResourceState {
7+
class ResourceState<P extends HasMetadata> {
78

89
/**
910
* Manages the state of received events. Basically there can be only three distinct states
@@ -21,6 +22,7 @@ private enum EventingState {
2122
PROCESSED_MARK_FOR_DELETION,
2223
/** Delete event present, from this point other events are not relevant */
2324
DELETE_EVENT_PRESENT,
25+
DELETE_EVENT_RECONCILIATION_SUBMITTED
2426
}
2527

2628
private final ResourceID id;
@@ -30,6 +32,8 @@ private enum EventingState {
3032
private EventingState eventing;
3133
private RateLimitState rateLimit;
3234

35+
private P deletedResource;
36+
3337
public ResourceState(ResourceID id) {
3438
this.id = id;
3539
eventing = EventingState.NO_EVENT_PRESENT;
@@ -63,8 +67,13 @@ public void setUnderProcessing(boolean underProcessing) {
6367
this.underProcessing = underProcessing;
6468
}
6569

66-
public void markDeleteEventReceived() {
70+
public void markDeleteEventReceived(P deletedResource) {
6771
eventing = EventingState.DELETE_EVENT_PRESENT;
72+
this.deletedResource = deletedResource;
73+
}
74+
75+
public void markDeleteEventReconciliationSubmitted() {
76+
this.eventing = EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
6877
}
6978

7079
public boolean deleteEventPresent() {
@@ -75,8 +84,16 @@ public boolean processedMarkForDeletionPresent() {
7584
return eventing == EventingState.PROCESSED_MARK_FOR_DELETION;
7685
}
7786

87+
public boolean deleteEventReconciliationSubmitted() {
88+
return eventing == EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
89+
}
90+
91+
public P getDeletedResource() {
92+
return deletedResource;
93+
}
94+
7895
public void markEventReceived() {
79-
if (deleteEventPresent()) {
96+
if (deleteEventPresent() || deleteEventReconciliationSubmitted()) {
8097
throw new IllegalStateException("Cannot receive event after a delete event received");
8198
}
8299
eventing = EventingState.EVENT_PRESENT;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,27 @@
55
import java.util.concurrent.ConcurrentHashMap;
66
import java.util.stream.Collectors;
77

8-
class ResourceStateManager {
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
10+
class ResourceStateManager<P extends HasMetadata> {
911
// maybe we should have a way for users to specify a hint on the amount of CRs their reconciler
1012
// will process to avoid under- or over-sizing the state maps and avoid too many resizing that
1113
// take time and memory?
12-
private final Map<ResourceID, ResourceState> states = new ConcurrentHashMap<>(100);
14+
private final Map<ResourceID, ResourceState<P>> states = new ConcurrentHashMap<>(100);
1315

14-
public ResourceState getOrCreate(ResourceID resourceID) {
16+
public ResourceState<P> getOrCreate(ResourceID resourceID) {
1517
return states.computeIfAbsent(resourceID, ResourceState::new);
1618
}
1719

18-
public ResourceState remove(ResourceID resourceID) {
20+
public ResourceState<P> remove(ResourceID resourceID) {
1921
return states.remove(resourceID);
2022
}
2123

2224
public boolean contains(ResourceID resourceID) {
2325
return states.containsKey(resourceID);
2426
}
2527

26-
public List<ResourceState> resourcesWithEventPresent() {
28+
public List<ResourceState<P>> resourcesWithEventPresent() {
2729
return states.values().stream()
2830
.filter(state -> !state.noEventPresent())
2931
.collect(Collectors.toList());

0 commit comments

Comments
 (0)