Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions ingestion/src/metadata/ingestion/api/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class Status(BaseModel):
warnings: Annotated[List[Any], Field(default_factory=list)] # noqa: UP006
filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)] # noqa: UP006
failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)] # noqa: UP006
discovered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006
# True per-entity-type count of filter rejections, even when the name is
# not appended to `filtered` because the per-type cap was hit. Source of
# truth for the end-of-step FILTER VISIBILITY REPORT count.
filtered_counts: Annotated[Dict[str, int], Field(default_factory=dict)] # noqa: UP006

def scanned(self, record: Any) -> None:
"""
Expand Down Expand Up @@ -101,6 +106,40 @@ def warning(self, key: str, reason: str) -> None:
def filter(self, key: str, reason: str) -> None:
self.filtered.append({key: reason})

def get_filtered_count(self) -> int:
"""True count of filter rejections, including any entries past the
per-entity-type cap that were counted in `filtered_counts` but not
appended to `filtered` to bound memory.

Handles three populations:
- Helper-driven types (in `filtered_counts`): true count comes
from `filtered_counts`, which keeps growing past the per-type
cap even when the name is not stored in `filtered`.
- Legacy direct `status.filter()` callers: not represented in
`filtered_counts`. Count them by walking `filtered` and
including only entries whose entity-type prefix isn't already
tracked by the helper, so we don't double-count.
- No helper usage at all: degenerate case; fall back to
`len(filtered)`.
"""
if not self.filtered_counts:
return len(self.filtered)

helper_total = sum(self.filtered_counts.values())
tracked_types = set(self.filtered_counts)
legacy_count = 0
for entry in self.filtered:
for reason in entry.values():
if not any(reason.startswith(f"{t} ") for t in tracked_types):
legacy_count += 1
return helper_total + legacy_count

def record_discovered(self, entity_type: str, count: int) -> None:
"""Record the count of entities discovered from the source before any
filter pattern is applied. Used by log_step_summary to report the
discovered vs. filtered vs. kept breakdown."""
self.discovered_counts[entity_type] = self.discovered_counts.get(entity_type, 0) + count

def as_string(self) -> str:
parts = []
for key, value in self.__dict__.items():
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/ingestion/api/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def from_step(cls, step: Step) -> "Summary":
updated_records=len(step.status.updated_records),
warnings=len(step.status.warnings),
errors=len(step.status.failures),
filtered=len(step.status.filtered),
filtered=step.status.get_filtered_count(),
failures=step.status.failures[0:10] if step.status.failures else None,
progress=progress_tracker.get_progress_as_dict() or None,
operationMetrics=operation_metrics.get_summary() or None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, test_connection_common
from metadata.utils import fqn
from metadata.utils.filter_visibility import (
log_discovered,
log_filtered,
log_step_summary,
)
from metadata.utils.filters import filter_by_dashboard, filter_by_project
from metadata.utils.logger import ingestion_logger

Expand Down Expand Up @@ -418,6 +423,10 @@
return

def close(self):
try:
log_step_summary(logger, self.status, self.config.serviceName)
except Exception:
logger.warning("Filter visibility report failed; continuing close()", exc_info=True)
self.metadata.close()

def get_services(self) -> Iterable[WorkflowSource]:
Expand Down Expand Up @@ -520,8 +529,8 @@

@staticmethod
def _get_add_lineage_request(
to_entity: Union[Dashboard, DashboardDataModel, Chart], # noqa: UP007

Check warning on line 532 in ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5MmGoT68hX41I62CIf&open=AZ5MmGoT68hX41I62CIf&pullRequest=28355
from_entity: Union[Table, DashboardDataModel, Dashboard], # noqa: UP007

Check warning on line 533 in ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5MmGoT68hX41I62CIg&open=AZ5MmGoT68hX41I62CIg&pullRequest=28355
column_lineage: List[ColumnLineage] = None, # noqa: RUF013, UP006
sql: Optional[str] = None, # noqa: UP045
) -> Optional[Either[AddLineageRequest]]: # noqa: UP045
Expand Down Expand Up @@ -566,15 +575,26 @@
"""
Method to iterate through dashboard lists filter dashboards & yield dashboard details
"""
for dashboard in self.get_dashboards_list():
dashboard_name = self.get_dashboard_name(dashboard)
# `or []` matches the Optional[List[Any]] contract used by Pipeline /
# Messaging sources — `list(None)` would raise TypeError if a
# concrete connector returned None for an empty workspace. Avoid
# the extra shallow copy when the source already gave us a list.
dashboards_result = self.get_dashboards_list() or []
dashboards = dashboards_result if isinstance(dashboards_result, list) else list(dashboards_result)
# Compute names once and reuse — `get_dashboard_name` is a
# connector-specific call that may not be free.
dashboard_pairs = [(d, self.get_dashboard_name(d)) for d in dashboards]
log_discovered(logger, self.status, "Dashboard", [n for _, n in dashboard_pairs])
for dashboard, dashboard_name in dashboard_pairs:
if filter_by_dashboard(
self.source_config.dashboardFilterPattern,
dashboard_name,
):
self.status.filter(
log_filtered(
logger,
self.status,
"Dashboard",
dashboard_name,
"Dashboard Filtered Out",
)
continue

Expand All @@ -594,9 +614,19 @@
self.source_config.projectFilterPattern,
project_name,
):
self.status.filter(
project_name,
"Project / Workspace Filtered Out",
# project_name was reassigned to a list above when the
# connector returned multiple project names; coerce
# to a single string so the stored reason and report
# output stay readable (and _entity_type_from_reason
# can still parse the reason cleanly).
project_label = (
project_name if isinstance(project_name, str) else ", ".join(str(p) for p in project_name)
)
log_filtered(
logger,
self.status,
"Project",
project_label,
)
continue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from metadata.utils import fqn
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.execution_time_tracker import calculate_execution_time
from metadata.utils.filter_visibility import log_discovered, log_filtered
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -663,20 +664,36 @@ def _get_filtered_datasets(self, project_id: str) -> List[str]: # noqa: UP006
]

def _get_filtered_schema_names(self, return_fqn: bool = False, add_to_status: bool = True) -> Iterable[str]:
for schema_name in self.get_raw_database_schema_names():
# Only materialize when log_discovered needs the count up front;
# mark-deleted paths iterate once with add_to_status=False so
# streaming saves O(n) memory on large projects.
if add_to_status:
raw_names: Iterable[str] = list(self.get_raw_database_schema_names())
log_discovered(logger, self.status, "Schema", raw_names)
else:
raw_names = self.get_raw_database_schema_names()
for schema_name in raw_names:
schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
)
filter_name = schema_fqn if self.source_config.useFqnForFiltering else schema_name
if filter_by_schema(
self.source_config.schemaFilterPattern,
schema_fqn if self.source_config.useFqnForFiltering else schema_name,
filter_name,
):
if add_to_status:
self.status.filter(schema_fqn, "Schema Filtered Out")
log_filtered(
logger,
self.status,
"Schema",
schema_name,
matched_against=filter_name,
use_fqn_for_filtering=self.source_config.useFqnForFiltering,
)
continue

if self.incremental.enabled:
Expand Down Expand Up @@ -826,18 +843,27 @@ def get_database_names_raw(self) -> Iterable[str]:
yield from self.project_ids

def get_database_names(self) -> Iterable[str]:
log_discovered(logger, self.status, "Database", self.project_ids)
for project_id in self.project_ids:
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=project_id,
)
filter_name = database_fqn if self.source_config.useFqnForFiltering else project_id
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn if self.source_config.useFqnForFiltering else project_id,
filter_name,
):
self.status.filter(database_fqn, "Database Filtered out")
log_filtered(
logger,
self.status,
"Database",
project_id,
matched_against=filter_name,
use_fqn_for_filtering=self.source_config.useFqnForFiltering,
)
else:
try:
self.set_inspector(database_name=project_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
calculate_execution_time,
calculate_execution_time_generator,
)
from metadata.utils.filter_visibility import (
log_discovered,
log_filtered,
log_step_summary,
)
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger
Expand All @@ -84,7 +89,7 @@
class ColumnAndReferredColumn(BaseModel):
table_name: str
schema_name: str
db_name: Optional[str] # noqa: UP045

Check failure on line 92 in ingestion/src/metadata/ingestion/source/database/common_db_source.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Add an explicit default value to this optional field.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5MmGpY68hX41I62CIi&open=AZ5MmGpY68hX41I62CIi&pullRequest=28355
column: Dict # noqa: UP006


Expand Down Expand Up @@ -353,7 +358,7 @@
for table_name in self.inspector.get_view_names(schema_name) or []
]

def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: # noqa: UP006, UP045

Check failure on line 361 in ingestion/src/metadata/ingestion/source/database/common_db_source.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 32 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5MmGpY68hX41I62CIj&open=AZ5MmGpY68hX41I62CIj&pullRequest=28355
"""
Handle table and views.

