Skip to content

Commit 6cf58a3

Browse files
chore: Support flag change listeners in contract tests (#349)
Implements listenerRegistry in the test service to handle registerFlagChangeListener, registerFlagValueChangeListener, and unregisterListener commands. Each registration spawns a goroutine consuming events from the SDK's FlagTracker and POSTing notifications to the test harness callback URI. Advertises flag-change-listeners and flag-value-change-listeners capabilities. No changes to the SDK itself — only the test service is modified.
1 parent fb75f67 commit 6cf58a3

5 files changed

Lines changed: 220 additions & 25 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
8+
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
9+
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
10+
"github.com/launchdarkly/go-server-sdk/v7/testservice/servicedef"
11+
)
12+
13+
// listenerEntry holds the cancellation handle for one registered listener goroutine.
14+
type listenerEntry struct {
15+
cancel context.CancelFunc
16+
}
17+
18+
// listenerRegistry manages all active flag change listener registrations for a single
19+
// SDK client entity. It is safe to use from multiple goroutines.
20+
type listenerRegistry struct {
21+
mu sync.Mutex
22+
listeners map[string]*listenerEntry // keyed by listenerId
23+
tracker interfaces.FlagTracker
24+
}
25+
26+
func newListenerRegistry(tracker interfaces.FlagTracker) *listenerRegistry {
27+
return &listenerRegistry{
28+
listeners: make(map[string]*listenerEntry),
29+
tracker: tracker,
30+
}
31+
}
32+
33+
// storeListener registers a new listener entry under listenerID, cancelling any
34+
// previously registered listener with the same ID. Returns the new entry's context.
35+
func (r *listenerRegistry) storeListener(listenerID string) context.Context {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
r.mu.Lock()
38+
if old, exists := r.listeners[listenerID]; exists {
39+
old.cancel()
40+
}
41+
r.listeners[listenerID] = &listenerEntry{cancel: cancel}
42+
r.mu.Unlock()
43+
return ctx
44+
}
45+
46+
// registerFlagChangeListener subscribes to general flag configuration changes.
47+
// All flag change events are forwarded to the callback URI.
48+
func (r *listenerRegistry) registerFlagChangeListener(listenerID, callbackURI string) {
49+
ch := r.tracker.AddFlagChangeListener()
50+
ctx := r.storeListener(listenerID)
51+
52+
svc := callbackService{baseURL: callbackURI}
53+
go func() {
54+
defer r.tracker.RemoveFlagChangeListener(ch)
55+
for {
56+
select {
57+
case <-ctx.Done():
58+
return
59+
case event, ok := <-ch:
60+
if !ok {
61+
return
62+
}
63+
_ = svc.post("", servicedef.ListenerNotification{
64+
ListenerID: listenerID,
65+
FlagKey: event.Key,
66+
}, nil)
67+
}
68+
}
69+
}()
70+
}
71+
72+
// registerFlagValueChangeListener subscribes to value changes for a specific flag and
73+
// evaluation context. The callback is invoked only when the evaluated value actually
74+
// changes; configuration changes that leave the value unchanged are suppressed by the SDK.
75+
func (r *listenerRegistry) registerFlagValueChangeListener(
76+
listenerID, flagKey string,
77+
evalCtx ldcontext.Context,
78+
defaultValue ldvalue.Value,
79+
callbackURI string,
80+
) {
81+
ch := r.tracker.AddFlagValueChangeListener(flagKey, evalCtx, defaultValue)
82+
ctx := r.storeListener(listenerID)
83+
84+
svc := callbackService{baseURL: callbackURI}
85+
go func() {
86+
defer r.tracker.RemoveFlagValueChangeListener(ch)
87+
for {
88+
select {
89+
case <-ctx.Done():
90+
return
91+
case event, ok := <-ch:
92+
if !ok {
93+
return
94+
}
95+
oldVal := event.OldValue
96+
newVal := event.NewValue
97+
_ = svc.post("", servicedef.ListenerNotification{
98+
ListenerID: listenerID,
99+
FlagKey: event.Key,
100+
OldValue: &oldVal,
101+
NewValue: &newVal,
102+
}, nil)
103+
}
104+
}
105+
}()
106+
}
107+
108+
// unregister stops the listener goroutine for the given ID and removes it from the
109+
// registry. Returns false if no listener with that ID was found.
110+
func (r *listenerRegistry) unregister(listenerID string) bool {
111+
r.mu.Lock()
112+
entry, ok := r.listeners[listenerID]
113+
if ok {
114+
delete(r.listeners, listenerID)
115+
}
116+
r.mu.Unlock()
117+
118+
if ok {
119+
entry.cancel()
120+
}
121+
return ok
122+
}
123+
124+
// closeAll stops all active listener goroutines. Called when the SDK client entity closes.
125+
func (r *listenerRegistry) closeAll() {
126+
r.mu.Lock()
127+
listeners := r.listeners
128+
r.listeners = make(map[string]*listenerEntry)
129+
r.mu.Unlock()
130+
131+
for _, entry := range listeners {
132+
entry.cancel()
133+
}
134+
}

