// AsyncReader+forEach.swift (forked from apple/swift-async-algorithms)
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
#if UnstableAsyncStreaming && compiler(>=6.4)
import ContainersPreview
// swift-format-ignore: AmbiguousTrailingClosureOverload
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
  /// Iterates over all chunks from the reader, executing the provided body for
  /// each buffer until the stream signals end-of-stream.
  ///
  /// This method repeatedly reads from the async reader and runs `body` on
  /// every chunk — including the terminal one. After `body` returns, the final
  /// element delivered alongside the chunk is inspected; the loop stops after
  /// the first read that delivers a non-`nil` ``AsyncReader/FinalElement``,
  /// which becomes the return value.
  ///
  /// ## Example
  ///
  /// ```swift
  /// var fileReader: FileAsyncReader = ...
  ///
  /// _ = try await fileReader.forEachBuffer { buffer in
  ///   print("Processing \(buffer.count) elements")
  /// }
  /// ```
  ///
  /// - Parameter body: An asynchronous closure that processes each buffer of
  ///   elements read from the stream. The buffer is passed `inout`.
  /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal
  ///   chunk. Although the return type is optional, the loop only finishes
  ///   once a final element has been delivered, so the result is non-`nil`
  ///   whenever this method returns without throwing.
  /// - Throws: An `EitherError` containing either a `ReadFailure` from the
  ///   read operation or a `Failure` from the body closure.
  @inlinable
  public consuming func forEachBuffer<Failure: Error>(
    body: (inout Buffer) async throws(Failure) -> Void
  ) async throws(EitherError<ReadFailure, Failure>) -> FinalElement? {
    // Written by the read closure; a non-nil value doubles as the loop's
    // stop condition, mirroring the non-throwing overload below.
    var finalElement: FinalElement? = nil
    while finalElement == nil {
      try await self.read { (buffer, terminal) throws(Failure) -> Void in
        // Process the chunk first so the terminal chunk is also seen by `body`.
        try await body(&buffer)
        if let terminal {
          finalElement = terminal
        }
      }
    }
    return finalElement
  }

  /// Iterates over all chunks from a non-failing reader, executing the
  /// provided body for each buffer until the stream signals end-of-stream.
  ///
  /// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is
  /// `Never`. `body` runs on every chunk, including the terminal one, and the
  /// loop stops after the first read that delivers a non-`nil`
  /// ``AsyncReader/FinalElement``.
  ///
  /// ## Example
  ///
  /// ```swift
  /// var fileReader: FileAsyncReader = ...
  ///
  /// _ = await fileReader.forEachBuffer { buffer in
  ///   print("Processing \(buffer.count) elements")
  /// }
  /// ```
  ///
  /// - Parameter body: An asynchronous closure that processes each buffer of
  ///   elements read from the stream. The buffer is passed `inout`.
  /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
  @inlinable
  public consuming func forEachBuffer(
    body: (inout Buffer) async -> Void
  ) async -> FinalElement where ReadFailure == Never {
    var finalElement: FinalElement? = nil
    while finalElement == nil {
      do {
        try await self.read { (buffer, terminal) -> Void in
          // Process the chunk first so the terminal chunk is also seen by `body`.
          await body(&buffer)
          if let terminal {
            finalElement = terminal
          }
        }
      } catch {
        // `ReadFailure` is `Never` and `body` cannot throw, so no error is
        // expected here; trap with a diagnostic rather than a bare crash.
        fatalError("forEachBuffer: read threw although ReadFailure is Never and body is non-throwing")
      }
    }
    // The loop above only exits once the read closure has stored a final
    // element, so the force-unwrap cannot fail.
    return finalElement!
  }
}
#endif