1414public import ContainersPreview
1515import BasicContainers
1616
17+ /// An error indicating that the reader produced more elements than the specified collection limit.
18+ ///
19+ /// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer
20+ /// contains more elements than the allowed limit.
21+ public struct AsyncReaderLeftOverElementsError : Error , Hashable {
22+ public init ( ) { }
23+ }
24+
1725@available ( macOS 26 . 2 , iOS 26 . 2 , watchOS 26 . 2 , tvOS 26 . 2 , visionOS 26 . 2 , * )
1826extension AsyncReader where Self: ~ Copyable, Self: ~ Escapable, ReadElement: ~ Copyable {
19- /// Collects elements from the reader up to a specified limit and processes them with a body function .
27+ /// Collects elements from the reader up to a specified limit and processes them.
2028 ///
21- /// This method continuously reads elements from the async reader, accumulating them in a buffer
22- /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
23- /// the specified limit. Once collection completes, it passes the accumulated elements to the
24- /// provided body function as a `Span ` for processing.
29+ /// This method continuously reads elements from the async reader, accumulating them in an
30+ /// internal buffer until either it reaches the end of the stream or the specified limit.
31+ /// Once collection completes, it passes the accumulated elements to the provided body
32+ /// closure as an `InputSpan ` for processing.
2533 ///
2634 /// - Parameters:
2735 /// - limit: The maximum number of elements to collect. This prevents unbounded memory
2836 /// growth when reading from potentially infinite streams.
29- /// - body: A closure that receives a `Span` containing all collected elements and returns
30- /// a result of type `Result`. The method calls this closure once after collecting all
31- /// elements successfully.
37+ /// - body: A closure that receives an `InputSpan` containing all collected elements and returns
38+ /// a result of type `Result`.
3239 ///
3340 /// - Returns: The value returned by the body closure after processing the collected elements.
3441 ///
35- /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
42+ /// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
43+ /// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
3644 /// or a `Failure` from the body closure.
3745 ///
3846 /// ## Example
@@ -44,153 +52,41 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
4452 /// // Process all collected elements
4553 /// }
4654 /// ```
47- ///
48- /// ## Memory Considerations
49- ///
50- /// Since this method buffers all elements in memory before processing, it should be used
51- /// with caution on large datasets. The `limit` parameter serves as a safety mechanism
52- /// to prevent excessive memory usage.
5355 public mutating func collect< Result, Failure: Error > (
5456 upTo limit: Int ,
5557 body: ( consuming InputSpan < ReadElement > ) async throws ( Failure ) -> Result
56- ) async throws ( EitherError< ReadFailure, Failure> ) -> Result {
58+ ) async throws ( EitherError< EitherError < ReadFailure, AsyncReaderLeftOverElementsError > , Failure> ) -> Result {
5759 // TODO: In the future we might want to use a temporary allocation instead
5860 // but those don't support async closures yet.
59- var buffer = UniqueArray < ReadElement > ( )
60- buffer . reserveCapacity ( limit)
61+ var collectedBuffer = UniqueArray < ReadElement > ( )
62+ collectedBuffer . reserveCapacity ( limit)
6163 var shouldContinue = true
6264 do {
6365 while shouldContinue {
64- try await self . read (
65- maximumCount: limit - buffer. count
66- ) { ( span: consuming InputSpan < ReadElement > ) in
67- guard span. count > 0 else {
66+ try await self . read { ( buffer: inout Buffer ) throws ( AsyncReaderLeftOverElementsError) -> Void in
67+ guard buffer. count > 0 else {
6868 shouldContinue = false
6969 return
7070 }
71- precondition ( span. count <= limit - buffer. count)
72- while let element = span. popFirst ( ) {
73- buffer. append ( element)
71+ if limit - collectedBuffer. count < buffer. count {
72+ throw AsyncReaderLeftOverElementsError ( )
73+ }
74+ var consumer = buffer. consumeAll ( )
75+ while let element = consumer. next ( ) {
76+ collectedBuffer. append ( element)
7477 }
7578 }
7679 }
7780 } catch {
78- switch error {
79- case . first( let error) :
80- throw . first( error)
81- case . second:
82- fatalError ( )
83- }
81+ throw . first( error)
8482 }
8583 do {
86- var consumer = buffer . consumeAll ( )
84+ var consumer = collectedBuffer . consumeAll ( )
8785 return try await body ( consumer. drainNext ( ) )
8886 } catch {
8987 throw . second( error)
9088 }
9189 }
92-
93- /// Collects elements from the reader up to a specified limit and processes them with a body function.
94- ///
95- /// This method continuously reads elements from the async reader, accumulating them in a buffer
96- /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
97- /// the specified limit. Once collection completes, it passes the accumulated elements to the
98- /// provided body function as a `Span` for processing.
99- ///
100- /// - Parameters:
101- /// - limit: The maximum number of elements to collect. This prevents unbounded memory
102- /// growth when reading from potentially infinite streams.
103- /// - body: A closure that receives a `Span` containing all collected elements and returns
104- /// a result of type `Result`. The method calls this closure once after collecting all
105- /// elements successfully.
106- ///
107- /// - Returns: The value returned by the body closure after processing the collected elements.
108- ///
109- /// ## Example
110- ///
111- /// ```swift
112- /// var reader: SomeAsyncReader = ...
113- ///
114- /// let processedData = try await reader.collect(upTo: 1000) { span in
115- /// // Process all collected elements
116- /// }
117- /// ```
118- ///
119- /// ## Memory Considerations
120- ///
121- /// Since this method buffers all elements in memory before processing, it should be used
122- /// with caution on large datasets. The `limit` parameter serves as a safety mechanism
123- /// to prevent excessive memory usage.
124- public mutating func collect< Result> (
125- upTo limit: Int ,
126- body: ( consuming InputSpan < ReadElement > ) async -> Result
127- ) async -> Result where ReadFailure == Never {
128- // TODO: In the future we might want to use a temporary allocation instead
129- // but those don't support async closures yet.
130- var buffer = UniqueArray < ReadElement > ( )
131- buffer. reserveCapacity ( limit)
132- var shouldContinue = true
133- while limit - buffer. count > 0 && shouldContinue {
134- // This force-try is safe since neither read nor the closure are throwing
135- try ! await self . read (
136- maximumCount: limit - buffer. count
137- ) { ( span: consuming InputSpan < ReadElement > ) in
138- precondition ( span. count <= limit - buffer. count)
139- guard span. count > 0 else {
140- // This means the underlying reader is finished and we can return
141- shouldContinue = false
142- return
143- }
144- while let element = span. popFirst ( ) {
145- buffer. append ( element)
146- }
147- }
148- }
149- var consumer = buffer. consumeAll ( )
150- return await body ( consumer. drainNext ( ) )
151- }
152-
153- /// Collects elements from the reader into an output span until the span is full.
154- ///
155- /// This method continuously reads elements from the async reader and appends them to the
156- /// provided output span until the span reaches its capacity. This provides an efficient
157- /// way to fill a pre-allocated buffer with elements from the reader.
158- ///
159- /// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
160- /// reading until this span is full.
161- ///
162- /// - Throws: An error of type `ReadFailure` if any read operation fails.
163- ///
164- /// ## Example
165- ///
166- /// ```swift
167- /// var reader: SomeAsyncReader = ...
168- /// var buffer = [Int](repeating: 0, count: 100)
169- ///
170- /// try await buffer.withOutputSpan { outputSpan in
171- /// try await reader.collect(into: &outputSpan)
172- /// }
173- /// ```
174- public mutating func collect(
175- into outputSpan: inout OutputSpan < ReadElement >
176- ) async throws ( ReadFailure) {
177- while !outputSpan. isFull {
178- do {
179- try await self . read ( maximumCount: outputSpan. freeCapacity) { ( span: consuming InputSpan < ReadElement > ) in
180- while let element = span. popFirst ( ) {
181- outputSpan. append ( element)
182- }
183- }
184- } catch {
185- switch error {
186- case . first( let error) :
187- throw error
188- case . second:
189- fatalError ( )
190- }
191- }
192- }
193- }
19490}
19591
19692#endif
0 commit comments