Skip to content

Commit 194be87

Browse files
committed
Refactor and rename for consistence with Standalone Nexus Operations
- Move request-ID generation into validation functions - Rename delete method
1 parent 885f60d commit 194be87

4 files changed

Lines changed: 112 additions & 75 deletions

File tree

chasm/lib/activity/frontend.go

Lines changed: 15 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (h *frontendHandler) DeleteActivityExecution(
258258
return nil, ErrStandaloneActivityDisabled
259259
}
260260

261-
if err := validateDeleteActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
261+
if err := validateAndNormalizeDeleteRequest(req, h.config.MaxIDLengthLimit()); err != nil {
262262
return nil, err
263263
}
264264

@@ -293,11 +293,7 @@ func (h *frontendHandler) TerminateActivityExecution(
293293
return nil, err
294294
}
295295

296-
if req.GetRequestId() == "" {
297-
req.RequestId = uuid.NewString()
298-
}
299-
300-
if err := validateTerminateActivityExecutionRequest(
296+
if err := validateAndNormalizeTerminateRequest(
301297
req,
302298
h.config.MaxIDLengthLimit(),
303299
h.config.BlobSizeLimitError,
@@ -330,11 +326,7 @@ func (h *frontendHandler) RequestCancelActivityExecution(
330326
return nil, err
331327
}
332328

333-
if req.GetRequestId() == "" {
334-
req.RequestId = uuid.NewString()
335-
}
336-
337-
if err := validateRequestCancelActivityExecutionRequest(
329+
if err := validateAndNormalizeCancelRequest(
338330
req,
339331
h.config.MaxIDLengthLimit(),
340332
h.config.BlobSizeLimitError,
@@ -358,10 +350,12 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
358350
req *workflowservice.StartActivityExecutionRequest,
359351
namespaceID namespace.ID,
360352
) (*workflowservice.StartActivityExecutionRequest, error) {
353+
// Since validation mutates the request, clone it first so that retries use the original
354+
// request. However if the client did not set a request ID then set that before cloning so that
355+
// retries use the same request ID.
361356
if req.GetRequestId() == "" {
362357
req.RequestId = uuid.NewString()
363358
}
364-
// Since validation includes mutation of the request, we clone it first so that any retries use the original request.
365359
req = common.CloneProto(req)
366360
activityType := req.ActivityType.GetName()
367361

@@ -385,57 +379,20 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
385379
}
386380
applyActivityOptionsToStartRequest(opts, req)
387381

388-
err = h.validateAndNormalizeStartActivityExecutionRequest(req)
389-
if err != nil {
390-
return nil, err
391-
}
392-
393-
return req, nil
394-
}
395-
396-
// validateAndNormalizeStartActivityExecutionRequest validates and normalizes the standalone
397-
// activity specific attributes. Note that this method mutates the input params; the caller must
398-
// clone the request if necessary (e.g. if it may be retried).
399-
func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
400-
req *workflowservice.StartActivityExecutionRequest,
401-
) error {
402-
maxIDLengthLimit := h.config.MaxIDLengthLimit()
403-
404-
if len(req.GetRequestId()) > maxIDLengthLimit {
405-
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
406-
len(req.GetRequestId()), maxIDLengthLimit)
407-
}
408-
409-
if len(req.GetIdentity()) > maxIDLengthLimit {
410-
return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d",
411-
len(req.GetIdentity()), maxIDLengthLimit)
412-
}
413-
414-
if err := normalizeAndValidateIDPolicy(req); err != nil {
415-
return err
416-
}
417-
418-
if err := validateBlobSize(
419-
req.GetActivityId(),
420-
"StartActivityExecution",
382+
err = validateAndNormalizeStartRequest(
383+
req,
384+
h.config.MaxIDLengthLimit(),
421385
h.config.BlobSizeLimitError,
422386
h.config.BlobSizeLimitWarn,
423-
req.Input.Size(),
424387
h.logger,
425-
req.GetNamespace()); err != nil {
426-
return serviceerror.NewInvalidArgument("input exceeds length limit")
427-
}
428-
429-
if req.GetSearchAttributes() != nil {
430-
if err := validateAndNormalizeSearchAttributes(
431-
req,
432-
h.saMapperProvider,
433-
h.saValidator); err != nil {
434-
return err
435-
}
388+
h.saMapperProvider,
389+
h.saValidator,
390+
)
391+
if err != nil {
392+
return nil, err
436393
}
437394

438-
return nil
395+
return req, nil
439396
}
440397

441398
// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields

