|
20 | 20 | import software.amazon.awssdk.services.lambda.model.OperationStatus; |
21 | 21 | import software.amazon.awssdk.services.lambda.model.OperationUpdate; |
22 | 22 | import software.amazon.lambda.durable.DurableConfig; |
23 | | -import software.amazon.lambda.durable.exception.IllegalDurableOperationException; |
24 | 23 | import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; |
25 | 24 | import software.amazon.lambda.durable.model.DurableExecutionInput; |
26 | 25 | import software.amazon.lambda.durable.operation.BaseDurableOperation; |
@@ -231,14 +230,16 @@ public void deregisterActiveThread(String threadId) { |
231 | 230 | } |
232 | 231 |
|
233 | 232 | private void preSuspendCheck() { |
234 | | - operationStorage.values().stream() |
235 | | - .filter(op -> !isTerminalStatus(op.status())) |
236 | | - .findFirst() |
237 | | - .ifPresentOrElse(op -> logger.debug("Found waiting operations"), () -> { |
238 | | - logger.warn("Invalid suspension. No operation is in progress"); |
239 | | - terminateExecution(new IllegalDurableOperationException( |
240 | | - "Cannot suspend execution without an operation in progress")); |
241 | | - }); |
| 233 | + if (operationStorage.values().stream().anyMatch(o -> switch (o.type()) { |
| 234 | + case STEP -> o.status() == OperationStatus.PENDING; |
| 235 | + case WAIT, CALLBACK -> o.status() == OperationStatus.STARTED; |
| 236 | + case CHAINED_INVOKE -> o.status() == OperationStatus.PENDING || o.status() == OperationStatus.STARTED; |
| 237 | + default -> false; |
| 238 | + })) { |
| 239 | + logger.debug("Found pending operations. Good to suspend now."); |
| 240 | + } else { |
| 241 | + logger.warn("Invalid suspension. No operation is in progress"); |
| 242 | + } |
242 | 243 | } |
243 | 244 |
|
244 | 245 | // ===== Checkpointing ===== |
|
0 commit comments