Skip to content

Commit 66bd94f

Browse files
feat(supabase): add SupabaseBucketDownloader component (#3250)
Co-authored-by: David S. Batista <dsbatista@gmail.com>
1 parent ae36114 commit 66bd94f

6 files changed

Lines changed: 350 additions & 5 deletions

File tree

integrations/supabase/pydoc/config_docusaurus.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ loaders:
33
- haystack_integrations.document_stores.supabase.document_store
44
- haystack_integrations.components.retrievers.supabase.embedding_retriever
55
- haystack_integrations.components.retrievers.supabase.keyword_retriever
6+
- haystack_integrations.components.downloaders.supabase.supabase_bucket_downloader
67
search_path: [../src]
78
processors:
89
- type: filter

integrations/supabase/pyproject.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ classifiers = [
2323
"Programming Language :: Python :: Implementation :: CPython",
2424
"Programming Language :: Python :: Implementation :: PyPy",
2525
]
26-
dependencies = ["haystack-ai>=2.26.1", "pgvector-haystack>=6.3.0"]
26+
dependencies = ["haystack-ai>=2.26.1", "pgvector-haystack>=6.3.0", "supabase>=2.9.0"]
2727

2828
[project.urls]
2929
Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/supabase#readme"
@@ -64,9 +64,9 @@ dependencies = [
6464
unit = 'pytest -m "not integration" {args:tests}'
6565
integration = 'pytest -m "integration" {args:tests}'
6666
all = 'pytest {args:tests}'
67-
unit-cov-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}'
68-
integration-cov-append-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}'
69-
types = "mypy -p haystack_integrations.document_stores.supabase -p haystack_integrations.components.retrievers.supabase {args}"
67+
unit-cov-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov=haystack_integrations.components.downloaders.supabase --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}'
68+
integration-cov-append-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov=haystack_integrations.components.downloaders.supabase --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}'
69+
types = "mypy -p haystack_integrations.document_stores.supabase -p haystack_integrations.components.retrievers.supabase -p haystack_integrations.components.downloaders.supabase {args}"
7070

7171
[tool.mypy]
7272
install_types = true
@@ -142,7 +142,7 @@ ban-relative-imports = "parents"
142142
"tests/**/*" = ["PLR2004", "S101", "TID252", "D", "ANN"]
143143

144144
[tool.coverage.run]
145-
source = ["haystack_integrations.document_stores.supabase", "haystack_integrations.components.retrievers.supabase"]
145+
source = ["haystack_integrations.document_stores.supabase", "haystack_integrations.components.retrievers.supabase", "haystack_integrations.components.downloaders.supabase"]
146146
branch = true
147147
parallel = false
148148
relative_files = true

integrations/supabase/src/haystack_integrations/components/downloaders/py.typed

