Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ class RetryableHttpError(Exception):
"""An error that indicates a function should be retried."""

def __init__(
self, error: Union[aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, aiohttp.ClientConnectionError]
self,
error: Union[
aiohttp.ClientResponseError,
aiohttp.ServerDisconnectedError,
aiohttp.ClientConnectionError,
],
Comment thread
abrahamy marked this conversation as resolved.
) -> None:
"""Store the original exception."""
self.error = error
Expand Down Expand Up @@ -76,7 +81,12 @@ def make_safe_file_name(file_name: str) -> str:
class S3:
"""Client for S3 operations related to deepset Cloud uploads."""

def __init__(self, concurrency: int = 120, rate_limit: Rate = Rate(3000, Duration.SECOND), max_attempts: int = 5):
def __init__(
self,
concurrency: int = 120,
rate_limit: Rate = Rate(3000, Duration.SECOND),
max_attempts: int = 5,
):
"""
Initialize the client.

Expand All @@ -99,8 +109,15 @@ async def __aexit__(
) -> None:
"""Exit the context manager."""
await self.connector.close()
for bucket in self.limiter.buckets():
self.limiter.dispose(bucket)

# Handle limiter cleanup based on available methods
# Support both older and newer versions of pyrate_limiter
# In version 3.7.0, the dispose method was added to the Limiter class
# See diff here: https://github.com/vutran1710/PyrateLimiter/compare/v3.6.2...master
try:
list(map(self.limiter.dispose, self.limiter.buckets()))
except AttributeError:
pass

async def _upload_file_with_retries(
self,
Expand Down Expand Up @@ -257,7 +274,9 @@ async def upload_from_memory(
return S3UploadResult(file_name=file_name, success=False, exception=exception)

async def _process_results(
self, tasks: List[Coroutine[Any, Any, S3UploadResult]], show_progress: bool = True
self,
tasks: List[Coroutine[Any, Any, S3UploadResult]],
show_progress: bool = True,
) -> S3UploadSummary:
"""Summarize the results of the uploads to S3.

Expand Down Expand Up @@ -297,7 +316,10 @@ async def _process_results(
return result_summary

async def upload_files_from_paths(
self, upload_session: UploadSession, file_paths: List[Path], show_progress: bool = True
self,
upload_session: UploadSession,
file_paths: List[Path],
show_progress: bool = True,
) -> S3UploadSummary:
"""Upload a set of files to the prefixed S3 namespace given a list of paths.

Expand All @@ -316,7 +338,10 @@ async def upload_files_from_paths(
return result_summary

async def upload_in_memory(
self, upload_session: UploadSession, files: Sequence[DeepsetCloudFileBase], show_progress: bool = True
self,
upload_session: UploadSession,
files: Sequence[DeepsetCloudFileBase],
show_progress: bool = True,
) -> S3UploadSummary:
"""Upload a set of files to the prefixed S3 namespace given a list of paths.

Expand All @@ -326,7 +351,8 @@ async def upload_in_memory(
:return: S3UploadSummary object.
"""
async with aiohttp.ClientSession(
connector=self.connector, timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT)
connector=self.connector,
timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT),
) as client_session:
tasks = []

Expand All @@ -339,7 +365,12 @@ async def upload_in_memory(
if file.meta is not None:
meta_name = f"{file_name}.meta.json"
tasks.append(
self.upload_from_memory(meta_name, upload_session, file.meta_as_string(), client_session)
self.upload_from_memory(
meta_name,
upload_session,
file.meta_as_string(),
client_session,
)
)

result_summary = await self._process_results(tasks, show_progress=show_progress)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"tabulate>=0.9.0",
"tqdm>=4.66.4",
"yaspin>=3.0.0",
"pyrate-limiter>=3.6.0",
"pyrate-limiter>=3.7.0",
]

[project.urls]
Expand Down