Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit aa3622d

Browse files
committed
feat: add StreamingDataFrame.to_bigtable and .to_pubsub start_timestamp parameter
1 parent 8804ada commit aa3622d

File tree

3 files changed

+49
-11
lines changed

3 files changed

+49
-11
lines changed

bigframes/pandas/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from __future__ import annotations
1818

1919
from collections import namedtuple
20-
from datetime import datetime
20+
from datetime import date, datetime
2121
import inspect
2222
import sys
2323
import typing
@@ -194,7 +194,7 @@ def to_datetime(
194194

195195
@typing.overload
196196
def to_datetime(
197-
arg: Union[int, float, str, datetime],
197+
arg: Union[int, float, str, datetime, date],
198198
*,
199199
utc: bool = False,
200200
format: Optional[str] = None,
@@ -205,7 +205,7 @@ def to_datetime(
205205

206206
def to_datetime(
207207
arg: Union[
208-
Union[int, float, str, datetime],
208+
Union[int, float, str, datetime, date],
209209
vendored_pandas_datetimes.local_iterables,
210210
bigframes.series.Series,
211211
bigframes.dataframe.DataFrame,

bigframes/streaming/dataframe.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
"""Module for bigquery continuous queries"""
1616
from __future__ import annotations
1717

18+
from abc import abstractmethod
19+
from datetime import date, datetime
1820
import functools
1921
import inspect
2022
import json
21-
from typing import Optional
23+
from typing import Optional, Union
2224
import warnings
2325

2426
from google.cloud import bigquery
27+
import pandas as pd
2528

2629
from bigframes import dataframe
2730
from bigframes.core import log_adapter, nodes
@@ -54,9 +57,14 @@ def _curate_df_doc(doc: Optional[str]):
5457

5558

5659
class StreamingBase:
57-
_appends_sql: str
5860
_session: bigframes.session.Session
5961

62+
@abstractmethod
63+
def _appends_sql(
64+
self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
65+
) -> str:
66+
pass
67+
6068
def to_bigtable(
6169
self,
6270
*,
@@ -70,6 +78,8 @@ def to_bigtable(
7078
bigtable_options: Optional[dict] = None,
7179
job_id: Optional[str] = None,
7280
job_id_prefix: Optional[str] = None,
81+
start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
82+
end_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
7383
) -> bigquery.QueryJob:
7484
"""
7585
Export the StreamingDataFrame as a continuous job and returns a
@@ -115,6 +125,8 @@ def to_bigtable(
115125
If specified, a job id prefix for the query, see
116126
job_id_prefix parameter of
117127
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
128+
start_timestamp (int, float, str, datetime, date, default None):
129+
The start timestamp of the query. Possible values must be within the most recent 7 days. If None, starts from the maximum allowed value, 7 days ago. Time zone naive values are interpreted as UTC.
118130
119131
Returns:
120132
google.cloud.bigquery.QueryJob:
@@ -123,8 +135,15 @@ def to_bigtable(
123135
For example, the job can be cancelled or its error status
124136
can be examined.
125137
"""
138+
if not isinstance(
139+
start_timestamp, (int, float, str, datetime, date, type(None))
140+
):
141+
raise ValueError(
142+
f"Unsupported start_timestamp type {type(start_timestamp)}"
143+
)
144+
126145
return _to_bigtable(
127-
self._appends_sql,
146+
self._appends_sql(start_timestamp),
128147
instance=instance,
129148
table=table,
130149
service_account_email=service_account_email,
@@ -145,6 +164,7 @@ def to_pubsub(
145164
service_account_email: str,
146165
job_id: Optional[str] = None,
147166
job_id_prefix: Optional[str] = None,
167+
start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
148168
) -> bigquery.QueryJob:
149169
"""
150170
Export the StreamingDataFrame as a continuous job and returns a
@@ -172,6 +192,8 @@ def to_pubsub(
172192
If specified, a job id prefix for the query, see
173193
job_id_prefix parameter of
174194
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
195+
start_timestamp (int, float, str, datetime, date, default None):
196+
The start timestamp of the query. Possible values must be within the most recent 7 days. If None, starts from the maximum allowed value, 7 days ago. Time zone naive values are interpreted as UTC.
175197
176198
Returns:
177199
google.cloud.bigquery.QueryJob:
@@ -180,8 +202,15 @@ def to_pubsub(
180202
For example, the job can be cancelled or its error status
181203
can be examined.
182204
"""
205+
if not isinstance(
206+
start_timestamp, (int, float, str, datetime, date, type(None))
207+
):
208+
raise ValueError(
209+
f"Unsupported start_timestamp type {type(start_timestamp)}"
210+
)
211+
183212
return _to_pubsub(
184-
self._appends_sql,
213+
self._appends_sql(start_timestamp),
185214
topic=topic,
186215
service_account_email=service_account_email,
187216
session=self._session,
@@ -280,14 +309,21 @@ def sql(self):
280309
sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))
281310

282311
# Patch for the required APPENDS clause
283-
@property
284-
def _appends_sql(self):
312+
def _appends_sql(
313+
self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
314+
) -> str:
285315
sql_str = self.sql
286316
original_table = self._original_table
287317
assert original_table is not None
288318

289319
# TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug.
290-
appends_clause = f"APPENDS(TABLE `{original_table}`, CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE))"
320+
start_ts_str = (
321+
str(pd.to_datetime(start_timestamp))
322+
if start_timestamp
323+
else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
324+
)
325+
326+
appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
291327
sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
292328
return sql_str
293329

tests/system/large/streaming/test_bigtable.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from datetime import datetime, timedelta
1516
import time
1617
from typing import Generator
1718
import uuid
@@ -67,7 +68,7 @@ def bigtable_table(
6768
bt_table.delete()
6869

6970

70-
@pytest.mark.flaky(retries=3, delay=10)
71+
# @pytest.mark.flaky(retries=3, delay=10)
7172
def test_streaming_df_to_bigtable(
7273
session_load: bigframes.Session, bigtable_table: table.Table
7374
):
@@ -91,6 +92,7 @@ def test_streaming_df_to_bigtable(
9192
bigtable_options={},
9293
job_id=None,
9394
job_id_prefix=job_id_prefix,
95+
start_timestamp=datetime.now() - timedelta(days=1),
9496
)
9597

9698
# wait 100 seconds in order to ensure the query doesn't stop

0 commit comments

Comments
 (0)