diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 764550527..b7694485b 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -1,7 +1,7 @@ import re from datetime import datetime from enum import Enum -from typing import Any, Generic, List, Optional, Pattern, Tuple, TypeVar +from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar from elementary.utils.log import get_logger from elementary.utils.pydantic_shim import BaseModel, Field, validator @@ -97,6 +97,20 @@ def apply_filter_on_values(self, values: List[ValueT]) -> bool: return all(self.apply_filter_on_value(value) for value in values) raise ValueError(f"Unsupported filter type: {self.type}") + def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]: + values_list = set(values) + matching_values = set( + value for value in values_list if self.apply_filter_on_value(value) + ) + if self.type in ANY_OPERATORS: + return matching_values + elif self.type in ALL_OPERATORS: + if len(matching_values) != len(values_list): + return set() + return matching_values + + raise ValueError(f"Unsupported filter type: {self.type}") + class StatusFilterSchema(FilterSchema[Status]): values: List[Status] diff --git a/tests/unit/monitor/data_monitoring/test_filter_schema.py b/tests/unit/monitor/data_monitoring/test_filter_schema.py index 34b7de270..69b35f048 100644 --- a/tests/unit/monitor/data_monitoring/test_filter_schema.py +++ b/tests/unit/monitor/data_monitoring/test_filter_schema.py @@ -71,3 +71,26 @@ def test_filter_schema_apply_filter_on_values_contains_operator(): # Should not match when no values contain any filter values assert filter_schema.apply_filter_on_values(["abc", "xyz"]) is False + + +def test_get_matching_values() -> None: + filter_schema = FilterSchema(values=["test1", "test2"], type=FilterType.IS) + values = ["test1", "test3", "test4"] + assert filter_schema.get_matching_values(values) == {"test1"} + + filter_schema = FilterSchema(values=["test"], type=FilterType.CONTAINS) + values = ["test1", "testing", "other"] + assert filter_schema.get_matching_values(values) == {"test1", "testing"} + + filter_schema = FilterSchema(values=["test1"], type=FilterType.IS_NOT) + values = ["test2", "test3"] + assert filter_schema.get_matching_values(values) == {"test2", "test3"} + + filter_schema = FilterSchema(values=["test1"], type=FilterType.IS_NOT) + values = ["test1", "test2", "test3"] + assert filter_schema.get_matching_values(values) == set() + + filter_schema = FilterSchema(values=["test"], type=FilterType.IS) + filter_schema.type = "unsupported" # type: ignore + with pytest.raises(ValueError, match="Unsupported filter type: unsupported"): + filter_schema.get_matching_values(values)