Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Incremental Processor for Unity Catalog.

Detects, for a single catalog and since a given watermark:
- changed tables, via `information_schema.tables.last_altered`
- deleted tables, via the `system.access.audit` `deleteTable` events

Both queries degrade gracefully: a failure on either leaves that map empty and
logs a warning so the rest of the ingestion can proceed (e.g. when the
`system.access` schema is not enabled).
"""

import traceback
from typing import Any, Callable, Dict, Optional, Set, Tuple # noqa: UP035

from sqlalchemy.engine import Connection
from sqlalchemy.sql import text

from metadata.ingestion.source.database.unitycatalog.queries import (
UNITY_CATALOG_GET_CHANGED_TABLES,
UNITY_CATALOG_GET_DELETED_TABLES,
)
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

SchemaToTables = Dict[str, Set[str]] # noqa: UP006


class UnityCatalogIncrementalTableProcessor:
    """Prepares the data needed for Unity Catalog incremental metadata extraction.

    For a single catalog, two queries are executed on the supplied SQLAlchemy
    connection and their rows are bucketed into ``{schema: {table, ...}}`` maps:
    one for changed tables and one for deleted tables. The maps are read back
    per schema through `get_changed` and `get_deleted`.
    """

    def __init__(self, connection: "Connection"):
        self.connection = connection
        # Both maps stay empty until `set_table_map` is called.
        self._changed_map: SchemaToTables = {}
        self._deleted_map: SchemaToTables = {}

    @classmethod
    def create(cls, connection: "Connection") -> "UnityCatalogIncrementalTableProcessor":
        """Build a processor bound to `connection`."""
        return cls(connection)

    def set_table_map(self, catalog: str, start_timestamp: int) -> None:
        """Populate the changed and deleted table maps for a single catalog.

        Both maps are replaced on every call. A query that fails leaves its map
        empty (see `_run`), so one failing source does not affect the other.

        :param catalog: catalog name interpolated into both queries
        :param start_timestamp: watermark interpolated into both queries
        """
        # Changed tables: the first two columns are used as (schema, table).
        self._changed_map = self._run(
            UNITY_CATALOG_GET_CHANGED_TABLES,
            catalog,
            start_timestamp,
            row_to_schema_table=lambda row: (row[0], row[1]),
            context="changed tables (information_schema)",
        )
        # Deleted tables: the first column is split as `catalog.schema.table`.
        self._deleted_map = self._run(
            UNITY_CATALOG_GET_DELETED_TABLES,
            catalog,
            start_timestamp,
            row_to_schema_table=lambda row: self._split_full_name(row[0]),
            context="deleted tables (system.access.audit)",
        )

    @staticmethod
    def _bucket_rows(
        rows: Any,
        row_to_schema_table: Callable[[Any], Optional[Tuple[str, str]]],  # noqa: UP006, UP045
    ) -> "SchemaToTables":
        """Group rows into a {schema: {table, ...}} map.

        Rows for which `row_to_schema_table` returns a falsy value are skipped.
        Any exception raised while iterating or converting propagates to the
        caller, so a half-built map is never handed out.
        """
        table_map: SchemaToTables = {}
        for row in rows or []:
            schema_table = row_to_schema_table(row)
            if schema_table:
                schema, table = schema_table
                table_map.setdefault(schema, set()).add(table)
        return table_map

    def _run(
        self,
        query: str,
        catalog: str,
        start_timestamp: int,
        row_to_schema_table: Callable[[Any], Optional[Tuple[str, str]]],  # noqa: UP006, UP045
        context: str,
    ) -> "SchemaToTables":
        """Execute a query and bucket its rows into a {schema: {table, ...}} map.

        On any failure (e.g. the system schema is not enabled), warn and return
        an empty map so the ingestion can continue without delete/change data.
        This includes failures that happen part-way through reading the rows:
        partial results are discarded rather than returned.

        :param query: template with `{catalog}` and `{start_timestamp}` placeholders
        :param catalog: catalog name substituted into the template
        :param start_timestamp: watermark substituted into the template
        :param row_to_schema_table: maps a row to (schema, table), or a falsy value to skip it
        :param context: human-readable label of the source, used in the warning
        :return: the bucketed map, or an empty map on failure
        """
        try:
            # NOTE(review): the values are interpolated into the SQL text with
            # str.format, not bound as parameters; `catalog` is presumably a
            # trusted, configured or listed catalog name - confirm at call sites.
            statement = text(query.format(catalog=catalog, start_timestamp=start_timestamp))
            return self._bucket_rows(self.connection.execute(statement), row_to_schema_table)
        except Exception as exc:
            logger.warning(
                "Could not query %s for catalog [%s]; incremental detection for this source will be skipped: %s",
                context,
                catalog,
                exc,
            )
            logger.debug(traceback.format_exc())
            return {}

    @staticmethod
    def _split_full_name(full_name: Optional[str]) -> Optional[Tuple[str, str]]:  # noqa: UP045, UP006
        """Split a `catalog.schema.table` name into its (schema, table) parts.

        Returns None when the name is missing, does not have exactly three
        dot-separated parts, or has an empty schema or table part.
        """
        if not full_name:
            return None
        parts = full_name.split(".")
        if len(parts) != 3:
            return None
        _, schema, table = parts
        if not schema or not table:
            return None
        return schema, table

    def get_changed(self, schema_name: str) -> Set[str]:  # noqa: UP006
        """Return the names of tables changed since the watermark for a schema.

        A new set is returned so callers cannot mutate the internal map.
        """
        return set(self._changed_map.get(schema_name, ()))

    def get_deleted(self, schema_name: str) -> Set[str]:  # noqa: UP006
        """Return the names of tables deleted since the watermark for a schema.

        A new set is returned so callers cannot mutate the internal map.
        """
        return set(self._deleted_map.get(schema_name, ()))
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
Markdown,
)
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.delete import delete_entity_by_name
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
Expand All @@ -65,13 +66,19 @@
from metadata.ingestion.source.database.external_table_lineage_mixin import (
ExternalTableLineageMixin,
)
from metadata.ingestion.source.database.incremental_metadata_extraction import (
IncrementalConfig,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import (
get_connection,
get_sqlalchemy_connection,
)
from metadata.ingestion.source.database.unitycatalog.incremental_table_processor import (
UnityCatalogIncrementalTableProcessor,
)
from metadata.ingestion.source.database.unitycatalog.models import (
ColumnJson,
ElementType,
Expand Down Expand Up @@ -108,7 +115,12 @@
"""

