Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 5c5bdde

Browse files
committed
feat: Move execution_history to base namespace and implement job tracking
1 parent be33279 commit 5c5bdde

File tree

6 files changed

+226
-25
lines changed

6 files changed

+226
-25
lines changed

bigframes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
2929
from bigframes.core.global_session import ( # noqa: E402
3030
close_session,
31+
execution_history,
3132
get_global_session,
3233
)
3334
import bigframes.enums as enums # noqa: E402
@@ -57,6 +58,7 @@ def load_ipython_extension(ipython):
5758
"BigQueryOptions",
5859
"get_global_session",
5960
"close_session",
61+
"execution_history",
6062
"enums",
6163
"exceptions",
6264
"connect",

bigframes/core/global_session.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import bigframes.exceptions as bfe
2727

2828
if TYPE_CHECKING:
29+
import pandas
30+
2931
import bigframes.session
3032

3133
_global_session: Optional[bigframes.session.Session] = None
@@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
124126
return func_(get_global_session(), *args, **kwargs)
125127

126128

129+
def execution_history() -> "pandas.DataFrame":
    """Return the BigQuery execution history of the global session.

    Delegates to ``Session.execution_history`` on the default (global)
    session, creating that session first if one does not exist yet.
    """
    # Imported lazily so that importing this module stays cheap.
    import pandas  # noqa: F401

    import bigframes.session

    session_method = bigframes.session.Session.execution_history
    return with_default_session(session_method)
