Skip to content

Commit 90a5e45

Browse files
committed
[AsyncStreaming] Add support for final elements
## Motivation Real transports deliver structured data alongside end-of-stream and need to fuse the last write with the close. HTTP trailers and gRPC status are the obvious cases. Neither can be expressed by an empty-buffer terminator, and both lose H2/H3/QUIC's DATA+END_STREAM coalescing without a fused call. ## Modification Adds `FinalElement: ~Copyable = Void` as a primary associated type on all four protocols. `AsyncReader` delivers it via a `consuming FinalElement?` closure parameter; `CallerAsyncReader` returns it. Both writers gain a consuming `finish` carrying the last chunk and the payload in one call. `forEachBuffer`, `collect`, and the `pipe` variants thread the payload through. `collect` becomes consuming and gains a `Void`-final overload returning just the result. Adds an "Alternatives considered" entry covering what would break without this: HTTP body, gRPC, and fused close on H2/H3/QUIC. ## Result The four protocols can back HTTP body, gRPC streaming, and similar shapes without giving up the fused close. Default `Void` keeps simple conformers unchanged; `Never` marks infinite streams. Custom and `~Copyable` payloads work end to end.
1 parent 3bd2de0 commit 90a5e45

20 files changed

Lines changed: 993 additions & 520 deletions

Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,20 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
2727
/// Collects elements from the reader up to a specified limit and processes them.
2828
///
2929
/// This method continuously reads elements from the async reader, accumulating them in an
30-
/// internal buffer until either it reaches the end of the stream or the specified limit.
30+
/// internal buffer until either the reader signals end-of-stream (by delivering a
31+
/// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached.
3132
/// Once collection completes, it passes the accumulated elements to the provided body
32-
/// closure as an `InputSpan` for processing.
33+
/// closure as an `InputSpan` for processing, and returns the body's result together
34+
/// with the ``AsyncReader/FinalElement``.
3335
///
3436
/// - Parameters:
3537
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
3638
/// growth when reading from potentially infinite streams.
3739
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
3840
/// a result of type `Result`.
3941
///
40-
/// - Returns: The value returned by the body closure after processing the collected elements.
42+
/// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement``
43+
/// delivered with the terminal chunk.
4144
///
4245
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
4346
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
@@ -48,32 +51,38 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
4851
/// ```swift
4952
/// var reader: SomeAsyncReader = ...
5053
///
51-
/// let processedData = try await reader.collect(upTo: 1000) { span in
54+
/// let (processedData, _) = try await reader.collect(upTo: 1000) { span in
5255
/// // Process all collected elements
5356
/// }
5457
/// ```
55-
public mutating func collect<Result, Failure: Error>(
58+
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
59+
public consuming func collect<Result, Failure: Error>(
5660
upTo limit: Int,
5761
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
58-
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
62+
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (
63+
Result, FinalElement
64+
) {
65+
var reader = self
5966
// TODO: In the future we might want to use a temporary allocation instead
6067
// but those don't support async closures yet.
6168
var collectedBuffer = UniqueArray<ReadElement>()
6269
collectedBuffer.reserveCapacity(limit)
63-
var shouldContinue = true
70+
var finalElement: FinalElement? = nil
6471
do {
65-
while shouldContinue {
66-
try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in
67-
guard buffer.count > 0 else {
68-
shouldContinue = false
69-
return
70-
}
71-
if limit - collectedBuffer.count < buffer.count {
72-
throw AsyncReaderLeftOverElementsError()
72+
while finalElement == nil {
73+
try await reader.read {
74+
(buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in
75+
if buffer.count > 0 {
76+
if limit - collectedBuffer.count < buffer.count {
77+
throw AsyncReaderLeftOverElementsError()
78+
}
79+
var consumer = buffer.consumeAll()
80+
while let element = consumer.next() {
81+
collectedBuffer.append(element)
82+
}
7383
}
74-
var consumer = buffer.consumeAll()
75-
while let element = consumer.next() {
76-
collectedBuffer.append(element)
84+
if let final {
85+
finalElement = final
7786
}
7887
}
7988
}
@@ -82,11 +91,36 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
8291
}
8392
do {
8493
var consumer = collectedBuffer.consumeAll()
85-
return try await body(consumer.drainNext())
94+
let result = try await body(consumer.drainNext())
95+
// The force-unwrap is safe since final element must be set at this point
96+
return (result, finalElement!)
8697
} catch {
8798
throw .second(error)
8899
}
89100
}
90101
}
91102

103+
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
104+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void {
105+
/// Collects elements from the reader up to a specified limit and processes them.
106+
///
107+
/// This overload is available when ``AsyncReader/FinalElement`` is `Void`.
108+
/// It returns only the body closure's result — there is no payload to surface.
109+
///
110+
/// - Parameters:
111+
/// - limit: The maximum number of elements to collect.
112+
/// - body: A closure that receives an `InputSpan` of collected elements.
113+
/// - Returns: The body closure's result.
114+
/// - Throws: An `EitherError` wrapping either a read failure (possibly an
115+
/// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`.
116+
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
117+
public consuming func collect<Result, Failure: Error>(
118+
upTo limit: Int,
119+
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
120+
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
121+
let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body)
122+
return result
123+
}
124+
}
125+
92126
#endif

Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,87 +11,89 @@
1111

1212
#if UnstableAsyncStreaming && compiler(>=6.4)
1313

14-
public import ContainersPreview
14+
import ContainersPreview
1515

1616
// swift-format-ignore: AmbiguousTrailingClosureOverload
1717
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
1818
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
19-
/// Iterates over all chunks from the reader, executing the provided body for each buffer.
19+
/// Iterates over all chunks from the reader, executing the provided body for
20+
/// each buffer until the stream signals end-of-stream.
2021
///
21-
/// This method continuously reads chunks from the async reader until the stream ends,
22-
/// executing the provided closure for each buffer of elements read. The iteration terminates
23-
/// when the reader produces an empty buffer, indicating the end of the stream.
24-
///
25-
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
26-
/// from the stream.
27-
///
28-
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
29-
/// or a `Failure` from the body closure.
22+
/// This method continuously reads chunks from the async reader, executing
23+
/// `body` for every chunk — including the terminal one — and terminates the
24+
/// loop when the reader delivers a non-`nil` ``AsyncReader/FinalElement``.
25+
/// The returned value is that ``AsyncReader/FinalElement``.
3026
///
3127
/// ## Example
3228
///
3329
/// ```swift
3430
/// var fileReader: FileAsyncReader = ...
3531
///
36-
/// try await fileReader.forEachBuffer { buffer in
32+
/// _ = try await fileReader.forEachBuffer { buffer in
3733
/// print("Processing \(buffer.count) elements")
3834
/// }
3935
/// ```
36+
///
37+
/// - Parameter body: An asynchronous closure that processes each buffer of
38+
/// elements read from the stream.
39+
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal
40+
/// chunk, or `nil` if none was observed.
41+
/// - Throws: An `EitherError` containing either a `ReadFailure` from the
42+
/// read operation or a `Failure` from the body closure.
4043
public consuming func forEachBuffer<Failure: Error>(
4144
body: (inout Buffer) async throws(Failure) -> Void
42-
) async throws(EitherError<ReadFailure, Failure>) {
43-
var shouldContinue = true
44-
while shouldContinue {
45-
try await self.read { (next) throws(Failure) -> Void in
46-
guard next.count > 0 else {
47-
shouldContinue = false
48-
return
49-
}
50-
45+
) async throws(EitherError<ReadFailure, Failure>) -> FinalElement? {
46+
var final: FinalElement? = nil
47+
var done = false
48+
while !done {
49+
try await self.read { (next, finalElement) throws(Failure) -> Void in
5150
try await body(&next)
51+
if let finalElement {
52+
final = finalElement
53+
done = true
54+
}
5255
}
5356
}
57+
return final
5458
}
5559

56-
/// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer.
57-
///
58-
/// This method continuously reads chunks from the async reader until the stream ends,
59-
/// executing the provided closure for each buffer of elements read. The iteration terminates
60-
/// when the reader produces an empty buffer, indicating the end of the stream.
60+
/// Iterates over all chunks from a non-failing reader, executing the
61+
/// provided body for each buffer until the stream signals end-of-stream.
6162
///
6263
/// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`.
6364
///
64-
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
65-
/// from the stream.
66-
///
6765
/// ## Example
6866
///
6967
/// ```swift
7068
/// var fileReader: FileAsyncReader = ...
7169
///
72-
/// await fileReader.forEachBuffer { buffer in
70+
/// _ = await fileReader.forEachBuffer { buffer in
7371
/// print("Processing \(buffer.count) elements")
7472
/// }
7573
/// ```
74+
///
75+
/// - Parameter body: An asynchronous closure that processes each buffer of
76+
/// elements read from the stream.
77+
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
7678
@inlinable
7779
public consuming func forEachBuffer(
7880
body: (inout Buffer) async -> Void
79-
) async where ReadFailure == Never {
80-
var shouldContinue = true
81-
while shouldContinue {
81+
) async -> FinalElement where ReadFailure == Never {
82+
var finalElement: FinalElement? = nil
83+
while finalElement == nil {
8284
do {
83-
try await self.read { (next) -> Void in
84-
guard next.count > 0 else {
85-
shouldContinue = false
86-
return
87-
}
88-
85+
try await self.read { (next, final) -> Void in
8986
await body(&next)
87+
if let final {
88+
finalElement = final
89+
}
9090
}
9191
} catch {
9292
fatalError()
9393
}
9494
}
95+
// The force-unwrap is safe since final element must be set at this point
96+
return finalElement!
9597
}
9698
}
9799
#endif

0 commit comments

Comments
 (0)