Skip to content

Commit 0f29326

Browse files
authored
Merge branch 'main' into feature/falkordb-integration
2 parents d091066 + 1266327 commit 0f29326

10 files changed

Lines changed: 834 additions & 11 deletions

File tree

.github/workflows/e2b.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ jobs:
8787
- name: Store unit tests coverage
8888
id: coverage_comment
8989
if: matrix.python-version == '3.10' && runner.os == 'Linux' && github.event_name != 'schedule'
90-
uses: py-cov-action/python-coverage-comment-action@7188638f871f721a365d644f505d1ff3df20d683 # v3.40
90+
uses: py-cov-action/python-coverage-comment-action@63f52f4fbbffada6e8dee8ec432de7e01df9ba79 # v3.41
9191
with:
9292
GITHUB_TOKEN: ${{ github.token }}
9393
COVERAGE_PATH: integrations/e2b
@@ -97,7 +97,7 @@ jobs:
9797

9898
- name: Upload coverage comment to be posted
9999
if: matrix.python-version == '3.10' && runner.os == 'Linux' && github.event_name == 'pull_request' && steps.coverage_comment.outputs.COMMENT_FILE_WRITTEN == 'true'
100-
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
100+
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
101101
with:
102102
name: coverage-comment-e2b
103103
path: python-coverage-comment-action-e2b.txt
@@ -108,7 +108,7 @@ jobs:
108108

109109
- name: Store combined coverage
110110
if: github.event_name == 'push'
111-
uses: py-cov-action/python-coverage-comment-action@7188638f871f721a365d644f505d1ff3df20d683 # v3.40
111+
uses: py-cov-action/python-coverage-comment-action@63f52f4fbbffada6e8dee8ec432de7e01df9ba79 # v3.41
112112
with:
113113
GITHUB_TOKEN: ${{ github.token }}
114114
COVERAGE_PATH: integrations/e2b

