Skip to content

Commit a0a228f

Browse files
postgres & redshift ddl and e2e fixes (#26367)
* remove text from sql queries * added ddl fixes
1 parent 38315d5 commit a0a228f

4 files changed

Lines changed: 28 additions & 8 deletions

File tree

ingestion/src/metadata/ingestion/source/database/redshift/metadata.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
get_multi_columns,
9696
get_redshift_columns,
9797
get_table_comment,
98+
get_temp_table_names,
9899
get_view_definition,
99100
)
100101
from metadata.utils import fqn
@@ -132,6 +133,7 @@
132133
RedshiftDialect.get_all_table_comments = get_all_table_comments
133134
RedshiftDialect.get_table_comment = get_table_comment
134135
RedshiftDialect.get_view_definition = get_view_definition
136+
RedshiftDialect.get_temp_table_names = get_temp_table_names
135137
RedshiftDialect._get_redshift_columns = get_redshift_columns
136138
RedshiftDialect._get_all_relation_info = ( # pylint: disable=protected-access
137139
_get_all_relation_info

ingestion/src/metadata/ingestion/source/database/redshift/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ def _load_domains(self, connection, **kw):
7474
return {}
7575

7676

77+
def get_temp_table_names(self, connection, schema=None, **kw):
78+
"""
79+
Override PGDialect's get_temp_table_names to avoid querying
80+
pg_catalog.pg_class.relpersistence which does not exist in Redshift,
81+
causing a ProgrammingError that aborts the transaction and breaks all
82+
subsequent queries.
83+
"""
84+
return []
85+
86+
7787
def get_multi_columns(self, connection, **kw):
7888
"""
7989
Override PGDialect's get_multi_columns to avoid querying

ingestion/src/metadata/utils/sqlalchemy_utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from sqlalchemy import text
2020
from sqlalchemy.engine import Engine, reflection
21+
from sqlalchemy.exc import ProgrammingError
2122
from sqlalchemy.schema import CreateTable, MetaData
2223

2324
from metadata.utils.logger import ingestion_logger
@@ -158,6 +159,14 @@ def get_all_table_ddls(
158159
except Exception as exc:
159160
logger.debug(traceback.format_exc())
160161
logger.debug(f"Failed to get table ddls for {schema_name}: {exc}")
162+
# Roll back the aborted transaction so the connection remains usable
163+
# for subsequent queries (e.g. get_table_comment). Without this,
164+
# psycopg2 raises InFailedSqlTransaction on every query that follows.
165+
if isinstance(exc, ProgrammingError):
166+
try:
167+
connection.rollback()
168+
except Exception:
169+
pass
161170

162171

163172
def get_table_ddl_wrapper(

ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,27 @@
1212
"""
1313
Add Common E2E Sqlalchemy Mixins
1414
"""
15-
from sqlalchemy import text
1615

1716

1817
class SQACommonMethods:
1918
def create_table_and_view(self) -> None:
2019
with self.engine.begin() as connection:
21-
connection.execute(text(self.create_table_query))
20+
connection.exec_driver_sql(self.create_table_query)
2221
for insert_query in self.insert_data_queries:
23-
connection.execute(text(insert_query))
24-
connection.execute(text(self.create_view_query))
22+
connection.exec_driver_sql(insert_query)
23+
connection.exec_driver_sql(self.create_view_query)
2524

2625
def delete_table_and_view(self) -> None:
2726
with self.engine.begin() as connection:
28-
connection.execute(text(self.drop_view_query))
29-
connection.execute(text(self.drop_table_query))
27+
connection.exec_driver_sql(self.drop_view_query)
28+
connection.exec_driver_sql(self.drop_table_query)
3029

3130
def run_update_queries(self) -> None:
3231
with self.engine.begin() as connection:
3332
for update_query in self.update_queries():
34-
connection.execute(text(update_query))
33+
connection.exec_driver_sql(update_query)
3534

3635
def run_delete_queries(self) -> None:
3736
with self.engine.begin() as connection:
3837
for drop_query in self.delete_queries():
39-
connection.execute(text(drop_query))
38+
connection.exec_driver_sql(drop_query)

0 commit comments

Comments
 (0)