Skip to content

Commit 3ae853c

Browse files
fix(supabase): address reviewer feedback - lazy init, DocumentStore base class, filters, async
1 parent ceb8394 commit 3ae853c

3 files changed

Lines changed: 283 additions & 66 deletions

File tree

integrations/supabase/src/haystack_integrations/components/retrievers/supabase/groonga_retriever.py

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,31 @@
1515
@component
1616
class SupabaseGroongaRetriever:
1717
"""
18-
Retrieves documents from SupabaseGroongaDocumentStore using PGroonga full-text search.
18+
Retrieves documents from SupabaseGroongaDocumentStore using PGroonga full-text search.
1919
20-
This retriever works without embeddings — it searches documents using plain text queries.
21-
It can be used alongside SupabasePgvectorEmbeddingRetriever in hybrid search pipelines.
20+
This retriever works without embeddings — it searches documents using plain text queries.
21+
It can be used alongside SupabasePgvectorEmbeddingRetriever in hybrid search pipelines.
2222
23-
Example usage:
23+
Note: async operations are not supported as the supabase-py sync client does not expose
24+
awaitable query methods. Use the sync run() method instead.
2425
25-
```python
26-
from haystack_integrations.document_stores.supabase import SupabaseGroongaDocumentStore
27-
from haystack_integrations.components.retrievers.supabase import SupabaseGroongaRetriever
28-
from haystack.utils import Secret
29-
30-
document_store = SupabaseGroongaDocumentStore(
31-
supabase_url="https://<project>.supabase.co",
32-
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
33-
table_name="haystack_fts_documents",
34-
)
26+
Example usage:
3527
36-
retriever = SupabaseGroongaRetriever(document_store=document_store, top_k=10)
37-
result = retriever.run(query="python programming")
38-
print(result["documents"])
28+
```python
29+
from haystack_integrations.document_stores.supabase import SupabaseGroongaDocumentStore
30+
from haystack_integrations.components.retrievers.supabase import SupabaseGroongaRetriever
31+
from haystack.utils import Secret
32+
33+
document_store = SupabaseGroongaDocumentStore(
34+
supabase_url="https://<project>.supabase.co",
35+
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
36+
table_name="haystack_fts_documents",
37+
)
38+
document_store.warm_up()
39+
40+
retriever = SupabaseGroongaRetriever(document_store=document_store, top_k=10)
41+
result = retriever.run(query="python programming")
42+
print(result["documents"])
3943
```
4044
"""
4145

