Skip to content

Commit 0976d32

Browse files
committed
Add memory leak harness self-test
Refactors the RSS detector into a shared e2e harness and adds a fast synthetic self-test that proves the detector stays green for a stable child process and trips for an intentionally leaky one. The dedicated memory leak CI job now runs both the harness self-test and the real engine regression test. Made-with: Cursor Committed-By-Agent: cursor
1 parent bca1f3e commit 0976d32

4 files changed

Lines changed: 351 additions & 109 deletions

File tree

.github/workflows/ci.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,10 @@ jobs:
500500
run: pnpm install --frozen-lockfile && pnpm build
501501

502502
- name: Memory leak regression test
503-
run: pnpm --filter @stripe/sync-e2e exec vitest run memory-leak.test.ts
503+
run: |
504+
pnpm --filter @stripe/sync-e2e exec vitest run \
505+
memory-leak-harness.test.ts \
506+
memory-leak.test.ts
504507
timeout-minutes: 10
505508

506509
# ---------------------------------------------------------------------------
@@ -574,6 +577,7 @@ jobs:
574577
--exclude 'test-sync-e2e.test.ts' \
575578
--exclude 'test-sync-engine.test.ts' \
576579
--exclude 'test-e2e-network.test.ts' \
580+
--exclude 'memory-leak-harness.test.ts' \
577581
--exclude 'memory-leak.test.ts' # ↑ run in dedicated jobs
578582
env:
579583
STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}

e2e/memory-leak-harness.test.ts

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import { afterEach, describe, expect, it } from 'vitest'
2+
import { spawn, type ChildProcess } from 'node:child_process'
3+
import {
4+
drainNdjsonResponse,
5+
formatMemoryLeakSummary,
6+
hasTimeLimitEof,
7+
runMemoryLeakDetector,
8+
type MemoryLeakSettings,
9+
} from './memory-leak-harness.js'
10+
11+
const DETECTOR_SETTINGS: MemoryLeakSettings = {
12+
warmupIterations: 6,
13+
testIterations: 12,
14+
settleMs: 50,
15+
slopeThresholdKb: 3000,
16+
growthThresholdMb: 300,
17+
}
18+
19+
const SYNTHETIC_LEAK_BYTES = 8 * 1024 * 1024
20+
21+
const children = new Set<ChildProcess>()
22+
23+
function spawnSyntheticServer(leakBytesPerRequest: number): Promise<{ proc: ChildProcess; baseUrl: string }> {
24+
return new Promise((resolve, reject) => {
25+
const code = [
26+
'import http from "node:http";',
27+
'const retained = [];',
28+
`const leakBytesPerRequest = ${leakBytesPerRequest};`,
29+
'const server = http.createServer((req, res) => {',
30+
' if (req.url === "/health") {',
31+
' res.writeHead(200, { "content-type": "application/json" });',
32+
' res.end(JSON.stringify({ ok: true }));',
33+
' return;',
34+
' }',
35+
' if (req.url?.startsWith("/pipeline_setup")) {',
36+
' res.writeHead(200, { "content-type": "application/x-ndjson" });',
37+
' res.end(JSON.stringify({ type: "control" }) + "\\n");',
38+
' return;',
39+
' }',
40+
' if (req.url?.startsWith("/pipeline_sync")) {',
41+
' if (leakBytesPerRequest > 0) retained.push(Buffer.alloc(leakBytesPerRequest, 1));',
42+
' res.writeHead(200, { "content-type": "application/x-ndjson" });',
43+
' res.end(JSON.stringify({ type: "eof", eof: { reason: "time_limit" } }) + "\\n");',
44+
' return;',
45+
' }',
46+
' res.writeHead(404);',
47+
' res.end("not found");',
48+
'});',
49+
'server.listen(0, "127.0.0.1", () => {',
50+
' const addr = server.address();',
51+
' console.log(`READY:${addr.port}`);',
52+
'});',
53+
].join('\n')
54+
55+
const proc = spawn('node', ['--input-type=module', '-e', code], {
56+
stdio: ['ignore', 'pipe', 'pipe'],
57+
}) as ChildProcess
58+
children.add(proc)
59+
60+
let output = ''
61+
const timeout = setTimeout(() => {
62+
proc.kill('SIGKILL')
63+
reject(new Error(`Synthetic server did not start within 5s\noutput: ${output}`))
64+
}, 5_000)
65+
66+
proc.stdout!.on('data', (chunk: Buffer) => {
67+
output += chunk.toString()
68+
const match = output.match(/READY:(\d+)/)
69+
if (!match) return
70+
clearTimeout(timeout)
71+
resolve({ proc, baseUrl: `http://127.0.0.1:${match[1]}` })
72+
})
73+
74+
proc.stderr!.on('data', (chunk: Buffer) => {
75+
output += chunk.toString()
76+
})
77+
78+
proc.on('error', (err: Error) => {
79+
clearTimeout(timeout)
80+
reject(err)
81+
})
82+
83+
proc.on('exit', (code: number | null) => {
84+
clearTimeout(timeout)
85+
if (!output.includes('READY:')) {
86+
reject(new Error(`Synthetic server exited with code ${code}\noutput: ${output}`))
87+
}
88+
})
89+
})
90+
}
91+
92+
async function runSyntheticScenario(leakBytesPerRequest: number) {
93+
const { proc, baseUrl } = await spawnSyntheticServer(leakBytesPerRequest)
94+
95+
const setupRes = await fetch(`${baseUrl}/pipeline_setup`, { method: 'POST' })
96+
expect(setupRes.ok).toBe(true)
97+
await drainNdjsonResponse(setupRes)
98+
99+
const result = await runMemoryLeakDetector({
100+
pid: proc.pid!,
101+
settings: DETECTOR_SETTINGS,
102+
iterate: async () => {
103+
const res = await fetch(`${baseUrl}/pipeline_sync?time_limit=0.1`, { method: 'POST' })
104+
expect(res.ok).toBe(true)
105+
const messages = await drainNdjsonResponse(res)
106+
return { sawTimeLimit: hasTimeLimitEof(messages) }
107+
},
108+
})
109+
110+
return result
111+
}
112+
113+
afterEach(async () => {
114+
for (const child of children) {
115+
if (child.pid) child.kill('SIGKILL')
116+
await new Promise<void>((resolve) => {
117+
child.once('exit', () => resolve())
118+
setTimeout(resolve, 500)
119+
})
120+
children.delete(child)
121+
}
122+
})
123+
124+
describe('memory leak harness', { timeout: 120_000 }, () => {
125+
it('does not flag a stable synthetic process', async () => {
126+
const result = await runSyntheticScenario(0)
127+
128+
console.log(formatMemoryLeakSummary(result))
129+
130+
expect(result.timeLimitCount).toBe(result.totalIterations)
131+
expect(result.passesThresholds).toBe(true)
132+
expect(result.slopeKbPerIteration).toBeLessThan(DETECTOR_SETTINGS.slopeThresholdKb)
133+
})
134+
135+
it('flags an intentionally leaky synthetic process', async () => {
136+
const result = await runSyntheticScenario(SYNTHETIC_LEAK_BYTES)
137+
138+
console.log(formatMemoryLeakSummary(result))
139+
140+
expect(result.timeLimitCount).toBe(result.totalIterations)
141+
expect(result.passesThresholds).toBe(false)
142+
expect(result.slopeKbPerIteration).toBeGreaterThan(DETECTOR_SETTINGS.slopeThresholdKb)
143+
})
144+
})

