Skip to content

Commit aec1985

Browse files
committed
[Nexus] Migrate cancellation executors to chasm
1 parent 312e79a commit aec1985

4 files changed

Lines changed: 363 additions & 39 deletions

File tree

chasm/lib/nexusoperation/cancellation.go

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
package nexusoperation
22

33
import (
4+
"errors"
5+
"time"
6+
7+
"github.com/nexus-rpc/sdk-go/nexus"
8+
commonpb "go.temporal.io/api/common/v1"
9+
failurepb "go.temporal.io/api/failure/v1"
410
"go.temporal.io/server/chasm"
5-
"go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
11+
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
"go.temporal.io/server/common/backoff"
613
)
714

815
var _ chasm.Component = (*Cancellation)(nil)
@@ -14,6 +21,9 @@ type Cancellation struct {
1421

1522
// Persisted internal state
1623
*nexusoperationpb.CancellationState
24+
25+
// Operation is a pointer to the parent Operation component.
26+
Operation chasm.ParentPtr[*Operation]
1727
}
1828

1929
func newCancellation(state *nexusoperationpb.CancellationState) *Cancellation {
@@ -42,3 +52,90 @@ func (o *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus {
4252
func (o *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus) {
4353
o.Status = status
4454
}
55+
56+
// cancelArgs holds the arguments needed to cancel a Nexus operation.
57+
type cancelArgs struct {
58+
service string
59+
operation string
60+
token string
61+
requestID string
62+
endpointName string
63+
endpointID string
64+
currentTime time.Time
65+
scheduledTime time.Time
66+
scheduleToCloseTimeout time.Duration
67+
startToCloseTimeout time.Duration
68+
headers map[string]string
69+
payload *commonpb.Payload
70+
}
71+
72+
// loadCancelArgs is a ReadComponent callback that loads the cancel arguments from the cancellation
73+
// and its parent operation.
74+
func (c *Cancellation) loadCancelArgs(
75+
ctx chasm.Context,
76+
_ chasm.NoValue,
77+
) (cancelArgs, error) {
78+
op := c.Operation.Get(ctx)
79+
80+
invocationData, err := op.GetInvocationData(ctx)
81+
if err != nil {
82+
return cancelArgs{}, err
83+
}
84+
85+
return cancelArgs{
86+
service: op.GetService(),
87+
operation: op.GetOperation(),
88+
token: op.GetOperationToken(),
89+
requestID: op.GetRequestId(),
90+
endpointName: op.GetEndpoint(),
91+
endpointID: op.GetEndpointId(),
92+
currentTime: ctx.Now(c),
93+
scheduledTime: op.GetScheduledTime().AsTime(),
94+
scheduleToCloseTimeout: op.GetScheduleToCloseTimeout().AsDuration(),
95+
startToCloseTimeout: op.GetStartToCloseTimeout().AsDuration(),
96+
headers: invocationData.Header,
97+
payload: invocationData.Input,
98+
}, nil
99+
}
100+
101+
// saveCancellationResultInput is the input to the Cancellation.applyCancellationResult method.
102+
type saveCancellationResultInput struct {
103+
callErr error
104+
retryPolicy func() backoff.RetryPolicy
105+
}
106+
107+
// applyCancellationResult applies the outcome of a cancel operation call to the cancellation state machine.
108+
func (c *Cancellation) applyCancellationResult(
109+
ctx chasm.MutableContext,
110+
input saveCancellationResultInput,
111+
) (chasm.NoValue, error) {
112+
if input.callErr != nil {
113+
var handlerErr *nexus.HandlerError
114+
var opTimeoutBelowMinErr *operationTimeoutBelowMinError
115+
isRetryable := !errors.As(input.callErr, &opTimeoutBelowMinErr) &&
116+
(!errors.As(input.callErr, &handlerErr) || handlerErr.Retryable())
117+
118+
failure, err := callErrToFailure(input.callErr, isRetryable)
119+
if err != nil {
120+
failure = &failurepb.Failure{Message: input.callErr.Error()}
121+
}
122+
123+
if !isRetryable {
124+
// TODO: Emit EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED history event
125+
// when AddHistoryEvent is available on MutableContext.
126+
return nil, TransitionCancellationFailed.Apply(c, ctx, EventCancellationFailed{})
127+
}
128+
129+
return nil, transitionCancellationAttemptFailed.Apply(c, ctx, EventCancellationAttemptFailed{
130+
Failure: failure,
131+
RetryPolicy: input.retryPolicy(),
132+
})
133+
}
134+
135+
// Cancellation request transmitted successfully.
136+
// The operation is not yet canceled and may ignore our request, the outcome will be known via the
137+
// completion callback.
138+
// TODO: Emit EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED history event
139+
// when AddHistoryEvent is available on MutableContext.
140+
return nil, TransitionCancellationSucceeded.Apply(c, ctx, EventCancellationSucceeded{})
141+
}

0 commit comments

Comments
 (0)