Skip to content

Commit 37ba072

Browse files
committed
Multi-threading added
1 parent 94a0d7a commit 37ba072

6 files changed

Lines changed: 129 additions & 81 deletions

File tree

Makefile.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CXXFLAGS += -O3 -DNDEBUG -std=c++11 -Isrc -I @mathinc@
1+
CXXFLAGS += -O3 -DNDEBUG -std=c++11 -fopenmp -Isrc -I @mathinc@
22
CPPFLAGS += @amcppflags@
33

44
UNAME_S=$(shell uname -s)

src/cgi/core_genome_identity.cpp

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ctime>
99
#include <chrono>
1010
#include <functional>
11+
#include <omp.h>
1112

1213
//Own includes
1314
#include "map/include/map_parameters.hpp"
@@ -23,6 +24,12 @@
2324

2425
int main(int argc, char** argv)
2526
{
27+
/*
28+
* Make sure env variable MALLOC_ARENA_MAX is unset
29+
* for efficient multi-threaded execution
30+
*/
31+
unsetenv((char *)"MALLOC_ARENA_MAX");
32+
2633
CommandLineProcessing::ArgvParser cmd;
2734
using namespace std::placeholders; // for _1, _2, _3...
2835

@@ -34,60 +41,72 @@ int main(int argc, char** argv)
3441

3542
skch::parseandSave(argc, argv, cmd, parameters);
3643

37-
//Redirect Mashmap's mapping output to null fs, using file name for CGI output
3844
std::string fileName = parameters.outFileName;
3945

40-
#ifdef DEBUG
41-
parameters.outFileName = parameters.outFileName + ".map";
42-
#else
46+
//To redirect Mashmap's mapping output to null fs, using file name for CGI output
4347
parameters.outFileName = "/dev/null";
44-
#endif
45-
46-
auto t0 = skch::Time::now();
47-
48-
//Build the sketch for reference
49-
skch::Sketch referSketch(parameters);
50-
51-
std::chrono::duration<double> timeRefSketch = skch::Time::now() - t0;
52-
std::cerr << "INFO, skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl;
5348

54-
//Initialize the files to delete the existing content
55-
{
56-
#ifdef DEBUG
57-
std::ofstream outstrm2(fileName + ".map.1way");
58-
std::ofstream outstrm3(fileName + ".map.2way");
59-
#endif
60-
if(parameters.visualize)
61-
std::ofstream outstrm4(fileName + ".visual");
62-
}
49+
//Set up for parallel execution
50+
omp_set_num_threads( parameters.threads );
51+
std::vector <skch::Parameters> parameters_split (parameters.threads);
52+
cgi::splitReferenceGenomes (parameters, parameters_split);
6353

6454
//Final output vector of ANI computation
6555
std::vector<cgi::CGI_Results> finalResults;
6656

67-
//Loop over query genomes
68-
for(uint64_t queryno = 0; queryno < parameters.querySequences.size(); queryno++)
57+
#pragma omp parallel for
58+
for (uint64_t i = 0; i < parameters.threads; i++)
6959
{
70-
t0 = skch::Time::now();
60+
//start timer
61+
auto t0 = skch::Time::now();
62+
63+
//Build the sketch for reference
64+
skch::Sketch referSketch(parameters_split[i]);
65+
66+
std::chrono::duration<double> timeRefSketch = skch::Time::now() - t0;
67+
68+
if ( omp_get_thread_num() == 0)
69+
std::cerr << "INFO [thread 0], skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl;
70+
71+
//Final output vector of ANI computation
72+
std::vector<cgi::CGI_Results> finalResults_local;
73+
74+
//Loop over query genomes
75+
for(uint64_t queryno = 0; queryno < parameters_split[i].querySequences.size(); queryno++)
76+
{
77+
t0 = skch::Time::now();
78+
79+
skch::MappingResultsVector_t mapResults;
80+
uint64_t totalQueryFragments = 0;
81+
82+
auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1);
83+
skch::Map mapper = skch::Map(parameters_split[i], referSketch, totalQueryFragments, queryno, fn);
84+
85+
std::chrono::duration<double> timeMapQuery = skch::Time::now() - t0;
86+
87+
if ( omp_get_thread_num() == 0)
88+
std::cerr << "INFO [thread 0], skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl;
7189

72-
skch::MappingResultsVector_t mapResults;
73-
uint64_t totalQueryFragments = 0;
90+
t0 = skch::Time::now();
7491

75-
auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1);
76-
skch::Map mapper = skch::Map(parameters, referSketch, totalQueryFragments, queryno, fn);
92+
cgi::computeCGI(parameters_split[i], mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults_local);
7793

78-
std::chrono::duration<double> timeMapQuery = skch::Time::now() - t0;
79-
std::cerr << "INFO, skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl;
94+
std::chrono::duration<double> timeCGI = skch::Time::now() - t0;
8095

81-
t0 = skch::Time::now();
96+
if ( omp_get_thread_num() == 0)
97+
std::cerr << "INFO [thread 0], skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl;
98+
}
8299

