Skip to content

Commit fef18da

Browse files
committed
improvements and releated unit tests
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 1a425a7 commit fef18da

7 files changed

Lines changed: 143 additions & 118 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3232
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
3333
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
34+
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
3435
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
35-
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3636

3737
import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
3838
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
@@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
141141
ResourceAction action, T oldCustomResource, T newCustomResource) {
142142
var handling =
143143
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
144-
if (handling == EventHandling.PROPAGATE) {
145-
handleEvent(action, newCustomResource, oldCustomResource, null);
146-
} else if (log.isDebugEnabled()) {
147-
log.debug("{} event propagation for action: {}", handling, action);
148-
}
144+
handling.ifPresentOrElse(
145+
this::handleEvent,
146+
() -> {
147+
if (log.isDebugEnabled()) {
148+
log.debug("{} event propagation for action: {}", handling, action);
149+
}
150+
});
151+
}
152+
153+
@SuppressWarnings("unchecked")
154+
private void handleEvent(GenericResourceEvent r) {
155+
handleEvent(
156+
r.getAction(),
157+
(T) r.getResource().orElseThrow(),
158+
(T) r.getPreviousResource().orElse(null),
159+
r.getLastStateUnknow());
149160
}
150161

151162
@Override
@@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
154165
resource,
155166
ResourceAction.DELETED,
156167
() -> {
157-
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
168+
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
158169
// delete event is quite special here, that requires special care, since we clean up
159170
// caches on delete event.
160-
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
171+
res.ifPresent(this::handleEvent);
161172
});
162173
}
163174

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
import java.util.stream.Collectors;
2525

2626
import io.fabric8.kubernetes.api.model.HasMetadata;
27+
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
2728
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2829

2930
class EventFilterDetails {
3031

3132
private int activeUpdates = 0;
32-
private List<GenericResourceEvent> relatedEvents = new ArrayList<>();
33-
private Set<String> allOwnResourceVersions = new HashSet<>();
33+
private final List<GenericResourceEvent> relatedEvents = new ArrayList<>(5);
34+
private final Set<String> allOwnResourceVersions = new HashSet<>(5);
3435

3536
public void increaseActiveUpdates() {
3637
activeUpdates = activeUpdates + 1;
@@ -50,6 +51,10 @@ public int getActiveUpdates() {
5051
return activeUpdates;
5152
}
5253

54+
public boolean isNoActiveUpdate() {
55+
return activeUpdates == 0;
56+
}
57+
5358
void addToOwnResourceVersions(String updateVersion) {
5459
allOwnResourceVersions.add(updateVersion);
5560
}
@@ -62,10 +67,7 @@ public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent()
6267
if (relatedEvents.isEmpty()) {
6368
return Optional.empty();
6469
}
65-
if (allOwnResourceVersions.containsAll(
66-
relatedEvents.stream()
67-
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
68-
.collect(Collectors.toSet()))) {
70+
if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) {
6971
return Optional.empty();
7072
}
7173
var deleteEvent =
@@ -87,4 +89,23 @@ public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent()
8789
firstResource,
8890
null));
8991
}
92+
93+
private Set<String> relatedEventResourceVersions() {
94+
return relatedEvents.stream()
95+
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
96+
.collect(Collectors.toSet());
97+
}
98+
99+
public boolean newerOrEqualEventReceivedForOwnLastUpdate() {
100+
if (allOwnResourceVersions.isEmpty()) {
101+
return true;
102+
}
103+
String lastOwn =
104+
allOwnResourceVersions.stream()
105+
.reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) >= 0 ? a : b)
106+
.orElseThrow();
107+
return relatedEvents.stream()
108+
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
109+
.anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0);
110+
}
90111
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
3535
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
36-
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3736

