Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 9c2513e

Browse files
committed
feat: Add bigframes.pandas.job_history() API to track BigQuery jobs
1 parent 318752a commit 9c2513e

File tree

6 files changed

+360
-25
lines changed

6 files changed

+360
-25
lines changed

bigframes/pandas/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,13 @@ def reset_session():
362362
reset_session.__doc__ = global_session.close_session.__doc__
363363

364364

365+
def job_history() -> pandas.DataFrame:
    # Delegate to the session-level implementation; with_default_session
    # creates the default session first if none exists yet.
    session_method = bigframes.session.Session.job_history
    return global_session.with_default_session(session_method)


# Mirror the session method's docstring onto this module-level alias.
job_history.__doc__ = inspect.getdoc(bigframes.session.Session.job_history)
370+
371+
365372
# SQL Compilation uses recursive algorithms on deep trees
366373
# 10M tree depth should be sufficient to generate any sql that is under bigquery limit
367374
# Note: This limit does not have the desired effect on Python 3.12 in
@@ -385,6 +392,7 @@ def reset_session():
385392
deploy_remote_function,
386393
deploy_udf,
387394
get_default_session_id,
395+
job_history,
388396
get_dummies,
389397
merge,
390398
qcut,
@@ -419,6 +427,7 @@ def reset_session():
419427
"deploy_remote_function",
420428
"deploy_udf",
421429
"get_default_session_id",
430+
"job_history",
422431
"get_dummies",
423432
"merge",
424433
"qcut",

bigframes/session/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,10 @@ def slot_millis_sum(self):
366366
"""The sum of all slot time used by bigquery jobs in this session."""
367367
return self._metrics.slot_millis
368368

369+
def job_history(self) -> pandas.DataFrame:
    """Return a DataFrame of BigQuery jobs initiated by BigFrames in the current session.

    Each row is one recorded job; the columns are the fields of
    ``bigframes.session.metrics.JobMetadata``. An empty session yields an
    empty DataFrame.
    """
    import dataclasses  # local import keeps this change self-contained

    # dataclasses.asdict copies each job's field values instead of aliasing
    # its __dict__ into the DataFrame, so callers mutating the result cannot
    # corrupt the session's metrics, and this keeps working if JobMetadata
    # ever becomes a slots dataclass (which has no per-instance __dict__).
    return pandas.DataFrame([dataclasses.asdict(job) for job in self._metrics.jobs])
372+
369373
@property
370374
def _allows_ambiguity(self) -> bool:
371375
return self._allow_ambiguity

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def check_if_index_columns_are_unique(
326326
index_cols: List[str],
327327
*,
328328
publisher: bigframes.core.events.Publisher,
329+
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
329330
) -> Tuple[str, ...]:
330331
import bigframes.core.sql
331332
import bigframes.session._io.bigquery
@@ -341,7 +342,7 @@ def check_if_index_columns_are_unique(
341342
timeout=None,
342343
location=None,
343344
project=None,
344-
metrics=None,
345+
metrics=metrics,
345346
query_with_job=False,
346347
publisher=publisher,
347348
)

bigframes/session/loader.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
562562
else:
563563
job.result()
564564

