Skip to content

Commit 16ab11d

Browse files
HTTPStream takes a parser as the parameter (#139)
1 parent a0b1904 commit 16ab11d

File tree

14 files changed

+122
-156
lines changed

14 files changed

+122
-156
lines changed

src/ahttpx/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from ._content import * # Content, File, Files, Form, HTML, JSON, MultiPart, Text
44
from ._headers import * # Headers
55
from ._network import * # NetworkBackend, NetworkStream, timeout
6-
from ._parsers import * # HTTPParser, ProtocolError
6+
from ._parsers import * # HTTPParser, HTTPStream, ProtocolError
77
from ._pool import * # Connection, ConnectionPool, Transport
88
from ._quickstart import * # get, post, put, patch, delete
99
from ._response import * # Response
1010
from ._request import * # Request
11-
from ._streams import * # ByteStream, DuplexStream, FileStream, HTTPStream, Stream
11+
from ._streams import * # ByteStream, DuplexStream, FileStream, Stream
1212
from ._server import * # serve_http, run
1313
from ._urlencode import * # quote, unquote, urldecode, urlencode
1414
from ._urls import * # QueryParams, URL

src/ahttpx/_parsers.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import enum
2+
import io
3+
import typing
24

35
from ._streams import Stream
46

5-
__all__ = ['HTTPParser', 'Mode', 'ProtocolError']
7+
__all__ = ['HTTPParser', 'HTTPStream', 'Mode', 'ProtocolError']
68

79

810
# TODO...
@@ -436,6 +438,50 @@ def __repr__(self) -> str:
436438
return f'<HTTPParser [{detail}]>'
437439

438440

441+
class HTTPStream(Stream):
442+
def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None):
443+
self._parser = parser
444+
self._buffer = io.BytesIO()
445+
self._callback = callback
446+
447+
async def read(self, size=-1) -> bytes:
448+
sections = []
449+
length = 0
450+
451+
# If we have any data in the buffer read that and clear the buffer.
452+
buffered = self._buffer.read()
453+
if buffered:
454+
sections.append(buffered)
455+
length += len(buffered)
456+
self._buffer.seek(0)
457+
self._buffer.truncate(0)
458+
459+
# Read each chunk in turn.
460+
while (size < 0) or (length < size):
461+
section = await self._parser.recv_body()
462+
sections.append(section)
463+
length += len(section)
464+
if section == b'':
465+
break
466+
467+
# If we've more data than requested, then push some back into the buffer.
468+
output = b''.join(sections)
469+
if size > -1 and len(output) > size:
470+
output, remainder = output[:size], output[size:]
471+
self._buffer.write(remainder)
472+
self._buffer.seek(0)
473+
474+
return output
475+
476+
async def close(self) -> None:
477+
try:
478+
self._buffer.close()
479+
await self._parser.complete()
480+
finally:
481+
if self._callback is not None:
482+
await self._callback()
483+
484+
439485
class ReadAheadParser:
440486
"""
441487
A buffered I/O stream, with methods for read-ahead parsing.

src/ahttpx/_pool.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
from ._content import Content
66
from ._headers import Headers
77
from ._network import Lock, NetworkBackend, Semaphore
8-
from ._parsers import HTTPParser
8+
from ._parsers import HTTPParser, HTTPStream
99
from ._response import Response
1010
from ._request import Request
11-
from ._streams import HTTPStream, Stream
11+
from ._streams import Stream
1212
from ._urls import URL
1313

1414

@@ -170,7 +170,7 @@ async def send(self, request: Request) -> Response:
170170
await self._send_head(request)
171171
await self._send_body(request)
172172
code, headers = await self._recv_head()
173-
stream = HTTPStream(self._recv_body, self._complete)
173+
stream = HTTPStream(self._parser, callback=self._complete)
174174
# TODO...
175175
return Response(code, headers=headers, content=stream)
176176
# finally:
@@ -237,7 +237,6 @@ async def _recv_body(self) -> bytes:
237237

238238
# Request/response cycle complete...
239239
async def _complete(self) -> None:
240-
await self._parser.complete()
241240
self._idle_expiry = time.monotonic() + self._keepalive_duration
242241

243242
async def _close(self) -> None:

src/ahttpx/_server.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
import time
44

55
from ._content import Text
6-
from ._parsers import HTTPParser
6+
from ._parsers import HTTPParser, HTTPStream
77
from ._request import Request
88
from ._response import Response
99
from ._network import NetworkBackend, sleep
10-
from ._streams import HTTPStream
1110

1211
__all__ = [
1312
"serve_http", "run"
@@ -33,7 +32,7 @@ async def handle_requests(self):
3332
try:
3433
while not await self._parser.recv_close():
3534
method, url, headers = await self._recv_head()
36-
stream = HTTPStream(self._recv_body, self._complete)
35+
stream = HTTPStream(self._parser, callback=self._complete)
3736
# TODO: Handle endpoint exceptions
3837
async with Request(method, url, headers=headers, content=stream) as request:
3938
try:
@@ -89,7 +88,6 @@ async def _send_body(self, response: Response):
8988

9089
# Start it all over again...
9190
async def _complete(self):
92-
await self._parser.complete()
9391
self._idle_expiry = time.monotonic() + self._keepalive_duration
9492

9593

src/ahttpx/_streams.py

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import os
44

55

6+
__all__ = ['Stream', 'ByteStream', 'DuplexStream', 'FileStream', 'MultiPartStream']
7+
8+
69
class Stream:
710
async def read(self, size: int=-1) -> bytes:
811
raise NotImplementedError()
@@ -103,47 +106,6 @@ async def __aenter__(self):
103106
return self
104107

105108

106-
class HTTPStream(Stream):
107-
def __init__(self, next_chunk, complete):
108-
self._next_chunk = next_chunk
109-
self._complete = complete
110-
self._buffer = io.BytesIO()
111-
112-
async def read(self, size=-1) -> bytes:
113-
sections = []
114-
length = 0
115-
116-
# If we have any data in the buffer read that and clear the buffer.
117-
buffered = self._buffer.read()
118-
if buffered:
119-
sections.append(buffered)
120-
length += len(buffered)
121-
self._buffer.seek(0)
122-
self._buffer.truncate(0)
123-
124-
# Read each chunk in turn.
125-
while (size < 0) or (length < size):
126-
section = await self._next_chunk()
127-
sections.append(section)
128-
length += len(section)
129-
if section == b'':
130-
break
131-
132-
# If we've more data than requested, then push some back into the buffer.
133-
output = b''.join(sections)
134-
if size > -1 and len(output) > size:
135-
output, remainder = output[:size], output[size:]
136-
self._buffer.write(remainder)
137-
self._buffer.seek(0)
138-
139-
return output
140-
141-
async def close(self) -> None:
142-
self._buffer.close()
143-
if self._complete is not None:
144-
await self._complete()
145-
146-
147109
class MultiPartStream(Stream):
148110
def __init__(self, form: list[tuple[str, str]], files: list[tuple[str, str]], boundary=''):
149111
self._form = list(form)

src/httpx/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from ._content import * # Content, File, Files, Form, HTML, JSON, MultiPart, Text
44
from ._headers import * # Headers
55
from ._network import * # NetworkBackend, NetworkStream, timeout
6-
from ._parsers import * # HTTPParser, ProtocolError
6+
from ._parsers import * # HTTPParser, HTTPStream, ProtocolError
77
from ._pool import * # Connection, ConnectionPool, Transport
88
from ._quickstart import * # get, post, put, patch, delete
99
from ._response import * # Response
1010
from ._request import * # Request
11-
from ._streams import * # ByteStream, DuplexStream, FileStream, HTTPStream, Stream
11+
from ._streams import * # ByteStream, DuplexStream, FileStream, Stream
1212
from ._server import * # serve_http, run
1313
from ._urlencode import * # quote, unquote, urldecode, urlencode
1414
from ._urls import * # QueryParams, URL

src/httpx/_parsers.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import enum
2+
import io
3+
import typing
24

35
from ._streams import Stream
46

5-
__all__ = ['HTTPParser', 'Mode', 'ProtocolError']
7+
__all__ = ['HTTPParser', 'HTTPStream', 'Mode', 'ProtocolError']
68

79

810
# TODO...
@@ -436,6 +438,50 @@ def __repr__(self) -> str:
436438
return f'<HTTPParser [{detail}]>'
437439

438440

441+
class HTTPStream(Stream):
442+
def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None):
443+
self._parser = parser
444+
self._buffer = io.BytesIO()
445+
self._callback = callback
446+
447+
def read(self, size=-1) -> bytes:
448+
sections = []
449+
length = 0
450+
451+
# If we have any data in the buffer read that and clear the buffer.
452+
buffered = self._buffer.read()
453+
if buffered:
454+
sections.append(buffered)
455+
length += len(buffered)
456+
self._buffer.seek(0)
457+
self._buffer.truncate(0)
458+
459+
# Read each chunk in turn.
460+
while (size < 0) or (length < size):
461+
section = self._parser.recv_body()
462+
sections.append(section)
463+
length += len(section)
464+
if section == b'':
465+
break
466+
467+
# If we've more data than requested, then push some back into the buffer.
468+
output = b''.join(sections)
469+
if size > -1 and len(output) > size:
470+
output, remainder = output[:size], output[size:]
471+
self._buffer.write(remainder)
472+
self._buffer.seek(0)
473+
474+
return output
475+
476+
def close(self) -> None:
477+
try:
478+
self._buffer.close()
479+
self._parser.complete()
480+
finally:
481+
if self._callback is not None:
482+
self._callback()
483+
484+
439485
class ReadAheadParser:
440486
"""
441487
A buffered I/O stream, with methods for read-ahead parsing.

