Skip to content

Commit 3c4bbf3

Browse files
Fixes #24559: Add Greenplum 7 partition support
Greenplum 7 adopted PostgreSQL 10+ declarative partitioning, replacing the GP-specific pg_partition and pg_partition_rule catalog tables with the standard pg_partitioned_table and relispartition column in pg_class. The connector now detects the Greenplum major version at runtime by parsing SELECT version() output with a regex, and selects the appropriate queries: - GP6: pg_partition_rule JOIN for table names, pg_partition with parkind/paratts for partition details (existing behavior) - GP7+: relispartition=false for table names, pg_partitioned_table with partstrat/partattrs for partition details (PostgreSQL-compatible) The version is cached per source instance to avoid repeated queries.
1 parent 219490a commit 3c4bbf3

3 files changed

Lines changed: 293 additions & 11 deletions

File tree

ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
"""
1212
Greenplum source module
1313
"""
14+
15+
import re
1416
import traceback
1517
from collections import namedtuple
1618
from typing import Iterable, Optional, Tuple
@@ -47,8 +49,11 @@
4749
)
4850
from metadata.ingestion.source.database.greenplum.queries import (
4951
GREENPLUM_GET_DB_NAMES,
50-
GREENPLUM_GET_TABLE_NAMES,
51-
GREENPLUM_PARTITION_DETAILS,
52+
GREENPLUM_GET_TABLE_NAMES_V6,
53+
GREENPLUM_GET_TABLE_NAMES_V7,
54+
GREENPLUM_GET_VERSION,
55+
GREENPLUM_PARTITION_DETAILS_V6,
56+
GREENPLUM_PARTITION_DETAILS_V7,
5257
)
5358
from metadata.ingestion.source.database.greenplum.utils import (
5459
get_column_info,
@@ -91,6 +96,10 @@ class GreenplumSource(CommonDbSourceService, MultiDBSource):
9196
Database metadata from Greenplum Source
9297
"""
9398

99+
def __init__(self, config, metadata):
100+
super().__init__(config, metadata)
101+
self._greenplum_version: Optional[int] = None
102+
94103
@classmethod
95104
def create(
96105
cls,
@@ -106,15 +115,45 @@ def create(
106115
)
107116
return cls(config, metadata)
108117

118+
def _get_greenplum_major_version(self) -> int:
119+
if self._greenplum_version is None:
120+
try:
121+
with self.engine.connect() as conn:
122+
result = conn.execute(text(GREENPLUM_GET_VERSION))
123+
version_string = result.scalar() or ""
124+
match = re.search(r"Greenplum Database (\d+)", version_string)
125+
if match:
126+
self._greenplum_version = int(match.group(1))
127+
else:
128+
logger.warning(
129+
"Could not parse Greenplum version from '%s',"
130+
" defaulting to 7",
131+
version_string,
132+
)
133+
self._greenplum_version = 7
134+
except Exception:
135+
logger.debug(traceback.format_exc())
136+
logger.warning("Could not determine Greenplum version, defaulting to 7")
137+
self._greenplum_version = 7
138+
return self._greenplum_version
139+
140+
def _is_v7_or_later(self) -> bool:
141+
return self._get_greenplum_major_version() >= 7
142+
109143
def query_table_names_and_types(
110144
self, schema_name: str
111145
) -> Iterable[TableNameAndType]:
112146
"""
113147
Overwrite the inspector implementation to handle partitioned
114148
and foreign types
115149
"""
150+
table_names_query = (
151+
GREENPLUM_GET_TABLE_NAMES_V7
152+
if self._is_v7_or_later()
153+
else GREENPLUM_GET_TABLE_NAMES_V6
154+
)
116155
result = self.connection.execute(
117-
sql.text(GREENPLUM_GET_TABLE_NAMES),
156+
sql.text(table_names_query),
118157
{"schema": schema_name},
119158
)
120159

@@ -149,9 +188,11 @@ def get_database_names(self) -> Iterable[str]:
149188

150189
if filter_by_database(
151190
self.source_config.databaseFilterPattern,
152-
database_fqn
153-
if self.source_config.useFqnForFiltering
154-
else new_database,
191+
(
192+
database_fqn
193+
if self.source_config.useFqnForFiltering
194+
else new_database
195+
),
155196
):
156197
self.status.filter(database_fqn, "Database Filtered Out")
157198
continue
@@ -167,11 +208,18 @@ def get_database_names(self) -> Iterable[str]:
167208

168209
def get_table_partition_details(
169210
self, table_name: str, schema_name: str, inspector: Inspector
211+
) -> Tuple[bool, Optional[TablePartition]]:
212+
if self._is_v7_or_later():
213+
return self._get_table_partition_details_v7(table_name, schema_name)
214+
return self._get_table_partition_details_v6(table_name, schema_name)
215+
216+
def _get_table_partition_details_v6(
217+
self, table_name: str, schema_name: str
170218
) -> Tuple[bool, Optional[TablePartition]]:
171219
with self.engine.connect() as conn:
172220
result = conn.execute(
173221
text(
174-
GREENPLUM_PARTITION_DETAILS.format(
222+
GREENPLUM_PARTITION_DETAILS_V6.format(
175223
table_name=table_name, schema_name=schema_name
176224
)
177225
)
@@ -183,7 +231,34 @@ def get_table_partition_details(
183231
PartitionColumnDetails(
184232
columnName=row.column_name,
185233
intervalType=INTERVAL_TYPE_MAP.get(
186-
result[0].partition_strategy,
234+
row.partition_strategy,
235+
PartitionIntervalTypes.COLUMN_VALUE,
236+
),
237+
interval=None,
238+
)
239+
for row in result
240+
if row.column_name
241+
]
242+
)
243+
return True, partition_details
244+
return False, None
245+
246+
def _get_table_partition_details_v7(
247+
self, table_name: str, schema_name: str
248+
) -> Tuple[bool, Optional[TablePartition]]:
249+
with self.engine.connect() as conn:
250+
result = conn.execute(
251+
text(GREENPLUM_PARTITION_DETAILS_V7),
252+
{"table_name": table_name, "schema_name": schema_name},
253+
).all()
254+
255+
if result:
256+
partition_details = TablePartition(
257+
columns=[
258+
PartitionColumnDetails(
259+
columnName=row.column_name,
260+
intervalType=INTERVAL_TYPE_MAP.get(
261+
row.partition_strategy,
187262
PartitionIntervalTypes.COLUMN_VALUE,
188263
),
189264
interval=None,

ingestion/src/metadata/ingestion/source/database/greenplum/queries.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
# https://www.postgresql.org/docs/current/catalog-pg-class.html
1818
# r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table,
19-
GREENPLUM_GET_TABLE_NAMES = """
19+
20+
# Greenplum 6: uses pg_partition_rule to filter out child partitions
21+
GREENPLUM_GET_TABLE_NAMES_V6 = """
2022
select c.relname, c.relkind
2123
from pg_catalog.pg_class c
2224
left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid
@@ -26,7 +28,15 @@
2628
and n.nspname = :schema
2729
"""
2830

29-
GREENPLUM_PARTITION_DETAILS = textwrap.dedent(
31+
# Greenplum 7+: uses PostgreSQL-style relispartition to filter out child partitions
32+
GREENPLUM_GET_TABLE_NAMES_V7 = """
33+
SELECT c.relname, c.relkind FROM pg_catalog.pg_class c
34+
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
35+
WHERE n.nspname = :schema AND c.relkind in ('r', 'p', 'f') AND c.relispartition = false
36+
"""
37+
38+
# Greenplum 6: uses pg_partition catalog with parkind/paratts columns
39+
GREENPLUM_PARTITION_DETAILS_V6 = textwrap.dedent(
3040
"""
3141
select
3242
ns.nspname as schema,
@@ -60,6 +70,39 @@
6070
"""
6171
)
6272

73+
# Greenplum 7+: uses PostgreSQL-standard pg_partitioned_table with partstrat/partattrs
74+
GREENPLUM_PARTITION_DETAILS_V7 = textwrap.dedent(
75+
"""
76+
select
77+
par.relnamespace::regnamespace::text as schema,
78+
par.relname as table_name,
79+
partition_strategy,
80+
col.column_name
81+
from
82+
(select
83+
partrelid,
84+
partnatts,
85+
case partstrat
86+
when 'l' then 'list'
87+
when 'h' then 'hash'
88+
when 'r' then 'range' end as partition_strategy,
89+
unnest(partattrs) column_index
90+
from
91+
pg_partitioned_table) pt
92+
join
93+
pg_class par
94+
on
95+
par.oid = pt.partrelid
96+
left join
97+
information_schema.columns col
98+
on
99+
col.table_schema = par.relnamespace::regnamespace::text
100+
and col.table_name = par.relname
101+
and ordinal_position = pt.column_index
102+
where par.relname=:table_name and par.relnamespace::regnamespace::text=:schema_name
103+
"""
104+
)
105+
63106
GREENPLUM_TABLE_COMMENTS = """
64107
SELECT n.nspname as schema,
65108
c.relname as table_name,
@@ -139,3 +182,7 @@
139182
GREENPLUM_GET_SERVER_VERSION = """
140183
show server_version
141184
"""
185+
186+
GREENPLUM_GET_VERSION = """
187+
SELECT version()
188+
"""

0 commit comments

Comments
 (0)