Skip to content

Commit 9cb839f

Browse files
mbeijenVictorPrins
andcommitted
Rewrite _assign_requests_to_connections as a single-pass O(N+M) loop
The previous implementation re-scanned `self._connections` per queued request to rebuild `available_connections` and `idle_connections` lists, giving O(N*M) behaviour when many requests are queued against a populated pool. The keepalive-eviction step in pass 1 was also quadratic, since `sum(... is_idle() ...)` was recomputed for every connection. This rewrite: - Walks `self._connections` once to drop closed connections and schedule expired ones for close. - Counts idle connections once, then evicts surplus idle connections in a single pass to enforce `max_keepalive_connections`. - Snapshots `available_connections` once before the queued-request loop, and consults it (rather than re-filtering `self._connections`) for each request. Behaviour is preserved exactly: - The same connection may still be assigned to multiple HTTP/1.1 requests in a single call. The pool's existing `ConnectionNotAvailable` retry loop in `handle_async_request` handles that case as before. - HTTP/2 multiplexing onto a single connection is preserved for the same reason — connections are not removed from `available_connections` on assignment. Inspired by encode/httpcore#1035 by @VictorPrins Co-Authored-By: Victor Prins <32959052+VictorPrins@users.noreply.github.com>
1 parent b4c5940 commit 9cb839f

2 files changed

Lines changed: 118 additions & 78 deletions

File tree

src/httpcore2/httpcore2/_async/connection_pool.py

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -261,61 +261,81 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
261261
Any closing connections are returned, allowing the I/O for closing
262262
those connections to be handled seperately.
263263
"""
264-
closing_connections = []
264+
closing_connections: list[AsyncConnectionInterface] = []
265+
retained_connections: list[AsyncConnectionInterface] = []
265266

266-
# First we handle cleaning up any connections that are closed,
267-
# have expired their keep-alive, or surplus idle connections.
268-
for connection in list(self._connections):
267+
# First we handle cleaning up any connections that are closed
268+
# or have expired their keep-alive, in a single pass.
269+
for connection in self._connections:
269270
if connection.is_closed():
270271
# log: "removing closed connection"
271-
self._connections.remove(connection)
272+
continue
272273
elif connection.has_expired():
273274
# log: "closing expired connection"
274-
self._connections.remove(connection)
275-
closing_connections.append(connection)
276-
elif (
277-
connection.is_idle()
278-
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
279-
):
280-
# log: "closing idle connection"
281-
self._connections.remove(connection)
282275
closing_connections.append(connection)
276+
else:
277+
retained_connections.append(connection)
278+
279+
# Then we close any surplus idle connections, to enforce the
280+
# max_keepalive_connections setting.
281+
idle_surplus = (
282+
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
283+
)
284+
if idle_surplus > 0:
285+
kept: list[AsyncConnectionInterface] = []
286+
for connection in retained_connections:
287+
if idle_surplus > 0 and connection.is_idle():
288+
# log: "closing idle connection"
289+
closing_connections.append(connection)
290+
idle_surplus -= 1
291+
else:
292+
kept.append(connection)
293+
retained_connections = kept
294+
295+
self._connections = retained_connections
296+
297+
# Snapshot the set of reusable connections once, rather than rebuilding
298+
# it per queued request — this is what brings the loop from O(N*M) to
299+
# O(N+M) in the common case.
300+
available_connections = [connection for connection in self._connections if connection.is_available()]
301+
new_connection_budget = self._max_connections - len(self._connections)
283302

284303
# Assign queued requests to connections.
285-
queued_requests = [request for request in self._requests if request.is_queued()]
286-
for pool_request in queued_requests:
304+
for pool_request in self._requests:
305+
if not pool_request.is_queued():
306+
continue
287307
origin = pool_request.request.url.origin
288-
available_connections = [
289-
connection
290-
for connection in self._connections
291-
if connection.can_handle_request(origin) and connection.is_available()
292-
]
293-
idle_connections = [connection for connection in self._connections if connection.is_idle()]
294308

295309
# There are three cases for how we may be able to handle the request:
296310
#
297311
# 1. There is an existing connection that can handle the request.
298312
# 2. We can create a new connection to handle the request.
299313
# 3. We can close an idle connection and then create a new connection
300314
# to handle the request.
301-
if available_connections:
302-
# log: "reusing existing connection"
303-
connection = available_connections[0]
304-
pool_request.assign_to_connection(connection)
305-
elif len(self._connections) < self._max_connections:
306-
# log: "creating new connection"
307-
connection = self.create_connection(origin)
308-
self._connections.append(connection)
309-
pool_request.assign_to_connection(connection)
310-
elif idle_connections:
311-
# log: "closing idle connection"
312-
connection = idle_connections[0]
313-
self._connections.remove(connection)
314-
closing_connections.append(connection)
315-
# log: "creating new connection"
316-
connection = self.create_connection(origin)
317-
self._connections.append(connection)
318-
pool_request.assign_to_connection(connection)
315+
for connection in available_connections:
316+
if connection.can_handle_request(origin):
317+
# log: "reusing existing connection"
318+
pool_request.assign_to_connection(connection)
319+
break
320+
else:
321+
if new_connection_budget > 0:
322+
# log: "creating new connection"
323+
connection = self.create_connection(origin)
324+
self._connections.append(connection)
325+
pool_request.assign_to_connection(connection)
326+
new_connection_budget -= 1
327+
continue
328+
for idx, connection in enumerate(available_connections):
329+
if connection.is_idle():
330+
# log: "closing idle connection"
331+
del available_connections[idx]
332+
self._connections.remove(connection)
333+
closing_connections.append(connection)
334+
# log: "creating new connection"
335+
connection = self.create_connection(origin)
336+
self._connections.append(connection)
337+
pool_request.assign_to_connection(connection)
338+
break
319339

320340
return closing_connections
321341

src/httpcore2/httpcore2/_sync/connection_pool.py

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -261,61 +261,81 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
261261
Any closing connections are returned, allowing the I/O for closing
262262
those connections to be handled seperately.
263263
"""
264-
closing_connections = []
264+
closing_connections: list[ConnectionInterface] = []
265+
retained_connections: list[ConnectionInterface] = []
265266

