Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import retry as bq_retry
from google.cloud.bigquery import schema


Expand Down Expand Up @@ -740,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_row_iterator(pages, bq_schema):
def download_arrow_row_iterator(pages, bq_schema, timeout=None):
"""Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

Args:
Expand All @@ -751,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema):
Mapping[str, Any] \
]]):
A description of the fields in result pages.
timeout (Optional[float]):
The number of seconds to wait for the underlying download to complete.
If ``None``, wait indefinitely.

Yields:
:class:`pyarrow.RecordBatch`
The next page of records as a ``pyarrow`` record batch.
Expand All @@ -759,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema):
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
if timeout is None:
for page in pages:
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
else:
start_time = time.monotonic()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE to reviewer:

Why a monotonic clock? A monotonic clock is guaranteed to move forward or stay still, but never go backward, making it ideal for measuring elapsed time and durations, unlike the system's wall clock (time.time()), which can be adjusted manually or by network time protocols (NTP) — for example, jumping backward when clocks "fall back" at the end of daylight saving time. Not likely to be a huge issue here, but good practice for this use case.

for page in pages:
if time.monotonic() - start_time > timeout:
raise concurrent.futures.TimeoutError()

yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _row_iterator_page_to_dataframe(page, column_names, dtypes):
Expand All @@ -778,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_row_iterator(pages, bq_schema, dtypes):
def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None):
"""Use HTTP JSON RowIterator to construct a DataFrame.

Args:
Expand All @@ -792,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes):
dtypes(Mapping[str, numpy.dtype]):
The types of columns in result data to hint construction of the
resulting DataFrame. Not all column types have to be specified.
timeout (Optional[float]):
The number of seconds to wait for the underlying download to complete.
If ``None``, wait indefinitely.

Yields:
:class:`pandas.DataFrame`
The next page of records as a ``pandas.DataFrame``.
"""
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)

if timeout is None:
for page in pages:
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
else:
start_time = time.monotonic()
for page in pages:
if time.monotonic() - start_time > timeout:
raise concurrent.futures.TimeoutError()

yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
Expand Down Expand Up @@ -928,6 +954,7 @@ def _download_table_bqstorage(
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

start_time = time.time()
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

requested_session = bigquery_storage.types.stream.ReadSession(
Expand All @@ -944,10 +971,16 @@ def _download_table_bqstorage(
ArrowSerializationOptions.CompressionCodec(1)
)

retry_policy = None
if timeout is not None:
retry_policy = bq_retry.DEFAULT_RETRY.with_deadline(timeout)

session = bqstorage_client.create_read_session(
parent="projects/{}".format(project_id),
read_session=requested_session,
max_stream_count=requested_streams,
retry=retry_policy,
timeout=timeout,
)

_LOGGER.debug(
Expand Down Expand Up @@ -983,8 +1016,6 @@ def _download_table_bqstorage(
# Manually manage the pool to control shutdown behavior on timeout.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
wait_on_shutdown = True
start_time = time.time()
Comment thread
chalmerlowe marked this conversation as resolved.

try:
# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
Expand All @@ -1006,7 +1037,7 @@ def _download_table_bqstorage(
while not_done:
# Check for timeout
if timeout is not None:
elapsed = time.time() - start_time
elapsed = time.monotonic() - start_time
if elapsed > timeout:
wait_on_shutdown = False
raise concurrent.futures.TimeoutError(
Expand Down
16 changes: 11 additions & 5 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from google.api_core import exceptions
from google.api_core import retry
import google.api_core.future.polling
from google.auth import exceptions as auth_exceptions # type: ignore
import requests.exceptions

_LOGGER = logging.getLogger(__name__)

_RETRYABLE_REASONS = frozenset(
["rateLimitExceeded", "backendError", "internalError", "badGateway"]
Expand Down Expand Up @@ -61,14 +64,17 @@
def _should_retry(exc):
    """Predicate for determining when to retry.

    Structured API errors are retried when the ``reason`` of their first
    error entry is in ``_RETRYABLE_REASONS``.  Exceptions without a usable
    ``errors`` payload (e.g. gRPC or connection errors) are retried when the
    exception *type* is in ``_UNSTRUCTURED_RETRYABLE_TYPES``.
    """
    try:
        reason = exc.errors[0]["reason"]
    except (AttributeError, IndexError, TypeError, KeyError):
        # The ``errors`` attribute is missing, empty, not indexable, or its
        # first entry has no "reason" key (e.g. gRPC exceptions).  Fall back
        # to retrying based on the exception type alone.
        # ``%r`` is used (rather than an f-string) so the repr() is computed
        # lazily, only when DEBUG logging is actually enabled.
        _LOGGER.debug("Inspecting unstructured error for retry: %r", exc)
        return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)
    return reason in _RETRYABLE_REASONS


Expand Down
6 changes: 5 additions & 1 deletion google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,10 @@ def to_arrow_iterable(
timeout=timeout,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
_pandas_helpers.download_arrow_row_iterator,
iter(self.pages),
self.schema,
timeout=timeout,
)
return self._to_page_iterable(
bqstorage_download,
Expand Down Expand Up @@ -2366,6 +2369,7 @@ def to_dataframe_iterable(
iter(self.pages),
self.schema,
dtypes,
timeout=timeout,
)
return self._to_page_iterable(
bqstorage_download,
Expand Down
48 changes: 48 additions & 0 deletions tests/unit/test_download_bqstorage_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
from unittest import mock
from google.cloud.bigquery import _pandas_helpers

# The BigQuery Storage API client is an optional dependency.  When it is not
# installed, ``bigquery_storage`` is set to ``None`` so the test below can be
# skipped instead of failing at import time.
try:
    from google.cloud import bigquery_storage
except ImportError:
    bigquery_storage = None


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_download_table_bqstorage_passes_timeout_to_create_read_session():
    """Verify ``timeout`` is forwarded to ``create_read_session`` together
    with a retry policy whose deadline matches the timeout."""
    project_id = "test-project"
    table = mock.Mock()
    table.table_id = "test_table"
    table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test"

    # A read session with no streams makes the download return before any
    # worker threads are spun up, which is all this test needs to exercise
    # the ``create_read_session`` call.
    mock_session = mock.Mock()
    mock_session.streams = []
    storage_client = mock.Mock(spec=bigquery_storage.BigQueryReadClient)
    storage_client.create_read_session.return_value = mock_session

    expected_timeout = 123.456
    # The download function is a generator, so drain it to force execution.
    list(
        _pandas_helpers.download_arrow_bqstorage(
            project_id, table, storage_client, timeout=expected_timeout
        )
    )

    storage_client.create_read_session.assert_called_once()
    _, call_kwargs = storage_client.create_read_session.call_args
    assert "timeout" in call_kwargs
    assert call_kwargs["timeout"] == expected_timeout

    assert "retry" in call_kwargs
    retry_policy = call_kwargs["retry"]
    assert retry_policy is not None
    # NOTE(review): ``_deadline`` is a private attribute of
    # ``google.api_core.retry.Retry``; the public ``deadline`` property would
    # be more robust against api_core internals changing -- confirm.
    assert retry_policy._deadline == expected_timeout
Loading