Skip to content

Commit 122d59e

Browse files
committed
wip impl
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 5363ec6 commit 122d59e

File tree

6 files changed

+65
-49
lines changed

6 files changed

+65
-49
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationResult.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,5 @@
22

33
import io.fabric8.kubernetes.api.model.HasMetadata;
44

5-
public class ExpectationResult<P extends HasMetadata> {
6-
7-
private ExpectationStatus status;
8-
9-
private Expectation<P> expectation;
10-
11-
public ExpectationResult(ExpectationStatus status) {
12-
this.status = status;
13-
}
14-
15-
public ExpectationStatus getStatus() {
16-
return status;
17-
}
18-
19-
public Expectation<P> getExpectation() {
20-
return expectation;
21-
}
22-
}
5+
public record ExpectationResult<P extends HasMetadata>(
6+
ExpectationStatus status, Expectation<P> expectation) {}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.javaoperatorsdk.operator.api.reconciler.Constants;
2020
import io.javaoperatorsdk.operator.api.reconciler.expectation.DefaultExpectationContext;
2121
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationContext;
22+
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult;
23+
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationStatus;
2224
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2325
import io.javaoperatorsdk.operator.processing.MDCUtils;
2426
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
@@ -138,8 +140,18 @@ private void submitReconciliationExecution(ResourceState state) {
138140
Optional<P> maybeLatest = cache.get(resourceID);
139141
maybeLatest.ifPresent(MDCUtils::addResourceInfo);
140142
if (!controllerUnderExecution && maybeLatest.isPresent()) {
141-
if (!shouldProceedWithExpectation(state, maybeLatest.orElseThrow())) {
142-
return;
143+
ExpectationResult<P> expectationResult = null;
144+
if (isExpectationPresent(state)) {
145+
var expectationCheckResult =
146+
shouldProceedWithExpectation(state, maybeLatest.orElseThrow());
147+
if (expectationCheckResult.isEmpty()) {
148+
log.debug(
149+
"Skipping processing since expectation is not fulfilled. ResourceID: {}",
150+
resourceID);
151+
return;
152+
} else {
153+
expectationResult = expectationCheckResult.orElseThrow();
154+
}
143155
}
144156

145157
var rateLimit = state.getRateLimit();
@@ -154,7 +166,8 @@ private void submitReconciliationExecution(ResourceState state) {
154166
}
155167
state.setUnderProcessing(true);
156168
final var latest = maybeLatest.get();
157-
ExecutionScope<P> executionScope = new ExecutionScope<>(state.getRetry());
169+
ExecutionScope<P> executionScope =
170+
new ExecutionScope<>(state.getRetry(), expectationResult);
158171
state.unMarkEventReceived();
159172
metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
160173
log.debug("Executing events for custom resource. Scope: {}", executionScope);
@@ -180,19 +193,24 @@ private void submitReconciliationExecution(ResourceState state) {
180193
}
181194
}
182195

183-
boolean shouldProceedWithExpectation(ResourceState state, P primary) {
184-
var optionalHolder = state.getExpectationHolder();
185-
if (optionalHolder.isEmpty()) {
186-
return true;
187-
}
188-
var holder = optionalHolder.orElseThrow();
196+
private boolean isExpectationPresent(ResourceState state) {
197+
return state.getExpectationHolder().isPresent();
198+
}
199+
200+
@SuppressWarnings("unchecked")
201+
Optional<ExpectationResult<P>> shouldProceedWithExpectation(ResourceState state, P primary) {
202+
203+
var holder = state.getExpectationHolder().orElseThrow();
189204
if (holder.isTimedOut()) {
190-
return true;
205+
return Optional.of(
206+
new ExpectationResult<P>(ExpectationStatus.TIMEOUT, holder.getExpectation()));
191207
}
192-
// todo cleanup state etc
193208
ExpectationContext<P> expectationContext =
194209
new DefaultExpectationContext<>(this.eventSourceManager.getController(), primary);
195-
return holder.getExpectation().isFulfilled(primary, expectationContext);
210+
return holder.getExpectation().isFulfilled(primary, expectationContext)
211+
? Optional.of(
212+
new ExpectationResult<P>(ExpectationStatus.FULFILLED, holder.getExpectation()))
213+
: Optional.empty();
196214
}
197215

198216
private void handleEventMarking(Event event, ResourceState state) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import io.fabric8.kubernetes.api.model.HasMetadata;
44
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
5+
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult;
56

67
class ExecutionScope<R extends HasMetadata> {
78

89
// the latest custom resource from cache
910
private R resource;
1011
private final RetryInfo retryInfo;
12+
private final ExpectationResult<R> expectationResult;
1113

12-
ExecutionScope(RetryInfo retryInfo) {
14+
ExecutionScope(RetryInfo retryInfo, ExpectationResult<R> expectationResult) {
1315
this.retryInfo = retryInfo;
16+
this.expectationResult = expectationResult;
1417
}
1518

1619
public ExecutionScope<R> setResource(R resource) {
@@ -42,4 +45,8 @@ public String toString() {
4245
public RetryInfo getRetryInfo() {
4346
return retryInfo;
4447
}
48+
49+
public ExpectationResult<R> getExpectationResult() {
50+
return expectationResult;
51+
}
4552
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
9090
}
9191

9292
Context<P> context =
93-
new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution, null);
93+
new DefaultContext<>(
94+
executionScope.getRetryInfo(),
95+
controller,
96+
resourceForExecution,
97+
executionScope.getExpectationResult());
9498
if (markedForDeletion) {
9599
return handleCleanup(resourceForExecution, originalResource, context);
96100
} else {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void ifExecutionInProgressWaitsUntilItsFinished() {
129129
void schedulesAnEventRetryOnException() {
130130
TestCustomResource customResource = testCustomResource();
131131

132-
ExecutionScope executionScope = new ExecutionScope(null);
132+
ExecutionScope executionScope = new ExecutionScope(null, null);
133133
executionScope.setResource(customResource);
134134
PostExecutionControl postExecutionControl =
135135
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
@@ -271,7 +271,7 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() {
271271
var cr = testCustomResource(crID);
272272

273273
eventProcessor.eventProcessingFinished(
274-
new ExecutionScope(null).setResource(cr), PostExecutionControl.defaultDispatch());
274+
new ExecutionScope(null, null).setResource(cr), PostExecutionControl.defaultDispatch());
275275

276276
verify(retryTimerEventSourceMock, times(1)).cancelOnceSchedule(eq(crID));
277277
}
@@ -300,7 +300,7 @@ void startProcessedMarkedEventReceivedBefore() {
300300
@Test
301301
void notUpdatesEventSourceHandlerIfResourceUpdated() {
302302
TestCustomResource customResource = testCustomResource();
303-
ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource);
303+
ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource);
304304
PostExecutionControl postExecutionControl =
305305
PostExecutionControl.customResourceStatusPatched(customResource);
306306

@@ -313,7 +313,7 @@ void notUpdatesEventSourceHandlerIfResourceUpdated() {
313313
void notReschedulesAfterTheFinalizerRemoveProcessed() {
314314
TestCustomResource customResource = testCustomResource();
315315
markForDeletion(customResource);
316-
ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource);
316+
ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource);
317317
PostExecutionControl postExecutionControl =
318318
PostExecutionControl.customResourceFinalizerRemoved(customResource);
319319

@@ -326,7 +326,7 @@ void notReschedulesAfterTheFinalizerRemoveProcessed() {
326326
void skipEventProcessingIfFinalizerRemoveProcessed() {
327327
TestCustomResource customResource = testCustomResource();
328328
markForDeletion(customResource);
329-
ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource);
329+
ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource);
330330
PostExecutionControl postExecutionControl =
331331
PostExecutionControl.customResourceFinalizerRemoved(customResource);
332332

@@ -343,7 +343,7 @@ void skipEventProcessingIfFinalizerRemoveProcessed() {
343343
void newResourceAfterMissedDeleteEvent() {
344344
TestCustomResource customResource = testCustomResource();
345345
markForDeletion(customResource);
346-
ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource);
346+
ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource);
347347
PostExecutionControl postExecutionControl =
348348
PostExecutionControl.customResourceFinalizerRemoved(customResource);
349349
var newResource = testCustomResource();
@@ -379,7 +379,7 @@ void rateLimitsReconciliationSubmission() {
379379
@Test
380380
void schedulesRetryForMarReconciliationInterval() {
381381
TestCustomResource customResource = testCustomResource();
382-
ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource);
382+
ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource);
383383
PostExecutionControl postExecutionControl = PostExecutionControl.defaultDispatch();
384384

385385
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
@@ -401,7 +401,8 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
401401
eventSourceManagerMock,
402402
metricsMock));
403403
eventProcessorWithRetry.start();
404-
ExecutionScope executionScope = new ExecutionScope(null).setResource(testCustomResource());
404+
ExecutionScope executionScope =
405+
new ExecutionScope(null, null).setResource(testCustomResource());
405406
PostExecutionControl postExecutionControl =
406407
PostExecutionControl.exceptionDuringExecution(new RuntimeException());
407408
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,8 @@ public int getAttemptCount() {
404404
public boolean isLastAttempt() {
405405
return true;
406406
}
407-
})
407+
},
408+
null)
408409
.setResource(testCustomResource));
409410

410411
ArgumentCaptor<Context> contextArgumentCaptor = ArgumentCaptor.forClass(Context.class);
@@ -505,7 +506,8 @@ public int getAttemptCount() {
505506
public boolean isLastAttempt() {
506507
return true;
507508
}
508-
})
509+
},
510+
null)
509511
.setResource(testCustomResource));
510512

