Skip to content

Commit fcfd026

Browse files
Fixes #24348: Strip URL scheme from hostPort to prevent ValueError
1 parent f3ae6cf commit fcfd026

9 files changed

Lines changed: 142 additions & 12 deletions

File tree

ingestion/src/metadata/ingestion/connections/builders.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
Get and test connection utilities
1414
"""
15+
1516
from functools import partial
1617
from typing import Any, Callable, Dict, Optional
1718
from urllib.parse import quote_plus
@@ -34,6 +35,7 @@
3435
from metadata.ingestion.connections.query_logger import attach_query_tracker
3536
from metadata.ingestion.connections.secrets import connection_with_options_secrets
3637
from metadata.utils.constants import BUILDER_PASSWORD_ATTR
38+
from metadata.utils.db_utils import clean_host_port
3739
from metadata.utils.logger import cli_logger
3840

3941
logger = cli_logger()
@@ -152,7 +154,13 @@ def get_password_secret(connection) -> SecretStr:
152154
aws_client = AWSClient(
153155
config=connection.authType.awsConfig
154156
).get_rds_client()
155-
host, port = connection.hostPort.split(":")
157+
cleaned = clean_host_port(connection.hostPort)
158+
if ":" not in cleaned:
159+
raise ValueError(
160+
f"hostPort '{connection.hostPort}' must include a port "
161+
f"for IAM authentication (expected 'hostname:port')."
162+
)
163+
host, port = cleaned.split(":", 1)
156164
password = SecretStr(
157165
aws_client.generate_db_auth_token(
158166
DBHostname=host,
@@ -188,7 +196,7 @@ def get_connection_url_common(connection) -> str:
188196
url = _add_password(url, connection)
189197
url += "@"
190198

191-
url += connection.hostPort
199+
url += clean_host_port(connection.hostPort)
192200
if hasattr(connection, "database"):
193201
url += f"/{connection.database}" if connection.database else ""
194202

ingestion/src/metadata/ingestion/source/database/cassandra/connection.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
Source connection handler
1414
"""
15+
1516
from functools import partial
1617
from typing import Optional
1718

@@ -44,6 +45,7 @@
4445
CASSANDRA_GET_RELEASE_VERSION,
4546
)
4647
from metadata.utils.constants import THREE_MIN
48+
from metadata.utils.db_utils import clean_host_port
4749

4850

