Skip to content

Commit dc33154

Browse files
stephanosclaude
andcommitted
Nexus Standalone: Delete (#9654)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 22a44e0 commit dc33154

7 files changed

Lines changed: 296 additions & 36 deletions

File tree

chasm/lib/nexusoperation/frontend.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,30 @@ func (h *frontendHandler) TerminateNexusOperationExecution(
276276
return &workflowservice.TerminateNexusOperationExecutionResponse{}, nil
277277
}
278278

279-
func (h *frontendHandler) DeleteNexusOperationExecution(_ context.Context, req *workflowservice.DeleteNexusOperationExecutionRequest) (*workflowservice.DeleteNexusOperationExecutionResponse, error) {
279+
func (h *frontendHandler) DeleteNexusOperationExecution(
280+
ctx context.Context,
281+
req *workflowservice.DeleteNexusOperationExecutionRequest,
282+
) (*workflowservice.DeleteNexusOperationExecutionResponse, error) {
280283
if !h.isStandaloneNexusOperationEnabled(req.GetNamespace()) {
281284
return nil, ErrStandaloneNexusOperationDisabled
282285
}
283-
return nil, serviceerror.NewUnimplemented("DeleteNexusOperationExecution not implemented")
286+
287+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
288+
if err != nil {
289+
return nil, err
290+
}
291+
292+
if err := validateAndNormalizeDeleteRequest(req, h.config); err != nil {
293+
return nil, err
294+
}
295+
296+
_, err = h.client.DeleteNexusOperation(ctx, &nexusoperationpb.DeleteNexusOperationRequest{
297+
NamespaceId: namespaceID.String(),
298+
FrontendRequest: req,
299+
})
300+
if err != nil {
301+
return nil, err
302+
}
303+
304+
return &workflowservice.DeleteNexusOperationExecutionResponse{}, nil
284305
}

chasm/lib/nexusoperation/handler.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,31 @@ func (h *handler) TerminateNexusOperation(
135135
return resp, err
136136
}
137137

138+
// DeleteNexusOperation terminates the nexus operation if running, then schedules it for deletion.
139+
func (h *handler) DeleteNexusOperation(
140+
ctx context.Context,
141+
req *nexusoperationpb.DeleteNexusOperationRequest,
142+
) (response *nexusoperationpb.DeleteNexusOperationResponse, err error) {
143+
defer log.CapturePanic(h.logger, &err)
144+
145+
frontendReq := req.GetFrontendRequest()
146+
147+
key := chasm.ExecutionKey{
148+
NamespaceID: req.GetNamespaceId(),
149+
BusinessID: frontendReq.GetOperationId(),
150+
RunID: frontendReq.GetRunId(),
151+
}
152+
153+
if err := chasm.DeleteExecution[*Operation](ctx, key, chasm.DeleteExecutionRequest{
154+
TerminateComponentRequest: chasm.TerminateComponentRequest{
155+
Reason: "Delete nexus operation execution",
156+
},
157+
}); err != nil {
158+
return nil, err
159+
}
160+
161+
return &nexusoperationpb.DeleteNexusOperationResponse{}, nil
162+
}
138163
func idReusePolicyFromProto(p enumspb.NexusOperationIdReusePolicy) chasm.BusinessIDReusePolicy {
139164
switch p {
140165
case enumspb.NEXUS_OPERATION_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY:

chasm/lib/nexusoperation/operation_statemachine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ var TransitionTerminated = chasm.NewTransition(
240240
nexusoperationpb.OPERATION_STATUS_SCHEDULED,
241241
nexusoperationpb.OPERATION_STATUS_STARTED,
242242
nexusoperationpb.OPERATION_STATUS_BACKING_OFF,
243+
nexusoperationpb.OPERATION_STATUS_CANCELED,
243244
},
244245
nexusoperationpb.OPERATION_STATUS_TERMINATED,
245246
func(o *Operation, ctx chasm.MutableContext, event EventTerminated) error {

chasm/lib/nexusoperation/operation_statemachine_test.go

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -511,41 +511,70 @@ func TestTransitionTimedOut(t *testing.T) {
511511
}
512512

513513
func TestTransitionTerminated(t *testing.T) {
514-
ctx := &chasm.MockMutableContext{
515-
MockContext: chasm.MockContext{
516-
HandleNow: func(chasm.Component) time.Time { return defaultTime },
514+
testCases := []struct {
515+
name string
516+
startStatus nexusoperationpb.OperationStatus
517+
}{
518+
{
519+
name: "terminated from scheduled",
520+
startStatus: nexusoperationpb.OPERATION_STATUS_SCHEDULED,
521+
},
522+
{
523+
name: "terminated from started",
524+
startStatus: nexusoperationpb.OPERATION_STATUS_STARTED,
525+
},
526+
{
527+
name: "terminated from backing off",
528+
startStatus: nexusoperationpb.OPERATION_STATUS_BACKING_OFF,
529+
},
530+
{
531+
name: "terminated from canceled",
532+
startStatus: nexusoperationpb.OPERATION_STATUS_CANCELED,
517533
},
518534
}
519535

520-
operation := newTestOperation()
521-
operation.Status = nexusoperationpb.OPERATION_STATUS_SCHEDULED
536+
for _, tc := range testCases {
537+
t.Run(tc.name, func(t *testing.T) {
538+
ctx := &chasm.MockMutableContext{
539+
MockContext: chasm.MockContext{
540+
HandleNow: func(chasm.Component) time.Time { return defaultTime },
541+
},
542+
}
543+
operation := newTestOperation()
544+
operation.Status = tc.startStatus
522545

523-
err := TransitionTerminated.Apply(operation, ctx, EventTerminated{
524-
TerminateComponentRequest: chasm.TerminateComponentRequest{
525-
RequestID: "terminate-request-id",
526-
Reason: "test reason",
527-
Identity: "test-identity",
528-
},
529-
})
530-
require.NoError(t, err)
546+
event := EventTerminated{TerminateComponentRequest: chasm.TerminateComponentRequest{
547+
RequestID: "terminate-request-id",
548+
Reason: "test reason",
549+
Identity: "test-identity",
550+
}}
531551

532-
require.Equal(t, nexusoperationpb.OPERATION_STATUS_TERMINATED, operation.Status)
533-
require.Equal(t, defaultTime, operation.ClosedTime.AsTime())
534-
protorequire.ProtoEqual(t, &nexusoperationpb.NexusOperationTerminateState{
535-
RequestId: "terminate-request-id",
536-
Identity: "test-identity",
537-
}, operation.TerminateState)
538-
protorequire.ProtoEqual(t, &nexusoperationpb.OperationOutcome{
539-
Variant: &nexusoperationpb.OperationOutcome_Failed_{
540-
Failed: &nexusoperationpb.OperationOutcome_Failed{
541-
Failure: &failurepb.Failure{
542-
Message: "test reason",
543-
FailureInfo: &failurepb.Failure_TerminatedFailureInfo{
544-
TerminatedFailureInfo: &failurepb.TerminatedFailureInfo{},
552+
err := TransitionTerminated.Apply(operation, ctx, event)
553+
require.NoError(t, err)
554+
555+
require.Equal(t, nexusoperationpb.OPERATION_STATUS_TERMINATED, operation.Status)
556+
require.Equal(t, defaultTime, operation.ClosedTime.AsTime())
557+
protorequire.ProtoEqual(t, &nexusoperationpb.NexusOperationTerminateState{
558+
RequestId: "terminate-request-id",
559+
Identity: "test-identity",
560+
}, operation.TerminateState)
561+
562+
// Verify outcome failure is set with terminated info and reason as message.
563+
protorequire.ProtoEqual(t, &nexusoperationpb.OperationOutcome{
564+
Variant: &nexusoperationpb.OperationOutcome_Failed_{
565+
Failed: &nexusoperationpb.OperationOutcome_Failed{
566+
Failure: &failurepb.Failure{
567+
Message: "test reason",
568+
FailureInfo: &failurepb.Failure_TerminatedFailureInfo{
569+
TerminatedFailureInfo: &failurepb.TerminatedFailureInfo{},
570+
},
571+
},
545572
},
546573
},
547-
},
548-
},
549-
}, operation.Outcome.Get(ctx))
550-
require.Empty(t, ctx.Tasks)
574+
}, operation.Outcome.Get(ctx))
575+
576+
// Terminal state - no tasks should be emitted
577+
require.Empty(t, ctx.Tasks)
578+
})
579+
}
551580
}

chasm/lib/nexusoperation/validator.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,22 @@ func validateAndNormalizeStartRequest(
154154
return nil
155155
}
156156

157+
func validateAndNormalizeDeleteRequest(req *workflowservice.DeleteNexusOperationExecutionRequest, config *Config) error {
158+
if req.GetOperationId() == "" {
159+
return serviceerror.NewInvalidArgument("operation_id is required")
160+
}
161+
if len(req.GetOperationId()) > config.MaxIDLengthLimit() {
162+
return serviceerror.NewInvalidArgumentf("operation_id exceeds length limit. Length=%d Limit=%d",
163+
len(req.GetOperationId()), config.MaxIDLengthLimit())
164+
}
165+
if req.GetRunId() != "" {
166+
if err := uuid.Validate(req.GetRunId()); err != nil {
167+
return serviceerror.NewInvalidArgument("invalid run id: must be a valid UUID")
168+
}
169+
}
170+
return nil
171+
}
172+
157173
func validateAndNormalizeDescribeRequest(req *workflowservice.DescribeNexusOperationExecutionRequest, config *Config) error {
158174
if req.GetOperationId() == "" {
159175
return serviceerror.NewInvalidArgument("operation_id is required")

chasm/lib/nexusoperation/validator_test.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func TestValidateRequestCancelNexusOperationExecutionRequest(t *testing.T) {
331331
r.RequestId = ""
332332
},
333333
check: func(t *testing.T, r *workflowservice.RequestCancelNexusOperationExecutionRequest) {
334-
require.Len(t, r.RequestId, 36) // UUID length
334+
require.Len(t, r.RequestId, 36)
335335
},
336336
},
337337
{
@@ -400,6 +400,67 @@ func TestValidateRequestCancelNexusOperationExecutionRequest(t *testing.T) {
400400
}
401401
}
402402

403+
func TestValidateDeleteNexusOperationExecutionRequest(t *testing.T) {
404+
config := &Config{
405+
MaxIDLengthLimit: func() int { return 20 },
406+
}
407+
408+
for _, tc := range []struct {
409+
name string
410+
mutate func(*workflowservice.DeleteNexusOperationExecutionRequest)
411+
errMsg string
412+
}{
413+
{
414+
name: "valid request",
415+
},
416+
{
417+
name: "valid request - with run_id",
418+
mutate: func(r *workflowservice.DeleteNexusOperationExecutionRequest) {
419+
r.RunId = "550e8400-e29b-41d4-a716-446655440000"
420+
},
421+
},
422+
{
423+
name: "operation_id - required",
424+
mutate: func(r *workflowservice.DeleteNexusOperationExecutionRequest) {
425+
r.OperationId = ""
426+
},
427+
errMsg: "operation_id is required",
428+
},
429+
{
430+
name: "operation_id - exceeds length limit",
431+
mutate: func(r *workflowservice.DeleteNexusOperationExecutionRequest) {
432+
r.OperationId = "this-operation-id-is-too-long"
433+
},
434+
errMsg: "operation_id exceeds length limit",
435+
},
436+
{
437+
name: "run_id - invalid UUID",
438+
mutate: func(r *workflowservice.DeleteNexusOperationExecutionRequest) {
439+
r.RunId = "not-a-valid-uuid"
440+
},
441+
errMsg: "invalid run id: must be a valid UUID",
442+
},
443+
} {
444+
t.Run(tc.name, func(t *testing.T) {
445+
validReq := &workflowservice.DeleteNexusOperationExecutionRequest{
446+
Namespace: "default",
447+
OperationId: "operation-id",
448+
}
449+
if tc.mutate != nil {
450+
tc.mutate(validReq)
451+
}
452+
err := validateAndNormalizeDeleteRequest(validReq, config)
453+
if tc.errMsg != "" {
454+
var invalidArgErr *serviceerror.InvalidArgument
455+
require.ErrorAs(t, err, &invalidArgErr)
456+
require.Contains(t, err.Error(), tc.errMsg)
457+
} else {
458+
require.NoError(t, err)
459+
}
460+
})
461+
}
462+
}
463+
403464
func TestValidateTerminateNexusOperationExecutionRequest(t *testing.T) {
404465
config := &Config{
405466
MaxIDLengthLimit: func() int { return 20 },
@@ -421,7 +482,7 @@ func TestValidateTerminateNexusOperationExecutionRequest(t *testing.T) {
421482
r.RequestId = ""
422483
},
423484
check: func(t *testing.T, r *workflowservice.TerminateNexusOperationExecutionRequest) {
424-
require.Len(t, r.RequestId, 36) // UUID length
485+
require.Len(t, r.RequestId, 36)
425486
},
426487
},
427488
{

0 commit comments

Comments
 (0)