Skip to content

Commit 562d26f

Browse files
Shivs11claude
andauthored
Skip reactivation signals for current/ramping/draining versions (#9778)
## Summary - Extends `CheckTaskQueueVersionMembership` response with two new fields: `is_version_active_or_draining` (bool) and `revision_number` (int64). Matching populates both from its deployment data. - Reactivation signals are **skipped** when matching reports the target version as CURRENT/RAMPING/DRAINING; otherwise they are sent with a **deterministic UUID v5 RequestId** derived from `revision_number`. - Replaces the old TTL-based `ReactivationSignalCache` with a per-pod **revision-based dedup LRU** on the worker-deployment client: each entry records the highest revision this pod has successfully signaled for a given version, so older or equal signals are skipped. ## What changed on the wire (matching → history) `CheckTaskQueueVersionMembershipResponse` now has two new flat fields (no wrapper message): ```proto bool is_version_active_or_draining = 2; // true when status is CURRENT/RAMPING/DRAINING int64 revision_number = 3; // from WorkerDeploymentVersionData.revision_number; 0 if unknown / legacy ``` Matching's `CheckTaskQueueVersionMembership` fills both via the helper `worker_versioning.IsVersionActiveOrDraining(deploymentData, dep, build) (bool, int64)`. Naming choice — we picked `is_version_active_or_draining` (negative polarity) rather than something like `supports_reactivation` so the proto zero value (`false`) maps to the safe default ("send the signal"). Old matching binaries and runtime "version not found" both produce the zero value, and history correctly falls through. ## Where `revision_number` flows - **Matching**: populates the response field from the version's tracked revision. - **History-side helper/caches**: `ValidateVersioningOverrideAndGetReactivationEligibility` returns `(isVersionActiveOrDraining bool, revisionNumber int64, err)`. `VersionMembershipAndReactivationStatusCache` stores both. - **History signaler plumbing**: `VersionReactivationSignalerFn`, `ReactivateVersionWorkflowIfPinned`, and all five call sites (`startworkflow`, `signalwithstartworkflow`, `updateworkflowoptions`, `resetworkflow`, `multioperation`) carry `revisionNumber int64`. `resetworkflow.validatePostResetOperationInputs` returns parallel slices `([]bool, []int64, error)` for per-operation inputs. - **Signal RequestId**: `ClientImpl.SignalVersionReactivation` composes `requestID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("reactivation-signal:" + revisionNumber)).String()` — a deterministic UUID v5 derived from the revision alone. Cassandra's `signal_requested set<uuid>` column requires UUID-formatted RequestIds. ## Why revision-based dedup History is sharded on `(namespaceID, workflowID)`. N concurrent `StartWorkflow` calls pinned to the same drained version fan out across potentially every history pod in the fleet. Before this PR each pod independently fired a reactivation signal at the version workflow, producing up to N `WorkflowExecutionSignaled` events — directly at odds with the version workflow's design (it intentionally keeps history minimal and CaNs aggressively, see `version_workflow.go:68-74`). Per-pod caches alone can't fix this because they don't coordinate. What we need is a **cluster-wide-deterministic dedup key** so all pods converge on the same value for the same reactivation cycle. The version's `revision_number` — incremented in `syncTaskQueuesAsync` on every status change — is exactly that signal. Every pod reads the same revision from matching, every pod composes the same UUID RequestId, and Temporal's built-in `mutableState.pendingSignalRequestedIDs` dedup (see `service/history/api/signalworkflow/api.go:40`) collapses concurrent signals into exactly one event on the version workflow. The per-pod map is a local optimization on top of that: it prevents a single pod from re-sending the same-or-older-revision signal once it has successfully sent one, cutting RPC volume. ## How the new caches look ### 1. `VersionMembershipAndReactivationStatusCache` (read-side, per-pod) Caches matching's `CheckTaskQueueVersionMembership` response so repeated pinned-override validations on the same task queue don't re-hit matching. - **Key**: `(namespaceID, taskQueue, taskQueueType, deploymentName, buildID)` - **Value**: `(isMember bool, isVersionActiveOrDraining bool, revisionNumber int64)` - **Eviction**: `VersionMembershipCacheTTL` (1s default; 5s in functional tests). ### 2. `highestRevSignaledToVersionWf` (write-side dedup, per-pod) A field on `ClientImpl` in `service/worker/workerdeployment/client.go`. For each target version workflow, stores the highest revision this pod has successfully signaled. Subsequent calls at the same-or-lower revision skip the RPC. - **Key**: `reactivationVersionKey{namespaceID, deploymentName, buildID}` - **Value**: `int64` (highest revision successfully signaled) - **Eviction**: LRU, bounded by `VersionReactivationSignalCacheMaxSize`. The previous TTL-based `ReactivationSignalCache` module (in `common/worker_versioning/`) has been deleted along with its provider and `VersionReactivationSignalCacheTTL` config. ## Backwards/forwards compatibility - **Old matching → new history**: old binaries don't set `is_version_active_or_draining` or `revision_number`; both default to proto zero values. `false` on the active bool → history falls through → signal fires (safe default). `revisionNumber = 0` flows through as-is. - **New matching → old history**: new fields on the response are ignored by old history → identical to pre-PR behavior. - **New matching → new history**: signal fires only when the version is not active/draining; cross-pod fires converge on one UUID RequestId and fold into one `WorkflowExecutionSignaled` event. ## Test plan - [x] Unit tests for `IsVersionActiveOrDraining` covering all status cases (CURRENT, RAMPING, DRAINING, DRAINED, INACTIVE, UNSPECIFIED), new vs. old format, deleted and not-found versions. - [x] Unit tests for `ValidateVersioningOverrideAndGetReactivationEligibility` (cache hit/miss, RPC with/without eligibility, Unimplemented fallback). - [x] Unit tests for the per-pod dedup on `ClientImpl.SignalVersionReactivation`: same-rev dedups, newer-rev fires, older-rev skipped, different version isolated, signal-failure allows retry. - [x] Unit test for RequestId format (UUID v5, deterministic across calls with the same revision). - [x] Functional tests (all pass on SQLite and cass-es): - `TestStartWorkflowExecution_ReactivateVersionOnPinned` - `TestStartWorkflowExecution_ReactivateVersionOnPinned_WithConflictPolicy` - `TestSignalWithStartWorkflowExecution_ReactivateVersionOnPinned` - `TestUpdateWorkflowExecutionOptions_ReactivateVersionOnPinned` - `TestResetWorkflowExecution_ReactivateVersionOnPinned` (The four `TestReactivationSignalCache_Deduplication_*` functional tests from an earlier iteration were deleted — their coverage moved to unit tests.) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes matching↔history API and reactivation signaling semantics by skipping signals for active/draining versions and introducing revision-based dedup via deterministic RequestIds; issues could affect version workflow state transitions or signal fan-out during upgrades. > > **Overview** > Matching’s `CheckTaskQueueVersionMembershipResponse` is extended with `should_skip_reactivation` and `revision_number`, and matching now populates both from per-task-queue deployment data. > > History-side versioning validation is refactored to return and cache reactivation eligibility + revision, and reactivation signaling paths (`StartWorkflow`, `SignalWithStart`, `UpdateWorkflowExecutionOptions`, `ResetWorkflow`, multi-op) now **skip signals** when matching reports the version as *CURRENT/RAMPING/DRAINING*. > > The old TTL-based `ReactivationSignalCache` is removed (configs/metrics/providers updated), and the worker-deployment client now performs **revision-based per-pod dedup** plus receiver-side dedup by sending signals with a deterministic UUIDv5-like `RequestId` derived from `revision_number`. Tests are updated/added to cover status evaluation, new plumbing, and dedup behavior. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit a1ec5e9. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 28fd8c4 commit 562d26f

30 files changed

Lines changed: 850 additions & 1014 deletions

File tree

api/matchingservice/v1/request_response.pb.go

Lines changed: 37 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2921,17 +2921,12 @@ instead of the previous HSM backed implementation.`,
29212921
`Maximum number of entries in the version membership cache.`,
29222922
)
29232923

2924-
VersionReactivationSignalCacheTTL = NewGlobalDurationSetting(
2925-
"history.versionReactivationSignalCacheTTL",
2926-
10*time.Second,
2927-
`TTL for caching drainage reactivation signals to version workflows. These signals are sent from the history service to update the version workflow's
2928-
draining status to DRAINING from DRAINED/INACTIVE states.`,
2929-
)
2930-
2931-
VersionReactivationSignalCacheMaxSize = NewGlobalIntSetting(
2932-
"history.versionReactivationSignalCacheMaxSize",
2924+
ReactivationSignalDedupCacheMaxSize = NewGlobalIntSetting(
2925+
"worker.reactivationSignalDedupCacheMaxSize",
29332926
10000,
2934-
`Maximum number of entries in the version reactivation signal cache.`,
2927+
`Maximum number of entries in the per-pod reactivation-signal dedup cache on the
2928+
worker deployment client. Each entry tracks the highest revision signaled for one
2929+
target version workflow.`,
29352930
)
29362931

29372932
EnableVersionReactivationSignals = NewGlobalBoolSetting(

common/metrics/metric_defs.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const (
4747
MutableStateCacheTypeTagValue = "mutablestate"
4848
EventsCacheTypeTagValue = "events"
4949
VersionMembershipCacheTypeTagValue = "version_membership"
50-
VersionReactivationSignalCacheTypeTagValue = "version_reactivation_signal"
50+
ReactivationSignalDedupCacheTypeTagValue = "reactivation_signal_dedup"
5151
RoutingInfoCacheTypeTagValue = "routing_info"
5252
NexusEndpointRegistryReadThroughCacheTypeTagValue = "nexus_endpoint_registry_readthrough"
5353

@@ -459,8 +459,9 @@ const (
459459
VersionMembershipCacheGetScope = "VersionMembershipCacheGet"
460460
// VersionMembershipCachePutScope is the scope used by version membership cache
461461
VersionMembershipCachePutScope = "VersionMembershipCachePut"
462-
// VersionReactivationSignalCacheShouldSendScope is the scope used by version reactivation signal cache
463-
VersionReactivationSignalCacheShouldSendScope = "VersionReactivationSignalCacheShouldSend"
462+
// ReactivationSignalDedupScope is the scope used by the per-pod reactivation-signal
463+
// dedup cache on the worker-deployment client.
464+
ReactivationSignalDedupScope = "ReactivationSignalDedup"
464465
// RoutingInfoCacheGetScope is the scope used by routing info cache
465466
RoutingInfoCacheGetScope = "RoutingInfoCacheGet"
466467
// RoutingInfoCachePutScope is the scope used by routing info cache

common/worker_versioning/version_membership_cache.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,27 @@ import (
77
"go.temporal.io/server/common/metrics"
88
)
99

10-
// VersionMembershipCache is used to cache results of Matching's CheckTaskQueueVersionMembership
11-
// calls (used internally by the worker versioning pinned override validation).
10+
// VersionMembershipAndReactivationStatusCache caches results of Matching's
11+
// CheckTaskQueueVersionMembership calls. It stores three pieces of information per version:
12+
// - isMember: whether the task queue exists in the version (used for pinned override validation).
13+
// - shouldSkipReactivation: true when the version's current status is CURRENT, RAMPING,
14+
// or DRAINING, in which case the caller skips the reactivation signal. The zero value
15+
// (false) is the safe default and covers unknown / not-found / old-matching cases.
16+
// - revisionNumber: the version's current revision per matching's view. Used as part of the
17+
// reactivation signal's RequestId so that all history pods targeting the same version at
18+
// the same revision compose the same dedup key. Zero means unknown (old matching server or
19+
// legacy DeploymentVersionData format with no revision_number field).
1220
//
1321
// Implementations are expected to be safe for concurrent use.
1422
type (
15-
VersionMembershipCache interface {
16-
// Get returns (isMember, ok). ok=false means there was no cached value.
23+
VersionMembershipAndReactivationStatusCache interface {
1724
Get(
1825
namespaceID string,
1926
taskQueue string,
2027
taskQueueType enumspb.TaskQueueType,
2128
deploymentName string,
2229
buildID string,
23-
) (isMember bool, ok bool)
30+
) (isMember bool, shouldSkipReactivation bool, revisionNumber int64, ok bool)
2431

2532
Put(
2633
namespaceID string,
@@ -29,6 +36,8 @@ type (
2936
deploymentName string,
3037
buildID string,
3138
isMember bool,
39+
shouldSkipReactivation bool,
40+
revisionNumber int64,
3241
)
3342
}
3443

@@ -40,28 +49,34 @@ type (
4049
buildID string
4150
}
4251

43-
VersionMembershipCacheImpl struct {
52+
versionTaskQueueInfoCacheValue struct {
53+
isMember bool
54+
shouldSkipReactivation bool // false = unknown / not-found / eligible-for-reactivation
55+
revisionNumber int64 // 0 = unknown (old matching server or legacy format)
56+
}
57+
58+
VersionMembershipAndReactivationStatusCacheImpl struct {
4459
cache.Cache
4560
metricsHandler metrics.Handler
4661
}
4762
)
4863

49-
// NewVersionMembershipCache wraps the provided cache with a typed API and metrics.
50-
func NewVersionMembershipCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipCache {
64+
// NewVersionMembershipAndReactivationStatusCache wraps the provided cache with a typed API and metrics.
65+
func NewVersionMembershipAndReactivationStatusCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipAndReactivationStatusCache {
5166
h := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.VersionMembershipCacheTypeTagValue))
52-
return &VersionMembershipCacheImpl{
67+
return &VersionMembershipAndReactivationStatusCacheImpl{
5368
Cache: c,
5469
metricsHandler: h,
5570
}
5671
}
5772

58-
func (c *VersionMembershipCacheImpl) Get(
73+
func (c *VersionMembershipAndReactivationStatusCacheImpl) Get(
5974
namespaceID string,
6075
taskQueue string,
6176
taskQueueType enumspb.TaskQueueType,
6277
deploymentName string,
6378
buildID string,
64-
) (isMember bool, ok bool) {
79+
) (isMember bool, shouldSkipReactivation bool, revisionNumber int64, ok bool) {
6580
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCacheGetScope), metrics.NamespaceIDTag(namespaceID))
6681
metrics.CacheRequests.With(handler).Record(1)
6782

@@ -75,24 +90,26 @@ func (c *VersionMembershipCacheImpl) Get(
7590
v := c.Cache.Get(key)
7691
if v == nil {
7792
metrics.CacheMissCounter.With(handler).Record(1)
78-
return false, false
93+
return false, false, 0, false
7994
}
80-
isMember, ok = v.(bool)
95+
value, ok := v.(versionTaskQueueInfoCacheValue)
8196
if !ok {
8297
// Unexpected type: treat as miss to avoid false positives.
8398
metrics.CacheMissCounter.With(handler).Record(1)
84-
return false, false
99+
return false, false, 0, false
85100
}
86-
return isMember, true
101+
return value.isMember, value.shouldSkipReactivation, value.revisionNumber, true
87102
}
88103

89-
func (c *VersionMembershipCacheImpl) Put(
104+
func (c *VersionMembershipAndReactivationStatusCacheImpl) Put(
90105
namespaceID string,
91106
taskQueue string,
92107
taskQueueType enumspb.TaskQueueType,
93108
deploymentName string,
94109
buildID string,
95110
isMember bool,
111+
shouldSkipReactivation bool,
112+
revisionNumber int64,
96113
) {
97114
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCachePutScope), metrics.NamespaceIDTag(namespaceID))
98115
metrics.CacheRequests.With(handler).Record(1)
@@ -104,5 +121,9 @@ func (c *VersionMembershipCacheImpl) Put(
104121
deploymentName: deploymentName,
105122
buildID: buildID,
106123
}
107-
c.Cache.Put(key, isMember)
124+
c.Cache.Put(key, versionTaskQueueInfoCacheValue{
125+
isMember: isMember,
126+
shouldSkipReactivation: shouldSkipReactivation,
127+
revisionNumber: revisionNumber,
128+
})
108129
}

common/worker_versioning/version_reactivation_signal_cache.go

Lines changed: 0 additions & 65 deletions
This file was deleted.

0 commit comments

Comments
 (0)