def _raw_authority(value: str) -> str:
    """
    Return the ``host[:port]`` text that follows the FIRST ``://`` in *value*.

    Path, query, fragment and userinfo are removed with plain string
    operations, so this also works for inputs ``urlparse`` cannot split
    (e.g. ``jdbc:postgresql://host:5432/db``).
    """
    # Split on the first "://": a later "://" can only belong to the path or
    # query (e.g. "...?next=http://other") and must not be taken as the host.
    authority = value.split("://", 1)[-1]
    for sep in ("/", "?", "#"):
        authority = authority.split(sep, 1)[0]
    # Everything up to the last "@" is userinfo (credentials); drop it.
    return authority.rsplit("@", 1)[-1]


def _invalid_hostport(label: str) -> ValueError:
    """Build the ValueError raised for every unusable hostPort value."""
    return ValueError(
        f"Invalid hostPort '{label}'. Expected format is "
        "'hostname[:port]' (e.g. 'localhost:3306')."
    )


def strip_hostport_scheme(raw: str) -> str:
    """
    Strip an accidental URL scheme from a hostPort string.

    Self-contained helper that depends only on the standard library so it
    can be imported from ``builders.py`` and ``db_utils.py`` without circular
    imports.

    Args:
        raw: the hostPort as entered by the user. Surrounding whitespace is
            ignored. Values without ``://`` are returned unchanged (apart
            from the whitespace strip).

    Returns:
        ``hostname[:port]``. IPv6 hosts keep their square brackets. Path,
        query, fragment and userinfo are discarded. Hosts extracted by
        ``urlparse`` are lower-cased (that is what ``ParseResult.hostname``
        returns); hosts extracted by the JDBC-style fallback keep their case.

    Raises:
        ValueError: if the value cannot be resolved to a valid
            ``hostname[:port]`` string — e.g. when the port is non-numeric,
            when the hostname is empty after stripping the scheme, or when
            ``urlparse`` rejects the value outright. This applies to both
            standard URLs handled by ``urlparse`` and to JDBC-style URLs
            handled by the fallback branch
            (e.g. ``jdbc:postgresql://host:abc/db``).
    """
    value = raw.strip()
    if "://" not in value:
        return value

    authority = _raw_authority(value)
    # Label of last resort: only the scheme part, never the raw value, so
    # credentials or query parameters cannot end up in logs or exceptions.
    scheme_only = value.split("://", 1)[0] + "://"

    try:
        parsed = urlparse(value)
    except ValueError as exc:
        # urlparse itself rejects some inputs (e.g. unbalanced IPv6 brackets).
        raise _invalid_hostport(authority or scheme_only) from exc
    hostname = parsed.hostname or ""

    # Build a sanitised label for log/error messages that identifies the
    # problematic input without leaking credentials, a path or a query.
    if parsed.scheme and hostname:
        safe_label = f"{parsed.scheme}://{hostname}"
    else:
        safe_label = authority or scheme_only

    logger.warning(
        "The hostPort '%s' contains a URL scheme. Expected format is "
        "'hostname[:port]' (e.g. 'localhost:3306'). Stripping the scheme prefix.",
        safe_label,
    )

    try:
        port = parsed.port  # raises ValueError for non-numeric/out-of-range ports
    except ValueError as exc:
        raise _invalid_hostport(safe_label) from exc

    if hostname:
        # urlparse strips the IPv6 brackets from hostname; restore them.
        host = f"[{hostname}]" if ":" in hostname else hostname
        return f"{host}:{port}" if port is not None else host

    # urlparse couldn't extract a hostname (e.g. 'jdbc:postgresql://host:5432/db').
    # Fall back to the raw authority and validate it so the same ValueError
    # contract holds for JDBC-style URLs and other malformed inputs.
    if not authority:
        raise _invalid_hostport(safe_label)
    try:
        fallback_parsed = urlparse(f"//{authority}")
        fallback_hostname = fallback_parsed.hostname or ""
        _ = fallback_parsed.port  # raises ValueError for non-numeric ports
    except ValueError as exc:
        raise _invalid_hostport(safe_label) from exc
    if not fallback_hostname:
        raise _invalid_hostport(safe_label)
    return authority


