Skip to content

Commit 6d9fedc

Browse files
authored
fix(s3): cap parallel uploads behind proxy (ECO-410) (#308)
- Apply S3 semaphore to upload_from_memory (parity with path uploads) - Use reduced concurrency when HTTP(S) proxy env vars are set - Add unit tests for concurrency resolution Made-with: Cursor
1 parent f0ef0df commit 6d9fedc

3 files changed

Lines changed: 82 additions & 13 deletions

File tree

deepset_cloud_sdk/_s3/upload.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -266,23 +266,28 @@ async def upload_from_memory(
266266
) -> S3UploadResult:
267267
"""Upload content to the prefixed S3 namespace.
268268
269+
Uses the same concurrency limit as :meth:`upload_from_file` so bulk in-memory
270+
uploads (including sidecar ``.meta.json`` objects) do not open unbounded
271+
parallel connections—important behind corporate HTTP proxies.
272+
269273
:param file_name: Name of the file.
270274
:param upload_session: UploadSession to associate the upload with.
271275
:param content: Content of the file.
272276
:param client_session: The aiohttp ClientSession to use for this request.
273277
:return: S3UploadResult object.
274278
"""
275-
try:
276-
await self._upload_file_with_retries(file_name, upload_session, content, client_session)
277-
return S3UploadResult(file_name=file_name, success=True)
278-
except Exception as exception: # pylint: disable=bare-except, disable=broad-exception-caught
279-
logger.warning(
280-
"Could not upload a file to deepset AI Platform",
281-
file_name=file_name,
282-
session_id=upload_session.session_id,
283-
reason=str(exception),
284-
)
285-
return S3UploadResult(file_name=file_name, success=False, exception=exception)
279+
async with self.semaphore:
280+
try:
281+
await self._upload_file_with_retries(file_name, upload_session, content, client_session)
282+
return S3UploadResult(file_name=file_name, success=True)
283+
except Exception as exception: # pylint: disable=broad-exception-caught
284+
logger.warning(
285+
"Could not upload a file to deepset AI Platform",
286+
file_name=file_name,
287+
session_id=upload_session.session_id,
288+
reason=str(exception),
289+
)
290+
return S3UploadResult(file_name=file_name, success=False, exception=exception)
286291

287292
async def _process_results(
288293
self,

deepset_cloud_sdk/_service/files_service.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,29 @@
3939
META_SUFFIX = ".meta.json"
4040
DIRECT_UPLOAD_THRESHOLD = 20
4141
DEFAULT_S3_CONCURRENCY = 10
42+
# Fewer parallel TLS connections through typical corporate proxies (avoids Windows
43+
# socket / semaphore exhaustion when many files upload at once; see ECO-410).
44+
PROXY_S3_CONCURRENCY = 4
4245
DEFAULT_MAX_ATTEMPTS = 5
4346
SAFE_MODE_CONCURRENCY = 1
4447
SAFE_MODE_MAX_ATTEMPTS = 10
4548

49+
_PROXY_ENV_KEYS = ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy", "ALL_PROXY", "all_proxy")
50+
51+
52+
def _http_proxy_configured() -> bool:
53+
"""Return True when a proxy is set via common environment variables."""
54+
return any(os.environ.get(key, "").strip() for key in _PROXY_ENV_KEYS)
55+
56+
57+
def _resolve_s3_concurrency(safe_mode: bool) -> int:
58+
"""Pick S3 client concurrency: safe mode, proxy-capped, or default."""
59+
if safe_mode:
60+
return SAFE_MODE_CONCURRENCY
61+
if _http_proxy_configured():
62+
return PROXY_S3_CONCURRENCY
63+
return DEFAULT_S3_CONCURRENCY
64+
4665

4766
class FilesService:
4867
"""Service for all file-related operations."""
@@ -69,8 +88,13 @@ async def factory(cls, config: CommonConfig) -> AsyncGenerator[FilesService, Non
6988
async with DeepsetCloudAPI.factory(config) as deepset_cloud_api:
7089
files_api = FilesAPI(deepset_cloud_api)
7190
upload_sessions_api = UploadSessionsAPI(deepset_cloud_api)
72-
concurrency = SAFE_MODE_CONCURRENCY if config.safe_mode else DEFAULT_S3_CONCURRENCY
91+
concurrency = _resolve_s3_concurrency(config.safe_mode)
7392
max_attempts = SAFE_MODE_MAX_ATTEMPTS if config.safe_mode else DEFAULT_MAX_ATTEMPTS
93+
if not config.safe_mode and _http_proxy_configured():
94+
logger.info(
95+
"HTTP(S) proxy detected: using reduced S3 upload concurrency.",
96+
concurrency=concurrency,
97+
)
7498
async with S3(concurrency=concurrency, max_attempts=max_attempts) as s3:
7599
yield cls(upload_sessions_api, files_api, s3)
76100

tests/unit/service/test_files_service.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,14 @@
2828
WriteMode,
2929
)
3030
from deepset_cloud_sdk._s3.upload import S3UploadResult, S3UploadSummary
31-
from deepset_cloud_sdk._service.files_service import FilesService
31+
from deepset_cloud_sdk._service.files_service import (
32+
DEFAULT_S3_CONCURRENCY,
33+
PROXY_S3_CONCURRENCY,
34+
SAFE_MODE_CONCURRENCY,
35+
FilesService,
36+
_http_proxy_configured,
37+
_resolve_s3_concurrency,
38+
)
3239
from deepset_cloud_sdk.models import DeepsetCloudFile, UserInfo
3340

3441

@@ -1117,3 +1124,36 @@ def test_directories_excluded_from_path_non_recursive(self) -> None:
11171124
paths = [Path("tests/data/upload_folder_nested")]
11181125
file_paths = FilesService._get_file_paths(paths=paths, recursive=False)
11191126
assert file_paths == [Path("tests/data/upload_folder_nested/example.txt")]
1127+
1128+
1129+
class TestResolveS3Concurrency:
1130+
@staticmethod
1131+
def _clear_proxy_env(monkeypatch: MonkeyPatch) -> None:
1132+
for key in (
1133+
"HTTP_PROXY",
1134+
"HTTPS_PROXY",
1135+
"http_proxy",
1136+
"https_proxy",
1137+
"ALL_PROXY",
1138+
"all_proxy",
1139+
):
1140+
monkeypatch.delenv(key, raising=False)
1141+
1142+
def test_safe_mode_ignores_proxy(self, monkeypatch: MonkeyPatch) -> None:
1143+
self._clear_proxy_env(monkeypatch)
1144+
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:8080")
1145+
assert _resolve_s3_concurrency(safe_mode=True) == SAFE_MODE_CONCURRENCY
1146+
1147+
def test_proxy_lowers_concurrency(self, monkeypatch: MonkeyPatch) -> None:
1148+
self._clear_proxy_env(monkeypatch)
1149+
monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:8080")
1150+
assert _resolve_s3_concurrency(safe_mode=False) == PROXY_S3_CONCURRENCY
1151+
1152+
def test_no_proxy_uses_default(self, monkeypatch: MonkeyPatch) -> None:
1153+
self._clear_proxy_env(monkeypatch)
1154+
assert _resolve_s3_concurrency(safe_mode=False) == DEFAULT_S3_CONCURRENCY
1155+
1156+
def test_http_proxy_configured_empty_string(self, monkeypatch: MonkeyPatch) -> None:
1157+
self._clear_proxy_env(monkeypatch)
1158+
monkeypatch.setenv("HTTP_PROXY", " ")
1159+
assert _http_proxy_configured() is False

0 commit comments

Comments
 (0)