From e6a4d765ceae345e3d85e7713ecd0b804926e314 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:11:20 -0700 Subject: [PATCH 1/4] feat: add FDv2 types, refined validators, and DataManager interface extensions Add InitializerEntry/SynchronizerEntry types for type-safe mode definitions, refine protocolHandler validation to use isNullish for version/target fields, add connectionModes config option, split validator arrays for initializers vs synchronizers, and extend DataManager with optional streaming/flush methods. --- .../src/internal/fdv2/protocolHandler.ts | 38 +++++++++++++------ .../datasource/ConnectionModeConfig.test.ts | 38 +++++++++++++++++++ packages/shared/sdk-client/src/DataManager.ts | 27 +++++++++++++ .../src/api/datasource/DataSourceEntry.ts | 16 ++++++++ .../datasource/LDClientDataSystemOptions.ts | 24 +++++++++++- .../src/api/datasource/ModeDefinition.ts | 6 +-- .../sdk-client/src/api/datasource/index.ts | 2 + .../src/datasource/ConnectionModeConfig.ts | 11 ++++-- .../datasource/LDClientDataSystemOptions.ts | 3 +- .../src/datasource/fdv2/FDv2DataSource.ts | 4 ++ packages/shared/sdk-client/src/index.ts | 7 ++++ packages/shared/sdk-client/src/types/index.ts | 4 +- 12 files changed, 158 insertions(+), 22 deletions(-) diff --git a/packages/shared/common/src/internal/fdv2/protocolHandler.ts b/packages/shared/common/src/internal/fdv2/protocolHandler.ts index 5d13654ebf..3051c262ce 100644 --- a/packages/shared/common/src/internal/fdv2/protocolHandler.ts +++ b/packages/shared/common/src/internal/fdv2/protocolHandler.ts @@ -1,4 +1,5 @@ import { LDLogger } from '../../api'; +import { isNullish } from '../../validators'; import { DeleteObject, FDv2Event, @@ -111,7 +112,10 @@ export function createProtocolHandler( } function processIntentNone(intent: PayloadIntent): ProtocolAction { - if (!intent.id || !intent.target) { + if (!intent.id || isNullish(intent.target)) { + logger?.warn( + `Ignoring 'none' intent with missing fields: id=${intent.id}, target=${intent.target}`, + ); return ACTION_NONE; } @@ -164,14 +168,15 @@ export function createProtocolHandler( } function processPutObject(data: PutObject): ProtocolAction { - if ( - protocolState === 'inactive' || - !tempId || - !data.kind || - !data.key || - !data.version || - !data.object - ) { + if (protocolState === 'inactive' || !tempId) { + logger?.warn('Received put-object before server-intent was established. Ignoring.'); + return ACTION_NONE; + } + + if (!data.kind || !data.key || isNullish(data.version) || !data.object) { + logger?.warn( + `Ignoring put-object with missing fields: kind=${data.kind}, key=${data.key}, version=${data.version}`, + ); return ACTION_NONE; } @@ -191,7 +196,15 @@ export function createProtocolHandler( } function processDeleteObject(data: DeleteObject): ProtocolAction { - if (protocolState === 'inactive' || !tempId || !data.kind || !data.key || !data.version) { + if (protocolState === 'inactive' || !tempId) { + logger?.warn('Received delete-object before server-intent was established. Ignoring.'); + return ACTION_NONE; + } + + if (!data.kind || !data.key || isNullish(data.version)) { + logger?.warn( + `Ignoring delete-object with missing fields: kind=${data.kind}, key=${data.key}, version=${data.version}`, + ); return ACTION_NONE; } @@ -214,7 +227,10 @@ export function createProtocolHandler( }; } - if (!tempId || data.state === null || data.state === undefined || !data.version) { + if (!tempId || isNullish(data.state) || isNullish(data.version)) { + logger?.warn( + `Ignoring payload-transferred with missing fields: state=${data.state}, version=${data.version}`, + ); resetAll(); return ACTION_NONE; } diff --git a/packages/shared/sdk-client/__tests__/datasource/ConnectionModeConfig.test.ts b/packages/shared/sdk-client/__tests__/datasource/ConnectionModeConfig.test.ts index b2422620dc..25ae203897 100644 --- a/packages/shared/sdk-client/__tests__/datasource/ConnectionModeConfig.test.ts +++ b/packages/shared/sdk-client/__tests__/datasource/ConnectionModeConfig.test.ts @@ -257,6 +257,44 @@ describe('given entries with invalid type field', () => { }); }); +describe('given cache entries in synchronizers', () => { + it('discards a cache entry from synchronizers and warns', () => { + const result = validateModeDefinition( + { initializers: [], synchronizers: [{ type: 'cache' }] }, + 'testMode', + logger, + ); + + expect(result.synchronizers).toEqual([]); + expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('got cache')); + }); + + it('keeps valid synchronizer entries and discards cache', () => { + const result = validateModeDefinition( + { + initializers: [], + synchronizers: [{ type: 'polling' }, { type: 'cache' }, { type: 'streaming' }], + }, + 'testMode', + logger, + ); + + expect(result.synchronizers).toEqual([{ type: 'polling' }, { type: 'streaming' }]); + expect(logger.warn).toHaveBeenCalledTimes(1); + }); + + it('allows cache as an initializer', () => { + const result = validateModeDefinition( + { initializers: [{ type: 'cache' }], synchronizers: [] }, + 'testMode', + logger, + ); + + expect(result.initializers).toEqual([{ type: 'cache' }]); + expect(logger.warn).not.toHaveBeenCalled(); + }); +}); + describe('given polling entries with invalid config', () => { it('drops pollInterval when it is a string and warns', () => { const result = validateModeDefinition( diff --git a/packages/shared/sdk-client/src/DataManager.ts b/packages/shared/sdk-client/src/DataManager.ts index d379952c31..644b06d399 100644 --- a/packages/shared/sdk-client/src/DataManager.ts +++ b/packages/shared/sdk-client/src/DataManager.ts @@ -53,6 +53,33 @@ export interface DataManager { * Closes the data manager. Any active connections are closed. */ close(): void; + + /** + * Force streaming on or off. When `true`, the data manager should + * maintain a streaming connection. When `false`, streaming is disabled. + * When `undefined`, the forced state is cleared and automatic behavior + * takes over. + * + * Optional — only browser data managers implement this. + */ + setForcedStreaming?(streaming?: boolean): void; + + /** + * Update the automatic streaming state based on whether change listeners + * are registered. When `true` and forced streaming is not set, the data + * manager should activate streaming. + * + * Optional — only browser data managers implement this. + */ + setAutomaticStreamingState?(streaming: boolean): void; + + /** + * Set a callback to flush pending analytics events. Called immediately + * (not debounced) when the lifecycle transitions to background. + * + * Optional — only FDv2 data managers implement this. + */ + setFlushCallback?(callback: () => void): void; } /** diff --git a/packages/shared/sdk-client/src/api/datasource/DataSourceEntry.ts b/packages/shared/sdk-client/src/api/datasource/DataSourceEntry.ts index 873576edb9..0385fe6156 100644 --- a/packages/shared/sdk-client/src/api/datasource/DataSourceEntry.ts +++ b/packages/shared/sdk-client/src/api/datasource/DataSourceEntry.ts @@ -14,6 +14,7 @@ export interface EndpointConfig { /** * Configuration for a cache data source entry. + * Cache is only valid as an initializer (not a synchronizer). */ export interface CacheDataSourceEntry { readonly type: 'cache'; @@ -45,6 +46,21 @@ export interface StreamingDataSourceEntry { readonly endpoints?: EndpointConfig; } +/** + * An entry in the initializers list of a mode definition. Initializers + * can be cache, polling, or streaming sources. + */ +export type InitializerEntry = + | CacheDataSourceEntry + | PollingDataSourceEntry + | StreamingDataSourceEntry; + +/** + * An entry in the synchronizers list of a mode definition. Synchronizers + * can be polling or streaming sources (not cache). + */ +export type SynchronizerEntry = PollingDataSourceEntry | StreamingDataSourceEntry; + /** * A data source entry in a mode table. Each entry identifies a data source type * and carries type-specific configuration overrides. diff --git a/packages/shared/sdk-client/src/api/datasource/LDClientDataSystemOptions.ts b/packages/shared/sdk-client/src/api/datasource/LDClientDataSystemOptions.ts index 470a481326..bcd45c1fc4 100644 --- a/packages/shared/sdk-client/src/api/datasource/LDClientDataSystemOptions.ts +++ b/packages/shared/sdk-client/src/api/datasource/LDClientDataSystemOptions.ts @@ -1,4 +1,5 @@ import FDv2ConnectionMode from './FDv2ConnectionMode'; +import { ModeDefinition } from './ModeDefinition'; // When FDv2 becomes the default, this should be integrated into the // main LDOptions interface (api/LDOptions.ts). @@ -48,8 +49,27 @@ export interface LDClientDataSystemOptions { */ automaticModeSwitching?: boolean | AutomaticModeSwitchingConfig; - // Req 5.3.5 TBD — custom named modes reserved for future use. - // customModes?: Record; + /** + * Override the data source pipeline for specific connection modes. + * + * Each key is a connection mode name (`'streaming'`, `'polling'`, `'offline'`, + * `'one-shot'`, `'background'`). The value defines the initializers and + * synchronizers for that mode, replacing the built-in defaults. + * + * Only the modes you specify are overridden — unspecified modes retain + * their built-in definitions. + * + * @example + * ``` + * connectionModes: { + * streaming: { + * initializers: [{ type: 'polling' }], + * synchronizers: [{ type: 'streaming' }], + * }, + * } + * ``` + */ + connectionModes?: Partial>; } /** diff --git a/packages/shared/sdk-client/src/api/datasource/ModeDefinition.ts b/packages/shared/sdk-client/src/api/datasource/ModeDefinition.ts index 475eb4fb9b..97d221f923 100644 --- a/packages/shared/sdk-client/src/api/datasource/ModeDefinition.ts +++ b/packages/shared/sdk-client/src/api/datasource/ModeDefinition.ts @@ -1,4 +1,4 @@ -import { DataSourceEntry } from './DataSourceEntry'; +import { InitializerEntry, SynchronizerEntry } from './DataSourceEntry'; /** * Defines the data pipeline for a connection mode: which data sources @@ -10,7 +10,7 @@ export interface ModeDefinition { * Sources are tried in order; the first that successfully provides a full * data set transitions the SDK out of the initialization phase. */ - readonly initializers: ReadonlyArray; + readonly initializers: ReadonlyArray; /** * Ordered list of data sources for ongoing synchronization after @@ -18,5 +18,5 @@ export interface ModeDefinition { * failover to the next source if the primary fails. * An empty array means no synchronization occurs (e.g., offline, one-shot). */ - readonly synchronizers: ReadonlyArray; + readonly synchronizers: ReadonlyArray; } diff --git a/packages/shared/sdk-client/src/api/datasource/index.ts b/packages/shared/sdk-client/src/api/datasource/index.ts index e9a50e129d..95a688808b 100644 --- a/packages/shared/sdk-client/src/api/datasource/index.ts +++ b/packages/shared/sdk-client/src/api/datasource/index.ts @@ -4,6 +4,8 @@ export type { CacheDataSourceEntry, PollingDataSourceEntry, StreamingDataSourceEntry, + InitializerEntry, + SynchronizerEntry, DataSourceEntry, } from './DataSourceEntry'; export type { ModeDefinition } from './ModeDefinition'; diff --git a/packages/shared/sdk-client/src/datasource/ConnectionModeConfig.ts b/packages/shared/sdk-client/src/datasource/ConnectionModeConfig.ts index 03b0eb7082..51573af7e3 100644 --- a/packages/shared/sdk-client/src/datasource/ConnectionModeConfig.ts +++ b/packages/shared/sdk-client/src/datasource/ConnectionModeConfig.ts @@ -42,15 +42,20 @@ const streamingEntryValidators = { endpoints: validatorOf(endpointValidators), }; -const dataSourceEntryArrayValidator = arrayOf('type', { +const initializerEntryArrayValidator = arrayOf('type', { cache: cacheEntryValidators, polling: pollingEntryValidators, streaming: streamingEntryValidators, }); +const synchronizerEntryArrayValidator = arrayOf('type', { + polling: pollingEntryValidators, + streaming: streamingEntryValidators, +}); + const modeDefinitionValidators = { - initializers: dataSourceEntryArrayValidator, - synchronizers: dataSourceEntryArrayValidator, + initializers: initializerEntryArrayValidator, + synchronizers: synchronizerEntryArrayValidator, }; const MODE_DEFINITION_DEFAULTS: Record = { diff --git a/packages/shared/sdk-client/src/datasource/LDClientDataSystemOptions.ts b/packages/shared/sdk-client/src/datasource/LDClientDataSystemOptions.ts index c1595dcb22..639b875176 100644 --- a/packages/shared/sdk-client/src/datasource/LDClientDataSystemOptions.ts +++ b/packages/shared/sdk-client/src/datasource/LDClientDataSystemOptions.ts @@ -2,7 +2,7 @@ import { TypeValidators } from '@launchdarkly/js-sdk-common'; import type { PlatformDataSystemDefaults } from '../api/datasource'; import { anyOf, validatorOf } from '../configuration/validateOptions'; -import { connectionModeValidator } from './ConnectionModeConfig'; +import { connectionModesValidator, connectionModeValidator } from './ConnectionModeConfig'; const modeSwitchingValidators = { lifecycle: TypeValidators.Boolean, @@ -13,6 +13,7 @@ const dataSystemValidators = { initialConnectionMode: connectionModeValidator, backgroundConnectionMode: connectionModeValidator, automaticModeSwitching: anyOf(TypeValidators.Boolean, validatorOf(modeSwitchingValidators)), + connectionModes: connectionModesValidator, }; /** diff --git a/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts b/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts index d8c122c908..6585163fc3 100644 --- a/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts +++ b/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts @@ -208,6 +208,10 @@ export function createFDv2DataSource(config: FDv2DataSourceConfig): FDv2DataSour recoveryTimeoutMs, ); + if (conditions.promise) { + logger?.warn('Fallback condition active for current synchronizer.'); + } + // try/finally ensures conditions are closed on all code paths. let synchronizerRunning = true; try { diff --git a/packages/shared/sdk-client/src/index.ts b/packages/shared/sdk-client/src/index.ts index 192f812dba..048f24cc95 100644 --- a/packages/shared/sdk-client/src/index.ts +++ b/packages/shared/sdk-client/src/index.ts @@ -81,6 +81,8 @@ export type { CacheDataSourceEntry, PollingDataSourceEntry, StreamingDataSourceEntry, + InitializerEntry, + SynchronizerEntry, DataSourceEntry, ModeDefinition, LDClientDataSystemOptions, @@ -94,6 +96,10 @@ export type { ModeResolutionTable, } from './api/datasource'; +// FDv2 data source status manager. +export { createDataSourceStatusManager } from './datasource/DataSourceStatusManager'; +export type { DataSourceStatusManager } from './datasource/DataSourceStatusManager'; + // FDv2 data system validators and platform defaults. export { dataSystemValidators, @@ -104,6 +110,7 @@ export { // FDv2 connection mode type system — internal implementation. export type { ModeTable } from './datasource/ConnectionModeConfig'; +export { MODE_TABLE } from './datasource/ConnectionModeConfig'; export { resolveConnectionMode, MOBILE_TRANSITION_TABLE, diff --git a/packages/shared/sdk-client/src/types/index.ts b/packages/shared/sdk-client/src/types/index.ts index 9ffe3dbe31..19be292b8b 100644 --- a/packages/shared/sdk-client/src/types/index.ts +++ b/packages/shared/sdk-client/src/types/index.ts @@ -21,10 +21,10 @@ export type DeleteFlag = Pick; /** * Represents a pre-evaluated flag result for a specific context, as delivered - * by the FDv2 protocol via `put-object` events with `kind: 'flag_eval'`. + * by the FDv2 protocol via `put-object` events with `kind: 'flag-eval'`. * * This is the shape of the `object` field in a `put-object` event with - * `kind: 'flag_eval'`. It contains all the same fields as {@link Flag} except + * `kind: 'flag-eval'`. It contains all the same fields as {@link Flag} except * `version`, which is provided separately in the `put-object` envelope. * * There is no aggregate payload-level version field; per-flag versioning is From cb9d6b6c885b5edcb42ba636a401c5483efb0af0 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:29:36 -0700 Subject: [PATCH 2/4] feat: add SourceFactoryProvider for declarative data source creation Add SourceFactoryProvider that converts declarative InitializerEntry and SynchronizerEntry config into concrete initializer factories and synchronizer slots, with support for per-entry endpoint and interval overrides. --- .../datasource/SourceFactoryProvider.test.ts | 310 ++++++++++++++++++ .../src/datasource/SourceFactoryProvider.ts | 233 +++++++++++++ packages/shared/sdk-client/src/index.ts | 7 + 3 files changed, 550 insertions(+) create mode 100644 packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts create mode 100644 packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts diff --git a/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts b/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts new file mode 100644 index 0000000000..2faeec0b8c --- /dev/null +++ b/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts @@ -0,0 +1,310 @@ +import { + Context, + Crypto, + Encoding, + LDLogger, + Requests, + ServiceEndpoints, +} from '@launchdarkly/js-sdk-common'; + +import { InitializerEntry, SynchronizerEntry } from '../../src/api/datasource'; +import { DataSourcePaths } from '../../src/datasource/DataSourceConfig'; +import { createCacheInitializerFactory } from '../../src/datasource/fdv2/CacheInitializer'; +import { FDv2Requestor, makeFDv2Requestor } from '../../src/datasource/fdv2/FDv2Requestor'; +import { createPollingInitializer } from '../../src/datasource/fdv2/PollingInitializer'; +import { createPollingSynchronizer } from '../../src/datasource/fdv2/PollingSynchronizer'; +import { createSynchronizerSlot } from '../../src/datasource/fdv2/SourceManager'; +import { createStreamingBase } from '../../src/datasource/fdv2/StreamingFDv2Base'; +import { createStreamingInitializer } from '../../src/datasource/fdv2/StreamingInitializerFDv2'; +import { createStreamingSynchronizer } from '../../src/datasource/fdv2/StreamingSynchronizerFDv2'; +import { + createDefaultSourceFactoryProvider, + SourceFactoryContext, +} from '../../src/datasource/SourceFactoryProvider'; + +jest.mock('../../src/datasource/fdv2/PollingInitializer'); +jest.mock('../../src/datasource/fdv2/PollingSynchronizer'); +jest.mock('../../src/datasource/fdv2/StreamingFDv2Base'); +jest.mock('../../src/datasource/fdv2/StreamingInitializerFDv2'); +jest.mock('../../src/datasource/fdv2/StreamingSynchronizerFDv2'); +jest.mock('../../src/datasource/fdv2/CacheInitializer'); +jest.mock('../../src/datasource/fdv2/FDv2Requestor'); +jest.mock('../../src/datasource/fdv2/PollingBase'); + +const mockCreatePollingInitializer = createPollingInitializer as jest.Mock; +const mockCreatePollingSynchronizer = createPollingSynchronizer as jest.Mock; +const mockCreateStreamingBase = createStreamingBase as jest.Mock; +const mockCreateStreamingInitializer = createStreamingInitializer as jest.Mock; +const mockCreateStreamingSynchronizer = createStreamingSynchronizer as jest.Mock; +const mockCreateCacheInitializerFactory = createCacheInitializerFactory as jest.Mock; +const mockMakeFDv2Requestor = makeFDv2Requestor as jest.Mock; +const mockCreateSynchronizerSlot = createSynchronizerSlot as jest.Mock; + +jest.mock('../../src/datasource/fdv2/SourceManager', () => ({ + createSynchronizerSlot: jest.fn((factory: any) => ({ + factory, + isFDv1Fallback: false, + state: 'available', + })), +})); + +function makeContext(): Context { + return Context.fromLDContext({ kind: 'user', key: 'test-user' }); +} + +function makePaths(): DataSourcePaths { + return { + pathGet: jest.fn().mockReturnValue('/eval/test-path'), + pathReport: jest.fn().mockReturnValue('/eval/report-path'), + pathPost: jest.fn().mockReturnValue('/eval/post-path'), + pathPing: jest.fn().mockReturnValue('/eval/ping-path'), + }; +} + +function makeSourceFactoryContext(overrides?: Partial): SourceFactoryContext { + return { + requestor: { poll: jest.fn() } as unknown as FDv2Requestor, + requests: {} as Requests, + encoding: {} as Encoding, + serviceEndpoints: new ServiceEndpoints( + 'https://stream.example.com', + 'https://poll.example.com', + 'https://events.example.com', + ), + pollingPaths: makePaths(), + streamingPaths: makePaths(), + baseHeaders: { authorization: 'sdk-key' }, + queryParams: [], + plainContextString: '{"kind":"user","key":"test-user"}', + selectorGetter: () => undefined, + streamInitialReconnectDelay: 1, + pollInterval: 30, + logger: { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + } as unknown as LDLogger, + storage: undefined, + crypto: {} as Crypto, + environmentNamespace: 'test-env', + context: makeContext(), + ...overrides, + }; +} + +beforeEach(() => { + jest.clearAllMocks(); + mockCreatePollingInitializer.mockReturnValue({ close: jest.fn() }); + mockCreatePollingSynchronizer.mockReturnValue({ close: jest.fn() }); + mockCreateStreamingBase.mockReturnValue({ + start: jest.fn(), + close: jest.fn(), + takeResult: jest.fn(), + }); + mockCreateStreamingInitializer.mockReturnValue({ close: jest.fn() }); + mockCreateStreamingSynchronizer.mockReturnValue({ close: jest.fn() }); + mockCreateCacheInitializerFactory.mockReturnValue(jest.fn()); + mockMakeFDv2Requestor.mockReturnValue({ poll: jest.fn() }); +}); + +// --- createInitializerFactory --- + +it('creates a PollingInitializer for a polling initializer entry', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { type: 'polling' }; + + const factory = provider.createInitializerFactory(entry, ctx); + + expect(factory).toBeDefined(); + const selectorGetter = () => 'some-selector'; + factory!(selectorGetter); + expect(mockCreatePollingInitializer).toHaveBeenCalledWith( + ctx.requestor, + ctx.logger, + selectorGetter, + ); +}); + +it('creates a StreamingInitializer for a streaming initializer entry', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { type: 'streaming' }; + + const factory = provider.createInitializerFactory(entry, ctx); + + expect(factory).toBeDefined(); + const selectorGetter = () => 'some-selector'; + factory!(selectorGetter); + expect(mockCreateStreamingBase).toHaveBeenCalledWith( + expect.objectContaining({ + requests: ctx.requests, + serviceEndpoints: ctx.serviceEndpoints, + initialRetryDelayMillis: ctx.streamInitialReconnectDelay * 1000, + }), + ); + expect(mockCreateStreamingInitializer).toHaveBeenCalledWith( + mockCreateStreamingBase.mock.results[0].value, + ); +}); + +it('creates a CacheInitializer for a cache initializer entry', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { type: 'cache' }; + + const factory = provider.createInitializerFactory(entry, ctx); + + expect(mockCreateCacheInitializerFactory).toHaveBeenCalledWith({ + storage: ctx.storage, + crypto: ctx.crypto, + environmentNamespace: ctx.environmentNamespace, + context: ctx.context, + logger: ctx.logger, + }); + expect(factory).toBe(mockCreateCacheInitializerFactory.mock.results[0].value); +}); + +it('returns undefined for an unknown initializer entry type', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry = { type: 'unknown' } as unknown as InitializerEntry; + + const factory = provider.createInitializerFactory(entry, ctx); + + expect(factory).toBeUndefined(); +}); + +// --- createSynchronizerSlot --- + +it('creates a PollingSynchronizer slot for a polling synchronizer entry', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: SynchronizerEntry = { type: 'polling' }; + + const slot = provider.createSynchronizerSlot(entry, ctx); + + expect(slot).toBeDefined(); + expect(mockCreateSynchronizerSlot).toHaveBeenCalled(); + + // Invoke the factory that was passed to createSynchronizerSlot + const factoryArg = mockCreateSynchronizerSlot.mock.calls[0][0]; + const selectorGetter = () => 'sel'; + factoryArg(selectorGetter); + expect(mockCreatePollingSynchronizer).toHaveBeenCalledWith( + ctx.requestor, + ctx.logger, + selectorGetter, + ctx.pollInterval * 1000, + ); +}); + +it('creates a StreamingSynchronizer slot for a streaming synchronizer entry', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: SynchronizerEntry = { type: 'streaming' }; + + const slot = provider.createSynchronizerSlot(entry, ctx); + + expect(slot).toBeDefined(); + expect(mockCreateSynchronizerSlot).toHaveBeenCalled(); + + // Invoke the factory that was passed to createSynchronizerSlot + const factoryArg = mockCreateSynchronizerSlot.mock.calls[0][0]; + const selectorGetter = () => 'sel'; + factoryArg(selectorGetter); + expect(mockCreateStreamingBase).toHaveBeenCalledWith( + expect.objectContaining({ + requests: ctx.requests, + serviceEndpoints: ctx.serviceEndpoints, + initialRetryDelayMillis: ctx.streamInitialReconnectDelay * 1000, + }), + ); + expect(mockCreateStreamingSynchronizer).toHaveBeenCalledWith( + mockCreateStreamingBase.mock.results[0].value, + ); +}); + +it('returns undefined for an unknown synchronizer entry type', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry = { type: 'unknown' } as unknown as SynchronizerEntry; + + const slot = provider.createSynchronizerSlot(entry, ctx); + + expect(slot).toBeUndefined(); +}); + +// --- per-entry overrides --- + +it('creates a new requestor when polling entry has endpoint overrides', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { + type: 'polling', + endpoints: { pollingBaseUri: 'https://custom-poll.example.com' }, + }; + + const factory = provider.createInitializerFactory(entry, ctx); + expect(factory).toBeDefined(); + + const selectorGetter = () => undefined; + factory!(selectorGetter); + + expect(mockMakeFDv2Requestor).toHaveBeenCalledWith( + ctx.plainContextString, + expect.objectContaining({ + polling: 'https://custom-poll.example.com', + streaming: 'https://stream.example.com', + }), + ctx.pollingPaths, + ctx.requests, + ctx.encoding, + ctx.baseHeaders, + ctx.queryParams, + ); + + // Should use the new requestor, not the context one + const newRequestor = mockMakeFDv2Requestor.mock.results[0].value; + expect(mockCreatePollingInitializer).toHaveBeenCalledWith( + newRequestor, + ctx.logger, + selectorGetter, + ); +}); + +it('uses per-entry pollInterval override for polling synchronizer', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext({ pollInterval: 30 }); + const entry: SynchronizerEntry = { type: 'polling', pollInterval: 60 }; + + provider.createSynchronizerSlot(entry, ctx); + + const factoryArg = mockCreateSynchronizerSlot.mock.calls[0][0]; + const selectorGetter = () => undefined; + factoryArg(selectorGetter); + + expect(mockCreatePollingSynchronizer).toHaveBeenCalledWith( + ctx.requestor, + ctx.logger, + selectorGetter, + 60000, + ); +}); + +it('uses per-entry initialReconnectDelay override for streaming initializer', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext({ streamInitialReconnectDelay: 1 }); + const entry: InitializerEntry = { type: 'streaming', initialReconnectDelay: 5 }; + + const factory = provider.createInitializerFactory(entry, ctx); + expect(factory).toBeDefined(); + factory!(() => undefined); + + expect(mockCreateStreamingBase).toHaveBeenCalledWith( + expect.objectContaining({ + initialRetryDelayMillis: 5000, + }), + ); +}); diff --git a/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts b/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts new file mode 100644 index 0000000000..5ca2a3a679 --- /dev/null +++ b/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts @@ -0,0 +1,233 @@ +import { + Context, + Crypto, + Encoding, + LDHeaders, + LDLogger, + Requests, + ServiceEndpoints, + Storage, +} from '@launchdarkly/js-sdk-common'; + +import { EndpointConfig, InitializerEntry, SynchronizerEntry } from '../api/datasource'; +import { DataSourcePaths } from './DataSourceConfig'; +import { createCacheInitializerFactory } from './fdv2/CacheInitializer'; +import { FDv2Requestor, makeFDv2Requestor } from './fdv2/FDv2Requestor'; +import { poll as fdv2Poll } from './fdv2/PollingBase'; +import { createPollingInitializer } from './fdv2/PollingInitializer'; +import { createPollingSynchronizer } from './fdv2/PollingSynchronizer'; +import { createSynchronizerSlot, InitializerFactory, SynchronizerSlot } from './fdv2/SourceManager'; +import { createStreamingBase, PingHandler } from './fdv2/StreamingFDv2Base'; +import { createStreamingInitializer } from './fdv2/StreamingInitializerFDv2'; +import { createStreamingSynchronizer } from './fdv2/StreamingSynchronizerFDv2'; + +/** + * Context needed to create concrete initializer/synchronizer factories + * for a given identify call. Built once per identify and reused across + * mode switches. + */ +export interface SourceFactoryContext { + /** The FDv2 requestor for polling requests. */ + requestor: FDv2Requestor; + /** Platform request abstraction. */ + requests: Requests; + /** Platform encoding abstraction. */ + encoding: Encoding; + /** Service endpoint configuration. */ + serviceEndpoints: ServiceEndpoints; + /** The polling endpoint paths. */ + pollingPaths: DataSourcePaths; + /** The streaming endpoint paths. */ + streamingPaths: DataSourcePaths; + /** Default HTTP headers. */ + baseHeaders: LDHeaders; + /** Query parameters for requests (e.g., auth, secure mode hash). */ + queryParams: { key: string; value: string }[]; + /** JSON-serialized evaluation context. */ + plainContextString: string; + /** Getter for the current selector (basis) string. */ + selectorGetter: () => string | undefined; + /** Initial reconnect delay for streaming, in seconds. */ + streamInitialReconnectDelay: number; + /** Poll interval in seconds. */ + pollInterval: number; + /** Logger. */ + logger: LDLogger; + + // Cache-related fields (needed for cache initializer). + /** Platform storage for reading cached data. */ + storage: Storage | undefined; + /** Platform crypto for computing storage keys. */ + crypto: Crypto; + /** Environment namespace (hashed SDK key). */ + environmentNamespace: string; + /** The context being identified. */ + context: Context; +} + +/** + * Converts declarative {@link InitializerEntry} and {@link SynchronizerEntry} + * descriptors from the mode table into concrete {@link InitializerFactory} + * and {@link SynchronizerSlot} instances that the {@link FDv2DataSource} + * orchestrator can use. + */ +export interface SourceFactoryProvider { + /** + * Create an initializer factory from an initializer entry descriptor. + * Returns `undefined` if the entry type is not supported. + */ + createInitializerFactory( + entry: InitializerEntry, + ctx: SourceFactoryContext, + ): InitializerFactory | undefined; + + /** + * Create a synchronizer slot from a synchronizer entry descriptor. + * Returns `undefined` if the entry type is not supported. + */ + createSynchronizerSlot( + entry: SynchronizerEntry, + ctx: SourceFactoryContext, + ): SynchronizerSlot | undefined; +} + +function createPingHandler(ctx: SourceFactoryContext): PingHandler { + return { + handlePing: () => fdv2Poll(ctx.requestor, ctx.selectorGetter(), false, ctx.logger), + }; +} + +/** + * Create a {@link ServiceEndpoints} with per-entry endpoint overrides applied. + * Returns the original endpoints if no overrides are specified. + */ +function resolveEndpoints(ctx: SourceFactoryContext, endpoints?: EndpointConfig): ServiceEndpoints { + if (!endpoints?.pollingBaseUri && !endpoints?.streamingBaseUri) { + return ctx.serviceEndpoints; + } + return new ServiceEndpoints( + endpoints.streamingBaseUri ?? ctx.serviceEndpoints.streaming, + endpoints.pollingBaseUri ?? ctx.serviceEndpoints.polling, + ctx.serviceEndpoints.events, + ctx.serviceEndpoints.analyticsEventPath, + ctx.serviceEndpoints.diagnosticEventPath, + ctx.serviceEndpoints.includeAuthorizationHeader, + ctx.serviceEndpoints.payloadFilterKey, + ); +} + +/** + * Get the FDv2 requestor for a polling entry. If the entry has custom + * endpoints, creates a new requestor targeting those endpoints. Otherwise + * returns the shared requestor from the context. + */ +function resolvePollingRequestor( + ctx: SourceFactoryContext, + endpoints?: EndpointConfig, +): FDv2Requestor { + if (!endpoints?.pollingBaseUri) { + return ctx.requestor; + } + const overriddenEndpoints = resolveEndpoints(ctx, endpoints); + return makeFDv2Requestor( + ctx.plainContextString, + overriddenEndpoints, + ctx.pollingPaths, + ctx.requests, + ctx.encoding, + ctx.baseHeaders, + ctx.queryParams, + ); +} + +/** + * Creates a {@link SourceFactoryProvider} that handles `cache`, `polling`, + * and `streaming` data source entries. + */ +export function createDefaultSourceFactoryProvider(): SourceFactoryProvider { + return { + createInitializerFactory( + entry: InitializerEntry, + ctx: SourceFactoryContext, + ): InitializerFactory | undefined { + switch (entry.type) { + case 'polling': { + const requestor = resolvePollingRequestor(ctx, entry.endpoints); + return (sg: () => string | undefined) => + createPollingInitializer(requestor, ctx.logger, sg); + } + + case 'streaming': { + const entryEndpoints = resolveEndpoints(ctx, entry.endpoints); + return (sg: () => string | undefined) => { + const streamUriPath = ctx.streamingPaths.pathGet(ctx.encoding, ctx.plainContextString); + const base = createStreamingBase({ + requests: ctx.requests, + serviceEndpoints: entryEndpoints, + streamUriPath, + parameters: ctx.queryParams, + selectorGetter: sg, + headers: ctx.baseHeaders, + initialRetryDelayMillis: + (entry.initialReconnectDelay ?? ctx.streamInitialReconnectDelay) * 1000, + logger: ctx.logger, + pingHandler: createPingHandler(ctx), + }); + return createStreamingInitializer(base); + }; + } + + case 'cache': + return createCacheInitializerFactory({ + storage: ctx.storage, + crypto: ctx.crypto, + environmentNamespace: ctx.environmentNamespace, + context: ctx.context, + logger: ctx.logger, + }); + + default: + return undefined; + } + }, + + createSynchronizerSlot( + entry: SynchronizerEntry, + ctx: SourceFactoryContext, + ): SynchronizerSlot | undefined { + switch (entry.type) { + case 'polling': { + const intervalMs = (entry.pollInterval ?? ctx.pollInterval) * 1000; + const requestor = resolvePollingRequestor(ctx, entry.endpoints); + const factory = (sg: () => string | undefined) => + createPollingSynchronizer(requestor, ctx.logger, sg, intervalMs); + return createSynchronizerSlot(factory); + } + + case 'streaming': { + const entryEndpoints = resolveEndpoints(ctx, entry.endpoints); + const factory = (sg: () => string | undefined) => { + const streamUriPath = ctx.streamingPaths.pathGet(ctx.encoding, ctx.plainContextString); + const base = createStreamingBase({ + requests: ctx.requests, + serviceEndpoints: entryEndpoints, + streamUriPath, + parameters: ctx.queryParams, + selectorGetter: sg, + headers: ctx.baseHeaders, + initialRetryDelayMillis: + (entry.initialReconnectDelay ?? ctx.streamInitialReconnectDelay) * 1000, + logger: ctx.logger, + pingHandler: createPingHandler(ctx), + }); + return createStreamingSynchronizer(base); + }; + return createSynchronizerSlot(factory); + } + + default: + return undefined; + } + }, + }; +} diff --git a/packages/shared/sdk-client/src/index.ts b/packages/shared/sdk-client/src/index.ts index 048f24cc95..3243cec7fc 100644 --- a/packages/shared/sdk-client/src/index.ts +++ b/packages/shared/sdk-client/src/index.ts @@ -117,3 +117,10 @@ export { BROWSER_TRANSITION_TABLE, DESKTOP_TRANSITION_TABLE, } from './datasource/ModeResolver'; + +// FDv2 source factory provider — converts declarative config to concrete factories. +export type { + SourceFactoryContext, + SourceFactoryProvider, +} from './datasource/SourceFactoryProvider'; +export { createDefaultSourceFactoryProvider } from './datasource/SourceFactoryProvider'; From 97568e45a937365d4096b4a4e1a177a902da2cdf Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 20 Mar 2026 09:47:34 -0700 Subject: [PATCH 3/4] fix: address PR review feedback on SourceFactoryProvider - Fix ping handler to use per-entry endpoint-overridden requestor and the factory's selector getter (sg) for fresh selectors on each ping - Extract duplicated streaming base config into shared buildStreamingBase helper - Restructure SourceFactoryContext to group polling/streaming config into nested objects (polling.paths, polling.intervalSeconds, streaming.paths, streaming.initialReconnectDelaySeconds) - Resolve merge conflict in FDv2DataSource (warn -> debug) --- .../datasource/SourceFactoryProvider.test.ts | 77 +++++++++++-- .../src/datasource/SourceFactoryProvider.ts | 106 +++++++++--------- .../src/datasource/fdv2/FDv2DataSource.ts | 4 - 3 files changed, 122 insertions(+), 65 deletions(-) diff --git a/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts b/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts index 2faeec0b8c..65ba95890b 100644 --- a/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts +++ b/packages/shared/sdk-client/__tests__/datasource/SourceFactoryProvider.test.ts @@ -11,6 +11,7 @@ import { InitializerEntry, SynchronizerEntry } from '../../src/api/datasource'; import { DataSourcePaths } from '../../src/datasource/DataSourceConfig'; import { createCacheInitializerFactory } from '../../src/datasource/fdv2/CacheInitializer'; import { FDv2Requestor, makeFDv2Requestor } from '../../src/datasource/fdv2/FDv2Requestor'; +import { poll as fdv2Poll } from '../../src/datasource/fdv2/PollingBase'; import { createPollingInitializer } from '../../src/datasource/fdv2/PollingInitializer'; import { createPollingSynchronizer } from '../../src/datasource/fdv2/PollingSynchronizer'; import { createSynchronizerSlot } from '../../src/datasource/fdv2/SourceManager'; @@ -39,6 +40,7 @@ const mockCreateStreamingSynchronizer = createStreamingSynchronizer as jest.Mock const mockCreateCacheInitializerFactory = createCacheInitializerFactory as jest.Mock; const mockMakeFDv2Requestor = makeFDv2Requestor as jest.Mock; const mockCreateSynchronizerSlot = createSynchronizerSlot as jest.Mock; +const mockFdv2Poll = fdv2Poll as jest.Mock; jest.mock('../../src/datasource/fdv2/SourceManager', () => ({ createSynchronizerSlot: jest.fn((factory: any) => ({ @@ -71,20 +73,23 @@ function makeSourceFactoryContext(overrides?: Partial): So 'https://poll.example.com', 'https://events.example.com', ), - pollingPaths: makePaths(), - streamingPaths: makePaths(), baseHeaders: { authorization: 'sdk-key' }, queryParams: [], plainContextString: '{"kind":"user","key":"test-user"}', - selectorGetter: () => undefined, - streamInitialReconnectDelay: 1, - pollInterval: 30, logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn(), } as unknown as LDLogger, + polling: { + paths: makePaths(), + intervalSeconds: 30, + }, + streaming: { + paths: makePaths(), + initialReconnectDelaySeconds: 1, + }, storage: undefined, crypto: {} as Crypto, environmentNamespace: 'test-env', @@ -141,7 +146,7 @@ it('creates a StreamingInitializer for a streaming initializer entry', () => { expect.objectContaining({ requests: ctx.requests, serviceEndpoints: ctx.serviceEndpoints, - initialRetryDelayMillis: ctx.streamInitialReconnectDelay * 1000, + initialRetryDelayMillis: ctx.streaming.initialReconnectDelaySeconds * 1000, }), ); expect(mockCreateStreamingInitializer).toHaveBeenCalledWith( @@ -196,7 +201,7 @@ it('creates a PollingSynchronizer slot for a polling synchronizer entry', () => ctx.requestor, ctx.logger, selectorGetter, - ctx.pollInterval * 1000, + ctx.polling.intervalSeconds * 1000, ); }); @@ -218,7 +223,7 @@ it('creates a StreamingSynchronizer slot for a streaming synchronizer entry', () expect.objectContaining({ requests: ctx.requests, serviceEndpoints: ctx.serviceEndpoints, - initialRetryDelayMillis: ctx.streamInitialReconnectDelay * 1000, + initialRetryDelayMillis: ctx.streaming.initialReconnectDelaySeconds * 1000, }), ); expect(mockCreateStreamingSynchronizer).toHaveBeenCalledWith( @@ -258,7 +263,7 @@ it('creates a new requestor when polling entry has endpoint overrides', () => { polling: 'https://custom-poll.example.com', streaming: 'https://stream.example.com', }), - ctx.pollingPaths, + ctx.polling.paths, ctx.requests, ctx.encoding, ctx.baseHeaders, @@ -276,7 +281,7 @@ it('creates a new requestor when polling entry has endpoint overrides', () => { it('uses per-entry pollInterval override for polling synchronizer', () => { const provider = createDefaultSourceFactoryProvider(); - const ctx = makeSourceFactoryContext({ pollInterval: 30 }); + const ctx = makeSourceFactoryContext({ polling: { paths: makePaths(), intervalSeconds: 30 } }); const entry: SynchronizerEntry = { type: 'polling', pollInterval: 60 }; provider.createSynchronizerSlot(entry, ctx); @@ -295,7 +300,9 @@ it('uses per-entry pollInterval override for polling synchronizer', () => { it('uses per-entry initialReconnectDelay override for streaming initializer', () => { const provider = createDefaultSourceFactoryProvider(); - const ctx = makeSourceFactoryContext({ streamInitialReconnectDelay: 1 }); + const ctx = makeSourceFactoryContext({ + streaming: { paths: makePaths(), initialReconnectDelaySeconds: 1 }, + }); const entry: InitializerEntry = { type: 'streaming', initialReconnectDelay: 5 }; const factory = provider.createInitializerFactory(entry, ctx); @@ -308,3 +315,51 @@ it('uses per-entry initialReconnectDelay override for streaming initializer', () }), ); }); + +// --- ping handler --- + +it('ping handler uses the factory selector getter, not a stale reference', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { type: 'streaming' }; + + const factory = provider.createInitializerFactory(entry, ctx); + expect(factory).toBeDefined(); + + let currentSelector: string | undefined = 'selector-v1'; + const selectorGetter = () => currentSelector; + factory!(selectorGetter); + + // Extract the pingHandler from the createStreamingBase call + const streamingBaseArgs = mockCreateStreamingBase.mock.calls[0][0]; + const { pingHandler } = streamingBaseArgs; + + // Update the selector after factory creation + currentSelector = 'selector-v2'; + pingHandler.handlePing(); + + // The ping poll should use the fresh selector, not 'selector-v1' + expect(mockFdv2Poll).toHaveBeenCalledWith(expect.anything(), 'selector-v2', false, ctx.logger); +}); + +it('ping handler uses per-entry endpoint-overridden requestor', () => { + const provider = createDefaultSourceFactoryProvider(); + const ctx = makeSourceFactoryContext(); + const entry: InitializerEntry = { + type: 'streaming', + endpoints: { pollingBaseUri: 'https://custom-poll.example.com' }, + }; + + const factory = provider.createInitializerFactory(entry, ctx); + expect(factory).toBeDefined(); + factory!(() => undefined); + + // Extract the pingHandler from the createStreamingBase call + const streamingBaseArgs = mockCreateStreamingBase.mock.calls[0][0]; + const { pingHandler } = streamingBaseArgs; + pingHandler.handlePing(); + + // The ping poll should use the overridden requestor, not ctx.requestor + const overriddenRequestor = mockMakeFDv2Requestor.mock.results[0].value; + expect(mockFdv2Poll).toHaveBeenCalledWith(overriddenRequestor, undefined, false, ctx.logger); +}); diff --git a/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts b/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts index 5ca2a3a679..7a1b059617 100644 --- a/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts +++ b/packages/shared/sdk-client/src/datasource/SourceFactoryProvider.ts @@ -35,25 +35,31 @@ export interface SourceFactoryContext { encoding: Encoding; /** Service endpoint configuration. */ serviceEndpoints: ServiceEndpoints; - /** The polling endpoint paths. */ - pollingPaths: DataSourcePaths; - /** The streaming endpoint paths. */ - streamingPaths: DataSourcePaths; /** Default HTTP headers. */ baseHeaders: LDHeaders; /** Query parameters for requests (e.g., auth, secure mode hash). */ queryParams: { key: string; value: string }[]; /** JSON-serialized evaluation context. */ plainContextString: string; - /** Getter for the current selector (basis) string. */ - selectorGetter: () => string | undefined; - /** Initial reconnect delay for streaming, in seconds. */ - streamInitialReconnectDelay: number; - /** Poll interval in seconds. */ - pollInterval: number; /** Logger. */ logger: LDLogger; + /** Polling-specific configuration. */ + polling: { + /** The polling endpoint paths. */ + paths: DataSourcePaths; + /** Default poll interval in seconds. */ + intervalSeconds: number; + }; + + /** Streaming-specific configuration. */ + streaming: { + /** The streaming endpoint paths. */ + paths: DataSourcePaths; + /** Default initial reconnect delay in seconds. */ + initialReconnectDelaySeconds: number; + }; + // Cache-related fields (needed for cache initializer). /** Platform storage for reading cached data. */ storage: Storage | undefined; @@ -91,9 +97,13 @@ export interface SourceFactoryProvider { ): SynchronizerSlot | undefined; } -function createPingHandler(ctx: SourceFactoryContext): PingHandler { +function createPingHandler( + requestor: FDv2Requestor, + selectorGetter: () => string | undefined, + logger: LDLogger, +): PingHandler { return { - handlePing: () => fdv2Poll(ctx.requestor, ctx.selectorGetter(), false, ctx.logger), + handlePing: () => fdv2Poll(requestor, selectorGetter(), false, logger), }; } @@ -132,7 +142,7 @@ function resolvePollingRequestor( return makeFDv2Requestor( ctx.plainContextString, overriddenEndpoints, - ctx.pollingPaths, + ctx.polling.paths, ctx.requests, ctx.encoding, ctx.baseHeaders, @@ -140,6 +150,33 @@ function resolvePollingRequestor( ); } +/** + * Build a streaming base instance using per-entry config with context defaults + * as fallbacks. The `sg` selector getter is the canonical source of truth for + * the current selector — both the stream and its ping handler use it. + */ +function buildStreamingBase( + entry: { endpoints?: EndpointConfig; initialReconnectDelay?: number }, + ctx: SourceFactoryContext, + sg: () => string | undefined, +) { + const entryEndpoints = resolveEndpoints(ctx, entry.endpoints); + const requestor = resolvePollingRequestor(ctx, entry.endpoints); + const streamUriPath = ctx.streaming.paths.pathGet(ctx.encoding, ctx.plainContextString); + return createStreamingBase({ + requests: ctx.requests, + serviceEndpoints: entryEndpoints, + streamUriPath, + parameters: ctx.queryParams, + selectorGetter: sg, + headers: ctx.baseHeaders, + initialRetryDelayMillis: + (entry.initialReconnectDelay ?? ctx.streaming.initialReconnectDelaySeconds) * 1000, + logger: ctx.logger, + pingHandler: createPingHandler(requestor, sg, ctx.logger), + }); +} + /** * Creates a {@link SourceFactoryProvider} that handles `cache`, `polling`, * and `streaming` data source entries. @@ -157,25 +194,9 @@ export function createDefaultSourceFactoryProvider(): SourceFactoryProvider { createPollingInitializer(requestor, ctx.logger, sg); } - case 'streaming': { - const entryEndpoints = resolveEndpoints(ctx, entry.endpoints); - return (sg: () => string | undefined) => { - const streamUriPath = ctx.streamingPaths.pathGet(ctx.encoding, ctx.plainContextString); - const base = createStreamingBase({ - requests: ctx.requests, - serviceEndpoints: entryEndpoints, - streamUriPath, - parameters: ctx.queryParams, - selectorGetter: sg, - headers: ctx.baseHeaders, - initialRetryDelayMillis: - (entry.initialReconnectDelay ?? ctx.streamInitialReconnectDelay) * 1000, - logger: ctx.logger, - pingHandler: createPingHandler(ctx), - }); - return createStreamingInitializer(base); - }; - } + case 'streaming': + return (sg: () => string | undefined) => + createStreamingInitializer(buildStreamingBase(entry, ctx, sg)); case 'cache': return createCacheInitializerFactory({ @@ -197,7 +218,7 @@ export function createDefaultSourceFactoryProvider(): SourceFactoryProvider { ): SynchronizerSlot | undefined { switch (entry.type) { case 'polling': { - const intervalMs = (entry.pollInterval ?? ctx.pollInterval) * 1000; + const intervalMs = (entry.pollInterval ?? ctx.polling.intervalSeconds) * 1000; const requestor = resolvePollingRequestor(ctx, entry.endpoints); const factory = (sg: () => string | undefined) => createPollingSynchronizer(requestor, ctx.logger, sg, intervalMs); @@ -205,23 +226,8 @@ export function createDefaultSourceFactoryProvider(): SourceFactoryProvider { } case 'streaming': { - const entryEndpoints = resolveEndpoints(ctx, entry.endpoints); - const factory = (sg: () => string | undefined) => { - const streamUriPath = ctx.streamingPaths.pathGet(ctx.encoding, ctx.plainContextString); - const base = createStreamingBase({ - requests: ctx.requests, - serviceEndpoints: entryEndpoints, - streamUriPath, - parameters: ctx.queryParams, - selectorGetter: sg, - headers: ctx.baseHeaders, - initialRetryDelayMillis: - (entry.initialReconnectDelay ?? ctx.streamInitialReconnectDelay) * 1000, - logger: ctx.logger, - pingHandler: createPingHandler(ctx), - }); - return createStreamingSynchronizer(base); - }; + const factory = (sg: () => string | undefined) => + createStreamingSynchronizer(buildStreamingBase(entry, ctx, sg)); return createSynchronizerSlot(factory); } diff --git a/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts b/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts index 3652f481bc..60a3bf6508 100644 --- a/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts +++ b/packages/shared/sdk-client/src/datasource/fdv2/FDv2DataSource.ts @@ -209,11 +209,7 @@ export function createFDv2DataSource(config: FDv2DataSourceConfig): FDv2DataSour ); if (conditions.promise) { -<<<<<<< rlamb/fdv2-source-factory-provider - logger?.warn('Fallback condition active for current synchronizer.'); -======= logger?.debug('Fallback condition active for current synchronizer.'); ->>>>>>> main } // try/finally ensures conditions are closed on all code paths. From de3921380f5518aafc3bae3568941328c83900e7 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 20 Mar 2026 09:55:28 -0700 Subject: [PATCH 4/4] fix: increase bundle size limits for common and sdk-client packages Account for new FDv2 SourceFactoryProvider and related code paths. --- .github/workflows/common.yml | 2 +- .github/workflows/sdk-client.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/common.yml b/.github/workflows/common.yml index f0161d845d..9034131d1d 100644 --- a/.github/workflows/common.yml +++ b/.github/workflows/common.yml @@ -35,4 +35,4 @@ jobs: target_file: 'packages/shared/common/dist/esm/index.mjs' package_name: '@launchdarkly/js-sdk-common' pr_number: ${{ github.event.number }} - size_limit: 26000 + size_limit: 29000 diff --git a/.github/workflows/sdk-client.yml b/.github/workflows/sdk-client.yml index 8b3ba882b1..3f3d91f7cb 100644 --- a/.github/workflows/sdk-client.yml +++ b/.github/workflows/sdk-client.yml @@ -32,4 +32,4 @@ jobs: target_file: 'packages/shared/sdk-client/dist/esm/index.mjs' package_name: '@launchdarkly/js-client-sdk-common' pr_number: ${{ github.event.number }} - size_limit: 24000 + size_limit: 38000