Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
``recreate_index=True`` is not supported when the configured index name is an alias; a
:class:`haystack.document_stores.errors.DocumentStoreError` is raised in that case.
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
Expand All @@ -784,11 +786,18 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru
try:
if recreate_index:
# get the current index mappings and settings
index_name = self._index
index_info = self._client.indices.get(index=self._index)
actual_index = next(iter(index_info))
if actual_index != self._index:
msg = (
f"Cannot recreate index '{self._index}' because it is an alias pointing to "
f"'{actual_index}'. Use recreate_index=False to delete all documents via "
f"delete_by_query, or operate directly on the concrete index."
)
raise DocumentStoreError(msg)
body = {
"mappings": index_info[index_name]["mappings"],
"settings": index_info[index_name]["settings"],
"mappings": index_info[actual_index]["mappings"],
"settings": index_info[actual_index]["settings"],
}
body["settings"]["index"].pop("uuid", None)
body["settings"]["index"].pop("creation_date", None)
Expand Down Expand Up @@ -818,6 +827,8 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
``recreate_index=True`` is not supported when the configured index name is an alias; a
:class:`haystack.document_stores.errors.DocumentStoreError` is raised in that case.
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
Expand All @@ -828,11 +839,18 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh
try:
if recreate_index:
# get the current index mappings and settings
index_name = self._index
index_info = await self._async_client.indices.get(index=self._index)
actual_index = next(iter(index_info))
if actual_index != self._index:
msg = (
f"Cannot recreate index '{self._index}' because it is an alias pointing to "
f"'{actual_index}'. Use recreate_index=False to delete all documents via "
f"delete_by_query, or operate directly on the concrete index."
)
raise DocumentStoreError(msg)
body = {
"mappings": index_info[index_name]["mappings"],
"settings": index_info[index_name]["settings"],
"mappings": index_info[actual_index]["mappings"],
"settings": index_info[actual_index]["settings"],
}
body["settings"]["index"].pop("uuid", None)
body["settings"]["index"].pop("creation_date", None)
Expand Down Expand Up @@ -1817,7 +1835,8 @@ def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fiel

# use index mapping to get all fields
mapping = self._client.indices.get_mapping(index=self._index)
index_mapping = mapping[self._index]["mappings"]["properties"]
actual_index = next(iter(mapping))
index_mapping = mapping[actual_index]["mappings"]["properties"]

# normalize field names
normalized_metadata_fields = [_normalize_metadata_field_name(field) for field in metadata_fields]
Expand Down Expand Up @@ -1860,7 +1879,8 @@ async def count_unique_metadata_by_filter_async(

# use index mapping to get all fields
mapping = await self._async_client.indices.get_mapping(index=self._index)
index_mapping = mapping[self._index]["mappings"]["properties"]
actual_index = next(iter(mapping))
index_mapping = mapping[actual_index]["mappings"]["properties"]

# normalize field names
normalized_metadata_fields = [_normalize_metadata_field_name(field) for field in metadata_fields]
Expand Down Expand Up @@ -1912,7 +1932,8 @@ def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
assert self._client is not None

mapping = self._client.indices.get_mapping(index=self._index)
index_mapping = mapping[self._index]["mappings"]["properties"]
actual_index = next(iter(mapping))
index_mapping = mapping[actual_index]["mappings"]["properties"]
# remove all fields that are not metadata fields
index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS}
return index_mapping
Expand Down Expand Up @@ -1945,7 +1966,8 @@ async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
assert self._async_client is not None

