Skip to content

Commit 9a2c9e2

Browse files
Initial version of PythonProcessorAdapter for miniconda prepared with conda-pack or with an env installed under /home/yarn/opt
1 parent c3c69ec commit 9a2c9e2

3 files changed

Lines changed: 266 additions & 5 deletions

File tree

calvalus-processing/src/main/java/com/bc/calvalus/processing/ProcessorFactory.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.bc.calvalus.processing.beam.SnapOperatorAdapter;
2121
import com.bc.calvalus.processing.beam.SubsetProcessorAdapter;
2222
import com.bc.calvalus.processing.executable.ExecutableProcessorAdapter;
23+
import com.bc.calvalus.processing.executable.PythonProcessorAdapter;
2324
import com.bc.calvalus.processing.hadoop.HadoopProcessingService;
2425
import com.bc.ceres.core.ProgressMonitor;
2526
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +49,7 @@ public class ProcessorFactory {
4849
public static final String CALVALUS_L2_PROCESSOR_FILES = "calvalus.l2.scriptFiles";
4950
private static final Logger logger = Logger.getLogger("com.bc.calvalus");
5051

51-
enum ProcessorType {OPERATOR, GRAPH, EXEC, NONE}
52+
/**
 * Kinds of processor an adapter can be created for.
 * createAdapter() reads the kind from the job configuration (default "NONE") and maps
 * EXEC to ExecutableProcessorAdapter, NONE to SubsetProcessorAdapter and
 * PYTHON to PythonProcessorAdapter.
 * NOTE(review): the adapters chosen for OPERATOR and GRAPH are not visible in this diff - confirm.
 */
enum ProcessorType {OPERATOR, GRAPH, EXEC, NONE, PYTHON}
5253

5354
public static ProcessorAdapter createAdapter(MapContext mapContext) throws IOException {
5455
String processorTypeString = mapContext.getConfiguration().get(JobConfigNames.CALVALUS_L2_PROCESSOR_TYPE, "NONE");
@@ -62,6 +63,8 @@ public static ProcessorAdapter createAdapter(MapContext mapContext) throws IOExc
6263
return new ExecutableProcessorAdapter(mapContext);
6364
case NONE:
6465
return new SubsetProcessorAdapter(mapContext);
66+
case PYTHON:
67+
return new PythonProcessorAdapter(mapContext);
6568

6669
}
6770
throw new IllegalArgumentException("Unknown processor type.");
@@ -92,8 +95,10 @@ public static void installProcessorBundles(String username, Configuration conf,
9295
if (i == 0) {
9396
processorType = detectProcessorType(bundlePath, executable, fs);
9497
}
95-
Collections.addAll(processorFiles, getBundleProcessorFiles(executable, bundlePath, fs));
96-
98+
Collections.addAll(processorFiles, getBundleProcessorFiles(processorType, executable, bundlePath, fs));
99+
if (processorType == ProcessorType.PYTHON) {
100+
HadoopProcessingService.addBundleScripts(bundlePath, fs, conf);
101+
}
97102
}
98103
// check for bundle to include, install it
99104
try {
@@ -143,6 +148,15 @@ public boolean accept(Path path) {
143148
if (executableFiles.length == 1) {
144149
return ProcessorType.EXEC;
145150
}
151+
final FileStatus[] pythonFiles = fs.listStatus(bundlePath, new PathFilter() {
152+
@Override
153+
public boolean accept(Path path) {
154+
return path.getName().equals(executable + ".py");
155+
}
156+
});
157+
if (pythonFiles.length == 1) {
158+
return ProcessorType.PYTHON;
159+
}
146160
return ProcessorType.OPERATOR;
147161
}
148162

@@ -219,12 +233,12 @@ public boolean accept(Path path) {
219233
}
220234
*/
221235

222-
private static String[] getBundleProcessorFiles(final String processorName, Path bundlePath, FileSystem fs) throws IOException {
236+
private static String[] getBundleProcessorFiles(final ProcessorType processorType, final String processorName, Path bundlePath, FileSystem fs) throws IOException {
223237
final FileStatus[] processorStatuses = fs.listStatus(bundlePath, new PathFilter() {
224238
@Override
225239
public boolean accept(Path path) {
226240
String filename = path.getName();
227-
return (filename.startsWith("common-") || filename.startsWith(processorName + "-")) && !isArchive(path);
241+
return (filename.startsWith("common-") || filename.startsWith(processorName + "-") || processorType == ProcessorType.PYTHON) && !isArchive(path);
228242
}
229243
});
230244
String[] processorFiles = new String[processorStatuses.length];
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright (C) 2012 Brockmann Consult GmbH (info@brockmann-consult.de)
3+
*
4+
* This program is free software; you can redistribute it and/or modify it
5+
* under the terms of the GNU General Public License as published by the Free
6+
* Software Foundation; either version 3 of the License, or (at your option)
7+
* any later version.
8+
* This program is distributed in the hope that it will be useful, but WITHOUT
9+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11+
* more details.
12+
*
13+
* You should have received a copy of the GNU General Public License along
14+
* with this program; if not, see http://www.gnu.org/licenses/
15+
*/
16+
17+
package com.bc.calvalus.processing.executable;
18+
19+
import com.bc.calvalus.processing.ProcessorAdapter;
20+
import com.bc.calvalus.processing.beam.CalvalusProductIO;
21+
import com.bc.calvalus.processing.beam.LandsatCalvalusReaderPlugin;
22+
import com.bc.calvalus.processing.l2.ProductFormatter;
23+
import com.bc.ceres.core.ProcessObserver;
24+
import com.bc.ceres.core.ProgressMonitor;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.mapreduce.MapContext;
28+
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
29+
import org.esa.snap.core.dataio.ProductIO;
30+
import org.esa.snap.core.dataio.ProductReader;
31+
import org.esa.snap.core.datamodel.Product;
32+
import org.esa.snap.core.gpf.GPF;
33+
import java.io.BufferedInputStream;
34+
import java.io.File;
35+
import java.io.FileInputStream;
36+
import java.io.IOException;
37+
import java.io.InputStream;
38+
import java.io.OutputStream;
39+
import java.util.HashMap;
40+
import java.util.Map;
41+
import java.util.logging.Level;
42+
43+
/**
44+
* A processor adapter that uses a Python command line to process an input product.
45+
*
46+
* Instructions on how to prepare the processor bundle:
47+
* 1. Create a working dir, put an input into it, add a processor python script name1.py <input>
48+
* 2. install required packages+conda-pack into a miniconda3-name2
49+
* 3. Activate the miniconda and run the processor with ./name1.py input
50+
* 4. conda pack miniconda3-name2.tar.gz
51+
* 5. Create bundles conda-name2-1.0 with the miniconda3-name2.tar.gz and python-name1-1.0 with the python script name1.py.
52+
* 6. Test with processing request (processorName:name1, condaEnv:miniconda3-name2, outputPattern:*.nc,
53+
* checkIntersection:false, executableMemory:4096, processorBundles:conda-name2-1.0,python-name1-1.0)
54+
*
55+
* @author Martin
56+
*/
57+
public class PythonProcessorAdapter extends ProcessorAdapter {
58+
59+
private String[] outputFilenames;
60+
61+
/**
 * Creates a Python processor adapter.
 *
 * @param mapContext the Hadoop map context, passed on unchanged to the ProcessorAdapter constructor
 */
public PythonProcessorAdapter(MapContext mapContext) {
62+
super(mapContext);
63+
}
64+
65+
@Override
66+
public boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOException {
67+
// make sure the input is staged into the working directory
68+
Path inputPath = getInputPath();
69+
File inputFile = getInputFile();
70+
if (inputFile == null) {
71+
if (inputPath.getName().endsWith(".zip")) {
72+
for (File entry : CalvalusProductIO.uncompressArchiveToCWD(inputPath, getConfiguration())) {
73+
if ("xfdumanifest.xml".equals(entry.getName()) || entry.getName().endsWith("MTD.xml") || entry.getName().endsWith(".dim")) {
74+
inputFile = entry;
75+
break;
76+
}
77+
}
78+
if (inputFile == null) {
79+
inputFile = new File(inputPath.getName().substring(0, inputPath.getName().length()-4));
80+
}
81+
} else {
82+
inputFile = CalvalusProductIO.copyFileToLocal(inputPath, getConfiguration());
83+
}
84+
setInputFile(inputFile);
85+
}
86+
// bookkeeping of bytes read
87+
if (getMapContext().getInputSplit() instanceof FileSplit) {
88+
FileSplit fileSplit = (FileSplit) getMapContext().getInputSplit();
89+
getMapContext().getCounter("Direct File System Counters", "FILE_SPLIT_BYTES_READ").setValue(fileSplit.getLength());
90+
}
91+
//
92+
Configuration conf = getConfiguration();
93+
final String user = conf.get("mapreduce.job.user.name");
94+
final String condaenvName = conf.get("calvalus.l2.condaenv");
95+
final String processorName = conf.get("calvalus.l2.operator");
96+
final String processorCallPattern = conf.get("calvalus.l2.commandline", processorName + ".py $input");
97+
final String outputPattern = conf.get("calvalus.output.pattern");
98+
final String memoryLimit = conf.get("mapreduce.map.memory.mb");
99+
final String inputLocalPath = inputFile.getCanonicalPath();
100+
final String processorCall = processorCallPattern.replace("$input", inputLocalPath.endsWith(".zip") ? inputLocalPath.substring(0, inputLocalPath.length()-4) : inputLocalPath);
101+
final String script =
102+
"set -x; " +
103+
String.format("let m=(%s * 1024); ulimit -m $m; ", memoryLimit) +
104+
String.format("e=$(ls -d */envs/%s|head -n 1); ", condaenvName) +
105+
"if [ \"$e\" != \"\" ]; then " +
106+
" c=$(basename $(dirname $(dirname $e))); " +
107+
" miniconda_dir=$(ls -ld $c|awk '{print $11}'); " +
108+
" current_link=$(ls -ld /home/yarn/opt/$c 2> /dev/null | awk '{print $11}'); " +
109+
" if [ \"$current_link\" != \"$miniconda_dir\" ]; then " +
110+
" mkdir -p /home/yarn/opt; " +
111+
" ln -s -f -T $miniconda_dir /home/yarn/opt/$c; " +
112+
" fi; " +
113+
" eval \"$(/home/yarn/opt/$c/bin/conda shell.bash hook)\"; " +
114+
" conda activate ${e##*/}; " +
115+
"else " +
116+
String.format(" export PATH=%s/bin:$PATH; ", condaenvName) +
117+
" . activate; " +
118+
"fi; " +
119+
(inputLocalPath.endsWith(".zip") ? String.format("unzip %s; ", inputLocalPath) : "") +
120+
String.format("./%s; ", processorCall) +
121+
String.format("for n in $(ls %s); do echo CALVALUS_OUTPUT_PRODUCT $n; done", outputPattern);
122+
final String script2 = String.format("./process %s %s '%s' %s", memoryLimit, condaenvName, outputPattern, processorCall);
123+
getLogger().info("script=" + script);
124+
final String[] cmdArray = {"/bin/bash", "-c", script};
125+
final String[] env = new String[] { "HADOOP_USER_NAME=" + user };
126+
Process process = Runtime.getRuntime().exec(cmdArray, env);
127+
KeywordHandler keywordHandler = new KeywordHandler(processorName, getMapContext());
128+
129+
new ProcessObserver(process).
130+
setName(processorName).
131+
setProgressMonitor(pm).
132+
setHandler(keywordHandler).
133+
start();
134+
135+
outputFilenames = keywordHandler.getOutputFiles();
136+
return outputFilenames.length > 0;
137+
}
138+
139+
@Override
140+
public Product openProcessedProduct() throws IOException {
141+
if (outputFilenames != null && outputFilenames.length > 0) {
142+
Product product = ProductIO.readProduct(new File(".", outputFilenames[0]));
143+
CalvalusProductIO.printProductOnStdout(product, "executable output");
144+
File productFileLocation = product.getFileLocation();
145+
if (isSentinel2(outputFilenames[0])) {
146+
Map<String, Object> params = new HashMap<>();
147+
params.put("referenceBand", "B5");
148+
product = GPF.createProduct("Resample", params, product);
149+
CalvalusProductIO.printProductOnStdout(product, "resampled");
150+
product.setFileLocation(productFileLocation);
151+
} else if (isLandsat(outputFilenames[0])) {
152+
Map<String, Object> params = new HashMap<>();
153+
params.put("referenceBand", "red");
154+
product = GPF.createProduct("Resample", params, product);
155+
CalvalusProductIO.printProductOnStdout(product, "resampled");
156+
product.setFileLocation(productFileLocation);
157+
}
158+
getLogger().info(String.format("Opened product width = %d height = %d",
159+
product.getSceneRasterWidth(),
160+
product.getSceneRasterHeight()));
161+
ProductReader productReader = product.getProductReader();
162+
if (productReader != null) {
163+
getLogger().info(String.format("ReaderPlugin: %s", productReader.toString()));
164+
}
165+
if (hasInvalidStartAndStopTime(product)) {
166+
getLogger().log(Level.INFO, "Processed Product has no or invalid start/stop time. Copying from input.");
167+
// When processing with Polymere no time information is attached to the product.
168+
// When processing with MEGS and input rectangle the start time is invalid.
169+
// Therefor we have to adjust it here.
170+
copySceneRasterStartAndStopTime(getInputProduct(), product, getInputRectangle());
171+
}
172+
return product;
173+
}
174+
return null;
175+
}
176+
177+
178+
@Override
179+
public void saveProcessedProducts(ProgressMonitor pm) throws IOException {
180+
if (outputFilenames != null && outputFilenames.length > 0) {
181+
saveProcessedProductFiles(outputFilenames, pm);
182+
}
183+
}
184+
185+
protected void saveProcessedProductFiles(String[] outputFilesNames, ProgressMonitor pm) throws IOException {
186+
Configuration conf = getConfiguration();
187+
String tableOutputFilename = null;
188+
if (getInputParameters().length > 0) {
189+
for (int i=0; i<getInputParameters().length; i+=2) {
190+
if ("output".equals(getInputParameters()[i])) {
191+
tableOutputFilename = getInputParameters()[i+1];
192+
}
193+
}
194+
}
195+
pm.beginTask("saving", 1);
196+
MapContext mapContext = getMapContext();
197+
for (String outputFileName : outputFilesNames) {
198+
InputStream is = new BufferedInputStream(new FileInputStream(new File(".", outputFileName)));
199+
Path workPath = new Path(getWorkOutputDirectoryPath(),
200+
tableOutputFilename != null ? tableOutputFilename : outputFileName);
201+
OutputStream os = workPath.getFileSystem(conf).create(workPath);
202+
ProductFormatter.copyAndClose(is, os, mapContext);
203+
}
204+
pm.done();
205+
}
206+
207+
@Override
208+
public Path getOutputProductPath() throws IOException {
209+
if (outputFilenames != null && outputFilenames.length > 0) {
210+
return new Path(getWorkOutputDirectoryPath(), outputFilenames[0]);
211+
}
212+
return null;
213+
}
214+
215+
/**
 * Reports that this adapter does not support pull processing.
 *
 * @return always {@code false}
 */
@Override
216+
public boolean supportsPullProcessing() {
217+
return false;
218+
}
219+
220+
private boolean isSentinel2(String filename) {
221+
return filename.matches("^S2.*_MSIL1C.*zip") ||
222+
filename.matches("^S2.*_MSIL2A.*zip");
223+
}
224+
225+
private boolean isLandsat(String filename) {
226+
for (String pattern : LandsatCalvalusReaderPlugin.FILENAME_PATTERNS) {
227+
if (filename.matches(pattern)) {
228+
return true;
229+
}
230+
}
231+
return false;
232+
}
233+
234+
}

calvalus-processing/src/main/java/com/bc/calvalus/processing/hadoop/HadoopProcessingService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,19 @@ public static boolean isLib(Path libPath) {
659659
return filename.endsWith(".so") || filename.contains(".so.") || filename.equals("VERSION.txt");
660660
}
661661

662+
public static void addBundleScripts(Path bundlePath, FileSystem fs, Configuration conf) throws IOException {
663+
final FileStatus[] libs = fs.listStatus(bundlePath, new PathFilter() {
664+
@Override
665+
public boolean accept(Path path) {
666+
return ! isLib(path) && ! isArchive(path);
667+
}
668+
});
669+
for (FileStatus lib : libs) {
670+
URI uri = fs.makeQualified(lib.getPath()).toUri();
671+
DistributedCache.addCacheFile(uri, conf);
672+
}
673+
}
674+
662675

663676
private static class BundleQueryCacheEntry {
664677

0 commit comments

Comments
 (0)