1515from __future__ import annotations
1616
1717import dataclasses
18+ import datetime
1819import os
19- from typing import Optional , Tuple
20+ from typing import Any , Mapping , Optional , Tuple , Union
2021
2122import google .cloud .bigquery as bigquery
22- import google .cloud .bigquery .job as bq_job
23+ from google .cloud .bigquery .job .load import LoadJob
24+ from google .cloud .bigquery .job .query import QueryJob
2325import google .cloud .bigquery .table as bq_table
2426
2527LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2628
2729
@dataclasses.dataclass
class JobMetadata:
    """Point-in-time metadata captured from a finished BigQuery job.

    Populated either from a ``QueryJob``/``LoadJob`` object
    (:meth:`from_job`) or from a ``RowIterator`` returned by a jobless
    query (:meth:`from_row_iterator`), so that job statistics can be
    logged and reported uniformly.
    """

    # Maximum characters of SQL text retained on the record; longer
    # queries are truncated and marked with a trailing "...".
    _MAX_QUERY_CHARS = 1024

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    @staticmethod
    def _truncate_query(query_text: Optional[str]) -> Optional[str]:
        """Cap SQL text at ``_MAX_QUERY_CHARS`` total, ending in "..." if cut."""
        limit = JobMetadata._MAX_QUERY_CHARS
        if query_text and len(query_text) > limit:
            # Reserve 3 characters for the ellipsis marker.
            return query_text[: limit - 3] + "..."
        return query_text

    @staticmethod
    def _format_job_url(project, location, job_id) -> Optional[str]:
        """Build a Cloud Console link for the job, or None without a job id."""
        if not job_id:
            return None
        return (
            "https://console.cloud.google.com/bigquery"
            f"?project={project}&j=bq:{location}:{job_id}&page=queryresults"
        )

    @classmethod
    def from_job(
        cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Summarize a query or load job.

        Args:
            query_job: The BigQuery job to summarize.
            exec_seconds: Wall-clock duration measured by the caller, if any.

        Returns:
            A populated ``JobMetadata`` record.
        """
        # Guarded fetch; reuse the same value for both the field and the URL.
        job_id = getattr(query_job, "job_id", None)

        metadata = cls(
            job_id=job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            query=cls._truncate_query(getattr(query_job, "query", None)),
            job_url=cls._format_job_url(
                getattr(query_job, "project", ""),
                getattr(query_job, "location", ""),
                job_id,
            ),
        )
        if isinstance(query_job, QueryJob):
            # Query-specific statistics: cache hit, destination, bytes, slots.
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            # Load-specific statistics: row/file/byte counts and source info.
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            metadata.destination_table = (
                str(query_job.destination)
                if getattr(query_job, "destination", None)
                else None
            )
            if getattr(query_job, "source_uris", None):
                metadata.source_uris = list(query_job.source_uris)
            if query_job.configuration and hasattr(
                query_job.configuration, "source_format"
            ):
                metadata.source_format = query_job.configuration.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Summarize a ``RowIterator`` result (e.g. from a jobless query).

        All attributes are read with ``getattr`` defaults because a
        ``RowIterator`` may lack job-level statistics entirely.

        Args:
            row_iterator: The result iterator to summarize.
            exec_seconds: Wall-clock duration measured by the caller, if any.

        Returns:
            A populated ``JobMetadata`` record with ``status`` fixed to
            "DONE" (a row iterator only exists once results are available).
        """
        job_id = getattr(row_iterator, "job_id", None)
        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=getattr(row_iterator, "location", None),
            project=getattr(row_iterator, "project", None),
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=cls._truncate_query(getattr(row_iterator, "query", None)),
            job_url=cls._format_job_url(
                getattr(row_iterator, "project", ""),
                getattr(row_iterator, "location", ""),
                job_id,
            ),
        )
