diff --git a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py index 462990f11d..14f1722e1d 100644 --- a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py +++ b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py @@ -13,6 +13,8 @@ from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from .utils import _resolve_document_store + logger = logging.getLogger(__name__) @@ -268,13 +270,7 @@ def run( custom_query=custom_query, ) - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) try: docs = doc_store._bm25_retrieval(**bm25_args) # example for BM25Retriever @@ -335,13 +331,7 @@ async def run_async( # pylint: disable=too-many-positional-arguments custom_query=custom_query, ) - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) try: docs = await doc_store._bm25_retrieval_async(**bm25_args) diff --git a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/embedding_retriever.py b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/embedding_retriever.py index 74d93cc1fa..3418299684 100644 --- a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/embedding_retriever.py +++ b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/embedding_retriever.py @@ -13,6 +13,8 @@ from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from .utils import _resolve_document_store + logger = logging.getLogger(__name__) @@ -257,13 +259,7 @@ def run( docs: list[Document] = [] - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) try: docs = doc_store._embedding_retrieval( @@ -383,13 +379,7 @@ async def run_async( docs: list[Document] = [] - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) try: docs = await doc_store._embedding_retrieval_async( diff --git a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/metadata_retriever.py b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/metadata_retriever.py index 0f9871335e..48d5c630ec 100644 --- a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/metadata_retriever.py +++ b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/metadata_retriever.py @@ -9,6 +9,8 @@ from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from .utils import _resolve_document_store + logger = logging.getLogger(__name__) @@ -252,10 +254,7 @@ def run( # Returns: {"metadata": [{"category": "Python", "status": "active", "priority": 1}]} ``` """ - doc_store = document_store or self._document_store - if not isinstance(doc_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) + doc_store = _resolve_document_store(document_store, self._document_store) fields_to_use = metadata_fields if metadata_fields is not None else self._metadata_fields top_k_to_use = top_k if top_k is not None else self._top_k @@ -366,10 +365,7 @@ async def run_async( # Returns: {"metadata": [{"category": "Python", "status": "active", "priority": 1}]} ``` """ - doc_store = document_store or self._document_store - if not isinstance(doc_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) + doc_store = _resolve_document_store(document_store, self._document_store) fields_to_use = metadata_fields if metadata_fields is not None else self._metadata_fields top_k_to_use = top_k if top_k is not None else self._top_k diff --git a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/sql_retriever.py b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/sql_retriever.py index a761809197..6989ed8b23 100644 --- a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/sql_retriever.py +++ b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/sql_retriever.py @@ -8,6 +8,8 @@ from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore +from .utils import _resolve_document_store + logger = logging.getLogger(__name__) @@ -108,13 +110,7 @@ def run( # For aggregate queries: result["result"]["aggregations"] contains aggregations ``` """ - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) fetch_size = fetch_size if fetch_size is not None else self._fetch_size @@ -163,13 +159,7 @@ async def run_async( # For aggregate queries: result["result"]["aggregations"] contains aggregations ``` """ - if document_store is not None: - if not isinstance(document_store, OpenSearchDocumentStore): - msg = "document_store must be an instance of OpenSearchDocumentStore" - raise ValueError(msg) - doc_store = document_store - else: - doc_store = self._document_store + doc_store = _resolve_document_store(document_store, self._document_store) fetch_size = fetch_size if fetch_size is not None else self._fetch_size diff --git a/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/utils.py b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/utils.py new file mode 100644 index 0000000000..dbce8f7054 --- /dev/null +++ b/integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/utils.py @@ -0,0 +1,23 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + + +from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore + + +def _resolve_document_store( + runtime_document_store: OpenSearchDocumentStore | None, + default_document_store: OpenSearchDocumentStore, +) -> OpenSearchDocumentStore: + """ + Return the runtime document store if provided and valid, otherwise the default one. + + :raises ValueError: If `runtime_document_store` is not None and not an OpenSearchDocumentStore. + """ + if runtime_document_store is None: + return default_document_store + if not isinstance(runtime_document_store, OpenSearchDocumentStore): + msg = "document_store must be an instance of OpenSearchDocumentStore" + raise ValueError(msg) + return runtime_document_store diff --git a/integrations/opensearch/tests/conftest.py b/integrations/opensearch/tests/conftest.py index 30a5f14325..8fc6677ed3 100644 --- a/integrations/opensearch/tests/conftest.py +++ b/integrations/opensearch/tests/conftest.py @@ -6,6 +6,13 @@ from haystack_integrations.document_stores.opensearch.document_store import OpenSearchDocumentStore +COMMON_KWARGS = { + "hosts": ["https://localhost:9200"], + "http_auth": ("admin", "SecureHaystack!2026"), + "verify_certs": False, +} +DEFAULT_METHOD = {"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"} + def _get_unique_index_name() -> str: """ @@ -17,178 +24,75 @@ def _get_unique_index_name() -> str: @pytest.fixture -def document_store(): - """ - OpenSearch document store instance. - Used by document_store and by TestDocumentStore to override the base test class fixture. - """ - hosts = ["https://localhost:9200"] - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=768, - return_embedding=True, - method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, - ) - store._ensure_initialized() - yield store - - asyncio.run(store._ensure_initialized_async()) - assert store._client - assert store._async_client - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) - asyncio.run(store._async_client.close()) +def opensearch_store(): + created: list[OpenSearchDocumentStore] = [] + + def _make(**overrides) -> OpenSearchDocumentStore: + kwargs = { + **COMMON_KWARGS, + "index": _get_unique_index_name(), + "embedding_dim": 768, + "return_embedding": True, + "method": DEFAULT_METHOD, + **overrides, + } + store = OpenSearchDocumentStore(**kwargs) + store._ensure_initialized() + created.append(store) + return store + + yield _make + + for store in created: + asyncio.run(store._ensure_initialized_async()) + store._client.indices.delete(index=store._index, params={"ignore": [400, 404]}) + asyncio.run(store._async_client.close()) @pytest.fixture -def document_store_2(): - hosts = ["https://localhost:9200"] - index = f"test_index_2_{_get_unique_index_name()}" +def document_store(opensearch_store): + return opensearch_store() - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=768, - return_embedding=False, - method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, - ) - yield store - # Cleanup - store._ensure_initialized() - asyncio.run(store._ensure_initialized_async()) - assert store._client - assert store._async_client - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) - asyncio.run(store._async_client.close()) +@pytest.fixture +def document_store_2(opensearch_store): + return opensearch_store(return_embedding=False) @pytest.fixture -def document_store_readonly(): - """ - A document store that does not automatically create the underlying index. - """ - hosts = ["https://localhost:9200"] - # Use a different index for each test so we can run them in parallel - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=768, - method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, - create_index=False, - ) - store._ensure_initialized() - asyncio.run(store._ensure_initialized_async()) - assert store._client - assert store._async_client +def document_store_readonly(opensearch_store): + store = opensearch_store(create_index=False) store._client.cluster.put_settings(body={"transient": {"action.auto_create_index": False}}) yield store - store._client.cluster.put_settings(body={"transient": {"action.auto_create_index": True}}) - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) @pytest.fixture -def document_store_embedding_dim_4_no_emb_returned(): - """ - A document store with embedding dimension 4 that does not return embeddings. - """ - hosts = ["https://localhost:9200"] - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=4, - return_embedding=False, - method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, - ) - yield store - - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) +def document_store_embedding_dim_4_no_emb_returned(opensearch_store): + return opensearch_store(embedding_dim=4, return_embedding=False) @pytest.fixture -def document_store_embedding_dim_4_no_emb_returned_faiss(): +def document_store_embedding_dim_4_no_emb_returned_faiss(opensearch_store): """ A document store with embedding dimension 4 that uses a FAISS engine with HNSW algorithm for vector search. We use this document store for testing efficient k-NN filtering according to https://opensearch.org/docs/latest/vector-search/filter-search-knn/efficient-knn-filtering/. """ - hosts = ["https://localhost:9200"] - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, + return opensearch_store( embedding_dim=4, method={"space_type": "innerproduct", "engine": "faiss", "name": "hnsw"}, ) - yield store - - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) @pytest.fixture -def document_store_nested(): - """ - OpenSearch document store with explicit nested fields. - """ - hosts = ["https://localhost:9200"] - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=768, - return_embedding=False, - nested_fields=["refs", "tags"], - ) - store._ensure_initialized() - yield store - - assert store._client - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) +def document_store_nested(opensearch_store): + return opensearch_store(return_embedding=False, nested_fields=["refs", "tags"]) @pytest.fixture -def document_store_wildcard_nested(): - """ - OpenSearch document store with wildcard nested field auto-detection. - """ - hosts = ["https://localhost:9200"] - index = _get_unique_index_name() - - store = OpenSearchDocumentStore( - hosts=hosts, - index=index, - http_auth=("admin", "SecureHaystack!2026"), - verify_certs=False, - embedding_dim=768, - return_embedding=False, - nested_fields="*", - ) - store._ensure_initialized() - yield store - - assert store._client - store._client.indices.delete(index=index, params={"ignore": [400, 404]}) +def document_store_wildcard_nested(opensearch_store): + return opensearch_store(return_embedding=False, nested_fields="*") @pytest.fixture diff --git a/integrations/opensearch/tests/test_auth.py b/integrations/opensearch/tests/test_auth.py index 393e374e91..bc5e26ed90 100644 --- a/integrations/opensearch/tests/test_auth.py +++ b/integrations/opensearch/tests/test_auth.py @@ -5,10 +5,16 @@ from unittest.mock import Mock, patch import pytest +from botocore.exceptions import BotoCoreError from haystack.utils.auth import Secret from opensearchpy import AWSV4SignerAsyncAuth, Urllib3AWSV4SignerAuth -from haystack_integrations.document_stores.opensearch.auth import AsyncAWSAuth, AWSAuth +from haystack_integrations.document_stores.opensearch.auth import ( + AsyncAWSAuth, + AWSAuth, + AWSConfigurationError, + _get_aws_session, +) from haystack_integrations.document_stores.opensearch.document_store import ( DEFAULT_MAX_CHUNK_BYTES, OpenSearchDocumentStore, @@ -131,6 +137,17 @@ def test_call_async(self, _get_aws_v4_signer_auth): async_aws_auth(method="GET", url="http://some.url", body="some body", headers={"Host": "localhost"}) signer_auth_mock.assert_called_once_with("GET", "http://some.url", "some body", {"Host": "localhost"}) + def test_get_aws_session_wraps_boto_core_error(self, mock_boto3_session): + mock_boto3_session.side_effect = BotoCoreError() + with pytest.raises(AWSConfigurationError, match="Failed to initialize the session"): + _get_aws_session(aws_access_key_id="x", aws_secret_access_key="y") + + @patch("haystack_integrations.document_stores.opensearch.auth.Urllib3AWSV4SignerAuth") + def test_get_aws_v4_signer_auth_wraps_exceptions(self, mock_signer): + mock_signer.side_effect = RuntimeError("signer creation failed") + with pytest.raises(AWSConfigurationError, match="Could not connect to AWS OpenSearch"): + AWSAuth() + def test_async_aws_auth_init(self): data = { "type": "haystack_integrations.document_stores.opensearch.auth.AWSAuth", diff --git a/integrations/opensearch/tests/test_bm25_retriever.py b/integrations/opensearch/tests/test_bm25_retriever.py index 5181db75e6..798493a5bf 100644 --- a/integrations/opensearch/tests/test_bm25_retriever.py +++ b/integrations/opensearch/tests/test_bm25_retriever.py @@ -326,6 +326,22 @@ def test_run_ignore_errors(caplog): assert "Some error" in caplog.text +@pytest.mark.asyncio +async def test_run_async_ignore_errors(caplog): + mock_store = Mock(spec=OpenSearchDocumentStore) + mock_store._bm25_retrieval_async.side_effect = Exception("Some error") + retriever = OpenSearchBM25Retriever(document_store=mock_store, raise_on_failure=False) + res = await retriever.run_async(query="some query") + assert len(res) == 1 + assert res["documents"] == [] + assert "Some error" in caplog.text + + +def test_init_raises_on_invalid_document_store(): + with pytest.raises(ValueError, match="document_store must be an instance of OpenSearchDocumentStore"): + OpenSearchBM25Retriever(document_store="not a document store") + + def test_run_with_runtime_document_store(): # initial document store initial_store = Mock(spec=OpenSearchDocumentStore) diff --git a/integrations/opensearch/tests/test_embedding_retriever.py b/integrations/opensearch/tests/test_embedding_retriever.py index dc35a1ed9d..2b84fedd0b 100644 --- a/integrations/opensearch/tests/test_embedding_retriever.py +++ b/integrations/opensearch/tests/test_embedding_retriever.py @@ -283,6 +283,22 @@ def test_run_ignore_errors(caplog): assert "Some error" in caplog.text +@pytest.mark.asyncio +async def test_run_async_ignore_errors(caplog): + mock_store = Mock(spec=OpenSearchDocumentStore) + mock_store._embedding_retrieval_async.side_effect = Exception("Some error") + retriever = OpenSearchEmbeddingRetriever(document_store=mock_store, raise_on_failure=False) + res = await retriever.run_async(query_embedding=[0.5, 0.7]) + assert len(res) == 1 + assert res["documents"] == [] + assert "Some error" in caplog.text + + +def test_init_raises_on_invalid_document_store(): + with pytest.raises(ValueError, match="document_store must be an instance of OpenSearchDocumentStore"): + OpenSearchEmbeddingRetriever(document_store="not a document store") + + def test_run_with_runtime_document_store(): """Test that runtime document store switching works correctly.""" # Setup initial document store diff --git a/integrations/opensearch/tests/test_open_search_hybrid_retriever.py b/integrations/opensearch/tests/test_open_search_hybrid_retriever.py index 201eb06044..6fcf76358b 100644 --- a/integrations/opensearch/tests/test_open_search_hybrid_retriever.py +++ b/integrations/opensearch/tests/test_open_search_hybrid_retriever.py @@ -142,6 +142,14 @@ def test_from_dict_with_extra_args(self): assert isinstance(hybrid, OpenSearchHybridRetriever) assert hybrid.to_dict() + def test_from_dict_without_optional_keys(self): + data = deepcopy(self.serialised) + del data["init_parameters"]["filter_policy_bm25"] + del data["init_parameters"]["filter_policy_embedding"] + del data["init_parameters"]["join_mode"] + hybrid = OpenSearchHybridRetriever.from_dict(data) + assert isinstance(hybrid, OpenSearchHybridRetriever) + def test_run(self, mock_embedder): # mocked document store mock_store = Mock(spec=OpenSearchDocumentStore) diff --git a/integrations/opensearch/tests/test_utils.py b/integrations/opensearch/tests/test_utils.py new file mode 100644 index 0000000000..c3539d4d19 --- /dev/null +++ b/integrations/opensearch/tests/test_utils.py @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import Mock + +import pytest + +from haystack_integrations.components.retrievers.opensearch.utils import _resolve_document_store +from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore + + +def test_resolve_document_store_returns_default_when_runtime_is_none(): + default = Mock(spec=OpenSearchDocumentStore) + assert _resolve_document_store(None, default) is default + + +def test_resolve_document_store_returns_runtime_when_valid(): + default = Mock(spec=OpenSearchDocumentStore) + runtime = Mock(spec=OpenSearchDocumentStore) + assert _resolve_document_store(runtime, default) is runtime + + +def test_resolve_document_store_raises_on_invalid_runtime(): + default = Mock(spec=OpenSearchDocumentStore) + with pytest.raises(ValueError, match="document_store must be an instance of OpenSearchDocumentStore"): + _resolve_document_store("not a document store", default)