83-
cgi::computeCGI(parameters, mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults);
100+
cgi::correctRefGenomeIds (finalResults_local);
84101

85-
std::chrono::duration<double> timeCGI = skch::Time::now() - t0;
86-
std::cerr << "INFO, skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl;
102+
#pragma omp critical
103+
finalResults.insert (finalResults.end(), finalResults_local.begin(), finalResults_local.end());
87104
}
88105

106+
//report output in file
89107
cgi::outputCGI (parameters, finalResults, fileName);
90108

109+
//report output as matrix
91110
if (parameters.matrixOutput)
92111
cgi::outputPhylip (parameters, finalResults, fileName);
93112
}

src/cgi/include/computeCoreIdentity.hpp

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <algorithm>
1111
#include <unordered_map>
1212
#include <fstream>
13+
#include <omp.h>
1314

1415
//Own includes
1516
#include "map/include/base_types.hpp"
@@ -172,25 +173,6 @@ namespace cgi
172173
}
173174
}
174175

175-
#ifdef DEBUG
176-
{
177-
std::ofstream outstrm(fileName + ".map.1way", std::ios::app);
178-
179-
//Report all mappings that contribute to core-genome identity estimate
180-
for(auto &e : mappings_1way)
181-
{
182-
if(e.nucIdentity != 0)
183-
outstrm << parameters.querySequences[queryFileNo]
184-
<< " " << parameters.refSequences[e.genomeId]
185-
<< " " << e.querySeqId
186-
<< " " << e.refSequenceId
187-
<< " " << e.mapRefPosBin
188-
<< " " << e.nucIdentity
189-
<< "\n";
190-
}
191-
}
192-
#endif
193-
194176
///2. Now, we compute 2-way ANI
195177
//For each mapped region, and within a reference bin bucket, single best query mapping is preserved
196178
{
@@ -214,32 +196,13 @@ namespace cgi
214196
}
215197
}
216198

217-
#ifdef DEBUG
218-
{
219-
std::ofstream outstrm(fileName + ".map.2way", std::ios::app);
220-
221-
//Report all mappings that contribute to core-genome identity estimate
222-
for(auto &e : mappings_2way)
223-
{
224-
outstrm << parameters.querySequences[queryFileNo]
225-
<< " " << parameters.refSequences[e.genomeId]
226-
<< " " << e.querySeqId
227-
<< " " << e.refSequenceId
228-
<< " " << e.mapRefPosBin
229-
<< " " << e.nucIdentity
230-
<< "\n";
231-
}
232-
}
233-
#endif
234-
235199
{
236200
if(parameters.visualize)
237201
{
238202
outputVisualizationFile(parameters, mappings_2way, mapper, refSketch, queryFileNo, fileName);
239203
}
240204
}
241205

242-
243206
//Do average for ANI/AAI computation
244207
//mappings_2way should be sorted by genomeId
245208

@@ -397,6 +360,45 @@ namespace cgi
397360

398361
outstrm.close();
399362
}
363+
364+
/**
365+
* @brief generate multiple parameter objects from one
366+
* @details purpose it to divide the list of reference genomes
367+
* into as many buckets as there are threads
368+
* @param[in] parameters
369+
* @param[out] parameters_split
370+
*/
371+
void splitReferenceGenomes(skch::Parameters &parameters,
372+
std::vector <skch::Parameters> &parameters_split)
373+
{
374+
for (int i = 0; i < parameters.threads; i++)
375+
{
376+
parameters_split[i] = parameters;
377+
378+
//update the reference genomes list
379+
parameters_split[i].refSequences.clear();
380+
381+
//assign ref. genome to threads in round-robin fashion
382+
for (int j = 0; j < parameters.refSequences.size(); j++)
383+
{
384+
if (j % parameters.threads == i)
385+
parameters_split[i].refSequences.push_back (parameters.refSequences[j]);
386+
}
387+
}
388+
}
389+
390+
/**
391+
* @brief update thread local reference genome ids to global ids
392+
* @param[in/out] CGI_ResultsVector
393+
*/
394+
void correctRefGenomeIds (std::vector<cgi::CGI_Results> &CGI_ResultsVector)
395+
{
396+
int tid = omp_get_thread_num();
397+
int thread_count = omp_get_num_threads();
398+
399+
for (auto &e : CGI_ResultsVector)
400+
e.refGenomeId = e.refGenomeId * thread_count + tid;
401+
}
400402
}
401403

