Skip to content

Commit ac4a2f1

Browse files
stephanos and claude committed
Nexus Standalone: Poll (#9780)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 89a9c70 commit ac4a2f1

11 files changed

Lines changed: 1215 additions & 124 deletions

File tree

chasm/lib/nexusoperation/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,21 @@ import (
1212
"go.temporal.io/server/common/rpc/interceptor"
1313
)
1414

15+
// LongPollTimeout is the namespace duration setting registered under the key
// "nexusoperation.longPollTimeout", with a default of 20 seconds. Its description below states
// that it is the maximum timeout for Nexus operation long-poll requests.
var LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting(
16+
"nexusoperation.longPollTimeout",
17+
20*time.Second,
18+
`Maximum timeout for nexus operation long-poll requests. Actual wait may be shorter to leave
19+
longPollBuffer before the caller deadline.`,
20+
)
21+
22+
// LongPollBuffer is the namespace duration setting registered under the key
// "nexusoperation.longPollBuffer", with a default of one second. Its description below states
// that long-poll requests are timed out early enough to leave at least this much time before the
// caller's deadline, when the caller's deadline permits.
var LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting(
23+
"nexusoperation.longPollBuffer",
24+
time.Second,
25+
`A buffer used to adjust the nexus operation long-poll timeouts.
26+
Specifically, nexus operation long-poll requests are timed out at a time which leaves at least the buffer's duration
27+
remaining before the caller's deadline, if permitted by the caller's deadline.`,
28+
)
29+
1530
var Enabled = dynamicconfig.NewNamespaceBoolSetting(
1631
"nexusoperation.enableStandalone",
1732
false,
@@ -185,6 +200,8 @@ type Config struct {
185200
ChasmEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
186201
ChasmNexusEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
187202
NumHistoryShards int32
203+
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
204+
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
188205
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
189206
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
190207
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -212,6 +229,8 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
212229
ChasmEnabled: dynamicconfig.EnableChasm.Get(dc),
213230
ChasmNexusEnabled: ChasmNexusEnabled.Get(dc),
214231
NumHistoryShards: cfg.NumHistoryShards,
232+
LongPollBuffer: LongPollBuffer.Get(dc),
233+
LongPollTimeout: LongPollTimeout.Get(dc),
215234
RequestTimeout: RequestTimeout.Get(dc),
216235
MinRequestTimeout: MinRequestTimeout.Get(dc),
217236
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),

chasm/lib/nexusoperation/frontend.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
110110
return nil, err
111111
}
112112

113-
if err := validateAndNormalizeDescribeRequest(req, h.config); err != nil {
113+
if err := validateAndNormalizeDescribeRequest(req, namespaceID.String(), h.config); err != nil {
114114
return nil, err
115115
}
116116

@@ -121,11 +121,29 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
121121
return resp.GetFrontendResponse(), err
122122
}
123123

124-
func (h *frontendHandler) PollNexusOperationExecution(_ context.Context, req *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) {
124+
// PollNexusOperationExecution long-polls for a Nexus operation to reach a specific stage.
125+
func (h *frontendHandler) PollNexusOperationExecution(
126+
ctx context.Context,
127+
req *workflowservice.PollNexusOperationExecutionRequest,
128+
) (*workflowservice.PollNexusOperationExecutionResponse, error) {
125129
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
126130
return nil, ErrStandaloneNexusOperationDisabled
127131
}
128-
return nil, serviceerror.NewUnimplemented("PollNexusOperationExecution not implemented")
132+
133+
if err := validateAndNormalizePollRequest(req, h.config); err != nil {
134+
return nil, err
135+
}
136+
137+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
resp, err := h.client.PollNexusOperation(ctx, &nexusoperationpb.PollNexusOperationRequest{
143+
NamespaceId: namespaceID.String(),
144+
FrontendRequest: req,
145+
})
146+
return resp.GetFrontendResponse(), err
129147
}
130148

131149
func (h *frontendHandler) ListNexusOperationExecutions(

chasm/lib/nexusoperation/handler.go

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package nexusoperation
22

33
import (
44
"context"
5+
"errors"
56

67
enumspb "go.temporal.io/api/enums/v1"
8+
"go.temporal.io/api/serviceerror"
79
"go.temporal.io/api/workflowservice/v1"
810
"go.temporal.io/server/chasm"
911
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
12+
"go.temporal.io/server/common/contextutil"
1013
"go.temporal.io/server/common/log"
1114
)
1215

@@ -59,7 +62,14 @@ func (h *handler) StartNexusOperation(
5962
}, nil
6063
}
6164

62-
// TODO: Add long-poll support.
65+
// DescribeNexusOperation queries current operation state, optionally as a long-poll that waits
66+
// for any state change.
67+
//
68+
// When used to long-poll, it returns an empty non-error response on context
69+
// deadline expiry, to indicate that the state being waited for was not reached. Callers should
70+
// interpret this as an invitation to resubmit their long-poll request. This response is sent before
71+
// the caller's deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
72+
// does indeed receive the non-error response.
6373
func (h *handler) DescribeNexusOperation(
6474
ctx context.Context,
6575
req *nexusoperationpb.DescribeNexusOperationRequest,
@@ -72,7 +82,102 @@ func (h *handler) DescribeNexusOperation(
7282
RunID: req.GetFrontendRequest().GetRunId(),
7383
})
7484

75-
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req, nil)
85+
token := req.GetFrontendRequest().GetLongPollToken()
86+
if len(token) == 0 {
87+
// No long poll.
88+
return chasm.ReadComponent(ctx, ref, (*Operation).buildDescribeResponse, req)
89+
}
90+
91+
// Determine the long poll timeout and buffer.
92+
ns := req.GetFrontendRequest().GetNamespace()
93+
ctx, cancel := contextutil.WithDeadlineBuffer(
94+
ctx,
95+
h.config.LongPollTimeout(ns),
96+
h.config.LongPollBuffer(ns),
97+
)
98+
defer cancel()
99+
100+
// Poll for the operation state to change.
101+
response, _, err = chasm.PollComponent(ctx, ref, func(
102+
o *Operation,
103+
ctx chasm.Context,
104+
req *nexusoperationpb.DescribeNexusOperationRequest,
105+
) (*nexusoperationpb.DescribeNexusOperationResponse, bool, error) {
106+
changed, err := chasm.ExecutionStateChanged(o, ctx, token)
107+
if err != nil {
108+
if errors.Is(err, chasm.ErrMalformedComponentRef) {
109+
return nil, false, serviceerror.NewInvalidArgument("invalid long poll token")
110+
}
111+
if errors.Is(err, chasm.ErrInvalidComponentRef) {
112+
return nil, false, serviceerror.NewInvalidArgument("long poll token does not match execution")
113+
}
114+
return nil, false, err
115+
}
116+
if changed {
117+
response, err := o.buildDescribeResponse(ctx, req)
118+
return response, true, err
119+
}
120+
return nil, false, nil
121+
}, req)
122+
123+
if err != nil && ctx.Err() != nil {
124+
// Send empty non-error response on deadline expiry: caller should continue long-polling.
125+
return &nexusoperationpb.DescribeNexusOperationResponse{
126+
FrontendResponse: &workflowservice.DescribeNexusOperationExecutionResponse{},
127+
}, nil
128+
}
129+
return response, err
130+
}
131+
132+
// PollNexusOperation long-polls for a Nexus operation to reach a specific stage.
133+
//
134+
// It returns an empty non-error response on context deadline expiry, to indicate that the state
135+
// being waited for was not reached. Callers should interpret this as an invitation to resubmit
136+
// their long-poll request. This response is sent before the caller's
137+
// deadline (see nexusoperation.longPollBuffer) so that it is likely that the caller
138+
// does indeed receive the non-error response.
139+
func (h *handler) PollNexusOperation(
140+
ctx context.Context,
141+
req *nexusoperationpb.PollNexusOperationRequest,
142+
) (response *nexusoperationpb.PollNexusOperationResponse, err error) {
143+
defer log.CapturePanic(h.logger, &err)
144+
145+
ref := chasm.NewComponentRef[*Operation](chasm.ExecutionKey{
146+
NamespaceID: req.GetNamespaceId(),
147+
BusinessID: req.GetFrontendRequest().GetOperationId(),
148+
RunID: req.GetFrontendRequest().GetRunId(),
149+
})
150+
151+
// Determine the long poll timeout and buffer.
152+
ns := req.GetFrontendRequest().GetNamespace()
153+
ctx, cancel := contextutil.WithDeadlineBuffer(
154+
ctx,
155+
h.config.LongPollTimeout(ns),
156+
h.config.LongPollBuffer(ns),
157+
)
158+
defer cancel()
159+
160+
// Poll for the wait stage to be reached.
161+
waitStage := req.GetFrontendRequest().GetWaitStage()
162+
response, _, err = chasm.PollComponent(ctx, ref, func(
163+
o *Operation,
164+
ctx chasm.Context,
165+
req *nexusoperationpb.PollNexusOperationRequest,
166+
) (*nexusoperationpb.PollNexusOperationResponse, bool, error) {
167+
if o.isWaitStageReached(ctx, waitStage) {
168+
response := o.buildPollResponse(ctx)
169+
return response, true, nil
170+
}
171+
return nil, false, nil
172+
}, req)
173+
174+
if err != nil && ctx.Err() != nil {
175+
// Send an empty non-error response as an invitation to resubmit the long-poll.
176+
return &nexusoperationpb.PollNexusOperationResponse{
177+
FrontendResponse: &workflowservice.PollNexusOperationExecutionResponse{},
178+
}, nil
179+
}
180+
return response, err
76181
}
77182

