Skip to content

Commit 97019b3

Browse files
committed
[AsyncStreaming] Add bidirectional adapters for the async writers
## Motivation It is quite common to have one type of writer and wanting to adapt to the other type of writer. ## Modifications This PR adds two adapters into either direction between the writers to make conversion seamless. ## Result It is easy to go from one type of writer to the other type.
1 parent 90a5e45 commit 97019b3

2 files changed

Lines changed: 216 additions & 0 deletions

File tree

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
public import ContainersPreview
14+
15+
/// A ``CallerAsyncWriter`` that is implemented in terms of an
16+
/// ``AsyncWriter``.
17+
///
18+
/// Each ``write(buffer:)`` call drains the caller's buffer through one
19+
/// or more ``AsyncWriter/write(_:)`` closures on the underlying writer.
20+
/// When the underlying writer's buffer fills before the caller's empties,
21+
/// the adapter loops with another closure call to continue draining.
22+
///
23+
/// The adapter introduces no buffer of its own — elements move directly
24+
/// from the caller-supplied buffer into the underlying writer's
25+
/// closure-supplied buffer. The underlying writer's deferred-flush
26+
/// behavior, if any, is preserved.
27+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
28+
public struct AsyncWriterCallerAsyncWriterAdapter<
29+
Underlying: AsyncWriter & ~Copyable
30+
>: ~Copyable, CallerAsyncWriter {
31+
public typealias WriteElement = Underlying.WriteElement
32+
public typealias WriteFailure = Underlying.WriteFailure
33+
public typealias FinalElement = Underlying.FinalElement
34+
35+
@usableFromInline
36+
var underlying: Underlying
37+
38+
@inlinable
39+
init(underlying: consuming Underlying) {
40+
self.underlying = underlying
41+
}
42+
43+
@inlinable
44+
public mutating func write<Buffer: RangeReplaceableContainer<WriteElement> & ~Copyable>(
45+
buffer: inout Buffer
46+
) async throws(WriteFailure) {
47+
var consumer = buffer.consumeAll()
48+
while let head = consumer.next() {
49+
var pending: WriteElement? = head
50+
do throws(EitherError<WriteFailure, Never>) {
51+
try await self.underlying.write {
52+
(innerBuffer: inout Underlying.Buffer) async throws(Never) -> Void in
53+
if case .some(let element) = pending.take() {
54+
innerBuffer.append(element)
55+
}
56+
while innerBuffer.freeCapacity > 0 {
57+
guard let element = consumer.next() else { return }
58+
innerBuffer.append(element)
59+
}
60+
}
61+
} catch {
62+
switch error {
63+
case .first(let writeFailure): throw writeFailure
64+
case .second: fatalError("Unreachable")
65+
}
66+
}
67+
}
68+
}
69+
70+
@inlinable
71+
public consuming func finish<Buffer: RangeReplaceableContainer<WriteElement> & ~Copyable>(
72+
buffer: inout Buffer,
73+
finalElement: consuming FinalElement
74+
) async throws(WriteFailure) {
75+
try await self.write(buffer: &buffer)
76+
try await self.underlying.finish(finalElement: finalElement)
77+
}
78+
}
79+
80+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
81+
extension AsyncWriter where Self: ~Copyable {
82+
/// Adapts this ``AsyncWriter`` to a ``CallerAsyncWriter``.
83+
///
84+
/// The returned adapter accepts caller-supplied buffers via
85+
/// ``CallerAsyncWriter/write(buffer:)`` and drains them through this
86+
/// writer's closure-based ``AsyncWriter/write(_:)``. When this
87+
/// writer's buffer fills before the caller's empties, the adapter
88+
/// loops with another closure call.
89+
///
90+
/// The adapter introduces no buffer of its own.
91+
///
92+
/// - Returns: An adapter that conforms to ``CallerAsyncWriter``.
93+
@inlinable
94+
public consuming func asCallerAsyncWriter() -> AsyncWriterCallerAsyncWriterAdapter<Self> {
95+
.init(underlying: self)
96+
}
97+
}
98+
#endif
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2026 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
#if UnstableAsyncStreaming && compiler(>=6.4)
13+
public import ContainersPreview
14+
public import BasicContainers
15+
16+
/// An ``AsyncWriter`` that is implemented in terms of a ``CallerAsyncWriter``.
17+
///
18+
/// The adapter allocates a fresh buffer for each ``write(_:)`` call, runs
19+
/// the body to fill it, and immediately drains it into the underlying
20+
/// ``CallerAsyncWriter``. Writes are flushed *eagerly*: the adapter does
21+
/// not defer the most recent buffer to fuse it with ``finish(finalElement:)``.
22+
///
23+
/// Eager flushing keeps request/response patterns deadlock-free — a write
24+
/// is observable to the peer as soon as the underlying writer accepts it.
25+
/// The trade-off is that fused close (HTTP/2 DATA+END_STREAM coalescing,
26+
/// and similar) is not available through this adapter; the underlying
27+
/// ``CallerAsyncWriter/finish(buffer:finalElement:)`` always receives an
28+
/// empty buffer. Conformers that need fused close should implement
29+
/// ``AsyncWriter`` directly rather than going through this adapter.
30+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
31+
public struct CallerAsyncWriterAsyncWriterAdapter<
32+
Underlying: CallerAsyncWriter & ~Copyable,
33+
Buffer: DynamicContainer<Underlying.WriteElement> & ~Copyable
34+
>: ~Copyable, AsyncWriter {
35+
public typealias WriteElement = Underlying.WriteElement
36+
public typealias WriteFailure = Underlying.WriteFailure
37+
public typealias FinalElement = Underlying.FinalElement
38+
39+
@usableFromInline
40+
var underlying: Underlying
41+
42+
@usableFromInline
43+
let initialCapacity: Int
44+
45+
@inlinable
46+
init(underlying: consuming Underlying, initialCapacity: Int) {
47+
self.underlying = underlying
48+
self.initialCapacity = initialCapacity
49+
}
50+
51+
@inlinable
52+
public mutating func write<Return: ~Copyable, Failure: Error>(
53+
_ body: (inout Buffer) async throws(Failure) -> Return
54+
) async throws(EitherError<WriteFailure, Failure>) -> Return {
55+
var buffer = Buffer(minimumCapacity: self.initialCapacity)
56+
let result: Return
57+
do throws(Failure) {
58+
result = try await body(&buffer)
59+
} catch {
60+
throw .second(error)
61+
}
62+
do throws(WriteFailure) {
63+
try await self.underlying.write(buffer: &buffer)
64+
} catch {
65+
throw .first(error)
66+
}
67+
return result
68+
}
69+
70+
@inlinable
71+
public consuming func finish(
72+
finalElement: consuming FinalElement
73+
) async throws(WriteFailure) {
74+
var empty = Buffer()
75+
try await self.underlying.finish(buffer: &empty, finalElement: finalElement)
76+
}
77+
}
78+
79+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
80+
extension CallerAsyncWriter where Self: ~Copyable {
81+
/// Adapts this ``CallerAsyncWriter`` to an ``AsyncWriter``, using
82+
/// ``UniqueArray`` as the buffer container.
83+
///
84+
/// Each ``AsyncWriter/write(_:)`` call on the returned adapter
85+
/// allocates a fresh buffer, runs the closure to fill it, and
86+
/// immediately drains it into this writer. Writes are flushed
87+
/// eagerly — see ``CallerAsyncWriterAsyncWriterAdapter`` for the
88+
/// trade-off this implies for fused close.
89+
///
90+
/// - Parameter initialCapacity: The capacity reserved on each
91+
/// freshly allocated buffer.
92+
/// - Returns: An adapter that conforms to ``AsyncWriter``.
93+
@inlinable
94+
public consuming func asAsyncWriter(
95+
initialCapacity: Int = 4096
96+
) -> CallerAsyncWriterAsyncWriterAdapter<Self, UniqueArray<WriteElement>> {
97+
.init(underlying: self, initialCapacity: initialCapacity)
98+
}
99+
100+
/// Adapts this ``CallerAsyncWriter`` to an ``AsyncWriter`` with a
101+
/// caller-chosen buffer container type.
102+
///
103+
/// - Parameters:
104+
/// - bufferType: The container type for buffers handed to the
105+
/// ``AsyncWriter/write(_:)`` body.
106+
/// - initialCapacity: The capacity reserved on each freshly
107+
/// allocated buffer.
108+
/// - Returns: An adapter that conforms to ``AsyncWriter``.
109+
@inlinable
110+
public consuming func asAsyncWriter<Buffer>(
111+
bufferOf bufferType: Buffer.Type,
112+
initialCapacity: Int = 4096
113+
) -> CallerAsyncWriterAsyncWriterAdapter<Self, Buffer>
114+
where Buffer: DynamicContainer<WriteElement> & ~Copyable {
115+
.init(underlying: self, initialCapacity: initialCapacity)
116+
}
117+
}
118+
#endif

0 commit comments

Comments
 (0)