Skip to content

Commit 851af03

Browse files
authored
[Nexus] Minor fixes - Added destination queue and schedule cancellati… (#9790)
## What changed? Minor fixes. ## Why? These were missing or misplaced previously. ## How did you test it? - [x] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes task routing and cancellation scheduling in the Nexus operation state machine, which can affect execution flow and delivery to outbound queues. While localized, incorrect destinations or transition sequencing could cause stuck or misrouted operations. > > **Overview** > Ensures Nexus invocation tasks are routed to the correct outbound queue by setting `TaskAttributes.Destination` to the operation endpoint when scheduling and when rescheduling after backoff. > > Moves the "cancellation already requested" handling from the workflow `Started` event handler into `TransitionStarted`, so pending cancellations are automatically scheduled as soon as an operation token becomes available. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 4eddec6. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 291cb26 commit 851af03

2 files changed

Lines changed: 13 additions & 16 deletions

File tree

chasm/lib/nexusoperation/operation_statemachine.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ var TransitionScheduled = chasm.NewTransition(
1616
[]nexusoperationpb.OperationStatus{nexusoperationpb.OPERATION_STATUS_UNSPECIFIED},
1717
nexusoperationpb.OPERATION_STATUS_SCHEDULED,
1818
func(o *Operation, ctx chasm.MutableContext, event EventScheduled) error {
19-
// Emit an invocation task to start the operation
20-
ctx.AddTask(o, chasm.TaskAttributes{}, &nexusoperationpb.InvocationTask{
19+
// Emit an invocation task to start the operation.
20+
// The destination is the endpoint name, which routes the task to the correct outbound queue.
21+
ctx.AddTask(o, chasm.TaskAttributes{Destination: o.GetEndpoint()}, &nexusoperationpb.InvocationTask{
2122
Attempt: o.Attempt,
2223
})
2324

@@ -93,7 +94,7 @@ var transitionRescheduled = chasm.NewTransition(
9394
o.NextAttemptScheduleTime = nil
9495

9596
// Emit a new invocation task for the retry attempt
96-
ctx.AddTask(o, chasm.TaskAttributes{}, &nexusoperationpb.InvocationTask{
97+
ctx.AddTask(o, chasm.TaskAttributes{Destination: o.GetEndpoint()}, &nexusoperationpb.InvocationTask{
9798
Attempt: o.Attempt,
9899
})
99100

@@ -142,6 +143,13 @@ var TransitionStarted = chasm.NewTransition(
142143
})
143144
}
144145

146+
// If cancellation was already requested, schedule sending the cancellation request now that we have
147+
// an operation token.
148+
cancellation, ok := o.Cancellation.TryGet(ctx)
149+
if ok && cancellation.StateMachineState() == nexusoperationpb.CANCELLATION_STATUS_UNSPECIFIED {
150+
return TransitionCancellationScheduled.Apply(cancellation, ctx, EventCancellationScheduled{})
151+
}
152+
145153
return nil
146154
},
147155
)

chasm/lib/nexusoperation/workflow/events.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -281,20 +281,9 @@ func (d StartedEventDefinition) Apply(ctx chasm.MutableContext, wf *chasmworkflo
281281

282282
// TODO: Store event.Links on the Operation for standalone mode, where links won't be available via history.
283283

284-
if err := nexusoperation.TransitionStarted.Apply(op, ctx, nexusoperation.EventStarted{
284+
return nexusoperation.TransitionStarted.Apply(op, ctx, nexusoperation.EventStarted{
285285
OperationToken: attrs.GetOperationToken(),
286-
}); err != nil {
287-
return err
288-
}
289-
290-
// If cancellation was already requested, schedule sending the cancellation request now that we have
291-
// an operation token.
292-
cancellation, ok := op.Cancellation.TryGet(ctx)
293-
if ok && cancellation.StateMachineState() == nexusoperationpb.CANCELLATION_STATUS_UNSPECIFIED {
294-
return nexusoperation.TransitionCancellationScheduled.Apply(cancellation, ctx, nexusoperation.EventCancellationScheduled{})
295-
}
296-
297-
return nil
286+
})
298287
}
299288

300289
func (d StartedEventDefinition) CherryPick(ctx chasm.MutableContext, wf *chasmworkflow.Workflow, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {

0 commit comments

Comments
 (0)