@@ -38,7 +38,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
3838 /// The type of errors that can occur during reading operations.
3939 public typealias ReadFailure = any Error
4040
41- /// The shared reader state that holds the iterator and captures trailers .
41+ /// The shared reader state for iterator recovery and trailer capture .
4242 fileprivate var state : ReaderState
4343
4444 struct RequestBodyStateMachine {
@@ -69,10 +69,17 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
6969
7070 enum ReadResult {
7171 case readBody( ByteBuffer )
72+ /// The request end was received. The caller should return the iterator
73+ /// to ReaderState and nil it out.
7274 case requestFinished
75+ /// The request was already finished in a previous read.
76+ case alreadyFinished
7377 }
7478
75- mutating func read( limit: Int ? ) async throws -> ReadResult {
79+ mutating func read(
80+ iterator: inout NIOAsyncChannelInboundStream < HTTPRequestPart > . AsyncIterator ? ,
81+ limit: Int ?
82+ ) async throws -> ReadResult {
7683 switch self . state {
7784 case . readingBody( let readingBodyState) :
7885 var bodyElement : ByteBuffer
@@ -85,42 +92,20 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
8592
8693 case . noExcess:
8794 // There is no excess from previous reads. We obtain the next element from the stream.
88- // Take the iterator from ReaderState, read one part, and put it back.
89- // This ensures the iterator is always recoverable even if the reader
90- // is dropped without consuming .end.
91- guard var iterator = self . readerState. takeIterator ( ) else {
92- throw RequestBodyReadError . requestEndedBeforeReceivingEnd
93- }
94-
95- let requestPart : HTTPRequestPart ?
96- do {
97- requestPart = try await iterator. next ( isolation: #isolation)
98- } catch {
99- // Put the iterator back before propagating the error.
100- nonisolated ( unsafe) let iter = iterator
101- self . readerState. putIterator ( iter)
102- throw error
103- }
95+ let requestPart = try await iterator? . next ( isolation: #isolation)
10496
10597 switch requestPart {
10698 case . head:
107- nonisolated ( unsafe) let iter = iterator
108- self . readerState. putIterator ( iter)
10999 fatalError ( " Unexpectedly received a request head. " )
110100
111101 case . none:
112- // Stream ended without .end — don't put iterator back.
113102 throw RequestBodyReadError . requestEndedBeforeReceivingEnd
114103
115104 case . body( let element) :
116- nonisolated ( unsafe) let iter = iterator
117- self . readerState. putIterator ( iter)
118105 bodyElement = element
119106
120107 case . end( let trailers) :
121108 self . state = . finished
122- nonisolated ( unsafe) let iter = iterator
123- self . readerState. putIterator ( iter)
124109 self . readerState. wrapped. withLock { state in
125110 state. finishedReading = true
126111 state. trailers = trailers
@@ -142,17 +127,35 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
142127 return . readBody( bodyElement)
143128
144129 case . finished:
145- return . requestFinished
130+ return . alreadyFinished
146131 }
147132 }
148133 }
149134
150135 var requestBodyStateMachine : RequestBodyStateMachine
151136
137+ /// The iterator that provides HTTP request parts from the underlying channel.
138+ /// Owned by the reader so it can be returned to ReaderState in deinit
139+ /// even if the handler didn't consume .end.
140+ private var iterator : NIOAsyncChannelInboundStream < HTTPRequestPart > . AsyncIterator ?
141+
152142 /// Initializes a new request body reader.
143+ ///
144+ /// Takes the iterator from ReaderState so the reader owns it for the read cycle.
153145 fileprivate init ( readerState: ReaderState ) {
154146 self . requestBodyStateMachine = . init( readerState: readerState)
155147 self . state = readerState
148+ self . iterator = readerState. takeIterator ( )
149+ }
150+
151+ deinit {
152+ // If the reader is dropped without having consumed .end, return the
153+ // iterator to ReaderState so the server can drain unconsumed parts
154+ // and reuse the connection.
155+ if let iterator = self . iterator {
156+ nonisolated ( unsafe) let iter = iterator
157+ self . state. putIterator ( iter)
158+ }
156159 }
157160
158161 /// Reads a chunk of request body data.
@@ -167,7 +170,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
167170 ) async throws ( EitherError< ReadFailure, Failure> ) -> Return {
168171 let readResult : RequestBodyStateMachine . ReadResult
169172 do {
170- readResult = try await self . requestBodyStateMachine. read ( limit: maximumCount)
173+ readResult = try await self . requestBodyStateMachine. read (
174+ iterator: & self . iterator,
175+ limit: maximumCount
176+ )
171177 } catch {
172178 throw . first( error)
173179 }
@@ -178,6 +184,15 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
178184 return try await body ( Array ( buffer: readElement) . span)
179185
180186 case . requestFinished:
187+ // Return iterator to ReaderState for connection reuse.
188+ if let iter = self . iterator {
189+ nonisolated ( unsafe) let iter = iter
190+ self . state. putIterator ( iter)
191+ }
192+ self . iterator = nil
193+ return try await body ( . init( ) )
194+
195+ case . alreadyFinished:
181196 return try await body ( . init( ) )
182197 }
183198 } catch {
0 commit comments