From 84ec25ad795b9ec24fc2998d60b0a65c3fdc47af Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 2 Jun 2026 23:22:23 +0530 Subject: [PATCH 1/7] fix(unitycatalog): scope lineage caches per catalog, surface lineage failures Lineage workflow previously materialized the entire workspace's system.access.table_lineage / column_lineage / external table graph into unbounded in-memory maps, and per-edge failures were only visible at DEBUG. - Scope table/column lineage and external-location queries to one catalog at a time (system table predicates) and clear the maps between catalogs so memory stays bounded to a single catalog - Normalize FQN casing (lowercase) at cache boundaries so case differences between OM names and UC system-table values cannot silently drop edges - Per-edge lineage failures now yield Either(left=StackTraceError) so they appear in workflow status; helper failures log at WARNING instead of DEBUG - Replace SELECT * in information_schema tag queries with explicit column projection - Drain context.deleted_tables after each catalog's mark-deleted pass so the list does not grow across the whole incremental run Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 145 +++++++++++----- .../source/database/unitycatalog/metadata.py | 9 +- .../source/database/unitycatalog/queries.py | 13 +- .../database/test_unity_catalog_lineage.py | 164 +++++++++++++++++- .../database/test_unitycatalog_incremental.py | 19 ++ 5 files changed, 297 insertions(+), 53 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index a01b49f50b21..591b49f6af12 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -100,58 +103,82 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}") return cls(config, metadata) - def _cache_lineage(self): + def _cache_lineage(self, catalog_name: str): """ - Bulk-fetch all table and column lineage from system tables into memory. + Fetch table and column lineage for a single catalog from system tables + into memory. FQNs are lowercased so lookups are case-insensitive. """ query_log_duration = self.source_config.queryLogDuration or 1 # pyright: ignore[reportAttributeAccessIssue] - logger.info(f"Caching lineage from system tables (lookback: {query_log_duration} days)") + logger.info(f"Caching lineage for catalog [{catalog_name}] (lookback: {query_log_duration} days)") try: with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_TABLE_LINEAGE.format(query_log_duration=query_log_duration))) + rows = conn.execute( + text( + UNITY_CATALOG_TABLE_LINEAGE.format(query_log_duration=query_log_duration, catalog=catalog_name) + ) + ) for row in rows: - self.table_lineage_map[row.target_table_full_name].add(row.source_table_full_name) + self.table_lineage_map[row.target_table_full_name.lower()].add(row.source_table_full_name.lower()) logger.info( - f"Cached table lineage: {sum(len(v) for v in self.table_lineage_map.values())} edges " + f"Cached table lineage for catalog [{catalog_name}]: " + f"{sum(len(v) for v in self.table_lineage_map.values())} edges " f"for {len(self.table_lineage_map)} target tables" ) except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache table lineage: {exc}") + logger.warning(f"Failed to cache table lineage for catalog [{catalog_name}]: {exc}") try: with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_COLUMN_LINEAGE.format(query_log_duration=query_log_duration))) + rows = conn.execute( + text( + UNITY_CATALOG_COLUMN_LINEAGE.format(query_log_duration=query_log_duration, catalog=catalog_name) + ) + ) for row in rows: table_key = ( - row.source_table_full_name, - row.target_table_full_name, + row.source_table_full_name.lower(), + row.target_table_full_name.lower(), ) self.column_lineage_map[table_key].append((row.source_column_name, row.target_column_name)) logger.info( - f"Cached column lineage: {sum(len(v) for v in self.column_lineage_map.values())} " + f"Cached column lineage for catalog [{catalog_name}]: " + f"{sum(len(v) for v in self.column_lineage_map.values())} " f"column mappings for {len(self.column_lineage_map)} table pairs" ) except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache column lineage: {exc}") + logger.warning(f"Failed to cache column lineage for catalog [{catalog_name}]: {exc}") - def _cache_external_locations(self): + def _cache_external_locations(self, catalog_name: str): """ - Bulk-fetch all external table storage locations from system.information_schema.tables. + Fetch external table storage locations for a single catalog from + system.information_schema.tables. FQNs are lowercased so lookups + are case-insensitive. """ - logger.info("Caching external table locations from system tables") + logger.info(f"Caching external table locations for catalog [{catalog_name}]") try: with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_EXTERNAL_TABLES)) + rows = conn.execute(text(UNITY_CATALOG_EXTERNAL_TABLES.format(catalog=catalog_name))) for row in rows: - table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}" + table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() self.external_location_map[table_fqn] = row.storage_path - logger.info(f"Cached {len(self.external_location_map)} external table locations") + logger.info( + f"Cached {len(self.external_location_map)} external table locations for catalog [{catalog_name}]" + ) except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache external table locations: {exc}") + logger.warning(f"Failed to cache external table locations for catalog [{catalog_name}]: {exc}") + + def _clear_lineage_caches(self): + """ + Release the per-catalog lineage maps so memory stays bounded to one + catalog at a time. + """ + self.table_lineage_map.clear() + self.column_lineage_map.clear() + self.external_location_map.clear() def _get_data_model_column_fqn(self, data_model_entity: ContainerDataModel, column: str) -> Optional[str]: # noqa: UP045 if not data_model_entity: @@ -182,7 +209,9 @@ def _get_container_column_lineage( ) return None # noqa: TRY300 except Exception as exc: - logger.debug(f"Error computing container column lineage for {table_entity.fullyQualifiedName.root}: {exc}") + logger.warning( + f"Error computing container column lineage for {table_entity.fullyQualifiedName.root}: {exc}" # pyright: ignore[reportOptionalMemberAccess] + ) logger.debug(traceback.format_exc()) return None @@ -210,7 +239,7 @@ def _get_column_lineage_details( return LineageDetails(columnsLineage=col_lineage, source=LineageSource.QueryLineage) return None # noqa: TRY300 except Exception as exc: - logger.debug(f"Error computing column lineage: {exc}") + logger.warning(f"Error computing column lineage for {source_table_fqn} -> {target_table_fqn}: {exc}") logger.debug(traceback.format_exc()) return None @@ -250,8 +279,13 @@ def _process_external_location_lineage( ), ) except Exception as exc: - logger.debug(f"Error processing external location lineage for {databricks_table_fqn}: {exc}") - logger.debug(traceback.format_exc()) + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=databricks_table_fqn, + error=f"Error processing external location lineage for {databricks_table_fqn}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) def _process_table_lineage(self, table: Table, databricks_table_fqn: str) -> Iterable[Either[AddLineageRequest]]: upstream_tables = self.table_lineage_map.get(databricks_table_fqn, set()) @@ -295,17 +329,20 @@ def _process_table_lineage(self, table: Table, databricks_table_fqn: str) -> Ite ), ) except Exception as exc: - logger.debug(f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}") - logger.debug(traceback.format_exc()) + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=databricks_table_fqn, + error=f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ Fetch lineage from system tables for both table-to-table - and external location lineage. + and external location lineage, one catalog at a time so the + cached lineage maps stay bounded to a single catalog. """ - self._cache_lineage() - self._cache_external_locations() - for database in self.metadata.list_all_entities(entity=Database, params={"service": self.config.serviceName}): if filter_by_database(self.source_config.databaseFilterPattern, database.name.root): # pyright: ignore[reportAttributeAccessIssue] self.status.filter( @@ -313,9 +350,21 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: "Catalog Filtered Out", ) continue + + yield from self._process_catalog_lineage(database) + + def _process_catalog_lineage(self, database: Database) -> Iterable[Either[AddLineageRequest]]: + """ + Cache lineage scoped to one catalog, process its tables, and release + the caches before moving to the next catalog. + """ + catalog_name = database.name.root.lower() + self._cache_lineage(catalog_name) + self._cache_external_locations(catalog_name) + try: for schema in self.metadata.list_all_entities( entity=DatabaseSchema, - params={"database": database.fullyQualifiedName.root}, + params={"database": database.fullyQualifiedName.root}, # pyright: ignore[reportOptionalMemberAccess] ): if filter_by_schema(self.source_config.schemaFilterPattern, schema.name.root): # pyright: ignore[reportAttributeAccessIssue] self.status.filter( @@ -323,22 +372,32 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: "Schema Filtered Out", ) continue - for table in self.metadata.list_all_entities( - entity=Table, - params={"databaseSchema": schema.fullyQualifiedName.root}, - ): - if filter_by_table(self.source_config.tableFilterPattern, table.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - table.fullyQualifiedName.root, - "Table Filtered Out", - ) - continue - databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}" + yield from self._process_schema_lineage(schema) + finally: + self._clear_lineage_caches() + + def _process_schema_lineage(self, schema: DatabaseSchema) -> Iterable[Either[AddLineageRequest]]: + """ + Process table and external location lineage for every + non-filtered table of a schema. + """ + for table in self.metadata.list_all_entities( + entity=Table, + params={"databaseSchema": schema.fullyQualifiedName.root}, # pyright: ignore[reportOptionalMemberAccess] + ): + if filter_by_table(self.source_config.tableFilterPattern, table.name.root): # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + self.status.filter( + table.fullyQualifiedName.root, + "Table Filtered Out", + ) + continue + + databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}".lower() - yield from self._process_table_lineage(table, databricks_table_fqn) + yield from self._process_table_lineage(table, databricks_table_fqn) - yield from self._process_external_location_lineage(table, databricks_table_fqn) + yield from self._process_external_location_lineage(table, databricks_table_fqn) def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 2d021dce2589..554136724b8a 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -617,10 +617,17 @@ def mark_tables_as_deleted(self): raise ValueError("No Database found in the context. We cannot run the table deletion.") if self.source_config.markDeletedTables: logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]") # pyright: ignore[reportAttributeAccessIssue] + # Drain the global list so it stays bounded to one catalog's + # deletions instead of growing across the whole run. + with self._state_lock: + deleted_tables = list( + self.context.get_global().deleted_tables # pyright: ignore[reportAttributeAccessIssue] + ) + self.context.get_global().deleted_tables.clear() # pyright: ignore[reportAttributeAccessIssue] yield from delete_entity_by_name( self.metadata, entity_type=Table, - entity_names=self.context.get_global().deleted_tables, # pyright: ignore[reportAttributeAccessIssue] + entity_names=deleted_tables, recursive=self.source_config.markDeletedTables, ) else: diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index bb7d7d1aa206..d063e1002457 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -15,19 +15,21 @@ import textwrap UNITY_CATALOG_GET_CATALOGS_TAGS = """ -SELECT * FROM `{database}`.information_schema.catalog_tags; +SELECT catalog_name, tag_name, tag_value FROM `{database}`.information_schema.catalog_tags; """ UNITY_CATALOG_GET_ALL_SCHEMA_TAGS = """ -SELECT * FROM `{database}`.information_schema.schema_tags; +SELECT catalog_name, schema_name, tag_name, tag_value FROM `{database}`.information_schema.schema_tags; """ UNITY_CATALOG_GET_ALL_TABLE_TAGS = """ -SELECT * FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}'; +SELECT catalog_name, schema_name, table_name, tag_name, tag_value +FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}'; """ UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """ -SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; +SELECT catalog_name, schema_name, table_name, column_name, tag_name, tag_value +FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; """ UNITY_CATALOG_SQL_STATEMENT = textwrap.dedent( @@ -65,6 +67,7 @@ WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS AND source_table_full_name IS NOT NULL AND target_table_full_name IS NOT NULL + AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' GROUP BY source_table_full_name, target_table_full_name """ ) @@ -82,6 +85,7 @@ AND target_table_full_name IS NOT NULL AND source_column_name IS NOT NULL AND target_column_name IS NOT NULL + AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' GROUP BY source_table_full_name, source_column_name, @@ -100,6 +104,7 @@ FROM system.information_schema.tables WHERE table_type = 'EXTERNAL' AND storage_path IS NOT NULL + AND lower(table_catalog) = '{catalog}' """ ) diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 54394804ad1f..1cf413ba493b 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -24,6 +24,7 @@ Container, ContainerDataModel, ) +from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( Column, ColumnName, @@ -102,7 +103,7 @@ def test_cache_table_lineage(self, lineage_source): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage() + lineage_source._cache_lineage("cat") assert "cat.schema.target1" in lineage_source.table_lineage_map assert lineage_source.table_lineage_map["cat.schema.target1"] == { @@ -142,7 +143,7 @@ def mock_execute(query): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage() + lineage_source._cache_lineage("cat") key = ("cat.schema.src", "cat.schema.tgt") assert key in lineage_source.column_lineage_map @@ -157,11 +158,50 @@ def test_cache_lineage_handles_query_failure(self, lineage_source): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage() + lineage_source._cache_lineage("cat") assert len(lineage_source.table_lineage_map) == 0 assert len(lineage_source.column_lineage_map) == 0 + def test_cache_lineage_normalizes_case(self, lineage_source): + TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) + mock_rows = [ + TableRow("Cat.Schema.Source1", "CAT.SCHEMA.Target1"), + ] + + mock_conn = MagicMock() + mock_conn.execute.return_value = mock_rows + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_lineage("cat") + + assert lineage_source.table_lineage_map["cat.schema.target1"] == {"cat.schema.source1"} + + def test_cache_lineage_query_scoped_to_catalog(self, lineage_source): + mock_conn = MagicMock() + mock_conn.execute.return_value = [] + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_lineage("my_catalog") + + executed_queries = [str(call.args[0]) for call in mock_conn.execute.call_args_list] + assert len(executed_queries) == 2 + for query in executed_queries: + assert "= 'my_catalog'" in query + + def test_clear_lineage_caches(self, lineage_source): + lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") + lineage_source.column_lineage_map[("cat.schema.src", "cat.schema.tgt")].append(("col_a", "col_x")) + lineage_source.external_location_map["cat.schema.ext"] = "s3://bucket/path" + + lineage_source._clear_lineage_caches() + + assert len(lineage_source.table_lineage_map) == 0 + assert len(lineage_source.column_lineage_map) == 0 + assert len(lineage_source.external_location_map) == 0 + class TestProcessTableLineage: def test_process_table_lineage_from_cache(self, lineage_source): @@ -265,6 +305,26 @@ def test_process_table_lineage_skips_missing_entity(self, lineage_source): assert len(results) == 0 + def test_process_table_lineage_failure_yields_left(self, lineage_source): + lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} + lineage_source.column_lineage_map = {} + + target_table = Table( + id=uuid4(), + name=EntityName(root="target"), + fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), + columns=[], + ) + + lineage_source.metadata.get_by_name.side_effect = Exception("Connection reset") + + results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) + + assert len(results) == 1 + assert results[0].left is not None + assert results[0].right is None + assert "Connection reset" in results[0].left.error + class TestColumnLineageDetails: def test_self_loop_prevention(self, lineage_source): @@ -340,7 +400,7 @@ def test_cache_external_locations(self, lineage_source): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_external_locations() + lineage_source._cache_external_locations("cat") assert len(lineage_source.external_location_map) == 2 assert lineage_source.external_location_map["cat.schema.ext_table1"] == "s3://bucket/path1" @@ -352,10 +412,28 @@ def test_cache_external_locations_handles_failure(self, lineage_source): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_external_locations() + lineage_source._cache_external_locations("cat") assert len(lineage_source.external_location_map) == 0 + def test_cache_external_locations_normalizes_case(self, lineage_source): + ExternalRow = namedtuple( + "ExternalRow", + ["table_catalog", "table_schema", "table_name", "storage_path"], + ) + mock_rows = [ + ExternalRow("Cat", "Schema", "Ext_Table", "s3://bucket/Path1"), + ] + + mock_conn = MagicMock() + mock_conn.execute.return_value = mock_rows + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_external_locations("cat") + + assert lineage_source.external_location_map == {"cat.schema.ext_table": "s3://bucket/Path1"} + def test_process_external_location_lineage_from_cache(self, lineage_source): lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} @@ -443,6 +521,82 @@ def test_process_external_location_no_container_found(self, lineage_source): assert len(results) == 0 + def test_process_external_location_failure_yields_left(self, lineage_source): + lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} + + table_entity = Table( + id=uuid4(), + name=EntityName(root="test_table"), + fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), + columns=[], + ) + + lineage_source.metadata.es_search_container_by_path.side_effect = Exception("Search unavailable") + + results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + + assert len(results) == 1 + assert results[0].left is not None + assert results[0].right is None + assert "Search unavailable" in results[0].left.error + + +class TestPerCatalogProcessing: + def _make_database(self, name="cat"): + return Database( + id=uuid4(), + name=EntityName(root=name), + fullyQualifiedName=FullyQualifiedEntityName(root=f"local_unitycatalog.{name}"), + service=EntityReference(id=uuid4(), type="databaseService"), + ) + + def test_process_catalog_lineage_scopes_and_clears_caches(self, lineage_source): + database = self._make_database() + lineage_source.metadata.list_all_entities.return_value = [] + + with ( + patch.object(lineage_source, "_cache_lineage") as mock_cache_lineage, + patch.object(lineage_source, "_cache_external_locations") as mock_cache_locations, + ): + lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") + lineage_source.external_location_map["cat.schema.ext"] = "s3://bucket/path" + + results = list(lineage_source._process_catalog_lineage(database)) + + assert results == [] + mock_cache_lineage.assert_called_once_with("cat") + mock_cache_locations.assert_called_once_with("cat") + assert len(lineage_source.table_lineage_map) == 0 + assert len(lineage_source.column_lineage_map) == 0 + assert len(lineage_source.external_location_map) == 0 + + def test_process_catalog_lineage_lowercases_catalog_name(self, lineage_source): + database = self._make_database(name="MyCat") + lineage_source.metadata.list_all_entities.return_value = [] + + with ( + patch.object(lineage_source, "_cache_lineage") as mock_cache_lineage, + patch.object(lineage_source, "_cache_external_locations") as mock_cache_locations, + ): + list(lineage_source._process_catalog_lineage(database)) + + mock_cache_lineage.assert_called_once_with("mycat") + mock_cache_locations.assert_called_once_with("mycat") + + def test_process_catalog_lineage_clears_caches_on_error(self, lineage_source): + database = self._make_database() + lineage_source.metadata.list_all_entities.side_effect = Exception("API error") + + with ( + patch.object(lineage_source, "_cache_lineage"), + patch.object(lineage_source, "_cache_external_locations"), + pytest.raises(Exception, match="API error"), + ): + lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") + list(lineage_source._process_catalog_lineage(database)) + + assert len(lineage_source.table_lineage_map) == 0 + class TestContainerColumnLineage: def test_get_data_model_column_fqn(self, lineage_source): diff --git a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py index 7114c941d083..f0a32ac85666 100644 --- a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py +++ b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py @@ -364,6 +364,25 @@ def test_mark_tables_as_deleted_incremental_uses_explicit_list(self): _, kwargs = delete_mock.call_args assert kwargs["entity_names"] == ["svc.cat.schema1.dropped"] + def test_mark_tables_as_deleted_drains_global_list(self): + source = self._make_source() + source.incremental.enabled = True + source.source_config.markDeletedTables = True + source.context.get.return_value = SimpleNamespace(database="cat") + deleted_tables = ["svc.cat.schema1.dropped", "svc.cat.schema2.dropped"] + source.context.get_global.return_value = SimpleNamespace(deleted_tables=deleted_tables) + + with patch( + f"{UC_METADATA_MODULE}.delete_entity_by_name", + return_value=iter(["deleted"]), + ) as delete_mock: + result = list(UnitycatalogSource.mark_tables_as_deleted(source)) + + assert result == ["deleted"] + _, kwargs = delete_mock.call_args + assert kwargs["entity_names"] == ["svc.cat.schema1.dropped", "svc.cat.schema2.dropped"] + assert deleted_tables == [] + def test_mark_tables_as_deleted_incremental_respects_mark_flag(self): source = self._make_source() source.incremental.enabled = True From 79907ff7ea8dce56cd75d604511869d75eaca9e2 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 9 Jun 2026 16:23:32 +0530 Subject: [PATCH 2/7] refactor(unitycatalog): stream lineage per day-window instead of per-catalog caches Replace the per-catalog in-memory lineage maps with a Snowflake ACCESS_HISTORY-style streaming model: iterate the configured lookback in `lineageQueryChunkSize`-day windows, stream one combined table+column lineage query per catalog per window, and resolve each edge endpoint to an OpenMetadata table through a bounded LRU cache (hits + misses). - Combined `UNITY_CATALOG_LINEAGE` query aggregates column pairs server-side via `collect_set(struct(...))` so client memory stays O(window) regardless of metastore lineage size; `collect_set` avoids the table x column fan-out. - New `lineageQueryChunkSize` connection field (default 7) tunes days scanned per query: bigger = fewer queries (faster), smaller = smaller scans. - External-location lineage is now edge-driven off the external-tables query and shares the same bounded LRU instead of iterating every OM table. - Drops the unbounded `table_lineage_map`/`column_lineage_map`/ `external_location_map` dicts and the per-catalog cache/clear machinery. - Keeps case-insensitive FQN matching and per-edge `Either(left)` failure surfacing; adds per-catalog emitted/skipped/failed diagnostics. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 437 ++++++------ .../source/database/unitycatalog/queries.py | 69 +- .../database/test_unity_catalog_lineage.py | 631 +++++++----------- .../database/unityCatalogConnection.json | 6 + 4 files changed, 520 insertions(+), 623 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 591b49f6af12..f213a2a7a2e8 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -12,16 +12,17 @@ Databricks Unity Catalog Lineage Source Module """ +import json import traceback -from collections import defaultdict -from typing import Iterable, Optional # noqa: UP035 +from datetime import datetime, timedelta +from typing import Any, Iterable, List, Optional, Tuple # noqa: UP035 +from cachetools import LRUCache from sqlalchemy import text from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.container import ContainerDataModel from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, @@ -49,21 +50,31 @@ get_sqlalchemy_connection, ) from metadata.ingestion.source.database.unitycatalog.queries import ( - UNITY_CATALOG_COLUMN_LINEAGE, UNITY_CATALOG_EXTERNAL_TABLES, - UNITY_CATALOG_TABLE_LINEAGE, + UNITY_CATALOG_LINEAGE, ) from metadata.utils import fqn from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table -from metadata.utils.helpers import retry_with_docker_host +from metadata.utils.helpers import get_start_and_end, retry_with_docker_host from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +TABLE_RESOLUTION_CACHE_SIZE = 1000 + +DEFAULT_LINEAGE_CHUNK_DAYS = 7 + class UnitycatalogLineageSource(Source): """ - Lineage Unity Catalog Source + Lineage Unity Catalog Source. + + Lineage edges are streamed one catalog and one day-window at a time from + `system.access.table_lineage` / `column_lineage` (the Databricks analogue + of Snowflake's ACCESS_HISTORY). Column pairs are aggregated server-side per + edge, and each endpoint is resolved to an OpenMetadata table through a + bounded LRU cache, so client memory stays O(window) regardless of how large + the metastore lineage graph is. """ @retry_with_docker_host() @@ -79,11 +90,21 @@ def __init__( self.source_config = self.config.sourceConfig.config self.connection_obj = get_connection(self.service_connection) self.engine = get_sqlalchemy_connection(self.service_connection) - self.table_lineage_map: dict[str, set[str]] = defaultdict(set) - self.column_lineage_map: dict[tuple[str, str], list[tuple[str, str]]] = defaultdict(list) - self.external_location_map: dict[str, str] = {} + self._table_cache: LRUCache = LRUCache(maxsize=TABLE_RESOLUTION_CACHE_SIZE) + self._chunk_days = self._resolve_chunk_days() self.test_connection() + def _resolve_chunk_days(self) -> int: + """ + Days of lineage scanned per system-table query, read from the + `lineageQueryChunkSize` connection field. Clamp to >= 1 so the + date-window iterator always makes forward progress. + """ + configured = getattr(self.service_connection, "lineageQueryChunkSize", None) + if configured is None: + return DEFAULT_LINEAGE_CHUNK_DAYS + return max(1, int(configured)) + def close(self): """ By default, there is nothing to close @@ -103,82 +124,188 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}") return cls(config, metadata) - def _cache_lineage(self, catalog_name: str): + def _iter_date_windows(self) -> Iterable[Tuple[datetime, datetime]]: # noqa: UP006 """ - Fetch table and column lineage for a single catalog from system tables - into memory. FQNs are lowercased so lookups are case-insensitive. + Split the configured `queryLogDuration` lookback into + `lineageQueryChunkSize` day windows. Streaming one window at a time keeps + each system-table scan and its result set bounded instead of pulling the + whole lookback at once. """ - query_log_duration = self.source_config.queryLogDuration or 1 # pyright: ignore[reportAttributeAccessIssue] - logger.info(f"Caching lineage for catalog [{catalog_name}] (lookback: {query_log_duration} days)") - - try: - with self.engine.connect() as conn: - rows = conn.execute( - text( - UNITY_CATALOG_TABLE_LINEAGE.format(query_log_duration=query_log_duration, catalog=catalog_name) - ) - ) - for row in rows: - self.table_lineage_map[row.target_table_full_name.lower()].add(row.source_table_full_name.lower()) - logger.info( - f"Cached table lineage for catalog [{catalog_name}]: " - f"{sum(len(v) for v in self.table_lineage_map.values())} edges " - f"for {len(self.table_lineage_map)} target tables" + start, end = get_start_and_end(self.source_config.queryLogDuration or 1) # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + window_start = start + while window_start < end: + window_end = min(window_start + timedelta(days=self._chunk_days), end) + yield window_start, window_end + window_start = window_end + + def _resolve_table(self, databricks_table_fqn: str) -> Optional[Table]: # noqa: UP045 + """ + Resolve a `catalog.schema.table` Unity Catalog name to an OpenMetadata + Table entity. Both hits and misses are cached in a bounded LRU so a busy + upstream table referenced by many edges is fetched once, and repeated + unresolvable lookups stay cheap. + """ + cache_key = databricks_table_fqn.lower() + if cache_key in self._table_cache: + return self._table_cache[cache_key] + entity = self._fetch_table_entity(databricks_table_fqn) + self._table_cache[cache_key] = entity + return entity + + def _fetch_table_entity(self, databricks_table_fqn: str) -> Optional[Table]: # noqa: UP045 + entity = None + parts = databricks_table_fqn.split(".") + if len(parts) != 3: + logger.debug(f"Skipping malformed table name: {databricks_table_fqn}") + else: + catalog_name, schema_name, table_name = parts + table_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + database_name=catalog_name, + schema_name=schema_name, + table_name=table_name, ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache table lineage for catalog [{catalog_name}]: {exc}") + entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) # pyright: ignore[reportArgumentType] + return entity + + def _is_filtered_table(self, databricks_table_fqn: str) -> bool: + is_filtered = False + parts = databricks_table_fqn.split(".") + if len(parts) == 3: + _, schema_name, table_name = parts + is_filtered = filter_by_schema( + self.source_config.schemaFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + schema_name, + ) or filter_by_table( + self.source_config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + table_name, + ) + return is_filtered - try: - with self.engine.connect() as conn: - rows = conn.execute( - text( - UNITY_CATALOG_COLUMN_LINEAGE.format(query_log_duration=query_log_duration, catalog=catalog_name) + @staticmethod + def _parse_column_pairs(raw: object) -> List[Tuple[str, str]]: # noqa: UP006 + """ + Decode the server-aggregated `column_pairs` JSON into a list of + (source_column, target_column) tuples. The driver can hand back either a + JSON string or an already-parsed list, so handle both. + """ + pairs: List[Tuple[str, str]] = [] # noqa: UP006 + if isinstance(raw, str): + try: + raw = json.loads(raw) + except (ValueError, TypeError): + raw = None + if isinstance(raw, list): + for item in raw: + if isinstance(item, dict): + source_col = item.get("u") or item.get("U") + target_col = item.get("d") or item.get("D") + if source_col and target_col: + pairs.append((source_col, target_col)) + return pairs + + def _build_column_lineage( + self, from_table: Table, to_table: Table, raw_column_pairs: object + ) -> List[ColumnLineage]: # noqa: UP006 + col_lineage = [] + for source_col, target_col in self._parse_column_pairs(raw_column_pairs): + from_col_fqn = get_column_fqn(from_table, source_col) + to_col_fqn = get_column_fqn(to_table, target_col) + if from_col_fqn and to_col_fqn and from_col_fqn != to_col_fqn: + col_lineage.append(ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)) # pyright: ignore[reportCallIssue] + return col_lineage + + def _build_table_edge(self, row: Any) -> Optional[AddLineageRequest]: # noqa: UP045 + """ + Resolve both endpoints of a streamed lineage row to OpenMetadata tables + and build the lineage request with column lineage attached. Returns None + when either side is filtered out or not present in OpenMetadata. + """ + edge = None + source_fqn = row.source_table_full_name + target_fqn = row.target_table_full_name + if not (self._is_filtered_table(source_fqn) or self._is_filtered_table(target_fqn)): + from_entity = self._resolve_table(source_fqn) + to_entity = self._resolve_table(target_fqn) + if from_entity and to_entity: + column_lineage = self._build_column_lineage(from_entity, to_entity, row.column_pairs) + edge = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="table"), # pyright: ignore[reportCallIssue] + toEntity=EntityReference(id=to_entity.id, type="table"), # pyright: ignore[reportCallIssue] + lineageDetails=LineageDetails( # pyright: ignore[reportCallIssue] + columnsLineage=column_lineage or None, + source=LineageSource.QueryLineage, + ), ) ) - for row in rows: - table_key = ( - row.source_table_full_name.lower(), - row.target_table_full_name.lower(), - ) - self.column_lineage_map[table_key].append((row.source_column_name, row.target_column_name)) - logger.info( - f"Cached column lineage for catalog [{catalog_name}]: " - f"{sum(len(v) for v in self.column_lineage_map.values())} " - f"column mappings for {len(self.column_lineage_map)} table pairs" - ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache column lineage for catalog [{catalog_name}]: {exc}") + else: + logger.debug( + f"Skipping edge, table not found in OpenMetadata: " + f"{source_fqn} (found={from_entity is not None}) -> " + f"{target_fqn} (found={to_entity is not None})" + ) + return edge - def _cache_external_locations(self, catalog_name: str): + def _fetch_lineage_rows(self, catalog_name: str, window_start: datetime, window_end: datetime) -> Iterable: """ - Fetch external table storage locations for a single catalog from - system.information_schema.tables. FQNs are lowercased so lookups - are case-insensitive. + Run the combined lineage query for one catalog and one [start, end) + window, streaming rows so the driver does not buffer the whole result + set. A failure on one window is logged and swallowed so the remaining + windows still run. """ - logger.info(f"Caching external table locations for catalog [{catalog_name}]") + sql_statement = UNITY_CATALOG_LINEAGE.format( + catalog=catalog_name, + start_time=window_start, + end_time=window_end, + ) try: with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_EXTERNAL_TABLES.format(catalog=catalog_name))) - for row in rows: - table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() - self.external_location_map[table_fqn] = row.storage_path - logger.info( - f"Cached {len(self.external_location_map)} external table locations for catalog [{catalog_name}]" - ) + rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(text(sql_statement)) + yield from rows except Exception as exc: + logger.warning( + f"Failed to fetch lineage for catalog [{catalog_name}] window {window_start} - {window_end}: {exc}" + ) logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache external table locations for catalog [{catalog_name}]: {exc}") - def _clear_lineage_caches(self): + def _yield_catalog_table_lineage(self, catalog_name: str) -> Iterable[Either[AddLineageRequest]]: """ - Release the per-catalog lineage maps so memory stays bounded to one - catalog at a time. + Stream table/column lineage for one catalog, one day-window at a time, + emitting one request per resolved edge. Per-row failures surface as + Either(left) instead of being swallowed. """ - self.table_lineage_map.clear() - self.column_lineage_map.clear() - self.external_location_map.clear() + emitted = 0 + skipped = 0 + failed = 0 + for window_start, window_end in self._iter_date_windows(): + for row in self._fetch_lineage_rows(catalog_name, window_start, window_end): + try: + edge = self._build_table_edge(row) + except Exception as exc: + failed += 1 + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=row.target_table_full_name, + error=( + f"Error processing lineage {row.source_table_full_name} -> " + f"{row.target_table_full_name}: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) + continue + if edge is None: + skipped += 1 + continue + emitted += 1 + yield Either(right=edge) # pyright: ignore[reportCallIssue] + logger.info( + f"Table lineage for catalog [{catalog_name}]: emitted {emitted} edges, " + f"skipped {skipped} (filtered or unresolved tables), failed {failed} (row errors)" + ) def _get_data_model_column_fqn(self, data_model_entity: ContainerDataModel, column: str) -> Optional[str]: # noqa: UP045 if not data_model_entity: @@ -215,47 +342,20 @@ def _get_container_column_lineage( logger.debug(traceback.format_exc()) return None - def _get_column_lineage_details( - self, - from_table: Table, - to_table: Table, - source_table_fqn: str, - target_table_fqn: str, - ) -> Optional[LineageDetails]: # noqa: UP045 - try: - table_key = (source_table_fqn, target_table_fqn) - column_pairs = self.column_lineage_map.get(table_key, []) - if not column_pairs: - return None - - col_lineage = [] - for source_col, target_col in column_pairs: - from_col_fqn = get_column_fqn(from_table, source_col) - to_col_fqn = get_column_fqn(to_table, target_col) - if from_col_fqn and to_col_fqn and from_col_fqn != to_col_fqn: - col_lineage.append(ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)) - - if col_lineage: - return LineageDetails(columnsLineage=col_lineage, source=LineageSource.QueryLineage) - return None # noqa: TRY300 - except Exception as exc: - logger.warning(f"Error computing column lineage for {source_table_fqn} -> {target_table_fqn}: {exc}") - logger.debug(traceback.format_exc()) - return None - def _process_external_location_lineage( - self, table: Table, databricks_table_fqn: str + self, + table: Table, + storage_path: Optional[str], # noqa: UP045 ) -> Iterable[Either[AddLineageRequest]]: """ - Look up external table storage location from cache and create - container lineage if a matching container is found. + Create container lineage for an external table from its storage path, + if a matching container has been ingested. """ - storage_location = self.external_location_map.get(databricks_table_fqn) - if not storage_location: + if not storage_path: return try: - storage_location = storage_location.rstrip("/") + storage_location = storage_path.rstrip("/") location_entity = self.metadata.es_search_container_by_path(full_path=storage_location, fields="dataModel") if location_entity and location_entity[0]: @@ -281,67 +381,40 @@ def _process_external_location_lineage( except Exception as exc: yield Either( # pyright: ignore[reportCallIssue] left=StackTraceError( - name=databricks_table_fqn, - error=f"Error processing external location lineage for {databricks_table_fqn}: {exc}", + name=table.fullyQualifiedName.root, # pyright: ignore[reportOptionalMemberAccess] + error=f"Error processing external location lineage for {storage_path}: {exc}", stackTrace=traceback.format_exc(), ) ) - def _process_table_lineage(self, table: Table, databricks_table_fqn: str) -> Iterable[Either[AddLineageRequest]]: - upstream_tables = self.table_lineage_map.get(databricks_table_fqn, set()) - - for source_table_full_name in upstream_tables: - try: - parts = source_table_full_name.split(".") - if len(parts) != 3: - logger.debug(f"Skipping malformed source table name: {source_table_full_name}") - continue - catalog_name, schema_name, table_name = parts - - from_entity_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - database_name=catalog_name, - schema_name=schema_name, - table_name=table_name, - service_name=self.config.serviceName, - ) - - from_entity = self.metadata.get_by_name(entity=Table, fqn=from_entity_fqn) - if not from_entity: - logger.debug(f"Unable to find upstream entity: {source_table_full_name} -> {databricks_table_fqn}") - continue - - lineage_details = self._get_column_lineage_details( - from_table=from_entity, - to_table=table, - source_table_fqn=source_table_full_name, - target_table_fqn=databricks_table_fqn, - ) - - yield Either( - right=AddLineageRequest( - edge=EntitiesEdge( - toEntity=EntityReference(id=table.id, type="table"), - fromEntity=EntityReference(id=from_entity.id, type="table"), - lineageDetails=lineage_details, - ) - ), - ) - except Exception as exc: - yield Either( # pyright: ignore[reportCallIssue] - left=StackTraceError( - name=databricks_table_fqn, - error=f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}", - stackTrace=traceback.format_exc(), - ) + def _yield_catalog_external_lineage(self, catalog_name: str) -> Iterable[Either[AddLineageRequest]]: + """ + Stream external tables for one catalog and create container lineage for + each, resolving the table through the shared bounded LRU. External-table + storage paths are a current snapshot, not an event stream, so this is a + single per-catalog scan rather than a windowed one. + """ + try: + with self.engine.connect() as conn: + rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute( + text(UNITY_CATALOG_EXTERNAL_TABLES.format(catalog=catalog_name)) ) + for row in rows: + databricks_table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() + if self._is_filtered_table(databricks_table_fqn): + continue + table_entity = self._resolve_table(databricks_table_fqn) + if table_entity: + yield from self._process_external_location_lineage(table_entity, row.storage_path) + except Exception as exc: + logger.warning(f"Failed to fetch external table locations for catalog [{catalog_name}]: {exc}") + logger.debug(traceback.format_exc()) def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ - Fetch lineage from system tables for both table-to-table - and external location lineage, one catalog at a time so the - cached lineage maps stay bounded to a single catalog. + Stream table/column and external-location lineage one catalog at a time. + Catalogs are scoped at the SQL layer and resolved edges share a single + bounded table-resolution cache across the whole run. """ for database in self.metadata.list_all_entities(entity=Database, params={"service": self.config.serviceName}): if filter_by_database(self.source_config.databaseFilterPattern, database.name.root): # pyright: ignore[reportAttributeAccessIssue] @@ -351,53 +424,9 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: ) continue - yield from self._process_catalog_lineage(database) - - def _process_catalog_lineage(self, database: Database) -> Iterable[Either[AddLineageRequest]]: - """ - Cache lineage scoped to one catalog, process its tables, and release - the caches before moving to the next catalog. - """ - catalog_name = database.name.root.lower() - self._cache_lineage(catalog_name) - self._cache_external_locations(catalog_name) - try: - for schema in self.metadata.list_all_entities( - entity=DatabaseSchema, - params={"database": database.fullyQualifiedName.root}, # pyright: ignore[reportOptionalMemberAccess] - ): - if filter_by_schema(self.source_config.schemaFilterPattern, schema.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - schema.fullyQualifiedName.root, - "Schema Filtered Out", - ) - continue - - yield from self._process_schema_lineage(schema) - finally: - self._clear_lineage_caches() - - def _process_schema_lineage(self, schema: DatabaseSchema) -> Iterable[Either[AddLineageRequest]]: - """ - Process table and external location lineage for every - non-filtered table of a schema. - """ - for table in self.metadata.list_all_entities( - entity=Table, - params={"databaseSchema": schema.fullyQualifiedName.root}, # pyright: ignore[reportOptionalMemberAccess] - ): - if filter_by_table(self.source_config.tableFilterPattern, table.name.root): # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] - self.status.filter( - table.fullyQualifiedName.root, - "Table Filtered Out", - ) - continue - - databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}".lower() - - yield from self._process_table_lineage(table, databricks_table_fqn) - - yield from self._process_external_location_lineage(table, databricks_table_fqn) + catalog_name = database.name.root.lower() + yield from self._yield_catalog_table_lineage(catalog_name) + yield from self._yield_catalog_external_lineage(catalog_name) def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index d063e1002457..ecaed6f2b46f 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -58,39 +58,44 @@ UNITY_CATALOG_GET_TABLE_DDL = "SHOW CREATE TABLE `{database}`.`{schema}`.`{table}`" -UNITY_CATALOG_TABLE_LINEAGE = textwrap.dedent( - """ - SELECT - source_table_full_name, - target_table_full_name - FROM system.access.table_lineage - WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' - GROUP BY source_table_full_name, target_table_full_name - """ -) - -UNITY_CATALOG_COLUMN_LINEAGE = textwrap.dedent( - """ +UNITY_CATALOG_LINEAGE = textwrap.dedent( + """ + WITH column_pairs AS ( + SELECT + source_table_full_name, + target_table_full_name, + collect_set( + struct(source_column_name AS u, target_column_name AS d) + ) AS pairs + FROM system.access.column_lineage + WHERE event_time >= to_timestamp('{start_time}') + AND event_time < to_timestamp('{end_time}') + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + AND source_column_name IS NOT NULL + AND target_column_name IS NOT NULL + AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' + GROUP BY source_table_full_name, target_table_full_name + ), + table_edges AS ( + SELECT DISTINCT + source_table_full_name, + target_table_full_name + FROM system.access.table_lineage + WHERE event_time >= to_timestamp('{start_time}') + AND event_time < to_timestamp('{end_time}') + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' + ) SELECT - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name - FROM system.access.column_lineage - WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - AND source_column_name IS NOT NULL - AND target_column_name IS NOT NULL - AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' - GROUP BY - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name + t.source_table_full_name AS source_table_full_name, + t.target_table_full_name AS target_table_full_name, + to_json(c.pairs) AS column_pairs + FROM table_edges t + LEFT JOIN column_pairs c + ON c.source_table_full_name = t.source_table_full_name + AND c.target_table_full_name = t.target_table_full_name """ ) diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 1cf413ba493b..0d45d070460a 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -14,6 +14,8 @@ """ from collections import namedtuple +from datetime import timedelta +from itertools import pairwise from unittest.mock import MagicMock, Mock, patch from uuid import uuid4 @@ -37,7 +39,6 @@ from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.models import Either from metadata.ingestion.source.database.unitycatalog.lineage import ( UnitycatalogLineageSource, ) @@ -62,7 +63,7 @@ "httpPath": "/sql/1.0/warehouses/test", } }, - "sourceConfig": {"config": {"type": "DatabaseLineage"}}, + "sourceConfig": {"config": {"type": "DatabaseLineage", "queryLogDuration": 1}}, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { @@ -74,6 +75,28 @@ }, } +LineageRow = namedtuple( + "LineageRow", + ["source_table_full_name", "target_table_full_name", "column_pairs"], +) + + +def _make_table(name: str, fqn: str, columns=None) -> Table: + return Table( + id=uuid4(), + name=EntityName(root=name), + fullyQualifiedName=FullyQualifiedEntityName(root=fqn), + columns=columns or [], + ) + + +def _make_column(name: str, fqn: str) -> Column: + return Column( + name=ColumnName(root=name), + dataType=DataType.STRING, + fullyQualifiedName=FullyQualifiedEntityName(root=fqn), + ) + @pytest.fixture def lineage_source(): @@ -89,459 +112,312 @@ def lineage_source(): yield source -class TestCacheLineage: - def test_cache_table_lineage(self, lineage_source): - TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) - mock_rows = [ - TableRow("cat.schema.source1", "cat.schema.target1"), - TableRow("cat.schema.source2", "cat.schema.target1"), - TableRow("cat.schema.source1", "cat.schema.target2"), - ] +def _mock_query_rows(lineage_source, rows): + mock_conn = MagicMock() + mock_conn.execution_options.return_value.execute.return_value = rows + mock_conn.execute.return_value = rows + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + return mock_conn - mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage("cat") +class TestResolveChunkDays: + def test_uses_configured_value(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = 14 - assert "cat.schema.target1" in lineage_source.table_lineage_map - assert lineage_source.table_lineage_map["cat.schema.target1"] == { - "cat.schema.source1", - "cat.schema.source2", - } - assert lineage_source.table_lineage_map["cat.schema.target2"] == { - "cat.schema.source1", - } + assert lineage_source._resolve_chunk_days() == 14 - def test_cache_column_lineage(self, lineage_source): - TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) - ColumnRow = namedtuple( - "ColumnRow", - [ - "source_table_full_name", - "source_column_name", - "target_table_full_name", - "target_column_name", - ], - ) + def test_clamps_to_at_least_one(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = 0 - call_count = 0 + assert lineage_source._resolve_chunk_days() == 1 - def mock_execute(query): - nonlocal call_count - call_count += 1 - if call_count == 1: - return [TableRow("cat.schema.src", "cat.schema.tgt")] - return [ - ColumnRow("cat.schema.src", "col_a", "cat.schema.tgt", "col_x"), - ColumnRow("cat.schema.src", "col_b", "cat.schema.tgt", "col_y"), - ] + def test_defaults_when_unset(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = None - mock_conn = MagicMock() - mock_conn.execute.side_effect = mock_execute - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + assert lineage_source._resolve_chunk_days() == 7 - lineage_source._cache_lineage("cat") - - key = ("cat.schema.src", "cat.schema.tgt") - assert key in lineage_source.column_lineage_map - assert lineage_source.column_lineage_map[key] == [ - ("col_a", "col_x"), - ("col_b", "col_y"), - ] - - def test_cache_lineage_handles_query_failure(self, lineage_source): - mock_conn = MagicMock() - mock_conn.execute.side_effect = Exception("Access denied") - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage("cat") +class TestDateWindows: + def test_windows_are_contiguous_and_chunk_sized(self, lineage_source): + lineage_source._chunk_days = 1 + lineage_source.source_config.queryLogDuration = 3 - assert len(lineage_source.table_lineage_map) == 0 - assert len(lineage_source.column_lineage_map) == 0 + windows = list(lineage_source._iter_date_windows()) - def test_cache_lineage_normalizes_case(self, lineage_source): - TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) - mock_rows = [ - TableRow("Cat.Schema.Source1", "CAT.SCHEMA.Target1"), - ] + assert len(windows) == 4 + for window_start, window_end in windows: + assert window_end - window_start <= timedelta(days=1) + for prev, curr in pairwise(windows): + assert prev[1] == curr[0] - mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + def test_larger_chunk_issues_fewer_windows(self, lineage_source): + lineage_source._chunk_days = 7 + lineage_source.source_config.queryLogDuration = 30 - lineage_source._cache_lineage("cat") + windows = list(lineage_source._iter_date_windows()) - assert lineage_source.table_lineage_map["cat.schema.target1"] == {"cat.schema.source1"} + assert len(windows) == 5 + for prev, curr in pairwise(windows): + assert prev[1] == curr[0] - def test_cache_lineage_query_scoped_to_catalog(self, lineage_source): - mock_conn = MagicMock() - mock_conn.execute.return_value = [] - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + def test_window_covers_full_lookback(self, lineage_source): + lineage_source._chunk_days = 1 + lineage_source.source_config.queryLogDuration = 2 - lineage_source._cache_lineage("my_catalog") + windows = list(lineage_source._iter_date_windows()) - executed_queries = [str(call.args[0]) for call in mock_conn.execute.call_args_list] - assert len(executed_queries) == 2 - for query in executed_queries: - assert "= 'my_catalog'" in query + assert windows[-1][1] - windows[0][0] == timedelta(days=3) - def test_clear_lineage_caches(self, lineage_source): - lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") - lineage_source.column_lineage_map[("cat.schema.src", "cat.schema.tgt")].append(("col_a", "col_x")) - lineage_source.external_location_map["cat.schema.ext"] = "s3://bucket/path" - lineage_source._clear_lineage_caches() +class TestResolveTable: + def test_resolves_and_caches_hit(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + lineage_source.metadata.get_by_name.return_value = source_table - assert len(lineage_source.table_lineage_map) == 0 - assert len(lineage_source.column_lineage_map) == 0 - assert len(lineage_source.external_location_map) == 0 + first = lineage_source._resolve_table("cat.schema.source") + second = lineage_source._resolve_table("CAT.SCHEMA.SOURCE") + assert first is source_table + assert second is source_table + assert lineage_source.metadata.get_by_name.call_count == 1 -class TestProcessTableLineage: - def test_process_table_lineage_from_cache(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {} + def test_caches_misses(self, lineage_source): + lineage_source.metadata.get_by_name.return_value = None - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], - ) + assert lineage_source._resolve_table("cat.schema.missing") is None + assert lineage_source._resolve_table("cat.schema.missing") is None + assert lineage_source.metadata.get_by_name.call_count == 1 - source_table = Table( - id=uuid4(), - name=EntityName(root="source"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source"), - columns=[], - ) + def test_malformed_name_not_resolved(self, lineage_source): + assert lineage_source._resolve_table("not_a_fqn") is None + lineage_source.metadata.get_by_name.assert_not_called() - lineage_source.metadata.get_by_name.return_value = source_table - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) +class TestColumnPairs: + def test_parses_json_string(self, lineage_source): + raw = '[{"u":"col_a","d":"col_x"},{"u":"col_b","d":"col_y"}]' - assert len(results) == 1 - assert isinstance(results[0], Either) - assert isinstance(results[0].right, AddLineageRequest) - assert results[0].right.edge.fromEntity.id == source_table.id - assert results[0].right.edge.toEntity.id == target_table.id + assert lineage_source._parse_column_pairs(raw) == [("col_a", "col_x"), ("col_b", "col_y")] - def test_process_table_lineage_with_column_lineage(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {("cat.schema.source", "cat.schema.target"): [("col_a", "col_x")]} + def test_parses_already_parsed_list(self, lineage_source): + raw = [{"u": "col_a", "d": "col_x"}] - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[ - Column( - name=ColumnName(root="col_x"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target.col_x"), - ) - ], - ) + assert lineage_source._parse_column_pairs(raw) == [("col_a", "col_x")] - source_table = Table( - id=uuid4(), - name=EntityName(root="source"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source.col_a"), - ) - ], - ) + def test_handles_none_and_malformed(self, lineage_source): + assert lineage_source._parse_column_pairs(None) == [] + assert lineage_source._parse_column_pairs("not json") == [] + assert lineage_source._parse_column_pairs('[{"u":"only_source"}]') == [] - lineage_source.metadata.get_by_name.return_value = source_table - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) +class TestBuildTableEdge: + def test_builds_edge_without_columns(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] - assert len(results) == 1 - lineage_details = results[0].right.edge.lineageDetails - assert lineage_details is not None - assert len(lineage_details.columnsLineage) == 1 - assert lineage_details.columnsLineage[0].fromColumns[0].root == "local_unitycatalog.cat.schema.source.col_a" - assert lineage_details.columnsLineage[0].toColumn.root == "local_unitycatalog.cat.schema.target.col_x" + row = LineageRow("cat.schema.source", "cat.schema.target", None) + edge = lineage_source._build_table_edge(row) - def test_process_table_lineage_skips_malformed_names(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"malformed_name"}} - lineage_source.column_lineage_map = {} + assert isinstance(edge, AddLineageRequest) + assert edge.edge.fromEntity.id == source_table.id + assert edge.edge.toEntity.id == target_table.id - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], + def test_builds_edge_with_column_lineage(self, lineage_source): + source_table = _make_table( + "source", + "local_unitycatalog.cat.schema.source", + [_make_column("col_a", "local_unitycatalog.cat.schema.source.col_a")], ) - - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) - - assert len(results) == 0 - - def test_process_table_lineage_skips_missing_entity(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {} - - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], + target_table = _make_table( + "target", + "local_unitycatalog.cat.schema.target", + [_make_column("col_x", "local_unitycatalog.cat.schema.target.col_x")], ) + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] - lineage_source.metadata.get_by_name.return_value = None - - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) + row = LineageRow("cat.schema.source", "cat.schema.target", '[{"u":"col_a","d":"col_x"}]') + edge = lineage_source._build_table_edge(row) - assert len(results) == 0 + column_lineage = edge.edge.lineageDetails.columnsLineage + assert len(column_lineage) == 1 + assert column_lineage[0].fromColumns[0].root == "local_unitycatalog.cat.schema.source.col_a" + assert column_lineage[0].toColumn.root == "local_unitycatalog.cat.schema.target.col_x" - def test_process_table_lineage_failure_yields_left(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {} + def test_returns_none_when_target_unresolved(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + lineage_source.metadata.get_by_name.side_effect = [source_table, None] - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], - ) + row = LineageRow("cat.schema.source", "cat.schema.target", None) - lineage_source.metadata.get_by_name.side_effect = Exception("Connection reset") + assert lineage_source._build_table_edge(row) is None - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) - - assert len(results) == 1 - assert results[0].left is not None - assert results[0].right is None - assert "Connection reset" in results[0].left.error + def test_returns_none_when_table_filtered(self, lineage_source): + lineage_source.source_config.tableFilterPattern = MagicMock() + with patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_table", + return_value=True, + ): + row = LineageRow("cat.schema.source", "cat.schema.target", None) + assert lineage_source._build_table_edge(row) is None + lineage_source.metadata.get_by_name.assert_not_called() + def test_self_loop_column_dropped(self, lineage_source): + source_table = _make_table( + "tbl", + "local_unitycatalog.cat.schema.tbl", + [_make_column("col_a", "local_unitycatalog.cat.schema.tbl.col_a")], + ) + same_table = source_table + lineage_source.metadata.get_by_name.side_effect = [source_table, same_table] -class TestColumnLineageDetails: - def test_self_loop_prevention(self, lineage_source): - lineage_source.column_lineage_map = {("cat.schema.src", "cat.schema.tgt"): [("col_a", "col_a")]} + row = LineageRow("cat.schema.tbl", "cat.schema.tbl", '[{"u":"col_a","d":"col_a"}]') + edge = lineage_source._build_table_edge(row) - table = Table( - id=uuid4(), - name=EntityName(root="tgt"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt.col_a"), - ) - ], - ) + assert edge.edge.lineageDetails.columnsLineage is None - same_table_as_source = Table( - id=uuid4(), - name=EntityName(root="src"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src.col_a"), - ) - ], - ) - result = lineage_source._get_column_lineage_details( - same_table_as_source, table, "cat.schema.src", "cat.schema.tgt" - ) +class TestYieldCatalogTableLineage: + def test_emits_resolved_edges(self, lineage_source): + lineage_source.source_config.queryLogDuration = 1 + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table, None, None] - assert result is not None - assert len(result.columnsLineage) == 1 + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_catalog_table_lineage("cat")) - def test_no_column_lineage_returns_none(self, lineage_source): - lineage_source.column_lineage_map = {} + assert len(results) == 1 + assert isinstance(results[0].right, AddLineageRequest) - table = Table( - id=uuid4(), - name=EntityName(root="tgt"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt"), - columns=[], - ) - from_table = Table( - id=uuid4(), - name=EntityName(root="src"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src"), - columns=[], - ) + def test_skips_unresolved_edges(self, lineage_source): + lineage_source.metadata.get_by_name.return_value = None - result = lineage_source._get_column_lineage_details(from_table, table, "cat.schema.src", "cat.schema.tgt") + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_catalog_table_lineage("cat")) - assert result is None + assert results == [] + def test_row_failure_yields_left(self, lineage_source): + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with ( + patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]), + patch.object(lineage_source, "_build_table_edge", side_effect=Exception("boom")), + ): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_catalog_table_lineage("cat")) -class TestExternalLocationLineage: - def test_cache_external_locations(self, lineage_source): - ExternalRow = namedtuple( - "ExternalRow", - ["table_catalog", "table_schema", "table_name", "storage_path"], - ) - mock_rows = [ - ExternalRow("cat", "schema", "ext_table1", "s3://bucket/path1"), - ExternalRow("cat", "schema", "ext_table2", "s3://bucket/path2/"), - ] + assert len(results) == 1 + assert results[0].left is not None + assert results[0].right is None + assert "boom" in results[0].left.error + def test_query_failure_is_swallowed(self, lineage_source): mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows + mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_external_locations("cat") + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + results = list(lineage_source._yield_catalog_table_lineage("cat")) - assert len(lineage_source.external_location_map) == 2 - assert lineage_source.external_location_map["cat.schema.ext_table1"] == "s3://bucket/path1" - assert lineage_source.external_location_map["cat.schema.ext_table2"] == "s3://bucket/path2/" + assert results == [] - def test_cache_external_locations_handles_failure(self, lineage_source): - mock_conn = MagicMock() - mock_conn.execute.side_effect = Exception("Access denied") - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + def test_query_scoped_to_catalog_and_window(self, lineage_source): + with patch.object(lineage_source, "_iter_date_windows", return_value=[("2026-01-01", "2026-01-02")]): + mock_conn = _mock_query_rows(lineage_source, []) + list(lineage_source._yield_catalog_table_lineage("my_catalog")) - lineage_source._cache_external_locations("cat") + executed = str(mock_conn.execution_options.return_value.execute.call_args.args[0]) + assert "= 'my_catalog'" in executed + assert "2026-01-01" in executed + assert "2026-01-02" in executed - assert len(lineage_source.external_location_map) == 0 - def test_cache_external_locations_normalizes_case(self, lineage_source): +class TestExternalLocationLineage: + def test_yields_container_lineage(self, lineage_source): ExternalRow = namedtuple( "ExternalRow", ["table_catalog", "table_schema", "table_name", "storage_path"], ) - mock_rows = [ - ExternalRow("Cat", "Schema", "Ext_Table", "s3://bucket/Path1"), - ] - - mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - - lineage_source._cache_external_locations("cat") - - assert lineage_source.external_location_map == {"cat.schema.ext_table": "s3://bucket/Path1"} - - def test_process_external_location_lineage_from_cache(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} - - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) - + rows = [ExternalRow("cat", "schema", "ext_table", "s3://bucket/path/")] + table_entity = _make_table("ext_table", "local_unitycatalog.cat.schema.ext_table") container_entity = Container( id=uuid4(), name=EntityName(root="test_container"), service=EntityReference(id=uuid4(), type="storageService"), ) + lineage_source.metadata.get_by_name.return_value = table_entity lineage_source.metadata.es_search_container_by_path.return_value = [container_entity] + _mock_query_rows(lineage_source, rows) - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + results = list(lineage_source._yield_catalog_external_lineage("cat")) assert len(results) == 1 - assert isinstance(results[0], Either) - assert isinstance(results[0].right, AddLineageRequest) assert results[0].right.edge.fromEntity.id == container_entity.id assert results[0].right.edge.fromEntity.type == "container" assert results[0].right.edge.toEntity.id == table_entity.id - assert results[0].right.edge.toEntity.type == "table" - lineage_source.metadata.es_search_container_by_path.assert_called_once_with( full_path="s3://bucket/path", fields="dataModel" ) - def test_process_external_location_strips_trailing_slash(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://test-bucket/data/"} - - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) - - container_entity = Container( - id=uuid4(), - name=EntityName(root="test_container"), - service=EntityReference(id=uuid4(), type="storageService"), - ) - - lineage_source.metadata.es_search_container_by_path.return_value = [container_entity] - - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) - - assert len(results) == 1 - lineage_source.metadata.es_search_container_by_path.assert_called_once_with( - full_path="s3://test-bucket/data", fields="dataModel" + def test_skips_when_table_not_in_om(self, lineage_source): + ExternalRow = namedtuple( + "ExternalRow", + ["table_catalog", "table_schema", "table_name", "storage_path"], ) + rows = [ExternalRow("cat", "schema", "ext_table", "s3://bucket/path")] + lineage_source.metadata.get_by_name.return_value = None + _mock_query_rows(lineage_source, rows) - def test_process_external_location_no_cache_entry(self, lineage_source): - lineage_source.external_location_map = {} + results = list(lineage_source._yield_catalog_external_lineage("cat")) - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) - - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + assert results == [] + lineage_source.metadata.es_search_container_by_path.assert_not_called() - assert len(results) == 0 + def test_no_storage_path_yields_nothing(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") - def test_process_external_location_no_container_found(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} + results = list(lineage_source._process_external_location_lineage(table_entity, None)) - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) + assert results == [] + def test_no_container_found(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") lineage_source.metadata.es_search_container_by_path.return_value = [] - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) - - assert len(results) == 0 + results = list(lineage_source._process_external_location_lineage(table_entity, "s3://bucket/path")) - def test_process_external_location_failure_yields_left(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} - - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) + assert results == [] + def test_failure_yields_left(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") lineage_source.metadata.es_search_container_by_path.side_effect = Exception("Search unavailable") - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + results = list(lineage_source._process_external_location_lineage(table_entity, "s3://bucket/path")) assert len(results) == 1 assert results[0].left is not None - assert results[0].right is None assert "Search unavailable" in results[0].left.error + def test_external_query_failure_is_swallowed(self, lineage_source): + mock_conn = MagicMock() + mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + results = list(lineage_source._yield_catalog_external_lineage("cat")) -class TestPerCatalogProcessing: + assert results == [] + + +class TestIter: def _make_database(self, name="cat"): return Database( id=uuid4(), @@ -550,52 +426,33 @@ def _make_database(self, name="cat"): service=EntityReference(id=uuid4(), type="databaseService"), ) - def test_process_catalog_lineage_scopes_and_clears_caches(self, lineage_source): - database = self._make_database() - lineage_source.metadata.list_all_entities.return_value = [] - - with ( - patch.object(lineage_source, "_cache_lineage") as mock_cache_lineage, - patch.object(lineage_source, "_cache_external_locations") as mock_cache_locations, - ): - lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") - lineage_source.external_location_map["cat.schema.ext"] = "s3://bucket/path" - - results = list(lineage_source._process_catalog_lineage(database)) - - assert results == [] - mock_cache_lineage.assert_called_once_with("cat") - mock_cache_locations.assert_called_once_with("cat") - assert len(lineage_source.table_lineage_map) == 0 - assert len(lineage_source.column_lineage_map) == 0 - assert len(lineage_source.external_location_map) == 0 - - def test_process_catalog_lineage_lowercases_catalog_name(self, lineage_source): - database = self._make_database(name="MyCat") - lineage_source.metadata.list_all_entities.return_value = [] + def test_processes_each_unfiltered_catalog(self, lineage_source): + lineage_source.metadata.list_all_entities.return_value = [ + self._make_database("CatOne"), + self._make_database("CatTwo"), + ] with ( - patch.object(lineage_source, "_cache_lineage") as mock_cache_lineage, - patch.object(lineage_source, "_cache_external_locations") as mock_cache_locations, + patch.object(lineage_source, "_yield_catalog_table_lineage", return_value=[]) as mock_table, + patch.object(lineage_source, "_yield_catalog_external_lineage", return_value=[]) as mock_external, ): - list(lineage_source._process_catalog_lineage(database)) + list(lineage_source._iter()) - mock_cache_lineage.assert_called_once_with("mycat") - mock_cache_locations.assert_called_once_with("mycat") + assert [c.args[0] for c in mock_table.call_args_list] == ["catone", "cattwo"] + assert [c.args[0] for c in mock_external.call_args_list] == ["catone", "cattwo"] - def test_process_catalog_lineage_clears_caches_on_error(self, lineage_source): - database = self._make_database() - lineage_source.metadata.list_all_entities.side_effect = Exception("API error") + def test_filters_catalog(self, lineage_source): + lineage_source.metadata.list_all_entities.return_value = [self._make_database("skipme")] + lineage_source.status = MagicMock() with ( - patch.object(lineage_source, "_cache_lineage"), - patch.object(lineage_source, "_cache_external_locations"), - pytest.raises(Exception, match="API error"), + patch("metadata.ingestion.source.database.unitycatalog.lineage.filter_by_database", return_value=True), + patch.object(lineage_source, "_yield_catalog_table_lineage") as mock_table, ): - lineage_source.table_lineage_map["cat.schema.tgt"].add("cat.schema.src") - list(lineage_source._process_catalog_lineage(database)) + list(lineage_source._iter()) - assert len(lineage_source.table_lineage_map) == 0 + mock_table.assert_not_called() + lineage_source.status.filter.assert_called_once() class TestContainerColumnLineage: diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json index adae56758679..72cacd44fdc1 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json @@ -80,6 +80,12 @@ "type": "integer", "default": 120 }, + "lineageQueryChunkSize": { + "title": "Lineage Query Chunk Size", + "description": "Number of days of lineage scanned per query against the Unity Catalog system tables. The configured lineage lookback window is split into chunks of this size and streamed one chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); smaller values keep each scan smaller.", + "type": "integer", + "default": 7 + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions" From 867b77efd001a8cf3bad6148fb124ea4216a0891 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 9 Jun 2026 19:03:30 +0530 Subject: [PATCH 3/7] fix(unitycatalog): drop catalog SQL filter, lowercase FQN resolution Address gitar bot review on #28648: - Remove the catalog name interpolated into the lineage and external-table system-table scans (`'{catalog}'`). The streaming day-window model plus the bounded LRU already keep memory O(window), so per-catalog SQL scoping is redundant; dropping it also removes the string-interpolation/SQL-injection surface flagged in the review. Lineage now runs one global windowed scan and one external-table scan instead of looping per catalog. - Honor `databaseFilterPattern` per edge in `_is_filtered_table` (catalog now checked alongside schema/table) since `_iter` no longer enumerates catalogs. - Fix case-insensitive resolution: `_resolve_table` now resolves with the lowercased FQN it already computes for the cache key, so `Cat.Schema.Table` from the system tables matches an entity stored as `cat.schema.table` instead of silently dropping the edge. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 88 +++++++++---------- .../source/database/unitycatalog/queries.py | 3 - .../database/test_unity_catalog_lineage.py | 74 ++++++++-------- 3 files changed, 78 insertions(+), 87 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index f213a2a7a2e8..07aabaa944c7 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -22,7 +22,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.container import ContainerDataModel -from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, @@ -69,7 +68,7 @@ class UnitycatalogLineageSource(Source): """ Lineage Unity Catalog Source. - Lineage edges are streamed one catalog and one day-window at a time from + Lineage edges are streamed one day-window at a time from `system.access.table_lineage` / `column_lineage` (the Databricks analogue of Snowflake's ACCESS_HISTORY). Column pairs are aggregated server-side per edge, and each endpoint is resolved to an OpenMetadata table through a @@ -148,7 +147,7 @@ def _resolve_table(self, databricks_table_fqn: str) -> Optional[Table]: # noqa: cache_key = databricks_table_fqn.lower() if cache_key in self._table_cache: return self._table_cache[cache_key] - entity = self._fetch_table_entity(databricks_table_fqn) + entity = self._fetch_table_entity(cache_key) self._table_cache[cache_key] = entity return entity @@ -174,13 +173,20 @@ def _is_filtered_table(self, databricks_table_fqn: str) -> bool: is_filtered = False parts = databricks_table_fqn.split(".") if len(parts) == 3: - _, schema_name, table_name = parts - is_filtered = filter_by_schema( - self.source_config.schemaFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] - schema_name, - ) or filter_by_table( - self.source_config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] - table_name, + catalog_name, schema_name, table_name = parts + is_filtered = ( + filter_by_database( + self.source_config.databaseFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + catalog_name, + ) + or filter_by_schema( + self.source_config.schemaFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + schema_name, + ) + or filter_by_table( + self.source_config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + table_name, + ) ) return is_filtered @@ -249,15 +255,13 @@ def _build_table_edge(self, row: Any) -> Optional[AddLineageRequest]: # noqa: U ) return edge - def _fetch_lineage_rows(self, catalog_name: str, window_start: datetime, window_end: datetime) -> Iterable: + def _fetch_lineage_rows(self, window_start: datetime, window_end: datetime) -> Iterable: """ - Run the combined lineage query for one catalog and one [start, end) - window, streaming rows so the driver does not buffer the whole result - set. A failure on one window is logged and swallowed so the remaining - windows still run. + Run the combined lineage query for one [start, end) window, streaming + rows so the driver does not buffer the whole result set. A failure on one + window is logged and swallowed so the remaining windows still run. """ sql_statement = UNITY_CATALOG_LINEAGE.format( - catalog=catalog_name, start_time=window_start, end_time=window_end, ) @@ -266,22 +270,20 @@ def _fetch_lineage_rows(self, catalog_name: str, window_start: datetime, window_ rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(text(sql_statement)) yield from rows except Exception as exc: - logger.warning( - f"Failed to fetch lineage for catalog [{catalog_name}] window {window_start} - {window_end}: {exc}" - ) + logger.warning(f"Failed to fetch lineage for window {window_start} - {window_end}: {exc}") logger.debug(traceback.format_exc()) - def _yield_catalog_table_lineage(self, catalog_name: str) -> Iterable[Either[AddLineageRequest]]: + def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: """ - Stream table/column lineage for one catalog, one day-window at a time, - emitting one request per resolved edge. Per-row failures surface as - Either(left) instead of being swallowed. + Stream table/column lineage one day-window at a time, emitting one + request per resolved edge. Per-row failures surface as Either(left) + instead of being swallowed. """ emitted = 0 skipped = 0 failed = 0 for window_start, window_end in self._iter_date_windows(): - for row in self._fetch_lineage_rows(catalog_name, window_start, window_end): + for row in self._fetch_lineage_rows(window_start, window_end): try: edge = self._build_table_edge(row) except Exception as exc: @@ -303,7 +305,7 @@ def _yield_catalog_table_lineage(self, catalog_name: str) -> Iterable[Either[Add emitted += 1 yield Either(right=edge) # pyright: ignore[reportCallIssue] logger.info( - f"Table lineage for catalog [{catalog_name}]: emitted {emitted} edges, " + f"Table lineage: emitted {emitted} edges, " f"skipped {skipped} (filtered or unresolved tables), failed {failed} (row errors)" ) @@ -387,17 +389,18 @@ def _process_external_location_lineage( ) ) - def _yield_catalog_external_lineage(self, catalog_name: str) -> Iterable[Either[AddLineageRequest]]: + def _yield_external_lineage(self) -> Iterable[Either[AddLineageRequest]]: """ - Stream external tables for one catalog and create container lineage for - each, resolving the table through the shared bounded LRU. External-table - storage paths are a current snapshot, not an event stream, so this is a - single per-catalog scan rather than a windowed one. + Stream external tables and create container lineage for each, resolving + the table through the shared bounded LRU. External-table storage paths + are a current snapshot, not an event stream, so this is a single scan + rather than a windowed one. Catalogs excluded by the database filter are + dropped per row via `_is_filtered_table`. """ try: with self.engine.connect() as conn: rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute( - text(UNITY_CATALOG_EXTERNAL_TABLES.format(catalog=catalog_name)) + text(UNITY_CATALOG_EXTERNAL_TABLES) ) for row in rows: databricks_table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() @@ -407,26 +410,19 @@ def _yield_catalog_external_lineage(self, catalog_name: str) -> Iterable[Either[ if table_entity: yield from self._process_external_location_lineage(table_entity, row.storage_path) except Exception as exc: - logger.warning(f"Failed to fetch external table locations for catalog [{catalog_name}]: {exc}") + logger.warning(f"Failed to fetch external table locations: {exc}") logger.debug(traceback.format_exc()) def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ - Stream table/column and external-location lineage one catalog at a time. - Catalogs are scoped at the SQL layer and resolved edges share a single - bounded table-resolution cache across the whole run. + Stream table/column and external-location lineage across the whole + metastore. The system-table scans are no longer scoped per catalog; + catalogs excluded by `databaseFilterPattern` are dropped per edge during + resolution, and resolved edges share a single bounded table-resolution + cache across the whole run. """ - for database in self.metadata.list_all_entities(entity=Database, params={"service": self.config.serviceName}): - if filter_by_database(self.source_config.databaseFilterPattern, database.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - database.fullyQualifiedName.root, - "Catalog Filtered Out", - ) - continue - - catalog_name = database.name.root.lower() - yield from self._yield_catalog_table_lineage(catalog_name) - yield from self._yield_catalog_external_lineage(catalog_name) + yield from self._yield_table_lineage() + yield from self._yield_external_lineage() def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index ecaed6f2b46f..0a837488a7dd 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -74,7 +74,6 @@ AND target_table_full_name IS NOT NULL AND source_column_name IS NOT NULL AND target_column_name IS NOT NULL - AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' GROUP BY source_table_full_name, target_table_full_name ), table_edges AS ( @@ -86,7 +85,6 @@ AND event_time < to_timestamp('{end_time}') AND source_table_full_name IS NOT NULL AND target_table_full_name IS NOT NULL - AND lower(split_part(target_table_full_name, '.', 1)) = '{catalog}' ) SELECT t.source_table_full_name AS source_table_full_name, @@ -109,7 +107,6 @@ FROM system.information_schema.tables WHERE table_type = 'EXTERNAL' AND storage_path IS NOT NULL - AND lower(table_catalog) = '{catalog}' """ ) diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 0d45d070460a..61ce387f4e8d 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -26,7 +26,6 @@ Container, ContainerDataModel, ) -from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( Column, ColumnName, @@ -278,7 +277,7 @@ def test_self_loop_column_dropped(self, lineage_source): assert edge.edge.lineageDetails.columnsLineage is None -class TestYieldCatalogTableLineage: +class TestYieldTableLineage: def test_emits_resolved_edges(self, lineage_source): lineage_source.source_config.queryLogDuration = 1 source_table = _make_table("source", "local_unitycatalog.cat.schema.source") @@ -288,7 +287,7 @@ def test_emits_resolved_edges(self, lineage_source): rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): _mock_query_rows(lineage_source, rows) - results = list(lineage_source._yield_catalog_table_lineage("cat")) + results = list(lineage_source._yield_table_lineage()) assert len(results) == 1 assert isinstance(results[0].right, AddLineageRequest) @@ -299,7 +298,7 @@ def test_skips_unresolved_edges(self, lineage_source): rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): _mock_query_rows(lineage_source, rows) - results = list(lineage_source._yield_catalog_table_lineage("cat")) + results = list(lineage_source._yield_table_lineage()) assert results == [] @@ -310,7 +309,7 @@ def test_row_failure_yields_left(self, lineage_source): patch.object(lineage_source, "_build_table_edge", side_effect=Exception("boom")), ): _mock_query_rows(lineage_source, rows) - results = list(lineage_source._yield_catalog_table_lineage("cat")) + results = list(lineage_source._yield_table_lineage()) assert len(results) == 1 assert results[0].left is not None @@ -324,19 +323,19 @@ def test_query_failure_is_swallowed(self, lineage_source): lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): - results = list(lineage_source._yield_catalog_table_lineage("cat")) + results = list(lineage_source._yield_table_lineage()) assert results == [] - def test_query_scoped_to_catalog_and_window(self, lineage_source): + def test_query_scoped_to_window(self, lineage_source): with patch.object(lineage_source, "_iter_date_windows", return_value=[("2026-01-01", "2026-01-02")]): mock_conn = _mock_query_rows(lineage_source, []) - list(lineage_source._yield_catalog_table_lineage("my_catalog")) + list(lineage_source._yield_table_lineage()) executed = str(mock_conn.execution_options.return_value.execute.call_args.args[0]) - assert "= 'my_catalog'" in executed assert "2026-01-01" in executed assert "2026-01-02" in executed + assert "split_part" not in executed class TestExternalLocationLineage: @@ -357,7 +356,7 @@ def test_yields_container_lineage(self, lineage_source): lineage_source.metadata.es_search_container_by_path.return_value = [container_entity] _mock_query_rows(lineage_source, rows) - results = list(lineage_source._yield_catalog_external_lineage("cat")) + results = list(lineage_source._yield_external_lineage()) assert len(results) == 1 assert results[0].right.edge.fromEntity.id == container_entity.id @@ -376,7 +375,7 @@ def test_skips_when_table_not_in_om(self, lineage_source): lineage_source.metadata.get_by_name.return_value = None _mock_query_rows(lineage_source, rows) - results = list(lineage_source._yield_catalog_external_lineage("cat")) + results = list(lineage_source._yield_external_lineage()) assert results == [] lineage_source.metadata.es_search_container_by_path.assert_not_called() @@ -412,47 +411,46 @@ def test_external_query_failure_is_swallowed(self, lineage_source): lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - results = list(lineage_source._yield_catalog_external_lineage("cat")) + results = list(lineage_source._yield_external_lineage()) assert results == [] -class TestIter: - def _make_database(self, name="cat"): - return Database( - id=uuid4(), - name=EntityName(root=name), - fullyQualifiedName=FullyQualifiedEntityName(root=f"local_unitycatalog.{name}"), - service=EntityReference(id=uuid4(), type="databaseService"), - ) +class TestIsFilteredTable: + def test_filters_by_database(self, lineage_source): + with patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_database", + return_value=True, + ): + assert lineage_source._is_filtered_table("system.schema.tbl") is True - def test_processes_each_unfiltered_catalog(self, lineage_source): - lineage_source.metadata.list_all_entities.return_value = [ - self._make_database("CatOne"), - self._make_database("CatTwo"), - ] + def test_not_filtered_when_all_patterns_pass(self, lineage_source): + assert lineage_source._is_filtered_table("cat.schema.tbl") is False + def test_malformed_name_not_filtered(self, lineage_source): + assert lineage_source._is_filtered_table("not_a_fqn") is False + + +class TestIter: + def test_chains_table_then_external_lineage(self, lineage_source): with ( - patch.object(lineage_source, "_yield_catalog_table_lineage", return_value=[]) as mock_table, - patch.object(lineage_source, "_yield_catalog_external_lineage", return_value=[]) as mock_external, + patch.object(lineage_source, "_yield_table_lineage", return_value=["t1", "t2"]) as mock_table, + patch.object(lineage_source, "_yield_external_lineage", return_value=["e1"]) as mock_external, ): - list(lineage_source._iter()) - - assert [c.args[0] for c in mock_table.call_args_list] == ["catone", "cattwo"] - assert [c.args[0] for c in mock_external.call_args_list] == ["catone", "cattwo"] + results = list(lineage_source._iter()) - def test_filters_catalog(self, lineage_source): - lineage_source.metadata.list_all_entities.return_value = [self._make_database("skipme")] - lineage_source.status = MagicMock() + mock_table.assert_called_once_with() + mock_external.assert_called_once_with() + assert results == ["t1", "t2", "e1"] + def test_does_not_enumerate_catalogs(self, lineage_source): with ( - patch("metadata.ingestion.source.database.unitycatalog.lineage.filter_by_database", return_value=True), - patch.object(lineage_source, "_yield_catalog_table_lineage") as mock_table, + patch.object(lineage_source, "_yield_table_lineage", return_value=[]), + patch.object(lineage_source, "_yield_external_lineage", return_value=[]), ): list(lineage_source._iter()) - mock_table.assert_not_called() - lineage_source.status.filter.assert_called_once() + lineage_source.metadata.list_all_entities.assert_not_called() class TestContainerColumnLineage: From 96e2077ce121638470585919f7c9eb3d616859e5 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 9 Jun 2026 19:11:25 +0530 Subject: [PATCH 4/7] fix(unitycatalog): normalize filter casing, split lineage skip counts Address gitar bot follow-up review on #28648: - Lowercase the FQN inside `_is_filtered_table` so the table/column-lineage path (raw system-table casing) and the external-table path (pre-lowercased) filter identically against `databaseFilterPattern`/schema/table patterns. - Split the table-lineage summary's `skipped` counter into `filtered` (dropped by databaseFilterPattern) and `unresolved` (tables absent from OpenMetadata) so operators can tell intentional filtering from missing metadata. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 20 +++++++++++++------ .../database/test_unity_catalog_lineage.py | 10 ++++++++++ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 07aabaa944c7..245bd2e7f9df 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -171,7 +171,7 @@ def _fetch_table_entity(self, databricks_table_fqn: str) -> Optional[Table]: # def _is_filtered_table(self, databricks_table_fqn: str) -> bool: is_filtered = False - parts = databricks_table_fqn.split(".") + parts = databricks_table_fqn.lower().split(".") if len(parts) == 3: catalog_name, schema_name, table_name = parts is_filtered = ( @@ -277,13 +277,21 @@ def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: """ Stream table/column lineage one day-window at a time, emitting one request per resolved edge. Per-row failures surface as Either(left) - instead of being swallowed. + instead of being swallowed. Edges dropped by `databaseFilterPattern` and + edges whose tables are absent from OpenMetadata are counted separately so + the summary distinguishes intentional filtering from missing metadata. """ emitted = 0 - skipped = 0 + filtered = 0 + unresolved = 0 failed = 0 for window_start, window_end in self._iter_date_windows(): for row in self._fetch_lineage_rows(window_start, window_end): + if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table( + row.target_table_full_name + ): + filtered += 1 + continue try: edge = self._build_table_edge(row) except Exception as exc: @@ -300,13 +308,13 @@ def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: ) continue if edge is None: - skipped += 1 + unresolved += 1 continue emitted += 1 yield Either(right=edge) # pyright: ignore[reportCallIssue] logger.info( - f"Table lineage: emitted {emitted} edges, " - f"skipped {skipped} (filtered or unresolved tables), failed {failed} (row errors)" + f"Table lineage: emitted {emitted} edges, filtered {filtered} (databaseFilterPattern), " + f"unresolved {unresolved} (tables not in OpenMetadata), failed {failed} (row errors)" ) def _get_data_model_column_fqn(self, data_model_entity: ContainerDataModel, column: str) -> Optional[str]: # noqa: UP045 diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 61ce387f4e8d..0336c25cd84c 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -430,6 +430,16 @@ def test_not_filtered_when_all_patterns_pass(self, lineage_source): def test_malformed_name_not_filtered(self, lineage_source): assert lineage_source._is_filtered_table("not_a_fqn") is False + def test_normalizes_case_before_filtering(self, lineage_source): + seen = [] + with patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_schema", + side_effect=lambda _pattern, name: seen.append(name) or False, + ): + lineage_source._is_filtered_table("CAT.MySchema.MyTable") + + assert seen == ["myschema"] + class TestIter: def test_chains_table_then_external_lineage(self, lineage_source): From 3968edfec8870f7c849039d0619f876e76ca75f1 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 9 Jun 2026 14:04:47 +0000 Subject: [PATCH 5/7] Update generated TypeScript types --- .../ui/src/generated/api/automations/createWorkflow.ts | 7 +++++++ .../src/generated/api/services/createDatabaseService.ts | 7 +++++++ .../ingestionPipelines/createIngestionPipeline.ts | 7 +++++++ .../entity/automations/testServiceConnection.ts | 7 +++++++ .../ui/src/generated/entity/automations/workflow.ts | 7 +++++++ .../connections/database/unityCatalogConnection.ts | 9 ++++++++- .../entity/services/connections/serviceConnection.ts | 7 +++++++ .../ui/src/generated/entity/services/databaseService.ts | 7 +++++++ .../services/ingestionPipelines/ingestionPipeline.ts | 7 +++++++ .../src/generated/metadataIngestion/testSuitePipeline.ts | 7 +++++++ .../ui/src/generated/metadataIngestion/workflow.ts | 7 +++++++ 11 files changed, 78 insertions(+), 1 deletion(-) diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts index fa823066b2f0..59b7c16c5ea4 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts @@ -1548,6 +1548,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts index b17143b3a417..71715b51e449 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts @@ -1037,6 +1037,13 @@ export interface Connection { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 8b68eae97540..d45715ea0117 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -4749,6 +4749,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts index 7cd592cc6213..33a59959786b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts @@ -1430,6 +1430,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts index 002a76e3177e..40971bccd10c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts @@ -2101,6 +2101,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts index 928550b27184..c88f451a38f7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts @@ -49,7 +49,14 @@ export interface UnityCatalogConnection { /** * Databricks compute resources URL. */ - httpPath?: string; + httpPath?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; sampleDataStorageConfig?: SampleDataStorageConfig; /** * Regex to only include/exclude schemas that matches the pattern. diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts index 56f62e3eba5d..8ab178cf5ed3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts @@ -1643,6 +1643,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts index 0f985c9b7914..a0796f9453e3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts @@ -1168,6 +1168,13 @@ export interface Connection { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index b0211ab4f91e..d07a9cf22839 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -5279,6 +5279,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts index 05ef7fb7d753..7aa769bafac7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts @@ -1741,6 +1741,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 6bf3ae9a7749..d523df1be8a3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -1732,6 +1732,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ From d9b374fab5864d46ed04da027bff20cf2cd05146 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 9 Jun 2026 20:13:04 +0530 Subject: [PATCH 6/7] fix(unitycatalog): surface window-level lineage failures, drop redundant filter Address gitar bot follow-up review on #28648: - Surface whole-window and external-scan query failures as Either(left=StackTraceError) instead of swallowing them with a log warning. A recurring failure (e.g. missing permissions on system.access.table_lineage) would otherwise reproduce the "silent zero edges, workflow reports success" problem this PR set out to fix. - Drop the redundant _is_filtered_table guard inside _build_table_edge. _yield_table_lineage already filters each row before calling it, so the guard re-ran the three regex filter calls on every emitted row; a None return now unambiguously means "unresolved". Row processing is split into _yield_window_lineage / _yield_row_lineage helpers for clarity. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 158 ++++++++++-------- .../database/test_unity_catalog_lineage.py | 35 ++-- 2 files changed, 112 insertions(+), 81 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 245bd2e7f9df..f33bac445b90 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -226,96 +226,113 @@ def _build_column_lineage( def _build_table_edge(self, row: Any) -> Optional[AddLineageRequest]: # noqa: UP045 """ Resolve both endpoints of a streamed lineage row to OpenMetadata tables - and build the lineage request with column lineage attached. Returns None - when either side is filtered out or not present in OpenMetadata. + and build the lineage request with column lineage attached. Callers drop + filtered rows first, so a None return means one of the endpoints is not + present in OpenMetadata. """ edge = None source_fqn = row.source_table_full_name target_fqn = row.target_table_full_name - if not (self._is_filtered_table(source_fqn) or self._is_filtered_table(target_fqn)): - from_entity = self._resolve_table(source_fqn) - to_entity = self._resolve_table(target_fqn) - if from_entity and to_entity: - column_lineage = self._build_column_lineage(from_entity, to_entity, row.column_pairs) - edge = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=from_entity.id, type="table"), # pyright: ignore[reportCallIssue] - toEntity=EntityReference(id=to_entity.id, type="table"), # pyright: ignore[reportCallIssue] - lineageDetails=LineageDetails( # pyright: ignore[reportCallIssue] - columnsLineage=column_lineage or None, - source=LineageSource.QueryLineage, - ), - ) - ) - else: - logger.debug( - f"Skipping edge, table not found in OpenMetadata: " - f"{source_fqn} (found={from_entity is not None}) -> " - f"{target_fqn} (found={to_entity is not None})" + from_entity = self._resolve_table(source_fqn) + to_entity = self._resolve_table(target_fqn) + if from_entity and to_entity: + column_lineage = self._build_column_lineage(from_entity, to_entity, row.column_pairs) + edge = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="table"), # pyright: ignore[reportCallIssue] + toEntity=EntityReference(id=to_entity.id, type="table"), # pyright: ignore[reportCallIssue] + lineageDetails=LineageDetails( # pyright: ignore[reportCallIssue] + columnsLineage=column_lineage or None, + source=LineageSource.QueryLineage, + ), ) + ) + else: + logger.debug( + f"Skipping edge, table not found in OpenMetadata: " + f"{source_fqn} (found={from_entity is not None}) -> " + f"{target_fqn} (found={to_entity is not None})" + ) return edge def _fetch_lineage_rows(self, window_start: datetime, window_end: datetime) -> Iterable: """ Run the combined lineage query for one [start, end) window, streaming - rows so the driver does not buffer the whole result set. A failure on one - window is logged and swallowed so the remaining windows still run. + rows so the driver does not buffer the whole result set. Exceptions + propagate to the caller, which surfaces them in workflow status. """ sql_statement = UNITY_CATALOG_LINEAGE.format( start_time=window_start, end_time=window_end, ) - try: - with self.engine.connect() as conn: - rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(text(sql_statement)) - yield from rows - except Exception as exc: - logger.warning(f"Failed to fetch lineage for window {window_start} - {window_end}: {exc}") - logger.debug(traceback.format_exc()) + with self.engine.connect() as conn: + rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(text(sql_statement)) + yield from rows def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: """ Stream table/column lineage one day-window at a time, emitting one - request per resolved edge. Per-row failures surface as Either(left) - instead of being swallowed. Edges dropped by `databaseFilterPattern` and - edges whose tables are absent from OpenMetadata are counted separately so - the summary distinguishes intentional filtering from missing metadata. + request per resolved edge. Per-row and per-window failures both surface + as Either(left) instead of being swallowed. Edges dropped by + `databaseFilterPattern` and edges whose tables are absent from + OpenMetadata are counted separately so the summary distinguishes + intentional filtering from missing metadata. """ - emitted = 0 - filtered = 0 - unresolved = 0 - failed = 0 + stats = {"emitted": 0, "filtered": 0, "unresolved": 0, "failed": 0} for window_start, window_end in self._iter_date_windows(): + yield from self._yield_window_lineage(window_start, window_end, stats) + logger.info( + f"Table lineage: emitted {stats['emitted']} edges, filtered {stats['filtered']} " + f"(databaseFilterPattern), unresolved {stats['unresolved']} (tables not in OpenMetadata), " + f"failed {stats['failed']} (row/window errors)" + ) + + def _yield_window_lineage( + self, window_start: datetime, window_end: datetime, stats: dict + ) -> Iterable[Either[AddLineageRequest]]: + """ + Process one [start, end) window. A failure fetching the window (e.g. a + permissions error on the system tables that would recur for every + window) is surfaced as an Either(left) so it appears in workflow status + rather than silently producing zero edges. + """ + try: for row in self._fetch_lineage_rows(window_start, window_end): - if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table( - row.target_table_full_name - ): - filtered += 1 - continue - try: - edge = self._build_table_edge(row) - except Exception as exc: - failed += 1 - yield Either( # pyright: ignore[reportCallIssue] - left=StackTraceError( - name=row.target_table_full_name, - error=( - f"Error processing lineage {row.source_table_full_name} -> " - f"{row.target_table_full_name}: {exc}" - ), - stackTrace=traceback.format_exc(), - ) + yield from self._yield_row_lineage(row, stats) + except Exception as exc: + stats["failed"] += 1 + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=f"table-lineage-window:{window_start}/{window_end}", + error=f"Failed to fetch lineage for window {window_start} - {window_end}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + + def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: + if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name): + stats["filtered"] += 1 + else: + try: + edge = self._build_table_edge(row) + except Exception as exc: + stats["failed"] += 1 + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=row.target_table_full_name, + error=( + f"Error processing lineage {row.source_table_full_name} -> " + f"{row.target_table_full_name}: {exc}" + ), + stackTrace=traceback.format_exc(), ) - continue + ) + else: if edge is None: - unresolved += 1 - continue - emitted += 1 - yield Either(right=edge) # pyright: ignore[reportCallIssue] - logger.info( - f"Table lineage: emitted {emitted} edges, filtered {filtered} (databaseFilterPattern), " - f"unresolved {unresolved} (tables not in OpenMetadata), failed {failed} (row errors)" - ) + stats["unresolved"] += 1 + else: + stats["emitted"] += 1 + yield Either(right=edge) # pyright: ignore[reportCallIssue] def _get_data_model_column_fqn(self, data_model_entity: ContainerDataModel, column: str) -> Optional[str]: # noqa: UP045 if not data_model_entity: @@ -418,8 +435,13 @@ def _yield_external_lineage(self) -> Iterable[Either[AddLineageRequest]]: if table_entity: yield from self._process_external_location_lineage(table_entity, row.storage_path) except Exception as exc: - logger.warning(f"Failed to fetch external table locations: {exc}") - logger.debug(traceback.format_exc()) + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name="external-table-lineage", + error=f"Failed to fetch external table locations: {exc}", + stackTrace=traceback.format_exc(), + ) + ) def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 4205512848a1..b89e0a50c3c5 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -246,16 +246,6 @@ def test_returns_none_when_target_unresolved(self, lineage_source): assert lineage_source._build_table_edge(row) is None - def test_returns_none_when_table_filtered(self, lineage_source): - lineage_source.source_config.tableFilterPattern = MagicMock() - with patch( - "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_table", - return_value=True, - ): - row = LineageRow("cat.schema.source", "cat.schema.target", None) - assert lineage_source._build_table_edge(row) is None - lineage_source.metadata.get_by_name.assert_not_called() - def test_self_loop_column_dropped(self, lineage_source): source_table = _make_table( "tbl", @@ -310,7 +300,7 @@ def test_row_failure_yields_left(self, lineage_source): assert results[0].right is None assert "boom" in results[0].left.error - def test_query_failure_is_swallowed(self, lineage_source): + def test_window_failure_yields_left(self, lineage_source): mock_conn = MagicMock() mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) @@ -319,7 +309,24 @@ def test_query_failure_is_swallowed(self, lineage_source): with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): results = list(lineage_source._yield_table_lineage()) + assert len(results) == 1 + assert results[0].right is None + assert "Access denied" in results[0].left.error + + def test_filtered_rows_skipped_without_resolution(self, lineage_source): + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with ( + patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]), + patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_table", + return_value=True, + ), + ): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) + assert results == [] + lineage_source.metadata.get_by_name.assert_not_called() def test_query_scoped_to_window(self, lineage_source): with patch.object(lineage_source, "_iter_date_windows", return_value=[("2026-01-01", "2026-01-02")]): @@ -399,7 +406,7 @@ def test_failure_yields_left(self, lineage_source): assert results[0].left is not None assert "Search unavailable" in results[0].left.error - def test_external_query_failure_is_swallowed(self, lineage_source): + def test_external_scan_failure_yields_left(self, lineage_source): mock_conn = MagicMock() mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) @@ -407,7 +414,9 @@ def test_external_query_failure_is_swallowed(self, lineage_source): results = list(lineage_source._yield_external_lineage()) - assert results == [] + assert len(results) == 1 + assert results[0].right is None + assert "Access denied" in results[0].left.error class TestIsFilteredTable: From a1151b93b53c62e87d28c69764ff44f77e820a1c Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 16 Jun 2026 15:54:36 +0530 Subject: [PATCH 7/7] fix(unitycatalog): dedup cross-window lineage edges, enforce chunk-size minimum - Add bounded LRU dedup so an edge spanning multiple day-windows is emitted once instead of once per window; surface the deduplicated count in the per-run summary. - Add `minimum: 1` to the lineageQueryChunkSize connection schema so invalid values are rejected by UI/API, matching the runtime clamp. - Correct the lineage summary log to credit database/schema/table filter patterns rather than only databaseFilterPattern. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../source/database/unitycatalog/lineage.py | 33 +++++++++++++++---- .../database/test_unity_catalog_lineage.py | 14 ++++++++ .../database/unityCatalogConnection.json | 3 +- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index f33bac445b90..19e39586efd3 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -61,6 +61,8 @@ TABLE_RESOLUTION_CACHE_SIZE = 1000 +EDGE_DEDUP_CACHE_SIZE = 1000 + DEFAULT_LINEAGE_CHUNK_DAYS = 7 @@ -90,6 +92,7 @@ def __init__( self.connection_obj = get_connection(self.service_connection) self.engine = get_sqlalchemy_connection(self.service_connection) self._table_cache: LRUCache = LRUCache(maxsize=TABLE_RESOLUTION_CACHE_SIZE) + self._seen_edges: LRUCache = LRUCache(maxsize=EDGE_DEDUP_CACHE_SIZE) self._chunk_days = self._resolve_chunk_days() self.test_connection() @@ -273,17 +276,18 @@ def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: """ Stream table/column lineage one day-window at a time, emitting one request per resolved edge. Per-row and per-window failures both surface - as Either(left) instead of being swallowed. Edges dropped by - `databaseFilterPattern` and edges whose tables are absent from - OpenMetadata are counted separately so the summary distinguishes + as Either(left) instead of being swallowed. Edges dropped by the + database/schema/table filter patterns and edges whose tables are absent + from OpenMetadata are counted separately so the summary distinguishes intentional filtering from missing metadata. """ - stats = {"emitted": 0, "filtered": 0, "unresolved": 0, "failed": 0} + stats = {"emitted": 0, "duplicate": 0, "filtered": 0, "unresolved": 0, "failed": 0} for window_start, window_end in self._iter_date_windows(): yield from self._yield_window_lineage(window_start, window_end, stats) logger.info( - f"Table lineage: emitted {stats['emitted']} edges, filtered {stats['filtered']} " - f"(databaseFilterPattern), unresolved {stats['unresolved']} (tables not in OpenMetadata), " + f"Table lineage: emitted {stats['emitted']} edges, deduplicated {stats['duplicate']} " + f"cross-window duplicates, filtered {stats['filtered']} (database/schema/table filter " + f"patterns), unresolved {stats['unresolved']} (tables not in OpenMetadata), " f"failed {stats['failed']} (row/window errors)" ) @@ -310,6 +314,23 @@ def _yield_window_lineage( ) def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: + """ + Skip edges already streamed in an earlier window so an edge whose events + span multiple windows is emitted once instead of once per window. The + dedup set is a bounded LRU, so only recently-seen edges are suppressed; + edges past the cache window may re-emit (lineage adds are idempotent). + """ + edge_key = ( + row.source_table_full_name.lower(), + row.target_table_full_name.lower(), + ) + if edge_key in self._seen_edges: + stats["duplicate"] += 1 + else: + self._seen_edges[edge_key] = True + yield from self._process_unseen_row(row, stats) + + def _process_unseen_row(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name): stats["filtered"] += 1 else: diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index b89e0a50c3c5..d826b75c77a4 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -276,6 +276,20 @@ def test_emits_resolved_edges(self, lineage_source): assert len(results) == 1 assert isinstance(results[0].right, AddLineageRequest) + def test_dedupes_edges_across_windows(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] + + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s1", "e1"), ("s2", "e2")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) + + assert len(results) == 1 + assert isinstance(results[0].right, AddLineageRequest) + assert lineage_source.metadata.get_by_name.call_count == 2 + def test_skips_unresolved_edges(self, lineage_source): lineage_source.metadata.get_by_name.return_value = None diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json index 72cacd44fdc1..f30a22b34024 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json @@ -84,7 +84,8 @@ "title": "Lineage Query Chunk Size", "description": "Number of days of lineage scanned per query against the Unity Catalog system tables. The configured lineage lookback window is split into chunks of this size and streamed one chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); smaller values keep each scan smaller.", "type": "integer", - "default": 7 + "default": 7, + "minimum": 1 }, "connectionOptions": { "title": "Connection Options",