-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add callback support for standalone activities #9786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7da34b7
62baad4
868a947
10ad366
3c2d947
6738984
91efea1
e4a93b9
2dfefac
7d3830e
15c2692
5d114ed
09b7867
d493530
85f6c90
53be145
41779ee
1520b56
f9d3cf5
7fab7b6
f965a70
d9060e0
ef7e50d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,9 @@ import ( | |
| "slices" | ||
| "time" | ||
|
|
||
| "github.com/nexus-rpc/sdk-go/nexus" | ||
| apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas | ||
| callbackpb "go.temporal.io/api/callback/v1" | ||
| commonpb "go.temporal.io/api/common/v1" | ||
| enumspb "go.temporal.io/api/enums/v1" | ||
| failurepb "go.temporal.io/api/failure/v1" | ||
|
|
@@ -18,10 +20,14 @@ import ( | |
| tokenspb "go.temporal.io/server/api/token/v1" | ||
| "go.temporal.io/server/chasm" | ||
| "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" | ||
| "go.temporal.io/server/chasm/lib/callback" | ||
| callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1" | ||
| "go.temporal.io/server/common" | ||
| "go.temporal.io/server/common/backoff" | ||
| "go.temporal.io/server/common/metrics" | ||
| "go.temporal.io/server/common/namespace" | ||
| commonnexus "go.temporal.io/server/common/nexus" | ||
| "go.temporal.io/server/common/nexus/nexusrpc" | ||
| "go.temporal.io/server/common/payload" | ||
| serviceerrors "go.temporal.io/server/common/serviceerror" | ||
| "go.temporal.io/server/common/tqid" | ||
|
|
@@ -41,6 +47,7 @@ var ( | |
| ) | ||
|
|
||
| var _ chasm.VisibilitySearchAttributesProvider = (*Activity)(nil) | ||
| var _ callback.CompletionSource = (*Activity)(nil) | ||
|
|
||
| type ActivityStore interface { | ||
| // RecordCompleted applies the provided function to record activity completion | ||
|
|
@@ -65,6 +72,10 @@ type Activity struct { | |
| // implements the ActivityStore interface). | ||
| // TODO(saa-preview): figure out better naming. | ||
| Store chasm.ParentPtr[ActivityStore] | ||
|
|
||
| // Callbacks holds completion callbacks to be invoked when this standalone activity reaches a terminal state. Nil | ||
| // for workflow-embedded activities as the workflow handles its own callbacks. | ||
| Callbacks chasm.Map[string, *callback.Callback] | ||
| } | ||
|
|
||
| // WithToken wraps a request with its deserialized task token. | ||
|
|
@@ -256,8 +267,116 @@ func attemptScheduleTimeForRetry(attempt *activitypb.ActivityAttemptState) *time | |
| } | ||
|
|
||
| // RecordCompleted applies the provided function to record activity completion. | ||
| // For standalone activities, it also triggers any registered completion callbacks. | ||
| func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error { | ||
| return applyFn(ctx) | ||
| if err := applyFn(ctx); err != nil { | ||
| return err | ||
| } | ||
| return callback.ScheduleStandbyCallbacks(ctx, a.Callbacks) | ||
| } | ||
|
|
||
| func (a *Activity) addCompletionCallbacks( | ||
| ctx chasm.MutableContext, | ||
| requestID string, | ||
| completionCallbacks []*commonpb.Callback, | ||
| maxCallbacks int, | ||
| ) error { | ||
| if len(completionCallbacks) == 0 { | ||
| return nil | ||
| } | ||
| if a.LifecycleState(ctx).IsClosed() { | ||
| return serviceerror.NewFailedPrecondition("cannot attach callbacks to a closed activity") | ||
| } | ||
|
|
||
| currentCount := len(a.Callbacks) | ||
| if len(completionCallbacks)+currentCount > maxCallbacks { | ||
| return serviceerror.NewFailedPreconditionf( | ||
| "cannot attach more than %d callbacks to an activity (%d callbacks already attached)", | ||
| maxCallbacks, | ||
| currentCount, | ||
| ) | ||
| } | ||
|
|
||
| if a.Callbacks == nil { | ||
| a.Callbacks = make(chasm.Map[string, *callback.Callback], len(completionCallbacks)) | ||
| } | ||
|
|
||
| registrationTime := timestamppb.New(ctx.Now(a)) | ||
|
|
||
| for idx, cb := range completionCallbacks { | ||
| chasmCB := &callbackspb.Callback{ | ||
| Links: cb.GetLinks(), | ||
| } | ||
| switch variant := cb.Variant.(type) { | ||
| case *commonpb.Callback_Nexus_: | ||
| chasmCB.Variant = &callbackspb.Callback_Nexus_{ | ||
| Nexus: &callbackspb.Callback_Nexus{ | ||
| Url: variant.Nexus.GetUrl(), | ||
| Header: variant.Nexus.GetHeader(), | ||
| }, | ||
| } | ||
| default: | ||
| return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant) | ||
| } | ||
|
|
||
| // requestID (unique per API call) + idx (position within the request) ensures unique,idempotent callback IDs. | ||
| id := fmt.Sprintf("%s-%d", requestID, idx) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a comment explaining what desirable properties follow from this ID-naming scheme (and add the same comment to Can someone confirm that the ID naming scheme used by HSM callbacks, which was designed to address a replication concern that I haven't fully understood yet, is not required by CHASM workflow/SAA callbacks?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a comment, please double check accuracy folks
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to mention HSM here IMHO.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
| callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB) | ||
| a.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // GetNexusCompletion returns the activity's completion data in the format required by the Nexus callback invocation. | ||
| // Implements callback.CompletionSource. | ||
| func (a *Activity) GetNexusCompletion(ctx chasm.Context, _ string) (nexusrpc.CompleteOperationOptions, error) { | ||
| if !a.LifecycleState(ctx).IsClosed() { | ||
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternal("activity has not completed yet") | ||
| } | ||
|
|
||
| opts := nexusrpc.CompleteOperationOptions{ | ||
| StartTime: a.GetScheduleTime().AsTime(), | ||
| CloseTime: ctx.ExecutionInfo().CloseTime, | ||
| } | ||
|
|
||
| outcome := a.Outcome.Get(ctx) | ||
| if successful := outcome.GetSuccessful(); successful != nil { | ||
| // Successful completion: return the first output payload as the result as Nexus supports only a single payload | ||
| var p *commonpb.Payload | ||
| if payloads := successful.GetOutput().GetPayloads(); len(payloads) > 0 { | ||
| p = payloads[0] | ||
| } | ||
| opts.Result = p | ||
| return opts, nil | ||
| } | ||
|
|
||
| failure := a.terminalFailure(ctx) | ||
| if failure != nil { | ||
| state := nexus.OperationStateFailed | ||
| message := "operation failed" | ||
| if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED { | ||
| state = nexus.OperationStateCanceled | ||
| message = "operation canceled" | ||
| } | ||
|
|
||
| nf, err := commonnexus.TemporalFailureToNexusFailure(failure) | ||
| if err != nil { | ||
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("failed to convert failure: %v", err) | ||
| } | ||
|
|
||
| opErr := &nexus.OperationError{ | ||
| State: state, | ||
| Message: message, | ||
| Cause: &nexus.FailureError{Failure: nf}, | ||
| } | ||
| if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil { | ||
| return nexusrpc.CompleteOperationOptions{}, err | ||
| } | ||
| opts.Error = opErr | ||
| return opts, nil | ||
| } | ||
|
|
||
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("activity in status %v has no outcome", a.Status) | ||
| } | ||
|
|
||
| // HandleCompleted updates the activity on activity completion. | ||
|
|
@@ -716,11 +835,17 @@ func (a *Activity) buildDescribeActivityExecutionResponse( | |
| input = a.RequestData.Get(ctx).GetInput() | ||
| } | ||
|
|
||
| callbackInfos, err := a.buildCallbackInfos(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| response := &workflowservice.DescribeActivityExecutionResponse{ | ||
| Info: info, | ||
| RunId: ctx.ExecutionKey().RunID, | ||
| Input: input, | ||
| LongPollToken: token, | ||
| Callbacks: callbackInfos, | ||
| } | ||
|
|
||
| if request.GetIncludeOutcome() { | ||
|
|
@@ -732,6 +857,56 @@ func (a *Activity) buildDescribeActivityExecutionResponse( | |
| }, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildCallbackInfos(ctx chasm.Context) ([]*apiactivitypb.CallbackInfo, error) { | ||
| if len(a.Callbacks) == 0 { | ||
| return nil, nil | ||
| } | ||
|
|
||
| cbInfos := make([]*apiactivitypb.CallbackInfo, 0, len(a.Callbacks)) | ||
| for _, field := range a.Callbacks { | ||
| cb := field.Get(ctx) | ||
|
|
||
| cbSpec, err := cb.ToAPICallback() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var state enumspb.CallbackState | ||
| switch cb.Status { | ||
| case callbackspb.CALLBACK_STATUS_UNSPECIFIED: | ||
| return nil, serviceerror.NewInternal("callback with UNSPECIFIED state") | ||
| case callbackspb.CALLBACK_STATUS_STANDBY: | ||
| state = enumspb.CALLBACK_STATE_STANDBY | ||
| case callbackspb.CALLBACK_STATUS_SCHEDULED: | ||
| state = enumspb.CALLBACK_STATE_SCHEDULED | ||
| case callbackspb.CALLBACK_STATUS_BACKING_OFF: | ||
| state = enumspb.CALLBACK_STATE_BACKING_OFF | ||
| case callbackspb.CALLBACK_STATUS_FAILED: | ||
| state = enumspb.CALLBACK_STATE_FAILED | ||
| case callbackspb.CALLBACK_STATUS_SUCCEEDED: | ||
| state = enumspb.CALLBACK_STATE_SUCCEEDED | ||
| default: | ||
| return nil, serviceerror.NewInternalf("unknown callback state: %v", cb.Status) | ||
| } | ||
|
|
||
| cbInfos = append(cbInfos, &apiactivitypb.CallbackInfo{ | ||
| Trigger: &apiactivitypb.CallbackInfo_Trigger{ | ||
| Variant: &apiactivitypb.CallbackInfo_Trigger_ActivityClosed{}, | ||
| }, | ||
| Info: &callbackpb.CallbackInfo{ | ||
| Callback: cbSpec, | ||
| RegistrationTime: cb.RegistrationTime, | ||
| State: state, | ||
| Attempt: cb.Attempt, | ||
| LastAttemptCompleteTime: cb.LastAttemptCompleteTime, | ||
| LastAttemptFailure: cb.LastAttemptFailure, | ||
| NextAttemptScheduleTime: cb.NextAttemptScheduleTime, | ||
| }, | ||
| }) | ||
| } | ||
| return cbInfos, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildPollActivityExecutionResponse( | ||
| ctx chasm.Context, | ||
| ) *activitypb.PollActivityExecutionResponse { | ||
|
|
@@ -755,15 +930,23 @@ func (a *Activity) outcome(ctx chasm.Context) *apiactivitypb.ActivityExecutionOu | |
| Value: &apiactivitypb.ActivityExecutionOutcome_Result{Result: successful.GetOutput()}, | ||
| } | ||
| } | ||
| if failure := activityOutcome.GetFailed().GetFailure(); failure != nil { | ||
| if failure := a.terminalFailure(ctx); failure != nil { | ||
| return &apiactivitypb.ActivityExecutionOutcome{ | ||
| Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: failure}, | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // terminalFailure returns the failure for a closed activity. The failure may be stored in Outcome.Failed | ||
| // (terminated, canceled, timed out) or in LastAttempt.LastFailureDetails (failed after exhausting retries). | ||
| // Returns nil if no failure is found. | ||
| func (a *Activity) terminalFailure(ctx chasm.Context) *failurepb.Failure { | ||
| if f := a.Outcome.Get(ctx).GetFailed(); f != nil { | ||
| return f.GetFailure() | ||
| } | ||
| if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil { | ||
| return &apiactivitypb.ActivityExecutionOutcome{ | ||
| Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: details.GetFailure()}, | ||
| } | ||
| return details.GetFailure() | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.