mapping = await self._async_client.indices.get_mapping(index=self._index)
index_mapping = mapping[self._index]["mappings"]["properties"]
actual_index = next(iter(mapping))
index_mapping = mapping[actual_index]["mappings"]["properties"]
# remove all fields that are not metadata fields
index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS}
return index_mapping
Expand Down
160 changes: 160 additions & 0 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,69 @@ async def test_ensure_index_exists_async_no_create_when_disabled(_mock_sync_clie
mock_client.indices.get_mapping.assert_not_called()


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_delete_all_documents_recreate_raises_for_alias(_mock_opensearch_client):
    """
    Recreating the index through an alias must be refused with a DocumentStoreError.

    The store is configured with ``my-alias`` while the mocked ``indices.get`` response is keyed by
    ``my-real-index-v1``. The test expects the error and no delete/create calls on the client.
    """
    # A response keyed by a name other than the configured index is what signals an alias.
    index_info = {"my-real-index-v1": {"mappings": {}, "settings": {"index": {}}}}
    fake_client = MagicMock()
    fake_client.indices.get.return_value = index_info

    doc_store = OpenSearchDocumentStore(hosts="testhost", index="my-alias", http_auth=("a", "b"))
    doc_store._client = fake_client

    with pytest.raises(DocumentStoreError, match="is an alias"):
        doc_store.delete_all_documents(recreate_index=True)

    # Neither a delete nor a create may have been issued on the mocked client.
    fake_client.indices.delete.assert_not_called()
    fake_client.indices.create.assert_not_called()


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_delete_all_documents_recreate_works_for_concrete_index(_mock_opensearch_client):
    """
    Recreating a concrete index deletes it and creates it again.

    The mocked ``indices.get`` response is keyed by the same name the store is configured with
    (``my-index``), so no alias is involved and the delete/create calls are expected.
    """
    index_info = {"my-index": {"mappings": {}, "settings": {"index": {}}}}
    fake_client = MagicMock()
    fake_client.indices.get.return_value = index_info

    doc_store = OpenSearchDocumentStore(hosts="testhost", index="my-index", http_auth=("a", "b"))
    doc_store._client = fake_client

    doc_store.delete_all_documents(recreate_index=True)

    fake_client.indices.delete.assert_called_once_with(index="my-index")
    fake_client.indices.create.assert_called_once()


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_delete_all_documents_async_recreate_raises_for_alias(_mock_sync_client, _mock_async_client):
    """
    Async variant: recreating the index through an alias must raise DocumentStoreError.

    The store is configured with ``my-alias`` while the mocked async ``indices.get`` response is
    keyed by ``my-real-index-v1``. The test expects the error and no delete/create calls.
    """
    index_info = {"my-real-index-v1": {"mappings": {}, "settings": {"index": {}}}}
    fake_async_client = AsyncMock()
    # Explicit AsyncMock so awaiting indices.get() yields the prepared response.
    fake_async_client.indices.get = AsyncMock(return_value=index_info)

    doc_store = OpenSearchDocumentStore(hosts="testhost", index="my-alias", http_auth=("a", "b"))
    doc_store._async_client = fake_async_client

    with pytest.raises(DocumentStoreError, match="is an alias"):
        await doc_store.delete_all_documents_async(recreate_index=True)

    # Neither a delete nor a create may have been issued on the mocked client.
    fake_async_client.indices.delete.assert_not_called()
    fake_async_client.indices.create.assert_not_called()


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_delete_all_documents_async_recreate_works_for_concrete_index(_mock_sync_client, _mock_async_client):
    """
    Async variant: recreating a concrete index deletes it and creates it again.

    The mocked async ``indices.get`` response is keyed by the configured name (``my-index``),
    so the delete/create calls on the async client are expected.
    """
    index_info = {"my-index": {"mappings": {}, "settings": {"index": {}}}}
    fake_async_client = AsyncMock()
    # Explicit AsyncMock so awaiting indices.get() yields the prepared response.
    fake_async_client.indices.get = AsyncMock(return_value=index_info)

    doc_store = OpenSearchDocumentStore(hosts="testhost", index="my-index", http_auth=("a", "b"))
    doc_store._async_client = fake_async_client

    await doc_store.delete_all_documents_async(recreate_index=True)

    fake_async_client.indices.delete.assert_called_once_with(index="my-index")
    fake_async_client.indices.create.assert_called_once()


@pytest.mark.integration
class TestDocumentStore(
OpenSearchDocumentStoreTestMixin,
Expand Down Expand Up @@ -1649,3 +1712,100 @@ def test_document_store_with_alias(self, document_store: OpenSearchDocumentStore
assert results[0].content == "doc via alias"
finally:
client.indices.delete_alias(index=document_store._index, name=alias_name)

def test_delete_all_documents_with_alias(self, document_store: OpenSearchDocumentStore):
    """
    Integration check: ``delete_all_documents(recreate_index=True)`` raises when the store targets an alias.

    An alias is attached to the fixture's index, a second store is pointed at that alias, and the
    recreate call is expected to fail with DocumentStoreError. The alias is removed afterwards.
    """
    alias = f"alias_del_{document_store._index}"
    target_index = document_store._index
    raw_client = document_store._client
    raw_client.indices.put_alias(index=target_index, name=alias)
    try:
        store_via_alias = OpenSearchDocumentStore(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "SecureHaystack!2026"),
            verify_certs=False,
            index=alias,
            embedding_dim=768,
        )
        store_via_alias._ensure_initialized()

        with pytest.raises(DocumentStoreError, match="is an alias"):
            store_via_alias.delete_all_documents(recreate_index=True)
    finally:
        # Always detach the alias, whether or not the assertions passed.
        raw_client.indices.delete_alias(index=target_index, name=alias)

def test_delete_all_documents_no_recreate_with_alias(self, document_store: OpenSearchDocumentStore):
    """
    Integration check: ``delete_all_documents(recreate_index=False)`` empties the store through an alias.

    Two documents are written via the alias, deleted without recreating the index, and the store
    must then report zero documents and still accept a new write.
    """
    alias = f"alias_del_norecr_{document_store._index}"
    target_index = document_store._index
    raw_client = document_store._client
    raw_client.indices.put_alias(index=target_index, name=alias)
    try:
        store_via_alias = OpenSearchDocumentStore(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "SecureHaystack!2026"),
            verify_certs=False,
            index=alias,
            embedding_dim=768,
        )
        store_via_alias._ensure_initialized()

        seed_docs = [Document(content="a"), Document(content="b")]
        store_via_alias.write_documents(seed_docs)
        assert store_via_alias.count_documents() == 2

        store_via_alias.delete_all_documents(recreate_index=False)
        assert store_via_alias.count_documents() == 0

        # The alias must remain writable after the delete.
        written = store_via_alias.write_documents([Document(content="after delete")])
        assert written == 1
    finally:
        # Always detach the alias, whether or not the assertions passed.
        raw_client.indices.delete_alias(index=target_index, name=alias)

def test_count_unique_metadata_by_filter_with_alias(self, document_store: OpenSearchDocumentStore):
    """
    Integration check: ``count_unique_metadata_by_filter`` works when the store targets an alias.

    Three documents with two distinct ``category`` values are written through the alias; the call
    must return a count of 2 for ``category`` rather than failing on the mapping lookup.
    """
    alias = f"alias_cnt_{document_store._index}"
    target_index = document_store._index
    raw_client = document_store._client
    raw_client.indices.put_alias(index=target_index, name=alias)
    try:
        store_via_alias = OpenSearchDocumentStore(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "SecureHaystack!2026"),
            verify_certs=False,
            index=alias,
            embedding_dim=768,
        )
        store_via_alias._ensure_initialized()

        seed_docs = [
            Document(content="a", meta={"category": "X"}),
            Document(content="b", meta={"category": "X"}),
            Document(content="c", meta={"category": "Y"}),
        ]
        store_via_alias.write_documents(seed_docs)

        category_filter = {"field": "meta.category", "operator": "in", "value": ["X", "Y"]}
        counts = store_via_alias.count_unique_metadata_by_filter(
            filters=category_filter,
            metadata_fields=["category"],
        )
        assert counts == {"category": 2}
    finally:
        # Always detach the alias, whether or not the assertions passed.
        raw_client.indices.delete_alias(index=target_index, name=alias)

def test_get_metadata_fields_info_with_alias(self, document_store: OpenSearchDocumentStore):
    """
    Integration check: ``get_metadata_fields_info`` works when the store targets an alias.

    One document carrying ``category`` and ``count`` metadata is written through the alias; both
    fields must appear in the returned info, with ``category`` typed as ``keyword``.
    """
    alias = f"alias_mfi_{document_store._index}"
    target_index = document_store._index
    raw_client = document_store._client
    raw_client.indices.put_alias(index=target_index, name=alias)
    try:
        store_via_alias = OpenSearchDocumentStore(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "SecureHaystack!2026"),
            verify_certs=False,
            index=alias,
            embedding_dim=768,
        )
        store_via_alias._ensure_initialized()

        seed_doc = Document(content="doc", meta={"category": "A", "count": 1})
        store_via_alias.write_documents([seed_doc])

        info = store_via_alias.get_metadata_fields_info()
        assert "category" in info
        assert info["category"]["type"] == "keyword"
        assert "count" in info
    finally:
        # Always detach the alias, whether or not the assertions passed.
        raw_client.indices.delete_alias(index=target_index, name=alias)
Loading
Loading