From 9c9166127b0e4a53385fc9f65d2d1bc4e09d9ac5 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 24 Aug 2025 13:02:43 +0300 Subject: [PATCH 1/7] use sets for alert filters --- elementary/monitor/data_monitoring/schema.py | 94 +++++++++++--------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 315886a42..556b224b7 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -1,7 +1,17 @@ import re from datetime import datetime from enum import Enum -from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar +from functools import cached_property +from typing import ( + Generic, + Iterable, + List, + Optional, + Pattern, + Set, + Tuple, + TypeVar, +) from elementary.utils.log import get_logger from elementary.utils.pydantic_shim import BaseModel, Field, validator @@ -49,23 +59,12 @@ def normalized_status(self) -> List[Status]: return [Status(status) for status in self.statuses if status in list(Status)] -def apply_filter(filter_type: FilterType, value: Any, filter_value: Any) -> bool: - if filter_type == FilterType.IS: - return value == filter_value - elif filter_type == FilterType.IS_NOT: - return value != filter_value - elif filter_type == FilterType.CONTAINS: - return str(filter_value).lower() in str(value).lower() - elif filter_type == FilterType.NOT_CONTAINS: - return str(filter_value).lower() not in str(value).lower() - raise ValueError(f"Unsupported filter type: {filter_type}") - - ValueT = TypeVar("ValueT") ANY_OPERATORS = [FilterType.IS, FilterType.CONTAINS] ALL_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS] +NEGATIVE_OPERATORS = [FilterType.IS_NOT, FilterType.NOT_CONTAINS] class FilterSchema(BaseModel, Generic[ValueT]): @@ -77,43 +76,54 @@ class Config: # Make sure that serializing Enum return values use_enum_values = True - def _apply_filter_type(self, value: ValueT, filter_value: ValueT) -> bool: - return apply_filter(self.type, value, filter_value) - - def apply_filter_on_value(self, value: ValueT) -> bool: - if self.type in ANY_OPERATORS: - return any( - self._apply_filter_type(value, filter_value) - for filter_value in self.values - ) - elif self.type in ALL_OPERATORS: - return all( - self._apply_filter_type(value, filter_value) - for filter_value in self.values - ) - raise ValueError(f"Unsupported filter type: {self.type}") + @cached_property + def _normalized_values(self) -> list[str]: + return [str(value).lower() for value in self.values] - def apply_filter_on_values(self, values: List[ValueT]) -> bool: - if self.type in ANY_OPERATORS: - return any(self.apply_filter_on_value(value) for value in values) - elif self.type in ALL_OPERATORS: - return all(self.apply_filter_on_value(value) for value in values) - raise ValueError(f"Unsupported filter type: {self.type}") + @cached_property + def _values_set(self) -> Set[ValueT]: + return set(self.values) def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]: - values_list = set(values) - matching_values = set( - value for value in values_list if self.apply_filter_on_value(value) - ) - if self.type in ANY_OPERATORS: + values_set = set(values) + if self.type == FilterType.IS: + return values_set.intersection(self._values_set) + elif self.type == FilterType.IS_NOT: + matching_values = values_set.difference(self._values_set) + if len(matching_values) != len(values_set): + return set() return matching_values - elif self.type in ALL_OPERATORS: - if len(matching_values) != len(values_list): + if self.type == FilterType.CONTAINS: + return set( + value + for value in values_set + if any( + filter_value in str(value).lower() + for filter_value in self._normalized_values + ) + ) + if self.type == FilterType.NOT_CONTAINS: + matching_values = set( + value + for value in values_set + if not any( + filter_value in str(value).lower() + for filter_value in self._normalized_values + ) + ) + if len(matching_values) != len(values_set): return set() return matching_values - raise ValueError(f"Unsupported filter type: {self.type}") + def apply_filter_on_values(self, values: List[ValueT]) -> bool: + if self.type in NEGATIVE_OPERATORS and not values: + return True + return bool(self.get_matching_values(values)) + + def apply_filter_on_value(self, value: ValueT) -> bool: + return self.apply_filter_on_values([value]) + class StatusFilterSchema(FilterSchema[Status]): values: List[Status] From 930cf8419b392399f905295a5b399a4d3ceab4f9 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 24 Aug 2025 16:00:59 +0300 Subject: [PATCH 2/7] extract normalization --- elementary/monitor/data_monitoring/schema.py | 36 +++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 556b224b7..95d7c5cd4 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -3,6 +3,7 @@ from enum import Enum from functools import cached_property from typing import ( + Any, Generic, Iterable, List, @@ -76,27 +77,32 @@ class Config: # Make sure that serializing Enum return values use_enum_values = True - @cached_property - def _normalized_values(self) -> list[str]: - return [str(value).lower() for value in self.values] + @staticmethod + def normalize_value(value: Any) -> str: + if isinstance(value, Enum): + return str(value.value).lower() + return str(value).lower() + + @staticmethod + def normalize_values(values: Iterable[ValueT]) -> Set[str]: + return {FilterSchema.normalize_value(value) for value in values} @cached_property - def _values_set(self) -> Set[ValueT]: - return set(self.values) + def _normalized_values(self) -> Set[str]: + return FilterSchema.normalize_values(self.values) - def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]: - values_set = set(values) + def get_matching_normalized_values(self, values: Set[str]) -> Set[str]: if self.type == FilterType.IS: - return values_set.intersection(self._values_set) + return values.intersection(self._normalized_values) elif self.type == FilterType.IS_NOT: - matching_values = values_set.difference(self._values_set) - if len(matching_values) != len(values_set): + matching_values = values.difference(self._normalized_values) + if len(matching_values) != len(values): return set() return matching_values if self.type == FilterType.CONTAINS: return set( value - for value in values_set + for value in values if any( filter_value in str(value).lower() for filter_value in self._normalized_values @@ -105,17 +111,21 @@ def get_matching_values(self, values: Iterable[ValueT]) -> Set[ValueT]: if self.type == FilterType.NOT_CONTAINS: matching_values = set( value - for value in values_set + for value in values if not any( filter_value in str(value).lower() for filter_value in self._normalized_values ) ) - if len(matching_values) != len(values_set): + if len(matching_values) != len(values): return set() return matching_values raise ValueError(f"Unsupported filter type: {self.type}") + def get_matching_values(self, values: Iterable[ValueT]) -> Set[str]: + values_set = FilterSchema.normalize_values(values) + return self.get_matching_normalized_values(values_set) + def apply_filter_on_values(self, values: List[ValueT]) -> bool: if self.type in NEGATIVE_OPERATORS and not values: return True From 5d423c6178b654800077f2f0652bc09116679d98 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 24 Aug 2025 16:08:27 +0300 Subject: [PATCH 3/7] isort --- elementary/monitor/data_monitoring/schema.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 95d7c5cd4..c427ac53b 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -2,17 +2,7 @@ from datetime import datetime from enum import Enum from functools import cached_property -from typing import ( - Any, - Generic, - Iterable, - List, - Optional, - Pattern, - Set, - Tuple, - TypeVar, -) +from typing import Any, Generic, Iterable, List, Optional, Pattern, Set, Tuple, TypeVar from elementary.utils.log import get_logger from elementary.utils.pydantic_shim import BaseModel, Field, validator From 5af40f2c1af06929e87310350bc84c41554c0e5b Mon Sep 17 00:00:00 2001 From: mschmidoev <88450858+mschmidoev@users.noreply.github.com> Date: Tue, 19 Aug 2025 10:43:45 +0900 Subject: [PATCH 4/7] Fix: Replace double quotes with single quotes in elementary_database_and_schema - Replace double quotes with single quotes for proper SQL compatibility - Fixes SQL errors in Slack alert messages when database/schema names contain quotes - Added conditional check to handle None values safely --- elementary/monitor/data_monitoring/data_monitoring.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index 74888eea8..3e142e5ea 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -79,6 +79,8 @@ def get_elementary_database_and_schema(self): relation = self.internal_dbt_runner.run_operation( "elementary_cli.get_elementary_database_and_schema", quiet=True )[0] + # Replace double quotes with single quotes for proper SQL compatibility + relation = relation.replace('"', "'") if relation else relation logger.info(f"Elementary's database and schema: '{relation}'") return relation except Exception as ex: From 2734f7853b960f7ca0e4b11b72249772a47975cf Mon Sep 17 00:00:00 2001 From: mschmidoev <88450858+mschmidoev@users.noreply.github.com> Date: Tue, 19 Aug 2025 11:56:30 +0900 Subject: [PATCH 5/7] Replace double quotes with backticks for better SQL compatibility --- elementary/monitor/data_monitoring/data_monitoring.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index 3e142e5ea..68fcaa15c 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -79,8 +79,8 @@ def get_elementary_database_and_schema(self): relation = self.internal_dbt_runner.run_operation( "elementary_cli.get_elementary_database_and_schema", quiet=True )[0] - # Replace double quotes with single quotes for proper SQL compatibility - relation = relation.replace('"', "'") if relation else relation + # Replace double quotes with backticks for proper SQL compatibility + relation = relation.replace('"', "`") if relation else relation logger.info(f"Elementary's database and schema: '{relation}'") return relation except Exception as ex: From a3a358155e74b62aad9da2cc462ff7211a4a603f Mon Sep 17 00:00:00 2001 From: mschmidoev <88450858+mschmidoev@users.noreply.github.com> Date: Mon, 25 Aug 2025 11:48:50 +0900 Subject: [PATCH 6/7] Wrap each identifier part in double quotes for standard SQL compatibility --- elementary/monitor/data_monitoring/data_monitoring.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index 68fcaa15c..43a8d9e95 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -79,8 +79,12 @@ def get_elementary_database_and_schema(self): relation = self.internal_dbt_runner.run_operation( "elementary_cli.get_elementary_database_and_schema", quiet=True )[0] - # Replace double quotes with backticks for proper SQL compatibility - relation = relation.replace('"', "`") if relation else relation + # Split on dot and wrap each part in double quotes for standard SQL compatibility + if relation and "." in relation: + db, schema = relation.split(".", 1) + relation = f'"{db}"."{schema}"' + elif relation: + relation = f'"{relation}"' logger.info(f"Elementary's database and schema: '{relation}'") return relation except Exception as ex: From da1246a93b2643754d57d882aecf68f410a16144 Mon Sep 17 00:00:00 2001 From: Michael Schmid Date: Tue, 2 Dec 2025 11:26:18 +0000 Subject: [PATCH 7/7] Simplify by just stripping the " --- .../monitor/data_monitoring/data_monitoring.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index 43a8d9e95..dde9fd000 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -79,14 +79,10 @@ def get_elementary_database_and_schema(self): relation = self.internal_dbt_runner.run_operation( "elementary_cli.get_elementary_database_and_schema", quiet=True )[0] - # Split on dot and wrap each part in double quotes for standard SQL compatibility - if relation and "." in relation: - db, schema = relation.split(".", 1) - relation = f'"{db}"."{schema}"' - elif relation: - relation = f'"{relation}"' - logger.info(f"Elementary's database and schema: '{relation}'") - return relation + # Strip quotes from relation to avoid case sensitivity issues and double-quoting + strip_relation = relation.strip('"') + logger.info(f"Elementary's database and schema: '{strip_relation}'") + return strip_relation except Exception as ex: logger.error("Failed to parse Elementary's database and schema.") if self.tracking: