feat(ingestion): cross-connector filter visibility report#28355
feat(ingestion): cross-connector filter visibility report#28355harshach wants to merge 6 commits into
Conversation
Generalize Snowflake's per-database discovery + filter-reason logging (#28336) to a reusable helper used by every connector family — Database, Dashboard, Pipeline, Messaging, Storage, MLModel. At end of each source step, emit a single grep-friendly "FILTER VISIBILITY REPORT" block listing the discovered count, every filtered name + reason, and the kept count per entity type, so users can tell whether a missing entity was removed by includes/excludes or never visible to the ingestion role. Storage profile is the diff only: discovered counts (int per type) + the existing Status.filtered list (now carrying richer reasons). No discovered-name or kept-name lists; both are derivable. No spec/JSON schema change, no UI work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR adds a reusable ingestion-side logging utility to standardize “discovered → filtered → kept” visibility across connector families, culminating in a single grep-friendly FILTER VISIBILITY REPORT emitted at step close to help users distinguish source-permission invisibility from filter-pattern exclusion.
Changes:
- Introduces
metadata.utils.filter_visibilityhelpers (log_discovered,log_filtered,log_step_summary) and wires them into multiple connector base classes and concrete sources. - Extends
Statuswith an in-memorydiscovered_countsaccumulator to compute “kept” counts without storing full discovered/kept name lists. - Adds unit tests for helper behavior and report formatting.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| ingestion/tests/unit/utils/test_filter_visibility.py | New unit tests validating helper accumulation and report formatting. |
| ingestion/src/metadata/utils/filter_visibility.py | New centralized helper module for discovery/filter logging and consolidated report emission. |
| ingestion/src/metadata/ingestion/source/storage/storage_service.py | Uses helper for filtered container entries and emits step summary on close. |
| ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py | Logs discovered pipelines, logs filtered pipelines with rich reasons, emits step summary on close. |
| ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/metadata.py | Logs discovered/filtered ML models using helper. |
| ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py | Emits step summary on close for MLModel sources. |
| ingestion/src/metadata/ingestion/source/mlmodel/mlflow/metadata.py | Logs discovered/filtered ML models using helper. |
| ingestion/src/metadata/ingestion/source/messaging/messaging_service.py | Logs discovered/filtered topics and emits step summary on close. |
| ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py | Moves Snowflake database discovery/filter logging to shared helper. |
| ingestion/src/metadata/ingestion/source/database/redshift/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/postgres/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/mssql/metadata.py | Adds discovery + rich filter logging for databases via helper. |
| ingestion/src/metadata/ingestion/source/database/database_service.py | Instruments DB base-class filtering paths with helper-driven rich filter logging (and schema discovery logging). |
| ingestion/src/metadata/ingestion/source/database/common_db_source.py | Logs discovered tables/views, logs filtered tables/views with rich reasons, emits step summary on close. |
| ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py | Adds discovery + rich filter logging for databases/schemas via helper. |
| ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py | Logs discovered dashboards, logs filtered dashboards/projects, emits step summary on close. |
| ingestion/src/metadata/ingestion/api/status.py | Adds discovered_counts + record_discovered() to support report computations. |
…efenses
Observability code in the ingestion hot path must not fail connectors.
Adds three layers of defense:
1. Helper-internal try/except in log_discovered, log_filtered, and
log_step_summary. Any failure is logged once at WARN and swallowed.
2. Call-site try/except around log_step_summary in every base class
close() (database, dashboard, pipeline, messaging, storage, mlmodel)
so a summary failure can't mask the real cleanup work.
3. Per-entity-type cap on Status.filtered (MAX_FILTERED_ENTRIES_PER_TYPE
= 50_000). Past the cap, log_filtered only bumps a new
Status.filtered_counts dict so the true count stays accurate without
unbounded memory growth. The report annotates the truncation.
Test coverage doubled (12 -> 23): adds bounded-growth tests for the cap,
resilience tests for broken loggers / bad inputs / corrupted Status, and
an integration-style multi-database lifecycle test that verifies counts
are correct after the full streaming-log + summary cycle.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- log_discovered: skip list() materialization when DEBUG logging is off. Use len() directly for Sized inputs (zero alloc) and a streaming sum() for generators. Cuts the per-discovery cost from O(n) memory to O(1) on large catalogs when --debug isn't active. - _entity_type_from_reason: case-insensitive marker match so legacy reasons like "Database Filtered out" (lowercase 'out' from older BigQuery code paths) get bucketed into the right report section instead of being silently undercounted. - dashboard_service.get_dashboard: null-safe via `or []` (matches the Optional[List[Any]] contract Pipeline / Messaging sources already use); materialize names once into a list so log_discovered can take the zero-allocation Sized path instead of re-listing a generator over the heavy dashboard objects. - common_db_source.get_tables_name_and_type / dashboard_service: skip the O(n) shallow copy when query_*_names_and_types() / get_dashboards_list() already returned a list. Only materialize when a subclass returned a generator. - Report: rename "Will be published to OpenMetadata" → "Passed filter patterns" and prepend a note explaining the count is filter-decision only (does not subtract source-side extraction errors or secondary filters like projectFilterPattern). Avoids confusion when reconciling report counts with the final ingested set. Tests: 23 → 28 (added DEBUG-gated materialization, Sized len() path, case-insensitive marker, and clarifying-note assertions). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Status.get_filtered_count(): true count of filter rejections that includes overflow past the per-type cap. Summary.from_step (step.py) now uses it so persisted StepSummary.filtered stays accurate even when names were dropped to bound memory. Falls back to len(filtered) for legacy callers that never populated filtered_counts. - log_step_summary docstring: aligned with the actual "Passed filter patterns" label and the kept-semantic note printed inside the report. - _get_filtered_database_names: restored streaming iteration. The earlier list() was needed for log_discovered, but discovery logging was moved out of this maintenance-pass helper, so materialization is now pure overhead on large catalogs during mark-deleted. - messaging_service.get_topic / pipeline_service.get_pipeline: same isinstance optimization as dashboard_service / common_db_source — skip the O(n) shallow copy when the source already returned a list. Pre-materialize names once into a list so log_discovered takes the zero-allocation Sized path. - storage_service _filter_entries: preserve bucket context when logging a containerFilterPattern rejection. Stored name is now `bucket/path` and the raw `path` is exposed via matched_against — operators can identify which bucket an entry came from (dataPath alone is not unique across buckets). Tests: 28 -> 31 (added get_filtered_count cap-overflow + legacy + empty cases). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- dashboard_service.get_dashboard: coerce project_name to a string before passing to log_filtered. project_name is reassigned to a list (project_names) when get_project_names returns a non-empty list — passing the list verbatim produced a stringified list like "['a', 'b']" as the stored name, which made the reason unparseable by _entity_type_from_reason and the report ugly. Now joined with ", " when it's a list, kept as-is when it's already a string. - Status.get_filtered_count: precise per-entity-type formula so a mixed helper + legacy + cap-overflow scenario stays correct. The prior max() formula undercounted when one type hit the cap (helper) AND another type had legacy direct status.filter() entries — exactly the worst case the reviewer called out. Now: helper-tracked true count + legacy entries whose entity-type prefix isn't in filtered_counts. Tests: 31 -> 33 (added mixed helper+legacy case and the worst-case mixed helper-cap-overflow + legacy case). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🟡 Playwright Results — all passed (10 flaky)✅ 4150 passed · ❌ 0 failed · 🟡 10 flaky · ⏭️ 90 skipped
🟡 10 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
- mark_databases_as_deleted: pass add_to_status=False to
_get_filtered_database_names. Without this, every filtered database
was being recorded twice (once during main ingestion via the
per-connector get_database_names, once during the maintenance pass),
inflating Status.filtered + filtered_counts and skewing the report.
- _get_filtered_schema_names: only materialize the raw schema names
when add_to_status=True (where log_discovered needs the count up
front). The mark-deleted maintenance paths iterate once with
add_to_status=False — streaming saves O(n) memory on large schemas.
Same fix applied to BigQuery's override.
- dashboard/pipeline/messaging get_X(): compute names once into
(entity, name) tuples instead of calling the connector-specific
get_*_name() twice per entity (once to build the discovery list,
once inside the filter loop). Real savings when name extraction is
non-trivial (e.g., Tableau workbook lookups).
- DB connector log_filtered calls: use the raw entity name (database_name,
schema_name, table_name, view_name, new_database, project_id) as
`name` and the FQN as `matched_against`. The previous convention of
`name=<fqn>` produced confusing report output when useFqnForFiltering
was False ("filtered name = FQN, matched against = raw name") and
was inconsistent with non-SQL connectors (Dashboard/Pipeline/Topic/
MLModel) which already stored raw names. Report column is now
readable across all connector families.
Tests: existing 33 still pass (no test changes needed — the storage-key
change to raw names is verified by the existing dashboard/pipeline/topic
assertions which always asserted raw names).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review ✅ Approved 4 resolved / 4 findingsGeneralizes filter visibility reporting across all connector families by introducing a centralized helper, improving log consistency and debugging capabilities. The implementation addresses memory concerns, count inaccuracies, and formatting issues identified during the review process. ✅ 4 resolved✅ Performance: Materializing full dashboard list may cause memory pressure
✅ Edge Case: 'kept' count may be misleading when dashboards fail detail extraction
✅ Edge Case: log_filtered called with list instead of string for project_name
✅ Edge Case: get_filtered_count undercounts in mixed helper+legacy usage
OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
|



