fix(unitycatalog): scope lineage caches per catalog, surface lineage failures#28648
fix(unitycatalog): scope lineage caches per catalog, surface lineage failures#28648ulixius9 wants to merge 11 commits into
Conversation
…failures Lineage workflow previously materialized the entire workspace's system.access.table_lineage / column_lineage / external table graph into unbounded in-memory maps, and per-edge failures were only visible at DEBUG. - Scope table/column lineage and external-location queries to one catalog at a time (system table predicates) and clear the maps between catalogs so memory stays bounded to a single catalog - Normalize FQN casing (lowercase) at cache boundaries so case differences between OM names and UC system-table values cannot silently drop edges - Per-edge lineage failures now yield Either(left=StackTraceError) so they appear in workflow status; helper failures log at WARNING instead of DEBUG - Replace SELECT * in information_schema tag queries with explicit column projection - Drain context.deleted_tables after each catalog's mark-deleted pass so the list does not grow across the whole incremental run Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…catalog caches Replace the per-catalog in-memory lineage maps with a Snowflake ACCESS_HISTORY-style streaming model: iterate the configured lookback in `lineageQueryChunkSize`-day windows, stream one combined table+column lineage query per catalog per window, and resolve each edge endpoint to an OpenMetadata table through a bounded LRU cache (hits + misses). - Combined `UNITY_CATALOG_LINEAGE` query aggregates column pairs server-side via `collect_set(struct(...))` so client memory stays O(window) regardless of metastore lineage size; `collect_set` avoids the table x column fan-out. - New `lineageQueryChunkSize` connection field (default 7) tunes days scanned per query: bigger = fewer queries (faster), smaller = smaller scans. - External-location lineage is now edge-driven off the external-tables query and shares the same bounded LRU instead of iterating every OM table. - Drops the unbounded `table_lineage_map`/`column_lineage_map`/ `external_location_map` dicts and the per-catalog cache/clear machinery. - Keeps case-insensitive FQN matching and per-edge `Either(left)` failure surfacing; adds per-catalog emitted/skipped/failed diagnostics. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address gitar bot review on #28648: - Remove the catalog name interpolated into the lineage and external-table system-table scans (`'{catalog}'`). The streaming day-window model plus the bounded LRU already keep memory O(window), so per-catalog SQL scoping is redundant; dropping it also removes the string-interpolation/SQL-injection surface flagged in the review. Lineage now runs one global windowed scan and one external-table scan instead of looping per catalog. - Honor `databaseFilterPattern` per edge in `_is_filtered_table` (catalog now checked alongside schema/table) since `_iter` no longer enumerates catalogs. - Fix case-insensitive resolution: `_resolve_table` now resolves with the lowercased FQN it already computes for the cache key, so `Cat.Schema.Table` from the system tables matches an entity stored as `cat.schema.table` instead of silently dropping the edge. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address gitar bot follow-up review on #28648: - Lowercase the FQN inside `_is_filtered_table` so the table/column-lineage path (raw system-table casing) and the external-table path (pre-lowercased) filter identically against `databaseFilterPattern`/schema/table patterns. - Split the table-lineage summary's `skipped` counter into `filtered` (dropped by databaseFilterPattern) and `unresolved` (tables absent from OpenMetadata) so operators can tell intentional filtering from missing metadata. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
❌ PR checklist incompleteThis PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
There was a problem hiding this comment.
Pull request overview
This PR reworks Databricks Unity Catalog lineage extraction in the ingestion framework to avoid unbounded in-memory lineage caches by streaming lineage results in configurable day-sized windows, and improves operational visibility by surfacing per-row failures in workflow output. It also adds a new Unity Catalog connection field to tune lineage query chunking, tightens Unity Catalog tag queries to project explicit columns, and ensures incremental mark-deleted bookkeeping doesn’t grow unbounded across a run.
Changes:
- Add
lineageQueryChunkSize(default7) to the Unity Catalog connection schema and use it to split lineage extraction into day windows. - Replace bulk lineage caching with streaming lineage extraction using a combined table+column lineage query, plus an LRU cache for table resolution.
- Improve incremental deletion handling by draining the global deleted-tables list per deletion pass; update Unity Catalog tag queries to avoid
SELECT *; extend unit tests accordingly.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json | Adds lineageQueryChunkSize connection field for chunked lineage querying. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py | Implements windowed streaming lineage extraction with bounded LRU table-resolution caching and error surfacing. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py | Replaces separate lineage queries with a combined lineage query; projects explicit columns for tag queries. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py | Drains context.deleted_tables under lock before deletion to keep memory bounded across runs. |
| ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py | Updates/adds unit tests for chunk sizing, window iteration, streaming lineage behavior, and failure surfacing. |
| ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py | Adds unit test to ensure the global deleted-tables list is drained after deletion. |
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
…ant filter Address gitar bot follow-up review on #28648: - Surface whole-window and external-scan query failures as Either(left=StackTraceError) instead of swallowing them with a log warning. A recurring failure (e.g. missing permissions on system.access.table_lineage) would otherwise reproduce the "silent zero edges, workflow reports success" problem this PR set out to fix. - Drop the redundant _is_filtered_table guard inside _build_table_edge. _yield_table_lineage already filters each row before calling it, so the guard re-ran the three regex filter calls on every emitted row; a None return now unambiguously means "unresolved". Row processing is split into _yield_window_lineage / _yield_row_lineage helpers for clarity. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ze minimum - Add bounded LRU dedup so an edge spanning multiple day-windows is emitted once instead of once per window; surface the deduplicated count in the per-run summary. - Add `minimum: 1` to the lineageQueryChunkSize connection schema so invalid values are rejected by UI/API, matching the runtime clamp. - Correct the lineage summary log to credit database/schema/table filter patterns rather than only databaseFilterPattern. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| edge_key = ( | ||
| row.source_table_full_name.lower(), | ||
| row.target_table_full_name.lower(), | ||
| ) | ||
| if edge_key in self._seen_edges: | ||
| stats["duplicate"] += 1 | ||
| else: | ||
| self._seen_edges[edge_key] = True | ||
| yield from self._process_unseen_row(row, stats) |
There was a problem hiding this comment.
⚠️ Edge Case: Cross-window dedup keys on table pair only, dropping column lineage
_yield_row_lineage builds edge_key from only (source_table_full_name, target_table_full_name) and suppresses any later row with the same table pair. But column pairs are aggregated server-side per window (collect_set over that window's events), so different windows can carry different column-level pairs for the same table edge. Once the first window emits the A→B edge, every subsequent window's row for A→B is counted as a duplicate and dropped — including any column-lineage pairs that only appeared in those later windows. Result: column-level lineage becomes whatever subset happened to land in the first window in which the edge appeared, rather than the union across the lookback. Before this commit each window re-emitted the edge, so all column pairs were eventually sent (at the cost of redundant edge emits). This commit fixes the redundant table-edge emits but silently regresses column-lineage completeness.
Consider folding a normalized column-pair signature into the dedup key so edges that differ in their column lineage are still emitted, while true duplicates (same table pair AND same column set) are still suppressed.
Include the normalized column-pair set in the dedup key so windows that contribute distinct column lineage for the same table edge are still emitted.:
column_pairs = tuple(sorted(self._parse_column_pairs(row.column_pairs)))
edge_key = (
row.source_table_full_name.lower(),
row.target_table_full_name.lower(),
column_pairs,
)
if edge_key in self._seen_edges:
stats["duplicate"] += 1
else:
self._seen_edges[edge_key] = True
yield from self._process_unseen_row(row, stats)
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
| else: | ||
| self._seen_edges[edge_key] = True | ||
| yield from self._process_unseen_row(row, stats) |
There was a problem hiding this comment.
💡 Bug: Edge marked seen before processing, so failed rows never retry
_yield_row_lineage inserts edge_key into _seen_edges before _process_unseen_row runs. If processing raises (counted as failed and surfaced as Either(left)), the edge is still recorded as seen, so the same edge appearing in a later window is silently treated as a duplicate and never re-attempted — the operator sees one failure and zero recovery even though a later window might have succeeded. Marking the edge seen only after a successful emit (or at least not on the failure path) would let transient per-row failures recover in a subsequent window. Low severity since most failures are deterministic, but the bounded LRU + fail-then-skip combination can permanently drop an edge.
Only record the edge as seen once it has been successfully emitted, so failed rows can be retried in a later window.:
if edge_key in self._seen_edges:
stats["duplicate"] += 1
return
emitted = False
for either in self._process_unseen_row(row, stats):
if either.right is not None:
emitted = True
yield either
if emitted:
self._seen_edges[edge_key] = True
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
| def _process_unseen_row(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: | ||
| if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name): | ||
| stats["filtered"] += 1 |
There was a problem hiding this comment.
Cross-catalog lineage silently dropped when source is filtered
The old _iter enumerated only target-side entities that passed filter patterns, leaving source-side resolution unrestricted (any OpenMetadata-known source was linked). Now _process_unseen_row drops an edge when either endpoint matches databaseFilterPattern/schemaFilterPattern/tableFilterPattern. Any user who has databaseFilterPattern set to include a subset of catalogs but whose lineage graph has upstream tables in excluded (yet ingested) catalogs will silently lose those cross-catalog edges — they are counted as filtered, not unresolved, so there is no indication to the operator that real edges were removed. Consider applying _is_filtered_table to the target only, and using only the _resolve_table miss to gate the source, which mirrors the old behavior.
| source_col = item.get("u") or item.get("U") | ||
| target_col = item.get("d") or item.get("D") |
There was a problem hiding this comment.
The uppercase fallbacks
item.get("U") and item.get("D") are never exercised: the SQL aliases are always lowercase (struct(source_column_name AS u, target_column_name AS d)) and Databricks serialises struct field names verbatim. Keeping them creates confusion about the expected key format, and the or short-circuit means a hypothetically falsy (empty-string) value in "u" would silently fall through to "U" rather than being rejected. Removing the fallbacks makes the contract explicit.
| source_col = item.get("u") or item.get("U") | |
| target_col = item.get("d") or item.get("D") | |
| source_col = item.get("u") | |
| target_col = item.get("d") |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| def _yield_external_lineage(self) -> Iterable[Either[AddLineageRequest]]: | ||
| """ | ||
| Stream external tables and create container lineage for each, resolving | ||
| the table through the shared bounded LRU. External-table storage paths | ||
| are a current snapshot, not an event stream, so this is a single scan | ||
| rather than a windowed one. Catalogs excluded by the database filter are | ||
| dropped per row via `_is_filtered_table`. | ||
| """ | ||
| try: | ||
| with self.engine.connect() as conn: | ||
| rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute( | ||
| text(UNITY_CATALOG_EXTERNAL_TABLES) | ||
| ) | ||
|
|
||
| yield Either( | ||
| right=AddLineageRequest( | ||
| edge=EntitiesEdge( | ||
| toEntity=EntityReference(id=table.id, type="table"), | ||
| fromEntity=EntityReference(id=from_entity.id, type="table"), | ||
| lineageDetails=lineage_details, | ||
| ) | ||
| ), | ||
| for row in rows: | ||
| databricks_table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() | ||
| if self._is_filtered_table(databricks_table_fqn): | ||
| continue | ||
| table_entity = self._resolve_table(databricks_table_fqn) | ||
| if table_entity: | ||
| yield from self._process_external_location_lineage(table_entity, row.storage_path) | ||
| except Exception as exc: | ||
| yield Either( # pyright: ignore[reportCallIssue] | ||
| left=StackTraceError( | ||
| name="external-table-lineage", | ||
| error=f"Failed to fetch external table locations: {exc}", | ||
| stackTrace=traceback.format_exc(), | ||
| ) | ||
| except Exception as exc: | ||
| logger.debug(f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}") | ||
| logger.debug(traceback.format_exc()) | ||
| ) |
There was a problem hiding this comment.
External lineage has no stats tracking unlike table lineage
_yield_table_lineage keeps a detailed stats dict and logs an emitted / duplicate / filtered / unresolved / failed summary. _yield_external_lineage emits no equivalent summary, so operators cannot tell how many external-location edges were produced, skipped due to filtering, or left unresolved (table not in OpenMetadata). Adding a parallel counter and a logger.info at the end of _yield_external_lineage would give operators the same visibility for the external scan that they now have for the system-table scan.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
❌ UI Checkstyle Failed❌ I18n SyncTranslation locale files are out of sync with Affected files
Fix locally (fast — only checks files changed in this branch): make ui-checkstyle-changed |
| def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: | ||
| """ | ||
| Skip edges already streamed in an earlier window so an edge whose events | ||
| span multiple windows is emitted once instead of once per window. The | ||
| dedup set is a bounded LRU, so only recently-seen edges are suppressed; | ||
| edges past the cache window may re-emit (lineage adds are idempotent). | ||
| """ | ||
| edge_key = ( | ||
| row.source_table_full_name.lower(), | ||
| row.target_table_full_name.lower(), | ||
| ) | ||
| if edge_key in self._seen_edges: | ||
| stats["duplicate"] += 1 | ||
| else: | ||
| self._seen_edges[edge_key] = True | ||
| yield from self._process_unseen_row(row, stats) |
There was a problem hiding this comment.
Cross-window dedup silently drops column lineage
_seen_edges deduplicates on (source, target) alone. When window 1 contains a table-lineage event for edge A→B but no column-lineage event (e.g., the originating query used SELECT *, which Databricks does not always track at column level), and window 2 contains both a table-lineage event and column-lineage events for the same edge, the window 2 row is counted as a "duplicate" and skipped. The edge is emitted without column pairs, even though column lineage is available within the lookback window.
The PR description explicitly states lineage adds are idempotent, making cross-window dedup a pure performance optimisation. Removing it entirely — or only deduplicating rows where column_pairs is empty — would restore the old behaviour of always capturing available column lineage. Without dedup the same edge just gets re-sent, which is already the documented fallback when the LRU evicts an entry.
|
|



Describe your changes
The Unity Catalog lineage workflow previously bulk-loaded the entire workspace's
system.access.table_lineage/system.access.column_lineage/ external-table graph into unbounded in-memory maps before processing a single table, and per-edge lineage failures were only logged at DEBUG — invisible to operators at the default log level.This PR reworks UC lineage into a bounded, per-day-window streaming model — the Databricks analogue of the Snowflake
ACCESS_HISTORYlineage path — so memory stays bounded regardless of metastore size and lineage is emitted as a stream instead of after a full-graph preload.Per-window streaming instead of in-memory caches
_iter()walks one catalog at a time; for each catalog the configuredqueryLogDurationlookback is split intolineageQueryChunkSize-day windows (_iter_date_windows). Each window runs one combined table+column lineage query (UNITY_CATALOG_LINEAGE) scoped to that catalog (lower(split_part(target_table_full_name, '.', 1)) = '{catalog}') and time window, and rows are streamed (stream_results=True) — never buffered as a whole-graph map.collect_set(struct(...))+to_json, so client memory staysO(window)no matter how large the lineage graph is.collect_set(notcollect_list) avoids the table × column row fan-out.LRUCache(maxsize 1000, hits + misses cached), so a hot upstream table referenced by many edges is fetched once and repeated unresolvable lookups stay cheap.table_lineage_map/column_lineage_map/external_location_mapdicts and the per-catalog cache/clear machinery are removed entirely.New
lineageQueryChunkSizeconnection field (default 7)Case-insensitive FQN matching
Failure visibility
yield Either(left=StackTraceError(...))so they appear in workflow status and counts instead of being swallowed at DEBUG.emitted / skipped / failedsummary so operators can see why edges were dropped (filtered or unresolved tables vs. row errors) instead of a silent zero.Smaller cleanups
information_schematag queries use explicit column projection instead ofSELECT *.context.deleted_tablesafter each catalog's mark-deleted pass so the list no longer grows unbounded across the whole run.Type of change
Testing
Validated end-to-end against a real Unity Catalog (
e2e_uc) + OpenMetadata instance:emitted/skipped/failedsummary per catalog.lineageQueryChunkSizeconfirmed to control query count and wall-clock: chunk=30 → 1 query (~32s), chunk=7 (default) → ~5 queries (~58s), chunk=1 → 30 queries (~6m) over a 30-day lookback — all 100% success.Either(left)failure surfacing,_itercatalog filtering).ruff+basedpyrightclean.Checklist
🤖 Generated with Claude Code
Fixes #28908
Greptile Summary
This PR replaces the Unity Catalog lineage workflow's bulk-preload-then-iterate model with a per-window streaming model: a single combined table+column lineage SQL query is issued per time-window chunk, rows are streamed with
stream_results=True, and each endpoint is resolved through a bounded LRU cache, keeping client memoryO(window)regardless of metastore size. Per-edge and per-window failures now surface asEither(left=StackTraceError(...))instead of being swallowed at DEBUG, and each run logs anemitted/skipped/failedsummary._iter_date_windows()splitsqueryLogDurationintolineageQueryChunkSize-day chunks; the newUNITY_CATALOG_LINEAGECTE combines table and column lineage in one server-side-aggregated query per window usingcollect_set.metadata.pyfix:deleted_tablesis now snapshot-and-cleared under_state_lockafter each catalog's mark-deleted pass, preventing unbounded list growth across a full run.lineageQueryChunkSize(default 7, minimum 1) is added tounityCatalogConnection.jsonwith corresponding generated TypeScript types.Confidence Score: 4/5
Safe to merge for the memory-bounding and failure-visibility goals; one edge case where column lineage is silently dropped should be addressed before or shortly after merge.
The cross-window dedup in _yield_row_lineage keys only on (source, target). When a table-lineage event for edge A->B falls in window 1 with no column info, and a later query in window 2 generates both table and column lineage for the same edge, window 2's column-enriched row is counted as a duplicate and skipped. The old bulk-load model avoided this regression. All other changes are sound.
ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py — specifically the deduplication logic in _yield_row_lineage and its interaction with cross-window column lineage availability.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant Iter as _iter() participant TL as _yield_table_lineage() participant WL as _yield_window_lineage() participant DB as Databricks system.access participant LRU as _table_cache (LRU) participant OM as OpenMetadata API participant EL as _yield_external_lineage() Iter->>TL: yield from loop for each day-window chunk TL->>WL: _yield_window_lineage(start, end, stats) WL->>DB: "UNITY_CATALOG_LINEAGE (stream_results=True)" DB-->>WL: streaming rows (source_fqn, target_fqn, column_pairs JSON) loop for each row WL->>WL: dedup check (_seen_edges LRU) alt not seen before WL->>WL: _is_filtered_table(source/target) alt not filtered WL->>LRU: _resolve_table(source_fqn) alt cache miss LRU->>OM: get_by_name(Table, fqn) OM-->>LRU: Table or None end WL->>LRU: _resolve_table(target_fqn) WL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)" end end end end TL-->>Iter: stats log Iter->>EL: yield from EL->>DB: "UNITY_CATALOG_EXTERNAL_TABLES (stream_results=True)" DB-->>EL: streaming rows loop for each row EL->>LRU: _resolve_table(fqn) alt resolved EL->>OM: es_search_container_by_path(storage_path) EL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)" end end%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant Iter as _iter() participant TL as _yield_table_lineage() participant WL as _yield_window_lineage() participant DB as Databricks system.access participant LRU as _table_cache (LRU) participant OM as OpenMetadata API participant EL as _yield_external_lineage() Iter->>TL: yield from loop for each day-window chunk TL->>WL: _yield_window_lineage(start, end, stats) WL->>DB: "UNITY_CATALOG_LINEAGE (stream_results=True)" DB-->>WL: streaming rows (source_fqn, target_fqn, column_pairs JSON) loop for each row WL->>WL: dedup check (_seen_edges LRU) alt not seen before WL->>WL: _is_filtered_table(source/target) alt not filtered WL->>LRU: _resolve_table(source_fqn) alt cache miss LRU->>OM: get_by_name(Table, fqn) OM-->>LRU: Table or None end WL->>LRU: _resolve_table(target_fqn) WL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)" end end end end TL-->>Iter: stats log Iter->>EL: yield from EL->>DB: "UNITY_CATALOG_EXTERNAL_TABLES (stream_results=True)" DB-->>EL: streaming rows loop for each row EL->>LRU: _resolve_table(fqn) alt resolved EL->>OM: es_search_container_by_path(storage_path) EL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)" end endReviews (2): Last reviewed commit: "Merge branch 'main' into uc-lineage-scop..." | Re-trigger Greptile