Skip to content

Commit 68b0141

Browse files
refactor!: Elasticsearch - use async mixin tests and fix write/delete behaviour (#3196)
Co-authored-by: David S. Batista <dsbatista@gmail.com>
1 parent 711ef06 commit 68b0141

3 files changed

Lines changed: 147 additions & 443 deletions

File tree

integrations/elasticsearch/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ classifiers = [
2424
"Programming Language :: Python :: Implementation :: PyPy",
2525
]
2626
dependencies = [
27-
"haystack-ai>=2.26.1",
27+
"haystack-ai>=2.28.0",
2828
"elasticsearch>=8,<9",
2929
"aiohttp>=3.9.0" # for async support https://elasticsearch-py.readthedocs.io/en/latest/async.html#valueerror-when-initializing-asyncelasticsearch
3030
]

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -646,15 +646,15 @@ def write_documents(
646646
assert isinstance(errors, list)
647647
duplicate_errors_ids = []
648648
other_errors = []
649-
for e in errors:
650-
error_type = e["create"]["error"]["type"]
649+
for error in errors:
650+
error_type = error["create"]["error"]["type"]
651651
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
652-
duplicate_errors_ids.append(e["create"]["_id"])
652+
duplicate_errors_ids.append(error["create"]["_id"])
653653
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
654654
# when the policy is skip, duplication errors are OK and we should not raise an exception
655655
continue
656656
else:
657-
other_errors.append(e)
657+
other_errors.append(error)
658658

659659
if len(duplicate_errors_ids) > 0:
660660
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
@@ -704,35 +704,44 @@ async def write_documents_async(
704704
self._handle_sparse_embedding(doc_dict, doc.id)
705705

706706
action = {
707-
"_op_type": "create" if policy == DuplicatePolicy.FAIL else "index",
707+
"_op_type": "index" if policy == DuplicatePolicy.OVERWRITE else "create",
708708
"_id": doc.id,
709709
"_source": doc_dict,
710710
}
711711
actions.append(action)
712712

713-
try:
714-
success, failed = await helpers.async_bulk(
715-
client=self.async_client,
716-
actions=actions,
717-
index=self._index,
718-
refresh=refresh,
719-
raise_on_error=False,
720-
stats_only=False,
721-
)
722-
if failed:
723-
# with stats_only=False, failed is guaranteed to be a list of dicts
724-
assert isinstance(failed, list)
725-
if policy == DuplicatePolicy.FAIL:
726-
for error in failed:
727-
if "create" in error and error["create"]["status"] == DOC_ALREADY_EXISTS:
728-
msg = f"ID '{error['create']['_id']}' already exists in the document store"
729-
raise DuplicateDocumentError(msg)
730-
msg = f"Failed to write documents to Elasticsearch. Errors:\n{failed}"
713+
documents_written, errors = await helpers.async_bulk(
714+
client=self.async_client,
715+
actions=actions,
716+
index=self._index,
717+
refresh=refresh,
718+
raise_on_error=False,
719+
stats_only=False,
720+
)
721+
722+
if errors:
723+
# with stats_only=False, errors is guaranteed to be a list of dicts
724+
assert isinstance(errors, list)
725+
duplicate_errors_ids = []
726+
other_errors = []
727+
for error in errors:
728+
error_type = error["create"]["error"]["type"]
729+
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
730+
duplicate_errors_ids.append(error["create"]["_id"])
731+
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
732+
continue
733+
else:
734+
other_errors.append(error)
735+
736+
if len(duplicate_errors_ids) > 0:
737+
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
738+
raise DuplicateDocumentError(msg)
739+
740+
if len(other_errors) > 0:
741+
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
731742
raise DocumentStoreError(msg)
732-
return success
733-
except Exception as e:
734-
msg = f"Failed to write documents to Elasticsearch: {e!s}"
735-
raise DocumentStoreError(msg) from e
743+
744+
return documents_written
736745

737746
def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
738747
"""
@@ -776,16 +785,13 @@ async def delete_documents_async(
776785
"""
777786
self._ensure_initialized()
778787

779-
try:
780-
await helpers.async_bulk(
781-
client=self.async_client,
782-
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
783-
index=self._index,
784-
refresh=refresh,
785-
)
786-
except Exception as e:
787-
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
788-
raise DocumentStoreError(msg) from e
788+
await helpers.async_bulk(
789+
client=self.async_client,
790+
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
791+
index=self._index,
792+
refresh=refresh,
793+
raise_on_error=False,
794+
)
789795

790796
def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
791797
"""

0 commit comments

Comments
 (0)