Skip to content

Commit d0a991d

Browse files
Content vs stream (#148)
* __bytes__ interface on Content * Add Binary to public interface * Request/Response become properly immutable.
1 parent 57b1d68 commit d0a991d

29 files changed

+785
-610
lines changed

scripts/unasync

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ unasync.unasync_files(
55
fpath_list = [
66
"src/ahttpx/__init__.py",
77
"src/ahttpx/__version__.py",
8+
"src/ahttpx/_body.py",
89
"src/ahttpx/_client.py",
910
"src/ahttpx/_connection.py",
1011
"src/ahttpx/_content.py",

src/ahttpx/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from .__version__ import __title__, __version__
22
from ._client import * # Client
33
from ._connection import * # Connection, Transport
4-
from ._content import * # Content, File, Files, Form, HTML, JSON, MultiPart, Text
4+
from ._content import * # Binary, Content, File, Files, Form, HTML, JSON, MultiPart, Text
55
from ._headers import * # Headers
66
from ._network import * # NetworkBackend, NetworkStream, timeout
7-
from ._parsers import * # HTTPParser, HTTPStream, ProtocolError
7+
from ._parsers import * # HTTPParser, ProtocolError
88
from ._pool import * # ConnectionPool
99
from ._quickstart import * # get, post, put, patch, delete
1010
from ._response import * # StatusCode, Response
@@ -18,6 +18,7 @@
1818
__all__ = [
1919
"__title__",
2020
"__version__",
21+
"Binary",
2122
"ByteStream",
2223
"Client",
2324
"Connection",
@@ -33,7 +34,6 @@
3334
"Headers",
3435
"HTML",
3536
"HTTPParser",
36-
"HTTPStream",
3737
"JSON",
3838
"Method",
3939
"MultiPart",

src/ahttpx/_body.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import io
2+
3+
from ._content import Content
4+
from ._headers import Headers
5+
from ._parsers import HTTPParser
6+
from ._streams import Stream
7+
8+
9+
__all__ = ['RequestContent', 'ResponseContent', 'HTTPStream']
10+
11+
12+
class RequestContent(Content):
13+
def __init__(self, parser: HTTPParser):
14+
self._parser = parser
15+
16+
def open(self) -> Stream:
17+
return HTTPStream(self._parser, is_response=False)
18+
19+
def headers(self) -> Headers:
20+
return Headers()
21+
22+
23+
class ResponseContent(Content):
24+
def __init__(self, parser: HTTPParser):
25+
self._parser = parser
26+
27+
def open(self) -> Stream:
28+
return HTTPStream(self._parser, is_response=True)
29+
30+
def headers(self) -> Headers:
31+
return Headers()
32+
33+
34+
class HTTPStream(Stream):
35+
def __init__(self, parser: HTTPParser, is_response: bool):
36+
self._parser = parser
37+
self._is_response = is_response
38+
self._buffer = io.BytesIO()
39+
40+
async def read(self, size=-1) -> bytes:
41+
sections = []
42+
length = 0
43+
44+
# If we have any data in the buffer read that and clear the buffer.
45+
buffered = self._buffer.read()
46+
if buffered:
47+
sections.append(buffered)
48+
length += len(buffered)
49+
self._buffer.seek(0)
50+
self._buffer.truncate(0)
51+
52+
# Read each chunk in turn.
53+
while (size < 0) or (length < size):
54+
section = await self._parser.recv_body()
55+
sections.append(section)
56+
length += len(section)
57+
if section == b'':
58+
break
59+
60+
# If we've more data than requested, then push some back into the buffer.
61+
output = b''.join(sections)
62+
if size > -1 and len(output) > size:
63+
output, remainder = output[:size], output[size:]
64+
self._buffer.write(remainder)
65+
self._buffer.seek(0)
66+
67+
return output
68+
69+
async def close(self) -> None:
70+
self._buffer.close()
71+
if self._is_response:
72+
await self._parser.complete()

src/ahttpx/_client.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from ._pool import ConnectionPool, Transport
77
from ._request import Method, Request
88
from ._response import Response
9-
from ._streams import Stream
109
from ._urls import URL
1110

1211
__all__ = ["Client"]
@@ -36,7 +35,7 @@ def build_request(
3635
method: Method | str,
3736
url: URL | str,
3837
headers: Headers | dict[str, str] | None = None,
39-
content: Content | Stream | bytes | None = None,
38+
content: Content | bytes | None = None,
4039
) -> Request:
4140
return Request(
4241
method=method,
@@ -50,19 +49,23 @@ async def request(
5049
method: Method | str,
5150
url: URL | str,
5251
headers: Headers | dict[str, str] | None = None,
53-
content: Content | Stream | bytes | None = None,
52+
content: Content | bytes | None = None,
5453
) -> Response:
5554
request = self.build_request(method, url, headers=headers, content=content)
56-
async with await self.via.send(request) as response:
57-
await response.read()
58-
return response
55+
async with await self.via.send(request) as stream:
56+
body = await stream.read()
57+
return Response(
58+
status_code=stream.status_code,
59+
headers=stream.headers,
60+
content=body,
61+
)
5962

6063
async def stream(
6164
self,
6265
method: Method | str,
6366
url: URL | str,
6467
headers: Headers | dict[str, str] | None = None,
65-
content: Content | Stream | bytes | None = None,
68+
content: Content | bytes | None = None,
6669
) -> Response:
6770
request = self.build_request(method, url, headers=headers, content=content)
6871
return await self.via.send(request)
@@ -78,23 +81,23 @@ async def post(
7881
self,
7982
url: URL | str,
8083
headers: Headers | dict[str, str] | None = None,
81-
content: Content | Stream | bytes | None = None,
84+
content: Content | bytes | None = None,
8285
):
8386
return await self.request("POST", url, headers=headers, content=content)
8487

8588
async def put(
8689
self,
8790
url: URL | str,
8891
headers: Headers | dict[str, str] | None = None,
89-
content: Content | Stream | bytes | None = None,
92+
content: Content | bytes | None = None,
9093
):
9194
return await self.request("PUT", url, headers=headers, content=content)
9295

9396
async def patch(
9497
self,
9598
url: URL | str,
9699
headers: Headers | dict[str, str] | None = None,
97-
content: Content | Stream | bytes | None = None,
100+
content: Content | bytes | None = None,
98101
):
99102
return await self.request("PATCH", url, headers=headers, content=content)
100103

@@ -146,8 +149,8 @@ async def send(self, request: Request) -> Response:
146149
# If we have a redirect, then we read the body of the response.
147150
# Ensures that the HTTP connection is available for a new
148151
# request/response cycle.
149-
await response.read()
150-
await response.close()
152+
async with response as stream:
153+
await stream.read()
151154

152155
# We've made a request-response and now need to issue a redirect request.
153156
request = self.build_redirect_request(request, response)

src/ahttpx/_connection.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import types
22

3+
from ._body import ResponseContent
34
from ._content import Content
45
from ._headers import Headers
56
from ._network import Lock, NetworkBackend
6-
from ._parsers import HTTPParser, HTTPStream
7+
from ._parsers import HTTPParser
78
from ._response import Response
89
from ._request import Method, Request
910
from ._streams import Stream
@@ -29,23 +30,22 @@ async def request(
2930
method: Method | str,
3031
url: URL | str,
3132
headers: Headers | dict[str, str] | None = None,
32-
content: Content | Stream | bytes | None = None,
33+
content: Content | bytes | None = None,
3334
) -> Response:
3435
request = Request(method, url, headers=headers, content=content)
35-
async with await self.send(request) as response:
36-
await response.read()
37-
return response
36+
async with await self.send(request) as stream:
37+
body = await stream.read()
38+
return Response(stream.status_code, headers=stream.headers, content=body)
3839

3940
async def stream(
4041
self,
4142
method: Method | str,
4243
url: URL | str,
4344
headers: Headers | dict[str, str] | None = None,
44-
content: Content | Stream | bytes | None = None,
45+
content: Content | bytes | None = None,
4546
) -> Response:
4647
request = Request(method, url, headers=headers, content=content)
47-
response = await self.send(request)
48-
return response
48+
return await self.send(request)
4949

5050

5151
class Connection(Transport):
@@ -78,11 +78,8 @@ async def send(self, request: Request) -> Response:
7878
await self._send_head(request)
7979
await self._send_body(request)
8080
code, headers = await self._recv_head()
81-
stream = HTTPStream(self._parser)
82-
# TODO...
83-
return Response(code, headers=headers, content=stream)
84-
# finally:
85-
# await self._cycle_complete()
81+
content = ResponseContent(self._parser)
82+
return Response(code, headers=headers, content=content)
8683

8784
async def close(self) -> None:
8885
async with self._request_lock:
@@ -94,20 +91,20 @@ async def request(
9491
method: Method | str,
9592
url: URL | str,
9693
headers: Headers | dict[str, str] | None = None,
97-
content: Content | Stream | bytes | None = None,
94+
content: Content | bytes | None = None,
9895
) -> Response:
9996
url = self._origin.join(url)
10097
request = Request(method, url, headers=headers, content=content)
101-
async with await self.send(request) as response:
102-
await response.read()
103-
return response
98+
async with await self.send(request) as stream:
99+
body = await stream.read()
100+
return Response(stream.status_code, headers=stream.headers, content=body)
104101

105102
async def stream(
106103
self,
107104
method: Method | str,
108105
url: URL | str,
109106
headers: Headers | dict[str, str] | None = None,
110-
content: Content | Stream | bytes | None = None,
107+
content: Content | bytes | None = None,
111108
) -> Response:
112109
url = self._origin.join(url)
113110
request = Request(method, url, headers=headers, content=content)
@@ -123,9 +120,10 @@ async def _send_head(self, request: Request) -> None:
123120
await self._parser.send_headers(headers)
124121

125122
async def _send_body(self, request: Request) -> None:
126-
while data := await request.stream.read(64 * 1024):
127-
await self._parser.send_body(data)
128-
await self._parser.send_body(b'')
123+
async with request as stream:
124+
while data := await stream.read(64 * 1024):
125+
await self._parser.send_body(data)
126+
await self._parser.send_body(b'')
129127

130128
# Receive the response...
131129
async def _recv_head(self) -> tuple[int, Headers]:

0 commit comments

Comments
 (0)