Skip to content

Commit adaf220

Browse files
committed
Rework AsyncStreaming protocols to use generic RangeReplaceableContainer buffers
Replace `InputSpan`/`OutputSpan` with generic `Buffer` associated types constrained to `RangeReplaceableContainer` across all four streaming protocols. This sidesteps the limitation that `OutputSpan` (and other `~Escapable` types) cannot be used in async contexts today, while still allowing conforming types to choose a buffer representation optimized for their use case (e.g. `UniqueArray` for heap-backed storage, or a future stack-allocated container for embedded). Switch callee-owned protocols from `consuming` to `inout` closure semantics, rename `forEachChunk` to `forEachBuffer`, and move `forEach`/`collect` to future directions in the proposal. Adds test coverage for `AsyncWriter`, `CallerAsyncReader`, and `EitherError`. Updates all documentation to reflect the new API surface.
1 parent 6783e0f commit adaf220

19 files changed

Lines changed: 596 additions & 492 deletions

Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift

Lines changed: 30 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,33 @@
1414
public import ContainersPreview
1515
import BasicContainers
1616

17+
/// An error indicating that the reader produced more elements than the specified collection limit.
18+
///
19+
/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer
20+
/// contains more elements than the allowed limit.
21+
public struct AsyncReaderLeftOverElementsError: Error, Hashable {
22+
public init() {}
23+
}
24+
1725
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
1826
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
19-
/// Collects elements from the reader up to a specified limit and processes them with a body function.
27+
/// Collects elements from the reader up to a specified limit and processes them.
2028
///
21-
/// This method continuously reads elements from the async reader, accumulating them in a buffer
22-
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
23-
/// the specified limit. Once collection completes, it passes the accumulated elements to the
24-
/// provided body function as a `Span` for processing.
29+
/// This method continuously reads elements from the async reader, accumulating them in an
30+
/// internal buffer until either it reaches the end of the stream or the specified limit.
31+
/// Once collection completes, it passes the accumulated elements to the provided body
32+
/// closure as an `InputSpan` for processing.
2533
///
2634
/// - Parameters:
2735
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
2836
/// growth when reading from potentially infinite streams.
29-
/// - body: A closure that receives a `Span` containing all collected elements and returns
30-
/// a result of type `Result`. The method calls this closure once after collecting all
31-
/// elements successfully.
37+
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
38+
/// a result of type `Result`.
3239
///
3340
/// - Returns: The value returned by the body closure after processing the collected elements.
3441
///
35-
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
42+
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
43+
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
3644
/// or a `Failure` from the body closure.
3745
///
3846
/// ## Example
@@ -44,153 +52,41 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
4452
/// // Process all collected elements
4553
/// }
4654
/// ```
47-
///
48-
/// ## Memory Considerations
49-
///
50-
/// Since this method buffers all elements in memory before processing, it should be used
51-
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
52-
/// to prevent excessive memory usage.
5355
public mutating func collect<Result, Failure: Error>(
5456
upTo limit: Int,
5557
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
56-
) async throws(EitherError<ReadFailure, Failure>) -> Result {
58+
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
5759
// TODO: In the future we might want to use a temporary allocation instead
5860
// but those don't support async closures yet.
59-
var buffer = UniqueArray<ReadElement>()
60-
buffer.reserveCapacity(limit)
61+
var collectedBuffer = UniqueArray<ReadElement>()
62+
collectedBuffer.reserveCapacity(limit)
6163
var shouldContinue = true
6264
do {
6365
while shouldContinue {
64-
try await self.read(
65-
maximumCount: limit - buffer.count
66-
) { (span: consuming InputSpan<ReadElement>) in
67-
guard span.count > 0 else {
66+
try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in
67+
guard buffer.count > 0 else {
6868
shouldContinue = false
6969
return
7070
}
71-
precondition(span.count <= limit - buffer.count)
72-
while let element = span.popFirst() {
73-
buffer.append(element)
71+
if limit - collectedBuffer.count < buffer.count {
72+
throw AsyncReaderLeftOverElementsError()
73+
}
74+
var consumer = buffer.consumeAll()
75+
while let element = consumer.next() {
76+
collectedBuffer.append(element)
7477
}
7578
}
7679
}
7780
} catch {
78-
switch error {
79-
case .first(let error):
80-
throw .first(error)
81-
case .second:
82-
fatalError()
83-
}
81+
throw .first(error)
8482
}
8583
do {
86-
var consumer = buffer.consumeAll()
84+
var consumer = collectedBuffer.consumeAll()
8785
return try await body(consumer.drainNext())
8886
} catch {
8987
throw .second(error)
9088
}
9189
}
92-
93-
/// Collects elements from the reader up to a specified limit and processes them with a body function.
94-
///
95-
/// This method continuously reads elements from the async reader, accumulating them in a buffer
96-
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
97-
/// the specified limit. Once collection completes, it passes the accumulated elements to the
98-
/// provided body function as a `Span` for processing.
99-
///
100-
/// - Parameters:
101-
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
102-
/// growth when reading from potentially infinite streams.
103-
/// - body: A closure that receives a `Span` containing all collected elements and returns
104-
/// a result of type `Result`. The method calls this closure once after collecting all
105-
/// elements successfully.
106-
///
107-
/// - Returns: The value returned by the body closure after processing the collected elements.
108-
///
109-
/// ## Example
110-
///
111-
/// ```swift
112-
/// var reader: SomeAsyncReader = ...
113-
///
114-
/// let processedData = try await reader.collect(upTo: 1000) { span in
115-
/// // Process all collected elements
116-
/// }
117-
/// ```
118-
///
119-
/// ## Memory Considerations
120-
///
121-
/// Since this method buffers all elements in memory before processing, it should be used
122-
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
123-
/// to prevent excessive memory usage.
124-
public mutating func collect<Result>(
125-
upTo limit: Int,
126-
body: (consuming InputSpan<ReadElement>) async -> Result
127-
) async -> Result where ReadFailure == Never {
128-
// TODO: In the future we might want to use a temporary allocation instead
129-
// but those don't support async closures yet.
130-
var buffer = UniqueArray<ReadElement>()
131-
buffer.reserveCapacity(limit)
132-
var shouldContinue = true
133-
while limit - buffer.count > 0 && shouldContinue {
134-
// This force-try is safe since neither read nor the closure are throwing
135-
try! await self.read(
136-
maximumCount: limit - buffer.count
137-
) { (span: consuming InputSpan<ReadElement>) in
138-
precondition(span.count <= limit - buffer.count)
139-
guard span.count > 0 else {
140-
// This means the underlying reader is finished and we can return
141-
shouldContinue = false
142-
return
143-
}
144-
while let element = span.popFirst() {
145-
buffer.append(element)
146-
}
147-
}
148-
}
149-
var consumer = buffer.consumeAll()
150-
return await body(consumer.drainNext())
151-
}
152-
153-
/// Collects elements from the reader into an output span until the span is full.
154-
///
155-
/// This method continuously reads elements from the async reader and appends them to the
156-
/// provided output span until the span reaches its capacity. This provides an efficient
157-
/// way to fill a pre-allocated buffer with elements from the reader.
158-
///
159-
/// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
160-
/// reading until this span is full.
161-
///
162-
/// - Throws: An error of type `ReadFailure` if any read operation fails.
163-
///
164-
/// ## Example
165-
///
166-
/// ```swift
167-
/// var reader: SomeAsyncReader = ...
168-
/// var buffer = [Int](repeating: 0, count: 100)
169-
///
170-
/// try await buffer.withOutputSpan { outputSpan in
171-
/// try await reader.collect(into: &outputSpan)
172-
/// }
173-
/// ```
174-
public mutating func collect(
175-
into outputSpan: inout OutputSpan<ReadElement>
176-
) async throws(ReadFailure) {
177-
while !outputSpan.isFull {
178-
do {
179-
try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan<ReadElement>) in
180-
while let element = span.popFirst() {
181-
outputSpan.append(element)
182-
}
183-
}
184-
} catch {
185-
switch error {
186-
case .first(let error):
187-
throw error
188-
case .second:
189-
fatalError()
190-
}
191-
}
192-
}
193-
}
19490
}
19591

