Skip to content

Commit 0258405

Browse files
authored
fix(bigquery): close GAPIC storage transport and auth sessions to prevent socket leaks (googleapis#17508)
This PR resolves resource leaks (specifically open sockets left in the `ESTABLISHED` state) that occur during client lifecycle operations and credential refreshing in system/unit tests. ### The Problem 1. **Transport Lifecycle:** When closing the BigQuery Storage client, calling `_transport.grpc_channel.close()` was insufficient for releasing all network resources. The full `_transport.close()` method needs to be invoked to tear down the underlying transport channel correctly. 2. **Dynamic Auth Sessions:** Under certain authentication environments (like Workload Identity/GCE Metadata server inside Kokoro CI), the `google-auth` library dynamically instantiates helper HTTP sessions to fetch access tokens. These sessions are not owned by the BigQuery client and are not closed automatically, leading to leaked sockets. 3. **Flaky Test Assertions:** Socket count assertions in system tests were flaky because Python's garbage collection is non-deterministic, meaning sockets remained open in the operating system even after client close calls until a garbage collection cycle swept them. ### Changes * **Client & Transport Lifecycle:** * Updated [connection.py](file:///usr/local/google/home/chalmerlowe/titan-src/projects/google-cloud-python/main/packages/google-cloud-bigquery/google/cloud/bigquery/dbapi/connection.py), [magics.py](file:///usr/local/google/home/chalmerlowe/titan-src/projects/google-cloud-python/main/packages/google-cloud-bigquery/google/cloud/bigquery/magics/magics.py), and [table.py](file:///usr/local/google/home/chalmerlowe/titan-src/projects/google-cloud-python/main/packages/google-cloud-bigquery/google/cloud/bigquery/table.py) to close the BigQuery Storage transport using `_transport.close()` instead of `_transport.grpc_channel.close()`. * **Testing Improvements:** * Added `patch_tracked_requests` interceptor to system/unit tests to track and explicitly close all dynamically spawned credential-refreshing HTTP sessions when the test context exits. 
* Added explicit `gc.collect()` calls to the socket leak verification tests to force a synchronous sweep of unreferenced socket objects before asserting the final socket counts. * **Code Coverage:** * Appended `# pragma: NO COVER` to the Python version checks in [`__init__.py`](file:///usr/local/google/home/chalmerlowe/titan-src/projects/google-cloud-python/main/packages/google-cloud-bigquery/google/cloud/bigquery/__init__.py) for code paths that emit a deprecation warning when the library runs on Python runtimes older than 3.10 (these paths never execute in our CI/CD because the test matrix does not include those older runtimes).
1 parent 141ad0e commit 0258405

10 files changed

Lines changed: 189 additions & 73 deletions

File tree

packages/google-cloud-bigquery/google/cloud/bigquery/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@
123123
except ImportError:
124124
bigquery_magics = None
125125

126-
if sys.version_info < (3, 10):
126+
if sys.version_info < (3, 10): # pragma: NO COVER
127127
warnings.warn(
128128
"The python-bigquery library no longer supports Python <= 3.9. "
129129
f"Your Python version is {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}. We "

packages/google-cloud-bigquery/google/cloud/bigquery/dbapi/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def close(self):
8484

8585
if self._owns_bqstorage_client:
8686
# There is no close() on the BQ Storage client itself.
87-
self._bqstorage_client._transport.grpc_channel.close()
87+
self._bqstorage_client._transport.close()
8888

8989
for cursor_ in self._cursors_created:
9090
if not cursor_._closed:

packages/google-cloud-bigquery/google/cloud/bigquery/magics/magics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,4 +773,4 @@ def _close_transports(client, bqstorage_client):
773773
"""
774774
client.close()
775775
if bqstorage_client is not None:
776-
bqstorage_client._transport.grpc_channel.close()
776+
bqstorage_client._transport.close()

packages/google-cloud-bigquery/google/cloud/bigquery/table.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2353,7 +2353,9 @@ def to_arrow(
23532353
progress_bar.close()
23542354
finally:
23552355
if owns_bqstorage_client:
2356-
bqstorage_client._transport.grpc_channel.close() # type: ignore
2356+
# mypy: bqstorage_client is guaranteed to be not None when owns_bqstorage_client is True,
2357+
# but mypy cannot infer this correlation. We ignore the union-attr error here.
2358+
bqstorage_client._transport.close() # type: ignore[union-attr]
23572359

23582360
if record_batches and bqstorage_client is not None:
23592361
return pyarrow.Table.from_batches(record_batches)

packages/google-cloud-bigquery/tests/system/helpers.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import contextlib
1516
import datetime
1617
import decimal
1718
import uuid
@@ -21,7 +22,6 @@
2122

2223
from google.cloud._helpers import UTC
2324

24-
2525
_naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
2626
_naive_microseconds = datetime.datetime(2016, 12, 5, 12, 41, 9, 250000)
2727
_stamp = "%s %s" % (_naive.date().isoformat(), _naive.time().isoformat())
@@ -104,3 +104,29 @@ def _rate_limit_exceeded(forbidden):
104104
google.api_core.exceptions.Forbidden,
105105
error_predicate=_rate_limit_exceeded,
106106
)
107+
108+
109+
@contextlib.contextmanager
110+
def patch_tracked_requests():
111+
"""Context manager to patch google-auth requests and track/close their HTTP sessions.
112+
113+
This prevents socket leaks in system tests that use Workload Identity or metadata server auth.
114+
"""
115+
import google.auth.transport.requests
116+
117+
original_init = google.auth.transport.requests.Request.__init__
118+
tracked_requests = []
119+
120+
def patched_init(self, session=None):
121+
original_init(self, session=session)
122+
if session is None:
123+
tracked_requests.append(self)
124+
125+
google.auth.transport.requests.Request.__init__ = patched_init
126+
try:
127+
yield tracked_requests
128+
finally:
129+
google.auth.transport.requests.Request.__init__ = original_init
130+
for req in tracked_requests:
131+
if hasattr(req, "session") and req.session is not None:
132+
req.session.close()

packages/google-cloud-bigquery/tests/system/test_client.py

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858

5959
from . import helpers
6060

61-
6261
JOB_TIMEOUT = 120 # 2 minutes
6362
DATA_PATH = pathlib.Path(__file__).parent.parent / "data"
6463

@@ -234,23 +233,29 @@ def _create_bucket(self, bucket_name, location=None):
234233

235234
def test_close_releases_open_sockets(self):
236235
current_process = psutil.Process()
237-
conn_count_start = len(current_process.net_connections())
236+
conn_start = current_process.net_connections()
237+
conn_count_start = len(conn_start)
238+
239+
with helpers.patch_tracked_requests():
240+
client = Config.CLIENT
241+
client.query(
242+
"""
243+
SELECT
244+
source_year AS year, COUNT(is_male) AS birth_count
245+
FROM `bigquery-public-data.samples.natality`
246+
GROUP BY year
247+
ORDER BY year DESC
248+
LIMIT 15
249+
"""
250+
)
238251

239-
client = Config.CLIENT
240-
client.query(
241-
"""
242-
SELECT
243-
source_year AS year, COUNT(is_male) AS birth_count
244-
FROM `bigquery-public-data.samples.natality`
245-
GROUP BY year
246-
ORDER BY year DESC
247-
LIMIT 15
248-
"""
249-
)
252+
client.close()
250253

251-
client.close()
254+
import gc
252255

253-
conn_count_end = len(current_process.net_connections())
256+
gc.collect()
257+
conn_end = current_process.net_connections()
258+
conn_count_end = len(conn_end)
254259
self.assertLessEqual(conn_count_end, conn_count_start)
255260

256261
def test_create_dataset(self):
@@ -2174,25 +2179,31 @@ def test_dbapi_dry_run_query(self):
21742179
def test_dbapi_connection_does_not_leak_sockets(self):
21752180
pytest.importorskip("google.cloud.bigquery_storage")
21762181
current_process = psutil.Process()
2177-
conn_count_start = len(current_process.net_connections())
2178-
2179-
# Provide no explicit clients, so that the connection will create and own them.
2180-
connection = dbapi.connect()
2181-
cursor = connection.cursor()
2182-
2183-
cursor.execute(
2182+
conn_start = current_process.net_connections()
2183+
conn_count_start = len(conn_start)
2184+
2185+
with helpers.patch_tracked_requests():
2186+
# Provide no explicit clients, so that the connection will create and own them.
2187+
connection = dbapi.connect()
2188+
cursor = connection.cursor()
2189+
2190+
cursor.execute(
2191+
"""
2192+
SELECT id, `by`, timestamp
2193+
FROM `bigquery-public-data.hacker_news.full`
2194+
ORDER BY `id` ASC
2195+
LIMIT 100000
21842196
"""
2185-
SELECT id, `by`, timestamp
2186-
FROM `bigquery-public-data.hacker_news.full`
2187-
ORDER BY `id` ASC
2188-
LIMIT 100000
2189-
"""
2190-
)
2191-
rows = cursor.fetchall()
2192-
self.assertEqual(len(rows), 100000)
2197+
)
2198+
rows = cursor.fetchall()
2199+
self.assertEqual(len(rows), 100000)
2200+
2201+
connection.close()
2202+
import gc
21932203

2194-
connection.close()
2195-
conn_count_end = len(current_process.net_connections())
2204+
gc.collect()
2205+
conn_end = current_process.net_connections()
2206+
conn_count_end = len(conn_end)
21962207
self.assertLessEqual(conn_count_end, conn_count_start)
21972208

21982209
def _load_table_for_dml(self, rows, dataset_id, table_id):

packages/google-cloud-bigquery/tests/system/test_magics.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import pytest
2020
import psutil
2121

22+
from . import helpers
2223

2324
IPython = pytest.importorskip("IPython")
2425
io = pytest.importorskip("IPython.utils.io")
@@ -48,27 +49,30 @@ def ipython_interactive(ipython):
4849
def test_bigquery_magic(ipython_interactive):
4950
ip = IPython.get_ipython()
5051
current_process = psutil.Process()
51-
conn_count_start = len(current_process.net_connections())
52-
53-
# Deprecated, but should still work in google-cloud-bigquery 3.x.
54-
with pytest.warns(FutureWarning, match="bigquery_magics"):
55-
ip.extension_manager.load_extension("google.cloud.bigquery")
56-
57-
sql = """
58-
SELECT
59-
CONCAT(
60-
'https://stackoverflow.com/questions/',
61-
CAST(id as STRING)) as url,
62-
view_count
63-
FROM `bigquery-public-data.stackoverflow.posts_questions`
64-
WHERE tags like '%google-bigquery%'
65-
ORDER BY view_count DESC
66-
LIMIT 10
67-
"""
68-
with io.capture_output() as captured:
69-
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)
70-
71-
conn_count_end = len(current_process.net_connections())
52+
conn_start = current_process.net_connections()
53+
conn_count_start = len(conn_start)
54+
55+
with helpers.patch_tracked_requests():
56+
# Deprecated, but should still work in google-cloud-bigquery 3.x.
57+
with pytest.warns(FutureWarning, match="bigquery_magics"):
58+
ip.extension_manager.load_extension("google.cloud.bigquery")
59+
60+
sql = """
61+
SELECT
62+
CONCAT(
63+
'https://stackoverflow.com/questions/',
64+
CAST(id as STRING)) as url,
65+
view_count
66+
FROM `bigquery-public-data.stackoverflow.posts_questions`
67+
WHERE tags like '%google-bigquery%'
68+
ORDER BY view_count DESC
69+
LIMIT 10
70+
"""
71+
with io.capture_output() as captured:
72+
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)
73+
74+
conn_end = current_process.net_connections()
75+
conn_count_end = len(conn_end)
7276