src/httpx/_pool.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
from ._content import Content
66
from ._headers import Headers
77
from ._network import Lock, NetworkBackend, Semaphore
8-
from ._parsers import HTTPParser
8+
from ._parsers import HTTPParser, HTTPStream
99
from ._response import Response
1010
from ._request import Request
11-
from ._streams import HTTPStream, Stream
11+
from ._streams import Stream
1212
from ._urls import URL
1313

1414

@@ -170,7 +170,7 @@ def send(self, request: Request) -> Response:
170170
self._send_head(request)
171171
self._send_body(request)
172172
code, headers = self._recv_head()
173-
stream = HTTPStream(self._recv_body, self._complete)
173+
stream = HTTPStream(self._parser, callback=self._complete)
174174
# TODO...
175175
return Response(code, headers=headers, content=stream)
176176
# finally:
@@ -237,7 +237,6 @@ def _recv_body(self) -> bytes:
237237

238238
# Request/response cycle complete...
239239
def _complete(self) -> None:
240-
self._parser.complete()
241240
self._idle_expiry = time.monotonic() + self._keepalive_duration
242241

243242
def _close(self) -> None:

src/httpx/_server.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
import time
44

55
from ._content import Text
6-
from ._parsers import HTTPParser
6+
from ._parsers import HTTPParser, HTTPStream
77
from ._request import Request
88
from ._response import Response
99
from ._network import NetworkBackend, sleep
10-
from ._streams import HTTPStream
1110

