Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import retry as bq_retry
from google.cloud.bigquery import schema


Expand Down Expand Up @@ -928,6 +929,7 @@ def _download_table_bqstorage(
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

start_time = time.time()
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

requested_session = bigquery_storage.types.stream.ReadSession(
Expand All @@ -944,10 +946,16 @@ def _download_table_bqstorage(
ArrowSerializationOptions.CompressionCodec(1)
)

retry_policy = None
if timeout is not None:
retry_policy = bq_retry.DEFAULT_RETRY.with_deadline(timeout)

session = bqstorage_client.create_read_session(
parent="projects/{}".format(project_id),
read_session=requested_session,
max_stream_count=requested_streams,
retry=retry_policy,
timeout=timeout,
)

_LOGGER.debug(
Expand Down Expand Up @@ -983,8 +991,6 @@ def _download_table_bqstorage(
# Manually manage the pool to control shutdown behavior on timeout.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
wait_on_shutdown = True
start_time = time.time()
Comment thread
chalmerlowe marked this conversation as resolved.

try:
# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
Expand Down
8 changes: 5 additions & 3 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ def _should_retry(exc):
We retry if and only if the 'reason' is 'backendError'
or 'rateLimitExceeded'.
"""
if not hasattr(exc, "errors") or len(exc.errors) == 0:
# Check for unstructured error returns, e.g. from GFE
try:
reason = exc.errors[0]["reason"]
except (AttributeError, IndexError, TypeError, KeyError):
# Fallback for when errors attribute is missing, empty, or not a dict
# or doesn't contain "reason" (e.g. gRPC exceptions).
return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)
Comment thread
chalmerlowe marked this conversation as resolved.

reason = exc.errors[0]["reason"]
return reason in _RETRYABLE_REASONS


Expand Down
48 changes: 48 additions & 0 deletions tests/unit/test_download_bqstorage_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
from unittest import mock
from google.cloud.bigquery import _pandas_helpers

try:
from google.cloud import bigquery_storage
except ImportError:
bigquery_storage = None


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_download_table_bqstorage_passes_timeout_to_create_read_session():
    """Verify the caller-supplied timeout is forwarded to ``create_read_session``.

    The mocked read session exposes zero streams, so the download generator
    returns immediately after creating the session — that is enough to
    inspect the keyword arguments ``create_read_session`` received.
    """
    expected_timeout = 123.456

    # A minimal stand-in for a BigQuery table reference.
    fake_table = mock.Mock()
    fake_table.table_id = "test_table"
    fake_table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test"

    # Session with no streams: forces the early-return path, so no actual
    # download work (or thread pool) is exercised by this test.
    fake_session = mock.Mock()
    fake_session.streams = []

    fake_client = mock.Mock(spec=bigquery_storage.BigQueryReadClient)
    fake_client.create_read_session.return_value = fake_session

    # download_arrow_bqstorage is a generator; exhaust it to trigger execution.
    frames = _pandas_helpers.download_arrow_bqstorage(
        "test-project", fake_table, fake_client, timeout=expected_timeout
    )
    list(frames)

    fake_client.create_read_session.assert_called_once()
    _, call_kwargs = fake_client.create_read_session.call_args

    # The per-call timeout must be passed through unchanged.
    assert call_kwargs.get("timeout") == expected_timeout

    # A retry policy must be attached, with its overall deadline mirroring
    # the caller's timeout.  NOTE(review): ``_deadline`` is a private
    # attribute of google.api_core's Retry — confirm it stays stable across
    # api-core versions.
    retry_policy = call_kwargs.get("retry")
    assert retry_policy is not None
    assert retry_policy._deadline == expected_timeout
Loading