From cedc2ac7cda7fe73e4d6044dc473b1eae7d537e9 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Tue, 10 Mar 2026 13:50:35 +0530 Subject: [PATCH 1/2] remove text from sql queries --- ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py b/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py index 390fc8459f3e..5d60422c8896 100644 --- a/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py +++ b/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py @@ -12,28 +12,27 @@ """ Add Common E2E Sqlalchemy Mixins """ -from sqlalchemy import text class SQACommonMethods: def create_table_and_view(self) -> None: with self.engine.begin() as connection: - connection.execute(text(self.create_table_query)) + connection.exec_driver_sql(self.create_table_query) for insert_query in self.insert_data_queries: - connection.execute(text(insert_query)) - connection.execute(text(self.create_view_query)) + connection.exec_driver_sql(insert_query) + connection.exec_driver_sql(self.create_view_query) def delete_table_and_view(self) -> None: with self.engine.begin() as connection: - connection.execute(text(self.drop_view_query)) - connection.execute(text(self.drop_table_query)) + connection.exec_driver_sql(self.drop_view_query) + connection.exec_driver_sql(self.drop_table_query) def run_update_queries(self) -> None: with self.engine.begin() as connection: for update_query in self.update_queries(): - connection.execute(text(update_query)) + connection.exec_driver_sql(update_query) def run_delete_queries(self) -> None: with self.engine.begin() as connection: for drop_query in self.delete_queries(): - connection.execute(text(drop_query)) + connection.exec_driver_sql(drop_query) From 5a008ca0638c76aebb421edce000b561de8eb2c0 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Tue, 10 Mar 2026 14:13:59 +0530 Subject: [PATCH 2/2] added ddl fixes --- .../ingestion/source/database/redshift/metadata.py | 2 ++ .../ingestion/source/database/redshift/utils.py | 10 ++++++++++ ingestion/src/metadata/utils/sqlalchemy_utils.py | 9 +++++++++ 3 files changed, 21 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 69b9786e6b83..136889c8ba77 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -95,6 +95,7 @@ get_multi_columns, get_redshift_columns, get_table_comment, + get_temp_table_names, get_view_definition, ) from metadata.utils import fqn @@ -132,6 +133,7 @@ RedshiftDialect.get_all_table_comments = get_all_table_comments RedshiftDialect.get_table_comment = get_table_comment RedshiftDialect.get_view_definition = get_view_definition +RedshiftDialect.get_temp_table_names = get_temp_table_names RedshiftDialect._get_redshift_columns = get_redshift_columns RedshiftDialect._get_all_relation_info = ( # pylint: disable=protected-access _get_all_relation_info diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py index 32e1551f842b..79fa2a952797 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/utils.py @@ -74,6 +74,16 @@ def _load_domains(self, connection, **kw): return {} +def get_temp_table_names(self, connection, schema=None, **kw): + """ + Override PGDialect's get_temp_table_names to avoid querying + pg_catalog.pg_class.relpersistence which does not exist in Redshift, + causing a ProgrammingError that aborts the transaction and breaks all + subsequent queries. + """ + return [] + + def get_multi_columns(self, connection, **kw): """ Override PGDialect's get_multi_columns to avoid querying diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index 1b344282d2e9..e8fdc2a22f57 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -18,6 +18,7 @@ from sqlalchemy import text from sqlalchemy.engine import Engine, reflection +from sqlalchemy.exc import ProgrammingError from sqlalchemy.schema import CreateTable, MetaData from metadata.utils.logger import ingestion_logger @@ -158,6 +159,14 @@ def get_all_table_ddls( except Exception as exc: logger.debug(traceback.format_exc()) logger.debug(f"Failed to get table ddls for {schema_name}: {exc}") + # Roll back the aborted transaction so the connection remains usable + # for subsequent queries (e.g. get_table_comment). Without this, + # psycopg2 raises InFailedSqlTransaction on every query that follows. + if isinstance(exc, ProgrammingError): + try: + connection.rollback() + except Exception: + pass def get_table_ddl_wrapper(