|
33 | 33 | import java.util.concurrent.ExecutorService; |
34 | 34 | import java.util.concurrent.Executors; |
35 | 35 | import java.util.stream.Collectors; |
| 36 | +import java.util.Set; |
| 37 | +import java.util.HashSet; |
36 | 38 |
|
37 | 39 | import org.apache.spark.SparkConf; |
38 | 40 | import org.apache.spark.api.java.JavaRDD; |
@@ -654,6 +656,11 @@ public Void call() throws Exception |
654 | 656 | rddResult.count(); |
655 | 657 |
|
656 | 658 | final List<Tuple3<ViewId, long[][], String>> results = rddResult.collect(); |
| 659 | + final Set<String> datasetSet = new HashSet<String>(); |
| 660 | + for (Tuple3<ViewId, long[][], String> res : results) { |
| 661 | + datasetSet.add(res._3()); |
| 662 | + } |
| 663 | + final List<String> datasetNoDups = new ArrayList<>(datasetSet); |
657 | 664 |
|
658 | 665 | // assemble all interest point intervals per ViewId |
659 | 666 | final HashMap< ViewId, List< List< InterestPoint > > > interestPointsPerViewId = new HashMap<>(); |
@@ -696,18 +703,18 @@ public Void call() throws Exception |
696 | 703 | { |
697 | 704 | System.out.println( "Deleting temporary Spark files ... "); |
698 | 705 |
|
699 | | - final JavaRDD<Tuple3<ViewId, long[][], String>> rdd = sc.parallelize( results, Math.min( Spark.maxPartitions, results.size() ) ); |
| 706 | + final JavaRDD<String> rdd = sc.parallelize( datasetNoDups, Math.min( Spark.maxPartitions, datasetNoDups.size() ) ); |
700 | 707 |
|
701 | 708 | rdd.foreach( boundingBox -> |
702 | 709 | { |
703 | 710 | final N5Writer n5WriterLocal = URITools.instantiateN5Writer( StorageFormat.N5, tempURI ); |
704 | 711 |
|
705 | | - if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox._3() + "/points" )) |
| 712 | + if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox + "/points" )) |
706 | 713 | { |
707 | | - n5WriterLocal.remove( tempDataset + "/" + boundingBox._3() + "/points" ); |
| 714 | + n5WriterLocal.remove( tempDataset + "/" + boundingBox + "/points" ); |
708 | 715 |
|
709 | | - if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox._3() + "/intensities" ) ) |
710 | | - n5WriterLocal.remove( tempDataset + "/" + boundingBox._3() + "/intensities" ); |
| 716 | + if ( n5WriterLocal.datasetExists( tempDataset + "/" + boundingBox + "/intensities" ) ) |
| 717 | + n5WriterLocal.remove( tempDataset + "/" + boundingBox + "/intensities" ); |
711 | 718 |
|
712 | 719 | n5WriterLocal.close(); |
713 | 720 | } |
|
0 commit comments