Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 0008e99

Browse files
committed
move publisher to session
1 parent 91506c3 commit 0008e99

File tree

12 files changed

+98
-36
lines changed

12 files changed

+98
-36
lines changed

bigframes/blob/_functions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def _create_udf(self):
9999
project=None,
100100
timeout=None,
101101
query_with_job=True,
102+
publisher=self._session._publisher,
102103
)
103104

104105
return udf_name

bigframes/core/events.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ def send(self, event: Event):
6868
callback(event)
6969

7070

71-
publisher = Publisher()
72-
publisher.subscribe(bigframes.formatting_helpers.progress_callback)
73-
74-
7571
class Event:
7672
pass
7773

bigframes/functions/_function_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
145145
timeout=None,
146146
metrics=None,
147147
query_with_job=True,
148+
publisher=self._session._publisher,
148149
)
149150
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
150151

bigframes/functions/function.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,13 @@ def __call__(self, *args, **kwargs):
219219

220220
args_string = ", ".join(map(bf_sql.simple_literal, args))
221221
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
222-
iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore
222+
iter, job = bf_io_bigquery.start_query_with_client(
223+
self._session.bqclient,
224+
sql=sql,
225+
query_with_job=True,
226+
job_config=bigquery.QueryJobConfig(),
227+
publisher=self._session._publisher,
228+
) # type: ignore
223229
return list(iter.to_arrow().to_pydict().values())[0][0]
224230

225231
@property
@@ -297,7 +303,13 @@ def __call__(self, *args, **kwargs):
297303

298304
args_string = ", ".join(map(bf_sql.simple_literal, args))
299305
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
300-
iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore
306+
iter, job = bf_io_bigquery.start_query_with_client(
307+
self._session.bqclient,
308+
sql=sql,
309+
query_with_job=True,
310+
job_config=bigquery.QueryJobConfig(),
311+
publisher=self._session._publisher,
312+
) # type: ignore
301313
return list(iter.to_arrow().to_pydict().values())[0][0]
302314

303315
@property

bigframes/session/__init__.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,9 @@
6767
import bigframes.constants
6868
import bigframes.core
6969
from bigframes.core import blocks, log_adapter, utils
70+
import bigframes.core.events
7071
import bigframes.core.pyformat
71-
72-
# Even though the ibis.backends.bigquery import is unused, it's needed
73-
# to register new and replacement ops with the Ibis BigQuery backend.
72+
import bigframes.formatting_helpers
7473
import bigframes.functions._function_session as bff_session
7574
import bigframes.functions.function as bff
7675
from bigframes.session import bigquery_session, bq_caching_executor, executor
@@ -137,6 +136,11 @@ def __init__(
137136

138137
_warn_if_bf_version_is_obsolete()
139138

139+
# Publisher needs to be created before the other objects, especially
140+
# the executors, because they access it.
141+
self._publisher = bigframes.core.events.Publisher()
142+
self._publisher.subscribe(bigframes.formatting_helpers.progress_callback)
143+
140144
if context is None:
141145
context = bigquery_options.BigQueryOptions()
142146

@@ -251,6 +255,7 @@ def __init__(
251255
scan_index_uniqueness=self._strictly_ordered,
252256
force_total_order=self._strictly_ordered,
253257
metrics=self._metrics,
258+
publisher=self._publisher,
254259
)
255260
self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
256261
bqclient=self._clients_provider.bqclient,
@@ -260,6 +265,7 @@ def __init__(
260265
strictly_ordered=self._strictly_ordered,
261266
metrics=self._metrics,
262267
enable_polars_execution=context.enable_polars_execution,
268+
publisher=self._publisher,
263269
)
264270

265271
def __del__(self):
@@ -2150,6 +2156,7 @@ def _start_query_ml_ddl(
21502156
timeout=None,
21512157
query_with_job=True,
21522158
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
2159+
publisher=self._publisher,
21532160
)
21542161
return iterator, query_job
21552162

@@ -2177,6 +2184,7 @@ def _create_object_table(self, path: str, connection: str) -> str:
21772184
project=None,
21782185
timeout=None,
21792186
query_with_job=True,
2187+
publisher=self._publisher,
21802188
)
21812189

21822190
return table

bigframes/session/_io/bigquery/__init__.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -240,19 +240,22 @@ def add_and_trim_labels(job_config):
240240
)
241241

