Skip to content

Commit 160fdce

Browse files
Fixes #24559: Add Greenplum 7 partition support
Greenplum 7 adopted PostgreSQL 10+ declarative partitioning, replacing the GP-specific pg_partition and pg_partition_rule catalog tables with the standard pg_partitioned_table and relispartition column in pg_class. The connector now detects the Greenplum major version at runtime by parsing SELECT version() output with a regex, and selects the appropriate queries: - GP6: pg_partition_rule JOIN for table names, pg_partition with parkind/paratts for partition details (existing behavior) - GP7+: relispartition=false for table names, pg_partitioned_table with partstrat/partattrs for partition details (PostgreSQL-compatible) The version is cached per source instance to avoid repeated queries.
1 parent f3ae6cf commit 160fdce

3 files changed

Lines changed: 284 additions & 13 deletions

File tree

ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
"""
1212
Greenplum source module
1313
"""
14+
15+
import re
1416
import traceback
1517
from collections import namedtuple
1618
from typing import Iterable, Optional, Tuple
@@ -47,8 +49,11 @@
4749
)
4850
from metadata.ingestion.source.database.greenplum.queries import (
4951
GREENPLUM_GET_DB_NAMES,
50-
GREENPLUM_GET_TABLE_NAMES,
51-
GREENPLUM_PARTITION_DETAILS,
52+
GREENPLUM_GET_TABLE_NAMES_V6,
53+
GREENPLUM_GET_TABLE_NAMES_V7,
54+
GREENPLUM_GET_VERSION,
55+
GREENPLUM_PARTITION_DETAILS_V6,
56+
GREENPLUM_PARTITION_DETAILS_V7,
5257
)
5358
from metadata.ingestion.source.database.greenplum.utils import (
5459
get_column_info,
@@ -91,6 +96,10 @@ class GreenplumSource(CommonDbSourceService, MultiDBSource):
9196
Database metadata from Greenplum Source
9297
"""
9398

99+
def __init__(self, config, metadata):
100+
super().__init__(config, metadata)
101+
self._greenplum_version: Optional[int] = None
102+
94103
@classmethod
95104
def create(
96105
cls,
@@ -106,15 +115,44 @@ def create(
106115
)
107116
return cls(config, metadata)
108117

118+
def _get_greenplum_major_version(self) -> int:
119+
if self._greenplum_version is None:
120+
try:
121+
result = self.engine.execute(text(GREENPLUM_GET_VERSION))
122+
version_string = result.scalar()
123+
match = re.search(r"Greenplum Database (\d+)", version_string)
124+
if match:
125+
self._greenplum_version = int(match.group(1))
126+
else:
127+
logger.warning(
128+
"Could not parse Greenplum version from '%s',"
129+
" defaulting to 7",
130+
version_string,
131+
)
132+
self._greenplum_version = 7
133+
except Exception:
134+
logger.debug(traceback.format_exc())
135+
logger.warning("Could not determine Greenplum version, defaulting to 7")
136+
self._greenplum_version = 7
137+
return self._greenplum_version
138+
139+
def _is_v7_or_later(self) -> bool:
140+
return self._get_greenplum_major_version() >= 7
141+
109142
def query_table_names_and_types(
110143
self, schema_name: str
111144
) -> Iterable[TableNameAndType]:
112145
"""
113146
Overwrite the inspector implementation to handle partitioned
114147
and foreign types
115148
"""
149+
table_names_query = (
150+
GREENPLUM_GET_TABLE_NAMES_V7
151+
if self._is_v7_or_later()
152+
else GREENPLUM_GET_TABLE_NAMES_V6
153+
)
116154
result = self.connection.execute(
117-
sql.text(GREENPLUM_GET_TABLE_NAMES),
155+
sql.text(table_names_query),
118156
{"schema": schema_name},
119157
)
120158

@@ -149,9 +187,11 @@ def get_database_names(self) -> Iterable[str]:
149187

150188
if filter_by_database(
151189
self.source_config.databaseFilterPattern,
152-
database_fqn
153-
if self.source_config.useFqnForFiltering
154-
else new_database,
190+
(
191+
database_fqn
192+
if self.source_config.useFqnForFiltering
193+
else new_database
194+
),
155195
):
156196
self.status.filter(database_fqn, "Database Filtered Out")
157197
continue
@@ -167,11 +207,18 @@ def get_database_names(self) -> Iterable[str]:
167207

168208
def get_table_partition_details(
169209
self, table_name: str, schema_name: str, inspector: Inspector
210+
) -> Tuple[bool, Optional[TablePartition]]:
211+
if self._is_v7_or_later():
212+
return self._get_table_partition_details_v7(table_name, schema_name)
213+
return self._get_table_partition_details_v6(table_name, schema_name)
214+
215+
def _get_table_partition_details_v6(
216+
self, table_name: str, schema_name: str
170217
) -> Tuple[bool, Optional[TablePartition]]:
171218
with self.engine.connect() as conn:
172219
result = conn.execute(
173220
text(
174-
GREENPLUM_PARTITION_DETAILS.format(
221+
GREENPLUM_PARTITION_DETAILS_V6.format(
175222
table_name=table_name, schema_name=schema_name
176223
)
177224
)
@@ -194,3 +241,30 @@ def get_table_partition_details(
194241
)
195242
return True, partition_details
196243
return False, None
244+
245+
def _get_table_partition_details_v7(
246+
self, table_name: str, schema_name: str
247+
) -> Tuple[bool, Optional[TablePartition]]:
248+
with self.engine.connect() as conn:
249+
result = conn.execute(
250+
text(GREENPLUM_PARTITION_DETAILS_V7),
251+
{"table_name": table_name, "schema_name": schema_name},
252+
).all()
253+
254+
if result:
255+
partition_details = TablePartition(
256+
columns=[
257+
PartitionColumnDetails(
258+
columnName=row.column_name,
259+
intervalType=INTERVAL_TYPE_MAP.get(
260+
result[0].partition_strategy,
261+
PartitionIntervalTypes.COLUMN_VALUE,
262+
),
263+
interval=None,
264+
)
265+
for row in result
266+
if row.column_name
267+
]
268+
)
269+
return True, partition_details
270+
return False, None

ingestion/src/metadata/ingestion/source/database/greenplum/queries.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
# https://www.postgresql.org/docs/current/catalog-pg-class.html
1818
# r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table,
19-
GREENPLUM_GET_TABLE_NAMES = """
19+
20+
# Greenplum 6: uses pg_partition_rule to filter out child partitions
21+
GREENPLUM_GET_TABLE_NAMES_V6 = """
2022
select c.relname, c.relkind
2123
from pg_catalog.pg_class c
2224
left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid
@@ -26,8 +28,15 @@
2628
and n.nspname = :schema
2729
"""
2830

29-
GREENPLUM_PARTITION_DETAILS = textwrap.dedent(
30-
"""
31+
# Greenplum 7+: uses PostgreSQL-style relispartition to filter out child partitions
32+
GREENPLUM_GET_TABLE_NAMES_V7 = """
33+
SELECT c.relname, c.relkind FROM pg_class c
34+
JOIN pg_namespace n ON n.oid = c.relnamespace
35+
WHERE n.nspname = :schema AND c.relkind in ('r', 'p', 'f') AND relispartition = false
36+
"""
37+
38+
# Greenplum 6: uses pg_partition catalog with parkind/paratts columns
39+
GREENPLUM_PARTITION_DETAILS_V6 = textwrap.dedent("""
3140
select
3241
ns.nspname as schema,
3342
par.relname as table_name,
@@ -57,8 +66,38 @@
5766
and col.table_name = par.relname
5867
and ordinal_position = pt.column_index
5968
where par.relname='{table_name}' and ns.nspname='{schema_name}'
60-
"""
61-
)
69+
""")
70+
71+
# Greenplum 7+: uses PostgreSQL-standard pg_partitioned_table with partstrat/partattrs
72+
GREENPLUM_PARTITION_DETAILS_V7 = textwrap.dedent("""
73+
select
74+
par.relnamespace::regnamespace::text as schema,
75+
par.relname as table_name,
76+
partition_strategy,
77+
col.column_name
78+
from
79+
(select
80+
partrelid,
81+
partnatts,
82+
case partstrat
83+
when 'l' then 'list'
84+
when 'h' then 'hash'
85+
when 'r' then 'range' end as partition_strategy,
86+
unnest(partattrs) column_index
87+
from
88+
pg_partitioned_table) pt
89+
join
90+
pg_class par
91+
on
92+
par.oid = pt.partrelid
93+
left join
94+
information_schema.columns col
95+
on
96+
col.table_schema = par.relnamespace::regnamespace::text
97+
and col.table_name = par.relname
98+
and ordinal_position = pt.column_index
99+
where par.relname=:table_name and par.relnamespace::regnamespace::text=:schema_name
100+
""")
62101

63102
GREENPLUM_TABLE_COMMENTS = """
64103
SELECT n.nspname as schema,
@@ -139,3 +178,7 @@
139178
GREENPLUM_GET_SERVER_VERSION = """
140179
show server_version
141180
"""
181+
182+
GREENPLUM_GET_VERSION = """
183+
SELECT version()
184+
"""

ingestion/tests/unit/topology/database/test_greenplum.py

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,15 @@
1313
Test Greenplum using the topology
1414
"""
1515

16+
from collections import namedtuple
1617
from unittest import TestCase
17-
from unittest.mock import patch
18+
from unittest.mock import MagicMock, patch
1819

20+
from metadata.generated.schema.entity.data.table import (
21+
PartitionColumnDetails,
22+
PartitionIntervalTypes,
23+
TablePartition,
24+
)
1925
from metadata.generated.schema.metadataIngestion.workflow import (
2026
OpenMetadataWorkflowConfig,
2127
)
@@ -55,6 +61,10 @@
5561
},
5662
}
5763

