diff --git a/integrations/qdrant/pyproject.toml b/integrations/qdrant/pyproject.toml index 6eaad6af52..0a1fa8f89c 100644 --- a/integrations/qdrant/pyproject.toml +++ b/integrations/qdrant/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ - "haystack-ai>=2.24.0", + "haystack-ai>=2.26.1", "qdrant-client>=1.12.0" ] diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py index cf3dcb53c3..a251c27a70 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py @@ -10,6 +10,7 @@ from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError from haystack.document_stores.types import DuplicatePolicy from haystack.utils import Secret, deserialize_secrets_inplace +from haystack.utils.misc import _normalize_metadata_field_name from numpy import exp from qdrant_client.http import models as rest from qdrant_client.http.exceptions import UnexpectedResponse @@ -603,14 +604,59 @@ def _check_stop_scrolling(next_offset: Any) -> bool: ) @staticmethod - def _metadata_fields_info_from_schema(payload_schema: dict[str, Any]) -> dict[str, str]: - """Build field name -> type dict from Qdrant payload_schema. Used by get_metadata_fields_info (sync/async).""" - fields_info: dict[str, str] = {} + def _infer_type_from_value(value: Any) -> str: + """ + Infers the type from a metadata value for get_metadata_fields_info. + + Returns types matching OpenSearch format for consistency: + - 'keyword' for strings + - 'long' for integers + - 'float' for floats + - 'boolean' for booleans + """ + if isinstance(value, bool): + return "boolean" + elif isinstance(value, int): + return "long" + elif isinstance(value, float): + return "float" + elif isinstance(value, str): + return "keyword" + else: + return "keyword" + + @staticmethod + def _process_records_fields_info(records: list[Any], field_info: dict[str, dict[str, str]]) -> None: + """ + Update field_info from a batch of Qdrant records. + + Used by get_metadata_fields_info (sync/async). Extracts metadata from + payload["meta"] and infers types for each field. + """ + for record in records: + if record.payload and "meta" in record.payload: + meta = record.payload["meta"] + for field_name, value in meta.items(): + if value is not None and field_name not in field_info: + field_info[field_name] = {"type": QdrantDocumentStore._infer_type_from_value(value)} + + @staticmethod + def _metadata_fields_info_from_schema(payload_schema: dict[str, Any]) -> dict[str, dict[str, str]]: + """ + Build field name -> {type: ...} dict from Qdrant payload_schema. + + Used when payload_schema has indexed metadata fields (e.g. meta.category). + Returns empty dict when schema has no metadata field info. + """ + fields_info: dict[str, dict[str, str]] = {} for field_name, field_config in payload_schema.items(): - if hasattr(field_config, "data_type"): - fields_info[field_name] = str(field_config.data_type) - else: - fields_info[field_name] = "unknown" + if field_name.startswith("meta."): + meta_field = field_name[5:] + if hasattr(field_config, "data_type"): + qdrant_type = str(field_config.data_type).lower() + fields_info[meta_field] = {"type": qdrant_type} + else: + fields_info[meta_field] = {"type": "unknown"} return fields_info @staticmethod @@ -978,14 +1024,18 @@ async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int: logger.warning(f"Error {e} when calling QdrantDocumentStore.count_documents_by_filter_async()") return 0 - def get_metadata_fields_info(self) -> dict[str, str]: + def get_metadata_fields_info(self) -> dict[str, dict[str, str]]: """ - Returns the information about the fields from the collection. + Returns the information about the metadata fields in the collection. + + Since Qdrant may not have a payload schema for unindexed metadata, + this method scrolls through documents to infer field types from + payload["meta"]. :returns: - A dictionary mapping field names to their types e.g.: + A dictionary mapping field names to their type information e.g.: ```python - {"field_name": "integer"} + {"category": {"type": "keyword"}, "priority": {"type": "long"}} ``` """ self._initialize_client() @@ -994,19 +1044,40 @@ def get_metadata_fields_info(self) -> dict[str, str]: try: collection_info = self._client.get_collection(self.index) payload_schema = collection_info.payload_schema or {} - return self._metadata_fields_info_from_schema(payload_schema) + fields_info = self._metadata_fields_info_from_schema(payload_schema) + + if not fields_info: + next_offset = None + while True: + records, next_offset = self._client.scroll( + collection_name=self.index, + scroll_filter=None, + limit=self.scroll_size, + offset=next_offset, + with_payload=True, + with_vectors=False, + ) + self._process_records_fields_info(records, fields_info) + if self._check_stop_scrolling(next_offset): + break + + return fields_info except (UnexpectedResponse, ValueError) as e: logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_fields_info()") return {} - async def get_metadata_fields_info_async(self) -> dict[str, str]: + async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]: """ - Asynchronously returns the information about the fields from the collection. + Asynchronously returns the information about the metadata fields in the collection. + + Since Qdrant may not have a payload schema for unindexed metadata, + this method scrolls through documents to infer field types from + payload["meta"]. :returns: - A dictionary mapping field names to their types e.g.: + A dictionary mapping field names to their type information e.g.: ```python - {"field_name": "integer"} + {"category": {"type": "keyword"}, "priority": {"type": "long"}} ``` """ await self._initialize_async_client() @@ -1015,7 +1086,24 @@ async def get_metadata_fields_info_async(self) -> dict[str, str]: try: collection_info = await self._async_client.get_collection(self.index) payload_schema = collection_info.payload_schema or {} - return self._metadata_fields_info_from_schema(payload_schema) + fields_info = self._metadata_fields_info_from_schema(payload_schema) + + if not fields_info: + next_offset = None + while True: + records, next_offset = await self._async_client.scroll( + collection_name=self.index, + scroll_filter=None, + limit=self.scroll_size, + offset=next_offset, + with_payload=True, + with_vectors=False, + ) + self._process_records_fields_info(records, fields_info) + if self._check_stop_scrolling(next_offset): + break + + return fields_info except (UnexpectedResponse, ValueError) as e: logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_fields_info_async()") return {} @@ -1027,11 +1115,13 @@ def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]: :param metadata_field: The metadata field key (inside ``meta``) to get the minimum and maximum values for. :returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the - metadata field across all documents. Returns an empty dict if no documents have the field. + metadata field across all documents. Returns ``{"min": None, "max": None}`` if no documents have + the field. """ self._initialize_client() assert self._client is not None + field_name = _normalize_metadata_field_name(metadata_field) try: min_value: Any = None max_value: Any = None @@ -1046,13 +1136,11 @@ def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]: with_payload=True, with_vectors=False, ) - min_value, max_value = self._process_records_min_max(records, metadata_field, min_value, max_value) + min_value, max_value = self._process_records_min_max(records, field_name, min_value, max_value) if self._check_stop_scrolling(next_offset): break - if min_value is not None and max_value is not None: - return {"min": min_value, "max": max_value} - return {} + return {"min": min_value, "max": max_value} except Exception as e: logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_field_min_max()") return {} @@ -1064,11 +1152,13 @@ async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[st :param metadata_field: The metadata field key (inside ``meta``) to get the minimum and maximum values for. :returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the - metadata field across all documents. Returns an empty dict if no documents have the field. + metadata field across all documents. Returns ``{"min": None, "max": None}`` if no documents have + the field. """ await self._initialize_async_client() assert self._async_client is not None + field_name = _normalize_metadata_field_name(metadata_field) try: min_value: Any = None max_value: Any = None @@ -1083,13 +1173,11 @@ async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[st with_payload=True, with_vectors=False, ) - min_value, max_value = self._process_records_min_max(records, metadata_field, min_value, max_value) + min_value, max_value = self._process_records_min_max(records, field_name, min_value, max_value) if self._check_stop_scrolling(next_offset): break - if min_value is not None and max_value is not None: - return {"min": min_value, "max": max_value} - return {} + return {"min": min_value, "max": max_value} except Exception as e: logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_field_min_max_async()") return {} diff --git a/integrations/qdrant/tests/test_document_store.py b/integrations/qdrant/tests/test_document_store.py index 74cc0c8067..d652fc7b7a 100644 --- a/integrations/qdrant/tests/test_document_store.py +++ b/integrations/qdrant/tests/test_document_store.py @@ -6,11 +6,16 @@ from haystack.document_stores.errors import DuplicateDocumentError from haystack.document_stores.types import DuplicatePolicy from haystack.testing.document_store import ( + CountDocumentsByFilterTest, CountDocumentsTest, + CountUniqueMetadataByFilterTest, DeleteAllTest, DeleteByFilterTest, DeleteDocumentsTest, FilterableDocsFixtureMixin, + GetMetadataFieldMinMaxTest, + GetMetadataFieldsInfoTest, + GetMetadataFieldUniqueValuesTest, UpdateByFilterTest, WriteDocumentsTest, _random_embeddings, @@ -27,11 +32,16 @@ class TestQdrantDocumentStore( + CountDocumentsByFilterTest, CountDocumentsTest, + CountUniqueMetadataByFilterTest, DeleteAllTest, DeleteByFilterTest, DeleteDocumentsTest, FilterableDocsFixtureMixin, + GetMetadataFieldMinMaxTest, + GetMetadataFieldUniqueValuesTest, + GetMetadataFieldsInfoTest, UpdateByFilterTest, WriteDocumentsTest, ): @@ -359,116 +369,6 @@ def test_update_by_filter_preserves_vectors(self, document_store: QdrantDocument assert updated_docs[0].embedding is not None assert len(updated_docs[0].embedding) == 768 - def test_count_documents_by_filter(self, document_store: QdrantDocumentStore): - """Test counting documents with filters.""" - docs = [ - Document(content="Doc 1", meta={"category": "A", "year": 2023}), - Document(content="Doc 2", meta={"category": "A", "year": 2024}), - Document(content="Doc 3", meta={"category": "B", "year": 2023}), - Document(content="Doc 4", meta={"category": "B", "year": 2024}), - ] - document_store.write_documents(docs) - - # Test counting all documents - assert document_store.count_documents() == 4 - - # Test counting with single filter - count = document_store.count_documents_by_filter( - filters={"field": "meta.category", "operator": "==", "value": "A"} - ) - assert count == 2 - - # Test counting with multiple filters - count = document_store.count_documents_by_filter( - filters={ - "operator": "AND", - "conditions": [ - {"field": "meta.category", "operator": "==", "value": "B"}, - {"field": "meta.year", "operator": "==", "value": 2023}, - ], - } - ) - assert count == 1 - - def test_get_metadata_fields_info(self, document_store: QdrantDocumentStore): - """Test getting metadata field information.""" - docs = [ - Document(content="Doc 1", meta={"category": "A", "score": 0.9, "tags": ["tag1", "tag2"]}), - Document(content="Doc 2", meta={"category": "B", "score": 0.8, "tags": ["tag2"]}), - ] - document_store.write_documents(docs) - - fields_info = document_store.get_metadata_fields_info() - # Should return empty dict or field info depending on Qdrant collection setup - assert isinstance(fields_info, dict) - - def test_get_metadata_field_min_max(self, document_store: QdrantDocumentStore): - """Test getting min/max values for a metadata field.""" - docs = [ - Document(content="Doc 1", meta={"score": 0.5}), - Document(content="Doc 2", meta={"score": 0.8}), - Document(content="Doc 3", meta={"score": 0.3}), - ] - document_store.write_documents(docs) - - result = document_store.get_metadata_field_min_max("score") - assert result.get("min") == 0.3 - assert result.get("max") == 0.8 - - def test_count_unique_metadata_by_filter(self, document_store: QdrantDocumentStore): - """Test counting unique metadata field values.""" - docs = [ - Document(content="Doc 1", meta={"category": "A"}), - Document(content="Doc 2", meta={"category": "B"}), - Document(content="Doc 3", meta={"category": "A"}), - Document(content="Doc 4", meta={"category": "C"}), - ] - document_store.write_documents(docs) - - result = document_store.count_unique_metadata_by_filter(filters={}, metadata_fields=["category"]) - assert result == {"category": 3} - - def test_count_unique_metadata_by_filter_multiple_fields(self, document_store: QdrantDocumentStore): - """Test counting unique values for multiple metadata fields.""" - docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "active"}), - Document(content="Doc 2", meta={"category": "B", "status": "active"}), - Document(content="Doc 3", meta={"category": "A", "status": "inactive"}), - ] - document_store.write_documents(docs) - - result = document_store.count_unique_metadata_by_filter(filters={}, metadata_fields=["category", "status"]) - assert result == {"category": 2, "status": 2} - - def test_count_unique_metadata_by_filter_with_filter(self, document_store: QdrantDocumentStore): - """Test counting unique metadata field values with filtering.""" - docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "active"}), - Document(content="Doc 2", meta={"category": "B", "status": "active"}), - Document(content="Doc 3", meta={"category": "A", "status": "inactive"}), - ] - document_store.write_documents(docs) - - result = document_store.count_unique_metadata_by_filter( - filters={"field": "meta.status", "operator": "==", "value": "active"}, - metadata_fields=["category"], - ) - assert result == {"category": 2} - - def test_get_metadata_field_unique_values(self, document_store: QdrantDocumentStore): - """Test getting unique metadata field values.""" - docs = [ - Document(content="Doc 1", meta={"category": "A"}), - Document(content="Doc 2", meta={"category": "B"}), - Document(content="Doc 3", meta={"category": "A"}), - Document(content="Doc 4", meta={"category": "C"}), - ] - document_store.write_documents(docs) - - values = document_store.get_metadata_field_unique_values("category") - assert len(values) == 3 - assert set(values) == {"A", "B", "C"} - def test_get_metadata_field_unique_values_pagination(self, document_store: QdrantDocumentStore): """Test getting unique metadata field values with pagination.""" docs = [Document(content=f"Doc {i}", meta={"value": i % 5}) for i in range(10)]