Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fdeb121
mpi: Add NBX sparse-exchange transport for distributed indexing
mloubout Jun 24, 2026
db91582
mpi: Add Selection layer normalizing NumPy indices
mloubout Jun 24, 2026
c9557b5
mpi: Add Layout and ExchangePlan for index redistribution
mloubout Jun 24, 2026
2f5e1ff
mpi: Add Exchange value object with cached redistribution plans
mloubout Jun 24, 2026
b1ce47d
mpi: Route distributed advanced indexing through the redistribution e…
mloubout Jun 24, 2026
e22fff7
api: Add data_local and return local_indices as a DimensionTuple
mloubout Jun 24, 2026
02b347b
tests: Cover distributed advanced indexing and assignment
mloubout Jun 24, 2026
fa959fb
mpi: Redistribute decomposed-value assignment point-to-point
mloubout Jun 24, 2026
6566f52
mpi: Evaluate negative-step slice reads via point-to-point pull
mloubout Jun 24, 2026
f92de50
data: Add induced decomposition for distributed slicing
mloubout Jun 24, 2026
4b86a3f
mpi: Make distributed indexing local and remove mpi_index_maps
mloubout Jun 24, 2026
b8a8c2d
tests: Update distributed slicing tests to local induced semantics
mloubout Jun 24, 2026
e4e5b96
examples: Add MPI data initialization notebook
mloubout Jun 24, 2026
391ab49
data: Simplify distributed redistribution engine internals
mloubout Jun 24, 2026
ec0d8b2
data: Remove redundant comm_type indexing apparatus
mloubout Jun 24, 2026
29d086b
tests: Add serial unit tests for the distributed engine
mloubout Jun 24, 2026
d9f3b47
data: Standardize docstrings in the distributed engine
mloubout Jun 25, 2026
9b29276
examples: Expand MPI data notebook to current capabilities
mloubout Jun 25, 2026
cf38077
data: Fix strided slice with start inside a subdomain
mloubout Jun 25, 2026
375bd23
examples: Show strided sub-block gather in MPI data notebook
mloubout Jun 25, 2026
011a76a
data: Drop reStructuredText roles from engine docstrings
mloubout Jun 25, 2026
48ed4ac
data: Fix distributed engine package summary line
mloubout Jun 25, 2026
0be9726
tests: Fold distributed-engine unit tests into test_data.py
mloubout Jun 25, 2026
6882ad3
data: Rename engine selector Scalar to IndexScalar
mloubout Jun 25, 2026
293a832
data: Use single backticks in docstrings for code names
mloubout Jun 25, 2026
01a9954
examples: Add MPI data-indexing benchmark vs NumPy
mloubout Jun 25, 2026
1597d0e
data: Make local indexing O(result size), not O(domain)
mloubout Jun 25, 2026
ad8366a
examples: Reframe the MPI data-indexing benchmark
mloubout Jun 25, 2026
872d6a4
data: Skip the O(n) np.delete in index_handle_oob for numeric indices
mloubout Jun 25, 2026
82081ba
examples: Report indexing overhead honestly in the benchmark
mloubout Jun 25, 2026
6d1a540
data: Yield the CPU in the NBX exchange loop under oversubscription
mloubout Jun 25, 2026
dbf3379
ci: Make OpenMPI yield when idle for the MPI example notebooks
mloubout Jun 25, 2026
f17919a
data: Replace NBX consensus with a portable sparse exchange
mloubout Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/examples-mpi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
ipcluster start --profile=mpi --engines=mpi -n 4 --daemonize
# A few seconds to ensure workers are ready
sleep 10
pytest -v --nbval examples/mpi
pytest -vvv --nbval examples/mpi
ipcluster stop --profile=mpi

- name: Test seismic examples
Expand Down
301 changes: 109 additions & 192 deletions devito/data/data.py

Large diffs are not rendered by default.

65 changes: 63 additions & 2 deletions devito/data/decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,14 @@ def index_glb_to_loc(self, *args, rel=True):
loc_min = loc_abs_min - base \
+ np.mod(glb_idx.step - np.mod(base, glb_idx.step),
glb_idx.step)
else: # glb start is given explicitly
elif glb_idx_min >= loc_abs_min:
# The slice starts within this subdomain, so its first
# selected index is the start itself (the stride phase
# begins here, not at loc_abs_min).
loc_min = glb_idx_min - base
else: # the slice started in an earlier subdomain
# Advance to the first in-stride index at or after this
# subdomain's start.
loc_min = loc_abs_min - base \
+ np.mod(glb_idx.step - np.mod(base - glb_idx.start,
glb_idx.step), glb_idx.step)
Expand All @@ -319,7 +326,14 @@ def index_glb_to_loc(self, *args, rel=True):
loc_max = top - base \
+ np.mod(glb_idx.step - np.mod(top - glb_max,
glb_idx.step), glb_idx.step)
else:
elif glb_idx_max <= loc_abs_max:
# The slice's high end (its start) is within this
# subdomain, so the first selected index is the start
# itself (the stride phase begins here, not at loc_abs_max).
loc_max = glb_idx_max - base
else: # the slice's high end is in a later subdomain
# Step back to the first in-stride index at or below this
# subdomain's end.
loc_max = top - base \
+ np.mod(glb_idx.step - np.mod(top - glb_idx.start,
glb_idx.step), glb_idx.step)
Expand Down Expand Up @@ -555,3 +569,50 @@ def reshape(self, *args):
items = [i + nleft for i in items]

