diff --git a/integrations/opensearch/pyproject.toml b/integrations/opensearch/pyproject.toml index 6059e0db3b..fefb8e8c70 100644 --- a/integrations/opensearch/pyproject.toml +++ b/integrations/opensearch/pyproject.toml @@ -24,7 +24,8 @@ classifiers = [ ] dependencies = [ "haystack-ai>=2.22.0", - "opensearch-py[async]>=2.4.0,<3"] + "opensearch-py[async]>=2.4.0,<3" +] [project.urls] Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/opensearch#readme" diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 05f167b400..41b9c9abf3 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -21,6 +21,7 @@ logger = logging.getLogger(__name__) +SPECIAL_FIELDS = {"content", "embedding", "id", "score", "sparse_embedding", "blob"} Hosts = str | list[str | Mapping[str, str | int]] @@ -1141,3 +1142,433 @@ def _render_custom_query(self, custom_query: Any, substitutions: dict[str, Any]) return substitutions.get(custom_query, custom_query) return custom_query + + def count_documents_by_filter(self, filters: dict[str, Any]) -> int: + """ + Returns the number of documents that match the provided filters. + + :param filters: The filters to apply to count documents. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents that match the filters. 
+ """ + self._ensure_initialized() + assert self._client is not None + + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + return self._client.count(index=self._index, body=body)["count"] + + async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int: + """ + Asynchronously returns the number of documents that match the provided filters. + + :param filters: The filters to apply to count documents. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents that match the filters. + """ + await self._ensure_initialized_async() + assert self._async_client is not None + + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + return (await self._async_client.count(index=self._index, body=body))["count"] + + @staticmethod + def _build_cardinality_aggregations(index_mapping: dict[str, Any], fields: list[str]) -> dict[str, Any]: + """ + Builds cardinality aggregations for specified metadata fields in the index mapping. + + :param index_mapping: The index mapping containing field definitions. + :param fields: List of field names to build aggregations for. + :returns: Dictionary of cardinality aggregations. + + See: https://docs.opensearch.org/latest/aggregations/metric/cardinality/ + """ + aggs = {} + for field_name in fields: + if field_name not in SPECIAL_FIELDS and field_name in index_mapping: + aggs[f"{field_name}_cardinality"] = {"cardinality": {"field": field_name}} + return aggs + + @staticmethod + def _build_distinct_values_query_body(filters: dict[str, Any] | None, aggs: dict[str, Any]) -> dict[str, Any]: + """ + Builds the query body for distinct values counting with filters and aggregations. 
+ """ + if filters: + normalized_filters = normalize_filters(filters) + return { + "query": {"bool": {"filter": normalized_filters}}, + "aggs": aggs, + "size": 0, # We only need aggregations, not documents + } + else: + # No filters - match all documents + return { + "query": {"match_all": {}}, + "aggs": aggs, + "size": 0, # We only need aggregations, not documents + } + + @staticmethod + def _extract_distinct_counts_from_aggregations( + aggregations: dict[str, Any], index_mapping: dict[str, Any], fields: list[str] + ) -> dict[str, int]: + """ + Extracts distinct value counts from search result aggregations. + + :param aggregations: The aggregations result from the search query. + :param index_mapping: The index mapping containing field definitions. + :param fields: List of field names to extract counts for. + :returns: Dictionary mapping field names to their distinct value counts. + """ + distinct_counts = {} + for field_name in fields: + if field_name not in SPECIAL_FIELDS and field_name in index_mapping: + agg_key = f"{field_name}_cardinality" + if agg_key in aggregations: + distinct_counts[field_name] = aggregations[agg_key]["value"] + return distinct_counts + + def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fields: list[str]) -> dict[str, int]: + """ + Returns the number of unique values for each specified metadata field of the documents + that match the provided filters. + + :param filters: The filters to apply to count documents. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param metadata_fields: List of field names to calculate unique values for. + Field names can include or omit the "meta." prefix. + :returns: A dictionary mapping each metadata field name to the count of its unique values among the filtered + documents. + :raises ValueError: If any of the requested fields don't exist in the index mapping. 
+ """ + self._ensure_initialized() + assert self._client is not None + + # use index mapping to get all fields + mapping = self._client.indices.get_mapping(index=self._index) + index_mapping = mapping[self._index]["mappings"]["properties"] + + # normalize field names + normalized_metadata_fields = [self._normalize_metadata_field_name(field) for field in metadata_fields] + # validate that all requested fields exist in the index mapping + missing_fields = [f for f in normalized_metadata_fields if f not in index_mapping] + if missing_fields: + msg = f"Fields not found in index mapping: {missing_fields}" + raise ValueError(msg) + + # build aggregations for specified metadata fields + aggs = self._build_cardinality_aggregations(index_mapping, normalized_metadata_fields) + if not aggs: + return {} + + # build and execute search query + body = self._build_distinct_values_query_body(filters, aggs) + result = self._client.search(index=self._index, body=body) + + # extract cardinality values from aggregations + return self._extract_distinct_counts_from_aggregations( + result.get("aggregations", {}), index_mapping, normalized_metadata_fields + ) + + async def count_unique_metadata_by_filter_async( + self, filters: dict[str, Any], metadata_fields: list[str] + ) -> dict[str, int]: + """ + Asynchronously returns the number of unique values for each specified metadata field of the documents + that match the provided filters. + + :param filters: The filters to apply to count documents. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param metadata_fields: List of field names to calculate unique values for. + Field names can include or omit the "meta." prefix. + :returns: A dictionary mapping each metadata field name to the count of its unique values among the filtered + documents. + :raises ValueError: If any of the requested fields don't exist in the index mapping. 
+ """ + await self._ensure_initialized_async() + assert self._async_client is not None + + # use index mapping to get all fields + mapping = await self._async_client.indices.get_mapping(index=self._index) + index_mapping = mapping[self._index]["mappings"]["properties"] + + # normalize field names + normalized_metadata_fields = [self._normalize_metadata_field_name(field) for field in metadata_fields] + # validate that all requested fields exist in the index mapping + missing_fields = [f for f in normalized_metadata_fields if f not in index_mapping] + if missing_fields: + msg = f"Fields not found in index mapping: {missing_fields}" + raise ValueError(msg) + + # build aggregations for specified metadata fields + aggs = self._build_cardinality_aggregations(index_mapping, normalized_metadata_fields) + if not aggs: + return {} + + # build and execute search query + body = self._build_distinct_values_query_body(filters, aggs) + result = await self._async_client.search(index=self._index, body=body) + + # extract cardinality values from aggregations + return self._extract_distinct_counts_from_aggregations( + result.get("aggregations", {}), index_mapping, normalized_metadata_fields + ) + + def get_metadata_fields_info(self) -> dict[str, dict[str, str]]: + """ + Returns the information about the fields in the index. + + If we populated the index with documents like: + + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}) + Document(content="Doc 2", meta={"category": "B", "status": "inactive"}) + + This method would return: + + { + 'content': {'type': 'text'}, + 'category': {'type': 'keyword'}, + 'status': {'type': 'keyword'}, + 'priority': {'type': 'long'}, + } + + :returns: The information about the fields in the index. 
+ """ + self._ensure_initialized() + assert self._client is not None + + mapping = self._client.indices.get_mapping(index=self._index) + index_mapping = mapping[self._index]["mappings"]["properties"] + # remove all fields that are not metadata fields + index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS} + return index_mapping + + async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]: + """ + Asynchronously returns the information about the fields in the index. + + If we populated the index with documents like: + + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}) + Document(content="Doc 2", meta={"category": "B", "status": "inactive"}) + + This method would return: + + { + 'content': {'type': 'text'}, + 'category': {'type': 'keyword'}, + 'status': {'type': 'keyword'}, + 'priority': {'type': 'long'}, + } + + :returns: The information about the fields in the index. + """ + await self._ensure_initialized_async() + assert self._async_client is not None + + mapping = await self._async_client.indices.get_mapping(index=self._index) + index_mapping = mapping[self._index]["mappings"]["properties"] + # remove all fields that are not metadata fields + index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS} + return index_mapping + + @staticmethod + def _normalize_metadata_field_name(metadata_field: str) -> str: + """ + Normalizes a metadata field name by removing the "meta." prefix if present. + """ + return metadata_field[5:] if metadata_field.startswith("meta.") else metadata_field + + @staticmethod + def _build_min_max_query_body(field_name: str) -> dict[str, Any]: + """ + Builds the query body for getting min and max values using stats aggregation. 
+ """ + return { + "query": {"match_all": {}}, + "aggs": { + "field_stats": { + "stats": { + "field": field_name, + } + } + }, + "size": 0, # We only need aggregations, not documents + } + + @staticmethod + def _extract_min_max_from_stats(stats: dict[str, Any]) -> dict[str, int | None]: + """ + Extracts min and max values from stats aggregation results. + """ + min_value = stats.get("min") + max_value = stats.get("max") + return {"min": min_value, "max": max_value} + + def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, int | None]: + """ + Returns the minimum and maximum values for the given metadata field. + + :param metadata_field: The metadata field to get the minimum and maximum values for. + :returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the + metadata field across all documents. + """ + self._ensure_initialized() + assert self._client is not None + + field_name = self._normalize_metadata_field_name(metadata_field) + body = self._build_min_max_query_body(field_name) + result = self._client.search(index=self._index, body=body) + stats = result.get("aggregations", {}).get("field_stats", {}) + + return self._extract_min_max_from_stats(stats) + + async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[str, int | None]: + """ + Asynchronously returns the minimum and maximum values for the given metadata field. + + :param metadata_field: The metadata field to get the minimum and maximum values for. + :returns: A dictionary with the keys "min" and "max", where each value is the minimum or maximum value of the + metadata field across all documents. 
+ """ + await self._ensure_initialized_async() + assert self._async_client is not None + + field_name = self._normalize_metadata_field_name(metadata_field) + body = self._build_min_max_query_body(field_name) + result = await self._async_client.search(index=self._index, body=body) + stats = result.get("aggregations", {}).get("field_stats", {}) + + return self._extract_min_max_from_stats(stats) + + def get_metadata_field_unique_values( + self, + metadata_field: str, + search_term: str | None = None, + size: int | None = 10000, + after: dict[str, Any] | None = None, + ) -> tuple[list[str], dict[str, Any] | None]: + """ + Returns unique values for a metadata field, optionally filtered by a search term in the content. + Uses composite aggregations for proper pagination beyond 10k results. + + :param metadata_field: The metadata field to get unique values for. + :param search_term: Optional search term to filter documents by matching in the content field. + :param size: The number of unique values to return per page. Defaults to 10000. + :param after: Optional pagination key from the previous response. Use None for the first page. + For subsequent pages, pass the `after_key` from the previous response. + :returns: A tuple containing (list of unique values, after_key for pagination). + The after_key is None when there are no more results. Use it in the `after` parameter + for the next page. 
+ """ + self._ensure_initialized() + assert self._client is not None + + field_name = self._normalize_metadata_field_name(metadata_field) + + # filter by search_term if provided + query: dict[str, Any] = {"match_all": {}} + if search_term: + # Use match_phrase for exact phrase matching to avoid tokenization issues + query = {"match_phrase": {"content": search_term}} + + # Build composite aggregation for proper pagination + composite_agg: dict[str, Any] = { + "size": size, + "sources": [{field_name: {"terms": {"field": field_name}}}], + } + if after is not None: + composite_agg["after"] = after + + body = { + "query": query, + "aggs": { + "unique_values": { + "composite": composite_agg, + } + }, + "size": 0, # we only need aggregations, not documents + } + + result = self._client.search(index=self._index, body=body) + aggregations = result.get("aggregations", {}) + + # Extract unique values from composite aggregation buckets + unique_values_agg = aggregations.get("unique_values", {}) + unique_values_buckets = unique_values_agg.get("buckets", []) + unique_values = [str(bucket["key"][field_name]) for bucket in unique_values_buckets] + + # Extract after_key for pagination + # If we got fewer results than requested, we've reached the end + after_key = unique_values_agg.get("after_key") + if after_key is not None and size is not None and len(unique_values_buckets) < size: + after_key = None + + return unique_values, after_key + + async def get_metadata_field_unique_values_async( + self, + metadata_field: str, + search_term: str | None = None, + size: int | None = 10000, + after: dict[str, Any] | None = None, + ) -> tuple[list[str], dict[str, Any] | None]: + """ + Asynchronously returns unique values for a metadata field, optionally filtered by a search term in the content. + Uses composite aggregations for proper pagination beyond 10k results. + + :param metadata_field: The metadata field to get unique values for. 
+ :param search_term: Optional search term to filter documents by matching in the content field. + :param size: The number of unique values to return per page. Defaults to 10000. + :param after: Optional pagination key from the previous response. Use None for the first page. + For subsequent pages, pass the `after_key` from the previous response. + :returns: A tuple containing (list of unique values, after_key for pagination). + The after_key is None when there are no more results. Use it in the `after` parameter + for the next page. + """ + await self._ensure_initialized_async() + assert self._async_client is not None + + field_name = self._normalize_metadata_field_name(metadata_field) + + # filter by search_term if provided + query: dict[str, Any] = {"match_all": {}} + if search_term: + # Use match_phrase for exact phrase matching to avoid tokenization issues + query = {"match_phrase": {"content": search_term}} + + # Build composite aggregation for proper pagination + composite_agg: dict[str, Any] = { + "size": size, + "sources": [{field_name: {"terms": {"field": field_name}}}], + } + if after is not None: + composite_agg["after"] = after + + body = { + "query": query, + "aggs": { + "unique_values": { + "composite": composite_agg, + } + }, + "size": 0, # we only need aggregations, not documents + } + + result = await self._async_client.search(index=self._index, body=body) + aggregations = result.get("aggregations", {}) + + # Extract unique values from composite aggregation buckets + unique_values_agg = aggregations.get("unique_values", {}) + unique_values_buckets = unique_values_agg.get("buckets", []) + unique_values = [str(bucket["key"][field_name]) for bucket in unique_values_buckets] + + # Extract after_key for pagination + # If we got fewer results than requested, we've reached the end + after_key = unique_values_agg.get("after_key") + if after_key is not None and size is not None and len(unique_values_buckets) < size: + after_key = None + + return 
unique_values, after_key diff --git a/integrations/opensearch/tests/test_bm25_retriever.py b/integrations/opensearch/tests/test_bm25_retriever.py index 03235c36b3..d6bb6350e4 100644 --- a/integrations/opensearch/tests/test_bm25_retriever.py +++ b/integrations/opensearch/tests/test_bm25_retriever.py @@ -424,6 +424,60 @@ def test_bm25_retriever_runtime_document_store_switching( assert len(results_1_again["documents"]) == 1 +@pytest.mark.integration +def test_bm25_retriever_document_structure_with_metadata(document_store): + """ + Test document structure with complex metadata (nested values, lists, etc.) + """ + docs = [ + Document( + content="Python is versatile", + meta={ + "category": "programming", + "tags": ["python", "general-purpose"], + "rating": 4.5, + "active": True, + "author": {"name": "John", "role": "developer"}, + }, + id="python_doc", + ), + Document( + content="JavaScript is dynamic", + meta={ + "category": "programming", + "tags": ["javascript", "web"], + "rating": 4.8, + "active": True, + }, + id="js_doc", + ), + ] + document_store.write_documents(docs, refresh=True) + retriever = OpenSearchBM25Retriever(document_store=document_store) + + results = retriever.run(query="programming", top_k=2) + assert len(results["documents"]) == 2 + + for doc in results["documents"]: + # Verify structure + assert hasattr(doc, "content") + assert hasattr(doc, "meta") + assert isinstance(doc.meta, dict) + + # Verify complex metadata is preserved + assert "category" in doc.meta + assert "tags" in doc.meta + assert isinstance(doc.meta["tags"], list) + assert "rating" in doc.meta + + # Verify document can be serialized/deserialized + doc_dict = doc.to_dict() + doc_from_dict = Document.from_dict(doc_dict) + assert doc_from_dict.content == doc.content + assert doc_from_dict.meta == doc.meta + assert doc_from_dict.id == doc.id + + @pytest.mark.asyncio @pytest.mark.integration async def test_bm25_retriever_async_runtime_document_store_switching( diff --git 
a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index c75a5ca0a9..c45288b3d7 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -593,6 +593,219 @@ def test_update_by_filter(self, document_store: OpenSearchDocumentStore): assert len(draft_docs) == 1 assert draft_docs[0].meta["category"] == "B" + def test_count_documents_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active"}), + Document(content="Doc 2", meta={"category": "B", "status": "active"}), + Document(content="Doc 3", meta={"category": "A", "status": "inactive"}), + Document(content="Doc 4", meta={"category": "A", "status": "active"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 4 + + count_a = document_store.count_documents_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert count_a == 3 + + count_a_active = document_store.count_documents_by_filter( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.status", "operator": "==", "value": "active"}, + ], + } + ) + assert count_a_active == 2 + + def test_count_unique_metadata_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), + Document(content="Doc 2", meta={"category": "B", "status": "active", "priority": 2}), + Document(content="Doc 3", meta={"category": "A", "status": "inactive", "priority": 1}), + Document(content="Doc 4", meta={"category": "A", "status": "active", "priority": 3}), + Document(content="Doc 5", meta={"category": "C", "status": "active", "priority": 2}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 5 + + # Count 
distinct values for all documents + distinct_counts = document_store.count_unique_metadata_by_filter( + filters={}, metadata_fields=["category", "status", "priority"] + ) + assert distinct_counts["category"] == 3 # A, B, C + assert distinct_counts["status"] == 2 # active, inactive + assert distinct_counts["priority"] == 3 # 1, 2, 3 + + # Count distinct values for documents with category="A" + distinct_counts_a = document_store.count_unique_metadata_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_a["category"] == 1 # Only A + assert distinct_counts_a["status"] == 2 # active, inactive + assert distinct_counts_a["priority"] == 2 # 1, 3 + + # Count distinct values for documents with status="active" + distinct_counts_active = document_store.count_unique_metadata_by_filter( + filters={"field": "meta.status", "operator": "==", "value": "active"}, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_active["category"] == 3 # A, B, C + assert distinct_counts_active["status"] == 1 # Only active + assert distinct_counts_active["priority"] == 3 # 1, 2, 3 + + # Count distinct values with complex filter (category="A" AND status="active") + distinct_counts_a_active = document_store.count_unique_metadata_by_filter( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.status", "operator": "==", "value": "active"}, + ], + }, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_a_active["category"] == 1 # Only A + assert distinct_counts_a_active["status"] == 1 # Only active + assert distinct_counts_a_active["priority"] == 2 # 1, 3 + + # Test with only a subset of fields + distinct_counts_subset = document_store.count_unique_metadata_by_filter( + filters={}, metadata_fields=["category", "status"] + ) + assert 
distinct_counts_subset["category"] == 3 + assert distinct_counts_subset["status"] == 2 + assert "priority" not in distinct_counts_subset + + # Test field name normalization (with "meta." prefix) + distinct_counts_normalized = document_store.count_unique_metadata_by_filter( + filters={}, metadata_fields=["meta.category", "status", "meta.priority"] + ) + assert distinct_counts_normalized["category"] == 3 + assert distinct_counts_normalized["status"] == 2 + assert distinct_counts_normalized["priority"] == 3 + + # Test error handling when field doesn't exist + with pytest.raises(ValueError, match="Fields not found in index mapping"): + document_store.count_unique_metadata_by_filter(filters={}, metadata_fields=["nonexistent_field"]) + + def test_get_metadata_fields_info(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), + Document(content="Doc 2", meta={"category": "B", "status": "inactive"}), + ] + document_store.write_documents(docs) + + fields_info = document_store.get_metadata_fields_info() + + # Verify that fields_info contains expected fields + assert "category" in fields_info + assert "status" in fields_info + assert "priority" in fields_info + + assert fields_info["category"]["type"] == "keyword" + assert fields_info["status"]["type"] == "keyword" + assert fields_info["priority"]["type"] == "long" + + def test_get_metadata_field_min_max(self, document_store: OpenSearchDocumentStore): + # Test with integer values + docs = [ + Document(content="Doc 1", meta={"priority": 1, "age": 10}), + Document(content="Doc 2", meta={"priority": 5, "age": 20}), + Document(content="Doc 3", meta={"priority": 3, "age": 15}), + Document(content="Doc 4", meta={"priority": 10, "age": 5}), + Document(content="Doc 6", meta={"rating": 10.5}), + Document(content="Doc 7", meta={"rating": 20.3}), + Document(content="Doc 8", meta={"rating": 15.7}), + Document(content="Doc 9", meta={"rating": 
5.2}), + ] + document_store.write_documents(docs) + + # Test with "meta." prefix for integer field + min_max_priority = document_store.get_metadata_field_min_max("meta.priority") + assert min_max_priority["min"] == 1 + assert min_max_priority["max"] == 10 + + # Test with "meta." prefix for another integer field + min_max_rating = document_store.get_metadata_field_min_max("meta.age") + assert min_max_rating["min"] == 5 + assert min_max_rating["max"] == 20 + + # Test with single value + single_doc = [Document(content="Doc 5", meta={"single_value": 42})] + document_store.write_documents(single_doc) + min_max_single = document_store.get_metadata_field_min_max("meta.single_value") + assert min_max_single["min"] == 42 + assert min_max_single["max"] == 42 + + # Test with float values + min_max_score = document_store.get_metadata_field_min_max("meta.rating") + assert min_max_score["min"] == pytest.approx(5.2) + assert min_max_score["max"] == pytest.approx(20.3) + + def test_get_metadata_field_unique_values(self, document_store: OpenSearchDocumentStore): + # Test with string values + docs = [ + Document(content="Python programming", meta={"category": "A", "language": "Python"}), + Document(content="Java programming", meta={"category": "B", "language": "Java"}), + Document(content="Python scripting", meta={"category": "A", "language": "Python"}), + Document(content="JavaScript development", meta={"category": "C", "language": "JavaScript"}), + Document(content="Python data science", meta={"category": "A", "language": "Python"}), + Document(content="Java backend", meta={"category": "B", "language": "Java"}), + ] + document_store.write_documents(docs) + + # Test getting all unique values without search term + unique_values, after_key = document_store.get_metadata_field_unique_values("meta.category", None, 10) + assert set(unique_values) == {"A", "B", "C"} + # after_key should be None when all results are returned + assert after_key is None + + # Test with "meta." 
prefix + unique_languages, _ = document_store.get_metadata_field_unique_values("meta.language", None, 10) + assert set(unique_languages) == {"Python", "Java", "JavaScript"} + + # Test pagination - first page + unique_values_page1, after_key_page1 = document_store.get_metadata_field_unique_values("meta.category", None, 2) + assert len(unique_values_page1) == 2 + assert all(val in ["A", "B", "C"] for val in unique_values_page1) + # Should have an after_key for pagination + assert after_key_page1 is not None + + # Test pagination - second page using after_key + unique_values_page2, after_key_page2 = document_store.get_metadata_field_unique_values( + "meta.category", None, 2, after=after_key_page1 + ) + assert len(unique_values_page2) == 1 + assert unique_values_page2[0] in ["A", "B", "C"] + # Should have no more results + assert after_key_page2 is None + + # Test with search term - filter by content matching "Python" + unique_values_filtered, _ = document_store.get_metadata_field_unique_values("meta.category", "Python", 10) + assert set(unique_values_filtered) == {"A"} # Only category A has documents with "Python" in content + + # Test with search term - filter by content matching "Java" + unique_values_java, _ = document_store.get_metadata_field_unique_values("meta.category", "Java", 10) + assert set(unique_values_java) == {"B"} # Only category B has documents with "Java" in content + + # Test with integer values + int_docs = [ + Document(content="Doc 1", meta={"priority": 1}), + Document(content="Doc 2", meta={"priority": 2}), + Document(content="Doc 3", meta={"priority": 1}), + Document(content="Doc 4", meta={"priority": 3}), + ] + document_store.write_documents(int_docs) + unique_priorities, _ = document_store.get_metadata_field_unique_values("meta.priority", None, 10) + assert set(unique_priorities) == {"1", "2", "3"} + + # Test with search term on integer field + unique_priorities_filtered, _ = document_store.get_metadata_field_unique_values("meta.priority", 
"Doc 1", 10) + assert set(unique_priorities_filtered) == {"1"} + @pytest.mark.integration def test_write_with_routing(self, document_store: OpenSearchDocumentStore): """Test writing documents with routing metadata""" diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index bbff724b9d..18ceb3c4de 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -236,6 +236,113 @@ async def test_filter_documents(self, document_store: OpenSearchDocumentStore): assert result[0].content == "2" assert result[0].meta["number"] == 100 + @pytest.mark.asyncio + async def test_count_documents_by_filter(self, document_store: OpenSearchDocumentStore): + filterable_docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active"}), + Document(content="Doc 2", meta={"category": "B", "status": "active"}), + Document(content="Doc 3", meta={"category": "A", "status": "inactive"}), + Document(content="Doc 4", meta={"category": "A", "status": "active"}), + ] + await document_store.write_documents_async(filterable_docs) + assert await document_store.count_documents_async() == 4 + + count_a = await document_store.count_documents_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert count_a == 3 + + count_active = await document_store.count_documents_by_filter_async( + filters={"field": "meta.status", "operator": "==", "value": "active"} + ) + assert count_active == 3 + + count_a_active = await document_store.count_documents_by_filter_async( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.status", "operator": "==", "value": "active"}, + ], + } + ) + assert count_a_active == 2 + + @pytest.mark.asyncio + async def test_count_unique_metadata_by_filter(self, document_store: OpenSearchDocumentStore): 
+ filterable_docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), + Document(content="Doc 2", meta={"category": "B", "status": "active", "priority": 2}), + Document(content="Doc 3", meta={"category": "A", "status": "inactive", "priority": 1}), + Document(content="Doc 4", meta={"category": "A", "status": "active", "priority": 3}), + Document(content="Doc 5", meta={"category": "C", "status": "active", "priority": 2}), + ] + await document_store.write_documents_async(filterable_docs) + assert await document_store.count_documents_async() == 5 + + # count distinct values for all documents + distinct_counts = await document_store.count_unique_metadata_by_filter_async( + filters={}, metadata_fields=["category", "status", "priority"] + ) + assert distinct_counts["category"] == 3 # A, B, C + assert distinct_counts["status"] == 2 # active, inactive + assert distinct_counts["priority"] == 3 # 1, 2, 3 + + # count distinct values for documents with category="A" + distinct_counts_a = await document_store.count_unique_metadata_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_a["category"] == 1 # Only A + assert distinct_counts_a["status"] == 2 # active, inactive + assert distinct_counts_a["priority"] == 2 # 1, 3 + + # count distinct values for documents with status="active" + distinct_counts_active = await document_store.count_unique_metadata_by_filter_async( + filters={"field": "meta.status", "operator": "==", "value": "active"}, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_active["category"] == 3 # A, B, C + assert distinct_counts_active["status"] == 1 # Only active + assert distinct_counts_active["priority"] == 3 # 1, 2, 3 + + # count distinct values with complex filter (category="A" AND status="active") + distinct_counts_a_active = await 
document_store.count_unique_metadata_by_filter_async( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.status", "operator": "==", "value": "active"}, + ], + }, + metadata_fields=["category", "status", "priority"], + ) + assert distinct_counts_a_active["category"] == 1 # Only A + assert distinct_counts_a_active["status"] == 1 # Only active + assert distinct_counts_a_active["priority"] == 2 # 1, 3 + + # Test with only a subset of fields + distinct_counts_subset = await document_store.count_unique_metadata_by_filter_async( + filters={}, metadata_fields=["category", "status"] + ) + assert distinct_counts_subset["category"] == 3 + assert distinct_counts_subset["status"] == 2 + assert "priority" not in distinct_counts_subset + + # Test field name normalization (with "meta." prefix) + distinct_counts_normalized = await document_store.count_unique_metadata_by_filter_async( + filters={}, metadata_fields=["meta.category", "status", "meta.priority"] + ) + assert distinct_counts_normalized["category"] == 3 + assert distinct_counts_normalized["status"] == 2 + assert distinct_counts_normalized["priority"] == 3 + + # Test error handling when field doesn't exist + with pytest.raises(ValueError, match="Fields not found in index mapping"): + await document_store.count_unique_metadata_by_filter_async( + filters={}, metadata_fields=["nonexistent_field"] + ) + @pytest.mark.asyncio async def test_delete_documents(self, document_store: OpenSearchDocumentStore): doc = Document(content="test doc") @@ -391,3 +498,129 @@ async def test_update_by_filter_async(self, document_store: OpenSearchDocumentSt ) assert len(draft_docs) == 1 assert draft_docs[0].meta["category"] == "B" + + @pytest.mark.asyncio + async def test_get_metadata_fields_info_async(self, document_store: OpenSearchDocumentStore): + filterable_docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), + 
Document(content="Doc 2", meta={"category": "B", "status": "inactive"}), + ] + await document_store.write_documents_async(filterable_docs) + + fields_info = await document_store.get_metadata_fields_info_async() + + # Verify that fields_info contains expected fields + assert "category" in fields_info + assert "status" in fields_info + assert "priority" in fields_info + + assert fields_info["category"]["type"] == "keyword" + assert fields_info["status"]["type"] == "keyword" + assert fields_info["priority"]["type"] == "long" + + @pytest.mark.asyncio + async def test_get_metadata_field_min_max_async(self, document_store: OpenSearchDocumentStore): + # Fixture mixes integer fields (priority, age) with a float field (rating) + docs = [ + Document(content="Doc 1", meta={"priority": 1, "age": 10}), + Document(content="Doc 2", meta={"priority": 5, "age": 20}), + Document(content="Doc 3", meta={"priority": 3, "age": 15}), + Document(content="Doc 4", meta={"priority": 10, "age": 5}), + Document(content="Doc 6", meta={"rating": 10.5}), + Document(content="Doc 7", meta={"rating": 20.3}), + Document(content="Doc 8", meta={"rating": 15.7}), + Document(content="Doc 9", meta={"rating": 5.2}), + ] + await document_store.write_documents_async(docs) + + # Test with "meta." prefix for integer field + min_max_priority = await document_store.get_metadata_field_min_max_async("meta.priority") + assert min_max_priority["min"] == 1 + assert min_max_priority["max"] == 10 + + # Test with "meta." 
prefix for another integer field + min_max_age = await document_store.get_metadata_field_min_max_async("meta.age") + assert min_max_age["min"] == 5 + assert min_max_age["max"] == 20 + + # Test with single value + single_doc = [Document(content="Doc 5", meta={"single_value": 42})] + await document_store.write_documents_async(single_doc) + min_max_single = await document_store.get_metadata_field_min_max_async("meta.single_value") + assert min_max_single["min"] == 42 + assert min_max_single["max"] == 42 + + # Test with float values + min_max_rating = await document_store.get_metadata_field_min_max_async("meta.rating") + assert min_max_rating["min"] == pytest.approx(5.2) + assert min_max_rating["max"] == pytest.approx(20.3) + + @pytest.mark.asyncio + async def test_get_metadata_field_unique_values_async(self, document_store: OpenSearchDocumentStore): + # Test with string values + docs = [ + Document(content="Python programming", meta={"category": "A", "language": "Python"}), + Document(content="Java programming", meta={"category": "B", "language": "Java"}), + Document(content="Python scripting", meta={"category": "A", "language": "Python"}), + Document(content="JavaScript development", meta={"category": "C", "language": "JavaScript"}), + Document(content="Python data science", meta={"category": "A", "language": "Python"}), + Document(content="Java backend", meta={"category": "B", "language": "Java"}), + ] + await document_store.write_documents_async(docs) + + # Test getting all unique values without search term + unique_values, after_key = await document_store.get_metadata_field_unique_values_async( + "meta.category", None, 10 + ) + assert set(unique_values) == {"A", "B", "C"} + # after_key should be None when all results are returned + assert after_key is None + + # Test with "meta." 
prefix + unique_languages, _ = await document_store.get_metadata_field_unique_values_async("meta.language", None, 10) + assert set(unique_languages) == {"Python", "Java", "JavaScript"} + + # Test pagination - first page (2 of the 3 categories) + unique_values_page1, after_key_page1 = await document_store.get_metadata_field_unique_values_async( + "meta.category", None, 2 + ) + assert len(unique_values_page1) == 2 + assert all(val in ["A", "B", "C"] for val in unique_values_page1) + # Should have an after_key for pagination + assert after_key_page1 is not None + + # Test pagination - second page using after_key + unique_values_page2, after_key_page2 = await document_store.get_metadata_field_unique_values_async( + "meta.category", None, 2, after=after_key_page1 + ) + assert len(unique_values_page2) == 1 + assert unique_values_page2[0] in ["A", "B", "C"] + # Should have no more results, so no after_key + assert after_key_page2 is None + + # Test with search term - filter by content matching "Python" + unique_values_filtered, _ = await document_store.get_metadata_field_unique_values_async( + "meta.category", "Python", 10 + ) + assert set(unique_values_filtered) == {"A"} # Only category A has documents with "Python" in content + + # Test with search term - filter by content matching "Java" + unique_values_java, _ = await document_store.get_metadata_field_unique_values_async("meta.category", "Java", 10) + assert set(unique_values_java) == {"B"} # Only category B has documents with "Java" in content + + # Test with integer values (asserted below as strings) + int_docs = [ + Document(content="Doc 1", meta={"priority": 1}), + Document(content="Doc 2", meta={"priority": 2}), + Document(content="Doc 3", meta={"priority": 1}), + Document(content="Doc 4", meta={"priority": 3}), + ] + await document_store.write_documents_async(int_docs) + unique_priorities, _ = await document_store.get_metadata_field_unique_values_async("meta.priority", None, 10) + assert set(unique_priorities) == {"1", "2", "3"} + + # Test with search term on integer field + 
unique_priorities_filtered, _ = await document_store.get_metadata_field_unique_values_async( + "meta.priority", "Doc 1", 10 + ) + assert set(unique_priorities_filtered) == {"1"} diff --git a/integrations/opensearch/tests/test_embedding_retriever.py b/integrations/opensearch/tests/test_embedding_retriever.py index a01b7dc008..fe2f5865c6 100644 --- a/integrations/opensearch/tests/test_embedding_retriever.py +++ b/integrations/opensearch/tests/test_embedding_retriever.py @@ -404,3 +404,51 @@ async def test_embedding_retriever_runtime_document_store_switching_async( python_query_embedding = [0.4, 0.4, 0.4] + [0.0] * 765 results_1_again = await retriever.run_async(query_embedding=python_query_embedding) assert "Python" in results_1_again["documents"][0].content + + +@pytest.mark.integration +def test_embedding_retriever_document_structure_with_metadata(document_store, test_documents_with_embeddings_1): + """ + Test that documents returned by the embedding retriever have the correct structure: + - Metadata fields are in doc.meta (not at top level) + - Special fields (content, embedding, id, score) are at top level + - All original metadata is preserved and survives a to_dict/from_dict round trip + """ + document_store.write_documents(test_documents_with_embeddings_1, refresh=True) + retriever = OpenSearchEmbeddingRetriever(document_store=document_store) + + # Query embedding to match functional programming languages + query_embedding = [0.2, 0.3, 0.4] + [0.0] * 765 + results = retriever.run(query_embedding=query_embedding, top_k=5) + + assert len(results["documents"]) > 0 + + for doc in results["documents"]: + # Verify special fields are at top level + assert hasattr(doc, "content") + assert isinstance(doc.content, str) + assert hasattr(doc, "id") + assert isinstance(doc.id, str) + assert hasattr(doc, "score") + assert doc.score is not None + assert hasattr(doc, "embedding") + assert isinstance(doc.embedding, list) + assert len(doc.embedding) == 768 + + # Verify metadata fields are in meta dict (not at top level) + assert 
hasattr(doc, "meta") + assert isinstance(doc.meta, dict) + + # Verify original metadata is preserved + assert "likes" in doc.meta + assert "language_type" in doc.meta + assert isinstance(doc.meta["likes"], int) + assert isinstance(doc.meta["language_type"], str) + + # Verify document can be serialized/deserialized + doc_dict = doc.to_dict() + doc_from_dict = Document.from_dict(doc_dict) + assert doc_from_dict.content == doc.content + assert doc_from_dict.meta == doc.meta + assert doc_from_dict.id == doc.id + assert doc_from_dict.embedding == doc.embedding