Skip to content

Commit 634fa80

Browse files
committed
Nexus chasm cleanup (#9843)
## What changed?
- [[Nexus-Chasm] Improve nexus operation dynamic config](59a7068)
- [[Nexus-Chasm] Improve operation state machine and task handlers](c6f1fa8)
- [[Nexus-Chasm] Refactor workflow registry and nexus workflow integration](45d0cfc)

## Why?
- Cleanup
- Improved test coverage
- Bug fixes
1 parent 63c4490 commit 634fa80

31 files changed

Lines changed: 1379 additions & 1269 deletions

chasm/lib/buf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ breaking:
1010
- chasm/lib/scheduler/proto/v1/message.proto
1111
# TODO seankane: remove after merging #8499
1212
- chasm/lib/callback/proto/v1/tasks.proto
13+
# TODO bergundy: remove after nexus-chasm-cleanup merges
14+
- chasm/lib/nexusoperation/proto/v1/operation.proto
15+
- chasm/lib/nexusoperation/proto/v1/tasks.proto
1316
lint:
1417
use:
1518
- DEFAULT

chasm/lib/nexusoperation/cancellation_statemachine.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ var TransitionCancellationScheduled = chasm.NewTransition(
1717
[]nexusoperationpb.CancellationStatus{nexusoperationpb.CANCELLATION_STATUS_UNSPECIFIED},
1818
nexusoperationpb.CANCELLATION_STATUS_SCHEDULED,
1919
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationScheduled) error {
20-
c.RequestedTime = timestamppb.New(ctx.Now(c))
21-
2220
ctx.AddTask(c, chasm.TaskAttributes{}, &nexusoperationpb.CancellationTask{
2321
Attempt: c.Attempt,
2422
})

chasm/lib/nexusoperation/cancellation_tasks.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"go.uber.org/fx"
1212
)
1313

14-
type CancellationTaskHandlerOptions struct {
14+
type cancellationTaskHandlerOptions struct {
1515
fx.In
1616

1717
Config *Config
@@ -20,23 +20,23 @@ type CancellationTaskHandlerOptions struct {
2020
Logger log.Logger
2121
}
2222

23-
type CancellationTaskHandler struct {
23+
type cancellationTaskHandler struct {
2424
chasm.SideEffectTaskHandlerBase[*nexusoperationpb.CancellationTask]
2525
config *Config
2626

2727
metricsHandler metrics.Handler
2828
logger log.Logger
2929
}
3030

31-
func NewCancellationTaskHandler(opts CancellationTaskHandlerOptions) *CancellationTaskHandler {
32-
return &CancellationTaskHandler{
31+
func newCancellationTaskHandler(opts cancellationTaskHandlerOptions) *cancellationTaskHandler {
32+
return &cancellationTaskHandler{
3333
config: opts.Config,
3434
metricsHandler: opts.MetricsHandler,
3535
logger: opts.Logger,
3636
}
3737
}
3838

39-
func (h *CancellationTaskHandler) Validate(
39+
func (h *cancellationTaskHandler) Validate(
4040
ctx chasm.Context,
4141
cancellation *Cancellation,
4242
attrs chasm.TaskAttributes,
@@ -45,7 +45,7 @@ func (h *CancellationTaskHandler) Validate(
4545
return false, serviceerror.NewUnimplemented("unimplemented")
4646
}
4747

48-
func (h *CancellationTaskHandler) Execute(
48+
func (h *cancellationTaskHandler) Execute(
4949
ctx context.Context,
5050
cancelRef chasm.ComponentRef,
5151
attrs chasm.TaskAttributes,
@@ -54,23 +54,23 @@ func (h *CancellationTaskHandler) Execute(
5454
return serviceerror.NewUnimplemented("unimplemented")
5555
}
5656

57-
type CancellationBackoffTaskHandler struct {
57+
type cancellationBackoffTaskHandler struct {
5858
chasm.PureTaskHandlerBase
5959
config *Config
6060

6161
metricsHandler metrics.Handler
6262
logger log.Logger
6363
}
6464

65-
func NewCancellationBackoffTaskHandler(opts CancellationTaskHandlerOptions) *CancellationBackoffTaskHandler {
66-
return &CancellationBackoffTaskHandler{
65+
func newCancellationBackoffTaskHandler(opts cancellationTaskHandlerOptions) *cancellationBackoffTaskHandler {
66+
return &cancellationBackoffTaskHandler{
6767
config: opts.Config,
6868
metricsHandler: opts.MetricsHandler,
6969
logger: opts.Logger,
7070
}
7171
}
7272

73-
func (h *CancellationBackoffTaskHandler) Validate(
73+
func (h *cancellationBackoffTaskHandler) Validate(
7474
ctx chasm.Context,
7575
cancellation *Cancellation,
7676
attrs chasm.TaskAttributes,
@@ -79,7 +79,7 @@ func (h *CancellationBackoffTaskHandler) Validate(
7979
return false, serviceerror.NewUnimplemented("unimplemented")
8080
}
8181

82-
func (h *CancellationBackoffTaskHandler) Execute(
82+
func (h *cancellationBackoffTaskHandler) Execute(
8383
ctx chasm.MutableContext,
8484
cancellation *Cancellation,
8585
attrs chasm.TaskAttributes,

chasm/lib/nexusoperation/config.go

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package nexusoperation
22

33
import (
4+
"fmt"
45
"strings"
56
"text/template"
67
"time"
@@ -40,8 +41,8 @@ than this value, a timeout error will be returned. Working in conjunction with M
4041
ensure that the server has enough time to complete a Nexus request.`,
4142
)
4243

43-
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
44-
"nexusoperation.limit.operation.concurrency",
44+
var MaxConcurrentOperationsPerWorkflow = dynamicconfig.NewNamespaceIntSetting(
45+
"nexusoperation.limit.operation.concurrencyPerWorkflow.max",
4546
2000,
4647
`Limits the maximum allowed concurrent Nexus Operations for a given workflow execution. Once the limit is reached,
4748
ScheduleNexusOperation commands will be rejected.`,
@@ -97,6 +98,8 @@ var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSettingWithConverte
9798
headers.CallerNameHeaderName,
9899
headers.CallerTypeHeaderName,
99100
headers.CallOriginHeaderName,
101+
headers.PrincipalTypeHeaderName,
102+
headers.PrincipalNameHeaderName,
100103
},
101104
`Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a
102105
"nexus_header" field that contains any of these disallowed keys will be rejected.`,
@@ -112,9 +115,9 @@ or a longer timeout than permitted will have their schedule-to-close timeout cap
112115
var CallbackURLTemplate = dynamicconfig.NewGlobalTypedSettingWithConverter(
113116
"nexusoperation.callback.endpoint.template",
114117
func(in any) (*template.Template, error) {
115-
s, err := dynamicconfig.ConvertStructure[string]("")(in)
116-
if err != nil {
117-
return nil, err
118+
s, ok := in.(string)
119+
if !ok {
120+
return nil, fmt.Errorf("invalid config type: %T for nexusoperation.callback.endpoint.template, expected string", in)
118121
}
119122
if s == "unset" {
120123
return nil, nil
@@ -128,16 +131,33 @@ and {{.NamespaceID}} parameters to construct a publicly accessible URL.
128131
Must be set to call external endpoints.`,
129132
)
130133

131-
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
132-
"nexusoperation.retryPolicy.initialInterval",
133-
time.Second,
134-
`The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
135-
)
134+
type RetryPolicyConfig struct {
135+
InitialInterval time.Duration
136+
MaxInterval time.Duration
137+
}
138+
139+
func (cfg RetryPolicyConfig) build() backoff.RetryPolicy {
140+
return backoff.NewExponentialRetryPolicy(cfg.InitialInterval).
141+
WithMaximumInterval(cfg.MaxInterval).
142+
WithExpirationInterval(backoff.NoInterval)
143+
}
136144

137-
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
138-
"nexusoperation.retryPolicy.maxInterval",
139-
time.Hour,
140-
`The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
145+
var defaultRetryPolicyConfig = RetryPolicyConfig{
146+
InitialInterval: time.Second,
147+
MaxInterval: time.Hour,
148+
}
149+
150+
var RetryPolicy = dynamicconfig.NewGlobalTypedSettingWithConverter(
151+
"nexusoperation.retryPolicy",
152+
func(in any) (backoff.RetryPolicy, error) {
153+
cfg, err := dynamicconfig.ConvertStructure(defaultRetryPolicyConfig)(in)
154+
if err != nil {
155+
return nil, err
156+
}
157+
return cfg.build(), nil
158+
},
159+
defaultRetryPolicyConfig.build(),
160+
`The retry policy for nexus StartOperation or CancelOperation requests for a given operation.`,
141161
)
142162

143163
var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting(
@@ -173,7 +193,7 @@ type Config struct {
173193
NumHistoryShards int32
174194
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
175195
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
176-
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
196+
MaxConcurrentOperationsPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
177197
MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
178198
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
179199
MaxOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -185,7 +205,7 @@ type Config struct {
185205
UseSystemCallbackURL dynamicconfig.BoolPropertyFn
186206
UseNewFailureWireFormat dynamicconfig.BoolPropertyFnWithNamespaceFilter
187207
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
188-
RetryPolicy func() backoff.RetryPolicy
208+
RetryPolicy dynamicconfig.TypedPropertyFn[backoff.RetryPolicy]
189209
}
190210

191211
func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Config {
@@ -195,7 +215,7 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
195215
NumHistoryShards: cfg.NumHistoryShards,
196216
RequestTimeout: RequestTimeout.Get(dc),
197217
MinRequestTimeout: MinRequestTimeout.Get(dc),
198-
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
218+
MaxConcurrentOperationsPerWorkflow: MaxConcurrentOperationsPerWorkflow.Get(dc),
199219
MaxServiceNameLength: MaxServiceNameLength.Get(dc),
200220
MaxOperationNameLength: MaxOperationNameLength.Get(dc),
201221
MaxOperationTokenLength: MaxOperationTokenLength.Get(dc),
@@ -206,14 +226,6 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
206226
CallbackURLTemplate: CallbackURLTemplate.Get(dc),
207227
UseSystemCallbackURL: UseSystemCallbackURL.Get(dc),
208228
UseNewFailureWireFormat: UseNewFailureWireFormat.Get(dc),
209-
RetryPolicy: func() backoff.RetryPolicy {
210-
return backoff.NewExponentialRetryPolicy(
211-
RetryPolicyInitialInterval.Get(dc)(),
212-
).WithMaximumInterval(
213-
RetryPolicyMaximumInterval.Get(dc)(),
214-
).WithExpirationInterval(
215-
backoff.NoInterval,
216-
)
217-
},
229+
RetryPolicy: RetryPolicy.Get(dc),
218230
}
219231
}

chasm/lib/nexusoperation/fx.go

Lines changed: 145 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,44 @@
11
package nexusoperation
22

33
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
8+
"go.temporal.io/api/serviceerror"
9+
persistencespb "go.temporal.io/server/api/persistence/v1"
410
"go.temporal.io/server/chasm"
11+
"go.temporal.io/server/common"
12+
"go.temporal.io/server/common/cluster"
13+
"go.temporal.io/server/common/collection"
14+
"go.temporal.io/server/common/dynamicconfig"
15+
"go.temporal.io/server/common/log"
16+
"go.temporal.io/server/common/metrics"
17+
commonnexus "go.temporal.io/server/common/nexus"
18+
"go.temporal.io/server/common/nexus/nexusrpc"
19+
"go.temporal.io/server/common/persistence"
20+
"go.temporal.io/server/common/resource"
21+
"go.temporal.io/server/common/rpc"
522
"go.uber.org/fx"
623
)
724

25+
const nexusCallbackSourceHeader = "Nexus-Callback-Source"
26+
827
var Module = fx.Module(
9-
"chasm.lib.nexusoperations",
28+
"chasm.lib.nexusoperation",
1029
fx.Provide(configProvider),
11-
fx.Provide(NewCancellationBackoffTaskHandler),
12-
fx.Provide(NewCancellationTaskHandler),
13-
fx.Provide(NewOperationBackoffTaskHandler),
14-
fx.Provide(NewOperationInvocationTaskHandler),
15-
fx.Provide(NewOperationScheduleToCloseTimeoutTaskHandler),
16-
fx.Provide(NewOperationScheduleToStartTimeoutTaskHandler),
17-
fx.Provide(NewOperationStartToCloseTimeoutTaskHandler),
30+
fx.Provide(commonnexus.NewCallbackTokenGenerator),
31+
fx.Provide(endpointRegistryProvider),
32+
fx.Invoke(endpointRegistryLifetimeHooks),
33+
fx.Provide(defaultNexusTransportProvider),
34+
fx.Provide(clientProviderFactory),
35+
fx.Provide(newCancellationBackoffTaskHandler),
36+
fx.Provide(newCancellationTaskHandler),
37+
fx.Provide(newOperationBackoffTaskHandler),
38+
fx.Provide(newOperationInvocationTaskHandler),
39+
fx.Provide(newOperationScheduleToCloseTimeoutTaskHandler),
40+
fx.Provide(newOperationScheduleToStartTimeoutTaskHandler),
41+
fx.Provide(newOperationStartToCloseTimeoutTaskHandler),
1842
fx.Provide(newLibrary),
1943
fx.Invoke(register),
2044
)
@@ -25,3 +49,116 @@ func register(
2549
) error {
2650
return registry.Register(library)
2751
}
52+
53+
func endpointRegistryProvider(
54+
matchingClient resource.MatchingClient,
55+
endpointManager persistence.NexusEndpointManager,
56+
dc *dynamicconfig.Collection,
57+
logger log.Logger,
58+
metricsHandler metrics.Handler,
59+
) commonnexus.EndpointRegistry {
60+
registryConfig := commonnexus.NewEndpointRegistryConfig(dc)
61+
return commonnexus.NewEndpointRegistry(
62+
registryConfig,
63+
matchingClient,
64+
endpointManager,
65+
logger,
66+
metricsHandler,
67+
)
68+
}
69+
70+
func endpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry) {
71+
lc.Append(fx.StartStopHook(registry.StartLifecycle, registry.StopLifecycle))
72+
}
73+
74+
// NexusTransportProvider allows customization of the HTTP transport used for Nexus requests.
// It receives a namespace ID and a service name and returns the
// http.RoundTripper to use for that pair.
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper

// defaultNexusTransportProvider returns a provider that ignores both of its
// arguments and always yields http.DefaultTransport.
func defaultNexusTransportProvider() NexusTransportProvider {
	return func(string, string) http.RoundTripper {
		return http.DefaultTransport
	}
}
82+
83+
// responseSizeLimiter wraps an http.RoundTripper to limit response body size.
84+
type responseSizeLimiter struct {
85+
rt http.RoundTripper
86+
}
87+
88+
func (r responseSizeLimiter) RoundTrip(request *http.Request) (*http.Response, error) {
89+
response, err := r.rt.RoundTrip(request)
90+
if err != nil {
91+
return nil, err
92+
}
93+
response.Body = http.MaxBytesReader(nil, response.Body, rpc.MaxNexusAPIRequestBodyBytes)
94+
return response, nil
95+
}
96+
97+
// clientProviderCacheKey is the lookup key for cached HTTP clients: one entry
// per namespace ID, endpoint ID and target URL combination.
//
// The field order is significant because the key is built with an unkeyed
// composite literal (namespaceID, endpointID, url).
type clientProviderCacheKey struct {
	namespaceID string
	endpointID  string
	url         string
}
101+
102+
func clientProviderFactory(
103+
httpTransportProvider NexusTransportProvider,
104+
clusterMetadata cluster.Metadata,
105+
rpcFactory common.RPCFactory,
106+
config *Config,
107+
) (ClientProvider, error) {
108+
cl, err := rpcFactory.CreateLocalFrontendHTTPClient()
109+
if err != nil {
110+
return nil, fmt.Errorf("cannot create local frontend HTTP client: %w", err)
111+
}
112+
var clusterID string
113+
114+
if clusterInfo, ok := clusterMetadata.GetAllClusterInfo()[clusterMetadata.GetCurrentClusterName()]; ok {
115+
clusterID = clusterInfo.ClusterID
116+
}
117+
m := collection.NewFallibleOnceMap(func(key clientProviderCacheKey) (*http.Client, error) {
118+
transport := httpTransportProvider(key.namespaceID, key.endpointID)
119+
return &http.Client{
120+
Transport: responseSizeLimiter{transport},
121+
}, nil
122+
})
123+
124+
return func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error) {
125+
var url string
126+
var httpClient *http.Client
127+
httpCaller := httpClient.Do
128+
switch variant := entry.Endpoint.Spec.Target.Variant.(type) {
129+
case *persistencespb.NexusEndpointTarget_External_:
130+
url = variant.External.GetUrl()
131+
var err error
132+
httpClient, err = m.Get(clientProviderCacheKey{namespaceID, entry.Id, url})
133+
if err != nil {
134+
return nil, err
135+
}
136+
if clusterID != "" {
137+
httpCaller = func(r *http.Request) (*http.Response, error) {
138+
resp, callErr := httpClient.Do(r)
139+
commonnexus.SetFailureSourceOnContext(ctx, resp)
140+
return resp, callErr
141+
}
142+
}
143+
case *persistencespb.NexusEndpointTarget_Worker_:
144+
url = cl.BaseURL() + "/" + commonnexus.RouteDispatchNexusTaskByEndpoint.Path(entry.Id)
145+
httpClient = &cl.Client
146+
if clusterID != "" {
147+
httpCaller = func(r *http.Request) (*http.Response, error) {
148+
r.Header.Set(nexusCallbackSourceHeader, clusterID)
149+
resp, callErr := httpClient.Do(r)
150+
commonnexus.SetFailureSourceOnContext(ctx, resp)
151+
return resp, callErr
152+
}
153+
}
154+
default:
155+
return nil, serviceerror.NewInternal("got unexpected endpoint target")
156+
}
157+
return nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{
158+
BaseURL: url,
159+
Service: service,
160+
HTTPCaller: httpCaller,
161+
Serializer: commonnexus.PayloadSerializer,
162+
})
163+
}, nil
164+
}

0 commit comments

Comments
 (0)