testservice/sdk_client_entity.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ import (
3939
const defaultStartWaitTime = 5 * time.Second
4040

4141
type SDKClientEntity struct {
42-
sdk *ld.LDClient
43-
logger *log.Logger
42+
sdk *ld.LDClient
43+
logger *log.Logger
44+
listeners *listenerRegistry
4445
}
4546

4647
func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntity, error) {
@@ -71,11 +72,13 @@ func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntit
7172
return nil, err
7273
}
7374
c.sdk = sdk
75+
c.listeners = newListenerRegistry(sdk.GetFlagTracker())
7476

7577
return c, nil
7678
}
7779

7880
func (c *SDKClientEntity) Close() {
81+
c.listeners.closeAll()
7982
_ = c.sdk.Close()
8083
c.logger.Println("Test ended")
8184
c.logger.SetOutput(io.Discard)
@@ -130,6 +133,20 @@ func (c *SDKClientEntity) DoCommand(params servicedef.CommandParams) (interface{
130133
return servicedef.MigrationVariationResponse{Result: string(stage)}, nil
131134
case servicedef.CommandMigrationOperation:
132135
return c.migrationOperation(*params.MigrationOperation)
136+
case servicedef.CommandRegisterFlagChangeListener:
137+
p := params.RegisterFlagChangeListener
138+
c.listeners.registerFlagChangeListener(p.ListenerID, p.CallbackURI)
139+
return nil, nil
140+
case servicedef.CommandRegisterFlagValueChangeListener:
141+
p := params.RegisterFlagValueChangeListener
142+
c.listeners.registerFlagValueChangeListener(p.ListenerID, p.FlagKey, p.Context, p.DefaultValue, p.CallbackURI)
143+
return nil, nil
144+
case servicedef.CommandUnregisterListener:
145+
p := params.UnregisterListener
146+
if !c.listeners.unregister(p.ListenerID) {
147+
return nil, BadRequestError{Message: fmt.Sprintf("no listener with id %q", p.ListenerID)}
148+
}
149+
return nil, nil
133150
default:
134151
return nil, BadRequestError{Message: fmt.Sprintf("unknown command %q", params.Command)}
135152
}

testservice/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ var capabilities = []string{
4848
servicedef.CapabilityPersistentDataStoreRedis,
4949
servicedef.CapabilityPersistentDataStoreConsul,
5050
servicedef.CapabilityPersistentDataStoreDynamoDB,
51+
servicedef.CapabilityFlagChangeListeners,
52+
servicedef.CapabilityFlagValueChangeListeners,
5153
}
5254

5355
// gets the specified environment variable, or the default if not set

testservice/servicedef/command_params.go

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ import (
88
)
99

1010
const (
11-
CommandEvaluateFlag = "evaluate"
12-
CommandEvaluateAllFlags = "evaluateAll"
13-
CommandIdentifyEvent = "identifyEvent"
14-
CommandCustomEvent = "customEvent"
15-
CommandAliasEvent = "aliasEvent"
16-
CommandFlushEvents = "flushEvents"
17-
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
18-
CommandContextBuild = "contextBuild"
19-
CommandContextConvert = "contextConvert"
20-
CommandSecureModeHash = "secureModeHash"
21-
CommandMigrationVariation = "migrationVariation"
22-
CommandMigrationOperation = "migrationOperation"
11+
CommandEvaluateFlag = "evaluate"
12+
CommandEvaluateAllFlags = "evaluateAll"
13+
CommandIdentifyEvent = "identifyEvent"
14+
CommandCustomEvent = "customEvent"
15+
CommandAliasEvent = "aliasEvent"
16+
CommandFlushEvents = "flushEvents"
17+
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
18+
CommandContextBuild = "contextBuild"
19+
CommandContextConvert = "contextConvert"
20+
CommandSecureModeHash = "secureModeHash"
21+
CommandMigrationVariation = "migrationVariation"
22+
CommandMigrationOperation = "migrationOperation"
23+
CommandRegisterFlagChangeListener = "registerFlagChangeListener"
24+
CommandRegisterFlagValueChangeListener = "registerFlagValueChangeListener"
25+
CommandUnregisterListener = "unregisterListener"
2326
)
2427

2528
type ValueType string
@@ -33,16 +36,19 @@ const (
3336
)
3437

3538
type CommandParams struct {
36-
Command string `json:"command"`
37-
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
38-
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
39-
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
40-
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
41-
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
42-
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
43-
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
44-
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
45-
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
39+
Command string `json:"command"`
40+
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
41+
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
42+
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
43+
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
44+
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
45+
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
46+
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
47+
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
48+
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
49+
RegisterFlagChangeListener *RegisterFlagChangeListenerParams `json:"registerFlagChangeListener,omitempty"` //nolint:lll
50+
RegisterFlagValueChangeListener *RegisterFlagValueChangeListenerParams `json:"registerFlagValueChangeListener,omitempty"` //nolint:lll
51+
UnregisterListener *UnregisterListenerParams `json:"unregisterListener,omitempty"`
4652
}
4753

4854
type EvaluateFlagParams struct {
@@ -180,5 +186,39 @@ type HookExecutionEvaluationPayload struct {
180186

181187
type HookExecutionTrackPayload struct {
182188
TrackSeriesContext TrackSeriesContext `json:"trackSeriesContext,omitempty"`
183-
Stage HookStage `json:"stage,omitempty"`
189+
Stage HookStage `json:"stage,omitempty"`
190+
}
191+
192+
// RegisterFlagChangeListenerParams defines parameters for registering a general flag change listener.
193+
// The listener will be notified whenever any flag's configuration changes.
194+
type RegisterFlagChangeListenerParams struct {
195+
ListenerID string `json:"listenerId"`
196+
CallbackURI string `json:"callbackUri"`
197+
}
198+
199+
// RegisterFlagValueChangeListenerParams defines parameters for registering a flag value change listener.
200+
// The listener fires when the evaluated value of FlagKey changes for the given Context.
201+
type RegisterFlagValueChangeListenerParams struct {
202+
ListenerID string `json:"listenerId"`
203+
FlagKey string `json:"flagKey"`
204+
Context ldcontext.Context `json:"context"`
205+
DefaultValue ldvalue.Value `json:"defaultValue"`
206+
CallbackURI string `json:"callbackUri"`
207+
}
208+
209+
// UnregisterListenerParams defines parameters for unregistering a previously registered listener.
210+
// Works for both flag change and flag value change listeners.
211+
type UnregisterListenerParams struct {
212+
ListenerID string `json:"listenerId"`
213+
}
214+
215+
// ListenerNotification is the JSON payload POSTed by the test service to a callback URI when a
216+
// listener fires. OldValue and NewValue are only present for value-change notifications
217+
// (registerFlagValueChangeListener); they are nil for general flag-change notifications
218+
// (registerFlagChangeListener).
219+
type ListenerNotification struct {
220+
ListenerID string `json:"listenerId"`
221+
FlagKey string `json:"flagKey"`
222+
OldValue *ldvalue.Value `json:"oldValue,omitempty"`
223+
NewValue *ldvalue.Value `json:"newValue,omitempty"`
184224
}

testservice/servicedef/service_params.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const (
2929
CapabilityPersistentDataStoreRedis = "persistent-data-store-redis"
3030
CapabilityPersistentDataStoreConsul = "persistent-data-store-consul"
3131
CapabilityPersistentDataStoreDynamoDB = "persistent-data-store-dynamodb"
32+
CapabilityFlagChangeListeners = "flag-change-listeners"
33+
CapabilityFlagValueChangeListeners = "flag-value-change-listeners"
3234
)
3335

3436
type StatusRep struct {

0 commit comments

Comments
 (0)