Skip to content

Commit 7670d19

Browse files
satisfy the mypy
1 parent 5e55aaf commit 7670d19

18 files changed

Lines changed: 116 additions & 150 deletions

packages/bigframes/bigframes/blob/_functions.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,14 @@ def _create_udf(self):
103103
"""
104104
"""
105105

106-
bf_io_bigquery.start_query_with_client(
106+
bf_io_bigquery.start_query_with_job(
107107
self._session.bqclient,
108108
sql,
109109
job_config=bigquery.QueryJobConfig(),
110110
metrics=self._session._metrics,
111111
location=None,
112112
project=None,
113113
timeout=None,
114-
query_with_job=True,
115114
publisher=self._session._publisher,
116115
)
117116

packages/bigframes/bigframes/functions/_function_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,14 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
142142
# TODO(swast): plumb through the original, user-facing api_name.
143143
import bigframes.session._io.bigquery
144144

145-
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
145+
_, query_job = bigframes.session._io.bigquery.start_query_with_job(
146146
cast(bigquery.Client, self._session.bqclient),
147147
create_function_ddl,
148148
job_config=bigquery.QueryJobConfig(),
149149
location=None,
150150
project=None,
151151
timeout=None,
152152
metrics=None,
153-
query_with_job=True,
154153
publisher=self._session._publisher,
155154
)
156155
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

packages/bigframes/bigframes/functions/function.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,9 @@ def __call__(self, *args, **kwargs):
183183

184184
args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
185185
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
186-
iter, job = bf_io_bigquery.start_query_with_client(
186+
iter, job = bf_io_bigquery.start_query_with_job(
187187
self._session.bqclient,
188188
sql=sql,
189-
query_with_job=True,
190189
job_config=bigquery.QueryJobConfig(),
191190
publisher=self._session._publisher,
192191
) # type: ignore
@@ -257,10 +256,9 @@ def __call__(self, *args, **kwargs):
257256

258257
args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
259258
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
260-
iter, job = bf_io_bigquery.start_query_with_client(
259+
iter, job = bf_io_bigquery.start_query_with_job(
261260
self._session.bqclient,
262261
sql=sql,
263-
query_with_job=True,
264262
job_config=bigquery.QueryJobConfig(),
265263
publisher=self._session._publisher,
266264
) # type: ignore

packages/bigframes/bigframes/session/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2310,15 +2310,14 @@ def _start_query_ml_ddl(
23102310
# so we must reset any encryption set in the job config
23112311
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
23122312
job_config.destination_encryption_configuration = None
2313-
iterator, query_job = bf_io_bigquery.start_query_with_client(
2313+
iterator, query_job = bf_io_bigquery.start_query_with_job(
23142314
self.bqclient,
23152315
sql,
23162316
job_config=job_config,
23172317
metrics=self._metrics,
23182318
location=None,
23192319
project=None,
23202320
timeout=None,
2321-
query_with_job=True,
23222321
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
23232322
publisher=self._publisher,
23242323
session=self,
@@ -2340,15 +2339,14 @@ def _create_object_table(self, path: str, connection: str) -> str:
23402339
uris = ['{path}']);
23412340
"""
23422341
)
2343-
bf_io_bigquery.start_query_with_client(
2342+
bf_io_bigquery.start_query_with_job(
23442343
self.bqclient,
23452344
sql,
23462345
job_config=bigquery.QueryJobConfig(),
23472346
metrics=self._metrics,
23482347
location=None,
23492348
project=None,
23502349
timeout=None,
2351-
query_with_job=True,
23522350
publisher=self._publisher,
23532351
session=self,
23542352
)

packages/bigframes/bigframes/session/_io/bigquery/__init__.py

Lines changed: 58 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -262,73 +262,7 @@ def publish_bq_event(event):
262262
return publish_bq_event
263263

264264

265-
@overload
266-
def start_query_with_client(
267-
bq_client: bigquery.Client,
268-
sql: str,
269-
*,
270-
job_config: bigquery.QueryJobConfig,
271-
location: Optional[str],
272-
project: Optional[str],
273-
timeout: Optional[float],
274-
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
275-
query_with_job: Literal[True],
276-
publisher: bigframes.core.events.Publisher,
277-
session=None,
278-
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ...
279-
280-
281-
@overload
282-
def start_query_with_client(
283-
bq_client: bigquery.Client,
284-
sql: str,
285-
*,
286-
job_config: bigquery.QueryJobConfig,
287-
location: Optional[str],
288-
project: Optional[str],
289-
timeout: Optional[float],
290-
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
291-
query_with_job: Literal[False],
292-
publisher: bigframes.core.events.Publisher,
293-
session=None,
294-
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ...
295-
296-
297-
@overload
298-
def start_query_with_client(
299-
bq_client: bigquery.Client,
300-
sql: str,
301-
*,
302-
job_config: bigquery.QueryJobConfig,
303-
location: Optional[str],
304-
project: Optional[str],
305-
timeout: Optional[float],
306-
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
307-
query_with_job: Literal[True],
308-
job_retry: google.api_core.retry.Retry,
309-
publisher: bigframes.core.events.Publisher,
310-
session=None,
311-
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ...
312-
313-
314-
@overload
315-
def start_query_with_client(
316-
bq_client: bigquery.Client,
317-
sql: str,
318-
*,
319-
job_config: bigquery.QueryJobConfig,
320-
location: Optional[str],
321-
project: Optional[str],
322-
timeout: Optional[float],
323-
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
324-
query_with_job: Literal[False],
325-
job_retry: google.api_core.retry.Retry,
326-
publisher: bigframes.core.events.Publisher,
327-
session=None,
328-
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ...
329-
330-
331-
def start_query_with_client(
265+
def start_query_with_job(
332266
bq_client: bigquery.Client,
333267
sql: str,
334268
*,
@@ -337,7 +271,6 @@ def start_query_with_client(
337271
project: Optional[str] = None,
338272
timeout: Optional[float] = None,
339273
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
340-
query_with_job: bool = True,
341274
# TODO(tswast): We can stop providing our own default once we use a
342275
# google-cloud-bigquery version with
343276
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
@@ -355,20 +288,6 @@ def start_query_with_client(
355288
add_and_trim_labels(job_config, session=session)
356289

357290
try:
358-
if not query_with_job:
359-
results_iterator = bq_client._query_and_wait_bigframes(
360-
sql,
361-
job_config=job_config,
362-
location=location,
363-
project=project,
364-
api_timeout=timeout,
365-
job_retry=job_retry,
366-
callback=create_bq_event_callback(publisher),
367-
)
368-
if metrics is not None:
369-
metrics.count_job_stats(row_iterator=results_iterator)
370-
return results_iterator, None
371-
372291
query_job = bq_client.query(
373292
sql,
374293
job_config=job_config,
@@ -382,6 +301,61 @@ def start_query_with_client(
382301
ex.message += CHECK_DRIVE_PERMISSIONS
383302
raise
384303

304+
results_iterator = query_job.result()
305+
_publish_events(
306+
query_job=query_job,
307+
total_rows=results_iterator.total_rows,
308+
sql=sql,
309+
publisher=publisher,
310+
metrics=metrics,
311+
)
312+
return results_iterator, query_job
313+
314+
315+
def start_query_with_job_optional(
316+
bq_client: bigquery.Client,
317+
sql: str,
318+
*,
319+
job_config: bigquery.QueryJobConfig,
320+
location: Optional[str] = None,
321+
project: Optional[str] = None,
322+
timeout: Optional[float] = None,
323+
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
324+
# TODO(tswast): We can stop providing our own default once we use a
325+
# google-cloud-bigquery version with
326+
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
327+
# version 3.36.0 or later.
328+
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
329+
publisher: bigframes.core.events.Publisher,
330+
session=None,
331+
):
332+
add_and_trim_labels(job_config, session=session)
333+
try:
334+
results_iterator = bq_client._query_and_wait_bigframes(
335+
sql,
336+
job_config=job_config,
337+
location=location,
338+
project=project,
339+
api_timeout=timeout,
340+
job_retry=job_retry,
341+
callback=create_bq_event_callback(publisher),
342+
)
343+
if metrics is not None:
344+
metrics.count_job_stats(row_iterator=results_iterator)
345+
return results_iterator, None
346+
except google.api_core.exceptions.Forbidden as ex:
347+
if "Drive credentials" in ex.message:
348+
ex.message += CHECK_DRIVE_PERMISSIONS
349+
raise
350+
351+
352+
def _publish_events(
353+
query_job: bigquery.QueryJob,
354+
sql: str,
355+
total_rows: Optional[int],
356+
publisher: bigframes.core.events.Publisher,
357+
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
358+
):
385359
if not query_job.configuration.dry_run:
386360
publisher.publish(
387361
bigframes.core.events.BigQuerySentEvent(
@@ -392,15 +366,14 @@ def start_query_with_client(
392366
request_id=None,
393367
)
394368
)
395-
results_iterator = query_job.result()
396369
if not query_job.configuration.dry_run:
397370
publisher.publish(
398371
bigframes.core.events.BigQueryFinishedEvent(
399372
billing_project=query_job.project,
400373
location=query_job.location,
401374
job_id=query_job.job_id,
402375
destination=query_job.destination,
403-
total_rows=results_iterator.total_rows,
376+
total_rows=total_rows,
404377
total_bytes_processed=query_job.total_bytes_processed,
405378
slot_millis=query_job.slot_millis,
406379
created=query_job.created,
@@ -411,7 +384,6 @@ def start_query_with_client(
411384

412385
if metrics is not None:
413386
metrics.count_job_stats(query_job=query_job)
414-
return results_iterator, query_job
415387

416388

417389
def delete_tables_matching_session_id(
@@ -473,15 +445,14 @@ def create_bq_dataset_reference(
473445
"""
474446
job_config = google.cloud.bigquery.QueryJobConfig()
475447

476-
_, query_job = start_query_with_client(
448+
_, query_job = start_query_with_job(
477449
bq_client,
478450
"SELECT 1",
479451
location=location,
480452
job_config=job_config,
481453
project=project,
482454
timeout=None,
483455
metrics=None,
484-
query_with_job=True,
485456
publisher=publisher,
486457
)
487458

packages/bigframes/bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,14 @@ def is_time_travel_eligible(
193193
)
194194
try:
195195
# If this succeeds, we know that time travel will for sure work.
196-
bigframes.session._io.bigquery.start_query_with_client(
196+
bigframes.session._io.bigquery.start_query_with_job_optional(
197197
bq_client=bqclient,
198198
sql=snapshot_sql,
199199
job_config=bigquery.QueryJobConfig(dry_run=True),
200200
location=None,
201201
project=None,
202202
timeout=None,
203203
metrics=None,
204-
query_with_job=False,
205204
publisher=publisher,
206205
)
207206
return True
@@ -266,15 +265,14 @@ def check_if_index_columns_are_unique(
266265
index_cols, table.get_table_ref()
267266
)
268267
job_config = bigquery.QueryJobConfig()
269-
results, _ = bigframes.session._io.bigquery.start_query_with_client(
268+
results, _ = bigframes.session._io.bigquery.start_query_with_job_optional(
270269
bq_client=bqclient,
271270
sql=is_unique_sql,
272271
job_config=job_config,
273272
timeout=None,
274273
location=None,
275274
project=None,
276275
metrics=None,
277-
query_with_job=False,
278276
publisher=publisher,
279277
)
280278
row = next(iter(results))

0 commit comments

Comments
 (0)