Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public func run<
return ExecutionResult(
processIdentifier: result.value.processIdentifier,
terminationStatus: result.terminationStatus,
resourceUsage: result.resourceUsage,
closureOutput: result.value.closureResult,
standardOutput: result.value.output,
standardError: result.value.error
Expand Down
3 changes: 2 additions & 1 deletion Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,15 @@ public struct Configuration: Sendable {
// stale entry and reject its registrations.
AsyncIO.shared.cleanup(processIdentifier: processIdentifier)

let terminationStatus = try reapProcess(with: processIdentifier)
let (terminationStatus, resourceUsage) = try reapProcess(with: processIdentifier)

if let monitorError {
throw monitorError
}

return try ExecutionOutcome(
terminationStatus: terminationStatus,
resourceUsage: resourceUsage,
value: result.get()
)
} onCleanup: {
Expand Down
79 changes: 65 additions & 14 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -694,14 +694,13 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
@Sendable
internal func reapProcess(
with processIdentifier: ProcessIdentifier
) throws(SubprocessError) -> TerminationStatus {
) throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
do throws(Errno) {
// On some platforms, the process exit notification (in particular NOTE_EXIT from kqueue)
// may be delivered slightly before the process becomes reapable,
// so we must call waitid without WNOHANG to avoid a narrow possibility of a race condition.
// If waitid does block, it won't do so for very long at all.
let status = try processIdentifier.blockingReap()
return status
return try processIdentifier.blockingReap()
} catch {
let subprocessError: SubprocessError = .failedToMonitor(withUnderlyingError: error)
throw subprocessError
Expand All @@ -711,12 +710,12 @@ internal func reapProcess(
extension ProcessIdentifier {
/// Reaps the zombie for the exited process. This function may block.
@available(*, noasync)
internal func blockingReap() throws(Errno) -> TerminationStatus {
internal func blockingReap() throws(Errno) -> (TerminationStatus, ResourceUsage) {
try _blockingReap(pid: value)
}

/// Reaps the zombie for the exited process, or returns `nil` if the process is still running. This function will not block.
internal func reap() throws(Errno) -> TerminationStatus? {
internal func reap() throws(Errno) -> (TerminationStatus, ResourceUsage)? {
try _reap(pid: value)
}

Expand All @@ -730,18 +729,55 @@ extension ProcessIdentifier {
}

@available(*, noasync)
internal func _blockingReap(pid: pid_t) throws(Errno) -> TerminationStatus {
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED)
return TerminationStatus(siginfo)
internal func _blockingReap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage) {
while true {
var usage = rusage()
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
var status: CInt = 0
let rc = wait4(pid, &status, 0, &usage)
#elseif os(Linux) || os(Android)
var siginfo = siginfo_t()
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED, &usage)
#endif
if rc >= 0 {
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
#elseif os(Linux) || os(Android)
return (TerminationStatus(siginfo), ResourceUsage(usage))
#endif
} else if errno != EINTR {
throw Errno(rawValue: errno)
}
}
}

internal func _reap(pid: pid_t) throws(Errno) -> TerminationStatus? {
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED | WNOHANG)
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
return nil
internal func _reap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage)? {
while true {
var usage = rusage()
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
var status: CInt = 0
let rc = wait4(pid, &status, WNOHANG, &usage)
if rc > 0 {
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
} else if rc == 0 {
return nil // Child still running
}
#elseif os(Linux) || os(Android)
var siginfo = siginfo_t()
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED | WNOHANG, &usage)
if rc != -1 {
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
return nil
}
return (TerminationStatus(siginfo), ResourceUsage(usage))
}
#endif
// rc == -1: either EINTR (retry) or a real error
if errno != EINTR {
throw Errno(rawValue: errno)
}
}
return TerminationStatus(siginfo)
}

internal func _peekIfExited(pid: pid_t) throws(Errno) -> Bool {
Expand Down Expand Up @@ -775,6 +811,21 @@ internal extension TerminationStatus {
}
}

#if os(macOS) || os(FreeBSD) || os(OpenBSD)
internal extension TerminationStatus {
init(waitStatus: CInt) {
switch (_was_process_exited(waitStatus) != 0, _was_process_signaled(waitStatus) != 0) {
case (true, false):
self = .exited(CInt(_get_exit_code(waitStatus)))
case (false, true):
self = .signaled(CInt(_get_signal_code(waitStatus)))
case (true, true), (false, false):
fatalError("Unexpected wait status: \(waitStatus)")
}
}
}
#endif

#if os(OpenBSD) || os(Linux) || os(Android)
internal extension siginfo_t {
var si_status: Int32 {
Expand Down
7 changes: 5 additions & 2 deletions Sources/Subprocess/Platforms/Subprocess+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -575,18 +575,21 @@ internal func waitForProcessTermination(
@Sendable
internal func reapProcess(
with processIdentifier: ProcessIdentifier
) throws(SubprocessError) -> TerminationStatus {
) throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
// Windows keeps the exit code reachable through the process HANDLE
// until the HANDLE is closed, so there is no zombie to reap. We just
// need to read the exit code via `GetExitCodeProcess`.
// Collect resource usage while the process handle is still valid
let resourceUsage = ResourceUsage(processHandle: processIdentifier.processDescriptor)

var status: DWORD = 0
guard GetExitCodeProcess(processIdentifier.processDescriptor, &status) else {
throw SubprocessError.failedToMonitor(
withUnderlyingError: .init(rawValue: GetLastError())
)
}

return .exited(status)
return (.exited(status), resourceUsage)
}

// MARK: - Subprocess Control
Expand Down
145 changes: 144 additions & 1 deletion Sources/Subprocess/Result.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,115 @@ import System
import SystemPackage
#endif

#if canImport(Darwin)
public import Darwin
#elseif canImport(Glibc)
public import Glibc
#elseif canImport(Musl)
public import Musl
#elseif canImport(Android)
public import Android
#elseif canImport(WinSDK)
import WinSDK
#endif

// MARK: - ResourceUsage

/// Resource usage information for a terminated subprocess.
public struct ResourceUsage: Sendable, Hashable {
/// The total amount of time spent executing in user mode.
public let userTime: Duration
/// The total amount of time spent executing in kernel mode.
public let systemTime: Duration
/// The peak resident set size (maximum memory used), in bytes.
public let maxRSS: Int

#if !os(Windows)
/// The underlying POSIX resource usage information.
public let rusage: rusage
#endif
}

extension ResourceUsage {
#if os(Windows)
internal init(processHandle: HANDLE) {
var creationTime = FILETIME()
var exitTime = FILETIME()
var kernelTime = FILETIME()
var userFileTime = FILETIME()

if GetProcessTimes(
processHandle,
&creationTime,
&exitTime,
&kernelTime,
&userFileTime
) {
self.userTime = Duration(userFileTime)
self.systemTime = Duration(kernelTime)
} else {
self.userTime = .zero
self.systemTime = .zero
}

var memInfo = PROCESS_MEMORY_COUNTERS()
memInfo.cb = DWORD(MemoryLayout<PROCESS_MEMORY_COUNTERS>.size)
if K32GetProcessMemoryInfo(
processHandle,
&memInfo,
DWORD(MemoryLayout<PROCESS_MEMORY_COUNTERS>.size)
) {
self.maxRSS = Int(memInfo.PeakWorkingSetSize)
} else {
self.maxRSS = 0
}
}
#else
internal init(_ usage: rusage) {
self.userTime = Duration(
secondsComponent: Int64(usage.ru_utime.tv_sec),
attosecondsComponent: Int64(usage.ru_utime.tv_usec) * 1_000_000_000_000
)
self.systemTime = Duration(
secondsComponent: Int64(usage.ru_stime.tv_sec),
attosecondsComponent: Int64(usage.ru_stime.tv_usec) * 1_000_000_000_000
)
#if canImport(Darwin)
self.maxRSS = Int(usage.ru_maxrss) // bytes on Darwin
#elseif os(Linux) || os(Android) || os(FreeBSD) || os(OpenBSD)
self.maxRSS = Int(usage.ru_maxrss) * 1024 // KiB to bytes (Linux, FreeBSD, OpenBSD, NetBSD)
#else
#error("ru_maxrss unit scaling not defined for this platform")
#endif
self.rusage = usage
}
#endif
}

#if os(Windows)
extension Duration {
fileprivate init(_ ft: FILETIME) {
let hundredNanos = UInt64(ft.dwHighDateTime) << 32 | UInt64(ft.dwLowDateTime)
let seconds = Int64(hundredNanos / 10_000_000)
let remainder = Int64(hundredNanos % 10_000_000)
self = Duration(
secondsComponent: seconds,
attosecondsComponent: remainder * 100_000_000_000
)
}
}
#endif

// MARK: - ExecutionSummary Protocol

/// Protocol providing common properties for subprocess execution results.
public protocol ExecutionSummary: Sendable {
/// The termination status of the child process.
var terminationStatus: TerminationStatus { get }
/// The resource usage of the terminated child process.
var resourceUsage: ResourceUsage { get }
}

// MARK: - Result

/// The result of running a subprocess, including the closure's return value,
Expand Down Expand Up @@ -42,27 +151,54 @@ public struct ExecutionResult<
public let standardOutput: Output.OutputType
/// The collected standard error of the subprocess.
public let standardError: Error.OutputType
/// The resource usage of the terminated child process.
public let resourceUsage: ResourceUsage

/// The value returned by the body closure passed to `run`.
public let closureOutput: ClosureResult

internal init(
processIdentifier: ProcessIdentifier,
terminationStatus: TerminationStatus,
resourceUsage: ResourceUsage,
closureOutput: ClosureResult,
standardOutput: Output.OutputType,
standardError: Error.OutputType
) {
self.processIdentifier = processIdentifier
self.terminationStatus = terminationStatus
self.resourceUsage = resourceUsage
self.closureOutput = closureOutput
self.standardOutput = standardOutput
self.standardError = standardError
}
}

// MARK: - rusage Conformances
#if !os(Windows)
extension rusage: @retroactive Equatable {
public static func == (lhs: rusage, rhs: rusage) -> Bool {
withUnsafeBytes(of: lhs) { lhsBytes in
withUnsafeBytes(of: rhs) { rhsBytes in
lhsBytes.elementsEqual(rhsBytes)
}
}
}
}

extension rusage: @retroactive Hashable {
public func hash(into hasher: inout Hasher) {
withUnsafeBytes(of: self) { bytes in
hasher.combine(bytes: bytes)
}
}
}
#endif

// MARK: - ExecutionResult Conformances

extension ExecutionResult: ExecutionSummary {}

extension ExecutionResult: Equatable where Output.OutputType: Equatable, Error.OutputType: Equatable, ClosureResult: Equatable {}

extension ExecutionResult: Hashable where Output.OutputType: Hashable, Error.OutputType: Hashable, ClosureResult: Hashable {}
Expand All @@ -74,6 +210,7 @@ extension ExecutionResult: CustomStringConvertible where Output.OutputType: Cust
ExecutionResult(
processIdentifier: \(self.processIdentifier),
terminationStatus: \(self.terminationStatus.description),
resourceUsage: \(self.resourceUsage),
closureOutput: \(String(describing: self.closureOutput)),
standardOutput: \(self.standardOutput.description)
standardError: \(self.standardError.description)
Expand All @@ -90,6 +227,7 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD
ExecutionResult(
processIdentifier: \(self.processIdentifier),
terminationStatus: \(self.terminationStatus.debugDescription),
resourceUsage: \(self.resourceUsage),
closureOutput: \(String(describing: self.closureOutput)),
standardOutput: \(self.standardOutput.debugDescription)
standardError: \(self.standardError.debugDescription)
Expand All @@ -105,11 +243,14 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD
internal struct ExecutionOutcome<Result: Sendable>: Sendable {
/// The termination status of the child process.
internal let terminationStatus: TerminationStatus
/// The resource usage of the terminated child process.
internal let resourceUsage: ResourceUsage
/// The value returned by the closure passed to the `run` method.
internal let value: Result

internal init(terminationStatus: TerminationStatus, value: Result) {
internal init(terminationStatus: TerminationStatus, resourceUsage: ResourceUsage, value: Result) {
self.terminationStatus = terminationStatus
self.resourceUsage = resourceUsage
self.value = value
}
}
Expand All @@ -124,6 +265,7 @@ extension ExecutionOutcome: CustomStringConvertible where Result: CustomStringCo
return """
ExecutionOutcome(
terminationStatus: \(self.terminationStatus.description),
resourceUsage: \(self.resourceUsage),
value: \(self.value.description)
)
"""
Expand All @@ -136,6 +278,7 @@ extension ExecutionOutcome: CustomDebugStringConvertible where Result: CustomDeb
return """
ExecutionOutcome(
terminationStatus: \(self.terminationStatus.debugDescription),
resourceUsage: \(self.resourceUsage),
value: \(self.value.debugDescription)
)
"""
Expand Down
Loading
Loading