Skip to content

Commit 4020c23

Browse files
IceS2TeddyCrSumanMaharana
authored
fix: SQLAlchemy 2.0 e2e fixes for Redshift, Oracle, Athena, MSSQL profiler (#26411)
* fix: SQLAlchemy 2.0 e2e fixes for Redshift, Athena, and profiler - Redshift: override _pg_class_filter_scope_schema, get_multi_pk_constraint, and get_multi_unique_constraints to avoid unsupported pg_class.relpersistence and array_agg(... ORDER BY ...) queries - Athena: replace private _set_parent() with public append_column(replace_existing=True) - Profiler: gracefully skip tables with no columns instead of raising ValueError - sqlalchemy_utils: rollback on all DDL reflection failures, not just ProgrammingError * fix: Oracle get_multi_columns, sampler None-handling, partitioned CTE session - Oracle: add get_multi_columns override so SQA 2.0 batch reflection uses our custom get_columns() instead of the dialect's native implementation that returns empty column lists - Sampler processor: skip tables with no columns early (same as profiler) - Sampler: build partitioned table CTE inside session context; SQA 2.0 requires the session to be alive when .cte() is called on a Query * chore: migrate to sqa 2.0 * chore: fix e2e profiler failures * chore: fix return core statement * chore: ran python linting * chore: adjust expectation * chore: remove skip empty columns * paused snowflake & bigquery multi project fixes * fix quicksight count * ignore furture warnings --------- Co-authored-by: TeddyCr <teddy.crepineau@gmail.com> Co-authored-by: SumanMaharana <sumanmaharana786@gmail.com>
1 parent da40038 commit 4020c23

14 files changed

Lines changed: 200 additions & 46 deletions

File tree

ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from metadata.ingestion.api.delete import delete_entity_by_name
6868
from metadata.ingestion.api.models import Either
6969
from metadata.ingestion.api.steps import InvalidSourceException
70+
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
7071
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
7172
from metadata.ingestion.ometa.ometa_api import OpenMetadata
7273
from metadata.ingestion.source.connections import get_test_connection_fn
@@ -448,6 +449,44 @@ def get_dataset_obj(self, schema_name: str):
448449
)
449450
return self._current_dataset_obj
450451

452+
def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
453+
"""
454+
Override to skip lifecycle data for schemas whose dataset location does not
455+
match the configured usageLocation.
456+
457+
BigQuery routes INFORMATION_SCHEMA queries to the location specified in the
458+
connection (usageLocation). When a dataset lives in a different GCP region,
459+
the query returns a 404. Skipping early avoids one failed API call per table
460+
in the affected schema.
461+
"""
462+
usage_location = getattr(self.service_connection, "usageLocation", None)
463+
if usage_location:
464+
schema_name = self.context.get().database_schema
465+
try:
466+
dataset_obj = self.get_dataset_obj(schema_name)
467+
dataset_location = getattr(dataset_obj, "location", None)
468+
if (
469+
dataset_location
470+
and dataset_location.upper() != usage_location.upper()
471+
):
472+
logger.debug(
473+
"Skipping lifecycle data for schema '%s': dataset location '%s' "
474+
"differs from configured usageLocation '%s'. "
475+
"BigQuery INFORMATION_SCHEMA queries are location-specific.",
476+
schema_name,
477+
dataset_location,
478+
usage_location,
479+
)
480+
return
481+
except Exception as exc:
482+
logger.debug(
483+
"Could not verify dataset location for schema '%s', "
484+
"proceeding with lifecycle query: %s",
485+
schema_name,
486+
exc,
487+
)
488+
yield from super().yield_life_cycle_data(_)
489+
451490
def _prefetch_policy_tags(self):
452491
"""Pre-fetch all policy tags at schema level to avoid per-column API calls"""
453492
if not self.service_connection.includePolicyTags:

