diff --git a/integrations/valkey/pyproject.toml b/integrations/valkey/pyproject.toml index dc065662ad..48cb1efa53 100644 --- a/integrations/valkey/pyproject.toml +++ b/integrations/valkey/pyproject.toml @@ -26,7 +26,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] -dependencies = ["haystack-ai>=2.26.1", "valkey-glide>=2.2.0", "valkey-glide-sync>=2.2.0"] +dependencies = ["haystack-ai>=2.27.0", "valkey-glide>=2.2.0", "valkey-glide-sync>=2.2.0"] [project.urls] Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/valkey#readme" diff --git a/integrations/valkey/tests/test_document_store_async.py b/integrations/valkey/tests/test_document_store_async.py index 8b254507fc..f264748a47 100644 --- a/integrations/valkey/tests/test_document_store_async.py +++ b/integrations/valkey/tests/test_document_store_async.py @@ -9,20 +9,54 @@ import pytest import pytest_asyncio from haystack.dataclasses import ByteStream, Document +from haystack.testing.document_store import ( + CountDocumentsByFilterAsyncTest, + CountUniqueMetadataByFilterAsyncTest, + UpdateByFilterAsyncTest, +) +from haystack.testing.document_store_async import ( + CountDocumentsAsyncTest, + DeleteAllAsyncTest, + DeleteByFilterAsyncTest, + DeleteDocumentsAsyncTest, + GetMetadataFieldMinMaxAsyncTest, + GetMetadataFieldUniqueValuesAsyncTest, + WriteDocumentsAsyncTest, +) from haystack_integrations.document_stores.valkey import ValkeyDocumentStore @pytest.mark.integration @pytest.mark.asyncio -class TestValkeyDocumentStoreAsync: +class TestValkeyDocumentStoreAsync( + CountDocumentsAsyncTest, + WriteDocumentsAsyncTest, + DeleteDocumentsAsyncTest, + DeleteAllAsyncTest, + DeleteByFilterAsyncTest, + UpdateByFilterAsyncTest, + CountDocumentsByFilterAsyncTest, + CountUniqueMetadataByFilterAsyncTest, + GetMetadataFieldMinMaxAsyncTest, + GetMetadataFieldUniqueValuesAsyncTest, +): @pytest_asyncio.fixture async def document_store(self): store = ValkeyDocumentStore( index_name="test_async_haystack_document", embedding_dim=3, batch_size=5, - metadata_fields={"category": str, "priority": int, "status": str, "score": int, "quality": str}, + metadata_fields={ + "category": str, + "priority": int, + "status": str, + "score": int, + "quality": str, + "rating": float, + "age": int, + "year": int, + }, ) yield store await store.close_async() @@ -36,19 +70,89 @@ async def cleanup_after_test(self, document_store): except Exception: pass - async def test_async_write_and_count_documents(self, document_store): + # --- Override for mixin bug: test_count_not_empty_async is missing `self` in DeleteByFilterAsyncTest --- + + @pytest.mark.asyncio + async def test_count_not_empty_async(self, document_store): + """Test count is greater than zero if the document store contains documents.""" + await document_store.write_documents_async( + [Document(content="test doc 1"), Document(content="test doc 2"), Document(content="test doc 3")] + ) + assert await document_store.count_documents_async() == 3 + + # --- Overrides for WriteDocumentsAsyncTest --- + # ValkeyDocumentStore only supports DuplicatePolicy.NONE and DuplicatePolicy.OVERWRITE + + @pytest.mark.asyncio + async def test_write_documents_async(self, document_store): + """ValkeyDocumentStore default policy (NONE) behaves like OVERWRITE.""" + docs = [Document(content="test doc 1"), Document(content="test doc 2")] + result = await document_store.write_documents_async(docs) + assert result == 2 + + @pytest.mark.asyncio + async def test_write_documents_duplicate_fail_async(self, document_store): + pytest.skip("ValkeyDocumentStore does not support DuplicatePolicy.FAIL") + + @pytest.mark.asyncio + async def test_write_documents_duplicate_skip_async(self, document_store): + pytest.skip("ValkeyDocumentStore does not support DuplicatePolicy.SKIP") + + # --- Overrides for mixin tests that use undeclared metadata fields --- + # ValkeyDocumentStore requires metadata fields to be pre-declared in the fixture. + + @pytest.mark.asyncio + async def test_update_by_filter_async(self, document_store): + """Override: use declared metadata fields (category, priority) instead of filterable_docs fixture.""" test_id = str(uuid.uuid4())[:8] docs = [ - Document(id=f"async1_{test_id}", content="async test doc 1", embedding=[0.1, 0.2, 0.3]), - Document(id=f"async2_{test_id}", content="async test doc 2", embedding=[0.4, 0.5, 0.6]), - Document(id=f"async3_{test_id}", content="async test doc 3"), # No embedding + Document( + id=f"u1_{test_id}", content="doc 1", embedding=[0.1, 0.2, 0.3], meta={"category": "news", "priority": 1} + ), + Document( + id=f"u2_{test_id}", content="doc 2", embedding=[0.2, 0.3, 0.4], meta={"category": "blog", "priority": 2} + ), + Document( + id=f"u3_{test_id}", content="doc 3", embedding=[0.3, 0.4, 0.5], meta={"category": "news", "priority": 3} + ), ] + await document_store.write_documents_async(docs) - result = await document_store.write_documents_async(docs) - assert result == 3 + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "news"}, meta={"status": "archived"} + ) + assert updated_count == 2 - count = await document_store.count_documents_async() - assert count == 3 + all_docs = await document_store.filter_documents_async(filters=None) + by_id = {d.id: d for d in all_docs} + assert by_id[f"u1_{test_id}"].meta.get("status") == "archived" + assert by_id[f"u2_{test_id}"].meta.get("status") is None + assert by_id[f"u3_{test_id}"].meta.get("status") == "archived" + + @pytest.mark.asyncio + async def test_count_unique_metadata_by_filter_async_with_multiple_filters(self, document_store): + """Override: use declared metadata fields (category, priority) instead of year.""" + test_id = str(uuid.uuid4())[:8] + docs = [ + Document(id=f"cu1_{test_id}", content="doc 1", meta={"category": "A", "priority": 1}), + Document(id=f"cu2_{test_id}", content="doc 2", meta={"category": "A", "priority": 2}), + Document(id=f"cu3_{test_id}", content="doc 3", meta={"category": "B", "priority": 1}), + Document(id=f"cu4_{test_id}", content="doc 4", meta={"category": "B", "priority": 2}), + ] + await document_store.write_documents_async(docs) + + count = await document_store.count_documents_by_filter_async( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "B"}, + {"field": "meta.priority", "operator": "==", "value": 1}, + ], + } + ) + assert count == 1 + + # --- Valkey-specific tests --- async def test_async_write_exceed_batch_size(self, document_store): test_id = str(uuid.uuid4())[:8] @@ -89,34 +193,6 @@ async def test_async_write_multiple_full_batches(self, document_store): count = await document_store.count_documents_async() assert count == 11 - async def test_async_write_and_delete_documents(self, document_store): - test_id = str(uuid.uuid4())[:8] - docs = [ - Document(id=f"async_del1_{test_id}", content="async delete me", embedding=[0.1, 0.2, 0.3]), - Document(id=f"async_del2_{test_id}", content="async delete me too", embedding=[0.4, 0.5, 0.6]), - ] - - await document_store.write_documents_async(docs) - count = await document_store.count_documents_async() - assert count == 2 - - await document_store.delete_documents_async([f"async_del1_{test_id}", f"async_del2_{test_id}"]) - count = await document_store.count_documents_async() - assert count == 0 - - async def test_async_overwrite_documents(self, document_store): - test_id = str(uuid.uuid4())[:8] - doc1 = Document(id=f"async_overwrite_{test_id}", content="async original", embedding=[0.1, 0.2, 0.3]) - - await document_store.write_documents_async([doc1]) - - doc2 = Document(id=f"async_overwrite_{test_id}", content="async updated", embedding=[0.4, 0.5, 0.6]) - result = await document_store.write_documents_async([doc2]) - - assert result == 1 - count = await document_store.count_documents_async() - assert count == 1 - async def test_async_search_by_embedding_no_limit(self, document_store): test_id = str(uuid.uuid4())[:8] docs = [ @@ -146,11 +222,9 @@ async def test_async_search_by_embedding_no_limit(self, document_store): await document_store.write_documents_async(docs) - # Verify documents are written count = await document_store.count_documents_async() assert count == 4 - # Search with embedding similar to first document query_embedding = [0.1, 0.2, 0.3] results = await document_store._embedding_retrieval_async(query_embedding, limit=100) @@ -187,11 +261,9 @@ async def test_async_search_by_embedding_with_limit(self, document_store): await document_store.write_documents_async(docs) - # Verify documents are written count = await document_store.count_documents_async() assert count == 4 - # Search with embedding similar to first document query_embedding = [0.1, 0.2, 0.3] results = await document_store._embedding_retrieval_async(query_embedding, limit=2) @@ -228,11 +300,9 @@ async def test_async_search_by_embedding_with_category_filter(self, document_sto await document_store.write_documents_async(docs) - # Verify documents are written count = await document_store.count_documents_async() assert count == 4 - # Search with embedding similar to first document query_embedding = [0.1, 0.2, 0.3] filters = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "test"}]} results = await document_store._embedding_retrieval_async(query_embedding, filters, limit=2) @@ -261,7 +331,6 @@ async def test_async_search_by_embedding_with_numeric_filter(self, document_stor assert len(results) == 2 assert {doc.id for doc in results} == {f"n2_{test_id}", f"n3_{test_id}"} - # Results should be ordered by similarity to query assert results[0].id == f"n2_{test_id}" # Closer to query embedding async def test_async_search_by_embedding_with_or_filter(self, document_store): @@ -300,7 +369,6 @@ async def test_async_search_by_embedding_with_or_filter(self, document_store): assert len(results) == 2 assert {doc.id for doc in results} == {f"o1_{test_id}", f"o2_{test_id}"} - # Results should be ordered by similarity assert results[0].id == f"o1_{test_id}" # Closer to query embedding async def test_async_search_by_embedding_with_in_filter(self, document_store): @@ -322,28 +390,8 @@ async def test_async_search_by_embedding_with_in_filter(self, document_store): assert len(results) == 2 assert {doc.id for doc in results} == {f"i1_{test_id}", f"i2_{test_id}"} - # Results should be ordered by similarity assert results[0].id == f"i1_{test_id}" # Closest to query - async def test_async_delete_all_documents(self, document_store): - docs = [ - Document(id="del1", content="doc 1", embedding=[0.1, 0.2, 0.3]), - Document(id="del2", content="doc 2", embedding=[0.4, 0.5, 0.6]), - Document(id="del3", content="doc 3", embedding=[0.7, 0.8, 0.9]), - ] - - await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - await document_store.delete_all_documents_async() - assert await document_store.count_documents_async() == 0 - - async def test_async_delete_all_documents_empty_store(self, document_store): - assert await document_store.count_documents_async() == 0 - - await document_store.delete_all_documents_async() - assert await document_store.count_documents_async() == 0 - async def test_async_write_large_batch_performance(self, document_store): """Test writing large number of documents to verify batching performance""" test_id = str(uuid.uuid4())[:8] @@ -368,31 +416,25 @@ async def test_async_similarity_scores_are_set_correctly(self, document_store): await document_store.write_documents_async(docs) - # Search with query identical to first document query_embedding = [1.0, 0.0, 0.0] results = await document_store._embedding_retrieval_async(query_embedding, limit=10) - # All documents should have similarity scores set assert len(results) == 4 for doc in results: assert doc.score is not None, f"Document {doc.id} has no similarity score" assert isinstance(doc.score, float), f"Document {doc.id} score is not a float: {type(doc.score)}" - # Results should be ordered by similarity (highest first for cosine similarity) assert results[0].id == f"sim1_{test_id}" # Identical vector should have highest score assert results[1].id == f"sim2_{test_id}" # Similar vector should be second - # Verify scores are properly computed (not just dummy values) scores = [doc.score for doc in results] assert len(set(scores)) > 1, "All similarity scores are identical, suggesting they're not properly computed" - # Verify all results are sorted by similarity score (lower is better for distance metrics) for i in range(len(results) - 1): assert results[i].score <= results[i + 1].score, ( f"Results not sorted by score: {results[i].score} > {results[i + 1].score} at positions {i} and {i + 1}" ) - # The identical vector should have the lowest (best) similarity score assert results[0].score <= results[1].score, ( f"Expected identical vector to have best score, got {results[0].score} vs {results[1].score}" ) @@ -419,7 +461,6 @@ async def test_async_filter_by_meta_score(self, document_store): ] await document_store.write_documents_async(docs) - # Filter by meta.score >= 0.7 filters = {"operator": "AND", "conditions": [{"field": "meta.score", "operator": ">=", "value": 0.7}]} results = await document_store.filter_documents_async(filters) @@ -427,7 +468,6 @@ async def test_async_filter_by_meta_score(self, document_store): result_ids = {doc.id for doc in results} assert result_ids == {f"ms1_{test_id}", f"ms4_{test_id}"} - # Verify meta.score values are preserved for doc in results: assert "score" in doc.meta assert doc.meta["score"] >= 0.7 @@ -457,163 +497,21 @@ async def test_async_search_with_meta_score_filter(self, document_store): ] await document_store.write_documents_async(docs) - # Search with query similar to first document, but filter by meta.score query_embedding = [1.0, 0.0, 0.0] filters = {"operator": "AND", "conditions": [{"field": "meta.score", "operator": ">=", "value": 0.7}]} results = await document_store._embedding_retrieval_async(query_embedding, filters, limit=10) - # Should return documents with meta.score >= 0.7, ordered by vector similarity assert len(results) == 3 result_ids = [doc.id for doc in results] assert set(result_ids) == {f"sms1_{test_id}", f"sms3_{test_id}", f"sms4_{test_id}"} - # Verify ordering by vector similarity (sms1 should be first as it's most similar) assert results[0].id == f"sms1_{test_id}" - # Verify both similarity scores and meta scores are preserved for doc in results: assert doc.score is not None # Vector similarity score assert "score" in doc.meta # User metadata score assert doc.meta["score"] >= 0.7 - # --- delete_by_filter_async, update_by_filter_async, count_documents_by_filter_async --- - - async def test_delete_by_filter_async(self, document_store): - """Test async deleting documents that match a filter.""" - test_id = str(uuid.uuid4())[:8] - docs = [ - Document( - id=f"dbf1_{test_id}", - content="doc 1", - embedding=[0.1, 0.2, 0.3], - meta={"category": "remove", "priority": 1}, - ), - Document( - id=f"dbf2_{test_id}", - content="doc 2", - embedding=[0.2, 0.3, 0.4], - meta={"category": "keep", "priority": 2}, - ), - Document( - id=f"dbf3_{test_id}", - content="doc 3", - embedding=[0.3, 0.4, 0.5], - meta={"category": "remove", "priority": 3}, - ), - ] - await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - filters = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "remove"}]} - deleted = await document_store.delete_by_filter_async(filters) - assert deleted == 2 - assert await document_store.count_documents_async() == 1 - remaining = await document_store.filter_documents_async(filters=None) - assert len(remaining) == 1 - assert remaining[0].meta.get("category") == "keep" - - async def test_update_by_filter_async(self, document_store): - """Test async updating metadata of documents that match a filter.""" - test_id = str(uuid.uuid4())[:8] - docs = [ - Document( - id=f"ubf1_{test_id}", - content="doc 1", - embedding=[0.1, 0.2, 0.3], - meta={"category": "news", "priority": 1}, - ), - Document( - id=f"ubf2_{test_id}", - content="doc 2", - embedding=[0.2, 0.3, 0.4], - meta={"category": "blog", "priority": 2}, - ), - Document( - id=f"ubf3_{test_id}", - content="doc 3", - embedding=[0.3, 0.4, 0.5], - meta={"category": "news", "priority": 3}, - ), - ] - await document_store.write_documents_async(docs) - - filters = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "news"}]} - updated = await document_store.update_by_filter_async(filters, meta={"status": "archived", "priority": 99}) - assert updated == 2 - - all_docs = await document_store.filter_documents_async(filters=None) - by_id = {d.id: d for d in all_docs} - assert by_id[f"ubf1_{test_id}"].meta.get("status") == "archived" - assert by_id[f"ubf1_{test_id}"].meta.get("priority") == 99 - assert by_id[f"ubf2_{test_id}"].meta.get("status") is None - assert by_id[f"ubf2_{test_id}"].meta.get("priority") == 2 - assert by_id[f"ubf3_{test_id}"].meta.get("status") == "archived" - assert by_id[f"ubf3_{test_id}"].meta.get("priority") == 99 - - async def test_count_documents_by_filter_async(self, document_store): - """Test async counting documents that match a filter.""" - test_id = str(uuid.uuid4())[:8] - docs = [ - Document( - id=f"cbf1_{test_id}", - content="doc 1", - embedding=[0.1, 0.2, 0.3], - meta={"category": "a", "priority": 1}, - ), - Document( - id=f"cbf2_{test_id}", - content="doc 2", - embedding=[0.2, 0.3, 0.4], - meta={"category": "b", "priority": 2}, - ), - Document( - id=f"cbf3_{test_id}", - content="doc 3", - embedding=[0.3, 0.4, 0.5], - meta={"category": "a", "priority": 3}, - ), - ] - await document_store.write_documents_async(docs) - - filters_a = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "a"}]} - assert await document_store.count_documents_by_filter_async(filters_a) == 2 - filters_b = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "b"}]} - assert await document_store.count_documents_by_filter_async(filters_b) == 1 - filters_none = {"operator": "AND", "conditions": [{"field": "meta.category", "operator": "==", "value": "z"}]} - assert await document_store.count_documents_by_filter_async(filters_none) == 0 - - async def test_count_unique_metadata_by_filter_async(self, document_store): - """Test async counting unique values per metadata field for documents matching a filter.""" - test_id = str(uuid.uuid4())[:8] - docs = [ - Document( - id=f"cumb1_{test_id}", - content="doc 1", - embedding=[0.1, 0.2, 0.3], - meta={"category": "tech", "priority": 1}, - ), - Document( - id=f"cumb2_{test_id}", - content="doc 2", - embedding=[0.2, 0.3, 0.4], - meta={"category": "tech", "priority": 2}, - ), - Document( - id=f"cumb3_{test_id}", - content="doc 3", - embedding=[0.3, 0.4, 0.5], - meta={"category": "news", "priority": 2}, - ), - ] - await document_store.write_documents_async(docs) - - filters = {"operator": "AND", "conditions": [{"field": "meta.priority", "operator": ">=", "value": 1}]} - counts = await document_store.count_unique_metadata_by_filter_async( - filters, metadata_fields=["category", "priority"] - ) - assert counts["category"] == 2 - assert counts["priority"] == 2 - async def test_get_metadata_fields_info_async(self, document_store): """Test get_metadata_fields_info (sync) returns configured field names and types.""" info = document_store.get_metadata_fields_info() @@ -625,34 +523,6 @@ async def test_get_metadata_fields_info_async(self, document_store): assert "score" in info assert "quality" in info - async def test_get_metadata_field_min_max_async(self, document_store): - """Test async get_metadata_field_min_max for a numeric field.""" - test_id = str(uuid.uuid4())[:8] - docs = [ - Document( - id=f"gmm1_{test_id}", - content="doc 1", - embedding=[0.1, 0.2, 0.3], - meta={"priority": 10, "category": "a"}, - ), - Document( - id=f"gmm2_{test_id}", - content="doc 2", - embedding=[0.2, 0.3, 0.4], - meta={"priority": 5, "category": "b"}, - ), - Document( - id=f"gmm3_{test_id}", - content="doc 3", - embedding=[0.3, 0.4, 0.5], - meta={"priority": 20, "category": "c"}, - ), - ] - await document_store.write_documents_async(docs) - result = await document_store.get_metadata_field_min_max_async("priority") - assert result["min"] == 5 - assert result["max"] == 20 - async def test_get_metadata_field_min_max_empty_store_async(self, document_store): """Test async get_metadata_field_min_max when store has no documents.""" result = await document_store.get_metadata_field_min_max_async("priority")