Skip to content

Commit 72b507d

Browse files
committed
Adds a new AsyncReader.collect method
Often developers want to collect elements from a reader up to a certain limit. This PR provides a few convenience methods for this.
1 parent 0a5f92d commit 72b507d

3 files changed

Lines changed: 296 additions & 1 deletion

File tree

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4) || true
13+
14+
public import ContainersPreview
15+
import BasicContainers
16+
17+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
18+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
19+
/// Collects elements from the reader up to a specified limit and processes them with a body function.
20+
///
21+
/// This method continuously reads elements from the async reader, accumulating them in a buffer
22+
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
23+
/// the specified limit. Once collection completes, it passes the accumulated elements to the
24+
/// provided body function as a `Span` for processing.
25+
///
26+
/// - Parameters:
27+
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
28+
/// growth when reading from potentially infinite streams.
29+
/// - body: A closure that receives a `Span` containing all collected elements and returns
30+
/// a result of type `Result`. The method calls this closure once after collecting all
31+
/// elements successfully.
32+
///
33+
/// - Returns: The value returned by the body closure after processing the collected elements.
34+
///
35+
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
36+
/// or a `Failure` from the body closure.
37+
///
38+
/// ## Example
39+
///
40+
/// ```swift
41+
/// var reader: SomeAsyncReader = ...
42+
///
43+
/// let processedData = try await reader.collect(upTo: 1000) { span in
44+
/// // Process all collected elements
45+
/// }
46+
/// ```
47+
///
48+
/// ## Memory Considerations
49+
///
50+
/// Since this method buffers all elements in memory before processing, it should be used
51+
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
52+
/// to prevent excessive memory usage.
53+
public mutating func collect<Result, Failure: Error>(
54+
upTo limit: Int,
55+
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
56+
) async throws(EitherError<ReadFailure, Failure>) -> Result {
57+
// TODO: In the future we might want to use a temporary allocation instead
58+
// but those don't support async closures yet.
59+
var buffer = UniqueArray<ReadElement>()
60+
var shouldContinue = true
61+
do {
62+
while shouldContinue {
63+
try await self.read(
64+
maximumCount: limit - buffer.count
65+
) { (span: consuming InputSpan<ReadElement>) in
66+
guard span.count > 0 else {
67+
shouldContinue = false
68+
return
69+
}
70+
precondition(span.count <= limit - buffer.count)
71+
while let element = span.popFirst() {
72+
buffer.append(element)
73+
}
74+
}
75+
}
76+
} catch {
77+
switch error {
78+
case .first(let error):
79+
throw .first(error)
80+
case .second:
81+
fatalError()
82+
}
83+
}
84+
do {
85+
var consumer = buffer.consumeAll()
86+
return try await body(consumer.drainNext())
87+
} catch {
88+
throw .second(error)
89+
}
90+
}
91+
92+
/// Collects elements from the reader up to a specified limit and processes them with a body function.
93+
///
94+
/// This method continuously reads elements from the async reader, accumulating them in a buffer
95+
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
96+
/// the specified limit. Once collection completes, it passes the accumulated elements to the
97+
/// provided body function as a `Span` for processing.
98+
///
99+
/// - Parameters:
100+
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
101+
/// growth when reading from potentially infinite streams.
102+
/// - body: A closure that receives a `Span` containing all collected elements and returns
103+
/// a result of type `Result`. The method calls this closure once after collecting all
104+
/// elements successfully.
105+
///
106+
/// - Returns: The value returned by the body closure after processing the collected elements.
107+
///
108+
/// ## Example
109+
///
110+
/// ```swift
111+
/// var reader: SomeAsyncReader = ...
112+
///
113+
/// let processedData = try await reader.collect(upTo: 1000) { span in
114+
/// // Process all collected elements
115+
/// }
116+
/// ```
117+
///
118+
/// ## Memory Considerations
119+
///
120+
/// Since this method buffers all elements in memory before processing, it should be used
121+
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
122+
/// to prevent excessive memory usage.
123+
public mutating func collect<Result>(
124+
upTo limit: Int,
125+
body: (consuming InputSpan<ReadElement>) async -> Result
126+
) async -> Result where ReadFailure == Never {
127+
// TODO: In the future we might want to use a temporary allocation instead
128+
// but those don't support async closures yet.
129+
var buffer = UniqueArray<ReadElement>()
130+
var shouldContinue = true
131+
while limit - buffer.count > 0 && shouldContinue {
132+
// This force-try is safe since neither read nor the closure are throwing
133+
try! await self.read(
134+
maximumCount: limit - buffer.count
135+
) { (span: consuming InputSpan<ReadElement>) in
136+
precondition(span.count <= limit - buffer.count)
137+
guard span.count > 0 else {
138+
// This means the underlying reader is finished and we can return
139+
shouldContinue = false
140+
return
141+
}
142+
while let element = span.popFirst() {
143+
buffer.append(element)
144+
}
145+
}
146+
}
147+
var consumer = buffer.consumeAll()
148+
return await body(consumer.drainNext())
149+
}
150+
151+
/// Collects elements from the reader into an output span until the span is full.
152+
///
153+
/// This method continuously reads elements from the async reader and appends them to the
154+
/// provided output span until the span reaches its capacity. This provides an efficient
155+
/// way to fill a pre-allocated buffer with elements from the reader.
156+
///
157+
/// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
158+
/// reading until this span is full.
159+
///
160+
/// - Throws: An error of type `ReadFailure` if any read operation fails.
161+
///
162+
/// ## Example
163+
///
164+
/// ```swift
165+
/// var reader: SomeAsyncReader = ...
166+
/// var buffer = [Int](repeating: 0, count: 100)
167+
///
168+
/// try await buffer.withOutputSpan { outputSpan in
169+
/// try await reader.collect(into: &outputSpan)
170+
/// }
171+
/// ```
172+
public mutating func collect(
173+
into outputSpan: inout OutputSpan<ReadElement>
174+
) async throws(ReadFailure) {
175+
while !outputSpan.isFull {
176+
do {
177+
try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan<ReadElement>) in
178+
while let element = span.popFirst() {
179+
outputSpan.append(element)
180+
}
181+
}
182+
} catch {
183+
switch error {
184+
case .first(let error):
185+
throw error
186+
case .second:
187+
fatalError()
188+
}
189+
}
190+
}
191+
}
192+
}
193+
194+
#endif

