diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index 81b98ddab5d4..1261b03e7c70 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -11,6 +11,7 @@ """ Greenplum source module """ + import traceback from collections import namedtuple from typing import Iterable, Optional, Tuple @@ -54,6 +55,7 @@ get_column_info, get_columns, get_table_comment, + get_table_ddl, get_view_definition, ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource @@ -64,7 +66,6 @@ get_all_table_comments, get_all_table_ddls, get_all_view_definitions, - get_table_ddl, ) TableKey = namedtuple("TableKey", ["schema", "table_name"]) @@ -149,9 +150,11 @@ def get_database_names(self) -> Iterable[str]: if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn - if self.source_config.useFqnForFiltering - else new_database, + ( + database_fqn + if self.source_config.useFqnForFiltering + else new_database + ), ): self.status.filter(database_fqn, "Database Filtered Out") continue diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py index 3a66c86f3e61..74ce8cb7510a 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py @@ -26,8 +26,7 @@ and n.nspname = :schema """ -GREENPLUM_PARTITION_DETAILS = textwrap.dedent( - """ +GREENPLUM_PARTITION_DETAILS = textwrap.dedent(""" select ns.nspname as schema, par.relname as table_name, @@ -57,8 +56,7 @@ and col.table_name = par.relname and ordinal_position = pt.column_index where par.relname='{table_name}' and ns.nspname='{schema_name}' - """ -) + """) GREENPLUM_TABLE_COMMENTS = """ SELECT n.nspname as schema, @@ -135,6 +133,58 @@ AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum """ +# Generates CREATE TABLE DDL for a given schema using a single bulk query. +# Replaces MetaData.reflect() which issues ~8 catalog queries per table. +# Uses pg_get_expr instead of the deprecated adsrc column (removed in PG 12). +# Includes Greenplum-specific storage options and distribution policy. +GREENPLUM_TABLE_DDLS = """ + SELECT + n.nspname AS schema_name, + c.relname AS table_name, + 'CREATE TABLE ' || n.nspname || '.' || c.relname || ' (' || chr(10) || + string_agg( + ' ' || a.attname || ' ' || + pg_catalog.format_type(a.atttypid, a.atttypmod) || + CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END || + CASE + WHEN d.adbin IS NOT NULL + THEN ' DEFAULT ' || pg_catalog.pg_get_expr(d.adbin, d.adrelid) + ELSE '' + END, + ',' || chr(10) ORDER BY a.attnum + ) || chr(10) || ')' || + CASE + WHEN c.reloptions IS NOT NULL + THEN chr(10) || 'WITH (' || array_to_string(c.reloptions, ', ') || ')' + ELSE '' + END || + CASE + WHEN ( + SELECT array_length(dp.distkey, 1) + FROM gp_distribution_policy dp + WHERE dp.localoid = c.oid + ) > 0 + THEN chr(10) || 'DISTRIBUTED BY (' || ( + SELECT string_agg(att.attname, ', ' ORDER BY att.attnum) + FROM gp_distribution_policy dp + JOIN pg_attribute att ON att.attrelid = c.oid + AND att.attnum = ANY(dp.distkey) + AND NOT att.attisdropped + WHERE dp.localoid = c.oid + ) || ')' + ELSE chr(10) || 'DISTRIBUTED RANDOMLY' + END AS ddl + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_attribute a ON a.attrelid = c.oid + AND a.attnum > 0 + AND NOT a.attisdropped + LEFT JOIN pg_attrdef d ON d.adrelid = c.oid AND d.adnum = a.attnum + WHERE n.nspname = :schema_name + AND c.relkind = 'r' + GROUP BY n.nspname, c.relname, c.reloptions, c.oid +""" + GREENPLUM_GET_SERVER_VERSION = """ show server_version diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/utils.py b/ingestion/src/metadata/ingestion/source/database/greenplum/utils.py index 0c91523af14c..9fb33b2520c2 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/utils.py @@ -13,6 +13,7 @@ """ Greenplum SQLAlchemy util methods """ + import re from typing import Dict, Tuple @@ -25,10 +26,12 @@ GREENPLUM_COL_IDENTITY, GREENPLUM_SQL_COLUMNS, GREENPLUM_TABLE_COMMENTS, + GREENPLUM_TABLE_DDLS, GREENPLUM_VIEW_DEFINITIONS, ) from metadata.utils.sqlalchemy_utils import ( get_table_comment_wrapper, + get_table_ddl_wrapper, get_view_definition_wrapper, ) @@ -347,3 +350,16 @@ def get_view_definition( schema=schema, query=GREENPLUM_VIEW_DEFINITIONS, ) + + +@reflection.cache +def get_table_ddl( + self, connection, table_name, schema=None, **kw +): # pylint: disable=unused-argument + return get_table_ddl_wrapper( + self, + connection=connection, + query=GREENPLUM_TABLE_DDLS, + table_name=table_name, + schema=schema, + ) diff --git a/ingestion/src/metadata/utils/sqlalchemy_utils.py b/ingestion/src/metadata/utils/sqlalchemy_utils.py index 20adf6a16b6d..8ac25fe4ae84 100644 --- a/ingestion/src/metadata/utils/sqlalchemy_utils.py +++ b/ingestion/src/metadata/utils/sqlalchemy_utils.py @@ -13,13 +13,13 @@ """ Module for sqlalchemy dialect utils """ + import traceback from typing import Dict, Optional, Tuple from sqlalchemy import text from sqlalchemy.engine import Engine, reflection from sqlalchemy.exc import ProgrammingError -from sqlalchemy.schema import CreateTable, MetaData from metadata.utils.logger import ingestion_logger @@ -36,9 +36,9 @@ def get_all_table_comments(self, connection, query): result = connection.execute(text(query) if isinstance(query, str) else query) for table in result: table_dict = {k.lower(): v for k, v in dict(table._mapping).items()} - self.all_table_comments[ - (table_dict["table_name"], table_dict["schema"]) - ] = table_dict["table_comment"] + self.all_table_comments[(table_dict["table_name"], table_dict["schema"])] = ( + table_dict["table_comment"] + ) def get_table_comment_wrapper(self, connection, query, table_name, schema=None): @@ -149,28 +149,17 @@ def get_all_table_ddls( """ Method to fetch ddl of all available tables """ - try: - self.all_table_ddls: Dict[Tuple[str, str], str] = {} - self.current_db: str = schema_name - meta = MetaData() - meta.reflect(bind=connection, schema=schema_name) - for table in meta.sorted_tables or []: - self.all_table_ddls[(table.schema, table.name)] = str(CreateTable(table)) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.debug(f"Failed to get table ddls for {schema_name}: {exc}") - # Roll back the aborted transaction so the connection remains usable - # for subsequent queries (e.g. get_table_comment). Without this, - # psycopg2 raises InFailedSqlTransaction on every query that follows. - if isinstance(exc, ProgrammingError): - try: - connection.rollback() - except Exception: - pass - try: - connection.rollback() - except Exception: - pass + self.all_table_ddls: Dict[Tuple[str, str], str] = {} + self.current_db: str = schema_name + if query is None: + return + result = connection.execute( + text(query).bindparams(schema_name=schema_name) + if isinstance(query, str) + else query + ) + for row in result: + self.all_table_ddls[(row.schema_name, row.table_name)] = row.ddl def get_table_ddl_wrapper(