4951
def get_connection(connection: CassandraConnection):
@@ -70,7 +72,7 @@ def get_connection(connection: CassandraConnection):
7072
}
7173
)
7274
else:
73-
host, port = connection.hostPort.split(":")
75+
host, port = clean_host_port(connection.hostPort).split(":")
7476
cluster_config.update({"contact_points": [host], "port": port})
7577
if connection.username and getattr(connection.authType, "password", None):
7678
cluster_config["auth_provider"] = PlainTextAuthProvider(

ingestion/src/metadata/ingestion/source/database/databricks/auth.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
This module provides authentication utilities for Databricks and Unity Catalog connections.
1414
"""
15+
1516
from typing import Union
1617

1718
from databricks.sdk.core import Config, azure_service_principal, oauth_service_principal
@@ -31,6 +32,7 @@
3132
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
3233
UnityCatalogConnection,
3334
)
35+
from metadata.utils.db_utils import clean_host_port
3436

3537

3638
def get_personal_access_token_auth(
@@ -50,7 +52,7 @@ def get_databricks_oauth_auth(
5052
"""
5153

5254
def credential_provider():
53-
hostname = connection.hostPort.split(":")[0]
55+
hostname = clean_host_port(connection.hostPort).split(":")[0]
5456
config = Config(
5557
host=f"https://{hostname}",
5658
client_id=connection.authType.clientId,
@@ -67,7 +69,7 @@ def get_azure_ad_auth(connection: Union[DatabricksConnection, UnityCatalogConnec
6769
"""
6870

6971
def credential_provider():
70-
hostname = connection.hostPort.split(":")[0]
72+
hostname = clean_host_port(connection.hostPort).split(":")[0]
7173
config = Config(
7274
host=f"https://{hostname}",
7375
azure_client_secret=connection.authType.azureClientSecret.get_secret_value(),

ingestion/src/metadata/ingestion/source/database/databricks/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"""
1212
Client to interact with databricks apis
1313
"""
14+
1415
import base64
1516
import json
1617
import traceback
@@ -34,6 +35,7 @@
3435
DATABRICKS_GET_TABLE_LINEAGE,
3536
)
3637
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
38+
from metadata.utils.db_utils import clean_host_port
3739
from metadata.utils.helpers import datetime_to_ts
3840
from metadata.utils.logger import ingestion_logger
3941

@@ -62,7 +64,7 @@ def __init__(
6264
engine: Optional[Engine] = None,
6365
):
6466
self.config = config
65-
base_url, *_ = self.config.hostPort.split(":")
67+
base_url, *_ = clean_host_port(self.config.hostPort).split(":")
6668
self.base_url = f"https://{base_url}{API_VERSION}"
6769
self.base_query_url = f"{self.base_url}{QUERIES_PATH}"
6870
self.base_job_url = f"https://{base_url}{JOB_API_VERSION}/jobs"

ingestion/src/metadata/ingestion/source/database/db2/connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
Source connection handler
1414
"""
15+
1516
import importlib
1617
import sys
1718
from pathlib import Path
@@ -43,6 +44,7 @@
4344
install_clidriver,
4445
)
4546
from metadata.utils.constants import THREE_MIN, UTF_8
47+
from metadata.utils.db_utils import clean_host_port
4648
from metadata.utils.logger import ingestion_logger
4749
from metadata.utils.ssl_manager import check_ssl_and_init
4850

@@ -59,7 +61,7 @@ def _get_ibmi_connection_url(connection: Db2Connection) -> str:
5961
6062
Port is passed separately via connect_args.
6163
"""
62-
hostname = connection.hostPort.split(":")[0]
64+
hostname = clean_host_port(connection.hostPort).split(":")[0]
6365

6466
url = f"{connection.scheme.value}://"
6567
if connection.username:
@@ -81,7 +83,7 @@ def _get_ibmi_connection_args(connection: Db2Connection) -> Dict[str, Any]:
8183
rejects it in the URL.
8284
"""
8385
args = get_connection_args_common(connection)
84-
host_port = connection.hostPort
86+
host_port = clean_host_port(connection.hostPort)
8587
if ":" in host_port:
8688
port_str = host_port.split(":")[1]
8789
try:

ingestion/src/metadata/ingestion/source/database/redshift/connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
Source connection handler
1414
"""
15+
1516
from functools import partial
1617
from typing import Optional
1718
from urllib.parse import quote_plus
@@ -55,6 +56,7 @@
5556
REDSHIFT_TEST_GET_QUERIES_MAP,
5657
)
5758
from metadata.utils.constants import THREE_MIN
59+
from metadata.utils.db_utils import clean_host_port
5860
from metadata.utils.logger import ingestion_logger
5961

6062
logger = ingestion_logger()
@@ -129,7 +131,7 @@ def _get_redshift_iam_credentials(connection: RedshiftConnection) -> tuple:
129131
Get temporary credentials for Redshift using IAM authentication.
130132
Detects Serverless vs Provisioned from the host and uses the appropriate API.
131133
"""
132-
host = connection.hostPort.split(":")[0]
134+
host = clean_host_port(connection.hostPort).split(":")[0]
133135

134136
if _is_serverless_host(host):
135137
return _get_serverless_iam_credentials(connection, host)
@@ -150,7 +152,7 @@ def get_redshift_connection_url(connection: RedshiftConnection) -> str:
150152

151153
url = f"{connection.scheme.value}://"
152154
url += f"{quote_plus(username)}:{quote_plus(password)}@"
153-
url += connection.hostPort
155+
url += clean_host_port(connection.hostPort)
154156
url += f"/{connection.database}" if connection.database else ""
155157

156158
options = get_connection_options_dict(connection)

ingestion/src/metadata/utils/db_utils.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
"""
1313
Helpers module for db sources
1414
"""
15+
1516
import time
1617
import traceback
1718
from typing import Iterable, List, Union
19+
from urllib.parse import urlparse
1820

1921
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
2022
from metadata.generated.schema.entity.data.table import Table
@@ -43,12 +45,43 @@
4345
PUBLIC_SCHEMA = "public"
4446

4547

48+
def clean_host_port(host_port: str) -> str:
    """
    Reduce a user-supplied hostPort value to a bare 'hostname[:port]' string.

    Users sometimes enter a full URL (e.g. 'http://localhost:3306' or
    'jdbc:postgresql://host:5432/db') instead of just 'localhost:3306'.
    This strips the scheme, any 'user:password@' prefix and any
    path/query/fragment so callers can safely split host and port.

    Values without '://' are returned as-is apart from surrounding
    whitespace and trailing slashes.

    Args:
        host_port: raw value of a connection's hostPort field.

    Returns:
        The 'hostname[:port]' part of the input. IPv6 literals keep their
        square brackets so the port separator stays unambiguous. If the
        authority cannot be parsed (e.g. a non-numeric port) it is returned
        verbatim, exactly like scheme-less input, instead of raising.
    """
    host_port = host_port.strip()
    if "://" not in host_port:
        return host_port.rstrip("/")

    # The authority starts right after the FIRST '://' (this also covers
    # compound schemes such as 'jdbc:postgresql://') and ends at the first
    # path, query or fragment delimiter.
    scheme, _, remainder = host_port.partition("://")
    authority = remainder
    for delimiter in "/?#":
        authority = authority.split(delimiter, 1)[0]

    # Drop 'user:password@' if present; everything after the last '@' is host[:port].
    _, at_sign, netloc = authority.rpartition("@")

    logger.warning(
        "The hostPort '%s' contains a URL scheme. "
        "Expected format is 'hostname[:port]' (e.g. 'localhost:3306'). "
        "Stripping the scheme prefix.",
        # Never write embedded credentials to the log.
        f"{scheme}://***@{netloc}" if at_sign else host_port,
    )

    try:
        parsed = urlparse(f"//{netloc}")
        hostname = parsed.hostname
        # Reading .port raises ValueError for non-numeric / out-of-range ports.
        port = parsed.port
    except ValueError:
        # Leave validation to the caller, as for scheme-less input.
        return netloc

    if not hostname:
        return netloc

    if ":" in hostname:
        # urlparse strips the brackets of IPv6 literals; restore them.
        hostname = f"[{hostname}]"

    return hostname if port is None else f"{hostname}:{port}"
77+
78+
4679
def get_host_from_host_port(uri: str) -> str:
    """
    Return only the host part of a hostPort value.

    For example "localhost:9000" yields "localhost". The value is first
    passed through clean_host_port, then everything from the first ":"
    onwards is discarded.
    """
    host, _, _ = clean_host_port(uri).partition(":")
    return host
5285

5386

5487
# pylint: disable=too-many-locals

ingestion/tests/unit/test_build_connection_url.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,34 @@ def test_get_connection_url_mysql(self):
9797
"mysql+pymysql://openmetadata_user:mocked_token@localhost:3306/openmetadata_db",
9898
)
9999

100+
def test_get_connection_url_mysql_with_url_scheme(self):
    """A MySQL hostPort entered with an http:// prefix renders the plain host:port URL"""
    expected_url = "mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db"
    mysql_config = MysqlConnectionConfig(
        username="openmetadata_user",
        authType=BasicAuth(password="openmetadata_password"),
        hostPort="http://localhost:3306",
        databaseSchema="openmetadata_db",
    )
    engine = MySQLConnection(mysql_config).client
    rendered_url = engine.url.render_as_string(hide_password=False)
    self.assertEqual(rendered_url, expected_url)
113+
114+
def test_get_connection_url_postgres_with_url_scheme(self):
    """A Postgres hostPort entered with an https:// prefix renders the plain host:port URL"""
    expected_url = "postgresql+psycopg2://openmetadata_user:openmetadata_password@localhost:5432/openmetadata_db"
    postgres_config = PostgresConnectionConfig(
        username="openmetadata_user",
        authType=BasicAuth(password="openmetadata_password"),
        hostPort="https://localhost:5432",
        database="openmetadata_db",
    )
    engine = PostgresConnection(postgres_config).client
    rendered_url = engine.url.render_as_string(hide_password=False)
    self.assertEqual(rendered_url, expected_url)
127+
100128
def test_get_connection_url_postgres(self):
101129
connection = PostgresConnectionConfig(
102130
username="openmetadata_user",

ingestion/tests/unit/test_db_utils.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
Unit tests for db_utils module
1414
"""
15+
1516
import uuid
1617
from copy import deepcopy
1718
from unittest import TestCase
@@ -36,7 +37,11 @@
3637
from metadata.ingestion.lineage.models import Dialect
3738
from metadata.ingestion.lineage.sql_lineage import search_cache
3839
from metadata.ingestion.source.models import TableView
39-
from metadata.utils.db_utils import get_host_from_host_port, get_view_lineage
40+
from metadata.utils.db_utils import (
41+
clean_host_port,
42+
get_host_from_host_port,
43+
get_view_lineage,
44+
)
4045

4146

4247
# Mock LineageTable class to simulate collate_sqllineage.core.models.Table
@@ -118,6 +123,52 @@ def test_get_host_from_host_port(self):
118123
self.assertEqual(get_host_from_host_port("localhost"), "localhost")
119124
self.assertEqual(get_host_from_host_port("example.com"), "example.com")
120125

126+
# Test with URL scheme prefixes
127+
self.assertEqual(get_host_from_host_port("http://localhost:3306"), "localhost")
128+
self.assertEqual(
129+
get_host_from_host_port("https://example.com:5432"), "example.com"
130+
)
131+
self.assertEqual(get_host_from_host_port("http://localhost"), "localhost")
132+
133+
def test_clean_host_port(self):
    """Check clean_host_port against a table of (raw input, expected output) pairs"""
    cases = (
        # Already-clean values pass through unchanged
        ("localhost:3306", "localhost:3306"),
        ("127.0.0.1:5432", "127.0.0.1:5432"),
        ("example.com", "example.com"),
        # HTTP prefix is stripped
        ("http://localhost:3306", "localhost:3306"),
        ("http://example.com:8080", "example.com:8080"),
        # HTTPS prefix is stripped
        ("https://localhost:5432", "localhost:5432"),
        ("https://mydb.example.com:3306", "mydb.example.com:3306"),
        # Trailing slash is stripped
        ("http://localhost:3306/", "localhost:3306"),
        # Host only with scheme
        ("http://localhost", "localhost"),
        ("https://example.com", "example.com"),
        # Path, query and fragment are discarded
        ("http://localhost:3306/db", "localhost:3306"),
        ("https://example.com:5432/mydb?ssl=true", "example.com:5432"),
        # Surrounding whitespace is stripped
        (" localhost:3306 ", "localhost:3306"),
        (" http://localhost:3306 ", "localhost:3306"),
        # JDBC-style URLs fall back to raw extraction
        ("jdbc:postgresql://host:5432", "host:5432"),
        ("jdbc:postgresql://host:5432/db", "host:5432"),
    )
    for raw_value, expected in cases:
        self.assertEqual(clean_host_port(raw_value), expected)
171+
121172
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
122173
@patch("metadata.utils.db_utils.fqn")
123174
def test_get_view_lineage_success_with_lineage_parser(

0 commit comments

Comments
 (0)