Skip to content

Commit e19df7c

Browse files
Rework pairing to support more than two sequences
1 parent efacc69 commit e19df7c

File tree

1 file changed

+68
-72
lines changed

1 file changed

+68
-72
lines changed

src/util/pairaln.cpp

Lines changed: 68 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -9,110 +9,106 @@
99
#include "NcbiTaxonomy.h"
1010
#include "MappingReader.h"
1111

12-
#define ZSTD_STATIC_LINKING_ONLY
13-
#include <zstd.h>
14-
1512
#include <map>
1613

1714
#ifdef OPENMP
1815
#include <omp.h>
1916
#endif
2017

18+
19+
// need for sorting the results
20+
static bool compareByTaxId(const Matcher::result_t &first, const Matcher::result_t &second) {
21+
return (first.dbOrfStartPos < second.dbOrfStartPos);
22+
}
23+
2124
int pairaln(int argc, const char **argv, const Command& command) {
2225
Parameters &par = Parameters::getInstance();
2326
par.parseParameters(argc, argv, command, true, 0, 0);
2427

25-
const bool sameDB = par.db1.compare(par.db2) == 0 ? true : false;
26-
const bool touch = (par.preloadMode != Parameters::PRELOAD_MODE_MMAP);
27-
2828
std::string db2NoIndexName = PrefilteringIndexReader::dbPathWithoutIndex(par.db2);
2929
MappingReader* mapping = new MappingReader(db2NoIndexName);
30-
const int dbaccessMode = (DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA);
31-
IndexReader qDbr(par.db1, par.threads, IndexReader::SRC_SEQUENCES, (touch) ? (IndexReader::PRELOAD_INDEX | IndexReader::PRELOAD_DATA) : 0, dbaccessMode);
32-
IndexReader *tDbr;
33-
if (sameDB) {
34-
tDbr = &qDbr;
35-
} else {
36-
tDbr = new IndexReader(par.db2, par.threads, IndexReader::SRC_SEQUENCES, (touch) ? (IndexReader::PRELOAD_INDEX | IndexReader::PRELOAD_DATA) : 0, dbaccessMode);
37-
}
3830

3931
DBReader<unsigned int> alnDbr(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
4032
alnDbr.open(DBReader<unsigned int>::NOSORT);
41-
if(alnDbr.getSize() % 2 != 0){
42-
Debug(Debug::ERROR) << "Alignment database does not seem to be paired\n";
43-
EXIT(EXIT_FAILURE);
44-
}
45-
size_t localThreads = 1;
46-
#ifdef OPENMP
47-
localThreads = std::max(std::min((size_t)par.threads, alnDbr.getSize()), (size_t)1);
48-
#endif
4933

50-
DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), localThreads, par.compressed, alnDbr.getDbtype());
34+
DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), 1, par.compressed, alnDbr.getDbtype());
5135
resultWriter.open();
5236

