diff --git a/src/festim/__init__.py b/src/festim/__init__.py index 823d519d9..11db36aec 100644 --- a/src/festim/__init__.py +++ b/src/festim/__init__.py @@ -35,7 +35,12 @@ from .exports.total_surface import TotalSurface from .exports.total_volume import TotalVolume from .exports.volume_quantity import VolumeQuantity -from .exports.vtx import ExportBaseClass, VTXSpeciesExport, VTXTemperatureExport +from .exports.vtx import ( + ExportBaseClass, + VTXSpeciesExport, + VTXTemperatureExport, + CustomField, +) from .exports.xdmf import XDMFExport from .heat_transfer_problem import HeatTransferProblem from .helpers import ( diff --git a/src/festim/exports/__init__.py b/src/festim/exports/__init__.py index 31a287244..57c0e822f 100644 --- a/src/festim/exports/__init__.py +++ b/src/festim/exports/__init__.py @@ -10,12 +10,13 @@ from .total_surface import TotalSurface from .total_volume import TotalVolume from .volume_quantity import VolumeQuantity -from .vtx import ExportBaseClass, VTXSpeciesExport, VTXTemperatureExport +from .vtx import ExportBaseClass, VTXSpeciesExport, VTXTemperatureExport, CustomField from .xdmf import XDMFExport __all__ = [ "AverageSurface", "AverageVolume", + "CustomField", "ExportBaseClass", "MaximumSurface", "MaximumVolume", diff --git a/src/festim/exports/vtx.py b/src/festim/exports/vtx.py index 7159360fd..d2572f3cb 100644 --- a/src/festim/exports/vtx.py +++ b/src/festim/exports/vtx.py @@ -1,9 +1,14 @@ +import inspect import warnings +from collections.abc import Callable from pathlib import Path +from typing import Union +import ufl from dolfinx import fem, io -from festim.species import Species +from festim.helpers import get_interpolation_points +from festim.species import Species, ImplicitSpecies from festim.subdomain.volume_subdomain import VolumeSubdomain @@ -172,3 +177,150 @@ def get_functions(self) -> list[fem.Function]: field.subdomain_to_post_processing_solution[self._subdomain] ) return outfiles + + +class CustomField(ExportBaseClass): + """Export a custom field to a VTX file + + Args: + filename: The name of the output file + expression: A function evaluating the custom field. Positional + arguments of the function can be "t" (time), "x" (spatial coordinate), + "T" (temperature), or any key from the `species_dependent_value` dictionary. + species_dependent_value: A dictionary mapping argument names + in `expression` to Species objects. Defaults to None. + times: if provided, the field will be exported at these timesteps. Otherwise + exports at all timesteps. Defaults to None. + subdomain: The volume subdomain on which the custom + field is evaluated. Defaults to None. + checkpoint: If True, the export will be a checkpoint file using + adios4dolfinx and won't be readable by ParaView. Default is False. + + Attributes: + filename: The name of the output file + expression: A function evaluating the custom field. + species_dependent_value: A dictionary mapping argument names to Species objects. + subdomain: The volume subdomain on which the custom field is evaluated. + checkpoint: If True, the export will be a checkpoint file. + times: if provided, the field will be exported at these timesteps. Otherwise + exports at all timesteps. + function: the function containing the custom field values + writer: The VTXWriter object used to write the file + dolfinx_expression: the dolfinx expression used to evaluate the function + """ + + function: fem.Function + writer: io.VTXWriter + dolfinx_expression: fem.Expression + expression: Callable + species_dependent_value: dict[str, Species] + subdomain: VolumeSubdomain + checkpoint: bool + + def __init__( + self, + filename: Union[str, Path], + expression: Callable, + species_dependent_value: Union[dict[str, Species], None] = None, + times: Union[list[float], list[int], None] = None, + subdomain: VolumeSubdomain = None, + checkpoint: bool = False, + ): + super().__init__( + filename=filename, + times=times, + ext=".bp", + ) + self.expression = expression + self.species_dependent_value = species_dependent_value or {} + self.checkpoint = checkpoint + self.subdomain = subdomain + + def set_dolfinx_expression( + self, + temperature: fem.Constant | fem.Function, + time: fem.Constant, + ): + """ + Set the dolfinx expression used to evaluate the custom field. This is done by + evaluating the user-provided expression with the appropriate arguments and using + the result to create a dolfinx expression. + + Args: + temperature: The temperature field to use in the expression + time: The time to use in the expression + """ + # check if we are in a mixed domain/discontinuous case + mixed_domain = any( + spe.subdomain_to_post_processing_solution + for spe in self.species_dependent_value.values() + ) or (self.subdomain.sub_T if self.subdomain else None) + + # get the arguments of the user-provided expression + arguments = inspect.signature(self.expression).parameters + + # create a dictionary mapping the arguments to the appropriate values + kwargs = {} + if "t" in arguments: + kwargs["t"] = time + if "x" in arguments: + x = ufl.SpatialCoordinate(self.function.function_space.mesh) + kwargs["x"] = x + if "T" in arguments: + if isinstance(temperature, fem.Function) and mixed_domain: + # fem.Function in mixed domain/discontinuous case, use sub_T + # NOTE I'm not sure that sub_T is updated at every time step + kwargs["T"] = self.subdomain.sub_T + else: + # else use the provided temperature + kwargs["T"] = temperature + # check if there are other arguments and if they are in species_dependent_value + for arg in arguments: + if arg in self.species_dependent_value: + spe = self.species_dependent_value[arg] + if isinstance(spe, ImplicitSpecies): + raise NotImplementedError( + "Custom fields depending on implicit species are not" + "implemented yet." + ) + if mixed_domain: + kwargs[arg] = spe.subdomain_to_post_processing_solution[ + self.subdomain + ] + else: + kwargs[arg] = spe.post_processing_solution + assert kwargs[arg] is not None, ( + f"Argument {arg} not found in species_dependent_value" + ) + + self.check_valid_inputs(kwargs, mixed_domain) + + # evaluate the user-provided expression with the appropriate arguments and create a + # dolfinx.fem.Expression + self.dolfinx_expression = fem.Expression( + self.expression(**kwargs), + get_interpolation_points(self.function.function_space.element), + ) + + def check_valid_inputs(self, kwargs: dict, mixed_domain: bool): + """ + Check if we are in the mixed domain/discontinuous case and if the user-provided + expression is valid in this case. + dolfinx.fem.Expression does not support co-dim 0 submeshes and time is defined + on the parent mesh, so we cannot have time-dependent custom fields in the mixed + domain/discontinuous case. + + When https://github.com/FEniCS/dolfinx/issues/3207 is resolved we should be + able to support this + """ + + # check the domain of all kwargs and check that they are the same + + if mixed_domain and "t" in kwargs: + raise NotImplementedError( + "Time-dependent custom fields are not implemented in the case of a " + "mixed domain/discontinuous case." + "dolfinx.fem.Expression does not support co-dim 0 submeshes and time is" + "defined on the parent mesh." + "See https://github.com/FEniCS/dolfinx/issues/3207 for more details." + ) diff --git a/src/festim/helpers.py b/src/festim/helpers.py index 0a1ba5290..236d31159 100644 --- a/src/festim/helpers.py +++ b/src/festim/helpers.py @@ -34,6 +34,7 @@ def as_fenics_constant( ) +# TODO change this to accept species dependent values def as_mapped_function( value: Callable, function_space: fem.FunctionSpace | None = None, @@ -68,6 +69,7 @@ def as_mapped_function( return value(**kwargs) +# TODO change this to accept species dependent values def as_fenics_interp_expr_and_function( value: Callable, function_space: dolfinx.fem.function.FunctionSpace, diff --git a/src/festim/hydrogen_transport_problem.py b/src/festim/hydrogen_transport_problem.py index 14d9c35b4..b065d8ded 100644 --- a/src/festim/hydrogen_transport_problem.py +++ b/src/festim/hydrogen_transport_problem.py @@ -449,6 +449,21 @@ def initialise_exports(self): else: adios4dolfinx.write_mesh(export.filename, mesh=self.mesh.mesh) + elif isinstance(export, exports.CustomField): + export.function = fem.Function(self.V_CG_1) + export.set_dolfinx_expression( + temperature=self.temperature_fenics, + time=self.t, + ) + + export.writer = dolfinx.io.VTXWriter( + comm=export.function.function_space.mesh.comm, + filename=export.filename, + output=export.function, + engine="BP5", + ) + continue + elif isinstance(export, exports.SurfaceQuantity | exports.VolumeQuantity): # raise not implemented error if the derived quantity don't match the # type of mesh eg. SurfaceFlux is used with cylindrical mesh @@ -629,6 +644,7 @@ def define_function_spaces(self, element_degree: int = 1): ) self.V_DG_0 = fem.functionspace(self.mesh.mesh, element_DG0) self.V_DG_1 = fem.functionspace(self.mesh.mesh, element_DG1) + self.V_CG_1 = fem.functionspace(self.mesh.mesh, ("CG", 1)) self.u = fem.Function(self.function_space) self.u_n = fem.Function(self.function_space) @@ -646,6 +662,7 @@ def assign_functions_to_species(self): spe.sub_function_space = self.function_space.sub(idx) spe.sub_function = self.u.sub(idx) # TODO add this to discontinuous class spe.post_processing_solution = self.u.sub(idx).collapse() + spe.post_processing_solution.name = spe.name spe.collapsed_function_space, spe.map_sub_to_main_solution = ( self.function_space.sub(idx).collapse() ) @@ -981,6 +998,10 @@ def post_processing(self): self._get_temperature_field_as_function() ) export.writer.write(float(self.t)) + elif isinstance(export, exports.CustomField): + # update internal function + export.function.interpolate(export.dolfinx_expression) + export.writer.write(float(self.t)) # TODO if export type derived quantity if isinstance(export, exports.SurfaceQuantity): @@ -1095,6 +1116,7 @@ def __init__( self.interfaces = interfaces or [] self.surface_to_volume = surface_to_volume or {} self.subdomain_to_species = {} # maps subdomain to species defined in it + self.subdomain_to_V_CG1 = {} @property def method_interface(self): @@ -1335,6 +1357,10 @@ def define_function_spaces( u = dolfinx.fem.Function(V) u_n = dolfinx.fem.Function(V) + self.subdomain_to_V_CG1[subdomain] = dolfinx.fem.functionspace( + subdomain.submesh, ("CG", 1) + ) + # store attributes in the subdomain object subdomain.u = u subdomain.u_n = u_n @@ -1662,7 +1688,21 @@ def initialise_exports(self): export.filename, mesh=functions[0].function_space.mesh, ) + elif isinstance(export, exports.CustomField): + # need to find an appropriate function space on the right submesh + V = self.subdomain_to_V_CG1[export.subdomain] + export.function = fem.Function(V) + export.set_dolfinx_expression( + temperature=self.temperature_fenics, # need to pass the right temperature + time=self.t, + ) + export.writer = dolfinx.io.VTXWriter( + comm=export.function.function_space.mesh.comm, + filename=export.filename, + output=export.function, + engine="BP5", + ) # compute diffusivity function for surface fluxes # for the discontinuous case, we don't use D_global as in # HydrogenTransportProblem @@ -1704,11 +1744,11 @@ def post_processing(self): continue # handle VTX exports if isinstance(export, exports.ExportBaseClass): - if not isinstance(export, exports.VTXSpeciesExport): - raise NotImplementedError( - f"Export type {type(export)} not implemented" - ) - if isinstance(export, exports.VTXSpeciesExport): + if isinstance(export, exports.CustomField): + # update internal function + export.function.interpolate(export.dolfinx_expression) + export.writer.write(float(self.t)) + elif isinstance(export, exports.VTXSpeciesExport): if export._checkpoint: for species in export.field: post_processing_solution = ( @@ -1724,7 +1764,10 @@ def post_processing(self): ) else: export.writer.write(float(self.t)) - + else: + raise NotImplementedError( + f"Export type {type(export)} not implemented" + ) # handle derived quantities if isinstance(export, exports.SurfaceQuantity): if isinstance( diff --git a/test/test_vtx.py b/test/test_vtx.py index edbb2e531..d6452ad1a 100644 --- a/test/test_vtx.py +++ b/test/test_vtx.py @@ -179,3 +179,227 @@ def test_filename_temp_raises_error_when_wrong_type(): """Test that the filename attribute for VTXTemperature export raises an error if the extension is not .bp""" with pytest.raises(TypeError): F.VTXTemperatureExport(1) + + +@pytest.mark.parametrize( + "expression", + [ + lambda x: x[0] + x[1] * 2, + lambda T: T + 1, + lambda c_A, c_B: c_A + c_B, + lambda c_A, c_B, x: c_A * c_B + x[0], + lambda c_A, T, x: c_A * T + x[0], + ], +) +def test_custom_field(tmp_path, expression): + """ + Test custom field export functionality. + This test checks that a custom field can be created with various types of + expressions. + """ + + my_model = F.HydrogenTransportProblem() + + mat = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + + vol = F.VolumeSubdomain(id=1, material=mat) + + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + left = F.SurfaceSubdomain(id=3, locator=lambda x: np.isclose(x[0], 0)) + right = F.SurfaceSubdomain(id=4, locator=lambda x: np.isclose(x[0], 1)) + + my_model.subdomains = [vol, top, bottom, left, right] + + dolfinx_mesh = dolfinx.mesh.create_unit_square(MPI.COMM_WORLD, 10, 10) + my_model.mesh = F.Mesh(dolfinx_mesh) + + A = F.Species("A") + B = F.Species("B") + C = F.Species("C") + D = F.Species("D") + + my_model.species = [A, B, C, D] + + my_model.boundary_conditions = ( + [ + F.FixedConcentrationBC(species=A, subdomain=top, value=1), + F.FixedConcentrationBC(species=B, subdomain=left, value=1), + ] + + [ + F.FixedConcentrationBC(species=C, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + + [ + F.FixedConcentrationBC(species=D, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + ) + + my_model.reactions = [ + F.Reaction( + reactant=[A, B], product=[C], k_0=1, E_k=0, p_0=0, E_p=0, volume=vol + ), + F.Reaction(reactant=[C], product=[D], k_0=0.1, E_k=0, p_0=0, E_p=0, volume=vol), + ] + + my_model.temperature = 300 + + my_model.settings = F.Settings(transient=False, atol=1e-9, rtol=1e-9) + + custom_field = F.CustomField( + filename=tmp_path / "custom_field.bp", + expression=expression, + species_dependent_value={"c_A": A, "c_B": B}, + ) + + my_model.exports = [ + custom_field, + ] + + my_model.initialise() + + my_model.run() + + +@pytest.mark.parametrize( + "expression", + [ + lambda x: x[0] + x[1] * 2, + lambda T: T + 1, + lambda c_A, c_B: c_A + c_B, + lambda c_A, T: c_A * T, + lambda c_A, c_B, x: c_A * c_B + x[0], + lambda c_A, T, x: c_A * T + x[0], + lambda T, x: T + x[0], + ], +) +def test_custom_field_discontinuous(tmp_path, expression): + """ + Test custom field export functionality. + This test checks that a custom field can be created with various types of + expressions. + """ + + my_model = F.HydrogenTransportProblemDiscontinuous() + + mat = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + + vol = F.VolumeSubdomain(id=1, material=mat) + + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + left = F.SurfaceSubdomain(id=3, locator=lambda x: np.isclose(x[0], 0)) + right = F.SurfaceSubdomain(id=4, locator=lambda x: np.isclose(x[0], 1)) + + my_model.subdomains = [vol, top, bottom, left, right] + + my_model.surface_to_volume = {top: vol, bottom: vol, left: vol, right: vol} + + dolfinx_mesh = dolfinx.mesh.create_unit_square(MPI.COMM_WORLD, 10, 10) + my_model.mesh = F.Mesh(dolfinx_mesh) + + A = F.Species("A", subdomains=my_model.volume_subdomains) + B = F.Species("B", subdomains=my_model.volume_subdomains) + C = F.Species("C", subdomains=my_model.volume_subdomains) + D = F.Species("D", subdomains=my_model.volume_subdomains) + + my_model.species = [A, B, C, D] + + my_model.boundary_conditions = ( + [ + F.FixedConcentrationBC(species=A, subdomain=top, value=1), + F.FixedConcentrationBC(species=B, subdomain=left, value=1), + ] + + [ + F.FixedConcentrationBC(species=C, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + + [ + F.FixedConcentrationBC(species=D, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + ) + + my_model.temperature = lambda x: 300 + 100 * x[0] + + my_model.settings = F.Settings(transient=False, atol=1e-9, rtol=1e-9) + + custom_field = F.CustomField( + filename=tmp_path / "custom_field.bp", + expression=expression, + species_dependent_value={"c_A": A, "c_B": B}, + subdomain=vol, + ) + + my_model.exports = [ + custom_field, + ] + + my_model.initialise() + + my_model.run() + + +@pytest.mark.parametrize( + "expression", + [lambda c_A, t: c_A * t, lambda t: t], +) +def test_custom_field_not_implemented_error(expression): + my_model = F.HydrogenTransportProblemDiscontinuous() + + mat = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + + vol = F.VolumeSubdomain(id=1, material=mat) + + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + left = F.SurfaceSubdomain(id=3, locator=lambda x: np.isclose(x[0], 0)) + right = F.SurfaceSubdomain(id=4, locator=lambda x: np.isclose(x[0], 1)) + + my_model.subdomains = [vol, top, bottom, left, right] + + my_model.surface_to_volume = {top: vol, bottom: vol, left: vol, right: vol} + + dolfinx_mesh = dolfinx.mesh.create_unit_square(MPI.COMM_WORLD, 10, 10) + my_model.mesh = F.Mesh(dolfinx_mesh) + + A = F.Species("A", subdomains=my_model.volume_subdomains) + B = F.Species("B", subdomains=my_model.volume_subdomains) + C = F.Species("C", subdomains=my_model.volume_subdomains) + D = F.Species("D", subdomains=my_model.volume_subdomains) + + my_model.species = [A, B, C, D] + + my_model.boundary_conditions = ( + [ + F.FixedConcentrationBC(species=A, subdomain=top, value=1), + F.FixedConcentrationBC(species=B, subdomain=left, value=1), + ] + + [ + F.FixedConcentrationBC(species=C, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + + [ + F.FixedConcentrationBC(species=D, subdomain=surf, value=0) + for surf in [top, bottom, left, right] + ] + ) + + my_model.temperature = lambda x: 300 + 100 * x[0] + + my_model.settings = F.Settings(transient=False, atol=1e-9, rtol=1e-9) + + custom_field = F.CustomField( + filename="custom_field.bp", + expression=expression, + species_dependent_value={"c_A": A, "c_B": B}, + subdomain=vol, + ) + + my_model.exports = [ + custom_field, + ] + + with pytest.raises(NotImplementedError): + my_model.initialise()