Skip to content

Commit 5d84a09

Browse files
fix(greenplum): default to v6 on version detection failure; add table-listing query branch tests
Address reviewer feedback on PR open-metadata#27288: - Default to Greenplum 6 (conservative) when version detection fails or output is unparseable, since GP7 queries reference catalogs/columns that do not exist on GP6 (was defaulting to 7, which could regress GP6 clusters). - Add unit tests verifying query_table_names_and_types selects GREENPLUM_GET_TABLE_NAMES_V6 vs GREENPLUM_GET_TABLE_NAMES_V7 based on detected major version, covering the table discovery path that originally failed in open-metadata#24559. - Apply ruff format to satisfy py-checkstyle CI.
1 parent c22f5d1 commit 5d84a09

2 files changed

Lines changed: 64 additions & 44 deletions

File tree

ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,40 +123,32 @@ def _get_greenplum_major_version(self) -> int:
123123
match = re.search(r"Greenplum Database (\d+)", version_string)
124124
if match:
125125
self._greenplum_version = int(match.group(1))
126-
logger.info(
127-
"Detected Greenplum major version %d", self._greenplum_version
128-
)
126+
logger.info("Detected Greenplum major version %d", self._greenplum_version)
129127
else:
130128
logger.warning(
131-
"Could not parse Greenplum major version from"
132-
" SELECT version() output, defaulting to 7"
129+
"Could not parse Greenplum major version from SELECT version() output, "
130+
"defaulting conservatively to 6"
133131
)
134132
logger.debug("Full version() output: %s", version_string)
135-
self._greenplum_version = 7
133+
self._greenplum_version = 6
136134
except SQLAlchemyError as exc:
137135
logger.debug(traceback.format_exc())
138136
logger.warning(
139-
"Could not determine Greenplum version (%s), defaulting to 7",
137+
"Could not determine Greenplum version (%s), defaulting conservatively to 6",
140138
exc,
141139
)
142-
self._greenplum_version = 7
140+
self._greenplum_version = 6
143141
return self._greenplum_version
144142

145143
def _is_v7_or_later(self) -> bool:
146144
return self._get_greenplum_major_version() >= 7
147145

148-
def query_table_names_and_types(
149-
self, schema_name: str
150-
) -> Iterable[TableNameAndType]:
146+
def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]:
151147
"""
152148
Overwrite the inspector implementation to handle partitioned
153149
and foreign types
154150
"""
155-
table_names_query = (
156-
GREENPLUM_GET_TABLE_NAMES_V7
157-
if self._is_v7_or_later()
158-
else GREENPLUM_GET_TABLE_NAMES_V6
159-
)
151+
table_names_query = GREENPLUM_GET_TABLE_NAMES_V7 if self._is_v7_or_later() else GREENPLUM_GET_TABLE_NAMES_V6
160152
result = self.connection.execute(
161153
sql.text(table_names_query),
162154
{"schema": schema_name},

ingestion/tests/unit/topology/database/test_greenplum.py

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@
6363
},
6464
}
6565

66-
PartitionRow = namedtuple(
67-
"PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"]
68-
)
66+
PartitionRow = namedtuple("PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"])
6967

7068

7169
class greenplumUnitTest(TestCase):
@@ -91,8 +89,7 @@ def test_version_detection_v6(self):
9189
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
9290
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
9391
mock_conn.execute.return_value.scalar.return_value = (
94-
"PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on"
95-
" x86_64-pc-linux-gnu compiled by gcc 6.4.0"
92+
"PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on x86_64-pc-linux-gnu compiled by gcc 6.4.0"
9693
)
9794
self.greenplum_source.engine = mock_engine
9895

@@ -106,8 +103,7 @@ def test_version_detection_v7(self):
106103
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
107104
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
108105
mock_conn.execute.return_value.scalar.return_value = (
109-
"PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on"
110-
" x86_64-pc-linux-gnu compiled by gcc 12.3.1"
106+
"PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on x86_64-pc-linux-gnu compiled by gcc 12.3.1"
111107
)
112108
self.greenplum_source.engine = mock_engine
113109

@@ -121,8 +117,7 @@ def test_version_detection_caches_result(self):
121117
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
122118
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
123119
mock_conn.execute.return_value.scalar.return_value = (
124-
"PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on"
125-
" x86_64-pc-linux-gnu"
120+
"PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on x86_64-pc-linux-gnu"
126121
)
127122
self.greenplum_source.engine = mock_engine
128123

@@ -131,28 +126,24 @@ def test_version_detection_caches_result(self):
131126
self.greenplum_source._get_greenplum_major_version()
132127
mock_engine.connect.assert_called_once()
133128

