Skip to content

Commit 8b35e9d

Browse files
feat(supabase): add async support to SupabaseGroongaDocumentStore and SupabaseGroongaBM25Retriever (#3380)
Co-authored-by: David S. Batista <dsbatista@gmail.com>
1 parent fdedcb1 commit 8b35e9d

5 files changed

Lines changed: 724 additions & 134 deletions

File tree

integrations/supabase/src/haystack_integrations/components/retrievers/supabase/groonga_bm25_retriever.py

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
import copy
66
from typing import Any
77

8-
from haystack import component, default_from_dict, default_to_dict
8+
from haystack import component, default_from_dict, default_to_dict, logging
99
from haystack.dataclasses import Document
1010
from haystack.document_stores.types import FilterPolicy
1111
from haystack.document_stores.types.filter_policy import apply_filter_policy
1212

1313
from haystack_integrations.document_stores.supabase import SupabaseGroongaDocumentStore
1414

15+
logger = logging.getLogger(__name__)
16+
1517

1618
@component
1719
class SupabaseGroongaBM25Retriever:
@@ -21,8 +23,8 @@ class SupabaseGroongaBM25Retriever:
2123
This retriever works without embeddings — it searches documents using plain text queries.
2224
It can be used alongside SupabasePgvectorEmbeddingRetriever in hybrid search pipelines.
2325
24-
Note: async operations are not supported as the supabase-py sync client does not expose
25-
awaitable query methods. Use the sync run() method instead.
26+
Supports both synchronous and asynchronous retrieval. For async pipelines, call
27+
``run_async()`` — the document store's async client is initialized lazily on first use.
2628
2729
Example usage:
2830
@@ -51,6 +53,7 @@ def __init__(
5153
filters: dict[str, Any] | None = None,
5254
top_k: int = 10,
5355
filter_policy: str | FilterPolicy = FilterPolicy.REPLACE,
56+
raise_on_failure: bool = True,
5457
) -> None:
5558
"""
5659
Initialize the SupabaseGroongaBM25Retriever.
@@ -59,15 +62,22 @@ def __init__(
5962
:param filters: Optional filters applied to retrieved Documents.
6063
:param top_k: Maximum number of Documents to return. Defaults to 10.
6164
:param filter_policy: Policy to determine how filters are applied.
62-
:raises ValueError: If document_store is not an instance of SupabaseGroongaDocumentStore.
65+
:param raise_on_failure: If True, raises exceptions from the document store.
66+
If False, logs the error and returns an empty document list.
67+
:raises ValueError: If document_store is not an instance of SupabaseGroongaDocumentStore
68+
or if top_k is not a positive integer.
6369
"""
6470
if not isinstance(document_store, SupabaseGroongaDocumentStore):
6571
msg = "document_store must be an instance of SupabaseGroongaDocumentStore"
6672
raise ValueError(msg)
73+
if top_k <= 0:
74+
msg = f"top_k must be greater than 0, got {top_k}"
75+
raise ValueError(msg)
6776

6877
self.document_store = document_store
6978
self.filters = filters or {}
7079
self.top_k = top_k
80+
self.raise_on_failure = raise_on_failure
7181
self.filter_policy = (
7282
filter_policy if isinstance(filter_policy, FilterPolicy) else FilterPolicy.from_str(filter_policy)
7383
)
@@ -93,11 +103,17 @@ def run(
93103
merged_filters = apply_filter_policy(self.filter_policy, self.filters, filters)
94104
effective_top_k = top_k if top_k is not None else self.top_k
95105

96-
documents = self.document_store._groonga_retrieval(
97-
query=query,
98-
top_k=effective_top_k,
99-
filters=merged_filters,
100-
)
106+
try:
107+
documents = self.document_store._groonga_retrieval(
108+
query=query,
109+
top_k=effective_top_k,
110+
filters=merged_filters,
111+
)
112+
except Exception as e:
113+
if self.raise_on_failure:
114+
raise
115+
logger.warning(f"SupabaseGroongaBM25Retriever run() failed: {e}")
116+
return {"documents": []}
101117

102118
return {"documents": documents}
103119

@@ -109,19 +125,32 @@ async def run_async(
109125
top_k: int | None = None,
110126
) -> dict[str, list[Document]]:
111127
"""
112-
Async version of run().
113-
114-
Note: supabase-py's sync client does not support native async queries.
115-
This method runs the synchronous retrieval and returns the result.
116-
For fully async support, consider using acreate_client() from supabase-py
117-
and refactoring the document store accordingly.
128+
Async version of run(). The document store's async client is initialized lazily on first use.
118129
119130
:param query: The text query to search for.
120131
:param filters: Optional runtime filters. Merged or replaced based on filter_policy.
121132
:param top_k: Optional override for maximum number of documents to return.
122133
:returns: Dictionary with key "documents" containing list of matching Documents.
123134
"""
124-
return self.run(query=query, filters=filters, top_k=top_k)
135+
if not query:
136+
return {"documents": []}
137+
138+
merged_filters = apply_filter_policy(self.filter_policy, self.filters, filters)
139+
effective_top_k = top_k if top_k is not None else self.top_k
140+
141+
try:
142+
documents = await self.document_store._groonga_retrieval_async(
143+
query=query,
144+
top_k=effective_top_k,
145+
filters=merged_filters,
146+
)
147+
except Exception as e:
148+
if self.raise_on_failure:
149+
raise
150+
logger.warning(f"SupabaseGroongaBM25Retriever run_async() failed: {e}")
151+
return {"documents": []}
152+
153+
return {"documents": documents}
125154

126155
def to_dict(self) -> dict[str, Any]:
127156
"""
@@ -134,6 +163,7 @@ def to_dict(self) -> dict[str, Any]:
134163
filters=self.filters,
135164
top_k=self.top_k,
136165
filter_policy=self.filter_policy.value,
166+
raise_on_failure=self.raise_on_failure,
137167
document_store=self.document_store.to_dict(),
138168
)
139169

0 commit comments

Comments
 (0)