diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 0923fbd4cff4..f4bd779a4813 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -67,6 +67,7 @@ from metadata.ingestion.api.delete import delete_entity_by_name from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_test_connection_fn @@ -448,6 +449,44 @@ def get_dataset_obj(self, schema_name: str): ) return self._current_dataset_obj + def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]: + """ + Override to skip lifecycle data for schemas whose dataset location does not + match the configured usageLocation. + + BigQuery routes INFORMATION_SCHEMA queries to the location specified in the + connection (usageLocation). When a dataset lives in a different GCP region, + the query returns a 404. Skipping early avoids one failed API call per table + in the affected schema. + """ + usage_location = getattr(self.service_connection, "usageLocation", None) + if usage_location: + schema_name = self.context.get().database_schema + try: + dataset_obj = self.get_dataset_obj(schema_name) + dataset_location = getattr(dataset_obj, "location", None) + if ( + dataset_location + and dataset_location.upper() != usage_location.upper() + ): + logger.debug( + "Skipping lifecycle data for schema '%s': dataset location '%s' " + "differs from configured usageLocation '%s'. " + "BigQuery INFORMATION_SCHEMA queries are location-specific.", + schema_name, + dataset_location, + usage_location, + ) + return + except Exception as exc: + logger.debug( + "Could not verify dataset location for schema '%s', " + "proceeding with lifecycle query: %s", + schema_name, + exc, + ) + yield from super().yield_life_cycle_data(_) + def _prefetch_policy_tags(self): """Pre-fetch all policy tags at schema level to avoid per-column API calls""" if not self.service_connection.includePolicyTags: diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py index b3f8880fc7c7..8ae85e214772 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -155,18 +155,40 @@ def get_columns(self, connection, table_name, schema=None, **kw): dblink """ - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) dblink = kw.get("dblink", "") + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) info_cache = kw.get("info_cache") - (table_name, schema, dblink, _) = self._prepare_reflection_args( - connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, - ) + if resolve_synonyms: + try: + rows = list( + self._get_synonyms( + connection, schema, [table_name], dblink, info_cache=info_cache + ) + ) + except Exception: + rows = [] + + if rows: + row = rows[0] + actual_name = getattr(row, "table_name", None) + actual_owner = getattr(row, "table_owner", None) + db_link_val = getattr(row, "db_link", None) + + if actual_name: + table_name = self.denormalize_name(actual_name) + if actual_owner: + schema = self.denormalize_name(actual_owner) + if db_link_val: + if not db_link_val.startswith("@"): + dblink = "@" + db_link_val + else: + dblink = db_link_val + else: + table_name = self.denormalize_name(table_name) + if schema is not None: + schema = self.denormalize_name(schema) + columns = [] char_length_col = "data_length" @@ -395,22 +417,51 @@ def get_indexes_preserve_case( **kw, ): """Override get_indexes to fix two issues when preserveIdentifierCase=True: - 1. Use original table_name (before _prepare_reflection_args uppercases it) + 1. Use original table_name (before denormalize_name uppercases it) so quoted lowercase identifiers are found in ALL_IND_COLUMNS. 2. Access result row columns case-insensitively — Oracle thick mode returns INDEX_NAME (uppercase) while thin mode returns index_name (lowercase). A lowercased dict handles both without branching. """ original_table_name = table_name - info_cache = kw.get("info_cache") - (table_name, schema, dblink, _) = self._prepare_reflection_args( - connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, - ) + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + + # SQLAlchemy 2.0 removed _prepare_reflection_args; denormalize schema/table + # for the pk_constraint lookup while the index query itself uses + # original_table_name (preserve-case mode keeps identifiers as-is). + table_name = self.denormalize_name(table_name) + if schema is not None: + schema = self.denormalize_name(schema) + if dblink and not dblink.startswith("@"): + dblink = "@" + dblink + + if resolve_synonyms: + try: + rows = list( + self._get_synonyms( + connection, + schema, + [table_name], + dblink, + info_cache=kw.get("info_cache"), + ) + ) + except Exception: + rows = [] + if rows: + row = rows[0] + actual_name = getattr(row, "table_name", None) + actual_owner = getattr(row, "table_owner", None) + db_link_val = getattr(row, "db_link", None) + if actual_name: + table_name = self.denormalize_name(actual_name) + if actual_owner: + schema = self.denormalize_name(actual_owner) + if db_link_val: + if not db_link_val.startswith("@"): + dblink = "@" + db_link_val + else: + dblink = db_link_val params = {"table_name": original_table_name} text = ( diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py index dd56e6b782b6..ed98a6707025 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py @@ -91,9 +91,7 @@ def _get_struct_columns( table_service_type=DatabaseServiceType.Athena, _quote=False, ) - sqa_col._set_parent( # pylint: disable=protected-access - self.table.__table__ - ) + self.table.__table__.append_column(sqa_col, replace_existing=True) columns_list.append(sqa_col) else: cols = self._get_struct_columns( @@ -112,8 +110,6 @@ def get_columns(self) -> List[Column]: ) else: col = build_orm_col(idx, column_obj, DatabaseServiceType.Athena) - col._set_parent( # pylint: disable=protected-access - self.table.__table__ - ) + self.table.__table__.append_column(col, replace_existing=True) columns.append(col) return columns diff --git a/ingestion/src/metadata/profiler/orm/converter/base.py b/ingestion/src/metadata/profiler/orm/converter/base.py index 40fe402ffbef..e2c8fe34a697 100644 --- a/ingestion/src/metadata/profiler/orm/converter/base.py +++ b/ingestion/src/metadata/profiler/orm/converter/base.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.data.table import Column, Table from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.orm.converter.converter_registry import converter_registry +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() class Base(DeclarativeBase): @@ -102,7 +105,7 @@ def build_orm_col( def ometa_to_sqa_orm( table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None -) -> type: +) -> Optional[type]: """ Given an OpenMetadata instance, prepare the SQLAlchemy ORM class @@ -126,11 +129,11 @@ def ometa_to_sqa_orm( # SQA 2.x raises a hard error if no primary key columns are found (was just a warning in 1.x). # Since build_orm_col assigns PK to the first column, we need at least one column. if not table.columns: - raise ValueError( - f"Table '{table.name.root}' has no columns. " - "Cannot create ORM class without at least one column. " - "Ensure the table's column metadata was ingested correctly." + logger.warning( + "Table '%s' has no columns. Skipping ORM class creation.", + table.name.root, ) + return None orm_database_name = get_orm_database(table, metadata) # SQLite does not support schemas diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py index d80ff40083e5..48fa97acee63 100644 --- a/ingestion/src/metadata/profiler/processor/processor.py +++ b/ingestion/src/metadata/profiler/processor/processor.py @@ -36,6 +36,9 @@ Inject, inject, ) +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() class ProfilerProcessor(Processor): @@ -70,6 +73,12 @@ def name(self) -> str: return "Profiler" def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]: + if not record.entity.columns: + logger.warning( + "Table '%s' has no columns — continuing to run profiler for table-level metrics", + record.entity.fullyQualifiedName.root, + ) + profiler_runner: Profiler = record.profiler_source.get_profiler_runner( record.entity, self.profiler_config ) diff --git a/ingestion/src/metadata/sampler/processor.py b/ingestion/src/metadata/sampler/processor.py index 9c44d00cc7d6..787303f8895a 100644 --- a/ingestion/src/metadata/sampler/processor.py +++ b/ingestion/src/metadata/sampler/processor.py @@ -47,9 +47,12 @@ Inject, inject, ) +from metadata.utils.logger import profiler_logger from metadata.utils.profiler_utils import get_context_entities from metadata.utils.service_spec.service_spec import import_sampler_class +logger = profiler_logger() + class SamplerProcessor(Processor): """Use the profiler interface to fetch the sample data""" @@ -91,6 +94,12 @@ def name(self) -> str: def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]: """Fetch the sample data and pass it down the pipeline""" + if not record.entity.columns: + logger.warning( + "Skipping sampler for table '%s': no columns found", + record.entity.fullyQualifiedName.root, + ) + return Either() try: entity = cast(Table, record.entity) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index 4f9159e44138..20e3825025f0 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -15,7 +15,7 @@ import hashlib from typing import List, Optional, Union, cast -from sqlalchemy import Column, inspect, text +from sqlalchemy import Column, inspect, select, text from sqlalchemy.orm import Query from sqlalchemy.orm.util import AliasedClass from sqlalchemy.schema import Table @@ -199,8 +199,7 @@ def get_dataset(self, column=None, **__) -> Union[type, AliasedClass]: and self.sample_config.profileSample == 100 ): if self.partition_details: - partitioned = self._partitioned_table() - return partitioned.cte(f"{self.get_sampler_table_name()}_partitioned") + return self._partitioned_table() return self.raw_dataset @@ -306,9 +305,18 @@ def _rdn_sample_from_user_query(self) -> Query: f"{self.get_sampler_table_name()}_user_sampled" ) - def _partitioned_table(self) -> Query: - """Return the Query object for partitioned tables""" - return self.get_partitioned_query() + def _partitioned_table(self): + """Return a CTE for partitioned tables. + + Build the CTE using Core select() so it does not require an active Session. + """ + self.partition_details = cast(PartitionProfilerConfig, self.partition_details) + partition_filter = build_partition_predicate( + self.partition_details, + self.raw_dataset.__table__.c, + ) + stmt = select(self.raw_dataset).where(partition_filter) + return stmt.cte(f"{self.get_sampler_table_name()}_partitioned") def get_partitioned_query(self, query=None) -> Query: """Return the partitioned query""" @@ -322,8 +330,8 @@ def get_partitioned_query(self, query=None) -> Query: if query is not None: return query.filter(partition_filter) - with self.session_factory() as client: - return client.query(self.raw_dataset).filter(partition_filter) + # Return a Core select so callers do not require an active Session + return select(self.raw_dataset).where(partition_filter) def get_columns(self): """get columns from entity""" diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index e8fdc2a22f57..20adf6a16b6d 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -167,6 +167,10 @@ def get_all_table_ddls( connection.rollback() except Exception: pass + try: + connection.rollback() + except Exception: + pass def get_table_ddl_wrapper( diff --git a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index 2b7b5cf03f10..83afa8b2e3cf 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py +++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -69,7 +69,42 @@ def build(self) -> dict: } self.config["source"]["sourceConfig"]["config"]["includeViews"] = True + # By default, use the default profiler metrics. For certain connectors that support system + # profile (BigQuery, Databricks, Redshift, Snowflake) include the system metric as well self.config["processor"] = {"type": "orm-profiler", "config": {}} + + connector = str(self.config.get("source", {}).get("type", "")).lower() + connectors_with_system = {"bigquery", "databricks", "redshift", "snowflake"} + if connector in connectors_with_system: + # Default metrics used by DefaultProfiler + the system metric + default_metric_names = [ + "rowCount", + "columnCount", + "columnNames", + "median", + "firstQuartile", + "thirdQuartile", + "mean", + "valuesCount", + "distinctCount", + "distinctProportion", + "min", + "max", + "nullCount", + "nullProportion", + "stddev", + "sum", + "uniqueCount", + "uniqueProportion", + "interQuartileRange", + "nonParametricSkew", + "system", + ] + self.config["processor"]["config"]["profiler"] = { + "name": "default_profiler", + "metrics": default_metric_names, + } + return self.config diff --git a/ingestion/tests/cli_e2e/base/test_cli.py b/ingestion/tests/cli_e2e/base/test_cli.py index 0e3a2df84783..2a5f5877c8a5 100644 --- a/ingestion/tests/cli_e2e/base/test_cli.py +++ b/ingestion/tests/cli_e2e/base/test_cli.py @@ -58,7 +58,9 @@ def run_command(self, command: str = "ingest", test_file_path=None) -> str: "-c", file_path, ] - process_status = subprocess.Popen(args, stderr=subprocess.PIPE) + env = os.environ.copy() + env["PYTHONWARNINGS"] = "ignore::FutureWarning" + process_status = subprocess.Popen(args, stderr=subprocess.PIPE, env=env) _, stderr = process_status.communicate() if process_status.returncode != 0: print(stderr.decode("utf-8")) diff --git a/ingestion/tests/cli_e2e/test_cli_datalake_s3.py b/ingestion/tests/cli_e2e/test_cli_datalake_s3.py index 2617f4e6e6f2..5fb9ce4b8084 100644 --- a/ingestion/tests/cli_e2e/test_cli_datalake_s3.py +++ b/ingestion/tests/cli_e2e/test_cli_datalake_s3.py @@ -59,7 +59,7 @@ def expected_tables() -> int: @staticmethod def expected_profiled_tables() -> int: - return 6 + return 5 def expected_sample_size(self) -> int: return 50 diff --git a/ingestion/tests/cli_e2e/test_cli_mssql.py b/ingestion/tests/cli_e2e/test_cli_mssql.py index 3d9ceeabfa67..1c6aef13049b 100644 --- a/ingestion/tests/cli_e2e/test_cli_mssql.py +++ b/ingestion/tests/cli_e2e/test_cli_mssql.py @@ -150,10 +150,6 @@ def get_profiler_time_partition_results() -> dict: "distinctProportion": 1.0, "duplicateCount": None, "firstQuartile": 2.5, - "histogram": { - "boundaries": ["1.000 to 3.773", "3.773 and up"], - "frequencies": [1, 2], - }, "interQuartileRange": 2.0, "max": 5.0, "maxLength": None, diff --git a/ingestion/tests/cli_e2e/test_cli_quicksight.py b/ingestion/tests/cli_e2e/test_cli_quicksight.py index c8a12fa8154d..2a9470cd9252 100644 --- a/ingestion/tests/cli_e2e/test_cli_quicksight.py +++ b/ingestion/tests/cli_e2e/test_cli_quicksight.py @@ -48,7 +48,7 @@ def expected_not_included_entities(self) -> int: return 6 def expected_not_included_sink_entities(self) -> int: - return 7 + return 3 def expected_filtered_mix(self) -> int: return 2 @@ -75,7 +75,7 @@ def expected_datamodels(self) -> int: return 0 def expected_dashboards_and_charts_after_patch(self) -> int: - return 7 + return 3 @pytest.mark.order(11) def test_lineage(self) -> None: diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index cfead0856abb..f05eb2c6a0a9 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -31,6 +31,8 @@ from .common_e2e_sqa_mixins import SQACommonMethods +# TODO: Paused due to credential issue - re-enable once credentials are restored +@pytest.mark.skip(reason="TODO: Paused due to credential issue") class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): """ Snowflake CLI Tests