ingestion/src/metadata/ingestion/source/database/oracle/utils.py

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,40 @@ def get_columns(self, connection, table_name, schema=None, **kw):
155155
dblink
156156
157157
"""
158-
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
159158
dblink = kw.get("dblink", "")
159+
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
160160
info_cache = kw.get("info_cache")
161161

162-
(table_name, schema, dblink, _) = self._prepare_reflection_args(
163-
connection,
164-
table_name,
165-
schema,
166-
resolve_synonyms,
167-
dblink,
168-
info_cache=info_cache,
169-
)
162+
if resolve_synonyms:
163+
try:
164+
rows = list(
165+
self._get_synonyms(
166+
connection, schema, [table_name], dblink, info_cache=info_cache
167+
)
168+
)
169+
except Exception:
170+
rows = []
171+
172+
if rows:
173+
row = rows[0]
174+
actual_name = getattr(row, "table_name", None)
175+
actual_owner = getattr(row, "table_owner", None)
176+
db_link_val = getattr(row, "db_link", None)
177+
178+
if actual_name:
179+
table_name = self.denormalize_name(actual_name)
180+
if actual_owner:
181+
schema = self.denormalize_name(actual_owner)
182+
if db_link_val:
183+
if not db_link_val.startswith("@"):
184+
dblink = "@" + db_link_val
185+
else:
186+
dblink = db_link_val
187+
else:
188+
table_name = self.denormalize_name(table_name)
189+
if schema is not None:
190+
schema = self.denormalize_name(schema)
191+
170192
columns = []
171193

172194
char_length_col = "data_length"
@@ -395,22 +417,51 @@ def get_indexes_preserve_case(
395417
**kw,
396418
):
397419
"""Override get_indexes to fix two issues when preserveIdentifierCase=True:
398-
1. Use original table_name (before _prepare_reflection_args uppercases it)
420+
1. Use original table_name (before denormalize_name uppercases it)
399421
so quoted lowercase identifiers are found in ALL_IND_COLUMNS.
400422
2. Access result row columns case-insensitively — Oracle thick mode returns
401423
INDEX_NAME (uppercase) while thin mode returns index_name (lowercase).
402424
A lowercased dict handles both without branching.
403425
"""
404426
original_table_name = table_name
405-
info_cache = kw.get("info_cache")
406-
(table_name, schema, dblink, _) = self._prepare_reflection_args(
407-
connection,
408-
table_name,
409-
schema,
410-
resolve_synonyms,
411-
dblink,
412-
info_cache=info_cache,
413-
)
427+
resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
428+
429+
# SQLAlchemy 2.0 removed _prepare_reflection_args; denormalize schema/table
430+
# for the pk_constraint lookup while the index query itself uses
431+
# original_table_name (preserve-case mode keeps identifiers as-is).
432+
table_name = self.denormalize_name(table_name)
433+
if schema is not None:
434+
schema = self.denormalize_name(schema)
435+
if dblink and not dblink.startswith("@"):
436+
dblink = "@" + dblink
437+
438+
if resolve_synonyms:
439+
try:
440+
rows = list(
441+
self._get_synonyms(
442+
connection,
443+
schema,
444+
[table_name],
445+
dblink,
446+
info_cache=kw.get("info_cache"),
447+
)
448+
)
449+
except Exception:
450+
rows = []
451+
if rows:
452+
row = rows[0]
453+
actual_name = getattr(row, "table_name", None)
454+
actual_owner = getattr(row, "table_owner", None)
455+
db_link_val = getattr(row, "db_link", None)
456+
if actual_name:
457+
table_name = self.denormalize_name(actual_name)
458+
if actual_owner:
459+
schema = self.denormalize_name(actual_owner)
460+
if db_link_val:
461+
if not db_link_val.startswith("@"):
462+
dblink = "@" + db_link_val
463+
else:
464+
dblink = db_link_val
414465

415466
params = {"table_name": original_table_name}
416467
text = (

ingestion/src/metadata/profiler/interface/sqlalchemy/athena/profiler_interface.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,7 @@ def _get_struct_columns(
9191
table_service_type=DatabaseServiceType.Athena,
9292
_quote=False,
9393
)
94-
sqa_col._set_parent( # pylint: disable=protected-access
95-
self.table.__table__
96-
)
94+
self.table.__table__.append_column(sqa_col, replace_existing=True)
9795
columns_list.append(sqa_col)
9896
else:
9997
cols = self._get_struct_columns(
@@ -112,8 +110,6 @@ def get_columns(self) -> List[Column]:
112110
)
113111
else:
114112
col = build_orm_col(idx, column_obj, DatabaseServiceType.Athena)
115-
col._set_parent( # pylint: disable=protected-access
116-
self.table.__table__
117-
)
113+
self.table.__table__.append_column(col, replace_existing=True)
118114
columns.append(col)
119115
return columns

ingestion/src/metadata/profiler/orm/converter/base.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
from metadata.generated.schema.entity.data.table import Column, Table
2525
from metadata.ingestion.ometa.ometa_api import OpenMetadata
2626
from metadata.profiler.orm.converter.converter_registry import converter_registry
27+
from metadata.utils.logger import profiler_logger
28+
29+
logger = profiler_logger()
2730

2831

2932
class Base(DeclarativeBase):
@@ -102,7 +105,7 @@ def build_orm_col(
102105

103106
def ometa_to_sqa_orm(
104107
table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None
105-
) -> type:
108+
) -> Optional[type]:
106109
"""
107110
Given an OpenMetadata instance, prepare
108111
the SQLAlchemy ORM class
@@ -126,11 +129,11 @@ def ometa_to_sqa_orm(
126129
# SQA 2.x raises a hard error if no primary key columns are found (was just a warning in 1.x).
127130
# Since build_orm_col assigns PK to the first column, we need at least one column.
128131
if not table.columns:
129-
raise ValueError(
130-
f"Table '{table.name.root}' has no columns. "
131-
"Cannot create ORM class without at least one column. "
132-
"Ensure the table's column metadata was ingested correctly."
132+
logger.warning(
133+
"Table '%s' has no columns. Skipping ORM class creation.",
134+
table.name.root,
133135
)
136+
return None
134137

135138
orm_database_name = get_orm_database(table, metadata)
136139
# SQLite does not support schemas

ingestion/src/metadata/profiler/processor/processor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
Inject,
3737
inject,
3838
)
39+
from metadata.utils.logger import profiler_logger
40+
41+
logger = profiler_logger()
3942

4043

4144
class ProfilerProcessor(Processor):
@@ -70,6 +73,12 @@ def name(self) -> str:
7073
return "Profiler"
7174

7275
def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]:
76+
if not record.entity.columns:
77+
logger.warning(
78+
"Table '%s' has no columns — continuing to run profiler for table-level metrics",
79+
record.entity.fullyQualifiedName.root,
80+
)
81+
7382
profiler_runner: Profiler = record.profiler_source.get_profiler_runner(
7483
record.entity, self.profiler_config
7584
)

