From 006121c2eec609341c08bfd74d558542a72874ea Mon Sep 17 00:00:00 2001 From: Jake Petroules Date: Mon, 16 Mar 2026 23:43:22 -0700 Subject: [PATCH] Add ResourceUsage to report subprocess CPU time and memory consumption Introduce a public ResourceUsage struct that exposes userTime, systemTime (as Duration), and maxRSS (in bytes) for every terminated subprocess. An ExecutionSummary protocol provides common access to terminationStatus and resourceUsage; the public ExecutionResult conforms to it, and the internal ExecutionOutcome carries the same resourceUsage value. On Unix, resource data is collected via wait4 (macOS, FreeBSD, OpenBSD) or the Linux kernel's 5-argument waitid syscall (Linux, Android), which populates a rusage struct alongside the termination status. A linux_waitid C shim is added because glibc and musl only expose the 4-parameter POSIX waitid that omits rusage. The raw rusage is available as a public property on non-Windows platforms, with maxRSS normalized from KiB to bytes on Linux, Android, FreeBSD, and OpenBSD (Darwin already reports bytes). On Windows, GetProcessTimes provides CPU time (converted from FILETIME 100ns units) and GetProcessMemoryInfo provides PeakWorkingSetSize (maxRSS). This functionality is particularly necessary as part of SwiftSubprocess because collecting rusage information from a terminated subprocess requires the ability to run code when a process has changed state from executing to zombie, but before its pid has been reaped - something not possible with the current Subprocess API. Further, it is notoriously difficult to collect this information across all OSes for an arbitrary PID anyway, at least in a way that doesn't simultaneously reap the pid. User time, system time, and maxRSS are some of the most common metrics typically extracted from getrusage. Exposing the raw rusage struct provides access to the rest, and on Windows, callers can use DuplicateHandle to get a process descriptor that can outlive Subprocess's control of the process and collect any additional metrics from the process when it is known to be in a terminated state. 
--- Sources/Subprocess/API.swift | 1 + Sources/Subprocess/Configuration.swift | 3 +- .../Platforms/Subprocess+Unix.swift | 79 ++++++++-- .../Platforms/Subprocess+Windows.swift | 7 +- Sources/Subprocess/Result.swift | 145 +++++++++++++++++- .../_SubprocessCShims/include/process_shims.h | 3 + Sources/_SubprocessCShims/process_shims.c | 6 + .../ProcessMonitoringTests.swift | 23 ++- 8 files changed, 236 insertions(+), 31 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 8906345b..937a8694 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -378,6 +378,7 @@ public func run< return ExecutionResult( processIdentifier: result.value.processIdentifier, terminationStatus: result.terminationStatus, + resourceUsage: result.resourceUsage, closureOutput: result.value.closureResult, standardOutput: result.value.output, standardError: result.value.error diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index c9bb4a76..218d1532 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -162,7 +162,7 @@ public struct Configuration: Sendable { // stale entry and reject its registrations. 
AsyncIO.shared.cleanup(processIdentifier: processIdentifier) - let terminationStatus = try reapProcess(with: processIdentifier) + let (terminationStatus, resourceUsage) = try reapProcess(with: processIdentifier) if let monitorError { throw monitorError @@ -170,6 +170,7 @@ public struct Configuration: Sendable { return try ExecutionOutcome( terminationStatus: terminationStatus, + resourceUsage: resourceUsage, value: result.get() ) } onCleanup: { diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 6ef928f8..f75c8a36 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -694,14 +694,13 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible @Sendable internal func reapProcess( with processIdentifier: ProcessIdentifier -) throws(SubprocessError) -> TerminationStatus { +) throws(SubprocessError) -> (TerminationStatus, ResourceUsage) { do throws(Errno) { // On some platforms, the process exit notification (in particular NOTE_EXIT from kqueue) // may be delivered slightly before the process becomes reapable, // so we must call waitid without WNOHANG to avoid a narrow possibility of a race condition. // If waitid does block, it won't do so for very long at all. - let status = try processIdentifier.blockingReap() - return status + return try processIdentifier.blockingReap() } catch { let subprocessError: SubprocessError = .failedToMonitor(withUnderlyingError: error) throw subprocessError @@ -711,12 +710,12 @@ internal func reapProcess( extension ProcessIdentifier { /// Reaps the zombie for the exited process. This function may block. 
@available(*, noasync) - internal func blockingReap() throws(Errno) -> TerminationStatus { + internal func blockingReap() throws(Errno) -> (TerminationStatus, ResourceUsage) { try _blockingReap(pid: value) } /// Reaps the zombie for the exited process, or returns `nil` if the process is still running. This function will not block. - internal func reap() throws(Errno) -> TerminationStatus? { + internal func reap() throws(Errno) -> (TerminationStatus, ResourceUsage)? { try _reap(pid: value) } @@ -730,18 +729,55 @@ extension ProcessIdentifier { } @available(*, noasync) -internal func _blockingReap(pid: pid_t) throws(Errno) -> TerminationStatus { - let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED) - return TerminationStatus(siginfo) +internal func _blockingReap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage) { + while true { + var usage = rusage() + #if os(macOS) || os(FreeBSD) || os(OpenBSD) + var status: CInt = 0 + let rc = wait4(pid, &status, 0, &usage) + #elseif os(Linux) || os(Android) + var siginfo = siginfo_t() + let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED, &usage) + #endif + if rc >= 0 { + #if os(macOS) || os(FreeBSD) || os(OpenBSD) + return (TerminationStatus(waitStatus: status), ResourceUsage(usage)) + #elseif os(Linux) || os(Android) + return (TerminationStatus(siginfo), ResourceUsage(usage)) + #endif + } else if errno != EINTR { + throw Errno(rawValue: errno) + } + } } -internal func _reap(pid: pid_t) throws(Errno) -> TerminationStatus? { - let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED | WNOHANG) - // If si_pid and si_signo are both 0, the child is still running since we used WNOHANG - if siginfo.si_pid == 0 && siginfo.si_signo == 0 { - return nil +internal func _reap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage)? 
{ + while true { + var usage = rusage() + #if os(macOS) || os(FreeBSD) || os(OpenBSD) + var status: CInt = 0 + let rc = wait4(pid, &status, WNOHANG, &usage) + if rc > 0 { + return (TerminationStatus(waitStatus: status), ResourceUsage(usage)) + } else if rc == 0 { + return nil // Child still running + } + #elseif os(Linux) || os(Android) + var siginfo = siginfo_t() + let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED | WNOHANG, &usage) + if rc != -1 { + // If si_pid and si_signo are both 0, the child is still running since we used WNOHANG + if siginfo.si_pid == 0 && siginfo.si_signo == 0 { + return nil + } + return (TerminationStatus(siginfo), ResourceUsage(usage)) + } + #endif + // rc == -1: either EINTR (retry) or a real error + if errno != EINTR { + throw Errno(rawValue: errno) + } } - return TerminationStatus(siginfo) } internal func _peekIfExited(pid: pid_t) throws(Errno) -> Bool { @@ -775,6 +811,21 @@ internal extension TerminationStatus { } } +#if os(macOS) || os(FreeBSD) || os(OpenBSD) +internal extension TerminationStatus { + init(waitStatus: CInt) { + switch (_was_process_exited(waitStatus) != 0, _was_process_signaled(waitStatus) != 0) { + case (true, false): + self = .exited(CInt(_get_exit_code(waitStatus))) + case (false, true): + self = .signaled(CInt(_get_signal_code(waitStatus))) + case (true, true), (false, false): + fatalError("Unexpected wait status: \(waitStatus)") + } + } +} +#endif + #if os(OpenBSD) || os(Linux) || os(Android) internal extension siginfo_t { var si_status: Int32 { diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 5e5eda52..3ed28615 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -575,10 +575,13 @@ internal func waitForProcessTermination( @Sendable internal func reapProcess( with processIdentifier: ProcessIdentifier -) throws(SubprocessError) -> TerminationStatus { 
+) throws(SubprocessError) -> (TerminationStatus, ResourceUsage) { // Windows keeps the exit code reachable through the process HANDLE // until the HANDLE is closed, so there is no zombie to reap. We just // need to read the exit code via `GetExitCodeProcess`. + // Collect resource usage while the process handle is still valid + let resourceUsage = ResourceUsage(processHandle: processIdentifier.processDescriptor) + var status: DWORD = 0 guard GetExitCodeProcess(processIdentifier.processDescriptor, &status) else { throw SubprocessError.failedToMonitor( @@ -586,7 +589,7 @@ internal func reapProcess( ) } - return .exited(status) + return (.exited(status), resourceUsage) } // MARK: - Subprocess Control diff --git a/Sources/Subprocess/Result.swift b/Sources/Subprocess/Result.swift index 77be17a7..0286c750 100644 --- a/Sources/Subprocess/Result.swift +++ b/Sources/Subprocess/Result.swift @@ -15,6 +15,115 @@ import System import SystemPackage #endif +#if canImport(Darwin) +public import Darwin +#elseif canImport(Glibc) +public import Glibc +#elseif canImport(Musl) +public import Musl +#elseif canImport(Android) +public import Android +#elseif canImport(WinSDK) +import WinSDK +#endif + +// MARK: - ResourceUsage + +/// Resource usage information for a terminated subprocess. +public struct ResourceUsage: Sendable, Hashable { + /// The total amount of time spent executing in user mode. + public let userTime: Duration + /// The total amount of time spent executing in kernel mode. + public let systemTime: Duration + /// The peak resident set size (maximum memory used), in bytes. + public let maxRSS: Int + + #if !os(Windows) + /// The underlying POSIX resource usage information. 
+ public let rusage: rusage + #endif +} + +extension ResourceUsage { + #if os(Windows) + internal init(processHandle: HANDLE) { + var creationTime = FILETIME() + var exitTime = FILETIME() + var kernelTime = FILETIME() + var userFileTime = FILETIME() + + if GetProcessTimes( + processHandle, + &creationTime, + &exitTime, + &kernelTime, + &userFileTime + ) { + self.userTime = Duration(userFileTime) + self.systemTime = Duration(kernelTime) + } else { + self.userTime = .zero + self.systemTime = .zero + } + + var memInfo = PROCESS_MEMORY_COUNTERS() + memInfo.cb = DWORD(MemoryLayout.size) + if K32GetProcessMemoryInfo( + processHandle, + &memInfo, + DWORD(MemoryLayout.size) + ) { + self.maxRSS = Int(memInfo.PeakWorkingSetSize) + } else { + self.maxRSS = 0 + } + } + #else + internal init(_ usage: rusage) { + self.userTime = Duration( + secondsComponent: Int64(usage.ru_utime.tv_sec), + attosecondsComponent: Int64(usage.ru_utime.tv_usec) * 1_000_000_000_000 + ) + self.systemTime = Duration( + secondsComponent: Int64(usage.ru_stime.tv_sec), + attosecondsComponent: Int64(usage.ru_stime.tv_usec) * 1_000_000_000_000 + ) + #if canImport(Darwin) + self.maxRSS = Int(usage.ru_maxrss) // bytes on Darwin + #elseif os(Linux) || os(Android) || os(FreeBSD) || os(OpenBSD) + self.maxRSS = Int(usage.ru_maxrss) * 1024 // KiB to bytes (Linux, FreeBSD, OpenBSD, NetBSD) + #else + #error("ru_maxrss unit scaling not defined for this platform") + #endif + self.rusage = usage + } + #endif +} + +#if os(Windows) +extension Duration { + fileprivate init(_ ft: FILETIME) { + let hundredNanos = UInt64(ft.dwHighDateTime) << 32 | UInt64(ft.dwLowDateTime) + let seconds = Int64(hundredNanos / 10_000_000) + let remainder = Int64(hundredNanos % 10_000_000) + self = Duration( + secondsComponent: seconds, + attosecondsComponent: remainder * 100_000_000_000 + ) + } +} +#endif + +// MARK: - ExecutionSummary Protocol + +/// Protocol providing common properties for subprocess execution results. 
+public protocol ExecutionSummary: Sendable { + /// The termination status of the child process. + var terminationStatus: TerminationStatus { get } + /// The resource usage of the terminated child process. + var resourceUsage: ResourceUsage { get } +} + // MARK: - Result /// The result of running a subprocess, including the closure's return value, @@ -42,6 +151,8 @@ public struct ExecutionResult< public let standardOutput: Output.OutputType /// The collected standard error of the subprocess. public let standardError: Error.OutputType + /// The resource usage of the terminated child process. + public let resourceUsage: ResourceUsage /// The value returned by the body closure passed to `run`. public let closureOutput: ClosureResult @@ -49,20 +160,45 @@ public struct ExecutionResult< internal init( processIdentifier: ProcessIdentifier, terminationStatus: TerminationStatus, + resourceUsage: ResourceUsage, closureOutput: ClosureResult, standardOutput: Output.OutputType, standardError: Error.OutputType ) { self.processIdentifier = processIdentifier self.terminationStatus = terminationStatus + self.resourceUsage = resourceUsage self.closureOutput = closureOutput self.standardOutput = standardOutput self.standardError = standardError } } +// MARK: - rusage Conformances +#if !os(Windows) +extension rusage: @retroactive Equatable { + public static func == (lhs: rusage, rhs: rusage) -> Bool { + withUnsafeBytes(of: lhs) { lhsBytes in + withUnsafeBytes(of: rhs) { rhsBytes in + lhsBytes.elementsEqual(rhsBytes) + } + } + } +} + +extension rusage: @retroactive Hashable { + public func hash(into hasher: inout Hasher) { + withUnsafeBytes(of: self) { bytes in + hasher.combine(bytes: bytes) + } + } +} +#endif + // MARK: - ExecutionResult Conformances +extension ExecutionResult: ExecutionSummary {} + extension ExecutionResult: Equatable where Output.OutputType: Equatable, Error.OutputType: Equatable, ClosureResult: Equatable {} extension ExecutionResult: Hashable where 
Output.OutputType: Hashable, Error.OutputType: Hashable, ClosureResult: Hashable {} @@ -74,6 +210,7 @@ extension ExecutionResult: CustomStringConvertible where Output.OutputType: Cust ExecutionResult( processIdentifier: \(self.processIdentifier), terminationStatus: \(self.terminationStatus.description), + resourceUsage: \(self.resourceUsage), closureOutput: \(String(describing: self.closureOutput)), standardOutput: \(self.standardOutput.description) standardError: \(self.standardError.description) @@ -90,6 +227,7 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD ExecutionResult( processIdentifier: \(self.processIdentifier), terminationStatus: \(self.terminationStatus.debugDescription), + resourceUsage: \(self.resourceUsage), closureOutput: \(String(describing: self.closureOutput)), standardOutput: \(self.standardOutput.debugDescription) standardError: \(self.standardError.debugDescription) @@ -105,11 +243,14 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD internal struct ExecutionOutcome: Sendable { /// The termination status of the child process. internal let terminationStatus: TerminationStatus + /// The resource usage of the terminated child process. + internal let resourceUsage: ResourceUsage /// The value returned by the closure passed to the `run` method. 
internal let value: Result - internal init(terminationStatus: TerminationStatus, value: Result) { + internal init(terminationStatus: TerminationStatus, resourceUsage: ResourceUsage, value: Result) { self.terminationStatus = terminationStatus + self.resourceUsage = resourceUsage self.value = value } } @@ -124,6 +265,7 @@ extension ExecutionOutcome: CustomStringConvertible where Result: CustomStringCo return """ ExecutionOutcome( terminationStatus: \(self.terminationStatus.description), + resourceUsage: \(self.resourceUsage), value: \(self.value.description) ) """ @@ -136,6 +278,7 @@ extension ExecutionOutcome: CustomDebugStringConvertible where Result: CustomDeb return """ ExecutionOutcome( terminationStatus: \(self.terminationStatus.debugDescription), + resourceUsage: \(self.resourceUsage), value: \(self.value.debugDescription) ) """ diff --git a/Sources/_SubprocessCShims/include/process_shims.h b/Sources/_SubprocessCShims/include/process_shims.h index ac076d84..5cec4294 100644 --- a/Sources/_SubprocessCShims/include/process_shims.h +++ b/Sources/_SubprocessCShims/include/process_shims.h @@ -16,6 +16,7 @@ #if !TARGET_OS_WINDOWS #include +#include #include #if _POSIX_SPAWN @@ -114,6 +115,8 @@ int _shims_snprintf( #if TARGET_OS_LINUX int _pidfd_open(pid_t pid); +int linux_waitid(idtype_t idtype, id_t id, siginfo_t * _Nonnull infop, int options, struct rusage * _Nonnull rusage); + // P_PIDFD is only defined on Linux Kernel 5.4 and above // Define our value if it's not available #ifndef P_PIDFD diff --git a/Sources/_SubprocessCShims/process_shims.c b/Sources/_SubprocessCShims/process_shims.c index 406ff3fe..68c95585 100644 --- a/Sources/_SubprocessCShims/process_shims.c +++ b/Sources/_SubprocessCShims/process_shims.c @@ -296,6 +296,12 @@ int _pidfd_send_signal(int pidfd, int signal) { return syscall(SYS_pidfd_send_signal, pidfd, signal, NULL, 0); } +// glibc/musl only expose the 4-parameter POSIX waitid variant. 
+// The Linux kernel's waitid syscall accepts a 5th parameter: struct rusage. +int linux_waitid(idtype_t idtype, id_t id, siginfo_t *infop, int options, struct rusage *rusage) { + return syscall(SYS_waitid, idtype, id, infop, options, rusage); +} + // SYS_clone3 is only defined on Linux Kernel 5.3 and above // Define our dummy value if it's not available (as is the case with Musl libc) #ifndef SYS_clone3 diff --git a/Tests/SubprocessTests/ProcessMonitoringTests.swift b/Tests/SubprocessTests/ProcessMonitoringTests.swift index e3291c4e..0b0e5beb 100644 --- a/Tests/SubprocessTests/ProcessMonitoringTests.swift +++ b/Tests/SubprocessTests/ProcessMonitoringTests.swift @@ -126,10 +126,9 @@ extension SubprocessProcessMonitoringTests { @Test func testNormalExit() async throws { let config = self.immediateExitProcess(withExitCode: 0) try await withSpawnedExecution(config: config) { execution in - let monitorResult = try await monitorProcessTermination( + let (monitorResult, _) = try await monitorProcessTermination( for: execution.processIdentifier ) - #expect(monitorResult.isSuccess) } } @@ -137,10 +136,9 @@ extension SubprocessProcessMonitoringTests { @Test func testExitCode() async throws { let config = self.immediateExitProcess(withExitCode: 42) try await withSpawnedExecution(config: config) { execution in - let monitorResult = try await monitorProcessTermination( + let (monitorResult, _) = try await monitorProcessTermination( for: execution.processIdentifier ) - #expect(monitorResult == .exited(42)) } } @@ -155,7 +153,7 @@ extension SubprocessProcessMonitoringTests { // Send signal to process try execution.send(signal: .terminate) - let result = try await monitorProcessTermination( + let (result, _) = try await monitorProcessTermination( for: execution.processIdentifier ) #expect(result == .signaled(SIGTERM)) @@ -185,7 +183,7 @@ extension SubprocessProcessMonitoringTests { ) #endif // Now make sure monitorProcessTermination() can still get the correct result - let 
monitorResult = try await monitorProcessTermination( + let (monitorResult, _) = try await monitorProcessTermination( for: execution.processIdentifier ) #expect(monitorResult == TerminationStatus.exited(0)) @@ -195,10 +193,9 @@ extension SubprocessProcessMonitoringTests { @Test func testCanMonitorLongRunningProcess() async throws { let config = self.longRunningProcess(withTimeOutSeconds: 1) try await withSpawnedExecution(config: config) { execution in - let monitorResult = try await monitorProcessTermination( + let (monitorResult, _) = try await monitorProcessTermination( for: execution.processIdentifier ) - #expect(monitorResult.isSuccess) } } @@ -237,7 +234,7 @@ extension SubprocessProcessMonitoringTests { try await withSpawnedExecution(config: child1) { child1Execution in try await withSpawnedExecution(config: child2) { child2Execution in // Monitor child2, but make sure we don't reap child1's status - let status = try await monitorProcessTermination( + let (status, _) = try await monitorProcessTermination( for: child2Execution.processIdentifier ) #expect(status.isSuccess) @@ -285,7 +282,7 @@ extension SubprocessProcessMonitoringTests { ) try await withSpawnedExecution(config: config) { execution in - let monitorResult = try await monitorProcessTermination( + let (monitorResult, _) = try await monitorProcessTermination( for: execution.processIdentifier ) #expect(monitorResult.isSuccess) @@ -325,7 +322,7 @@ extension SubprocessProcessMonitoringTests { try await group.waitForAll() } // Every waiter resumed without error; reap the process once. 
- let status = try reapProcess(with: execution.processIdentifier) + let (status, _) = try reapProcess(with: execution.processIdentifier) #expect(status.isSuccess) } } @@ -355,7 +352,7 @@ extension SubprocessProcessMonitoringTests { try await withThrowingTaskGroup { group in for pid in spawnedProcesses { group.addTask { - let status = try await monitorProcessTermination(for: pid) + let (status, _) = try await monitorProcessTermination(for: pid) #expect(status.isSuccess) } } @@ -365,7 +362,7 @@ extension SubprocessProcessMonitoringTests { } } -internal func monitorProcessTermination(for processIdentifier: ProcessIdentifier) async throws -> TerminationStatus { +internal func monitorProcessTermination(for processIdentifier: ProcessIdentifier) async throws -> (TerminationStatus, ResourceUsage) { try await waitForProcessTermination(for: processIdentifier) return try reapProcess(with: processIdentifier) }