Skip to content

Commit 4b21ff6

Browse files
committed
adding new operations + tests
1 parent 1133b2d commit 4b21ff6

3 files changed

Lines changed: 667 additions & 0 deletions

File tree

integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,249 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
517517
"Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
518518
)
519519

520+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes all documents that match the provided filters.

    The number of matching points is determined with an exact server-side count before the
    delete request is sent, so no document payloads are transferred to the client just to
    be counted. The count and the delete are two separate requests: if other writers modify
    the collection in between, the returned number may differ from the number of points
    actually removed.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)

    :returns:
        The number of documents deleted. Returns 0 when the filters do not convert to a
        Qdrant filter or when no document matches.

    :raises QdrantStoreError:
        If filter conversion, counting or deletion fails.
    """
    self._initialize_client()
    assert self._client is not None

    try:
        qdrant_filter = convert_filters_to_qdrant(filters)
        if qdrant_filter is None:
            # Nothing to select on: never turn an empty filter into "delete everything".
            return 0

        # Count on the server instead of fetching every matching document.
        count = self._client.count(
            collection_name=self.index,
            count_filter=qdrant_filter,
            exact=True,
        ).count

        if count == 0:
            return 0

        # perform deletion using FilterSelector
        self._client.delete(
            collection_name=self.index,
            points_selector=rest.FilterSelector(filter=qdrant_filter),
            wait=self.wait_result_from_api,
        )

        logger.info(
            "Deleted {n_docs} documents from collection '{name}' using filters.",
            n_docs=count,
            name=self.index,
        )
        return count
    except Exception as e:
        msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
        raise QdrantStoreError(msg) from e
561+
562+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes all documents that match the provided filters.

    The number of matching points is determined with an exact server-side count before the
    delete request is sent, so no document payloads are transferred to the client just to
    be counted. The count and the delete are two separate requests: if other writers modify
    the collection in between, the returned number may differ from the number of points
    actually removed.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)

    :returns:
        The number of documents deleted. Returns 0 when the filters do not convert to a
        Qdrant filter or when no document matches.

    :raises QdrantStoreError:
        If filter conversion, counting or deletion fails.
    """
    await self._initialize_async_client()
    assert self._async_client is not None

    try:
        qdrant_filter = convert_filters_to_qdrant(filters)
        if qdrant_filter is None:
            # Nothing to select on: never turn an empty filter into "delete everything".
            return 0

        # Count on the server instead of streaming every matching document.
        count_result = await self._async_client.count(
            collection_name=self.index,
            count_filter=qdrant_filter,
            exact=True,
        )
        count = count_result.count

        if count == 0:
            return 0

        # perform deletion using FilterSelector
        await self._async_client.delete(
            collection_name=self.index,
            points_selector=rest.FilterSelector(filter=qdrant_filter),
            wait=self.wait_result_from_api,
        )

        logger.info(
            "Deleted {n_docs} documents from collection '{name}' using filters.",
            n_docs=count,
            name=self.index,
        )
        return count
    except Exception as e:
        msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
        raise QdrantStoreError(msg) from e
