Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions ingestion/src/metadata/domain/tags/canonicalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
wait_random_exponential,
)

from metadata.generated.schema.entity.classification.classification import Classification
from metadata.generated.schema.entity.classification.classification import (
Classification,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.type.basic import ProviderType
from metadata.ingestion.ometa.ometa_api import OpenMetadata
Expand Down Expand Up @@ -87,10 +89,15 @@ def classification(
results = self._es_search(Classification, name)
canonical = Canonical(name=name, description=default_description)
for entity in results:
if entity.provider == ProviderType.system and entity.name.root.lower() == key:
if (
entity.provider == ProviderType.system
and entity.name.root.lower() == key
):
canonical = Canonical(
name=entity.name.root,
description=entity.description.root if entity.description else default_description,
description=entity.description.root
if entity.description
else default_description,
)
break

Expand All @@ -113,7 +120,9 @@ def tag(
"""
tag_fqn = cast(
"str",
fqn.build(None, Tag, classification_name=classification_name, tag_name=tag_name),
fqn.build(
None, Tag, classification_name=classification_name, tag_name=tag_name
),
)
key = tag_fqn.lower()
with self._lock:
Expand All @@ -131,7 +140,9 @@ def tag(
):
canonical = Canonical(
name=entity.name.root,
description=entity.description.root if entity.description else default_tag_description,
description=entity.description.root
if entity.description
else default_tag_description,
)
break

Expand All @@ -142,4 +153,9 @@ def tag(
@_es_retry
def _es_search(self, entity_type: Any, search_string: str) -> Iterable[Any]:
"""Run an ES search by FQN with retries."""
return self._metadata.es_search_from_fqn(entity_type=entity_type, fqn_search_string=search_string) or []
return (
self._metadata.es_search_from_fqn(
entity_type=entity_type, fqn_search_string=search_string
)
or []
)
38 changes: 31 additions & 7 deletions ingestion/src/metadata/domain/tags/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,24 @@ def __init__(self, metadata: OpenMetadata) -> None:
self._lock = threading.Lock()

def _intern_tag_label_locked(
self, *, classification_name: str, tag_name: str, label_type: LabelType, state: State
self,
*,
classification_name: str,
tag_name: str,
label_type: LabelType,
state: State,
) -> TagLabel:
"""Return the shared ``TagLabel`` for the given key. Caller must hold ``self._lock``."""
key = _TagLabelKey(classification_name, tag_name, label_type, state)
cached = self._tag_label_cache.get(key)
if cached is not None:
return cached
tag_fqn = cast("str", fqn.build(None, Tag, classification_name=classification_name, tag_name=tag_name))
tag_fqn = cast(
"str",
fqn.build(
None, Tag, classification_name=classification_name, tag_name=tag_name
),
)
cached = TagLabel( # pyright: ignore[reportCallIssue]
tagFQN=TagFQN(tag_fqn),
labelType=label_type,
Expand All @@ -113,7 +123,10 @@ def attach(
) -> None:
"""Register a tag <-> entity association."""
if not tag_name or not tag_name.strip():
logger.debug("TagRegistry: skipping empty tag for classification %s", classification_name)
logger.debug(
"TagRegistry: skipping empty tag for classification %s",
classification_name,
)
return

with self._lock:
Expand Down Expand Up @@ -164,11 +177,19 @@ def clear_scope(self, scope_fqn: str) -> None:

with self._lock:
self._cleared_scopes.add(scope_fqn)
kept = {k: v for k, v in self._labels_by_entity.items() if k != scope_fqn and not k.startswith(prefix)}
kept = {
k: v
for k, v in self._labels_by_entity.items()
if k != scope_fqn and not k.startswith(prefix)
}
dropped = len(self._labels_by_entity) - len(kept)
self._labels_by_entity = kept
if dropped:
logger.debug("TagRegistry: cleared scope %s (%d entity labels dropped)", scope_fqn, dropped)
logger.debug(
"TagRegistry: cleared scope %s (%d entity labels dropped)",
scope_fqn,
dropped,
)

def is_known(self, tag_fqn: str) -> bool:
"""Return True if the tag FQN has been recorded (case-sensitive match)."""
Expand All @@ -183,7 +204,9 @@ def ensure_known(self, tag_fqn: str) -> bool:
if self.is_known(tag_fqn):
return True

logger.debug("TagRegistry: cache miss for %s; fetching from OpenMetadata.", tag_fqn)
logger.debug(
"TagRegistry: cache miss for %s; fetching from OpenMetadata.", tag_fqn
)
try:
entity = self._metadata.get_by_name(entity=Tag, fqn=tag_fqn)
except Exception:
Expand All @@ -192,7 +215,8 @@ def ensure_known(self, tag_fqn: str) -> bool:

if entity is None:
logger.warning(
"TagRegistry: tag %s not found in OpenMetadata; labels referencing it will be skipped.", tag_fqn
"TagRegistry: tag %s not found in OpenMetadata; labels referencing it will be skipped.",
tag_fqn,
)
return False

Expand Down
15 changes: 13 additions & 2 deletions ingestion/src/metadata/ingestion/models/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@
import queue
import threading
from functools import cache, singledispatchmethod
from typing import Annotated, Any, Dict, Generic, List, Optional, Type, TypeVar # noqa: UP035
from typing import ( # noqa: UP035
Annotated,
Any,
Dict,
Generic,
List,
Optional,
Type,
TypeVar,
)

from pydantic import BaseModel, ConfigDict, Field, create_model

Expand Down Expand Up @@ -126,7 +135,9 @@ class TopologyNode(BaseModel):
] = None
threads: Annotated[
bool,
Field(description="Flag that defines if a node is open to MultiThreading processing."),
Field(
description="Flag that defines if a node is open to MultiThreading processing."
),
] = False


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,9 @@ def create_report_dashboard_lineage(
) -> Iterable[Either[CreateDashboardRequest]]:
"""Create lineage between report and dashboard"""
try:
logger.debug(f"Processing to create report and dashboard lineage for dashboard: {dashboard_details.id}")
logger.debug(
f"Processing to create report and dashboard lineage for dashboard: {dashboard_details.id}"
)
charts = dashboard_details.tiles
dashboard_fqn = fqn.build(
self.metadata,
Expand All @@ -811,9 +813,13 @@ def create_report_dashboard_lineage(
return
for chart in charts or []:
if chart.reportId:
logger.debug(f"Dashboard's chart {chart.id} is linked with report id: {str(chart.reportId)}") # noqa: RUF010
logger.debug(
f"Dashboard's chart {chart.id} is linked with report id: {str(chart.reportId)}"
) # noqa: RUF010
else:
logger.debug(f"Dashboard's chart {chart.id} is not linked with any report")
logger.debug(
f"Dashboard's chart {chart.id} is not linked with any report"
)
continue
report = self._fetch_report_from_workspace(chart.reportId)
if report:
Expand All @@ -838,7 +844,9 @@ def create_report_dashboard_lineage(
logger.debug(
f"Creating lineage between report={report.id} and dashboard={dashboard_details.id}"
)
yield self._get_add_lineage_request(to_entity=dashboard_entity, from_entity=report_entity)
yield self._get_add_lineage_request(
to_entity=dashboard_entity, from_entity=report_entity
)
else:
logger.debug(
f"Could not fetch report with report id: {str(chart.reportId)} from workspace data to create lineage with dashboard: {dashboard_details.id}" # noqa: RUF010
Expand Down Expand Up @@ -873,7 +881,9 @@ def _get_dataset_ids_from_report_datasources(self, report_id: str) -> List[str]:
if match:
dataset_ids.append(match.group(1))
if dataset_ids:
logger.debug(f"Extracted dataset IDs from report datasources API call for report_id={report_id}")
logger.debug(
f"Extracted dataset IDs from report datasources API call for report_id={report_id}"
)
return dataset_ids

def create_datamodel_report_lineage(
Expand All @@ -885,7 +895,9 @@ def create_datamodel_report_lineage(
create the lineage between datamodel and report
"""
try:
logger.debug(f"Processing to create datamodel and report lineage for report: {dashboard_details.id}")
logger.debug(
f"Processing to create datamodel and report lineage for report: {dashboard_details.id}"
)
report_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
Expand All @@ -903,13 +915,17 @@ def create_datamodel_report_lineage(
return
dataset_ids = []
if dashboard_details.datasetId:
logger.debug(f"Report linked datasetId is present in api response for report: {dashboard_details.id}")
logger.debug(
f"Report linked datasetId is present in api response for report: {dashboard_details.id}"
)
dataset_ids = [dashboard_details.datasetId]
else:
logger.debug(
f"Processing to get report datasources from API to extract datasetIds for report: {dashboard_details.id} as datasetId is not present in api response"
)
dataset_ids = self._get_dataset_ids_from_report_datasources(report_id=dashboard_details.id)
dataset_ids = self._get_dataset_ids_from_report_datasources(
report_id=dashboard_details.id
)

if dataset_ids:
for dataset_id in dataset_ids:
Expand Down Expand Up @@ -2365,7 +2381,9 @@ def _fetch_dataset_from_workspace(
return dataset_data
return None

def _fetch_report_from_workspace(self, report_id: Optional[str]) -> Optional[PowerBIReport]: # noqa: UP045
def _fetch_report_from_workspace(
self, report_id: Optional[str]
) -> Optional[PowerBIReport]: # noqa: UP045
"""
Method to search the report using id in the workspace dict
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def prepare(self):
def get_schema_description(self, schema_name: str) -> Optional[str]:
return self.schema_description_map.get(schema_name)

def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]:
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
"""Return tables with proper type detection using a single Glue API pass."""
if self.glue_client:
try:
Expand All @@ -176,13 +178,19 @@ def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAnd
for table in page.get("TableList", []):
params = table.get("Parameters", {})
table_type = (
TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External
TableType.Iceberg
if params.get("table_type") == ICEBERG_TABLE_TYPE
else TableType.External
)
results.append(
TableNameAndType(name=table["Name"], type_=table_type)
)
results.append(TableNameAndType(name=table["Name"], type_=table_type))
return results # noqa: TRY300
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch Glue table metadata for schema [{schema_name}]: {exc}")
logger.warning(
f"Failed to fetch Glue table metadata for schema [{schema_name}]: {exc}"
)
return [
TableNameAndType(name=name, type_=TableType.External)
for name in self.inspector.get_table_names(schema_name)
Expand Down Expand Up @@ -335,9 +343,9 @@ def get_table_description(
try:
table_info: dict = inspector.get_table_comment(table_name, schema_name)
table_option = inspector.get_table_options(table_name, schema_name)
self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get(
"awsathena_location"
)
self.external_location_map[
(self.context.get().database, schema_name, table_name)
] = table_option.get("awsathena_location")
# Catch any exception without breaking the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
Expand Down Expand Up @@ -368,7 +376,9 @@ def _get_columns_internal(
glue_client=self.glue_client,
)

def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None:
def get_table_extensions(
self, table_name: str, table_type: TableType | None = None
) -> dict[str, str] | None:
if not getattr(self.source_config, "includeCustomProperties", False):
return None
if not self._string_property_type_ref:
Expand All @@ -383,9 +393,13 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N
for prop_name, prop_value in tbl_properties.items():
if not prop_value:
continue
sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name)
sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(
PROPERTY_NAME_REPLACEMENT, prop_name
)
if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest()
sanitized_name = hashlib.md5(
prop_name.encode("utf-8"), usedforsecurity=False
).hexdigest()
if sanitized_name not in self._processed_prop:
try:
self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult]
Expand All @@ -410,14 +424,24 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N
registered_properties[sanitized_name] = prop_value
return registered_properties or None

def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]:
def _fetch_iceberg_properties(
self, schema_name: str, table_name: str
) -> dict[str, str]:
"""Read Iceberg native properties from Athena's `<table>$properties` metatable."""
query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"')
query = text(
f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"'
)
try:
with self.engine.connect() as conn:
result = conn.execute(query)
return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None}
return {
str(row[0]): str(row[1])
for row in result
if row[0] is not None and row[1] is not None
}
except Exception as exc:
logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}")
logger.debug(
f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}"
)
logger.debug(traceback.format_exc())
return {}
Loading
Loading