Skip to content

Commit 7da34b7

Browse files
committed
Added callback support for standalone activities
1 parent 832a576 commit 7da34b7

14 files changed

Lines changed: 845 additions & 76 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,28 @@ import (
66
"slices"
77
"time"
88

9+
"github.com/nexus-rpc/sdk-go/nexus"
910
apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas
1011
commonpb "go.temporal.io/api/common/v1"
1112
enumspb "go.temporal.io/api/enums/v1"
1213
failurepb "go.temporal.io/api/failure/v1"
1314
historypb "go.temporal.io/api/history/v1"
1415
"go.temporal.io/api/serviceerror"
16+
workflowpb "go.temporal.io/api/workflow/v1"
1517
"go.temporal.io/api/workflowservice/v1"
1618
"go.temporal.io/server/api/historyservice/v1"
1719
"go.temporal.io/server/api/matchingservice/v1"
1820
tokenspb "go.temporal.io/server/api/token/v1"
1921
"go.temporal.io/server/chasm"
2022
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
23+
"go.temporal.io/server/chasm/lib/callback"
24+
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
2125
"go.temporal.io/server/common"
2226
"go.temporal.io/server/common/backoff"
2327
"go.temporal.io/server/common/metrics"
2428
"go.temporal.io/server/common/namespace"
29+
commonnexus "go.temporal.io/server/common/nexus"
30+
"go.temporal.io/server/common/nexus/nexusrpc"
2531
"go.temporal.io/server/common/payload"
2632
serviceerrors "go.temporal.io/server/common/serviceerror"
2733
"go.temporal.io/server/common/tqid"
@@ -41,6 +47,7 @@ var (
4147
)
4248

4349
var _ chasm.VisibilitySearchAttributesProvider = (*Activity)(nil)
50+
var _ callback.CompletionSource = (*Activity)(nil)
4451

4552
type ActivityStore interface {
4653
// RecordCompleted applies the provided function to record activity completion
@@ -65,6 +72,10 @@ type Activity struct {
6572
// implements the ActivityStore interface).
6673
// TODO(saa-preview): figure out better naming.
6774
Store chasm.ParentPtr[ActivityStore]
75+
76+
// Callbacks holds completion callbacks to be invoked when this standalone activity reaches a terminal state. Nil
77+
// for workflow-embedded activities as the workflow handles its own callbacks.
78+
Callbacks chasm.Map[string, *callback.Callback]
6879
}
6980

7081
// WithToken wraps a request with its deserialized task token.
@@ -256,8 +267,136 @@ func attemptScheduleTimeForRetry(attempt *activitypb.ActivityAttemptState) *time
256267
}
257268

258269
// RecordCompleted applies the provided function to record activity completion.
270+
// For standalone activities, it also triggers any registered completion callbacks.
259271
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
260-
return applyFn(ctx)
272+
if err := applyFn(ctx); err != nil {
273+
return err
274+
}
275+
return a.processCloseCallbacks(ctx)
276+
}
277+
278+
func (a *Activity) addCompletionCallbacks(
279+
ctx chasm.MutableContext,
280+
requestID string,
281+
completionCallbacks []*commonpb.Callback,
282+
maxCallbacks int,
283+
) error {
284+
if len(completionCallbacks) == 0 {
285+
return nil
286+
}
287+
288+
currentCount := len(a.Callbacks)
289+
if len(completionCallbacks)+currentCount > maxCallbacks {
290+
return serviceerror.NewFailedPreconditionf(
291+
"cannot attach more than %d callbacks to an activity (%d callbacks already attached)",
292+
maxCallbacks,
293+
currentCount,
294+
)
295+
}
296+
297+
if a.Callbacks == nil {
298+
a.Callbacks = make(chasm.Map[string, *callback.Callback], len(completionCallbacks))
299+
}
300+
301+
registrationTime := timestamppb.New(ctx.Now(a))
302+
303+
for idx, cb := range completionCallbacks {
304+
chasmCB := &callbackspb.Callback{
305+
Links: cb.GetLinks(),
306+
}
307+
switch variant := cb.Variant.(type) {
308+
case *commonpb.Callback_Nexus_:
309+
chasmCB.Variant = &callbackspb.Callback_Nexus_{
310+
Nexus: &callbackspb.Callback_Nexus{
311+
Url: variant.Nexus.GetUrl(),
312+
Header: variant.Nexus.GetHeader(),
313+
},
314+
}
315+
default:
316+
return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant)
317+
}
318+
319+
id := fmt.Sprintf("%s-%d", requestID, idx)
320+
callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB)
321+
a.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj)
322+
}
323+
return nil
324+
}
325+
326+
// processCloseCallbacks triggers all STANDBY completion callbacks by transitioning them
327+
// to SCHEDULED state, which causes the callback library to invoke them.
328+
func (a *Activity) processCloseCallbacks(ctx chasm.MutableContext) error {
329+
for _, field := range a.Callbacks {
330+
cb := field.Get(ctx)
331+
if cb.Status != callbackspb.CALLBACK_STATUS_STANDBY {
332+
continue
333+
}
334+
if err := callback.TransitionScheduled.Apply(cb, ctx, callback.EventScheduled{}); err != nil {
335+
return err
336+
}
337+
}
338+
return nil
339+
}
340+
341+
// GetNexusCompletion returns the activity's completion data in the format required by the Nexus callback invocation.
342+
// Implements callback.CompletionSource.
343+
func (a *Activity) GetNexusCompletion(ctx chasm.Context, _ string) (nexusrpc.CompleteOperationOptions, error) {
344+
if !a.LifecycleState(ctx).IsClosed() {
345+
return nexusrpc.CompleteOperationOptions{}, serviceerror.NewFailedPrecondition("activity has not completed yet")
346+
}
347+
348+
attempt := a.LastAttempt.Get(ctx)
349+
opts := nexusrpc.CompleteOperationOptions{
350+
StartTime: attempt.GetStartedTime().AsTime(),
351+
CloseTime: attempt.GetCompleteTime().AsTime(),
352+
}
353+
354+
outcome := a.Outcome.Get(ctx)
355+
if successful := outcome.GetSuccessful(); successful != nil {
356+
// Successful completion: return the first output payload as the result as Nexus supports only a single payload
357+
var p *commonpb.Payload
358+
if payloads := successful.GetOutput().GetPayloads(); len(payloads) > 0 {
359+
p = payloads[0]
360+
}
361+
opts.Result = p
362+
return opts, nil
363+
}
364+
365+
var failure *failurepb.Failure
366+
if f := outcome.GetFailed(); f != nil {
367+
failure = f.GetFailure()
368+
}
369+
if failure == nil {
370+
if details := attempt.GetLastFailureDetails(); details != nil {
371+
failure = details.GetFailure()
372+
}
373+
}
374+
375+
if failure != nil {
376+
state := nexus.OperationStateFailed
377+
message := "operation failed"
378+
if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED {
379+
state = nexus.OperationStateCanceled
380+
message = "operation canceled"
381+
}
382+
383+
nf, err := commonnexus.TemporalFailureToNexusFailure(failure)
384+
if err != nil {
385+
return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInternalf("failed to convert failure: %v", err)
386+
}
387+
388+
opErr := &nexus.OperationError{
389+
State: state,
390+
Message: message,
391+
Cause: &nexus.FailureError{Failure: nf},
392+
}
393+
if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
394+
return nexusrpc.CompleteOperationOptions{}, err
395+
}
396+
opts.Error = opErr
397+
}
398+
399+
return opts, nil
261400
}
262401

