Skip to content

Commit 118d164

Browse files
stephanosclaude
andcommitted
Nexus Standalone: Start + Describe (#9487)
Add Nexus Standalone Describe and Start handlers. - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [x] added new functional test(s) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 118fb93 commit 118d164

15 files changed

Lines changed: 402 additions & 3821 deletions

chasm/lib/nexusoperation/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,9 @@ type Config struct {
212212
UseSystemCallbackURL dynamicconfig.BoolPropertyFn
213213
UseNewFailureWireFormat dynamicconfig.BoolPropertyFnWithNamespaceFilter
214214
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
215-
RetryPolicy dynamicconfig.TypedPropertyFn[backoff.RetryPolicy]
215+
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
216+
MaxIDLengthLimit dynamicconfig.IntPropertyFn
217+
RetryPolicy func() backoff.RetryPolicy
216218
}
217219

218220
func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Config {
@@ -234,6 +236,8 @@ func configProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Conf
234236
CallbackURLTemplate: CallbackURLTemplate.Get(dc),
235237
UseSystemCallbackURL: UseSystemCallbackURL.Get(dc),
236238
UseNewFailureWireFormat: UseNewFailureWireFormat.Get(dc),
239+
VisibilityMaxPageSize: dynamicconfig.FrontendVisibilityMaxPageSize.Get(dc),
240+
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
237241
RetryPolicy: RetryPolicy.Get(dc),
238242
}
239243
}

chasm/lib/nexusoperation/frontend.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55

66
"go.temporal.io/api/serviceerror"
77
"go.temporal.io/api/workflowservice/v1"
8+
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
89
"go.temporal.io/server/common/log"
910
"go.temporal.io/server/common/namespace"
11+
commonnexus "go.temporal.io/server/common/nexus"
12+
"go.temporal.io/server/common/searchattribute"
1013
)
1114

1215
// FrontendHandler provides the frontend-facing API for standalone Nexus operations.
@@ -24,20 +27,32 @@ type FrontendHandler interface {
2427
var ErrStandaloneNexusOperationDisabled = serviceerror.NewUnimplemented("Standalone Nexus operation is disabled")
2528

2629
type frontendHandler struct {
30+
client nexusoperationpb.NexusOperationServiceClient
2731
config *Config
2832
logger log.Logger
2933
namespaceRegistry namespace.Registry
34+
endpointRegistry commonnexus.EndpointRegistry
35+
saMapperProvider searchattribute.MapperProvider
36+
saValidator *searchattribute.Validator
3037
}
3138

3239
func NewFrontendHandler(
40+
client nexusoperationpb.NexusOperationServiceClient,
3341
config *Config,
3442
logger log.Logger,
3543
namespaceRegistry namespace.Registry,
44+
endpointRegistry commonnexus.EndpointRegistry,
45+
saMapperProvider searchattribute.MapperProvider,
46+
saValidator *searchattribute.Validator,
3647
) FrontendHandler {
3748
return &frontendHandler{
49+
client: client,
3850
config: config,
3951
logger: logger,
4052
namespaceRegistry: namespaceRegistry,
53+
endpointRegistry: endpointRegistry,
54+
saMapperProvider: saMapperProvider,
55+
saValidator: saValidator,
4156
}
4257
}
4358

@@ -46,18 +61,57 @@ func (h *frontendHandler) isStandaloneNexusOperationEnabled(namespaceName string
4661
return h.config.EnableChasm(namespaceName) && h.config.Enabled(namespaceName)
4762
}
4863

49-
func (h *frontendHandler) StartNexusOperationExecution(_ context.Context, req *workflowservice.StartNexusOperationExecutionRequest) (*workflowservice.StartNexusOperationExecutionResponse, error) {
64+
func (h *frontendHandler) StartNexusOperationExecution(
65+
ctx context.Context,
66+
req *workflowservice.StartNexusOperationExecutionRequest,
67+
) (*workflowservice.StartNexusOperationExecutionResponse, error) {
5068
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
5169
return nil, ErrStandaloneNexusOperationDisabled
5270
}
53-
return nil, serviceerror.NewUnimplemented("StartNexusOperationExecution not implemented")
71+
72+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
if err := validateAndNormalizeStartRequest(req, h.config, h.saMapperProvider, h.saValidator); err != nil {
78+
return nil, err
79+
}
80+
81+
// Verify the endpoint exists before creating the operation.
82+
if _, err := h.endpointRegistry.GetByName(ctx, namespaceID, req.GetEndpoint()); err != nil {
83+
return nil, err
84+
}
85+
86+
resp, err := h.client.StartNexusOperation(ctx, &nexusoperationpb.StartNexusOperationRequest{
87+
NamespaceId: namespaceID.String(),
88+
FrontendRequest: req,
89+
})
90+
return resp.GetFrontendResponse(), err
5491
}
5592

56-
func (h *frontendHandler) DescribeNexusOperationExecution(_ context.Context, req *workflowservice.DescribeNexusOperationExecutionRequest) (*workflowservice.DescribeNexusOperationExecutionResponse, error) {
93+
func (h *frontendHandler) DescribeNexusOperationExecution(
94+
ctx context.Context,
95+
req *workflowservice.DescribeNexusOperationExecutionRequest,
96+
) (*workflowservice.DescribeNexusOperationExecutionResponse, error) {
5797
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
5898
return nil, ErrStandaloneNexusOperationDisabled
5999
}
60-
return nil, serviceerror.NewUnimplemented("DescribeNexusOperationExecution not implemented")
100+
101+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
if err := validateDescribeNexusOperationExecutionRequest(req, h.config); err != nil {
107+
return nil, err
108+
}
109+
110+
resp, err := h.client.DescribeNexusOperation(ctx, &nexusoperationpb.DescribeNexusOperationRequest{
111+
NamespaceId: namespaceID.String(),
112+
FrontendRequest: req,
113+
})
114+
return resp.GetFrontendResponse(), err
61115
}
62116

63117
func (h *frontendHandler) PollNexusOperationExecution(_ context.Context, req *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) {

0 commit comments

Comments
 (0)