Skip to content

Commit c609850

Browse files
fix(engine): real back-pressure in StreamingEncoder.writeFrame (#1372)
`writeFrame` returned the `stdin.write` boolean synchronously; when FFmpeg encoded slower than workers captured, Node's writable buffer grew without bound (multi-worker worst case ~80GB over a 1h render) until the kernel OOM-killed the process.

`writeFrame` is now async: a buffered write awaits the `drain` event before resolving, so back-pressure propagates through the frame reorder buffer to the capture loops and in-flight frames stay bounded. Inactivity-timer semantics are preserved: the timer is not reset before `drain`, so a hung FFmpeg still trips SIGTERM.

The drain wait races one-shot `drain`/`close` listeners (aborted in a `finally`) rather than chaining onto the shared exit promise — V8 retains reaction-list entries on unsettled promises, so per-frame `.then` chains would accumulate ~108K closures over a 1h back-pressured render. An exit-status re-check after listener attachment closes the close-before-attach hang window.

All five `writeFrame` call sites (streaming stage and HDR loops) check the result via a shared `ensureFrameWritten` guard and stop the render with a frame-indexed error when the encoder is gone instead of discarding the boolean. The `MULTI_WORKER_MAX_DURATION_SECONDS` cap can be relaxed in a follow-up now that buffering is bounded.

Fixes #1353
1 parent 5917c03 commit c609850

6 files changed

Lines changed: 247 additions & 31 deletions

File tree

packages/engine/src/services/streamingEncoder.test.ts

Lines changed: 156 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,20 @@ const baseOptions: StreamingEncoderOptions = {
418418
useGpu: false,
419419
};
420420

421+
/**
 * Test helper: awaits `promise` for at most `ms` milliseconds so a promise
 * that never settles fails the test quickly instead of hanging it.
 *
 * @param promise - The promise under test.
 * @param ms - Time budget in milliseconds; defaults to 100.
 * @returns The value `promise` resolves with, or the literal `"timeout"` when
 *   the budget elapses first. If `promise` rejects before the budget elapses,
 *   that rejection propagates to the caller.
 */
async function resolveWithin<T>(promise: Promise<T>, ms = 100): Promise<T | "timeout"> {
  let timeout: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      promise,
      new Promise<"timeout">((resolve) => {
        timeout = setTimeout(() => resolve("timeout"), ms);
      }),
    ]);
  } finally {
    // Cancel the timer whichever side won, so no pending timer is left behind.
    if (timeout) clearTimeout(timeout);
  }
}
434+
421435
describe("spawnStreamingEncoder lifecycle and cleanup", () => {
422436
afterEach(() => {
423437
vi.resetModules();
@@ -556,7 +570,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
556570
const dir = mkdtempSync(join(tmpdir(), "se-writefail-"));
557571
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);
558572

559-
expect(encoder.writeFrame(Buffer.from([0]))).toBe(true);
573+
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(true);
560574

561575
const proc = calls[0]!.proc;
562576
await new Promise<void>((resolve) => {
@@ -566,7 +580,136 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
566580
});
567581
});
568582

569-
expect(encoder.writeFrame(Buffer.from([0]))).toBe(false);
583+
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(false);
584+
});
585+
586+
it("writeFrame waits for stdin drain when FFmpeg applies back-pressure", async () => {
587+
const { spawn, calls } = createSpawnSpy();
588+
vi.resetModules();
589+
vi.doMock("child_process", () => ({ spawn }));
590+
591+
const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
592+
const dir = mkdtempSync(join(tmpdir(), "se-drain-"));
593+
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);
594+
595+
const proc = calls[0]!.proc;
596+
proc.stdin.write = (_chunk: Buffer): boolean => false;
597+
598+
const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown;
599+
expect(writeResult).toBeInstanceOf(Promise);
600+
601+
const writePromise = writeResult as Promise<boolean>;
602+
let settled = false;
603+
void writePromise.then(() => {
604+
settled = true;
605+
});
606+
607+
await Promise.resolve();
608+
expect(settled).toBe(false);
609+
expect(proc.stdin.listenerCount("drain")).toBe(1);
610+
611+
proc.stdin.emit("drain");
612+
613+
await expect(writePromise).resolves.toBe(true);
614+
expect(settled).toBe(true);
615+
expect(proc.stdin.listenerCount("drain")).toBe(0);
616+
617+
process.nextTick(() => proc.emit("close", 0));
618+
await encoder.close();
619+
});
620+
621+
it("does not accumulate process close listeners across repeated back-pressured writes", async () => {
622+
const { spawn, calls } = createSpawnSpy();
623+
vi.resetModules();
624+
vi.doMock("child_process", () => ({ spawn }));
625+
626+
const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
627+
const dir = mkdtempSync(join(tmpdir(), "se-drain-listeners-"));
628+
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);
629+
630+
const proc = calls[0]!.proc;
631+
const baselineCloseListeners = proc.listenerCount("close");
632+
const baselineDrainListeners = proc.stdin.listenerCount("drain");
633+
proc.stdin.write = (_chunk: Buffer): boolean => false;
634+
635+
for (let i = 0; i < 12; i++) {
636+
const writePromise = encoder.writeFrame(Buffer.from([i]));
637+
638+
await Promise.resolve();
639+
expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners + 1);
640+
expect(proc.listenerCount("close")).toBe(baselineCloseListeners + 1);
641+
642+
proc.stdin.emit("drain");
643+
644+
await expect(writePromise).resolves.toBe(true);
645+
expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners);
646+
expect(proc.listenerCount("close")).toBe(baselineCloseListeners);
647+
}
648+
649+
process.nextTick(() => proc.emit("close", 0));
650+
await encoder.close();
651+
});
652+
653+
it("writeFrame resolves false instead of hanging when FFmpeg exits before drain", async () => {
654+
const { spawn, calls } = createSpawnSpy();
655+
vi.resetModules();
656+
vi.doMock("child_process", () => ({ spawn }));
657+
658+
const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
659+
const dir = mkdtempSync(join(tmpdir(), "se-drain-exit-"));
660+
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);
661+
662+
const proc = calls[0]!.proc;
663+
proc.stdin.write = (_chunk: Buffer): boolean => false;
664+
665+
const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown;
666+
expect(writeResult).toBeInstanceOf(Promise);
667+
668+
const writePromise = writeResult as Promise<boolean>;
669+
let settled = false;
670+
void writePromise.then(() => {
671+
settled = true;
672+
});
673+
674+
await Promise.resolve();
675+
expect(settled).toBe(false);
676+
expect(proc.stdin.listenerCount("drain")).toBe(1);
677+
678+
proc.emit("close", 1);
679+
680+
await expect(writePromise).resolves.toBe(false);
681+
expect(settled).toBe(true);
682+
expect(proc.stdin.listenerCount("drain")).toBe(0);
683+
684+
const result = await encoder.close();
685+
expect(result.success).toBe(false);
686+
});
687+
688+
it("writeFrame resolves false when close fires after write returns false before await attaches listeners", async () => {
689+
const { spawn, calls } = createSpawnSpy();
690+
vi.resetModules();
691+
vi.doMock("child_process", () => ({ spawn }));
692+
693+
const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
694+
const dir = mkdtempSync(join(tmpdir(), "se-drain-already-closed-"));
695+
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);
696+
697+
const proc = calls[0]!.proc;
698+
const baselineCloseListeners = proc.listenerCount("close");
699+
proc.stdin.write = (_chunk: Buffer): boolean => {
700+
proc.emit("close", 1);
701+
return false;
702+
};
703+
704+
const writePromise = encoder.writeFrame(Buffer.from([1]));
705+
706+
await expect(resolveWithin(writePromise)).resolves.toBe(false);
707+
expect(encoder.getExitStatus()).toBe("error");
708+
expect(proc.stdin.listenerCount("drain")).toBe(0);
709+
expect(proc.listenerCount("close")).toBe(baselineCloseListeners);
710+
711+
const result = await encoder.close();
712+
expect(result.success).toBe(false);
570713
});
571714

