Skip to content

Commit 08a0812

Browse files
committed
fix(replay): Avoid main-thread blocking in WorkerHandler under event bursts
WorkerHandler.postMessage attached a fresh 'message' listener per request, removed only when the matching response arrived. Under a burst of N in-flight requests, every worker response was dispatched to all N attached listeners, giving O(n^2) main-thread dispatch work. Replace with a single long-lived listener attached in the constructor, and route responses through a Map<id, { method, resolve, reject }>. destroy() also rejects pending requests instead of leaving them hanging. Public API unchanged. Closes #20547
1 parent 0daf962 commit 08a0812

1 file changed

Lines changed: 38 additions & 29 deletions

File tree

packages/replay-internal/src/eventBuffer/WorkerHandler.ts

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ import { DEBUG_BUILD } from '../debug-build';
22
import type { WorkerRequest, WorkerResponse } from '../types';
33
import { debug } from '../util/logger';
44

5+
interface PendingRequest {
6+
method: WorkerRequest['method'];
7+
resolve: (value: unknown) => void;
8+
reject: (reason: unknown) => void;
9+
}
10+
511
/**
612
* Event buffer that uses a web worker to compress events.
713
* Exported only for testing.
@@ -10,10 +16,16 @@ export class WorkerHandler {
1016
private _worker: Worker;
1117
private _id: number;
1218
private _ensureReadyPromise?: Promise<void>;
19+
private _pending: Map<number, PendingRequest>;
1320

1421
public constructor(worker: Worker) {
1522
this._worker = worker;
1623
this._id = 0;
24+
this._pending = new Map();
25+
// A single long-lived listener routes responses by id. Per-request
26+
// listeners would make worker dispatch O(n) per response, so a burst of N
27+
// in-flight requests becomes O(n^2) main-thread work.
28+
this._worker.addEventListener('message', this._onMessage);
1729
}
1830

1931
/**
@@ -62,6 +74,9 @@ export class WorkerHandler {
6274
*/
6375
public destroy(): void {
6476
DEBUG_BUILD && debug.log('Destroying compression worker');
77+
this._worker.removeEventListener('message', this._onMessage);
78+
this._pending.forEach(pending => pending.reject(new Error('Worker destroyed')));
79+
this._pending.clear();
6580
this._worker.terminate();
6681
}
6782

@@ -71,39 +86,33 @@ export class WorkerHandler {
7186
public postMessage<T>(method: WorkerRequest['method'], arg?: WorkerRequest['arg']): Promise<T> {
7287
const id = this._getAndIncrementId();
7388

74-
return new Promise((resolve, reject) => {
75-
const listener = ({ data }: MessageEvent): void => {
76-
const response = data as WorkerResponse;
77-
if (response.method !== method) {
78-
return;
79-
}
80-
81-
// There can be multiple listeners for a single method, the id ensures
82-
// that the response matches the caller.
83-
if (response.id !== id) {
84-
return;
85-
}
86-
87-
// At this point, we'll always want to remove listener regardless of result status
88-
this._worker.removeEventListener('message', listener);
89+
return new Promise<T>((resolve, reject) => {
90+
this._pending.set(id, {
91+
method,
92+
resolve: resolve as (value: unknown) => void,
93+
reject,
94+
});
95+
this._worker.postMessage({ id, method, arg });
96+
});
97+
}
8998

90-
if (!response.success) {
91-
// TODO: Do some error handling, not sure what
92-
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
99+
private _onMessage = ({ data }: MessageEvent): void => {
100+
const response = data as WorkerResponse;
101+
const pending = this._pending.get(response.id);
102+
if (!pending || pending.method !== response.method) {
103+
return;
104+
}
93105

94-
reject(new Error('Error in compression worker'));
95-
return;
96-
}
106+
this._pending.delete(response.id);
97107

98-
resolve(response.response as T);
99-
};
108+
if (!response.success) {
109+
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
110+
pending.reject(new Error('Error in compression worker'));
111+
return;
112+
}
100113

101-
// Note: we can't use `once` option because it's possible it needs to
102-
// listen to multiple messages
103-
this._worker.addEventListener('message', listener);
104-
this._worker.postMessage({ id, method, arg });
105-
});
106-
}
114+
pending.resolve(response.response);
115+
};
107116

108117
/** Get the current ID and increment it for the next call. */
109118
private _getAndIncrementId(): number {

0 commit comments

Comments
 (0)