Skip to content

Commit 793c7a5

Browse files
committed
Adding delete_by_filter and update_by_filter
1 parent ecb6203 commit 793c7a5

1 file changed

Lines changed: 264 additions & 0 deletions

File tree

  • integrations/weaviate/src/haystack_integrations/document_stores/weaviate

integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
import base64
56
import datetime
67
import json
@@ -610,6 +611,269 @@ def delete_all_documents(self, *, recreate_index: bool = False, batch_size: int
610611
"Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`.",
611612
)
612613

614+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes all documents that match the provided filters.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted, as reported by Weaviate (`successful` count).
    :raises ValueError: If `filters` is non-empty but contains neither an `operator` nor a `conditions` key.
    :raises DocumentStoreError: If converting the filters or executing the deletion fails.
    """
    if filters and "operator" not in filters and "conditions" not in filters:
        msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
        raise ValueError(msg)

    try:
        weaviate_filter = convert_filters(filters)
        # NOTE(review): Weaviate documents a per-request cap (QUERY_MAXIMUM_RESULTS) on how many
        # objects a single batch delete removes - confirm whether one call is enough for large matches.
        result = self.collection.data.delete_many(where=weaviate_filter)
        deleted_count = result.successful

        # Surface partial failures instead of dropping them; the return value only counts successes.
        failed_count = getattr(result, "failed", 0)
        if failed_count:
            logger.warning(
                "Failed to delete {n_failed} documents from collection '{collection}' using filters.",
                n_failed=failed_count,
                collection=self.collection.name,
            )

        logger.info(
            "Deleted {n_docs} documents from collection '{collection}' using filters.",
            n_docs=deleted_count,
            collection=self.collection.name,
        )
        return deleted_count
    except weaviate.exceptions.WeaviateQueryError as e:
        msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}"
        raise DocumentStoreError(msg) from e
    except Exception as e:
        msg = f"Failed to delete documents by filter in Weaviate: {e!s}"
        raise DocumentStoreError(msg) from e
642+
643+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes all documents that match the provided filters.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted, as reported by Weaviate (`successful` count).
    :raises ValueError: If `filters` is non-empty but contains neither an `operator` nor a `conditions` key.
    :raises DocumentStoreError: If converting the filters or executing the deletion fails.
    """
    if filters and "operator" not in filters and "conditions" not in filters:
        msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
        raise ValueError(msg)

    try:
        collection = await self.async_collection
        weaviate_filter = convert_filters(filters)
        # NOTE(review): Weaviate documents a per-request cap (QUERY_MAXIMUM_RESULTS) on how many
        # objects a single batch delete removes - confirm whether one call is enough for large matches.
        result = await collection.data.delete_many(where=weaviate_filter)
        deleted_count = result.successful

        # Surface partial failures instead of dropping them; the return value only counts successes.
        failed_count = getattr(result, "failed", 0)
        if failed_count:
            logger.warning(
                "Failed to delete {n_failed} documents from collection '{collection}' using filters.",
                n_failed=failed_count,
                collection=collection.name,
            )

        logger.info(
            "Deleted {n_docs} documents from collection '{collection}' using filters.",
            n_docs=deleted_count,
            collection=collection.name,
        )
        return deleted_count
    except weaviate.exceptions.WeaviateQueryError as e:
        msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}"
        raise DocumentStoreError(msg) from e
    except Exception as e:
        msg = f"Failed to delete documents by filter in Weaviate: {e!s}"
        raise DocumentStoreError(msg) from e
