diff --git a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py index 59b0897605..7cd9fa3901 100644 --- a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py +++ b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py @@ -46,7 +46,7 @@ def __init__( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> None: """ Create a QdrantEmbeddingRetriever component. @@ -136,7 +136,7 @@ def run( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Run the Embedding Retriever on the given input data. @@ -180,7 +180,7 @@ async def run_async( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Asynchronously run the Embedding Retriever on the given input data. @@ -252,7 +252,7 @@ def __init__( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> None: """ Create a QdrantSparseEmbeddingRetriever component. @@ -342,7 +342,7 @@ def run( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Run the Sparse Embedding Retriever on the given input data. @@ -391,7 +391,7 @@ async def run_async( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Asynchronously run the Sparse Embedding Retriever on the given input data. 
@@ -473,7 +473,7 @@ def __init__( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> None: """ Create a QdrantHybridRetriever component. @@ -557,7 +557,7 @@ def run( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Run the Sparse Embedding Retriever on the given input data. @@ -606,7 +606,7 @@ async def run_async( score_threshold: Optional[float] = None, group_by: Optional[str] = None, group_size: Optional[int] = None, - ): + ) -> Dict[str, List[Document]]: """ Asynchronously run the Sparse Embedding Retriever on the given input data. diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py index fdde8e2a89..357b5adb04 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py @@ -1,6 +1,6 @@ import inspect from itertools import islice -from typing import Any, AsyncGenerator, ClassVar, Dict, Generator, List, Optional, Set, Union +from typing import Any, AsyncGenerator, ClassVar, Dict, Generator, List, Optional, Set, Tuple, Union import numpy as np import qdrant_client @@ -18,6 +18,7 @@ from .converters import ( DENSE_VECTORS_NAME, SPARSE_VECTORS_NAME, + QdrantPoint, convert_haystack_documents_to_qdrant_points, convert_id, convert_qdrant_point_to_haystack_document, @@ -34,7 +35,7 @@ class QdrantStoreError(DocumentStoreError): FilterType = Dict[str, Union[Dict[str, Any], List[Any], str, int, float, bool]] -def get_batches_from_generator(iterable, n): +def get_batches_from_generator(iterable: List, n: int) -> Generator: """ Batch elements of an iterable into fixed-length chunks or blocks. 
""" @@ -127,10 +128,10 @@ def __init__( write_batch_size: int = 100, scroll_size: int = 10_000, payload_fields_to_index: Optional[List[dict]] = None, - ): + ) -> None: """ :param location: - If `memory` - use in-memory Qdrant instance. + If `":memory:"` - use in-memory Qdrant instance. If `str` - use it as a URL parameter. If `None` - use default values for host and port. :param url: @@ -164,7 +165,7 @@ def __init__( Dimension of the embeddings. :param on_disk: Whether to store the collection on disk. - :param use_sparse_embedding: + :param use_sparse_embeddings: If set to `True`, enables support for sparse embeddings. :param sparse_idf: If set to `True`, computes the Inverse Document Frequency (IDF) when using sparse embeddings. @@ -232,7 +233,6 @@ def __init__( self.path = path self.force_disable_check_same_thread = force_disable_check_same_thread self.metadata = metadata or {} - self.api_key = api_key # Store the Qdrant collection specific attributes self.shard_number = shard_number @@ -258,9 +258,10 @@ def __init__( self.write_batch_size = write_batch_size self.scroll_size = scroll_size - def _initialize_client(self): + def _initialize_client(self) -> None: if self._client is None: client_params = self._prepare_client_params() + # This step adds the api-key and User-Agent to metadata self._client = qdrant_client.QdrantClient(**client_params) # Make sure the collection is properly set up self._set_up_collection( @@ -274,7 +275,7 @@ def _initialize_client(self): self.payload_fields_to_index, ) - async def _initialize_async_client(self): + async def _initialize_async_client(self) -> None: """ Returns the asynchronous Qdrant client, initializing it if necessary. """ @@ -628,8 +629,6 @@ def get_documents_by_id( :param ids: A list of document IDs to retrieve. - :param index: - The name of the index to retrieve documents from. :returns: A list of documents. """ @@ -661,8 +660,6 @@ async def get_documents_by_id_async( :param ids: A list of document IDs to retrieve. 
- :param index: - The name of the index to retrieve documents from. :returns: A list of documents. """ @@ -1210,7 +1207,7 @@ def get_distance(self, similarity: str) -> rest.Distance: ) raise QdrantStoreError(msg) from ke - def _create_payload_index(self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None): + def _create_payload_index(self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None) -> None: """ Create payload index for the collection if payload_fields_to_index is provided See: https://qdrant.tech/documentation/concepts/indexing/#payload-index @@ -1229,7 +1226,7 @@ def _create_payload_index(self, collection_name: str, payload_fields_to_index: O async def _create_payload_index_async( self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None - ): + ) -> None: """ Asynchronously create payload index for the collection if payload_fields_to_index is provided See: https://qdrant.tech/documentation/concepts/indexing/#payload-index @@ -1257,7 +1254,7 @@ def _set_up_collection( sparse_idf: bool, on_disk: bool = False, payload_fields_to_index: Optional[List[dict]] = None, - ): + ) -> None: """ Sets up the Qdrant collection with the specified parameters. :param collection_name: @@ -1313,7 +1310,7 @@ async def _set_up_collection_async( sparse_idf: bool, on_disk: bool = False, payload_fields_to_index: Optional[List[dict]] = None, - ): + ) -> None: """ Asynchronously sets up the Qdrant collection with the specified parameters. :param collection_name: @@ -1367,7 +1364,7 @@ def recreate_collection( on_disk: Optional[bool] = None, use_sparse_embeddings: Optional[bool] = None, sparse_idf: bool = False, - ): + ) -> None: """ Recreates the Qdrant collection with the specified parameters. 
@@ -1410,7 +1407,7 @@ async def recreate_collection_async( on_disk: Optional[bool] = None, use_sparse_embeddings: Optional[bool] = None, sparse_idf: bool = False, - ): + ) -> None: """ Asynchronously recreates the Qdrant collection with the specified parameters. @@ -1449,7 +1446,7 @@ def _handle_duplicate_documents( self, documents: List[Document], policy: DuplicatePolicy = None, - ): + ) -> List[Document]: """ Checks whether any of the passed documents is already existing in the chosen index and returns a list of documents that are not in the index yet. @@ -1476,7 +1473,7 @@ async def _handle_duplicate_documents_async( self, documents: List[Document], policy: DuplicatePolicy = None, - ): + ) -> List[Document]: """ Asynchronously checks whether any of the passed documents is already existing in the chosen index and returns a list of @@ -1521,7 +1518,7 @@ def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document] return _documents - def _prepare_collection_params(self): + def _prepare_collection_params(self) -> Dict[str, Any]: """ Prepares the common parameters for collection creation. """ @@ -1537,7 +1534,7 @@ def _prepare_collection_params(self): "init_from": self.init_from, } - def _prepare_client_params(self): + def _prepare_client_params(self) -> Dict[str, Any]: """ Prepares the common parameters for client initialization. @@ -1554,7 +1551,10 @@ def _prepare_client_params(self): "timeout": self.timeout, "host": self.host, "path": self.path, - "metadata": self.metadata, + # NOTE: We purposefully expand the fields of self.metadata to avoid modifying the original self.metadata + # class attribute. For example, the resolved api key is added to metadata by the QdrantClient class + # when using a hosted Qdrant service, which means running to_dict() exposes the api key. 
+ "metadata": {**self.metadata}, "force_disable_check_same_thread": self.force_disable_check_same_thread, } @@ -1565,7 +1565,7 @@ def _prepare_collection_config( on_disk: Optional[bool] = None, use_sparse_embeddings: Optional[bool] = None, sparse_idf: bool = False, - ): + ) -> Tuple[Dict[str, rest.VectorParams], Optional[Dict[str, rest.SparseVectorParams]]]: """ Prepares the configuration for creating or recreating a Qdrant collection. @@ -1595,9 +1595,12 @@ def _prepare_collection_config( return vectors_config, sparse_vectors_config - def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None): + def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None: """ Validates the filters provided for querying. + + :param filters: Filters to validate. Can be a dictionary or an instance of `qdrant_client.http.models.Filter`. + :raises ValueError: If the filters are not in the correct format or syntax. """ if filters and not isinstance(filters, dict) and not isinstance(filters, rest.Filter): msg = "Filter must be a dictionary or an instance of `qdrant_client.http.models.Filter`" @@ -1607,7 +1610,7 @@ def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter] msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details." raise ValueError(msg) - def _process_query_point_results(self, results, scale_score: bool = False): + def _process_query_point_results(self, results: List[QdrantPoint], scale_score: bool = False) -> List[Document]: """ Processes query results from Qdrant. """ @@ -1627,7 +1630,7 @@ def _process_query_point_results(self, results, scale_score: bool = False): return documents - def _process_group_results(self, groups): + def _process_group_results(self, groups: List[rest.PointGroup]) -> List[Document]: """ Processes grouped query results from Qdrant. 
@@ -1647,7 +1650,7 @@ def _validate_collection_compatibility( collection_info, distance, embedding_dim: int, - ): + ) -> None: """ Validates that an existing collection is compatible with the current configuration. """ diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/filters.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/filters.py index 69fd7cbbd1..36d512ae4f 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/filters.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/filters.py @@ -138,10 +138,10 @@ def convert_filters_to_qdrant( def build_filters_for_repeated_operators( - must_clauses, - should_clauses, - must_not_clauses, - qdrant_filter, + must_clauses: List, + should_clauses: List, + must_not_clauses: List, + qdrant_filter: List[models.Filter], ) -> List[models.Filter]: """ Flattens the nested lists of clauses by creating separate Filters for each clause of a logical operator. diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/migrate_to_sparse.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/migrate_to_sparse.py index 954269ce4b..0af7096828 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/migrate_to_sparse.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/migrate_to_sparse.py @@ -11,7 +11,7 @@ logger.setLevel(python_logging.INFO) -def migrate_to_sparse_embeddings_support(old_document_store: QdrantDocumentStore, new_index: str): +def migrate_to_sparse_embeddings_support(old_document_store: QdrantDocumentStore, new_index: str) -> None: """ Utility function to migrate an existing `QdrantDocumentStore` to a new one with support for sparse embeddings. 
diff --git a/integrations/qdrant/tests/test_document_store.py b/integrations/qdrant/tests/test_document_store.py
index 2e496cd0fa..1920aaf8c2 100644
--- a/integrations/qdrant/tests/test_document_store.py
+++ b/integrations/qdrant/tests/test_document_store.py
@@ -12,6 +12,7 @@
     WriteDocumentsTest,
     _random_embeddings,
 )