return Decomposition(items, self.local)

def index_decomposition(self, idx):
"""
The decomposition induced by NumPy-indexing this axis with a slice.

Each subdomain keeps the result positions, in index order, of the global
indices it owns. A strided or reversed slice therefore just relabels
which rank owns which result index -- indexing never moves data across
ranks. Unlike `reshape` (which adjusts subdomain *boundaries* and
is used for halos), this follows exact NumPy slicing semantics.

Examples
--------
>>> d = Decomposition([[0, 1, 2, 3], [4, 5, 6, 7]], 0)
>>> d.index_decomposition(slice(None, None, -1))
Decomposition(<<[4,7]>>, [0,3])
>>> d.index_decomposition(slice(None, None, -3))
Decomposition(<<[2,2]>>, [0,1])
"""
if self.glb_max is None:
return Decomposition([np.array([], dtype=np.int64)]*len(self),
self.local)
size = self.glb_max - self.glb_min + 1

# The global indices selected by `idx`, in result order. For a slice
# (the only caller) this is built directly, without materialising the
# whole axis.
if isinstance(idx, slice):
start, stop, step = idx.indices(size)
selected = self.glb_min + np.arange(start, stop, step)
else:
selected = np.arange(self.glb_min, self.glb_min + size)[idx]

# Bucket the selected indices by owning subdomain. A subdomain is a
# contiguous range, so membership is a bounds check in O(len(selected))
# -- avoiding an O(axis) `isin` on every slice. A non-contiguous
# subdomain (not produced by gridding/slicing) falls back to `isin`.
items = []
for sd in self:
if sd.size == 0:
items.append(np.array([], dtype=np.int64))
elif sd[-1] - sd[0] + 1 == sd.size:
owned = (selected >= sd[0]) & (selected <= sd[-1])
items.append(np.nonzero(owned)[0])
else:
items.append(np.nonzero(np.isin(selected, sd))[0])
return Decomposition(items, self.local)
22 changes: 22 additions & 0 deletions devito/data/distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Indexing engine for MPI-distributed arrays.

Indexing an MPI-distributed array is treated as a redistribution between layouts.
The engine is built as four pure layers plus a value object:

* `selection` -- what an index expression means (serial NumPy semantics).
* `layout` -- where a global coordinate physically lives.
* `plan` -- the rank-to-rank routing, built without communication.
* `transport` -- the sparse point-to-point exchange (NBX).
* `exchange` -- `Exchange(data, idx).get()/.put(value)`.

Only `Exchange` is needed by `Data`; the lower layers are independently
testable (in serial, or with toy buffers).
"""

from devito.data.distributed.exchange import Exchange, cached_exchange # noqa
from devito.data.distributed.layout import Layout # noqa
from devito.data.distributed.plan import ExchangePlan # noqa
from devito.data.distributed.redistribution import redistribute_set # noqa
from devito.data.distributed.selection import Selection # noqa
from devito.data.distributed.transport import sparse_exchange # noqa
113 changes: 113 additions & 0 deletions devito/data/distributed/exchange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Exchange: the value object tying a Selection and a Layout into a reusable plan.

`Exchange(data, idx)` builds the routing for one index expression without any
communication. `get` pulls `data[idx]` and `put` assigns `data[idx] =
value`. Because the plan is communication-free to build and independent of the
data *values*, it can be cached and replayed across calls with the same index
(e.g. sparse injection every timestep), so the steady state is pure
point-to-point with no re-planning. `cached_exchange` provides that cache;
`Data` uses it so `data[idx] = value` is automatically plan-cached.
"""

from functools import lru_cache

import numpy as np

from devito.data.distributed.layout import Layout
from devito.data.distributed.plan import ExchangePlan
from devito.data.distributed.selection import Selection
from devito.tools import as_tuple

__all__ = ['Exchange', 'cached_exchange']


class Exchange:

"""
A reusable redistribution plan for `data[idx]` on distributed `data`.

Parameters
----------
data : Data
The MPI-distributed array being indexed.
idx : index expression
Any NumPy index (scalars, slices, integer arrays, boolean masks).
"""

def __init__(self, data, idx):
# Keep the data reference (not a snapshot): the underlying buffer is
# stable across `f.data` views, so a cached plan always reads/writes the
# current values.
self._data = data
global_shape = tuple(
dec.size if dec is not None else size
for dec, size in zip(data._decomposition, data.shape, strict=True)
)
self._layout = Layout(data._distributor, data._decomposition, global_shape)
self._selection = Selection.from_index(idx, global_shape)
self._plan = ExchangePlan.build(self._selection, self._layout)

def get(self):
"""Return `data[idx]` as a NumPy array."""
return self._plan.get(np.asarray(self._data))

def put(self, value):
"""Assign `data[idx] = value`."""
self._plan.put(np.asarray(self._data), value)


