Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 39febf9

Browse files
committed
test(bigquery): add unit tests for timeout logic in _pandas_helpers.py
1 parent 031adf1 commit 39febf9

3 files changed

Lines changed: 112 additions & 0 deletions

File tree

tests/unit/job/test_query_pandas.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,3 +1026,35 @@ def test_query_job_to_geodataframe_delegation(wait_for_query):
10261026
timeout=None,
10271027
)
10281028
assert df is row_iterator.to_geodataframe.return_value
1029+
1030+
1031+
@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
def test_query_job_to_dataframe_delegation(wait_for_query):
    """to_dataframe waits for the query, then forwards timeout to the iterator."""
    query_job = _make_job()
    storage_client = object()
    expected_timeout = 123.45

    query_job.to_dataframe(bqstorage_client=storage_client, timeout=expected_timeout)

    # The job must first block on query completion (no timeout cap, no limit).
    wait_for_query.assert_called_once_with(query_job, None, max_results=None)

    iterator = wait_for_query.return_value
    iterator.to_dataframe.assert_called_once()
    # Unpack (args, kwargs) from the recorded call and check the forwarded value.
    _, forwarded_kwargs = iterator.to_dataframe.call_args
    assert forwarded_kwargs["timeout"] == expected_timeout
1045+
1046+
1047+
@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
def test_query_job_to_arrow_delegation(wait_for_query):
    """to_arrow waits for the query, then forwards timeout to the iterator."""
    query_job = _make_job()
    storage_client = object()
    expected_timeout = 123.45

    query_job.to_arrow(bqstorage_client=storage_client, timeout=expected_timeout)

    # The job must first block on query completion (no timeout cap, no limit).
    wait_for_query.assert_called_once_with(query_job, None, max_results=None)

    iterator = wait_for_query.return_value
    iterator.to_arrow.assert_called_once()
    # Unpack (args, kwargs) from the recorded call and check the forwarded value.
    _, forwarded_kwargs = iterator.to_arrow.call_args
    assert forwarded_kwargs["timeout"] == expected_timeout

tests/unit/test__pandas_helpers.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
# limitations under the License.
1414

1515
import collections
16+
import concurrent.futures
1617
import datetime
1718
import decimal
1819
import functools
1920
import gc
2021
import operator
2122
import queue
23+
import time
2224
from typing import Union
2325
from unittest import mock
2426
import warnings
@@ -2177,3 +2179,69 @@ def test_determine_requested_streams_invalid_max_stream_count():
21772179
"""Tests that a ValueError is raised if max_stream_count is negative."""
21782180
with pytest.raises(ValueError):
21792181
determine_requested_streams(preserve_order=False, max_stream_count=-1)
2182+
2183+
2184+
def test__download_table_bqstorage_w_timeout_error(module_under_test):
    """A tiny timeout surfaces as TimeoutError while a worker is still busy."""
    from google.cloud.bigquery import dataset
    from google.cloud.bigquery import table

    storage_client = mock.create_autospec(
        bigquery_storage.BigQueryReadClient, instance=True
    )
    # A single stream is enough to occupy one worker thread.
    storage_client.create_read_session.return_value = mock.Mock(
        streams=["stream/s0"]
    )

    destination = table.TableReference(
        dataset.DatasetReference("project-x", "dataset-y"),
        "table-z",
    )

    def never_finishes(
        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
    ):
        # Spin until the main thread flips done=True, which it does on timeout.
        while not download_state.done:
            time.sleep(0.01)

    stream_patch = mock.patch.object(
        module_under_test, "_download_table_bqstorage_stream", new=never_finishes
    )
    with stream_patch:
        # Deliberately minuscule timeout so the deadline passes immediately.
        pages = module_under_test._download_table_bqstorage(
            "some-project", destination, storage_client, timeout=0.01
        )
        # Draining the generator should raise once the deadline is exceeded.
        with pytest.raises(concurrent.futures.TimeoutError, match="timed out"):
            list(pages)
2216+
2217+
2218+
def test__download_table_bqstorage_w_timeout_success(module_under_test):
    """A generous timeout lets a fast stream deliver all of its pages."""
    from google.cloud.bigquery import dataset
    from google.cloud.bigquery import table

    storage_client = mock.create_autospec(
        bigquery_storage.BigQueryReadClient, instance=True
    )
    # One stream whose worker completes instantly.
    storage_client.create_read_session.return_value = mock.Mock(
        streams=["stream/s0"]
    )

    destination = table.TableReference(
        dataset.DatasetReference("project-x", "dataset-y"),
        "table-z",
    )

    def finishes_immediately(
        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
    ):
        worker_queue.put("result_page")

    stream_patch = mock.patch.object(
        module_under_test,
        "_download_table_bqstorage_stream",
        new=finishes_immediately,
    )
    with stream_patch:
        # Plenty of headroom: the download must complete well within 10s.
        pages = module_under_test._download_table_bqstorage(
            "some-project", destination, storage_client, timeout=10.0
        )
        downloaded = list(pages)

    assert downloaded == ["result_page"]

tests/unit/test_table.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,18 @@ def test_to_geodataframe(self):
24952495
else:
24962496
assert not hasattr(df, "crs")
24972497

2498+
def test_methods_w_timeout(self):
    """Every conversion method accepts (and ignores) a timeout keyword."""
    iterator = self._make_one()

    # A TypeError from any of these would mean the keyword is rejected.
    for convert in (
        iterator.to_arrow,
        iterator.to_arrow_iterable,
        iterator.to_dataframe,
        iterator.to_dataframe_iterable,
        iterator.to_geodataframe,
    ):
        convert(timeout=42.0)
2508+
row_iterator.to_geodataframe(timeout=timeout)
2509+
24982510

24992511
class TestRowIterator(unittest.TestCase):
25002512
PYARROW_MINIMUM_VERSION = str(_versions_helpers._MIN_PYARROW_VERSION)

0 commit comments

Comments
 (0)