diff --git a/packages/bigframes/bigframes/__init__.py b/packages/bigframes/bigframes/__init__.py index 29a27e4b6f90..7061300b5cc5 100644 --- a/packages/bigframes/bigframes/__init__.py +++ b/packages/bigframes/bigframes/__init__.py @@ -45,6 +45,7 @@ from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402 from bigframes.core.global_session import ( # noqa: E402 close_session, + execution_history, get_global_session, ) from bigframes.session import Session, connect # noqa: E402 @@ -69,6 +70,7 @@ def load_ipython_extension(ipython): "BigQueryOptions", "get_global_session", "close_session", + "execution_history", "enums", "exceptions", "connect", diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index ce3b16d041e3..6ffb37ac5acf 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -124,6 +124,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T: return func_(get_global_session(), *args, **kwargs) +def execution_history() -> "bigframes.session._ExecutionHistory": + import pandas # noqa: F401 + + import bigframes.session + + return with_default_session(bigframes.session.Session.execution_history) + + class _GlobalSessionContext: """ Context manager for testing that sets global session. diff --git a/packages/bigframes/bigframes/core/sql/__init__.py b/packages/bigframes/bigframes/core/sql/__init__.py index 051cb045223d..b28d59216950 100644 --- a/packages/bigframes/bigframes/core/sql/__init__.py +++ b/packages/bigframes/bigframes/core/sql/__init__.py @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations """ Utility functions for SQL construction. """ +from __future__ import annotations + import json from typing import ( TYPE_CHECKING, diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 3b14cbd711b9..afca1ed0554f 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -109,6 +109,46 @@ logger = logging.getLogger(__name__) +class _ExecutionHistory: + def __init__(self, jobs: list[dict]): + self._df = pandas.DataFrame(jobs) + + def to_dataframe(self) -> pandas.DataFrame: + """Returns the execution history as a pandas DataFrame.""" + return self._df + + def _repr_html_(self) -> str | None: + import bigframes.formatting_helpers as formatter + + if self._df.empty: + return "
No executions found.
" + + cols = ["job_type", "job_id", "status", "total_bytes_processed", "job_url"] + + # Filter columns to only those that exist in the dataframe + available_cols = [c for c in cols if c in self._df.columns] + + def format_url(url): + return f'Open Job' if url else "" + + try: + df_display = self._df[available_cols].copy() + if "total_bytes_processed" in df_display.columns: + df_display["total_bytes_processed"] = df_display[ + "total_bytes_processed" + ].apply(formatter.get_formatted_bytes) + if "job_url" in df_display.columns: + df_display["job_url"] = df_display["job_url"].apply(format_url) + + # Rename job_id to query_id to match user expectations + if "job_id" in df_display.columns: + df_display = df_display.rename(columns={"job_id": "query_id"}) + + return df_display.to_html(escape=False, index=False) + except Exception: + return self._df.to_html() + + @log_adapter.class_logger class Session( third_party_pandas_gbq.GBQIOMixin, @@ -233,6 +273,7 @@ def __init__( ) self._metrics = metrics.ExecutionMetrics() + self._publisher.subscribe(self._metrics.on_event) self._function_session = bff_session.FunctionSession() self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager( self._clients_provider.bqclient, @@ -371,6 +412,13 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._metrics.slot_millis + def execution_history(self) -> _ExecutionHistory: + """Returns the history of executions initiated by BigFrames in the current session. + + Use `.to_dataframe()` on the result to get a pandas DataFrame. + """ + return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs]) + @property def _allows_ambiguity(self) -> bool: return self._allow_ambiguity diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py index 960208063105..d3be3d09f235 100644 --- a/packages/bigframes/bigframes/session/loader.py +++ b/packages/bigframes/bigframes/session/loader.py @@ -52,6 +52,8 @@ import pandas import pyarrow as pa from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery.job.load import LoadJob +from google.cloud.bigquery.job.query import QueryJob from google.cloud.bigquery_storage_v1 import ( types as bq_storage_types, writer as bq_storage_writer, @@ -621,6 +623,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob): else: job.result() + if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)): + self._metrics.count_job_stats(query_job=job) + @overload def read_gbq_table( # type: ignore[overload-overlap] self, diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index 8d43a83d7309..d2682bbcaf7f 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -15,16 +15,131 @@ from __future__ import annotations import dataclasses +import datetime import os -from typing import Optional, Tuple +from typing import Any, Mapping, Optional, Tuple, Union import google.cloud.bigquery as bigquery -import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table +from google.cloud.bigquery.job.load import LoadJob +from google.cloud.bigquery.job.query import QueryJob LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME" +@dataclasses.dataclass +class JobMetadata: + job_id: Optional[str] = None + query_id: Optional[str] = None + location: Optional[str] = None + project: Optional[str] = None + creation_time: Optional[datetime.datetime] = None + start_time: Optional[datetime.datetime] = None + end_time: Optional[datetime.datetime] = None + duration_seconds: Optional[float] = None + status: Optional[str] = None + total_bytes_processed: Optional[int] = None + total_slot_ms: Optional[int] = None + job_type: Optional[str] = None + error_result: Optional[Mapping[str, Any]] = None + cached: Optional[bool] = None + job_url: Optional[str] = None + query: Optional[str] = None + destination_table: Optional[str] = None + source_uris: Optional[list[str]] = None + input_files: Optional[int] = None + input_bytes: Optional[int] = None + output_rows: Optional[int] = None + source_format: Optional[str] = None + + @classmethod + def from_job( + cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None + ) -> "JobMetadata": + query_text = getattr(query_job, "query", None) + if query_text and len(query_text) > 1024: + query_text = query_text[:1021] + "..." + + job_id = getattr(query_job, "job_id", None) + job_url = None + if job_id: + job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults" + + metadata = cls( + job_id=query_job.job_id, + location=query_job.location, + project=query_job.project, + creation_time=query_job.created, + start_time=query_job.started, + end_time=query_job.ended, + duration_seconds=exec_seconds, + status=query_job.state, + job_type=query_job.job_type, + error_result=query_job.error_result, + query=query_text, + job_url=job_url, + ) + if isinstance(query_job, QueryJob): + metadata.cached = getattr(query_job, "cache_hit", None) + metadata.destination_table = ( + str(query_job.destination) if query_job.destination else None + ) + metadata.total_bytes_processed = getattr( + query_job, "total_bytes_processed", None + ) + metadata.total_slot_ms = getattr(query_job, "slot_millis", None) + elif isinstance(query_job, LoadJob): + metadata.output_rows = getattr(query_job, "output_rows", None) + metadata.input_files = getattr(query_job, "input_files", None) + metadata.input_bytes = getattr(query_job, "input_bytes", None) + metadata.destination_table = ( + str(query_job.destination) + if getattr(query_job, "destination", None) + else None + ) + if getattr(query_job, "source_uris", None): + metadata.source_uris = list(query_job.source_uris) + if query_job.configuration and hasattr( + query_job.configuration, "source_format" + ): + metadata.source_format = query_job.configuration.source_format + + return metadata + + @classmethod + def from_row_iterator( + cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None + ) -> "JobMetadata": + query_text = getattr(row_iterator, "query", None) + if query_text and len(query_text) > 1024: + query_text = query_text[:1021] + "..." + + job_id = getattr(row_iterator, "job_id", None) + job_url = None + if job_id: + project = getattr(row_iterator, "project", "") + location = getattr(row_iterator, "location", "") + job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults" + + return cls( + job_id=job_id, + query_id=getattr(row_iterator, "query_id", None), + location=getattr(row_iterator, "location", None), + project=getattr(row_iterator, "project", None), + creation_time=getattr(row_iterator, "created", None), + start_time=getattr(row_iterator, "started", None), + end_time=getattr(row_iterator, "ended", None), + duration_seconds=exec_seconds, + status="DONE", + total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None), + total_slot_ms=getattr(row_iterator, "slot_millis", None), + job_type="query", + cached=getattr(row_iterator, "cache_hit", None), + query=query_text, + job_url=job_url, + ) + + @dataclasses.dataclass class ExecutionMetrics: execution_count: int = 0 @@ -32,10 +147,11 @@ class ExecutionMetrics: bytes_processed: int = 0 execution_secs: float = 0 query_char_count: int = 0 + jobs: list[JobMetadata] = dataclasses.field(default_factory=list) def count_job_stats( self, - query_job: Optional[bq_job.QueryJob] = None, + query_job: Optional[Union[QueryJob, LoadJob]] = None, row_iterator: Optional[bq_table.RowIterator] = None, ): if query_job is None: @@ -57,21 +173,62 @@ def count_job_stats( self.slot_millis += slot_millis self.execution_secs += exec_seconds - elif query_job.configuration.dry_run: - query_char_count = len(query_job.query) + self.jobs.append( + JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds) + ) + + elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run: + query_char_count = len(getattr(query_job, "query", "")) # TODO(tswast): Pass None after making benchmark publishing robust to missing data. bytes_processed = 0 slot_millis = 0 exec_seconds = 0.0 - elif (stats := get_performance_stats(query_job)) is not None: - query_char_count, bytes_processed, slot_millis, exec_seconds = stats + elif isinstance(query_job, bigquery.QueryJob): + if (stats := get_performance_stats(query_job)) is not None: + query_char_count, bytes_processed, slot_millis, exec_seconds = stats + self.execution_count += 1 + self.query_char_count += query_char_count or 0 + self.bytes_processed += bytes_processed or 0 + self.slot_millis += slot_millis or 0 + self.execution_secs += exec_seconds or 0 + + metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds) + self.jobs.append(metadata) + + else: self.execution_count += 1 - self.query_char_count += query_char_count or 0 - self.bytes_processed += bytes_processed or 0 - self.slot_millis += slot_millis or 0 - self.execution_secs += exec_seconds or 0 + duration = ( + (query_job.ended - query_job.created).total_seconds() + if query_job.ended and query_job.created + else None + ) + self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration)) + + # For pytest runs only, log information about the query job + # to a file in order to create a performance report. + if ( + isinstance(query_job, bigquery.QueryJob) + and not query_job.configuration.dry_run + ): + stats = get_performance_stats(query_job) + if stats: + write_stats_to_disk( + query_char_count=stats[0], + bytes_processed=stats[1], + slot_millis=stats[2], + exec_seconds=stats[3], + ) + elif row_iterator is not None: + bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0 + query_char_count = len(getattr(row_iterator, "query", "") or "") + slot_millis = getattr(row_iterator, "slot_millis", 0) or 0 + created = getattr(row_iterator, "created", None) + ended = getattr(row_iterator, "ended", None) + exec_seconds = ( + (ended - created).total_seconds() if created and ended else 0.0 + ) write_stats_to_disk( query_char_count=query_char_count, bytes_processed=bytes_processed, @@ -79,19 +236,25 @@ def count_job_stats( exec_seconds=exec_seconds, ) - else: - # TODO(tswast): Pass None after making benchmark publishing robust to missing data. - bytes_processed = 0 - query_char_count = 0 - slot_millis = 0 - exec_seconds = 0 + def on_event(self, event: Any): + try: + import bigframes.core.events + from bigframes.session.executor import LocalExecuteResult + except ImportError: + return - write_stats_to_disk( - query_char_count=query_char_count, - bytes_processed=bytes_processed, - slot_millis=slot_millis, - exec_seconds=exec_seconds, - ) + if isinstance(event, bigframes.core.events.ExecutionFinished): + if event.result and isinstance(event.result, LocalExecuteResult): + self.execution_count += 1 + bytes_processed = event.result.total_bytes_processed or 0 + self.bytes_processed += bytes_processed + + metadata = JobMetadata( + job_type="polars", + status="DONE", + total_bytes_processed=bytes_processed, + ) + self.jobs.append(metadata) def get_performance_stats( diff --git a/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py b/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py index 65f47fe4e31d..6ba9c760847a 100644 --- a/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py +++ b/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py @@ -89,11 +89,20 @@ def test_read_gbq_colab_fresh_session_is_hybrid(): assert len(result) == 100 assert session._executor._enable_polars_execution is True # type: ignore - assert executions_after == executions_before_python == 1 + assert executions_before_python == 1 + assert executions_after == 2 + history = session.execution_history().to_dataframe() + assert history.iloc[-1]["job_type"] == "polars" def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session): - executions_before_sql = maybe_ordered_session._metrics.execution_count + history_before = maybe_ordered_session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + df = maybe_ordered_session._read_gbq_colab( """ SELECT @@ -107,20 +116,36 @@ def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session): LIMIT 300 """ ) - executions_before_python = maybe_ordered_session._metrics.execution_count + + history_after_read = maybe_ordered_session.execution_history().to_dataframe() + queries_after_read = len( + history_after_read[history_after_read["job_type"] == "query"] + ) + result = df.peek(100) - executions_after = maybe_ordered_session._metrics.execution_count + + history_after_peek = maybe_ordered_session.execution_history().to_dataframe() + queries_after_peek = len( + history_after_peek[history_after_peek["job_type"] == "query"] + ) # Ok, this isn't guaranteed by peek, but should happen with read api based impl # if starts failing, maybe stopped using read api? assert result["total"].is_monotonic_decreasing assert len(result) == 100 - assert executions_after == executions_before_python == executions_before_sql + 1 + assert queries_after_read == queries_before + 1 + assert queries_after_peek == queries_after_read def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session): - executions_before_sql = maybe_ordered_session._metrics.execution_count + history_before = maybe_ordered_session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + df = maybe_ordered_session._read_gbq_colab( """ SELECT @@ -134,10 +159,21 @@ def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session): LIMIT 300 """ ) - executions_before_python = maybe_ordered_session._metrics.execution_count + + history_after_read = maybe_ordered_session.execution_history().to_dataframe() + queries_after_read = len( + history_after_read[history_after_read["job_type"] == "query"] + ) + _ = repr(df) - executions_after = maybe_ordered_session._metrics.execution_count - assert executions_after == executions_before_python == executions_before_sql + 1 + + history_after_repr = maybe_ordered_session.execution_history().to_dataframe() + queries_after_repr = len( + history_after_repr[history_after_repr["job_type"] == "query"] + ) + + assert queries_after_read == queries_before + 1 + assert queries_after_repr == queries_after_read def test_read_gbq_colab_includes_formatted_scalars(session): diff --git a/packages/bigframes/tests/system/small/test_dataframe.py b/packages/bigframes/tests/system/small/test_dataframe.py index 8df13a5bcbda..ce18c6456767 100644 --- a/packages/bigframes/tests/system/small/test_dataframe.py +++ b/packages/bigframes/tests/system/small/test_dataframe.py @@ -945,41 +945,55 @@ def test_join_repr(scalars_dfs_maybe_ordered): def test_repr_w_display_options(scalars_dfs, session): - metrics = session._metrics scalars_df, _ = scalars_dfs # get a pandas df of the expected format df, _ = scalars_df._block.to_pandas() pandas_df = df.set_axis(scalars_df._block.column_labels, axis=1) pandas_df.index.name = scalars_df.index.name - executions_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + with bigframes.option_context( "display.max_rows", 10, "display.max_columns", 5, "display.max_colwidth", 10 ): # When there are 10 or fewer rows, the outputs should be identical except for the extra note. actual = scalars_df.head(10).__repr__() - executions_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) with display_options.pandas_repr(bigframes.options.display): pandas_repr = pandas_df.head(10).__repr__() assert actual == pandas_repr - assert (executions_post - executions_pre) <= 3 + assert (queries_post - queries_pre) <= 2 def test_mimebundle_html_repr_w_all_rows(scalars_dfs, session): - metrics = session._metrics scalars_df, _ = scalars_dfs # get a pandas df of the expected format df, _ = scalars_df._block.to_pandas() pandas_df = df.set_axis(scalars_df._block.column_labels, axis=1) pandas_df.index.name = scalars_df.index.name - executions_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + # When there are 10 or fewer rows, the outputs should be identical except for the extra note. bundle = scalars_df.head(10)._repr_mimebundle_() actual = bundle["text/html"] - executions_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) with display_options.pandas_repr(bigframes.options.display): pandas_repr = pandas_df.head(10)._repr_html_() @@ -989,7 +1003,7 @@ def test_mimebundle_html_repr_w_all_rows(scalars_dfs, session): + f"[{len(pandas_df.index)} rows x {len(pandas_df.columns)} columns in total]" ) assert actual == expected - assert (executions_post - executions_pre) <= 3 + assert (queries_post - queries_pre) <= 2 def test_df_column_name_with_space(scalars_dfs): @@ -3094,18 +3108,23 @@ def test_binop_with_self_aggregate(scalars_dfs_maybe_ordered): df_columns = ["int64_col", "float64_col", "int64_too"] - # Ensure that this takes the optimized single-query path by counting executions - execution_count_before = scalars_df._session._metrics.execution_count + history_before = scalars_df._session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + bf_df = scalars_df[df_columns] bf_result = (bf_df - bf_df.mean()).to_pandas() - execution_count_after = scalars_df._session._metrics.execution_count + + history_after = scalars_df._session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) pd_df = scalars_pandas_df[df_columns] pd_result = pd_df - pd_df.mean() - executions = execution_count_after - execution_count_before - - assert executions == 1 + assert (queries_after - queries_before) == 1 assert_frame_equal(bf_result, pd_result, check_dtype=False) @@ -3114,18 +3133,23 @@ def test_binop_with_self_aggregate_w_index_reset(scalars_dfs_maybe_ordered): df_columns = ["int64_col", "float64_col", "int64_too"] - # Ensure that this takes the optimized single-query path by counting executions - execution_count_before = scalars_df._session._metrics.execution_count + history_before = scalars_df._session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + bf_df = scalars_df[df_columns].reset_index(drop=True) bf_result = (bf_df - bf_df.mean()).to_pandas() - execution_count_after = scalars_df._session._metrics.execution_count + + history_after = scalars_df._session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) pd_df = scalars_pandas_df[df_columns].reset_index(drop=True) pd_result = pd_df - pd_df.mean() - executions = execution_count_after - execution_count_before - - assert executions == 1 + assert (queries_after - queries_before) == 1 pd_result.index = pd_result.index.astype("Int64") assert_frame_equal(bf_result, pd_result, check_dtype=False, check_index_type=False) @@ -5948,16 +5972,22 @@ def test_dataframe_explode(col_names, ignore_index, session): "C": [["a", "b", "c"], np.nan, ["d", "e"]], } - metrics = session._metrics df = bpd.DataFrame(data, session=session) pd_df = df.to_pandas() pd_result = pd_df.explode(col_names, ignore_index=ignore_index) bf_result = df.explode(col_names, ignore_index=ignore_index) - # Check that to_pandas() results in at most a single query execution - execs_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + bf_materialized = bf_result.to_pandas() - execs_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) bigframes.testing.utils.assert_frame_equal( bf_materialized, @@ -5967,7 +5997,7 @@ def test_dataframe_explode(col_names, ignore_index, session): ) # we test this property on this method in particular as compilation # is non-deterministic and won't use the query cache as implemented - assert execs_post - execs_pre <= 1 + assert (queries_post - queries_pre) <= 1 @pytest.mark.parametrize( diff --git a/packages/bigframes/tests/system/small/test_polars_execution.py b/packages/bigframes/tests/system/small/test_polars_execution.py index 1b58dc9d12b1..fad8d9dba2f1 100644 --- a/packages/bigframes/tests/system/small/test_polars_execution.py +++ b/packages/bigframes/tests/system/small/test_polars_execution.py @@ -39,7 +39,7 @@ def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index): ] bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas() - assert session_w_polars._metrics.execution_count == execution_count_before + assert session_w_polars._metrics.execution_count == execution_count_before + 1 assert_frame_equal(bf_result, pd_result) @@ -56,7 +56,7 @@ def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_ind .to_pandas() ) - assert session_w_polars._metrics.execution_count == execution_count_before + assert session_w_polars._metrics.execution_count == execution_count_before + 1 assert_frame_equal(bf_result, pd_result) @@ -70,5 +70,28 @@ def test_polar_execution_unsupported_sql_fallback( bf_result = bf_df.to_pandas() # geo fns not supported by polar engine yet, so falls back to bq execution - assert session_w_polars._metrics.execution_count == (execution_count_before + 1) + assert session_w_polars._metrics.execution_count == (execution_count_before + 2) assert math.isclose(bf_result.geo_area.sum(), 70.52332050, rel_tol=0.00001) + + +def test_polars_execution_history(session_w_polars): + import pandas as pd + + # Create a small local DataFrame + pdf = pd.DataFrame({"col_a": [1, 2, 3], "col_b": ["x", "y", "z"]}) + + # Read simple local data + df = session_w_polars.read_pandas(pdf) + + # Trigger execution + _ = df.to_pandas() + + # Verify the execution history captured the local job + history = session_w_polars.execution_history().to_dataframe() + + # Verify we have at least one job and logged as polars + assert len(history) > 0 + last_job = history.iloc[-1] + + assert last_job["job_type"] == "polars" + assert last_job["status"] == "DONE" diff --git a/packages/bigframes/tests/system/small/test_series_io.py b/packages/bigframes/tests/system/small/test_series_io.py index 2f1780812ae8..83c2de70cae6 100644 --- a/packages/bigframes/tests/system/small/test_series_io.py +++ b/packages/bigframes/tests/system/small/test_series_io.py @@ -30,13 +30,23 @@ def test_to_pandas_override_global_option(scalars_df_index): assert table_id is not None session = bf_series._block.session - execution_count = session._metrics.execution_count + + history_before = session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) # When allow_large_results=False, a query_job object should not be created. # Therefore, the table_id should remain unchanged. bf_series.to_pandas(allow_large_results=False) assert bf_series._query_job.destination.table_id == table_id - assert session._metrics.execution_count - execution_count == 1 + + history_after = session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) + + assert (queries_after - queries_before) == 1 @pytest.mark.parametrize( diff --git a/packages/bigframes/tests/unit/session/test_clients.py b/packages/bigframes/tests/unit/session/test_clients.py index 0a9b3edffefa..0de6c75e01b0 100644 --- a/packages/bigframes/tests/unit/session/test_clients.py +++ b/packages/bigframes/tests/unit/session/test_clients.py @@ -182,12 +182,18 @@ def test_user_agent_not_in_vscode(monkeypatch): @mock.patch.dict(os.environ, {"VSCODE_PID": "12345"}, clear=True) def test_user_agent_in_vscode(monkeypatch): monkeypatch_client_constructors(monkeypatch) - provider = create_clients_provider() - assert_clients_w_user_agent(provider, "vscode") - assert_clients_wo_user_agent(provider, "googlecloudtools.cloudcode") - # We still need to include attribution to bigframes - assert_clients_w_user_agent(provider, f"bigframes/{bigframes.version.__version__}") + with tempfile.TemporaryDirectory() as tmpdir: + user_home = pathlib.Path(tmpdir) + with mock.patch("pathlib.Path.home", return_value=user_home): + provider = create_clients_provider() + assert_clients_w_user_agent(provider, "vscode") + assert_clients_wo_user_agent(provider, "googlecloudtools.cloudcode") + + # We still need to include attribution to bigframes + assert_clients_w_user_agent( + provider, f"bigframes/{bigframes.version.__version__}" + ) @mock.patch.dict(os.environ, {"VSCODE_PID": "12345"}, clear=True) diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py index 7c2f01c5b98f..296c6e96c5af 100644 --- a/packages/bigframes/tests/unit/session/test_metrics.py +++ b/packages/bigframes/tests/unit/session/test_metrics.py @@ -245,3 +245,21 @@ def test_write_stats_to_disk_no_env_var(tmp_path, monkeypatch): exec_seconds=1.23, ) assert len(list(tmp_path.iterdir())) == 0 + + +def test_on_event_with_local_execute_result(): + import bigframes.core.events + from bigframes.session.executor import LocalExecuteResult + + local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True) + local_result.total_bytes_processed = 1024 + + event = bigframes.core.events.ExecutionFinished(result=local_result) + execution_metrics = metrics.ExecutionMetrics() + execution_metrics.on_event(event) + + assert execution_metrics.execution_count == 1 + assert len(execution_metrics.jobs) == 1 + assert execution_metrics.jobs[0].job_type == "polars" + assert execution_metrics.jobs[0].status == "DONE" + assert execution_metrics.jobs[0].total_bytes_processed == 1024 diff --git a/packages/bigframes/tests/unit/test_pandas.py b/packages/bigframes/tests/unit/test_pandas.py index e1e713697db1..a79d7a059bb9 100644 --- a/packages/bigframes/tests/unit/test_pandas.py +++ b/packages/bigframes/tests/unit/test_pandas.py @@ -37,6 +37,8 @@ def all_session_methods(): session_attributes.remove("close") # streaming isn't in pandas session_attributes.remove("read_gbq_table_streaming") + # execution_history is in base namespace, not pandas + session_attributes.remove("execution_history") for attribute in sorted(session_attributes): session_method = getattr(bigframes.session.Session, attribute)