Skip to content

Commit fc8b5c1

Browse files
fix(elasticsearch): fix async write/delete bugs and restore comments
Fix three async behavior deviations from the standard DocumentStore interface:
- write_documents_async with DuplicatePolicy.FAIL now raises DuplicateDocumentError instead of DocumentStoreError (the outer try/except was catching the inner DuplicateDocumentError and re-raising it as DocumentStoreError).
- write_documents_async with DuplicatePolicy.SKIP now returns 0 when skipping an existing document (errors are now categorized as in the sync version: a version_conflict_engine_exception under the SKIP policy is silently ignored).
- delete_documents_async no longer raises on non-existent IDs (adds raise_on_error=False to match the behavior of the sync version).

Also restore docstrings and inline comments that were accidentally removed in the previous refactor commit.
1 parent cd04a3d commit fc8b5c1

2 files changed

Lines changed: 48 additions & 63 deletions

File tree

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -710,29 +710,38 @@ async def write_documents_async(
710710
}
711711
actions.append(action)
712712

713-
try:
714-
success, failed = await helpers.async_bulk(
715-
client=self.async_client,
716-
actions=actions,
717-
index=self._index,
718-
refresh=refresh,
719-
raise_on_error=False,
720-
stats_only=False,
721-
)
722-
if failed:
723-
# with stats_only=False, failed is guaranteed to be a list of dicts
724-
assert isinstance(failed, list)
725-
if policy == DuplicatePolicy.FAIL:
726-
for error in failed:
727-
if "create" in error and error["create"]["status"] == DOC_ALREADY_EXISTS:
728-
msg = f"ID '{error['create']['_id']}' already exists in the document store"
729-
raise DuplicateDocumentError(msg)
730-
msg = f"Failed to write documents to Elasticsearch. Errors:\n{failed}"
713+
documents_written, errors = await helpers.async_bulk(
714+
client=self.async_client,
715+
actions=actions,
716+
index=self._index,
717+
refresh=refresh,
718+
raise_on_error=False,
719+
stats_only=False,
720+
)
721+
722+
if errors:
723+
# with stats_only=False, errors is guaranteed to be a list of dicts
724+
assert isinstance(errors, list)
725+
duplicate_errors_ids = []
726+
other_errors = []
727+
for e in errors:
728+
error_type = e["create"]["error"]["type"]
729+
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
730+
duplicate_errors_ids.append(e["create"]["_id"])
731+
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
732+
continue
733+
else:
734+
other_errors.append(e)
735+
736+
if len(duplicate_errors_ids) > 0:
737+
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
738+
raise DuplicateDocumentError(msg)
739+
740+
if len(other_errors) > 0:
741+
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
731742
raise DocumentStoreError(msg)
732-
return success
733-
except Exception as e:
734-
msg = f"Failed to write documents to Elasticsearch: {e!s}"
735-
raise DocumentStoreError(msg) from e
743+
744+
return documents_written
736745

737746
def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
738747
"""
@@ -776,16 +785,13 @@ async def delete_documents_async(
776785
"""
777786
self._ensure_initialized()
778787

779-
try:
780-
await helpers.async_bulk(
781-
client=self.async_client,
782-
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
783-
index=self._index,
784-
refresh=refresh,
785-
)
786-
except Exception as e:
787-
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
788-
raise DocumentStoreError(msg) from e
788+
await helpers.async_bulk(
789+
client=self.async_client,
790+
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
791+
index=self._index,
792+
refresh=refresh,
793+
raise_on_error=False,
794+
)
789795

