From 32d8092d6bac13f4fa0b87d1f23b9800c9fed6b1 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 11 Mar 2026 15:49:34 +0100 Subject: [PATCH 01/11] fix: SQLAlchemy 2.0 e2e fixes for Redshift, Athena, and profiler - Redshift: override _pg_class_filter_scope_schema, get_multi_pk_constraint, and get_multi_unique_constraints to avoid unsupported pg_class.relpersistence and array_agg(... ORDER BY ...) queries - Athena: replace private _set_parent() with public append_column(replace_existing=True) - Profiler: gracefully skip tables with no columns instead of raising ValueError - sqlalchemy_utils: rollback on all DDL reflection failures, not just ProgrammingError --- .../source/database/redshift/metadata.py | 6 +++ .../source/database/redshift/utils.py | 44 +++++++++++++++++++ .../sqlalchemy/athena/profiler_interface.py | 8 +--- .../metadata/profiler/orm/converter/base.py | 13 +++--- .../metadata/profiler/processor/processor.py | 10 +++++ .../src/metadata/utils/sqlalchemy_utils.py | 4 ++ 6 files changed, 74 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 136889c8ba77..de4bc09d2a2c 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -90,9 +90,12 @@ _get_pg_column_info, _get_schema_column_info, _load_domains, + _pg_class_filter_scope_schema, _redshift_initialize, get_columns, get_multi_columns, + get_multi_pk_constraint, + get_multi_unique_constraints, get_redshift_columns, get_table_comment, get_temp_table_names, @@ -127,8 +130,11 @@ RedshiftDialectMixin._get_schema_column_info = _get_schema_column_info RedshiftDialectMixin.initialize = _redshift_initialize RedshiftDialectMixin._load_domains = _load_domains +RedshiftDialectMixin._pg_class_filter_scope_schema = _pg_class_filter_scope_schema RedshiftDialectMixin.get_columns = get_columns RedshiftDialectMixin.get_multi_columns = get_multi_columns +RedshiftDialectMixin.get_multi_pk_constraint = get_multi_pk_constraint +RedshiftDialectMixin.get_multi_unique_constraints = get_multi_unique_constraints PGDialect._get_column_info = _get_pg_column_info RedshiftDialect.get_all_table_comments = get_all_table_comments RedshiftDialect.get_table_comment = get_table_comment diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py index 79fa2a952797..13038320bcbd 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py @@ -94,6 +94,50 @@ def get_multi_columns(self, connection, **kw): return self._default_multi_reflect(self.get_columns, connection, **kw) +def get_multi_pk_constraint(self, connection, **kw): + """ + Override PGDialect's get_multi_pk_constraint which uses + array_agg(... ORDER BY ...) not supported by Redshift. + Falls back to the default implementation that delegates to + the already-overridden get_pk_constraint() method. + """ + return self._default_multi_reflect(self.get_pk_constraint, connection, **kw) + + +def get_multi_unique_constraints(self, connection, **kw): + """ + Override PGDialect's get_multi_unique_constraints which uses + array_agg(... ORDER BY ...) not supported by Redshift. + Falls back to the default implementation that delegates to + the already-overridden get_unique_constraints() method. + """ + return self._default_multi_reflect(self.get_unique_constraints, connection, **kw) + + +def _pg_class_filter_scope_schema(self, query, schema, scope, pg_class_table=None): + """ + Override PGDialect's _pg_class_filter_scope_schema to skip the + pg_class.relpersistence filter which does not exist in Redshift. + """ + from sqlalchemy.dialects.postgresql import pg_catalog + + if pg_class_table is None: + pg_class_table = pg_catalog.pg_class + query = query.join( + pg_catalog.pg_namespace, + pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, + ) + + if schema is None: + query = query.where( + pg_catalog.pg_table_is_visible(pg_class_table.c.oid), + pg_catalog.pg_namespace.c.nspname != "pg_catalog", + ) + else: + query = query.where(pg_catalog.pg_namespace.c.nspname == schema) + return query + + # pylint: disable=protected-access @calculate_execution_time() def get_columns(self, connection, table_name, schema=None, **kw): diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py index dd56e6b782b6..ed98a6707025 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py @@ -91,9 +91,7 @@ def _get_struct_columns( table_service_type=DatabaseServiceType.Athena, _quote=False, ) - sqa_col._set_parent( # pylint: disable=protected-access - self.table.__table__ - ) + self.table.__table__.append_column(sqa_col, replace_existing=True) columns_list.append(sqa_col) else: cols = self._get_struct_columns( @@ -112,8 +110,6 @@ def get_columns(self) -> List[Column]: ) else: col = build_orm_col(idx, column_obj, DatabaseServiceType.Athena) - col._set_parent( # pylint: disable=protected-access - self.table.__table__ - ) + self.table.__table__.append_column(col, replace_existing=True) columns.append(col) return columns diff --git a/ingestion/src/metadata/profiler/orm/converter/base.py b/ingestion/src/metadata/profiler/orm/converter/base.py index 40fe402ffbef..e2c8fe34a697 100644 --- a/ingestion/src/metadata/profiler/orm/converter/base.py +++ b/ingestion/src/metadata/profiler/orm/converter/base.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.data.table import Column, Table from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.orm.converter.converter_registry import converter_registry +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() class Base(DeclarativeBase): @@ -102,7 +105,7 @@ def build_orm_col( def ometa_to_sqa_orm( table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None -) -> type: +) -> Optional[type]: """ Given an OpenMetadata instance, prepare the SQLAlchemy ORM class @@ -126,11 +129,11 @@ def ometa_to_sqa_orm( # SQA 2.x raises a hard error if no primary key columns are found (was just a warning in 1.x). # Since build_orm_col assigns PK to the first column, we need at least one column. if not table.columns: - raise ValueError( - f"Table '{table.name.root}' has no columns. " - "Cannot create ORM class without at least one column. " - "Ensure the table's column metadata was ingested correctly." + logger.warning( + "Table '%s' has no columns. Skipping ORM class creation.", + table.name.root, ) + return None orm_database_name = get_orm_database(table, metadata) # SQLite does not support schemas diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py index d80ff40083e5..2903db6710b5 100644 --- a/ingestion/src/metadata/profiler/processor/processor.py +++ b/ingestion/src/metadata/profiler/processor/processor.py @@ -36,6 +36,9 @@ Inject, inject, ) +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() class ProfilerProcessor(Processor): @@ -70,6 +73,13 @@ def name(self) -> str: return "Profiler" def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]: + if not record.entity.columns: + logger.warning( + "Skipping profiler for table '%s': no columns found", + record.entity.fullyQualifiedName.root, + ) + return Either() + profiler_runner: Profiler = record.profiler_source.get_profiler_runner( record.entity, self.profiler_config ) diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index e8fdc2a22f57..20adf6a16b6d 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -167,6 +167,10 @@ def get_all_table_ddls( connection.rollback() except Exception: pass + try: + connection.rollback() + except Exception: + pass def get_table_ddl_wrapper( From f32a2b454398b098aae2f8b383d58fe04134dbc1 Mon Sep 17 00:00:00 2001 From: Pablo Takara Date: Wed, 11 Mar 2026 16:48:24 +0100 Subject: [PATCH 02/11] fix: Oracle get_multi_columns, sampler None-handling, partitioned CTE session - Oracle: add get_multi_columns override so SQA 2.0 batch reflection uses our custom get_columns() instead of the dialect's native implementation that returns empty column lists - Sampler processor: skip tables with no columns early (same as profiler) - Sampler: build partitioned table CTE inside session context; SQA 2.0 requires the session to be alive when .cte() is called on a Query --- .../source/database/oracle/metadata.py | 2 ++ .../ingestion/source/database/oracle/utils.py | 10 ++++++++ ingestion/src/metadata/sampler/processor.py | 9 +++++++ .../metadata/sampler/sqlalchemy/sampler.py | 25 +++++++++++++++---- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py index 9926007f1f16..3146a7956b3a 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py @@ -62,6 +62,7 @@ get_all_view_definitions, get_columns, get_indexes_preserve_case, + get_multi_columns, get_mview_names, get_mview_names_dialect, get_table_comment, @@ -94,6 +95,7 @@ OracleDialect.get_table_comment = get_table_comment OracleDialect.get_columns = get_columns +OracleDialect.get_multi_columns = get_multi_columns OracleDialect._get_col_type = _get_col_type OracleDialect.get_view_definition = get_view_definition OracleDialect.get_all_view_definitions = get_all_view_definitions diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py index b3f8880fc7c7..66bd08f0aed4 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -237,6 +237,16 @@ def get_columns(self, connection, table_name, schema=None, **kw): return columns +def get_multi_columns(self, connection, **kw): + """ + Override OracleDialect's get_multi_columns to ensure our custom + get_columns() is used. SQA 2.0 batch reflection bypasses get_columns() + in favor of get_multi_columns(), which would skip our custom column + metadata (system_data_type, identity columns, etc.). + """ + return self._default_multi_reflect(self.get_columns, connection, **kw) + + @reflection.cache def get_table_names(self, connection, schema=None, **kw): """ diff --git a/ingestion/src/metadata/sampler/processor.py b/ingestion/src/metadata/sampler/processor.py index 9c44d00cc7d6..787303f8895a 100644 --- a/ingestion/src/metadata/sampler/processor.py +++ b/ingestion/src/metadata/sampler/processor.py @@ -47,9 +47,12 @@ Inject, inject, ) +from metadata.utils.logger import profiler_logger from metadata.utils.profiler_utils import get_context_entities from metadata.utils.service_spec.service_spec import import_sampler_class +logger = profiler_logger() + class SamplerProcessor(Processor): """Use the profiler interface to fetch the sample data""" @@ -91,6 +94,12 @@ def name(self) -> str: def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]: """Fetch the sample data and pass it down the pipeline""" + if not record.entity.columns: + logger.warning( + "Skipping sampler for table '%s': no columns found", + record.entity.fullyQualifiedName.root, + ) + return Either() try: entity = cast(Table, record.entity) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index 4f9159e44138..f5d9a37fe152 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -199,8 +199,7 @@ def get_dataset(self, column=None, **__) -> Union[type, AliasedClass]: and self.sample_config.profileSample == 100 ): if self.partition_details: - partitioned = self._partitioned_table() - return partitioned.cte(f"{self.get_sampler_table_name()}_partitioned") + return self._partitioned_table() return self.raw_dataset @@ -306,9 +305,25 @@ def _rdn_sample_from_user_query(self) -> Query: f"{self.get_sampler_table_name()}_user_sampled" ) - def _partitioned_table(self) -> Query: - """Return the Query object for partitioned tables""" - return self.get_partitioned_query() + def _partitioned_table(self): + """Return a CTE for partitioned tables. + + We build the CTE inside the session context because SQA 2.0 + requires the session to be alive when .cte() is called on a Query. + """ + self.partition_details = cast( + PartitionProfilerConfig, self.partition_details + ) + partition_filter = build_partition_predicate( + self.partition_details, + self.raw_dataset.__table__.c, + ) + with self.session_factory() as client: + return ( + client.query(self.raw_dataset) + .filter(partition_filter) + .cte(f"{self.get_sampler_table_name()}_partitioned") + ) def get_partitioned_query(self, query=None) -> Query: """Return the partitioned query""" From 555482e9d6c1639c2657bebcb5fb227e62e188bc Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 12:25:46 -0700 Subject: [PATCH 03/11] chore: migrate to sqa 2.0 --- .../ingestion/source/database/oracle/utils.py | 89 +++++++++++++++---- 1 file changed, 70 insertions(+), 19 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py index b3f8880fc7c7..8ae85e214772 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -155,18 +155,40 @@ def get_columns(self, connection, table_name, schema=None, **kw): dblink """ - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) dblink = kw.get("dblink", "") + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) info_cache = kw.get("info_cache") - (table_name, schema, dblink, _) = self._prepare_reflection_args( - connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, - ) + if resolve_synonyms: + try: + rows = list( + self._get_synonyms( + connection, schema, [table_name], dblink, info_cache=info_cache + ) + ) + except Exception: + rows = [] + + if rows: + row = rows[0] + actual_name = getattr(row, "table_name", None) + actual_owner = getattr(row, "table_owner", None) + db_link_val = getattr(row, "db_link", None) + + if actual_name: + table_name = self.denormalize_name(actual_name) + if actual_owner: + schema = self.denormalize_name(actual_owner) + if db_link_val: + if not db_link_val.startswith("@"): + dblink = "@" + db_link_val + else: + dblink = db_link_val + else: + table_name = self.denormalize_name(table_name) + if schema is not None: + schema = self.denormalize_name(schema) + columns = [] char_length_col = "data_length" @@ -395,22 +417,51 @@ def get_indexes_preserve_case( **kw, ): """Override get_indexes to fix two issues when preserveIdentifierCase=True: - 1. Use original table_name (before _prepare_reflection_args uppercases it) + 1. Use original table_name (before denormalize_name uppercases it) so quoted lowercase identifiers are found in ALL_IND_COLUMNS. 2. Access result row columns case-insensitively — Oracle thick mode returns INDEX_NAME (uppercase) while thin mode returns index_name (lowercase). A lowercased dict handles both without branching. """ original_table_name = table_name - info_cache = kw.get("info_cache") - (table_name, schema, dblink, _) = self._prepare_reflection_args( - connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, - ) + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + + # SQLAlchemy 2.0 removed _prepare_reflection_args; denormalize schema/table + # for the pk_constraint lookup while the index query itself uses + # original_table_name (preserve-case mode keeps identifiers as-is). + table_name = self.denormalize_name(table_name) + if schema is not None: + schema = self.denormalize_name(schema) + if dblink and not dblink.startswith("@"): + dblink = "@" + dblink + + if resolve_synonyms: + try: + rows = list( + self._get_synonyms( + connection, + schema, + [table_name], + dblink, + info_cache=kw.get("info_cache"), + ) + ) + except Exception: + rows = [] + if rows: + row = rows[0] + actual_name = getattr(row, "table_name", None) + actual_owner = getattr(row, "table_owner", None) + db_link_val = getattr(row, "db_link", None) + if actual_name: + table_name = self.denormalize_name(actual_name) + if actual_owner: + schema = self.denormalize_name(actual_owner) + if db_link_val: + if not db_link_val.startswith("@"): + dblink = "@" + db_link_val + else: + dblink = db_link_val params = {"table_name": original_table_name} text = ( From 0922db4facec713aadf6b680e964d753ed008e4b Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 15:48:25 -0700 Subject: [PATCH 04/11] chore: fix e2e profiler failures --- .../source/database/oracle/metadata.py | 2 - .../ingestion/source/database/oracle/utils.py | 10 ----- .../source/database/redshift/metadata.py | 6 --- .../source/database/redshift/utils.py | 44 ------------------- .../metadata/sampler/sqlalchemy/sampler.py | 4 +- .../cli_e2e/base/config_builders/builders.py | 35 +++++++++++++++ ingestion/tests/cli_e2e/test_cli_mssql.py | 4 -- 7 files changed, 36 insertions(+), 69 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py index 3146a7956b3a..9926007f1f16 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/metadata.py @@ -62,7 +62,6 @@ get_all_view_definitions, get_columns, get_indexes_preserve_case, - get_multi_columns, get_mview_names, get_mview_names_dialect, get_table_comment, @@ -95,7 +94,6 @@ OracleDialect.get_table_comment = get_table_comment OracleDialect.get_columns = get_columns -OracleDialect.get_multi_columns = get_multi_columns OracleDialect._get_col_type = _get_col_type OracleDialect.get_view_definition = get_view_definition OracleDialect.get_all_view_definitions = get_all_view_definitions diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py index 9a0c2601eb7c..8ae85e214772 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/utils.py @@ -259,16 +259,6 @@ def get_columns(self, connection, table_name, schema=None, **kw): return columns -def get_multi_columns(self, connection, **kw): - """ - Override OracleDialect's get_multi_columns to ensure our custom - get_columns() is used. SQA 2.0 batch reflection bypasses get_columns() - in favor of get_multi_columns(), which would skip our custom column - metadata (system_data_type, identity columns, etc.). - """ - return self._default_multi_reflect(self.get_columns, connection, **kw) - - @reflection.cache def get_table_names(self, connection, schema=None, **kw): """ diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index de4bc09d2a2c..136889c8ba77 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -90,12 +90,9 @@ _get_pg_column_info, _get_schema_column_info, _load_domains, - _pg_class_filter_scope_schema, _redshift_initialize, get_columns, get_multi_columns, - get_multi_pk_constraint, - get_multi_unique_constraints, get_redshift_columns, get_table_comment, get_temp_table_names, @@ -130,11 +127,8 @@ RedshiftDialectMixin._get_schema_column_info = _get_schema_column_info RedshiftDialectMixin.initialize = _redshift_initialize RedshiftDialectMixin._load_domains = _load_domains -RedshiftDialectMixin._pg_class_filter_scope_schema = _pg_class_filter_scope_schema RedshiftDialectMixin.get_columns = get_columns RedshiftDialectMixin.get_multi_columns = get_multi_columns -RedshiftDialectMixin.get_multi_pk_constraint = get_multi_pk_constraint -RedshiftDialectMixin.get_multi_unique_constraints = get_multi_unique_constraints PGDialect._get_column_info = _get_pg_column_info RedshiftDialect.get_all_table_comments = get_all_table_comments RedshiftDialect.get_table_comment = get_table_comment diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py index 13038320bcbd..79fa2a952797 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py @@ -94,50 +94,6 @@ def get_multi_columns(self, connection, **kw): return self._default_multi_reflect(self.get_columns, connection, **kw) -def get_multi_pk_constraint(self, connection, **kw): - """ - Override PGDialect's get_multi_pk_constraint which uses - array_agg(... ORDER BY ...) not supported by Redshift. - Falls back to the default implementation that delegates to - the already-overridden get_pk_constraint() method. - """ - return self._default_multi_reflect(self.get_pk_constraint, connection, **kw) - - -def get_multi_unique_constraints(self, connection, **kw): - """ - Override PGDialect's get_multi_unique_constraints which uses - array_agg(... ORDER BY ...) not supported by Redshift. - Falls back to the default implementation that delegates to - the already-overridden get_unique_constraints() method. - """ - return self._default_multi_reflect(self.get_unique_constraints, connection, **kw) - - -def _pg_class_filter_scope_schema(self, query, schema, scope, pg_class_table=None): - """ - Override PGDialect's _pg_class_filter_scope_schema to skip the - pg_class.relpersistence filter which does not exist in Redshift. - """ - from sqlalchemy.dialects.postgresql import pg_catalog - - if pg_class_table is None: - pg_class_table = pg_catalog.pg_class - query = query.join( - pg_catalog.pg_namespace, - pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, - ) - - if schema is None: - query = query.where( - pg_catalog.pg_table_is_visible(pg_class_table.c.oid), - pg_catalog.pg_namespace.c.nspname != "pg_catalog", - ) - else: - query = query.where(pg_catalog.pg_namespace.c.nspname == schema) - return query - - # pylint: disable=protected-access @calculate_execution_time() def get_columns(self, connection, table_name, schema=None, **kw): diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index f5d9a37fe152..263ff8758532 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -311,9 +311,7 @@ def _partitioned_table(self): We build the CTE inside the session context because SQA 2.0 requires the session to be alive when .cte() is called on a Query. """ - self.partition_details = cast( - PartitionProfilerConfig, self.partition_details - ) + self.partition_details = cast(PartitionProfilerConfig, self.partition_details) partition_filter = build_partition_predicate( self.partition_details, self.raw_dataset.__table__.c, diff --git a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index 2b7b5cf03f10..83afa8b2e3cf 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py +++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -69,7 +69,42 @@ def build(self) -> dict: } self.config["source"]["sourceConfig"]["config"]["includeViews"] = True + # By default, use the default profiler metrics. For certain connectors that support system + # profile (BigQuery, Databricks, Redshift, Snowflake) include the system metric as well self.config["processor"] = {"type": "orm-profiler", "config": {}} + + connector = str(self.config.get("source", {}).get("type", "")).lower() + connectors_with_system = {"bigquery", "databricks", "redshift", "snowflake"} + if connector in connectors_with_system: + # Default metrics used by DefaultProfiler + the system metric + default_metric_names = [ + "rowCount", + "columnCount", + "columnNames", + "median", + "firstQuartile", + "thirdQuartile", + "mean", + "valuesCount", + "distinctCount", + "distinctProportion", + "min", + "max", + "nullCount", + "nullProportion", + "stddev", + "sum", + "uniqueCount", + "uniqueProportion", + "interQuartileRange", + "nonParametricSkew", + "system", + ] + self.config["processor"]["config"]["profiler"] = { + "name": "default_profiler", + "metrics": default_metric_names, + } + return self.config diff --git a/ingestion/tests/cli_e2e/test_cli_mssql.py b/ingestion/tests/cli_e2e/test_cli_mssql.py index 3d9ceeabfa67..1c6aef13049b 100644 --- a/ingestion/tests/cli_e2e/test_cli_mssql.py +++ b/ingestion/tests/cli_e2e/test_cli_mssql.py @@ -150,10 +150,6 @@ def get_profiler_time_partition_results() -> dict: "distinctProportion": 1.0, "duplicateCount": None, "firstQuartile": 2.5, - "histogram": { - "boundaries": ["1.000 to 3.773", "3.773 and up"], - "frequencies": [1, 2], - }, "interQuartileRange": 2.0, "max": 5.0, "maxLength": None, From 8ee6014b508552b3e91b51a85767054eaded4ecb Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 16:32:45 -0700 Subject: [PATCH 05/11] chore: fix return core statement --- .../src/metadata/sampler/sqlalchemy/sampler.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index 263ff8758532..b1dc77361326 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -15,7 +15,7 @@ import hashlib from typing import List, Optional, Union, cast -from sqlalchemy import Column, inspect, text +from sqlalchemy import Column, inspect, text, select from sqlalchemy.orm import Query from sqlalchemy.orm.util import AliasedClass from sqlalchemy.schema import Table @@ -308,20 +308,15 @@ def _rdn_sample_from_user_query(self) -> Query: def _partitioned_table(self): """Return a CTE for partitioned tables. - We build the CTE inside the session context because SQA 2.0 - requires the session to be alive when .cte() is called on a Query. + Build the CTE using Core select() so it does not require an active Session. """ self.partition_details = cast(PartitionProfilerConfig, self.partition_details) partition_filter = build_partition_predicate( self.partition_details, self.raw_dataset.__table__.c, ) - with self.session_factory() as client: - return ( - client.query(self.raw_dataset) - .filter(partition_filter) - .cte(f"{self.get_sampler_table_name()}_partitioned") - ) + stmt = select(self.raw_dataset).where(partition_filter) + return stmt.cte(f"{self.get_sampler_table_name()}_partitioned") def get_partitioned_query(self, query=None) -> Query: """Return the partitioned query""" @@ -335,8 +330,8 @@ def get_partitioned_query(self, query=None) -> Query: if query is not None: return query.filter(partition_filter) - with self.session_factory() as client: - return client.query(self.raw_dataset).filter(partition_filter) + # Return a Core select so callers do not require an active Session + return select(self.raw_dataset).where(partition_filter) def get_columns(self): """get columns from entity""" From ac87175aad5cba16188ca77f8efa79b97e5da056 Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 16:33:29 -0700 Subject: [PATCH 06/11] chore: ran python linting --- ingestion/src/metadata/sampler/sqlalchemy/sampler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index b1dc77361326..20e3825025f0 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -15,7 +15,7 @@ import hashlib from typing import List, Optional, Union, cast -from sqlalchemy import Column, inspect, text, select +from sqlalchemy import Column, inspect, select, text from sqlalchemy.orm import Query from sqlalchemy.orm.util import AliasedClass from sqlalchemy.schema import Table From a0d7b548b343e64185f40685f2fd0d8552084bb4 Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 17:43:15 -0700 Subject: [PATCH 07/11] chore: adjust expectation --- ingestion/tests/cli_e2e/test_cli_datalake_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/tests/cli_e2e/test_cli_datalake_s3.py b/ingestion/tests/cli_e2e/test_cli_datalake_s3.py index 2617f4e6e6f2..5fb9ce4b8084 100644 --- a/ingestion/tests/cli_e2e/test_cli_datalake_s3.py +++ b/ingestion/tests/cli_e2e/test_cli_datalake_s3.py @@ -59,7 +59,7 @@ def expected_tables() -> int: @staticmethod def expected_profiled_tables() -> int: - return 6 + return 5 def expected_sample_size(self) -> int: return 50 From f2df9a661408a139c7d821951b187112e7f68a65 Mon Sep 17 00:00:00 2001 From: TeddyCr Date: Wed, 11 Mar 2026 21:39:10 -0700 Subject: [PATCH 08/11] chore: remove skip empty columns --- ingestion/src/metadata/profiler/processor/processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py index 2903db6710b5..48fa97acee63 100644 --- a/ingestion/src/metadata/profiler/processor/processor.py +++ b/ingestion/src/metadata/profiler/processor/processor.py @@ -75,10 +75,9 @@ def name(self) -> str: def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]: if not record.entity.columns: logger.warning( - "Skipping profiler for table '%s': no columns found", + "Table '%s' has no columns — continuing to run profiler for table-level metrics", record.entity.fullyQualifiedName.root, ) - return Either() profiler_runner: Profiler = record.profiler_source.get_profiler_runner( record.entity, self.profiler_config From 1ec778ac2f5a90338d5a5ff46994046a511d3ca6 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Thu, 12 Mar 2026 12:12:56 +0530 Subject: [PATCH 09/11] paused snowflake & bigquery multi project fixes --- .../source/database/bigquery/metadata.py | 39 +++++++++++++++++++ ingestion/tests/cli_e2e/test_cli_snowflake.py | 2 + 2 files changed, 41 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 0923fbd4cff4..f4bd779a4813 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -67,6 +67,7 @@ from metadata.ingestion.api.delete import delete_entity_by_name from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_test_connection_fn @@ -448,6 +449,44 @@ def get_dataset_obj(self, schema_name: str): ) return self._current_dataset_obj + def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]: + """ + Override to skip lifecycle data for schemas whose dataset location does not + match the configured usageLocation. + + BigQuery routes INFORMATION_SCHEMA queries to the location specified in the + connection (usageLocation). When a dataset lives in a different GCP region, + the query returns a 404. Skipping early avoids one failed API call per table + in the affected schema. + """ + usage_location = getattr(self.service_connection, "usageLocation", None) + if usage_location: + schema_name = self.context.get().database_schema + try: + dataset_obj = self.get_dataset_obj(schema_name) + dataset_location = getattr(dataset_obj, "location", None) + if ( + dataset_location + and dataset_location.upper() != usage_location.upper() + ): + logger.debug( + "Skipping lifecycle data for schema '%s': dataset location '%s' " + "differs from configured usageLocation '%s'. " + "BigQuery INFORMATION_SCHEMA queries are location-specific.", + schema_name, + dataset_location, + usage_location, + ) + return + except Exception as exc: + logger.debug( + "Could not verify dataset location for schema '%s', " + "proceeding with lifecycle query: %s", + schema_name, + exc, + ) + yield from super().yield_life_cycle_data(_) + def _prefetch_policy_tags(self): """Pre-fetch all policy tags at schema level to avoid per-column API calls""" if not self.service_connection.includePolicyTags: diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index cfead0856abb..f05eb2c6a0a9 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -31,6 +31,8 @@ from .common_e2e_sqa_mixins import SQACommonMethods +# TODO: Paused due to credential issue - re-enable once credentials are restored +@pytest.mark.skip(reason="TODO: Paused due to credential issue") class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): """ Snowflake CLI Tests From 4312d722ec43f9af28f68aee6381b004de849181 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Thu, 12 Mar 2026 13:36:55 +0530 Subject: [PATCH 10/11] fix quicksight count --- ingestion/tests/cli_e2e/test_cli_quicksight.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/tests/cli_e2e/test_cli_quicksight.py b/ingestion/tests/cli_e2e/test_cli_quicksight.py index c8a12fa8154d..2a9470cd9252 100644 --- a/ingestion/tests/cli_e2e/test_cli_quicksight.py +++ b/ingestion/tests/cli_e2e/test_cli_quicksight.py @@ -48,7 +48,7 @@ def expected_not_included_entities(self) -> int: return 6 def expected_not_included_sink_entities(self) -> int: - return 7 + return 3 def expected_filtered_mix(self) -> int: return 2 @@ -75,7 +75,7 @@ def expected_datamodels(self) -> int: return 0 def expected_dashboards_and_charts_after_patch(self) -> int: - return 7 + return 3 @pytest.mark.order(11) def test_lineage(self) -> None: From a7d1caacb8af707ca4281ab74f7f153b4750a232 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Thu, 12 Mar 2026 15:02:06 +0530 Subject: [PATCH 11/11] ignore furture warnings --- ingestion/tests/cli_e2e/base/test_cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestion/tests/cli_e2e/base/test_cli.py b/ingestion/tests/cli_e2e/base/test_cli.py index 0e3a2df84783..2a5f5877c8a5 100644 --- a/ingestion/tests/cli_e2e/base/test_cli.py +++ b/ingestion/tests/cli_e2e/base/test_cli.py @@ -58,7 +58,9 @@ def run_command(self, command: str = "ingest", test_file_path=None) -> str: "-c", file_path, ] - process_status = subprocess.Popen(args, stderr=subprocess.PIPE) + env = os.environ.copy() + env["PYTHONWARNINGS"] = "ignore::FutureWarning" + process_status = subprocess.Popen(args, stderr=subprocess.PIPE, env=env) _, stderr = process_status.communicate() if process_status.returncode != 0: print(stderr.decode("utf-8"))