135+
136+
127137
class _GlobalSessionContext:
128138
"""
129139
Context manager for testing that sets global session.

bigframes/session/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,39 @@
109109
logger = logging.getLogger(__name__)
110110

111111

112+
class _ExecutionHistory(pandas.DataFrame):
113+
@property
114+
def _constructor(self):
115+
return _ExecutionHistory
116+
117+
def _repr_html_(self) -> str | None:
118+
try:
119+
import bigframes.formatting_helpers as formatter
120+
121+
if self.empty:
122+
return "<div>No executions found.</div>"
123+
124+
cols = ["job_id", "status", "total_bytes_processed", "job_url"]
125+
df_display = self[cols].copy()
126+
df_display["total_bytes_processed"] = df_display[
127+
"total_bytes_processed"
128+
].apply(formatter.get_formatted_bytes)
129+
130+
def format_url(url):
131+
return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""
132+
133+
df_display["job_url"] = df_display["job_url"].apply(format_url)
134+
135+
# Rename job_id to query_id to match user expectations
136+
df_display = df_display.rename(columns={"job_id": "query_id"})
137+
138+
compact_html = df_display.to_html(escape=False, index=False)
139+
140+
return compact_html
141+
except Exception:
142+
return super()._repr_html_() # type: ignore
143+
144+
112145
@log_adapter.class_logger
113146
class Session(
114147
third_party_pandas_gbq.GBQIOMixin,
@@ -371,6 +404,10 @@ def slot_millis_sum(self):
371404
"""The sum of all slot time used by bigquery jobs in this session."""
372405
return self._metrics.slot_millis
373406

407+
def execution_history(self) -> pandas.DataFrame:
408+
"""Returns a list of underlying BigQuery executions initiated by BigFrames in the current session."""
409+
return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs])
410+
374411
@property
375412
def _allows_ambiguity(self) -> bool:
376413
return self._allow_ambiguity

bigframes/session/loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
from google.cloud import bigquery_storage_v1
5050
import google.cloud.bigquery
5151
import google.cloud.bigquery as bigquery
52+
from google.cloud.bigquery.job.load import LoadJob
53+
from google.cloud.bigquery.job.query import QueryJob
5254
import google.cloud.bigquery.table
5355
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
5456
import pandas
@@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
605607
else:
606608
job.result()
607609

610+
if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
611+
self._metrics.count_job_stats(query_job=job)
612+
608613
@overload
609614
def read_gbq_table( # type: ignore[overload-overlap]
610615
self,

bigframes/session/metrics.py

Lines changed: 170 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,143 @@
1515
from __future__ import annotations
1616

1717
import dataclasses
18+
import datetime
1819
import os
19-
from typing import Optional, Tuple
20+
from typing import Any, Mapping, Optional, Tuple, Union
2021

2122
import google.cloud.bigquery as bigquery
22-
import google.cloud.bigquery.job as bq_job
23+
from google.cloud.bigquery.job.load import LoadJob
24+
from google.cloud.bigquery.job.query import QueryJob
2325
import google.cloud.bigquery.table as bq_table
2426

2527
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2628

2729

30+
@dataclasses.dataclass
class JobMetadata:
    """A flat record of one BigQuery execution, for session history display.

    Field declaration order matters: these records are turned into
    DataFrame rows, so the order here fixes the column order.
    """

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    @staticmethod
    def _trim_query(query_text: Optional[str]) -> Optional[str]:
        """Cap stored query text at 1024 characters (1021 chars + ellipsis)."""
        if query_text and len(query_text) > 1024:
            return query_text[:1021] + "..."
        return query_text

    @staticmethod
    def _console_url(project, location, job_id) -> Optional[str]:
        """Build a BigQuery console link for a job, or None without a job id."""
        if not job_id:
            return None
        return (
            "https://console.cloud.google.com/bigquery"
            f"?project={project}&j=bq:{location}:{job_id}&page=queryresults"
        )

    @classmethod
    def from_job(
        cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Capture metadata common to QueryJob and LoadJob, plus per-type extras."""
        job_id = getattr(query_job, "job_id", None)
        metadata = cls(
            job_id=query_job.job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            query=cls._trim_query(getattr(query_job, "query", None)),
            job_url=cls._console_url(query_job.project, query_job.location, job_id),
        )

        if isinstance(query_job, QueryJob):
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            destination = getattr(query_job, "destination", None)
            metadata.destination_table = str(destination) if destination else None
            source_uris = getattr(query_job, "source_uris", None)
            if source_uris:
                metadata.source_uris = list(source_uris)
            config = query_job.configuration
            if config and hasattr(config, "source_format"):
                metadata.source_format = config.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Capture metadata from a RowIterator (e.g. a jobless / cached query).

        All attributes are read defensively with getattr because the set of
        attributes available on a RowIterator varies with how the query ran.
        """
        job_id = getattr(row_iterator, "job_id", None)
        job_url = None
        if job_id:
            job_url = cls._console_url(
                getattr(row_iterator, "project", ""),
                getattr(row_iterator, "location", ""),
                job_id,
            )

        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=getattr(row_iterator, "location", None),
            project=getattr(row_iterator, "project", None),
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            # A row iterator only exists once the query has completed.
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=cls._trim_query(getattr(row_iterator, "query", None)),
            job_url=job_url,
        )
141+
142+
28143
@dataclasses.dataclass
29144
class ExecutionMetrics:
30145
execution_count: int = 0
31146
slot_millis: int = 0
32147
bytes_processed: int = 0
33148
execution_secs: float = 0
34149
query_char_count: int = 0
150+
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
35151

36152
def count_job_stats(
37153
self,
38-
query_job: Optional[bq_job.QueryJob] = None,
154+
query_job: Optional[Union[QueryJob, LoadJob]] = None,
39155
row_iterator: Optional[bq_table.RowIterator] = None,
40156
):
41157
if query_job is None:
@@ -57,42 +173,71 @@ def count_job_stats(
57173
self.slot_millis += slot_millis
58174
self.execution_secs += exec_seconds
59175

60-
elif query_job.configuration.dry_run:
61-
query_char_count = len(query_job.query)
176+
self.jobs.append(
177+
JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
178+
)
179+
180+
elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
181+
query_char_count = len(getattr(query_job, "query", ""))
62182

63183
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64184
bytes_processed = 0
65185
slot_millis = 0
66186
exec_seconds = 0.0
67187

68-
elif (stats := get_performance_stats(query_job)) is not None:
69-
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
188+
elif isinstance(query_job, bigquery.QueryJob):
189+
if (stats := get_performance_stats(query_job)) is not None:
190+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
191+
self.execution_count += 1
192+
self.query_char_count += query_char_count or 0
193+
self.bytes_processed += bytes_processed or 0
194+
self.slot_millis += slot_millis or 0
195+
self.execution_secs += exec_seconds or 0
196+
197+
metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
198+
metadata.total_bytes_processed = bytes_processed
199+
metadata.total_slot_ms = slot_millis
200+
self.jobs.append(metadata)
201+
202+
else:
70203
self.execution_count += 1
71-
self.query_char_count += query_char_count or 0
72-
self.bytes_processed += bytes_processed or 0
73-
self.slot_millis += slot_millis or 0
74-
self.execution_secs += exec_seconds or 0
204+
duration = (
205+
(query_job.ended - query_job.created).total_seconds()
206+
if query_job.ended and query_job.created
207+
else None
208+
)
209+
self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
210+
211+
# For pytest runs only, log information about the query job
212+
# to a file in order to create a performance report.
213+
if (
214+
isinstance(query_job, bigquery.QueryJob)
215+
and not query_job.configuration.dry_run
216+
):
217+
stats = get_performance_stats(query_job)
218+
if stats:
219+
write_stats_to_disk(
220+
query_char_count=stats[0],
221+
bytes_processed=stats[1],
222+
slot_millis=stats[2],
223+
exec_seconds=stats[3],
224+
)
225+
elif row_iterator is not None:
226+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
227+
query_char_count = len(getattr(row_iterator, "query", "") or "")
228+
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
229+
created = getattr(row_iterator, "created", None)
230+
ended = getattr(row_iterator, "ended", None)
231+
exec_seconds = (
232+
(ended - created).total_seconds() if created and ended else 0.0
233+
)
75234
write_stats_to_disk(
76235
query_char_count=query_char_count,
77236
bytes_processed=bytes_processed,
78237
slot_millis=slot_millis,
79238
exec_seconds=exec_seconds,
80239
)
81240

82-
else:
83-
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84-
bytes_processed = 0
85-
query_char_count = 0
86-
slot_millis = 0
87-
exec_seconds = 0
88-
89-
write_stats_to_disk(
90-
query_char_count=query_char_count,
91-
bytes_processed=bytes_processed,
92-
slot_millis=slot_millis,
93-
exec_seconds=exec_seconds,
94-
)
95-
96241

97242
def get_performance_stats(
98243
query_job: bigquery.QueryJob,

tests/unit/test_pandas.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ def all_session_methods():
3737
session_attributes.remove("close")
3838
# streaming isn't in pandas
3939
session_attributes.remove("read_gbq_table_streaming")
40+
# execution_history is in base namespace, not pandas
41+
session_attributes.remove("execution_history")
4042

4143
for attribute in sorted(session_attributes):
4244
session_method = getattr(bigframes.session.Session, attribute)

0 commit comments

Comments (0)