Skip to content

Commit 6708ff2

Browse files
authored
fix(replay): Avoid main-thread blocking in WorkerHandler under event bursts (#20548)
Closes #20547. `WorkerHandler.postMessage` attached a fresh `'message'` listener per request and removed it only when the matching response arrived. Under a burst of N in-flight requests, every worker response was dispatched to all N attached listeners — O(N²) listener invocations in total — causing the main thread to lock up. I replaced the per-request listener registration with a single long-lived listener attached in the constructor, which routes each response to its pending request through a `Map` keyed by request id. `destroy()` now also removes the listener and rejects pending requests instead of leaving them hanging. The public API is unchanged. I created a repro in https://github.com/logaretm/sentry-replay-worker-quadratic-repro and verified that the issue reproduces before the fix and no longer occurs after it.
1 parent 0daf962 commit 6708ff2

3 files changed

Lines changed: 226 additions & 30 deletions

File tree

.size-limit.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ module.exports = [
326326
path: createCDNPath('bundle.tracing.replay.feedback.min.js'),
327327
gzip: false,
328328
brotli: false,
329-
limit: '271 KB',
329+
limit: '272 KB',
330330
disablePlugins: ['@size-limit/esbuild'],
331331
},
332332
{

packages/replay-internal/src/eventBuffer/WorkerHandler.ts

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ import { DEBUG_BUILD } from '../debug-build';
22
import type { WorkerRequest, WorkerResponse } from '../types';
33
import { debug } from '../util/logger';
44

5+
interface PendingRequest {
6+
method: WorkerRequest['method'];
7+
resolve: (value: unknown) => void;
8+
reject: (reason: unknown) => void;
9+
}
10+
511
/**
612
* Event buffer that uses a web worker to compress events.
713
* Exported only for testing.
@@ -10,10 +16,16 @@ export class WorkerHandler {
1016
private _worker: Worker;
1117
private _id: number;
1218
private _ensureReadyPromise?: Promise<void>;
19+
private _pending: Map<number, PendingRequest>;
1320

1421
public constructor(worker: Worker) {
1522
this._worker = worker;
1623
this._id = 0;
24+
this._pending = new Map();
25+
// A single long-lived listener routes responses by id. Per-request
26+
// listeners would make worker dispatch O(n) per response, so a burst of N
27+
// in-flight requests becomes O(n^2) main-thread work.
28+
this._worker.addEventListener('message', this._onMessage);
1729
}
1830

1931
/**
@@ -62,6 +74,9 @@ export class WorkerHandler {
6274
*/
6375
public destroy(): void {
6476
DEBUG_BUILD && debug.log('Destroying compression worker');
77+
this._worker.removeEventListener('message', this._onMessage);
78+
this._pending.forEach(pending => pending.reject(new Error('Worker destroyed')));
79+
this._pending.clear();
6580
this._worker.terminate();
6681
}
6782

@@ -71,39 +86,46 @@ export class WorkerHandler {
7186
public postMessage<T>(method: WorkerRequest['method'], arg?: WorkerRequest['arg']): Promise<T> {
7287
const id = this._getAndIncrementId();
7388

74-
return new Promise((resolve, reject) => {
75-
const listener = ({ data }: MessageEvent): void => {
76-
const response = data as WorkerResponse;
77-
if (response.method !== method) {
78-
return;
79-
}
80-
81-
// There can be multiple listeners for a single method, the id ensures
82-
// that the response matches the caller.
83-
if (response.id !== id) {
84-
return;
85-
}
86-
87-
// At this point, we'll always want to remove listener regardless of result status
88-
this._worker.removeEventListener('message', listener);
89+
return new Promise<T>((resolve, reject) => {
90+
this._pending.set(id, {
91+
method,
92+
resolve: resolve as (value: unknown) => void,
93+
reject,
94+
});
95+
try {
96+
this._worker.postMessage({ id, method, arg });
97+
} catch (error) {
98+
// If postMessage throws synchronously (e.g. DataCloneError, worker
99+
// already terminated), drop the pending entry so it doesn't leak.
100+
this._pending.delete(id);
101+
reject(error);
102+
}
103+
});
104+
}
89105

90-
if (!response.success) {
91-
// TODO: Do some error handling, not sure what
92-
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
106+
private _onMessage = ({ data }: MessageEvent): void => {
107+
const response = data as WorkerResponse;
108+
// The worker emits an init message with `id: undefined` on load, which is
109+
// handled by `ensureReady()` via its own listener. Ignore anything that
110+
// doesn't carry a numeric id we issued.
111+
if (typeof response.id !== 'number') {
112+
return;
113+
}
114+
const pending = this._pending.get(response.id);
115+
if (!pending || pending.method !== response.method) {
116+
return;
117+
}
93118

94-
reject(new Error('Error in compression worker'));
95-
return;
96-
}
119+
this._pending.delete(response.id);
97120

98-
resolve(response.response as T);
99-
};
121+
if (!response.success) {
122+
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
123+
pending.reject(new Error('Error in compression worker'));
124+
return;
125+
}
100126

101-
// Note: we can't use `once` option because it's possible it needs to
102-
// listen to multiple messages
103-
this._worker.addEventListener('message', listener);
104-
this._worker.postMessage({ id, method, arg });
105-
});
106-
}
127+
pending.resolve(response.response);
128+
};
107129

108130
/** Get the current ID and increment it for the next call. */
109131
private _getAndIncrementId(): number {
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* @vitest-environment jsdom
3+
*/
4+
5+
import { describe, expect, it } from 'vitest';
6+
import { WorkerHandler } from '../../../src/eventBuffer/WorkerHandler';
7+
import type { WorkerResponse } from '../../../src/types';
8+
9+
/**
10+
* Minimal Worker stub that lets tests control when responses dispatch and
11+
* track how many 'message' listeners are attached at any time. Real workers
12+
* are async; we model that with a queue we drain manually so the test can
13+
* assert on the listener count while requests are in flight.
14+
*/
15+
class MockWorker implements Pick<Worker, 'addEventListener' | 'removeEventListener' | 'postMessage' | 'terminate'> {
16+
public listenerCount = 0;
17+
public terminated = false;
18+
19+
private _listeners = new Map<string, Set<EventListenerOrEventListenerObject>>();
20+
private _pendingRequests: Array<{ id: number; method: string }> = [];
21+
22+
public addEventListener(type: string, listener: EventListenerOrEventListenerObject): void {
23+
if (!this._listeners.has(type)) this._listeners.set(type, new Set());
24+
this._listeners.get(type)!.add(listener);
25+
if (type === 'message') this.listenerCount++;
26+
}
27+
28+
public removeEventListener(type: string, listener: EventListenerOrEventListenerObject): void {
29+
const set = this._listeners.get(type);
30+
if (set?.delete(listener) && type === 'message') this.listenerCount--;
31+
}
32+
33+
public postMessage(data: unknown): void {
34+
const { id, method } = data as { id: number; method: string };
35+
this._pendingRequests.push({ id, method });
36+
}
37+
38+
public terminate(): void {
39+
this.terminated = true;
40+
}
41+
42+
/** Dispatch the response for the oldest queued request (FIFO order); `overrides` replaces fields of the default successful response. */
43+
public flushOne(overrides?: Partial<WorkerResponse>): void {
44+
const next = this._pendingRequests.shift();
45+
if (!next) return;
46+
const response: WorkerResponse = {
47+
id: next.id,
48+
method: next.method,
49+
success: true,
50+
response: `result-${next.id}`,
51+
...overrides,
52+
};
53+
this._dispatch('message', { data: response } as MessageEvent);
54+
}
55+
56+
public flushAll(): void {
57+
while (this._pendingRequests.length > 0) this.flushOne();
58+
}
59+
60+
/** Dispatch a message that doesn't correspond to a queued request. */
61+
public dispatchRaw(response: Partial<WorkerResponse>): void {
62+
this._dispatch('message', { data: response } as MessageEvent);
63+
}
64+
65+
public get pendingCount(): number {
66+
return this._pendingRequests.length;
67+
}
68+
69+
private _dispatch(type: string, event: MessageEvent): void {
70+
const set = this._listeners.get(type);
71+
if (!set) return;
72+
for (const listener of set) {
73+
if (typeof listener === 'function') listener(event);
74+
else listener.handleEvent(event);
75+
}
76+
}
77+
}
78+
79+
const makeHandler = () => {
80+
const worker = new MockWorker();
81+
const handler = new WorkerHandler(worker as unknown as Worker);
82+
return { worker, handler };
83+
};
84+
85+
describe('Unit | eventBuffer | WorkerHandler', () => {
86+
it('does not attach a new message listener per postMessage call (regression: #20547)', async () => {
87+
const { worker, handler } = makeHandler();
88+
89+
// One listener is attached at construction time.
90+
expect(worker.listenerCount).toBe(1);
91+
92+
// Fire a burst of in-flight requests. The pre-fix implementation attached
93+
// one listener per call, growing linearly; this would dispatch every
94+
// response to all attached listeners (O(n^2) main-thread work).
95+
const promises = Array.from({ length: 100 }, (_, i) => handler.postMessage('addEvent', `arg-${i}`));
96+
97+
expect(worker.listenerCount).toBe(1);
98+
expect(worker.pendingCount).toBe(100);
99+
100+
worker.flushAll();
101+
await Promise.all(promises);
102+
103+
// Listener count is still 1 after the burst drains.
104+
expect(worker.listenerCount).toBe(1);
105+
});
106+
107+
it('resolves concurrent postMessage calls with the correct response per id', async () => {
108+
const { worker, handler } = makeHandler();
109+
110+
const p0 = handler.postMessage<string>('addEvent', 'a');
111+
const p1 = handler.postMessage<string>('addEvent', 'b');
112+
const p2 = handler.postMessage<string>('addEvent', 'c');
113+
114+
worker.flushAll();
115+
116+
await expect(p0).resolves.toBe('result-0');
117+
await expect(p1).resolves.toBe('result-1');
118+
await expect(p2).resolves.toBe('result-2');
119+
});
120+
121+
it('rejects when the worker reports success: false', async () => {
122+
const { worker, handler } = makeHandler();
123+
124+
const promise = handler.postMessage('addEvent', 'a');
125+
worker.flushOne({ success: false, response: 'boom' });
126+
127+
await expect(promise).rejects.toThrow('Error in compression worker');
128+
});
129+
130+
it('rejects and cleans up the pending entry when worker.postMessage throws synchronously', async () => {
131+
const { worker, handler } = makeHandler();
132+
const error = new Error('DataCloneError');
133+
worker.postMessage = () => {
134+
throw error;
135+
};
136+
137+
await expect(handler.postMessage('addEvent', 'a')).rejects.toBe(error);
138+
139+
// A subsequent successful call should still work — the previous failure
140+
// didn't leave a stale entry behind.
141+
worker.postMessage = MockWorker.prototype.postMessage.bind(worker);
142+
const promise = handler.postMessage<string>('addEvent', 'b');
143+
worker.flushOne();
144+
await expect(promise).resolves.toBe('result-1');
145+
});
146+
147+
it('ignores messages without a numeric id (e.g. the worker init message)', async () => {
148+
const { worker, handler } = makeHandler();
149+
150+
const promise = handler.postMessage<string>('addEvent', 'a');
151+
152+
// Simulate the init message the worker emits on load. Should be ignored
153+
// and not crash.
154+
worker.dispatchRaw({ id: undefined, method: 'init', success: true });
155+
156+
// The legitimate response still resolves.
157+
worker.flushOne();
158+
await expect(promise).resolves.toBe('result-0');
159+
});
160+
161+
it('destroy() rejects pending requests and detaches the listener', async () => {
162+
const { worker, handler } = makeHandler();
163+
164+
const p1 = handler.postMessage('addEvent', 'a');
165+
const p2 = handler.postMessage('addEvent', 'b');
166+
167+
handler.destroy();
168+
169+
await expect(p1).rejects.toThrow('Worker destroyed');
170+
await expect(p2).rejects.toThrow('Worker destroyed');
171+
expect(worker.terminated).toBe(true);
172+
expect(worker.listenerCount).toBe(0);
173+
});
174+
});

0 commit comments

Comments
 (0)