Skip to content
11 changes: 8 additions & 3 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
NewMatcherSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[dynamicconfig.GradualChange[bool]]
EnableFairnessSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[dynamicconfig.GradualChange[bool]]
EnableMigration dynamicconfig.BoolPropertyFnWithTaskQueueFilter
AutoEnableV2 dynamicconfig.BoolPropertyFnWithTaskQueueFilter
AutoEnableV2Sub dynamicconfig.TypedSubscribableWithTaskQueueFilter[bool]
GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
GetTasksReloadAt dynamicconfig.IntPropertyFnWithTaskQueueFilter
UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
Expand Down Expand Up @@ -163,6 +163,7 @@ type (
EnableFairnessSub func(func(dynamicconfig.GradualChange[bool])) (dynamicconfig.GradualChange[bool], func())
EnableMigration func() bool
AutoEnableV2 func() bool
AutoEnableV2Sub func(func(bool)) (bool, func())
GetTasksBatchSize func() int
GetTasksReloadAt func() int
UpdateAckInterval func() time.Duration
Expand Down Expand Up @@ -276,7 +277,7 @@ func NewConfig(
NewMatcherSub: dynamicconfig.MatchingUseNewMatcher.Subscribe(dc),
EnableFairnessSub: dynamicconfig.MatchingEnableFairness.Subscribe(dc),
EnableMigration: dynamicconfig.MatchingEnableMigration.Get(dc),
AutoEnableV2: dynamicconfig.MatchingAutoEnableV2.Get(dc),
AutoEnableV2Sub: dynamicconfig.MatchingAutoEnableV2.Subscribe(dc),
GetTasksBatchSize: dynamicconfig.MatchingGetTasksBatchSize.Get(dc),
GetTasksReloadAt: dynamicconfig.MatchingGetTasksReloadAt.Get(dc),
UpdateAckInterval: dynamicconfig.MatchingUpdateAckInterval.Get(dc),
Expand Down Expand Up @@ -385,7 +386,11 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
return config.EnableMigration(ns.String(), taskQueueName, taskType)
},
AutoEnableV2: func() bool {
return config.AutoEnableV2(ns.String(), taskQueueName, taskType)
v, _ := config.AutoEnableV2Sub(ns.String(), taskQueueName, taskType, nil)
return v
},
AutoEnableV2Sub: func(cb func(bool)) (bool, func()) {
return config.AutoEnableV2Sub(ns.String(), taskQueueName, taskType, cb)
},
GetTasksBatchSize: func() int {
return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
Expand Down
212 changes: 211 additions & 1 deletion service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5649,7 +5649,7 @@ func defaultTestConfig() *Config {
config := NewConfig(dynamicconfig.NewNoopCollection())
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)
config.AutoEnableV2 = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true)
config.AutoEnableV2Sub = trueTaskQueueSub
return config
}

Expand Down Expand Up @@ -5691,6 +5691,10 @@ func staticTrueChange(_, _ string, _ enumspb.TaskQueueType, _ func(dynamicconfig
return dynamicconfig.StaticGradualChange(true), func() {}
}

func trueTaskQueueSub(_, _ string, _ enumspb.TaskQueueType, _ func(bool)) (bool, func()) {
return true, func() {}
}

func staticFalseChange(_, _ string, _ enumspb.TaskQueueType, _ func(dynamicconfig.GradualChange[bool])) (dynamicconfig.GradualChange[bool], func()) {
return dynamicconfig.StaticGradualChange(false), func() {}
}
Expand Down Expand Up @@ -5826,3 +5830,209 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
require.Equal(t, 0, engine.shutdownWorkers.Size())
})
}

// TestAutoEnableV2ConfigChange tests that switching autoEnable triggers unload when effective config changes
func TestAutoEnableV2ConfigChange(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be a parameterized test func so we can test more combinations of: {newMatcher config, enableFairness config, fairness state} -> expected result when autoenable flipped {on, off} ?

controller := gomock.NewController(t)

logger := testlogger.NewTestLogger(t, testlogger.FailOnAnyUnexpectedError)

dcClient := dynamicconfig.NewMemoryClient()
dcCollection := dynamicconfig.NewCollection(dcClient, logger)
dcCollection.Start()
defer dcCollection.Stop()

matchingClient := matchingservicemock.NewMockMatchingServiceClient(controller)
matchingClient.EXPECT().ForceLoadTaskQueuePartition(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&matchingservice.ForceLoadTaskQueuePartitionResponse{}, nil).AnyTimes()

_, registry := createMockNamespaceCache(controller, namespace.Name(namespaceName))

config := NewConfig(dcCollection)
config.EnableMigration = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(false)
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)

engine := createTestMatchingEngine(logger, controller, config, matchingClient, registry)
engine.Start()
defer engine.Stop()

// autoEnable ON, base configs OFF -> with V2 fairnessState, effective config is NewMatcher=true, EnableFairness=true
cleanupAutoEnable := dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, true)
cleanupFairness := dcClient.OverrideSetting(dynamicconfig.MatchingEnableFairness, false)
cleanupNewMatcher := dcClient.OverrideSetting(dynamicconfig.MatchingUseNewMatcher, false)
defer cleanupAutoEnable()
defer cleanupFairness()
defer cleanupNewMatcher()

testNamespaceID := uuid.NewString()
testTaskQueueName := "test-tq-" + uuid.NewString()

