Skip to content

Commit 41f4857

Browse files
majirudnr
authored andcommitted
Add subscription for autoEnable and handle changes (#9781)
## What changed? Move autoenable to a subscription, and check when it changes to see if we need to unload. ## Why? When we disable autoEnable as we approach GA we need to be able to go back to the old dynamic config being the source of truth. --------- Co-authored-by: David Reiss <dnr@dnr.im>
1 parent dd6265f commit 41f4857

3 files changed

Lines changed: 297 additions & 36 deletions

File tree

service/matching/config.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
NewMatcherSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[dynamicconfig.GradualChange[bool]]
4444
EnableFairnessSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[dynamicconfig.GradualChange[bool]]
4545
EnableMigration dynamicconfig.BoolPropertyFnWithTaskQueueFilter
46-
AutoEnableV2 dynamicconfig.BoolPropertyFnWithTaskQueueFilter
46+
AutoEnableV2Sub dynamicconfig.TypedSubscribableWithTaskQueueFilter[bool]
4747
GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
4848
GetTasksReloadAt dynamicconfig.IntPropertyFnWithTaskQueueFilter
4949
UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
@@ -162,6 +162,7 @@ type (
162162
EnableFairnessSub func(func(dynamicconfig.GradualChange[bool])) (dynamicconfig.GradualChange[bool], func())
163163
EnableMigration func() bool
164164
AutoEnableV2 func() bool
165+
AutoEnableV2Sub func(func(bool)) (bool, func())
165166
GetTasksBatchSize func() int
166167
GetTasksReloadAt func() int
167168
UpdateAckInterval func() time.Duration
@@ -274,7 +275,7 @@ func NewConfig(
274275
NewMatcherSub: dynamicconfig.MatchingUseNewMatcher.Subscribe(dc),
275276
EnableFairnessSub: dynamicconfig.MatchingEnableFairness.Subscribe(dc),
276277
EnableMigration: dynamicconfig.MatchingEnableMigration.Get(dc),
277-
AutoEnableV2: dynamicconfig.MatchingAutoEnableV2.Get(dc),
278+
AutoEnableV2Sub: dynamicconfig.MatchingAutoEnableV2.Subscribe(dc),
278279
GetTasksBatchSize: dynamicconfig.MatchingGetTasksBatchSize.Get(dc),
279280
GetTasksReloadAt: dynamicconfig.MatchingGetTasksReloadAt.Get(dc),
280281
UpdateAckInterval: dynamicconfig.MatchingUpdateAckInterval.Get(dc),
@@ -383,7 +384,11 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
383384
return config.EnableMigration(ns.String(), taskQueueName, taskType)
384385
},
385386
AutoEnableV2: func() bool {
386-
return config.AutoEnableV2(ns.String(), taskQueueName, taskType)
387+
v, _ := config.AutoEnableV2Sub(ns.String(), taskQueueName, taskType, nil)
388+
return v
389+
},
390+
AutoEnableV2Sub: func(cb func(bool)) (bool, func()) {
391+
return config.AutoEnableV2Sub(ns.String(), taskQueueName, taskType, cb)
387392
},
388393
GetTasksBatchSize: func() int {
389394
return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)

service/matching/matching_engine_test.go

Lines changed: 211 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5601,7 +5601,7 @@ func defaultTestConfig() *Config {
56015601
config := NewConfig(dynamicconfig.NewNoopCollection())
56025602
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
56035603
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)
5604-
config.AutoEnableV2 = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true)
5604+
config.AutoEnableV2Sub = trueTaskQueueSub
56055605
return config
56065606
}
56075607

