Skip to content

Commit ad986dd

Browse files
committed
chore: bot comments
fixing up the race conditions that could happen when connection mode is changed during identify
1 parent 0892465 commit ad986dd

2 files changed

Lines changed: 200 additions & 11 deletions

File tree

packages/sdk/node-client/__tests__/NodeClient.dataSource.test.ts

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,142 @@ it('does not read cached flags when bootstrap is provided', async () => {
407407
await client.close();
408408
});
409409

410+
it('rejects an in-flight identify immediately when close() is called while the processor awaits its first event', async () => {
411+
// The streaming processor never fires a 'put' event, simulating a slow or stalled stream.
412+
// close() must reject _pendingIdentifyReject promptly so the caller is not left waiting for
413+
// the 5-second identify timeout.
414+
const createEventSource = jest.fn(() => ({
415+
addEventListener: jest.fn(),
416+
close: jest.fn(),
417+
onclose: jest.fn(),
418+
onerror: jest.fn(),
419+
onopen: jest.fn(),
420+
onretrying: jest.fn(),
421+
}));
422+
const fakePlatform = makeMockPlatform({
423+
requests: {
424+
fetch: jest.fn(),
425+
createEventSource: createEventSource as any,
426+
getEventSourceCapabilities: () => ({ readTimeout: true, headers: true, customMethod: true }),
427+
},
428+
});
429+
NodePlatformMock.mockImplementationOnce(() => fakePlatform);
430+
431+
const client = createClient(
432+
'client-side-id',
433+
{ kind: 'user', key: 'bob' },
434+
{
435+
initialConnectionMode: 'streaming',
436+
sendEvents: false,
437+
diagnosticOptOut: true,
438+
logger,
439+
},
440+
);
441+
442+
await client.start({ bootstrap: bootstrapData });
443+
444+
// Identify without bootstrap: cache miss -> _setupConnection -> processor running, no 'put' yet.
445+
const identifyPromise = client.identify({ kind: 'user', key: 'alice' }, { timeout: 5 });
446+
// Allow the identify to reach the streaming processor before closing.
447+
await new Promise((resolve) => setTimeout(resolve, 10));
448+
449+
await client.close();
450+
451+
const result = await identifyPromise;
452+
expect(result.status).toBe('error');
453+
});
454+
455+
it('rejects an in-flight identify when setConnectionMode offline runs after the processor started', async () => {
456+
// The processor phase: identify has passed loadCached and _setupConnection has registered
457+
// _pendingIdentifyReject. The stream never delivers a 'put', so the only way out is the
458+
// reject path.
459+
const createEventSource = jest.fn(() => ({
460+
addEventListener: jest.fn(),
461+
close: jest.fn(),
462+
onclose: jest.fn(),
463+
onerror: jest.fn(),
464+
onopen: jest.fn(),
465+
onretrying: jest.fn(),
466+
}));
467+
const fakePlatform = makeMockPlatform({
468+
requests: {
469+
fetch: jest.fn(),
470+
createEventSource: createEventSource as any,
471+
getEventSourceCapabilities: () => ({ readTimeout: true, headers: true, customMethod: true }),
472+
},
473+
});
474+
NodePlatformMock.mockImplementationOnce(() => fakePlatform);
475+
476+
const client = createClient(
477+
'client-side-id',
478+
{ kind: 'user', key: 'bob' },
479+
{
480+
initialConnectionMode: 'streaming',
481+
sendEvents: false,
482+
diagnosticOptOut: true,
483+
logger,
484+
},
485+
);
486+
487+
await client.start({ bootstrap: bootstrapData });
488+
489+
const identifyPromise = client.identify({ kind: 'user', key: 'alice' }, { timeout: 5 });
490+
await new Promise((resolve) => setTimeout(resolve, 10));
491+
492+
// Go offline while the streaming processor is running and waiting for its first event.
493+
await client.setConnectionMode('offline');
494+
495+
const result = await identifyPromise;
496+
expect(result.status).toBe('error');
497+
498+
await client.close();
499+
});
500+
501+
it('rejects an in-flight identify when setConnectionMode switches modes while the processor is running', async () => {
502+
// Switching from streaming to polling replaces the active processor. The pending identify
503+
// on the old processor must be rejected rather than silently abandoned.
504+
const createEventSource = jest.fn(() => ({
505+
addEventListener: jest.fn(),
506+
close: jest.fn(),
507+
onclose: jest.fn(),
508+
onerror: jest.fn(),
509+
onopen: jest.fn(),
510+
onretrying: jest.fn(),
511+
}));
512+
const fakePlatform = makeMockPlatform({
513+
requests: {
514+
fetch: jest.fn(),
515+
createEventSource: createEventSource as any,
516+
getEventSourceCapabilities: () => ({ readTimeout: true, headers: true, customMethod: true }),
517+
},
518+
});
519+
NodePlatformMock.mockImplementationOnce(() => fakePlatform);
520+
521+
const client = createClient(
522+
'client-side-id',
523+
{ kind: 'user', key: 'bob' },
524+
{
525+
initialConnectionMode: 'streaming',
526+
sendEvents: false,
527+
diagnosticOptOut: true,
528+
logger,
529+
},
530+
);
531+
532+
await client.start({ bootstrap: bootstrapData });
533+
534+
const identifyPromise = client.identify({ kind: 'user', key: 'alice' }, { timeout: 5 });
535+
await new Promise((resolve) => setTimeout(resolve, 10));
536+
537+
// Switch to polling -- the old streaming processor is replaced; the pending identify rejects.
538+
await client.setConnectionMode('polling');
539+
540+
const result = await identifyPromise;
541+
expect(result.status).toBe('error');
542+
543+
await client.close();
544+
});
545+
410546
it('rejects identify rather than hanging when the mode flips to offline mid-identify', async () => {
411547
// Gate the cached-flag read so we can flip the connection mode while identify is parked on
412548
// the await -- reproducing the race where _setupConnection later sees connectionMode==='offline'.

packages/sdk/node-client/src/NodeDataManager.ts

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const logTag = '[NodeDataManager]';
2121

2222
export default class NodeDataManager extends BaseDataManager {
2323
protected connectionMode: ConnectionMode = 'streaming';
24+
private _pendingIdentifyReject?: (err: Error) => void;
2425

2526
constructor(
2627
platform: Platform,
@@ -65,7 +66,9 @@ export default class NodeDataManager extends BaseDataManager {
6566
}
6667
this.context = context;
6768

68-
const offline = this.connectionMode === 'offline';
69+
// Snapshot the mode before any await so the bootstrap path and the stale-snapshot
70+
// detection below both see a consistent starting point.
71+
const startedOffline = this.connectionMode === 'offline';
6972

7073
// Bootstrap and cache are mutually exclusive: when bootstrap data is provided it
7174
// resolves identify immediately, so we must not also load (and potentially overwrite
@@ -77,17 +80,19 @@ export default class NodeDataManager extends BaseDataManager {
7780
);
7881
}
7982
this._finishIdentifyFromBootstrap(context, identifyOptions, identifyResolve);
80-
if (!offline) {
83+
if (!startedOffline) {
8184
// Open a connection for ongoing updates, but identify is already resolved so no
8285
// callbacks are forwarded.
8386
this._setupConnection(context);
8487
}
8588
return;
8689
}
8790

88-
const waitForNetworkResults = !!identifyOptions?.waitForNetworkResults && !offline;
89-
9091
const loadedFromCache = await this.flagManager.loadCached(context);
92+
// Re-read connectionMode after the await: a concurrent setConnectionMode call may have
93+
// changed it while loadCached was in flight.
94+
const offline = this.connectionMode === 'offline';
95+
const waitForNetworkResults = !!identifyOptions?.waitForNetworkResults && !offline;
9196
let identifyResolved = false;
9297
if (loadedFromCache && !waitForNetworkResults) {
9398
this._debugLog('Identify completing with cached flags');
@@ -96,6 +101,15 @@ export default class NodeDataManager extends BaseDataManager {
96101
}
97102

98103
if (offline) {
104+
if (!startedOffline) {
105+
// The connection mode changed to offline while we were awaiting the cache. Reject
106+
// rather than silently resolve so the caller knows the identify did not complete
107+
// in the originally-requested mode.
108+
if (!identifyResolved) {
109+
identifyReject(new Error("Connection mode changed to 'offline' during identify."));
110+
}
111+
return;
112+
}
99113
if (loadedFromCache) {
100114
this._debugLog('Offline identify - using cached flags.');
101115
} else {
@@ -142,6 +156,22 @@ export default class NodeDataManager extends BaseDataManager {
142156
return;
143157
}
144158

159+
// Wrap callbacks so _pendingIdentifyReject is cleared as soon as the identify settles,
160+
// preventing a stale reject from firing if setConnectionMode runs after resolution.
161+
const wrappedResolve = identifyResolve
162+
? () => {
163+
this._pendingIdentifyReject = undefined;
164+
identifyResolve();
165+
}
166+
: undefined;
167+
const wrappedReject = identifyReject
168+
? (err: Error) => {
169+
this._pendingIdentifyReject = undefined;
170+
identifyReject(err);
171+
}
172+
: undefined;
173+
this._pendingIdentifyReject = wrappedReject;
174+
145175
const plainContextString = JSON.stringify(rawContext);
146176
const requestor = makeRequestor(
147177
plainContextString,
@@ -163,27 +193,27 @@ export default class NodeDataManager extends BaseDataManager {
163193
rawContext,
164194
context,
165195
requestor,
166-
identifyResolve,
167-
identifyReject,
196+
wrappedResolve,
197+
wrappedReject,
168198
);
169199
break;
170200
case 'polling':
171201
this.createPollingProcessor(
172202
rawContext,
173203
context,
174204
requestor,
175-
identifyResolve,
176-
identifyReject,
205+
wrappedResolve,
206+
wrappedReject,
177207
);
178208
break;
179209
default:
180210
this.logger.warn(
181211
`${logTag} _setupConnection called with unsupported connectionMode '${this.connectionMode}'.`,
182212
);
183213
this.updateProcessor = undefined;
184-
// The mode may have changed to 'offline' while identify was awaiting the cache; reject
185-
// rather than leave the identify promise to hang until its timeout.
186-
identifyReject?.(
214+
// connectionMode is an unsupported value; reject the in-flight identify so the
215+
// promise does not hang until its timeout.
216+
wrappedReject?.(
187217
new Error(`Connection mode changed to '${this.connectionMode}' during identify.`),
188218
);
189219
return;
@@ -207,12 +237,26 @@ export default class NodeDataManager extends BaseDataManager {
207237

208238
switch (mode) {
209239
case 'offline':
240+
// Reject any in-flight identify before closing the processor -- the processor's
241+
// close() does not invoke pending callbacks, so without this the identify promise
242+
// would hang until its timeout.
243+
if (this._pendingIdentifyReject) {
244+
const reject = this._pendingIdentifyReject;
245+
this._pendingIdentifyReject = undefined;
246+
reject(new Error("Connection mode changed to 'offline' during identify."));
247+
}
210248
this.updateProcessor?.close();
211249
this.updateProcessor = undefined;
212250
break;
213251
case 'polling':
214252
case 'streaming':
215253
if (this.context) {
254+
// Reject any in-flight identify from the previous processor before replacing it.
255+
if (this._pendingIdentifyReject) {
256+
const reject = this._pendingIdentifyReject;
257+
this._pendingIdentifyReject = undefined;
258+
reject(new Error(`Connection mode changed to '${mode}' during identify.`));
259+
}
216260
this._setupConnection(this.context);
217261
}
218262
break;
@@ -227,4 +271,13 @@ export default class NodeDataManager extends BaseDataManager {
227271
getConnectionMode(): ConnectionMode {
228272
return this.connectionMode;
229273
}
274+
275+
override close(): void {
276+
if (this._pendingIdentifyReject) {
277+
const reject = this._pendingIdentifyReject;
278+
this._pendingIdentifyReject = undefined;
279+
reject(new Error('Client has been closed.'));
280+
}
281+
super.close();
282+
}
230283
}

0 commit comments

Comments
 (0)