Skip to content

Commit 28bee87

Browse files
Parser keepalive (#145)
* Add parser keepalive, drop callback from stream * Drop erronous checkin
1 parent 9ff90ce commit 28bee87

File tree

6 files changed

+46
-54
lines changed

6 files changed

+46
-54
lines changed

src/ahttpx/_parsers.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import io
3+
import time
34
import typing
45

56
from ._streams import Stream
@@ -89,6 +90,7 @@ def __init__(self, stream: Stream, mode: str) -> None:
8990
self.stream = stream
9091
self.parser = ReadAheadParser(stream)
9192
self.mode = {'CLIENT': Mode.CLIENT, 'SERVER': Mode.SERVER}[mode]
93+
self.keepalive_duration = 5.0
9294

9395
# Track state...
9496
if self.mode == Mode.CLIENT:
@@ -107,6 +109,7 @@ def __init__(self, stream: Stream, mode: str) -> None:
107109
# Track connection keep alive...
108110
self.send_keep_alive = True
109111
self.recv_keep_alive = True
112+
self.keepalive_until: float | None = None
110113

111114
# Special states...
112115
self.processing_1xx = False
@@ -119,6 +122,9 @@ async def send_method_line(self, method: bytes, target: bytes, protocol: bytes)
119122
120123
Sending state will switch to SEND_HEADERS state.
121124
"""
125+
# Scrub connection keepalive
126+
self.keepalive_until = None
127+
122128
if self.send_state != State.SEND_METHOD_LINE:
123129
msg = f"Called 'send_method_line' in invalid state {self.send_state}"
124130
raise ProtocolError(msg)
@@ -244,6 +250,9 @@ async def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
244250
245251
Receive state will switch to RECV_HEADERS.
246252
"""
253+
# Scrub connection keepalive
254+
self.keepalive_until = None
255+
247256
if self.recv_state != State.RECV_METHOD_LINE:
248257
msg = f"Called 'recv_method_line' in invalid state {self.recv_state}"
249258
raise ProtocolError(msg)
@@ -409,6 +418,7 @@ async def complete(self):
409418
self.send_keep_alive = True
410419
self.recv_keep_alive = True
411420
self.processing_1xx = False
421+
self.keepalive_until = time.monotonic() + self.keepalive_duration
412422

413423
async def close(self):
414424
if self.send_state != State.CLOSED:
@@ -425,6 +435,9 @@ def is_idle(self) -> bool:
425435
def is_closed(self) -> bool:
426436
return self.send_state == State.CLOSED
427437

438+
def keepalive_expired(self) -> bool:
439+
return (self.keepalive_until is not None) and (time.monotonic() > self.keepalive_until)
440+
428441
def description(self) -> str:
429442
return {
430443
State.SEND_METHOD_LINE: "idle",
@@ -439,10 +452,9 @@ def __repr__(self) -> str:
439452

440453

441454
class HTTPStream(Stream):
442-
def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None):
455+
def __init__(self, parser: HTTPParser):
443456
self._parser = parser
444457
self._buffer = io.BytesIO()
445-
self._callback = callback
446458

447459
async def read(self, size=-1) -> bytes:
448460
sections = []
@@ -474,12 +486,8 @@ async def read(self, size=-1) -> bytes:
474486
return output
475487

476488
async def close(self) -> None:
477-
try:
478-
self._buffer.close()
479-
await self._parser.complete()
480-
finally:
481-
if self._callback is not None:
482-
await self._callback()
489+
self._buffer.close()
490+
await self._parser.complete()
483491

484492

485493
class ReadAheadParser:

src/ahttpx/_pool.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ async def _get_connection(self, request: Request) -> "Connection":
8484
# Attempt to reuse an existing connection.
8585
url = request.url
8686
origin = URL(scheme=url.scheme, host=url.host, port=url.port)
87-
now = time.monotonic()
8887
for conn in self._connections:
89-
if conn.origin() == origin and conn.is_idle() and not conn.is_expired(now):
88+
if conn.origin() == origin and conn.is_idle() and not conn.is_expired():
9089
return conn
9190

9291
# Or else create a new connection.
@@ -102,7 +101,7 @@ async def _get_connection(self, request: Request) -> "Connection":
102101
async def _cleanup(self) -> None:
103102
now = time.monotonic()
104103
for conn in list(self._connections):
105-
if conn.is_expired(now):
104+
if conn.is_expired():
106105
await conn.close()
107106
if conn.is_closed():
108107
self._connections.remove(conn)
@@ -142,8 +141,6 @@ class Connection(Transport):
142141
def __init__(self, stream: Stream, origin: URL | str):
143142
self._stream = stream
144143
self._origin = URL(origin) if not isinstance(origin, URL) else origin
145-
self._keepalive_duration = 5.0
146-
self._idle_expiry = time.monotonic() + self._keepalive_duration
147144
self._request_lock = Lock()
148145
self._parser = HTTPParser(stream, mode='CLIENT')
149146

@@ -154,8 +151,8 @@ def origin(self) -> URL:
154151
def is_idle(self) -> bool:
155152
return self._parser.is_idle()
156153

157-
def is_expired(self, when: float) -> bool:
158-
return self._parser.is_idle() and when > self._idle_expiry
154+
def is_expired(self) -> bool:
155+
return self._parser.is_idle() and self._parser.keepalive_expired()
159156

160157
def is_closed(self) -> bool:
161158
return self._parser.is_closed()
@@ -170,7 +167,7 @@ async def send(self, request: Request) -> Response:
170167
await self._send_head(request)
171168
await self._send_body(request)
172169
code, headers = await self._recv_head()
173-
stream = HTTPStream(self._parser, callback=self._complete)
170+
stream = HTTPStream(self._parser)
174171
# TODO...
175172
return Response(code, headers=headers, content=stream)
176173
# finally:
@@ -233,9 +230,6 @@ async def _recv_body(self) -> bytes:
233230
return await self._parser.recv_body()
234231

235232
# Request/response cycle complete...
236-
async def _complete(self) -> None:
237-
self._idle_expiry = time.monotonic() + self._keepalive_duration
238-
239233
async def _close(self) -> None:
240234
await self._parser.close()
241235

src/ahttpx/_server.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ def __init__(self, stream, endpoint):
2424
self._stream = stream
2525
self._endpoint = endpoint
2626
self._parser = HTTPParser(stream, mode='SERVER')
27-
self._keepalive_duration = 5.0
28-
self._idle_expiry = time.monotonic() + self._keepalive_duration
2927

3028
# API entry points...
3129
async def handle_requests(self):
3230
try:
33-
while not await self._parser.recv_close():
31+
while not (self._parser.keepalive_expired() or await self._parser.recv_close()):
3432
method, url, headers = await self._recv_head()
35-
stream = HTTPStream(self._parser, callback=self._complete)
33+
stream = HTTPStream(self._parser)
3634
# TODO: Handle endpoint exceptions
3735
async with Request(method, url, headers=headers, content=stream) as request:
3836
try:
@@ -82,10 +80,6 @@ async def _send_body(self, response: Response):
8280
await self._parser.send_body(data)
8381
await self._parser.send_body(b'')
8482

85-
# Start it all over again...
86-
async def _complete(self):
87-
self._idle_expiry = time.monotonic() + self._keepalive_duration
88-
8983

9084
class HTTPServer:
9185
def __init__(self, host, port):

src/httpx/_parsers.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import io
3+
import time
34
import typing
45

56
from ._streams import Stream
@@ -89,6 +90,7 @@ def __init__(self, stream: Stream, mode: str) -> None:
8990
self.stream = stream
9091
self.parser = ReadAheadParser(stream)
9192
self.mode = {'CLIENT': Mode.CLIENT, 'SERVER': Mode.SERVER}[mode]
93+
self.keepalive_duration = 5.0
9294

9395
# Track state...
9496
if self.mode == Mode.CLIENT:
@@ -107,6 +109,7 @@ def __init__(self, stream: Stream, mode: str) -> None:
107109
# Track connection keep alive...
108110
self.send_keep_alive = True
109111
self.recv_keep_alive = True
112+
self.keepalive_until: float | None = None
110113

111114
# Special states...
112115
self.processing_1xx = False
@@ -119,6 +122,9 @@ def send_method_line(self, method: bytes, target: bytes, protocol: bytes) -> Non
119122
120123
Sending state will switch to SEND_HEADERS state.
121124
"""
125+
# Scrub connection keepalive
126+
self.keepalive_until = None
127+
122128
if self.send_state != State.SEND_METHOD_LINE:
123129
msg = f"Called 'send_method_line' in invalid state {self.send_state}"
124130
raise ProtocolError(msg)
@@ -244,6 +250,9 @@ def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
244250
245251
Receive state will switch to RECV_HEADERS.
246252
"""
253+
# Scrub connection keepalive
254+
self.keepalive_until = None
255+
247256
if self.recv_state != State.RECV_METHOD_LINE:
248257
msg = f"Called 'recv_method_line' in invalid state {self.recv_state}"
249258
raise ProtocolError(msg)
@@ -409,6 +418,7 @@ def complete(self):
409418
self.send_keep_alive = True
410419
self.recv_keep_alive = True
411420
self.processing_1xx = False
421+
self.keepalive_until = time.monotonic() + self.keepalive_duration
412422

413423
def close(self):
414424
if self.send_state != State.CLOSED:
@@ -425,6 +435,9 @@ def is_idle(self) -> bool:
425435
def is_closed(self) -> bool:
426436
return self.send_state == State.CLOSED
427437

438+
def keepalive_expired(self) -> bool:
439+
return (self.keepalive_until is not None) and (time.monotonic() > self.keepalive_until)
440+
428441
def description(self) -> str:
429442
return {
430443
State.SEND_METHOD_LINE: "idle",
@@ -439,10 +452,9 @@ def __repr__(self) -> str:
439452

440453

441454
class HTTPStream(Stream):
442-
def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None):
455+
def __init__(self, parser: HTTPParser):
443456
self._parser = parser
444457
self._buffer = io.BytesIO()
445-
self._callback = callback
446458

447459
def read(self, size=-1) -> bytes:
448460
sections = []
@@ -474,12 +486,8 @@ def read(self, size=-1) -> bytes:
474486
return output
475487

476488
def close(self) -> None:
477-
try:
478-
self._buffer.close()
479-
self._parser.complete()
480-
finally:
481-
if self._callback is not None:
482-
self._callback()
489+
self._buffer.close()
490+
self._parser.complete()
483491

484492

485493
class ReadAheadParser:

src/httpx/_pool.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ def _get_connection(self, request: Request) -> "Connection":
8484
# Attempt to reuse an existing connection.
8585
url = request.url
8686
origin = URL(scheme=url.scheme, host=url.host, port=url.port)
87-
now = time.monotonic()
8887
for conn in self._connections:
89-
if conn.origin() == origin and conn.is_idle() and not conn.is_expired(now):
88+
if conn.origin() == origin and conn.is_idle() and not conn.is_expired():
9089
return conn
9190

9291
# Or else create a new connection.
@@ -102,7 +101,7 @@ def _get_connection(self, request: Request) -> "Connection":
102101
def _cleanup(self) -> None:
103102
now = time.monotonic()
104103
for conn in list(self._connections):
105-
if conn.is_expired(now):
104+
if conn.is_expired():
106105
conn.close()
107106
if conn.is_closed():
108107
self._connections.remove(conn)
@@ -142,8 +141,6 @@ class Connection(Transport):
142141
def __init__(self, stream: Stream, origin: URL | str):
143142
self._stream = stream
144143
self._origin = URL(origin) if not isinstance(origin, URL) else origin
145-
self._keepalive_duration = 5.0
146-
self._idle_expiry = time.monotonic() + self._keepalive_duration
147144
self._request_lock = Lock()
148145
self._parser = HTTPParser(stream, mode='CLIENT')
149146

@@ -154,8 +151,8 @@ def origin(self) -> URL:
154151
def is_idle(self) -> bool:
155152
return self._parser.is_idle()
156153

157-
def is_expired(self, when: float) -> bool:
158-
return self._parser.is_idle() and when > self._idle_expiry
154+
def is_expired(self) -> bool:
155+
return self._parser.is_idle() and self._parser.keepalive_expired()
159156

160157
def is_closed(self) -> bool:
161158
return self._parser.is_closed()
@@ -170,7 +167,7 @@ def send(self, request: Request) -> Response:
170167
self._send_head(request)
171168
self._send_body(request)
172169
code, headers = self._recv_head()
173-
stream = HTTPStream(self._parser, callback=self._complete)
170+
stream = HTTPStream(self._parser)
174171
# TODO...
175172
return Response(code, headers=headers, content=stream)
176173
# finally:
@@ -233,9 +230,6 @@ def _recv_body(self) -> bytes:
233230
return self._parser.recv_body()
234231

235232
# Request/response cycle complete...
236-
def _complete(self) -> None:
237-
self._idle_expiry = time.monotonic() + self._keepalive_duration
238-
239233
def _close(self) -> None:
240234
self._parser.close()
241235

src/httpx/_server.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ def __init__(self, stream, endpoint):
2424
self._stream = stream
2525
self._endpoint = endpoint
2626
self._parser = HTTPParser(stream, mode='SERVER')
27-
self._keepalive_duration = 5.0
28-
self._idle_expiry = time.monotonic() + self._keepalive_duration
2927

3028
# API entry points...
3129
def handle_requests(self):
3230
try:
33-
while not self._parser.recv_close():
31+
while not (self._parser.keepalive_expired() or self._parser.recv_close()):
3432
method, url, headers = self._recv_head()
35-
stream = HTTPStream(self._parser, callback=self._complete)
33+
stream = HTTPStream(self._parser)
3634
# TODO: Handle endpoint exceptions
3735
with Request(method, url, headers=headers, content=stream) as request:
3836
try:
@@ -82,10 +80,6 @@ def _send_body(self, response: Response):
8280
self._parser.send_body(data)
8381
self._parser.send_body(b'')
8482

85-
# Start it all over again...
86-
def _complete(self):
87-
self._idle_expiry = time.monotonic() + self._keepalive_duration
88-
8983

9084
class HTTPServer:
9185
def __init__(self, host, port):

0 commit comments

Comments
 (0)