diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index a01b49f50b21..19e39586efd3 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -12,20 +12,23 @@ Databricks Unity Catalog Lineage Source Module """ +import json import traceback -from collections import defaultdict -from typing import Iterable, Optional # noqa: UP035 +from datetime import datetime, timedelta +from typing import Any, Iterable, List, Optional, Tuple # noqa: UP035 +from cachetools import LRUCache from sqlalchemy import text from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.container import ContainerDataModel -from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -46,21 +49,33 @@ get_sqlalchemy_connection, ) from metadata.ingestion.source.database.unitycatalog.queries import ( - UNITY_CATALOG_COLUMN_LINEAGE, UNITY_CATALOG_EXTERNAL_TABLES, - UNITY_CATALOG_TABLE_LINEAGE, + UNITY_CATALOG_LINEAGE, ) from metadata.utils import fqn from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table -from metadata.utils.helpers import retry_with_docker_host +from metadata.utils.helpers import get_start_and_end, retry_with_docker_host from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +TABLE_RESOLUTION_CACHE_SIZE = 1000 + +EDGE_DEDUP_CACHE_SIZE = 1000 + +DEFAULT_LINEAGE_CHUNK_DAYS = 7 + class UnitycatalogLineageSource(Source): """ - Lineage Unity Catalog Source + Lineage Unity Catalog Source. + + Lineage edges are streamed one day-window at a time from + `system.access.table_lineage` / `column_lineage` (the Databricks analogue + of Snowflake's ACCESS_HISTORY). Column pairs are aggregated server-side per + edge, and each endpoint is resolved to an OpenMetadata table through a + bounded LRU cache, so client memory stays O(window) regardless of how large + the metastore lineage graph is. """ @retry_with_docker_host() @@ -76,11 +91,22 @@ def __init__( self.source_config = self.config.sourceConfig.config self.connection_obj = get_connection(self.service_connection) self.engine = get_sqlalchemy_connection(self.service_connection) - self.table_lineage_map: dict[str, set[str]] = defaultdict(set) - self.column_lineage_map: dict[tuple[str, str], list[tuple[str, str]]] = defaultdict(list) - self.external_location_map: dict[str, str] = {} + self._table_cache: LRUCache = LRUCache(maxsize=TABLE_RESOLUTION_CACHE_SIZE) + self._seen_edges: LRUCache = LRUCache(maxsize=EDGE_DEDUP_CACHE_SIZE) + self._chunk_days = self._resolve_chunk_days() self.test_connection() + def _resolve_chunk_days(self) -> int: + """ + Days of lineage scanned per system-table query, read from the + `lineageQueryChunkSize` connection field. Clamp to >= 1 so the + date-window iterator always makes forward progress. + """ + configured = getattr(self.service_connection, "lineageQueryChunkSize", None) + if configured is None: + return DEFAULT_LINEAGE_CHUNK_DAYS + return max(1, int(configured)) + def close(self): """ By default, there is nothing to close @@ -100,58 +126,234 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}") return cls(config, metadata) - def _cache_lineage(self): + def _iter_date_windows(self) -> Iterable[Tuple[datetime, datetime]]: # noqa: UP006 """ - Bulk-fetch all table and column lineage from system tables into memory. + Split the configured `queryLogDuration` lookback into + `lineageQueryChunkSize` day windows. Streaming one window at a time keeps + each system-table scan and its result set bounded instead of pulling the + whole lookback at once. """ - query_log_duration = self.source_config.queryLogDuration or 1 # pyright: ignore[reportAttributeAccessIssue] - logger.info(f"Caching lineage from system tables (lookback: {query_log_duration} days)") - - try: - with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_TABLE_LINEAGE.format(query_log_duration=query_log_duration))) - for row in rows: - self.table_lineage_map[row.target_table_full_name].add(row.source_table_full_name) - logger.info( - f"Cached table lineage: {sum(len(v) for v in self.table_lineage_map.values())} edges " - f"for {len(self.table_lineage_map)} target tables" + start, end = get_start_and_end(self.source_config.queryLogDuration or 1) # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + window_start = start + while window_start < end: + window_end = min(window_start + timedelta(days=self._chunk_days), end) + yield window_start, window_end + window_start = window_end + + def _resolve_table(self, databricks_table_fqn: str) -> Optional[Table]: # noqa: UP045 + """ + Resolve a `catalog.schema.table` Unity Catalog name to an OpenMetadata + Table entity. Both hits and misses are cached in a bounded LRU so a busy + upstream table referenced by many edges is fetched once, and repeated + unresolvable lookups stay cheap. + """ + cache_key = databricks_table_fqn.lower() + if cache_key in self._table_cache: + return self._table_cache[cache_key] + entity = self._fetch_table_entity(cache_key) + self._table_cache[cache_key] = entity + return entity + + def _fetch_table_entity(self, databricks_table_fqn: str) -> Optional[Table]: # noqa: UP045 + entity = None + parts = databricks_table_fqn.split(".") + if len(parts) != 3: + logger.debug(f"Skipping malformed table name: {databricks_table_fqn}") + else: + catalog_name, schema_name, table_name = parts + table_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + database_name=catalog_name, + schema_name=schema_name, + table_name=table_name, ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache table lineage: {exc}") + entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) # pyright: ignore[reportArgumentType] + return entity + + def _is_filtered_table(self, databricks_table_fqn: str) -> bool: + is_filtered = False + parts = databricks_table_fqn.lower().split(".") + if len(parts) == 3: + catalog_name, schema_name, table_name = parts + is_filtered = ( + filter_by_database( + self.source_config.databaseFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + catalog_name, + ) + or filter_by_schema( + self.source_config.schemaFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + schema_name, + ) + or filter_by_table( + self.source_config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + table_name, + ) + ) + return is_filtered - try: - with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_COLUMN_LINEAGE.format(query_log_duration=query_log_duration))) - for row in rows: - table_key = ( - row.source_table_full_name, - row.target_table_full_name, - ) - self.column_lineage_map[table_key].append((row.source_column_name, row.target_column_name)) - logger.info( - f"Cached column lineage: {sum(len(v) for v in self.column_lineage_map.values())} " - f"column mappings for {len(self.column_lineage_map)} table pairs" + @staticmethod + def _parse_column_pairs(raw: object) -> List[Tuple[str, str]]: # noqa: UP006 + """ + Decode the server-aggregated `column_pairs` JSON into a list of + (source_column, target_column) tuples. The driver can hand back either a + JSON string or an already-parsed list, so handle both. + """ + pairs: List[Tuple[str, str]] = [] # noqa: UP006 + if isinstance(raw, str): + try: + raw = json.loads(raw) + except (ValueError, TypeError): + raw = None + if isinstance(raw, list): + for item in raw: + if isinstance(item, dict): + source_col = item.get("u") or item.get("U") + target_col = item.get("d") or item.get("D") + if source_col and target_col: + pairs.append((source_col, target_col)) + return pairs + + def _build_column_lineage( + self, from_table: Table, to_table: Table, raw_column_pairs: object + ) -> List[ColumnLineage]: # noqa: UP006 + col_lineage = [] + for source_col, target_col in self._parse_column_pairs(raw_column_pairs): + from_col_fqn = get_column_fqn(from_table, source_col) + to_col_fqn = get_column_fqn(to_table, target_col) + if from_col_fqn and to_col_fqn and from_col_fqn != to_col_fqn: + col_lineage.append(ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)) # pyright: ignore[reportCallIssue] + return col_lineage + + def _build_table_edge(self, row: Any) -> Optional[AddLineageRequest]: # noqa: UP045 + """ + Resolve both endpoints of a streamed lineage row to OpenMetadata tables + and build the lineage request with column lineage attached. Callers drop + filtered rows first, so a None return means one of the endpoints is not + present in OpenMetadata. + """ + edge = None + source_fqn = row.source_table_full_name + target_fqn = row.target_table_full_name + from_entity = self._resolve_table(source_fqn) + to_entity = self._resolve_table(target_fqn) + if from_entity and to_entity: + column_lineage = self._build_column_lineage(from_entity, to_entity, row.column_pairs) + edge = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="table"), # pyright: ignore[reportCallIssue] + toEntity=EntityReference(id=to_entity.id, type="table"), # pyright: ignore[reportCallIssue] + lineageDetails=LineageDetails( # pyright: ignore[reportCallIssue] + columnsLineage=column_lineage or None, + source=LineageSource.QueryLineage, + ), + ) ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache column lineage: {exc}") + else: + logger.debug( + f"Skipping edge, table not found in OpenMetadata: " + f"{source_fqn} (found={from_entity is not None}) -> " + f"{target_fqn} (found={to_entity is not None})" + ) + return edge - def _cache_external_locations(self): + def _fetch_lineage_rows(self, window_start: datetime, window_end: datetime) -> Iterable: + """ + Run the combined lineage query for one [start, end) window, streaming + rows so the driver does not buffer the whole result set. Exceptions + propagate to the caller, which surfaces them in workflow status. """ - Bulk-fetch all external table storage locations from system.information_schema.tables. + sql_statement = UNITY_CATALOG_LINEAGE.format( + start_time=window_start, + end_time=window_end, + ) + with self.engine.connect() as conn: + rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute(text(sql_statement)) + yield from rows + + def _yield_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: + """ + Stream table/column lineage one day-window at a time, emitting one + request per resolved edge. Per-row and per-window failures both surface + as Either(left) instead of being swallowed. Edges dropped by the + database/schema/table filter patterns and edges whose tables are absent + from OpenMetadata are counted separately so the summary distinguishes + intentional filtering from missing metadata. + """ + stats = {"emitted": 0, "duplicate": 0, "filtered": 0, "unresolved": 0, "failed": 0} + for window_start, window_end in self._iter_date_windows(): + yield from self._yield_window_lineage(window_start, window_end, stats) + logger.info( + f"Table lineage: emitted {stats['emitted']} edges, deduplicated {stats['duplicate']} " + f"cross-window duplicates, filtered {stats['filtered']} (database/schema/table filter " + f"patterns), unresolved {stats['unresolved']} (tables not in OpenMetadata), " + f"failed {stats['failed']} (row/window errors)" + ) + + def _yield_window_lineage( + self, window_start: datetime, window_end: datetime, stats: dict + ) -> Iterable[Either[AddLineageRequest]]: + """ + Process one [start, end) window. A failure fetching the window (e.g. a + permissions error on the system tables that would recur for every + window) is surfaced as an Either(left) so it appears in workflow status + rather than silently producing zero edges. """ - logger.info("Caching external table locations from system tables") try: - with self.engine.connect() as conn: - rows = conn.execute(text(UNITY_CATALOG_EXTERNAL_TABLES)) - for row in rows: - table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}" - self.external_location_map[table_fqn] = row.storage_path - logger.info(f"Cached {len(self.external_location_map)} external table locations") + for row in self._fetch_lineage_rows(window_start, window_end): + yield from self._yield_row_lineage(row, stats) except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Failed to cache external table locations: {exc}") + stats["failed"] += 1 + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=f"table-lineage-window:{window_start}/{window_end}", + error=f"Failed to fetch lineage for window {window_start} - {window_end}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + + def _yield_row_lineage(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: + """ + Skip edges already streamed in an earlier window so an edge whose events + span multiple windows is emitted once instead of once per window. The + dedup set is a bounded LRU, so only recently-seen edges are suppressed; + edges past the cache window may re-emit (lineage adds are idempotent). + """ + edge_key = ( + row.source_table_full_name.lower(), + row.target_table_full_name.lower(), + ) + if edge_key in self._seen_edges: + stats["duplicate"] += 1 + else: + self._seen_edges[edge_key] = True + yield from self._process_unseen_row(row, stats) + + def _process_unseen_row(self, row: Any, stats: dict) -> Iterable[Either[AddLineageRequest]]: + if self._is_filtered_table(row.source_table_full_name) or self._is_filtered_table(row.target_table_full_name): + stats["filtered"] += 1 + else: + try: + edge = self._build_table_edge(row) + except Exception as exc: + stats["failed"] += 1 + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=row.target_table_full_name, + error=( + f"Error processing lineage {row.source_table_full_name} -> " + f"{row.target_table_full_name}: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) + else: + if edge is None: + stats["unresolved"] += 1 + else: + stats["emitted"] += 1 + yield Either(right=edge) # pyright: ignore[reportCallIssue] def _get_data_model_column_fqn(self, data_model_entity: ContainerDataModel, column: str) -> Optional[str]: # noqa: UP045 if not data_model_entity: @@ -182,51 +384,26 @@ def _get_container_column_lineage( ) return None # noqa: TRY300 except Exception as exc: - logger.debug(f"Error computing container column lineage for {table_entity.fullyQualifiedName.root}: {exc}") - logger.debug(traceback.format_exc()) - return None - - def _get_column_lineage_details( - self, - from_table: Table, - to_table: Table, - source_table_fqn: str, - target_table_fqn: str, - ) -> Optional[LineageDetails]: # noqa: UP045 - try: - table_key = (source_table_fqn, target_table_fqn) - column_pairs = self.column_lineage_map.get(table_key, []) - if not column_pairs: - return None - - col_lineage = [] - for source_col, target_col in column_pairs: - from_col_fqn = get_column_fqn(from_table, source_col) - to_col_fqn = get_column_fqn(to_table, target_col) - if from_col_fqn and to_col_fqn and from_col_fqn != to_col_fqn: - col_lineage.append(ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)) - - if col_lineage: - return LineageDetails(columnsLineage=col_lineage, source=LineageSource.QueryLineage) - return None # noqa: TRY300 - except Exception as exc: - logger.debug(f"Error computing column lineage: {exc}") + logger.warning( + f"Error computing container column lineage for {table_entity.fullyQualifiedName.root}: {exc}" # pyright: ignore[reportOptionalMemberAccess] + ) logger.debug(traceback.format_exc()) return None def _process_external_location_lineage( - self, table: Table, databricks_table_fqn: str + self, + table: Table, + storage_path: Optional[str], # noqa: UP045 ) -> Iterable[Either[AddLineageRequest]]: """ - Look up external table storage location from cache and create - container lineage if a matching container is found. + Create container lineage for an external table from its storage path, + if a matching container has been ingested. """ - storage_location = self.external_location_map.get(databricks_table_fqn) - if not storage_location: + if not storage_path: return try: - storage_location = storage_location.rstrip("/") + storage_location = storage_path.rstrip("/") location_entity = self.metadata.es_search_container_by_path(full_path=storage_location, fields="dataModel") if location_entity and location_entity[0]: @@ -250,95 +427,53 @@ def _process_external_location_lineage( ), ) except Exception as exc: - logger.debug(f"Error processing external location lineage for {databricks_table_fqn}: {exc}") - logger.debug(traceback.format_exc()) - - def _process_table_lineage(self, table: Table, databricks_table_fqn: str) -> Iterable[Either[AddLineageRequest]]: - upstream_tables = self.table_lineage_map.get(databricks_table_fqn, set()) - - for source_table_full_name in upstream_tables: - try: - parts = source_table_full_name.split(".") - if len(parts) != 3: - logger.debug(f"Skipping malformed source table name: {source_table_full_name}") - continue - catalog_name, schema_name, table_name = parts - - from_entity_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - database_name=catalog_name, - schema_name=schema_name, - table_name=table_name, - service_name=self.config.serviceName, + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name=table.fullyQualifiedName.root, # pyright: ignore[reportOptionalMemberAccess] + error=f"Error processing external location lineage for {storage_path}: {exc}", + stackTrace=traceback.format_exc(), ) + ) - from_entity = self.metadata.get_by_name(entity=Table, fqn=from_entity_fqn) - if not from_entity: - logger.debug(f"Unable to find upstream entity: {source_table_full_name} -> {databricks_table_fqn}") - continue - - lineage_details = self._get_column_lineage_details( - from_table=from_entity, - to_table=table, - source_table_fqn=source_table_full_name, - target_table_fqn=databricks_table_fqn, + def _yield_external_lineage(self) -> Iterable[Either[AddLineageRequest]]: + """ + Stream external tables and create container lineage for each, resolving + the table through the shared bounded LRU. External-table storage paths + are a current snapshot, not an event stream, so this is a single scan + rather than a windowed one. Catalogs excluded by the database filter are + dropped per row via `_is_filtered_table`. + """ + try: + with self.engine.connect() as conn: + rows = conn.execution_options(stream_results=True, max_row_buffer=1000).execute( + text(UNITY_CATALOG_EXTERNAL_TABLES) ) - - yield Either( - right=AddLineageRequest( - edge=EntitiesEdge( - toEntity=EntityReference(id=table.id, type="table"), - fromEntity=EntityReference(id=from_entity.id, type="table"), - lineageDetails=lineage_details, - ) - ), + for row in rows: + databricks_table_fqn = f"{row.table_catalog}.{row.table_schema}.{row.table_name}".lower() + if self._is_filtered_table(databricks_table_fqn): + continue + table_entity = self._resolve_table(databricks_table_fqn) + if table_entity: + yield from self._process_external_location_lineage(table_entity, row.storage_path) + except Exception as exc: + yield Either( # pyright: ignore[reportCallIssue] + left=StackTraceError( + name="external-table-lineage", + error=f"Failed to fetch external table locations: {exc}", + stackTrace=traceback.format_exc(), ) - except Exception as exc: - logger.debug(f"Error processing lineage {source_table_full_name} -> {databricks_table_fqn}: {exc}") - logger.debug(traceback.format_exc()) + ) def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ - Fetch lineage from system tables for both table-to-table - and external location lineage. + Stream table/column and external-location lineage across the whole + metastore. The system-table scans are no longer scoped per catalog; + catalogs excluded by `databaseFilterPattern` are dropped per edge during + resolution, and resolved edges share a single bounded table-resolution + cache across the whole run. """ - self._cache_lineage() - self._cache_external_locations() - - for database in self.metadata.list_all_entities(entity=Database, params={"service": self.config.serviceName}): - if filter_by_database(self.source_config.databaseFilterPattern, database.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - database.fullyQualifiedName.root, - "Catalog Filtered Out", - ) - continue - for schema in self.metadata.list_all_entities( - entity=DatabaseSchema, - params={"database": database.fullyQualifiedName.root}, - ): - if filter_by_schema(self.source_config.schemaFilterPattern, schema.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - schema.fullyQualifiedName.root, - "Schema Filtered Out", - ) - continue - for table in self.metadata.list_all_entities( - entity=Table, - params={"databaseSchema": schema.fullyQualifiedName.root}, - ): - if filter_by_table(self.source_config.tableFilterPattern, table.name.root): # pyright: ignore[reportAttributeAccessIssue] - self.status.filter( - table.fullyQualifiedName.root, - "Table Filtered Out", - ) - continue - - databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}" - - yield from self._process_table_lineage(table, databricks_table_fqn) - - yield from self._process_external_location_lineage(table, databricks_table_fqn) + yield from self._yield_table_lineage() + yield from self._yield_external_lineage() def test_connection(self) -> None: test_connection_common(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index a78be8dccdfb..bde62f3b0a66 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -648,10 +648,17 @@ def mark_tables_as_deleted(self): raise ValueError("No Database found in the context. We cannot run the table deletion.") if self.source_config.markDeletedTables: logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]") # pyright: ignore[reportAttributeAccessIssue] + # Drain the global list so it stays bounded to one catalog's + # deletions instead of growing across the whole run. + with self._state_lock: + deleted_tables = list( + self.context.get_global().deleted_tables # pyright: ignore[reportAttributeAccessIssue] + ) + self.context.get_global().deleted_tables.clear() # pyright: ignore[reportAttributeAccessIssue] yield from delete_entity_by_name( self.metadata, entity_type=Table, - entity_names=self.context.get_global().deleted_tables, # pyright: ignore[reportAttributeAccessIssue] + entity_names=deleted_tables, recursive=self.source_config.markDeletedTables, ) else: diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index 27f48e0dbcf6..0f956dd20365 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -15,19 +15,21 @@ import textwrap UNITY_CATALOG_GET_CATALOGS_TAGS = """ -SELECT * FROM `{database}`.information_schema.catalog_tags; +SELECT catalog_name, tag_name, tag_value FROM `{database}`.information_schema.catalog_tags; """ UNITY_CATALOG_GET_ALL_SCHEMA_TAGS = """ -SELECT * FROM `{database}`.information_schema.schema_tags; +SELECT catalog_name, schema_name, tag_name, tag_value FROM `{database}`.information_schema.schema_tags; """ UNITY_CATALOG_GET_ALL_TABLE_TAGS = """ -SELECT * FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}'; +SELECT catalog_name, schema_name, table_name, tag_name, tag_value +FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}'; """ UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """ -SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; +SELECT catalog_name, schema_name, table_name, column_name, tag_name, tag_value +FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; """ UNITY_CATALOG_SQL_STATEMENT = textwrap.dedent( @@ -52,37 +54,42 @@ UNITY_CATALOG_GET_TABLE_DDL = "SHOW CREATE TABLE `{database}`.`{schema}`.`{table}`" -UNITY_CATALOG_TABLE_LINEAGE = textwrap.dedent( - """ - SELECT - source_table_full_name, - target_table_full_name - FROM system.access.table_lineage - WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - GROUP BY source_table_full_name, target_table_full_name - """ -) - -UNITY_CATALOG_COLUMN_LINEAGE = textwrap.dedent( - """ +UNITY_CATALOG_LINEAGE = textwrap.dedent( + """ + WITH column_pairs AS ( + SELECT + source_table_full_name, + target_table_full_name, + collect_set( + struct(source_column_name AS u, target_column_name AS d) + ) AS pairs + FROM system.access.column_lineage + WHERE event_time >= to_timestamp('{start_time}') + AND event_time < to_timestamp('{end_time}') + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + AND source_column_name IS NOT NULL + AND target_column_name IS NOT NULL + GROUP BY source_table_full_name, target_table_full_name + ), + table_edges AS ( + SELECT DISTINCT + source_table_full_name, + target_table_full_name + FROM system.access.table_lineage + WHERE event_time >= to_timestamp('{start_time}') + AND event_time < to_timestamp('{end_time}') + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + ) SELECT - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name - FROM system.access.column_lineage - WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - AND source_column_name IS NOT NULL - AND target_column_name IS NOT NULL - GROUP BY - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name + t.source_table_full_name AS source_table_full_name, + t.target_table_full_name AS target_table_full_name, + to_json(c.pairs) AS column_pairs + FROM table_edges t + LEFT JOIN column_pairs c + ON c.source_table_full_name = t.source_table_full_name + AND c.target_table_full_name = t.target_table_full_name """ ) diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 757a8f23234d..d826b75c77a4 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -14,6 +14,8 @@ """ from collections import namedtuple +from datetime import timedelta +from itertools import pairwise from unittest.mock import MagicMock, Mock, patch from uuid import uuid4 @@ -36,7 +38,6 @@ from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.models import Either from metadata.ingestion.source.database.unitycatalog.lineage import ( UnitycatalogLineageSource, ) @@ -55,7 +56,7 @@ "httpPath": "/sql/1.0/warehouses/test", } }, - "sourceConfig": {"config": {"type": "DatabaseLineage"}}, + "sourceConfig": {"config": {"type": "DatabaseLineage", "queryLogDuration": 1}}, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { @@ -67,6 +68,28 @@ }, } +LineageRow = namedtuple( + "LineageRow", + ["source_table_full_name", "target_table_full_name", "column_pairs"], +) + + +def _make_table(name: str, fqn: str, columns=None) -> Table: + return Table( + id=uuid4(), + name=EntityName(root=name), + fullyQualifiedName=FullyQualifiedEntityName(root=fqn), + columns=columns or [], + ) + + +def _make_column(name: str, fqn: str) -> Column: + return Column( + name=ColumnName(root=name), + dataType=DataType.STRING, + fullyQualifiedName=FullyQualifiedEntityName(root=fqn), + ) + @pytest.fixture def lineage_source(): @@ -82,360 +105,379 @@ def lineage_source(): yield source -class TestCacheLineage: - def test_cache_table_lineage(self, lineage_source): - TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) - mock_rows = [ - TableRow("cat.schema.source1", "cat.schema.target1"), - TableRow("cat.schema.source2", "cat.schema.target1"), - TableRow("cat.schema.source1", "cat.schema.target2"), - ] +def _mock_query_rows(lineage_source, rows): + mock_conn = MagicMock() + mock_conn.execution_options.return_value.execute.return_value = rows + mock_conn.execute.return_value = rows + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + return mock_conn - mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_lineage() +class TestResolveChunkDays: + def test_uses_configured_value(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = 14 - assert "cat.schema.target1" in lineage_source.table_lineage_map - assert lineage_source.table_lineage_map["cat.schema.target1"] == { - "cat.schema.source1", - "cat.schema.source2", - } - assert lineage_source.table_lineage_map["cat.schema.target2"] == { - "cat.schema.source1", - } + assert lineage_source._resolve_chunk_days() == 14 - def test_cache_column_lineage(self, lineage_source): - TableRow = namedtuple("TableRow", ["source_table_full_name", "target_table_full_name"]) - ColumnRow = namedtuple( - "ColumnRow", - [ - "source_table_full_name", - "source_column_name", - "target_table_full_name", - "target_column_name", - ], - ) + def test_clamps_to_at_least_one(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = 0 - call_count = 0 + assert lineage_source._resolve_chunk_days() == 1 - def mock_execute(query): - nonlocal call_count - call_count += 1 - if call_count == 1: - return [TableRow("cat.schema.src", "cat.schema.tgt")] - return [ - ColumnRow("cat.schema.src", "col_a", "cat.schema.tgt", "col_x"), - ColumnRow("cat.schema.src", "col_b", "cat.schema.tgt", "col_y"), - ] + def test_defaults_when_unset(self, lineage_source): + lineage_source.service_connection.lineageQueryChunkSize = None - mock_conn = MagicMock() - mock_conn.execute.side_effect = mock_execute - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + assert lineage_source._resolve_chunk_days() == 7 - lineage_source._cache_lineage() - key = ("cat.schema.src", "cat.schema.tgt") - assert key in lineage_source.column_lineage_map - assert lineage_source.column_lineage_map[key] == [ - ("col_a", "col_x"), - ("col_b", "col_y"), - ] +class TestDateWindows: + def test_windows_are_contiguous_and_chunk_sized(self, lineage_source): + lineage_source._chunk_days = 1 + lineage_source.source_config.queryLogDuration = 3 - def test_cache_lineage_handles_query_failure(self, lineage_source): - mock_conn = MagicMock() - mock_conn.execute.side_effect = Exception("Access denied") - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + windows = list(lineage_source._iter_date_windows()) - lineage_source._cache_lineage() + assert len(windows) == 4 + for window_start, window_end in windows: + assert window_end - window_start <= timedelta(days=1) + for prev, curr in pairwise(windows): + assert prev[1] == curr[0] - assert len(lineage_source.table_lineage_map) == 0 - assert len(lineage_source.column_lineage_map) == 0 + def test_larger_chunk_issues_fewer_windows(self, lineage_source): + lineage_source._chunk_days = 7 + lineage_source.source_config.queryLogDuration = 30 + windows = list(lineage_source._iter_date_windows()) -class TestProcessTableLineage: - def test_process_table_lineage_from_cache(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {} + assert len(windows) == 5 + for prev, curr in pairwise(windows): + assert prev[1] == curr[0] - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], - ) + def test_window_covers_full_lookback(self, lineage_source): + lineage_source._chunk_days = 1 + lineage_source.source_config.queryLogDuration = 2 - source_table = Table( - id=uuid4(), - name=EntityName(root="source"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source"), - columns=[], - ) + windows = list(lineage_source._iter_date_windows()) + + assert windows[-1][1] - windows[0][0] == timedelta(days=3) + +class TestResolveTable: + def test_resolves_and_caches_hit(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") lineage_source.metadata.get_by_name.return_value = source_table - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) + first = lineage_source._resolve_table("cat.schema.source") + second = lineage_source._resolve_table("CAT.SCHEMA.SOURCE") - assert len(results) == 1 - assert isinstance(results[0], Either) - assert isinstance(results[0].right, AddLineageRequest) - assert results[0].right.edge.fromEntity.id == source_table.id - assert results[0].right.edge.toEntity.id == target_table.id + assert first is source_table + assert second is source_table + assert lineage_source.metadata.get_by_name.call_count == 1 - def test_process_table_lineage_with_column_lineage(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {("cat.schema.source", "cat.schema.target"): [("col_a", "col_x")]} + def test_caches_misses(self, lineage_source): + lineage_source.metadata.get_by_name.return_value = None - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[ - Column( - name=ColumnName(root="col_x"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target.col_x"), - ) - ], - ) + assert lineage_source._resolve_table("cat.schema.missing") is None + assert lineage_source._resolve_table("cat.schema.missing") is None + assert lineage_source.metadata.get_by_name.call_count == 1 - source_table = Table( - id=uuid4(), - name=EntityName(root="source"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.source.col_a"), - ) - ], - ) + def test_malformed_name_not_resolved(self, lineage_source): + assert lineage_source._resolve_table("not_a_fqn") is None + lineage_source.metadata.get_by_name.assert_not_called() - lineage_source.metadata.get_by_name.return_value = source_table - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) +class TestColumnPairs: + def test_parses_json_string(self, lineage_source): + raw = '[{"u":"col_a","d":"col_x"},{"u":"col_b","d":"col_y"}]' - assert len(results) == 1 - lineage_details = results[0].right.edge.lineageDetails - assert lineage_details is not None - assert len(lineage_details.columnsLineage) == 1 - assert lineage_details.columnsLineage[0].fromColumns[0].root == "local_unitycatalog.cat.schema.source.col_a" - assert lineage_details.columnsLineage[0].toColumn.root == "local_unitycatalog.cat.schema.target.col_x" + assert lineage_source._parse_column_pairs(raw) == [("col_a", "col_x"), ("col_b", "col_y")] - def test_process_table_lineage_skips_malformed_names(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"malformed_name"}} - lineage_source.column_lineage_map = {} + def test_parses_already_parsed_list(self, lineage_source): + raw = [{"u": "col_a", "d": "col_x"}] - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], - ) + assert lineage_source._parse_column_pairs(raw) == [("col_a", "col_x")] - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) + def test_handles_none_and_malformed(self, lineage_source): + assert lineage_source._parse_column_pairs(None) == [] + assert lineage_source._parse_column_pairs("not json") == [] + assert lineage_source._parse_column_pairs('[{"u":"only_source"}]') == [] - assert len(results) == 0 - def test_process_table_lineage_skips_missing_entity(self, lineage_source): - lineage_source.table_lineage_map = {"cat.schema.target": {"cat.schema.source"}} - lineage_source.column_lineage_map = {} +class TestBuildTableEdge: + def test_builds_edge_without_columns(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] - target_table = Table( - id=uuid4(), - name=EntityName(root="target"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.target"), - columns=[], + row = LineageRow("cat.schema.source", "cat.schema.target", None) + edge = lineage_source._build_table_edge(row) + + assert isinstance(edge, AddLineageRequest) + assert edge.edge.fromEntity.id == source_table.id + assert edge.edge.toEntity.id == target_table.id + + def test_builds_edge_with_column_lineage(self, lineage_source): + source_table = _make_table( + "source", + "local_unitycatalog.cat.schema.source", + [_make_column("col_a", "local_unitycatalog.cat.schema.source.col_a")], ) + target_table = _make_table( + "target", + "local_unitycatalog.cat.schema.target", + [_make_column("col_x", "local_unitycatalog.cat.schema.target.col_x")], + ) + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] - lineage_source.metadata.get_by_name.return_value = None + row = LineageRow("cat.schema.source", "cat.schema.target", '[{"u":"col_a","d":"col_x"}]') + edge = lineage_source._build_table_edge(row) - results = list(lineage_source._process_table_lineage(target_table, "cat.schema.target")) + column_lineage = edge.edge.lineageDetails.columnsLineage + assert len(column_lineage) == 1 + assert column_lineage[0].fromColumns[0].root == "local_unitycatalog.cat.schema.source.col_a" + assert column_lineage[0].toColumn.root == "local_unitycatalog.cat.schema.target.col_x" - assert len(results) == 0 + def test_returns_none_when_target_unresolved(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + lineage_source.metadata.get_by_name.side_effect = [source_table, None] + row = LineageRow("cat.schema.source", "cat.schema.target", None) -class TestColumnLineageDetails: - def test_self_loop_prevention(self, lineage_source): - lineage_source.column_lineage_map = {("cat.schema.src", "cat.schema.tgt"): [("col_a", "col_a")]} + assert lineage_source._build_table_edge(row) is None - table = Table( - id=uuid4(), - name=EntityName(root="tgt"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt.col_a"), - ) - ], + def test_self_loop_column_dropped(self, lineage_source): + source_table = _make_table( + "tbl", + "local_unitycatalog.cat.schema.tbl", + [_make_column("col_a", "local_unitycatalog.cat.schema.tbl.col_a")], ) + same_table = source_table + lineage_source.metadata.get_by_name.side_effect = [source_table, same_table] - same_table_as_source = Table( - id=uuid4(), - name=EntityName(root="src"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src"), - columns=[ - Column( - name=ColumnName(root="col_a"), - dataType=DataType.STRING, - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src.col_a"), - ) - ], - ) + row = LineageRow("cat.schema.tbl", "cat.schema.tbl", '[{"u":"col_a","d":"col_a"}]') + edge = lineage_source._build_table_edge(row) - result = lineage_source._get_column_lineage_details( - same_table_as_source, table, "cat.schema.src", "cat.schema.tgt" - ) + assert edge.edge.lineageDetails.columnsLineage is None - assert result is not None - assert len(result.columnsLineage) == 1 - def test_no_column_lineage_returns_none(self, lineage_source): - lineage_source.column_lineage_map = {} +class TestYieldTableLineage: + def test_emits_resolved_edges(self, lineage_source): + lineage_source.source_config.queryLogDuration = 1 + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table, None, None] - table = Table( - id=uuid4(), - name=EntityName(root="tgt"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.tgt"), - columns=[], - ) - from_table = Table( - id=uuid4(), - name=EntityName(root="src"), - fullyQualifiedName=FullyQualifiedEntityName(root="local_unitycatalog.cat.schema.src"), - columns=[], - ) + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) - result = lineage_source._get_column_lineage_details(from_table, table, "cat.schema.src", "cat.schema.tgt") + assert len(results) == 1 + assert isinstance(results[0].right, AddLineageRequest) - assert result is None + def test_dedupes_edges_across_windows(self, lineage_source): + source_table = _make_table("source", "local_unitycatalog.cat.schema.source") + target_table = _make_table("target", "local_unitycatalog.cat.schema.target") + lineage_source.metadata.get_by_name.side_effect = [source_table, target_table] + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s1", "e1"), ("s2", "e2")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) -class TestExternalLocationLineage: - def test_cache_external_locations(self, lineage_source): - ExternalRow = namedtuple( - "ExternalRow", - ["table_catalog", "table_schema", "table_name", "storage_path"], - ) - mock_rows = [ - ExternalRow("cat", "schema", "ext_table1", "s3://bucket/path1"), - ExternalRow("cat", "schema", "ext_table2", "s3://bucket/path2/"), - ] + assert len(results) == 1 + assert isinstance(results[0].right, AddLineageRequest) + assert lineage_source.metadata.get_by_name.call_count == 2 - mock_conn = MagicMock() - mock_conn.execute.return_value = mock_rows - lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) - lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + def test_skips_unresolved_edges(self, lineage_source): + lineage_source.metadata.get_by_name.return_value = None + + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) - lineage_source._cache_external_locations() + assert results == [] - assert len(lineage_source.external_location_map) == 2 - assert lineage_source.external_location_map["cat.schema.ext_table1"] == "s3://bucket/path1" - assert lineage_source.external_location_map["cat.schema.ext_table2"] == "s3://bucket/path2/" + def test_row_failure_yields_left(self, lineage_source): + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with ( + patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]), + patch.object(lineage_source, "_build_table_edge", side_effect=Exception("boom")), + ): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) + + assert len(results) == 1 + assert results[0].left is not None + assert results[0].right is None + assert "boom" in results[0].left.error - def test_cache_external_locations_handles_failure(self, lineage_source): + def test_window_failure_yields_left(self, lineage_source): mock_conn = MagicMock() - mock_conn.execute.side_effect = Exception("Access denied") + mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - lineage_source._cache_external_locations() + with patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]): + results = list(lineage_source._yield_table_lineage()) - assert len(lineage_source.external_location_map) == 0 + assert len(results) == 1 + assert results[0].right is None + assert "Access denied" in results[0].left.error + + def test_filtered_rows_skipped_without_resolution(self, lineage_source): + rows = [LineageRow("cat.schema.source", "cat.schema.target", None)] + with ( + patch.object(lineage_source, "_iter_date_windows", return_value=[("s", "e")]), + patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_table", + return_value=True, + ), + ): + _mock_query_rows(lineage_source, rows) + results = list(lineage_source._yield_table_lineage()) + + assert results == [] + lineage_source.metadata.get_by_name.assert_not_called() + + def test_query_scoped_to_window(self, lineage_source): + with patch.object(lineage_source, "_iter_date_windows", return_value=[("2026-01-01", "2026-01-02")]): + mock_conn = _mock_query_rows(lineage_source, []) + list(lineage_source._yield_table_lineage()) + + executed = str(mock_conn.execution_options.return_value.execute.call_args.args[0]) + assert "2026-01-01" in executed + assert "2026-01-02" in executed + assert "split_part" not in executed - def test_process_external_location_lineage_from_cache(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], +class TestExternalLocationLineage: + def test_yields_container_lineage(self, lineage_source): + ExternalRow = namedtuple( + "ExternalRow", + ["table_catalog", "table_schema", "table_name", "storage_path"], ) - + rows = [ExternalRow("cat", "schema", "ext_table", "s3://bucket/path/")] + table_entity = _make_table("ext_table", "local_unitycatalog.cat.schema.ext_table") container_entity = Container( id=uuid4(), name=EntityName(root="test_container"), service=EntityReference(id=uuid4(), type="storageService"), ) + lineage_source.metadata.get_by_name.return_value = table_entity lineage_source.metadata.es_search_container_by_path.return_value = [container_entity] + _mock_query_rows(lineage_source, rows) - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + results = list(lineage_source._yield_external_lineage()) assert len(results) == 1 - assert isinstance(results[0], Either) - assert isinstance(results[0].right, AddLineageRequest) assert results[0].right.edge.fromEntity.id == container_entity.id assert results[0].right.edge.fromEntity.type == "container" assert results[0].right.edge.toEntity.id == table_entity.id - assert results[0].right.edge.toEntity.type == "table" - lineage_source.metadata.es_search_container_by_path.assert_called_once_with( full_path="s3://bucket/path", fields="dataModel" ) - def test_process_external_location_strips_trailing_slash(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://test-bucket/data/"} - - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], + def test_skips_when_table_not_in_om(self, lineage_source): + ExternalRow = namedtuple( + "ExternalRow", + ["table_catalog", "table_schema", "table_name", "storage_path"], ) + rows = [ExternalRow("cat", "schema", "ext_table", "s3://bucket/path")] + lineage_source.metadata.get_by_name.return_value = None + _mock_query_rows(lineage_source, rows) - container_entity = Container( - id=uuid4(), - name=EntityName(root="test_container"), - service=EntityReference(id=uuid4(), type="storageService"), - ) + results = list(lineage_source._yield_external_lineage()) - lineage_source.metadata.es_search_container_by_path.return_value = [container_entity] + assert results == [] + lineage_source.metadata.es_search_container_by_path.assert_not_called() - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + def test_no_storage_path_yields_nothing(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") - assert len(results) == 1 - lineage_source.metadata.es_search_container_by_path.assert_called_once_with( - full_path="s3://test-bucket/data", fields="dataModel" - ) + results = list(lineage_source._process_external_location_lineage(table_entity, None)) - def test_process_external_location_no_cache_entry(self, lineage_source): - lineage_source.external_location_map = {} + assert results == [] - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) + def test_no_container_found(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") + lineage_source.metadata.es_search_container_by_path.return_value = [] - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + results = list(lineage_source._process_external_location_lineage(table_entity, "s3://bucket/path")) - assert len(results) == 0 + assert results == [] - def test_process_external_location_no_container_found(self, lineage_source): - lineage_source.external_location_map = {"cat.schema.test_table": "s3://bucket/path"} + def test_failure_yields_left(self, lineage_source): + table_entity = _make_table("test_table", "service.db.schema.test_table") + lineage_source.metadata.es_search_container_by_path.side_effect = Exception("Search unavailable") - table_entity = Table( - id=uuid4(), - name=EntityName(root="test_table"), - fullyQualifiedName=FullyQualifiedEntityName(root="service.db.schema.test_table"), - columns=[], - ) + results = list(lineage_source._process_external_location_lineage(table_entity, "s3://bucket/path")) - lineage_source.metadata.es_search_container_by_path.return_value = [] + assert len(results) == 1 + assert results[0].left is not None + assert "Search unavailable" in results[0].left.error + + def test_external_scan_failure_yields_left(self, lineage_source): + mock_conn = MagicMock() + mock_conn.execution_options.return_value.execute.side_effect = Exception("Access denied") + lineage_source.engine.connect.return_value.__enter__ = Mock(return_value=mock_conn) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) - results = list(lineage_source._process_external_location_lineage(table_entity, "cat.schema.test_table")) + results = list(lineage_source._yield_external_lineage()) - assert len(results) == 0 + assert len(results) == 1 + assert results[0].right is None + assert "Access denied" in results[0].left.error + + +class TestIsFilteredTable: + def test_filters_by_database(self, lineage_source): + with patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_database", + return_value=True, + ): + assert lineage_source._is_filtered_table("system.schema.tbl") is True + + def test_not_filtered_when_all_patterns_pass(self, lineage_source): + assert lineage_source._is_filtered_table("cat.schema.tbl") is False + + def test_malformed_name_not_filtered(self, lineage_source): + assert lineage_source._is_filtered_table("not_a_fqn") is False + + def test_normalizes_case_before_filtering(self, lineage_source): + seen = [] + with patch( + "metadata.ingestion.source.database.unitycatalog.lineage.filter_by_schema", + side_effect=lambda _pattern, name: seen.append(name) or False, + ): + lineage_source._is_filtered_table("CAT.MySchema.MyTable") + + assert seen == ["myschema"] + + +class TestIter: + def test_chains_table_then_external_lineage(self, lineage_source): + with ( + patch.object(lineage_source, "_yield_table_lineage", return_value=["t1", "t2"]) as mock_table, + patch.object(lineage_source, "_yield_external_lineage", return_value=["e1"]) as mock_external, + ): + results = list(lineage_source._iter()) + + mock_table.assert_called_once_with() + mock_external.assert_called_once_with() + assert results == ["t1", "t2", "e1"] + + def test_does_not_enumerate_catalogs(self, lineage_source): + with ( + patch.object(lineage_source, "_yield_table_lineage", return_value=[]), + patch.object(lineage_source, "_yield_external_lineage", return_value=[]), + ): + list(lineage_source._iter()) + + lineage_source.metadata.list_all_entities.assert_not_called() class TestContainerColumnLineage: diff --git a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py index f2b8905b9925..defcdb53a98e 100644 --- a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py +++ b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py @@ -366,6 +366,25 @@ def test_mark_tables_as_deleted_incremental_uses_explicit_list(self): _, kwargs = delete_mock.call_args assert kwargs["entity_names"] == ["svc.cat.schema1.dropped"] + def test_mark_tables_as_deleted_drains_global_list(self): + source = self._make_source() + source.incremental.enabled = True + source.source_config.markDeletedTables = True + source.context.get.return_value = SimpleNamespace(database="cat") + deleted_tables = ["svc.cat.schema1.dropped", "svc.cat.schema2.dropped"] + source.context.get_global.return_value = SimpleNamespace(deleted_tables=deleted_tables) + + with patch( + f"{UC_METADATA_MODULE}.delete_entity_by_name", + return_value=iter(["deleted"]), + ) as delete_mock: + result = list(UnitycatalogSource.mark_tables_as_deleted(source)) + + assert result == ["deleted"] + _, kwargs = delete_mock.call_args + assert kwargs["entity_names"] == ["svc.cat.schema1.dropped", "svc.cat.schema2.dropped"] + assert deleted_tables == [] + def test_mark_tables_as_deleted_incremental_respects_mark_flag(self): source = self._make_source() source.incremental.enabled = True diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json index adae56758679..f30a22b34024 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json @@ -80,6 +80,13 @@ "type": "integer", "default": 120 }, + "lineageQueryChunkSize": { + "title": "Lineage Query Chunk Size", + "description": "Number of days of lineage scanned per query against the Unity Catalog system tables. The configured lineage lookback window is split into chunks of this size and streamed one chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); smaller values keep each scan smaller.", + "type": "integer", + "default": 7, + "minimum": 1 + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions" diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts index 491617db60f8..c6b61dcb28e0 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts @@ -1548,6 +1548,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts index b17143b3a417..71715b51e449 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createDatabaseService.ts @@ -1037,6 +1037,13 @@ export interface Connection { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 95464c28a38f..a8aa35725bbc 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -4773,6 +4773,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts index 7cd592cc6213..33a59959786b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts @@ -1430,6 +1430,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts index 05ec6b676fda..8b59a0679f65 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts @@ -2101,6 +2101,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts index 928550b27184..c88f451a38f7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/unityCatalogConnection.ts @@ -49,7 +49,14 @@ export interface UnityCatalogConnection { /** * Databricks compute resources URL. */ - httpPath?: string; + httpPath?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; sampleDataStorageConfig?: SampleDataStorageConfig; /** * Regex to only include/exclude schemas that matches the pattern. diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts index 56f62e3eba5d..8ab178cf5ed3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts @@ -1643,6 +1643,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts index 46166b1e6a5f..59cc541678ec 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/databaseService.ts @@ -1168,6 +1168,13 @@ export interface Connection { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index d27da5d0ac75..dec9255c5b1e 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -5303,6 +5303,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts index 05ef7fb7d753..7aa769bafac7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts @@ -1741,6 +1741,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index e9660f4cf725..4f674cc7c661 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -1732,6 +1732,13 @@ export interface ConfigObject { * Hostname of the Couchbase service. */ hostport?: string; + /** + * Number of days of lineage scanned per query against the Unity Catalog system tables. The + * configured lineage lookback window is split into chunks of this size and streamed one + * chunk at a time so each scan stays bounded. Larger values issue fewer queries (faster); + * smaller values keep each scan smaller. + */ + lineageQueryChunkSize?: number; /** * Enable dataflow for ingestion */