|
13 | 13 | import io.temporal.api.command.v1.*; |
14 | 14 | import io.temporal.api.common.v1.*; |
15 | 15 | import io.temporal.api.enums.v1.EventType; |
| 16 | +import io.temporal.api.failure.v1.CanceledFailureInfo; |
16 | 17 | import io.temporal.api.failure.v1.Failure; |
17 | 18 | import io.temporal.api.history.v1.*; |
18 | 19 | import io.temporal.api.protocol.v1.Message; |
|
29 | 30 | import io.temporal.worker.WorkflowImplementationOptions; |
30 | 31 | import io.temporal.workflow.ChildWorkflowCancellationType; |
31 | 32 | import io.temporal.workflow.Functions; |
| 33 | +import io.temporal.workflow.NexusOperationCancellationType; |
32 | 34 | import java.nio.charset.StandardCharsets; |
33 | 35 | import java.util.*; |
34 | 36 | import javax.annotation.Nonnull; |
@@ -985,32 +987,71 @@ public Functions.Proc startChildWorkflow( |
985 | 987 | } |
986 | 988 |
|
987 | 989 | public Functions.Proc startNexusOperation( |
988 | | - ScheduleNexusOperationCommandAttributes attributes, |
989 | | - @Nullable UserMetadata metadata, |
| 990 | + StartNexusOperationParameters parameters, |
990 | 991 | Functions.Proc2<Optional<String>, Failure> startedCallback, |
991 | 992 | Functions.Proc2<Optional<Payload>, Failure> completionCallback) { |
992 | 993 | checkEventLoopExecuting(); |
| 994 | + NexusOperationCancellationType cancellationType = parameters.getCancellationType(); |
993 | 995 | NexusOperationStateMachine operation = |
994 | 996 | NexusOperationStateMachine.newInstance( |
995 | | - attributes, |
996 | | - metadata, |
| 997 | + parameters.getAttributes().build(), |
| 998 | + parameters.getMetadata(), |
997 | 999 | startedCallback, |
998 | 1000 | completionCallback, |
999 | 1001 | commandSink, |
1000 | 1002 | stateMachineSink); |
1001 | 1003 | return () -> { |
| 1004 | + if (cancellationType == NexusOperationCancellationType.ABANDON) { |
| 1005 | + notifyNexusOperationCanceled(operation, startedCallback, completionCallback); |
| 1006 | + eventLoop(); |
| 1007 | + return; |
| 1008 | + } |
1002 | 1009 | if (operation.isCancellable()) { |
1003 | 1010 | operation.cancel(); |
| 1011 | + return; |
1004 | 1012 | } |
1005 | 1013 | if (!operation.isFinalState()) { |
1006 | 1014 | requestCancelNexusOperation( |
1007 | 1015 | RequestCancelNexusOperationCommandAttributes.newBuilder() |
1008 | 1016 | .setScheduledEventId(operation.getInitialCommandEventId()) |
1009 | | - .build()); |
| 1017 | + .build(), |
| 1018 | + (r, f) -> { |
| 1019 | + if (cancellationType == NexusOperationCancellationType.WAIT_REQUESTED) { |
| 1020 | + notifyNexusOperationCanceled(f, operation, startedCallback, completionCallback); |
| 1021 | + } |
| 1022 | + }); |
| 1023 | + if (cancellationType == NexusOperationCancellationType.TRY_CANCEL) { |
| 1024 | + notifyNexusOperationCanceled(operation, startedCallback, completionCallback); |
| 1025 | + eventLoop(); |
| 1026 | + } |
1010 | 1027 | } |
1011 | 1028 | }; |
1012 | 1029 | } |
1013 | 1030 |
|
| 1031 | + private void notifyNexusOperationCanceled( |
| 1032 | + NexusOperationStateMachine operation, |
| 1033 | + Functions.Proc2<Optional<String>, Failure> startedCallback, |
| 1034 | + Functions.Proc2<Optional<Payload>, Failure> completionCallback) { |
| 1035 | + Failure cause = |
| 1036 | + Failure.newBuilder() |
| 1037 | + .setMessage("operation canceled") |
| 1038 | + .setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance()) |
| 1039 | + .build(); |
| 1040 | + notifyNexusOperationCanceled(cause, operation, startedCallback, completionCallback); |
| 1041 | + } |
| 1042 | + |
| 1043 | + private void notifyNexusOperationCanceled( |
| 1044 | + Failure cause, |
| 1045 | + NexusOperationStateMachine operation, |
| 1046 | + Functions.Proc2<Optional<String>, Failure> startedCallback, |
| 1047 | + Functions.Proc2<Optional<Payload>, Failure> completionCallback) { |
| 1048 | + Failure failure = operation.createCancelNexusOperationFailure(cause); |
| 1049 | + if (!operation.isAsync()) { |
| 1050 | + startedCallback.apply(Optional.empty(), failure); |
| 1051 | + } |
| 1052 | + completionCallback.apply(Optional.empty(), failure); |
| 1053 | + } |
| 1054 | + |
1014 | 1055 | private void notifyChildCanceled( |
1015 | 1056 | Functions.Proc2<Optional<Payloads>, Exception> completionCallback) { |
1016 | 1057 | CanceledFailure failure = new CanceledFailure("Child canceled"); |
@@ -1044,10 +1085,15 @@ public void requestCancelExternalWorkflowExecution( |
1044 | 1085 |
|
1045 | 1086 | /** |
1046 | 1087 | * @param attributes attributes to use to cancel a nexus operation |
| 1088 | + * @param completionCallback one of NexusOperationCancelRequestCompleted or |
| 1089 | + * NexusOperationCancelRequestFailed events |
1047 | 1090 | */ |
1048 | | - public void requestCancelNexusOperation(RequestCancelNexusOperationCommandAttributes attributes) { |
| 1091 | + public void requestCancelNexusOperation( |
| 1092 | + RequestCancelNexusOperationCommandAttributes attributes, |
| 1093 | + Functions.Proc2<Void, Failure> completionCallback) { |
1049 | 1094 | checkEventLoopExecuting(); |
1050 | | - CancelNexusOperationStateMachine.newInstance(attributes, commandSink, stateMachineSink); |
| 1095 | + CancelNexusOperationStateMachine.newInstance( |
| 1096 | + attributes, completionCallback, commandSink, stateMachineSink); |
1051 | 1097 | } |
1052 | 1098 |
|
1053 | 1099 | public void upsertSearchAttributes(SearchAttributes attributes) { |
@@ -1540,6 +1586,12 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) { |
1540 | 1586 | case EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT: |
1541 | 1587 | return OptionalLong.of( |
1542 | 1588 | event.getNexusOperationTimedOutEventAttributes().getScheduledEventId()); |
| 1589 | + case EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED: |
| 1590 | + return OptionalLong.of( |
| 1591 | + event.getNexusOperationCancelRequestCompletedEventAttributes().getRequestedEventId()); |
| 1592 | + case EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED: |
| 1593 | + return OptionalLong.of( |
| 1594 | + event.getNexusOperationCancelRequestFailedEventAttributes().getRequestedEventId()); |
1543 | 1595 | case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: |
1544 | 1596 | case EVENT_TYPE_TIMER_STARTED: |
1545 | 1597 | case EVENT_TYPE_MARKER_RECORDED: |
|
0 commit comments