Skip to content

Commit 40a98b6

Browse files
committed
Fixes #29358: fix BigQuery test connection signature, per-project scoping, and engine teardown
1 parent 6d9f272 commit 40a98b6

4 files changed

Lines changed: 135 additions & 13 deletions

File tree

ingestion/src/metadata/ingestion/source/database/bigquery/connection.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ def _get_client(self) -> Engine:
148148
if connection.billingProjectId:
149149
kwargs["billing_project_id"] = connection.billingProjectId
150150

151-
return create_generic_db_connection(
151+
engine = create_generic_db_connection(
152152
connection=connection,
153153
get_connection_url_fn=get_connection_url,
154154
get_connection_args_fn=get_connection_args_common,
155155
**kwargs,
156156
)
157+
self._on_close(engine.dispose)
158+
return engine
157159

158160
def test_connection(
159161
self,
@@ -226,7 +228,11 @@ def test_connection_inner(engine): # pyright: ignore[reportMissingParameterType
226228
timeout_seconds=timeout_seconds,
227229
)
228230

229-
return test_connection_inner(engine)
231+
try:
232+
result = test_connection_inner(engine)
233+
finally:
234+
self.close()
235+
return result
230236

231237

232238
def get_table_view_names(connection, schema=None):

ingestion/src/metadata/ingestion/source/database/bigquery/helper.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,26 +60,38 @@ class InspectorWrapper(BaseModel):
6060
inspector: Any
6161

6262

63+
def clone_connection_for_project(database_name: str, service_connection: BigQueryConnection) -> BigQueryConnection:
64+
"""
65+
Return a copy of the service connection scoped to a single project, so each
66+
project in a multi-project connection can be inspected/tested independently.
67+
"""
68+
new_service_connection = deepcopy(service_connection)
69+
if isinstance(new_service_connection.credentials.gcpConfig, GcpCredentialsValues):
70+
new_service_connection.credentials.gcpConfig.projectId = SingleProjectId(database_name)
71+
return new_service_connection
72+
73+
6374
def get_inspector_details(database_name: str, service_connection: BigQueryConnection) -> InspectorWrapper:
6475
"""
6576
Method to get the bigquery inspector details
6677
"""
6778
# TODO support location property in JSON Schema
6879
# TODO support OAuth 2.0 scopes
69-
new_service_connection = deepcopy(service_connection)
80+
new_service_connection = clone_connection_for_project(database_name, service_connection)
7081
kwargs = {}
7182

7283
if new_service_connection.usageLocation:
7384
kwargs["location"] = new_service_connection.usageLocation
7485

75-
if isinstance(new_service_connection.credentials.gcpConfig, GcpCredentialsValues):
76-
new_service_connection.credentials.gcpConfig.projectId = SingleProjectId(database_name)
77-
if new_service_connection.credentials.gcpImpersonateServiceAccount:
78-
kwargs["impersonate_service_account"] = (
79-
new_service_connection.credentials.gcpImpersonateServiceAccount.impersonateServiceAccount
80-
)
86+
if (
87+
isinstance(new_service_connection.credentials.gcpConfig, GcpCredentialsValues)
88+
and new_service_connection.credentials.gcpImpersonateServiceAccount
89+
):
90+
kwargs["impersonate_service_account"] = (
91+
new_service_connection.credentials.gcpImpersonateServiceAccount.impersonateServiceAccount
92+
)
8193

82-
kwargs["lifetime"] = new_service_connection.credentials.gcpImpersonateServiceAccount.lifetime
94+
kwargs["lifetime"] = new_service_connection.credentials.gcpImpersonateServiceAccount.lifetime
8395

8496
client = get_bigquery_client(project_id=new_service_connection.billingProjectId or database_name, **kwargs)
8597
engine = get_connection(new_service_connection)

ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
from metadata.ingestion.source.database.bigquery.helper import (
7676
clear_constraint_cache,
7777
clear_constraint_cache_for_schema,
78+
clone_connection_for_project,
7879
get_foreign_keys,
7980
get_inspector_details,
8081
get_pk_constraint,
@@ -346,11 +347,11 @@ def _get_columns_internal(
346347

347348
def _test_connection(self) -> None:
348349
for project_id in self.project_ids:
349-
inspector_details = get_inspector_details(
350+
project_connection = clone_connection_for_project(
350351
database_name=project_id, service_connection=self.service_connection
351352
)
352-
test_connection_fn = get_test_connection_fn(self.service_connection)
353-
test_connection_fn(self.metadata, inspector_details.engine, self.service_connection)
353+
test_connection_fn = get_test_connection_fn(project_connection)
354+
test_connection_fn(self.metadata)
354355
# GOOGLE_CREDENTIALS may not have been set,
355356
# to avoid key error, we use `get` for dict
356357
if os.environ.get(GOOGLE_CREDENTIALS):
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Regression tests for the BigQuery test-connection fix:
14+
15+
- multi-project test connection scopes each probe to its own project
16+
- ``_test_connection`` invokes the test function with the migrated signature
17+
(``metadata`` only) so a service-connection object can never leak into
18+
``timeout_seconds`` again
19+
- the engine built for the test connection is always disposed
20+
"""
21+
22+
from unittest.mock import MagicMock, patch
23+
24+
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
25+
BigQueryConnection as BigQueryConnectionConfig,
26+
)
27+
from metadata.ingestion.source.database.bigquery.connection import BigQueryConnection
28+
from metadata.ingestion.source.database.bigquery.helper import (
29+
clone_connection_for_project,
30+
)
31+
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
32+
33+
_CONNECTION_MODULE = "metadata.ingestion.source.database.bigquery.connection"
34+
_METADATA_MODULE = "metadata.ingestion.source.database.bigquery.metadata"
35+
36+
_GCP_CONFIG = {
37+
"type": "service_account",
38+
"projectId": "placeholder",
39+
"privateKeyId": "key-id",
40+
"privateKey": "private-key",
41+
"clientEmail": "user@example.com",
42+
"clientId": "1234",
43+
"authUri": "https://accounts.google.com/o/oauth2/auth",
44+
"tokenUri": "https://oauth2.googleapis.com/token",
45+
"authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
46+
"clientX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs",
47+
}
48+
49+
50+
def _bq_config(project_id) -> BigQueryConnectionConfig:
51+
return BigQueryConnectionConfig.model_validate(
52+
{"type": "BigQuery", "credentials": {"gcpConfig": {**_GCP_CONFIG, "projectId": project_id}}}
53+
)
54+
55+
56+
def test_clone_connection_scopes_to_single_project():
57+
connection = _bq_config(["proj-a", "proj-b"])
58+
59+
cloned = clone_connection_for_project("proj-a", connection)
60+
61+
assert cloned.credentials.gcpConfig.projectId.root == "proj-a"
62+
# the original multi-project connection must stay untouched (deepcopy isolation)
63+
assert connection.credentials.gcpConfig.projectId.root == ["proj-a", "proj-b"]
64+
65+
66+
@patch(f"{_CONNECTION_MODULE}.test_connection_steps")
67+
@patch(f"{_CONNECTION_MODULE}.create_generic_db_connection")
68+
@patch(f"{_CONNECTION_MODULE}.set_google_credentials")
69+
def test_test_connection_probes_each_project_with_a_valid_timeout(mock_creds, mock_create, mock_steps):
70+
mock_create.return_value = MagicMock()
71+
source = object.__new__(BigquerySource)
72+
source.metadata = MagicMock()
73+
source.service_connection = _bq_config(["proj-a", "proj-b"])
74+
source.project_ids = ["proj-a", "proj-b"]
75+
source.temp_credentials_file_path = []
76+
77+
# real clone_connection_for_project + get_test_connection_fn path; only the
78+
# external boundaries (engine factory, credentials, step runner) are stubbed
79+
BigquerySource._test_connection(source)
80+
81+
assert mock_steps.call_count == 2
82+
for call in mock_steps.call_args_list:
83+
assert call.kwargs["service_type"] == "BigQuery"
84+
# the regression guard: the original bug passed the service connection
85+
# positionally into timeout_seconds, which then reached signal.alarm().
86+
# A valid timeout reaching the step runner proves the signature is right.
87+
assert isinstance(call.kwargs["timeout_seconds"], int)
88+
89+
90+
@patch(f"{_CONNECTION_MODULE}.test_connection_steps")
91+
@patch(f"{_CONNECTION_MODULE}.create_generic_db_connection")
92+
@patch(f"{_CONNECTION_MODULE}.set_google_credentials")
93+
def test_test_connection_disposes_engine(mock_creds, mock_create, mock_steps):
94+
engine = MagicMock()
95+
mock_create.return_value = engine
96+
97+
connection = BigQueryConnection(_bq_config("proj-a"))
98+
connection.test_connection(metadata=MagicMock())
99+
100+
mock_create.assert_called_once()
101+
# both halves of the fix: _get_client registers engine.dispose AND
102+
# test_connection calls self.close(), so the engine is always released.
103+
engine.dispose.assert_called_once()

0 commit comments

Comments
 (0)