1- import { IPluggableStorageWrapper , IStorageAsync , IStorageAsyncFactory , IStorageFactoryParams , ITelemetryCacheAsync } from '../types' ;
1+ import { IPluggableStorageWrapper , IStorageAsyncFactory , IStorageFactoryParams , ITelemetryCacheAsync } from '../types' ;
22
33import { KeyBuilderSS } from '../KeyBuilderSS' ;
44import { SplitsCachePluggable } from './SplitsCachePluggable' ;
@@ -62,11 +62,12 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
6262
6363 const prefix = validatePrefix ( options . prefix ) ;
6464
65- function PluggableStorageFactory ( params : IStorageFactoryParams ) : IStorageAsync {
65+ function PluggableStorageFactory ( params : IStorageFactoryParams ) {
6666 const { onReadyCb, settings, settings : { log, mode, sync : { impressionsMode } , scheduler : { impressionsQueueSize, eventsQueueSize } } } = params ;
6767 const metadata = metadataBuilder ( settings ) ;
6868 const keys = new KeyBuilderSS ( prefix , metadata ) ;
6969 const wrapper = wrapperAdapter ( log , options . wrapper ) ;
70+ let connectPromise : Promise < void > ;
7071
7172 const isSyncronizer = mode === undefined ; // If mode is not defined, the synchronizer is running
7273 const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE ;
@@ -89,35 +90,6 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
8990 new UniqueKeysCachePluggable ( log , keys . buildUniqueKeysKey ( ) , wrapper ) :
9091 undefined ;
9192
92- // Connects to wrapper and emits SDK_READY event on main client
93- const connectPromise = wrapper . connect ( ) . then ( ( ) => {
94- if ( isSyncronizer ) {
95- // In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
96- return wrapper . get ( keys . buildHashKey ( ) ) . then ( ( hash ) => {
97- const currentHash = getStorageHash ( settings ) ;
98- if ( hash !== currentHash ) {
99- log . info ( LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache' ) ;
100- return wrapper . getKeysByPrefix ( `${ keys . prefix } .` ) . then ( storageKeys => {
101- return Promise . all ( storageKeys . map ( storageKey => wrapper . del ( storageKey ) ) ) ;
102- } ) . then ( ( ) => wrapper . set ( keys . buildHashKey ( ) , currentHash ) ) ;
103- }
104- } ) . then ( ( ) => {
105- onReadyCb ( ) ;
106- } ) ;
107- } else {
108- // Start periodic flush of async storages if not running synchronizer (producer mode)
109- if ( impressionCountsCache && ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ) ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ( ) ;
110- if ( uniqueKeysCache && ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ) ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ( ) ;
111- if ( telemetry && ( telemetry as ITelemetryCacheAsync ) . recordConfig ) ( telemetry as ITelemetryCacheAsync ) . recordConfig ( ) ;
112-
113- onReadyCb ( ) ;
114- }
115- } ) . catch ( ( e ) => {
116- e = e || new Error ( 'Error connecting wrapper' ) ;
117- onReadyCb ( e ) ;
118- return e ; // Propagate error for shared clients
119- } ) ;
120-
12193 return {
12294 splits : new SplitsCachePluggable ( log , keys , wrapper , settings . sync . __splitFiltersValidation ) ,
12395 segments : new SegmentsCachePluggable ( log , keys , wrapper ) ,
@@ -127,6 +99,39 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
12799 telemetry,
128100 uniqueKeys : uniqueKeysCache ,
129101
102+ init ( ) {
103+ if ( connectPromise ) return connectPromise ;
104+
105+ // Connects to wrapper and emits SDK_READY event on main client
106+ return connectPromise = wrapper . connect ( ) . then ( ( ) => {
107+ if ( isSyncronizer ) {
108+ // In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
109+ return wrapper . get ( keys . buildHashKey ( ) ) . then ( ( hash ) => {
110+ const currentHash = getStorageHash ( settings ) ;
111+ if ( hash !== currentHash ) {
112+ log . info ( LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache' ) ;
113+ return wrapper . getKeysByPrefix ( `${ keys . prefix } .` ) . then ( storageKeys => {
114+ return Promise . all ( storageKeys . map ( storageKey => wrapper . del ( storageKey ) ) ) ;
115+ } ) . then ( ( ) => wrapper . set ( keys . buildHashKey ( ) , currentHash ) ) ;
116+ }
117+ } ) . then ( ( ) => {
118+ onReadyCb ( ) ;
119+ } ) ;
120+ } else {
121+ // Start periodic flush of async storages if not running synchronizer (producer mode)
122+ if ( impressionCountsCache && ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ) ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ( ) ;
123+ if ( uniqueKeysCache && ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ) ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ( ) ;
124+ if ( telemetry && ( telemetry as ITelemetryCacheAsync ) . recordConfig ) ( telemetry as ITelemetryCacheAsync ) . recordConfig ( ) ;
125+
126+ onReadyCb ( ) ;
127+ }
128+ } ) . catch ( ( e ) => {
129+ e = e || new Error ( 'Error connecting wrapper' ) ;
130+ onReadyCb ( e ) ;
131+ return e ; // Propagate error for shared clients
132+ } ) ;
133+ } ,
134+
130135 // Stop periodic flush and disconnect the underlying storage
131136 destroy ( ) {
132137 return Promise . all ( isSyncronizer ? [ ] : [
@@ -136,8 +141,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
136141 } ,
137142
138143 // emits SDK_READY event on shared clients and returns a reference to the storage
139- shared ( _ , onReadyCb ) {
140- connectPromise . then ( onReadyCb ) ;
144+ shared ( _ : string , onReadyCb : ( error ?: any ) => void ) {
145+ this . init ( ) . then ( onReadyCb ) ;
141146
142147 return {
143148 ...this ,
0 commit comments