Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d887714
fix: Respect display.progress_bar=None in background threads
shuoweil Apr 17, 2026
42e4a50
refactor: Refactor BQ event progress bar config
shuoweil Apr 17, 2026
d6757bc
Refactor BQ event progress bar config and add system test
shuoweil Apr 17, 2026
3e8ddde
refactor: refactor code
shuoweil Apr 17, 2026
be825b5
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 20, 2026
96aaff3
docs: add ignore
shuoweil Apr 20, 2026
bfba719
format file
shuoweil Apr 20, 2026
4ffd647
format file
shuoweil Apr 20, 2026
ffbc397
Roll back .pre-commit-config.yaml changes
shuoweil Apr 20, 2026
27010d3
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 22, 2026
a6c42af
chore: format files
shuoweil Apr 22, 2026
9742165
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 29, 2026
cdcbf0c
rename _FALLBACK_TO_GLOBAL to _DEFAULT
shuoweil Apr 29, 2026
c20751b
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 1, 2026
a139c65
format: format code
shuoweil May 1, 2026
85bf48f
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 8, 2026
67c16c6
Apply review comments in events.py
shuoweil May 8, 2026
13b75e7
format code
shuoweil May 8, 2026
7b9f249
format code
shuoweil May 8, 2026
dd69a8d
fix mypy
shuoweil May 8, 2026
a3e4ac5
update code for lint
shuoweil May 8, 2026
f393a0d
format code
shuoweil May 8, 2026
193d33c
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d001a9b
refactor: remove progress_bar from events
shuoweil May 11, 2026
6377840
refactor: use EventEnvelope for progress_bar context
shuoweil May 11, 2026
5584531
style: fix line length lints in formatting_helpers and __init__
shuoweil May 11, 2026
a7d0171
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d372472
format file
shuoweil May 12, 2026
8879473
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 12, 2026
457cf84
fix lint
shuoweil May 12, 2026
2339d17
format code
shuoweil May 12, 2026
44dd6de
format code
shuoweil May 12, 2026
5e4dcd8
reformat stubs and fix flake8 E704
shuoweil May 12, 2026
26420bf
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 13, 2026
e46c163
chore: add clarifying comment for on_event envelope check
shuoweil May 13, 2026
27bba29
chore: add clarifying comment for formatting_helpers envelope check
shuoweil May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 61 additions & 35 deletions packages/bigframes/bigframes/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
import datetime
import threading
import uuid
from typing import Any, Callable, Optional, Set
from typing import Any, Callable, Literal, Set

import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.job.query
import google.cloud.bigquery.table

import bigframes.session.executor

_DEFAULT: Literal["default"] = "default"


class Subscriber:
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501
self._publisher = publisher
self._callback = callback
self._subscriber_id = uuid.uuid4()
Expand Down Expand Up @@ -106,7 +108,7 @@ class ExecutionRunning(Event):

@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
result: Optional[bigframes.session.executor.ExecuteResult] = None
result: bigframes.session.executor.ExecuteResult | None = None


@dataclasses.dataclass(frozen=True)
Expand All @@ -121,19 +123,26 @@ class BigQuerySentEvent(ExecutionRunning):
"""Query sent to BigQuery."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None
progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"progress_bar" isn't really state about the event, is it? Could you describe to me again what this is trying to solve?

Consider an alternative where the progress bar type is attached to the subscriber, instead, as that seems to be more appropriate. Or another alternative in which we wrap the events with some sort of "CellState" that gives an indicator of the current execution context, of which the progress_bar could be a property of that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Tim, thanks for the suggestions.

I looked into the first alternative (attaching the progress bar type to the subscriber), but it doesn't quite work for what we are trying to solve. The progress_bar option is thread-local and needs to be captured from the initiating thread at the moment the query starts. Since subscribers are typically long-lived (session-scoped), they cannot easily capture this per-query context. Furthermore, the callbacks from the BigQuery client library run in a background thread where the initiating thread's local storage is not accessible.

Therefore, I took your second alternative: I removed progress_bar from the event classes and instead wrapped the events in an EventEnvelope (acting as the execution context/'CellState' you suggested). This envelope is created in the callback (which captures the initiating thread's option via a closure) and carries the progress_bar setting along with the event to the subscribers.

This keeps the events clean of display preferences while solving the cross-thread propagation issue. Thanks a lot.


@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QuerySentEvent,
progress_bar: Literal["default", "auto", "notebook", "terminal"]
| None = _DEFAULT,
):
return cls(
query=event.query,
billing_project=event.billing_project,
location=event.location,
job_id=event.job_id,
request_id=event.request_id,
progress_bar=progress_bar,
)


Expand All @@ -142,39 +151,50 @@ class BigQueryRetryEvent(ExecutionRunning):
"""Query sent another time because the previous attempt failed."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None
progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT

@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QueryRetryEvent,
progress_bar: Literal["default", "auto", "notebook", "terminal"]
| None = _DEFAULT,
):
return cls(
query=event.query,
billing_project=event.billing_project,
location=event.location,
job_id=event.job_id,
request_id=event.request_id,
progress_bar=progress_bar,
)


@dataclasses.dataclass(frozen=True)
class BigQueryReceivedEvent(ExecutionRunning):
"""Query received and acknowledged by the BigQuery API."""

billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
statement_type: Optional[str] = None
state: Optional[str] = None
query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
statement_type: str | None = None
state: str | None = None
query_plan: list[google.cloud.bigquery.job.query.QueryPlanEntry] | None = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None
progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryReceivedEvent,
progress_bar: Literal["default", "auto", "notebook", "terminal"]
| None = _DEFAULT,
):
return cls(
billing_project=event.billing_project,
Expand All @@ -186,28 +206,33 @@ def from_bqclient(
created=event.created,
started=event.started,
ended=event.ended,
progress_bar=progress_bar,
)


@dataclasses.dataclass(frozen=True)
class BigQueryFinishedEvent(ExecutionRunning):
"""Query finished successfully."""

billing_project: Optional[str] = None
location: Optional[str] = None
query_id: Optional[str] = None
job_id: Optional[str] = None
destination: Optional[google.cloud.bigquery.table.TableReference] = None
total_rows: Optional[int] = None
total_bytes_processed: Optional[int] = None
slot_millis: Optional[int] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
query_id: str | None = None
job_id: str | None = None
destination: google.cloud.bigquery.table.TableReference | None = None
total_rows: int | None = None
total_bytes_processed: int | None = None
slot_millis: int | None = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None
progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryFinishedEvent,
progress_bar: Literal["default", "auto", "notebook", "terminal"]
| None = _DEFAULT,
):
return cls(
billing_project=event.billing_project,
Expand All @@ -221,6 +246,7 @@ def from_bqclient(
created=event.created,
started=event.started,
ended=event.ended,
progress_bar=progress_bar,
)


Expand Down
5 changes: 4 additions & 1 deletion packages/bigframes/bigframes/formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ def progress_callback(
# This will allow cleanup to continue.
return

progress_bar = bigframes._config.options.display.progress_bar
# Prioritize progress_bar set on the event, falling back to thread-local option.
progress_bar = getattr(event, "progress_bar", bigframes.core.events._DEFAULT)
if progress_bar == bigframes.core.events._DEFAULT:
progress_bar = bigframes._config.options.display.progress_bar

if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"
Expand Down
2 changes: 1 addition & 1 deletion packages/bigframes/bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,8 @@ def from_glob_path(
def _get_bqclient_and_project() -> Tuple[bigquery.Client, str]:
# Address circular imports in doctest due to bigframes/session/__init__.py
# containing a lot of logic and samples.
from bigframes.session import clients
import bigframes._config.auth
from bigframes.session import clients

credentials, project = bigframes._config.auth.resolve_credentials_and_project(
config.options.bigquery
Expand Down
27 changes: 16 additions & 11 deletions packages/bigframes/bigframes/session/_io/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,23 @@ def add_and_trim_labels(job_config, session=None):


def create_bq_event_callback(publisher):
def publish_bq_event(event):
if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent):
bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event)
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent):
bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event)
elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent):
bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event)
elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent):
bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event)
else:
bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
import bigframes._config

progress_bar = bigframes._config.options.display.progress_bar

event_map = {
google.cloud.bigquery._job_helpers.QueryFinishedEvent: bigframes.core.events.BigQueryFinishedEvent,
google.cloud.bigquery._job_helpers.QueryReceivedEvent: bigframes.core.events.BigQueryReceivedEvent,
google.cloud.bigquery._job_helpers.QueryRetryEvent: bigframes.core.events.BigQueryRetryEvent,
google.cloud.bigquery._job_helpers.QuerySentEvent: bigframes.core.events.BigQuerySentEvent,
}

def publish_bq_event(event):
bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
for bq_type, bf_type in event_map.items():
if isinstance(event, bq_type):
bf_event = bf_type.from_bqclient(event, progress_bar=progress_bar) # type: ignore
break
publisher.publish(bf_event)

return publish_bq_event
Expand Down
17 changes: 17 additions & 0 deletions packages/bigframes/tests/system/small/test_progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def test_progress_bar_load_jobs(
assert_loading_msg_exist(capsys.readouterr().out, pattern="Load")


def test_progress_bar_uniqueness_check(session: bf.Session, capsys):
# Ensure strictly_ordered is True (default) to trigger uniqueness check
assert session._strictly_ordered

capsys.readouterr() # clear output

with bf.option_context("display.progress_bar", "terminal"):
# Read a table and specify a non-unique index_col to trigger the check.
# We use a public table to make it a "real" test.
session.read_gbq_table(
"bigquery-public-data.ml_datasets.penguins",
index_col="island",
)

assert_loading_msg_exist(capsys.readouterr().out)


def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex):
num_loading_msg = 0
lines = capstdout.split("\n")
Expand Down
25 changes: 25 additions & 0 deletions packages/bigframes/tests/unit/test_formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,28 @@ def test_get_job_url():
job_id=job_id, location=location, project_id=project_id
)
assert actual_url == expected_url


def test_progress_callback_respects_event_progress_bar():
event = bfevents.BigQuerySentEvent(
query="SELECT * FROM my_table",
progress_bar=None,
)

with mock.patch("bigframes._config.options.display.progress_bar", "terminal"):
with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False):
with mock.patch("builtins.print") as mock_print:
formatting_helpers.progress_callback(event)
mock_print.assert_not_called()


def test_progress_callback_falls_back_to_global():
event = bfevents.BigQuerySentEvent(
query="SELECT * FROM my_table",
)

with mock.patch("bigframes._config.options.display.progress_bar", "terminal"):
with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False):
with mock.patch("builtins.print") as mock_print:
formatting_helpers.progress_callback(event)
mock_print.assert_called_once()
Loading