Skip to content

Commit ab6ee8c

Browse files
stephanosclaude
andcommitted
Nexus Standalone: Poll (#9780)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 673f17e commit ab6ee8c

9 files changed

Lines changed: 1209 additions & 123 deletions

File tree

chasm/lib/nexusoperation/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@ import (
1313
"go.temporal.io/server/common/rpc/interceptor"
1414
)
1515

16+
var LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting(
17+
"nexusoperation.longPollTimeout",
18+
20*time.Second,
19+
`Maximum timeout for nexus operation long-poll requests. Actual wait may be shorter to leave
20+
longPollBuffer before the caller deadline.`,
21+
)
22+
23+
var LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting(
24+
"nexusoperation.longPollBuffer",
25+
time.Second,
26+
`A buffer used to adjust the nexus operation long-poll timeouts.
27+
Specifically, nexus operation long-poll requests are timed out at a time which leaves at least the buffer's duration
28+
remaining before the caller's deadline, if permitted by the caller's deadline.`,
29+
)
30+
1631
var Enabled = dynamicconfig.NewNamespaceBoolSetting(
1732
"nexusoperation.enableStandalone",
1833
false,
@@ -205,6 +220,8 @@ type Config struct {
205220
EnableChasm dynamicconfig.BoolPropertyFnWithNamespaceFilter
206221
EnableChasmNexus dynamicconfig.BoolPropertyFnWithNamespaceFilter
207222
NumHistoryShards int32
223+
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
224+
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
208225
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
209226
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
210227
MaxConcurrentOperationsPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -232,6 +249,8 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
232249
EnableChasm: dynamicconfig.EnableChasm.Get(dc),
233250
EnableChasmNexus: EnableChasmNexus.Get(dc),
234251
NumHistoryShards: cfg.NumHistoryShards,
252+
LongPollBuffer: LongPollBuffer.Get(dc),
253+
LongPollTimeout: LongPollTimeout.Get(dc),
235254
RequestTimeout: RequestTimeout.Get(dc),
236255
MinRequestTimeout: MinRequestTimeout.Get(dc),
237256
MaxConcurrentOperationsPerWorkflow: MaxConcurrentOperationsPerWorkflow.Get(dc),

chasm/lib/nexusoperation/frontend.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
110110
return nil, err
111111
}
112112

113-
if err := validateAndNormalizeDescribeRequest(req, h.config); err != nil {
113+
if err := validateAndNormalizeDescribeRequest(req, namespaceID.String(), h.config); err != nil {
114114
return nil, err
115115
}
116116

@@ -121,11 +121,29 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
121121
return resp.GetFrontendResponse(), err
122122
}
123123

124-
func (h *frontendHandler) PollNexusOperationExecution(_ context.Context, req *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) {
124+
// PollNexusOperationExecution long-polls for a Nexus operation to reach a specific stage.
125+
func (h *frontendHandler) PollNexusOperationExecution(
126+
ctx context.Context,
127+
req *workflowservice.PollNexusOperationExecutionRequest,
128+
) (*workflowservice.PollNexusOperationExecutionResponse, error) {
125129
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
126130
return nil, ErrStandaloneNexusOperationDisabled
127131
}
128-
return nil, serviceerror.NewUnimplemented("PollNexusOperationExecution not implemented")
132+
133+
if err := validateAndNormalizePollRequest(req, h.config); err != nil {
134+
return nil, err
135+
}
136+
137+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
resp, err := h.client.PollNexusOperation(ctx, &nexusoperationpb.PollNexusOperationRequest{
143+
NamespaceId: namespaceID.String(),
144+
FrontendRequest: req,
145+
})
146+
return resp.GetFrontendResponse(), err
129147
}
130148

131149
func (h *frontendHandler) ListNexusOperationExecutions(

chasm/lib/nexusoperation/handler.go

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package nexusoperation
22

33
import (
44
"context"
5+
"errors"
56

67
enumspb "go.temporal.io/api/enums/v1"
8+
"go.temporal.io/api/serviceerror"
79
"go.temporal.io/api/workflowservice/v1"
810
"go.temporal.io/server/chasm"
911
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
"go.temporal.io/server/common/contextutil"
1013
"go.temporal.io/server/common/log"
1114
)
1215

@@ -59,7 +62,14 @@ func (h *handler) StartNexusOperation(
5962
}, nil
6063
}
6164

62-
// TODO: Add long-poll support.
65+
// DescribeNexusOperation queries current operation state, optionally as a long-poll that waits
66+
// for any state change.
67+
//
68+
// When used to long-poll, it returns an empty non-error response on context
69+
// deadline expiry, to indicate that the state being waited for was not reached. Callers should
70+
// interpret this as an invitation to resubmit their long-poll request. This response is sent before
71+
// the caller's deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
72+
// does indeed receive the non-error response.
6373
func (h *handler) DescribeNexusOperation(
6474
ctx context.Context,
6575
req *nexusoperationpb.DescribeNexusOperationRequest,
@@ -72,7 +82,102 @@ func (h *handler) DescribeNexusOperation(
7282
RunID: req.GetFrontendRequest().GetRunId(),
7383
})
7484

75-
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req, nil)
85+
token := req.GetFrontendRequest().GetLongPollToken()
86+
if len(token) == 0 {
87+
// No long poll.
88+
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req)
89+
}
90+
91+
// Determine the long poll timeout and buffer.
92+
ns := req.GetFrontendRequest().GetNamespace()
93+
ctx, cancel := contextutil.WithDeadlineBuffer(
94+
ctx,
95+
h.config.LongPollTimeout(ns),
96+
h.config.LongPollBuffer(ns),
97+
)
98+
defer cancel()
99+
100+
// Poll for the operation state to change.
101+
response, _, err = chasm.PollComponent(ctx, ref, func(
102+
o *Operation,
103+
ctx chasm.Context,
104+
req *nexusoperationpb.DescribeNexusOperationRequest,
105+
) (*nexusoperationpb.DescribeNexusOperationResponse, bool, error) {
106+
changed, err := chasm.ExecutionStateChanged(o, ctx, token)
107+
if err != nil {
108+
if errors.Is(err, chasm.ErrMalformedComponentRef) {
109+
return nil, false, serviceerror.NewInvalidArgument("invalid long poll token")
110+
}
111+
if errors.Is(err, chasm.ErrInvalidComponentRef) {
112+
return nil, false, serviceerror.NewInvalidArgument("long poll token does not match execution")
113+
}
114+
return nil, false, err
115+
}
116+
if changed {
117+
response, err := o.buildDescribeResponse(ctx, req)
118+
return response, true, err
119+
}
120+
return nil, false, nil
121+
}, req)
122+
123+
if err != nil && ctx.Err() != nil {
124+
// Send empty non-error response on deadline expiry: caller should continue long-polling.
125+
return &nexusoperationpb.DescribeNexusOperationResponse{
126+
FrontendResponse: &workflowservice.DescribeNexusOperationExecutionResponse{},
127+
}, nil
128+
}
129+
return response, err
130+
}
131+
132+
// PollNexusOperation long-polls for a Nexus operation to reach a specific stage.
133+
//
134+
// It returns an empty non-error response on context deadline expiry, to indicate that the state
135+
// being waited for was not reached. Callers should interpret this as an invitation to resubmit
136+
// their long-poll request. This response is sent before the caller's
137+
// deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
138+
// does indeed receive the non-error response.
139+
func (h *handler) PollNexusOperation(
140+
ctx context.Context,
141+
req *nexusoperationpb.PollNexusOperationRequest,
142+
) (response *nexusoperationpb.PollNexusOperationResponse, err error) {
143+
defer log.CapturePanic(h.logger, &err)
144+
145+
ref := chasm.NewComponentRef[*Operation](chasm.ExecutionKey{
146+
NamespaceID: req.GetNamespaceId(),
147+
BusinessID: req.GetFrontendRequest().GetOperationId(),
148+
RunID: req.GetFrontendRequest().GetRunId(),
149+
})
150+
151+
// Determine the long poll timeout and buffer.
152+
ns := req.GetFrontendRequest().GetNamespace()
153+
ctx, cancel := contextutil.WithDeadlineBuffer(
154+
ctx,
155+
h.config.LongPollTimeout(ns),
156+
h.config.LongPollBuffer(ns),
157+
)
158+
defer cancel()
159+
160+
// Poll for the wait stage to be reached.
161+
waitStage := req.GetFrontendRequest().GetWaitStage()
162+
response, _, err = chasm.PollComponent(ctx, ref, func(
163+
o *Operation,
164+
ctx chasm.Context,
165+
req *nexusoperationpb.PollNexusOperationRequest,
166+
) (*nexusoperationpb.PollNexusOperationResponse, bool, error) {
167+
if o.isWaitStageReached(ctx, waitStage) {
168+
response := o.buildPollResponse(ctx)
169+
return response, true, nil
170+
}
171+
return nil, false, nil
172+
}, req)
173+
174+
if err != nil && ctx.Err() != nil {
175+
// Send an empty non-error response as an invitation to resubmit the long-poll.
176+
return &nexusoperationpb.PollNexusOperationResponse{
177+
FrontendResponse: &workflowservice.PollNexusOperationExecutionResponse{},
178+
}, nil
179+
}
180+
return response, err
76181
}
77182

