Skip to content

Commit 291cb26

Browse files
authored
[Nexus-Chasm] Migrate OperationInvocationTaskExecutor (#9680)
## What changed?

This PR migrates the Nexus operation invocation task handler from the HSM version to CHASM.

## Why?

Migrating from HSM to CHASM.

## How did you test it?

- [x] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [x] added new unit test(s)
- [ ] added new functional test(s)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Introduces a new CHASM-based Nexus `StartOperation` execution path with endpoint lookup, callback URL/token generation, and error classification; mistakes could cause failed invocations, incorrect retries/timeouts, or misrouted callbacks. Risk is mitigated somewhat by strict task validation and added unit coverage, but the change touches critical workflow/history integration and outbound request handling.
>
> **Overview**
> Migrates Nexus operation invocation execution to CHASM by implementing `OperationInvocationTaskHandler.Validate/Execute` end-to-end, including endpoint resolution (ID with name fallback), callback URL selection (system vs templated), callback token generation, timeout budgeting, outbound StartOperation calls (HTTP or internal history service), metrics/logging, and classification of results into operation state transitions.
>
> Adds supporting plumbing: `OperationStore.NexusOperationInvocationData` and workflow implementation that loads invocation input/headers from the scheduled history event, plus a new `MSPointer.LoadHistoryEvent`/`NodeBackend.LoadHistoryEvent` API. Configuration is extended to parse `CallbackURLTemplate` into a `*template.Template`, add `UseSystemCallbackURL`, and pass `NumHistoryShards` for internal routing; new helper utilities centralize callback building, error/failure conversion, and internal/HTTP start logic.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 4b13977. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>

<!-- /CURSOR_SUMMARY -->
1 parent 5c6bbd9 commit 291cb26

9 files changed

Lines changed: 1035 additions & 24 deletions

File tree

chasm/lib/nexusoperation/config.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package nexusoperation
22

33
import (
44
"strings"
5+
"text/template"
56
"time"
67

78
"go.temporal.io/server/common/backoff"
9+
"go.temporal.io/server/common/config"
810
"go.temporal.io/server/common/dynamicconfig"
911
"go.temporal.io/server/common/headers"
1012
"go.temporal.io/server/common/rpc/interceptor"
@@ -107,9 +109,19 @@ var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetti
107109
or a longer timeout than permitted will have their schedule-to-close timeout capped to this value. 0 implies no limit.`,
108110
)
109111

110-
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
112+
var CallbackURLTemplate = dynamicconfig.NewGlobalTypedSettingWithConverter(
111113
"nexusoperation.callback.endpoint.template",
112-
"unset",
114+
func(in any) (*template.Template, error) {
115+
s, err := dynamicconfig.ConvertStructure[string]("")(in)
116+
if err != nil {
117+
return nil, err
118+
}
119+
if s == "unset" {
120+
return nil, nil
121+
}
122+
return template.New("NexusCallbackURL").Parse(s)
123+
},
124+
nil,
113125
`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver
114126
asynchronous completion for external endpoint targets. The template can be used to interpolate the {{.NamespaceName}}
115127
and {{.NamespaceID}} parameters to construct a publicly accessible URL.
@@ -141,6 +153,13 @@ Adding high-cardinality tags (like unique operation names) can significantly inc
141153
query complexity. Consider the cardinality impact when enabling these tags.`,
142154
)
143155

156+
var UseSystemCallbackURL = dynamicconfig.NewGlobalBoolSetting(
157+
"nexusoperation.useSystemCallbackURL",
158+
true,
159+
`Controls how the executor generates callback URLs for worker targets in Nexus Operations.
160+
When true, uses the fixed system callback URL for all worker targets.`,
161+
)
162+
144163
var UseNewFailureWireFormat = dynamicconfig.NewNamespaceBoolSetting(
145164
"nexusoperation.useNewFailureWireFormat",
146165
true,
@@ -151,6 +170,7 @@ Added for safety. Defaults to true. Likely to be removed in future server versio
151170
type Config struct {
152171
ChasmEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
153172
ChasmNexusEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
173+
NumHistoryShards int32
154174
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
155175
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
156176
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -161,16 +181,18 @@ type Config struct {
161181
DisallowedOperationHeaders dynamicconfig.TypedPropertyFn[[]string]
162182
MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
163183
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
164-
CallbackURLTemplate dynamicconfig.StringPropertyFn
184+
CallbackURLTemplate dynamicconfig.TypedPropertyFn[*template.Template]
185+
UseSystemCallbackURL dynamicconfig.BoolPropertyFn
165186
UseNewFailureWireFormat dynamicconfig.BoolPropertyFnWithNamespaceFilter
166187
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
167188
RetryPolicy func() backoff.RetryPolicy
168189
}
169190

170-
func configProvider(dc *dynamicconfig.Collection) *Config {
191+
func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Config {
171192
return &Config{
172193
ChasmEnabled: dynamicconfig.EnableChasm.Get(dc),
173194
ChasmNexusEnabled: ChasmNexusEnabled.Get(dc),
195+
NumHistoryShards: cfg.NumHistoryShards,
174196
RequestTimeout: RequestTimeout.Get(dc),
175197
MinRequestTimeout: MinRequestTimeout.Get(dc),
176198
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
@@ -181,8 +203,9 @@ func configProvider(dc *dynamicconfig.Collection) *Config {
181203
DisallowedOperationHeaders: DisallowedOperationHeaders.Get(dc),
182204
MaxOperationScheduleToCloseTimeout: MaxOperationScheduleToCloseTimeout.Get(dc),
183205
PayloadSizeLimit: dynamicconfig.BlobSizeLimitError.Get(dc),
184-
UseNewFailureWireFormat: UseNewFailureWireFormat.Get(dc),
185206
CallbackURLTemplate: CallbackURLTemplate.Get(dc),
207+
UseSystemCallbackURL: UseSystemCallbackURL.Get(dc),
208+
UseNewFailureWireFormat: UseNewFailureWireFormat.Get(dc),
186209
RetryPolicy: func() backoff.RetryPolicy {
187210
return backoff.NewExponentialRetryPolicy(
188211
RetryPolicyInitialInterval.Get(dc)(),

chasm/lib/nexusoperation/operation.go

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package nexusoperation
22

33
import (
4+
"fmt"
5+
6+
"github.com/nexus-rpc/sdk-go/nexus"
47
commonpb "go.temporal.io/api/common/v1"
58
failurepb "go.temporal.io/api/failure/v1"
69
"go.temporal.io/api/serviceerror"
710
"go.temporal.io/server/chasm"
811
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
queueserrors "go.temporal.io/server/service/history/queues/errors"
913
"google.golang.org/protobuf/types/known/anypb"
1014
)
1115

@@ -18,6 +22,16 @@ var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancel
1822
// ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed.
1923
var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed")
2024

25+
// InvocationData contains data needed to invoke a Nexus operation.
26+
type InvocationData struct {
27+
// Input is the operation input payload.
28+
Input *commonpb.Payload
29+
// Header contains the Nexus headers for the operation.
30+
Header map[string]string
31+
// NexusLink is the link to the caller that scheduled this operation.
32+
NexusLink nexus.Link
33+
}
34+
2135
// OperationStore defines the interface that must be implemented by any parent component that wants to manage Nexus operations.
2236
// It's the responsibility of the parent component to apply the appropriate state transitions to the operation.
2337
type OperationStore interface {
@@ -26,6 +40,8 @@ type OperationStore interface {
2640
OnNexusOperationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
2741
OnNexusOperationTimedOut(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
2842
OnNexusOperationCompleted(ctx chasm.MutableContext, operation *Operation, result *commonpb.Payload, links []*commonpb.Link) error
43+
// NexusOperationInvocationData loads invocation data (Input, Header, NexusLink) from the scheduled history event.
44+
NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error)
2945
}
3046

3147
// Operation is a CHASM component that represents a Nexus operation.
@@ -98,7 +114,7 @@ func (o *Operation) Cancel(ctx chasm.MutableContext, parentData *anypb.Any) erro
98114
}
99115

100116
// OnStarted applies the started transition or delegates to the store if one is present.
101-
func (o *Operation) OnStarted(ctx chasm.MutableContext, _ *Operation, operationToken string, links []*commonpb.Link) error {
117+
func (o *Operation) OnStarted(ctx chasm.MutableContext, operationToken string, links []*commonpb.Link) error {
102118
store, ok := o.Store.TryGet(ctx)
103119
if ok {
104120
return store.OnNexusOperationStarted(ctx, o, operationToken, links)
@@ -109,7 +125,7 @@ func (o *Operation) OnStarted(ctx chasm.MutableContext, _ *Operation, operationT
109125
}
110126

111127
// OnCompleted applies the succeeded transition or delegates to the store if one is present.
112-
func (o *Operation) OnCompleted(ctx chasm.MutableContext, _ *Operation, result *commonpb.Payload, links []*commonpb.Link) error {
128+
func (o *Operation) OnCompleted(ctx chasm.MutableContext, result *commonpb.Payload, links []*commonpb.Link) error {
113129
store, ok := o.Store.TryGet(ctx)
114130
if ok {
115131
return store.OnNexusOperationCompleted(ctx, o, result, links)
@@ -118,7 +134,7 @@ func (o *Operation) OnCompleted(ctx chasm.MutableContext, _ *Operation, result *
118134
}
119135

120136
// OnFailed applies the failed transition or delegates to the store if one is present.
121-
func (o *Operation) OnFailed(ctx chasm.MutableContext, _ *Operation, cause *failurepb.Failure) error {
137+
func (o *Operation) OnFailed(ctx chasm.MutableContext, cause *failurepb.Failure) error {
122138
store, ok := o.Store.TryGet(ctx)
123139
if ok {
124140
return store.OnNexusOperationFailed(ctx, o, cause)
@@ -127,7 +143,7 @@ func (o *Operation) OnFailed(ctx chasm.MutableContext, _ *Operation, cause *fail
127143
}
128144

129145
// OnCancelled applies the canceled transition or delegates to the store if one is present.
130-
func (o *Operation) OnCancelled(ctx chasm.MutableContext, _ *Operation, cause *failurepb.Failure) error {
146+
func (o *Operation) OnCancelled(ctx chasm.MutableContext, cause *failurepb.Failure) error {
131147
store, ok := o.Store.TryGet(ctx)
132148
if ok {
133149
return store.OnNexusOperationCancelled(ctx, o, cause)
@@ -136,10 +152,84 @@ func (o *Operation) OnCancelled(ctx chasm.MutableContext, _ *Operation, cause *f
136152
}
137153

138154
// OnTimedOut applies the timed out transition or delegates to the store if one is present.
139-
func (o *Operation) OnTimedOut(ctx chasm.MutableContext, _ *Operation, cause *failurepb.Failure) error {
155+
func (o *Operation) OnTimedOut(ctx chasm.MutableContext, cause *failurepb.Failure) error {
140156
store, ok := o.Store.TryGet(ctx)
141157
if ok {
142158
return store.OnNexusOperationTimedOut(ctx, o, cause)
143159
}
144160
return TransitionTimedOut.Apply(o, ctx, EventTimedOut{})
145161
}
162+
163+
// loadStartArgs is a ReadComponent callback that loads the start arguments from the operation.
164+
func (o *Operation) loadStartArgs(
165+
ctx chasm.Context,
166+
_ chasm.NoValue,
167+
) (startArgs, error) {
168+
store, ok := o.Store.TryGet(ctx)
169+
if !ok {
170+
// TODO: For standalone operations, load invocation data from the operation state.
171+
return startArgs{}, serviceerror.NewInternal("no store available to load invocation data")
172+
}
173+
invocationData, err := store.NexusOperationInvocationData(ctx, o)
174+
if err != nil {
175+
return startArgs{}, err
176+
}
177+
178+
serializedRef, err := ctx.Ref(o)
179+
if err != nil {
180+
return startArgs{}, err
181+
}
182+
183+
return startArgs{
184+
endpointName: o.GetEndpoint(),
185+
endpointID: o.GetEndpointId(),
186+
service: o.GetService(),
187+
operation: o.GetOperation(),
188+
requestID: o.GetRequestId(),
189+
currentTime: ctx.Now(o),
190+
scheduledTime: o.GetScheduledTime().AsTime(),
191+
scheduleToCloseTimeout: o.GetScheduleToCloseTimeout().AsDuration(),
192+
scheduleToStartTimeout: o.GetScheduleToStartTimeout().AsDuration(),
193+
startToCloseTimeout: o.GetStartToCloseTimeout().AsDuration(),
194+
payload: invocationData.Input,
195+
header: invocationData.Header,
196+
nexusLink: invocationData.NexusLink,
197+
serializedRef: serializedRef,
198+
}, nil
199+
}
200+
201+
// saveResult is an UpdateComponent callback that saves the invocation outcome.
202+
func (o *Operation) saveResult(
203+
ctx chasm.MutableContext,
204+
input saveResultInput,
205+
) (chasm.NoValue, error) {
206+
switch r := input.result.(type) {
207+
case invocationResultOK:
208+
if r.response.Pending != nil {
209+
return nil, o.OnStarted(ctx, r.response.Pending.Token, r.links)
210+
}
211+
return nil, o.OnCompleted(ctx, r.response.Successful, r.links)
212+
case invocationResultFail:
213+
return nil, o.OnFailed(ctx, r.failure)
214+
case invocationResultCanceled:
215+
return nil, o.OnCancelled(ctx, r.failure)
216+
case invocationResultRetry:
217+
return nil, transitionAttemptFailed.Apply(o, ctx, EventAttemptFailed{
218+
Failure: r.failure,
219+
RetryPolicy: input.retryPolicy(),
220+
})
221+
case invocationResultTimeout:
222+
return nil, o.OnTimedOut(ctx, &failurepb.Failure{
223+
Message: "operation timed out",
224+
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
225+
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
226+
TimeoutType: r.timeoutType,
227+
},
228+
},
229+
})
230+
default:
231+
return nil, queueserrors.NewUnprocessableTaskError(
232+
fmt.Sprintf("unrecognized invocation result %T", input.result),
233+
)
234+
}
235+
}

0 commit comments

Comments
 (0)