Skip to content

Commit 3c9de5c

Browse files
csvirimetacosm
authored andcommitted
feat: provide de-duplicated secondary resources stream on Context
Signed-off-by: Chris Laprun <metacosm@gmail.com>
1 parent a3bc9e7 commit 3c9de5c

File tree

12 files changed

+616
-29
lines changed

12 files changed

+616
-29
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,23 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
3838
<R> Set<R> getSecondaryResources(Class<R> expectedType);
3939

4040
default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
41-
return getSecondaryResources(expectedType).stream();
41+
return getSecondaryResourcesAsStream(expectedType, false);
4242
}
4343

44+
/**
45+
* Retrieves a {@link Stream} the secondary resources of the specified type, which are associated
46+
* with the primary resource being processed, possibly making sure that only the latest version of
47+
* each resource is retrieved.
48+
*
49+
* @param expectedType a class representing the type of secondary resources to retrieve
50+
* @param deduplicate {@code true} if only the latest version of each resource should be kept,
51+
* {@code false} otherwise
52+
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
53+
* @param <R> the type of secondary resources to retrieve
54+
* @since 5.3.0
55+
*/
56+
<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);
57+
4458
<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);
4559

4660
ControllerConfiguration<P> getControllerConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import io.fabric8.kubernetes.api.model.HasMetadata;
2828
import io.fabric8.kubernetes.client.KubernetesClient;
29+
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
2930
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3031
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
3132
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
@@ -36,7 +37,6 @@
3637
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3738

3839
public class DefaultContext<P extends HasMetadata> implements Context<P> {
39-
4040
private RetryInfo retryInfo;
4141
private final Controller<P> controller;
4242
private final P primaryResource;
@@ -75,11 +75,36 @@ public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
7575
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
7676
}
7777

78-
@Override
79-
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
80-
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
81-
.map(es -> es.getSecondaryResources(primaryResource))
82-
.flatMap(Set::stream);
78+
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
79+
80+
final var stream =
81+
controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
82+
.<R>mapMulti(
83+
(es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer));
84+
if (deduplicate) {
85+
if (!HasMetadata.class.isAssignableFrom(expectedType)) {
86+
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
87+
}
88+
return stream
89+
.collect(
90+
Collectors.toUnmodifiableMap(
91+
DefaultContext::resourceID,
92+
Function.identity(),
93+
(existing, replacement) ->
94+
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement))
95+
.values()
96+
.stream();
97+
} else {
98+
return stream;
99+
}
100+
}
101+
102+
private static ResourceID resourceID(Object hasMetadata) {
103+
return ResourceID.fromResource((HasMetadata) hasMetadata);
104+
}
105+
106+
private static int compareResourceVersions(Object v1, Object v2) {
107+
return ReconcilerUtilsInternal.compareResourceVersions((HasMetadata) v1, (HasMetadata) v2);
83108
}
84109

85110
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,17 @@
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

1818
import java.lang.reflect.InvocationTargetException;
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.HashSet;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import java.util.function.Function;
1926
import java.util.function.Predicate;
2027
import java.util.function.UnaryOperator;
28+
import java.util.stream.Collector;
29+
import java.util.stream.Collectors;
2130

2231
import org.slf4j.Logger;
2332
import org.slf4j.LoggerFactory;
@@ -30,6 +39,7 @@
3039
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3140
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
3241

42+
import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions;
3343
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
3444
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
3545

