diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index f8d0ad94c21b..dee47830fbcd 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -60,6 +60,11 @@ class Status(BaseModel): warnings: Annotated[List[Any], Field(default_factory=list)] # noqa: UP006 filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)] # noqa: UP006 failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)] # noqa: UP006 + discovered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006 + # True per-entity-type count of filter rejections, even when the name is + # not appended to `filtered` because the per-type cap was hit. Source of + # truth for the end-of-step FILTER VISIBILITY REPORT count. + filtered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006 def scanned(self, record: Any) -> None: """ @@ -101,6 +106,40 @@ def warning(self, key: str, reason: str) -> None: def filter(self, key: str, reason: str) -> None: self.filtered.append({key: reason}) + def get_filtered_count(self) -> int: + """True count of filter rejections, including any entries past the + per-entity-type cap that were counted in `filtered_counts` but not + appended to `filtered` to bound memory. + + Handles three populations: + - Helper-driven types (in `filtered_counts`): true count comes + from `filtered_counts`, which keeps growing past the per-type + cap even when the name is not stored in `filtered`. + - Legacy direct `status.filter()` callers: not represented in + `filtered_counts`. Count them by walking `filtered` and + including only entries whose entity-type prefix isn't already + tracked by the helper, so we don't double-count. + - No helper usage at all: degenerate case; fall back to + `len(filtered)`. 
+ """ + if not self.filtered_counts: + return len(self.filtered) + + helper_total = sum(self.filtered_counts.values()) + tracked_prefixes = tuple(f"{t} " for t in self.filtered_counts) + legacy_count = 0 + for entry in self.filtered: + for reason in entry.values(): + if not reason.startswith(tracked_prefixes): + legacy_count += 1 + return helper_total + legacy_count + + def record_discovered(self, entity_type: str, count: int) -> None: + """Record the count of entities discovered from the source before any + filter pattern is applied. Used by log_step_summary to report the + discovered vs. filtered vs. kept breakdown.""" + self.discovered_counts[entity_type] = self.discovered_counts.get(entity_type, 0) + count + def as_string(self) -> str: parts = [] for key, value in self.__dict__.items(): diff --git a/ingestion/src/metadata/ingestion/api/step.py b/ingestion/src/metadata/ingestion/api/step.py index 4069fcd482e6..ed527839966e 100644 --- a/ingestion/src/metadata/ingestion/api/step.py +++ b/ingestion/src/metadata/ingestion/api/step.py @@ -117,7 +117,7 @@ def from_step(cls, step: Step) -> "Summary": updated_records=len(step.status.updated_records), warnings=len(step.status.warnings), errors=len(step.status.failures), - filtered=len(step.status.filtered), + filtered=step.status.get_filtered_count(), failures=step.status.failures[0:10] if step.status.failures else None, progress=progress_tracker.get_progress_as_dict() or None, operationMetrics=operation_metrics.get_summary() or None, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index e93846d0879d..b51e673e943d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -73,6 +73,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, 
test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_dashboard, filter_by_project from metadata.utils.logger import ingestion_logger @@ -418,6 +423,10 @@ def yield_dashboard_usage(self, *args, **kwargs) -> Iterable[DashboardUsage]: return def close(self): + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self.metadata.close() def get_services(self) -> Iterable[WorkflowSource]: @@ -566,15 +575,26 @@ def get_dashboard(self) -> Any: """ Method to iterate through dashboard lists filter dashboards & yield dashboard details """ - for dashboard in self.get_dashboards_list(): - dashboard_name = self.get_dashboard_name(dashboard) + # `or []` matches the Optional[List[Any]] contract used by Pipeline / + # Messaging sources — `list(None)` would raise TypeError if a + # concrete connector returned None for an empty workspace. Avoid + # the extra shallow copy when the source already gave us a list. + dashboards_result = self.get_dashboards_list() or [] + dashboards = dashboards_result if isinstance(dashboards_result, list) else list(dashboards_result) + # Compute names once and reuse — `get_dashboard_name` is a + # connector-specific call that may not be free. 
+ dashboard_pairs = [(d, self.get_dashboard_name(d)) for d in dashboards] + log_discovered(logger, self.status, "Dashboard", [n for _, n in dashboard_pairs]) + for dashboard, dashboard_name in dashboard_pairs: if filter_by_dashboard( self.source_config.dashboardFilterPattern, dashboard_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Dashboard", dashboard_name, - "Dashboard Filtered Out", ) continue @@ -594,9 +614,19 @@ def get_dashboard(self) -> Any: self.source_config.projectFilterPattern, project_name, ): - self.status.filter( - project_name, - "Project / Workspace Filtered Out", + # project_name was reassigned to a list above when the + # connector returned multiple project names; coerce + # to a single string so the stored reason and report + # output stay readable (and _entity_type_from_reason + # can still parse the reason cleanly). + project_label = ( + project_name if isinstance(project_name, str) else ", ".join(str(p) for p in project_name) + ) + log_filtered( + logger, + self.status, + "Project", + project_label, ) continue diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 96e07b80a357..479623b77815 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -108,6 +108,7 @@ from metadata.utils import fqn from metadata.utils.credentials import GOOGLE_CREDENTIALS from metadata.utils.execution_time_tracker import calculate_execution_time +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database, filter_by_schema from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -663,7 +664,15 @@ def _get_filtered_datasets(self, project_id: str) -> List[str]: # noqa: UP006 ] def 
_get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - for schema_name in self.get_raw_database_schema_names(): + # Only materialize when log_discovered needs the count up front; + # mark-deleted paths iterate once with add_to_status=False so + # streaming saves O(n) memory on large projects. + if add_to_status: + raw_names: Iterable[str] = list(self.get_raw_database_schema_names()) + log_discovered(logger, self.status, "Schema", raw_names) + else: + raw_names = self.get_raw_database_schema_names() + for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, @@ -671,12 +680,20 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo database_name=self.context.get().database, schema_name=schema_name, ) + filter_name = schema_fqn if self.source_config.useFqnForFiltering else schema_name if filter_by_schema( self.source_config.schemaFilterPattern, - schema_fqn if self.source_config.useFqnForFiltering else schema_name, + filter_name, ): if add_to_status: - self.status.filter(schema_fqn, "Schema Filtered Out") + log_filtered( + logger, + self.status, + "Schema", + schema_name, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue if self.incremental.enabled: @@ -826,6 +843,7 @@ def get_database_names_raw(self) -> Iterable[str]: yield from self.project_ids def get_database_names(self) -> Iterable[str]: + log_discovered(logger, self.status, "Database", self.project_ids) for project_id in self.project_ids: database_fqn = fqn.build( self.metadata, @@ -833,11 +851,19 @@ def get_database_names(self) -> Iterable[str]: service_name=self.context.get().database_service, database_name=project_id, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else project_id if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn if self.source_config.useFqnForFiltering else 
project_id, + filter_name, ): - self.status.filter(database_fqn, "Database Filtered out") + log_filtered( + logger, + self.status, + "Database", + project_id, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) else: try: self.set_inspector(database_name=project_id) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index da26ad973a88..64ba524f2e15 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -73,6 +73,11 @@ calculate_execution_time, calculate_execution_time_generator, ) +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_table from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -365,11 +370,21 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: schema_name = self.context.get().database_schema if self.source_config.includeTables: try: - table_iter = self.query_table_names_and_types(schema_name) + # Default impl returns a list; only materialize if a subclass + # returns a generator. Avoids an O(n) shallow copy on the + # common path for large schemas. 
+ table_result = self.query_table_names_and_types(schema_name) + table_iter = table_result if isinstance(table_result, list) else list(table_result) except Exception as err: logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) table_iter = [] + log_discovered( + logger, + self.status, + "Table", + (item.name for item in table_iter), + ) for table_and_type in table_iter: try: table_name = self.standardize_table_name(schema_name, table_and_type.name) @@ -382,13 +397,18 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: table_name=table_name, skip_es_search=True, ) + filter_name = table_fqn if self.source_config.useFqnForFiltering else table_name if filter_by_table( self.source_config.tableFilterPattern, - (table_fqn if self.source_config.useFqnForFiltering else table_name), + filter_name, ): - self.status.filter( - table_fqn, - "Table Filtered Out", + log_filtered( + logger, + self.status, + "Table", + table_name, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) continue except Exception as err: @@ -399,11 +419,18 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: if self.source_config.includeViews: try: - view_iter = self.query_view_names_and_types(schema_name) + view_result = self.query_view_names_and_types(schema_name) + view_iter = view_result if isinstance(view_result, list) else list(view_result) except Exception as err: logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}") logger.debug(traceback.format_exc()) view_iter = [] + log_discovered( + logger, + self.status, + "Table", + (item.name for item in view_iter), + ) for view_and_type in view_iter: try: view_name = self.standardize_table_name(schema_name, view_and_type.name) @@ -416,13 +443,18 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: table_name=view_name, ) 
+ filter_name = view_fqn if self.source_config.useFqnForFiltering else view_name if filter_by_table( self.source_config.tableFilterPattern, - (view_fqn if self.source_config.useFqnForFiltering else view_name), + filter_name, ): - self.status.filter( - view_fqn, - "Table Filtered Out", + log_filtered( + logger, + self.status, + "Table", + view_name, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) continue except Exception as err: @@ -746,6 +778,13 @@ def inspector(self) -> Inspector: return self._inspector_map[thread_id] def close(self): + # Defense-in-depth: helper has its own internal try/except, but a + # bug in close()'s observability step must never block the real + # cleanup work (engine release, SSL temp file cleanup). + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self._release_engine() if hasattr(self, "ssl_manager") and self.ssl_manager: self.ssl_manager = cast(SSLManager, self.ssl_manager) # noqa: TC006 diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 31628601ac6a..8caa7e118d2a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -71,6 +71,7 @@ from metadata.ingestion.source.connections import test_connection_common from metadata.utils import fqn from metadata.utils.execution_time_tracker import calculate_execution_time +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_schema, filter_by_stored_procedure from metadata.utils.logger import ingestion_logger from metadata.utils.owner_utils import get_owner_from_config @@ -545,27 +546,57 @@ def register_record_schema_request(self, schema_request: 
CreateDatabaseSchemaReq def _get_filtered_database_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: """ - Get filtered database names based on the database filter pattern + Get filtered database names based on the database filter pattern. + + Note: this is a maintenance-pass helper (called from + mark_databases_as_deleted), not the main ingestion path. Discovery + logging lives in each connector's get_database_names() to avoid + double-counting; this method only emits filter-rejection logs when + add_to_status=True. The raw names iterable is consumed once and + streamed — never materialized — so mark-deleted stays O(1) memory + on large catalogs. """ - database_names_iterable = getattr(self, "get_database_names_raw", self.get_database_names)() - for database_name in database_names_iterable: + raw_names_iter = getattr(self, "get_database_names_raw", self.get_database_names)() + for database_name in raw_names_iter: database_fqn = fqn.build( self.metadata, entity_type=Database, service_name=self.context.get().database_service, database_name=database_name, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else database_name if filter_by_schema( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else database_name), + filter_name, ): if add_to_status: - self.status.filter(database_fqn, "Database Filtered Out") + # Store the raw database name; matched_against carries + # the value the pattern was tested against (the FQN only + # when useFqnForFiltering is set). Storing the raw name + # matches the Dashboard/Pipeline/Topic/MLModel sources + # and keeps the report column readable. 
+ log_filtered( + logger, + self.status, + "Database", + database_name, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue yield database_fqn if return_fqn else database_name def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]: - for schema_name in self.get_raw_database_schema_names(): + # Only materialize when log_discovered actually needs the count up + # front. Mark-deleted maintenance paths call us with + # add_to_status=False and iterate once — streaming saves O(n) memory + # on large schemas. + if add_to_status: + raw_names: Iterable[str] = list(self.get_raw_database_schema_names()) + log_discovered(logger, self.status, "Schema", raw_names) + else: + raw_names = self.get_raw_database_schema_names() + for schema_name in raw_names: schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, @@ -573,12 +604,20 @@ def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bo database_name=self.context.get().database, schema_name=schema_name, ) + filter_name = schema_fqn if self.source_config.useFqnForFiltering else schema_name if filter_by_schema( self.source_config.schemaFilterPattern, - schema_fqn if self.source_config.useFqnForFiltering else schema_name, + filter_name, ): if add_to_status: - self.status.filter(schema_fqn, "Schema Filtered Out") + log_filtered( + logger, + self.status, + "Schema", + schema_name, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue yield schema_fqn if return_fqn else schema_name @@ -771,8 +810,12 @@ def mark_databases_as_deleted(self): # filtered out, as well as any databases that were processed in this run all_database_fqns = set() - # Get all databases from the source (both filtered-in and filtered-out) - for database_name in self._get_filtered_database_names(): + # Get all databases from the source (both filtered-in and + # 
filtered-out). Pass add_to_status=False — the main ingestion + # path already called log_filtered for each rejected database, + # so recording them again here would double-count Status.filtered + # and the report's "Passed filter patterns" math would go negative. + for database_name in self._get_filtered_database_names(add_to_status=False): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index c4ebafe619fc..6aa47acd11db 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -61,6 +61,7 @@ ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqa_utils import update_mssql_ischema_names @@ -198,7 +199,9 @@ def get_database_names(self) -> Iterable[str]: self.set_inspector(database_name=configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -206,11 +209,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + 
new_database, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 39baa542482b..105bfe6eb35f 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -82,6 +82,7 @@ get_view_definition, ) from metadata.utils import fqn +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.importer import import_side_effects from metadata.utils.logger import ingestion_logger @@ -180,7 +181,9 @@ def get_database_names(self) -> Iterable[str]: self.set_schema_description_map() yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -188,11 +191,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + new_database, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 2d2331da1307..2fab846e025a 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -96,6 +96,7 @@ ) from metadata.utils import fqn from metadata.utils.execution_time_tracker import calculate_execution_time_generator +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.helpers import clean_up_starting_ending_double_quotes_in_string from metadata.utils.logger import ingestion_logger @@ -285,7 +286,9 @@ def get_database_names(self) -> Iterable[str]: self.set_external_location_map(configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -293,11 +296,19 @@ def get_database_names(self) -> Iterable[str]: database_name=new_database, ) + filter_name = database_fqn if self.source_config.useFqnForFiltering else new_database if filter_by_database( self.source_config.databaseFilterPattern, - (database_fqn if self.source_config.useFqnForFiltering else new_database), + filter_name, ): - self.status.filter(database_fqn, "Database Filtered Out") + log_filtered( + logger, + self.status, + "Database", + new_database, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, + ) continue try: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index cee2ba80871a..14a26e16ba59 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -128,6 +128,7 @@ normalize_names, ) from metadata.utils import fqn +from 
metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import ( @@ -386,11 +387,6 @@ def get_configured_database(self) -> Optional[str]: # noqa: UP045 def get_database_names_raw(self) -> Iterable[str]: results = self.connection.execute(text(SNOWFLAKE_GET_DATABASES)).fetchall() database_names = [list(res)[1] for res in results] - logger.info( - "SHOW DATABASES returned %d database(s) visible to the ingestion role", - len(database_names), - ) - logger.debug("Databases visible to the ingestion role: %s", database_names) yield from database_names def get_database_names(self) -> Iterable[str]: @@ -406,7 +402,9 @@ def get_database_names(self) -> Iterable[str]: self.set_database_tags_map(configured_db) yield configured_db else: - for new_database in self.get_database_names_raw(): + database_names = list(self.get_database_names_raw()) + log_discovered(logger, self.status, "Database", database_names) + for new_database in database_names: database_fqn = fqn.build( self.metadata, entity_type=Database, @@ -421,14 +419,14 @@ def get_database_names(self) -> Iterable[str]: self.source_config.databaseFilterPattern, filter_name, ): - logger.info( - "Filtering out database '%s': did not pass databaseFilterPattern " - "(matched against '%s', useFqnForFiltering=%s)", + log_filtered( + logger, + self.status, + "Database", new_database, - filter_name, - self.source_config.useFqnForFiltering, + matched_against=filter_name, + use_fqn_for_filtering=self.source_config.useFqnForFiltering, ) - self.status.filter(database_fqn, "Database Filtered Out") continue try: diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index 591daef1c5d9..b4b6eae34a37 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ 
b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -49,6 +49,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_topic from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -206,15 +211,24 @@ def get_topic_name(self, topic_details: Any) -> str: """ def get_topic(self) -> Any: - for topic_details in self.get_topic_list(): - topic_name = self.get_topic_name(topic_details) + # `or []` for null safety + isinstance check to skip the redundant + # shallow copy when the source already returned a list. + topics_result = self.get_topic_list() or [] + topics = topics_result if isinstance(topics_result, list) else list(topics_result) + # Compute names once and reuse — `get_topic_name` is a + # connector-specific call that may not be free. 
+ topic_pairs = [(t, self.get_topic_name(t)) for t in topics] + log_discovered(logger, self.status, "Topic", [n for _, n in topic_pairs]) + for topic_details, topic_name in topic_pairs: if filter_by_topic( self.source_config.topicFilterPattern, topic_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Topic", topic_name, - "Topic Filtered Out", ) continue yield topic_details @@ -257,3 +271,7 @@ def register_record(self, topic_request: CreateTopicRequest) -> None: def close(self): """By default, nothing to close""" + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py index 8f2676c2ffd7..7967f80acaf8 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py @@ -45,6 +45,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_mlmodel from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -74,17 +75,28 @@ def get_mlmodels( # pylint: disable=arguments-differ """ List and filters models from the registry """ - for model in cast(RegisteredModel, self.client.search_registered_models()): # noqa: TC006 + # search_registered_models returns List[RegisteredModel]; the prior + # cast() narrowed it to a single RegisteredModel, which made + # `list(...)` give pyright `list[tuple[str, Any]]` and broke + # `m.name` access in this loop. 
+ models = cast("list[RegisteredModel]", list(self.client.search_registered_models())) + log_discovered(logger, self.status, "MlModel", [m.name for m in models]) + for model in models: if filter_by_mlmodel(self.source_config.mlModelFilterPattern, mlmodel_name=model.name): - self.status.filter( - model.name, - "MlModel name pattern not allowed", - ) + log_filtered(logger, self.status, "MlModel", model.name) continue - # Get the latest version + # Get the latest version. `latest_versions` is Optional on the + # mlflow model — the previous code worked only because pyright + # was treating model as Any (via the now-fixed cast). Guard with + # `or []` so the iteration is safe even on models with no + # versions tracked. latest_version: Optional[ModelVersion] = next( # noqa: UP045 - (ver for ver in model.latest_versions if ver.last_updated_timestamp == model.last_updated_timestamp), + ( + ver + for ver in (model.latest_versions or []) + if ver.last_updated_timestamp == model.last_updated_timestamp + ), None, ) if not latest_version: diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 997886639018..91dad88ba173 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -51,6 +51,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, test_connection_common from metadata.utils import fqn +from metadata.utils.filter_visibility import log_step_summary from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -174,6 +175,10 @@ def _get_algorithm(self, *args, **kwargs) -> str: def close(self): """By default, nothing to close""" + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility 
report failed; continuing close()", exc_info=True) def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py index 4ca73985f9f1..b71f99f9c37d 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py @@ -46,6 +46,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource +from metadata.utils.filter_visibility import log_discovered, log_filtered from metadata.utils.filters import filter_by_mlmodel from metadata.utils.logger import ingestion_logger @@ -134,16 +135,14 @@ def get_mlmodels( # pylint: disable=arguments-differ else: logger.debug(f"No registered models found under sagemaker unified studio") # noqa: F541 + log_discovered(logger, self.status, "MlModel", (m["ModelName"] for m in models)) for model in models: try: if filter_by_mlmodel( self.source_config.mlModelFilterPattern, mlmodel_name=model["ModelName"], ): - self.status.filter( - model["ModelName"], - "MlModel name pattern not allowed", - ) + log_filtered(logger, self.status, "MlModel", model["ModelName"]) continue yield SageMakerModel( name=model["ModelName"], diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index eadba98a9d1e..550e8583c489 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -58,6 +58,11 @@ from metadata.ingestion.source.pipeline.openlineage.models import TableDetails from 
metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException from metadata.utils import fqn +from metadata.utils.filter_visibility import ( + log_discovered, + log_filtered, + log_step_summary, +) from metadata.utils.filters import filter_by_pipeline from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -345,6 +350,10 @@ def yield_tag(self, pipeline_details: Any) -> Iterable[Either[OMetaTagAndClassif def close(self): """Method to implement any required logic after the ingestion process is completed""" + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) self.metadata.compute_percentile(Pipeline, self.today) def get_services(self) -> Iterable[WorkflowSource]: @@ -354,15 +363,24 @@ def yield_create_request_pipeline_service(self, config: WorkflowSource): yield Either(right=self.metadata.get_create_service_from_source(entity=PipelineService, config=config)) def get_pipeline(self) -> Any: - for pipeline_detail in self.get_pipelines_list(): - pipeline_name = self.get_pipeline_name(pipeline_detail) + # `or []` for null safety + isinstance check to skip the redundant + # shallow copy when the source already returned a list. + pipelines_result = self.get_pipelines_list() or [] + pipelines = pipelines_result if isinstance(pipelines_result, list) else list(pipelines_result) + # Compute names once and reuse — `get_pipeline_name` is a + # connector-specific call that may not be free. 
+ pipeline_pairs = [(p, self.get_pipeline_name(p)) for p in pipelines] + log_discovered(logger, self.status, "Pipeline", [n for _, n in pipeline_pairs]) + for pipeline_detail, pipeline_name in pipeline_pairs: if filter_by_pipeline( self.source_config.pipelineFilterPattern, pipeline_name, ): - self.status.filter( + log_filtered( + logger, + self.status, + "Pipeline", pipeline_name, - "Pipeline Filtered Out", ) continue yield pipeline_detail diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 29f8c2c57cd4..887695fa7fef 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -64,6 +64,7 @@ DataFrameColumnParser, fetch_dataframe_first_chunk, ) +from metadata.utils.filter_visibility import log_filtered, log_step_summary from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger from metadata.utils.path_pattern import ( @@ -302,6 +303,10 @@ def yield_create_container_requests(self, container_details: Any) -> Iterable[Ei def close(self): """By default, nothing needs to be closed""" + try: + log_step_summary(logger, self.status, self.config.serviceName) + except Exception: + logger.warning("Filter visibility report failed; continuing close()", exc_info=True) def get_services(self) -> Iterable[WorkflowSource]: yield self.config @@ -666,11 +671,22 @@ def filter_manifest_entries(self, bucket_name: str, entries: List[MetadataEntry] # 2. Pipeline-level containerFilterPattern against the dataPath. if pattern and filter_by_container(pattern, path): - logger.info( - f"Skipping manifest entry '{path}' in bucket '{bucket_name}' — filtered by containerFilterPattern." 
- ) if hasattr(self, "status") and hasattr(self.status, "filter"): - self.status.filter(path, "containerFilterPattern excluded") + # Use bucket/path as the stored name so operators can + # tell which bucket an entry came from — dataPath alone + # is not unique across buckets. + log_filtered( + logger, + self.status, + "Container", + f"{bucket_name}/{path}", + matched_against=path, + ) + else: + logger.info( + f"Skipping manifest entry '{path}' in bucket '{bucket_name}' " + f"— filtered by containerFilterPattern." + ) continue filtered.append(entry) diff --git a/ingestion/src/metadata/utils/filter_visibility.py b/ingestion/src/metadata/utils/filter_visibility.py new file mode 100644 index 000000000000..45eed554a6f4 --- /dev/null +++ b/ingestion/src/metadata/utils/filter_visibility.py @@ -0,0 +1,269 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Centralized helpers for logging connector filter visibility. + +When users don't see an entity after ingestion (database, schema, table, +dashboard, pipeline, topic, container, mlmodel), they need to be able to +distinguish: + + 1. Source-side permissions — the connecting user can't see the object + 2. Filter config — includes/excludes patterns removed it + +These helpers emit a consistent "discovered → filtered → kept" trail across +every connector family and end each step with a single FILTER VISIBILITY +REPORT block. 
The report is the diagnostic anchor: one log block, easy to +grep ("FILTER VISIBILITY REPORT"), containing exactly the information that +isn't derivable from elsewhere. + +Defensive contract: these helpers are observability only. Any exception +raised internally is caught and logged as a single warning; helpers MUST +NOT propagate failures to the connector ingestion path. + +Memory profile: we deliberately store only the *diff* — discovered count +(int) plus the names of items the filter rejected (plus their reasons). +Visible names and kept names are not stored; they're derivable as counts +and the kept items show up in normal ingestion logs / Status.records. + +Bounded growth: per-entity-type cap on stored filtered names +(MAX_FILTERED_ENTRIES_PER_TYPE). Past the cap, only the true count keeps +growing — names beyond the cap are dropped and the report annotates the +truncation. Prevents OOM on pathological catalogs (e.g., 10M filtered +S3 objects) while preserving correctness of counts. +""" + +import logging +from collections.abc import Sized +from typing import Iterable # noqa: UP035 + +from metadata.ingestion.api.status import Status + +REPORT_HEADER_PREFIX = "FILTER VISIBILITY REPORT" +MAX_FILTERED_ENTRIES_PER_TYPE = 50_000 +_REASON_SUFFIX = "Filtered Out" + + +def log_discovered( + logger: logging.Logger, + status: Status, + entity_type: str, + names: Iterable[str], +) -> None: + """Log the count of entities visible from the source before any filter + is applied, and record the count on Status so the end-of-step report + can compute discovered vs. kept. Names are emitted at DEBUG only — the + actionable diff (what got filtered) lives in log_filtered + the + end-of-step report; the full visible list would explode logs on large + catalogs without adding actionable information. + + Memory: avoids materializing `names` when DEBUG isn't enabled. If the + caller already passes a Sized collection (list / tuple), we use len() + directly — zero extra allocation. 
For generators we either stream-count + (when DEBUG is off) or materialize once (when DEBUG is on, so we can + emit the full list). + + Exceptions are swallowed: observability must never break ingestion.""" + try: + debug_on = logger.isEnabledFor(logging.DEBUG) + if isinstance(names, Sized): + count = len(names) + elif debug_on: + names = list(names) + count = len(names) + else: + count = sum(1 for _ in names) + status.record_discovered(entity_type, count) + logger.info( + "Discovered %d %s(s) visible to the ingestion user", + count, + entity_type.lower(), + ) + if debug_on: + logger.debug( + "%s(s) visible to the ingestion user: %s", + entity_type, + names, + ) + except Exception: + logger.warning( + "log_discovered failed for entity_type=%r; continuing ingestion", + entity_type, + exc_info=True, + ) + + +def log_filtered( + logger: logging.Logger, + status: Status, + entity_type: str, + name: str, + *, + matched_against: str | None = None, + use_fqn_for_filtering: bool | None = None, +) -> None: + """Log a filter-pattern rejection and record it on Status. The reason + string stored on Status is rich enough that the end-of-step report can + reproduce the full diagnostic (which pattern field, what was matched + against, whether FQN filtering was on) without needing extra state. + + Per-entity-type cap: once a type has accumulated + MAX_FILTERED_ENTRIES_PER_TYPE entries in Status.filtered, subsequent + rejections only bump Status.filtered_counts (cheap) and drop the name + to avoid unbounded memory growth. The report flags the truncation. 
+ + Exceptions are swallowed: observability must never break ingestion.""" + try: + current_count = status.filtered_counts.get(entity_type, 0) + status.filtered_counts[entity_type] = current_count + 1 + + pattern_field = _pattern_field(entity_type) + detail_parts = [f"did not pass {pattern_field}"] + if matched_against is not None and matched_against != name: + detail_parts.append(f"matched against '{matched_against}'") + if use_fqn_for_filtering is not None: + detail_parts.append(f"useFqnForFiltering={use_fqn_for_filtering}") + detail = ", ".join(detail_parts) + + if current_count < MAX_FILTERED_ENTRIES_PER_TYPE: + logger.info("Filtering out %s '%s': %s", entity_type.lower(), name, detail) + reason = f"{entity_type} {_REASON_SUFFIX}: {detail}" + status.filter(name, reason) + elif current_count == MAX_FILTERED_ENTRIES_PER_TYPE: + logger.warning( + "Reached cap of %d filtered %s names; subsequent rejections will be counted but not stored", + MAX_FILTERED_ENTRIES_PER_TYPE, + entity_type.lower(), + ) + except Exception: + logger.warning( + "log_filtered failed for entity_type=%r name=%r; continuing ingestion", + entity_type, + name, + exc_info=True, + ) + + +def log_step_summary( + logger: logging.Logger, + status: Status, + source_name: str | None, +) -> None: + """Emit the end-of-step FILTER VISIBILITY REPORT. One log block, + framed with grep-friendly markers, listing per entity type: + - count visible to the ingestion user + - count + names + reasons of everything the filter dropped + (with "and N more (truncated at cap)" footer if applicable) + - count that passed all filter patterns (this is a filter-decision + count, NOT the final ingested count; downstream extraction + failures and secondary filters like projectFilterPattern are + not subtracted — see the note printed inside the report) + No-op when there's nothing to report (e.g., a sink-only step). 
+ + Exceptions are swallowed: observability must never break ingestion.""" + try: + by_type = _group_filtered_by_entity_type(status) + entity_types = sorted(set(status.discovered_counts) | set(by_type) | set(status.filtered_counts)) + if not entity_types: + return + + border = "=" * 70 + lines = [ + "", + border, + f" {REPORT_HEADER_PREFIX}: {source_name or ''}", + border, + " Note: 'Passed filter patterns' = discovered - filter rejections.", + " It does not subtract source-side extraction failures or", + " secondary filters (e.g., projectFilterPattern on dashboards).", + " For the final ingested count, see Status.records / OpenMetadata.", + ] + for entity_type in entity_types: + lines.extend(_format_entity_section(status, entity_type, by_type.get(entity_type, []))) + lines.append(border) + logger.info("\n".join(lines)) + except Exception: + logger.warning( + "log_step_summary failed for source=%r; continuing ingestion", + source_name, + exc_info=True, + ) + + +def _format_entity_section( + status: Status, + entity_type: str, + filtered_entries: list[tuple[str, str]], +) -> list[str]: + """Format a single entity type's section of the report. Visible and + kept are counts only; filtered shows every name + the reason it was + rejected so the user can diff against their own filterPattern config. 
+ True filter count comes from Status.filtered_counts so the kept math + stays correct even when names were dropped past the per-type cap. Names are str()-coerced (None-safe).""" + pattern_field = _pattern_field(entity_type) + discovered = status.discovered_counts.get(entity_type) + stored_count = len(filtered_entries) + true_count = status.filtered_counts.get(entity_type, stored_count) + section = ["", f"{entity_type} ({pattern_field}):"] + + if discovered is not None: + section.append(f" Visible to ingestion user: {discovered}") + section.append(f" Filtered out ({true_count}):") + if filtered_entries: + max_name_width = max(len(str(name)) for name, _ in filtered_entries) + name_pad = min(max_name_width, 50) + for name, reason in filtered_entries: + detail = reason.split(": ", 1)[1] if ": " in reason else reason + section.append(f" {str(name):<{name_pad}} → {detail}") + if true_count > stored_count: + section.append( + f" ... and {true_count - stored_count} more " + f"(full list truncated at cap of {MAX_FILTERED_ENTRIES_PER_TYPE})" + ) + if discovered is not None: + kept = discovered - true_count + section.append(f" Passed filter patterns: {kept}") + + return section + + +def _group_filtered_by_entity_type(status: Status) -> dict[str, list[tuple[str, str]]]: + """Walk Status.filtered (the existing project-wide accumulator) and + bucket entries by entity type, parsed from the reason string this + module wrote. 
Entries whose reason doesn't follow our convention + (legacy callers of status.filter) are skipped, not guessed at.""" + by_type: dict[str, list[tuple[str, str]]] = {} + for entry in status.filtered: + for name, reason in entry.items(): + entity_type = _entity_type_from_reason(reason) + if entity_type: + by_type.setdefault(entity_type, []).append((name, reason)) + return by_type + + +def _pattern_field(entity_type: str) -> str: + """Map 'Database' -> 'databaseFilterPattern', preserving the same field + name connectors use in their YAML config so log messages are + grep-actionable against the user's config.""" + return f"{entity_type[0].lower()}{entity_type[1:]}FilterPattern" + + +def _entity_type_from_reason(reason: str) -> str | None: + """Inverse of the reason string built in log_filtered. Case-insensitive + match on ' Filtered Out' so it catches both the enriched form + ('Database Filtered Out: did not pass databaseFilterPattern...') and + historical variants (e.g., 'Database Filtered out' with lowercase + 'out' that show up in older BigQuery code paths). 
The slice preserves + the original entity-type casing so the report still reads correctly.""" + marker_lower = f" {_REASON_SUFFIX}".lower() + idx = reason.lower().find(marker_lower) + if idx > 0: + return reason[:idx] + return None diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index 02f316f426ca..1e44c83d9d01 100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -76,11 +76,14 @@ def _filter(filter_pattern: Optional[FilterPattern], name: Optional[str]) -> boo return False -def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name: str) -> bool: # noqa: UP045 +def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the schema needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. None name is treated as + filtered out (matches `_filter`'s contract) so callers can pass + Optional[str] freely (common when the name comes from `X if useFqn + else Y` where one side is FQN-typed as Optional). :param schema_filter_pattern: Model defining schema filtering logic :param schema fqn: table schema fqn @@ -89,11 +92,12 @@ def filter_by_schema(schema_filter_pattern: Optional[FilterPattern], schema_name return _filter(schema_filter_pattern, schema_name) -def filter_by_table(table_filter_pattern: Optional[FilterPattern], table_name: str) -> bool: # noqa: UP045 +def filter_by_table(table_filter_pattern: Optional[FilterPattern], table_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the table needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. None name is treated as + filtered out (matches `_filter`'s contract). 
:param table_filter_pattern: Model defining schema filtering logic :param table_fqn: table fqn @@ -170,11 +174,12 @@ def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool return _filter(fqn_filter_pattern, fqn) -def filter_by_database(database_filter_pattern: Optional[FilterPattern], database_name: str) -> bool: # noqa: UP045 +def filter_by_database(database_filter_pattern: Optional[FilterPattern], database_name: Optional[str]) -> bool: # noqa: UP045 """ Return True if the schema needs to be filtered, False otherwise - Include takes precedence over exclude + Include takes precedence over exclude. None name is treated as + filtered out (matches `_filter`'s contract). :param database_filter_pattern: Model defining database filtering logic :param database_name: database name diff --git a/ingestion/tests/unit/utils/test_filter_visibility.py b/ingestion/tests/unit/utils/test_filter_visibility.py new file mode 100644 index 000000000000..f84f340781f0 --- /dev/null +++ b/ingestion/tests/unit/utils/test_filter_visibility.py @@ -0,0 +1,515 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for filter_visibility helpers used across connector base classes. + +Three test groups: + - Behavior: counts, reasons, report formatting, back-compat with legacy + `Status.filter()` reason strings. 
+ - Bounded growth: per-entity-type cap on Status.filtered enforced by + log_filtered; report annotates truncation; counts stay correct. + - Resilience: helpers swallow all exceptions and never propagate them to + the connector, even with bad inputs or a broken logger. +""" + +import logging +from unittest.mock import MagicMock + +import pytest + +from metadata.ingestion.api.status import Status +from metadata.utils.filter_visibility import ( + MAX_FILTERED_ENTRIES_PER_TYPE, + log_discovered, + log_filtered, + log_step_summary, +) + + +@pytest.fixture +def status() -> Status: + return Status() + + +@pytest.fixture +def logger() -> logging.Logger: + return logging.getLogger("test_filter_visibility") + + +# --------------------------------------------------------------------------- +# Behavior +# --------------------------------------------------------------------------- + + +def test_log_discovered_records_count_and_emits_info(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_discovered(logger, status, "Database", ["db1", "db2", "db3"]) + + assert status.discovered_counts == {"Database": 3} + assert any("Discovered 3 database(s) visible to the ingestion user" in r.message for r in caplog.records) + + +def test_log_discovered_emits_full_list_at_debug(status, logger, caplog): + with caplog.at_level(logging.DEBUG, logger=logger.name): + log_discovered(logger, status, "Schema", ["public", "internal"]) + + debug_messages = [r.message for r in caplog.records if r.levelno == logging.DEBUG] + assert any("public" in msg and "internal" in msg for msg in debug_messages) + + +def test_log_discovered_accumulates_across_calls(status, logger): + log_discovered(logger, status, "Database", ["db1"]) + log_discovered(logger, status, "Database", ["db2", "db3"]) + + assert status.discovered_counts == {"Database": 3} + + +def test_log_discovered_accepts_generator(status, logger): + def gen(): + yield from ["a", "b"] + + log_discovered(logger, 
status, "Topic", gen()) + + assert status.discovered_counts == {"Topic": 2} + + +def test_log_discovered_skips_materialization_when_debug_disabled(status, logger): + """For large catalogs the names iterable can be expensive to materialize. + When DEBUG isn't enabled we have no use for the names — only the count + matters — so the helper must stream-count instead of calling list().""" + materialize_count = {"calls": 0} + + def tracking_gen(): + for i in range(5): + materialize_count["calls"] += 1 + yield f"n{i}" + + # logger fixture is at WARNING by default — DEBUG is OFF + log_discovered(logger, status, "Topic", tracking_gen()) + + # Generator was consumed exactly once for the count (5 yields), but no + # list() copy was made on top of that + assert materialize_count["calls"] == 5 + assert status.discovered_counts == {"Topic": 5} + + +def test_log_discovered_uses_len_directly_for_sized_collections(status, logger): + """A list/tuple already knows its length — no need to iterate at all + when DEBUG is disabled.""" + + class SizedNoIter: + """A Sized that raises if iterated — proves len() path is used.""" + + def __len__(self): + return 42 + + def __iter__(self): + raise AssertionError("must not iterate when only len() is needed") + + log_discovered(logger, status, "Table", SizedNoIter()) # type: ignore[arg-type] + + assert status.discovered_counts == {"Table": 42} + + +def test_log_discovered_materializes_when_debug_enabled(status, logger, caplog): + """When the operator turned DEBUG on, they explicitly want the names — + so the helper materializes to emit the full list.""" + + def gen(): + yield from ["n1", "n2"] + + with caplog.at_level(logging.DEBUG, logger=logger.name): + log_discovered(logger, status, "Topic", gen()) + + debug_msgs = [r.message for r in caplog.records if r.levelno == logging.DEBUG] + assert any("n1" in m and "n2" in m for m in debug_msgs) + assert status.discovered_counts == {"Topic": 2} + + +def 
test_log_filtered_stores_rich_reason_on_status(status, logger): + log_filtered( + logger, + status, + "Database", + "BACKUP_DB", + matched_against="service.BACKUP_DB", + use_fqn_for_filtering=True, + ) + + assert status.filtered_counts == {"Database": 1} + assert len(status.filtered) == 1 + name, reason = next(iter(status.filtered[0].items())) + assert name == "BACKUP_DB" + assert reason.startswith("Database Filtered Out: ") + assert "did not pass databaseFilterPattern" in reason + assert "matched against 'service.BACKUP_DB'" in reason + assert "useFqnForFiltering=True" in reason + + +def test_log_filtered_omits_matched_against_when_same_as_name(status, logger): + log_filtered(logger, status, "Schema", "TEMP", matched_against="TEMP") + + name, reason = next(iter(status.filtered[0].items())) + assert name == "TEMP" + assert "matched against" not in reason + + +def test_log_filtered_works_with_minimal_args(status, logger): + log_filtered(logger, status, "Topic", "noisy.topic") + + name, reason = next(iter(status.filtered[0].items())) + assert name == "noisy.topic" + assert reason == "Topic Filtered Out: did not pass topicFilterPattern" + + +def test_log_step_summary_emits_consolidated_report(status, logger, caplog): + log_discovered(logger, status, "Database", ["a", "b", "c", "d"]) + log_discovered(logger, status, "Schema", ["s1", "s2", "s3"]) + log_filtered(logger, status, "Database", "a", matched_against="svc.a", use_fqn_for_filtering=True) + log_filtered(logger, status, "Schema", "s1") + log_filtered(logger, status, "Schema", "s2") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "snowflake") + + summary = "\n".join(r.message for r in caplog.records) + assert "FILTER VISIBILITY REPORT: snowflake" in summary + assert "Database (databaseFilterPattern):" in summary + assert "Visible to ingestion user: 4" in summary + assert "Filtered out (1):" in summary + assert "Passed filter patterns: 3" in summary + 
assert "a" in summary and "did not pass databaseFilterPattern" in summary + assert "matched against 'svc.a'" in summary + + assert "Schema (schemaFilterPattern):" in summary + assert "Visible to ingestion user: 3" in summary + assert "Filtered out (2):" in summary + assert "Passed filter patterns: 1" in summary + + +def test_log_step_summary_handles_filtered_without_discovered(status, logger, caplog): + log_filtered(logger, status, "Pipeline", "p1") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "airflow") + + summary = "\n".join(r.message for r in caplog.records) + assert "Pipeline (pipelineFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "p1" in summary + + +def test_log_step_summary_noop_when_nothing_to_report(status, logger, caplog): + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "empty_source") + + assert not caplog.records + + +def test_log_step_summary_skips_unrelated_filter_reasons(status, logger, caplog): + log_discovered(logger, status, "Database", ["a", "b"]) + status.filter("x", "some other reason that is not from our helper") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "mixed") + + summary = "\n".join(r.message for r in caplog.records) + assert "Visible to ingestion user: 2" in summary + assert "Filtered out (0):" in summary + assert "Passed filter patterns: 2" in summary + assert "some other reason" not in summary + + +def test_log_step_summary_recognizes_legacy_reason_strings(status, logger, caplog): + """status.filter() callers predating this helper (e.g., 'Database Filtered Out' + without a trailing colon) must still be grouped into the right entity-type + section of the report.""" + log_discovered(logger, status, "Database", ["a", "b"]) + status.filter("a", "Database Filtered Out") + + caplog.clear() + with caplog.at_level(logging.INFO, 
logger=logger.name): + log_step_summary(logger, status, "legacy") + + summary = "\n".join(r.message for r in caplog.records) + assert "Database (databaseFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "Passed filter patterns: 1" in summary + + +def test_log_step_summary_matches_legacy_lowercase_out_variant(status, logger, caplog): + """Pre-existing BigQuery code path historically used 'Database Filtered out' + (lowercase 'out'). Our marker match must be case-insensitive so those + entries still get counted in the right section of the report.""" + log_discovered(logger, status, "Database", ["a", "b", "c"]) + status.filter("a", "Database Filtered out") # lowercase 'out' from older code + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "legacy_lowercase") + + summary = "\n".join(r.message for r in caplog.records) + assert "Database (databaseFilterPattern):" in summary + assert "Filtered out (1):" in summary + assert "Passed filter patterns: 2" in summary + + +def test_get_filtered_count_returns_true_count_past_cap(status, logger): + """Status.get_filtered_count must surface the true count (including + overflow past the per-type cap) so persisted StepSummary.filtered + stays accurate even when names were dropped to bound memory.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 17): + log_filtered(logger, status, "Table", f"t{i}") + + assert len(status.filtered) == MAX_FILTERED_ENTRIES_PER_TYPE + # get_filtered_count returns the true count, not the stored len + assert status.get_filtered_count() == MAX_FILTERED_ENTRIES_PER_TYPE + 17 + + +def test_get_filtered_count_handles_legacy_only_entries(status): + """If a legacy caller bypassed the helper and called status.filter() + directly, the count is len(filtered) (filtered_counts is empty).""" + status.filter("a", "Database Filtered Out") + status.filter("b", "Database Filtered Out") + + assert status.get_filtered_count() == 2 + + 
+def test_get_filtered_count_empty_status_is_zero(status): + assert status.get_filtered_count() == 0 + + +def test_get_filtered_count_mixed_helper_and_legacy(status, logger): + """Helper tracks Database; legacy callers added Schema entries directly. + The legacy Schema entries must be counted in addition to the helper's + true Database count — not absorbed by the helper's tally.""" + for i in range(10): + log_filtered(logger, status, "Database", f"db_{i}") + status.filter("legacy_schema_1", "Schema Filtered Out") + status.filter("legacy_schema_2", "Schema Filtered Out") + + # 10 helper Database + 2 legacy Schema = 12 + assert status.get_filtered_count() == 12 + + +def test_get_filtered_count_mixed_with_cap_overflow(status, logger): + """Worst case for the mixed helper/legacy count: helper hits the cap + on one type AND legacy adds entries for a different type. True count + must include both the cap-overflow and the legacy entries.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 30): + log_filtered(logger, status, "Table", f"t_{i}") + status.filter("legacy_schema_1", "Schema Filtered Out") + status.filter("legacy_schema_2", "Schema Filtered Out") + + # MAX + 30 helper Table + 2 legacy Schema + assert status.get_filtered_count() == MAX_FILTERED_ENTRIES_PER_TYPE + 32 + + +def test_log_step_summary_includes_clarifying_note(status, logger, caplog): + """Kept counts can over-report actual ingestion when downstream + extraction fails or secondary filters reject items. 
The report header + must say so explicitly so users don't try to reconcile numbers.""" + log_discovered(logger, status, "Dashboard", ["d1"]) + log_filtered(logger, status, "Dashboard", "d1") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "doc_note") + + summary = "\n".join(r.message for r in caplog.records) + assert "Note:" in summary + assert "does not subtract source-side extraction failures" in summary + + +# --------------------------------------------------------------------------- +# Bounded growth — per-entity-type cap +# --------------------------------------------------------------------------- + + +def test_log_filtered_caps_stored_names_but_preserves_count(status, logger): + """Past the per-type cap, the true count keeps climbing but the + name list stops growing — Status.filtered stays bounded.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 25): + log_filtered(logger, status, "Table", f"table_{i}") + + assert status.filtered_counts["Table"] == MAX_FILTERED_ENTRIES_PER_TYPE + 25 + stored = [next(iter(entry.keys())) for entry in status.filtered] + assert len(stored) == MAX_FILTERED_ENTRIES_PER_TYPE + assert stored[0] == "table_0" + assert stored[-1] == f"table_{MAX_FILTERED_ENTRIES_PER_TYPE - 1}" + + +def test_log_step_summary_annotates_truncation_when_cap_exceeded(status, logger, caplog): + log_discovered(logger, status, "Table", [f"table_{i}" for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 200)]) + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 200): + log_filtered(logger, status, "Table", f"table_{i}") + + caplog.clear() + with caplog.at_level(logging.INFO, logger=logger.name): + log_step_summary(logger, status, "huge_catalog") + + summary = "\n".join(r.message for r in caplog.records) + expected_total = MAX_FILTERED_ENTRIES_PER_TYPE + 200 + expected_overflow = 200 + assert f"Filtered out ({expected_total}):" in summary + assert ( + f"... 
and {expected_overflow} more (full list truncated at cap of {MAX_FILTERED_ENTRIES_PER_TYPE})" in summary + ) + # Kept math must use the TRUE count, not the stored count, or it would be wrong by 200 + assert "Passed filter patterns: 0" in summary + + +def test_log_filtered_per_type_cap_is_independent(status, logger): + """Hitting the cap on one entity type must not affect another.""" + for i in range(MAX_FILTERED_ENTRIES_PER_TYPE + 5): + log_filtered(logger, status, "Table", f"t_{i}") + log_filtered(logger, status, "Schema", "tmp") + + stored_tables = [next(iter(e.keys())) for e in status.filtered if next(iter(e.values())).startswith("Table ")] + stored_schemas = [next(iter(e.keys())) for e in status.filtered if next(iter(e.values())).startswith("Schema ")] + assert len(stored_tables) == MAX_FILTERED_ENTRIES_PER_TYPE + assert stored_schemas == ["tmp"] + assert status.filtered_counts == {"Table": MAX_FILTERED_ENTRIES_PER_TYPE + 5, "Schema": 1} + + +# --------------------------------------------------------------------------- +# Resilience — observability must never propagate exceptions +# --------------------------------------------------------------------------- + + +def test_log_discovered_swallows_exceptions_from_logger(status): + """Even a broken logger must not break ingestion.""" + broken_logger = MagicMock() + broken_logger.info.side_effect = RuntimeError("logger blew up") + + log_discovered(broken_logger, status, "Database", ["db1", "db2"]) + + # The count update happened before the failing info call, so it sticks + assert status.discovered_counts == {"Database": 2} + # Helper swallowed the exception and logged a warning on the same logger + broken_logger.warning.assert_called_once() + + +def test_log_discovered_swallows_exceptions_from_bad_iterable(status, logger): + """A connector returning a generator that raises mid-iteration must + not crash the connector.""" + + def exploding_generator(): + yield "ok_name" + raise RuntimeError("source went away") + + 
log_discovered(logger, status, "Database", exploding_generator()) # must not raise + + +def test_log_filtered_swallows_exceptions(status): + broken_logger = MagicMock() + broken_logger.info.side_effect = RuntimeError("logger blew up") + + log_filtered(broken_logger, status, "Database", "BACKUP_DB") + + # filtered_counts updated before the failing info call, so it sticks + assert status.filtered_counts == {"Database": 1} + broken_logger.warning.assert_called_once() + + +def test_log_filtered_handles_weird_name_values(status, logger): + """Bad names — None, empty, non-ASCII, control chars — must not crash.""" + # None as name; matched_against also None + log_filtered(logger, status, "Table", None) # type: ignore[arg-type] + # Empty string + log_filtered(logger, status, "Table", "") + # Unicode + log_filtered(logger, status, "Table", "日本語_table") + # Control characters + log_filtered(logger, status, "Table", "tab\there\nnewline") + + # All four counted, none crashed + assert status.filtered_counts["Table"] == 4 + + +def test_log_step_summary_swallows_exceptions(logger, caplog): + """A corrupted Status (wrong type for filtered_counts) must not crash close().""" + broken_status = Status() + # Inject a value that breaks the report math: int where dict is expected + broken_status.discovered_counts = "not a dict" # type: ignore[assignment] + + with caplog.at_level(logging.WARNING, logger=logger.name): + log_step_summary(logger, broken_status, "broken_source") # must not raise + + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("log_step_summary failed" in w.message for w in warnings) + + +def test_log_step_summary_handles_empty_status_gracefully(status, logger): + """A source step that never called log_discovered / log_filtered should + produce no report and no exception (covers sink-only steps).""" + log_step_summary(logger, status, "no_op") # must not raise + + +# 
# ---------------------------------------------------------------------------
# Integration-style — minimal connector base class lifecycle
# ---------------------------------------------------------------------------


def test_end_to_end_helper_lifecycle_produces_correct_report(logger, caplog):
    """Simulate a connector's discover-filter-close cycle — several
    databases, each with its own schemas and tables — and verify that the
    final report carries the right visible / filtered / kept numbers for
    every entity type."""
    status = Status()

    # Database-level: 5 visible, 2 filtered
    log_discovered(logger, status, "Database", ["db_a", "db_b", "db_c", "db_d", "db_e"])
    log_filtered(logger, status, "Database", "db_d", matched_against="svc.db_d", use_fqn_for_filtering=True)
    log_filtered(logger, status, "Database", "db_e", matched_against="svc.db_e", use_fqn_for_filtering=True)

    # Schemas: visited per kept database (3 dbs x 4 schemas), 1 filtered per db
    for db in ("db_a", "db_b", "db_c"):
        log_discovered(logger, status, "Schema", [f"{db}.public", f"{db}.audit", f"{db}.tmp", f"{db}.staging"])
        log_filtered(logger, status, "Schema", f"{db}.tmp")

    # Tables: per kept schema (3 dbs x 3 kept schemas x 5 tables), 0 filtered
    for db in ("db_a", "db_b", "db_c"):
        for schema in ("public", "audit", "staging"):
            log_discovered(logger, status, "Table", [f"{db}.{schema}.t{i}" for i in range(5)])

    caplog.clear()
    with caplog.at_level(logging.INFO, logger=logger.name):
        log_step_summary(logger, status, "fake_source")

    summary = "\n".join(r.message for r in caplog.records)
    assert "FILTER VISIBILITY REPORT: fake_source" in summary
    # Database: 5 visible, 2 filtered, 3 kept
    assert "Database (databaseFilterPattern):" in summary
    assert "Visible to ingestion user: 5" in summary
    assert "Filtered out (2):" in summary
    assert "Passed filter patterns: 3" in summary
    # Schema: 3 dbs x 4 = 12 visible, 3 filtered, 9 kept
    assert "Schema (schemaFilterPattern):" in summary
    assert "Visible to ingestion user: 12" in summary
    assert "Filtered out (3):" in summary
    assert "Passed filter patterns: 9" in summary
    # Table: 3 x 3 x 5 = 45 visible, 0 filtered, 45 kept
    assert "Table (tableFilterPattern):" in summary
    assert "Visible to ingestion user: 45" in summary
    assert "Filtered out (0):" in summary
    assert "Passed filter patterns: 45" in summary


def test_end_to_end_close_lifecycle_with_broken_summary_does_not_raise(logger):
    """Call log_step_summary directly on a Status whose `discovered_counts`
    has been overwritten with an int and require that no exception escapes.

    This exercises the helper's own guard, which a connector's close() path
    relies on; no close() wrapper is invoked here."""
    status = Status()
    # An int where the report math expects a dict.
    status.discovered_counts = 12345  # type: ignore[assignment]

    log_step_summary(logger, status, "corrupted")  # must not raise