Skip to content

Commit 83bfb53

Browse files
stephanosclaude
andcommitted
Nexus Standalone: Poll (#9780)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b47f5dc commit 83bfb53

12 files changed

Lines changed: 1325 additions & 127 deletions

File tree

chasm/lib/nexusoperation/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,21 @@ import (
1212
"go.temporal.io/server/common/rpc/interceptor"
1313
)
1414

15+
var LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting(
16+
"nexusoperation.longPollTimeout",
17+
20*time.Second,
18+
`Maximum timeout for nexus operation long-poll requests. Actual wait may be shorter to leave
19+
longPollBuffer before the caller deadline.`,
20+
)
21+
22+
var LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting(
23+
"nexusoperation.longPollBuffer",
24+
time.Second,
25+
`A buffer used to adjust the nexus operation long-poll timeouts.
26+
Specifically, nexus operation long-poll requests are timed out at a time which leaves at least the buffer's duration
27+
remaining before the caller's deadline, if permitted by the caller's deadline.`,
28+
)
29+
1530
var Enabled = dynamicconfig.NewNamespaceBoolSetting(
1631
"nexusoperation.enableStandalone",
1732
false,
@@ -185,6 +200,8 @@ type Config struct {
185200
ChasmEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
186201
ChasmNexusEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
187202
NumHistoryShards int32
203+
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
204+
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
188205
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
189206
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
190207
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -212,6 +229,8 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
212229
ChasmEnabled: dynamicconfig.EnableChasm.Get(dc),
213230
ChasmNexusEnabled: ChasmNexusEnabled.Get(dc),
214231
NumHistoryShards: cfg.NumHistoryShards,
232+
LongPollBuffer: LongPollBuffer.Get(dc),
233+
LongPollTimeout: LongPollTimeout.Get(dc),
215234
RequestTimeout: RequestTimeout.Get(dc),
216235
MinRequestTimeout: MinRequestTimeout.Get(dc),
217236
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),

chasm/lib/nexusoperation/frontend.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
110110
return nil, err
111111
}
112112

113-
if err := validateAndNormalizeDescribeRequest(req, h.config); err != nil {
113+
if err := validateAndNormalizeDescribeRequest(req, namespaceID.String(), h.config); err != nil {
114114
return nil, err
115115
}
116116

@@ -121,11 +121,29 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
121121
return resp.GetFrontendResponse(), err
122122
}
123123

124-
func (h *frontendHandler) PollNexusOperationExecution(_ context.Context, req *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) {
124+
// PollNexusOperationExecution long-polls for a Nexus operation to reach a specific stage.
125+
func (h *frontendHandler) PollNexusOperationExecution(
126+
ctx context.Context,
127+
req *workflowservice.PollNexusOperationExecutionRequest,
128+
) (*workflowservice.PollNexusOperationExecutionResponse, error) {
125129
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
126130
return nil, ErrStandaloneNexusOperationDisabled
127131
}
128-
return nil, serviceerror.NewUnimplemented("PollNexusOperationExecution not implemented")
132+
133+
if err := validateAndNormalizePollRequest(req, h.config); err != nil {
134+
return nil, err
135+
}
136+
137+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
resp, err := h.client.PollNexusOperation(ctx, &nexusoperationpb.PollNexusOperationRequest{
143+
NamespaceId: namespaceID.String(),
144+
FrontendRequest: req,
145+
})
146+
return resp.GetFrontendResponse(), err
129147
}
130148

131149
func (h *frontendHandler) ListNexusOperationExecutions(

chasm/lib/nexusoperation/fx.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
package nexusoperation
22

33
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
8+
"go.temporal.io/api/serviceerror"
9+
persistencespb "go.temporal.io/server/api/persistence/v1"
410
"go.temporal.io/server/chasm"
511
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
"go.temporal.io/server/common"
13+
"go.temporal.io/server/common/cluster"
14+
"go.temporal.io/server/common/collection"
15+
commonnexus "go.temporal.io/server/common/nexus"
16+
"go.temporal.io/server/common/nexus/nexusrpc"
617
"go.uber.org/fx"
718
)
819

920
var Module = fx.Module(
1021
"chasm.lib.nexusoperation",
1122
fx.Provide(configProvider),
23+
fx.Provide(ClientProviderFactory),
1224
fx.Provide(newHandler),
1325
fx.Provide(NewCancellationBackoffTaskHandler),
1426
fx.Provide(NewCancellationTaskHandler),
@@ -33,3 +45,80 @@ func register(
3345
) error {
3446
return registry.Register(library)
3547
}
48+
49+
type nexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
50+
51+
func defaultNexusTransportProvider() nexusTransportProvider {
52+
return func(string, string) http.RoundTripper {
53+
return http.DefaultTransport
54+
}
55+
}
56+
57+
type clientProviderCacheKey struct {
58+
namespaceID string
59+
endpointID string
60+
url string
61+
}
62+
63+
func ClientProviderFactory(
64+
clusterMetadata cluster.Metadata,
65+
rpcFactory common.RPCFactory,
66+
) (ClientProvider, error) {
67+
cl, err := rpcFactory.CreateLocalFrontendHTTPClient()
68+
if err != nil {
69+
return nil, fmt.Errorf("cannot create local frontend HTTP client: %w", err)
70+
}
71+
var clusterID string
72+
if clusterInfo, ok := clusterMetadata.GetAllClusterInfo()[clusterMetadata.GetCurrentClusterName()]; ok {
73+
clusterID = clusterInfo.ClusterID
74+
}
75+
76+
transportProvider := defaultNexusTransportProvider()
77+
clients := collection.NewFallibleOnceMap(func(key clientProviderCacheKey) (*http.Client, error) {
78+
return &http.Client{
79+
Transport: transportProvider(key.namespaceID, key.endpointID),
80+
}, nil
81+
})
82+
83+
return func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error) {
84+
var (
85+
url string
86+
httpClient *http.Client
87+
httpCaller func(*http.Request) (*http.Response, error)
88+
)
89+
90+
switch variant := entry.Endpoint.Spec.Target.Variant.(type) {
91+
case *persistencespb.NexusEndpointTarget_External_:
92+
url = variant.External.GetUrl()
93+
httpClient, err = clients.Get(clientProviderCacheKey{namespaceID: namespaceID, endpointID: entry.Id, url: url})
94+
if err != nil {
95+
return nil, err
96+
}
97+
httpCaller = func(r *http.Request) (*http.Response, error) {
98+
resp, callErr := httpClient.Do(r)
99+
commonnexus.SetFailureSourceOnContext(ctx, resp)
100+
return resp, callErr
101+
}
102+
case *persistencespb.NexusEndpointTarget_Worker_:
103+
url = cl.BaseURL() + "/" + commonnexus.RouteDispatchNexusTaskByEndpoint.Path(entry.Id)
104+
httpClient = &cl.Client
105+
httpCaller = func(r *http.Request) (*http.Response, error) {
106+
if clusterID != "" {
107+
r.Header.Set("Nexus-Callback-Source", clusterID)
108+
}
109+
resp, callErr := httpClient.Do(r)
110+
commonnexus.SetFailureSourceOnContext(ctx, resp)
111+
return resp, callErr
112+
}
113+
default:
114+
return nil, serviceerror.NewInternal("got unexpected endpoint target")
115+
}
116+
117+
return nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{
118+
BaseURL: url,
119+
Service: service,
120+
HTTPCaller: httpCaller,
121+
Serializer: commonnexus.PayloadSerializer,
122+
})
123+
}, nil
124+
}

