Skip to content

Commit 3da39bb

Browse files
authored
[AsyncStreaming] Redesign AsyncReader.collect to take a target container (#436)
Replace `collect(upTo:body:)` with three target-based overloads: `collect(into:)`, `collect(exactlyInto:)`, and `collect(into:maximumSize:)`. Add `AsyncReaderInsufficientElementsError` for the exact-match shortfall case.
1 parent c8a9349 commit 3da39bb

8 files changed

Lines changed: 334 additions & 243 deletions

File tree

Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift

Lines changed: 121 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -12,114 +12,148 @@
1212
#if UnstableAsyncStreaming && compiler(>=6.4)
1313

1414
public import ContainersPreview
15-
import BasicContainers
1615

17-
/// An error indicating that the reader produced more elements than the specified collection limit.
18-
///
19-
/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer
20-
/// contains more elements than the allowed limit.
16+
/// An error that indicates the reader produced more elements than the
17+
/// destination container could accept.
2118
public struct AsyncReaderLeftOverElementsError: Error, Hashable {
2219
public init() {}
2320
}
2421

22+
/// An error that indicates the reader signaled end-of-stream before producing
23+
/// enough elements to fill the destination container.
24+
public struct AsyncReaderInsufficientElementsError: Error, Hashable {
25+
public init() {}
26+
}
27+
2528
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
2629
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
27-
/// Collects elements from the reader up to a specified limit and processes them.
28-
///
29-
/// This method continuously reads elements from the async reader, accumulating them in an
30-
/// internal buffer until either the reader signals end-of-stream (by delivering a
31-
/// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached.
32-
/// Once collection completes, it passes the accumulated elements to the provided body
33-
/// closure as an `InputSpan` for processing, and returns the body's result together
34-
/// with the ``AsyncReader/FinalElement``.
30+
/// Collects elements from the reader into the provided container, up to the
31+
/// container's available space.
3532
///
36-
/// - Parameters:
37-
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
38-
/// growth when reading from potentially infinite streams.
39-
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
40-
/// a result of type `Result`.
41-
///
42-
/// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement``
43-
/// delivered with the terminal chunk.
33+
/// Reads chunks from the reader and moves them into `target` until the reader
34+
/// signals end-of-stream. The reader can deliver fewer elements than
35+
/// `target.freeCapacity`; the container needn't be full when the stream ends.
36+
/// If the reader produces more elements than `target` can accept, the method
37+
/// throws ``AsyncReaderLeftOverElementsError``.
4438
///
45-
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
46-
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
47-
/// or a `Failure` from the body closure.
48-
///
49-
/// ## Example
39+
/// - Parameter target: The container that receives the collected elements.
40+
/// The method preserves the container's existing contents and appends
41+
/// collected elements to the end.
42+
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
43+
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or
44+
/// an ``AsyncReaderLeftOverElementsError`` if the reader produced more
45+
/// elements than `target` could accept.
46+
public consuming func collect<Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable>(
47+
into target: inout Container
48+
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement {
49+
var reader = self
50+
var finalElement: FinalElement? = nil
51+
while finalElement == nil {
52+
try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in
53+
if buffer.count > target.freeCapacity {
54+
throw AsyncReaderLeftOverElementsError()
55+
}
56+
target.append(moving: buffer.startIndex..<buffer.endIndex, from: &buffer)
57+
if let final {
58+
finalElement = final
59+
}
60+
}
61+
}
62+
// The force-unwrap is safe since final element must be set at this point
63+
return finalElement!
64+
}
65+
66+
/// Collects elements from the reader into the provided container, requiring
67+
/// that the reader fill the container exactly.
5068
///
51-
/// ```swift
52-
/// var reader: SomeAsyncReader = ...
69+
/// Reads chunks from the reader and moves them into `target` until either the
70+
/// reader signals end-of-stream or the container becomes full. The reader
71+
/// must produce exactly `target.freeCapacity` elements. If it produces fewer
72+
/// before signaling end-of-stream, the method throws
73+
/// ``AsyncReaderInsufficientElementsError``. If it produces more, it throws
74+
/// ``AsyncReaderLeftOverElementsError``.
5375
///
54-
/// let (processedData, _) = try await reader.collect(upTo: 1000) { span in
55-
/// // Process all collected elements
56-
/// }
57-
/// ```
58-
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
59-
public consuming func collect<Result, Failure: Error>(
60-
upTo limit: Int,
61-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
62-
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (
63-
Result, FinalElement
64-
) {
76+
/// - Parameter target: The container to fill exactly. The method appends
77+
/// collected elements to the container's existing contents.
78+
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
79+
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure``
80+
/// or an `EitherError` wrapping
81+
/// ``AsyncReaderLeftOverElementsError`` (too many elements) or
82+
/// ``AsyncReaderInsufficientElementsError`` (too few elements).
83+
public consuming func collect<Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable>(
84+
exactlyInto target: inout Container
85+
) async throws(EitherError<
86+
ReadFailure, EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>
87+
>) -> FinalElement {
6588
var reader = self
66-
// TODO: In the future we might want to use a temporary allocation instead
67-
// but those don't support async closures yet.
68-
var collectedBuffer = UniqueArray<ReadElement>()
69-
collectedBuffer.reserveCapacity(limit)
7089
var finalElement: FinalElement? = nil
71-
do {
72-
while finalElement == nil {
73-
try await reader.read {
74-
(buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in
75-
if buffer.count > 0 {
76-
if limit - collectedBuffer.count < buffer.count {
77-
throw AsyncReaderLeftOverElementsError()
78-
}
79-
var consumer = buffer.consumeAll()
80-
while let element = consumer.next() {
81-
collectedBuffer.append(element)
82-
}
83-
}
84-
if let final {
85-
finalElement = final
86-
}
90+
while finalElement == nil {
91+
try await reader.read {
92+
(
93+
buffer,
94+
final
95+
) throws(EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>) -> Void in
96+
if buffer.count > target.freeCapacity {
97+
throw .first(AsyncReaderLeftOverElementsError())
98+
}
99+
if final != nil, buffer.count < target.freeCapacity {
100+
throw .second(AsyncReaderInsufficientElementsError())
101+
}
102+
target.append(moving: buffer.startIndex..<buffer.endIndex, from: &buffer)
103+
if let final {
104+
finalElement = final
87105
}
88106
}
89-
} catch {
90-
throw .first(error)
91-
}
92-
do {
93-
var consumer = collectedBuffer.consumeAll()
94-
let result = try await body(consumer.drainNext())
95-
// The force-unwrap is safe since final element must be set at this point
96-
return (result, finalElement!)
97-
} catch {
98-
throw .second(error)
99107
}
108+
// The force-unwrap is safe since final element must be set at this point
109+
return finalElement!
100110
}
101-
}
102111

103-
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
104-
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void {
105-
/// Collects elements from the reader up to a specified limit and processes them.
112+
/// Collects elements from the reader into the provided dynamic container,
113+
/// growing it up to the specified maximum size.
106114
///
107-
/// This overload is available when ``AsyncReader/FinalElement`` is `Void`.
108-
/// It returns only the body closure's result — there is no payload to surface.
115+
/// Reads chunks from the reader and appends them to `target`, which grows as
116+
/// elements arrive. The reader can deliver fewer than `maximumSize` elements
117+
/// before signaling end-of-stream; if it delivers more, the method throws
118+
/// ``AsyncReaderLeftOverElementsError``.
109119
///
110120
/// - Parameters:
111-
/// - limit: The maximum number of elements to collect.
112-
/// - body: A closure that receives an `InputSpan` of collected elements.
113-
/// - Returns: The body closure's result.
114-
/// - Throws: An `EitherError` wrapping either a read failure (possibly an
115-
/// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`.
116-
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
117-
public consuming func collect<Result, Failure: Error>(
118-
upTo limit: Int,
119-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
120-
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
121-
let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body)
122-
return result
121+
/// - target: The dynamic container that receives the collected elements.
122+
/// The method appends collected elements to the container's existing
123+
/// contents.
124+
/// - maximumSize: The maximum number of elements to append to `target`.
125+
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
126+
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or
127+
/// an ``AsyncReaderLeftOverElementsError`` if the reader produced more than
128+
/// `maximumSize` elements.
129+
public consuming func collect<Container: DynamicContainer<ReadElement> & ~Copyable>(
130+
into target: inout Container,
131+
maximumSize: Int
132+
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement {
133+
precondition(maximumSize >= 0, "maximumSize must be non-negative")
134+
var reader = self
135+
var finalElement: FinalElement? = nil
136+
var remaining = maximumSize
137+
while finalElement == nil {
138+
try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in
139+
let chunkCount = buffer.count
140+
if chunkCount > remaining {
141+
throw AsyncReaderLeftOverElementsError()
142+
}
143+
if chunkCount > 0 {
144+
var consumer = buffer.consumeAll()
145+
while let element = consumer.next() {
146+
target.append(element)
147+
}
148+
remaining -= chunkCount
149+
}
150+
if let final {
151+
finalElement = final
152+
}
153+
}
154+
}
155+
// The force-unwrap is safe since final element must be set at this point
156+
return finalElement!
123157
}
124158
}
125159

Sources/AsyncStreaming/NNNN-async-streaming.md

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ core protocols are established.
707707

708708
Two common patterns emerge immediately when working with `AsyncReader`:
709709
iterating over all chunks until the stream ends, and collecting elements into
710-
a buffer up to a specified limit. We envision convenience extensions for both,
710+
a caller-provided container. We envision convenience extensions for both,
711711
both of which surface the `FinalElement` payload to the caller:
712712

713713
```swift
@@ -721,23 +721,45 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
721721
}
722722

723723
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
724-
/// Collects up to `limit` elements, then passes the accumulated elements
725-
/// to `body` as an `InputSpan`. Returns the body's result together with
726-
/// the `FinalElement`.
727-
public consuming func collect<Result, Failure: Error>(
728-
upTo limit: Int,
729-
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
730-
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (Result, FinalElement)
724+
/// Collects elements into `target` up to its free capacity. The reader
725+
/// can deliver fewer; producing more throws.
726+
public consuming func collect<
727+
Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable
728+
>(
729+
into target: inout Container
730+
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement
731+
732+
/// Collects exactly `target.freeCapacity` elements into `target`. Producing
733+
/// fewer throws ``AsyncReaderInsufficientElementsError``; producing more
734+
/// throws ``AsyncReaderLeftOverElementsError``.
735+
public consuming func collect<
736+
Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable
737+
>(
738+
exactlyInto target: inout Container
739+
) async throws(
740+
EitherError<ReadFailure, EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>>
741+
) -> FinalElement
742+
743+
/// Collects elements into a dynamic container, growing it up to
744+
/// `maximumSize`. Producing more than `maximumSize` throws.
745+
public consuming func collect<
746+
Container: DynamicContainer<ReadElement> & ~Copyable
747+
>(
748+
into target: inout Container,
749+
maximumSize: Int
750+
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement
731751
}
732752
```
733753

734754
`forEachBuffer` provides a simple way to consume an entire stream without
735-
manually looping over `read` calls and threading the end signal. `collect`
736-
accumulates elements from multiple reads into a single buffer before
737-
processing, which is useful when an algorithm needs all data in contiguous
738-
memory (for example, parsing a complete message frame). A second
739-
`where FinalElement == Void` overload of `collect` lets callers that don't
740-
need the payload omit the tuple.
755+
manually looping over `read` calls and threading the end signal. The three
756+
`collect` overloads cover the common ways a caller wants to gather a stream's
757+
elements into an existing container: `collect(into:)` opportunistically fills
758+
a fixed-capacity container, `collect(exactlyInto:)` requires the reader to
759+
match the container's free capacity (useful when parsing a known-length
760+
frame), and `collect(into:maximumSize:)` grows a dynamic container with an
761+
upper bound. All three preserve the container's existing contents and append
762+
collected elements to the end.
741763

742764
These helpers are intentionally excluded from this proposal because their
743765
error-handling shapes (particularly `collect`'s nested `EitherError` and the

0 commit comments

Comments
 (0)