Skip to content

Commit 651afc6

Browse files
committed
Copy DownsampleHelper, N5RetryUtil, and RetryStats from hotknife and adapt for use by N5Client to address Google Cloud rate limit exceptions.
1 parent 3684b59 commit 651afc6

5 files changed

Lines changed: 555 additions & 61 deletions

File tree

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package org.janelia.render.client.spark.n5;
2+
3+
import com.beust.jcommander.Parameter;
4+
5+
import java.io.IOException;
6+
import java.io.Serializable;
7+
import java.nio.file.Paths;
8+
import java.util.Arrays;
9+
import java.util.List;
10+
import java.util.stream.Collectors;
11+
12+
import org.apache.spark.SparkConf;
13+
import org.apache.spark.api.java.JavaSparkContext;
14+
import org.janelia.alignment.util.NeuroglancerAttributes;
15+
import org.janelia.render.client.ClientRunner;
16+
import org.janelia.render.client.parameter.CommandLineParameters;
17+
import org.janelia.saalfeldlab.n5.DatasetAttributes;
18+
import org.janelia.saalfeldlab.n5.N5Writer;
19+
import org.janelia.saalfeldlab.n5.spark.downsample.N5DownsamplerSpark;
20+
import org.janelia.saalfeldlab.n5.spark.supplier.N5WriterSupplier;
21+
import org.janelia.saalfeldlab.n5.universe.N5Factory;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* Helper for downsampling a dataset.
27+
*/
28+
public class DownsampleHelper
29+
implements Serializable {
30+
31+
private final String basePathOrStorageUrl;
32+
private final String sZeroDatasetPath;
33+
private final int[] downsampleFactors;
34+
private final int requiredSLevel;
35+
private final List<Double> stackResolutionValues;
36+
private final String stackResolutionUnit;
37+
private final List<Long> translatePixels;
38+
private final N5RetryUtil.RetryParameters retryParameters;
39+
40+
/**
41+
* @param basePathOrStorageUrl base path or storage URL
42+
* e.g. gs://janelia-spark-test/hess_wafers_60_61_export or
43+
* /nrs/hess/data/hess_wafers_60_61/export/hess_wafers_60_61.n5
44+
*
45+
* @param sZeroDatasetPath full-resolution dataset path
46+
* e.g. /flat/w61_serial_070_to_079/w61_s076_r00/raw_clahe/s0
47+
*
48+
* @param downsampleFactors per-dimension factors applied at each downsampling step (e.g. 2,2,1).
49+
*
50+
* @param requiredSLevel the minimum s-level that must be produced (e.g. 9).
51+
*
52+
* @param retryParameters parameters controlling retry behavior on failure
53+
* (specify as null if you do not want retries performed).
54+
*
55+
* @throws IOException
56+
* if the sZeroDatasetPath does not end with '/s0'.
57+
*/
58+
public DownsampleHelper(final String basePathOrStorageUrl,
59+
final String sZeroDatasetPath,
60+
final int[] downsampleFactors,
61+
final int requiredSLevel,
62+
final List<Double> stackResolutionValues,
63+
final String stackResolutionUnit,
64+
final List<Long> translatePixels,
65+
final N5RetryUtil.RetryParameters retryParameters)
66+
throws IOException {
67+
68+
this.basePathOrStorageUrl = basePathOrStorageUrl;
69+
70+
this.sZeroDatasetPath = sZeroDatasetPath;
71+
if (! sZeroDatasetPath.endsWith("/s0")) {
72+
throw new IOException("sZeroDatasetPath must end with '/s0'");
73+
}
74+
75+
this.downsampleFactors = downsampleFactors;
76+
this.requiredSLevel = requiredSLevel;
77+
this.stackResolutionValues = stackResolutionValues;
78+
this.stackResolutionUnit = stackResolutionUnit;
79+
this.translatePixels = translatePixels;
80+
this.retryParameters = retryParameters;
81+
}
82+
83+
/**
84+
* Downsamples the N5 dataset iteratively, creating s1, s2, ... and writing neuroglancer attributes.
85+
* Downsampling will stop when the sN result contains a single block is greater than or equal to the requiredSLevel.
86+
*
87+
* @param sparkContext the Spark context used for distributed processing.
88+
*
89+
* @throws IOException
90+
* if an N5 read or write operation fails.
91+
*/
92+
public void run(final JavaSparkContext sparkContext)
93+
throws IOException {
94+
95+
LOG.info("run: entry, basePathOrStorageUrl={}, sZeroDatasetPath={}, downsampleFactors={}, requiredSLevel={}, retryParameters={}",
96+
basePathOrStorageUrl, sZeroDatasetPath, Arrays.toString(downsampleFactors), requiredSLevel, retryParameters);
97+
98+
final N5WriterSupplier n5Supplier = () ->
99+
new N5Factory().openWriter(N5Factory.StorageFormat.N5, basePathOrStorageUrl);
100+
101+
final N5Writer n5 = n5Supplier.get();
102+
final DatasetAttributes fullScaleAttributes = n5.getDatasetAttributes(sZeroDatasetPath);
103+
final long[] dimensions = fullScaleAttributes.getDimensions();
104+
final int numberOfDimensions = dimensions.length;
105+
final int[] outputBlockSize = fullScaleAttributes.getBlockSize();
106+
final String outputGroupPath = sZeroDatasetPath.substring(0, sZeroDatasetPath.lastIndexOf('/'));
107+
108+
int numberOfDownsampledDatasets = 0;
109+
long downsampledBlockCount = 2;
110+
for (int scale = 1; (downsampledBlockCount > 1) || scale <= requiredSLevel; scale++) {
111+
112+
final String fromDataset = scale == 1 ? sZeroDatasetPath : outputGroupPath + "/s" + (scale - 1);
113+
final String toDataset = outputGroupPath + "/s" + scale;
114+
115+
final int[] scaleFactors = new int[numberOfDimensions];
116+
for (int d = 0; d < numberOfDimensions; d++) {
117+
scaleFactors[d] = (int) Math.round(Math.pow(downsampleFactors[d], scale));
118+
}
119+
120+
long blockCount = 1;
121+
final long[] downsampledDimensions = new long[numberOfDimensions];
122+
for (int d = 0; d < numberOfDimensions; d++) {
123+
downsampledDimensions[d] = dimensions[d] / scaleFactors[d];
124+
final long blocksInDim = (downsampledDimensions[d] + outputBlockSize[d] - 1) / outputBlockSize[d];
125+
blockCount *= blocksInDim;
126+
}
127+
downsampledBlockCount = blockCount;
128+
129+
if (n5.datasetExists(toDataset)) {
130+
131+
final DatasetAttributes toDatasetAttributes = n5.getDatasetAttributes(toDataset);
132+
final long[] toDatasetDimensions = toDatasetAttributes.getDimensions();
133+
for (int d = 0; d < numberOfDimensions; d++) {
134+
if (toDatasetDimensions[d] != downsampledDimensions[d]) {
135+
throw new IOException(
136+
"existing dataset " + toDataset + " has " + toDatasetDimensions[d] +
137+
" pixels in axis " + d + " instead of " + downsampledDimensions[d] +
138+
" pixels (based on downsampleFactor " + downsampleFactors[d] + ")");
139+
}
140+
}
141+
142+
LOG.info("run: skipping s{} because {} already exists", scale, toDataset);
143+
numberOfDownsampledDatasets++;
144+
145+
continue;
146+
}
147+
148+
final String operationDescription = "downsample " + fromDataset + " to " + toDataset;
149+
LOG.info("run: {} with {} downsampled block(s)", operationDescription, downsampledBlockCount);
150+
151+
if (retryParameters == null) {
152+
N5DownsamplerSpark.downsample(sparkContext,
153+
n5Supplier,
154+
fromDataset,
155+
toDataset,
156+
downsampleFactors,
157+
null);
158+
} else {
159+
160+
try {
161+
final RetryStats retryStats = N5RetryUtil.executeWithRetryVoid(
162+
() -> N5DownsamplerSpark.downsample(sparkContext,
163+
n5Supplier,
164+
fromDataset,
165+
toDataset,
166+
downsampleFactors),
167+
retryParameters,
168+
operationDescription);
169+
170+
LOG.info("run: {}", retryStats);
171+
172+
} catch (final Exception e) {
173+
throw new IOException(e);
174+
}
175+
176+
}
177+
178+
numberOfDownsampledDatasets++;
179+
}
180+
181+
// save additional parameters so that n5 can be viewed in neuroglancer
182+
final NeuroglancerAttributes ngAttributes =
183+
new NeuroglancerAttributes(stackResolutionValues,
184+
stackResolutionUnit,
185+
numberOfDownsampledDatasets,
186+
downsampleFactors,
187+
translatePixels,
188+
NeuroglancerAttributes.NumpyContiguousOrdering.FORTRAN);
189+
190+
ngAttributes.write(n5Supplier.get(), Paths.get(sZeroDatasetPath));
191+
192+
LOG.info("run: exit, generated {} downsampled datasets for {}", numberOfDownsampledDatasets, outputGroupPath);
193+
}
194+
195+
public static class Parameters
196+
extends CommandLineParameters {
197+
198+
@Parameter(
199+
names = "--basePathOrStorageUrl",
200+
description = "Base path or storage URL, e.g. gs://janelia-spark-test/hess_wafers_60_61_export or " +
201+
"/nrs/hess/data/hess_wafers_60_61/export/hess_wafers_60_61.n5",
202+
required = true)
203+
public String basePathOrStorageUrl;
204+
205+
@Parameter(
206+
names = "--fullResolutionDataset",
207+
description = "Full-resolution dataset path, e.g. /flat/w61_serial_070_to_079/w61_s076_r00/raw_clahe/s0",
208+
required = true)
209+
public String fullResolutionDataset;
210+
211+
@Parameter(
212+
names = "--factors",
213+
description = "Scale pyramid with given factors, e.g. 2,2,1",
214+
required = true)
215+
public String factors;
216+
217+
@Parameter(
218+
names = "--requiredSLevel",
219+
description = "The minimum s-level that must be produced, e.g. 9")
220+
public int requiredSLevel = 0;
221+
222+
@Parameter(
223+
names = "--stackResolution",
224+
description = "Resolution of the full scale x, y, and z axis pixels, e.g. 8,8,8",
225+
required = true)
226+
public String stackResolution;
227+
228+
@Parameter(
229+
names = "--stackResolutionUnit",
230+
description = "Unit description for stack resolution values, e.g. nm, um, ...")
231+
public String stackResolutionUnit = "nm";
232+
233+
@Parameter(
234+
names = "--translate",
235+
description = "Translation pixels for the full scale x, y, and z axis, e.g. 100,-77,1. " +
236+
"Omit if translation is not needed.")
237+
public String translate;
238+
239+
public int[] getDownsampleFactors() {
240+
return Util.parseCSIntArray(factors);
241+
}
242+
243+
public List<Double> getStackResolutionValues() {
244+
return Arrays.stream(Util.parseCSIntArray(stackResolution)).asDoubleStream()
245+
.boxed().collect(Collectors.toList());
246+
247+
}
248+
249+
public List<Long> getTranslatePixels() {
250+
return Arrays.stream(Util.parseCSIntArray(translate)).asLongStream()
251+
.boxed().collect(Collectors.toList());
252+
}
253+
}
254+
255+
public static void main(final String[] args) throws Exception {
256+
final ClientRunner clientRunner = new ClientRunner(args) {
257+
@Override
258+
public void runClient(final String[] args)
259+
throws Exception {
260+
261+
final Parameters parameters = new Parameters();
262+
parameters.parse(args);
263+
264+
LOG.info("runClient: entry, parameters={}", parameters);
265+
266+
final DownsampleHelper helper = new DownsampleHelper(parameters.basePathOrStorageUrl,
267+
parameters.fullResolutionDataset,
268+
parameters.getDownsampleFactors(),
269+
parameters.requiredSLevel,
270+
parameters.getStackResolutionValues(),
271+
parameters.stackResolutionUnit,
272+
parameters.getTranslatePixels(),
273+
new N5RetryUtil.RetryParameters());
274+
275+
final SparkConf conf = new SparkConf().setAppName("DownsampleHelper");
276+
final JavaSparkContext sparkContext = new JavaSparkContext(conf);
277+
helper.run(sparkContext);
278+
sparkContext.close();
279+
}
280+
};
281+
clientRunner.run();
282+
}
283+
284+
private static final Logger LOG = LoggerFactory.getLogger(DownsampleHelper.class);
285+
286+
}

0 commit comments

Comments
 (0)