1818 ReformImpactsService ,
1919)
2020from policyengine_api .endpoints .economy .compare import compare_economic_outputs
21- from policyengine_api .endpoints .economy .reform_impact import set_comment_on_job
2221from policyengine_api .constants import COUNTRY_PACKAGE_VERSIONS
2322from policyengine_api .country import COUNTRIES , create_policy_reform
2423from policyengine_api .utils .v2_v1_comparison import (
3433
3534from policyengine_us import Microsimulation
3635from policyengine_uk import Microsimulation
37- import logging
38- import huggingface_hub
3936
4037load_dotenv ()
4138
8178)
8279
8380if not check_against_api_v2 :
84- logging .warn (
85- "Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2."
81+ logger .log_text (
82+ "Didn't find any GOOGLE_APPLICATION_CREDENTIALS, so will not check APIv1 results against APIv2." ,
83+ severity = "WARNING" ,
8684 )
8785
8886
@@ -104,7 +102,28 @@ def run(
104102 baseline_policy : dict ,
105103 reform_policy : Annotated [str , "String-formatted JSON" ],
106104 ):
107- print (f"Starting CalculateEconomySimulationJob.run" )
105+ job_id = self ._set_job_id ()
106+ job_setup_options = {
107+ "job_id" : job_id ,
108+ "job_type" : "CALCULATE_ECONOMY_SIMULATION_JOB" ,
109+ "baseline_policy_id" : baseline_policy_id ,
110+ "reform_policy_id" : policy_id ,
111+ "country_id" : country_id ,
112+ "region" : region ,
113+ "dataset" : dataset ,
114+ "time_period" : time_period ,
115+ "options" : options ,
116+ "baseline_policy" : baseline_policy ,
117+ "reform_policy" : reform_policy ,
118+ }
119+ logger .log_struct (
120+ {
121+ "message" : "Starting job with job_id {job_id}" ,
122+ ** job_setup_options ,
123+ },
124+ severity = "INFO" ,
125+ )
126+
108127 try :
109128 # Configure inputs
110129 # Note for anyone modifying options_hash: redis-queue treats ":" as a namespace
@@ -115,6 +134,13 @@ def run(
115134 baseline_policy_id = int (baseline_policy_id )
116135 policy_id = int (policy_id )
117136
137+ logger .log_struct (
138+ {
139+ "message" : "Checking if completed result already exists" ,
140+ ** job_setup_options ,
141+ }
142+ )
143+
118144 # Check if a completed result already exists
119145 existing = reform_impacts_service .get_all_reform_impacts (
120146 country_id ,
@@ -127,7 +153,12 @@ def run(
127153 COUNTRY_PACKAGE_VERSIONS [country_id ],
128154 )
129155 if any (x ["status" ] == "ok" for x in existing ):
130- print (f"Job already completed successfully" )
156+ logger .log_struct (
157+ {
158+ "message" : "Found existing completed result" ,
159+ ** job_setup_options ,
160+ }
161+ )
131162 return
132163
133164 # Save identifiers for later commenting on processing status
@@ -141,7 +172,12 @@ def run(
141172 options_hash ,
142173 )
143174
144- print ("Checking existing reform impacts..." )
175+ logger .log_struct (
176+ {
177+ "message" : "No existing completed result found, proceeding with computation" ,
178+ ** job_setup_options ,
179+ }
180+ )
145181 # Query existing impacts before deleting
146182 existing = reform_impacts_service .get_all_reform_impacts (
147183 country_id ,
@@ -153,7 +189,6 @@ def run(
153189 options_hash ,
154190 COUNTRY_PACKAGE_VERSIONS [country_id ],
155191 )
156- print (f"Found { len (existing )} existing impacts before delete" )
157192
158193 # Delete any existing reform impact rows with the same identifiers
159194 reform_impacts_service .delete_reform_impact (
@@ -166,8 +201,12 @@ def run(
166201 options_hash ,
167202 )
168203
169- print ("Deleted existing computing impacts" )
170-
204+ logger .log_struct (
205+ {
206+ "message" : "Creating new reform impact computation process" ,
207+ ** job_setup_options ,
208+ }
209+ )
171210 # Insert new reform impact row with status 'computing'
172211 reform_impacts_service .set_reform_impact (
173212 country_id = country_id ,
@@ -187,49 +226,53 @@ def run(
187226 ),
188227 )
189228
190- comment = lambda x : set_comment_on_job (x , * identifiers )
191- comment ("Computing baseline" )
229+ logger .log_struct (
230+ {
231+ "message" : "Starting computation of baseline policy" ,
232+ ** job_setup_options ,
233+ }
234+ )
192235
193236 # If comparing against API v2, start job
194237 if check_against_api_v2 :
195238
196- # Populate v2/v1 comparison config data; we will pass this
197- # to GCP logs either on error or success
198- comparison_data = {
199- "country_id" : country_id ,
200- "region" : region ,
201- "reform_policy" : reform_policy ,
202- "baseline_policy" : baseline_policy ,
203- "reform_policy_id" : policy_id ,
204- "baseline_policy_id" : baseline_policy_id ,
205- "time_period" : time_period ,
206- "dataset" : dataset ,
207- "v1_country_package_version" : COUNTRY_PACKAGE_VERSIONS [
208- country_id
209- ],
210- "v2_id" : None , # Unavailable until job starts
211- "v2_country_package_version" : None , # Unavailable until job completes
212- }
239+ try :
240+ # Populate v2/v1 comparison config data; we will pass this
241+ # to GCP logs either on error or success
242+ comparison_data = {
243+ ** job_setup_options ,
244+ "v1_country_package_version" : COUNTRY_PACKAGE_VERSIONS [
245+ country_id
246+ ],
247+ "v2_id" : None , # Unavailable until job starts
248+ "v2_country_package_version" : None , # Unavailable until job completes
249+ }
213250
214- # Set up APIv2 job
215- comment ("Setting up APIv2 job" )
216- sim_config : dict [str , Any ] = self .api_v2 ._setup_sim_options (
217- country_id = country_id ,
218- scope = "macro" ,
219- reform_policy = reform_policy ,
220- baseline_policy = baseline_policy ,
221- time_period = time_period ,
222- region = region ,
223- dataset = dataset ,
224- model_version = COUNTRY_PACKAGE_VERSIONS [country_id ],
225- data_version = (
226- uk_dataset_version
227- if country_id == "uk"
228- else us_dataset_version
229- ),
230- )
251+ # Set up APIv2 job
252+ logger .log_struct (
253+ {
254+ "message" : "Setting up APIv2 job" ,
255+ ** comparison_data ,
256+ }
257+ )
258+ sim_config : dict [str , Any ] = (
259+ self .api_v2 ._setup_sim_options (
260+ country_id = country_id ,
261+ scope = "macro" ,
262+ reform_policy = reform_policy ,
263+ baseline_policy = baseline_policy ,
264+ time_period = time_period ,
265+ region = region ,
266+ dataset = dataset ,
267+ model_version = COUNTRY_PACKAGE_VERSIONS [country_id ],
268+ data_version = (
269+ uk_dataset_version
270+ if country_id == "uk"
271+ else us_dataset_version
272+ ),
273+ )
274+ )
231275
232- try :
233276 api_v2_execution = self .api_v2 .run (sim_config )
234277 execution_id = self .api_v2 .get_execution_id (
235278 api_v2_execution
@@ -245,7 +288,7 @@ def run(
245288 "v1_impact" : None ,
246289 "v2_impact" : None ,
247290 "v1_v2_diff" : None ,
248- "message" : "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job started" ,
291+ "message" : "APIv2 job started" ,
249292 }
250293 )
251294 )
@@ -266,6 +309,12 @@ def run(
266309 )
267310
268311 # Compute baseline economy
312+ logger .log_struct (
313+ {
314+ "message" : "Computing baseline economy..." ,
315+ ** job_setup_options ,
316+ }
317+ )
269318 baseline_economy = self ._compute_economy (
270319 country_id = country_id ,
271320 region = region ,
@@ -274,9 +323,14 @@ def run(
274323 options = options ,
275324 policy_json = baseline_policy ,
276325 )
277- comment ("Computing reform" )
278326
279327 # Compute reform economy
328+ logger .log_struct (
329+ {
330+ "message" : "Computing reform economy..." ,
331+ ** job_setup_options ,
332+ }
333+ )
280334 reform_economy = self ._compute_economy (
281335 country_id = country_id ,
282336 region = region ,
@@ -288,7 +342,12 @@ def run(
288342
289343 baseline_economy = baseline_economy ["result" ]
290344 reform_economy = reform_economy ["result" ]
291- comment ("Comparing baseline and reform" )
345+ logger .log_struct (
346+ {
347+ "message" : "Computed baseline and reform economies" ,
348+ ** job_setup_options ,
349+ }
350+ )
292351 impact : dict [str , Any ] = compare_economic_outputs (
293352 baseline_economy , reform_economy , country_id = country_id
294353 )
@@ -314,7 +373,7 @@ def run(
314373 "v2_impact" : api_v2_output ,
315374 "v1_v2_diff" : None ,
316375 "v2_country_package_version" : v2_country_package_version ,
317- "message" : "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job completed" ,
376+ "message" : "APIv2 job completed" ,
318377 }
319378 )
320379 )
@@ -337,7 +396,7 @@ def run(
337396 "v2_impact" : api_v2_output ,
338397 "v1_v2_diff" : v1_v2_diff ,
339398 "v2_country_package_version" : v2_country_package_version ,
340- "message" : "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job comparison with APIv1 completed" ,
399+ "message" : "APIv2 job comparison with APIv1 completed" ,
341400 }
342401 )
343402 )
@@ -356,7 +415,7 @@ def run(
356415 "v1_impact" : impact ,
357416 "v2_impact" : None ,
358417 "v1_v2_diff" : None ,
359- "message" : "CALCULATE_ECONOMY_SIMULATION_JOB: APIv2 job failed" ,
418+ "message" : "APIv2 job failed" ,
360419 }
361420 )
362421 logger .log_struct (
@@ -390,14 +449,7 @@ def run(
390449 # Show that API v1 failed and API v2 was not run
391450 error_log : V2V1Comparison = V2V1Comparison .model_validate (
392451 {
393- "country_id" : country_id ,
394- "region" : region ,
395- "reform_policy" : reform_policy ,
396- "baseline_policy" : baseline_policy ,
397- "reform_policy_id" : policy_id ,
398- "baseline_policy_id" : baseline_policy_id ,
399- "time_period" : time_period ,
400- "dataset" : dataset ,
452+ ** job_setup_options ,
401453 "v1_country_package_version" : COUNTRY_PACKAGE_VERSIONS [
402454 country_id
403455 ],
@@ -408,15 +460,30 @@ def run(
408460 "v1_impact" : None ,
409461 "v2_impact" : None ,
410462 "v1_v2_diff" : None ,
411- "message" : "CALCULATE_ECONOMY_SIMULATION_JOB: APIv1 job failed" ,
463+ "message" : "APIv1 job failed" ,
412464 }
413465 )
414466 logger .log_struct (
415467 error_log .model_dump (mode = "json" ), severity = "ERROR"
416468 )
417- print (f"Error setting reform impact: { str (e )} " )
469+ logger .log_struct (
470+ {
471+ "message" : "Error during job execution" ,
472+ "error" : str (e ),
473+ ** job_setup_options ,
474+ }
475+ )
418476 raise e
419477
478+ def _set_job_id (self ) -> str :
479+ """
480+ Generate a unique job ID based on the current timestamp and a random number.
481+ This is used to track the job in the database and logs.
482+ """
483+ timestamp = datetime .datetime .now ().strftime ("%Y%m%d%H%M%S" )
484+ random_number = np .random .randint (1000 , 9999 )
485+ return f"job_{ timestamp } _{ random_number } "
486+
420487 def _compute_economy (
421488 self , country_id , region , dataset , time_period , options , policy_json
422489 ):
0 commit comments