diff --git a/CHANGES.md b/CHANGES.md index e91da103c30e..dc442cc964c7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,6 +84,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed ProcessManager not reaping child processes, causing zombie process accumulation on long-running Flink deployments (Java) ([#37930](https://github.com/apache/beam/issues/37930)). ## Known Issues diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java index 3570fef00df1..86299762783c 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java @@ -203,6 +203,14 @@ private void stopProcess(String id, Process process) { } } } + // Reap the child process to prevent zombie accumulation. destroy()/destroyForcibly() send + // signals but do not call waitpid(), so the terminated process remains in the kernel process + // table as a zombie until waitFor() collects its exit status. + try { + process.waitFor(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } /** Returns true if the process exists within maxWaitTimeMillis. */ @@ -249,8 +257,16 @@ private void stopAllProcesses() { processes.forEach((id, process) -> process.destroy()); } - /** Kill all remaining processes forcibly, i.e. upon JVM shutdown */ + /** Kill all remaining processes forcibly and reap them, i.e. upon JVM shutdown. */ private void killAllProcesses() { - processes.forEach((id, process) -> process.destroyForcibly()); + processes.forEach( + (id, process) -> { + process.destroyForcibly(); + try { + process.waitFor(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); } }