565+
if self._metrics is not None and isinstance(job, google.cloud.bigquery.job.Job):
566+
self._metrics.count_job_stats(query_job=job)
567+
565568
@overload
566569
def read_gbq_table( # type: ignore[overload-overlap]
567570
self,
@@ -878,6 +881,7 @@ def read_gbq_table(
878881
table=table,
879882
index_cols=index_cols,
880883
publisher=self._publisher,
884+
metrics=self._metrics,
881885
)
882886
if publish_execution:
883887
self._publisher.publish(

bigframes/session/metrics.py

Lines changed: 146 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,54 @@
1515
from __future__ import annotations
1616

1717
import dataclasses
18+
import datetime
1819
import os
19-
from typing import Optional, Tuple
20+
from typing import Any, Mapping, Optional, Tuple
2021

2122
import google.cloud.bigquery as bigquery
22-
import google.cloud.bigquery.job as bq_job
2323
import google.cloud.bigquery.table as bq_table
2424

2525
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2626

2727

28+
@dataclasses.dataclass
class JobMetadata:
    """A snapshot of one BigQuery job executed during a BigFrames session.

    Every field defaults to ``None`` because the metadata available differs
    by job type (query vs. load) and by whether the job ran through the jobs
    API or the query-results (jobless) API.
    """

    # Identity / routing.
    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    # Timing.
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    # Outcome and resource usage.
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    # Console link for this job, when a job_id exists.
    job_url: Optional[str] = None
    # Populated for query jobs.
    query: Optional[str] = None
    destination_table: Optional[str] = None
    # Populated for load jobs.
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None
52+
53+
2854
@dataclasses.dataclass
2955
class ExecutionMetrics:
3056
execution_count: int = 0
3157
slot_millis: int = 0
3258
bytes_processed: int = 0
3359
execution_secs: float = 0
3460
query_char_count: int = 0
61+
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
3562

3663
def count_job_stats(
3764
self,
38-
query_job: Optional[bq_job.QueryJob] = None,
65+
query_job: Optional[bigquery.job.Job] = None,
3966
row_iterator: Optional[bq_table.RowIterator] = None,
4067
):
4168
if query_job is None:
@@ -57,42 +84,137 @@ def count_job_stats(
5784
self.slot_millis += slot_millis
5885
self.execution_secs += exec_seconds
5986

87+
self.jobs.append(
88+
JobMetadata(
89+
job_id=getattr(row_iterator, "job_id", None),
90+
query_id=getattr(row_iterator, "query_id", None),
91+
location=getattr(row_iterator, "location", None),
92+
project=getattr(row_iterator, "project", None),
93+
creation_time=created,
94+
start_time=getattr(row_iterator, "started", None),
95+
end_time=ended,
96+
duration_seconds=exec_seconds,
97+
status="DONE",
98+
total_bytes_processed=bytes_processed,
99+
total_slot_ms=slot_millis,
100+
job_type="query",
101+
cached=getattr(row_iterator, "cache_hit", None),
102+
query=getattr(row_iterator, "query", None),
103+
job_url=f"https://console.cloud.google.com/bigquery?project={getattr(row_iterator, 'project', '')}&j=bq:{getattr(row_iterator, 'location', '')}:{getattr(row_iterator, 'job_id', '')}&page=queryresults"
104+
if getattr(row_iterator, "job_id", None)
105+
else None,
106+
)
107+
)
108+
60109
elif query_job.configuration.dry_run:
61-
query_char_count = len(query_job.query)
110+
query_char_count = len(getattr(query_job, "query", ""))
62111

63112
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64113
bytes_processed = 0
65114
slot_millis = 0
66115
exec_seconds = 0.0
67116

68-
elif (stats := get_performance_stats(query_job)) is not None:
69-
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
117+
elif isinstance(query_job, bigquery.QueryJob):
118+
if (stats := get_performance_stats(query_job)) is not None:
119+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
120+
self.execution_count += 1
121+
self.query_char_count += query_char_count or 0
122+
self.bytes_processed += bytes_processed or 0
123+
self.slot_millis += slot_millis or 0
124+
self.execution_secs += exec_seconds or 0
125+
126+
self.jobs.append(
127+
JobMetadata(
128+
job_id=query_job.job_id,
129+
location=query_job.location,
130+
project=query_job.project,
131+
creation_time=query_job.created,
132+
start_time=query_job.started,
133+
end_time=query_job.ended,
134+
duration_seconds=exec_seconds,
135+
status=query_job.state,
136+
total_bytes_processed=bytes_processed,
137+
total_slot_ms=slot_millis,
138+
job_type=query_job.job_type,
139+
error_result=query_job.error_result,
140+
cached=query_job.cache_hit,
141+
query=query_job.query,
142+
destination_table=str(query_job.destination)
143+
if query_job.destination
144+
else None,
145+
job_url=f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults",
146+
)
147+
)
148+
149+
else:
150+
# Handle other job types (e.g. LoadJob)
70151
self.execution_count += 1
71-
self.query_char_count += query_char_count or 0
72-
self.bytes_processed += bytes_processed or 0
73-
self.slot_millis += slot_millis or 0
74-
self.execution_secs += exec_seconds or 0
152+
duration = (
153+
(query_job.ended - query_job.created).total_seconds()
154+
if query_job.ended and query_job.created
155+
else None
156+
)
157+
158+
job_metadata = JobMetadata(
159+
job_id=query_job.job_id,
160+
location=query_job.location,
161+
project=query_job.project,
162+
creation_time=query_job.created,
163+
start_time=query_job.started,
164+
end_time=query_job.ended,
165+
duration_seconds=duration,
166+
status=query_job.state,
167+
job_type=query_job.job_type,
168+
error_result=query_job.error_result,
169+
job_url=f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults",
170+
)
171+
172+
if isinstance(query_job, bigquery.LoadJob):
173+
job_metadata.output_rows = getattr(query_job, "output_rows", None)
174+
job_metadata.input_files = getattr(query_job, "input_files", None)
175+
job_metadata.input_bytes = getattr(query_job, "input_bytes", None)
176+
job_metadata.destination_table = (
177+
str(query_job.destination) if query_job.destination else None
178+
)
179+
if query_job.source_uris:
180+
job_metadata.source_uris = list(query_job.source_uris)
181+
if query_job.configuration and hasattr(
182+
query_job.configuration, "source_format"
183+
):
184+
job_metadata.source_format = query_job.configuration.source_format
185+
186+
self.jobs.append(job_metadata)
187+
188+
# For pytest runs only, log information about the query job
189+
# to a file in order to create a performance report.
190+
if (
191+
isinstance(query_job, bigquery.QueryJob)
192+
and not query_job.configuration.dry_run
193+
):
194+
stats = get_performance_stats(query_job)
195+
if stats:
196+
write_stats_to_disk(
197+
query_char_count=stats[0],
198+
bytes_processed=stats[1],
199+
slot_millis=stats[2],
200+
exec_seconds=stats[3],
201+
)
202+
elif row_iterator is not None:
203+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
204+
query_char_count = len(getattr(row_iterator, "query", "") or "")
205+
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
206+
created = getattr(row_iterator, "created", None)
207+
ended = getattr(row_iterator, "ended", None)
208+
exec_seconds = (
209+
(ended - created).total_seconds() if created and ended else 0.0
210+
)
75211
write_stats_to_disk(
76212
query_char_count=query_char_count,
77213
bytes_processed=bytes_processed,
78214
slot_millis=slot_millis,
79215
exec_seconds=exec_seconds,
80216
)
81217

82-
else:
83-
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84-
bytes_processed = 0
85-
query_char_count = 0
86-
slot_millis = 0
87-
exec_seconds = 0
88-
89-
write_stats_to_disk(
90-
query_char_count=query_char_count,
91-
bytes_processed=bytes_processed,
92-
slot_millis=slot_millis,
93-
exec_seconds=exec_seconds,
94-
)
95-
96218

97219
def get_performance_stats(
98220
query_job: bigquery.QueryJob,

0 commit comments

Comments
 (0)