e2e/memory-leak-harness.ts

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import { execSync } from 'node:child_process'
2+
3+
export type MemoryLeakSettings = {
4+
warmupIterations: number
5+
testIterations: number
6+
settleMs: number
7+
slopeThresholdKb: number
8+
growthThresholdMb: number
9+
}
10+
11+
export type MemoryLeakIterationResult = {
12+
sawTimeLimit: boolean
13+
}
14+
15+
export type MemoryLeakResult = {
16+
settings: MemoryLeakSettings
17+
totalIterations: number
18+
timeLimitCount: number
19+
rssSamplesByIterationKb: Array<number | null>
20+
postWarmupSamplesKb: number[]
21+
slopeKbPerIteration: number
22+
totalGrowthMb: number
23+
passesThresholds: boolean
24+
}
25+
26+
const decoder = new TextDecoder()
27+
28+
export function getRssKb(pid: number): number | null {
29+
try {
30+
const raw = execSync(`ps -o rss= -p ${pid}`, { encoding: 'utf8' }).trim()
31+
const kb = parseInt(raw, 10)
32+
return Number.isFinite(kb) ? kb : null
33+
} catch {
34+
return null
35+
}
36+
}
37+
38+
/** Least-squares slope: KB growth per iteration. */
39+
export function linearRegressionSlope(ys: number[]): number {
40+
const n = ys.length
41+
if (n < 2) return 0
42+
let sumX = 0,
43+
sumY = 0,
44+
sumXY = 0,
45+
sumXX = 0
46+
for (let i = 0; i < n; i++) {
47+
sumX += i
48+
sumY += ys[i]
49+
sumXY += i * ys[i]
50+
sumXX += i * i
51+
}
52+
return (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX)
53+
}
54+
55+
export async function drainNdjsonResponse(res: Response): Promise<unknown[]> {
56+
const reader = res.body?.getReader()
57+
if (!reader) return []
58+
59+
let buffer = ''
60+
while (true) {
61+
const { done, value } = await reader.read()
62+
if (done) break
63+
buffer += decoder.decode(value, { stream: true })
64+
}
65+
buffer += decoder.decode()
66+
67+
const messages: unknown[] = []
68+
for (const line of buffer.split('\n')) {
69+
const trimmed = line.trim()
70+
if (!trimmed) continue
71+
messages.push(JSON.parse(trimmed))
72+
}
73+
return messages
74+
}
75+
76+
export function hasTimeLimitEof(messages: unknown[]): boolean {
77+
return messages.some((message) => {
78+
if (!message || typeof message !== 'object') return false
79+
if (!('type' in message) || message.type !== 'eof') return false
80+
if (!('eof' in message) || !message.eof || typeof message.eof !== 'object') return false
81+
return 'reason' in message.eof && message.eof.reason === 'time_limit'
82+
})
83+
}
84+
85+
export async function runMemoryLeakDetector(opts: {
86+
pid: number
87+
settings: MemoryLeakSettings
88+
iterate: (iteration: number) => Promise<MemoryLeakIterationResult>
89+
}): Promise<MemoryLeakResult> {
90+
const { pid, settings, iterate } = opts
91+
const totalIterations = settings.warmupIterations + settings.testIterations
92+
const rssSamplesByIterationKb: Array<number | null> = []
93+
let timeLimitCount = 0
94+
95+
for (let i = 0; i < totalIterations; i++) {
96+
const { sawTimeLimit } = await iterate(i)
97+
if (sawTimeLimit) timeLimitCount++
98+
99+
await new Promise((resolve) => setTimeout(resolve, settings.settleMs))
100+
101+
rssSamplesByIterationKb.push(getRssKb(pid))
102+
}
103+
104+
const postWarmupSamplesKb = rssSamplesByIterationKb
105+
.slice(settings.warmupIterations)
106+
.filter((value): value is number => value !== null)
107+
108+
const slopeKbPerIteration = linearRegressionSlope(postWarmupSamplesKb)
109+
const totalGrowthMb =
110+
postWarmupSamplesKb.length >= 2
111+
? (postWarmupSamplesKb[postWarmupSamplesKb.length - 1] - postWarmupSamplesKb[0]) / 1024
112+
: 0
113+
114+
return {
115+
settings,
116+
totalIterations,
117+
timeLimitCount,
118+
rssSamplesByIterationKb,
119+
postWarmupSamplesKb,
120+
slopeKbPerIteration,
121+
totalGrowthMb,
122+
passesThresholds:
123+
slopeKbPerIteration < settings.slopeThresholdKb &&
124+
totalGrowthMb < settings.growthThresholdMb,
125+
}
126+
}
127+
128+
export function formatRssSamplesTable(result: MemoryLeakResult): string {
129+
const lines = [' RSS samples (MB):', ' iter │ RSS (MB) │ delta', ' ──────┼────────────┼────────']
130+
131+
for (let i = 0; i < result.rssSamplesByIterationKb.length; i++) {
132+
const current = result.rssSamplesByIterationKb[i]
133+
const previous = i > 0 ? result.rssSamplesByIterationKb[i - 1] : null
134+
const mb = current === null ? ' n/a' : (current / 1024).toFixed(1).padStart(8)
135+
const delta =
136+
current === null || previous === null
137+
? ' n/a'
138+
: (((current - previous) / 1024).toFixed(1)).padStart(6)
139+
const marker = i + 1 === result.settings.warmupIterations + 1 ? ' ← warmup end' : ''
140+
lines.push(` ${String(i + 1).padStart(4)}${mb}${delta}${marker}`)
141+
}
142+
143+
return lines.join('\n')
144+
}
145+
146+
export function formatMemoryLeakSummary(result: MemoryLeakResult): string {
147+
const baseline = result.postWarmupSamplesKb[0]
148+
const final = result.postWarmupSamplesKb[result.postWarmupSamplesKb.length - 1]
149+
150+
return [
151+
` Canary: ${result.timeLimitCount}/${result.totalIterations} windows ended by time_limit (${((result.timeLimitCount / result.totalIterations) * 100).toFixed(0)}%)`,
152+
'',
153+
' Post-warmup analysis:',
154+
` Baseline RSS: ${baseline === undefined ? 'n/a' : (baseline / 1024).toFixed(1) + ' MB'}`,
155+
` Final RSS: ${final === undefined ? 'n/a' : (final / 1024).toFixed(1) + ' MB'}`,
156+
` Total growth: ${result.totalGrowthMb.toFixed(1)} MB`,
157+
` Slope: ${result.slopeKbPerIteration.toFixed(1)} KB/iteration`,
158+
].join('\n')
159+
}

0 commit comments

Comments
 (0)