diff --git a/config/config.go b/config/config.go index 4570e051..d16bbefb 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,11 @@ const ( // DefaultDatabaseCacheTTL is the default value for the LocalTTL parameter for databases if not specified. DefaultDatabaseCacheTTL = time.Second * 30 + // DefaultDataStoreHealthCheckInterval is the default interval for checking whether the persistent + // data store still contains its initialization data. If data loss is detected (e.g. after a Redis + // restart), the relay will automatically repopulate the store from its in-memory snapshot. + DefaultDataStoreHealthCheckInterval = time.Second * 30 + // DefaultPrometheusPort is the default value for PrometheusConfig.Port if not specified. DefaultPrometheusPort = 8031 @@ -212,13 +217,14 @@ type EventsConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type RedisConfig struct { - Host string `conf:"REDIS_HOST"` - Port ct.OptIntGreaterThanZero - URL ct.OptURLAbsolute `conf:"REDIS_URL"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` - TLS bool `conf:"REDIS_TLS"` - Username string `conf:"REDIS_USERNAME"` - Password string `conf:"REDIS_PASSWORD"` + Host string `conf:"REDIS_HOST"` + Port ct.OptIntGreaterThanZero + URL ct.OptURLAbsolute `conf:"REDIS_URL"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + TLS bool `conf:"REDIS_TLS"` + Username string `conf:"REDIS_USERNAME"` + Password string `conf:"REDIS_PASSWORD"` + HealthCheckInterval ct.OptDuration `conf:"REDIS_HEALTH_CHECK_INTERVAL"` } // ConsulConfig configures the optional Consul integration. @@ -231,10 +237,11 @@ type RedisConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type ConsulConfig struct { - Host string `conf:"CONSUL_HOST"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` - Token string `conf:"CONSUL_TOKEN"` - TokenFile string `conf:"CONSUL_TOKEN_FILE"` + Host string `conf:"CONSUL_HOST"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + Token string `conf:"CONSUL_TOKEN"` + TokenFile string `conf:"CONSUL_TOKEN_FILE"` + HealthCheckInterval ct.OptDuration `conf:"CONSUL_HEALTH_CHECK_INTERVAL"` } // DynamoDBConfig configures the optional DynamoDB integration, which is used only if Enabled is true. @@ -245,10 +252,11 @@ type ConsulConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type DynamoDBConfig struct { - Enabled bool `conf:"USE_DYNAMODB"` - TableName string `conf:"DYNAMODB_TABLE"` - URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + Enabled bool `conf:"USE_DYNAMODB"` + TableName string `conf:"DYNAMODB_TABLE"` + URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + HealthCheckInterval ct.OptDuration `conf:"DYNAMODB_HEALTH_CHECK_INTERVAL"` } // EnvConfig describes an environment to be relayed. There may be any number of these. diff --git a/docs/persistent-storage.md b/docs/persistent-storage.md index 6a0ce1a3..7d7cde2f 100644 --- a/docs/persistent-storage.md +++ b/docs/persistent-storage.md @@ -129,6 +129,54 @@ note over Relay Proxy: TTL fresh, serve from memory Relay Proxy-->>SDK2: Streaming response ``` +## Data Store Health Check and Automatic Repopulation + +The Relay Proxy includes a health check mechanism that detects when a persistent store loses its data (for example, when Redis restarts without persistence enabled, or when Consul/DynamoDB data is deleted). Without this, SDKs using daemon mode (such as PHP) would receive empty flag evaluations until the Relay Proxy is manually restarted. + +### How it works + +When using a persistent data store (Redis, Consul, or DynamoDB), the Relay Proxy periodically checks for the presence of a sentinel key (`$inited`) that the SDK writes when it first populates the store. If this key is missing but the Relay has valid data in memory, it automatically repopulates the store. + +The health check also includes a **circuit breaker**: if a read from the persistent store fails with a connection error, subsequent reads are served directly from an in-memory snapshot, avoiding connection pool exhaustion and timeout cascades. The circuit breaker is cleared automatically when the health check confirms the store is available again. + +### Configuration + +The health check is enabled by default for all persistent store backends with a 30-second interval. The interval is configurable per backend. Setting the interval to `0` disables the health check. + +``` +# Configuration file examples +[Redis] + host = "localhost" + port = 6379 + localTtl = 30s + healthCheckInterval = 30s + +[Consul] + host = "localhost" + healthCheckInterval = 30s + +[DynamoDB] + tableName = "my-feature-flags" + healthCheckInterval = 30s +``` + +``` +# Environment variable examples +REDIS_HEALTH_CHECK_INTERVAL=30s +CONSUL_HEALTH_CHECK_INTERVAL=30s +DYNAMODB_HEALTH_CHECK_INTERVAL=30s + +# To disable the health check: +REDIS_HEALTH_CHECK_INTERVAL=0 +``` + +### Behavior summary + +- **Store read error (e.g. connection failure):** Circuit breaker activates immediately. Proxy-mode SDKs are served from the in-memory snapshot. The health check probes for recovery at the configured interval. +- **Store data loss (e.g. Redis restart, Consul KV deletion):** Detected within one health check interval. The store is automatically repopulated from the in-memory snapshot. +- **Store recovered:** Circuit breaker is cleared. Normal read path resumes. +- **No snapshot available (e.g. Relay just started):** Health check cannot repopulate. Errors pass through normally. + ## Example: Persistent Store during LaunchDarkly Outage - Cold Relay In this example, LaunchDarkly SaaS is down. Additionally, the Relay in this diagram is starting up **during** the diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 1e56fa0c..68ae0630 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -30,6 +30,9 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + + ldconsul "github.com/launchdarkly/go-server-sdk-consul/v3" + redigo "github.com/gomodule/redigo/redis" ) // LogNameMode is used in NewEnvContext to determine whether the environment's log messages should be @@ -120,6 +123,8 @@ type envContextImpl struct { stopMonitoringCredentials chan struct{} doneMonitoringCredentials chan struct{} connectionMapper ConnectionMapper + storeHealthCheck *store.StoreHealthCheck + storeInitChecker store.StoreInitCheckerCloser offline bool closed bool } @@ -401,6 +406,16 @@ func NewEnvContext( // Connecting may take time, so do this in parallel go envContext.startSDKClient(envConfig.SDKKey, readyCh, allConfig.Main.IgnoreConnectionErrors) + // Start the persistent store health check for any configured persistent store. + // A health check interval of 0 disables the check. + if initChecker, interval, err := createInitChecker(allConfig, envConfig); err != nil { + envLoggers.Errorf("Failed to create data store health checker: %s", err) + } else if initChecker != nil && interval > 0 { + envContext.storeInitChecker = initChecker + thingsToCleanUp.AddFunc(func() { _ = initChecker.Close() }) + envContext.deferredHealthCheckStart(initChecker, interval, envLoggers) + } + cleanupInterval := params.ExpiredCredentialCleanupInterval if cleanupInterval == 0 { // 0 means it wasn't specified; the config system disallows 0 as a valid value. cleanupInterval = defaultCredentialCleanupInterval @@ -412,6 +427,100 @@ func NewEnvContext( return envContext, nil } +func createInitChecker( + allConfig config.Config, + envConfig config.EnvConfig, +) (store.StoreInitCheckerCloser, time.Duration, error) { + if allConfig.Redis.URL.IsDefined() { + interval := allConfig.Redis.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } + redisURL, prefix := sdks.GetRedisBasicProperties(allConfig.Redis, envConfig) + var dialOptions []redigo.DialOption + if allConfig.Redis.Password != "" { + dialOptions = append(dialOptions, redigo.DialPassword(allConfig.Redis.Password)) + } + if allConfig.Redis.Username != "" { + dialOptions = append(dialOptions, redigo.DialUsername(allConfig.Redis.Username)) + } + checker := store.NewRedisInitChecker(redisURL, prefix, dialOptions) + return checker, interval, nil + } + + if allConfig.Consul.Host != "" { + interval := allConfig.Consul.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } + prefix := envConfig.Prefix + if prefix == "" { + prefix = ldconsul.DefaultPrefix + } + checker, err := store.NewConsulInitChecker( + allConfig.Consul.Host, allConfig.Consul.Token, allConfig.Consul.TokenFile, prefix, + ) + if err != nil { + return nil, 0, err + } + return checker, interval, nil + } + + if allConfig.DynamoDB.Enabled { + interval := allConfig.DynamoDB.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } + endpoint, tableName, prefix := sdks.GetDynamoDBBasicProperties(allConfig.DynamoDB, envConfig) + if tableName == "" { + return nil, 0, nil + } + checker, err := store.NewDynamoDBInitChecker(tableName, prefix, endpoint) + if err != nil { + return nil, 0, err + } + return checker, interval, nil + } + + return nil, 0, nil +} + +func (c *envContextImpl) deferredHealthCheckStart( + initChecker store.StoreInitChecker, + interval time.Duration, + loggers ldlog.Loggers, +) { + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ss := c.storeAdapter.GetSnapshotStore() + if ss == nil { + continue + } + hc := store.NewStoreHealthCheck(ss, initChecker, interval, loggers) + if hc == nil { + return + } + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return + } + c.storeHealthCheck = hc + c.mu.Unlock() + hc.Start() + loggers.Info("Data store health check started") + return + case <-c.stopMonitoringCredentials: + return + } + } + }() +} + func (c *envContextImpl) cleanupExpiredCredentials(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -748,6 +857,15 @@ func (c *envContextImpl) Close() error { if c.sdkBigSegments != nil { c.sdkBigSegments.Close() } + c.mu.RLock() + hc := c.storeHealthCheck + c.mu.RUnlock() + if hc != nil { + hc.Stop() + } + if c.storeInitChecker != nil { + _ = c.storeInitChecker.Close() + } return nil } diff --git a/internal/store/consul_init_checker.go b/internal/store/consul_init_checker.go new file mode 100644 index 00000000..2126f0ca --- /dev/null +++ b/internal/store/consul_init_checker.go @@ -0,0 +1,50 @@ +package store + +import ( + consul "github.com/hashicorp/consul/api" +) + +// ConsulInitChecker implements StoreInitChecker by directly querying Consul +// for the $inited KV key, bypassing the SDK's caching layer. +type ConsulInitChecker struct { + client *consul.Client + prefix string +} + +// NewConsulInitChecker creates a checker that connects to Consul at the given address. +// The prefix should match the store prefix used by the SDK (e.g. "launchdarkly"). +func NewConsulInitChecker(address string, token string, tokenFile string, prefix string) (*ConsulInitChecker, error) { + config := consul.DefaultConfig() + config.Address = address + if token != "" { + config.Token = token + } else if tokenFile != "" { + config.TokenFile = tokenFile + } + client, err := consul.NewClient(config) + if err != nil { + return nil, err + } + return &ConsulInitChecker{ + client: client, + prefix: prefix, + }, nil +} + +func (c *ConsulInitChecker) initedKey() string { + return c.prefix + "/$inited" +} + +// CheckInitialized checks if the $inited key exists in Consul KV. +func (c *ConsulInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + pair, _, err := c.client.KV().Get(c.initedKey(), nil) + if err != nil { + return false, false, err + } + return true, pair != nil, nil +} + +// Close is a no-op for Consul (the HTTP client doesn't need explicit cleanup). +func (c *ConsulInitChecker) Close() error { + return nil +} diff --git a/internal/store/dynamodb_init_checker.go b/internal/store/dynamodb_init_checker.go new file mode 100644 index 00000000..d8935523 --- /dev/null +++ b/internal/store/dynamodb_init_checker.go @@ -0,0 +1,78 @@ +package store + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +const ( + dynamoTablePartitionKey = "namespace" + dynamoTableSortKey = "key" +) + +// DynamoDBInitChecker implements StoreInitChecker by directly querying DynamoDB +// for the $inited item, bypassing the SDK's caching layer. +type DynamoDBInitChecker struct { + client *dynamodb.Client + tableName string + prefix string +} + +// NewDynamoDBInitChecker creates a checker that connects to DynamoDB. +// The tableName and prefix should match those used by the SDK store. +// If endpoint is non-nil, it overrides the default AWS endpoint (for local testing). +func NewDynamoDBInitChecker(tableName string, prefix string, endpoint *string) (*DynamoDBInitChecker, error) { + cfg, err := awsconfig.LoadDefaultConfig(context.Background()) + if err != nil { + return nil, err + } + var options []func(*dynamodb.Options) + if endpoint != nil { + options = append(options, func(o *dynamodb.Options) { + o.BaseEndpoint = endpoint + }) + } + client := dynamodb.NewFromConfig(cfg, options...) + return &DynamoDBInitChecker{ + client: client, + tableName: tableName, + prefix: prefix, + }, nil +} + +func (d *DynamoDBInitChecker) initedKey() string { + if d.prefix == "" { + return "$inited" + } + return d.prefix + ":$inited" +} + +func attrValueStr(s string) *types.AttributeValueMemberS { + return &types.AttributeValueMemberS{Value: s} +} + +// CheckInitialized checks if the $inited item exists in the DynamoDB table. +func (d *DynamoDBInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + key := d.initedKey() + result, err := d.client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + dynamoTablePartitionKey: attrValueStr(key), + dynamoTableSortKey: attrValueStr(key), + }, + ConsistentRead: aws.Bool(true), + }) + if err != nil { + return false, false, err + } + return true, len(result.Item) > 0, nil +} + +// Close is a no-op for DynamoDB (the client doesn't need explicit cleanup). +func (d *DynamoDBInitChecker) Close() error { + return nil +} diff --git a/internal/store/redis_init_checker.go b/internal/store/redis_init_checker.go new file mode 100644 index 00000000..d96bf5ef --- /dev/null +++ b/internal/store/redis_init_checker.go @@ -0,0 +1,51 @@ +package store + +import ( + "fmt" + + redigo "github.com/gomodule/redigo/redis" +) + +// RedisInitChecker implements StoreInitChecker by directly querying Redis for the +// $inited sentinel key, bypassing the SDK's caching layer. +type RedisInitChecker struct { + pool *redigo.Pool + prefix string +} + +// NewRedisInitChecker creates a checker that connects to Redis using the given URL and +// dial options. The prefix should match the store prefix used by the SDK (e.g. "launchdarkly"). +func NewRedisInitChecker(redisURL string, prefix string, dialOptions []redigo.DialOption) *RedisInitChecker { + pool := &redigo.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: func() (redigo.Conn, error) { + return redigo.DialURL(redisURL, dialOptions...) + }, + } + return &RedisInitChecker{ + pool: pool, + prefix: prefix, + } +} + +func (r *RedisInitChecker) initedKey() string { + return fmt.Sprintf("%s:$inited", r.prefix) +} + +// CheckInitialized checks if the $inited key exists in Redis. +func (r *RedisInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + conn := r.pool.Get() + defer conn.Close() //nolint:errcheck + + exists, err := redigo.Bool(conn.Do("EXISTS", r.initedKey())) + if err != nil { + return false, false, err + } + return true, exists, nil +} + +// Close releases the Redis connection pool. +func (r *RedisInitChecker) Close() error { + return r.pool.Close() +} diff --git a/internal/store/relay_feature_store.go b/internal/store/relay_feature_store.go index 5e3d6a7e..55dfb5b8 100644 --- a/internal/store/relay_feature_store.go +++ b/internal/store/relay_feature_store.go @@ -46,6 +46,21 @@ func (a *SSERelayDataStoreAdapter) GetStore() subsystems.DataStore { return store } +// GetSnapshotStore returns the current data store as a SnapshotStore (for health check use), +// or nil if the store has not been created. +func (a *SSERelayDataStoreAdapter) GetSnapshotStore() SnapshotStore { + a.mu.RLock() + s := a.store + a.mu.RUnlock() + if s == nil { + return nil + } + if ss, ok := s.(SnapshotStore); ok { + return ss + } + return nil +} + // GetUpdates returns the EnvStreamUpdates that will receive all updates sent to this store. This is // exposed for testing so that we can simulate receiving updates from LaunchDarkly to this component. func (a *SSERelayDataStoreAdapter) GetUpdates() streams.EnvStreamUpdates { @@ -89,10 +104,19 @@ func (a *SSERelayDataStoreAdapter) Build( // A DataStore implementation that delegates to an underlying store // but also publishes stream updates when the store is modified. +// It also maintains an in-memory snapshot of the latest dataset for resilience +// against data store failures (e.g., Redis restart causing data loss). type streamUpdatesStoreWrapper struct { store subsystems.DataStore updates streams.EnvStreamUpdates loggers ldlog.Loggers + + snapshotMu sync.RWMutex + snapshot []ldstoretypes.Collection + snapshotHasData bool + + storeDownMu sync.RWMutex + storeDown bool } func newStreamUpdatesStoreWrapper( @@ -108,6 +132,105 @@ func newStreamUpdatesStoreWrapper( return relayStore } +// HasSnapshot returns true if the wrapper has a valid snapshot with data. +func (sw *streamUpdatesStoreWrapper) HasSnapshot() bool { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + return sw.snapshotHasData +} + +// GetSnapshot returns a structural copy of the current snapshot, or nil if none exists. +// The returned slices are independent but ItemDescriptor.Item pointers are shared (safe +// because flag/segment objects are immutable in the SDK). +func (sw *streamUpdatesStoreWrapper) GetSnapshot() []ldstoretypes.Collection { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + if !sw.snapshotHasData { + return nil + } + return copyCollectionStructure(sw.snapshot) +} + +func (sw *streamUpdatesStoreWrapper) saveSnapshot(allData []ldstoretypes.Collection) { + hasData := false + for _, coll := range allData { + if len(coll.Items) > 0 { + hasData = true + break + } + } + + sw.snapshotMu.Lock() + defer sw.snapshotMu.Unlock() + if hasData { + sw.snapshot = copyCollectionStructure(allData) + sw.snapshotHasData = true + } else { + sw.snapshot = nil + sw.snapshotHasData = false + } +} + +func (sw *streamUpdatesStoreWrapper) updateSnapshotItem( + kind ldstoretypes.DataKind, + key string, + item ldstoretypes.ItemDescriptor, +) { + sw.snapshotMu.Lock() + defer sw.snapshotMu.Unlock() + if !sw.snapshotHasData { + return + } + + for i, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + for j, existing := range coll.Items { + if existing.Key == key { + if item.Version >= existing.Item.Version { + sw.snapshot[i].Items[j] = ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + } + } + return + } + } + sw.snapshot[i].Items = append(sw.snapshot[i].Items, ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + }) + return + } + } + + sw.snapshot = append(sw.snapshot, ldstoretypes.Collection{ + Kind: kind, + Items: []ldstoretypes.KeyedItemDescriptor{ + {Key: key, Item: item}, + }, + }) +} + +// copyCollectionStructure copies the collection and item slices so that +// modifications to the returned structure (appending, replacing items) do not +// affect the source. The ItemDescriptor.Item pointers are shared, which is safe +// because the LaunchDarkly SDK treats flag/segment objects as immutable. +func copyCollectionStructure(src []ldstoretypes.Collection) []ldstoretypes.Collection { + if src == nil { + return nil + } + dst := make([]ldstoretypes.Collection, len(src)) + for i, coll := range src { + items := make([]ldstoretypes.KeyedItemDescriptor, len(coll.Items)) + copy(items, coll.Items) + dst[i] = ldstoretypes.Collection{ + Kind: coll.Kind, + Items: items, + } + } + return dst +} + func (sw *streamUpdatesStoreWrapper) Close() error { return sw.store.Close() } @@ -116,18 +239,103 @@ func (sw *streamUpdatesStoreWrapper) IsStatusMonitoringEnabled() bool { return sw.store.IsStatusMonitoringEnabled() } +// IsStoreDown returns true if the circuit breaker is open (store is considered unavailable). +func (sw *streamUpdatesStoreWrapper) IsStoreDown() bool { + sw.storeDownMu.RLock() + defer sw.storeDownMu.RUnlock() + return sw.storeDown +} + +// SetStoreDown sets or clears the circuit breaker state. +func (sw *streamUpdatesStoreWrapper) SetStoreDown(down bool) { + sw.storeDownMu.Lock() + defer sw.storeDownMu.Unlock() + sw.storeDown = down +} + func (sw *streamUpdatesStoreWrapper) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDescriptor, error) { - return sw.store.Get(kind, key) + if sw.IsStoreDown() && sw.HasSnapshot() { + return sw.getFromSnapshot(kind, key), nil + } + + item, err := sw.store.Get(kind, key) + if err != nil { + if sw.HasSnapshot() { + sw.openCircuitBreaker() + return sw.getFromSnapshot(kind, key), nil + } + return item, err + } + return item, nil } func (sw *streamUpdatesStoreWrapper) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) { - return sw.store.GetAll(kind) + if sw.IsStoreDown() && sw.HasSnapshot() { + return sw.getAllFromSnapshot(kind), nil + } + + items, err := sw.store.GetAll(kind) + if err != nil { + if sw.HasSnapshot() { + sw.openCircuitBreaker() + return sw.getAllFromSnapshot(kind), nil + } + return nil, err + } + return items, nil +} + +func (sw *streamUpdatesStoreWrapper) openCircuitBreaker() { + sw.storeDownMu.Lock() + alreadyDown := sw.storeDown + sw.storeDown = true + sw.storeDownMu.Unlock() + if !alreadyDown { + sw.loggers.Warn("Data store read error, activating circuit breaker and serving from in-memory snapshot") + } +} + +func (sw *streamUpdatesStoreWrapper) getFromSnapshot(kind ldstoretypes.DataKind, key string) ldstoretypes.ItemDescriptor { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + for _, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + for _, item := range coll.Items { + if item.Key == key { + return item.Item + } + } + } + } + return ldstoretypes.ItemDescriptor{}.NotFound() +} + +func (sw *streamUpdatesStoreWrapper) getAllFromSnapshot(kind ldstoretypes.DataKind) []ldstoretypes.KeyedItemDescriptor { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + for _, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + items := make([]ldstoretypes.KeyedItemDescriptor, len(coll.Items)) + copy(items, coll.Items) + return items + } + } + return nil +} + +// RepopulateStore writes data directly to the underlying store, bypassing snapshot +// updates and SSE broadcasting. Used by the health check to restore Redis after data loss +// without risking snapshot regression from concurrent streaming Upserts. +func (sw *streamUpdatesStoreWrapper) RepopulateStore(allData []ldstoretypes.Collection) error { + return sw.store.Init(allData) } func (sw *streamUpdatesStoreWrapper) Init(allData []ldstoretypes.Collection) error { sw.loggers.Debug("Received all feature flags") err := sw.store.Init(allData) + sw.saveSnapshot(allData) + // See comments in Upsert for why we call SendAllDataUpdate here even if Init returned an error. sw.updates.SendAllDataUpdate(allData) @@ -163,6 +371,8 @@ func (sw *streamUpdatesStoreWrapper) Upsert( // connected clients, because they may be using the stream rather than the database as their source of // truth. + sw.updateSnapshotItem(kind, key, item) + sw.updates.SendSingleItemUpdate(kind, key, item) return updated, err diff --git a/internal/store/relay_feature_store_test.go b/internal/store/relay_feature_store_test.go index 48768f80..e9da6726 100644 --- a/internal/store/relay_feature_store_test.go +++ b/internal/store/relay_feature_store_test.go @@ -9,6 +9,7 @@ import ( "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -285,3 +286,307 @@ func TestStoreClose(t *testing.T) { wrappedStore.Close() assert.True(t, baseStore.closed) } + +func TestSnapshotInitSavesData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not exist before Init") + + err := wrappedStore.Init(allData) + require.NoError(t, err) + + assert.True(t, wrappedStore.HasSnapshot(), "snapshot should exist after Init with data") + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + assert.Equal(t, len(allData), len(snapshot)) + + for i, coll := range allData { + assert.Equal(t, coll.Kind, snapshot[i].Kind) + assert.Equal(t, len(coll.Items), len(snapshot[i].Items)) + for j, item := range coll.Items { + assert.Equal(t, item.Key, snapshot[i].Items[j].Key) + assert.Equal(t, item.Item.Version, snapshot[i].Items[j].Item.Version) + } + } +} + +func TestSnapshotIsDeepCopy(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + snapshot1 := wrappedStore.GetSnapshot() + snapshot2 := wrappedStore.GetSnapshot() + + // Modifying one snapshot should not affect the other + snapshot1[0].Items = nil + assert.NotNil(t, snapshot2[0].Items, "snapshots should be independent deep copies") +} + +func TestSnapshotEmptyInitDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init([]ldstoretypes.Collection{}) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set for empty Init") +} + +func TestSnapshotNilInitDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(nil) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set for nil Init") +} + +func TestSnapshotInitWithEmptyCollectionsDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + emptyCollections := []ldstoretypes.Collection{ + {Kind: ldstoreimpl.Features(), Items: []ldstoretypes.KeyedItemDescriptor{}}, + {Kind: ldstoreimpl.Segments(), Items: []ldstoretypes.KeyedItemDescriptor{}}, + } + err := wrappedStore.Init(emptyCollections) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set when all collections are empty") +} + +func TestSnapshotUpsertUpdatesExistingItem(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + testFlag1v2 := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 1).On(false).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag1v2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var found bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + assert.Equal(t, testFlag1v2.Version, item.Item.Version) + found = true + } + } + } + } + assert.True(t, found, "updated flag should be in snapshot") +} + +func TestSnapshotUpsertAddsNewItem(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var foundFlag1, foundFlag2 bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + foundFlag1 = true + } + if item.Key == testFlag2.Key { + foundFlag2 = true + } + } + } + } + assert.True(t, foundFlag1, "original flag should still be in snapshot") + assert.True(t, foundFlag2, "new flag should be in snapshot") +} + +func TestSnapshotUpsertBeforeInitIsIgnored(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag1) + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be created by Upsert alone") +} + +func TestSnapshotUpsertSegment(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + testSegment1v2 := ldbuilders.NewSegmentBuilder(testSegment1.Key).Version(testSegment1.Version + 1).Build() + _, _ = sharedtest.UpsertSegment(wrappedStore, testSegment1v2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var found bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Segments() { + for _, item := range coll.Items { + if item.Key == testSegment1.Key { + assert.Equal(t, testSegment1v2.Version, item.Item.Version) + found = true + } + } + } + } + assert.True(t, found, "updated segment should be in snapshot") +} + +func TestSnapshotUpsertIgnoresOlderVersion(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + newerFlag := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 10).On(false).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, newerFlag) + + olderFlag := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 1).On(true).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, olderFlag) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + assert.Equal(t, newerFlag.Version, item.Item.Version, + "snapshot should retain the newer version, not regress to older") + return + } + } + } + } + t.Fatal("flag not found in snapshot") +} + +// Circuit breaker tests + +func TestCircuitBreakerOpensOnGetError(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Simulate store failure + baseStore.fakeError = fakeError + + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err, "should not return error when snapshot is available") + assert.Equal(t, testFlag1.Version, item.Version) + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should be open") +} + +func TestCircuitBreakerOpensOnGetAllError(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + baseStore.fakeError = fakeError + + items, err := wrappedStore.GetAll(ldstoreimpl.Features()) + assert.NoError(t, err, "should not return error when snapshot is available") + assert.Equal(t, 1, len(items), "should return snapshot data") + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should be open") +} + +func TestCircuitBreakerSkipsStoreOnSubsequentReads(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Trigger circuit breaker + baseStore.fakeError = fakeError + _, _ = wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.True(t, wrappedStore.IsStoreDown()) + + // Subsequent reads should serve from snapshot without touching the store + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err) + assert.Equal(t, testFlag1.Version, item.Version) + + items, err := wrappedStore.GetAll(ldstoreimpl.Features()) + assert.NoError(t, err) + assert.Equal(t, 1, len(items)) +} + +func TestCircuitBreakerReturnsErrorWithoutSnapshot(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + // No Init called, so no snapshot exists + + baseStore.fakeError = fakeError + + // Without snapshot, errors pass through even with circuit open + _, err := wrappedStore.Get(ldstoreimpl.Features(), "any-key") + assert.Equal(t, fakeError, err, "should return error when no snapshot is available") + + _, err = wrappedStore.GetAll(ldstoreimpl.Features()) + assert.Equal(t, fakeError, err, "should return error when no snapshot is available") + + // Circuit breaker should still be set even though we can't serve data + assert.False(t, wrappedStore.IsStoreDown(), "circuit should not open without snapshot") +} + +func TestCircuitBreakerFallsThroughToStoreWithoutSnapshot(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + // No Init, so no snapshot. Set circuit breaker manually. + wrappedStore.SetStoreDown(true) + baseStore.fakeError = fakeError + + // With storeDown=true but no snapshot, it should still try the store + _, err := wrappedStore.Get(ldstoreimpl.Features(), "any-key") + assert.Equal(t, fakeError, err, "should fall through to store when no snapshot") + + _, err = wrappedStore.GetAll(ldstoreimpl.Features()) + assert.Equal(t, fakeError, err, "should fall through to store when no snapshot") +} + +func TestCircuitBreakerClearResumesNormalReads(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Open circuit + baseStore.fakeError = fakeError + _, _ = wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.True(t, wrappedStore.IsStoreDown()) + + // Clear circuit and fix the store + baseStore.fakeError = nil + wrappedStore.SetStoreDown(false) + assert.False(t, wrappedStore.IsStoreDown()) + + // Reads should go to the real store again + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err) + assert.Equal(t, testFlag1.Version, item.Version) +} + +func TestCircuitBreakerGetReturnsNotFoundFromSnapshot(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Circuit is open, ask for a key that doesn't exist in snapshot + wrappedStore.SetStoreDown(true) + + item, err := wrappedStore.Get(ldstoreimpl.Features(), "nonexistent-key") + assert.NoError(t, err) + assert.Equal(t, ldstoretypes.ItemDescriptor{}.NotFound(), item) +} + +func TestCircuitBreakerGetAllReturnsEmptyForUnknownKind(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + + // Segments exist in snapshot, so this should work + items, err := wrappedStore.GetAll(ldstoreimpl.Segments()) + assert.NoError(t, err) + assert.Equal(t, 1, len(items)) +} diff --git a/internal/store/store_health_check.go b/internal/store/store_health_check.go new file mode 100644 index 00000000..2f129a0d --- /dev/null +++ b/internal/store/store_health_check.go @@ -0,0 +1,149 @@ +package store + +import ( + "sync" + "time" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// StoreInitChecker abstracts the ability to check whether a persistent store still +// contains its initialization marker (e.g. the $inited sentinel key). Implementations +// should query the store directly, bypassing any SDK-level caching. +type StoreInitChecker interface { + // CheckInitialized returns: + // available=true, initialized=true → store is up and has data + // available=true, initialized=false → store is up but data is missing (needs repopulation) + // available=false with err → store is unreachable (retry later) + CheckInitialized() (available bool, initialized bool, err error) +} + +// StoreInitCheckerCloser extends StoreInitChecker with a Close method for resource cleanup. +type StoreInitCheckerCloser interface { + StoreInitChecker + Close() error +} + +// SnapshotStore is the interface that StoreHealthCheck uses to interact with the store wrapper. +// It provides access to snapshot data and circuit breaker state for resilience operations. +type SnapshotStore interface { + HasSnapshot() bool + GetSnapshot() []ldstoretypes.Collection + IsStoreDown() bool + SetStoreDown(bool) + RepopulateStore([]ldstoretypes.Collection) error + IsInitialized() bool +} + +// StoreHealthCheck periodically verifies that the persistent data store still contains +// its initialization data. If data loss is detected (e.g. after a Redis restart), it +// repopulates the store from the in-memory snapshot and manages the circuit breaker state. +type StoreHealthCheck struct { + store SnapshotStore + checker StoreInitChecker + interval time.Duration + loggers ldlog.Loggers + stopCh chan struct{} + stopOnce sync.Once +} + +// NewStoreHealthCheck creates a new health check instance. Returns nil if store or checker is nil. +func NewStoreHealthCheck( + store SnapshotStore, + checker StoreInitChecker, + interval time.Duration, + loggers ldlog.Loggers, +) *StoreHealthCheck { + if store == nil || checker == nil { + return nil + } + return &StoreHealthCheck{ + store: store, + checker: checker, + interval: interval, + loggers: loggers, + stopCh: make(chan struct{}), + } +} + +// Start begins the periodic health check in a background goroutine. +func (hc *StoreHealthCheck) Start() { + go hc.run() +} + +// Stop terminates the health check goroutine. Safe to call multiple times. +func (hc *StoreHealthCheck) Stop() { + hc.stopOnce.Do(func() { + close(hc.stopCh) + }) +} + +func (hc *StoreHealthCheck) run() { + ticker := time.NewTicker(hc.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + hc.check() + case <-hc.stopCh: + return + } + } +} + +func (hc *StoreHealthCheck) check() { + available, initialized, err := hc.checker.CheckInitialized() + + if err != nil { + hc.loggers.Debugf("Data store health check: connection error: %s", err) + return + } + + if !available { + hc.loggers.Debug("Data store health check: store not available") + return + } + + // Store is available + if initialized { + if hc.store.IsStoreDown() { + hc.loggers.Info("Data store has recovered, resuming normal reads") + hc.store.SetStoreDown(false) + } + return + } + + // Store is available but not initialized -- data was lost + hc.loggers.Warn("Data store lost initialization data, possible store restart detected") + + if !hc.store.HasSnapshot() { + hc.loggers.Warn("Cannot repopulate data store: no snapshot data available to restore") + return + } + + hc.repopulate() +} + +func (hc *StoreHealthCheck) repopulate() { + snapshot := hc.store.GetSnapshot() + if snapshot == nil { + return + } + + hc.loggers.Warn("Repopulating data store from in-memory snapshot") + + err := hc.store.RepopulateStore(snapshot) + if err != nil { + hc.loggers.Errorf("Failed to repopulate data store from snapshot: %s", err) + return + } + + hc.loggers.Info("Successfully repopulated data store from snapshot") + + if hc.store.IsStoreDown() { + hc.store.SetStoreDown(false) + hc.loggers.Info("Circuit breaker cleared after repopulation") + } +} diff --git a/internal/store/store_health_check_test.go b/internal/store/store_health_check_test.go new file mode 100644 index 00000000..32648599 --- /dev/null +++ b/internal/store/store_health_check_test.go @@ -0,0 +1,180 @@ +package store + +import ( + "sync" + "testing" + "time" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockInitChecker struct { + mu sync.Mutex + available bool + inited bool + err error +} + +func (m *mockInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.available, m.inited, m.err +} + +func (m *mockInitChecker) set(available, inited bool, err error) { + m.mu.Lock() + defer m.mu.Unlock() + m.available = available + m.inited = inited + m.err = err +} + +func makeHealthCheckTestComponents() (*mockStore, *streamUpdatesStoreWrapper, *mockInitChecker, *mockEnvStreamsUpdates) { + baseStore, wrappedStore, updates := makeTestComponents() + checker := &mockInitChecker{available: true, inited: true} + return baseStore, wrappedStore, checker, updates +} + +func TestHealthCheckDetectsDataLossAndRepopulates(t *testing.T) { + baseStore, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Clear the base store to simulate Redis data loss + _ = baseStore.Init(nil) + + // Redis is up but $inited is gone (data loss after restart) + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + // Health check should trigger repopulation - verify data comes back + assert.Eventually(t, func() bool { + flags, e := baseStore.GetAll(ldstoreimpl.Features()) + return e == nil && len(flags) > 0 + }, time.Second, 5*time.Millisecond, "health check should repopulate the store") +} + +func TestHealthCheckClearsCircuitBreaker(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + assert.Eventually(t, func() bool { + return !wrappedStore.IsStoreDown() + }, time.Second, 5*time.Millisecond, "health check should clear circuit breaker") +} + +func TestHealthCheckDoesNothingOnConnectionError(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + checker.set(false, false, fakeError) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should remain open on connection error") +} + +func TestHealthCheckDoesNotRepopulateWithoutSnapshot(t *testing.T) { + _, wrappedStore, checker, updates := makeHealthCheckTestComponents() + // No Init called, no snapshot + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + updates.expectNoAllDataUpdate(t) +} + +func TestHealthCheckStops(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + hc.Stop() + // Verify Stop doesn't panic on double-call + hc.Stop() +} + +func TestHealthCheckNilParams(t *testing.T) { + hc := NewStoreHealthCheck(nil, nil, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + assert.Nil(t, hc, "health check should not be created without store and checker") +} + +func TestHealthCheckRepopulationIncludesUpserts(t *testing.T) { + baseStore, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Upsert a new flag after Init + testFlag2Desc := ldstoretypes.ItemDescriptor{Version: testFlag2.Version, Item: &testFlag2} + _, _ = wrappedStore.Upsert(ldstoreimpl.Features(), testFlag2.Key, testFlag2Desc) + + // Clear store to simulate Redis restart + _ = baseStore.Init(nil) + + // Simulate Redis data loss + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + // Wait for repopulation + assert.Eventually(t, func() bool { + flags, e := baseStore.GetAll(ldstoreimpl.Features()) + return e == nil && len(flags) == 2 + }, time.Second, 5*time.Millisecond, "repopulation should include upserted flags") +} + +func TestHealthCheckNormalOperationDoesNothing(t *testing.T) { + _, wrappedStore, checker, updates := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + // Reset the updates tracker since Init sends an update + updates.allData = nil + + // Everything is fine + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + // No repopulation should have been triggered + updates.expectNoAllDataUpdate(t) + assert.False(t, wrappedStore.IsStoreDown()) +}