@@ -52,6 +52,85 @@ class JobMetadata:
5252 output_rows : Optional [int ] = None
5353 source_format : Optional [str ] = None
5454
@classmethod
def from_job(
    cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
) -> "JobMetadata":
    """Build a ``JobMetadata`` record from a BigQuery job object.

    Fields shared by every job are copied first. ``QueryJob`` and
    ``LoadJob`` instances then contribute their type-specific statistics;
    any other job type keeps the defaults for those fields.

    Args:
        query_job: The job to summarize.
        exec_seconds: Value stored as ``duration_seconds``.

    Returns:
        The populated ``JobMetadata`` instance.
    """
    # Jobs without a ``query`` attribute yield None. Longer SQL text is cut
    # so the stored string is at most 1024 characters, ellipsis included.
    query_text = getattr(query_job, "query", None)
    if query_text and len(query_text) > 1024:
        query_text = query_text[:1021] + "..."

    # A console link is only produced when the job has an id. The URL must
    # not contain whitespace, so it is assembled from adjacent pieces.
    job_url = None
    if getattr(query_job, "job_id", None):
        job_url = (
            "https://console.cloud.google.com/bigquery"
            f"?project={query_job.project}"
            f"&j=bq:{query_job.location}:{query_job.job_id}"
            "&page=queryresults"
        )

    metadata = cls(
        job_id=query_job.job_id,
        location=query_job.location,
        project=query_job.project,
        creation_time=query_job.created,
        start_time=query_job.started,
        end_time=query_job.ended,
        duration_seconds=exec_seconds,
        status=query_job.state,
        job_type=query_job.job_type,
        error_result=query_job.error_result,
        query=query_text,
        job_url=job_url,
    )

    if isinstance(query_job, QueryJob):
        metadata.cached = getattr(query_job, "cache_hit", None)
        metadata.destination_table = (
            str(query_job.destination) if query_job.destination else None
        )
        metadata.total_bytes_processed = getattr(
            query_job, "total_bytes_processed", None
        )
        metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
    elif isinstance(query_job, LoadJob):
        metadata.output_rows = getattr(query_job, "output_rows", None)
        metadata.input_files = getattr(query_job, "input_files", None)

        # google-cloud-bigquery publishes the loaded byte count as
        # ``LoadJob.input_file_bytes``. ``input_bytes`` is still consulted
        # as a fallback for objects that expose the value under that name.
        input_bytes = getattr(query_job, "input_file_bytes", None)
        if input_bytes is None:
            input_bytes = getattr(query_job, "input_bytes", None)
        metadata.input_bytes = input_bytes

        destination = getattr(query_job, "destination", None)
        metadata.destination_table = str(destination) if destination else None

        source_uris = getattr(query_job, "source_uris", None)
        if source_uris:
            metadata.source_uris = list(source_uris)

        configuration = query_job.configuration
        if configuration and hasattr(configuration, "source_format"):
            metadata.source_format = configuration.source_format

    return metadata
105+
@classmethod
def from_row_iterator(
    cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
) -> "JobMetadata":
    """Build a ``JobMetadata`` record from a query result iterator.

    Every field is read with ``getattr`` and a ``None`` default, so an
    iterator lacking a given attribute simply leaves that field unset.
    The record is always marked as a finished query (``status="DONE"``,
    ``job_type="query"``).

    Args:
        row_iterator: The result iterator to summarize.
        exec_seconds: Value stored as ``duration_seconds``.

    Returns:
        The populated ``JobMetadata`` instance.
    """
    # Longer SQL text is cut so the stored string is at most 1024
    # characters, ellipsis included.
    query_text = getattr(row_iterator, "query", None)
    if query_text and len(query_text) > 1024:
        query_text = query_text[:1021] + "..."

    job_id = getattr(row_iterator, "job_id", None)
    project = getattr(row_iterator, "project", None)
    location = getattr(row_iterator, "location", None)

    # A console link is only produced when a job id is present. Attributes
    # that exist but hold None are rendered as empty strings rather than
    # the text "None", and the URL is assembled without any whitespace.
    job_url = None
    if job_id:
        job_url = (
            "https://console.cloud.google.com/bigquery"
            f"?project={project or ''}"
            f"&j=bq:{location or ''}:{job_id}"
            "&page=queryresults"
        )

    return cls(
        job_id=job_id,
        query_id=getattr(row_iterator, "query_id", None),
        location=location,
        project=project,
        creation_time=getattr(row_iterator, "created", None),
        start_time=getattr(row_iterator, "started", None),
        end_time=getattr(row_iterator, "ended", None),
        duration_seconds=exec_seconds,
        status="DONE",
        total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
        total_slot_ms=getattr(row_iterator, "slot_millis", None),
        job_type="query",
        cached=getattr(row_iterator, "cache_hit", None),
        query=query_text,
        job_url=job_url,
    )