19692
#endif

Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ public import ContainersPreview
1616
// swift-format-ignore: AmbiguousTrailingClosureOverload
1717
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
1818
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
19-
/// Iterates over all chunks from the reader, executing the provided body for each span.
19+
/// Iterates over all chunks from the reader, executing the provided body for each buffer.
2020
///
2121
/// This method continuously reads chunks from the async reader until the stream ends,
22-
/// executing the provided closure for each span of elements read. The iteration terminates
23-
/// when the reader produces an empty span, indicating the end of the stream.
22+
/// executing the provided closure for each buffer of elements read. The iteration terminates
23+
/// when the reader produces an empty buffer, indicating the end of the stream.
2424
///
25-
/// - Parameter body: An asynchronous closure that processes each span of elements read
26-
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
25+
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
26+
/// from the stream.
2727
///
2828
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
2929
/// or a `Failure` from the body closure.
@@ -33,14 +33,12 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
3333
/// ```swift
3434
/// var fileReader: FileAsyncReader = ...
3535
///
36-
/// // Process each chunk of data from the file
37-
/// try await fileReader.forEach { chunk in
38-
/// print("Processing \(chunk.count) elements")
39-
/// // Process the chunk
36+
/// try await fileReader.forEachBuffer { buffer in
37+
/// print("Processing \(buffer.count) elements")
4038
/// }
4139
/// ```
42-
public consuming func forEachChunk<Failure: Error>(
43-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Void
40+
public consuming func forEachBuffer<Failure: Error>(
41+
body: (inout Buffer) async throws(Failure) -> Void
4442
) async throws(EitherError<ReadFailure, Failure>) {
4543
var shouldContinue = true
4644
while shouldContinue {
@@ -50,37 +48,34 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
5048
return
5149
}
5250

53-
try await body(next)
51+
try await body(&next)
5452
}
5553
}
5654
}
5755

