Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
get_multi_columns,
get_redshift_columns,
get_table_comment,
get_temp_table_names,
get_view_definition,
)
from metadata.utils import fqn
Expand Down Expand Up @@ -132,6 +133,7 @@
RedshiftDialect.get_all_table_comments = get_all_table_comments
RedshiftDialect.get_table_comment = get_table_comment
RedshiftDialect.get_view_definition = get_view_definition
RedshiftDialect.get_temp_table_names = get_temp_table_names
RedshiftDialect._get_redshift_columns = get_redshift_columns
RedshiftDialect._get_all_relation_info = ( # pylint: disable=protected-access
_get_all_relation_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
self._has_native_hstore = False


def _load_domains(self, connection, **kw):

Check warning on line 68 in ingestion/src/metadata/ingestion/source/database/redshift/utils.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Remove the unused function parameter "kw".

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZzXTKIykz7ky6K7M_aT&open=AZzXTKIykz7ky6K7M_aT&pullRequest=26367

Check warning on line 68 in ingestion/src/metadata/ingestion/source/database/redshift/utils.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Remove the unused function parameter "connection".

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZzXTKIykz7ky6K7M_aS&open=AZzXTKIykz7ky6K7M_aS&pullRequest=26367
"""
Override to return empty dict since Redshift does not support user-created
domains and pg_catalog.pg_collation does not exist in Redshift, causing a
Expand All @@ -74,6 +74,16 @@
return {}


def get_temp_table_names(
    self, connection, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Override PGDialect's get_temp_table_names to avoid querying
    pg_catalog.pg_class.relpersistence which does not exist in Redshift,
    causing a ProgrammingError that aborts the transaction and breaks all
    subsequent queries.

    The ``connection``, ``schema`` and ``kw`` parameters are intentionally
    unused. They are kept so this function stays call-compatible with the
    dialect method it replaces on ``RedshiftDialect``; removing them (as
    static analysis suggests) would break callers that pass them.

    Returns:
        An empty list, always: no temporary table names are reported.
    """
    return []


def get_multi_columns(self, connection, **kw):
"""
Override PGDialect's get_multi_columns to avoid querying
Expand Down
9 changes: 9 additions & 0 deletions ingestion/src/metadata/utils/sqlalchemy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from sqlalchemy import text
from sqlalchemy.engine import Engine, reflection
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.schema import CreateTable, MetaData

from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -158,6 +159,14 @@ def get_all_table_ddls(
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Failed to get table ddls for {schema_name}: {exc}")
# Roll back the aborted transaction so the connection remains usable
# for subsequent queries (e.g. get_table_comment). Without this,
# psycopg2 raises InFailedSqlTransaction on every query that follows.
if isinstance(exc, ProgrammingError):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Edge Case: Rollback only handles ProgrammingError, but other DB errors also abort the transaction

The rollback guard at line 165 only triggers for ProgrammingError, but PostgreSQL/Redshift can abort a transaction on other exception types too (e.g., OperationalError, DatabaseError). Any unhandled aborted transaction will cause the same InFailedSqlTransaction problem for subsequent queries. Since this is inside a broad except Exception block, it would be safer to attempt rollback for any DBAPIError (the base class for all DBAPI-related SQLAlchemy exceptions).

Suggested fix:

from sqlalchemy.exc import DBAPIError
...
        if isinstance(exc, DBAPIError):
            try:
                connection.rollback()
            except Exception:
                pass

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

try:
connection.rollback()
except Exception:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Quality: Silent pass on rollback failure hides connection issues

The bare except Exception: pass around connection.rollback() silently swallows all errors. If the rollback itself fails, it likely means the connection is broken, which would be useful to log for debugging production issues.

Suggested fix:

try:
    connection.rollback()
except Exception as rollback_exc:
    logger.debug(f"Failed to rollback after DDL error: {rollback_exc}")

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

pass


def get_table_ddl_wrapper(
Expand Down
15 changes: 7 additions & 8 deletions ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,27 @@
"""
Add Common E2E Sqlalchemy Mixins
"""
from sqlalchemy import text


class SQACommonMethods:
    """
    Shared SQL helpers for the SQLAlchemy-based CLI E2E tests.

    This is a mixin: the host class is expected to provide

    * ``engine`` - an object whose ``begin()`` returns a context manager
      yielding a connection that exposes ``exec_driver_sql``
    * ``create_table_query``, ``create_view_query``, ``drop_view_query``,
      ``drop_table_query`` - SQL strings
    * ``insert_data_queries`` - an iterable of SQL strings
    * ``update_queries()`` / ``delete_queries()`` - methods returning
      iterables of SQL strings

    Every statement is sent through ``exec_driver_sql`` exactly once, so the
    raw SQL string is handed to the driver instead of being wrapped in
    ``text()`` first.
    """

    def create_table_and_view(self) -> None:
        """Create the test table, load its rows, then create the view."""
        with self.engine.begin() as connection:
            connection.exec_driver_sql(self.create_table_query)
            for insert_query in self.insert_data_queries:
                connection.exec_driver_sql(insert_query)
            # The view is created last, after the table and its data exist.
            connection.exec_driver_sql(self.create_view_query)

    def delete_table_and_view(self) -> None:
        """Drop the view first, then the table."""
        with self.engine.begin() as connection:
            connection.exec_driver_sql(self.drop_view_query)
            connection.exec_driver_sql(self.drop_table_query)

    def run_update_queries(self) -> None:
        """Execute every statement returned by ``self.update_queries()``."""
        with self.engine.begin() as connection:
            for update_query in self.update_queries():
                connection.exec_driver_sql(update_query)

    def run_delete_queries(self) -> None:
        """Execute every statement returned by ``self.delete_queries()``."""
        with self.engine.begin() as connection:
            for drop_query in self.delete_queries():
                connection.exec_driver_sql(drop_query)
Loading