-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathabstract_problem.py
More file actions
328 lines (286 loc) · 12.2 KB
/
abstract_problem.py
File metadata and controls
328 lines (286 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
"""Module for the AbstractProblem class."""
from abc import ABCMeta, abstractmethod
from copy import deepcopy
from ..utils import check_consistency
from ..domain import DomainInterface, CartesianDomain
from ..condition.domain_equation_condition import DomainEquationCondition
from ..label_tensor import LabelTensor
from ..utils import merge_tensors
class AbstractProblem(metaclass=ABCMeta):
"""
Abstract base class for PINA problems. All specific problem types should
inherit from this class.
A PINA problem is defined by key components, which typically include output
variables, conditions, and domains over which the conditions are applied.
"""
def __init__(self):
"""
Initialization of the :class:`AbstractProblem` class.
"""
self._discretised_domains = {}
# create hook conditions <-> problems
for condition_name in self.conditions:
self.conditions[condition_name].problem = self
# Store in domains dict all the domains object directly passed to
# ConditionInterface. Done for back compatibility with PINA <0.2
if not hasattr(self, "domains"):
self.domains = {}
for cond_name, cond in self.conditions.items():
if isinstance(cond, DomainEquationCondition):
if isinstance(cond.domain, DomainInterface):
self.domains[cond_name] = cond.domain
cond.domain = cond_name
self._collected_data = {}
@property
def collected_data(self):
"""
Return the collected data from the problem's conditions.
:return: The collected data. Keys are condition names, and values are
dictionaries containing the input points and the corresponding
equations or target points.
:rtype: dict
"""
if not self._collected_data:
raise RuntimeError(
"You have to call collect_data() before accessing the data."
)
return self._collected_data
# back compatibility 0.1
@property
def input_pts(self):
"""
Return a dictionary mapping condition names to their corresponding
input points.
:return: The input points of the problem.
:rtype: dict
"""
to_return = {}
if self._collected_data is None:
raise RuntimeError(
"You have to call collect_data() before accessing the data."
)
for cond_name, data in self._collected_data.items():
to_return[cond_name] = data["input"]
return to_return
@property
def discretised_domains(self):
"""
Return a dictionary mapping domains to their corresponding sampled
points.
:return: The discretised domains.
:rtype: dict
"""
return self._discretised_domains
def __deepcopy__(self, memo):
"""
Perform a deep copy of the :class:`AbstractProblem` instance.
:param dict memo: A dictionary used to track objects already copied
during the deep copy process to prevent redundant copies.
:return: A deep copy of the :class:`AbstractProblem` instance.
:rtype: AbstractProblem
"""
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, deepcopy(v, memo))
return result
@property
def are_all_domains_discretised(self):
"""
Check if all the domains are discretised.
:return: ``True`` if all domains are discretised, ``False`` otherwise.
:rtype: bool
"""
return all(
domain in self.discretised_domains for domain in self.domains
)
@property
def input_variables(self):
"""
Get the input variables of the problem.
:return: The input variables of the problem.
:rtype: list[str]
"""
variables = []
if hasattr(self, "spatial_variables"):
variables += self.spatial_variables
if hasattr(self, "temporal_variable"):
variables += self.temporal_variable
if hasattr(self, "parameters"):
variables += self.parameters
return variables
@input_variables.setter
def input_variables(self, variables):
"""
Set the input variables of the AbstractProblem.
:param list[str] variables: The input variables of the problem.
:raises RuntimeError: Not implemented.
"""
raise RuntimeError
@property
@abstractmethod
def output_variables(self):
"""
Get the output variables of the problem.
"""
@property
@abstractmethod
def conditions(self):
"""
Get the conditions of the problem.
:return: The conditions of the problem.
:rtype: dict
"""
return self.conditions
def discretise_domain(
self, n=None, mode="random", domains="all", sample_rules=None
):
"""
Discretize the problem's domains by sampling a specified number of
points according to the selected sampling mode.
:param int n: The number of points to sample.
:param mode: The sampling method. Default is ``random``.
Available modes include: random sampling, ``random``;
latin hypercube sampling, ``latin`` or ``lh``;
chebyshev sampling, ``chebyshev``; grid sampling ``grid``.
:param domains: The domains from which to sample. Default is ``all``.
:type domains: str | list[str]
:param dict sample_rules: A dictionary defining custom sampling rules
for input variables. If provided, it must contain a dictionary
specifying the sampling rule for each variable, overriding the
``n`` and ``mode`` arguments. Each key must correspond to the
input variables from
:meth:~pina.problem.AbstractProblem.input_variables, and its value
should be another dictionary with
two keys: ``n`` (number of points to sample) and ``mode``
(sampling method). Defaults to None.
:raises RuntimeError: If both ``n`` and ``sample_rules`` are specified.
:raises RuntimeError: If neither ``n`` nor ``sample_rules`` are set.
:Example:
>>> problem.discretise_domain(n=10, mode='grid')
>>> problem.discretise_domain(n=10, mode='grid', domains=['gamma1'])
>>> problem.discretise_domain(
... sample_rules={
... 'x': {'n': 10, 'mode': 'grid'},
... 'y': {'n': 100, 'mode': 'grid'}
... },
... domains=['D']
... )
.. warning::
``random`` is currently the only implemented ``mode`` for all
geometries, i.e. :class:`~pina.domain.ellipsoid.EllipsoidDomain`,
:class:`~pina.domain.cartesian.CartesianDomain`,
:class:`~pina.domain.simplex.SimplexDomain`, and geometry
compositions :class:`~pina.domain.union_domain.Union`,
:class:`~pina.domain.difference_domain.Difference`,
:class:`~pina.domain.exclusion_domain.Exclusion`, and
:class:`~pina.domain.intersection_domain.Intersection`.
The modes ``latin`` or ``lh``, ``chebyshev``, ``grid`` are only
implemented for :class:`~pina.domain.cartesian.CartesianDomain`.
.. warning::
If custom discretisation is applied by setting ``sample_rules`` not
to ``None``, then the discretised domain must be of class
:class:`~pina.domain.cartesian.CartesianDomain`
"""
# check consistecy n, mode, variables, locations
if sample_rules is not None:
check_consistency(sample_rules, dict)
if mode is not None:
check_consistency(mode, str)
check_consistency(domains, (list, str))
# check correct location
if domains == "all":
domains = self.domains.keys()
elif not isinstance(domains, (list)):
domains = [domains]
if n is not None and sample_rules is None:
self._apply_default_discretization(n, mode, domains)
if n is None and sample_rules is not None:
self._apply_custom_discretization(sample_rules, domains)
elif n is not None and sample_rules is not None:
raise RuntimeError(
"You can't specify both n and sample_rules at the same time."
)
elif n is None and sample_rules is None:
raise RuntimeError("You have to specify either n or sample_rules.")
def _apply_default_discretization(self, n, mode, domains):
"""
Apply default discretization to the problem's domains.
:param int n: The number of points to sample.
:param mode: The sampling method.
:param domains: The domains from which to sample.
:type domains: str | list[str]
"""
for domain in domains:
self.discretised_domains[domain] = (
self.domains[domain].sample(n, mode).sort_labels()
)
def _apply_custom_discretization(self, sample_rules, domains):
"""
Apply custom discretization to the problem's domains.
:param dict sample_rules: A dictionary of custom sampling rules.
:param domains: The domains from which to sample.
:type domains: str | list[str]
:raises RuntimeError: If the keys of the sample_rules dictionary are not
the same as the input variables.
:raises RuntimeError: If custom discretisation is applied on a domain
that is not a CartesianDomain.
"""
if sorted(list(sample_rules.keys())) != sorted(self.input_variables):
raise RuntimeError(
"The keys of the sample_rules dictionary must be the same as "
"the input variables."
)
for domain in domains:
if not isinstance(self.domains[domain], CartesianDomain):
raise RuntimeError(
"Custom discretisation can be applied only on Cartesian "
"domains"
)
discretised_tensor = []
for var, rules in sample_rules.items():
n, mode = rules["n"], rules["mode"]
points = self.domains[domain].sample(n, mode, var)
discretised_tensor.append(points)
self.discretised_domains[domain] = merge_tensors(
discretised_tensor
).sort_labels()
def add_points(self, new_points_dict):
"""
Add new points to an already sampled domain.
:param dict new_points_dict: The dictionary mapping new points to their
corresponding domain.
"""
for k, v in new_points_dict.items():
self.discretised_domains[k] = LabelTensor.vstack(
[self.discretised_domains[k], v]
)
def collect_data(self):
"""
Aggregate data from the problem's conditions into a single dictionary.
"""
data = {}
# check if all domains are discretised
if not self.are_all_domains_discretised:
raise RuntimeError(
"All domains must be discretised before aggregating data."
)
# Iterate over the conditions and collect data
for condition_name in self.conditions:
condition = self.conditions[condition_name]
# Check if the condition has an domain attribute
if hasattr(condition, "domain"):
# Store the discretisation points
samples = self.discretised_domains[condition.domain]
data[condition_name] = {
"input": samples,
"equation": condition.equation,
}
else:
# If the condition does not have a domain attribute, store
# the input and target points
keys = condition.__slots__
values = [getattr(condition, name) for name in keys]
data[condition_name] = dict(zip(keys, values))
self._collected_data = data