Skip to content

Commit 62c66ca

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 2a4c8f1 commit 62c66ca

File tree

5 files changed

+118
-65
lines changed

5 files changed

+118
-65
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,13 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
139139
}
140140

141141
@Override
142-
public void onAdd(T resource) {
142+
public synchronized void onAdd(T resource) {
143143
var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null);
144144
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
145145
}
146146

147147
@Override
148-
public void onUpdate(T oldCustomResource, T newCustomResource) {
148+
public synchronized void onUpdate(T oldCustomResource, T newCustomResource) {
149149
var handling =
150150
temporaryResourceCache.onAddOrUpdateEvent(
151151
ResourceAction.UPDATED, newCustomResource, oldCustomResource);
@@ -158,7 +158,7 @@ public void onUpdate(T oldCustomResource, T newCustomResource) {
158158
}
159159

160160
@Override
161-
public void onDelete(T resource, boolean deletedFinalStateUnknown) {
161+
public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) {
162162
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
163163
// delete event is quite special here, that requires special care, since we clean up caches on
164164
// delete event.

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public static TestCustomResource testCustomResource() {
3232
return testCustomResource(new ResourceID(UUID.randomUUID().toString(), "test"));
3333
}
3434

35+
public static TestCustomResource testCustomResource1() {
36+
return testCustomResource(new ResourceID("test1", "default"));
37+
}
38+
3539
public static CustomResourceDefinition testCRD(String scope) {
3640
return new CustomResourceDefinitionBuilder()
3741
.editOrNewSpec()
@@ -43,10 +47,6 @@ public static CustomResourceDefinition testCRD(String scope) {
4347
.build();
4448
}
4549

46-
public static TestCustomResource testCustomResource1() {
47-
return testCustomResource(new ResourceID("test1", "default"));
48-
}
49-
5050
public static ResourceID testCustomResource1Id() {
5151
return new ResourceID("test1", "default");
5252
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.processing.event.source;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
21+
import java.util.function.UnaryOperator;
22+
23+
import io.fabric8.kubernetes.api.model.HasMetadata;
24+
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
25+
26+
public class EventFilterTestUtils {
27+
28+
static ExecutorService executorService = Executors.newCachedThreadPool();
29+
30+
public static <R extends HasMetadata> CountDownLatch sendForEventFilteringUpdate(
31+
ManagedInformerEventSource<R, ?, ?> eventSource, R resource, UnaryOperator<R> updateMethod) {
32+
try {
33+
CountDownLatch latch = new CountDownLatch(1);
34+
CountDownLatch sendOnGoingLatch = new CountDownLatch(1);
35+
executorService.submit(
36+
() ->
37+
eventSource.eventFilteringUpdateAndCacheResource(
38+
resource,
39+
r -> {
40+
try {
41+
sendOnGoingLatch.countDown();
42+
latch.await();
43+
var resp = updateMethod.apply(r);
44+
return resp;
45+
} catch (InterruptedException e) {
46+
throw new RuntimeException(e);
47+
}
48+
}));
49+
sendOnGoingLatch.await();
50+
return latch;
51+
} catch (InterruptedException e) {
52+
throw new RuntimeException(e);
53+
}
54+
}
55+
56+
public static <R extends HasMetadata> R withResourceVersion(R resource, int resourceVersion) {
57+
var v = resource.getMetadata().getResourceVersion();
58+
if (v == null) {
59+
throw new IllegalArgumentException("Resource version is null");
60+
}
61+
resource.getMetadata().setResourceVersion("" + resourceVersion);
62+
return resource;
63+
}
64+
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
import java.time.LocalDateTime;
1919
import java.util.List;
20+
import java.util.concurrent.CountDownLatch;
2021

2122
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Disabled;
2224
import org.junit.jupiter.api.Test;
2325

2426
import io.javaoperatorsdk.operator.MockKubernetesClient;
@@ -34,12 +36,14 @@
3436
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3537
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
3638
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
39+
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
3740
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3841
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
3942
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
4043
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
4144
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
4245

46+
import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion;
4347
import static org.mockito.ArgumentMatchers.eq;
4448
import static org.mockito.Mockito.*;
4549

@@ -164,6 +168,33 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() {
164168
verify(eventHandler, never()).handleEvent(any());
165169
}
166170

171+
@Disabled
172+
@Test
173+
void testEventFiltering() throws InterruptedException {
174+
source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
175+
setUpSource(source, true, controllerConfig);
176+
177+
var latch = sendForEventFilteringUpdate(2);
178+
source.onUpdate(testResourceWithVersion(1), testResourceWithVersion(2));
179+
latch.countDown();
180+
Thread.sleep(100);
181+
verify(source, never()).handleEvent(any(), any(), any(), any(), anyBoolean());
182+
}
183+
184+
private TestCustomResource testResourceWithVersion(int v) {
185+
return withResourceVersion(TestUtils.testCustomResource1(), v);
186+
}
187+
188+
private CountDownLatch sendForEventFilteringUpdate(int v) {
189+
return sendForEventFilteringUpdate(TestUtils.testCustomResource1(), v);
190+
}
191+
192+
private CountDownLatch sendForEventFilteringUpdate(
193+
TestCustomResource testResource, int resourceVersion) {
194+
return EventFilterTestUtils.sendForEventFilteringUpdate(
195+
source, testResource, r -> withResourceVersion(testResource, resourceVersion));
196+
}
197+
167198
@SuppressWarnings("unchecked")
168199
private static class TestController extends Controller<TestCustomResource> {
169200

@@ -224,6 +255,7 @@ public TestConfiguration(
224255
.withOnAddFilter(onAddFilter)
225256
.withOnUpdateFilter(onUpdateFilter)
226257
.withGenericFilter(genericFilter)
258+
.withComparableResourceVersions(true)
227259
.buildForController(),
228260
false);
229261
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 15 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,14 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18-
import java.time.Duration;
1918
import java.util.Optional;
2019
import java.util.Set;
2120
import java.util.concurrent.CountDownLatch;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.Executors;
24-
import java.util.function.UnaryOperator;
2521

2622
import org.junit.jupiter.api.BeforeEach;
2723
import org.junit.jupiter.api.Test;
2824
import org.junit.jupiter.api.TestInstance;
2925

30-
import io.fabric8.kubernetes.api.model.HasMetadata;
3126
import io.fabric8.kubernetes.api.model.ObjectMeta;
3227
import io.fabric8.kubernetes.api.model.apps.Deployment;
3328
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
@@ -43,12 +38,14 @@
4338
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
4439
import io.javaoperatorsdk.operator.processing.event.EventHandler;
4540
import io.javaoperatorsdk.operator.processing.event.ResourceID;
41+
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
4642
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4743
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
4844
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
4945
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
5046

5147
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
48+
import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion;
5249
import static org.assertj.core.api.Assertions.assertThat;
5350
import static org.awaitility.Awaitility.await;
5451
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -72,8 +69,6 @@ class InformerEventSourceTest {
7269
private static final String PREV_RESOURCE_VERSION = "0";
7370
private static final String DEFAULT_RESOURCE_VERSION = "1";
7471

75-
ExecutorService executorService = Executors.newCachedThreadPool();
76-
7772
private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
7873
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
7974
private TemporaryResourceCache<Deployment> temporaryResourceCache =
@@ -224,12 +219,14 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() {
224219
withRealTemporaryResourceCache();
225220

226221
CountDownLatch latch =
227-
sendForEventFilteringUpdate(
222+
EventFilterTestUtils.sendForEventFilteringUpdate(
223+
informerEventSource,
228224
testDeployment(),
229225
r -> {
230226
throw new KubernetesClientException("fake");
231227
});
232-
informerEventSource.onUpdate(testDeployment(), withResourceVersion(testDeployment(), 2));
228+
informerEventSource.onUpdate(
229+
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
233230
latch.countDown();
234231

235232
expectHandleEvent(2, 1);
@@ -239,7 +236,6 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() {
239236
void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
240237
withRealTemporaryResourceCache();
241238

242-
withRealTemporaryResourceCache();
243239
var deployment = testDeployment();
244240
CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2);
245241
informerEventSource.onUpdate(
@@ -252,20 +248,16 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
252248
}
253249

254250
@Test
255-
void doesNotPropagateEventIfReceivedBeforeUpdate() {
251+
void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException {
256252
withRealTemporaryResourceCache();
257-
var deployment = testDeployment();
258-
CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2);
259-
informerEventSource.onUpdate(deployment, deploymentWithResourceVersion(2));
253+
254+
CountDownLatch latch = sendForEventFilteringUpdate(2);
255+
informerEventSource.onUpdate(
256+
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
260257
latch.countDown();
261258

262-
await()
263-
.pollDelay(Duration.ofMillis(100))
264-
.untilAsserted(
265-
() -> {
266-
verify(informerEventSource, never())
267-
.handleEvent(any(), any(), any(), any(), anyBoolean());
268-
});
259+
Thread.sleep(100);
260+
verify(informerEventSource, never()).handleEvent(any(), any(), any(), any(), anyBoolean());
269261
}
270262

271263
private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
@@ -297,34 +289,8 @@ private CountDownLatch sendForEventFilteringUpdate(int resourceVersion) {
297289
}
298290

299291
private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int resourceVersion) {
300-
return sendForEventFilteringUpdate(
301-
deployment, r -> withResourceVersion(deployment, resourceVersion));
302-
}
303-
304-
private CountDownLatch sendForEventFilteringUpdate(
305-
Deployment resource, UnaryOperator<Deployment> updateMethod) {
306-
try {
307-
CountDownLatch latch = new CountDownLatch(1);
308-
CountDownLatch sendOnGoingLatch = new CountDownLatch(1);
309-
executorService.submit(
310-
() ->
311-
informerEventSource.eventFilteringUpdateAndCacheResource(
312-
resource,
313-
r -> {
314-
try {
315-
sendOnGoingLatch.countDown();
316-
latch.await();
317-
var resp = updateMethod.apply(r);
318-
return resp;
319-
} catch (InterruptedException e) {
320-
throw new RuntimeException(e);
321-
}
322-
}));
323-
sendOnGoingLatch.await();
324-
return latch;
325-
} catch (InterruptedException e) {
326-
throw new RuntimeException(e);
327-
}
292+
return EventFilterTestUtils.sendForEventFilteringUpdate(
293+
informerEventSource, deployment, r -> withResourceVersion(deployment, resourceVersion));
328294
}
329295

330296
private void withRealTemporaryResourceCache() {
@@ -336,15 +302,6 @@ Deployment deploymentWithResourceVersion(int resourceVersion) {
336302
return withResourceVersion(testDeployment(), resourceVersion);
337303
}
338304

339-
<R extends HasMetadata> R withResourceVersion(R resource, int resourceVersion) {
340-
var v = resource.getMetadata().getResourceVersion();
341-
if (v == null) {
342-
throw new IllegalArgumentException("Resource version is null");
343-
}
344-
resource.getMetadata().setResourceVersion("" + resourceVersion);
345-
return resource;
346-
}
347-
348305
@Test
349306
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
350307
final var exception = new RuntimeException("Informer stopped exceptionally!");

0 commit comments

Comments
 (0)