7377
lines = re.split("\n|\r", captured.stdout)
7478
# Removes blanks & terminal code (result of display clearing)

packages/google-cloud-bigquery/tests/unit/test_dbapi_connection.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _mock_bqstorage_client(self):
4040
from google.cloud import bigquery_storage
4141

4242
mock_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
43-
mock_client._transport = mock.Mock(spec=["channel"])
43+
mock_client._transport = mock.Mock(spec=["channel", "close"])
4444
mock_client._transport.grpc_channel = mock.Mock(spec=["close"])
4545
return mock_client
4646

@@ -176,7 +176,7 @@ def test_close_closes_all_created_bigquery_clients(self):
176176
connection.close()
177177

178178
self.assertTrue(client.close.called)
179-
self.assertTrue(bqstorage_client._transport.grpc_channel.close.called)
179+
self.assertTrue(bqstorage_client._transport.close.called)
180180

181181
def test_close_does_not_close_bigquery_clients_passed_to_it(self):
182182
pytest.importorskip("google.cloud.bigquery_storage")
@@ -187,7 +187,7 @@ def test_close_does_not_close_bigquery_clients_passed_to_it(self):
187187
connection.close()
188188

189189
self.assertFalse(client.close.called)
190-
self.assertFalse(bqstorage_client._transport.grpc_channel.close.called)
190+
self.assertFalse(bqstorage_client._transport.close.called)
191191

