Skip to content

Commit e417c3f

Browse files
authored
Make AsyncIO.read's maxLength a cap and rewrite Darwin backend (#259)
Previously AsyncIO.read(upTo: maxLength) treated maxLength as a target length. It would accumulate bytes in a loop until it had gathered that many before returning. This deadlocks when a subprocess writes fewer bytes than maxLength and then pauses or waits on stdin: the parent blocks forever on data the child never sends. This commit changes maxLength to a "cap", as its name suggests. Each read(upTo:) now returns as soon as the underlying nonblocking read(2) (or ReadFile) completes. Callers that need a specific byte count accumulate across calls themselves. Follow-on cleanup enabled by the semantics fix: - Replace Darwin's DispatchIO backend with kqueue + read(2). The new Darwin path is a thin event-source shim over the same nonblocking read/write loop that Linux and Android already use, sharing implementation in a new IO/AsyncIO+Unix.swift. - Collapse IOChannel into IODescriptor. IOChannel only existed to wrap a DispatchIO; with DispatchIO gone, a single fd-owning type suffices. - Drop the public `preferredBufferSize` parameter from every run() overload. It was introduced to let callers work around the old hang; buffer size is now derived from the pipe's capacity via F_GETPIPE_SZ (Linux/Android), fstat().st_blksize (Darwin/OpenBSD), or a fixed 64 KB fallback (FreeBSD, Windows).
1 parent 371baa0 commit e417c3f

24 files changed

Lines changed: 1117 additions & 1196 deletions

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ dep.append(
2222
let defaultTraits: Set<String> = ["SubprocessFoundation"]
2323

2424
let packageSwiftSettings: [SwiftSetting] = [
25-
.define("SUBPROCESS_ASYNCIO_DISPATCH", .when(platforms: [.macOS, .custom("freebsd"), .openbsd])),
25+
.define("SUBPROCESS_ASYNCIO_KQUEUE", .when(platforms: [.macOS, .custom("freebsd"), .openbsd])),
2626
.enableUpcomingFeature("ExistentialAny"),
2727
.enableUpcomingFeature("MemberImportVisibility"),
2828
.enableUpcomingFeature("InternalImportsByDefault"),

Sources/Subprocess/API.swift

Lines changed: 61 additions & 120 deletions
Large diffs are not rendered by default.

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
3838
/// The element type for the asynchronous sequence.
3939
public typealias Element = Buffer
4040

41-
#if SUBPROCESS_ASYNCIO_DISPATCH
42-
internal typealias DiskIO = DispatchIO
43-
#elseif canImport(WinSDK)
41+
#if canImport(WinSDK)
4442
internal typealias DiskIO = HANDLE
4543
#else
4644
internal typealias DiskIO = FileDescriptor
@@ -55,10 +53,11 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
5553
private let preferredBufferSize: Int
5654
private var buffer: [Buffer]
5755

58-
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
56+
internal init(diskIO: DiskIO) {
5957
self.diskIO = diskIO
6058
self.buffer = []
61-
self.preferredBufferSize = preferredBufferSize ?? readBufferSize
59+
// Only need to query it once at beginning of stream
60+
self.preferredBufferSize = AsyncIO.queryPipeBufferSize(for: diskIO)
6261
}
6362

6463
/// Retrieves the next buffer in the sequence, or `nil` if the sequence ended.
@@ -74,24 +73,14 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
7473
)
7574
guard let data else {
7675
// We finished reading. Close the file descriptor now
77-
#if SUBPROCESS_ASYNCIO_DISPATCH
78-
try _safelyClose(.dispatchIO(self.diskIO))
79-
#elseif canImport(WinSDK)
76+
#if canImport(WinSDK)
8077
try _safelyClose(.handle(self.diskIO))
8178
#else
8279
try _safelyClose(.fileDescriptor(self.diskIO))
8380
#endif
8481
return nil
8582
}
86-
let createdBuffers = Buffer.createFrom(data)
87-
// Most (all?) cases there should be only one buffer
88-
// because DispatchData are mostly contiguous
89-
if _fastPath(createdBuffers.count == 1) {
90-
// No need to push to the stack
91-
return createdBuffers[0]
92-
}
93-
self.buffer = createdBuffers
94-
return self.buffer.removeFirst()
83+
return Buffer(data: data)
9584
}
9685