1211
__all__ = [
1312
"serve_http", "run"
@@ -33,7 +32,7 @@ def handle_requests(self):
3332
try:
3433
while not self._parser.recv_close():
3534
method, url, headers = self._recv_head()
36-
stream = HTTPStream(self._recv_body, self._complete)
35+
stream = HTTPStream(self._parser, callback=self._complete)
3736
# TODO: Handle endpoint exceptions
3837
with Request(method, url, headers=headers, content=stream) as request:
3938
try:
@@ -89,7 +88,6 @@ def _send_body(self, response: Response):
8988

9089
# Start it all over again...
9190
def _complete(self):
92-
self._parser.complete()
9391
self._idle_expiry = time.monotonic() + self._keepalive_duration
9492

9593

src/httpx/_streams.py

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import os
44

55

6+
__all__ = ['Stream', 'ByteStream', 'DuplexStream', 'FileStream', 'MultiPartStream']
7+
8+
69
class Stream:
710
def read(self, size: int=-1) -> bytes:
811
raise NotImplementedError()
@@ -103,47 +106,6 @@ def __enter__(self):
103106
return self
104107

105108

106-
class HTTPStream(Stream):
107-
def __init__(self, next_chunk, complete):
108-
self._next_chunk = next_chunk
109-
self._complete = complete
110-
self._buffer = io.BytesIO()
111-
112-
def read(self, size=-1) -> bytes:
113-
sections = []
114-
length = 0
115-
116-
# If we have any data in the buffer read that and clear the buffer.
117-
buffered = self._buffer.read()
118-
if buffered:
119-
sections.append(buffered)
120-
length += len(buffered)
121-
self._buffer.seek(0)
122-
self._buffer.truncate(0)
123-
124-
# Read each chunk in turn.
125-
while (size < 0) or (length < size):
126-
section = self._next_chunk()
127-
sections.append(section)
128-
length += len(section)
129-
if section == b'':
130-
break
131-
132-
# If we've more data than requested, then push some back into the buffer.
133-
output = b''.join(sections)
134-
if size > -1 and len(output) > size:
135-
output, remainder = output[:size], output[size:]
136-
self._buffer.write(remainder)
137-
self._buffer.seek(0)
138-
139-
return output
140-
141-
def close(self) -> None:
142-
self._buffer.close()
143-
if self._complete is not None:
144-
self._complete()
145-
146-
147109
class MultiPartStream(Stream):
148110
def __init__(self, form: list[tuple[str, str]], files: list[tuple[str, str]], boundary=''):
149111
self._form = list(form)

0 commit comments

Comments
 (0)