Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a50137f
Implemented CuPy version of `DistArray`
brownbaerchen Dec 8, 2023
e770b56
Cleanup and tests
brownbaerchen Dec 8, 2023
424fdb1
Added `cupy` backend for FFTs
brownbaerchen Dec 12, 2023
44be7bc
Removed `useCuPy` argument from `NewDistArray`
brownbaerchen Dec 12, 2023
90551b4
Fixed tests
brownbaerchen Dec 12, 2023
202f30c
Added cupyx-scipy backend
brownbaerchen Dec 14, 2023
e5d4b56
Implemented NCCL for Alltoallw operations
brownbaerchen Dec 18, 2023
d0a493e
Minor refactor
brownbaerchen Dec 18, 2023
6b47e36
Changed `Alltoallw` communication pattern
brownbaerchen Dec 20, 2023
21805de
Added custom MPI based Alltoallw implementation
brownbaerchen Jan 4, 2024
ceccc7d
Cleanup
brownbaerchen Jan 12, 2024
c3c20ff
Overlapped all communication within a single Alltoallw in NCCL backend
brownbaerchen Feb 13, 2024
405fb4d
Cosmetic changes
brownbaerchen Feb 13, 2024
b69a303
Swapped `array[...]=data` for `cupy.copyto`
brownbaerchen Feb 23, 2024
b9ac0f7
Removed unnecessary rescaling in CuPy backend
brownbaerchen Feb 27, 2024
77b96f3
Use CUDA graphs in NCCL Alltoallw
brownbaerchen Feb 29, 2024
75f0343
Removed dependence on CuPy
brownbaerchen Apr 16, 2024
a920dca
Merge remote-tracking branch 'upstream/master' into cupy_implementation
brownbaerchen May 24, 2024
7330b6b
Fixes
brownbaerchen Oct 12, 2024
5500da4
Merge remote-tracking branch 'upstream/master' into cupy_implementation
brownbaerchen Apr 4, 2025
a7aeec6
Merge branch 'cupy_implementation' of github.com:brownbaerchen/mpi4py…
brownbaerchen Apr 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mpi4py_fft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@
from . import fftw
from .fftw import fftlib
from .io import HDF5File, NCFile, generate_xdmf

try:
from .distarrayCuPy import DistArrayCuPy
except ModuleNotFoundError:
pass
263 changes: 151 additions & 112 deletions mpi4py_fft/distarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,13 @@

comm = MPI.COMM_WORLD

class DistArray(np.ndarray):
"""Distributed Numpy array

This Numpy array is part of a larger global array. Information about the
distribution is contained in the attributes.

Parameters
----------
global_shape : sequence of ints
Shape of non-distributed global array
subcomm : None, :class:`.Subcomm` object or sequence of ints, optional
Describes how to distribute the array
val : Number or None, optional
Initialize array with this number if buffer is not given
dtype : np.dtype, optional
Type of array
buffer : Numpy array, optional
Array of correct shape. The buffer owns the memory that is used for
this array.
alignment : None or int, optional
Make sure array is aligned in this direction. Note that alignment does
not take rank into consideration.
rank : int, optional
Rank of tensor (number of free indices, a scalar is zero, vector one,
matrix two)

class DistArrayBase:
"""
Abstract class for distributed arrays in numpy or cupy implementation

For more information, see `numpy.ndarray <https://docs.scipy.org/doc/numpy/reference/arrays.ndarray.html>`_
Attributes:
- xp: Numerical library, a.k.a. numpy or cupy as class attribute

Note
----
Expand All @@ -53,58 +32,9 @@ class DistArray(np.ndarray):
Also note that the ``alignment`` keyword does not take rank into
consideration. Setting alignment=2 for the array above means that the last
axis will be aligned, also when rank>0.