@retry_with_docker_host()
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
def __init__(
self,
config: WorkflowSource,
metadata: OpenMetadata,
incremental_configuration: IncrementalConfig,
):
super().__init__()
self.config = config
self.source_config: DatabaseServiceMetadataPipeline = self.config.sourceConfig.config
Expand All @@ -124,6 +136,16 @@
self._catalog_cache: dict[str, Any] = {}
self._schema_cache: dict[str, Any] = {}
self._owner_cache: dict[str, Optional[EntityReferenceList]] = {} # noqa: UP045

self.incremental = incremental_configuration
self.incremental_table_processor: UnityCatalogIncrementalTableProcessor | None = None
self.context.get_global().deleted_tables = []

Check failure on line 142 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot assign to attribute "deleted_tables" for class "TopologyContext"   Attribute "deleted_tables" is unknown (reportAttributeAccessIssue)
if self.incremental.enabled:
logger.info(
"Starting Incremental Metadata Extraction.\n\t Considering Table changes from %s",
self.incremental.start_datetime_utc,
)

self.test_connection()

self._sql_connection_map = {}
Expand Down Expand Up @@ -156,7 +178,8 @@
connection: UnityCatalogConnection = config.serviceConnection.root.config
if not isinstance(connection, UnityCatalogConnection):
raise InvalidSourceException(f"Expected UnityCatalogConnection, but got {connection}")
return cls(config, metadata)
incremental_config = IncrementalConfig.create(config.sourceConfig.config.incremental, pipeline_name, metadata)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "MlModelServiceMetadataPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "PipelineServiceMetadataPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "DatabaseServiceAutoClassificationPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "DatabaseServiceProfilerPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "MessagingServiceMetadataPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "DashboardServiceMetadataPipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "DatabaseServiceQueryLineagePipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Cannot access attribute "incremental" for class "DatabaseServiceQueryUsagePipeline"   Attribute "incremental" is unknown (reportAttributeAccessIssue)

