diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift index 3eaf9dce..885eb90e 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift @@ -12,114 +12,148 @@ #if UnstableAsyncStreaming && compiler(>=6.4) public import ContainersPreview -import BasicContainers -/// An error indicating that the reader produced more elements than the specified collection limit. -/// -/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer -/// contains more elements than the allowed limit. +/// An error that indicates the reader produced more elements than the +/// destination container could accept. public struct AsyncReaderLeftOverElementsError: Error, Hashable { public init() {} } +/// An error that indicates the reader signaled end-of-stream before producing +/// enough elements to fill the destination container. +public struct AsyncReaderInsufficientElementsError: Error, Hashable { + public init() {} +} + @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { - /// Collects elements from the reader up to a specified limit and processes them. - /// - /// This method continuously reads elements from the async reader, accumulating them in an - /// internal buffer until either the reader signals end-of-stream (by delivering a - /// non-`nil` ``AsyncReader/FinalElement``) or the specified limit is reached. - /// Once collection completes, it passes the accumulated elements to the provided body - /// closure as an `InputSpan` for processing, and returns the body's result together - /// with the ``AsyncReader/FinalElement``. + /// Collects elements from the reader into the provided container, up to the + /// container's available space. /// - /// - Parameters: - /// - limit: The maximum number of elements to collect. This prevents unbounded memory - /// growth when reading from potentially infinite streams. - /// - body: A closure that receives an `InputSpan` containing all collected elements and returns - /// a result of type `Result`. - /// - /// - Returns: A tuple of the body closure's result and the ``AsyncReader/FinalElement`` - /// delivered with the terminal chunk. + /// Reads chunks from the reader and moves them into `target` until the reader + /// signals end-of-stream. The reader can deliver fewer elements than + /// `target.freeCapacity`; the container needn't be full when the stream ends. + /// If the reader produces more elements than `target` can accept, the method + /// throws ``AsyncReaderLeftOverElementsError``. /// - /// - Throws: An `EitherError` wrapping either a read failure (which itself may be an - /// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit), - /// or a `Failure` from the body closure. - /// - /// ## Example + /// - Parameter target: The container that receives the collected elements. + /// The method preserves the container's existing contents and appends + /// collected elements to the end. + /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk. + /// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or + /// an ``AsyncReaderLeftOverElementsError`` if the reader produced more + /// elements than `target` could accept. + public consuming func collect & ~Copyable & ~Escapable>( + into target: inout Container + ) async throws(EitherError) -> FinalElement { + var reader = self + var finalElement: FinalElement? = nil + while finalElement == nil { + try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in + if buffer.count > target.freeCapacity { + throw AsyncReaderLeftOverElementsError() + } + target.append(moving: buffer.startIndex..( - upTo limit: Int, - body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> ( - Result, FinalElement - ) { + /// - Parameter target: The container to fill exactly. The method appends + /// collected elements to the container's existing contents. + /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk. + /// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` + /// or an `EitherError` wrapping + /// ``AsyncReaderLeftOverElementsError`` (too many elements) or + /// ``AsyncReaderInsufficientElementsError`` (too few elements). + public consuming func collect & ~Copyable & ~Escapable>( + exactlyInto target: inout Container + ) async throws(EitherError< + ReadFailure, EitherError + >) -> FinalElement { var reader = self - // TODO: In the future we might want to use a temporary allocation instead - // but those don't support async closures yet. - var collectedBuffer = UniqueArray() - collectedBuffer.reserveCapacity(limit) var finalElement: FinalElement? = nil - do { - while finalElement == nil { - try await reader.read { - (buffer: inout Buffer, final: FinalElement?) throws(AsyncReaderLeftOverElementsError) -> Void in - if buffer.count > 0 { - if limit - collectedBuffer.count < buffer.count { - throw AsyncReaderLeftOverElementsError() - } - var consumer = buffer.consumeAll() - while let element = consumer.next() { - collectedBuffer.append(element) - } - } - if let final { - finalElement = final - } + while finalElement == nil { + try await reader.read { + ( + buffer, + final + ) throws(EitherError) -> Void in + if buffer.count > target.freeCapacity { + throw .first(AsyncReaderLeftOverElementsError()) + } + if final != nil, buffer.count < target.freeCapacity { + throw .second(AsyncReaderInsufficientElementsError()) + } + target.append(moving: buffer.startIndex..( - upTo limit: Int, - body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> Result { - let (result, _): (Result, Void?) = try await self.collect(upTo: limit, body: body) - return result + /// - target: The dynamic container that receives the collected elements. + /// The method appends collected elements to the container's existing + /// contents. + /// - maximumSize: The maximum number of elements to append to `target`. + /// - Returns: The ``AsyncReader/FinalElement`` delivered with the terminal chunk. + /// - Throws: An `EitherError` wrapping either a ``AsyncReader/ReadFailure`` or + /// an ``AsyncReaderLeftOverElementsError`` if the reader produced more than + /// `maximumSize` elements. + public consuming func collect & ~Copyable>( + into target: inout Container, + maximumSize: Int + ) async throws(EitherError) -> FinalElement { + precondition(maximumSize >= 0, "maximumSize must be non-negative") + var reader = self + var finalElement: FinalElement? = nil + var remaining = maximumSize + while finalElement == nil { + try await reader.read { (buffer, final) throws(AsyncReaderLeftOverElementsError) -> Void in + let chunkCount = buffer.count + if chunkCount > remaining { + throw AsyncReaderLeftOverElementsError() + } + if chunkCount > 0 { + var consumer = buffer.consumeAll() + while let element = consumer.next() { + target.append(element) + } + remaining -= chunkCount + } + if let final { + finalElement = final + } + } + } + // The force-unwrap is safe since final element must be set at this point + return finalElement! } } diff --git a/Sources/AsyncStreaming/NNNN-async-streaming.md b/Sources/AsyncStreaming/NNNN-async-streaming.md index 1c1074c2..fa1faab5 100644 --- a/Sources/AsyncStreaming/NNNN-async-streaming.md +++ b/Sources/AsyncStreaming/NNNN-async-streaming.md @@ -707,7 +707,7 @@ core protocols are established. Two common patterns emerge immediately when working with `AsyncReader`: iterating over all chunks until the stream ends, and collecting elements into -a buffer up to a specified limit. We envision convenience extensions for both, +a caller-provided container. We envision convenience extensions for both, both of which surface the `FinalElement` payload to the caller: ```swift @@ -721,23 +721,45 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { } extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { - /// Collects up to `limit` elements, then passes the accumulated elements - /// to `body` as an `InputSpan`. Returns the body's result together with - /// the `FinalElement`. - public consuming func collect( - upTo limit: Int, - body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> (Result, FinalElement) + /// Collects elements into `target` up to its free capacity. The reader + /// can deliver fewer; producing more throws. + public consuming func collect< + Container: RangeReplaceableContainer & ~Copyable & ~Escapable + >( + into target: inout Container + ) async throws(EitherError) -> FinalElement + + /// Collects exactly `target.freeCapacity` elements into `target`. Producing + /// fewer throws ``AsyncReaderInsufficientElementsError``; producing more + /// throws ``AsyncReaderLeftOverElementsError``. + public consuming func collect< + Container: RangeReplaceableContainer & ~Copyable & ~Escapable + >( + exactlyInto target: inout Container + ) async throws( + EitherError> + ) -> FinalElement + + /// Collects elements into a dynamic container, growing it up to + /// `maximumSize`. Producing more than `maximumSize` throws. + public consuming func collect< + Container: DynamicContainer & ~Copyable + >( + into target: inout Container, + maximumSize: Int + ) async throws(EitherError) -> FinalElement } ``` `forEachBuffer` provides a simple way to consume an entire stream without -manually looping over `read` calls and threading the end signal. `collect` -accumulates elements from multiple reads into a single buffer before -processing, which is useful when an algorithm needs all data in contiguous -memory (for example, parsing a complete message frame). A second -`where FinalElement == Void` overload of `collect` lets callers that don't -need the payload omit the tuple. +manually looping over `read` calls and threading the end signal. The three +`collect` overloads cover the common ways a caller wants to gather a stream's +elements into an existing container: `collect(into:)` opportunistically fills +a fixed-capacity container, `collect(exactlyInto:)` requires the reader to +match the container's free capacity (useful when parsing a known-length +frame), and `collect(into:maximumSize:)` grows a dynamic container with an +upper bound. All three preserve the container's existing contents and append +collected elements to the end. These helpers are intentionally excluded from this proposal because their error-handling shapes (particularly `collect`'s nested `EitherError` and the diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift index f2de3087..04aebc92 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift @@ -17,82 +17,156 @@ import ContainersPreview import Testing @Suite -struct AsyncReaderCollectTests { +struct AsyncReaderCollectIntoTests { @Test - func collectAllElements() async throws { - let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + func collectFillsTargetWhenReaderHasFewerElements() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var target = RigidArray(capacity: 10) - let result = try await reader.collect(upTo: 10) { span in - return Array(span) - } + try await reader.collect(into: &target) - #expect(result == [1, 2, 3, 4, 5]) + var collected: [Int] = [] + var c = target.consumeAll() + while let v = c.next() { collected.append(v) } + #expect(collected == [1, 2, 3]) } @Test - func collectWithExactLimit() async throws { + func collectFillsTargetWhenReaderHasExactlyFreeCapacityElements() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var target = RigidArray(capacity: 3) + + try await reader.collect(into: &target) + + #expect(target.count == 3) + } + + @Test + func collectThrowsWhenReaderProducesMoreThanFreeCapacity() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + var target = RigidArray(capacity: 2) - let result = try await reader.collect(upTo: 5) { span in - return Array(span) + do { + try await reader.collect(into: &target) + Issue.record("Expected error") + } catch { + let expected = EitherError.second(AsyncReaderLeftOverElementsError()) + #expect(error == expected) } - - #expect(result == [1, 2, 3, 4, 5]) } @Test - func collectEmptyReader() async throws { + func collectIntoEmptyReader() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) + var target = RigidArray(capacity: 5) - let result = try await reader.collect(upTo: 10) { span in - return span.count - } + try await reader.collect(into: &target) - #expect(result == 0) + #expect(target.count == 0) } +} +@Suite +struct AsyncReaderCollectExactlyTests { @Test - func collectProcessesAllElements() async throws { - let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) + func collectExactlyFillsTargetWhenReaderProducesExactCount() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var target = RigidArray(capacity: 3) - let result = try await reader.collect(upTo: 10) { span in - var sum = 0 - for i in span.indices { - sum += span[i] - } - return sum - } + try await reader.collect(exactlyInto: &target) - #expect(result == 60) + var collected: [Int] = [] + var c = target.consumeAll() + while let v = c.next() { collected.append(v) } + #expect(collected == [1, 2, 3]) } @Test - func collectVoidOverloadReturnsResultOnly() async throws { + func collectExactlyThrowsWhenReaderProducesFewer() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var target = RigidArray(capacity: 5) - let result: [Int] = try await reader.collect(upTo: 10) { span in - return Array(span) + do { + try await reader.collect(exactlyInto: &target) + Issue.record("Expected error") + } catch { + let expected = EitherError< + Never, + EitherError + >.second(.second(AsyncReaderInsufficientElementsError())) + #expect(error == expected) } + } + + @Test + func collectExactlyThrowsWhenReaderProducesMore() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + var target = RigidArray(capacity: 2) - #expect(result == [1, 2, 3]) + do { + try await reader.collect(exactlyInto: &target) + Issue.record("Expected error") + } catch { + let expected = EitherError< + Never, + EitherError + >.second(.first(AsyncReaderLeftOverElementsError())) + #expect(error == expected) + } } +} +@Suite +struct AsyncReaderCollectIntoMaximumSizeTests { @Test - func collectThrowsLeftOverElements() async throws { + func collectGrowsContainerWhenReaderHasFewerThanMaximum() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) + var target = UniqueArray() + + try await reader.collect(into: &target, maximumSize: 10) + + var collected: [Int] = [] + var c = target.consumeAll() + while let v = c.next() { collected.append(v) } + #expect(collected == [1, 2, 3]) + } + + @Test + func collectFillsContainerExactlyAtMaximum() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) + var target = UniqueArray() + + try await reader.collect(into: &target, maximumSize: 3) + + #expect(target.count == 3) + } + + @Test + func collectThrowsWhenReaderProducesMoreThanMaximum() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) + var target = UniqueArray() do { - _ = try await reader.collect(upTo: 1) { (span) -> Int in - return span.count - } + try await reader.collect(into: &target, maximumSize: 2) Issue.record("Expected error") } catch { - let expected = EitherError, Never>.first( - .second(AsyncReaderLeftOverElementsError()) - ) + let expected = EitherError.second(AsyncReaderLeftOverElementsError()) #expect(error == expected) } } + + @Test + func collectAppendsToExistingContents() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 2, copying: [3, 4])) + var target = UniqueArray(capacity: 2, copying: [1, 2]) + + try await reader.collect(into: &target, maximumSize: 10) + + var collected: [Int] = [] + var c = target.consumeAll() + while let v = c.next() { collected.append(v) } + #expect(collected == [1, 2, 3, 4]) + } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift index cf5c17be..ad80e6ad 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift @@ -36,14 +36,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(into: writerB) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 5) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - #expect(span[3] == 4) - #expect(span[4] == 5) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == [1, 2, 3, 4, 5]) } } @@ -62,9 +57,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(into: writerB) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -86,12 +81,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(into: writerB) - try await readerA.collect(upTo: 50) { span in - #expect(span.count == 50) - for i in 0..<50 { - #expect(span[i] == elements[i]) - } - } + var target = RigidArray(capacity: 50) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == elements) } } @@ -114,14 +106,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(copyingInto: writerB.asAsyncWriter()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 5) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - #expect(span[3] == 4) - #expect(span[4] == 5) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == [1, 2, 3, 4, 5]) } } @@ -140,9 +127,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(copyingInto: writerB.asAsyncWriter()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -166,12 +153,9 @@ struct AsyncReaderPipeTests { writerA.finish() try await readerB.pipe(copyingInto: writerB.asAsyncWriter(initialCapacity: 16)) - try await readerA.collect(upTo: 200) { span in - #expect(span.count == 200) - for i in 0..<200 { - #expect(span[i] == elements[i]) - } - } + var target = RigidArray(capacity: 200) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == elements) } } } diff --git a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift index ef349a02..917959ba 100644 --- a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift +++ b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift @@ -39,12 +39,9 @@ struct AsyncWriterCallerAsyncWriterAdapterTests { var empty = UniqueArray() try await callerWriter.finish(buffer: &empty, finalElement: ()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 3) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(Array(draining: &target) == [1, 2, 3]) } } @@ -72,12 +69,9 @@ struct AsyncWriterCallerAsyncWriterAdapterTests { var empty = UniqueArray() try await callerWriter.finish(buffer: &empty, finalElement: ()) - try await readerA.collect(upTo: 100) { span in - #expect(span.count == 100) - for i in 0..<100 { - #expect(span[i] == elements[i]) - } - } + var target = RigidArray(capacity: 100) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == elements) } } @@ -95,9 +89,9 @@ struct AsyncWriterCallerAsyncWriterAdapterTests { var empty = UniqueArray() try await callerWriter.finish(buffer: &empty, finalElement: ()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -115,12 +109,9 @@ struct AsyncWriterCallerAsyncWriterAdapterTests { var trailing = UniqueArray(copying: [42, 43, 44]) try await callerWriter.finish(buffer: &trailing, finalElement: ()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 3) - #expect(span[0] == 42) - #expect(span[1] == 43) - #expect(span[2] == 44) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(Array(draining: &target) == [42, 43, 44]) } } } diff --git a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift index 93092ff2..bb014a63 100644 --- a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift @@ -34,14 +34,9 @@ struct CallerAsyncReaderPipeTests { ) try await source.pipe(into: writerB.asAsyncWriter()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 5) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - #expect(span[3] == 4) - #expect(span[4] == 5) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == [1, 2, 3, 4, 5]) } } @@ -58,9 +53,9 @@ struct CallerAsyncReaderPipeTests { let source = UniqueArrayCallerAsyncReader(storage: UniqueArray()) try await source.pipe(into: writerB.asAsyncWriter()) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -82,12 +77,9 @@ struct CallerAsyncReaderPipeTests { ) try await source.pipe(into: writerB.asAsyncWriter(initialCapacity: 16)) - try await readerA.collect(upTo: 200) { span in - #expect(span.count == 200) - for i in 0..<200 { - #expect(span[i] == elements[i]) - } - } + var target = RigidArray(capacity: 200) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == elements) } } @@ -108,14 +100,9 @@ struct CallerAsyncReaderPipeTests { ) try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 5) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - #expect(span[3] == 4) - #expect(span[4] == 5) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == [1, 2, 3, 4, 5]) } } @@ -132,9 +119,9 @@ struct CallerAsyncReaderPipeTests { let source = UniqueArrayCallerAsyncReader(storage: UniqueArray()) try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -156,12 +143,9 @@ struct CallerAsyncReaderPipeTests { ) try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) - try await readerA.collect(upTo: 100) { span in - #expect(span.count == 100) - for i in 0..<100 { - #expect(span[i] == elements[i]) - } - } + var target = RigidArray(capacity: 100) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == elements) } } } diff --git a/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift index a5f999de..8043c6de 100644 --- a/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift +++ b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift @@ -39,12 +39,9 @@ struct CallerAsyncWriterAsyncWriterAdapterTests { } try await asyncWriter.finish() - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 3) - #expect(span[0] == 1) - #expect(span[1] == 2) - #expect(span[2] == 3) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(Array(draining: &target) == [1, 2, 3]) } } @@ -99,9 +96,9 @@ struct CallerAsyncWriterAsyncWriterAdapterTests { let asyncWriter = writerB.asAsyncWriter() try await asyncWriter.finish() - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 0) - } + var target = RigidArray(capacity: 5) + try await readerA.collect(into: &target) + #expect(target.count == 0) } } @@ -124,12 +121,9 @@ struct CallerAsyncWriterAsyncWriterAdapterTests { } try await asyncWriter.finish() - try await readerA.collect(upTo: 5) { span in - #expect(span.count == 5) - for i in 0..<5 { - #expect(span[i] == i + 1) - } - } + var target = RigidArray(capacity: 5) + try await readerA.collect(exactlyInto: &target) + #expect(Array(draining: &target) == [1, 2, 3, 4, 5]) } } } diff --git a/Tests/AsyncStreamingTests/Helpers/Array+Span.swift b/Tests/AsyncStreamingTests/Helpers/Array+Span.swift index b1b69563..22a1002d 100644 --- a/Tests/AsyncStreamingTests/Helpers/Array+Span.swift +++ b/Tests/AsyncStreamingTests/Helpers/Array+Span.swift @@ -28,6 +28,14 @@ extension Array { } } + init & ~Copyable & ~Escapable>(draining container: inout C) { + self.init() + var consumer = container.consumeAll() + while let value = consumer.next() { + self.append(value) + } + } + mutating func append(span: Span) { for index in span.indices { self.append(span[index])