Skip to content

Commit 22363a1

Browse files
authored
perf(distributed): parallelize chunk capture across multiple workers (#906)
* perf(distributed): parallelize chunk capture across multiple workers The distributed `renderChunk` primitive hardcoded `workerCount: 1` and `captureStage` explicitly forbade `workerCount > 1` when `frameRange` was set, with the comment: "Distributed chunk workers fan out at the activity layer; reduce workerCount to 1 when passing frameRange." The assumption was that orchestration-layer fan-out (Temporal / Lambda / K8s Jobs / SSH) saturates the available CPU on its own. In practice adopters that deploy chunks onto multi-core hosts (8-24 vCPU is the standard producer-worker pod sizing) end up pinning only ~3-4 cores per chunk while the rest sit idle: chunk-level fan-out at the orchestration layer gives each pod one chunk at a time, and the chunk render itself was single-threaded. Validated against a real 1080p / 30fps / 22-second shader-heavy composition on a 22-vCPU Temporal pod: each chunk rendered at 165-273ms per frame (vs 94-98ms for the in-process streaming render which runs `workerCount=2` by default). The slowest chunk gates total wall-clock under parallel chunk fan-out, so the 2-3x per-frame gap compounds and `distributed` was net-slower than `in-process` on every composition smaller than ~5min of texture-class content. Lifting the restriction is a measured ~2x per-chunk speedup with no contract change at the framesDir or encoder layer. Wire-up: * `WorkerTask.outputFrameOffset` — optional offset subtracted from the absolute frame index when computing the captured file's name. Default 0 (the in-process contract; file name == absolute index). Distributed chunks set this to the chunk's startFrame so file names land 0-indexed within the chunk's range, matching the sequential chunk-capture contract and the encoder's expectation that frames are read sequentially without an `-start_number` override. * `distributeFrames(totalFrames, workerCount, workDir, rangeStart=0)` — offsets both `startFrame`/`endFrame` (used for per-frame time math on the page's virtual clock) by `rangeStart`, and threads `outputFrameOffset = rangeStart` onto each task it emits. With the default `rangeStart=0` it is a no-op for in-process renders. * `executeWorkerTask` — uses `i - (task.outputFrameOffset ?? 0)` for the captured file name, leaving the per-frame TIME computation `(i * fps.den) / fps.num` untouched so the page's virtual clock is unchanged. * `executeDiskCaptureWithAdaptiveRetry({ frameRangeStart? })` — accepts the chunk's absolute startFrame and forwards it to `distributeFrames` and `buildMissingFrameRetryBatches`. Default `undefined` preserves the in-process contract. * `buildMissingFrameRetryBatches(ranges, ..., rangeStart=0)` — `findMissingFrameRanges` walks LOCAL 0-indexed file names; the retry batch translates the local missing-range pair back to ABSOLUTE composition indices for `WorkerTask.startFrame/endFrame` and sets `outputFrameOffset = rangeStart` so the retried capture writes back to the same local file name. * `captureStage` — drops the assert; passes `frameRangeStart: frameRange?.startFrame` to the parallel branch so workers land on absolute composition frame indices for time math while file names stay 0-indexed within the chunk range. Docstring updated to reflect that the parallel branch is now supported. * `renderChunk` — `workerCount: 1` → `workerCount: 2`. The pre-warmed `probeSession` is consumed only by the sequential branch; the parallel branch closes it during stage entry and creates its own worker sessions. Documented as a follow-up: skip probeSession creation when `workerCount > 1` to recover the ~3-5s warmup cost. Backwards compatibility: every change is gated on a parameter that defaults to the prior behavior. In-process callers (`executeRenderJob`) pass no `frameRangeStart`, so `rangeStart === 0`, `outputFrameOffset` defaults to 0, and the file-name math collapses to the prior `i` value. The framesDir contract (`frame_0..frame_(totalFrames-1)`) and the WorkerTask interface are extended, not replaced. Tests: 24 pass / 0 fail across the distributed test suite (renderChunk, plan, assemble, planFormatBanlist, planSizeCap, publicExports). 7 pass / 0 fail in `parallelCoordinator.test.ts`. The renderOrchestrator suite has one pre-existing Windows-only failure (`writeCompiledArtifacts — external assets on Windows drive-letter paths`) unrelated to this change; the other 56 tests pass. Refs: distributed-vs-inprocess benchmark thread at heygen-com/experiment-framework#36950 * perf(distributed): auto-size chunk workerCount via calculateOptimalWorkers Match the in-process renderer's worker selection instead of hardcoding 2. `calculateOptimalWorkers(framesInChunk, undefined, cfg)` is the same call `resolveRenderWorkerCount` makes under the hood, minus the capture-cost calibration reduction (which would require plumbing the chunk's compiled metadata through — left as a follow-up). For a typical 22-vCPU producer-worker pod with `cfg.concurrency: "auto"` this resolves to ~6 workers for a 240-frame chunk (capped by `defaultSafeMaxWorkers() = max(6, min(16, floor(cpuCount/8)))`), matching what `executeRenderJob` (the in-process path) already does. The prior hardcoded `workerCount: 2` was a safe-minimum starting point that undersized chunks vs prod's auto behavior. Tests: 12/12 pass in `renderChunk.test.ts` (unchanged — the test suite mocks the inner runCaptureStage call so workerCount selection is opaque to it). * refactor(distributed): /simplify pass on PR #906 Review pass on the parallel-capture frame-range change. Four targeted cleanups identified by code-quality and efficiency review agents: 1. Add the missing `frameRange.endFrame - frameRange.startFrame === totalFrames` assert. The parallel branch forwards `totalFrames` separately from `frameRangeStart`; a caller passing mismatched values would have got a silently wrong distribution. The sequential branch already implicitly relied on this via its `rangeFrames = rangeEnd - rangeStart` arithmetic. 2. Collapse three near-duplicate docstrings (on `WorkerTask.outputFrameOffset`, `executeDiskCaptureWithAdaptiveRetry.frameRangeStart`, and `runCaptureStage`'s `frameRange`) so only the WorkerTask field carries the full contract. The other two cross-reference it. 3. Drop the WHAT-narrating comments inside `executeWorkerTask`'s per-frame loop. The variable names (`fileFrameIdx = i - outputOffset`) already say what the line does; the only remaining comment flags the non-obvious contract that the streaming callback gets the absolute index. 4. Trim the 30-line `chunkWorkerCount` block in `renderChunk` to one paragraph explaining the one non-obvious thing (why we use `calculateOptimalWorkers` directly instead of `resolveRenderWorkerCount`). The probeSession-wasted-on- parallel acknowledgement stays as a 3-line follow-up flag — investigated skipping it in this pass, but the SwiftShader probe is safety-critical and has no per-worker equivalent, so deferred to a separate change with proper per-worker assertion plumbing. Tests + format + lint clean: * `bun test parallelCoordinator.test.ts` — 7/7 * `bun test distributed/{renderChunk,plan}.test.ts` — 24/24 * `bunx oxfmt` + `bunx oxlint` — clean
1 parent aa58ebc commit 22363a1

4 files changed

Lines changed: 81 additions & 40 deletions

File tree

packages/engine/src/services/parallelCoordinator.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ export interface WorkerTask {
2929
startFrame: number;
3030
endFrame: number;
3131
outputDir: string;
32+
/**
33+
* Offset subtracted from the absolute frame index when naming the captured
34+
* file (`frame_<i - outputFrameOffset>.{ext}`). Default 0. Distributed
35+
* chunks set this to the chunk's absolute startFrame so file names land
36+
* 0-indexed within the chunk's range — the encoder reads frames
37+
* sequentially without an `-start_number` override. The per-frame TIME
38+
* calculation still uses the absolute frame index.
39+
*/
40+
outputFrameOffset?: number;
3241
}
3342

3443
export interface WorkerResult {
@@ -148,20 +157,22 @@ export function distributeFrames(
148157
totalFrames: number,
149158
workerCount: number,
150159
workDir: string,
160+
rangeStart: number = 0,
151161
): WorkerTask[] {
152162
const tasks: WorkerTask[] = [];
153163
const framesPerWorker = Math.ceil(totalFrames / workerCount);
154164

155165
for (let i = 0; i < workerCount; i++) {
156-
const startFrame = i * framesPerWorker;
157-
const endFrame = Math.min((i + 1) * framesPerWorker, totalFrames);
158-
if (startFrame >= totalFrames) break;
166+
const startFrame = rangeStart + i * framesPerWorker;
167+
const endFrame = Math.min(rangeStart + (i + 1) * framesPerWorker, rangeStart + totalFrames);
168+
if (startFrame >= rangeStart + totalFrames) break;
159169

160170
tasks.push({
161171
workerId: i,
162172
startFrame,
163173
endFrame,
164174
outputDir: join(workDir, `worker-${i}`),
175+
outputFrameOffset: rangeStart,
165176
});
166177
}
167178

@@ -196,6 +207,7 @@ async function executeWorkerTask(
196207
);
197208
await initializeSession(session);
198209

210+
const outputOffset = task.outputFrameOffset ?? 0;
199211
for (let i = task.startFrame; i < task.endFrame; i++) {
200212
if (signal?.aborted) {
201213
throw new Error("Parallel worker cancelled");
@@ -204,14 +216,16 @@ async function executeWorkerTask(
204216
// frame-index → time math. The 1-in-1001 ULP loss for NTSC is invisible
205217
// at our scales (frame count tops out at single-digit thousands).
206218
const time = (i * captureOptions.fps.den) / captureOptions.fps.num;
219+
const fileFrameIdx = i - outputOffset;
207220

208221
if (onFrameBuffer) {
209-
// Streaming mode: capture to buffer and invoke callback
210-
const { buffer } = await captureFrameToBuffer(session, i, time);
222+
// The streaming-encode callback receives the absolute index `i`
223+
// (not `fileFrameIdx`) so the encoder sequences frames against the
224+
// composition's timeline.
225+
const { buffer } = await captureFrameToBuffer(session, fileFrameIdx, time);
211226
await onFrameBuffer(i, buffer);
212227
} else {
213-
// Disk mode: capture to file
214-
await captureFrame(session, i, time);
228+
await captureFrame(session, fileFrameIdx, time);
215229
}
216230
framesCaptured++;
217231

packages/producer/src/services/distributed/renderChunk.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
assertSwiftShader,
4343
type BeforeCaptureHook,
4444
BROWSER_GPU_NOT_SOFTWARE,
45+
calculateOptimalWorkers,
4546
type CaptureOptions,
4647
type CaptureSession,
4748
closeCaptureSession,
@@ -531,7 +532,13 @@ export async function renderChunk(
531532
// would deadlock Chrome's compositor by issuing a second beginFrame
532533
// at a `frameTimeTicks` it had just advanced to.
533534

534-
// ── Capture the chunk's range via runCaptureStage ──
535+
// Capture-cost calibration based on shader transitions /
536+
// renderModeHints is not threaded through to chunks yet; the in-process
537+
// renderer's `resolveRenderWorkerCount` wraps this with that reduction,
538+
// but `PlanJson` doesn't carry the compiled hints needed to call it
539+
// directly. The existing adaptive-retry path reduces workers if
540+
// compositor contention surfaces as CDP timeouts.
541+
const chunkWorkerCount = calculateOptimalWorkers(framesInChunk, undefined, cfg);
535542
await runCaptureStage({
536543
fileServer,
537544
workDir,
@@ -541,11 +548,10 @@ export async function renderChunk(
541548
cfg,
542549
forceScreenshot: encoder.forceScreenshot,
543550
log,
544-
workerCount: 1,
545-
// Pass the pre-warmed session through as `probeSession` so captureStage
546-
// reuses it via `prepareCaptureSessionForReuse` instead of spinning up
547-
// a fresh browser. The stage closes the session in its `finally`,
548-
// so we MUST clear our own reference here to avoid a double-close.
551+
workerCount: chunkWorkerCount,
552+
// The parallel branch closes this session and spins up its own
553+
// worker sessions, wasting the ~3-5s of pre-warmed setup. Worth a
554+
// follow-up to skip pre-warmup when the resolved workerCount > 1.
549555
probeSession: session,
550556
needsAlpha: plan.dimensions.format !== "mp4",
551557
captureAttempts: [],

packages/producer/src/services/render/stages/captureStage.ts

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -96,23 +96,15 @@ export interface CaptureStageInput {
9696
onProgress?: ProgressCallback;
9797
/**
9898
* Capture a sub-range `[startFrame, endFrame)` of the composition's
99-
* timeline. Used by distributed `renderChunk` workers to render only
100-
* their assigned chunk. Captured frames are written with file names
101-
* normalized to start at zero (`frame_000000.{ext}`) so the encoder
102-
* doesn't need an `-start_number` override; per-frame TIMES still
103-
* reflect the absolute frame index via `(absIdx * fps.den) / fps.num`,
104-
* keeping the page's virtual clock identical to what an in-process
105-
* render at that frame would see.
99+
* timeline. Used by distributed `renderChunk` to render only its chunk.
100+
* Captured file names are 0-indexed within the range; per-frame TIMES use
101+
* the absolute frame index so the page's virtual clock matches an
102+
* in-process render at that frame. Supported on both the sequential and
103+
* parallel branches; the parallel branch threads `frameRange.startFrame`
104+
* through as `frameRangeStart`. See `WorkerTask.outputFrameOffset`.
106105
*
107-
* Only honored on the sequential capture branch (workerCount === 1).
108-
* The parallel branch in this stage targets in-process renders where
109-
* adaptive retry across the whole timeline is the contract, and chunk
110-
* workers fan out at the activity layer instead. Passing `frameRange`
111-
* with `workerCount > 1` throws — the caller should reduce
112-
* `workerCount` to 1.
113-
*
114-
* Default `undefined`: the stage captures `[0, totalFrames)` (the
115-
* in-process contract).
106+
* Default `undefined`: capture `[0, totalFrames)` (in-process contract).
107+
* When set, `endFrame - startFrame` MUST equal `totalFrames`.
116108
*/
117109
frameRange?: { startFrame: number; endFrame: number };
118110
}
@@ -155,12 +147,6 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
155147
const captureCfg: EngineConfig =
156148
cfg.forceScreenshot === forceScreenshot ? cfg : { ...cfg, forceScreenshot };
157149

158-
if (frameRange !== undefined && workerCount > 1) {
159-
throw new Error(
160-
`[captureStage] frameRange capture requires workerCount === 1 (received workerCount=${workerCount}). ` +
161-
`Distributed chunk workers fan out at the activity layer; reduce workerCount to 1 when passing frameRange.`,
162-
);
163-
}
164150
if (frameRange !== undefined) {
165151
if (
166152
!Number.isFinite(frameRange.startFrame) ||
@@ -173,10 +159,24 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
173159
`Expected non-negative startFrame strictly less than endFrame.`,
174160
);
175161
}
162+
// The parallel branch passes `totalFrames` to executeDiskCaptureWithAdaptiveRetry
163+
// (which drives `distributeFrames` partitioning and `findMissingFrameRanges`
164+
// completion checks) AND `frameRangeStart` separately. They must describe the
165+
// same window: callers passing `totalFrames=100, frameRange={50, 200}` would
166+
// get a silently wrong distribution.
167+
const rangeFrames = frameRange.endFrame - frameRange.startFrame;
168+
if (rangeFrames !== totalFrames) {
169+
throw new Error(
170+
`[captureStage] frameRange size (${rangeFrames}) must equal totalFrames (${totalFrames}). ` +
171+
`Received frameRange=${JSON.stringify(frameRange)}.`,
172+
);
173+
}
176174
}
177175

178176
if (workerCount > 1) {
179-
// Parallel capture
177+
// Parallel capture. When `frameRange` is set (distributed chunk), pass
178+
// `frameRangeStart` so workers land on absolute composition frame indices
179+
// for time math while file names stay 0-indexed within the chunk range.
180180
const attempts = await executeDiskCaptureWithAdaptiveRetry({
181181
serverUrl: fileServer.url,
182182
workDir,
@@ -188,6 +188,7 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
188188
captureOptions: buildCaptureOptions(),
189189
createBeforeCaptureHook: createRenderVideoFrameInjector,
190190
abortSignal,
191+
frameRangeStart: frameRange?.startFrame,
191192
onProgress: (progress) => {
192193
job.framesRendered = progress.capturedFrames;
193194
const frameProgress = progress.capturedFrames / progress.totalFrames;

packages/producer/src/services/renderOrchestrator.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -536,17 +536,24 @@ export function buildMissingFrameRetryBatches(
536536
maxWorkers: number,
537537
workDir: string,
538538
attempt: number,
539+
rangeStart: number = 0,
539540
): WorkerTask[][] {
540541
const workersPerBatch = Math.max(1, Math.floor(maxWorkers));
541542
const batches: WorkerTask[][] = [];
542543

544+
// `ranges` are 0-indexed within the chunk's frame range (or full timeline
545+
// when `rangeStart === 0`); translate to absolute composition indices so
546+
// `WorkerTask`'s per-frame time math lands on the page's actual virtual
547+
// clock, and propagate `outputFrameOffset` so the retry captures back at
548+
// the same local file name `findMissingFrameRanges` was looking for.
543549
for (let i = 0; i < ranges.length; i += workersPerBatch) {
544550
const batchIndex = batches.length;
545551
const batch = ranges.slice(i, i + workersPerBatch).map((range, workerId) => ({
546552
workerId,
547-
startFrame: range.startFrame,
548-
endFrame: range.endFrame,
553+
startFrame: rangeStart + range.startFrame,
554+
endFrame: rangeStart + range.endFrame,
549555
outputDir: join(workDir, `retry-${attempt}-batch-${batchIndex}-worker-${workerId}`),
556+
outputFrameOffset: rangeStart,
550557
}));
551558
batches.push(batch);
552559
}
@@ -605,11 +612,18 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: {
605612
onProgress?: (progress: ParallelProgress) => void;
606613
cfg: EngineConfig;
607614
log: ProducerLogger;
615+
/**
616+
* Forwarded to each `WorkerTask`'s `outputFrameOffset` and to the
617+
* `buildMissingFrameRetryBatches` translation. Default 0 (in-process
618+
* contract: `[0, totalFrames)`). See `WorkerTask.outputFrameOffset`.
619+
*/
620+
frameRangeStart?: number;
608621
}): Promise<CaptureAttemptSummary[]> {
609622
const attempts: CaptureAttemptSummary[] = [];
610623
let currentWorkers = options.initialWorkerCount;
611624
let missingRanges: FrameRange[] | null = null;
612625
let attempt = 0;
626+
const rangeStart = options.frameRangeStart ?? 0;
613627

614628
while (true) {
615629
const frameCount = missingRanges ? countFrameRanges(missingRanges) : options.totalFrames;
@@ -622,8 +636,14 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: {
622636

623637
const attemptWorkDir = join(options.workDir, `capture-attempt-${attempt}`);
624638
const batches = missingRanges
625-
? buildMissingFrameRetryBatches(missingRanges, currentWorkers, attemptWorkDir, attempt)
626-
: [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir)];
639+
? buildMissingFrameRetryBatches(
640+
missingRanges,
641+
currentWorkers,
642+
attemptWorkDir,
643+
attempt,
644+
rangeStart,
645+
)
646+
: [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir, rangeStart)];
627647

628648
try {
629649
for (const tasks of batches) {

0 commit comments

Comments
 (0)