Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Subprocess/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

add_library(Subprocess
Execution.swift
ExecutionContext.swift
Buffer.swift
Error.swift
Teardown.swift
Expand Down
264 changes: 182 additions & 82 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,36 @@ public struct Configuration: Sendable {
) async throws -> Result
)
) async throws -> ExecutionOutcome<Result> {
var spawnResults = try await self.spawn(
withInput: input,
outputPipe: output,
errorPipe: error
)
// Built before spawning so it can be attached to spawn failures, where
// no resolved values exist yet.
let baseContext = ExecutionContext(self)

var spawnResults: SpawnResult
#if os(Windows)
// `spawn` is `throws(SubprocessError)` on Windows, so `error` is
// already typed; an `as SubprocessError` pattern is a redundant cast
// that crashes the 6.2 toolchain's SIL ownership verifier. Elsewhere
// `spawn` is untyped `throws`, where the cast is meaningful.
do {
spawnResults = try await self.spawn(
withInput: input,
outputPipe: output,
errorPipe: error
)
} catch {
throw error.withExecutionContext(baseContext)
}
#else
do {
spawnResults = try await self.spawn(
withInput: input,
outputPipe: output,
errorPipe: error
)
} catch let error as SubprocessError {
throw error.withExecutionContext(baseContext)
}
#endif

let processIdentifier = spawnResults.processIdentifier

Expand All @@ -95,92 +120,105 @@ public struct Configuration: Sendable {
processIdentifier.close()
}

return try await withAsyncTaskCleanupHandler { () throws -> ExecutionOutcome<Result> in
// The counter coordinates a race between two finishers: the body
// closure and the process-termination monitor. Whichever side
// increments first observes a value of `1` and owns the response;
// the other side sees `2` and stays out of the way.
//
// If the monitor wins, the child has exited while the body is
// still reading or writing. The body might be blocked on a pipe
// that an inherited grandchild keeps open, so the monitor calls
// `cancelAsyncIO` to unblock it. If the body wins, the I/O
// already finished cleanly and no cancellation is needed.
let taskFinishFlag = AtomicCounter()

let (result, monitorError) = try await withThrowingTaskGroup(
of: SubprocessError?.self,
returning: (Swift.Result<Result, any Swift.Error>, SubprocessError?).self
) { group in
group.addTask {
do throws(SubprocessError) {
try await waitForProcessTermination(for: processIdentifier)
if taskFinishFlag.addOne() == 1 {
// The body closure hasn't finished but the child
// process has terminated. Cancel all active
// AsyncIO now.
try AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
// Spawn succeeded, so resolved values are available. This context is
// attached to any error surfacing after spawn.
let executionContext = ExecutionContext(
self,
resolvedExecutable: spawnResults.resolvedExecutable,
resolvedEnvironment: self.environment.resolvedValues(),
resolvedWorkingDirectory: self.workingDirectory ?? currentWorkingDirectory()
)

do {
return try await withAsyncTaskCleanupHandler { () throws -> ExecutionOutcome<Result> in
// The counter coordinates a race between two finishers: the body
// closure and the process-termination monitor. Whichever side
// increments first observes a value of `1` and owns the response;
// the other side sees `2` and stays out of the way.
//
// If the monitor wins, the child has exited while the body is
// still reading or writing. The body might be blocked on a pipe
// that an inherited grandchild keeps open, so the monitor calls
// `cancelAsyncIO` to unblock it. If the body wins, the I/O
// already finished cleanly and no cancellation is needed.
let taskFinishFlag = AtomicCounter()

let (result, monitorError) = try await withThrowingTaskGroup(
of: SubprocessError?.self,
returning: (Swift.Result<Result, any Swift.Error>, SubprocessError?).self
) { group in
group.addTask {
do throws(SubprocessError) {
try await waitForProcessTermination(for: processIdentifier)
if taskFinishFlag.addOne() == 1 {
// The body closure hasn't finished but the child
// process has terminated. Cancel all active
// AsyncIO now.
try AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
}
return nil
} catch {
try? AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
return error
}
return nil
} catch {
try? AsyncIO.shared.cancelAsyncIO(for: processIdentifier)
return error
}
}

let inputIO = spawnResults.inputWriteEnd()
let outputIO = spawnResults.outputReadEnd()
let errorIO = spawnResults.errorReadEnd()
let inputIO = spawnResults.inputWriteEnd()
let outputIO = spawnResults.outputReadEnd()
let errorIO = spawnResults.errorReadEnd()

let result: Swift.Result<Result, any Swift.Error>
do {
// Body runs in the same isolation.
let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO)
taskFinishFlag.addOne()
result = .success(bodyResult)
} catch {
let execution = Execution<Input, Output, Error>(
processIdentifier: processIdentifier,
inputWriter: nil,
outputStream: nil,
errorStream: nil
)
// Attempt to terminate the child process when the body throws
await execution.teardown(using: self.platformOptions.teardownSequence)
result = .failure(error)
let result: Swift.Result<Result, any Swift.Error>
do {
// Body runs in the same isolation.
let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO)
taskFinishFlag.addOne()
result = .success(bodyResult)
} catch {
let execution = Execution<Input, Output, Error>(
processIdentifier: processIdentifier,
inputWriter: nil,
outputStream: nil,
errorStream: nil
)
// Attempt to terminate the child process when the body throws
await execution.teardown(using: self.platformOptions.teardownSequence)
result = .failure(error)
}

// Wait for the monitor child task to finish.
let monitorError = try await group.next() ?? nil
return (result, monitorError)
}

// Wait for the monitor child task to finish.
let monitorError = try await group.next() ?? nil
return (result, monitorError)
}
// Drop the cancellation marker before reaping the zombie. After
// `reapProcess` runs the kernel can immediately reuse this PID,
// so the marker must be gone first; otherwise a concurrent
// `run()` that happens to inherit the same PID would see the
// stale entry and reject its registrations.
AsyncIO.shared.cleanup(processIdentifier: processIdentifier)

// Drop the cancellation marker before reaping the zombie. After
// `reapProcess` runs the kernel can immediately reuse this PID,
// so the marker must be gone first; otherwise a concurrent
// `run()` that happens to inherit the same PID would see the
// stale entry and reject its registrations.
AsyncIO.shared.cleanup(processIdentifier: processIdentifier)
let terminationStatus = try reapProcess(with: processIdentifier)

let terminationStatus = try reapProcess(with: processIdentifier)
if let monitorError {
throw monitorError
}

if let monitorError {
throw monitorError
return try ExecutionOutcome(
terminationStatus: terminationStatus,
value: result.get()
)
} onCleanup: {
let execution = Execution<Input, Output, Error>(
processIdentifier: processIdentifier,
inputWriter: nil,
outputStream: nil,
errorStream: nil
)
// Attempt to terminate the child process.
await execution.teardown(using: self.platformOptions.teardownSequence)
}

return try ExecutionOutcome(
terminationStatus: terminationStatus,
value: result.get()
)
} onCleanup: {
let execution = Execution<Input, Output, Error>(
processIdentifier: processIdentifier,
inputWriter: nil,
outputStream: nil,
errorStream: nil
)
// Attempt to terminate the child process.
await execution.teardown(using: self.platformOptions.teardownSequence)
} catch let error as SubprocessError {
throw error.withExecutionContext(executionContext)
}
}
}
Expand Down Expand Up @@ -573,6 +611,40 @@ extension Environment: CustomStringConvertible, CustomDebugStringConvertible {
return results
}
}

/// The concrete environment variables passed to the child process, or
/// `nil` if they cannot be represented as a `[Key: String]` map.
///
/// This mirrors the environment built by the platform spawn path: for
/// `.inherit`, the parent's current values with the configured overrides
/// applied (a `nil` override removes the key); for `.custom`, those values
/// directly; for raw-byte environments, `nil`. On Windows, `PATH` is
/// injected from the parent when absent, matching the spawn path's
/// DLL-resolution behavior.
internal func resolvedValues() -> [Environment.Key: String]? {
var values: [Environment.Key: String]
switch self.config {
case .custom(let customValues):
values = customValues
case .inherit(let overrides):
values = Self.currentEnvironmentValues()
for (key, override) in overrides {
values[key] = override
}
#if !os(Windows)
case .rawBytes:
return nil
#endif
}
#if os(Windows)
if values[.path] == nil,
let parentPath = Self.currentEnvironmentValues()[.path]
{
values[.path] = parentPath
}
#endif
return values
}
}

