Skip to content

Commit da91b74

Browse files
committed
Event filtering now records resource action and previous resource
This is important to have a correct further event propagation Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 9986c1e commit da91b74

File tree

15 files changed

+130
-52
lines changed

15 files changed

+130
-52
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
3838
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
3939
import io.javaoperatorsdk.operator.processing.event.source.Cache;
40-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
40+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4141
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
4242
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
4343
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
import io.javaoperatorsdk.operator.processing.LifecycleAware;
3838
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
3939
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
40+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4041
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
4142
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
42-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
4343
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
4444
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
4545

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java renamed to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.javaoperatorsdk.operator.processing.event.source.controller;
16+
package io.javaoperatorsdk.operator.processing.event.source;
1717

1818
public enum ResourceAction {
1919
ADDED,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.javaoperatorsdk.operator.processing.Controller;
2929
import io.javaoperatorsdk.operator.processing.MDCUtils;
3030
import io.javaoperatorsdk.operator.processing.event.ResourceID;
31+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3132
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
3233
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
3334
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
@@ -139,13 +140,15 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
139140

140141
@Override
141142
public void onAdd(T resource) {
142-
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
143+
var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null);
143144
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
144145
}
145146

146147
@Override
147148
public void onUpdate(T oldCustomResource, T newCustomResource) {
148-
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
149+
var handling =
150+
temporaryResourceCache.onAddOrUpdateEvent(
151+
ResourceAction.UPDATED, newCustomResource, oldCustomResource);
149152
handleEvent(
150153
ResourceAction.UPDATED,
151154
newCustomResource,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.fabric8.kubernetes.api.model.HasMetadata;
2121
import io.javaoperatorsdk.operator.processing.event.ResourceID;
22+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2223

2324
/**
2425
* Extends ResourceEvent for informer Delete events, it holds also information if the final state is

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.fabric8.kubernetes.api.model.HasMetadata;
2222
import io.javaoperatorsdk.operator.processing.event.Event;
2323
import io.javaoperatorsdk.operator.processing.event.ResourceID;
24+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2425

2526
public class ResourceEvent extends Event {
2627

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.processing.event.source.informer;
17+
18+
import java.util.Optional;
19+
20+
import io.fabric8.kubernetes.api.model.HasMetadata;
21+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
22+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
23+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
24+
25+
/** Used only for resource event filtering. */
26+
public class ExtendedResourceEvent extends ResourceEvent {
27+
28+
private HasMetadata previousResource;
29+
30+
public ExtendedResourceEvent(
31+
ResourceAction action,
32+
ResourceID resourceID,
33+
HasMetadata latestResource,
34+
HasMetadata previousResource) {
35+
super(action, resourceID, latestResource);
36+
this.previousResource = previousResource;
37+
}
38+
39+
public Optional<HasMetadata> getPreviousResource() {
40+
return Optional.ofNullable(previousResource);
41+
}
42+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
35-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
35+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3636
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3737

3838
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
@@ -107,7 +107,7 @@ public void onAdd(R newResource) {
107107
resourceType().getSimpleName(),
108108
newResource.getMetadata().getResourceVersion());
109109
}
110-
onAddOrUpdate(Operation.ADD, newResource, null);
110+
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
111111
}
112112

113113
@Override
@@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) {
120120
newObject.getMetadata().getResourceVersion(),
121121
oldObject.getMetadata().getResourceVersion());
122122
}
123-
onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
123+
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
124124
}
125125

126126
@Override
@@ -156,27 +156,27 @@ public synchronized void start() {
156156
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
157157
}
158158

159-
private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
159+
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
160160
primaryToSecondaryIndex.onAddOrUpdate(newObject);
161161
var resourceID = ResourceID.fromResource(newObject);
162162

163-
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
163+
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
164164

165165
if (eventHandling != EventHandling.NEW) {
166166
log.debug(
167167
"{} event propagation for {}. Resource ID: {}",
168168
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
169-
operation,
169+
action,
170170
ResourceID.fromResource(newObject));
171-
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
171+
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
172172
log.debug(
173173
"Propagating event for {}, resource with same version not result of a reconciliation."
174174
+ " Resource ID: {}",
175-
operation,
175+
action,
176176
resourceID);
177177
propagateEvent(newObject);
178178
} else {
179-
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
179+
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
180180
}
181181
}
182182

