Skip to content

Commit 9630762

Browse files
committed
Add ResourceUsage to report subprocess CPU time and memory consumption
Introduce a public ResourceUsage struct that exposes userTime, systemTime (as Duration), and maxRSS (in bytes) for every terminated subprocess. An ExecutionResult protocol provides common access to terminationStatus and resourceUsage across both ExecutionOutcome and ExecutionRecord. On Unix, resource data is collected via wait4 (macOS, FreeBSD, and OpenBSD) or the Linux kernel's 5-argument waitid syscall, which populates a rusage struct alongside the termination status. A linux_waitid C shim is added because glibc and musl only expose the 4-parameter POSIX waitid that omits rusage. The raw rusage is available as a public property on non-Windows platforms, with maxRSS normalized from KiB to bytes on Linux, FreeBSD, and OpenBSD. On Windows, GetProcessTimes provides CPU time (converted from FILETIME 100 ns units) and GetProcessMemoryInfo provides PeakWorkingSetSize (maxRSS). This functionality is particularly necessary as part of SwiftSubprocess because collecting rusage information from a terminated subprocess requires the ability to run code when a process has changed state from executing to zombie, but before its PID has been reaped — something not possible with the current Subprocess API. Further, it is notoriously difficult to collect this information across all OSes for an arbitrary PID anyway, at least in a way that doesn't simultaneously reap the PID. User time, system time, and maxRSS are some of the most common metrics typically extracted from getrusage. Exposing the raw rusage struct provides access to the rest, and on Windows, callers can use DuplicateHandle to get a process descriptor that can outlive Subprocess's control of the process and collect any additional metrics from the process when it is known to be in a terminated state.
1 parent e417c3f commit 9630762

10 files changed

Lines changed: 267 additions & 61 deletions

File tree

Sources/Subprocess/API.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ public func run<
473473
return ExecutionRecord(
474474
processIdentifier: result.value.processIdentifier,
475475
terminationStatus: result.terminationStatus,
476+
resourceUsage: result.resourceUsage,
476477
standardOutput: result.value.standardOutput,
477478
standardError: result.value.standardError
478479
)
@@ -576,6 +577,7 @@ public func run<
576577
return ExecutionRecord(
577578
processIdentifier: result.value.processIdentifier,
578579
terminationStatus: result.terminationStatus,
580+
resourceUsage: result.resourceUsage,
579581
standardOutput: result.value.standardOutput,
580582
standardError: result.value.standardError
581583
)

