Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7da34b7
Added callback support for standalone activities
fretz12 Apr 7, 2026
62baad4
Added link in StartActivityExecutionResponse
fretz12 Apr 7, 2026
868a947
Address review issues
fretz12 Apr 7, 2026
10ad366
Merge remote-tracking branch 'origin/main' into fredtzeng/saa-callbacks
fretz12 Apr 7, 2026
3c2d947
Update to activity callback API
fretz12 Apr 10, 2026
6738984
Addressed PR comments
fretz12 Apr 10, 2026
91efea1
Addressed PR comments
fretz12 Apr 13, 2026
e4a93b9
Merge main
fretz12 Apr 13, 2026
2dfefac
Updated API
fretz12 Apr 13, 2026
7d3830e
Undo file format change
fretz12 Apr 13, 2026
15c2692
Fix linter error
fretz12 Apr 13, 2026
5d114ed
Change validator to existing allowed addresses config
fretz12 Apr 13, 2026
09b7867
Fixed grpc err message extraction
fretz12 Apr 13, 2026
d493530
Merge branch 'main' into fredtzeng/saa-callbacks
fretz12 Apr 13, 2026
85f6c90
Refactor MaxPerExecution. Add on-conflict attach callbacks support.
fretz12 Apr 14, 2026
53be145
Merge remote-tracking branch 'origin/fredtzeng/saa-callbacks' into fr…
fretz12 Apr 14, 2026
41779ee
Merge branch 'main' into fredtzeng/saa-callbacks
fretz12 Apr 14, 2026
1520b56
Register new callback proto package
fretz12 Apr 14, 2026
f9d3cf5
Merge remote-tracking branch 'origin/fredtzeng/saa-callbacks' into fr…
fretz12 Apr 14, 2026
7fab7b6
Addressed PR comments
fretz12 Apr 15, 2026
f965a70
Update to master API
fretz12 Apr 15, 2026
d9060e0
go mod tidy
fretz12 Apr 15, 2026
ef7e50d
Merge branch 'main' into fredtzeng/saa-callbacks
fretz12 Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 188 additions & 5 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"slices"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas
callbackpb "go.temporal.io/api/callback/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
Expand All @@ -18,10 +20,14 @@ import (
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/chasm/lib/callback"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/nexus/nexusrpc"
"go.temporal.io/server/common/payload"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/tqid"
Expand All @@ -41,6 +47,7 @@ var (
)

var _ chasm.VisibilitySearchAttributesProvider = (*Activity)(nil)
var _ callback.CompletionSource = (*Activity)(nil)

type ActivityStore interface {
// RecordCompleted applies the provided function to record activity completion
Expand All @@ -65,6 +72,10 @@ type Activity struct {
// implements the ActivityStore interface).
// TODO(saa-preview): figure out better naming.
Store chasm.ParentPtr[ActivityStore]

// Callbacks holds completion callbacks to be invoked when this standalone activity reaches a terminal state. Nil
// for workflow-embedded activities as the workflow handles its own callbacks.
Callbacks chasm.Map[string, *callback.Callback]
}

// WithToken wraps a request with its deserialized task token.
Expand Down Expand Up @@ -256,8 +267,116 @@ func attemptScheduleTimeForRetry(attempt *activitypb.ActivityAttemptState) *time
}

// RecordCompleted applies the provided function to record activity completion.
// For standalone activities, it also triggers any registered completion callbacks.
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
return applyFn(ctx)
if err := applyFn(ctx); err != nil {
return err
}
return callback.ScheduleStandbyCallbacks(ctx, a.Callbacks)
}