@@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() {
251251
return configuration().followControllerNamespaceChanges();
252252
}
253253

254-
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
254+
private boolean eventAcceptedByFilter(ResourceAction operation, R newObject, R oldObject) {
255255
if (genericFilter != null && !genericFilter.accept(newObject)) {
256256
return false;
257257
}
258-
if (operation == Operation.ADD) {
258+
if (operation == ResourceAction.ADDED) {
259259
return onAddFilter == null || onAddFilter.accept(newObject);
260260
} else {
261261
return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject);
@@ -266,9 +266,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
266266
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b))
267267
&& (genericFilter == null || genericFilter.accept(resource));
268268
}
269-
270-
private enum Operation {
271-
ADD,
272-
UPDATE
273-
}
274269
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import io.javaoperatorsdk.operator.health.Status;
4343
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4444
import io.javaoperatorsdk.operator.processing.event.source.*;
45-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
45+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4646
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
4747

4848
@SuppressWarnings("rawtypes")
@@ -90,6 +90,7 @@ public void changeNamespaces(Set<String> namespaces) {
9090
* Also makes sure that the even produced by this update is filtered, thus does not trigger the
9191
* reconciliation.
9292
*/
93+
@SuppressWarnings("unchecked")
9394
public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<R> updateMethod) {
9495
ResourceID id = ResourceID.fromResource(resourceToUpdate);
9596
if (log.isDebugEnabled()) {
@@ -110,12 +111,21 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
110111
res.ifPresent(
111112
r -> {
112113
R latestResource = (R) r.getResource().orElseThrow();
113-
// for update we need to have a historic resource, this might be improved to mimic more
114-
// realistic scenario
114+
115+
// as previous resource version we use the one from successful update, since
116+
// we process new event here only if that is more recent then the event from our update.
117+
// Note that this is equivalent with the scenario when an informer watch connection
118+
// would
119+
// reconnect and loose some events in between.
120+
// If that update was not successful we still record the previous version from the
121+
// actual
122+
// event in the ExtendedResourceEvent.
123+
R extendedResourcePrevVersion =
124+
(r instanceof ExtendedResourceEvent)
125+
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
126+
: null;
115127
R prevVersionOfResource =
116-
updatedForLambda != null
117-
? updatedForLambda
118-
: (r.getAction() == ResourceAction.UPDATED ? latestResource : null);
128+
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
119129
handleEvent(
120130
r.getAction(),
121131
latestResource,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils;
2828
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
2929
import io.javaoperatorsdk.operator.processing.event.ResourceID;
30-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
30+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3131
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
3232
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
3333

@@ -94,17 +94,23 @@ public synchronized Optional<ResourceEvent> doneEventFilterModify(
9494
}
9595

9696
public void onDeleteEvent(T resource, boolean unknownState) {
97-
onEvent(resource, unknownState, true);
97+
onEvent(ResourceAction.DELETED, resource, null, unknownState, true);
9898
}
9999

100100
/**
101101
* @return true if the resourceVersion was obsolete
102102
*/
103-
public EventHandling onAddOrUpdateEvent(T resource) {
104-
return onEvent(resource, false, false);
103+
public EventHandling onAddOrUpdateEvent(
104+
ResourceAction action, T resource, T prevResourceVersion) {
105+
return onEvent(action, resource, prevResourceVersion, false, false);
105106
}
106107

107-
private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) {
108+
private synchronized EventHandling onEvent(
109+
ResourceAction action,
110+
T resource,
111+
T prevResourceVersion,
112+
boolean unknownState,
113+
boolean delete) {
108114
if (!comparableResourceVersions) {
109115
return EventHandling.NEW;
110116
}
@@ -139,8 +145,7 @@ private synchronized EventHandling onEvent(T resource, boolean unknownState, boo
139145
ed.setLastEvent(
140146
delete
141147
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
142-
: new ResourceEvent(
143-
ResourceAction.UPDATED, resourceId, resource)); // todo true action
148+
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
144149
return EventHandling.DEFER;
145150
} else {
146151
return result;

0 commit comments

Comments
 (0)