NCBI C++ ToolKit
align_sort.cpp
Go to the documentation of this file.
00001 /*  $Id: align_sort.cpp 53842 2012-04-20 15:08:22Z mozese2 $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Mike DiCuccio
00027  *
00028  * File Description:
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include <corelib/ncbifile.hpp>
00035 #include <corelib/ncbi_system.hpp>
00036 
00037 #include <objmgr/util/sequence.hpp>
00038 
00039 #include <objects/seq/MolInfo.hpp>
00040 #include <objects/seqalign/Score.hpp>
00041 #include <objects/general/Object_id.hpp>
00042 
00043 #include <algo/align/util/align_sort.hpp>
00044 
00045 
00046 BEGIN_NCBI_SCOPE
00047 USING_SCOPE(objects);
00048 
00049 
00050 /////////////////////////////////////////////////////////////////////////////
00051 
00052 bool CAlignSort::SSortKey_Less::operator()(const TAlignment& k1,
00053                                             const TAlignment& k2) const
00054 {
00055     const SSortKey& key1 = k1.first;
00056     const SSortKey& key2 = k2.first;
00057 
00058     for (size_t i = 0;
00059          i < key1.items.size()  &&  i < key2.items.size();
00060          ++i) {
00061         ESortDir dir = eAscending;
00062         if (i < sort_dirs.size()) {
00063             dir = sort_dirs[i];
00064         }
00065 
00066         switch (dir) {
00067         case eAscending:
00068             if (key1.items[i] < key2.items[i]) {
00069                 return true;
00070             }
00071             if (key2.items[i] < key1.items[i]) {
00072                 return false;
00073             }
00074             break;
00075 
00076         case eDescending:
00077             if (key2.items[i] < key1.items[i]) {
00078                 return true;
00079             }
00080             if (key1.items[i] < key2.items[i]) {
00081                 return false;
00082             }
00083             break;
00084         }
00085     }
00086 
00087     return key1.items.size() < key2.items.size();
00088 }
00089 
00090 
00091 CAlignSort::SSortKey
00092 CAlignSort::SAlignExtractor::operator()(const CSeq_align& align)
00093 {
00094     SSortKey key;
00095     ITERATE (vector<string>, iter, key_toks) {
00096         SSortKey::TItem item;
00097 
00098         if (NStr::EqualNocase(*iter, "query")) {
00099             CSeq_id_Handle idh =
00100                 CSeq_id_Handle::GetHandle(align.GetSeq_id(0));
00101             idh = sequence::GetId(idh, *scope,
00102                                   sequence::eGetId_Canonical);
00103             item.first = idh.GetSeqId()->AsFastaString();
00104         }
00105         else if (NStr::EqualNocase(*iter, "subject")) {
00106             CSeq_id_Handle idh =
00107                 CSeq_id_Handle::GetHandle(align.GetSeq_id(1));
00108             idh = sequence::GetId(idh, *scope,
00109                                   sequence::eGetId_Canonical);
00110             item.first = idh.GetSeqId()->AsFastaString();
00111         }
00112 
00113         else if (NStr::EqualNocase(*iter, "query_start")) {
00114             item.second = align.GetSeqStart(0);
00115         }
00116         else if (NStr::EqualNocase(*iter, "subject_start")) {
00117             item.second = align.GetSeqStart(1);
00118         }
00119 
00120         else if (NStr::EqualNocase(*iter, "query_end")) {
00121             item.second = align.GetSeqStop(0);
00122         }
00123         else if (NStr::EqualNocase(*iter, "subject_end")) {
00124             item.second = align.GetSeqStop(1);
00125         }
00126 
00127         else if (NStr::EqualNocase(*iter, "query_strand")) {
00128             item.second = align.GetSeqStrand(0);
00129         }
00130         else if (NStr::EqualNocase(*iter, "subject_strand")) {
00131             item.second = align.GetSeqStrand(1);
00132         }
00133 
00134         else if (NStr::EqualNocase(*iter, "query_align_len")) {
00135             item.second = align.GetSeqRange(0).GetLength();
00136         }
00137         else if (NStr::EqualNocase(*iter, "subject_align_len")) {
00138             item.second = align.GetSeqRange(1).GetLength();
00139         }
00140 
00141         else {
00142             /// assume it is a score
00143             double score = 0;
00144             if (align.IsSetScore()) {
00145                 ITERATE (CSeq_align::TScore, it, align.GetScore()) {
00146                     const CScore& s = **it;
00147                     if (s.IsSetId()  &&  s.GetId().IsStr()  &&
00148                         NStr::EqualNocase(*iter, s.GetId().GetStr())) {
00149                         if (s.GetValue().IsInt()) {
00150                             score = s.GetValue().GetInt();
00151                         } else if (s.GetValue().IsReal()) {
00152                             score = s.GetValue().GetReal();
00153                         } else {
00154                             NCBI_THROW(CException, eUnknown,
00155                                        "invalid score type");
00156                         }
00157                     }
00158                 }
00159             }
00160 
00161             item.second = score;
00162         }
00163         key.items.push_back(item);
00164     }
00165 
00166     ++count;
00167     if (count % 100000 == 0) {
00168         double e = sw.Elapsed();
00169         LOG_POST(Error << "  processed " << count
00170                  << " alignments ("
00171                  << count / e << " alignments/sec)");
00172     }
00173 
00174     return key;
00175 }
00176 
00177 
00178 /////////////////////////////////////////////////////////////////////////////
00179 
00180 CAlignSort::CAlignSort(CScope &scope,
00181                        string sorting_keys,
00182                        CRef<CAlignFilter> filter,
00183                        const string &tmp_path,
00184                        size_t memory_limit,
00185                        size_t count_limit)
00186 : m_Filter(filter)
00187 , m_MemoryLimit(memory_limit)
00188 , m_CountLimit(count_limit)
00189 , m_DataSizeLimit(0)
00190 , m_Extractor(scope)
00191 {
00192     NStr::Tokenize(sorting_keys, ", \t\r\n", m_Extractor.key_toks, NStr::eMergeDelims);
00193     NON_CONST_ITERATE (vector<string>, iter, m_Extractor.key_toks) {
00194         *iter = NStr::TruncateSpaces(*iter);
00195 
00196         if ((*iter)[0] == '-') {
00197             m_Predicate.sort_dirs.push_back(eDescending);
00198             iter->erase(0, 1);
00199         } else if ((*iter)[0] == '+') {
00200             m_Predicate.sort_dirs.push_back(eAscending);
00201             iter->erase(0, 1);
00202         } else {
00203             m_Predicate.sort_dirs.push_back(eAscending);
00204         }
00205     }
00206 
00207     if ( !m_MemoryLimit  &&  !m_CountLimit ) {
00208         /// default is to use 50% of available RAM for sorting
00209         m_MemoryLimit = GetPhysicalMemorySize() / 2;
00210         LOG_POST(Info << "default physical memory size = " << m_MemoryLimit);
00211     }
00212 
00213     m_TmpPath = CDirEntry::NormalizePath(CDirEntry::CreateAbsolutePath(tmp_path));
00214     CDir d(m_TmpPath);
00215     if ( !d.Exists()  &&  !d.CreatePath() ) {
00216          NCBI_THROW(CException, eUnknown,
00217                     "failed to create temporary path");
00218     }
00219 
00220     m_TmpPath = CDir::GetTmpNameEx(m_TmpPath, "align_sort_");
00221     m_TmpPath += ".";
00222 }
00223 
00224 void CAlignSort::SortAlignments(IAlignSource &align_source,
00225                                 IAlignSortedOutput &sorted_output)
00226 {
00227     /// temporary array of seq-aligns
00228     /// when this exhausts our ability to manage
00229     TAlignments aligns;
00230 
00231     ///
00232     /// loop on our input stream
00233     /// if we hit the limit, we dump a temporary file and merge at the end
00234     ///
00235     LOG_POST(Error << "pass 1: extracting alignments");
00236 
00237     size_t last_flush_point = 0;
00238     vector<string> tmp_volumes;
00239 
00240     try {
00241         while (!align_source.EndOfData()) {
00242             CRef<CSeq_align> align = align_source.GetNext();
00243             if (m_Filter  &&  !m_Filter->Match(*align)) {
00244                 continue;
00245             }
00246 
00247             TAlignments::value_type val;
00248             val.first = m_Extractor(*align);
00249             val.second = align;
00250             aligns.push_back(val);
00251 
00252             if (m_MemoryLimit && !m_DataSizeLimit &&
00253                 m_Extractor.count % 10000 == 0)
00254             {
00255                 /// check to see if we've exceeded memory limits
00256                 size_t total_mem = 0;
00257                 size_t resident_mem = 0;
00258                 size_t shared_mem = 0;
00259                 if (GetMemoryUsage(&total_mem,
00260                                    &resident_mem,
00261                                    &shared_mem)) {
00262                     if (total_mem > m_MemoryLimit) {
00263                         m_DataSizeLimit = align_source.DataSizeSoFar();
00264                     }
00265                 }
00266             }
00267 
00268             if ((m_CountLimit  &&  aligns.size() >= m_CountLimit)  ||
00269                (m_DataSizeLimit  &&
00270                 align_source.DataSizeSoFar() - last_flush_point
00271                     >= m_DataSizeLimit))
00272             {
00273                 last_flush_point = align_source.DataSizeSoFar();
00274                 std::sort(aligns.begin(), aligns.end(), m_Predicate);
00275 
00276                 string fname = m_TmpPath;
00277                 fname += NStr::NumericToString(tmp_volumes.size() + 1);
00278                 tmp_volumes.push_back(fname);
00279 
00280                 LOG_POST(Error << "  tmp volume: " << fname
00281                          << ": " << aligns.size() << " alignments");
00282                 CNcbiOfstream tmp_ostr(fname.c_str(), ios::binary | ios::out);
00283                 auto_ptr<CObjectOStream> tmp_os
00284                     (CObjectOStream::Open(eSerial_AsnBinary, tmp_ostr));
00285                 ITERATE (TAlignments, it, aligns) {
00286                     if ( !tmp_ostr ) {
00287                         NCBI_THROW(CException, eUnknown,
00288                                    "output stream error");
00289                     }
00290 
00291                     *tmp_os << *it->second;
00292                 }
00293                 aligns.clear();
00294             }
00295         }
00296 
00297         if (tmp_volumes.size()  &&  aligns.size()) {
00298             ///
00299             /// we need to do a merge sort
00300             /// for purposes of algorithmic uniformity, write any spare alignments
00301             /// to their own volume
00302             ///
00303             if (aligns.size()) {
00304                 std::sort(aligns.begin(), aligns.end(), m_Predicate);
00305 
00306                 string fname = m_TmpPath;
00307                 fname += NStr::NumericToString(tmp_volumes.size() + 1);
00308                 tmp_volumes.push_back(fname);
00309 
00310                 LOG_POST(Error << "  tmp volume: " << fname
00311                          << ": " << aligns.size() << " alignments");
00312                 CNcbiOfstream tmp_ostr(fname.c_str(), ios::binary | ios::out);
00313                 auto_ptr<CObjectOStream> tmp_os
00314                     (CObjectOStream::Open(eSerial_AsnBinary, tmp_ostr));
00315                 ITERATE (TAlignments, it, aligns) {
00316                     if ( !tmp_ostr ) {
00317                         NCBI_THROW(CException, eUnknown,
00318                                    "output stream error");
00319                     }
00320 
00321                     *tmp_os << *it->second;
00322                 }
00323                 aligns.clear();
00324             }
00325         }
00326 
00327         LOG_POST(Error << "pass 2: sorting");
00328         if (tmp_volumes.size()) {
00329             LOG_POST(Error << "...performing merge sort...");
00330 
00331             ///
00332             /// Open each volume
00333             /// NB: there is a hole here - if we have more than, say, 8k volumes,
00334             /// the open may fail because we will run out of file descriptors
00335             /// the solution to this is to do several partial merges
00336             ///
00337             typedef vector< AutoPtr<CObjectIStream> > TFiles;
00338             TFiles files;
00339             files.reserve(tmp_volumes.size());
00340             ITERATE (vector<string>, it, tmp_volumes) {
00341                 AutoPtr<CObjectIStream> is
00342                     (CObjectIStream::Open(eSerial_AsnBinary, *it));
00343                 files.push_back(is);
00344             }
00345 
00346             ///
00347             /// seet a priority queue
00348             /// the priority queue keeps track of which alignment is the next for
00349             /// us to process, and provides overall an O(ln n) algorithmic
00350             /// complexity
00351             ///
00352             typedef priority_queue<SKeyAndFile,
00353                                    vector<SKeyAndFile>,
00354                                    SPQSort> TQueue;
00355             m_Extractor.sw.Restart();
00356             m_Extractor.count = 0;
00357             SPQSort pqs(m_Predicate);
00358             TQueue q(pqs);
00359             size_t count = 0;
00360             ITERATE (TFiles, it, files) {
00361                 if ( !(*it)->EndOfData() ) {
00362                     /// NB: this is expensive
00363                     /// we are re-extracting our key information from our alignment
00364                     /// we should try and make this less heavy
00365                     /// one solution is to serialize the key from above and then
00366                     /// only extract the binary strip of data corresponding to the
00367                     /// actual alignment.
00368                     CRef<CSeq_align> sa(new CSeq_align);
00369                     (**it) >> *sa;
00370 
00371                     SKeyAndFile kf;
00372                     kf.first.first = m_Extractor(*sa);
00373                     kf.first.second = sa;
00374                     kf.second = count;
00375                     q.push(kf);
00376                 }
00377                 ++count;
00378             }
00379 
00380             while ( !q.empty() ) {
00381                 SKeyAndFile kf = q.top();
00382                 q.pop();
00383                 sorted_output.Write(kf.first);
00384 
00385                 if ( !(*files[kf.second]).EndOfData() ) {
00386                     /// NB: as above, this is expensive
00387                     CRef<CSeq_align> sa(new CSeq_align);
00388                     (*files[kf.second]) >> *sa;
00389 
00390                     kf.first.first = m_Extractor(*sa);
00391                     kf.first.second = sa;
00392                     q.push(kf);
00393                 }
00394                 else {
00395                     /// close our file once done
00396                     files[kf.second].reset();
00397                     LOG_POST(Error << "  freeing volume: "
00398                              << tmp_volumes[kf.second]);
00399                     if ( !CFile(tmp_volumes[kf.second]).Remove() ) {
00400                         LOG_POST(Error << "    failed to remove temp file: "
00401                                  << tmp_volumes[kf.second]);
00402                     }
00403                 }
00404             }
00405 
00406         } else {
00407             ///
00408             /// this side is much simpler - all alignments fit into RAM
00409             /// sort and dump
00410             ///
00411             std::sort(aligns.begin(), aligns.end(), m_Predicate);
00412 
00413             ITERATE (TAlignments, it, aligns) {
00414                 sorted_output.Write(*it);
00415             }
00416         }
00417     }
00418     catch (CException&) {
00419         ITERATE (vector<string>, it, tmp_volumes) {
00420             LOG_POST(Error << "removing tmp volume: " << *it);
00421             CFile(*it).Remove();
00422         }
00423 
00424         throw;
00425     }
00426 }
00427 
00428 END_NCBI_SCOPE
Modified on Wed May 23 13:24:36 2012 by modify_doxy.py rev. 337098