fix(unitycatalog): scope lineage caches per catalog, surface lineage failures #28648

Open
ulixius9 wants to merge 11 commits into main from uc-lineage-scoping-bounded-caches

@ulixius9 ulixius9 commented Jun 2, 2026

Describe your changes

The Unity Catalog lineage workflow previously bulk-loaded the entire workspace's system.access.table_lineage / system.access.column_lineage / external-table graph into unbounded in-memory maps before processing a single table, and per-edge lineage failures were only logged at DEBUG — invisible to operators at the default log level.

This PR reworks UC lineage into a bounded, per-day-window streaming model — the Databricks analogue of the Snowflake ACCESS_HISTORY lineage path — so memory stays bounded regardless of metastore size and lineage is emitted as a stream instead of after a full-graph preload.

Per-window streaming instead of in-memory caches

  • _iter() walks one catalog at a time; for each catalog the configured queryLogDuration lookback is split into lineageQueryChunkSize-day windows (_iter_date_windows). Each window runs one combined table+column lineage query (UNITY_CATALOG_LINEAGE) scoped to that catalog (lower(split_part(target_table_full_name, '.', 1)) = '{catalog}') and time window, and rows are streamed (stream_results=True) — never buffered as a whole-graph map.
  • Column pairs are aggregated server-side per edge via collect_set(struct(...)) + to_json, so client memory stays O(window) no matter how large the lineage graph is. collect_set (not collect_list) avoids the table × column row fan-out.
  • Each edge endpoint is resolved to an OpenMetadata table through a bounded LRUCache (maxsize 1000, hits + misses cached), so a hot upstream table referenced by many edges is fetched once and repeated unresolvable lookups stay cheap.
  • The unbounded table_lineage_map / column_lineage_map / external_location_map dicts and the per-catalog cache/clear machinery are removed entirely.
  • External-location lineage is now edge-driven off the external-tables query and shares the same bounded LRU, instead of iterating every OpenMetadata table.
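
The window splitting described above can be sketched roughly as follows. This is a minimal illustration, not the PR's `_iter_date_windows`: the function name, its parameters, and the oldest-first ordering are assumptions made for the example.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def iter_date_windows(
    end: datetime, lookback_days: int, chunk_days: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield contiguous (window_start, window_end) pairs covering the lookback."""
    # Clamp the chunk size so a zero or negative value cannot loop forever.
    chunk = timedelta(days=max(1, chunk_days))
    cursor = end - timedelta(days=lookback_days)
    while cursor < end:
        # The last window is truncated so it never runs past `end`.
        window_end = min(cursor + chunk, end)
        yield cursor, window_end
        cursor = window_end
```

With a 30-day lookback and a 7-day chunk this yields five windows (four full weeks plus a two-day remainder), and each window would drive one streamed query.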

New lineageQueryChunkSize connection field (default 7)

  • Tunes how many days are scanned per system-table query. Larger values issue fewer queries (faster wall-clock); smaller values keep each scan smaller. Streaming bounds client memory regardless of the value.

Case-insensitive FQN matching

  • Source/target FQNs are lowercased at the resolution boundary so casing differences between OpenMetadata names and UC system-table values can no longer silently drop lineage edges.
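
A minimal sketch of lowercasing at the resolution boundary, assuming a lookup keyed by lowercase FQN. `KNOWN_TABLES` and `resolve` are hypothetical stand-ins for the OpenMetadata lookup, not code from this PR.

```python
from typing import Dict, Optional

# Hypothetical stand-in for entities stored in OpenMetadata, keyed by lowercase FQN.
KNOWN_TABLES: Dict[str, str] = {"cat.schema.orders": "table-id-1"}


def normalize_fqn(fqn: str) -> str:
    """Lowercase the FQN once, at the point where it is used as a lookup key."""
    return fqn.lower()


def resolve(fqn: str) -> Optional[str]:
    """Return the stored entity id, or None when the table is unknown."""
    return KNOWN_TABLES.get(normalize_fqn(fqn))
```

Here `resolve("Cat.Schema.Orders")` matches the stored `cat.schema.orders` entry instead of missing on casing alone.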

Failure visibility

  • Per-edge / per-table lineage failures yield Either(left=StackTraceError(...)) so they appear in workflow status and counts instead of being swallowed at DEBUG.
  • Each catalog logs an emitted / skipped / failed summary so operators can see why edges were dropped (filtered or unresolved tables vs. row errors) instead of a silent zero.
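
The failure-surfacing pattern might look like the sketch below. The `Either` dataclass is a simplified stand-in for the ingestion framework's type, and `process_rows` and `build_edge` are hypothetical names used only for illustration.

```python
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Iterator, Optional


@dataclass
class Either:
    """Simplified stand-in: `left` carries an error, `right` carries a result."""
    left: Optional[Any] = None
    right: Optional[Any] = None


def process_rows(
    rows: Iterable[Any],
    build_edge: Callable[[Any], Optional[Any]],
    stats: Dict[str, int],
) -> Iterator[Either]:
    for row in rows:
        try:
            edge = build_edge(row)
        except Exception as exc:
            # Surface the failure to the caller instead of logging it at DEBUG.
            stats["failed"] += 1
            yield Either(
                left={"name": str(row), "error": str(exc), "stackTrace": traceback.format_exc()}
            )
            continue
        if edge is None:
            # Filtered or unresolved tables are counted, not reported as errors.
            stats["skipped"] += 1
            continue
        stats["emitted"] += 1
        yield Either(right=edge)
```

The counters give the per-run summary its emitted / skipped / failed numbers, while each `left` value remains visible in workflow status.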

Smaller cleanups

  • information_schema tag queries use explicit column projection instead of SELECT *.
  • Incremental metadata runs drain context.deleted_tables after each catalog's mark-deleted pass so the list no longer grows unbounded across the whole run.
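
Draining the list after each pass could be done with a snapshot-then-clear step like the one below. The `Context` class and the `drain_deleted_tables` helper are illustrative assumptions; only the `deleted_tables` name comes from the description above.

```python
import threading
from typing import List


class Context:
    """Hypothetical stand-in for the shared workflow context."""

    def __init__(self) -> None:
        self.deleted_tables: List[str] = []
        self._state_lock = threading.Lock()


def drain_deleted_tables(context: Context) -> List[str]:
    """Return the pending entries and empty the shared list in one locked step."""
    with context._state_lock:
        snapshot = list(context.deleted_tables)
        context.deleted_tables.clear()
    return snapshot
```

Taking the snapshot and clearing under the same lock means a concurrent append is either included in this pass or left for the next one, never lost.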

Type of change

  • Bug fix
  • Improvement

Testing

Validated end-to-end against a real Unity Catalog (e2e_uc) + OpenMetadata instance:

  • Metadata + lineage workflows complete at 100% success with 0 errors.
  • The new per-window streaming path runs correctly and surfaces an emitted/skipped/failed summary per catalog.
  • lineageQueryChunkSize confirmed to control query count and wall-clock: chunk=30 → 1 query (~32s), chunk=7 (default) → ~5 queries (~58s), chunk=1 → 30 queries (~6m) over a 30-day lookback — all 100% success.
  • UC unit tests pass (streaming windows, LRU hit/miss caching, server-side column-pair parsing, edge building, filtering, Either(left) failure surfacing, _iter catalog filtering). ruff + basedpyright clean.
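
The query counts reported above are consistent with one query per window, where the number of windows is the lookback divided by the chunk size, rounded up. A small check of that arithmetic, with `window_count` as a hypothetical helper name:

```python
import math


def window_count(lookback_days: int, chunk_days: int) -> int:
    """Number of day-windows needed to cover the lookback (last one may be partial)."""
    return math.ceil(lookback_days / max(1, chunk_days))
```

For a 30-day lookback this gives 1 window at chunk=30, 5 at chunk=7, and 30 at chunk=1.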

Note for reviewers: lineageQueryChunkSize is a new connection-schema field — run make generate in a full dev env so the generated Java/TS models and the UI connection form pick it up (the Python model regenerates from the JSON schema; generated dirs are gitignored).

Checklist

  • I have read the CONTRIBUTING document.
  • My code follows the style guidelines of this project.
  • I have performed a self-review of my own code.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.

🤖 Generated with Claude Code

Fixes #28908

Greptile Summary

This PR replaces the Unity Catalog lineage workflow's bulk-preload-then-iterate model with a per-window streaming model: a single combined table+column lineage SQL query is issued per time-window chunk, rows are streamed with stream_results=True, and each endpoint is resolved through a bounded LRU cache, keeping client memory O(window) regardless of metastore size. Per-edge and per-window failures now surface as Either(left=StackTraceError(...)) instead of being swallowed at DEBUG, and each run logs an emitted/skipped/failed summary.

  • Streaming windowed lineage: _iter_date_windows() splits queryLogDuration into lineageQueryChunkSize-day chunks; the new UNITY_CATALOG_LINEAGE CTE combines table and column lineage in one server-side-aggregated query per window using collect_set.
  • LRU-cached table resolution: both hits and misses are cached (maxsize 1000) so a popular upstream table is fetched from OpenMetadata once across all windows and edges.
  • metadata.py fix: deleted_tables is now snapshot-and-cleared under _state_lock after each catalog's mark-deleted pass, preventing unbounded list growth across a full run.
  • New schema field: lineageQueryChunkSize (default 7, minimum 1) is added to unityCatalogConnection.json with corresponding generated TypeScript types.
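
Caching both hits and misses can be sketched with a small bounded LRU. This uses `collections.OrderedDict` as a stand-in; the cache class used in the PR and the `fetch` callable here are assumptions for the example.

```python
from collections import OrderedDict
from typing import Any, Callable, Optional

_MISSING = object()  # sentinel: distinguishes "not cached" from a cached None


class BoundedLRU:
    def __init__(self, maxsize: int = 1000) -> None:
        self._data: "OrderedDict[str, Any]" = OrderedDict()
        self._maxsize = maxsize

    def get(self, key: str) -> Any:
        if key not in self._data:
            return _MISSING
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # evict the least recently used entry


def make_resolver(fetch: Callable[[str], Optional[Any]], maxsize: int = 1000):
    cache = BoundedLRU(maxsize)

    def resolve(fqn: str) -> Optional[Any]:
        key = fqn.lower()
        cached = cache.get(key)
        if cached is not _MISSING:
            return cached
        entity = fetch(key)  # may be None; the miss is cached as well
        cache.put(key, entity)
        return entity

    return resolve
```

Because a `None` result is stored too, a table that is absent from OpenMetadata costs one lookup rather than one per edge that references it.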

Confidence Score: 4/5

Safe to merge for the memory-bounding and failure-visibility goals; one edge case where column lineage is silently dropped should be addressed before or shortly after merge.

The cross-window dedup in _yield_row_lineage keys only on (source, target). When a table-lineage event for edge A->B falls in window 1 with no column info, and a later query in window 2 generates both table and column lineage for the same edge, window 2's column-enriched row is counted as a duplicate and skipped. The old bulk-load model avoided this regression. All other changes are sound.

ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py — specifically the deduplication logic in _yield_row_lineage and its interaction with cross-window column lineage availability.
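
The edge case can be reproduced in isolation with a toy model of the dedup step. Everything here (the row tuples, `emit`, the `include_columns` switch) is a hypothetical simplification, not the PR's `_yield_row_lineage`.

```python
from typing import FrozenSet, Iterable, List, Tuple

ColumnPairs = FrozenSet[Tuple[str, str]]
Row = Tuple[str, str, ColumnPairs]


def emit(rows: Iterable[Row], include_columns: bool) -> List[Row]:
    """Emit each row once, deduplicating on the table pair or on pair + columns."""
    seen = set()
    emitted: List[Row] = []
    for source, target, columns in rows:
        key = (source.lower(), target.lower())
        if include_columns:
            key = key + (columns,)
        if key in seen:
            continue
        seen.add(key)
        emitted.append((source, target, columns))
    return emitted


# Window 1 carries only the table-level edge; window 2 adds a column pair.
window_1: Row = ("a", "b", frozenset())
window_2: Row = ("a", "b", frozenset({("id", "id")}))
```

With the table-pair key, only the column-less row from window 1 is emitted; with the column-aware key, both rows pass and the column pair from window 2 survives.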

Important Files Changed

| Filename | Overview |
| --- | --- |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py | Core rewrite from bulk-cache to streaming windowed model; introduces cross-window dedup that can silently drop column lineage when table-level events precede column-level events across windows |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py | Replaces two separate queries with a single combined CTE; uses server-side collect_set for column aggregation; information_schema queries gain explicit column projection |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py | Drains deleted_tables under _state_lock after each catalog's mark-deleted pass; correct snapshot-then-clear pattern prevents unbounded list growth |
| ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py | Full test suite replacement covering date windows, LRU hit/miss caching, column-pair parsing, edge building, filter short-circuit, Either(left) failure surfacing, and dedup across windows |
| ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py | New test verifies that deleted_tables is drained after mark_tables_as_deleted processes it, preventing cross-catalog accumulation |
| openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json | Adds lineageQueryChunkSize field with default 7 and minimum 1; schema change is backward compatible |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Iter as _iter()
    participant TL as _yield_table_lineage()
    participant WL as _yield_window_lineage()
    participant DB as Databricks system.access
    participant LRU as _table_cache (LRU)
    participant OM as OpenMetadata API
    participant EL as _yield_external_lineage()

    Iter->>TL: yield from
    loop for each day-window chunk
        TL->>WL: _yield_window_lineage(start, end, stats)
        WL->>DB: "UNITY_CATALOG_LINEAGE (stream_results=True)"
        DB-->>WL: streaming rows (source_fqn, target_fqn, column_pairs JSON)
        loop for each row
            WL->>WL: dedup check (_seen_edges LRU)
            alt not seen before
                WL->>WL: _is_filtered_table(source/target)
                alt not filtered
                    WL->>LRU: _resolve_table(source_fqn)
                    alt cache miss
                        LRU->>OM: get_by_name(Table, fqn)
                        OM-->>LRU: Table or None
                    end
                    WL->>LRU: _resolve_table(target_fqn)
                    WL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)"
                end
            end
        end
    end
    TL-->>Iter: stats log
    Iter->>EL: yield from
    EL->>DB: "UNITY_CATALOG_EXTERNAL_TABLES (stream_results=True)"
    DB-->>EL: streaming rows
    loop for each row
        EL->>LRU: _resolve_table(fqn)
        alt resolved
            EL->>OM: es_search_container_by_path(storage_path)
            EL-->>Iter: "Either(right=AddLineageRequest) or Either(left=error)"
        end
    end

Reviews (2): Last reviewed commit: "Merge branch 'main' into uc-lineage-scop..."

Greptile also left 1 inline comment on this PR.

…failures

Lineage workflow previously materialized the entire workspace's
system.access.table_lineage / column_lineage / external table graph into
unbounded in-memory maps, and per-edge failures were only visible at DEBUG.

- Scope table/column lineage and external-location queries to one catalog
  at a time (system table predicates) and clear the maps between catalogs
  so memory stays bounded to a single catalog
- Normalize FQN casing (lowercase) at cache boundaries so case differences
  between OM names and UC system-table values cannot silently drop edges
- Per-edge lineage failures now yield Either(left=StackTraceError) so they
  appear in workflow status; helper failures log at WARNING instead of DEBUG
- Replace SELECT * in information_schema tag queries with explicit column
  projection
- Drain context.deleted_tables after each catalog's mark-deleted pass so the
  list does not grow across the whole incremental run

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the Ingestion and safe to test labels Jun 2, 2026
…catalog caches

Replace the per-catalog in-memory lineage maps with a Snowflake
ACCESS_HISTORY-style streaming model: iterate the configured lookback in
`lineageQueryChunkSize`-day windows, stream one combined table+column
lineage query per catalog per window, and resolve each edge endpoint to an
OpenMetadata table through a bounded LRU cache (hits + misses).

- Combined `UNITY_CATALOG_LINEAGE` query aggregates column pairs server-side
  via `collect_set(struct(...))` so client memory stays O(window) regardless
  of metastore lineage size; `collect_set` avoids the table x column fan-out.
- New `lineageQueryChunkSize` connection field (default 7) tunes days scanned
  per query: bigger = fewer queries (faster), smaller = smaller scans.
- External-location lineage is now edge-driven off the external-tables query
  and shares the same bounded LRU instead of iterating every OM table.
- Drops the unbounded `table_lineage_map`/`column_lineage_map`/
  `external_location_map` dicts and the per-catalog cache/clear machinery.
- Keeps case-insensitive FQN matching and per-edge `Either(left)` failure
  surfacing; adds per-catalog emitted/skipped/failed diagnostics.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address gitar bot review on #28648:

- Remove the catalog name interpolated into the lineage and external-table
  system-table scans (`'{catalog}'`). The streaming day-window model plus the
  bounded LRU already keep memory O(window), so per-catalog SQL scoping is
  redundant; dropping it also removes the string-interpolation/SQL-injection
  surface flagged in the review. Lineage now runs one global windowed scan and
  one external-table scan instead of looping per catalog.

- Honor `databaseFilterPattern` per edge in `_is_filtered_table` (catalog now
  checked alongside schema/table) since `_iter` no longer enumerates catalogs.

- Fix case-insensitive resolution: `_resolve_table` now resolves with the
  lowercased FQN it already computes for the cache key, so `Cat.Schema.Table`
  from the system tables matches an entity stored as `cat.schema.table` instead
  of silently dropping the edge.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ulixius9 and others added 2 commits June 9, 2026 19:11
Address gitar bot follow-up review on #28648:

- Lowercase the FQN inside `_is_filtered_table` so the table/column-lineage
  path (raw system-table casing) and the external-table path (pre-lowercased)
  filter identically against `databaseFilterPattern`/schema/table patterns.

- Split the table-lineage summary's `skipped` counter into `filtered`
  (dropped by databaseFilterPattern) and `unresolved` (tables absent from
  OpenMetadata) so operators can tell intentional filtering from missing
  metadata.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@ulixius9 ulixius9 marked this pull request as ready for review June 9, 2026 13:58
@ulixius9 ulixius9 requested a review from a team as a code owner June 9, 2026 13:58
Copilot AI review requested due to automatic review settings June 9, 2026 13:58
github-actions Bot commented Jun 9, 2026

❌ PR checklist incomplete

This PR cannot be merged until the following are addressed on its linked issue:

The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically.

Maintainers can bypass this check by adding the skip-pr-checks label.

Copilot AI left a comment

Pull request overview

This PR reworks Databricks Unity Catalog lineage extraction in the ingestion framework to avoid unbounded in-memory lineage caches by streaming lineage results in configurable day-sized windows, and improves operational visibility by surfacing per-row failures in workflow output. It also adds a new Unity Catalog connection field to tune lineage query chunking, tightens Unity Catalog tag queries to project explicit columns, and ensures incremental mark-deleted bookkeeping doesn’t grow unbounded across a run.

Changes:

  • Add lineageQueryChunkSize (default 7) to the Unity Catalog connection schema and use it to split lineage extraction into day windows.
  • Replace bulk lineage caching with streaming lineage extraction using a combined table+column lineage query, plus an LRU cache for table resolution.
  • Improve incremental deletion handling by draining the global deleted-tables list per deletion pass; update Unity Catalog tag queries to avoid SELECT *; extend unit tests accordingly.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json | Adds lineageQueryChunkSize connection field for chunked lineage querying. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py | Implements windowed streaming lineage extraction with bounded LRU table-resolution caching and error surfacing. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py | Replaces separate lineage queries with a combined lineage query; projects explicit columns for tag queries. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py | Drains context.deleted_tables under lock before deletion to keep memory bounded across runs. |
| ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py | Updates/adds unit tests for chunk sizing, window iteration, streaming lineage behavior, and failure surfacing. |
| ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py | Adds unit test to ensure the global deleted-tables list is drained after deletion. |

github-actions Bot commented Jun 9, 2026

✅ TypeScript Types Auto-Updated

The generated TypeScript types have been automatically updated based on JSON schema changes in this PR.

@github-actions github-actions Bot requested a review from a team as a code owner June 9, 2026 14:04
…ant filter

Address gitar bot follow-up review on #28648:

- Surface whole-window and external-scan query failures as
  Either(left=StackTraceError) instead of swallowing them with a log
  warning. A recurring failure (e.g. missing permissions on
  system.access.table_lineage) would otherwise reproduce the "silent zero
  edges, workflow reports success" problem this PR set out to fix.

- Drop the redundant _is_filtered_table guard inside _build_table_edge.
  _yield_table_lineage already filters each row before calling it, so the
  guard re-ran the three regex filter calls on every emitted row; a None
  return now unambiguously means "unresolved". Row processing is split into
  _yield_window_lineage / _yield_row_lineage helpers for clarity.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings June 9, 2026 14:46

Copilot AI left a comment

Pull request overview

Copilot reviewed 6 out of 17 changed files in this pull request and generated 3 comments.

…ze minimum

- Add bounded LRU dedup so an edge spanning multiple day-windows is emitted
  once instead of once per window; surface the deduplicated count in the
  per-run summary.
- Add `minimum: 1` to the lineageQueryChunkSize connection schema so invalid
  values are rejected by UI/API, matching the runtime clamp.
- Correct the lineage summary log to credit database/schema/table filter
  patterns rather than only databaseFilterPattern.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Comment on lines +323 to +331
edge_key = (
    row.source_table_full_name.lower(),
    row.target_table_full_name.lower(),
)
if edge_key in self._seen_edges:
    stats["duplicate"] += 1
else:
    self._seen_edges[edge_key] = True
    yield from self._process_unseen_row(row, stats)

⚠️ Edge Case: Cross-window dedup keys on table pair only, dropping column lineage

_yield_row_lineage builds edge_key from only (source_table_full_name, target_table_full_name) and suppresses any later row with the same table pair. But column pairs are aggregated server-side per window (collect_set over that window's events), so different windows can carry different column-level pairs for the same table edge. Once the first window emits the A→B edge, every subsequent window's row for A→B is counted as a duplicate and dropped — including any column-lineage pairs that only appeared in those later windows. Result: column-level lineage becomes whatever subset happened to land in the first window in which the edge appeared, rather than the union across the lookback. Before this commit each window re-emitted the edge, so all column pairs were eventually sent (at the cost of redundant edge emits). This commit fixes the redundant table-edge emits but silently regresses column-lineage completeness.

Consider folding a normalized column-pair signature into the dedup key so edges that differ in their column lineage are still emitted, while true duplicates (same table pair AND same column set) are still suppressed.

Include the normalized column-pair set in the dedup key so windows that contribute distinct column lineage for the same table edge are still emitted:

column_pairs = tuple(sorted(self._parse_column_pairs(row.column_pairs)))
edge_key = (
    row.source_table_full_name.lower(),
    row.target_table_full_name.lower(),
    column_pairs,
)
if edge_key in self._seen_edges:
    stats["duplicate"] += 1
else:
    self._seen_edges[edge_key] = True
    yield from self._process_unseen_row(row, stats)

Comment on lines +329 to +331
else:
    self._seen_edges[edge_key] = True
    yield from self._process_unseen_row(row, stats)

💡 Bug: Edge marked seen before processing, so failed rows never retry

_yield_row_lineage inserts edge_key into _seen_edges before _process_unseen_row runs. If processing raises (counted as failed and surfaced as Either(left)), the edge is still recorded as seen, so the same edge appearing in a later window is silently treated as a duplicate and never re-attempted — the operator sees one failure and zero recovery even though a later window might have succeeded. Marking the edge seen only after a successful emit (or at least not on the failure path) would let transient per-row failures recover in a subsequent window. Low severity since most failures are deterministic, but the bounded LRU + fail-then-skip combination can permanently drop an edge.

Only record the edge as seen once it has been successfully emitted, so failed rows can be retried in a later window:

if edge_key in self._seen_edges:
    stats["duplicate"] += 1
    return
emitted = False
for either in self._process_unseen_row(row, stats):
    if either.right is not None:
        emitted = True
    yield either
if emitted:
    self._seen_edges[edge_key] = True

Comment on lines +333 to +335
def _process_unseen_row(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]:
    if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name):
        stats["filtered"] += 1

P1 Cross-catalog lineage silently dropped when source is filtered

The old _iter enumerated only target-side entities that passed filter patterns, leaving source-side resolution unrestricted (any OpenMetadata-known source was linked). Now _process_unseen_row drops an edge when either endpoint matches databaseFilterPattern/schemaFilterPattern/tableFilterPattern. Any user who has databaseFilterPattern set to include a subset of catalogs but whose lineage graph has upstream tables in excluded (yet ingested) catalogs will silently lose those cross-catalog edges — they are counted as filtered, not unresolved, so there is no indication to the operator that real edges were removed. Consider applying _is_filtered_table to the target only, and using only the _resolve_table miss to gate the source, which mirrors the old behavior.
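
The target-only gating suggested here might look like the following sketch. The filter is reduced to a list of included catalog patterns, and `is_filtered`, `build_edge`, and `resolve` are hypothetical names; the real filter also covers schema and table patterns.

```python
import re
from typing import Callable, List, Optional, Tuple


def is_filtered(fqn: str, include_catalogs: List[str]) -> bool:
    """True when the table's catalog matches none of the include patterns."""
    catalog = fqn.lower().split(".", 1)[0]
    return not any(re.fullmatch(pattern, catalog) for pattern in include_catalogs)


def build_edge(
    source_fqn: str,
    target_fqn: str,
    include_catalogs: List[str],
    resolve: Callable[[str], Optional[int]],
) -> Optional[Tuple[int, int]]:
    # Only the target is checked against the filter patterns.
    if is_filtered(target_fqn, include_catalogs):
        return None
    # The source is gated solely by whether it resolves to a known entity.
    source = resolve(source_fqn.lower())
    target = resolve(target_fqn.lower())
    if source is None or target is None:
        return None
    return source, target
```

Under this gating, an upstream table in an excluded but ingested catalog still links to an included target, while edges whose target is excluded are dropped.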

Comment on lines +212 to +213
source_col = item.get("u") or item.get("U")
target_col = item.get("d") or item.get("D")

P2 The uppercase fallbacks item.get("U") and item.get("D") are never exercised: the SQL aliases are always lowercase (struct(source_column_name AS u, target_column_name AS d)) and Databricks serialises struct field names verbatim. Keeping them creates confusion about the expected key format, and the or short-circuit means a hypothetically falsy (empty-string) value in "u" would silently fall through to "U" rather than being rejected. Removing the fallbacks makes the contract explicit.

Suggested change
- source_col = item.get("u") or item.get("U")
- target_col = item.get("d") or item.get("D")
+ source_col = item.get("u")
+ target_col = item.get("d")
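
The `or` short-circuit described above is easy to see on a single parsed item. Both helper functions below are hypothetical; only the `u` / `d` key names come from the reviewed code.

```python
from typing import Dict, Optional, Tuple


def parse_pair_with_fallback(item: Dict[str, str]) -> Tuple[Optional[str], Optional[str]]:
    """Original form: an empty "u" value falls through to "U"."""
    return item.get("u") or item.get("U"), item.get("d") or item.get("D")


def parse_pair_strict(item: Dict[str, str]) -> Optional[Tuple[str, str]]:
    """Suggested form: only lowercase keys, and empty values are rejected."""
    source_col = item.get("u")
    target_col = item.get("d")
    if not source_col or not target_col:
        return None
    return source_col, target_col
```

For `{"u": "", "U": "other", "d": "x"}` the fallback version silently returns `("other", "x")`, whereas the strict version rejects the item.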

Comment on lines +438 to +465
+ def _yield_external_lineage(self) -> Iterable[Either[AddLineageRequest]]:
+     """
+     Stream external tables and create container lineage for each, resolving
+     the table through the shared bounded LRU. External-table storage paths
+     are a current snapshot, not an event stream, so this is a single scan
+     rather than a windowed one. Catalogs excluded by the database filter are
+     dropped per row via `_is_filtered_table`.
+     """
+     try:
+         with self.engine.connect() as conn:
+             rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(
+                 text(UNITY_CATALOG_EXTERNAL_TABLES)
+             )

-             yield Either(
-                 right=AddLineageRequest(
-                     edge=EntitiesEdge(
-                         toEntity=EntityReference(id=table.id, type="table"),
-                         fromEntity=EntityReference(id=from_entity.id, type="table"),
-                         lineageDetails=lineage_details,
-                     )
-                 ),
+             for row in rows:
+                 databricks_table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower()
+                 if self._is_filtered_table(databricks_table_fqn):
+                     continue
+                 table_entity = self._resolve_table(databricks_table_fqn)
+                 if table_entity:
+                     yield from self._process_external_location_lineage(table_entity, row.storage_path)
+     except Exception as exc:
+         yield Either(  # pyright: ignore[reportCallIssue]
+             left=StackTraceError(
+                 name="external-table-lineage",
+                 error=f"Failed to fetch external table locations: {exc}",
+                 stackTrace=traceback.format_exc(),
              )
-     except Exception as exc:
-         logger.debug(f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}")
-         logger.debug(traceback.format_exc())
+         )

P2 External lineage has no stats tracking unlike table lineage

_yield_table_lineage keeps a detailed stats dict and logs an emitted / duplicate / filtered / unresolved / failed summary. _yield_external_lineage emits no equivalent summary, so operators cannot tell how many external-location edges were produced, skipped due to filtering, or left unresolved (table not in OpenMetadata). Adding a parallel counter and a logger.info at the end of _yield_external_lineage would give operators the same visibility for the external scan that they now have for the system-table scan.
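
A parallel counter for the external scan could be as small as the sketch below. The function name, the `(table_fqn, storage_path)` row shape, and the log wording are assumptions for illustration, not the PR's `_yield_external_lineage`.

```python
import logging
from collections import Counter
from typing import Callable, Iterable, Iterator, Optional, Tuple

logger = logging.getLogger(__name__)


def yield_external_edges(
    rows: Iterable[Tuple[str, str]],
    is_filtered: Callable[[str], bool],
    resolve: Callable[[str], Optional[str]],
    stats: Counter,
) -> Iterator[Tuple[str, str]]:
    for table_fqn, storage_path in rows:
        fqn = table_fqn.lower()
        if is_filtered(fqn):
            stats["filtered"] += 1
            continue
        table_id = resolve(fqn)
        if table_id is None:
            stats["unresolved"] += 1
            continue
        stats["emitted"] += 1
        yield table_id, storage_path
    # One summary line once the scan has been fully consumed.
    logger.info(
        "External lineage summary: emitted=%d filtered=%d unresolved=%d",
        stats["emitted"], stats["filtered"], stats["unresolved"],
    )
```

Because this is a generator, the summary is only logged after the caller has consumed every row, which mirrors where a per-run summary would naturally sit.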

Copilot AI review requested due to automatic review settings June 18, 2026 05:53

Copilot AI left a comment

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

gitar-bot Bot commented Jun 18, 2026

Code Review ⚠️ Changes requested 9 resolved / 11 findings

Reworks Unity Catalog lineage into a bounded, per-window streaming model to improve memory efficiency and failure visibility. Requires addressing critical issues where cross-window deduplication drops column-level data and prematurely marks edges as seen, preventing retries on failure.

⚠️ Edge Case: Cross-window dedup keys on table pair only, dropping column lineage

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:323-331 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:218-227 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:242

_yield_row_lineage builds edge_key from only (source_table_full_name, target_table_full_name) and suppresses any later row with the same table pair. But column pairs are aggregated server-side per window (collect_set over that window's events), so different windows can carry different column-level pairs for the same table edge. Once the first window emits the A→B edge, every subsequent window's row for A→B is counted as a duplicate and dropped — including any column-lineage pairs that only appeared in those later windows. Result: column-level lineage becomes whatever subset happened to land in the first window in which the edge appeared, rather than the union across the lookback. Before this commit each window re-emitted the edge, so all column pairs were eventually sent (at the cost of redundant edge emits). This commit fixes the redundant table-edge emits but silently regresses column-lineage completeness.

Consider folding a normalized column-pair signature into the dedup key so edges that differ in their column lineage are still emitted, while true duplicates (same table pair AND same column set) are still suppressed.

Include the normalized column-pair set in the dedup key so windows that contribute distinct column lineage for the same table edge are still emitted.
column_pairs = tuple(sorted(self._parse_column_pairs(row.column_pairs)))
edge_key = (
    row.source_table_full_name.lower(),
    row.target_table_full_name.lower(),
    column_pairs,
)
if edge_key in self._seen_edges:
    stats["duplicate"] += 1
else:
    self._seen_edges[edge_key] = True
    yield from self._process_unseen_row(row, stats)
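The difference between the two key shapes can be seen in a small standalone simulation. It assumes a hypothetical `Row` tuple in place of the real result row and a plain `set` in place of the bounded LRU, so it illustrates only the dedup logic, not the PR's classes.

```python
from typing import Iterable, List, NamedTuple, Set, Tuple

ColumnPair = Tuple[str, str]


class Row(NamedTuple):
    source: str
    target: str
    column_pairs: Tuple[ColumnPair, ...]


def emitted_rows(rows: Iterable[Row], include_columns: bool) -> List[Row]:
    """Return the rows that survive dedup under either key shape."""
    seen: Set[tuple] = set()
    kept: List[Row] = []
    for row in rows:
        key: tuple = (row.source.lower(), row.target.lower())
        if include_columns:
            # Sorting makes the signature independent of pair order.
            key += (tuple(sorted(row.column_pairs)),)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Window 1 sees only a->x for edge A->B; window 2 also sees b->y.
windows = [
    Row("cat.s.A", "cat.s.B", (("a", "x"),)),
    Row("cat.s.A", "cat.s.B", (("a", "x"), ("b", "y"))),
]
```

With `include_columns=False` only the first window's row is kept, so the `b -> y` pair is lost. With `include_columns=True` both rows are kept, while an exact repeat of either row is still suppressed.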
💡 Bug: Edge marked seen before processing, so failed rows never retry

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:329-331 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:333-347

_yield_row_lineage inserts edge_key into _seen_edges before _process_unseen_row runs. If processing raises (counted as failed and surfaced as Either(left)), the edge is still recorded as seen, so the same edge appearing in a later window is silently treated as a duplicate and never re-attempted — the operator sees one failure and zero recovery even though a later window might have succeeded. Marking the edge seen only after a successful emit (or at least not on the failure path) would let transient per-row failures recover in a subsequent window. Low severity since most failures are deterministic, but the bounded LRU + fail-then-skip combination can permanently drop an edge.

Only record the edge as seen once it has been successfully emitted, so failed rows can be retried in a later window.
if edge_key in self._seen_edges:
    stats["duplicate"] += 1
    return
emitted = False
for either in self._process_unseen_row(row, stats):
    if either.right is not None:
        emitted = True
    yield either
if emitted:
    self._seen_edges[edge_key] = True
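The retry behavior this fix aims for can be checked in isolation. The sketch below is a simplified variant, not the PR's code: `process` is a hypothetical stand-in for `_process_unseen_row` that returns `None` on failure instead of an `Either`, and `seen` is a plain dict rather than the bounded LRU.

```python
from typing import Callable, Dict, Iterator, Optional, Tuple

EdgeKey = Tuple[str, str]


def yield_once(
    edge_key: EdgeKey,
    process: Callable[[EdgeKey], Optional[str]],
    seen: Dict[EdgeKey, bool],
    stats: Dict[str, int],
) -> Iterator[str]:
    """Yield the processed edge and mark it seen only on success.

    `process` is a hypothetical stand-in for `_process_unseen_row`; it
    returns a result string on success and None on failure.
    """
    if edge_key in seen:
        stats["duplicate"] += 1
        return
    result = process(edge_key)
    if result is None:
        stats["failed"] += 1  # not marked seen, so a later window retries
        return
    seen[edge_key] = True
    yield result
```

An edge that fails in one window stays out of `seen`, so the same edge in a later window is processed again instead of being counted as a duplicate.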
✅ 9 resolved
Security: SQL query uses string interpolation for catalog filter

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py:70 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py:88 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py:107 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:117-119 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:134-137 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:163
The new {catalog} placeholder in UNITY_CATALOG_TABLE_LINEAGE, UNITY_CATALOG_COLUMN_LINEAGE, and UNITY_CATALOG_EXTERNAL_TABLES is interpolated via Python .format() rather than parameterized query binding. While the value originates from database.name.root.lower() (metadata entities, not direct user input) and Databricks catalog names are restricted identifiers that cannot contain single quotes, this pattern is fragile — a future change to the data source or a misconfigured catalog name could introduce SQL injection.

In practice the risk is very low because Unity Catalog enforces identifier naming rules, but using SQLAlchemy's text(...).bindparams() would be more robust.
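To illustrate the bound-parameter approach without a Databricks connection, here is a sketch that uses the standard-library `sqlite3` driver as a stand-in. The table name and columns are hypothetical; SQLAlchemy's `text()` accepts the same `:name` placeholder style, but the PR's actual queries are not reproduced here.

```python
import sqlite3

# In-memory stand-in for a lineage system table (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_lineage (target_catalog TEXT, target_table TEXT)")
conn.executemany(
    "INSERT INTO table_lineage VALUES (?, ?)",
    [("sales", "orders"), ("hr", "people")],
)


def targets_for_catalog(catalog: str) -> list:
    """Filter by catalog with a bound :catalog parameter, not .format()."""
    rows = conn.execute(
        "SELECT target_table FROM table_lineage "
        "WHERE lower(target_catalog) = :catalog",
        {"catalog": catalog.lower()},
    )
    return [name for (name,) in rows]
```

Because the value travels separately from the SQL text, a catalog name containing a quote is compared as a literal string and cannot change the shape of the query.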

Bug: Case-insensitive resolution not applied to actual table lookup

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:148-162
The PR's stated goal is case-insensitive FQN matching so casing differences between OpenMetadata names and Unity Catalog system-table values no longer silently drop lineage edges. However, _resolve_table only lowercases cache_key for caching purposes — it passes the original-case databricks_table_fqn to _fetch_table_entity, which splits it and calls fqn.build(...) + metadata.get_by_name(...) with the original-case parts.

fqn.build preserves case and get_by_name does an exact FQN match, so a system-table value like Cat.Schema.Table will not resolve to an entity stored as cat.schema.table. The old _process_table_lineage path lowercased the FQN parts before fqn.build, so this change is a regression of exactly the behavior the PR claims to fix. The same applies to _is_filtered_table, where filter patterns are matched against original-case schema/table names.

The unit tests don't catch this because test_resolves_and_caches_hit seeds the lookup with an already-lowercase FQN on the first call.
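A sketch of the behavior this finding expects, assuming entities are stored under lowercase FQNs. The `ENTITIES` dict is a hypothetical stand-in for the exact-match lookup; it does not call `fqn.build` or `metadata.get_by_name`.

```python
from typing import Dict, Optional

# Hypothetical stand-in for the OpenMetadata store, keyed by exact FQN.
ENTITIES: Dict[str, str] = {"svc.cat.schema.table": "table-entity-1"}


def resolve_table(service: str, databricks_table_fqn: str) -> Optional[str]:
    """Lowercase the catalog.schema.table parts before the exact lookup."""
    parts = databricks_table_fqn.lower().split(".")
    if len(parts) != 3:
        return None
    return ENTITIES.get(".".join([service, *parts]))
```

A regression test in this spirit would pass a mixed-case value such as `Cat.Schema.Table` on the first call, which is the case the review says the existing unit test misses.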

Security: Catalog name string-interpolated into lineage SQL

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py:61-75 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:259-263
UNITY_CATALOG_LINEAGE is built with .format(catalog=catalog_name, start_time=..., end_time=...) and the resulting string is wrapped in text(...) with no bound parameters. catalog_name is database.name.root.lower() derived from the Unity Catalog metastore. While not direct end-user input, a catalog name containing a single quote (or crafted content) would break the query or allow SQL injection into the system-table scan. This is the previously flagged interpolation at the old table_lineage query, now carried into the combined query. Prefer SQLAlchemy bound parameters (text(...).bindparams(...) / :catalog) for the literal values, or strictly validate/escape the catalog identifier before interpolation.

Edge Case: Inconsistent case-normalization between filter paths

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:172-186 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:233-235 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:406
_is_filtered_table is now called with differently-cased input on the two lineage paths. In _yield_external_lineage the FQN is lowercased before filtering (databricks_table_fqn = f"{...}".lower(), line 406), but in _build_table_edge the raw row.source_table_full_name / row.target_table_full_name are passed un-lowercased (lines 233-235). filter_by_database/filter_by_schema/filter_by_table perform regex matching, which is case-sensitive, so a databaseFilterPattern that matches one casing could filter external-table edges while letting table/column edges through (or vice-versa) when the UC system tables return names in a different case than the filter pattern expects. Normalizing case in one central place keeps the two paths consistent.
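One way to normalize in a single place is to lowercase inside the filter helper itself, so neither caller can skip the step. The sketch below is hypothetical: it assumes exclude patterns are written in lowercase and checks only the catalog part, and it does not reproduce `filter_by_database` or the other real filter functions.

```python
import re
from typing import Optional


def is_filtered(table_fqn: str, exclude_pattern: Optional[str]) -> bool:
    """Normalize case once, then apply the exclude regex to the catalog.

    Hypothetical helper: both lineage paths would pass the raw FQN, and
    the lowercasing happens here rather than at each call site.
    """
    if not exclude_pattern:
        return False
    catalog = table_fqn.lower().split(".", 1)[0]
    return re.match(exclude_pattern, catalog) is not None
```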

Quality: Filtered catalogs no longer reported in workflow status

📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:282-296 📄 ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py:416-425
Previously _iter enumerated catalogs and called self.status.filter(database.fullyQualifiedName.root, "Catalog Filtered Out") for catalogs excluded by databaseFilterPattern, giving operators explicit visibility into which catalogs were skipped. With the rewrite, catalog filtering happens silently per edge inside _build_table_edge/_yield_external_lineage and only contributes to the aggregate skipped counter in the INFO summary, which lumps filtered tables and unresolved tables together. Operators lose the explicit "Catalog Filtered Out" status entries. Consider tracking and logging the set of distinct catalogs dropped by the database filter, or splitting the skipped summary into filtered-vs-unresolved counts.
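A sketch of the split summary suggested here, using hypothetical reason labels (`filtered_by_database`, `unresolved`) that are not taken from the PR:

```python
from collections import Counter
from typing import Iterable, Set, Tuple


def summarize_skips(
    skipped: Iterable[Tuple[str, str]]
) -> Tuple[Counter, Set[str]]:
    """Split skips by reason and collect catalogs dropped by the filter.

    Each item is (table_fqn, reason), where reason is the hypothetical
    label "filtered_by_database" or "unresolved".
    """
    counts: Counter = Counter()
    filtered_catalogs: Set[str] = set()
    for table_fqn, reason in skipped:
        counts[reason] += 1
        if reason == "filtered_by_database":
            filtered_catalogs.add(table_fqn.split(".", 1)[0].lower())
    return counts, filtered_catalogs
```

The returned set could be logged once per run, or fed to a status call per catalog, to restore the visibility that the earlier "Catalog Filtered Out" entries gave.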

...and 4 more resolved from earlier reviews

@github-actions


❌ UI Checkstyle Failed

❌ I18n Sync

Translation locale files are out of sync with en-us.json.

Affected files
  • openmetadata-ui/src/main/resources/ui/src/locale/languages/sv-se.json

Fix locally (fast — only checks files changed in this branch):

make ui-checkstyle-changed

Comment on lines +316 to +331
def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]:
    """
    Skip edges already streamed in an earlier window so an edge whose events
    span multiple windows is emitted once instead of once per window. The
    dedup set is a bounded LRU, so only recently-seen edges are suppressed;
    edges past the cache window may re-emit (lineage adds are idempotent).
    """
    edge_key = (
        row.source_table_full_name.lower(),
        row.target_table_full_name.lower(),
    )
    if edge_key in self._seen_edges:
        stats["duplicate"] += 1
    else:
        self._seen_edges[edge_key] = True
        yield from self._process_unseen_row(row, stats)

P1 Cross-window dedup silently drops column lineage

_seen_edges deduplicates on (source, target) alone. When window 1 contains a table-lineage event for edge A→B but no column-lineage event (e.g., the originating query used SELECT *, which Databricks does not always track at column level), and window 2 contains both a table-lineage event and column-lineage events for the same edge, the window 2 row is counted as a "duplicate" and skipped. The edge is emitted without column pairs, even though column lineage is available within the lookback window.

The PR description explicitly states lineage adds are idempotent, making cross-window dedup a pure performance optimisation. Removing it entirely — or only deduplicating rows where column_pairs is empty — would restore the old behaviour of always capturing available column lineage. Without dedup the same edge just gets re-sent, which is already the documented fallback when the LRU evicts an entry.
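The second option in this comment, deduplicating only rows that carry no column pairs, can be sketched as follows. The function name and the plain `set` are illustrative assumptions; the PR itself keeps seen edges in a bounded LRU.

```python
from typing import Sequence, Set, Tuple

EdgeKey = Tuple[str, str]


def should_emit(
    source: str, target: str, column_pairs: Sequence, seen: Set[EdgeKey]
) -> bool:
    """Suppress a repeated edge only when it carries no column pairs."""
    key = (source.lower(), target.lower())
    if key in seen and not column_pairs:
        return False
    seen.add(key)
    return True
```

Under this rule a later window that brings column lineage for an already-seen edge is always emitted, at the cost of re-sending edges whose column pairs repeat across windows.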

Labels

Ingestion · safe to test (add this label to run secure GitHub workflows on PRs)

Development

Successfully merging this pull request may close these issues.

Unity Catalog lineage loads the entire metastore graph into unbounded memory and hides per-edge failures

3 participants