Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from http import HTTPStatus
from pathlib import Path
from types import TracebackType
from typing import Any, Coroutine, List, Optional, Sequence, Type, Union
from typing import Any, Coroutine, Dict, List, Optional, Sequence, Type, Union

import aiofiles
import aiohttp
Expand Down Expand Up @@ -94,7 +94,16 @@ def __init__(
"""
self.connector = aiohttp.TCPConnector(limit=concurrency)
self.semaphore = asyncio.BoundedSemaphore(concurrency)
self.limiter = Limiter(rate_limit, raise_when_fail=False, max_delay=Duration.SECOND * 1)

try:
# pyrate-limiter 3.x
self.limiter = Limiter(rate_limit, raise_when_fail=False, max_delay=Duration.SECOND * 1)
self._try_acquire_kwargs: Dict[str, Any] = {}
except TypeError:
# pyrate-limiter 4.0.0+ removed raise_when_fail and max_delay
self.limiter = Limiter(rate_limit)
self._try_acquire_kwargs = {"blocking": False}

self.max_attempts = max_attempts

async def __aenter__(self) -> "S3":
Expand All @@ -110,15 +119,15 @@ async def __aexit__(
"""Exit the context manager."""
await self.connector.close()

# Handle limiter cleanup based on available methods
# Support both older and newer versions of pyrate_limiter
# In version 3.7.0, the dispose method was added to the Limiter class
# See diff here: https://github.com/vutran1710/PyrateLimiter/compare/v3.6.2...master
try:
list(map(self.limiter.dispose, self.limiter.buckets()))
except AttributeError:
pass

def _rate_limit_acquire(self) -> None:
"""Acquire a rate limit token with backward compat for pyrate-limiter 3.x and 4.x."""
self.limiter.try_acquire("", **self._try_acquire_kwargs)

async def _upload_file_with_retries(
self,
file_name: str,
Expand Down Expand Up @@ -158,7 +167,8 @@ async def _upload_file(
file_data = self._build_file_data(content, aws_safe_name, aws_config)

try:
self.limiter.try_acquire("") # rate limit requests
self._rate_limit_acquire()

async with client_session.post(
aws_config.url,
data=file_data,
Expand All @@ -172,7 +182,8 @@ async def _upload_file(
# for example during automatic redirects. See https://github.com/aio-libs/aiohttp/issues/5577
redirect_url = response.headers["Location"]
file_data = self._build_file_data(content, aws_safe_name, aws_config)
self.limiter.try_acquire("") # rate limit requests
self._rate_limit_acquire()

async with client_session.post(
redirect_url,
json=file_data,
Expand Down
Loading