chasm/lib/nexusoperation/handler.go

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package nexusoperation
22

33
import (
44
"context"
5+
"errors"
56

67
enumspb "go.temporal.io/api/enums/v1"
8+
"go.temporal.io/api/serviceerror"
79
"go.temporal.io/api/workflowservice/v1"
810
"go.temporal.io/server/chasm"
911
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
"go.temporal.io/server/common/contextutil"
1013
"go.temporal.io/server/common/log"
1114
)
1215

@@ -59,7 +62,14 @@ func (h *handler) StartNexusOperation(
5962
}, nil
6063
}
6164

62-
// TODO: Add long-poll support.
65+
// DescribeNexusOperation queries current operation state, optionally as a long-poll that waits
66+
// for any state change.
67+
//
68+
// When used to long-poll, it returns an empty non-error response on context
69+
// deadline expiry, to indicate that the state being waited for was not reached. Callers should
70+
// interpret this as an invitation to resubmit their long-poll request. This response is sent before
71+
// the caller's deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
72+
// does indeed receive the non-error response.
6373
func (h *handler) DescribeNexusOperation(
6474
ctx context.Context,
6575
req *nexusoperationpb.DescribeNexusOperationRequest,
@@ -72,7 +82,102 @@ func (h *handler) DescribeNexusOperation(
7282
RunID: req.GetFrontendRequest().GetRunId(),
7383
})
7484

