Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b327f97
Skip reactivation signals for versions that are not drained or inactive
Shivs11 Apr 2, 2026
c2dbd81
Address PR review comments: rename functions, improve comments, add o…
Shivs11 Apr 2, 2026
6072c83
Merge main and resolve proto indentation conflict
Shivs11 Apr 2, 2026
c3011dd
Fix formatting (make fmt)
Shivs11 Apr 3, 2026
6e53f2d
Handle deleted versions in IsVersionDrainedOrInactive and add unit tests
Shivs11 Apr 4, 2026
e34d88e
revision number for cache
Shivs11 Apr 20, 2026
101f3fa
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 20, 2026
787c4ac
Fix lint: use require instead of assert, add nolint for package name
Shivs11 Apr 20, 2026
67768ff
Remove reactivation_dedup tests since they are repeated and are alrea…
Shivs11 Apr 20, 2026
7783325
Removed versions from requiring to be draining and sped up tests
Shivs11 Apr 20, 2026
b173752
Fix lint: use require.Eventually instead of time.Sleep
Shivs11 Apr 20, 2026
403453f
Use deterministic UUID v5 for reactivation signal RequestId
Shivs11 Apr 21, 2026
dd2ed99
Dedup reactivation signals per revision and flatten eligibility bool
Shivs11 Apr 21, 2026
9a789c3
Merge remote-tracking branch 'origin/main' into fixing-reactivation-s…
Shivs11 Apr 21, 2026
8b35d68
Wire up fx Stop hook for the reactivation-signal dedup cache
Shivs11 Apr 21, 2026
2934433
Address lint: boolPtr, named results, gci, type assertion
Shivs11 Apr 21, 2026
b533272
Re-align Config struct fields after removing cache TTL entries
Shivs11 Apr 21, 2026
1f09288
Rename dedup cache, switch UUID namespace to Nil, re-add hit/miss met…
Shivs11 Apr 21, 2026
63e588f
Rename to should_skip_reactivation / ShouldSkipReactivation
Shivs11 Apr 22, 2026
8c620a6
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 22, 2026
c4ba543
Tighten shouldSkipReactivation cache comment
Shivs11 Apr 22, 2026
a1ec5e9
Merge branch 'fixing-reactivation-signals' of github.com:temporalio/t…
Shivs11 Apr 22, 2026
6a360f5
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 22, 2026
28395b9
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 22, 2026
1dffbf6
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 23, 2026
09d63f5
Merge branch 'main' into fixing-reactivation-signals
Shivs11 Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions api/matchingservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2921,17 +2921,12 @@ instead of the previous HSM backed implementation.`,
`Maximum number of entries in the version membership cache.`,
)

VersionReactivationSignalCacheTTL = NewGlobalDurationSetting(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I think it's fine to remove this from the dynamic config, given that the flag that controls whether the reactivation signal is sent (history.enableVersionReactivationSignals) has been set to false globally.

"history.versionReactivationSignalCacheTTL",
10*time.Second,
`TTL for caching drainage reactivation signals to version workflows. These signals are sent from the history service to update the version workflow's
draining status to DRAINING from DRAINED/INACTIVE states.`,
)

VersionReactivationSignalCacheMaxSize = NewGlobalIntSetting(
"history.versionReactivationSignalCacheMaxSize",
ReactivationSignalDedupCacheMaxSize = NewGlobalIntSetting(
"worker.reactivationSignalDedupCacheMaxSize",
10000,
`Maximum number of entries in the version reactivation signal cache.`,
`Maximum number of entries in the per-pod reactivation-signal dedup cache on the
worker deployment client. Each entry tracks the highest revision signaled for one
target version workflow.`,
)

EnableVersionReactivationSignals = NewGlobalBoolSetting(
Expand Down
7 changes: 4 additions & 3 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
MutableStateCacheTypeTagValue = "mutablestate"
EventsCacheTypeTagValue = "events"
VersionMembershipCacheTypeTagValue = "version_membership"
VersionReactivationSignalCacheTypeTagValue = "version_reactivation_signal"
ReactivationSignalDedupCacheTypeTagValue = "reactivation_signal_dedup"
RoutingInfoCacheTypeTagValue = "routing_info"
NexusEndpointRegistryReadThroughCacheTypeTagValue = "nexus_endpoint_registry_readthrough"

Expand Down Expand Up @@ -459,8 +459,9 @@ const (
VersionMembershipCacheGetScope = "VersionMembershipCacheGet"
// VersionMembershipCachePutScope is the scope used by version membership cache
VersionMembershipCachePutScope = "VersionMembershipCachePut"
// VersionReactivationSignalCacheShouldSendScope is the scope used by version reactivation signal cache
VersionReactivationSignalCacheShouldSendScope = "VersionReactivationSignalCacheShouldSend"
// ReactivationSignalDedupScope is the scope used by the per-pod reactivation-signal
// dedup cache on the worker-deployment client.
ReactivationSignalDedupScope = "ReactivationSignalDedup"
// RoutingInfoCacheGetScope is the scope used by routing info cache
RoutingInfoCacheGetScope = "RoutingInfoCacheGet"
// RoutingInfoCachePutScope is the scope used by routing info cache
Expand Down
55 changes: 38 additions & 17 deletions common/worker_versioning/version_membership_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,27 @@ import (
"go.temporal.io/server/common/metrics"
)

// VersionMembershipCache is used to cache results of Matching's CheckTaskQueueVersionMembership
// calls (used internally by the worker versioning pinned override validation).
// VersionMembershipAndReactivationStatusCache caches results of Matching's
// CheckTaskQueueVersionMembership calls. Each entry, keyed by task queue and version, stores:
// - isMember: whether the task queue exists in the version (used for pinned override validation).
// - shouldSkipReactivation: true when the version's current status is CURRENT, RAMPING,
// or DRAINING, in which case the caller skips the reactivation signal. The zero value
// (false) is the safe default and covers unknown / not-found / old-matching cases.
// - revisionNumber: the version's current revision per matching's view. Used as part of the
// reactivation signal's RequestId so that all history pods targeting the same version at
// the same revision compose the same dedup key. Zero means unknown (old matching server or
// legacy DeploymentVersionData format with no revision_number field).
//
// Implementations are expected to be safe for concurrent use.
type (
VersionMembershipCache interface {
// Get returns (isMember, ok). ok=false means there was no cached value.
VersionMembershipAndReactivationStatusCache interface {
Get(
namespaceID string,
taskQueue string,
taskQueueType enumspb.TaskQueueType,
deploymentName string,
buildID string,
) (isMember bool, ok bool)
) (isMember bool, shouldSkipReactivation bool, revisionNumber int64, ok bool)

Put(
namespaceID string,
Expand All @@ -29,6 +36,8 @@ type (
deploymentName string,
buildID string,
isMember bool,
shouldSkipReactivation bool,
revisionNumber int64,
)
}

Expand All @@ -40,28 +49,34 @@ type (
buildID string
}

VersionMembershipCacheImpl struct {
versionTaskQueueInfoCacheValue struct {
isMember bool
shouldSkipReactivation bool // false = unknown / not-found / eligible-for-reactivation
revisionNumber int64 // 0 = unknown (old matching server or legacy format)
}

VersionMembershipAndReactivationStatusCacheImpl struct {
cache.Cache
metricsHandler metrics.Handler
}
)

// NewVersionMembershipCache wraps the provided cache with a typed API and metrics.
func NewVersionMembershipCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipCache {
// NewVersionMembershipAndReactivationStatusCache wraps the provided cache with a typed API and metrics.
func NewVersionMembershipAndReactivationStatusCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipAndReactivationStatusCache {
h := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.VersionMembershipCacheTypeTagValue))
return &VersionMembershipCacheImpl{
return &VersionMembershipAndReactivationStatusCacheImpl{
Cache: c,
metricsHandler: h,
}
}

func (c *VersionMembershipCacheImpl) Get(
func (c *VersionMembershipAndReactivationStatusCacheImpl) Get(
namespaceID string,
taskQueue string,
taskQueueType enumspb.TaskQueueType,
deploymentName string,
buildID string,
) (isMember bool, ok bool) {
) (isMember bool, shouldSkipReactivation bool, revisionNumber int64, ok bool) {
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCacheGetScope), metrics.NamespaceIDTag(namespaceID))
metrics.CacheRequests.With(handler).Record(1)

Expand All @@ -75,24 +90,26 @@ func (c *VersionMembershipCacheImpl) Get(
v := c.Cache.Get(key)
if v == nil {
metrics.CacheMissCounter.With(handler).Record(1)
return false, false
return false, false, 0, false
}
isMember, ok = v.(bool)
value, ok := v.(versionTaskQueueInfoCacheValue)
if !ok {
// Unexpected type: treat as miss to avoid false positives.
metrics.CacheMissCounter.With(handler).Record(1)
return false, false
return false, false, 0, false
}
return isMember, true
return value.isMember, value.shouldSkipReactivation, value.revisionNumber, true
}

func (c *VersionMembershipCacheImpl) Put(
func (c *VersionMembershipAndReactivationStatusCacheImpl) Put(
namespaceID string,
taskQueue string,
taskQueueType enumspb.TaskQueueType,
deploymentName string,
buildID string,
isMember bool,
shouldSkipReactivation bool,
revisionNumber int64,
) {
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCachePutScope), metrics.NamespaceIDTag(namespaceID))
metrics.CacheRequests.With(handler).Record(1)
Expand All @@ -104,5 +121,9 @@ func (c *VersionMembershipCacheImpl) Put(
deploymentName: deploymentName,
buildID: buildID,
}
c.Cache.Put(key, isMember)
c.Cache.Put(key, versionTaskQueueInfoCacheValue{
isMember: isMember,
shouldSkipReactivation: shouldSkipReactivation,
revisionNumber: revisionNumber,
})
}
65 changes: 0 additions & 65 deletions common/worker_versioning/version_reactivation_signal_cache.go

This file was deleted.

Loading
Loading