# Backwards-compatible private alias retained for any internal callers that
# pinned to the original underscored symbol while the helper was private.
_strip_hostport_scheme = strip_hostport_scheme

# Module sub-path that identifies database-connector classes.
# Only *Connection classes whose module contains this sub-path have a plain
# ``hostname[:port]`` hostPort — all other connection categories
# (dashboard, pipeline, search, metadata, …) legitimately store a full URL
# in hostPort and must NOT have their scheme stripped.
_DATABASE_CONNECTION_MODULE_MARKER = ".services.connections.database."
``http://``) is + stripped here so that all downstream connector code — including connectors + that call ``connection.hostPort.split(":")`` directly — receives a clean value. """ # pylint: disable=import-outside-toplevel + if not self.__class__.__name__.endswith("Connection"): + # Only process Connection classes + return + if not hasattr(self, "__pydantic_fields__"): + return + + # Strip accidental URL schemes from hostPort at model construction time, + # but ONLY for database-connector classes whose hostPort is a plain + # ``hostname[:port]``. Dashboard, pipeline, search, and other connector + # categories legitimately use a full URL in hostPort (e.g. + # ``http://grafana:3000``), so we restrict stripping to classes whose + # module path contains the database-connection marker. + # The isinstance(str) guard prevents an AttributeError when hostPort is + # optional and left unset (None). + host_port = getattr(self, "hostPort", None) + if ( + isinstance(host_port, str) + and "://" in host_port + and _DATABASE_CONNECTION_MODULE_MARKER in (self.__class__.__module__ or "") + ): + self.hostPort = strip_hostport_scheme(host_port) + try: - if not self.__class__.__name__.endswith("Connection"): - # Only parse FilterPattern for Connection classes - return - if not hasattr(self, "__pydantic_fields__"): - return for field in self.__pydantic_fields__: if field.endswith("FilterPattern"): from metadata.generated.schema.type.filterPattern import ( diff --git a/ingestion/src/metadata/utils/db_utils.py b/ingestion/src/metadata/utils/db_utils.py index 15543aaee9b4..3bebb820b7ad 100644 --- a/ingestion/src/metadata/utils/db_utils.py +++ b/ingestion/src/metadata/utils/db_utils.py @@ -33,6 +33,7 @@ get_lineage_by_query, get_lineage_via_table_entity, ) +from metadata.ingestion.models.custom_pydantic import strip_hostport_scheme from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.models import TableView from metadata.utils import fqn @@ -44,12 +45,32 
def clean_host_port(host_port: str) -> str:
    """
    Return *host_port* without a URL scheme prefix.

    Users sometimes enter a full URL (e.g. 'http://localhost:3306')
    instead of just 'localhost:3306'. Surrounding whitespace is removed
    first. A value containing ``://`` is handed to ``strip_hostport_scheme``;
    any other value only loses its trailing slashes.
    """
    trimmed = host_port.strip()
    if "://" in trimmed:
        return strip_hostport_scheme(trimmed)
    return trimmed.rstrip("/")


def get_host_from_host_port(uri: str) -> str:
    """
    Extract the host part of a hostPort value.

    "localhost:9000" gives "localhost"; a bracketed IPv6 value such as
    "[::1]:3306" gives "[::1]" (brackets kept). The value is passed through
    ``clean_host_port`` first, so a scheme prefix is tolerated.
    """
    host_port = clean_host_port(uri)
    if not host_port.startswith("["):
        host, _, _ = host_port.partition(":")
        return host
    # Bracketed IPv6 literal: keep everything up to the closing bracket.
    bracketed, _, _ = host_port.partition("]")
    return f"{bracketed}]"
