diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 315886a42..c427ac53b 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -1,6 +1,7 @@ import re from datetime import datetime from enum import Enum +from functools import cached_property from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar from elementary.utils.log import get_logger @@ -49,23 +50,12 @@ def normalized_status(self) -> List[Status]: return [Status(status) for status in self.statuses if status in list(Status)] -def apply_filter(filter_type: FilterType, value: Any, filter_value: Any) -> bool: - if filter_type == FilterType.IS: - return value == filter_value - elif filter_type == FilterType.IS_NOT: - return value != filter_value - elif filter_type == FilterType.CONTAINS: - return str(filter_value).lower() in str(value).lower() - elif filter_type == FilterType.NOT_CONTAINS: - return str(filter_value).lower() not in str(value).lower() - raise ValueError(f"Unsupported filter type: {filter_type}") - - ValueT = TypeVar("ValueT") ANY_OPERATORS = [FilterType.IS, FilterType.CONTAINS] ALL_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS] +NEGATIVE_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS] class FilterSchema(BaseModel, Generic[ValueT]): @@ -77,42 +67,62 @@ class Config: # Make sure that serializing Enum return values use_enum_values = True - def _apply_filter_type(self, value: ValueT, filter_value: ValueT) -> bool: - return apply_filter(self.type, value, filter_value) + @staticmethod + def normalize_value(value: Any) -> str: + if isinstance(value, Enum): + return str(value.value).lower() + return str(value).lower() - def apply_filter_on_value(self, value: ValueT) -> bool: - if self.type in ANY_OPERATORS: - return any( - self._apply_filter_type(value, filter_value) - for filter_value in self.values + @staticmethod + def normalize_values(values: Iterable[ValueT]) -> Set[str]: + return {FilterSchema.normalize_value(value) for value in values} + + @cached_property + def _normalized_values(self) -> Set[str]: + return FilterSchema.normalize_values(self.values) + + def get_matching_normalized_values(self, values: Set[str]) -> Set[str]: + if self.type == FilterType.IS: + return values.intersection(self._normalized_values) + elif self.type == FilterType.IS_NOT: + matching_values = values.difference(self._normalized_values) + if len(matching_values) != len(values): + return set() + return matching_values + if self.type == FilterType.CONTAINS: + return set( + value + for value in values + if any( + filter_value in str(value).lower() + for filter_value in self._normalized_values + ) ) - elif self.type in ALL_OPERATORS: - return all( - self._apply_filter_type(value, filter_value) - for filter_value in self.values + if self.type == FilterType.NOT_CONTAINS: + matching_values = set( + value + for value in values + if not any( + filter_value in str(value).lower() + for filter_value in self._normalized_values + ) ) + if len(matching_values) != len(values): + return set() + return matching_values raise ValueError(f"Unsupported filter type: {self.type}") - def apply_filter_on_values(self, values: List[ValueT]) -> bool: - if self.type in ANY_OPERATORS: - return any(self.apply_filter_on_value(value) for value in values) - elif self.type in ALL_OPERATORS: - return all(self.apply_filter_on_value(value) for value in values) - raise ValueError(f"Unsupported filter type: {self.type}") + def get_matching_values(self, values: Iterable[ValueT]) -> Set[str]: + values_set = FilterSchema.normalize_values(values) + return self.get_matching_normalized_values(values_set) - def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]: - values_list = set(values) - matching_values = set( - value for value in values_list if self.apply_filter_on_value(value) - ) - if self.type in ANY_OPERATORS: - return matching_values - elif self.type in ALL_OPERATORS: - if len(matching_values) != len(values_list): - return set() - return matching_values + def apply_filter_on_values(self, values: List[ValueT]) -> bool: + if self.type in NEGATIVE_OPERATORS and not values: + return True + return bool(self.get_matching_values(values)) - raise ValueError(f"Unsupported filter type: {self.type}") + def apply_filter_on_value(self, value: ValueT) -> bool: + return self.apply_filter_on_values([value]) class StatusFilterSchema(FilterSchema[Status]):