Skip to content

Commit 6e1ea50

Browse files
stephanosclaude
andcommitted
Nexus Standalone: Poll (#9780)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 118d164 commit 6e1ea50

7 files changed

Lines changed: 2437 additions & 174 deletions

File tree

chasm/lib/nexusoperation/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@ import (
1313
"go.temporal.io/server/common/rpc/interceptor"
1414
)
1515

16+
var LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting(
17+
"nexusoperation.longPollTimeout",
18+
20*time.Second,
19+
`Maximum timeout for nexus operation long-poll requests. Actual wait may be shorter to leave
20+
longPollBuffer before the caller deadline.`,
21+
)
22+
23+
var LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting(
24+
"nexusoperation.longPollBuffer",
25+
time.Second,
26+
`A buffer used to adjust the nexus operation long-poll timeouts.
27+
Specifically, nexus operation long-poll requests are timed out at a time which leaves at least the buffer's duration
28+
remaining before the caller's deadline, if permitted by the caller's deadline.`,
29+
)
30+
1631
var Enabled = dynamicconfig.NewNamespaceBoolSetting(
1732
"nexusoperation.enableStandalone",
1833
false,
@@ -198,6 +213,8 @@ type Config struct {
198213
EnableChasm dynamicconfig.BoolPropertyFnWithNamespaceFilter
199214
EnableChasmNexus dynamicconfig.BoolPropertyFnWithNamespaceFilter
200215
NumHistoryShards int32
216+
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
217+
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
201218
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
202219
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
203220
MaxConcurrentOperationsPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -223,6 +240,8 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
223240
EnableChasm: dynamicconfig.EnableChasm.Get(dc),
224241
EnableChasmNexus: EnableChasmNexus.Get(dc),
225242
NumHistoryShards: cfg.NumHistoryShards,
243+
LongPollBuffer: LongPollBuffer.Get(dc),
244+
LongPollTimeout: LongPollTimeout.Get(dc),
226245
RequestTimeout: RequestTimeout.Get(dc),
227246
MinRequestTimeout: MinRequestTimeout.Get(dc),
228247
MaxConcurrentOperationsPerWorkflow: MaxConcurrentOperationsPerWorkflow.Get(dc),

chasm/lib/nexusoperation/frontend.go

Lines changed: 181 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,20 @@ package nexusoperation
33
import (
44
"context"
55

6+
commonpb "go.temporal.io/api/common/v1"
7+
enumspb "go.temporal.io/api/enums/v1"
8+
nexuspb "go.temporal.io/api/nexus/v1"
69
"go.temporal.io/api/serviceerror"
710
"go.temporal.io/api/workflowservice/v1"
11+
"go.temporal.io/server/chasm"
812
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
913
"go.temporal.io/server/common/log"
1014
"go.temporal.io/server/common/namespace"
1115
commonnexus "go.temporal.io/server/common/nexus"
1216
"go.temporal.io/server/common/searchattribute"
17+
"google.golang.org/protobuf/types/known/durationpb"
18+
"google.golang.org/protobuf/types/known/emptypb"
19+
"google.golang.org/protobuf/types/known/timestamppb"
1320
)
1421

1522
// FrontendHandler provides the frontend-facing API for standalone Nexus operations.
@@ -58,7 +65,7 @@ func NewFrontendHandler(
5865

5966
// isStandaloneNexusOperationEnabled checks if standalone Nexus operations are enabled for the given namespace.
6067
func (h *frontendHandler) isStandaloneNexusOperationEnabled(namespaceName string) bool {
61-
return h.config.EnableChasm(namespaceName) && h.config.Enabled(namespaceName)
68+
return h.config.ChasmEnabled(namespaceName) && h.config.Enabled(namespaceName)
6269
}
6370

6471
func (h *frontendHandler) StartNexusOperationExecution(
@@ -74,7 +81,7 @@ func (h *frontendHandler) StartNexusOperationExecution(
7481
return nil, err
7582
}
7683

77-
if err := validateAndNormalizeStartRequest(req, h.config, h.saMapperProvider, h.saValidator); err != nil {
84+
if err := validateAndNormalizeStartRequest(req, h.config, h.logger, h.saMapperProvider, h.saValidator); err != nil {
7885
return nil, err
7986
}
8087

@@ -103,7 +110,7 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
103110
return nil, err
104111
}
105112

106-
if err := validateDescribeNexusOperationExecutionRequest(req, h.config); err != nil {
113+
if err := validateAndNormalizeDescribeRequest(req, namespaceID.String(), h.config); err != nil {
107114
return nil, err
108115
}
109116

@@ -114,44 +121,203 @@ func (h *frontendHandler) DescribeNexusOperationExecution(
114121
return resp.GetFrontendResponse(), err
115122
}
116123

117-
func (h *frontendHandler) PollNexusOperationExecution(_ context.Context, req *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) {
124+
// PollNexusOperationExecution long-polls for a Nexus operation to reach a specific stage.
125+
func (h *frontendHandler) PollNexusOperationExecution(
126+
ctx context.Context,
127+
req *workflowservice.PollNexusOperationExecutionRequest,
128+
) (*workflowservice.PollNexusOperationExecutionResponse, error) {
118129
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
119130
return nil, ErrStandaloneNexusOperationDisabled
120131
}
121-
return nil, serviceerror.NewUnimplemented("PollNexusOperationExecution not implemented")
132+
133+
if err := validateAndNormalizePollRequest(req, h.config); err != nil {
134+
return nil, err
135+
}
136+
137+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
resp, err := h.client.PollNexusOperation(ctx, &nexusoperationpb.PollNexusOperationRequest{
143+
NamespaceId: namespaceID.String(),
144+
FrontendRequest: req,
145+
})
146+
return resp.GetFrontendResponse(), err
122147
}
123148

124-
func (h *frontendHandler) ListNexusOperationExecutions(_ context.Context, req *workflowservice.ListNexusOperationExecutionsRequest) (*workflowservice.ListNexusOperationExecutionsResponse, error) {
149+
func (h *frontendHandler) ListNexusOperationExecutions(
150+
ctx context.Context,
151+
req *workflowservice.ListNexusOperationExecutionsRequest,
152+
) (*workflowservice.ListNexusOperationExecutionsResponse, error) {
125153
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
126154
return nil, ErrStandaloneNexusOperationDisabled
127155
}
128-
return nil, serviceerror.NewUnimplemented("ListNexusOperationExecutions not implemented")
156+
157+
pageSize := req.GetPageSize()
158+
maxPageSize := int32(h.config.VisibilityMaxPageSize(req.GetNamespace()))
159+
if pageSize <= 0 || pageSize > maxPageSize {
160+
pageSize = maxPageSize
161+
}
162+
163+
resp, err := chasm.ListExecutions[*Operation, *emptypb.Empty](ctx, &chasm.ListExecutionsRequest{
164+
NamespaceName: req.GetNamespace(),
165+
PageSize: int(pageSize),
166+
NextPageToken: req.GetNextPageToken(),
167+
Query: req.GetQuery(),
168+
})
169+
if err != nil {
170+
return nil, err
171+
}
172+
173+
operations := make([]*nexuspb.NexusOperationExecutionListInfo, 0, len(resp.Executions))
174+
for _, exec := range resp.Executions {
175+
endpoint, _ := chasm.SearchAttributeValue(exec.ChasmSearchAttributes, EndpointSearchAttribute)
176+
service, _ := chasm.SearchAttributeValue(exec.ChasmSearchAttributes, ServiceSearchAttribute)
177+
operation, _ := chasm.SearchAttributeValue(exec.ChasmSearchAttributes, OperationSearchAttribute)
178+
statusStr, _ := chasm.SearchAttributeValue(exec.ChasmSearchAttributes, StatusSearchAttribute)
179+
status, _ := enumspb.NexusOperationExecutionStatusFromString(statusStr)
180+
181+
var closeTime *timestamppb.Timestamp
182+
var executionDuration *durationpb.Duration
183+
if !exec.CloseTime.IsZero() {
184+
closeTime = timestamppb.New(exec.CloseTime)
185+
if !exec.StartTime.IsZero() {
186+
executionDuration = durationpb.New(exec.CloseTime.Sub(exec.StartTime))
187+
}
188+
}
189+
190+
operations = append(operations, &nexuspb.NexusOperationExecutionListInfo{
191+
OperationId: exec.BusinessID,
192+
RunId: exec.RunID,
193+
Endpoint: endpoint,
194+
Service: service,
195+
Operation: operation,
196+
Status: status,
197+
ScheduleTime: timestamppb.New(exec.StartTime),
198+
CloseTime: closeTime,
199+
ExecutionDuration: executionDuration,
200+
StateTransitionCount: exec.StateTransitionCount,
201+
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: exec.CustomSearchAttributes},
202+
})
203+
}
204+
205+
return &workflowservice.ListNexusOperationExecutionsResponse{
206+
Operations: operations,
207+
NextPageToken: resp.NextPageToken,
208+
}, nil
129209
}
130210

131-
func (h *frontendHandler) CountNexusOperationExecutions(_ context.Context, req *workflowservice.CountNexusOperationExecutionsRequest) (*workflowservice.CountNexusOperationExecutionsResponse, error) {
211+
func (h *frontendHandler) CountNexusOperationExecutions(
212+
ctx context.Context,
213+
req *workflowservice.CountNexusOperationExecutionsRequest,
214+
) (*workflowservice.CountNexusOperationExecutionsResponse, error) {
132215
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
133216
return nil, ErrStandaloneNexusOperationDisabled
134217
}
135-
return nil, serviceerror.NewUnimplemented("CountNexusOperationExecutions not implemented")
218+
219+
resp, err := chasm.CountExecutions[*Operation](ctx, &chasm.CountExecutionsRequest{
220+
NamespaceName: req.GetNamespace(),
221+
Query: req.GetQuery(),
222+
})
223+
if err != nil {
224+
return nil, err
225+
}
226+
227+
groups := make([]*workflowservice.CountNexusOperationExecutionsResponse_AggregationGroup, 0, len(resp.Groups))
228+
for _, g := range resp.Groups {
229+
groups = append(groups, &workflowservice.CountNexusOperationExecutionsResponse_AggregationGroup{
230+
GroupValues: g.Values,
231+
Count: g.Count,
232+
})
233+
}
234+
235+
return &workflowservice.CountNexusOperationExecutionsResponse{
236+
Count: resp.Count,
237+
Groups: groups,
238+
}, nil
136239
}
137240

138-
func (h *frontendHandler) RequestCancelNexusOperationExecution(_ context.Context, req *workflowservice.RequestCancelNexusOperationExecutionRequest) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) {
241+
func (h *frontendHandler) RequestCancelNexusOperationExecution(
242+
ctx context.Context,
243+
req *workflowservice.RequestCancelNexusOperationExecutionRequest,
244+
) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) {
139245
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
140246
return nil, ErrStandaloneNexusOperationDisabled
141247
}
142-
return nil, serviceerror.NewUnimplemented("RequestCancelNexusOperationExecution not implemented")
248+
249+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
250+
if err != nil {
251+
return nil, err
252+
}
253+
254+
if err := validateAndNormalizeCancelRequest(req, h.config); err != nil {
255+
return nil, err
256+
}
257+
258+
_, err = h.client.RequestCancelNexusOperation(ctx, &nexusoperationpb.RequestCancelNexusOperationRequest{
259+
NamespaceId: namespaceID.String(),
260+
FrontendRequest: req,
261+
})
262+
if err != nil {
263+
return nil, err
264+
}
265+
266+
return &workflowservice.RequestCancelNexusOperationExecutionResponse{}, nil
143267
}
144268

145-
func (h *frontendHandler) TerminateNexusOperationExecution(_ context.Context, req *workflowservice.TerminateNexusOperationExecutionRequest) (*workflowservice.TerminateNexusOperationExecutionResponse, error) {
269+
func (h *frontendHandler) TerminateNexusOperationExecution(
270+
ctx context.Context,
271+
req *workflowservice.TerminateNexusOperationExecutionRequest,
272+
) (*workflowservice.TerminateNexusOperationExecutionResponse, error) {
146273
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
147274
return nil, ErrStandaloneNexusOperationDisabled
148275
}
149-
return nil, serviceerror.NewUnimplemented("TerminateNexusOperationExecution not implemented")
276+
277+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
278+
if err != nil {
279+
return nil, err
280+
}
281+
282+
if err := validateAndNormalizeTerminateRequest(req, h.config); err != nil {
283+
return nil, err
284+
}
285+
286+
_, err = h.client.TerminateNexusOperation(ctx, &nexusoperationpb.TerminateNexusOperationRequest{
287+
NamespaceId: namespaceID.String(),
288+
FrontendRequest: req,
289+
})
290+
if err != nil {
291+
return nil, err
292+
}
293+
294+
return &workflowservice.TerminateNexusOperationExecutionResponse{}, nil
150295
}
151296

152-
func (h *frontendHandler) DeleteNexusOperationExecution(_ context.Context, req *workflowservice.DeleteNexusOperationExecutionRequest) (*workflowservice.DeleteNexusOperationExecutionResponse, error) {
297+
func (h *frontendHandler) DeleteNexusOperationExecution(
298+
ctx context.Context,
299+
req *workflowservice.DeleteNexusOperationExecutionRequest,
300+
) (*workflowservice.DeleteNexusOperationExecutionResponse, error) {
153301
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
154302
return nil, ErrStandaloneNexusOperationDisabled
155303
}
156-
return nil, serviceerror.NewUnimplemented("DeleteNexusOperationExecution not implemented")
304+
305+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
306+
if err != nil {
307+
return nil, err
308+
}
309+
310+
if err := validateAndNormalizeDeleteRequest(req, h.config); err != nil {
311+
return nil, err
312+
}
313+
314+
_, err = h.client.DeleteNexusOperation(ctx, &nexusoperationpb.DeleteNexusOperationRequest{
315+
NamespaceId: namespaceID.String(),
316+
FrontendRequest: req,
317+
})
318+
if err != nil {
319+
return nil, err
320+
}
321+
322+
return &workflowservice.DeleteNexusOperationExecutionResponse{}, nil
157323
}

0 commit comments

Comments
 (0)