From 7d0c7681fb04403ec1022bd54880bfc94dd3c0f1 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 31 Mar 2026 15:08:49 -0400 Subject: [PATCH 1/3] Add functional test for request-ID stability across server-side retries Use a package-level atomic to fail StartActivityExecution once after the activity is created at history, triggering the RetryableInterceptor. Without the fix, the retry generates a new request ID and gets ActivityExecutionAlreadyStarted. With the fix, the retry reuses the same request ID and the dedup succeeds. --- chasm/lib/activity/frontend.go | 12 ++++++++++++ tests/standalone_activity_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index eb738455b22..cbc4122c09f 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -2,6 +2,7 @@ package activity import ( "context" + "sync/atomic" "github.com/google/uuid" apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas @@ -35,6 +36,10 @@ type FrontendHandler interface { var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled") +// TestStartFailOnce, when set to true, causes the next StartActivityExecution to return Unavailable +// after the activity is created. It fires once (CAS to false). 
+var TestStartFailOnce atomic.Bool + type frontendHandler struct { FrontendHandler client activitypb.ActivityServiceClient @@ -100,6 +105,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf NamespaceId: namespaceID.String(), FrontendRequest: modifiedReq, }) + if err != nil { + return nil, err + } + + if TestStartFailOnce.CompareAndSwap(true, false) { + return nil, serviceerror.NewUnavailable("test: injected failure after successful creation") + } return resp.GetFrontendResponse(), err } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 6afc7b606f0..d5ded28c1f3 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -270,6 +270,36 @@ func (s *standaloneActivityTestSuite) TestIDConflictPolicy() { }) } +func (s *standaloneActivityTestSuite) TestServerGeneratedRequestIDStableAcrossRetries() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + // Make the handler fail once with a retryable error so the RetryableInterceptor retries. + activity.TestStartFailOnce.Store(true) + + resp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + // No RequestId — server generates one. + }) + // With the fix, the retry uses the same request ID, so history recognizes it as a dedup + // and succeeds (with Started=false). Without the fix, the retry generates a new request ID + // and gets ActivityExecutionAlreadyStarted. 
+ require.NoError(t, err) + require.NotNil(t, resp) +} + func (s *standaloneActivityTestSuite) TestPollActivityTaskQueue() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) From ae6fa7b34e1d533597f9be0cba812f6ac81e8dcc Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 2 Apr 2026 17:31:11 -0400 Subject: [PATCH 2/3] Revert "Add functional test for request-ID stability across server-side retries" This reverts commit 885673aa5ccb567755ad489d6a7f9537f7e98bd6. --- chasm/lib/activity/frontend.go | 12 ------------ tests/standalone_activity_test.go | 30 ------------------------------ 2 files changed, 42 deletions(-) diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index cbc4122c09f..eb738455b22 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -2,7 +2,6 @@ package activity import ( "context" - "sync/atomic" "github.com/google/uuid" apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas @@ -36,10 +35,6 @@ type FrontendHandler interface { var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled") -// TestStartFailOnce, when set to true, causes the next StartActivityExecution to return Unavailable -// after the activity is created. It fires once (CAS to false). 
-var TestStartFailOnce atomic.Bool - type frontendHandler struct { FrontendHandler client activitypb.ActivityServiceClient @@ -105,13 +100,6 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf NamespaceId: namespaceID.String(), FrontendRequest: modifiedReq, }) - if err != nil { - return nil, err - } - - if TestStartFailOnce.CompareAndSwap(true, false) { - return nil, serviceerror.NewUnavailable("test: injected failure after successful creation") - } return resp.GetFrontendResponse(), err } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index d5ded28c1f3..6afc7b606f0 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -270,36 +270,6 @@ func (s *standaloneActivityTestSuite) TestIDConflictPolicy() { }) } -func (s *standaloneActivityTestSuite) TestServerGeneratedRequestIDStableAcrossRetries() { - t := s.T() - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) - defer cancel() - - activityID := testcore.RandomizeStr(t.Name()) - taskQueue := testcore.RandomizeStr(t.Name()) - - // Make the handler fail once with a retryable error so the RetryableInterceptor retries. - activity.TestStartFailOnce.Store(true) - - resp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ - Namespace: s.Namespace().String(), - ActivityId: activityID, - ActivityType: s.tv.ActivityType(), - Identity: s.tv.WorkerIdentity(), - Input: defaultInput, - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - }, - StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), - // No RequestId — server generates one. - }) - // With the fix, the retry uses the same request ID, so history recognizes it as a dedup - // and succeeds (with Started=false). Without the fix, the retry generates a new request ID - // and gets ActivityExecutionAlreadyStarted. 
- require.NoError(t, err) - require.NotNil(t, resp) -} - func (s *standaloneActivityTestSuite) TestPollActivityTaskQueue() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) From afb0fa575239b4e3df97424694b63961e043c0cf Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 31 Mar 2026 11:08:03 -0400 Subject: [PATCH 3/3] Refactor and rename for consistency with Standalone Nexus Operations - Move request-ID generation into validation functions - Rename delete method --- chasm/lib/activity/frontend.go | 73 ++++++---------------------- chasm/lib/activity/frontend_test.go | 40 ++++++++++++++- chasm/lib/activity/validator.go | 68 +++++++++++++++++++++++--- chasm/lib/activity/validator_test.go | 18 +++---- 4 files changed, 122 insertions(+), 77 deletions(-) diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index eb738455b22..f2633672037 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -258,7 +258,7 @@ func (h *frontendHandler) DeleteActivityExecution( return nil, ErrStandaloneActivityDisabled } - if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { + if err := validateAndNormalizeDeleteRequest(req, h.config.MaxIDLengthLimit()); err != nil { return nil, err } @@ -293,11 +293,7 @@ func (h *frontendHandler) TerminateActivityExecution( return nil, err } - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - - if err := validateTerminateActivityExecutionRequest( + if err := validateAndNormalizeTerminateRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -330,11 +326,7 @@ func (h *frontendHandler) RequestCancelActivityExecution( return nil, err } - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - - if err := validateRequestCancelActivityExecutionRequest( + if err := validateAndNormalizeCancelRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -358,10 +350,12 @@ 
func (h *frontendHandler) validateAndPopulateStartRequest( req *workflowservice.StartActivityExecutionRequest, namespaceID namespace.ID, ) (*workflowservice.StartActivityExecutionRequest, error) { + // Since validation mutates the request, clone it first so that retries use the original + // request. However if the client did not set a request ID then set that before cloning so that + // retries use the same request ID. if req.GetRequestId() == "" { req.RequestId = uuid.NewString() } - // Since validation includes mutation of the request, we clone it first so that any retries use the original request. req = common.CloneProto(req) activityType := req.ActivityType.GetName() @@ -385,57 +379,20 @@ func (h *frontendHandler) validateAndPopulateStartRequest( } applyActivityOptionsToStartRequest(opts, req) - err = h.validateAndNormalizeStartActivityExecutionRequest(req) - if err != nil { - return nil, err - } - - return req, nil -} - -// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone -// activity specific attributes. Note that this method mutates the input params; the caller must -// clone the request if necessary (e.g. if it may be retried). -func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest( - req *workflowservice.StartActivityExecutionRequest, -) error { - maxIDLengthLimit := h.config.MaxIDLengthLimit() - - if len(req.GetRequestId()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", - len(req.GetRequestId()), maxIDLengthLimit) - } - - if len(req.GetIdentity()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("identity exceeds length limit. 
Length=%d Limit=%d", - len(req.GetIdentity()), maxIDLengthLimit) - } - - if err := normalizeAndValidateIDPolicy(req); err != nil { - return err - } - - if err := validateBlobSize( - req.GetActivityId(), - "StartActivityExecution", + err = validateAndNormalizeStartRequest( + req, + h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, - req.Input.Size(), h.logger, - req.GetNamespace()); err != nil { - return serviceerror.NewInvalidArgument("input exceeds length limit") - } - - if req.GetSearchAttributes() != nil { - if err := validateAndNormalizeSearchAttributes( - req, - h.saMapperProvider, - h.saValidator); err != nil { - return err - } + h.saMapperProvider, + h.saValidator, + ) + if err != nil { + return nil, err } - return nil + return req, nil } // activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields diff --git a/chasm/lib/activity/frontend_test.go b/chasm/lib/activity/frontend_test.go index 9607e7d6869..9dc13cda79c 100644 --- a/chasm/lib/activity/frontend_test.go +++ b/chasm/lib/activity/frontend_test.go @@ -13,6 +13,10 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +type hasRequestID interface { + GetRequestId() string +} + // TestRequestIdStableAcrossRetries verifies that a request ID is re-used // across retries, even if server-generated. func TestRequestIdStableAcrossRetries(t *testing.T) { @@ -55,11 +59,43 @@ func TestRequestIdStableAcrossRetries(t *testing.T) { require.Equal(t, clone1.RequestId, clone2.RequestId) } - t.Run("server-generated", func(t *testing.T) { + // validateTwice calls validate twice and asserts the request ID is stable. 
+ validateTwice := func(t *testing.T, req hasRequestID, validate func() error) { + t.Helper() + require.NoError(t, validate()) + require.NotEmpty(t, req.GetRequestId()) + firstID := req.GetRequestId() + require.NoError(t, validate()) + require.Equal(t, firstID, req.GetRequestId()) + } + + t.Run("start/server-generated", func(t *testing.T) { validateTwoAttempts(t, newReq("")) }) - t.Run("client-provided", func(t *testing.T) { + t.Run("start/client-provided", func(t *testing.T) { validateTwoAttempts(t, newReq("my-request-id")) }) + + t.Run("terminate/server-generated", func(t *testing.T) { + req := &workflowservice.TerminateActivityExecutionRequest{ + Namespace: "test-namespace", + ActivityId: "test-activity", + } + validateTwice(t, req, func() error { + return validateAndNormalizeTerminateRequest( + req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + }) + }) + + t.Run("cancel/server-generated", func(t *testing.T) { + req := &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: "test-namespace", + ActivityId: "test-activity", + } + validateTwice(t, req, func() error { + return validateAndNormalizeCancelRequest( + req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + }) + }) } diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 3bc1883d374..fab57845c3b 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -120,7 +120,7 @@ func validateAndNormalizeActivityAttributes( return serviceerror.NewInvalidArgumentf("invalid priorities: %v", err) } - return normalizeAndValidateTimeouts(activityID, + return validateAndNormalizeTimeouts(activityID, activityType, runTimeout, options) @@ -140,7 +140,7 @@ func validateActivityRetryPolicy( return retrypolicy.Validate(retryPolicy) } -func normalizeAndValidateTimeouts( +func validateAndNormalizeTimeouts( activityID string, activityType string, runTimeout 
*durationpb.Duration, @@ -208,7 +208,7 @@ func normalizeAndValidateTimeouts( return nil } -func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { +func validateAndNormalizeIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED { req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE } @@ -317,7 +317,55 @@ func validatePollActivityExecutionRequest( return nil } -func validateRequestCancelActivityExecutionRequest( +func validateAndNormalizeStartRequest( + req *workflowservice.StartActivityExecutionRequest, + maxIDLengthLimit int, + blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, + blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter, + logger log.Logger, + saMapperProvider searchattribute.MapperProvider, + saValidator *searchattribute.Validator, +) error { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", + len(req.GetRequestId()), maxIDLengthLimit) + } + + if len(req.GetIdentity()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("identity exceeds length limit. 
Length=%d Limit=%d", + len(req.GetIdentity()), maxIDLengthLimit) + } + + if err := validateAndNormalizeIDPolicy(req); err != nil { + return err + } + + if err := validateBlobSize( + req.GetActivityId(), + "StartActivityExecution", + blobSizeLimitError, + blobSizeLimitWarn, + req.Input.Size(), + logger, + req.GetNamespace()); err != nil { + return serviceerror.NewInvalidArgument("input exceeds length limit") + } + + if req.GetSearchAttributes() != nil { + if err := validateAndNormalizeSearchAttributes( + req, + saMapperProvider, + saValidator); err != nil { + return err + } + } + + return nil +} + +func validateAndNormalizeCancelRequest( req *workflowservice.RequestCancelActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -333,7 +381,9 @@ func validateRequestCancelActivityExecutionRequest( len(req.GetActivityId()), maxIDLengthLimit) } - if len(req.GetRequestId()) > maxIDLengthLimit { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. 
Length=%d Limit=%d", len(req.GetRequestId()), maxIDLengthLimit) } @@ -365,7 +415,7 @@ func validateRequestCancelActivityExecutionRequest( return nil } -func validateDeleteActivityExecutionRequest( +func validateAndNormalizeDeleteRequest( req *workflowservice.DeleteActivityExecutionRequest, maxIDLengthLimit int, ) error { @@ -388,7 +438,7 @@ func validateDeleteActivityExecutionRequest( return nil } -func validateTerminateActivityExecutionRequest( +func validateAndNormalizeTerminateRequest( req *workflowservice.TerminateActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -404,7 +454,9 @@ func validateTerminateActivityExecutionRequest( len(req.GetActivityId()), maxIDLengthLimit) } - if len(req.GetRequestId()) > maxIDLengthLimit { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", len(req.GetRequestId()), maxIDLengthLimit) } diff --git a/chasm/lib/activity/validator_test.go b/chasm/lib/activity/validator_test.go index c4ef1838039..5f035d66002 100644 --- a/chasm/lib/activity/validator_test.go +++ b/chasm/lib/activity/validator_test.go @@ -403,7 +403,7 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -423,7 +423,7 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := 
h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -450,7 +450,7 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { func(ns string) int { return payloadSize }, defaultMaxIDLengthLimit, ) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) } @@ -468,7 +468,7 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy) @@ -620,7 +620,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: defaultActivityID, } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -629,7 +629,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) 
@@ -637,7 +637,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: "", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -646,7 +646,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -656,7 +656,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "not-a-valid-uuid", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) })