Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
Source connection handler
"""

from copy import deepcopy
from enum import Enum
from functools import singledispatch
Expand All @@ -28,9 +29,15 @@
HiveConnection,
HiveScheme,
)
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
Expand All @@ -56,6 +63,8 @@

HIVE_POSTGRES_SCHEME = "hive+postgres"
HIVE_MYSQL_SCHEME = "hive+mysql"
HIVE_MSSQL_SCHEME = "hive+mssql"
HIVE_ORACLE_SCHEME = "hive+oracle"

# Monkey-patch the pyhive.hive module to use our custom connection
import pyhive.hive
Expand Down Expand Up @@ -203,6 +212,61 @@ class CustomMysqlConnection(MysqlConnection):
)


@get_metastore_connection.register
def _(connection: MssqlConnection):
# import required to load sqlalchemy plugin
# pylint: disable=import-outside-toplevel,unused-import
from metadata.ingestion.source.database.hive.metastore_dialects.mssql import ( # nopycln: import
HiveMssqlMetaStoreDialect,
)

class CustomMssqlScheme(Enum):
HIVE_MSSQL = HIVE_MSSQL_SCHEME

class CustomMssqlConnection(MssqlConnection):
scheme: Optional[CustomMssqlScheme]

connection_copy = deepcopy(connection.__dict__)
connection_copy["scheme"] = CustomMssqlScheme.HIVE_MSSQL

custom_connection = CustomMssqlConnection(**connection_copy)

Comment on lines +229 to +233
Copy link

Copilot AI Apr 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For MSSQL metastore connections this overwrites the user-selected MssqlConnection.scheme (mssql+pyodbc/pytds/pymssql) with a single hive+mssql scheme. Since mssqlConnection.json defines multiple schemes (defaulting to mssql+pytds), this can change the DBAPI/driver used for the metastore and break connections in environments that rely on the configured/default scheme. Consider preserving the original scheme and applying the Hive metastore reflection behavior another way, or mapping each supported MSSQL scheme to a distinct hive-metastore scheme/dialect so driver selection is retained.

Copilot uses AI. Check for mistakes.
return create_generic_db_connection(
connection=custom_connection,
get_connection_url_fn=get_connection_url_common,
get_connection_args_fn=get_connection_args_common,
)
Comment on lines +232 to +238
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HiveMssqlMetaStoreDialect subclasses MSDialect_pyodbc, but this metastore connection handler builds the engine using get_connection_url_common after forcing the scheme to hive+mssql. get_connection_url_common does not include the pyodbc-specific URL pieces (e.g., ?driver=... or odbc_connect), and the driver field on MssqlConnection will be ignored. This is likely to fail at runtime for SQL Server metastore connections. Consider using the existing MSSQL pyodbc URL builder (or a dedicated metastore URL builder) while still using the hive+mssql dialect prefix.

Copilot uses AI. Check for mistakes.


@get_metastore_connection.register
def _(connection: OracleConnection):
# import required to load sqlalchemy plugin
# pylint: disable=import-outside-toplevel,unused-import
from metadata.ingestion.source.database.hive.metastore_dialects.oracle import ( # nopycln: import
HiveOracleMetaStoreDialect,
)
from metadata.ingestion.source.database.oracle.connection import (
OracleConnection as OracleConnectionHandler,
)

Comment on lines +241 to +251
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oracle metastore connection path doesn’t apply the cx_Oracle→oracledb compatibility shim used by the main Oracle connector (sys.modules["cx_Oracle"] = oracledb, version pinning, etc.). Since HiveOracleMetaStoreDialect inherits OracleDialect_cx_oracle, engine creation will fail unless cx_Oracle is installed. Reuse the Oracle connector’s URL builder / initialization logic (or switch the dialect to the oracledb driver) so the required DBAPI is consistently available.

Copilot uses AI. Check for mistakes.
class CustomOracleScheme(Enum):
HIVE_ORACLE = HIVE_ORACLE_SCHEME

class CustomOracleConnection(OracleConnection):
scheme: Optional[CustomOracleScheme]

connection_copy = deepcopy(connection.__dict__)
connection_copy["scheme"] = CustomOracleScheme.HIVE_ORACLE

custom_connection = CustomOracleConnection(**connection_copy)

return create_generic_db_connection(
connection=custom_connection,
get_connection_url_fn=OracleConnectionHandler.get_connection_url,
get_connection_args_fn=get_connection_args_common,
)
Comment on lines +263 to +267
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oracle metastore handler builds the SQLAlchemy URL via get_connection_url_common(), but OracleConnection doesn’t have a database/databaseSchema field and requires oracleConnectionType (service_name / schema / TNS). As a result the generated hive+oracle://... URL will omit the service name/schema and is likely invalid. Consider reusing the existing Oracle URL builder (e.g., OracleConnection.get_connection_url / a shared helper) so oracleConnectionType is encoded into the URL correctly for Hive metastore connections.

Copilot uses AI. Check for mistakes.


def test_connection(
metadata: OpenMetadata,
engine: Engine,
Expand All @@ -218,20 +282,27 @@ def test_connection(
metastore_conn = service_connection.metastoreConnection

if metastore_conn:
if isinstance(metastore_conn, (PostgresConnection, MysqlConnection)):
if isinstance(
metastore_conn,
(PostgresConnection, MysqlConnection, MssqlConnection, OracleConnection),
):
engine = get_metastore_connection(metastore_conn)
elif isinstance(metastore_conn, dict) and len(metastore_conn) > 0:
try:
service_connection.metastoreConnection = (
PostgresConnection.model_validate(metastore_conn)
)
except ValidationError:
for conn_cls in (
PostgresConnection,
MysqlConnection,
MssqlConnection,
OracleConnection,
):
try:
service_connection.metastoreConnection = (
MysqlConnection.model_validate(metastore_conn)
service_connection.metastoreConnection = conn_cls.model_validate(
metastore_conn
)
break
except ValidationError:
raise ValueError("Invalid metastore connection")
continue
else:
raise ValueError("Invalid metastore connection")
engine = get_metastore_connection(service_connection.metastoreConnection)

return test_connection_db_schema_sources(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Hive lineage module
"""