572715
it("close() removes the abort listener so a post-close abort does not re-kill ffmpeg", async () => {
@@ -613,7 +756,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
613756
// progressing" capture the encoder must still be alive. The old total-
614757
// render timeout would have fired SIGTERM at ~1000ms.
615758
for (let i = 0; i < 9; i++) {
616-
encoder.writeFrame(Buffer.from([i]));
759+
await encoder.writeFrame(Buffer.from([i]));
617760
vi.advanceTimersByTime(900);
618761
}
619762
expect(proc.kill).not.toHaveBeenCalled();
@@ -647,14 +790,17 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
647790
const proc = calls[0]!.proc;
648791
proc.stdin.write = (_chunk: Buffer) => false;
649792

650-
// Pump 9 frames at 900ms intervals — all returning false. The reset
651-
// should NOT fire (every write was buffered, not accepted), so the
652-
// 1000ms timer (last reset on spawn) elapses near the start.
653-
for (let i = 0; i < 9; i++) {
654-
encoder.writeFrame(Buffer.from([i]));
655-
vi.advanceTimersByTime(900);
656-
}
793+
// A buffered write should remain pending and must NOT reset the timer.
794+
// The 1000ms timer (last reset on spawn) therefore elapses while the
795+
// caller is correctly back-pressured on the first frame.
796+
const writePromise = encoder.writeFrame(Buffer.from([0]));
797+
await Promise.resolve();
798+
799+
vi.advanceTimersByTime(1100);
657800
expect(proc.kill).toHaveBeenCalledWith("SIGTERM");
801+
802+
proc.emit("close", null);
803+
await expect(writePromise).resolves.toBe(false);
658804
} finally {
659805
vi.useRealTimers();
660806
}

packages/engine/src/services/streamingEncoder.ts

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
* 1. Frame reorder buffer – ensures out-of-order parallel workers feed
1111
* frames to FFmpeg stdin in sequential order.
1212
* 2. Streaming FFmpeg encoder – spawns FFmpeg with `-f image2pipe` and
13-
* exposes a `writeFrame(buffer)` + `close()` API.
13+
* exposes an async `writeFrame(buffer)` + `close()` API.
1414
*/
1515

1616
import { spawn, type ChildProcess } from "child_process";
17+
import { once } from "events";
1718
import { trackChildProcess } from "../utils/processTracker.js";
1819
import { existsSync, mkdirSync, statSync } from "fs";
1920
import { dirname } from "path";
@@ -126,7 +127,14 @@ export interface StreamingEncoderResult {
126127
}
127128

128129
export interface StreamingEncoder {
129-
writeFrame: (buffer: Buffer) => boolean;
130+
/**
131+
* Write one frame to FFmpeg stdin, awaiting `drain` when the pipe is full
132+
* so back-pressure propagates to the caller. Resolves `false` when FFmpeg
133+
* is already gone. Callers must serialize calls — one in-flight writeFrame
134+
* per encoder (the frame reorder buffer provides this ordering); concurrent
135+
* calls would interleave frame bytes on the pipe and race the drain wait.
136+
*/
137+
writeFrame: (buffer: Buffer) => Promise<boolean>;
130138
close: () => Promise<StreamingEncoderResult>;
131139
getExitStatus: () => "running" | "success" | "error";
132140
}
@@ -448,9 +456,45 @@ export async function spawnStreamingEncoder(
448456
};
449457
resetTimer();
450458

459+
/**
 * Waits, for one back-pressured write, until either FFmpeg's stdin emits
 * `drain` or the FFmpeg process emits `close` — whichever comes first.
 *
 * Back-pressure can hit once per frame. Do not race `exitPromise.then(...)`
 * here: V8 retains `.then` reaction-list entries on an unsettled promise, so
 * a one-hour 30fps render under steady back-pressure can accumulate ~108K
 * closures + AbortControllers. One-shot listeners scoped to this write are
 * used instead and are aborted in `finally`.
 *
 * NOTE(review): `events.once` also rejects when the emitter emits `'error'`
 * while waiting. Such a rejection is not an AbortError, so it is rethrown and
 * this function rejects rather than returning "exit" — confirm callers expect
 * that.
 *
 * @param stdin - The FFmpeg stdin stream whose `drain` event is awaited.
 * @returns `"drain"` when stdin drained first; `"exit"` when the process
 *   closed first, had already left the "running" state, or the wait was
 *   aborted.
 */
const waitForDrainOrExit = async (
  stdin: NonNullable<ChildProcess["stdin"]>,
): Promise<"drain" | "exit"> => {
  const abortController = new AbortController();
  try {
    // Attach both one-shot listeners synchronously, before any await.
    const drainPromise = once(stdin, "drain", { signal: abortController.signal }).then(
      () => "drain" as const,
    );
    const closePromise = once(ffmpeg, "close", { signal: abortController.signal }).then(
      () => "exit" as const,
    );
    // An abort (from the `finally` below) surfaces as AbortError on whichever
    // listener is still pending; treat that as "exit" and rethrow the rest.
    const racePromise = Promise.race([drainPromise, closePromise]).catch((err: unknown) => {
      if (err instanceof Error && err.name === "AbortError") {
        return "exit" as const;
      }
      throw err;
    });

    // `close` is the event that flips `exitStatus`; re-check after listener
    // attachment so a close emitted between `stdin.write(false)` and this
    // await cannot hang forever.
    if (exitStatus !== "running") {
      return "exit";
    }

    return await racePromise;
  } finally {
    // Removes whichever listeners are still registered for this write.
    abortController.abort();
  }
};
493+
451494
const encoder: StreamingEncoder = {
452-
writeFrame: (buffer: Buffer): boolean => {
453-
if (exitStatus !== "running" || !ffmpeg.stdin || ffmpeg.stdin.destroyed) {
495+
writeFrame: async (buffer: Buffer): Promise<boolean> => {
496+
const stdin = ffmpeg.stdin;
497+
if (exitStatus !== "running" || !stdin || stdin.destroyed) {
454498
return false;
455499
}
456500
// Copy the buffer before writing — Node streams hold a reference to the
@@ -459,18 +503,28 @@ export async function spawnStreamingEncoder(
459503
// so without this copy the pipe would read partially-overwritten data
460504
// and flicker.
461505
const copy = Buffer.from(buffer);
462-
const accepted = ffmpeg.stdin.write(copy);
463-
// Reset inactivity timer ONLY on `accepted === true`. `true` means the
464-
// write went through to the kernel pipe without buffering in Node —
465-
// proof FFmpeg is actually consuming. `false` means Node's writable
466-
// stream had to buffer (FFmpeg hasn't drained the pipe yet); we deliberately
467-
// don't reset on `false` so a hung FFmpeg with a still-producing Chrome
468-
// can't keep us alive forever while Node's stdin buffer grows to OOM. In
469-
// steady state with a slower-but-alive FFmpeg, writes alternate between
470-
// true and false as the buffer drains and refills; the trues are enough
471-
// to keep the heartbeat ticking.
472-
if (accepted) resetTimer();
473-
return accepted;
506+
const accepted = stdin.write(copy);
507+
// Reset inactivity timer immediately ONLY on `accepted === true`. `true`
508+
// means the write went through to the kernel pipe without buffering in
509+
// Node — proof FFmpeg is actually consuming. `false` means Node's writable
510+
// stream had to buffer (FFmpeg hasn't drained the pipe yet); we await
511+
// `drain` before letting callers produce the next frame, and only reset
512+
// after drain proves consumption. We deliberately don't reset before
513+
// drain so a hung FFmpeg with a still-producing Chrome can't keep us
514+
// alive forever while Node's stdin buffer grows to OOM. If FFmpeg exits
515+
// before draining, waitForDrainOrExit returns "exit", removes its
516+
// one-shot listeners, and callers see `false` instead of hanging.
517+
if (accepted) {
518+
resetTimer();
519+
return true;
520+
}
521+
522+
const drainResult = await waitForDrainOrExit(stdin);
523+
if (drainResult !== "drain" || exitStatus !== "running") {
524+
return false;
525+
}
526+
resetTimer();
527+
return true;
474528
},
475529

476530
close: async (): Promise<StreamingEncoderResult> => {

packages/producer/src/services/render/stages/captureHdrFrameShared.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,19 @@ export async function captureTransitionFrameOnWorker(
301301
}
302302
}
303303

304+
// ─── Streaming-encoder write guard ──────────────────────────────────────────
305+
306+
/**
 * Streaming-encoder writes report `false` when FFmpeg is already gone.
 * Continuing to capture into a dead encoder wastes the rest of the render,
 * so every frame loop stops with a frame-indexed error instead.
 *
 * @param frameWritten - Result of the awaited `writeFrame` call for this frame.
 * @param frameIndex - Index of the frame that was being written; included in
 *   the error message.
 * @throws Error when `frameWritten` is `false`.
 */
export function ensureFrameWritten(frameWritten: boolean, frameIndex: number): void {
  if (!frameWritten) {
    throw new Error(`Streaming encoder exited before frame ${frameIndex} was written`);
  }
}
316+
304317
// ─── HDR video raw-frame cleanup (sequential path only) ────────────────────
305318

306319
export function cleanupEndedHdrVideos(args: {

packages/producer/src/services/render/stages/captureHdrHybridLoop.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
type LayeredTransitionBuffers,
5555
captureTransitionFrameOnWorker,
5656
distributeLayeredHybridFrameRanges,
57+
ensureFrameWritten,
5758
partitionTransitionFrames,
5859
} from "./captureHdrFrameShared.js";
5960
import { updateJobStatus } from "../shared.js";
@@ -185,7 +186,7 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise
185186
const writeEncoded = async (frameIdx: number, buf: Buffer): Promise<void> => {
186187
await reorderBuffer.waitForFrame(frameIdx);
187188
const writeStart = Date.now();
188-
hdrEncoder.writeFrame(buf);
189+
ensureFrameWritten(await hdrEncoder.writeFrame(buf), frameIdx);
189190
addHdrTiming(hdrPerf, "encoderWriteMs", writeStart);
190191
reorderBuffer.advanceTo(frameIdx + 1);
191192
framesWritten += 1;

packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { writeFileExclusiveSync } from "../shared.js";
3434
import {
3535
captureSceneIntoBuffer,
3636
cleanupEndedHdrVideos,
37+
ensureFrameWritten,
3738
type LayeredTransitionBuffers,
3839
} from "./captureHdrFrameShared.js";
3940
import { updateJobStatus } from "../shared.js";
@@ -189,7 +190,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput):
189190
);
190191
addHdrTiming(hdrPerf, "transitionCompositeMs", transitionTimingStart);
191192
timingStart = Date.now();
192-
hdrEncoder.writeFrame(transitionBuffers.output);
193+
ensureFrameWritten(await hdrEncoder.writeFrame(transitionBuffers.output), i);
193194
addHdrTiming(hdrPerf, "encoderWriteMs", timingStart);
194195
} else {
195196
if (hdrPerf) hdrPerf.normalFrames += 1;
@@ -206,7 +207,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput):
206207
);
207208
}
208209
timingStart = Date.now();
209-
hdrEncoder.writeFrame(normalCanvas);
210+
ensureFrameWritten(await hdrEncoder.writeFrame(normalCanvas), i);
210211
addHdrTiming(hdrPerf, "encoderWriteMs", timingStart);
211212
}
212213

0 commit comments

Comments
 (0)