diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 8e95e1d47f..fd5670a7c5 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -774,6 +774,8 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. + ``recreate_index=True`` is not supported when the configured index name is an alias; a + :class:`haystack.document_stores.errors.DocumentStoreError` is raised in that case. :param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/). @@ -784,11 +786,18 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru try: if recreate_index: # get the current index mappings and settings - index_name = self._index index_info = self._client.indices.get(index=self._index) + actual_index = next(iter(index_info)) + if actual_index != self._index: + msg = ( + f"Cannot recreate index '{self._index}' because it is an alias pointing to " + f"'{actual_index}'. Use recreate_index=False to delete all documents via " + f"delete_by_query, or operate directly on the concrete index." + ) + raise DocumentStoreError(msg) body = { - "mappings": index_info[index_name]["mappings"], - "settings": index_info[index_name]["settings"], + "mappings": index_info[actual_index]["mappings"], + "settings": index_info[actual_index]["settings"], } body["settings"]["index"].pop("uuid", None) body["settings"]["index"].pop("creation_date", None) @@ -818,6 +827,8 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. + ``recreate_index=True`` is not supported when the configured index name is an alias; a + :class:`haystack.document_stores.errors.DocumentStoreError` is raised in that case. :param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/). @@ -828,11 +839,18 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh try: if recreate_index: # get the current index mappings and settings - index_name = self._index index_info = await self._async_client.indices.get(index=self._index) + actual_index = next(iter(index_info)) + if actual_index != self._index: + msg = ( + f"Cannot recreate index '{self._index}' because it is an alias pointing to " + f"'{actual_index}'. Use recreate_index=False to delete all documents via " + f"delete_by_query, or operate directly on the concrete index." + ) + raise DocumentStoreError(msg) body = { - "mappings": index_info[index_name]["mappings"], - "settings": index_info[index_name]["settings"], + "mappings": index_info[actual_index]["mappings"], + "settings": index_info[actual_index]["settings"], } body["settings"]["index"].pop("uuid", None) body["settings"]["index"].pop("creation_date", None) @@ -1817,7 +1835,8 @@ def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fiel # use index mapping to get all fields mapping = self._client.indices.get_mapping(index=self._index) - index_mapping = mapping[self._index]["mappings"]["properties"] + actual_index = next(iter(mapping)) + index_mapping = mapping[actual_index]["mappings"]["properties"] # normalize field names normalized_metadata_fields = [_normalize_metadata_field_name(field) for field in metadata_fields] @@ -1860,7 +1879,8 @@ async def count_unique_metadata_by_filter_async( # use index mapping to get all fields mapping = await self._async_client.indices.get_mapping(index=self._index) - index_mapping = mapping[self._index]["mappings"]["properties"] + actual_index = next(iter(mapping)) + index_mapping = mapping[actual_index]["mappings"]["properties"] # normalize field names normalized_metadata_fields = [_normalize_metadata_field_name(field) for field in metadata_fields] @@ -1912,7 +1932,8 @@ def get_metadata_fields_info(self) -> dict[str, dict[str, str]]: assert self._client is not None mapping = self._client.indices.get_mapping(index=self._index) - index_mapping = mapping[self._index]["mappings"]["properties"] + actual_index = next(iter(mapping)) + index_mapping = mapping[actual_index]["mappings"]["properties"] # remove all fields that are not metadata fields index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS} return index_mapping @@ -1945,7 +1966,8 @@ async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]: assert self._async_client is not None mapping = await self._async_client.indices.get_mapping(index=self._index) - index_mapping = mapping[self._index]["mappings"]["properties"] + actual_index = next(iter(mapping)) + index_mapping = mapping[actual_index]["mappings"]["properties"] # remove all fields that are not metadata fields index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS} return index_mapping diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 0053037319..238b78d6a0 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -692,6 +692,69 @@ async def test_ensure_index_exists_async_no_create_when_disabled(_mock_sync_clie mock_client.indices.get_mapping.assert_not_called() +@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") +def test_delete_all_documents_recreate_raises_for_alias(_mock_opensearch_client): + """delete_all_documents(recreate_index=True) raises DocumentStoreError when self._index is an alias.""" + store = OpenSearchDocumentStore(hosts="testhost", index="my-alias", http_auth=("a", "b")) + mock_client = MagicMock() + store._client = mock_client + # indices.get() returns a key that differs from self._index — signals an alias + mock_client.indices.get.return_value = {"my-real-index-v1": {"mappings": {}, "settings": {"index": {}}}} + + with pytest.raises(DocumentStoreError, match="is an alias"): + store.delete_all_documents(recreate_index=True) + + mock_client.indices.delete.assert_not_called() + mock_client.indices.create.assert_not_called() + + +@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") +def test_delete_all_documents_recreate_works_for_concrete_index(_mock_opensearch_client): + """delete_all_documents(recreate_index=True) proceeds normally when self._index is a concrete index.""" + store = OpenSearchDocumentStore(hosts="testhost", index="my-index", http_auth=("a", "b")) + mock_client = MagicMock() + store._client = mock_client + mock_client.indices.get.return_value = {"my-index": {"mappings": {}, "settings": {"index": {}}}} + + store.delete_all_documents(recreate_index=True) + + mock_client.indices.delete.assert_called_once_with(index="my-index") + mock_client.indices.create.assert_called_once() + + +@pytest.mark.asyncio +@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch") +@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") +async def test_delete_all_documents_async_recreate_raises_for_alias(_mock_sync_client, _mock_async_client): + """delete_all_documents_async(recreate_index=True) raises DocumentStoreError when self._index is an alias.""" + store = OpenSearchDocumentStore(hosts="testhost", index="my-alias", http_auth=("a", "b")) + mock_client = AsyncMock() + store._async_client = mock_client + mock_client.indices.get = AsyncMock(return_value={"my-real-index-v1": {"mappings": {}, "settings": {"index": {}}}}) + + with pytest.raises(DocumentStoreError, match="is an alias"): + await store.delete_all_documents_async(recreate_index=True) + + mock_client.indices.delete.assert_not_called() + mock_client.indices.create.assert_not_called() + + +@pytest.mark.asyncio +@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch") +@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch") +async def test_delete_all_documents_async_recreate_works_for_concrete_index(_mock_sync_client, _mock_async_client): + """delete_all_documents_async(recreate_index=True) proceeds normally when self._index is a concrete index.""" + store = OpenSearchDocumentStore(hosts="testhost", index="my-index", http_auth=("a", "b")) + mock_client = AsyncMock() + store._async_client = mock_client + mock_client.indices.get = AsyncMock(return_value={"my-index": {"mappings": {}, "settings": {"index": {}}}}) + + await store.delete_all_documents_async(recreate_index=True) + + mock_client.indices.delete.assert_called_once_with(index="my-index") + mock_client.indices.create.assert_called_once() + + @pytest.mark.integration class TestDocumentStore( OpenSearchDocumentStoreTestMixin, @@ -1649,3 +1712,100 @@ def test_document_store_with_alias(self, document_store: OpenSearchDocumentStore assert results[0].content == "doc via alias" finally: client.indices.delete_alias(index=document_store._index, name=alias_name) + + def test_delete_all_documents_with_alias(self, document_store: OpenSearchDocumentStore): + """delete_all_documents(recreate_index=True) raises DocumentStoreError when self._index is an alias.""" + alias_name = f"alias_del_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + try: + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + alias_store._ensure_initialized() + + with pytest.raises(DocumentStoreError, match="is an alias"): + alias_store.delete_all_documents(recreate_index=True) + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + + def test_delete_all_documents_no_recreate_with_alias(self, document_store: OpenSearchDocumentStore): + """delete_all_documents(recreate_index=False) removes all documents when self._index is an alias.""" + alias_name = f"alias_del_norecr_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + try: + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + alias_store._ensure_initialized() + alias_store.write_documents([Document(content="a"), Document(content="b")]) + assert alias_store.count_documents() == 2 + + alias_store.delete_all_documents(recreate_index=False) + + assert alias_store.count_documents() == 0 + assert alias_store.write_documents([Document(content="after delete")]) == 1 + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + + def test_count_unique_metadata_by_filter_with_alias(self, document_store: OpenSearchDocumentStore): + """count_unique_metadata_by_filter reads the mapping via alias without KeyError.""" + alias_name = f"alias_cnt_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + try: + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + alias_store._ensure_initialized() + alias_store.write_documents( + [ + Document(content="a", meta={"category": "X"}), + Document(content="b", meta={"category": "X"}), + Document(content="c", meta={"category": "Y"}), + ] + ) + + result = alias_store.count_unique_metadata_by_filter( + filters={"field": "meta.category", "operator": "in", "value": ["X", "Y"]}, + metadata_fields=["category"], + ) + assert result == {"category": 2} + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + + def test_get_metadata_fields_info_with_alias(self, document_store: OpenSearchDocumentStore): + """get_metadata_fields_info reads the mapping via alias without KeyError.""" + alias_name = f"alias_mfi_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + try: + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + alias_store._ensure_initialized() + alias_store.write_documents([Document(content="doc", meta={"category": "A", "count": 1})]) + + fields_info = alias_store.get_metadata_fields_info() + assert "category" in fields_info + assert fields_info["category"]["type"] == "keyword" + assert "count" in fields_info + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index 08234f53dd..98b5e27aaa 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -653,3 +653,117 @@ async def test_document_store_async_with_alias(self, document_store: OpenSearchD client.indices.delete_alias(index=document_store._index, name=alias_name) if alias_store._async_client: await alias_store._async_client.close() + + @pytest.mark.asyncio + async def test_delete_all_documents_async_with_alias(self, document_store: OpenSearchDocumentStore): + """delete_all_documents_async(recreate_index=True) raises DocumentStoreError when self._index is an alias.""" + alias_name = f"alias_del_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + try: + await alias_store._ensure_initialized_async() + + with pytest.raises(DocumentStoreError, match="is an alias"): + await alias_store.delete_all_documents_async(recreate_index=True) + + # delete_by_query path still works for aliases + await alias_store.write_documents_async([Document(content="x")]) + await alias_store.delete_all_documents_async(recreate_index=False) + assert await alias_store.count_documents_async() == 0 + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + if alias_store._async_client: + await alias_store._async_client.close() + + @pytest.mark.asyncio + async def test_delete_all_documents_no_recreate_async_with_alias(self, document_store: OpenSearchDocumentStore): + """delete_all_documents_async(recreate_index=False) removes all documents when self._index is an alias.""" + alias_name = f"alias_del_norecr_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + try: + await alias_store._ensure_initialized_async() + await alias_store.write_documents_async([Document(content="a"), Document(content="b")]) + assert await alias_store.count_documents_async() == 2 + + await alias_store.delete_all_documents_async(recreate_index=False) + + assert await alias_store.count_documents_async() == 0 + assert await alias_store.write_documents_async([Document(content="after delete")]) == 1 + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + if alias_store._async_client: + await alias_store._async_client.close() + + @pytest.mark.asyncio + async def test_count_unique_metadata_by_filter_async_with_alias(self, document_store: OpenSearchDocumentStore): + """count_unique_metadata_by_filter_async reads the mapping via alias without KeyError.""" + alias_name = f"alias_cnt_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + try: + await alias_store._ensure_initialized_async() + await alias_store.write_documents_async( + [ + Document(content="a", meta={"category": "X"}), + Document(content="b", meta={"category": "X"}), + Document(content="c", meta={"category": "Y"}), + ] + ) + + result = await alias_store.count_unique_metadata_by_filter_async( + filters={"field": "meta.category", "operator": "in", "value": ["X", "Y"]}, + metadata_fields=["category"], + ) + assert result == {"category": 2} + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + if alias_store._async_client: + await alias_store._async_client.close() + + @pytest.mark.asyncio + async def test_get_metadata_fields_info_async_with_alias(self, document_store: OpenSearchDocumentStore): + """get_metadata_fields_info_async reads the mapping via alias without KeyError.""" + alias_name = f"alias_mfi_{document_store._index}" + client = document_store._client + client.indices.put_alias(index=document_store._index, name=alias_name) + alias_store = OpenSearchDocumentStore( + hosts=["https://localhost:9200"], + http_auth=("admin", "SecureHaystack!2026"), + verify_certs=False, + index=alias_name, + embedding_dim=768, + ) + try: + await alias_store._ensure_initialized_async() + await alias_store.write_documents_async([Document(content="doc", meta={"category": "A", "count": 1})]) + + fields_info = await alias_store.get_metadata_fields_info_async() + assert "category" in fields_info + assert fields_info["category"]["type"] == "keyword" + assert "count" in fields_info + finally: + client.indices.delete_alias(index=document_store._index, name=alias_name) + if alias_store._async_client: + await alias_store._async_client.close()