402404
#endif

src/map/include/map_parameters.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace skch
2424
int windowSize; //window size used for sketching
2525
int minReadLength; //minimum read length which code maps
2626
int minFragments; //minimum fragment mappings for trusting ANI value
27+
int threads; //thread count
2728
int alphabetSize; //alphabet size
2829
uint64_t referenceSize; //Approximate reference size
2930
float percentageIdentity; //user defined threshold for good similarity

src/map/include/parseCmdArgs.hpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@ Example usage: \n\
5252
cmd.defineOption("kmer", "kmer size <= 16 [default : 16]", ArgvParser::OptionRequiresValue);
5353
cmd.defineOptionAlternative("kmer","k");
5454

55+
cmd.defineOption("threads", "thread count for parallel execution [default : 1]", ArgvParser::OptionRequiresValue);
56+
cmd.defineOptionAlternative("threads","t");
57+
5558
cmd.defineOption("fragLen", "fragment length [default : 3,000]", ArgvParser::OptionRequiresValue);
5659

57-
cmd.defineOption("minFrag", "minimum fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue);
60+
cmd.defineOption("minFrag", "minimum matched fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue);
5861

5962
cmd.defineOption("visualize", "output mappings for visualization, can be enabled for single genome to single genome comparison only [disabled by default]");
6063

@@ -141,6 +144,7 @@ Example usage: \n\
141144
std::cerr << "Query = " << parameters.querySequences << std::endl;
142145
std::cerr << "Kmer size = " << parameters.kmerSize << std::endl;
143146
std::cerr << "Fragment length = " << parameters.minReadLength << std::endl;
147+
std::cerr << "Threads = " << parameters.threads << std::endl;
144148
std::cerr << "ANI output file = " << parameters.outFileName << std::endl;
145149
std::cerr << ">>>>>>>>>>>>>>>>>>" << std::endl;
146150
}
@@ -196,7 +200,10 @@ Example usage: \n\
196200
}
197201

198202
//Size of reference
199-
parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences);
203+
//parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences);
204+
205+
//fix reference length to a typical bacterial genome length
206+
parameters.referenceSize = 5000000;
200207
str.clear();
201208

202209
//Parse query files
@@ -261,6 +268,15 @@ Example usage: \n\
261268
parameters.kmerSize = 5;
262269
}
263270

271+
if(cmd.foundOption("threads"))
272+
{
273+
str << cmd.optionValue("threads");
274+
str >> parameters.threads;
275+
str.clear();
276+
}
277+
else
278+
parameters.threads = 1;
279+
264280
parameters.p_value = 1e-03;
265281

266282
if(cmd.foundOption("fragLen"))

src/map/include/winSketch.hpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <map>
1414
#include <cassert>
1515
#include <zlib.h>
16+
#include <omp.h>
1617

1718
//Own includes
1819
#include "map/include/commonFunc.hpp"
@@ -165,7 +166,8 @@ namespace skch
165166
fclose(file);
166167
}
167168

168-
std::cerr << "INFO, skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl;
169+
if ( omp_get_thread_num() == 0)
170+
std::cerr << "INFO [thread 0], skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl;
169171

170172
}
171173

@@ -182,7 +184,8 @@ namespace skch
182184
MinimizerMetaData{e.seqId, e.wpos});
183185
}
184186

185-
std::cerr << "INFO, skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl;
187+
if ( omp_get_thread_num() == 0)
188+
std::cerr << "INFO [thread 0], skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl;
186189
}
187190

188191
/**
@@ -197,7 +200,8 @@ namespace skch
197200
for(auto &e : this->minimizerPosLookupIndex)
198201
this->minimizerFreqHistogram[e.second.size()] += 1;
199202

200-
std::cerr << "INFO, skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl;
203+
if ( omp_get_thread_num() == 0)
204+
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl;
201205

202206
//2. Compute frequency threshold to ignore most frequent minimizers
203207

@@ -227,9 +231,15 @@ namespace skch
227231
}
228232

229233
if(this->freqThreshold != std::numeric_limits<int>::max())
230-
std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl;
234+
{
235+
if ( omp_get_thread_num() == 0)
236+
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl;
237+
}
231238
else
232-
std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl;
239+
{
240+
if ( omp_get_thread_num() == 0)
241+
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl;
242+
}
233243

234244
}
235245

0 commit comments

Comments
 (0)