@@ -67,6 +67,7 @@
from metadata.ingestion.api.delete import delete_entity_by_name
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
@@ -448,6 +449,44 @@ def get_dataset_obj(self, schema_name: str):
)
return self._current_dataset_obj

def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
"""
Override to skip lifecycle data for schemas whose dataset location does not
match the configured usageLocation.

BigQuery routes INFORMATION_SCHEMA queries to the location specified in the
connection (usageLocation). When a dataset lives in a different GCP region,
the query returns a 404. Skipping early avoids one failed API call per table
in the affected schema.
"""
usage_location = getattr(self.service_connection, "usageLocation", None)
if usage_location:
schema_name = self.context.get().database_schema
try:
dataset_obj = self.get_dataset_obj(schema_name)
dataset_location = getattr(dataset_obj, "location", None)
if (
dataset_location
and dataset_location.upper() != usage_location.upper()
):
logger.debug(
"Skipping lifecycle data for schema '%s': dataset location '%s' "
"differs from configured usageLocation '%s'. "
"BigQuery INFORMATION_SCHEMA queries are location-specific.",
schema_name,
dataset_location,
usage_location,
)
return
except Exception as exc:
logger.debug(
"Could not verify dataset location for schema '%s', "
"proceeding with lifecycle query: %s",
schema_name,
exc,
)
yield from super().yield_life_cycle_data(_)

def _prefetch_policy_tags(self):
"""Pre-fetch all policy tags at schema level to avoid per-column API calls"""
if not self.service_connection.includePolicyTags:
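For context on the location behaviour described in the new yield_life_cycle_data docstring above: BigQuery INFORMATION_SCHEMA views are regional, so a job running in the configured usageLocation cannot see datasets stored in another region. A minimal sketch with the google-cloud-bigquery client (project and dataset IDs are placeholders):

from google.cloud import bigquery

client = bigquery.Client()
dataset = client.get_dataset("my-project.my_dataset")  # placeholder IDs

# INFORMATION_SCHEMA queries only succeed when the job runs in the same
# location as the dataset; a mismatch surfaces as a 404 NotFound.
job = client.query(
    "SELECT table_name FROM `my-project.my_dataset.INFORMATION_SCHEMA.TABLES`",
    location=dataset.location,  # matching the dataset's region avoids the 404
)
for row in job.result():
    print(row.table_name)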
89 changes: 70 additions & 19 deletions ingestion/src/metadata/ingestion/source/database/oracle/utils.py
@@ -143,7 +143,7 @@

# pylint: disable=too-many-locals
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):

Check failure on line 146 in ingestion/src/metadata/ingestion/source/database/oracle/utils.py (SonarCloud Code Analysis): Refactor this function to reduce its Cognitive Complexity from 36 to the 15 allowed. See https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZzfd4SfAEk4RYmk077B&open=AZzfd4SfAEk4RYmk077B&pullRequest=26411
"""

Dialect method overridden to add raw data type
@@ -155,18 +155,40 @@
dblink

"""
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
dblink = kw.get("dblink", "")
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
info_cache = kw.get("info_cache")

(table_name, schema, dblink, _) = self._prepare_reflection_args(
connection,
table_name,
schema,
resolve_synonyms,
dblink,
info_cache=info_cache,
)
if resolve_synonyms:
try:
rows = list(
self._get_synonyms(
connection, schema, [table_name], dblink, info_cache=info_cache
)
)
except Exception:
rows = []

if rows:
row = rows[0]
actual_name = getattr(row, "table_name", None)
actual_owner = getattr(row, "table_owner", None)
db_link_val = getattr(row, "db_link", None)

if actual_name:
table_name = self.denormalize_name(actual_name)
if actual_owner:
schema = self.denormalize_name(actual_owner)
if db_link_val:
if not db_link_val.startswith("@"):
dblink = "@" + db_link_val
else:
dblink = db_link_val
else:
table_name = self.denormalize_name(table_name)
if schema is not None:
schema = self.denormalize_name(schema)

columns = []

char_length_col = "data_length"
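The inline synonym resolution above replaces the _prepare_reflection_args helper that SQLAlchemy 2.0 removed. A rough sketch of the lookup that _get_synonyms performs, assuming a plain SQLAlchemy Connection (ALL_SYNONYMS is the real Oracle view; the helper and its bind names below are illustrative):

from sqlalchemy import text

def resolve_synonym(connection, schema, name):
    # Map a synonym to its base object; mirrors the rows consumed above.
    row = connection.execute(
        text(
            "SELECT table_owner, table_name, db_link "
            "FROM all_synonyms "
            "WHERE owner = :owner AND synonym_name = :name"
        ),
        {"owner": schema, "name": name},
    ).first()
    # Fall back to the original identifiers when no synonym exists.
    return (row.table_owner, row.table_name, row.db_link) if row else (schema, name, "")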
@@ -385,32 +407,61 @@


@reflection.cache
def get_indexes_preserve_case(

Check failure on line 410 in ingestion/src/metadata/ingestion/source/database/oracle/utils.py (SonarCloud Code Analysis): Refactor this function to reduce its Cognitive Complexity from 35 to the 15 allowed. See https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZzfd4SfAEk4RYmk077C&open=AZzfd4SfAEk4RYmk077C&pullRequest=26411
self,
connection,
table_name,
schema=None,
resolve_synonyms=False,

Check warning on line 415 in ingestion/src/metadata/ingestion/source/database/oracle/utils.py (SonarCloud Code Analysis): Introduce a new variable or use its initial value before reassigning 'resolve_synonyms'. See https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZzfd4SfAEk4RYmk077D&open=AZzfd4SfAEk4RYmk077D&pullRequest=26411
dblink="",
**kw,
):
"""Override get_indexes to fix two issues when preserveIdentifierCase=True:
1. Use original table_name (before _prepare_reflection_args uppercases it)
1. Use original table_name (before denormalize_name uppercases it)
so quoted lowercase identifiers are found in ALL_IND_COLUMNS.
2. Access result row columns case-insensitively — Oracle thick mode returns
INDEX_NAME (uppercase) while thin mode returns index_name (lowercase).
A lowercased dict handles both without branching.
"""
original_table_name = table_name
info_cache = kw.get("info_cache")
(table_name, schema, dblink, _) = self._prepare_reflection_args(
connection,
table_name,
schema,
resolve_synonyms,
dblink,
info_cache=info_cache,
)
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)

# SQLAlchemy 2.0 removed _prepare_reflection_args; denormalize schema/table
# for the pk_constraint lookup while the index query itself uses
# original_table_name (preserve-case mode keeps identifiers as-is).
table_name = self.denormalize_name(table_name)
if schema is not None:
schema = self.denormalize_name(schema)
if dblink and not dblink.startswith("@"):
dblink = "@" + dblink

if resolve_synonyms:
try:
rows = list(
self._get_synonyms(
connection,
schema,
[table_name],
dblink,
info_cache=kw.get("info_cache"),
)
)
except Exception:
rows = []
if rows:
row = rows[0]
actual_name = getattr(row, "table_name", None)
actual_owner = getattr(row, "table_owner", None)
db_link_val = getattr(row, "db_link", None)
if actual_name:
table_name = self.denormalize_name(actual_name)
if actual_owner:
schema = self.denormalize_name(actual_owner)
if db_link_val:
if not db_link_val.startswith("@"):
dblink = "@" + db_link_val
else:
dblink = db_link_val

params = {"table_name": original_table_name}
text = (
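The lowercased-dict pattern named in the get_indexes_preserve_case docstring above, as a standalone sketch (assumes SQLAlchemy 1.4+, where Row exposes a ._mapping view; row stands for one result of the ALL_IND_COLUMNS query):

# Thick mode returns uppercase keys (INDEX_NAME), thin mode lowercase
# (index_name); lowercasing once handles both without branching.
row_dict = {str(key).lower(): value for key, value in row._mapping.items()}
index_name = row_dict["index_name"]
column_name = row_dict["column_name"]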
@@ -91,9 +91,7 @@ def _get_struct_columns(
table_service_type=DatabaseServiceType.Athena,
_quote=False,
)
sqa_col._set_parent( # pylint: disable=protected-access
self.table.__table__
)
self.table.__table__.append_column(sqa_col, replace_existing=True)
columns_list.append(sqa_col)
else:
cols = self._get_struct_columns(
@@ -112,8 +110,6 @@ def get_columns(self) -> List[Column]:
)
else:
col = build_orm_col(idx, column_obj, DatabaseServiceType.Athena)
col._set_parent( # pylint: disable=protected-access
self.table.__table__
)
self.table.__table__.append_column(col, replace_existing=True)
columns.append(col)
return columns
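append_column(..., replace_existing=True) is the public replacement for the private _set_parent calls removed above: it parents the column and registers it on Table.columns, replacing any same-named column instead of emitting a duplicate-column warning (replace_existing is available in SQLAlchemy 1.4+). A standalone sketch:

from sqlalchemy import Column, Integer, MetaData, String, Table

table = Table("orders", MetaData(), Column("id", Integer))

# Re-appending a column with the same name swaps it in place.
table.append_column(Column("id", String(36)), replace_existing=True)
assert isinstance(table.c.id.type, String)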
13 changes: 8 additions & 5 deletions ingestion/src/metadata/profiler/orm/converter/base.py
@@ -24,6 +24,9 @@
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.orm.converter.converter_registry import converter_registry
from metadata.utils.logger import profiler_logger

logger = profiler_logger()


class Base(DeclarativeBase):
@@ -102,7 +105,7 @@ def build_orm_col(

def ometa_to_sqa_orm(
table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None
) -> type:
) -> Optional[type]:
"""
Given an OpenMetadata instance, prepare
the SQLAlchemy ORM class
@@ -126,11 +129,11 @@ def ometa_to_sqa_orm(
# SQA 2.x raises a hard error if no primary key columns are found (was just a warning in 1.x).
# Since build_orm_col assigns PK to the first column, we need at least one column.
if not table.columns:
⚠️ Bug: ometa_to_sqa_orm callers don't handle new None return

The PR changes ometa_to_sqa_orm to return None when a table has no columns (base.py:131-136), but none of its callers handle this. Specifically, SQAInterfaceMixin.build_table_orm in sqa_mixin.py:125 passes the result through, and SQASampler.__init__ at sampler.py:74 assigns it to self._table without a None check. Later accesses like self.raw_dataset.__table__ (sampler.py:134) will raise AttributeError: 'NoneType' object has no attribute '__table__'.

The new guards in ProfilerProcessor._run and SamplerProcessor._run (checking record.entity.columns) should prevent this path in the normal pipeline, but ometa_to_sqa_orm and build_table_orm are also called from other places (e.g., tests, integration paths). The function contract now promises Optional[type] but callers still assume a non-None return.

Suggested fix: either propagate the None check through build_table_orm and SQASampler.__init__ (raising early with a clear error), or keep the original raise ValueError in ometa_to_sqa_orm and rely solely on the processor-level guards to skip tables with no columns before ORM creation is attempted.

raise ValueError(
f"Table '{table.name.root}' has no columns. "
"Cannot create ORM class without at least one column. "
"Ensure the table's column metadata was ingested correctly."
logger.warning(
"Table '%s' has no columns. Skipping ORM class creation.",
table.name.root,
)
return None

orm_database_name = get_orm_database(table, metadata)
# SQLite does not support schemas
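A minimal sketch of the first option from the review comment above; the guard itself is hypothetical (not part of this PR), and build_table_orm's exact signature is assumed:

# Hypothetical guard in SQAInterfaceMixin.build_table_orm; only the
# Optional[type] contract of ometa_to_sqa_orm comes from the PR itself.
def build_table_orm(self, table, metadata, sqa_metadata_obj=None):
    orm_table = ometa_to_sqa_orm(table, metadata, sqa_metadata_obj)
    if orm_table is None:
        raise ValueError(
            f"Table '{table.fullyQualifiedName.root}' has no columns; "
            "cannot build an ORM class for sampling or profiling."
        )
    return orm_table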
9 changes: 9 additions & 0 deletions ingestion/src/metadata/profiler/processor/processor.py
@@ -36,6 +36,9 @@
Inject,
inject,
)
from metadata.utils.logger import profiler_logger

logger = profiler_logger()


class ProfilerProcessor(Processor):
@@ -70,6 +73,12 @@ def name(self) -> str:
return "Profiler"

def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]:
if not record.entity.columns:
logger.warning(
"Table '%s' has no columns — continuing to run profiler for table-level metrics",
record.entity.fullyQualifiedName.root,
)

profiler_runner: Profiler = record.profiler_source.get_profiler_runner(
record.entity, self.profiler_config
)
9 changes: 9 additions & 0 deletions ingestion/src/metadata/sampler/processor.py
@@ -47,9 +47,12 @@
Inject,
inject,
)
from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import get_context_entities
from metadata.utils.service_spec.service_spec import import_sampler_class

logger = profiler_logger()


class SamplerProcessor(Processor):
"""Use the profiler interface to fetch the sample data"""
@@ -91,6 +94,12 @@ def name(self) -> str:

def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]:
"""Fetch the sample data and pass it down the pipeline"""
if not record.entity.columns:
logger.warning(
"Skipping sampler for table '%s': no columns found",
record.entity.fullyQualifiedName.root,
)
return Either()

try:
entity = cast(Table, record.entity)
24 changes: 16 additions & 8 deletions ingestion/src/metadata/sampler/sqlalchemy/sampler.py
@@ -15,7 +15,7 @@
import hashlib
from typing import List, Optional, Union, cast

from sqlalchemy import Column, inspect, text
from sqlalchemy import Column, inspect, select, text
from sqlalchemy.orm import Query
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.schema import Table
@@ -199,8 +199,7 @@ def get_dataset(self, column=None, **__) -> Union[type, AliasedClass]:
and self.sample_config.profileSample == 100
):
if self.partition_details:
partitioned = self._partitioned_table()
return partitioned.cte(f"{self.get_sampler_table_name()}_partitioned")
return self._partitioned_table()

return self.raw_dataset

@@ -306,9 +305,18 @@ def _rdn_sample_from_user_query(self) -> Query:
f"{self.get_sampler_table_name()}_user_sampled"
)

def _partitioned_table(self) -> Query:
"""Return the Query object for partitioned tables"""
return self.get_partitioned_query()
def _partitioned_table(self):
"""Return a CTE for partitioned tables.

Build the CTE using Core select() so it does not require an active Session.
"""
self.partition_details = cast(PartitionProfilerConfig, self.partition_details)
partition_filter = build_partition_predicate(
self.partition_details,
self.raw_dataset.__table__.c,
)
stmt = select(self.raw_dataset).where(partition_filter)
return stmt.cte(f"{self.get_sampler_table_name()}_partitioned")

def get_partitioned_query(self, query=None) -> Query:
💡 Quality: get_partitioned_query return type annotation is incorrect

After this change, get_partitioned_query(query=None) returns a Core Select object instead of an ORM Query, yet the method signature still declares -> Query. While this works at runtime (the sole no-arg caller in table_diff_params_setter.py:177 accesses .whereclause, which exists on both types), the annotation is now misleading and will confuse static type checkers and future callers.

Similarly, QueryRunner.__init__ in runner.py types dataset as Union[type, AliasedClass], but _partitioned_table() now returns a Core CTE, another mismatch at the annotation level.

Suggested fix:

from sqlalchemy.sql import Select

def get_partitioned_query(self, query=None) -> Union[Query, Select]:
    ...

"""Return the partitioned query"""
@@ -322,8 +330,8 @@ def get_partitioned_query(self, query=None) -> Query:
if query is not None:
return query.filter(partition_filter)

with self.session_factory() as client:
return client.query(self.raw_dataset).filter(partition_filter)
# Return a Core select so callers do not require an active Session
return select(self.raw_dataset).where(partition_filter)

def get_columns(self):
"""get columns from entity"""
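Because get_partitioned_query and _partitioned_table now build Core constructs, callers no longer need an ORM Session; a usage sketch, assuming sampler is a constructed SQASampler with partition_details set:

from sqlalchemy import select

cte = sampler._partitioned_table()          # Core CTE built via select().where()
stmt = select(cte).limit(10)                # select all CTE columns
with sampler.session_factory() as session:  # any Session or Connection works
    rows = session.execute(stmt).all()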
4 changes: 4 additions & 0 deletions ingestion/src/metadata/utils/sqlalchemy_utils.py
@@ -167,6 +167,10 @@ def get_all_table_ddls(
connection.rollback()
except Exception:
pass
try:
connection.rollback()
except Exception:
pass


def get_table_ddl_wrapper(
35 changes: 35 additions & 0 deletions ingestion/tests/cli_e2e/base/config_builders/builders.py
@@ -69,7 +69,42 @@ def build(self) -> dict:
}
self.config["source"]["sourceConfig"]["config"]["includeViews"] = True

# By default, use the default profiler metrics. For certain connectors that support system
# profile (BigQuery, Databricks, Redshift, Snowflake) include the system metric as well
self.config["processor"] = {"type": "orm-profiler", "config": {}}

connector = str(self.config.get("source", {}).get("type", "")).lower()
connectors_with_system = {"bigquery", "databricks", "redshift", "snowflake"}
if connector in connectors_with_system:
# Default metrics used by DefaultProfiler + the system metric
default_metric_names = [
"rowCount",
"columnCount",
"columnNames",
"median",
"firstQuartile",
"thirdQuartile",
"mean",
"valuesCount",
"distinctCount",
"distinctProportion",
"min",
"max",
"nullCount",
"nullProportion",
"stddev",
"sum",
"uniqueCount",
"uniqueProportion",
"interQuartileRange",
"nonParametricSkew",
"system",
]
self.config["processor"]["config"]["profiler"] = {
"name": "default_profiler",
"metrics": default_metric_names,
}

return self.config


4 changes: 3 additions & 1 deletion ingestion/tests/cli_e2e/base/test_cli.py
@@ -58,7 +58,9 @@ def run_command(self, command: str = "ingest", test_file_path=None) -> str:
"-c",
file_path,
]
process_status = subprocess.Popen(args, stderr=subprocess.PIPE)
env = os.environ.copy()
env["PYTHONWARNINGS"] = "ignore::FutureWarning"
process_status = subprocess.Popen(args, stderr=subprocess.PIPE, env=env)
_, stderr = process_status.communicate()
if process_status.returncode != 0:
print(stderr.decode("utf-8"))
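PYTHONWARNINGS uses the standard warnings-filter syntax, so ignore::FutureWarning suppresses FutureWarning in the child process. The in-process equivalent, for reference:

import warnings

# Same effect as PYTHONWARNINGS="ignore::FutureWarning" or -W ignore::FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)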
2 changes: 1 addition & 1 deletion ingestion/tests/cli_e2e/test_cli_datalake_s3.py
@@ -59,7 +59,7 @@ def expected_tables() -> int:

@staticmethod
def expected_profiled_tables() -> int:
return 6
return 5

def expected_sample_size(self) -> int:
return 50