Describe your changes:
Generalizes #28336's per-database discovery + filter-reason logging into a reusable helper that every connector family — Database, Dashboard, Pipeline, Messaging, Storage, MLModel — now uses. At end of each source step it emits a single grep-friendly FILTER VISIBILITY REPORT block listing the discovered count, every filtered name + reason, and the kept count per entity type, so users can immediately tell whether a missing entity was removed by
includes/excludesor never visible to the ingestion role at all. Storage profile is the diff only — discovered counts (int per type) plus the existingStatus.filteredlist now carrying richer reasons; no discovered-name or kept-name lists, both are derivable.Type of change:
High-level design:
ingestion/src/metadata/utils/filter_visibility.py— three helpers (log_discovered,log_filtered,log_step_summary) own the message format centrally so every connector family produces identical output.Statusgaineddiscovered_counts: Dict[str, int](in-memory only, no spec/JSON schema change) so the report can compute kept = discovered − filtered.database_service.py(_get_filtered_database_names,_get_filtered_schema_names),common_db_source.py(get_tables_name_and_typefor tables + views),dashboard_service.py,pipeline_service.py,messaging_service.py,storage_service.py,mlmodel_service.py— each base class'sclose()emits the report.get_database_namesdiscovery lifted into Snowflake (replacing fix(snowflake): log discovered databases and filter-out reasons #28336's inline logging with helper calls — same output, central format), Postgres, BigQuery, Redshift, MSSQL. MySQL inherits the no-op single-DB default and needs no override.sagemaker/metadata.pyandmlflow/metadata.pyswitched their existingfilter_by_mlmodelcalls to use the helper so they participate in the report._get_filtered_database_namesis reached from both main ingestion and themark_databases_as_deletedmaintenance pass. Discovery logging is therefore placed in each connector'sget_database_names, not in*_raw, to avoid double-counting; a docstring on the base helper flags this for future contributors.Example output (Snowflake, Cvent-like config):
Tests:
Use cases covered
databaseFilterPattern.excludes, and can immediately grepFILTER VISIBILITY REPORTto confirm which databases got removed and whydashboardFilterPattern,projectFilterPattern), pipeline (pipelineFilterPattern), messaging (topicFilterPattern), storage (containerFilterPattern), mlmodel (mlModelFilterPattern)status.filter(name, "Database Filtered Out")callers predating this helper still get bucketed into the right entity-type section of the report (back-compat)Unit tests
ingestion/tests/unit/utils/test_filter_visibility.py— 12 tests covering count accumulation, generator inputs, optional kwargs, rich reason storage, consolidated report format, empty-state no-op, unrelated-reason skip, and legacy-reason-string back-compattest_snowflake,test_postgres,test_bigquery,test_redshift,test_mssql,test_mysql,test_common_db_source,test_tableau,test_looker,test_airflow,test_s3_storage,test_sagemaker— 1 skipped, 0 failed)Backend integration tests
Ingestion integration tests
Status.filteris still called with the same"<EntityType> Filtered Out"prefix; persistedStepSummary.filteredcount is unchanged)Playwright (UI) tests
Manual testing performed
source env/bin/activate && cd ingestion && pip install -e . --no-depspython -m pytest ingestion/tests/unit/utils/test_filter_visibility.py -v→ 12/12 passpython -m pytest <12 connector test files>→ 189/190 pass (1 skipped, 0 failed)cd ingestion && make py_format_check→ cleanUI screen recording / screenshots:
Not applicable.
Checklist:
🤖 Generated with Claude Code
Summary by Gitar
get_filtered_countinstatus.pyto correctly aggregate helper-driven and legacyfilter()counts without double-counting.project_nameto string indashboard_service.pyfor consistent formatting in visibility reports.test_filter_visibility.pyto verify accuracy under mixed helper and legacy caller scenarios.This will update automatically on new commits.