58-
/// Iterates over all chunks from the reader, executing the provided body for each span.
56+
/// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer.
5957
///
6058
/// This method continuously reads chunks from the async reader until the stream ends,
61-
/// executing the provided closure for each span of elements read. The iteration terminates
62-
/// when the reader produces an empty span, indicating the end of the stream.
59+
/// executing the provided closure for each buffer of elements read. The iteration terminates
60+
/// when the reader produces an empty buffer, indicating the end of the stream.
6361
///
64-
/// - Parameter body: An asynchronous closure that processes each span of elements read
65-
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
62+
/// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`.
6663
///
67-
/// - Throws: An error of type `Failure` from the body closure. Since this reader never fails,
68-
/// only the body closure can throw errors.
64+
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
65+
/// from the stream.
6966
///
7067
/// ## Example
7168
///
7269
/// ```swift
7370
/// var fileReader: FileAsyncReader = ...
7471
///
75-
/// // Process each chunk of data from the file
76-
/// try await fileReader.forEach { chunk in
77-
/// print("Processing \(chunk.count) elements")
78-
/// // Process the chunk
72+
/// await fileReader.forEachBuffer { buffer in
73+
/// print("Processing \(buffer.count) elements")
7974
/// }
8075
/// ```
8176
@inlinable
82-
public consuming func forEachChunk(
83-
body: (consuming InputSpan<ReadElement>) async -> Void
77+
public consuming func forEachBuffer(
78+
body: (inout Buffer) async -> Void
8479
) async where ReadFailure == Never {
8580
var shouldContinue = true
8681
while shouldContinue {
@@ -91,7 +86,7 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
9186
return
9287
}
9388

94-
await body(next)
89+
await body(&next)
9590
}
9691
} catch {
9792
fatalError()

Sources/AsyncStreaming/AsyncReader/AsyncReader.swift

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,47 @@
1212
#if UnstableAsyncStreaming && compiler(>=6.4)
1313
public import ContainersPreview
1414

15-
/// Reads elements asynchronously from a source.
15+
/// Reads elements asynchronously from a source using callee-managed buffering.
1616
///
17-
/// Adopt ``AsyncReader`` when you need to provide callee-managed buffering,
18-
/// where the reader controls the buffer and passes a span of elements
19-
/// to the caller through the `body` closure.
17+
/// Adopt ``AsyncReader`` when you need callee-managed buffering,
18+
/// where the reader controls the buffer and passes it to the caller
19+
/// through the `body` closure.
2020
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
2121
public protocol AsyncReader<ReadElement, ReadFailure>: ~Copyable, ~Escapable {
2222
/// The type of elements this reader reads.
2323
associatedtype ReadElement: ~Copyable
2424

25+
/// The container type the reader uses to pass elements to the caller.
26+
associatedtype Buffer: RangeReplaceableContainer<ReadElement> & ~Copyable
27+
2528
/// The error type that reading operations throw.
2629
associatedtype ReadFailure: Error
2730

2831
/// Reads elements from the underlying source and passes them to the provided body closure.
2932
///
30-
/// This method asynchronously reads a span of elements from the source,
31-
/// then passes them to `body` for processing.
33+
/// This method asynchronously reads elements from the source into a buffer,
34+
/// then passes the buffer to `body` for processing. When the buffer is empty,
35+
/// the stream has ended.
3236
///
3337
/// ```swift
3438
/// var fileReader: FileAsyncReader = ...
3539
///
36-
/// // Read data from a file asynchronously and process it.
37-
/// let result = try await fileReader.read { data in
38-
/// guard data.count > 0 else {
39-
/// return
40+
/// let result = try await fileReader.read { buffer in
41+
/// guard buffer.count > 0 else {
42+
/// return 0
4043
/// }
41-
/// return data
44+
/// return buffer.count
4245
/// }
4346
/// ```
4447
///
45-
/// - Parameter maximumCount: The maximum count of items you're ready
46-
/// to process. Must be greater than zero.
47-
/// - Parameter body: A closure that processes a span of read elements
48-
/// and returns a value of type `Return`. When the span is empty,
49-
/// it indicates the end of the stream.
48+
/// - Parameter body: A closure that receives a mutable reference to the buffer
49+
/// of read elements and returns a value of type `Return`. When the buffer
50+
/// is empty, it indicates the end of the stream.
5051
/// - Returns: The value the body closure returns after processing the read elements.
5152
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
5253
/// or a `Failure` from the body closure.
5354
mutating func read<Return: ~Copyable, Failure: Error>(
54-
maximumCount: Int,
55-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
55+
body: (inout Buffer) async throws(Failure) -> Return
5656
) async throws(EitherError<ReadFailure, Failure>) -> Return
57-
58-
}
59-
60-
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
61-
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
62-
/// Reads elements with no upper bound on span size.
63-
public mutating func read<Return: ~Copyable, Failure: Error>(
64-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
65-
) async throws(EitherError<ReadFailure, Failure>) -> Return {
66-
try await read(maximumCount: .max, body: body)
67-
}
6857
}
6958
#endif

0 commit comments

Comments
 (0)