Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector

from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveConnection,
)
Expand Down Expand Up @@ -80,7 +85,7 @@

def _get_validated_metastore_connection(
self,
) -> Optional[Union[PostgresConnection, MysqlConnection]]: # noqa: UP007, UP045

Check warning on line 88 in ingestion/src/metadata/ingestion/source/database/hive/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ3f3u9H6P2CFuSP7AJ2&open=AZ3f3u9H6P2CFuSP7AJ2&pullRequest=27278
"""
Validate and return the metastore connection if it exists.
Handles cases where the connection may be a raw dict that needs validation.
Expand Down Expand Up @@ -130,6 +135,82 @@
self._connection_map = {} # Lazy init as well
self._inspector_map = {}

def get_table_partition_details(

Check failure on line 138 in ingestion/src/metadata/ingestion/source/database/hive/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ2ULmWg2JtrbwHVySlW&open=AZ2ULmWg2JtrbwHVySlW&pullRequest=27278
self,
table_name: str,
schema_name: str,
inspector: Inspector,
) -> tuple[bool, TablePartition | None]:
"""
Extract partition key columns from DESCRIBE FORMATTED output.
Returns (True, TablePartition) if partition keys are found,
otherwise (False, None).

Skipped when metastoreConnection is configured because self.engine
is replaced with a MySQL/Postgres metastore engine that cannot
execute HiveQL DESCRIBE FORMATTED statements.
"""
# pylint: disable=unused-argument
# When metastore connection is configured, self.engine is a
# MySQL/Postgres engine — DESCRIBE FORMATTED will fail against it.
# Skip partition detection in this case.
metastore_conn = self._get_validated_metastore_connection()
if metastore_conn:
logger.debug(
"Skipping partition key detection for"
f" {schema_name}.{table_name}: metastoreConnection"
" replaces HiveServer2 engine."
)
return False, None
Comment thread
mohitjeswani01 marked this conversation as resolved.

partition_keys: list[str] = []
in_partition_section = False

try:
dialect = getattr(self.engine, "dialect", None)
if dialect is None:
logger.debug(f"Engine has no dialect; skipping partition detection for {schema_name}.{table_name}")
return False, None
preparer = dialect.identifier_preparer
Comment thread
gitar-bot[bot] marked this conversation as resolved.
quoted_schema = preparer.quote(schema_name)
quoted_table = preparer.quote(table_name)
with self.engine.connect() as conn:
rows = conn.execute(text(f"DESCRIBE FORMATTED {quoted_schema}.{quoted_table}"))
Comment thread
mohitjeswani01 marked this conversation as resolved.
for row in rows:
col_name = row[0].strip() if row[0] else ""
if col_name == "# Partition Information":
in_partition_section = True
continue
if in_partition_section:
# Exit on blank line or new section header
# AFTER we have already collected at least one key.
# This prevents downstream metadata rows like
# "Database:", "Owner:", "Location:" from being
# collected as partition keys.
if col_name.startswith("#") or not col_name:
if partition_keys:
break
continue
partition_keys.append(col_name)
Comment thread
gitar-bot[bot] marked this conversation as resolved.
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to get partition details for {schema_name}.{table_name}: {exc}")
return False, None

if not partition_keys:
return False, None

return True, TablePartition(
columns=[
PartitionColumnDetails(
columnName=key,
intervalType=PartitionIntervalTypes.COLUMN_VALUE,
interval=None,
)
for key in partition_keys
]
)

def get_schema_definition( # pylint: disable=unused-argument
self, table_type: str, table_name: str, schema_name: str, inspector: Inspector
) -> Optional[str]: # noqa: UP045
Expand Down
112 changes: 111 additions & 1 deletion ingestion/tests/unit/topology/database/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import types
from unittest import TestCase
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

from sqlalchemy.types import INTEGER, VARCHAR, Integer, String

Expand Down Expand Up @@ -1172,3 +1172,113 @@ def test_get_validated_metastore_connection_with_invalid_dict(self):
self.hive.service_connection.metastoreConnection = invalid_dict
result = self.hive._get_validated_metastore_connection()
self.assertIsNone(result)


class TestHivePartitionDetails(TestCase):
"""Unit tests for HiveSource.get_table_partition_details"""

def setUp(self):
"""
Create a minimal HiveSource-like object with mocked internals.
Do NOT call HiveSource() constructor — mock only what the method needs:
- self.source.engine (MagicMock for context manager support)
- self.source._get_validated_metastore_connection returns None
"""
self.source = Mock()
self.source._get_validated_metastore_connection.return_value = None
self.source.engine = MagicMock()
# Bind the real method to our mock source
self.source.get_table_partition_details = HiveSource.get_table_partition_details.__get__(self.source)

def _make_rows(self, data):
"""Helper: convert list of tuples into mock row objects."""
rows = []
for item in data:
row = MagicMock()
row.__getitem__.side_effect = lambda i, _item=item: _item[i]
rows.append(row)
return rows
Comment thread
mohitjeswani01 marked this conversation as resolved.

def test_partition_keys_extracted_correctly(self):
"""Parser extracts year and country, stops before metadata rows."""
mock_rows = self._make_rows(
[
("col_name", "data_type", "comment"),
("id", "int", ""),
("name", "string", ""),
("# Partition Information", "", ""),
("# col_name", "data_type", "comment"),
("year", "int", ""),
("country", "string", ""),
("# Detailed Table Information", "", ""),
("Database:", "default", ""),
("Owner:", "hadoop", ""),
]
)
conn_mock = Mock()
conn_mock.execute.return_value = mock_rows
self.source.engine.connect.return_value.__enter__.return_value = conn_mock

is_partitioned, partition = self.source.get_table_partition_details(
table_name="sales",
schema_name="default",
inspector=Mock(),
)

self.assertTrue(is_partitioned)
self.assertIsNotNone(partition)
extracted = [col.columnName for col in partition.columns]
self.assertIn("year", extracted)
self.assertIn("country", extracted)
# Critical: metadata rows must NOT be collected
self.assertNotIn("Database:", extracted)
self.assertNotIn("Owner:", extracted)

def test_no_partition_section_returns_false(self):
"""Tables with no partition section return (False, None)."""
mock_rows = self._make_rows(
[
("col_name", "data_type", "comment"),
("id", "int", ""),
("name", "string", ""),
]
)
conn_mock = Mock()
conn_mock.execute.return_value = mock_rows
self.source.engine.connect.return_value.__enter__.return_value = conn_mock

is_partitioned, partition = self.source.get_table_partition_details(
table_name="simple_table",
schema_name="default",
inspector=Mock(),
)

self.assertFalse(is_partitioned)
self.assertIsNone(partition)

def test_metastore_connection_skips_detection(self):
"""When metastoreConnection is set, engine is never called."""
with patch.object(self.source, "_get_validated_metastore_connection", return_value=Mock()):
is_partitioned, partition = self.source.get_table_partition_details(
table_name="any_table",
schema_name="any_schema",
inspector=Mock(),
)

self.assertFalse(is_partitioned)
self.assertIsNone(partition)
# Engine must never be touched
self.source.engine.connect.assert_not_called()

def test_engine_exception_returns_false(self):
"""Engine failure is caught gracefully, returns (False, None)."""
self.source.engine.connect.side_effect = Exception("connection failed")

is_partitioned, partition = self.source.get_table_partition_details(
table_name="broken_table",
schema_name="default",
inspector=Mock(),
)

self.assertFalse(is_partitioned)
self.assertIsNone(partition)
Loading