78183
// RequestCancelNexusOperation requests cancellation of a standalone Nexus operation via CHASM.

chasm/lib/nexusoperation/operation.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,15 @@ func (o *Operation) buildDescribeResponse(
336336
ctx chasm.Context,
337337
req *nexusoperationpb.DescribeNexusOperationRequest,
338338
) (*nexusoperationpb.DescribeNexusOperationResponse, error) {
339+
token, err := ctx.Ref(o)
340+
if err != nil {
341+
return nil, err
342+
}
343+
339344
resp := &workflowservice.DescribeNexusOperationExecutionResponse{
340-
RunId: ctx.ExecutionKey().RunID,
341-
Info: o.buildExecutionInfo(ctx),
342-
// TODO: Add LongPollToken support.
345+
RunId: ctx.ExecutionKey().RunID,
346+
Info: o.buildExecutionInfo(ctx),
347+
LongPollToken: token,
343348
}
344349

345350
// Include input, if requested
@@ -348,14 +353,12 @@ func (o *Operation) buildDescribeResponse(
348353
}
349354

350355
// Include output, if available and requested
351-
// TODO: get failure from last attempt for running operation, if available
352-
if req.GetFrontendRequest().GetIncludeOutcome() && o.LifecycleState(ctx).IsClosed() {
353-
outcome := o.Outcome.Get(ctx)
354-
if successful := outcome.GetSuccessful(); successful != nil {
356+
if req.GetFrontendRequest().GetIncludeOutcome() && o.isClosed() {
357+
if successful, failure := o.describeOutcome(ctx); successful != nil {
355358
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Result{
356-
Result: successful.GetResult(),
359+
Result: successful,
357360
}
358-
} else if failure := outcome.GetFailed().GetFailure(); failure != nil {
361+
} else if failure != nil {
359362
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Failure{
360363
Failure: failure,
361364
}
@@ -367,6 +370,63 @@ func (o *Operation) buildDescribeResponse(
367370
}, nil
368371
}
369372

373+
func (o *Operation) buildPollResponse(
374+
ctx chasm.Context,
375+
) *nexusoperationpb.PollNexusOperationResponse {
376+
resp := &workflowservice.PollNexusOperationExecutionResponse{
377+
RunId: ctx.ExecutionKey().RunID,
378+
OperationToken: o.OperationToken,
379+
}
380+
381+
if o.isClosed() {
382+
resp.WaitStage = enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED
383+
if successful, failure := o.describeOutcome(ctx); successful != nil {
384+
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Result{
385+
Result: successful,
386+
}
387+
} else if failure != nil {
388+
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Failure{
389+
Failure: failure,
390+
}
391+
}
392+
} else {
393+
resp.WaitStage = enumspb.NEXUS_OPERATION_WAIT_STAGE_STARTED
394+
}
395+
396+
return &nexusoperationpb.PollNexusOperationResponse{
397+
FrontendResponse: resp,
398+
}
399+
}
400+
401+
func (o *Operation) describeOutcome(ctx chasm.Context) (*commonpb.Payload, *failurepb.Failure) {
402+
outcome := o.Outcome.Get(ctx)
403+
if successful := outcome.GetSuccessful(); successful != nil {
404+
return successful.GetResult(), nil
405+
}
406+
if failure := outcome.GetFailed().GetFailure(); failure != nil {
407+
return nil, failure
408+
}
409+
return nil, o.LastAttemptFailure
410+
}
411+
412+
// isWaitStageReached checks if the operation has reached the requested wait stage.
413+
// NOTE: WaitStage.UNSPECIFIED is normalized to CLOSED by the frontend validator.
414+
func (o *Operation) isWaitStageReached(_ chasm.Context, waitStage enumspb.NexusOperationWaitStage) bool {
415+
switch waitStage {
416+
case enumspb.NEXUS_OPERATION_WAIT_STAGE_STARTED:
417+
return o.Status == nexusoperationpb.OPERATION_STATUS_STARTED || o.isClosed()
418+
case enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED:
419+
return o.isClosed()
420+
default:
421+
return false
422+
}
423+
}
424+
425+
// isClosed returns true if the operation is in a terminal state.
426+
func (o *Operation) isClosed() bool {
427+
return o.LifecycleState(nil).IsClosed()
428+
}
429+
370430
func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperationExecutionInfo {
371431
requestData := o.RequestData.Get(ctx)
372432

0 commit comments

Comments
 (0)