605+
606+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of all documents that match the provided filters.

    **Note**: This operation is not atomic. Documents matching the filter are fetched first,
    then updated. If documents are modified between the fetch and update operations,
    those changes may be lost.

    All matching points (including their vectors) are collected in memory before being
    written back, so very large result sets are memory-intensive.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata;
        keys present in both take the new value.

    :returns:
        The number of documents updated. Returns 0 when the filters do not convert to a
        Qdrant filter or when no document matches.

    :raises QdrantStoreError:
        If filter conversion, scrolling or upserting fails.
    """
    self._initialize_client()
    assert self._client is not None

    try:
        qdrant_filter = convert_filters_to_qdrant(filters)
        if qdrant_filter is None:
            return 0

        # get all matching documents using scroll
        updated_points = []
        next_offset = None
        stop_scrolling = False

        while not stop_scrolling:
            records, next_offset = self._client.scroll(
                collection_name=self.index,
                scroll_filter=qdrant_filter,
                limit=self.scroll_size,
                offset=next_offset,
                with_payload=True,
                with_vectors=True,
            )

            # Scrolling ends when no offset is returned, or when the offset is an
            # object whose `num` is 0 and whose `uuid` is empty.
            stop_scrolling = next_offset is None or (
                hasattr(next_offset, "num")
                and hasattr(next_offset, "uuid")
                and next_offset.num == 0
                and next_offset.uuid == ""
            )

            for record in records:
                # The payload is the serialized Document, whose metadata lives under the
                # nested "meta" key. Merging `meta` into the top level would create stray
                # payload fields instead of updating the document's metadata.
                updated_payload = dict(record.payload or {})
                updated_payload["meta"] = {**(updated_payload.get("meta") or {}), **meta}

                # Preserve vectors; only a missing vector falls back to an empty mapping.
                vector = record.vector if record.vector is not None else {}
                updated_points.append(
                    rest.PointStruct(
                        id=record.id,
                        vector=vector,
                        payload=updated_payload,
                    )
                )

        if not updated_points:
            return 0

        # upsert updated points back in batches
        for batch in get_batches_from_generator(updated_points, self.write_batch_size):
            self._client.upsert(
                collection_name=self.index,
                points=batch,
                wait=self.wait_result_from_api,
            )

        logger.info(
            "Updated {n_docs} documents in collection '{name}' using filters.",
            n_docs=len(updated_points),
            name=self.index,
        )
        return len(updated_points)
    except Exception as e:
        msg = f"Failed to update documents by filter in Qdrant: {e!s}"
        raise QdrantStoreError(msg) from e
684+
685+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of all documents that match the provided filters.

    **Note**: This operation is not atomic. Documents matching the filter are fetched first,
    then updated. If documents are modified between the fetch and update operations,
    those changes may be lost.

    All matching points (including their vectors) are collected in memory before being
    written back, so very large result sets are memory-intensive.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata;
        keys present in both take the new value.

    :returns:
        The number of documents updated. Returns 0 when the filters do not convert to a
        Qdrant filter or when no document matches.

    :raises QdrantStoreError:
        If filter conversion, scrolling or upserting fails.
    """
    await self._initialize_async_client()
    assert self._async_client is not None

    try:
        qdrant_filter = convert_filters_to_qdrant(filters)
        if qdrant_filter is None:
            return 0

        # get all matching documents using scroll
        updated_points = []
        next_offset = None
        stop_scrolling = False

        while not stop_scrolling:
            records, next_offset = await self._async_client.scroll(
                collection_name=self.index,
                scroll_filter=qdrant_filter,
                limit=self.scroll_size,
                offset=next_offset,
                with_payload=True,
                with_vectors=True,
            )

            # Scrolling ends when no offset is returned, or when the offset is an
            # object whose `num` is 0 and whose `uuid` is empty.
            stop_scrolling = next_offset is None or (
                hasattr(next_offset, "num")
                and hasattr(next_offset, "uuid")
                and next_offset.num == 0
                and next_offset.uuid == ""
            )

            for record in records:
                # The payload is the serialized Document, whose metadata lives under the
                # nested "meta" key. Merging `meta` into the top level would create stray
                # payload fields instead of updating the document's metadata.
                updated_payload = dict(record.payload or {})
                updated_payload["meta"] = {**(updated_payload.get("meta") or {}), **meta}

                # Preserve vectors; only a missing vector falls back to an empty mapping.
                vector = record.vector if record.vector is not None else {}
                updated_points.append(
                    rest.PointStruct(
                        id=record.id,
                        vector=vector,
                        payload=updated_payload,
                    )
                )

        if not updated_points:
            return 0

        # upsert updated points back in batches
        for batch in get_batches_from_generator(updated_points, self.write_batch_size):
            await self._async_client.upsert(
                collection_name=self.index,
                points=batch,
                wait=self.wait_result_from_api,
            )

        logger.info(
            "Updated {n_docs} documents in collection '{name}' using filters.",
            n_docs=len(updated_points),
            name=self.index,
        )
        return len(updated_points)
    except Exception as e:
        msg = f"Failed to update documents by filter in Qdrant: {e!s}"
        raise QdrantStoreError(msg) from e
762+
520763
def delete_all_documents(self, recreate_index: bool = False) -> None:
521764
"""
522765
Deletes all documents from the document store.

0 commit comments

Comments
 (0)