Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/qdrant/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"haystack-ai>=2.24.0",
"haystack-ai>=2.26.1",
"qdrant-client>=1.12.0"
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.utils.misc import _normalize_metadata_field_name
from numpy import exp
from qdrant_client.http import models as rest
from qdrant_client.http.exceptions import UnexpectedResponse
Expand Down Expand Up @@ -603,14 +604,59 @@ def _check_stop_scrolling(next_offset: Any) -> bool:
)

@staticmethod
def _metadata_fields_info_from_schema(payload_schema: dict[str, Any]) -> dict[str, str]:
"""Build field name -> type dict from Qdrant payload_schema. Used by get_metadata_fields_info (sync/async)."""
fields_info: dict[str, str] = {}
def _infer_type_from_value(value: Any) -> str:
"""
Infers the type from a metadata value for get_metadata_fields_info.

Returns types matching OpenSearch format for consistency:
- 'keyword' for strings
- 'long' for integers
- 'float' for floats
- 'boolean' for booleans
"""
if isinstance(value, bool):
return "boolean"
elif isinstance(value, int):
return "long"
elif isinstance(value, float):
return "float"
elif isinstance(value, str):
return "keyword"
else:
return "keyword"

@staticmethod
def _process_records_fields_info(records: list[Any], field_info: dict[str, dict[str, str]]) -> None:
"""
Update field_info from a batch of Qdrant records.

Used by get_metadata_fields_info (sync/async). Extracts metadata from
payload["meta"] and infers types for each field.
"""
for record in records:
if record.payload and "meta" in record.payload:
meta = record.payload["meta"]
for field_name, value in meta.items():
if value is not None and field_name not in field_info:
field_info[field_name] = {"type": QdrantDocumentStore._infer_type_from_value(value)}

@staticmethod
def _metadata_fields_info_from_schema(payload_schema: dict[str, Any]) -> dict[str, dict[str, str]]:
"""
Build field name -> {type: ...} dict from Qdrant payload_schema.

Used when payload_schema has indexed metadata fields (e.g. meta.category).
Returns empty dict when schema has no metadata field info.
"""
fields_info: dict[str, dict[str, str]] = {}
for field_name, field_config in payload_schema.items():
if hasattr(field_config, "data_type"):
fields_info[field_name] = str(field_config.data_type)
else:
fields_info[field_name] = "unknown"
if field_name.startswith("meta."):
meta_field = field_name[5:]
if hasattr(field_config, "data_type"):
qdrant_type = str(field_config.data_type).lower()
fields_info[meta_field] = {"type": qdrant_type}
else:
fields_info[meta_field] = {"type": "unknown"}
return fields_info

@staticmethod
Expand Down Expand Up @@ -978,14 +1024,18 @@ async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int:
logger.warning(f"Error {e} when calling QdrantDocumentStore.count_documents_by_filter_async()")
return 0

def get_metadata_fields_info(self) -> dict[str, str]:
def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
"""
Returns the information about the fields from the collection.
Returns the information about the metadata fields in the collection.

Since Qdrant may not have a payload schema for unindexed metadata,
this method scrolls through documents to infer field types from
payload["meta"].

:returns:
A dictionary mapping field names to their types e.g.:
A dictionary mapping field names to their type information e.g.:
```python
{"field_name": "integer"}
{"category": {"type": "keyword"}, "priority": {"type": "long"}}
```
"""
self._initialize_client()
Expand All @@ -994,19 +1044,40 @@ def get_metadata_fields_info(self) -> dict[str, str]:
try:
collection_info = self._client.get_collection(self.index)
payload_schema = collection_info.payload_schema or {}
return self._metadata_fields_info_from_schema(payload_schema)
fields_info = self._metadata_fields_info_from_schema(payload_schema)

if not fields_info:
next_offset = None
while True:
records, next_offset = self._client.scroll(
collection_name=self.index,
scroll_filter=None,
limit=self.scroll_size,
offset=next_offset,
with_payload=True,
with_vectors=False,
)
self._process_records_fields_info(records, fields_info)
if self._check_stop_scrolling(next_offset):
break

return fields_info
except (UnexpectedResponse, ValueError) as e:
logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_fields_info()")
return {}

