1+ #if ! NET45
2+ #endif
3+
14namespace Microsoft . Net . Http . Client ;
25
3- internal sealed class BufferedReadStream : WriteClosableStream , IPeekableStream
6+ internal class BufferedReadStream : WriteClosableStream , IPeekableStream
47{
8+ private const char CR = '\r ' ;
9+ private const char LF = '\n ' ;
10+
511 private readonly Stream _inner ;
612 private readonly Socket _socket ;
713 private readonly byte [ ] _buffer ;
8- private readonly ILogger _logger ;
9- private int _bufferRefCount ;
10- private int _bufferOffset ;
11- private int _bufferCount ;
14+ private volatile int _bufferRefCount ;
15+ private int _bufferOffset = 0 ;
16+ private int _bufferCount = 0 ;
17+ private bool _disposed ;
1218
13- public BufferedReadStream ( Stream inner , Socket socket , ILogger logger )
14- : this ( inner , socket , 8192 , logger )
15- {
16- }
19+ public BufferedReadStream ( Stream inner , Socket socket )
20+ : this ( inner , socket , 1024 )
21+ { }
1722
18- public BufferedReadStream ( Stream inner , Socket socket , int bufferLength , ILogger logger )
23+ public BufferedReadStream ( Stream inner , Socket socket , int bufferLength )
1924 {
20- _inner = inner ?? throw new ArgumentNullException ( nameof ( inner ) ) ;
25+ if ( inner == null )
26+ {
27+ throw new ArgumentNullException ( nameof ( inner ) ) ;
28+ }
29+ _inner = inner ;
2130 _socket = socket ;
22- _buffer = ArrayPool < byte > . Shared . Rent ( bufferLength ) ;
23- _logger = logger ;
31+ #if ! NET45
2432 _bufferRefCount = 1 ;
33+ _buffer = ArrayPool < byte > . Shared . Rent ( bufferLength ) ;
34+ #else
35+ _buffer = new byte [ bufferLength ] ;
36+ #endif
2537 }
2638
2739 public override bool CanRead
@@ -55,23 +67,6 @@ public override long Position
5567 set { throw new NotSupportedException ( ) ; }
5668 }
5769
58- public override bool CanCloseWrite => _socket != null || _inner is WriteClosableStream ;
59-
60- protected override void Dispose ( bool disposing )
61- {
62- if ( disposing )
63- {
64- if ( Interlocked . Exchange ( ref _bufferRefCount , 0 ) == 1 )
65- {
66- ArrayPool < byte > . Shared . Return ( _buffer ) ;
67- }
68-
69- _inner . Dispose ( ) ;
70- }
71-
72- base . Dispose ( disposing ) ;
73- }
74-
7570 public override long Seek ( long offset , SeekOrigin origin )
7671 {
7772 throw new NotSupportedException ( ) ;
@@ -82,6 +77,24 @@ public override void SetLength(long value)
8277 throw new NotSupportedException ( ) ;
8378 }
8479
80+ protected override void Dispose ( bool disposing )
81+ {
82+ if ( ! _disposed )
83+ {
84+ _disposed = true ;
85+ if ( disposing )
86+ {
87+ _inner . Dispose ( ) ;
88+ #if ! NET45
89+ if ( Interlocked . Decrement ( ref _bufferRefCount ) == 0 )
90+ {
91+ ArrayPool < byte > . Shared . Return ( _buffer ) ;
92+ }
93+ #endif
94+ }
95+ }
96+ }
97+
8598 public override void Flush ( )
8699 {
87100 _inner . Flush ( ) ;
@@ -124,23 +137,6 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
124137 return _inner . ReadAsync ( buffer , offset , count , cancellationToken ) ;
125138 }
126139
127- public override void CloseWrite ( )
128- {
129- if ( _socket != null )
130- {
131- _socket . Shutdown ( SocketShutdown . Send ) ;
132- return ;
133- }
134-
135- if ( _inner is WriteClosableStream writeClosableStream )
136- {
137- writeClosableStream . CloseWrite ( ) ;
138- return ;
139- }
140-
141- throw new NotSupportedException ( "Cannot shutdown write on this transport" ) ;
142- }
143-
144140 public bool Peek ( byte [ ] buffer , uint toPeek , out uint peeked , out uint available , out uint remaining )
145141 {
146142 int read = PeekBuffer ( buffer , toPeek , out peeked , out available , out remaining ) ;
@@ -157,49 +153,6 @@ public bool Peek(byte[] buffer, uint toPeek, out uint peeked, out uint available
157153 throw new NotSupportedException ( "_inner stream isn't a peekable stream" ) ;
158154 }
159155
160- public async Task < string > ReadLineAsync ( CancellationToken cancellationToken )
161- {
162- var line = new StringBuilder ( _buffer . Length ) ;
163-
164- var crIndex = - 1 ;
165-
166- var lfIndex = - 1 ;
167-
168- bool crlfFound ;
169-
170- do
171- {
172- if ( _bufferCount == 0 )
173- {
174- _bufferOffset = 0 ;
175-
176- _bufferCount = await _inner . ReadAsync ( _buffer , 0 , _buffer . Length , cancellationToken )
177- . ConfigureAwait ( false ) ;
178- }
179-
180- var c = ( char ) _buffer [ _bufferOffset ] ;
181- line . Append ( c ) ;
182-
183- _bufferOffset ++ ;
184- _bufferCount -- ;
185-
186- switch ( c )
187- {
188- case '\r ' :
189- crIndex = line . Length ;
190- break ;
191- case '\n ' :
192- lfIndex = line . Length ;
193- break ;
194- }
195-
196- crlfFound = crIndex + 1 == lfIndex ;
197- }
198- while ( ! crlfFound ) ;
199-
200- return line . ToString ( 0 , line . Length - 2 ) ;
201- }
202-
203156 private int ReadBuffer ( byte [ ] buffer , int offset , int count )
204157 {
205158 if ( _bufferCount > 0 )
@@ -231,4 +184,101 @@ private int PeekBuffer(byte[] buffer, uint toPeek, out uint peeked, out uint ava
231184 remaining = 0 ;
232185 return 0 ;
233186 }
187+
188+ private async Task EnsureBufferedAsync ( CancellationToken cancel )
189+ {
190+ if ( _bufferCount == 0 )
191+ {
192+ _bufferOffset = 0 ;
193+ #if ! NET45
194+ bool validBuffer = Interlocked . Increment ( ref _bufferRefCount ) > 1 ;
195+ try
196+ {
197+ if ( validBuffer )
198+ {
199+ _bufferCount = await _inner . ReadAsync ( _buffer , _bufferOffset , _buffer . Length , cancel ) . ConfigureAwait ( false ) ;
200+ }
201+ }
202+ finally
203+ {
204+ if ( ( Interlocked . Decrement ( ref _bufferRefCount ) == 0 ) && validBuffer )
205+ {
206+ ArrayPool < byte > . Shared . Return ( _buffer ) ;
207+ }
208+ }
209+ #else
210+ _bufferCount = await _inner . ReadAsync ( _buffer , _bufferOffset , _buffer . Length , cancel ) . ConfigureAwait ( false ) ;
211+ #endif
212+ if ( _bufferCount == 0 )
213+ {
214+ throw new IOException ( "Unexpected end of stream" ) ;
215+ }
216+ }
217+ }
218+
219+ // TODO: Line length limits?
220+ public async Task < string > ReadLineAsync ( CancellationToken cancel )
221+ {
222+ ThrowIfDisposed ( ) ;
223+ StringBuilder builder = new StringBuilder ( ) ;
224+ bool foundCR = false , foundCRLF = false ;
225+ do
226+ {
227+ if ( _bufferCount == 0 )
228+ {
229+ await EnsureBufferedAsync ( cancel ) . ConfigureAwait ( false ) ;
230+ }
231+
232+ char ch = ( char ) _buffer [ _bufferOffset ] ; // TODO: Encoding enforcement
233+ builder . Append ( ch ) ;
234+ _bufferOffset ++ ;
235+ _bufferCount -- ;
236+ if ( ch == CR )
237+ {
238+ foundCR = true ;
239+ }
240+ else if ( ch == LF )
241+ {
242+ if ( foundCR )
243+ {
244+ foundCRLF = true ;
245+ }
246+ else
247+ {
248+ foundCR = false ;
249+ }
250+ }
251+ }
252+ while ( ! foundCRLF ) ;
253+
254+ return builder . ToString ( 0 , builder . Length - 2 ) ; // Drop the CRLF
255+ }
256+
257+ private void ThrowIfDisposed ( )
258+ {
259+ if ( _disposed )
260+ {
261+ throw new ObjectDisposedException ( nameof ( BufferedReadStream ) ) ;
262+ }
263+ }
264+
265+ public override bool CanCloseWrite => _socket != null || _inner is WriteClosableStream ;
266+
267+ public override void CloseWrite ( )
268+ {
269+ if ( _socket != null )
270+ {
271+ _socket . Shutdown ( SocketShutdown . Send ) ;
272+ return ;
273+ }
274+
275+ var s = _inner as WriteClosableStream ;
276+ if ( s != null )
277+ {
278+ s . CloseWrite ( ) ;
279+ return ;
280+ }
281+
282+ throw new NotSupportedException ( "Cannot shutdown write on this transport" ) ;
283+ }
234284}
0 commit comments