integrations/elasticsearch/src/haystack_integrations/components/retrievers/elasticsearch/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .bm25_retriever import ElasticsearchBM25Retriever
55
from .elasticsearch_hybrid_retriever import ElasticsearchHybridRetriever
66
from .embedding_retriever import ElasticsearchEmbeddingRetriever
7+
from .inference_hybrid_retriever import ElasticsearchInferenceHybridRetriever
78
from .inference_sparse_retriever import ElasticsearchInferenceSparseRetriever
89
from .sparse_embedding_retriever import ElasticsearchSparseEmbeddingRetriever
910
from .sql_retriever import ElasticsearchSQLRetriever
@@ -12,6 +13,7 @@
1213
"ElasticsearchBM25Retriever",
1314
"ElasticsearchEmbeddingRetriever",
1415
"ElasticsearchHybridRetriever",
16+
"ElasticsearchInferenceHybridRetriever",
1517
"ElasticsearchInferenceSparseRetriever",
1618
"ElasticsearchSQLRetriever",
1719
"ElasticsearchSparseEmbeddingRetriever",
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
from typing import Any
6+
7+
from haystack import component, default_from_dict, default_to_dict
8+
from haystack.dataclasses import Document
9+
from haystack.document_stores.types import FilterPolicy
10+
from haystack.document_stores.types.filter_policy import apply_filter_policy
11+
12+
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
13+
14+
15+
@component
16+
class ElasticsearchInferenceHybridRetriever:
17+
"""
18+
A fully server-side hybrid retriever combining BM25 and ELSER sparse vector search via Elasticsearch RRF.
19+
20+
Issues a single Elasticsearch request using the `retriever.rrf` API (ES 8.9+ for `rank.rrf`,
21+
ES 8.14+ for the Retriever API). No local embedding model is required and no client-side
22+
score merging takes place — ranking is handled entirely by Elasticsearch.
23+
24+
Usage example (Elastic Cloud with ELSER deployed):
25+
26+
```python
27+
import os
28+
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchInferenceHybridRetriever
29+
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
30+
31+
doc_store = ElasticsearchDocumentStore(
32+
hosts=os.environ["ELASTICSEARCH_URL"],
33+
api_key=os.environ["ELASTIC_API_KEY"],
34+
sparse_vector_field="sparse_vec",
35+
)
36+
retriever = ElasticsearchInferenceHybridRetriever(
37+
document_store=doc_store,
38+
inference_id=".elser-2-elasticsearch",
39+
)
40+
results = retriever.run(query="What is reinforcement learning?")
41+
```
42+
"""
43+
44+
def __init__(
45+
self,
46+
*,
47+
document_store: ElasticsearchDocumentStore,
48+
inference_id: str,
49+
filters: dict[str, Any] | None = None,
50+
fuzziness: str = "AUTO",
51+
top_k: int = 10,
52+
filter_policy: str | FilterPolicy = FilterPolicy.REPLACE,
53+
rank_window_size: int = 100,
54+
rank_constant: int = 60,
55+
) -> None:
56+
"""
57+
Create the ElasticsearchInferenceHybridRetriever component.
58+
59+
:param document_store: An instance of ElasticsearchDocumentStore with `sparse_vector_field` configured.
60+
:param inference_id: The Elasticsearch inference endpoint ID used for sparse vector search e.g.
61+
".elser-2-elasticsearch"
62+
:param filters: Filters applied to both sub-retrievers.
63+
:param fuzziness: Fuzziness for the BM25 multi_match query.
64+
:param top_k: Maximum number of Documents to return.
65+
:param filter_policy: Policy to determine how runtime filters are merged with init-time filters.
66+
:param rank_window_size: Number of candidates each sub-retriever collects before RRF ranking.
67+
:param rank_constant: RRF rank constant. Higher values reduce the impact of rank position differences.
68+
:raises ValueError: If `document_store` is not an ElasticsearchDocumentStore or `inference_id` is empty.
69+
"""
70+
if not isinstance(document_store, ElasticsearchDocumentStore):
71+
msg = "document_store must be an instance of ElasticsearchDocumentStore"
72+
raise ValueError(msg)
73+
74+
if not inference_id:
75+
msg = "inference_id must be provided"
76+
raise ValueError(msg)
77+
78+
self._document_store = document_store
79+
self._inference_id = inference_id
80+
self._filters = filters or {}
81+
self._fuzziness = fuzziness
82+
self._top_k = top_k
83+
self._filter_policy = FilterPolicy.from_str(filter_policy) if isinstance(filter_policy, str) else filter_policy
84+
self._rank_window_size = rank_window_size
85+
self._rank_constant = rank_constant
86+
87+
def to_dict(self) -> dict[str, Any]:
88+
"""
89+
Serializes the component to a dictionary.
90+
91+
:returns: Dictionary with serialized data.
92+
"""
93+
return default_to_dict(
94+
self,
95+
document_store=self._document_store.to_dict(),
96+
inference_id=self._inference_id,
97+
filters=self._filters,
98+
fuzziness=self._fuzziness,
99+
top_k=self._top_k,
100+
filter_policy=self._filter_policy.value,
101+
rank_window_size=self._rank_window_size,
102+
rank_constant=self._rank_constant,
103+
)
104+
105+
@classmethod
106+
def from_dict(cls, data: dict[str, Any]) -> "ElasticsearchInferenceHybridRetriever":
107+
"""
108+
Deserializes the component from a dictionary.
109+
110+
:param data: Dictionary to deserialize from.
111+
:returns: Deserialized component instance.
112+
"""
113+
data["init_parameters"]["document_store"] = ElasticsearchDocumentStore.from_dict(
114+
data["init_parameters"]["document_store"]
115+
)
116+
if filter_policy := data["init_parameters"].get("filter_policy"):
117+
data["init_parameters"]["filter_policy"] = FilterPolicy.from_str(filter_policy)
118+
return default_from_dict(cls, data)
119+
120+
@component.output_types(documents=list[Document])
121+
def run(
122+
self,
123+
query: str,
124+
filters: dict[str, Any] | None = None,
125+
top_k: int | None = None,
126+
) -> dict[str, list[Document]]:
127+
"""
128+
Run a hybrid retrieval query against Elasticsearch.
129+
130+
:param query: The query string.
131+
:param filters: Runtime filters merged with init-time filters according to `filter_policy`.
132+
:param top_k: Maximum number of documents to return, overrides the init-time value.
133+
:returns: A dictionary with key `documents` containing the retrieved list of `Document`s.
134+
"""
135+
filters = apply_filter_policy(self._filter_policy, self._filters, filters)
136+
docs = self._document_store._hybrid_retrieval_inference(
137+
query=query,
138+
inference_id=self._inference_id,
139+
filters=filters,
140+
fuzziness=self._fuzziness,
141+
top_k=top_k or self._top_k,
142+
rank_window_size=self._rank_window_size,
143+
rank_constant=self._rank_constant,
144+
)
145+
return {"documents": docs}
146+
147+
@component.output_types(documents=list[Document])
148+
async def run_async(
149+
self,
150+
query: str,
151+
filters: dict[str, Any] | None = None,
152+
top_k: int | None = None,
153+
) -> dict[str, list[Document]]:
154+
"""
155+
Asynchronously run a hybrid retrieval query against Elasticsearch.
156+
157+
:param query: The query string.
158+
:param filters: Runtime filters merged with init-time filters according to `filter_policy`.
159+
:param top_k: Maximum number of documents to return, overrides the init-time value.
160+
:returns: A dictionary with key `documents` containing the retrieved list of `Document`s.
161+
"""
162+
filters = apply_filter_policy(self._filter_policy, self._filters, filters)
163+
docs = await self._document_store._hybrid_retrieval_inference_async(
164+
query=query,
165+
inference_id=self._inference_id,
166+
filters=filters,
167+
fuzziness=self._fuzziness,
168+
top_k=top_k or self._top_k,
169+
rank_window_size=self._rank_window_size,
170+
rank_constant=self._rank_constant,
171+
)
172+
return {"documents": docs}

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,6 +1309,152 @@ async def _sparse_vector_retrieval_inference_async(
13091309
)
13101310
return await self._search_documents_async(**search_body)
13111311

