Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/bigframes/bigframes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
from bigframes.core.global_session import ( # noqa: E402
close_session,
execution_history,
get_global_session,
)
from bigframes.session import Session, connect # noqa: E402
Expand All @@ -69,6 +70,7 @@ def load_ipython_extension(ipython):
"BigQueryOptions",
"get_global_session",
"close_session",
"execution_history",
"enums",
"exceptions",
"connect",
Expand Down
10 changes: 10 additions & 0 deletions packages/bigframes/bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import bigframes.exceptions as bfe

if TYPE_CHECKING:
import pandas

import bigframes.session

_global_session: Optional[bigframes.session.Session] = None
Expand Down Expand Up @@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history() -> "pandas.DataFrame":
    """Return the execution history of the current global session.

    Delegates to :meth:`bigframes.session.Session.execution_history` on the
    (lazily created) default session.
    """
    import pandas  # noqa: F401

    import bigframes.session

    # Pass the unbound method; with_default_session supplies the session as `self`.
    session_method = bigframes.session.Session.execution_history
    return with_default_session(session_method)


class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
Expand Down
69 changes: 58 additions & 11 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@
logger = logging.getLogger(__name__)


class _ExecutionHistory(pandas.DataFrame):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need execution history to be a dataframe itself? this comes with a lot of baggage

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point! I've refactored _ExecutionHistory to use composition instead of inheritance. It is now a standard class that wraps a DataFrame internally, avoiding the baggage of subclassing. Users can call .to_dataframe() on it to get the DataFrame representation.

@property
def _constructor(self):
return _ExecutionHistory

def _repr_html_(self) -> str | None:
import bigframes.formatting_helpers as formatter

if self.empty:
return "<div>No executions found.</div>"

cols = ["job_id", "status", "total_bytes_processed", "job_url"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this generalize to other execution tasks beyond specifically bq jobs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added job_type to the HTML display to make it clearer when a job is Polars vs BigQuery. I also made the column selection safe against missing columns, so it can adapt to different execution engines that might provide different sets of metadata.


def format_url(url):
return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""

try:
df_display = self[cols].copy()
df_display["total_bytes_processed"] = df_display[
"total_bytes_processed"
].apply(formatter.get_formatted_bytes)
df_display["job_url"] = df_display["job_url"].apply(format_url)

# Rename job_id to query_id to match user expectations
df_display = df_display.rename(columns={"job_id": "query_id"})

return df_display.to_html(escape=False, index=False)
except Exception:
return super()._repr_html_() # type: ignore


@log_adapter.class_logger
class Session(
third_party_pandas_gbq.GBQIOMixin,
Expand Down Expand Up @@ -233,6 +264,7 @@ def __init__(
)

self._metrics = metrics.ExecutionMetrics()
self._publisher.subscribe(self._metrics.on_event)
self._function_session = bff_session.FunctionSession()
self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager(
self._clients_provider.bqclient,
Expand Down Expand Up @@ -371,6 +403,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(self) -> pandas.DataFrame:
    """Return a DataFrame of the BigQuery executions initiated by BigFrames in this session.

    One row per recorded job, with columns taken from each job record's
    attributes (``job.__dict__``), e.g. job id, status, bytes processed.
    """
    return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs])

@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
Expand Down Expand Up @@ -432,7 +468,8 @@ def read_gbq( # type: ignore[overload-overlap]
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq(
Expand All @@ -448,7 +485,8 @@ def read_gbq(
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq(
self,
Expand Down Expand Up @@ -520,7 +558,8 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def _read_gbq_colab(
Expand All @@ -529,7 +568,8 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

@log_adapter.log_name_override("read_gbq_colab")
def _read_gbq_colab(
Expand Down Expand Up @@ -590,7 +630,8 @@ def read_gbq_query( # type: ignore[overload-overlap]
filters: third_party_pandas_gbq.FiltersType = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq_query(
Expand All @@ -606,7 +647,8 @@ def read_gbq_query(
filters: third_party_pandas_gbq.FiltersType = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq_query(
self,
Expand Down Expand Up @@ -753,7 +795,8 @@ def read_gbq_table( # type: ignore[overload-overlap]
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq_table(
Expand All @@ -767,7 +810,8 @@ def read_gbq_table(
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq_table(
self,
Expand Down Expand Up @@ -918,23 +962,26 @@ def read_pandas(
pandas_dataframe: pandas.Index,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.core.indexes.Index: ...
) -> bigframes.core.indexes.Index:
...

@typing.overload
def read_pandas(
self,
pandas_dataframe: pandas.Series,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.series.Series: ...
) -> bigframes.series.Series:
...

@typing.overload
def read_pandas(
self,
pandas_dataframe: pandas.DataFrame,
*,
write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

def read_pandas(
self,
Expand Down
5 changes: 5 additions & 0 deletions packages/bigframes/bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import pandas
import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery_storage_v1 import types as bq_storage_types

import bigframes._tools
Expand Down Expand Up @@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()

if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
self._metrics.count_job_stats(query_job=job)

@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
Expand Down
Loading
Loading