Skip to content

Commit d14a83b

Browse files
authored
Adds a new AsyncReader.collect method (#415)
Often developers want to collect elements from a reader up to a certain limit. This PR provides a few convenience methods for this.
1 parent 0a5f92d commit d14a83b

3 files changed

Lines changed: 298 additions & 1 deletion

File tree

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
14+
public import ContainersPreview
15+
import BasicContainers
16+
17+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
18+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
19+
/// Collects elements from the reader up to a specified limit and processes them with a body function.
///
/// This method repeatedly reads from the async reader, moving every element it receives into
/// an internal buffer. Collection stops when either the reader signals the end of the stream
/// (by handing an empty `InputSpan` to the read closure) or the buffer holds `limit` elements.
/// Once collection completes, the buffered elements are passed to `body` as a single
/// `InputSpan`.
///
/// - Parameters:
///   - limit: The maximum number of elements to collect. This prevents unbounded memory
///     growth when reading from potentially infinite streams. No read is issued once
///     `limit` elements have been buffered.
///   - body: A closure that receives an `InputSpan` containing all collected elements and
///     returns a value of type `Result`. The method calls this closure once, after
///     collection has finished without a read error.
///
/// - Returns: The value returned by `body`.
///
/// - Throws: An `EitherError` whose `.first` case wraps a `ReadFailure` thrown by the read
///   operation and whose `.second` case wraps a `Failure` thrown by `body`.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = try await reader.collect(upTo: 1000) { span in
///   // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// Since this method buffers all elements in memory before processing, it should be used
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
/// to prevent excessive memory usage.
public mutating func collect<Result, Failure: Error>(
  upTo limit: Int,
  body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<ReadFailure, Failure>) -> Result {
  // TODO: In the future we might want to use a temporary allocation instead
  // but those don't support async closures yet.
  var buffer = UniqueArray<ReadElement>()
  buffer.reserveCapacity(limit)
  var shouldContinue = true
  do {
    // Check the remaining capacity before every read so that we never issue a
    // `read(maximumCount: 0)` once the limit is reached. This matches the
    // `ReadFailure == Never` overload of `collect(upTo:body:)`.
    while limit - buffer.count > 0 && shouldContinue {
      try await self.read(
        maximumCount: limit - buffer.count
      ) { (span: consuming InputSpan<ReadElement>) in
        // The reader must never hand us more than we asked for.
        precondition(
          span.count <= limit - buffer.count,
          "The reader produced more elements than the requested maximum count"
        )
        guard span.count > 0 else {
          // An empty span means the underlying reader is finished.
          shouldContinue = false
          return
        }
        while let element = span.popFirst() {
          buffer.append(element)
        }
      }
    }
  } catch {
    switch error {
    case .first(let error):
      throw .first(error)
    case .second:
      // The closure passed to `read` above never throws.
      fatalError("Unreachable: the read closure does not throw")
    }
  }
  do {
    var consumer = buffer.consumeAll()
    return try await body(consumer.drainNext())
  } catch {
    throw .second(error)
  }
}
92+
93+
/// Buffers up to `limit` elements from a reader that cannot fail and hands them to `body`.
///
/// The reader is read repeatedly and every received element is moved into an internal
/// buffer. Reading stops when the reader hands an empty `InputSpan` to the read closure
/// (end of stream) or when the buffer holds `limit` elements. Afterwards `body` is invoked
/// exactly once with an `InputSpan` over everything that was buffered.
///
/// This overload is available when `ReadFailure == Never` and therefore does not throw.
///
/// - Parameters:
///   - limit: The maximum number of elements to collect. This bounds memory growth when
///     reading from potentially infinite streams.
///   - body: A non-throwing closure that receives an `InputSpan` containing all collected
///     elements and returns a value of type `Result`.
///
/// - Returns: The value returned by `body`.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = await reader.collect(upTo: 1000) { span in
///   // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// All elements are held in memory before `body` runs, so choose `limit` with the size of
/// the data in mind.
public mutating func collect<Result>(
  upTo limit: Int,
  body: (consuming InputSpan<ReadElement>) async -> Result
) async -> Result where ReadFailure == Never {
  // TODO: In the future we might want to use a temporary allocation instead
  // but those don't support async closures yet.
  var collected = UniqueArray<ReadElement>()
  collected.reserveCapacity(limit)
  var readerFinished = false
  while !readerFinished && limit - collected.count > 0 {
    // Neither the reader (`ReadFailure == Never`) nor the closure below throws,
    // so this force-try cannot trap.
    try! await self.read(
      maximumCount: limit - collected.count
    ) { (chunk: consuming InputSpan<ReadElement>) in
      precondition(chunk.count <= limit - collected.count)
      if chunk.count > 0 {
        while let next = chunk.popFirst() {
          collected.append(next)
        }
      } else {
        // An empty span means the underlying reader is finished.
        readerFinished = true
      }
    }
  }
  var drain = collected.consumeAll()
  return await body(drain.drainNext())
}
152+
153+
/// Collects elements from the reader into an output span until the span is full or the
/// reader is exhausted.
///
/// This method repeatedly reads from the async reader, requesting at most the span's
/// remaining free capacity each time, and appends every received element to `outputSpan`.
/// It returns when the span is full, or when the reader hands an empty `InputSpan` to the
/// read closure, which signals the end of the stream. In the latter case the span may be
/// only partially filled; inspect `outputSpan.count` to see how many elements were written.
///
/// - Parameter outputSpan: An `OutputSpan` to append read elements into.
///
/// - Throws: An error of type `ReadFailure` if any read operation fails.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
/// var buffer = [Int](repeating: 0, count: 100)
///
/// try await buffer.withOutputSpan { outputSpan in
///   try await reader.collect(into: &outputSpan)
/// }
/// ```
public mutating func collect(
  into outputSpan: inout OutputSpan<ReadElement>
) async throws(ReadFailure) {
  // Set once the reader reports end of stream. Without it, a reader that finishes
  // before the span is full would be re-read forever.
  var reachedEnd = false
  while !outputSpan.isFull && !reachedEnd {
    do {
      try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan<ReadElement>) in
        guard span.count > 0 else {
          // An empty span means the underlying reader is finished.
          reachedEnd = true
          return
        }
        while let element = span.popFirst() {
          outputSpan.append(element)
        }
      }
    } catch {
      switch error {
      case .first(let error):
        throw error
      case .second:
        // The closure passed to `read` above never throws.
        fatalError("Unreachable: the read closure does not throw")
      }
    }
  }
}
194+
}
195+
196+
#endif

