Skip to content

Commit 72d2eb6

Browse files
committed
Rewrite _assign_requests_to_connections to fix bugs and improve perf
Redesign of the connection pool's core request-connection assignment algorithm (_assign_requests_to_connections) fixing bugs and performance issues identified in the original implementation. Fixes: - Fixed incorrect idle connection counting, causing premature connection evictions - Fixed bug where multiple requests could be assigned to the same HTTP/1.1 connection, leading to ConnectionNotAvailable errors Performance improvements: - Reduced time complexity from O(N²+NxM) to O(N+M) where N=connections, M=requests
1 parent b7ce985 commit 72d2eb6

3 files changed

Lines changed: 293 additions & 110 deletions

File tree

httpcore/_async/connection_pool.py

Lines changed: 145 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
from .connection import AsyncHTTPConnection
1414
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface
1515

16+
try:
17+
from .http2 import AsyncHTTP2Connection
18+
except ImportError:
19+
# ImportError happens when the user installed httpcore without the optional http2 dependency
20+
AsyncHTTP2Connection = None # type: ignore[assignment, misc]
21+
1622

1723
class AsyncPoolRequest:
1824
def __init__(self, request: Request) -> None:
@@ -277,66 +283,150 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
277283
Any closing connections are returned, allowing the I/O for closing
278284
those connections to be handled seperately.
279285
"""
280-
closing_connections = []
281-
282-
# First we handle cleaning up any connections that are closed,
283-
# have expired their keep-alive, or surplus idle connections.
284-
for connection in list(self._connections):
285-
if connection.is_closed():
286-
# log: "removing closed connection"
287-
self._connections.remove(connection)
288-
elif connection.has_expired():
289-
# log: "closing expired connection"
290-
self._connections.remove(connection)
291-
closing_connections.append(connection)
292-
elif (
293-
connection.is_idle()
294-
and len([connection.is_idle() for connection in self._connections])
295-
> self._max_keepalive_connections
296-
):
297-
# log: "closing idle connection"
298-
self._connections.remove(connection)
299-
closing_connections.append(connection)
300-
301-
# Assign queued requests to connections.
302-
queued_requests = [request for request in self._requests if request.is_queued()]
303-
for pool_request in queued_requests:
286+
# Initialize connection buckets
287+
closing_conns: list[AsyncConnectionInterface] = []
288+
available_conns: list[AsyncConnectionInterface] = []
289+
occupied_conns: list[AsyncConnectionInterface] = []
290+
291+
# Track HTTP/2 connection capacity
292+
http2_conn_stream_capacity: dict[AsyncConnectionInterface, int] = {}
293+
294+
# Phase 1: Categorize all connections in a single pass
295+
for conn in self._connections:
296+
if conn.is_closed():
297+
# Closed connections are simply skipped (not added to any bucket)
298+
continue
299+
elif conn.has_expired():
300+
# Expired connections need to be closed
301+
closing_conns.append(conn)
302+
elif conn.is_available():
303+
# Available connections
304+
available_conns.append(conn)
305+
# Track HTTP/2 connection capacity
306+
if self._http2 and isinstance(conn, AsyncHTTP2Connection):
307+
# Get the actual available stream count from the connection
308+
http2_conn_stream_capacity[conn] = (
309+
conn.get_available_stream_capacity()
310+
)
311+
elif conn.is_idle():
312+
# Idle but not available (this shouldn't happen, but handle it by closing the connection)
313+
closing_conns.append(conn)
314+
else:
315+
# Occupied connections
316+
occupied_conns.append(conn)
317+
318+
# Calculate how many new connections we can create
319+
total_existing_connections = (
320+
len(available_conns) + len(occupied_conns) + len(closing_conns)
321+
)
322+
new_conns_remaining_count = self._max_connections - total_existing_connections
323+
324+
# Phase 2: Assign queued requests to connections
325+
for pool_request in self._requests:
326+
if not pool_request.is_queued():
327+
continue
328+
304329
origin = pool_request.request.url.origin
305-
available_connections = [
306-
connection
307-
for connection in self._connections
308-
if connection.can_handle_request(origin) and connection.is_available()
309-
]
310-
idle_connections = [
311-
connection for connection in self._connections if connection.is_idle()
312-
]
313330

331+
# Try to find an available connection that can handle this request
314332
# There are three cases for how we may be able to handle the request:
315333
#
316-
# 1. There is an existing connection that can handle the request.
334+
# 1. There is an existing available connection that can handle the request.
317335
# 2. We can create a new connection to handle the request.
318-
# 3. We can close an idle connection and then create a new connection
319-
# to handle the request.
320-
if available_connections:
321-
# log: "reusing existing connection"
322-
connection = available_connections[0]
323-
pool_request.assign_to_connection(connection)
324-
elif len(self._connections) < self._max_connections:
325-
# log: "creating new connection"
326-
connection = self.create_connection(origin)
327-
self._connections.append(connection)
328-
pool_request.assign_to_connection(connection)
329-
elif idle_connections:
330-
# log: "closing idle connection"
331-
connection = idle_connections[0]
332-
self._connections.remove(connection)
333-
closing_connections.append(connection)
334-
# log: "creating new connection"
335-
connection = self.create_connection(origin)
336-
self._connections.append(connection)
337-
pool_request.assign_to_connection(connection)
338-
339-
return closing_connections
336+
# 3. We can close an idle connection and then create a new connection to handle the request.
337+
338+
assigned = False
339+
340+
# Case 1: try to use an available connection
341+
for i in range(len(available_conns) - 1, -1, -1):
342+
# Loop in reverse order since popping an element from the end of the list is O(1),
343+
# whereas popping from the beginning of the list is O(n)
344+
345+
conn = available_conns[i]
346+
if conn.can_handle_request(origin):
347+
# Assign the request to this connection
348+
pool_request.assign_to_connection(conn)
349+
350+
# Handle HTTP/1.1 vs HTTP/2 differently
351+
if self._http2 and conn in http2_conn_stream_capacity:
352+
# HTTP/2: Decrement available capacity
353+
http2_conn_stream_capacity[conn] -= 1
354+
if http2_conn_stream_capacity[conn] <= 0:
355+
# Move to occupied if no more capacity
356+
available_conns.pop(i)
357+
occupied_conns.append(conn)
358+
del http2_conn_stream_capacity[conn]
359+
else:
360+
# HTTP/1.1: Move to occupied immediately
361+
available_conns.pop(i)
362+
occupied_conns.append(conn)
363+
364+
assigned = True
365+
break
366+
367+
if assigned:
368+
continue
369+
370+
# Case 2: Try to create a new connection
371+
if new_conns_remaining_count > 0:
372+
conn = self.create_connection(origin)
373+
pool_request.assign_to_connection(conn)
374+
# New connections go to occupied (we don't know if HTTP/1.1 or HTTP/2 yet, so assume no multiplexing)
375+
occupied_conns.append(conn)
376+
new_conns_remaining_count -= 1
377+
continue
378+
379+
# Case 3, last resort: evict an idle connection and create a new connection
380+
assigned = False
381+
for i in range(len(available_conns) - 1, -1, -1):
382+
# Loop in reverse order since popping an element from the end of the list is O(1),
383+
# whereas popping from the beginning of the list is O(n)
384+
conn = available_conns[i]
385+
if conn.is_idle():
386+
evicted_conn = available_conns.pop(i)
387+
closing_conns.append(evicted_conn)
388+
# Create new connection for the required origin
389+
conn = self.create_connection(origin)
390+
pool_request.assign_to_connection(conn)
391+
occupied_conns.append(conn)
392+
assigned = True
393+
break
394+
395+
# All attempts failed: all connections are occupied and we can't create a new one
396+
if not assigned:
397+
# Break out of the loop since no more queued requests can be serviced at this time
398+
break
399+
400+
# Phase 3: Enforce self._max_keepalive_connections by closing excess idle connections
401+
#
402+
# Only run keepalive enforcement if len(available_conns) > max_keepalive.
403+
# Since idle connections are a subset of available connections, if there are
404+
# fewer available connections than the limit, we cannot possibly violate it.
405+
if len(available_conns) > self._max_keepalive_connections:
406+
keepalive_available_conns: list[AsyncConnectionInterface] = []
407+
n_idle_conns_kept = 0
408+
409+
for conn in available_conns:
410+
if conn.is_idle():
411+
if n_idle_conns_kept >= self._max_keepalive_connections:
412+
# We've already kept the maximum allowed idle connections, close this one
413+
closing_conns.append(conn)
414+
else:
415+
# Keep this idle connection as we're still under the limit
416+
keepalive_available_conns.append(conn)
417+
n_idle_conns_kept += 1
418+
else:
419+
# This is an available but not idle connection (active HTTP/2 with capacity)
420+
# Always keep these as they don't count against keepalive limits
421+
keepalive_available_conns.append(conn)
422+
423+
# Replace available_conns with the filtered list (excess idle connections removed)
424+
available_conns = keepalive_available_conns
425+
426+
# Rebuild self._connections from all buckets
427+
self._connections = available_conns + occupied_conns
428+
429+
return closing_conns
340430

341431
async def _close_connections(self, closing: list[AsyncConnectionInterface]) -> None:
342432
# Close connections which have been removed from the pool.

0 commit comments

Comments
 (0)