Skip to content

Commit d68a861

Browse files
committed
Initial implementation of BatchedSumma
1 parent 01fe71f commit d68a861

2 files changed

Lines changed: 536 additions & 2 deletions

File tree

pylops_mpi/signalprocessing/Fredholm1.py

Lines changed: 371 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import math
12
import numpy as np
23

34
from mpi4py import MPI
5+
from typing import Optional, Any, Tuple
46
from pylops.utils.backend import get_module
57
from pylops.utils.typing import DTypeLike, NDArray
68

@@ -9,8 +11,376 @@
911
MPILinearOperator,
1012
Partition
1113
)
14+
from pylops_mpi.Distributed import DistributedMixIn
15+
from pylops_mpi.DistributedArray import subcomm_split
1216

1317

18+
def _choose_pb_and_p(P: int, nsl: int) -> Tuple[int, int]:
19+
"""
20+
Choose Pb to minimize the α–β model under constraint P/Pb is a perfect square.
21+
Heuristic: largest Pb <= nsl such that P % Pb == 0 and is_square(P/Pb).
22+
"""
23+
best = None
24+
for Pb in range(min(P, nsl), 0, -1):
25+
if P % Pb != 0: continue
26+
P2 = P // Pb
27+
p = int(math.isqrt(P2))
28+
if p * p == P2:
29+
best = (Pb, p)
30+
break
31+
if best is None:
32+
raise ValueError(
33+
f"No valid (Pb,p) with Pb<=nsl and P/Pb square. P={P}, nsl={nsl}."
34+
)
35+
return best
36+
37+
38+
class MPIFredholm1SUMMA(DistributedMixIn, MPILinearOperator):
39+
"""
40+
Distributed Fredholm-1 using batched SUMMA on contraction:
41+
d[k,:,:] = G[k,:,:] @ m[k,:,:]
42+
43+
G is distributed as tiles (batch, x_tile, y_tile) over (batch_group, grid_row, grid_col).
44+
m is distributed as tiles (batch, y_tile, z_tile) over (batch_group, grid_row, grid_col).
45+
d is distributed as tiles (batch, x_tile, z_tile) over (batch_group, grid_row, grid_col).
46+
47+
This operator uses Partition.SCATTER for both input and output.
48+
49+
Parameters
50+
----------
51+
G_local : ndarray
52+
Local tile of G of shape (B_g, nx_loc, ny_loc) for this rank.
53+
nz : int
54+
Global nz dimension.
55+
nsl_global : int, optional
56+
Global number of slices. If None, inferred from batch sizes across ranks.
57+
saveGt : bool, optional
58+
Save local conjugate-transpose of G tile for adjoint.
59+
pb : int, optional
60+
Number of batch groups. If None, auto-chosen.
61+
base_comm : MPI.Comm
62+
base_comm_nccl : optional NCCL comm (only if base_comm == COMM_WORLD)
63+
dtype : str
64+
"""
65+
def __init__(
66+
self,
67+
G_local: NDArray,
68+
nz: int,
69+
nsl_global: Optional[int] = None,
70+
saveGt: bool = False,
71+
pb: Optional[int] = None,
72+
base_comm: MPI.Comm = MPI.COMM_WORLD,
73+
base_comm_nccl: Optional[Any] = None,
74+
dtype: DTypeLike = "float64",
75+
) -> None:
76+
if base_comm_nccl is not None and base_comm is not MPI.COMM_WORLD:
77+
raise ValueError("base_comm_nccl requires base_comm=MPI.COMM_WORLD")
78+
79+
self.base_comm = base_comm
80+
self.base_comm_nccl = base_comm_nccl
81+
self.rank = base_comm.Get_rank()
82+
self.size = base_comm.Get_size()
83+
84+
# Local batch size
85+
if G_local.ndim != 3: raise ValueError(f"G_local must be 3D (B,nx_loc,ny_loc). Got {G_local.shape}")
86+
self.B = int(G_local.shape[0])
87+
self.nz = int(nz)
88+
89+
# Determine batch-grouping (Pb) and inner SUMMA grid size (p)
90+
# Need nsl_global for optimal choice; if not provided, use a conservative estimate from all ranks:
91+
if nsl_global is None:
92+
# Infer: sum(B_rank) over all ranks = p^2 * nsl_global (only true after we pick p)
93+
# So we first pick Pb,p using nsl_est = sum(B_rank)/min_square_factor_guess
94+
# Practical approach: assume Pb=1 initially, require P square, then compute nsl_global
95+
# If user doesn't provide nsl_global, we do *no* auto-optimization; pb must be given or P must be square
96+
if pb is None:
97+
p0 = int(math.isqrt(self.size))
98+
if p0 * p0 != self.size:
99+
raise ValueError(
100+
"If nsl_global is not provided, pb must be provided, "
101+
"or P must be a perfect square (so pb=1 is valid)."
102+
)
103+
pb = 1
104+
105+
if pb is None:
106+
pb, p = _choose_pb_and_p(self.size, int(nsl_global))
107+
else:
108+
# For now we error but we could do something like where we would deactivate certain procs
109+
if self.size % pb != 0:
110+
raise ValueError(f"pb must divide P. Got pb={pb}, P={self.size}.")
111+
P2 = self.size // pb
112+
p = int(math.isqrt(P2))
113+
if p * p != P2:
114+
raise ValueError(f"P/pb must be a perfect square. Got P/pb={P2}.")
115+
if nsl_global is not None and pb > nsl_global:
116+
raise ValueError(f"pb must be <= nsl_global. Got pb={pb}, nsl_global={nsl_global}.")
117+
118+
self.pb = int(pb)
119+
self.p = int(p)
120+
self.P2 = self.p * self.p
121+
122+
# Batch-group id and rank within group
123+
self.batch_id = self.rank // self.P2
124+
self.rank_in_group = self.rank % self.P2
125+
126+
if self.batch_id >= self.pb:
127+
raise ValueError(
128+
f"Rank mapping expects P == pb*p^2. "
129+
f"Got P={self.size}, pb={self.pb}, p={self.p} => pb*p^2={self.pb*self.P2}."
130+
)
131+
132+
# Create batch communicator
133+
self.batch_comm = base_comm.Split(color=self.batch_id, key=self.rank_in_group)
134+
135+
# Within group, 2D grid coords
136+
self.row_id, self.col_id = divmod(self.rank_in_group, self.p)
137+
138+
# Row/col communicators (within group)
139+
self.row_comm = self.batch_comm.Split(color=self.row_id, key=self.col_id)
140+
self.col_comm = self.batch_comm.Split(color=self.col_id, key=self.row_id)
141+
142+
# # NCCL subcomms if provided
143+
# if base_comm_nccl is not None:
144+
# # subcomm_split expects mask per WORLD rank
145+
# # batch_comm: group by batch_id
146+
# mask_batch = [r // self.P2 for r in range(self.size)]
147+
# self.batch_comm_nccl = subcomm_split(mask_batch, base_comm_nccl)
148+
#
149+
# # row_comm: group by (batch_id,row_id)
150+
# mask_row = []
151+
# mask_col = []
152+
# for r in range(self.size):
153+
# bid = r // self.P2
154+
# rig = r % self.P2
155+
# rr, cc = divmod(rig, self.p)
156+
# mask_row.append(bid * self.p + rr)
157+
# mask_col.append(bid * self.p + cc)
158+
# self.row_comm_nccl = subcomm_split(mask_row, base_comm_nccl)
159+
# self.col_comm_nccl = subcomm_split(mask_col, base_comm_nccl)
160+
# else:
161+
self.batch_comm_nccl = None
162+
self.row_comm_nccl = None
163+
self.col_comm_nccl = None
164+
165+
# Store G tile and optional GT
166+
self.G = G_local.astype(np.dtype(dtype))
167+
if saveGt:
168+
# (B, nx_loc, ny_loc) -> (B, ny_loc, nx_loc)
169+
self.GT = self.G.transpose(0, 2, 1).conj()
170+
171+
# Infer global nx, ny from within-group tiling
172+
# A tile: (nx_loc, ny_loc) where nx is reduced on col_comm, ny on row_comm
173+
nx_loc = self.G.shape[1]
174+
ny_loc = self.G.shape[2]
175+
self.nx = int(self.col_comm.allreduce(nx_loc, op=MPI.SUM))
176+
self.ny = int(self.row_comm.allreduce(ny_loc, op=MPI.SUM))
177+
178+
# Determine global nsl
179+
if nsl_global is None:
180+
# sum B over WORLD ranks = p^2 * sum(B over batch groups) = p^2 * nsl_global
181+
Bsum = int(self.base_comm.allreduce(self.B, op=MPI.SUM))
182+
if Bsum % self.P2 != 0:
183+
raise ValueError(
184+
f"Cannot infer nsl_global cleanly: sum(B)={Bsum} not divisible by p^2={self.P2}."
185+
)
186+
self.nsl = Bsum // self.P2
187+
else:
188+
self.nsl = int(nsl_global)
189+
190+
# Padding sizes for SUMMA blocks
191+
self.nx_pad = math.ceil(self.nx / self.p) * self.p
192+
self.ny_pad = math.ceil(self.ny / self.p) * self.p
193+
self.nz_pad = math.ceil(self.nz / self.p) * self.p
194+
195+
self.bn = self.nx_pad // self.p
196+
self.bk = self.ny_pad // self.p
197+
self.bm = self.nz_pad // self.p
198+
199+
# Local (unpadded) extents for this rank’s output tile (x,z) and input tile (y,z)
200+
self.local_n = max(0, min(self.bn, self.nx - self.row_id * self.bn))
201+
self.local_k = max(0, min(self.bk, self.ny - self.row_id * self.bk)) # for m (K rows) uses row_id
202+
self.local_ka = max(0, min(self.bk, self.ny - self.col_id * self.bk)) # for G (K cols) uses col_id
203+
self.local_m = max(0, min(self.bm, self.nz - self.col_id * self.bm))
204+
205+
# Operator global shapes (conceptual / unpadded)
206+
self.dims_model = (self.nsl, self.ny, self.nz)
207+
self.dims_data = (self.nsl, self.nx, self.nz)
208+
shape = (int(np.prod(self.dims_data)), int(np.prod(self.dims_model)))
209+
super().__init__(shape=shape, dtype=np.dtype(dtype), base_comm=base_comm)
210+
211+
# Ensure local G matches expected tile sizes in (nx_loc, ny_loc) for A distribution
212+
# We allow edge tiles to be smaller since we will pad later
213+
if self.G.shape[1] != self.local_n or self.G.shape[2] != self.local_ka:
214+
# Not necessarily fatal if user pre-padded; allow larger, but disallow mismatch that breaks slicing
215+
if self.G.shape[1] < self.local_n or self.G.shape[2] < self.local_ka:
216+
raise ValueError(
217+
f"G_local tile too small for this rank. "
218+
f"Expected at least ({self.B},{self.local_n},{self.local_ka}), got {self.G.shape}."
219+
)
220+
221+
def _matvec(self, x: DistributedArray) -> DistributedArray:
222+
ncp = get_module(x.engine)
223+
if x.partition != Partition.SCATTER:
224+
raise ValueError(f"x should have partition={Partition.SCATTER}. Got {x.partition} instead.")
225+
226+
# Input local tile expected shape: (B, local_k (by row_id), local_m (by col_id))
227+
expected_in = self.B * self.local_k * self.local_m
228+
if x.local_array.size != expected_in:
229+
raise ValueError(
230+
f"Local x size mismatch. Expected {expected_in} elements "
231+
f"(B={self.B}, local_k={self.local_k}, local_m={self.local_m}), "
232+
f"got {x.local_array.size}."
233+
)
234+
235+
output_dtype = np.result_type(self.dtype, x.dtype)
236+
237+
# Output local shapes for SCATTER vector
238+
my_out = self.B * self.local_n * self.local_m
239+
local_shapes = self.base_comm.allgather(my_out)
240+
241+
y = DistributedArray(
242+
global_shape=int(np.prod(self.dims_data)),
243+
local_shapes=local_shapes,
244+
mask=x.mask,
245+
partition=Partition.SCATTER,
246+
engine=x.engine,
247+
dtype=output_dtype,
248+
base_comm=x.base_comm,
249+
base_comm_nccl=x.base_comm_nccl,
250+
)
251+
252+
# Reshape local x tile and pad to (B, bk, bm)
253+
X = x.local_array.reshape((self.B, self.local_k, self.local_m)).astype(output_dtype)
254+
if self.local_k != self.bk or self.local_m != self.bm:
255+
Xp = ncp.zeros((self.B, self.bk, self.bm), dtype=output_dtype)
256+
Xp[:, :self.local_k, :self.local_m] = X
257+
X = Xp
258+
259+
# Pad local G tile to (B, bn, bk) for SUMMA A tiles
260+
G = self.G[:, :self.local_n, :self.local_ka].astype(output_dtype)
261+
if self.local_n != self.bn or self.local_ka != self.bk:
262+
Gp = ncp.zeros((self.B, self.bn, self.bk), dtype=output_dtype)
263+
Gp[:, :self.local_n, :self.local_ka] = G
264+
G = Gp
265+
266+
Y = ncp.zeros((self.B, self.bn, self.bm), dtype=output_dtype)
267+
268+
row_nccl = self.row_comm_nccl if x.engine == "cupy" else None
269+
col_nccl = self.col_comm_nccl if x.engine == "cupy" else None
270+
271+
# Batched SUMMA
272+
for k in range(self.p):
273+
Atemp = G.copy() if self.col_id == k else ncp.empty_like(G)
274+
Btemp = X.copy() if self.row_id == k else ncp.empty_like(X)
275+
276+
Atemp = self._bcast(self.row_comm, row_nccl, Atemp, root=k, engine=x.engine)
277+
Btemp = self._bcast(self.col_comm, col_nccl, Btemp, root=k, engine=x.engine)
278+
279+
Y += ncp.matmul(Atemp, Btemp)
280+
281+
# Unpad to local (B, local_n, local_m) and write out
282+
Y = Y[:, :self.local_n, :self.local_m]
283+
y[:] = Y.ravel()
284+
return y
285+
286+
def _rmatvec(self, x: DistributedArray) -> DistributedArray:
287+
ncp = get_module(x.engine)
288+
if x.partition != Partition.SCATTER:
289+
raise ValueError(f"x should have partition={Partition.SCATTER}. Got {x.partition} instead.")
290+
291+
# Input to adjoint is data tile: (B, local_n, local_m)
292+
expected_in = self.B * self.local_n * self.local_m
293+
if x.local_array.size != expected_in:
294+
raise ValueError(
295+
f"Local x size mismatch for adjoint. Expected {expected_in} elements "
296+
f"(B={self.B}, local_n={self.local_n}, local_m={self.local_m}), got {x.local_array.size}."
297+
)
298+
299+
# Output dtype rules similar to your matrix-mult operators
300+
if np.iscomplexobj(self.G):
301+
output_dtype = np.result_type(self.dtype, x.dtype)
302+
else:
303+
output_dtype = x.dtype if np.iscomplexobj(x.local_array) else self.dtype
304+
output_dtype = np.result_type(self.dtype, output_dtype)
305+
306+
# Output local shapes for SCATTER model vector
307+
my_out = self.B * self.local_k * self.local_m # (B, local_k(row_id), local_m(col_id))
308+
local_shapes = self.base_comm.allgather(my_out)
309+
310+
y = DistributedArray(
311+
global_shape=int(np.prod(self.dims_model)),
312+
local_shapes=local_shapes,
313+
mask=x.mask,
314+
partition=Partition.SCATTER,
315+
engine=x.engine,
316+
dtype=output_dtype,
317+
base_comm=x.base_comm,
318+
base_comm_nccl=x.base_comm_nccl,
319+
)
320+
321+
# Reshape x tile and pad to (B, bn, bm)
322+
X = x.local_array.reshape((self.B, self.local_n, self.local_m)).astype(output_dtype)
323+
if self.local_n != self.bn or self.local_m != self.bm:
324+
Xp = ncp.zeros((self.B, self.bn, self.bm), dtype=output_dtype)
325+
Xp[:, :self.local_n, :self.local_m] = X
326+
X = Xp
327+
328+
# Local A^H tile (transpose-conj of A tile): (B, bk, bn)
329+
if hasattr(self, "GT"):
330+
AT_local = self.GT[:, :self.local_ka, :self.local_n].astype(output_dtype)
331+
else:
332+
AT_local = self.G[:, :self.local_n, :self.local_ka].transpose(0, 2, 1).conj().astype(output_dtype)
333+
334+
if self.local_ka != self.bk or self.local_n != self.bn:
335+
ATp = ncp.zeros((self.B, self.bk, self.bn), dtype=output_dtype)
336+
ATp[:, :self.local_ka, :self.local_n] = AT_local
337+
AT_local = ATp
338+
AT_local = ncp.ascontiguousarray(AT_local)
339+
340+
Y = ncp.zeros((self.B, self.bk, self.bm), dtype=output_dtype)
341+
342+
base_nccl = self.base_comm_nccl if x.engine == "cupy" else None
343+
col_nccl = self.col_comm_nccl if x.engine == "cupy" else None
344+
345+
# Batched adjoint SUMMA variant matching your existing _MPISummaMatrixMult._rmatvec:
346+
# - broadcast X panels down col_comm
347+
# - move AT blocks across WORLD ranks to emulate transposed distribution
348+
for k in range(self.p):
349+
Xtemp = X.copy() if self.row_id == k else ncp.empty_like(X)
350+
Xtemp = self._bcast(self.col_comm, col_nccl, Xtemp, root=k, engine=x.engine)
351+
352+
# Determine source rank for AT block needed this iteration
353+
# WORLD rank mapping inside batch group:
354+
# world_rank = batch_id*P2 + (row*p + col)
355+
# Need AT from srcA = (row=k, col=row_id) within this batch group:
356+
srcA_in_group = k * self.p + self.row_id
357+
srcA = self.batch_id * self.P2 + srcA_in_group
358+
359+
ATtemp = AT_local if (self.rank == srcA) else None
360+
361+
# Send from ranks with row_id==k (within group) to row=col_id targets, across all columns (within group),
362+
# using WORLD communicator for explicit point-to-point
363+
for moving_col in range(self.p):
364+
if self.row_id == k:
365+
# sender is (row=k, col=self.col_id)
366+
dest_in_group = self.col_id * self.p + moving_col
367+
destA = self.batch_id * self.P2 + dest_in_group
368+
if destA != self.rank:
369+
tagA = (100 + k) * 100000 + destA
370+
self._send(self.base_comm, base_nccl, AT_local, dest=destA, tag=tagA, engine=x.engine)
371+
372+
if self.col_id == moving_col and ATtemp is None:
373+
tagA = (100 + k) * 100000 + self.rank
374+
recv_buf = ncp.empty_like(AT_local)
375+
ATtemp = self._recv(self.base_comm, base_nccl, recv_buf, source=srcA, tag=tagA, engine=x.engine)
376+
377+
Y += ncp.matmul(ATtemp, Xtemp)
378+
379+
# Unpad output to (B, local_k(row_id), local_m)
380+
Y = Y[:, :self.local_k, :self.local_m]
381+
y[:] = Y.ravel()
382+
return y
383+
14384
class MPIFredholm1(MPILinearOperator):
15385
r"""Fredholm integral of first kind.
16386
@@ -166,6 +536,5 @@ def _rmatvec(self, x: NDArray) -> NDArray:
166536
y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj()
167537

168538
# gather results
169-
y[:] = ncp.vstack(y._allgather(y.base_comm, y.base_comm_nccl, y1,
170-
engine=y.engine)).ravel()
539+
y[:] = ncp.vstack(y._allgather(y.base_comm, y.base_comm_nccl, y1, engine=y.engine)).ravel()
171540
return y

0 commit comments

Comments
 (0)