extension Environment.Key {
Expand Down Expand Up @@ -697,6 +769,7 @@ extension Configuration {
/// via `SpawnResult` to perform actual reads
internal struct SpawnResult: ~Copyable {
let processIdentifier: ProcessIdentifier
let resolvedExecutable: FilePath?
var _inputWriteEnd: IODescriptor?
var _outputReadEnd: IODescriptor?
var _errorReadEnd: IODescriptor?
Expand All @@ -705,12 +778,14 @@ extension Configuration {
processIdentifier: ProcessIdentifier,
inputWriteEnd: consuming IODescriptor?,
outputReadEnd: consuming IODescriptor?,
errorReadEnd: consuming IODescriptor?
errorReadEnd: consuming IODescriptor?,
resolvedExecutable: FilePath? = nil
) {
self.processIdentifier = processIdentifier
self._inputWriteEnd = consume inputWriteEnd
self._outputReadEnd = consume outputReadEnd
self._errorReadEnd = consume errorReadEnd
self.resolvedExecutable = resolvedExecutable
}

mutating func inputWriteEnd() -> IODescriptor? {
Expand Down Expand Up @@ -1346,3 +1421,28 @@ extension StringProtocol {
}
}
}

/// Best-effort lookup of the parent process's current working directory.
///
/// Returns `nil` on failure; never throws, since it only feeds diagnostics.
internal func currentWorkingDirectory() -> FilePath? {
#if os(Windows)
let length = GetCurrentDirectoryW(0, nil)
guard length > 0 else { return nil }
let path = try? fillNullTerminatedWideStringBuffer(
initialSize: length,
maxSize: DWORD(Int16.max)
) {
GetCurrentDirectoryW(DWORD($0.count), $0.baseAddress)
}
guard let path else { return nil }
return FilePath(path)
#else
return withUnsafeTemporaryAllocation(of: CChar.self, capacity: Int(PATH_MAX)) { buffer -> FilePath? in
guard getcwd(buffer.baseAddress, buffer.count) != nil else {
return nil
}
return FilePath(platformString: buffer.baseAddress!)
}
#endif
}
Loading
Loading