|
| 1 | +#include "DBReader.h" |
| 2 | +#include "DBWriter.h" |
| 3 | +#include "Debug.h" |
| 4 | +#include "Util.h" |
| 5 | +#include "FastSort.h" |
| 6 | +#include "Parameters.h" |
| 7 | + |
| 8 | +#ifdef OPENMP |
| 9 | +#include <omp.h> |
| 10 | +#endif |
| 11 | + |
| 12 | +int createclusearchdb(int argc, const char **argv, const Command& command) { |
| 13 | + Parameters &par = Parameters::getInstance(); |
| 14 | + par.parseParameters(argc, argv, command, true, 0, MMseqsParameter::COMMAND_ALIGN); |
| 15 | + DBReader<unsigned int> clusterReader(par.db2.c_str(), par.db2Index.c_str(), par.threads, |
| 16 | + DBReader<unsigned int>::USE_DATA | DBReader<unsigned int>::USE_INDEX); |
| 17 | + clusterReader.open(DBReader<unsigned int>::NOSORT); |
| 18 | + std::vector<std::string> suffixes = Util::split(par.dbSuffixList, ","); |
| 19 | + suffixes.insert(suffixes.begin(), ""); |
| 20 | + for(size_t prefix = 0; prefix < suffixes.size(); prefix++) { |
| 21 | + std::string db1 = par.db1 + suffixes[prefix]; |
| 22 | + std::string db1Index = par.db1 + suffixes[prefix] + ".index"; |
| 23 | + DBReader<unsigned int> reader(db1.c_str(), db1Index.c_str(), par.threads, |
| 24 | + DBReader<unsigned int>::USE_DATA | DBReader<unsigned int>::USE_INDEX); |
| 25 | + reader.open(DBReader<unsigned int>::NOSORT); |
| 26 | + reader.readMmapedDataInMemory(); |
| 27 | + |
| 28 | + std::string repDbSeq = par.db3 + suffixes[prefix]; |
| 29 | + std::string repDbSeqIdx = par.db3 + suffixes[prefix] + ".index"; |
| 30 | + |
| 31 | + DBWriter dbwRep(repDbSeq.c_str(), repDbSeqIdx.c_str(), static_cast<unsigned int>(par.threads), par.compressed, |
| 32 | + reader.getDbtype()); |
| 33 | + dbwRep.open(); |
| 34 | + std::string seqsDbSeq = par.db3 + "_seq" + suffixes[prefix]; |
| 35 | + std::string seqsDbSeqIdx = par.db3 + "_seq" + suffixes[prefix] + ".index"; |
| 36 | + DBWriter dbwClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), static_cast<unsigned int>(par.threads), par.compressed, |
| 37 | + reader.getDbtype()); |
| 38 | + dbwClu.open(); |
| 39 | + Debug::Progress progress(clusterReader.getSize()); |
| 40 | + #pragma omp parallel |
| 41 | + { |
| 42 | + unsigned int thread_idx = 0; |
| 43 | + #ifdef OPENMP |
| 44 | + thread_idx = static_cast<unsigned int>(omp_get_thread_num()); |
| 45 | + #endif |
| 46 | + std::string resultBuffer; |
| 47 | + // write output file |
| 48 | + #pragma omp for schedule(dynamic, 1) |
| 49 | + for (size_t id = 0; id < clusterReader.getSize(); id++) { |
| 50 | + progress.updateProgress(); |
| 51 | + char *data = clusterReader.getData(id, thread_idx); |
| 52 | + size_t repKey = clusterReader.getDbKey(id); |
| 53 | + size_t repDataId = reader.getId(repKey); |
| 54 | + size_t repEntryLen = reader.getEntryLen(repDataId); |
| 55 | + dbwRep.writeData(reader.getData(repDataId, thread_idx), repEntryLen - 1, repKey, thread_idx); |
| 56 | + while (*data != '\0') { |
| 57 | + // parse dbkey |
| 58 | + size_t dbKey = Util::fast_atoi<unsigned int>(data); |
| 59 | + if (dbKey == repKey) { |
| 60 | + data = Util::skipLine(data); |
| 61 | + continue; |
| 62 | + } |
| 63 | + size_t readerId = reader.getId(dbKey); |
| 64 | + dbwClu.writeData(reader.getData(readerId, thread_idx), |
| 65 | + reader.getEntryLen(readerId) - 1, dbKey, thread_idx); |
| 66 | + data = Util::skipLine(data); |
| 67 | + } |
| 68 | + resultBuffer.clear(); |
| 69 | + } |
| 70 | + } |
| 71 | + dbwRep.close(true); |
| 72 | + dbwClu.close(true); |
| 73 | + reader.close(); |
| 74 | + |
| 75 | + // merge index |
| 76 | + DBReader<unsigned int> dbrRep(repDbSeq.c_str(), repDbSeqIdx.c_str(), par.threads, |
| 77 | + DBReader<unsigned int>::USE_INDEX); |
| 78 | + dbrRep.open(DBReader<unsigned int>::NOSORT); |
| 79 | + DBReader<unsigned int> dbrClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), par.threads, |
| 80 | + DBReader<unsigned int>::USE_INDEX); |
| 81 | + dbrClu.open(DBReader<unsigned int>::NOSORT); |
| 82 | + std::string seqsDbSeqIdxTmp = seqsDbSeqIdx + "_tmp"; |
| 83 | + |
| 84 | + FILE *sIndex = FileUtil::openAndDelete(seqsDbSeqIdxTmp.c_str(), "w"); |
| 85 | + std::vector<DBReader<unsigned int>::Index> allIndex(dbrClu.getSize() + dbrRep.getSize()); |
| 86 | + size_t dataSize = 0; |
| 87 | + for (size_t i = 0; i < dbrRep.getSize(); i++) { |
| 88 | + allIndex[i] = *dbrRep.getIndex(i); |
| 89 | + dataSize += allIndex[i].length; |
| 90 | + } |
| 91 | + for (size_t i = 0; i < dbrClu.getSize(); i++) { |
| 92 | + DBReader<unsigned int>::Index *index = dbrClu.getIndex(i); |
| 93 | + index->offset += dataSize; |
| 94 | + allIndex[dbrRep.getSize() + i] = *index; |
| 95 | + } |
| 96 | + SORT_PARALLEL(allIndex.begin(), allIndex.end(), DBReader<unsigned int>::Index::compareById); |
| 97 | + char buffer[1024]; |
| 98 | + for (size_t i = 0; i < allIndex.size(); i++) { |
| 99 | + size_t len = DBWriter::indexToBuffer(buffer, allIndex[i].id, allIndex[i].offset, allIndex[i].length); |
| 100 | + size_t written = fwrite(buffer, sizeof(char), len, sIndex); |
| 101 | + if (written != len) { |
| 102 | + Debug(Debug::ERROR) << "Cannot write index file " << seqsDbSeqIdxTmp << "\n"; |
| 103 | + EXIT(EXIT_FAILURE); |
| 104 | + } |
| 105 | + } |
| 106 | + if (fclose(sIndex) != 0) { |
| 107 | + Debug(Debug::ERROR) << "Cannot close index file " << seqsDbSeqIdxTmp << "\n"; |
| 108 | + EXIT(EXIT_FAILURE); |
| 109 | + } |
| 110 | + FileUtil::move(seqsDbSeqIdxTmp.c_str(), seqsDbSeqIdx.c_str()); |
| 111 | + FileUtil::symlinkAlias(repDbSeq, seqsDbSeq + ".0"); |
| 112 | + FileUtil::move(seqsDbSeq.c_str(), (seqsDbSeq + ".1").c_str()); |
| 113 | + } |
| 114 | + clusterReader.close(); |
| 115 | + DBReader<unsigned int>::copyDb(par.db2, par.db3 + "_clu"); |
| 116 | + |
| 117 | + struct DBSuffix { |
| 118 | + DBFiles::Files flag; |
| 119 | + const char *suffix; |
| 120 | + }; |
| 121 | + |
| 122 | + const DBSuffix suffices[] = { |
| 123 | + {DBFiles::HEADER, "_h"}, |
| 124 | + {DBFiles::HEADER_INDEX, "_h.index"}, |
| 125 | + {DBFiles::HEADER_DBTYPE, "_h.dbtype"}, |
| 126 | + {DBFiles::LOOKUP, ".lookup"}, |
| 127 | + {DBFiles::SOURCE, ".source"}, |
| 128 | + {DBFiles::TAX_MAPPING, "_mapping"}, |
| 129 | + {DBFiles::TAX_NAMES, "_names.dmp"}, |
| 130 | + {DBFiles::TAX_NODES, "_nodes.dmp"}, |
| 131 | + {DBFiles::TAX_MERGED, "_merged.dmp"}, |
| 132 | + {DBFiles::TAX_MERGED, "_taxonomy"}, |
| 133 | + }; |
| 134 | + |
| 135 | + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { |
| 136 | + std::string file = par.db1 + suffices[i].suffix; |
| 137 | + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { |
| 138 | + DBReader<unsigned int>::copyDb(file, par.db3 + suffices[i].suffix); |
| 139 | + } |
| 140 | + } |
| 141 | + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { |
| 142 | + std::string file = par.db3 + suffices[i].suffix; |
| 143 | + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { |
| 144 | + DBReader<unsigned int>::aliasDb(file, par.db3 + "_seq" + suffices[i].suffix); |
| 145 | + } |
| 146 | + } |
| 147 | + return EXIT_SUCCESS; |
| 148 | +} |
0 commit comments