Skip to content

Commit 7d0c768

Browse files
committed
Add functional test for request-ID stability across server-side retries
Use a package-level atomic flag to make StartActivityExecution fail once, after the activity has already been created at history, so that the RetryableInterceptor retries the call. Without the fix, the retry generates a new request ID and fails with ActivityExecutionAlreadyStarted. With the fix, the retry reuses the same request ID, so history deduplicates the request and the call succeeds.
1 parent f8bbf7c commit 7d0c768

2 files changed

Lines changed: 42 additions & 0 deletions

File tree

chasm/lib/activity/frontend.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package activity
22

33
import (
44
"context"
5+
"sync/atomic"
56

67
"github.com/google/uuid"
78
apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas
@@ -35,6 +36,10 @@ type FrontendHandler interface {
3536

3637
var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled")
3738

39+
// TestStartFailOnce, when set to true, causes the next StartActivityExecution to return Unavailable
40+
// after the activity is created. It fires only once: the handler atomically resets it to false (compare-and-swap).
41+
var TestStartFailOnce atomic.Bool
42+
3843
type frontendHandler struct {
3944
FrontendHandler
4045
client activitypb.ActivityServiceClient
@@ -100,6 +105,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf
100105
NamespaceId: namespaceID.String(),
101106
FrontendRequest: modifiedReq,
102107
})
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
if TestStartFailOnce.CompareAndSwap(true, false) {
113+
return nil, serviceerror.NewUnavailable("test: injected failure after successful creation")
114+
}
103115

104116
return resp.GetFrontendResponse(), err
105117
}

tests/standalone_activity_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,36 @@ func (s *standaloneActivityTestSuite) TestIDConflictPolicy() {
270270
})
271271
}
272272

273+
func (s *standaloneActivityTestSuite) TestServerGeneratedRequestIDStableAcrossRetries() {
274+
t := s.T()
275+
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
276+
defer cancel()
277+
278+
activityID := testcore.RandomizeStr(t.Name())
279+
taskQueue := testcore.RandomizeStr(t.Name())
280+
281+
// Make the handler fail once with a retryable error so the RetryableInterceptor retries.
282+
activity.TestStartFailOnce.Store(true)
283+
284+
resp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
285+
Namespace: s.Namespace().String(),
286+
ActivityId: activityID,
287+
ActivityType: s.tv.ActivityType(),
288+
Identity: s.tv.WorkerIdentity(),
289+
Input: defaultInput,
290+
TaskQueue: &taskqueuepb.TaskQueue{
291+
Name: taskQueue,
292+
},
293+
StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout),
294+
// No RequestId — server generates one.
295+
})
296+
// With the fix, the retry uses the same request ID, so history recognizes it as a dedup
297+
// and succeeds (with Started=false). Without the fix, the retry generates a new request ID
298+
// and gets ActivityExecutionAlreadyStarted.
299+
require.NoError(t, err)
300+
require.NotNil(t, resp)
301+
}
302+
273303
func (s *standaloneActivityTestSuite) TestPollActivityTaskQueue() {
274304
t := s.T()
275305
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)

0 commit comments

Comments
 (0)