Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions integrations/valkey/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

import struct
from dataclasses import replace
from unittest.mock import MagicMock, patch

import pytest
from glide_shared.commands.server_modules.ft_options.ft_create_options import DistanceMetricType
from glide_shared.commands.server_modules.ft_options.ft_search_options import FtSearchOptions
from haystack.dataclasses import Document
from haystack.dataclasses.byte_stream import ByteStream
from haystack.document_stores.types import DuplicatePolicy
from haystack.errors import FilterError
from haystack.testing.document_store import (
CountDocumentsByFilterTest,
CountDocumentsTest,
Expand All @@ -31,6 +33,8 @@
from haystack.utils import Secret

from haystack_integrations.document_stores.valkey import ValkeyDocumentStore
from haystack_integrations.document_stores.valkey import document_store as ds_module
from haystack_integrations.document_stores.valkey.document_store import ValkeyDocumentStoreError


def _filterable_docs_embedding_dim_3() -> list[Document]:
Expand Down Expand Up @@ -1087,6 +1091,22 @@ def test_static_methods_dont_need_instance(self):
ValkeyDocumentStore._validate_documents([Document(id="1", content="test")])
ValkeyDocumentStore._validate_policy(DuplicatePolicy.NONE)

@pytest.mark.parametrize(
    ("metadata_fields", "expected_error"),
    [
        ("not_a_dict", r"metadata_fields must be a dictionary"),
        ({"": str}, r"Field name must be a non-empty string"),
        ({"category": list}, r"Unsupported field type"),
    ],
)
def test_validate_and_normalize_metadata_fields_rejects_invalid_inputs(self, metadata_fields, expected_error):
    """Each invalid ``metadata_fields`` value must raise ``ValueError`` matching ``expected_error``.

    The cases cover a non-dict argument, an empty field name, and a field
    type (``list``) that the test expects to be reported as unsupported.
    """
    normalize = ValkeyDocumentStore._validate_and_normalize_metadata_fields
    with pytest.raises(ValueError, match=expected_error):
        normalize(metadata_fields)

def test_validate_policy_logs_warning_for_unsupported_policy(self, caplog):
    """Validating ``DuplicatePolicy.SKIP`` must leave the OVERWRITE-only notice in the captured log."""
    unsupported_policy = DuplicatePolicy.SKIP
    ValkeyDocumentStore._validate_policy(unsupported_policy)

    expected_fragment = "only supports `DuplicatePolicy.OVERWRITE`"
    assert expected_fragment in caplog.text


class TestValkeyDocumentStoreConverters:
def test_to_dict(self):
Expand Down Expand Up @@ -1205,3 +1225,118 @@ def test_prepare_document_dict_validates_numeric_field_type():

with pytest.raises(ValueError, match="Field 'priority' expects numeric value but got str"):
store._prepare_document_dict(doc)


@pytest.fixture
def unit_store():
    """Return a ``ValkeyDocumentStore`` for unit tests.

    The store uses index ``unit_test``, 3-dimensional embeddings, and two
    metadata fields: ``category`` (``str``) and ``priority`` (``int``).
    """
    store_kwargs = {
        "index_name": "unit_test",
        "embedding_dim": 3,
        "metadata_fields": {"category": str, "priority": int},
    }
    return ValkeyDocumentStore(**store_kwargs)


class TestValkeyDocumentStoreErrorPaths:
    """Error-path and early-return tests driven by mocked clients and patched helpers."""

    @pytest.mark.parametrize("cluster_mode", [False, True])
    def test_get_connection_wraps_create_errors(self, unit_store, cluster_mode):
        """A ``RuntimeError`` from the client class's ``create`` must surface as ``ValkeyDocumentStoreError``."""
        unit_store._cluster_mode = cluster_mode
        # Patch whichever client class matches the mode under test.
        if cluster_mode:
            client_cls_name = "SyncGlideClusterClient"
        else:
            client_cls_name = "SyncGlideClient"

        with patch.object(ds_module, client_cls_name) as client_cls:
            client_cls.create.side_effect = RuntimeError("connect failed")
            with pytest.raises(ValkeyDocumentStoreError, match="Failed to connect to Valkey"):
                unit_store._get_connection()

    def test_close_swallows_client_exception(self, unit_store, caplog):
        """``close`` must not propagate a client ``close`` failure; it resets ``_client`` and logs instead."""
        failing_client = MagicMock()
        failing_client.close.side_effect = RuntimeError("close boom")
        unit_store._client = failing_client

        unit_store.close()

        assert unit_store._client is None
        assert "Failed to close Valkey client" in caplog.text

    def test_count_documents_returns_zero_when_no_index(self, unit_store):
        """``count_documents`` must return 0 when ``_has_index`` reports False."""
        unit_store._client = MagicMock()
        with patch.object(unit_store, "_has_index", return_value=False):
            document_count = unit_store.count_documents()
        assert document_count == 0

    def test_embedding_retrieval_returns_empty_when_no_index(self, unit_store, caplog):
        """``_embedding_retrieval`` must return ``[]`` and log "does not exist" when there is no index."""
        unit_store._client = MagicMock()
        with patch.object(unit_store, "_has_index", return_value=False):
            hits = unit_store._embedding_retrieval([0.1, 0.2, 0.3])
        assert hits == []
        assert "does not exist" in caplog.text

    def test_embedding_retrieval_returns_empty_for_nonpositive_limit(self, unit_store):
        """A ``limit`` of 0 or -1 must produce an empty result list."""
        unit_store._client = MagicMock()
        with patch.object(unit_store, "_has_index", return_value=True):
            for nonpositive_limit in (0, -1):
                assert unit_store._embedding_retrieval([0.1, 0.2, 0.3], limit=nonpositive_limit) == []

    def test_embedding_retrieval_reraises_filter_error(self, unit_store):
        """A filter on ``meta.unknown`` must raise ``FilterError`` from ``_embedding_retrieval``."""
        unit_store._client = MagicMock()
        unknown_field_condition = {"field": "meta.unknown", "operator": "==", "value": "x"}
        bad_filters = {
            "operator": "AND",
            "conditions": [unknown_field_condition],
        }
        with patch.object(unit_store, "_has_index", return_value=True), pytest.raises(FilterError):
            unit_store._embedding_retrieval([0.1, 0.2, 0.3], filters=bad_filters, limit=5)

    def test_write_documents_empty_list_returns_zero(self, unit_store, caplog):
        """Writing an empty list must return 0 and log a message containing "empty list"."""
        written = unit_store.write_documents([])
        assert written == 0
        assert "empty list" in caplog.text

    def test_delete_all_documents_without_index_is_noop(self, unit_store):
        """With no index present, ``delete_all_documents`` must not call ``sync_ft.dropindex``."""
        unit_store._client = MagicMock()
        index_missing = patch.object(unit_store, "_has_index", return_value=False)
        with index_missing, patch.object(ds_module, "sync_ft") as mock_ft:
            unit_store.delete_all_documents()
            mock_ft.dropindex.assert_not_called()

    def test_create_index_wraps_non_ok_response(self, unit_store):
        """A ``b"ERR"`` reply from ``sync_ft.create`` must raise ``ValkeyDocumentStoreError``."""
        unit_store._client = MagicMock()
        with (
            patch.object(unit_store, "_has_index", return_value=False),
            patch.object(ds_module, "sync_ft") as mock_ft,
            pytest.raises(ValkeyDocumentStoreError, match="Error creating collection"),
        ):
            mock_ft.create.return_value = b"ERR"
            unit_store._create_index()

    @pytest.mark.parametrize(
        ("method_name", "args", "expected_msg"),
        [
            (
                "filter_documents",
                ({"field": "meta.category", "operator": "==", "value": "x"},),
                "Error filtering documents",
            ),
            (
                "delete_by_filter",
                ({"field": "meta.category", "operator": "==", "value": "x"},),
                "Failed to delete documents by filter",
            ),
            (
                "update_by_filter",
                ({"field": "meta.category", "operator": "==", "value": "x"}, {"status": "new"}),
                "Failed to update documents by filter",
            ),
            (
                "count_documents_by_filter",
                ({"field": "meta.category", "operator": "==", "value": "x"},),
                "Failed to count documents by filter",
            ),
            (
                "count_unique_metadata_by_filter",
                ({"field": "meta.category", "operator": "==", "value": "x"}, ["category"]),
                "Failed to count unique metadata by filter",
            ),
            ("get_metadata_field_min_max", ("priority",), "Failed to get min/max for field"),
            ("get_metadata_field_unique_values", ("category",), "Failed to get unique values for field"),
        ],
    )
    def test_filter_dependent_methods_wrap_internal_errors(self, unit_store, method_name, args, expected_msg):
        """With ``count_documents`` patched to raise, each method must raise ``ValkeyDocumentStoreError``.

        ``expected_msg`` is the method-specific message the wrapped error must match.
        """
        with patch.object(unit_store, "count_documents", side_effect=RuntimeError("boom")):
            with pytest.raises(ValkeyDocumentStoreError, match=expected_msg):
                getattr(unit_store, method_name)(*args)
118 changes: 117 additions & 1 deletion integrations/valkey/tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
from haystack.testing.document_store import FilterDocumentsTest

from haystack_integrations.document_stores.valkey import ValkeyDocumentStore
from haystack_integrations.document_stores.valkey.filters import _normalize_filters, _validate_filters
from haystack_integrations.document_stores.valkey.filters import (
_greater_than,
_greater_than_equal,
_less_than,
_less_than_equal,
_normalize_filters,
_not_equal,
_validate_filters,
)


@pytest.mark.integration
Expand Down Expand Up @@ -177,6 +185,32 @@ def test_comparison_less_than_equal_with_list(self, document_store, filterable_d
},
"(@meta_timestamp:[1609459200 +inf] @meta_timestamp:[-inf (1640995200])",
),
# NOT logical operator combining two conditions
(
{
"operator": "NOT",
"conditions": [
{"field": "meta.category", "operator": "==", "value": "spam"},
{"field": "meta.priority", "operator": ">", "value": 5},
],
},
"-(@meta_category:{spam} @meta_priority:[(5 +inf])",
),
# NOT IN for NumericField
(
{"operator": "AND", "conditions": [{"field": "meta.priority", "operator": "not in", "value": [1, 2]}]},
"(-(@meta_priority:[1 1] | @meta_priority:[2 2]))",
),
# Field already with meta_ prefix (no "meta." remapping)
(
{"operator": "AND", "conditions": [{"field": "meta_category", "operator": "==", "value": "news"}]},
"(@meta_category:{news})",
),
# Bool value normalized to int on numeric field
(
{"operator": "AND", "conditions": [{"field": "meta.priority", "operator": "==", "value": True}]},
"(@meta_priority:[1 1])",
),
]


