-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(hive): add MSSQL and Oracle backends for Hive metastore #26977
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
284263b
66a6af7
1001520
7de921f
393b27e
6d77886
3023a29
ddee240
7c9e950
1df706e
88b92c7
d5b6aa0
0d3d35c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| """ | ||
| Source connection handler | ||
| """ | ||
|
|
||
| from copy import deepcopy | ||
| from enum import Enum | ||
| from functools import singledispatch | ||
|
|
@@ -28,9 +29,15 @@ | |
| HiveConnection, | ||
| HiveScheme, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( | ||
| MssqlConnection, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( | ||
| MysqlConnection, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( | ||
| OracleConnection, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( | ||
| PostgresConnection, | ||
| ) | ||
|
|
@@ -56,6 +63,8 @@ | |
|
|
||
| HIVE_POSTGRES_SCHEME = "hive+postgres" | ||
| HIVE_MYSQL_SCHEME = "hive+mysql" | ||
| HIVE_MSSQL_SCHEME = "hive+mssql" | ||
| HIVE_ORACLE_SCHEME = "hive+oracle" | ||
|
|
||
| # Monkey-patch the pyhive.hive module to use our custom connection | ||
| import pyhive.hive | ||
|
|
@@ -203,6 +212,61 @@ class CustomMysqlConnection(MysqlConnection): | |
| ) | ||
|
|
||
|
|
||
| @get_metastore_connection.register | ||
| def _(connection: MssqlConnection): | ||
| # import required to load sqlalchemy plugin | ||
| # pylint: disable=import-outside-toplevel,unused-import | ||
| from metadata.ingestion.source.database.hive.metastore_dialects.mssql import ( # nopycln: import | ||
| HiveMssqlMetaStoreDialect, | ||
| ) | ||
|
|
||
| class CustomMssqlScheme(Enum): | ||
| HIVE_MSSQL = HIVE_MSSQL_SCHEME | ||
|
|
||
| class CustomMssqlConnection(MssqlConnection): | ||
| scheme: Optional[CustomMssqlScheme] | ||
|
|
||
| connection_copy = deepcopy(connection.__dict__) | ||
| connection_copy["scheme"] = CustomMssqlScheme.HIVE_MSSQL | ||
|
|
||
| custom_connection = CustomMssqlConnection(**connection_copy) | ||
|
|
||
| return create_generic_db_connection( | ||
| connection=custom_connection, | ||
| get_connection_url_fn=get_connection_url_common, | ||
| get_connection_args_fn=get_connection_args_common, | ||
| ) | ||
|
Comment on lines
+232
to
+238
|
||
|
|
||
|
|
||
| @get_metastore_connection.register | ||
| def _(connection: OracleConnection): | ||
| # import required to load sqlalchemy plugin | ||
| # pylint: disable=import-outside-toplevel,unused-import | ||
| from metadata.ingestion.source.database.hive.metastore_dialects.oracle import ( # nopycln: import | ||
| HiveOracleMetaStoreDialect, | ||
| ) | ||
| from metadata.ingestion.source.database.oracle.connection import ( | ||
| OracleConnection as OracleConnectionHandler, | ||
| ) | ||
|
|
||
|
Comment on lines
+241
to
+251
|
||
| class CustomOracleScheme(Enum): | ||
| HIVE_ORACLE = HIVE_ORACLE_SCHEME | ||
|
|
||
| class CustomOracleConnection(OracleConnection): | ||
| scheme: Optional[CustomOracleScheme] | ||
|
|
||
| connection_copy = deepcopy(connection.__dict__) | ||
| connection_copy["scheme"] = CustomOracleScheme.HIVE_ORACLE | ||
|
|
||
| custom_connection = CustomOracleConnection(**connection_copy) | ||
|
|
||
| return create_generic_db_connection( | ||
| connection=custom_connection, | ||
| get_connection_url_fn=OracleConnectionHandler.get_connection_url, | ||
| get_connection_args_fn=get_connection_args_common, | ||
| ) | ||
|
Comment on lines
+263
to
+267
|
||
|
|
||
|
|
||
| def test_connection( | ||
| metadata: OpenMetadata, | ||
| engine: Engine, | ||
|
|
@@ -218,20 +282,27 @@ def test_connection( | |
| metastore_conn = service_connection.metastoreConnection | ||
|
|
||
| if metastore_conn: | ||
| if isinstance(metastore_conn, (PostgresConnection, MysqlConnection)): | ||
| if isinstance( | ||
| metastore_conn, | ||
| (PostgresConnection, MysqlConnection, MssqlConnection, OracleConnection), | ||
| ): | ||
| engine = get_metastore_connection(metastore_conn) | ||
| elif isinstance(metastore_conn, dict) and len(metastore_conn) > 0: | ||
| try: | ||
| service_connection.metastoreConnection = ( | ||
| PostgresConnection.model_validate(metastore_conn) | ||
| ) | ||
| except ValidationError: | ||
| for conn_cls in ( | ||
| PostgresConnection, | ||
| MysqlConnection, | ||
| MssqlConnection, | ||
| OracleConnection, | ||
| ): | ||
| try: | ||
| service_connection.metastoreConnection = ( | ||
| MysqlConnection.model_validate(metastore_conn) | ||
| service_connection.metastoreConnection = conn_cls.model_validate( | ||
| metastore_conn | ||
| ) | ||
| break | ||
| except ValidationError: | ||
| raise ValueError("Invalid metastore connection") | ||
| continue | ||
| else: | ||
| raise ValueError("Invalid metastore connection") | ||
| engine = get_metastore_connection(service_connection.metastoreConnection) | ||
|
|
||
| return test_connection_db_schema_sources( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| # Copyright 2025 Collate | ||
| # Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
| Hive Metastore MSSQL Dialect | ||
| """ | ||
|
|
||
| from sqlalchemy.dialects import registry | ||
|
|
||
| from .dialect import HiveMssqlMetaStoreDialect | ||
|
|
||
| __version__ = "0.1.0" | ||
| __all__ = ["HiveMssqlMetaStoreDialect"] | ||
| registry.register( | ||
| "hive.mssql", | ||
| "metadata.ingestion.source.database.hive.metastore_dialects.mssql.dialect", | ||
| "HiveMssqlMetaStoreDialect", | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| # Copyright 2025 Collate | ||
| # Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
| Hive Metastore MSSQL Dialect Mixin | ||
| """ | ||
|
|
||
| from sqlalchemy import text | ||
| from sqlalchemy.dialects.mssql.base import MSDialect | ||
| from sqlalchemy.engine import reflection | ||
|
Comment on lines
+15
to
+17
|
||
|
|
||
| from metadata.ingestion.source.database.hive.metastore_dialects.mixin import ( | ||
| HiveMetaStoreDialectMixin, | ||
| ) | ||
| from metadata.utils.logger import ingestion_logger | ||
| from metadata.utils.sqlalchemy_utils import ( | ||
| get_table_comment_wrapper, | ||
| get_view_definition_wrapper, | ||
| ) | ||
|
|
||
| logger = ingestion_logger() | ||
|
|
||
|
|
||
| # pylint: disable=abstract-method | ||
| class HiveMssqlMetaStoreDialect(HiveMetaStoreDialectMixin, MSDialect): | ||
| """ | ||
| MSSQL metastore dialect class for Hive metastore backed by SQL Server. | ||
| Uses unquoted identifiers and supports CTEs. | ||
| """ | ||
|
|
||
| name = "hive" | ||
| driver = "mssql" | ||
| supports_statement_cache = False | ||
|
Comment on lines
+15
to
+40
|
||
|
|
||
| def get_schema_names(self, connection, **kw): | ||
| # Equivalent to SHOW DATABASES | ||
| schema_names = [ | ||
| row[0] for row in connection.execute(text("SELECT NAME FROM DBS")) | ||
| ] | ||
| logger.debug(f"Fetched schema names: {schema_names}") | ||
| return schema_names | ||
|
|
||
| # pylint: disable=arguments-differ | ||
| def get_view_names(self, connection, schema=None, **kw): | ||
| query, params = self._get_table_names_base_query(schema=schema) | ||
| query += " WHERE TBL_TYPE = 'VIRTUAL_VIEW'" | ||
| view_names = [row[0] for row in connection.execute(text(query), params)] | ||
| logger.debug(f"Fetched view names for schema '{schema}': {view_names}") | ||
| return view_names | ||
|
|
||
| def _get_table_columns(self, connection, table_name, schema): | ||
| params = {"table_name": table_name} | ||
| schema_join = ( | ||
| """ | ||
| JOIN DBS db ON tbsl.DB_ID = db.DB_ID | ||
| AND db.NAME = :schema | ||
| """ | ||
|
Comment on lines
+60
to
+64
|
||
| if schema | ||
| else "" | ||
| ) | ||
| if schema: | ||
| params["schema"] = schema | ||
|
|
||
| query = f""" | ||
| WITH regular_columns AS ( | ||
| SELECT | ||
| col.COLUMN_NAME, | ||
| col.TYPE_NAME, | ||
| col.COMMENT | ||
| FROM COLUMNS_V2 col | ||
| JOIN CDS cds ON col.CD_ID = cds.CD_ID | ||
| JOIN SDS sds ON sds.CD_ID = cds.CD_ID | ||
| JOIN TBLS tbsl ON sds.SD_ID = tbsl.SD_ID | ||
| AND tbsl.TBL_NAME = :table_name | ||
| {schema_join} | ||
| ), | ||
| partition_columns AS ( | ||
| SELECT | ||
| pk.PKEY_NAME AS COLUMN_NAME, | ||
| pk.PKEY_TYPE AS TYPE_NAME, | ||
| pk.PKEY_COMMENT AS COMMENT | ||
| FROM PARTITION_KEYS pk | ||
| JOIN TBLS tbsl ON pk.TBL_ID = tbsl.TBL_ID | ||
| AND tbsl.TBL_NAME = :table_name | ||
| {schema_join} | ||
| ) | ||
|
Comment on lines
+58
to
+93
|
||
| SELECT * FROM regular_columns | ||
| UNION ALL | ||
| SELECT * FROM partition_columns | ||
| """ | ||
| return connection.execute(text(query), params).fetchall() | ||
|
|
||
| def _get_table_names_base_query(self, schema=None): | ||
| query = "SELECT TBL_NAME FROM TBLS tbl" | ||
| params = {} | ||
| if schema: | ||
| query += " JOIN DBS db ON tbl.DB_ID = db.DB_ID AND db.NAME = :schema" | ||
| params["schema"] = schema | ||
| return query, params | ||
|
|
||
| def get_table_names(self, connection, schema=None, **kw): | ||
| query, params = self._get_table_names_base_query(schema=schema) | ||
| query += " WHERE (TBL_TYPE != 'VIRTUAL_VIEW' OR TBL_TYPE IS NULL)" | ||
| table_names = [row[0] for row in connection.execute(text(query), params)] | ||
| logger.debug(f"Fetched table names for schema '{schema}': {table_names}") | ||
| return table_names | ||
|
|
||
| @reflection.cache | ||
| def get_view_definition(self, connection, view_name, schema=None, **kw): | ||
| query = """ | ||
| SELECT | ||
| dbs.NAME AS [schema], | ||
| tbls.TBL_NAME AS view_name, | ||
| tbls.VIEW_ORIGINAL_TEXT AS view_def | ||
| FROM TBLS tbls | ||
| JOIN DBS dbs ON tbls.DB_ID = dbs.DB_ID | ||
| WHERE tbls.VIEW_ORIGINAL_TEXT IS NOT NULL | ||
| """ | ||
| return get_view_definition_wrapper( | ||
| self, | ||
| connection, | ||
| table_name=view_name, | ||
| schema=schema, | ||
| query=query, | ||
| ) | ||
|
|
||
| @reflection.cache | ||
| def get_table_comment(self, connection, table_name, schema=None, **kw): | ||
| query = """ | ||
| SELECT | ||
| DBS.NAME AS [schema], | ||
| TBLS.TBL_NAME AS table_name, | ||
| TABLE_PARAMS.PARAM_VALUE AS table_comment | ||
| FROM DBS | ||
| JOIN TBLS ON DBS.DB_ID = TBLS.DB_ID | ||
| LEFT JOIN TABLE_PARAMS ON TBLS.TBL_ID = TABLE_PARAMS.TBL_ID | ||
| AND TABLE_PARAMS.PARAM_KEY = 'comment' | ||
| """ | ||
| return get_table_comment_wrapper( | ||
| self, | ||
| connection, | ||
| table_name=table_name, | ||
| schema=schema, | ||
| query=query, | ||
| ) | ||
|
|
||
| # pylint: disable=arguments-renamed | ||
| def get_dialect_cls(self): | ||
| return HiveMssqlMetaStoreDialect | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For MSSQL metastore connections this overwrites the user-selected
MssqlConnection.scheme(mssql+pyodbc/pytds/pymssql) with a singlehive+mssqlscheme. SincemssqlConnection.jsondefines multiple schemes (defaulting tomssql+pytds), this can change the DBAPI/driver used for the metastore and break connections in environments that rely on the configured/default scheme. Consider preserving the original scheme and applying the Hive metastore reflection behavior another way, or mapping each supported MSSQL scheme to a distinct hive-metastore scheme/dialect so driver selection is retained.