Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions pina/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,44 +275,42 @@ def fast_laplacian(output_, input_, components, d, method="std"):
def fast_advection(output_, input_, velocity_field, components, d):
"""
Perform the advection operation on the ``output_`` with respect to the
``input``. This operator support vector-valued functions with multiple input
coordinates.
``input``. This operator supports vector-valued functions with multiple
input coordinates.

Unlike ``advection``, this function performs no internal checks on input and
output tensors. The user is required to specify both ``components`` and
``d`` as lists of strings. It is designed to enhance computation speed.

:param LabelTensor output_: The output tensor on which the advection is
computed.
computed. It includes both the velocity and the quantity to be advected.
:param LabelTensor input_: the input tensor with respect to which advection
is computed.
:param str velocity_field: The name of the output variable used as velocity
field. It must be chosen among the output labels.
:param list[str] velocity_field: The name of the output variables used as
velocity field. It must be chosen among the output labels.
:param list[str] components: The names of the output variables for which to
compute the advection. It must be a subset of the output labels.
:param list[str] d: The names of the input variables with respect to which
the advection is computed. It must be a subset of the input labels.
:return: The computed advection tensor.
:rtype: torch.Tensor
:rtype: LabelTensor
"""
# Add a dimension to the velocity field for following operations
velocity = output_.extract(velocity_field).unsqueeze(-1)

# Remove the velocity field from the components
filter_components = [c for c in components if c != velocity_field]

# Compute the gradient
grads = fast_grad(
output_=output_, input_=input_, components=filter_components, d=d
output_=output_, input_=input_, components=components, d=d
)

# Reshape into [..., len(filter_components), len(d)]
tmp = grads.reshape(*output_.shape[:-1], len(filter_components), len(d))
tmp = grads.reshape(*output_.shape[:-1], len(components), len(d))

# Transpose to [..., len(d), len(filter_components)]
tmp = tmp.transpose(-1, -2)

return (tmp * velocity).sum(dim=tmp.tensor.ndim - 2)
adv = (tmp * velocity).sum(dim=tmp.tensor.ndim - 2)
return LabelTensor(adv, labels=[f"adv_{c}" for c in components])


def grad(output_, input_, components=None, d=None):
Expand Down Expand Up @@ -425,15 +423,16 @@ def laplacian(output_, input_, components=None, d=None, method="std"):
def advection(output_, input_, velocity_field, components=None, d=None):
"""
Perform the advection operation on the ``output_`` with respect to the
``input``. This operator support vector-valued functions with multiple input
coordinates.
``input``. This operator supports vector-valued functions with multiple
input coordinates.

:param LabelTensor output_: The output tensor on which the advection is
computed.
computed. It includes both the velocity and the quantity to be advected.
:param LabelTensor input_: the input tensor with respect to which advection
is computed.
:param str velocity_field: The name of the output variable used as velocity
:param velocity_field: The name of the output variables used as velocity
field. It must be chosen among the output labels.
:type velocity_field: str | list[str]
:param components: The names of the output variables for which to compute
the advection. It must be a subset of the output labels.
If ``None``, all output variables are considered. Default is ``None``.
Expand All @@ -444,18 +443,29 @@ def advection(output_, input_, velocity_field, components=None, d=None):
:type d: str | list[str]
:raises TypeError: If the input tensor is not a LabelTensor.
:raises TypeError: If the output tensor is not a LabelTensor.
:raises RuntimeError: If the velocity field is not in the output labels.
:raises RuntimeError: If the velocity field is not a subset of the output
labels.
:raises RuntimeError: If the dimensionality of the velocity field does not
match that of the input tensor.
:return: The computed advection tensor.
:rtype: torch.Tensor
:rtype: LabelTensor
"""
components, d = _check_values(
output_=output_, input_=input_, components=components, d=d
)

# Check if velocity field is present in the output labels
if velocity_field not in output_.labels:
# Map velocity_field to a list if it is a string
if isinstance(velocity_field, str):
velocity_field = [velocity_field]

# Check if all the velocity_field labels are present in the output labels
if not all(vi in output_.labels for vi in velocity_field):
raise RuntimeError("Velocity labels missing from output tensor.")

# Check if the velocity has the same dimensionality as the input tensor
if len(velocity_field) != len(d):
raise RuntimeError(
f"Velocity {velocity_field} is not present in the output labels."
"Velocity dimensionality does not match input dimensionality."
)

