Skip to content

Commit a9d7f27

Browse files
committed
Adds a new AsyncReader.forEach method
It is often needed to iterate over all elements of a reader. This PR provides two convenience `forEach` methods to make iteration easier.
1 parent 0a5f92d commit a9d7f27

3 files changed

Lines changed: 247 additions & 0 deletions

File tree

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
14+
public import ContainersPreview
15+
16+
// swift-format-ignore: AmbiguousTrailingClosureOverload
17+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
18+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
19+
/// Iterates over all chunks from the reader, executing the provided body for each span.
20+
///
21+
/// This method continuously reads chunks from the async reader until the stream ends,
22+
/// executing the provided closure for each span of elements read. The iteration terminates
23+
/// when the reader produces an empty span, indicating the end of the stream.
24+
///
25+
/// - Parameter body: An asynchronous closure that processes each span of elements read
26+
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
27+
///
28+
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
29+
/// or a `Failure` from the body closure.
30+
///
31+
/// ## Example
32+
///
33+
/// ```swift
34+
/// var fileReader: FileAsyncReader = ...
35+
///
36+
/// // Process each chunk of data from the file
37+
/// try await fileReader.forEach { chunk in
38+
/// print("Processing \(chunk.count) elements")
39+
/// // Process the chunk
40+
/// }
41+
/// ```
42+
public consuming func forEachChunk<Failure: Error>(
43+
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Void
44+
) async throws(EitherError<ReadFailure, Failure>) {
45+
var shouldContinue = true
46+
while shouldContinue {
47+
try await self.read { (next) throws(Failure) -> Void in
48+
guard next.count > 0 else {
49+
shouldContinue = false
50+
return
51+
}
52+
53+
try await body(next)
54+
}
55+
}
56+
}
57+
58+
/// Iterates over all chunks from the reader, executing the provided body for each span.
59+
///
60+
/// This method continuously reads chunks from the async reader until the stream ends,
61+
/// executing the provided closure for each span of elements read. The iteration terminates
62+
/// when the reader produces an empty span, indicating the end of the stream.
63+
///
64+
/// - Parameter body: An asynchronous closure that processes each span of elements read
65+
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
66+
///
67+
/// - Throws: An error of type `Failure` from the body closure. Since this reader never fails,
68+
/// only the body closure can throw errors.
69+
///
70+
/// ## Example
71+
///
72+
/// ```swift
73+
/// var fileReader: FileAsyncReader = ...
74+
///
75+
/// // Process each chunk of data from the file
76+
/// try await fileReader.forEach { chunk in
77+
/// print("Processing \(chunk.count) elements")
78+
/// // Process the chunk
79+
/// }
80+
/// ```
81+
@inlinable
82+
public consuming func forEachChunk(
83+
body: (consuming InputSpan<ReadElement>) async -> Void
84+
) async where ReadFailure == Never {
85+
var shouldContinue = true
86+
while shouldContinue {
87+
do {
88+
try await self.read { (next) -> Void in
89+
guard next.count > 0 else {
90+
shouldContinue = false
91+
return
92+
}
93+
94+
await body(next)
95+
}
96+
} catch {
97+
fatalError()
98+
}
99+
}
100+
}
101+
}
102+
#endif

Sources/AsyncStreaming/EitherError.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,7 @@ public enum EitherError<First: Error, Second: Error>: Error {
5353
}
5454
}
5555
}
56+
57+
extension EitherError: Equatable where First: Equatable, Second: Equatable {}
58+
extension EitherError: Hashable where First: Hashable, Second: Hashable {}
5659
#endif
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
14+
import AsyncStreaming
15+
import BasicContainers
16+
import ContainersPreview
17+
import Testing
18+
19+
@Suite
20+
struct AsyncReaderforEachChunkTests {
21+
@Test
22+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
23+
func forEachChunkIteratesAllSpans() async throws {
24+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]))
25+
var elementCount = 0
26+
27+
await reader.forEachChunk { span in
28+
elementCount += span.count
29+
}
30+
31+
#expect(elementCount == 5)
32+
}
33+
34+
@Test
35+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
36+
func forEachChunkProcessesElements() async throws {
37+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30]))
38+
var sum = 0
39+
40+
await reader.forEachChunk { span in
41+
for i in span.indices {
42+
sum += span[i]
43+
}
44+
}
45+
46+
#expect(sum == 60)
47+
}
48+
49+
@Test
50+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
51+
func forEachChunkWithEmptyReader() async throws {
52+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: []))
53+
var callCount = 0
54+
55+
await reader.forEachChunk { span in
56+
callCount += 1
57+
}
58+
59+
#expect(callCount == 0)
60+
}
61+
62+
@Test
63+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
64+
func forEachChunkWithThrowingBody() async {
65+
enum TestError: Error {
66+
case failed
67+
}
68+
69+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
70+
71+
do {
72+
try await reader.forEachChunk { (span) throws(TestError) -> Void in
73+
throw TestError.failed
74+
}
75+
Issue.record("Expected error to be thrown")
76+
} catch {
77+
#expect(error == EitherError.second(TestError.failed))
78+
}
79+
}
80+
81+
@Test
82+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
83+
func forEachChunkWithNeverFailingReader() async {
84+
enum TestError: Error {
85+
case failed
86+
}
87+
88+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
89+
var count = 0
90+
91+
do {
92+
try await reader.forEachChunk { (span) throws(TestError) -> Void in
93+
count += span.count
94+
}
95+
} catch {
96+
Issue.record("No error should be thrown from reader")
97+
}
98+
99+
#expect(count == 3)
100+
}
101+
102+
@Test
103+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
104+
func forEachChunkWithAsyncWork() async throws {
105+
let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3]))
106+
var results: [Int] = []
107+
108+
await reader.forEachChunk { span in
109+
await Task.yield()
110+
for i in span.indices {
111+
results.append(span[i])
112+
}
113+
}
114+
115+
#expect(results == [1, 2, 3])
116+
}
117+
118+
@Test
119+
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
120+
func forEachChunkMultipleSpans() async {
121+
var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6]))
122+
var spanCounts: [Int] = []
123+
124+
// Force reading in smaller chunks
125+
while true {
126+
let hasMore = try! await reader.read(maximumCount: 2) { span in
127+
if span.count > 0 {
128+
spanCounts.append(span.count)
129+
return true
130+
}
131+
return false
132+
}
133+
if !hasMore {
134+
break
135+
}
136+
}
137+
138+
#expect(spanCounts == [2, 2, 2])
139+
}
140+
}
141+
142+
#endif

0 commit comments

Comments
 (0)