src/objtools/data_loaders/genbank/cache/writer_cache.cpp

Go to the documentation of this file.
00001 /*  $Id: writer_cache.cpp 175002 2009-11-03 13:15:34Z vasilche $
00002  * ===========================================================================
00003  *                            PUBLIC DOMAIN NOTICE
00004  *               National Center for Biotechnology Information
00005  *
00006  *  This software/database is a "United States Government Work" under the
00007  *  terms of the United States Copyright Act.  It was written as part of
00008  *  the author's official duties as a United States Government employee and
00009  *  thus cannot be copyrighted.  This software/database is freely available
00010  *  to the public for use. The National Library of Medicine and the U.S.
00011  *  Government have not placed any restriction on its use or reproduction.
00012  *
00013  *  Although all reasonable efforts have been taken to ensure the accuracy
00014  *  and reliability of the software and data, the NLM and the U.S.
00015  *  Government do not and cannot warrant the performance or results that
00016  *  may be obtained by using this software or data. The NLM and the U.S.
00017  *  Government disclaim all warranties, express or implied, including
00018  *  warranties of performance, merchantability or fitness for any particular
00019  *  purpose.
00020  *
00021  *  Please cite the author in any work or product based on this material.
00022  *
00023  * ===========================================================================
00024  *
00025  *  Author:  Eugene Vasilchenko, Anatoliy Kuznetsov
00026  *
00027  *  File Description: Cached writer for GenBank data loader
00028  *
00029  */
00030 #include <ncbi_pch.hpp>
00031 #include <objtools/data_loaders/genbank/cache/writer_cache.hpp>
00032 #include <objtools/data_loaders/genbank/cache/writer_cache_entry.hpp>
00033 #include <objtools/data_loaders/genbank/cache/reader_cache_params.h>
00034 #include <objtools/data_loaders/genbank/readers.hpp> // for entry point
00035 #include <objtools/data_loaders/genbank/request_result.hpp>
00036 #include <objtools/data_loaders/genbank/dispatcher.hpp>
00037 
00038 #include <corelib/rwstream.hpp>
00039 #include <corelib/plugin_manager_store.hpp>
00040 
00041 #include <serial/objostrasnb.hpp>
00042 #include <serial/serial.hpp>
00043 
00044 #include <objmgr/objmgr_exception.hpp>
00045 
00046 #include <util/cache/icache.hpp>
00047 
00048 #include <memory>
00049 
00050 BEGIN_NCBI_SCOPE
00051 BEGIN_SCOPE(objects)
00052 
00053 CCacheWriter::CCacheWriter(void)
00054 {
00055 }
00056 
00057 
00058 void CCacheWriter::InitializeCache(CReaderCacheManager& cache_manager,
00059                                    const TPluginManagerParamTree* params)
00060 {
00061     const TPluginManagerParamTree* writer_params = params ?
00062         params->FindNode(NCBI_GBLOADER_WRITER_CACHE_DRIVER_NAME) : 0;
00063     ICache* id_cache = 0;
00064     ICache* blob_cache = 0;
00065     auto_ptr<TParams> id_params
00066         (GetCacheParams(writer_params, eCacheWriter, eIdCache));
00067     auto_ptr<TParams> blob_params
00068         (GetCacheParams(writer_params, eCacheWriter, eBlobCache));
00069     _ASSERT(id_params.get());
00070     _ASSERT(blob_params.get());
00071     const TParams* share_id_param =
00072         id_params->FindNode(NCBI_GBLOADER_WRITER_CACHE_PARAM_SHARE);
00073     bool share_id = !share_id_param  ||
00074         NStr::StringToBool(share_id_param->GetValue().value);
00075     const TParams* share_blob_param =
00076         blob_params->FindNode(NCBI_GBLOADER_WRITER_CACHE_PARAM_SHARE);
00077     bool share_blob = !share_blob_param  ||
00078         NStr::StringToBool(share_blob_param->GetValue().value);
00079     if (share_id  ||  share_blob) {
00080         if ( share_id ) {
00081             ICache* cache = cache_manager.
00082                 FindCache(CReaderCacheManager::fCache_Id,
00083                           id_params.get());
00084             if ( cache ) {
00085                 _ASSERT(!id_cache);
00086                 id_cache = cache;
00087             }
00088         }
00089         if ( share_blob ) {
00090             ICache* cache = cache_manager.
00091                 FindCache(CReaderCacheManager::fCache_Blob,
00092                           blob_params.get());
00093             if ( cache ) {
00094                 _ASSERT(!blob_cache);
00095                 blob_cache = cache;
00096             }
00097         }
00098     }
00099     if ( !id_cache ) {
00100         id_cache = CreateCache(writer_params, eCacheWriter, eIdCache);
00101         if ( id_cache ) {
00102             cache_manager.RegisterCache(*id_cache,
00103                 CReaderCacheManager::fCache_Id);
00104         }
00105     }
00106     if ( !blob_cache ) {
00107         blob_cache = CreateCache(writer_params, eCacheWriter, eBlobCache);
00108         if ( blob_cache ) {
00109             cache_manager.RegisterCache(*blob_cache,
00110                 CReaderCacheManager::fCache_Blob);
00111         }
00112     }
00113     SetIdCache(id_cache);
00114     SetBlobCache(blob_cache);
00115 }
00116 
00117 
00118 void CCacheWriter::ResetCache(void)
00119 {
00120     SetIdCache(0);
00121     SetBlobCache(0);
00122 }
00123 
00124 
00125 void CCacheWriter::SaveStringSeq_ids(CReaderRequestResult& result,
00126                                      const string& seq_id)
00127 {
00128     if ( !m_IdCache) {
00129         return;
00130     }
00131 
00132     CLoadLockSeq_ids ids(result, seq_id);
00133     WriteSeq_ids(seq_id, ids);
00134 }
00135 
00136 
00137 namespace {
00138     class CStoreBuffer {
00139     public:
00140         CStoreBuffer(void)
00141             : m_Buffer(m_Buffer0),
00142               m_End(m_Buffer0+sizeof(m_Buffer0)),
00143               m_Ptr(m_Buffer)
00144             {
00145             }
00146         ~CStoreBuffer(void)
00147             {
00148                 x_FreeBuffer();
00149             }
00150         
00151         const char* data(void) const
00152             {
00153                 return m_Buffer;
00154             }
00155         size_t size(void) const
00156             {
00157                 return m_Ptr - m_Buffer;
00158             }
00159         void CheckStore(size_t size);
00160         void StoreUint4(Uint4 v)
00161             {
00162                 CheckStore(4);
00163                 x_StoreUint4(v);
00164             }
00165         void StoreInt4(Int4 v)
00166             {
00167                 StoreUint4(v);
00168             }
00169         void StoreString(const string& s)
00170             {
00171                 size_t size = s.size();
00172                 CheckStore(4+size);
00173                 x_StoreUint4(size);
00174                 memcpy(m_Ptr, s.data(), size);
00175                 m_Ptr += size;
00176             }
00177 
00178     protected:
00179         void x_FreeBuffer(void);
00180         void x_StoreUint4(Uint4 v)
00181             {
00182                 m_Ptr[0] = v>>24;
00183                 m_Ptr[1] = v>>16;
00184                 m_Ptr[2] = v>>8;
00185                 m_Ptr[3] = v;
00186                 m_Ptr += 4;
00187             }
00188 
00189     private:
00190         CStoreBuffer(const CStoreBuffer&);
00191         void operator=(const CStoreBuffer&);
00192 
00193         char m_Buffer0[256];
00194         char* m_Buffer;
00195         char* m_End;
00196         char* m_Ptr;
00197     };
00198 
00199     
00200     void CStoreBuffer::CheckStore(size_t add)
00201     {
00202         if ( m_Ptr + add > m_End ) {
00203             size_t old_size = size();
00204             size_t new_size = (old_size+add)*2;
00205             char* new_buf = new char[new_size];
00206             memcpy(new_buf, data(), old_size);
00207             x_FreeBuffer();
00208             m_Buffer = new_buf;
00209             m_Ptr = new_buf + old_size;
00210             m_End = new_buf + new_size;
00211         }
00212         _ASSERT(m_Ptr + add <= m_End);
00213     }
00214 
00215 
00216     void CStoreBuffer::x_FreeBuffer(void)
00217     {
00218         if ( m_Buffer != m_Buffer0 ) {
00219             delete[] m_Buffer;
00220             m_Buffer = m_End = m_Ptr = 0;
00221         }
00222     }
00223 }
00224 
00225 void CCacheWriter::SaveStringGi(CReaderRequestResult& result,
00226                                 const string& seq_id)
00227 {
00228     if( !m_IdCache) {
00229         return;
00230     }
00231 
00232     CLoadLockSeq_ids ids(result, seq_id);
00233     if ( ids->IsLoadedGi() ) {
00234         CStoreBuffer str;
00235         str.StoreInt4(ids->GetGi());
00236         try {
00237             m_IdCache->Store(seq_id, 0, GetGiSubkey(),
00238                              str.data(), str.size());
00239         }
00240         catch ( ... ) { // ignored
00241         }
00242     }
00243 }
00244 
00245 
00246 void CCacheWriter::SaveSeq_idSeq_ids(CReaderRequestResult& result,
00247                                      const CSeq_id_Handle& seq_id)
00248 {
00249     if( !m_IdCache) {
00250         return;
00251     }
00252 
00253     CLoadLockSeq_ids ids(result, seq_id);
00254     WriteSeq_ids(GetIdKey(seq_id), ids);
00255 }
00256 
00257 
00258 void CCacheWriter::SaveSeq_idGi(CReaderRequestResult& result,
00259                                 const CSeq_id_Handle& seq_id)
00260 {
00261     if( !m_IdCache) {
00262         return;
00263     }
00264 
00265     CLoadLockSeq_ids ids(result, seq_id);
00266     if ( ids->IsLoadedGi() ) {
00267         CStoreBuffer str;
00268         str.StoreInt4(ids->GetGi());
00269         try {
00270             m_IdCache->Store(GetIdKey(seq_id), 0, GetGiSubkey(),
00271                              str.data(), str.size());
00272         }
00273         catch ( ... ) { // ignored
00274         }
00275     }
00276 }
00277 
00278 
00279 void CCacheWriter::SaveSeq_idAccVer(CReaderRequestResult& result,
00280                                     const CSeq_id_Handle& seq_id)
00281 {
00282     if( !m_IdCache) {
00283         return;
00284     }
00285 
00286     CLoadLockSeq_ids ids(result, seq_id);
00287     if ( ids->IsLoadedAccVer() ) {
00288         CSeq_id_Handle acc = ids->GetAccVer();
00289         const string& str = acc.AsString();
00290         try {
00291             m_IdCache->Store(GetIdKey(seq_id), 0, GetAccVerSubkey(),
00292                              str.data(), str.size());
00293         }
00294         catch ( ... ) { // ignored
00295         }
00296     }
00297 }
00298 
00299 
00300 void CCacheWriter::SaveSeq_idLabel(CReaderRequestResult& result,
00301                                    const CSeq_id_Handle& seq_id)
00302 {
00303     if( !m_IdCache) {
00304         return;
00305     }
00306 
00307     CLoadLockSeq_ids ids(result, seq_id);
00308     if ( ids->IsLoadedLabel() ) {
00309         const string& str = ids->GetLabel();
00310         try {
00311             m_IdCache->Store(GetIdKey(seq_id), 0, GetLabelSubkey(),
00312                              str.data(), str.size());
00313         }
00314         catch ( ... ) { // ignored
00315         }
00316     }
00317 }
00318 
00319 
00320 namespace {
00321     class CCacheDataEraser {
00322         CCacheDataEraser(ICache* cache,
00323                          const string& key, int version, const string& subkey)
00324             : m_Cache(cache),
00325               m_Key(key), m_Version(version), m_Subkey(subkey)
00326             {
00327             }
00328         ~CCacheDataEraser(void) {
00329             if ( m_Cache ) {
00330                 try {
00331                     m_Cache->Remove(m_Key, m_Version, m_Subkey);
00332                 }
00333                 catch (...) { // ignored
00334                 }
00335                 m_Cache = 0;
00336             }
00337         }
00338 
00339         void Done(void) {
00340             m_Cache = 0;
00341         }
00342         
00343     private:
00344         ICache* m_Cache;
00345         string m_Key;
00346         int m_Version;
00347         string m_Subkey;
00348     };
00349 }
00350 
00351 
00352 void CCacheWriter::WriteSeq_ids(const string& key,
00353                                 const CLoadLockSeq_ids& ids)
00354 {
00355     if( !m_IdCache) {
00356         return;
00357     }
00358 
00359     if ( !ids.IsLoaded() ) {
00360         return;
00361     }
00362 
00363     try {
00364         auto_ptr<IWriter> writer
00365             (m_IdCache->GetWriteStream(key, 0, GetSeq_idsSubkey()));
00366         if ( !writer.get() ) {
00367             return;
00368         }
00369 
00370         {{
00371             CWStream w_stream(writer.get());
00372             CObjectOStreamAsnBinary obj_stream(w_stream);
00373             static_cast<CObjectOStream&>(obj_stream).WriteUint4(ids->size());
00374             ITERATE ( CLoadInfoSeq_ids, it, *ids ) {
00375                 obj_stream << *it->GetSeqId();
00376             }
00377         }}
00378 
00379         writer.reset();
00380     }
00381     catch ( ... ) {
00382         // In case of an error we need to remove incomplete data
00383         // from the cache.
00384         try {
00385             m_BlobCache->Remove(key, 0, GetSeq_idsSubkey());
00386         }
00387         catch ( ... ) { // ignored
00388         }
00389         // ignore cache write error - it doesn't affect application
00390     }
00391 }
00392 
00393 
00394 void CCacheWriter::SaveSeq_idBlob_ids(CReaderRequestResult& result,
00395                                       const CSeq_id_Handle& seq_id,
00396                                       const SAnnotSelector* sel)
00397 {
00398     if ( !m_IdCache) {
00399         return;
00400     }
00401 
00402     CLoadLockBlob_ids ids(result, seq_id, sel);
00403     if ( !ids.IsLoaded() ) {
00404         return;
00405     }
00406 
00407     string subkey, true_subkey;
00408     GetBlob_idsSubkey(sel, subkey, true_subkey);
00409     CStoreBuffer str;
00410     str.StoreInt4(IDS_MAGIC);
00411     str.StoreUint4(ids->GetState());
00412     str.StoreUint4(ids->size());
00413     ITERATE ( CLoadInfoBlob_ids, it, *ids ) {
00414         const CBlob_id& id = *it->first;
00415         str.StoreUint4(id.GetSat());
00416         str.StoreUint4(id.GetSubSat());
00417         str.StoreUint4(id.GetSatKey());
00418         const CBlob_Info& info = it->second;
00419         str.StoreUint4(info.GetContentsMask());
00420         str.StoreUint4(info.GetNamedAnnotNames().size());
00421         ITERATE(CBlob_Info::TNamedAnnotNames, it2, info.GetNamedAnnotNames()) {
00422             str.StoreString(*it2);
00423         }
00424     }
00425     if ( !true_subkey.empty() ) {
00426         str.StoreString(true_subkey);
00427     }
00428     try {
00429         m_IdCache->Store(GetIdKey(seq_id), 0, subkey, str.data(), str.size());
00430     }
00431     catch ( ... ) { // ignored
00432     }
00433 }
00434 
00435 
00436 void CCacheWriter::SaveBlobVersion(CReaderRequestResult& /*result*/,
00437                                    const TBlobId& blob_id,
00438                                    TBlobVersion version)
00439 {
00440     if( !m_IdCache ) {
00441         return;
00442     }
00443 
00444     CStoreBuffer str;
00445     str.StoreInt4(version);
00446     try {
00447         m_IdCache->Store(GetBlobKey(blob_id), 0, GetBlobVersionSubkey(),
00448                          str.data(), str.size());
00449     }
00450     catch ( ... ) { // ignored
00451     }
00452 }
00453 
00454 
00455 class CCacheBlobStream : public CWriter::CBlobStream
00456 {
00457 public:
00458     typedef int TVersion;
00459 
00460     CCacheBlobStream(ICache* cache, const string& key,
00461                      TVersion version, const string& subkey)
00462         : m_Cache(cache), m_Key(key), m_Version(version), m_Subkey(subkey),
00463           m_Writer(cache->GetWriteStream(key, version, subkey))
00464         {
00465             if ( m_Writer.get() ) {
00466                 m_Stream.reset(new CWStream(m_Writer.get()));
00467             }
00468         }
00469     ~CCacheBlobStream(void)
00470         {
00471             if ( m_Stream.get() ) {
00472                 Abort();
00473             }
00474         }
00475 
00476     bool CanWrite(void) const
00477         {
00478             return m_Stream.get() != 0;
00479         }
00480 
00481     CNcbiOstream& operator*(void)
00482         {
00483             _ASSERT(m_Stream.get());
00484             return *m_Stream;
00485         }
00486 
00487     void Close(void)
00488         {
00489             *m_Stream << flush;
00490             if ( !*m_Stream ) {
00491                 Abort();
00492             }
00493             m_Stream.reset();
00494             m_Writer.reset();
00495         }
00496 
00497     void Abort(void)
00498         {
00499             m_Stream.reset();
00500             m_Writer.reset();
00501             Remove();
00502         }
00503 
00504     void Remove(void)
00505         {
00506             try {
00507                 m_Cache->Remove(m_Key, m_Version, m_Subkey);
00508             }
00509             catch (...) { // ignored
00510             }
00511         }
00512 
00513 private:
00514     ICache*             m_Cache;
00515     string              m_Key;
00516     TVersion            m_Version;
00517     string              m_Subkey;
00518     auto_ptr<IWriter>   m_Writer;
00519     auto_ptr<CWStream>  m_Stream;
00520 };
00521 
00522 
00523 CRef<CWriter::CBlobStream>
00524 CCacheWriter::OpenBlobStream(CReaderRequestResult& result,
00525                              const TBlobId& blob_id,
00526                              TChunkId chunk_id,
00527                              const CProcessor& processor)
00528 {
00529     if( !m_BlobCache ) {
00530         return null;
00531     }
00532 
00533     try {
00534         CLoadLockBlob blob(result, blob_id);
00535         CRef<CBlobStream> stream
00536             (new CCacheBlobStream(m_BlobCache, GetBlobKey(blob_id),
00537                                   blob.GetBlobVersion(),
00538                                   GetBlobSubkey(chunk_id)));
00539         if ( !stream->CanWrite() ) {
00540             return null;
00541         }
00542         
00543         WriteProcessorTag(**stream, processor);
00544         return stream;
00545     }
00546     catch ( ... ) { // ignored
00547         return null;
00548     }
00549 }
00550 
00551 
00552 bool CCacheWriter::CanWrite(EType type) const
00553 {
00554     return (type == eIdWriter ? m_IdCache : m_BlobCache) != 0;
00555 }
00556 
00557 
00558 END_SCOPE(objects)
00559 
00560 
00561 using namespace objects;
00562 
00563 
00564 /// Class factory for Cache writer
00565 ///
00566 /// @internal
00567 ///
00568 class CCacheWriterCF :
00569     public CSimpleClassFactoryImpl<CWriter, CCacheWriter>
00570 {
00571 private:
00572     typedef CSimpleClassFactoryImpl<CWriter, CCacheWriter> TParent;
00573 public:
00574     CCacheWriterCF()
00575         : TParent(NCBI_GBLOADER_WRITER_CACHE_DRIVER_NAME, 0) {}
00576     ~CCacheWriterCF() {}
00577 
00578 
00579     CWriter*
00580     CreateInstance(const string& driver  = kEmptyStr,
00581                    CVersionInfo version = NCBI_INTERFACE_VERSION(CWriter),
00582                    const TPluginManagerParamTree* params = 0) const
00583     {
00584         if ( !driver.empty()  &&  driver != m_DriverName )
00585             return 0;
00586 
00587         if ( !version.Match(NCBI_INTERFACE_VERSION(CWriter)) ) {
00588             return 0;
00589         }
00590         return new CCacheWriter();
00591     }
00592 };
00593 
00594 
00595 void NCBI_EntryPoint_CacheWriter(
00596      CPluginManager<CWriter>::TDriverInfoList&   info_list,
00597      CPluginManager<CWriter>::EEntryPointRequest method)
00598 {
00599     CHostEntryPointImpl<CCacheWriterCF>::NCBI_EntryPointImpl(info_list,
00600                                                              method);
00601 }
00602 
00603 
00604 void NCBI_EntryPoint_xwriter_cache(
00605      CPluginManager<CWriter>::TDriverInfoList&   info_list,
00606      CPluginManager<CWriter>::EEntryPointRequest method)
00607 {
00608     NCBI_EntryPoint_CacheWriter(info_list, method);
00609 }
00610 
00611 
00612 void GenBankWriters_Register_Cache(void)
00613 {
00614     RegisterEntryPoint<CWriter>(NCBI_EntryPoint_CacheWriter);
00615 }
00616 
00617 
00618 END_NCBI_SCOPE
00619 
00620 

Generated on Sun Dec 6 22:41:17 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Dec 07 16:21:12 2009 by modify_doxy.py rev. 173732