Skip to content

Commit 4e58ed4

Browse files
committed
Manage subprocesses, add retries and shutdown hooks
1 parent 1d89ada commit 4e58ed4

File tree

9 files changed

+396
-180
lines changed

9 files changed

+396
-180
lines changed

apps/media-server/src/index.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,34 @@
11
import app from "./app";
2+
import { abortAllJobs } from "./lib/job-manager";
3+
import { terminateAllSubprocesses } from "./lib/subprocess";
24

35
const port = Number(process.env.PORT) || 3456;
46

57
console.log(`[media-server] Starting on port ${port}`);
68

7-
const shutdown = () => {
9+
let shuttingDown = false;
10+
11+
const shutdown = async () => {
12+
if (shuttingDown) return;
13+
shuttingDown = true;
814
console.log("[media-server] Shutting down...");
15+
const abortedJobs = abortAllJobs();
16+
if (abortedJobs > 0) {
17+
console.log(`[media-server] Aborted ${abortedJobs} active jobs`);
18+
}
19+
await terminateAllSubprocesses();
920
process.exit(0);
1021
};
1122

12-
process.on("SIGINT", shutdown);
13-
process.on("SIGTERM", shutdown);
14-
process.on("SIGHUP", shutdown);
23+
process.on("SIGINT", () => {
24+
void shutdown();
25+
});
26+
process.on("SIGTERM", () => {
27+
void shutdown();
28+
});
29+
process.on("SIGHUP", () => {
30+
void shutdown();
31+
});
1532

1633
export default {
1734
port,

apps/media-server/src/lib/ffmpeg-video.ts

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { file, type Subprocess, spawn } from "bun";
22
import type { VideoMetadata } from "./job-manager";
3+
import { registerSubprocess, terminateProcess } from "./subprocess";
34
import { createTempFile, type TempFileHandle } from "./temp-files";
45

56
const PROCESS_TIMEOUT_MS = 45 * 60 * 1000;
67
const PROCESS_TIMEOUT_PER_SECOND_MS = 20_000;
78
const MAX_PROCESS_TIMEOUT_MS = 2 * 60 * 60 * 1000;
8-
const PROCESS_EXIT_WAIT_MS = 5_000;
99
const THUMBNAIL_TIMEOUT_MS = 60_000;
1010
const DOWNLOAD_TIMEOUT_MS = 10 * 60 * 1000;
1111
const UPLOAD_TIMEOUT_MS = 10 * 60 * 1000;
@@ -66,32 +66,6 @@ export function normalizeVideoInputExtension(
6666
: (`.${normalized}` as `.${string}`);
6767
}
6868

69-
function killProcess(proc: Subprocess): void {
70-
try {
71-
proc.kill();
72-
} catch {}
73-
}
74-
75-
async function waitForProcessExit(
76-
proc: Pick<Subprocess, "exited">,
77-
timeoutMs = PROCESS_EXIT_WAIT_MS,
78-
): Promise<void> {
79-
await Promise.race([
80-
proc.exited.then(
81-
() => undefined,
82-
() => undefined,
83-
),
84-
new Promise<void>((resolve) => {
85-
setTimeout(resolve, timeoutMs);
86-
}),
87-
]);
88-
}
89-
90-
async function terminateProcess(proc: Subprocess): Promise<void> {
91-
killProcess(proc);
92-
await waitForProcessExit(proc);
93-
}
94-
9569
export async function withTimeout<T>(
9670
promise: Promise<T>,
9771
timeoutMs: number,
@@ -144,11 +118,18 @@ async function readStreamWithLimit(
144118
let totalBytes = 0;
145119

146120
try {
147-
while (totalBytes < maxBytes) {
121+
while (true) {
148122
const { done, value } = await reader.read();
149123
if (done) break;
150-
chunks.push(value);
151-
totalBytes += value.length;
124+
if (totalBytes < maxBytes) {
125+
const remainingBytes = maxBytes - totalBytes;
126+
const chunk =
127+
value.length > remainingBytes
128+
? value.slice(0, remainingBytes)
129+
: value;
130+
chunks.push(chunk);
131+
totalBytes += chunk.length;
132+
}
152133
}
153134
} finally {
154135
reader.releaseLock();
@@ -230,8 +211,7 @@ export async function downloadVideoToTemp(
230211
throw new Error("No response body");
231212
}
232213

233-
const arrayBuffer = await response.arrayBuffer();
234-
await Bun.write(tempFile.path, arrayBuffer);
214+
await Bun.write(tempFile.path, response);
235215

236216
const fileHandle = file(tempFile.path);
237217
const fileSize = fileHandle.size;
@@ -280,16 +260,18 @@ export async function repairContainer(
280260

281261
console.log(`[repairContainer] Running: ${ffmpegArgs.join(" ")}`);
282262

283-
const proc = spawn({
284-
cmd: ffmpegArgs,
285-
stdout: "pipe",
286-
stderr: "pipe",
287-
});
263+
const proc = registerSubprocess(
264+
spawn({
265+
cmd: ffmpegArgs,
266+
stdout: "pipe",
267+
stderr: "pipe",
268+
}),
269+
);
288270

289271
let abortCleanup: (() => void) | undefined;
290272
if (abortSignal) {
291273
abortCleanup = () => {
292-
killProcess(proc);
274+
void terminateProcess(proc);
293275
};
294276
abortSignal.addEventListener("abort", abortCleanup, { once: true });
295277
}
@@ -537,18 +519,20 @@ export async function processVideo(
537519

538520
console.log(`[processVideo] Running FFmpeg: ${ffmpegArgs.join(" ")}`);
539521

540-
const proc = spawn({
541-
cmd: ffmpegArgs,
542-
stdout: "pipe",
543-
stderr: "pipe",
544-
});
522+
const proc = registerSubprocess(
523+
spawn({
524+
cmd: ffmpegArgs,
525+
stdout: "pipe",
526+
stderr: "pipe",
527+
}),
528+
);
545529

546530
const totalDurationUs = metadata.duration * 1_000_000;
547531

548532
let abortCleanup: (() => void) | undefined;
549533
if (abortSignal) {
550534
abortCleanup = () => {
551-
killProcess(proc);
535+
void terminateProcess(proc);
552536
};
553537
abortSignal.addEventListener("abort", abortCleanup, { once: true });
554538
}
@@ -659,11 +643,13 @@ export async function generateThumbnail(
659643
"pipe:1",
660644
];
661645

662-
const proc = spawn({
663-
cmd: ffmpegArgs,
664-
stdout: "pipe",
665-
stderr: "pipe",
666-
});
646+
const proc = registerSubprocess(
647+
spawn({
648+
cmd: ffmpegArgs,
649+
stdout: "pipe",
650+
stderr: "pipe",
651+
}),
652+
);
667653

668654
try {
669655
const result = await withTimeout(
@@ -708,12 +694,12 @@ export async function generateThumbnail(
708694
return output;
709695
})(),
710696
THUMBNAIL_TIMEOUT_MS,
711-
() => killProcess(proc),
697+
() => terminateProcess(proc),
712698
);
713699

714700
return result;
715701
} finally {
716-
killProcess(proc);
702+
await terminateProcess(proc);
717703
}
718704
}
719705

apps/media-server/src/lib/ffmpeg.ts

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { type Subprocess, spawn } from "bun";
2+
import { registerSubprocess, terminateProcess } from "./subprocess";
23

34
export interface AudioExtractionOptions {
45
format?: "mp3";
@@ -28,21 +29,18 @@ export function canAcceptNewProcess(): boolean {
2829
return activeProcesses < MAX_CONCURRENT_PROCESSES;
2930
}
3031

31-
function killProcess(proc: Subprocess): void {
32-
try {
33-
proc.kill();
34-
} catch {}
35-
}
36-
3732
async function withTimeout<T>(
3833
promise: Promise<T>,
3934
timeoutMs: number,
40-
cleanup?: () => void,
35+
cleanup?: () => void | Promise<void>,
4136
): Promise<T> {
4237
let timeoutId: ReturnType<typeof setTimeout> | undefined;
38+
let cleanupPromise: Promise<void> | undefined;
4339
const timeoutPromise = new Promise<never>((_, reject) => {
4440
timeoutId = setTimeout(() => {
45-
cleanup?.();
41+
cleanupPromise = Promise.resolve()
42+
.then(() => cleanup?.())
43+
.then(() => undefined);
4644
reject(new Error(`Operation timed out after ${timeoutMs}ms`));
4745
}, timeoutMs);
4846
});
@@ -52,6 +50,9 @@ async function withTimeout<T>(
5250
if (timeoutId) clearTimeout(timeoutId);
5351
return result;
5452
} catch (err) {
53+
if (cleanupPromise) {
54+
await cleanupPromise;
55+
}
5556
if (timeoutId) clearTimeout(timeoutId);
5657
throw err;
5758
}
@@ -80,11 +81,18 @@ async function readStreamWithLimit(
8081
let totalBytes = 0;
8182

8283
try {
83-
while (totalBytes < maxBytes) {
84+
while (true) {
8485
const { done, value } = await reader.read();
8586
if (done) break;
86-
chunks.push(value);
87-
totalBytes += value.length;
87+
if (totalBytes < maxBytes) {
88+
const remainingBytes = maxBytes - totalBytes;
89+
const chunk =
90+
value.length > remainingBytes
91+
? value.slice(0, remainingBytes)
92+
: value;
93+
chunks.push(chunk);
94+
totalBytes += chunk.length;
95+
}
8896
}
8997
} finally {
9098
reader.releaseLock();
@@ -105,11 +113,13 @@ export async function checkHasAudioTrack(videoUrl: string): Promise<boolean> {
105113

106114
activeProcesses++;
107115

108-
const proc = spawn({
109-
cmd: ["ffmpeg", "-i", videoUrl, "-hide_banner"],
110-
stdout: "pipe",
111-
stderr: "pipe",
112-
});
116+
const proc = registerSubprocess(
117+
spawn({
118+
cmd: ["ffmpeg", "-i", videoUrl, "-hide_banner"],
119+
stdout: "pipe",
120+
stderr: "pipe",
121+
}),
122+
);
113123

114124
try {
115125
const result = await withTimeout(
@@ -124,13 +134,13 @@ export async function checkHasAudioTrack(videoUrl: string): Promise<boolean> {
124134
return /Stream #\d+:\d+.*Audio:/.test(stderrText);
125135
})(),
126136
CHECK_TIMEOUT_MS,
127-
() => killProcess(proc),
137+
() => terminateProcess(proc),
128138
);
129139

130140
return result;
131141
} finally {
132142
activeProcesses--;
133-
killProcess(proc);
143+
await terminateProcess(proc);
134144
}
135145
}
136146

@@ -160,11 +170,13 @@ export async function extractAudio(
160170
"pipe:1",
161171
];
162172

163-
const proc = spawn({
164-
cmd: ffmpegArgs,
165-
stdout: "pipe",
166-
stderr: "pipe",
167-
});
173+
const proc = registerSubprocess(
174+
spawn({
175+
cmd: ffmpegArgs,
176+
stdout: "pipe",
177+
stderr: "pipe",
178+
}),
179+
);
168180

169181
try {
170182
const result = await withTimeout(
@@ -215,13 +227,13 @@ export async function extractAudio(
215227
return output;
216228
})(),
217229
EXTRACT_TIMEOUT_MS,
218-
() => killProcess(proc),
230+
() => terminateProcess(proc),
219231
);
220232

221233
return result;
222234
} finally {
223235
activeProcesses--;
224-
killProcess(proc);
236+
await terminateProcess(proc);
225237
}
226238
}
227239

@@ -257,11 +269,13 @@ export function extractAudioStream(
257269
"pipe:1",
258270
];
259271

260-
const proc = spawn({
261-
cmd: ffmpegArgs,
262-
stdout: "pipe",
263-
stderr: "pipe",
264-
});
272+
const proc = registerSubprocess(
273+
spawn({
274+
cmd: ffmpegArgs,
275+
stdout: "pipe",
276+
stderr: "pipe",
277+
}),
278+
);
265279

266280
let timeoutId: ReturnType<typeof setTimeout> | undefined;
267281
let cleaned = false;
@@ -279,7 +293,7 @@ export function extractAudioStream(
279293
reader = null;
280294
}
281295
activeProcesses--;
282-
killProcess(proc);
296+
void terminateProcess(proc);
283297
};
284298

285299
timeoutId = setTimeout(() => {

0 commit comments

Comments
 (0)