Skip to content

Commit 317d8df

Browse files
committed
Event filtering with recording
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent a067b8c commit 317d8df

8 files changed

Lines changed: 113 additions & 170 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,22 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18+
import java.util.ArrayList;
1819
import java.util.HashSet;
20+
import java.util.List;
1921
import java.util.Optional;
2022
import java.util.Set;
2123
import java.util.function.UnaryOperator;
24+
import java.util.stream.Collectors;
2225

2326
import io.fabric8.kubernetes.api.model.HasMetadata;
24-
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
25-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
27+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2628

2729
class EventFilterDetails {
2830

2931
private int activeUpdates = 0;
30-
private ResourceEvent lastRelevantEvent;
31-
private String lastOwnUpdatedResourceVersion;
32+
private List<GenericResourceEvent> relatedEvents = new ArrayList<>();
3233
private Set<String> allOwnResourceVersions = new HashSet<>();
33-
private Set<String> uncertainEvents = new HashSet<>();
3434

3535
public void increaseActiveUpdates() {
3636
activeUpdates = activeUpdates + 1;
@@ -41,40 +41,11 @@ public void increaseActiveUpdates() {
4141
* controller to prevent race condition and send event from {@link
4242
* ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)}
4343
*/
44-
public boolean decreaseActiveUpdates(String updatedResourceVersion) {
45-
if (updatedResourceVersion != null
46-
&& (lastOwnUpdatedResourceVersion == null
47-
|| ReconcilerUtilsInternal.compareResourceVersions(
48-
updatedResourceVersion, lastOwnUpdatedResourceVersion)
49-
> 0)) {
50-
lastOwnUpdatedResourceVersion = updatedResourceVersion;
51-
}
52-
44+
public boolean decreaseActiveUpdates() {
5345
activeUpdates = activeUpdates - 1;
5446
return activeUpdates == 0;
5547
}
5648

57-
public void setLastRelevantEvent(ResourceEvent event) {
58-
lastRelevantEvent = event;
59-
}
60-
61-
public Optional<ResourceEvent> getRelevantEventToPropagate() {
62-
if (lastRelevantEvent != null
63-
&& (lastOwnUpdatedResourceVersion == null
64-
|| ReconcilerUtilsInternal.compareResourceVersions(
65-
lastRelevantEvent
66-
.getResource()
67-
.orElseThrow()
68-
.getMetadata()
69-
.getResourceVersion(),
70-
lastOwnUpdatedResourceVersion)
71-
> 0)
72-
|| allOwnResourceVersions.containsAll(uncertainEvents)) {
73-
return Optional.of(lastRelevantEvent);
74-
}
75-
return Optional.empty();
76-
}
77-
7849
public int getActiveUpdates() {
7950
return activeUpdates;
8051
}
@@ -83,11 +54,37 @@ void addToOwnResourceVersions(String updateVersion) {
8354
allOwnResourceVersions.add(updateVersion);
8455
}
8556

86-
public boolean isOwnResourceVersions(String resourceVersion) {
87-
return allOwnResourceVersions.contains(resourceVersion);
57+
public void addRelatedEvent(GenericResourceEvent event) {
58+
relatedEvents.add(event);
8859
}
8960

90-
public void addUncertainResourceVersion(String resourceVersion) {
91-
uncertainEvents.add(resourceVersion);
61+
public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent() {
62+
if (relatedEvents.isEmpty()) {
63+
return Optional.empty();
64+
}
65+
if (allOwnResourceVersions.containsAll(
66+
relatedEvents.stream()
67+
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
68+
.collect(Collectors.toSet()))) {
69+
return Optional.empty();
70+
}
71+
var deleteEvent =
72+
relatedEvents.stream().filter(e -> e.getAction() == ResourceAction.DELETED).findFirst();
73+
if (deleteEvent.isPresent()) {
74+
return deleteEvent;
75+
}
76+
if (relatedEvents.size() == 1) {
77+
return Optional.of(relatedEvents.get(0));
78+
}
79+
var firstEvent = relatedEvents.get(0);
80+
var firstResource =
81+
firstEvent.getPreviousResource().orElseGet(() -> firstEvent.getResource().orElseThrow());
82+
83+
return Optional.of(
84+
new GenericResourceEvent(
85+
ResourceAction.UPDATED,
86+
relatedEvents.get(relatedEvents.size() - 1).getResource().orElseThrow(),
87+
firstResource,
88+
null));
9289
}
9390
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java renamed to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/GenericResourceEvent.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,32 @@
2424
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
2525

2626
/** Used only for resource event filtering. */
27-
public class ExtendedResourceEvent extends ResourceEvent {
27+
public class GenericResourceEvent extends ResourceEvent {
2828

2929
private final HasMetadata previousResource;
30+
private final Boolean lastStateUnknow;
3031

31-
public ExtendedResourceEvent(
32+
public GenericResourceEvent(
3233
ResourceAction action,
33-
ResourceID resourceID,
3434
HasMetadata latestResource,
35-
HasMetadata previousResource) {
36-
super(action, resourceID, latestResource);
35+
HasMetadata previousResource,
36+
Boolean lastStateUnknow) {
37+
super(action, ResourceID.fromResource(latestResource), latestResource);
3738
this.previousResource = previousResource;
39+
this.lastStateUnknow = lastStateUnknow;
3840
}
3941

4042
public Optional<HasMetadata> getPreviousResource() {
4143
return Optional.ofNullable(previousResource);
4244
}
4345

46+
public Boolean getLastStateUnknow() {
47+
return lastStateUnknow;
48+
}
49+
4450
@Override
4551
public String toString() {
46-
return "ExtendedResourceEvent{"
52+
return "GenericResourceEvent{"
4753
+ getPreviousResource()
4854
.map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion())
4955
.orElse("")
@@ -61,7 +67,7 @@ public String toString() {
6167
public boolean equals(Object o) {
6268
if (o == null || getClass() != o.getClass()) return false;
6369
if (!super.equals(o)) return false;
64-
ExtendedResourceEvent that = (ExtendedResourceEvent) o;
70+
GenericResourceEvent that = (GenericResourceEvent) o;
6571
return Objects.equals(previousResource, that.previousResource);
6672
}
6773

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4747
import io.javaoperatorsdk.operator.processing.event.source.*;
4848
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
49-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
5049

5150
@SuppressWarnings("rawtypes")
5251
public abstract class ManagedInformerEventSource<
@@ -105,58 +104,14 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
105104
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
106105
return updatedResource;
107106
} finally {
108-
var res =
109-
temporaryResourceCache.doneEventFilterModify(
110-
id,
111-
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
112-
var updatedForLambda = updatedResource;
107+
var res = temporaryResourceCache.doneEventFilterModify(id);
113108
res.ifPresentOrElse(
114-
r -> {
115-
// as previous resource version we use the one from successful update, since
116-
// we process new event here only if that is more recent then the event from our update.
117-
// Note that this is equivalent with the scenario when an informer watch connection
118-
// would reconnect and loose some events in between.
119-
// If that update was not successful we still record the previous version from the
120-
// actual event in the ExtendedResourceEvent.
121-
R extendedResourcePrevVersion =
122-
(r instanceof ExtendedResourceEvent)
123-
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
124-
: null;
125-
R prevVersionOfResource = null;
126-
R latestResource = null;
127-
if (updatedForLambda != null) {
128-
var updatedNewerThanRelated =
129-
ReconcilerUtilsInternal.compareResourceVersions(
130-
updatedForLambda, r.getResource().orElseThrow())
131-
> 0;
132-
prevVersionOfResource =
133-
updatedNewerThanRelated
134-
? (extendedResourcePrevVersion != null
135-
? extendedResourcePrevVersion
136-
: prevVersionOfResource)
137-
: updatedForLambda;
138-
latestResource = updatedForLambda;
139-
} else {
140-
prevVersionOfResource = extendedResourcePrevVersion;
141-
latestResource = (R) r.getResource().orElseThrow();
142-
}
143-
144-
if (log.isDebugEnabled()) {
145-
log.debug(
146-
"Previous resource version: {} resource from update present: {}"
147-
+ " extendedPrevResource present: {}",
148-
prevVersionOfResource.getMetadata().getResourceVersion(),
149-
updatedForLambda != null,
150-
extendedResourcePrevVersion != null);
151-
}
152-
handleEvent(
153-
r.getAction(),
154-
latestResource,
155-
prevVersionOfResource,
156-
(r instanceof ResourceDeleteEvent)
157-
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
158-
: null);
159-
},
109+
r ->
110+
handleEvent(
111+
r.getAction(),
112+
(R) r.getResource().orElseThrow(),
113+
(R) r.getPreviousResource().orElse(null),
114+
r.getLastStateUnknow()),
160115
() -> log.debug("No new event present after the filtering update"));
161116
}
162117
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
3030
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3131
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
32-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
33-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
3432

3533
/**
3634
* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when
@@ -84,45 +82,33 @@ public synchronized void startEventFilteringModify(ResourceID resourceID) {
8482
ed.increaseActiveUpdates();
8583
}
8684

87-
public synchronized Optional<ResourceEvent> doneEventFilterModify(
88-
ResourceID resourceID, String updatedResourceVersion) {
85+
public synchronized Optional<GenericResourceEvent> doneEventFilterModify(ResourceID resourceID) {
8986
if (!comparableResourceVersions) {
9087
return Optional.empty();
9188
}
9289
var ed = activeUpdates.get(resourceID);
93-
if (ed == null || !ed.decreaseActiveUpdates(updatedResourceVersion)) {
90+
if (ed == null || !ed.decreaseActiveUpdates()) {
9491
log.debug(
9592
"Active updates {} for resource id: {}",
9693
ed != null ? ed.getActiveUpdates() : 0,
9794
resourceID);
9895
return Optional.empty();
9996
}
10097
activeUpdates.remove(resourceID);
101-
var res = ed.getRelevantEventToPropagate();
102-
log.debug(
103-
"Zero active updates for resource id: {}; event after update event: {}; updated resource"
104-
+ " version: {}",
105-
resourceID,
106-
res.isPresent(),
107-
updatedResourceVersion);
108-
return res;
98+
return ed.prepareSummaryEventIfNotOwnEventsPresent();
10999
}
110100

111101
public void onDeleteEvent(T resource, boolean unknownState) {
112-
onEvent(ResourceAction.DELETED, resource, null, unknownState, true);
102+
onEvent(ResourceAction.DELETED, resource, null, unknownState);
113103
}
114104

115105
public EventHandling onAddOrUpdateEvent(
116106
ResourceAction action, T resource, T prevResourceVersion) {
117-
return onEvent(action, resource, prevResourceVersion, false, false);
107+
return onEvent(action, resource, prevResourceVersion, false);
118108
}
119109

120110
private synchronized EventHandling onEvent(
121-
ResourceAction action,
122-
T resource,
123-
T prevResourceVersion,
124-
boolean unknownState,
125-
boolean delete) {
111+
ResourceAction action, T resource, T prevResourceVersion, boolean unknownState) {
126112
if (!comparableResourceVersions) {
127113
return EventHandling.NEW;
128114
}
@@ -155,29 +141,10 @@ private synchronized EventHandling onEvent(
155141
}
156142
var au = activeUpdates.get(resourceId);
157143
if (au != null) {
158-
if (result == EventHandling.INTERMEDIATE) {
159-
var ownResourceVersion =
160-
au.isOwnResourceVersions(resource.getMetadata().getResourceVersion());
161-
log.debug("Handling intermediate event. Own resource version: {}", ownResourceVersion);
162-
return ownResourceVersion ? EventHandling.DEFER : EventHandling.INTERMEDIATE;
163-
}
164-
if (result == EventHandling.NEW) {
165-
if (cached == null) {
166-
// this is for the case when temp cache is null, we receive an event
167-
// there is ongoing filtering-caching update; at this point we cannot tell
168-
// if that event is from our update
169-
log.debug("Setting uncertain resource version.");
170-
au.addUncertainResourceVersion(resource.getMetadata().getResourceVersion());
171-
}
172-
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
173-
au.setLastRelevantEvent(
174-
delete
175-
? new ResourceDeleteEvent(
176-
ResourceAction.DELETED, resourceId, resource, unknownState)
177-
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
178-
return EventHandling.DEFER;
179-
}
180-
return result;
144+
log.debug("Recording relevant event");
145+
au.addRelatedEvent(
146+
new GenericResourceEvent(action, resource, prevResourceVersion, unknownState));
147+
return EventHandling.DEFER;
181148
} else {
182149
return result;
183150
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,9 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
249249

250250
// external update with rv 3 (older than our cached rv 4) — must propagate
251251
source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
252-
253-
verify(eventHandler, times(1)).handleEvent(any());
254-
255252
latch2.countDown();
253+
254+
await().untilAsserted(() -> verify(eventHandler, times(1)).handleEvent(any()));
256255
}
257256

258257
@Test
@@ -317,7 +316,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
317316
.isEqualTo("" + oldResourceVersion);
318317
return true;
319318
}),
320-
isNull());
319+
any());
321320
});
322321
}
323322

0 commit comments

Comments
 (0)