Expand All @@ -365,11 +370,21 @@
schema_name = self.context.get().database_schema
if self.source_config.includeTables:
try:
table_iter = self.query_table_names_and_types(schema_name)
# Default impl returns a list; only materialize if a subclass
# returns a generator. Avoids an O(n) shallow copy on the
# common path for large schemas.
table_result = self.query_table_names_and_types(schema_name)
table_iter = table_result if isinstance(table_result, list) else list(table_result)
except Exception as err:
logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}")
logger.debug(traceback.format_exc())
table_iter = []
log_discovered(
logger,
self.status,
"Table",
(item.name for item in table_iter),
)
Comment thread
harshach marked this conversation as resolved.
for table_and_type in table_iter:
try:
table_name = self.standardize_table_name(schema_name, table_and_type.name)
Expand All @@ -382,13 +397,18 @@
table_name=table_name,
skip_es_search=True,
)
filter_name = table_fqn if self.source_config.useFqnForFiltering else table_name
if filter_by_table(
self.source_config.tableFilterPattern,
(table_fqn if self.source_config.useFqnForFiltering else table_name),
filter_name,
):
self.status.filter(
table_fqn,
"Table Filtered Out",
log_filtered(
logger,
self.status,
"Table",
table_name,
matched_against=filter_name,
use_fqn_for_filtering=self.source_config.useFqnForFiltering,
)
continue
except Exception as err:
Expand All @@ -399,11 +419,18 @@

if self.source_config.includeViews:
try:
view_iter = self.query_view_names_and_types(schema_name)
view_result = self.query_view_names_and_types(schema_name)
view_iter = view_result if isinstance(view_result, list) else list(view_result)
except Exception as err:
logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}")
logger.debug(traceback.format_exc())
view_iter = []
log_discovered(
logger,
self.status,
"Table",
(item.name for item in view_iter),
)
for view_and_type in view_iter:
try:
view_name = self.standardize_table_name(schema_name, view_and_type.name)
Expand All @@ -416,13 +443,18 @@
table_name=view_name,
)

filter_name = view_fqn if self.source_config.useFqnForFiltering else view_name
if filter_by_table(
self.source_config.tableFilterPattern,
(view_fqn if self.source_config.useFqnForFiltering else view_name),
filter_name,
):
self.status.filter(
view_fqn,
"Table Filtered Out",
log_filtered(
logger,
self.status,
"Table",
view_name,
matched_against=filter_name,
use_fqn_for_filtering=self.source_config.useFqnForFiltering,
)
continue
except Exception as err:
Expand Down Expand Up @@ -746,6 +778,13 @@
return self._inspector_map[thread_id]

def close(self):
# Defense-in-depth: helper has its own internal try/except, but a
# bug in close()'s observability step must never block the real
# cleanup work (engine release, SSL temp file cleanup).
try:
log_step_summary(logger, self.status, self.config.serviceName)
except Exception:
logger.warning("Filter visibility report failed; continuing close()", exc_info=True)
self._release_engine()
if hasattr(self, "ssl_manager") and self.ssl_manager:
self.ssl_manager = cast(SSLManager, self.ssl_manager) # noqa: TC006
Expand Down
Loading
Loading