diff --git a/deepset_cloud_sdk/_s3/upload.py b/deepset_cloud_sdk/_s3/upload.py index 123b4abe..d047b277 100644 --- a/deepset_cloud_sdk/_s3/upload.py +++ b/deepset_cloud_sdk/_s3/upload.py @@ -30,7 +30,12 @@ class RetryableHttpError(Exception): """An error that indicates a function should be retried.""" def __init__( - self, error: Union[aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, aiohttp.ClientConnectionError] + self, + error: Union[ + aiohttp.ClientResponseError, + aiohttp.ServerDisconnectedError, + aiohttp.ClientConnectionError, + ], ) -> None: """Store the original exception.""" self.error = error @@ -76,7 +81,12 @@ def make_safe_file_name(file_name: str) -> str: class S3: """Client for S3 operations related to deepset Cloud uploads.""" - def __init__(self, concurrency: int = 120, rate_limit: Rate = Rate(3000, Duration.SECOND), max_attempts: int = 5): + def __init__( + self, + concurrency: int = 120, + rate_limit: Rate = Rate(3000, Duration.SECOND), + max_attempts: int = 5, + ): """ Initialize the client. @@ -99,8 +109,15 @@ async def __aexit__( ) -> None: """Exit the context manager.""" await self.connector.close() - for bucket in self.limiter.buckets(): - self.limiter.dispose(bucket) + + # Handle limiter cleanup based on available methods + # Support both older and newer versions of pyrate_limiter + # In version 3.7.0, the dispose method was added to the Limiter class + # See diff here: https://github.com/vutran1710/PyrateLimiter/compare/v3.6.2...master + try: + list(map(self.limiter.dispose, self.limiter.buckets())) + except AttributeError: + pass async def _upload_file_with_retries( self, @@ -257,7 +274,9 @@ async def upload_from_memory( return S3UploadResult(file_name=file_name, success=False, exception=exception) async def _process_results( - self, tasks: List[Coroutine[Any, Any, S3UploadResult]], show_progress: bool = True + self, + tasks: List[Coroutine[Any, Any, S3UploadResult]], + show_progress: bool = True, ) -> S3UploadSummary: """Summarize the results of the uploads to S3. @@ -297,7 +316,10 @@ async def _process_results( return result_summary async def upload_files_from_paths( - self, upload_session: UploadSession, file_paths: List[Path], show_progress: bool = True + self, + upload_session: UploadSession, + file_paths: List[Path], + show_progress: bool = True, ) -> S3UploadSummary: """Upload a set of files to the prefixed S3 namespace given a list of paths. @@ -316,7 +338,10 @@ async def upload_files_from_paths( return result_summary async def upload_in_memory( - self, upload_session: UploadSession, files: Sequence[DeepsetCloudFileBase], show_progress: bool = True + self, + upload_session: UploadSession, + files: Sequence[DeepsetCloudFileBase], + show_progress: bool = True, ) -> S3UploadSummary: """Upload a set of files to the prefixed S3 namespace given a list of paths. @@ -326,7 +351,8 @@ async def upload_in_memory( :return: S3UploadSummary object. """ async with aiohttp.ClientSession( - connector=self.connector, timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT) + connector=self.connector, + timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT), ) as client_session: tasks = [] @@ -339,7 +365,12 @@ async def upload_in_memory( if file.meta is not None: meta_name = f"{file_name}.meta.json" tasks.append( - self.upload_from_memory(meta_name, upload_session, file.meta_as_string(), client_session) + self.upload_from_memory( + meta_name, + upload_session, + file.meta_as_string(), + client_session, + ) ) result_summary = await self._process_results(tasks, show_progress=show_progress) diff --git a/pyproject.toml b/pyproject.toml index f8ad19d0..99b1682b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "tabulate>=0.9.0", "tqdm>=4.66.4", "yaspin>=3.0.0", - "pyrate-limiter>=3.6.0", + "pyrate-limiter>=3.7.0", ] [project.urls]