return fast_advection(
Expand Down
185 changes: 173 additions & 12 deletions tests/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,183 @@ def test_laplacian(f):
laplacian(output_=output_, input_=input_, components=["a", "b", "c"])


def test_advection():
def test_advection_scalar():

# Define input and output
# Define 3-dimensional input
input_ = torch.rand((20, 3), requires_grad=True)
input_ = LabelTensor(input_, ["x", "y", "z"])
output_ = LabelTensor(input_**2, ["u", "v", "c"])

# Define the velocity field
velocity = output_.extract(["c"])
# Define 3-dimensional velocity field and quantity to be advected
velocity = torch.rand((20, 3), requires_grad=True)
field = torch.sum(input_**2, dim=-1, keepdim=True)

# Combine velocity and field into a LabelTensor
labels = ["ux", "uy", "uz", "c"]
output_ = LabelTensor(torch.cat((velocity, field), dim=1), labels)

# Compute the pina advection
components = ["c"]
pina_adv = advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy", "uz"],
components=components,
d=["x", "y", "z"],
)

# Compute the true advection
grads = 2 * input_
true_adv = torch.sum(grads * velocity, dim=grads.ndim - 1, keepdim=True)

# Check the shape, labels, and value of the advection
assert pina_adv.shape == (*output_.shape[:-1], len(components))
assert pina_adv.labels == ["adv_c"]
assert torch.allclose(pina_adv, true_adv)

# Should fail if input not a LabelTensor
with pytest.raises(TypeError):
advection(
output_=output_,
input_=input_.tensor,
velocity_field=["ux", "uy", "uz"],
)

# Should fail if output not a LabelTensor
with pytest.raises(TypeError):
advection(
output_=output_.tensor,
input_=input_,
velocity_field=["ux", "uy", "uz"],
)

# Should fail for non-existent input labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
d=["x", "a"],
velocity_field=["ux", "uy", "uz"],
)

# Should fail for non-existent output labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
components=["a", "b", "c"],
velocity_field=["ux", "uy", "uz"],
)

# Should fail if velocity_field labels are not present in the output labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy", "nonexistent"],
components=["c"],
)

# Should fail if velocity_field dimensionality does not match input tensor
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy"],
components=["c"],
)


def test_advection_vector():

# Compute the true advection and the pina advection
pina_advection = advection(
output_=output_, input_=input_, velocity_field="c"
# Define 3-dimensional input
input_ = torch.rand((20, 3), requires_grad=True)
input_ = LabelTensor(input_, ["x", "y", "z"])

# Define 3-dimensional velocity field
velocity = torch.rand((20, 3), requires_grad=True)

# Define 2-dimensional field to be advected
field_1 = torch.sum(input_**2, dim=-1, keepdim=True)
field_2 = torch.sum(input_**3, dim=-1, keepdim=True)

# Combine velocity and field into a LabelTensor
labels = ["ux", "uy", "uz", "c1", "c2"]
output_ = LabelTensor(
torch.cat((velocity, field_1, field_2), dim=1), labels
)

# Compute the pina advection
components = ["c1", "c2"]
pina_adv = advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy", "uz"],
components=components,
d=["x", "y", "z"],
)
true_advection = velocity * 2 * input_.extract(["x", "y"])

# Check the shape of the advection
assert pina_advection.shape == (*output_.shape[:-1], output_.shape[-1] - 1)
assert torch.allclose(pina_advection, true_advection)
# Compute the true gradients of the fields "c1", "c2"
grads1 = 2 * input_
grads2 = 3 * input_**2

# Compute the true advection for each field
true_adv1 = torch.sum(grads1 * velocity, dim=grads1.ndim - 1, keepdim=True)
true_adv2 = torch.sum(grads2 * velocity, dim=grads2.ndim - 1, keepdim=True)
true_adv = torch.cat((true_adv1, true_adv2), dim=-1)

# Check the shape, labels, and value of the advection
assert pina_adv.shape == (*output_.shape[:-1], len(components))
assert pina_adv.labels == ["adv_c1", "adv_c2"]
assert torch.allclose(pina_adv, true_adv)

# Should fail if input not a LabelTensor
with pytest.raises(TypeError):
advection(
output_=output_,
input_=input_.tensor,
velocity_field=["ux", "uy", "uz"],
)

# Should fail if output not a LabelTensor
with pytest.raises(TypeError):
advection(
output_=output_.tensor,
input_=input_,
velocity_field=["ux", "uy", "uz"],
)

# Should fail for non-existent input labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
d=["x", "a"],
velocity_field=["ux", "uy", "uz"],
)

# Should fail for non-existent output labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
components=["a", "b", "c"],
velocity_field=["ux", "uy", "uz"],
)

# Should fail if velocity_field labels are not present in the output labels
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy", "nonexistent"],
components=["c"],
)

# Should fail if velocity_field dimensionality does not match input tensor
with pytest.raises(RuntimeError):
advection(
output_=output_,
input_=input_,
velocity_field=["ux", "uy"],
components=["c"],
)