Skip to content

Commit 57b1d68

Browse files
Add connection module (#146)
1 parent 28bee87 commit 57b1d68

File tree

7 files changed

+381
-356
lines changed

7 files changed

+381
-356
lines changed

scripts/unasync

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ unasync.unasync_files(
66
"src/ahttpx/__init__.py",
77
"src/ahttpx/__version__.py",
88
"src/ahttpx/_client.py",
9+
"src/ahttpx/_connection.py",
910
"src/ahttpx/_content.py",
1011
"src/ahttpx/_headers.py",
1112
"src/ahttpx/_parsers.py",

src/ahttpx/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from .__version__ import __title__, __version__
22
from ._client import * # Client
3+
from ._connection import * # Connection, Transport
34
from ._content import * # Content, File, Files, Form, HTML, JSON, MultiPart, Text
45
from ._headers import * # Headers
56
from ._network import * # NetworkBackend, NetworkStream, timeout
67
from ._parsers import * # HTTPParser, HTTPStream, ProtocolError
7-
from ._pool import * # Connection, ConnectionPool, Transport
8+
from ._pool import * # ConnectionPool
89
from ._quickstart import * # get, post, put, patch, delete
910
from ._response import * # StatusCode, Response
1011
from ._request import * # Method, Request

src/ahttpx/_connection.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import types
2+
3+
from ._content import Content
4+
from ._headers import Headers
5+
from ._network import Lock, NetworkBackend
6+
from ._parsers import HTTPParser, HTTPStream
7+
from ._response import Response
8+
from ._request import Method, Request
9+
from ._streams import Stream
10+
from ._urls import URL
11+
12+
13+
__all__ = [
14+
"Connection",
15+
"Transport",
16+
"open_connection",
17+
]
18+
19+
20+
class Transport:
21+
async def send(self, request: Request) -> Response:
22+
raise NotImplementedError()
23+
24+
async def close(self):
25+
pass
26+
27+
async def request(
28+
self,
29+
method: Method | str,
30+
url: URL | str,
31+
headers: Headers | dict[str, str] | None = None,
32+
content: Content | Stream | bytes | None = None,
33+
) -> Response:
34+
request = Request(method, url, headers=headers, content=content)
35+
async with await self.send(request) as response:
36+
await response.read()
37+
return response
38+
39+
async def stream(
40+
self,
41+
method: Method | str,
42+
url: URL | str,
43+
headers: Headers | dict[str, str] | None = None,
44+
content: Content | Stream | bytes | None = None,
45+
) -> Response:
46+
request = Request(method, url, headers=headers, content=content)
47+
response = await self.send(request)
48+
return response
49+
50+
51+
class Connection(Transport):
52+
def __init__(self, stream: Stream, origin: URL | str):
53+
self._stream = stream
54+
self._origin = URL(origin) if not isinstance(origin, URL) else origin
55+
self._request_lock = Lock()
56+
self._parser = HTTPParser(stream, mode='CLIENT')
57+
58+
# API for connection pool management...
59+
def origin(self) -> URL:
60+
return self._origin
61+
62+
def is_idle(self) -> bool:
63+
return self._parser.is_idle()
64+
65+
def is_expired(self) -> bool:
66+
return self._parser.is_idle() and self._parser.keepalive_expired()
67+
68+
def is_closed(self) -> bool:
69+
return self._parser.is_closed()
70+
71+
def description(self) -> str:
72+
return self._parser.description()
73+
74+
# API entry points...
75+
async def send(self, request: Request) -> Response:
76+
#async with self._request_lock:
77+
# try:
78+
await self._send_head(request)
79+
await self._send_body(request)
80+
code, headers = await self._recv_head()
81+
stream = HTTPStream(self._parser)
82+
# TODO...
83+
return Response(code, headers=headers, content=stream)
84+
# finally:
85+
# await self._cycle_complete()
86+
87+
async def close(self) -> None:
88+
async with self._request_lock:
89+
await self._close()
90+
91+
# Top-level API for working directly with a connection.
92+
async def request(
93+
self,
94+
method: Method | str,
95+
url: URL | str,
96+
headers: Headers | dict[str, str] | None = None,
97+
content: Content | Stream | bytes | None = None,
98+
) -> Response:
99+
url = self._origin.join(url)
100+
request = Request(method, url, headers=headers, content=content)
101+
async with await self.send(request) as response:
102+
await response.read()
103+
return response
104+
105+
async def stream(
106+
self,
107+
method: Method | str,
108+
url: URL | str,
109+
headers: Headers | dict[str, str] | None = None,
110+
content: Content | Stream | bytes | None = None,
111+
) -> Response:
112+
url = self._origin.join(url)
113+
request = Request(method, url, headers=headers, content=content)
114+
return await self.send(request)
115+
116+
# Send the request...
117+
async def _send_head(self, request: Request) -> None:
118+
method = bytes(request.method)
119+
target = request.url.target.encode('ascii')
120+
protocol = b'HTTP/1.1'
121+
await self._parser.send_method_line(method, target, protocol)
122+
headers = request.headers.as_byte_pairs()
123+
await self._parser.send_headers(headers)
124+
125+
async def _send_body(self, request: Request) -> None:
126+
while data := await request.stream.read(64 * 1024):
127+
await self._parser.send_body(data)
128+
await self._parser.send_body(b'')
129+
130+
# Receive the response...
131+
async def _recv_head(self) -> tuple[int, Headers]:
132+
_, code, _ = await self._parser.recv_status_line()
133+
h = await self._parser.recv_headers()
134+
headers = Headers([
135+
(k.decode('ascii'), v.decode('ascii'))
136+
for k, v in h
137+
])
138+
return code, headers
139+
140+
async def _recv_body(self) -> bytes:
141+
return await self._parser.recv_body()
142+
143+
# Request/response cycle complete...
144+
async def _close(self) -> None:
145+
await self._parser.close()
146+
147+
# Builtins...
148+
def __repr__(self) -> str:
149+
return f"<Connection [{self._origin} {self.description()}]>"
150+
151+
async def __aenter__(self) -> "Connection":
152+
return self
153+
154+
async def __aexit__(
155+
self,
156+
exc_type: type[BaseException] | None = None,
157+
exc_value: BaseException | None = None,
158+
traceback: types.TracebackType | None = None,
159+
):
160+
await self.close()
161+
162+
163+
async def open_connection(
164+
url: URL | str,
165+
hostname: str = '',
166+
backend: NetworkBackend | None = None,
167+
) -> Connection:
168+
169+
if isinstance(url, str):
170+
url = URL(url)
171+
172+
if url.scheme not in ("http", "https"):
173+
raise ValueError("URL scheme must be 'http://' or 'https://'.")
174+
if backend is None:
175+
backend = NetworkBackend()
176+
177+
host = url.host
178+
port = url.port or {"http": 80, "https": 443}[url.scheme]
179+
180+
if url.scheme == "https":
181+
stream = await backend.connect_tls(host, port, hostname)
182+
else:
183+
stream = await backend.connect(host, port)
184+
185+
return Connection(stream, url)

0 commit comments

Comments
 (0)