5337
Debug::Progress progress(alnDbr.getSize());
54-
#pragma omp parallel num_threads(localThreads)
55-
{
56-
unsigned int thread_idx = 0;
57-
#ifdef OPENMP
58-
thread_idx = static_cast<unsigned int>(omp_get_thread_num());
59-
#endif
38+
do {
39+
int thread_idx = 0;
6040
char buffer[1024 + 32768*4];
61-
std::string result;
62-
result.reserve(1024 * 1024);
63-
std::vector<Matcher::result_t> resultA;
64-
resultA.reserve(100000);
65-
std::vector<Matcher::result_t> resultB;
66-
resultB.reserve(100000);
67-
std::unordered_map<unsigned int, Matcher::result_t *> findPair;
68-
std::string outputA;
69-
outputA.reserve(100000);
70-
std::string outputB;
71-
outputB.reserve(100000);
72-
#pragma omp for schedule(static, 2)
73-
for (size_t i = 0; i < alnDbr.getSize(); i+=2) {
74-
progress.updateProgress();
75-
progress.updateProgress();
76-
resultA.clear();
77-
resultB.clear();
78-
outputA.clear();
79-
outputB.clear();
80-
Matcher::readAlignmentResults(resultA, alnDbr.getData(i, thread_idx), true);
81-
Matcher::readAlignmentResults(resultB, alnDbr.getData(i+1, thread_idx), true);
41+
std::vector<Matcher::result_t> result;
42+
result.reserve(100000);
43+
std::unordered_map<unsigned int, size_t> findPair;
44+
std::string output;
45+
output.reserve(100000);
8246

83-
for (size_t resIdx = 0; resIdx < resultA.size(); ++resIdx) {
84-
unsigned int taxon = mapping->lookup(resultA[resIdx].dbKey);
85-
if(findPair.find(taxon) == findPair.end()){
86-
findPair.insert({taxon, &resultA[resIdx]});
47+
unsigned int referenceDbKey = 0;
48+
size_t id = alnDbr.getId(referenceDbKey);
49+
if (id == UINT_MAX) {
50+
break;
51+
}
52+
char* data = alnDbr.getData(id, thread_idx);
53+
Matcher::readAlignmentResults(result, data, true);
54+
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
55+
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
56+
if (findPair.find(taxon) == findPair.end()) {
57+
findPair.emplace(taxon, 1);
58+
}
59+
}
60+
61+
// find intersection between all proteins
62+
for (size_t i = 0; i < alnDbr.getSize(); i++) {
63+
unsigned int dbKey = alnDbr.getDbKey(i);
64+
if (dbKey == referenceDbKey) {
65+
continue;
66+
}
67+
result.clear();
68+
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
69+
// find pairs
70+
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
71+
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
72+
if (findPair.find(taxon) != findPair.end()) {
73+
findPair[taxon]++;
8774
}
8875
}
76+
}
77+
78+
for (size_t i = 0; i < alnDbr.getSize(); i++) {
79+
progress.updateProgress();
80+
result.clear();
81+
output.clear();
82+
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
8983
// find pairs
90-
for (size_t resIdx = 0; resIdx < resultB.size(); ++resIdx) {
91-
unsigned int taxon = mapping->lookup(resultB[resIdx].dbKey);
84+
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
85+
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
86+
// we don't want to introduce a new field, reuse existing unused field here
87+
result[resIdx].dbOrfStartPos = taxon;
88+
}
89+
// stable sort is required to assure that best hit is first per taxon
90+
std::stable_sort(result.begin(), result.end(), compareByTaxId);
91+
unsigned int prevTaxon = UINT_MAX;
92+
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
93+
unsigned int taxon = result[resIdx].dbOrfStartPos;
9294
// found pair!
93-
if(findPair.find(taxon) != findPair.end()) {
94-
bool hasBacktrace = (resultB[resIdx].backtrace.size() > 0);
95-
size_t len = Matcher::resultToBuffer(buffer, *findPair[taxon], hasBacktrace, false, false);
96-
outputA.append(buffer, len);
97-
len = Matcher::resultToBuffer(buffer, resultB[resIdx], hasBacktrace, false, false);
98-
outputB.append(buffer, len);
99-
findPair.erase (taxon);
95+
if (taxon != prevTaxon
96+
&& findPair.find(taxon) != findPair.end()
97+
&& findPair[taxon] == alnDbr.getSize()) {
98+
bool hasBacktrace = result[resIdx].backtrace.size() > 0;
99+
size_t len = Matcher::resultToBuffer(buffer, result[resIdx], hasBacktrace, false, false);
100+
output.append(buffer, len);
100101
}
102+
prevTaxon = taxon;
101103
}
102-
resultWriter.writeData(outputA.c_str(), outputA.length(), alnDbr.getDbKey(i), thread_idx);
103-
resultWriter.writeData(outputB.c_str(), outputB.length(), alnDbr.getDbKey(i+1), thread_idx);
104-
findPair.clear();
104+
resultWriter.writeData(output.c_str(), output.length(), alnDbr.getDbKey(i), thread_idx);
105105
}
106-
}
106+
} while(false);
107107

108108
resultWriter.close();
109109
// clean up
110110
delete mapping;
111111
alnDbr.close();
112-
if (sameDB == false) {
113-
delete tDbr;
114-
}
115-
116-
return 0;
112+
return EXIT_SUCCESS;
117113
}
118114

0 commit comments

Comments
 (0)