Skip to content

Commit 301e4c2

Browse files
committed
Drop stale unavailable pool connections
1 parent 10a6582 commit 301e4c2

6 files changed

Lines changed: 236 additions & 0 deletions

File tree

httpcore/_async/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ async def handle_async_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
await self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_async/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

httpcore/_sync/connection_pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ def handle_request(self, request: Request) -> Response:
242242
#
243243
# In this case we clear the connection and try again.
244244
pool_request.clear_connection()
245+
with self._optional_thread_lock:
246+
closing = []
247+
# If the connection still claims to be available then
248+
# it would be immediately assigned again, so drop it.
249+
if (
250+
connection in self._connections
251+
and connection.is_available()
252+
):
253+
self._connections.remove(connection)
254+
closing.append(connection)
255+
self._close_connections(closing)
245256
else:
246257
break # pragma: nocover
247258

httpcore/_sync/http2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
511511
def is_available(self) -> bool:
512512
return (
513513
self._state != HTTPConnectionState.CLOSED
514+
and self._connection_terminated is None
514515
and not self._connection_error
515516
and not self._used_all_stream_ids
516517
and not (

tests/_async/test_connection_pool.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,112 @@ async def trace(name, kwargs):
451451
]
452452

453453

454+
@pytest.mark.anyio
455+
async def test_connection_pool_drops_stale_available_connection():
456+
"""
457+
If a connection claims to be available but then refuses a request,
458+
the pool should not keep assigning requests to that stale connection.
459+
"""
460+
461+
class StaleConnection(httpcore.AsyncConnectionInterface):
462+
def __init__(self) -> None:
463+
self.closed = False
464+
465+
async def handle_async_request(
466+
self, request: httpcore.Request
467+
) -> httpcore.Response:
468+
raise httpcore.ConnectionNotAvailable()
469+
470+
def can_handle_request(self, origin: httpcore.Origin) -> bool:
471+
return True
472+
473+
def is_available(self) -> bool:
474+
return True
475+
476+
def has_expired(self) -> bool:
477+
return False
478+
479+
def is_idle(self) -> bool:
480+
return True
481+
482+
def is_closed(self) -> bool:
483+
return False
484+
485+
async def aclose(self) -> None:
486+
self.closed = True
487+
488+
def info(self) -> str:
489+
return "STALE"
490+
491+
class SuccessConnection(httpcore.AsyncConnectionInterface):
492+
def __init__(self) -> None:
493+
self.closed = False
494+
495+
async def handle_async_request(
496+
self, request: httpcore.Request
497+
) -> httpcore.Response:
498+
async def content() -> typing.AsyncIterator[bytes]:
499+
yield b"ok"
500+
501+
return httpcore.Response(200, content=content())
502+
503+
def can_handle_request(self, origin: httpcore.Origin) -> bool:
504+
return True
505+
506+
def is_available(self) -> bool:
507+
return True
508+
509+
def has_expired(self) -> bool:
510+
return False
511+
512+
def is_idle(self) -> bool:
513+
return True
514+
515+
def is_closed(self) -> bool:
516+
return False
517+
518+
async def aclose(self) -> None:
519+
self.closed = True
520+
521+
def info(self) -> str:
522+
return "OK"
523+
524+
class Pool(httpcore.AsyncConnectionPool):
525+
def __init__(self) -> None:
526+
super().__init__()
527+
self.stale_connection = StaleConnection()
528+
self.success_connection = SuccessConnection()
529+
self._created = 0
530+
531+
def create_connection(
532+
self, origin: httpcore.Origin
533+
) -> httpcore.AsyncConnectionInterface:
534+
self._created += 1
535+
if self._created == 1:
536+
return self.stale_connection
537+
return self.success_connection
538+
539+
origin = httpcore.Origin(b"https", b"example.com", 443)
540+
pool = Pool()
541+
async with pool:
542+
response = await pool.request("GET", "https://example.com/")
543+
544+
assert response.status == 200
545+
assert response.content == b"ok"
546+
assert pool.stale_connection.closed
547+
assert len(pool.connections) == 1
548+
connection = typing.cast(typing.Any, pool.connections[0])
549+
assert connection is pool.success_connection
550+
assert pool.stale_connection.can_handle_request(origin)
551+
assert not pool.stale_connection.has_expired()
552+
assert pool.stale_connection.is_idle()
553+
assert not pool.stale_connection.is_closed()
554+
assert pool.stale_connection.info() == "STALE"
555+
assert pool.success_connection.can_handle_request(origin)
556+
assert pool.success_connection.is_available()
557+
assert pool.success_connection.info() == "OK"
558+
559+
454560
@pytest.mark.anyio
455561
async def test_connection_pool_with_immediate_expiry():
456562
"""

tests/_sync/test_connection_pool.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,112 @@ def trace(name, kwargs):
452452

453453

454454

455+
def test_connection_pool_drops_stale_available_connection():
456+
"""
457+
If a connection claims to be available but then refuses a request,
458+
the pool should not keep assigning requests to that stale connection.
459+
"""
460+
461+
class StaleConnection(httpcore.ConnectionInterface):
462+
def __init__(self) -> None:
463+
self.closed = False
464+
465+
def handle_request(
466+
self, request: httpcore.Request
467+
) -> httpcore.Response:
468+
raise httpcore.ConnectionNotAvailable()
469+
470+
def can_handle_request(self, origin: httpcore.Origin) -> bool:
471+
return True
472+
473+
def is_available(self) -> bool:
474+
return True
475+
476+
def has_expired(self) -> bool:
477+
return False
478+
479+
def is_idle(self) -> bool:
480+
return True
481+
482+
def is_closed(self) -> bool:
483+
return False
484+
485+
def close(self) -> None:
486+
self.closed = True
487+
488+
def info(self) -> str:
489+
return "STALE"
490+
491+
class SuccessConnection(httpcore.ConnectionInterface):
492+
def __init__(self) -> None:
493+
self.closed = False
494+
495+
def handle_request(
496+
self, request: httpcore.Request
497+
) -> httpcore.Response:
498+
def content() -> typing.Iterator[bytes]:
499+
yield b"ok"
500+
501+
return httpcore.Response(200, content=content())
502+
503+
def can_handle_request(self, origin: httpcore.Origin) -> bool:
504+
return True
505+
506+
def is_available(self) -> bool:
507+
return True
508+
509+
def has_expired(self) -> bool:
510+
return False
511+
512+
def is_idle(self) -> bool:
513+
return True
514+
515+
def is_closed(self) -> bool:
516+
return False
517+
518+
def close(self) -> None:
519+
self.closed = True
520+
521+
def info(self) -> str:
522+
return "OK"
523+
524+
class Pool(httpcore.ConnectionPool):
525+
def __init__(self) -> None:
526+
super().__init__()
527+
self.stale_connection = StaleConnection()
528+
self.success_connection = SuccessConnection()
529+
self._created = 0
530+
531+
def create_connection(
532+
self, origin: httpcore.Origin
533+
) -> httpcore.ConnectionInterface:
534+
self._created += 1
535+
if self._created == 1:
536+
return self.stale_connection
537+
return self.success_connection
538+
539+
origin = httpcore.Origin(b"https", b"example.com", 443)
540+
pool = Pool()
541+
with pool:
542+
response = pool.request("GET", "https://example.com/")
543+
544+
assert response.status == 200
545+
assert response.content == b"ok"
546+
assert pool.stale_connection.closed
547+
assert len(pool.connections) == 1
548+
connection = typing.cast(typing.Any, pool.connections[0])
549+
assert connection is pool.success_connection
550+
assert pool.stale_connection.can_handle_request(origin)
551+
assert not pool.stale_connection.has_expired()
552+
assert pool.stale_connection.is_idle()
553+
assert not pool.stale_connection.is_closed()
554+
assert pool.stale_connection.info() == "STALE"
555+
assert pool.success_connection.can_handle_request(origin)
556+
assert pool.success_connection.is_available()
557+
assert pool.success_connection.info() == "OK"
558+
559+
560+
455561
def test_connection_pool_with_immediate_expiry():
456562
"""
457563
Connection pools with keepalive_expiry=0.0 should immediately expire

0 commit comments

Comments
 (0)