75-
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req, nil)
85+
token := req.GetFrontendRequest().GetLongPollToken()
86+
if len(token) == 0 {
87+
// No long poll.
88+
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req)
89+
}
90+
91+
// Determine the long poll timeout and buffer.
92+
ns := req.GetFrontendRequest().GetNamespace()
93+
ctx, cancel := contextutil.WithDeadlineBuffer(
94+
ctx,
95+
h.config.LongPollTimeout(ns),
96+
h.config.LongPollBuffer(ns),
97+
)
98+
defer cancel()
99+
100+
// Poll for the operation state to change.
101+
response, _, err = chasm.PollComponent(ctx, ref, func(
102+
o *Operation,
103+
ctx chasm.Context,
104+
req *nexusoperationpb.DescribeNexusOperationRequest,
105+
) (*nexusoperationpb.DescribeNexusOperationResponse, bool, error) {
106+
changed, err := chasm.ExecutionStateChanged(o, ctx, token)
107+
if err != nil {
108+
if errors.Is(err, chasm.ErrMalformedComponentRef) {
109+
return nil, false, serviceerror.NewInvalidArgument("invalid long poll token")
110+
}
111+
if errors.Is(err, chasm.ErrInvalidComponentRef) {
112+
return nil, false, serviceerror.NewInvalidArgument("long poll token does not match execution")
113+
}
114+
return nil, false, err
115+
}
116+
if changed {
117+
response, err := o.buildDescribeResponse(ctx, req)
118+
return response, true, err
119+
}
120+
return nil, false, nil
121+
}, req)
122+
123+
if err != nil && ctx.Err() != nil {
124+
// Send empty non-error response on deadline expiry: caller should continue long-polling.
125+
return &nexusoperationpb.DescribeNexusOperationResponse{
126+
FrontendResponse: &workflowservice.DescribeNexusOperationExecutionResponse{},
127+
}, nil
128+
}
129+
return response, err
130+
}
131+
132+
// PollNexusOperation long-polls for a Nexus operation to reach a specific stage.
133+
//
134+
// It returns an empty non-error response on context deadline expiry, to indicate that the state
135+
// being waited for was not reached. Callers should interpret this as an invitation to resubmit
136+
// their long-poll request. This response is sent before the caller's
137+
// deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
138+
// does indeed receive the non-error response.
139+
func (h *handler) PollNexusOperation(
140+
ctx context.Context,
141+
req *nexusoperationpb.PollNexusOperationRequest,
142+
) (response *nexusoperationpb.PollNexusOperationResponse, err error) {
143+
defer log.CapturePanic(h.logger, &err)
144+
145+
ref := chasm.NewComponentRef[*Operation](chasm.ExecutionKey{
146+
NamespaceID: req.GetNamespaceId(),
147+
BusinessID: req.GetFrontendRequest().GetOperationId(),
148+
RunID: req.GetFrontendRequest().GetRunId(),
149+
})
150+
151+
// Determine the long poll timeout and buffer.
152+
ns := req.GetFrontendRequest().GetNamespace()
153+
ctx, cancel := contextutil.WithDeadlineBuffer(
154+
ctx,
155+
h.config.LongPollTimeout(ns),
156+
h.config.LongPollBuffer(ns),
157+
)
158+
defer cancel()
159+
160+
// Poll for the wait stage to be reached.
161+
waitStage := req.GetFrontendRequest().GetWaitStage()
162+
response, _, err = chasm.PollComponent(ctx, ref, func(
163+
o *Operation,
164+
ctx chasm.Context,
165+
req *nexusoperationpb.PollNexusOperationRequest,
166+
) (*nexusoperationpb.PollNexusOperationResponse, bool, error) {
167+
if o.isWaitStageReached(ctx, waitStage) {
168+
response := o.buildPollResponse(ctx)
169+
return response, true, nil
170+
}
171+
return nil, false, nil
172+
}, req)
173+
174+
if err != nil && ctx.Err() != nil {
175+
// Send an empty non-error response as an invitation to resubmit the long-poll.
176+
return &nexusoperationpb.PollNexusOperationResponse{
177+
FrontendResponse: &workflowservice.PollNexusOperationExecutionResponse{},
178+
}, nil
179+
}
180+
return response, err
76181
}
77182

78183
// RequestCancelNexusOperation requests cancellation of a standalone Nexus operation via CHASM.

chasm/lib/nexusoperation/library.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,26 @@ type Library struct {
4747
CancellationBackoffTaskHandler *CancellationBackoffTaskHandler
4848
}
4949

50-
func newLibrary(handler *handler) *Library {
51-
return &Library{handler: handler}
50+
func newLibrary(
51+
handler *handler,
52+
operationBackoffTaskHandler *OperationBackoffTaskHandler,
53+
operationInvocationTaskHandler *OperationInvocationTaskHandler,
54+
operationScheduleToCloseTimeoutTaskHandler *OperationScheduleToCloseTimeoutTaskHandler,
55+
operationScheduleToStartTimeoutTaskHandler *OperationScheduleToStartTimeoutTaskHandler,
56+
operationStartToCloseTimeoutTaskHandler *OperationStartToCloseTimeoutTaskHandler,
57+
cancellationTaskHandler *CancellationTaskHandler,
58+
cancellationBackoffTaskHandler *CancellationBackoffTaskHandler,
59+
) *Library {
60+
return &Library{
61+
handler: handler,
62+
OperationBackoffTaskHandler: operationBackoffTaskHandler,
63+
OperationInvocationTaskHandler: operationInvocationTaskHandler,
64+
OperationScheduleToCloseTimeoutTaskHandler: operationScheduleToCloseTimeoutTaskHandler,
65+
OperationScheduleToStartTimeoutTaskHandler: operationScheduleToStartTimeoutTaskHandler,
66+
OperationStartToCloseTimeoutTaskHandler: operationStartToCloseTimeoutTaskHandler,
67+
CancellationTaskHandler: cancellationTaskHandler,
68+
CancellationBackoffTaskHandler: cancellationBackoffTaskHandler,
69+
}
5270
}
5371

5472
func (l *Library) Tasks() []*chasm.RegistrableTask {

0 commit comments

Comments
 (0)