266-
# First we handle cleaning up any connections that are closed,
267-
# have expired their keep-alive, or surplus idle connections.
268-
for connection in list(self._connections):
267+
# First we handle cleaning up any connections that are closed
268+
# or have expired their keep-alive, in a single pass.
269+
for connection in self._connections:
269270
if connection.is_closed():
270271
# log: "removing closed connection"
271-
self._connections.remove(connection)
272+
continue
272273
elif connection.has_expired():
273274
# log: "closing expired connection"
274-
self._connections.remove(connection)
275-
closing_connections.append(connection)
276-
elif (
277-
connection.is_idle()
278-
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
279-
):
280-
# log: "closing idle connection"
281-
self._connections.remove(connection)
282275
closing_connections.append(connection)
276+
else:
277+
retained_connections.append(connection)
278+
279+
# Then we close any surplus idle connections, to enforce the
280+
# max_keepalive_connections setting.
281+
idle_surplus = (
282+
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
283+
)
284+
if idle_surplus > 0:
285+
kept: list[ConnectionInterface] = []
286+
for connection in retained_connections:
287+
if idle_surplus > 0 and connection.is_idle():
288+
# log: "closing idle connection"
289+
closing_connections.append(connection)
290+
idle_surplus -= 1
291+
else:
292+
kept.append(connection)
293+
retained_connections = kept
294+
295+
self._connections = retained_connections
296+
297+
# Snapshot the set of reusable connections once, rather than rebuilding
298+
# it per queued request — this is what brings the loop from O(N*M) to
299+
# O(N+M) in the common case.
300+
available_connections = [connection for connection in self._connections if connection.is_available()]
301+
new_connection_budget = self._max_connections - len(self._connections)
283302

284303
# Assign queued requests to connections.
285-
queued_requests = [request for request in self._requests if request.is_queued()]
286-
for pool_request in queued_requests:
304+
for pool_request in self._requests:
305+
if not pool_request.is_queued():
306+
continue
287307
origin = pool_request.request.url.origin
288-
available_connections = [
289-
connection
290-
for connection in self._connections
291-
if connection.can_handle_request(origin) and connection.is_available()
292-
]
293-
idle_connections = [connection for connection in self._connections if connection.is_idle()]
294308

295309
# There are three cases for how we may be able to handle the request:
296310
#
297311
# 1. There is an existing connection that can handle the request.
298312
# 2. We can create a new connection to handle the request.
299313
# 3. We can close an idle connection and then create a new connection
300314
# to handle the request.
301-
if available_connections:
302-
# log: "reusing existing connection"
303-
connection = available_connections[0]
304-
pool_request.assign_to_connection(connection)
305-
elif len(self._connections) < self._max_connections:
306-
# log: "creating new connection"
307-
connection = self.create_connection(origin)
308-
self._connections.append(connection)
309-
pool_request.assign_to_connection(connection)
310-
elif idle_connections:
311-
# log: "closing idle connection"
312-
connection = idle_connections[0]
313-
self._connections.remove(connection)
314-
closing_connections.append(connection)
315-
# log: "creating new connection"
316-
connection = self.create_connection(origin)
317-
self._connections.append(connection)
318-
pool_request.assign_to_connection(connection)
315+
for connection in available_connections:
316+
if connection.can_handle_request(origin):
317+
# log: "reusing existing connection"
318+
pool_request.assign_to_connection(connection)
319+
break
320+
else:
321+
if new_connection_budget > 0:
322+
# log: "creating new connection"
323+
connection = self.create_connection(origin)
324+
self._connections.append(connection)
325+
pool_request.assign_to_connection(connection)
326+
new_connection_budget -= 1
327+
continue
328+
for idx, connection in enumerate(available_connections):
329+
if connection.is_idle():
330+
# log: "closing idle connection"
331+
del available_connections[idx]
332+
self._connections.remove(connection)
333+
closing_connections.append(connection)
334+
# log: "creating new connection"
335+
connection = self.create_connection(origin)
336+
self._connections.append(connection)
337+
pool_request.assign_to_connection(connection)
338+
break
319339

320340
return closing_connections
321341

0 commit comments

Comments
 (0)