Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,28 @@ async def upload_from_memory(
) -> S3UploadResult:
"""Upload content to the prefixed S3 namespace.

Uses the same concurrency limit as :meth:`upload_from_file` so bulk in-memory
uploads (including sidecar ``.meta.json`` objects) do not open unbounded
parallel connections—important behind corporate HTTP proxies.

:param file_name: Name of the file.
:param upload_session: UploadSession to associate the upload with.
:param content: Content of the file.
:param client_session: The aiohttp ClientSession to use for this request.
:return: S3UploadResult object.
"""
try:
await self._upload_file_with_retries(file_name, upload_session, content, client_session)
return S3UploadResult(file_name=file_name, success=True)
except Exception as exception: # pylint: disable=bare-except, disable=broad-exception-caught
logger.warning(
"Could not upload a file to deepset AI Platform",
file_name=file_name,
session_id=upload_session.session_id,
reason=str(exception),
)
return S3UploadResult(file_name=file_name, success=False, exception=exception)
async with self.semaphore:
try:
await self._upload_file_with_retries(file_name, upload_session, content, client_session)
return S3UploadResult(file_name=file_name, success=True)
except Exception as exception: # pylint: disable=broad-exception-caught
logger.warning(
"Could not upload a file to deepset AI Platform",
file_name=file_name,
session_id=upload_session.session_id,
reason=str(exception),
)
return S3UploadResult(file_name=file_name, success=False, exception=exception)

async def _process_results(
self,
Expand Down
26 changes: 25 additions & 1 deletion deepset_cloud_sdk/_service/files_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,29 @@
META_SUFFIX = ".meta.json"  # suffix of the sidecar metadata object uploaded alongside each file
DIRECT_UPLOAD_THRESHOLD = 20  # NOTE(review): presumably the file-count cutoff for the direct upload path — confirm against callers
DEFAULT_S3_CONCURRENCY = 10  # parallel S3 uploads when safe mode is off and no proxy is detected
# Fewer parallel TLS connections through typical corporate proxies (avoids Windows
# socket / semaphore exhaustion when many files upload at once; see ECO-410).
PROXY_S3_CONCURRENCY = 4
DEFAULT_MAX_ATTEMPTS = 5  # per-upload retry attempts outside safe mode
SAFE_MODE_CONCURRENCY = 1  # safe mode serializes uploads completely
SAFE_MODE_MAX_ATTEMPTS = 10  # safe mode trades speed for more retry attempts

_PROXY_ENV_KEYS = ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy", "ALL_PROXY", "all_proxy")


def _http_proxy_configured() -> bool:
"""Return True when a proxy is set via common environment variables."""
return any(os.environ.get(key, "").strip() for key in _PROXY_ENV_KEYS)


def _resolve_s3_concurrency(safe_mode: bool) -> int:
    """Resolve how many parallel S3 uploads the client should run.

    Priority order: safe mode overrides everything, then a detected
    HTTP(S) proxy caps the value, otherwise the default applies.
    """
    if safe_mode:
        return SAFE_MODE_CONCURRENCY
    proxy_active = _http_proxy_configured()
    return PROXY_S3_CONCURRENCY if proxy_active else DEFAULT_S3_CONCURRENCY


class FilesService:
"""Service for all file-related operations."""
Expand All @@ -69,8 +88,13 @@ async def factory(cls, config: CommonConfig) -> AsyncGenerator[FilesService, Non
async with DeepsetCloudAPI.factory(config) as deepset_cloud_api:
files_api = FilesAPI(deepset_cloud_api)
upload_sessions_api = UploadSessionsAPI(deepset_cloud_api)
concurrency = SAFE_MODE_CONCURRENCY if config.safe_mode else DEFAULT_S3_CONCURRENCY
concurrency = _resolve_s3_concurrency(config.safe_mode)
max_attempts = SAFE_MODE_MAX_ATTEMPTS if config.safe_mode else DEFAULT_MAX_ATTEMPTS
if not config.safe_mode and _http_proxy_configured():
logger.info(
"HTTP(S) proxy detected: using reduced S3 upload concurrency.",
concurrency=concurrency,
)
async with S3(concurrency=concurrency, max_attempts=max_attempts) as s3:
yield cls(upload_sessions_api, files_api, s3)

Expand Down
42 changes: 41 additions & 1 deletion tests/unit/service/test_files_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3UploadResult, S3UploadSummary
from deepset_cloud_sdk._service.files_service import FilesService
from deepset_cloud_sdk._service.files_service import (
DEFAULT_S3_CONCURRENCY,
PROXY_S3_CONCURRENCY,
SAFE_MODE_CONCURRENCY,
FilesService,
_http_proxy_configured,
_resolve_s3_concurrency,
)
from deepset_cloud_sdk.models import DeepsetCloudFile, UserInfo


Expand Down Expand Up @@ -1117,3 +1124,36 @@ def test_directories_excluded_from_path_non_recursive(self) -> None:
paths = [Path("tests/data/upload_folder_nested")]
file_paths = FilesService._get_file_paths(paths=paths, recursive=False)
assert file_paths == [Path("tests/data/upload_folder_nested/example.txt")]


class TestResolveS3Concurrency:
    """Tests for proxy-aware resolution of the S3 upload concurrency."""

    @staticmethod
    def _clear_proxy_env(monkeypatch: MonkeyPatch) -> None:
        """Remove every proxy-related variable so each test starts clean."""
        proxy_keys = (
            "HTTP_PROXY",
            "HTTPS_PROXY",
            "http_proxy",
            "https_proxy",
            "ALL_PROXY",
            "all_proxy",
        )
        for env_key in proxy_keys:
            monkeypatch.delenv(env_key, raising=False)

    def test_safe_mode_ignores_proxy(self, monkeypatch: MonkeyPatch) -> None:
        """Safe mode wins even when a proxy is configured."""
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:8080")
        assert _resolve_s3_concurrency(safe_mode=True) == SAFE_MODE_CONCURRENCY

    def test_proxy_lowers_concurrency(self, monkeypatch: MonkeyPatch) -> None:
        """A configured proxy caps concurrency below the default."""
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("HTTPS_PROXY", "http://proxy.example:8080")
        assert _resolve_s3_concurrency(safe_mode=False) == PROXY_S3_CONCURRENCY

    def test_no_proxy_uses_default(self, monkeypatch: MonkeyPatch) -> None:
        """Without any proxy variables the default concurrency applies."""
        self._clear_proxy_env(monkeypatch)
        assert _resolve_s3_concurrency(safe_mode=False) == DEFAULT_S3_CONCURRENCY

    def test_http_proxy_configured_empty_string(self, monkeypatch: MonkeyPatch) -> None:
        """A whitespace-only value does not count as a configured proxy."""
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("HTTP_PROXY", " ")
        assert _http_proxy_configured() is False
Loading