"""
def __new__(cls, global_shape, subcomm=None, val=None, dtype=float,
buffer=None, strides=None, alignment=None, rank=0):
if len(global_shape[rank:]) < 2: # 1D case
obj = np.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer, strides=strides)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._rank = rank
obj._p0 = None
return obj

if isinstance(subcomm, Subcomm):
pass
else:
if isinstance(subcomm, (tuple, list)):
assert len(subcomm) == len(global_shape[rank:])
# Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm
if not np.all([isinstance(s, MPI.Comm) for s in subcomm]):
subcomm = Subcomm(comm, subcomm)
else:
assert subcomm is None
subcomm = [0] * len(global_shape[rank:])
if alignment is not None:
subcomm[alignment] = 1
else:
subcomm[-1] = 1
alignment = len(subcomm)-1
subcomm = Subcomm(comm, subcomm)
sizes = [s.Get_size() for s in subcomm]
if alignment is not None:
assert isinstance(alignment, (int, np.integer))
assert sizes[alignment] == 1
else:
# Decide that alignment is the last axis with size 1
alignment = np.flatnonzero(np.array(sizes) == 1)[-1]
p0 = Pencil(subcomm, global_shape[rank:], axis=alignment)
subshape = p0.subshape
if rank > 0:
subshape = global_shape[:rank] + subshape
obj = np.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._p0 = p0
obj._rank = rank
return obj

def __array_finalize__(self, obj):
if obj is None:
return
self._p0 = getattr(obj, '_p0', None)
self._rank = getattr(obj, '_rank', None)
xp = None

@property
def alignment(self):
Expand Down Expand Up @@ -152,32 +82,72 @@ def dimensions(self):
"""Return dimensions of array not including rank"""
return len(self._p0.shape)

@staticmethod
def get_subcomm(subcomm, global_shape, rank, alignment):
if isinstance(subcomm, Subcomm):
pass
else:
if isinstance(subcomm, (tuple, list)):
assert len(subcomm) == len(global_shape[rank:])
# Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm
if not np.all([isinstance(s, MPI.Comm) for s in subcomm]):
subcomm = Subcomm(comm, subcomm)
else:
assert subcomm is None
subcomm = [0] * len(global_shape[rank:])
if alignment is not None:
subcomm[alignment] = 1
else:
subcomm[-1] = 1
alignment = len(subcomm) - 1
subcomm = Subcomm(comm, subcomm)
return subcomm

@classmethod
def setup_pencil(cls, subcomm, rank, global_shape, alignment):
sizes = [s.Get_size() for s in subcomm]
if alignment is not None:
assert isinstance(alignment, (int, cls.xp.integer))
assert sizes[alignment] == 1
else:
# Decide that alignment is the last axis with size 1
alignment = np.flatnonzero(np.array(sizes) == 1)[-1]

p0 = Pencil(subcomm, global_shape[rank:], axis=alignment)

subshape = p0.subshape
if rank > 0:
subshape = global_shape[:rank] + subshape

return p0, subshape

def __array_finalize__(self, obj):
if obj is None:
return
self._p0 = getattr(obj, "_p0", None)
self._rank = getattr(obj, "_rank", None)

def __getitem__(self, i):
# Return DistArray if the result is a component of a tensor
# Otherwise return ndarray view
if self.ndim == 1:
return np.ndarray.__getitem__(self, i)
return self.xp.ndarray.__getitem__(self, i)

if isinstance(i, (Integral, slice)) and self.rank > 0:
v0 = np.ndarray.__getitem__(self, i)
v0 = self.xp.ndarray.__getitem__(self, i)
v0._rank = self.rank - (self.ndim - v0.ndim)
return v0

if isinstance(i, (Integral, slice)) and self.rank == 0:
return np.ndarray.__getitem__(self.v, i)
return self.xp.ndarray.__getitem__(self.v, i)

assert isinstance(i, tuple)
if len(i) <= self.rank:
v0 = np.ndarray.__getitem__(self, i)
v0 = self.xp.ndarray.__getitem__(self, i)
v0._rank = self.rank - (self.ndim - v0.ndim)
return v0

return np.ndarray.__getitem__(self.v, i)

@property
def v(self):
""" Return local ``self`` array as an ``ndarray`` object"""
return self.__array__()
return self.xp.ndarray.__getitem__(self.v, i)

def get(self, gslice):
"""Return global slice of ``self``
Expand Down Expand Up @@ -215,6 +185,7 @@ def get(self, gslice):
# global MPI. We create a global file with MPI, but then open it without
# MPI and only on rank 0.
import h5py
# TODO: can we use h5py to communicate the data without copying to cpu first when using cupy?
f = h5py.File('tmp.h5', 'w', driver="mpio", comm=comm)
s = self.local_slice()
sp = np.nonzero([isinstance(x, slice) for x in gslice])[0]
Expand All @@ -230,7 +201,8 @@ def get(self, gslice):
else:
on_this_proc = False
if on_this_proc:
f["data"][sf] = self[tuple(gslice)]
data = self.asnumpy()
f["data"][sf] = data[tuple(gslice)]
f.close()
c = None
if comm.Get_rank() == 0:
Expand Down Expand Up @@ -277,24 +249,6 @@ def local_slice(self):
self._p0.subshape)]
return tuple([slice(0, s) for s in self.shape[:self.rank]] + v)

