|
29 | 29 |
|
30 | 30 | import os |
31 | 31 | import re |
| 32 | +import pickle |
32 | 33 |
|
33 | 34 | import numpy as np |
34 | 35 |
|
35 | 36 | import rmgpy.util as util |
36 | 37 | from rmgpy.species import Species |
37 | 38 | from rmgpy.tools.data import GenericData |
38 | 39 | from rmgpy.tools.plot import parse_csv_data, plot_sensitivity, ReactionSensitivityPlot, ThermoSensitivityPlot |
| 40 | +from rmgpy.kinetics.surface import SurfaceArrheniusBEP, StickingCoefficientBEP, SurfaceArrheniusBEP |
39 | 41 |
|
40 | 42 |
|
41 | 43 | class ThermoParameterUncertainty(object): |
@@ -333,6 +335,9 @@ def __init__(self, species_list=None, reaction_list=None, output_directory=''): |
333 | 335 | self.all_kinetic_sources = None |
334 | 336 | self.thermo_input_uncertainties = None |
335 | 337 | self.kinetic_input_uncertainties = None |
| 338 | + self.thermo_covariance_matrix = None |
| 339 | + self.kinetic_covariance_matrix = None |
| 340 | + self.overall_covariance_matrix = None |
336 | 341 | self.output_directory = output_directory if output_directory else os.getcwd() |
337 | 342 |
|
338 | 343 | # For extra species needed for correlated analysis but not in model |
@@ -1084,6 +1089,315 @@ def local_analysis(self, sensitive_species, reaction_system_index=0, correlated= |
1084 | 1089 |
|
1085 | 1090 | return output |
1086 | 1091 |
|
| 1092 | + def get_thermo_covariance_matrix(self): |
| 1093 | + """ |
| 1094 | + Export the thermo covariance matrix as a numpy array |
| 1095 | + """ |
| 1096 | + assert self.thermo_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' |
| 1097 | + assert len(self.thermo_input_uncertainties) > 0, 'No thermodynamic parameters found' |
| 1098 | + if isinstance(self.thermo_input_uncertainties[0], np.float64): |
| 1099 | + print("""Warning -- parameter uncertainties assigned without correlations. |
| 1100 | +All off diagonals will be zero unless you call assign_parameter_uncertainties(correlated=True)""") |
| 1101 | + self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_input_uncertainties), 2.0) |
| 1102 | + return self.thermo_covariance_matrix |
| 1103 | + |
| 1104 | + self.thermo_covariance_matrix = np.zeros((len(self.species_list), len(self.species_list))) |
| 1105 | + |
| 1106 | + for i in range(len(self.species_list)): |
| 1107 | + for j in range(len(self.species_list)): |
| 1108 | + |
| 1109 | + # assuming only sources that match are correlated |
| 1110 | + for source_i in self.thermo_input_uncertainties[i].keys(): |
| 1111 | + if source_i in self.thermo_input_uncertainties[j].keys(): |
| 1112 | + self.thermo_covariance_matrix[i, j] += self.thermo_input_uncertainties[i][source_i] * self.thermo_input_uncertainties[j][source_i] |
| 1113 | + |
| 1114 | + # load correlations between adsorbates calculated with BEEF vdW |
| 1115 | + # load the saved species ensemble data: this includes a numpy matrix of dG values for each species and a pickle of the species adjacency list |
| 1116 | + if self.thermo_corr_dir is not None: |
| 1117 | + def get_i_spec(spec, species_list): |
| 1118 | + for i in range(len(species_list)): |
| 1119 | + if spec.is_isomorphic(species_list[i]): |
| 1120 | + return i |
| 1121 | + return None |
| 1122 | + |
| 1123 | + def get_i_group(group, group_list): |
| 1124 | + for i in range(len(group_list)): |
| 1125 | + if group.is_identical(group_list[i]): |
| 1126 | + return i |
| 1127 | + return None |
| 1128 | + |
| 1129 | + # load the ensemble data |
| 1130 | + # TODO - generalize this |
| 1131 | + my_library = 'surfaceThermoPt111' |
| 1132 | + corr_thermo_data_file = os.path.join(self.thermo_corr_dir, f'{my_library}_cov.npy') |
| 1133 | + self.thermo_corr_data = {my_library: np.load(corr_thermo_data_file) / 4.18 / 4.18} # convert from kJ^2/mol^2 to kcal^2/mol^2 |
| 1134 | + molecules_file = os.path.join(os.path.join(self.thermo_corr_dir, f'{my_library}_molecules.pickle')) |
| 1135 | + with open(molecules_file, 'rb') as f: |
| 1136 | + # this should be a list of molecule adjacency lists |
| 1137 | + molecules_data = pickle.load(f) |
| 1138 | + correlated_species = [] |
| 1139 | + for i in range(len(molecules_data)): |
| 1140 | + sp = Species().from_adjacency_list(molecules_data[i]) |
| 1141 | + correlated_species.append(sp) |
| 1142 | + assert len(correlated_species) == self.thermo_corr_data[my_library].shape[0] |
| 1143 | + |
| 1144 | + assert 'adsorptionPt111' in self.database.thermo.groups, 'BEEF adsorption corrections require adsorptionPt111 group in the thermo database' |
| 1145 | + db_ads_group_items = [self.database.thermo.groups['adsorptionPt111'].entries[key].item for key in self.database.thermo.groups['adsorptionPt111'].entries] |
| 1146 | + |
| 1147 | + def get_species_indices_in_group(group): |
| 1148 | + species_used = [] |
| 1149 | + for j in range(len(correlated_species)): |
| 1150 | + if correlated_species[j].molecule[0].is_subgraph_isomorphic(group, generate_initial_map=True): |
| 1151 | + species_used.append(j) |
| 1152 | + return species_used |
| 1153 | + |
| 1154 | + def get_cov_sp_lists(sp_list_i, sp_list_j): |
| 1155 | + if len(sp_list_i) == 0 or len(sp_list_j) == 0: |
| 1156 | + return 0 |
| 1157 | + |
| 1158 | + cov = 0 |
| 1159 | + for i in sp_list_i: |
| 1160 | + for j in sp_list_j: |
| 1161 | + cov += self.thermo_corr_data[my_library][i, j] |
| 1162 | + cov /= len(sp_list_i) * len(sp_list_j) |
| 1163 | + return cov |
| 1164 | + |
| 1165 | + # now splice this into the big thermo covariance matrix |
| 1166 | + for i_spc in range(len(self.species_list)): |
| 1167 | + for j_spc in range(len(self.species_list)): |
| 1168 | + |
| 1169 | + # does species use surfaceThermoPt111 lin? |
| 1170 | + i_uses_pt111_lib = False |
| 1171 | + j_uses_pt111_lib = False |
| 1172 | + if 'surfaceThermoPt111' in self.species_list[i_spc].thermo.comment: |
| 1173 | + i_uses_pt111_lib = True |
| 1174 | + if 'surfaceThermoPt111' in self.species_list[j_spc].thermo.comment: |
| 1175 | + j_uses_pt111_lib = True |
| 1176 | + |
| 1177 | + # does species use adsorptionPt111 group? |
| 1178 | + i_ads_group = None |
| 1179 | + j_ads_group = None |
| 1180 | + if 'ADS' in self.species_sources_dict[self.species_list[i_spc]]: |
| 1181 | + i_ads_group = self.species_sources_dict[self.species_list[i_spc]]['ADS']['adsorptionPt111'][0][0].item |
| 1182 | + if 'ADS' in self.species_sources_dict[self.species_list[j_spc]]: |
| 1183 | + j_ads_group = self.species_sources_dict[self.species_list[j_spc]]['ADS']['adsorptionPt111'][0][0].item |
| 1184 | + |
| 1185 | + # two libraries |
| 1186 | + if i_uses_pt111_lib and j_uses_pt111_lib: |
| 1187 | + i_spc_beef = get_i_spec(self.species_list[i_spc], correlated_species) |
| 1188 | + j_spc_beef = get_i_spec(self.species_list[j_spc], correlated_species) |
| 1189 | + if i_spc_beef is not None and j_spc_beef is not None: |
| 1190 | + self.thermo_covariance_matrix[i_spc, j_spc] += self.thermo_corr_data[my_library][i_spc_beef, j_spc_beef] |
| 1191 | + else: |
| 1192 | + # TODO add default value here?? |
| 1193 | + pass |
| 1194 | + # two groups |
| 1195 | + elif i_ads_group is not None and j_ads_group is not None: |
| 1196 | + i_beef = get_i_group(i_ads_group, db_ads_group_items) |
| 1197 | + j_beef = get_i_group(j_ads_group, db_ads_group_items) |
| 1198 | + if i_beef is not None and j_beef is not None: |
| 1199 | + species_list_i = get_species_indices_in_group(db_ads_group_items[i_beef]) |
| 1200 | + species_list_j = get_species_indices_in_group(db_ads_group_items[j_beef]) |
| 1201 | + self.thermo_covariance_matrix[i_spc, j_spc] += get_cov_sp_lists(species_list_i, species_list_j) |
| 1202 | + else: |
| 1203 | + # TODO add default value here?? |
| 1204 | + pass |
| 1205 | + # one group and one library |
| 1206 | + elif i_uses_pt111_lib and j_ads_group is not None: |
| 1207 | + i_spc_beef = get_i_spec(self.species_list[i_spc], correlated_species) |
| 1208 | + if i_spc_beef is not None: |
| 1209 | + species_list_i = [i_spc_beef] |
| 1210 | + j_beef = get_i_group(j_ads_group, db_ads_group_items) |
| 1211 | + if j_beef is not None: |
| 1212 | + species_list_j = get_species_indices_in_group(db_ads_group_items[j_beef]) |
| 1213 | + self.thermo_covariance_matrix[i_spc, j_spc] += get_cov_sp_lists(species_list_i, species_list_j) |
| 1214 | + else: |
| 1215 | + # TODO add default value here?? |
| 1216 | + pass |
| 1217 | + elif j_uses_pt111_lib and i_ads_group is not None: |
| 1218 | + j_spc_beef = get_i_spec(self.species_list[j_spc], correlated_species) |
| 1219 | + if j_spc_beef is not None: |
| 1220 | + species_list_j = [j_spc_beef] |
| 1221 | + i_beef = get_i_group(i_ads_group, db_ads_group_items) |
| 1222 | + if i_beef is not None: |
| 1223 | + species_list_i = get_species_indices_in_group(db_ads_group_items[i_beef]) |
| 1224 | + self.thermo_covariance_matrix[i_spc, j_spc] += get_cov_sp_lists(species_list_i, species_list_j) |
| 1225 | + else: |
| 1226 | + # TODO add default value here?? |
| 1227 | + pass |
| 1228 | + |
| 1229 | + return self.thermo_covariance_matrix |
| 1230 | + |
| 1231 | + def get_kinetic_covariance_matrix(self, k_param_engine=None): |
| 1232 | + """ |
| 1233 | + Export the kinetic covariance matrix as a numpy array |
| 1234 | + """ |
| 1235 | + assert self.kinetic_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' |
| 1236 | + assert len(self.kinetic_input_uncertainties) > 0, 'No kinetic parameters found' |
| 1237 | + if isinstance(self.kinetic_input_uncertainties[0], np.float64): |
| 1238 | + print("""Warning -- parameter uncertainties assigned without correlations. |
| 1239 | +All off diagonals will be zero unless you call assign_parameter_uncertainties(correlated=True)""") |
| 1240 | + self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_input_uncertainties), 2.0) |
| 1241 | + return self.kinetic_covariance_matrix |
| 1242 | + |
| 1243 | + if k_param_engine is None: |
| 1244 | + k_param_engine = KineticParameterUncertainty() |
| 1245 | + |
| 1246 | + def species_in_list(new_species, species_list): |
| 1247 | + for sp in species_list: |
| 1248 | + if new_species.is_isomorphic(sp): |
| 1249 | + return True |
| 1250 | + return False |
| 1251 | + |
| 1252 | + self.kinetic_covariance_matrix = np.zeros((len(self.reaction_list), len(self.reaction_list))) |
| 1253 | + |
| 1254 | + # takes a while to load the family reaction maps # Julia required |
| 1255 | + auto_gen_family_rxn_maps = {} |
| 1256 | + if self.all_kinetic_sources is None: |
| 1257 | + self.compile_all_sources() |
| 1258 | + for family in self.all_kinetic_sources['Rate Rules'].keys(): |
| 1259 | + if self.database.kinetics.families[family].auto_generated: |
| 1260 | + auto_gen_family_rxn_maps[family] = self.database.kinetics.families[family].get_reaction_matches( |
| 1261 | + thermo_database=self.database.thermo, |
| 1262 | + remove_degeneracy=True, |
| 1263 | + get_reverse=True, |
| 1264 | + exact_matches_only=False, |
| 1265 | + fix_labels=True |
| 1266 | + ) |
| 1267 | + |
| 1268 | + for i, reaction in enumerate(self.reaction_list): |
| 1269 | + source_dict_i = self.reaction_sources_dict[self.reaction_list[i]] |
| 1270 | + for j, other_reaction in enumerate(self.reaction_list): |
| 1271 | + # assuming only sources that match are correlated |
| 1272 | + source_dict_j = self.reaction_sources_dict[self.reaction_list[j]] |
| 1273 | + |
| 1274 | + for source_i in self.kinetic_input_uncertainties[i].keys(): |
| 1275 | + if source_i in self.kinetic_input_uncertainties[j].keys(): |
| 1276 | + self.kinetic_covariance_matrix[i, j] += self.kinetic_input_uncertainties[i][source_i] * self.kinetic_input_uncertainties[j][source_i] |
| 1277 | + else: |
| 1278 | + # no match in rules, but there may be overlap if they're SIDT trees using the same family |
| 1279 | + if 'Rate Rules' in source_dict_i.keys() and 'Rate Rules' in source_dict_j.keys(): |
| 1280 | + if source_dict_i['Rate Rules'][1]['autogenerated'] and source_dict_j['Rate Rules'][1]['autogenerated'] and \ |
| 1281 | + source_dict_i['Rate Rules'][0] == source_dict_j['Rate Rules'][0]: |
| 1282 | + # get #training reactions in overlap |
| 1283 | + family = source_dict_i['Rate Rules'][0] |
| 1284 | + node_name_i = source_dict_i['Rate Rules'][1]['template'][0].label |
| 1285 | + node_name_j = source_dict_j['Rate Rules'][1]['template'][0].label |
| 1286 | + rxns_i = auto_gen_family_rxn_maps[family][node_name_i] |
| 1287 | + rxns_j = auto_gen_family_rxn_maps[family][node_name_j] |
| 1288 | + |
| 1289 | + # count overlapping reactions: |
| 1290 | + overlap_count = 0 |
| 1291 | + for r_i in rxns_i: |
| 1292 | + if r_i in rxns_j: |
| 1293 | + overlap_count += 1 |
| 1294 | + |
| 1295 | + self.kinetic_covariance_matrix[i, j] += (overlap_count / len(rxns_i)) * (overlap_count / len(rxns_j)) * (k_param_engine.dlnk_rule ** 2.0) |
| 1296 | + |
| 1297 | + # check if a training reaction exactly matches a rate rule data entry |
| 1298 | + if 'Training' in source_dict_i.keys() and 'Rate Rules' in source_dict_j.keys(): |
| 1299 | + rate_rules_training_reactions = [t[1] for t in source_dict_j['Rate Rules'][1]['training']] |
| 1300 | + weights = [t[2] for t in source_dict_j['Rate Rules'][1]['training']] |
| 1301 | + training_reaction = source_dict_i['Training'][1] |
| 1302 | + for k in range(len(rate_rules_training_reactions)): |
| 1303 | + if rate_rules_training_reactions[k].item.is_isomorphic(training_reaction.item): |
| 1304 | + self.kinetic_covariance_matrix[i, j] += weights[k] * k_param_engine.dlnk_training * k_param_engine.dlnk_rule |
| 1305 | + elif 'Training' in source_dict_j.keys() and 'Rate Rules' in source_dict_i.keys(): |
| 1306 | + rate_rules_training_reactions = [t[1] for t in source_dict_i['Rate Rules'][1]['training']] |
| 1307 | + weights = [t[2] for t in source_dict_i['Rate Rules'][1]['training']] |
| 1308 | + training_reaction = source_dict_j['Training'][1] |
| 1309 | + for k in range(len(rate_rules_training_reactions)): |
| 1310 | + if rate_rules_training_reactions[k].item.is_isomorphic(training_reaction.item): |
| 1311 | + self.kinetic_covariance_matrix[i, j] += weights[k] * k_param_engine.dlnk_training * k_param_engine.dlnk_rule |
| 1312 | + |
| 1313 | + # check if one of them is an exact training reaction - might be used in node rate rule |
| 1314 | + |
| 1315 | + # Add in thermo correlations if both BEP |
| 1316 | + if isinstance(reaction.kinetics, (SurfaceArrheniusBEP, StickingCoefficientBEP)) and \ |
| 1317 | + isinstance(other_reaction.kinetics, (SurfaceArrheniusBEP, StickingCoefficientBEP)): |
| 1318 | + |
| 1319 | + alpha_i = reaction.kinetics.alpha.value_si |
| 1320 | + alpha_j = other_reaction.kinetics.alpha.value_si |
| 1321 | + |
| 1322 | + R = 8.314472 |
| 1323 | + T = 1000.0 |
| 1324 | + r1_sp_indices = [self.species_list.index(sp) for sp in reaction.reactants + reaction.products] |
| 1325 | + r1_coefficients = [-1 for x in reaction.reactants] |
| 1326 | + r1_coefficients.extend([1 for x in reaction.products]) |
| 1327 | + |
| 1328 | + r2_sp_indices = [self.species_list.index(sp) for sp in other_reaction.reactants + other_reaction.products] |
| 1329 | + r2_coefficients = [-1 for x in other_reaction.reactants] |
| 1330 | + r2_coefficients.extend([1 for x in other_reaction.products]) |
| 1331 | + for r1 in range(len(r1_sp_indices)): |
| 1332 | + for r2 in range(len(r2_sp_indices)): |
| 1333 | + |
| 1334 | + covH = self.thermo_covariance_matrix[r1_sp_indices[r1], r2_sp_indices[r2]] * 4184 * 4184 # convert from kcal/mol to J/mol |
| 1335 | + nu_i = r1_coefficients[r1] |
| 1336 | + nu_j = r2_coefficients[r2] |
| 1337 | + |
| 1338 | + self.kinetic_covariance_matrix[i, j] += nu_i * nu_j * alpha_i * alpha_j * covH / np.float_power(R * T, 2.0) |
| 1339 | + |
| 1340 | + return self.kinetic_covariance_matrix |
| 1341 | + |
| 1342 | + def get_overall_covariance_matrix(self): |
| 1343 | + # make combined thermo and kinetics covariance matrix, species first, then reactions |
| 1344 | + |
| 1345 | + assert self.kinetic_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' |
| 1346 | + assert len(self.kinetic_input_uncertainties) > 0, 'No kinetic parameters found' |
| 1347 | + assert self.thermo_covariance_matrix is not None, 'Must create thermo covariance matrix first' |
| 1348 | + assert self.kinetic_covariance_matrix is not None, 'Must create kinetics covariance matrix first' |
| 1349 | + |
| 1350 | + def species_in_list(new_species, species_list): |
| 1351 | + for sp in species_list: |
| 1352 | + if new_species.is_isomorphic(sp): |
| 1353 | + return True |
| 1354 | + return False |
| 1355 | + |
| 1356 | + self.overall_covariance_matrix = np.zeros((len(self.species_list) + len(self.reaction_list), len(self.species_list) + len(self.reaction_list))) |
| 1357 | + self.overall_covariance_matrix[:len(self.species_list), :len(self.species_list)] = self.thermo_covariance_matrix |
| 1358 | + self.overall_covariance_matrix[len(self.species_list):, len(self.species_list):] = self.kinetic_covariance_matrix |
| 1359 | + |
| 1360 | + # fill in the covariance between reaction and species? |
| 1361 | + |
| 1362 | + for i in range(len(self.reaction_list)): |
| 1363 | + reaction = self.reaction_list[i] |
| 1364 | + |
| 1365 | + BEP = None |
| 1366 | + BEP_types = [SurfaceArrheniusBEP, StickingCoefficientBEP] |
| 1367 | + if isinstance(reaction.kinetics, tuple(BEP_types)): |
| 1368 | + BEP = reaction.kinetics |
| 1369 | + elif 'Rate Rules' not in self.reaction_sources_dict[reaction]: |
| 1370 | + pass # nothing to do here if kinetics doesn't depend on thermo through a BEP |
| 1371 | + elif self.reaction_sources_dict[reaction]['Rate Rules'][1]['rules'] and \ |
| 1372 | + isinstance(self.reaction_sources_dict[reaction]['Rate Rules'][1]['rules'][0][0].data, tuple(BEP_types)): |
| 1373 | + BEP = self.reaction_sources_dict[reaction]['Rate Rules'][1]['rules'][0][0].data |
| 1374 | + elif self.reaction_sources_dict[reaction]['Rate Rules'][1]['training'] and \ |
| 1375 | + isinstance(self.reaction_sources_dict[reaction]['Rate Rules'][1]['training'][0][0].data, tuple(BEP_types)): |
| 1376 | + BEP = self.reaction_sources_dict[reaction]['Rate Rules'][1]['training'][0][0].data |
| 1377 | + |
| 1378 | + if BEP: |
| 1379 | + alpha_i = BEP.alpha.value_si |
| 1380 | + |
| 1381 | + R = 8.314472 |
| 1382 | + T = 1000.0 |
| 1383 | + r1_sp_indices = [self.species_list.index(sp) for sp in reaction.reactants + reaction.products] |
| 1384 | + r1_coefficients = [-1 for x in reaction.reactants] |
| 1385 | + r1_coefficients.extend([1 for x in reaction.products]) |
| 1386 | + |
| 1387 | + for r1 in range(len(r1_sp_indices)): # loop over species in the reaction |
| 1388 | + for j in range(len(self.species_list)): # loop over all species |
| 1389 | + covH = self.thermo_covariance_matrix[r1_sp_indices[r1], j] * 4184 * 4184 # convert from kcal/mol to J/mol |
| 1390 | + nu_i = r1_coefficients[r1] |
| 1391 | + |
| 1392 | + self.overall_covariance_matrix[len(self.species_list) + i, j] += nu_i * alpha_i * covH / (R * T) / 4184 # convert back to kcal/mol |
| 1393 | + |
| 1394 | + # fill in the lower triangle by copying from the top |
| 1395 | + for i in range(len(self.reaction_list)): |
| 1396 | + for j in range(len(self.species_list)): |
| 1397 | + self.overall_covariance_matrix[j, len(self.species_list) + i] = self.overall_covariance_matrix[len(self.species_list) + i, j] |
| 1398 | + |
| 1399 | + return self.overall_covariance_matrix |
| 1400 | + |
1087 | 1401 |
|
1088 | 1402 | def process_local_results(results, sensitive_species, number=10): |
1089 | 1403 | """ |
|
0 commit comments