Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 15 additions & 58 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (h *frontendHandler) DeleteActivityExecution(
return nil, ErrStandaloneActivityDisabled
}

if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
if err := validateAndNormalizeDeleteRequest(req, h.config.MaxIDLengthLimit()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -293,11 +293,7 @@ func (h *frontendHandler) TerminateActivityExecution(
return nil, err
}

if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}

if err := validateTerminateActivityExecutionRequest(
if err := validateAndNormalizeTerminateRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
Expand Down Expand Up @@ -330,11 +326,7 @@ func (h *frontendHandler) RequestCancelActivityExecution(
return nil, err
}

if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}

if err := validateRequestCancelActivityExecutionRequest(
if err := validateAndNormalizeCancelRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
Expand All @@ -358,10 +350,12 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
req *workflowservice.StartActivityExecutionRequest,
namespaceID namespace.ID,
) (*workflowservice.StartActivityExecutionRequest, error) {
// Since validation mutates the request, clone it first so that retries use the original
// request. However if the client did not set a request ID then set that before cloning so that
// retries use the same request ID.
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
}
// Since validation includes mutation of the request, we clone it first so that any retries use the original request.
req = common.CloneProto(req)
activityType := req.ActivityType.GetName()

Expand All @@ -385,57 +379,20 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
}
applyActivityOptionsToStartRequest(opts, req)

err = h.validateAndNormalizeStartActivityExecutionRequest(req)
if err != nil {
return nil, err
}

return req, nil
}

// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone
// activity specific attributes. Note that this method mutates the input params; the caller must
// clone the request if necessary (e.g. if it may be retried).
func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
req *workflowservice.StartActivityExecutionRequest,
) error {
maxIDLengthLimit := h.config.MaxIDLengthLimit()

if len(req.GetRequestId()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
len(req.GetRequestId()), maxIDLengthLimit)
}

if len(req.GetIdentity()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d",
len(req.GetIdentity()), maxIDLengthLimit)
}

if err := normalizeAndValidateIDPolicy(req); err != nil {
return err
}

if err := validateBlobSize(
req.GetActivityId(),
"StartActivityExecution",
err = validateAndNormalizeStartRequest(
req,
h.config.MaxIDLengthLimit(),
h.config.BlobSizeLimitError,
h.config.BlobSizeLimitWarn,
req.Input.Size(),
h.logger,
req.GetNamespace()); err != nil {
return serviceerror.NewInvalidArgument("input exceeds length limit")
}

if req.GetSearchAttributes() != nil {
if err := validateAndNormalizeSearchAttributes(
req,
h.saMapperProvider,
h.saValidator); err != nil {
return err
}
h.saMapperProvider,
h.saValidator,
)
if err != nil {
return nil, err
}

return nil
return req, nil
}

// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields
Expand Down
40 changes: 38 additions & 2 deletions chasm/lib/activity/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

type hasRequestID interface {
GetRequestId() string
}