Sources/AsyncStreaming/AsyncReader/AsyncReader.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public protocol AsyncReader<ReadElement, ReadFailure>: ~Copyable, ~Escapable {
5858
}
5959

6060
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
61-
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
61+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
6262
/// Reads elements with no upper bound on span size.
6363
public mutating func read<Return: ~Copyable, Failure: Error>(
6464
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4) || true
13+
14+
import AsyncStreaming
15+
import BasicContainers
16+
import ContainersPreview
17+
import Testing
18+
19+
@Suite
20+
struct AsyncReaderCollectTests {
21+
@Test
22+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
23+
func collectAllElements() async {
24+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
25+
26+
let result = await reader.collect(upTo: 10) { span in
27+
return Array(span)
28+
}
29+
30+
#expect(result == [1, 2, 3, 4, 5])
31+
}
32+
33+
@Test
34+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
35+
func collectWithExactLimit() async {
36+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
37+
38+
let result = await reader.collect(upTo: 5) { span in
39+
return Array(span)
40+
}
41+
42+
#expect(result == [1, 2, 3, 4, 5])
43+
}
44+
45+
@Test
46+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
47+
func collectEmptyReader() async {
48+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))
49+
50+
let result = await reader.collect(upTo: 10) { span in
51+
return span.count
52+
}
53+
54+
#expect(result == 0)
55+
}
56+
57+
@Test
58+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
59+
func collectProcessesAllElements() async {
60+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))
61+
62+
let result = await reader.collect(upTo: 10) { span in
63+
var sum = 0
64+
for i in span.indices {
65+
sum += span[i]
66+
}
67+
return sum
68+
}
69+
70+
#expect(result == 60)
71+
}
72+
73+
@Test
74+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
75+
func collectIntoOutputSpan() async {
76+
// TODO: Cannot test this yet since we can't get `InputSpan`s available in async contexts
77+
// var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
78+
// var buffer = RigidArray<Int>.init(capacity: 5)
79+
//
80+
// await buffer.append(count: 5) { outputSpan in
81+
// await reader.collect(into: &outputSpan)
82+
// }
83+
//
84+
// #expect(buffer.count == 5)
85+
}
86+
87+
@Test
88+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
89+
func collectWithNeverFailingReader() async {
90+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
91+
92+
// This tests the Never overload
93+
let result = await reader.collect(upTo: 10) { span in
94+
return span.count
95+
}
96+
97+
#expect(result == 3)
98+
}
99+
}
100+
101+
#endif

0 commit comments

Comments
 (0)