133+
55134
56135@dataclasses .dataclass
57136class ExecutionMetrics :
@@ -87,25 +166,7 @@ def count_job_stats(
87166 self .execution_secs += exec_seconds
88167
89168 self .jobs .append (
90- JobMetadata (
91- job_id = getattr (row_iterator , "job_id" , None ),
92- query_id = getattr (row_iterator , "query_id" , None ),
93- location = getattr (row_iterator , "location" , None ),
94- project = getattr (row_iterator , "project" , None ),
95- creation_time = created ,
96- start_time = getattr (row_iterator , "started" , None ),
97- end_time = ended ,
98- duration_seconds = exec_seconds ,
99- status = "DONE" ,
100- total_bytes_processed = bytes_processed ,
101- total_slot_ms = slot_millis ,
102- job_type = "query" ,
103- cached = getattr (row_iterator , "cache_hit" , None ),
104- query = getattr (row_iterator , "query" , None ),
105- job_url = f"https://console.cloud.google.com/bigquery?project={ getattr (row_iterator , 'project' , '' )} &j=bq:{ getattr (row_iterator , 'location' , '' )} :{ getattr (row_iterator , 'job_id' , '' )} &page=queryresults"
106- if getattr (row_iterator , "job_id" , None )
107- else None ,
108- )
169+ JobMetadata .from_row_iterator (row_iterator , exec_seconds = exec_seconds )
109170 )
110171
111172 elif isinstance (query_job , QueryJob ) and query_job .configuration .dry_run :
@@ -125,28 +186,10 @@ def count_job_stats(
125186 self .slot_millis += slot_millis or 0
126187 self .execution_secs += exec_seconds or 0
127188
128- self .jobs .append (
129- JobMetadata (
130- job_id = query_job .job_id ,
131- location = query_job .location ,
132- project = query_job .project ,
133- creation_time = query_job .created ,
134- start_time = query_job .started ,
135- end_time = query_job .ended ,
136- duration_seconds = exec_seconds ,
137- status = query_job .state ,
138- total_bytes_processed = bytes_processed ,
139- total_slot_ms = slot_millis ,
140- job_type = query_job .job_type ,
141- error_result = query_job .error_result ,
142- cached = query_job .cache_hit ,
143- query = query_job .query ,
144- destination_table = str (query_job .destination )
145- if query_job .destination
146- else None ,
147- job_url = f"https://console.cloud.google.com/bigquery?project={ query_job .project } &j=bq:{ query_job .location } :{ query_job .job_id } &page=queryresults" ,
148- )
149- )
189+ metadata = JobMetadata .from_job (query_job , exec_seconds = exec_seconds )
190+ metadata .total_bytes_processed = bytes_processed
191+ metadata .total_slot_ms = slot_millis
192+ self .jobs .append (metadata )
150193
151194 else :
152195 # Handle other job types (e.g. LoadJob)
@@ -156,36 +199,7 @@ def count_job_stats(
156199 if query_job .ended and query_job .created
157200 else None
158201 )
159-
160- job_metadata = JobMetadata (
161- job_id = query_job .job_id ,
162- location = query_job .location ,
163- project = query_job .project ,
164- creation_time = query_job .created ,
165- start_time = query_job .started ,
166- end_time = query_job .ended ,
167- duration_seconds = duration ,
168- status = query_job .state ,
169- job_type = query_job .job_type ,
170- error_result = query_job .error_result ,
171- job_url = f"https://console.cloud.google.com/bigquery?project={ query_job .project } &j=bq:{ query_job .location } :{ query_job .job_id } &page=queryresults" ,
172- )
173-
174- if isinstance (query_job , bigquery .LoadJob ):
175- job_metadata .output_rows = getattr (query_job , "output_rows" , None )
176- job_metadata .input_files = getattr (query_job , "input_files" , None )
177- job_metadata .input_bytes = getattr (query_job , "input_bytes" , None )
178- job_metadata .destination_table = (
179- str (query_job .destination ) if query_job .destination else None
180- )
181- if query_job .source_uris :
182- job_metadata .source_uris = list (query_job .source_uris )
183- if query_job .configuration and hasattr (
184- query_job .configuration , "source_format"
185- ):
186- job_metadata .source_format = query_job .configuration .source_format
187-
188- self .jobs .append (job_metadata )
202+ self .jobs .append (JobMetadata .from_job (query_job , exec_seconds = duration ))
189203
190204 # For pytest runs only, log information about the query job
191205 # to a file in order to create a performance report.
0 commit comments