ns := namespace.NewLocalNamespaceForTest(
&persistencespb.NamespaceInfo{Name: "test-namespace", Id: testNamespaceID},
nil,
"",
)

f, err := tqid.NewTaskQueueFamily(testNamespaceID, testTaskQueueName)
require.NoError(t, err)
partition := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).RootPartition()

tqConfig := newTaskQueueConfig(partition.TaskQueue(), engine.config, ns.Name())

userData := &mockUserDataManager{
data: &persistencespb.VersionedTaskQueueUserData{
Data: &persistencespb.TaskQueueUserData{
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW): {
FairnessState: enumsspb.FAIRNESS_STATE_V2,
},
},
},
},
}

pm, err := newTaskQueuePartitionManager(
engine,
ns,
partition,
tqConfig,
logger,
logger,
metrics.NoopMetricsHandler,
userData,
)
require.NoError(t, err)

engine.partitions[partition.Key()] = pm

pm.Start()
defer pm.Stop(unloadCauseIdle)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err = pm.WaitUntilInitialized(ctx)
require.NoError(t, err)

pq, err := pm.defaultQueueFuture.Get(ctx)
require.NoError(t, err)

// Turn autoEnable OFF -> effective config changes to NewMatcher=false, EnableFairness=false
cleanupAutoEnable()
_ = dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, false)

require.Eventually(t, func() bool {
return !pm.config.AutoEnableV2()
}, 2*time.Second, 10*time.Millisecond, "autoEnable should be updated")

require.Eventually(t, func() bool {
return pq.(*physicalTaskQueueManagerImpl).tqCtx.Err() != nil
}, 2*time.Second, 10*time.Millisecond, "physical queue should be stopped when effective config changes")
}

func TestAutoEnableV2ConfigChange_NoUnloadWhenEffectiveConfigUnchanged(t *testing.T) {
controller := gomock.NewController(t)

logger := testlogger.NewTestLogger(t, testlogger.FailOnAnyUnexpectedError)

dcClient := dynamicconfig.NewMemoryClient()
dcCollection := dynamicconfig.NewCollection(dcClient, logger)
dcCollection.Start()
defer dcCollection.Stop()

matchingClient := matchingservicemock.NewMockMatchingServiceClient(controller)
matchingClient.EXPECT().ForceLoadTaskQueuePartition(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&matchingservice.ForceLoadTaskQueuePartitionResponse{}, nil).AnyTimes()

_, registry := createMockNamespaceCache(controller, namespace.Name(namespaceName))

config := NewConfig(dcCollection)
config.EnableMigration = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(false)
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFnFilteredByTaskQueue(1)

engine := createTestMatchingEngine(logger, controller, config, matchingClient, registry)
engine.Start()
defer engine.Stop()

// autoEnable OFF, base configs ON -> with V2 fairnessState, effective config is NewMatcher=true, EnableFairness=true
cleanupAutoEnable := dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, false)
cleanupFairness := dcClient.OverrideSetting(dynamicconfig.MatchingEnableFairness, true)
cleanupNewMatcher := dcClient.OverrideSetting(dynamicconfig.MatchingUseNewMatcher, true)
defer cleanupAutoEnable()
defer cleanupFairness()
defer cleanupNewMatcher()

testNamespaceID := uuid.NewString()
testTaskQueueName := "test-tq-" + uuid.NewString()

ns := namespace.NewLocalNamespaceForTest(
&persistencespb.NamespaceInfo{Name: "test-namespace", Id: testNamespaceID},
nil,
"",
)

f, err := tqid.NewTaskQueueFamily(testNamespaceID, testTaskQueueName)
require.NoError(t, err)
partition := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).RootPartition()

tqConfig := newTaskQueueConfig(partition.TaskQueue(), engine.config, ns.Name())

userData := &mockUserDataManager{
data: &persistencespb.VersionedTaskQueueUserData{
Data: &persistencespb.TaskQueueUserData{
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW): {
FairnessState: enumsspb.FAIRNESS_STATE_V2,
},
},
},
},
}

pm, err := newTaskQueuePartitionManager(
engine,
ns,
partition,
tqConfig,
logger,
logger,
metrics.NoopMetricsHandler,
userData,
)
require.NoError(t, err)

engine.partitions[partition.Key()] = pm

pm.Start()
defer pm.Stop(unloadCauseIdle)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err = pm.WaitUntilInitialized(ctx)
require.NoError(t, err)

require.Eventually(t, func() bool {
return !pm.config.AutoEnableV2() && pm.config.NewMatcher && pm.config.EnableFairness
}, 2*time.Second, 10*time.Millisecond, "config should be initialized")

pq, err := pm.defaultQueueFuture.Get(ctx)
require.NoError(t, err)

// Turn autoEnable ON -> effective config stays NewMatcher=true, EnableFairness=true (same as before)
cleanupAutoEnable()
_ = dcClient.OverrideSetting(dynamicconfig.MatchingAutoEnableV2, true)

require.Eventually(t, func() bool {
return pm.config.AutoEnableV2()
}, 2*time.Second, 10*time.Millisecond, "autoEnable should be updated")

require.Never(t, func() bool {
select {
case <-pq.(*physicalTaskQueueManagerImpl).tqCtx.Done():
return true
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond, "physical queue should NOT be stopped when effective config does not change")
}
Loading
Loading