@@ -741,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types):
741741 return pyarrow .RecordBatch .from_arrays (arrays , names = column_names )
742742
743743
744- def download_arrow_row_iterator (pages , bq_schema ):
744+ def download_arrow_row_iterator (pages , bq_schema , timeout = None ):
745745 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
746746
747747 Args:
@@ -752,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema):
752752 Mapping[str, Any] \
753753 ]]):
754754 A decription of the fields in result pages.
755+ timeout (Optional[float]):
756+ The number of seconds to wait for the underlying download to complete.
757+ If ``None``, wait indefinitely.
758+
755759 Yields:
756760 :class:`pyarrow.RecordBatch`
757761 The next page of records as a ``pyarrow`` record batch.
@@ -760,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema):
760764 column_names = bq_to_arrow_schema (bq_schema ) or [field .name for field in bq_schema ]
761765 arrow_types = [bq_to_arrow_data_type (field ) for field in bq_schema ]
762766
763- for page in pages :
764- yield _row_iterator_page_to_arrow (page , column_names , arrow_types )
767+ if timeout is None :
768+ for page in pages :
769+ yield _row_iterator_page_to_arrow (page , column_names , arrow_types )
770+ else :
771+ start_time = time .monotonic ()
772+ for page in pages :
773+ if time .monotonic () - start_time > timeout :
774+ raise concurrent .futures .TimeoutError ()
775+
776+ yield _row_iterator_page_to_arrow (page , column_names , arrow_types )
765777
766778
767779def _row_iterator_page_to_dataframe (page , column_names , dtypes ):
@@ -779,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes):
779791 return pandas .DataFrame (columns , columns = column_names )
780792
781793
782- def download_dataframe_row_iterator (pages , bq_schema , dtypes ):
794+ def download_dataframe_row_iterator (pages , bq_schema , dtypes , timeout = None ):
783795 """Use HTTP JSON RowIterator to construct a DataFrame.
784796
785797 Args:
@@ -793,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes):
793805 dtypes(Mapping[str, numpy.dtype]):
794806 The types of columns in result data to hint construction of the
795807 resulting DataFrame. Not all column types have to be specified.
808+ timeout (Optional[float]):
809+ The number of seconds to wait for the underlying download to complete.
810+ If ``None``, wait indefinitely.
811+
796812 Yields:
797813 :class:`pandas.DataFrame`
798814 The next page of records as a ``pandas.DataFrame`` record batch.
799815 """
800816 bq_schema = schema ._to_schema_fields (bq_schema )
801817 column_names = [field .name for field in bq_schema ]
802- for page in pages :
803- yield _row_iterator_page_to_dataframe (page , column_names , dtypes )
818+
819+ if timeout is None :
820+ for page in pages :
821+ yield _row_iterator_page_to_dataframe (page , column_names , dtypes )
822+ else :
823+ start_time = time .monotonic ()
824+ for page in pages :
825+ if time .monotonic () - start_time > timeout :
826+ raise concurrent .futures .TimeoutError ()
827+
828+ yield _row_iterator_page_to_dataframe (page , column_names , dtypes )
804829
805830
806831def _bqstorage_page_to_arrow (page ):
0 commit comments