@@ -437,13 +447,13 @@ public <R extends HasMetadata> R resourcePatch(R resource, UnaryOperator<R> upda
437447
if (esList.isEmpty()) {
438448
throw new IllegalStateException("No event source found for type: " + resource.getClass());
439449
}
450+
var es = esList.get(0);
440451
if (esList.size() > 1) {
441-
throw new IllegalStateException(
442-
"Multiple event sources found for: "
443-
+ resource.getClass()
444-
+ " please provide the target event source");
452+
log.warn(
453+
"Multiple event sources found for type: {}, selecting first with name {}",
454+
resource.getClass(),
455+
es.name());
445456
}
446-
var es = esList.get(0);
447457
if (es instanceof ManagedInformerEventSource mes) {
448458
return resourcePatch(resource, updateOperation, (ManagedInformerEventSource<R, P, ?>) mes);
449459
} else {
@@ -655,4 +665,55 @@ public P addFinalizerWithSSA(String finalizerName) {
655665
e);
656666
}
657667
}
668+
669+
/**
670+
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
671+
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
672+
* use case is for the rather rare setup when there are overlapping {@link
673+
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
674+
* resource type.
675+
*
676+
* @param <T> the type of HasMetadata objects
677+
* @return a collector that produces a collection of deduplicated Kubernetes objects
678+
*/
679+
public static <T extends HasMetadata> Collector<T, ?, Collection<T>> latestDistinct() {
680+
return Collectors.collectingAndThen(latestDistinctToMap(), Map::values);
681+
}
682+
683+
/**
684+
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
685+
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
686+
* use case is for the rather rare setup when there are overlapping {@link
687+
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
688+
* resource type.
689+
*
690+
* @param <T> the type of HasMetadata objects
691+
* @return a collector that produces a List of deduplicated Kubernetes objects
692+
*/
693+
public static <T extends HasMetadata> Collector<T, ?, List<T>> latestDistinctList() {
694+
return Collectors.collectingAndThen(
695+
latestDistinctToMap(), map -> new ArrayList<>(map.values()));
696+
}
697+
698+
/**
699+
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
700+
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
701+
* use case is for the rather rare setup when there are overlapping {@link
702+
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
703+
* resource type.
704+
*
705+
* @param <T> the type of HasMetadata objects
706+
* @return a collector that produces a Set of deduplicated Kubernetes objects
707+
*/
708+
public static <T extends HasMetadata> Collector<T, ?, Set<T>> latestDistinctSet() {
709+
return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values()));
710+
}
711+
712+
private static <T extends HasMetadata> Collector<T, ?, Map<ResourceID, T>> latestDistinctToMap() {
713+
return Collectors.toMap(
714+
ResourceID::fromResource,
715+
Function.identity(),
716+
(existing, replacement) ->
717+
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement);
718+
}
658719
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,26 @@ public boolean equals(Object o) {
6363
}
6464

6565
public boolean isSameResource(HasMetadata hasMetadata) {
66+
if (hasMetadata == null) {
67+
return false;
68+
}
6669
final var metadata = hasMetadata.getMetadata();
67-
return getName().equals(metadata.getName())
68-
&& getNamespace().map(ns -> ns.equals(metadata.getNamespace())).orElse(true);
70+
return isSameResource(metadata.getName(), metadata.getNamespace());
71+
}
72+
73+
/**
74+
* Whether this ResourceID points to the same resource as the one identified by the specified name
75+
* and namespace. Note that this doesn't take API version or Kind into account so this should only
76+
* be used when checking resources that are reasonably expected to be of the same type.
77+
*
78+
* @param name the name of the resource we want to check
79+
* @param namespace the possibly {@code null} namespace of the resource we want to check
80+
* @return {@code true} if this resource points to the same resource as the one pointed to by the
81+
* specified name and namespace, {@code false} otherwise
82+
* @since 5.3.0
83+
*/
84+
public boolean isSameResource(String name, String namespace) {
85+
return Objects.equals(this.name, name) && Objects.equals(this.namespace, namespace);
6986
}
7087

7188
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
218218
}
219219
return secondaryIDs.stream()
220220
.map(this::get)
221-
.flatMap(Optional::stream)
221+
.filter(Optional::isPresent)
222+
.map(Optional::get)
222223
.collect(Collectors.toSet());
223224
}
224225

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,23 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

18+
import java.util.List;
19+
import java.util.Set;
20+
21+
import org.junit.jupiter.api.BeforeEach;
1822
import org.junit.jupiter.api.Test;
1923

2024
import io.fabric8.kubernetes.api.model.ConfigMap;
25+
import io.fabric8.kubernetes.api.model.HasMetadata;
26+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
27+
import io.fabric8.kubernetes.api.model.Pod;
28+
import io.fabric8.kubernetes.api.model.PodBuilder;
2129
import io.fabric8.kubernetes.api.model.Secret;
2230
import io.javaoperatorsdk.operator.processing.Controller;
2331
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
2432
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
33+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
34+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
2535

2636
import static org.assertj.core.api.Assertions.assertThat;
2737
import static org.mockito.ArgumentMatchers.any;
@@ -30,17 +40,21 @@
3040