9786
/// Retrieves the next buffer in the sequence, or `nil` if the sequence ended.
@@ -101,19 +90,14 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
10190
}
10291

10392
private let diskIO: DiskIO
104-
private let preferredBufferSize: Int?
10593

106-
internal init(diskIO: DiskIO, preferredBufferSize: Int?) {
94+
internal init(diskIO: DiskIO) {
10795
self.diskIO = diskIO
108-
self.preferredBufferSize = preferredBufferSize
10996
}
11097

11198
/// Creates an iterator for this asynchronous sequence.
11299
public func makeAsyncIterator() -> Iterator {
113-
return Iterator(
114-
diskIO: self.diskIO,
115-
preferredBufferSize: self.preferredBufferSize
116-
)
100+
return Iterator(diskIO: self.diskIO)
117101
}
118102

119103
/// Splits the buffer into strings using the specified separator.
@@ -268,29 +252,12 @@ extension AsyncBufferSequence {
268252
self.eofReached = true
269253
return nil
270254
}
271-
#if SUBPROCESS_ASYNCIO_DISPATCH
272-
// Unfortunately here we _have to_ copy the bytes out because
273-
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
274-
// has the same address on all iterations, therefore we can't directly
275-
// create the result array from buffer.data
276-
277-
// Calculate how many CodePoint elements we have
278-
let elementCount = buffer.data.count / MemoryLayout<Encoding.CodeUnit>.stride
279-
280-
// Create array by copying from the buffer reinterpreted as CodePoint
281-
let result: Array<Encoding.CodeUnit> = buffer.data.withUnsafeBytes { ptr -> Array<Encoding.CodeUnit> in
282-
return Array(
283-
UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount)
284-
)
285-
}
286-
#else
287255
// Cast data to CodeUnit type
288256
let result = buffer.withUnsafeBytes { ptr in
289257
return ptr.withMemoryRebound(to: Encoding.CodeUnit.self) { codeUnitPtr in
290258
return Array(codeUnitPtr)
291259
}
292260
}
293-
#endif
294261
return result.isEmpty ? nil : result
295262
}
296263

@@ -604,6 +571,6 @@ private let _pageSize: Int = Int(getpagesize())
604571
#endif // canImport(Darwin)
605572

606573
@inline(__always)
607-
internal var readBufferSize: Int {
574+
internal var systemPageSize: Int {
608575
return _pageSize
609576
}

Sources/Subprocess/Buffer.swift

Lines changed: 0 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@
99
//
1010
//===----------------------------------------------------------------------===//
1111

12-
// swift-format-ignore-file
13-
14-
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
15-
@preconcurrency internal import Dispatch
16-
1712
#if SubprocessFoundation
1813

1914
#if canImport(Darwin)
@@ -24,41 +19,16 @@ internal import Foundation
2419
internal import FoundationEssentials
2520
#endif
2621

27-
#endif
2822
#endif
2923

3024
extension AsyncBufferSequence {
3125
/// An immutable collection of bytes.
3226
public struct Buffer: Sendable {
33-
#if SUBPROCESS_ASYNCIO_DISPATCH
34-
// We need to keep the backingData alive while Slice is alive
35-
internal let backingData: DispatchData
36-
internal let data: DispatchData.Region
37-
38-
internal init(data: DispatchData.Region, backingData: DispatchData) {
39-
self.data = data
40-
self.backingData = backingData
41-
}
42-
43-
internal static func createFrom(_ data: DispatchData) -> [Buffer] {
44-
let slices = data.regions
45-
// In most (all?) cases data should only have one slice
46-
if _fastPath(slices.count == 1) {
47-
return [.init(data: slices[0], backingData: data)]
48-
}
49-
return slices.map { .init(data: $0, backingData: data) }
50-
}
51-
#else
5227
internal let data: [UInt8]
5328

5429
internal init(data: [UInt8]) {
5530
self.data = data
5631
}
57-
58-
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
59-
return [.init(data: data)]
60-
}
61-
#endif // SUBPROCESS_ASYNCIO_DISPATCH
6232
}
6333
}
6434

