diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 898d7a5027..e27394f948 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -88,6 +88,7 @@ def __init__( api_key_id: Secret | str | None = Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False), embedding_similarity_function: Literal["cosine", "dot_product", "l2_norm", "max_inner_product"] = "cosine", sparse_vector_field: str | None = None, + ingest_pipeline: str | None = None, **kwargs: Any, ) -> None: """ @@ -122,6 +123,27 @@ def __init__( :param sparse_vector_field: If set, the name of the Elasticsearch field where sparse embeddings will be stored using the `sparse_vector` field type. When not set, any `sparse_embedding` data on Documents is silently dropped during writes. + :param ingest_pipeline: If set, the id of an Elasticsearch ingest pipeline to run on each bulk + index or create. This is the recommended way to generate embeddings at index time using + Elasticsearch's inference processors (e.g. ELSER or a dense model) without running a + Haystack embedder component. Leading and trailing whitespace is stripped. + + Requirements when using inference processors: + + - Configure the processor with ``input_output`` so the embedding is written directly + to the right field: ``output_field`` must match ``"embedding"`` (for dense retrieval) + or the value of ``sparse_vector_field`` (for ELSER / sparse retrieval). The ES default + target ``ml.inference.`` will not be found by Haystack's retrievers. + - Do **not** also run a Haystack ``DocumentEmbedder`` upstream. 
If documents arrive with + a pre-computed ``embedding``, the pipeline will overwrite it with its own model's + vectors, causing a silent mismatch between stored and query embeddings at retrieval time. + - If you supply ``custom_mapping``, include the output field with the correct type + (``dense_vector`` or ``sparse_vector``). + + Sparse embedding note: Elasticsearch does not store ``sparse_vector`` data generated + by inference pipelines in ``_source``; it goes only into the inverted index. Haystack + requests the field via the ES ``fields`` API on every search. ``Document.sparse_embedding`` is + populated only for numeric-keyed vectors; token-keyed pipeline output (e.g. ELSER) leaves it unset. :param **kwargs: Optional arguments that `Elasticsearch` takes. """ self._hosts = hosts @@ -132,6 +154,14 @@ def __init__( self._api_key_id = api_key_id self._embedding_similarity_function = embedding_similarity_function self._sparse_vector_field = sparse_vector_field + if ingest_pipeline is not None: + stripped_pipeline = ingest_pipeline.strip() + if not stripped_pipeline: + msg = "ingest_pipeline must be a non-empty string when set." + raise ValueError(msg) + self._ingest_pipeline: str | None = stripped_pipeline + else: + self._ingest_pipeline = None self._custom_mapping = custom_mapping self._kwargs = kwargs self._initialized = False @@ -295,6 +325,7 @@ def to_dict(self) -> dict[str, Any]: api_key_id=self._api_key_id.to_dict() if isinstance(self._api_key_id, Secret) else None, embedding_similarity_function=self._embedding_similarity_function, sparse_vector_field=self._sparse_vector_field, + ingest_pipeline=self._ingest_pipeline, **self._kwargs, ) @@ -342,6 +373,12 @@ def _search_documents(self, **kwargs: Any) -> list[Document]: if top_k is None and "knn" in kwargs and "k" in kwargs["knn"]: top_k = kwargs["knn"]["k"] + # sparse_vector data written by an ingest pipeline is not stored in _source, + # but is retrievable via the fields API. 
Request it explicitly so that + # _deserialize_document can populate Document.sparse_embedding correctly. + if self._sparse_vector_field and "fields" not in kwargs: + kwargs["fields"] = [self._sparse_vector_field] + documents: list[Document] = [] from_ = 0 # Handle pagination @@ -369,6 +406,12 @@ async def _search_documents_async(self, **kwargs: Any) -> list[Document]: if top_k is None and "knn" in kwargs and "k" in kwargs["knn"]: top_k = kwargs["knn"]["k"] + # sparse_vector data written by an ingest pipeline is not stored in _source, + # but is retrievable via the fields API. Request it explicitly so that + # _deserialize_document can populate Document.sparse_embedding correctly. + if self._sparse_vector_field and "fields" not in kwargs: + kwargs["fields"] = [self._sparse_vector_field] + documents: list[Document] = [] from_ = 0 @@ -437,20 +480,33 @@ def _deserialize_document(self, hit: dict[str, Any]) -> Document: data["metadata"]["highlighted"] = hit["highlight"] data["score"] = hit["_score"] - if self._sparse_vector_field and self._sparse_vector_field in data: - es_sparse = data.pop(self._sparse_vector_field) - try: - # Haystack SparseEmbedding requires integer indices. Documents indexed via - # Haystack use numeric string keys ("0", "1", ...). Documents indexed by an - # ES inference pipeline (e.g. ELSER) use token-string keys ("berlin", ...) which - # cannot be mapped to integer indices, so sparse_embedding is left unset. - sorted_items = sorted(es_sparse.items(), key=lambda x: int(x[0])) - data["sparse_embedding"] = { - "indices": [int(k) for k, _ in sorted_items], - "values": [v for _, v in sorted_items], - } - except ValueError: - pass + if self._sparse_vector_field: + sparse_field = self._sparse_vector_field + # Prefer the fields API response over _source: sparse_vector data written by an + # ingest pipeline (e.g. ELSER) is indexed but not stored in _source. The fields + # API returns it when requested by _search_documents / _search_documents_async. 
+ # For documents indexed directly by Haystack the data is in _source as usual. + fields_hit = hit.get("fields", {}) + if sparse_field in fields_hit: + values = fields_hit[sparse_field] # fields API wraps values in a list + es_sparse = values[0] if values else None + data.pop(sparse_field, None) + else: + es_sparse = data.pop(sparse_field, None) + + if es_sparse: + try: + # Haystack SparseEmbedding requires integer indices. Documents indexed via + # Haystack use numeric string keys ("0", "1", ...). Documents indexed by an + # ES inference pipeline (e.g. ELSER) use token-string keys ("berlin", ...) which + # cannot be mapped to integer indices, so sparse_embedding is left unset. + sorted_items = sorted(es_sparse.items(), key=lambda x: int(x[0])) + data["sparse_embedding"] = { + "indices": [int(k) for k, _ in sorted_items], + "values": [v for _, v in sorted_items], + } + except ValueError: + pass return Document.from_dict(data) @@ -623,6 +679,14 @@ def write_documents( elasticsearch_actions = [] for doc in documents: doc_dict = doc.to_dict() + # ES rejects null for strongly-typed fields (dense_vector, sparse_vector) when the + # index mapping carries explicit configuration such as `dims`. A missing field is + # always valid — it lets ingest pipelines populate the value at index time, and for + # ordinary writes it simply means no value is stored. We only strip the known + # Haystack document fields here; metadata values are left untouched intentionally. 
+ for field in ("embedding", "blob", "score"): + if doc_dict.get(field) is None: + doc_dict.pop(field, None) self._handle_sparse_embedding(doc_dict, doc.id) elasticsearch_actions.append( { @@ -632,14 +696,18 @@ def write_documents( } ) - documents_written, errors = helpers.bulk( - client=self.client, - actions=elasticsearch_actions, - refresh=refresh, - index=self._index, - raise_on_error=False, - stats_only=False, - ) + bulk_kwargs: dict[str, Any] = { + "client": self.client, + "actions": elasticsearch_actions, + "refresh": refresh, + "index": self._index, + "raise_on_error": False, + "stats_only": False, + } + if self._ingest_pipeline is not None: + bulk_kwargs["pipeline"] = self._ingest_pipeline + + documents_written, errors = helpers.bulk(**bulk_kwargs) if errors: # with stats_only=False, errors is guaranteed to be a list of dicts @@ -701,6 +769,14 @@ async def write_documents_async( actions = [] for doc in documents: doc_dict = doc.to_dict() + # ES rejects null for strongly-typed fields (dense_vector, sparse_vector) when the + # index mapping carries explicit configuration such as `dims`. A missing field is + # always valid — it lets ingest pipelines populate the value at index time, and for + # ordinary writes it simply means no value is stored. We only strip the known + # Haystack document fields here; metadata values are left untouched intentionally. 
+ for field in ("embedding", "blob", "score"): + if doc_dict.get(field) is None: + doc_dict.pop(field, None) self._handle_sparse_embedding(doc_dict, doc.id) action = { @@ -710,14 +786,18 @@ async def write_documents_async( } actions.append(action) - documents_written, errors = await helpers.async_bulk( - client=self.async_client, - actions=actions, - index=self._index, - refresh=refresh, - raise_on_error=False, - stats_only=False, - ) + async_bulk_kwargs: dict[str, Any] = { + "client": self.async_client, + "actions": actions, + "index": self._index, + "refresh": refresh, + "raise_on_error": False, + "stats_only": False, + } + if self._ingest_pipeline is not None: + async_bulk_kwargs["pipeline"] = self._ingest_pipeline + + documents_written, errors = await helpers.async_bulk(**async_bulk_kwargs) if errors: # with stats_only=False, errors is guaranteed to be a list of dicts diff --git a/integrations/elasticsearch/tests/conftest.py b/integrations/elasticsearch/tests/conftest.py index a8c8733212..33d1f1de92 100644 --- a/integrations/elasticsearch/tests/conftest.py +++ b/integrations/elasticsearch/tests/conftest.py @@ -87,22 +87,24 @@ def inference_sparse_document_store(): """ Document store fixture for ElasticsearchInferenceSparseRetriever integration tests. - Connects to a managed Elastic Cloud instance. Requires three environment variables: - - ELASTICSEARCH_URL - cluster endpoint, e.g. https://my-cluster.es.io:443 - - ELASTIC_API_KEY - base64-encoded API key - - ELASTICSEARCH_INFERENCE_ID - deployed inference endpoint, e.g. ".elser-2-elasticsearch" + Connects to a managed Elastic Cloud instance. Requires two environment variables: + - ELASTICSEARCH_URL cluster endpoint, e.g. https://my-cluster.es.io:443 + - ELASTIC_API_KEY base64-encoded API key (id:secret) - Tests that use this fixture are skipped automatically when the variables are absent. 
+ Optional: + - ELASTICSEARCH_INFERENCE_ID sparse inference endpoint to use + (default: ".elser-2-elastic", Elastic's hosted ELSER service + which does not consume local ML node capacity) + + Tests that use this fixture are skipped automatically when the required variables are absent. """ url = os.environ.get("ELASTICSEARCH_URL") api_key = os.environ.get("ELASTIC_API_KEY") - inference_id = os.environ.get("ELASTICSEARCH_INFERENCE_ID") - if not all([url, api_key, inference_id]): - pytest.skip("Set ELASTICSEARCH_URL, ELASTIC_API_KEY and ELASTICSEARCH_INFERENCE_ID to run inference tests") + if not all([url, api_key]): + pytest.skip("Set ELASTICSEARCH_URL and ELASTIC_API_KEY to run inference tests") + + inference_id = os.environ.get("ELASTICSEARCH_INFERENCE_ID", ".elser-2-elastic") index = f"test_inference_sparse_{uuid.uuid4().hex}" store = ElasticsearchDocumentStore( @@ -123,17 +125,144 @@ def inference_sparse_document_store(): @pytest.fixture -def hybrid_inference_document_store(): +def ingest_pipeline_dense_document_store(): """ - Document store fixture for ElasticsearchInferenceHybridRetriever integration tests. + Document store fixture for ingest pipeline tests that generate dense embeddings at index time. - Connects to a managed Elastic Cloud instance. Requires three environment variables: + Connects to a managed Elastic Cloud instance. Requires four environment variables: - ELASTICSEARCH_URL cluster endpoint, e.g. https://my-cluster.es.io:443 - ELASTIC_API_KEY base64-encoded API key - - ELASTICSEARCH_INFERENCE_ID - deployed inference endpoint, e.g. ".elser-2-elasticsearch" + - ELASTICSEARCH_DENSE_INFERENCE_ID + deployed dense inference endpoint, e.g. ".multilingual-e5-small-elasticsearch" + - ELASTICSEARCH_DENSE_EMBEDDING_DIMS + output dimension of the model, e.g. "384" + + The fixture creates a dedicated ingest pipeline and index, then tears both down after the test. + Tests that use this fixture are skipped automatically when the variables are absent. 
+ """ + url = os.environ.get("ELASTICSEARCH_URL") + api_key = os.environ.get("ELASTIC_API_KEY") + + if not all([url, api_key]): + pytest.skip("Set ELASTICSEARCH_URL and ELASTIC_API_KEY to run ingest pipeline dense tests") + + inference_id = os.environ.get("ELASTICSEARCH_DENSE_INFERENCE_ID", ".multilingual-e5-small-elasticsearch") + dims_str = os.environ.get("ELASTICSEARCH_DENSE_EMBEDDING_DIMS", "384") + dims = int(dims_str) + pipeline_id = f"test_dense_ingest_{uuid.uuid4().hex}" + index = f"test_dense_ingest_{uuid.uuid4().hex}" + + raw_client = Elasticsearch(url, api_key=api_key) + store = None + try: + raw_client.ingest.put_pipeline( + id=pipeline_id, + processors=[ + { + "inference": { + "model_id": inference_id, + "input_output": [{"input_field": "content", "output_field": "embedding"}], + } + } + ], + ) + custom_mapping = { + "properties": { + "embedding": {"type": "dense_vector", "dims": dims, "index": True, "similarity": "cosine"}, + "content": {"type": "text"}, + } + } + store = ElasticsearchDocumentStore( + hosts=url, + api_key=Secret.from_token(api_key), + index=index, + ingest_pipeline=pipeline_id, + custom_mapping=custom_mapping, + ) + store._ensure_initialized() + yield store, inference_id + finally: + raw_client.options(ignore_status=[400, 404]).ingest.delete_pipeline(id=pipeline_id) + raw_client.options(ignore_status=[400, 404]).indices.delete(index=index) + raw_client.close() + if store is not None: + if store._client is not None: + store.client.close() + if store._async_client is not None: + asyncio.run(store._async_client.close()) + + +@pytest.fixture +def ingest_pipeline_sparse_document_store(): + """ + Document store fixture for ingest pipeline tests that generate ELSER sparse embeddings at index time. + + Connects to a managed Elastic Cloud instance. Requires two environment variables: + - ELASTICSEARCH_URL cluster endpoint, e.g. 
https://my-cluster.es.io:443 + - ELASTIC_API_KEY base64-encoded API key (id:secret) + + Optional: + - ELASTICSEARCH_INFERENCE_ID sparse inference endpoint to use + (default: ".elser-2-elastic") + + Tests that use this fixture are skipped automatically when the required variables are absent. + """ + url = os.environ.get("ELASTICSEARCH_URL") + api_key = os.environ.get("ELASTIC_API_KEY") + + if not all([url, api_key]): + pytest.skip("Set ELASTICSEARCH_URL and ELASTIC_API_KEY to run ingest pipeline sparse tests") + + inference_id = os.environ.get("ELASTICSEARCH_INFERENCE_ID", ".elser-2-elastic") + pipeline_id = f"test_sparse_ingest_{uuid.uuid4().hex}" + index = f"test_sparse_ingest_{uuid.uuid4().hex}" + sparse_field = "sparse_vec" + + raw_client = Elasticsearch(url, api_key=api_key) + store = None + try: + raw_client.ingest.put_pipeline( + id=pipeline_id, + processors=[ + { + "inference": { + "model_id": inference_id, + "input_output": [{"input_field": "content", "output_field": sparse_field}], + } + } + ], + ) + store = ElasticsearchDocumentStore( + hosts=url, + api_key=Secret.from_token(api_key), + index=index, + ingest_pipeline=pipeline_id, + sparse_vector_field=sparse_field, + ) + store._ensure_initialized() + yield store, inference_id + finally: + raw_client.options(ignore_status=[400, 404]).ingest.delete_pipeline(id=pipeline_id) + raw_client.options(ignore_status=[400, 404]).indices.delete(index=index) + raw_client.close() + if store is not None: + if store._client is not None: + store.client.close() + if store._async_client is not None: + asyncio.run(store._async_client.close()) + + +@pytest.fixture +def hybrid_inference_document_store(): + """ + Document store fixture for ElasticsearchInferenceHybridRetriever integration tests. + + Connects to a managed Elastic Cloud instance. Requires three environment variables: + - ELASTICSEARCH_URL cluster endpoint, e.g. 
https://my-cluster.es.io:443 + - ELASTIC_API_KEY base64-encoded API key (id:secret) + - ELASTICSEARCH_INFERENCE_ID deployed sparse inference endpoint, e.g. ".elser-2-elasticsearch" Tests that use this fixture are skipped automatically when the variables are absent. """ diff --git a/integrations/elasticsearch/tests/test_bm25_retriever.py b/integrations/elasticsearch/tests/test_bm25_retriever.py index 92b7e82794..8f40bbea88 100644 --- a/integrations/elasticsearch/tests/test_bm25_retriever.py +++ b/integrations/elasticsearch/tests/test_bm25_retriever.py @@ -12,6 +12,11 @@ from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore +def test_init_rejects_non_elasticsearch_document_store(): + with pytest.raises(ValueError, match="document_store must be an instance of ElasticsearchDocumentStore"): + ElasticsearchBM25Retriever(document_store=Mock()) + + def test_init_default(): mock_store = Mock(spec=ElasticsearchDocumentStore) retriever = ElasticsearchBM25Retriever(document_store=mock_store) @@ -57,6 +62,7 @@ def test_to_dict(_mock_elasticsearch_client): "index": "default", "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, @@ -75,7 +81,12 @@ def test_from_dict(_mock_elasticsearch_client): "type": "haystack_integrations.components.retrievers.elasticsearch.bm25_retriever.ElasticsearchBM25Retriever", "init_parameters": { "document_store": { - "init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None}, + "init_parameters": { + "hosts": "some fake host", + "index": "default", + "sparse_vector_field": None, + "ingest_pipeline": None, + }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, "filters": {}, @@ -100,7 +111,12 @@ def test_from_dict_no_filter_policy(_mock_elasticsearch_client): "type": 
"haystack_integrations.components.retrievers.elasticsearch.bm25_retriever.ElasticsearchBM25Retriever", "init_parameters": { "document_store": { - "init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None}, + "init_parameters": { + "hosts": "some fake host", + "index": "default", + "sparse_vector_field": None, + "ingest_pipeline": None, + }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, "filters": {}, diff --git a/integrations/elasticsearch/tests/test_inference_sparse_retriever.py b/integrations/elasticsearch/tests/test_cloud_inference_sparse_retriever.py similarity index 95% rename from integrations/elasticsearch/tests/test_inference_sparse_retriever.py rename to integrations/elasticsearch/tests/test_cloud_inference_sparse_retriever.py index 52434e4ace..70b0aeeafa 100644 --- a/integrations/elasticsearch/tests/test_inference_sparse_retriever.py +++ b/integrations/elasticsearch/tests/test_cloud_inference_sparse_retriever.py @@ -1,6 +1,24 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 +# +# Integration tests in TestElasticsearchInferenceSparseRetrieverIntegration connect to a managed +# Elastic Cloud cluster. They are skipped automatically when the required environment variables +# are absent. +# +# Required: +# ELASTICSEARCH_URL - cluster endpoint, e.g. 
https://my-cluster.es.io:443 +# ELASTIC_API_KEY - base64-encoded API key (id:secret) +# +# Optional: +# ELASTICSEARCH_INFERENCE_ID - deployed sparse inference endpoint +# (default: ".elser-2-elastic", Elastic's hosted ELSER service +# which does not consume local ML node capacity) +# +# Example (bash): +# export ELASTICSEARCH_URL="https://my-cluster.es.io:443" +# export ELASTIC_API_KEY="" +# pytest -m integration tests/test_cloud_inference_sparse_retriever.py import uuid from unittest.mock import Mock, patch @@ -68,6 +86,7 @@ def test_to_dict(_mock_elasticsearch_client): "index": "default", "embedding_similarity_function": "cosine", "sparse_vector_field": "sparse_vec", + "ingest_pipeline": None, }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, diff --git a/integrations/elasticsearch/tests/test_cloud_ingest_pipeline.py b/integrations/elasticsearch/tests/test_cloud_ingest_pipeline.py new file mode 100644 index 0000000000..025052eea5 --- /dev/null +++ b/integrations/elasticsearch/tests/test_cloud_ingest_pipeline.py @@ -0,0 +1,335 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +# +# These tests connect to a managed Elastic Cloud cluster and exercise real ingest pipelines +# that generate embeddings at index time using Elasticsearch inference processors. +# +# Required environment variables (tests are skipped automatically when absent): +# +# For both TestIngestPipelineDense and TestIngestPipelineSparse: +# ELASTICSEARCH_URL - cluster endpoint, e.g. 
https://my-cluster.es.io:443 +# ELASTIC_API_KEY - base64-encoded API key (id:secret) +# +# Optional (sensible defaults are used when not set): +# +# For TestIngestPipelineDense: +# ELASTICSEARCH_DENSE_INFERENCE_ID - deployed dense inference endpoint +# (default: ".multilingual-e5-small-elasticsearch") +# ELASTICSEARCH_DENSE_EMBEDDING_DIMS - output dimension of that model +# (default: "384") +# +# For TestIngestPipelineSparse: +# ELASTICSEARCH_INFERENCE_ID - deployed sparse inference endpoint +# (default: ".elser-2-elastic", Elastic's hosted ELSER +# service which does not consume local ML node capacity) +# +# Example (bash): +# export ELASTICSEARCH_URL="https://my-cluster.es.io:443" +# export ELASTIC_API_KEY="" +# pytest -m integration tests/test_cloud_ingest_pipeline.py + +import pytest +from haystack.dataclasses import Document + +from haystack_integrations.components.retrievers.elasticsearch import ( + ElasticsearchEmbeddingRetriever, + ElasticsearchInferenceSparseRetriever, +) + + +def _get_dense_query_embedding(client, inference_id: str, text: str) -> list[float]: + """Call the ES inference API to embed a query string using the same model as the ingest pipeline.""" + response = client.inference.inference(inference_id=inference_id, input=[text]) + return response["text_embedding"][0]["embedding"] + + +@pytest.mark.integration +class TestIngestPipelineDense: + """ + End-to-end integration tests for ElasticsearchDocumentStore with an ingest pipeline + that generates dense embeddings at index time. + + The fixture creates a real ES ingest pipeline (inference processor → embedding field) + on Elastic Cloud. Documents are written without pre-computed embeddings; the pipeline + fills the embedding field before the document is committed to the index. 
+ """ + + def test_indexed_document_has_embedding_filled_by_pipeline(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + store.write_documents([Document(id="doc-1", content="The Eiffel Tower is located in Paris.")]) + + store.client.indices.refresh(index=store._index) + raw = store.client.get(index=store._index, id="doc-1") + + # ES inference pipelines do NOT write dense_vector data to _source; only to the vector index. + assert raw["_source"].get("embedding") is None + + # The vector IS in the index: a KNN search using the same model finds the document. + query_embedding = _get_dense_query_embedding(store.client, inference_id, "Eiffel Tower Paris") + result = store.client.search( + index=store._index, + knn={"field": "embedding", "query_vector": query_embedding, "k": 1, "num_candidates": 10}, + ) + assert result["hits"]["total"]["value"] == 1, "pipeline did not populate the 'embedding' field" + + def test_embedding_retriever_finds_most_relevant_document(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + retriever = ElasticsearchEmbeddingRetriever(document_store=store, top_k=1) + + store.write_documents( + [ + Document(id="1", content="The Eiffel Tower is a famous landmark in Paris, France."), + Document(id="2", content="The Amazon River flows through the South American rainforest."), + ] + ) + + query_embedding = _get_dense_query_embedding(store.client, inference_id, "famous tower in France") + result = retriever.run(query_embedding=query_embedding) + + assert len(result["documents"]) == 1 + assert "Eiffel" in result["documents"][0].content + + def test_embedding_retriever_respects_top_k(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + retriever = ElasticsearchEmbeddingRetriever(document_store=store, top_k=2) + + store.write_documents( + [ + Document(id="1", content="Python is a popular 
high-level programming language."), + Document(id="2", content="Java is widely used in enterprise software development."), + Document(id="3", content="Rust focuses on memory safety and systems programming."), + ] + ) + + query_embedding = _get_dense_query_embedding(store.client, inference_id, "programming language") + result = retriever.run(query_embedding=query_embedding) + + assert 0 < len(result["documents"]) <= 2 + + def test_embedding_retriever_with_metadata_filter(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + retriever = ElasticsearchEmbeddingRetriever(document_store=store, top_k=5) + + store.write_documents( + [ + Document(id="1", content="Berlin is the capital of Germany.", meta={"lang": "en"}), + Document(id="2", content="Berlin ist die Hauptstadt von Deutschland.", meta={"lang": "de"}), + ] + ) + + query_embedding = _get_dense_query_embedding(store.client, inference_id, "capital of Germany") + result = retriever.run( + query_embedding=query_embedding, + filters={"field": "meta.lang", "operator": "==", "value": "en"}, + ) + + assert len(result["documents"]) == 1 + assert result["documents"][0].content == "Berlin is the capital of Germany." + + def test_multiple_documents_are_all_indexed_with_embeddings(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + docs = [Document(id=f"doc-{i}", content=f"Document number {i} about various topics.") for i in range(5)] + store.write_documents(docs) + + store.client.indices.refresh(index=store._index) + assert store.count_documents() == 5 + + for doc in docs: + raw = store.client.get(index=store._index, id=doc.id) + # ES inference pipelines do NOT write dense_vector data to _source; only to the vector index. + assert raw["_source"].get("embedding") is None + + # Verify all documents have embeddings via KNN search — finds docs only if vectors exist. 
+ query_embedding = _get_dense_query_embedding(store.client, inference_id, "document about topics") + result = store.client.search( + index=store._index, + knn={"field": "embedding", "query_vector": query_embedding, "k": 5, "num_candidates": 50}, + ) + assert result["hits"]["total"]["value"] == 5, "not all documents were indexed with embeddings" + + def test_retrieved_documents_carry_score(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + retriever = ElasticsearchEmbeddingRetriever(document_store=store, top_k=2) + + store.write_documents( + [ + Document(id="1", content="Mount Everest is the highest mountain on Earth."), + Document(id="2", content="The Pacific Ocean is the largest ocean on Earth."), + ] + ) + + query_embedding = _get_dense_query_embedding(store.client, inference_id, "tallest mountain") + result = retriever.run(query_embedding=query_embedding) + + assert len(result["documents"]) > 0 + for doc in result["documents"]: + assert doc.score is not None + assert doc.content is not None + + @pytest.mark.asyncio + async def test_async_write_documents_via_pipeline(self, ingest_pipeline_dense_document_store): + store, inference_id = ingest_pipeline_dense_document_store + retriever = ElasticsearchEmbeddingRetriever(document_store=store, top_k=1) + + await store.write_documents_async( + [ + Document(id="async-1", content="Rome is the capital of Italy."), + Document(id="async-2", content="Tokyo is the capital of Japan."), + ] + ) + + store.client.indices.refresh(index=store._index) + raw = store.client.get(index=store._index, id="async-1") + # ES inference pipelines do NOT write dense_vector data to _source; only to the vector index. 
+ assert raw["_source"].get("embedding") is None + + query_embedding = _get_dense_query_embedding(store.client, inference_id, "capital of Italy") + result = retriever.run(query_embedding=query_embedding) + + assert len(result["documents"]) == 1 + assert "Rome" in result["documents"][0].content + + +@pytest.mark.integration +class TestIngestPipelineSparse: + """ + End-to-end integration tests for ElasticsearchDocumentStore with an ingest pipeline + that generates ELSER sparse embeddings at index time. + + The fixture creates a real ES ingest pipeline (ELSER inference processor → sparse_vec field) + on Elastic Cloud. Documents are written without pre-computed sparse embeddings; the pipeline + fills the sparse_vec field before the document is committed to the index. + + Run with: pytest -m integration + """ + + def test_indexed_document_has_sparse_embedding_filled_by_pipeline(self, ingest_pipeline_sparse_document_store): + store, _ = ingest_pipeline_sparse_document_store + store.write_documents([Document(id="doc-1", content="The Eiffel Tower is located in Paris.")]) + + store.client.indices.refresh(index=store._index) + raw = store.client.get(index=store._index, id="doc-1") + + # ES inference pipelines do NOT write sparse_vector data to _source; only to the inverted index. + assert raw["_source"].get("sparse_vec") is None + + # The data IS in the index and retrievable via the fields API. 
+ result = store.client.search( + index=store._index, + fields=["sparse_vec"], + query={"ids": {"values": ["doc-1"]}}, + ) + hit = result["hits"]["hits"][0] + assert "sparse_vec" in hit.get("fields", {}), "pipeline did not write to the 'sparse_vec' field" + + def test_inference_sparse_retriever_finds_most_relevant_document(self, ingest_pipeline_sparse_document_store): + store, inference_id = ingest_pipeline_sparse_document_store + retriever = ElasticsearchInferenceSparseRetriever(document_store=store, inference_id=inference_id, top_k=1) + + store.write_documents( + [ + Document(id="1", content="The Eiffel Tower is a famous landmark in Paris, France."), + Document(id="2", content="The Amazon River flows through the South American rainforest."), + ] + ) + + result = retriever.run(query="famous tower in France") + + assert len(result["documents"]) == 1 + assert "Eiffel" in result["documents"][0].content + + def test_inference_sparse_retriever_respects_top_k(self, ingest_pipeline_sparse_document_store): + store, inference_id = ingest_pipeline_sparse_document_store + retriever = ElasticsearchInferenceSparseRetriever(document_store=store, inference_id=inference_id, top_k=2) + + store.write_documents( + [ + Document(id="1", content="Python is a popular high-level programming language."), + Document(id="2", content="Java is widely used in enterprise software development."), + Document(id="3", content="Rust focuses on memory safety and systems programming."), + ] + ) + + result = retriever.run(query="programming language") + + assert 0 < len(result["documents"]) <= 2 + + def test_inference_sparse_retriever_with_metadata_filter(self, ingest_pipeline_sparse_document_store): + store, inference_id = ingest_pipeline_sparse_document_store + retriever = ElasticsearchInferenceSparseRetriever(document_store=store, inference_id=inference_id, top_k=5) + + store.write_documents( + [ + Document(id="1", content="Berlin is the capital of Germany.", meta={"lang": "en"}), + Document(id="2", 
content="Berlin ist die Hauptstadt von Deutschland.", meta={"lang": "de"}), + ] + ) + + result = retriever.run( + query="capital of Germany", + filters={"field": "meta.lang", "operator": "==", "value": "en"}, + ) + + assert len(result["documents"]) == 1 + assert result["documents"][0].content == "Berlin is the capital of Germany." + + def test_multiple_documents_are_all_indexed_with_sparse_embeddings(self, ingest_pipeline_sparse_document_store): + store, _ = ingest_pipeline_sparse_document_store + docs = [Document(id=f"doc-{i}", content=f"Document number {i} about various topics.") for i in range(5)] + store.write_documents(docs) + + store.client.indices.refresh(index=store._index) + assert store.count_documents() == 5 + + for doc in docs: + raw = store.client.get(index=store._index, id=doc.id) + # ES inference pipelines do NOT write sparse_vector data to _source; only to the inverted index. + assert raw["_source"].get("sparse_vec") is None + + # Verify all documents have sparse_vec populated in the index via the fields API. 
+ doc_ids = [doc.id for doc in docs] + result = store.client.search( + index=store._index, + fields=["sparse_vec"], + query={"ids": {"values": doc_ids}}, + size=len(docs), + ) + for hit in result["hits"]["hits"]: + assert "sparse_vec" in hit.get("fields", {}), f"doc {hit['_id']} missing sparse_vec in index" + + def test_retrieved_documents_carry_score(self, ingest_pipeline_sparse_document_store): + store, inference_id = ingest_pipeline_sparse_document_store + retriever = ElasticsearchInferenceSparseRetriever(document_store=store, inference_id=inference_id, top_k=2) + + store.write_documents( + [ + Document(id="1", content="Mount Everest is the highest mountain on Earth."), + Document(id="2", content="The Pacific Ocean is the largest ocean on Earth."), + ] + ) + + result = retriever.run(query="tallest mountain in the world") + + assert len(result["documents"]) > 0 + for doc in result["documents"]: + assert doc.score is not None + assert doc.content is not None + + @pytest.mark.asyncio + async def test_async_write_documents_via_pipeline(self, ingest_pipeline_sparse_document_store): + store, inference_id = ingest_pipeline_sparse_document_store + retriever = ElasticsearchInferenceSparseRetriever(document_store=store, inference_id=inference_id, top_k=1) + + await store.write_documents_async( + [ + Document(id="async-1", content="Rome is the capital of Italy."), + Document(id="async-2", content="Tokyo is the capital of Japan."), + ] + ) + + store.client.indices.refresh(index=store._index) + result = retriever.run(query="capital of Italy") + + assert len(result["documents"]) == 1 + assert "Rome" in result["documents"][0].content diff --git a/integrations/elasticsearch/tests/test_document_store.py b/integrations/elasticsearch/tests/test_document_store.py index d713ce8232..e361d67297 100644 --- a/integrations/elasticsearch/tests/test_document_store.py +++ b/integrations/elasticsearch/tests/test_document_store.py @@ -37,6 +37,16 @@ def 
test_init_with_special_fields_raises_error(): ElasticsearchDocumentStore(sparse_vector_field="content") +def test_init_ingest_pipeline_empty_raises(): + with pytest.raises(ValueError, match="ingest_pipeline must be a non-empty string"): + ElasticsearchDocumentStore(ingest_pipeline=" ") + + +def test_init_ingest_pipeline_strips_whitespace(): + store = ElasticsearchDocumentStore(ingest_pipeline=" my-pipeline ") + assert store._ingest_pipeline == "my-pipeline" + + def test_init_with_custom_mapping_injects_sparse_vector(): custom_mapping = {"properties": {"some_field": {"type": "text"}}} store = ElasticsearchDocumentStore(custom_mapping=custom_mapping, sparse_vector_field="my_sparse_vec") @@ -114,6 +124,7 @@ def test_to_dict(): "index": "default", "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, } @@ -129,6 +140,7 @@ def test_from_dict(): "api_key_id": None, "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, } document_store = ElasticsearchDocumentStore.from_dict(data) @@ -139,6 +151,7 @@ def test_from_dict(): assert document_store._sparse_vector_field is None assert document_store._api_key_id is None assert document_store._embedding_similarity_function == "cosine" + assert document_store._ingest_pipeline is None def test_to_dict_with_api_keys_env_vars(): @@ -182,6 +195,7 @@ def test_from_dict_with_api_keys_env_vars(): "api_key_id": {"type": "env_var", "env_vars": ["ELASTIC_API_KEY_ID"], "strict": False}, "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, } @@ -201,6 +215,7 @@ def test_from_dict_with_api_keys_str(): "api_key_id": "my_api_key_id", "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, } @@ -226,6 +241,90 @@ def test_from_dict_without_sparse_vector_field(): assert document_store._sparse_vector_field is None +def test_from_dict_without_ingest_pipeline(): + 
data = { + "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", + "init_parameters": { + "hosts": "some hosts", + "custom_mapping": None, + "index": "default", + "api_key": "my_api_key", + "api_key_id": "my_api_key_id", + "embedding_similarity_function": "cosine", + "sparse_vector_field": None, + }, + } + + document_store = ElasticsearchDocumentStore.from_dict(data) + assert document_store._ingest_pipeline is None + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.helpers.bulk") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.AsyncElasticsearch") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_write_documents_bulk_passes_pipeline_when_configured(mock_es, _mock_async_es, mock_bulk): + mock_client = Mock() + mock_client.info.return_value = {"version": {"number": "8.0.0"}} + mock_client.indices.exists.return_value = True + mock_es.return_value = mock_client + mock_bulk.return_value = (1, []) + + store = ElasticsearchDocumentStore( + hosts="http://localhost:9200", + index="idx_test_pipeline", + ingest_pipeline="my-ingest", + ) + _ = store.client + store.write_documents([Document(id="1", content="a")]) + + mock_bulk.assert_called_once() + assert mock_bulk.call_args.kwargs["pipeline"] == "my-ingest" + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.helpers.bulk") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.AsyncElasticsearch") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_write_documents_bulk_omits_pipeline_when_not_configured(mock_es, _mock_async_es, mock_bulk): + mock_client = Mock() + mock_client.info.return_value = {"version": {"number": "8.0.0"}} + mock_client.indices.exists.return_value = True + mock_es.return_value = mock_client + mock_bulk.return_value = (1, []) + + store = 
ElasticsearchDocumentStore(hosts="http://localhost:9200", index="idx_no_pipeline") + _ = store.client + store.write_documents([Document(id="1", content="a")]) + + mock_bulk.assert_called_once() + assert "pipeline" not in mock_bulk.call_args.kwargs + + +@pytest.mark.asyncio +@patch("haystack_integrations.document_stores.elasticsearch.document_store.helpers.async_bulk") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.AsyncElasticsearch") +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +async def test_write_documents_async_bulk_passes_pipeline_when_configured(mock_es, mock_async_es_cls, mock_async_bulk): + mock_client = Mock() + mock_client.info.return_value = {"version": {"number": "8.0.0"}} + mock_client.indices.exists.return_value = True + mock_es.return_value = mock_client + + mock_async_es_cls.return_value = AsyncMock() + + mock_async_bulk.return_value = (1, []) + + store = ElasticsearchDocumentStore( + hosts="http://localhost:9200", + index="idx_async_pipeline", + ingest_pipeline="pipe-async", + ) + _ = store.client + await store.write_documents_async([Document(id="1", content="a")]) + + mock_async_bulk.assert_called_once() + assert mock_async_bulk.call_args.kwargs["pipeline"] == "pipe-async" + + def test_api_key_validation_only_api_key(): api_key = Secret.from_token("test_api_key") document_store = ElasticsearchDocumentStore(hosts="https://localhost:9200", api_key=api_key) @@ -367,6 +466,48 @@ def test_deserialize_document_with_elser_string_token_keys(): assert doc.sparse_embedding is None +def test_deserialize_document_fields_api_with_numeric_keys(): + # When ES returns sparse data via the fields API (list-wrapped), numeric keys are deserialized correctly. + # This covers the ingest-pipeline case where sparse_vec is absent from _source. 
+ store = ElasticsearchDocumentStore(hosts="testhost", sparse_vector_field="sparse_vec") + hit = { + "_source": {"id": "doc-1", "content": "Berlin"}, + "_score": 1.0, + "fields": {"sparse_vec": [{"0": 0.9, "2": 0.5, "1": 0.7}]}, + } + doc = store._deserialize_document(hit) + assert doc.sparse_embedding is not None + assert doc.sparse_embedding.indices == [0, 1, 2] + assert doc.sparse_embedding.values == [0.9, 0.7, 0.5] + + +def test_deserialize_document_fields_api_takes_precedence_over_source(): + # fields API data wins over _source when both are present. + store = ElasticsearchDocumentStore(hosts="testhost", sparse_vector_field="sparse_vec") + hit = { + "_source": {"id": "doc-1", "content": "Berlin", "sparse_vec": {"0": 0.1}}, + "_score": 1.0, + "fields": {"sparse_vec": [{"0": 0.9, "1": 0.7}]}, + } + doc = store._deserialize_document(hit) + assert doc.sparse_embedding is not None + assert doc.sparse_embedding.indices == [0, 1] + assert doc.sparse_embedding.values == [0.9, 0.7] + + +def test_deserialize_document_fields_api_empty_list_does_not_crash(): + # ES can return the field key with an empty list if the field is mapped but has no value. 
+ store = ElasticsearchDocumentStore(hosts="testhost", sparse_vector_field="sparse_vec") + hit = { + "_source": {"id": "doc-1", "content": "Berlin"}, + "_score": 1.0, + "fields": {"sparse_vec": []}, + } + doc = store._deserialize_document(hit) + assert doc.content == "Berlin" + assert doc.sparse_embedding is None + + def test_sparse_vector_retrieval_builds_query_without_filters(): store = ElasticsearchDocumentStore(hosts="some hosts", sparse_vector_field="sparse_vec") diff --git a/integrations/elasticsearch/tests/test_embedding_retriever.py b/integrations/elasticsearch/tests/test_embedding_retriever.py index 555ddce727..f2d6e2107d 100644 --- a/integrations/elasticsearch/tests/test_embedding_retriever.py +++ b/integrations/elasticsearch/tests/test_embedding_retriever.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from unittest.mock import Mock, patch import pytest @@ -11,6 +12,11 @@ from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore +def test_init_rejects_non_elasticsearch_document_store(): + with pytest.raises(ValueError, match="document_store must be an instance of ElasticsearchDocumentStore"): + ElasticsearchEmbeddingRetriever(document_store=Mock()) + + def test_init_default(): mock_store = Mock(spec=ElasticsearchDocumentStore) retriever = ElasticsearchEmbeddingRetriever(document_store=mock_store) @@ -56,6 +62,7 @@ def test_to_dict(_mock_elasticsearch_client): "index": "default", "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, @@ -74,7 +81,12 @@ def test_from_dict(_mock_elasticsearch_client): "type": t, "init_parameters": { "document_store": { - "init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None}, + "init_parameters": { + "hosts": "some fake host", + 
"index": "default", + "sparse_vector_field": None, + "ingest_pipeline": None, + }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, "filters": {}, @@ -97,7 +109,12 @@ def test_from_dict_no_filter_policy(_mock_elasticsearch_client): "type": t, "init_parameters": { "document_store": { - "init_parameters": {"hosts": "some fake host", "index": "default", "sparse_vector_field": None}, + "init_parameters": { + "hosts": "some fake host", + "index": "default", + "sparse_vector_field": None, + "ingest_pipeline": None, + }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", }, "filters": {}, diff --git a/integrations/elasticsearch/tests/test_filters.py b/integrations/elasticsearch/tests/test_filters.py index 5f09f0a49d..47dce690a4 100644 --- a/integrations/elasticsearch/tests/test_filters.py +++ b/integrations/elasticsearch/tests/test_filters.py @@ -219,3 +219,80 @@ def test_normalize_ranges(): assert conditions == [ {"range": {"date": {"lt": "2021-01-01", "gte": "2015-01-01"}}}, ] + + +def test_normalize_filters_rejects_non_dict(): + with pytest.raises(FilterError, match="Filters must be a dictionary"): + _normalize_filters("not-a-dict") # type: ignore[arg-type] + + +def test_equal_none_list_and_text_branches(): + assert _normalize_filters({"field": "meta.k", "operator": "==", "value": None}) == { + "bool": {"must": {"bool": {"must_not": {"exists": {"field": "k"}}}}} + } + out = _normalize_filters({"field": "meta.tags", "operator": "==", "value": ["a", "b"]}) + inner = out["bool"]["must"] + assert "terms_set" in inner + assert inner["terms_set"]["tags"]["terms"] == ["a", "b"] + + +def test_not_equal_none_list_text_and_term(): + ne_none = _normalize_filters({"field": "meta.k", "operator": "!=", "value": None}) + assert ne_none == {"bool": {"must": {"exists": {"field": "k"}}}} + ne_list = _normalize_filters({"field": "meta.k", "operator": "!=", "value": [1, 2]}) + 
assert ne_list == {"bool": {"must": {"bool": {"must_not": {"terms": {"k": [1, 2]}}}}}} + + ne_text = _normalize_filters({"field": "text", "operator": "!=", "value": "hello"}) + assert "must_not" in ne_text["bool"]["must"]["bool"] + + ne_term = _normalize_filters({"field": "meta.k", "operator": "!=", "value": "v"}) + assert ne_term == {"bool": {"must": {"bool": {"must_not": {"term": {"k": "v"}}}}}} + + +@pytest.mark.parametrize( + ("op", "bound"), + [ + (">", "gt"), + (">=", "gte"), + ("<", "lt"), + ("<=", "lte"), + ], +) +def test_range_operators_numeric_and_iso_string(op: str, bound: str): + body = _normalize_filters({"field": "meta.n", "operator": op, "value": 5}) + assert body == {"bool": {"must": {"range": {"n": {bound: 5}}}}} + + body_dt = _normalize_filters({"field": "meta.d", "operator": op, "value": "2020-01-01"}) + assert body_dt["bool"]["must"]["range"]["d"][bound] == "2020-01-01" + + +@pytest.mark.parametrize("op", [">", ">=", "<", "<="]) +def test_range_operators_none_yields_empty_match(op: str): + body = _normalize_filters({"field": "meta.x", "operator": op, "value": None}) + inner = body["bool"]["must"] + assert "bool" in inner and "must" in inner["bool"] + assert len(inner["bool"]["must"]) == 2 + + +@pytest.mark.parametrize("op", [">", ">=", "<", "<="]) +def test_range_operators_reject_non_iso_string(op: str): + with pytest.raises(FilterError, match="ISO formatted dates"): + _normalize_filters({"field": "meta.x", "operator": op, "value": "not-a-date"}) + + +@pytest.mark.parametrize("op", [">", ">=", "<", "<="]) +def test_range_operators_reject_list_value(op: str): + with pytest.raises(FilterError, match="Filter value can't be of type"): + _normalize_filters({"field": "meta.x", "operator": op, "value": [1, 2]}) + + +def test_in_and_not_in_require_list(): + with pytest.raises(FilterError, match="must be a list"): + _normalize_filters({"field": "meta.x", "operator": "in", "value": "single"}) + with pytest.raises(FilterError, match="must be a list"): + 
_normalize_filters({"field": "meta.x", "operator": "not in", "value": 1}) + + +def test_not_in_with_list_value(): + body = _normalize_filters({"field": "meta.x", "operator": "not in", "value": ["a", "b"]}) + assert body == {"bool": {"must": {"bool": {"must_not": {"terms": {"x": ["a", "b"]}}}}}} diff --git a/integrations/elasticsearch/tests/test_elasticsearch_hybrid_retriever.py b/integrations/elasticsearch/tests/test_hybrid_retriever.py similarity index 99% rename from integrations/elasticsearch/tests/test_elasticsearch_hybrid_retriever.py rename to integrations/elasticsearch/tests/test_hybrid_retriever.py index 1abd62fdb8..f94fe3f24c 100644 --- a/integrations/elasticsearch/tests/test_elasticsearch_hybrid_retriever.py +++ b/integrations/elasticsearch/tests/test_hybrid_retriever.py @@ -38,6 +38,7 @@ class TestElasticsearchHybridRetriever: "api_key_id": {"type": "env_var", "env_vars": ["ELASTIC_API_KEY_ID"], "strict": False}, "embedding_similarity_function": "cosine", "sparse_vector_field": None, + "ingest_pipeline": None, }, }, "embedder": { diff --git a/integrations/elasticsearch/tests/test_inference_hybrid_retriever.py b/integrations/elasticsearch/tests/test_inference_hybrid_retriever.py new file mode 100644 index 0000000000..00862278d9 --- /dev/null +++ b/integrations/elasticsearch/tests/test_inference_hybrid_retriever.py @@ -0,0 +1,358 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +# To run these integration tests, you need access to an Elastic Cloud account with the ELSER model available. +# If you don't have one, you can sign up for a free trial at https://cloud.elastic.co/signup. +# +# Go to cloud.elastic.co and create a new Elasticsearch Serverless project: +# +# 1. Click Create project +# 2. Serverless projects → Elasticsearch +# 3. Choose a region (e.g. eu-central-1 or closest to you) +# 4. 
Give it a name and click Create +# +# Once it's ready (takes ~1-2 min), grab: +# - Endpoint URL → ELASTICSEARCH_URL +# - API key → create one under API Keys in the project settings → ELASTIC_API_KEY +# +# Then run the tests with the environment variables set: +# +# ELASTICSEARCH_INFERENCE_ID=".elser-2-elasticsearch" \ +# ELASTICSEARCH_URL="https://<project-id>.es.<region>.aws.elastic.cloud" \ +# ELASTIC_API_KEY="<your-api-key>" \ +# pytest -m integration +# +# No model deployment needed — .elser-2-elasticsearch is available out of the box on Serverless. + + +from copy import deepcopy +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from haystack import Document +from haystack.document_stores.types import FilterPolicy + +from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchInferenceHybridRetriever +from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore + +serialised = { + "type": "haystack_integrations.components.retrievers.elasticsearch.inference_hybrid_retriever.ElasticsearchInferenceHybridRetriever", # noqa: E501 + "init_parameters": { + "document_store": { + "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", + "init_parameters": { + "hosts": None, + "custom_mapping": None, + "index": "default", + "api_key": {"type": "env_var", "env_vars": ["ELASTIC_API_KEY"], "strict": False}, + "api_key_id": {"type": "env_var", "env_vars": ["ELASTIC_API_KEY_ID"], "strict": False}, + "embedding_similarity_function": "cosine", + "sparse_vector_field": None, + "ingest_pipeline": None, + }, + }, + "inference_id": ".elser_model_2", + "filters": {}, + "fuzziness": "AUTO", + "top_k": 10, + "filter_policy": "replace", + "rank_window_size": 100, + "rank_constant": 60, + }, +} + + +# --- Unit tests --- + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_init_default(_mock_es): + doc_store = ElasticsearchDocumentStore() + retriever = 
ElasticsearchInferenceHybridRetriever(document_store=doc_store, inference_id=".elser_model_2") + assert retriever._document_store is doc_store + assert retriever._inference_id == ".elser_model_2" + assert retriever._filters == {} + assert retriever._fuzziness == "AUTO" + assert retriever._top_k == 10 + assert retriever._filter_policy == FilterPolicy.REPLACE + assert retriever._rank_window_size == 100 + assert retriever._rank_constant == 60 + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_init_custom(_mock_es): + doc_store = ElasticsearchDocumentStore() + retriever = ElasticsearchInferenceHybridRetriever( + document_store=doc_store, + inference_id=".elser_model_2", + filters={"field": "value"}, + fuzziness="1", + top_k=5, + filter_policy=FilterPolicy.MERGE, + rank_window_size=50, + rank_constant=30, + ) + assert retriever._filters == {"field": "value"} + assert retriever._fuzziness == "1" + assert retriever._top_k == 5 + assert retriever._filter_policy == FilterPolicy.MERGE + assert retriever._rank_window_size == 50 + assert retriever._rank_constant == 30 + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_init_requires_inference_id(_mock_es): + doc_store = ElasticsearchDocumentStore() + with pytest.raises(ValueError, match="inference_id must be provided"): + ElasticsearchInferenceHybridRetriever(document_store=doc_store, inference_id="") + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_init_wrong_document_store_type(_mock_es): + with pytest.raises(ValueError, match="document_store must be an instance of ElasticsearchDocumentStore"): + ElasticsearchInferenceHybridRetriever(document_store=Mock(), inference_id=".elser_model_2") + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_to_dict(_mock_es): + doc_store = ElasticsearchDocumentStore() + 
retriever = ElasticsearchInferenceHybridRetriever(document_store=doc_store, inference_id=".elser_model_2") + assert retriever.to_dict() == serialised + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_from_dict(_mock_es): + data = deepcopy(serialised) + deserialized = ElasticsearchInferenceHybridRetriever.from_dict(data) + assert isinstance(deserialized, ElasticsearchInferenceHybridRetriever) + assert deserialized.to_dict() == serialised + + +@patch("haystack_integrations.document_stores.elasticsearch.document_store.Elasticsearch") +def test_from_dict_no_filter_policy(_mock_es): + data = deepcopy(serialised) + del data["init_parameters"]["filter_policy"] + deserialized = ElasticsearchInferenceHybridRetriever.from_dict(data) + assert isinstance(deserialized, ElasticsearchInferenceHybridRetriever) + + +def test_run(): + mock_store = Mock(spec=ElasticsearchDocumentStore) + mock_store._hybrid_retrieval_inference.return_value = [ + Document(content="BM25 result"), + Document(content="Sparse result"), + ] + retriever = ElasticsearchInferenceHybridRetriever(document_store=mock_store, inference_id=".elser_model_2") + result = retriever.run(query="test query") + + mock_store._hybrid_retrieval_inference.assert_called_once_with( + query="test query", + inference_id=".elser_model_2", + filters={}, + fuzziness="AUTO", + top_k=10, + rank_window_size=100, + rank_constant=60, + ) + assert len(result["documents"]) == 2 + + +def test_run_runtime_top_k_overrides(): + mock_store = Mock(spec=ElasticsearchDocumentStore) + mock_store._hybrid_retrieval_inference.return_value = [Document(content="Result")] + retriever = ElasticsearchInferenceHybridRetriever(document_store=mock_store, inference_id=".elser_model_2") + retriever.run(query="test query", top_k=3) + + mock_store._hybrid_retrieval_inference.assert_called_once_with( + query="test query", + inference_id=".elser_model_2", + filters={}, + fuzziness="AUTO", + top_k=3, + 
rank_window_size=100, + rank_constant=60, + ) + + +def test_run_replace_filter_policy(): + mock_store = Mock(spec=ElasticsearchDocumentStore) + mock_store._hybrid_retrieval_inference.return_value = [] + retriever = ElasticsearchInferenceHybridRetriever( + document_store=mock_store, + inference_id=".elser_model_2", + filters={"field": "init", "operator": "==", "value": "init"}, + filter_policy=FilterPolicy.REPLACE, + ) + retriever.run(query="test", filters={"field": "runtime", "operator": "==", "value": "runtime"}) + + call_filters = mock_store._hybrid_retrieval_inference.call_args.kwargs["filters"] + assert call_filters == {"field": "runtime", "operator": "==", "value": "runtime"} + + +def test_run_merge_filter_policy(): + mock_store = Mock(spec=ElasticsearchDocumentStore) + mock_store._hybrid_retrieval_inference.return_value = [] + retriever = ElasticsearchInferenceHybridRetriever( + document_store=mock_store, + inference_id=".elser_model_2", + filters={"field": "category", "operator": "==", "value": "news"}, + filter_policy=FilterPolicy.MERGE, + ) + retriever.run(query="test", filters={"field": "lang", "operator": "==", "value": "en"}) + + call_filters = mock_store._hybrid_retrieval_inference.call_args.kwargs["filters"] + assert call_filters is not None + + +@pytest.mark.asyncio +async def test_run_async(): + mock_store = Mock(spec=ElasticsearchDocumentStore) + mock_store._hybrid_retrieval_inference_async = AsyncMock(return_value=[Document(content="Async result")]) + retriever = ElasticsearchInferenceHybridRetriever(document_store=mock_store, inference_id=".elser_model_2") + result = await retriever.run_async(query="test query") + + mock_store._hybrid_retrieval_inference_async.assert_awaited_once_with( + query="test query", + inference_id=".elser_model_2", + filters={}, + fuzziness="AUTO", + top_k=10, + rank_window_size=100, + rank_constant=60, + ) + assert len(result["documents"]) == 1 + assert result["documents"][0].content == "Async result" + + +# --- 
Integration tests --- + + +def _index_documents_with_inference(client, index: str, inference_id: str, docs: list[dict]) -> None: + response = client.inference.inference( + inference_id=inference_id, + input=[doc["content"] for doc in docs], + ) + embeddings = [item["embedding"] for item in response["sparse_embedding"]] + for doc, sparse_embedding in zip(docs, embeddings, strict=False): + body: dict = {"id": doc["id"], "content": doc["content"], "sparse_vec": sparse_embedding} + body.update(doc.get("meta", {})) + client.index(index=index, id=doc["id"], body=body) + client.indices.refresh(index=index) + + +@pytest.mark.integration +class TestElasticsearchInferenceHybridRetrieverIntegration: + """ + End-to-end tests against a real Elastic Cloud cluster with a deployed ELSER endpoint. + Run with: pytest -m integration + """ + + def test_retrieval_returns_documents(self, hybrid_inference_document_store): + store, inference_id = hybrid_inference_document_store + retriever = ElasticsearchInferenceHybridRetriever( + document_store=store, inference_id=inference_id, top_k=2 + ) + _index_documents_with_inference( + store.client, + store._index, + inference_id, + [ + {"id": "1", "content": "The Eiffel Tower is a famous landmark in Paris, France."}, + {"id": "2", "content": "The Amazon rainforest covers most of the Amazon basin in South America."}, + {"id": "3", "content": "Mount Fuji is the highest mountain in Japan."}, + ], + ) + + result = retriever.run(query="famous tower in France") + + assert 0 < len(result["documents"]) <= 2 + assert all(isinstance(doc, Document) for doc in result["documents"]) + + def test_most_relevant_document_ranks_first(self, hybrid_inference_document_store): + store, inference_id = hybrid_inference_document_store + retriever = ElasticsearchInferenceHybridRetriever( + document_store=store, inference_id=inference_id, top_k=3 + ) + _index_documents_with_inference( + store.client, + store._index, + inference_id, + [ + {"id": "1", "content": "The Eiffel 
Tower is a famous landmark in Paris, France."}, + {"id": "2", "content": "The Amazon rainforest covers most of the Amazon basin in South America."}, + {"id": "3", "content": "Mount Fuji is the highest mountain in Japan."}, + ], + ) + + result = retriever.run(query="famous tower in France") + + assert len(result["documents"]) > 0 + assert "Eiffel" in result["documents"][0].content + + def test_respects_top_k(self, hybrid_inference_document_store): + store, inference_id = hybrid_inference_document_store + retriever = ElasticsearchInferenceHybridRetriever( + document_store=store, inference_id=inference_id, top_k=1 + ) + _index_documents_with_inference( + store.client, + store._index, + inference_id, + [ + {"id": "1", "content": "The Eiffel Tower is a famous landmark in Paris, France."}, + {"id": "2", "content": "The Amazon rainforest covers most of the Amazon basin in South America."}, + ], + ) + + result = retriever.run(query="famous landmark") + + assert len(result["documents"]) == 1 + + def test_filter_applied_to_both_retrievers(self, hybrid_inference_document_store): + store, inference_id = hybrid_inference_document_store + retriever = ElasticsearchInferenceHybridRetriever( + document_store=store, inference_id=inference_id, top_k=5 + ) + _index_documents_with_inference( + store.client, + store._index, + inference_id, + [ + {"id": "1", "content": "The Eiffel Tower is a famous landmark in Paris, France.", "category": "europe"}, + {"id": "2", "content": "The Amazon rainforest covers most of the Amazon basin.", "category": "america"}, + ], + ) + + result = retriever.run( + query="famous landmark", + filters={"field": "category", "operator": "==", "value": "europe"}, + ) + + assert all(doc.meta.get("category") == "europe" for doc in result["documents"]) + + def test_single_elasticsearch_request(self, hybrid_inference_document_store): + """Only one search call should be made — confirms server-side RRF, not client-side fusion.""" + store, inference_id = 
hybrid_inference_document_store + _index_documents_with_inference( + store.client, + store._index, + inference_id, + [{"id": "1", "content": "The Eiffel Tower is in Paris."}], + ) + retriever = ElasticsearchInferenceHybridRetriever( + document_store=store, inference_id=inference_id, top_k=1 + ) + + original_search = store.client.search + call_count = 0 + + def counting_search(**kwargs): + nonlocal call_count + call_count += 1 + return original_search(**kwargs) + + with patch.object(store.client, "search", side_effect=counting_search): + retriever.run(query="tower in France") + + assert call_count == 1, f"Expected 1 search call (server-side RRF), got {call_count}" diff --git a/integrations/elasticsearch/tests/test_sparse_embedding_retriever.py b/integrations/elasticsearch/tests/test_sparse_embedding_retriever.py index 3f0831b56b..5dcb8c2264 100644 --- a/integrations/elasticsearch/tests/test_sparse_embedding_retriever.py +++ b/integrations/elasticsearch/tests/test_sparse_embedding_retriever.py @@ -65,6 +65,7 @@ def test_to_dict(_mock_elasticsearch_client): "index": "default", "embedding_similarity_function": "cosine", "sparse_vector_field": "sparse_vec", + "ingest_pipeline": None, }, "type": "haystack_integrations.document_stores.elasticsearch.document_store.ElasticsearchDocumentStore", },