Skip to content

Commit 440ad2b

Browse files
committed
Add resource-aware job throttling & health info
Expose resource and job state in the health route (activeVideoProcesses, canAcceptNewVideoProcess, resources). Adjust FFmpeg subprocess handling to catch spawn errors and decrement active process tracking on failure. Update tests to mock the new job-manager API (getMaxConcurrentVideoProcesses, getSystemResources) and remove mocks for the removed increment/decrement functions.
1 parent d70f28c commit 440ad2b

File tree

4 files changed

+131
-48
lines changed

4 files changed

+131
-48
lines changed

apps/media-server/src/__tests__/routes/video.test.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -311,11 +311,11 @@ describe("POST /video/process", () => {
311311
mock.module("../../lib/job-manager", () => ({
312312
canAcceptNewVideoProcess: () => false,
313313
getActiveVideoProcessCount: jobManager.getActiveVideoProcessCount,
314+
getMaxConcurrentVideoProcesses: jobManager.getMaxConcurrentVideoProcesses,
315+
getSystemResources: jobManager.getSystemResources,
314316
getAllJobs: jobManager.getAllJobs,
315317
generateJobId: jobManager.generateJobId,
316318
createJob: jobManager.createJob,
317-
incrementActiveVideoProcesses: jobManager.incrementActiveVideoProcesses,
318-
decrementActiveVideoProcesses: jobManager.decrementActiveVideoProcesses,
319319
getJob: jobManager.getJob,
320320
updateJob: jobManager.updateJob,
321321
deleteJob: jobManager.deleteJob,
@@ -347,6 +347,8 @@ describe("POST /video/process", () => {
347347
mock.module("../../lib/job-manager", () => ({
348348
canAcceptNewVideoProcess: () => true,
349349
getActiveVideoProcessCount: () => 0,
350+
getMaxConcurrentVideoProcesses: () => 3,
351+
getSystemResources: jobManager.getSystemResources,
350352
getAllJobs: () => [],
351353
generateJobId: () => "test-job-id",
352354
createJob: () => ({
@@ -358,8 +360,6 @@ describe("POST /video/process", () => {
358360
createdAt: new Date(),
359361
updatedAt: new Date(),
360362
}),
361-
incrementActiveVideoProcesses: () => {},
362-
decrementActiveVideoProcesses: () => {},
363363
getJob: () => null,
364364
updateJob: () => null,
365365
deleteJob: () => {},
@@ -410,11 +410,11 @@ describe("GET /video/process/:jobId/status", () => {
410410
mock.module("../../lib/job-manager", () => ({
411411
canAcceptNewVideoProcess: () => true,
412412
getActiveVideoProcessCount: () => 0,
413+
getMaxConcurrentVideoProcesses: () => 3,
414+
getSystemResources: jobManager.getSystemResources,
413415
getAllJobs: () => [],
414416
generateJobId: () => "test-job-id",
415417
createJob: jobManager.createJob,
416-
incrementActiveVideoProcesses: () => {},
417-
decrementActiveVideoProcesses: () => {},
418418
getJob: () => ({
419419
jobId: "test-job-id",
420420
videoId: "test-video",
@@ -459,11 +459,11 @@ describe("POST /video/process/:jobId/cancel", () => {
459459
mock.module("../../lib/job-manager", () => ({
460460
canAcceptNewVideoProcess: () => true,
461461
getActiveVideoProcessCount: () => 0,
462+
getMaxConcurrentVideoProcesses: () => 3,
463+
getSystemResources: jobManager.getSystemResources,
462464
getAllJobs: () => [],
463465
generateJobId: () => "test-job-id",
464466
createJob: jobManager.createJob,
465-
incrementActiveVideoProcesses: () => {},
466-
decrementActiveVideoProcesses: () => {},
467467
getJob: () => null,
468468
updateJob: () => null,
469469
deleteJob: () => {},
@@ -488,11 +488,11 @@ describe("POST /video/process/:jobId/cancel", () => {
488488
mock.module("../../lib/job-manager", () => ({
489489
canAcceptNewVideoProcess: () => true,
490490
getActiveVideoProcessCount: () => 0,
491+
getMaxConcurrentVideoProcesses: () => 3,
492+
getSystemResources: jobManager.getSystemResources,
491493
getAllJobs: () => [],
492494
generateJobId: () => "test-job-id",
493495
createJob: jobManager.createJob,
494-
incrementActiveVideoProcesses: () => {},
495-
decrementActiveVideoProcesses: () => {},
496496
getJob: () => ({
497497
jobId: "test-job-id",
498498
videoId: "test-video",
@@ -523,17 +523,16 @@ describe("POST /video/process/:jobId/cancel", () => {
523523
});
524524

525525
test("successfully cancels running job", async () => {
526-
const abortController = new AbortController();
527526
let abortCalled = false;
528527

529528
mock.module("../../lib/job-manager", () => ({
530529
canAcceptNewVideoProcess: () => true,
531530
getActiveVideoProcessCount: () => 0,
531+
getMaxConcurrentVideoProcesses: () => 3,
532+
getSystemResources: jobManager.getSystemResources,
532533
getAllJobs: () => [],
533534
generateJobId: () => "test-job-id",
534535
createJob: jobManager.createJob,
535-
incrementActiveVideoProcesses: () => {},
536-
decrementActiveVideoProcesses: () => {},
537536
getJob: () => ({
538537
jobId: "test-job-id",
539538
videoId: "test-video",

apps/media-server/src/lib/ffmpeg.ts

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -255,27 +255,33 @@ export function extractAudioStream(
255255
const opts = { ...DEFAULT_OPTIONS, ...options };
256256
const timeout = options.timeoutMs ?? EXTRACT_TIMEOUT_MS;
257257

258-
const ffmpegArgs = [
259-
"ffmpeg",
260-
"-i",
261-
videoUrl,
262-
"-vn",
263-
"-acodec",
264-
opts.codec,
265-
"-b:a",
266-
opts.bitrate,
267-
"-f",
268-
"mp3",
269-
"pipe:1",
270-
];
271-
272-
const proc = registerSubprocess(
273-
spawn({
274-
cmd: ffmpegArgs,
275-
stdout: "pipe",
276-
stderr: "pipe",
277-
}),
278-
);
258+
let proc: Subprocess;
259+
try {
260+
const ffmpegArgs = [
261+
"ffmpeg",
262+
"-i",
263+
videoUrl,
264+
"-vn",
265+
"-acodec",
266+
opts.codec,
267+
"-b:a",
268+
opts.bitrate,
269+
"-f",
270+
"mp3",
271+
"pipe:1",
272+
];
273+
274+
proc = registerSubprocess(
275+
spawn({
276+
cmd: ffmpegArgs,
277+
stdout: "pipe",
278+
stderr: "pipe",
279+
}),
280+
);
281+
} catch (err) {
282+
activeProcesses--;
283+
throw err;
284+
}
279285

280286
let timeoutId: ReturnType<typeof setTimeout> | undefined;
281287
let cleaned = false;

apps/media-server/src/lib/job-manager.ts

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os from "node:os";
12
import type { Subprocess } from "bun";
23
import type { TempFileHandle } from "./temp-files";
34

@@ -58,33 +59,94 @@ export interface Job {
5859
const jobs = new Map<string, Job>();
5960
const JOB_TTL_MS = 60 * 60 * 1000;
6061

61-
let activeVideoProcesses = 0;
62-
const maxConcurrentVideoProcesses =
62+
const configuredMaxProcesses =
6363
Number.parseInt(
64-
process.env.MEDIA_SERVER_MAX_CONCURRENT_VIDEO_PROCESSES ?? "3",
64+
process.env.MEDIA_SERVER_MAX_CONCURRENT_VIDEO_PROCESSES ?? "0",
6565
10,
66-
) || 3;
66+
) || 0;
67+
68+
const cpuCount = os.cpus().length;
69+
const totalMemoryMB = os.totalmem() / (1024 * 1024);
70+
71+
const CPU_LOAD_THRESHOLD = 0.8;
72+
const MEMORY_FREE_THRESHOLD = 0.15;
73+
74+
function isActivePhase(phase: JobPhase): boolean {
75+
return phase !== "complete" && phase !== "error" && phase !== "cancelled";
76+
}
6777

6878
export function getActiveVideoProcessCount(): number {
69-
return activeVideoProcesses;
79+
let count = 0;
80+
for (const job of jobs.values()) {
81+
if (isActivePhase(job.phase)) {
82+
count++;
83+
}
84+
}
85+
return count;
7086
}
7187

7288
export function getMaxConcurrentVideoProcesses(): number {
73-
return maxConcurrentVideoProcesses;
89+
if (configuredMaxProcesses > 0) {
90+
return configuredMaxProcesses;
91+
}
92+
return Math.max(1, Math.floor(cpuCount / 2));
7493
}
7594

76-
export function canAcceptNewVideoProcess(): boolean {
77-
return activeVideoProcesses < maxConcurrentVideoProcesses;
95+
export interface SystemResources {
96+
cpuCount: number;
97+
loadAvg1m: number;
98+
cpuPressure: number;
99+
totalMemoryMB: number;
100+
freeMemoryMB: number;
101+
memoryUsagePercent: number;
102+
configuredMax: number;
103+
effectiveMax: number;
104+
throttleReason: string | null;
78105
}
79106

80-
export function incrementActiveVideoProcesses(): void {
81-
activeVideoProcesses++;
82-
}
107+
export function getSystemResources(): SystemResources {
108+
const loadAvg1m = os.loadavg()[0];
109+
const freeMemoryMB = os.freemem() / (1024 * 1024);
110+
const cpuPressure = loadAvg1m / cpuCount;
111+
const memoryUsagePercent = 1 - freeMemoryMB / totalMemoryMB;
112+
const max = getMaxConcurrentVideoProcesses();
113+
114+
let effectiveMax = max;
115+
let throttleReason: string | null = null;
83116

84-
export function decrementActiveVideoProcesses(): void {
85-
if (activeVideoProcesses > 0) {
86-
activeVideoProcesses--;
117+
if (cpuPressure > CPU_LOAD_THRESHOLD) {
118+
effectiveMax = Math.max(
119+
1,
120+
Math.floor(max * (1 - (cpuPressure - CPU_LOAD_THRESHOLD))),
121+
);
122+
throttleReason = `CPU load ${cpuPressure.toFixed(2)} exceeds ${CPU_LOAD_THRESHOLD} threshold`;
87123
}
124+
125+
if (memoryUsagePercent > 1 - MEMORY_FREE_THRESHOLD) {
126+
const memMax = Math.max(1, Math.floor(max * (1 - memoryUsagePercent)));
127+
if (memMax < effectiveMax) {
128+
effectiveMax = memMax;
129+
throttleReason = `Memory usage ${(memoryUsagePercent * 100).toFixed(0)}% exceeds ${((1 - MEMORY_FREE_THRESHOLD) * 100).toFixed(0)}% threshold`;
130+
}
131+
}
132+
133+
return {
134+
cpuCount,
135+
loadAvg1m,
136+
cpuPressure,
137+
totalMemoryMB: Math.round(totalMemoryMB),
138+
freeMemoryMB: Math.round(freeMemoryMB),
139+
memoryUsagePercent,
140+
configuredMax: configuredMaxProcesses,
141+
effectiveMax,
142+
throttleReason,
143+
};
144+
}
145+
146+
export function canAcceptNewVideoProcess(): boolean {
147+
const active = getActiveVideoProcessCount();
148+
const resources = getSystemResources();
149+
return active < resources.effectiveMax;
88150
}
89151

90152
export function generateJobId(): string {
@@ -186,6 +248,12 @@ export function cleanupExpiredJobs(): number {
186248

187249
for (const [jobId, job] of jobs) {
188250
if (now - job.updatedAt > JOB_TTL_MS) {
251+
if (isActivePhase(job.phase)) {
252+
console.warn(
253+
`[job-manager] Cleaning up stuck job ${jobId} (phase=${job.phase}, age=${Math.round((now - job.createdAt) / 60000)}m)`,
254+
);
255+
job.abortController?.abort();
256+
}
189257
deleteJob(jobId);
190258
cleaned++;
191259
}

apps/media-server/src/routes/health.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { spawn } from "bun";
22
import { Hono } from "hono";
3+
import {
4+
canAcceptNewVideoProcess,
5+
getActiveVideoProcessCount,
6+
getSystemResources,
7+
} from "../lib/job-manager";
38

49
const health = new Hono();
510

@@ -26,12 +31,17 @@ health.get("/", async (c) => {
2631
}
2732
} catch {}
2833

34+
const resources = getSystemResources();
35+
2936
return c.json({
3037
status: ffmpegAvailable ? "ok" : "degraded",
3138
ffmpeg: {
3239
available: ffmpegAvailable,
3340
version: ffmpegVersion,
3441
},
42+
activeVideoProcesses: getActiveVideoProcessCount(),
43+
canAcceptNewVideoProcess: canAcceptNewVideoProcess(),
44+
resources,
3545
});
3646
});
3747

0 commit comments

Comments
 (0)