Skip to content

Commit 2841129

Browse files
authored
Fix process monitoring hang on Linux kernels < 5.4 (#273)
Install a SIGCHLD handler via a new _subprocess_install_sigchld_handler C shim so the kernel notifies the monitor thread when children exit. Pass an empty sigmask to epoll_pwait so SIGCHLD is deliverable during the wait, and mark both ends of the self-pipe O_NONBLOCK so the drain loop in _notifyAllKnownChildProcesses exits cleanly on EAGAIN. Call close_range via a direct syscall wrapper on Linux, with fallback definitions for SYS_close_range and CLOSE_RANGE_CLOEXEC, so the build works against glibc < 2.34 and older kernel-headers (e.g. Amazon Linux 2 with kernel 4.14).
1 parent 84d6efc commit 2841129

3 files changed

Lines changed: 92 additions & 32 deletions

File tree

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ internal func waitForProcessTermination(
103103
}
104104
// Now save the registration
105105
var newState = storage
106-
newState.continuations[processIdentifier.processDescriptor] = continuation
106+
var list = newState.continuations[processIdentifier.processDescriptor] ?? []
107+
list.append(continuation)
108+
newState.continuations[processIdentifier.processDescriptor] = list
107109
state = .started(newState)
108110
// No state to resume
109111
return nil
@@ -114,10 +116,14 @@ internal func waitForProcessTermination(
114116
// is not the case and only save continuation then.
115117
switch Result(catching: { () throws(Errno) -> Bool in try processIdentifier.peekIfExited() }) {
116118
case let .success(processExited):
117-
// Save this continuation to be called by signal handler
118-
var newState = storage
119-
newState.continuations[processIdentifier.value] = continuation
120-
state = .started(newState)
119+
if !processExited {
120+
// Save this continuation to be called by signal handler
121+
var newState = storage
122+
var list = newState.continuations[processIdentifier.value] ?? []
123+
list.append(continuation)
124+
newState.continuations[processIdentifier.value] = list
125+
state = .started(newState)
126+
}
121127
return .success(processExited)
122128
case let .failure(underlyingError):
123129
let error: SubprocessError = .failedToMonitor(
@@ -153,7 +159,7 @@ private enum ProcessMonitorState {
153159
let epollFileDescriptor: CInt
154160
let shutdownFileDescriptor: CInt
155161
let monitorThread: pthread_t
156-
var continuations: [PlatformFileDescriptor: CheckedContinuation<Void, any Error>]
162+
var continuations: [PlatformFileDescriptor: [CheckedContinuation<Void, any Error>]]
157163
}
158164

159165
case notStarted
@@ -204,11 +210,7 @@ private func shutdown() {
204210
/// See the following page for the complete list of `async-signal-safe` functions
205211
/// https://man7.org/linux/man-pages/man7/signal-safety.7.html
206212
/// Only these functions can be used in the signal handler below
207-
private func signalHandler(
208-
_ signalNumber: CInt,
209-
_ signalInfo: UnsafeMutablePointer<siginfo_t>?,
210-
_ context: UnsafeMutableRawPointer?
211-
) {
213+
private func signalHandler(_ signalNumber: CInt) {
212214
let savedErrno = errno
213215
var one: UInt8 = 1
214216
withUnsafeBytes(of: &one) { ptr in
@@ -225,7 +227,6 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
225227
)
226228
var waitMask = sigset_t()
227229
sigemptyset(&waitMask)
228-
sigaddset(&waitMask, SIGCHLD)
229230
// Enter the monitor loop
230231
monitorLoop: while true {
231232
let eventCount = epoll_pwait(
@@ -247,7 +248,7 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
247248
let continuations = _processMonitorState.withLock { state -> [CheckedContinuation<Void, any Error>] in
248249
let result: [CheckedContinuation<Void, any Error>]
249250
if case .started(let storage) = state {
250-
result = Array(storage.continuations.values)
251+
result = storage.continuations.values.flatMap { $0 }
251252
} else {
252253
result = []
253254
}
@@ -333,6 +334,18 @@ private let setup: () = {
333334
do {
334335
let (readEnd, writeEnd) = try FileDescriptor.pipe()
335336
_signalPipe = (readEnd.rawValue, writeEnd.rawValue)
337+
// Make the pipe non-blocking. The read end MUST be non-blocking
338+
// so the drain loop in _notifyAllKnownChildProcesses exits cleanly
339+
// with EAGAIN once the pipe is empty.
340+
for fd in [readEnd.rawValue, writeEnd.rawValue] {
341+
let existing = fcntl(fd, F_GETFL)
342+
if existing < 0 {
343+
throw Errno(rawValue: errno)
344+
}
345+
if fcntl(fd, F_SETFL, existing | O_NONBLOCK) < 0 {
346+
throw Errno(rawValue: errno)
347+
}
348+
}
336349
} catch {
337350
var underlying: Errno? = nil
338351
if let err = error as? Errno {
@@ -364,6 +377,11 @@ private let setup: () = {
364377
_reportFailureWithErrno(errno)
365378
return
366379
}
380+
// Install the SIGCHLD handler
381+
guard _subprocess_install_sigchld_handler(signalHandler) == 0 else {
382+
_reportFailureWithErrno(errno)
383+
return
384+
}
367385
} else {
368386
// Mark waitid(P_PIDFD) as supported
369387
_waitProcessDescriptorSupported = true
@@ -413,9 +431,9 @@ internal func _setupMonitorSignalHandler() {
413431

414432
private func _unregisterProcessDescriptorAndNotify(_ pidfd: CInt, context: MonitorThreadContext) {
415433
// Remove the continuation
416-
let result = _processMonitorState.withLock { state -> (continuation: CheckedContinuation<Void, any Error>, error: SubprocessError?)? in
434+
let result = _processMonitorState.withLock { state -> (continuations: [CheckedContinuation<Void, any Error>], error: SubprocessError?)? in
417435
guard case .started(let storage) = state,
418-
let continuation = storage.continuations[pidfd]
436+
let continuationList = storage.continuations[pidfd]
419437
else {
420438
return nil
421439
}
@@ -436,23 +454,31 @@ private func _unregisterProcessDescriptorAndNotify(_ pidfd: CInt, context: Monit
436454
let error = SubprocessError.failedToMonitor(
437455
withUnderlyingError: Errno(rawValue: epollErrno)
438456
)
439-
return (continuation, error)
457+
return (continuationList, error)
440458
}
441459

442-
return (continuation, nil)
460+
return (continuationList, nil)
443461
}
444462

445-
if let error = result?.error {
446-
result?.continuation.resume(throwing: error)
463+
guard let result else {
464+
return
465+
}
466+
467+
if let error = result.error {
468+
for c in result.continuations {
469+
c.resume(throwing: error)
470+
}
447471
} else {
448-
result?.continuation.resume()
472+
for c in result.continuations {
473+
c.resume()
474+
}
449475
}
450476
}
451477

452478
// On older kernels, fall back to using signal handlers
453-
private typealias ResultContinuation = (
479+
private typealias ResultContinuations = (
454480
failure: SubprocessError?,
455-
continuation: CheckedContinuation<Void, any Error>
481+
continuations: [CheckedContinuation<Void, any Error>]
456482
)
457483
private func _notifyAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThreadContext) {
458484
guard signalFd == _signalPipe.readEnd else {
@@ -467,25 +493,25 @@ private func _notifyAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThr
467493
}
468494
}
469495

470-
let resumingContinuations: [ResultContinuation] = _processMonitorState.withLock { state in
496+
let resumingContinuations: [ResultContinuations] = _processMonitorState.withLock { state in
471497
guard case .started(let storage) = state else {
472498
return []
473499
}
474500
var updatedContinuations = storage.continuations
475-
var results: [ResultContinuation] = []
501+
var results: [ResultContinuations] = []
476502
// Since Linux coalesces signals, we need to loop through all known child processes
477503
// to check if they exited.
478-
loop: for (knownChildPID, continuation) in storage.continuations {
504+
loop: for (knownChildPID, continuations) in storage.continuations {
479505
switch Result(catching: { () throws(Errno) -> Bool in try _peekIfExited(pid: knownChildPID) }) {
480506
case let .success(processExisted):
481507
if processExisted {
482-
results.append((failure: nil, continuation: continuation))
508+
results.append((failure: nil, continuations: continuations))
483509
} else {
484510
// The process is still running, move on to the next one
485511
continue loop
486512
}
487513
case let .failure(error):
488-
results.append((failure: SubprocessError.failedToMonitor(withUnderlyingError: error), continuation: continuation))
514+
results.append((failure: SubprocessError.failedToMonitor(withUnderlyingError: error), continuations: continuations))
489515
}
490516
// Now we have the exit code, remove saved continuations
491517
updatedContinuations.removeValue(forKey: knownChildPID)
@@ -497,11 +523,15 @@ private func _notifyAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThr
497523
return results
498524
}
499525
// Resume continuations
500-
for c in resumingContinuations {
501-
if let error = c.failure {
502-
c.continuation.resume(throwing: error)
526+
for resumingContinuation in resumingContinuations {
527+
if let error = resumingContinuation.failure {
528+
for c in resumingContinuation.continuations {
529+
c.resume(throwing: error)
530+
}
503531
} else {
504-
c.continuation.resume()
532+
for c in resumingContinuation.continuations {
533+
c.resume()
534+
}
505535
}
506536
}
507537
}

Sources/_SubprocessCShims/include/process_shims.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ int _pidfd_open(pid_t pid);
120120
#define P_PIDFD 3
121121
#endif
122122

123+
int _subprocess_install_sigchld_handler(void (* _Nonnull handler)(int));
124+
123125
#endif
124126

125127
#ifdef __cplusplus

Sources/_SubprocessCShims/process_shims.c

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,30 @@ struct linux_dirent64 {
347347
static int _getdents64(int fd, struct linux_dirent64 *dirp, size_t nbytes) {
348348
return syscall(SYS_getdents64, fd, dirp, nbytes);
349349
}
350+
351+
// SYS_close_range is only defined on Linux Kernel 5.9 and above.
352+
// Define our value if it's not available and call the syscall directly because
353+
// glibc < 2.34 (e.g. Amazon Linux 2) doesn't provide a close_range() wrapper.
354+
#ifndef SYS_close_range
355+
#define SYS_close_range 436
356+
#endif
357+
358+
#ifndef CLOSE_RANGE_CLOEXEC
359+
#define CLOSE_RANGE_CLOEXEC (1U << 2)
360+
#endif
361+
362+
static int _close_range(unsigned int first, unsigned int last, unsigned int flags) {
363+
return syscall(SYS_close_range, first, last, flags);
364+
}
365+
366+
int _subprocess_install_sigchld_handler(void (*handler)(int)) {
367+
struct sigaction sa;
368+
memset(&sa, 0, sizeof(sa));
369+
sa.sa_handler = handler;
370+
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
371+
sigemptyset(&sa.sa_mask);
372+
return sigaction(SIGCHLD, &sa, NULL);
373+
}
350374
#endif
351375

352376
static pid_t _subprocess_pdfork(int *fdp) {
@@ -674,7 +698,11 @@ int _subprocess_fork_exec(
674698
// Close all other file descriptors
675699
rc = -1;
676700
errno = ENOSYS;
677-
#if (__has_include(<linux/close_range.h>) && (!defined(__ANDROID__) || __ANDROID_API__ >= 34)) || defined(__FreeBSD__)
701+
#if defined(__linux__)
702+
// We must NOT close pipefd[1] for writing errors
703+
rc = _close_range(STDERR_FILENO + 1, pipefd[1] - 1, CLOSE_RANGE_CLOEXEC);
704+
rc |= _close_range(pipefd[1] + 1, ~0U, CLOSE_RANGE_CLOEXEC);
705+
#elif defined(__FreeBSD__)
678706
// We must NOT close pipefd[1] for writing errors
679707
rc = close_range(STDERR_FILENO + 1, pipefd[1] - 1, CLOSE_RANGE_CLOEXEC);
680708
rc |= close_range(pipefd[1] + 1, ~0U, CLOSE_RANGE_CLOEXEC);

0 commit comments

Comments
 (0)