Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b223aa3
feat: add sparse vector storage to ElasticsearchDocumentStore (#2939)
GunaPalanivel Mar 19, 2026
17ce682
test: update retriever tests for new ElasticsearchDocumentStore seria…
GunaPalanivel Mar 19, 2026
29541c2
test: add sync and async tests for sparse vector storage
GunaPalanivel Mar 19, 2026
b341345
style: fix B905 (strict zip) and E501 (line length) linting errors
GunaPalanivel Mar 19, 2026
8f85f8d
style: fix mypy type inference for _default_mappings
GunaPalanivel Mar 19, 2026
b8f77c1
refactor: address PR review feedback for sparse vector storage
GunaPalanivel Mar 21, 2026
766def9
test: address PR review feedback for sparse vector tests
GunaPalanivel Mar 21, 2026
4f9def6
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 24, 2026
a78cbdf
fixing docstrings
davidsbatista Mar 24, 2026
884e1c5
just as a safeguard original custom_mapping dict is left unchanged
davidsbatista Mar 24, 2026
98c096d
organising imports
davidsbatista Mar 24, 2026
1187f89
formatting
davidsbatista Mar 24, 2026
e669c49
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 24, 2026
6a60ccd
adding more tests + fixing typing issues
davidsbatista Mar 24, 2026
e3cb2ab
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 26, 2026
4726a28
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 26, 2026
420ecce
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 31, 2026
f9d0755
test: add sparse retriever coverage for Elasticsearch
GunaPalanivel Apr 2, 2026
a9207fb
feat: add ElasticsearchSparseEmbeddingRetriever
GunaPalanivel Apr 2, 2026
94aa07b
fix: use weighted_tokens query for sparse retrieval
GunaPalanivel Apr 2, 2026
f054190
fix: use Elasticsearch 8.15 sparse retrieval
GunaPalanivel Apr 2, 2026
fed4c87
test: preserve sparse-vector mappings in retrieval tests
GunaPalanivel Apr 3, 2026
fc6f158
test: skip sparse query tests on unsupported Elasticsearch
GunaPalanivel Apr 3, 2026
a310896
merge: resolve elasticsearch document store test conflicts
GunaPalanivel Apr 3, 2026
83d73da
style(elasticsearch): format test_document_store
GunaPalanivel Apr 3, 2026
9b40826
Merge branch 'main' into feat/2941-elasticsearch-sparse-embedding-ret…
davidsbatista Apr 9, 2026
ab81bdf
Merge branch 'main' into feat/2941-elasticsearch-sparse-embedding-ret…
davidsbatista Apr 10, 2026
f0d90ee
updating unit tests
davidsbatista Apr 10, 2026
9f4d011
linting and formatting
davidsbatista Apr 10, 2026
dedbdb8
linting and formatting
davidsbatista Apr 10, 2026
7f5a863
Merge branch 'main' into 2941-elasticsearch-sparse-embedding-retriever
davidsbatista Apr 10, 2026
8a65805
using dataclass.replace to update docs
davidsbatista Apr 10, 2026
da94807
Merge branch 'main' into feat/2941-elasticsearch-sparse-embedding-ret…
davidsbatista Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integrations/elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
elasticsearch:
image: "docker.elastic.co/elasticsearch/elasticsearch:8.11.1"
image: "docker.elastic.co/elasticsearch/elasticsearch:8.15.0"
ports:
- 9200:9200
restart: on-failure
Expand All @@ -12,4 +12,4 @@ services:
test: curl --fail http://localhost:9200/_cat/health || exit 1
interval: 10s
timeout: 1s
retries: 10
retries: 10
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from .bm25_retriever import ElasticsearchBM25Retriever
from .elasticsearch_hybrid_retriever import ElasticsearchHybridRetriever
from .embedding_retriever import ElasticsearchEmbeddingRetriever
from .sparse_embedding_retriever import ElasticsearchSparseEmbeddingRetriever
from .sql_retriever import ElasticsearchSQLRetriever

__all__ = [
"ElasticsearchBM25Retriever",
"ElasticsearchEmbeddingRetriever",
"ElasticsearchHybridRetriever",
"ElasticsearchSQLRetriever",
"ElasticsearchSparseEmbeddingRetriever",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from typing import Any

from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import Document
from haystack.dataclasses.sparse_embedding import SparseEmbedding
from haystack.document_stores.types import FilterPolicy
from haystack.document_stores.types.filter_policy import apply_filter_policy

from haystack_integrations.document_stores.elasticsearch.document_store import ElasticsearchDocumentStore


@component
class ElasticsearchSparseEmbeddingRetriever:
    """
    ElasticsearchSparseEmbeddingRetriever retrieves documents using sparse vector similarity.

    Usage example:
    ```python
    from haystack import Document
    from haystack.dataclasses.sparse_embedding import SparseEmbedding
    from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
    from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchSparseEmbeddingRetriever

    document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200", sparse_vector_field="sparse_vec")
    retriever = ElasticsearchSparseEmbeddingRetriever(document_store=document_store)

    documents = [
        Document(
            content="My name is Carla and I live in Berlin",
            sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.8, 0.4]),
        ),
        Document(
            content="My name is Paul and I live in New York",
            sparse_embedding=SparseEmbedding(indices=[2, 3], values=[0.7, 0.6]),
        ),
    ]
    document_store.write_documents(documents)

    query_sparse_embedding = SparseEmbedding(indices=[0, 1], values=[1.0, 1.0])
    result = retriever.run(query_sparse_embedding=query_sparse_embedding)
    for doc in result["documents"]:
        print(doc.content)
    ```
    """

    def __init__(
        self,
        *,
        document_store: ElasticsearchDocumentStore,
        filters: dict[str, Any] | None = None,
        top_k: int = 10,
        filter_policy: str | FilterPolicy = FilterPolicy.REPLACE,
    ) -> None:
        """
        Create the ElasticsearchSparseEmbeddingRetriever component.

        :param document_store: An instance of ElasticsearchDocumentStore.
        :param filters: Filters applied to the retrieved Documents.
        :param top_k: Maximum number of Documents to return.
        :param filter_policy: Policy to determine how filters are applied.
        :raises ValueError: If `document_store` is not an instance of ElasticsearchDocumentStore.
        """
        if not isinstance(document_store, ElasticsearchDocumentStore):
            msg = "document_store must be an instance of ElasticsearchDocumentStore"
            raise ValueError(msg)

        self._document_store = document_store
        self._filters = filters or {}
        self._top_k = top_k
        # Accept either a FilterPolicy member or its string form (e.g. "replace").
        self._filter_policy = FilterPolicy.from_str(filter_policy) if isinstance(filter_policy, str) else filter_policy

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            filters=self._filters,
            top_k=self._top_k,
            filter_policy=self._filter_policy.value,
            document_store=self._document_store.to_dict(),
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ElasticsearchSparseEmbeddingRetriever":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :returns:
            Deserialized component.
        """
        data["init_parameters"]["document_store"] = ElasticsearchDocumentStore.from_dict(
            data["init_parameters"]["document_store"]
        )
        if filter_policy := data["init_parameters"].get("filter_policy"):
            data["init_parameters"]["filter_policy"] = FilterPolicy.from_str(filter_policy)
        return default_from_dict(cls, data)

    @component.output_types(documents=list[Document])
    def run(
        self,
        query_sparse_embedding: SparseEmbedding,
        filters: dict[str, Any] | None = None,
        top_k: int | None = None,
    ) -> dict[str, list[Document]]:
        """
        Retrieve documents using sparse vector similarity.

        :param query_sparse_embedding: Sparse embedding of the query.
        :param filters: Filters applied when fetching documents from the Document Store.
            The way runtime filters are applied depends on the `filter_policy` selected when initializing the
            Retriever.
        :param top_k: Maximum number of documents to return.
        :returns: A dictionary with the following keys:
            - `documents`: List of `Document`s most similar to the given `query_sparse_embedding`
        """
        filters = apply_filter_policy(self._filter_policy, self._filters, filters)
        # Compare against None explicitly: `top_k or self._top_k` would silently
        # replace a caller-supplied top_k=0 with the default.
        docs = self._document_store._sparse_vector_retrieval(
            query_sparse_embedding=query_sparse_embedding,
            filters=filters,
            top_k=top_k if top_k is not None else self._top_k,
        )
        return {"documents": docs}

    @component.output_types(documents=list[Document])
    async def run_async(
        self,
        query_sparse_embedding: SparseEmbedding,
        filters: dict[str, Any] | None = None,
        top_k: int | None = None,
    ) -> dict[str, list[Document]]:
        """
        Asynchronously retrieve documents using sparse vector similarity.

        :param query_sparse_embedding: Sparse embedding of the query.
        :param filters: Filters applied when fetching documents from the Document Store.
            The way runtime filters are applied depends on the `filter_policy` selected when initializing the
            Retriever.
        :param top_k: Maximum number of documents to return.
        :returns: A dictionary with the following keys:
            - `documents`: List of `Document`s most similar to the given `query_sparse_embedding`
        """
        filters = apply_filter_policy(self._filter_policy, self._filters, filters)
        # Same explicit None check as in `run` so top_k=0 is respected.
        docs = await self._document_store._sparse_vector_retrieval_async(
            query_sparse_embedding=query_sparse_embedding,
            filters=filters,
            top_k=top_k if top_k is not None else self._top_k,
        )
        return {"documents": docs}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from elastic_transport import NodeConfig
from haystack import default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
from haystack.dataclasses.sparse_embedding import SparseEmbedding
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret
Expand Down Expand Up @@ -445,6 +446,22 @@ def _deserialize_document(self, hit: dict[str, Any]) -> Document:

return Document.from_dict(data)

@staticmethod
def _sparse_embedding_to_es_vector(indices: list[int], values: list[float]) -> dict[str, float]:
"""
Converts sparse embedding indices and values to Elasticsearch sparse_vector format.

:param indices: Sparse embedding indices.
:param values: Sparse embedding values.
:returns: Sparse embedding in Elasticsearch sparse_vector format.
:raises ValueError: If indices or values are empty.
"""
if not indices or not values:
msg = "query_sparse_embedding must contain non-empty indices and values"
raise ValueError(msg)

return {str(idx): val for idx, val in zip(indices, values, strict=True)}

def _handle_sparse_embedding(self, doc_dict: dict[str, Any], doc_id: str) -> None:
"""
Extracts the sparse_embedding from a document dict and converts it to the Elasticsearch sparse_vector format.
Expand All @@ -458,9 +475,9 @@ def _handle_sparse_embedding(self, doc_dict: dict[str, Any], doc_id: str) -> Non
if not sparse_embedding:
return
if self._sparse_vector_field:
doc_dict[self._sparse_vector_field] = {
str(idx): val for idx, val in zip(sparse_embedding["indices"], sparse_embedding["values"], strict=True)
}
doc_dict[self._sparse_vector_field] = self._sparse_embedding_to_es_vector(
sparse_embedding["indices"], sparse_embedding["values"]
)
else:
logger.warning(
"Document {doc_id} has the `sparse_embedding` field set, "
Expand All @@ -469,6 +486,50 @@ def _handle_sparse_embedding(self, doc_dict: dict[str, Any], doc_id: str) -> Non
doc_id=doc_id,
)

def _create_sparse_retrieval_body(
    self,
    query_sparse_embedding: SparseEmbedding,
    *,
    filters: dict[str, Any] | None = None,
    top_k: int = 10,
) -> dict[str, Any]:
    """
    Builds the Elasticsearch search body for sparse vector retrieval.

    :param query_sparse_embedding: Sparse embedding to search for.
    :param filters: Optional filters to narrow down the search space.
    :param top_k: Maximum number of documents to return.
    :returns: Search body for Elasticsearch.
    :raises ValueError: If sparse retrieval is not configured or the query sparse embedding is empty.
    """
    # Sparse retrieval only works when the store was created with a sparse_vector_field.
    if not self._sparse_vector_field:
        msg = "sparse_vector_field must be set for sparse vector retrieval"
        raise ValueError(msg)

    sparse_clause = {
        "sparse_vector": {
            "field": self._sparse_vector_field,
            "query_vector": self._sparse_embedding_to_es_vector(
                query_sparse_embedding.indices, query_sparse_embedding.values
            ),
        }
    }
    bool_query: dict[str, Any] = {"must": [sparse_clause]}

    if filters:
        bool_query["filter"] = _normalize_filters(filters)

    return {"size": top_k, "query": {"bool": bool_query}}

def write_documents(
self,
documents: list[Document],
Expand Down Expand Up @@ -1086,6 +1147,53 @@ async def _embedding_retrieval_async(

return await self._search_documents_async(**search_body)

def _sparse_vector_retrieval(
    self,
    query_sparse_embedding: SparseEmbedding,
    *,
    filters: dict[str, Any] | None = None,
    top_k: int = 10,
) -> list[Document]:
    """
    Retrieves documents using sparse vector similarity search.

    :param query_sparse_embedding: Sparse embedding to search for.
    :param filters: Optional filters to narrow down the search space.
    :param top_k: Maximum number of documents to return.
    :returns: List of Documents most similar to query_sparse_embedding.
    :raises ValueError: If sparse retrieval is not configured or the query sparse embedding is empty.
    """
    # NOTE(review): unlike the async variant, this path does not call
    # self._ensure_initialized(); presumably the sync client is created eagerly — confirm.
    search_body = self._create_sparse_retrieval_body(
        query_sparse_embedding=query_sparse_embedding, filters=filters, top_k=top_k
    )
    return self._search_documents(**search_body)

async def _sparse_vector_retrieval_async(
    self,
    query_sparse_embedding: SparseEmbedding,
    *,
    filters: dict[str, Any] | None = None,
    top_k: int = 10,
) -> list[Document]:
    """
    Asynchronously retrieves documents using sparse vector similarity search.

    :param query_sparse_embedding: Sparse embedding to search for.
    :param filters: Optional filters to narrow down the search space.
    :param top_k: Maximum number of documents to return.
    :returns: List of Documents most similar to query_sparse_embedding.
    :raises ValueError: If sparse retrieval is not configured or the query sparse embedding is empty.
    """
    # Make sure the async client exists before issuing the search.
    self._ensure_initialized()
    body = self._create_sparse_retrieval_body(
        query_sparse_embedding=query_sparse_embedding, filters=filters, top_k=top_k
    )
    return await self._search_documents_async(**body)

def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
"""
Returns the number of documents that match the provided filters.
Expand Down
34 changes: 34 additions & 0 deletions integrations/elasticsearch/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,21 @@
import uuid

import pytest
from elasticsearch import Elasticsearch

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore


@pytest.fixture(scope="session")
def supports_sparse_vector_query() -> bool:
    """
    Session-scoped probe: True if the local Elasticsearch is >= 8.15.0
    (the first version supporting the `sparse_vector` query), False if the
    server is unreachable or its version cannot be parsed.
    """
    client = None
    try:
        client = Elasticsearch(["http://localhost:9200"])
        version = client.info()["version"]["number"]
        major, minor, patch = (int(x) for x in version.split(".")[:3])
    except Exception:
        # Unreachable server or unparsable version string: treat as unsupported.
        return False
    finally:
        # The original leaked the client; close it so the session fixture
        # does not hold a connection pool open for the whole test run.
        if client is not None:
            client.close()
    return (major, minor, patch) >= (8, 15, 0)


def _get_unique_index_name() -> str:
"""
Generate a unique, valid Elasticsearch index name for test isolation.
Expand All @@ -19,6 +30,29 @@ def _get_unique_index_name() -> str:
return f"test_sql_{uuid.uuid4().hex}"


@pytest.fixture
def sparse_document_store(supports_sparse_vector_query):
    """
    Document store fixture with sparse_vector_field configured.
    Automatically skips if the running Elasticsearch instance is < 8.15.0.
    """
    if not supports_sparse_vector_query:
        pytest.skip("Requires Elasticsearch >= 8.15.0")

    index_name = f"test_sparse_{uuid.uuid4().hex}"
    document_store = ElasticsearchDocumentStore(
        hosts=["http://localhost:9200"],
        index=index_name,
        sparse_vector_field="sparse_vec",
    )
    yield document_store

    # Teardown: make sure the clients exist, drop the test index (ignoring
    # "already gone" / "never created" statuses), then close both clients.
    document_store._ensure_initialized()
    document_store.client.options(ignore_status=[400, 404]).indices.delete(index=index_name)
    document_store.client.close()
    if document_store._async_client is not None:
        asyncio.run(document_store._async_client.close())


@pytest.fixture
def document_store():
"""
Expand Down
Loading
Loading