From b1c6b7b9ee4ba3deb9c324c2ecd4402c964e3052 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 May 2026 13:38:59 -0700 Subject: [PATCH 1/7] feat(ingestion): cross-connector filter visibility report MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Generalize Snowflake's per-database discovery + filter-reason logging (#28336) to a reusable helper used by every connector family — Database, Dashboard, Pipeline, Messaging, Storage, MLModel. At the end of each source step, emit a single grep-friendly "FILTER VISIBILITY REPORT" block listing the discovered count, every filtered name + reason, and the kept count per entity type, so users can tell whether a missing entity was removed by includes/excludes or was never visible to the ingestion role. Only the diff is stored on Status: discovered counts (one int per entity type) + the existing Status.filtered list (now carrying richer reasons). No discovered-name or kept-name lists are stored; both are derivable. No spec/JSON schema change, no UI work. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/metadata/ingestion/api/status.py | 7 + .../source/dashboard/dashboard_service.py | 27 ++- .../source/database/bigquery/metadata.py | 31 ++- .../source/database/common_db_source.py | 44 ++++- .../source/database/database_service.py | 42 +++- .../source/database/mssql/metadata.py | 17 +- .../source/database/postgres/metadata.py | 17 +- .../source/database/redshift/metadata.py | 17 +- .../source/database/snowflake/metadata.py | 24 ++- .../source/messaging/messaging_service.py | 21 +- .../source/mlmodel/mlflow/metadata.py | 10 +- .../source/mlmodel/mlmodel_service.py | 2 + .../source/mlmodel/sagemaker/metadata.py | 7 +- .../source/pipeline/pipeline_service.py | 21 +- .../source/storage/storage_service.py | 12 +- .../src/metadata/utils/filter_visibility.py | 184 ++++++++++++++++++ .../unit/utils/test_filter_visibility.py | 180 +++++++++++++++++ 17 files changed, 596 insertions(+), 67 deletions(-) create mode 100644 ingestion/src/metadata/utils/filter_visibility.py create mode 100644 ingestion/tests/unit/utils/test_filter_visibility.py diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index f8d0ad94c21b..e4b344e86492 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -60,6 +60,7 @@ class Status(BaseModel): warnings: Annotated[List[Any], Field(default_factory=list)] # noqa: UP006 filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)] # noqa: UP006 failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)] # noqa: UP006 + discovered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006 def scanned(self, record: Any) -> None: """ @@ -101,6 +102,12 @@ def warning(self, key: str, reason: str) -> None: def filter(self, key: str, reason: str) -> None: self.filtered.append({key: reason}) + def record_discovered(self, entity_type: str, count: 
int) -> None: + """Record the count of entities discovered from the source before any + filter pattern is applied. Used by log_step_summary to report the + discovered vs. filtered vs. kept breakdown.""" + self.discovered_counts[entity_type] = self.discovered_counts.get(entity_type, 0) + count + def as_string(self) -> str: parts = [] for key, value in self.__dict__.items(): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index e93846d0879d..0e9fbb4362c9 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -73,6 +73,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_dashboard, filter_by_project from metadata.utils.logger import ingestion_logger @@ -418,6 +423,7 @@ def yield_dashboard_usage(self, *args, **kwargs) -> Iterable[DashboardUsage]: return def close(self): + log_step_summary(logger, self.status, self.config.serviceName) self.metadata.close() def get_services(self) -> Iterable[WorkflowSource]: @@ -566,15 +572,24 @@ def get_dashboard(self) -> Any: """ Method to iterate through dashboard lists filter dashboards & yield dashboard details """ - for dashboard in self.get_dashboards_list(): + dashboards = list(self.get_dashboards_list()) + log_discovered( + logger, + self.status, + "Dashboard", + (self.get_dashboard_name(d) for d in dashboards), + ) + for dashboard in dashboards: dashboard_name = self.get_dashboard_name(dashboard) if filter_by_dashboard( self.source_config.dashboardFilterPattern, dashboard_name, ): - self.status.filter( + log_filtered( + logger, + 
self.status, + "Dashboard", dashboard_name, - "Dashboard Filtered Out", ) continue @@ -594,9 +609,11 @@ def get_dashboard(self) -> Any: self.source_config.projectFilterPattern, project_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Project", project_name, - "Project / Workspace Filtered Out", ) continue diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 96e07b80a357..ffd95be15fda 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -108,6 +108,7 @@ from metadata.utils import fqn from metadata.utils.credentials import GOOGLE_CREDENTIALS from metadata.utils.execution_time_tracker import calculate_execution_time +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database, filter_by_schema from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -663,7 +664,10 @@ def _get_filtered_datasets(self, project_id: str) -> List[str]: # noqa: UP006 ] def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - for schema_name in self.get_raw_database_schema_names(): + raw_names = list(self.get_raw_database_schema_names()) + if add_to_status: + log_discovered(logger, self.status, "Schema", raw_names) + for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, @@ -671,12 +675,20 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo database_name=self.context.get().database, schema_name=schema_name, ) + filter_name = schema_fqn if self.source_config.useFqnForFiltering else schema_name if filter_by_schema( self.source_config.schemaFilterPattern, - schema_fqn if self.source_config.useFqnForFiltering 
else schema_name, + filter_name, ): if add_to_status: - self.status.filter(schema_fqn, "Schema Filtered Out") + log_filtered( + logger, + self.status, + "Schema", + schema_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue if self.incremental.enabled: @@ -826,6 +838,7 @@ def get_database_names_raw(self) -> Iterable[str]: yield from self.project_ids def get_database_names(self) -> Iterable[str]: + log_discovered(logger, self.status, "Database", self.project_ids) for project_id in self.project_ids: database_fqn = fqn.build( self.metadata, @@ -833,11 +846,19 @@ def get_database_names(self) -> Iterable[str]: service_name=self.context.get().database_service, database_name=project_id, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else project_id if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn if self.source_config.useFqnForFiltering else project_id, + filter_name, ): - self.status.filter(database_fqn, "Database Filtered out") + log_filtered( + logger, + self.status, + "Database", + database_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) else: try: self.set_inspector(database_name=project_id) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index da26ad973a88..7e1bcddc6ff1 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -73,6 +73,11 @@ calculate_execution_time, calculate_execution_time_generator, ) +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_table from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -365,11 +370,17 @@ def 
get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: schema_name = self.context.get().database_schema if self.source_config.includeTables: try: - table_iter = self.query_table_names_and_types(schema_name) + table_iter = list(self.query_table_names_and_types(schema_name)) except Exception as err: logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) table_iter = [] + log_discovered( + logger, + self.status, + "Table", + (item.name for item in table_iter), + ) for table_and_type in table_iter: try: table_name = self.standardize_table_name(schema_name, table_and_type.name) @@ -382,13 +393,18 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: table_name=table_name, skip_es_search=True, ) + filter_name = table_fqn if self.source_config.useFqnForFiltering else table_name if filter_by_table( self.source_config.tableFilterPattern, - (table_fqn if self.source_config.useFqnForFiltering else table_name), + filter_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Table", table_fqn, - "Table Filtered Out", + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) continue except Exception as err: @@ -399,11 +415,17 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: if self.source_config.includeViews: try: - view_iter = self.query_view_names_and_types(schema_name) + view_iter = list(self.query_view_names_and_types(schema_name)) except Exception as err: logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) view_iter = [] + log_discovered( + logger, + self.status, + "Table", + (item.name for item in view_iter), + ) for view_and_type in view_iter: try: view_name = self.standardize_table_name(schema_name, view_and_type.name) @@ -416,13 +438,18 @@ def get_tables_name_and_type(self) -> 
Optional[Iterable[Tuple[str, TableType]]]: table_name=view_name, ) + filter_name = view_fqn if self.source_config.useFqnForFiltering else view_name if filter_by_table( self.source_config.tableFilterPattern, - (view_fqn if self.source_config.useFqnForFiltering else view_name), + filter_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Table", view_fqn, - "Table Filtered Out", + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) continue except Exception as err: @@ -746,6 +773,7 @@ def inspector(self) -> Inspector: return self._inspector_map[thread_id] def close(self): + log_step_summary(logger, self.status, self.config.serviceName) self._release_engine() if hasattr(self, "ssl_manager") and self.ssl_manager: self.ssl_manager = cast(SSLManager, self.ssl_manager) # noqa: TC006 diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 31628601ac6a..d7ceacb386ba 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -71,6 +71,7 @@ from metadata.ingestion.source.connections import test_connection_common from metadata.utils import fqn from metadata.utils.execution_time_tracker import calculate_execution_time +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_schema, filter_by_stored_procedure from metadata.utils.logger import ingestion_logger from metadata.utils.owner_utils import get_owner_from_config @@ -545,27 +546,44 @@ def register_record_schema_request(self, schema_request: CreateDatabaseSchemaReq def _get_filtered_database_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: """ - Get filtered database names based on the database filter pattern + Get filtered database names based on the database 
filter pattern. + + Note: this is a maintenance-pass helper (called from + mark_databases_as_deleted), not the main ingestion path. Discovery + logging lives in each connector's get_database_names() to avoid + double-counting; this method only emits filter-rejection logs when + add_to_status=True. """ - database_names_iterable = getattr(self, "get_database_names_raw", self.get_database_names)() - for database_name in database_names_iterable: + raw_names = list(getattr(self, "get_database_names_raw", self.get_database_names)()) + for database_name in raw_names: database_fqn = fqn.build( self.metadata, entity_type=Database, service_name=self.context.get().database_service, database_name=database_name, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else database_name if filter_by_schema( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else database_name), + filter_name, ): if add_to_status: - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + database_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue yield database_fqn if return_fqn else database_name def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - for schema_name in self.get_raw_database_schema_names(): + raw_names = list(self.get_raw_database_schema_names()) + if add_to_status: + log_discovered(logger, self.status, "Schema", raw_names) + for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, @@ -573,12 +591,20 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo database_name=self.context.get().database, schema_name=schema_name, ) + filter_name = schema_fqn if self.source_config.useFqnForFiltering else schema_name if filter_by_schema( self.source_config.schemaFilterPattern, - 
schema_fqn if self.source_config.useFqnForFiltering else schema_name, + filter_name, ): if add_to_status: - self.status.filter(schema_fqn, "Schema Filtered Out") + log_filtered( + logger, + self.status, + "Schema", + schema_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue yield schema_fqn if return_fqn else schema_name diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index c4ebafe619fc..4c14ea367fc9 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -61,6 +61,7 @@ ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqa_utils import update_mssql_ischema_names @@ -198,7 +199,9 @@ def get_database_names(self) -> Iterable[str]: self.set_inspector(database_name=configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -206,11 +209,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + database_fqn, + 
matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 39baa542482b..ba625d249122 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -82,6 +82,7 @@ get_view_definition, ) from metadata.utils import fqn +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.importer import import_side_effects from metadata.utils.logger import ingestion_logger @@ -180,7 +181,9 @@ def get_database_names(self) -> Iterable[str]: self.set_schema_description_map() yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -188,11 +191,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + database_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 2d2331da1307..e3b2e6513181 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -96,6 +96,7 @@ ) from metadata.utils import fqn from metadata.utils.execution_time_tracker import calculate_execution_time_generator +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.helpers import clean_up_starting_ending_double_quotes_in_string from metadata.utils.logger import ingestion_logger @@ -285,7 +286,9 @@ def get_database_names(self) -> Iterable[str]: self.set_external_location_map(configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -293,11 +296,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + database_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index cee2ba80871a..0ba4cb9b8666 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -128,6 +128,7 @@ normalize_names, ) from metadata.utils import fqn +from 
metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import ( @@ -386,11 +387,6 @@ def get_configured_database(self) -> Optional[str]: # noqa: UP045 def get_database_names_raw(self) -> Iterable[str]: results = self.connection.execute(text(SNOWFLAKE_GET_DATABASES)).fetchall() database_names = [list(res)[1] for res in results] - logger.info( - "SHOW DATABASES returned %d database(s) visible to the ingestion role", - len(database_names), - ) - logger.debug("Databases visible to the ingestion role: %s", database_names) yield from database_names def get_database_names(self) -> Iterable[str]: @@ -406,7 +402,9 @@ def get_database_names(self) -> Iterable[str]: self.set_database_tags_map(configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -421,14 +419,14 @@ def get_database_names(self) -> Iterable[str]: self.source_config.databaseFilterPattern, filter_name, ): - logger.info( - "Filtering out database '%s': did not pass databaseFilterPattern " - "(matched against '%s', useFqnForFiltering=%s)", - new_database, - filter_name, - self.source_config.useFqnForFiltering, + log_filtered( + logger, + self.status, + "Database", + database_fqn, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) - self.status.filter(database_fqn, "Database Filtered Out") continue try: diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index 591daef1c5d9..e9804a81cbf5 100644 --- 
a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -49,6 +49,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_topic from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -206,15 +211,24 @@ def get_topic_name(self, topic_details: Any) -> str: """ def get_topic(self) -> Any: - for topic_details in self.get_topic_list(): + topics = list(self.get_topic_list() or []) + log_discovered( + logger, + self.status, + "Topic", + (self.get_topic_name(t) for t in topics), + ) + for topic_details in topics: topic_name = self.get_topic_name(topic_details) if filter_by_topic( self.source_config.topicFilterPattern, topic_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Topic", topic_name, - "Topic Filtered Out", ) continue yield topic_details @@ -257,3 +271,4 @@ def register_record(self, topic_request: CreateTopicRequest) -> None: def close(self): """By default, nothing to close""" + log_step_summary(logger, self.status, self.config.serviceName) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py index 8f2676c2ffd7..b56c5f9ec041 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py @@ -45,6 +45,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource +from 
metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_mlmodel from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -74,12 +75,11 @@ def get_mlmodels( # pylint: disable=arguments-differ """ List and filters models from the registry """ - for model in cast(RegisteredModel, self.client.search_registered_models()): # noqa: TC006 + models = list(cast(RegisteredModel, self.client.search_registered_models())) # noqa: TC006 + log_discovered(logger, self.status, "MlModel", (m.name for m in models)) + for model in models: if filter_by_mlmodel(self.source_config.mlModelFilterPattern, mlmodel_name=model.name): - self.status.filter( - model.name, - "MlModel name pattern not allowed", - ) + log_filtered(logger, self.status, "MlModel", model.name) continue # Get the latest version diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 997886639018..80be6ae44fa7 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -51,6 +51,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import log_step_summary from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -174,6 +175,7 @@ def _get_algorithm(self, *args, **kwargs) -> str: def close(self): """By default, nothing to close""" + log_step_summary(logger, self.status, self.config.serviceName) def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py 
b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py index 4ca73985f9f1..b71f99f9c37d 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py @@ -46,6 +46,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_mlmodel from metadata.utils.logger import ingestion_logger @@ -134,16 +135,14 @@ def get_mlmodels( # pylint: disable=arguments-differ else: logger.debug(f"No registered models found under sagemaker unified studio") # noqa: F541 + log_discovered(logger, self.status, "MlModel", (m["ModelName"] for m in models)) for model in models: try: if filter_by_mlmodel( self.source_config.mlModelFilterPattern, mlmodel_name=model["ModelName"], ): - self.status.filter( - model["ModelName"], - "MlModel name pattern not allowed", - ) + log_filtered(logger, self.status, "MlModel", model["ModelName"]) continue yield SageMakerModel( name=model["ModelName"], diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index eadba98a9d1e..c2a7bb6405f7 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -58,6 +58,11 @@ from metadata.ingestion.source.pipeline.openlineage.models import TableDetails from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_pipeline from metadata.utils.helpers 
import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -345,6 +350,7 @@ def yield_tag(self, pipeline_details: Any) -> Iterable[Either[OMetaTagAndClassif def close(self): """Method to implement any required logic after the ingestion process is completed""" + log_step_summary(logger, self.status, self.config.serviceName) self.metadata.compute_percentile(Pipeline, self.today) def get_services(self) -> Iterable[WorkflowSource]: @@ -354,15 +360,24 @@ def yield_create_request_pipeline_service(self, config: WorkflowSource): yield Either(right=self.metadata.get_create_service_from_source(entity=PipelineService, config=config)) def get_pipeline(self) -> Any: - for pipeline_detail in self.get_pipelines_list(): + pipelines = list(self.get_pipelines_list() or []) + log_discovered( + logger, + self.status, + "Pipeline", + (self.get_pipeline_name(p) for p in pipelines), + ) + for pipeline_detail in pipelines: pipeline_name = self.get_pipeline_name(pipeline_detail) if filter_by_pipeline( self.source_config.pipelineFilterPattern, pipeline_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Pipeline", pipeline_name, - "Pipeline Filtered Out", ) continue yield pipeline_detail diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 29f8c2c57cd4..f1654aaba2f8 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -64,6 +64,7 @@ DataFrameColumnParser, fetch_dataframe_first_chunk, ) +from metadata.utils.filter_visibility import log_filtered, log_step_summary from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger from metadata.utils.path_pattern import ( @@ -302,6 +303,7 @@ def yield_create_container_requests(self, container_details: Any) -> Iterable[Ei def close(self): """By default, 
nothing needs to be closed""" + log_step_summary(logger, self.status, self.config.serviceName) def get_services(self) -> Iterable[WorkflowSource]: yield self.config @@ -666,11 +668,13 @@ def filter_manifest_entries(self, bucket_name: str, entries: List[MetadataEntry] # 2. Pipeline-level containerFilterPattern against the dataPath. if pattern and filter_by_container(pattern, path): - logger.info( - f"Skipping manifest entry '{path}' in bucket '{bucket_name}' — filtered by containerFilterPattern." - ) if hasattr(self, "status") and hasattr(self.status, "filter"): - self.status.filter(path, "containerFilterPattern excluded") + log_filtered(logger, self.status, "Container", path) + else: + logger.info( + f"Skipping manifest entry '{path}' in bucket '{bucket_name}' " + f"— filtered by containerFilterPattern." + ) continue filtered.append(entry) diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py new file mode 100644 index 000000000000..cb82d232a105 --- /dev/null +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -0,0 +1,184 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Centralized helpers for logging connector filter visibility. + +When users don't see an entity after ingestion (database, schema, table, +dashboard, pipeline, topic, container, mlmodel), they need to be able to +distinguish: + + 1. 
Source-side permissions — the connecting user can't see the object + 2. Filter config — includes/excludes patterns removed it + +These helpers emit a consistent "discovered → filtered → kept" trail across +every connector family and end each step with a single FILTER VISIBILITY +REPORT block. The report is the diagnostic anchor: one log block, easy to +grep ("FILTER VISIBILITY REPORT"), containing exactly the information that +isn't derivable from elsewhere. + +Memory profile: we deliberately store only the *diff* — discovered count +(int) plus the names of items the filter rejected (plus their reasons). +Visible names and kept names are not stored; they're derivable as counts +and the kept items show up in normal ingestion logs / Status.records. +""" + +import logging +from typing import Iterable # noqa: UP035 + +from metadata.ingestion.api.status import Status + +REPORT_HEADER_PREFIX = "FILTER VISIBILITY REPORT" +_REASON_SUFFIX = "Filtered Out" + + +def log_discovered( + logger: logging.Logger, + status: Status, + entity_type: str, + names: Iterable[str], +) -> None: + """Log the count of entities visible from the source before any filter + is applied, and record the count on Status so the end-of-step report + can compute discovered vs. kept. 
Names are emitted at DEBUG only — the + actionable diff (what got filtered) lives in log_filtered + the + end-of-step report; the full visible list would explode logs on large + catalogs without adding actionable information.""" + name_list = list(names) + count = len(name_list) + status.record_discovered(entity_type, count) + logger.info( + "Discovered %d %s(s) visible to the ingestion user", + count, + entity_type.lower(), + ) + logger.debug( + "%s(s) visible to the ingestion user: %s", + entity_type, + name_list, + ) + + +def log_filtered( + logger: logging.Logger, + status: Status, + entity_type: str, + name: str, + *, + matched_against: str | None = None, + use_fqn_for_filtering: bool | None = None, +) -> None: + """Log a filter-pattern rejection and record it on Status. The + reason string stored on Status is rich enough that the end-of-step + report can reproduce the full diagnostic (which pattern field, + what was matched against, whether FQN filtering was on) without + needing to retain any extra state.""" + pattern_field = _pattern_field(entity_type) + detail_parts = [f"did not pass {pattern_field}"] + if matched_against is not None and matched_against != name: + detail_parts.append(f"matched against '{matched_against}'") + if use_fqn_for_filtering is not None: + detail_parts.append(f"useFqnForFiltering={use_fqn_for_filtering}") + detail = ", ".join(detail_parts) + + logger.info("Filtering out %s '%s': %s", entity_type.lower(), name, detail) + reason = f"{entity_type} {_REASON_SUFFIX}: {detail}" + status.filter(name, reason) + + +def log_step_summary( + logger: logging.Logger, + status: Status, + source_name: str, +) -> None: + """Emit the end-of-step FILTER VISIBILITY REPORT. 
One log block, + framed with grep-friendly markers, listing per entity type: + - count visible to the ingestion user + - count + names + reasons of everything the filter dropped + - count that will be published to OpenMetadata + No-op when there's nothing to report (e.g., a sink-only step).""" + by_type = _group_filtered_by_entity_type(status) + entity_types = sorted(set(status.discovered_counts) | set(by_type)) + if not entity_types: + return + + border = "=" * 70 + lines = [ + "", + border, + f" {REPORT_HEADER_PREFIX}: {source_name}", + border, + ] + for entity_type in entity_types: + lines.extend(_format_entity_section(status, entity_type, by_type.get(entity_type, []))) + lines.append(border) + logger.info("\n".join(lines)) + + +def _format_entity_section( + status: Status, + entity_type: str, + filtered_entries: list[tuple[str, str]], +) -> list[str]: + """Format a single entity type's section of the report. Visible and + kept are counts only; filtered shows every name + the reason it was + rejected so the user can diff against their own filterPattern config.""" + pattern_field = _pattern_field(entity_type) + discovered = status.discovered_counts.get(entity_type) + filtered_count = len(filtered_entries) + section = ["", f"{entity_type} ({pattern_field}):"] + + if discovered is not None: + section.append(f" Visible to ingestion user: {discovered}") + section.append(f" Filtered out ({filtered_count}):") + if filtered_entries: + max_name_width = max(len(name) for name, _ in filtered_entries) + name_pad = min(max_name_width, 50) + for name, reason in filtered_entries: + detail = reason.split(": ", 1)[1] if ": " in reason else reason + section.append(f" {name:<{name_pad}} → {detail}") + if discovered is not None: + kept = discovered - filtered_count + section.append(f" Will be published to OpenMetadata: {kept}") + + return section + + +def _group_filtered_by_entity_type(status: Status) -> dict[str, list[tuple[str, str]]]: + """Walk Status.filtered (the existing 
project-wide accumulator) and + bucket entries by entity type, parsed from the reason string this + module wrote. Entries whose reason doesn't follow our convention + (legacy callers of status.filter) are skipped, not guessed at.""" + by_type: dict[str, list[tuple[str, str]]] = {} + for entry in status.filtered: + for name, reason in entry.items(): + entity_type = _entity_type_from_reason(reason) + if entity_type: + by_type.setdefault(entity_type, []).append((name, reason)) + return by_type + + +def _pattern_field(entity_type: str) -> str: + """Map 'Database' -> 'databaseFilterPattern', preserving the same field + name connectors use in their YAML config so log messages are + grep-actionable against the user's config.""" + return f"{entity_type[0].lower()}{entity_type[1:]}FilterPattern" + + +def _entity_type_from_reason(reason: str) -> str | None: + """Inverse of the reason string built in log_filtered. Looks for the + ' Filtered Out' marker so it works for both legacy reasons + ('Database Filtered Out') and the enriched form + ('Database Filtered Out: did not pass databaseFilterPattern...').""" + marker = f" {_REASON_SUFFIX}" + idx = reason.find(marker) + if idx > 0: + return reason[:idx] + return None diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py new file mode 100644 index 000000000000..5486fdc87a41 --- /dev/null +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -0,0 +1,180 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for filter_visibility helpers used across connector base classes.""" + +import logging + +import pytest + +from metadata.ingestion.api.status import Status +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) + + +@pytest.fixture +def status() -> Status: + return Status() + + +@pytest.fixture +def logger() -> logging.Logger: + return logging.getLogger("test_filter_visibility") + + +def test_log_discovered_records_count_and_emits_info(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_discovered(logger, status, "Database", ["db1", "db2", "db3"]) + + assert status.discovered_counts == {"Database": 3} + assert any("Discovered 3 database(s) visible to the ingestion user" in r.message for r in caplog.records) + + +def test_log_discovered_emits_full_list_at_debug(status, logger, caplog): + with caplog.at_level(logging.DEBUG, logger=logger.name): + log_discovered(logger, status, "Schema", ["public", "internal"]) + + debug_messages = [r.message for r in caplog.records if r.levelno == logging.DEBUG] + assert any("public" in msg and "internal" in msg for msg in debug_messages) + + +def test_log_discovered_accumulates_across_calls(status, logger): + log_discovered(logger, status, "Database", ["db1"]) + log_discovered(logger, status, "Database", ["db2", "db3"]) + + assert status.discovered_counts == {"Database": 3} + + +def test_log_discovered_accepts_generator(status, logger): + def gen(): + yield from ["a", "b"] + + log_discovered(logger, status, "Topic", gen()) + + assert status.discovered_counts == {"Topic": 2} + + +def test_log_filtered_stores_rich_reason_on_status(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_filtered( + logger, + status, + "Database", + "BACKUP_DB", + matched_against="service.BACKUP_DB", + 
use_fqn_for_filtering=True, + ) + + assert len(status.filtered) == 1 + name, reason = next(iter(status.filtered[0].items())) + assert name == "BACKUP_DB" + assert reason.startswith("Database Filtered Out: ") + assert "did not pass databaseFilterPattern" in reason + assert "matched against 'service.BACKUP_DB'" in reason + assert "useFqnForFiltering=True" in reason + + +def test_log_filtered_omits_matched_against_when_same_as_name(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_filtered(logger, status, "Schema", "TEMP", matched_against="TEMP") + + name, reason = next(iter(status.filtered[0].items())) + assert name == "TEMP" + assert "matched against" not in reason + + +def test_log_filtered_works_with_minimal_args(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_filtered(logger, status, "Topic", "noisy.topic") + + name, reason = next(iter(status.filtered[0].items())) + assert name == "noisy.topic" + assert reason == "Topic Filtered Out: did not pass topicFilterPattern" + + +def test_log_step_summary_emits_consolidated_report(status, logger, caplog): + log_discovered(logger, status, "Database", ["a", "b", "c", "d"]) + log_discovered(logger, status, "Schema", ["s1", "s2", "s3"]) + log_filtered(logger, status, "Database", "a", matched_against="svc.a", use_fqn_for_filtering=True) + log_filtered(logger, status, "Schema", "s1") + log_filtered(logger, status, "Schema", "s2") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "snowflake") + + summary = "\n".join(r.message for r in caplog.records) + assert "FILTER VISIBILITY REPORT: snowflake" in summary + assert "Database (databaseFilterPattern):" in summary + assert "Visible to ingestion user: 4" in summary + assert "Filtered out (1):" in summary + assert "Will be published to OpenMetadata: 3" in summary + assert "a" in summary and "did not pass databaseFilterPattern" in summary + 
assert "matched against 'svc.a'" in summary + + assert "Schema (schemaFilterPattern):" in summary + assert "Visible to ingestion user: 3" in summary + assert "Filtered out (2):" in summary + assert "Will be published to OpenMetadata: 1" in summary + + +def test_log_step_summary_handles_filtered_without_discovered(status, logger, caplog): + log_filtered(logger, status, "Pipeline", "p1") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "airflow") + + summary = "\n".join(r.message for r in caplog.records) + assert "Pipeline (pipelineFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "p1" in summary + + +def test_log_step_summary_noop_when_nothing_to_report(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "empty_source") + + assert not caplog.records + + +def test_log_step_summary_skips_unrelated_filter_reasons(status, logger, caplog): + log_discovered(logger, status, "Database", ["a", "b"]) + status.filter("x", "some other reason that is not from our helper") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "mixed") + + summary = "\n".join(r.message for r in caplog.records) + assert "Visible to ingestion user: 2" in summary + assert "Filtered out (0):" in summary + assert "Will be published to OpenMetadata: 2" in summary + assert "some other reason" not in summary + + +def test_log_step_summary_recognizes_legacy_reason_strings(status, logger, caplog): + """status.filter() callers predating this helper (e.g., 'Database Filtered Out' + without a trailing colon) must still be grouped into the right entity-type + section of the report.""" + log_discovered(logger, status, "Database", ["a", "b"]) + status.filter("a", "Database Filtered Out") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, 
"legacy") + + summary = "\n".join(r.message for r in caplog.records) + assert "Database (databaseFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "Will be published to OpenMetadata: 1" in summary From 67fd64342d72a5f6d89a575f15d9777e635abc04 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 May 2026 13:53:53 -0700 Subject: [PATCH 2/7] test(filter_visibility): add resilience, cap, and lifecycle tests + defenses Observability code in the ingestion hot path must not fail connectors. Adds three layers of defense: 1. Helper-internal try/except in log_discovered, log_filtered, and log_step_summary. Any failure is logged once at WARN and swallowed. 2. Call-site try/except around log_step_summary in every base class close() (database, dashboard, pipeline, messaging, storage, mlmodel) so a summary failure can't mask the real cleanup work. 3. Per-entity-type cap on Status.filtered (MAX_FILTERED_ENTRIES_PER_TYPE = 50_000). Past the cap, log_filtered only bumps a new Status.filtered_counts dict so the true count stays accurate without unbounded memory growth. The report annotates the truncation. Test coverage doubled (12 -> 23): adds bounded-growth tests for the cap, resilience tests for broken loggers / bad inputs / corrupted Status, and an integration-style multi-database lifecycle test that verifies counts are correct after the full streaming-log + summary cycle. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/metadata/ingestion/api/status.py | 4 + .../source/dashboard/dashboard_service.py | 5 +- .../source/database/common_db_source.py | 8 +- .../source/messaging/messaging_service.py | 5 +- .../source/mlmodel/mlmodel_service.py | 5 +- .../source/pipeline/pipeline_service.py | 5 +- .../source/storage/storage_service.py | 5 +- .../src/metadata/utils/filter_visibility.py | 164 +++++++++---- .../unit/utils/test_filter_visibility.py | 232 ++++++++++++++++-- 9 files changed, 359 insertions(+), 74 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index e4b344e86492..018c0961ca80 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -61,6 +61,10 @@ class Status(BaseModel): filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)] # noqa: UP006 failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)] # noqa: UP006 discovered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006 + # True per-entity-type count of filter rejections, even when the name is + # not appended to `filtered` because the per-type cap was hit. Source of + # truth for the end-of-step FILTER VISIBILITY REPORT count. 
+ filtered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006 def scanned(self, record: Any) -> None: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 0e9fbb4362c9..1b83b81aab0b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -423,7 +423,10 @@ def yield_dashboard_usage(self, *args, **kwargs) -> Iterable[DashboardUsage]: return def close(self): - log_step_summary(logger, self.status, self.config.serviceName) + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self.metadata.close() def get_services(self) -> Iterable[WorkflowSource]: diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 7e1bcddc6ff1..77863cd89082 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -773,7 +773,13 @@ def inspector(self) -> Inspector: return self._inspector_map[thread_id] def close(self): - log_step_summary(logger, self.status, self.config.serviceName) + # Defense-in-depth: helper has its own internal try/except, but a + # bug in close()'s observability step must never block the real + # cleanup work (engine release, SSL temp file cleanup). 
+ try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self._release_engine() if hasattr(self, "ssl_manager") and self.ssl_manager: self.ssl_manager = cast(SSLManager, self.ssl_manager) # noqa: TC006 diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index e9804a81cbf5..048f06a9b371 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -271,4 +271,7 @@ def register_record(self, topic_request: CreateTopicRequest) -> None: def close(self): """By default, nothing to close""" - log_step_summary(logger, self.status, self.config.serviceName) + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 80be6ae44fa7..91dad88ba173 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -175,7 +175,10 @@ def _get_algorithm(self, *args, **kwargs) -> str: def close(self): """By default, nothing to close""" - log_step_summary(logger, self.status, self.config.serviceName) + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py 
b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index c2a7bb6405f7..78fa1f252aaa 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -350,7 +350,10 @@ def yield_tag(self, pipeline_details: Any) -> Iterable[Either[OMetaTagAndClassif def close(self): """Method to implement any required logic after the ingestion process is completed""" - log_step_summary(logger, self.status, self.config.serviceName) + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self.metadata.compute_percentile(Pipeline, self.today) def get_services(self) -> Iterable[WorkflowSource]: diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index f1654aaba2f8..f04812dec182 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -303,7 +303,10 @@ def yield_create_container_requests(self, container_details: Any) -> Iterable[Ei def close(self): """By default, nothing needs to be closed""" - log_step_summary(logger, self.status, self.config.serviceName) + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) def get_services(self) -> Iterable[WorkflowSource]: yield self.config diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py index cb82d232a105..af0548945840 100644 --- a/ingestion/src/metadata/utils/filter_visibility.py +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -24,10 +24,20 @@ grep ("FILTER VISIBILITY REPORT"), containing exactly the information that isn't 
derivable from elsewhere. +Defensive contract: these helpers are observability only. Any exception +raised internally is caught and logged as a single warning; helpers MUST +NOT propagate failures to the connector ingestion path. + Memory profile: we deliberately store only the *diff* — discovered count (int) plus the names of items the filter rejected (plus their reasons). Visible names and kept names are not stored; they're derivable as counts and the kept items show up in normal ingestion logs / Status.records. + +Bounded growth: per-entity-type cap on stored filtered names +(MAX_FILTERED_ENTRIES_PER_TYPE). Past the cap, only the true count keeps +growing — names beyond the cap are dropped and the report annotates the +truncation. Prevents OOM on pathological catalogs (e.g., 10M filtered +S3 objects) while preserving correctness of counts. """ import logging @@ -36,6 +46,7 @@ from metadata.ingestion.api.status import Status REPORT_HEADER_PREFIX = "FILTER VISIBILITY REPORT" +MAX_FILTERED_ENTRIES_PER_TYPE = 50_000 _REASON_SUFFIX = "Filtered Out" @@ -50,20 +61,29 @@ def log_discovered( can compute discovered vs. kept. Names are emitted at DEBUG only — the actionable diff (what got filtered) lives in log_filtered + the end-of-step report; the full visible list would explode logs on large - catalogs without adding actionable information.""" - name_list = list(names) - count = len(name_list) - status.record_discovered(entity_type, count) - logger.info( - "Discovered %d %s(s) visible to the ingestion user", - count, - entity_type.lower(), - ) - logger.debug( - "%s(s) visible to the ingestion user: %s", - entity_type, - name_list, - ) + catalogs without adding actionable information. 
+ + Exceptions are swallowed: observability must never break ingestion.""" + try: + name_list = list(names) + count = len(name_list) + status.record_discovered(entity_type, count) + logger.info( + "Discovered %d %s(s) visible to the ingestion user", + count, + entity_type.lower(), + ) + logger.debug( + "%s(s) visible to the ingestion user: %s", + entity_type, + name_list, + ) + except Exception: + logger.warning( + "log_discovered failed for entity_type=%r; continuing ingestion", + entity_type, + exc_info=True, + ) def log_filtered( @@ -75,22 +95,46 @@ def log_filtered( matched_against: str | None = None, use_fqn_for_filtering: bool | None = None, ) -> None: - """Log a filter-pattern rejection and record it on Status. The - reason string stored on Status is rich enough that the end-of-step - report can reproduce the full diagnostic (which pattern field, - what was matched against, whether FQN filtering was on) without - needing to retain any extra state.""" - pattern_field = _pattern_field(entity_type) - detail_parts = [f"did not pass {pattern_field}"] - if matched_against is not None and matched_against != name: - detail_parts.append(f"matched against '{matched_against}'") - if use_fqn_for_filtering is not None: - detail_parts.append(f"useFqnForFiltering={use_fqn_for_filtering}") - detail = ", ".join(detail_parts) - - logger.info("Filtering out %s '%s': %s", entity_type.lower(), name, detail) - reason = f"{entity_type} {_REASON_SUFFIX}: {detail}" - status.filter(name, reason) + """Log a filter-pattern rejection and record it on Status. The reason + string stored on Status is rich enough that the end-of-step report can + reproduce the full diagnostic (which pattern field, what was matched + against, whether FQN filtering was on) without needing extra state. 
+ + Per-entity-type cap: once a type has accumulated + MAX_FILTERED_ENTRIES_PER_TYPE entries in Status.filtered, subsequent + rejections only bump Status.filtered_counts (cheap) and drop the name + to avoid unbounded memory growth. The report flags the truncation. + + Exceptions are swallowed: observability must never break ingestion.""" + try: + current_count = status.filtered_counts.get(entity_type, 0) + status.filtered_counts[entity_type] = current_count + 1 + + pattern_field = _pattern_field(entity_type) + detail_parts = [f"did not pass {pattern_field}"] + if matched_against is not None and matched_against != name: + detail_parts.append(f"matched against '{matched_against}'") + if use_fqn_for_filtering is not None: + detail_parts.append(f"useFqnForFiltering={use_fqn_for_filtering}") + detail = ", ".join(detail_parts) + + if current_count < MAX_FILTERED_ENTRIES_PER_TYPE: + logger.info("Filtering out %s '%s': %s", entity_type.lower(), name, detail) + reason = f"{entity_type} {_REASON_SUFFIX}: {detail}" + status.filter(name, reason) + elif current_count == MAX_FILTERED_ENTRIES_PER_TYPE: + logger.warning( + "Reached cap of %d filtered %s names; subsequent rejections will be counted but not stored", + MAX_FILTERED_ENTRIES_PER_TYPE, + entity_type.lower(), + ) + except Exception: + logger.warning( + "log_filtered failed for entity_type=%r name=%r; continuing ingestion", + entity_type, + name, + exc_info=True, + ) def log_step_summary( @@ -102,24 +146,34 @@ def log_step_summary( framed with grep-friendly markers, listing per entity type: - count visible to the ingestion user - count + names + reasons of everything the filter dropped + (with "and N more (truncated at cap)" footer if applicable) - count that will be published to OpenMetadata - No-op when there's nothing to report (e.g., a sink-only step).""" - by_type = _group_filtered_by_entity_type(status) - entity_types = sorted(set(status.discovered_counts) | set(by_type)) - if not entity_types: - return - - border = 
"=" * 70 - lines = [ - "", - border, - f" {REPORT_HEADER_PREFIX}: {source_name}", - border, - ] - for entity_type in entity_types: - lines.extend(_format_entity_section(status, entity_type, by_type.get(entity_type, []))) - lines.append(border) - logger.info("\n".join(lines)) + No-op when there's nothing to report (e.g., a sink-only step). + + Exceptions are swallowed: observability must never break ingestion.""" + try: + by_type = _group_filtered_by_entity_type(status) + entity_types = sorted(set(status.discovered_counts) | set(by_type) | set(status.filtered_counts)) + if not entity_types: + return + + border = "=" * 70 + lines = [ + "", + border, + f" {REPORT_HEADER_PREFIX}: {source_name}", + border, + ] + for entity_type in entity_types: + lines.extend(_format_entity_section(status, entity_type, by_type.get(entity_type, []))) + lines.append(border) + logger.info("\n".join(lines)) + except Exception: + logger.warning( + "log_step_summary failed for source=%r; continuing ingestion", + source_name, + exc_info=True, + ) def _format_entity_section( @@ -129,23 +183,31 @@ def _format_entity_section( ) -> list[str]: """Format a single entity type's section of the report. Visible and kept are counts only; filtered shows every name + the reason it was - rejected so the user can diff against their own filterPattern config.""" + rejected so the user can diff against their own filterPattern config. 
+ True filter count comes from Status.filtered_counts so the kept math + stays correct even when names were dropped past the per-type cap.""" pattern_field = _pattern_field(entity_type) discovered = status.discovered_counts.get(entity_type) - filtered_count = len(filtered_entries) + stored_count = len(filtered_entries) + true_count = status.filtered_counts.get(entity_type, stored_count) section = ["", f"{entity_type} ({pattern_field}):"] if discovered is not None: section.append(f" Visible to ingestion user: {discovered}") - section.append(f" Filtered out ({filtered_count}):") + section.append(f" Filtered out ({true_count}):") if filtered_entries: max_name_width = max(len(name) for name, _ in filtered_entries) name_pad = min(max_name_width, 50) for name, reason in filtered_entries: detail = reason.split(": ", 1)[1] if ": " in reason else reason section.append(f" {name:<{name_pad}} → {detail}") + if true_count > stored_count: + section.append( + f" ... and {true_count - stored_count} more " + f"(full list truncated at cap of {MAX_FILTERED_ENTRIES_PER_TYPE})" + ) if discovered is not None: - kept = discovered - filtered_count + kept = discovered - true_count section.append(f" Will be published to OpenMetadata: {kept}") return section diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py index 5486fdc87a41..e02dce95c4d8 100644 --- a/ingestion/tests/unit/utils/test_filter_visibility.py +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -8,14 +8,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for filter_visibility helpers used across connector base classes.""" +"""Tests for filter_visibility helpers used across connector base classes. 
+ +Three test groups: + - Behavior: counts, reasons, report formatting, back-compat with legacy + `Status.filter()` reason strings. + - Bounded growth: per-entity-type cap on Status.filtered enforced by + log_filtered; report annotates truncation; counts stay correct. + - Resilience: helpers swallow all exceptions and never propagate them to + the connector, even with bad inputs or a broken logger. +""" import logging +from unittest.mock import MagicMock import pytest from metadata.ingestion.api.status import Status from metadata.utils.filter_visibility import ( + MAX_FILTERED_ENTRIES_PER_TYPE, log_discovered, log_filtered, log_step_summary, @@ -32,6 +43,11 @@ def logger() -> logging.Logger: return logging.getLogger("test_filter_visibility") +# --------------------------------------------------------------------------- +# Behavior +# --------------------------------------------------------------------------- + + def test_log_discovered_records_count_and_emits_info(status, logger, caplog): with caplog.at_level(logging.INFO, logger=logger.name): log_discovered(logger, status, "Database", ["db1", "db2", "db3"]) @@ -64,17 +80,17 @@ def gen(): assert status.discovered_counts == {"Topic": 2} -def test_log_filtered_stores_rich_reason_on_status(status, logger, caplog): - with caplog.at_level(logging.INFO, logger=logger.name): - log_filtered( - logger, - status, - "Database", - "BACKUP_DB", - matched_against="service.BACKUP_DB", - use_fqn_for_filtering=True, - ) +def test_log_filtered_stores_rich_reason_on_status(status, logger): + log_filtered( + logger, + status, + "Database", + "BACKUP_DB", + matched_against="service.BACKUP_DB", + use_fqn_for_filtering=True, + ) + assert status.filtered_counts == {"Database": 1} assert len(status.filtered) == 1 name, reason = next(iter(status.filtered[0].items())) assert name == "BACKUP_DB" @@ -84,18 +100,16 @@ def test_log_filtered_stores_rich_reason_on_status(status, logger, caplog): assert "useFqnForFiltering=True" in reason -def 
test_log_filtered_omits_matched_against_when_same_as_name(status, logger, caplog): - with caplog.at_level(logging.INFO, logger=logger.name): - log_filtered(logger, status, "Schema", "TEMP", matched_against="TEMP") +def test_log_filtered_omits_matched_against_when_same_as_name(status, logger): + log_filtered(logger, status, "Schema", "TEMP", matched_against="TEMP") name, reason = next(iter(status.filtered[0].items())) assert name == "TEMP" assert "matched against" not in reason -def test_log_filtered_works_with_minimal_args(status, logger, caplog): - with caplog.at_level(logging.INFO, logger=logger.name): - log_filtered(logger, status, "Topic", "noisy.topic") +def test_log_filtered_works_with_minimal_args(status, logger): + log_filtered(logger, status, "Topic", "noisy.topic") name, reason = next(iter(status.filtered[0].items())) assert name == "noisy.topic" @@ -178,3 +192,187 @@ def test_log_step_summary_recognizes_legacy_reason_strings(status, logger, caplo assert "Database (databaseFilterPattern):" in summary assert "Filtered out (1):" in summary assert "Will be published to OpenMetadata: 1" in summary + + +# --------------------------------------------------------------------------- +# Bounded growth — per-entity-type cap +# --------------------------------------------------------------------------- + + +def test_log_filtered_caps_stored_names_but_preserves_count(status, logger): + """Past the per-type cap, the true count keeps climbing but the + name list stops growing — Status.filtered stays bounded.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 25): + log_filtered(logger, status, "Table", f"table_{i}") + + assert status.filtered_counts["Table"] == MAX_FILTERED_ENTRIES_PER_TYPE + 25 + stored = [next(iter(entry.keys())) for entry in status.filtered] + assert len(stored) == MAX_FILTERED_ENTRIES_PER_TYPE + assert stored[0] == "table_0" + assert stored[-1] == f"table_{MAX_FILTERED_ENTRIES_PER_TYPE - 1}" + + +def 
test_log_step_summary_annotates_truncation_when_cap_exceeded(status, logger, caplog): + log_discovered(logger, status, "Table", [f"table_{i}" for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 200)]) + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 200): + log_filtered(logger, status, "Table", f"table_{i}") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "huge_catalog") + + summary = "\n".join(r.message for r in caplog.records) + expected_total = MAX_FILTERED_ENTRIES_PER_TYPE + 200 + expected_overflow = 200 + assert f"Filtered out ({expected_total}):" in summary + assert ( + f"... and {expected_overflow} more (full list truncated at cap of {MAX_FILTERED_ENTRIES_PER_TYPE})" in summary + ) + # Kept math must use the TRUE count, not the stored count, or it would be wrong by 200 + assert "Will be published to OpenMetadata: 0" in summary + + +def test_log_filtered_per_type_cap_is_independent(status, logger): + """Hitting the cap on one entity type must not affect another.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 5): + log_filtered(logger, status, "Table", f"t_{i}") + log_filtered(logger, status, "Schema", "tmp") + + stored_tables = [next(iter(e.keys())) for e in status.filtered if next(iter(e.values())).startswith("Table ")] + stored_schemas = [next(iter(e.keys())) for e in status.filtered if next(iter(e.values())).startswith("Schema ")] + assert len(stored_tables) == MAX_FILTERED_ENTRIES_PER_TYPE + assert stored_schemas == ["tmp"] + assert status.filtered_counts == {"Table": MAX_FILTERED_ENTRIES_PER_TYPE + 5, "Schema": 1} + + +# --------------------------------------------------------------------------- +# Resilience — observability must never propagate exceptions +# --------------------------------------------------------------------------- + + +def test_log_discovered_swallows_exceptions_from_logger(status): + """Even a broken logger must not break ingestion.""" + broken_logger = MagicMock() + 
broken_logger.info.side_effect = RuntimeError("logger blew up") + + log_discovered(broken_logger, status, "Database", ["db1", "db2"]) + + # The count update happened before the failing info call, so it sticks + assert status.discovered_counts == {"Database": 2} + # Helper swallowed the exception and logged a warning on the same logger + broken_logger.warning.assert_called_once() + + +def test_log_discovered_swallows_exceptions_from_bad_iterable(status, logger): + """A connector returning a generator that raises mid-iteration must + not crash the connector.""" + + def exploding_generator(): + yield "ok_name" + raise RuntimeError("source went away") + + log_discovered(logger, status, "Database", exploding_generator()) # must not raise + + +def test_log_filtered_swallows_exceptions(status): + broken_logger = MagicMock() + broken_logger.info.side_effect = RuntimeError("logger blew up") + + log_filtered(broken_logger, status, "Database", "BACKUP_DB") + + # filtered_counts updated before the failing info call, so it sticks + assert status.filtered_counts == {"Database": 1} + broken_logger.warning.assert_called_once() + + +def test_log_filtered_handles_weird_name_values(status, logger): + """Bad names — None, empty, non-ASCII, control chars — must not crash.""" + # None as name; matched_against also None + log_filtered(logger, status, "Table", None) # type: ignore[arg-type] + # Empty string + log_filtered(logger, status, "Table", "") + # Unicode + log_filtered(logger, status, "Table", "日本語_table") + # Control characters + log_filtered(logger, status, "Table", "tab\there\nnewline") + + # All four counted, none crashed + assert status.filtered_counts["Table"] == 4 + + +def test_log_step_summary_swallows_exceptions(logger, caplog): + """A corrupted Status (wrong type for discovered_counts) must not crash close().""" + broken_status = Status() + # Inject a value that breaks the report math: str where dict is expected + broken_status.discovered_counts = "not a dict" # type: 
ignore[assignment] + + with caplog.at_level(logging.WARNING, logger=logger.name): + log_step_summary(logger, broken_status, "broken_source") # must not raise + + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("log_step_summary failed" in w.message for w in warnings) + + +def test_log_step_summary_handles_empty_status_gracefully(status, logger): + """A source step that never called log_discovered / log_filtered should + produce no report and no exception (covers sink-only steps).""" + log_step_summary(logger, status, "no_op") # must not raise + + +# --------------------------------------------------------------------------- +# Integration-style — minimal connector base class lifecycle +# --------------------------------------------------------------------------- + + +def test_end_to_end_helper_lifecycle_produces_correct_report(logger, caplog): + """Simulate what a real connector's discover-filter-close cycle looks + like — multiple databases, each with their own schemas and tables — + and verify the report counts and per-entity-type breakdown end up + correct after the streaming logs all fire.""" + status = Status() + + # Database-level: 5 visible, 2 filtered + log_discovered(logger, status, "Database", ["db_a", "db_b", "db_c", "db_d", "db_e"]) + log_filtered(logger, status, "Database", "db_d", matched_against="svc.db_d", use_fqn_for_filtering=True) + log_filtered(logger, status, "Database", "db_e", matched_against="svc.db_e", use_fqn_for_filtering=True) + + # Schemas: visited per kept database (3 dbs x 4 schemas), 1 filtered per db + for db in ("db_a", "db_b", "db_c"): + log_discovered(logger, status, "Schema", [f"{db}.public", f"{db}.audit", f"{db}.tmp", f"{db}.staging"]) + log_filtered(logger, status, "Schema", f"{db}.tmp") + + # Tables: per kept schema (3 dbs x 3 kept schemas x 5 tables), 0 filtered + for db in ("db_a", "db_b", "db_c"): + for schema in ("public", "audit", "staging"): + log_discovered(logger, status, "Table", 
[f"{db}.{schema}.t{i}" for i in range(5)]) + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "fake_source") + + summary = "\n".join(r.message for r in caplog.records) + assert "FILTER VISIBILITY REPORT: fake_source" in summary + # Database: 5 visible, 2 filtered, 3 kept + assert "Database (databaseFilterPattern):" in summary + assert "Visible to ingestion user: 5" in summary + assert "Filtered out (2):" in summary + # Schema: 3 dbs x 4 = 12 visible, 3 filtered, 9 kept + assert "Schema (schemaFilterPattern):" in summary + assert "Visible to ingestion user: 12" in summary + assert "Filtered out (3):" in summary + # Table: 3 x 3 x 5 = 45 visible, 0 filtered, 45 kept + assert "Table (tableFilterPattern):" in summary + assert "Visible to ingestion user: 45" in summary + assert "Filtered out (0):" in summary + assert "Will be published to OpenMetadata: 45" in summary + + +def test_end_to_end_close_lifecycle_with_broken_summary_does_not_raise(logger): + """If log_step_summary somehow propagated (despite internal guard), + the connector's close() wrappers must catch it. Simulate by calling + log_step_summary on a Status with corrupted internals and asserting + no exception escapes.""" + status = Status() + status.discovered_counts = 12345 # type: ignore[assignment] + + log_step_summary(logger, status, "corrupted") # must not raise From a2af8e92d4989531d4c4fe8833f39038cb52cd53 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 May 2026 15:21:22 -0700 Subject: [PATCH 3/7] fix(filter_visibility): address PR review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - log_discovered: skip list() materialization when DEBUG logging is off. Use len() directly for Sized inputs (zero alloc) and a streaming sum() for generators. Cuts the per-discovery cost from O(n) memory to O(1) on large catalogs when --debug isn't active. 
- _entity_type_from_reason: case-insensitive marker match so legacy reasons like "Database Filtered out" (lowercase 'out' from older BigQuery code paths) get bucketed into the right report section instead of being silently undercounted. - dashboard_service.get_dashboard: null-safe via `or []` (matches the Optional[List[Any]] contract Pipeline / Messaging sources already use); materialize names once into a list so log_discovered can take the zero-allocation Sized path instead of re-listing a generator over the heavy dashboard objects. - common_db_source.get_tables_name_and_type / dashboard_service: skip the O(n) shallow copy when query_*_names_and_types() / get_dashboards_list() already returned a list. Only materialize when a subclass returned a generator. - Report: rename "Will be published to OpenMetadata" → "Passed filter patterns" and prepend a note explaining the count is filter-decision only (does not subtract source-side extraction errors or secondary filters like projectFilterPattern). Avoids confusion when reconciling report counts with the final ingested set. Tests: 23 → 28 (added DEBUG-gated materialization, Sized len() path, case-insensitive marker, and clarifying-note assertions). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../source/dashboard/dashboard_service.py | 18 ++-- .../source/database/common_db_source.py | 9 +- .../src/metadata/utils/filter_visibility.py | 48 ++++++--- .../unit/utils/test_filter_visibility.py | 98 +++++++++++++++++-- 4 files changed, 144 insertions(+), 29 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 1b83b81aab0b..88be94fb9adf 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -575,13 +575,17 @@ def get_dashboard(self) -> Any: """ Method to iterate through dashboard lists filter dashboards & yield dashboard details """ - dashboards = list(self.get_dashboards_list()) - log_discovered( - logger, - self.status, - "Dashboard", - (self.get_dashboard_name(d) for d in dashboards), - ) + # `or []` matches the Optional[List[Any]] contract used by Pipeline / + # Messaging sources — `list(None)` would raise TypeError if a + # concrete connector returned None for an empty workspace. Avoid + # the extra shallow copy when the source already gave us a list. + dashboards_result = self.get_dashboards_list() or [] + dashboards = dashboards_result if isinstance(dashboards_result, list) else list(dashboards_result) + # Materialize names once and pass the list to log_discovered so it + # can take the zero-allocation Sized path instead of re-listing a + # generator over the (potentially large) dashboard objects. 
+ dashboard_names = [self.get_dashboard_name(d) for d in dashboards] + log_discovered(logger, self.status, "Dashboard", dashboard_names) for dashboard in dashboards: dashboard_name = self.get_dashboard_name(dashboard) if filter_by_dashboard( diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 77863cd89082..ba9ea8141599 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -370,7 +370,11 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: schema_name = self.context.get().database_schema if self.source_config.includeTables: try: - table_iter = list(self.query_table_names_and_types(schema_name)) + # Default impl returns a list; only materialize if a subclass + # returns a generator. Avoids an O(n) shallow copy on the + # common path for large schemas. 
+ table_result = self.query_table_names_and_types(schema_name) + table_iter = table_result if isinstance(table_result, list) else list(table_result) except Exception as err: logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) @@ -415,7 +419,8 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: if self.source_config.includeViews: try: - view_iter = list(self.query_view_names_and_types(schema_name)) + view_result = self.query_view_names_and_types(schema_name) + view_iter = view_result if isinstance(view_result, list) else list(view_result) except Exception as err: logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py index af0548945840..da11598f5958 100644 --- a/ingestion/src/metadata/utils/filter_visibility.py +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -41,6 +41,7 @@ """ import logging +from collections.abc import Sized from typing import Iterable # noqa: UP035 from metadata.ingestion.api.status import Status @@ -63,21 +64,34 @@ def log_discovered( end-of-step report; the full visible list would explode logs on large catalogs without adding actionable information. + Memory: avoids materializing `names` when DEBUG isn't enabled. If the + caller already passes a Sized collection (list / tuple), we use len() + directly — zero extra allocation. For generators we either stream-count + (when DEBUG is off) or materialize once (when DEBUG is on, so we can + emit the full list). 
+ Exceptions are swallowed: observability must never break ingestion.""" try: - name_list = list(names) - count = len(name_list) + debug_on = logger.isEnabledFor(logging.DEBUG) + if isinstance(names, Sized): + count = len(names) + elif debug_on: + names = list(names) + count = len(names) + else: + count = sum(1 for _ in names) status.record_discovered(entity_type, count) logger.info( "Discovered %d %s(s) visible to the ingestion user", count, entity_type.lower(), ) - logger.debug( - "%s(s) visible to the ingestion user: %s", - entity_type, - name_list, - ) + if debug_on: + logger.debug( + "%s(s) visible to the ingestion user: %s", + entity_type, + names, + ) except Exception: logger.warning( "log_discovered failed for entity_type=%r; continuing ingestion", @@ -163,6 +177,10 @@ def log_step_summary( border, f" {REPORT_HEADER_PREFIX}: {source_name}", border, + " Note: 'Passed filter patterns' = discovered - filter rejections.", + " It does not subtract source-side extraction failures or", + " secondary filters (e.g., projectFilterPattern on dashboards).", + " For the final ingested count, see Status.records / OpenMetadata.", ] for entity_type in entity_types: lines.extend(_format_entity_section(status, entity_type, by_type.get(entity_type, []))) @@ -208,7 +226,7 @@ def _format_entity_section( ) if discovered is not None: kept = discovered - true_count - section.append(f" Will be published to OpenMetadata: {kept}") + section.append(f" Passed filter patterns: {kept}") return section @@ -235,12 +253,14 @@ def _pattern_field(entity_type: str) -> str: def _entity_type_from_reason(reason: str) -> str | None: - """Inverse of the reason string built in log_filtered. Looks for the - ' Filtered Out' marker so it works for both legacy reasons - ('Database Filtered Out') and the enriched form - ('Database Filtered Out: did not pass databaseFilterPattern...').""" - marker = f" {_REASON_SUFFIX}" - idx = reason.find(marker) + """Inverse of the reason string built in log_filtered. 
Case-insensitive + match on ' Filtered Out' so it catches both the enriched form + ('Database Filtered Out: did not pass databaseFilterPattern...') and + historical variants (e.g., 'Database Filtered out' with lowercase + 'out' that show up in older BigQuery code paths). The slice preserves + the original entity-type casing so the report still reads correctly.""" + marker_lower = f" {_REASON_SUFFIX}".lower() + idx = reason.lower().find(marker_lower) if idx > 0: return reason[:idx] return None diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py index e02dce95c4d8..99c76fcee0e1 100644 --- a/ingestion/tests/unit/utils/test_filter_visibility.py +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -80,6 +80,59 @@ def gen(): assert status.discovered_counts == {"Topic": 2} +def test_log_discovered_skips_materialization_when_debug_disabled(status, logger): + """For large catalogs the names iterable can be expensive to materialize. 
+ When DEBUG isn't enabled we have no use for the names — only the count + matters — so the helper must stream-count instead of calling list().""" + materialize_count = {"calls": 0} + + def tracking_gen(): + for i in range(5): + materialize_count["calls"] += 1 + yield f"n{i}" + + # logger fixture is at WARNING by default — DEBUG is OFF + log_discovered(logger, status, "Topic", tracking_gen()) + + # Generator was consumed exactly once for the count (5 yields), but no + # list() copy was made on top of that + assert materialize_count["calls"] == 5 + assert status.discovered_counts == {"Topic": 5} + + +def test_log_discovered_uses_len_directly_for_sized_collections(status, logger): + """A list/tuple already knows its length — no need to iterate at all + when DEBUG is disabled.""" + + class SizedNoIter: + """A Sized that raises if iterated — proves len() path is used.""" + + def __len__(self): + return 42 + + def __iter__(self): + raise AssertionError("must not iterate when only len() is needed") + + log_discovered(logger, status, "Table", SizedNoIter()) # type: ignore[arg-type] + + assert status.discovered_counts == {"Table": 42} + + +def test_log_discovered_materializes_when_debug_enabled(status, logger, caplog): + """When the operator turned DEBUG on, they explicitly want the names — + so the helper materializes to emit the full list.""" + + def gen(): + yield from ["n1", "n2"] + + with caplog.at_level(logging.DEBUG, logger=logger.name): + log_discovered(logger, status, "Topic", gen()) + + debug_msgs = [r.message for r in caplog.records if r.levelno == logging.DEBUG] + assert any("n1" in m and "n2" in m for m in debug_msgs) + assert status.discovered_counts == {"Topic": 2} + + def test_log_filtered_stores_rich_reason_on_status(status, logger): log_filtered( logger, @@ -132,14 +185,14 @@ def test_log_step_summary_emits_consolidated_report(status, logger, caplog): assert "Database (databaseFilterPattern):" in summary assert "Visible to ingestion user: 4" in summary 
assert "Filtered out (1):" in summary - assert "Will be published to OpenMetadata: 3" in summary + assert "Passed filter patterns: 3" in summary assert "a" in summary and "did not pass databaseFilterPattern" in summary assert "matched against 'svc.a'" in summary assert "Schema (schemaFilterPattern):" in summary assert "Visible to ingestion user: 3" in summary assert "Filtered out (2):" in summary - assert "Will be published to OpenMetadata: 1" in summary + assert "Passed filter patterns: 1" in summary def test_log_step_summary_handles_filtered_without_discovered(status, logger, caplog): @@ -173,7 +226,7 @@ def test_log_step_summary_skips_unrelated_filter_reasons(status, logger, caplog) summary = "\n".join(r.message for r in caplog.records) assert "Visible to ingestion user: 2" in summary assert "Filtered out (0):" in summary - assert "Will be published to OpenMetadata: 2" in summary + assert "Passed filter patterns: 2" in summary assert "some other reason" not in summary @@ -191,7 +244,40 @@ def test_log_step_summary_recognizes_legacy_reason_strings(status, logger, caplo summary = "\n".join(r.message for r in caplog.records) assert "Database (databaseFilterPattern):" in summary assert "Filtered out (1):" in summary - assert "Will be published to OpenMetadata: 1" in summary + assert "Passed filter patterns: 1" in summary + + +def test_log_step_summary_matches_legacy_lowercase_out_variant(status, logger, caplog): + """Pre-existing BigQuery code path historically used 'Database Filtered out' + (lowercase 'out'). 
Our marker match must be case-insensitive so those + entries still get counted in the right section of the report.""" + log_discovered(logger, status, "Database", ["a", "b", "c"]) + status.filter("a", "Database Filtered out") # lowercase 'out' from older code + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "legacy_lowercase") + + summary = "\n".join(r.message for r in caplog.records) + assert "Database (databaseFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "Passed filter patterns: 2" in summary + + +def test_log_step_summary_includes_clarifying_note(status, logger, caplog): + """Kept counts can over-report actual ingestion when downstream + extraction fails or secondary filters reject items. The report header + must say so explicitly so users don't try to reconcile numbers.""" + log_discovered(logger, status, "Dashboard", ["d1"]) + log_filtered(logger, status, "Dashboard", "d1") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "doc_note") + + summary = "\n".join(r.message for r in caplog.records) + assert "Note:" in summary + assert "does not subtract source-side extraction failures" in summary # --------------------------------------------------------------------------- @@ -229,7 +315,7 @@ def test_log_step_summary_annotates_truncation_when_cap_exceeded(status, logger, f"... 
and {expected_overflow} more (full list truncated at cap of {MAX_FILTERED_ENTRIES_PER_TYPE})" in summary ) # Kept math must use the TRUE count, not the stored count, or it would be wrong by 200 - assert "Will be published to OpenMetadata: 0" in summary + assert "Passed filter patterns: 0" in summary def test_log_filtered_per_type_cap_is_independent(status, logger): @@ -364,7 +450,7 @@ def test_end_to_end_helper_lifecycle_produces_correct_report(logger, caplog): assert "Table (tableFilterPattern):" in summary assert "Visible to ingestion user: 45" in summary assert "Filtered out (0):" in summary - assert "Will be published to OpenMetadata: 45" in summary + assert "Passed filter patterns: 45" in summary def test_end_to_end_close_lifecycle_with_broken_summary_does_not_raise(logger): From 1c477014f8923430ebf5ea847bdc4ea634ace279 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 May 2026 15:51:59 -0700 Subject: [PATCH 4/7] fix(filter_visibility): address second-pass review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Status.get_filtered_count(): true count of filter rejections that includes overflow past the per-type cap. Summary.from_step (step.py) now uses it so persisted StepSummary.filtered stays accurate even when names were dropped to bound memory. Falls back to len(filtered) for legacy callers that never populated filtered_counts. - log_step_summary docstring: aligned with the actual "Passed filter patterns" label and the kept-semantic note printed inside the report. - _get_filtered_database_names: restored streaming iteration. The earlier list() was needed for log_discovered, but discovery logging was moved out of this maintenance-pass helper, so materialization is now pure overhead on large catalogs during mark-deleted. 
- messaging_service.get_topic / pipeline_service.get_pipeline: same isinstance optimization as dashboard_service / common_db_source — skip the O(n) shallow copy when the source already returned a list. Pre-materialize names once into a list so log_discovered takes the zero-allocation Sized path. - storage_service _filter_entries: preserve bucket context when logging a containerFilterPattern rejection. Stored name is now `bucket/path` and the raw `path` is exposed via matched_against — operators can identify which bucket an entry came from (dataPath alone is not unique across buckets). Tests: 28 -> 31 (added get_filtered_count cap-overflow + legacy + empty cases). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/metadata/ingestion/api/status.py | 9 +++++++ ingestion/src/metadata/ingestion/api/step.py | 2 +- .../source/database/database_service.py | 8 +++--- .../source/messaging/messaging_service.py | 15 +++++------ .../source/pipeline/pipeline_service.py | 15 +++++------ .../source/storage/storage_service.py | 11 +++++++- .../src/metadata/utils/filter_visibility.py | 5 +++- .../unit/utils/test_filter_visibility.py | 25 +++++++++++++++++++ 8 files changed, 70 insertions(+), 20 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 018c0961ca80..7132d250223f 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -106,6 +106,15 @@ def warning(self, key: str, reason: str) -> None: def filter(self, key: str, reason: str) -> None: self.filtered.append({key: reason}) + def get_filtered_count(self) -> int: + """True count of filter rejections, including any entries past the + per-entity-type cap that were counted in `filtered_counts` but not + appended to `filtered` to bound memory. 
Returns the larger of the + stored list length and the sum of per-type true counts; this stays + correct under both legacy callers (only populate `filtered`) and + the helper-driven path (populates `filtered_counts` even past cap).""" + return max(len(self.filtered), sum(self.filtered_counts.values())) + def record_discovered(self, entity_type: str, count: int) -> None: """Record the count of entities discovered from the source before any filter pattern is applied. Used by log_step_summary to report the diff --git a/ingestion/src/metadata/ingestion/api/step.py b/ingestion/src/metadata/ingestion/api/step.py index 4069fcd482e6..ed527839966e 100644 --- a/ingestion/src/metadata/ingestion/api/step.py +++ b/ingestion/src/metadata/ingestion/api/step.py @@ -117,7 +117,7 @@ def from_step(cls, step: Step) -> "Summary": updated_records=len(step.status.updated_records), warnings=len(step.status.warnings), errors=len(step.status.failures), - filtered=len(step.status.filtered), + filtered=step.status.get_filtered_count(), failures=step.status.failures[0:10] if step.status.failures else None, progress=progress_tracker.get_progress_as_dict() or None, operationMetrics=operation_metrics.get_summary() or None, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index d7ceacb386ba..5bd57f344196 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -552,10 +552,12 @@ def _get_filtered_database_names(self, return_fqn: bool = False, add_to_status: mark_databases_as_deleted), not the main ingestion path. Discovery logging lives in each connector's get_database_names() to avoid double-counting; this method only emits filter-rejection logs when - add_to_status=True. + add_to_status=True. 
The raw names iterable is consumed once and + streamed — never materialized — so mark-deleted stays O(1) memory + on large catalogs. """ - raw_names = list(getattr(self, "get_database_names_raw", self.get_database_names)()) - for database_name in raw_names: + raw_names_iter = getattr(self, "get_database_names_raw", self.get_database_names)() + for database_name in raw_names_iter: database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index 048f06a9b371..aae251b13b37 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -211,13 +211,14 @@ def get_topic_name(self, topic_details: Any) -> str: """ def get_topic(self) -> Any: - topics = list(self.get_topic_list() or []) - log_discovered( - logger, - self.status, - "Topic", - (self.get_topic_name(t) for t in topics), - ) + # `or []` for null safety + isinstance check to skip the redundant + # shallow copy when the source already returned a list. + topics_result = self.get_topic_list() or [] + topics = topics_result if isinstance(topics_result, list) else list(topics_result) + # Materialize names once into a list so log_discovered takes the + # zero-allocation Sized path instead of re-listing a generator. 
+ topic_names = [self.get_topic_name(t) for t in topics] + log_discovered(logger, self.status, "Topic", topic_names) for topic_details in topics: topic_name = self.get_topic_name(topic_details) if filter_by_topic( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 78fa1f252aaa..ff7fadea8981 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -363,13 +363,14 @@ def yield_create_request_pipeline_service(self, config: WorkflowSource): yield Either(right=self.metadata.get_create_service_from_source(entity=PipelineService, config=config)) def get_pipeline(self) -> Any: - pipelines = list(self.get_pipelines_list() or []) - log_discovered( - logger, - self.status, - "Pipeline", - (self.get_pipeline_name(p) for p in pipelines), - ) + # `or []` for null safety + isinstance check to skip the redundant + # shallow copy when the source already returned a list. + pipelines_result = self.get_pipelines_list() or [] + pipelines = pipelines_result if isinstance(pipelines_result, list) else list(pipelines_result) + # Materialize names once into a list so log_discovered takes the + # zero-allocation Sized path instead of re-listing a generator. 
+ pipeline_names = [self.get_pipeline_name(p) for p in pipelines] + log_discovered(logger, self.status, "Pipeline", pipeline_names) for pipeline_detail in pipelines: pipeline_name = self.get_pipeline_name(pipeline_detail) if filter_by_pipeline( diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index f04812dec182..887695fa7fef 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -672,7 +672,16 @@ def filter_manifest_entries(self, bucket_name: str, entries: List[MetadataEntry] # 2. Pipeline-level containerFilterPattern against the dataPath. if pattern and filter_by_container(pattern, path): if hasattr(self, "status") and hasattr(self.status, "filter"): - log_filtered(logger, self.status, "Container", path) + # Use bucket/path as the stored name so operators can + # tell which bucket an entry came from — dataPath alone + # is not unique across buckets. 
+ log_filtered( + logger, + self.status, + "Container", + f"{bucket_name}/{path}", + matched_against=path, + ) else: logger.info( f"Skipping manifest entry '{path}' in bucket '{bucket_name}' " diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py index da11598f5958..6f10fc60bb71 100644 --- a/ingestion/src/metadata/utils/filter_visibility.py +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -161,7 +161,10 @@ def log_step_summary( - count visible to the ingestion user - count + names + reasons of everything the filter dropped (with "and N more (truncated at cap)" footer if applicable) - - count that will be published to OpenMetadata + - count that passed all filter patterns (this is a filter-decision + count, NOT the final ingested count; downstream extraction + failures and secondary filters like projectFilterPattern are + not subtracted — see the note printed inside the report) No-op when there's nothing to report (e.g., a sink-only step). 
Exceptions are swallowed: observability must never break ingestion.""" diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py index 99c76fcee0e1..5bb7a316161e 100644 --- a/ingestion/tests/unit/utils/test_filter_visibility.py +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -264,6 +264,31 @@ def test_log_step_summary_matches_legacy_lowercase_out_variant(status, logger, c assert "Passed filter patterns: 2" in summary +def test_get_filtered_count_returns_true_count_past_cap(status, logger): + """Status.get_filtered_count must surface the true count (including + overflow past the per-type cap) so persisted StepSummary.filtered + stays accurate even when names were dropped to bound memory.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 17): + log_filtered(logger, status, "Table", f"t{i}") + + assert len(status.filtered) == MAX_FILTERED_ENTRIES_PER_TYPE + # get_filtered_count returns the true count, not the stored len + assert status.get_filtered_count() == MAX_FILTERED_ENTRIES_PER_TYPE + 17 + + +def test_get_filtered_count_handles_legacy_only_entries(status): + """If a legacy caller bypassed the helper and called status.filter() + directly, the count is len(filtered) (filtered_counts is empty).""" + status.filter("a", "Database Filtered Out") + status.filter("b", "Database Filtered Out") + + assert status.get_filtered_count() == 2 + + +def test_get_filtered_count_empty_status_is_zero(status): + assert status.get_filtered_count() == 0 + + def test_log_step_summary_includes_clarifying_note(status, logger, caplog): """Kept counts can over-report actual ingestion when downstream extraction fails or secondary filters reject items. 
The report header From 5b042a6a2474a81c57c519a42f1b1bb133e17e23 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 May 2026 16:58:46 -0700 Subject: [PATCH 5/7] fix(filter_visibility): address third-pass review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dashboard_service.get_dashboard: coerce project_name to a string before passing to log_filtered. project_name is reassigned to a list (project_names) when get_project_names returns a non-empty list — passing the list verbatim produced a stringified list like "['a', 'b']" as the stored name, which made the reason unparseable by _entity_type_from_reason and the report ugly. Now joined with ", " when it's a list, kept as-is when it's already a string. - Status.get_filtered_count: precise per-entity-type formula so a mixed helper + legacy + cap-overflow scenario stays correct. The prior max() formula undercounted when one type hit the cap (helper) AND another type had legacy direct status.filter() entries — exactly the worst case the reviewer called out. Now: helper-tracked true count + legacy entries whose entity-type prefix isn't in filtered_counts. Tests: 31 -> 33 (added mixed helper+legacy case and the worst-case mixed helper-cap-overflow + legacy case). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/metadata/ingestion/api/status.py | 29 +++++++++++++++---- .../source/dashboard/dashboard_service.py | 10 ++++++- .../unit/utils/test_filter_visibility.py | 26 +++++++++++++++++ 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 7132d250223f..dee47830fbcd 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -109,11 +109,30 @@ def filter(self, key: str, reason: str) -> None: def get_filtered_count(self) -> int: """True count of filter rejections, including any entries past the per-entity-type cap that were counted in `filtered_counts` but not - appended to `filtered` to bound memory. Returns the larger of the - stored list length and the sum of per-type true counts; this stays - correct under both legacy callers (only populate `filtered`) and - the helper-driven path (populates `filtered_counts` even past cap).""" - return max(len(self.filtered), sum(self.filtered_counts.values())) + appended to `filtered` to bound memory. + + Handles three populations: + - Helper-driven types (in `filtered_counts`): true count comes + from `filtered_counts`, which keeps growing past the per-type + cap even when the name is not stored in `filtered`. + - Legacy direct `status.filter()` callers: not represented in + `filtered_counts`. Count them by walking `filtered` and + including only entries whose entity-type prefix isn't already + tracked by the helper, so we don't double-count. + - No helper usage at all: degenerate case; fall back to + `len(filtered)`. 
+ """ + if not self.filtered_counts: + return len(self.filtered) + + helper_total = sum(self.filtered_counts.values()) + tracked_types = set(self.filtered_counts) + legacy_count = 0 + for entry in self.filtered: + for reason in entry.values(): + if not any(reason.startswith(f"{t} ") for t in tracked_types): + legacy_count += 1 + return helper_total + legacy_count def record_discovered(self, entity_type: str, count: int) -> None: """Record the count of entities discovered from the source before any diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 88be94fb9adf..5f6f2904f189 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -616,11 +616,19 @@ def get_dashboard(self) -> Any: self.source_config.projectFilterPattern, project_name, ): + # project_name was reassigned to a list above when the + # connector returned multiple project names; coerce + # to a single string so the stored reason and report + # output stay readable (and _entity_type_from_reason + # can still parse the reason cleanly). + project_label = ( + project_name if isinstance(project_name, str) else ", ".join(str(p) for p in project_name) + ) log_filtered( logger, self.status, "Project", - project_name, + project_label, ) continue diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py index 5bb7a316161e..f84f340781f0 100644 --- a/ingestion/tests/unit/utils/test_filter_visibility.py +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -289,6 +289,32 @@ def test_get_filtered_count_empty_status_is_zero(status): assert status.get_filtered_count() == 0 +def test_get_filtered_count_mixed_helper_and_legacy(status, logger): + """Helper tracks Database; legacy callers added Schema entries directly. 
+ The legacy Schema entries must be counted in addition to the helper's + true Database count — not absorbed by the helper's tally.""" + for i in range(10): + log_filtered(logger, status, "Database", f"db_{i}") + status.filter("legacy_schema_1", "Schema Filtered Out") + status.filter("legacy_schema_2", "Schema Filtered Out") + + # 10 helper Database + 2 legacy Schema = 12 + assert status.get_filtered_count() == 12 + + +def test_get_filtered_count_mixed_with_cap_overflow(status, logger): + """Worst case the second-pass reviewer called out: helper hits the cap + on one type AND legacy adds entries for a different type. True count + must include both the cap-overflow and the legacy entries.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 30): + log_filtered(logger, status, "Table", f"t_{i}") + status.filter("legacy_schema_1", "Schema Filtered Out") + status.filter("legacy_schema_2", "Schema Filtered Out") + + # MAX + 30 helper Table + 2 legacy Schema + assert status.get_filtered_count() == MAX_FILTERED_ENTRIES_PER_TYPE + 32 + + def test_log_step_summary_includes_clarifying_note(status, logger, caplog): """Kept counts can over-report actual ingestion when downstream extraction fails or secondary filters reject items. The report header From 96af826b041912167253f56f6626aae3c37dcab3 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 22 May 2026 08:50:49 -0700 Subject: [PATCH 6/7] fix(filter_visibility): address fourth-pass review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mark_databases_as_deleted: pass add_to_status=False to _get_filtered_database_names. Without this, every filtered database was being recorded twice (once during main ingestion via the per-connector get_database_names, once during the maintenance pass), inflating Status.filtered + filtered_counts and skewing the report. 
- _get_filtered_schema_names: only materialize the raw schema names when add_to_status=True (where log_discovered needs the count up front). The mark-deleted maintenance paths iterate once with add_to_status=False — streaming saves O(n) memory on large schemas. Same fix applied to BigQuery's override. - dashboard/pipeline/messaging get_X(): compute names once into (entity, name) tuples instead of calling the connector-specific get_*_name() twice per entity (once to build the discovery list, once inside the filter loop). Real savings when name extraction is non-trivial (e.g., Tableau workbook lookups). - DB connector log_filtered calls: use the raw entity name (database_name, schema_name, table_name, view_name, new_database, project_id) as `name` and the value the filter was evaluated against (`filter_name`: the FQN when useFqnForFiltering is set, otherwise the raw name) as `matched_against`. The previous convention of passing the FQN as `name` produced confusing report output when useFqnForFiltering was False ("filtered name = FQN, matched against = raw name") and was inconsistent with non-SQL connectors (Dashboard/Pipeline/Topic/MLModel) which already stored raw names. Report column is now readable across all connector families. Tests: existing 33 still pass (no test changes needed — the storage-key change to raw names is verified by the existing dashboard/pipeline/topic assertions which always asserted raw names).
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../source/dashboard/dashboard_service.py | 12 ++++----- .../source/database/bigquery/metadata.py | 11 +++++--- .../source/database/common_db_source.py | 4 +-- .../source/database/database_service.py | 25 +++++++++++++++---- .../source/database/mssql/metadata.py | 2 +- .../source/database/postgres/metadata.py | 2 +- .../source/database/redshift/metadata.py | 2 +- .../source/database/snowflake/metadata.py | 2 +- .../source/messaging/messaging_service.py | 11 ++++---- .../source/pipeline/pipeline_service.py | 11 ++++---- 10 files changed, 49 insertions(+), 33 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 5f6f2904f189..b51e673e943d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -581,13 +581,11 @@ def get_dashboard(self) -> Any: # the extra shallow copy when the source already gave us a list. dashboards_result = self.get_dashboards_list() or [] dashboards = dashboards_result if isinstance(dashboards_result, list) else list(dashboards_result) - # Materialize names once and pass the list to log_discovered so it - # can take the zero-allocation Sized path instead of re-listing a - # generator over the (potentially large) dashboard objects. - dashboard_names = [self.get_dashboard_name(d) for d in dashboards] - log_discovered(logger, self.status, "Dashboard", dashboard_names) - for dashboard in dashboards: - dashboard_name = self.get_dashboard_name(dashboard) + # Compute names once and reuse — `get_dashboard_name` is a + # connector-specific call that may not be free. 
+ dashboard_pairs = [(d, self.get_dashboard_name(d)) for d in dashboards] + log_discovered(logger, self.status, "Dashboard", [n for _, n in dashboard_pairs]) + for dashboard, dashboard_name in dashboard_pairs: if filter_by_dashboard( self.source_config.dashboardFilterPattern, dashboard_name, diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index ffd95be15fda..479623b77815 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -664,9 +664,14 @@ def _get_filtered_datasets(self, project_id: str) -> List[str]: # noqa: UP006 ] def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - raw_names = list(self.get_raw_database_schema_names()) + # Only materialize when log_discovered needs the count up front; + # mark-deleted paths iterate once with add_to_status=False so + # streaming saves O(n) memory on large projects. 
if add_to_status: + raw_names: Iterable[str] = list(self.get_raw_database_schema_names()) log_discovered(logger, self.status, "Schema", raw_names) + else: + raw_names = self.get_raw_database_schema_names() for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, @@ -685,7 +690,7 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo logger, self.status, "Schema", - schema_fqn, + schema_name, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) @@ -855,7 +860,7 @@ def get_database_names(self) -> Iterable[str]: logger, self.status, "Database", - database_fqn, + project_id, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index ba9ea8141599..64ba524f2e15 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -406,7 +406,7 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: logger, self.status, "Table", - table_fqn, + table_name, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) @@ -452,7 +452,7 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: logger, self.status, "Table", - view_fqn, + view_name, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 5bd57f344196..8caa7e118d2a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -570,11 +570,16 @@ def _get_filtered_database_names(self, 
return_fqn: bool = False, add_to_status: filter_name, ): if add_to_status: + # Store the raw database name; expose the FQN via + # matched_against. Matches the convention used by + # Dashboard/Pipeline/Topic/MLModel sources and keeps + # the report column readable when useFqnForFiltering + # is False. log_filtered( logger, self.status, "Database", - database_fqn, + database_name, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) @@ -582,9 +587,15 @@ def _get_filtered_database_names(self, return_fqn: bool = False, add_to_status: yield database_fqn if return_fqn else database_name def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - raw_names = list(self.get_raw_database_schema_names()) + # Only materialize when log_discovered actually needs the count up + # front. Mark-deleted maintenance paths call us with + # add_to_status=False and iterate once — streaming saves O(n) memory + # on large schemas. if add_to_status: + raw_names: Iterable[str] = list(self.get_raw_database_schema_names()) log_discovered(logger, self.status, "Schema", raw_names) + else: + raw_names = self.get_raw_database_schema_names() for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, @@ -603,7 +614,7 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo logger, self.status, "Schema", - schema_fqn, + schema_name, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) @@ -799,8 +810,12 @@ def mark_databases_as_deleted(self): # filtered out, as well as any databases that were processed in this run all_database_fqns = set() - # Get all databases from the source (both filtered-in and filtered-out) - for database_name in self._get_filtered_database_names(): + # Get all databases from the source (both filtered-in and + # filtered-out). 
Pass add_to_status=False — the main ingestion + # path already called log_filtered for each rejected database, + # so recording them again here would double-count Status.filtered + # and the report's "Passed filter patterns" math would go negative. + for database_name in self._get_filtered_database_names(add_to_status=False): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index 4c14ea367fc9..6aa47acd11db 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -218,7 +218,7 @@ def get_database_names(self) -> Iterable[str]: logger, self.status, "Database", - database_fqn, + new_database, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index ba625d249122..105bfe6eb35f 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -200,7 +200,7 @@ def get_database_names(self) -> Iterable[str]: logger, self.status, "Database", - database_fqn, + new_database, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index e3b2e6513181..2fab846e025a 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -305,7 +305,7 @@ def get_database_names(self) -> Iterable[str]: logger, self.status, "Database", - database_fqn, + new_database, 
matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 0ba4cb9b8666..14a26e16ba59 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -423,7 +423,7 @@ def get_database_names(self) -> Iterable[str]: logger, self.status, "Database", - database_fqn, + new_database, matched_against=filter_name, use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index aae251b13b37..b4b6eae34a37 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -215,12 +215,11 @@ def get_topic(self) -> Any: # shallow copy when the source already returned a list. topics_result = self.get_topic_list() or [] topics = topics_result if isinstance(topics_result, list) else list(topics_result) - # Materialize names once into a list so log_discovered takes the - # zero-allocation Sized path instead of re-listing a generator. - topic_names = [self.get_topic_name(t) for t in topics] - log_discovered(logger, self.status, "Topic", topic_names) - for topic_details in topics: - topic_name = self.get_topic_name(topic_details) + # Compute names once and reuse — `get_topic_name` is a + # connector-specific call that may not be free. 
+ topic_pairs = [(t, self.get_topic_name(t)) for t in topics] + log_discovered(logger, self.status, "Topic", [n for _, n in topic_pairs]) + for topic_details, topic_name in topic_pairs: if filter_by_topic( self.source_config.topicFilterPattern, topic_name, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index ff7fadea8981..550e8583c489 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -367,12 +367,11 @@ def get_pipeline(self) -> Any: # shallow copy when the source already returned a list. pipelines_result = self.get_pipelines_list() or [] pipelines = pipelines_result if isinstance(pipelines_result, list) else list(pipelines_result) - # Materialize names once into a list so log_discovered takes the - # zero-allocation Sized path instead of re-listing a generator. - pipeline_names = [self.get_pipeline_name(p) for p in pipelines] - log_discovered(logger, self.status, "Pipeline", pipeline_names) - for pipeline_detail in pipelines: - pipeline_name = self.get_pipeline_name(pipeline_detail) + # Compute names once and reuse — `get_pipeline_name` is a + # connector-specific call that may not be free. 
+ pipeline_pairs = [(p, self.get_pipeline_name(p)) for p in pipelines] + log_discovered(logger, self.status, "Pipeline", [n for _, n in pipeline_pairs]) + for pipeline_detail, pipeline_name in pipeline_pairs: if filter_by_pipeline( self.source_config.pipelineFilterPattern, pipeline_name, From e36f0ee02f53f5acb423f6adb51a319c9764915e Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 24 May 2026 15:41:41 -0700 Subject: [PATCH 7/7] fix(filter_visibility): fix basedpyright type errors from CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three categories of new basedpyright errors introduced by the filter visibility refactor — all surfaced by the `--baselinemode=discard` CI gate: - log_step_summary: source_name was typed `str` but every call site passes `self.config.serviceName` which pyright infers as `str | None`. Widened to `str | None` and gracefully fall back to '' in the report header. - filter_by_schema / filter_by_table / filter_by_database: I extracted the `filter_name = X if useFqn else Y` ternary into a variable across several connectors. The inline ternary was fine but the extracted variable became `str | None` to pyright (because one branch is an FQN typed as Optional). Widened these three helpers in filters.py to accept `Optional[str]` — they already delegate to `_filter()` which handles None per its own contract. Aligns the wrapper signatures with the actual delegate behavior; one-file fix instead of casting at 9 call sites. - mlflow.get_mlmodels: the existing `cast(RegisteredModel, ...)` told pyright the search call returned a single model, so my `list(...)` wrap produced `list[tuple[str, Any]]` and broke `m.name` access. Fixed the cast to `cast(List[RegisteredModel], list(...))`. That surfaced a pre-existing latent bug: `model.latest_versions` is Optional but was iterated directly — guarded with `or []`. 
Verification: - `basedpyright -p pyproject.toml --baselinefile .basedpyright/baseline.json --baselinemode=discard` now reports zero new errors (the remaining errors are env-specific airflow/locust/pydoris import warnings already in the baseline; CI has those packages installed). - `make py_format_check` clean. - 79/79 helper + filter_pattern + common_db_source + snowflake tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../source/mlmodel/mlflow/metadata.py | 20 +++++++++++++++---- .../src/metadata/utils/filter_visibility.py | 4 ++-- ingestion/src/metadata/utils/filters.py | 17 ++++++++++------ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py index b56c5f9ec041..7967f80acaf8 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py @@ -75,16 +75,28 @@ def get_mlmodels( # pylint: disable=arguments-differ """ List and filters models from the registry """ - models = list(cast(RegisteredModel, self.client.search_registered_models())) # noqa: TC006 - log_discovered(logger, self.status, "MlModel", (m.name for m in models)) + # search_registered_models returns List[RegisteredModel]; the prior + # cast() narrowed it to a single RegisteredModel, which made + # `list(...)` give pyright `list[tuple[str, Any]]` and broke + # `m.name` access in this loop. + models = cast("list[RegisteredModel]", list(self.client.search_registered_models())) + log_discovered(logger, self.status, "MlModel", [m.name for m in models]) for model in models: if filter_by_mlmodel(self.source_config.mlModelFilterPattern, mlmodel_name=model.name): log_filtered(logger, self.status, "MlModel", model.name) continue - # Get the latest version + # Get the latest version. 
`latest_versions` is Optional on the + # mlflow model — the previous code worked only because pyright + # was treating model as Any (via the now-fixed cast). Guard with + # `or []` so the iteration is safe even on models with no + # versions tracked. latest_version: Optional[ModelVersion] = next( # noqa: UP045 - (ver for ver in model.latest_versions if ver.last_updated_timestamp == model.last_updated_timestamp), + ( + ver + for ver in (model.latest_versions or []) + if ver.last_updated_timestamp == model.last_updated_timestamp + ), None, ) if not latest_version: diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py index 6f10fc60bb71..45eed554a6f4 100644 --- a/ingestion/src/metadata/utils/filter_visibility.py +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -154,7 +154,7 @@ def log_filtered( def log_step_summary( logger: logging.Logger, status: Status, - source_name: str, + source_name: str | None, ) -> None: """Emit the end-of-step FILTER VISIBILITY REPORT. 
One log block, framed with grep-friendly markers, listing per entity type: @@ -178,7 +178,7 @@ def log_step_summary( lines = [ "", border, - f" {REPORT_HEADER_PREFIX}: {source_name}", + f" {REPORT_HEADER_PREFIX}: {source_name or ''}", border, " Note: 'Passed filter patterns' = discovered - filter rejections.", " It does not subtract source-side extraction failures or", diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index 02f316f426ca..1e44c83d9d01 100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -76,11 +76,14 @@ def _filter(filter_pattern: Optional[FilterPattern], name: Optional[str]) -> boo return False -def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name: str) -> bool: # noqa: UP045 +def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the schema needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. None name is treated as + filtered out (matches `_filter`'s contract) so callers can pass + Optional[str] freely (common when the name comes from `X if useFqn + else Y` where one side is FQN-typed as Optional). :param schema_filter_pattern: Model defining schema filtering logic :param schema fqn: table schema fqn @@ -89,11 +92,12 @@ def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name return _filter(schema_filter_pattern, schema_name) -def filter_by_table(table_filter_pattern: Optional[FilterPattern], table_name: str) -> bool: # noqa: UP045 +def filter_by_table(table_filter_pattern: Optional[FilterPattern], table_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the table needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. 
None name is treated as + filtered out (matches `_filter`'s contract). :param table_filter_pattern: Model defining schema filtering logic :param table_fqn: table fqn @@ -170,11 +174,12 @@ def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool return _filter(fqn_filter_pattern, fqn) -def filter_by_database(database_filter_pattern: Optional[FilterPattern], database_name: str) -> bool: # noqa: UP045 +def filter_by_database(database_filter_pattern: Optional[FilterPattern], database_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the schema needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. None name is treated as + filtered out (matches `_filter`'s contract). :param database_filter_pattern: Model defining database filtering logic :param database_name: database name