feat(engine/producer): enable multi-worker streaming encode via interleaved frame distribution#1351
Claudemeri wants to merge 5 commits into
…leaved frame distribution Parallel captures previously disabled streaming encode (`shouldUseStreamingEncode` returned `workerCount === 1`). The infrastructure for multi-worker streaming was already present in `captureStreamingStage` and `FrameReorderBuffer`, but couldn't be reached. The blocker was the frame-distribution strategy: with contiguous chunk distribution, worker 1 blocks at its first frame until worker 0 finishes its entire chunk — collapsing N parallel workers down to 1 for the streaming path. Fix: add `distributeFramesInterleaved` in `parallelCoordinator`, which assigns frames i, i+N, i+2N, ... to worker i (stride = workerCount). All workers then advance through the timeline in lockstep; the `FrameReorderBuffer` finds the next cursor value ready (or nearly ready) on every write, keeping contention near zero. Use `distributeFramesInterleaved` in `captureStreamingStage` for the parallel branch. Remove the `workerCount === 1` gate in `shouldUseStreamingEncode`. Also lift the `streamingEncodeMaxDurationSeconds` duration cap for multi-worker renders: the cap guards against back-pressure accumulating when a single capture+encode pipeline runs long. With interleaved distribution the in-flight buffer is bounded by `workerCount` (not composition length), so the cap is unnecessary there. Result: `--workers N` now enables both parallel capture AND streaming encode for all composition lengths. Previously, `--workers 2` on a short comp was ~8× slower than `--workers 1` (streaming disabled → disk path); with this change it is ~2× faster. For long compositions that exceeded `streamingEncodeMaxDurationSeconds`, multi-worker streaming now also applies, eliminating the separate disk I/O phase.
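For readers skimming the thread, here is a minimal sketch of the two distribution strategies the commit message contrasts. It is not the code from `parallelCoordinator`: the `FrameTask` shape, the `...Sketch` function names, and the ceil-based chunking of the contiguous variant are assumptions made for illustration. Only the stride assignment (worker i gets i, i+N, i+2N, ...) and the stride loop are taken from the description.

```typescript
// Hypothetical, simplified task shape; the real WorkerTask carries more fields.
interface FrameTask {
  workerId: number;
  startFrame: number; // first frame this worker captures
  endFrame: number; // exclusive upper bound
  stride: number; // step between consecutive frames for this worker
}

// Contiguous chunks: each worker gets one unbroken range (assumed ceil-based split).
function distributeContiguousSketch(totalFrames: number, workerCount: number): FrameTask[] {
  const tasks: FrameTask[] = [];
  const chunk = Math.ceil(totalFrames / workerCount);
  for (let w = 0; w < workerCount && w * chunk < totalFrames; w++) {
    tasks.push({
      workerId: w,
      startFrame: w * chunk,
      endFrame: Math.min((w + 1) * chunk, totalFrames),
      stride: 1,
    });
  }
  return tasks;
}

// Interleaved: worker w gets frames w, w+N, w+2N, ... where N = workerCount.
function distributeInterleavedSketch(totalFrames: number, workerCount: number): FrameTask[] {
  const tasks: FrameTask[] = [];
  for (let w = 0; w < workerCount && w < totalFrames; w++) {
    tasks.push({ workerId: w, startFrame: w, endFrame: totalFrames, stride: workerCount });
  }
  return tasks;
}

// Expands a task into the frame indices its stride loop would visit.
function framesFor(task: FrameTask): number[] {
  const frames: number[] = [];
  for (let i = task.startFrame; i < task.endFrame; i += task.stride) frames.push(i);
  return frames;
}
```

With 10 frames and 3 workers, the interleaved variant gives worker 1 the frames 1, 4, 7, so its first write is due right after worker 0's first write. The contiguous variant gives worker 1 the frames 4 to 7, so it cannot write anything until worker 0 has delivered four frames.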
jrusso1020
left a comment
Reviewed the full pipeline: distributeFramesInterleaved → captureFrameRange (with the stride loop) → onFrameBuffer → FrameReorderBuffer.waitForFrame/advanceTo → StreamingEncoder.writeFrame → ffmpeg.stdin.write. The interleaved distribution is the right answer for the capture side, and the "why don't we do this already" answer is in the diff itself — the multi-worker branch in captureStreamingStage was scaffolded but unreachable because shouldUseStreamingEncode returned false for workerCount > 1. Clean unlock there.
Approve the architecture; want one concrete change before ship.
Important — removing the duration cap for multi-worker is unsafe on long comps
The PR's framing is "in-flight frame buffers are bounded by workerCount" — that's true for the capture-to-reorder-buffer side (each worker holds at most one captured frame, awaits its turn at the reorder buffer, then moves on). It is not true for the reorder-buffer-to-FFmpeg side.
Walk through StreamingEncoder.writeFrame at packages/engine/src/services/streamingEncoder.ts:450-471:
const accepted = ffmpeg.stdin.write(copy);
if (accepted) resetTimer();
return accepted;
writeFrame does not await drain when accepted === false. The caller (onFrameBuffer) doesn't await either. So when FFmpeg is slower than total capture throughput, Node.js's writable-stream internal buffer grows without bound: Node respects highWaterMark for the signal (write returns false) but doesn't actually refuse the write. The streamingTimeout inactivity timer is the only guardrail, and it only fires after the entire encoder stalls, not on creep.
Concrete numbers for the worst case the PR's test now enables (workerCount=3, durationSeconds=3600 → true):
- 3-worker capture: ~3 × 35 = 105 fps
- FFmpeg h264 medium: ~60 fps
- Net backlog: ~45 fps × ~500 KB/frame × 3600 s ≈ 80 GB of unbounded Node buffer growth
That'll OOM the producer pod well before the 1-hour render finishes. The cap was the only thing preventing this for single-worker; removing it entirely for multi-worker makes the risk worse, not better, because total capture throughput is higher.
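The worst-case arithmetic above can be reproduced with a few lines. This is only a back-of-envelope helper using the reviewer's rough inputs (35 fps per worker, 60 fps encode, 500 KB per frame); the function name and parameters are hypothetical and nothing here is measured.

```typescript
// Rough estimate of bytes that accumulate in the stdin buffer when total
// capture throughput exceeds encode throughput for `durationSeconds`.
function estimateBacklogBytes(
  workerCount: number,
  perWorkerFps: number,
  encodeFps: number,
  bytesPerFrame: number,
  durationSeconds: number,
): number {
  // Frames per second that the encoder cannot absorb (never negative).
  const surplusFps = Math.max(0, workerCount * perWorkerFps - encodeFps);
  return surplusFps * bytesPerFrame * durationSeconds;
}

// Reviewer's worst case: 3 workers, 1-hour window.
const worstCaseBytes = estimateBacklogBytes(3, 35, 60, 500_000, 3600);
const worstCaseGb = worstCaseBytes / 1e9; // 81, i.e. the "≈ 80 GB" quoted above
```

The same helper returns 0 for a single worker at these rates, since 35 fps of capture stays below the assumed 60 fps encode rate.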
Pick one:
- Keep a (more generous) cap for multi-worker. Halfway between current 240s and removing it. Suggest 1800s: long enough to unblock the real cases (the 548s example in the PR description) but short enough to bound the OOM-territory worst case.
- Implement real back-pressure in writeFrame. When accepted === false, return a Promise<void> that awaits the drain event before resolving. Then onFrameBuffer actually blocks workers when FFmpeg is behind. Real fix, slightly bigger blast radius (writeFrame signature change).
- Document the trade-off explicitly + rely on streamingTimeout as the OOM-prevention net. Note in the code comment that uncapped multi-worker streaming assumes encode ≥ total capture throughput, and that violating this risks OOM (mitigated by streamingTimeout SIGTERM). Lowest-effort but leaves the failure mode obscured.
Strong preference for (1) for this PR; (2) is the right long-term fix worth a follow-up.
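For reference, option (2) could look roughly like the sketch below. It is an illustration under assumptions, not the StreamingEncoder code: `StdinLike` is a hand-written subset of Node's Writable interface, and `writeFrameWithBackpressure`, `getExitStatus`, and the `ExitStatus` values are hypothetical names. It returns `Promise<boolean>` rather than `Promise<void>` so the caller can still see a dead encoder.

```typescript
// Minimal subset of Node's Writable that this sketch relies on.
type StdinEvent = "drain" | "finish" | "error";
interface StdinLike {
  write(chunk: Uint8Array): boolean;
  once(event: StdinEvent, listener: () => void): unknown;
  removeListener(event: StdinEvent, listener: () => void): unknown;
}

type ExitStatus = "running" | "exited" | "error"; // hypothetical status values

async function writeFrameWithBackpressure(
  stdin: StdinLike,
  frame: Uint8Array,
  getExitStatus: () => ExitStatus,
  resetTimer: () => void,
): Promise<boolean> {
  if (getExitStatus() !== "running") return false;

  const accepted = stdin.write(frame);
  if (!accepted) {
    // The chunk is queued, but the stream asked us to slow down:
    // block this caller until the buffer drains or the stream ends.
    await new Promise<void>((resolve) => {
      const settle = () => {
        stdin.removeListener("drain", settle);
        stdin.removeListener("finish", settle);
        stdin.removeListener("error", settle);
        resolve();
      };
      stdin.once("drain", settle);
      stdin.once("finish", settle);
      stdin.once("error", settle);
    });
    // The encoder may have died while we were waiting.
    if (getExitStatus() !== "running") return false;
  }

  // Assumed policy: the inactivity timer resets once the write has gone through.
  resetTimer();
  return true;
}
```

Because every caller awaits the returned promise, a slow FFmpeg stalls the capture workers instead of growing the write buffer, which is the property the duration cap only approximates.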
Minor — sharpen the in-code comment on shouldUseStreamingEncode
Current comment says "in-flight frame buffers are bounded by workerCount (not composition length)". A future reader could read this as the total pipeline being bounded, which it isn't. Suggest:
// Capture-side: workers serialize through FrameReorderBuffer, so at most
// `workerCount` captured frames are in flight at any moment. The encoder-side
// buffer (Node stdin → FFmpeg) is NOT explicitly bounded and relies on FFmpeg
// keeping up with `workerCount × per-worker-fps`. Long comps + slow encode
// can still grow Node's internal write buffer — see streamingEncodeMaxDurationSeconds
// for the single-worker safeguard.
Architectural correctness: verified
- distributeFramesInterleaved math: worker i → frames i, i+N, i+2N, … (where N=workerCount). Loop guard i < workerCount && i < totalFrames correctly handles the workerCount > totalFrames edge case (fewer workers spun up; each remaining worker still produces one frame).
- captureFrameRange stride: for (i = startFrame; i < endFrame; i += stride) with startFrame=i, endFrame=totalFrames, stride=N produces exactly the interleaved sequence.
- FrameReorderBuffer interaction: cursor advances by 1 per write; each worker awaits waitForFrame(i) before its write and calls advanceTo(i+1) after. With interleaved distribution, the next frame's owner (a different worker) is almost immediately ready, whereas with contiguous distribution worker 1 waits for all of worker 0's chunk before any of its writes can land. That's a real-deal correctness reason for the change.
- outputFrameOffset interaction: streaming path doesn't write per-worker disk files (no fileFrameIdx collision), and disk path keeps distributeFrames (contiguous), so mergeWorkerFrames is unaffected. ✓
- executeWorkerTask separate-browser gating (needsSeparateBrowsers) for multi-worker on Linux + headless-shell is unchanged. ✓
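To make the waitForFrame/advanceTo handshake concrete, here is a toy reorder buffer. It is a sketch only: the class name, the `pendingCount` getter, and the `writeInOrder` helper are hypothetical, and the real FrameReorderBuffer may well be implemented differently.

```typescript
// Toy reorder buffer: a writer waits until its frame index equals the cursor,
// performs its write, then advances the cursor to release the next writer.
class ReorderBufferSketch {
  private cursor = 0;
  private waiters = new Map<number, () => void>();

  waitForFrame(frame: number): Promise<void> {
    if (frame === this.cursor) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.set(frame, resolve);
    });
  }

  advanceTo(next: number): void {
    this.cursor = next;
    const wake = this.waiters.get(next);
    if (wake) {
      this.waiters.delete(next);
      wake();
    }
  }

  // Number of writers currently blocked on an earlier frame.
  get pendingCount(): number {
    return this.waiters.size;
  }
}

// How a worker would deliver one captured frame in timeline order.
async function writeInOrder(
  buffer: ReorderBufferSketch,
  frame: number,
  write: (frame: number) => void,
): Promise<void> {
  await buffer.waitForFrame(frame);
  write(frame);
  buffer.advanceTo(frame + 1);
}
```

Under interleaved distribution a blocked worker is released after at most workerCount - 1 other writes. Under contiguous distribution worker 1's first call to `waitForFrame` stays pending for the whole of worker 0's chunk.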
Tests — accurate but should add the edge case
The shouldUseStreamingEncode test changes faithfully describe the new gates. Missing:
- Unit test for distributeFramesInterleaved itself (assert worker i gets [i, i+N, i+2N, …], and the workerCount > totalFrames guard works). Pure function; trivial test; locks in the contract.
Why we don't do this already (your question, James)
The multi-worker streaming branch in captureStreamingStage was already implemented — it has the workerCount > 1 block with FrameReorderBuffer and executeParallelCapture(..., onFrameBuffer, ...) wired up. It was unreachable because:
- shouldUseStreamingEncode hardcoded return workerCount === 1.
- The only frame distribution available was distributeFrames (contiguous chunks). If you'd lifted the gate but kept contiguous distribution, the streaming path would collapse to effectively single-worker (worker 1 blocks on the reorder buffer until worker 0 finishes its whole chunk).
So the answer is: someone scaffolded multi-worker streaming, hit the contiguous-chunk pathology, gated the path off, and never wrote the interleaved distribution that would make it work. This PR is exactly the missing piece. Clean architectural win — modulo the encoder-backlog concern above.
Summary
REQUEST_CHANGES for the cap-removal concern (pick option 1 minimum; encoder-side buffer is not bounded by workerCount). All other findings are nits. Ship after.
Review by Jerrai
…n buffer growth Per reviewer feedback on heygen-com#1351: removing the duration cap entirely for multi-worker is unsafe. `StreamingEncoder.writeFrame` calls `ffmpeg.stdin.write(copy)` without awaiting drain when `accepted === false` — Node's writable-stream buffer grows without bound if FFmpeg encodes slower than workers capture. Worst case (3 workers, 1hr comp): ~80 GB buffer growth → OOM before the render finishes. Fix: apply a fixed 1800s cap for multi-worker (independent of the single-worker config value). 1800s is 3× the longest known practical composition (~548s), giving real-world headroom while bounding worst-case buffer growth to a tolerable level. Compositions >1800s fall back to the disk-capture path. Also tighten the `shouldUseStreamingEncode` comment: "in-flight frame buffers are bounded by workerCount" was misleadingly broad — that bound applies only to the capture→reorder side, not to the reorder→FFmpeg stdin buffer. The comment now calls out both sides explicitly. The durable fix (real back-pressure: await drain in `writeFrame` when `accepted === false`) is tracked as a follow-up issue.
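A sketch of the per-worker-count cap described in this commit, for illustration only. The constant name and its value come from the review thread. The input shape, the function name, and the inclusive `<=` comparison are assumptions, and the other gates (for example output format) are left out.

```typescript
// Fixed cap for multi-worker streaming, independent of the single-worker config value.
const MULTI_WORKER_MAX_DURATION_SECONDS = 1800;

// Hypothetical input shape for this sketch.
interface StreamingGateInput {
  workerCount: number;
  durationSeconds: number;
  // Stands in for the configured streamingEncodeMaxDurationSeconds.
  singleWorkerMaxDurationSeconds: number;
}

function shouldStreamSketch(input: StreamingGateInput): boolean {
  // Pick the cap that matches the worker count.
  const maxDuration =
    input.workerCount > 1
      ? MULTI_WORKER_MAX_DURATION_SECONDS
      : input.singleWorkerMaxDurationSeconds;
  // Assumption: a composition exactly at the cap still streams.
  return input.durationSeconds <= maxDuration;
}
```

With a 240s single-worker cap, a 548s composition streams with 2 workers but not with 1, and a 3600s composition falls back to disk capture at any worker count.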
Thanks @jrusso1020, the OOM analysis is correct and I should have caught it. Took option (1): latest commit caps multi-worker streaming at a fixed 1800s ( For the durable fix (option 2, real
…r's comment verbatim
- Add `describe("distributeFramesInterleaved")` in parallelCoordinator.test.ts
covering: stride assignment (worker i gets frames [i, i+N, i+2N, …]),
full-coverage invariant for various (total, workers) pairs, workerCount >
totalFrames guard, single-worker degenerate case, output-dir naming.
- Replace shouldUseStreamingEncode comment with the verbatim text from reviewer
@jrusso1020: cleanly distinguishes capture-side (bounded by workerCount via
FrameReorderBuffer) from encoder-side (NOT explicitly bounded; relies on
FFmpeg keeping up). References streamingEncodeMaxDurationSeconds as the
single-worker safeguard.
Added in the latest commit:
All three pieces from the review are addressed. Let me know if anything else needs adjusting.
jrusso1020
left a comment
All three findings cleanly addressed. Verified the final diff:
- MULTI_WORKER_MAX_DURATION_SECONDS = 1800 lands at renderOrchestrator.ts:1452; maxDuration ternary correctly picks per-worker-count cap.
- Comment now distinguishes capture→reorder bound (workerCount-bounded) from reorder→FFmpeg buffer (unbounded). Stale 720s draft from the first iteration was removed.
- distributeFramesInterleaved unit tests added: stride assignment, full-coverage invariant, workerCount > totalFrames guard, single-worker degenerate, output-dir naming.
- Follow-up issue #1353 filed for the durable await drain fix.
Ship it.
Review by Jerrai
Re-opening — my prior approval missed an interaction with the existing ffmpegStreamingTimeout. Holding the merge.
The concern
ffmpegStreamingTimeout (default 600_000ms = 10 min, at packages/engine/src/services/streamingEncoder.ts:437) is an inactivity timer that resets ONLY on ffmpeg.stdin.write === true (line 470). When FFmpeg falls behind and writes keep returning false (data buffering in Node), the timer doesn't reset → SIGTERM at the 10-min mark. The 1800s cap prevents indefinite buffer growth (OOM) but doesn't prevent the inactivity timer from firing mid-render and failing the encode.
Multi-worker capture has higher net capture-vs-encode delta than single-worker — FFmpeg encodes single-threaded; capture scales with N. So multi-worker is MORE likely to produce sustained false-writes, not less. The cap should arguably be tighter for multi-worker, not more generous.
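The timer interaction can be illustrated with a deterministic model. This is not the encoder's timer, which runs on wall-clock time. It is a fake-clock sketch with hypothetical names that replays a sequence of write results and resets only on accepted writes, matching the behaviour described above.

```typescript
// Fake-clock model of an inactivity timer.
class InactivityTimerModel {
  private idleMs = 0;
  private readonly timeoutMs: number;
  fired = false;

  constructor(timeoutMs: number) {
    this.timeoutMs = timeoutMs;
  }

  // Simulates `ms` of time passing without a reset.
  advance(ms: number): void {
    if (this.fired) return;
    this.idleMs += ms;
    if (this.idleMs >= this.timeoutMs) this.fired = true;
  }

  reset(): void {
    if (!this.fired) this.idleMs = 0;
  }
}

// Replays one write result per `intervalMs`; only accepted writes reset the timer.
function timerFiresDuring(results: boolean[], intervalMs: number, timeoutMs: number): boolean {
  const timer = new InactivityTimerModel(timeoutMs);
  for (const accepted of results) {
    timer.advance(intervalMs);
    if (accepted) timer.reset();
  }
  return timer.fired;
}
```

In this model, with one write per second and the 600_000 ms default, 600 consecutive rejected writes trip the timer even though every chunk was queued, while a run that mixes in accepted writes never does.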
Ask
Two paths, either works:
- Test with long renders before merging. A 1500s+ multi-worker render under realistic CPU contention (encode competing with capture) — does the inactivity timer fire? If it doesn't, the 1800s cap is defensible empirically. If it does, the failure mode is real and the cap needs tightening.
- Tighten the cap conservatively now. Something in the 120-300s range. Note that this does not cover the cited 548s longest comp (548s > 300s), so this option pushes that workload to disk-capture. If preserving the 548s case matters, do (1).
The durable fix is still the await drain back-pressure (the follow-up issue #1353 you filed). Once that lands, the inactivity timer can't fire from buffer-growth alone because workers actually block. Until then, the cap is the only safety net for both OOM AND timeout.
Happy to take (1) ourselves if it'd unblock you: we have the infrastructure to spin up a long multi-worker render locally and monitor the accepted=true/false ratio.
Review by Jerrai
Thanks for pulling the approval and sharing the We're evaluating two tracks in parallel:
Will report back within 24h on which path lands. Thanks for the
Makes writeFrame async so capture workers naturally throttle to FFmpeg's encode throughput instead of growing Node's stdin buffer unboundedly on long high-worker-count renders. Also ensures ffmpegStreamingTimeout resets promptly after a drain (next write returns true) rather than firing on extended back-to-back false writes from a slow-but-alive FFmpeg. Fixes the second review concern on PR heygen-com#1351 raised by @jrusso1020. Addresses issue heygen-com#1353. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The durable back-pressure fix from #1353 is now in this PR (commit
All 5 call sites updated to
jrusso1020
left a comment
R3 — APPROVE. The new await drain commit (a060e47a) is the right durable fix.
Verified end-to-end:
- writeFrame signature → Promise<boolean>. Both production callers in captureStreamingStage.ts (single-worker line 270, multi-worker line 202) correctly await. Test file updated to match.
- Drain wait listens on drain/finish/error with removeListener cleanup on all three, so no listener accumulation.
- if (exitStatus !== "running") return false; after the drain wait handles encoder-death-mid-wait cleanly.
- Inactivity timer behavior: only resets on accepted writes. During a long drain wait the timer ticks normally → SIGTERM still fires if FFmpeg is genuinely hung past ffmpegStreamingTimeout. That's the right shape: drain unblocks fast normally, SIGTERM is the safety net for true hangs.
- 1800s multi-worker cap kept as belt-and-suspenders. With real back-pressure in place, the cap is now redundant for OOM-prevention but still a useful guard against pathological renders. Fine to keep.
One real-world spot check: ran multi-worker streaming on a real composition (hyperframes-launch, 49.77s @ 60fps = 2987 frames, --workers 2, h264). Clean exit (exit=0), 186s wall, 31.3 MB output. Streaming-encode gate engaged correctly. No SIGTERM, no buffer-growth issues.
Ship it.
Review by Jerrai
miguel-heygen
left a comment
Review
Verdict: real fix for a real problem, needs additional test coverage before merge
What it does
Two changes bundled in one PR:
- Back-pressure in writeFrame (the #1353 fix): Changes writeFrame from sync boolean → async Promise<boolean> and awaits drain when ffmpeg.stdin.write() returns false. This is the correct fix: currently every caller ignores the sync return value (currentEncoder.writeFrame(buffer) with no await or return check), so Node's internal buffer grows without bound when FFmpeg encodes slower than workers capture. The math in #1353 is accurate: 3 workers × ~35fps surplus × ~500KB/frame → OOM on long renders.
- Interleaved frame distribution: Adds distributeFramesInterleaved where worker i captures frames i, i+N, i+2N, … instead of contiguous chunks. This keeps all workers in lockstep so the FrameReorderBuffer stays nearly uncontended. With contiguous chunks, worker 1 blocks at its first frame until worker 0 finishes its entire chunk, defeating multi-worker parallelism on the streaming path. Good optimization.
Code quality
- The drain/finish/error listener triple in the back-pressure await is correct: it handles all stdin termination cases without leaking listeners
- exitStatus re-check after drain await correctly handles the race where FFmpeg dies while we're waiting
- captureFrameRange stride loop is clean: for (let i = task.startFrame; i < task.endFrame; i += stride)
- All existing callers (captureStreamingStage, captureHdrHybridLoop, captureHdrSequentialLoop) are updated to await the now-async writeFrame
Test gaps
The tests cover interleaved distribution thoroughly (5 tests) and the shouldUseStreamingEncode gate changes. But the core back-pressure behavior has no dedicated test. This is the most critical change in the PR.
Missing tests that should be added:
- Back-pressure blocks caller until drain: mock ffmpeg.stdin.write() to return false, verify writeFrame() doesn't resolve until drain is emitted, then emit drain and verify it resolves with true.
- Back-pressure returns false on encoder death: mock write() → false, then set exitStatus = 'error' and emit drain. Verify writeFrame() resolves with false.
- Back-pressure cleans up listeners on finish/error: verify that if finish fires before drain, the promise resolves and no drain listener leaks.
- Heartbeat timer resets after drain: verify resetTimer() is called after the drain await, not just on accepted writes. The current code in main only resets on accepted === true, which means back-pressured writes never reset the timer → false timeout kills on slow-but-alive FFmpeg.
These can go in streamingEncoder.test.ts alongside the existing lifecycle tests.
Producer baseline tests
Not needed — the interleaved distribution delivers the same frames in the same order to FFmpeg (the reorder buffer ensures sequential ordering). The back-pressure only throttles write speed, not frame content. Visual output is identical.
Recommendation
The fix is correct and addresses a real OOM risk. Add the 4 back-pressure unit tests above before merging — this is a writeFrame signature change that touches every encode path, and the drain-await behavior is the part most likely to regress.
…drain path Four new tests covering the async drain behavior introduced in the previous commit: 1. blocks caller until drain is emitted (no spurious early resolve) 2. returns false when encoder dies while awaiting drain 3. cleans up all three listeners (drain/finish/error) when finish fires first 4. resets inactivity timer after drain — slow-but-alive FFmpeg never triggers SIGTERM Also clarifies the inline timer-reset comment to explicitly document the three-case policy: reset on accepted=true, no reset on immediate false, reset after drain. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Thanks @miguel-heygen, four back-pressure tests added in commit 17c7ac6. On test #4 (timer reset):
Tests added (
Ready for re-review.
Problem
Parallel renders (--workers N) currently disable streaming encode entirely. shouldUseStreamingEncode gates to workerCount === 1, so any multi-worker render falls back to disk capture: all frames written to disk, then a separate sequential FFmpeg encode.
This means:
- Short compositions with --workers 2 are ~8× slower than --workers 1 (measured: 1-worker streaming 83s vs 2-worker disk 681s for the same 49s composition).
The infrastructure for multi-worker streaming was already present: captureStreamingStage has a full workerCount > 1 branch using FrameReorderBuffer for ordered writes. It was simply unreachable because shouldUseStreamingEncode blocked it.
Root cause
The blocker was the frame-distribution strategy. With contiguous chunk distribution (worker 0 gets frames 0–5000, worker 1 gets 5001–10000), the FrameReorderBuffer blocks worker 1 at its first frame until worker 0 finishes its entire chunk, collapsing N parallel captures to effectively 1 for the streaming path.
Fix
1. distributeFramesInterleaved: interleaved round-robin assignment
Added to packages/engine/src/services/parallelCoordinator.ts. Worker i captures frames i, i+N, i+2N, … where N = workerCount. All workers advance in lockstep, so the FrameReorderBuffer finds the next cursor value ready (or nearly ready) on every write. Implemented via a stride?: number field on WorkerTask and a one-line loop change in captureFrameRange.
2. captureStreamingStage: use interleaved distribution for streaming
The existing workerCount > 1 branch now calls distributeFramesInterleaved instead of distributeFrames. The disk-capture path is unaffected; it keeps contiguous chunks for mergeWorkerFrames compatibility.
3. shouldUseStreamingEncode: lift the workerCount === 1 gate
Removed. Also refined the streamingEncodeMaxDurationSeconds guard: it only applies to workerCount === 1. With interleaved distribution, in-flight frame buffers are bounded by workerCount (not composition length), so the cap is unnecessary for multi-worker renders.
Test
Updated shouldUseStreamingEncode unit tests in renderOrchestrator.test.ts:
- "keeps png-sequence on the non-streaming path": unchanged behavior
- "enables streaming for multi-worker renders (parallel streaming)": new assertion, workerCount=2,4 → true
- "applies the duration cap to single-worker only; multi-worker streaming is uncapped": single-worker cap preserved; multi-worker works for comps >240s
Expected impact
--workers 1(streaming)--workers 2--workers 2