Skip to content

Commit d482067

Browse files
przemekborutaclaude
andcommitted
fix: respect max_parallel_requests in HTTP connection pool size
Pass a pre-configured HTTPTransport/AsyncHTTPTransport with the correct limits into RetryTransport instead of letting it create its own pool with httpx defaults (100 connections). Previously, the limits calculated from max_parallel_requests were passed to httpx.Client(limits=...) which silently ignores them when a custom transport is provided. Fixes #459 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Przemysław <przemekboruta@interia.pl>
1 parent c25a708 commit d482067

4 files changed

Lines changed: 77 additions & 5 deletions

File tree

packages/data-designer-engine/src/data_designer/engine/models/clients/adapters/http_model_client.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,12 @@ def _get_sync_client(self) -> httpx.Client:
9797
raise RuntimeError("Model client is closed.")
9898
if self._client is None:
9999
if self._transport is None:
100-
self._transport = create_retry_transport(self._retry_config, strip_rate_limit_codes=False)
100+
inner = lazy.httpx.HTTPTransport(limits=self._limits)
101+
self._transport = create_retry_transport(
102+
self._retry_config, strip_rate_limit_codes=False, transport=inner
103+
)
101104
self._client = lazy.httpx.Client(
102105
transport=self._transport,
103-
limits=self._limits,
104106
timeout=lazy.httpx.Timeout(self._timeout_s),
105107
)
106108
return self._client
@@ -113,10 +115,12 @@ def _get_async_client(self) -> httpx.AsyncClient:
113115
raise RuntimeError("Model client is closed.")
114116
if self._aclient is None:
115117
if self._transport is None:
116-
self._transport = create_retry_transport(self._retry_config, strip_rate_limit_codes=True)
118+
inner = lazy.httpx.AsyncHTTPTransport(limits=self._limits)
119+
self._transport = create_retry_transport(
120+
self._retry_config, strip_rate_limit_codes=True, transport=inner
121+
)
117122
self._aclient = lazy.httpx.AsyncClient(
118123
transport=self._transport,
119-
limits=self._limits,
120124
timeout=lazy.httpx.Timeout(self._timeout_s),
121125
)
122126
return self._aclient

packages/data-designer-engine/src/data_designer/engine/models/clients/retry.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55

66
import logging
77
from dataclasses import dataclass, field
8+
from typing import TYPE_CHECKING
89

910
from httpx_retries import Retry, RetryTransport
1011

12+
if TYPE_CHECKING:
13+
import httpx
14+
1115
logger = logging.getLogger(__name__)
1216

1317
# 429 must not be retried at the transport layer so that rate-limit signals
@@ -37,6 +41,7 @@ def create_retry_transport(
3741
config: RetryConfig | None = None,
3842
*,
3943
strip_rate_limit_codes: bool = True,
44+
transport: httpx.BaseTransport | httpx.AsyncBaseTransport | None = None,
4045
) -> RetryTransport:
4146
"""Build an httpx ``RetryTransport`` from a :class:`RetryConfig`.
4247
@@ -72,4 +77,4 @@ def create_retry_transport(
7277
respect_retry_after_header=True,
7378
allowed_methods=Retry.RETRYABLE_METHODS | frozenset(["POST"]),
7479
)
75-
return RetryTransport(retry=retry)
80+
return RetryTransport(transport=transport, retry=retry)

packages/data-designer-engine/tests/engine/models/clients/test_native_http_clients.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,41 @@ async def test_acompletion_lazy_initializes_async_client(
289289

290290
mock_ctor.assert_called_once()
291291
assert result.message.content == "lazy result"
292+
293+
294+
# ---------------------------------------------------------------------------
295+
# Connection pool size regression tests (issue #459)
296+
# ---------------------------------------------------------------------------
297+
298+
299+
def test_sync_client_pool_size_respects_max_parallel_requests() -> None:
300+
"""Connection pool max_connections must be 2*max_parallel_requests, not the httpx default of 100."""
301+
client = OpenAICompatibleClient(
302+
provider_name=_OPENAI_PROVIDER,
303+
endpoint=_OPENAI_ENDPOINT,
304+
api_key="sk-test",
305+
max_parallel_requests=300,
306+
concurrency_mode=ClientConcurrencyMode.SYNC,
307+
)
308+
with patch(_SYNC_CLIENT_PATCH):
309+
client._get_sync_client()
310+
311+
# pool_max = max(32, 2 * 300) = 600
312+
assert client._transport._sync_transport._pool._max_connections == 600
313+
314+
315+
@pytest.mark.asyncio
316+
async def test_async_client_pool_size_respects_max_parallel_requests() -> None:
317+
"""Async connection pool max_connections must be 2*max_parallel_requests, not the httpx default of 100."""
318+
client = OpenAICompatibleClient(
319+
provider_name=_OPENAI_PROVIDER,
320+
endpoint=_OPENAI_ENDPOINT,
321+
api_key="sk-test",
322+
max_parallel_requests=300,
323+
concurrency_mode=ClientConcurrencyMode.ASYNC,
324+
)
325+
with patch(_ASYNC_CLIENT_PATCH):
326+
client._get_async_client()
327+
328+
# pool_max = max(32, 2 * 300) = 600
329+
assert client._transport._async_transport._pool._max_connections == 600

packages/data-designer-engine/tests/engine/models/clients/test_retry.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,28 @@ def test_create_retry_transport_default_codes_exclude_429() -> None:
8282
"""Default retryable_status_codes do not include 429 regardless of strip flag."""
8383
transport = create_retry_transport(strip_rate_limit_codes=False)
8484
assert 429 not in transport.retry.status_forcelist
85+
86+
87+
def test_create_retry_transport_forwards_sync_transport() -> None:
88+
"""Provided sync transport is used directly rather than replaced with a default pool."""
89+
import httpx
90+
91+
inner = httpx.HTTPTransport(limits=httpx.Limits(max_connections=600))
92+
transport = create_retry_transport(transport=inner)
93+
assert transport._sync_transport is inner
94+
95+
96+
def test_create_retry_transport_forwards_async_transport() -> None:
97+
"""Provided async transport is used directly rather than replaced with a default pool."""
98+
import httpx
99+
100+
inner = httpx.AsyncHTTPTransport(limits=httpx.Limits(max_connections=600))
101+
transport = create_retry_transport(transport=inner)
102+
assert transport._async_transport is inner
103+
104+
105+
def test_create_retry_transport_no_transport_creates_defaults() -> None:
106+
"""Without a transport argument both sync and async default pools are created."""
107+
transport = create_retry_transport()
108+
assert transport._sync_transport is not None
109+
assert transport._async_transport is not None

0 commit comments

Comments
 (0)