chasm/lib/activity/frontend_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,32 @@ func TestRequestIdStableAcrossRetries(t *testing.T) {
6262
t.Run("client-provided", func(t *testing.T) {
6363
validateTwoAttempts(t, newReq("my-request-id"))
6464
})
65+
66+
t.Run("terminate/server-generated", func(t *testing.T) {
67+
req := &workflowservice.TerminateActivityExecutionRequest{
68+
Namespace: "test-namespace",
69+
ActivityId: "test-activity",
70+
}
71+
require.NoError(t, validateAndNormalizeTerminateRequest(
72+
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()))
73+
require.NotEmpty(t, req.RequestId)
74+
firstID := req.RequestId
75+
require.NoError(t, validateAndNormalizeTerminateRequest(
76+
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()))
77+
require.Equal(t, firstID, req.RequestId)
78+
})
79+
80+
t.Run("cancel/server-generated", func(t *testing.T) {
81+
req := &workflowservice.RequestCancelActivityExecutionRequest{
82+
Namespace: "test-namespace",
83+
ActivityId: "test-activity",
84+
}
85+
require.NoError(t, validateAndNormalizeCancelRequest(
86+
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()))
87+
require.NotEmpty(t, req.RequestId)
88+
firstID := req.RequestId
89+
require.NoError(t, validateAndNormalizeCancelRequest(
90+
req, defaultMaxIDLengthLimit, defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, log.NewNoopLogger()))
91+
require.Equal(t, firstID, req.RequestId)
92+
})
6593
}

chasm/lib/activity/validator.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func validateAndNormalizeActivityAttributes(
120120
return serviceerror.NewInvalidArgumentf("invalid priorities: %v", err)
121121
}
122122

123-
return normalizeAndValidateTimeouts(activityID,
123+
return validateAndNormalizeTimeouts(activityID,
124124
activityType,
125125
runTimeout,
126126
options)
@@ -140,7 +140,7 @@ func validateActivityRetryPolicy(
140140
return retrypolicy.Validate(retryPolicy)
141141
}
142142

143-
func normalizeAndValidateTimeouts(
143+
func validateAndNormalizeTimeouts(
144144
activityID string,
145145
activityType string,
146146
runTimeout *durationpb.Duration,
@@ -208,7 +208,7 @@ func normalizeAndValidateTimeouts(
208208
return nil
209209
}
210210

211-
func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error {
211+
func validateAndNormalizeIDPolicy(req *workflowservice.StartActivityExecutionRequest) error {
212212
if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED {
213213
req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE
214214
}
@@ -317,7 +317,55 @@ func validatePollActivityExecutionRequest(
317317
return nil
318318
}
319319

320-
func validateRequestCancelActivityExecutionRequest(
320+
func validateAndNormalizeStartRequest(
321+
req *workflowservice.StartActivityExecutionRequest,
322+
maxIDLengthLimit int,
323+
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
324+
blobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter,
325+
logger log.Logger,
326+
saMapperProvider searchattribute.MapperProvider,
327+
saValidator *searchattribute.Validator,
328+
) error {
329+
if req.GetRequestId() == "" {
330+
req.RequestId = uuid.NewString()
331+
} else if len(req.GetRequestId()) > maxIDLengthLimit {
332+
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
333+
len(req.GetRequestId()), maxIDLengthLimit)
334+
}
335+
336+
if len(req.GetIdentity()) > maxIDLengthLimit {
337+
return serviceerror.NewInvalidArgumentf("identity exceeds length limit. Length=%d Limit=%d",
338+
len(req.GetIdentity()), maxIDLengthLimit)
339+
}
340+
341+
if err := validateAndNormalizeIDPolicy(req); err != nil {
342+
return err
343+
}
344+
345+
if err := validateBlobSize(
346+
req.GetActivityId(),
347+
"StartActivityExecution",
348+
blobSizeLimitError,
349+
blobSizeLimitWarn,
350+
req.Input.Size(),
351+
logger,
352+
req.GetNamespace()); err != nil {
353+
return serviceerror.NewInvalidArgument("input exceeds length limit")
354+
}
355+
356+
if req.GetSearchAttributes() != nil {
357+
if err := validateAndNormalizeSearchAttributes(
358+
req,
359+
saMapperProvider,
360+
saValidator); err != nil {
361+
return err
362+
}
363+
}
364+
365+
return nil
366+
}
367+
368+
func validateAndNormalizeCancelRequest(
321369
req *workflowservice.RequestCancelActivityExecutionRequest,
322370
maxIDLengthLimit int,
323371
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
@@ -333,7 +381,9 @@ func validateRequestCancelActivityExecutionRequest(
333381
len(req.GetActivityId()), maxIDLengthLimit)
334382
}
335383

336-
if len(req.GetRequestId()) > maxIDLengthLimit {
384+
if req.GetRequestId() == "" {
385+
req.RequestId = uuid.NewString()
386+
} else if len(req.GetRequestId()) > maxIDLengthLimit {
337387
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
338388
len(req.GetRequestId()), maxIDLengthLimit)
339389
}
@@ -365,7 +415,7 @@ func validateRequestCancelActivityExecutionRequest(
365415
return nil
366416
}
367417

368-
func validateDeleteActivityExecutionRequest(
418+
func validateAndNormalizeDeleteRequest(
369419
req *workflowservice.DeleteActivityExecutionRequest,
370420
maxIDLengthLimit int,
371421
) error {
@@ -388,7 +438,7 @@ func validateDeleteActivityExecutionRequest(
388438
return nil
389439
}
390440

391-
func validateTerminateActivityExecutionRequest(
441+
func validateAndNormalizeTerminateRequest(
392442
req *workflowservice.TerminateActivityExecutionRequest,
393443
maxIDLengthLimit int,
394444
blobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter,
@@ -404,7 +454,9 @@ func validateTerminateActivityExecutionRequest(
404454
len(req.GetActivityId()), maxIDLengthLimit)
405455
}
406456

407-
if len(req.GetRequestId()) > maxIDLengthLimit {
457+
if req.GetRequestId() == "" {
458+
req.RequestId = uuid.NewString()
459+
} else if len(req.GetRequestId()) > maxIDLengthLimit {
408460
return serviceerror.NewInvalidArgumentf("request ID exceeds length limit. Length=%d Limit=%d",
409461
len(req.GetRequestId()), maxIDLengthLimit)
410462
}

chasm/lib/activity/validator_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) {
403403
}
404404

405405
h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
406-
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
406+
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
407407
var invalidArgErr *serviceerror.InvalidArgument
408408
require.ErrorAs(t, err, &invalidArgErr)
409409
}
@@ -423,7 +423,7 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) {
423423
}
424424

425425
h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
426-
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
426+
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
427427
var invalidArgErr *serviceerror.InvalidArgument
428428
require.ErrorAs(t, err, &invalidArgErr)
429429
}
@@ -450,7 +450,7 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) {
450450
func(ns string) int { return payloadSize },
451451
defaultMaxIDLengthLimit,
452452
)
453-
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
453+
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
454454
require.NoError(t, err)
455455
}
456456

@@ -468,7 +468,7 @@ func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) {
468468
}
469469

470470
h := newTestFrontendHandler(defaultBlobSizeLimitError, defaultBlobSizeLimitWarn, defaultMaxIDLengthLimit)
471-
err := h.validateAndNormalizeStartActivityExecutionRequest(req)
471+
err := validateAndNormalizeStartRequest(req, h.config.MaxIDLengthLimit(), h.config.BlobSizeLimitError, h.config.BlobSizeLimitWarn, h.logger, h.saMapperProvider, h.saValidator)
472472

473473
require.NoError(t, err)
474474
require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy)
@@ -620,7 +620,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
620620
req := &workflowservice.DeleteActivityExecutionRequest{
621621
ActivityId: defaultActivityID,
622622
}
623-
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
623+
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
624624
require.NoError(t, err)
625625
})
626626

