Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b223aa3
feat: add sparse vector storage to ElasticsearchDocumentStore (#2939)
GunaPalanivel Mar 19, 2026
17ce682
test: update retriever tests for new ElasticsearchDocumentStore seria…
GunaPalanivel Mar 19, 2026
29541c2
test: add sync and async tests for sparse vector storage
GunaPalanivel Mar 19, 2026
b341345
style: fix B905 (strict zip) and E501 (line length) linting errors
GunaPalanivel Mar 19, 2026
8f85f8d
style: fix mypy type inference for _default_mappings
GunaPalanivel Mar 19, 2026
b8f77c1
refactor: address PR review feedback for sparse vector storage
GunaPalanivel Mar 21, 2026
766def9
test: address PR review feedback for sparse vector tests
GunaPalanivel Mar 21, 2026
4f9def6
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 24, 2026
a78cbdf
fixing docstrings
davidsbatista Mar 24, 2026
884e1c5
just as a safeguard original custom_mapping dict is left unchanged
davidsbatista Mar 24, 2026
98c096d
organising imports
davidsbatista Mar 24, 2026
1187f89
formatting
davidsbatista Mar 24, 2026
e669c49
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 24, 2026
6a60ccd
adding more tests + fixing typing issues
davidsbatista Mar 24, 2026
e3cb2ab
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 26, 2026
4726a28
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 26, 2026
420ecce
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Mar 31, 2026
f789e81
Merge branch 'main' into elastic-search-sparse-vector
davidsbatista Apr 9, 2026
3dab623
formatting
davidsbatista Apr 9, 2026
a05da64
Merge branch 'main' into fix/2939-sparse-vector-storage
davidsbatista Apr 10, 2026
e2cfd6f
updating unit tests
davidsbatista Apr 10, 2026
82530e1
adding unit tests for _handle_sparse_embedding function
davidsbatista Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import copy

# ruff: noqa: FBT002, FBT001 boolean-type-hint-positional-argument and boolean-default-value-positional-argument
# ruff: noqa: B008 function-call-in-default-argument
# ruff: noqa: S101 disable checks for uses of the assert keyword


from collections.abc import Mapping
from dataclasses import replace
from typing import Any, Literal
Expand Down Expand Up @@ -86,6 +85,7 @@ def __init__(
api_key: Secret | str | None = Secret.from_env_var("ELASTIC_API_KEY", strict=False),
api_key_id: Secret | str | None = Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False),
embedding_similarity_function: Literal["cosine", "dot_product", "l2_norm", "max_inner_product"] = "cosine",
sparse_vector_field: str | None = None,
**kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -117,6 +117,9 @@ def __init__(
To choose the most appropriate function, look for information about your embedding model.
To understand how document scores are computed, see the Elasticsearch
[documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html#dense-vector-params)
:param sparse_vector_field: If set, the name of the Elasticsearch field where sparse embeddings
will be stored using the `sparse_vector` field type. When not set, any `sparse_embedding`
data on Documents is silently dropped during writes.
:param **kwargs: Optional arguments that `Elasticsearch` takes.
"""
self._hosts = hosts
Expand All @@ -126,16 +129,26 @@ def __init__(
self._api_key = api_key
self._api_key_id = api_key_id
self._embedding_similarity_function = embedding_similarity_function
self._sparse_vector_field = sparse_vector_field
self._custom_mapping = custom_mapping
self._kwargs = kwargs
self._initialized = False

if self._sparse_vector_field and self._sparse_vector_field in SPECIAL_FIELDS:
msg = f"sparse_vector_field '{self._sparse_vector_field}' conflicts with a reserved field name."
raise ValueError(msg)

if self._custom_mapping and not isinstance(self._custom_mapping, dict):
msg = "custom_mapping must be a dictionary"
raise ValueError(msg)

if self._custom_mapping and self._sparse_vector_field:
self._custom_mapping = copy.deepcopy(custom_mapping) # original custom_mapping dict is left unchanged
self._custom_mapping.setdefault("properties", {}) # type: ignore # can't be None here
self._custom_mapping["properties"][self._sparse_vector_field] = {"type": "sparse_vector"} # type: ignore # can't be None here

if not self._custom_mapping:
self._default_mappings = {
self._default_mappings: dict[str, Any] = {
"properties": {
"embedding": {
"type": "dense_vector",
Expand All @@ -156,6 +169,8 @@ def __init__(
}
],
}
if self._sparse_vector_field:
self._default_mappings["properties"][self._sparse_vector_field] = {"type": "sparse_vector"}

def _ensure_initialized(self) -> None:
"""
Expand Down Expand Up @@ -277,6 +292,7 @@ def to_dict(self) -> dict[str, Any]:
api_key=self._api_key.to_dict() if isinstance(self._api_key, Secret) else None,
api_key_id=self._api_key_id.to_dict() if isinstance(self._api_key_id, Secret) else None,
embedding_similarity_function=self._embedding_similarity_function,
sparse_vector_field=self._sparse_vector_field,
**self._kwargs,
)

Expand Down Expand Up @@ -404,12 +420,11 @@ async def filter_documents_async(self, filters: dict[str, Any] | None = None) ->
documents = await self._search_documents_async(query=query)
return documents

@staticmethod
def _deserialize_document(hit: dict[str, Any]) -> Document:
def _deserialize_document(self, hit: dict[str, Any]) -> Document:
"""
Creates a `Document` from the search hit provided.

This is mostly useful in self.filter_documents().
This is mostly useful in self.filter_documents() and self.filter_documents_async().

:param hit: A search hit from Elasticsearch.
:returns: `Document` created from the search hit.
Expand All @@ -420,8 +435,40 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
data["metadata"]["highlighted"] = hit["highlight"]
data["score"] = hit["_score"]

if self._sparse_vector_field and self._sparse_vector_field in data:
es_sparse = data.pop(self._sparse_vector_field)
sorted_items = sorted(es_sparse.items(), key=lambda x: int(x[0]))
data["sparse_embedding"] = {
"indices": [int(k) for k, _ in sorted_items],
"values": [v for _, v in sorted_items],
}

return Document.from_dict(data)

def _handle_sparse_embedding(self, doc_dict: dict[str, Any], doc_id: str) -> None:
"""
Extracts the sparse_embedding from a document dict and converts it to the Elasticsearch sparse_vector format.

:param doc_dict: The dictionary representation of the document.
:param doc_id: The document ID, used for warning messages.
"""
if "sparse_embedding" not in doc_dict:
return
sparse_embedding = doc_dict.pop("sparse_embedding")
if not sparse_embedding:
return
if self._sparse_vector_field:
doc_dict[self._sparse_vector_field] = {
str(idx): val for idx, val in zip(sparse_embedding["indices"], sparse_embedding["values"], strict=True)
}
else:
logger.warning(
"Document {doc_id} has the `sparse_embedding` field set, "
"but `sparse_vector_field` is not configured for this ElasticsearchDocumentStore. "
"The `sparse_embedding` field will be ignored.",
doc_id=doc_id,
)

def write_documents(
self,
documents: list[Document],
Expand Down Expand Up @@ -457,16 +504,7 @@ def write_documents(
elasticsearch_actions = []
for doc in documents:
doc_dict = doc.to_dict()

if "sparse_embedding" in doc_dict:
sparse_embedding = doc_dict.pop("sparse_embedding", None)
if sparse_embedding:
logger.warning(
"Document {doc_id} has the `sparse_embedding` field set,"
"but storing sparse embeddings in Elasticsearch is not currently supported."
"The `sparse_embedding` field will be ignored.",
doc_id=doc.id,
)
self._handle_sparse_embedding(doc_dict, doc.id)
elasticsearch_actions.append(
{
"_op_type": action,
Expand Down Expand Up @@ -544,16 +582,7 @@ async def write_documents_async(
actions = []
for doc in documents:
doc_dict = doc.to_dict()

if "sparse_embedding" in doc_dict:
sparse_embedding = doc_dict.pop("sparse_embedding", None)
if sparse_embedding:
logger.warning(
"Document {doc_id} has the `sparse_embedding` field set,"
"but storing sparse embeddings in Elasticsearch is not currently supported."
"The `sparse_embedding` field will be ignored.",
doc_id=doc.id,
)
self._handle_sparse_embedding(doc_dict, doc.id)

action = {
"_op_type": "create" if policy == DuplicatePolicy.FAIL else "index",
Expand Down
5 changes: 3 additions & 2 deletions integrations/elasticsearch/tests/test_bm25_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_to_dict(_mock_elasticsearch_client):
"custom_mapping": None,
"index": "default",
"embedding_similarity_function": "cosine",
"sparse_vector_field": None,
},
"type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore",
},
Expand All @@ -74,7 +75,7 @@ def test_from_dict(_mock_elasticsearch_client):
"type": "haystack_integrations.components.retrievers.elasticsearch.bm25_retriever.ElasticsearchBM25Retriever",
"init_parameters": {
"document_store": {
"init_parameters": {"hosts": "some fake host", "index": "default"},
"init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None},
"type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore",
},
"filters": {},
Expand All @@ -99,7 +100,7 @@ def test_from_dict_no_filter_policy(_mock_elasticsearch_client):
"type": "haystack_integrations.components.retrievers.elasticsearch.bm25_retriever.ElasticsearchBM25Retriever",
"init_parameters": {
"document_store": {
"init_parameters": {"hosts": "some fake host", "index": "default"},
"init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None},
"type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore",
},
"filters": {},
Expand Down
Loading
Loading