790796
def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
791797
"""

integrations/elasticsearch/tests/test_document_store_async.py

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,17 @@ class TestElasticsearchDocumentStoreAsync(
4343
):
4444
@pytest_asyncio.fixture
4545
async def document_store(self, request):
46+
"""
47+
Basic fixture providing a document store instance for async tests
48+
"""
4649
hosts = ["http://localhost:9200"]
50+
# Use a different index for each test so we can run them in parallel
4751
index = f"{request.node.name}"
4852

4953
store = ElasticsearchDocumentStore(hosts=hosts, index=index)
5054
yield store
5155
store.client.options(ignore_status=[400, 404]).indices.delete(index=index)
56+
5257
await store.async_client.close()
5358

5459
def assert_documents_are_equal(self, received: list[Document], expected: list[Document]):
@@ -63,37 +68,6 @@ async def test_count_not_empty_async(self, document_store):
6368
)
6469
assert await document_store.count_documents_async() == 3
6570

66-
@pytest.mark.asyncio
67-
async def test_delete_documents_empty_document_store_async(self, document_store):
68-
# Elasticsearch raises DocumentStoreError when deleting a non-existent document
69-
# rather than silently ignoring it, so we override the mixin test here.
70-
with pytest.raises(DocumentStoreError):
71-
await document_store.delete_documents_async(["non_existing_id"])
72-
73-
@pytest.mark.asyncio
74-
async def test_delete_documents_non_existing_document_async(self, document_store):
75-
# Same as above: Elasticsearch raises on missing IDs rather than a no-op.
76-
doc = Document(content="test doc")
77-
await document_store.write_documents_async([doc])
78-
assert await document_store.count_documents_async() == 1
79-
with pytest.raises(DocumentStoreError):
80-
await document_store.delete_documents_async(["non_existing_id"])
81-
82-
@pytest.mark.asyncio
83-
async def test_write_documents_duplicate_fail_async(self, document_store):
84-
# Elasticsearch raises DocumentStoreError instead of DuplicateDocumentError on duplicate FAIL
85-
doc = Document(content="test doc")
86-
assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.FAIL) == 1
87-
with pytest.raises(DocumentStoreError):
88-
await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.FAIL)
89-
90-
@pytest.mark.asyncio
91-
async def test_write_documents_duplicate_skip_async(self, document_store):
92-
# Elasticsearch returns 1 (not 0) when skipping an already-existing document
93-
doc = Document(content="test doc")
94-
assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.SKIP) == 1
95-
assert await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.SKIP) == 1
96-
9771
@pytest.mark.asyncio
9872
async def test_write_documents_async(self, document_store):
9973
docs = [Document(id="1", content="test")]
@@ -104,12 +78,14 @@ async def test_write_documents_async(self, document_store):
10478

10579
@pytest.mark.asyncio
10680
async def test_write_documents_async_invalid_document_type(self, document_store):
81+
"""Test write_documents with invalid document type"""
10782
invalid_docs = [{"id": "1", "content": "test"}]
10883
with pytest.raises(ValueError, match="param 'documents' must contain a list of objects of type Document"):
10984
await document_store.write_documents_async(invalid_docs)
11085

11186
@pytest.mark.asyncio
11287
async def test_write_documents_async_with_sparse_embedding_warning(self, document_store, caplog):
88+
"""Test write_documents with document containing sparse_embedding field"""
11389
doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5]))
11490
await document_store.write_documents_async([doc])
11591
assert "but `sparse_vector_field` is not configured" in caplog.text
@@ -121,6 +97,7 @@ async def test_write_documents_async_with_sparse_embedding_warning(self, documen
12197

12298
@pytest.mark.asyncio
12399
async def test_write_documents_async_with_sparse_vectors(self):
100+
"""Test write_documents with document containing sparse_embedding field"""
124101
store = ElasticsearchDocumentStore(
125102
hosts=["http://localhost:9200"], index="test_async_sparse", sparse_vector_field="sparse_vec"
126103
)
@@ -129,9 +106,11 @@ async def test_write_documents_async_with_sparse_vectors(self):
129106
doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5]))
130107
await store.write_documents_async([doc])
131108

109+
# check ES natively
132110
raw_doc = await store.async_client.get(index="test_async_sparse", id="1")
133111
assert raw_doc["_source"]["sparse_vec"] == {"0": 0.5, "1": 0.5}
134112

113+
# check retrieval
135114
results = await store.filter_documents_async()
136115
assert len(results) == 1
137116
assert results[0].sparse_embedding is not None

0 commit comments

Comments
 (0)