@@ -95,14 +95,17 @@ class CalcCascade(Calc):
9595 )
9696 """Names of the required uncertainty variables"""
9797
98- _metric_names = ("aai_agg" , "freq_curve" , "at_event" , "eai_exp" )
98+ _metric_names = [
99+ "imp_met"
100+ ] # need to be a list instead of tuple in case of unique metric
99101 """Names of the cost benefit output metrics"""
100102
101103 def __init__ (
102104 self ,
103105 nw_input_var : Union [InputVar , Exposures ],
104106 impf_input_var : Union [InputVar , ImpactFuncSet ],
105107 haz_input_var : Union [InputVar , Hazard ],
108+ ci_types : list ,
106109 ):
107110 """Initialize UncCalcImpact
108111
@@ -124,16 +127,14 @@ def __init__(
124127 self .nw_input_var = InputVar .var_to_inputvar (nw_input_var )
125128 self .impf_input_var = InputVar .var_to_inputvar (impf_input_var )
126129 self .haz_input_var = InputVar .var_to_inputvar (haz_input_var )
127-
130+ self . ci_types = ci_types
128131 self .value_unit = "people"
129132 self .check_distr ()
130133
131134 def uncertainty (
132135 self ,
133136 unc_sample ,
134- df_dependencies ,
135137 friction_surf ,
136- ci_types = None ,
137138 processes = 1 ,
138139 chunksize = None ,
139140 ):
@@ -212,26 +213,18 @@ def uncertainty(
212213 if chunksize is None :
213214 chunksize = _multiprocess_chunksize (samples_df , processes )
214215 unit = self .value_unit
215-
216- if ci_types is None :
217- ci_types = df_dependencies .source .unique ().tolist () + ["people" ]
218-
219- self .ci_types = ci_types
220- self .df_dependencies = df_dependencies
221216 self .friction_surf = friction_surf
222-
223217 one_sample = samples_df .iloc [0 :1 ]
224218 start = time .time ()
225219 self ._compute_metrics (one_sample , chunksize = 1 , processes = 1 )
226220 elapsed_time = time .time () - start
227221 self .est_comp_time (unc_sample .n_samples , elapsed_time , processes )
228222
229- imp_met_dict = self ._compute_metrics (
223+ imp_met_list = self ._compute_metrics (
230224 samples_df , chunksize = chunksize , processes = processes
231225 )
232-
233226 # Assign computed impact distribution data to self
234- imp_met_unc_df = pd .DataFrame (imp_met_dict )
227+ imp_met_unc_df = pd .DataFrame (imp_met_list , index = self . ci_types ). T
235228 # freq_curve_unc_df = pd.DataFrame(
236229 # freq_curve_list, columns=["rp" + str(n) for n in rp]
237230 # )
@@ -246,7 +239,6 @@ def uncertainty(
246239 # )
247240 # else:
248241 # coord_df = pd.DataFrame([])
249-
250242 return UncCascadeOutput (
251243 samples_df = samples_df ,
252244 unit = unit ,
@@ -283,11 +275,14 @@ def _compute_metrics(self, samples_df, chunksize, processes):
283275 impf_input_var = self .impf_input_var ,
284276 haz_input_var = self .haz_input_var ,
285277 ci_types = self .ci_types ,
286- df_dependencies = self .df_dependencies ,
287278 friction_surf = self .friction_surf ,
288279 )
289280 if processes > 1 :
290- with mp .Pool (processes = processes ) as pool :
281+ from multiprocess import get_context
282+
283+ # need to use multiprocess instead of multiprocessing
284+ # to avoid pickling error and get_context to avoid deadlocks
285+ with get_context ("spawn" ).Pool (processes = processes ) as pool :
291286 LOGGER .info ("Using %s CPUs." , processes )
292287 imp_metrics = pool .starmap (_map_impact_calc , p_iterator )
293288 else :
@@ -296,6 +291,7 @@ def _compute_metrics(self, samples_df, chunksize, processes):
296291 # Perform the actual computation
297292 with log_level (level = "ERROR" , name_prefix = "climada" ):
298293 return _transpose_chunked_data (imp_metrics )
294+ # return imp_metrics
299295
300296
301297def _map_impact_calc (
@@ -304,7 +300,6 @@ def _map_impact_calc(
304300 impf_input_var ,
305301 haz_input_var ,
306302 ci_types ,
307- df_dependencies ,
308303 friction_surf ,
309304):
310305 """
@@ -332,13 +327,14 @@ def _map_impact_calc(
332327 (np.array([]) if self.calc_at_event=False).
333328
334329 """
335- uncertainty_values = {k : [] for k in ci_types }
330+ # uncertainty_values = {k: [] for k in ci_types}
331+ uncertainty_values = []
336332 for _ , sample in sample_chunks .iterrows ():
337333 nw_samples = sample [nw_input_var .labels ].to_dict ()
338334 impf_samples = sample [impf_input_var .labels ].to_dict ()
339335 haz_samples = sample [haz_input_var .labels ].to_dict ()
340336
341- nw = nw_input_var .evaluate (** nw_samples ) # create network
337+ nw , df_dep = nw_input_var .evaluate (** nw_samples ) # create network
342338 impf = impf_input_var .evaluate (** impf_samples )
343339 haz = haz_input_var .evaluate (** haz_samples )
344340
@@ -351,7 +347,7 @@ def _map_impact_calc(
351347 ci_graph_disr = Graph (nw_disr , directed = False )
352348 ci_graph_disr = cascade (
353349 ci_graph_disr ,
354- df_dependencies ,
350+ df_dep ,
355351 friction_surf = friction_surf ,
356352 initial = False ,
357353 criterion = "distance" ,
@@ -362,15 +358,15 @@ def _map_impact_calc(
362358 imp_dict = nwu .disaster_impact_allservices_df (
363359 nw .nodes , nw_disr .nodes , services = ci_types
364360 )
365- if "people" in ci_types :
366- imp_dict [ "people" ] = sum (
367- nw_disr . nodes [ nw_disr . nodes . ci_type == "people" ]. imp_dir
368- )
369-
370- {uncertainty_values [k ].append (v ) for k , v in imp_dict .items ()}
371- # uncertainty_values.append([v for v in imp_dict.values()])
372-
373- return uncertainty_values # list(zip(*uncertainty_values))
361+ imp_types = [
362+ ci_type + "_access" if ci_type != "people" else ci_type
363+ for ci_type in ci_types
364+ ]
365+ print ( imp_types )
366+ # {uncertainty_values[k].append(v) for k, v in imp_dict.items()}
367+ imp_list = [ imp_dict [ imp_type ] for imp_type in imp_types ]
368+ uncertainty_values . append ( imp_list )
369+ return list (zip (* uncertainty_values ))
374370
375371
376372## For now, copy the nw functions here
@@ -480,7 +476,7 @@ def disrupt_network(network, haz, impf_thresh_set, ci_types=None, res_disagg=500
480476 for exp in exp_list :
481477 impf = impf_thresh_set .getImpf (exp .description )
482478 exp .gdf [f"impf_{ haz .haz_type } " ] = impf .id
483- imp = calc_point_impacts (haz , exp , impf )
479+ imp = calc_point_impacts (haz , exp , ImpactFuncSet ([ impf ]) )
484480 if exp .description in ["road" ]:
485481 imp = u_lp .impact_pnt_agg (imp , exp .gdf , u_lp .AggMethod .SUM )
486482 network_disr = impacts_to_network (
0 commit comments