ingestion/src/metadata/sampler/processor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,12 @@
4747
Inject,
4848
inject,
4949
)
50+
from metadata.utils.logger import profiler_logger
5051
from metadata.utils.profiler_utils import get_context_entities
5152
from metadata.utils.service_spec.service_spec import import_sampler_class
5253

54+
logger = profiler_logger()
55+
5356

5457
class SamplerProcessor(Processor):
5558
"""Use the profiler interface to fetch the sample data"""
@@ -91,6 +94,12 @@ def name(self) -> str:
9194

9295
def _run(self, record: ProfilerSourceAndEntity) -> Either[SamplerResponse]:
9396
"""Fetch the sample data and pass it down the pipeline"""
97+
if not record.entity.columns:
98+
logger.warning(
99+
"Skipping sampler for table '%s': no columns found",
100+
record.entity.fullyQualifiedName.root,
101+
)
102+
return Either()
94103

95104
try:
96105
entity = cast(Table, record.entity)

ingestion/src/metadata/sampler/sqlalchemy/sampler.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import hashlib
1616
from typing import List, Optional, Union, cast
1717

18-
from sqlalchemy import Column, inspect, text
18+
from sqlalchemy import Column, inspect, select, text
1919
from sqlalchemy.orm import Query
2020
from sqlalchemy.orm.util import AliasedClass
2121
from sqlalchemy.schema import Table
@@ -199,8 +199,7 @@ def get_dataset(self, column=None, **__) -> Union[type, AliasedClass]:
199199
and self.sample_config.profileSample == 100
200200
):
201201
if self.partition_details:
202-
partitioned = self._partitioned_table()
203-
return partitioned.cte(f"{self.get_sampler_table_name()}_partitioned")
202+
return self._partitioned_table()
204203