automatically""" + connection = PostgresConnectionConfig( + username="openmetadata_user", + authType=BasicAuth(password="openmetadata_password"), + hostPort="https://localhost:5432", + database="openmetadata_db", + ) + engine_connection = PostgresConnection(connection).client + self.assertEqual( + engine_connection.url.render_as_string(hide_password=False), + "postgresql+psycopg2://openmetadata_user:openmetadata_password@localhost:5432/openmetadata_db", + ) + def test_get_connection_url_postgres(self): connection = PostgresConnectionConfig( username="openmetadata_user", diff --git a/ingestion/tests/unit/test_connection_builders.py b/ingestion/tests/unit/test_connection_builders.py index b5ac9dde1dc7..c47d04970915 100644 --- a/ingestion/tests/unit/test_connection_builders.py +++ b/ingestion/tests/unit/test_connection_builders.py @@ -17,14 +17,21 @@ from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( BasicAuth, ) +from metadata.generated.schema.entity.services.connections.database.cassandraConnection import ( + CassandraConnection, +) from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.ingestion.connections.builders import ( get_connection_args_common, get_connection_options_dict, init_empty_connection_arguments, ) +from metadata.ingestion.models.custom_pydantic import _DATABASE_CONNECTION_MODULE_MARKER class ConnectionBuilderTest(TestCase): @@ -78,3 +85,78 @@ def test_init_empty_connection_arguments(self): self.assertEqual(new_args.root.get("hello"), "world") self.assertIsNone(new_args.root.get("not there")) + + def test_model_post_init_strips_url_scheme_from_hostport(self): + """ + Verify that model_post_init strips URL schemes from hostPort at + construction time so connectors that call hostPort.split(":") directly + 
receive a clean hostname[:port] value. + """ + # http:// prefix should be stripped + conn = MysqlConnection( + username="user", + authType=BasicAuth(password="pass"), + hostPort="http://localhost:3306", + ) + self.assertEqual(conn.hostPort, "localhost:3306") + + # https:// prefix should be stripped + conn2 = MysqlConnection( + username="user", + authType=BasicAuth(password="pass"), + hostPort="https://myhost:5432", + ) + self.assertEqual(conn2.hostPort, "myhost:5432") + + # Already clean value should pass through unchanged + conn3 = MysqlConnection( + username="user", + authType=BasicAuth(password="pass"), + hostPort="localhost:3306", + ) + self.assertEqual(conn3.hostPort, "localhost:3306") + + def test_model_post_init_raises_for_invalid_port_in_hostport(self): + """ + Verify that a non-numeric port in a scheme-prefixed hostPort raises + at model construction time (fail-fast) rather than producing a + confusing error deep in connector code. + """ + with self.assertRaises(Exception): + MysqlConnection( + username="user", + authType=BasicAuth(password="pass"), + hostPort="http://localhost:abc", + ) + + def test_non_database_connections_not_stripped(self): + """ + Verify that non-database connection classes (metadata, dashboard, pipeline, …) + are NOT subject to hostPort scheme stripping because their hostPort legitimately + stores a full URL (e.g. OpenMetadataConnection hostPort = http://localhost:8585/api). + + The guard uses the module path: only classes whose __module__ contains + _DATABASE_CONNECTION_MODULE_MARKER ('.services.connections.database.') + are stripped. All others pass through unchanged. + """ + # Confirm the marker is correct + self.assertIn("database", _DATABASE_CONNECTION_MODULE_MARKER) + + # OpenMetadataConnection.hostPort is a plain str that expects a full URL. + # Its module contains '.connections.metadata.' — NOT the database marker — + # so it must NOT be stripped. 
+ ometa = OpenMetadataConnection( + hostPort="http://localhost:8585/api", + ) + self.assertIn("http", ometa.hostPort) + self.assertIn("localhost", ometa.hostPort) + + def test_none_hostport_does_not_crash(self): + """ + Regression test for gitar-bot bug report: constructing a database + connection where hostPort is Optional and left as None must NOT raise + AttributeError from strip_hostport_scheme(None). + """ + # CassandraConnection.hostPort is Optional[str] = None by default. + conn = CassandraConnection() + self.assertIsNone(conn.hostPort) diff --git a/ingestion/tests/unit/test_db_utils.py b/ingestion/tests/unit/test_db_utils.py index 3ed85f9bb639..1c3dcf58cc85 100644 --- a/ingestion/tests/unit/test_db_utils.py +++ b/ingestion/tests/unit/test_db_utils.py @@ -37,7 +37,11 @@ from metadata.ingestion.lineage.models import Dialect from metadata.ingestion.lineage.sql_lineage import search_cache from metadata.ingestion.source.models import TableView -from metadata.utils.db_utils import get_host_from_host_port, get_view_lineage +from metadata.utils.db_utils import ( + clean_host_port, + get_host_from_host_port, + get_view_lineage, +) # Mock LineageTable class to simulate collate_sqllineage.core.models.Table @@ -115,6 +119,94 @@ def test_get_host_from_host_port(self): self.assertEqual(get_host_from_host_port("localhost"), "localhost") self.assertEqual(get_host_from_host_port("example.com"), "example.com") + # Test with URL scheme prefixes + self.assertEqual(get_host_from_host_port("http://localhost:3306"), "localhost") + self.assertEqual( + get_host_from_host_port("https://example.com:5432"), "example.com" + ) + self.assertEqual(get_host_from_host_port("http://localhost"), "localhost") + + # Test with IPv6 addresses + self.assertEqual(get_host_from_host_port("http://[::1]:3306"), "[::1]") + self.assertEqual(get_host_from_host_port("[::1]:3306"), "[::1]") + + def test_clean_host_port(self): + """Test clean_host_port strips URL scheme prefixes""" + # Already-clean values 
pass through unchanged + self.assertEqual(clean_host_port("localhost:3306"), "localhost:3306") + self.assertEqual(clean_host_port("127.0.0.1:5432"), "127.0.0.1:5432") + self.assertEqual(clean_host_port("example.com"), "example.com") + + # HTTP prefix is stripped + self.assertEqual(clean_host_port("http://localhost:3306"), "localhost:3306") + self.assertEqual(clean_host_port("http://example.com:8080"), "example.com:8080") + + # HTTPS prefix is stripped + self.assertEqual(clean_host_port("https://localhost:5432"), "localhost:5432") + self.assertEqual( + clean_host_port("https://mydb.example.com:3306"), "mydb.example.com:3306" + ) + + # Trailing slash is stripped + self.assertEqual(clean_host_port("http://localhost:3306/"), "localhost:3306") + + # Host only with scheme + self.assertEqual(clean_host_port("http://localhost"), "localhost") + self.assertEqual(clean_host_port("https://example.com"), "example.com") + + # URL with path is handled — path/query/fragment are discarded + self.assertEqual(clean_host_port("http://localhost:3306/db"), "localhost:3306") + self.assertEqual( + clean_host_port("https://example.com:5432/mydb?ssl=true"), + "example.com:5432", + ) + + # Whitespace is stripped + self.assertEqual(clean_host_port(" localhost:3306 "), "localhost:3306") + self.assertEqual(clean_host_port(" http://localhost:3306 "), "localhost:3306") + + # JDBC-style URLs fall back to raw extraction + self.assertEqual(clean_host_port("jdbc:postgresql://host:5432"), "host:5432") + self.assertEqual(clean_host_port("jdbc:postgresql://host:5432/db"), "host:5432") + self.assertEqual( + clean_host_port("jdbc:postgresql://host:5432?ssl=true"), "host:5432" + ) + self.assertEqual( + clean_host_port("jdbc:postgresql://host:5432/db?ssl=true#ref"), + "host:5432", + ) + + # IPv6 addresses — brackets are preserved + self.assertEqual(clean_host_port("http://[::1]:3306"), "[::1]:3306") + self.assertEqual(clean_host_port("https://[::1]:5432"), "[::1]:5432") + 
self.assertEqual(clean_host_port("http://[::1]"), "[::1]") + self.assertEqual( + clean_host_port("http://[2001:db8::1]:3306"), "[2001:db8::1]:3306" + ) + + # Plain IPv6 without scheme passes through unchanged + self.assertEqual(clean_host_port("[::1]:3306"), "[::1]:3306") + + # JDBC with userinfo — credentials are stripped + self.assertEqual( + clean_host_port("jdbc:postgresql://user:pass@host:5432/db"), + "host:5432", + ) + + # Invalid port raises ValueError + with self.assertRaises(ValueError): + clean_host_port("http://localhost:abc") + + # Empty host after scheme strip raises ValueError + with self.assertRaises(ValueError): + clean_host_port("http://") + + # Non-numeric port in JDBC-style fallback also raises ValueError + with self.assertRaises(ValueError): + clean_host_port("jdbc:postgresql://host:abc/db") + with self.assertRaises(ValueError): + clean_host_port("jdbc:postgresql://[::1]:abc") + @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper") @patch("metadata.utils.db_utils.fqn") def test_get_view_lineage_success_with_lineage_parser(self, mock_fqn, mock_dialect_mapper):