511513
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
@@ -528,7 +530,7 @@ void callErrorStatusHandlerEvenOnFirstError() {
528530

529531
var postExecControl =
530532
reconciliationDispatcher.handleExecution(
531-
new ExecutionScope(null).setResource(testCustomResource));
533+
new ExecutionScope(null, null).setResource(testCustomResource));
532534
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
533535
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
534536
assertThat(postExecControl.exceptionDuringExecution()).isTrue();
@@ -549,7 +551,7 @@ void errorHandlerCanInstructNoRetryWithUpdate() {
549551

550552
var postExecControl =
551553
reconciliationDispatcher.handleExecution(
552-
new ExecutionScope(null).setResource(testCustomResource));
554+
new ExecutionScope(null, null).setResource(testCustomResource));
553555

554556
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
555557
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
@@ -571,7 +573,7 @@ void errorHandlerCanInstructNoRetryNoUpdate() {
571573

572574
var postExecControl =
573575
reconciliationDispatcher.handleExecution(
574-
new ExecutionScope(null).setResource(testCustomResource));
576+
new ExecutionScope(null, null).setResource(testCustomResource));
575577

576578
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
577579
verify(customResourceFacade, times(0)).patchStatus(eq(testCustomResource), any());
@@ -588,7 +590,7 @@ void errorStatusHandlerCanPatchResource() {
588590
reconciler.errorHandler = () -> ErrorStatusUpdateControl.patchStatus(testCustomResource);
589591

590592
reconciliationDispatcher.handleExecution(
591-
new ExecutionScope(null).setResource(testCustomResource));
593+
new ExecutionScope(null, null).setResource(testCustomResource));
592594

