Skip to content

Commit 84d6efc

Browse files
authored
Cancel AsyncIO when the child process exits (#272)
* Cancel AsyncIO when the child process exits Subprocess used to assume that a child's exit closes the pipe FDs it inherited. When the child forked a grandchild that also holds those FDs, the kernel keeps the pipes alive after the child exits and AsyncIO's read/write loops never see EOF or EPIPE — run() hangs. Split process monitoring into two phases: - waitForProcessTermination waits for the kernel to report exit. - reapProcess consumes the zombie and returns the status. Configuration.run now spawns a parallel monitor task and races it against the body closure via an AtomicCounter. If the process exits while the body is still doing I/O, the monitor calls AsyncIO.cancelAsyncIO(for:) to unblock every pending read or write for that process. Each platform's AsyncIO backend gains a Registration struct that groups file descriptors (or IOCP completion keys on Windows) by process. Cancellation walks those descriptors and either deletes them from epoll/kqueue or calls CancelIoEx. A cancelledProcesses set rejects late registrations so a register-vs-cancel race cannot leak a stream. run clears the marker before reapProcess so PID reuse is safe. Adds regression tests covering stdin, stdout, and stderr on Unix (POSIX shell + sleep grandchild) and Windows (PowerShell + Start-Process -NoNewWindow grandchild). * Update macOS CI to using Xcode 26.4
1 parent 745d4a7 commit 84d6efc

20 files changed

Lines changed: 1449 additions & 413 deletions

.github/workflows/pull_request.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ jobs:
3535
# Test dependencies
3636
yum install -y procps
3737
fi
38-
linux_build_command: 'swift test && swift test -c release && swift test --disable-default-traits'
38+
linux_build_command: "swift test && swift test -c release && swift test --disable-default-traits"
3939
enable_freebsd_checks: true
40-
freebsd_build_command: 'swift test && swift test -c release && swift test --disable-default-traits'
40+
freebsd_build_command: "swift test && swift test -c release && swift test --disable-default-traits"
4141
windows_swift_versions: '["6.2", "nightly-main"]'
4242
windows_build_command: |
4343
Invoke-Program swift test
4444
Invoke-Program swift test -c release
4545
Invoke-Program swift test --disable-default-traits
4646
enable_macos_checks: true
47-
macos_xcode_versions: '["26.0"]'
48-
macos_build_command: 'xcrun swift test && xcrun swift test -c release && xcrun swift test --disable-default-traits'
47+
macos_xcode_versions: '["26.4"]'
48+
macos_build_command: "xcrun swift test && xcrun swift test -c release && xcrun swift test --disable-default-traits"
4949
enable_linux_static_sdk_build: true
5050
enable_android_sdk_build: true
5151
android_ndk_versions: '["r27d", "r29"]'

Sources/Subprocess/API.swift

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,10 @@ public func run<
275275
return try await withThrowingTaskGroup(of: _RunGroupResult<Output, Error>.self) { group in
276276
var writer: StandardInputWriter?
277277
if inputIOBox != nil {
278-
let inputWriter = StandardInputWriter(diskIO: inputIOBox.take()!)
278+
let inputWriter = StandardInputWriter(
279+
diskIO: inputIOBox.take()!,
280+
processIdentifier: processIdentifier
281+
)
279282
writer = inputWriter
280283

281284
if Input.self != CustomWriteInput.self {
@@ -294,7 +297,8 @@ public func run<
294297
if Output.self == SequenceOutput.self {
295298
var diskIO = outputIOBox.take()
296299
outputSequence = AsyncBufferSequence(
297-
diskIO: diskIO!.consumeDescriptor()
300+
diskIO: diskIO!.consumeDescriptor(),
301+
processIdentifier: processIdentifier
298302
)
299303
} else if Output.OutputType.self == Void.self {
300304
// No need to capture output
@@ -303,15 +307,18 @@ public func run<
303307
} else {
304308
var diskIO = outputIOBox.take()
305309
group.addTask {
306-
let result = try await output.captureOutput(from: diskIO.take())
310+
let result = try await output.captureOutput(
311+
from: diskIO.take(), for: processIdentifier
312+
)
307313
return .standardOutputCaptured(result)
308314
}
309315
}
310316

311317
if Error.self == SequenceOutput.self {
312318
var diskIO = errorIOBox.take()
313319
errorSequence = AsyncBufferSequence(
314-
diskIO: diskIO!.consumeDescriptor()
320+
diskIO: diskIO!.consumeDescriptor(),
321+
processIdentifier: processIdentifier
315322
)
316323
} else if Error.OutputType.self == Void.self {
317324
// No need to capture error
@@ -320,7 +327,9 @@ public func run<
320327
} else {
321328
var diskIO = errorIOBox.take()
322329
group.addTask {
323-
let result = try await error.captureOutput(from: diskIO.take())
330+
let result = try await error.captureOutput(
331+
from: diskIO.take(), for: processIdentifier
332+
)
324333
return .standardErrorCaptured(result)
325334
}
326335
}

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
5757
public typealias Element = Buffer
5858

5959
private let diskIO: DiskIO
60+
private let processIdentifier: ProcessIdentifier
6061
private let preferredBufferSize: Int
6162
private var buffer: [Buffer]
6263

63-
internal init(diskIO: DiskIO) {
64+
internal init(diskIO: DiskIO, processIdentifier: ProcessIdentifier) {
6465
self.diskIO = diskIO
66+
self.processIdentifier = processIdentifier
6567
self.buffer = []
6668
// Only need to query it once at beginning of stream
6769
self.preferredBufferSize = AsyncIO.queryPipeBufferSize(for: diskIO)
@@ -76,6 +78,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
7678
// Read more data
7779
let data = try await AsyncIO.shared.read(
7880
from: self.diskIO,
81+
for: self.processIdentifier,
7982
upTo: self.preferredBufferSize
8083
)
8184
guard let data else {
@@ -97,10 +100,12 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
97100
}
98101

99102
private let diskIO: DiskIO
103+
private let processIdentifier: ProcessIdentifier
100104
private let state: State
101105

102-
internal init(diskIO: DiskIO) {
106+
internal init(diskIO: DiskIO, processIdentifier: ProcessIdentifier) {
103107
self.diskIO = diskIO
108+
self.processIdentifier = processIdentifier
104109
self.state = State()
105110
}
106111

@@ -109,7 +114,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
109114
guard self.state.initializedCount() == 1 else {
110115
fatalError("AsyncBufferSequence is single pass. It can only be iterated once.")
111116
}
112-
return Iterator(diskIO: self.diskIO)
117+
return Iterator(diskIO: self.diskIO, processIdentifier: self.processIdentifier)
113118
}
114119

115120
/// Splits the buffer into strings using the specified separator.

Sources/Subprocess/Configuration.swift

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ public struct Configuration: Sendable {
3838
public var environment: Environment
3939
/// The working directory to use when running the executable.
4040
///
41-
/// If this property is `nil`, the subprocess will inherit
42-
/// the working directory from the parent process.
41+
/// If this property is `nil`, the subprocess inherits the working
42+
/// directory from the parent process.
4343
public var workingDirectory: FilePath?
4444
/// The platform-specific options to use when
4545
/// running the subprocess.
4646
public var platformOptions: PlatformOptions
4747

48-
/// Creates a Configuration with the parameters you provide.
48+
/// Creates a configuration with the parameters you provide.
4949
/// - Parameters:
5050
/// - executable: The executable to run.
5151
/// - arguments: The arguments to pass to the executable.
@@ -107,37 +107,78 @@ public struct Configuration: Sendable {
107107
let processIdentifier = spawnResults.processIdentifier
108108

109109
defer {
110-
// Close process file descriptor now we finished monitoring
110+
// Close the process file descriptor now that monitoring has finished.
111111
processIdentifier.close()
112112
}
113113

114114
return try await withAsyncTaskCleanupHandler { () throws -> ExecutionOutcome<Result> in
115-
let inputIO = spawnResults.inputWriteEnd()
116-
let outputIO = spawnResults.outputReadEnd()
117-
let errorIO = spawnResults.errorReadEnd()
118-
119-
let result: Swift.Result<Result, any Swift.Error>
120-
do {
121-
// Body runs in the same isolation
122-
let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO)
123-
124-
result = .success(bodyResult)
125-
} catch {
126-
result = .failure(error)
115+
// The counter coordinates a race between two finishers: the body
116+
// closure and the process-termination monitor. Whichever side
117+
// increments first observes a value of `1` and owns the response;
118+
// the other side sees `2` and stays out of the way.
119+
//
120+
// If the monitor wins, the child has exited while the body is
121+
// still reading or writing. The body might be blocked on a pipe
122+
// that an inherited grandchild keeps open, so the monitor calls
123+
// `cancelAsyncIO` to unblock it. If the body wins, the I/O
124+
// already finished cleanly and no cancellation is needed.
125+
let taskFinishFlag = AtomicCounter()
126+
127+
let (result, monitorError) = try await withThrowingTaskGroup(
128+
of: SubprocessError?.self,
129+
returning: (Swift.Result<Result, any Swift.Error>, SubprocessError?).self
130+
) { group in
131+
group.addTask {
132+
do throws(SubprocessError) {
133+
try await waitForProcessTermination(for: processIdentifier)
134+
if taskFinishFlag.addOne() == 1 {
135+
// The body closure hasn't finished but the child
136+
// process has terminated. Cancel all active
137+
// AsyncIO now.
138+
try AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
139+
}
140+
return nil
141+
} catch {
142+
try? AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
143+
return error
144+
}
145+
}
146+
147+
let inputIO = spawnResults.inputWriteEnd()
148+
let outputIO = spawnResults.outputReadEnd()
149+
let errorIO = spawnResults.errorReadEnd()
150+
151+
let result: Swift.Result<Result, any Swift.Error>
152+
do {
153+
// Body runs in the same isolation.
154+
let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO)
155+
taskFinishFlag.addOne()
156+
result = .success(bodyResult)
157+
} catch {
158+
result = .failure(error)
159+
}
160+
161+
// Wait for the monitor child task to finish.
162+
let monitorError = try await group.next() ?? nil
163+
return (result, monitorError)
127164
}
128165

129-
// Ensure that we begin monitoring process termination after `body` runs
130-
// and regardless of whether `body` throws, so that the pid gets reaped
131-
// even if `body` throws, and we are not leaving zombie processes in the
132-
// process table which will cause the process termination monitoring thread
133-
// to effectively hang due to the pid never being awaited
134-
let terminationStatus = try await monitorProcessTermination(
135-
for: processIdentifier
136-
)
166+
// Drop the cancellation marker before reaping the zombie. After
167+
// `reapProcess` runs the kernel can immediately reuse this PID,
168+
// so the marker must be gone first; otherwise a concurrent
169+
// `run()` that happens to inherit the same PID would see the
170+
// stale entry and reject its registrations.
171+
AsyncIO.shared.cleanup(processIdentifier: processIdentifier)
172+
173+
let terminationStatus = try reapProcess(with: processIdentifier)
174+
175+
if let monitorError {
176+
throw monitorError
177+
}
137178

138-
return ExecutionOutcome(
179+
return try ExecutionOutcome(
139180
terminationStatus: terminationStatus,
140-
value: try result.get()
181+
value: result.get()
141182
)
142183
} onCleanup: {
143184
let execution = Execution<Input, Output, Error>(
@@ -146,7 +187,7 @@ public struct Configuration: Sendable {
146187
outputStream: nil,
147188
errorStream: nil
148189
)
149-
// Attempt to terminate the child process
190+
// Attempt to terminate the child process.
150191
await execution.runTeardownSequence(
151192
self.platformOptions.teardownSequence
152193
)

0 commit comments

Comments
 (0)