Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions integrations/supabase/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["haystack-ai>=2.26.1", "pgvector-haystack>=6.3.0"]
dependencies = ["haystack-ai>=2.26.1", "pgvector-haystack>=6.3.0", "supabase>=2.9.0"]

[project.urls]
Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/supabase#readme"
Expand Down Expand Up @@ -64,9 +64,9 @@ dependencies = [
unit = 'pytest -m "not integration" {args:tests}'
integration = 'pytest -m "integration" {args:tests}'
all = 'pytest {args:tests}'
unit-cov-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}'
integration-cov-append-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}'
types = "mypy -p haystack_integrations.document_stores.supabase -p haystack_integrations.components.retrievers.supabase {args}"
unit-cov-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov=haystack_integrations.components.downloaders.supabase --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}'
integration-cov-append-retry = 'pytest --cov=haystack_integrations.document_stores.supabase --cov=haystack_integrations.components.retrievers.supabase --cov=haystack_integrations.components.downloaders.supabase --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}'
types = "mypy -p haystack_integrations.document_stores.supabase -p haystack_integrations.components.retrievers.supabase -p haystack_integrations.components.downloaders.supabase {args}"

[tool.mypy]
install_types = true
Expand Down Expand Up @@ -142,7 +142,7 @@ ban-relative-imports = "parents"
"tests/**/*" = ["PLR2004", "S101", "TID252", "D", "ANN"]

[tool.coverage.run]
source = ["haystack_integrations.document_stores.supabase", "haystack_integrations.components.retrievers.supabase"]
source = ["haystack_integrations.document_stores.supabase", "haystack_integrations.components.retrievers.supabase", "haystack_integrations.components.downloaders.supabase"]
branch = true
parallel = false
relative_files = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

# Re-export the downloader so users can import it directly from the package root:
# `from haystack_integrations.components.downloaders.supabase import SupabaseBucketDownloader`
from haystack_integrations.components.downloaders.supabase.supabase_bucket_downloader import SupabaseBucketDownloader