@@ -85,15 +89,7 @@ def run(
8589
if not query:
8690
return {"documents": []}
8791

88-
# Handle filter policy
89-
if filters is not None:
90-
if self.filter_policy == FilterPolicy.MERGE:
91-
merged_filters = {**self.filters, **filters}
92-
else:
93-
merged_filters = filters
94-
else:
95-
merged_filters = self.filters
96-
92+
merged_filters = self._merge_filters(filters)
9793
effective_top_k = top_k if top_k is not None else self.top_k
9894

9995
documents = self.document_store._groonga_retrieval(
@@ -104,6 +100,41 @@ def run(
104100

105101
return {"documents": documents}
106102

103+
@component.output_types(documents=list[Document])
104+
async def run_async(
105+
self,
106+
query: str,
107+
filters: dict[str, Any] | None = None,
108+
top_k: int | None = None,
109+
) -> dict[str, list[Document]]:
110+
"""
111+
Async version of run().
112+
113+
Note: supabase-py's sync client does not support native async queries.
114+
This method runs the synchronous retrieval and returns the result.
115+
For fully async support, consider using acreate_client() from supabase-py
116+
and refactoring the document store accordingly.
117+
118+
:param query: The text query to search for.
119+
:param filters: Optional runtime filters. Merged or replaced based on filter_policy.
120+
:param top_k: Optional override for maximum number of documents to return.
121+
:returns: Dictionary with key "documents" containing list of matching Documents.
122+
"""
123+
return self.run(query=query, filters=filters, top_k=top_k)
124+
125+
def _merge_filters(self, runtime_filters: dict[str, Any] | None) -> dict[str, Any]:
126+
"""
127+
Merges runtime filters with init filters based on filter_policy.
128+
129+
:param runtime_filters: Filters passed at runtime.
130+
:returns: Merged filters dictionary.
131+
"""
132+
if runtime_filters is not None:
133+
if self.filter_policy == FilterPolicy.MERGE:
134+
return {**self.filters, **runtime_filters}
135+
return runtime_filters
136+
return self.filters
137+
107138
def to_dict(self) -> dict[str, Any]:
108139
"""
109140
Serializes the component to a dictionary.
@@ -131,4 +162,4 @@ def from_dict(cls, data: dict[str, Any]) -> "SupabaseGroongaRetriever":
131162
data["init_parameters"]["document_store"] = SupabaseGroongaDocumentStore.from_dict(doc_store_params)
132163
if filter_policy := data["init_parameters"].get("filter_policy"):
133164
data["init_parameters"]["filter_policy"] = FilterPolicy.from_str(filter_policy)
134-
return default_from_dict(cls, data)
165+
return default_from_dict(cls, data)

integrations/supabase/src/haystack_integrations/document_stores/supabase/groonga_document_store.py

Lines changed: 136 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,43 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
from typing import Any
5+
from typing import Any, Optional
66

77
from haystack import default_from_dict, default_to_dict, logging
88
from haystack.dataclasses import Document
99
from haystack.document_stores.errors import DuplicateDocumentError
10-
from haystack.document_stores.types import DuplicatePolicy
10+
from haystack.document_stores.types import DocumentStore, DuplicatePolicy
1111
from haystack.utils.auth import Secret, deserialize_secrets_inplace
1212

13-
from supabase import create_client
13+
from supabase import create_client, Client
1414

1515
logger = logging.getLogger(__name__)
1616

1717

18-
class SupabaseGroongaDocumentStore:
18+
class SupabaseGroongaDocumentStore(DocumentStore):
1919
"""
20-
A Document Store for Supabase using PGroonga for full-text search.
20+
A Document Store for Supabase using PGroonga for full-text search.
2121
22-
PGroonga is a PostgreSQL extension for fast, multilingual full-text search.
23-
Unlike vector search, this store works with plain text queries — no embeddings needed.
22+
PGroonga is a PostgreSQL extension for fast, multilingual full-text search.
23+
Unlike vector search, this store works with plain text queries — no embeddings needed.
2424
25-
Prerequisites:
26-
- A Supabase project with PGroonga extension enabled.
27-
- Enable PGroonga in your Supabase project by running:
28-
`CREATE EXTENSION IF NOT EXISTS pgroonga;`
25+
Prerequisites:
26+
- A Supabase project with PGroonga extension enabled.
27+
- Enable PGroonga in your Supabase project by running:
28+
`CREATE EXTENSION IF NOT EXISTS pgroonga;`
2929
30-
Example usage:
30+
Example usage:
3131
3232
```python
33-
from haystack_integrations.document_stores.supabase import SupabaseGroongaDocumentStore
34-
from haystack.utils import Secret
35-
36-
document_store = SupabaseGroongaDocumentStore(
37-
supabase_url="https://<project>.supabase.co",
38-
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
39-
table_name="haystack_fts_documents",
40-
)
33+
from haystack_integrations.document_stores.supabase import SupabaseGroongaDocumentStore
34+
from haystack.utils import Secret
35+
36+
document_store = SupabaseGroongaDocumentStore(
37+
supabase_url="https://<project>.supabase.co",
38+
supabase_key=Secret.from_env_var("SUPABASE_SERVICE_KEY"),
39+
table_name="haystack_fts_documents",
40+
)
41+
document_store.warm_up()
4142
```
4243
"""
4344

@@ -52,6 +53,8 @@ def __init__(
5253
"""
5354
Creates a new SupabaseGroongaDocumentStore instance.
5455
56+
Note: Call warm_up() before using the store to initialize the client and table.
57+
5558
:param supabase_url: The URL of your Supabase project.
5659
Format: `https://<project-ref>.supabase.co`
5760
:param supabase_key: The service role key for your Supabase project.
@@ -66,11 +69,17 @@ def __init__(
6669
self.table_name = table_name
6770
self.recreate_table = recreate_table
6871

69-
# Connect to Supabase
72+
# Client is initialized lazily in warm_up()
73+
self._client: Optional[Client] = None
74+
75+
def warm_up(self) -> None:
76+
"""
77+
Initializes the Supabase client and sets up the table.
78+
79+
Must be called before using the document store.
80+
"""
7081
key = self.supabase_key.resolve_value() or ""
7182
self._client = create_client(self.supabase_url, key)
72-
73-
# Set up the table
7483
self._setup_table()
7584

7685
def _setup_table(self) -> None:
@@ -79,6 +88,8 @@ def _setup_table(self) -> None:
7988
8089
If recreate_table is True, drops and recreates the table.
8190
"""
91+
assert self._client is not None, "Call warm_up() before using the document store."
92+
8293
if self.recreate_table:
8394
self._client.rpc("exec_sql", {"query": f"DROP TABLE IF EXISTS {self.table_name};"}).execute()
8495

@@ -103,36 +114,80 @@ def _setup_table(self) -> None:
103114

104115
def count_documents(self) -> int:
105116
"""
106-
the number of documents in the store.
117+
Returns the number of documents in the store.
107118
108119
:returns: Number of documents.
109120
"""
121+
assert self._client is not None, "Call warm_up() before using the document store."
110122
result = self._client.table(self.table_name).select("*", count="exact").execute()
111123
return int(result.count) if result.count is not None else 0
112124

113-
def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Document]: # noqa: ARG002
125+
def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Document]:
114126
"""
115127
Returns documents matching the given filters.
116128
117-
:param filters: Optional dictionary of filters.
129+
Supported filters: equality filters on `id`, `content`, and `meta` fields.
130+
131+
:param filters: Optional dictionary of filters. Example: {"field": "meta.language", "operator": "==", "value": "en"}
118132
:returns: List of matching Document objects.
119133
"""
134+
assert self._client is not None, "Call warm_up() before using the document store."
135+
120136
query = self._client.table(self.table_name).select("*")
137+
138+
if filters:
139+
query = self._apply_filters(query, filters)
140+
121141
result = query.execute()
122142
return [self._to_haystack_document(row) for row in result.data if isinstance(row, dict)]
123143

144+
def _apply_filters(self, query: Any, filters: dict[str, Any]) -> Any:
145+
"""
146+
Applies filters to a Supabase query.
147+
148+
:param query: The Supabase query builder.
149+
:param filters: Dictionary of filters to apply.
150+
:returns: The query with filters applied.
151+
"""
152+
operator = filters.get("operator", "AND")
153+
conditions = filters.get("conditions", [])
154+
155+
for condition in conditions:
156+
field = condition.get("field", "")
157+
op = condition.get("operator", "==")
158+
value = condition.get("value")
159+
160+
# Handle nested meta fields e.g. "meta.language"
161+
if field.startswith("meta."):
162+
meta_key = field[len("meta."):]
163+
if op == "==":
164+
query = query.eq(f"meta->>'{meta_key}'", value)
165+
elif op == "!=":
166+
query = query.neq(f"meta->>'{meta_key}'", value)
167+
else:
168+
if op == "==":
169+
query = query.eq(field, value)
170+
elif op == "!=":
171+
query = query.neq(field, value)
172+
elif op == "in":
173+
query = query.in_(field, value)
174+
175+
return query
176+
124177
def write_documents(
125178
self,
126179
documents: list[Document],
127-
policy: DuplicatePolicy = DuplicatePolicy.NONE,
180+
policy: DuplicatePolicy = DuplicatePolicy.FAIL,
128181
) -> int:
129182
"""
130183
Writes documents to the store.
131184
132185
:param documents: List of Haystack Document objects to write.
133-
:param policy: How to handle duplicate documents.
186+
:param policy: How to handle duplicate documents. Defaults to DuplicatePolicy.FAIL.
134187
:returns: Number of documents written.
135188
"""
189+
assert self._client is not None, "Call warm_up() before using the document store."
190+
136191
if not documents:
137192
return 0
138193

@@ -171,6 +226,8 @@ def delete_documents(self, document_ids: list[str]) -> None:
171226
172227
:param document_ids: List of document IDs to delete.
173228
"""
229+
assert self._client is not None, "Call warm_up() before using the document store."
230+
174231
if not document_ids:
175232
return
176233
self._client.table(self.table_name).delete().in_("id", document_ids).execute()
@@ -179,21 +236,68 @@ def _groonga_retrieval(
179236
self,
180237
query: str,
181238
top_k: int = 10,
182-
filters: dict[str, Any] | None = None, # noqa: ARG002
239+
filters: dict[str, Any] | None = None,
183240
) -> list[Document]:
184241
"""
185242
Searches documents using PGroonga full-text search.
186243
187244
:param query: The text query to search for.
188245
:param top_k: Maximum number of results to return.
189-
:param filters: Optional filters to apply.
246+
:param filters: Optional filters to apply after retrieval.
190247
:returns: List of matching Document objects ranked by relevance.
191248
"""
249+
assert self._client is not None, "Call warm_up() before using the document store."
250+
192251
result = self._client.rpc(
193252
"groonga_search", {"query_text": query, "table": self.table_name, "top_k": top_k}
194253
).execute()
195254

196-
return [self._to_haystack_document(row) for row in (result.data or []) if isinstance(row, dict)]
255+
documents = [self._to_haystack_document(row) for row in (result.data or []) if isinstance(row, dict)]
256+
257+
# Apply filters post-retrieval if provided
258+
if filters:
259+
documents = self._filter_documents_in_memory(documents, filters)
260+
261+
return documents
262+
263+
def _filter_documents_in_memory(self, documents: list[Document], filters: dict[str, Any]) -> list[Document]:
264+
"""
265+
Filters a list of documents in memory based on the given filters.
266+
267+
:param documents: List of documents to filter.
268+
:param filters: Dictionary of filters to apply.
269+
:returns: Filtered list of documents.
270+
"""
271+
conditions = filters.get("conditions", [])
272+
filtered = []
273+
274+
for doc in documents:
275+
match = True
276+
for condition in conditions:
277+
field = condition.get("field", "")
278+
op = condition.get("operator", "==")
279+
value = condition.get("value")
280+
281+
if field.startswith("meta."):
282+
meta_key = field[len("meta."):]
283+
doc_value = doc.meta.get(meta_key)
284+
else:
285+
doc_value = getattr(doc, field, None)
286+
287+
if op == "==" and doc_value != value:
288+
match = False
289+
break
290+
elif op == "!=" and doc_value == value:
291+
match = False
292+
break
293+
elif op == "in" and doc_value not in value:
294+
match = False
295+
break
296+
297+
if match:
298+
filtered.append(doc)
299+
300+
return filtered
197301

198302
def _to_haystack_document(self, row: dict[str, Any]) -> Document:
199303
"""
@@ -232,4 +336,4 @@ def from_dict(cls, data: dict[str, Any]) -> "SupabaseGroongaDocumentStore":
232336
:returns: Deserialized component.
233337
"""
234338
deserialize_secrets_inplace(data["init_parameters"], ["supabase_key"])
235-
return default_from_dict(cls, data)
339+
return default_from_dict(cls, data)

0 commit comments

Comments
 (0)