func (a *Activity) addCompletionCallbacks(
ctx chasm.MutableContext,
requestID string,
completionCallbacks []*commonpb.Callback,
maxCallbacks int,
) error {
if len(completionCallbacks) == 0 {
Comment thread
fretz12 marked this conversation as resolved.
return nil
}
if a.LifecycleState(ctx).IsClosed() {
return serviceerror.NewFailedPrecondition("cannot attach callbacks to a closed activity")
}

currentCount := len(a.Callbacks)
if len(completionCallbacks)+currentCount > maxCallbacks {
return serviceerror.NewFailedPreconditionf(
"cannot attach more than %d callbacks to an activity (%d callbacks already attached)",
maxCallbacks,
currentCount,
)
}

if a.Callbacks == nil {
a.Callbacks = make(chasm.Map[string, *callback.Callback], len(completionCallbacks))
}

registrationTime := timestamppb.New(ctx.Now(a))

for idx, cb := range completionCallbacks {
chasmCB := &callbackspb.Callback{
Links: cb.GetLinks(),
}
switch variant := cb.Variant.(type) {
case *commonpb.Callback_Nexus_:
chasmCB.Variant = &callbackspb.Callback_Nexus_{
Nexus: &callbackspb.Callback_Nexus{
Url: variant.Nexus.GetUrl(),
Header: variant.Nexus.GetHeader(),
},
}
default:
return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant)
}

// requestID (unique per API call) + idx (position within the request) ensures unique,idempotent callback IDs.
id := fmt.Sprintf("%s-%d", requestID, idx)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment explaining what desirable properties follow from this ID-naming scheme (and add the same comment to chasm/lb/workflow/workflow.go.

Can someone confirm that the ID naming scheme used by HSM callbacks, which was designed to address a replication concern that I haven't fully understood yet, is not required by CHASM workflow/SAA callbacks?

https://github.com/temporalio/temporal/blob/main/service/history/workflow/mutable_state_impl.go#L3174-L3176

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment, please double check accuracy folks

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to mention HSM here IMHO.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB)
a.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj)
}
return nil
}

// GetNexusCompletion returns the activity's completion data in the format required by the Nexus callback invocation.
// Implements callback.CompletionSource.
func (a *Activity) GetNexusCompletion(ctx chasm.Context, _ string) (nexusrpc.CompleteOperationOptions, error) {
if !a.LifecycleState(ctx).IsClosed() {
return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternal("activity has not completed yet")
}

opts := nexusrpc.CompleteOperationOptions{
StartTime: a.GetScheduleTime().AsTime(),
CloseTime: ctx.ExecutionInfo().CloseTime,
}

outcome := a.Outcome.Get(ctx)
if successful := outcome.GetSuccessful(); successful != nil {
// Successful completion: return the first output payload as the result as Nexus supports only a single payload
var p *commonpb.Payload
if payloads := successful.GetOutput().GetPayloads(); len(payloads) > 0 {
p = payloads[0]
}
opts.Result = p
return opts, nil
}

failure := a.terminalFailure(ctx)
if failure != nil {
state := nexus.OperationStateFailed
message := "operation failed"
if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED {
state = nexus.OperationStateCanceled
message = "operation canceled"
}

nf, err := commonnexus.TemporalFailureToNexusFailure(failure)
if err != nil {
return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("failed to convert failure: %v", err)
}

opErr := &nexus.OperationError{
State: state,
Message: message,
Cause: &nexus.FailureError{Failure: nf},
}
if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
return nexusrpc.CompleteOperationOptions{}, err
}
opts.Error = opErr
return opts, nil
}

return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("activity in status %v has no outcome", a.Status)
}

// HandleCompleted updates the activity on activity completion.
Expand Down Expand Up @@ -716,11 +835,17 @@ func (a *Activity) buildDescribeActivityExecutionResponse(
input = a.RequestData.Get(ctx).GetInput()
}

callbackInfos, err := a.buildCallbackInfos(ctx)
if err != nil {
return nil, err
}

response := &workflowservice.DescribeActivityExecutionResponse{
Info: info,
RunId: ctx.ExecutionKey().RunID,
Input: input,
LongPollToken: token,
Callbacks: callbackInfos,
}

if request.GetIncludeOutcome() {
Expand All @@ -732,6 +857,56 @@ func (a *Activity) buildDescribeActivityExecutionResponse(
}, nil
}

