-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat: Add bigframes.execution_history API to track BigQuery jobs #16588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
eab6cdb
e5ec3e9
44e1b62
6055998
df8dbcd
30f0a2b
8c9deb8
a46ce69
c09b946
6f11279
60a19ae
d428370
39f4c2a
b669473
2398a67
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -109,6 +109,37 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class _ExecutionHistory(pandas.DataFrame): | ||
| @property | ||
| def _constructor(self): | ||
| return _ExecutionHistory | ||
|
|
||
| def _repr_html_(self) -> str | None: | ||
| import bigframes.formatting_helpers as formatter | ||
|
|
||
| if self.empty: | ||
| return "<div>No executions found.</div>" | ||
|
|
||
| cols = ["job_id", "status", "total_bytes_processed", "job_url"] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will this generalize to other execution tasks beyond specifically bq jobs?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added job_type to the HTML display to make it clearer when a job is Polars vs BigQuery. I also made the column selection safe against missing columns, so it can adapt to different execution engines that might provide different sets of metadata. |
||
|
|
||
| def format_url(url): | ||
| return f'<a target="_blank" href="{url}">Open Job</a>' if url else "" | ||
|
|
||
| try: | ||
| df_display = self[cols].copy() | ||
| df_display["total_bytes_processed"] = df_display[ | ||
| "total_bytes_processed" | ||
| ].apply(formatter.get_formatted_bytes) | ||
| df_display["job_url"] = df_display["job_url"].apply(format_url) | ||
|
|
||
| # Rename job_id to query_id to match user expectations | ||
| df_display = df_display.rename(columns={"job_id": "query_id"}) | ||
|
|
||
| return df_display.to_html(escape=False, index=False) | ||
| except Exception: | ||
| return super()._repr_html_() # type: ignore | ||
|
|
||
|
|
||
| @log_adapter.class_logger | ||
| class Session( | ||
| third_party_pandas_gbq.GBQIOMixin, | ||
|
|
@@ -233,6 +264,7 @@ def __init__( | |
| ) | ||
|
|
||
| self._metrics = metrics.ExecutionMetrics() | ||
| self._publisher.subscribe(self._metrics.on_event) | ||
| self._function_session = bff_session.FunctionSession() | ||
| self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager( | ||
| self._clients_provider.bqclient, | ||
|
|
@@ -371,6 +403,10 @@ def slot_millis_sum(self): | |
| """The sum of all slot time used by bigquery jobs in this session.""" | ||
| return self._metrics.slot_millis | ||
|
|
||
def execution_history(self) -> pandas.DataFrame:
    """Return the underlying BigQuery executions initiated by BigFrames in
    the current session, one row per job."""
    records = [vars(job) for job in self._metrics.jobs]
    return _ExecutionHistory(records)
|
|
||
| @property | ||
| def _allows_ambiguity(self) -> bool: | ||
| return self._allow_ambiguity | ||
|
|
@@ -432,7 +468,8 @@ def read_gbq( # type: ignore[overload-overlap] | |
| col_order: Iterable[str] = ..., | ||
| dry_run: Literal[False] = ..., | ||
| allow_large_results: Optional[bool] = ..., | ||
| ) -> dataframe.DataFrame: ... | ||
| ) -> dataframe.DataFrame: | ||
| ... | ||
|
|
||
| @overload | ||
| def read_gbq( | ||
|
|
@@ -448,7 +485,8 @@ def read_gbq( | |
| col_order: Iterable[str] = ..., | ||
| dry_run: Literal[True] = ..., | ||
| allow_large_results: Optional[bool] = ..., | ||
| ) -> pandas.Series: ... | ||
| ) -> pandas.Series: | ||
| ... | ||
|
|
||
| def read_gbq( | ||
| self, | ||
|
|
@@ -520,7 +558,8 @@ def _read_gbq_colab( | |
| *, | ||
| pyformat_args: Optional[Dict[str, Any]] = None, | ||
| dry_run: Literal[False] = ..., | ||
| ) -> dataframe.DataFrame: ... | ||
| ) -> dataframe.DataFrame: | ||
| ... | ||
|
|
||
| @overload | ||
| def _read_gbq_colab( | ||
|
|
@@ -529,7 +568,8 @@ def _read_gbq_colab( | |
| *, | ||
| pyformat_args: Optional[Dict[str, Any]] = None, | ||
| dry_run: Literal[True] = ..., | ||
| ) -> pandas.Series: ... | ||
| ) -> pandas.Series: | ||
| ... | ||
|
|
||
| @log_adapter.log_name_override("read_gbq_colab") | ||
| def _read_gbq_colab( | ||
|
|
@@ -590,7 +630,8 @@ def read_gbq_query( # type: ignore[overload-overlap] | |
| filters: third_party_pandas_gbq.FiltersType = ..., | ||
| dry_run: Literal[False] = ..., | ||
| allow_large_results: Optional[bool] = ..., | ||
| ) -> dataframe.DataFrame: ... | ||
| ) -> dataframe.DataFrame: | ||
| ... | ||
|
|
||
| @overload | ||
| def read_gbq_query( | ||
|
|
@@ -606,7 +647,8 @@ def read_gbq_query( | |
| filters: third_party_pandas_gbq.FiltersType = ..., | ||
| dry_run: Literal[True] = ..., | ||
| allow_large_results: Optional[bool] = ..., | ||
| ) -> pandas.Series: ... | ||
| ) -> pandas.Series: | ||
| ... | ||
|
|
||
| def read_gbq_query( | ||
| self, | ||
|
|
@@ -753,7 +795,8 @@ def read_gbq_table( # type: ignore[overload-overlap] | |
| use_cache: bool = ..., | ||
| col_order: Iterable[str] = ..., | ||
| dry_run: Literal[False] = ..., | ||
| ) -> dataframe.DataFrame: ... | ||
| ) -> dataframe.DataFrame: | ||
| ... | ||
|
|
||
| @overload | ||
| def read_gbq_table( | ||
|
|
@@ -767,7 +810,8 @@ def read_gbq_table( | |
| use_cache: bool = ..., | ||
| col_order: Iterable[str] = ..., | ||
| dry_run: Literal[True] = ..., | ||
| ) -> pandas.Series: ... | ||
| ) -> pandas.Series: | ||
| ... | ||
|
|
||
| def read_gbq_table( | ||
| self, | ||
|
|
@@ -918,23 +962,26 @@ def read_pandas( | |
| pandas_dataframe: pandas.Index, | ||
| *, | ||
| write_engine: constants.WriteEngineType = "default", | ||
| ) -> bigframes.core.indexes.Index: ... | ||
| ) -> bigframes.core.indexes.Index: | ||
| ... | ||
|
|
||
| @typing.overload | ||
| def read_pandas( | ||
| self, | ||
| pandas_dataframe: pandas.Series, | ||
| *, | ||
| write_engine: constants.WriteEngineType = "default", | ||
| ) -> bigframes.series.Series: ... | ||
| ) -> bigframes.series.Series: | ||
| ... | ||
|
|
||
| @typing.overload | ||
| def read_pandas( | ||
| self, | ||
| pandas_dataframe: pandas.DataFrame, | ||
| *, | ||
| write_engine: constants.WriteEngineType = "default", | ||
| ) -> dataframe.DataFrame: ... | ||
| ) -> dataframe.DataFrame: | ||
| ... | ||
|
|
||
| def read_pandas( | ||
| self, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the execution history to be a DataFrame itself? That comes with a lot of baggage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great point! I've refactored _ExecutionHistory to use composition instead of inheritance. It is now a standard class that wraps a DataFrame internally, avoiding the baggage of subclassing. Users can call .to_dataframe() on it to get the DataFrame representation.