Sources/Subprocess/Configuration.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,13 @@ public struct Configuration: Sendable {
127127
// even if `body` throws, and we are not leaving zombie processes in the
128128
// process table which will cause the process termination monitoring thread
129129
// to effectively hang due to the pid never being awaited
130-
let terminationStatus = try await monitorProcessTermination(
130+
let (terminationStatus, resourceUsage) = try await monitorProcessTermination(
131131
for: execution.processIdentifier
132132
)
133133

134134
return ExecutionOutcome(
135135
terminationStatus: terminationStatus,
136+
resourceUsage: resourceUsage,
136137
value: try result.get()
137138
)
138139
} onCleanup: {

Sources/Subprocess/Platforms/Subprocess+BSD.swift

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ internal import Dispatch
2929
@Sendable
3030
internal func monitorProcessTermination(
3131
for processIdentifier: ProcessIdentifier
32-
) async throws(SubprocessError) -> TerminationStatus {
33-
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try processIdentifier.reap() }) {
34-
case let .success(status?):
35-
return status
32+
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
33+
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try processIdentifier.reap() }) {
34+
case let .success(result?):
35+
return result
3636
case .success(nil):
3737
break
3838
case let .failure(error):
@@ -50,10 +50,9 @@ internal func monitorProcessTermination(
5050

5151
do throws(Errno) {
5252
// NOTE_EXIT may be delivered slightly before the process becomes reapable,
53-
// so we must call waitid without WNOHANG to avoid a narrow possibility of a race condition.
54-
// If waitid does block, it won't do so for very long at all.
55-
let status = try processIdentifier.blockingReap()
56-
continuation.resume(returning: status)
53+
// so we must call wait4 without WNOHANG to avoid a narrow possibility of a race condition.
54+
// If wait4 does block, it won't do so for very long at all.
55+
continuation.resume(returning: try processIdentifier.blockingReap())
5756
} catch {
5857
let subprocessError: SubprocessError = .failedToMonitor(withUnderlyingError: error)
5958
continuation.resume(throwing: subprocessError)

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ extension Int32 {
7070
@Sendable
7171
internal func monitorProcessTermination(
7272
for processIdentifier: ProcessIdentifier
73-
) async throws(SubprocessError) -> TerminationStatus {
73+
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
7474
return try await _castError {
7575
return try await withCheckedThrowingContinuation { continuation in
76-
let status = _processMonitorState.withLock { state -> Result<TerminationStatus, SubprocessError>? in
76+
let status = _processMonitorState.withLock { state -> Result<(TerminationStatus, ResourceUsage), SubprocessError>? in
7777
switch state {
7878
case .notStarted:
7979
let error: SubprocessError = .failedToMonitor(withUnderlyingError: nil)
@@ -114,9 +114,9 @@ internal func monitorProcessTermination(
114114
// Since Linux coalesces signals, it's possible that by the time we request
115115
// monitoring the process has already exited. Check to make sure that
116116
// is not the case and only save continuation then.
117-
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try processIdentifier.reap() }) {
118-
case let .success(status?):
119-
return .success(status)
117+
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try processIdentifier.reap() }) {
118+
case let .success(result?):
119+
return .success(result)
120120
case .success(nil):
121121
// Save this continuation to be called by signal handler
122122
var newState = storage
@@ -149,7 +149,7 @@ private enum ProcessMonitorState {
149149
let epollFileDescriptor: CInt
150150
let shutdownFileDescriptor: CInt
151151
let monitorThread: pthread_t
152-
var continuations: [PlatformFileDescriptor: CheckedContinuation<TerminationStatus, any Error>]
152+
var continuations: [PlatformFileDescriptor: CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>]
153153
}
154154

155155
case notStarted
@@ -240,8 +240,8 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
240240
let error: SubprocessError = .failedToMonitor(
241241
withUnderlyingError: Errno(rawValue: pwaitErrno)
242242
)
243-
let continuations = _processMonitorState.withLock { state -> [CheckedContinuation<TerminationStatus, any Error>] in
244-
let result: [CheckedContinuation<TerminationStatus, any Error>]
243+
let continuations = _processMonitorState.withLock { state -> [CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>] in
244+
let result: [CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>]
245245
if case .started(let storage) = state {
246246
result = Array(storage.continuations.values)
247247
} else {
@@ -408,8 +408,17 @@ internal func _setupMonitorSignalHandler() {
408408
}
409409

410410
private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorThreadContext) {
411-
var terminationStatus = Result(catching: { () throws(Errno) in
412-
try TerminationStatus(_waitid(idtype: idtype_t(UInt32(P_PIDFD)), id: id_t(pidfd), flags: WEXITED))
411+
var terminationResult = Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage) in
412+
while true {
413+
var siginfo = siginfo_t()
414+
var usage = rusage()
415+
let rc = linux_waitid(idtype_t(UInt32(P_PIDFD)), id_t(pidfd), &siginfo, WEXITED, &usage)
416+
if rc != -1 {
417+
return (TerminationStatus(siginfo), ResourceUsage(usage))
418+
} else if errno != EINTR {
419+
throw Errno(rawValue: errno)
420+
}
421+
}
413422
}).mapError { underlyingError in
414423
return SubprocessError.failedToMonitor(withUnderlyingError: underlyingError)
415424
}
@@ -423,14 +432,14 @@ private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorTh
423432
)
424433
if rc != 0 {
425434
let epollErrno = errno
426-
terminationStatus = .failure(
435+
terminationResult = .failure(
427436
SubprocessError.failedToMonitor(
428437
withUnderlyingError: Errno(rawValue: epollErrno)
429438
)
430439
)
431440
}
432441
// Notify the continuation
433-
let continuation = _processMonitorState.withLock { state -> CheckedContinuation<TerminationStatus, any Error>? in
442+
let continuation = _processMonitorState.withLock { state -> CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>? in
434443
guard case .started(let storage) = state,
435444
let continuation = storage.continuations[pidfd]
436445
else {
@@ -442,13 +451,13 @@ private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorTh
442451
state = .started(newStorage)
443452
return continuation
444453
}
445-
continuation?.resume(with: terminationStatus)
454+
continuation?.resume(with: terminationResult)
446455
}
447456

448457
// On older kernels, fall back to using signal handlers
449458
private typealias ResultContinuation = (
450-
result: Result<TerminationStatus, SubprocessError>,
451-
continuation: CheckedContinuation<TerminationStatus, any Error>
459+
result: Result<(TerminationStatus, ResourceUsage), SubprocessError>,
460+
continuation: CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>
452461
)
453462
private func _reapAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThreadContext) {
454463
guard signalFd == _signalPipe.readEnd else {
@@ -472,19 +481,19 @@ private func _reapAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThrea
472481
// Since Linux coalesces signals, we need to loop through all known child processes
473482
// to check if they exited.
474483
loop: for (knownChildPID, continuation) in storage.continuations {
475-
let terminationStatus: Result<TerminationStatus, SubprocessError>
476-
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try _reap(pid: knownChildPID) }) {
477-
case let .success(status?):
478-
terminationStatus = .success(status)
484+
let terminationResult: Result<(TerminationStatus, ResourceUsage), SubprocessError>
485+
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try _reap(pid: knownChildPID) }) {
486+
case let .success(result?):
487+
terminationResult = .success(result)
479488
case .success(nil):
480489
// Move on to the next child
481490
continue loop
482491
case let .failure(error):
483-
terminationStatus = .failure(
492+
terminationResult = .failure(
484493
SubprocessError.failedToMonitor(withUnderlyingError: error)
485494
)
486495
}
487-
results.append((result: terminationStatus, continuation: continuation))
496+
results.append((result: terminationResult, continuation: continuation))
488497
// Now we have the exit code, remove saved continuations
489498
updatedContinuations.removeValue(forKey: knownChildPID)
490499
}

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -691,37 +691,63 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
691691
extension ProcessIdentifier {
692692
/// Reaps the zombie for the exited process. This function may block.
693693
@available(*, noasync)
694-
internal func blockingReap() throws(Errno) -> TerminationStatus {
694+
internal func blockingReap() throws(Errno) -> (TerminationStatus, ResourceUsage) {
695695
try _blockingReap(pid: value)
696696
}
697697

698698
/// Reaps the zombie for the exited process, or returns `nil` if the process is still running. This function will not block.
699-
internal func reap() throws(Errno) -> TerminationStatus? {
699+
internal func reap() throws(Errno) -> (TerminationStatus, ResourceUsage)? {
700700
try _reap(pid: value)
701701
}
702702
}
703703

704704
@available(*, noasync)
705-
internal func _blockingReap(pid: pid_t) throws(Errno) -> TerminationStatus {
706-
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED)
707-
return TerminationStatus(siginfo)
708-
}
709-
710-
internal func _reap(pid: pid_t) throws(Errno) -> TerminationStatus? {
711-
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED | WNOHANG)
712-
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
713-
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
714-
return nil
705+
internal func _blockingReap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage) {
706+
while true {
707+
var usage = rusage()
708+
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
709+
var status: CInt = 0
710+
let rc = wait4(pid, &status, 0, &usage)
711+
#elseif os(Linux) || os(Android)
712+
var siginfo = siginfo_t()
713+
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED, &usage)
714+
#endif
715+
if rc >= 0 {
716+
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
717+
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
718+
#elseif os(Linux) || os(Android)
719+
return (TerminationStatus(siginfo), ResourceUsage(usage))
720+
#endif
721+
} else if errno != EINTR {
722+
throw Errno(rawValue: errno)
723+
}
715724
}
716-
return TerminationStatus(siginfo)
717725
}
718726

719-
internal func _waitid(idtype: idtype_t, id: id_t, flags: Int32) throws(Errno) -> siginfo_t {
727+
internal func _reap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage)? {
720728
while true {
729+
var usage = rusage()
730+
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
731+
var status: CInt = 0
732+
let rc = wait4(pid, &status, WNOHANG, &usage)
733+
if rc > 0 {
734+
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
735+
} else if rc == 0 {
736+
return nil // Child still running
737+
}
738+
#elseif os(Linux) || os(Android)
721739
var siginfo = siginfo_t()
722-
if waitid(idtype, id, &siginfo, flags) != -1 {
723-
return siginfo
724-
} else if errno != EINTR {
740+
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED | WNOHANG, &usage)
741+
if rc != -1 {
742+
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
743+
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
744+
return nil
745+
}
746+
return (TerminationStatus(siginfo), ResourceUsage(usage))
747+
}
748+
#endif
749+
// rc == -1: either EINTR (retry) or a real error
750+
if errno != EINTR {
725751
throw Errno(rawValue: errno)
726752
}
727753
}
@@ -740,6 +766,20 @@ internal extension TerminationStatus {
740766
}
741767
}
742768

769+
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
770+
internal extension TerminationStatus {
771+
init(waitStatus: CInt) {
772+
if _was_process_exited(waitStatus) != 0 {
773+
self = .exited(CInt(_get_exit_code(waitStatus)))
774+
} else if _was_process_signaled(waitStatus) != 0 {
775+
self = .signaled(CInt(_get_signal_code(waitStatus)))
776+
} else {
777+
fatalError("Unexpected wait status: \(waitStatus)")
778+
}
779+
}
780+
}
781+
#endif
782+
743783
#if os(OpenBSD) || os(Linux) || os(Android)
744784
internal extension siginfo_t {
745785
var si_status: Int32 {

Sources/Subprocess/Platforms/Subprocess+Windows.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
610610
@Sendable
611611
internal func monitorProcessTermination(
612612
for processIdentifier: ProcessIdentifier
613-
) async throws(SubprocessError) -> TerminationStatus {
613+
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
614614
// Once the continuation resumes, it will need to unregister the wait, so
615615
// yield the wait handle back to the calling scope.
616616
var waitHandle: HANDLE?
@@ -655,14 +655,17 @@ internal func monitorProcessTermination(
655655
}
656656
}
657657

658+
// Collect resource usage while the process handle is still valid
659+
let resourceUsage = ResourceUsage(processHandle: processIdentifier.processDescriptor)
660+
658661
var status: DWORD = 0
659662
guard GetExitCodeProcess(processIdentifier.processDescriptor, &status) else {
660663
throw SubprocessError.failedToMonitor(
661664
withUnderlyingError: .init(rawValue: GetLastError())
662665
)
663666
}
664667

665-
return .exited(status)
668+
return (.exited(status), resourceUsage)
666669
}
667670

668671
// MARK: - Subprocess Control

0 commit comments

Comments
 (0)