Skip to content

Commit a70c44e

Browse files
committed
Specify distribution strategy per env var
1 parent e9557ff commit a70c44e

File tree

1 file changed

+45
-3
lines changed

1 file changed

+45
-3
lines changed

src/binding/python/openpmd_api/pipe/__init__.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
"""
1111
from .. import openpmd_api_cxx as io
1212
import argparse
13+
import os
14+
import re
1315
import sys # sys.stderr.write
1416

1517
# MPI is an optional dependency
@@ -19,7 +21,7 @@
1921
except ImportError:
2022
HAVE_MPI = False
2123

22-
debug = True
24+
debug = False
2325

2426

2527
class FallbackMPICommunicator:
@@ -39,7 +41,12 @@ def parse_args():
3941
Parallelization with MPI is optionally possible and is done automatically
4042
as soon as the mpi4py package is found and this tool is called in an MPI
4143
context. In that case, each dataset will be equally sliced along the dimension
42-
with the largest extent.""")
44+
with the largest extent.
45+
A chunk distribution strategy may be selected via the environment variable
46+
OPENPMD_CHUNK_DISTRIBUTION. Options include "roundrobin", "binpacking",
47+
"slicedataset" and "hostname_<1>_<2>", where <1> should be replaced with a
48+
strategy to be applied within a compute node and <2> with a secondary strategy
49+
in case the hostname strategy does not distribute all chunks.""")
4350

4451
parser.add_argument('--infile', type=str, help='In file')
4552
parser.add_argument('--outfile', type=str, help='Out file')
@@ -85,6 +92,40 @@ def run(self):
8592
self.dest.store(index, item)
8693

8794

95+
def distribution_strategy(dataset_extent,
                          mpi_rank,
                          mpi_size,
                          strategy_identifier=None):
    """
    Build the chunk distribution strategy selected by an identifier string.

    Parameters
    ----------
    dataset_extent
        Extent of the dataset; only forwarded to the "slicedataset" strategy.
    mpi_rank, mpi_size
        Rank of this process and total number of processes; only forwarded
        to the "slicedataset" strategy.
    strategy_identifier
        One of "roundrobin", "binpacking", "slicedataset" or
        "hostname_<1>_<2>", where <1> is the strategy applied within a
        compute node and <2> the secondary strategy used in case the
        hostname strategy does not distribute all chunks. If None or empty,
        the environment variable OPENPMD_CHUNK_DISTRIBUTION (lower-cased) is
        used; if that is unset or empty as well, the default
        "hostname_binpacking_slicedataset" applies.

    Returns
    -------
    The strategy object created from the ``io`` bindings.

    Raises
    ------
    RuntimeError
        If the identifier (or one of its nested sub-identifiers) is unknown
        or empty.
    """
    if not strategy_identifier:
        # The environment is consulted exactly once, here. Nested
        # sub-identifiers are parsed by the helper below, which never falls
        # back to the environment; otherwise a malformed value such as
        # "hostname__binpacking" would re-read itself and recurse forever.
        strategy_identifier = os.environ.get('OPENPMD_CHUNK_DISTRIBUTION',
                                             '').lower()
        if not strategy_identifier:
            strategy_identifier = 'hostname_binpacking_slicedataset'  # default
    return _strategy_from_identifier(strategy_identifier, dataset_extent,
                                     mpi_rank, mpi_size)


def _strategy_from_identifier(identifier, dataset_extent, mpi_rank, mpi_size):
    """Translate a fully resolved identifier into a strategy object."""
    # fullmatch: the whole identifier must have the hostname_<1>_<2> form,
    # leading junk is not silently ignored. The first group is greedy, so
    # for nested identifiers the last underscore separates <1> from <2>.
    match = re.fullmatch(r'hostname_(.*)_(.*)', identifier)
    if match is not None:
        inside_identifier, fallback_identifier = match.groups()
        if not inside_identifier or not fallback_identifier:
            raise RuntimeError(
                "Unknown distribution strategy: " + identifier +
                " (expected hostname_<1>_<2> with two non-empty strategies)")
        inside_node = _strategy_from_identifier(inside_identifier,
                                                dataset_extent, mpi_rank,
                                                mpi_size)
        second_phase = _strategy_from_identifier(fallback_identifier,
                                                 dataset_extent, mpi_rank,
                                                 mpi_size)
        return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase)
    elif identifier == 'roundrobin':
        return io.RoundRobin()
    elif identifier == 'binpacking':
        return io.BinPacking()
    elif identifier == 'slicedataset':
        return io.ByCuboidSlice(io.OneDimensionalBlockSlicer(), dataset_extent,
                                mpi_rank, mpi_size)
    else:
        raise RuntimeError("Unknown distribution strategy: " + identifier)
127+
128+
88129
class pipe:
89130
"""
90131
Represents the configuration of one "pipe" pass.
@@ -200,7 +241,8 @@ def __copy(self, src, dest, current_path="/data/"):
200241
dest.make_constant(src.get_attribute("value"))
201242
else:
202243
chunk_table = src.available_chunks()
203-
strategy = io.BinPacking()
244+
strategy = distribution_strategy(shape, self.comm.rank,
245+
self.comm.size)
204246
my_chunks = strategy.assign_chunks(chunk_table, self.inranks,
205247
self.outranks)
206248
for chunk in [

0 commit comments

Comments
 (0)