Expand Down Expand Up @@ -327,3 +361,85 @@ def test_numeric_not_equal():
def test_filters_must_be_dict():
    """Passing a non-dict as filters must raise ``FilterError``."""
    non_dict_filters = "invalid"
    with pytest.raises(FilterError, match="Filters must be a dictionary"):
        _normalize_filters(non_dict_filters, DEFAULT_SUPPORTED_FIELDS)


@pytest.mark.parametrize("operator", [">", ">=", "<", "<="])
@pytest.mark.parametrize(
    ("value", "expected_error"),
    [
        (None, r"None value is not supported"),
        ("bad_string", r"requires numeric value"),
    ],
    ids=["none_value", "non_numeric_value"],
)
def test_numeric_comparison_invalid_inputs(operator, value, expected_error):
    """Every ordering operator must reject ``None`` and non-numeric values on ``meta.score``."""
    condition = {"field": "meta.score", "operator": operator, "value": value}
    filters = {"operator": "AND", "conditions": [condition]}
    with pytest.raises(FilterError, match=expected_error):
        _normalize_filters(filters, DEFAULT_SUPPORTED_FIELDS)


@pytest.mark.parametrize(
    ("func", "op_symbol"),
    [
        (_greater_than, ">"),
        (_greater_than_equal, ">="),
        (_less_than, "<"),
        (_less_than_equal, "<="),
    ],
)
def test_numeric_comparison_fn_rejects_tag_field_type(func, op_symbol):
    """Calling an ordering helper directly with field type ``"tag"`` must raise ``FilterError``."""
    expected_message = f"Operator '{op_symbol}' not supported for TagField"
    with pytest.raises(FilterError, match=expected_message):
        func("meta_category", "value", "tag")