242242

243-
def publish_bq_event(event):
244-
if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent):
245-
bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event)
246-
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent):
247-
bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event)
248-
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent):
249-
bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event)
250-
elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent):
251-
bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event)
252-
else:
253-
bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
243+
def create_bq_event_callback(publisher):
244+
def publish_bq_event(event):
245+
if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent):
246+
bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event)
247+
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent):
248+
bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event)
249+
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent):
250+
bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event)
251+
elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent):
252+
bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event)
253+
else:
254+
bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
254255

255-
bigframes.core.events.publisher.send(bf_event)
256+
publisher.send(bf_event)
257+
258+
return publish_bq_event
256259

257260

258261
@overload
@@ -266,6 +269,7 @@ def start_query_with_client(
266269
timeout: Optional[float],
267270
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
268271
query_with_job: Literal[True],
272+
publisher: bigframes.core.events.Publisher,
269273
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
270274
...
271275

@@ -281,6 +285,7 @@ def start_query_with_client(
281285
timeout: Optional[float],
282286
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
283287
query_with_job: Literal[False],
288+
publisher: bigframes.core.events.Publisher,
284289
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
285290
...
286291

@@ -297,6 +302,7 @@ def start_query_with_client(
297302
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
298303
query_with_job: Literal[True],
299304
job_retry: google.api_core.retry.Retry,
305+
publisher: bigframes.core.events.Publisher,
300306
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
301307
...
302308

@@ -313,6 +319,7 @@ def start_query_with_client(
313319
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
314320
query_with_job: Literal[False],
315321
job_retry: google.api_core.retry.Retry,
322+
publisher: bigframes.core.events.Publisher,
316323
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
317324
...
318325

@@ -332,6 +339,7 @@ def start_query_with_client(
332339
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
333340
# version 3.36.0 or later.
334341
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
342+
publisher: bigframes.core.events.Publisher,
335343
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
336344
"""
337345
Starts query job and waits for results.
@@ -350,7 +358,7 @@ def start_query_with_client(
350358
project=project,
351359
api_timeout=timeout,
352360
job_retry=job_retry,
353-
callback=publish_bq_event,
361+
callback=create_bq_event_callback(publisher),
354362
)
355363
if metrics is not None:
356364
metrics.count_job_stats(row_iterator=results_iterator)
@@ -370,7 +378,7 @@ def start_query_with_client(
370378
raise
371379

372380
if not query_job.configuration.dry_run:
373-
bigframes.core.events.publisher.send(
381+
publisher.send(
374382
bigframes.core.events.BigQuerySentEvent(
375383
sql,
376384
billing_project=query_job.project,
@@ -381,7 +389,7 @@ def start_query_with_client(
381389
)
382390
results_iterator = query_job.result()
383391
if not query_job.configuration.dry_run:
384-
bigframes.core.events.publisher.send(
392+
publisher.send(
385393
bigframes.core.events.BigQueryFinishedEvent(
386394
billing_project=query_job.project,
387395
location=query_job.location,
@@ -436,6 +444,8 @@ def create_bq_dataset_reference(
436444
bq_client: bigquery.Client,
437445
location: Optional[str] = None,
438446
project: Optional[str] = None,
447+
*,
448+
publisher: bigframes.core.events.Publisher,
439449
) -> bigquery.DatasetReference:
440450
"""Create and identify dataset(s) for temporary BQ resources.
441451
@@ -467,6 +477,7 @@ def create_bq_dataset_reference(
467477
timeout=None,
468478
metrics=None,
469479
query_with_job=True,
480+
publisher=publisher,
470481
)
471482

472483
# The anonymous dataset is used by BigQuery to write query results and

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import google.cloud.bigquery as bigquery
2929
import google.cloud.bigquery.table
3030

31+
import bigframes.core.events
3132
import bigframes.exceptions as bfe
3233
import bigframes.session._io.bigquery
3334

@@ -43,6 +44,7 @@ def get_table_metadata(
4344
*,
4445
cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
4546
use_cache: bool = True,
47+
publisher: bigframes.core.events.Publisher,
4648
) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
4749
"""Get the table metadata, either from cache or via REST API."""
4850

@@ -59,6 +61,7 @@ def get_table_metadata(
5961
# Don't warn, because that will already have been taken care of.
6062
should_warn=False,
6163
should_dry_run=False,
64+
publisher=publisher,
6265
):
6366
# This warning should only happen if the cached snapshot_time will
6467
# have any effect on bigframes (b/437090788). For example, with
@@ -108,6 +111,7 @@ def is_time_travel_eligible(
108111
*,
109112
should_warn: bool,
110113
should_dry_run: bool,
114+
publisher: bigframes.core.events.Publisher,
111115
):
112116
"""Check if a table is eligible to use time-travel.
113117
@@ -184,6 +188,7 @@ def is_time_travel_eligible(
184188
timeout=None,
185189
metrics=None,
186190
query_with_job=False,
191+
publisher=publisher,
187192
)
188193
return True
189194

@@ -235,6 +240,8 @@ def check_if_index_columns_are_unique(
235240
bqclient: bigquery.Client,
236241
table: google.cloud.bigquery.table.Table,
237242
index_cols: List[str],
243+
*,
244+
publisher: bigframes.core.events.Publisher,
238245
) -> Tuple[str, ...]:
239246
import bigframes.core.sql
240247
import bigframes.session._io.bigquery
@@ -252,6 +259,7 @@ def check_if_index_columns_are_unique(
252259
project=None,
253260
metrics=None,
254261
query_with_job=False,
262+
publisher=publisher,
255263
)
256264
row = next(iter(results))
257265

bigframes/session/bq_caching_executor.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def __init__(
140140
strictly_ordered: bool = True,
141141
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
142142
enable_polars_execution: bool = False,
143+
publisher: bigframes.core.events.Publisher,
143144
):
144145
self.bqclient = bqclient
145146
self.storage_manager = storage_manager
@@ -149,6 +150,9 @@ def __init__(
149150
self.loader = loader
150151
self.bqstoragereadclient = bqstoragereadclient
151152
self._enable_polars_execution = enable_polars_execution
153+
self._publisher = publisher
154+
155+
# TODO(tswast): Send events from semi-executors, too.
152156
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
153157
read_api_execution.ReadApiSemiExecutor(
154158
bqstoragereadclient=bqstoragereadclient,
@@ -188,7 +192,7 @@ def execute(
188192
array_value: bigframes.core.ArrayValue,
189193
execution_spec: ex_spec.ExecutionSpec,
190194
) -> executor.ExecuteResult:
191-
bigframes.core.events.publisher.send(bigframes.core.events.ExecutionStarted())
195+
self._publisher.send(bigframes.core.events.ExecutionStarted())
192196

193197
# TODO: Support export jobs in combination with semi executors
194198
if execution_spec.destination_spec is None:
@@ -198,7 +202,7 @@ def execute(
198202
plan, ordered=execution_spec.ordered, peek=execution_spec.peek
199203
)
200204
if maybe_result:
201-
bigframes.core.events.publisher.send(
205+
self._publisher.send(
202206
bigframes.core.events.ExecutionFinished(
203207
result=maybe_result,
204208
)
@@ -212,7 +216,7 @@ def execute(
212216
)
213217
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
214218
result = self._export_gbq(array_value, execution_spec.destination_spec)
215-
bigframes.core.events.publisher.send(
219+
self._publisher.send(
216220
bigframes.core.events.ExecutionFinished(
217221
result=result,
218222
)
@@ -232,7 +236,7 @@ def execute(
232236
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
233237
self._export_result_gcs(result, execution_spec.destination_spec)
234238

235-
bigframes.core.events.publisher.send(
239+
self._publisher.send(
236240
bigframes.core.events.ExecutionFinished(
237241
result=result,
238242
)
@@ -261,6 +265,7 @@ def _export_result_gcs(
261265
location=None,
262266
timeout=None,
263267
query_with_job=True,
268+
publisher=self._publisher,
264269
)
265270

266271
def _maybe_find_existing_table(
@@ -419,6 +424,7 @@ def _run_execute_query(
419424
location=None,
420425
timeout=None,
421426
query_with_job=True,
427+
publisher=self._publisher,
422428
)
423429
else:
424430
return bq_io.start_query_with_client(
@@ -430,6 +436,7 @@ def _run_execute_query(
430436
location=None,
431437
timeout=None,
432438
query_with_job=False,
439+
publisher=self._publisher,
433440
)
434441

435442
except google.api_core.exceptions.BadRequest as e:

0 commit comments

Comments
 (0)