|
NCBI C++ ToolKit
|
00001 /* $Id: align_sort.cpp 53842 2012-04-20 15:08:22Z mozese2 $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Authors: Mike DiCuccio 00027 * 00028 * File Description: 00029 * 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 00034 #include <corelib/ncbifile.hpp> 00035 #include <corelib/ncbi_system.hpp> 00036 00037 #include <objmgr/util/sequence.hpp> 00038 00039 #include <objects/seq/MolInfo.hpp> 00040 #include <objects/seqalign/Score.hpp> 00041 #include <objects/general/Object_id.hpp> 00042 00043 #include <algo/align/util/align_sort.hpp> 00044 00045 00046 BEGIN_NCBI_SCOPE 00047 USING_SCOPE(objects); 00048 00049 00050 ///////////////////////////////////////////////////////////////////////////// 00051 00052 bool CAlignSort::SSortKey_Less::operator()(const TAlignment& k1, 00053 const TAlignment& k2) const 00054 { 00055 const SSortKey& key1 = k1.first; 00056 const SSortKey& key2 = k2.first; 00057 00058 for (size_t i = 0; 00059 i < key1.items.size() && i < key2.items.size(); 00060 ++i) { 00061 ESortDir dir = eAscending; 00062 if (i < sort_dirs.size()) { 00063 dir = sort_dirs[i]; 00064 } 00065 00066 switch (dir) { 00067 case eAscending: 00068 if (key1.items[i] < key2.items[i]) { 00069 return true; 00070 } 00071 if (key2.items[i] < key1.items[i]) { 00072 return false; 00073 } 00074 break; 00075 00076 case eDescending: 00077 if (key2.items[i] < key1.items[i]) { 00078 return true; 00079 } 00080 if (key1.items[i] < key2.items[i]) { 00081 return false; 00082 } 00083 break; 00084 } 00085 } 00086 00087 return key1.items.size() < key2.items.size(); 00088 } 00089 00090 00091 CAlignSort::SSortKey 00092 CAlignSort::SAlignExtractor::operator()(const CSeq_align& align) 00093 { 00094 SSortKey key; 00095 ITERATE (vector<string>, iter, key_toks) { 00096 SSortKey::TItem item; 00097 00098 if (NStr::EqualNocase(*iter, "query")) { 00099 CSeq_id_Handle idh = 00100 CSeq_id_Handle::GetHandle(align.GetSeq_id(0)); 00101 idh = sequence::GetId(idh, *scope, 00102 sequence::eGetId_Canonical); 00103 item.first = idh.GetSeqId()->AsFastaString(); 00104 } 00105 else if (NStr::EqualNocase(*iter, "subject")) { 00106 CSeq_id_Handle idh = 00107 CSeq_id_Handle::GetHandle(align.GetSeq_id(1)); 00108 idh = sequence::GetId(idh, *scope, 00109 sequence::eGetId_Canonical); 00110 item.first = idh.GetSeqId()->AsFastaString(); 00111 } 00112 00113 else if (NStr::EqualNocase(*iter, "query_start")) { 00114 item.second = align.GetSeqStart(0); 00115 } 00116 else if (NStr::EqualNocase(*iter, "subject_start")) { 00117 item.second = align.GetSeqStart(1); 00118 } 00119 00120 else if (NStr::EqualNocase(*iter, "query_end")) { 00121 item.second = align.GetSeqStop(0); 00122 } 00123 else if (NStr::EqualNocase(*iter, "subject_end")) { 00124 item.second = align.GetSeqStop(1); 00125 } 00126 00127 else if (NStr::EqualNocase(*iter, "query_strand")) { 00128 item.second = align.GetSeqStrand(0); 00129 } 00130 else if (NStr::EqualNocase(*iter, "subject_strand")) { 00131 item.second = align.GetSeqStrand(1); 00132 } 00133 00134 else if (NStr::EqualNocase(*iter, "query_align_len")) { 00135 item.second = align.GetSeqRange(0).GetLength(); 00136 } 00137 else if (NStr::EqualNocase(*iter, "subject_align_len")) { 00138 item.second = align.GetSeqRange(1).GetLength(); 00139 } 00140 00141 else { 00142 /// assume it is a score 00143 double score = 0; 00144 if (align.IsSetScore()) { 00145 ITERATE (CSeq_align::TScore, it, align.GetScore()) { 00146 const CScore& s = **it; 00147 if (s.IsSetId() && s.GetId().IsStr() && 00148 NStr::EqualNocase(*iter, s.GetId().GetStr())) { 00149 if (s.GetValue().IsInt()) { 00150 score = s.GetValue().GetInt(); 00151 } else if (s.GetValue().IsReal()) { 00152 score = s.GetValue().GetReal(); 00153 } else { 00154 NCBI_THROW(CException, eUnknown, 00155 "invalid score type"); 00156 } 00157 } 00158 } 00159 } 00160 00161 item.second = score; 00162 } 00163 key.items.push_back(item); 00164 } 00165 00166 ++count; 00167 if (count % 100000 == 0) { 00168 double e = sw.Elapsed(); 00169 LOG_POST(Error << " processed " << count 00170 << " alignments (" 00171 << count / e << " alignments/sec)"); 00172 } 00173 00174 return key; 00175 } 00176 00177 00178 ///////////////////////////////////////////////////////////////////////////// 00179 00180 CAlignSort::CAlignSort(CScope &scope, 00181 string sorting_keys, 00182 CRef<CAlignFilter> filter, 00183 const string &tmp_path, 00184 size_t memory_limit, 00185 size_t count_limit) 00186 : m_Filter(filter) 00187 , m_MemoryLimit(memory_limit) 00188 , m_CountLimit(count_limit) 00189 , m_DataSizeLimit(0) 00190 , m_Extractor(scope) 00191 { 00192 NStr::Tokenize(sorting_keys, ", \t\r\n", m_Extractor.key_toks, NStr::eMergeDelims); 00193 NON_CONST_ITERATE (vector<string>, iter, m_Extractor.key_toks) { 00194 *iter = NStr::TruncateSpaces(*iter); 00195 00196 if ((*iter)[0] == '-') { 00197 m_Predicate.sort_dirs.push_back(eDescending); 00198 iter->erase(0, 1); 00199 } else if ((*iter)[0] == '+') { 00200 m_Predicate.sort_dirs.push_back(eAscending); 00201 iter->erase(0, 1); 00202 } else { 00203 m_Predicate.sort_dirs.push_back(eAscending); 00204 } 00205 } 00206 00207 if ( !m_MemoryLimit && !m_CountLimit ) { 00208 /// default is to use 50% of available RAM for sorting 00209 m_MemoryLimit = GetPhysicalMemorySize() / 2; 00210 LOG_POST(Info << "default physical memory size = " << m_MemoryLimit); 00211 } 00212 00213 m_TmpPath = CDirEntry::NormalizePath(CDirEntry::CreateAbsolutePath(tmp_path)); 00214 CDir d(m_TmpPath); 00215 if ( !d.Exists() && !d.CreatePath() ) { 00216 NCBI_THROW(CException, eUnknown, 00217 "failed to create temporary path"); 00218 } 00219 00220 m_TmpPath = CDir::GetTmpNameEx(m_TmpPath, "align_sort_"); 00221 m_TmpPath += "."; 00222 } 00223 00224 void CAlignSort::SortAlignments(IAlignSource &align_source, 00225 IAlignSortedOutput &sorted_output) 00226 { 00227 /// temporary array of seq-aligns 00228 /// when this exhausts our ability to manage 00229 TAlignments aligns; 00230 00231 /// 00232 /// loop on our input stream 00233 /// if we hit the limit, we dump a temporary file and merge at the end 00234 /// 00235 LOG_POST(Error << "pass 1: extracting alignments"); 00236 00237 size_t last_flush_point = 0; 00238 vector<string> tmp_volumes; 00239 00240 try { 00241 while (!align_source.EndOfData()) { 00242 CRef<CSeq_align> align = align_source.GetNext(); 00243 if (m_Filter && !m_Filter->Match(*align)) { 00244 continue; 00245 } 00246 00247 TAlignments::value_type val; 00248 val.first = m_Extractor(*align); 00249 val.second = align; 00250 aligns.push_back(val); 00251 00252 if (m_MemoryLimit && !m_DataSizeLimit && 00253 m_Extractor.count % 10000 == 0) 00254 { 00255 /// check to see if we've exceeded memory limits 00256 size_t total_mem = 0; 00257 size_t resident_mem = 0; 00258 size_t shared_mem = 0; 00259 if (GetMemoryUsage(&total_mem, 00260 &resident_mem, 00261 &shared_mem)) { 00262 if (total_mem > m_MemoryLimit) { 00263 m_DataSizeLimit = align_source.DataSizeSoFar(); 00264 } 00265 } 00266 } 00267 00268 if ((m_CountLimit && aligns.size() >= m_CountLimit) || 00269 (m_DataSizeLimit && 00270 align_source.DataSizeSoFar() - last_flush_point 00271 >= m_DataSizeLimit)) 00272 { 00273 last_flush_point = align_source.DataSizeSoFar(); 00274 std::sort(aligns.begin(), aligns.end(), m_Predicate); 00275 00276 string fname = m_TmpPath; 00277 fname += NStr::NumericToString(tmp_volumes.size() + 1); 00278 tmp_volumes.push_back(fname); 00279 00280 LOG_POST(Error << " tmp volume: " << fname 00281 << ": " << aligns.size() << " alignments"); 00282 CNcbiOfstream tmp_ostr(fname.c_str(), ios::binary | ios::out); 00283 auto_ptr<CObjectOStream> tmp_os 00284 (CObjectOStream::Open(eSerial_AsnBinary, tmp_ostr)); 00285 ITERATE (TAlignments, it, aligns) { 00286 if ( !tmp_ostr ) { 00287 NCBI_THROW(CException, eUnknown, 00288 "output stream error"); 00289 } 00290 00291 *tmp_os << *it->second; 00292 } 00293 aligns.clear(); 00294 } 00295 } 00296 00297 if (tmp_volumes.size() && aligns.size()) { 00298 /// 00299 /// we need to do a merge sort 00300 /// for purposes of algorithmic uniformity, write any spare alignments 00301 /// to their own volume 00302 /// 00303 if (aligns.size()) { 00304 std::sort(aligns.begin(), aligns.end(), m_Predicate); 00305 00306 string fname = m_TmpPath; 00307 fname += NStr::NumericToString(tmp_volumes.size() + 1); 00308 tmp_volumes.push_back(fname); 00309 00310 LOG_POST(Error << " tmp volume: " << fname 00311 << ": " << aligns.size() << " alignments"); 00312 CNcbiOfstream tmp_ostr(fname.c_str(), ios::binary | ios::out); 00313 auto_ptr<CObjectOStream> tmp_os 00314 (CObjectOStream::Open(eSerial_AsnBinary, tmp_ostr)); 00315 ITERATE (TAlignments, it, aligns) { 00316 if ( !tmp_ostr ) { 00317 NCBI_THROW(CException, eUnknown, 00318 "output stream error"); 00319 } 00320 00321 *tmp_os << *it->second; 00322 } 00323 aligns.clear(); 00324 } 00325 } 00326 00327 LOG_POST(Error << "pass 2: sorting"); 00328 if (tmp_volumes.size()) { 00329 LOG_POST(Error << "...performing merge sort..."); 00330 00331 /// 00332 /// Open each volume 00333 /// NB: there is a hole here - if we have more than, say, 8k volumes, 00334 /// the open may fail because we will run out of file descriptors 00335 /// the solution to this is to do several partial merges 00336 /// 00337 typedef vector< AutoPtr<CObjectIStream> > TFiles; 00338 TFiles files; 00339 files.reserve(tmp_volumes.size()); 00340 ITERATE (vector<string>, it, tmp_volumes) { 00341 AutoPtr<CObjectIStream> is 00342 (CObjectIStream::Open(eSerial_AsnBinary, *it)); 00343 files.push_back(is); 00344 } 00345 00346 /// 00347 /// seet a priority queue 00348 /// the priority queue keeps track of which alignment is the next for 00349 /// us to process, and provides overall an O(ln n) algorithmic 00350 /// complexity 00351 /// 00352 typedef priority_queue<SKeyAndFile, 00353 vector<SKeyAndFile>, 00354 SPQSort> TQueue; 00355 m_Extractor.sw.Restart(); 00356 m_Extractor.count = 0; 00357 SPQSort pqs(m_Predicate); 00358 TQueue q(pqs); 00359 size_t count = 0; 00360 ITERATE (TFiles, it, files) { 00361 if ( !(*it)->EndOfData() ) { 00362 /// NB: this is expensive 00363 /// we are re-extracting our key information from our alignment 00364 /// we should try and make this less heavy 00365 /// one solution is to serialize the key from above and then 00366 /// only extract the binary strip of data corresponding to the 00367 /// actual alignment. 00368 CRef<CSeq_align> sa(new CSeq_align); 00369 (**it) >> *sa; 00370 00371 SKeyAndFile kf; 00372 kf.first.first = m_Extractor(*sa); 00373 kf.first.second = sa; 00374 kf.second = count; 00375 q.push(kf); 00376 } 00377 ++count; 00378 } 00379 00380 while ( !q.empty() ) { 00381 SKeyAndFile kf = q.top(); 00382 q.pop(); 00383 sorted_output.Write(kf.first); 00384 00385 if ( !(*files[kf.second]).EndOfData() ) { 00386 /// NB: as above, this is expensive 00387 CRef<CSeq_align> sa(new CSeq_align); 00388 (*files[kf.second]) >> *sa; 00389 00390 kf.first.first = m_Extractor(*sa); 00391 kf.first.second = sa; 00392 q.push(kf); 00393 } 00394 else { 00395 /// close our file once done 00396 files[kf.second].reset(); 00397 LOG_POST(Error << " freeing volume: " 00398 << tmp_volumes[kf.second]); 00399 if ( !CFile(tmp_volumes[kf.second]).Remove() ) { 00400 LOG_POST(Error << " failed to remove temp file: " 00401 << tmp_volumes[kf.second]); 00402 } 00403 } 00404 } 00405 00406 } else { 00407 /// 00408 /// this side is much simpler - all alignments fit into RAM 00409 /// sort and dump 00410 /// 00411 std::sort(aligns.begin(), aligns.end(), m_Predicate); 00412 00413 ITERATE (TAlignments, it, aligns) { 00414 sorted_output.Write(*it); 00415 } 00416 } 00417 } 00418 catch (CException&) { 00419 ITERATE (vector<string>, it, tmp_volumes) { 00420 LOG_POST(Error << "removing tmp volume: " << *it); 00421 CFile(*it).Remove(); 00422 } 00423 00424 throw; 00425 } 00426 } 00427 00428 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 13:24:36 2012 by modify_doxy.py rev. 337098