Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 121 additions & 87 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,114 +12,148 @@
#if UnstableAsyncStreaming && compiler(>=6.4)

public import ContainersPreview
import BasicContainers

/// An error indicating that the reader produced more elements than the specified collection limit.
///
/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer
/// contains more elements than the allowed limit.
/// An error that indicates the reader produced more elements than the
/// destination container could accept.
public struct AsyncReaderLeftOverElementsError: Error, Hashable {
public init() {}
}

/// An error that indicates the reader signaled end-of-stream before producing
/// enough elements to fill the destination container.
public struct AsyncReaderInsufficientElementsError: Error, Hashable {
public init() {}
}

@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Collects elements from the reader up to a specified limit and processes them.
///
/// This method continuously reads elements from the async reader, accumulating them in an
/// internal buffer until either the reader signals end-of-stream (by delivering a
/// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached.
/// Once collection completes, it passes the accumulated elements to the provided body
/// closure as an `InputSpan` for processing, and returns the body's result together
/// with the ``AsyncReader/FinalElement``.
/// Collects elements from the reader into the provided container, up to the
/// container's available space.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
/// a result of type `Result`.
///
/// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement``
/// delivered with the terminal chunk.
/// Reads chunks from the reader and moves them into `target` until the reader
/// signals end-of-stream. The reader can deliver fewer elements than
/// `target.freeCapacity`; the container needn't be full when the stream ends.
/// If the reader produces more elements than `target` can accept, the method
/// throws ``AsyncReaderLeftOverElementsError``.
///
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
/// or a `Failure` from the body closure.
///
/// ## Example
/// - Parameter target: The container that receives the collected elements.
/// The method preserves the container's existing contents and appends
/// collected elements to the end.
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or
/// an ``AsyncReaderLeftOverElementsError`` if the reader produced more
/// elements than `target` could accept.
public consuming func collect<Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable>(
into target: inout Container
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement {
var reader = self
var finalElement: FinalElement? = nil
while finalElement == nil {
try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in
if buffer.count > target.freeCapacity {
throw AsyncReaderLeftOverElementsError()
}
target.append(moving: buffer.startIndex..<buffer.endIndex, from: &buffer)
if let final {
finalElement = final
}
}
}
// The force-unwrap is safe since final element must be set at this point
return finalElement!
}