Check failure on line 181 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Argument of type "Incremental | Unknown | None" cannot be assigned to parameter "incremental" of type "bool | None" in function "create"   Type "Incremental | Unknown | None" is not assignable to type "bool | None"     Type "Incremental" is not assignable to type "bool | None"       "Incremental" is not assignable to "bool"       "Incremental" is not assignable to "None" (reportArgumentType)
return cls(config, metadata, incremental_config)

def get_database_names(self) -> Iterable[str]:
"""
Expand All @@ -178,6 +201,7 @@
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch configured catalog [{configured_catalog}]: {exc}")
self._set_incremental_table_processor(configured_catalog)
yield configured_catalog
else:
for catalog_name in self.get_database_names_raw():
Expand All @@ -198,6 +222,7 @@
"Database (Catalog ID) Filtered Out",
)
continue
self._set_incremental_table_processor(catalog_name)
yield catalog_name
except Exception as exc:
self.status.failed(
Expand All @@ -208,6 +233,14 @@
)
)

def _set_incremental_table_processor(self, catalog: str) -> None:
    """Build the incremental table processor for `catalog` and load its table maps.

    Does nothing unless incremental extraction is enabled.
    """
    if not self.incremental.enabled:
        return
    # Publish the processor on the instance first, then fill its maps.
    self.incremental_table_processor = processor = UnityCatalogIncrementalTableProcessor.create(
        self.sql_connection
    )
    processor.set_table_map(
        catalog=catalog,
        start_timestamp=self.incremental.start_timestamp,
    )

def yield_database(self, database_name: str) -> Iterable[Either[CreateDatabaseRequest]]:
"""
From topology.
Expand Down Expand Up @@ -289,51 +322,95 @@
Fetches them up using the context information and
the inspector set when preparing the db.

In incremental mode, only the tables changed since the watermark are
fetched and processed; in full mode every table is listed.

