This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
fix: show progress even in job optional queries #2119
Merged
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
1eccb3a
fix: show progress even in job optional queries
tswast d03e5d1
first attempt at publisher
tswast e6c3ba9
Merge remote-tracking branch 'origin/main' into b409390651-progress-bar
tswast 70d8324
report execution started/stopped in read_gbq_query
tswast 5b4b250
render bigquery sent events
tswast 2370ea2
Feat render more events (#2121)
tswast 4d9f37a
fix job links
tswast fc1e630
fix system tests
tswast d1a7f70
fix mypy
tswast 253de65
fix unit tests
tswast 5fec058
support more event types
tswast 91506c3
Merge remote-tracking branch 'origin/main' into b409390651-progress-bar
tswast 0008e99
move publisher to session
tswast 1cf0dfd
fix remaining mypy errors
tswast a6600f8
update text
tswast 51e0ca6
Merge remote-tracking branch 'origin/main' into b409390651-progress-bar
tswast b35015e
add explicit unsubscribe
tswast 55cc2f7
Merge remote-tracking branch 'origin/main' into b409390651-progress-bar
tswast e7ca461
fix presubmits
tswast 0d0ad68
Merge remote-tracking branch 'origin/main' into b409390651-progress-bar
tswast 7edbb0a
add lock for publisher and publish temp table creations
tswast File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,237 @@ | ||
| # Copyright 2025 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
| import datetime | ||
| import threading | ||
| from typing import Any, Callable, Optional, Set | ||
| import uuid | ||
|
|
||
| import google.cloud.bigquery._job_helpers | ||
| import google.cloud.bigquery.job.query | ||
| import google.cloud.bigquery.table | ||
|
|
||
| import bigframes.session.executor | ||
|
|
||
|
|
||
| class Subscriber: | ||
| def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): | ||
| self._publisher = publisher | ||
| self._callback = callback | ||
| self._subscriber_id = uuid.uuid4() | ||
|
|
||
| def __call__(self, *args, **kwargs): | ||
| return self._callback(*args, **kwargs) | ||
|
|
||
| def __hash__(self) -> int: | ||
| return hash(self._subscriber_id) | ||
|
|
||
| def __eq__(self, value: object): | ||
| if not isinstance(value, Subscriber): | ||
| return NotImplemented | ||
| return value._subscriber_id == self._subscriber_id | ||
|
|
||
| def close(self): | ||
| self._publisher.unsubscribe(self) | ||
| del self._publisher | ||
| del self._callback | ||
|
|
||
| def __enter__(self): | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc_value, traceback): | ||
| if exc_value is not None: | ||
| self( | ||
| UnknownErrorEvent( | ||
| exc_type=exc_type, | ||
| exc_value=exc_value, | ||
| traceback=traceback, | ||
| ) | ||
| ) | ||
| self.close() | ||
|
|
||
|
|
||
| class Publisher: | ||
| def __init__(self): | ||
| self._subscribers_lock = threading.Lock() | ||
| self._subscribers: Set[Subscriber] = set() | ||
|
|
||
| def subscribe(self, callback: Callable[[Event], None]) -> Subscriber: | ||
| # TODO(b/448176657): figure out how to handle subscribers/publishers in | ||
| # a background thread. Maybe subscribers should be thread-local? | ||
| subscriber = Subscriber(callback, publisher=self) | ||
| with self._subscribers_lock: | ||
| self._subscribers.add(subscriber) | ||
| return subscriber | ||
|
|
||
| def unsubscribe(self, subscriber: Subscriber): | ||
| with self._subscribers_lock: | ||
| self._subscribers.remove(subscriber) | ||
|
|
||
| def publish(self, event: Event): | ||
| with self._subscribers_lock: | ||
| for subscriber in self._subscribers: | ||
| subscriber(event) | ||
|
|
||
|
|
||
| class Event: | ||
| pass | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class SessionClosed(Event): | ||
| session_id: str | ||
|
|
||
|
|
||
| class ExecutionStarted(Event): | ||
| pass | ||
|
Comment on lines
+99
to
+100
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have an execution_id or similar so we can correlate all the events tied to a single request?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That could help if we start doing async / background query execution. I don't think it's needed right now, though. |
||
|
|
||
|
|
||
| class ExecutionRunning(Event): | ||
| pass | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class ExecutionFinished(Event): | ||
| result: Optional[bigframes.session.executor.ExecuteResult] = None | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class UnknownErrorEvent(Event): | ||
| exc_type: Any | ||
| exc_value: Any | ||
| traceback: Any | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigQuerySentEvent(ExecutionRunning): | ||
| """Query sent to BigQuery.""" | ||
|
|
||
| query: str | ||
| billing_project: Optional[str] = None | ||
| location: Optional[str] = None | ||
| job_id: Optional[str] = None | ||
| request_id: Optional[str] = None | ||
|
|
||
| @classmethod | ||
| def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent): | ||
| return cls( | ||
| query=event.query, | ||
| billing_project=event.billing_project, | ||
| location=event.location, | ||
| job_id=event.job_id, | ||
| request_id=event.request_id, | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigQueryRetryEvent(ExecutionRunning): | ||
| """Query sent another time because the previous attempt failed.""" | ||
|
|
||
| query: str | ||
| billing_project: Optional[str] = None | ||
| location: Optional[str] = None | ||
| job_id: Optional[str] = None | ||
| request_id: Optional[str] = None | ||
|
|
||
| @classmethod | ||
| def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent): | ||
| return cls( | ||
| query=event.query, | ||
| billing_project=event.billing_project, | ||
| location=event.location, | ||
| job_id=event.job_id, | ||
| request_id=event.request_id, | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigQueryReceivedEvent(ExecutionRunning): | ||
| """Query received and acknowledged by the BigQuery API.""" | ||
|
|
||
| billing_project: Optional[str] = None | ||
| location: Optional[str] = None | ||
| job_id: Optional[str] = None | ||
| statement_type: Optional[str] = None | ||
| state: Optional[str] = None | ||
| query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None | ||
| created: Optional[datetime.datetime] = None | ||
| started: Optional[datetime.datetime] = None | ||
| ended: Optional[datetime.datetime] = None | ||
|
|
||
| @classmethod | ||
| def from_bqclient( | ||
| cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent | ||
| ): | ||
| return cls( | ||
| billing_project=event.billing_project, | ||
| location=event.location, | ||
| job_id=event.job_id, | ||
| statement_type=event.statement_type, | ||
| state=event.state, | ||
| query_plan=event.query_plan, | ||
| created=event.created, | ||
| started=event.started, | ||
| ended=event.ended, | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigQueryFinishedEvent(ExecutionRunning): | ||
| """Query finished successfully.""" | ||
|
|
||
| billing_project: Optional[str] = None | ||
| location: Optional[str] = None | ||
| query_id: Optional[str] = None | ||
| job_id: Optional[str] = None | ||
| destination: Optional[google.cloud.bigquery.table.TableReference] = None | ||
| total_rows: Optional[int] = None | ||
| total_bytes_processed: Optional[int] = None | ||
| slot_millis: Optional[int] = None | ||
| created: Optional[datetime.datetime] = None | ||
| started: Optional[datetime.datetime] = None | ||
| ended: Optional[datetime.datetime] = None | ||
|
|
||
| @classmethod | ||
| def from_bqclient( | ||
| cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent | ||
| ): | ||
| return cls( | ||
| billing_project=event.billing_project, | ||
| location=event.location, | ||
| query_id=event.query_id, | ||
| job_id=event.job_id, | ||
| destination=event.destination, | ||
| total_rows=event.total_rows, | ||
| total_bytes_processed=event.total_bytes_processed, | ||
| slot_millis=event.slot_millis, | ||
| created=event.created, | ||
| started=event.started, | ||
| ended=event.ended, | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigQueryUnknownEvent(ExecutionRunning): | ||
| """Got unknown event from the BigQuery client library.""" | ||
|
|
||
| # TODO: should we just skip sending unknown events? | ||
|
|
||
| event: object | ||
|
|
||
| @classmethod | ||
| def from_bqclient(cls, event): | ||
| return cls(event) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another alternative to explicitly removing from subscriber list in a blocking way is to just flag oneself as closed, and the publisher can then remove at its convenience. I think this approach works fine though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wary of that because it would mean a circular reference would hang around until the next time an event is published, but I suppose that'd be OK.