|
20 | 20 |
|
21 | 21 | """ |
22 | 22 | from __future__ import annotations |
| 23 | +import concurrent.futures |
23 | 24 | import copy |
24 | 25 | from contextlib import ExitStack |
25 | 26 | from functools import partial |
26 | | -from typing import Optional, Iterable, Tuple |
| 27 | +from typing import Optional, Iterable, Tuple, Callable |
27 | 28 |
|
28 | 29 | import numpy as np |
29 | 30 | from numpy.typing import NDArray, ArrayLike |
30 | 31 |
|
31 | | -from mapflpy.globals import DEFAULT_BUFFER_SIZE, Traces, PathType, DirectionType |
| 32 | +from mapflpy.globals import DEFAULT_BUFFER_SIZE, Traces, Mapping, PathType, DirectionType, ContextType |
32 | 33 | from mapflpy.tracer import TracerMP |
33 | 34 | from mapflpy.utils import shift_phi_traces, shift_phi_lps, fetch_default_launch_points |
34 | 35 |
|
@@ -180,6 +181,240 @@ def run_fwdbwd_tracing(br: PathType, |
180 | 181 | return tracer.trace_fbwd(launch_points, buffer_size) |
181 | 182 |
|
182 | 183 |
|
| 184 | +def map_field_lines_in_parallel(trace_function: Callable, |
| 185 | + r_in: ArrayLike, |
| 186 | + t_in: ArrayLike, |
| 187 | + p_in: ArrayLike, |
| 188 | + nproc: int = 4): |
| 189 | + """Map field lines from r, t, p positions in parallel using a given tracing script. |
| 190 | +
|
| 191 | + The `trace_function` is a callable function that wraps the desired mapflpy script that uses TracerMP in |
| 192 | + the background. It must only accept one argument (`launch_points`) and otherwise wraps a tracing |
| 193 | + script that is defined with any specific keywords that determine how the trace is done. |
| 194 | + Currently only :func:`run_forward_tracing`, :func:`run_backward_tracing` make sense to wrap here. |
| 195 | +
|
| 196 | + The r,t,p input arrays can be any shape (1D, 2D, 3D, etc.) as long as it is consistent. |
| 197 | +
|
| 198 | + .. warning:: |
| 199 | + The wrapped trace function should specify the `buffer_size` keyword to a small number (e.g. 3) |
| 200 | + since the trace geometry is not used anyway. This is essential for keeping the memory footprint |
| 201 | + small and for the mapping to compute as quickly as possible. |
| 202 | +
|
| 203 | + Parameters |
| 204 | + ---------- |
| 205 | + trace_function : Callable |
| 206 | + A function that wraps the mapflpy forward or backwards tracing script to call in parallel |
| 207 | + (:func:`run_forward_tracing`, :func:`run_backward_tracing`) and sets the desired parameters to |
| 208 | + run the wrapped script with. |
| 209 | + Only the endpoints are used, so only one of forward or backwards mapping makes sense. |
| 210 | + r_in : ArrayLike |
| 211 | + An N-Dimensional array of radial positions to map from. |
| 212 | + t_in : ArrayLike |
| 213 | + An N-Dimensional array of theta positions to map from (shape must match `r_in`). |
| 214 | + p_in : ArrayLike |
| 215 | + An N-Dimensional array of phi positions to map from (shape must match `r_in`). |
| 216 | + nproc : int, optional |
| 217 | + The number of processes to spawn. This should be equal to or less than the number of threads |
| 218 | + that can be used on the machine. |
| 219 | +
|
| 220 | + Returns |
| 221 | + ------- |
| 222 | + mapping : :class:`~mapflpy.globals.Mapping` |
| 223 | + A namedtuple containing the mapping results (:class:`mapflpy.globals.Mapping`). |
| 224 | +
|
| 225 | + See Also |
| 226 | + -------- |
| 227 | + :func:`map_pt_forward` |
| 228 | + :func:`map_pt_backward` |
| 229 | + :func:`run_forward_tracing` |
| 230 | + :func:`run_backward_tracing` |
| 231 | + """ |
| 232 | + |
| 233 | + # convert the r,t,p positions to a list of launchpoints split into nproc chunks |
| 234 | + r1d = r_in.ravel() |
| 235 | + t1d = t_in.ravel() |
| 236 | + p1d = p_in.ravel() |
| 237 | + lp_all = np.stack([r1d, t1d, p1d], axis=0) |
| 238 | + lp_chunks = np.array_split(lp_all, nproc, axis=1) |
| 239 | + |
| 240 | + # run the mapping function on nprocs |
| 241 | + with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor: |
| 242 | + results = list(executor.map(trace_function, lp_chunks)) |
| 243 | + |
| 244 | + # collect the results back into one array |
| 245 | + end_pos_1d = np.concatenate([result.end_pos for result in results], axis=1) |
| 246 | + traced_to_boundary_1d = np.concatenate([result.traced_to_boundary for result in results]) |
| 247 | + integral_1d = np.concatenate([result.integral for result in results]) |
| 248 | + |
| 249 | + # reshape and return the Mapping object |
| 250 | + r_out = np.reshape(end_pos_1d[0, :], r_in.shape) |
| 251 | + t_out = np.reshape(end_pos_1d[1, :], t_in.shape) |
| 252 | + p_out = np.reshape(end_pos_1d[2, :], p_in.shape) |
| 253 | + traced_to_boundary = np.reshape(traced_to_boundary_1d, r_in.shape) |
| 254 | + integral = np.reshape(integral_1d, r_in.shape) |
| 255 | + |
| 256 | + return Mapping(r_out, t_out, p_out, traced_to_boundary, integral) |
| 257 | + |
| 258 | + |
| 259 | +def map_pt_forward(br: PathType, |
| 260 | + bt: PathType, |
| 261 | + bp: PathType, |
| 262 | + p1d: ArrayLike, |
| 263 | + t1d: ArrayLike, |
| 264 | + radius: float = 1.0, |
| 265 | + timeout: float = 3600., |
| 266 | + context: Optional[ContextType] = 'fork', |
| 267 | + nproc: int = 4, |
| 268 | + **mapfl_params): |
| 269 | + """Map field lines forwards on a phi, theta grid at constant radius. |
| 270 | +
|
| 271 | + The inputs are similar to instantiating a Tracer class except one also |
| 272 | + specifies 1D arrays that define a grid in phi, theta and a radius to map from. |
| 273 | +
|
| 274 | + Currently the mapping radius defaults to 1.0 or must be set. In the future |
| 275 | + this will default to the inner radius of the main mesh of the B files. |
| 276 | +
|
| 277 | + Notes |
| 278 | + ----- |
| 279 | + This is a very specific helper function for a common task. Generic mappings |
| 280 | + can be built by following this as an example for wrapping :func:`map_field_lines_in_parallel`. |
| 281 | +
|
| 282 | + Parameters |
| 283 | + ---------- |
| 284 | + br : PathType |
| 285 | + Path to the Br magnetic field file. |
| 286 | + bt : PathType |
| 287 | + Path to the Bt magnetic field file. |
| 288 | + bp : PathType |
| 289 | + Path to the Bp magnetic field file. |
| 290 | + p1d : ArrayLike |
| 291 | + A 1D numpy array of phi (longitude) positions in radians that will be used to |
| 292 | + build the mapping grid. |
| 293 | + t1d : ArrayLike |
| 294 | + A 1D numpy array of theta (co-latitude) positions in radians that will be used to |
| 295 | + build the mapping grid. |
| 296 | + radius : float, optional |
| 297 | + Radius to map forward from. Default 1.0. |
| 298 | + timeout : float, optional |
| 299 | + Timeout in seconds for interprocess communication. Default is 3600 seconds. |
| 300 | + context : ContextType, optional |
| 301 | + The multiprocessing context to use when spawning the subprocess. Since many |
| 302 | + processes are launched, generally 'fork' is recommended. Behavior may depend |
| 303 | + on the system architecture. Default is 'fork'. |
| 304 | + nproc : int, optional |
| 305 | + The number of processes to spawn. This should be equal to or less than the number of threads |
| 306 | + that can be used on the machine. |
| 307 | + **mapfl_params : dict |
| 308 | + Additional tracing parameters passed to the subprocess. |
| 309 | +
|
| 310 | + Returns |
| 311 | + ------- |
| 312 | + mapping : :class:`~mapflpy.globals.Mapping` |
| 313 | + A namedtuple containing the mapping results (:class:`mapflpy.globals.Mapping`). |
| 314 | +
|
| 315 | + See Also |
| 316 | + -------- |
| 317 | + :func:`map_field_lines_in_parallel` |
| 318 | + """ |
| 319 | + |
| 320 | + # build a 2D grid of p and t locations from the input 1D arrays |
| 321 | + p2d, t2d = np.meshgrid(p1d, t1d) |
| 322 | + # the r locations are on the specified radius |
| 323 | + r2d = np.ones_like(p2d)*radius |
| 324 | + |
| 325 | + # define the wrapped trace function |
| 326 | + def trace_function(launch_points): |
| 327 | + traces = run_forward_tracing(br, bt, bp, launch_points=launch_points, |
| 328 | + buffer_size=3, context=context, timeout=timeout, |
| 329 | + **mapfl_params) |
| 330 | + return traces |
| 331 | + |
| 332 | + # compute the mapping |
| 333 | + mapping = map_field_lines_in_parallel(trace_function, r2d, t2d, p2d, nproc=nproc) |
| 334 | + |
| 335 | + return mapping |
| 336 | + |
| 337 | + |
| 338 | +def map_pt_backward(br: PathType, |
| 339 | + bt: PathType, |
| 340 | + bp: PathType, |
| 341 | + p1d: ArrayLike, |
| 342 | + t1d: ArrayLike, |
| 343 | + radius: float = 30.0, |
| 344 | + timeout: float = 3600., |
| 345 | + context: Optional[ContextType] = 'fork', |
| 346 | + nproc: int = 4, |
| 347 | + **mapfl_params): |
| 348 | + """Map field lines backwards on a phi, theta grid at constant radius. |
| 349 | +
|
| 350 | + The inputs are similar to instantiating a Tracer class except one also |
| 351 | + specifies 1D arrays that define a grid in phi, theta and a radius to map from. |
| 352 | +
|
| 353 | + Currently the mapping radius defaults to 30 or must be set. In the future |
| 354 | + this will default to the outer radius of the main mesh of the B files. |
| 355 | +
|
| 356 | + Notes |
| 357 | + ----- |
| 358 | + This is a very specific helper function for a common task. Generic mappings |
| 359 | + can be built by following this as an example for wrapping :func:`map_field_lines_in_parallel`. |
| 360 | +
|
| 361 | + Parameters |
| 362 | + ---------- |
| 363 | + br : PathType |
| 364 | + Path to the Br magnetic field file. |
| 365 | + bt : PathType |
| 366 | + Path to the Bt magnetic field file. |
| 367 | + bp : PathType |
| 368 | + Path to the Bp magnetic field file. |
| 369 | + p1d : ArrayLike |
| 370 | + A 1D numpy array of phi (longitude) positions in radians that will be used to |
| 371 | + build the mapping grid. |
| 372 | + t1d : ArrayLike |
| 373 | + A 1D numpy array of theta (co-latitude) positions in radians that will be used to |
| 374 | + build the mapping grid. |
| 375 | + radius : float, optional |
| 376 | + Radius to map backwards from. Default 30.0. |
| 377 | + timeout : float, optional |
| 378 | + Timeout in seconds for interprocess communication. Default is 3600 seconds. |
| 379 | + context : ContextType, optional |
| 380 | + The multiprocessing context to use when spawning the subprocess. Since many |
| 381 | + processes are launched, generally 'fork' is recommended. Behavior may depend |
| 382 | + on the system architecture. Default is 'fork'. |
| 383 | + nproc : int, optional |
| 384 | + The number of processes to spawn. This should be equal to or less than the number of threads |
| 385 | + that can be used on the machine. |
| 386 | + **mapfl_params : dict |
| 387 | + Additional tracing parameters passed to the subprocess. |
| 388 | +
|
| 389 | + Returns |
| 390 | + ------- |
| 391 | + mapping : :class:`~mapflpy.globals.Mapping` |
| 392 | + A namedtuple containing the mapping results (:class:`mapflpy.globals.Mapping`). |
| 393 | +
|
| 394 | + See Also |
| 395 | + -------- |
| 396 | + :func:`map_field_lines_in_parallel` |
| 397 | + """ |
| 398 | + |
| 399 | + |
| 400 | + # build a 2D grid of p and t locations from the input 1D arrays |
| 401 | + p2d, t2d = np.meshgrid(p1d, t1d) |
| 402 | + # the r locations are on the specified radius |
| 403 | + r2d = np.ones_like(p2d)*radius |
| 404 | + |
| 405 | + # define the wrapped trace function |
| 406 | + def trace_function(launch_points): |
| 407 | + traces = run_backward_tracing(br, bt, bp, launch_points=launch_points, |
| 408 | + buffer_size=3, context=context, timeout=timeout, |
| 409 | + **mapfl_params) |
| 410 | + return traces |
| 411 | + |
| 412 | + # compute the mapping |
| 413 | + mapping = map_field_lines_in_parallel(trace_function, r2d, t2d, p2d, nproc=nproc) |
| 414 | + |
| 415 | + return mapping |
| 416 | + |
| 417 | + |
183 | 418 | def inter_domain_tracing(br_cor: PathType, |
184 | 419 | bt_cor: PathType, |
185 | 420 | bp_cor: PathType, |
|
0 commit comments