From d51b8f538abdb19b864e22ce7709853dc6ec6a27 Mon Sep 17 00:00:00 2001 From: broken-circle <252359939+broken-circle@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:57:42 -0700 Subject: [PATCH] Drain buffered output on Windows after cancellation When a child exits, the run cancels its pending I/O. On Windows, a read issued after that cancellation returned `nil` without consuming bytes already buffered in the pipe, dropping output the child had already written. The kqueue and epoll backends drain the descriptor on this path; the IOCP backend did not. `registerHandle()` now surfaces its registration outcome so `read()` detects the cancelled state before issuing any I/O and drains the buffer itself. `write()` reports a zero-length write on the same path. Both previously issued an overlapped operation and returned without awaiting it, leaving the kernel to write into a buffer the call had already released. --- Sources/Subprocess/IO/AsyncIO+Windows.swift | 389 +++++++++++++------- Tests/SubprocessTests/AsyncIOTests.swift | 80 ++++ 2 files changed, 332 insertions(+), 137 deletions(-) diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index 1a6ebbe9..84388ee2 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -178,73 +178,220 @@ final class AsyncIO: @unchecked Sendable { private func registerHandle( _ handle: HANDLE, processIdentifier: ProcessIdentifier - ) -> SignalStream { - return SignalStream { (continuation: SignalStream.Continuation) in - switch self.ioCompletionPort { - case .success(let ioPort): - // Make sure thread setup also succeeded. 
- if case .failure(let error) = monitorThread { - continuation.finish(throwing: error) - return - } - let completionKey = UInt64(UInt(bitPattern: handle)) - - // Hold the lock across both the map insert and - // `CreateIoCompletionPort` so a concurrent `cancelAsyncIO` - // cannot mark the process cancelled between the two steps - // and leave behind a HANDLE that's bound to the IOCP but - // missing the cancellation marker. - let outcome: RegistrationOutcome = _registration.withLock { storage in - let result = storage.register( - completionKey: completionKey, - continuation: continuation, - processIdentifier: processIdentifier - ) - switch result { - case .alreadyCancelled: - return .alreadyCancelled - case .updated: - // The handle was already attached to the IOCP by a - // previous read or write; the new continuation - // replaces the previous one in the map. - return .registered - case .registered: - // Per the Windows documentation, the function - // returns the handle of the existing I/O completion - // port on success. - guard - CreateIoCompletionPort( - handle, ioPort, completionKey, 0 - ) == ioPort - else { - let capturedError = GetLastError() - // Roll back the registration so a future - // attempt (such as the next read on this - // handle) gets a clean slate rather than - // seeing a stale entry. - _ = storage.removeRegistration(for: completionKey) - let error: SubprocessError = .asyncIOFailed( - reason: "CreateIoCompletionPort failed", - underlyingError: SubprocessError.WindowsError(win32Error: capturedError) - ) - return .failed(error) - } - return .registered - } - } + ) -> (stream: SignalStream, outcome: RegistrationOutcome) { + let (stream, continuation) = SignalStream.makeStream() + + let ioPort: HANDLE + switch self.ioCompletionPort { + case .success(let port): + ioPort = port + case .failure(let error): + continuation.finish(throwing: error) + return (stream, .failed(error)) + } + // Make sure thread setup also succeeded. 
+ if case .failure(let error) = self.monitorThread { + continuation.finish(throwing: error) + return (stream, .failed(error)) + } + + let completionKey = UInt64(UInt(bitPattern: handle)) - switch outcome { - case .registered: - break - case .alreadyCancelled: - continuation.finish() - case .failed(let error): - continuation.finish(throwing: error) + // Hold the lock across both the map insert and `CreateIoCompletionPort` + // so a concurrent `cancelAsyncIO` cannot mark the process cancelled + // between the two steps and leave behind a HANDLE that's bound to the + // IOCP but missing the cancellation marker. + let outcome: RegistrationOutcome = _registration.withLock { storage in + let result = storage.register( + completionKey: completionKey, + continuation: continuation, + processIdentifier: processIdentifier + ) + switch result { + case .alreadyCancelled: + return .alreadyCancelled + case .updated: + // The handle was already attached to the IOCP by a previous + // read or write; the new continuation replaces the previous + // one in the map. + return .registered + case .registered: + // Per the Windows documentation, the function returns the + // handle of the existing I/O completion port on success. + guard + CreateIoCompletionPort( + handle, ioPort, completionKey, 0 + ) == ioPort + else { + let capturedError = GetLastError() + // Roll back the registration so a future attempt (such as + // the next read on this handle) gets a clean slate rather + // than seeing a stale entry. 
+ _ = storage.removeRegistration(for: completionKey) + return .failed( + .asyncIOFailed( + reason: "CreateIoCompletionPort failed", + underlyingError: SubprocessError.WindowsError(win32Error: capturedError) + )) } - case .failure(let error): - continuation.finish(throwing: error) + return .registered + } + } + + switch outcome { + case .registered: + break + case .alreadyCancelled: + continuation.finish() + case .failed(let error): + continuation.finish(throwing: error) + } + return (stream, outcome) + } + + private enum OverlappedReadOutcome { + /// The read placed `count` bytes at the front of the buffer. + case read(Int) + /// The signal stream finished before delivering data: EOF, a broken + /// pipe, or an operation aborted by cancellation. + case finished + } + + /// Issues a single overlapped `ReadFile` on `handle` and awaits its + /// completion through `stream`. The caller must already hold a + /// `.registered` outcome for `handle`. + private func issueAndAwaitRead( + on handle: HANDLE, + stream: SignalStream, + into resultBuffer: inout [UInt8] + ) async throws(SubprocessError) -> OverlappedReadOutcome { + var iterator = stream.makeAsyncIterator() + + // Empty `_OVERLAPPED()` because `ReadFile` only reads pipes here. 
+ var overlapped = _OVERLAPPED() + let succeed = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in + let targetCount: DWORD = self.calculateRemainingCount( + totalCount: bufferPointer.count, + readCount: 0 + ) + return ReadFile(handle, bufferPointer.baseAddress!, targetCount, nil, &overlapped) + } + + if !succeed { + let lastError = GetLastError() + if lastError == ERROR_BROKEN_PIPE { + return .finished + } + guard lastError == ERROR_IO_PENDING else { + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: SubprocessError.WindowsError(win32Error: lastError) + ) } } + + let bytesRead: DWORD + do { + guard let next = try await iterator.next() else { + return .finished + } + bytesRead = next + } catch { + if let subprocessError = error as? SubprocessError { + throw subprocessError + } + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: error as? SubprocessError.UnderlyingError + ) + } + + if bytesRead == 0 { + return .finished + } + return .read(Int(truncatingIfNeeded: bytesRead)) + } + + /// Reads bytes already buffered in the pipe after the owning process has + /// been cancelled, without routing through the completion port. + /// + /// Bytes the child already wrote remain in the pipe and must not be + /// dropped, but a fresh read awaited through the monitor could hang: a + /// surviving descendant can hold the write end open, so EOF may never + /// arrive. `PeekNamedPipe` neither consumes data nor blocks, so it gates + /// the read; `ReadFile` is issued only when bytes are present, which + /// then completes promptly. + /// + /// The read is overlapped (the handle is `FILE_FLAG_OVERLAPPED`), so it's + /// synchronized on a private event rather than the completion port. The low + /// bit of `OVERLAPPED.hEvent` is set so that, on a handle a prior read + /// already bound to the port, this completion is not enqueued there for the + /// monitor to pick up against an `OVERLAPPED` this call has freed. 
The wait + /// is bounded since `PeekNamedPipe` has already confirmed the bytes are + /// present. + /// + /// Returns the drained bytes, or `nil` once the buffer is empty or the pipe + /// has reached EOF. The caller drives repeated calls to drain successive + /// chunks. + private func drainBufferedDataAfterCancellation( + from handle: HANDLE, + capacity: Int + ) throws(SubprocessError) -> [UInt8]? { + func readFailed(_ error: DWORD) -> SubprocessError { + .failedToReadFromProcess( + withUnderlyingError: SubprocessError.WindowsError(win32Error: error) + ) + } + + var bytesAvailable: DWORD = 0 + guard PeekNamedPipe(handle, nil, 0, nil, &bytesAvailable, nil) else { + let lastError = GetLastError() + // Empty pipe, write end closed: EOF, nothing left to drain. + if lastError == ERROR_BROKEN_PIPE { return nil } + throw readFailed(lastError) + } + guard bytesAvailable > 0 else { + // Buffer empty. Don't issue a read: with the process cancelled, + // nothing guarantees the write end ever closes, so a pending read + // could hang indefinitely. + return nil + } + + // The `OVERLAPPED` gets a copy with the low bit set to keep the + // completion off the IOCP. + guard let event = CreateEventW(nil, true, false, nil) else { + throw readFailed(GetLastError()) + } + defer { CloseHandle(event) } + + var resultBuffer = [UInt8](repeating: 0, count: capacity) + var overlapped = _OVERLAPPED() + overlapped.hEvent = HANDLE(bitPattern: UInt(bitPattern: event) | 1) + + let started = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in + let targetCount = self.calculateRemainingCount( + totalCount: bufferPointer.count, + readCount: 0 + ) + return ReadFile(handle, bufferPointer.baseAddress!, targetCount, nil, &overlapped) + } + if !started { + let lastError = GetLastError() + if lastError == ERROR_BROKEN_PIPE { return nil } + guard lastError == ERROR_IO_PENDING else { + throw readFailed(lastError) + } + // `PeekNamedPipe` already saw the bytes, so this returns promptly. 
+ WaitForSingleObject(event, INFINITE) + } + + var bytesRead: DWORD = 0 + guard GetOverlappedResult(handle, &overlapped, &bytesRead, true) else { + let lastError = GetLastError() + if lastError == ERROR_BROKEN_PIPE { return nil } + throw readFailed(lastError) + } + guard bytesRead > 0 else { return nil } + resultBuffer.removeLast(resultBuffer.count - Int(truncatingIfNeeded: bytesRead)) + return resultBuffer } internal func removeRegistration(for handle: HANDLE) { @@ -312,85 +459,42 @@ final class AsyncIO: @unchecked Sendable { bufferLength = maxLength } - var resultBuffer: [UInt8] = Array( - repeating: 0, count: bufferLength - ) - var signalStream = self.registerHandle( + let (stream, outcome) = self.registerHandle( handle, processIdentifier: processIdentifier - ).makeAsyncIterator() + ) - // Use an empty `_OVERLAPPED()` here because `ReadFile` below only - // reads non-seekable files (pipes). - var overlapped = _OVERLAPPED() - let succeed = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in - // Get a pointer to the memory at the specified offset. - // Windows `ReadFile` uses `DWORD` for target count, which means - // the call can read at most `DWORD.max` (`UInt32.max`) bytes. - let targetCount: DWORD = self.calculateRemainingCount( - totalCount: bufferPointer.count, - readCount: 0 + switch outcome { + case .registered: + break + case .failed(let error): + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: error.underlyingError ) - - // Read directly into the buffer at the offset. - return ReadFile( - handle, - bufferPointer.baseAddress!, - targetCount, - nil, - &overlapped + case .alreadyCancelled: + // Cancelled before this read began. Surface buffered bytes rather + // than reporting an empty stream. + return try self.drainBufferedDataAfterCancellation( + from: handle, capacity: bufferLength ) } - if !succeed { - // `ReadFile` is expected to return `false` in async mode. 
- // Confirm the call returned `ERROR_IO_PENDING` or - // `ERROR_BROKEN_PIPE`. - let lastError = GetLastError() - if lastError == ERROR_BROKEN_PIPE { - // Reached end of file before any data was read. - return nil - } - guard lastError == ERROR_IO_PENDING else { - let error: SubprocessError = .failedToReadFromProcess( - withUnderlyingError: SubprocessError.WindowsError(win32Error: lastError) - ) - throw error - } - - } - // Wait for the read to finish. - let bytesRead: DWORD - do { - guard let next = try await signalStream.next() else { - // The signal stream finished without delivering data. This - // happens when `cancelAsyncIO` ran (typically because the - // child exited) and the IOCP delivered an - // `ERROR_OPERATION_ABORTED` completion. By the time the - // monitor thread finishes the stream, the kernel has - // released its references to `resultBuffer` and - // `overlapped`, so it's safe to return. - return nil - } - bytesRead = next - } catch { - if let subprocessError = error as? SubprocessError { - throw subprocessError - } - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError + var resultBuffer = [UInt8](repeating: 0, count: bufferLength) + switch try await self.issueAndAwaitRead( + on: handle, + stream: stream, + into: &resultBuffer + ) { + case .read(let count): + resultBuffer.removeLast(resultBuffer.count - count) + return resultBuffer + case .finished: + // EOF, or the read was aborted in flight. If a write landed in the + // buffer with no read pending to receive it, drain it now. + return try self.drainBufferedDataAfterCancellation( + from: handle, capacity: bufferLength ) } - - if bytesRead == 0 { - // End of file. - return nil - } - - // Got data — return immediately so the caller can process it - // without waiting for the buffer to fill. 
- resultBuffer.removeLast(resultBuffer.count - Int(truncatingIfNeeded: bytesRead)) - return resultBuffer } func write( @@ -410,10 +514,21 @@ final class AsyncIO: @unchecked Sendable { return 0 } let handle = diskIO.descriptor() - var signalStream = self.registerHandle( - handle, - processIdentifier: processIdentifier - ).makeAsyncIterator() + let (stream, outcome) = self.registerHandle(handle, processIdentifier: processIdentifier) + var signalStream = stream.makeAsyncIterator() + + switch outcome { + case .registered: + break + case .failed(let error): + throw SubprocessError.failedToWriteToProcess(withUnderlyingError: error.underlyingError) + case .alreadyCancelled: + // The subprocess has exited; its stdin no longer has a reader. + // Report a zero-length write instead of issuing a fire-and-forget + // `WriteFile` whose completion nothing would await. + return 0 + } + var writtenLength: Int = 0 while true { // Use an empty `_OVERLAPPED()` here because `WriteFile` below diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index 1c909b2e..c564209e 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -239,6 +239,86 @@ extension SubprocessAsyncIOTests { } } +// MARK: - Cancellation Tests +extension SubprocessAsyncIOTests { + @Test(.timeLimit(.minutes(1))) + func testReadDrainsBufferedDataAfterCancellation() async throws { + let payload = randomData(count: 1024) + let io = AsyncIO.shared + defer { + io.cleanup(processIdentifier: _currentPID) + } + + var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) + let readTestBed = try TestBed(ioDescriptor: _require(pipe.readFileDescriptor())) + let writeTestBed = try TestBed(ioDescriptor: _require(pipe.writeFileDescriptor())) + + // Buffer the payload with no read pending. It fits comfortably within + // the pipe capacity, so the write completes without a reader. 
+ let written = try await io.write(payload, to: writeTestBed.ioDescriptor) + #expect(written == payload.count) + + // Stand in for the termination monitor: the child has exited, so the + // run cancels its I/O while bytes are still buffered. + try io.cancelAsyncIO(for: _currentPID) + + let drained = try await io.read( + from: readTestBed.ioDescriptor, + upTo: payload.count + ) + + // Close promptly to bound any dangling-I/O window on a backend that + // issues a read on the cancelled path without awaiting completion. + try await readTestBed.finish() + try await writeTestBed.finish() + + let bytes = try #require( + drained, + "read returned nil while \(payload.count) bytes were buffered: the cancelled path dropped them instead of draining" + ) + #expect(bytes == payload) + } + + @Test(.timeLimit(.minutes(1))) + func testReadDrainsBufferedDataAfterCancellationOnAttachedHandle() async throws { + let firstChunk = randomData(count: 1024) + let secondChunk = randomData(count: 1024) + let io = AsyncIO.shared + defer { + io.cleanup(processIdentifier: _currentPID) + } + + var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) + let readTestBed = try TestBed(ioDescriptor: _require(pipe.readFileDescriptor())) + let writeTestBed = try TestBed(ioDescriptor: _require(pipe.writeFileDescriptor())) + + // Both chunks sit in the pipe buffer together. + let written = try await io.write(firstChunk + secondChunk, to: writeTestBed.ioDescriptor) + #expect(written == firstChunk.count + secondChunk.count) + + // A first, uncancelled read attaches the descriptor and consumes the + // first chunk. + let first = try await io.read(from: readTestBed.ioDescriptor, upTo: firstChunk.count) + #expect(first == firstChunk) + + // Cancel while the second chunk is still buffered. + try io.cancelAsyncIO(for: _currentPID) + + // The cancelled read must drain the second chunk from the now-cancelled, + // already-attached descriptor. 
+ let second = try await io.read(from: readTestBed.ioDescriptor, upTo: secondChunk.count) + + try await readTestBed.finish() + try await writeTestBed.finish() + + let drained = try #require( + second, + "read returned nil while \(secondChunk.count) buffered bytes remained on an already-attached descriptor: the cancelled path dropped them" + ) + #expect(drained == secondChunk) + } +} + // MARK: - Utils extension SubprocessAsyncIOTests { final class TestBed: Sendable {