Skip to content

Commit ade5d7f

Browse files
committed
feat: provide de-duplicated secondary resources stream on Context
Signed-off-by: Chris Laprun <metacosm@gmail.com>
1 parent fd78e57 commit ade5d7f

File tree

5 files changed

+40
-14
lines changed

5 files changed

+40
-14
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
3838
<R> Set<R> getSecondaryResources(Class<R> expectedType);
3939

4040
default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
41-
return getSecondaryResources(expectedType).stream();
41+
return getSecondaryResourcesAsStream(expectedType, false);
4242
}
4343

44+
<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);
45+
4446
<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);
4547

4648
ControllerConfiguration<P> getControllerConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.stream.Collectors;
2525
import java.util.stream.Stream;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import io.fabric8.kubernetes.api.model.HasMetadata;
2831
import io.fabric8.kubernetes.client.KubernetesClient;
2932
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
@@ -35,7 +38,10 @@
3538
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
3639
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3740

41+
import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions;
42+
3843
public class DefaultContext<P extends HasMetadata> implements Context<P> {
44+
private static final Logger log = LoggerFactory.getLogger(DefaultContext.class);
3945

4046
private RetryInfo retryInfo;
4147
private final Controller<P> controller;
@@ -73,11 +79,31 @@ public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
7379
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
7480
}
7581

76-
@Override
77-
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
78-
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
79-
.map(es -> es.getSecondaryResources(primaryResource))
80-
.flatMap(Set::stream);
82+
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
83+
if (deduplicate && !HasMetadata.class.isAssignableFrom(expectedType)) {
84+
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
85+
}
86+
87+
final var stream =
88+
controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
89+
.<R>mapMulti(
90+
(es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer));
91+
if (deduplicate) {
92+
//noinspection unchecked
93+
return stream
94+
.map(HasMetadata.class::cast)
95+
.collect(
96+
Collectors.toUnmodifiableMap(
97+
ResourceID::fromResource,
98+
Function.identity(),
99+
(existing, replacement) ->
100+
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement))
101+
.values()
102+
.stream()
103+
.map(hasMetadata -> (R) hasMetadata);
104+
} else {
105+
return stream;
106+
}
81107
}
82108

83109
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Set;
25+
import java.util.function.Function;
2526
import java.util.function.Predicate;
2627
import java.util.function.UnaryOperator;
2728
import java.util.stream.Collector;
@@ -650,9 +651,8 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
650651

651652
private static <T extends HasMetadata> Collector<T, ?, Map<ResourceID, T>> latestDistinctToMap() {
652653
return Collectors.toMap(
653-
resource ->
654-
new ResourceID(resource.getMetadata().getName(), resource.getMetadata().getNamespace()),
655-
resource -> resource,
654+
ResourceID::fromResource,
655+
Function.identity(),
656656
(existing, replacement) ->
657657
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement);
658658
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
218218
}
219219
return secondaryIDs.stream()
220220
.map(this::get)
221-
.flatMap(Optional::stream)
221+
.filter(Optional::isPresent)
222+
.map(Optional::get)
222223
.collect(Collectors.toSet());
223224
}
224225

operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,7 @@ public UpdateControl<LatestDistinctTestResource> reconcile(
5858
return UpdateControl.noUpdate();
5959
}
6060
// makes sure that distinc config maps returned
61-
var distinctConfigMaps =
62-
context
63-
.getSecondaryResourcesAsStream(ConfigMap.class)
64-
.collect(ReconcileUtils.latestDistinctList());
61+
var distinctConfigMaps = context.getSecondaryResourcesAsStream(ConfigMap.class, true).toList();
6562
if (distinctConfigMaps.size() != 1) {
6663
errorOccurred = true;
6764
throw new IllegalStateException();

0 commit comments

Comments
 (0)