Skip to content

Commit 28a74de

Browse files
committed
Specify distribution strategy per env var
1 parent 43eac71 commit 28a74de

1 file changed

Lines changed: 43 additions & 1 deletion

File tree

src/binding/python/openpmd_api/pipe/__main__.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .. import openpmd_api_cxx as io
1212
import argparse
1313
import os # os.path.basename
14+
import re
1415
import sys # sys.stderr.write
1516

1617
# MPI is an optional dependency
@@ -44,6 +45,12 @@ def parse_args(program_name):
4445
as soon as the mpi4py package is found and this tool is called in an MPI
4546
context. In that case, each dataset will be equally sliced along the dimension
4647
with the largest extent.
48+
A chunk distribution strategy may be selected via the environment variable
49+
OPENPMD_CHUNK_DISTRIBUTION. Options include "roundrobin", "binpacking",
50+
"slicedataset" and "hostname_<1>_<2>", where <1> should be replaced with a
51+
strategy to be applied within a compute node and <2> with a secondary strategy
52+
in case the hostname strategy does not distribute all chunks.
53+
4754
4855
Examples:
4956
{0} --infile simData.h5 --outfile simData_%T.bp
@@ -97,6 +104,40 @@ def run(self):
97104
self.dest.store(index, item)
98105

99106

107+
def distribution_strategy(dataset_extent,
108+
mpi_rank,
109+
mpi_size,
110+
strategy_identifier=None):
111+
if strategy_identifier is None or not strategy_identifier:
112+
if 'OPENPMD_CHUNK_DISTRIBUTION' in os.environ:
113+
strategy_identifier = os.environ[
114+
'OPENPMD_CHUNK_DISTRIBUTION'].lower()
115+
else:
116+
strategy_identifier = 'hostname_binpacking_slicedataset' # default
117+
match = re.search('hostname_(.*)_(.*)', strategy_identifier)
118+
if match is not None:
119+
inside_node = distribution_strategy(dataset_extent,
120+
mpi_rank,
121+
mpi_size,
122+
strategy_identifier=match.group(1))
123+
second_phase = distribution_strategy(
124+
dataset_extent,
125+
mpi_rank,
126+
mpi_size,
127+
strategy_identifier=match.group(2))
128+
return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase)
129+
elif strategy_identifier == 'roundrobin':
130+
return io.RoundRobin()
131+
elif strategy_identifier == 'binpacking':
132+
return io.BinPacking()
133+
elif strategy_identifier == 'slicedataset':
134+
return io.ByCuboidSlice(io.OneDimensionalBlockSlicer(), dataset_extent,
135+
mpi_rank, mpi_size)
136+
else:
137+
raise RuntimeError("Unknown distribution strategy: " +
138+
strategy_identifier)
139+
140+
100141
class pipe:
101142
"""
102143
Represents the configuration of one "pipe" pass.
@@ -226,7 +267,8 @@ def __copy(self, src, dest, current_path="/data/"):
226267
dest.make_constant(src.get_attribute("value"))
227268
else:
228269
chunk_table = src.available_chunks()
229-
strategy = io.BinPacking()
270+
strategy = distribution_strategy(shape, self.comm.rank,
271+
self.comm.size)
230272
my_chunks = strategy.assign_chunks(chunk_table, self.inranks,
231273
self.outranks)
232274
for chunk in [

0 commit comments

Comments
 (0)