Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit b35015e

Browse files
committed
add explicit unsubscribe
1 parent 51e0ca6 commit b35015e

File tree

5 files changed

+72
-49
lines changed

5 files changed

+72
-49
lines changed

bigframes/core/events.py

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,56 +16,70 @@
1616

1717
import dataclasses
1818
import datetime
19-
import threading
20-
from typing import List, Optional
21-
import weakref
19+
from typing import Any, Callable, Optional, Set
20+
import uuid
2221

2322
import google.cloud.bigquery._job_helpers
2423
import google.cloud.bigquery.job.query
2524
import google.cloud.bigquery.table
2625

27-
import bigframes.formatting_helpers
2826
import bigframes.session.executor
2927

3028

31-
@dataclasses.dataclass(frozen=True)
3229
class Subscriber:
33-
callback_ref: weakref.ref
34-
# TODO(tswast): Add block_id to allow filter in context managers.
30+
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
31+
self._publisher = publisher
32+
self._callback = callback
33+
self._subscriber_id = str(uuid.uuid4())
34+
35+
def __call__(self, *args, **kwargs):
36+
return self._callback(*args, **kwargs)
37+
38+
def __hash__(self) -> int:
39+
return hash(self._subscriber_id)
40+
41+
def __eq__(self, value: object):
42+
if not isinstance(value, Subscriber):
43+
return NotImplemented
44+
return value._subscriber_id == self._subscriber_id
45+
46+
def close(self):
47+
self._publisher.unsubscribe(self)
48+
del self._publisher
49+
del self._callback
50+
51+
def __enter__(self):
52+
return self
53+
54+
def __exit__(self, exc_type, exc_value, traceback):
55+
if exc_value is not None:
56+
self(
57+
UnknownErrorEvent(
58+
exc_type=exc_type,
59+
exc_value=exc_value,
60+
traceback=traceback,
61+
)
62+
)
63+
self.close()
3564

3665

3766
class Publisher:
3867
def __init__(self):
39-
self._subscribers: List[Subscriber] = []
40-
self._subscribers_lock = threading.Lock()
41-
42-
def subscribe(self, callback):
43-
subscriber = Subscriber(callback_ref=weakref.ref(callback))
44-
45-
with self._subscribers_lock:
46-
# TODO(tswast): Add block_id to allow filter in context managers.
47-
self._subscribers.append(subscriber)
48-
49-
def send(self, event: Event):
50-
to_delete = []
51-
to_call = []
68+
self._subscribers: Set[Subscriber] = set()
5269

53-
with self._subscribers_lock:
54-
for sid, subscriber in enumerate(self._subscribers):
55-
callback = subscriber.callback_ref()
70+
def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
71+
# TODO(b/448176657): figure out how to handle subscribers/publishers in
72+
# a background thread. Maybe subscribers should be thread-local?
73+
subscriber = Subscriber(callback, publisher=self)
74+
self._subscribers.add(subscriber)
75+
return subscriber
5676

57-
if callback is None:
58-
to_delete.append(sid)
59-
else:
60-
# TODO(tswast): Add if statement for block_id to allow filter
61-
# in context managers.
62-
to_call.append(callback)
77+
def unsubscribe(self, subscriber: Subscriber):
78+
self._subscribers.remove(subscriber)
6379

64-
for sid in reversed(to_delete):
65-
del self._subscribers[sid]
66-
67-
for callback in to_call:
68-
callback(event)
80+
def publish(self, event: Event):
81+
for subscriber in self._subscribers:
82+
subscriber(event)
6983

7084

7185
class Event:
@@ -90,6 +104,13 @@ class ExecutionFinished(Event):
90104
result: Optional[bigframes.session.executor.ExecuteResult] = None
91105

92106

107+
@dataclasses.dataclass(frozen=True)
108+
class UnknownErrorEvent(Event):
109+
exc_type: Any
110+
exc_value: Any
111+
traceback: Any
112+
113+
93114
@dataclasses.dataclass(frozen=True)
94115
class BigQuerySentEvent(ExecutionRunning):
95116
"""Query sent to BigQuery."""

bigframes/session/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,9 @@ def close(self):
383383

384384
publisher_session = getattr(self, "_publisher", None)
385385
if publisher_session:
386-
publisher_session.send(self.session_id)
386+
publisher_session.publish(
387+
bigframes.core.events.SessionClosed(self.session_id)
388+
)
387389