192192
def test_close_closes_all_created_cursors(self):
193193
connection = self._make_one(client=self._mock_client())

packages/google-cloud-bigquery/tests/unit/test_magics.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545

4646
@pytest.fixture()
4747
def use_local_magics_context(monkeypatch):
48-
if magics is not None:
48+
if magics is not None: # pragma: NO COVER
4949
local_context = magics.Context()
5050
local_context._project = "unit-test-project"
5151
mock_credentials = mock.create_autospec(
@@ -2195,13 +2195,10 @@ def test_bigquery_magic_create_dataset_fails(monkeypatch):
21952195

21962196

21972197
@pytest.mark.usefixtures("ipython_interactive")
2198-
def test_bigquery_magic_with_location(monkeypatch):
2198+
def test_bigquery_magic_with_location(monkeypatch, use_local_magics_context):
21992199
ip = IPython.get_ipython()
22002200
monkeypatch.setattr(bigquery, "bigquery_magics", None)
22012201
bigquery.load_ipython_extension(ip)
2202-
magics.context.credentials = mock.create_autospec(
2203-
google.auth.credentials.Credentials, instance=True
2204-
)
22052202

22062203
run_query_patch = mock.patch(
22072204
"google.cloud.bigquery.magics.magics._run_query", autospec=True

0 commit comments

Comments
 (0)