feat(hive): add metadata flag to identify partition keys (#26712) #27029

SaaiAravindhRaja wants to merge 17 commits into open-metadata:main
Conversation
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help!
Pull request overview
Adds Hive-specific partition key extraction during ingestion so Hive tables can be marked as partitioned and expose tablePartition details, enabling the UI to distinguish partition columns from regular data columns.
Changes:
- Import `TablePartition`/`PartitionColumnDetails` schema types for partition metadata.
- Add `HiveSource.get_table_partition_details()` that runs `DESCRIBE FORMATTED` and builds a `TablePartition` from detected partition keys.
```python
    with self.engine.connect() as conn:
        rows = conn.execute(
            text(f"DESCRIBE FORMATTED `{schema_name}`.`{table_name}`")
        )
        for row in rows:
            col_name = row[0].strip() if row[0] else ""
            if col_name == "# Partition Information":
                in_partition_section = True
                continue
            if in_partition_section:
                if col_name.startswith("#") or not col_name:
                    continue
                partition_keys.append(col_name)
except Exception as exc:
    logger.debug(traceback.format_exc())
    logger.warning(
        f"Failed to get partition details for {schema_name}.{table_name}: {exc}"
    )
```
When metastoreConnection is configured, prepare() replaces self.engine with a MySQL/Postgres metastore engine. Executing DESCRIBE FORMATTED against that engine will fail (and will likely log a warning per table), meaning partition details won’t be ingested for metastore-based setups. This method should either (a) query partition keys from the metastore tables (e.g., PARTITION_KEYS joined with TBLS/DBS), or (b) keep a separate HiveServer2 engine for HiveQL statements and use that here.
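Option (a) from this comment could be sketched roughly as follows. This is only a sketch: it assumes the standard Hive metastore schema (`PARTITION_KEYS`, `TBLS`, `DBS` with `INTEGER_IDX` ordering), and the function name is illustrative, not part of the PR.

```python
def build_metastore_partition_query() -> str:
    """Build a metastore-side partition-key query (option (a) sketch).

    Assumes the standard Hive metastore schema; bind parameters
    :schema and :table_name identify the database and table.
    """
    return """
        SELECT pk.PKEY_NAME
        FROM PARTITION_KEYS pk
        JOIN TBLS tbl ON pk.TBL_ID = tbl.TBL_ID
        JOIN DBS db ON tbl.DB_ID = db.DB_ID
        WHERE db.NAME = :schema AND tbl.TBL_NAME = :table_name
        ORDER BY pk.INTEGER_IDX
    """

# Usage sketch against a metastore engine (via SQLAlchemy):
# rows = conn.execute(text(build_metastore_partition_query()),
#                     {"schema": schema_name, "table_name": table_name})
```

Ordering by `INTEGER_IDX` preserves the declared partition-key order, which matters for multi-level partitions.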
```python
if col_name == "# Partition Information":
    in_partition_section = True
    continue
if in_partition_section:
```
The parsing logic never exits the “Partition Information” section, so it can accidentally treat later non-column rows (e.g., table metadata like Database:/Owner: that appear after the partition block in DESCRIBE FORMATTED) as partition keys. Consider stopping collection when you hit the first blank row after reading at least one partition key, or when a new section header like # Detailed Table Information is encountered.
Suggested change:

```python
if in_partition_section:
    if partition_keys and (col_name.startswith("#") or not col_name):
        break
```
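Pulled out as a standalone function, the suggested exit condition combined with the existing header-skip logic can be sketched like this (the function name and tuple-row shape are illustrative):

```python
from typing import List, Tuple

def parse_partition_keys(rows: List[Tuple[str, ...]]) -> List[str]:
    """Collect partition keys from DESCRIBE FORMATTED rows, stopping
    once a blank row or section header follows at least one key, so
    later metadata rows (Database:, Owner:, ...) are not collected."""
    partition_keys: List[str] = []
    in_partition_section = False
    for row in rows:
        col_name = row[0].strip() if row[0] else ""
        if col_name == "# Partition Information":
            in_partition_section = True
            continue
        if in_partition_section:
            # Exit the section after the first key has been read.
            if partition_keys and (col_name.startswith("#") or not col_name):
                break
            # Skip the "# col_name" header row before the first key.
            if col_name.startswith("#") or not col_name:
                continue
            partition_keys.append(col_name)
    return partition_keys
```

With this shape, a row such as `Database:` after the partition block can no longer be mistaken for a key.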
```python
def get_table_partition_details(
    self,
    table_name: str,
    schema_name: str,
    inspector: Inspector,
) -> Tuple[bool, Optional[TablePartition]]:
    """
    Extract partition key columns from DESCRIBE FORMATTED output.
    Returns (is_partitioned, TablePartition) where TablePartition lists all
    partition key columns with COLUMN_VALUE interval type.
    """
    partition_keys: List[str] = []
    in_partition_section = False
    try:
        with self.engine.connect() as conn:
            rows = conn.execute(
                text(f"DESCRIBE FORMATTED `{schema_name}`.`{table_name}`")
            )
            for row in rows:
                col_name = row[0].strip() if row[0] else ""
                if col_name == "# Partition Information":
                    in_partition_section = True
                    continue
                if in_partition_section:
                    if col_name.startswith("#") or not col_name:
                        continue
                    partition_keys.append(col_name)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Failed to get partition details for {schema_name}.{table_name}: {exc}"
        )
        return False, None

    if not partition_keys:
        return False, None

    return True, TablePartition(
        columns=[
            PartitionColumnDetails(
                columnName=key,
                intervalType=PartitionIntervalTypes.COLUMN_VALUE,
                interval=None,
            )
            for key in partition_keys
        ]
    )
```
This new behavior (Hive partition key detection + tablePartition population) isn’t covered by unit tests. There are existing Hive unit tests in ingestion/tests/unit/topology/database/test_hive.py; adding a test that stubs the DESCRIBE FORMATTED result (and a metastoreConnection case) would prevent regressions and verify that only actual partition key rows are returned.
@SaaiAravindhRaja can you check the copilot comments?

@ulixius9 Noted! Will make the necessary updates asap.

@m0hitbansal I have pushed a PR, please review. I would like to work on this issue, please assign me.
a6fd952 to 5398ec0
```python
else:
    with self.engine.connect() as conn:
        rows = conn.execute(
            text(f"DESCRIBE FORMATTED `{schema_name}`.`{table_name}`")
        )
        for row in rows:
            col_name = row[0].strip() if row[0] else ""
            if col_name == "# Partition Information":
                in_partition_section = True
                continue
            if in_partition_section:
                if not col_name or col_name.startswith("# Detailed"):
                    break
                if col_name.startswith("#"):
                    continue
                partition_keys.append(col_name)
```
Unit tests added here cover the metastore-backed path, but the non-metastore path (parsing DESCRIBE FORMATTED output) is currently untested. Since that branch is likely the default when metastoreConnection is not configured, add a test that mocks the DESCRIBE output rows and asserts the extracted partition keys (and a test for a non-partitioned table returning (False, None)).
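One way to stub the non-metastore path is a small helper that fakes the engine's `connect()` context manager and returns canned `DESCRIBE FORMATTED` rows. The helper name is illustrative, not one of the actual helpers in `test_hive.py`:

```python
from unittest.mock import MagicMock

def make_describe_engine(rows):
    """Build a mock engine whose connect() context manager yields a
    connection returning the given DESCRIBE FORMATTED rows. This lets
    a test drive the parsing branch without a live HiveServer2."""
    engine = MagicMock()
    conn = MagicMock()
    conn.execute.return_value = rows
    engine.connect.return_value.__enter__.return_value = conn
    return engine

# Usage sketch inside a test:
# self.hive.engine = make_describe_engine([("# Partition Information",), ("year",)])
# is_partitioned, details = self.hive.get_table_partition_details(...)
```

A second test can pass rows without the `# Partition Information` sentinel and assert the `(False, None)` result for non-partitioned tables.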
```python
if drivername in {"hive+mysql", "hive+postgres"}:
    query = (
        """
        SELECT pk.PKEY_NAME
        FROM PARTITION_KEYS pk
        JOIN TBLS tbl ON pk.TBL_ID = tbl.TBL_ID
        JOIN DBS db ON tbl.DB_ID = db.DB_ID
        WHERE db.NAME = :schema AND tbl.TBL_NAME = :table_name
        ORDER BY pk.INTEGER_IDX
        """
        if drivername == "hive+mysql"
        else """
        SELECT pk."PKEY_NAME"
        FROM "PARTITION_KEYS" pk
        JOIN "TBLS" tbl ON pk."TBL_ID" = tbl."TBL_ID"
        JOIN "DBS" db ON tbl."DB_ID" = db."DB_ID"
        WHERE db."NAME" = :schema AND tbl."TBL_NAME" = :table_name
        ORDER BY pk."INTEGER_IDX"
        """
    )
    rows = self.connection.execute(
        text(query),
        {"table_name": table_name, "schema": schema_name},
    ).fetchall()
    partition_keys = [row[0] for row in rows if row and row[0]]
else:
    with self.engine.connect() as conn:
        rows = conn.execute(
            text(f"DESCRIBE FORMATTED `{schema_name}`.`{table_name}`")
        )
        for row in rows:
            col_name = row[0].strip() if row[0] else ""
            if col_name == "# Partition Information":
                in_partition_section = True
                continue
            if in_partition_section:
                if not col_name or col_name.startswith("# Detailed"):
                    break
                if col_name.startswith("#"):
                    continue
```
You can split this into two distinct methods and have a third method handle the dispatch. Let's also not use fetchall().
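The refactor suggested here could be sketched as follows: one method per source of partition keys plus a thin dispatcher, streaming over the result rather than calling `fetchall()`. Class and method names are illustrative, and the metastore query text is elided:

```python
from typing import List

class PartitionKeyReader:
    """Sketch only: separates the metastore path from the DESCRIBE
    FORMATTED path, with a dispatcher choosing between them."""

    def __init__(self, connection, drivername: str):
        self.connection = connection
        self.drivername = drivername

    def _keys_from_metastore(self, schema: str, table: str) -> List[str]:
        rows = self.connection.execute(
            "SELECT ...",  # metastore PARTITION_KEYS query, elided here
            {"schema": schema, "table_name": table},
        )
        # Iterate the result directly instead of fetchall().
        return [row[0] for row in rows if row and row[0]]

    def _keys_from_describe(self, schema: str, table: str) -> List[str]:
        rows = self.connection.execute(f"DESCRIBE FORMATTED `{schema}`.`{table}`")
        keys, in_section = [], False
        for row in rows:
            name = row[0].strip() if row[0] else ""
            if name == "# Partition Information":
                in_section = True
                continue
            if in_section:
                if not name or name.startswith("# Detailed"):
                    break
                if name.startswith("#"):
                    continue
                keys.append(name)
        return keys

    def partition_keys(self, schema: str, table: str) -> List[str]:
        if self.drivername in {"hive+mysql", "hive+postgres"}:
            return self._keys_from_metastore(schema, table)
        return self._keys_from_describe(schema, table)
```

Streaming matters here because `DESCRIBE FORMATTED` and metastore results can be consumed row by row; nothing needs the whole result materialised.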
The Python checkstyle failed. Please run You can install the pre-commit hooks with
8077416 to 2a23bc1
```python
def test_get_table_partition_details_non_partitioned_describe(self):
    self.hive.engine = types.SimpleNamespace(
        url=types.SimpleNamespace(drivername="hive")
    )
    mock_connection = Mock()
```
The DESCRIBE-based tests override self.hive.engine with a SimpleNamespace that only contains url.drivername. get_table_partition_details() now uses self.engine.dialect.identifier_preparer.quote_identifier(...) when drivername is not metastore, so this test setup will raise AttributeError (and the test may pass/fail for the wrong reason). Update the mocked engine to include a dialect.identifier_preparer with quote_identifier, or avoid replacing the whole engine and just patch self.hive.engine.url.drivername.
```python
def test_get_table_partition_details_from_describe_formatted(self):
    self.hive.engine = types.SimpleNamespace(
        url=types.SimpleNamespace(drivername="hive")
    )
    mock_connection = Mock()
```
Same issue as above: this test replaces self.hive.engine with a SimpleNamespace that lacks dialect.identifier_preparer, but the DESCRIBE FORMATTED path in get_table_partition_details() requires it for quoting. Without adding a mock dialect.identifier_preparer.quote_identifier, the code will error and get_table_partition_details() will return (False, None) from the exception handler, causing this test to fail (or not actually exercise the parsing logic).
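A stub engine that satisfies the code path described here could look like this. It is a sketch: the backtick quoting mimics Hive's identifier style, and the helper name is illustrative rather than an actual test fixture:

```python
import types
from unittest.mock import Mock

def make_stub_engine(drivername: str = "hive"):
    """Build a SimpleNamespace engine stub that also carries a
    dialect.identifier_preparer with a working quote_identifier,
    so the DESCRIBE FORMATTED path does not raise AttributeError."""
    preparer = types.SimpleNamespace(
        quote_identifier=lambda name: f"`{name}`"
    )
    return types.SimpleNamespace(
        url=types.SimpleNamespace(drivername=drivername),
        dialect=types.SimpleNamespace(identifier_preparer=preparer),
        connect=Mock(),
    )
```

Alternatively, as the comment notes, patching only `self.hive.engine.url.drivername` keeps the real dialect in place and avoids the problem entirely.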
```python
q = (
    lambda name: f'"{name}"' if drivername == "hive+postgres" else name
)  # noqa: E731
return (
    f"SELECT pk.{q('PKEY_NAME')}"
    f" FROM {q('PARTITION_KEYS')} pk"
    f" JOIN {q('TBLS')} tbl ON pk.{q('TBL_ID')} = tbl.{q('TBL_ID')}"
    f" JOIN {q('DBS')} db ON tbl.{q('DB_ID')} = db.{q('DB_ID')}"
    f" WHERE db.{q('NAME')} = :schema AND tbl.{q('TBL_NAME')} = :table_name"
    f" ORDER BY pk.{q('INTEGER_IDX')}"
)
```
_build_metastore_partition_query assigns a lambda to q and then suppresses the linter with # noqa: E731. Prefer a small local function (or conditional helper) instead so we don’t need to disable lint rules, which improves readability and keeps lint behavior consistent across the codebase.
Suggested change:

```python
def quote_identifier(name: str) -> str:
    return f'"{name}"' if drivername == "hive+postgres" else name

return (
    f"SELECT pk.{quote_identifier('PKEY_NAME')}"
    f" FROM {quote_identifier('PARTITION_KEYS')} pk"
    f" JOIN {quote_identifier('TBLS')} tbl ON pk.{quote_identifier('TBL_ID')} = tbl.{quote_identifier('TBL_ID')}"
    f" JOIN {quote_identifier('DBS')} db ON tbl.{quote_identifier('DB_ID')} = db.{quote_identifier('DB_ID')}"
    f" WHERE db.{quote_identifier('NAME')} = :schema AND tbl.{quote_identifier('TBL_NAME')} = :table_name"
    f" ORDER BY pk.{quote_identifier('INTEGER_IDX')}"
)
```
```python
for row in rows:
    col_name = row[0].strip() if row[0] else ""
    if col_name == "# Partition Information":
        in_partition_section = True
        continue
    if in_partition_section:
        if not col_name or col_name.startswith("# Detailed"):
            break
        if col_name.startswith("#"):
            continue
        partition_keys.append(col_name)
```
_get_partition_keys_from_describe() only detects partition keys when the DESCRIBE FORMATTED output contains a # Partition Information section. In this test suite we already account for a Hive describe variant where partition columns appear as duplicate rows without that sentinel (see the existing test_get_columns_deduplicates_partition_column_no_sentinel). In that scenario, get_table_partition_details() will incorrectly report the table as non-partitioned and won’t populate tablePartition. Consider adding a fallback (e.g., SHOW PARTITIONS ... parsing, or another heuristic) and a unit test to cover the no-sentinel format.
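The `SHOW PARTITIONS` fallback suggested here could be sketched as a small parser over the partition-spec rows. It assumes the standard `key=value/key=value` format that `SHOW PARTITIONS` emits; the function name is illustrative:

```python
from typing import List

def partition_keys_from_show_partitions(lines: List[str]) -> List[str]:
    """Infer partition key names from SHOW PARTITIONS output.

    Each row looks like 'year=2024/month=01', so the key names can be
    read off the first non-empty row. Returns [] when the table has no
    partitions (empty output)."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        return [part.split("=", 1)[0] for part in line.split("/") if "=" in part]
    return []
```

This keeps the DESCRIBE-based path primary and only inspects `SHOW PARTITIONS` when no sentinel section was found, which also covers the no-sentinel describe variant mentioned above.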
Code Review ✅ Approved: 4 resolved / 4 findings

Implements a metadata flag for partition keys, resolving the recursive parsing issue and adding missing test coverage for DESCRIBE FORMATTED paths. Redundant driver lookups and exceptions during SHOW PARTITIONS on non-partitioned tables are also addressed.

✅ 4 resolved
- Bug: Partition parser never exits section, collects metadata as keys
- Quality: No test coverage for DESCRIBE FORMATTED parsing path
- Quality: Redundant drivername lookup in metastore partition path
- Edge Case: SHOW PARTITIONS on non-partitioned table raises exception in Hive
```python
For other Hive drivers, it falls back to parsing
``DESCRIBE FORMATTED`` output.
```
The docstring for this method says the non-metastore path “falls back to parsing DESCRIBE FORMATTED output”, but the implementation also falls back to SHOW PARTITIONS when no partition keys are found. Please update the docstring to reflect the actual behavior so callers/debugging aren’t misled.
Suggested change:

```python
For other Hive drivers, it first parses ``DESCRIBE FORMATTED``
output and, if no partition keys are found there, falls back to
``SHOW PARTITIONS`` and infers keys from the partition specification.
```
Fixes #26712
Marks Hive partition columns with a metadata flag so users can distinguish partitioning columns from regular data columns in the schema view.
Summary by Gitar

- Added a `SHOW PARTITIONS` fallback in `_get_partition_keys_from_describe` when standard metadata extraction fails.
- Added a `quote_identifier` helper in `metadata.py`.
- Added tests for `SHOW PARTITIONS` parsing and updated mock configurations for `DESCRIBE FORMATTED` tests.