async def get_metadata_fields_info_async(self) -> dict[str, str]:
async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
"""
Asynchronously returns the information about the fields from the collection.
Asynchronously returns the information about the metadata fields in the collection.

Since Qdrant may not have a payload schema for unindexed metadata,
this method scrolls through documents to infer field types from
payload["meta"].

:returns:
A dictionary mapping field names to their types e.g.:
A dictionary mapping field names to their type information e.g.:
```python
{"field_name": "integer"}
{"category": {"type": "keyword"}, "priority": {"type": "long"}}
```
"""
await self._initialize_async_client()
Expand All @@ -1015,7 +1086,24 @@ async def get_metadata_fields_info_async(self) -> dict[str, str]:
try:
collection_info = await self._async_client.get_collection(self.index)
payload_schema = collection_info.payload_schema or {}
return self._metadata_fields_info_from_schema(payload_schema)
fields_info = self._metadata_fields_info_from_schema(payload_schema)

if not fields_info:
next_offset = None
while True:
records, next_offset = await self._async_client.scroll(
collection_name=self.index,
scroll_filter=None,
limit=self.scroll_size,
offset=next_offset,
with_payload=True,
with_vectors=False,
)
self._process_records_fields_info(records, fields_info)
if self._check_stop_scrolling(next_offset):
break

return fields_info
except (UnexpectedResponse, ValueError) as e:
logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_fields_info_async()")
return {}
Expand All @@ -1027,11 +1115,13 @@ def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
:param metadata_field: The metadata field key (inside ``meta``) to get the minimum and maximum values for.

:returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the
metadata field across all documents. Returns an empty dict if no documents have the field.
metadata field across all documents. Returns ``{"min": None, "max": None}`` if no documents have
the field.
"""
self._initialize_client()
assert self._client is not None

field_name = _normalize_metadata_field_name(metadata_field)
try:
min_value: Any = None
max_value: Any = None
Expand All @@ -1046,13 +1136,11 @@ def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
with_payload=True,
with_vectors=False,
)
min_value, max_value = self._process_records_min_max(records, metadata_field, min_value, max_value)
min_value, max_value = self._process_records_min_max(records, field_name, min_value, max_value)
if self._check_stop_scrolling(next_offset):
break

if min_value is not None and max_value is not None:
return {"min": min_value, "max": max_value}
return {}
return {"min": min_value, "max": max_value}
except Exception as e:
logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_field_min_max()")
return {}
Expand All @@ -1064,11 +1152,13 @@ async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[st
:param metadata_field: The metadata field key (inside ``meta``) to get the minimum and maximum values for.

:returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the
metadata field across all documents. Returns an empty dict if no documents have the field.
metadata field across all documents. Returns ``{"min": None, "max": None}`` if no documents have
the field.
"""
await self._initialize_async_client()
assert self._async_client is not None

field_name = _normalize_metadata_field_name(metadata_field)
try:
min_value: Any = None
max_value: Any = None
Expand All @@ -1083,13 +1173,11 @@ async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[st
with_payload=True,
with_vectors=False,
)
min_value, max_value = self._process_records_min_max(records, metadata_field, min_value, max_value)
min_value, max_value = self._process_records_min_max(records, field_name, min_value, max_value)
if self._check_stop_scrolling(next_offset):
break

