Skip to content

Commit f344060

Browse files
phigep and anakin87 authored
feat: implement delete_all_documents for weaviate integration (#2354)
* implement first test version * weaviate: add delete_all_documents(recreate_index, batching) Signed-off-by: phigep <philipp.geppner@gmail.com> * fixed linter issues * add requested tests and fix max query result issues with helper * linter again * forgot a type hint * simplify and throw warning if batchsize too big * refinements * fix --------- Signed-off-by: phigep <philipp.geppner@gmail.com> Co-authored-by: anakin87 <stefanofiorucci@gmail.com>
1 parent 3f4978c commit f344060

2 files changed

Lines changed: 88 additions & 0 deletions

File tree

integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,55 @@ def delete_documents(self, document_ids: List[str]) -> None:
505505
weaviate_ids = [generate_uuid5(doc_id) for doc_id in document_ids]
506506
self.collection.data.delete_many(where=weaviate.classes.query.Filter.by_id().contains_any(weaviate_ids))
507507

508+
def delete_all_documents(self, *, recreate_index: bool = False, batch_size: int = 1000) -> None:
    """
    Removes every document from the collection.

    Two strategies are available:

    - `recreate_index=False`: the collection is kept and its objects are deleted batch by batch.
    - `recreate_index=True`: the collection is dropped and created again from the configuration
      currently stored on the server. This is recommended for performance reasons.

    :param recreate_index: Whether to use the drop-and-recreate strategy (recommended for performance).
    :param batch_size: Number of objects removed per delete request. Ignored when `recreate_index` is True.
        Values below 1 are raised to 1.
        This value needs to be less than or equal to the `QUERY_MAXIMUM_RESULTS` variable
        set for the weaviate deployment (default is 10000).
        Reference: https://docs.weaviate.io/weaviate/manage-objects/delete#delete-all-objects
    """
    if not recreate_index:
        hint = "Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`."

        def purge(object_ids, failure_message):
            # One delete request for the collected ids; warn when fewer objects than requested were removed.
            outcome = self.collection.data.delete_many(
                where=weaviate.classes.query.Filter.by_id().contains_any(object_ids)
            )
            if outcome.successful < len(object_ids):
                logger.warning(failure_message)

        pending = []
        chunk_limit = max(1, int(batch_size))

        # Only the object ids are needed, so no properties or vectors are requested.
        for item in self.collection.iterator(return_properties=[], include_vector=False):
            pending.append(item.uuid)
            if len(pending) < chunk_limit:
                continue
            purge(pending, "Not all documents in the batch have been deleted. " + hint)
            pending.clear()

        # Flush whatever is left after the last full batch.
        if pending:
            purge(pending, "Not all documents have been deleted. " + hint)
        return

    # Read the current configuration from the server so the collection can be recreated faithfully.
    configured_name = self._collection_settings["class"]
    server_config = self.client.collections.get(configured_name).config.get().to_dict()
    target_name = server_config.get("class", configured_name)

    collections = self.client.collections
    collections.delete(target_name)
    collections.create_from_dict(server_config)

    # Keep the cached settings and collection handle in sync with the recreated collection.
    self._collection_settings = server_config
    self._collection = self.client.collections.get(target_name)
556+
508557
def _bm25_retrieval(
509558
self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: Optional[int] = None
510559
) -> List[Document]:

integrations/weaviate/tests/test_document_store.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import base64
6+
import logging
67
import os
78
from typing import List
89
from unittest.mock import MagicMock, patch
@@ -794,3 +795,41 @@ def test_connect_to_local(self):
794795
def test_connect_to_embedded(self):
    """Check that a store built with embedded options exposes a truthy client."""
    embedded_store = WeaviateDocumentStore(embedded_options=EmbeddedOptions())
    assert embedded_store.client
798+
799+
def test_delete_all_documents(self, document_store):
    """Check that the default call of `delete_all_documents` leaves the store empty."""
    sample_docs = [Document(content="test doc 1"), Document(content="test doc 2")]
    written = document_store.write_documents(sample_docs)
    assert written == 2
    assert document_store.count_documents() == 2

    document_store.delete_all_documents()

    remaining = document_store.count_documents()
    assert remaining == 0
805+
806+
def test_delete_all_documents_recreate(self, document_store):
    """Check that `recreate_index=True` empties the store and keeps the collection configuration."""
    sample_docs = [Document(content="test doc 1"), Document(content="test doc 2")]
    assert document_store.write_documents(sample_docs) == 2
    assert document_store.count_documents() == 2

    # The class name is captured up front, before the store's settings are replaced by the recreation.
    class_name = document_store._collection_settings["class"]

    def read_server_config():
        return document_store.client.collections.get(class_name).config.get().to_dict()

    config_before = read_server_config()

    document_store.delete_all_documents(recreate_index=True)
    assert document_store.count_documents() == 0

    config_after = read_server_config()
    assert config_before == config_after
820+
821+
def test_delete_all_documents_batch_size(self, document_store):
    """Check that a batch size smaller than the number of documents still removes all of them."""
    doc_count = 5
    numbered_docs = [Document(content=str(idx)) for idx in range(doc_count)]
    assert document_store.write_documents(numbered_docs) == doc_count

    document_store.delete_all_documents(batch_size=2)

    assert document_store.count_documents() == 0
826+
827+
def test_delete_all_documents_excessive_batch_size(self, document_store, caplog):
    """Check that deletion is incomplete, and a warning is logged, when the batch size is too large."""
    # assume QUERY_MAXIMUM_RESULTS == 10000 with standard deployment
    doc_count = 10005
    numbered_docs = [Document(content=str(idx)) for idx in range(doc_count)]
    assert document_store.write_documents(numbered_docs) == doc_count

    with caplog.at_level(logging.WARNING):
        document_store.delete_all_documents(batch_size=20000)

    leftover = document_store.count_documents()
    assert leftover == 5
    assert "Not all documents have been deleted." in caplog.text

0 commit comments

Comments
 (0)