@@ -104,78 +74,3 @@ extension AsyncBufferSequence.Buffer {
10474
}
10575
}
10676
}
107-
108-
// MARK: - Hashable, Equatable
109-
extension AsyncBufferSequence.Buffer: Equatable, Hashable {
110-
#if SUBPROCESS_ASYNCIO_DISPATCH
111-
/// Returns a Boolean value that indicates whether two buffers are equal.
112-
public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool {
113-
return lhs.data == rhs.data
114-
}
115-
116-
/// Hashes the essential components of this value by feeding them into the given hasher.
117-
public func hash(into hasher: inout Hasher) {
118-
return self.data.hash(into: &hasher)
119-
}
120-
#endif
121-
// else Compiler generated conformances
122-
}
123-
124-
#if SUBPROCESS_ASYNCIO_DISPATCH
125-
extension DispatchData.Region {
126-
static func == (lhs: DispatchData.Region, rhs: DispatchData.Region) -> Bool {
127-
return lhs.withUnsafeBytes { lhsBytes in
128-
return rhs.withUnsafeBytes { rhsBytes in
129-
return lhsBytes.elementsEqual(rhsBytes)
130-
}
131-
}
132-
}
133-
134-
internal func hash(into hasher: inout Hasher) {
135-
return self.withUnsafeBytes { ptr in
136-
return hasher.combine(bytes: ptr)
137-
}
138-
}
139-
}
140-
#if !canImport(Darwin) || !SubprocessFoundation
141-
/// `DispatchData.Region` is defined in Foundation, but we can't depend on Foundation when the SubprocessFoundation trait is disabled.
142-
extension DispatchData {
143-
typealias Region = _ContiguousBufferView
144-
145-
var regions: [Region] {
146-
contiguousBufferViews
147-
}
148-
149-
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection {
150-
typealias Element = UInt8
151-
152-
internal let bytes: UnsafeBufferPointer<UInt8>
153-
154-
internal var startIndex: Int { self.bytes.startIndex }
155-
internal var endIndex: Int { self.bytes.endIndex }
156-
157-
internal init(bytes: UnsafeBufferPointer<UInt8>) {
158-
self.bytes = bytes
159-
}
160-
161-
internal func withUnsafeBytes<ResultType>(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType {
162-
return try body(UnsafeRawBufferPointer(self.bytes))
163-
}
164-
165-
subscript(position: Int) -> UInt8 {
166-
_read {
167-
yield self.bytes[position]
168-
}
169-
}
170-
}
171-
172-
internal var contiguousBufferViews: [_ContiguousBufferView] {
173-
var slices = [_ContiguousBufferView]()
174-
enumerateBytes { (bytes, index, stop) in
175-
slices.append(_ContiguousBufferView(bytes: bytes))
176-
}
177-
return slices
178-
}
179-
}
180-
#endif
181-
#endif

Sources/Subprocess/CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ add_library(Subprocess
1818
Result.swift
1919
IO/Output.swift
2020
IO/Input.swift
21-
IO/AsyncIO+Dispatch.swift
21+
IO/AsyncIO+KQueue.swift
2222
IO/AsyncIO+Linux.swift
2323
IO/AsyncIO+Windows.swift
24+
IO/AsyncIO+Unix.swift
2425
Span+Subprocess.swift
2526
AsyncBufferSequence.swift
2627
API.swift
@@ -41,13 +42,13 @@ elseif(APPLE)
4142
Platforms/Subprocess+Darwin.swift
4243
Platforms/Subprocess+Unix.swift)
4344
target_compile_options(Subprocess PRIVATE
44-
"$<$<COMPILE_LANGUAGE:Swift>:-DSUBPROCESS_ASYNCIO_DISPATCH>")
45+
"$<$<COMPILE_LANGUAGE:Swift>:-DSUBPROCESS_ASYNCIO_KQUEUE>")
4546
elseif(FREEBSD OR OPENBSD)
4647
target_sources(Subprocess PRIVATE
4748
Platforms/Subprocess+BSD.swift
4849
Platforms/Subprocess+Unix.swift)
4950
target_compile_options(Subprocess PRIVATE
50-
"$<$<COMPILE_LANGUAGE:Swift>:-DSUBPROCESS_ASYNCIO_DISPATCH>")
51+
"$<$<COMPILE_LANGUAGE:Swift>:-DSUBPROCESS_ASYNCIO_KQUEUE>")
5152
endif()
5253

5354
target_compile_options(Subprocess PRIVATE

0 commit comments

Comments
 (0)