// TestRequestIdStableAcrossRetries verifies that a request ID is re-used
// across retries, even if server-generated.
func TestRequestIdStableAcrossRetries(t *testing.T) {
Expand Down Expand Up @@ -55,11 +59,43 @@ func TestRequestIdStableAcrossRetries(t *testing.T) {
require.Equal(t, clone1.RequestId, clone2.RequestId)
}

t.Run("server-generated", func(t *testing.T) {
// validateTwice calls validate twice and asserts the request ID is stable.
validateTwice := func(t *testing.T, req hasRequestID, validate func() error) {
t.Helper()
require.NoError(t, validate())
require.NotEmpty(t, req.GetRequestId())
firstID := req.GetRequestId()
require.NoError(t, validate())
require.Equal(t, firstID, req.GetRequestId())
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: if you wanted to be clever and remove some duplication you could have this use an interface that only provides GetRequestId; then you can pass both StartActivityExecutionRequest and TerminateActivityExecutionRequest into it


t.Run("start/server-generated", func(t *testing.T) {
validateTwoAttempts(t, newReq(""))
})

t.Run("client-provided", func(t *testing.T) {
t.Run("start/client-provided", func(t *testing.T) {
validateTwoAttempts(t, newReq("my-request-id"))
})

t.Run("terminate/server-generated", func(t *testing.T) {
req := &workflowservice.TerminateActivityExecutionRequest{
Namespace: "test-namespace",
ActivityId: "test-activity",
}
validateTwice(t, req, func() error {
return validateAndNormalizeTerminateRequest(
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger())
})
})

t.Run("cancel/server-generated", func(t *testing.T) {
req := &workflowservice.RequestCancelActivityExecutionRequest{
Namespace: "test-namespace",
ActivityId: "test-activity",
}
validateTwice(t, req, func() error {
return validateAndNormalizeCancelRequest(
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger())
})
})
}
68 changes: 60 additions & 8 deletions chasm/lib/activity/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func validateAndNormalizeActivityAttributes(
return serviceerror.NewInvalidArgumentf("invalid priorities: %v", err)
}

return normalizeAndValidateTimeouts(activityID,
return validateAndNormalizeTimeouts(activityID,
activityType,
runTimeout,
options)
Expand All @@ -140,7 +140,7 @@ func validateActivityRetryPolicy(
return retrypolicy.Validate(retryPolicy)
}

func normalizeAndValidateTimeouts(
func validateAndNormalizeTimeouts(
activityID string,
activityType string,
runTimeout *durationpb.Duration,
Expand Down Expand Up @@ -208,7 +208,7 @@ func normalizeAndValidateTimeouts(
return nil
}

func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error {
func validateAndNormalizeIDPolicy(req *workflowservice.StartActivityExecutionRequest) error {
if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED {
req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE
}
Expand Down Expand Up @@ -317,7 +317,55 @@ func validatePollActivityExecutionRequest(
return nil
}

func validateRequestCancelActivityExecutionRequest(
func validateAndNormalizeStartRequest(
req *workflowservice.StartActivityExecutionRequest,
maxIDLengthLimit int,
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter,
logger log.Logger,
saMapperProvider searchattribute.MapperProvider,
saValidator *searchattribute.Validator,
) error {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can never happen because validateAndPopulateStartRequest already does that?

Copy link
Copy Markdown
Contributor Author

@dandavison dandavison Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot. It's true, and this is questionable, but it bothered me to have the cancel and terminate ones doing it and the start one not doing it. That would mean the start one "knows" about what's going on in its upstream caller. So I decided to pay one instruction (or whatever) here and get consistency and future-proofing in return. But lmk if you don't like it.

} else if len(req.GetRequestId()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
len(req.GetRequestId()), maxIDLengthLimit)
}

if len(req.GetIdentity()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d",
len(req.GetIdentity()), maxIDLengthLimit)
}

if err := validateAndNormalizeIDPolicy(req); err != nil {
return err
}

if err := validateBlobSize(
req.GetActivityId(),
"StartActivityExecution",
blobSizeLimitError,
blobSizeLimitWarn,
req.Input.Size(),
logger,
req.GetNamespace()); err != nil {
return serviceerror.NewInvalidArgument("input exceeds length limit")
}

if req.GetSearchAttributes() != nil {
if err := validateAndNormalizeSearchAttributes(
req,
saMapperProvider,
saValidator); err != nil {
return err
}
}

return nil
}

func validateAndNormalizeCancelRequest(
req *workflowservice.RequestCancelActivityExecutionRequest,
maxIDLengthLimit int,
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
Expand All @@ -333,7 +381,9 @@ func validateRequestCancelActivityExecutionRequest(
len(req.GetActivityId()), maxIDLengthLimit)
}

if len(req.GetRequestId()) > maxIDLengthLimit {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
} else if len(req.GetRequestId()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
len(req.GetRequestId()), maxIDLengthLimit)
}
Expand Down Expand Up @@ -365,7 +415,7 @@ func validateRequestCancelActivityExecutionRequest(
return nil
}

func validateDeleteActivityExecutionRequest(
func validateAndNormalizeDeleteRequest(
req *workflowservice.DeleteActivityExecutionRequest,
maxIDLengthLimit int,
) error {
Expand All @@ -388,7 +438,7 @@ func validateDeleteActivityExecutionRequest(
return nil
}

func validateTerminateActivityExecutionRequest(
func validateAndNormalizeTerminateRequest(
req *workflowservice.TerminateActivityExecutionRequest,
maxIDLengthLimit int,
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
Expand All @@ -404,7 +454,9 @@ func validateTerminateActivityExecutionRequest(
len(req.GetActivityId()), maxIDLengthLimit)
}

if len(req.GetRequestId()) > maxIDLengthLimit {
if req.GetRequestId() == "" {
req.RequestId = uuid.NewString()
} else if len(req.GetRequestId()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
len(req.GetRequestId()), maxIDLengthLimit)
}
Expand Down
18 changes: 9 additions & 9 deletions chasm/lib/activity/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) {
}

h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
}
Expand All @@ -423,7 +423,7 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) {
}

h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
}
Expand All @@ -450,7 +450,7 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) {
func(ns string) int { return payloadSize },
defaultMaxIDLengthLimit,
)
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
require.NoError(t, err)
}

Expand All @@ -468,7 +468,7 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) {
}

h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)

require.NoError(t, err)
require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy)
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
req := &workflowservice.DeleteActivityExecutionRequest{
ActivityId: defaultActivityID,
}
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
require.NoError(t, err)
})

Expand All @@ -629,15 +629,15 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
ActivityId: defaultActivityID,
RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479",
}
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
require.NoError(t, err)
})

t.Run("EmptyActivityID", func(t *testing.T) {
req := &workflowservice.DeleteActivityExecutionRequest{
ActivityId: "",
}
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
})
Expand All @@ -646,7 +646,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
req := &workflowservice.DeleteActivityExecutionRequest{
ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)),
}
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
})
Expand All @@ -656,7 +656,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
ActivityId: defaultActivityID,
RunId: "not-a-valid-uuid",
}
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
})
Expand Down
Loading