Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions elementary/monitor/data_monitoring/schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import re
from datetime import datetime
from enum import Enum
from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar
from functools import cached_property
from typing import (
Generic,
Iterable,
List,
Optional,
Pattern,
Set,
Tuple,
TypeVar,
)

from elementary.utils.log import get_logger
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -49,23 +59,12 @@ def normalized_status(self) -> List[Status]:
return [Status(status) for status in self.statuses if status in list(Status)]


def apply_filter(filter_type: FilterType, value: Any, filter_value: Any) -> bool:
if filter_type == FilterType.IS:
return value == filter_value
elif filter_type == FilterType.IS_NOT:
return value != filter_value
elif filter_type == FilterType.CONTAINS:
return str(filter_value).lower() in str(value).lower()
elif filter_type == FilterType.NOT_CONTAINS:
return str(filter_value).lower() not in str(value).lower()
raise ValueError(f"Unsupported filter type: {filter_type}")


ValueT = TypeVar("ValueT")


ANY_OPERATORS = [FilterType.IS, FilterType.CONTAINS]
ALL_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS]
NEGATIVE_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS]


class FilterSchema(BaseModel, Generic[ValueT]):
Expand All @@ -77,43 +76,54 @@ class Config:
# Make sure that serializing Enum return values
use_enum_values = True

def _apply_filter_type(self, value: ValueT, filter_value: ValueT) -> bool:
return apply_filter(self.type, value, filter_value)

def apply_filter_on_value(self, value: ValueT) -> bool:
if self.type in ANY_OPERATORS:
return any(
self._apply_filter_type(value, filter_value)
for filter_value in self.values
)
elif self.type in ALL_OPERATORS:
return all(
self._apply_filter_type(value, filter_value)
for filter_value in self.values
)
raise ValueError(f"Unsupported filter type: {self.type}")
@cached_property
def _normalized_values(self) -> list[str]:
return [str(value).lower() for value in self.values]

def apply_filter_on_values(self, values: List[ValueT]) -> bool:
if self.type in ANY_OPERATORS:
return any(self.apply_filter_on_value(value) for value in values)
elif self.type in ALL_OPERATORS:
return all(self.apply_filter_on_value(value) for value in values)
raise ValueError(f"Unsupported filter type: {self.type}")
@cached_property
def _values_set(self) -> Set[ValueT]:
return set(self.values)

def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]:
values_list = set(values)
matching_values = set(
value for value in values_list if self.apply_filter_on_value(value)
)
if self.type in ANY_OPERATORS:
values_set = set(values)
if self.type == FilterType.IS:
return values_set.intersection(self._values_set)
elif self.type == FilterType.IS_NOT:
matching_values = values_set.difference(self._values_set)
if len(matching_values) != len(values_set):
return set()
return matching_values
elif self.type in ALL_OPERATORS:
if len(matching_values) != len(values_list):
if self.type == FilterType.CONTAINS:
return set(
value
for value in values_set
if any(
filter_value in str(value).lower()
for filter_value in self._normalized_values
)
)
if self.type == FilterType.NOT_CONTAINS:
matching_values = set(
value
for value in values_set
if not any(
filter_value in str(value).lower()
for filter_value in self._normalized_values
)
)
if len(matching_values) != len(values_set):
return set()
return matching_values

raise ValueError(f"Unsupported filter type: {self.type}")

def apply_filter_on_values(self, values: List[ValueT]) -> bool:
if self.type in NEGATIVE_OPERATORS and not values:
return True
return bool(self.get_matching_values(values))

def apply_filter_on_value(self, value: ValueT) -> bool:
return self.apply_filter_on_values([value])


class StatusFilterSchema(FilterSchema[Status]):
values: List[Status]
Expand Down
Loading