Skip to content

Commit 1e0341d

Browse files
logaretm and JPeer264 authored
fix(cloudflare): avoid repeated flush lock wrapping (#21156)
Durable Objects can reuse the same instrumented context across many requests. `makeFlushLock()` replaced `context.waitUntil` on every init, so each request wrapped the previous wrapper and built an unbounded call chain. A later `waitUntil()` call then had to recurse through every wrapper, eventually causing stack overflows and retaining stale closures. This change adds a mechanism to reuse a single `waitUntil` wrapper per Cloudflare context instead of wrapping again on every SDK init. It also adds regression coverage for long-lived contexts that repeatedly create flush locks. Fixes #21150 --------- Co-authored-by: JPeer264 <jan.peer@sentry.io>
1 parent d75440c commit 1e0341d

3 files changed

Lines changed: 164 additions & 16 deletions

File tree

dev-packages/e2e-tests/test-applications/cloudflare-workers/tests/memory.test.ts

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,76 @@ import { expect, test } from '@playwright/test';
33
import { INSPECTOR_PORT } from '../playwright.config';
44

55
test.describe('Worker V8 isolate memory tests', () => {
6-
test('worker memory is reclaimed after GC', async ({ baseURL }) => {
6+
test('worker memory is stable across request batches', async ({ baseURL }) => {
77
const profiler = new MemoryProfiler({ port: INSPECTOR_PORT });
88

99
// Warm up: make initial requests and let the runtime settle
10-
for (let i = 0; i < 5; i++) {
10+
for (let i = 0; i < 20; i++) {
1111
await fetch(baseURL!);
1212
}
1313

1414
await profiler.connect();
1515

16-
const baselineSnapshot = await profiler.takeHeapSnapshot();
16+
// First batch
17+
for (let i = 0; i < 50; i++) {
18+
const res = await fetch(baseURL!);
19+
expect(res.status).toBe(200);
20+
await res.text();
21+
}
22+
23+
const afterFirstBatch = await profiler.takeHeapSnapshot();
1724

25+
// Second batch
1826
for (let i = 0; i < 50; i++) {
1927
const res = await fetch(baseURL!);
2028
expect(res.status).toBe(200);
2129
await res.text();
2230
}
2331

24-
const finalSnapshot = await profiler.takeHeapSnapshot();
25-
const result = profiler.compareSnapshots(baselineSnapshot, finalSnapshot);
32+
const afterSecondBatch = await profiler.takeHeapSnapshot();
33+
34+
// Compare batches to detect per-request leaks (excludes warm-up effects)
35+
const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch);
36+
37+
expect(result.nodeGrowthPercent).toBeLessThan(0.15);
38+
39+
await profiler.close();
40+
});
41+
42+
test('durable object memory is stable across request batches', async ({ baseURL }) => {
43+
const profiler = new MemoryProfiler({ port: INSPECTOR_PORT });
44+
45+
// Warm up: let JIT compile, caches fill, and DO instance stabilize
46+
for (let i = 0; i < 30; i++) {
47+
await fetch(`${baseURL}/pass-to-object/storage/put`);
48+
}
49+
50+
await profiler.connect();
51+
52+
// First batch of requests to the same DO
53+
for (let i = 0; i < 50; i++) {
54+
const res = await fetch(`${baseURL}/pass-to-object/storage/put`);
55+
expect(res.status).toBe(200);
56+
await res.text();
57+
}
58+
59+
const afterFirstBatch = await profiler.takeHeapSnapshot();
60+
61+
// Second batch of requests to the same DO
62+
for (let i = 0; i < 50; i++) {
63+
const res = await fetch(`${baseURL}/pass-to-object/storage/put`);
64+
expect(res.status).toBe(200);
65+
await res.text();
66+
}
67+
68+
const afterSecondBatch = await profiler.takeHeapSnapshot();
69+
70+
// Compare batches to detect per-request leaks (excludes warm-up effects)
71+
// Before fix: makeFlushLock re-wrapped waitUntil on each request = leak
72+
// After fix: growth should be minimal
73+
const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch);
2674

27-
expect(result.nodeGrowthPercent).toBeLessThan(1);
75+
expect(result.nodeGrowthPercent).toBeLessThan(0.15);
2876

2977
await profiler.close();
3078
});

packages/cloudflare/src/flush.ts

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ type FlushLock = {
77
readonly finalize: () => Promise<void>;
88
};
99

10+
type FlushLockRegistry = {
11+
readonly locks: Set<FlushLockInternal>;
12+
};
13+
14+
type FlushLockInternal = FlushLock & {
15+
readonly acquire: () => void;
16+
readonly release: () => void;
17+
};
18+
19+
const flushLockRegistries = new WeakMap<ExecutionContext['waitUntil'], FlushLockRegistry>();
20+
1021
/**
1122
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
1223
* to monitor pending tasks, and provides a flusher function to ensure all tasks
@@ -16,27 +27,69 @@ type FlushLock = {
1627
* @return {FlushLock} Returns a frozen flush lock for the given context, exposing a `ready` promise and a `finalize` function.
1728
*/
1829
export function makeFlushLock(context: ExecutionContext): FlushLock {
30+
const registry = getOrCreateFlushLockRegistry(context);
1931
let resolveAllDone: () => void = () => undefined;
2032
const allDone = new Promise<void>(res => {
2133
resolveAllDone = res;
2234
});
2335
let pending = 0;
36+
37+
const lock: FlushLockInternal = {
38+
ready: allDone,
39+
acquire: () => {
40+
pending++;
41+
},
42+
release: () => {
43+
if (--pending === 0) {
44+
registry.locks.delete(lock);
45+
resolveAllDone();
46+
}
47+
},
48+
finalize: () => {
49+
if (pending === 0) {
50+
registry.locks.delete(lock);
51+
resolveAllDone();
52+
}
53+
return allDone;
54+
},
55+
};
56+
57+
registry.locks.add(lock);
58+
return Object.freeze(lock);
59+
}
60+
61+
function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry {
62+
// eslint-disable-next-line @typescript-eslint/unbound-method
63+
const waitUntil = context.waitUntil;
64+
const existingRegistry = flushLockRegistries.get(waitUntil);
65+
66+
if (existingRegistry) {
67+
return existingRegistry;
68+
}
69+
70+
const registry: FlushLockRegistry = { locks: new Set() };
2471
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
25-
context.waitUntil = promise => {
26-
pending++;
72+
const instrumentedWaitUntil: typeof context.waitUntil = promise => {
73+
// Snapshot active locks so locks created after this call do not wait for earlier waitUntil work.
74+
const locks = [...registry.locks];
75+
76+
for (const lock of locks) {
77+
lock.acquire();
78+
}
79+
2780
return originalWaitUntil(
2881
promise.finally(() => {
29-
if (--pending === 0) resolveAllDone();
82+
for (const lock of locks) {
83+
lock.release();
84+
}
3085
}),
3186
);
3287
};
33-
return Object.freeze({
34-
ready: allDone,
35-
finalize: () => {
36-
if (pending === 0) resolveAllDone();
37-
return allDone;
38-
},
39-
});
88+
89+
flushLockRegistries.set(instrumentedWaitUntil, registry);
90+
context.waitUntil = instrumentedWaitUntil;
91+
92+
return registry;
4093
}
4194

4295
/**

packages/cloudflare/test/flush.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,53 @@ describe('Flush buffer test', () => {
2929
await Promise.all(waitUntilPromises);
3030
await expect(lock.ready).resolves.toBeUndefined();
3131
});
32+
33+
it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => {
34+
const waitUntilPromises: Promise<void>[] = [];
35+
const context: ExecutionContext = {
36+
waitUntil: vi.fn(promise => {
37+
waitUntilPromises.push(promise);
38+
}),
39+
passThroughOnException: vi.fn(),
40+
};
41+
42+
for (let i = 0; i < 20_000; i++) {
43+
makeFlushLock(context);
44+
}
45+
46+
expect(() => context.waitUntil(Promise.resolve())).not.toThrow();
47+
await Promise.all(waitUntilPromises);
48+
});
49+
50+
it('creates a fresh flush lock when waitUntil was already instrumented', async () => {
51+
const waitUntilPromises: Promise<void>[] = [];
52+
const context: ExecutionContext = {
53+
waitUntil: vi.fn(promise => {
54+
waitUntilPromises.push(promise);
55+
}),
56+
passThroughOnException: vi.fn(),
57+
};
58+
59+
const firstLock = makeFlushLock(context);
60+
await firstLock.finalize();
61+
62+
let resolveWaitUntil!: () => void;
63+
const secondTask = new Promise<void>(resolve => {
64+
resolveWaitUntil = resolve;
65+
});
66+
const secondLock = makeFlushLock(context);
67+
68+
context.waitUntil(secondTask);
69+
void secondLock.finalize();
70+
71+
await Promise.resolve();
72+
expect(waitUntilPromises).toHaveLength(1);
73+
await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending');
74+
75+
resolveWaitUntil();
76+
await Promise.all(waitUntilPromises);
77+
await expect(secondLock.ready).resolves.toBeUndefined();
78+
});
3279
});
3380

3481
describe('flushAndDispose', () => {

0 commit comments

Comments
 (0)