def get_pencil_and_transfer(self, axis):
"""Return pencil and transfer objects for alignment along ``axis``

Parameters
----------
axis : int
The new axis to align data with

Returns
-------
2-tuple
2-tuple where first item is a :class:`.Pencil` aligned in ``axis``.
Second item is a :class:`.Transfer` object for executing the
redistribution of data
"""
p1 = self._p0.pencil(axis)
return p1, self._p0.transfer(p1, self.dtype)

def redistribute(self, axis=None, out=None):
"""Global redistribution of local ``self`` array

Expand Down Expand Up @@ -327,7 +281,7 @@ def redistribute(self, axis=None, out=None):
return self

if out is not None:
assert isinstance(out, DistArray)
assert issubclass(type(out), DistArrayBase)
assert self.global_shape == out.global_shape
axis = out.alignment
if self.commsizes == out.commsizes:
Expand All @@ -343,11 +297,11 @@ def redistribute(self, axis=None, out=None):

p1, transfer = self.get_pencil_and_transfer(axis)
if out is None:
out = DistArray(self.global_shape,
subcomm=p1.subcomm,
dtype=self.dtype,
alignment=axis,
rank=self.rank)
out = type(self)(self.global_shape,
subcomm=p1.subcomm,
dtype=self.dtype,
alignment=axis,
rank=self.rank)

if self.rank == 0:
transfer.forward(self, out)
Expand All @@ -362,6 +316,24 @@ def redistribute(self, axis=None, out=None):
transfer.destroy()
return out

def get_pencil_and_transfer(self, axis):
"""Return pencil and transfer objects for alignment along ``axis``

Parameters
----------
axis : int
The new axis to align data with

Returns
-------
2-tuple
2-tuple where first item is a :class:`.Pencil` aligned in ``axis``.
Second item is a :class:`.Transfer` object for executing the
redistribution of data
"""
p1 = self._p0.pencil(axis)
return p1, self._p0.transfer(p1, self.dtype)

def write(self, filename, name='darray', step=0, global_slice=None,
domain=None, as_scalar=False):
"""Write snapshot ``step`` of ``self`` to file ``filename``
Expand Down Expand Up @@ -439,6 +411,67 @@ def read(self, filename, name='darray', step=0):
f.read(self, name, step=step)


class DistArray(DistArrayBase, np.ndarray):
"""Distributed Numpy array

This Numpy array is part of a larger global array. Information about the
distribution is contained in the attributes.

Parameters
----------
global_shape : sequence of ints
Shape of non-distributed global array
subcomm : None, :class:`.Subcomm` object or sequence of ints, optional
Describes how to distribute the array
val : Number or None, optional
Initialize array with this number if buffer is not given
dtype : np.dtype, optional
Type of array
buffer : Numpy array, optional
Array of correct shape. The buffer owns the memory that is used for
this array.
alignment : None or int, optional
Make sure array is aligned in this direction. Note that alignment does
not take rank into consideration.
rank : int, optional
Rank of tensor (number of free indices, a scalar is zero, vector one,
matrix two)


For more information, see `numpy.ndarray <https://docs.scipy.org/doc/numpy/reference/arrays.ndarray.html>`_

"""

xp = np

def __new__(cls, global_shape, subcomm=None, val=None, dtype=float,
buffer=None, strides=None, alignment=None, rank=0):
if len(global_shape[rank:]) < 2: # 1D case
obj = cls.xp.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer, strides=strides)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._rank = rank
obj._p0 = None
return obj

subcomm = cls.get_subcomm(subcomm, global_shape, rank, alignment)
p0, subshape = cls.setup_pencil(subcomm, rank, global_shape, alignment)

obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._p0 = p0
obj._rank = rank
return obj

@property
def v(self):
"""Return local ``self`` array as an ``ndarray`` object"""
return self.__array__()

def asnumpy(self):
return self

def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False):
"""Return a new :class:`.DistArray` object for provided :class:`.PFFT` object

Expand Down Expand Up @@ -480,7 +513,13 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False):
else:
dtype = pfft.forward.input_array.dtype
global_shape = (len(global_shape),)*rank + global_shape
z = DistArray(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype,

if pfft.xfftn[0].backend in ["cupy", "cupyx-scipy"]:
from mpi4py_fft.distarrayCuPy import DistArrayCuPy as darraycls
else:
darraycls = DistArray

z = darraycls(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype,
alignment=p0.axis, rank=rank)
return z.v if view else z

Expand Down
Loading