diff --git a/deepset_cloud_sdk/_s3/upload.py b/deepset_cloud_sdk/_s3/upload.py index 96580074..1b27681c 100644 --- a/deepset_cloud_sdk/_s3/upload.py +++ b/deepset_cloud_sdk/_s3/upload.py @@ -7,7 +7,7 @@ from http import HTTPStatus from pathlib import Path from types import TracebackType -from typing import Any, Coroutine, List, Optional, Sequence, Type, Union +from typing import Any, Coroutine, Dict, List, Optional, Sequence, Type, Union import aiofiles import aiohttp @@ -94,7 +94,16 @@ def __init__( """ self.connector = aiohttp.TCPConnector(limit=concurrency) self.semaphore = asyncio.BoundedSemaphore(concurrency) - self.limiter = Limiter(rate_limit, raise_when_fail=False, max_delay=Duration.SECOND * 1) + + try: + # pyrate-limiter 3.x + self.limiter = Limiter(rate_limit, raise_when_fail=False, max_delay=Duration.SECOND * 1) + self._try_acquire_kwargs: Dict[str, Any] = {} + except TypeError: + # pyrate-limiter 4.0.0+ removed raise_when_fail and max_delay + self.limiter = Limiter(rate_limit) + self._try_acquire_kwargs = {"blocking": False} + self.max_attempts = max_attempts async def __aenter__(self) -> "S3": @@ -110,15 +119,15 @@ async def __aexit__( """Exit the context manager.""" await self.connector.close() - # Handle limiter cleanup based on available methods - # Support both older and newer versions of pyrate_limiter - # In version 3.7.0, the dispose method was added to the Limiter class - # See diff here: https://github.com/vutran1710/PyrateLimiter/compare/v3.6.2...master try: list(map(self.limiter.dispose, self.limiter.buckets())) except AttributeError: pass + def _rate_limit_acquire(self) -> None: + """Acquire a rate limit token with backward compat for pyrate-limiter 3.x and 4.x.""" + self.limiter.try_acquire("", **self._try_acquire_kwargs) + async def _upload_file_with_retries( self, file_name: str, @@ -158,7 +167,8 @@ async def _upload_file( file_data = self._build_file_data(content, aws_safe_name, aws_config) try: - self.limiter.try_acquire("") # rate limit requests + self._rate_limit_acquire() + async with client_session.post( aws_config.url, data=file_data, @@ -172,7 +182,8 @@ async def _upload_file( # for example during automatic redirects. See https://github.com/aio-libs/aiohttp/issues/5577 redirect_url = response.headers["Location"] file_data = self._build_file_data(content, aws_safe_name, aws_config) - self.limiter.try_acquire("") # rate limit requests + self._rate_limit_acquire() + async with client_session.post( redirect_url, json=file_data,