Skip to content

Commit cfad373

Browse files
committed
adding sync methods + tests
1 parent ba53192 commit cfad373

2 files changed

Lines changed: 114 additions & 46 deletions

File tree

integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ def get_batches_from_generator(iterable: List, n: int) -> Generator:
5353

5454
class QdrantDocumentStore:
5555
"""
56-
A QdrantDocumentStore implementation that you
57-
can use with any Qdrant instance: in-memory, disk-persisted, Docker-based,
58-
and Qdrant Cloud Cluster deployments.
56+
A QdrantDocumentStore implementation that you can use with any Qdrant instance: in-memory, disk-persisted,
57+
Docker-based, and Qdrant Cloud Cluster deployments.
5958
6059
Usage example by creating an in-memory instance:
6160
@@ -65,7 +64,8 @@ class QdrantDocumentStore:
6564
6665
document_store = QdrantDocumentStore(
6766
":memory:",
68-
recreate_index=True
67+
recreate_index=True,
68+
embedding_dim=5
6969
)
7070
document_store.write_documents([
7171
Document(content="This is first", embedding=[0.0]*5),
@@ -135,7 +135,7 @@ def __init__(
135135
payload_fields_to_index: Optional[List[dict]] = None,
136136
) -> None:
137137
"""
138-
Initialize a QdrantDocumentStore.
138+
Initializes a QdrantDocumentStore.
139139
140140
:param location:
141141
If `":memory:"` - use in-memory Qdrant instance.
@@ -347,8 +347,7 @@ def filter_documents(
347347
[documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering)
348348
349349
:param filters: The filters to apply to the document list.
350-
:returns:
351-
A list of documents that match the given filters.
350+
:returns: A list of documents that match the given filters.
352351
"""
353352
# No need to initialize client here as _get_documents_generator
354353
# will handle client initialization internally
@@ -380,7 +379,6 @@ def write_documents(
380379
) -> int:
381380
"""
382381
Writes documents to Qdrant using the specified policy.
383-
384382
The QdrantDocumentStore can handle duplicate documents based on the given policy.
385383
The available policies are:
386384
- `FAIL`: The operation will raise an error if any document already exists.
@@ -390,8 +388,7 @@ def write_documents(
390388
:param documents: A list of Document objects to write to Qdrant.
391389
:param policy: The policy for handling duplicate documents.
392390
393-
:returns:
394-
The number of documents written to the document store.
391+
:returns: The number of documents written to the document store.
395392
"""
396393

397394
self._initialize_client()
@@ -435,7 +432,6 @@ async def write_documents_async(
435432
) -> int:
436433
"""
437434
Asynchronously writes documents to Qdrant using the specified policy.
438-
439435
The QdrantDocumentStore can handle duplicate documents based on the given policy.
440436
The available policies are:
441437
- `FAIL`: The operation will raise an error if any document already exists.
@@ -527,39 +523,69 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
527523
"Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
528524
)
529525

530-
def delete_all_documents(self, recreate_index: bool = False) -> None:
526+
def delete_all_documents(self, recreate_index: bool = False):
531527
"""
532-
Deletes all documents in the document store.
528+
Deletes all documents from the document store.
533529
534-
:param recreate_index: If `True`, the index will be recreated after deletion.
535-
536-
It deletes the collection and recreates it to ensure all documents are removed.
530+
:param recreate_index: Whether to recreate the index after deleting all documents.
537531
"""
538532

533+
self._initialize_client()
534+
assert self._client is not None
535+
539536
if recreate_index:
540-
self.recreate_collection(
541-
self.index,
542-
self.get_distance(self.similarity),
543-
self.embedding_dim,
544-
self.on_disk,
545-
self.use_sparse_embeddings,
546-
self.sparse_idf,
537+
538+
# get current collection config
539+
collection_info = self._client.get_collection(collection_name=self.index)
540+
541+
"""
542+
x = CollectionInfo(
543+
status= < CollectionStatus.GREEN: 'green' >,
544+
optimizer_status = < OptimizersStatusOneOf.OK: 'ok' >, vectors_count = None, indexed_vectors_count = 0, points_count = 5, segments_count = 1, config = CollectionConfig(
545+
params=CollectionParams(vectors=VectorParams(size=768, distance= < Distance.COSINE: 'Cosine' >, hnsw_config = None, quantization_config = None, on_disk = False, datatype = None, multivector_config = None), shard_number = None, sharding_method = None, replication_factor = None, write_consistency_factor = None, read_fan_out_factor = None, on_disk_payload = None, sparse_vectors = None), hnsw_config = HnswConfig(
546+
m=16, ef_construct=100, full_scan_threshold=10000, max_indexing_threads=0, on_disk=None,
547+
payload_m=None), optimizer_config = OptimizersConfig(deleted_threshold=0.2,
548+
vacuum_min_vector_number=1000,
549+
default_segment_number=0, max_segment_size=None,
550+
memmap_threshold=None, indexing_threshold=20000,
551+
flush_interval_sec=5,
552+
max_optimization_threads=1), wal_config = WalConfig(
553+
wal_capacity_mb=32,
554+
wal_segments_ahead=0), quantization_config = None, strict_mode_config = None), payload_schema = {})
555+
"""
556+
557+
# recreate collection
558+
self._set_up_collection(
559+
collection_name=self.index,
560+
embedding_dim=collection_info.config.params.vectors.size,
561+
recreate_collection=True,
562+
similarity=collection_info.config.params.vectors.distance.value,
563+
use_sparse_embeddings=collection_info.config.params.sparse_vectors == SPARSE_VECTORS_NAME,
564+
sparse_idf=(collection_info.config.params.vectors.name == SPARSE_VECTORS_NAME) and
565+
collection_info.config.params.vectors.config.hnsw_config is not None,
566+
on_disk=collection_info.config.params.vectors.config.on_disk,
567+
# ToDo: investigate
568+
# - CollectionInfo has payload_schema as Optional[Dict[str, PayloadSchemaType]],
569+
# - self._set_up_collection expects Optional[List[dict]]
570+
payload_fields_to_index=None
547571
)
548-
else:
549-
self._initialize_client() # _initialize_client assures the client is initialized
550-
self._client.delete( # type: ignore
572+
573+
try:
574+
self._client.delete(
551575
collection_name=self.index,
552-
points_selector=rest.PointsSelectorAll(),
576+
points_selector=rest.FilterSelector(
577+
filter=rest.Filter(
578+
must=[],
579+
)
580+
),
553581
wait=self.wait_result_from_api,
554582
)
583+
except Exception as e:
584+
logger.warning(
585+
f"Error {e} when calling QdrantDocumentStore.delete_all_documents()",
586+
)
555587

556-
async def del_all_documents_async(self) -> None:
557-
"""
558-
Asynchronously deletes all documents in the document store.
559588

560-
It deletes the collection and recreates it to ensure all documents are removed.
561-
"""
562-
# ToDo
563589

564590
@classmethod
565591
def from_dict(cls, data: Dict[str, Any]) -> "QdrantDocumentStore":
@@ -672,8 +698,10 @@ def get_documents_by_id(
672698
"""
673699
Retrieves documents from Qdrant by their IDs.
674700
675-
:param ids: A list of document IDs to retrieve.
676-
:returns: A list of documents.
701+
:param ids:
702+
A list of document IDs to retrieve.
703+
:returns:
704+
A list of documents.
677705
"""
678706
documents: List[Document] = []
679707

@@ -701,7 +729,8 @@ async def get_documents_by_id_async(
701729
"""
702730
Retrieves documents from Qdrant by their IDs.
703731
704-
:param ids: A list of document IDs to retrieve.
732+
:param ids:
733+
A list of document IDs to retrieve.
705734
:returns:
706735
A list of documents.
707736
"""
@@ -892,7 +921,7 @@ def _query_hybrid(
892921
value, all values will be used for grouping. One point can be in multiple groups.
893922
:param group_size: Maximum amount of points to return per group. Default is 3.
894923
895-
:returns: A list of Document that are most similar to `query_embedding` and `query_sparse_embedding`.
924+
:returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`.
896925
897926
:raises QdrantStoreError:
898927
If the Document Store was initialized with `use_sparse_embeddings=False`.
@@ -1002,7 +1031,7 @@ async def _query_by_sparse_async(
10021031
value, all values will be used for grouping. One point can be in multiple groups.
10031032
:param group_size: Maximum amount of points to return per group. Default is 3.
10041033
1005-
:returns: A list of documents that are most similar to `query_sparse_embedding`.
1034+
:returns: List of documents that are most similar to `query_sparse_embedding`.
10061035
10071036
:raises QdrantStoreError:
10081037
If the Document Store was initialized with `use_sparse_embeddings=False`.
@@ -1082,7 +1111,7 @@ async def _query_by_embedding_async(
10821111
value, all values will be used for grouping. One point can be in multiple groups.
10831112
:param group_size: Maximum amount of points to return per group. Default is 3.
10841113
1085-
:returns: A list of documents that are most similar to `query_embedding`.
1114+
:returns: List of documents that are most similar to `query_embedding`.
10861115
"""
10871116
await self._initialize_async_client()
10881117
assert self._async_client is not None
@@ -1147,7 +1176,7 @@ async def _query_hybrid_async(
11471176
value, all values will be used for grouping. One point can be in multiple groups.
11481177
:param group_size: Maximum amount of points to return per group. Default is 3.
11491178
1150-
:returns: A list of Document that are most similar to `query_embedding` and `query_sparse_embedding`.
1179+
:returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`.
11511180
11521181
:raises QdrantStoreError:
11531182
If the Document Store was initialized with `use_sparse_embeddings=False`.
@@ -1322,6 +1351,7 @@ def _set_up_collection(
13221351
If the collection exists with incompatible settings.
13231352
:raises ValueError:
13241353
If the collection exists with a different similarity measure or embedding dimension.
1354+
13251355
"""
13261356

13271357
self._initialize_client()
@@ -1519,8 +1549,9 @@ async def _handle_duplicate_documents_async(
15191549
policy: Optional[DuplicatePolicy] = None,
15201550
) -> List[Document]:
15211551
"""
1522-
Asynchronously checks whether any of the passed documents is already existing in the chosen index and returns a
1523-
list of documents that are not in the index yet.
1552+
Asynchronously checks whether any of the passed documents is already existing
1553+
in the chosen index and returns a list of
1554+
documents that are not in the index yet.
15241555
15251556
:param documents: A list of Haystack Document objects.
15261557
:param policy: The duplicate policy to use when writing documents.
@@ -1544,9 +1575,6 @@ def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]
15441575
"""
15451576
Drop duplicate documents based on same hash ID.
15461577
1547-
:param documents: A list of Haystack Document objects.
1548-
1549-
:returns: A list of Haystack Document objects with unique IDs.
15501578
"""
15511579
_hash_ids: Set = set()
15521580
_documents: List[Document] = []
@@ -1583,6 +1611,7 @@ def _prepare_collection_params(self) -> Dict[str, Any]:
15831611
def _prepare_client_params(self) -> Dict[str, Any]:
15841612
"""
15851613
Prepares the common parameters for client initialization.
1614+
15861615
"""
15871616
return {
15881617
"location": self.location,
@@ -1685,6 +1714,7 @@ def _process_query_point_results(
16851714
def _process_group_results(self, groups: List[rest.PointGroup]) -> List[Document]:
16861715
"""
16871716
Processes grouped query results from Qdrant.
1717+
16881718
"""
16891719
if not groups:
16901720
return []

integrations/qdrant/tests/test_document_store.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from haystack import Document
66
from haystack.dataclasses import SparseEmbedding
77
from haystack.document_stores.errors import DuplicateDocumentError
8-
from haystack.document_stores.types import DuplicatePolicy
8+
from haystack.document_stores.types import DuplicatePolicy, DocumentStore
99
from haystack.testing.document_store import (
1010
CountDocumentsTest,
1111
DeleteDocumentsTest,
@@ -297,3 +297,41 @@ def test_set_up_collection_with_dimension_mismatch(self):
297297
):
298298
with pytest.raises(ValueError, match="different vector size"):
299299
document_store._set_up_collection("test_collection", 768, False, "cosine", False, False)
300+
301+
def test_delete_all_documents_no_index_recreation(self, document_store):
302+
document_store._initialize_client()
303+
304+
# write some documents
305+
docs = [Document(id=str(i)) for i in range(5)]
306+
document_store.write_documents(docs)
307+
308+
# delete all documents without recreating the index
309+
document_store.delete_all_documents(recreate_index=False)
310+
assert document_store.count_documents() == 0
311+
312+
# ensure the collection still exists by writing documents again
313+
document_store.write_documents(docs)
314+
assert document_store.count_documents() == 5
315+
316+
def test_delete_all_documents_index_recreation(self, document_store):
317+
document_store._initialize_client()
318+
319+
# write some documents
320+
docs = [Document(id=str(i)) for i in range(5)]
321+
document_store.write_documents(docs)
322+
323+
# get the current document_store config
324+
config_before = document_store._client.get_collection(document_store.index)
325+
326+
# delete all documents with recreating the index
327+
document_store.delete_all_documents(recreate_index=True)
328+
assert document_store.count_documents() == 0
329+
330+
# assure that with the same config
331+
config_after = document_store._client.get_collection(document_store.index)
332+
333+
assert config_before.config == config_after.config
334+
335+
# ensure the collection still exists by writing documents again
336+
document_store.write_documents(docs)
337+
assert document_store.count_documents() == 5

0 commit comments

Comments
 (0)