Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Greenplum source module
"""

import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple
Expand Down Expand Up @@ -54,6 +55,7 @@
get_column_info,
get_columns,
get_table_comment,
get_table_ddl,
get_view_definition,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
Expand All @@ -64,7 +66,6 @@
get_all_table_comments,
get_all_table_ddls,
get_all_view_definitions,
get_table_ddl,
)

# Lightweight record identifying a table by its schema and table name.
TableKey = namedtuple("TableKey", ["schema", "table_name"])
Expand Down Expand Up @@ -149,9 +150,11 @@ def get_database_names(self) -> Iterable[str]:

if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_database,
(
database_fqn
if self.source_config.useFqnForFiltering
else new_database
),
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
and n.nspname = :schema
"""

GREENPLUM_PARTITION_DETAILS = textwrap.dedent(
"""
GREENPLUM_PARTITION_DETAILS = textwrap.dedent("""
select
ns.nspname as schema,
par.relname as table_name,
Expand Down Expand Up @@ -57,8 +56,7 @@
and col.table_name = par.relname
and ordinal_position = pt.column_index
where par.relname='{table_name}' and ns.nspname='{schema_name}'
"""
)
""")

GREENPLUM_TABLE_COMMENTS = """
SELECT n.nspname as schema,
Expand Down Expand Up @@ -135,6 +133,58 @@
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
"""
# Builds the CREATE TABLE DDL of every ordinary table (relkind = 'r') in one
# schema with a single bulk query. Replaces MetaData.reflect(), which issues
# ~8 catalog queries per table.
#
# - Column defaults are rendered with pg_get_expr instead of the deprecated
#   adsrc column (removed in PG 12).
# - Schema, table and column names inside the generated DDL go through
#   quote_ident so the statement stays valid for mixed-case, reserved-word or
#   otherwise special identifiers. The schema_name / table_name result columns
#   are left as raw catalog names.
# - Greenplum-specific storage options (reloptions) and the distribution
#   policy are appended. Replicated tables (policytype = 'r') are reported as
#   DISTRIBUTED REPLICATED; previously they fell through to
#   DISTRIBUTED RANDOMLY because their distribution key is empty.
#   NOTE(review): policytype, like distkey, is a Greenplum 6+ catalog column -
#   confirm against the supported server versions.
#
# Bind parameter: :schema_name
# Result columns: schema_name, table_name, ddl
GREENPLUM_TABLE_DDLS = """
SELECT
    n.nspname AS schema_name,
    c.relname AS table_name,
    'CREATE TABLE ' || pg_catalog.quote_ident(n.nspname) || '.' ||
    pg_catalog.quote_ident(c.relname) || ' (' || chr(10) ||
    string_agg(
        ' ' || pg_catalog.quote_ident(a.attname) || ' ' ||
        pg_catalog.format_type(a.atttypid, a.atttypmod) ||
        CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END ||
        CASE
            WHEN d.adbin IS NOT NULL
            THEN ' DEFAULT ' || pg_catalog.pg_get_expr(d.adbin, d.adrelid)
            ELSE ''
        END,
        ',' || chr(10) ORDER BY a.attnum
    ) || chr(10) || ')' ||
    CASE
        WHEN c.reloptions IS NOT NULL
        THEN chr(10) || 'WITH (' || array_to_string(c.reloptions, ', ') || ')'
        ELSE ''
    END ||
    CASE
        WHEN (
            SELECT dp.policytype
            FROM gp_distribution_policy dp
            WHERE dp.localoid = c.oid
        ) = 'r'
        THEN chr(10) || 'DISTRIBUTED REPLICATED'
        WHEN (
            SELECT array_length(dp.distkey, 1)
            FROM gp_distribution_policy dp
            WHERE dp.localoid = c.oid
        ) > 0
        THEN chr(10) || 'DISTRIBUTED BY (' || (
            SELECT string_agg(
                pg_catalog.quote_ident(att.attname), ', ' ORDER BY att.attnum
            )
            FROM gp_distribution_policy dp
            JOIN pg_attribute att ON att.attrelid = c.oid
                AND att.attnum = ANY(dp.distkey)
                AND NOT att.attisdropped
            WHERE dp.localoid = c.oid
        ) || ')'
        ELSE chr(10) || 'DISTRIBUTED RANDOMLY'
    END AS ddl
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = c.oid
    AND a.attnum > 0
    AND NOT a.attisdropped
LEFT JOIN pg_attrdef d ON d.adrelid = c.oid AND d.adnum = a.attnum
WHERE n.nspname = :schema_name
    AND c.relkind = 'r'
GROUP BY n.nspname, c.relname, c.reloptions, c.oid
"""


