Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,6 @@ func (h *frontendHandler) TerminateActivityExecution(
}

if req.GetRequestId() == "" {
// Since this mutates the request, we clone it first so that any retries use the original request.
req = common.CloneProto(req)
req.RequestId = uuid.NewString()
}

Expand Down Expand Up @@ -333,8 +331,6 @@ func (h *frontendHandler) RequestCancelActivityExecution(
}

if req.GetRequestId() == "" {
// Since this mutates the request, we clone it first so that any retries use the original request.
req = common.CloneProto(req)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, in my PR now I've moved this into the validation and normalization methods like this:

    if req.GetRequestId() == "" {
		req.RequestId = uuid.NewString()
	} else if len(req.GetRequestId()) > config.MaxIDLengthLimit() {
		return serviceerror.NewInvalidArgumentf("request_id exceeds length limit. Length=%d Limit=%d",
			len(req.GetRequestId()), config.MaxIDLengthLimit())
	}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened a pure refactoring PR to make SAA more consistent with SNO #9795

req.RequestId = uuid.NewString()
}

Expand Down Expand Up @@ -362,6 +358,9 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
req *workflowservice.StartActivityExecutionRequest,
namespaceID namespace.ID,
) (*workflowservice.StartActivityExecutionRequest, error) {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}
// Since validation includes mutation of the request, we clone it first so that any retries use the original request.
req = common.CloneProto(req)
activityType := req.ActivityType.GetName()
Expand Down Expand Up @@ -400,10 +399,6 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
req *workflowservice.StartActivityExecutionRequest,
) error {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}

maxIDLengthLimit := h.config.MaxIDLengthLimit()

if len(req.GetRequestId()) > maxIDLengthLimit {
Expand Down
65 changes: 65 additions & 0 deletions chasm/lib/activity/frontend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package activity

import (
"testing"
"time"

"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"google.golang.org/protobuf/types/known/durationpb"
)

// TestRequestIdStableAcrossRetries verifies that a request ID is re-used
// across retries, even if server-generated.
func TestRequestIdStableAcrossRetries(t *testing.T) {
h := &frontendHandler{
config: &Config{
BlobSizeLimitError: defaultBlobSizeLimitError,
BlobSizeLimitWarn: defaultBlobSizeLimitWarn,
MaxIDLengthLimit: func() int { return defaultMaxIDLengthLimit },
DefaultActivityRetryPolicy: getDefaultRetrySettings,
},
logger: log.NewNoopLogger(),
}
nsID := namespace.ID("test-namespace-id")

newReq := func(requestId string) *workflowservice.StartActivityExecutionRequest {
return &workflowservice.StartActivityExecutionRequest{
Namespace: "test-namespace",
ActivityId: "test-activity",
ActivityType: &commonpb.ActivityType{
Name: "test-type",
},
TaskQueue: &taskqueuepb.TaskQueue{
Name: "test-queue",
},
StartToCloseTimeout: durationpb.New(time.Minute),
RequestId: requestId,
}
}

// Simulate two RetryableInterceptor attempts: both call
// validateAndPopulateStartRequest with the same request pointer.
validateTwoAttempts := func(t *testing.T, req *workflowservice.StartActivityExecutionRequest) {
t.Helper()
clone1, err := h.validateAndPopulateStartRequest(req, nsID)
require.NoError(t, err)
require.NotEmpty(t, clone1.RequestId)

clone2, err := h.validateAndPopulateStartRequest(req, nsID)
require.NoError(t, err)
require.Equal(t, clone1.RequestId, clone2.RequestId)
}

t.Run("server-generated", func(t *testing.T) {
validateTwoAttempts(t, newReq(""))
})

t.Run("client-provided", func(t *testing.T) {
validateTwoAttempts(t, newReq("my-request-id"))
})
}
Loading