|
33 | 33 | import java.util.concurrent.ExecutorService; |
34 | 34 | import java.util.concurrent.Executors; |
35 | 35 | import java.util.stream.Collectors; |
| 36 | +import java.util.Set; |
| 37 | +import java.util.HashSet; |
36 | 38 |
|
37 | 39 | import org.apache.spark.SparkConf; |
38 | 40 | import org.apache.spark.api.java.JavaRDD; |
@@ -662,6 +664,11 @@ public Void call() throws Exception |
662 | 664 | rddResult.count(); |
663 | 665 |
|
664 | 666 | final List<Tuple3<ViewId, long[][], String>> results = rddResult.collect(); |
| 667 | + final Set<String> datasetSet = new HashSet<String>(); |
| 668 | + for (Tuple3<ViewId, long[][], String> res : results) { |
| 669 | + datasetSet.add(res._3()); |
| 670 | + } |
| 671 | + final List<String> datasetNoDups = new ArrayList<>(datasetSet); |
665 | 672 |
|
666 | 673 | // assemble all interest point intervals per ViewId |
667 | 674 | final HashMap< ViewId, List< List< InterestPoint > > > interestPointsPerViewId = new HashMap<>(); |
@@ -704,18 +711,18 @@ public Void call() throws Exception |
704 | 711 | { |
705 | 712 | System.out.println( "Deleting temporary Spark files ... "); |
706 | 713 |
|
707 | | - final JavaRDD<Tuple3<ViewId, long[][], String>> rdd = sc.parallelize( results, Math.min( Spark.maxPartitions, results.size() ) ); |
| 714 | + final JavaRDD<String> rdd = sc.parallelize( datasetNoDups, Math.min( Spark.maxPartitions, datasetNoDups.size() ) ); |
708 | 715 |
|
709 | 716 | rdd.foreach( boundingBox -> |
710 | 717 | { |
711 | 718 | final N5Writer n5WriterLocal = URITools.instantiateN5Writer( StorageFormat.N5, tempURI ); |
712 | 719 |
|
713 | | - if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox._3() + "/points" )) |
| 720 | + if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox + "/points" )) |
714 | 721 | { |
715 | | - n5WriterLocal.remove( tempDataset + "/" + boundingBox._3() + "/points" ); |
| 722 | + n5WriterLocal.remove( tempDataset + "/" + boundingBox + "/points" ); |
716 | 723 |
|
717 | | - if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox._3() + "/intensities" ) ) |
718 | | - n5WriterLocal.remove( tempDataset + "/" + boundingBox._3() + "/intensities" ); |
| 724 | + if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox + "/intensities" ) ) |
| 725 | + n5WriterLocal.remove( tempDataset + "/" + boundingBox + "/intensities" ); |
719 | 726 |
|
720 | 727 | n5WriterLocal.close(); |
721 | 728 | } |
|
0 commit comments