1312+
def _create_hybrid_retrieval_inference_body(
1313+
self,
1314+
query: str,
1315+
inference_id: str,
1316+
*,
1317+
filters: dict[str, Any] | None = None,
1318+
fuzziness: str = "AUTO",
1319+
top_k: int = 10,
1320+
rank_window_size: int = 100,
1321+
rank_constant: int = 60,
1322+
) -> dict[str, Any]:
1323+
"""
1324+
Builds the Elasticsearch search body for server-side hybrid retrieval using the RRF retriever API.
1325+
1326+
Combines BM25 (multi_match) and sparse vector (ELSER inference) as two standard sub-retrievers
1327+
inside a single `retriever.rrf` request — no client-side merging.
1328+
1329+
:param query: Query text.
1330+
:param inference_id: Elasticsearch inference model ID (e.g. ".elser_model_2").
1331+
:param filters: Optional filters applied to both sub-retrievers.
1332+
:param fuzziness: Fuzziness for the BM25 multi_match query.
1333+
:param top_k: Number of documents to return.
1334+
:param rank_window_size: Number of candidates each sub-retriever collects before RRF merging.
1335+
:param rank_constant: RRF rank constant (higher values reduce the impact of rank differences).
1336+
:returns: Search body for Elasticsearch.
1337+
:raises ValueError: If `sparse_vector_field` is not configured or `query` is empty.
1338+
"""
1339+
if not self._sparse_vector_field:
1340+
msg = "sparse_vector_field must be set for hybrid retrieval"
1341+
raise ValueError(msg)
1342+
if not query:
1343+
msg = "query must be a non-empty string"
1344+
raise ValueError(msg)
1345+
1346+
bm25_clause: dict[str, Any] = {
1347+
"standard": {
1348+
"query": {
1349+
"multi_match": {
1350+
"query": query,
1351+
"fuzziness": fuzziness,
1352+
"type": "most_fields",
1353+
"operator": "OR",
1354+
}
1355+
}
1356+
}
1357+
}
1358+
sparse_clause: dict[str, Any] = {
1359+
"standard": {
1360+
"query": {
1361+
"sparse_vector": {
1362+
"field": self._sparse_vector_field,
1363+
"inference_id": inference_id,
1364+
"query": query,
1365+
}
1366+
}
1367+
}
1368+
}
1369+
1370+
if filters:
1371+
normalized = _normalize_filters(filters)
1372+
bm25_clause["standard"]["filter"] = normalized
1373+
sparse_clause["standard"]["filter"] = normalized
1374+
1375+
return {
1376+
"retriever": {
1377+
"rrf": {
1378+
"retrievers": [bm25_clause, sparse_clause],
1379+
"rank_window_size": rank_window_size,
1380+
"rank_constant": rank_constant,
1381+
}
1382+
},
1383+
"size": top_k,
1384+
}
1385+
1386+
def _hybrid_retrieval_inference(
1387+
self,
1388+
query: str,
1389+
inference_id: str,
1390+
*,
1391+
filters: dict[str, Any] | None = None,
1392+
fuzziness: str = "AUTO",
1393+
top_k: int = 10,
1394+
rank_window_size: int = 100,
1395+
rank_constant: int = 60,
1396+
) -> list[Document]:
1397+
"""
1398+
Retrieves documents using a fully server-side hybrid search (BM25 + ELSER RRF).
1399+
1400+
Issues a single Elasticsearch request using the `retriever.rrf` API (available since ES 8.9,
1401+
Retriever API since ES 8.14). No client-side score merging is performed.
1402+
1403+
:param query: Query text.
1404+
:param inference_id: Elasticsearch inference model ID (e.g. ".elser_model_2").
1405+
:param filters: Optional filters applied to both sub-retrievers.
1406+
:param fuzziness: Fuzziness for the BM25 multi_match query.
1407+
:param top_k: Maximum number of documents to return.
1408+
:param rank_window_size: Number of candidates each sub-retriever collects before RRF merging.
1409+
:param rank_constant: RRF rank constant.
1410+
:returns: List of Documents ranked by RRF score.
1411+
"""
1412+
body = self._create_hybrid_retrieval_inference_body(
1413+
query=query,
1414+
inference_id=inference_id,
1415+
filters=filters,
1416+
fuzziness=fuzziness,
1417+
top_k=top_k,
1418+
rank_window_size=rank_window_size,
1419+
rank_constant=rank_constant,
1420+
)
1421+
return self._search_documents(**body)
1422+
1423+
async def _hybrid_retrieval_inference_async(
1424+
self,
1425+
query: str,
1426+
inference_id: str,
1427+
*,
1428+
filters: dict[str, Any] | None = None,
1429+
fuzziness: str = "AUTO",
1430+
top_k: int = 10,
1431+
rank_window_size: int = 100,
1432+
rank_constant: int = 60,
1433+
) -> list[Document]:
1434+
"""
1435+
Asynchronously retrieves documents using a fully server-side hybrid search (BM25 + ELSER RRF).
1436+
1437+
:param query: Query text.
1438+
:param inference_id: Elasticsearch inference model ID (e.g. ".elser_model_2").
1439+
:param filters: Optional filters applied to both sub-retrievers.
1440+
:param fuzziness: Fuzziness for the BM25 multi_match query.
1441+
:param top_k: Maximum number of documents to return.
1442+
:param rank_window_size: Number of candidates each sub-retriever collects before RRF merging.
1443+
:param rank_constant: RRF rank constant.
1444+
:returns: List of Documents ranked by RRF score.
1445+
"""
1446+
self._ensure_initialized()
1447+
body = self._create_hybrid_retrieval_inference_body(
1448+
query=query,
1449+
inference_id=inference_id,
1450+
filters=filters,
1451+
fuzziness=fuzziness,
1452+
top_k=top_k,
1453+
rank_window_size=rank_window_size,
1454+
rank_constant=rank_constant,
1455+
)
1456+
return await self._search_documents_async(**body)
1457+
13121458
def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
13131459
"""
13141460
Returns the number of documents that match the provided filters.

0 commit comments

Comments
 (0)