Skip to content

Commit e6932b0

Browse files
committed
adding get_field_unique_values async
1 parent e0be21f commit e6932b0

2 files changed

Lines changed: 123 additions & 0 deletions

File tree

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1352,5 +1352,66 @@ def get_field_unique_values(
13521352

13531353
return unique_values, total_count
13541354

1355+
async def get_field_unique_values_async(
1356+
self, metadata_field: str, search_term: str | None, from_: int, size: int
1357+
) -> tuple[list[str], int]:
1358+
"""
1359+
Asynchronously returns unique values for a metadata field, optionally filtered by a search term in the content.
1360+
1361+
:param metadata_field: The metadata field to get unique values for.
1362+
:param search_term: Optional search term to filter documents by matching in the content field.
1363+
:param from_: The starting index for pagination.
1364+
:param size: The number of unique values to return.
1365+
:returns: A tuple containing (list of unique values, total count of unique values).
1366+
"""
1367+
await self._ensure_initialized_async()
1368+
assert self._async_client is not None
1369+
1370+
field_name = self._normalize_metadata_field_name(metadata_field)
1371+
1372+
# filter by search_term if provided
1373+
query = {"match_all": {}}
1374+
if search_term:
1375+
# Use match_phrase for exact phrase matching to avoid tokenization issues
1376+
query = {"match_phrase": {"content": search_term}}
1377+
1378+
# Build aggregations
1379+
# Terms aggregation for paginated unique values
1380+
# Note: Terms aggregation doesn't support 'from' parameter directly,
1381+
# so we fetch from_ + size results and slice them
1382+
# Cardinality aggregation for total count
1383+
terms_size = from_ + size if from_ > 0 else size
1384+
body = {
1385+
"query": query,
1386+
"aggs": {
1387+
"unique_values": {
1388+
"terms": {
1389+
"field": field_name,
1390+
"size": terms_size,
1391+
}
1392+
},
1393+
"total_count": {
1394+
"cardinality": {
1395+
"field": field_name,
1396+
}
1397+
},
1398+
},
1399+
"size": 0, # we only need aggregations, not documents
1400+
}
1401+
1402+
result = await self._async_client.search(index=self._index, body=body)
1403+
aggregations = result.get("aggregations", {})
1404+
1405+
# Extract unique values from terms aggregation buckets
1406+
unique_values_buckets = aggregations.get("unique_values", {}).get("buckets", [])
1407+
# Apply pagination by slicing the results
1408+
paginated_buckets = unique_values_buckets[from_ : from_ + size]
1409+
unique_values = [str(bucket["key"]) for bucket in paginated_buckets]
1410+
1411+
# Extract total count from cardinality aggregation
1412+
total_count = int(aggregations.get("total_count", {}).get("value", 0))
1413+
1414+
return unique_values, total_count
1415+
13551416
def query_sql(self, query: str):
    """
    Placeholder: the body is empty, so calling it does nothing and returns None.

    :param query: The query string (presumably SQL, judging by the method name); currently ignored.
    """
    return None

integrations/opensearch/tests/test_document_store_async.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -499,4 +499,66 @@ async def test_get_field_min_max(self, document_store: OpenSearchDocumentStore):
499499
assert min_max_score["min"] == pytest.approx(5.2)
500500
assert min_max_score["max"] == pytest.approx(20.3)
501501

502+
@pytest.mark.asyncio
async def test_get_field_unique_values(self, document_store: OpenSearchDocumentStore):
    """
    Exercise get_field_unique_values_async: plain listing, pagination, content filtering and integer fields.
    """
    # Test with string values
    docs = [
        Document(content="Python programming", meta={"category": "A", "language": "Python"}),
        Document(content="Java programming", meta={"category": "B", "language": "Java"}),
        Document(content="Python scripting", meta={"category": "A", "language": "Python"}),
        Document(content="JavaScript development", meta={"category": "C", "language": "JavaScript"}),
        Document(content="Python data science", meta={"category": "A", "language": "Python"}),
        Document(content="Java backend", meta={"category": "B", "language": "Java"}),
    ]
    await document_store.write_documents_async(docs)

    # Test getting all unique values without search term
    unique_values, total_count = await document_store.get_field_unique_values_async(
        "meta.category", None, 0, 10
    )
    assert set(unique_values) == {"A", "B", "C"}
    assert total_count == 3

    # Test a second metadata field (also addressed with the "meta." prefix)
    unique_languages, lang_count = await document_store.get_field_unique_values_async(
        "meta.language", None, 0, 10
    )
    assert set(unique_languages) == {"Python", "Java", "JavaScript"}
    assert lang_count == 3

    # Test pagination - first page
    unique_values_page1, total_count = await document_store.get_field_unique_values_async(
        "meta.category", None, 0, 2
    )
    assert len(unique_values_page1) == 2
    assert total_count == 3
    assert all(val in ["A", "B", "C"] for val in unique_values_page1)

    # Test pagination - second page
    unique_values_page2, total_count = await document_store.get_field_unique_values_async(
        "meta.category", None, 2, 2
    )
    assert len(unique_values_page2) == 1
    assert total_count == 3
    assert unique_values_page2[0] in ["A", "B", "C"]

    # The two pages must not overlap and together must cover every value; without this
    # check a broken offset that returned the same values twice would go unnoticed.
    assert set(unique_values_page1).isdisjoint(unique_values_page2)
    assert set(unique_values_page1) | set(unique_values_page2) == {"A", "B", "C"}

    # Test with search term - filter by content matching "Python"
    unique_values_filtered, total_count = await document_store.get_field_unique_values_async(
        "meta.category", "Python", 0, 10
    )
    assert set(unique_values_filtered) == {"A"}  # Only category A has documents with "Python" in content
    assert total_count == 1

    # Test with search term - filter by content matching "Java"
    unique_values_java, total_count = await document_store.get_field_unique_values_async(
        "meta.category", "Java", 0, 10
    )
    assert set(unique_values_java) == {"B"}  # Only category B has documents with "Java" in content
    assert total_count == 1

    # Test with integer values (returned as strings by the document store)
    int_docs = [
        Document(content="Doc 1", meta={"priority": 1}),
        Document(content="Doc 2", meta={"priority": 2}),
        Document(content="Doc 3", meta={"priority": 1}),
        Document(content="Doc 4", meta={"priority": 3}),
    ]
    await document_store.write_documents_async(int_docs)
    unique_priorities, priority_count = await document_store.get_field_unique_values_async(
        "meta.priority", None, 0, 10
    )
    assert set(unique_priorities) == {"1", "2", "3"}
    assert priority_count == 3

    # Test with search term on integer field
    unique_priorities_filtered, priority_count = await document_store.get_field_unique_values_async(
        "meta.priority", "Doc 1", 0, 10
    )
    assert set(unique_priorities_filtered) == {"1"}
    assert priority_count == 1
502564

0 commit comments

Comments
 (0)