Skip to content

Commit 5b27533

Browse files
Tweak parsers API (#126)
1 parent 1db2ae3 commit 5b27533

File tree

13 files changed

+356
-218
lines changed

13 files changed

+356
-218
lines changed

docs/parsers.md

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
# Parsers
22

3-
```python
4-
writer = io.BytesIO()
5-
reader = io.BytesIO(
6-
b"HTTP/1.1 200 OK\r\n"
7-
b"Content-Length: 23\r\n"
8-
b"Content-Type: application/json\r\n"
9-
b"\r\n"
3+
### Client
4+
5+
<div class="tabs"><a onclick="httpx()" class="httpx">httpx</a> <a onclick="ahttpx()" class="ahttpx hidden">ahttpx</a></div>
6+
7+
```{ .python .httpx }
8+
stream = httpx.DuplexStream(
9+
b'HTTP/1.1 200 OK\r\n'
10+
b'Content-Length: 23\r\n'
11+
b'Content-Type: application/json\r\n'
12+
b'\r\n'
1013
b'{"msg": "hello, world"}'
1114
)
12-
p = httpx.HTTPParser(writer, reader)
15+
p = ahttpx.HTTPParser(stream, mode='CLIENT')
1316
1417
# Send the request...
15-
p.send_method_line(b"GET", b"/", b"HTTP/1.1")
16-
p.send_headers([(b"Host", b"example.com")])
18+
p.send_method_line(b'GET', b'/', b'HTTP/1.1')
19+
p.send_headers([(b'Host', b'www.example.com')])
1720
p.send_body(b'')
1821
1922
# Receive the response...
@@ -24,6 +27,83 @@ while buffer := p.recv_body():
2427
body += buffer
2528
```
2629

30+
```{ .python .ahttpx .hidden }
31+
stream = ahttpx.DuplexStream(
32+
b'HTTP/1.1 200 OK\r\n'
33+
b'Content-Length: 23\r\n'
34+
b'Content-Type: application/json\r\n'
35+
b'\r\n'
36+
b'{"msg": "hello, world"}'
37+
)
38+
p = ahttpx.HTTPParser(stream, mode='CLIENT')
39+
40+
# Send the request...
41+
await p.send_method_line(b'GET', b'/', b'HTTP/1.1')
42+
await p.send_headers([(b'Host', b'www.example.com')])
43+
await p.send_body(b'')
44+
45+
# Receive the response...
46+
protocol, code, reason_phase = await p.recv_status_line()
47+
headers = await p.recv_headers()
48+
body = b''
49+
while buffer := await p.recv_body():
50+
body += buffer
51+
```
52+
53+
### Server
54+
55+
<div class="tabs"><a onclick="httpx()" class="httpx">httpx</a> <a onclick="ahttpx()" class="ahttpx hidden">ahttpx</a></div>
56+
57+
```{ .python .httpx }
58+
stream = httpx.DuplexStream(
59+
b'GET / HTTP/1.1\r\n'
60+
b'Host: www.example.com\r\n'
61+
b'\r\n'
62+
)
63+
p = httpx.HTTPParser(stream, mode='SERVER')
64+
65+
# Receive the request...
66+
method, target, protocol = p.recv_method_line()
67+
headers = p.recv_headers()
68+
body = b''
69+
while buffer := p.recv_body():
70+
body += buffer
71+
72+
# Send the response...
73+
p.send_status_line(b'HTTP/1.1', 200, b'OK')
74+
p.send_headers([
75+
(b'Content-Length', b'23'),
76+
(b'Content-Type', b'application/json')
77+
])
78+
p.send_body(b'{"msg": "hello, world"}')
79+
p.send_body(b'')
80+
```
81+
82+
```{ .python .ahttpx .hidden }
83+
stream = ahttpx.DuplexStream(
84+
b'GET / HTTP/1.1\r\n'
85+
b'Host: www.example.com\r\n'
86+
b'\r\n'
87+
)
88+
p = ahttpx.HTTPParser(stream, mode='SERVER')
89+
90+
# Receive the request...
91+
method, target, protocol = await p.recv_method_line()
92+
headers = await p.recv_headers()
93+
body = b''
94+
while buffer := await p.recv_body():
95+
body += buffer
96+
97+
# Send the response...
98+
await p.send_status_line(b'HTTP/1.1', 200, b'OK')
99+
await p.send_headers([
100+
(b'Content-Length', b'23'),
101+
(b'Content-Type', b'application/json')
102+
])
103+
await p.send_body(b'{"msg": "hello, world"}')
104+
await p.send_body(b'')
105+
```
106+
27107
---
28108

29109
<span class="link-prev">← [Connections](connections.md)</span>

docs/streams.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ The interfaces here are simplified versions of Python's standard I/O operations.
99
The base `Stream` class. The core of the interface is a subset of Python's `io.IOBase`...
1010

1111
* `.read(size=-1)` - *(bytes)* Return the bytes from the data stream. If the `size` argument is omitted or negative then the entire stream will be read. If `size` is an positive integer then the call returns at most `size` bytes. A return value of `b''` indicates the end of the stream has been reached.
12+
* `.write(self, data: bytes)` - *None* Write the given bytes to the data stream. May raise `NotImplmentedError` if this is not a writeable stream.
1213
* `.close()` - Close the stream. Any further operations will raise a `ValueError`.
1314

14-
Additionally, the following properties are also defined...
15+
Additionally, the following property is also defined...
1516

1617
* `.size` - *(int or None)* Return an integer indicating the size of the stream, or `None` if the size is unknown. When working with HTTP this is used to either set a `Content-Length: <size>` header, or a `Content-Encoding: chunked` header.
1718

src/ahttpx/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from ._content import * # Content, File, Files, Form, HTML, JSON, MultiPart, Text
44
from ._headers import * # Headers
55
from ._network import * # NetworkBackend, NetworkStream, timeout
6-
from ._parsers import * # HTTPParser, Mode, ProtocolError
6+
from ._parsers import * # HTTPParser, ProtocolError
77
from ._pool import * # Connection, ConnectionPool, Transport
88
from ._quickstart import * # get, post, put, patch, delete
99
from ._response import * # Response
1010
from ._request import * # Request
11-
from ._streams import * # ByteStream, FileStream, HTTPStream, Stream
11+
from ._streams import * # ByteStream, DuplexStream, FileStream, HTTPStream, Stream
1212
from ._server import * # serve_http, run
1313
from ._urlencode import * # quote, unquote, urldecode, urlencode
1414
from ._urls import * # QueryParams, URL
@@ -23,6 +23,7 @@
2323
"ConnectionPool",
2424
"Content",
2525
"delete",
26+
"DuplexStream",
2627
"File",
2728
"FileStream",
2829
"Files",
@@ -33,7 +34,6 @@
3334
"HTTPParser",
3435
"HTTPStream",
3536
"JSON",
36-
"Mode",
3737
"MultiPart",
3838
"NetworkBackend",
3939
"NetworkStream",

src/ahttpx/_parsers.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,15 @@ class HTTPParser:
8383
client.complete()
8484
client.close()
8585
"""
86-
def __init__(self, writer: Stream, reader: Stream, mode: Mode=Mode.CLIENT) -> None:
87-
self.writer = writer
88-
self.reader = reader
89-
self.parser = ReadAheadParser(reader)
90-
self.mode = mode
91-
92-
# Track client and server state...
93-
if mode == Mode.CLIENT:
94-
self.send_state = State.SEND_METHOD_LINE
95-
self.recv_state = State.WAIT
86+
def __init__(self, stream: Stream, mode: str) -> None:
87+
self.stream = stream
88+
self.parser = ReadAheadParser(stream)
89+
self.mode = {'CLIENT': Mode.CLIENT, 'SERVER': Mode.SERVER}[mode]
90+
91+
# Track state...
92+
if self.mode == Mode.CLIENT:
93+
self.send_state: State = State.SEND_METHOD_LINE
94+
self.recv_state: State = State.WAIT
9695
else:
9796
self.recv_state = State.RECV_METHOD_LINE
9897
self.send_state = State.WAIT
@@ -119,14 +118,14 @@ async def send_method_line(self, method: bytes, target: bytes, protocol: bytes)
119118
Sending state will switch to SEND_HEADERS state.
120119
"""
121120
if self.send_state != State.SEND_METHOD_LINE:
122-
msg = f"Called 'send_method_line' in invalid state {self.description()}"
121+
msg = f"Called 'send_method_line' in invalid state {self.send_state}"
123122
raise ProtocolError(msg)
124123

125124
# Send initial request line, eg. "GET / HTTP/1.1"
126125
if protocol != b'HTTP/1.1':
127126
raise ProtocolError("Sent unsupported protocol version")
128127
data = b" ".join([method, target, protocol]) + b"\r\n"
129-
await self.writer.write(data)
128+
await self.stream.write(data)
130129

131130
self.send_state = State.SEND_HEADERS
132131
self.recv_state = State.RECV_STATUS_LINE
@@ -140,15 +139,15 @@ async def send_status_line(self, protocol: bytes, status_code: int, reason: byte
140139
Sending state will switch to SEND_HEADERS state.
141140
"""
142141
if self.send_state != State.SEND_STATUS_LINE:
143-
msg = f"Called 'send_status_line' in invalid state {self.description()}"
142+
msg = f"Called 'send_status_line' in invalid state {self.send_state}"
144143
raise ProtocolError(msg)
145144

146145
# Send initial request line, eg. "GET / HTTP/1.1"
147146
if protocol != b'HTTP/1.1':
148147
raise ProtocolError("Sent unsupported protocol version")
149148
status_code_bytes = str(status_code).encode('ascii')
150149
data = b" ".join([protocol, status_code_bytes, reason]) + b"\r\n"
151-
await self.writer.write(data)
150+
await self.stream.write(data)
152151

153152
self.send_state = State.SEND_HEADERS
154153

@@ -161,7 +160,7 @@ async def send_headers(self, headers: list[tuple[bytes, bytes]]) -> None:
161160
Sending state will switch to SEND_BODY state.
162161
"""
163162
if self.send_state != State.SEND_HEADERS:
164-
msg = f"Called 'send_headers' in invalid state {self.description()}"
163+
msg = f"Called 'send_headers' in invalid state {self.send_state}"
165164
raise ProtocolError(msg)
166165

167166
# Update header state
@@ -187,7 +186,7 @@ async def send_headers(self, headers: list[tuple[bytes, bytes]]) -> None:
187186
# Send request headers
188187
lines = [name + b": " + value + b"\r\n" for name, value in headers]
189188
data = b"".join(lines) + b"\r\n"
190-
await self.writer.write(data)
189+
await self.stream.write(data)
191190

192191
self.send_state = State.SEND_BODY
193192

@@ -200,14 +199,14 @@ async def send_body(self, body: bytes) -> None:
200199
Sending state will switch to DONE.
201200
"""
202201
if self.send_state != State.SEND_BODY:
203-
msg = f"Called 'send_body' in invalid state {self.description()}"
202+
msg = f"Called 'send_body' in invalid state {self.send_state}"
204203
raise ProtocolError(msg)
205204

206205
if self.send_content_length is None:
207206
# Transfer-Encoding: chunked
208207
self.send_seen_length += len(body)
209208
marker = f'{len(body):x}\r\n'.encode('ascii')
210-
await self.writer.write(marker + body + b'\r\n')
209+
await self.stream.write(marker + body + b'\r\n')
211210

212211
else:
213212
# Content-Length: xxx
@@ -219,7 +218,7 @@ async def send_body(self, body: bytes) -> None:
219218
msg = 'Not enough data sent for declared Content-Length'
220219
raise ProtocolError(msg)
221220
if body:
222-
await self.writer.write(body)
221+
await self.stream.write(body)
223222

224223
if body == b'':
225224
# Handle body close
@@ -234,7 +233,7 @@ async def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
234233
Receive state will switch to RECV_HEADERS.
235234
"""
236235
if self.recv_state != State.RECV_METHOD_LINE:
237-
msg = f"Called 'recv_method_line' in invalid state {self.description()}"
236+
msg = f"Called 'recv_method_line' in invalid state {self.recv_state}"
238237
raise ProtocolError(msg)
239238

240239
# Read initial response line, eg. "GET / HTTP/1.1"
@@ -257,7 +256,7 @@ async def recv_status_line(self) -> tuple[bytes, int, bytes]:
257256
Receive state will switch to RECV_HEADERS.
258257
"""
259258
if self.recv_state != State.RECV_STATUS_LINE:
260-
msg = f"Called 'recv_status_line' in invalid state {self.description()}"
259+
msg = f"Called 'recv_status_line' in invalid state {self.recv_state}"
261260
raise ProtocolError(msg)
262261

263262
# Read initial response line, eg. "HTTP/1.1 200 OK"
@@ -290,7 +289,7 @@ async def recv_headers(self) -> list[tuple[bytes, bytes]]:
290289
Receive state will revert to RECV_STATUS_CODE for interim 1xx responses.
291290
"""
292291
if self.recv_state != State.RECV_HEADERS:
293-
msg = f"Called 'recv_headers' in invalid state {self.description()}"
292+
msg = f"Called 'recv_headers' in invalid state {self.recv_state}"
294293
raise ProtocolError(msg)
295294

296295
# Read response headers
@@ -340,7 +339,7 @@ async def recv_body(self) -> bytes:
340339
The server will switch to DONE.
341340
"""
342341
if self.recv_state != State.RECV_BODY:
343-
msg = f"Called 'recv_body' in invalid state {self.description()}"
342+
msg = f"Called 'recv_body' in invalid state {self.recv_state}"
344343
raise ProtocolError(msg)
345344

346345
if self.recv_content_length is None:
@@ -390,6 +389,7 @@ async def complete(self):
390389
else:
391390
self.recv_state = State.RECV_METHOD_LINE
392391
self.send_state = State.WAIT
392+
393393
self.send_content_length = 0
394394
self.recv_content_length = 0
395395
self.send_seen_length = 0
@@ -402,7 +402,7 @@ async def close(self):
402402
if self.send_state != State.CLOSED:
403403
self.send_state = State.CLOSED
404404
self.recv_state = State.CLOSED
405-
await self.writer.close()
405+
await self.stream.close()
406406

407407
def is_idle(self) -> bool:
408408
return (

src/ahttpx/_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def __init__(self, stream: Stream, origin: URL | str):
145145
self._keepalive_duration = 5.0
146146
self._idle_expiry = time.monotonic() + self._keepalive_duration
147147
self._request_lock = Lock()
148-
self._parser = HTTPParser(stream, stream)
148+
self._parser = HTTPParser(stream, mode='CLIENT')
149149

150150
# API for connection pool management...
151151
def origin(self) -> URL:

src/ahttpx/_server.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,33 @@ class HTTPConnection:
2424
def __init__(self, stream, endpoint):
2525
self._stream = stream
2626
self._endpoint = endpoint
27-
self._parser = HTTPParser(stream, stream, mode=Mode.SERVER)
27+
self._parser = HTTPParser(stream, mode='SERVER')
2828
self._keepalive_duration = 5.0
2929
self._idle_expiry = time.monotonic() + self._keepalive_duration
3030

3131
# API entry points...
3232
async def handle_requests(self):
33-
while not self._parser.is_closed():
34-
method, url, headers = await self._recv_head()
35-
stream = HTTPStream(self._recv_body, self._complete)
36-
# TODO: Handle endpoint exceptions
37-
async with Request(method, url, headers=headers, content=stream) as request:
38-
try:
39-
response = await self._endpoint(request)
40-
status_line = f"{request.method} {request.url.target} [{response.status_code} {response.reason_phrase}]"
41-
logger.info(status_line)
42-
except Exception as exc:
43-
logger.error("Internal Server Error", exc_info=True)
44-
content = Text("Internal Server Error")
45-
err = Response(code=500, content=content)
46-
await self._send_head(err)
47-
await self._send_body(err)
48-
else:
33+
try:
34+
while not self._parser.is_closed():
35+
method, url, headers = await self._recv_head()
36+
stream = HTTPStream(self._recv_body, self._complete)
37+
# TODO: Handle endpoint exceptions
38+
async with Request(method, url, headers=headers, content=stream) as request:
4939
try:
50-
await self._send_head(response)
51-
await self._send_body(response)
40+
response = await self._endpoint(request)
41+
status_line = f"{request.method} {request.url.target} [{response.status_code} {response.reason_phrase}]"
42+
logger.info(status_line)
5243
except Exception as exc:
5344
logger.error("Internal Server Error", exc_info=True)
45+
content = Text("Internal Server Error")
46+
err = Response(code=500, content=content)
47+
await self._send_head(err)
48+
await self._send_body(err)
49+
else:
50+
await self._send_head(response)
51+
await self._send_body(response)
52+
except Exception as exc:
53+
logger.error("Internal Server Error", exc_info=True)
5454

5555
async def close(self):
5656
self._parser.close()

0 commit comments

Comments
 (0)