141+
142+
28143@dataclasses .dataclass
29144class ExecutionMetrics :
30145 execution_count : int = 0
31146 slot_millis : int = 0
32147 bytes_processed : int = 0
33148 execution_secs : float = 0
34149 query_char_count : int = 0
150+ jobs : list [JobMetadata ] = dataclasses .field (default_factory = list )
35151
36152 def count_job_stats (
37153 self ,
38- query_job : Optional [bq_job . QueryJob ] = None ,
154+ query_job : Optional [Union [ QueryJob , LoadJob ] ] = None ,
39155 row_iterator : Optional [bq_table .RowIterator ] = None ,
40156 ):
41157 if query_job is None :
@@ -57,42 +173,71 @@ def count_job_stats(
57173 self .slot_millis += slot_millis
58174 self .execution_secs += exec_seconds
59175
60- elif query_job .configuration .dry_run :
61- query_char_count = len (query_job .query )
176+ self .jobs .append (
177+ JobMetadata .from_row_iterator (row_iterator , exec_seconds = exec_seconds )
178+ )
179+
180+ elif isinstance (query_job , QueryJob ) and query_job .configuration .dry_run :
181+ query_char_count = len (getattr (query_job , "query" , "" ))
62182
63183 # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64184 bytes_processed = 0
65185 slot_millis = 0
66186 exec_seconds = 0.0
67187
68- elif (stats := get_performance_stats (query_job )) is not None :
69- query_char_count , bytes_processed , slot_millis , exec_seconds = stats
188+ elif isinstance (query_job , bigquery .QueryJob ):
189+ if (stats := get_performance_stats (query_job )) is not None :
190+ query_char_count , bytes_processed , slot_millis , exec_seconds = stats
191+ self .execution_count += 1
192+ self .query_char_count += query_char_count or 0
193+ self .bytes_processed += bytes_processed or 0
194+ self .slot_millis += slot_millis or 0
195+ self .execution_secs += exec_seconds or 0
196+
197+ metadata = JobMetadata .from_job (query_job , exec_seconds = exec_seconds )
198+ metadata .total_bytes_processed = bytes_processed
199+ metadata .total_slot_ms = slot_millis
200+ self .jobs .append (metadata )
201+
202+ else :
70203 self .execution_count += 1
71- self .query_char_count += query_char_count or 0
72- self .bytes_processed += bytes_processed or 0
73- self .slot_millis += slot_millis or 0
74- self .execution_secs += exec_seconds or 0
204+ duration = (
205+ (query_job .ended - query_job .created ).total_seconds ()
206+ if query_job .ended and query_job .created
207+ else None
208+ )
209+ self .jobs .append (JobMetadata .from_job (query_job , exec_seconds = duration ))
210+
211+ # For pytest runs only, log information about the query job
212+ # to a file in order to create a performance report.
213+ if (
214+ isinstance (query_job , bigquery .QueryJob )
215+ and not query_job .configuration .dry_run
216+ ):
217+ stats = get_performance_stats (query_job )
218+ if stats :
219+ write_stats_to_disk (
220+ query_char_count = stats [0 ],
221+ bytes_processed = stats [1 ],
222+ slot_millis = stats [2 ],
223+ exec_seconds = stats [3 ],
224+ )
225+ elif row_iterator is not None :
226+ bytes_processed = getattr (row_iterator , "total_bytes_processed" , 0 ) or 0
227+ query_char_count = len (getattr (row_iterator , "query" , "" ) or "" )
228+ slot_millis = getattr (row_iterator , "slot_millis" , 0 ) or 0
229+ created = getattr (row_iterator , "created" , None )
230+ ended = getattr (row_iterator , "ended" , None )
231+ exec_seconds = (
232+ (ended - created ).total_seconds () if created and ended else 0.0
233+ )
75234 write_stats_to_disk (
76235 query_char_count = query_char_count ,
77236 bytes_processed = bytes_processed ,
78237 slot_millis = slot_millis ,
79238 exec_seconds = exec_seconds ,
80239 )
81240
82- else :
83- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84- bytes_processed = 0
85- query_char_count = 0
86- slot_millis = 0
87- exec_seconds = 0
88-
89- write_stats_to_disk (
90- query_char_count = query_char_count ,
91- bytes_processed = bytes_processed ,
92- slot_millis = slot_millis ,
93- exec_seconds = exec_seconds ,
94- )
95-
96241
97242def get_performance_stats (
98243 query_job : bigquery .QueryJob ,
0 commit comments