Skip to content

Commit e667ecc

Browse files
committed
add processMultipleStacksInParallel parameter to AffineBlockSolverSetup and default process to a serial flow
1 parent bc3f4a2 commit e667ecc

2 files changed

Lines changed: 56 additions & 46 deletions

File tree

render-ws-java-client/src/main/java/org/janelia/render/client/newsolver/setup/AffineBlockSolverSetup.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,11 @@ public class AffineBlockSolverSetup extends CommandLineParameters
8989
@Parameter(names = "--maxZRangeMatches", description = "max z-range in which to load matches (default: '-1' - no limit)")
9090
public int maxZRangeMatches = -1;
9191

92-
// TODO: remove this parameter if it remains unused after we are done with the wafer 53 alignment
93-
// Parameter for testing
94-
@SuppressWarnings("unused")
95-
@Parameter(
96-
names = "--visualizeResults",
97-
description = "Visualize results (if running interactively)",
98-
arity = 0)
99-
public boolean visualizeResults = false;
92+
@Parameter(
93+
names = "--processMultipleStacksInParallel",
94+
description = "If multiple stacks are to be solved, process them in parallel (default is to process them serially)."
95+
)
96+
public boolean processMultipleStacksInParallel = false;
10097

10198
public void initDefaultValues() {
10299
if (!blockOptimizer.isConsistent())

render-ws-spark-client/src/main/java/org/janelia/render/client/spark/newsolver/DistributedAffineBlockSolverClient.java

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void runPipelineStep(final JavaSparkContext sparkContext,
100100
throws IllegalArgumentException, IOException {
101101

102102
final MultiProjectParameters multiProject = pipelineParameters.getMultiProject(pipelineParameters.getRawNamingGroup());
103-
final List<AffineBlockSolverSetup> setupList = new ArrayList<>();
103+
final List<AffineBlockSolverSetup> basicSetupList = new ArrayList<>();
104104
final AffineBlockSolverSetup setup = pipelineParameters.getAffineBlockSolverSetup();
105105
final List<StackWithZValues> stackList = multiProject.buildListOfStackWithAllZ();
106106
final int nRuns = setup.alternatingRuns.nRuns;
@@ -109,63 +109,76 @@ public void runPipelineStep(final JavaSparkContext sparkContext,
109109
final String matchSuffix = pipelineParameters.getMatchCopyToCollectionSuffix();
110110
for (final StackWithZValues stackWithZValues : stackList) {
111111
if (setup.stitchOnly) {
112-
setupList.addAll(buildSetupForEachZLayer(stackWithZValues,
113-
multiProject,
114-
setup,
115-
matchSuffix));
112+
basicSetupList.addAll(buildSetupForEachZLayer(stackWithZValues,
113+
multiProject,
114+
setup,
115+
matchSuffix));
116116
} else {
117-
setupList.add(setup.buildPipelineClone(multiProject.getBaseDataUrl(),
118-
stackWithZValues,
119-
multiProject.deriveMatchCollectionNamesFromProject,
120-
matchSuffix));
117+
basicSetupList.add(setup.buildPipelineClone(multiProject.getBaseDataUrl(),
118+
stackWithZValues,
119+
multiProject.deriveMatchCollectionNamesFromProject,
120+
matchSuffix));
121121
}
122122
}
123123

124124
final DistributedAffineBlockSolverClient affineBlockSolverClient = new DistributedAffineBlockSolverClient();
125125

126-
if (setup.stitchOnly) {
126+
final List<List<AffineBlockSolverSetup>> listOfSetupLists = new ArrayList<>();
127+
if (setup.processMultipleStacksInParallel) {
128+
LOG.info("runPipelineStep: processing {} setups in parallel", basicSetupList.size());
129+
listOfSetupLists.add(basicSetupList);
130+
} else {
131+
LOG.info("runPipelineStep: processing {} setups serially", basicSetupList.size());
132+
for (final AffineBlockSolverSetup serialSetup : basicSetupList) {
133+
listOfSetupLists.add(Collections.singletonList(serialSetup));
134+
}
135+
}
127136

128-
// stitch each layer
129-
affineBlockSolverClient.alignSetupList(sparkContext, setupList);
137+
for (final List<AffineBlockSolverSetup> setupList : listOfSetupLists) {
138+
if (setup.stitchOnly) {
139+
140+
// stitch each layer
141+
affineBlockSolverClient.alignSetupList(sparkContext, setupList);
130142

131-
if (setup.targetStack.completeStack) {
132-
final Set<String> completedTargetStacks = new HashSet<>();
133-
for (final AffineBlockSolverSetup stackSetup : setupList) {
134-
final String targetStack = stackSetup.targetStack.stack;
135-
if (completedTargetStacks.contains(targetStack)) {
136-
continue;
143+
if (setup.targetStack.completeStack) {
144+
final Set<String> completedTargetStacks = new HashSet<>();
145+
for (final AffineBlockSolverSetup stackSetup : setupList) {
146+
final String targetStack = stackSetup.targetStack.stack;
147+
if (completedTargetStacks.contains(targetStack)) {
148+
continue;
149+
}
150+
final RenderDataClient renderDataClient = stackSetup.renderWeb.getDataClient();
151+
renderDataClient.setStackState(stackSetup.targetStack.stack, StackMetaData.StackState.COMPLETE);
152+
completedTargetStacks.add(targetStack);
137153
}
138-
final RenderDataClient renderDataClient = stackSetup.renderWeb.getDataClient();
139-
renderDataClient.setStackState(stackSetup.targetStack.stack, StackMetaData.StackState.COMPLETE);
140-
completedTargetStacks.add(targetStack);
141154
}
142-
}
143155

144-
} else if (nRuns == 1) {
156+
} else if (nRuns == 1) {
145157

146-
affineBlockSolverClient.alignSetupList(sparkContext, setupList);
158+
affineBlockSolverClient.alignSetupList(sparkContext, setupList);
147159

148-
} else {
160+
} else {
149161

150-
// Different stacks can be aligned in parallel, but each run must be done sequentially.
151-
// So, for each run, create a list of setups that will be aligned in parallel.
152-
final List<List<AffineBlockSolverSetup>> setupListsForRuns = buildSetupListsForRuns(nRuns, setupList);
162+
// Different stacks can be aligned in parallel, but each run must be done sequentially.
163+
// So, for each run, create a list of setups that will be aligned in parallel.
164+
final List<List<AffineBlockSolverSetup>> setupListsForRuns = buildSetupListsForRuns(nRuns, setupList);
153165

154-
// loop through each run and align the stacks in parallel ...
155-
for (int runIndex = 0; runIndex < nRuns; runIndex++) {
166+
// loop through each run and align the stacks in parallel ...
167+
for (int runIndex = 0; runIndex < nRuns; runIndex++) {
156168

157-
final List<AffineBlockSolverSetup> setupListForRun = setupListsForRuns.get(runIndex);
169+
final List<AffineBlockSolverSetup> setupListForRun = setupListsForRuns.get(runIndex);
158170

159-
// align all stacks for this run
160-
affineBlockSolverClient.alignSetupList(sparkContext, setupListForRun);
171+
// align all stacks for this run
172+
affineBlockSolverClient.alignSetupList(sparkContext, setupListForRun);
161173

162-
// clean-up intermediate stacks for prior runs if requested
163-
if (cleanUpIntermediateStacks && (runIndex > 0)) {
164-
setupListForRun.forEach(s -> AlternatingSolveUtils.cleanUpIntermediateStack(s.renderWeb,
165-
s.stack));
174+
// clean-up intermediate stacks for prior runs if requested
175+
if (cleanUpIntermediateStacks && (runIndex > 0)) {
176+
setupListForRun.forEach(s -> AlternatingSolveUtils.cleanUpIntermediateStack(s.renderWeb,
177+
s.stack));
178+
}
166179
}
167-
}
168180

181+
}
169182
}
170183

171184
}

0 commit comments

Comments
 (0)