@@ -629,15 +629,15 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
629629
ActivityId: defaultActivityID,
630630
RunId: "f47ac10b-58cc-4372-a567-0e02b2c3d479",
631631
}
632-
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
632+
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
633633
require.NoError(t, err)
634634
})
635635

636636
t.Run("EmptyActivityID", func(t *testing.T) {
637637
req := &workflowservice.DeleteActivityExecutionRequest{
638638
ActivityId: "",
639639
}
640-
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
640+
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
641641
var invalidArgErr *serviceerror.InvalidArgument
642642
require.ErrorAs(t, err, &invalidArgErr)
643643
})
@@ -646,7 +646,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
646646
req := &workflowservice.DeleteActivityExecutionRequest{
647647
ActivityId: string(make([]byte, defaultMaxIDLengthLimit+1)),
648648
}
649-
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
649+
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
650650
var invalidArgErr *serviceerror.InvalidArgument
651651
require.ErrorAs(t, err, &invalidArgErr)
652652
})
@@ -656,7 +656,7 @@ func TestValidateDeleteActivityExecutionRequest(t *testing.T) {
656656
ActivityId: defaultActivityID,
657657
RunId: "not-a-valid-uuid",
658658
}
659-
err := validateDeleteActivityExecutionRequest(req, defaultMaxIDLengthLimit)
659+
err := validateAndNormalizeDeleteRequest(req, defaultMaxIDLengthLimit)
660660
var invalidArgErr *serviceerror.InvalidArgument
661661
require.ErrorAs(t, err, &invalidArgErr)
662662
})

0 commit comments

Comments
 (0)