__all__ = ["SupabaseBucketDownloader"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import mimetypes
from pathlib import Path
from typing import Any

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.dataclasses import ByteStream
from haystack.utils.auth import Secret, deserialize_secrets_inplace

from supabase import create_client

logger = logging.getLogger(__name__)


@component
class SupabaseBucketDownloader:
    """
    Downloads files from a Supabase Storage bucket and returns them as ByteStream objects.

    Files are downloaded in-memory and returned as `ByteStream` objects ready for further
    processing in indexing pipelines (e.g. passing to a `DocumentConverter`).

    Example usage:

    ```python
    from haystack_integrations.components.downloaders.supabase import SupabaseBucketDownloader
    from haystack.utils import Secret

    downloader = SupabaseBucketDownloader(
        supabase_url="https://<project-ref>.supabase.co",
        supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
        bucket_name="my-documents",
    )
    result = downloader.run(sources=["reports/report.pdf", "data/notes.txt"])
    streams = result["streams"]
    ```
    """

    def __init__(
        self,
        *,
        supabase_url: str,
        supabase_key: Secret = Secret.from_env_var("SUPABASE_SERVICE_KEY"),
        bucket_name: str,
        file_extensions: list[str] | None = None,
    ) -> None:
        """
        Creates a new SupabaseBucketDownloader instance.

        :param supabase_url: The URL of your Supabase project, e.g. `https://<project-ref>.supabase.co`.
        :param supabase_key: The Supabase API key used to authenticate requests. Defaults to the
            `SUPABASE_SERVICE_KEY` environment variable. Use the service role key for private buckets.
        :param bucket_name: The name of the Supabase Storage bucket to download files from.
        :param file_extensions: Optional list of file extensions to filter downloads (e.g. `[".pdf", ".txt"]`).
            If `None`, all files are downloaded. Extensions are matched case-insensitively.
        """
        self.supabase_url = supabase_url
        self.supabase_key = supabase_key
        self.bucket_name = bucket_name
        # Lowercase once here so run() can compare suffixes case-insensitively.
        self.file_extensions = [e.lower() for e in file_extensions] if file_extensions else None
        # Supabase client, created lazily on the first run() call and reused afterwards.
        # This avoids rebuilding the client (and re-resolving the secret) on every
        # pipeline run, and keeps construction/serialization free of network setup.
        self._client: Any = None

    def _get_client(self) -> Any:
        """
        Returns the cached Supabase client, creating it on first use.

        :returns: The Supabase client instance.
        :raises ValueError: If the Supabase API key secret cannot be resolved.
        """
        if self._client is None:
            key = self.supabase_key.resolve_value()
            if not key:
                msg = "Supabase API key could not be resolved. Set the SUPABASE_SERVICE_KEY environment variable."
                raise ValueError(msg)
            self._client = create_client(self.supabase_url, key)
        return self._client

    @component.output_types(streams=list[ByteStream])
    def run(self, sources: list[str]) -> dict[str, list[ByteStream]]:
        """
        Downloads files from the Supabase Storage bucket.

        Files whose extension is not in `file_extensions` (when set) are skipped, and
        individual download failures are logged and skipped instead of aborting the run.

        :param sources: List of file paths within the bucket to download,
            e.g. `["folder/file.pdf", "notes.txt"]`.
        :returns: A dictionary with:
            - `streams`: list of `ByteStream` objects, one per successfully downloaded file.
              Each `ByteStream` has `meta["file_path"]` and `meta["bucket_name"]` set.
        :raises ValueError: If the Supabase API key secret cannot be resolved.
        """
        client = self._get_client()
        streams: list[ByteStream] = []

        for path in sources:
            if self.file_extensions is not None:
                ext = Path(path).suffix.lower()
                if ext not in self.file_extensions:
                    logger.debug("Skipping {path} — extension not in filter list.", path=path)
                    continue

            try:
                data = client.storage.from_(self.bucket_name).download(path)
            except Exception as e:  # best-effort: a single failed file must not abort the whole run
                logger.warning(
                    "Failed to download {path} from bucket {bucket}: {error}",
                    path=path,
                    bucket=self.bucket_name,
                    error=e,
                )
                continue

            # Fall back to a generic binary MIME type when the extension is unknown.
            mime_type = mimetypes.guess_type(path)[0] or "application/octet-stream"
            streams.append(
                ByteStream(
                    data=data,
                    meta={"file_path": path, "bucket_name": self.bucket_name},
                    mime_type=mime_type,
                )
            )

        return {"streams": streams}

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        Note: `file_extensions` is serialized in its normalized (lowercased) form,
        so round-tripping preserves the matching behavior.

        :returns: Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            supabase_url=self.supabase_url,
            supabase_key=self.supabase_key.to_dict(),
            bucket_name=self.bucket_name,
            file_extensions=self.file_extensions,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SupabaseBucketDownloader":
        """
        Deserializes the component from a dictionary.

        :param data: Dictionary to deserialize from.
        :returns: Deserialized component.
        """
        deserialize_secrets_inplace(data["init_parameters"], ["supabase_key"])
        return default_from_dict(cls, data)
169 changes: 169 additions & 0 deletions integrations/supabase/tests/test_supabase_bucket_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import os
from unittest.mock import MagicMock, patch

import pytest
from haystack.core.serialization import component_from_dict, component_to_dict
from haystack.dataclasses import ByteStream
from haystack.utils import Secret

from haystack_integrations.components.downloaders.supabase import SupabaseBucketDownloader

_MODULE = "haystack_integrations.components.downloaders.supabase.supabase_bucket_downloader"
PATCH_PATH = f"{_MODULE}.create_client"
COMPONENT_TYPE = f"{_MODULE}.SupabaseBucketDownloader"


class TestSupabaseBucketDownloader:
    """Unit tests for SupabaseBucketDownloader, plus one optional integration test."""

    @staticmethod
    def _token_downloader(**overrides):
        """Build a downloader authenticated with a token secret; kwargs override defaults."""
        params = {
            "supabase_url": "https://project.supabase.co",
            "supabase_key": Secret.from_token("test-key"),
            "bucket_name": "my-bucket",
        }
        params.update(overrides)
        return SupabaseBucketDownloader(**params)

    @staticmethod
    def _run_with_mock(downloader, sources, download):
        """Run `downloader` against a mocked Supabase storage client.

        `download` configures the mocked bucket's `download` method: a list becomes
        a `side_effect`, anything else a `return_value`. Returns `(output, bucket)`
        so callers can inspect both the run result and the recorded calls.
        """
        bucket = MagicMock()
        if isinstance(download, list):
            bucket.download.side_effect = download
        else:
            bucket.download.return_value = download
        with patch(PATCH_PATH) as client_factory:
            client_factory.return_value.storage.from_.return_value = bucket
            output = downloader.run(sources=sources)
        return output, bucket

    def test_init_default(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = SupabaseBucketDownloader(
            supabase_url="https://project.supabase.co",
            bucket_name="my-bucket",
        )
        assert downloader.supabase_url == "https://project.supabase.co"
        assert downloader.bucket_name == "my-bucket"
        assert downloader.file_extensions is None
        assert downloader.supabase_key.resolve_value() == "test-key"

    def test_init_with_file_extensions(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = SupabaseBucketDownloader(
            supabase_url="https://project.supabase.co",
            bucket_name="my-bucket",
            file_extensions=[".PDF", ".TXT"],
        )
        # Extensions are normalized to lowercase at construction time.
        assert downloader.file_extensions == [".pdf", ".txt"]

    def test_to_dict(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = SupabaseBucketDownloader(
            supabase_url="https://project.supabase.co",
            bucket_name="docs",
            file_extensions=[".pdf"],
        )
        serialized = component_to_dict(downloader, "SupabaseBucketDownloader")
        init_params = serialized["init_parameters"]
        assert serialized["type"] == COMPONENT_TYPE
        assert init_params["supabase_url"] == "https://project.supabase.co"
        assert init_params["bucket_name"] == "docs"
        assert init_params["file_extensions"] == [".pdf"]

    def test_from_dict(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        serialized = {
            "type": COMPONENT_TYPE,
            "init_parameters": {
                "supabase_url": "https://project.supabase.co",
                "bucket_name": "docs",
                "file_extensions": [".pdf", ".txt"],
                "supabase_key": {"env_vars": ["SUPABASE_SERVICE_KEY"], "strict": True, "type": "env_var"},
            },
        }
        downloader = component_from_dict(SupabaseBucketDownloader, serialized, "SupabaseBucketDownloader")
        assert downloader.supabase_url == "https://project.supabase.co"
        assert downloader.bucket_name == "docs"
        assert downloader.file_extensions == [".pdf", ".txt"]

    def test_run_returns_bytestreams(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = self._token_downloader()

        output, _ = self._run_with_mock(downloader, ["folder/file.pdf"], b"file content")

        streams = output["streams"]
        assert len(streams) == 1
        stream = streams[0]
        assert isinstance(stream, ByteStream)
        assert stream.data == b"file content"
        assert stream.meta["file_path"] == "folder/file.pdf"
        assert stream.meta["bucket_name"] == "my-bucket"
        assert stream.mime_type == "application/pdf"

    def test_run_filters_by_extension(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = self._token_downloader(file_extensions=[".pdf"])

        output, bucket = self._run_with_mock(downloader, ["doc.pdf", "notes.txt", "image.png"], b"pdf content")

        streams = output["streams"]
        assert len(streams) == 1
        assert streams[0].meta["file_path"] == "doc.pdf"
        # Filtered-out files must never hit the storage API.
        assert bucket.download.call_count == 1

    def test_run_empty_sources(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = self._token_downloader()

        with patch(PATCH_PATH):
            output = downloader.run(sources=[])

        assert output["streams"] == []

    def test_run_skips_failed_downloads(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = self._token_downloader()

        # First download raises, second succeeds — only the second yields a stream.
        output, _ = self._run_with_mock(
            downloader,
            ["missing.pdf", "exists.txt"],
            [Exception("Not found"), b"good content"],
        )

        streams = output["streams"]
        assert len(streams) == 1
        assert streams[0].meta["file_path"] == "exists.txt"

    def test_run_sets_mime_type(self, monkeypatch):
        monkeypatch.setenv("SUPABASE_SERVICE_KEY", "test-key")
        downloader = self._token_downloader()

        output, _ = self._run_with_mock(downloader, ["doc.pdf", "unknown.totallyfakeext999"], b"data")

        streams = output["streams"]
        assert streams[0].mime_type == "application/pdf"
        # Unknown extensions fall back to the generic binary MIME type.
        assert streams[1].mime_type == "application/octet-stream"

    @pytest.mark.skipif(
        not os.environ.get("SUPABASE_SERVICE_KEY") or not os.environ.get("SUPABASE_URL"),
        reason="Export SUPABASE_URL and SUPABASE_SERVICE_KEY to run integration tests.",
    )
    @pytest.mark.integration
    def test_run_integration(self):
        downloader = SupabaseBucketDownloader(
            supabase_url=os.environ["SUPABASE_URL"],
            supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
            bucket_name=os.environ.get("SUPABASE_BUCKET_NAME", "test-bucket"),
        )
        output = downloader.run(sources=["test-file.txt"])
        assert len(output["streams"]) > 0
        assert isinstance(output["streams"][0], ByteStream)
Loading