263402
// HandleCompleted updates the activity on activity completion.
@@ -716,11 +855,17 @@ func (a *Activity) buildDescribeActivityExecutionResponse(
716855
input = a.RequestData.Get(ctx).GetInput()
717856
}
718857

858+
callbackInfos, err := a.buildCallbackInfos(ctx)
859+
if err != nil {
860+
return nil, err
861+
}
862+
719863
response := &workflowservice.DescribeActivityExecutionResponse{
720864
Info: info,
721865
RunId: ctx.ExecutionKey().RunID,
722866
Input: input,
723867
LongPollToken: token,
868+
Callbacks: callbackInfos,
724869
}
725870

726871
if request.GetIncludeOutcome() {
@@ -732,6 +877,52 @@ func (a *Activity) buildDescribeActivityExecutionResponse(
732877
}, nil
733878
}
734879

880+
func (a *Activity) buildCallbackInfos(ctx chasm.Context) ([]*workflowpb.CallbackInfo, error) {
881+
if len(a.Callbacks) == 0 {
882+
return nil, nil
883+
}
884+
885+
cbInfos := make([]*workflowpb.CallbackInfo, 0, len(a.Callbacks))
886+
for _, field := range a.Callbacks {
887+
cb := field.Get(ctx)
888+
889+
cbSpec, err := cb.ToAPICallback()
890+
if err != nil {
891+
return nil, err
892+
}
893+
894+
var state enumspb.CallbackState
895+
switch cb.Status {
896+
case callbackspb.CALLBACK_STATUS_UNSPECIFIED:
897+
return nil, serviceerror.NewInternal("callback with UNSPECIFIED state")
898+
case callbackspb.CALLBACK_STATUS_STANDBY:
899+
state = enumspb.CALLBACK_STATE_STANDBY
900+
case callbackspb.CALLBACK_STATUS_SCHEDULED:
901+
state = enumspb.CALLBACK_STATE_SCHEDULED
902+
case callbackspb.CALLBACK_STATUS_BACKING_OFF:
903+
state = enumspb.CALLBACK_STATE_BACKING_OFF
904+
case callbackspb.CALLBACK_STATUS_FAILED:
905+
state = enumspb.CALLBACK_STATE_FAILED
906+
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
907+
state = enumspb.CALLBACK_STATE_SUCCEEDED
908+
default:
909+
return nil, serviceerror.NewInternalf("unknown callback state: %v", cb.Status)
910+
}
911+
912+
cbInfos = append(cbInfos, &workflowpb.CallbackInfo{
913+
Callback: cbSpec,
914+
Trigger: &workflowpb.CallbackInfo_Trigger{Variant: &workflowpb.CallbackInfo_Trigger_WorkflowClosed{}},
915+
RegistrationTime: cb.RegistrationTime,
916+
State: state,
917+
Attempt: cb.Attempt,
918+
LastAttemptCompleteTime: cb.LastAttemptCompleteTime,
919+
LastAttemptFailure: cb.LastAttemptFailure,
920+
NextAttemptScheduleTime: cb.NextAttemptScheduleTime,
921+
})
922+
}
923+
return cbInfos, nil
924+
}
925+
735926
func (a *Activity) buildPollActivityExecutionResponse(
736927
ctx chasm.Context,
737928
) *activitypb.PollActivityExecutionResponse {

chasm/lib/activity/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"go.temporal.io/server/common/dynamicconfig"
77
"go.temporal.io/server/common/retrypolicy"
8+
"go.temporal.io/server/components/callbacks"
89
)
910

