diff --git a/api/matchingservice/v1/request_response.pb.go b/api/matchingservice/v1/request_response.pb.go index 924fc90ee2d..3221c319e9b 100644 --- a/api/matchingservice/v1/request_response.pb.go +++ b/api/matchingservice/v1/request_response.pb.go @@ -5374,10 +5374,25 @@ func (x *CheckTaskQueueVersionMembershipRequest) GetVersion() *v110.WorkerDeploy } type CheckTaskQueueVersionMembershipResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - IsMember bool `protobuf:"varint,1,opt,name=is_member,json=isMember,proto3" json:"is_member,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + IsMember bool `protobuf:"varint,1,opt,name=is_member,json=isMember,proto3" json:"is_member,omitempty"` + // True when a reactivation signal to this version would be redundant — i.e., matching + // determined the version is already in a state where it does not need to be reactivated + // (today: CURRENT, RAMPING, or DRAINING). History uses this to suppress such signals. + // The zero value (false) is the safe default; it applies when matching has no definitive + // answer (version not present in matching's deployment data, or old matching servers + // that do not set this field) and tells history to send the signal. + ShouldSkipReactivation bool `protobuf:"varint,2,opt,name=should_skip_reactivation,json=shouldSkipReactivation,proto3" json:"should_skip_reactivation,omitempty"` + // revision_number is the version's current revision as tracked in matching's per-TQ + // deployment data. It is returned so history can compose a stable, cluster-wide-deterministic + // RequestId on the reactivation signal. All history pods querying the same version at the + // same point in time converge on the same revision_number, so Temporal's built-in + // SignalRequestedIds dedup (see signalworkflow/api.go) collapses the N-pod signal fan-out + // into exactly one event on the version workflow. Zero when unknown (old matching server or + // legacy DeploymentVersionData format that does not carry revision_number). + RevisionNumber int64 `protobuf:"varint,3,opt,name=revision_number,json=revisionNumber,proto3" json:"revision_number,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *CheckTaskQueueVersionMembershipResponse) Reset() { @@ -5417,6 +5432,20 @@ func (x *CheckTaskQueueVersionMembershipResponse) GetIsMember() bool { return false } +func (x *CheckTaskQueueVersionMembershipResponse) GetShouldSkipReactivation() bool { + if x != nil { + return x.ShouldSkipReactivation + } + return false +} + +func (x *CheckTaskQueueVersionMembershipResponse) GetRevisionNumber() int64 { + if x != nil { + return x.RevisionNumber + } + return 0 +} + // PollConditions are extra conditions to set on the poll. Only supported with new matcher. type PollConditions struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -6160,9 +6189,11 @@ const file_temporal_server_api_matchingservice_v1_request_response_proto_rawDesc "\n" + "task_queue\x18\x02 \x01(\tR\ttaskQueue\x12L\n" + "\x0ftask_queue_type\x18\x03 \x01(\x0e2$.temporal.api.enums.v1.TaskQueueTypeR\rtaskQueueType\x12T\n" + - "\aversion\x18\x04 \x01(\v2:.temporal.server.api.deployment.v1.WorkerDeploymentVersionR\aversion\"F\n" + + "\aversion\x18\x04 \x01(\v2:.temporal.server.api.deployment.v1.WorkerDeploymentVersionR\aversion\"\xa9\x01\n" + "'CheckTaskQueueVersionMembershipResponse\x12\x1b\n" + - "\tis_member\x18\x01 \x01(\bR\bisMember\"L\n" + + "\tis_member\x18\x01 \x01(\bR\bisMember\x128\n" + + "\x18should_skip_reactivation\x18\x02 \x01(\bR\x16shouldSkipReactivation\x12'\n" + + "\x0frevision_number\x18\x03 \x01(\x03R\x0erevisionNumber\"L\n" + "\x0ePollConditions\x12!\n" + "\fmin_priority\x18\x01 \x01(\x05R\vminPriority\x12\x17\n" + "\ano_wait\x18\x02 \x01(\bR\x06noWaitB>Z= 5 - }, 10*time.Second, 1*time.Second) - - // Signal the workflow to complete. - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV2.WorkflowID(), run2.GetRunID(), "complete", nil)) - s.NoError(run2.Get(ctx, &result)) -} - -func (s *DeploymentVersionSuite) TestReactivationSignalCache_Deduplication_SignalWithStart() { - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusVisibilityGracePeriod, testVersionDrainageVisibilityGracePeriod) - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusRefreshInterval, testLongVersionDrainageRefreshInterval) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - - // Use shorter, explicit deployment series names to avoid truncation issues - deploymentName := fmt.Sprintf("test-sws-cache-dedup-wfv%d", s.workflowVersion) - tv1 := testvars.New(s).WithDeploymentSeries(deploymentName).WithBuildID(deploymentName + "-v1").WithTaskQueue("test-sws-cache-dedup-tq") - - s.startVersionWorkflow(ctx, tv1) - - // Workflow that waits for a signal before completing - wf := func(ctx workflow.Context) (string, error) { - workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) - return "done", nil - } - - // Register worker for version 1 - w1 := worker.New(s.SdkClient(), tv1.TaskQueue().String(), worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - Version: tv1.SDKDeploymentVersion(), - UseVersioning: true, - }, - }) - w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{ - Name: "waitingWorkflow", - VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }) - s.NoError(w1.Start()) - defer w1.Stop() - - // === FIRST SIGNAL WITH START: Should trigger reactivation (cache miss) === - wfTV1 := testvars.New(s) - var run1 sdkclient.WorkflowRun - s.Eventually(func() bool { - var startErr error - run1, startErr = s.SdkClient().SignalWithStartWorkflow(ctx, - wfTV1.WorkflowID(), - "start-signal", - nil, - sdkclient.StartWorkflowOptions{ - TaskQueue: tv1.TaskQueue().String(), - VersioningOverride: &sdkclient.PinnedVersioningOverride{ - Version: tv1.SDKDeploymentVersion(), - }, - }, - "waitingWorkflow", - ) - return startErr == nil - }, 10*time.Second, 500*time.Millisecond) - - // Version 1 should transition to DRAINING (reactivated) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINING}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, - 0) - - // Signal workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV1.WorkflowID(), run1.GetRunID(), "complete", nil)) - - // Wait for workflow to complete - var result string - s.NoError(run1.Get(ctx, &result)) - - // Wait for version 1 to become DRAINED again (workflow completed) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, - testVersionDrainageVisibilityGracePeriod+testLongVersionDrainageRefreshInterval) - - // === SECOND SIGNAL WITH START: Should NOT trigger reactivation (cache hit) === - wfTV2 := testvars.New(s) - var run2 sdkclient.WorkflowRun - s.Eventually(func() bool { - var startErr error - run2, startErr = s.SdkClient().SignalWithStartWorkflow(ctx, - wfTV2.WorkflowID(), - "start-signal", - nil, - sdkclient.StartWorkflowOptions{ - TaskQueue: tv1.TaskQueue().String(), - VersioningOverride: &sdkclient.PinnedVersioningOverride{ - Version: tv1.SDKDeploymentVersion(), - }, - }, - "waitingWorkflow", - ) - return startErr == nil - }, 10*time.Second, 500*time.Millisecond) - - // Verify version stays DRAINED for several checks (workflow is still running) - // Use Eventually with a counter to check multiple times that the signal was cached - drainedCheckCount := 0 - s.Eventually(func() bool { - resp, err := s.describeVersion(tv1) - s.NoError(err) - s.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus(), - "Version should remain DRAINED because reactivation signal was cached") - s.Equal(enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetStatus(), - "Version status should remain DRAINED because reactivation signal was cached") - drainedCheckCount++ - return drainedCheckCount >= 5 - }, 10*time.Second, 1*time.Second) - - // Signal the workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV2.WorkflowID(), run2.GetRunID(), "complete", nil)) - s.NoError(run2.Get(ctx, &result)) -} - -func (s *DeploymentVersionSuite) TestReactivationSignalCache_Deduplication_UpdateOptions() { - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusVisibilityGracePeriod, testVersionDrainageVisibilityGracePeriod) - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusRefreshInterval, testLongVersionDrainageRefreshInterval) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - - // Use shorter, explicit deployment series names to avoid truncation issues - deploymentName := fmt.Sprintf("test-opts-cache-dedup-wfv%d", s.workflowVersion) - tv1 := testvars.New(s).WithDeploymentSeries(deploymentName).WithBuildID(deploymentName + "-v1").WithTaskQueue("test-opts-cache-dedup-tq") - tv2 := testvars.New(s).WithDeploymentSeries(deploymentName).WithBuildID(deploymentName + "-v2").WithTaskQueue("test-opts-cache-dedup-tq") - - s.startVersionWorkflow(ctx, tv1) - - // Set version 2 as current so workflows can start on it before being pinned/updated to v1 - s.startVersionWorkflow(ctx, tv2) - err := s.setCurrent(tv2, true) - s.NoError(err) - - // Workflow that waits for a signal before completing - wf := func(ctx workflow.Context) (string, error) { - workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) - return "done", nil - } - - // Register worker for version 1 (INACTIVE) - w1 := worker.New(s.SdkClient(), tv1.TaskQueue().String(), worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - Version: tv1.SDKDeploymentVersion(), - UseVersioning: true, - }, - }) - w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{ - Name: "waitingWorkflow", - VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }) - s.NoError(w1.Start()) - defer w1.Stop() - - // Register worker for version 2 (CURRENT) on the same task queue - w2 := worker.New(s.SdkClient(), tv1.TaskQueue().String(), worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - Version: tv2.SDKDeploymentVersion(), - UseVersioning: true, - }, - }) - w2.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{ - Name: "waitingWorkflow", - VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }) - s.NoError(w2.Start()) - defer w2.Stop() - - s.waitForPollers(ctx, tv1, tv2) - - pinnedOverride := &workflowpb.VersioningOverride{ - Override: &workflowpb.VersioningOverride_Pinned{ - Pinned: &workflowpb.VersioningOverride_PinnedOverride{ - Behavior: workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED, - Version: tv1.ExternalDeploymentVersion(), - }, - }, - } - - // === FIRST UPDATE OPTIONS: Should trigger reactivation (cache miss) === - // Start workflow on v2 (current version) - waits for signal to complete - wfTV1 := testvars.New(s) - run1, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ - TaskQueue: tv1.TaskQueue().String(), - ID: wfTV1.WorkflowID(), - }, "waitingWorkflow") - s.NoError(err) - - // Pin the workflow to v1 using UpdateWorkflowExecutionOptions - s.Eventually(func() bool { - _, err = s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, - &workflowservice.UpdateWorkflowExecutionOptionsRequest{ - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: wfTV1.WorkflowID(), - RunId: run1.GetRunID(), - }, - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: pinnedOverride, - }, - UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, - }) - return err == nil - }, 10*time.Second, 500*time.Millisecond) - - // Signal workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV1.WorkflowID(), run1.GetRunID(), "complete", nil)) - - // Wait for workflow to complete - var result string - s.NoError(run1.Get(ctx, &result)) - - // Version 1 should transition to DRAINING (reactivated) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINING}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, - 0) - - // Wait for version 1 to become DRAINED again (workflow completed) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, - testVersionDrainageVisibilityGracePeriod+testLongVersionDrainageRefreshInterval) - - // === SECOND UPDATE OPTIONS: Should NOT trigger reactivation (cache hit) === - // Start another workflow on v2 - waits for signal to complete - wfTV2 := testvars.New(s) - run2, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ - TaskQueue: tv1.TaskQueue().String(), - ID: wfTV2.WorkflowID(), - }, "waitingWorkflow") - s.NoError(err) - - // Pin this workflow to v1 using UpdateWorkflowExecutionOptions (should be cached) - s.Eventually(func() bool { - _, err = s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, - &workflowservice.UpdateWorkflowExecutionOptionsRequest{ - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: wfTV2.WorkflowID(), - RunId: run2.GetRunID(), - }, - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: pinnedOverride, - }, - UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, - }) - return err == nil - }, 10*time.Second, 500*time.Millisecond) - - // Verify version stays DRAINED for several checks (workflow is still running) - // Use Eventually with a counter to check multiple times that the signal was cached - drainedCheckCount := 0 - s.Eventually(func() bool { - resp, err := s.describeVersion(tv1) - s.NoError(err) - s.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus(), - "Version should remain DRAINED because reactivation signal was cached") - s.Equal(enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetStatus(), - "Version status should remain DRAINED because reactivation signal was cached") - drainedCheckCount++ - return drainedCheckCount >= 5 - }, 10*time.Second, 1*time.Second) - - // Signal the workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV2.WorkflowID(), run2.GetRunID(), "complete", nil)) - s.NoError(run2.Get(ctx, &result)) -} - -// TestReactivationSignalCache_Deduplication_Reset verifies that the version reactivation signal cache -// deduplicates signals when ResetWorkflowExecution is called multiple times with a pinned override -// to a non-current (INACTIVE or DRAINED) version. -func (s *DeploymentVersionSuite) TestReactivationSignalCache_Deduplication_Reset() { - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusVisibilityGracePeriod, testVersionDrainageVisibilityGracePeriod) - s.OverrideDynamicConfig(dynamicconfig.VersionDrainageStatusRefreshInterval, testLongVersionDrainageRefreshInterval) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - - // Use shorter, explicit deployment series names to avoid truncation issues - deploymentName := fmt.Sprintf("test-reset-cache-dedup-wfv%d", s.workflowVersion) - tv1 := testvars.New(s).WithDeploymentSeries(deploymentName).WithBuildID(deploymentName + "-v1").WithTaskQueue("test-reset-cache-dedup-tq") - tv2 := testvars.New(s).WithDeploymentSeries(deploymentName).WithBuildID(deploymentName + "-v2").WithTaskQueue("test-reset-cache-dedup-tq") - - s.startVersionWorkflow(ctx, tv1) - - // Set version 2 as current so workflows can start on it before being reset with a pinned override to v1 - s.startVersionWorkflow(ctx, tv2) - err := s.setCurrent(tv2, true) - s.NoError(err) - - // Workflow that waits for a signal before completing - wf := func(ctx workflow.Context) (string, error) { - workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) - return "done", nil - } - - // Register worker for version 1 (INACTIVE) - w1 := worker.New(s.SdkClient(), tv1.TaskQueue().String(), worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - Version: tv1.SDKDeploymentVersion(), - UseVersioning: true, - }, - }) - w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{ - Name: "waitingWorkflow", - VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }) - s.NoError(w1.Start()) - defer w1.Stop() - - // Register worker for version 2 (CURRENT) on the same task queue - w2 := worker.New(s.SdkClient(), tv1.TaskQueue().String(), worker.Options{ - DeploymentOptions: worker.DeploymentOptions{ - Version: tv2.SDKDeploymentVersion(), - UseVersioning: true, - }, - }) - w2.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{ - Name: "waitingWorkflow", - VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, - }) - s.NoError(w2.Start()) - defer w2.Stop() - - s.waitForPollers(ctx, tv1, tv2) - - pinnedOverride := &workflowpb.VersioningOverride{ - Override: &workflowpb.VersioningOverride_Pinned{ - Pinned: &workflowpb.VersioningOverride_PinnedOverride{ - Behavior: workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED, - Version: tv1.ExternalDeploymentVersion(), - }, - }, - } - - // Helper function to start a workflow, wait for task completion, and get reset event ID - startAndGetResetEventID := func(wfID string) (sdkclient.WorkflowRun, int64) { - run, err := s.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ - TaskQueue: tv1.TaskQueue().String(), - ID: wfID, - }, "waitingWorkflow") - s.NoError(err) - - // Wait for workflow task completion (creates a reset point) - s.Eventually(func() bool { - hist := s.SdkClient().GetWorkflowHistory(ctx, wfID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - for hist.HasNext() { - event, err := hist.Next() - if err != nil { - return false - } - if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - return true - } - } - return false - }, 10*time.Second, 200*time.Millisecond) - - // Find the reset event ID - var resetEventID int64 - hist := s.SdkClient().GetWorkflowHistory(ctx, wfID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - for hist.HasNext() { - event, err := hist.Next() - s.NoError(err) - if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - resetEventID = event.EventId - break - } - } - s.Positive(resetEventID) - return run, resetEventID - } - - // === FIRST RESET: Should trigger reactivation (cache miss) === - wfTV1 := testvars.New(s) - run1, resetEventID1 := startAndGetResetEventID(wfTV1.WorkflowID()) - - // Reset with pinned override to v1 (DRAINED) - var resetResp1 *workflowservice.ResetWorkflowExecutionResponse - s.Eventually(func() bool { - var resetErr error - resetResp1, resetErr = s.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: wfTV1.WorkflowID(), - RunId: run1.GetRunID(), - }, - Reason: "testing-reset-cache-dedup-1", - RequestId: uuid.NewString(), - WorkflowTaskFinishEventId: resetEventID1, - PostResetOperations: []*workflowpb.PostResetOperation{ - { - Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ - UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: pinnedOverride, - }, - UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, - }, - }, - }, - }, - }) - return resetErr == nil - }, 10*time.Second, 500*time.Millisecond) - - // Signal the reset workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV1.WorkflowID(), resetResp1.RunId, "complete", nil)) - - // Wait for workflow to complete - resetRun1 := s.SdkClient().GetWorkflow(ctx, wfTV1.WorkflowID(), resetResp1.RunId) - var result string - s.NoError(resetRun1.Get(ctx, &result)) - - // Version 1 should transition to DRAINING (reactivated) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINING}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, - 0) - - // Wait for version 1 to become DRAINED again (workflow completed) - s.checkVersionDrainageAndVersionStatus(ctx, tv1, - &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED}, - enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, - testVersionDrainageVisibilityGracePeriod+testLongVersionDrainageRefreshInterval) - - // === SECOND RESET: Should NOT trigger reactivation (cache hit) === - wfTV2 := testvars.New(s) - run2, resetEventID2 := startAndGetResetEventID(wfTV2.WorkflowID()) - - // Reset with pinned override to v1 (should be cached) - var resetResp2 *workflowservice.ResetWorkflowExecutionResponse - s.Eventually(func() bool { - var resetErr error - resetResp2, resetErr = s.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: wfTV2.WorkflowID(), - RunId: run2.GetRunID(), - }, - Reason: "testing-reset-cache-dedup-2", - RequestId: uuid.NewString(), - WorkflowTaskFinishEventId: resetEventID2, - PostResetOperations: []*workflowpb.PostResetOperation{ - { - Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{ - UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{ - WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ - VersioningOverride: pinnedOverride, - }, - UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, - }, - }, - }, - }, - }) - return resetErr == nil - }, 10*time.Second, 500*time.Millisecond) - - // Verify version stays DRAINED for several checks (reset workflow is still running) - // Use Eventually with a counter to check multiple times that the signal was cached - drainedCheckCount := 0 - s.Eventually(func() bool { - resp, err := s.describeVersion(tv1) - s.NoError(err) - s.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus(), - "Version should remain DRAINED because reactivation signal was cached") - s.Equal(enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetStatus(), - "Version status should remain DRAINED because reactivation signal was cached") - drainedCheckCount++ - return drainedCheckCount >= 5 - }, 10*time.Second, 1*time.Second) - - // Signal the reset workflow to complete - s.NoError(s.SdkClient().SignalWorkflow(ctx, wfTV2.WorkflowID(), resetResp2.RunId, "complete", nil)) - - // Wait for workflow to complete - resetRun2 := s.SdkClient().GetWorkflow(ctx, wfTV2.WorkflowID(), resetResp2.RunId) - var result2 string - s.NoError(resetRun2.Get(ctx, &result2)) -} - func (s *DeploymentVersionSuite) TestDeleteVersion_ThenRecreateByPolling() { s.skipBeforeVersion(workerdeployment.VersionDataRevisionNumber) s.OverrideDynamicConfig(dynamicconfig.PollerHistoryTTL, 500*time.Millisecond)