78183
// RequestCancelNexusOperation requests cancellation of a standalone Nexus operation via CHASM.

chasm/lib/nexusoperation/operation.go

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -320,32 +320,87 @@ func (o *Operation) buildDescribeResponse(
320320
ctx chasm.Context,
321321
req *nexusoperationpb.DescribeNexusOperationRequest,
322322
) (*nexusoperationpb.DescribeNexusOperationResponse, error) {
323+
token, err := ctx.Ref(o)
324+
if err != nil {
325+
return nil, err
326+
}
327+
323328
resp := &workflowservice.DescribeNexusOperationExecutionResponse{
324-
RunId: ctx.ExecutionKey().RunID,
325-
Info: o.buildExecutionInfo(ctx),
329+
RunId: ctx.ExecutionKey().RunID,
330+
Info: o.buildExecutionInfo(ctx),
331+
LongPollToken: token,
326332
}
327333
if req.GetFrontendRequest().GetIncludeInput() {
328334
resp.Input = o.RequestData.Get(ctx).GetInput()
329335
}
330-
if req.GetFrontendRequest().GetIncludeOutcome() && o.LifecycleState(ctx).IsClosed() {
331-
outcome := o.Outcome.Get(ctx)
332-
if successful := outcome.GetSuccessful(); successful != nil {
336+
if req.GetFrontendRequest().GetIncludeOutcome() && o.isClosed() {
337+
if successful, failure := o.describeOutcome(ctx); successful != nil {
333338
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Result{
334-
Result: successful.GetResult(),
339+
Result: successful,
335340
}
336-
} else if failure := outcome.GetFailed().GetFailure(); failure != nil {
341+
} else if failure != nil {
337342
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Failure{
338343
Failure: failure,
339344
}
340-
} else if o.LastAttemptFailure != nil {
341-
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Failure{
342-
Failure: o.LastAttemptFailure,
343-
}
344345
}
345346
}
346347
return &nexusoperationpb.DescribeNexusOperationResponse{FrontendResponse: resp}, nil
347348
}
348349

350+
func (o *Operation) buildPollResponse(
351+
ctx chasm.Context,
352+
) *nexusoperationpb.PollNexusOperationResponse {
353+
resp := &workflowservice.PollNexusOperationExecutionResponse{
354+
RunId: ctx.ExecutionKey().RunID,
355+
OperationToken: o.OperationToken,
356+
}
357+
358+
if o.isClosed() {
359+
resp.WaitStage = enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED
360+
if successful, failure := o.describeOutcome(ctx); successful != nil {
361+
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Result{
362+
Result: successful,
363+
}
364+
} else if failure != nil {
365+
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Failure{
366+
Failure: failure,
367+
}
368+
}
369+
} else {
370+
resp.WaitStage = enumspb.NEXUS_OPERATION_WAIT_STAGE_STARTED
371+
}
372+
373+
return &nexusoperationpb.PollNexusOperationResponse{
374+
FrontendResponse: resp,
375+
}
376+
}
377+
378+
func (o *Operation) describeOutcome(ctx chasm.Context) (*commonpb.Payload, *failurepb.Failure) {
379+
outcome := o.Outcome.Get(ctx)
380+
if successful := outcome.GetSuccessful(); successful != nil {
381+
return successful.GetResult(), nil
382+
}
383+
if failure := outcome.GetFailed().GetFailure(); failure != nil {
384+
return nil, failure
385+
}
386+
return nil, o.LastAttemptFailure
387+
}
388+
389+
func (o *Operation) isWaitStageReached(_ chasm.Context, waitStage enumspb.NexusOperationWaitStage) bool {
390+
switch waitStage {
391+
case enumspb.NEXUS_OPERATION_WAIT_STAGE_STARTED:
392+
return o.Status == nexusoperationpb.OPERATION_STATUS_STARTED || o.isClosed()
393+
case enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED:
394+
return o.isClosed()
395+
default:
396+
return false
397+
}
398+
}
399+
400+
func (o *Operation) isClosed() bool {
401+
return o.LifecycleState(nil).IsClosed()
402+
}
403+
349404
func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperationExecutionInfo {
350405
requestData := o.RequestData.Get(ctx)
351406
key := ctx.ExecutionKey()

0 commit comments

Comments
 (0)