Skip to content

Commit 1163367

Browse files
authored
Make graceful shutdown return as soon as the subprocess exits (#281)
Race process-termination monitoring against the teardown sequence in a task group: if the monitor finishes first, the child has exited, so cancel the teardown task and return immediately; if teardown finishes first (after its terminal ), wait for the monitor to observe the exit Make waitForProcessTermination on all platforms idempotent, since now we are calling it multiple times
1 parent d7eec2b commit 1163367

5 files changed

Lines changed: 149 additions & 22 deletions

File tree

Sources/Subprocess/Configuration.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,7 @@ public struct Configuration: Sendable {
188188
errorStream: nil
189189
)
190190
// Attempt to terminate the child process.
191-
await execution.runTeardownSequence(
192-
self.platformOptions.teardownSequence
193-
)
191+
await execution.teardown(using: self.platformOptions.teardownSequence)
194192
}
195193
}
196194
}

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,26 +83,31 @@ internal func waitForProcessTermination(
8383
// On older releases, use signalfd so we do not need
8484
// to register anything with epoll
8585
if processIdentifier.processDescriptor > 0 {
86-
// Register processDescriptor with epoll
87-
var event = epoll_event(
88-
events: EPOLLIN.rawValue,
89-
data: epoll_data(fd: processIdentifier.processDescriptor)
90-
)
91-
let rc = epoll_ctl(
92-
storage.epollFileDescriptor,
93-
EPOLL_CTL_ADD,
94-
processIdentifier.processDescriptor,
95-
&event
96-
)
97-
if rc != 0 {
98-
let epollErrno = errno
99-
let error: SubprocessError = .failedToMonitor(
100-
withUnderlyingError: Errno(rawValue: epollErrno)
86+
var newState = storage
87+
// epoll rejects duplicate EPOLL_CTL_ADD for the same fd
88+
// with EEXIST, so only register the pidfd the first
89+
// time we see it. Subsequent waiters share the
90+
// existing registration via the continuation list.
91+
if newState.continuations[processIdentifier.processDescriptor] == nil {
92+
var event = epoll_event(
93+
events: EPOLLIN.rawValue,
94+
data: epoll_data(fd: processIdentifier.processDescriptor)
10195
)
102-
return .failure(error)
96+
let rc = epoll_ctl(
97+
storage.epollFileDescriptor,
98+
EPOLL_CTL_ADD,
99+
processIdentifier.processDescriptor,
100+
&event
101+
)
102+
if rc != 0 {
103+
let epollErrno = errno
104+
let error: SubprocessError = .failedToMonitor(
105+
withUnderlyingError: Errno(rawValue: epollErrno)
106+
)
107+
return .failure(error)
108+
}
103109
}
104110
// Now save the registration
105-
var newState = storage
106111
var list = newState.continuations[processIdentifier.processDescriptor] ?? []
107112
list.append(continuation)
108113
newState.continuations[processIdentifier.processDescriptor] = list
@@ -119,7 +124,7 @@ internal func waitForProcessTermination(
119124
if !processExited {
120125
// Save this continuation to be called by signal handler
121126
var newState = storage
122-
var list = newState.continuations[processIdentifier.processDescriptor] ?? []
127+
var list = newState.continuations[processIdentifier.value] ?? []
123128
list.append(continuation)
124129
newState.continuations[processIdentifier.value] = list
125130
state = .started(newState)

Sources/Subprocess/Teardown.swift

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,37 @@ extension Execution {
102102
/// - Parameter sequence: The steps to perform.
103103
public func teardown(using sequence: some Sequence<TeardownStep> & Sendable) async {
104104
await withUncancelledTask {
105-
await runTeardownSequence(sequence)
105+
await withTaskGroup(of: TeardownGroupResult.self) { group in
106+
group.addTask {
107+
try? await waitForProcessTermination(for: self.processIdentifier)
108+
return .processMonitoringFinished
109+
}
110+
111+
group.addTask {
112+
await runTeardownSequence(sequence)
113+
return .teardownFinished
114+
}
115+
116+
let firstFinishedTask = await group.next()!
117+
switch firstFinishedTask {
118+
case .processMonitoringFinished:
119+
// Process has exited. Cancel the teardown task now
120+
group.cancelAll()
121+
case .teardownFinished:
122+
// Teardown sequence has finished. Wait for process monitoring
123+
// to finish due to `kill`
124+
await group.waitForAll()
125+
}
126+
}
106127
}
107128
}
108129
}
109130

131+
private enum TeardownGroupResult {
132+
case processMonitoringFinished
133+
case teardownFinished
134+
}
135+
110136
internal enum TeardownStepCompletion {
111137
case processHasExited
112138
case processStillAlive

Tests/SubprocessTests/ProcessMonitoringTests.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,39 @@ extension SubprocessProcessMonitoringTests {
296296
}
297297
}
298298

299+
@Test func testCanMonitorSameProcessConcurrently() async throws {
300+
// Multiple tasks waiting on the *same* process must all observe its
301+
// termination. This is the idempotency contract of
302+
// waitForProcessTermination: registering the same process for
303+
// monitoring more than once concurrently must succeed on every
304+
// platform. (On Linux >= 5.4 this specifically guards against
305+
// double-registering the pidfd with epoll, which fails with EEXIST.)
306+
let waiterCount = 10
307+
// Keep the child alive long enough that every waiter registers
308+
// before it exits, so the concurrent-registration path is exercised.
309+
let config = self.longRunningProcess(withTimeOutSeconds: 1)
310+
try await withSpawnedExecution(config: config) { execution in
311+
try await withThrowingTaskGroup { group in
312+
for _ in 0..<waiterCount {
313+
group.addTask {
314+
// Call the monitoring primitive directly instead of
315+
// monitorProcessTermination: the zombie must be reaped
316+
// exactly once, so we reap only after every waiter has
317+
// observed termination.
318+
try await waitForProcessTermination(
319+
for: execution.processIdentifier
320+
)
321+
}
322+
}
323+
324+
try await group.waitForAll()
325+
}
326+
// Every waiter resumed without error; reap the process once.
327+
let status = try reapProcess(with: execution.processIdentifier)
328+
#expect(status.isSuccess)
329+
}
330+
}
331+
299332
@Test func testExitSignalCoalescing() async throws {
300333
// Spawn many immediately exit processes in a row to trigger
301334
// signal coalescing. Make sure we can handle this

Tests/SubprocessTests/UnixTests.swift

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,71 @@ extension SubprocessUnixTests {
239239
}
240240
}
241241

242+
// MARK: - Teardown Timing
243+
extension SubprocessUnixTests {
244+
/// Spawns a child that prints `ready` and then blocks in `sleep`, waits
245+
/// for that marker so the child is known to be running, then cancels the
246+
/// run to trigger teardown and returns how long the teardown took.
247+
private func measureCancelledTeardown(
248+
using teardownSequence: [TeardownStep]
249+
) async -> Duration {
250+
let (readyStream, readyContinuation) = AsyncStream.makeStream(of: Void.self)
251+
return await withTaskGroup(
252+
of: Void.self,
253+
returning: Duration.self
254+
) { group in
255+
group.addTask {
256+
var platformOptions = PlatformOptions()
257+
// Isolate the child in its own session so teardown can't reach
258+
// anything but the process we spawned.
259+
platformOptions.createSession = true
260+
platformOptions.teardownSequence = teardownSequence
261+
let configuration = Configuration(
262+
.path("/bin/sh"),
263+
// `exec` so the monitored child becomes `sleep` itself,
264+
// which dies instantly on SIGTERM/SIGKILL.
265+
arguments: ["-c", "echo ready; exec sleep 10"],
266+
platformOptions: platformOptions
267+
)
268+
_ = try? await Subprocess.run(
269+
configuration,
270+
input: .none,
271+
output: .sequence,
272+
error: .discarded
273+
) { execution in
274+
for try await line in execution.standardOutput.strings() {
275+
if line.trimmingCharacters(in: .whitespacesAndNewlines) == "ready" {
276+
readyContinuation.finish()
277+
}
278+
}
279+
}
280+
}
281+
// Block until the child confirms it is running.
282+
for await _ in readyStream {}
283+
// Time only the teardown triggered by cancellation.
284+
return await ContinuousClock().measure {
285+
group.cancelAll()
286+
await group.waitForAll()
287+
}
288+
}
289+
}
290+
291+
@Test func testKillTeardownReturnsAsSoonAsProcessExits() async {
292+
let elapsed = await self.measureCancelledTeardown(using: [
293+
.send(signal: .kill, allowedDurationToNextStep: .seconds(5))
294+
])
295+
#expect(elapsed < .seconds(5), "SIGKILL is uncatchable; teardown should not wait")
296+
}
297+
298+
@Test func testTerminateTeardownReturnsAsSoonAsProcessExits() async {
299+
let elapsed = await self.measureCancelledTeardown(using: [
300+
.send(signal: .terminate, allowedDurationToNextStep: .seconds(3)),
301+
.send(signal: .kill, allowedDurationToNextStep: .seconds(5)),
302+
])
303+
#expect(elapsed < .seconds(5), "sleep dies on SIGTERM instantly; teardown should not wait")
304+
}
305+
}
306+
242307
// MARK: - PATH Resolution Tests
243308
extension SubprocessUnixTests {
244309
@Test func testExecutablePathsPreserveOrder() throws {

0 commit comments

Comments
 (0)