diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index c013b17c245..eb738455b22 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -294,8 +294,6 @@ func (h *frontendHandler) TerminateActivityExecution( } if req.GetRequestId() == "" { - // Since this mutates the request, we clone it first so that any retries use the original request. - req = common.CloneProto(req) req.RequestId = uuid.NewString() } @@ -333,8 +331,6 @@ func (h *frontendHandler) RequestCancelActivityExecution( } if req.GetRequestId() == "" { - // Since this mutates the request, we clone it first so that any retries use the original request. - req = common.CloneProto(req) req.RequestId = uuid.NewString() } @@ -362,6 +358,9 @@ func (h *frontendHandler) validateAndPopulateStartRequest( req *workflowservice.StartActivityExecutionRequest, namespaceID namespace.ID, ) (*workflowservice.StartActivityExecutionRequest, error) { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } // Since validation includes mutation of the request, we clone it first so that any retries use the original request. 
req = common.CloneProto(req) activityType := req.ActivityType.GetName() @@ -400,10 +399,6 @@ func (h *frontendHandler) validateAndPopulateStartRequest( func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest( req *workflowservice.StartActivityExecutionRequest, ) error { - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - maxIDLengthLimit := h.config.MaxIDLengthLimit() if len(req.GetRequestId()) > maxIDLengthLimit { diff --git a/chasm/lib/activity/frontend_test.go b/chasm/lib/activity/frontend_test.go new file mode 100644 index 00000000000..9607e7d6869 --- /dev/null +++ b/chasm/lib/activity/frontend_test.go @@ -0,0 +1,65 @@ +package activity + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/namespace" + "google.golang.org/protobuf/types/known/durationpb" +) + +// TestRequestIdStableAcrossRetries verifies that a request ID is re-used +// across retries, even if server-generated. 
+func TestRequestIdStableAcrossRetries(t *testing.T) {
+	// Only config and logger are populated on the handler under test.
+	handler := &frontendHandler{
+		config: &Config{
+			BlobSizeLimitError:         defaultBlobSizeLimitError,
+			BlobSizeLimitWarn:          defaultBlobSizeLimitWarn,
+			MaxIDLengthLimit:           func() int { return defaultMaxIDLengthLimit },
+			DefaultActivityRetryPolicy: getDefaultRetrySettings,
+		},
+		logger: log.NewNoopLogger(),
+	}
+	namespaceID := namespace.ID("test-namespace-id")
+
+	// buildRequest returns a fresh start request whose RequestId is set to
+	// the supplied value (which may be empty).
+	buildRequest := func(requestID string) *workflowservice.StartActivityExecutionRequest {
+		activityType := &commonpb.ActivityType{Name: "test-type"}
+		taskQueue := &taskqueuepb.TaskQueue{Name: "test-queue"}
+		return &workflowservice.StartActivityExecutionRequest{
+			Namespace:           "test-namespace",
+			ActivityId:          "test-activity",
+			ActivityType:        activityType,
+			TaskQueue:           taskQueue,
+			StartToCloseTimeout: durationpb.New(time.Minute),
+			RequestId:           requestID,
+		}
+	}
+
+	scenarios := []struct {
+		name      string
+		requestID string
+	}{
+		{name: "server-generated", requestID: ""},
+		{name: "client-provided", requestID: "my-request-id"},
+	}
+
+	for _, sc := range scenarios {
+		t.Run(sc.name, func(t *testing.T) {
+			req := buildRequest(sc.requestID)
+
+			// Mimic two RetryableInterceptor attempts: each one calls
+			// validateAndPopulateStartRequest with the very same request pointer.
+			first, err := handler.validateAndPopulateStartRequest(req, namespaceID)
+			require.NoError(t, err)
+			require.NotEmpty(t, first.RequestId)
+
+			second, err := handler.validateAndPopulateStartRequest(req, namespaceID)
+			require.NoError(t, err)
+			require.Equal(t, first.RequestId, second.RequestId)
+		})
+	}
+}