1011
var (
@@ -33,10 +34,14 @@ type Config struct {
3334
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
3435
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
3536
BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool]
37+
CallbackEndpointConfigs dynamicconfig.TypedPropertyFnWithNamespaceFilter[callbacks.AddressMatchRules]
38+
CallbackHeaderMaxSize dynamicconfig.IntPropertyFnWithNamespaceFilter
39+
CallbackURLMaxLength dynamicconfig.IntPropertyFnWithNamespaceFilter
3640
Enabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
3741
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
3842
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
3943
MaxIDLengthLimit dynamicconfig.IntPropertyFn
44+
MaxCallbacksPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
4045
DefaultActivityRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
4146
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
4247
}
@@ -46,11 +51,15 @@ func ConfigProvider(dc *dynamicconfig.Collection) *Config {
4651
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
4752
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
4853
BreakdownMetricsByTaskQueue: dynamicconfig.MetricsBreakdownByTaskQueue.Get(dc),
54+
CallbackEndpointConfigs: callbacks.AllowedAddresses.Get(dc),
55+
CallbackHeaderMaxSize: dynamicconfig.FrontendCallbackHeaderMaxSize.Get(dc),
56+
CallbackURLMaxLength: dynamicconfig.FrontendCallbackURLMaxLength.Get(dc),
4957
DefaultActivityRetryPolicy: dynamicconfig.DefaultActivityRetryPolicy.Get(dc),
5058
Enabled: Enabled.Get(dc),
5159
LongPollBuffer: LongPollBuffer.Get(dc),
5260
LongPollTimeout: LongPollTimeout.Get(dc),
5361
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
62+
MaxCallbacksPerExecution: dynamicconfig.MaxCallbacksPerExecution.Get(dc),
5463
VisibilityMaxPageSize: dynamicconfig.FrontendVisibilityMaxPageSize.Get(dc),
5564
}
5665
}

