Skip to content

Commit d17c2e9

Browse files
committed
[Nexus] Migrate cancellation executors to chasm
1 parent 4b13977 commit d17c2e9

4 files changed

Lines changed: 369 additions & 39 deletions

File tree

chasm/lib/nexusoperation/cancellation.go

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package nexusoperation
22

33
import (
4+
"errors"
5+
"time"
6+
7+
"github.com/nexus-rpc/sdk-go/nexus"
8+
commonpb "go.temporal.io/api/common/v1"
9+
failurepb "go.temporal.io/api/failure/v1"
10+
"go.temporal.io/api/serviceerror"
411
"go.temporal.io/server/chasm"
5-
"go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
13+
"go.temporal.io/server/common/backoff"
614
)
715

816
var _ chasm.Component = (*Cancellation)(nil)
@@ -14,6 +22,9 @@ type Cancellation struct {
1422

1523
// Persisted internal state
1624
*nexusoperationpb.CancellationState
25+
26+
// Operation is a pointer to the parent Operation component.
27+
Operation chasm.ParentPtr[*Operation]
1728
}
1829

1930
func newCancellation(state *nexusoperationpb.CancellationState) *Cancellation {
@@ -42,3 +53,95 @@ func (o *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus {
4253
func (o *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus) {
4354
o.Status = status
4455
}
56+
57+
// cancelArgs holds the arguments needed to cancel a Nexus operation.
58+
type cancelArgs struct {
59+
service string
60+
operation string
61+
token string
62+
requestID string
63+
endpointName string
64+
endpointID string
65+
currentTime time.Time
66+
scheduledTime time.Time
67+
scheduleToCloseTimeout time.Duration
68+
startToCloseTimeout time.Duration
69+
headers map[string]string
70+
payload *commonpb.Payload
71+
}
72+
73+
// loadCancelArgs is a ReadComponent callback that loads the cancel arguments from the cancellation
74+
// and its parent operation.
75+
func (c *Cancellation) loadCancelArgs(
76+
ctx chasm.Context,
77+
_ chasm.NoValue,
78+
) (cancelArgs, error) {
79+
op := c.Operation.Get(ctx)
80+
81+
store, ok := op.Store.TryGet(ctx)
82+
if !ok {
83+
// TODO: For standalone operations, load invocation data from the operation state.
84+
return cancelArgs{}, serviceerror.NewInternal("no store available to load invocation data")
85+
}
86+
invocationData, err := store.NexusOperationInvocationData(ctx, op)
87+
if err != nil {
88+
return cancelArgs{}, err
89+
}
90+
91+
return cancelArgs{
92+
service: op.GetService(),
93+
operation: op.GetOperation(),
94+
token: op.GetOperationToken(),
95+
requestID: op.GetRequestId(),
96+
endpointName: op.GetEndpoint(),
97+
endpointID: op.GetEndpointId(),
98+
currentTime: ctx.Now(c),
99+
scheduledTime: op.GetScheduledTime().AsTime(),
100+
scheduleToCloseTimeout: op.GetScheduleToCloseTimeout().AsDuration(),
101+
startToCloseTimeout: op.GetStartToCloseTimeout().AsDuration(),
102+
headers: invocationData.Header,
103+
payload: invocationData.Input,
104+
}, nil
105+
}
106+
107+
// saveCancellationResultInput is the input to the Cancellation.applyCancellationResult method.
108+
type saveCancellationResultInput struct {
109+
callErr error
110+
retryPolicy func() backoff.RetryPolicy
111+
}
112+
113+
// applyCancellationResult applies the outcome of a cancel operation call to the cancellation state machine.
114+
func (c *Cancellation) applyCancellationResult(
115+
ctx chasm.MutableContext,
116+
input saveCancellationResultInput,
117+
) (chasm.NoValue, error) {
118+
if input.callErr != nil {
119+
var handlerErr *nexus.HandlerError
120+
var opTimeoutBelowMinErr *operationTimeoutBelowMinError
121+
isRetryable := !errors.As(input.callErr, &opTimeoutBelowMinErr) &&
122+
(!errors.As(input.callErr, &handlerErr) || handlerErr.Retryable())
123+
124+
failure, err := callErrToFailure(input.callErr, isRetryable)
125+
if err != nil {
126+
failure = &failurepb.Failure{Message: input.callErr.Error()}
127+
}
128+
129+
if !isRetryable {
130+
// TODO: Emit EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED history event
131+
// when AddHistoryEvent is available on MutableContext.
132+
return nil, TransitionCancellationFailed.Apply(c, ctx, EventCancellationFailed{})
133+
}
134+
135+
return nil, transitionCancellationAttemptFailed.Apply(c, ctx, EventCancellationAttemptFailed{
136+
Failure: failure,
137+
RetryPolicy: input.retryPolicy(),
138+
})
139+
}
140+
141+
// Cancellation request transmitted successfully.
142+
// The operation is not yet canceled and may ignore our request, the outcome will be known via the
143+
// completion callback.
144+
// TODO: Emit EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED history event
145+
// when AddHistoryEvent is available on MutableContext.
146+
return nil, TransitionCancellationSucceeded.Apply(c, ctx, EventCancellationSucceeded{})
147+
}

0 commit comments

Comments
 (0)