388390
@overload
389391
def read_gbq( # type: ignore[overload-overlap]

bigframes/session/_io/bigquery/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ def publish_bq_event(event):
253253
else:
254254
bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
255255

256-
publisher.send(bf_event)
256+
publisher.publish(bf_event)
257257

258258
return publish_bq_event
259259

@@ -378,7 +378,7 @@ def start_query_with_client(
378378
raise
379379

380380
if not query_job.configuration.dry_run:
381-
publisher.send(
381+
publisher.publish(
382382
bigframes.core.events.BigQuerySentEvent(
383383
sql,
384384
billing_project=query_job.project,
@@ -389,7 +389,7 @@ def start_query_with_client(
389389
)
390390
results_iterator = query_job.result()
391391
if not query_job.configuration.dry_run:
392-
publisher.send(
392+
publisher.publish(
393393
bigframes.core.events.BigQueryFinishedEvent(
394394
billing_project=query_job.project,
395395
location=query_job.location,

bigframes/session/bq_caching_executor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def execute(
192192
array_value: bigframes.core.ArrayValue,
193193
execution_spec: ex_spec.ExecutionSpec,
194194
) -> executor.ExecuteResult:
195-
self._publisher.send(bigframes.core.events.ExecutionStarted())
195+
self._publisher.publish(bigframes.core.events.ExecutionStarted())
196196

197197
# TODO: Support export jobs in combination with semi executors
198198
if execution_spec.destination_spec is None:
@@ -202,7 +202,7 @@ def execute(
202202
plan, ordered=execution_spec.ordered, peek=execution_spec.peek
203203
)
204204
if maybe_result:
205-
self._publisher.send(
205+
self._publisher.publish(
206206
bigframes.core.events.ExecutionFinished(
207207
result=maybe_result,
208208
)
@@ -216,7 +216,7 @@ def execute(
216216
)
217217
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
218218
result = self._export_gbq(array_value, execution_spec.destination_spec)
219-
self._publisher.send(
219+
self._publisher.publish(
220220
bigframes.core.events.ExecutionFinished(
221221
result=result,
222222
)
@@ -236,7 +236,7 @@ def execute(
236236
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
237237
self._export_result_gcs(result, execution_spec.destination_spec)
238238

239-
self._publisher.send(
239+
self._publisher.publish(
240240
bigframes.core.events.ExecutionFinished(
241241
result=result,
242242
)

bigframes/session/loader.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ def read_gbq_table(
788788
# If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique
789789
if not primary_key and self._scan_index_uniqueness and index_cols:
790790
if publish_execution:
791-
self._publisher.send(
791+
self._publisher.publish(
792792
bigframes.core.events.ExecutionStarted(),
793793
)
794794
primary_key = bf_read_gbq_table.check_if_index_columns_are_unique(
@@ -798,7 +798,7 @@ def read_gbq_table(
798798
publisher=self._publisher,
799799
)
800800
if publish_execution:
801-
self._publisher.send(
801+
self._publisher.publish(
802802
bigframes.core.events.ExecutionFinished(),
803803
)
804804

@@ -1021,7 +1021,7 @@ def read_gbq_query(
10211021

10221022
# We want to make sure we show progress when we actually do execute a
10231023
# query. Since we have got this far, we know it's not a dry run.
1024-
self._publisher.send(
1024+
self._publisher.publish(
10251025
bigframes.core.events.ExecutionStarted(),
10261026
)
10271027

@@ -1086,7 +1086,7 @@ def read_gbq_query(
10861086
index_col=index_col,
10871087
columns=columns,
10881088
)
1089-
self._publisher.send(
1089+
self._publisher.publish(
10901090
bigframes.core.events.ExecutionFinished(),
10911091
)
10921092
return df
@@ -1098,7 +1098,7 @@ def read_gbq_query(
10981098
query_job_for_metrics,
10991099
session=self._session,
11001100
)
1101-
self._publisher.send(
1101+
self._publisher.publish(
11021102
bigframes.core.events.ExecutionFinished(),
11031103
)
11041104
return df
@@ -1116,7 +1116,7 @@ def read_gbq_query(
11161116
query_job_for_metrics,
11171117
session=self._session,
11181118
)
1119-
self._publisher.send(
1119+
self._publisher.publish(
11201120
bigframes.core.events.ExecutionFinished(),
11211121
)
11221122
return df
@@ -1140,7 +1140,7 @@ def read_gbq_query(
11401140
# max_results and filters are omitted because they are already
11411141
# handled by to_query(), above.
11421142
)
1143-
self._publisher.send(
1143+
self._publisher.publish(
11441144
bigframes.core.events.ExecutionFinished(),
11451145
)
11461146
return df

0 commit comments

Comments
 (0)