3141
class DefaultContextTest {
3242

33-
private final Secret primary = new Secret();
34-
private final Controller<Secret> mockController = mock();
43+
private DefaultContext<?> context;
44+
private Controller<HasMetadata> mockController;
45+
private EventSourceManager<HasMetadata> mockManager;
3546

36-
private final DefaultContext<?> context =
37-
new DefaultContext<>(null, mockController, primary, false, false);
47+
@BeforeEach
48+
void setUp() {
49+
mockController = mock();
50+
mockManager = mock();
51+
when(mockController.getEventSourceManager()).thenReturn(mockManager);
52+
53+
context = new DefaultContext<>(null, mockController, new Secret(), false, false);
54+
}
3855

3956
@Test
40-
@SuppressWarnings("unchecked")
4157
void getSecondaryResourceReturnsEmptyOptionalOnNonActivatedDRType() {
42-
var mockManager = mock(EventSourceManager.class);
43-
when(mockController.getEventSourceManager()).thenReturn(mockManager);
4458
when(mockController.workflowContainsDependentForType(ConfigMap.class)).thenReturn(true);
4559
when(mockManager.getEventSourceFor(any(), any()))
4660
.thenThrow(new NoEventSourceForClassException(ConfigMap.class));
@@ -56,4 +70,101 @@ void setRetryInfo() {
5670
assertThat(newContext).isSameAs(context);
5771
assertThat(newContext.getRetryInfo()).hasValue(retryInfo);
5872
}
73+
74+
@Test
75+
void latestDistinctKeepsOnlyLatestResourceVersion() {
76+
// Create multiple resources with same name and namespace but different versions
77+
var pod1v1 = podWithNameAndVersion("pod1", "100");
78+
var pod1v2 = podWithNameAndVersion("pod1", "200");
79+
var pod1v3 = podWithNameAndVersion("pod1", "150");
80+
81+
// Create a resource with different name
82+
var pod2v1 = podWithNameAndVersion("pod2", "100");
83+
84+
// Create a resource with same name but different namespace
85+
var pod1OtherNsv1 = podWithNameAndVersion("pod1", "50", "other");
86+
87+
setUpEventSourceWith(pod1v1, pod1v2, pod1v3, pod1OtherNsv1, pod2v1);
88+
89+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
90+
91+
// Should have 3 resources: pod1 in default (latest version 200), pod2 in default, and pod1 in
92+
// other
93+
assertThat(result).hasSize(3);
94+
95+
// Find pod1 in default namespace - should have version 200
96+
final var pod1InDefault =
97+
result.stream()
98+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "default"))
99+
.findFirst()
100+
.orElseThrow();
101+
assertThat(pod1InDefault.getMetadata().getResourceVersion()).isEqualTo("200");
102+
103+
// Find pod2 in default namespace - should exist
104+
HasMetadata pod2InDefault =
105+
result.stream()
106+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod2", "default"))
107+
.findFirst()
108+
.orElseThrow();
109+
assertThat(pod2InDefault.getMetadata().getResourceVersion()).isEqualTo("100");
110+
111+
// Find pod1 in other namespace - should exist
112+
HasMetadata pod1InOther =
113+
result.stream()
114+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "other"))
115+
.findFirst()
116+
.orElseThrow();
117+
assertThat(pod1InOther.getMetadata().getResourceVersion()).isEqualTo("50");
118+
}
119+
120+
private void setUpEventSourceWith(Pod... pods) {
121+
EventSource<Pod, HasMetadata> mockEventSource = mock();
122+
when(mockEventSource.getSecondaryResources(any())).thenReturn(Set.of(pods));
123+
when(mockManager.getEventSourcesFor(Pod.class)).thenReturn(List.of(mockEventSource));
124+
}
125+
126+
private static Pod podWithNameAndVersion(
127+
String name, String resourceVersion, String... namespace) {
128+
final var ns = namespace != null && namespace.length > 0 ? namespace[0] : "default";
129+
return new PodBuilder()
130+
.withMetadata(
131+
new ObjectMetaBuilder()
132+
.withName(name)
133+
.withNamespace(ns)
134+
.withResourceVersion(resourceVersion)
135+
.build())
136+
.build();
137+
}
138+
139+
@Test
140+
void latestDistinctHandlesEmptyStream() {
141+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
142+
143+
assertThat(result).isEmpty();
144+
}
145+
146+
@Test
147+
void latestDistinctHandlesSingleResource() {
148+
final var pod = podWithNameAndVersion("pod1", "100");
149+
setUpEventSourceWith(pod);
150+
151+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
152+
153+
assertThat(result).hasSize(1);
154+
assertThat(result).contains(pod);
155+
}
156+
157+
@Test
158+
void latestDistinctComparesNumericVersionsCorrectly() {
159+
// Test that version 1000 is greater than version 999 (not lexicographic)
160+
final var podV999 = podWithNameAndVersion("pod1", "999");
161+
final var podV1000 = podWithNameAndVersion("pod1", "1000");
162+
setUpEventSourceWith(podV999, podV1000);
163+
164+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
165+
166+
assertThat(result).hasSize(1);
167+
HasMetadata resultPod = result.iterator().next();
168+
assertThat(resultPod.getMetadata().getResourceVersion()).isEqualTo("1000");
169+
}
59170
}

0 commit comments

Comments
 (0)