593595
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
594596
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
@@ -611,7 +613,7 @@ void ifRetryLimitedToZeroMaxAttemptsErrorHandlerGetsCorrectLastAttempt() {
611613
reconciler.errorHandler = () -> ErrorStatusUpdateControl.noStatusUpdate();
612614

613615
reconciliationDispatcher.handleExecution(
614-
new ExecutionScope(null).setResource(testCustomResource));
616+
new ExecutionScope(null, null).setResource(testCustomResource));
615617

616618
verify(reconciler, times(1))
617619
.updateErrorStatus(
@@ -675,7 +677,7 @@ void reSchedulesFromErrorHandler() {
675677

676678
var res =
677679
reconciliationDispatcher.handleExecution(
678-
new ExecutionScope(null).setResource(testCustomResource));
680+
new ExecutionScope(null, null).setResource(testCustomResource));
679681

680682
assertThat(res.getReScheduleDelay()).contains(delay);
681683
assertThat(res.getRuntimeException()).isEmpty();
@@ -723,7 +725,7 @@ private void removeFinalizers(CustomResource customResource) {
723725
}
724726

725727
public <T extends HasMetadata> ExecutionScope<T> executionScopeWithCREvent(T resource) {
726-
return (ExecutionScope<T>) new ExecutionScope<>(null).setResource(resource);
728+
return (ExecutionScope<T>) new ExecutionScope<>(null, null).setResource(resource);
727729
}
728730

729731
private class TestReconciler

0 commit comments

Comments
 (0)