from typing import Optional

from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveConnection,
)
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
Expand Down Expand Up @@ -84,7 +90,9 @@

def _get_validated_metastore_connection(
self,
) -> Optional[Union[PostgresConnection, MysqlConnection]]:
) -> Optional[
Union[PostgresConnection, MysqlConnection, MssqlConnection, OracleConnection]
]:

Check warning on line 95 in ingestion/src/metadata/ingestion/source/database/hive/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use a union type expression for this type hint.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ2GTKrH_PJUhgHKR3Zh&open=AZ2GTKrH_PJUhgHKR3Zh&pullRequest=26977
"""
Validate and return the metastore connection if it exists.
Handles cases where the connection may be a raw dict that needs validation.
Expand All @@ -94,18 +102,25 @@
if not metastore_conn:
return None

if isinstance(metastore_conn, (PostgresConnection, MysqlConnection)):
# Supported metastore connection types
METASTORE_CONNECTION_TYPES = (
PostgresConnection,
MysqlConnection,
MssqlConnection,
OracleConnection,
)

if isinstance(metastore_conn, METASTORE_CONNECTION_TYPES):
return metastore_conn

if isinstance(metastore_conn, dict) and len(metastore_conn) > 0:
try:
return PostgresConnection.model_validate(metastore_conn)
except ValidationError:
for conn_cls in METASTORE_CONNECTION_TYPES:
try:
return MysqlConnection.model_validate(metastore_conn)
return conn_cls.model_validate(metastore_conn)
except ValidationError:
logger.warning("Invalid metastore connection configuration")
return None
continue
logger.warning("Invalid metastore connection configuration")
return None

return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Hive Metastore Dialect Mixin
"""

