Skip to content

Commit f8bbf7c

Browse files
authored
Standalone Activity: preserve server-generated request IDs across restarts (#9724)
## What changed? When generating a request ID server-side, set it on the request struct before any cloning so that the mutation is reused by all retries. ## Why? Without this, there is a bug, although I have not attempted to reproduce it: 1. Request arrives at Frontend without `requestID` 2. Inside the retry interceptor loop, `requestID` is generated and set on a cloned copy 3. Request proceeds to history, starts the execution, but then some networking condition in the cell causes `RetryableInterceptor` not to receive the Ack (it sees a context expiry) 4. Frontend retries, **generating a new request ID**. But meanwhile the activity has completed. This would be rare, but technically possible. 5. The default reuse policy permits a second execution to be started. This would be a bug: the second start should have been prevented by the request ID. If the user's activity lacks idempotency protection it will lead to incorrectness in the user's systems. ## How did you test it? - [x] built - [x] added weak new unit test(s) for the Start case. - [x] manually tested: ```diff commit fa2476c Author: Dan Davison <dan.davison@temporal.io> Date: 2 days ago Not-for-merge functional test for request-ID stability across server retries Use a package-level atomic to fail StartActivityExecution once after the activity is created at history, triggering the RetryableInterceptor. Without the fix, the retry generates a new request ID and gets ActivityExecutionAlreadyStarted. With the fix, the retry reuses the same request ID and the dedup succeeds. 
diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index c013b17..2a7b96b 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -2,6 +2,7 @@ import ( "context" + "sync/atomic" "github.com/google/uuid" apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas @@ -35,6 +36,10 @@ type FrontendHandler interface { var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled") +// TestStartFailOnce, when set to true, causes the next StartActivityExecution to return Unavailable +// after the activity is created. It fires once (CAS to false). +var TestStartFailOnce atomic.Bool + type frontendHandler struct { FrontendHandler client activitypb.ActivityServiceClient @@ -100,6 +105,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf NamespaceId: namespaceID.String(), FrontendRequest: modifiedReq, }) + if err != nil { + return nil, err + } + + if TestStartFailOnce.CompareAndSwap(true, false) { + return nil, serviceerror.NewUnavailable("test: injected failure after successful creation") + } return resp.GetFrontendResponse(), err } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 6afc7b6..d5ded28 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -270,6 +270,36 @@ func (s *standaloneActivityTestSuite) TestIDConflictPolicy() { }) } +func (s *standaloneActivityTestSuite) TestServerGeneratedRequestIDStableAcrossRetries() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + // Make the handler fail once with a retryable error so the RetryableInterceptor retries. 
+ activity.TestStartFailOnce.Store(true) + + resp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + // No RequestId — server generates one. + }) + // With the fix, the retry uses the same request ID, so history recognizes it as a dedup + // and succeeds (with Started=false). Without the fix, the retry generates a new request ID + // and gets ActivityExecutionAlreadyStarted. + require.NoError(t, err) + require.NotNil(t, resp) +} + func (s *standaloneActivityTestSuite) TestPollActivityTaskQueue() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) ``` ## Potential risks Could introduce incorrectness into Standalone Activity <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes request ID generation semantics for standalone activity Start/Cancel/Terminate paths to improve deduplication across retries; risk is moderate because it touches request mutation behavior that affects idempotency and retry interactions. > > **Overview** > Ensures standalone activity requests reuse a **single** `RequestId` across frontend retries by generating the server-side ID *before* cloning/mutating the request (so subsequent retry attempts see the same ID). > > Removes the prior pre-mutation cloning for `TerminateActivityExecution` and `RequestCancelActivityExecution` request-ID population, and adds a unit test (`frontend_test.go`) asserting `StartActivityExecution` keeps a stable `RequestId` across multiple `validateAndPopulateStartRequest` calls. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 885f60d. This will update automatically on new commits. 
Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 268e312 commit f8bbf7c

2 files changed

Lines changed: 68 additions & 8 deletions

File tree

chasm/lib/activity/frontend.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,6 @@ func (h *frontendHandler) TerminateActivityExecution(
294294
}
295295

296296
if req.GetRequestId() == "" {
297-
// Since this mutates the request, we clone it first so that any retries use the original request.
298-
req = common.CloneProto(req)
299297
req.RequestId = uuid.NewString()
300298
}
301299

@@ -333,8 +331,6 @@ func (h *frontendHandler) RequestCancelActivityExecution(
333331
}
334332

335333
if req.GetRequestId() == "" {
336-
// Since this mutates the request, we clone it first so that any retries use the original request.
337-
req = common.CloneProto(req)
338334
req.RequestId = uuid.NewString()
339335
}
340336

@@ -362,6 +358,9 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
362358
req *workflowservice.StartActivityExecutionRequest,
363359
namespaceID namespace.ID,
364360
) (*workflowservice.StartActivityExecutionRequest, error) {
361+
if req.GetRequestId() == "" {
362+
req.RequestId = uuid.NewString()
363+
}
365364
// Since validation includes mutation of the request, we clone it first so that any retries use the original request.
366365
req = common.CloneProto(req)
367366
activityType := req.ActivityType.GetName()
@@ -400,10 +399,6 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
400399
func (h *frontendHandler) validateAndNormalizeStartActivityExecutionRequest(
401400
req *workflowservice.StartActivityExecutionRequest,
402401
) error {
403-
if req.GetRequestId() == "" {
404-
req.RequestId = uuid.NewString()
405-
}
406-
407402
maxIDLengthLimit := h.config.MaxIDLengthLimit()
408403

409404
if len(req.GetRequestId()) > maxIDLengthLimit {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package activity
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
commonpb "go.temporal.io/api/common/v1"
9+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
10+
"go.temporal.io/api/workflowservice/v1"
11+
"go.temporal.io/server/common/log"
12+
"go.temporal.io/server/common/namespace"
13+
"google.golang.org/protobuf/types/known/durationpb"
14+
)
15+
16+
// TestRequestIdStableAcrossRetries verifies that validateAndPopulateStartRequest yields the same
17+
// RequestId when called twice on the same request pointer, whether the ID is server-generated or client-provided.
18+
func TestRequestIdStableAcrossRetries(t *testing.T) {
19+
h := &frontendHandler{
20+
config: &Config{
21+
BlobSizeLimitError: defaultBlobSizeLimitError,
22+
BlobSizeLimitWarn: defaultBlobSizeLimitWarn,
23+
MaxIDLengthLimit: func() int { return defaultMaxIDLengthLimit },
24+
DefaultActivityRetryPolicy: getDefaultRetrySettings,
25+
},
26+
logger: log.NewNoopLogger(),
27+
}
28+
nsID := namespace.ID("test-namespace-id")
29+
30+
newReq := func(requestId string) *workflowservice.StartActivityExecutionRequest { // builds a fresh start request; pass "" to leave RequestId empty
31+
return &workflowservice.StartActivityExecutionRequest{
32+
Namespace: "test-namespace",
33+
ActivityId: "test-activity",
34+
ActivityType: &commonpb.ActivityType{
35+
Name: "test-type",
36+
},
37+
TaskQueue: &taskqueuepb.TaskQueue{
38+
Name: "test-queue",
39+
},
40+
StartToCloseTimeout: durationpb.New(time.Minute),
41+
RequestId: requestId,
42+
}
43+
}
44+
45+
// validateTwoAttempts simulates two RetryableInterceptor attempts: both call
46+
// validateAndPopulateStartRequest with the same request pointer and must return equal RequestIds.
47+
validateTwoAttempts := func(t *testing.T, req *workflowservice.StartActivityExecutionRequest) {
48+
t.Helper()
49+
clone1, err := h.validateAndPopulateStartRequest(req, nsID)
50+
require.NoError(t, err)
51+
require.NotEmpty(t, clone1.RequestId) // the first attempt must come back with a populated ID
52+
53+
clone2, err := h.validateAndPopulateStartRequest(req, nsID)
54+
require.NoError(t, err)
55+
require.Equal(t, clone1.RequestId, clone2.RequestId) // the second attempt must carry the same ID as the first
56+
}
57+
58+
t.Run("server-generated", func(t *testing.T) {
59+
validateTwoAttempts(t, newReq(""))
60+
})
61+
62+
t.Run("client-provided", func(t *testing.T) {
63+
validateTwoAttempts(t, newReq("my-request-id"))
64+
})
65+
}

0 commit comments

Comments
 (0)