From 2004769325a006019347fc6fe2bc3ad1e3d3976a Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Mon, 2 Mar 2026 17:29:09 +0000 Subject: [PATCH 1/8] Update AsyncProcess Test cleanups and bug fixes. --- Sources/AsyncProcess/NIOAsyncPipeWriter.swift | 2 + .../ProcessExecutor+Convenience.swift | 28 + Sources/AsyncProcess/ProcessExecutor.swift | 105 ++- .../AsyncProcess/ProcessImplementation.swift | 102 +++ .../AsyncProcess/SpecialAsyncSequences.swift | 45 ++ Sources/CProcessSpawnSync/include/ps-api.h | 3 + Sources/CProcessSpawnSync/spawner.c | 74 +- Sources/ProcessSpawnSync/ProcessSpawner.swift | 31 +- Sources/sap-exec/MainFile.swift | 55 ++ .../AsyncProcessTests/IntegrationTests.swift | 702 +++++++++++------- 10 files changed, 820 insertions(+), 327 deletions(-) create mode 100644 Sources/AsyncProcess/ProcessImplementation.swift create mode 100644 Sources/AsyncProcess/SpecialAsyncSequences.swift create mode 100644 Sources/sap-exec/MainFile.swift diff --git a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift index c36d173..4b4d2e6 100644 --- a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift +++ b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift @@ -42,6 +42,8 @@ struct NIOAsyncPipeWriter where Chunks.Element } onCancel: { channel.close(promise: nil) } + // Re-raise CancellationError if we got cancelled and the cancellation handler ate the error. + try Task.checkCancellation() } finally: { _ in do { try await channel.close() diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift index ea09a1f..838d614 100644 --- a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -509,4 +509,32 @@ extension ProcessExecutor { logger: logger ) } + + /// Runs the process by _replacing_ the current executable. + /// + /// This is achieved by calling `execve` with just the signal mask being cleared. + /// + /// - note: Contrary to normal executions, we inherit everything else (file descriptors, working directory, ...). + public static func _runReplacingCurrentProcess( + executable: String, + _ arguments: [String], + environment: [String: String] = [:], + logger: Logger = ProcessExecutor.disableLogging + ) async throws { + let spawnOptions = SpawnOptions.suitableForProcessReplacement + let exe = ProcessExecutor( + group: .singletonMultiThreadedEventLoopGroup, + executable: executable, + arguments, + environment: environment, + spawnOptions: spawnOptions, + standardInput: EOFSequence(), + standardOutput: .inherit, + standardError: .inherit, + teardownSequence: [], + logger: logger + ) + _ = try await exe.run() + fatalError("this should be unreachable: success replacing process and yet we're running!?") + } } diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift index b826108..65ceadd 100644 --- a/Sources/AsyncProcess/ProcessExecutor.swift +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -18,18 +18,6 @@ import ProcessSpawnSync @_exported import struct SystemPackage.FileDescriptor -#if os(Linux) || ASYNC_PROCESS_FORCE_PS_PROCESS - // Foundation.Process is too buggy on Linux - // - // - Foundation.Process on Linux throws error Error Domain=NSCocoaErrorDomain Code=256 "(null)" if executable not found - // https://github.com/swiftlang/swift-corelibs-foundation/issues/4810 - // - Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) - // https://github.com/swiftlang/swift-corelibs-foundation/issues/4795 - // - Foundation.Process on Linux seems to inherit the Process.run()-calling thread's signal mask, even SIGTERM blocked - // https://github.com/swiftlang/swift-corelibs-foundation/issues/4772 - typealias Process = PSProcess -#endif - #if os(iOS) || os(tvOS) || os(watchOS) // Process & fork/exec unavailable #error("Process and fork() unavailable") @@ -205,6 +193,7 @@ public final actor ProcessExecutor { private let teardownSequence: TeardownSequence private let spawnOptions: SpawnOptions + @available(*, deprecated, message: "do not use") public static var isBackedByPSProcess: Bool { return Process.self == PSProcess.self } @@ -215,18 +204,12 @@ public final actor ProcessExecutor { /// The default and safe option is `true` but on Linux this incurs a performance penalty unless you have /// a new-enough Glibc & Linux that support the /// [`close_range`](https://man7.org/linux/man-pages/man2/close_range.2.html) syscall. - /// - /// On Darwin, `false` is only supported if you compile with `-Xswiftc -DASYNC_PROCESS_FORCE_PS_PROCESS`, - /// otherwise it will be silently ignored (and the other file descriptors will be closed anyway.). public var closeOtherFileDescriptors: Bool /// Change the working directory of the child process to this directory. public var changedWorkingDirectory: Optional /// Should we call `setsid()` in the child process? - /// - /// Not supported on Darwin, unless you compile with `-Xswiftc -DASYNC_PROCESS_FORCE_PS_PROCESS`, otherwise - /// it will be silently ignored (and no new session will be created). public var createNewSession: Bool /// If an `AsyncSequence` to write is provided to `standardInput`, should we ignore all write errors? @@ -248,6 +231,8 @@ public final actor ProcessExecutor { /// not return from `run(WithExtendedInfo)` until we streamed our full standard input (or it failed). public var cancelStandardInputWritingWhenProcessExits: Bool + internal var replaceProcess: Bool + /// Safe & sensible default options. public static var `default`: SpawnOptions { return SpawnOptions( @@ -256,9 +241,39 @@ public final actor ProcessExecutor { createNewSession: false, ignoreStdinStreamWriteErrors: false, cancelProcessOnStandardInputWriteFailure: true, - cancelStandardInputWritingWhenProcessExits: true + cancelStandardInputWritingWhenProcessExits: true, + replaceProcess: false + ) + } + + internal static var suitableForProcessReplacement: SpawnOptions { + return SpawnOptions( + closeOtherFileDescriptors: false, + changedWorkingDirectory: nil, + createNewSession: false, + ignoreStdinStreamWriteErrors: false, + cancelProcessOnStandardInputWriteFailure: false, + cancelStandardInputWritingWhenProcessExits: false, + replaceProcess: true ) } + + internal var requiresPSProcess: Bool { + #if os(Linux) || ASYNC_PROCESS_FORCE_PS_PROCESS + // Foundation.Process is too buggy on Linux + // + // - Foundation.Process on Linux throws error Error Domain=NSCocoaErrorDomain Code=256 "(null)" if executable not found + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4810 + // - Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4795 + // - Foundation.Process on Linux seems to inherit the Process.run()-calling thread's signal mask, even SIGTERM blocked + // https://github.com/swiftlang/swift-corelibs-foundation/issues/4772 + return true + #else + let requiresPSProcess = !self.closeOtherFileDescriptors || self.createNewSession || self.replaceProcess + return requiresPSProcess + #endif + } } public struct OSError: Error & Sendable & Hashable { @@ -436,7 +451,13 @@ public final actor ProcessExecutor { self.teardownSequence = teardownSequence self.spawnOptions = spawnOptions - self.standardInputPipe = StandardInput.self == EOFSequence.self ? .devNull : .ownedHandle(Pipe()) + if StandardInput.self == EOFSequence.self { + self.standardInputPipe = .devNull + } else if StandardInput.self == InheritStandardInput.self { + self.standardInputPipe = .inherit + } else { + self.standardInputPipe = .ownedHandle(Pipe()) + } let standardOutputWriteHandle: ChildFileState let standardErrorWriteHandle: ChildFileState @@ -566,7 +587,7 @@ public final actor ProcessExecutor { ) } - private func teardown(process: Process) async { + private func teardown(process: any ProcessImplementation) async { let childPid = self.processPid.load(ordering: .sequentiallyConsistent) guard childPid != 0 else { self.logger.warning( @@ -599,6 +620,7 @@ public final actor ProcessExecutor { return .processHasExited } } + logger.info("sending signal to process", metadata: ["signal": "\(signal)"]) try? await self.sendSignal(signal) return await group.next()! } @@ -659,7 +681,7 @@ public final actor ProcessExecutor { try await self.setupStandardOutput() try await self.setupStandardError() - let p = Process() + let p: any ProcessImplementation = Process.initialiseProcessImpl(spawnOptions: self.spawnOptions) #if canImport(Darwin) if #available(macOS 13.0, *) { p.executableURL = URL(filePath: self.executable) @@ -669,55 +691,53 @@ public final actor ProcessExecutor { #else p.executableURL = URL(fileURLWithPath: self.executable) #endif - p.arguments = self.arguments - p.environment = self.environment - p.standardInput = nil - func isTypeOf(_ existing: Existing, type: New.Type) -> New? { - return existing as? New - } + p.setArguments(self.arguments) + p.setEnvironment(self.environment) if let newCWD = self.spawnOptions.changedWorkingDirectory { p.currentDirectoryURL = URL.init(fileURLWithPath: newCWD) } - if let pSpecial = isTypeOf(p, type: PSProcess.self) { - assert(Self.isBackedByPSProcess) - pSpecial._closeOtherFileDescriptors = self.spawnOptions.closeOtherFileDescriptors - pSpecial._createNewSession = self.spawnOptions.createNewSession - } else { - assert(!Self.isBackedByPSProcess) + if !self.spawnOptions.closeOtherFileDescriptors { + (p as! PSProcess)._closeOtherFileDescriptors = false + } + if self.spawnOptions.createNewSession { + (p as! PSProcess)._createNewSession = true + } + if self.spawnOptions.replaceProcess { + (p as! PSProcess)._replaceProcess = true } switch self.standardInputPipe { case .inherit: () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance case .devNull: - p.standardInput = nil // Yes, setting to `nil` means `/dev/null` + p.setStandardInput(nil) // Yes, setting to `nil` means `/dev/null` case .ownedHandle(let pipe), .unownedHandle(let pipe): - p.standardInput = pipe + p.setStandardInput(pipe) } switch self.standardOutputWriteHandle { case .inherit: () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance case .devNull: - p.standardOutput = nil // Yes, setting to `nil` means `/dev/null` + p.setStandardOutput(nil) // Yes, setting to `nil` means `/dev/null` case .ownedHandle(let fileHandle), .unownedHandle(let fileHandle): - p.standardOutput = fileHandle + p.setStandardOutput(fileHandle) } switch self.standardErrorWriteHandle { case .inherit: () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance case .devNull: - p.standardError = nil // Yes, setting to `nil` means `/dev/null` + p.setStandardError(nil) // Yes, setting to `nil` means `/dev/null` case .ownedHandle(let fileHandle), .unownedHandle(let fileHandle): - p.standardError = fileHandle + p.setStandardError(fileHandle) } let (terminationStreamConsumer, terminationStreamProducer) = AsyncStream.justMakeIt( elementType: ProcessExitReason.self ) - p.terminationHandler = { p in + p.setTerminationHandler { p in let pProcessID = p.processIdentifier var terminationPidExchange: (exchanged: Bool, original: pid_t) = (false, -1) while !terminationPidExchange.exchanged { @@ -897,6 +917,9 @@ public final actor ProcessExecutor { ignoreWriteErrors: self.spawnOptions.ignoreStdinStreamWriteErrors, eventLoop: self.group.any() ) + } catch is CancellationError { + // The CancellationError comes from us cancelling this task when the process exits, and is expected. Don't surface an error in this case. + return .stdinWriter(nil) } catch { return .stdinWriter(error) } diff --git a/Sources/AsyncProcess/ProcessImplementation.swift b/Sources/AsyncProcess/ProcessImplementation.swift new file mode 100644 index 0000000..836562b --- /dev/null +++ b/Sources/AsyncProcess/ProcessImplementation.swift @@ -0,0 +1,102 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Foundation +import ProcessSpawnSync + +#if os(iOS) || os(tvOS) || os(watchOS) + #error("Process and fork() unavailable") +#endif + +internal protocol ProcessImplementation: AnyObject & Sendable { + var executableURL: URL? { get set } + var currentDirectoryURL: URL? { get set } + var launchPath: String? { get set } + var processIdentifier: pid_t { get } + var terminationReason: Process.TerminationReason { get } + var terminationStatus: CInt { get } + var isRunning: Bool { get } + + func run() throws + func setArguments(_ arguments: [String]) + func setEnvironment(_ environment: [String: String]) + func setStandardInput(_ standardInput: Pipe?) + func setStandardOutput(_ standardOutput: FileHandle?) + func setStandardError(_ standardError: FileHandle?) + func setTerminationHandler(_ handler: @Sendable @escaping (any ProcessImplementation) -> Void) +} + +extension ProcessImplementation { + static func initialiseProcessImpl(spawnOptions: ProcessExecutor.SpawnOptions) -> any ProcessImplementation { + if spawnOptions.requiresPSProcess { + return PSProcess() + } else { + return Process() + } + } +} + +extension Process: ProcessImplementation { + func setArguments(_ arguments: [String]) { + self.arguments = arguments + } + + func setEnvironment(_ environment: [String: String]) { + self.environment = environment + } + + func setStandardInput(_ standardInput: Pipe?) { + self.standardInput = standardInput + } + + func setStandardOutput(_ standardOutput: FileHandle?) { + self.standardOutput = standardOutput + } + + func setStandardError(_ standardError: FileHandle?) { + self.standardError = standardError + } + + func setTerminationHandler(_ handler: @Sendable @escaping (any ProcessImplementation) -> Void) { + self.terminationHandler = { process in + handler(process) + } + } +} + +extension PSProcess: ProcessImplementation { + func setArguments(_ arguments: [String]) { + self.arguments = arguments + } + + func setEnvironment(_ environment: [String: String]) { + self.environment = environment + } + + func setStandardInput(_ standardInput: Pipe?) { + self.standardInput = standardInput + } + + func setStandardOutput(_ standardOutput: FileHandle?) { + self.standardOutput = standardOutput + } + + func setStandardError(_ standardError: FileHandle?) { + self.standardError = standardError + } + + func setTerminationHandler(_ handler: @Sendable @escaping (any ProcessImplementation) -> Void) { + self.terminationHandler = { process in + handler(process) + } + } +} diff --git a/Sources/AsyncProcess/SpecialAsyncSequences.swift b/Sources/AsyncProcess/SpecialAsyncSequences.swift new file mode 100644 index 0000000..1251f2b --- /dev/null +++ b/Sources/AsyncProcess/SpecialAsyncSequences.swift @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import NIO + +public struct EOFSequence: AsyncSequence & Sendable { + public typealias Element = Element + + public struct AsyncIterator: AsyncIteratorProtocol { + public mutating func next() async throws -> Element? { + return nil + } + } + + public init(of type: Element.Type = Element.self) {} + + public func makeAsyncIterator() -> AsyncIterator { + return AsyncIterator() + } +} + +public struct InheritStandardInput: AsyncSequence & Sendable { + public typealias Element = ByteBuffer + + public struct AsyncIterator: AsyncIteratorProtocol { + public mutating func next() async throws -> Element? { + return nil + } + } + + public init() {} + + public func makeAsyncIterator() -> AsyncIterator { + return AsyncIterator() + } +} diff --git a/Sources/CProcessSpawnSync/include/ps-api.h b/Sources/CProcessSpawnSync/include/ps-api.h index 271e6f3..f9056f7 100644 --- a/Sources/CProcessSpawnSync/include/ps-api.h +++ b/Sources/CProcessSpawnSync/include/ps-api.h @@ -29,6 +29,7 @@ typedef enum ps_error_kind_s { PS_ERROR_KIND_DUP = 10, PS_ERROR_KIND_SIGMASK_THREAD = 11, PS_ERROR_KIND_FAILED_CHILD_WAITPID = 12, + PS_ERROR_KIND_PTHREAD_CREATE = 13, } ps_error_kind; typedef struct ps_error_s { @@ -64,6 +65,8 @@ typedef struct ps_process_configuration_s { bool psc_new_session; bool psc_close_other_fds; + + bool psc_replace_process; } ps_process_configuration; pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error); diff --git a/Sources/CProcessSpawnSync/spawner.c b/Sources/CProcessSpawnSync/spawner.c index a69791b..0f31203 100644 --- a/Sources/CProcessSpawnSync/spawner.c +++ b/Sources/CProcessSpawnSync/spawner.c @@ -72,6 +72,36 @@ struct child_scratch { int duplicated_fd; }; +struct sap_thread_data { + ps_process_configuration *config; + int result; + int error_code; +}; + +static void *execve_with_clean_signals(void *arg) { + struct sap_thread_data *data = (struct sap_thread_data *)arg; + + // Reset signal mask for this new thread + sigset_t sigset; + sigemptyset(&sigset); + int err = pthread_sigmask(SIG_SETMASK, &sigset, NULL); + if (err) { + data->error_code = errno; + data->result = -1; + return NULL; + } + + // Call execve with clean signal mask + err = execve(data->config->psc_path, + data->config->psc_argv, + data->config->psc_env); + + // If we get here, execve failed + data->error_code = errno; + data->result = -1; + return NULL; +} + static void setup_and_execve_child(ps_process_configuration *config, int error_pipe, struct child_scratch *scratch) { ps_error error = { 0 }; sigset_t sigset = { 0 }; @@ -205,7 +235,49 @@ pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error) { sigset_t old_sigmask; struct child_scratch *scratch = NULL; int error_pid_fd[2] = { -1, -1 }; - int err = pipe(error_pid_fd); + int err = -1; + + if (config->psc_replace_process) { + ps_precondition(!config->psc_close_other_fds); + ps_precondition(config->psc_cwd == NULL); + ps_precondition(config->psc_fd_setup_count == 0); + ps_precondition(!config->psc_new_session); + + pthread_t thread; + pthread_attr_t attr; + struct sap_thread_data thread_data = { + .config = config, + .result = 0, + .error_code = 0 + }; + + // Create thread with default attributes + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + err = pthread_create(&thread, &attr, execve_with_clean_signals, &thread_data); + pthread_attr_destroy(&attr); + + if (err) { + if (out_error) { + errno = err; + *out_error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_PTHREAD_CREATE); + } + return -1; + } + + // Wait for thread to either execve or fail + pthread_join(thread, NULL); + + // If we get here, execve failed (otherwise process would be replaced) + if (out_error) { + errno = thread_data.error_code; + *out_error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_EXECVE); + } + return -1; + } + + err = pipe(error_pid_fd); if (err) { ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_PIPE); if (out_error) { diff --git a/Sources/ProcessSpawnSync/ProcessSpawner.swift b/Sources/ProcessSpawnSync/ProcessSpawner.swift index d57fd03..905b38d 100644 --- a/Sources/ProcessSpawnSync/ProcessSpawner.swift +++ b/Sources/ProcessSpawnSync/ProcessSpawner.swift @@ -97,6 +97,7 @@ public final class PSProcess: Sendable { var currentDirectoryURL: URL? = nil var closeOtherFileDescriptors: Bool = true var createNewSession: Bool = false + var replaceProcess: Bool = false private(set) var pidWhenRunning: pid_t? = nil var standardInput: OptionallySet = .notSet var standardOutput: OptionallySet = .notSet @@ -217,11 +218,14 @@ public final class PSProcess: Sendable { stderrFDForChild = handle.fileDescriptor } - let psSetup: [ps_fd_setup] = [ - ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdinFDForChild), - ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdoutFDForChild), - ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stderrFDForChild), - ] + let psSetup: [ps_fd_setup] = + self._replaceProcess + ? [] + : [ + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdinFDForChild), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdoutFDForChild), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stderrFDForChild), + ] let (pid, error) = psSetup.withUnsafeBufferPointer { psSetupPtr -> (pid_t, ps_error) in var config = ps_process_configuration_s( psc_path: path, @@ -231,7 +235,8 @@ public final class PSProcess: Sendable { psc_fd_setup_count: CInt(psSetupPtr.count), psc_fd_setup_instructions: psSetupPtr.baseAddress!, psc_new_session: state.createNewSession, - psc_close_other_fds: state.closeOtherFileDescriptors + psc_close_other_fds: state.closeOtherFileDescriptors, + psc_replace_process: state.replaceProcess ) var error = ps_error() let pid = ps_spawn_process(&config, &error) @@ -477,6 +482,20 @@ public final class PSProcess: Sendable { } } } + + public var _replaceProcess: Bool { + get { + self.state.withLockedValue { state in + return state.replaceProcess + } + } + set { + self.state.withLockedValue { state in + state.replaceProcess = newValue + return + } + } + } } func copyOwnedCTypedString(_ string: String) -> UnsafeMutablePointer { diff --git a/Sources/sap-exec/MainFile.swift b/Sources/sap-exec/MainFile.swift new file mode 100644 index 0000000..75ea964 --- /dev/null +++ b/Sources/sap-exec/MainFile.swift @@ -0,0 +1,55 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import AsyncProcess +import Foundation + +#if canImport(Darwin) + import Darwin +#elseif canImport(Musl) + @preconcurrency import Musl +#elseif canImport(Glibc) + @preconcurrency import Glibc +#elseif canImport(WASILibc) + @preconcurrency import WASILibc +#elseif canImport(Bionic) + @preconcurrency import Bionic +#elseif canImport(Android) + @preconcurrency import Android +#else + #error("unknown libc, please fix") +#endif + +@main +struct SAPExec { + static func main() async throws { + let args = CommandLine.arguments.dropFirst() + + guard let executable = args.first else { + fputs("Usage: sap-exec [arguments...]\n", stderr) + exit(1) + } + + let executableArguments = Array(args.dropFirst()) + + do { + try await ProcessExecutor._runReplacingCurrentProcess( + executable: executable, + executableArguments, + environment: ProcessInfo.processInfo.environment + ) + } catch { + fputs("ERROR: \(error)\n", stderr) + exit(254) + } + } +} diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index 6e8c13e..631bb0f 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -38,6 +38,9 @@ final class IntegrationTests: XCTestCase { private var group: EventLoopGroup! private var logger: Logger! private var highestFD: CInt? + private var unusedDispatchSource: DispatchSourceSignal! + private var unusedTask: Task! + private var sapExecURL: URL! func testTheBasicsWork() async throws { let exe = ProcessExecutor( @@ -291,18 +294,18 @@ final class IntegrationTests: XCTestCase { } func testEnvironmentVariables() async throws { - let exe = ProcessExecutor( + let all = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", ["-c", "echo $MY_VAR"], + collectStandardOutput: true, + collectStandardError: true, environment: ["MY_VAR": "value of my var"], - standardInput: EOFSequence(), logger: self.logger ) - let all = try await exe.runGetAllOutput() XCTAssertEqual(.exit(0), all.exitReason) - XCTAssertEqual("value of my var\n", String(buffer: all.standardOutput)) - XCTAssertEqual("", String(buffer: all.standardError)) + XCTAssertEqual("value of my var\n", String(buffer: all.standardOutput!)) + XCTAssertEqual("", String(buffer: all.standardError!)) } func testSimplePipe() async throws { @@ -323,15 +326,16 @@ final class IntegrationTests: XCTestCase { group.addTask { [elg = self.group!, logger = self.logger!] in let echoOutput = await echo.standardOutput - let sed = ProcessExecutor( + let sed = try await ProcessExecutor.runCollectingOutput( group: elg, executable: "/usr/bin/tr", ["[:lower:]", "[:upper:]"], standardInput: echoOutput, + collectStandardOutput: true, + collectStandardError: true, logger: logger ) - let output = try await sed.runGetAllOutput() - XCTAssertEqual(String(buffer: output.standardOutput), "FOO\n") + XCTAssertEqual(String(buffer: sed.standardOutput!), "FOO\n") } try await group.waitForAll() } @@ -339,17 +343,18 @@ final class IntegrationTests: XCTestCase { func testStressTestVeryLittleOutput() async throws { for _ in 0..<128 { - let exe = ProcessExecutor( + let all = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", ["-c", "echo x; echo >&2 y;"], standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, logger: self.logger ) - let all = try await exe.runGetAllOutput() XCTAssertEqual(.exit(0), all.exitReason) - XCTAssertEqual("x\n", String(buffer: all.standardOutput)) - XCTAssertEqual("y\n", String(buffer: all.standardError)) + XCTAssertEqual("x\n", String(buffer: all.standardOutput!)) + XCTAssertEqual("y\n", String(buffer: all.standardError!)) } } @@ -1061,82 +1066,80 @@ final class IntegrationTests: XCTestCase { while try await outputIterator.next() != nil {} } - #if ASYNC_PROCESS_ENABLE_TESTS_WITH_PLATFORM_ASSUMPTIONS - func testCanDealWithRunawayChildProcesses() async throws { - self.logger = Logger(label: "x") - self.logger.logLevel = .info - let p = ProcessExecutor( - executable: "/bin/bash", - [ - "-c", - """ - set -e - /usr/bin/yes "Runaway process from \(#function), please file a swift-async-process bug." > /dev/null & - child_pid=$! - trap "echo >&2 'child: received signal, killing grand child ($child_pid)'; kill $child_pid" INT - echo "$$" # communicate our pid to our parent - echo "$child_pid" # communicate the child pid to our parent - exec >&- # close stdout - echo "child: waiting for grand child, pid: $child_pid" >&2 - wait - """, - ], - standardError: .inherit, - teardownSequence: [ - .sendSignal(SIGINT, allowedTimeToExitNS: 10_000_000_000) - ], - logger: self.logger - ) + func testCanDealWithRunawayChildProcesses() async throws { + self.logger = Logger(label: "x") + self.logger.logLevel = .info + let p = ProcessExecutor( + executable: "/bin/bash", + [ + "-c", + """ + set -e + /usr/bin/yes "Runaway process from \(#function), please file a swift-async-process bug." > /dev/null & + child_pid=$! + trap "echo >&2 'child: received signal, killing grand child ($child_pid)'; kill $child_pid" INT + echo "$$" # communicate our pid to our parent + echo "$child_pid" # communicate the child pid to our parent + exec >&- # close stdout + echo "child: waiting for grand child, pid: $child_pid" >&2 + wait + """, + ], + standardError: .inherit, + teardownSequence: [ + .sendSignal(SIGINT, allowedTimeToExitNS: 10_000_000_000) + ], + logger: self.logger + ) - try await withThrowingTaskGroup(of: (pid_t, pid_t)?.self) { group in - group.addTask { - let result = try await p.run() - XCTAssertEqual(.exit(128 + SIGINT), result) + try await withThrowingTaskGroup(of: (pid_t, pid_t)?.self) { group in + group.addTask { + let result = try await p.run() + XCTAssertEqual(.exit(128 + SIGINT), result) + return nil + } + + group.addTask { + let pidStrings = String(buffer: try await p.standardOutput.pullAllOfIt()).split(separator: "\n") + guard let childPID = pid_t((pidStrings.dropFirst(0).first ?? "n/a")) else { + XCTFail("couldn't get child's pid from \(pidStrings)") return nil } - - group.addTask { - let pidStrings = String(buffer: try await p.standardOutput.pullAllOfIt()).split(separator: "\n") - guard let childPID = pid_t((pidStrings.dropFirst(0).first ?? "n/a")) else { - XCTFail("couldn't get child's pid from \(pidStrings)") - return nil - } - guard let grandChildPID = pid_t((pidStrings.dropFirst(1).first ?? "n/a")) else { - XCTFail("couldn't get grand child's pid from \(pidStrings)") - return nil - } - return (childPID, grandChildPID) + guard let grandChildPID = pid_t((pidStrings.dropFirst(1).first ?? "n/a")) else { + XCTFail("couldn't get grand child's pid from \(pidStrings)") + return nil } + return (childPID, grandChildPID) + } - let maybePids = try await group.next()! - let (childPID, grandChildPID) = try XCTUnwrap(maybePids) - group.cancelAll() - try await group.waitForAll() - - // Let's check that the subprocess (/usr/bin/yes) of our subprocess (/bin/bash) is actually dead - // This is a tiny bit racy because the pid isn't immediately invalidated, so let's allow a few failures - for attempt in 1 ..< .max { - let killRet = kill(grandChildPID, 0) - let errnoCode = errno - if killRet == 0 && attempt < 10 { - logger.error("we expected kill to fail but it didn't. Attempt \(attempt), trying again...") - if attempt > 7 { - fputs("## lsof child:\n", stderr) - fputs(((try? await runLSOF(pid: childPID)) ?? "n/a") + "\n", stderr) - fputs("## lsof grand child:\n", stderr) - fputs(((try? await runLSOF(pid: grandChildPID)) ?? "n/a") + "\n", stderr) - fflush(stderr) - } - usleep(useconds_t(attempt) * 100_000) - continue + let maybePids = try await group.next()! + let (childPID, grandChildPID) = try XCTUnwrap(maybePids) + group.cancelAll() + try await group.waitForAll() + + // Let's check that the subprocess (/usr/bin/yes) of our subprocess (/bin/bash) is actually dead + // This is a tiny bit racy because the pid isn't immediately invalidated, so let's allow a few failures + for attempt in 1 ..< .max { + let killRet = kill(grandChildPID, 0) + let errnoCode = errno + if killRet == 0 && attempt < 10 { + logger.error("we expected kill to fail but it didn't. Attempt \(attempt), trying again...") + if attempt > 7 { + fputs("## lsof child:\n", stderr) + fputs(((try? await runLSOF(pid: childPID)) ?? "n/a") + "\n", stderr) + fputs("## lsof grand child:\n", stderr) + fputs(((try? await runLSOF(pid: grandChildPID)) ?? "n/a") + "\n", stderr) + fflush(stderr) } - XCTAssertEqual(-1, killRet, "\(blockingLSOF(pid: grandChildPID))") - XCTAssertEqual(ESRCH, errnoCode) - break + usleep(useconds_t(attempt) * 100_000) + continue } + XCTAssertEqual(-1, killRet, "\(blockingLSOF(pid: grandChildPID))") + XCTAssertEqual(ESRCH, errnoCode) + break } } - #endif + } func testShutdownSequenceWorks() async throws { let p = ProcessExecutor( @@ -1180,9 +1183,6 @@ final class IntegrationTests: XCTestCase { } func testCanInheritRandomFileDescriptors() async throws { - guard ProcessExecutor.isBackedByPSProcess else { - return // Foundation.Process does not support this - } var spawnOptions = ProcessExecutor.SpawnOptions.default spawnOptions.closeOtherFileDescriptors = false var pipeFDs: [Int32] = [-1, -1] @@ -1458,6 +1458,8 @@ final class IntegrationTests: XCTestCase { ].contains(error.errnoCode), "unexpected error: \(error)" ) + } catch let error as ChannelError { + XCTAssertEqual(.ioOnClosedChannel, error) } } @@ -1499,6 +1501,8 @@ final class IntegrationTests: XCTestCase { ].contains(error.errnoCode), "unexpected error: \(error)" ) + } catch let error as ChannelError { + XCTAssertEqual(.ioOnClosedChannel, error) } } @@ -1530,21 +1534,27 @@ final class IntegrationTests: XCTestCase { ) XCTAssertEqual(.exit(0), result.exitReason) // child exits by itself XCTAssertNotNil(result.standardInputWriteError) - XCTAssertEqual( - EPIPE, - (result.standardInputWriteError as? NIO.IOError).map { ioError in - if ioError.errnoCode == EBADF { - // Don't worry, not a real EBADF, just a NIO synthesised one - // https://github.com/apple/swift-nio/issues/3292 - // Let's fudge the error into a sensible one. - let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) - return ioError - } else { - return ioError - } - }?.errnoCode, - "\(result.standardInputWriteError.debugDescription)" - ) + if let ioError = result.standardInputWriteError as? NIO.IOError { + XCTAssertEqual( + EPIPE, + { + if ioError.errnoCode == EBADF { + // Don't worry, not a real EBADF, just a NIO synthesised one + // https://github.com/apple/swift-nio/issues/3292 + // Let's fudge the error into a sensible one. + let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) + return ioError + } else { + return ioError + } + }().errnoCode, + "\(result.standardInputWriteError.debugDescription)" + ) + } else if let channelError = result.standardInputWriteError as? ChannelError { + XCTAssertEqual(.ioOnClosedChannel, channelError) + } else { + XCTFail("unexpected error type: \(result.standardInputWriteError.debugDescription)") + } XCTAssertEqual("GO\n", String(buffer: result.standardOutput!)) XCTAssertEqual("", String(buffer: result.standardError!)) } @@ -1579,21 +1589,27 @@ final class IntegrationTests: XCTestCase { ) XCTAssertEqual(.signal(9), result.exitReason) // Child doesn't die by itself, so it'll be killed by our cancel XCTAssertNotNil(result.standardInputWriteError) - XCTAssertEqual( - EPIPE, - (result.standardInputWriteError as? NIO.IOError).map { ioError in - if ioError.errnoCode == EBADF { - // Don't worry, not a real EBADF, just a NIO synthesised one - // https://github.com/apple/swift-nio/issues/3292 - // Let's fudge the error into a sensible one. - let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) - return ioError - } else { - return ioError - } - }?.errnoCode, - "\(result.standardInputWriteError.debugDescription)" - ) + if let ioError = result.standardInputWriteError as? NIO.IOError { + XCTAssertEqual( + EPIPE, + { + if ioError.errnoCode == EBADF { + // Don't worry, not a real EBADF, just a NIO synthesised one + // https://github.com/apple/swift-nio/issues/3292 + // Let's fudge the error into a sensible one. + let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) + return ioError + } else { + return ioError + } + }().errnoCode, + "\(result.standardInputWriteError.debugDescription)" + ) + } else if let channelError = result.standardInputWriteError as? ChannelError { + XCTAssertEqual(.ioOnClosedChannel, channelError) + } else { + XCTFail("unexpected error type: \(result.standardInputWriteError.debugDescription)") + } XCTAssertEqual("GO\n", String(buffer: result.standardOutput!)) XCTAssertEqual("", String(buffer: result.standardError!)) } @@ -1680,6 +1696,8 @@ final class IntegrationTests: XCTestCase { ].contains(error.errnoCode), "unexpected error: \(error)" ) + } else if let error = error as? ChannelError { + XCTAssertEqual(.ioOnClosedChannel, error) } else { XCTFail("unexpected error: \(error)") } @@ -1872,102 +1890,128 @@ final class IntegrationTests: XCTestCase { } } - #if !os(Linux) // https://github.com/apple/swift-nio/issues/3294 - func testWeDoHangIfStandardInputWriterCouldStillWriteIfWeDisableCancellingInputWriterAfterExit() async throws { - // Here, we do the same thing as in testWeDoNotHangIfStandardInputRemainsOpenButProcessExits but to make matters - // worse, we're setting `spawnOptions.cancelStandardInputWritingWhenProcessExits = false` which means that we're - // not gonna return because the write will be hanging until we kill our long sleep. + func testWeDoHangIfStandardInputWriterCouldStillWriteIfWeDisableCancellingInputWriterAfterExit() async throws { + // Here, we do the same thing as in testWeDoNotHangIfStandardInputRemainsOpenButProcessExits but to make matters + // worse, we're setting `spawnOptions.cancelStandardInputWritingWhenProcessExits = false` which means that we're + // not gonna return because the write will be hanging until we kill our long sleep. - enum WhoReturned { - case processRun - case waiter - } + enum WhoReturned { + case processRun + case waiter + } - try await withThrowingTaskGroup(of: WhoReturned.self) { group in - let (stdinStream, stdinStreamProducer) = AsyncStream.makeStream(of: ByteBuffer.self) - var spawnOptions = ProcessExecutor.SpawnOptions.default - spawnOptions.cancelStandardInputWritingWhenProcessExits = false - let exe = ProcessExecutor( - executable: "/bin/sh", - [ - "-c", - #""" - # This construction attempts to emulate a simple `sleep 12345678 < /dev/null` but some shells (eg. dash) - # won't allow stdin inheritance for background processes... - exec 2>&- # close stderr - exec 2<&0 # duplicate stdin into fd 2 (so we can inherit it into sleep - - ( - exec 0<&2 # map the duplicated fd 2 as our stdin - exec 2>&- # close the duplicated fd2 - exec sleep 12345678 # sleep (this will now have the origin stdin as its stdin) - ) & # uber long sleep that will inherit our stdin pipe - exec 2>&- # close duplicated 2 - - read -r line - echo "$line" # write back the line - echo "$!" # write back the sleep - exec >&- - exit 0 - """#, - ], - spawnOptions: spawnOptions, - standardInput: stdinStream - ) - stdinStreamProducer.yield(ByteBuffer(string: "GO\n")) - stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 32 * 1024 * 1024)) + try await withThrowingTaskGroup(of: WhoReturned.self) { group in + let (stdinStream, stdinStreamProducer) = AsyncStream.makeStream(of: ByteBuffer.self) + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.cancelStandardInputWritingWhenProcessExits = false + let exe = ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + # This construction attempts to emulate a simple `sleep 12345678 < /dev/null` but some shells (eg. dash) + # won't allow stdin inheritance for background processes... + exec 2>&- # close stderr + exec 2<&0 # duplicate stdin into fd 2 (so we can inherit it into sleep - group.addTask { - let result = try await exe.runWithExtendedInfo() - XCTAssertEqual(.exit(0), result.exitReason) - XCTAssertNotNil(result.standardInputWriteError) + ( + exec 0<&2 # map the duplicated fd 2 as our stdin + exec 2>&- # close the duplicated fd2 + exec sleep 12345678 # sleep (this will now have the origin stdin as its stdin) + ) & # uber long sleep that will inherit our stdin pipe + exec 2>&- # close duplicated 2 + + read -r line + echo "$line" # write back the line + echo "$!" # write back the sleep + exec >&- + exit 0 + """#, + ], + spawnOptions: spawnOptions, + standardInput: stdinStream + ) + stdinStreamProducer.yield(ByteBuffer(string: "GO\n")) + stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 32 * 1024 * 1024)) + + group.addTask { + let result = try await exe.runWithExtendedInfo() + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertNotNil(result.standardInputWriteError) + if let ioError = result.standardInputWriteError as? NIO.IOError { XCTAssert( - [ - .some(EPIPE), - .some(EBADF), // don't worry, this is a NIO-synthesised (already closed) EBADF - ].contains(result.standardInputWriteError.flatMap { $0 as? NIO.IOError }.map { $0.errnoCode }), + [EPIPE, EBADF].contains(ioError.errnoCode), "unexpected error: \(result.standardInputWriteError.debugDescription)" ) - stdinStreamProducer.finish() - return .processRun + } else if let channelError = result.standardInputWriteError as? ChannelError { + XCTAssertEqual(.ioOnClosedChannel, channelError) + } else { + XCTFail("unexpected error type: \(result.standardInputWriteError.debugDescription)") } - var stdoutLines = await exe.standardOutput.splitIntoLines().makeAsyncIterator() - let lineGo = try await stdoutLines.next() - XCTAssertEqual(ByteBuffer(string: "GO"), lineGo) - let linePid = try await stdoutLines.next().map(String.init(buffer:)) - let sleepPid = try XCTUnwrap(linePid.flatMap { CInt($0) }) - self.logger.debug("found our sleep grand-child", metadata: ["pid": "\(sleepPid)"]) + stdinStreamProducer.finish() + return .processRun + } + var stdoutLines = await exe.standardOutput.splitIntoLines().makeAsyncIterator() + let lineGo = try await stdoutLines.next() + XCTAssertEqual(ByteBuffer(string: "GO"), lineGo) + let linePid = try await stdoutLines.next().map(String.init(buffer:)) + let sleepPid = try XCTUnwrap(linePid.flatMap { CInt($0) }) + self.logger.debug("found our sleep grand-child", metadata: ["pid": "\(sleepPid)"]) - group.addTask { - try? await Task.sleep(nanoseconds: 500_000_000) // Wait until we're confident that we're stuck - return .waiter - } + group.addTask { + try? await Task.sleep(nanoseconds: 500_000_000) // Wait until we're confident that we're stuck + return .waiter + } - // The situation we set up is the following - // - Our direct child process will have exited here - // - Our grand child (sleep 12345678) is still running and has the stdin pipe - // - We switched off cancelling the stdin writer when our child exits - // - We're stuck now ... - // - ... until our `.waiter` returns - // - When we kill the grand-child - // - Which then unblocks everything else + // The situation we set up is the following + // - Our direct child process will have exited here + // - Our grand child (sleep 12345678) is still running and has the stdin pipe + // - We switched off cancelling the stdin writer when our child exits + // - We're stuck now ... + // - ... until our `.waiter` returns + // - When we kill the grand-child + // - Which then unblocks everything else - let actualReturn1 = try await group.next()! - XCTAssertEqual(.waiter, actualReturn1) + let actualReturn1 = try await group.next()! + XCTAssertEqual(.waiter, actualReturn1) - let stderrBytes = try await Array(exe.standardError) - XCTAssertEqual([], stderrBytes, "\(stderrBytes.map { $0.hexDump(format: .plain(maxBytes: .max)) })") + let stderrBytes = try await Array(exe.standardError) + XCTAssertEqual([], stderrBytes, "\(stderrBytes.map { $0.hexDump(format: .plain(maxBytes: .max)) })") - let killRet = kill(sleepPid, SIGKILL) - XCTAssertEqual(0, killRet, "kill failed: \(errno)") + let killRet = kill(sleepPid, SIGKILL) + XCTAssertEqual(0, killRet, "kill failed: \(errno)") - stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 1 * 1024 * 1024)) + stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 1 * 1024 * 1024)) - let actualReturn2 = try await group.next()! - XCTAssertEqual(.processRun, actualReturn2) - } + let actualReturn2 = try await group.next()! + XCTAssertEqual(.processRun, actualReturn2) } - #endif + } + + func testWeDoNotThrowIfProcessExitsBeforeStandardInputWriter() async throws { + // The default is + // spawnOptions.cancelStandardInputWritingWhenProcessExits = true + // therefore, this should not hang and the input-writing task should get cancelled when the process exits. (Without throwing an error.) + + // We never send data or a close signal through the stream's continuation, so the input-writing task doesn't finish. (Until it gets cancelled when the process exits.) + let (inputStreamThatNeverFinishes, _) = AsyncStream.justMakeIt(elementType: ByteBuffer.self) + + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "exit 0"], + standardInput: inputStreamThatNeverFinishes, + standardOutput: .discard, + standardError: .discard, + logger: self.logger + ) + do { + let result = try await exe.run() + XCTAssertEqual(.exit(CInt(0)), result) + } catch { + XCTFail("Unexpected error in exe.run(): \(error)") + } + } func testTinyOutputConsumedAfterRun() async throws { let exe = ProcessExecutor( @@ -1997,37 +2041,180 @@ final class IntegrationTests: XCTestCase { XCTAssertEqual(.exit(0), result) } + func testRunReplacingCurrentProcessSimple() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: self.sapExecURL.path, + ["/bin/echo", "test"], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, + logger: self.logger + ) + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("test\n", String(buffer: result.standardOutput!)) + } + + func testRunReplacingCurrentProcessExecutableNotFound() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: self.sapExecURL.path, + ["/dev/null/nonexistent/executable"], + collectStandardOutput: true, + collectStandardError: true, + logger: self.logger + ) + + XCTAssertEqual(.exit(254), result.exitReason) + XCTAssertEqual("", String(buffer: result.standardOutput!)) + XCTAssertTrue(String(buffer: result.standardError!).hasPrefix("ERROR: ")) + } + + func testRunReplacingCurrentProcessInheritsStdio() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: self.sapExecURL.path, + ["/bin/sh", "-c", "echo stdout; echo stderr >&2"], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, + logger: self.logger + ) + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("stdout\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("stderr\n", String(buffer: result.standardError!)) + } + + func testRunReplacingCurrentProcessSignalHandling() async throws { + let startTime = DispatchTime.now().uptimeNanoseconds + + do { + let exe = ProcessExecutor( + group: self.group, + executable: self.sapExecURL.path, + ["/bin/sleep", "10000"], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .discard, + teardownSequence: [ + .sendSignal(SIGTERM, allowedTimeToExitNS: 10_000_000_000_000) + ], + logger: self.logger + ) + + async let _ = exe.run() + + try await Task.sleep(nanoseconds: 100_000_000) // 100ms + // Leaving scope will cancel the child task + } + + let elapsedSeconds = (DispatchTime.now().uptimeNanoseconds - startTime) / 1_000_000_000 + + // Should be terminated quickly (not wait for the huge timeout) + XCTAssertLessThan(elapsedSeconds, 20, "Process should be terminated within 20 seconds") + } + + func testInheritStandardInput() async throws { + let oldStdin = dup(STDIN_FILENO) + defer { + dup2(oldStdin, STDIN_FILENO) + close(oldStdin) + } + let pipe = Pipe() + dup2(pipe.fileHandleForReading.fileDescriptor, STDIN_FILENO) + defer { + pipe.fileHandleForReading.closeFile() + pipe.fileHandleForWriting.closeFile() + } + + pipe.fileHandleForWriting.write(Data("hello\n".utf8)) + try pipe.fileHandleForWriting.close() + + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/bash", + ["-c", "while read -r line; do echo \"$line\"; done"], + standardInput: InheritStandardInput(), + collectStandardOutput: true, + collectStandardError: true + ) + + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertEqual("hello\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + // MARK: - Setup/teardown override func setUp() async throws { self.group = MultiThreadedEventLoopGroup(numberOfThreads: 3) self.logger = Logger(label: "test", factory: { _ in SwiftLogNoOpLogHandler() }) - // Make sure the singleton threads have booted (because they use file descriptors) - try await MultiThreadedEventLoopGroup.singleton.next().submit {}.get() + // Find sap-exec binary + self.sapExecURL = nil + for bundleURL in [ + Bundle(for: Self.self).bundleURL, + Bundle.main.executableURL, + ] { + let candidateURL = bundleURL?.deletingLastPathComponent().appendingPathComponent("sap-exec") + if let candidateURL = candidateURL, + FileManager.default.fileExists(atPath: candidateURL.path.removingPercentEncoding!) + { + self.sapExecURL = candidateURL + break + } + } + XCTAssertNotNil(self.sapExecURL) + + // Make sure all the ELGs we use have booted (because they use file descriptors) + for loop in self.group.makeIterator() { + try await loop.submit {}.get() + } + for loop in MultiThreadedEventLoopGroup.singleton.makeIterator() { + try await loop.submit {}.get() + } + // Make sure libdispatch gets a chance to open its relevant fds (timerfd/signalfd) + self.unusedDispatchSource = DispatchSource.makeSignalSource(signal: SIGCHLD) + self.unusedTask = Task { + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: 1_000_000_000) + } + } + await withCheckedContinuation { continuation in + self.unusedDispatchSource.setRegistrationHandler { + continuation.resume() + } + self.unusedDispatchSource.resume() + } self.highestFD = highestOpenFD() } override func tearDown() { - #if ASYNC_PROCESS_ENABLE_TESTS_WITH_PLATFORM_ASSUMPTIONS - var highestFD: CInt? = nil - for attempt in 0..<10 where highestFD != self.highestFD { - if highestFD != nil { - self.logger.debug( - "fd number differs", - metadata: [ - "before-test": "\(self.highestFD.debugDescription)", - "after-test": "\(highestFD.debugDescription)", - "attempt": "\(attempt)", - ] - ) - usleep(100_000) - } - highestFD = highestOpenFD() + var highestFD: CInt? = nil + for attempt in 0..<10 where highestFD != self.highestFD { + if highestFD != nil { + self.logger.debug( + "fd number differs", + metadata: [ + "before-test": "\(self.highestFD.debugDescription)", + "after-test": "\(highestFD.debugDescription)", + "attempt": "\(attempt)", + ] + ) + usleep(100_000) } - XCTAssertEqual(self.highestFD, highestFD, "\(blockingLSOF(pid: getpid()))") - #endif + highestFD = highestOpenFD() + } + XCTAssertEqual( + self.highestFD, + highestFD, + "potential file descriptor leak. Attempting to run lsof: \(blockingLSOF(pid: getpid()))" + ) self.highestFD = nil self.logger = nil + self.sapExecURL = nil + self.unusedDispatchSource!.cancel() + self.unusedDispatchSource = nil + self.unusedTask.cancel() + self.unusedTask = nil XCTAssertNoThrow(try self.group.syncShutdownGracefully()) self.group = nil @@ -2057,51 +2244,6 @@ extension AsyncSequence where Element == ByteBuffer { } } -extension ProcessExecutor { - struct AllOfAProcess: Sendable { - var exitReason: ProcessExitReason - var standardOutput: ByteBuffer - var standardError: ByteBuffer - } - - private enum What { - case exit(ProcessExitReason) - case stdout(ByteBuffer) - case stderr(ByteBuffer) - } - - func runGetAllOutput() async throws -> AllOfAProcess { - try await withThrowingTaskGroup(of: What.self, returning: AllOfAProcess.self) { group in - group.addTask { - return .exit(try await self.run()) - } - group.addTask { - return .stdout(try await self.standardOutput.pullAllOfIt()) - } - group.addTask { - return .stderr(try await self.standardError.pullAllOfIt()) - } - - var exitReason: ProcessExitReason? - var stdout: ByteBuffer? - var stderr: ByteBuffer? - - while let next = try await group.next() { - switch next { - case .exit(let value): - exitReason = value - case .stderr(let value): - stderr = value - case .stdout(let value): - stdout = value - } - } - - return AllOfAProcess(exitReason: exitReason!, standardOutput: stdout!, standardError: stderr!) - } - } -} - private func highestOpenFD() -> CInt? { #if os(macOS) guard let dirPtr = opendir("/dev/fd") else { @@ -2122,7 +2264,6 @@ private func highestOpenFD() -> CInt? { while let dirEntPtr = readdir(dirPtr) { var entryName = dirEntPtr.pointee.d_name let thisFD = withUnsafeBytes(of: &entryName) { entryNamePtr -> CInt? in - CInt(String(decoding: entryNamePtr.prefix(while: { $0 != 0 }), as: Unicode.UTF8.self)) } highestFDSoFar = max(thisFD ?? -1, highestFDSoFar) @@ -2132,27 +2273,30 @@ private func highestOpenFD() -> CInt? { } private func runLSOF(pid: pid_t) async throws -> String { - #if canImport(Darwin) - let lsofPath = "/usr/sbin/lsof" - #else - let lsofPath = "/usr/bin/lsof" - #endif - let result = try await ProcessExecutor.runCollectingOutput( - executable: lsofPath, - ["-Pnp", "\(pid)"], - collectStandardOutput: true, - collectStandardError: true - ) - let outString = """ - exit code: \(result.exitReason)\n - ## stdout - \(String(buffer: result.standardOutput!)) - - ## stderr - \(String(buffer: result.standardError!)) - - """ - return outString + let lsofPaths = ["/usr/sbin/lsof", "/usr/bin/lsof"] + + for lsofPath in lsofPaths { + guard FileManager.default.fileExists(atPath: lsofPath) else { + continue + } + let result = try await ProcessExecutor.runCollectingOutput( + executable: lsofPath, + ["-Pnp", "\(pid)"], + collectStandardOutput: true, + collectStandardError: true + ) + let outString = """ + exit code: \(result.exitReason)\n + ## stdout + \(String(buffer: result.standardOutput!)) + + ## stderr + \(String(buffer: result.standardError!)) + + """ + return outString + } + return "" } private func blockingLSOF(pid: pid_t) -> String { From d9c8d123c3ba5a4a7882b1f68869e5976e8914a9 Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Mon, 2 Mar 2026 18:04:23 +0000 Subject: [PATCH 2/8] Remove redundant file after update --- Sources/AsyncProcess/EOFSequence.swift | 27 -------------------------- 1 file changed, 27 deletions(-) delete mode 100644 Sources/AsyncProcess/EOFSequence.swift diff --git a/Sources/AsyncProcess/EOFSequence.swift b/Sources/AsyncProcess/EOFSequence.swift deleted file mode 100644 index 341dcc4..0000000 --- a/Sources/AsyncProcess/EOFSequence.swift +++ /dev/null @@ -1,27 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift open source project -// -// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors -// -//===----------------------------------------------------------------------===// - -public struct EOFSequence: AsyncSequence & Sendable { - public typealias Element = Element - - public struct AsyncIterator: AsyncIteratorProtocol { - public mutating func next() async throws -> Element? { - return nil - } - } - - public init(of type: Element.Type = Element.self) {} - - public func makeAsyncIterator() -> AsyncIterator { - return AsyncIterator() - } -} From 8ebdbc732ffe2b2be6009088530e773dfac7cd4a Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Tue, 3 Mar 2026 14:47:29 +0000 Subject: [PATCH 3/8] Fix imports ordering --- Tests/AsyncProcessTests/IntegrationTests.swift | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index 631bb0f..fd50e2d 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -10,14 +10,6 @@ // //===----------------------------------------------------------------------===// -import AsyncAlgorithms -import AsyncProcess -import Atomics -import Logging -import NIO -import NIOConcurrencyHelpers -import XCTest - #if canImport(Darwin) import Darwin #elseif canImport(Musl) @@ -34,6 +26,14 @@ import XCTest #error("unknown libc, please fix") #endif +import AsyncAlgorithms +import AsyncProcess +import Atomics +import Logging +import NIO +import NIOConcurrencyHelpers +import XCTest + final class IntegrationTests: XCTestCase { private var group: EventLoopGroup! private var logger: Logger! From ca817c2773786b7735bd689159cd3aef44ec7e23 Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Tue, 3 Mar 2026 14:57:13 +0000 Subject: [PATCH 4/8] Disallow formatting on Tests/AsyncProcessTests/IntegrationTests.swift --- Tests/AsyncProcessTests/IntegrationTests.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index fd50e2d..d272096 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -10,6 +10,8 @@ // //===----------------------------------------------------------------------===// +// swift-format-ignore-file + #if canImport(Darwin) import Darwin #elseif canImport(Musl) From 7674468e1c5f5f8a9628bf76aab173dbb4bd7505 Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Tue, 3 Mar 2026 15:09:48 +0000 Subject: [PATCH 5/8] Add missing `sap-exec` helper to `Package.swift` --- Package.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Package.swift b/Package.swift index de5966f..07e5bbd 100644 --- a/Package.swift +++ b/Package.swift @@ -105,10 +105,15 @@ let package = Package( .product(name: "SystemPackage", package: "swift-system"), ] ), + .executableTarget( + name: "sap-exec", + dependencies: ["AsyncProcess"] + ), .testTarget( name: "AsyncProcessTests", dependencies: [ "AsyncProcess", + "sap-exec", .product(name: "Atomics", package: "swift-atomics"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "NIO", package: "swift-nio"), From 678d3af126179e832e96c71f73be715c92827cb0 Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Tue, 3 Mar 2026 15:32:57 +0000 Subject: [PATCH 6/8] Fix another imports ordering issue --- Sources/sap-exec/MainFile.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Sources/sap-exec/MainFile.swift b/Sources/sap-exec/MainFile.swift index 75ea964..f2add84 100644 --- a/Sources/sap-exec/MainFile.swift +++ b/Sources/sap-exec/MainFile.swift @@ -10,8 +10,7 @@ // //===----------------------------------------------------------------------===// -import AsyncProcess -import Foundation +// swift-format-ignore-file #if canImport(Darwin) import Darwin @@ -29,6 +28,9 @@ import Foundation #error("unknown libc, please fix") #endif +import AsyncProcess +import Foundation + @main struct SAPExec { static func main() async throws { From a75ab44aebb41b20b2166c821e0259cecc2c4ad4 Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Wed, 4 Mar 2026 09:49:35 +0000 Subject: [PATCH 7/8] Bump SwiftNIO to 2.92.2 --- Package.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Package.swift b/Package.swift index 07e5bbd..6514a07 100644 --- a/Package.swift +++ b/Package.swift @@ -106,8 +106,8 @@ let package = Package( ] ), .executableTarget( - name: "sap-exec", - dependencies: ["AsyncProcess"] + name: "sap-exec", + dependencies: ["AsyncProcess"] ), .testTarget( name: "AsyncProcessTests", @@ -170,7 +170,7 @@ if configuration.useLocalDependencies { .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.2"), .package(url: "https://github.com/apple/swift-crypto.git", from: "3.1.0"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.92.2"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.4"), ] } From 2489cd377b57048c9a363ab7af08615206e5b75f Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Wed, 4 Mar 2026 09:53:26 +0000 Subject: [PATCH 8/8] Bump SwiftNIO to latest version --- Package.resolved | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Package.resolved b/Package.resolved index 0009e81..bd4fdff 100644 --- a/Package.resolved +++ b/Package.resolved @@ -104,8 +104,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio.git", "state" : { - "revision" : "1c30f0f2053b654e3d1302492124aa6d242cdba7", - "version" : "2.86.0" + "revision" : "e932d3c4d8f77433c8f7093b5ebcbf91463948a0", + "version" : "2.95.0" } }, {