from sqlalchemy.engine import reflection

from metadata.ingestion.source.database.hive.utils import get_columns
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hive Metastore MSSQL Dialect
"""

from sqlalchemy.dialects import registry

from .dialect import HiveMssqlMetaStoreDialect

__version__ = "0.1.0"
__all__ = ["HiveMssqlMetaStoreDialect"]
registry.register(
"hive.mssql",
"metadata.ingestion.source.database.hive.metastore_dialects.mssql.dialect",
"HiveMssqlMetaStoreDialect",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hive Metastore MSSQL Dialect Mixin
"""

from sqlalchemy import text
from sqlalchemy.dialects.mssql.base import MSDialect
from sqlalchemy.engine import reflection
Comment on lines +15 to +17
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HiveMssqlMetaStoreDialect inherits from MSDialect_pyodbc, which forces a pyodbc DBAPI dependency and expects pyodbc-style URL parameters (e.g., driver=). However the Hive metastore connection code overrides the scheme to hive+mssql and uses get_connection_url_common(), which does not include the ODBC driver and will commonly be used with the default MSSQL stack (sqlalchemy-pytds) where pyodbc isn’t installed. This combination will likely break MSSQL metastore connections unless users install the separate mssql-odbc extras and configure driver options. Prefer inheriting from sqlalchemy_pytds.dialect.MSDialect_pytds (matching the default mssql+pytds support), or otherwise align the dialect base class and URL builder with the intended MSSQL driver requirements.

Copilot uses AI. Check for mistakes.

from metadata.ingestion.source.database.hive.metastore_dialects.mixin import (
HiveMetaStoreDialectMixin,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_table_comment_wrapper,
get_view_definition_wrapper,
)

logger = ingestion_logger()


# pylint: disable=abstract-method
class HiveMssqlMetaStoreDialect(HiveMetaStoreDialectMixin, MSDialect):
"""
MSSQL metastore dialect class for Hive metastore backed by SQL Server.
Uses unquoted identifiers and supports CTEs.
"""

name = "hive"
driver = "mssql"
supports_statement_cache = False
Comment on lines +15 to +40
Copy link

Copilot AI Apr 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dialect subclasses sqlalchemy.dialects.mssql.base.MSDialect, but MSSQL connectivity in this codebase is scheme/driver-specific (mssql+pyodbc, mssql+pytds, mssql+pymssql). If the metastore dialect always uses the base dialect, it can’t reflect the configured driver’s behavior and can diverge from the scheme used elsewhere (notably default mssql+pytds). Consider providing driver-specific Hive metastore dialects (e.g. based on the selected scheme) or explicitly constraining/validating which MSSQL driver is supported for Hive metastore connections.

Copilot uses AI. Check for mistakes.

def get_schema_names(self, connection, **kw):
# Equivalent to SHOW DATABASES
schema_names = [
row[0] for row in connection.execute(text("SELECT NAME FROM DBS"))
]
logger.debug(f"Fetched schema names: {schema_names}")
return schema_names

# pylint: disable=arguments-differ
def get_view_names(self, connection, schema=None, **kw):
query, params = self._get_table_names_base_query(schema=schema)
query += " WHERE TBL_TYPE = 'VIRTUAL_VIEW'"
view_names = [row[0] for row in connection.execute(text(query), params)]
logger.debug(f"Fetched view names for schema '{schema}': {view_names}")
return view_names