class _ExchangeKey:

"""
Hashable, content-addressed key wrapping `(data, idx)` for plan caching.

Generates a signature from the `data` identity and the `idx` content, so that
the same plan is reused across calls with the same index expression, even if
the `data` object is a different view of the same underlying buffer.
The signature does not include the `data` values,
but only the `data` metadata (shape, decomposition, distributor)
and the `idx` content so that the same plan is reused across calls.
"""

__slots__ = ('data', 'idx', '_sig')

def __init__(self, data, idx):
self.data = data
self.idx = idx
self._sig = _signature(self.data, self.idx)

def __hash__(self):
return hash(self._sig)

def __eq__(self, other):
return self._sig == other._sig


@lru_cache(maxsize=64)
def _build(key):
return Exchange(key.data, key.idx)


def cached_exchange(data, idx):
"""Return a (cached) `Exchange` for `data[idx]`."""
return _build(_ExchangeKey(data, idx))


def _signature(data, idx):
# The decomposition/distributor are keyed by identity: while cached, the key
# keeps them alive so their ids cannot be recycled, and distinct live objects
# always have distinct ids.
sig = [id(data._distributor), id(data._decomposition), tuple(data.shape)]
for component in as_tuple(idx):
if isinstance(component, np.ndarray):
sig.append(('arr', component.shape, component.dtype.str,
component.tobytes()))
elif isinstance(component, slice):
sig.append(('slc', component.start, component.stop, component.step))
elif isinstance(component, (list, tuple)):
arr = np.asarray(component)
sig.append(('seq', arr.shape, arr.dtype.str, arr.tobytes()))
else:
sig.append(('obj', component))
return tuple(sig)
108 changes: 108 additions & 0 deletions devito/data/distributed/layout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Layout layer: where a global coordinate physically lives.

A `Layout` wraps a distributor and a per-axis decomposition and answers,
for any axis, "which rank owns global index g, and at what local offset". It is
the single bridge between the layout-independent `Selection` and the
physical MPI placement, and it is what makes replicated and distributed axes
look uniform to the planner. All maps are computed locally from replicated
metadata; no communication happens here.
"""

from functools import cached_property

import numpy as np

from devito.tools import prod

__all__ = ['Layout']


class Layout:

"""
Physical placement of a distributed array's axes.

Parameters
----------
distributor : Distributor
Provides the communicator, topology and rank/coord maps.
decomposition : tuple
One entry per array axis: a Decomposition for a distributed axis, or
`None` for a replicated axis.
global_shape : tuple of int
The global array shape (full size on every axis).
"""

def __init__(self, distributor, decomposition, global_shape):
self.distributor = distributor
self.decomposition = tuple(decomposition)
self.global_shape = tuple(global_shape)

@cached_property
def distributed_axes(self):
"""The array axes that are MPI-distributed, in increasing order."""
return tuple(a for a, d in enumerate(self.decomposition) if d is not None)

@cached_property
def replicated_axes(self):
"""The array axes that are replicated on every rank."""
return tuple(a for a, d in enumerate(self.decomposition) if d is None)

@cached_property
def replicated_size(self):
"""Product of the full local sizes of the replicated axes."""
return prod(self.global_shape[a] for a in self.replicated_axes)

def axis_maps(self, axis):
"""
Lookup tables for one distributed axis.

Returns
-------
owner : ndarray
`owner[g]` is the topology sub-rank owning global index `g` along
this axis (`-1` if out of bounds).
local : ndarray
`local[g]` is the offset of `g` within its owner's subdomain.
sizes : ndarray
`sizes[i]` is the number of indices owned by sub-rank `i`.
"""
return self._axis_maps[axis]

@cached_property
def _axis_maps(self):
maps = {}
for axis in self.distributed_axes:
dec = self.decomposition[axis]
gmin = dec.glb_min or 0
n = self.global_shape[axis]
owner = np.full(n, -1, dtype=np.int64)
local = np.full(n, -1, dtype=np.int64)
sizes = np.zeros(len(dec), dtype=np.int64)
for i, sub in enumerate(dec):
pos = np.asarray(sub, dtype=np.int64) - gmin
in_range = (pos >= 0) & (pos < n)
owner[pos[in_range]] = i
local[pos[in_range]] = np.arange(pos.size)[in_range]
sizes[i] = pos.size
maps[axis] = (owner, local, sizes)
return maps

@cached_property
def topology_shape(self):
"""Number of sub-ranks along each distributed axis."""
return tuple(len(self.decomposition[a]) for a in self.distributed_axes)

@cached_property
def coord_to_rank(self):
"""Map a topology coordinate tuple (over distributed axes) to a flat rank.

Grid distributors expose the inverse Cartesian map via `all_coords`;
single-axis distributors (e.g. sparse) lay ranks out linearly, so the
sole sub-rank index is the flat rank.
"""
if hasattr(self.distributor, 'all_coords'):
return {tuple(int(c) for c in coord): r
for r, coord in enumerate(self.distributor.all_coords)}
return {(r,): r for r in range(self.distributor.nprocs)}
Loading
Loading