if min_value is not None and max_value is not None:
return {"min": min_value, "max": max_value}
return {}
return {"min": min_value, "max": max_value}
except Exception as e:
logger.warning(f"Error {e} when calling QdrantDocumentStore.get_metadata_field_min_max_async()")
return {}
Expand Down
120 changes: 10 additions & 110 deletions integrations/qdrant/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
from haystack.document_stores.errors import DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.testing.document_store import (
CountDocumentsByFilterTest,
CountDocumentsTest,
CountUniqueMetadataByFilterTest,
DeleteAllTest,
DeleteByFilterTest,
DeleteDocumentsTest,
FilterableDocsFixtureMixin,
GetMetadataFieldMinMaxTest,
GetMetadataFieldsInfoTest,
GetMetadataFieldUniqueValuesTest,
UpdateByFilterTest,
WriteDocumentsTest,
_random_embeddings,
Expand All @@ -27,11 +32,16 @@


class TestQdrantDocumentStore(
CountDocumentsByFilterTest,
CountDocumentsTest,
CountUniqueMetadataByFilterTest,
DeleteAllTest,
DeleteByFilterTest,
DeleteDocumentsTest,
FilterableDocsFixtureMixin,
GetMetadataFieldMinMaxTest,
GetMetadataFieldUniqueValuesTest,
GetMetadataFieldsInfoTest,
UpdateByFilterTest,
WriteDocumentsTest,
):
Expand Down Expand Up @@ -359,116 +369,6 @@ def test_update_by_filter_preserves_vectors(self, document_store: QdrantDocument
assert updated_docs[0].embedding is not None
assert len(updated_docs[0].embedding) == 768

def test_count_documents_by_filter(self, document_store: QdrantDocumentStore):
"""Test counting documents with filters."""
docs = [
Document(content="Doc 1", meta={"category": "A", "year": 2023}),
Document(content="Doc 2", meta={"category": "A", "year": 2024}),
Document(content="Doc 3", meta={"category": "B", "year": 2023}),
Document(content="Doc 4", meta={"category": "B", "year": 2024}),
]
document_store.write_documents(docs)

# Test counting all documents
assert document_store.count_documents() == 4

# Test counting with single filter
count = document_store.count_documents_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
assert count == 2

# Test counting with multiple filters
count = document_store.count_documents_by_filter(
filters={
"operator": "AND",
"conditions": [
{"field": "meta.category", "operator": "==", "value": "B"},
{"field": "meta.year", "operator": "==", "value": 2023},
],
}
)
assert count == 1

def test_get_metadata_fields_info(self, document_store: QdrantDocumentStore):
"""Test getting metadata field information."""
docs = [
Document(content="Doc 1", meta={"category": "A", "score": 0.9, "tags": ["tag1", "tag2"]}),
Document(content="Doc 2", meta={"category": "B", "score": 0.8, "tags": ["tag2"]}),
]
document_store.write_documents(docs)

fields_info = document_store.get_metadata_fields_info()
# Should return empty dict or field info depending on Qdrant collection setup
assert isinstance(fields_info, dict)

def test_get_metadata_field_min_max(self, document_store: QdrantDocumentStore):
"""Test getting min/max values for a metadata field."""
docs = [
Document(content="Doc 1", meta={"score": 0.5}),
Document(content="Doc 2", meta={"score": 0.8}),
Document(content="Doc 3", meta={"score": 0.3}),
]
document_store.write_documents(docs)

result = document_store.get_metadata_field_min_max("score")
assert result.get("min") == 0.3
assert result.get("max") == 0.8

def test_count_unique_metadata_by_filter(self, document_store: QdrantDocumentStore):
"""Test counting unique metadata field values."""
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
Document(content="Doc 4", meta={"category": "C"}),
]
document_store.write_documents(docs)

result = document_store.count_unique_metadata_by_filter(filters={}, metadata_fields=["category"])
assert result == {"category": 3}

def test_count_unique_metadata_by_filter_multiple_fields(self, document_store: QdrantDocumentStore):
"""Test counting unique values for multiple metadata fields."""
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "active"}),
Document(content="Doc 2", meta={"category": "B", "status": "active"}),
Document(content="Doc 3", meta={"category": "A", "status": "inactive"}),
]
document_store.write_documents(docs)

result = document_store.count_unique_metadata_by_filter(filters={}, metadata_fields=["category", "status"])
assert result == {"category": 2, "status": 2}

def test_count_unique_metadata_by_filter_with_filter(self, document_store: QdrantDocumentStore):
"""Test counting unique metadata field values with filtering."""
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "active"}),
Document(content="Doc 2", meta={"category": "B", "status": "active"}),
Document(content="Doc 3", meta={"category": "A", "status": "inactive"}),
]
document_store.write_documents(docs)

result = document_store.count_unique_metadata_by_filter(
filters={"field": "meta.status", "operator": "==", "value": "active"},
metadata_fields=["category"],
)
assert result == {"category": 2}

def test_get_metadata_field_unique_values(self, document_store: QdrantDocumentStore):
"""Test getting unique metadata field values."""
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
Document(content="Doc 4", meta={"category": "C"}),
]
document_store.write_documents(docs)

values = document_store.get_metadata_field_unique_values("category")
assert len(values) == 3
assert set(values) == {"A", "B", "C"}

def test_get_metadata_field_unique_values_pagination(self, document_store: QdrantDocumentStore):
"""Test getting unique metadata field values with pagination."""
docs = [Document(content=f"Doc {i}", meta={"value": i % 5}) for i in range(10)]
Expand Down
Loading