Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 0ca3d87

Browse files
committed
fix: updates timeout/retry code to respect hanging server
1 parent 7b8ceea commit 0ca3d87

3 files changed

Lines changed: 61 additions & 5 deletions

File tree

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
from google.cloud.bigquery import _pyarrow_helpers
3535
from google.cloud.bigquery import _versions_helpers
36+
from google.cloud.bigquery import retry as bq_retry
3637
from google.cloud.bigquery import schema
3738

3839

@@ -928,6 +929,7 @@ def _download_table_bqstorage(
928929
if "@" in table.table_id:
929930
raise ValueError("Reading from a specific snapshot is not currently supported.")
930931

932+
start_time = time.time()
931933
requested_streams = determine_requested_streams(preserve_order, max_stream_count)
932934

933935
requested_session = bigquery_storage.types.stream.ReadSession(
@@ -944,10 +946,16 @@ def _download_table_bqstorage(
944946
ArrowSerializationOptions.CompressionCodec(1)
945947
)
946948

949+
retry_policy = None
950+
if timeout is not None:
951+
retry_policy = bq_retry.DEFAULT_RETRY.with_deadline(timeout)
952+
947953
session = bqstorage_client.create_read_session(
948954
parent="projects/{}".format(project_id),
949955
read_session=requested_session,
950956
max_stream_count=requested_streams,
957+
retry=retry_policy,
958+
timeout=timeout,
951959
)
952960

953961
_LOGGER.debug(
@@ -983,8 +991,6 @@ def _download_table_bqstorage(
983991
# Manually manage the pool to control shutdown behavior on timeout.
984992
pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
985993
wait_on_shutdown = True
986-
start_time = time.time()
987-
988994
try:
989995
# Manually submit jobs and wait for download to complete rather
990996
# than using pool.map because pool.map continues running in the

google/cloud/bigquery/retry.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,13 @@ def _should_retry(exc):
6464
We retry if and only if the 'reason' is 'backendError'
6565
or 'rateLimitExceeded'.
6666
"""
67-
if not hasattr(exc, "errors") or len(exc.errors) == 0:
68-
# Check for unstructured error returns, e.g. from GFE
67+
try:
68+
reason = exc.errors[0]["reason"]
69+
except (AttributeError, IndexError, TypeError, KeyError):
70+
# Fallback for when errors attribute is missing, empty, or not a dict
71+
# or doesn't contain "reason" (e.g. gRPC exceptions).
6972
return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)
7073

71-
reason = exc.errors[0]["reason"]
7274
return reason in _RETRYABLE_REASONS
7375

7476

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import pytest
2+
from unittest import mock
3+
from google.cloud.bigquery import _pandas_helpers
4+
5+
try:
6+
from google.cloud import bigquery_storage
7+
except ImportError:
8+
bigquery_storage = None
9+
10+
11+
@pytest.mark.skipif(
12+
bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
13+
)
14+
def test_download_table_bqstorage_passes_timeout_to_create_read_session():
15+
# Mock dependencies
16+
project_id = "test-project"
17+
table = mock.Mock()
18+
table.table_id = "test_table"
19+
table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test"
20+
21+
bqstorage_client = mock.Mock(spec=bigquery_storage.BigQueryReadClient)
22+
# Mock create_read_session to return a session with no streams so the function returns early
23+
# (Checking start of loop logic vs empty streams return)
24+
session = mock.Mock()
25+
# If streams is empty, _download_table_bqstorage returns early, which is fine for this test
26+
session.streams = []
27+
bqstorage_client.create_read_session.return_value = session
28+
29+
# Call the function
30+
timeout = 123.456
31+
# download_arrow_bqstorage yields frames, so we need to iterate to trigger execution
32+
list(
33+
_pandas_helpers.download_arrow_bqstorage(
34+
project_id, table, bqstorage_client, timeout=timeout
35+
)
36+
)
37+
38+
# Verify timeout and retry were passed
39+
bqstorage_client.create_read_session.assert_called_once()
40+
_, kwargs = bqstorage_client.create_read_session.call_args
41+
assert "timeout" in kwargs
42+
assert kwargs["timeout"] == timeout
43+
44+
assert "retry" in kwargs
45+
retry_policy = kwargs["retry"]
46+
assert retry_policy is not None
47+
# Check if deadline is set correctly in the retry policy
48+
assert retry_policy._deadline == timeout

0 commit comments

Comments
 (0)