Skip to content

Commit 78dc81a

Browse files
committed
refactoring to reduce duplicated code
1 parent 3255149 commit 78dc81a

1 file changed

Lines changed: 50 additions & 50 deletions

File tree

  • integrations/qdrant/src/haystack_integrations/document_stores/qdrant

integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,46 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
603603
msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
604604
raise QdrantStoreError(msg) from e
605605

606+
@staticmethod
607+
def _check_stop_scrolling(next_offset: Any) -> bool:
608+
"""
609+
Checks if scrolling should stop based on the next_offset value.
610+
611+
:param next_offset: The offset returned from the scroll operation.
612+
:returns: True if scrolling should stop, False otherwise.
613+
"""
614+
return next_offset is None or (
615+
hasattr(next_offset, "num")
616+
and hasattr(next_offset, "uuid")
617+
and next_offset.num == 0
618+
and next_offset.uuid == ""
619+
)
620+
621+
@staticmethod
622+
def _create_updated_point_from_record(record: Any, meta: dict[str, Any]) -> rest.PointStruct:
623+
"""
624+
Creates an updated PointStruct from a Qdrant record with merged metadata.
625+
626+
:param record: The Qdrant record to update.
627+
:param meta: The metadata fields to merge with existing metadata.
628+
:returns: A PointStruct with updated metadata and preserved vectors.
629+
"""
630+
# merge existing payload with new metadata
631+
# Metadata is stored under the "meta" key in the payload
632+
updated_payload = dict(record.payload or {})
633+
if "meta" not in updated_payload:
634+
updated_payload["meta"] = {}
635+
updated_payload["meta"].update(meta)
636+
637+
# create updated point preserving vectors
638+
# Type cast needed because record.vector type doesn't include all PointStruct vector types
639+
vector_value = record.vector if record.vector is not None else {}
640+
return rest.PointStruct(
641+
id=record.id,
642+
vector=cast(Any, vector_value),
643+
payload=updated_payload,
644+
)
645+
606646
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
607647
"""
608648
Updates the metadata of all documents that match the provided filters.
@@ -629,9 +669,8 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
629669
# get all matching documents using scroll
630670
updated_points = []
631671
next_offset = None
632-
stop_scrolling = False
633672

634-
while not stop_scrolling:
673+
while True:
635674
records, next_offset = self._client.scroll(
636675
collection_name=self.index,
637676
scroll_filter=qdrant_filter,
@@ -641,31 +680,12 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
641680
with_vectors=True,
642681
)
643682

644-
stop_scrolling = next_offset is None or (
645-
hasattr(next_offset, "num")
646-
and hasattr(next_offset, "uuid")
647-
and next_offset.num == 0
648-
and next_offset.uuid == ""
649-
)
650-
651683
# update payload for each record
652684
for record in records:
653-
# merge existing payload with new metadata
654-
# Metadata is stored under the "meta" key in the payload
655-
updated_payload = dict(record.payload or {})
656-
if "meta" not in updated_payload:
657-
updated_payload["meta"] = {}
658-
updated_payload["meta"].update(meta)
659-
660-
# create updated point preserving vectors
661-
# Type cast needed because record.vector type doesn't include all PointStruct vector types
662-
vector_value = record.vector if record.vector is not None else {}
663-
updated_point = rest.PointStruct(
664-
id=record.id,
665-
vector=cast(Any, vector_value),
666-
payload=updated_payload,
667-
)
668-
updated_points.append(updated_point)
685+
updated_points.append(self._create_updated_point_from_record(record, meta))
686+
687+
if self._check_stop_scrolling(next_offset):
688+
break
669689

670690
if not updated_points:
671691
return 0
@@ -713,9 +733,8 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
713733

714734
updated_points = []
715735
next_offset = None
716-
stop_scrolling = False
717736

718-
while not stop_scrolling:
737+
while True:
719738
records, next_offset = await self._async_client.scroll(
720739
collection_name=self.index,
721740
scroll_filter=qdrant_filter,
@@ -725,31 +744,12 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
725744
with_vectors=True,
726745
)
727746

728-
stop_scrolling = next_offset is None or (
729-
hasattr(next_offset, "num")
730-
and hasattr(next_offset, "uuid")
731-
and next_offset.num == 0
732-
and next_offset.uuid == ""
733-
)
734-
735747
# update payload for each record
736748
for record in records:
737-
# merge existing payload with new metadata
738-
# Metadata is stored under the "meta" key in the payload
739-
updated_payload = dict(record.payload or {})
740-
if "meta" not in updated_payload:
741-
updated_payload["meta"] = {}
742-
updated_payload["meta"].update(meta)
743-
744-
# create updated point preserving vectors
745-
# Type cast needed because record.vector type doesn't include all PointStruct vector types
746-
vector_value = record.vector if record.vector is not None else {}
747-
updated_point = rest.PointStruct(
748-
id=record.id,
749-
vector=cast(Any, vector_value),
750-
payload=updated_payload,
751-
)
752-
updated_points.append(updated_point)
749+
updated_points.append(self._create_updated_point_from_record(record, meta))
750+
751+
if self._check_stop_scrolling(next_offset):
752+
break
753753

754754
if not updated_points:
755755
return 0

0 commit comments

Comments
 (0)