00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <ncbi_pch.hpp>
00031 #include <objtools/data_loaders/genbank/cache/writer_cache.hpp>
00032 #include <objtools/data_loaders/genbank/cache/writer_cache_entry.hpp>
00033 #include <objtools/data_loaders/genbank/cache/reader_cache_params.h>
00034 #include <objtools/data_loaders/genbank/readers.hpp>
00035 #include <objtools/data_loaders/genbank/request_result.hpp>
00036 #include <objtools/data_loaders/genbank/dispatcher.hpp>
00037
00038 #include <corelib/rwstream.hpp>
00039 #include <corelib/plugin_manager_store.hpp>
00040
00041 #include <serial/objostrasnb.hpp>
00042 #include <serial/serial.hpp>
00043
00044 #include <objmgr/objmgr_exception.hpp>
00045
00046 #include <util/cache/icache.hpp>
00047
00048 #include <memory>
00049
00050 BEGIN_NCBI_SCOPE
00051 BEGIN_SCOPE(objects)
00052
00053 CCacheWriter::CCacheWriter(void)
00054 {
00055 }
00056
00057
00058 void CCacheWriter::InitializeCache(CReaderCacheManager& cache_manager,
00059 const TPluginManagerParamTree* params)
00060 {
00061 const TPluginManagerParamTree* writer_params = params ?
00062 params->FindNode(NCBI_GBLOADER_WRITER_CACHE_DRIVER_NAME) : 0;
00063 ICache* id_cache = 0;
00064 ICache* blob_cache = 0;
00065 auto_ptr<TParams> id_params
00066 (GetCacheParams(writer_params, eCacheWriter, eIdCache));
00067 auto_ptr<TParams> blob_params
00068 (GetCacheParams(writer_params, eCacheWriter, eBlobCache));
00069 _ASSERT(id_params.get());
00070 _ASSERT(blob_params.get());
00071 const TParams* share_id_param =
00072 id_params->FindNode(NCBI_GBLOADER_WRITER_CACHE_PARAM_SHARE);
00073 bool share_id = !share_id_param ||
00074 NStr::StringToBool(share_id_param->GetValue().value);
00075 const TParams* share_blob_param =
00076 blob_params->FindNode(NCBI_GBLOADER_WRITER_CACHE_PARAM_SHARE);
00077 bool share_blob = !share_blob_param ||
00078 NStr::StringToBool(share_blob_param->GetValue().value);
00079 if (share_id || share_blob) {
00080 if ( share_id ) {
00081 ICache* cache = cache_manager.
00082 FindCache(CReaderCacheManager::fCache_Id,
00083 id_params.get());
00084 if ( cache ) {
00085 _ASSERT(!id_cache);
00086 id_cache = cache;
00087 }
00088 }
00089 if ( share_blob ) {
00090 ICache* cache = cache_manager.
00091 FindCache(CReaderCacheManager::fCache_Blob,
00092 blob_params.get());
00093 if ( cache ) {
00094 _ASSERT(!blob_cache);
00095 blob_cache = cache;
00096 }
00097 }
00098 }
00099 if ( !id_cache ) {
00100 id_cache = CreateCache(writer_params, eCacheWriter, eIdCache);
00101 if ( id_cache ) {
00102 cache_manager.RegisterCache(*id_cache,
00103 CReaderCacheManager::fCache_Id);
00104 }
00105 }
00106 if ( !blob_cache ) {
00107 blob_cache = CreateCache(writer_params, eCacheWriter, eBlobCache);
00108 if ( blob_cache ) {
00109 cache_manager.RegisterCache(*blob_cache,
00110 CReaderCacheManager::fCache_Blob);
00111 }
00112 }
00113 SetIdCache(id_cache);
00114 SetBlobCache(blob_cache);
00115 }
00116
00117
00118 void CCacheWriter::ResetCache(void)
00119 {
00120 SetIdCache(0);
00121 SetBlobCache(0);
00122 }
00123
00124
00125 void CCacheWriter::SaveStringSeq_ids(CReaderRequestResult& result,
00126 const string& seq_id)
00127 {
00128 if ( !m_IdCache) {
00129 return;
00130 }
00131
00132 CLoadLockSeq_ids ids(result, seq_id);
00133 WriteSeq_ids(seq_id, ids);
00134 }
00135
00136
00137 namespace {
00138 class CStoreBuffer {
00139 public:
00140 CStoreBuffer(void)
00141 : m_Buffer(m_Buffer0),
00142 m_End(m_Buffer0+sizeof(m_Buffer0)),
00143 m_Ptr(m_Buffer)
00144 {
00145 }
00146 ~CStoreBuffer(void)
00147 {
00148 x_FreeBuffer();
00149 }
00150
00151 const char* data(void) const
00152 {
00153 return m_Buffer;
00154 }
00155 size_t size(void) const
00156 {
00157 return m_Ptr - m_Buffer;
00158 }
00159 void CheckStore(size_t size);
00160 void StoreUint4(Uint4 v)
00161 {
00162 CheckStore(4);
00163 x_StoreUint4(v);
00164 }
00165 void StoreInt4(Int4 v)
00166 {
00167 StoreUint4(v);
00168 }
00169 void StoreString(const string& s)
00170 {
00171 size_t size = s.size();
00172 CheckStore(4+size);
00173 x_StoreUint4(size);
00174 memcpy(m_Ptr, s.data(), size);
00175 m_Ptr += size;
00176 }
00177
00178 protected:
00179 void x_FreeBuffer(void);
00180 void x_StoreUint4(Uint4 v)
00181 {
00182 m_Ptr[0] = v>>24;
00183 m_Ptr[1] = v>>16;
00184 m_Ptr[2] = v>>8;
00185 m_Ptr[3] = v;
00186 m_Ptr += 4;
00187 }
00188
00189 private:
00190 CStoreBuffer(const CStoreBuffer&);
00191 void operator=(const CStoreBuffer&);
00192
00193 char m_Buffer0[256];
00194 char* m_Buffer;
00195 char* m_End;
00196 char* m_Ptr;
00197 };
00198
00199
00200 void CStoreBuffer::CheckStore(size_t add)
00201 {
00202 if ( m_Ptr + add > m_End ) {
00203 size_t old_size = size();
00204 size_t new_size = (old_size+add)*2;
00205 char* new_buf = new char[new_size];
00206 memcpy(new_buf, data(), old_size);
00207 x_FreeBuffer();
00208 m_Buffer = new_buf;
00209 m_Ptr = new_buf + old_size;
00210 m_End = new_buf + new_size;
00211 }
00212 _ASSERT(m_Ptr + add <= m_End);
00213 }
00214
00215
00216 void CStoreBuffer::x_FreeBuffer(void)
00217 {
00218 if ( m_Buffer != m_Buffer0 ) {
00219 delete[] m_Buffer;
00220 m_Buffer = m_End = m_Ptr = 0;
00221 }
00222 }
00223 }
00224
00225 void CCacheWriter::SaveStringGi(CReaderRequestResult& result,
00226 const string& seq_id)
00227 {
00228 if( !m_IdCache) {
00229 return;
00230 }
00231
00232 CLoadLockSeq_ids ids(result, seq_id);
00233 if ( ids->IsLoadedGi() ) {
00234 CStoreBuffer str;
00235 str.StoreInt4(ids->GetGi());
00236 try {
00237 m_IdCache->Store(seq_id, 0, GetGiSubkey(),
00238 str.data(), str.size());
00239 }
00240 catch ( ... ) {
00241 }
00242 }
00243 }
00244
00245
00246 void CCacheWriter::SaveSeq_idSeq_ids(CReaderRequestResult& result,
00247 const CSeq_id_Handle& seq_id)
00248 {
00249 if( !m_IdCache) {
00250 return;
00251 }
00252
00253 CLoadLockSeq_ids ids(result, seq_id);
00254 WriteSeq_ids(GetIdKey(seq_id), ids);
00255 }
00256
00257
00258 void CCacheWriter::SaveSeq_idGi(CReaderRequestResult& result,
00259 const CSeq_id_Handle& seq_id)
00260 {
00261 if( !m_IdCache) {
00262 return;
00263 }
00264
00265 CLoadLockSeq_ids ids(result, seq_id);
00266 if ( ids->IsLoadedGi() ) {
00267 CStoreBuffer str;
00268 str.StoreInt4(ids->GetGi());
00269 try {
00270 m_IdCache->Store(GetIdKey(seq_id), 0, GetGiSubkey(),
00271 str.data(), str.size());
00272 }
00273 catch ( ... ) {
00274 }
00275 }
00276 }
00277
00278
00279 void CCacheWriter::SaveSeq_idAccVer(CReaderRequestResult& result,
00280 const CSeq_id_Handle& seq_id)
00281 {
00282 if( !m_IdCache) {
00283 return;
00284 }
00285
00286 CLoadLockSeq_ids ids(result, seq_id);
00287 if ( ids->IsLoadedAccVer() ) {
00288 CSeq_id_Handle acc = ids->GetAccVer();
00289 const string& str = acc.AsString();
00290 try {
00291 m_IdCache->Store(GetIdKey(seq_id), 0, GetAccVerSubkey(),
00292 str.data(), str.size());
00293 }
00294 catch ( ... ) {
00295 }
00296 }
00297 }
00298
00299
00300 void CCacheWriter::SaveSeq_idLabel(CReaderRequestResult& result,
00301 const CSeq_id_Handle& seq_id)
00302 {
00303 if( !m_IdCache) {
00304 return;
00305 }
00306
00307 CLoadLockSeq_ids ids(result, seq_id);
00308 if ( ids->IsLoadedLabel() ) {
00309 const string& str = ids->GetLabel();
00310 try {
00311 m_IdCache->Store(GetIdKey(seq_id), 0, GetLabelSubkey(),
00312 str.data(), str.size());
00313 }
00314 catch ( ... ) {
00315 }
00316 }
00317 }
00318
00319
00320 namespace {
00321 class CCacheDataEraser {
00322 CCacheDataEraser(ICache* cache,
00323 const string& key, int version, const string& subkey)
00324 : m_Cache(cache),
00325 m_Key(key), m_Version(version), m_Subkey(subkey)
00326 {
00327 }
00328 ~CCacheDataEraser(void) {
00329 if ( m_Cache ) {
00330 try {
00331 m_Cache->Remove(m_Key, m_Version, m_Subkey);
00332 }
00333 catch (...) {
00334 }
00335 m_Cache = 0;
00336 }
00337 }
00338
00339 void Done(void) {
00340 m_Cache = 0;
00341 }
00342
00343 private:
00344 ICache* m_Cache;
00345 string m_Key;
00346 int m_Version;
00347 string m_Subkey;
00348 };
00349 }
00350
00351
00352 void CCacheWriter::WriteSeq_ids(const string& key,
00353 const CLoadLockSeq_ids& ids)
00354 {
00355 if( !m_IdCache) {
00356 return;
00357 }
00358
00359 if ( !ids.IsLoaded() ) {
00360 return;
00361 }
00362
00363 try {
00364 auto_ptr<IWriter> writer
00365 (m_IdCache->GetWriteStream(key, 0, GetSeq_idsSubkey()));
00366 if ( !writer.get() ) {
00367 return;
00368 }
00369
00370 {{
00371 CWStream w_stream(writer.get());
00372 CObjectOStreamAsnBinary obj_stream(w_stream);
00373 static_cast<CObjectOStream&>(obj_stream).WriteUint4(ids->size());
00374 ITERATE ( CLoadInfoSeq_ids, it, *ids ) {
00375 obj_stream << *it->GetSeqId();
00376 }
00377 }}
00378
00379 writer.reset();
00380 }
00381 catch ( ... ) {
00382
00383
00384 try {
00385 m_BlobCache->Remove(key, 0, GetSeq_idsSubkey());
00386 }
00387 catch ( ... ) {
00388 }
00389
00390 }
00391 }
00392
00393
00394 void CCacheWriter::SaveSeq_idBlob_ids(CReaderRequestResult& result,
00395 const CSeq_id_Handle& seq_id,
00396 const SAnnotSelector* sel)
00397 {
00398 if ( !m_IdCache) {
00399 return;
00400 }
00401
00402 CLoadLockBlob_ids ids(result, seq_id, sel);
00403 if ( !ids.IsLoaded() ) {
00404 return;
00405 }
00406
00407 string subkey, true_subkey;
00408 GetBlob_idsSubkey(sel, subkey, true_subkey);
00409 CStoreBuffer str;
00410 str.StoreInt4(IDS_MAGIC);
00411 str.StoreUint4(ids->GetState());
00412 str.StoreUint4(ids->size());
00413 ITERATE ( CLoadInfoBlob_ids, it, *ids ) {
00414 const CBlob_id& id = *it->first;
00415 str.StoreUint4(id.GetSat());
00416 str.StoreUint4(id.GetSubSat());
00417 str.StoreUint4(id.GetSatKey());
00418 const CBlob_Info& info = it->second;
00419 str.StoreUint4(info.GetContentsMask());
00420 str.StoreUint4(info.GetNamedAnnotNames().size());
00421 ITERATE(CBlob_Info::TNamedAnnotNames, it2, info.GetNamedAnnotNames()) {
00422 str.StoreString(*it2);
00423 }
00424 }
00425 if ( !true_subkey.empty() ) {
00426 str.StoreString(true_subkey);
00427 }
00428 try {
00429 m_IdCache->Store(GetIdKey(seq_id), 0, subkey, str.data(), str.size());
00430 }
00431 catch ( ... ) {
00432 }
00433 }
00434
00435
00436 void CCacheWriter::SaveBlobVersion(CReaderRequestResult& ,
00437 const TBlobId& blob_id,
00438 TBlobVersion version)
00439 {
00440 if( !m_IdCache ) {
00441 return;
00442 }
00443
00444 CStoreBuffer str;
00445 str.StoreInt4(version);
00446 try {
00447 m_IdCache->Store(GetBlobKey(blob_id), 0, GetBlobVersionSubkey(),
00448 str.data(), str.size());
00449 }
00450 catch ( ... ) {
00451 }
00452 }
00453
00454
00455 class CCacheBlobStream : public CWriter::CBlobStream
00456 {
00457 public:
00458 typedef int TVersion;
00459
00460 CCacheBlobStream(ICache* cache, const string& key,
00461 TVersion version, const string& subkey)
00462 : m_Cache(cache), m_Key(key), m_Version(version), m_Subkey(subkey),
00463 m_Writer(cache->GetWriteStream(key, version, subkey))
00464 {
00465 if ( m_Writer.get() ) {
00466 m_Stream.reset(new CWStream(m_Writer.get()));
00467 }
00468 }
00469 ~CCacheBlobStream(void)
00470 {
00471 if ( m_Stream.get() ) {
00472 Abort();
00473 }
00474 }
00475
00476 bool CanWrite(void) const
00477 {
00478 return m_Stream.get() != 0;
00479 }
00480
00481 CNcbiOstream& operator*(void)
00482 {
00483 _ASSERT(m_Stream.get());
00484 return *m_Stream;
00485 }
00486
00487 void Close(void)
00488 {
00489 *m_Stream << flush;
00490 if ( !*m_Stream ) {
00491 Abort();
00492 }
00493 m_Stream.reset();
00494 m_Writer.reset();
00495 }
00496
00497 void Abort(void)
00498 {
00499 m_Stream.reset();
00500 m_Writer.reset();
00501 Remove();
00502 }
00503
00504 void Remove(void)
00505 {
00506 try {
00507 m_Cache->Remove(m_Key, m_Version, m_Subkey);
00508 }
00509 catch (...) {
00510 }
00511 }
00512
00513 private:
00514 ICache* m_Cache;
00515 string m_Key;
00516 TVersion m_Version;
00517 string m_Subkey;
00518 auto_ptr<IWriter> m_Writer;
00519 auto_ptr<CWStream> m_Stream;
00520 };
00521
00522
00523 CRef<CWriter::CBlobStream>
00524 CCacheWriter::OpenBlobStream(CReaderRequestResult& result,
00525 const TBlobId& blob_id,
00526 TChunkId chunk_id,
00527 const CProcessor& processor)
00528 {
00529 if( !m_BlobCache ) {
00530 return null;
00531 }
00532
00533 try {
00534 CLoadLockBlob blob(result, blob_id);
00535 CRef<CBlobStream> stream
00536 (new CCacheBlobStream(m_BlobCache, GetBlobKey(blob_id),
00537 blob.GetBlobVersion(),
00538 GetBlobSubkey(chunk_id)));
00539 if ( !stream->CanWrite() ) {
00540 return null;
00541 }
00542
00543 WriteProcessorTag(**stream, processor);
00544 return stream;
00545 }
00546 catch ( ... ) {
00547 return null;
00548 }
00549 }
00550
00551
00552 bool CCacheWriter::CanWrite(EType type) const
00553 {
00554 return (type == eIdWriter ? m_IdCache : m_BlobCache) != 0;
00555 }
00556
00557
00558 END_SCOPE(objects)
00559
00560
00561 using namespace objects;
00562
00563
00564
00565
00566
00567
00568 class CCacheWriterCF :
00569 public CSimpleClassFactoryImpl<CWriter, CCacheWriter>
00570 {
00571 private:
00572 typedef CSimpleClassFactoryImpl<CWriter, CCacheWriter> TParent;
00573 public:
00574 CCacheWriterCF()
00575 : TParent(NCBI_GBLOADER_WRITER_CACHE_DRIVER_NAME, 0) {}
00576 ~CCacheWriterCF() {}
00577
00578
00579 CWriter*
00580 CreateInstance(const string& driver = kEmptyStr,
00581 CVersionInfo version = NCBI_INTERFACE_VERSION(CWriter),
00582 const TPluginManagerParamTree* params = 0) const
00583 {
00584 if ( !driver.empty() && driver != m_DriverName )
00585 return 0;
00586
00587 if ( !version.Match(NCBI_INTERFACE_VERSION(CWriter)) ) {
00588 return 0;
00589 }
00590 return new CCacheWriter();
00591 }
00592 };
00593
00594
00595 void NCBI_EntryPoint_CacheWriter(
00596 CPluginManager<CWriter>::TDriverInfoList& info_list,
00597 CPluginManager<CWriter>::EEntryPointRequest method)
00598 {
00599 CHostEntryPointImpl<CCacheWriterCF>::NCBI_EntryPointImpl(info_list,
00600 method);
00601 }
00602
00603
00604 void NCBI_EntryPoint_xwriter_cache(
00605 CPluginManager<CWriter>::TDriverInfoList& info_list,
00606 CPluginManager<CWriter>::EEntryPointRequest method)
00607 {
00608 NCBI_EntryPoint_CacheWriter(info_list, method);
00609 }
00610
00611
00612 void GenBankWriters_Register_Cache(void)
00613 {
00614 RegisterEntryPoint<CWriter>(NCBI_EntryPoint_CacheWriter);
00615 }
00616
00617
00618 END_NCBI_SCOPE
00619
00620