205204
return self.raw_dataset
206205

@@ -306,9 +305,18 @@ def _rdn_sample_from_user_query(self) -> Query:
306305
f"{self.get_sampler_table_name()}_user_sampled"
307306
)
308307

309-
def _partitioned_table(self) -> Query:
310-
"""Return the Query object for partitioned tables"""
311-
return self.get_partitioned_query()
308+
def _partitioned_table(self):
309+
"""Return a CTE for partitioned tables.
310+
311+
Build the CTE using Core select() so it does not require an active Session.
312+
"""
313+
self.partition_details = cast(PartitionProfilerConfig, self.partition_details)
314+
partition_filter = build_partition_predicate(
315+
self.partition_details,
316+
self.raw_dataset.__table__.c,
317+
)
318+
stmt = select(self.raw_dataset).where(partition_filter)
319+
return stmt.cte(f"{self.get_sampler_table_name()}_partitioned")
312320

313321
def get_partitioned_query(self, query=None) -> Query:
314322
"""Return the partitioned query"""
@@ -322,8 +330,8 @@ def get_partitioned_query(self, query=None) -> Query:
322330
if query is not None:
323331
return query.filter(partition_filter)
324332

325-
with self.session_factory() as client:
326-
return client.query(self.raw_dataset).filter(partition_filter)
333+
# Return a Core select so callers do not require an active Session
334+
return select(self.raw_dataset).where(partition_filter)
327335

328336
def get_columns(self):
329337
"""get columns from entity"""

ingestion/src/metadata/utils/sqlalchemy_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ def get_all_table_ddls(
167167
connection.rollback()
168168
except Exception:
169169
pass
170+
try:
171+
connection.rollback()
172+
except Exception:
173+
pass
170174

171175

172176
def get_table_ddl_wrapper(

ingestion/tests/cli_e2e/base/config_builders/builders.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,42 @@ def build(self) -> dict:
6969
}
7070
self.config["source"]["sourceConfig"]["config"]["includeViews"] = True
7171

72+
# By default, use the default profiler metrics. For certain connectors that support system
73+
# profile (BigQuery, Databricks, Redshift, Snowflake) include the system metric as well
7274
self.config["processor"] = {"type": "orm-profiler", "config": {}}
75+
76+
connector = str(self.config.get("source", {}).get("type", "")).lower()
77+
connectors_with_system = {"bigquery", "databricks", "redshift", "snowflake"}
78+
if connector in connectors_with_system:
79+
# Default metrics used by DefaultProfiler + the system metric
80+
default_metric_names = [
81+
"rowCount",
82+
"columnCount",
83+
"columnNames",
84+
"median",
85+
"firstQuartile",
86+
"thirdQuartile",
87+
"mean",
88+
"valuesCount",
89+
"distinctCount",
90+
"distinctProportion",
91+
"min",
92+
"max",
93+
"nullCount",
94+
"nullProportion",
95+
"stddev",
96+
"sum",
97+
"uniqueCount",
98+
"uniqueProportion",
99+
"interQuartileRange",
100+
"nonParametricSkew",
101+
"system",
102+
]
103+
self.config["processor"]["config"]["profiler"] = {
104+
"name": "default_profiler",
105+
"metrics": default_metric_names,
106+
}
107+
73108
return self.config
74109

75110

ingestion/tests/cli_e2e/base/test_cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ def run_command(self, command: str = "ingest", test_file_path=None) -> str:
5858
"-c",
5959
file_path,
6060
]
61-
process_status = subprocess.Popen(args, stderr=subprocess.PIPE)
61+
env = os.environ.copy()
62+
env["PYTHONWARNINGS"] = "ignore::FutureWarning"
63+
process_status = subprocess.Popen(args, stderr=subprocess.PIPE, env=env)
6264
_, stderr = process_status.communicate()
6365
if process_status.returncode != 0:
6466
print(stderr.decode("utf-8"))

0 commit comments

Comments
 (0)