Skip to content

Commit 8a128ef

Browse files
nattb8 and claude authored
feat(audience): exponential backoff, 429/Retry-After, HTTP timeout, 4xx drop (SDK-291) (#2873)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent fc8b5e3 commit 8a128ef

5 files changed

Lines changed: 405 additions & 29 deletions

File tree

packages/audience/core/src/errors.ts

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,42 +46,51 @@ export class TransportError extends Error {
4646
* `ok: true` means the backend accepted the payload (HTTP 2xx). On
4747
* `ok: false`, `error` is always populated with a structured reason.
4848
*
49-
* Implementations of `HttpSend` MUST NOT reject — failures travel
49+
* Implementations of `HttpSend` MUST NOT reject; failures travel
5050
* through this result type. Callers (notably `MessageQueue.flushUnload`)
5151
* rely on this contract for fire-and-forget paths.
52+
*
53+
* `retryAfterMs` is set when the server returned 429 with a parseable
54+
* `Retry-After` header. The queue uses this to override the exponential
55+
* backoff schedule with the server-supplied delay.
5256
*/
5357
export interface TransportResult {
5458
ok: boolean;
5559
error?: TransportError;
60+
retryAfterMs?: number;
5661
}
5762

5863
/**
5964
* Stable, machine-readable code identifying the kind of audience SDK
6065
* failure. Studios can branch on this in their `onError` handler.
6166
*
62-
* - `'FLUSH_FAILED'` — POST to `/v1/audience/messages` returned non-2xx.
63-
* - `'CONSENT_SYNC_FAILED'` — PUT to `/v1/audience/tracking-consent` returned non-2xx.
64-
* - `'NETWORK_ERROR'` — fetch rejected before a response was received
67+
* - `'FLUSH_FAILED'`: POST to `/v1/audience/messages` returned a 5xx (or other non-2xx, non-4xx) status.
68+
* - `'CONSENT_SYNC_FAILED'`: PUT to `/v1/audience/tracking-consent` returned a non-2xx status other than 429.
69+
* - `'NETWORK_ERROR'`: fetch rejected before a response was received
6570
* (network failure, CORS, DNS, etc.).
66-
* - `'VALIDATION_REJECTED'` — backend returned 2xx but the body reported
67-
* `rejected > 0`. Terminal: retrying won't help, the
68-
* messages were dropped from the queue. Inspect
69-
* `responseBody` for the per-message detail when the
70-
* backend provides it.
71+
* - `'VALIDATION_REJECTED'`: backend returned 2xx but the body reported
72+
* `rejected > 0`, or a flush request returned a 4xx (non-429)
73+
* indicating the payload was malformed or unauthorised.
74+
* Terminal: retrying won't help, the messages were dropped
75+
* from the queue.
76+
* - `'RATE_LIMITED'`: server returned 429. The batch is retained and will
77+
* be retried after the backoff window (honoring
78+
* `Retry-After` when present).
7179
*/
7280
export type AudienceErrorCode =
7381
| 'FLUSH_FAILED'
7482
| 'CONSENT_SYNC_FAILED'
7583
| 'NETWORK_ERROR'
76-
| 'VALIDATION_REJECTED';
84+
| 'VALIDATION_REJECTED'
85+
| 'RATE_LIMITED';
7786

7887
/**
7988
* Public error type passed to the SDK's `onError` callback. Wraps the
8089
* low-level {@link TransportError} and adds a closed `code` union plus a
8190
* human-readable `message`.
8291
*
8392
* Lives in `@imtbl/audience-core` so every surface (web, pixel, unity,
84-
* unreal) reports failures through the same shape — no per-package
93+
* unreal) reports failures through the same shape, with no per-package
8594
* error class, no duplicated mapping logic.
8695
*
8796
* Is an instance of `Error` so it can be thrown, logged, or passed to
@@ -126,7 +135,7 @@ export class AudienceError extends Error {
126135
* own copy of `status === 0 → NETWORK_ERROR` mapping logic.
127136
*
128137
* @param err The transport-level failure.
129-
* @param source Which subsystem hit the error — selects the error code
138+
* @param source Which subsystem hit the error; selects the error code
130139
* and shapes the human message.
131140
* @param count For `'flush'` failures, the number of messages in the
132141
* batch. Used in the human-readable message; ignored for
@@ -137,7 +146,7 @@ export function toAudienceError(
137146
source: 'flush' | 'consent',
138147
count?: number,
139148
): AudienceError {
140-
// Network failure — no HTTP response received.
149+
// Network failure: no HTTP response received.
141150
if (err.status === 0) {
142151
return new AudienceError({
143152
code: 'NETWORK_ERROR',
@@ -150,8 +159,8 @@ export function toAudienceError(
150159
});
151160
}
152161

153-
// 2xx response with backend-rejected messages. Terminal, do not retry —
154-
// the only way ok:false comes back with a 2xx status is when httpSend
162+
// 2xx response with backend-rejected messages. Terminal, do not retry.
163+
// The only way ok:false comes back with a 2xx status is when httpSend
155164
// detected `rejected > 0` in the parsed response body.
156165
if (err.status >= 200 && err.status < 300) {
157166
const body = err.body as { accepted?: number; rejected?: number } | undefined;
@@ -166,7 +175,34 @@ export function toAudienceError(
166175
});
167176
}
168177

169-
// Generic HTTP failure (4xx / 5xx).
178+
// 429: rate limited, retryable. Batch is kept; queue applies backoff.
179+
if (err.status === 429) {
180+
return new AudienceError({
181+
code: 'RATE_LIMITED',
182+
message: source === 'flush'
183+
? 'Flush rate limited (429)'
184+
: 'Consent sync rate limited (429)',
185+
status: err.status,
186+
endpoint: err.endpoint,
187+
responseBody: err.body,
188+
});
189+
}
190+
191+
// 4xx (non-429): server deterministically rejected the payload. Terminal:
192+
// a bad publishable key or malformed body won't be fixed by retrying.
193+
if (err.status >= 400 && err.status < 500) {
194+
return new AudienceError({
195+
code: source === 'flush' ? 'VALIDATION_REJECTED' : 'CONSENT_SYNC_FAILED',
196+
message: source === 'flush'
197+
? `Flush rejected with status ${err.status}`
198+
: `Consent sync failed with status ${err.status}`,
199+
status: err.status,
200+
endpoint: err.endpoint,
201+
responseBody: err.body,
202+
});
203+
}
204+
205+
// 5xx or other non-2xx/4xx: server unhealthy. Keep batch, apply backoff.
170206
return new AudienceError({
171207
code: source === 'flush' ? 'FLUSH_FAILED' : 'CONSENT_SYNC_FAILED',
172208
message: source === 'flush'
@@ -183,13 +219,13 @@ export function toAudienceError(
183219
* Invoke a studio-supplied `onError` callback, swallowing any exception
184220
* it throws.
185221
*
186-
* Used by {@link MessageQueue} and {@link createConsentManager} — both
222+
* Used by {@link MessageQueue} and {@link createConsentManager}; both
187223
* must not wedge their internal state machines on a badly-written handler.
188224
* Centralised here to keep the swallow-and-continue semantics identical
189225
* across every audience surface and avoid duplicating the try/catch at
190226
* each call site.
191227
*
192-
* Intentionally not re-exported from `index.ts` — this is an internal
228+
* Intentionally not re-exported from `index.ts`; this is an internal
193229
* helper, not public API.
194230
*/
195231
export function invokeOnError(
@@ -200,6 +236,6 @@ export function invokeOnError(
200236
try {
201237
onError(err);
202238
} catch {
203-
// Swallow — handler must not crash the state machine.
239+
// Swallow; handler must not crash the state machine.
204240
}
205241
}

packages/audience/core/src/queue.test.ts

Lines changed: 177 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ describe('MessageQueue', () => {
9393
expect(send).not.toHaveBeenCalled();
9494

9595
queue.enqueue(makeMessage('2'));
96-
// flush is async — await the microtask
96+
// flush is async, await the microtask
9797
await Promise.resolve();
9898
expect(send).toHaveBeenCalledTimes(1);
9999
});
@@ -326,6 +326,180 @@ describe('MessageQueue', () => {
326326
});
327327
});
328328

329+
describe('exponential backoff', () => {
330+
it('skips flush while inside the backoff window', async () => {
331+
const start = 1_000_000;
332+
jest.setSystemTime(start);
333+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(failResult);
334+
const queue = createQueue(send);
335+
336+
queue.enqueue(makeMessage('1'));
337+
338+
// First flush: records failure, sets backoff to start+5000
339+
await queue.flush();
340+
expect(send).toHaveBeenCalledTimes(1);
341+
342+
// Still inside backoff window: flush is a no-op
343+
jest.setSystemTime(start + 4_999);
344+
await queue.flush();
345+
expect(send).toHaveBeenCalledTimes(1);
346+
347+
// Past backoff window: flush proceeds
348+
jest.setSystemTime(start + 5_001);
349+
await queue.flush();
350+
expect(send).toHaveBeenCalledTimes(2);
351+
});
352+
353+
it('escalates backoff: 5s → 10s → 20s → 40s → 60s', async () => {
354+
// Each step: trigger a failure, assert blocked before window, assert unblocked after.
355+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(failResult);
356+
const queue = createQueue(send);
357+
queue.enqueue(makeMessage('1'));
358+
359+
let now = 1_000_000;
360+
let calls = 0;
361+
362+
const step = async (delay: number) => {
363+
jest.setSystemTime(now);
364+
await queue.flush();
365+
calls++;
366+
expect(send).toHaveBeenCalledTimes(calls);
367+
368+
jest.setSystemTime(now + delay - 1);
369+
await queue.flush();
370+
expect(send).toHaveBeenCalledTimes(calls); // still blocked
371+
372+
now += delay + 1;
373+
jest.setSystemTime(now);
374+
};
375+
376+
await step(5_000);
377+
await step(10_000);
378+
await step(20_000);
379+
await step(40_000);
380+
await step(60_000);
381+
});
382+
383+
it('resets backoff on a successful flush', async () => {
384+
const start = 1_000_000;
385+
jest.setSystemTime(start);
386+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>()
387+
.mockResolvedValueOnce(failResult)
388+
.mockResolvedValue(okResult);
389+
const queue = createQueue(send);
390+
391+
queue.enqueue(makeMessage('1'));
392+
queue.enqueue(makeMessage('2'));
393+
394+
// First flush fails; backoff starts
395+
await queue.flush();
396+
expect(send).toHaveBeenCalledTimes(1);
397+
398+
// Advance past the 5s window; second flush succeeds, backoff resets
399+
jest.setSystemTime(start + 5_001);
400+
await queue.flush();
401+
expect(send).toHaveBeenCalledTimes(2);
402+
403+
// Should be able to flush immediately after reset
404+
queue.enqueue(makeMessage('3'));
405+
await queue.flush();
406+
expect(send).toHaveBeenCalledTimes(3);
407+
});
408+
409+
it('uses Retry-After delay when server supplies it on 429', async () => {
410+
const start = 1_000_000;
411+
jest.setSystemTime(start);
412+
const rateLimitResult: TransportResult = {
413+
ok: false,
414+
error: new TransportError({ status: 429, endpoint: 'https://api.immutable.com/v1/audience/messages' }),
415+
retryAfterMs: 30_000,
416+
};
417+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>()
418+
.mockResolvedValueOnce(rateLimitResult)
419+
.mockResolvedValue(okResult);
420+
const queue = createQueue(send);
421+
422+
queue.enqueue(makeMessage('1'));
423+
424+
await queue.flush();
425+
expect(send).toHaveBeenCalledTimes(1);
426+
427+
// 29s: still inside Retry-After window
428+
jest.setSystemTime(start + 29_000);
429+
await queue.flush();
430+
expect(send).toHaveBeenCalledTimes(1);
431+
432+
// Past the window
433+
jest.setSystemTime(start + 30_001);
434+
await queue.flush();
435+
expect(send).toHaveBeenCalledTimes(2);
436+
expect(queue.length).toBe(0);
437+
});
438+
439+
it('fires RATE_LIMITED via onError on 429 and keeps the batch', async () => {
440+
const onError = jest.fn();
441+
const rateLimitResult: TransportResult = {
442+
ok: false,
443+
error: new TransportError({ status: 429, endpoint: 'https://api.immutable.com/v1/audience/messages' }),
444+
};
445+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(rateLimitResult);
446+
const queue = createQueue(send, { onError });
447+
448+
queue.enqueue(makeMessage('1'));
449+
await queue.flush();
450+
451+
expect(queue.length).toBe(1);
452+
expect(onError).toHaveBeenCalledTimes(1);
453+
expect(onError.mock.calls[0][0].code).toBe('RATE_LIMITED');
454+
expect(onError.mock.calls[0][0].status).toBe(429);
455+
});
456+
});
457+
458+
describe('4xx drop', () => {
459+
it('drops batch and fires VALIDATION_REJECTED on non-retryable 4xx', async () => {
460+
const onError = jest.fn();
461+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue({
462+
ok: false,
463+
error: new TransportError({
464+
status: 401,
465+
endpoint: 'https://api.immutable.com/v1/audience/messages',
466+
body: 'Unauthorized',
467+
}),
468+
});
469+
const queue = createQueue(send, { onError });
470+
471+
queue.enqueue(makeMessage('1'));
472+
queue.enqueue(makeMessage('2'));
473+
await queue.flush();
474+
475+
expect(queue.length).toBe(0);
476+
expect(onError).toHaveBeenCalledTimes(1);
477+
const err = onError.mock.calls[0][0];
478+
expect(err.code).toBe('VALIDATION_REJECTED');
479+
expect(err.status).toBe(401);
480+
});
481+
482+
it('drops batch and fires VALIDATION_REJECTED on 400', async () => {
483+
const onError = jest.fn();
484+
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue({
485+
ok: false,
486+
error: new TransportError({
487+
status: 400,
488+
endpoint: 'https://api.immutable.com/v1/audience/messages',
489+
body: { error: 'bad request' },
490+
}),
491+
});
492+
const queue = createQueue(send, { onError });
493+
494+
queue.enqueue(makeMessage('1'));
495+
await queue.flush();
496+
497+
expect(queue.length).toBe(0);
498+
expect(onError.mock.calls[0][0].code).toBe('VALIDATION_REJECTED');
499+
expect(onError.mock.calls[0][0].status).toBe(400);
500+
});
501+
});
502+
329503
describe('page-unload flush (keepalive)', () => {
330504
it('flushes via keepalive fetch on visibilitychange to hidden', () => {
331505
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(okResult);
@@ -417,7 +591,7 @@ describe('page-unload flush (keepalive)', () => {
417591
);
418592
expect(queue.length).toBe(0);
419593

420-
// Listeners removed — no double flush
594+
// Listeners removed - no double flush
421595
queue.enqueue(makeMessage('3'));
422596
window.dispatchEvent(new Event('pagehide'));
423597
expect(send).toHaveBeenCalledTimes(1);
@@ -435,7 +609,7 @@ describe('page-unload flush (keepalive)', () => {
435609
// Start an async flush (sets flushing = true)
436610
const pending = queue.flush();
437611

438-
// pagehide fires while async flush is in flight — unload flush should be skipped
612+
// pagehide fires while async flush is in flight; unload flush should be skipped
439613
window.dispatchEvent(new Event('pagehide'));
440614
// Only 1 call (the async flush), no keepalive call
441615
expect(send).toHaveBeenCalledTimes(1);

0 commit comments

Comments
 (0)