Skip to content

Commit 0a19c55

Browse files
authored
feat: add a new S3Downloader component (#2192)
* Add s3downloader component * Small updates * Add test files * Update tests * Update component * Restructure * Update methods * use download_file instead of get_object * Update the component * Fix linting * Add an example * Add an example * Remove ssl param * remove sources * Add storage and settings * Remove s3_key * Add utils file * PR comments * Update S3Storage * Add tests * Update dependency * Update tests * Add errors file * Add a pipeline example * Fix docstrings * Update tests * PR comments * Add py.types * Update Read Me * PR comments * Add a fix for credentials error * Fix typing errors * Fix typing errors * Fix all errors * Update workflow * Update tests * More errors fix
1 parent 10c6248 commit 0a19c55

14 files changed

Lines changed: 724 additions & 3 deletions

File tree

.github/workflows/amazon_bedrock.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ permissions:
2626
env:
2727
PYTHONUNBUFFERED: "1"
2828
FORCE_COLOR: "1"
29+
2930
AWS_REGION: "us-east-1"
3031
AWS_BEDROCK_GUARDRAIL_ID: ${{ secrets.AWS_BEDROCK_GUARDRAIL_ID }}
3132
AWS_BEDROCK_GUARDRAIL_VERSION: "1"
33+
S3_DOWNLOADER_BUCKET: ${{ secrets.S3_DOWNLOADER_BUCKET }}
34+
3235

3336
jobs:
3437
run:

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Please check out our [Contribution Guidelines](CONTRIBUTING.md) for all the deta
2525

2626
| Package | Type | PyPi Package | Status |
2727
|----------------------------------------------------------------------------------------------------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
28-
| [amazon-bedrock-haystack](integrations/amazon_bedrock/) | Embedder, Generator, Ranker | [![PyPI - Version](https://img.shields.io/pypi/v/amazon-bedrock-haystack.svg)](https://pypi.org/project/amazon-bedrock-haystack) | [![Test / amazon_bedrock](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_bedrock.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_bedrock.yml) |
28+
| [amazon-bedrock-haystack](integrations/amazon_bedrock/) | Embedder, Generator, Ranker, Downloader | [![PyPI - Version](https://img.shields.io/pypi/v/amazon-bedrock-haystack.svg)](https://pypi.org/project/amazon-bedrock-haystack) | [![Test / amazon_bedrock](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_bedrock.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_bedrock.yml) |
2929
| [amazon-sagemaker-haystack](integrations/amazon_sagemaker/) | Generator | [![PyPI - Version](https://img.shields.io/pypi/v/amazon-sagemaker-haystack.svg)](https://pypi.org/project/amazon-sagemaker-haystack) | [![Test / amazon_sagemaker](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_sagemaker.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/amazon_sagemaker.yml) |
3030
| [anthropic-haystack](integrations/anthropic/) | Generator | [![PyPI - Version](https://img.shields.io/pypi/v/anthropic-haystack.svg)](https://pypi.org/project/anthropic-haystack) | [![Test / anthropic](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/anthropic.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/anthropic.yml) |
3131
| [astra-haystack](integrations/astra/) | Document Store | [![PyPI - Version](https://img.shields.io/pypi/v/astra-haystack.svg)](https://pypi.org/project/astra-haystack) | [![Test / astra](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/astra.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/astra.yml) |
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# To run this example, you will need to
2+
# 1) set `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_DEFAULT_REGION` environment variables
3+
# 2) enabled access to the selected S3 bucket
4+
# 3) `S3_DOWNLOADER_BUCKET` environment variable should be set to the name of the S3 bucket.
5+
6+
# The example shows how to use the S3Downloader component in a query pipeline to download files from an S3 bucket.
7+
# To run this example, set the file_name in docs to your files in the S3 bucket.
8+
# The files are then downloaded, converted to images and used to answer a question.
9+
10+
11+
from pathlib import Path
12+
from uuid import uuid4
13+
14+
from haystack import Pipeline
15+
from haystack.components.builders import ChatPromptBuilder
16+
from haystack.components.converters.image import DocumentToImageContent
17+
from haystack.components.routers import DocumentTypeRouter
18+
from haystack.dataclasses import Document
19+
20+
from haystack_integrations.components.downloaders.s3.s3_downloader import S3Downloader
21+
from haystack_integrations.components.generators.amazon_bedrock import AmazonBedrockChatGenerator
22+
23+
docs = [
24+
Document(meta={"file_id": str(uuid4()), "file_name": "text-sample.txt"}),
25+
Document(meta={"file_id": str(uuid4()), "file_name": "document-sample.pdf", "page_number": 1}),
26+
]
27+
28+
chat_prompt_builder = ChatPromptBuilder(
29+
required_variables=["question"],
30+
template="""{% message role="system" %}
31+
You are a friendly assistant that answers questions based on provided documents and images.
32+
{% endmessage %}
33+
34+
{%- message role="user" -%}
35+
Only provide an answer to the question using the images and text passages provided.
36+
37+
These are the text-only documents:
38+
{%- if documents|length > 0 %}
39+
{%- for doc in documents %}
40+
Text Document [{{ loop.index }}] :
41+
{{ doc.content }}
42+
{% endfor -%}
43+
{%- else %}
44+
No relevant text documents were found.
45+
{% endif %}
46+
End of text documents.
47+
48+
Question: {{ question }}
49+
Answer:
50+
51+
Images:
52+
{%- if image_contents|length > 0 %}
53+
{%- for img in image_contents -%}
54+
{{ img | templatize_part }}
55+
{%- endfor -%}
56+
{% endif %}
57+
{%- endmessage -%}
58+
""",
59+
)
60+
61+
pipe = Pipeline()
62+
pipe.add_component(
63+
"s3_downloader", S3Downloader(file_root_path=str(Path.cwd() / "s3_downloads"), file_extensions=[".pdf"])
64+
)
65+
pipe.add_component(
66+
"doc_type_router", DocumentTypeRouter(file_path_meta_field="file_path", mime_types=["application/pdf"])
67+
)
68+
pipe.add_component("doc_to_image", DocumentToImageContent(detail="auto"))
69+
pipe.add_component("chat_prompt_builder", chat_prompt_builder)
70+
pipe.add_component("llm", AmazonBedrockChatGenerator(model="anthropic.claude-3-haiku-20240307-v1:0"))
71+
72+
pipe.connect("s3_downloader.documents", "doc_type_router.documents")
73+
pipe.connect("doc_type_router.application/pdf", "doc_to_image.documents")
74+
pipe.connect("doc_to_image.image_contents", "chat_prompt_builder.image_contents")
75+
pipe.connect("s3_downloader.documents", "chat_prompt_builder.documents")
76+
pipe.connect("chat_prompt_builder.prompt", "llm.messages")
77+
78+
result = pipe.run(
79+
data={
80+
"s3_downloader": {"documents": docs},
81+
"chat_prompt_builder": {"question": "What is the main topic of the document?"},
82+
}
83+
)

integrations/amazon_bedrock/pydoc/config.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ loaders:
1111
"haystack_integrations.common.amazon_bedrock.errors",
1212
"haystack_integrations.components.generators.amazon_bedrock.chat.chat_generator",
1313
"haystack_integrations.components.rankers.amazon_bedrock.ranker",
14+
"haystack_integrations.components.downloaders.s3.s3_downloader",
15+
"haystack_integrations.common.s3.utils",
16+
"haystack_integrations.common.s3.errors",
1417
]
1518
ignore_when_discovered: ["__init__"]
1619
processors:

integrations/amazon_bedrock/pyproject.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
55
[project]
66
name = "amazon-bedrock-haystack"
77
dynamic = ["version"]
8-
description = 'An integration of Amazon Bedrock as an AmazonBedrockGenerator component.'
8+
description = 'An integration of AWS S3 and Bedrock as a Downloader and Generator components.'
99
readme = "README.md"
1010
requires-python = ">=3.9"
1111
license = "Apache-2.0"
@@ -71,7 +71,10 @@ cov-retry = 'all --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x'
7171
types = """mypy -p haystack_integrations.common.amazon_bedrock \
7272
-p haystack_integrations.components.embedders.amazon_bedrock \
7373
-p haystack_integrations.components.generators.amazon_bedrock \
74-
-p haystack_integrations.components.rankers.amazon_bedrock {args}"""
74+
-p haystack_integrations.components.rankers.amazon_bedrock \
75+
-p haystack_integrations.components.downloaders.s3 \
76+
-p haystack_integrations.common.s3 {args}"""
77+
7578

7679
[tool.mypy]
7780
install_types = true

integrations/amazon_bedrock/src/haystack_integrations/common/amazon_bedrock/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
16
class AmazonBedrockError(Exception):
27
"""
38
Any error generated by the Amazon Bedrock integration.

integrations/amazon_bedrock/src/haystack_integrations/common/amazon_bedrock/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
from typing import Any, Optional, Union
26

37
import aioboto3
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
6+
class S3Error(Exception):
7+
"""Exception for issues that occur in the S3 based components"""
8+
9+
10+
class S3ConfigurationError(S3Error):
11+
"""Exception raised when AmazonS3 node is not configured correctly"""
12+
13+
14+
class S3StorageError(S3Error):
15+
"""This exception is raised when an error occurs while interacting with a S3Storage object."""
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import os
6+
from dataclasses import dataclass
7+
from http import HTTPStatus
8+
from pathlib import Path
9+
from typing import Optional
10+
11+
from boto3.session import Session
12+
from botocore.config import Config
13+
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
14+
15+
from haystack_integrations.common.s3.errors import S3ConfigurationError, S3StorageError
16+
17+
18+
@dataclass
19+
class S3Storage:
20+
"""This class provides a storage class for downloading files from an AWS S3 bucket."""
21+
22+
def __init__(
23+
self,
24+
s3_bucket: str,
25+
session: Session,
26+
s3_prefix: Optional[str] = None,
27+
endpoint_url: Optional[str] = None,
28+
config: Optional[Config] = None,
29+
) -> None:
30+
"""
31+
Initializes the S3Storage object with the provided parameters.
32+
33+
:param s3_bucket: The name of the S3 bucket to download files from.
34+
:param session: The session to use for the S3 client.
35+
:param s3_prefix: The optional prefix of the files in the S3 bucket.
36+
Can be used to specify folder or naming structure.
37+
For example, if the file is in the folder "folder/subfolder/file.txt",
38+
the s3_prefix should be "folder/subfolder/". If the file is in the root of the S3 bucket,
39+
the s3_prefix should be None.
40+
:param endpoint_url: The endpoint URL of the S3 bucket to download files from.
41+
:param config: The configuration to use for the S3 client.
42+
"""
43+
44+
self.s3_bucket = s3_bucket
45+
self.s3_prefix = s3_prefix
46+
self.endpoint_url = endpoint_url
47+
self.session = session
48+
self.config = config
49+
50+
try:
51+
self._client = self.session.client("s3", endpoint_url=self.endpoint_url, config=self.config)
52+
except Exception as e:
53+
msg = f"Failed to create S3 session client: {e}"
54+
raise S3ConfigurationError(msg) from e
55+
56+
def download(self, key: str, local_file_path: Path) -> None:
57+
"""Download a file from S3.
58+
59+
:param key: The key of the file to download.
60+
:param local_file_path: The folder path to download the file to.
61+
It will be created if it does not exist. The file will be downloaded to
62+
the folder with the same name as the key.
63+
:raises S3ConfigurationError: If the S3 session client cannot be created.
64+
:raises S3StorageError: If the file does not exist in the S3 bucket
65+
or the file cannot be downloaded.
66+
"""
67+
68+
if self.s3_prefix:
69+
s3_key = f"{self.s3_prefix}{key}"
70+
else:
71+
s3_key = key
72+
73+
try:
74+
self._client.download_file(self.s3_bucket, s3_key, str(local_file_path))
75+
76+
except (NoCredentialsError, PartialCredentialsError) as e:
77+
msg = (
78+
f"Missing AWS credentials. Please check your AWS credentials (access key, secret key, region)."
79+
f"Error: {e}"
80+
)
81+
raise S3ConfigurationError(msg) from e
82+
83+
except ClientError as e:
84+
error_code = int(e.response["Error"]["Code"])
85+
86+
if error_code == HTTPStatus.FORBIDDEN:
87+
msg = (
88+
f"Failed to access S3 bucket {self.s3_bucket!r}. "
89+
f"Please check your AWS credentials (access key, secret key, region) and ensure "
90+
f"they have the necessary S3 permissions. "
91+
f"Error: {e}"
92+
)
93+
raise S3ConfigurationError(msg) from e
94+
95+
elif error_code == HTTPStatus.NOT_FOUND:
96+
msg = f"The object {s3_key!r} does not exist in the S3 bucket {self.s3_bucket!r}. \n Error: {e}"
97+
raise S3StorageError(msg) from e
98+
else:
99+
msg = f"Failed to download file {s3_key!r} from S3. Error: {e}"
100+
raise S3StorageError(msg) from e
101+
102+
@classmethod
103+
def from_env(cls, *, session: Session, config: Config) -> "S3Storage":
104+
"""Create a S3Storage object from environment variables."""
105+
s3_bucket = os.getenv("S3_DOWNLOADER_BUCKET")
106+
if not s3_bucket:
107+
msg = (
108+
"Missing environment variable S3_DOWNLOADER_BUCKET."
109+
"Please set it to the name of the S3 bucket to download files from."
110+
)
111+
raise ValueError(msg)
112+
s3_prefix = os.getenv("S3_DOWNLOADER_PREFIX") or None
113+
endpoint_url = os.getenv("AWS_ENDPOINT_URL") or None
114+
return cls(
115+
s3_bucket=s3_bucket,
116+
s3_prefix=s3_prefix,
117+
endpoint_url=endpoint_url,
118+
session=session,
119+
config=config,
120+
)

0 commit comments

Comments
 (0)