Skip to content

Commit 6f5854c

Browse files
committed
fixes
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 490fde0 commit 6f5854c

5 files changed

Lines changed: 284 additions & 2 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@
5353
* <ul>
5454
* <li>We propagate events only if we received an event that has the same resourceVersion or newer
5555
* than resource version from update
56-
* <li>The propagated event should correspond to a possible real world scenario - considering also
57-
* ones that could happen if the Informer does a re-list.
56+
* <li>The propagated event should correspond to a possible real world scenario - considering also
57+
* ones that could happen if the Informer does a re-list.
5858
* </ul>
5959
*
6060
* @param <T> resource to cache.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
17+
18+
import io.fabric8.kubernetes.api.model.Namespaced;
19+
import io.fabric8.kubernetes.client.CustomResource;
20+
import io.fabric8.kubernetes.model.annotation.Group;
21+
import io.fabric8.kubernetes.model.annotation.ShortNames;
22+
import io.fabric8.kubernetes.model.annotation.Version;
23+
24+
@Group("sample.javaoperatorsdk")
25+
@Version("v1")
26+
@ShortNames("rou")
27+
public class ReadOwnUpdatesCustomResource extends CustomResource<Void, ReadOwnUpdatesStatus>
28+
implements Namespaced {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
17+
18+
import java.time.Duration;
19+
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.extension.RegisterExtension;
22+
23+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
24+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.awaitility.Awaitility.await;
28+
29+
class ReadOwnUpdatesIT {
30+
31+
public static final int RESOURCE_NUMBER = 250;
32+
ReadOwnUpdatesReconciler reconciler = new ReadOwnUpdatesReconciler();
33+
34+
@RegisterExtension
35+
LocallyRunOperatorExtension operator =
36+
LocallyRunOperatorExtension.builder().withReconciler(reconciler).build();
37+
38+
@Test
39+
void testResourceAccessAfterUpdate() {
40+
for (int i = 0; i < RESOURCE_NUMBER; i++) {
41+
operator.create(createCustomResource(i));
42+
}
43+
await()
44+
.pollDelay(Duration.ofSeconds(5))
45+
.atMost(Duration.ofMinutes(1))
46+
.until(
47+
() -> {
48+
if (reconciler.isIssueFound()) {
49+
// Stop waiting as soon as an issue is detected.
50+
return true;
51+
}
52+
// Use a single representative resource to detect that updates have completed.
53+
var res =
54+
operator.get(
55+
ReadOwnUpdatesCustomResource.class, "resource" + (RESOURCE_NUMBER - 1));
56+
return res != null
57+
&& res.getStatus() != null
58+
&& Boolean.TRUE.equals(res.getStatus().getUpdated());
59+
});
60+
61+
if (operator.getReconcilerOfType(ReadOwnUpdatesReconciler.class).isIssueFound()) {
62+
throw new IllegalStateException("Error already found.");
63+
}
64+
65+
for (int i = 0; i < RESOURCE_NUMBER; i++) {
66+
var res = operator.get(ReadOwnUpdatesCustomResource.class, "resource" + i);
67+
assertThat(res.getStatus()).isNotNull();
68+
assertThat(res.getStatus().getUpdated()).isTrue();
69+
}
70+
}
71+
72+
public ReadOwnUpdatesCustomResource createCustomResource(int i) {
73+
ReadOwnUpdatesCustomResource resource = new ReadOwnUpdatesCustomResource();
74+
resource.setMetadata(
75+
new ObjectMetaBuilder()
76+
.withName("resource" + i)
77+
.withNamespace(operator.getNamespace())
78+
.build());
79+
return resource;
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
17+
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import io.fabric8.kubernetes.api.model.ConfigMap;
23+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
24+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
25+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
26+
import io.javaoperatorsdk.operator.api.reconciler.Context;
27+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
28+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
29+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
30+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
31+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
32+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
33+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
34+
35+
@ControllerConfiguration
36+
public class ReadOwnUpdatesReconciler implements Reconciler<ReadOwnUpdatesCustomResource> {
37+
38+
public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex";
39+
private final AtomicBoolean issueFound = new AtomicBoolean(false);
40+
41+
private InformerEventSource<ConfigMap, ReadOwnUpdatesCustomResource> configMapEventSource;
42+
43+
@Override
44+
public UpdateControl<ReadOwnUpdatesCustomResource> reconcile(
45+
ReadOwnUpdatesCustomResource resource, Context<ReadOwnUpdatesCustomResource> context) {
46+
try {
47+
var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1));
48+
var cachedCM = context.getSecondaryResource(ConfigMap.class);
49+
if (cachedCM.isEmpty()) {
50+
throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource));
51+
}
52+
checkListContainsCM(updated);
53+
checkIfResourceVersionIndexContainsUpdated(updated);
54+
updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2));
55+
cachedCM = context.getSecondaryResource(ConfigMap.class);
56+
if (!cachedCM
57+
.orElseThrow()
58+
.getMetadata()
59+
.getResourceVersion()
60+
.equals(updated.getMetadata().getResourceVersion())) {
61+
throw new IllegalStateException(
62+
"Update error for resource: " + ResourceID.fromResource(resource));
63+
}
64+
checkListContainsCM(updated);
65+
checkIfResourceVersionIndexContainsUpdated(updated);
66+
67+
ensureStatusExists(resource);
68+
resource.getStatus().setUpdated(true);
69+
return UpdateControl.patchStatus(resource);
70+
} catch (IllegalStateException e) {
71+
issueFound.set(true);
72+
throw e;
73+
}
74+
}
75+
76+
private void checkIfResourceVersionIndexContainsUpdated(ConfigMap updated) {
77+
if (configMapEventSource
78+
.byIndex(RESOURCE_VERSION_INDEX, updated.getMetadata().getResourceVersion())
79+
.stream()
80+
.noneMatch(
81+
r ->
82+
ResourceID.fromResource(r).equals(ResourceID.fromResource(updated))
83+
&& r.getMetadata()
84+
.getResourceVersion()
85+
.equals(updated.getMetadata().getResourceVersion()))) {
86+
throw new IllegalStateException(
87+
"Index does not contain resource: " + ResourceID.fromResource(updated));
88+
}
89+
}
90+
91+
private void checkListContainsCM(ConfigMap updated) {
92+
if (configMapEventSource
93+
.list()
94+
.noneMatch(
95+
r ->
96+
ResourceID.fromResource(r).equals(ResourceID.fromResource(updated))
97+
&& r.getMetadata()
98+
.getResourceVersion()
99+
.equals(updated.getMetadata().getResourceVersion()))) {
100+
throw new IllegalStateException(
101+
"List does not contain resource: " + ResourceID.fromResource(updated));
102+
}
103+
}
104+
105+
private static ConfigMap prepareCM(ReadOwnUpdatesCustomResource p, int num) {
106+
var cm =
107+
new ConfigMapBuilder()
108+
.withMetadata(
109+
new ObjectMetaBuilder()
110+
.withName(p.getMetadata().getName())
111+
.withNamespace(p.getMetadata().getNamespace())
112+
.build())
113+
.withData(Map.of("name", p.getMetadata().getName(), "num", "" + num))
114+
.build();
115+
cm.addOwnerReference(p);
116+
return cm;
117+
}
118+
119+
@Override
120+
public List<EventSource<?, ReadOwnUpdatesCustomResource>> prepareEventSources(
121+
EventSourceContext<ReadOwnUpdatesCustomResource> context) {
122+
configMapEventSource =
123+
new InformerEventSource<>(
124+
InformerEventSourceConfiguration.from(
125+
ConfigMap.class, ReadOwnUpdatesCustomResource.class)
126+
.build(),
127+
context);
128+
configMapEventSource.addIndexers(
129+
Map.of(RESOURCE_VERSION_INDEX, cm -> List.of(cm.getMetadata().getResourceVersion())));
130+
return List.of(configMapEventSource);
131+
}
132+
133+
private void ensureStatusExists(ReadOwnUpdatesCustomResource resource) {
134+
ReadOwnUpdatesStatus status = resource.getStatus();
135+
if (status == null) {
136+
status = new ReadOwnUpdatesStatus();
137+
resource.setStatus(status);
138+
}
139+
}
140+
141+
public boolean isIssueFound() {
142+
return issueFound.get();
143+
}
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
17+
18+
public class ReadOwnUpdatesStatus {
19+
20+
private Boolean updated;
21+
22+
public Boolean getUpdated() {
23+
return updated;
24+
}
25+
26+
public void setUpdated(Boolean updated) {
27+
this.updated = updated;
28+
}
29+
}

0 commit comments

Comments
 (0)