From 1fb5d754de69a4d1c292ebb510f42e1c3efaf051 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Fri, 22 May 2026 20:02:55 +0530 Subject: [PATCH 1/2] feat(ingestion): incremental metadata extraction for Unity Catalog Detect changed tables via information_schema.tables.last_altered and deleted tables via system.access.audit deleteTable events, mirroring the Snowflake/BigQuery incremental pattern. Degrades to no-op delete detection when the audit schema is unavailable. Tables present in both the changed and deleted sets (dropped and recreated within the window) are kept rather than deleted. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../incremental_table_processor.py | 116 +++++++ .../source/database/unitycatalog/metadata.py | 164 ++++++++-- .../source/database/unitycatalog/queries.py | 22 ++ .../database/test_unitycatalog_incremental.py | 302 ++++++++++++++++++ 4 files changed, 571 insertions(+), 33 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py create mode 100644 ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py new file mode 100644 index 000000000000..251e5735469f --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py @@ -0,0 +1,116 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Incremental Processor for Unity Catalog. + +Detects, for a single catalog and since a given watermark: + - changed tables, via `information_schema.tables.last_altered` + - deleted tables, via the `system.access.audit` `deleteTable` events + +Both queries degrade gracefully: a failure on either leaves that map empty and +logs a warning so the rest of the ingestion can proceed (e.g. when the +`system.access` schema is not enabled). +""" + +import traceback +from typing import Any, Callable, Dict, Optional, Set, Tuple # noqa: UP035 + +from sqlalchemy.engine import Connection +from sqlalchemy.sql import text + +from metadata.ingestion.source.database.unitycatalog.queries import ( + UNITY_CATALOG_GET_CHANGED_TABLES, + UNITY_CATALOG_GET_DELETED_TABLES, +) +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +SchemaToTables = Dict[str, Set[str]] # noqa: UP006 + + +class UnityCatalogIncrementalTableProcessor: + """Prepares the data needed for Unity Catalog incremental metadata extraction.""" + + def __init__(self, connection: Connection): + self.connection = connection + self._changed_map: SchemaToTables = {} + self._deleted_map: SchemaToTables = {} + + @classmethod + def create(cls, connection: Connection) -> "UnityCatalogIncrementalTableProcessor": + return cls(connection) + + def set_table_map(self, catalog: str, start_timestamp: int) -> None: + """Populate the changed and deleted table maps for a single catalog.""" + self._changed_map = self._run( + UNITY_CATALOG_GET_CHANGED_TABLES, + catalog, + start_timestamp, + row_to_schema_table=lambda row: (row[0], row[1]), + context="changed tables (information_schema)", + ) + self._deleted_map = self._run( + UNITY_CATALOG_GET_DELETED_TABLES, + catalog, + start_timestamp, + row_to_schema_table=lambda row: self._split_full_name(row[0]), + context="deleted tables (system.access.audit)", + ) + + def _run( + self, + query: str, + catalog: str, + start_timestamp: int, + row_to_schema_table: Callable[[Any], Optional[Tuple[str, str]]], # noqa: UP006, UP045 + context: str, + ) -> SchemaToTables: + """Execute a query and bucket its rows into a {schema: {table, ...}} map. + + On any failure (e.g. the system schema is not enabled), warn and return + an empty map so the ingestion can continue without delete/change data. + """ + table_map: SchemaToTables = {} + try: + rows = self.connection.execute(text(query.format(catalog=catalog, start_timestamp=start_timestamp))) + for row in rows or []: + schema_table = row_to_schema_table(row) + if schema_table: + schema, table = schema_table + table_map.setdefault(schema, set()).add(table) + except Exception as exc: + logger.warning( + "Could not query %s for catalog [%s]; incremental detection for this source will be skipped: %s", + context, + catalog, + exc, + ) + logger.debug(traceback.format_exc()) + return table_map + + @staticmethod + def _split_full_name(full_name: Optional[str]) -> Optional[Tuple[str, str]]: # noqa: UP045, UP006 + """Split a `catalog.schema.table` name into its (schema, table) parts.""" + result = None + if full_name and full_name.count(".") == 2: + _, schema, table = full_name.split(".") + result = (schema, table) + return result + + def get_changed(self, schema_name: str) -> Set[str]: # noqa: UP006 + """Return the names of tables changed since the watermark for a schema.""" + return self._changed_map.get(schema_name, set()) + + def get_deleted(self, schema_name: str) -> Set[str]: # noqa: UP006 + """Return the names of tables deleted since the watermark for a schema.""" + return self._deleted_map.get(schema_name, set()) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index b50aa43dad74..ef955d464bb0 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -56,6 +56,7 @@ Markdown, ) from metadata.generated.schema.type.entityReferenceList import EntityReferenceList +from metadata.ingestion.api.delete import delete_entity_by_name from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification @@ -65,6 +66,9 @@ from metadata.ingestion.source.database.external_table_lineage_mixin import ( ExternalTableLineageMixin, ) +from metadata.ingestion.source.database.incremental_metadata_extraction import ( + IncrementalConfig, +) from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient @@ -72,6 +76,9 @@ get_connection, get_sqlalchemy_connection, ) +from metadata.ingestion.source.database.unitycatalog.incremental_table_processor import ( + UnityCatalogIncrementalTableProcessor, +) from metadata.ingestion.source.database.unitycatalog.models import ( ColumnJson, ElementType, @@ -108,7 +115,12 @@ class UnitycatalogSource(ExternalTableLineageMixin, DatabaseServiceSource, Multi """ @retry_with_docker_host() - def __init__(self, config: WorkflowSource, metadata: OpenMetadata): + def __init__( + self, + config: WorkflowSource, + metadata: OpenMetadata, + incremental_configuration: IncrementalConfig, + ): super().__init__() self.config = config self.source_config: DatabaseServiceMetadataPipeline = self.config.sourceConfig.config @@ -124,6 +136,16 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self._catalog_cache: dict[str, Any] = {} self._schema_cache: dict[str, Any] = {} self._owner_cache: dict[str, Optional[EntityReferenceList]] = {} # noqa: UP045 + + self.incremental = incremental_configuration + self.incremental_table_processor: UnityCatalogIncrementalTableProcessor | None = None + self.context.get_global().deleted_tables = [] + if self.incremental.enabled: + logger.info( + "Starting Incremental Metadata Extraction.\n\t Considering Table changes from %s", + self.incremental.start_datetime_utc, + ) + self.test_connection() self._sql_connection_map = {} @@ -156,7 +178,8 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str connection: UnityCatalogConnection = config.serviceConnection.root.config if not isinstance(connection, UnityCatalogConnection): raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}") - return cls(config, metadata) + incremental_config = IncrementalConfig.create(config.sourceConfig.config.incremental, pipeline_name, metadata) + return cls(config, metadata, incremental_config) def get_database_names(self) -> Iterable[str]: """ @@ -178,6 +201,7 @@ def get_database_names(self) -> Iterable[str]: except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Failed to fetch configured catalog [{configured_catalog}]: {exc}") + self._set_incremental_table_processor(configured_catalog) yield configured_catalog else: for catalog_name in self.get_database_names_raw(): @@ -198,6 +222,7 @@ def get_database_names(self) -> Iterable[str]: "Database (Catalog ID) Filtered Out", ) continue + self._set_incremental_table_processor(catalog_name) yield catalog_name except Exception as exc: self.status.failed( @@ -208,6 +233,14 @@ def get_database_names(self) -> Iterable[str]: ) ) + def _set_incremental_table_processor(self, catalog: str) -> None: + """Prepare the changed/deleted table maps for incremental extraction of a catalog.""" + if self.incremental.enabled: + self.incremental_table_processor = UnityCatalogIncrementalTableProcessor.create(self.sql_connection) + self.incremental_table_processor.set_table_map( + catalog=catalog, start_timestamp=self.incremental.start_timestamp + ) + def yield_database(self, database_name: str) -> Iterable[Either[CreateDatabaseRequest]]: """ From topology. @@ -289,51 +322,95 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]: # noqa: UP006 Fetches them up using the context information and the inspector set when preparing the db. + In incremental mode, only the tables changed since the watermark are + fetched and processed; in full mode every table is listed. + :return: tables or views, depending on config """ schema_name = self.context.get().database_schema catalog_name = self.context.get().database - for table in self.client.tables.list( - catalog_name=catalog_name, - schema_name=schema_name, - ): - try: - table_name = table.name - table_fqn = fqn.build( - self.metadata, + if self.incremental.enabled and self.incremental_table_processor: + yield from self._get_incremental_tables(catalog_name, schema_name) + else: + for table in self.client.tables.list( + catalog_name=catalog_name, + schema_name=schema_name, + ): + yield from self._process_table(table, catalog_name, schema_name) + + def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]: # noqa: UP006 + """Record deleted tables and yield only the tables changed since the watermark.""" + processor = self.incremental_table_processor + if processor is None: + return + changed = processor.get_changed(schema_name) + # A name in both sets was dropped and recreated within the window. + # information_schema only lists existing tables, so a changed table + # exists now and must not be marked deleted. + for table_name in processor.get_deleted(schema_name) - changed: + self.context.get_global().deleted_tables.append( + fqn.build( + metadata=self.metadata, entity_type=Table, service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=self.context.get().database_schema, + database_name=catalog_name, + schema_name=schema_name, table_name=table_name, ) - if filter_by_table( - self.config.sourceConfig.config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue] - (table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name), # pyright: ignore[reportAttributeAccessIssue] - ): - self.status.filter( - table_fqn, - "Table Filtered Out", - ) - continue - table_type: TableType = TableType.Regular - if table.table_type: - if table.table_type.value.lower() == TableType.View.value.lower(): - table_type: TableType = TableType.View - if table.table_type.value.lower() == "materialized_view": - table_type: TableType = TableType.MaterializedView - elif table.table_type.value.lower() == TableType.External.value.lower(): - table_type: TableType = TableType.External - self.context.get().table_data = table - yield table_name, table_type + ) + for table_name in changed: + try: + table = self.client.tables.get(f"{catalog_name}.{schema_name}.{table_name}") except Exception as exc: self.status.failed( StackTraceError( - name=table.name, - error=f"Unexpected exception to get table [{table.name}]: {exc}", + name=table_name, + error=f"Unexpected exception to get changed table [{table_name}]: {exc}", stackTrace=traceback.format_exc(), ) ) + continue + yield from self._process_table(table, catalog_name, schema_name) + + def _process_table(self, table: Any, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]: # noqa: UP006 + """Apply filtering and table-type detection, then yield the table to the topology.""" + try: + table_name = table.name + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.context.get().database_service, + database_name=catalog_name, + schema_name=schema_name, + table_name=table_name, + ) + if filter_by_table( + self.config.sourceConfig.config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue] + (table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name), # pyright: ignore[reportAttributeAccessIssue] + ): + self.status.filter( + table_fqn, + "Table Filtered Out", + ) + return + table_type: TableType = TableType.Regular + if table.table_type: + if table.table_type.value.lower() == TableType.View.value.lower(): + table_type: TableType = TableType.View + if table.table_type.value.lower() == "materialized_view": + table_type: TableType = TableType.MaterializedView + elif table.table_type.value.lower() == TableType.External.value.lower(): + table_type: TableType = TableType.External + self.context.get().table_data = table + yield table_name, table_type + except Exception as exc: + self.status.failed( + StackTraceError( + name=table.name, + error=f"Unexpected exception to get table [{table.name}]: {exc}", + stackTrace=traceback.format_exc(), + ) + ) def get_schema_definition(self, table_name: str, table_type: TableType, table: Any) -> Optional[str]: # noqa: UP045 """ @@ -504,6 +581,27 @@ def update_table_constraints(self, table_constraints, foreign_columns, columns) def prepare(self): """Nothing to prepare""" + def mark_tables_as_deleted(self): + """ + Mark tables as deleted. + + In incremental mode only the tables detected as deleted (from the audit + log) are marked; in full mode the standard stale-entity scan is used. + """ + if self.incremental.enabled: + if not self.context.get().__dict__.get("database"): + raise ValueError("No Database found in the context. We cannot run the table deletion.") + if self.source_config.markDeletedTables: + logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]") + yield from delete_entity_by_name( + self.metadata, + entity_type=Table, + entity_names=self.context.get_global().deleted_tables, + mark_deleted_entity=self.source_config.markDeletedTables, + ) + else: + yield from super().mark_tables_as_deleted() + def add_complex_datatype_descriptions(self, column: Column, column_json: ColumnJson): """ Method to add descriptions to complex datatypes diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index 329c59a7d9bb..46b52f94a583 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -103,6 +103,28 @@ """ ) +UNITY_CATALOG_GET_CHANGED_TABLES = textwrap.dedent( + """ + SELECT + table_schema, + table_name + FROM `{catalog}`.information_schema.tables + WHERE last_altered >= timestamp_millis({start_timestamp}) + """ +) + +UNITY_CATALOG_GET_DELETED_TABLES = textwrap.dedent( + """ + SELECT DISTINCT request_params.full_name_arg AS table_full_name + FROM system.access.audit + WHERE service_name = 'unityCatalog' + AND action_name = 'deleteTable' + AND event_date >= date(timestamp_millis({start_timestamp})) + AND event_time >= timestamp_millis({start_timestamp}) + AND request_params.full_name_arg LIKE '{catalog}.%%' + """ +) + UNITY_CATALOG_TEST_TABLE_LINEAGE = textwrap.dedent( """ SELECT COUNT(*) as count diff --git a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py new file mode 100644 index 000000000000..0f537e9c4fb8 --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py @@ -0,0 +1,302 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for Unity Catalog incremental metadata extraction. +""" + +from types import SimpleNamespace +from unittest.mock import Mock, patch + +from metadata.generated.schema.entity.data.table import TableType +from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( + UnityCatalogConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.incremental_metadata_extraction import ( + IncrementalConfig, +) +from metadata.ingestion.source.database.unitycatalog.incremental_table_processor import ( + UnityCatalogIncrementalTableProcessor, +) +from metadata.ingestion.source.database.unitycatalog.metadata import UnitycatalogSource + +UC_METADATA_MODULE = "metadata.ingestion.source.database.unitycatalog.metadata" + +CHANGED_ROWS = [ + ("schema1", "tbl_a"), + ("schema1", "tbl_b"), + ("schema2", "tbl_c"), +] +DELETED_ROWS = [ + ("cat.schema1.dropped",), + ("cat.schema2.gone",), +] + + +class TestUnityCatalogIncrementalTableProcessor: + """Tests for the changed/deleted table detection processor.""" + + def test_set_table_map_buckets_changed_and_deleted(self): + connection = Mock() + connection.execute.side_effect = [CHANGED_ROWS, DELETED_ROWS] + + processor = UnityCatalogIncrementalTableProcessor.create(connection) + processor.set_table_map(catalog="cat", start_timestamp=1000) + + assert processor.get_changed("schema1") == {"tbl_a", "tbl_b"} + assert processor.get_changed("schema2") == {"tbl_c"} + assert processor.get_changed("unknown") == set() + assert processor.get_deleted("schema1") == {"dropped"} + assert processor.get_deleted("schema2") == {"gone"} + assert processor.get_deleted("unknown") == set() + + def test_set_table_map_degrades_when_both_queries_fail(self): + connection = Mock() + connection.execute.side_effect = Exception("system schema not enabled") + + processor = UnityCatalogIncrementalTableProcessor.create(connection) + processor.set_table_map(catalog="cat", start_timestamp=1000) + + assert processor.get_changed("schema1") == set() + assert processor.get_deleted("schema1") == set() + + def test_set_table_map_degrades_only_for_failing_query(self): + connection = Mock() + connection.execute.side_effect = [CHANGED_ROWS, Exception("no audit access")] + + processor = UnityCatalogIncrementalTableProcessor.create(connection) + processor.set_table_map(catalog="cat", start_timestamp=1000) + + assert processor.get_changed("schema1") == {"tbl_a", "tbl_b"} + assert processor.get_deleted("schema1") == set() + + def test_split_full_name(self): + assert UnityCatalogIncrementalTableProcessor._split_full_name("cat.sch.tbl") == ( + "sch", + "tbl", + ) + assert UnityCatalogIncrementalTableProcessor._split_full_name("only.two") is None + assert UnityCatalogIncrementalTableProcessor._split_full_name("a.b.c.d") is None + assert UnityCatalogIncrementalTableProcessor._split_full_name(None) is None + assert UnityCatalogIncrementalTableProcessor._split_full_name("") is None + + +class TestUnityCatalogIncrementalSource: + """Tests for the incremental hooks on the Unity Catalog source.""" + + def _make_source(self): + """A plain Mock used as `self`. + + We cannot use Mock(spec=UnitycatalogSource) here because the methods + under test read instance attributes (config, context, client, status, + incremental, ...) that are assigned in __init__ and are therefore not + part of the class spec. + """ + return Mock() + + def test_process_table_yields_regular_table(self): + source = self._make_source() + source.config.sourceConfig.config.useFqnForFiltering = False + table = SimpleNamespace(name="tbl_a", table_type=None) + + with ( + patch(f"{UC_METADATA_MODULE}.fqn") as fqn_mock, + patch(f"{UC_METADATA_MODULE}.filter_by_table", return_value=False), + ): + fqn_mock.build.return_value = "svc.cat.schema1.tbl_a" + result = list(UnitycatalogSource._process_table(source, table, "cat", "schema1")) + + assert result == [("tbl_a", TableType.Regular)] + assert source.context.get().table_data is table + + def test_process_table_detects_view(self): + source = self._make_source() + source.config.sourceConfig.config.useFqnForFiltering = False + table = SimpleNamespace(name="v_a", table_type=SimpleNamespace(value="VIEW")) + + with ( + patch(f"{UC_METADATA_MODULE}.fqn") as fqn_mock, + patch(f"{UC_METADATA_MODULE}.filter_by_table", return_value=False), + ): + fqn_mock.build.return_value = "svc.cat.schema1.v_a" + result = list(UnitycatalogSource._process_table(source, table, "cat", "schema1")) + + assert result == [("v_a", TableType.View)] + + def test_process_table_skips_filtered_table(self): + source = self._make_source() + source.config.sourceConfig.config.useFqnForFiltering = False + table = SimpleNamespace(name="tbl_a", table_type=None) + + with ( + patch(f"{UC_METADATA_MODULE}.fqn") as fqn_mock, + patch(f"{UC_METADATA_MODULE}.filter_by_table", return_value=True), + ): + fqn_mock.build.return_value = "svc.cat.schema1.tbl_a" + result = list(UnitycatalogSource._process_table(source, table, "cat", "schema1")) + + assert result == [] + source.status.filter.assert_called_once() + + def test_get_incremental_tables_records_deletes_and_yields_changes(self): + source = self._make_source() + processor = Mock() + processor.get_deleted.return_value = {"dropped"} + processor.get_changed.return_value = {"chg"} + source.incremental_table_processor = processor + source.context.get_global.return_value = SimpleNamespace(deleted_tables=[]) + changed_table = SimpleNamespace(name="chg", table_type=None) + source.client.tables.get.return_value = changed_table + source._process_table.return_value = iter([("chg", TableType.Regular)]) + + with patch(f"{UC_METADATA_MODULE}.fqn") as fqn_mock: + fqn_mock.build.return_value = "svc.cat.schema1.dropped" + result = list(UnitycatalogSource._get_incremental_tables(source, "cat", "schema1")) + + assert result == [("chg", TableType.Regular)] + assert source.context.get_global().deleted_tables == ["svc.cat.schema1.dropped"] + source.client.tables.get.assert_called_once_with("cat.schema1.chg") + source._process_table.assert_called_once_with(changed_table, "cat", "schema1") + + def test_get_incremental_tables_excludes_recreated_from_deletes(self): + """A table dropped AND recreated within the window exists now (it shows up + in information_schema), so it must be processed, not marked deleted.""" + source = self._make_source() + processor = Mock() + processor.get_deleted.return_value = {"keep_deleted", "recreated"} + processor.get_changed.return_value = {"recreated", "new_change"} + source.incremental_table_processor = processor + source.context.get_global.return_value = SimpleNamespace(deleted_tables=[]) + source.client.tables.get.return_value = SimpleNamespace(name="x", table_type=None) + source._process_table.return_value = iter([]) + + with patch(f"{UC_METADATA_MODULE}.fqn") as fqn_mock: + fqn_mock.build.side_effect = lambda **kw: kw["table_name"] + list(UnitycatalogSource._get_incremental_tables(source, "cat", "schema1")) + + assert source.context.get_global().deleted_tables == ["keep_deleted"] + fetched = {c.args[0] for c in source.client.tables.get.call_args_list} + assert fetched == {"cat.schema1.recreated", "cat.schema1.new_change"} + + def test_get_incremental_tables_handles_get_failure(self): + source = self._make_source() + processor = Mock() + processor.get_deleted.return_value = set() + processor.get_changed.return_value = {"chg"} + source.incremental_table_processor = processor + source.context.get_global.return_value = SimpleNamespace(deleted_tables=[]) + source.client.tables.get.side_effect = Exception("boom") + + result = list(UnitycatalogSource._get_incremental_tables(source, "cat", "schema1")) + + assert result == [] + source.status.failed.assert_called_once() + + def test_get_incremental_tables_noop_without_processor(self): + source = self._make_source() + source.incremental_table_processor = None + + result = list(UnitycatalogSource._get_incremental_tables(source, "cat", "schema1")) + + assert result == [] + + def test_get_tables_uses_incremental_path_when_enabled(self): + source = self._make_source() + source.incremental.enabled = True + source.incremental_table_processor = Mock() + source.context.get.return_value = SimpleNamespace(database="cat", database_schema="schema1") + source._get_incremental_tables.return_value = iter([("x", TableType.Regular)]) + + result = list(UnitycatalogSource.get_tables_name_and_type(source)) + + assert result == [("x", TableType.Regular)] + source._get_incremental_tables.assert_called_once_with("cat", "schema1") + source.client.tables.list.assert_not_called() + + def test_get_tables_uses_full_path_when_disabled(self): + source = self._make_source() + source.incremental.enabled = False + source.context.get.return_value = SimpleNamespace(database="cat", database_schema="schema1") + tables = [SimpleNamespace(name="t1"), SimpleNamespace(name="t2")] + source.client.tables.list.return_value = tables + source._process_table.side_effect = lambda table, catalog, schema: iter([(table.name, TableType.Regular)]) + + result = list(UnitycatalogSource.get_tables_name_and_type(source)) + + assert result == [("t1", TableType.Regular), ("t2", TableType.Regular)] + source.client.tables.list.assert_called_once_with(catalog_name="cat", schema_name="schema1") + source._get_incremental_tables.assert_not_called() + + def test_mark_tables_as_deleted_incremental_uses_explicit_list(self): + source = self._make_source() + source.incremental.enabled = True + source.source_config.markDeletedTables = True + source.context.get.return_value = SimpleNamespace(database="cat") + source.context.get_global.return_value = SimpleNamespace(deleted_tables=["svc.cat.schema1.dropped"]) + + with patch( + f"{UC_METADATA_MODULE}.delete_entity_by_name", + return_value=iter(["deleted"]), + ) as delete_mock: + result = list(UnitycatalogSource.mark_tables_as_deleted(source)) + + assert result == ["deleted"] + _, kwargs = delete_mock.call_args + assert kwargs["entity_names"] == ["svc.cat.schema1.dropped"] + + def test_mark_tables_as_deleted_incremental_respects_mark_flag(self): + source = self._make_source() + source.incremental.enabled = True + source.source_config.markDeletedTables = False + source.context.get.return_value = SimpleNamespace(database="cat") + + with patch(f"{UC_METADATA_MODULE}.delete_entity_by_name") as delete_mock: + result = list(UnitycatalogSource.mark_tables_as_deleted(source)) + + assert result == [] + delete_mock.assert_not_called() + + def test_mark_tables_as_deleted_full_mode_delegates_to_super(self): + instance = UnitycatalogSource.__new__(UnitycatalogSource) + instance.incremental = SimpleNamespace(enabled=False) + + with patch.object( + DatabaseServiceSource, + "mark_tables_as_deleted", + return_value=iter(["base"]), + ) as base_mark: + result = list(instance.mark_tables_as_deleted()) + + assert result == ["base"] + base_mark.assert_called_once() + + def test_create_builds_and_forwards_incremental_config(self): + config = Mock() + config.serviceConnection.root.config = Mock(spec=UnityCatalogConnection) + metadata = Mock() + sentinel = object() + + with ( + patch.object(WorkflowSource, "model_validate", return_value=config), + patch.object(IncrementalConfig, "create", return_value=sentinel) as incremental_create, + patch.object(UnitycatalogSource, "__init__", return_value=None) as init_mock, + ): + UnitycatalogSource.create({"k": "v"}, metadata, pipeline_name="pipe") + + incremental_create.assert_called_once_with(config.sourceConfig.config.incremental, "pipe", metadata) + init_args = init_mock.call_args.args + assert config in init_args + assert metadata in init_args + assert sentinel in init_args From b355b82980df9178ab2ea443554c9728f91698a9 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Mon, 25 May 2026 16:04:11 +0530 Subject: [PATCH 2/2] fix(unitycatalog): address review feedback on incremental ingestion - Exact catalog match for delete detection: replace the LIKE '{catalog}.%' wildcard (underscores in catalog names act as single-char wildcards) with substring_index(full_name_arg,'.',1) = '{catalog}'. - Harden against SQL injection: validate the catalog name against an allowlist before interpolating it into the change/delete queries; skip incremental detection (warn) on an unexpected name. - Use getattr(table,"name","") in the _process_table exception handler so a malformed table cannot raise while logging. - Type-checking: align get_tables_name_and_type / _get_incremental_tables / _process_table return types to Tuple[str, TableType] (matching the base class), collapse the table_type redeclaration to a single annotation, and add targeted pyright ignores for the framework's dynamic TopologyContext attributes. - Add unit tests for the catalog allowlist, the exact-match delete query, and the safe-identifier exception path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../incremental_table_processor.py | 18 ++++++++- .../source/database/unitycatalog/metadata.py | 40 ++++++++++--------- .../source/database/unitycatalog/queries.py | 2 +- .../database/test_unitycatalog_incremental.py | 31 ++++++++++++++ 4 files changed, 70 insertions(+), 21 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py index 251e5735469f..13e7fdfe5a96 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py @@ -21,6 +21,7 @@ `system.access` schema is not enabled). """ +import re import traceback from typing import Any, Callable, Dict, Optional, Set, Tuple # noqa: UP035 @@ -37,6 +38,8 @@ SchemaToTables = Dict[str, Set[str]] # noqa: UP006 +VALID_CATALOG_NAME = re.compile(r"^[A-Za-z0-9_]+$") + class UnityCatalogIncrementalTableProcessor: """Prepares the data needed for Unity Catalog incremental metadata extraction.""" @@ -51,7 +54,20 @@ def create(cls, connection: Connection) -> "UnityCatalogIncrementalTableProcesso return cls(connection) def set_table_map(self, catalog: str, start_timestamp: int) -> None: - """Populate the changed and deleted table maps for a single catalog.""" + """Populate the changed and deleted table maps for a single catalog. + + The catalog name is interpolated into SQL, so it is validated against an + allowlist first. An unexpected name skips incremental detection (the maps + stay empty) rather than risking an injected query. + """ + self._changed_map = {} + self._deleted_map = {} + if not VALID_CATALOG_NAME.match(catalog): + logger.warning( + "Catalog name [%s] is not a simple identifier; skipping incremental change/delete detection for it.", + catalog, + ) + return self._changed_map = self._run( UNITY_CATALOG_GET_CHANGED_TABLES, catalog, diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index ef955d464bb0..5a6f0dfb02f7 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -139,7 +139,7 @@ def __init__( self.incremental = incremental_configuration self.incremental_table_processor: UnityCatalogIncrementalTableProcessor | None = None - self.context.get_global().deleted_tables = [] + self.context.get_global().deleted_tables = [] # pyright: ignore[reportAttributeAccessIssue] if self.incremental.enabled: logger.info( "Starting Incremental Metadata Extraction.\n\t Considering Table changes from %s", @@ -178,7 +178,7 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str connection: UnityCatalogConnection = config.serviceConnection.root.config if not isinstance(connection, UnityCatalogConnection): raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}") - incremental_config = IncrementalConfig.create(config.sourceConfig.config.incremental, pipeline_name, metadata) + incremental_config = IncrementalConfig.create(config.sourceConfig.config.incremental, pipeline_name, metadata) # pyright: ignore[reportArgumentType, reportAttributeAccessIssue, reportOptionalMemberAccess] return cls(config, metadata, incremental_config) def get_database_names(self) -> Iterable[str]: @@ -238,7 +238,8 @@ def _set_incremental_table_processor(self, catalog: str) -> None: if self.incremental.enabled: self.incremental_table_processor = UnityCatalogIncrementalTableProcessor.create(self.sql_connection) self.incremental_table_processor.set_table_map( - catalog=catalog, start_timestamp=self.incremental.start_timestamp + catalog=catalog, + start_timestamp=self.incremental.start_timestamp, # pyright: ignore[reportArgumentType] ) def yield_database(self, database_name: str) -> Iterable[Either[CreateDatabaseRequest]]: @@ -315,7 +316,7 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab yield Either(right=schema_request) self.register_record_schema_request(schema_request=schema_request) - def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]: # noqa: UP006 + def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 """ Handle table and views. @@ -338,7 +339,7 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]: # noqa: UP006 ): yield from self._process_table(table, catalog_name, schema_name) - def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]: # noqa: UP006 + def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 """Record deleted tables and yield only the tables changed since the watermark.""" processor = self.incremental_table_processor if processor is None: @@ -348,7 +349,7 @@ def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterab # information_schema only lists existing tables, so a changed table # exists now and must not be marked deleted. for table_name in processor.get_deleted(schema_name) - changed: - self.context.get_global().deleted_tables.append( + self.context.get_global().deleted_tables.append( # pyright: ignore[reportAttributeAccessIssue] fqn.build( metadata=self.metadata, entity_type=Table, @@ -372,42 +373,43 @@ def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterab continue yield from self._process_table(table, catalog_name, schema_name) - def _process_table(self, table: Any, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]: # noqa: UP006 + def _process_table(self, table: Any, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 """Apply filtering and table-type detection, then yield the table to the topology.""" try: table_name = table.name table_fqn = fqn.build( self.metadata, entity_type=Table, - service_name=self.context.get().database_service, + service_name=self.context.get().database_service, # pyright: ignore[reportAttributeAccessIssue] database_name=catalog_name, schema_name=schema_name, table_name=table_name, ) if filter_by_table( - self.config.sourceConfig.config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue] - (table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name), # pyright: ignore[reportAttributeAccessIssue] + self.config.sourceConfig.config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + (table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name), # pyright: ignore[reportAttributeAccessIssue, reportArgumentType, reportOptionalMemberAccess] ): self.status.filter( - table_fqn, + table_fqn, # pyright: ignore[reportArgumentType] "Table Filtered Out", ) return table_type: TableType = TableType.Regular if table.table_type: if table.table_type.value.lower() == TableType.View.value.lower(): - table_type: TableType = TableType.View + table_type = TableType.View if table.table_type.value.lower() == "materialized_view": - table_type: TableType = TableType.MaterializedView + table_type = TableType.MaterializedView elif table.table_type.value.lower() == TableType.External.value.lower(): - table_type: TableType = TableType.External - self.context.get().table_data = table + table_type = TableType.External + self.context.get().table_data = table # pyright: ignore[reportAttributeAccessIssue] yield table_name, table_type except Exception as exc: + table_identifier = getattr(table, "name", "") self.status.failed( StackTraceError( - name=table.name, - error=f"Unexpected exception to get table [{table.name}]: {exc}", + name=table_identifier, + error=f"Unexpected exception to get table [{table_identifier}]: {exc}", stackTrace=traceback.format_exc(), ) ) @@ -592,11 +594,11 @@ def mark_tables_as_deleted(self): if not self.context.get().__dict__.get("database"): raise ValueError("No Database found in the context. We cannot run the table deletion.") if self.source_config.markDeletedTables: - logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]") + logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]") # pyright: ignore[reportAttributeAccessIssue] yield from delete_entity_by_name( self.metadata, entity_type=Table, - entity_names=self.context.get_global().deleted_tables, + entity_names=self.context.get_global().deleted_tables, # pyright: ignore[reportAttributeAccessIssue] mark_deleted_entity=self.source_config.markDeletedTables, ) else: diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index 46b52f94a583..bb7d7d1aa206 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -121,7 +121,7 @@ AND action_name = 'deleteTable' AND event_date >= date(timestamp_millis({start_timestamp})) AND event_time >= timestamp_millis({start_timestamp}) - AND request_params.full_name_arg LIKE '{catalog}.%%' + AND substring_index(request_params.full_name_arg, '.', 1) = '{catalog}' """ ) diff --git a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py index 0f537e9c4fb8..b1ec6712ec58 100644 --- a/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py +++ b/ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py @@ -92,6 +92,27 @@ def test_split_full_name(self): assert UnityCatalogIncrementalTableProcessor._split_full_name(None) is None assert UnityCatalogIncrementalTableProcessor._split_full_name("") is None + def test_set_table_map_rejects_unsafe_catalog_name(self): + connection = Mock() + + processor = UnityCatalogIncrementalTableProcessor.create(connection) + processor.set_table_map(catalog="bad'; DROP TABLE x", start_timestamp=1000) + + connection.execute.assert_not_called() + assert processor.get_changed("schema1") == set() + assert processor.get_deleted("schema1") == set() + + def test_set_table_map_uses_exact_catalog_match_for_deletes(self): + connection = Mock() + connection.execute.side_effect = [CHANGED_ROWS, DELETED_ROWS] + + processor = UnityCatalogIncrementalTableProcessor.create(connection) + processor.set_table_map(catalog="cat", start_timestamp=1000) + + deleted_sql = str(connection.execute.call_args_list[1].args[0]) + assert "substring_index(request_params.full_name_arg, '.', 1) = 'cat'" in deleted_sql + assert "LIKE" not in deleted_sql + class TestUnityCatalogIncrementalSource: """Tests for the incremental hooks on the Unity Catalog source.""" @@ -150,6 +171,16 @@ def test_process_table_skips_filtered_table(self): assert result == [] source.status.filter.assert_called_once() + def test_process_table_failure_uses_safe_identifier(self): + source = self._make_source() + table = SimpleNamespace(table_type=None) + + result = list(UnitycatalogSource._process_table(source, table, "cat", "schema1")) + + assert result == [] + source.status.failed.assert_called_once() + assert source.status.failed.call_args.args[0].name == "" + def test_get_incremental_tables_records_deletes_and_yields_changes(self): source = self._make_source() processor = Mock()