GREENPLUM_GET_SERVER_VERSION = """
show server_version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"""
Greenplum SQLAlchemy util methods
"""

import re
from typing import Dict, Tuple

Expand All @@ -25,10 +26,12 @@
GREENPLUM_COL_IDENTITY,
GREENPLUM_SQL_COLUMNS,
GREENPLUM_TABLE_COMMENTS,
GREENPLUM_TABLE_DDLS,
GREENPLUM_VIEW_DEFINITIONS,
)
from metadata.utils.sqlalchemy_utils import (
get_table_comment_wrapper,
get_table_ddl_wrapper,
get_view_definition_wrapper,
)

Expand Down Expand Up @@ -347,3 +350,16 @@ def get_view_definition(
schema=schema,
query=GREENPLUM_VIEW_DEFINITIONS,
)


@reflection.cache
def get_table_ddl(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Look up the DDL of ``table_name`` through the shared
    ``get_table_ddl_wrapper`` helper, supplying the Greenplum-specific
    ``GREENPLUM_TABLE_DDLS`` query.

    Any extra keyword arguments (``**kw``) are accepted but not forwarded.
    The helper's return value is passed back unchanged.
    """
    wrapper_kwargs = {
        "connection": connection,
        "query": GREENPLUM_TABLE_DDLS,
        "table_name": table_name,
        "schema": schema,
    }
    return get_table_ddl_wrapper(self, **wrapper_kwargs)
41 changes: 15 additions & 26 deletions ingestion/src/metadata/utils/sqlalchemy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
"""
Module for sqlalchemy dialect utils
"""

import traceback
from typing import Dict, Optional, Tuple

from sqlalchemy import text
from sqlalchemy.engine import Engine, reflection
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.schema import CreateTable, MetaData

from metadata.utils.logger import ingestion_logger

Expand All @@ -36,9 +36,9 @@ def get_all_table_comments(self, connection, query):
result = connection.execute(text(query) if isinstance(query, str) else query)
for table in result:
table_dict = {k.lower(): v for k, v in dict(table._mapping).items()}
self.all_table_comments[
(table_dict["table_name"], table_dict["schema"])
] = table_dict["table_comment"]
self.all_table_comments[(table_dict["table_name"], table_dict["schema"])] = (
table_dict["table_comment"]
)


def get_table_comment_wrapper(self, connection, query, table_name, schema=None):
Expand Down Expand Up @@ -149,28 +149,17 @@ def get_all_table_ddls(
"""
Method to fetch ddl of all available tables
"""
try:
self.all_table_ddls: Dict[Tuple[str, str], str] = {}
self.current_db: str = schema_name
meta = MetaData()
meta.reflect(bind=connection, schema=schema_name)
for table in meta.sorted_tables or []:
self.all_table_ddls[(table.schema, table.name)] = str(CreateTable(table))
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Failed to get table ddls for {schema_name}: {exc}")
# Roll back the aborted transaction so the connection remains usable
# for subsequent queries (e.g. get_table_comment). Without this,
# psycopg2 raises InFailedSqlTransaction on every query that follows.
if isinstance(exc, ProgrammingError):
try:
connection.rollback()
except Exception:
pass
try:
connection.rollback()
except Exception:
pass
self.all_table_ddls: Dict[Tuple[str, str], str] = {}
self.current_db: str = schema_name
if query is None:
return
result = connection.execute(
text(query).bindparams(schema_name=schema_name)
if isinstance(query, str)
else query
)
for row in result:
self.all_table_ddls[(row.schema_name, row.table_name)] = row.ddl


def get_table_ddl_wrapper(
Expand Down
Loading