Skip to content

Commit a1151b9

Browse files
ulixius9 and claude committed
fix(unitycatalog): dedup cross-window lineage edges, enforce chunk-size minimum
- Add bounded LRU dedup so an edge spanning multiple day-windows is emitted once instead of once per window; surface the deduplicated count in the per-run summary.
- Add `minimum: 1` to the lineageQueryChunkSize connection schema so invalid values are rejected by UI/API, matching the runtime clamp.
- Correct the lineage summary log to credit database/schema/table filter patterns rather than only databaseFilterPattern.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 26f3434 commit a1151b9

3 files changed

Lines changed: 43 additions & 7 deletions

File tree

ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py

Lines changed: 27 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,8 @@
6161

6262
TABLE_RESOLUTION_CACHE_SIZE = 1000
6363

64+
EDGE_DEDUP_CACHE_SIZE = 1000
65+
6466
DEFAULT_LINEAGE_CHUNK_DAYS = 7
6567

6668

@@ -90,6 +92,7 @@ def __init__(
9092
self.connection_obj = get_connection(self.service_connection)
9193
self.engine = get_sqlalchemy_connection(self.service_connection)
9294
self._table_cache: LRUCache = LRUCache(maxsize=TABLE_RESOLUTION_CACHE_SIZE)
95+
self._seen_edges: LRUCache = LRUCache(maxsize=EDGE_DEDUP_CACHE_SIZE)
9396
self._chunk_days = self._resolve_chunk_days()
9497
self.test_connection()
9598

@@ -273,17 +276,18 @@ def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]:
273276
"""
274277
Stream table/column lineage one day-window at a time, emitting one
275278
request per resolved edge. Per-row and per-window failures both surface
276-
as Either(left) instead of being swallowed. Edges dropped by
277-
`databaseFilterPattern` and edges whose tables are absent from
278-
OpenMetadata are counted separately so the summary distinguishes
279+
as Either(left) instead of being swallowed. Edges dropped by the
280+
database/schema/table filter patterns and edges whose tables are absent
281+
from OpenMetadata are counted separately so the summary distinguishes
279282
intentional filtering from missing metadata.
280283
"""
281-
stats = {"emitted": 0, "filtered": 0, "unresolved": 0, "failed": 0}
284+
stats = {"emitted": 0, "duplicate": 0, "filtered": 0, "unresolved": 0, "failed": 0}
282285
for window_start, window_end in self._iter_date_windows():
283286
yield from self._yield_window_lineage(window_start, window_end, stats)
284287
logger.info(
285-
f"Table lineage: emitted {stats['emitted']} edges, filtered {stats['filtered']} "
286-
f"(databaseFilterPattern), unresolved {stats['unresolved']} (tables not in OpenMetadata), "
288+
f"Table lineage: emitted {stats['emitted']} edges, deduplicated {stats['duplicate']} "
289+
f"cross-window duplicates, filtered {stats['filtered']} (database/schema/table filter "
290+
f"patterns), unresolved {stats['unresolved']} (tables not in OpenMetadata), "
287291
f"failed {stats['failed']} (row/window errors)"
288292
)
289293

@@ -310,6 +314,23 @@ def _yield_window_lineage(
310314
)
311315

312316
def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]:
317+
"""
318+
Skip edges already streamed in an earlier window so an edge whose events
319+
span multiple windows is emitted once instead of once per window. The
320+
dedup set is a bounded LRU, so only recently-seen edges are suppressed;
321+
edges past the cache window may re-emit (lineage adds are idempotent).
322+
"""
323+
edge_key = (
324+
row.source_table_full_name.lower(),
325+
row.target_table_full_name.lower(),
326+
)
327+
if edge_key in self._seen_edges:
328+
stats["duplicate"] += 1
329+
else:
330+
self._seen_edges[edge_key] = True
331+
yield from self._process_unseen_row(row, stats)
332+
333+
def _process_unseen_row(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]:
313334
if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name):
314335
stats["filtered"] += 1
315336
else:

ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,20 @@ def test_emits_resolved_edges(self, lineage_source):
276276
assert len(results) == 1
277277
assert isinstance(results[0].right, AddLineageRequest)
278278

279+
def test_dedupes_edges_across_windows(self, lineage_source):
280+
source_table = _make_table("source", "local_unitycatalog.cat.schema.source")
281+
target_table = _make_table("target", "local_unitycatalog.cat.schema.target")
282+
lineage_source.metadata.get_by_name.side_effect = [source_table, target_table]
283+
284+
rows = [LineageRow("cat.schema.source", "cat.schema.target", None)]
285+
with patch.object(lineage_source, "_iter_date_windows", return_value=[("s1", "e1"), ("s2", "e2")]):
286+
_mock_query_rows(lineage_source, rows)
287+
results = list(lineage_source._yield_table_lineage())
288+
289+
assert len(results) == 1
290+
assert isinstance(results[0].right, AddLineageRequest)
291+
assert lineage_source.metadata.get_by_name.call_count == 2
292+
279293
def test_skips_unresolved_edges(self, lineage_source):
280294
lineage_source.metadata.get_by_name.return_value = None
281295

openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@
8484
"title": "Lineage Query Chunk Size",
8585
"description": "Number of days of lineage scanned per query against the Unity Catalog system tables. The configured lineage lookback window is split into chunks of this size and streamed one chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); smaller values keep each scan smaller.",
8686
"type": "integer",
87-
"default": 7
87+
"default": 7,
88+
"minimum": 1
8889
},
8990
"connectionOptions": {
9091
"title": "Connection Options",

0 commit comments

Comments
 (0)