672+
673+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of all documents that match the provided filters.

    All matching objects are fetched (with their vectors) into memory first and are then
    replaced one by one with their properties merged with `meta`.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. These will be merged with existing metadata;
        keys in `meta` overwrite existing properties of the same name.
    :returns: The number of documents updated (0 if no document matches the filters).
    :raises ValueError: If `filters` is non-empty but contains neither an `operator` nor a `conditions` key,
        or if `meta` is not a dictionary.
    :raises DocumentStoreError: If querying fails, or if at least one object could not be updated.
        In the latter case all objects are attempted before the error is raised.
    """
    if filters and "operator" not in filters and "conditions" not in filters:
        msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
        raise ValueError(msg)

    if not isinstance(meta, dict):
        msg = "Meta must be a dictionary"
        raise ValueError(msg)

    try:
        weaviate_filter = convert_filters(filters)
        properties = [p.name for p in self.collection.config.get().properties]

        # Page through all objects matching the filter.
        # include_vector=True so the stored vector can be passed back on replace.
        matching_objects = []
        offset = 0
        while True:
            page = self.collection.query.fetch_objects(
                filters=weaviate_filter,
                include_vector=True,
                limit=DEFAULT_QUERY_LIMIT,
                offset=offset,
                return_properties=properties,
            )
            matching_objects.extend(page.objects)
            # Only a completely full page can be followed by another one.
            if len(page.objects) != DEFAULT_QUERY_LIMIT:
                break
            offset += DEFAULT_QUERY_LIMIT

        if not matching_objects:
            return 0

        # Metadata is stored flattened in the Weaviate properties, so the new values
        # are merged directly into each object's properties.
        updated_count = 0
        failed_updates = []

        for obj in matching_objects:
            try:
                # Existing properties first, so that values from `meta` win on key clashes.
                current_properties = {**(obj.properties or {}), **meta}

                # `replace` overwrites the whole object, so hand the existing vector back in.
                vector = None
                if isinstance(obj.vector, list):
                    vector = obj.vector
                elif isinstance(obj.vector, dict):
                    vector = obj.vector.get("default")

                self.collection.data.replace(
                    uuid=obj.uuid,
                    properties=current_properties,
                    vector=vector,
                )
                updated_count += 1
            except Exception as e:
                # Collect failed updates but continue with the remaining objects.
                obj_properties = obj.properties or {}
                id_ = obj_properties.get("_original_id", obj.uuid)
                failed_updates.append((id_, str(e)))

        if failed_updates:
            msg = "\n".join(
                f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates
            )
            raise DocumentStoreError(msg)

        logger.info(
            "Updated {n_docs} documents in collection '{collection}' using filters.",
            n_docs=updated_count,
            collection=self.collection.name,
        )
        return updated_count
    except DocumentStoreError:
        # Already a store-level error (e.g. the per-object failure report above);
        # wrapping it again would only prepend a second, generic message.
        raise
    except weaviate.exceptions.WeaviateQueryError as e:
        msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}"
        raise DocumentStoreError(msg) from e
    except Exception as e:
        msg = f"Failed to update documents by filter in Weaviate: {e!s}"
        raise DocumentStoreError(msg) from e
773+
774+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of all documents that match the provided filters.

    All matching objects are fetched (with their vectors) into memory first and are then
    replaced one by one with their properties merged with `meta`.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. These will be merged with existing metadata;
        keys in `meta` overwrite existing properties of the same name.
    :returns: The number of documents updated (0 if no document matches the filters).
    :raises ValueError: If `filters` is non-empty but contains neither an `operator` nor a `conditions` key,
        or if `meta` is not a dictionary.
    :raises DocumentStoreError: If querying fails, or if at least one object could not be updated.
        In the latter case all objects are attempted before the error is raised.
    """
    if filters and "operator" not in filters and "conditions" not in filters:
        msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
        raise ValueError(msg)

    if not isinstance(meta, dict):
        msg = "Meta must be a dictionary"
        raise ValueError(msg)

    try:
        collection = await self.async_collection
        weaviate_filter = convert_filters(filters)
        config = await collection.config.get()
        properties = [p.name for p in config.properties]

        # Page through all objects matching the filter.
        # include_vector=True so the stored vector can be passed back on replace.
        matching_objects = []
        offset = 0
        while True:
            page = await collection.query.fetch_objects(
                filters=weaviate_filter,
                include_vector=True,
                limit=DEFAULT_QUERY_LIMIT,
                offset=offset,
                return_properties=properties,
            )
            matching_objects.extend(page.objects)
            # Only a completely full page can be followed by another one.
            if len(page.objects) != DEFAULT_QUERY_LIMIT:
                break
            offset += DEFAULT_QUERY_LIMIT

        if not matching_objects:
            return 0

        # Metadata is stored flattened in the Weaviate properties, so the new values
        # are merged directly into each object's properties.
        updated_count = 0
        failed_updates = []

        for obj in matching_objects:
            try:
                # Existing properties first, so that values from `meta` win on key clashes.
                current_properties = {**(obj.properties or {}), **meta}

                # `replace` overwrites the whole object, so hand the existing vector back in.
                vector = None
                if isinstance(obj.vector, list):
                    vector = obj.vector
                elif isinstance(obj.vector, dict):
                    vector = obj.vector.get("default")

                await collection.data.replace(
                    uuid=obj.uuid,
                    properties=current_properties,
                    vector=vector,
                )
                updated_count += 1
            except Exception as e:
                # Collect failed updates but continue with the remaining objects.
                obj_properties = obj.properties or {}
                id_ = obj_properties.get("_original_id", obj.uuid)
                failed_updates.append((id_, str(e)))

        if failed_updates:
            msg = "\n".join(
                f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates
            )
            raise DocumentStoreError(msg)

        logger.info(
            "Updated {n_docs} documents in collection '{collection}' using filters.",
            n_docs=updated_count,
            collection=collection.name,
        )
        return updated_count
    except DocumentStoreError:
        # Already a store-level error (e.g. the per-object failure report above);
        # wrapping it again would only prepend a second, generic message.
        raise
    except weaviate.exceptions.WeaviateQueryError as e:
        msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}"
        raise DocumentStoreError(msg) from e
    except Exception as e:
        msg = f"Failed to update documents by filter in Weaviate: {e!s}"
        raise DocumentStoreError(msg) from e
876+
613877
def _bm25_retrieval(
614878
self, query: str, filters: Optional[dict[str, Any]] = None, top_k: Optional[int] = None
615879
) -> list[Document]:

0 commit comments

Comments
 (0)