/// Collects elements from the reader into the provided container, requiring
/// that the reader fill the container exactly.
///
/// ```swift
/// var reader: SomeAsyncReader = ...
/// Reads chunks from the reader and moves them into `target` until either the
/// reader signals end-of-stream or the container becomes full. The reader
/// must produce exactly `target.freeCapacity` elements. If it produces fewer
/// before signaling end-of-stream, the method throws
/// ``AsyncReaderInsufficientElementsError``. If it produces more, it throws
/// ``AsyncReaderLeftOverElementsError``.
///
/// let (processedData, _) = try await reader.collect(upTo: 1000) { span in
/// // Process all collected elements
/// }
/// ```
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (
Result, FinalElement
) {
/// - Parameter target: The container to fill exactly. The method appends
/// collected elements to the container's existing contents.
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure``
/// or an `EitherError` wrapping
/// ``AsyncReaderLeftOverElementsError`` (too many elements) or
/// ``AsyncReaderInsufficientElementsError`` (too few elements).
public consuming func collect<Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable>(
exactlyInto target: inout Container
) async throws(EitherError<
ReadFailure, EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>
>) -> FinalElement {
var reader = self
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var collectedBuffer = UniqueArray<ReadElement>()
collectedBuffer.reserveCapacity(limit)
var finalElement: FinalElement? = nil
do {
while finalElement == nil {
try await reader.read {
(buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in
if buffer.count > 0 {
if limit - collectedBuffer.count < buffer.count {
throw AsyncReaderLeftOverElementsError()
}
var consumer = buffer.consumeAll()
while let element = consumer.next() {
collectedBuffer.append(element)
}
}
if let final {
finalElement = final
}
while finalElement == nil {
try await reader.read {
(
buffer,
final
) throws(EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>) -> Void in
if buffer.count > target.freeCapacity {
throw .first(AsyncReaderLeftOverElementsError())
}
if final != nil, buffer.count < target.freeCapacity {
throw .second(AsyncReaderInsufficientElementsError())
}
target.append(moving: buffer.startIndex..<buffer.endIndex, from: &buffer)
if let final {
finalElement = final
}
}
} catch {
throw .first(error)
}
do {
var consumer = collectedBuffer.consumeAll()
let result = try await body(consumer.drainNext())
// The force-unwrap is safe since final element must be set at this point
return (result, finalElement!)
} catch {
throw .second(error)
}
// The force-unwrap is safe since final element must be set at this point
return finalElement!
}
}

@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable, FinalElement == Void {
/// Collects elements from the reader up to a specified limit and processes them.
/// Collects elements from the reader into the provided dynamic container,
/// growing it up to the specified maximum size.
///
/// This overload is available when ``AsyncReader/FinalElement`` is `Void`.
/// It returns only the body closure's result — there is no payload to surface.
/// Reads chunks from the reader and appends them to `target`, which grows as
/// elements arrive. The reader can deliver fewer than `maximumSize` elements
/// before signaling end-of-stream; if it delivers more, the method throws
/// ``AsyncReaderLeftOverElementsError``.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect.
/// - body: A closure that receives an `InputSpan` of collected elements.
/// - Returns: The body closure's result.
/// - Throws: An `EitherError` wrapping either a read failure (possibly an
/// ``AsyncReaderLeftOverElementsError``) or a `Failure` from `body`.
// TODO: We should make this method take an inout `RangeReplacableCollection` instead
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body)
return result
/// - target: The dynamic container that receives the collected elements.
/// The method appends collected elements to the container's existing
/// contents.
/// - maximumSize: The maximum number of elements to append to `target`.
/// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk.
/// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or
/// an ``AsyncReaderLeftOverElementsError`` if the reader produced more than
/// `maximumSize` elements.
public consuming func collect<Container: DynamicContainer<ReadElement> & ~Copyable>(
into target: inout Container,
maximumSize: Int
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement {
precondition(maximumSize >= 0, "maximumSize must be non-negative")
var reader = self
var finalElement: FinalElement? = nil
var remaining = maximumSize
while finalElement == nil {
try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in
let chunkCount = buffer.count
if chunkCount > remaining {
throw AsyncReaderLeftOverElementsError()
}
if chunkCount > 0 {
var consumer = buffer.consumeAll()
while let element = consumer.next() {
target.append(element)
}
remaining -= chunkCount
}
if let final {
finalElement = final
}
}
}
// The force-unwrap is safe since final element must be set at this point
return finalElement!
}
}

Expand Down
50 changes: 36 additions & 14 deletions Sources/AsyncStreaming/NNNN-async-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ core protocols are established.

Two common patterns emerge immediately when working with `AsyncReader`:
iterating over all chunks until the stream ends, and collecting elements into
a buffer up to a specified limit. We envision convenience extensions for both,
a caller-provided container. We envision convenience extensions for both,
both of which surface the `FinalElement` payload to the caller:

```swift
Expand All @@ -721,23 +721,45 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
}

extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Collects up to `limit` elements, then passes the accumulated elements
/// to `body` as an `InputSpan`. Returns the body's result together with
/// the `FinalElement`.
public consuming func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> (Result, FinalElement)
/// Collects elements into `target` up to its free capacity. The reader
/// can deliver fewer; producing more throws.
public consuming func collect<
Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable
>(
into target: inout Container
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement

/// Collects exactly `target.freeCapacity` elements into `target`. Producing
/// fewer throws ``AsyncReaderInsufficientElementsError``; producing more
/// throws ``AsyncReaderLeftOverElementsError``.
public consuming func collect<
Container: RangeReplaceableContainer<ReadElement> & ~Copyable & ~Escapable
>(
exactlyInto target: inout Container
) async throws(
EitherError<ReadFailure, EitherError<AsyncReaderLeftOverElementsError, AsyncReaderInsufficientElementsError>>
) -> FinalElement

/// Collects elements into a dynamic container, growing it up to
/// `maximumSize`. Producing more than `maximumSize` throws.
public consuming func collect<
Container: DynamicContainer<ReadElement> & ~Copyable
>(
into target: inout Container,
maximumSize: Int
) async throws(EitherError<ReadFailure, AsyncReaderLeftOverElementsError>) -> FinalElement
}
```

`forEachBuffer` provides a simple way to consume an entire stream without
manually looping over `read` calls and threading the end signal. `collect`
accumulates elements from multiple reads into a single buffer before
processing, which is useful when an algorithm needs all data in contiguous
memory (for example, parsing a complete message frame). A second
`where FinalElement == Void` overload of `collect` lets callers that don't
need the payload omit the tuple.
manually looping over `read` calls and threading the end signal. The three
`collect` overloads cover the common ways a caller wants to gather a stream's
elements into an existing container: `collect(into:)` opportunistically fills
a fixed-capacity container, `collect(exactlyInto:)` requires the reader to
match the container's free capacity (useful when parsing a known-length
frame), and `collect(into:maximumSize:)` grows a dynamic container with an
upper bound. All three preserve the container's existing contents and append
collected elements to the end.

These helpers are intentionally excluded from this proposal because their
error-handling shapes (particularly `collect`'s nested `EitherError` and the
Expand Down
Loading
Loading