:return: tables or views, depending on config
"""
schema_name = self.context.get().database_schema
catalog_name = self.context.get().database
for table in self.client.tables.list(
catalog_name=catalog_name,
schema_name=schema_name,
):
try:
table_name = table.name
table_fqn = fqn.build(
self.metadata,
if self.incremental.enabled and self.incremental_table_processor:
yield from self._get_incremental_tables(catalog_name, schema_name)
else:
for table in self.client.tables.list(
catalog_name=catalog_name,
schema_name=schema_name,
):
yield from self._process_table(table, catalog_name, schema_name)

def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]: # noqa: UP006
"""Record deleted tables and yield only the tables changed since the watermark."""
processor = self.incremental_table_processor
if processor is None:
return
changed = processor.get_changed(schema_name)
# A name in both sets was dropped and recreated within the window.
# information_schema only lists existing tables, so a changed table
# exists now and must not be marked deleted.
for table_name in processor.get_deleted(schema_name) - changed:
self.context.get_global().deleted_tables.append(
fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
database_name=catalog_name,
schema_name=schema_name,
table_name=table_name,
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern, # pyright: ignore[reportAttributeAccessIssue]
(table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name), # pyright: ignore[reportAttributeAccessIssue]
):
self.status.filter(
table_fqn,
"Table Filtered Out",
)
continue
table_type: TableType = TableType.Regular
if table.table_type:
if table.table_type.value.lower() == TableType.View.value.lower():
table_type: TableType = TableType.View
if table.table_type.value.lower() == "materialized_view":
table_type: TableType = TableType.MaterializedView
elif table.table_type.value.lower() == TableType.External.value.lower():
table_type: TableType = TableType.External
self.context.get().table_data = table
yield table_name, table_type
)
for table_name in changed:
try:
table = self.client.tables.get(f"{catalog_name}.{schema_name}.{table_name}")
except Exception as exc:
self.status.failed(
StackTraceError(
name=table.name,
error=f"Unexpected exception to get table [{table.name}]: {exc}",
name=table_name,
error=f"Unexpected exception to get changed table [{table_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
continue
yield from self._process_table(table, catalog_name, schema_name)

def _process_table(self, table: Any, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, str]]:  # noqa: UP006
    """Apply filtering and table-type detection, then yield the table to the topology.

    Yields a single `(table_name, table_type)` pair, or nothing when the table
    is filtered out or processing fails. Failures are reported through
    `self.status.failed` instead of being raised.

    :param table: table object exposing `name` and `table_type`
    :param catalog_name: catalog used as the database part of the FQN
    :param schema_name: schema used as the schema part of the FQN
    """
    # Resolve the name up front and defensively, so the error handler below
    # cannot raise a second time if `table` is None or has no `name`.
    display_name = getattr(table, "name", None) or "<unknown>"
    try:
        table_name = table.name
        table_fqn = fqn.build(
            self.metadata,
            entity_type=Table,
            service_name=self.context.get().database_service,
            database_name=catalog_name,
            schema_name=schema_name,
            table_name=table_name,
        )
        if filter_by_table(
            self.config.sourceConfig.config.tableFilterPattern,  # pyright: ignore[reportAttributeAccessIssue]
            (table_fqn if self.config.sourceConfig.config.useFqnForFiltering else table_name),  # pyright: ignore[reportAttributeAccessIssue]
        ):
            self.status.filter(
                table_fqn,
                "Table Filtered Out",
            )
            return
        # Anything not recognised below is treated as a regular table.
        table_type: TableType = TableType.Regular
        if table.table_type:
            raw_type = table.table_type.value.lower()
            if raw_type == TableType.View.value.lower():
                table_type = TableType.View
            elif raw_type == "materialized_view":
                table_type = TableType.MaterializedView
            elif raw_type == TableType.External.value.lower():
                table_type = TableType.External
        # Stash the raw table object in the context before yielding it.
        self.context.get().table_data = table
        yield table_name, table_type
    except Exception as exc:
        self.status.failed(
            StackTraceError(
                name=display_name,
                error=f"Unexpected exception to get table [{display_name}]: {exc}",
                stackTrace=traceback.format_exc(),
            )
        )

def get_schema_definition(self, table_name: str, table_type: TableType, table: Any) -> Optional[str]: # noqa: UP045
"""
Expand Down Expand Up @@ -504,7 +581,28 @@
def prepare(self):
    """Nothing to prepare: this method is intentionally a no-op."""

def mark_tables_as_deleted(self):
    """
    Mark tables as deleted.

    Full mode delegates to the parent implementation. Incremental mode
    passes the table names collected in the global context to
    `delete_entity_by_name`, and only when `markDeletedTables` is set.

    Raises ValueError in incremental mode when the context has no database.
    """
    if not self.incremental.enabled:
        yield from super().mark_tables_as_deleted()
        return

    if not self.context.get().__dict__.get("database"):
        raise ValueError("No Database found in the context. We cannot run the table deletion.")

    if not self.source_config.markDeletedTables:
        return

    logger.info(f"Mark Deleted Tables set to True. Processing database [{self.context.get().database}]")
    deleted_names = self.context.get_global().deleted_tables
    yield from delete_entity_by_name(
        self.metadata,
        entity_type=Table,
        entity_names=deleted_names,
        mark_deleted_entity=self.source_config.markDeletedTables,
    )

def add_complex_datatype_descriptions(self, column: Column, column_json: ColumnJson):

Check failure on line 605 in ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5QZwSxmCTVyzoSIWQl&open=AZ5QZwSxmCTVyzoSIWQl&pullRequest=28380
"""
Method to add descriptions to complex datatypes
"""
Expand Down
Loading
Loading