|
2 | 2 | // SPDX-License-Identifier: Apache-2.0 |
3 | 3 | package software.amazon.lambda.durable.operation; |
4 | 4 |
|
| 5 | +import static software.amazon.lambda.durable.execution.ExecutionManager.isTerminalStatus; |
| 6 | + |
5 | 7 | import java.nio.charset.StandardCharsets; |
6 | 8 | import java.util.concurrent.CompletableFuture; |
7 | 9 | import java.util.concurrent.ExecutorService; |
@@ -203,37 +205,48 @@ public T get() { |
203 | 205 | } |
204 | 206 |
|
205 | 207 | // throw a general failed exception if a user exception is not reconstructed |
206 | | - switch (subType) { |
207 | | - case WAIT_FOR_CALLBACK: { |
208 | | - var childrenOps = getChildOperations(op.id()); |
209 | | - var callbackOp = childrenOps.stream() |
210 | | - .filter(o -> o.type() == OperationType.CALLBACK) |
211 | | - .findFirst() |
212 | | - .orElse(null); |
213 | | - var stepOp = childrenOps.stream() |
214 | | - .filter(o -> o.type() == OperationType.STEP) |
215 | | - .findFirst() |
216 | | - .orElse(null); |
217 | | - if (callbackOp != null) { |
218 | | - switch (callbackOp.status()) { |
219 | | - case FAILED -> throw new CallbackFailedException(callbackOp); |
220 | | - case TIMED_OUT -> throw new CallbackTimeoutException(callbackOp); |
221 | | - default -> throw new ChildContextFailedException(op); |
222 | | - } |
223 | | - } |
224 | | - if (stepOp != null) { |
225 | | - var stepError = stepOp.stepDetails().error(); |
226 | | - if (StepInterruptedException.isStepInterruptedException(stepError)) { |
227 | | - throw new CallbackSubmitterException(callbackOp, new StepInterruptedException(stepOp)); |
228 | | - } else { |
229 | | - throw new CallbackSubmitterException(callbackOp, new StepFailedException(stepOp)); |
230 | | - } |
231 | | - } |
| 208 | + return switch (subType) { |
| 209 | + case WAIT_FOR_CALLBACK -> handleWaitForCallbackFailure(op); |
| 210 | + // todo: handle MAP/PARALLEL |
| 211 | + case MAP -> throw new ChildContextFailedException(op); |
| 212 | + case PARALLEL -> throw new ChildContextFailedException(op); |
| 213 | + case RUN_IN_CHILD_CONTEXT -> throw new ChildContextFailedException(op); |
| 214 | + }; |
| 215 | + } |
| 216 | + } |
| 217 | + |
| 218 | + private T handleWaitForCallbackFailure(Operation op) { |
| 219 | + var childrenOps = getChildOperations(op.id()); |
| 220 | + var callbackOp = childrenOps.stream() |
| 221 | + .filter(o -> o.type() == OperationType.CALLBACK) |
| 222 | + .findFirst() |
| 223 | + .orElse(null); |
| 224 | + var submitterOp = childrenOps.stream() |
| 225 | + .filter(o -> o.type() == OperationType.STEP) |
| 226 | + .findFirst() |
| 227 | + .orElse(null); |
| 228 | + if (callbackOp != null) { |
| 229 | + // if callback failed |
| 230 | + if (isTerminalStatus(callbackOp.status())) { |
| 231 | + switch (callbackOp.status()) { |
| 232 | + case FAILED -> throw new CallbackFailedException(callbackOp); |
| 233 | + case TIMED_OUT -> throw new CallbackTimeoutException(callbackOp); |
| 234 | + } |
| 235 | + } |
| 236 | + |
| 237 | + // if submitter failed |
| 238 | + if (submitterOp != null |
| 239 | + && isTerminalStatus(submitterOp.status()) |
| 240 | + && submitterOp.status() != OperationStatus.SUCCEEDED) { |
| 241 | + var stepError = submitterOp.stepDetails().error(); |
| 242 | + if (StepInterruptedException.isStepInterruptedException(stepError)) { |
| 243 | + throw new CallbackSubmitterException(callbackOp, new StepInterruptedException(submitterOp)); |
| 244 | + } else { |
| 245 | + throw new CallbackSubmitterException(callbackOp, new StepFailedException(submitterOp)); |
232 | 246 | } |
233 | | - // todo: add cases for MAP/PARALLEL |
234 | | - default: |
235 | | - throw new ChildContextFailedException(op); |
236 | 247 | } |
237 | 248 | } |
| 249 | + |
| 250 | + throw new IllegalStateException("Unknown waitForCallback status"); |
238 | 251 | } |
239 | 252 | } |
0 commit comments