diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift index 5364e048..297b7b83 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift @@ -11,7 +11,7 @@ #if UnstableAsyncStreaming && compiler(>=6.4) -import ContainersPreview +public import ContainersPreview // swift-format-ignore: AmbiguousTrailingClosureOverload @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, *) @@ -47,7 +47,9 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { var done = false while !done { try await self.read { (next, finalElement) throws(Failure) -> Void in - try await body(&next) + if !next.isEmpty { + try await body(&next) + } if let finalElement { final = finalElement done = true @@ -79,21 +81,15 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { public consuming func forEachBuffer( body: (inout Buffer) async -> Void ) async -> FinalElement where ReadFailure == Never { - var finalElement: FinalElement? = nil - while finalElement == nil { - do { - try await self.read { (next, final) -> Void in - await body(&next) - if let final { - finalElement = final - } - } - } catch { - fatalError() + do { + let final: FinalElement? = try await self.forEachBuffer { (buffer) async throws(Never) -> Void in + await body(&buffer) } + // The force-unwrap is safe since final element must be set at this point + return final! + } catch { + fatalError() } - // The force-unwrap is safe since final element must be set at this point - return finalElement! } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift index b70a9089..ec621359 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift @@ -53,8 +53,34 @@ struct AsyncReaderforEachBufferTests { callCount += 1 } - // The reader still emits a terminal call (with an empty buffer + finalElement). - #expect(callCount == 1) + // The reader signals end-of-stream with an empty buffer; body should not be called. + #expect(callCount == 0) + } + + @Test + func forEachBufferDoesNotInvokeBodyWithEmptyTerminalBuffer() async throws { + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) + var observedCounts: [Int] = [] + + _ = await reader.forEachBuffer { buffer in + observedCounts.append(buffer.count) + } + + #expect(observedCounts.allSatisfy { $0 > 0 }) + } + + @Test + func forEachBufferThrowingDoesNotInvokeBodyWithEmptyTerminalBuffer() async throws { + enum TestError: Error {} + + let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) + var observedCounts: [Int] = [] + + _ = try await reader.forEachBuffer { (buffer) throws(TestError) -> Void in + observedCounts.append(buffer.count) + } + + #expect(observedCounts.allSatisfy { $0 > 0 }) } @Test diff --git a/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift b/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift index 4cc0b8e4..350f44e0 100644 --- a/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift +++ b/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift @@ -449,7 +449,7 @@ struct DuplexAsyncChannelTests { // it through a generic function that only sees the protocol. func finishViaProtocol( _ writer: consuming W, - finalElement: consuming W.FinalElement? + finalElement: consuming W.FinalElement ) async throws(W.WriteFailure) where W.WriteElement == Int { var buf = UniqueArray(minimumCapacity: 3) buf.append(7) @@ -466,7 +466,7 @@ struct DuplexAsyncChannelTests { _ = readerA _ = writerB - try await finishViaProtocol(writerA, finalElement: .some(())) + try await finishViaProtocol(writerA, finalElement: ()) var collected: [Int] = [] var sawFinal = false