@@ -5643,6 +5643,10 @@ func staticTrueChange(_, _ string, _ enumspb.TaskQueueType, _ func(dynamicconfig
56435643
return dynamicconfig.StaticGradualChange(true), func() {}
56445644
}
56455645

5646+
func trueTaskQueueSub(_, _ string, _ enumspb.TaskQueueType, _ func(bool)) (bool, func()) {
5647+
return true, func() {}
5648+
}
5649+
56465650
func staticFalseChange(_, _ string, _ enumspb.TaskQueueType, _ func(dynamicconfig.GradualChange[bool])) (dynamicconfig.GradualChange[bool], func()) {
56475651
return dynamicconfig.StaticGradualChange(false), func() {}
56485652
}
@@ -5778,3 +5782,209 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
57785782
require.Equal(t, 0, engine.shutdownWorkers.Size())
57795783
})
57805784
}
5785+
5786+
// TestAutoEnableV2ConfigChange tests that switching autoEnable triggers unload when effective config changes
5787+
func TestAutoEnableV2ConfigChange(t *testing.T) {
5788+
controller := gomock.NewController(t)
5789+
5790+
logger := testlogger.NewTestLogger(t, testlogger.FailOnAnyUnexpectedError)
5791+
5792+
dcClient := dynamicconfig.NewMemoryClient()
5793+
dcCollection := dynamicconfig.NewCollection(dcClient, logger)
5794+
dcCollection.Start()
5795+
defer dcCollection.Stop()
5796+
5797+
matchingClient := matchingservicemock.NewMockMatchingServiceClient(controller)
5798+
matchingClient.EXPECT().ForceLoadTaskQueuePartition(gomock.Any(), gomock.Any(), gomock.Any()).
5799+
Return(&matchingservice.ForceLoadTaskQueuePartitionResponse{}, nil).AnyTimes()
5800+
5801+
_, registry := createMockNamespaceCache(controller, namespace.Name(namespaceName))
5802+
5803+
config := NewConfig(dcCollection)
5804+
config.EnableMigration = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(false)
5805+
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
5806+
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)
5807+
5808+
engine := createTestMatchingEngine(logger, controller, config, matchingClient, registry)
5809+
engine.Start()
5810+
defer engine.Stop()
5811+
5812+
// autoEnable ON, base configs OFF -> with V2 fairnessState, effective config is NewMatcher=true, EnableFairness=true
5813+
cleanupAutoEnable := dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, true)
5814+
cleanupFairness := dcClient.OverrideSetting(dynamicconfig.MatchingEnableFairness, false)
5815+
cleanupNewMatcher := dcClient.OverrideSetting(dynamicconfig.MatchingUseNewMatcher, false)
5816+
defer cleanupAutoEnable()
5817+
defer cleanupFairness()
5818+
defer cleanupNewMatcher()
5819+
5820+
testNamespaceID := uuid.NewString()
5821+
testTaskQueueName := "test-tq-" + uuid.NewString()
5822+
5823+
ns := namespace.NewLocalNamespaceForTest(
5824+
&persistencespb.NamespaceInfo{Name: "test-namespace", Id: testNamespaceID},
5825+
nil,
5826+
"",
5827+
)
5828+
5829+
f, err := tqid.NewTaskQueueFamily(testNamespaceID, testTaskQueueName)
5830+
require.NoError(t, err)
5831+
partition := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).RootPartition()
5832+
5833+
tqConfig := newTaskQueueConfig(partition.TaskQueue(), engine.config, ns.Name())
5834+
5835+
userData := &mockUserDataManager{
5836+
data: &persistencespb.VersionedTaskQueueUserData{
5837+
Data: &persistencespb.TaskQueueUserData{
5838+
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
5839+
int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW): {
5840+
FairnessState: enumsspb.FAIRNESS_STATE_V2,
5841+
},
5842+
},
5843+
},
5844+
},
5845+
}
5846+
5847+
pm, err := newTaskQueuePartitionManager(
5848+
engine,
5849+
ns,
5850+
partition,
5851+
tqConfig,
5852+
logger,
5853+
logger,
5854+
metrics.NoopMetricsHandler,
5855+
userData,
5856+
)
5857+
require.NoError(t, err)
5858+
5859+
engine.partitions[partition.Key()] = pm
5860+
5861+
pm.Start()
5862+
defer pm.Stop(unloadCauseIdle)
5863+
5864+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5865+
defer cancel()
5866+
err = pm.WaitUntilInitialized(ctx)
5867+
require.NoError(t, err)
5868+
5869+
pq, err := pm.defaultQueueFuture.Get(ctx)
5870+
require.NoError(t, err)
5871+
5872+
// Turn autoEnable OFF -> effective config changes to NewMatcher=false, EnableFairness=false
5873+
cleanupAutoEnable()
5874+
_ = dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, false)
5875+
5876+
require.Eventually(t, func() bool {
5877+
return !pm.config.AutoEnableV2()
5878+
}, 2*time.Second, 10*time.Millisecond, "autoEnable should be updated")
5879+
5880+
require.Eventually(t, func() bool {
5881+
return pq.(*physicalTaskQueueManagerImpl).tqCtx.Err() != nil
5882+
}, 2*time.Second, 10*time.Millisecond, "physical queue should be stopped when effective config changes")
5883+
}
5884+
5885+
func TestAutoEnableV2ConfigChange_NoUnloadWhenEffectiveConfigUnchanged(t *testing.T) {
5886+
controller := gomock.NewController(t)
5887+
5888+
logger := testlogger.NewTestLogger(t, testlogger.FailOnAnyUnexpectedError)
5889+
5890+
dcClient := dynamicconfig.NewMemoryClient()
5891+
dcCollection := dynamicconfig.NewCollection(dcClient, logger)
5892+
dcCollection.Start()
5893+
defer dcCollection.Stop()
5894+
5895+
matchingClient := matchingservicemock.NewMockMatchingServiceClient(controller)
5896+
matchingClient.EXPECT().ForceLoadTaskQueuePartition(gomock.Any(), gomock.Any(), gomock.Any()).
5897+
Return(&matchingservice.ForceLoadTaskQueuePartitionResponse{}, nil).AnyTimes()
5898+
5899+
_, registry := createMockNamespaceCache(controller, namespace.Name(namespaceName))
5900+
5901+
config := NewConfig(dcCollection)
5902+
config.EnableMigration = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(false)
5903+
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
5904+
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)
5905+
5906+
engine := createTestMatchingEngine(logger, controller, config, matchingClient, registry)
5907+
engine.Start()
5908+
defer engine.Stop()
5909+
5910+
// autoEnable OFF, base configs ON -> with V2 fairnessState, effective config is NewMatcher=true, EnableFairness=true
5911+
cleanupAutoEnable := dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, false)
5912+
cleanupFairness := dcClient.OverrideSetting(dynamicconfig.MatchingEnableFairness, true)
5913+
cleanupNewMatcher := dcClient.OverrideSetting(dynamicconfig.MatchingUseNewMatcher, true)
5914+
defer cleanupAutoEnable()
5915+
defer cleanupFairness()
5916+
defer cleanupNewMatcher()
5917+
5918+
testNamespaceID := uuid.NewString()
5919+
testTaskQueueName := "test-tq-" + uuid.NewString()
5920+
5921+
ns := namespace.NewLocalNamespaceForTest(
5922+
&persistencespb.NamespaceInfo{Name: "test-namespace", Id: testNamespaceID},
5923+
nil,
5924+
"",
5925+
)
5926+
5927+
f, err := tqid.NewTaskQueueFamily(testNamespaceID, testTaskQueueName)
5928+
require.NoError(t, err)
5929+
partition := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).RootPartition()
5930+
5931+
tqConfig := newTaskQueueConfig(partition.TaskQueue(), engine.config, ns.Name())
5932+
5933+
userData := &mockUserDataManager{
5934+
data: &persistencespb.VersionedTaskQueueUserData{
5935+
Data: &persistencespb.TaskQueueUserData{
5936+
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
5937+
int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW): {
5938+
FairnessState: enumsspb.FAIRNESS_STATE_V2,
5939+
},
5940+
},
5941+
},
5942+
},
5943+
}
5944+
5945+
pm, err := newTaskQueuePartitionManager(
5946+
engine,
5947+
ns,
5948+
partition,
5949+
tqConfig,
5950+
logger,
5951+
logger,
5952+
metrics.NoopMetricsHandler,
5953+
userData,
5954+
)
5955+
require.NoError(t, err)
5956+
5957+
engine.partitions[partition.Key()] = pm
5958+
5959+
pm.Start()
5960+
defer pm.Stop(unloadCauseIdle)
5961+
5962+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5963+
defer cancel()
5964+
err = pm.WaitUntilInitialized(ctx)
5965+
require.NoError(t, err)
5966+
5967+
require.Eventually(t, func() bool {
5968+
return !pm.config.AutoEnableV2() && pm.config.NewMatcher && pm.config.EnableFairness
5969+
}, 2*time.Second, 10*time.Millisecond, "config should be initialized")
5970+
5971+
pq, err := pm.defaultQueueFuture.Get(ctx)
5972+
require.NoError(t, err)
5973+
5974+
// Turn autoEnable ON -> effective config stays NewMatcher=true, EnableFairness=true (same as before)
5975+
cleanupAutoEnable()
5976+
_ = dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, true)
5977+
5978+
require.Eventually(t, func() bool {
5979+
return pm.config.AutoEnableV2()
5980+
}, 2*time.Second, 10*time.Millisecond, "autoEnable should be updated")
5981+
5982+
require.Never(t, func() bool {
5983+
select {
5984+
case <-pq.(*physicalTaskQueueManagerImpl).tqCtx.Done():
5985+
return true
5986+
default:
5987+
return false
5988+
}
5989+
}, 100*time.Millisecond, 10*time.Millisecond, "physical queue should NOT be stopped when effective config does not change")
5990+
}

0 commit comments

Comments
 (0)