From 4ff5325c0698ea7a5a2417347e5a529eaa52d453 Mon Sep 17 00:00:00 2001 From: Jin Whan Bae Date: Mon, 2 Mar 2026 10:06:29 -0500 Subject: [PATCH] add raw activator for quicker runs --- openmc_activator_raw.py | 716 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 716 insertions(+) create mode 100644 openmc_activator_raw.py diff --git a/openmc_activator_raw.py b/openmc_activator_raw.py new file mode 100644 index 0000000..02245c9 --- /dev/null +++ b/openmc_activator_raw.py @@ -0,0 +1,716 @@ +from pathlib import Path +import numpy as np +import openmc +import openmc.deplete +import os, tempfile +from typing import Sequence +import openmc.checkvalue as cv +from openmc.deplete.microxs import MicroXS, _find_cross_sections +from openmc.deplete import REACTION_MT, GROUP_STRUCTURES, Chain +from openmc.deplete.chain import _get_chain +from typing import TypedDict +from packaging.version import Version +from openmc.data import ELEMENT_SYMBOL, AVOGADRO +from openmc.deplete.cram import CRAM48 + + +class ActivationDict(TypedDict): + """Contains necessary data for material activation calculations. + + This TypedDict defines the required parameters for performing activation + calculations on materials using the deplete_materials function. + """ + materials: openmc.Material + """Material to be depleted during activation calculations""" + + multigroup_flux: Sequence[float] + """Energy-dependent multigroup flux values, length must match energy groups""" + + energy: Sequence[float] | str + """Energy group boundaries in [eV] or name of predefined group structure""" + + source_rate: Sequence[float] + """Source rates for each timestep in the depletion calculation""" + + timesteps: Sequence[float] + """Time intervals for the depletion steps in timestep_units""" + + +class OpenmcActivator: + + def __init__( + self, + activation_data: Sequence[ActivationDict], + timestep_units: str = 's', + reduce_chain_level: int = 2, + chain_file: cv.PathLike | openmc.deplete.Chain | None = None, + nuclides: Sequence[str] | None = None, + reactions: Sequence[str] | None = None, + **init_kwargs: dict + ): + + self.activation_data = activation_data + self.timestep_units = timestep_units + self.reduce_chain_level = reduce_chain_level + self.chain = _get_chain(chain_file) + chain_nuclides = [n.name for n in self.chain.nuclides] + if not nuclides: + self.nuclides = chain_nuclides + else: + # check if all nuclides in chain file + for nuc in nuclides: + if nuc not in chain_nuclides: + raise ValueError(f"Nuclide {nuc} not in chain file") + self.nuclides = nuclides + + chain_reactions = self.chain.reactions + if not reactions: + self.reactions = chain_reactions + else: + # check if all reactions in chain file + for r in reactions: + if r not in chain_reactions: + raise ValueError(f"Reaction {r} not in chain file") + self.reactions = reactions + if init_kwargs == {}: + self.init_kwargs = {'output': False} + + # collect useful data for later to avoid reduant processing + self.atomic_mass_vec = np.array([openmc.data.atomic_mass(nuc) for nuc in self.nuclides]) + self.q_vec = np.array([openmc.data.decay_energy(nuc) for nuc in self.nuclides]) # in eV + self.q_vec *= 1.60218e-19 # convert to Joules + self.lambda_vec = np.array([openmc.data.decay_constant(nuc) for nuc in self.nuclides]) # in 1/s + + + def activate(self, + metric_list: list=['mass'], + ) -> list[dict]: + + + all_metric_dict = [] + print('Loading microxs data and performing depletion calculations...') + # vectorize microxs collapsing + microxs = get_microxs(self.activation_data[0]['energy'], + multigroup_flux=np.array([q['multigroup_flux'] for q in self.activation_data]), + chain_file=self.chain, + #! this is probably not cool needs to be number or sequence + temperature=self.activation_data[0]['materials'].temperature or 294, + nuclides=self.nuclides, + reactions=self.reactions, + temperature_interpolation_method='nearest' + ) + print('Finished.') + + for mindx, entry in enumerate(self.activation_data): + print(f'Depleting entry {mindx+1}/{len(self.activation_data)}...') + + atom_arrs = run_depletion(entry['materials'], + microxs[mindx], + self.chain, + np.array(entry['timesteps']), + entry['source_rate'], + timestep_units=self.timestep_units + ) + + # process the array directly + metric_arr_dict = self.atom_arrs_to_values(atom_arrs, metric_list) + # process it + metric_dict = {} + for metric in metric_arr_dict.keys(): + # get the time array + metric_dict[metric] = { + f'meta_time_{self.timestep_units}': [float(x) for x in np.cumsum([0] + list(entry['timesteps']))] + } + # now for each nuclide + for i, iso in enumerate(self.nuclides): + ll = metric_arr_dict[metric][:,i].tolist() + if sum(ll) == 0: continue + metric_dict[metric][iso] = ll + metric_dict[metric]['meta_total'] = metric_arr_dict[metric].sum(axis=1).tolist() + all_metric_dict.append(metric_dict) + + return all_metric_dict + + # output processing + def atom_arrs_to_values(self, + atom_arrs: np.ndarray, + value_types: Sequence[str] + ) -> dict: + avail_types = ['mass', 'activity', 'decay_heat'] + if not all(t in avail_types for t in value_types): + raise ValueError(f'Invalid value types. Must be one of {avail_types}') + values_dict = {} + assert(atom_arrs.shape[1] == len(self.nuclides)), 'Number of nuclides in atom_arrs must match length of nuclides list' + for t in value_types: + if t == 'mass': + values_dict[t] = (atom_arrs / AVOGADRO # moles + * self.atomic_mass_vec # grams + ) + elif t == 'activity': + values_dict[t] = atom_arrs * self.lambda_vec + elif t == 'decay_heat': + values_dict[t] = atom_arrs * self.lambda_vec * self.q_vec + return values_dict + + +def get_atom_vector_from_mat(material:openmc.Material, + nuclides): + if material.volume is None: + raise ValueError('Material must have volume') + atoms = material.get_nuclide_atom_densities() + if 'C0' in atoms: + atoms['C12'] = atoms.pop('C0') + vol_in_b_cm = material.volume * 1e24 # convert from b-cm to cm3 + atoms = {k:v * vol_in_b_cm for k,v in atoms.items()} # atoms + atom_arr = np.zeros(len(nuclides)) + for i, nuc in enumerate(nuclides): + atom_arr[i] = atoms.get(nuc, 0.0) + if sum(atom_arr) == 0.0: + raise ValueError('Material has no atoms of the nuclides in the microxs object') + if sum(atom_arr) != sum(atoms.values()): + for k in atoms: + if k not in nuclides: + print(k) + print(f'Warning conversion has discrepancy by {1 - sum(atom_arr)/sum(atoms.values())}') + # raise ValueError('Some atoms in the material are not in the microxs nuclides, which would lead to loss of mass during depletion. Please ensure all nuclides in the material are included in the microxs object.') + return atom_arr + + +def run_depletion(mat: openmc.Material, microxs: openmc.deplete.MicroXS, + chain: openmc.deplete.Chain, timesteps: Sequence[float], + source_rates: Sequence[float], + timestep_units='s') -> np.ndarray: + """ + Run depletion calculation for a single material using OpenMC's IndependentOperator. + + Parameters: + ----------- + mat: openmc.Material + The material to be depleted. + microxs: openmc.deplete.MicroXS + The microcross sections for the material. + chain: openmc.deplete.Chain + The depletion chain to use for the calculation. + timesteps: Sequence[float] + The time intervals for the depletion steps, in seconds. + source_rates: Sequence[float] + The source rates for each timestep, in atoms/s. + + Returns: + -------- + atom_arrs: np.ndarray + A 2D array of shape (len(timesteps)+1, num_nuclides) containing the number of atoms of each nuclide at each timestep. + """ + # def get_transmutation_matrices_openmc(mat, microxs, chain, timesteps, source_rates): + if mat.volume is None: + raise ValueError('Material must have volume') + if len(timesteps) != len(source_rates): + raise ValueError('timesteps and source_rates must have the same length') + + # process timesteps to seconds + if timestep_units == 'd': + mult = 24*3600 + elif timestep_units == 'h': + mult = 3600 + elif timestep_units == 'm': + print(' by m you mean minutes, not months, right?') + mult = 60 + else: + raise ValueError(f"Invalid timestep_units: {timestep_units}") + + atom_arr = get_atom_vector_from_mat(mat, microxs.nuclides) + dts = np.asarray(timesteps, dtype=float) * mult + + mat.depletable = True + operator = openmc.deplete.IndependentOperator( + materials=[mat], + fluxes=[mat.volume], + micros=[microxs], + normalization_mode='source-rate', + chain_file=chain) + + vec = operator.initial_condition() + atom_arrs = np.zeros((len(timesteps)+1, len(atom_arr))) + atom_arrs[0] = vec[0] + + for tindx, (dt, source_rate) in enumerate(zip(dts, source_rates)): + op_result = operator(vec, source_rate) + matrix = operator.chain.form_matrix( + op_result.rates[0], + operator.chain.fission_yields[0], + ) + # matrices.append(matrix) + vec[0] = CRAM48(matrix, vec[0], dt) + atom_arrs[tindx+1] = vec[0] + + # no negative numbers please + atom_arrs[atom_arrs < 0] = 0.0 + return atom_arrs + + +def get_microxs(energies:Sequence[float] | str, + multigroup_flux: Sequence[float] | Sequence[Sequence[float]], + chain_file: str | cv.PathLike | None = None, + temperature:float = 294, + nuclides:Sequence[str] | None = None, + reactions: Sequence[str] | None = None, + settings_file: str | cv.PathLike | None = None, + temperature_interpolation_method: str | None = None, + ) -> MicroXS | list[MicroXS]: + """Full implementation + + + Parameters + ---------- + energies : iterable of float or str + Energy group boundaries in [eV] or the name of the group structure + multigroup_flux : sequence of float or sequence of sequence of float + Group-integrated flux values. Can be a single flux vector of length + ``G`` or a batch with shape ``(N, G)``. + chain_file : PathLike or Chain, optional + Path to the depletion chain XML file or an instance of + openmc.deplete.Chain. Defaults to ``openmc.config['chain_file']``. + temperature : int, optional + Temperature for cross section evaluation in [K]. + nuclides : list of str, optional + Nuclides to get cross sections for. If not specified, all burnable + nuclides from the depletion chain file are used. + reactions : list of str, optional + Reactions to get cross sections for. If not specified, all neutron + reactions listed in the depletion chain file are used. + + + Returns + ------- + MicroXS or list of MicroXS + + """ + # if energy is string then use group structure of that name + if isinstance(energies, str): + energies = GROUP_STRUCTURES[energies] + else: + # if user inputs energies check they are ascending (low to high) as + # some depletion codes use high energy to low energy. + if not np.all(np.diff(energies) > 0): + raise ValueError('Energy group boundaries must be in ascending order') + + + chain = _get_chain(chain_file) + cross_sections = _find_cross_sections(model=None) + + # If no nuclides were specified, default to all nuclides from the chain + if not nuclides: + nuclides = chain.nuclides + nuclides = [nuc.name for nuc in nuclides] + + # Get reaction MT values. If no reactions specified, default to the + # reactions available in the chain file + if reactions is None: + reactions = chain.reactions + mts = [REACTION_MT[name] for name in reactions] + + # check dimension consistency + if multigroup_flux.ndim == 1: # just one flux spectrum + if len(multigroup_flux) != len(energies) - 1: + raise ValueError('Length of flux array should be len(energies)-1') + microxs_matrix = np.zeros((len(nuclides), len(reactions))) + elif multigroup_flux.ndim == 2: # batch of flux spectra + # shape (# of spectra, # of energy groups) + if multigroup_flux.shape[1] != len(energies) - 1: + raise ValueError('Second dimension of flux array should be len(energies)-1') + microxs_matrix = np.zeros((len(nuclides), len(reactions), multigroup_flux.shape[0])) + else: + raise ValueError('multigroup_flux should be either 1D or 2D array') + + # get temperature interpolation method + if temperature_interpolation_method is not None: + temperature_method = temperature_interpolation_method + else: + if not settings_file: + # try at pwd + settings_file = Path('settings.xml') + if settings_file.exists(): + settings = openmc.Settings.from_xml(settings_file) + temperature_method = settings.temperature['method'] + else: + raise ValueError('No settings file provided and settings.xml not found in current directory') + else: + settings = openmc.Settings.from_xml(settings_file) + temperature_method = settings.temperature['method'] + + assert(temperature_method in ['nearest', 'interpolation']) + + + # now the actual construction of the MicroXS object + xs_path_dict = _get_cross_section_paths(cross_sections) + + # iterate over nuclides and reactions to get the appropriate cross sections, collapse them, and fill in the MicroXS object + for i, iso in enumerate(nuclides): + if (iso, 'neutron') not in xs_path_dict: + continue # skip nuclides without neutron cross section data + data = openmc.data.IncidentNeutron.from_hdf5(xs_path_dict[(iso, 'neutron')]) + temperature_fractions = _interpolate_xs(data, temperature, temperature_method) + for r, reaction in enumerate(reactions): + mt = REACTION_MT[reaction] + if mt not in data.reactions: continue # skip + nuc_data_xs = data[mt].xs + for key, frac in temperature_fractions.items(): + # need to do multiple collapsing + # because different temperature values + # have different energy grids + temp_xs = data[mt].xs[key] + microxs_matrix[i,r,...] += frac * collapse_rate(energies, multigroup_flux, temp_xs.x, temp_xs.y) + + # return a single microxs object if only one flux spectrum was provided, otherwise return a list of microxs objects for each spectrum in the batch + if multigroup_flux.ndim == 1: + return MicroXS(data=microxs_matrix[:,:,None], + nuclides=nuclides, + reactions=reactions) + else: + return [MicroXS(data=microxs_matrix[:,:,i][:,:,None], + nuclides=nuclides, + reactions=reactions) for i in range(multigroup_flux.shape[0])] + + + +def collapse_rate( + energy: Sequence[float], + flux: Sequence[float] | Sequence[Sequence[float]], + xs_energy: Sequence[float], + xs_values: Sequence[float], +) -> float | np.ndarray: + """Collapse tabulated microscopic XS onto one or more multigroup fluxes. + + Parameters + ---------- + energy : sequence of float + Group boundaries, length = len(flux) + 1. + flux : sequence of float or sequence of sequence of float + Group-integrated flux values. Can be a single flux vector of length + ``G`` or a batch with shape ``(N, G)``. + xs_energy : sequence of float + Energy grid for the tabulated microscopic cross section. + xs_values : sequence of float + Values for the tabulated microscopic cross section. + max_workers : int or None, optional + Unused. Retained for backward compatibility. + + Returns + ------- + float or numpy.ndarray + Flux-collapsed reaction rate for a single flux vector, or an array of + rates for a batch of flux vectors (in input order). + """ + energy = np.asarray(energy, dtype=float) + flux_arr = np.array(flux, dtype=float) + + if energy.ndim != 1: + raise ValueError("energy must be a 1D array.") + if energy.size == 0: + raise ValueError("energy must be non-empty.") + + group_weights = _collapse_group_weights(energy, xs_energy, xs_values) + group_size = energy.size - 1 + + if flux_arr.ndim == 1: + if group_size != flux_arr.size: + raise ValueError("energy must have length len(flux) + 1.") + flux_arr /= flux_arr.sum() + return float(np.dot(flux_arr, group_weights)) + + if flux_arr.ndim == 2: + if flux_arr.shape[0] == 0: + raise ValueError("flux batch cannot be empty.") + if flux_arr.shape[1] != group_size: + raise ValueError("Batches flux shape[1] has to equal energy dimensions") + + flux_sums = flux_arr.sum(axis=1, keepdims=True) + flux_arr = np.divide( + flux_arr, + flux_sums, + out=np.zeros_like(flux_arr), + where=flux_sums != 0.0, + ) + return flux_arr @ group_weights + + raise ValueError("flux must be a 1D vector or a 2D batch of vectors.") + + +def _get_interpolation_weights(values, target): + # check ascending + if not all(a < b for a, b in zip(values, values[1:])): + raise ValueError("values must be in ascending order") + + values = np.asarray(values) + if max(values) < target: + return {values[-1]: 1.0} + if min(values) > target: + return {values[0]: 1.0} + + i = np.searchsorted(values, target) + i = np.clip(i, 1, len(values) - 1) + + v0 = values[i - 1] + v1 = values[i] + alpha = (target - v0) / (v1 - v0) + + return {v0:1-alpha, v1:alpha} + + +def _find_temperature( + T: float, + avail_temps: Sequence[float], + temperature_method: str, +) -> dict: + """ + Determine temperature index and interpolation factor. + + Parameters + ---------- + T : float + Temperature in K (must be >= 0). + kTs : Sequence[float] + Available temperatures in energy units (kT), sorted ascending. + temperature_method : str + "nearest" or "interpolation". + + Returns + ------- + dict + Dictionary with temperature keys and interpolation weights as values. + """ + if T < 0.0: + raise ValueError("T must be >= 0.0") + if len(avail_temps) == 0: + raise ValueError("avail_temps must not be empty") + + if temperature_method == "nearest": + nearest_temp = min(avail_temps, key=lambda t: abs(t - T)) + return {nearest_temp: 1.0} + + elif temperature_method == "interpolation": + return _get_interpolation_weights(avail_temps, T) + + else: + raise ValueError("temperature_method must be 'nearest' or 'interpolation'") + + +def _interpolate_xs(incident_neutron_data:openmc.data.IncidentNeutron, + T: float, + temperature_interpolation_method: str | None = None) -> openmc.data.function.Tabulated1D: + """Interpolate cross sections to the desired temperature T.""" + temperature_keys = incident_neutron_data.temperatures + temperatures = [float(t.replace('K', '')) for t in temperature_keys] + interp_factor_dict = _find_temperature(T, temperatures, temperature_interpolation_method) + # return the string key directly for xs to avoid float / int conversion + key_dict = {t:k for t,k in zip(temperatures, temperature_keys)} + return {key_dict[t]: f for t, f in interp_factor_dict.items()} + + +def _collapse_group_weights( + energy: np.ndarray, + xs_energy: Sequence[float], + xs_values: Sequence[float], +) -> np.ndarray: + """Compute per-group collapse coefficients for a fixed energy grid and XS.""" + + if xs_energy.ndim != 1 or xs_values.ndim != 1: + raise ValueError("Tabulated1D x and y must be 1D arrays.") + if xs_energy.size < 2 or xs_values.size != xs_energy.size: + raise ValueError("Tabulated1D must have at least 2 grid points and matching y length.") + + if not np.any(xs_values != 0.0): + return np.zeros(energy.size - 1, dtype=float) + + # Index corresponding to first energy group boundary + i_low = int(np.searchsorted(xs_energy, energy[0], side="right") - 1) + i_low = max(0, min(i_low, xs_energy.size - 2)) + + # For Tabulated1D, explicit threshold index is not available. + # Integrate on geometric overlap only to avoid skipping tiny near-threshold + # contributions when first non-zero detection is noisy. + j_start = 0 + group_weights = np.zeros(energy.size - 1, dtype=float) + + + for j in range(j_start, energy.size - 1): + e_group_low = energy[j] + e_group_high = energy[j + 1] + width = e_group_high - e_group_low + if width <= 0.0: + continue + group_integral = 0.0 + + # Find high grid segment index intersecting this group + i_high = int(np.searchsorted(xs_energy, e_group_high, side="left") - 1) + i_high = max(i_low, min(i_high, xs_energy.size - 2)) + + for i in range(i_low, i_high + 1): + e_l = xs_energy[i] + e_r = xs_energy[i + 1] + if e_l == e_r: + continue + + xs_l = xs_values[i] + xs_r = xs_values[i + 1] + + e_low = max(e_group_low, e_l) + e_high = min(e_group_high, e_r) + if e_high <= e_low: + continue + + m = (xs_r - xs_l) / (e_r - e_l) + xs_low = xs_l + m * (e_low - e_l) + xs_high = xs_l + m * (e_high - e_l) + xs_avg = 0.5 * (xs_low + xs_high) + + group_integral += xs_avg * (e_high - e_low) + + group_weights[j] = group_integral / width + + i_low = i_high + if i_low + 1 == xs_energy.size: + break + + return group_weights + +def read_output( + materials:openmc.Materials, + nuclides, + metric_list:list, + timesteps:list, + timestep_units:str +): + + # get metrics + # time is cumulative time + metric_dict = { + metric: { + f'meta_time_{timestep_units}': [float(x) for x in np.cumsum([0] + list(timesteps))] + } + for metric in metric_list + } + # add all the isos + for metric in metric_list: + metric_dict[metric]['meta_total'] = [] + for iso in nuclides: + metric_dict[metric][iso] = [] + for mat in materials: + for metric in metric_dict.keys(): + if metric == 'mass': + td = {iso:mat.get_mass(iso) for iso in nuclides} + elif metric == 'atom': + td = mat.get_nuclide_atoms() + elif metric == 'decay_heat': + td = mat.get_decay_heat('W', by_nuclide=True) + elif metric == 'activity': + td = mat.get_activity('Bq', by_nuclide=True) + else: + raise ValueError('Invalid metric ' + metric) + + for iso in nuclides: + if iso not in td: + metric_dict[metric][iso].append(0.0) + else: + metric_dict[metric][iso].append(td[iso]) + metric_dict[metric]['meta_total'].append(float(sum(td.values()))) + + # Clean up the dictionary by removing entries with all zeros + for metric in metric_dict.keys(): + for iso in nuclides: + # Check if all values are zeros + if all(abs(value) == 0.0 for value in metric_dict[metric][iso]): + # Remove entries that are all zeros + del metric_dict[metric][iso] + + return metric_dict + + +def write_markdown_file( + experiment_names: list[str], + material_name: str, +): + """A small utility function to write the Jupyter Book markdown files for + each material irradiated: + """ + + element_names = {value: key for key, value in ELEMENT_SYMBOL.items()} + + # Create docs directory if it doesn't exist + Path('docs').mkdir(exist_ok=True) + + filename = f'docs/{material_name}.md' + + with open(filename, 'w') as f: + # Write the material header + if material_name in element_names.keys(): + + f.write(f'# {material_name} - {element_names[material_name]}\n\n') + else: + f.write(f'# {material_name}\n\n') + + # Write a section for each experiment + for exp in experiment_names: + f.write(f'## {exp}\n\n') + + f.write(f'\n\n') + + f.write(f'![Alt text]({material_name}_{exp}.png)\n\n') + +def _get_cross_section_paths(cross_section_xml_path: str | cv.PathLike) -> dict[tuple[str, str], str]: + """Read the cross section XML and return a dictionary mapping (material, reaction type) to the file path of the corresponding cross section data. + + Parameters + ---------- + cross_section_xml_path : str or PathLike + Path to the cross section XML file (e.g., "cross_sections.xml"). + + Returns + ------- + dict[tuple[str, str], str] + A dictionary where keys are tuples of (material name, reaction type) and values are the file paths to the corresponding cross section data. For example, a key might be ("U235", "neutron") and the value would be the path to the HDF5 file containing the neutron cross section data for U235. + + """ + # now the actual construction of the MicroXS object + xsdl = openmc.data.DataLibrary.from_xml(cross_section_xml_path) + # reconstruct xsdl for easier indexing + xs_path_dict = {} + for entry in xsdl: + for mat in entry['materials']: + # throw error if there's a duplicate entry for the same material and reaction type, as this would cause ambiguity in which file to read from + assert((mat, entry['type']) not in xs_path_dict), f"Duplicate entry for material {mat} and reaction type {entry['type']}" + xs_path_dict[(mat, entry['type'])] = entry['path'] + return xs_path_dict + + +def read_experimental_data(exp_file): + """ + Read experimental data from file and filter out rows where all values are 0. + + Returns: + tuple: (minutes, values, uncertainties) without any zero rows + """ + lines = open(exp_file).readlines() + + minutes = [] + vals = [] + unc = [] + + for line in lines: + parts = line.strip().split() + if len(parts) != 3: + continue + + min_val = float(parts[0]) + data_val = float(parts[1]) + unc_val = float(parts[2]) + + # Skip rows where all three values are effectively zero + if min_val == 0.0 and data_val == 0.0 and unc_val == 0.0: + continue + + minutes.append(min_val) + vals.append(data_val) + unc.append(unc_val) + + return (minutes, vals, unc) \ No newline at end of file