+from haystack.utils import Secret
 from qdrant_client.http import models as rest
 
 from haystack_integrations.document_stores.qdrant.document_store import (
@@ -38,6 +39,81 @@ def test_init_is_lazy(self):
             QdrantDocumentStore(location=":memory:", use_sparse_embeddings=True)
             mocked_qdrant.assert_not_called()
 
+    def test_prepare_client_params_no_mutability(self):
+        """Mutating the metadata from _prepare_client_params() must not alter the store's own metadata."""
+        metadata = {"key": "value"}
+        doc_store = QdrantDocumentStore(
+            ":memory:",
+            recreate_index=True,
+            return_embedding=True,
+            wait_result_from_api=True,
+            use_sparse_embeddings=False,
+            metadata=metadata,
+        )
+        client_params = doc_store._prepare_client_params()
+        # Mutate the returned metadata in place; dict.update() returns None, so its result is not assigned back
+        client_params["metadata"].update({"new_key": "new_value"})
+
+        # Assert that the metadata held by the document store (and the dict passed in) is unchanged
+        assert doc_store.metadata == metadata == {"key": "value"}
+
+    def test_to_dict(self, monkeypatch):
+        """Check that to_dict() serializes api_key as an env-var reference and leaves metadata empty."""
+        monkeypatch.setenv("QDRANT_API_KEY", "test_api_key")
+        doc_store = QdrantDocumentStore(
+            ":memory:",
+            recreate_index=True,
+            return_embedding=True,
+            wait_result_from_api=True,
+            use_sparse_embeddings=False,
+            api_key=Secret.from_env_var("QDRANT_API_KEY"),
+        )
+        expected_dict = {
+            "type": "haystack_integrations.document_stores.qdrant.document_store.QdrantDocumentStore",
+            "init_parameters": {
+                "location": ":memory:",
+                "url": None,
+                "port": 6333,
+                "grpc_port": 6334,
+                "prefer_grpc": False,
+                "https": None,
+                "api_key": {
+                    "env_vars": ["QDRANT_API_KEY"],
+                    "strict": True,
+                    "type": "env_var",
+                },
+                "prefix": None,
+                "timeout": None,
+                "host": None,
+                "path": None,
+                "force_disable_check_same_thread": False,
+                "index": "Document",
+                "embedding_dim": 768,
+                "on_disk": False,
+                "use_sparse_embeddings": False,
+                "sparse_idf": False,
+                "similarity": "cosine",
+                "return_embedding": True,
+                "progress_bar": True,
+                "recreate_index": True,
+                "shard_number": None,
+                "replication_factor": None,
+                "write_consistency_factor": None,
+                "on_disk_payload": None,
+                "hnsw_config": None,
+                "optimizers_config": None,
+                "wal_config": None,
+                "quantization_config": None,
+                "init_from": None,
+                "wait_result_from_api": True,
+                "metadata": {},
+                "write_batch_size": 100,
+                "scroll_size": 10000,
+                "payload_fields_to_index": None,
+            },
+        }
+        assert doc_store.to_dict() == expected_dict
+
     def assert_documents_are_equal(self, received: List[Document], expected: List[Document]):
         """
         Assert that two lists of Documents are equal.