func (a *Activity) buildCallbackInfos(ctx chasm.Context) ([]*apiactivitypb.CallbackInfo, error) {
if len(a.Callbacks) == 0 {
return nil, nil
}

cbInfos := make([]*apiactivitypb.CallbackInfo, 0, len(a.Callbacks))
for _, field := range a.Callbacks {
cb := field.Get(ctx)

cbSpec, err := cb.ToAPICallback()
if err != nil {
return nil, err
}

var state enumspb.CallbackState
switch cb.Status {
case callbackspb.CALLBACK_STATUS_UNSPECIFIED:
return nil, serviceerror.NewInternal("callback with UNSPECIFIED state")
case callbackspb.CALLBACK_STATUS_STANDBY:
state = enumspb.CALLBACK_STATE_STANDBY
case callbackspb.CALLBACK_STATUS_SCHEDULED:
state = enumspb.CALLBACK_STATE_SCHEDULED
case callbackspb.CALLBACK_STATUS_BACKING_OFF:
state = enumspb.CALLBACK_STATE_BACKING_OFF
case callbackspb.CALLBACK_STATUS_FAILED:
state = enumspb.CALLBACK_STATE_FAILED
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
state = enumspb.CALLBACK_STATE_SUCCEEDED
default:
return nil, serviceerror.NewInternalf("unknown callback state: %v", cb.Status)
}

cbInfos = append(cbInfos, &apiactivitypb.CallbackInfo{
Trigger: &apiactivitypb.CallbackInfo_Trigger{
Variant: &apiactivitypb.CallbackInfo_Trigger_ActivityClosed{},
},
Info: &callbackpb.CallbackInfo{
Callback: cbSpec,
RegistrationTime: cb.RegistrationTime,
State: state,
Attempt: cb.Attempt,
LastAttemptCompleteTime: cb.LastAttemptCompleteTime,
LastAttemptFailure: cb.LastAttemptFailure,
NextAttemptScheduleTime: cb.NextAttemptScheduleTime,
},
})
}
return cbInfos, nil
}

func (a *Activity) buildPollActivityExecutionResponse(
ctx chasm.Context,
) *activitypb.PollActivityExecutionResponse {
Expand All @@ -755,15 +930,23 @@ func (a *Activity) outcome(ctx chasm.Context) *apiactivitypb.ActivityExecutionOu
Value: &apiactivitypb.ActivityExecutionOutcome_Result{Result: successful.GetOutput()},
}
}
if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
if failure := a.terminalFailure(ctx); failure != nil {
return &apiactivitypb.ActivityExecutionOutcome{
Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: failure},
}
}
return nil
}

// terminalFailure returns the failure for a closed activity. The failure may be stored in Outcome.Failed
// (terminated, canceled, timed out) or in LastAttempt.LastFailureDetails (failed after exhausting retries).
// Returns nil if no failure is found.
func (a *Activity) terminalFailure(ctx chasm.Context) *failurepb.Failure {
if f := a.Outcome.Get(ctx).GetFailed(); f != nil {
return f.GetFailure()
}
if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil {
return &apiactivitypb.ActivityExecutionOutcome{
Value: &apiactivitypb.ActivityExecutionOutcome_Failure{Failure: details.GetFailure()},
}
return details.GetFailure()
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions chasm/lib/activity/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activity
import (
"time"

"go.temporal.io/server/chasm/lib/callback"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/retrypolicy"
)
Expand Down Expand Up @@ -37,6 +38,7 @@ type Config struct {
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
MaxIDLengthLimit dynamicconfig.IntPropertyFn
MaxCallbacksPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
DefaultActivityRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
}
Expand All @@ -51,6 +53,7 @@ func ConfigProvider(dc *dynamicconfig.Collection) *Config {
LongPollBuffer: LongPollBuffer.Get(dc),
LongPollTimeout: LongPollTimeout.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
MaxCallbacksPerExecution: callback.MaxPerExecution.Get(dc),
VisibilityMaxPageSize: dynamicconfig.FrontendVisibilityMaxPageSize.Get(dc),
}
}
10 changes: 10 additions & 0 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/chasm/lib/callback"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand All @@ -37,6 +38,7 @@ var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone ac

type frontendHandler struct {
FrontendHandler
callbackValidator *callback.Validator
client activitypb.ActivityServiceClient
config *Config
logger log.Logger
Expand All @@ -48,6 +50,7 @@ type frontendHandler struct {

// NewFrontendHandler creates a new FrontendHandler instance for processing activity frontend requests.
func NewFrontendHandler(
callbackValidator *callback.Validator,
client activitypb.ActivityServiceClient,
config *Config,
logger log.Logger,
Expand All @@ -57,6 +60,7 @@ func NewFrontendHandler(
saValidator *searchattribute.Validator,
) FrontendHandler {
return &frontendHandler{
callbackValidator: callbackValidator,
client: client,
config: config,
logger: logger,
Expand Down Expand Up @@ -392,6 +396,12 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
return nil, err
}

if cbs := req.GetCompletionCallbacks(); len(cbs) > 0 {
if err := h.callbackValidator.Validate(req.GetNamespace(), cbs); err != nil {
return nil, err
}
Comment thread
fretz12 marked this conversation as resolved.
}

return req, nil
}

Expand Down
Loading
Loading