Skip to content

Commit 13b8103

Browse files
Merge branch 'main' into filter-options-qdrant
2 parents 05cba20 + fe2dd54 commit 13b8103

10 files changed

Lines changed: 937 additions & 1 deletion

File tree

integrations/elasticsearch/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## [integrations/elasticsearch-v5.2.0] - 2026-02-02
4+
5+
### 🚀 Features
6+
7+
- Add SQLRetriever to ElasticsearchDocumentStore (#2801)
8+
9+
310
## [integrations/elasticsearch-v5.1.1] - 2026-01-29
411

512
### 🐛 Bug Fixes

integrations/elasticsearch/pydoc/config_docusaurus.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ loaders:
44
modules:
55
- haystack_integrations.components.retrievers.elasticsearch.bm25_retriever
66
- haystack_integrations.components.retrievers.elasticsearch.embedding_retriever
7+
- haystack_integrations.components.retrievers.elasticsearch.sql_retriever
78
- haystack_integrations.document_stores.elasticsearch.document_store
89
- haystack_integrations.document_stores.elasticsearch.filters
910
search_path:

integrations/elasticsearch/src/haystack_integrations/components/retrievers/elasticsearch/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,10 @@
33
# SPDX-License-Identifier: Apache-2.0
44
from .bm25_retriever import ElasticsearchBM25Retriever
55
from .embedding_retriever import ElasticsearchEmbeddingRetriever
6+
from .sql_retriever import ElasticsearchSQLRetriever
67

7-
__all__ = ["ElasticsearchBM25Retriever", "ElasticsearchEmbeddingRetriever"]
8+
__all__ = [
9+
"ElasticsearchBM25Retriever",
10+
"ElasticsearchEmbeddingRetriever",
11+
"ElasticsearchSQLRetriever",
12+
]
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
from typing import Any
6+
7+
from haystack import component, default_from_dict, default_to_dict, logging
8+
9+
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
@component
15+
class ElasticsearchSQLRetriever:
16+
"""
17+
Executes raw Elasticsearch SQL queries against an ElasticsearchDocumentStore.
18+
19+
This component allows you to execute SQL queries directly against the Elasticsearch index,
20+
which is useful for fetching metadata, aggregations, and other structured data at runtime.
21+
22+
Returns the raw JSON response from the Elasticsearch SQL API.
23+
24+
Usage example:
25+
```python
26+
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
27+
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchSQLRetriever
28+
29+
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
30+
retriever = ElasticsearchSQLRetriever(document_store=document_store)
31+
32+
result = retriever.run(
33+
query="SELECT content, category FROM \\"my_index\\" WHERE category = 'A'"
34+
)
35+
# result["result"] contains the raw Elasticsearch JSON response
36+
```
37+
"""
38+
39+
def __init__(
40+
self,
41+
*,
42+
document_store: ElasticsearchDocumentStore,
43+
raise_on_failure: bool = True,
44+
fetch_size: int | None = None,
45+
):
46+
"""
47+
Creates the ElasticsearchSQLRetriever component.
48+
49+
:param document_store: An instance of ElasticsearchDocumentStore to use with the Retriever.
50+
:param raise_on_failure:
51+
Whether to raise an exception if the API call fails. Otherwise, log a warning and return an empty dict.
52+
:param fetch_size: Optional number of results to fetch per page. If not provided, the default
53+
fetch size set in Elasticsearch is used.
54+
55+
:raises ValueError: If `document_store` is not an instance of ElasticsearchDocumentStore.
56+
"""
57+
if not isinstance(document_store, ElasticsearchDocumentStore):
58+
msg = "document_store must be an instance of ElasticsearchDocumentStore"
59+
raise ValueError(msg)
60+
61+
self._document_store = document_store
62+
self._raise_on_failure = raise_on_failure
63+
self._fetch_size = fetch_size
64+
65+
def to_dict(self) -> dict[str, Any]:
66+
"""
67+
Serializes the component to a dictionary.
68+
69+
:returns:
70+
Dictionary with serialized data.
71+
"""
72+
return default_to_dict(
73+
self,
74+
document_store=self._document_store.to_dict(),
75+
raise_on_failure=self._raise_on_failure,
76+
fetch_size=self._fetch_size,
77+
)
78+
79+
@classmethod
80+
def from_dict(cls, data: dict[str, Any]) -> "ElasticsearchSQLRetriever":
81+
"""
82+
Deserializes the component from a dictionary.
83+
84+
:param data:
85+
Dictionary to deserialize from.
86+
87+
:returns:
88+
Deserialized component.
89+
"""
90+
data["init_parameters"]["document_store"] = ElasticsearchDocumentStore.from_dict(
91+
data["init_parameters"]["document_store"]
92+
)
93+
return default_from_dict(cls, data)
94+
95+
@component.output_types(result=dict[str, Any])
96+
def run(
97+
self,
98+
query: str,
99+
document_store: ElasticsearchDocumentStore | None = None,
100+
fetch_size: int | None = None,
101+
) -> dict[str, dict[str, Any]]:
102+
"""
103+
Execute a raw Elasticsearch SQL query against the index.
104+
105+
:param query: The Elasticsearch SQL query to execute.
106+
:param document_store: Optionally, an instance of ElasticsearchDocumentStore to use with the Retriever.
107+
:param fetch_size: Optional number of results to fetch per page. If not provided, uses the value
108+
specified during initialization, or the default fetch size set in Elasticsearch.
109+
110+
:returns:
111+
A dictionary containing the raw JSON response from Elasticsearch SQL API:
112+
- result: The raw JSON response from Elasticsearch (dict) or empty dict on error.
113+
114+
Example:
115+
```python
116+
retriever = ElasticsearchSQLRetriever(document_store=document_store)
117+
result = retriever.run(
118+
query="SELECT content, category FROM \\"my_index\\" WHERE category = 'A'"
119+
)
120+
# result["result"] contains the raw Elasticsearch JSON response
121+
# result["result"]["columns"] contains column metadata
122+
# result["result"]["rows"] contains the data rows
123+
```
124+
"""
125+
if document_store is not None:
126+
if not isinstance(document_store, ElasticsearchDocumentStore):
127+
msg = "document_store must be an instance of ElasticsearchDocumentStore"
128+
raise ValueError(msg)
129+
doc_store = document_store
130+
else:
131+
doc_store = self._document_store
132+
133+
fetch_size = fetch_size if fetch_size is not None else self._fetch_size
134+
135+
try:
136+
result = doc_store._query_sql(query=query, fetch_size=fetch_size)
137+
except Exception as e:
138+
if self._raise_on_failure:
139+
raise e
140+
else:
141+
logger.warning(
142+
"An error during SQL query execution occurred and will be ignored by returning empty dict: {error}",
143+
error=str(e),
144+
exc_info=True,
145+
)
146+
result = {}
147+
148+
return {"result": result}
149+
150+
@component.output_types(result=dict[str, Any])
151+
async def run_async(
152+
self,
153+
query: str,
154+
document_store: ElasticsearchDocumentStore | None = None,
155+
fetch_size: int | None = None,
156+
) -> dict[str, dict[str, Any]]:
157+
"""
158+
Asynchronously execute a raw Elasticsearch SQL query against the index.
159+
160+
:param query: The Elasticsearch SQL query to execute.
161+
:param document_store: Optionally, an instance of ElasticsearchDocumentStore to use with the Retriever.
162+
:param fetch_size: Optional number of results to fetch per page. If not provided, uses the value
163+
specified during initialization, or the default fetch size set in Elasticsearch.
164+
165+
:returns:
166+
A dictionary containing the raw JSON response from Elasticsearch SQL API:
167+
- result: The raw JSON response from Elasticsearch (dict) or empty dict on error.
168+
169+
Example:
170+
```python
171+
retriever = ElasticsearchSQLRetriever(document_store=document_store)
172+
result = await retriever.run_async(
173+
query="SELECT content, category FROM \\"my_index\\" WHERE category = 'A'"
174+
)
175+
# result["result"] contains the raw Elasticsearch JSON response
176+
# result["result"]["columns"] contains column metadata
177+
# result["result"]["rows"] contains the data rows
178+
```
179+
"""
180+
if document_store is not None:
181+
if not isinstance(document_store, ElasticsearchDocumentStore):
182+
msg = "document_store must be an instance of ElasticsearchDocumentStore"
183+
raise ValueError(msg)
184+
doc_store = document_store
185+
else:
186+
doc_store = self._document_store
187+
188+
fetch_size = fetch_size if fetch_size is not None else self._fetch_size
189+
190+
try:
191+
result = await doc_store._query_sql_async(query=query, fetch_size=fetch_size)
192+
except Exception as e:
193+
if self._raise_on_failure:
194+
raise e
195+
else:
196+
logger.warning(
197+
"An error during SQL query execution occurred and will be ignored by returning empty dict: {error}",
198+
error=str(e),
199+
exc_info=True,
200+
)
201+
result = {}
202+
203+
return {"result": result}

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,3 +1475,61 @@ async def get_metadata_field_unique_values_async(
14751475
after_key = None
14761476

14771477
return unique_values, after_key
1478+
1479+
def _query_sql(self, query: str, fetch_size: int | None = None) -> dict[str, Any]:
1480+
"""
1481+
Execute a raw Elasticsearch SQL query against the index.
1482+
1483+
This method is not meant to be part of the public interface of
1484+
`ElasticsearchDocumentStore` nor called directly.
1485+
`ElasticsearchSQLRetriever` uses this method directly and is the public interface for it.
1486+
1487+
See `ElasticsearchSQLRetriever` for more information.
1488+
1489+
:param query: The Elasticsearch SQL query to execute
1490+
:param fetch_size: Optional number of results to fetch per page.
1491+
:returns: The raw JSON response from Elasticsearch SQL API.
1492+
"""
1493+
self._ensure_initialized()
1494+
assert self._client is not None
1495+
1496+
try:
1497+
body: dict[str, Any] = {"query": query}
1498+
if fetch_size is not None:
1499+
body["fetch_size"] = fetch_size
1500+
1501+
response = self._client.sql.query(body=body)
1502+
1503+
return dict(response)
1504+
except Exception as e:
1505+
msg = f"Failed to execute SQL query in Elasticsearch: {e!s}"
1506+
raise DocumentStoreError(msg) from e
1507+
1508+
async def _query_sql_async(self, query: str, fetch_size: int | None = None) -> dict[str, Any]:
1509+
"""
1510+
Asynchronously execute a raw Elasticsearch SQL query against the index.
1511+
1512+
This method is not meant to be part of the public interface of
1513+
`ElasticsearchDocumentStore` nor called directly.
1514+
`ElasticsearchSQLRetriever` uses this method directly and is the public interface for it.
1515+
1516+
See `ElasticsearchSQLRetriever` for more information.
1517+
1518+
:param query: The Elasticsearch SQL query to execute
1519+
:param fetch_size: Optional number of results to fetch per page.
1520+
:returns: The raw JSON response from Elasticsearch SQL API.
1521+
"""
1522+
self._ensure_initialized()
1523+
assert self._async_client is not None
1524+
1525+
try:
1526+
body: dict[str, Any] = {"query": query}
1527+
if fetch_size is not None:
1528+
body["fetch_size"] = fetch_size
1529+
1530+
response = await self._async_client.sql.query(body=body)
1531+
1532+
return dict(response)
1533+
except Exception as e:
1534+
msg = f"Failed to execute SQL query in Elasticsearch: {e!s}"
1535+
raise DocumentStoreError(msg) from e
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import uuid
6+
7+
import pytest
8+
9+
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
10+
11+
12+
def _get_unique_index_name() -> str:
13+
"""
14+
Generate a unique, valid Elasticsearch index name for test isolation.
15+
16+
Each test gets its own index to enable parallel test execution without conflicts.
17+
"""
18+
return f"test_sql_{uuid.uuid4().hex}"
19+
20+
21+
@pytest.fixture
22+
def document_store():
23+
"""
24+
Document store fixture for SQL retriever integration tests.
25+
"""
26+
hosts = ["http://localhost:9200"]
27+
index = _get_unique_index_name()
28+
embedding_similarity_function = "max_inner_product"
29+
30+
store = ElasticsearchDocumentStore(
31+
hosts=hosts,
32+
index=index,
33+
embedding_similarity_function=embedding_similarity_function,
34+
)
35+
yield store
36+
37+
store._ensure_initialized()
38+
store.client.options(ignore_status=[400, 404]).indices.delete(index=index)
39+
store.client.close()
40+
41+
42+
@pytest.fixture
43+
def document_store_2():
44+
"""
45+
Second document store fixture for runtime document store switching tests.
46+
"""
47+
hosts = ["http://localhost:9200"]
48+
index = f"test_sql_2_{uuid.uuid4().hex}"
49+
embedding_similarity_function = "max_inner_product"
50+
51+
store = ElasticsearchDocumentStore(
52+
hosts=hosts,
53+
index=index,
54+
embedding_similarity_function=embedding_similarity_function,
55+
)
56+
yield store
57+
58+
store._ensure_initialized()
59+
store.client.options(ignore_status=[400, 404]).indices.delete(index=index)
60+
store.client.close()

0 commit comments

Comments
 (0)