3837
/**
3938
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
@@ -154,20 +153,24 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
154153

155154
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
156155

157-
if (eventHandling != EventHandling.PROPAGATE) {
158-
log.debug(
159-
"{} event propagation", eventHandling == EventHandling.IGNORE ? "Deferring" : "Skipping");
156+
if (eventHandling.isEmpty()) {
157+
log.debug("Deferring event propagation");
160158
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
161159
log.debug(
162160
"Propagating event for {}, resource with same version not result of a our update.",
163161
action);
164-
propagateEvent(newObject);
162+
var event = eventHandling.get();
163+
handleEvent(
164+
event.getAction(),
165+
(R) event.getResource().orElseThrow(),
166+
(R) event.getPreviousResource().orElse(null),
167+
event.getLastStateUnknow());
165168
} else {
166169
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
167170
}
168171
}
169172

170-
private void propagateEvent(R object) {
173+
protected void propagateEvent(R object) {
171174
var primaryResourceIdSet =
172175
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
173176
if (primaryResourceIdSet.isEmpty()) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -85,41 +85,44 @@ public synchronized Optional<GenericResourceEvent> doneEventFilterModify(Resourc
8585
return Optional.empty();
8686
}
8787
var ed = activeUpdates.get(resourceID);
88-
if (ed == null || !ed.decreaseActiveUpdates()) {
89-
log.debug(
90-
"Active updates {} for resource id: {}",
91-
ed != null ? ed.getActiveUpdates() : 0,
92-
resourceID);
88+
if (!ed.decreaseActiveUpdates()) {
89+
log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID);
90+
return Optional.empty();
91+
}
92+
93+
if (ed.newerOrEqualEventReceivedForOwnLastUpdate()) {
94+
activeUpdates.remove(resourceID);
95+
return ed.prepareSummaryEventIfNotOwnEventsPresent();
96+
} else {
9397
return Optional.empty();
9498
}
95-
activeUpdates.remove(resourceID);
96-
return ed.prepareSummaryEventIfNotOwnEventsPresent();
9799
}
98100

99-
public void onDeleteEvent(T resource, boolean unknownState) {
100-
onEvent(ResourceAction.DELETED, resource, null, unknownState);
101+
public Optional<GenericResourceEvent> onDeleteEvent(T resource, boolean unknownState) {
102+
return onEvent(ResourceAction.DELETED, resource, null, unknownState);
101103
}
102104

103-
public EventHandling onAddOrUpdateEvent(
105+
public Optional<GenericResourceEvent> onAddOrUpdateEvent(
104106
ResourceAction action, T resource, T prevResourceVersion) {
105-
return onEvent(action, resource, prevResourceVersion, false);
107+
return onEvent(action, resource, prevResourceVersion, null);
106108
}
107109

108-
private synchronized EventHandling onEvent(
109-
ResourceAction action, T resource, T prevResourceVersion, boolean unknownState) {
110+
private synchronized Optional<GenericResourceEvent> onEvent(
111+
ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) {
112+
GenericResourceEvent actualEvent =
113+
toGenericResourceEvent(action, resource, prevResourceVersion, unknownState);
110114
if (!comparableResourceVersions) {
111-
return EventHandling.PROPAGATE;
115+
return Optional.of(actualEvent);
112116
}
113-
114117
var resourceId = ResourceID.fromResource(resource);
115118
if (log.isDebugEnabled()) {
116119
log.debug("Processing event");
117120
}
118121
var cached = cache.get(resourceId);
119-
EventHandling result = EventHandling.PROPAGATE;
122+
Optional<GenericResourceEvent> result = Optional.of(actualEvent);
120123
if (cached != null) {
121124
int comp = ReconcilerUtilsInternal.compareResourceVersions(resource, cached);
122-
if (comp >= 0 || unknownState) {
125+
if (comp >= 0 || Boolean.TRUE.equals(unknownState)) {
123126
log.debug(
124127
"Removing resource from temp cache. comparison: {} unknown state: {}",
125128
comp,
@@ -128,7 +131,9 @@ private synchronized EventHandling onEvent(
128131
// we propagate event only for our update or newer other can be discarded since we know we
129132
// will receive
130133
// additional event
131-
result = comp == 0 ? EventHandling.IGNORE : EventHandling.PROPAGATE;
134+
if (comp == 0) {
135+
result = Optional.empty();
136+
}
132137
} else {
133138
// in this case we received and event that might be in some edge case that was
134139
// already used in reconciler or after that, but before our updated resource version.
@@ -141,13 +146,24 @@ private synchronized EventHandling onEvent(
141146
log.debug("Recording relevant event");
142147
au.addRelatedEvent(
143148
new GenericResourceEvent(action, resource, prevResourceVersion, unknownState));
144-
return EventHandling.IGNORE;
149+
// this is to cover the situation when we finished the filtering and caching update but
150+
// did not receive events for our own updates yet.
151+
if (au.isNoActiveUpdate() && au.newerOrEqualEventReceivedForOwnLastUpdate()) {
152+
activeUpdates.remove(resourceId);
153+
return au.prepareSummaryEventIfNotOwnEventsPresent();
154+
}
155+
return Optional.empty();
145156
} else {
146-
log.debug("No active recornding, event handling: {}", result);
157+
log.debug("No active recording, event handling: {}", result);
147158
return result;
148159
}
149160
}
150161

162+
static <T extends HasMetadata> GenericResourceEvent toGenericResourceEvent(
163+
ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) {
164+
return new GenericResourceEvent(action, resource, prevResourceVersion, unknownState);
165+
}
166+
151167
/** put the item into the cache if it's for a later state than what has already been observed. */
152168
public synchronized void putResource(T newResource) {
153169
if (!comparableResourceVersions) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
250250
// external update with rv 3 (older than our cached rv 4) — must propagate
251251
source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
252252
latch2.countDown();
253+
source.onUpdate(testResourceWithVersion(3), testResourceWithVersion(5));
253254

254255
await().untilAsserted(() -> verify(eventHandler, times(1)).handleEvent(any()));
255256
}

0 commit comments

Comments
 (0)