@pytest.mark.parametrize(
    ("operator", "expected"),
    [
        ("==", "(-@meta_score:[-inf +inf])"),
        ("!=", "(@meta_score:[-inf +inf])"),
    ],
)
def test_equal_operators_with_none_on_numeric(operator, expected):
    """``==``/``!=`` against ``None`` on ``meta.score`` must normalize to the expected query string."""
    none_condition = {"field": "meta.score", "operator": operator, "value": None}
    filters = {"operator": "AND", "conditions": [none_condition]}
    normalized = _normalize_filters(filters, DEFAULT_SUPPORTED_FIELDS)
    assert normalized == expected


@pytest.mark.parametrize("operator", ["==", "!="])
@pytest.mark.parametrize(
    ("field", "value", "expected_error"),
    [
        ("meta.category", 123, r"TagField 'meta_category' requires string value"),
        ("meta.score", "bad", r"NumericField 'meta_score' requires numeric value"),
    ],
)
def test_equality_operators_reject_wrong_value_types(operator, field, value, expected_error):
    """``==`` and ``!=`` must raise ``FilterError`` for an int on ``meta.category`` and a str on ``meta.score``."""
    condition = {"field": field, "operator": operator, "value": value}
    filters = {"operator": "AND", "conditions": [condition]}
    with pytest.raises(FilterError, match=expected_error):
        _normalize_filters(filters, DEFAULT_SUPPORTED_FIELDS)


def test_not_equal_fn_direct_call_with_none_on_tag_field():
    """``_not_equal`` called directly with ``None`` and field type ``"tag"`` must return the full-range clause."""
    rendered = _not_equal("meta_category", None, "tag")
    assert rendered == "@meta_category:[-inf +inf]"


@pytest.mark.parametrize(
    ("field", "value", "expected_error"),
    [
        ("meta.category", "not_a_list", r"'not in' operator requires a list value"),
        ("meta.category", ["valid", 123], r"TagField 'meta_category' requires string values in list"),
        ("meta.priority", [1, "invalid"], r"NumericField 'meta_priority' requires numeric values in list"),
    ],
)
def test_not_in_rejects_invalid_inputs(field, value, expected_error):
    """``not in`` must raise ``FilterError`` for a non-list value or a list with a wrongly typed element."""
    condition = {"field": field, "operator": "not in", "value": value}
    filters = {"operator": "AND", "conditions": [condition]}
    with pytest.raises(FilterError, match=expected_error):
        _normalize_filters(filters, DEFAULT_SUPPORTED_FIELDS)
Loading