diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index eb738455b22..f2633672037 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -258,7 +258,7 @@ func (h *frontendHandler) DeleteActivityExecution( return nil, ErrStandaloneActivityDisabled } - if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil { + if err := validateAndNormalizeDeleteRequest(req, h.config.MaxIDLengthLimit()); err != nil { return nil, err } @@ -293,11 +293,7 @@ func (h *frontendHandler) TerminateActivityExecution( return nil, err } - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - - if err := validateTerminateActivityExecutionRequest( + if err := validateAndNormalizeTerminateRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -330,11 +326,7 @@ func (h *frontendHandler) RequestCancelActivityExecution( return nil, err } - if req.GetRequestId() == "" { - req.RequestId = uuid.NewString() - } - - if err := validateRequestCancelActivityExecutionRequest( + if err := validateAndNormalizeCancelRequest( req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, @@ -358,10 +350,12 @@ func (h *frontendHandler) validateAndPopulateStartRequest( req *workflowservice.StartActivityExecutionRequest, namespaceID namespace.ID, ) (*workflowservice.StartActivityExecutionRequest, error) { + // Since validation mutates the request, clone it first so that retries use the original + // request. However if the client did not set a request ID then set that before cloning so that + // retries use the same request ID. if req.GetRequestId() == "" { req.RequestId = uuid.NewString() } - // Since validation includes mutation of the request, we clone it first so that any retries use the original request. req = common.CloneProto(req) activityType := req.ActivityType.GetName() @@ -385,57 +379,20 @@ func (h *frontendHandler) validateAndPopulateStartRequest( } applyActivityOptionsToStartRequest(opts, req) - err = h.validateAndNormalizeStartActivityExecutionRequest(req) - if err != nil { - return nil, err - } - - return req, nil -} - -// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone -// activity specific attributes. Note that this method mutates the input params; the caller must -// clone the request if necessary (e.g. if it may be retried). -func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest( - req *workflowservice.StartActivityExecutionRequest, -) error { - maxIDLengthLimit := h.config.MaxIDLengthLimit() - - if len(req.GetRequestId()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", - len(req.GetRequestId()), maxIDLengthLimit) - } - - if len(req.GetIdentity()) > maxIDLengthLimit { - return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", - len(req.GetIdentity()), maxIDLengthLimit) - } - - if err := normalizeAndValidateIDPolicy(req); err != nil { - return err - } - - if err := validateBlobSize( - req.GetActivityId(), - "StartActivityExecution", + err = validateAndNormalizeStartRequest( + req, + h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, - req.Input.Size(), h.logger, - req.GetNamespace()); err != nil { - return serviceerror.NewInvalidArgument("input exceeds length limit") - } - - if req.GetSearchAttributes() != nil { - if err := validateAndNormalizeSearchAttributes( - req, - h.saMapperProvider, - h.saValidator); err != nil { - return err - } + h.saMapperProvider, + h.saValidator, + ) + if err != nil { + return nil, err } - return nil + return req, nil } // activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields diff --git a/chasm/lib/activity/frontend_test.go b/chasm/lib/activity/frontend_test.go index 9607e7d6869..9dc13cda79c 100644 --- a/chasm/lib/activity/frontend_test.go +++ b/chasm/lib/activity/frontend_test.go @@ -13,6 +13,10 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +type hasRequestID interface { + GetRequestId() string +} + // TestRequestIdStableAcrossRetries verifies that a request ID is re-used // across retries, even if server-generated. func TestRequestIdStableAcrossRetries(t *testing.T) { @@ -55,11 +59,43 @@ func TestRequestIdStableAcrossRetries(t *testing.T) { require.Equal(t, clone1.RequestId, clone2.RequestId) } - t.Run("server-generated", func(t *testing.T) { + // validateTwice calls validate twice and asserts the request ID is stable. + validateTwice := func(t *testing.T, req hasRequestID, validate func() error) { + t.Helper() + require.NoError(t, validate()) + require.NotEmpty(t, req.GetRequestId()) + firstID := req.GetRequestId() + require.NoError(t, validate()) + require.Equal(t, firstID, req.GetRequestId()) + } + + t.Run("start/server-generated", func(t *testing.T) { validateTwoAttempts(t, newReq("")) }) - t.Run("client-provided", func(t *testing.T) { + t.Run("start/client-provided", func(t *testing.T) { validateTwoAttempts(t, newReq("my-request-id")) }) + + t.Run("terminate/server-generated", func(t *testing.T) { + req := &workflowservice.TerminateActivityExecutionRequest{ + Namespace: "test-namespace", + ActivityId: "test-activity", + } + validateTwice(t, req, func() error { + return validateAndNormalizeTerminateRequest( + req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + }) + }) + + t.Run("cancel/server-generated", func(t *testing.T) { + req := &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: "test-namespace", + ActivityId: "test-activity", + } + validateTwice(t, req, func() error { + return validateAndNormalizeCancelRequest( + req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()) + }) + }) } diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 3bc1883d374..fab57845c3b 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -120,7 +120,7 @@ func validateAndNormalizeActivityAttributes( return serviceerror.NewInvalidArgumentf("invalid priorities: %v", err) } - return normalizeAndValidateTimeouts(activityID, + return validateAndNormalizeTimeouts(activityID, activityType, runTimeout, options) @@ -140,7 +140,7 @@ func validateActivityRetryPolicy( return retrypolicy.Validate(retryPolicy) } -func normalizeAndValidateTimeouts( +func validateAndNormalizeTimeouts( activityID string, activityType string, runTimeout *durationpb.Duration, @@ -208,7 +208,7 @@ func normalizeAndValidateTimeouts( return nil } -func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { +func validateAndNormalizeIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED { req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE } @@ -317,7 +317,55 @@ func validatePollActivityExecutionRequest( return nil } -func validateRequestCancelActivityExecutionRequest( +func validateAndNormalizeStartRequest( + req *workflowservice.StartActivityExecutionRequest, + maxIDLengthLimit int, + blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, + blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter, + logger log.Logger, + saMapperProvider searchattribute.MapperProvider, + saValidator *searchattribute.Validator, +) error { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", + len(req.GetRequestId()), maxIDLengthLimit) + } + + if len(req.GetIdentity()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d", + len(req.GetIdentity()), maxIDLengthLimit) + } + + if err := validateAndNormalizeIDPolicy(req); err != nil { + return err + } + + if err := validateBlobSize( + req.GetActivityId(), + "StartActivityExecution", + blobSizeLimitError, + blobSizeLimitWarn, + req.Input.Size(), + logger, + req.GetNamespace()); err != nil { + return serviceerror.NewInvalidArgument("input exceeds length limit") + } + + if req.GetSearchAttributes() != nil { + if err := validateAndNormalizeSearchAttributes( + req, + saMapperProvider, + saValidator); err != nil { + return err + } + } + + return nil +} + +func validateAndNormalizeCancelRequest( req *workflowservice.RequestCancelActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -333,7 +381,9 @@ func validateRequestCancelActivityExecutionRequest( len(req.GetActivityId()), maxIDLengthLimit) } - if len(req.GetRequestId()) > maxIDLengthLimit { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", len(req.GetRequestId()), maxIDLengthLimit) } @@ -365,7 +415,7 @@ func validateRequestCancelActivityExecutionRequest( return nil } -func validateDeleteActivityExecutionRequest( +func validateAndNormalizeDeleteRequest( req *workflowservice.DeleteActivityExecutionRequest, maxIDLengthLimit int, ) error { @@ -388,7 +438,7 @@ func validateDeleteActivityExecutionRequest( return nil } -func validateTerminateActivityExecutionRequest( +func validateAndNormalizeTerminateRequest( req *workflowservice.TerminateActivityExecutionRequest, maxIDLengthLimit int, blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -404,7 +454,9 @@ func validateTerminateActivityExecutionRequest( len(req.GetActivityId()), maxIDLengthLimit) } - if len(req.GetRequestId()) > maxIDLengthLimit { + if req.GetRequestId() == "" { + req.RequestId = uuid.NewString() + } else if len(req.GetRequestId()) > maxIDLengthLimit { return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d", len(req.GetRequestId()), maxIDLengthLimit) } diff --git a/chasm/lib/activity/validator_test.go b/chasm/lib/activity/validator_test.go index c4ef1838039..5f035d66002 100644 --- a/chasm/lib/activity/validator_test.go +++ b/chasm/lib/activity/validator_test.go @@ -403,7 +403,7 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -423,7 +423,7 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) } @@ -450,7 +450,7 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { func(ns string) int { return payloadSize }, defaultMaxIDLengthLimit, ) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) } @@ -468,7 +468,7 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) { } h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit) - err := h.validateAndNormalizeStartActivityExecutionRequest(req) + err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator) require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy) @@ -620,7 +620,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: defaultActivityID, } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -629,7 +629,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) require.NoError(t, err) }) @@ -637,7 +637,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: "", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -646,7 +646,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { req := &workflowservice.DeleteActivityExecutionRequest{ ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)), } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) }) @@ -656,7 +656,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) { ActivityId: defaultActivityID, RunId: "not-a-valid-uuid", } - err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit) + err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit) var invalidArgErr *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArgErr) })