Commit 7431dea

[#37930][Runners] Reap child processes in ProcessManager to prevent zombie accumulation (#37932)
* [#37930][Runners] Reap child processes in ProcessManager to prevent zombie accumulation

  ProcessManager.stopProcess() calls destroy()/destroyForcibly() to terminate child processes, but never calls Process.waitFor() to collect the exit status. On POSIX systems, this means the terminated child process entry remains in the kernel process table as a zombie (state Z/defunct) until the parent process itself exits.

  In long-running environments like Flink TaskManagers using --environment_type=PROCESS, the expansion service processes are repeatedly spawned and stopped but never reaped. Over time this leads to significant zombie accumulation (176+ observed on production hosts).

  The fix adds process.waitFor() calls after process termination in:
  - stopProcess(): after the destroy/destroyForcibly sequence
  - killAllProcesses(): after destroyForcibly in the shutdown hook path

  Fixes #37930

* Update CHANGES.md with ProcessManager zombie fix
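The stop-then-wait sequence described in the commit message can be tried in isolation. The following is a minimal sketch, not the ProcessManager code: the class name `ReapExample`, the helper `stopAndReap`, the 2-second grace period, and the use of a POSIX `sleep` binary as the child are all assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;

public class ReapExample {
  /**
   * Hypothetical helper: asks the child to terminate, escalates to a forcible kill if it is
   * still alive after a short grace period, then blocks until its exit status is available.
   */
  static int stopAndReap(Process process) throws InterruptedException {
    process.destroy();
    // The 2-second grace period is an arbitrary value chosen for this sketch.
    if (!process.waitFor(2, TimeUnit.SECONDS)) {
      process.destroyForcibly();
    }
    // Blocks until the child has terminated and returns its exit value.
    return process.waitFor();
  }

  public static void main(String[] args) throws Exception {
    // Assumes a POSIX `sleep` binary is on the PATH.
    Process child = new ProcessBuilder("sleep", "60").start();
    int exit = stopAndReap(child);
    System.out.println("alive=" + child.isAlive() + " exit=" + exit);
  }
}
```

On a Linux host, running `ps -o pid,stat,comm --ppid <jvm pid>` before and after the call is one way to check whether a defunct entry is left behind.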
1 parent 683eceb commit 7431dea

2 files changed

Lines changed: 19 additions & 2 deletions

CHANGES.md

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@
 ## Bugfixes

 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed ProcessManager not reaping child processes, causing zombie process accumulation on long-running Flink deployments (Java) ([#37930](https://github.com/apache/beam/issues/37930)).

 ## Security Fixes

runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java

Lines changed: 18 additions & 2 deletions
@@ -203,6 +203,14 @@ private void stopProcess(String id, Process process) {
         }
       }
     }
+    // Reap the child process to prevent zombie accumulation. destroy()/destroyForcibly() send
+    // signals but do not call waitpid(), so the terminated process remains in the kernel process
+    // table as a zombie until waitFor() collects its exit status.
+    try {
+      process.waitFor();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
   }

   /** Returns true if the process exists within maxWaitTimeMillis. */
@@ -249,8 +257,16 @@ private void stopAllProcesses() {
     processes.forEach((id, process) -> process.destroy());
   }

-  /** Kill all remaining processes forcibly, i.e. upon JVM shutdown */
+  /** Kill all remaining processes forcibly and reap them, i.e. upon JVM shutdown. */
   private void killAllProcesses() {
-    processes.forEach((id, process) -> process.destroyForcibly());
+    processes.forEach(
+        (id, process) -> {
+          process.destroyForcibly();
+          try {
+            process.waitFor();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        });
   }
 }
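The `waitFor()` calls in this diff have no timeout, so they block until each child exits. A possible variant for the shutdown-hook path, shown here only as a sketch and not as what this commit does, bounds the total wait with `Process.waitFor(long, TimeUnit)`. The class name `BoundedReap`, the method `killAndReapAll`, and the shared-deadline policy are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BoundedReap {
  /**
   * Hypothetical variant: forcibly kills every process, then waits for all of them against a
   * single shared deadline. Returns the number of processes still alive when the wait ended.
   */
  static int killAndReapAll(Map<String, Process> processes, long timeoutMillis) {
    // Signal everything first so the children can exit in parallel.
    processes.values().forEach(Process::destroyForcibly);

    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    boolean interrupted = false;
    int stillRunning = 0;
    for (Process process : processes.values()) {
      try {
        long remaining = Math.max(0L, deadline - System.nanoTime());
        // After an interrupt, stop blocking and only check the current state.
        boolean exited =
            interrupted ? !process.isAlive() : process.waitFor(remaining, TimeUnit.NANOSECONDS);
        if (!exited) {
          stillRunning++;
        }
      } catch (InterruptedException e) {
        interrupted = true;
        if (process.isAlive()) {
          stillRunning++;
        }
      }
    }
    if (interrupted) {
      // Restore the interrupt flag for the caller, as the diff above does.
      Thread.currentThread().interrupt();
    }
    return stillRunning;
  }
}
```

The trade-off is that a child that ignores the kill within the deadline is left behind, in exchange for a shutdown hook that cannot block indefinitely.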
