Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
api_key_id: Secret | str | None = Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False),
embedding_similarity_function: Literal["cosine", "dot_product", "l2_norm", "max_inner_product"] = "cosine",
sparse_vector_field: str | None = None,
ingest_pipeline: str | None = None,
**kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -122,6 +123,27 @@ def __init__(
:param sparse_vector_field: If set, the name of the Elasticsearch field where sparse embeddings
will be stored using the `sparse_vector` field type. When not set, any `sparse_embedding`
data on Documents is silently dropped during writes.
:param ingest_pipeline: If set, the id of an Elasticsearch ingest pipeline to run on each bulk
index or create. This is the recommended way to generate embeddings at index time using
Elasticsearch's inference processors (e.g. ELSER or a dense model) without running a
Haystack embedder component. Leading and trailing whitespace is stripped.

Requirements when using inference processors:

- Configure the processor with ``input_output`` so the embedding is written directly
to the right field: ``output_field`` must match ``"embedding"`` (for dense retrieval)
or the value of ``sparse_vector_field`` (for ELSER / sparse retrieval). The ES default
target ``ml.inference.<tag>`` will not be found by Haystack's retrievers.
- Do **not** also run a Haystack ``DocumentEmbedder`` upstream. If documents arrive with
a pre-computed ``embedding``, the pipeline will overwrite it with its own model's
vectors, causing a silent mismatch between stored and query embeddings at retrieval time.
- If you supply ``custom_mapping``, include the output field with the correct type
(``dense_vector`` or ``sparse_vector``).

Sparse embedding note: Elasticsearch does not store ``sparse_vector`` data generated
by inference pipelines in ``_source``; it goes only into the inverted index. Haystack
works around this by requesting the field via the ES ``fields`` API on every search so
that ``Document.sparse_embedding`` is populated correctly on returned documents.
:param **kwargs: Optional arguments that `Elasticsearch` takes.
"""
self._hosts = hosts
Expand All @@ -132,6 +154,14 @@ def __init__(
self._api_key_id = api_key_id
self._embedding_similarity_function = embedding_similarity_function
self._sparse_vector_field = sparse_vector_field
if ingest_pipeline is not None:
stripped_pipeline = ingest_pipeline.strip()
if not stripped_pipeline:
msg = "ingest_pipeline must be a non-empty string when set."
raise ValueError(msg)
self._ingest_pipeline: str | None = stripped_pipeline
else:
self._ingest_pipeline = None
self._custom_mapping = custom_mapping
self._kwargs = kwargs
self._initialized = False
Expand Down Expand Up @@ -295,6 +325,7 @@ def to_dict(self) -> dict[str, Any]:
api_key_id=self._api_key_id.to_dict() if isinstance(self._api_key_id, Secret) else None,
embedding_similarity_function=self._embedding_similarity_function,
sparse_vector_field=self._sparse_vector_field,
ingest_pipeline=self._ingest_pipeline,
**self._kwargs,
)

Expand Down Expand Up @@ -342,6 +373,12 @@ def _search_documents(self, **kwargs: Any) -> list[Document]:
if top_k is None and "knn" in kwargs and "k" in kwargs["knn"]:
top_k = kwargs["knn"]["k"]

# sparse_vector data written by an ingest pipeline is not stored in _source,
# but is retrievable via the fields API. Request it explicitly so that
# _deserialize_document can populate Document.sparse_embedding correctly.
if self._sparse_vector_field and "fields" not in kwargs:
kwargs["fields"] = [self._sparse_vector_field]

documents: list[Document] = []
from_ = 0
# Handle pagination
Expand Down Expand Up @@ -369,6 +406,12 @@ async def _search_documents_async(self, **kwargs: Any) -> list[Document]:
if top_k is None and "knn" in kwargs and "k" in kwargs["knn"]:
top_k = kwargs["knn"]["k"]

# sparse_vector data written by an ingest pipeline is not stored in _source,
# but is retrievable via the fields API. Request it explicitly so that
# _deserialize_document can populate Document.sparse_embedding correctly.
if self._sparse_vector_field and "fields" not in kwargs:
kwargs["fields"] = [self._sparse_vector_field]

documents: list[Document] = []
from_ = 0

Expand Down Expand Up @@ -437,20 +480,33 @@ def _deserialize_document(self, hit: dict[str, Any]) -> Document:
data["metadata"]["highlighted"] = hit["highlight"]
data["score"] = hit["_score"]

if self._sparse_vector_field and self._sparse_vector_field in data:
es_sparse = data.pop(self._sparse_vector_field)
try:
# Haystack SparseEmbedding requires integer indices. Documents indexed via
# Haystack use numeric string keys ("0", "1", ...). Documents indexed by an
# ES inference pipeline (e.g. ELSER) use token-string keys ("berlin", ...) which
# cannot be mapped to integer indices, so sparse_embedding is left unset.
sorted_items = sorted(es_sparse.items(), key=lambda x: int(x[0]))
data["sparse_embedding"] = {
"indices": [int(k) for k, _ in sorted_items],
"values": [v for _, v in sorted_items],
}
except ValueError:
pass
if self._sparse_vector_field:
sparse_field = self._sparse_vector_field
# Prefer the fields API response over _source: sparse_vector data written by an
# ingest pipeline (e.g. ELSER) is indexed but not stored in _source. The fields
# API returns it when requested by _search_documents / _search_documents_async.
# For documents indexed directly by Haystack the data is in _source as usual.
fields_hit = hit.get("fields", {})
if sparse_field in fields_hit:
values = fields_hit[sparse_field] # fields API wraps values in a list
es_sparse = values[0] if values else None
data.pop(sparse_field, None)
else:
es_sparse = data.pop(sparse_field, None)

if es_sparse:
try:
# Haystack SparseEmbedding requires integer indices. Documents indexed via
# Haystack use numeric string keys ("0", "1", ...). Documents indexed by an
# ES inference pipeline (e.g. ELSER) use token-string keys ("berlin", ...) which
# cannot be mapped to integer indices, so sparse_embedding is left unset.
sorted_items = sorted(es_sparse.items(), key=lambda x: int(x[0]))
data["sparse_embedding"] = {
"indices": [int(k) for k, _ in sorted_items],
"values": [v for _, v in sorted_items],
}
except ValueError:
pass

return Document.from_dict(data)

Expand Down Expand Up @@ -623,6 +679,14 @@ def write_documents(
elasticsearch_actions = []
for doc in documents:
doc_dict = doc.to_dict()
# ES rejects null for strongly-typed fields (dense_vector, sparse_vector) when the
# index mapping carries explicit configuration such as `dims`. A missing field is
# always valid — it lets ingest pipelines populate the value at index time, and for
# ordinary writes it simply means no value is stored. We only strip the known
# Haystack document fields here; metadata values are left untouched intentionally.
for field in ("embedding", "blob", "score"):
if doc_dict.get(field) is None:
doc_dict.pop(field, None)
self._handle_sparse_embedding(doc_dict, doc.id)
elasticsearch_actions.append(
{
Expand All @@ -632,14 +696,18 @@ def write_documents(
}
)

documents_written, errors = helpers.bulk(
client=self.client,
actions=elasticsearch_actions,
refresh=refresh,
index=self._index,
raise_on_error=False,
stats_only=False,
)
bulk_kwargs: dict[str, Any] = {
"client": self.client,
"actions": elasticsearch_actions,
"refresh": refresh,
"index": self._index,
"raise_on_error": False,
"stats_only": False,
}
if self._ingest_pipeline is not None:
bulk_kwargs["pipeline"] = self._ingest_pipeline

documents_written, errors = helpers.bulk(**bulk_kwargs)

if errors:
# with stats_only=False, errors is guaranteed to be a list of dicts
Expand Down Expand Up @@ -701,6 +769,14 @@ async def write_documents_async(
actions = []
for doc in documents:
doc_dict = doc.to_dict()
# ES rejects null for strongly-typed fields (dense_vector, sparse_vector) when the
# index mapping carries explicit configuration such as `dims`. A missing field is
# always valid — it lets ingest pipelines populate the value at index time, and for
# ordinary writes it simply means no value is stored. We only strip the known
# Haystack document fields here; metadata values are left untouched intentionally.
for field in ("embedding", "blob", "score"):
if doc_dict.get(field) is None:
doc_dict.pop(field, None)
self._handle_sparse_embedding(doc_dict, doc.id)

action = {
Expand All @@ -710,14 +786,18 @@ async def write_documents_async(
}
actions.append(action)

documents_written, errors = await helpers.async_bulk(
client=self.async_client,
actions=actions,
index=self._index,
refresh=refresh,
raise_on_error=False,
stats_only=False,
)
async_bulk_kwargs: dict[str, Any] = {
"client": self.async_client,
"actions": actions,
"index": self._index,
"refresh": refresh,
"raise_on_error": False,
"stats_only": False,
}
if self._ingest_pipeline is not None:
async_bulk_kwargs["pipeline"] = self._ingest_pipeline

documents_written, errors = await helpers.async_bulk(**async_bulk_kwargs)

if errors:
# with stats_only=False, errors is guaranteed to be a list of dicts
Expand Down
Loading
Loading