chasm/lib/activity/frontend.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.temporal.io/server/common/metrics"
1717
"go.temporal.io/server/common/namespace"
1818
"go.temporal.io/server/common/searchattribute"
19+
"go.temporal.io/server/components/callbacks"
1920
"google.golang.org/protobuf/types/known/durationpb"
2021
"google.golang.org/protobuf/types/known/emptypb"
2122
"google.golang.org/protobuf/types/known/timestamppb"
@@ -426,6 +427,19 @@ func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
426427
return serviceerror.NewInvalidArgument("input exceeds length limit")
427428
}
428429

430+
if cbs := req.GetCompletionCallbacks(); len(cbs) > 0 {
431+
if err := callbacks.ValidateCallbacks(
432+
cbs,
433+
h.config.MaxCallbacksPerExecution(req.GetNamespace()),
434+
h.config.CallbackURLMaxLength(req.GetNamespace()),
435+
h.config.CallbackHeaderMaxSize(req.GetNamespace()),
436+
h.config.CallbackEndpointConfigs(req.GetNamespace()),
437+
"an activity",
438+
); err != nil {
439+
return serviceerror.NewInvalidArgument(err.Error())
440+
}
441+
}
442+
429443
if req.GetSearchAttributes() != nil {
430444
if err := validateAndNormalizeSearchAttributes(
431445
req,

chasm/lib/activity/handler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St
7373
return nil, err
7474
}
7575

76+
if cbs := request.GetCompletionCallbacks(); len(cbs) > 0 {
77+
maxCallbacks := h.config.MaxCallbacksPerExecution(request.GetNamespace())
78+
if err := newActivity.addCompletionCallbacks(mutableContext, request.GetRequestId(), cbs, maxCallbacks); err != nil {
79+
return nil, err
80+
}
81+
}
82+
7683
err = TransitionScheduled.Apply(newActivity, mutableContext, nil)
7784
if err != nil {
7885
return nil, err

cmd/tools/getproto/files.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,12 +1000,10 @@ so forwarding by endpoint ID will not work out of the box.`,
10001000
32,
10011001
`MaxCallbacksPerWorkflow is the maximum number of callbacks that can be attached to a workflow.`,
10021002
)
1003-
// NOTE (seankane): MaxCHASMCallbacksPerWorkflow is temporary, this will be removed and replaced with MaxCallbacksPerWorkflow
1004-
// once CHASM is fully enabled
1005-
MaxCHASMCallbacksPerWorkflow = NewNamespaceIntSetting(
1006-
"system.maxCHASMCallbacksPerWorkflow",
1003+
MaxCallbacksPerExecution = NewNamespaceIntSetting(
1004+
"system.maxCallbacksPerExecution",
10071005
2000,
1008-
`MaxCHASMCallbacksPerWorkflow is the maximum number of callbacks that can be attached to a workflow when using the CHASM implementation.`,
1006+
`MaxCallbacksPerExecution is the maximum number of callbacks that can be attached to a CHASM execution.`,
10091007
)
10101008
FrontendLinkMaxSize = NewNamespaceIntSetting(
10111009
"frontend.linkMaxSize",

0 commit comments

Comments
 (0)