64+
PartitionRow = namedtuple(
65+
"PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"]
66+
)
67+
5868

5969
class greenplumUnitTest(TestCase):
6070
@patch(
@@ -76,3 +86,147 @@ def __init__(self, methodName, test_connection) -> None:
7686
def test_close_connection(self, engine, connection):
7787
connection.return_value = True
7888
self.greenplum_source.close()
89+
90+
def test_version_detection_v6(self):
91+
mock_engine = MagicMock()
92+
mock_result = MagicMock()
93+
mock_result.scalar.return_value = (
94+
"PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on"
95+
" x86_64-pc-linux-gnu compiled by gcc 6.4.0"
96+
)
97+
mock_engine.execute.return_value = mock_result
98+
self.greenplum_source.engine = mock_engine
99+
100+
self.greenplum_source._greenplum_version = None
101+
assert self.greenplum_source._get_greenplum_major_version() == 6
102+
assert not self.greenplum_source._is_v7_or_later()
103+
104+
def test_version_detection_v7(self):
105+
mock_engine = MagicMock()
106+
mock_result = MagicMock()
107+
mock_result.scalar.return_value = (
108+
"PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on"
109+
" x86_64-pc-linux-gnu compiled by gcc 12.3.1"
110+
)
111+
mock_engine.execute.return_value = mock_result
112+
self.greenplum_source.engine = mock_engine
113+
114+
self.greenplum_source._greenplum_version = None
115+
assert self.greenplum_source._get_greenplum_major_version() == 7
116+
assert self.greenplum_source._is_v7_or_later()
117+
118+
def test_version_detection_caches_result(self):
119+
mock_engine = MagicMock()
120+
mock_result = MagicMock()
121+
mock_result.scalar.return_value = (
122+
"PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on"
123+
" x86_64-pc-linux-gnu"
124+
)
125+
mock_engine.execute.return_value = mock_result
126+
self.greenplum_source.engine = mock_engine
127+
128+
self.greenplum_source._greenplum_version = None
129+
self.greenplum_source._get_greenplum_major_version()
130+
self.greenplum_source._get_greenplum_major_version()
131+
mock_engine.execute.assert_called_once()
132+
133+
def test_version_detection_defaults_to_7_on_error(self):
134+
mock_engine = MagicMock()
135+
mock_engine.execute.side_effect = Exception("connection error")
136+
self.greenplum_source.engine = mock_engine
137+
138+
self.greenplum_source._greenplum_version = None
139+
assert self.greenplum_source._get_greenplum_major_version() == 7
140+
141+
def test_version_detection_defaults_to_7_on_unparseable(self):
142+
mock_engine = MagicMock()
143+
mock_result = MagicMock()
144+
mock_result.scalar.return_value = "PostgreSQL 14.0 on x86_64-pc-linux-gnu"
145+
mock_engine.execute.return_value = mock_result
146+
self.greenplum_source.engine = mock_engine
147+
148+
self.greenplum_source._greenplum_version = None
149+
assert self.greenplum_source._get_greenplum_major_version() == 7
150+
151+
def test_partition_details_v7(self):
152+
mock_engine = MagicMock()
153+
mock_conn = MagicMock()
154+
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
155+
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
156+
157+
mock_conn.execute.return_value.all.return_value = [
158+
PartitionRow(
159+
schema="public",
160+
table_name="sales",
161+
partition_strategy="range",
162+
column_name="sale_date",
163+
)
164+
]
165+
166+
self.greenplum_source.engine = mock_engine
167+
self.greenplum_source._greenplum_version = 7
168+
169+
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
170+
"sales", "public", MagicMock()
171+
)
172+
173+
assert is_partitioned is True
174+
assert partition == TablePartition(
175+
columns=[
176+
PartitionColumnDetails(
177+
columnName="sale_date",
178+
intervalType=PartitionIntervalTypes.TIME_UNIT,
179+
interval=None,
180+
)
181+
]
182+
)
183+
184+
def test_partition_details_v6(self):
185+
mock_engine = MagicMock()
186+
mock_conn = MagicMock()
187+
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
188+
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
189+
190+
mock_conn.execute.return_value.all.return_value = [
191+
PartitionRow(
192+
schema="public",
193+
table_name="sales",
194+
partition_strategy="list",
195+
column_name="region",
196+
)
197+
]
198+
199+
self.greenplum_source.engine = mock_engine
200+
self.greenplum_source._greenplum_version = 6
201+
202+
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
203+
"sales", "public", MagicMock()
204+
)
205+
206+
assert is_partitioned is True
207+
assert partition == TablePartition(
208+
columns=[
209+
PartitionColumnDetails(
210+
columnName="region",
211+
intervalType=PartitionIntervalTypes.COLUMN_VALUE,
212+
interval=None,
213+
)
214+
]
215+
)
216+
217+
def test_partition_details_no_results(self):
218+
mock_engine = MagicMock()
219+
mock_conn = MagicMock()
220+
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
221+
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
222+
mock_conn.execute.return_value.all.return_value = []
223+
224+
self.greenplum_source.engine = mock_engine
225+
self.greenplum_source._greenplum_version = 7
226+
227+
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
228+
"not_partitioned", "public", MagicMock()
229+
)
230+
231+
assert is_partitioned is False
232+
assert partition is None

0 commit comments

Comments
 (0)