Whitespace-only changes.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
from haystack_integrations.components.downloaders.supabase.supabase_bucket_downloader import SupabaseBucketDownloader
6+
7+
__all__ = ["SupabaseBucketDownloader"]
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import mimetypes
6+
from pathlib import Path
7+
from typing import Any, cast
8+
9+
from haystack import component, default_from_dict, default_to_dict, logging
10+
from haystack.dataclasses import ByteStream
11+
from haystack.utils.auth import Secret, deserialize_secrets_inplace
12+
13+
from supabase import Client, create_client
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
@component
19+
class SupabaseBucketDownloader:
20+
"""
21+
Downloads files from a Supabase Storage bucket and returns them as ByteStream objects.
22+
23+
Files are downloaded in-memory and returned as `ByteStream` objects ready for further
24+
processing in indexing pipelines (e.g. passing to a `DocumentConverter`).
25+
26+
Example usage:
27+
28+
```python
29+
from haystack_integrations.components.downloaders.supabase import SupabaseBucketDownloader
30+
from haystack.utils import Secret
31+
32+
downloader = SupabaseBucketDownloader(
33+
supabase_url="https://<project-ref>.supabase.co",
34+
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
35+
bucket_name="my-documents",
36+
)
37+
result = downloader.run(sources=["reports/report.pdf", "data/notes.txt"])
38+
streams = result["streams"]
39+
```
40+
"""
41+
42+
def __init__(
43+
self,
44+
*,
45+
supabase_url: str,
46+
supabase_key: Secret = Secret.from_env_var("SUPABASE_SERVICE_KEY"),
47+
bucket_name: str,
48+
file_extensions: list[str] | None = None,
49+
) -> None:
50+
"""
51+
Creates a new SupabaseBucketDownloader instance.
52+
53+
:param supabase_url: The URL of your Supabase project, e.g. `https://<project-ref>.supabase.co`.
54+
:param supabase_key: The Supabase API key used to authenticate requests. Defaults to the
55+
`SUPABASE_SERVICE_KEY` environment variable. Use the service role key for private buckets.
56+
:param bucket_name: The name of the Supabase Storage bucket to download files from.
57+
:param file_extensions: Optional list of file extensions to filter downloads (e.g. `[".pdf", ".txt"]`).
58+
If `None`, all files are downloaded. Extensions are matched case-insensitively.
59+
"""
60+
self.supabase_url = supabase_url
61+
self.supabase_key = supabase_key
62+
self.bucket_name = bucket_name
63+
self.file_extensions = [e.lower() for e in file_extensions] if file_extensions else None
64+
self._client: Client | None = None
65+
66+
def warm_up(self) -> None:
67+
"""
68+
Initializes the Supabase client.
69+
70+
Called automatically on the first run(), or can be called explicitly in a pipeline.
71+
"""
72+
if self._client is None:
73+
key = self.supabase_key.resolve_value()
74+
if not key:
75+
msg = "Supabase API key could not be resolved. Set the SUPABASE_SERVICE_KEY environment variable."
76+
raise ValueError(msg)
77+
self._client = create_client(self.supabase_url, key)
78+
79+
@component.output_types(streams=list[ByteStream])
80+
def run(self, sources: list[str]) -> dict[str, list[ByteStream]]:
81+
"""
82+
Downloads files from the Supabase Storage bucket.
83+
84+
:param sources: List of file paths within the bucket to download,
85+
e.g. `["folder/file.pdf", "notes.txt"]`.
86+
:returns: A dictionary with:
87+
- `streams`: list of `ByteStream` objects, one per successfully downloaded file.
88+
Each `ByteStream` has `meta["file_path"]` and `meta["bucket_name"]` set.
89+
"""
90+
if self._client is None:
91+
self.warm_up()
92+
client = cast(Client, self._client)
93+
streams = []
94+
95+
for path in sources:
96+
if self.file_extensions is not None:
97+
ext = Path(path).suffix.lower()
98+
if ext not in self.file_extensions:
99+
logger.debug("Skipping {path} — extension not in filter list.", path=path)
100+
continue
101+
102+
try:
103+
data = client.storage.from_(self.bucket_name).download(path)
104+
except Exception as e:
105+
logger.warning(
106+
"Failed to download {path} from bucket {bucket}: {error}",
107+
path=path,
108+
bucket=self.bucket_name,
109+
error=e,
110+
)
111+
continue
112+
113+
mime_type = mimetypes.guess_type(path)[0] or "application/octet-stream"
114+
streams.append(
115+
ByteStream(
116+
data=data,
117+
meta={"file_path": path, "bucket_name": self.bucket_name},
118+
mime_type=mime_type,
119+
)
120+
)
121+
122+
return {"streams": streams}
123+
124+
def to_dict(self) -> dict[str, Any]:
125+
"""
126+
Serializes the component to a dictionary.
127+
128+
:returns: Dictionary with serialized data.
129+
"""
130+
return default_to_dict(
131+
self,
132+
supabase_url=self.supabase_url,
133+
supabase_key=self.supabase_key.to_dict(),
134+
bucket_name=self.bucket_name,
135+
file_extensions=self.file_extensions,
136+
)
137+
138+
@classmethod
139+
def from_dict(cls, data: dict[str, Any]) -> "SupabaseBucketDownloader":
140+
"""
141+
Deserializes the component from a dictionary.
142+
143+
:param data: Dictionary to deserialize from.
144+
:returns: Deserialized component.
145+
"""
146+
deserialize_secrets_inplace(data["init_parameters"], ["supabase_key"])
147+
return default_from_dict(cls, data)
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import os
6+
from unittest.mock import MagicMock, patch
7+
8+
import pytest
9+
from haystack.core.serialization import component_from_dict, component_to_dict
10+
from haystack.dataclasses import ByteStream
11+
from haystack.utils import Secret
12+
13+
from haystack_integrations.components.downloaders.supabase import SupabaseBucketDownloader
14+
15+
_MODULE = "haystack_integrations.components.downloaders.supabase.supabase_bucket_downloader"
16+
PATCH_PATH = f"{_MODULE}.create_client"
17+
COMPONENT_TYPE = f"{_MODULE}.SupabaseBucketDownloader"
18+
19+
20+
class TestSupabaseBucketDownloader:
21+
def test_init_default(self, monkeypatch):
22+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
23+
downloader = SupabaseBucketDownloader(
24+
supabase_url="https://project.supabase.co",
25+
bucket_name="my-bucket",
26+
)
27+
assert downloader.supabase_url == "https://project.supabase.co"
28+
assert downloader.bucket_name == "my-bucket"
29+
assert downloader.file_extensions is None
30+
assert downloader.supabase_key.resolve_value() == "test-key"
31+
32+
def test_init_with_file_extensions(self, monkeypatch):
33+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
34+
downloader = SupabaseBucketDownloader(
35+
supabase_url="https://project.supabase.co",
36+
bucket_name="my-bucket",
37+
file_extensions=[".PDF", ".TXT"],
38+
)
39+
assert downloader.file_extensions == [".pdf", ".txt"]
40+
41+
def test_to_dict(self, monkeypatch):
42+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
43+
downloader = SupabaseBucketDownloader(
44+
supabase_url="https://project.supabase.co",
45+
bucket_name="docs",
46+
file_extensions=[".pdf"],
47+
)
48+
data = component_to_dict(downloader, "SupabaseBucketDownloader")
49+
assert data["type"] == COMPONENT_TYPE
50+
assert data["init_parameters"]["supabase_url"] == "https://project.supabase.co"
51+
assert data["init_parameters"]["bucket_name"] == "docs"
52+
assert data["init_parameters"]["file_extensions"] == [".pdf"]
53+
54+
def test_from_dict(self, monkeypatch):
55+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
56+
data = {
57+
"type": COMPONENT_TYPE,
58+
"init_parameters": {
59+
"supabase_url": "https://project.supabase.co",
60+
"bucket_name": "docs",
61+
"file_extensions": [".pdf", ".txt"],
62+
"supabase_key": {"env_vars": ["SUPABASE_SERVICE_KEY"], "strict": True, "type": "env_var"},
63+
},
64+
}
65+
downloader = component_from_dict(SupabaseBucketDownloader, data, "SupabaseBucketDownloader")
66+
assert downloader.supabase_url == "https://project.supabase.co"
67+
assert downloader.bucket_name == "docs"
68+
assert downloader.file_extensions == [".pdf", ".txt"]
69+
assert downloader.supabase_key.resolve_value() == "test-key"
70+
71+
def test_warm_up(self, monkeypatch):
72+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
73+
downloader = SupabaseBucketDownloader(
74+
supabase_url="https://project.supabase.co",
75+
supabase_key=Secret.from_token("test-key"),
76+
bucket_name="my-bucket",
77+
)
78+
assert downloader._client is None
79+
with patch(PATCH_PATH) as mock_create:
80+
downloader.warm_up()
81+
mock_create.assert_called_once_with("https://project.supabase.co", "test-key")
82+
assert downloader._client is not None
83+
84+
# Second call should not re-create the client
85+
with patch(PATCH_PATH) as mock_create2:
86+
downloader.warm_up()
87+
mock_create2.assert_not_called()
88+
89+
def test_run_returns_bytestreams(self, monkeypatch):
90+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
91+
downloader = SupabaseBucketDownloader(
92+
supabase_url="https://project.supabase.co",
93+
supabase_key=Secret.from_token("test-key"),
94+
bucket_name="my-bucket",
95+
)
96+
mock_bucket = MagicMock()
97+
mock_bucket.download.return_value = b"file content"
98+
99+
with patch(PATCH_PATH) as mock_client:
100+
mock_client.return_value.storage.from_.return_value = mock_bucket
101+
result = downloader.run(sources=["folder/file.pdf"])
102+
103+
assert len(result["streams"]) == 1
104+
assert isinstance(result["streams"][0], ByteStream)
105+
assert result["streams"][0].data == b"file content"
106+
assert result["streams"][0].meta["file_path"] == "folder/file.pdf"
107+
assert result["streams"][0].meta["bucket_name"] == "my-bucket"
108+
assert result["streams"][0].mime_type == "application/pdf"
109+
110+
def test_run_filters_by_extension(self, monkeypatch):
111+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
112+
downloader = SupabaseBucketDownloader(
113+
supabase_url="https://project.supabase.co",
114+
supabase_key=Secret.from_token("test-key"),
115+
bucket_name="my-bucket",
116+
file_extensions=[".pdf"],
117+
)
118+
mock_bucket = MagicMock()
119+
mock_bucket.download.return_value = b"pdf content"
120+
121+
with patch(PATCH_PATH) as mock_client:
122+
mock_client.return_value.storage.from_.return_value = mock_bucket
123+
result = downloader.run(sources=["doc.pdf", "notes.txt", "image.png"])
124+
125+
assert len(result["streams"]) == 1
126+
assert result["streams"][0].meta["file_path"] == "doc.pdf"
127+
assert mock_bucket.download.call_count == 1
128+
129+
def test_run_empty_sources(self, monkeypatch):
130+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
131+
downloader = SupabaseBucketDownloader(
132+
supabase_url="https://project.supabase.co",
133+
supabase_key=Secret.from_token("test-key"),
134+
bucket_name="my-bucket",
135+
)
136+
with patch(PATCH_PATH):
137+
result = downloader.run(sources=[])
138+
139+
assert result["streams"] == []
140+
141+
def test_run_skips_failed_downloads(self, monkeypatch):
142+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
143+
downloader = SupabaseBucketDownloader(
144+
supabase_url="https://project.supabase.co",
145+
supabase_key=Secret.from_token("test-key"),
146+
bucket_name="my-bucket",
147+
)
148+
mock_bucket = MagicMock()
149+
mock_bucket.download.side_effect = [Exception("Not found"), b"good content"]
150+
151+
with patch(PATCH_PATH) as mock_client:
152+
mock_client.return_value.storage.from_.return_value = mock_bucket
153+
result = downloader.run(sources=["missing.pdf", "exists.txt"])
154+
155+
assert len(result["streams"]) == 1
156+
assert result["streams"][0].meta["file_path"] == "exists.txt"
157+
158+
def test_run_sets_mime_type(self, monkeypatch):
159+
monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
160+
downloader = SupabaseBucketDownloader(
161+
supabase_url="https://project.supabase.co",
162+
supabase_key=Secret.from_token("test-key"),
163+
bucket_name="my-bucket",
164+
)
165+
mock_bucket = MagicMock()
166+
mock_bucket.download.return_value = b"data"
167+
168+
with patch(PATCH_PATH) as mock_client:
169+
mock_client.return_value.storage.from_.return_value = mock_bucket
170+
result = downloader.run(sources=["doc.pdf", "unknown.totallyfakeext999"])
171+
172+
assert result["streams"][0].mime_type == "application/pdf"
173+
assert result["streams"][1].mime_type == "application/octet-stream"
174+
175+
@pytest.mark.skipif(
176+
not os.environ.get("SUPABASE_SERVICE_KEY") or not os.environ.get("SUPABASE_URL"),
177+
reason="Export SUPABASE_URL and SUPABASE_SERVICE_KEY to run integration tests.",
178+
)
179+
@pytest.mark.integration
180+
def test_run_integration(self):
181+
downloader = SupabaseBucketDownloader(
182+
supabase_url=os.environ["SUPABASE_URL"],
183+
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
184+
bucket_name=os.environ.get("SUPABASE_BUCKET_NAME", "test-bucket"),
185+
)
186+
result = downloader.run(sources=["test-file.txt"])
187+
assert len(result["streams"]) > 0
188+
assert isinstance(result["streams"][0], ByteStream)
189+
assert result["streams"][0].meta["file_path"] == "test-file.txt"
190+
assert result["streams"][0].meta["bucket_name"] == os.environ.get("SUPABASE_BUCKET_NAME", "test-bucket")

0 commit comments

Comments
 (0)