Sources/AsyncStreaming/AsyncReader/AsyncReader.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public protocol AsyncReader<ReadElement, ReadFailure>: ~Copyable, ~Escapable {
5858
}
5959

6060
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
61-
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
61+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
6262
/// Reads elements with no upper bound on span size.
6363
public mutating func read<Return: ~Copyable, Failure: Error>(
6464
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
14+
import AsyncStreaming
15+
import BasicContainers
16+
import ContainersPreview
17+
import Testing
18+
19+
/// Tests for the `AsyncReader.collect` convenience methods.
///
/// Every test reads from a `UniqueArrayAsyncReader` and uses the non-throwing
/// `collect(upTo:body:)` overload (the calls are made with `await` only, without `try`).
@Suite
struct AsyncReaderCollectTests {
  /// A limit larger than the number of available elements yields every element.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectAllElements() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))

    let result = await reader.collect(upTo: 10) { span in
      return Array(span)
    }

    #expect(result == [1, 2, 3, 4, 5])
  }

  /// A limit equal to the number of available elements yields every element.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectWithExactLimit() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))

    let result = await reader.collect(upTo: 5) { span in
      return Array(span)
    }

    #expect(result == [1, 2, 3, 4, 5])
  }

  /// A limit smaller than the number of available elements stops collection at the limit.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectStopsAtLimit() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))

    let result = await reader.collect(upTo: 3) { span in
      return Array(span)
    }

    #expect(result == [1, 2, 3])
  }

  /// An empty reader hands an empty span to the body.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectEmptyReader() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))

    let result = await reader.collect(upTo: 10) { span in
      return span.count
    }

    #expect(result == 0)
  }

  /// The body can index into the collected span and sees every element.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectProcessesAllElements() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))

    let result = await reader.collect(upTo: 10) { span in
      var sum = 0
      for i in span.indices {
        sum += span[i]
      }
      return sum
    }

    #expect(result == 60)
  }

  // Disabled rather than left as an empty body, so the test run reports it as
  // skipped instead of as a vacuous pass.
  @Test(.disabled("collect(into:) needs an OutputSpan inside an async closure, which is not possible yet"))
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectIntoOutputSpan() async {
    // TODO: Cannot test this yet since we can't get `OutputSpan`s available in async contexts
    //    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
    //    var buffer = RigidArray<Int>.init(capacity: 5)
    //
    //    await buffer.append(count: 5) { outputSpan in
    //      await reader.collect(into: &outputSpan)
    //    }
    //
    //    #expect(buffer.count == 5)
  }

  /// The non-throwing overload is selected: the call needs no `try`.
  @Test
  @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
  func collectWithNeverFailingReader() async {
    var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))

    // This tests the Never overload
    let result = await reader.collect(upTo: 10) { span in
      return span.count
    }

    #expect(result == 3)
  }
}
100+
101+
#endif

0 commit comments

Comments
 (0)