diff --git a/Package.swift b/Package.swift index 92b8be8d..be67d2a1 100644 --- a/Package.swift +++ b/Package.swift @@ -68,6 +68,7 @@ let package = Package( dependencies: [ .product(name: "BasicContainers", package: "swift-collections"), .product(name: "ContainersPreview", package: "swift-collections"), + .product(name: "DequeModule", package: "swift-collections"), ], swiftSettings: [ .enableExperimentalFeature("SuppressedAssociatedTypesWithDefaults"), diff --git a/Sources/AsyncStreaming/DuplexChannel/DuplexAsyncChannel.swift b/Sources/AsyncStreaming/DuplexChannel/DuplexAsyncChannel.swift new file mode 100644 index 00000000..7e793518 --- /dev/null +++ b/Sources/AsyncStreaming/DuplexChannel/DuplexAsyncChannel.swift @@ -0,0 +1,536 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +public import DequeModule +public import ContainersPreview + +/// A bidirectional, in-memory duplex channel with four connected handles. +/// +/// Each call to ``withDuplex(of:withFinalElement:throwing:backpressureStrategy:isolation:body:)`` +/// creates two ``Writer``s and two ``Reader``s connected by a pair of +/// internal ``MultiProducerSingleConsumerAsyncChannel`` storages — one +/// per direction: +/// +/// ``` +/// forward +/// writerA ────────────────────────────────────────> readerB +/// +/// reverse +/// readerA <──────────────────────────────────────── writerB +/// ``` +/// +/// The four handles are independent `~Copyable` values so each can be +/// sent to its own task without an intermediate decomposition step. +/// +/// Each direction applies backpressure independently using the configured +/// ``BackpressureStrategy``: writes suspend when the per-direction buffer +/// rises above the high watermark and resume once it drops below the low +/// watermark. +/// +/// +/// To scope the channel and its handles to a structured-concurrency +/// region, use +/// ``withDuplex(of:withFinalElement:throwing:backpressureStrategy:isolation:body:)``. +/// When `body` returns, both directions are finalized and any remaining +/// suspended producers are resumed with an error. +/// +/// The ``FinalElement`` and ``Failure`` types apply to both directions. +/// Each direction's writer terminates its half of the channel +/// independently by calling ``Writer/finish(finalElement:)`` or +/// ``Writer/finish(throwing:)``. +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +public struct DuplexAsyncChannel< + Element: Sendable, + FinalElement: Sendable, + Failure: Error +>: ~Copyable { + /// Creates a new duplex channel with four connected handles and runs + /// `body` with all four. + /// + /// The handles are paired by side: + /// + /// - `writerA` and `readerA` belong to side A. Elements `writerA` + /// sends are observed on `readerB`; `readerA` observes elements + /// `writerB` sends. + /// - `writerB` and `readerB` belong to side B, mirrored. + /// + /// After `body` returns, the duplex finalizes both directions and + /// resumes any remaining suspended producers with an error. + /// + /// The handles are noncopyable and have no `deinit`-based cleanup. To + /// terminate one direction before the scope ends, call + /// ``Writer/finish(finalElement:)`` or ``Writer/finish(throwing:)`` on + /// the corresponding writer. Otherwise `withDuplex` finalizes both + /// directions when `body` returns. + /// + /// - Parameters: + /// - elementType: The element type of both directions. + /// - finalElementType: The end-of-stream payload type of both + /// directions. + /// - failureType: The failure type of both directions. + /// - backpressureStrategy: The backpressure strategy applied + /// independently to each direction. + /// - isolation: The actor isolation in which `body` runs. Defaults to + /// the caller's isolation. + /// - body: A closure that receives ownership of the four connected + /// handles, in order: side A's writer, side A's reader, side B's + /// writer, side B's reader. + /// - Returns: The value returned from `body`. + @inlinable + public static func withDuplex( + of elementType: Element.Type = Element.self, + withFinalElement finalElementType: FinalElement.Type, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: BackpressureStrategy, + isolation: isolated (any Actor)? = #isolation, + body: ( + consuming sending Writer, + consuming sending Reader, + consuming sending Writer, + consuming sending Reader + ) async throws(BodyFailure) -> sending Result + ) async throws(BodyFailure) -> sending Result { + let forward = MultiProducerSingleConsumerAsyncChannel._Storage( + backpressureStrategy: backpressureStrategy.internalBackpressureStrategy + ) + let reverse = MultiProducerSingleConsumerAsyncChannel._Storage( + backpressureStrategy: backpressureStrategy.internalBackpressureStrategy + ) + + let writerA = Writer(storage: forward) + let readerA = Reader(storage: reverse) + let writerB = Writer(storage: reverse) + let readerB = Reader(storage: forward) + + let result: Result + do throws(BodyFailure) { + result = try await body(writerA, readerA, writerB, readerB) + } catch { + forward.finish(throwing: nil, finalElement: nil) + reverse.finish(throwing: nil, finalElement: nil) + forward.channelDeinitialized() + reverse.channelDeinitialized() + throw error + } + forward.finish(throwing: nil, finalElement: nil) + reverse.finish(throwing: nil, finalElement: nil) + forward.channelDeinitialized() + reverse.channelDeinitialized() + return result + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension DuplexAsyncChannel where FinalElement == Void { + /// Creates a new duplex channel with a `Void` end-of-stream payload and + /// runs `body` with all four handles. + /// + /// This overload is available when ``FinalElement`` is `Void`. It's + /// equivalent to calling + /// ``withDuplex(of:withFinalElement:throwing:backpressureStrategy:isolation:body:)`` + /// with `withFinalElement: Void.self`. + @inlinable + public static func withDuplex( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: BackpressureStrategy, + isolation: isolated (any Actor)? = #isolation, + body: ( + consuming sending Writer, + consuming sending Reader, + consuming sending Writer, + consuming sending Reader + ) async throws(BodyFailure) -> sending Result + ) async throws(BodyFailure) -> sending Result { + try await self.withDuplex( + of: elementType, + withFinalElement: Void.self, + throwing: failureType, + backpressureStrategy: backpressureStrategy, + isolation: isolation, + body: body + ) + } +} + +// MARK: - Backpressure strategy + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension DuplexAsyncChannel { + /// A backpressure strategy applied independently to each direction of + /// the duplex. + public struct BackpressureStrategy: Sendable { + @usableFromInline + var internalBackpressureStrategy: + MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy + + @inlinable + init( + internalBackpressureStrategy: + MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy + ) { + self.internalBackpressureStrategy = internalBackpressureStrategy + } + + /// A backpressure strategy that suspends and resumes producers based + /// on high and low watermarks. + /// + /// - Parameters: + /// - low: When the buffered element count drops below this + /// watermark, the channel resumes suspended producers in that + /// direction. + /// - high: When the buffered element count rises above this + /// watermark, the channel suspends new writes in that direction. + @inlinable + public static func watermark(low: Int, high: Int) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: nil) + ) + ) + } + + /// A backpressure strategy that suspends and resumes producers based + /// on high and low watermarks, weighted by a per-element water level. + /// + /// - Parameters: + /// - low: When the water level drops below this watermark, the + /// channel resumes suspended producers in that direction. + /// - high: When the water level rises above this watermark, the + /// channel suspends new writes in that direction. + /// - waterLevelForElement: A closure that returns the water-level + /// contribution of a single element. The channel calls this + /// closure while holding its lock, so the closure must be free of + /// side effects and should run in constant time. + @inlinable + public static func watermark( + low: Int, + high: Int, + waterLevelForElement: @escaping @Sendable (borrowing Element) -> Int + ) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: waterLevelForElement) + ) + ) + } + } +} + +// MARK: - Writer + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension DuplexAsyncChannel { + /// The writer half of one side of a ``DuplexAsyncChannel``. + /// + /// Conforms to ``CallerAsyncWriter``: callers provide their own + /// ``RangeReplaceableContainer``-conforming buffer that the writer + /// drains. + /// + /// Elements written here travel the channel's forward or reverse + /// direction and are observed on the peer side's ``Reader``. The writer + /// applies backpressure: ``write(buffer:)`` suspends when the + /// destination's buffer rises above the configured high watermark. + /// + /// Writers can be cloned with ``clone()`` to produce concurrently from + /// multiple tasks. Terminate the direction with + /// ``finish(buffer:finalElement:)``, ``finish(finalElement:)``, or + /// ``finish(throwing:)``. + public struct Writer: ~Copyable, CallerAsyncWriter { + public typealias WriteElement = Element + public typealias WriteFailure = any Error + @usableFromInline + let _storage: MultiProducerSingleConsumerAsyncChannel._Storage + + @usableFromInline + let _id: UInt64 + + @usableFromInline + init( + storage: MultiProducerSingleConsumerAsyncChannel._Storage + ) { + self._storage = storage + self._id = storage.sourceInitialized() + } + + /// Sets a callback to invoke when this direction terminates. + /// + /// The duplex calls `callback` after the peer's reader observes its + /// last element on this direction. If the direction has already + /// terminated, the duplex invokes `callback` immediately. + /// + /// - Important: A writer supports a single termination callback. + /// Setting a new callback replaces any previous one. + @inlinable + public func setOnTerminationCallback(_ callback: (@Sendable () -> Void)?) { + self._storage.setOnTerminationCallback(sourceID: self._id, callback: callback) + } + + /// Creates an additional writer for this direction so multiple + /// producers can send concurrently. + /// + /// The cloned writer terminates the direction independently — the + /// direction stays open until every clone has finished or been + /// dropped, mirroring ``MultiProducerSingleConsumerAsyncChannel/Source/clone()``. + @inlinable + public mutating func clone() -> sending Self { + .init(storage: self._storage) + } + + /// Terminates this direction with the supplied error. + /// + /// After the peer reader consumes all buffered elements on this + /// direction, its next ``Reader/read(body:)`` call throws `error`. + /// This path delivers no ``FinalElement`` payload to the peer. + /// + /// To terminate this direction cleanly with an end-of-stream payload, + /// call ``finish(finalElement:)`` instead. When ``FinalElement`` is + /// `Void`, you can also call the ``finish()`` convenience. + @inlinable + public consuming func finish(throwing error: Failure) { + self._storage.finish(throwing: error, finalElement: nil) + } + + /// Finishes this direction with a ``FinalElement`` payload. + /// + /// The peer reader observes end-of-stream as a non-`nil` `finalElement` + /// argument to the body of its next ``Reader/read(body:)`` call. The + /// channel delivers any elements still buffered from earlier + /// ``write(buffer:)`` calls before the terminator. + /// + /// - Note: This method delivers only the end-of-stream signal. To + /// send a final batch of elements alongside the terminator, call + /// ``write(buffer:)`` first and then ``finish(finalElement:)``. + @inlinable + public consuming func finish(finalElement: consuming sending FinalElement) { + self._storage.finish(throwing: nil, finalElement: finalElement) + } + + /// Writes every element of `buffer` to this direction. + /// + /// On success the call drains `buffer` completely. If the + /// direction's backpressure strategy signals that production should + /// pause, the call suspends until the peer reader drains enough of + /// the channel to fall below the low watermark. + /// + /// - Throws: ``MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError`` + /// if this direction has already finished, or `CancellationError` + /// if the task is canceled while suspended on backpressure. + @inlinable + public mutating func write & ~Copyable>( + buffer: inout Buffer + ) async throws { + // Move the caller's buffer into a `nonisolated(unsafe)` local so we + // can hand it to the storage's `inout sending` API. Safe because + // the elements are Sendable and we have the buffer inout so an exclusive + // ownership. + nonisolated(unsafe) var localBuffer = consume buffer + let sendResult: MultiProducerSingleConsumerAsyncChannel.Source._SendResult + do { + sendResult = try self._storage.write(buffer: &localBuffer) + } catch { + buffer = consume localBuffer + throw error + } + buffer = consume localBuffer + + switch consume sendResult { + case .produceMore: + return + + case .enqueueCallback(let token): + let storage = self._storage + do { + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + storage.enqueueProducer(callbackToken: token, continuation: continuation) + } + } onCancel: { + storage.cancelProducer(callbackToken: token) + } + } catch { + throw error + } + } + } + + /// Drains `buffer` to the peer, then signals end-of-stream with the + /// ``FinalElement`` payload. Consumes the writer. + /// + /// This is the ``CallerAsyncWriter`` protocol entry point. The + /// duplex's in-memory transport doesn't fuse the last write with the + /// end-of-stream signal — `write(buffer:)` and `finish` are issued + /// sequentially. The observable result for the peer reader matches + /// the fused contract: the peer sees the trailing buffer's elements + /// and the `finalElement` together on its terminal `read`. + /// + /// - Parameters: + /// - buffer: A buffer of remaining elements to write before + /// signaling end-of-stream. + /// - finalElement: The payload to deliver alongside the + /// end-of-stream signal. + /// - Throws: Any error thrown while draining `buffer`. If draining + /// fails, the direction is left unterminated; the scope's + /// finalizer terminates it on body return. + @inlinable + public consuming func finish & ~Copyable>( + buffer: inout Buffer, + finalElement: consuming FinalElement + ) async throws { + try await self.write(buffer: &buffer) + self._storage.finish(throwing: nil, finalElement: finalElement) + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension DuplexAsyncChannel.Writer where FinalElement == Void { + /// Finishes this direction with an empty `Void` end-of-stream payload. + /// + /// This method is equivalent to calling ``finish(finalElement:)`` with + /// `()`. The peer reader observes end-of-stream as a non-`nil` + /// `finalElement` argument to the body of its next read. + @inlinable + public consuming func finish() { + self._storage.finish(throwing: nil, finalElement: ()) + } +} + +// MARK: - Reader + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension DuplexAsyncChannel { + /// The reader half of one side of a ``DuplexAsyncChannel``. + /// + /// Conforms to ``AsyncReader``: the reader hands a noncopyable + /// ``UniqueDeque`` of elements to the body closure alongside an + /// optional ``FinalElement`` payload that signals end-of-stream when + /// present. + /// + /// Reads elements written by the peer side's ``Writer``. + public struct Reader: ~Copyable, AsyncReader { + public typealias ReadElement = Element + public typealias Buffer = UniqueDeque + public typealias ReadFailure = EitherError + @usableFromInline + let _storage: MultiProducerSingleConsumerAsyncChannel._Storage + + @usableFromInline + init( + storage: MultiProducerSingleConsumerAsyncChannel._Storage + ) { + self._storage = storage + } + + /// Reads the next chunk of elements from this direction. + /// + /// The reader passes the buffered elements to `body` along with an + /// optional ``FinalElement`` payload. A non-`nil` `finalElement` + /// marks the chunk as terminal and delivers the end-of-stream signal. + /// The terminal chunk's buffer may be empty or contain a final batch + /// of elements. + /// + /// - Throws: An ``EitherError`` whose outer `.first` arm carries a + /// read-side error — either the duplex's `Failure` (when the peer + /// writer called ``Writer/finish(throwing:)``) or a + /// `CancellationError` (when the task is canceled while suspended + /// in `read`) — and whose outer `.second` arm carries the failure + /// thrown by `body`. + /// + /// - Important: After the reader observes a non-`nil` `finalElement`, + /// calling `read(body:)` again is a programmer error. + @inlinable + public mutating func read( + body: (inout UniqueDeque, consuming FinalElement?) async throws(BodyFailure) -> Return + ) async throws(EitherError, BodyFailure>) -> Return { + while true { + let action = self._storage.readAvailable() + switch consume action { + case .returnElements(let disconnected): + var buffer = disconnected.take() + let result: Return + do throws(BodyFailure) { + result = try await body(&buffer, nil) + let buffer = buffer + self._storage.returnCachedReadBuffer(buffer) + } catch { + let buffer = buffer + self._storage.returnCachedReadBuffer(buffer) + throw .second(error) + } + return result + + case .returnElementsAndResumeProducers(let disconnected, let producers): + var buffer = disconnected.take() + for producer in producers { + switch producer { + case .closure(let onProduceMore): + onProduceMore(Result.success(())) + case .continuation(let continuation): + continuation.resume() + } + } + let result: Return + do throws(BodyFailure) { + result = try await body(&buffer, nil) + let buffer = buffer + self._storage.returnCachedReadBuffer(buffer) + } catch { + let buffer = buffer + self._storage.returnCachedReadBuffer(buffer) + throw .second(error) + } + return result + + case .returnTerminalChunk(let disconnectedBuffer, let disconnectedFinal, let onTerminations): + for (_, callback) in onTerminations { callback() } + var buffer = disconnectedBuffer.take() + let final = disconnectedFinal.take() + do throws(BodyFailure) { + return try await body(&buffer, final) + } catch { + throw .second(error) + } + + case .throwFailure(let failure, let onTerminations): + for (_, callback) in onTerminations { callback() } + if let failure { + throw .first(.first(failure)) + } + var empty = UniqueDeque() + do throws(BodyFailure) { + return try await body(&empty, nil) + } catch { + throw .second(error) + } + + case .returnNil: + var empty = UniqueDeque() + do throws(BodyFailure) { + return try await body(&empty, nil) + } catch { + throw .second(error) + } + + case .suspend: + do { + try await self._storage.suspendRead() + } catch { + throw .first(error) + } + continue + } + } + } + } +} +#endif diff --git a/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift b/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift new file mode 100644 index 00000000..598f4c73 --- /dev/null +++ b/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift @@ -0,0 +1,1048 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +public import DequeModule +public import Synchronization +public import ContainersPreview + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + @usableFromInline + enum _InternalBackpressureStrategy: Sendable, CustomStringConvertible { + @usableFromInline + struct _Watermark: Sendable, CustomStringConvertible { + @usableFromInline + let _low: Int + + @usableFromInline + let _high: Int + + @usableFromInline + var _currentWatermark: Int = 0 + + @usableFromInline + let _waterLevelForElement: (@Sendable (borrowing Element) -> Int)? + + @usableFromInline + var description: String { "watermark(\(self._currentWatermark))" } + + @inlinable + init(low: Int, high: Int, waterLevelForElement: (@Sendable (borrowing Element) -> Int)?) { + precondition(low <= high) + self._low = low + self._high = high + self._waterLevelForElement = waterLevelForElement + } + + /// Records that elements at offsets `appendedFromOffset.., appendedFromOffset offset: Int) -> Bool { + if let f = self._waterLevelForElement { + for i in offset..= 0) + return self._currentWatermark < self._high + } + + /// Records that all elements in `buffer` are about to leave the channel. + /// Returns whether more should be produced now. + @inlinable + mutating func didConsume(buffer: borrowing UniqueDeque) -> Bool { + if let f = self._waterLevelForElement { + for i in 0..= 0) + return self._currentWatermark < self._low + } + } + + case watermark(_Watermark) + + @usableFromInline + var description: String { + switch consume self { + case .watermark(let s): return s.description + } + } + + @inlinable + mutating func didSend(buffer: borrowing UniqueDeque, appendedFromOffset offset: Int) -> Bool { + switch consume self { + case .watermark(var s): + let r = s.didSend(buffer: buffer, appendedFromOffset: offset) + self = .watermark(s) + return r + } + } + + @inlinable + mutating func didConsume(buffer: borrowing UniqueDeque) -> Bool { + switch consume self { + case .watermark(var s): + let r = s.didConsume(buffer: buffer) + self = .watermark(s) + return r + } + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + @usableFromInline + final class _Storage: Sendable { + @usableFromInline + let _stateMachine: Mutex<_StateMachine> + + @inlinable + init(backpressureStrategy: _InternalBackpressureStrategy) { + self._stateMachine = Mutex<_StateMachine>(_StateMachine(backpressureStrategy: backpressureStrategy)) + } + + @inlinable + func setOnTerminationCallback(sourceID: UInt64, callback: (@Sendable () -> Void)?) { + let action = self._stateMachine.withLock { + $0.setOnTerminationCallback(sourceID: sourceID, callback: callback) + } + switch action { + case .callOnTermination(let onTermination): + onTermination() + case .none: + break + } + } + + @inlinable + func channelDeinitialized() { + let action = self._stateMachine.withLock { $0.channelDeinitialized() } + switch action { + case .callOnTerminations(let onTerminations): + for (_, cb) in onTerminations { cb() } + case .failProducersAndCallOnTerminations(let producers, let onTerminations): + Self._failProducers(producers) + for (_, cb) in onTerminations { cb() } + case .none: + break + } + } + + func sourceInitialized() -> UInt64 { + self._stateMachine.withLock { $0.sourceInitialized() } + } + + @inlinable + func write & ~Copyable>( + buffer: inout sending Buffer + ) throws -> MultiProducerSingleConsumerAsyncChannel.Source._SendResult { + var disconnectedBuffer = _Disconnected(value: Optional(buffer)) + let action = self._stateMachine.withLock { + var buffer = disconnectedBuffer.swap(newValue: nil)! + let action = $0.send(buffer: &buffer) + disconnectedBuffer.swap(newValue: buffer) + return action + } + buffer = disconnectedBuffer.take()! + + switch consume action { + case .returnProduceMore: + return .produceMore + case .returnEnqueue(let token): + return .enqueueCallback(callbackToken: token) + case .resumeReaderAndReturnProduceMore(let continuation): + continuation.resume() + return .produceMore + case .resumeReaderAndReturnEnqueue(let continuation, let token): + continuation.resume() + return .enqueueCallback(callbackToken: token) + case .throwFinishedError: + throw MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + } + } + + @inlinable + func enqueueProducer(callbackToken: UInt64, continuation: UnsafeContinuation) { + let action = self._stateMachine.withLock { + $0.enqueueContinuation(callbackToken: callbackToken, continuation: continuation) + } + switch action { + case .resumeProducer(let c): + c.resume() + case .resumeProducerWithError(let c, let err): + c.resume(throwing: err) + case .none: + break + } + } + + @inlinable + func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) { + var optionalCallback = _Disconnected(value: Optional(onProduceMore)) + let action = self._stateMachine.withLock { + let cb = optionalCallback.swap(newValue: nil)! + return $0.enqueueProducer(callbackToken: callbackToken, onProduceMore: cb) + } + switch consume action { + case .resumeProducer(let cb): + cb.take()(.success(())) + case .resumeProducerWithError(let cb, let err): + cb.take()(.failure(err)) + case .none: + break + } + } + + @inlinable + func cancelProducer(callbackToken: UInt64) { + let action = self._stateMachine.withLock { $0.cancelProducer(callbackToken: callbackToken) } + switch action { + case .resumeProducerWithCancellationError(let p): + switch p { + case .closure(let cb): + cb(.failure(CancellationError())) + case .continuation(let c): + c.resume(throwing: CancellationError()) + } + case .none: + break + } + } + + @inlinable + func finish(throwing failure: Failure?, finalElement: consuming sending FinalElement?) { + var optionalFinal = Optional(_Disconnected(value: finalElement)) + let action = self._stateMachine.withLock { + let fe = optionalFinal.take()!.take() + return $0.finish(failure: failure, finalElement: fe) + } + switch action { + case .callOnTerminations(let onTerminations): + for (_, cb) in onTerminations { cb() } + case .resumeProducers(let producers): + Self._failProducers(producers) + case .resumeReaderAndResumeProducers(let reader, let producers): + reader.resume() + Self._failProducers(producers) + case .none: + break + } + } + + @inlinable + func readAvailable() -> _StateMachine.ReadAvailableAction { + self._stateMachine.withLock { $0.readAvailable() } + } + + @inlinable + func returnCachedReadBuffer(_ buffer: consuming sending UniqueDeque) { + var disconnected = Optional(_Disconnected(value: buffer)) + self._stateMachine.withLock { + $0.returnCachedReadBuffer(disconnected.take()!.take()) + } + } + + @inlinable + func suspendRead() async throws(EitherError) { + try await withTaskCancellationHandler { () throws(EitherError) -> Void in + try await withUnsafeThrowingContinuation { + (continuation: UnsafeContinuation>) in + let action = self._stateMachine.withLock { + $0.suspendRead(continuation: continuation) + } + switch consume action { + case .resumeReader(let c): + c.resume() + case .none: + break + } + } + } onCancel: { + let action = self._stateMachine.withLock { $0.cancelRead() } + switch action { + case .resumeReaderWithCancellationError(let c, let producers, let onTerminations): + c.resume(throwing: .second(CancellationError())) + Self._failProducers(producers) + for (_, cb) in onTerminations { cb() } + case .failProducersAndCallOnTerminations(let producers, let onTerminations): + Self._failProducers(producers) + for (_, cb) in onTerminations { cb() } + case .none: + break + } + } + } + + @inlinable + static func _failProducers(_ producers: [_MultiProducerSingleConsumerSuspendedProducer]) { + for p in producers { + switch p { + case .closure(let cb): + cb(.failure(MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError())) + case .continuation(let c): + c.resume(throwing: MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + } + } + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel._Storage { + @usableFromInline + struct _StateMachine: ~Copyable, Sendable { + @usableFromInline + var _state: _State + + @inlinable + init(backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy) { + self._state = .channeling( + .init( + backpressureStrategy: backpressureStrategy, + buffer: _Disconnected(value: UniqueDeque()), + producerContinuations: .init(), + cancelledAsyncProducers: .init(), + hasOutstandingDemand: true, + nextCallbackTokenID: 0, + nextSourceID: 0 + ) + ) + } + + @inlinable + init(state: consuming _State) { + self._state = state + } + + @usableFromInline + enum SetOnTerminationCallback: Sendable { + case callOnTermination(@Sendable () -> Void) + } + + @inlinable + mutating func setOnTerminationCallback( + sourceID: UInt64, + callback: (@Sendable () -> Void)? + ) -> SetOnTerminationCallback? { + switch consume self._state { + case .channeling(var s): + Self._upsertOnTermination(&s.onTerminations, sourceID: sourceID, callback: callback) + self = .init(state: .channeling(s)) + return .none + + case .sourceFinished(var s): + Self._upsertOnTermination(&s.onTerminations, sourceID: sourceID, callback: callback) + self = .init(state: .sourceFinished(s)) + return .none + + case .finished(let s): + self = .init(state: .finished(s)) + guard let callback else { return .none } + return .callOnTermination(callback) + } + } + + @inlinable + static func _upsertOnTermination( + _ list: inout [(UInt64, @Sendable () -> Void)], + sourceID: UInt64, + callback: (@Sendable () -> Void)? + ) { + if let callback { + if let idx = list.firstIndex(where: { $0.0 == sourceID }) { + list[idx] = (sourceID, callback) + } else { + list.append((sourceID, callback)) + } + } else { + list.removeAll(where: { $0.0 == sourceID }) + } + } + + @inlinable + mutating func sourceInitialized() -> UInt64 { + switch consume self._state { + case .channeling(var s): + let id = s.nextSourceID() + self = .init(state: .channeling(s)) + return id + case .sourceFinished(var s): + let id = s.nextSourceID() + self = .init(state: .sourceFinished(s)) + return id + case .finished(let s): + self = .init(state: .finished(s)) + return .max + } + } + + @usableFromInline + enum ChannelDeinitializedAction: Sendable { + case callOnTerminations([(UInt64, @Sendable () -> Void)]) + case failProducersAndCallOnTerminations( + [_MultiProducerSingleConsumerSuspendedProducer], + [(UInt64, @Sendable () -> Void)] + ) + } + + @inlinable + mutating func channelDeinitialized() -> ChannelDeinitializedAction? { + switch consume self._state { + case .channeling(let s): + let producers = Array(s.suspendedProducers.lazy.map { $0.1 }) + let onTerminations = s.onTerminations + self = .init(state: .finished(.init(sourceFinished: false))) + return .failProducersAndCallOnTerminations(producers, onTerminations) + + case .sourceFinished(let s): + let onTerminations = s.onTerminations + self = .init(state: .finished(.init(sourceFinished: true))) + return .callOnTerminations(onTerminations) + + case .finished(let s): + self = .init(state: .finished(s)) + return .none + } + } + + @usableFromInline + enum SendAction: ~Copyable, Sendable { + case returnProduceMore + case returnEnqueue(callbackToken: UInt64) + case resumeReaderAndReturnProduceMore( + continuation: UnsafeContinuation> + ) + case resumeReaderAndReturnEnqueue( + continuation: UnsafeContinuation>, + callbackToken: UInt64 + ) + case throwFinishedError + } + + @inlinable + mutating func send( + buffer: inout sending some RangeReplaceableContainer & ~Copyable + ) -> sending SendAction { + switch consume self._state { + case .channeling(var s): + // Take the noncopyable buffer out, drain the caller's buffer into it, + // and put it back. We iterate elements via `consumeAll` so the caller + // is left holding an empty buffer (per the writer contract). + + let shouldProduceMore: Bool = s.buffer.withValue { + (inner: inout UniqueDeque?) -> Bool in + // Take the deque out (or fabricate an empty one) so we can mutate + // it without contending with the inout's exclusivity, then put it + // back when we're done. + var current: UniqueDeque + if case .some(let taken) = inner.take() { + current = taken + } else { + current = UniqueDeque() + } + let offsetBefore = current.count + // Drain the caller's buffer into the channel's internal deque so + // the caller is left holding an empty buffer (per the writer + // contract). `consumeAll` iterates by-move and works for both + // `Copyable` and `~Copyable` element types. + var consumer = buffer.consumeAll() + while let element = consumer.next() { + current.append(element) + } + let didProduce = s.backpressureStrategy.didSend( + buffer: current, + appendedFromOffset: offsetBefore + ) + inner = .some(current) + return didProduce + } + s.hasOutstandingDemand = shouldProduceMore + + if let reader = s.readerContinuation.take() { + let token = shouldProduceMore ? nil : s.nextCallbackToken() + self = .init(state: .channeling(s)) + guard let token else { + return .resumeReaderAndReturnProduceMore(continuation: reader) + } + return .resumeReaderAndReturnEnqueue(continuation: reader, callbackToken: token) + } + + let token = shouldProduceMore ? nil : s.nextCallbackToken() + self = .init(state: .channeling(s)) + guard let token else { + return .returnProduceMore + } + return .returnEnqueue(callbackToken: token) + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .throwFinishedError + + case .finished(let s): + self = .init(state: .finished(s)) + return .throwFinishedError + } + } + + @usableFromInline + enum EnqueueProducerAction: ~Copyable, Sendable { + case resumeProducer(_Disconnected<(Result) -> Void>) + case resumeProducerWithError(_Disconnected<(Result) -> Void>, any Error) + } + + @inlinable + mutating func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) -> EnqueueProducerAction? { + switch consume self._state { + case .channeling(var s): + if let idx = s.cancelledAsyncProducers.firstIndex(of: callbackToken) { + s.cancelledAsyncProducers.remove(at: idx) + self = .init(state: .channeling(s)) + return .resumeProducerWithError(.init(value: onProduceMore), CancellationError()) + } + if s.hasOutstandingDemand { + self = .init(state: .channeling(s)) + return .resumeProducer(.init(value: onProduceMore)) + } + s.suspendedProducers.append((callbackToken, .closure(onProduceMore))) + self = .init(state: .channeling(s)) + return .none + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .resumeProducerWithError( + .init(value: onProduceMore), + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) + + case .finished(let s): + self = .init(state: .finished(s)) + return .resumeProducerWithError( + .init(value: onProduceMore), + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) + } + } + + @usableFromInline + enum EnqueueContinuationAction: Sendable { + case resumeProducer(UnsafeContinuation) + case resumeProducerWithError(UnsafeContinuation, any Error) + } + + @inlinable + mutating func enqueueContinuation( + callbackToken: UInt64, + continuation: UnsafeContinuation + ) -> EnqueueContinuationAction? { + switch consume self._state { + case .channeling(var s): + if let idx = s.cancelledAsyncProducers.firstIndex(of: callbackToken) { + s.cancelledAsyncProducers.remove(at: idx) + self = .init(state: .channeling(s)) + return .resumeProducerWithError(continuation, CancellationError()) + } + if s.hasOutstandingDemand { + self = .init(state: .channeling(s)) + return .resumeProducer(continuation) + } + s.suspendedProducers.append((callbackToken, .continuation(continuation))) + self = .init(state: .channeling(s)) + return .none + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .resumeProducerWithError( + continuation, + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) + + case .finished(let s): + self = .init(state: .finished(s)) + return .resumeProducerWithError( + continuation, + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) + } + } + + @usableFromInline + enum CancelProducerAction: Sendable { + case resumeProducerWithCancellationError(_MultiProducerSingleConsumerSuspendedProducer) + } + + @inlinable + mutating func cancelProducer(callbackToken: UInt64) -> CancelProducerAction? { + switch consume self._state { + case .channeling(var s): + guard let idx = s.suspendedProducers.firstIndex(where: { $0.0 == callbackToken }) else { + s.cancelledAsyncProducers.append(callbackToken) + self = .init(state: .channeling(s)) + return .none + } + let producer = s.suspendedProducers.remove(at: idx).1 + self = .init(state: .channeling(s)) + return .resumeProducerWithCancellationError(producer) + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .none + + case .finished(let s): + self = .init(state: .finished(s)) + return .none + } + } + + @usableFromInline + enum FinishAction: Sendable { + case callOnTerminations([(UInt64, @Sendable () -> Void)]) + case resumeProducers([_MultiProducerSingleConsumerSuspendedProducer]) + case resumeReaderAndResumeProducers( + UnsafeContinuation>, + [_MultiProducerSingleConsumerSuspendedProducer] + ) + } + + @inlinable + mutating func finish(failure: Failure?, finalElement: consuming sending FinalElement?) -> FinishAction? { + switch consume self._state { + case .channeling(var s): + let reader = s.readerContinuation.take() + let producers = Array(s.suspendedProducers.lazy.map { $0.1 }) + s.suspendedProducers.removeAll(keepingCapacity: false) + + self = .init( + state: .sourceFinished( + .init( + buffer: _Disconnected(value: s.buffer.take()!), + failure: failure, + finalElement: _Disconnected(value: finalElement), + onTerminations: s.onTerminations, + nextSourceID: s._nextSourceID + ) + ) + ) + + if let reader { + return .resumeReaderAndResumeProducers(reader, producers) + } + return .resumeProducers(producers) + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .none + + case .finished(let s): + self = .init(state: .finished(s)) + return .none + } + } + + @usableFromInline + enum ReadAvailableAction: ~Copyable, Sendable { + case returnElements(_Disconnected>) + case returnElementsAndResumeProducers( + _Disconnected>, + [_MultiProducerSingleConsumerSuspendedProducer] + ) + case suspend + /// Fused terminal chunk: deliver any remaining elements together with + /// the optional `FinalElement` payload. The channel transitions to its + /// finished state. + case returnTerminalChunk( + _Disconnected>, + _Disconnected, + [(UInt64, @Sendable () -> Void)] + ) + /// The channel was finished with a failure and the buffer is now drained; + /// throw the failure to the reader. + case throwFailure(Failure?, [(UInt64, @Sendable () -> Void)]) + case returnNil + } + + @inlinable + mutating func readAvailable() -> ReadAvailableAction { + switch consume self._state { + case .channeling(var s): + let isProducerBufferEmpty = s.buffer.withValue { + $0.borrow()!.value.isEmpty + } + guard isProducerBufferEmpty else { + // We are going to swap the two buffers around. The cached buffer + // may not exist yet on the first read; fall back to a fresh empty + // deque so the producer side always gets a valid container back. + let readerBuffer = s.cachedReadBuffer.swap(newValue: nil) ?? UniqueDeque() + let producerBuffer = s.buffer.swap(newValue: readerBuffer)! + let shouldProduceMore = s.backpressureStrategy.didConsume(buffer: producerBuffer) + s.hasOutstandingDemand = shouldProduceMore + + if shouldProduceMore && !s.suspendedProducers.isEmpty { + let producers = Array(s.suspendedProducers.lazy.map { $0.1 }) + s.suspendedProducers.removeAll(keepingCapacity: true) + self = .init(state: .channeling(s)) + return .returnElementsAndResumeProducers( + _Disconnected(value: producerBuffer), + producers + ) + } + self = .init(state: .channeling(s)) + return .returnElements(_Disconnected(value: producerBuffer)) + } + self = .init(state: .channeling(s)) + return .suspend + + case .sourceFinished(var s): + let buffer = s.buffer.swap(newValue: UniqueDeque()) + + // Failure-path drains buffered elements first (without consuming the + // failure), so the reader sees the trailing batch on one read and the + // thrown failure on the next. + if !buffer.isEmpty && s.failure != nil { + // Leave s.buffer wrapping the now-empty placeholder. + self = .init(state: .sourceFinished(s)) + nonisolated(unsafe) let bufferSending = consume buffer + return .returnElements(_Disconnected(value: bufferSending)) + } + + // Otherwise fuse the (possibly empty) buffer with the optional + // `FinalElement` and transition to finished. + let fe = s.finalElement.swap(newValue: nil) + let onTerminations = s.onTerminations + let failure = s.failure + self = .init(state: .finished(.init(sourceFinished: true))) + + if let failure { + return .throwFailure(failure, onTerminations) + } + nonisolated(unsafe) let bufferSending = consume buffer + nonisolated(unsafe) let feSending = consume fe + return .returnTerminalChunk( + _Disconnected(value: bufferSending), + _Disconnected(value: feSending), + onTerminations + ) + + case .finished(let s): + self = .init(state: .finished(s)) + return .returnNil + } + } + + @inlinable + mutating func returnCachedReadBuffer(_ buffer: consuming sending UniqueDeque) { + var buffer = buffer + switch consume self._state { + case .channeling(var s): + if !buffer.isEmpty { + // The body did not consume every element. Re-add the leftover to + // the watermark accounting (didConsume was already called for the + // full handed-out batch in `readAvailable`), then prepend the + // leftover to any newly-buffered producer writes so the next read + // delivers them in order. + _ = s.backpressureStrategy.didSend(buffer: buffer, appendedFromOffset: 0) + s.buffer.withValue { (inner: inout UniqueDeque?) in + var current: UniqueDeque + if case .some(let taken) = inner.take() { + current = taken + } else { + current = UniqueDeque() + } + while let last = buffer.popLast() { + current.prepend(last) + } + inner = .some(current) + } + } + // `buffer` is empty at this point; stash it for reuse on the next read. + let _ = s.cachedReadBuffer.swap(newValue: buffer) + self = .init(state: .channeling(s)) + + case .sourceFinished(var s): + if !buffer.isEmpty { + // Preserve unconsumed elements at the head of the source-finished + // buffer so the next read still sees them. + var inner = s.buffer.swap(newValue: UniqueDeque()) + while let last = buffer.popLast() { + inner.prepend(last) + } + nonisolated(unsafe) let innerSending = consume inner + s.buffer = _Disconnected(value: innerSending) + } + self = .init(state: .sourceFinished(s)) + + case .finished(let s): + self = .init(state: .finished(s)) + } + } + + @usableFromInline + enum SuspendReadAction: ~Copyable, Sendable { + case resumeReader(UnsafeContinuation>) + } + + @inlinable + mutating func suspendRead( + continuation: UnsafeContinuation> + ) -> SuspendReadAction? { + switch consume self._state { + case .channeling(var s): + guard s.readerContinuation == nil else { + fatalError("MultiProducerSingleConsumerAsyncChannel internal inconsistency: concurrent readers") + } + let isEmpty = s.buffer.withValue { $0.borrow()!.value.isEmpty } + if !isEmpty { + self = .init(state: .channeling(s)) + return .resumeReader(continuation) + } + s.readerContinuation = continuation + self = .init(state: .channeling(s)) + return .none + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .resumeReader(continuation) + + case .finished(let s): + self = .init(state: .finished(s)) + return .resumeReader(continuation) + } + } + + @usableFromInline + enum CancelReadAction: Sendable { + case resumeReaderWithCancellationError( + UnsafeContinuation>, + [_MultiProducerSingleConsumerSuspendedProducer], + [(UInt64, @Sendable () -> Void)] + ) + case failProducersAndCallOnTerminations( + [_MultiProducerSingleConsumerSuspendedProducer], + [(UInt64, @Sendable () -> Void)] + ) + } + + @inlinable + mutating func cancelRead() -> CancelReadAction? { + switch consume self._state { + case .channeling(var s): + let reader = s.readerContinuation.take() + let producers = Array(s.suspendedProducers.lazy.map { $0.1 }) + let onTerminations = s.onTerminations + self = .init(state: .finished(.init(sourceFinished: false))) + if let reader { + return .resumeReaderWithCancellationError(reader, producers, onTerminations) + } + return .failProducersAndCallOnTerminations(producers, onTerminations) + + case .sourceFinished(let s): + self = .init(state: .sourceFinished(s)) + return .none + + case .finished(let s): + self = .init(state: .finished(s)) + return .none + } + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine { + @usableFromInline + enum _State: ~Copyable, Sendable { + @usableFromInline + struct Channeling: ~Copyable, Sendable { + @usableFromInline + var backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy + + @usableFromInline + var onTerminations: [(UInt64, @Sendable () -> Void)] = [] + + /// The buffer of elements pending consumption. + @usableFromInline + var buffer: _Disconnected?> + + /// A reusable empty buffer kept across reads to avoid per-read allocations. + @usableFromInline + var cachedReadBuffer: _Disconnected?> + + /// The continuation of a suspended ``read`` call. + @usableFromInline + var readerContinuation: UnsafeContinuation>? = nil + + @usableFromInline + var suspendedProducers: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)> + + @usableFromInline + var cancelledAsyncProducers: Deque + + @usableFromInline + var hasOutstandingDemand: Bool + + @usableFromInline + var nextCallbackTokenID: UInt64 + + @usableFromInline + var _nextSourceID: UInt64 + + @inlinable + init( + backpressureStrategy: MultiProducerSingleConsumerAsyncChannel._InternalBackpressureStrategy, + buffer: consuming _Disconnected?>, + producerContinuations: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)>, + cancelledAsyncProducers: Deque, + hasOutstandingDemand: Bool, + nextCallbackTokenID: UInt64, + nextSourceID: UInt64 + ) { + self.backpressureStrategy = backpressureStrategy + self.buffer = buffer + self.cachedReadBuffer = _Disconnected(value: nil) + self.suspendedProducers = producerContinuations + self.cancelledAsyncProducers = cancelledAsyncProducers + self.hasOutstandingDemand = hasOutstandingDemand + self.nextCallbackTokenID = nextCallbackTokenID + self._nextSourceID = nextSourceID + } + + @inlinable + mutating func nextCallbackToken() -> UInt64 { + defer { self.nextCallbackTokenID += 1 } + return self.nextCallbackTokenID + } + + @inlinable + mutating func nextSourceID() -> UInt64 { + defer { self._nextSourceID += 1 } + return self._nextSourceID + } + } + + @usableFromInline + struct SourceFinished: ~Copyable, Sendable { + @usableFromInline + var buffer: _Disconnected> + + @usableFromInline + var failure: Failure? + + @usableFromInline + var finalElement: _Disconnected + + @usableFromInline + var onTerminations: [(UInt64, @Sendable () -> Void)] + + @usableFromInline + var _nextSourceID: UInt64 + + @inlinable + init( + buffer: consuming _Disconnected>, + failure: Failure? = nil, + finalElement: consuming _Disconnected = .init(value: nil), + onTerminations: [(UInt64, @Sendable () -> Void)] = [], + nextSourceID: UInt64 + ) { + self.buffer = buffer + self.failure = failure + self.finalElement = finalElement + self.onTerminations = onTerminations + self._nextSourceID = nextSourceID + } + + @inlinable + mutating func nextSourceID() -> UInt64 { + defer { self._nextSourceID += 1 } + return self._nextSourceID + } + } + + @usableFromInline + struct Finished: ~Copyable, Sendable { + @usableFromInline + var sourceFinished: Bool + + @inlinable + init(sourceFinished: Bool) { self.sourceFinished = sourceFinished } + } + + case channeling(Channeling) + case sourceFinished(SourceFinished) + case finished(Finished) + } +} + +/// A producer suspended waiting for backpressure to allow further sends. +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +@usableFromInline +enum _MultiProducerSingleConsumerSuspendedProducer: @unchecked Sendable { + case closure((Result) -> Void) + case continuation(UnsafeContinuation) +} + +/// Helper to move a non-Sendable value across isolation regions (mirror of +/// the helper in AsyncAlgorithms; kept private to AsyncStreaming to avoid +/// reaching into another module's internals). +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +@usableFromInline +struct _Disconnected: ~Copyable, Sendable { + private nonisolated(unsafe) var value: Value + + @usableFromInline + init(value: consuming sending Value) { + self.value = value + } + + @usableFromInline + consuming func take() -> sending Value { + let value = consume value + return value + } + + @discardableResult + @usableFromInline + mutating func swap(newValue: consuming sending Value) -> sending Value { + let value = consume value + self = _Disconnected(value: newValue) + return value + } + + @usableFromInline + mutating func withValue( + body: (inout sending Value) throws(Failure) -> Return + ) throws(Failure) -> Return { + var value = consume value + let result: Return + do throws(Failure) { + result = try body(&value) + } catch { + self = _Disconnected(value: value) + throw error + } + self = _Disconnected(value: value) + return result + } +} +#endif diff --git a/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift b/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift new file mode 100644 index 00000000..036a3b59 --- /dev/null +++ b/Sources/AsyncStreaming/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift @@ -0,0 +1,421 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +public import DequeModule +public import ContainersPreview + +/// A multi-producer single-consumer channel. +/// +/// This is the streaming-oriented variant of +/// ``AsyncAlgorithms.MultiProducerSingleConsumerAsyncChannel``. Instead of +/// exposing the consumer side as an `AsyncSequence`, it offers a chunked +/// ``read(body:)`` method that delivers a noncopyable ``UniqueDeque`` buffer +/// to the caller, so elements move through the channel without copying. +/// +/// The channel applies backpressure to producers: it suspends writes when +/// the buffer rises above the high watermark and resumes them once the +/// buffer drops below the low watermark. +/// +/// To scope the channel and its initial source to a structured-concurrency +/// region, use ``withChannel(of:withFinalElement:throwing:backpressureStrategy:isolation:body:)``. +/// +/// The channel takes a ``FinalElement`` type that it delivers alongside the +/// end-of-stream signal. A producer terminates the channel by calling +/// either ``Source/finish(finalElement:)`` to signal end-of-stream +/// or ``Source/finish(throwing:)`` to terminate with a failure. +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +public struct MultiProducerSingleConsumerAsyncChannel< + Element, + FinalElement, + Failure: Error +>: ~Copyable { + @usableFromInline + let storage: _Storage + + @usableFromInline + init(storage: _Storage) { + self.storage = storage + } + + /// Creates a new channel and runs `body` with the channel and its initial + /// source. After `body` returns, the channel finalizes itself and resumes + /// any remaining suspended producers with an error. + /// + /// The channel and source are noncopyable and have no `deinit`-based + /// cleanup. To terminate the channel before its scope ends, call + /// ``Source/finish(finalElement:)`` or ``Source/finish(throwing:)`` on a + /// source. Otherwise `withChannel` finalizes the channel when `body` + /// returns. + /// + /// - Parameters: + /// - elementType: The element type of the channel. + /// - finalElementType: The end-of-stream payload type of the channel. + /// - failureType: The failure type of the channel. + /// - backpressureStrategy: The backpressure strategy that the channel uses. + /// - isolation: The actor isolation in which `body` runs. Defaults to the caller's isolation. + /// - body: A closure that receives ownership of the channel and its initial source. + /// - Returns: The value returned from `body`. + @inlinable + public static func withChannel( + of elementType: Element.Type = Element.self, + withFinalElement finalElementType: FinalElement.Type, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: Source.BackpressureStrategy, + isolation: isolated (any Actor)? = #isolation, + body: ( + consuming sending MultiProducerSingleConsumerAsyncChannel, + consuming sending Source + ) async throws(BodyFailure) -> sending Result + ) async throws(BodyFailure) -> sending Result { + let storage = _Storage( + backpressureStrategy: backpressureStrategy.internalBackpressureStrategy + ) + let channel = MultiProducerSingleConsumerAsyncChannel(storage: storage) + let source = Source(storage: storage) + let result: Result + do throws(BodyFailure) { + result = try await body(channel, source) + } catch { + storage.finish(throwing: nil, finalElement: nil) + storage.channelDeinitialized() + throw error + } + storage.finish(throwing: nil, finalElement: nil) + storage.channelDeinitialized() + return result + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel where FinalElement == Void { + /// Creates a new channel with a `Void` end-of-stream payload and runs + /// `body` with the channel and its initial source. + /// + /// This overload is available when ``FinalElement`` is `Void`. It's + /// equivalent to calling + /// ``withChannel(of:withFinalElement:throwing:backpressureStrategy:isolation:body:)`` + /// with `withFinalElement: Void.self`. + public static func withChannel( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: Source.BackpressureStrategy, + isolation: isolated (any Actor)? = #isolation, + body: ( + consuming sending MultiProducerSingleConsumerAsyncChannel, + consuming sending Source + ) async throws(BodyFailure) -> sending Result + ) async throws(BodyFailure) -> sending Result { + try await self.withChannel( + of: elementType, + withFinalElement: Void.self, + throwing: failureType, + backpressureStrategy: backpressureStrategy, + isolation: isolation, + body: body + ) + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// A handle for sending elements to the channel. + public struct Source: ~Copyable, Sendable { + /// A backpressure strategy for the channel. + public struct BackpressureStrategy: Sendable { + @usableFromInline + var internalBackpressureStrategy: _InternalBackpressureStrategy + + @inlinable + init(internalBackpressureStrategy: _InternalBackpressureStrategy) { + self.internalBackpressureStrategy = internalBackpressureStrategy + } + + /// A backpressure strategy that suspends and resumes producers based on + /// high and low watermarks. + /// + /// - Parameters: + /// - low: When the buffered element count drops below this watermark, the channel resumes suspended producers. + /// - high: When the buffered element count rises above this watermark, the channel suspends new writes. + @inlinable + public static func watermark(low: Int, high: Int) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: nil) + ) + ) + } + + /// A backpressure strategy that suspends and resumes producers based on + /// high and low watermarks, weighted by a per-element water level. + /// + /// - Parameters: + /// - low: When the water level drops below this watermark, the channel resumes suspended producers. + /// - high: When the water level rises above this watermark, the channel suspends new writes. + /// - waterLevelForElement: A closure that returns the water-level + /// contribution of a single element. The channel calls this closure + /// while holding its lock, so the closure must be free of side + /// effects and should run in constant time. + @inlinable + public static func watermark( + low: Int, + high: Int, + waterLevelForElement: @escaping @Sendable (borrowing Element) -> Int + ) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: waterLevelForElement) + ) + ) + } + } + + @usableFromInline + enum _SendResult: ~Copyable, Sendable { + case produceMore + case enqueueCallback(callbackToken: UInt64) + } + + @usableFromInline + let _storage: _Storage + + @usableFromInline + let _id: UInt64 + + @usableFromInline + init(storage: _Storage) { + self._storage = storage + self._id = self._storage.sourceInitialized() + } + + /// Sets a callback to invoke when the channel terminates. + /// + /// The channel calls `callback` after the reader observes its last element. + /// If the channel has already terminated, the channel invokes `callback` + /// immediately. + /// + /// - Important: A source supports a single termination callback. Setting a + /// new callback replaces any previous one. + @inlinable + public func setOnTerminationCallback(_ callback: (@Sendable () -> Void)?) { + self._storage.setOnTerminationCallback(sourceID: self._id, callback: callback) + } + + /// Creates an additional source for sending elements to the channel + /// concurrently from multiple producers. + @inlinable + public mutating func clone() -> sending Self { + .init(storage: self._storage) + } + + /// Terminates the channel with the supplied error. + /// + /// After the reader consumes all buffered elements, the next call to + /// ``MultiProducerSingleConsumerAsyncChannel/read(body:)`` throws `error`. + /// This path delivers no ``FinalElement`` payload to the reader. + /// + /// To terminate the channel cleanly with an end-of-stream payload, call + /// ``finish(finalElement:)`` instead. When ``FinalElement`` is `Void`, + /// you can also call the ``finish()`` convenience. + @inlinable + public consuming func finish(throwing error: Failure) { + self._storage.finish(throwing: error, finalElement: nil) + } + + /// Finishes the channel with a ``FinalElement`` payload. + /// + /// The reader observes end-of-stream as a non-`nil` `finalElement` + /// argument to the body of its next + /// ``MultiProducerSingleConsumerAsyncChannel/read(body:)`` call. The + /// channel delivers any elements still buffered from earlier + /// ``write(buffer:)`` calls before the terminator. + /// + /// - Note: This method delivers only the end-of-stream signal. To send a + /// final batch of elements alongside the terminator, call + /// ``write(buffer:)`` first and then ``finish(finalElement:)``. + @inlinable + public consuming func finish(finalElement: consuming sending FinalElement) { + self._storage.finish(throwing: nil, finalElement: finalElement) + } + + /// Writes every element of `buffer` to the channel. + /// + /// On success the call drains `buffer` completely. If the channel's + /// backpressure strategy signals that production should pause, the call + /// suspends until the reader drains enough of the channel to fall below + /// the low watermark. + /// + /// - Throws: ``MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError`` + /// if the channel has already finished, or `CancellationError` if the + /// task is canceled while suspended on backpressure. + @inlinable + public mutating func write & ~Copyable & Sendable>( + buffer: inout sending Buffer + ) async throws { + let sendResult: _SendResult + do { + sendResult = try self._storage.write(buffer: &buffer) + } catch { + throw error + } + + switch consume sendResult { + case .produceMore: + return + + case .enqueueCallback(let token): + let storage = self._storage + do { + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + storage.enqueueProducer(callbackToken: token, continuation: continuation) + } + } onCancel: { + storage.cancelProducer(callbackToken: token) + } + } catch { + throw error + } + } + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel.Source where FinalElement == Void { + /// Finishes the channel with an empty `Void` end-of-stream payload. + /// + /// This method is equivalent to calling ``finish(finalElement:)`` with + /// `()`. The reader observes end-of-stream as a non-`nil` + /// `finalElement` argument to the body of its next read. + @inlinable + public consuming func finish() { + self._storage.finish(throwing: nil, finalElement: ()) + } +} + +// MARK: - Reading + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +extension MultiProducerSingleConsumerAsyncChannel { + /// Reads the next chunk of elements from the channel. + /// + /// The channel passes the buffered elements to `body` along with an + /// optional ``FinalElement`` payload. A non-`nil` `finalElement` marks + /// the chunk as terminal and delivers the end-of-stream signal. The + /// terminal chunk's buffer may be empty or contain a final batch of + /// elements. + /// + /// - Throws: An ``EitherError`` whose outer `.first` arm carries a + /// read-side error — either the channel's `Failure` (when a producer + /// called ``Source/finish(throwing:)``) or a `CancellationError` (when + /// the task is canceled while suspended in `read`) — and whose outer + /// `.second` arm carries the failure thrown by `body`. + /// + /// - Important: After the reader observes a non-`nil` `finalElement`, + /// calling `read(body:)` again is a programmer error. + @inlinable + public mutating func read( + body: (inout UniqueDeque, consuming FinalElement?) async throws(BodyFailure) -> Return + ) async throws(EitherError, BodyFailure>) -> Return { + while true { + let action = self.storage.readAvailable() + switch consume action { + case .returnElements(let disconnected): + var buffer = disconnected.take() + let result: Return + do throws(BodyFailure) { + result = try await body(&buffer, nil) + // TODO: This should not be necessary + nonisolated(unsafe) let buffer = buffer + self.storage.returnCachedReadBuffer(buffer) + } catch { + // TODO: This should not be necessary + nonisolated(unsafe) let buffer = buffer + self.storage.returnCachedReadBuffer(buffer) + throw .second(error) + } + return result + + case .returnElementsAndResumeProducers(let disconnected, let producers): + var buffer = disconnected.take() + for producer in producers { + switch producer { + case .closure(let onProduceMore): + onProduceMore(Result.success(())) + case .continuation(let continuation): + continuation.resume() + } + } + let result: Return + do throws(BodyFailure) { + result = try await body(&buffer, nil) + // TODO: This should not be necessary + nonisolated(unsafe) let buffer = buffer + self.storage.returnCachedReadBuffer(buffer) + } catch { + // TODO: This should not be necessary + nonisolated(unsafe) let buffer = buffer + self.storage.returnCachedReadBuffer(buffer) + throw .second(error) + } + return result + + case .returnTerminalChunk(let disconnectedBuffer, let disconnectedFinal, let onTerminations): + for (_, callback) in onTerminations { callback() } + var buffer = disconnectedBuffer.take() + let final = disconnectedFinal.take() + do throws(BodyFailure) { + return try await body(&buffer, final) + } catch { + throw .second(error) + } + + case .throwFailure(let failure, let onTerminations): + for (_, callback) in onTerminations { callback() } + if let failure { + throw .first(.first(failure)) + } + var empty = UniqueDeque() + do throws(BodyFailure) { + return try await body(&empty, nil) + } catch { + throw .second(error) + } + + case .returnNil: + var empty = UniqueDeque() + do throws(BodyFailure) { + return try await body(&empty, nil) + } catch { + throw .second(error) + } + + case .suspend: + do { + try await self.storage.suspendRead() + } catch { + throw .first(error) + } + continue + } + } + } +} + +/// An error that ``MultiProducerSingleConsumerAsyncChannel/Source/write(buffer:)`` +/// throws when its source has already finished. +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) +public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error { + @usableFromInline + init() {} +} +#endif diff --git a/Sources/AsyncStreaming/NNNN-async-streaming.md b/Sources/AsyncStreaming/NNNN-async-streaming.md index cc3c3c0b..1c1074c2 100644 --- a/Sources/AsyncStreaming/NNNN-async-streaming.md +++ b/Sources/AsyncStreaming/NNNN-async-streaming.md @@ -727,7 +727,7 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop public consuming func collect( upTo limit: Int, body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError, Failure>) -> (Result, FinalElement?) + ) async throws(EitherError, Failure>) -> (Result, FinalElement) } ``` diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift index a8753127..cf5c17be 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+pipe.swift @@ -17,70 +17,162 @@ import Testing @Suite struct AsyncReaderPipeTests { + // MARK: - pipe(into:) — into a CallerAsyncWriter + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoCopiesAllElements() async throws { - let reader = UniqueArrayAsyncReader( - storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) - ) - let writer = UniqueArrayCallerAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB + + var array = UniqueArray(copying: [1, 2, 3, 4, 5]) + try await writerA.write(buffer: &array) + writerA.finish() + try await readerB.pipe(into: writerB) - try await reader.pipe(into: writer) + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 5) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + #expect(span[3] == 4) + #expect(span[4] == 5) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoWithEmptyReader() async throws { - let reader = UniqueArrayAsyncReader( - storage: UniqueArray() - ) - let writer = UniqueArrayCallerAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { writerA, readerA, writerB, readerB in + let writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB + + writerA.finish() + try await readerB.pipe(into: writerB) - try await reader.pipe(into: writer) + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoPreservesElementOrder() async throws { - let elements = Array(1...50) - let reader = UniqueArrayAsyncReader( - storage: UniqueArray(capacity: elements.count, copying: elements) - ) - let writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 16, high: 100) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB - try await reader.pipe(into: writer) + let elements = Array(1...50) + var array = UniqueArray(copying: elements) + try await writerA.write(buffer: &array) + writerA.finish() + try await readerB.pipe(into: writerB) + + try await readerA.collect(upTo: 50) { span in + #expect(span.count == 50) + for i in 0..<50 { + #expect(span[i] == elements[i]) + } + } + } } + // MARK: - pipe(copyingInto:) — into an AsyncWriter via the adapter + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeCopyingIntoCopiesAllElements() async throws { - let reader = UniqueArrayAsyncReader( - storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) - ) - let writer = UniqueArrayAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB - try await reader.pipe(copyingInto: writer) + var array = UniqueArray(copying: [1, 2, 3, 4, 5]) + try await writerA.write(buffer: &array) + writerA.finish() + try await readerB.pipe(copyingInto: writerB.asAsyncWriter()) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 5) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + #expect(span[3] == 4) + #expect(span[4] == 5) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeCopyingIntoWithEmptyReader() async throws { - let reader = UniqueArrayAsyncReader( - storage: UniqueArray() - ) - let writer = UniqueArrayAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { writerA, readerA, writerB, readerB in + let writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB + + writerA.finish() + try await readerB.pipe(copyingInto: writerB.asAsyncWriter()) - try await reader.pipe(copyingInto: writer) + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeCopyingIntoChunksTerminalChunkAcrossMultipleWrites() async throws { - // The reader's terminal chunk is 200 elements; the writer hands out - // 64-element buffers. Verify pipe runs without dropping bytes — the - // payload-bearing version of this scenario in FinalElementPipeTests - // checks the actual contents delivered. - let elements = Array(1...200) - let reader = UniqueArrayAsyncReader( - storage: UniqueArray(capacity: elements.count, copying: elements) - ) - let writer = UniqueArrayAsyncWriter(capacity: 256) - - try await reader.pipe(copyingInto: writer) + // 200 elements through a small (16-element) AsyncWriter buffer + // forces the pipe loop to drain across multiple writes. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 32, high: 256) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + let readerA = readerA + let writerB = writerB + let readerB = readerB + + let elements = Array(1...200) + var array = UniqueArray(copying: elements) + try await writerA.write(buffer: &array) + writerA.finish() + try await readerB.pipe(copyingInto: writerB.asAsyncWriter(initialCapacity: 16)) + + try await readerA.collect(upTo: 200) { span in + #expect(span.count == 200) + for i in 0..<200 { + #expect(span[i] == elements[i]) + } + } + } } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift new file mode 100644 index 00000000..ef349a02 --- /dev/null +++ b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterCallerAsyncWriterAdapterTests.swift @@ -0,0 +1,127 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct AsyncWriterCallerAsyncWriterAdapterTests { + // The adapter wraps an AsyncWriter and exposes a CallerAsyncWriter. + // We can't get an AsyncWriter from the duplex directly (its Writer is + // a CallerAsyncWriter), so we wrap the duplex's Writer first via + // asAsyncWriter() to get an AsyncWriter, then wrap THAT via + // asCallerAsyncWriter() to exercise this adapter end-to-end. + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeAndFinishRoundTrip() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + var callerWriter = writerB.asAsyncWriter().asCallerAsyncWriter() + var buf = UniqueArray(copying: [1, 2, 3]) + try await callerWriter.write(buffer: &buf) + var empty = UniqueArray() + try await callerWriter.finish(buffer: &empty, finalElement: ()) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 3) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeLoopsAcrossMultipleUnderlyingBuffers() async throws { + // The CallerAsyncWriterAsyncWriterAdapter underneath uses a + // 4096-element default buffer. We use a small initialCapacity to + // force the inverse adapter to drive multiple underlying writes. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 32, high: 256) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + var callerWriter = + writerB + .asAsyncWriter(initialCapacity: 16) + .asCallerAsyncWriter() + + let elements = Array(1...100) + var buf = UniqueArray(copying: elements) + try await callerWriter.write(buffer: &buf) + var empty = UniqueArray() + try await callerWriter.finish(buffer: &empty, finalElement: ()) + + try await readerA.collect(upTo: 100) { span in + #expect(span.count == 100) + for i in 0..<100 { + #expect(span[i] == elements[i]) + } + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func finishWithoutWriteDeliversEmptyTerminator() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let callerWriter = writerB.asAsyncWriter().asCallerAsyncWriter() + var empty = UniqueArray() + try await callerWriter.finish(buffer: &empty, finalElement: ()) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func finishDeliversTrailingBufferAndPayload() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let callerWriter = writerB.asAsyncWriter().asCallerAsyncWriter() + var trailing = UniqueArray(copying: [42, 43, 44]) + try await callerWriter.finish(buffer: &trailing, finalElement: ()) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 3) + #expect(span[0] == 42) + #expect(span[1] == 43) + #expect(span[2] == 44) + } + } + } +} +#endif diff --git a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift index 0d851c12..93092ff2 100644 --- a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift +++ b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReader+pipe.swift @@ -17,66 +17,152 @@ import Testing @Suite struct CallerAsyncReaderPipeTests { + // MARK: - pipe(into:) — into an AsyncWriter via the adapter + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoCopiesAllElements() async throws { - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) - ) - let writer = UniqueArrayAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let source = UniqueArrayCallerAsyncReader( + storage: UniqueArray(copying: [1, 2, 3, 4, 5]) + ) + try await source.pipe(into: writerB.asAsyncWriter()) - try await reader.pipe(into: writer) + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 5) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + #expect(span[3] == 4) + #expect(span[4] == 5) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoWithEmptyReader() async throws { - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray() - ) - let writer = UniqueArrayAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB - try await reader.pipe(into: writer) + let source = UniqueArrayCallerAsyncReader(storage: UniqueArray()) + try await source.pipe(into: writerB.asAsyncWriter()) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeIntoLoopsAcrossMultipleBuffers() async throws { - let elements = Array(1...200) - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray(capacity: elements.count, copying: elements) - ) - let writer = UniqueArrayAsyncWriter(capacity: 256) + // 200 elements through a small AsyncWriter buffer forces the pipe + // loop to iterate multiple times. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 32, high: 256) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let elements = Array(1...200) + let source = UniqueArrayCallerAsyncReader( + storage: UniqueArray(copying: elements) + ) + try await source.pipe(into: writerB.asAsyncWriter(initialCapacity: 16)) - try await reader.pipe(into: writer) + try await readerA.collect(upTo: 200) { span in + #expect(span.count == 200) + for i in 0..<200 { + #expect(span[i] == elements[i]) + } + } + } } + // MARK: - pipe(bufferingInto:) — into a CallerAsyncWriter + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeBufferingIntoCopiesAllElements() async throws { - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) - ) - let writer = UniqueArrayCallerAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let source = UniqueArrayCallerAsyncReader( + storage: UniqueArray(copying: [1, 2, 3, 4, 5]) + ) + try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) - try await reader.pipe(bufferingInto: writer, intermediateCapacity: 16) + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 5) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + #expect(span[3] == 4) + #expect(span[4] == 5) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeBufferingIntoWithEmptyReader() async throws { - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray() - ) - let writer = UniqueArrayCallerAsyncWriter() + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB - try await reader.pipe(bufferingInto: writer, intermediateCapacity: 16) + let source = UniqueArrayCallerAsyncReader(storage: UniqueArray()) + try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } } + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) @Test func pipeBufferingIntoReusesIntermediateBufferAcrossMultipleIterations() async throws { - let elements = Array(1...100) - let reader = UniqueArrayCallerAsyncReader( - storage: UniqueArray(capacity: elements.count, copying: elements) - ) - let writer = UniqueArrayCallerAsyncWriter(capacity: elements.count) + // 100 elements through a 16-element intermediate buffer forces the + // pipe loop to iterate the buffer many times. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 16, high: 200) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let elements = Array(1...100) + let source = UniqueArrayCallerAsyncReader( + storage: UniqueArray(copying: elements) + ) + try await source.pipe(bufferingInto: writerB, intermediateCapacity: 16) - try await reader.pipe(bufferingInto: writer, intermediateCapacity: 16) + try await readerA.collect(upTo: 100) { span in + #expect(span.count == 100) + for i in 0..<100 { + #expect(span[i] == elements[i]) + } + } + } } } #endif diff --git a/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift new file mode 100644 index 00000000..a5f999de --- /dev/null +++ b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterAsyncWriterAdapterTests.swift @@ -0,0 +1,136 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct CallerAsyncWriterAsyncWriterAdapterTests { + // The adapter wraps a CallerAsyncWriter and exposes an AsyncWriter, so + // we drive it through the duplex's CallerAsyncWriter side and verify + // the elements arrive on the peer reader. + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeAndFinishRoundTrip() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + var asyncWriter = writerB.asAsyncWriter() + try await asyncWriter.write { buffer in + buffer.append(1) + buffer.append(2) + buffer.append(3) + } + try await asyncWriter.finish() + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 3) + #expect(span[0] == 1) + #expect(span[1] == 2) + #expect(span[2] == 3) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func multipleWritesAreFlushedEagerly() async throws { + // The adapter must NOT defer the most recent write — each write call + // should flush before returning, so the peer can observe progress + // before close. We verify by reading back after each write. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + var readerA = readerA + let writerB = writerB + + var asyncWriter = writerB.asAsyncWriter() + + try await asyncWriter.write { $0.append(10) } + // Reader sees the first write before any second write or finish. + try await readerA.read { buffer, _ in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 10) + } + + try await asyncWriter.write { $0.append(20) } + try await readerA.read { buffer, _ in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 20) + } + + try await asyncWriter.finish() + try await readerA.read { buffer, finalElement in + #expect(buffer.count == 0) + #expect(finalElement != nil) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func finishWithoutWriteDeliversEmptyTerminator() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + let asyncWriter = writerB.asAsyncWriter() + try await asyncWriter.finish() + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 0) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func customBufferTypePreservesElements() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 8, high: 32) + ) { _, readerA, writerB, _ in + let readerA = readerA + let writerB = writerB + + var asyncWriter = writerB.asAsyncWriter( + bufferOf: UniqueArray.self, + initialCapacity: 16 + ) + try await asyncWriter.write { buffer in + for v in 1...5 { buffer.append(v) } + } + try await asyncWriter.finish() + + try await readerA.collect(upTo: 5) { span in + #expect(span.count == 5) + for i in 0..<5 { + #expect(span[i] == i + 1) + } + } + } + } +} +#endif diff --git a/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift b/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift new file mode 100644 index 00000000..4cc0b8e4 --- /dev/null +++ b/Tests/AsyncStreamingTests/DuplexChannel/DuplexAsyncChannelTests.swift @@ -0,0 +1,527 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +import AsyncStreaming +import BasicContainers +import ContainersPreview +import DequeModule +import Testing + +@Suite(.serialized) +struct DuplexAsyncChannelTests { + // MARK: - Round-trip + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func roundTripForwardDirection() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerB = readerB + + var buf = UniqueArray(minimumCapacity: 5) + for v in [1, 2, 3, 4, 5] { buf.append(v) } + try await writerA.write(buffer: &buf) + + try await readerB.read { buffer, _ in + #expect(buffer.count == 5) + var c = buffer.consumeAll() + var collected: [Int] = [] + while let v = c.next() { collected.append(v) } + #expect(collected == [1, 2, 3, 4, 5]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func roundTripReverseDirection() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) { writerA, readerA, writerB, readerB in + var readerA = readerA + var writerB = writerB + + var buf = UniqueArray(minimumCapacity: 3) + for v in [10, 20, 30] { buf.append(v) } + try await writerB.write(buffer: &buf) + + try await readerA.read { buffer, _ in + #expect(buffer.count == 3) + var c = buffer.consumeAll() + var collected: [Int] = [] + while let v = c.next() { collected.append(v) } + #expect(collected == [10, 20, 30]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func directionsAreIndependent() async throws { + // Bytes sent on the forward direction must NOT appear on the side that + // sent them. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerA = readerA + var writerB = writerB + var readerB = readerB + + var fwdBuf = UniqueArray(minimumCapacity: 1) + fwdBuf.append(1) + try await writerA.write(buffer: &fwdBuf) + + var revBuf = UniqueArray(minimumCapacity: 1) + revBuf.append(99) + try await writerB.write(buffer: &revBuf) + + // readerB sees the forward write. + try await readerB.read { buffer, _ in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 1) + } + + // readerA sees the reverse write, and only that. + try await readerA.read { buffer, _ in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 99) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func concurrentReadsOnBothDirections() async throws { + try await withThrowingTaskGroup(of: [Int].self) { group in + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 20) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerA = readerA + var writerB = writerB + var readerB = readerB + + group.addTask { + var collected: [Int] = [] + var done = false + while !done { + try await readerA.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + if finalElement != nil { done = true } + } + } + return collected + } + + // Forward: write 0..<10 from main scope. + var fwd = UniqueArray(minimumCapacity: 10) + for i in 0..<10 { fwd.append(i) } + try await writerA.write(buffer: &fwd) + writerA.finish() + + // Reverse: write 100..<110 from main scope. + var rev = UniqueArray(minimumCapacity: 10) + for i in 100..<110 { rev.append(i) } + try await writerB.write(buffer: &rev) + writerB.finish() + + // Drain forward from the main scope. + var forwardCollected: [Int] = [] + var done = false + while !done { + try await readerB.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { forwardCollected.append(v) } + if finalElement != nil { done = true } + } + } + #expect(forwardCollected == Array(0..<10)) + + let reverseCollected = try await group.next() ?? [] + #expect(reverseCollected == Array(100..<110)) + } + } + } + + // MARK: - Half-close + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func finishingOneDirectionLeavesTheOtherOpen() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerA = readerA + var writerB = writerB + var readerB = readerB + + // Close forward direction. + var fwdBuf = UniqueArray(minimumCapacity: 1) + fwdBuf.append(7) + try await writerA.write(buffer: &fwdBuf) + writerA.finish() + + // Reverse direction still works. + var revBuf = UniqueArray(minimumCapacity: 1) + revBuf.append(8) + try await writerB.write(buffer: &revBuf) + + // Forward EOS is fused with the buffered element on the same read. + var sawForwardFinal = false + try await readerB.read { buffer, finalElement in + #expect(buffer.count == 1) + if finalElement != nil { sawForwardFinal = true } + buffer.removeAll() + } + #expect(sawForwardFinal) + + // Reverse still delivers elements after forward is closed. + try await readerA.read { buffer, finalElement in + #expect(buffer.count == 1) + #expect(finalElement == nil) + buffer.removeAll() + } + } + } + + // MARK: - Final element + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func finalElementDeliveredOnFinish() async throws { + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + withFinalElement: String.self, + backpressureStrategy: .watermark(low: 2, high: 8) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerB = readerB + + var buf = UniqueArray(minimumCapacity: 3) + for v in [1, 2, 3] { buf.append(v) } + try await writerA.write(buffer: &buf) + writerA.finish(finalElement: "trailers") + + var collected: [Int] = [] + var trailerSeen: String? = nil + var done = false + while !done { + try await readerB.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + if let f = finalElement { + trailerSeen = f + done = true + } + } + } + #expect(collected == [1, 2, 3]) + #expect(trailerSeen == "trailers") + } + } + + // MARK: - Failure isolation + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func failureOnOneDirectionDoesNotPoisonTheOther() async throws { + struct TestError: Error, Equatable {} + + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + throwing: TestError.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerA = readerA + var writerB = writerB + var readerB = readerB + + // Send a final element on reverse so we can verify it survives. + var revBuf = UniqueArray(minimumCapacity: 1) + revBuf.append(42) + try await writerB.write(buffer: &revBuf) + writerB.finish() + + // Fail forward direction. + writerA.finish(throwing: TestError()) + + // readerB sees the forward failure. + do { + try await readerB.read { _, _ in } + Issue.record("expected throw on forward direction") + } catch let EitherError, Never>.first(.first(err)) { + #expect(err == TestError()) + } catch { + Issue.record("unexpected error: \(error)") + } + + // readerA still gets the reverse element + EOS unaffected. The EOS + // is fused with the buffered element on the same read. + var sawFinal = false + try await readerA.read { buffer, finalElement in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 42) + if finalElement != nil { sawFinal = true } + } + #expect(sawFinal) + } + } + + // MARK: - Backpressure isolation + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func backpressureOnOneDirectionDoesNotBlockTheOther() async throws { + // Forward writer is suspended on backpressure (writes more than the + // high watermark with no concurrent reader). Reverse must still + // accept writes and deliver them while forward is stuck. + try await withThrowingTaskGroup(of: Void.self) { group in + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerA = readerA + var writerB = writerB + var readerB = readerB + + // Forward writer: 4 elements with high=2 → suspends mid-batch. + group.addTask { + var fwd = UniqueArray(minimumCapacity: 4) + for i in 0..<4 { fwd.append(i) } + try await writerA.write(buffer: &fwd) + } + + // Even with forward backpressured, reverse fully works. + var rev = UniqueArray(minimumCapacity: 1) + rev.append(99) + try await writerB.write(buffer: &rev) + try await readerA.read { buffer, _ in + #expect(buffer.count == 1) + var c = buffer.consumeAll() + #expect(c.next() == 99) + } + + // Drain forward (4 elements) so the suspended writer task can + // complete and we can join it before exiting the scope. + var collected: [Int] = [] + while collected.count < 4 { + try await readerB.read { buffer, _ in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + } + } + #expect(collected == [0, 1, 2, 3]) + try await group.waitForAll() + } + } + } + + // MARK: - Multi-producer per direction + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func clonedWriterCanProduceConcurrently() async throws { + // Two writes happen sequentially through the original writer and its + // clone, demonstrating both share the same direction. We can't issue + // `finish()` from inside an escaping closure (it consumes the writer), + // so we keep the multi-producer demonstration sequential here. + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 50) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerB = readerB + _ = readerA + _ = writerB + + var clone = writerA.clone() + + var buf1 = UniqueArray(minimumCapacity: 5) + for v in 0..<5 { buf1.append(v) } + try await writerA.write(buffer: &buf1) + + var buf2 = UniqueArray(minimumCapacity: 5) + for v in 100..<105 { buf2.append(v) } + try await clone.write(buffer: &buf2) + + // Either writer can close the direction independently. The other + // writer is still alive but the channel is now finishing. + writerA.finish() + clone.finish() + + nonisolated(unsafe) var collected = Set() + var done = false + while !done { + try await readerB.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { collected.insert(v) } + if finalElement != nil { done = true } + } + } + #expect(collected == Set([0, 1, 2, 3, 4, 100, 101, 102, 103, 104])) + } + } + + // MARK: - Body-error wrapping + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readBodyErrorsWrappedInSecond() async throws { + struct BodyError: Error, Equatable {} + + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerB = readerB + _ = readerA + _ = writerB + + var buf = UniqueArray(minimumCapacity: 1) + buf.append(1) + try await writerA.write(buffer: &buf) + + do { + try await readerB.read { _, _ throws(BodyError) in + throw BodyError() + } + Issue.record("expected throw") + } catch let EitherError, BodyError>.second(err) { + #expect(err == BodyError()) + } catch { + Issue.record("unexpected error: \(error)") + } + } + } + + // MARK: - Scope cleanup + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func scopeFinalizesBothDirectionsOnReturn() async throws { + nonisolated(unsafe) var aTerminated = false + nonisolated(unsafe) var bTerminated = false + await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { writerA, readerA, writerB, readerB in + writerA.setOnTerminationCallback { aTerminated = true } + writerB.setOnTerminationCallback { bTerminated = true } + _ = readerA + _ = readerB + } + #expect(aTerminated) + #expect(bTerminated) + } + + // MARK: - Protocol conformance + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writerConformsToCallerAsyncWriter() async throws { + // Exercise CallerAsyncWriter.finish(buffer:finalElement:) by calling + // it through a generic function that only sees the protocol. + func finishViaProtocol( + _ writer: consuming W, + finalElement: consuming W.FinalElement? + ) async throws(W.WriteFailure) where W.WriteElement == Int { + var buf = UniqueArray(minimumCapacity: 3) + buf.append(7) + buf.append(8) + buf.append(9) + try await writer.finish(buffer: &buf, finalElement: finalElement) + } + + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 50) + ) { writerA, readerA, writerB, readerB in + var readerB = readerB + _ = readerA + _ = writerB + + try await finishViaProtocol(writerA, finalElement: .some(())) + + var collected: [Int] = [] + var sawFinal = false + var done = false + while !done { + try await readerB.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + if finalElement != nil { + sawFinal = true + done = true + } + } + } + #expect(collected == [7, 8, 9]) + #expect(sawFinal) + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readerConformsToAsyncReader() async throws { + // Exercise AsyncReader.read through a generic function that only + // sees the protocol. + func readOneChunk( + _ reader: inout R + ) async throws -> [Int] where R.ReadElement == Int, R.Buffer == UniqueDeque { + nonisolated(unsafe) var collected: [Int] = [] + do throws(EitherError) { + try await reader.read { (buffer: inout R.Buffer, _: consuming R.FinalElement?) in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + } + } catch { + // Swallow read-side errors for the test. + } + return collected + } + + try await DuplexAsyncChannel.withDuplex( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 10) + ) { writerA, readerA, writerB, readerB in + var writerA = writerA + var readerB = readerB + _ = readerA + _ = writerB + + var buf = UniqueArray(minimumCapacity: 3) + for v in [11, 22, 33] { buf.append(v) } + try await writerA.write(buffer: &buf) + + let collected = try await readOneChunk(&readerB) + #expect(collected == [11, 22, 33]) + } + } +} +#endif diff --git a/Tests/AsyncStreamingTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannelTests.swift b/Tests/AsyncStreamingTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannelTests.swift new file mode 100644 index 00000000..56d5747d --- /dev/null +++ b/Tests/AsyncStreamingTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannelTests.swift @@ -0,0 +1,490 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) + +import AsyncStreaming +import BasicContainers +import ContainersPreview +import DequeModule +import Testing + +@Suite(.serialized) +struct MultiProducerSingleConsumerAsyncChannelTests { + // MARK: - AsyncReader.read + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readReturnsAllBufferedElementsInOrder() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 5) + for v in [1, 2, 3, 4, 5] { writeBuffer.append(v) } + try await source.write(buffer: &writeBuffer) + + try await channel.read { buffer, _ in + #expect(buffer.count == 5) + var consumer = buffer.consumeAll() + var actual: [Int] = [] + while let v = consumer.next() { actual.append(v) } + #expect(actual == [1, 2, 3, 4, 5]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readSuspendsUntilElementArrives() async throws { + try await withThrowingTaskGroup(of: [Int].self) { group in + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { channel, source in + var channel = channel + var source = source + group.addTask { + var collected: [Int] = [] + try await channel.read { buffer, _ in + var consumer = buffer.consumeAll() + while let v = consumer.next() { collected.append(v) } + } + return collected + } + try await Task.sleep(nanoseconds: 10_000_000) + var writeBuffer = UniqueArray(minimumCapacity: 1) + writeBuffer.append(42) + try await source.write(buffer: &writeBuffer) + let result = try await group.next() + #expect(result == [42]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readReturnsEmptyBufferOnEOSAfterFinish() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 1) + writeBuffer.append(1) + try await source.write(buffer: &writeBuffer) + source.finish() + + try await channel.read { buffer, _ in + #expect(buffer.count == 1) + buffer.removeAll() + } + var sawEmpty = false + try await channel.read { buffer, _ in + sawEmpty = buffer.count == 0 + } + #expect(sawEmpty) + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readThrowsFailureAfterFinishWithError() async throws { + struct TestError: Error, Equatable {} + + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + throwing: TestError.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 1) + writeBuffer.append(1) + try await source.write(buffer: &writeBuffer) + source.finish(throwing: TestError()) + + // First read still delivers the buffered element. + try? await channel.read { buffer, _ in + #expect(buffer.count == 1) + buffer.removeAll() + } + // Second read throws the queued failure through EitherError.first. + do { + try await channel.read { _, _ in } + Issue.record("expected throw") + } catch let EitherError, Never>.first(.first(err)) { + #expect(err == TestError()) + } catch { + Issue.record("unexpected error: \(error)") + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readBodyErrorsWrappedInSecond() async throws { + struct BodyError: Error, Equatable {} + + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 1) + writeBuffer.append(1) + try await source.write(buffer: &writeBuffer) + + do { + try await channel.read { _, _ throws(BodyError) in + throw BodyError() + } + Issue.record("expected throw") + } catch let EitherError, BodyError>.second(err) { + #expect(err == BodyError()) + } catch { + Issue.record("unexpected error: \(error)") + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readPartialConsumptionRemainsVisibleOnNextRead() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 5) + for v in [1, 2, 3, 4, 5] { writeBuffer.append(v) } + try await source.write(buffer: &writeBuffer) + + try await channel.read { buffer, _ in + var consumer = buffer.consumeFirst(2) + #expect(consumer.next() == 1) + #expect(consumer.next() == 2) + } + + try await channel.read { buffer, _ in + #expect(buffer.count == 3) + var consumer = buffer.consumeAll() + var collected: [Int] = [] + while let v = consumer.next() { collected.append(v) } + #expect(collected == [3, 4, 5]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readResumesSuspendedProducersWhenWaterLevelDrops() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) { channel, source in + var channel = channel + var source = source + + group.addTask { + // Fill to exactly the high watermark - this write suspends. + var firstBatch = UniqueArray(minimumCapacity: 4) + for v in [1, 2, 3, 4] { firstBatch.append(v) } + try await source.write(buffer: &firstBatch) + // Then send the 5th element once backpressure is relieved. + var secondBatch = UniqueArray(minimumCapacity: 1) + secondBatch.append(5) + try await source.write(buffer: &secondBatch) + } + + // Wait until the producer suspends after appending the first batch. + try await Task.sleep(nanoseconds: 10_000_000) + // Drain the buffered elements - the water level drops below the low + // watermark and wakes the producer. + try await channel.read { buffer, _ in + #expect(buffer.count == 4) + buffer.removeAll() + } + try await group.next() + // The 5th element produced after the producer resumed. + try await channel.read { buffer, _ in + #expect(buffer.count == 1) + buffer.removeAll() + } + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readWaterLevelForElementCalledPerElementInBatch() async throws { + nonisolated(unsafe) var callCount = 0 + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark( + low: 5, + high: 100, + waterLevelForElement: { _ in + callCount += 1 + return 1 + } + ) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 4) + for v in [1, 2, 3, 4] { writeBuffer.append(v) } + try await source.write(buffer: &writeBuffer) + let afterSend = callCount + + try await channel.read { buffer, _ in + #expect(buffer.count == 4) + buffer.removeAll() + } + // Called once per element on send, once per element on consume. + #expect(afterSend == 4) + #expect(callCount == 8) + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readBufferIsReusedAcrossReads() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 100) + ) { channel, source in + var channel = channel + var source = source + + // The channel alternates two internal buffers across reads (swap + // design): on each read the producer-side buffer is handed to the + // reader and the previously-handed-back buffer becomes the new + // producer-side. Verify that two reads of the same size land on + // matching capacities, i.e. the channel is reusing storage rather than + // allocating a fresh buffer per read. + var firstBatch = UniqueArray(minimumCapacity: 10) + for v in 0..<10 { firstBatch.append(v) } + try await source.write(buffer: &firstBatch) + nonisolated(unsafe) var firstCapacity = 0 + try await channel.read { buffer, _ in + firstCapacity = buffer.capacity + var c = buffer.consumeAll() + while c.next() != nil {} + } + #expect(firstCapacity >= 10) + + // Second read of the same size lands on the alternate buffer; after + // the first round trip both buffers have grown to at least the + // workload's capacity, so this read should see at least as much. + var secondBatch = UniqueArray(minimumCapacity: 10) + for v in 0..<10 { secondBatch.append(v) } + try await source.write(buffer: &secondBatch) + try await channel.read { buffer, _ in + #expect(buffer.capacity >= firstCapacity) + var c = buffer.consumeAll() + while c.next() != nil {} + } + + // A third read of the same size should reuse the original buffer + // (capacity matches first read's exactly). + var thirdBatch = UniqueArray(minimumCapacity: 10) + for v in 0..<10 { thirdBatch.append(v) } + try await source.write(buffer: &thirdBatch) + try await channel.read { buffer, _ in + #expect(buffer.capacity == firstCapacity) + var c = buffer.consumeAll() + while c.next() != nil {} + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func readDeliversFinalElementOnFinish() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 8) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 3) + for v in [1, 2, 3] { writeBuffer.append(v) } + try await source.write(buffer: &writeBuffer) + source.finish() + + nonisolated(unsafe) var collected: [Int] = [] + var sawFinal = false + var done = false + while !done { + try await channel.read { buffer, finalElement in + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + if finalElement != nil { + sawFinal = true + done = true + } + } + } + #expect(collected == [1, 2, 3]) + #expect(sawFinal) + } + } + + // MARK: - CallerAsyncWriter.write + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeAppendsAllElementsAndClearsBuffer() async throws { + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 100) + ) { channel, source in + var channel = channel + var source = source + + var writeBuffer = UniqueArray(minimumCapacity: 5) + writeBuffer.append(10) + writeBuffer.append(20) + writeBuffer.append(30) + + try await source.write(buffer: &writeBuffer) + #expect(writeBuffer.count == 0, "write should drain the caller's buffer") + + try await channel.read { buffer, _ in + #expect(buffer.count == 3) + var c = buffer.consumeAll() + var collected: [Int] = [] + while let v = c.next() { collected.append(v) } + #expect(collected == [10, 20, 30]) + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeAfterFinishThrows() async throws { + await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 4) + ) { channel, source in + var channel = channel + var source = source + + // Keep an additional source around so we can attempt a write after the + // channel has been finished by consuming the first source. + var extraSource = source.clone() + source.finish() + + var writeBuffer = UniqueArray(minimumCapacity: 1) + writeBuffer.append(1) + + do { + try await extraSource.write(buffer: &writeBuffer) + Issue.record("expected throw") + } catch is MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError { + // expected + } catch { + Issue.record("unexpected error: \(error)") + } + + // Drain anything still buffered before exiting the scope. + try? await channel.read { _, _ in } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeSuspendsOnBackpressureAndResumesAfterRead() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) { channel, source in + var channel = channel + var source = source + + group.addTask { + // Reader drains until EOS. + while true { + var done = false + try await channel.read { buffer, _ in + if buffer.count == 0 { + done = true + } else { + buffer.removeAll() + } + } + if done { break } + } + } + + var buf = UniqueArray(minimumCapacity: 4) + for i in 0..<4 { buf.append(i) } + try await source.write(buffer: &buf) + source.finish() + try await group.next() + } + } + } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + @Test + func writeReadRoundtripPreservesOrder() async throws { + let total = 100 + try await withThrowingTaskGroup(of: Void.self) { group in + try await MultiProducerSingleConsumerAsyncChannel.withChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 10, high: 50) + ) { channel, source in + var channel = channel + var source = source + + group.addTask { + nonisolated(unsafe) var collected: [Int] = [] + var done = false + while !done { + try await channel.read { buffer, _ in + if buffer.count == 0 { + done = true + } else { + var c = buffer.consumeAll() + while let v = c.next() { collected.append(v) } + } + } + } + #expect(collected == Array(0..(minimumCapacity: 10) + for i in start..<(start + 10) { buf.append(i) } + try await source.write(buffer: &buf) + } + source.finish() + try await group.next() + } + } + } +} +#endif