Skip to content

Commit 20626d9

Browse files
nattb8claude
andauthored
feat(audience): add transport, queue, and context collection to core (#2813)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f1a32d8 commit 20626d9

10 files changed

Lines changed: 402 additions & 2 deletions

File tree

packages/audience/core/jest.config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ import type { Config } from 'jest';
33
const config: Config = {
44
roots: ['<rootDir>/src'],
55
moduleDirectories: ['node_modules', 'src'],
6+
moduleNameMapper: { '^@imtbl/(.*)$': '<rootDir>/../../../node_modules/@imtbl/$1/src' },
67
testEnvironment: 'jsdom',
78
transform: {
8-
'^.+\\.tsx?$': '@swc/jest',
9+
'^.+\\.(t|j)sx?$': '@swc/jest',
910
},
11+
transformIgnorePatterns: [],
1012
};
1113

1214
export default config;

packages/audience/core/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
"author": "Immutable",
66
"private": true,
77
"bugs": "https://github.com/immutable/ts-immutable-sdk/issues",
8-
"dependencies": {},
8+
"dependencies": {
9+
"@imtbl/metrics": "workspace:*"
10+
},
911
"devDependencies": {
1012
"@swc/core": "^1.4.2",
1113
"@swc/jest": "^0.2.37",
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { collectContext } from './context';
2+
3+
describe('collectContext', () => {
4+
it('includes library name and version', () => {
5+
const ctx = collectContext();
6+
expect(ctx.library).toBe('@imtbl/audience');
7+
expect(ctx.libraryVersion).toBeDefined();
8+
});
9+
10+
it('collects browser signals in jsdom', () => {
11+
const ctx = collectContext();
12+
expect(ctx.userAgent).toBeDefined();
13+
expect(ctx.locale).toBeDefined();
14+
expect(ctx.timezone).toBeDefined();
15+
expect(ctx.screen).toMatch(/^\d+x\d+$/);
16+
});
17+
18+
it('collects page info', () => {
19+
const ctx = collectContext();
20+
expect(ctx.pageUrl).toBeDefined();
21+
expect(ctx.pagePath).toBeDefined();
22+
expect(typeof ctx.pageReferrer).toBe('string');
23+
expect(typeof ctx.pageTitle).toBe('string');
24+
});
25+
});
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import type { EventContext } from './types';
2+
import { isBrowser } from './utils';
3+
4+
// WARNING: DO NOT CHANGE THE STRING BELOW. IT GETS REPLACED AT BUILD TIME.
5+
const SDK_VERSION = '__SDK_VERSION__';
6+
7+
export function collectContext(): EventContext {
8+
const context: EventContext = {
9+
library: '@imtbl/audience',
10+
libraryVersion: SDK_VERSION,
11+
};
12+
13+
if (!isBrowser()) return context;
14+
15+
context.userAgent = navigator.userAgent;
16+
context.locale = navigator.language;
17+
context.timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
18+
context.screen = `${window.screen.width}x${window.screen.height}`;
19+
context.pageUrl = window.location.href;
20+
context.pagePath = window.location.pathname;
21+
context.pageReferrer = document.referrer;
22+
context.pageTitle = document.title;
23+
24+
return context;
25+
}

packages/audience/core/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,8 @@ export {
2626
} from './config';
2727

2828
export { generateId, getTimestamp, isBrowser } from './utils';
29+
30+
export type { Transport } from './transport';
31+
export { httpTransport, httpSend } from './transport';
32+
export { MessageQueue } from './queue';
33+
export { collectContext } from './context';
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import { MessageQueue } from './queue';
2+
import type { Transport } from './transport';
3+
import type { Message } from './types';
4+
import * as storage from './storage';
5+
6+
function makeMessage(id: string): Message {
7+
return {
8+
type: 'track',
9+
messageId: id,
10+
eventTimestamp: '2026-04-01T00:00:00.000Z',
11+
anonymousId: 'anon-1',
12+
surface: 'web',
13+
context: { library: '@imtbl/audience', libraryVersion: '0.0.0' },
14+
eventName: 'test',
15+
};
16+
}
17+
18+
function createQueue(
19+
transport: Transport,
20+
opts: { flushIntervalMs?: number; flushSize?: number } = {},
21+
) {
22+
return new MessageQueue(
23+
transport,
24+
'https://api.immutable.com/v1/audience/messages',
25+
'pk_imx_test',
26+
opts.flushIntervalMs ?? 5_000,
27+
opts.flushSize ?? 20,
28+
);
29+
}
30+
31+
beforeEach(() => {
32+
jest.useFakeTimers();
33+
localStorage.clear();
34+
});
35+
36+
afterEach(() => {
37+
jest.useRealTimers();
38+
});
39+
40+
describe('MessageQueue', () => {
41+
it('enqueues messages and flushes them', async () => {
42+
const send = jest.fn().mockResolvedValue(true);
43+
const queue = createQueue({ send });
44+
45+
queue.enqueue(makeMessage('1'));
46+
queue.enqueue(makeMessage('2'));
47+
48+
await queue.flush();
49+
50+
expect(send).toHaveBeenCalledTimes(1);
51+
expect(send.mock.calls[0][2].messages).toHaveLength(2);
52+
expect(queue.length).toBe(0);
53+
});
54+
55+
it('retains messages on failed flush', async () => {
56+
const send = jest.fn().mockResolvedValue(false);
57+
const queue = createQueue({ send });
58+
59+
queue.enqueue(makeMessage('1'));
60+
await queue.flush();
61+
62+
expect(queue.length).toBe(1);
63+
});
64+
65+
it('flushes automatically when batch size is reached', async () => {
66+
const send = jest.fn().mockResolvedValue(true);
67+
const queue = createQueue({ send }, { flushSize: 2 });
68+
69+
queue.enqueue(makeMessage('1'));
70+
expect(send).not.toHaveBeenCalled();
71+
72+
queue.enqueue(makeMessage('2'));
73+
// flush is async — await the microtask
74+
await Promise.resolve();
75+
expect(send).toHaveBeenCalledTimes(1);
76+
});
77+
78+
it('flushes on timer interval', async () => {
79+
const send = jest.fn().mockResolvedValue(true);
80+
const queue = createQueue({ send }, { flushIntervalMs: 1_000 });
81+
82+
queue.start();
83+
queue.enqueue(makeMessage('1'));
84+
85+
jest.advanceTimersByTime(1_000);
86+
// flush is async
87+
await Promise.resolve();
88+
expect(send).toHaveBeenCalledTimes(1);
89+
90+
queue.stop();
91+
});
92+
93+
it('persists messages to localStorage', () => {
94+
const send = jest.fn().mockResolvedValue(true);
95+
const queue = createQueue({ send });
96+
97+
queue.enqueue(makeMessage('1'));
98+
99+
const stored = JSON.parse(localStorage.getItem('__imtbl_audience_queue')!);
100+
expect(stored).toHaveLength(1);
101+
expect(stored[0].messageId).toBe('1');
102+
});
103+
104+
it('restores messages from localStorage on construction', () => {
105+
storage.setItem('queue', [makeMessage('restored')]);
106+
107+
const send = jest.fn().mockResolvedValue(true);
108+
const queue = createQueue({ send });
109+
110+
expect(queue.length).toBe(1);
111+
});
112+
113+
it('does not flush concurrently', async () => {
114+
let resolveFirst: () => void;
115+
const firstCall = new Promise<boolean>((r) => { resolveFirst = () => r(true); });
116+
const send = jest.fn()
117+
.mockReturnValueOnce(firstCall)
118+
.mockResolvedValue(true);
119+
120+
const queue = createQueue({ send });
121+
queue.enqueue(makeMessage('1'));
122+
123+
const flush1 = queue.flush();
124+
const flush2 = queue.flush(); // should no-op
125+
126+
resolveFirst!();
127+
await flush1;
128+
await flush2;
129+
130+
expect(send).toHaveBeenCalledTimes(1);
131+
});
132+
133+
it('clears all messages and storage', () => {
134+
const send = jest.fn().mockResolvedValue(true);
135+
const queue = createQueue({ send });
136+
137+
queue.enqueue(makeMessage('1'));
138+
queue.clear();
139+
140+
expect(queue.length).toBe(0);
141+
expect(localStorage.getItem('__imtbl_audience_queue')).toBeNull();
142+
});
143+
144+
it('handles messages enqueued during flush', async () => {
145+
let queue: ReturnType<typeof createQueue>;
146+
const send = jest.fn().mockImplementation(async () => {
147+
// Simulate a message arriving during the network request
148+
queue.enqueue(makeMessage('late'));
149+
return true;
150+
});
151+
152+
queue = createQueue({ send });
153+
queue.enqueue(makeMessage('1'));
154+
155+
await queue.flush();
156+
157+
// The original message was sent, but the late one should remain
158+
expect(queue.length).toBe(1);
159+
});
160+
});
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import type { Message, BatchPayload } from './types';
2+
import type { Transport } from './transport';
3+
import * as storage from './storage';
4+
5+
const STORAGE_KEY = 'queue';
6+
7+
/**
8+
* Batched message queue with localStorage durability.
9+
*
10+
* Messages are flushed on a timer OR when the queue reaches `flushSize`,
11+
* whichever comes first. On success the sent messages are removed; on
12+
* failure they stay queued and retry on the next flush cycle.
13+
*
14+
* localStorage is used as a write-through cache so messages survive
15+
* page navigations. On construction, any previously-persisted messages
16+
* are restored into memory.
17+
*/
18+
export class MessageQueue {
19+
private messages: Message[];
20+
21+
private timer: ReturnType<typeof setInterval> | null = null;
22+
23+
private flushing = false;
24+
25+
constructor(
26+
private readonly transport: Transport,
27+
private readonly endpointUrl: string,
28+
private readonly publishableKey: string,
29+
private readonly flushIntervalMs: number,
30+
private readonly flushSize: number,
31+
) {
32+
this.messages = (storage.getItem(STORAGE_KEY) as Message[] | undefined) ?? [];
33+
}
34+
35+
start(): void {
36+
if (this.timer) return;
37+
this.timer = setInterval(() => this.flush(), this.flushIntervalMs);
38+
}
39+
40+
stop(): void {
41+
if (!this.timer) return;
42+
clearInterval(this.timer);
43+
this.timer = null;
44+
}
45+
46+
enqueue(message: Message): void {
47+
this.messages.push(message);
48+
this.persist();
49+
50+
if (this.messages.length >= this.flushSize) {
51+
this.flush();
52+
}
53+
}
54+
55+
/** Guard prevents concurrent flushes from racing on the same batch. */
56+
async flush(): Promise<void> {
57+
if (this.flushing || this.messages.length === 0) return;
58+
59+
this.flushing = true;
60+
try {
61+
const batch = [...this.messages];
62+
const payload: BatchPayload = { messages: batch };
63+
64+
const ok = await this.transport.send(this.endpointUrl, this.publishableKey, payload);
65+
if (ok) {
66+
// Slice rather than clear — new messages may have been enqueued during the request.
67+
this.messages = this.messages.slice(batch.length);
68+
this.persist();
69+
}
70+
} finally {
71+
this.flushing = false;
72+
}
73+
}
74+
75+
get length(): number {
76+
return this.messages.length;
77+
}
78+
79+
clear(): void {
80+
this.messages = [];
81+
storage.removeItem(STORAGE_KEY);
82+
}
83+
84+
private persist(): void {
85+
storage.setItem(STORAGE_KEY, this.messages);
86+
}
87+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { httpSend } from './transport';
2+
import type { BatchPayload } from './types';
3+
4+
const payload: BatchPayload = {
5+
messages: [
6+
{
7+
type: 'track',
8+
messageId: 'msg-1',
9+
eventTimestamp: '2026-04-01T00:00:00.000Z',
10+
anonymousId: 'anon-1',
11+
surface: 'web',
12+
context: { library: '@imtbl/audience', libraryVersion: '0.0.0' },
13+
eventName: 'purchase',
14+
properties: { value: 9.99 },
15+
},
16+
],
17+
};
18+
19+
describe('httpSend', () => {
20+
const originalFetch = global.fetch;
21+
22+
afterEach(() => {
23+
global.fetch = originalFetch;
24+
});
25+
26+
it('sends POST with correct headers and body', async () => {
27+
const mockFetch = jest.fn().mockResolvedValue({ ok: true });
28+
global.fetch = mockFetch;
29+
30+
await httpSend('https://api.immutable.com/v1/audience/messages', 'pk_imx_test', payload);
31+
32+
expect(mockFetch).toHaveBeenCalledWith('https://api.immutable.com/v1/audience/messages', {
33+
method: 'POST',
34+
headers: {
35+
'Content-Type': 'application/json',
36+
'x-immutable-publishable-key': 'pk_imx_test',
37+
},
38+
body: JSON.stringify(payload),
39+
});
40+
});
41+
42+
it('returns true on success', async () => {
43+
global.fetch = jest.fn().mockResolvedValue({ ok: true });
44+
expect(await httpSend('https://example.com', 'pk', payload)).toBe(true);
45+
});
46+
47+
it('returns false on HTTP error', async () => {
48+
global.fetch = jest.fn().mockResolvedValue({ ok: false, status: 500 });
49+
expect(await httpSend('https://example.com', 'pk', payload)).toBe(false);
50+
});
51+
52+
it('returns false on network error', async () => {
53+
global.fetch = jest.fn().mockRejectedValue(new TypeError('Failed to fetch'));
54+
expect(await httpSend('https://example.com', 'pk', payload)).toBe(false);
55+
});
56+
});

0 commit comments

Comments
 (0)