134-
def test_version_detection_defaults_to_7_on_error(self):
129+
def test_version_detection_defaults_to_6_on_error(self):
135130
mock_engine = MagicMock()
136-
mock_engine.connect.side_effect = OperationalError(
137-
"SELECT version()", {}, Exception("connection error")
138-
)
131+
mock_engine.connect.side_effect = OperationalError("SELECT version()", {}, Exception("connection error"))
139132
self.greenplum_source.engine = mock_engine
140133

141134
self.greenplum_source._greenplum_version = None
142-
assert self.greenplum_source._get_greenplum_major_version() == 7
135+
assert self.greenplum_source._get_greenplum_major_version() == 6
143136

144-
def test_version_detection_defaults_to_7_on_unparseable(self):
137+
def test_version_detection_defaults_to_6_on_unparseable(self):
145138
mock_engine = MagicMock()
146139
mock_conn = MagicMock()
147140
mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn)
148141
mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False)
149-
mock_conn.execute.return_value.scalar.return_value = (
150-
"PostgreSQL 14.0 on x86_64-pc-linux-gnu"
151-
)
142+
mock_conn.execute.return_value.scalar.return_value = "PostgreSQL 14.0 on x86_64-pc-linux-gnu"
152143
self.greenplum_source.engine = mock_engine
153144

154145
self.greenplum_source._greenplum_version = None
155-
assert self.greenplum_source._get_greenplum_major_version() == 7
146+
assert self.greenplum_source._get_greenplum_major_version() == 6
156147

157148
def test_partition_details_v7(self):
158149
mock_engine = MagicMock()
@@ -172,9 +163,7 @@ def test_partition_details_v7(self):
172163
self.greenplum_source.engine = mock_engine
173164
self.greenplum_source._greenplum_version = 7
174165

175-
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
176-
"sales", "public", MagicMock()
177-
)
166+
is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock())
178167

179168
assert is_partitioned is True
180169
assert partition == TablePartition(
@@ -205,9 +194,7 @@ def test_partition_details_v6(self):
205194
self.greenplum_source.engine = mock_engine
206195
self.greenplum_source._greenplum_version = 6
207196

208-
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
209-
"sales", "public", MagicMock()
210-
)
197+
is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock())
211198

212199
assert is_partitioned is True
213200
assert partition == TablePartition(
@@ -258,9 +245,50 @@ def test_partition_details_expression_key_returns_true_no_columns(self):
258245
self.greenplum_source.engine = mock_engine
259246
self.greenplum_source._greenplum_version = 7
260247

261-
is_partitioned, partition = self.greenplum_source.get_table_partition_details(
262-
"sales", "public", MagicMock()
263-
)
248+
is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock())
264249

265250
assert is_partitioned is True
266251
assert partition is None
252+
253+
def test_query_table_names_uses_v6_query_for_v6(self):
254+
"""Issue #24559: ensure GP6 uses pg_partition_rule-based table-listing query."""
255+
from metadata.ingestion.source.database.greenplum.queries import (
256+
GREENPLUM_GET_TABLE_NAMES_V6,
257+
)
258+
259+
mock_connection = MagicMock()
260+
mock_connection.execute.return_value = []
261+
self.greenplum_source._greenplum_version = 6
262+
263+
with patch.object(
264+
type(self.greenplum_source),
265+
"connection",
266+
new_callable=lambda: property(lambda self: mock_connection),
267+
):
268+
list(self.greenplum_source.query_table_names_and_types("public"))
269+
270+
executed_sql = str(mock_connection.execute.call_args[0][0])
271+
assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V6.strip()
272+
assert "pg_partition_rule" in executed_sql
273+
274+
def test_query_table_names_uses_v7_query_for_v7(self):
275+
"""Issue #24559: ensure GP7 uses relispartition-based table-listing query."""
276+
from metadata.ingestion.source.database.greenplum.queries import (
277+
GREENPLUM_GET_TABLE_NAMES_V7,
278+
)
279+
280+
mock_connection = MagicMock()
281+
mock_connection.execute.return_value = []
282+
self.greenplum_source._greenplum_version = 7
283+
284+
with patch.object(
285+
type(self.greenplum_source),
286+
"connection",
287+
new_callable=lambda: property(lambda self: mock_connection),
288+
):
289+
list(self.greenplum_source.query_table_names_and_types("public"))
290+
291+
executed_sql = str(mock_connection.execute.call_args[0][0])
292+
assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V7.strip()
293+
assert "relispartition" in executed_sql
294+
assert "pg_partition_rule" not in executed_sql

0 commit comments

Comments
 (0)