Skip to content

Commit 498216a

Browse files
authored
feat: Add FDv1 polling synchronizer for FDv2 fallback (SDK-1923) (#1159)
1 parent e46476e commit 498216a

4 files changed

Lines changed: 520 additions & 1 deletion

File tree

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
import { createFDv1PollingSynchronizer } from '../../../src/datasource/fdv2/FDv1PollingSynchronizer';
2+
import { LDRequestError, Requestor } from '../../../src/datasource/Requestor';
3+
import { makeLogger } from './testHelpers';
4+
5+
const logger = makeLogger();
6+
7+
beforeEach(() => {
8+
jest.useFakeTimers();
9+
jest.clearAllMocks();
10+
});
11+
12+
afterEach(() => {
13+
jest.useRealTimers();
14+
});
15+
16+
function makeFDv1Requestor(response: string): Requestor {
17+
return {
18+
requestPayload: jest.fn().mockResolvedValue(response),
19+
};
20+
}
21+
22+
function makeFDv1Flags(flags: Record<string, { value: any; version?: number }>) {
23+
const result: Record<string, any> = {};
24+
Object.entries(flags).forEach(([key, flag]) => {
25+
result[key] = { version: flag.version ?? 1, value: flag.value };
26+
});
27+
return JSON.stringify(result);
28+
}
29+
30+
it('does not poll until the first call to next', async () => {
31+
const requestor = makeFDv1Requestor(makeFDv1Flags({ flagA: { value: true } }));
32+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
33+
34+
// Advance time — should not have polled yet
35+
await jest.advanceTimersByTimeAsync(0);
36+
expect(requestor.requestPayload).not.toHaveBeenCalled();
37+
38+
// First next() triggers the poll
39+
const nextPromise = sync.next();
40+
await jest.advanceTimersByTimeAsync(0);
41+
42+
const result = await nextPromise;
43+
expect(requestor.requestPayload).toHaveBeenCalledTimes(1);
44+
expect(result.type).toBe('changeSet');
45+
46+
sync.close();
47+
});
48+
49+
it('produces a changeSet with full payload from a successful poll', async () => {
50+
const flags = { flagA: { value: 'yes', version: 5 }, flagB: { value: 42, version: 3 } };
51+
const requestor = makeFDv1Requestor(makeFDv1Flags(flags));
52+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
53+
54+
const nextPromise = sync.next();
55+
await jest.advanceTimersByTimeAsync(0);
56+
57+
const result = await nextPromise;
58+
expect(result.type).toBe('changeSet');
59+
if (result.type === 'changeSet') {
60+
expect(result.payload.type).toBe('full');
61+
expect(result.payload.id).toBe('FDv1Fallback');
62+
expect(result.payload.version).toBe(1);
63+
expect(result.payload.updates).toHaveLength(2);
64+
65+
const flagAUpdate = result.payload.updates.find((u) => u.key === 'flagA');
66+
expect(flagAUpdate).toEqual({
67+
kind: 'flag',
68+
key: 'flagA',
69+
version: 5,
70+
object: { version: 5, value: 'yes' },
71+
});
72+
73+
const flagBUpdate = result.payload.updates.find((u) => u.key === 'flagB');
74+
expect(flagBUpdate).toEqual({
75+
kind: 'flag',
76+
key: 'flagB',
77+
version: 3,
78+
object: { version: 3, value: 42 },
79+
});
80+
}
81+
82+
sync.close();
83+
});
84+
85+
it('produces payloads without a selector', async () => {
86+
const requestor = makeFDv1Requestor(makeFDv1Flags({ flag: { value: true } }));
87+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
88+
89+
const nextPromise = sync.next();
90+
await jest.advanceTimersByTimeAsync(0);
91+
92+
const result = await nextPromise;
93+
expect(result.type).toBe('changeSet');
94+
if (result.type === 'changeSet') {
95+
expect(result.payload.state).toBeUndefined();
96+
}
97+
98+
sync.close();
99+
});
100+
101+
it('produces results with fdv1Fallback set to false', async () => {
102+
const requestor = makeFDv1Requestor(makeFDv1Flags({ flag: { value: true } }));
103+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
104+
105+
const nextPromise = sync.next();
106+
await jest.advanceTimersByTimeAsync(0);
107+
108+
const result = await nextPromise;
109+
expect(result.fdv1Fallback).toBe(false);
110+
111+
sync.close();
112+
});
113+
114+
it('returns shutdown when close is called', async () => {
115+
const requestor = makeFDv1Requestor(makeFDv1Flags({ flag: { value: true } }));
116+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
117+
118+
// Start and consume the first result
119+
const firstPromise = sync.next();
120+
await jest.advanceTimersByTimeAsync(0);
121+
await firstPromise;
122+
123+
// Close, then next should return shutdown
124+
sync.close();
125+
const result = await sync.next();
126+
expect(result.type).toBe('status');
127+
if (result.type === 'status') {
128+
expect(result.state).toBe('shutdown');
129+
}
130+
});
131+
132+
it('does not start polling when close is called before next', async () => {
133+
const requestor = makeFDv1Requestor(makeFDv1Flags({ flag: { value: true } }));
134+
const sync = createFDv1PollingSynchronizer(requestor, 30000, logger);
135+
136+
sync.close();
137+
138+
const result = await sync.next();
139+
expect(result.type).toBe('status');
140+
if (result.type === 'status') {
141+
expect(result.state).toBe('shutdown');
142+
}
143+
expect(requestor.requestPayload).not.toHaveBeenCalled();
144+
});
145+
146+
it('produces interrupted on a recoverable HTTP error and continues polling', async () => {
147+
let callCount = 0;
148+
const requestor: Requestor = {
149+
requestPayload: jest.fn().mockImplementation(() => {
150+
callCount += 1;
151+
if (callCount === 1) {
152+
return Promise.reject(new LDRequestError('Server error', 503));
153+
}
154+
return Promise.resolve(makeFDv1Flags({ flag: { value: 'recovered' } }));
155+
}),
156+
};
157+
158+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
159+
160+
// First poll — recoverable error
161+
const firstPromise = sync.next();
162+
await jest.advanceTimersByTimeAsync(0);
163+
const result1 = await firstPromise;
164+
expect(result1.type).toBe('status');
165+
if (result1.type === 'status') {
166+
expect(result1.state).toBe('interrupted');
167+
expect(result1.fdv1Fallback).toBe(false);
168+
}
169+
170+
// Second poll — success after interval
171+
await jest.advanceTimersByTimeAsync(1000);
172+
const result2 = await sync.next();
173+
expect(result2.type).toBe('changeSet');
174+
175+
sync.close();
176+
});
177+
178+
it('produces terminal error on an unrecoverable HTTP error and stops polling', async () => {
179+
const requestor: Requestor = {
180+
requestPayload: jest.fn().mockRejectedValue(new LDRequestError('Unauthorized', 401)),
181+
};
182+
183+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
184+
185+
const nextPromise = sync.next();
186+
await jest.advanceTimersByTimeAsync(0);
187+
const result = await nextPromise;
188+
189+
expect(result.type).toBe('status');
190+
if (result.type === 'status') {
191+
expect(result.state).toBe('terminal_error');
192+
expect(result.fdv1Fallback).toBe(false);
193+
}
194+
195+
// No more polls should happen
196+
const callCount = (requestor.requestPayload as jest.Mock).mock.calls.length;
197+
await jest.advanceTimersByTimeAsync(5000);
198+
expect((requestor.requestPayload as jest.Mock).mock.calls.length).toBe(callCount);
199+
});
200+
201+
it('produces interrupted on invalid JSON and continues polling', async () => {
202+
let callCount = 0;
203+
const requestor: Requestor = {
204+
requestPayload: jest.fn().mockImplementation(() => {
205+
callCount += 1;
206+
if (callCount === 1) {
207+
return Promise.resolve('not valid json!!!');
208+
}
209+
return Promise.resolve(makeFDv1Flags({ flag: { value: true } }));
210+
}),
211+
};
212+
213+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
214+
215+
// First poll — invalid JSON
216+
const firstPromise = sync.next();
217+
await jest.advanceTimersByTimeAsync(0);
218+
const result1 = await firstPromise;
219+
expect(result1.type).toBe('status');
220+
if (result1.type === 'status') {
221+
expect(result1.state).toBe('interrupted');
222+
}
223+
224+
// Second poll — valid response
225+
await jest.advanceTimersByTimeAsync(1000);
226+
const result2 = await sync.next();
227+
expect(result2.type).toBe('changeSet');
228+
229+
sync.close();
230+
});
231+
232+
it('produces interrupted on non-object JSON and continues polling', async () => {
233+
let callCount = 0;
234+
const requestor: Requestor = {
235+
requestPayload: jest.fn().mockImplementation(() => {
236+
callCount += 1;
237+
if (callCount === 1) {
238+
return Promise.resolve('null');
239+
}
240+
return Promise.resolve(makeFDv1Flags({ flag: { value: true } }));
241+
}),
242+
};
243+
244+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
245+
246+
// First poll — valid JSON but not an object
247+
const firstPromise = sync.next();
248+
await jest.advanceTimersByTimeAsync(0);
249+
const result1 = await firstPromise;
250+
expect(result1.type).toBe('status');
251+
if (result1.type === 'status') {
252+
expect(result1.state).toBe('interrupted');
253+
expect(result1.errorInfo?.kind).toBe('INVALID_DATA');
254+
}
255+
256+
// Second poll — valid response
257+
await jest.advanceTimersByTimeAsync(1000);
258+
const result2 = await sync.next();
259+
expect(result2.type).toBe('changeSet');
260+
261+
sync.close();
262+
});
263+
264+
it('polls at the configured interval', async () => {
265+
let callCount = 0;
266+
const requestor: Requestor = {
267+
requestPayload: jest.fn().mockImplementation(() => {
268+
callCount += 1;
269+
return Promise.resolve(makeFDv1Flags({ flag: { value: callCount } }));
270+
}),
271+
};
272+
273+
const sync = createFDv1PollingSynchronizer(requestor, 5000, logger);
274+
275+
// First poll (immediate on first next())
276+
const firstPromise = sync.next();
277+
await jest.advanceTimersByTimeAsync(0);
278+
await firstPromise;
279+
expect(requestor.requestPayload).toHaveBeenCalledTimes(1);
280+
281+
// Not yet time for second poll
282+
await jest.advanceTimersByTimeAsync(3000);
283+
expect(requestor.requestPayload).toHaveBeenCalledTimes(1);
284+
285+
// Second poll after interval
286+
await jest.advanceTimersByTimeAsync(2000);
287+
const result2 = await sync.next();
288+
expect(requestor.requestPayload).toHaveBeenCalledTimes(2);
289+
expect(result2.type).toBe('changeSet');
290+
291+
sync.close();
292+
});
293+
294+
it('returns successive results from periodic polling', async () => {
295+
let callCount = 0;
296+
const requestor: Requestor = {
297+
requestPayload: jest.fn().mockImplementation(() => {
298+
callCount += 1;
299+
return Promise.resolve(
300+
JSON.stringify({ [`flag${callCount}`]: { version: callCount, value: callCount } }),
301+
);
302+
}),
303+
};
304+
305+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
306+
307+
// First poll
308+
const firstPromise = sync.next();
309+
await jest.advanceTimersByTimeAsync(0);
310+
const result1 = await firstPromise;
311+
expect(result1.type).toBe('changeSet');
312+
if (result1.type === 'changeSet') {
313+
expect(result1.payload.updates[0].key).toBe('flag1');
314+
}
315+
316+
// Second poll
317+
await jest.advanceTimersByTimeAsync(1000);
318+
const result2 = await sync.next();
319+
expect(result2.type).toBe('changeSet');
320+
if (result2.type === 'changeSet') {
321+
expect(result2.payload.updates[0].key).toBe('flag2');
322+
}
323+
324+
sync.close();
325+
});
326+
327+
it('produces a network error as interrupted when requestor throws without status', async () => {
328+
let callCount = 0;
329+
const requestor: Requestor = {
330+
requestPayload: jest.fn().mockImplementation(() => {
331+
callCount += 1;
332+
if (callCount === 1) {
333+
return Promise.reject(new LDRequestError('Network failure'));
334+
}
335+
return Promise.resolve(makeFDv1Flags({ flag: { value: true } }));
336+
}),
337+
};
338+
339+
const sync = createFDv1PollingSynchronizer(requestor, 1000, logger);
340+
341+
const firstPromise = sync.next();
342+
await jest.advanceTimersByTimeAsync(0);
343+
const result = await firstPromise;
344+
345+
expect(result.type).toBe('status');
346+
if (result.type === 'status') {
347+
expect(result.state).toBe('interrupted');
348+
expect(result.errorInfo?.kind).toBe('NETWORK_ERROR');
349+
}
350+
351+
sync.close();
352+
});

0 commit comments

Comments
 (0)