def _get_table_columns(self, connection, table_name, schema):
params = {"table_name": table_name}
schema_join = (
"""
JOIN DBS db ON tbsl.DB_ID = db.DB_ID
AND db.NAME = :schema
"""
Comment on lines +60 to +64
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query interpolates schema and table_name directly into SQL via f-strings. Besides being vulnerable to SQL injection if these values are ever user-controlled, it will also break for legitimate names containing quotes/special characters. Prefer using bound parameters with text() (e.g., ... = :schema, ... = :table_name) and passing values via .bindparams(...), or use SQLAlchemy constructs for safe quoting.

Copilot uses AI. Check for mistakes.
if schema
else ""
)
if schema:
params["schema"] = schema

query = f"""
WITH regular_columns AS (
SELECT
col.COLUMN_NAME,
col.TYPE_NAME,
col.COMMENT
FROM COLUMNS_V2 col
JOIN CDS cds ON col.CD_ID = cds.CD_ID
JOIN SDS sds ON sds.CD_ID = cds.CD_ID
JOIN TBLS tbsl ON sds.SD_ID = tbsl.SD_ID
AND tbsl.TBL_NAME = :table_name
{schema_join}
),
partition_columns AS (
SELECT
pk.PKEY_NAME AS COLUMN_NAME,
pk.PKEY_TYPE AS TYPE_NAME,
pk.PKEY_COMMENT AS COMMENT
FROM PARTITION_KEYS pk
JOIN TBLS tbsl ON pk.TBL_ID = tbsl.TBL_ID
AND tbsl.TBL_NAME = :table_name
{schema_join}
)
Comment on lines +58 to +93
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These queries interpolate schema/table_name directly into SQL strings. Since schema can ultimately come from user-provided config (and table_name may include quotes), this creates an injection risk and can also break on names containing '. Use SQLAlchemy bind parameters for values in predicates (e.g., ... db.NAME = :schema, ... TBL_NAME = :table_name) instead of f-string substitution.

Copilot uses AI. Check for mistakes.
SELECT * FROM regular_columns
UNION ALL
SELECT * FROM partition_columns
"""
return connection.execute(text(query), params).fetchall()

def _get_table_names_base_query(self, schema=None):
query = "SELECT TBL_NAME FROM TBLS tbl"
params = {}
if schema:
query += " JOIN DBS db ON tbl.DB_ID = db.DB_ID AND db.NAME = :schema"
params["schema"] = schema
return query, params

def get_table_names(self, connection, schema=None, **kw):
query, params = self._get_table_names_base_query(schema=schema)
query += " WHERE (TBL_TYPE != 'VIRTUAL_VIEW' OR TBL_TYPE IS NULL)"
table_names = [row[0] for row in connection.execute(text(query), params)]
logger.debug(f"Fetched table names for schema '{schema}': {table_names}")
return table_names

@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
query = """
SELECT
dbs.NAME AS [schema],
tbls.TBL_NAME AS view_name,
tbls.VIEW_ORIGINAL_TEXT AS view_def
FROM TBLS tbls
JOIN DBS dbs ON tbls.DB_ID = dbs.DB_ID
WHERE tbls.VIEW_ORIGINAL_TEXT IS NOT NULL
"""
return get_view_definition_wrapper(
self,
connection,
table_name=view_name,
schema=schema,
query=query,
)

@reflection.cache
def get_table_comment(self, connection, table_name, schema=None, **kw):
query = """
SELECT
DBS.NAME AS [schema],
TBLS.TBL_NAME AS table_name,
TABLE_PARAMS.PARAM_VALUE AS table_comment
FROM DBS
JOIN TBLS ON DBS.DB_ID = TBLS.DB_ID
LEFT JOIN TABLE_PARAMS ON TBLS.TBL_ID = TABLE_PARAMS.TBL_ID
AND TABLE_PARAMS.PARAM_KEY = 'comment'
"""
return get_table_comment_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=query,
)

# pylint: disable=arguments-renamed
def get_dialect_cls(self):
return HiveMssqlMetaStoreDialect
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Hive Metastore Mysql Dialect
"""

from sqlalchemy.dialects import registry

from .dialect import HiveMysqlMetaStoreDialect
Expand Down
Loading
Loading