|
NCBI C++ ToolKit
|
00001 /* $Id: bdb_blobcache.cpp 52450 2011-12-27 20:42:58Z kazimird $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Author: Anatoliy Kuznetsov 00027 * 00028 * File Description: BDB library BLOB cache implementation. 
00029 * 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 00034 #include <corelib/ncbitime.hpp> 00035 #include <corelib/ncbifile.hpp> 00036 #include <corelib/ncbi_process.hpp> 00037 #include <corelib/plugin_manager_impl.hpp> 00038 #include <corelib/ncbi_system.hpp> 00039 #include <corelib/ncbi_limits.h> 00040 #include <corelib/ncbimtx.hpp> 00041 #include <corelib/ncbitime.hpp> 00042 #include <corelib/plugin_manager_store.hpp> 00043 #include <corelib/ncbiexpt.hpp> 00044 00045 #include <connect/server_monitor.hpp> 00046 00047 #include <db.h> 00048 00049 #include <db/bdb/bdb_blobcache.hpp> 00050 #include <db/bdb/bdb_cursor.hpp> 00051 #include <db/bdb/bdb_trans.hpp> 00052 00053 #include <db/error_codes.hpp> 00054 00055 #include <util/cache/icache_cf.hpp> 00056 #include <util/cache/icache_clean_thread.hpp> 00057 #include <util/simple_buffer.hpp> 00058 00059 #include <time.h> 00060 #include <math.h> 00061 00062 00063 #define NCBI_USE_ERRCODE_X Db_Bdb_BlobCache 00064 00065 00066 BEGIN_NCBI_SCOPE 00067 00068 00069 // =========================================================================== 00070 // 00071 // Various implementation notes: 00072 // 00073 // The code below uses mixed transaction model, data store transactions 00074 // go in the default mode (sync or async) (configurable). 00075 // Miscelaneous service transactions (like those from GC (Purge)) are 00076 // explicitly async (for speed). This is not critical if we lose some of them. 
00077 // 00078 // 00079 00080 const char* kBDBCacheStartedFileName = "__ncbi_cache_started__"; 00081 00082 00083 static void s_MakeOverflowFileName(string& buf, 00084 const string& path, 00085 const string& cache, 00086 const string& key, 00087 int version, 00088 const string& subkey) 00089 { 00090 buf = path + cache + '_' + key + '_' 00091 + NStr::IntToString(version) + '_' + subkey + ".ov_"; 00092 } 00093 00094 00095 /// @internal 00096 struct SCacheDescr 00097 { 00098 string key; 00099 int version; 00100 string subkey; 00101 int overflow; 00102 unsigned blob_id; 00103 00104 SCacheDescr(string x_key, 00105 int x_version, 00106 string x_subkey, 00107 int x_overflow, 00108 unsigned x_blob_id) 00109 : key(x_key), 00110 version(x_version), 00111 subkey(x_subkey), 00112 overflow(x_overflow), 00113 blob_id(x_blob_id) 00114 {} 00115 00116 SCacheDescr() {} 00117 }; 00118 00119 00120 /// @internal 00121 class CBDB_CacheIReader : public IReader 00122 { 00123 public: 00124 CBDB_CacheIReader(CBDB_Cache& bdb_cache, 00125 CNcbiIfstream* overflow_file, 00126 CBDB_Cache::TBlobLock& blob_lock) 00127 : m_Cache(bdb_cache), 00128 m_OverflowFile(overflow_file), 00129 m_RawBuffer(0), 00130 m_BufferPtr(0), 00131 m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout()) 00132 { 00133 m_BlobLock.TakeFrom(blob_lock); 00134 } 00135 00136 CBDB_CacheIReader(CBDB_Cache& bdb_cache, 00137 CBDB_RawFile::TBuffer* raw_buffer, 00138 CBDB_Cache::TBlobLock& blob_lock) 00139 : m_Cache(bdb_cache), 00140 m_OverflowFile(0), 00141 m_RawBuffer(raw_buffer), 00142 m_BufferPtr(raw_buffer->data()), 00143 m_BufferSize(raw_buffer->size()), 00144 m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout()) 00145 { 00146 m_BlobLock.TakeFrom(blob_lock); 00147 } 00148 00149 virtual ~CBDB_CacheIReader() 00150 { 00151 if ( m_RawBuffer ) { 00152 if ( m_BufferSize ) { 00153 ERR_POST("CBDB_CacheIReader: detected unread input "<< 00154 m_BufferSize); 00155 } 00156 delete m_RawBuffer; 00157 } 00158 if ( m_OverflowFile 
) { 00159 CNcbiStreampos pos = m_OverflowFile->tellg(); 00160 m_OverflowFile->seekg(0, IOS_BASE::end); 00161 CNcbiStreampos end = m_OverflowFile->tellg(); 00162 if ( pos != end ) { 00163 ERR_POST("CBDB_CacheIReader: detected unread input "<< 00164 (end-pos)<<": "<<pos<<" of "<<end); 00165 } 00166 delete m_OverflowFile; 00167 } 00168 } 00169 00170 00171 virtual ERW_Result Read(void* buf, 00172 size_t count, 00173 size_t* bytes_read) 00174 { 00175 if (count == 0) 00176 return eRW_Success; 00177 00178 // Check if BLOB is memory based... 00179 if (m_RawBuffer) { 00180 if (m_BufferSize == 0) { 00181 *bytes_read = 0; 00182 return eRW_Eof; 00183 } 00184 *bytes_read = min(count, m_BufferSize); 00185 ::memcpy(buf, m_BufferPtr, *bytes_read); 00186 m_BufferPtr += *bytes_read; 00187 m_BufferSize -= *bytes_read; 00188 return eRW_Success; 00189 } 00190 00191 // Check if BLOB is file based... 00192 if (m_OverflowFile) { 00193 m_OverflowFile->read((char*)buf, count); 00194 *bytes_read = m_OverflowFile->gcount(); 00195 if (*bytes_read == 0) { 00196 return eRW_Eof; 00197 } 00198 return eRW_Success; 00199 } 00200 00201 return eRW_Success; 00202 } 00203 00204 virtual ERW_Result PendingCount(size_t* count) 00205 { 00206 if ( m_RawBuffer ) { 00207 *count = m_BufferSize; 00208 return eRW_Success; 00209 } 00210 else if ( m_OverflowFile ) { 00211 *count = m_OverflowFile->good()? 
1: 0; 00212 return eRW_Success; 00213 } 00214 *count = 0; 00215 return eRW_Error; 00216 } 00217 00218 00219 private: 00220 CBDB_CacheIReader(const CBDB_CacheIReader&); 00221 CBDB_CacheIReader& operator=(const CBDB_CacheIReader&); 00222 00223 private: 00224 CBDB_Cache& m_Cache; 00225 CNcbiIfstream* m_OverflowFile; 00226 CBDB_RawFile::TBuffer* m_RawBuffer; 00227 unsigned char* m_BufferPtr; 00228 size_t m_BufferSize; 00229 CBDB_Cache::TBlobLock m_BlobLock; 00230 }; 00231 00232 /// Buffer resize strategy, to balance memory reallocs and heap 00233 /// consumption 00234 /// 00235 /// @internal 00236 class CCacheBufferResizeStrategy 00237 { 00238 public: 00239 static size_t GetNewCapacity(size_t cur_capacity, size_t requested_size) 00240 { 00241 size_t reserve = requested_size / 2; 00242 if (reserve > 1 * 1024 * 1024) { 00243 reserve = 1 * 1024 * 1024; 00244 } 00245 return requested_size + reserve; 00246 } 00247 }; 00248 00249 00250 /// @internal 00251 class CBDB_CacheIWriter : public IWriter 00252 { 00253 public: 00254 typedef 00255 CSimpleBufferT<unsigned char, CCacheBufferResizeStrategy> TBuffer; 00256 00257 public: 00258 CBDB_CacheIWriter(CBDB_Cache& bdb_cache, 00259 const char* path, 00260 unsigned blob_id_ext, 00261 const string& blob_key, 00262 int version, 00263 const string& subkey, 00264 SCache_AttrDB& attr_db, 00265 unsigned int ttl, 00266 time_t request_time, 00267 const string& owner, 00268 CBDB_Cache::TBlobLock& blob_lock) 00269 : m_Cache(bdb_cache), 00270 m_Path(path), 00271 m_BlobIdExt(blob_id_ext), 00272 m_BlobKey(blob_key), 00273 m_Version(version), 00274 m_SubKey(subkey), 00275 m_AttrDB(attr_db), 00276 m_Buffer(0), 00277 // m_BytesInBuffer(0), 00278 m_OverflowFile(0), 00279 m_TTL(ttl), 00280 m_RequestTime(request_time), 00281 m_Flushed(false), 00282 m_BlobSize(0), 00283 m_Overflow(0), 00284 m_BlobStore(0), 00285 m_BlobUpdate(0), 00286 m_Owner(owner), 00287 m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout()) 00288 { 00289 
//_TRACE("CBDB_CacheIWriter::CBDB_CacheIWriter point 1"); 00290 // m_Buffer = new unsigned char[m_Cache.GetOverflowLimit()]; 00291 m_Buffer.reserve(4 * 1024); 00292 m_BlobLock.TakeFrom(blob_lock); 00293 //_TRACE("CBDB_CacheIWriter::CBDB_CacheIWriter point 2"); 00294 } 00295 00296 virtual ~CBDB_CacheIWriter() 00297 { 00298 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 1"); 00299 try { 00300 //bool upd_statistics = false; 00301 00302 // Dumping the buffer 00303 try { 00304 //if (m_Buffer.size()) { 00305 if (!m_OverflowFile && !m_Flushed) { 00306 _TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 2"); 00307 m_Cache.x_Store(m_BlobIdExt, 00308 m_BlobKey, 00309 m_Version, 00310 m_SubKey, 00311 m_Buffer.data(), 00312 m_Buffer.size(), // m_BytesInBuffer, 00313 m_TTL, 00314 m_Owner, 00315 false // do not lock blob 00316 ); 00317 //delete[] m_Buffer; m_Buffer = 0; 00318 } 00319 } catch (CBDB_Exception& ) { 00320 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 3"); 00321 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 00322 1, 00323 0); 00324 throw; 00325 } 00326 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 4"); 00327 00328 if (m_OverflowFile) { 00329 if (m_OverflowFile->is_open()) { 00330 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 5"); 00331 m_OverflowFile->close(); 00332 try { 00333 m_Cache.RegisterOverflow(m_BlobKey, m_Version, m_SubKey, 00334 m_TTL, m_Owner); 00335 } catch (exception& ) { 00336 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0); 00337 throw; 00338 } 00339 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 6"); 00340 //upd_statistics = true; 00341 } 00342 delete m_OverflowFile; m_OverflowFile = 0; 00343 00344 // statistics 00345 // FIXME: ideally statistics should not sit on the same main mutex 00346 if (/*upd_statistics && */m_Cache.IsSaveStatistics()) { 00347 try { 00348 CFastMutexGuard guard(m_Cache.m_DB_Lock); 00349 m_Cache.m_Statistics.AddStore(m_Owner, 00350 m_RequestTime, 00351 m_BlobStore, 00352 
m_BlobUpdate, 00353 m_BlobSize, 00354 m_Overflow); 00355 } catch (exception& ) { 00356 // ignore non critical exceptions 00357 } 00358 } 00359 } 00360 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 7"); 00361 00362 } catch (exception & ex) { 00363 ERR_POST_X(1, "Exception in ~CBDB_CacheIWriter() : " << ex.what() 00364 << " " << m_BlobKey); 00365 // final attempt to avoid leaks 00366 // delete[] m_Buffer; 00367 delete m_OverflowFile; 00368 } 00369 } 00370 00371 virtual ERW_Result Write(const void* buf, 00372 size_t count, 00373 size_t* bytes_written = 0) 00374 { 00375 _ASSERT(m_Flushed == false); 00376 00377 if (bytes_written) 00378 *bytes_written = 0; 00379 if (count == 0) 00380 return eRW_Success; 00381 // m_AttrUpdFlag = false; 00382 m_BlobSize += count; 00383 m_Flushed = false; 00384 00385 // check BLOB size quota 00386 if (m_Cache.GetMaxBlobSize()) { 00387 if (m_BlobSize > m_Cache.GetMaxBlobSize()) { 00388 //delete m_Buffer; m_Buffer = 0; 00389 m_Buffer.clear(); 00390 delete m_OverflowFile; m_OverflowFile = 0; 00391 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0); 00392 // FIXME: 00393 if (m_Cache.IsSaveStatistics()) { 00394 CFastMutexGuard guard(m_Cache.m_DB_Lock); 00395 m_Cache.m_Statistics.AddBlobQuotaError(m_Owner); 00396 } 00397 00398 string msg("BLOB larger than allowed. 
size="); 00399 msg.append(NStr::UIntToString(m_BlobSize)); 00400 msg.append(" quota="); 00401 msg.append(NStr::UIntToString(m_Cache.GetMaxBlobSize())); 00402 BDB_THROW(eQuotaLimit, msg); 00403 } 00404 } 00405 00406 00407 // if (m_Buffer.size()) { 00408 if (!m_OverflowFile) { 00409 unsigned int new_buf_length = m_Buffer.size() + count; 00410 if (new_buf_length <= m_Cache.GetOverflowLimit()) { 00411 size_t old_size = m_Buffer.size(); 00412 m_Buffer.resize(old_size + count); 00413 ::memcpy(m_Buffer.data() + old_size, buf, count); 00414 //m_BytesInBuffer = new_buf_length; 00415 if (bytes_written) { 00416 *bytes_written = count; 00417 } 00418 00419 return eRW_Success; 00420 } else { 00421 // Buffer overflow. Writing to file. 00422 OpenOverflowFile(); 00423 if (m_OverflowFile) { 00424 if (m_Buffer.size()) { 00425 try { 00426 x_WriteOverflow((const char*)m_Buffer.data(), 00427 m_Buffer.size()); 00428 } 00429 catch (exception& ) { 00430 m_Buffer.clear(); 00431 //delete[] m_Buffer; m_Buffer = 0; 00432 delete m_OverflowFile; m_OverflowFile = 0; 00433 throw; 00434 } 00435 } 00436 //delete[] m_Buffer; m_Buffer = 0; m_BytesInBuffer = 0; 00437 m_Buffer.clear(); 00438 } 00439 } 00440 00441 } 00442 if (m_OverflowFile) { 00443 00444 _ASSERT(m_Buffer.size() == 0); 00445 00446 x_WriteOverflow((char*)buf, count); 00447 if (bytes_written) { 00448 *bytes_written = count; 00449 } 00450 return eRW_Success; 00451 } 00452 00453 return eRW_Error; 00454 } 00455 00456 /// Flush pending data (if any) down to output device. 
00457 virtual ERW_Result Flush(void) 00458 { 00459 if (m_Flushed) 00460 return eRW_Success; 00461 00462 m_Flushed = true; 00463 00464 // Dumping the buffer 00465 00466 // if (m_Buffer.size()) { 00467 if (!m_OverflowFile) { 00468 00469 try { 00470 m_Cache.x_Store(m_BlobIdExt, 00471 m_BlobKey, 00472 m_Version, 00473 m_SubKey, 00474 m_Buffer.data(), 00475 m_Buffer.size(), // m_BytesInBuffer, 00476 m_TTL, 00477 m_Owner, 00478 false // do not lock blob 00479 ); 00480 //m_BlobLock.Unlock(); 00481 //return eRW_Success; 00482 } 00483 catch (exception&) { 00484 m_Buffer.clear(); 00485 // delete[] m_Buffer; m_Buffer = 0; 00486 throw; 00487 } 00488 00489 //delete[] m_Buffer; m_Buffer = 0; 00490 //m_BytesInBuffer = 0; 00491 //m_Buffer.clear(); 00492 //return eRW_Success; 00493 } 00494 00495 if ( m_OverflowFile ) { 00496 00497 _ASSERT(m_Buffer.size() == 0); 00498 00499 try { 00500 m_OverflowFile->flush(); 00501 if ( m_OverflowFile->bad() ) { 00502 m_OverflowFile->close(); 00503 BDB_THROW(eOverflowFileIO, 00504 "Error trying to flush an overflow file"); 00505 } 00506 /* 00507 m_Cache.RegisterOverflow(m_BlobKey, m_Version, m_SubKey, 00508 m_TTL, m_Owner); 00509 00510 // FIXME: 00511 if (m_Cache.IsSaveStatistics()) { 00512 CFastMutexGuard guard(m_Cache.m_DB_Lock); 00513 try { 00514 m_Cache.m_Statistics.AddStore(m_Owner, 00515 m_RequestTime, 00516 m_BlobStore, 00517 m_BlobUpdate, 00518 m_BlobSize, 00519 m_Overflow); 00520 } catch (exception&) { 00521 // ignore 00522 } 00523 } 00524 */ 00525 } catch (CBDB_Exception& ) { 00526 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 00527 1, 00528 0); 00529 throw; 00530 } 00531 } 00532 m_BlobLock.Unlock(); 00533 return eRW_Success; 00534 } 00535 private: 00536 void OpenOverflowFile() 00537 { 00538 s_MakeOverflowFileName( 00539 m_OverflowFilePath, m_Path, m_Cache.GetName(), m_BlobKey, m_Version, m_SubKey); 00540 00541 _TRACE("LC: Making overflow file " << m_OverflowFilePath); 00542 m_OverflowFile = 00543 new 
CNcbiOfstream(m_OverflowFilePath.c_str(), 00544 IOS_BASE::out | 00545 IOS_BASE::trunc | 00546 IOS_BASE::binary); 00547 if (!m_OverflowFile->is_open() || m_OverflowFile->bad()) { 00548 delete m_OverflowFile; m_OverflowFile = 0; 00549 string err = "LC: Cannot create overflow file "; 00550 err += m_OverflowFilePath; 00551 BDB_THROW(eCannotOpenOverflowFile, err); 00552 } 00553 m_Overflow = 1; 00554 } 00555 00556 void x_WriteOverflow(const char* buf, streamsize count) 00557 { 00558 _ASSERT(m_OverflowFile); 00559 00560 if (!m_OverflowFile->is_open()) { 00561 BDB_THROW(eOverflowFileIO, 00562 "LC: Attempt to write to a non-open overflow file"); 00563 } 00564 try { 00565 m_Cache.WriteOverflow(*m_OverflowFile, 00566 m_OverflowFilePath, 00567 buf, count); 00568 } 00569 catch (exception&) { 00570 // any error here is critical to the BLOB storage integrity 00571 // if file IO error happens we delete BLOB altogether 00572 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0); 00573 throw; 00574 } 00575 } 00576 00577 private: 00578 CBDB_CacheIWriter(const CBDB_CacheIWriter&); 00579 CBDB_CacheIWriter& operator=(const CBDB_CacheIWriter&); 00580 00581 private: 00582 CBDB_Cache& m_Cache; 00583 const char* m_Path; 00584 unsigned m_BlobIdExt; 00585 string m_BlobKey; 00586 int m_Version; 00587 string m_SubKey; 00588 SCache_AttrDB& m_AttrDB; 00589 00590 TBuffer m_Buffer; 00591 // unsigned char* m_Buffer; 00592 // unsigned int m_BytesInBuffer; 00593 CNcbiOfstream* m_OverflowFile; 00594 string m_OverflowFilePath; 00595 00596 int m_StampSubKey; 00597 // bool m_AttrUpdFlag; ///< Flags attributes are up to date 00598 CBDB_Cache::EWriteSyncMode m_WSync; 00599 unsigned int m_TTL; 00600 time_t m_RequestTime; 00601 bool m_Flushed; ///< FALSE until Flush() called 00602 00603 unsigned m_BlobSize; ///< Number of bytes written 00604 unsigned m_Overflow; ///< Overflow file created 00605 unsigned m_BlobStore; 00606 unsigned m_BlobUpdate; 00607 string m_Owner; 00608 CBDB_Cache::TBlobLock m_BlobLock; 
00609 }; 00610 00611 00612 SBDB_CacheUnitStatistics::SBDB_CacheUnitStatistics() 00613 { 00614 Init(); 00615 } 00616 00617 void SBDB_CacheUnitStatistics::Init() 00618 { 00619 blobs_stored_total = blobs_updates_total = 00620 blobs_never_read_total = blobs_expl_deleted_total = 00621 blobs_purge_deleted_total = blobs_db = 00622 blobs_overflow_total = blobs_read_total = blob_size_max_total = 0; 00623 00624 err_protocol = err_internal = err_communication = 00625 err_no_blob = err_blob_get = err_blob_put = err_blob_over_quota = 0; 00626 00627 blobs_size_total = blobs_size_db = 0.0; 00628 InitHistorgam(&blob_size_hist); 00629 00630 time_access.erase(time_access.begin(), time_access.end()); 00631 } 00632 00633 void SBDB_CacheUnitStatistics::InitHistorgam(TBlobSizeHistogram* hist) 00634 { 00635 _ASSERT(hist); 00636 00637 hist->clear(); 00638 unsigned hist_value = 512; 00639 for (unsigned i = 0; i < 100; ++i) { 00640 (*hist)[hist_value] = 0; 00641 hist_value *= 2; 00642 } 00643 (*hist)[kMax_UInt] = 0; 00644 } 00645 00646 void SBDB_CacheUnitStatistics::AddToHistogram(TBlobSizeHistogram* hist, 00647 unsigned size) 00648 { 00649 if (hist->empty()) { 00650 return; 00651 } 00652 TBlobSizeHistogram::iterator it = hist->upper_bound(size); 00653 if (it == hist->end()) { 00654 return; 00655 } 00656 ++(it->second); 00657 } 00658 00659 00660 /// Compute current hour of day 00661 static 00662 void s_GetDayHour(time_t time_in_secs, unsigned* day, unsigned* hour) 00663 { 00664 *day = static_cast<unsigned>(time_in_secs / (24 * 60 * 60)); 00665 unsigned secs_in_day = static_cast<unsigned>(time_in_secs % (24 * 60 * 60)); 00666 *hour = secs_in_day / 3600; 00667 } 00668 00669 void SBDB_CacheUnitStatistics::AddStore(time_t tm, 00670 unsigned store, 00671 unsigned update, 00672 unsigned blob_size, 00673 unsigned overflow) 00674 { 00675 blobs_stored_total += store; 00676 blobs_updates_total += update; 00677 blobs_size_total += blob_size; 00678 blobs_overflow_total += overflow; 00679 if (blob_size 
> blob_size_max_total) { 00680 blob_size_max_total = blob_size; 00681 } 00682 AddToHistogram(&blob_size_hist, blob_size); 00683 00684 unsigned day, hour; 00685 s_GetDayHour(tm, &day, &hour); 00686 if (time_access.empty()) { 00687 time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 1, 0)); 00688 return; 00689 } 00690 SBDB_TimeAccessStatistics& ta_stat = time_access.back(); 00691 if (ta_stat.day == day && ta_stat.hour == hour) { 00692 ++ta_stat.put_count; 00693 } else { 00694 time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 1, 0)); 00695 if (time_access.size() > 48) { 00696 time_access.pop_front(); 00697 } 00698 } 00699 } 00700 00701 void SBDB_CacheUnitStatistics::AddRead(time_t tm) 00702 { 00703 ++blobs_read_total; 00704 00705 unsigned day, hour; 00706 s_GetDayHour(tm, &day, &hour); 00707 if (time_access.empty()) { 00708 time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 0, 1)); 00709 return; 00710 } 00711 SBDB_TimeAccessStatistics& ta_stat = time_access.back(); 00712 if (ta_stat.day == day && ta_stat.hour == hour) { 00713 ++ta_stat.get_count; 00714 } else { 00715 time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 0, 1)); 00716 if (time_access.size() > 48) { 00717 time_access.pop_front(); 00718 } 00719 } 00720 } 00721 00722 void SBDB_CacheUnitStatistics::x_AddErrGetPut(EErrGetPut operation) 00723 { 00724 switch (operation) { 00725 case eErr_Put: 00726 ++err_blob_put; 00727 break; 00728 case eErr_Get: 00729 ++err_blob_get; 00730 break; 00731 case eErr_Unknown: 00732 break; 00733 default: 00734 _ASSERT(0); 00735 break; 00736 } // switch 00737 } 00738 00739 void SBDB_CacheUnitStatistics::AddInternalError(EErrGetPut operation) 00740 { 00741 ++err_internal; 00742 x_AddErrGetPut(operation); 00743 } 00744 00745 void SBDB_CacheUnitStatistics::AddBlobQuotaError() 00746 { 00747 ++err_blob_over_quota; 00748 x_AddErrGetPut(eErr_Put); 00749 } 00750 00751 void SBDB_CacheUnitStatistics::AddProtocolError(EErrGetPut operation) 00752 { 00753 
++err_protocol; 00754 x_AddErrGetPut(operation); 00755 } 00756 00757 void SBDB_CacheUnitStatistics::AddNoBlobError(EErrGetPut operation) 00758 { 00759 ++err_no_blob; 00760 x_AddErrGetPut(operation); 00761 } 00762 00763 void SBDB_CacheUnitStatistics::AddCommError(EErrGetPut operation) 00764 { 00765 ++err_communication; 00766 x_AddErrGetPut(operation); 00767 } 00768 00769 void SBDB_CacheUnitStatistics::PrintStatistics(CNcbiOstream& out) const 00770 { 00771 out 00772 << "Total number of blobs ever stored " << "\t" << blobs_stored_total << "\n" 00773 << "Total number of overflow blobs (large size) " << "\t" << blobs_overflow_total << "\n" 00774 << "Total number of blobs updates " << "\t" << blobs_updates_total << "\n" 00775 << "Total number of blobs stored but never read " << "\t" << blobs_never_read_total << "\n" 00776 << "Total number of reads " << "\t" << blobs_read_total << "\n" 00777 << "Total number of explicit deletes " << "\t" << blobs_expl_deleted_total << "\n" 00778 << "Total number of BLOBs deletes by garbage collector " << "\t" << blobs_purge_deleted_total << "\n" 00779 << "Total size of all BLOBs ever stored " << "\t" << blob_size_max_total << "\n" 00780 00781 << "Current database number of records(BLOBs) " << "\t" << blobs_db << "\n" 00782 << "Current size of all BLOBs " << "\t" << blobs_size_db << "\n" 00783 00784 << "Number of NetCache protocol errors " << "\t" << err_protocol << "\n" 00785 << "Number of communication errors " << "\t" << err_communication << "\n" 00786 << "Number of NetCache server internal errors " << "\t" << err_internal << "\n" 00787 << "Number of BLOB not found situations " << "\t" << err_no_blob << "\n" 00788 << "Number of errors when getting BLOBs " << "\t" << err_blob_get << "\n" 00789 << "Number of errors when storing BLOBs " << "\t" << err_blob_put << "\n" 00790 << "Number of errors when BLOB is over the size limit " << "\t" << err_blob_over_quota << "\n"; 00791 00792 out << "\n\n"; 00793 00794 if (!time_access.empty()) { 00795 
out << "# Time access statistics:" << "\n" << "\n"; 00796 out << "# Hour \t Puts \t Gets" << "\n"; 00797 00798 ITERATE(TTimeAccess, it, time_access) { 00799 const SBDB_TimeAccessStatistics& ta_stat = *it; 00800 out << ta_stat.hour << "\t" 00801 << ta_stat.put_count << "\t" 00802 << ta_stat.get_count << "\n"; 00803 } 00804 } 00805 out << "\n\n"; 00806 00807 if (!blob_size_hist.empty()) { 00808 out << "# BLOB size histogram:" << "\n" << "\n"; 00809 out << "# Size \t Count" << "\n"; 00810 00811 const TBlobSizeHistogram& hist = blob_size_hist; 00812 SBDB_CacheUnitStatistics::TBlobSizeHistogram::const_iterator hist_end = 00813 hist.end(); 00814 00815 ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) { 00816 if (it->second > 0) { 00817 hist_end = it; 00818 } 00819 } 00820 ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) { 00821 out << it->first << "\t" << it->second << "\n"; 00822 if (it == hist_end) { 00823 break; 00824 } 00825 } // for 00826 } 00827 00828 } 00829 00830 void SBDB_CacheUnitStatistics::ConvertToRegistry(IRWRegistry* reg, 00831 const string& sect_name_postfix) const 00832 { 00833 string postfix(sect_name_postfix); 00834 postfix = NStr::Replace(postfix, " ", "_"); 00835 00836 // convert main parameters 00837 00838 {{ 00839 string sect_stat("bdb_stat"); 00840 if (!postfix.empty()) { 00841 sect_stat.append("_"); 00842 sect_stat.append(postfix); 00843 } 00844 00845 reg->Set(sect_stat, "blobs_stored_total", 00846 NStr::UIntToString(blobs_stored_total), 0, 00847 "Total number of blobs ever stored"); 00848 reg->Set(sect_stat, "blobs_overflow_total", 00849 NStr::UIntToString(blobs_overflow_total), 0, 00850 "Total number of overflow blobs (large size)"); 00851 reg->Set(sect_stat, "blobs_updates_total", 00852 NStr::UIntToString(blobs_updates_total), 0, 00853 "Total number of blobs updates"); 00854 reg->Set(sect_stat, "blobs_never_read_total", 00855 NStr::UIntToString(blobs_never_read_total), 0, 00856 "Total number of blobs stored but 
never read"); 00857 reg->Set(sect_stat, "blobs_read_total", 00858 NStr::UIntToString(blobs_read_total), 0, 00859 "Total number of reads"); 00860 reg->Set(sect_stat, "blobs_expl_deleted_total", 00861 NStr::UIntToString(blobs_expl_deleted_total), 0, 00862 "Total number of explicit deletes"); 00863 reg->Set(sect_stat, "blobs_purge_deleted_total", 00864 NStr::UIntToString(blobs_purge_deleted_total), 0, 00865 "Total number of BLOBs deletes by garbage collector"); 00866 reg->Set(sect_stat, "blobs_size_total", 00867 NStr::UIntToString(unsigned(blobs_size_total)), 0, 00868 "Total size of all BLOBs ever stored"); 00869 reg->Set(sect_stat, "blob_size_max_total", 00870 NStr::UIntToString(blob_size_max_total), 0, 00871 "Size of the largest BLOB ever stored"); 00872 00873 00874 reg->Set(sect_stat, "blobs_db", 00875 NStr::UIntToString(blobs_db), 0, 00876 "Current database number of records(BLOBs)"); 00877 reg->Set(sect_stat, "blobs_size_db", 00878 NStr::UIntToString(unsigned(blobs_size_db)), 0, 00879 "Current size of all BLOBs"); 00880 00881 reg->Set(sect_stat, "err_protocol", 00882 NStr::UIntToString(err_protocol), 0, 00883 "Number of NetCache protocol errors"); 00884 reg->Set(sect_stat, "err_communication", 00885 NStr::UIntToString(err_communication), 0, 00886 "Number of communication errors"); 00887 reg->Set(sect_stat, "err_internal", 00888 NStr::UIntToString(err_internal), 0, 00889 "Number of NetCache server internal errors"); 00890 reg->Set(sect_stat, "err_no_blob", 00891 NStr::UIntToString(err_no_blob), 0, 00892 "Number of BLOB not found situations"); 00893 reg->Set(sect_stat, "err_blob_get", 00894 NStr::UIntToString(err_blob_get), 0, 00895 "Number of errors when getting BLOBs"); 00896 reg->Set(sect_stat, "err_blob_put", 00897 NStr::UIntToString(err_blob_put), 0, 00898 "Number of errors when storing BLOBs"); 00899 reg->Set(sect_stat, "err_blob_over_quota", 00900 NStr::UIntToString(err_blob_over_quota), 0, 00901 "Number of errors when BLOB is over the size limit"); 00902 
00903 // convert time access statistics 00904 // 00905 if (!time_access.empty()) { 00906 00907 string get_history; 00908 string put_history; 00909 00910 ITERATE(TTimeAccess, it, time_access) { 00911 const SBDB_TimeAccessStatistics& ta_stat = *it; 00912 string hour = NStr::UIntToString(ta_stat.hour); 00913 string put_count = NStr::UIntToString(ta_stat.put_count); 00914 string get_count = NStr::UIntToString(ta_stat.get_count); 00915 00916 string pair = hour; 00917 pair.append("="); 00918 pair.append(put_count); 00919 00920 if (!put_history.empty()) { 00921 put_history.append(";"); 00922 } 00923 put_history.append(pair); 00924 00925 00926 pair = hour; 00927 pair.append("="); 00928 pair.append(get_count); 00929 00930 if (!get_history.empty()) { 00931 get_history.append(";"); 00932 } 00933 get_history.append(pair); 00934 00935 } // ITERATE 00936 00937 reg->Set(sect_stat, "get_access", 00938 get_history, 0, 00939 "Read access by hours (hour=count)"); 00940 reg->Set(sect_stat, "put_access", 00941 put_history, 0, 00942 "Write access by hours (hour=count)"); 00943 00944 } 00945 00946 00947 }} 00948 00949 // convert historgam 00950 // 00951 if (!blob_size_hist.empty()) { 00952 string sect_hist("bdb_stat_hist"); 00953 if (!postfix.empty()) { 00954 sect_hist.append("_"); 00955 sect_hist.append(postfix); 00956 } 00957 00958 const TBlobSizeHistogram& hist = blob_size_hist; 00959 SBDB_CacheUnitStatistics::TBlobSizeHistogram::const_iterator hist_end = 00960 hist.end(); 00961 00962 ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) { 00963 if (it->second > 0) { 00964 hist_end = it; 00965 } 00966 } 00967 ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) { 00968 string var_name = "size_"; 00969 var_name += NStr::UIntToString(it->first); 00970 00971 reg->Set(sect_hist, var_name, NStr::UIntToString(it->second)); 00972 00973 if (it == hist_end) { 00974 break; 00975 } 00976 } // for 00977 } 00978 00979 } 00980 00981 00982 00983 00984 
SBDB_CacheStatistics::SBDB_CacheStatistics() 00985 { 00986 } 00987 00988 void SBDB_CacheStatistics::Init() 00989 { 00990 m_GlobalStat.Init(); 00991 m_OwnerStatMap.erase(m_OwnerStatMap.begin(), m_OwnerStatMap.end()); 00992 } 00993 00994 00995 void SBDB_CacheStatistics::AddStore(const string& client, 00996 time_t tm, 00997 unsigned store, 00998 unsigned update, 00999 unsigned blob_size, 01000 unsigned overflow) 01001 { 01002 m_GlobalStat.AddStore(tm, store, update, blob_size, overflow); 01003 if (!client.empty()) { 01004 m_OwnerStatMap[client].AddStore(tm, store, update, blob_size, overflow); 01005 } 01006 } 01007 01008 void SBDB_CacheStatistics::AddRead(const string& client, time_t tm) 01009 { 01010 m_GlobalStat.AddRead(tm); 01011 if (!client.empty()) { 01012 m_OwnerStatMap[client].AddRead(tm); 01013 } 01014 } 01015 01016 void SBDB_CacheStatistics::AddExplDelete(const string& client) 01017 { 01018 m_GlobalStat.AddExplDelete(); 01019 if (!client.empty()) { 01020 m_OwnerStatMap[client].AddExplDelete(); 01021 } 01022 } 01023 01024 void SBDB_CacheStatistics::AddPurgeDelete(const string& client) 01025 { 01026 m_GlobalStat.AddPurgeDelete(); 01027 if (!client.empty()) { 01028 m_OwnerStatMap[client].AddPurgeDelete(); 01029 } 01030 } 01031 01032 void SBDB_CacheStatistics::AddNeverRead(const string& client) 01033 { 01034 m_GlobalStat.AddNeverRead(); 01035 if (!client.empty()) { 01036 m_OwnerStatMap[client].AddNeverRead(); 01037 } 01038 } 01039 01040 void SBDB_CacheStatistics::AddBlobQuotaError(const string& client) 01041 { 01042 m_GlobalStat.AddBlobQuotaError(); 01043 if (!client.empty()) { 01044 m_OwnerStatMap[client].AddBlobQuotaError(); 01045 } 01046 } 01047 01048 void SBDB_CacheStatistics::AddInternalError(const string& client, 01049 SBDB_CacheUnitStatistics::EErrGetPut operation) 01050 { 01051 m_GlobalStat.AddInternalError(operation); 01052 if (!client.empty()) { 01053 m_OwnerStatMap[client].AddInternalError(operation); 01054 } 01055 } 01056 01057 void 
SBDB_CacheStatistics::AddProtocolError(const string& client, 01058 SBDB_CacheUnitStatistics::EErrGetPut operation) 01059 { 01060 m_GlobalStat.AddProtocolError(operation); 01061 if (!client.empty()) { 01062 m_OwnerStatMap[client].AddProtocolError(operation); 01063 } 01064 } 01065 01066 void SBDB_CacheStatistics::AddNoBlobError(const string& client, 01067 SBDB_CacheUnitStatistics::EErrGetPut operation) 01068 { 01069 m_GlobalStat.AddNoBlobError(operation); 01070 if (!client.empty()) { 01071 m_OwnerStatMap[client].AddNoBlobError(operation); 01072 } 01073 } 01074 01075 void SBDB_CacheStatistics::AddCommError(const string& client, 01076 SBDB_CacheUnitStatistics::EErrGetPut operation) 01077 { 01078 m_GlobalStat.AddCommError(operation); 01079 if (!client.empty()) { 01080 m_OwnerStatMap[client].AddCommError(operation); 01081 } 01082 } 01083 01084 void SBDB_CacheStatistics::ConvertToRegistry(IRWRegistry* reg) const 01085 { 01086 m_GlobalStat.ConvertToRegistry(reg, kEmptyStr); 01087 ITERATE(TOwnerStatMap, it, m_OwnerStatMap) { 01088 const string& client = it->first; 01089 const SBDB_CacheUnitStatistics& stat = it->second; 01090 stat.ConvertToRegistry(reg, client); 01091 } 01092 } 01093 01094 void SBDB_CacheStatistics::PrintStatistics(CNcbiOstream& out) const 01095 { 01096 out << "## " << "\n" 01097 << "## Global statistics" << "\n" 01098 << "## " << "\n\n"; 01099 m_GlobalStat.PrintStatistics(out); 01100 out << "\n\n"; 01101 01102 ITERATE(TOwnerStatMap, it, m_OwnerStatMap) { 01103 const string& client = it->first; 01104 const SBDB_CacheUnitStatistics& stat = it->second; 01105 01106 out << "## " << "\n" 01107 << "## Owner statistics:" << client << "\n" 01108 << "## " << "\n\n"; 01109 01110 stat.PrintStatistics(out); 01111 out << "\n\n"; 01112 } 01113 } 01114 01115 01116 CBDB_Cache::CBDB_Cache() 01117 : m_PidGuard(0), 01118 m_ReadOnly(false), 01119 m_InitIfDirty(false), 01120 m_JoinedEnv(false), 01121 m_LockTimeout(20 * 1000), 01122 m_Env(0), 01123 m_Closed(true), 01124 
m_BLOB_SplitStore(0), 01125 m_CacheAttrDB(0), 01126 m_CacheIdIDX(0), 01127 m_CacheAttrDB_RO1(0), 01128 m_CacheAttrDB_RO2(0), 01129 m_CacheIdIDX_RO(0), 01130 m_Timeout(0), 01131 m_MaxTimeout(0), 01132 m_VersionFlag(eDropOlder), 01133 m_WSync(eWriteNoSync), 01134 m_PurgeBatchSize(150), 01135 m_BatchSleep(0), 01136 m_PurgeStopSignal(0, 100), // purge stop semaphore max 100 01137 m_CleanLogOnPurge(0), 01138 m_PurgeCount(0), 01139 m_LogSizeMax(0), 01140 m_PurgeNowRunning(false), 01141 m_RunPurgeThread(false), 01142 m_PurgeThreadDelay(10), 01143 m_CheckPointInterval(24 * (1024 * 1024)), 01144 m_OverflowLimit(512 * 1024), 01145 m_MaxTTL_prolong(0), 01146 m_SaveStatistics(false), 01147 m_MaxBlobSize(0), 01148 m_RoundRobinVolumes(0), 01149 m_MempTrickle(10), 01150 m_Monitor(0), 01151 m_TimeLine(0) 01152 { 01153 m_TimeStampFlag = fTimeStampOnRead | 01154 fExpireLeastFrequentlyUsed | 01155 fPurgeOnStartup; 01156 01157 } 01158 01159 CBDB_Cache::~CBDB_Cache() 01160 { 01161 try { 01162 Close(); 01163 } catch (exception& ) 01164 {} 01165 } 01166 01167 void CBDB_Cache::SetTTL_Prolongation(unsigned ttl_prolong) 01168 { 01169 m_MaxTTL_prolong = ttl_prolong; 01170 } 01171 01172 void CBDB_Cache::x_PidLock(ELockMode lm) 01173 { 01174 string lock_file = string("lcs_") + m_Name + string(".pid"); 01175 string lock_file_path = m_Path + lock_file; 01176 01177 switch (lm) 01178 { 01179 case ePidLock: 01180 _ASSERT(m_PidGuard == 0); 01181 m_PidGuard = new CPIDGuard(lock_file_path, m_Path); 01182 break; 01183 case eNoLock: 01184 break; 01185 default: 01186 break; 01187 } 01188 } 01189 01190 void CBDB_Cache::SetOverflowLimit(unsigned limit) 01191 { 01192 _ASSERT(!IsOpen()); 01193 m_OverflowLimit = limit; 01194 } 01195 01196 void CBDB_Cache::Open(const string& cache_path, 01197 const string& cache_name, 01198 ELockMode lm, 01199 Uint8 cache_ram_size, 01200 ETRansact use_trans, 01201 unsigned int log_mem_size) 01202 { 01203 {{ 01204 m_NextExpTime = 0; 01205 // m_PurgeSkipCnt = 0; 01206 
m_LastTimeLineCheck = 0; 01207 01208 Close(); 01209 m_Closed = false; 01210 01211 CFastMutexGuard guard(m_DB_Lock); 01212 01213 m_Path = CDirEntry::AddTrailingPathSeparator(cache_path); 01214 m_Name = cache_name; 01215 01216 CDir dir(m_Path); 01217 CDir::TEntries fl = dir.GetEntries("__db.*", CDir::eIgnoreRecursive); 01218 CFile fl_clean(CDirEntry::MakePath(m_Path, kBDBCacheStartedFileName)); 01219 01220 static set<string> s_OpenedDirs; 01221 01222 if ((!fl.empty() || fl_clean.Exists()) 01223 && s_OpenedDirs.count(m_Path) == 0) 01224 { 01225 if (m_InitIfDirty) { 01226 LOG_POST("Database was closed uncleanly. Removing directory " 01227 << m_Path); 01228 dir.Remove(); 01229 } 01230 else { 01231 // Opening db with recover flags is unreliable. 01232 LOG_POST("Database was closed uncleanly. Running recovery..."); 01233 BDB_RecoverEnv(m_Path, false); 01234 fl_clean.Remove(); 01235 } 01236 } 01237 01238 01239 // Make sure our directory exists 01240 if ( !dir.Exists() ) { 01241 dir.Create(); 01242 } 01243 01244 if (!fl_clean.Exists()) { 01245 CFileWriter writer(fl_clean.GetPath()); 01246 string pid = NStr::NumericToString(CProcess::GetCurrentPid()); 01247 writer.Write(pid.data(), pid.size()); 01248 s_OpenedDirs.insert(m_Path); 01249 } 01250 01251 m_Env = new CBDB_Env(); 01252 01253 string err_file = m_Path + "err" + string(cache_name) + ".log"; 01254 m_Env->OpenErrFile(err_file.c_str()); 01255 01256 m_JoinedEnv = false; 01257 bool needs_recovery = false; 01258 01259 if (!fl.empty()) { 01260 try { 01261 m_Env->JoinEnv(cache_path, CBDB_Env::eThreaded); 01262 if (m_Env->IsTransactional()) { 01263 LOG_POST_X(2, Info << 01264 "LC: '" << cache_name << 01265 "' Joined transactional environment "); 01266 } else { 01267 LOG_POST_X(3, Info << 01268 "LC: '" << cache_name << 01269 "' Warning: Joined non-transactional environment "); 01270 } 01271 m_JoinedEnv = true; 01272 } 01273 catch (CBDB_ErrnoException& err_ex) 01274 { 01275 if (err_ex.IsRecovery()) { 01276 LOG_POST_X(4, Warning 
<< 01277 "LC: '" << cache_name << 01278 "'Warning: DB_ENV returned DB_RUNRECOVERY code." 01279 " Running the recovery procedure."); 01280 } 01281 needs_recovery = true; 01282 } 01283 catch (CBDB_Exception&) 01284 { 01285 needs_recovery = true; 01286 } 01287 } 01288 01289 if (!m_JoinedEnv) { 01290 if (log_mem_size == 0) { 01291 if (m_LogSizeMax >= (20 * 1024 * 1024)) { 01292 m_Env->SetLogFileMax(m_LogSizeMax); 01293 } else { 01294 m_Env->SetLogFileMax(200 * 1024 * 1024); 01295 } 01296 m_Env->SetLogBSize(1 * 1024 * 1024); 01297 } else { 01298 m_Env->SetLogInMemory(true); 01299 m_Env->SetLogBSize(log_mem_size); 01300 } 01301 if (!m_LogDir.empty()) { 01302 m_Env->SetLogDir(m_LogDir); 01303 } 01304 01305 if (cache_ram_size) { 01306 // empirically if total cache size is large 01307 // you Berkeley DB works faster if you break it into several 01308 // files. The sweetspot seems to be around 250M per file. 01309 // (but growing number of files is not a good idea as well) 01310 // so here I'm dancing to set some reasonable number of caches 01311 // 01312 int cache_num = 1; 01313 if (cache_ram_size > (500 * 1024 * 1024)) { 01314 cache_num = (int)((cache_ram_size) / Uint8(250 * 1024 * 1024)); 01315 } 01316 if (!cache_num) { 01317 cache_num = 1; // paranoid check 01318 } 01319 if (cache_num > 10) { // too many files? 
01320 cache_num = 10; 01321 } 01322 m_Env->SetCacheSize(cache_ram_size, cache_num); 01323 } 01324 01325 unsigned checkpoint_KB = m_CheckPointInterval / 1024; 01326 if (checkpoint_KB == 0) { 01327 checkpoint_KB = 64; 01328 } 01329 m_Env->SetCheckPointKB(checkpoint_KB); 01330 01331 m_Env->SetLogAutoRemove(true); 01332 01333 x_PidLock(lm); 01334 switch (use_trans) 01335 { 01336 case eUseTrans: 01337 { 01338 CBDB_Env::TEnvOpenFlags env_flags = CBDB_Env::eThreaded; 01339 if (needs_recovery) { 01340 env_flags |= CBDB_Env::eRunRecovery; 01341 } 01342 m_Env->SetMaxLocks(50000); 01343 m_Env->OpenWithTrans(cache_path, env_flags); 01344 } 01345 break; 01346 case eNoTrans: 01347 LOG_POST_X(5, Info << "BDB_Cache: Creating locking environment"); 01348 m_Env->OpenWithLocks(cache_path); 01349 break; 01350 default: 01351 _ASSERT(0); 01352 } // switch 01353 01354 m_Env->SetLockTimeout(30 * 1000000); // 30 sec 01355 if (m_Env->IsTransactional()) { 01356 m_Env->SetTransactionTimeout(30 * 1000000); // 30 sec 01357 } 01358 01359 } 01360 01361 if (GetWriteSync() == eWriteSync) { 01362 m_Env->SetTransactionSync(CBDB_Transaction::eTransSync); 01363 } else { 01364 m_Env->SetTransactionSync(CBDB_Transaction::eTransASync); 01365 } 01366 01367 m_BLOB_SplitStore = 01368 new TSplitStore(new CBDB_BlobDeMux_RoundRobin(m_RoundRobinVolumes)); 01369 m_CacheAttrDB = new SCache_AttrDB(); 01370 m_CacheAttrDB_RO1 = new SCache_AttrDB(); 01371 m_CacheAttrDB_RO2 = new SCache_AttrDB(); 01372 m_CacheIdIDX = new SCache_IdIDX(); 01373 m_CacheIdIDX_RO = new SCache_IdIDX(); 01374 01375 m_BLOB_SplitStore->SetEnv(*m_Env); 01376 m_CacheAttrDB->SetEnv(*m_Env); 01377 m_CacheAttrDB_RO1->SetEnv(*m_Env); 01378 m_CacheAttrDB_RO2->SetEnv(*m_Env); 01379 m_CacheIdIDX->SetEnv(*m_Env); 01380 m_CacheIdIDX_RO->SetEnv(*m_Env); 01381 01382 string cache_blob_db_name = 01383 string("lcs_") + string(cache_name) + string("_blob"); 01384 string attr_db_name = 01385 string("lcs_") + string(cache_name) + string("_attr5") + string(".db"); 
01386 string id_idx_name = 01387 string("lcs_") + string(cache_name) + string("_id") + string(".idx"); 01388 01389 m_BLOB_SplitStore->Open(cache_blob_db_name, CBDB_RawFile::eReadWriteCreate); 01390 m_CacheAttrDB->Open(attr_db_name, CBDB_RawFile::eReadWriteCreate); 01391 m_CacheAttrDB_RO1->Open(attr_db_name, CBDB_RawFile::eReadOnly); 01392 m_CacheAttrDB_RO2->Open(attr_db_name, CBDB_RawFile::eReadOnly); 01393 m_CacheIdIDX->Open(id_idx_name, CBDB_RawFile::eReadWriteCreate); 01394 m_CacheIdIDX_RO->Open(id_idx_name, CBDB_RawFile::eReadOnly); 01395 01396 // try to open all databases 01397 // 01398 m_BLOB_SplitStore->OpenProjections(); 01399 01400 }} 01401 01402 // Create timeline for BLOB expiration 01403 // 01404 01405 {{ 01406 unsigned timeline_precision = 5 * 60; // 5min default precision 01407 if (m_Timeout) { 01408 timeline_precision = m_Timeout / 10; 01409 } 01410 if (timeline_precision > (5 * 60)) { // 5 min max. 01411 timeline_precision = 5 * 60; 01412 } 01413 if (timeline_precision < (1 * 60)) { 01414 timeline_precision = 1 * 60; 01415 } 01416 m_TimeLine = new TTimeLine(timeline_precision, 0); 01417 }} 01418 01419 01420 // read cache attributes so we can adjust atomic counter 01421 // 01422 01423 {{ 01424 LOG_POST_X(6, Info << "Scanning cache content."); 01425 01426 m_CacheAttrDB->SetTransaction(0); 01427 01428 CBDB_FileCursor cur(*m_CacheAttrDB); 01429 cur.SetCondition(CBDB_FileCursor::eFirst); 01430 cur.InitMultiFetch(10 * 1024 * 1024); 01431 unsigned max_blob_id = 0; 01432 while (cur.Fetch() == eBDB_Ok) { 01433 unsigned blob_id = m_CacheAttrDB->blob_id; 01434 max_blob_id = max(max_blob_id, blob_id); 01435 01436 unsigned coord[2]; 01437 coord[0] = m_CacheAttrDB->volume_id; 01438 coord[1] = m_CacheAttrDB->split_id; 01439 01440 m_BLOB_SplitStore->AssignCoordinates(blob_id, coord); 01441 01442 // check BLOB id index integrity 01443 {{ 01444 m_CacheIdIDX->blob_id = blob_id; 01445 if (m_CacheIdIDX->Fetch() == eBDB_Ok) { 01446 } else { 01447 // index not found... 
corruption? 01448 string key = m_CacheAttrDB->key.GetString(); 01449 int version = m_CacheAttrDB->version; 01450 string subkey = m_CacheAttrDB->subkey.GetString(); 01451 01452 m_CacheIdIDX->blob_id = blob_id; 01453 m_CacheIdIDX->key = key; 01454 m_CacheIdIDX->version = version; 01455 m_CacheIdIDX->subkey = subkey; 01456 01457 m_CacheIdIDX->Insert(); 01458 } 01459 }} 01460 01461 } 01462 m_BlobIdCounter.Set(max_blob_id); 01463 }} 01464 01465 if (m_TimeStampFlag & fPurgeOnStartup) { 01466 unsigned batch_sleep = m_BatchSleep; 01467 unsigned batch_size = m_PurgeBatchSize; 01468 unsigned purge_thread_delay = m_PurgeThreadDelay; 01469 01470 // setup parameters which favor fast Purge execution 01471 // (Open purge needs to be fast because all waiting for it) 01472 01473 if (m_PurgeBatchSize < 2500) { 01474 m_PurgeBatchSize = 2500; 01475 } 01476 m_BatchSleep = 0; 01477 m_PurgeThreadDelay = 0; 01478 01479 Purge(GetTimeout()); 01480 01481 // Restore parameters 01482 01483 m_BatchSleep = batch_sleep; 01484 m_PurgeBatchSize = batch_size; 01485 m_PurgeThreadDelay = purge_thread_delay; 01486 } 01487 01488 01489 if (m_RunPurgeThread) { 01490 # ifdef NCBI_THREADS 01491 LOG_POST_X(7, Info << "Starting cache cleaning thread."); 01492 m_PurgeThread.Reset( 01493 new CCacheCleanerThread(this, m_PurgeThreadDelay, 5)); 01494 m_PurgeThread->Run(); 01495 01496 if (!m_JoinedEnv) { 01497 CBDB_Env::TBackgroundFlags flags = 01498 CBDB_Env::eBackground_MempTrickle | 01499 CBDB_Env::eBackground_Checkpoint | 01500 CBDB_Env::eBackground_DeadLockDetect; 01501 //_TRACE("Running background writer with delay " << m_PurgeThreadDelay); 01502 m_Env->RunBackgroundWriter(flags, 01503 m_CheckPointDelay, 01504 m_MempTrickle); 01505 } 01506 # else 01507 LOG_POST_X(8, Warning << 01508 "Cannot run background thread in non-MT configuration."); 01509 m_Env->TransactionCheckpoint(); 01510 # endif 01511 } 01512 01513 m_ReadOnly = false; 01514 01515 LOG_POST_X(9, Info << 01516 "LC: '" << cache_name << 01517 "' Cache 
mount at: " << cache_path); 01518 01519 } 01520 01521 void CBDB_Cache::RunPurgeThread(unsigned purge_delay) 01522 { 01523 m_RunPurgeThread = true; 01524 m_PurgeThreadDelay = purge_delay; 01525 } 01526 01527 void CBDB_Cache::StopPurgeThread() 01528 { 01529 # ifdef NCBI_THREADS 01530 if (!m_PurgeThread.Empty()) { 01531 LOG_POST_X(10, Info << "Stopping cache cleaning thread..."); 01532 StopPurge(); 01533 m_PurgeThread->RequestStop(); 01534 m_PurgeThread->Join(); 01535 LOG_POST_X(11, Info << "Stopped."); 01536 } 01537 # endif 01538 } 01539 01540 void CBDB_Cache::OpenReadOnly(const string& cache_path, 01541 const string& cache_name, 01542 unsigned int cache_ram_size) 01543 { 01544 {{ 01545 01546 Close(); 01547 01548 CFastMutexGuard guard(m_DB_Lock); 01549 01550 m_Path = CDirEntry::AddTrailingPathSeparator(cache_path); 01551 m_Name = cache_name; 01552 01553 m_BLOB_SplitStore = new TSplitStore(new CBDB_BlobDeMux_RoundRobin(0)); 01554 m_CacheAttrDB = new SCache_AttrDB(); 01555 /* 01556 if (cache_ram_size) 01557 m_CacheBLOB_DB->SetCacheSize(cache_ram_size); 01558 */ 01559 string cache_blob_db_name = 01560 string("lcs_") + string(cache_name) + string("_blob"); 01561 string attr_db_name = 01562 m_Path + string("lcs_") + string(cache_name) + string("_attr5") 01563 + string(".db"); 01564 01565 01566 m_BLOB_SplitStore->Open(cache_blob_db_name, CBDB_RawFile::eReadOnly); 01567 m_CacheAttrDB->Open(attr_db_name.c_str(), CBDB_RawFile::eReadOnly); 01568 01569 }} 01570 01571 m_ReadOnly = true; 01572 01573 LOG_POST_X(12, Info << 01574 "LC: '" << cache_name << 01575 "' Cache mount read-only at: " << cache_path); 01576 } 01577 01578 01579 void CBDB_Cache::Close() 01580 { 01581 if (m_Closed) return; 01582 // We mark cache as closed early to prevent double close attempt if 01583 // exception fires before completion. We MUST not StopPurgeThread twice. 
01584 m_Closed = true; 01585 StopPurgeThread(); 01586 01587 if (m_Env && !m_JoinedEnv) { 01588 m_Env->StopBackgroundWriterThread(); 01589 } 01590 01591 if (m_BLOB_SplitStore) { 01592 m_BLOB_SplitStore->Save(); 01593 } 01594 01595 delete m_PidGuard; m_PidGuard = 0; 01596 01597 delete m_CacheAttrDB_RO1; m_CacheAttrDB_RO1 = 0; 01598 delete m_CacheAttrDB_RO2; m_CacheAttrDB_RO2 = 0; 01599 delete m_CacheIdIDX_RO; m_CacheIdIDX_RO = 0; 01600 01601 delete m_BLOB_SplitStore; m_BLOB_SplitStore = 0; 01602 delete m_CacheAttrDB; m_CacheAttrDB = 0; 01603 delete m_CacheIdIDX; m_CacheIdIDX = 0; 01604 01605 delete m_TimeLine; m_TimeLine = 0; 01606 01607 if (m_Env == 0) { 01608 return; 01609 } 01610 try { 01611 m_Env->ForceTransactionCheckpoint(); 01612 CleanLog(); 01613 01614 if (m_Env->CheckRemove()) { 01615 LOG_POST_X(13, Info << 01616 "LC: '" << m_Name << "' Unmounted. BDB ENV deleted."); 01617 } else { 01618 LOG_POST_X(14, Info << "LC: '" << m_Name 01619 << "' environment still in use."); 01620 } 01621 } 01622 catch (exception& ex) { 01623 LOG_POST_X(15, Warning << "LC: '" << m_Name 01624 << "' Exception in Close() " << ex.what() 01625 << " (ignored.)"); 01626 } 01627 01628 delete m_Env; m_Env = 0; 01629 01630 CFile fl_clean(CDirEntry::MakePath(m_Path, kBDBCacheStartedFileName)); 01631 fl_clean.Remove(); 01632 } 01633 01634 void CBDB_Cache::CleanLog() 01635 { 01636 if (m_Env) { 01637 if (m_Env->IsTransactional()) { 01638 m_Env->CleanLog(); 01639 } 01640 } 01641 } 01642 01643 void CBDB_Cache::x_Close() 01644 { 01645 delete m_PidGuard; m_PidGuard = 0; 01646 delete m_CacheAttrDB; m_CacheAttrDB = 0; 01647 delete m_Env; m_Env = 0; 01648 } 01649 01650 ICache::TFlags CBDB_Cache::GetFlags() 01651 { 01652 return (TFlags) 0; 01653 } 01654 01655 void CBDB_Cache::SetFlags(ICache::TFlags flags) 01656 { 01657 } 01658 01659 void CBDB_Cache::SetTimeStampPolicy(TTimeStampFlags policy, 01660 unsigned int timeout, 01661 unsigned int max_timeout) 01662 { 01663 CFastMutexGuard guard(m_DB_Lock); 
01664 01665 m_TimeStampFlag = policy; 01666 m_Timeout = timeout; 01667 01668 if (max_timeout) { 01669 m_MaxTimeout = max_timeout > timeout ? max_timeout : timeout; 01670 } else { 01671 if (m_MaxTTL_prolong) { 01672 m_MaxTimeout = timeout * m_MaxTTL_prolong; 01673 } else { 01674 m_MaxTimeout = 0; 01675 } 01676 } 01677 } 01678 01679 CBDB_Cache::TTimeStampFlags CBDB_Cache::GetTimeStampPolicy() const 01680 { 01681 return m_TimeStampFlag; 01682 } 01683 01684 int CBDB_Cache::GetTimeout() const 01685 { 01686 return m_Timeout; 01687 } 01688 01689 void CBDB_Cache::SetVersionRetention(EKeepVersions policy) 01690 { 01691 CFastMutexGuard guard(m_DB_Lock); 01692 m_VersionFlag = policy; 01693 } 01694 01695 CBDB_Cache::EKeepVersions CBDB_Cache::GetVersionRetention() const 01696 { 01697 return m_VersionFlag; 01698 } 01699 01700 void CBDB_Cache::GetStatistics(SBDB_CacheStatistics* cache_stat) const 01701 { 01702 _ASSERT(cache_stat); 01703 01704 CFastMutexGuard guard(m_DB_Lock); 01705 *cache_stat = m_Statistics; 01706 } 01707 01708 void CBDB_Cache::InitStatistics() 01709 { 01710 CFastMutexGuard guard(m_DB_Lock); 01711 m_Statistics.Init(); 01712 } 01713 01714 void CBDB_Cache::KillBlob(const string& key, 01715 int version, 01716 const string& subkey, 01717 int overflow, 01718 unsigned blob_id) 01719 { 01720 01721 CBDB_Transaction trans(*m_Env, 01722 CBDB_Transaction::eEnvDefault, 01723 CBDB_Transaction::eNoAssociation); 01724 {{ 01725 CFastMutexGuard guard(m_DB_Lock); 01726 m_BLOB_SplitStore->SetTransaction(&trans); 01727 x_DropBlob(key, version, subkey, overflow, blob_id, trans); 01728 }} 01729 trans.Commit(); 01730 01731 } 01732 01733 void CBDB_Cache::DropBlob(const string& key, 01734 int version, 01735 const string& subkey, 01736 bool for_update, 01737 unsigned* blob_id, 01738 unsigned* coord) 01739 { 01740 _ASSERT(blob_id); 01741 _ASSERT(coord); 01742 01743 int overflow = 0; 01744 01745 {{ 01746 01747 CBDB_Transaction trans(*m_Env, 01748 CBDB_Transaction::eEnvDefault, 01749 
CBDB_Transaction::eNoAssociation); 01750 01751 {{ 01752 01753 CFastMutexGuard guard(m_DB_Lock); 01754 m_CacheAttrDB->SetTransaction(&trans); 01755 01756 CBDB_FileCursor cur(*m_CacheAttrDB, trans, 01757 CBDB_FileCursor::eReadModifyUpdate); 01758 cur.SetCondition(CBDB_FileCursor::eEQ); 01759 01760 cur.From << key << version << subkey; 01761 01762 if (cur.Fetch() == eBDB_Ok) { 01763 overflow = m_CacheAttrDB->overflow; 01764 *blob_id = m_CacheAttrDB->blob_id; 01765 01766 coord[0] = m_CacheAttrDB->volume_id; 01767 coord[1] = m_CacheAttrDB->split_id; 01768 01769 if (!for_update) { // permanent BLOB removal 01770 string owner_name; 01771 m_CacheAttrDB->owner_name.ToString(owner_name); 01772 01773 // FIXME: 01774 if (IsSaveStatistics()) { 01775 m_Statistics.AddExplDelete(owner_name); 01776 if (0 == m_CacheAttrDB->read_count) { 01777 m_Statistics.AddNeverRead(owner_name); 01778 } 01779 x_UpdateOwnerStatOnDelete(owner_name, 01780 true);//explicit del 01781 } 01782 01783 cur.Delete(); 01784 } 01785 01786 } else { 01787 *blob_id = 0; 01788 return; 01789 } 01790 }} 01791 01792 // delete the split store BLOB 01793 if (!for_update) { // permanent BLOB removal 01794 unsigned split_coord[2]; 01795 EBDB_ErrCode ret = 01796 m_BLOB_SplitStore->GetCoordinates(*blob_id, split_coord); 01797 01798 m_BLOB_SplitStore->SetTransaction(&trans); 01799 if (ret == eBDB_Ok) { 01800 if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) { 01801 m_BLOB_SplitStore->Delete(*blob_id, split_coord, 01802 CBDB_RawFile::eThrowOnError); 01803 } 01804 } 01805 m_BLOB_SplitStore->Delete(*blob_id, coord, 01806 CBDB_RawFile::eThrowOnError); 01807 01808 } 01809 01810 trans.Commit(); 01811 }} 01812 01813 if (overflow) { 01814 x_DropOverflow(key.c_str(), version, subkey.c_str()); 01815 } 01816 01817 } 01818 01819 void CBDB_Cache::RegisterOverflow(const string& key, 01820 int version, 01821 const string& subkey, 01822 unsigned time_to_live, 01823 const string& owner) 01824 { 01825 time_t curr = time(0); 01826 
01827 unsigned blob_id = 0; 01828 unsigned coord[2]={0,}; 01829 01830 {{ 01831 CBDB_Transaction trans(*m_Env, 01832 CBDB_Transaction::eEnvDefault, 01833 CBDB_Transaction::eNoAssociation); 01834 01835 {{ 01836 CFastMutexGuard guard(m_DB_Lock); 01837 m_CacheAttrDB->SetTransaction(&trans); 01838 m_CacheIdIDX->SetTransaction(&trans); 01839 01840 EBDB_ErrCode ret; 01841 {{ 01842 CBDB_FileCursor cur(*m_CacheAttrDB, 01843 trans, 01844 CBDB_FileCursor::eReadModifyUpdate); 01845 cur.SetCondition(CBDB_FileCursor::eEQ); 01846 cur.From << key << version << subkey; 01847 01848 ret = cur.Fetch(); 01849 if (ret == eBDB_Ok) { 01850 m_CacheAttrDB->time_stamp = (unsigned)curr; 01851 m_CacheAttrDB->overflow = 1; 01852 m_CacheAttrDB->ttl = time_to_live; 01853 m_CacheAttrDB->max_time = (Uint4)ComputeMaxTime(curr); 01854 unsigned upd_count = m_CacheAttrDB->upd_count; 01855 ++upd_count; 01856 m_CacheAttrDB->upd_count = upd_count; 01857 m_CacheAttrDB->owner_name = owner; 01858 01859 blob_id = m_CacheAttrDB->blob_id; 01860 coord[0] = m_CacheAttrDB->volume_id; 01861 coord[1] = m_CacheAttrDB->split_id; 01862 01863 cur.Update(); 01864 } 01865 }} // cursor 01866 01867 if (ret != eBDB_Ok) { // record not found: re-registration 01868 unsigned blob_id = GetNextBlobId(false /*no locking*/); 01869 m_CacheAttrDB->key = key; 01870 m_CacheAttrDB->version = version; 01871 m_CacheAttrDB->subkey = subkey; 01872 m_CacheAttrDB->time_stamp = (unsigned)curr; 01873 m_CacheAttrDB->overflow = 1; 01874 m_CacheAttrDB->ttl = time_to_live; 01875 m_CacheAttrDB->max_time = (Uint4)ComputeMaxTime(curr); 01876 m_CacheAttrDB->upd_count = 0; 01877 m_CacheAttrDB->read_count = 0; 01878 m_CacheAttrDB->owner_name = owner; 01879 m_CacheAttrDB->blob_id = blob_id; 01880 m_CacheAttrDB->volume_id = 0; 01881 m_CacheAttrDB->split_id = 0; 01882 01883 ret = m_CacheAttrDB->Insert(); 01884 if (ret != eBDB_Ok) { 01885 LOG_POST_X(16, Error << "Failed to insert BLOB attributes " 01886 << key << " " << version << " " << subkey); 01887 } 
else { 01888 m_CacheIdIDX->blob_id = blob_id; 01889 m_CacheIdIDX->key = key; 01890 m_CacheIdIDX->version = version; 01891 m_CacheIdIDX->subkey = subkey; 01892 01893 ret = m_CacheIdIDX->Insert(); 01894 if (ret != eBDB_Ok) { 01895 LOG_POST_X(17, Error << "Failed to insert BLOB id index " 01896 << key << " " << version << " " << subkey); 01897 } 01898 } 01899 01900 01901 } 01902 01903 }} // m_DB_Lock 01904 01905 trans.Commit(); 01906 }} // trans 01907 01908 if (blob_id) { // clean up the split store 01909 CBDB_Transaction trans(*m_Env, 01910 CBDB_Transaction::eEnvDefault, 01911 CBDB_Transaction::eNoAssociation); 01912 01913 m_BLOB_SplitStore->SetTransaction(&trans); 01914 01915 try { 01916 unsigned split_coord[2]; 01917 EBDB_ErrCode ret = 01918 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord); 01919 if (ret == eBDB_Ok) { 01920 if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) { 01921 m_BLOB_SplitStore->Delete(blob_id, split_coord, 01922 CBDB_RawFile::eThrowOnError); 01923 } 01924 } 01925 m_BLOB_SplitStore->Delete(blob_id, coord, 01926 CBDB_RawFile::eThrowOnError); 01927 } 01928 catch (CBDB_Exception& ex) { 01929 LOG_POST_X(18, Error << "Cannot delete BLOB from split store " 01930 << ex.what()); 01931 throw; 01932 } 01933 trans.Commit(); 01934 } 01935 } 01936 01937 void CBDB_Cache::x_Store(unsigned blob_id, 01938 const string& key, 01939 int version, 01940 const string& subkey, 01941 const void* data, 01942 size_t size, 01943 unsigned int time_to_live, 01944 const string& owner, 01945 bool do_blob_lock) 01946 { 01947 if (IsReadOnly()) { 01948 return; 01949 } 01950 //_TRACE("CBDB_Cache::x_Store point 1 size=" << size); 01951 unsigned coord[2] = {0,}; 01952 unsigned overflow = 0, old_overflow = 0; 01953 EBDB_ErrCode ret; 01954 01955 time_t curr = time(0); 01956 int tz_delta = m_LocalTimer.GetLocalTimezone(); 01957 01958 // ---------------------------------------------------- 01959 // check BLOB size quota 01960 // 
---------------------------------------------------- 01961 if (GetMaxBlobSize() && size > GetMaxBlobSize()) { 01962 // FIXME: 01963 if (IsSaveStatistics()) { 01964 CFastMutexGuard guard(m_DB_Lock); 01965 m_Statistics.AddBlobQuotaError(owner); 01966 } 01967 01968 string msg("BLOB larger than allowed. size="); 01969 msg.append(NStr::SizetToString(size)); 01970 msg.append(" quota="); 01971 msg.append(NStr::UIntToString(GetMaxBlobSize())); 01972 BDB_THROW(eQuotaLimit, msg); 01973 } 01974 01975 01976 // ---------------------------------------------------- 01977 // Optional pre-cleaning 01978 // ---------------------------------------------------- 01979 01980 if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) { 01981 //_TRACE("CBDB_Cache::x_Store point 2"); 01982 Purge(key, subkey, 0, m_VersionFlag); 01983 } 01984 01985 TBlobLock blob_lock(m_LockVector, m_LockTimeout); 01986 01987 // ---------------------------------------------------- 01988 // BLOB check-in, read attributes, lock the id 01989 // ---------------------------------------------------- 01990 01991 EBlobCheckinRes check_res = 01992 BlobCheckIn(blob_id, 01993 key, version, subkey, 01994 eBlobCheckIn_Create, 01995 blob_lock, do_blob_lock, 01996 &coord[0], &coord[1], 01997 &old_overflow); 01998 //_TRACE("CBDB_Cache::x_Store point 3"); 01999 _ASSERT(check_res != EBlobCheckIn_NotFound); 02000 _ASSERT(blob_lock.GetId()); 02001 02002 blob_id = blob_lock.GetId(); 02003 02004 try { 02005 02006 if (size > GetOverflowLimit()) { 02007 //_TRACE("CBDB_Cache::x_Store point 4"); 02008 // ---------------------------------------------------- 02009 // write overflow file 02010 // ---------------------------------------------------- 02011 string path; 02012 s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey); 02013 {{ 02014 CNcbiOfstream oveflow_file(path.c_str(), 02015 IOS_BASE::out | 02016 IOS_BASE::trunc | 02017 IOS_BASE::binary); 02018 if (!oveflow_file.is_open() || oveflow_file.bad()) { 02019 
string err = "LC: Cannot create overflow file "; 02020 err += path; 02021 BDB_THROW(eCannotOpenOverflowFile, err); 02022 } 02023 WriteOverflow(oveflow_file, path, (const char*)data, size); 02024 }} 02025 overflow = 1; 02026 } 02027 02028 // ---------------------------------------------------- 02029 // Check if BLOB changed overflow location 02030 // ---------------------------------------------------- 02031 02032 if (overflow != old_overflow && check_res == eBlobCheckIn_Found) { 02033 if (overflow) { // BLOB goes from split to file 02034 //_TRACE("CBDB_Cache::x_Store point 5"); 02035 // ---------------------------------------------------- 02036 // Delete old split store content 02037 // ---------------------------------------------------- 02038 CBDB_Transaction trans(*m_Env, 02039 CBDB_Transaction::eEnvDefault, 02040 CBDB_Transaction::eNoAssociation); 02041 m_BLOB_SplitStore->SetTransaction(&trans); 02042 02043 try { 02044 unsigned split_coord[2]; 02045 EBDB_ErrCode ret = 02046 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord); 02047 if (ret == eBDB_Ok) { 02048 if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) { 02049 m_BLOB_SplitStore->Delete(blob_id, split_coord, 02050 CBDB_RawFile::eThrowOnError); 02051 } 02052 } 02053 m_BLOB_SplitStore->Delete(blob_id, coord, 02054 CBDB_RawFile::eThrowOnError); 02055 } 02056 catch (CBDB_Exception& ex) { 02057 LOG_POST_X(19, Error << "Cannot delete BLOB from split store " 02058 << ex.what()); 02059 throw; 02060 } 02061 trans.Commit(); 02062 coord[0] = coord[1] = 0; 02063 } else { 02064 //_TRACE("CBDB_Cache::x_Store point 6"); 02065 // ---------------------------------------------------- 02066 // Delete old overflow file 02067 // ---------------------------------------------------- 02068 02069 x_DropOverflow(key, version, subkey); 02070 } 02071 } 02072 02073 // ---------------------------------------------------- 02074 // Update the split store content 02075 // 
---------------------------------------------------- 02076 02077 {{ 02078 //_TRACE("CBDB_Cache::x_Store point 7"); 02079 CBDB_Transaction trans(*m_Env, 02080 CBDB_Transaction::eEnvDefault, 02081 CBDB_Transaction::eNoAssociation); 02082 m_BLOB_SplitStore->SetTransaction(&trans); 02083 //_TRACE("CBDB_Cache::x_Store point 32"); 02084 02085 m_BLOB_SplitStore->UpdateInsert(blob_id, 02086 coord, 02087 data, size, 02088 coord); 02089 //_TRACE("CBDB_Cache::x_Store point 33"); 02090 trans.Commit(); 02091 }} 02092 02093 02094 // ---------------------------------------------------- 02095 // Update BLOB attributes 02096 // ---------------------------------------------------- 02097 if (m_MaxTimeout) { 02098 if (time_to_live > m_MaxTimeout) { 02099 time_to_live = m_MaxTimeout; 02100 } 02101 } else { // m_MaxTimeout == 0 02102 if (m_MaxTTL_prolong != 0 && m_Timeout != 0) { 02103 time_to_live = min(m_Timeout * m_MaxTTL_prolong, time_to_live); 02104 } 02105 } 02106 02107 time_t ttl_max = ComputeMaxTime(curr); 02108 02109 {{ 02110 CBDB_Transaction trans(*m_Env, 02111 CBDB_Transaction::eEnvDefault, 02112 CBDB_Transaction::eNoAssociation); 02113 {{ 02114 CFastMutexGuard guard(m_DB_Lock); 02115 m_CacheAttrDB->SetTransaction(&trans); 02116 02117 CBDB_FileCursor cur(*m_CacheAttrDB, 02118 trans, 02119 CBDB_FileCursor::eReadModifyUpdate); 02120 cur.SetCondition(CBDB_FileCursor::eEQ); 02121 cur.From << key << version << subkey; 02122 ret = cur.Fetch(); 02123 if (ret == eBDB_Ok) { 02124 old_overflow = m_CacheAttrDB->overflow; 02125 02126 m_CacheAttrDB->time_stamp = (unsigned)curr; 02127 m_CacheAttrDB->overflow = overflow; 02128 m_CacheAttrDB->ttl = time_to_live; 02129 m_CacheAttrDB->max_time = (Uint4)ttl_max; 02130 unsigned upd_count = m_CacheAttrDB->upd_count; 02131 m_CacheAttrDB->upd_count = ++upd_count; 02132 m_CacheAttrDB->owner_name = owner; 02133 02134 // here is a paranoid check in case blob id ever changes 02135 // this should never happen! 
02136 // 02137 unsigned old_blob_id = m_CacheAttrDB->blob_id; 02138 if (blob_id != old_blob_id) { 02139 BDB_THROW(eRaceCondition, 02140 "BLOB id mutation detected!"); 02141 } 02142 m_CacheAttrDB->volume_id = coord[0]; 02143 m_CacheAttrDB->split_id = coord[1]; 02144 02145 ret = cur.Update(); 02146 } 02147 }} // m_DB_Lock 02148 02149 02150 if (ret == eBDB_Ok) { 02151 trans.Commit(); 02152 } else { 02153 // BLOB got deleted! 02154 string msg = "BLOB insert-delete race detected!"; 02155 if (m_Monitor && m_Monitor->IsActive()) { 02156 msg += "\n"; 02157 m_Monitor->Send(msg); 02158 } 02159 BDB_THROW(eRaceCondition, msg); 02160 } 02161 02162 }} // trans 02163 02164 {{ 02165 time_t exp_time = 02166 x_ComputeExpTime((int)curr, time_to_live, GetTimeout()); 02167 02168 CFastMutexGuard guard(m_TimeLine_Lock); 02169 m_TimeLine->AddObject(exp_time, blob_id); 02170 }} // m_TimeLine_Lock 02171 02172 02173 // FIXME: locks, correct statistics, etc 02174 unsigned blob_updated = 0; 02175 unsigned blob_stored = 0; 02176 02177 if (check_res == eBlobCheckIn_Found) { 02178 blob_updated = 1; 02179 } else { 02180 blob_stored = 1; 02181 } 02182 02183 if (IsSaveStatistics()) { 02184 CFastMutexGuard guard(m_DB_Lock); 02185 m_Statistics.AddStore(owner, curr - tz_delta, 02186 blob_stored, blob_updated, size, overflow); 02187 } 02188 02189 } 02190 catch (exception) 02191 { 02192 // if things go wrong we do not want the database in an uncertain state, so 02193 // BLOB is getting killed here 02194 // 02195 02196 KillBlob(key, version, subkey, 02197 1, // try to delete overflow file too 02198 0); 02199 throw; 02200 } 02201 } 02202 02203 02204 void CBDB_Cache::Store(unsigned blob_id_ext, 02205 const string& key, 02206 int version, 02207 const string& subkey, 02208 const void* data, 02209 size_t size, 02210 unsigned int time_to_live, 02211 const string& owner) 02212 { 02213 x_Store(blob_id_ext, key, version, subkey, data, size, time_to_live, owner, 02214 true // do blob is locking 02215 ); 02216 } 
02217 02218 02219 void CBDB_Cache::Store(const string& key, 02220 int version, 02221 const string& subkey, 02222 const void* data, 02223 size_t size, 02224 unsigned int time_to_live, 02225 const string& owner) 02226 { 02227 x_Store(0, key, version, subkey, data, size, time_to_live, owner, 02228 true // do blob is locking 02229 ); 02230 } 02231 02232 02233 bool CBDB_Cache::GetSizeEx(const string& key, 02234 int version, 02235 const string& subkey, 02236 size_t* size) 02237 { 02238 size_t blob_size = 0; 02239 int overflow; 02240 unsigned int ttl, blob_id, volume_id, split_id; 02241 02242 blob_id = GetBlobId(key, version, subkey); 02243 if (!blob_id) return false; 02244 TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout); 02245 02246 {{ 02247 CFastMutexGuard guard(m_DB_Lock); 02248 m_CacheAttrDB->SetTransaction(0); 02249 02250 bool rec_exists = 02251 x_RetrieveBlobAttributes(key, version, subkey, 02252 overflow, ttl, 02253 blob_id, volume_id, split_id); 02254 if (!rec_exists) return false; 02255 02256 // check expiration here 02257 // joukovv 2006-06-24: expiration of WHAT? Apparently, not of a given 02258 // BLOB - no BLOB-specific parameters passed. Should we bother? 
02259 if (m_TimeStampFlag & fCheckExpirationAlways) { 02260 if (x_CheckTimeStampExpired(*m_CacheAttrDB, time(0))) 02261 return false; 02262 } 02263 overflow = m_CacheAttrDB->overflow; 02264 }} 02265 02266 if (overflow) { 02267 string path; 02268 s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey); 02269 CFile entry(path); 02270 02271 if (!entry.Exists()) 02272 return false; 02273 blob_size = (size_t)entry.GetLength(); 02274 } else { 02275 // Regular inline BLOB 02276 if (blob_id == 0) return false; 02277 02278 unsigned coords[2]; 02279 coords[0] = volume_id; 02280 coords[1] = split_id; 02281 02282 m_BLOB_SplitStore->SetTransaction(0); 02283 02284 EBDB_ErrCode ret = 02285 m_BLOB_SplitStore->BlobSize(blob_id, coords, &blob_size); 02286 if (ret != eBDB_Ok) return false; 02287 } 02288 if (size) *size = blob_size; 02289 return true; 02290 } 02291 02292 02293 size_t CBDB_Cache::GetSize(const string& key, 02294 int version, 02295 const string& subkey) 02296 { 02297 size_t size; 02298 return GetSizeEx(key, version, subkey, &size) ? size : 0; 02299 } 02300 02301 02302 void CBDB_Cache::GetBlobOwner(const string& key, 02303 int version, 02304 const string& subkey, 02305 string* owner) 02306 { 02307 _ASSERT(owner); 02308 02309 CFastMutexGuard guard(m_DB_Lock); 02310 m_CacheAttrDB->SetTransaction(0); 02311 02312 if (!x_FetchBlobAttributes(key, version, subkey)) { 02313 owner->erase(); 02314 return; 02315 } 02316 02317 m_CacheAttrDB->owner_name.ToString(*owner); 02318 } 02319 02320 02321 bool CBDB_Cache::HasBlobs(const string& key, 02322 const string& subkey) 02323 { 02324 time_t curr = time(0); 02325 02326 CFastMutexGuard guard(m_DB_Lock); 02327 m_CacheAttrDB->SetTransaction(0); 02328 02329 CBDB_FileCursor cur(*m_CacheAttrDB); 02330 02331 // Simple case - key only. 
Just look it up 02332 if (subkey.empty()) { 02333 cur.SetCondition(CBDB_FileCursor::eEQ); 02334 cur.From << key; 02335 return cur.FetchFirst() == eBDB_Ok && 02336 !x_CheckTimeStampExpired(*m_CacheAttrDB, curr); 02337 } 02338 02339 // More complicated case with subkey 02340 int version = 0; 02341 for (;;) { 02342 // Get version for key 02343 cur.SetCondition(CBDB_FileCursor::eGE); 02344 cur.From << key; 02345 cur.From << version; 02346 if (cur.FetchFirst() != eBDB_Ok) 02347 return false; 02348 const char* found_key = m_CacheAttrDB->key; 02349 if ( key != found_key ) 02350 return false; 02351 // We have version, now fetch subkey 02352 version = m_CacheAttrDB->version; 02353 cur.SetCondition(CBDB_FileCursor::eEQ); 02354 cur.From << key; 02355 cur.From << version; 02356 cur.From << subkey; 02357 if (cur.FetchFirst() == eBDB_Ok && 02358 !x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) 02359 return true; 02360 ++version; 02361 } 02362 02363 return false; 02364 02365 /* I don't know why we need this extra check if we already have 02366 a record in CacheAttrDB. Moreover, for overflown blobs there is 02367 NO record in SplitStore DB. So I am commenting this out. 
02368 joukovv 2007-11-27 02369 TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout); 02370 02371 _ASSERT(blob_id); 02372 unsigned split_coord[2]; 02373 EBDB_ErrCode ret = 02374 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord); 02375 if (ret != eBDB_Ok) { 02376 return false; 02377 } */ 02378 } 02379 02380 02381 bool CBDB_Cache::Read(const string& key, 02382 int version, 02383 const string& subkey, 02384 void* buf, 02385 size_t buf_size) 02386 { 02387 EBDB_ErrCode ret; 02388 02389 time_t curr = time(0); 02390 int tz_delta = m_LocalTimer.GetLocalTimezone(); 02391 02392 int overflow = 0; 02393 unsigned volume_id = 0, split_id = 0; 02394 02395 unsigned blob_id = GetBlobId(key, version, subkey); 02396 if (!blob_id) return false; 02397 TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout); 02398 02399 02400 CBDB_Transaction trans(*m_Env, 02401 CBDB_Transaction::eTransASync, // async! 02402 CBDB_Transaction::eNoAssociation); 02403 02404 02405 {{ 02406 CFastMutexGuard guard(m_DB_Lock); 02407 m_CacheAttrDB->SetTransaction(&trans); 02408 CBDB_FileCursor cur(*m_CacheAttrDB, trans, 02409 CBDB_FileCursor::eReadModifyUpdate); 02410 cur.SetCondition(CBDB_FileCursor::eEQ); 02411 cur.From << key << version << subkey; 02412 ret = cur.Fetch(); 02413 if (ret == eBDB_Ok) { 02414 if (m_TimeStampFlag & fCheckExpirationAlways) { 02415 if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) { 02416 return false; 02417 } 02418 } 02419 unsigned read_count = m_CacheAttrDB->read_count; 02420 m_CacheAttrDB->read_count = read_count + 1; 02421 02422 unsigned max_time = m_CacheAttrDB->max_time; 02423 if (max_time == 0 || max_time >= (unsigned)curr) { 02424 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02425 m_CacheAttrDB->time_stamp = (unsigned)curr; 02426 } 02427 } 02428 02429 blob_id = m_CacheAttrDB->blob_id; 02430 overflow = m_CacheAttrDB->overflow; 02431 volume_id = m_CacheAttrDB->volume_id; 02432 split_id = m_CacheAttrDB->split_id; 02433 02434 ret = cur.Update(); 02435 02436 } else { 
02437 return false; 02438 } 02439 }} // m_DB_Lock 02440 02441 if (ret != eBDB_Ok) { 02442 return false; 02443 } 02444 _ASSERT(blob_id); 02445 trans.Commit(); 02446 02447 // FIXME: locks, etc 02448 string owner_name; 02449 m_CacheAttrDB->owner_name.ToString(owner_name); 02450 if (IsSaveStatistics()) { 02451 CFastMutexGuard guard(m_DB_Lock); 02452 m_Statistics.AddRead(owner_name, curr - tz_delta); 02453 } 02454 02455 // read the data 02456 02457 02458 if (overflow) { 02459 string path; 02460 s_MakeOverflowFileName(path, m_Path, GetName(),key, version, subkey); 02461 02462 auto_ptr<CNcbiIfstream> 02463 overflow_file(new CNcbiIfstream(path.c_str(), 02464 IOS_BASE::in | IOS_BASE::binary)); 02465 if (!overflow_file->is_open()) { 02466 // TODO: Kill the registration record 02467 return false; 02468 } 02469 overflow_file->read((char*)buf, buf_size); 02470 if (!*overflow_file) { 02471 return false; 02472 } 02473 } 02474 else { 02475 if (blob_id == 0) { 02476 return false; 02477 } 02478 m_BLOB_SplitStore->SetTransaction(0); 02479 02480 unsigned coords[2]; 02481 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 02482 if (ret == eBDB_Ok) { 02483 if (coords[0] != volume_id || 02484 coords[1] != split_id) { 02485 // TODO: restore de-mux mapping 02486 } 02487 } else { 02488 // TODO: restore de-mux mapping 02489 } 02490 02491 coords[0] = volume_id; 02492 coords[1] = split_id; 02493 02494 size_t blob_size; 02495 ret = m_BLOB_SplitStore->Fetch(blob_id, coords, 02496 &buf, buf_size, 02497 CBDB_RawFile::eReallocForbidden, 02498 &blob_size); 02499 02500 if (ret != eBDB_Ok) { 02501 return false; 02502 } 02503 } 02504 return true; 02505 02506 02507 02508 /* 02509 02510 {{ 02511 CFastMutexGuard guard(m_DB_Lock); 02512 m_CacheAttrDB->SetTransaction(0); 02513 02514 int overflow; 02515 unsigned int ttl, blob_id, volume_id, split_id; 02516 bool rec_exists = 02517 x_RetrieveBlobAttributes(key, version, subkey, 02518 overflow, ttl, 02519 blob_id, volume_id, split_id); 02520 if 
(!rec_exists) { 02521 return false; 02522 } 02523 02524 // check expiration 02525 if (m_TimeStampFlag & fCheckExpirationAlways) { 02526 if (x_CheckTimeStampExpired()) { 02527 return false; 02528 } 02529 } 02530 02531 m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName); 02532 m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta); 02533 02534 02535 if (overflow) { 02536 string path; 02537 s_MakeOverflowFileName(path, m_Path, GetName(),key, version, subkey); 02538 02539 auto_ptr<CNcbiIfstream> 02540 overflow_file(new CNcbiIfstream(path.c_str(), 02541 IOS_BASE::in | IOS_BASE::binary)); 02542 if (!overflow_file->is_open()) { 02543 return false; 02544 } 02545 overflow_file->read((char*)buf, buf_size); 02546 if (!*overflow_file) { 02547 return false; 02548 } 02549 } 02550 else { 02551 02552 if (blob_id == 0) { 02553 return false; 02554 } 02555 02556 m_BLOB_SplitStore->SetTransaction(0); 02557 02558 02559 unsigned coords[2]; 02560 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 02561 if (ret == eBDB_Ok) { 02562 if (coords[0] != volume_id || 02563 coords[1] != split_id) { 02564 // TO DO: restore de-mux mapping 02565 } 02566 } else { 02567 // TODO: restore de-mux mapping 02568 } 02569 02570 coords[0] = volume_id; 02571 coords[1] = split_id; 02572 02573 size_t blob_size; 02574 02575 ret = 02576 m_BLOB_SplitStore->Fetch(blob_id, coords, 02577 &buf, buf_size, 02578 CBDB_RawFile::eReallocForbidden, 02579 &blob_size); 02580 02581 if (ret != eBDB_Ok) { 02582 return false; 02583 } 02584 } 02585 02586 02587 02588 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02589 m_CacheAttrDB->SetTransaction(&trans); 02590 x_UpdateReadAccessTime(key, version, subkey, trans); 02591 } 02592 02593 }} // m_DB_Lock 02594 02595 trans.Commit(); 02596 02597 02598 return true; 02599 */ 02600 02601 } 02602 02603 IReader* CBDB_Cache::x_CreateOverflowReader(const string& key, 02604 int version, 02605 const string& subkey, 02606 size_t& file_length, 02607 TBlobLock& blob_lock) 02608 { 02609 string path; 
02610 s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey); 02611 auto_ptr<CNcbiIfstream> 02612 overflow_file(new CNcbiIfstream(path.c_str(), 02613 IOS_BASE::in | IOS_BASE::binary)); 02614 if (!overflow_file->is_open()) { 02615 return 0; 02616 } 02617 CFile entry(path); 02618 file_length = (size_t) entry.GetLength(); 02619 02620 return new CBDB_CacheIReader(*this, overflow_file.release(), blob_lock); 02621 } 02622 02623 02624 IReader* CBDB_Cache::GetReadStream(const string& key, 02625 int version, 02626 const string& subkey) 02627 { 02628 EBDB_ErrCode ret; 02629 02630 time_t curr = time(0); 02631 int tz_delta = m_LocalTimer.GetLocalTimezone(); 02632 int overflow = 0; 02633 unsigned volume_id = 0, split_id = 0; 02634 02635 unsigned blob_id = GetBlobId(key, version, subkey); 02636 if (!blob_id) return 0; 02637 TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout); 02638 02639 02640 // TODO: unify read prolog code for all read functions 02641 02642 CBDB_Transaction trans(*m_Env, 02643 CBDB_Transaction::eTransASync, // async! 
02644 CBDB_Transaction::eNoAssociation); 02645 02646 {{ 02647 CFastMutexGuard guard(m_DB_Lock); 02648 {{ // TODO: nested scope here is nonsensical 02649 m_CacheAttrDB->SetTransaction(&trans); 02650 CBDB_FileCursor cur(*m_CacheAttrDB, trans, 02651 CBDB_FileCursor::eReadModifyUpdate); 02652 cur.SetCondition(CBDB_FileCursor::eEQ); 02653 cur.From << key << version << subkey; 02654 ret = cur.Fetch(); 02655 if (ret == eBDB_Ok) { 02656 if (m_TimeStampFlag & fCheckExpirationAlways) { 02657 if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) { 02658 return 0; // expired BLOB: no reader (null pointer, not 'false') 02659 } 02660 } 02661 unsigned read_count = m_CacheAttrDB->read_count; 02662 m_CacheAttrDB->read_count = read_count + 1; 02663 02664 unsigned max_time = m_CacheAttrDB->max_time; 02665 if (max_time == 0 || max_time >= (unsigned) curr) { 02666 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02667 m_CacheAttrDB->time_stamp = (unsigned)curr; 02668 } 02669 } 02670 02671 blob_id = m_CacheAttrDB->blob_id; 02672 overflow = m_CacheAttrDB->overflow; 02673 volume_id = m_CacheAttrDB->volume_id; 02674 split_id = m_CacheAttrDB->split_id; 02675 02676 ret = cur.Update(); 02677 02678 } else { 02679 return 0; // attribute record not fetched: no reader 02680 } 02681 }} // cursor 02682 }} // m_DB_Lock 02683 // A failed attribute update is reported the same way as a missing BLOB 02684 if (ret != eBDB_Ok) { 02685 return 0; 02686 } 02687 _ASSERT(blob_id); 02688 trans.Commit(); 02689 02690 // FIXME: locks, statistics, etc 02691 string owner_name; 02692 m_CacheAttrDB->owner_name.ToString(owner_name); 02693 if (IsSaveStatistics()) { 02694 CFastMutexGuard guard(m_DB_Lock); 02695 m_Statistics.AddRead(owner_name, curr - tz_delta); 02696 } 02697 02698 if (overflow) { 02699 size_t bsize; 02700 auto_ptr<IReader> rd; 02701 rd.reset( 02702 x_CreateOverflowReader(key, version, subkey, bsize, blob_lock)); 02703 return rd.release(); 02704 } 02705 02706 // Inline BLOB, reading from BDB storage 02707 if (blob_id == 0) { 02708 return 0; 02709 } 02710 02711 unsigned coords[2]; 02712 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 02713 if (ret == eBDB_Ok)
{ 02714 if (coords[0] != volume_id || 02715 coords[1] != split_id) { 02716 // TODO: restore de-mux mapping 02717 } 02718 } else { 02719 // TODO: restore de-mux mapping 02720 } 02721 02722 coords[0] = volume_id; 02723 coords[1] = split_id; 02724 02725 m_BLOB_SplitStore->SetTransaction(0); 02726 02727 auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(8 * 1024)); 02728 ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer); 02729 if (ret != eBDB_Ok) { 02730 return 0; 02731 } 02732 return new CBDB_CacheIReader(*this, buffer.release(), blob_lock); 02733 02734 02735 /* 02736 {{ 02737 CFastMutexGuard guard(m_DB_Lock); 02738 m_CacheAttrDB->SetTransaction(0); 02739 02740 bool rec_exists = 02741 x_RetrieveBlobAttributes(key, version, subkey, 02742 overflow, ttl, 02743 blob_id, volume_id, split_id); 02744 if (!rec_exists) { 02745 return 0; 02746 } 02747 // check expiration 02748 if (m_TimeStampFlag & fCheckExpirationAlways) { 02749 if (x_CheckTimeStampExpired()) { 02750 return 0; 02751 } 02752 } 02753 02754 m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName); 02755 m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta); 02756 02757 // Update the time stamp 02758 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02759 CBDB_Transaction trans(*m_Env, 02760 CBDB_Transaction::eEnvDefault, 02761 CBDB_Transaction::eNoAssociation); 02762 m_CacheAttrDB->SetTransaction(&trans); 02763 02764 x_UpdateReadAccessTime(key, version, subkey, trans); 02765 02766 trans.Commit(); 02767 } 02768 }} // m_DB_Lock 02769 02770 02771 02772 // Check if it's an overflow BLOB (external file) 02773 {{ 02774 size_t bsize; 02775 auto_ptr<IReader> rd; 02776 rd.reset(x_CreateOverflowReader(overflow, key, version, subkey, bsize)); 02777 if (rd.get()) { 02778 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02779 CBDB_Transaction trans(*m_Env, 02780 CBDB_Transaction::eEnvDefault, 02781 CBDB_Transaction::eNoAssociation); 02782 {{ 02783 CFastMutexGuard guard(m_DB_Lock); 02784 x_UpdateReadAccessTime(key, 
version, subkey, trans); 02785 }} // m_DB_Lock 02786 trans.Commit(); 02787 } 02788 return rd.release(); 02789 } 02790 }} 02791 02792 02793 // Inline BLOB, reading from BDB storage 02794 if (blob_id == 0) { 02795 return 0; 02796 } 02797 02798 unsigned coords[2]; 02799 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 02800 if (ret == eBDB_Ok) { 02801 if (coords[0] != volume_id || 02802 coords[1] != split_id) { 02803 // TO DO: restore de-mux mapping 02804 } 02805 } else { 02806 // TODO: restore de-mux mapping 02807 } 02808 02809 coords[0] = volume_id; 02810 coords[1] = split_id; 02811 02812 m_BLOB_SplitStore->SetTransaction(0); 02813 02814 auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(4096)); 02815 ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer); 02816 if (ret != eBDB_Ok) { 02817 return 0; 02818 } 02819 return new CBDB_CacheIReader(*this, buffer.release()); 02820 */ 02821 } 02822 /// Not implemented in CBDB_Cache: the body only raises via NCBI_USER_THROW. 02823 IReader* CBDB_Cache::GetReadStream(const string& /* key */, 02824 const string& /* subkey */, 02825 int* /* version */, 02826 ICache::EBlobVersionValidity* /* validity */) 02827 { 02828 NCBI_USER_THROW("CBDB_Cache::GetReadStream(" 02829 "key, subkey, &version, &validity) is not implemented"); 02830 } 02831 02832 /// Not implemented in CBDB_Cache: the body only raises via NCBI_USER_THROW. 02833 void CBDB_Cache::SetBlobVersionAsCurrent(const string& /* key */, 02834 const string& /* subkey */, 02835 int /* version */) 02836 { 02837 NCBI_USER_THROW("CBDB_Cache::SetBlobVersionAsCurrent(" 02838 "key, subkey, version) is not implemented"); 02839 } 02840 02841 02842 void CBDB_Cache::GetBlobAccess(const string& key, 02843 int version, 02844 const string& subkey, 02845 SBlobAccessDescr* blob_descr) 02846 { 02847 _ASSERT(blob_descr); 02848 02849 blob_descr->reader.reset(); 02850 blob_descr->blob_size = 0; 02851 blob_descr->blob_found = false; 02852 02853 EBDB_ErrCode ret; 02854 02855 time_t curr = time(0); 02856 int tz_delta = m_LocalTimer.GetLocalTimezone(); 02857 int overflow = 0; 02858 unsigned volume_id = 0, split_id = 0; 02859 02860
unsigned blob_id = GetBlobId(key, version, subkey); 02861 if (!blob_id) { 02862 //_TRACE("CBDB_Cache::GetBlobAccess return 1"); 02863 return; 02864 } 02865 TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout); 02866 02867 02868 // TODO: unify read prolog code for all read functions 02869 02870 CBDB_Transaction trans(*m_Env, 02871 CBDB_Transaction::eTransASync, // async! 02872 CBDB_Transaction::eNoAssociation); 02873 02874 {{ 02875 CFastMutexGuard guard(m_DB_Lock); 02876 {{ // TODO: nested scope here is nonsensical 02877 m_CacheAttrDB->SetTransaction(&trans); 02878 02879 CBDB_FileCursor cur(*m_CacheAttrDB, trans, 02880 CBDB_FileCursor::eReadModifyUpdate); 02881 cur.SetCondition(CBDB_FileCursor::eEQ); 02882 cur.From << key << version << subkey; 02883 ret = cur.Fetch(); 02884 if (ret == eBDB_Ok) { 02885 if (m_TimeStampFlag & fCheckExpirationAlways) { 02886 if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) { 02887 //_TRACE("CBDB_Cache::GetBlobAccess return 2"); 02888 return; 02889 } 02890 } 02891 unsigned read_count = m_CacheAttrDB->read_count; 02892 m_CacheAttrDB->read_count = read_count + 1; 02893 02894 unsigned max_time = m_CacheAttrDB->max_time; 02895 if (max_time == 0 || max_time >= (unsigned) curr) { 02896 if ( m_TimeStampFlag & fTimeStampOnRead ) { 02897 m_CacheAttrDB->time_stamp = (unsigned)curr; 02898 } 02899 } 02900 02901 overflow = m_CacheAttrDB->overflow; 02902 volume_id = m_CacheAttrDB->volume_id; 02903 split_id = m_CacheAttrDB->split_id; 02904 02905 ret = cur.Update(); 02906 } else { 02907 //_TRACE("CBDB_Cache::GetBlobAccess return 3"); 02908 return; 02909 } 02910 }} // cursor 02911 }} // m_DB_Lock 02912 02913 if (ret != eBDB_Ok) { 02914 //_TRACE("CBDB_Cache::GetBlobAccess return 4"); 02915 return; 02916 } 02917 _ASSERT(blob_id); 02918 trans.Commit(); 02919 02920 // FIXME: locks, statistics, etc 02921 string owner_name; 02922 m_CacheAttrDB->owner_name.ToString(owner_name); 02923 if (IsSaveStatistics()) { 02924 CFastMutexGuard guard(m_DB_Lock); 02925 
m_Statistics.AddRead(owner_name, curr - tz_delta); 02926 } 02927 02928 02929 // Here we have a race(sic!) between overflow flag read/write and blob_id locking 02930 // same is in all read functions! 02931 // this race may affects coordinate mapping too (but it is auto-restored later) 02932 02933 if (overflow) { 02934 blob_descr->reader.reset( 02935 x_CreateOverflowReader(key, version, subkey, 02936 blob_descr->blob_size, blob_lock)); 02937 if (blob_descr->reader.get()) { 02938 blob_descr->blob_found = true; 02939 return; 02940 } else { 02941 } 02942 } 02943 02944 // Inline BLOB, reading from BDB storage 02945 if (blob_id == 0) { 02946 //_TRACE("CBDB_Cache::GetBlobAccess return 5"); 02947 return; 02948 } 02949 02950 m_BLOB_SplitStore->SetTransaction(0); 02951 02952 unsigned coords[2]; 02953 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 02954 if (ret == eBDB_Ok) { 02955 if (coords[0] != volume_id || 02956 coords[1] != split_id) { 02957 // TO DO: restore de-mux mapping 02958 } else { 02959 // TODO: restore de-mux mapping 02960 } 02961 } else { 02962 } 02963 02964 coords[0] = volume_id; 02965 coords[1] = split_id; 02966 02967 if (blob_descr->buf && blob_descr->buf_size) { 02968 // use speculative read (hope provided buffer is sufficient) 02969 try { 02970 02971 char** ptr = &blob_descr->buf; 02972 ret = m_BLOB_SplitStore->Fetch(blob_id, coords, 02973 (void**)ptr, 02974 blob_descr->buf_size, 02975 CBDB_RawFile::eReallocForbidden, 02976 &blob_descr->blob_size); 02977 if (ret == eBDB_Ok) { 02978 blob_descr->blob_found = true; 02979 return; 02980 } else { 02981 } 02982 } catch (CBDB_Exception&) { 02983 } 02984 } 02985 02986 // speculative Fetch failed (or impossible), read it another way 02987 // 02988 // TODO: Add more realistic estimation of the BLOB size based on split 02989 // 02990 auto_ptr<CBDB_RawFile::TBuffer> buffer( 02991 new CBDB_RawFile::TBuffer(8 * 1024)); 02992 ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer); 02993 if (ret != 
eBDB_Ok) { 02994 //_TRACE("CBDB_Cache::GetBlobAccess return 6"); 02995 return; 02996 } 02997 blob_descr->blob_found = true; 02998 blob_descr->blob_size = buffer->size(); 02999 blob_descr->reader.reset( 03000 new CBDB_CacheIReader(*this, buffer.release(), blob_lock)); 03001 03002 03003 /* 03004 03005 EBDB_ErrCode ret; 03006 int overflow; 03007 unsigned int ttl, blob_id, volume_id, split_id; 03008 03009 time_t curr = time(0); 03010 int tz_delta = m_LocalTimer.GetLocalTimezone(); 03011 03012 {{ 03013 CBDB_Transaction trans(*m_Env, 03014 CBDB_Transaction::eEnvDefault, 03015 CBDB_Transaction::eNoAssociation); 03016 03017 {{ 03018 CFastMutexGuard guard(m_DB_Lock); 03019 m_CacheAttrDB->SetTransaction(0); 03020 03021 bool rec_exists = 03022 x_RetrieveBlobAttributes(key, version, subkey, 03023 overflow, ttl, 03024 blob_id, volume_id, split_id); 03025 if (!rec_exists) { 03026 return; 03027 } 03028 // check expiration 03029 if (m_TimeStampFlag & fCheckExpirationAlways) { 03030 if (x_CheckTimeStampExpired()) { 03031 return; 03032 } 03033 } 03034 03035 m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName); 03036 m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta); 03037 03038 // Update the time stamp 03039 if ( m_TimeStampFlag & fTimeStampOnRead ) { 03040 x_UpdateReadAccessTime(key, version, subkey, trans); 03041 } 03042 }} // m_DB_Lock 03043 03044 trans.Commit(); 03045 }} 03046 03047 // Check if it's an overflow BLOB (external file) 03048 // 03049 blob_descr->reader.reset( 03050 x_CreateOverflowReader(overflow, 03051 key, version, subkey, 03052 blob_descr->blob_size)); 03053 if (blob_descr->reader.get()) { 03054 blob_descr->blob_found = true; 03055 if ( m_TimeStampFlag & fTimeStampOnRead ) { 03056 CBDB_Transaction trans(*m_Env, 03057 CBDB_Transaction::eEnvDefault, 03058 CBDB_Transaction::eNoAssociation); 03059 {{ 03060 CFastMutexGuard guard(m_DB_Lock); 03061 x_UpdateReadAccessTime(key, version, subkey, trans); 03062 }} // m_DB_Lock 03063 trans.Commit(); 03064 } 03065 return; 
03066 } 03067 03068 03069 // Inline BLOB, reading from BDB storage 03070 // 03071 if (blob_id == 0) { 03072 blob_descr->blob_found = false; 03073 return; 03074 } 03075 03076 03077 m_BLOB_SplitStore->SetTransaction(0); 03078 03079 unsigned coords[2]; 03080 ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords); 03081 if (ret == eBDB_Ok) { 03082 if (coords[0] != volume_id || 03083 coords[1] != split_id) { 03084 // TO DO: restore de-mux mapping 03085 } 03086 } else { 03087 // TODO: restore de-mux mapping 03088 } 03089 03090 coords[0] = volume_id; 03091 coords[1] = split_id; 03092 03093 03094 if (blob_descr->buf && blob_descr->buf_size) { 03095 // use speculative read (hope provided buffer is sufficient) 03096 try { 03097 03098 char** ptr = &blob_descr->buf; 03099 ret = m_BLOB_SplitStore->Fetch(blob_id, coords, 03100 (void**)ptr, 03101 blob_descr->buf_size, 03102 CBDB_RawFile::eReallocForbidden, 03103 &blob_descr->blob_size); 03104 if (ret == eBDB_Ok) { 03105 blob_descr->blob_found = true; 03106 return; 03107 } 03108 } catch (CBDB_Exception&) { 03109 } 03110 } 03111 03112 // speculative Fetch failed (or impossible), read it another way 03113 // 03114 // TODO: Add more realistic estimation of the BLOB size based on split 03115 // 03116 auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(4096)); 03117 ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer); 03118 if (ret != eBDB_Ok) { 03119 blob_descr->blob_found = false; 03120 return; 03121 } 03122 blob_descr->blob_found = true; 03123 blob_descr->blob_size = buffer->size(); 03124 blob_descr->reader.reset( 03125 new CBDB_CacheIReader(*this, buffer.release())); 03126 */ 03127 } 03128 03129 unsigned CBDB_Cache::GetBlobId(const string& key, 03130 int version, 03131 const string& subkey) 03132 { 03133 CFastMutexGuard guard(m_CARO1_Lock); 03134 03135 m_CacheAttrDB_RO1->SetTransaction(0); 03136 03137 m_CacheAttrDB_RO1->key = key; 03138 m_CacheAttrDB_RO1->version = version; 03139 m_CacheAttrDB_RO1->subkey 
= subkey; 03140 03141 if (m_CacheAttrDB_RO1->Fetch() == eBDB_Ok) { 03142 return m_CacheAttrDB_RO1->blob_id; 03143 } 03144 return 0; 03145 } 03146 03147 03148 IWriter* CBDB_Cache::GetWriteStream(const string& key, 03149 int version, 03150 const string& subkey, 03151 unsigned int time_to_live, 03152 const string& owner) 03153 { 03154 return GetWriteStream(0, key, version, subkey, true /*lock id*/, 03155 time_to_live, owner); 03156 } 03157 03158 03159 IWriter* CBDB_Cache::GetWriteStream(unsigned blob_id_ext, 03160 const string& key, 03161 int version, 03162 const string& subkey, 03163 bool do_id_lock, 03164 unsigned int time_to_live, 03165 const string& owner) 03166 { 03167 if (IsReadOnly()) { 03168 return 0; 03169 } 03170 03171 //_TRACE("CBDB_Cache::GetWriteStream point 1"); 03172 if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) { 03173 //_TRACE("CBDB_Cache::GetWriteStream point 2"); 03174 Purge(key, subkey, 0, m_VersionFlag); 03175 } 03176 03177 //_TRACE("CBDB_Cache::GetWriteStream point 3"); 03178 if (m_MaxTimeout) { 03179 if (time_to_live > m_MaxTimeout) { 03180 time_to_live = m_MaxTimeout; 03181 } 03182 } else { // m_MaxTimeout == 0 03183 if (m_MaxTTL_prolong != 0 && m_Timeout != 0) { 03184 time_to_live = min(m_Timeout * m_MaxTTL_prolong, time_to_live); 03185 } 03186 } 03187 time_t curr = time(0); 03188 int tz_delta = m_LocalTimer.GetLocalTimezone(); 03189 03190 03191 TBlobLock blob_lock(m_LockVector, m_LockTimeout); 03192 03193 // ---------------------------------------------------- 03194 // BLOB check-in, read attributes, lock the id 03195 // ---------------------------------------------------- 03196 unsigned coord[2] = {0,}; 03197 unsigned overflow; 03198 _VERIFY( 03199 BlobCheckIn(blob_id_ext, 03200 key, version, subkey, 03201 eBlobCheckIn_Create, 03202 blob_lock, do_id_lock, 03203 &coord[0], &coord[1], 03204 &overflow) 03205 != EBlobCheckIn_NotFound); 03206 _ASSERT(blob_lock.GetId()); 03207 03208 03209 return 03210 new CBDB_CacheIWriter(*this, 
03211 m_Path.c_str(), 03212 blob_lock.GetId(), 03213 key, 03214 version, 03215 subkey, 03216 *m_CacheAttrDB, 03217 time_to_live, 03218 curr - tz_delta, 03219 owner, 03220 blob_lock); 03221 } 03222 03223 03224 void CBDB_Cache::Remove(const string& key) 03225 { 03226 if (IsReadOnly()) { 03227 return; 03228 } 03229 03230 vector<SCacheDescr> cache_elements; 03231 03232 // Search the records to delete 03233 03234 {{ 03235 CFastMutexGuard guard(m_DB_Lock); 03236 m_CacheAttrDB->SetTransaction(0); 03237 03238 CBDB_FileCursor cur(*m_CacheAttrDB); 03239 cur.SetCondition(CBDB_FileCursor::eEQ); 03240 03241 cur.From << key; 03242 while (cur.Fetch() == eBDB_Ok) { 03243 int version = m_CacheAttrDB->version; 03244 const char* subkey = m_CacheAttrDB->subkey; 03245 03246 int overflow = m_CacheAttrDB->overflow; 03247 unsigned blob_id = m_CacheAttrDB->blob_id; 03248 03249 cache_elements.push_back( 03250 SCacheDescr(key, version, subkey, overflow, blob_id)); 03251 03252 if (IsSaveStatistics()) { 03253 unsigned read_count = m_CacheAttrDB->read_count; 03254 string owner_name; 03255 m_CacheAttrDB->owner_name.ToString(owner_name); 03256 if (0 == read_count) { 03257 m_Statistics.AddNeverRead(owner_name); 03258 } 03259 m_Statistics.AddExplDelete(owner_name); 03260 x_UpdateOwnerStatOnDelete(owner_name, true/*expl-delete*/); 03261 } 03262 } 03263 }} // m_DB_Lock 03264 03265 {{ 03266 CBDB_Transaction trans(*m_Env, 03267 CBDB_Transaction::eEnvDefault, 03268 CBDB_Transaction::eNoAssociation); 03269 03270 // Now delete all objects 03271 03272 ITERATE(vector<SCacheDescr>, it, cache_elements) { 03273 {{ 03274 CFastMutexGuard guard(m_DB_Lock); 03275 m_BLOB_SplitStore->SetTransaction(&trans); 03276 03277 x_DropBlob(it->key, 03278 it->version, 03279 it->subkey, 03280 it->overflow, 03281 it->blob_id, 03282 trans); 03283 03284 }} // m_DB_Lock 03285 03286 {{ 03287 CFastMutexGuard guard(m_TimeLine_Lock); 03288 m_TimeLine->RemoveObject(it->blob_id); 03289 }} // m_TimeLine_Lock 03290 03291 } 03292 03293 
trans.Commit(); 03294 }} 03295 03296 } 03297 03298 void CBDB_Cache::Remove(const string& key, 03299 int version, 03300 const string& subkey) 03301 { 03302 if (IsReadOnly()) { 03303 return; 03304 } 03305 03306 // Search the records to delete 03307 03308 vector<SCacheDescr> cache_elements; 03309 03310 {{ 03311 CFastMutexGuard guard(m_DB_Lock); 03312 m_CacheAttrDB->SetTransaction(0); 03313 03314 CBDB_FileCursor cur(*m_CacheAttrDB); 03315 cur.SetCondition(CBDB_FileCursor::eEQ); 03316 03317 cur.From << key << version << subkey; 03318 while (cur.Fetch() == eBDB_Ok) { 03319 int overflow = m_CacheAttrDB->overflow; 03320 unsigned blob_id = m_CacheAttrDB->blob_id; 03321 03322 cache_elements.push_back( 03323 SCacheDescr(key, version, subkey, overflow, blob_id)); 03324 03325 if (IsSaveStatistics()) { 03326 unsigned read_count = m_CacheAttrDB->read_count; 03327 string owner_name; 03328 m_CacheAttrDB->owner_name.ToString(owner_name); 03329 if (0 == read_count) { 03330 m_Statistics.AddNeverRead(owner_name); 03331 } 03332 m_Statistics.AddExplDelete(owner_name); 03333 x_UpdateOwnerStatOnDelete(owner_name, true/*expl-delete*/); 03334 } 03335 } 03336 03337 }} // m_DB_Lock 03338 03339 03340 // TODO: add blob id locking here 03341 // 03342 CBDB_Transaction trans(*m_Env, 03343 CBDB_Transaction::eEnvDefault, 03344 CBDB_Transaction::eNoAssociation); 03345 03346 ITERATE(vector<SCacheDescr>, it, cache_elements) { 03347 {{ 03348 CFastMutexGuard guard(m_DB_Lock); 03349 m_BLOB_SplitStore->SetTransaction(&trans); 03350 03351 x_DropBlob(it->key, 03352 it->version, 03353 it->subkey, 03354 it->overflow, 03355 it->blob_id, 03356 trans); 03357 }} // m_DB_Lock 03358 03359 {{ 03360 CFastMutexGuard guard(m_TimeLine_Lock); 03361 m_TimeLine->RemoveObject(it->blob_id); 03362 }} // m_TimeLine_Lock 03363 } 03364 03365 trans.Commit(); 03366 } 03367 03368 03369 time_t CBDB_Cache::GetAccessTime(const string& key, 03370 int version, 03371 const string& subkey) 03372 { 03373 _ASSERT(m_CacheAttrDB); 03374 
CFastMutexGuard guard(m_DB_Lock); 03375 m_CacheAttrDB->SetTransaction(0); 03376 03377 m_CacheAttrDB->key = key; 03378 m_CacheAttrDB->version = version; 03379 m_CacheAttrDB->subkey = subkey; 03380 //(m_TimeStampFlag & fTrackSubKey) ? subkey : ""; 03381 03382 EBDB_ErrCode ret = m_CacheAttrDB->Fetch(); 03383 if (ret != eBDB_Ok) { 03384 return 0; 03385 } 03386 03387 return (int) m_CacheAttrDB->time_stamp; 03388 } 03389 03390 void CBDB_Cache::SetPurgeBatchSize(unsigned batch_size) 03391 { 03392 CFastMutexGuard guard(m_DB_Lock); 03393 m_PurgeBatchSize = batch_size; 03394 } 03395 03396 unsigned CBDB_Cache::GetPurgeBatchSize() const 03397 { 03398 CFastMutexGuard guard(m_DB_Lock); 03399 return m_PurgeBatchSize; 03400 } 03401 03402 void CBDB_Cache::SetBatchSleep(unsigned sleep) 03403 { 03404 CFastMutexGuard guard(m_DB_Lock); 03405 m_BatchSleep = sleep; 03406 } 03407 03408 unsigned CBDB_Cache::GetBatchSleep() const 03409 { 03410 CFastMutexGuard guard(m_DB_Lock); 03411 return m_BatchSleep; 03412 } 03413 03414 void CBDB_Cache::StopPurge() 03415 { 03416 CFastMutexGuard guard(m_DB_Lock); 03417 m_PurgeStopSignal.Post(); 03418 } 03419 03420 03421 void CBDB_Cache::WriteOverflow(CNcbiOfstream& overflow_file, 03422 const string& overflow_file_path, 03423 const char* buf, 03424 streamsize count) 03425 { 03426 overflow_file.write(buf, count); 03427 if (overflow_file.bad()) { 03428 03429 overflow_file.close(); 03430 03431 string err("Overflow file IO error "); 03432 err += overflow_file_path; 03433 03434 x_DropOverflow(overflow_file_path.c_str()); 03435 03436 BDB_THROW(eOverflowFileIO, err); 03437 } 03438 } 03439 03440 void CBDB_Cache::AddToTimeLine(unsigned blob_id, time_t exp_time) 03441 { 03442 {{ 03443 CFastMutexGuard guard(m_TimeLine_Lock); 03444 m_TimeLine->AddObject(exp_time, blob_id); 03445 }} 03446 } 03447 03448 /// @internal 03449 /// 03450 /// Sets specified flag to FALSE on destruction 03451 /// 03452 class CPurgeFlagGuard 03453 { 03454 public: 03455 CPurgeFlagGuard() 03456 : 
m_Flag(0) 03457 {} 03458 03459 ~CPurgeFlagGuard() 03460 { 03461 if (m_Flag) { 03462 *m_Flag = false; 03463 } 03464 } 03465 03466 void SetFlag(bool* flag) 03467 { 03468 m_Flag = flag; 03469 *m_Flag = true; 03470 } 03471 private: 03472 bool* m_Flag; 03473 }; 03474 03475 03476 void CBDB_Cache::EvaluateTimeLine(bool* interrupted) 03477 { 03478 time_t curr = time(0); 03479 03480 if (m_LastTimeLineCheck) { 03481 if ((curr - m_LastTimeLineCheck) < (time_t)m_TimeLine->GetDiscrFactor()) { 03482 if (m_Monitor && m_Monitor->IsActive()) { 03483 string msg = "Purge: Timeline evaluation skiped "; 03484 msg += "(early wakeup for this precision) remains="; 03485 msg += NStr::ULongToString( 03486 (unsigned long)(m_TimeLine->GetDiscrFactor() - 03487 (curr - m_LastTimeLineCheck))); 03488 msg += " precision="; 03489 msg += NStr::UIntToString(m_TimeLine->GetDiscrFactor()); 03490 msg += "\n"; 03491 m_Monitor->Send(msg); 03492 } 03493 03494 return; 03495 } 03496 } 03497 m_LastTimeLineCheck = curr; 03498 03499 TBitVector delete_candidates; 03500 {{ 03501 CFastMutexGuard guard(m_TimeLine_Lock); 03502 m_TimeLine->ExtractObjects(curr, &delete_candidates); 03503 if (!delete_candidates.any()) { 03504 if (m_Monitor && m_Monitor->IsActive()) { 03505 string msg = 03506 "Purge: Timeline evaluation exits " 03507 "(no candidates) \n"; 03508 m_Monitor->Send(msg); 03509 } 03510 03511 return; 03512 } 03513 }} 03514 03515 unsigned batch_size = m_PurgeBatchSize; 03516 if (batch_size < 10) { 03517 batch_size = 10; 03518 } 03519 03520 unsigned candidates = delete_candidates.count(); 03521 03522 delete_candidates -= m_GC_Deleted; 03523 unsigned cleaned_candidates = delete_candidates.count(); 03524 if (cleaned_candidates != candidates) { 03525 if (m_Monitor && m_Monitor->IsActive()) { 03526 string msg; 03527 msg = "Purge: Timeline total timeline candidates="; 03528 msg += NStr::UIntToString(candidates); 03529 msg += " cleaned_candidates="; 03530 msg += NStr::UIntToString(cleaned_candidates); 03531 msg += 
"\n"; 03532 03533 m_Monitor->Send(msg); 03534 } 03535 03536 candidates = cleaned_candidates; 03537 } 03538 03539 03540 {{ 03541 double id_trans_time = 0.0; // id translation time 03542 double exp_check_time = 0.0; // expiration check time 03543 double del_time = 0.0; // BLOB delete time 03544 unsigned deleted_cnt = 0; // total number of deleted BLOBs 03545 03546 vector<unsigned> blob_id_vect(batch_size); 03547 vector<SCacheDescr> blob_batch_vect(batch_size); 03548 vector<SCacheDescr> blob_exp_vect(batch_size); 03549 03550 for (TBitVector::enumerator en = delete_candidates.first(); en.valid();) { 03551 03552 // extract batch of blob ids out of the bit-vector 03553 // 03554 blob_id_vect.resize(0); 03555 blob_batch_vect.resize(0); 03556 blob_exp_vect.resize(0); 03557 for (unsigned i = 0; en.valid() && i < batch_size; ++i) { 03558 unsigned blob_id = *en; 03559 blob_id_vect.push_back(blob_id); 03560 ++en; 03561 } // for i 03562 03563 // Translate IDs to keys 03564 // 03565 if (blob_id_vect.size()) { 03566 CStopWatch sw(CStopWatch::eStart); 03567 vector<unsigned>::const_iterator it = blob_id_vect.begin(); 03568 unsigned blob_id = *it; 03569 03570 CFastMutexGuard guard(m_IDIDX_Lock_RO); 03571 m_CacheIdIDX_RO->SetTransaction(0); 03572 CBDB_FileCursor cur(*m_CacheIdIDX_RO); 03573 cur.SetCondition(CBDB_FileCursor::eGE); 03574 cur.From << blob_id; 03575 03576 while (true) { 03577 if (cur.Fetch() == eBDB_Ok) { 03578 unsigned bid = m_CacheIdIDX_RO->blob_id; 03579 if (blob_id != bid) { // record has been deleted by now 03580 if ((++it) == blob_id_vect.end()) 03581 break; 03582 cur.SetCondition(CBDB_FileCursor::eGE); 03583 cur.From << (blob_id = *it); 03584 03585 } else { 03586 int version = m_CacheIdIDX_RO->version; 03587 const char* key = m_CacheIdIDX_RO->key; 03588 const char* subkey = m_CacheIdIDX_RO->subkey; 03589 blob_batch_vect.push_back( 03590 SCacheDescr(key, version, subkey, 0, blob_id)); 03591 } 03592 } 03593 ++it; 03594 if (it == blob_id_vect.end()) { 03595 break; 
03596 } 03597 // check if next blob can be get by fetch or we need to restart 03598 // the cursor 03599 // 03600 if ((blob_id + 1) == *it) { 03601 ++blob_id; 03602 } else { 03603 // cursor restart 03604 // 03605 cur.SetCondition(CBDB_FileCursor::eGE); 03606 cur.From << (blob_id = *it); 03607 } 03608 } // while 03609 03610 id_trans_time = sw.Elapsed(); 03611 03612 } // if 03613 03614 // Check expiration of BLOB keys 03615 // 03616 if (blob_batch_vect.size()) { 03617 time_t curr = time(0); // initial timepoint 03618 CStopWatch sw(CStopWatch::eStart); 03619 03620 {{ 03621 CFastMutexGuard guard(m_CARO2_Lock); 03622 03623 m_CacheAttrDB_RO2->SetTransaction(0); 03624 CBDB_FileCursor cur(*m_CacheAttrDB_RO2); 03625 03626 for (size_t i = 0; i < blob_batch_vect.size(); ++i) { 03627 SCacheDescr& blob_descr = blob_batch_vect[i]; 03628 cur.SetCondition(CBDB_FileCursor::eEQ); 03629 cur.From << blob_descr.key 03630 << blob_descr.version 03631 << blob_descr.subkey; 03632 if (cur.Fetch() == eBDB_Ok) { 03633 blob_descr.overflow = m_CacheAttrDB_RO2->overflow; 03634 _ASSERT(blob_descr.blob_id == m_CacheAttrDB_RO2->blob_id); 03635 blob_descr.blob_id = m_CacheAttrDB_RO2->blob_id; 03636 time_t exp_time; 03637 if (x_CheckTimeStampExpired( 03638 *m_CacheAttrDB_RO2, curr, &exp_time)) { 03639 03640 blob_exp_vect.push_back(blob_descr); 03641 03642 } else { 03643 {{ 03644 CFastMutexGuard guard(m_TimeLine_Lock); 03645 m_TimeLine->AddObject(exp_time, blob_descr.blob_id); 03646 }} // m_TimeLine_Lock 03647 } 03648 } 03649 } // for 03650 }} // m_CARO2_Lock 03651 03652 exp_check_time = sw.Elapsed(); 03653 } 03654 03655 unsigned deleted_batch_cnt = 0; 03656 // Delete the batch of expired BLOBs 03657 // 03658 if (blob_exp_vect.size()) { 03659 CStopWatch sw(CStopWatch::eStart); 03660 for (size_t i = 0; i < blob_exp_vect.size(); ++i) { 03661 03662 const SCacheDescr& blob_descr = blob_exp_vect[i]; 03663 CBDB_Transaction trans(*m_Env, 03664 CBDB_Transaction::eTransASync, // async! 
03665 CBDB_Transaction::eNoAssociation); 03666 03667 if (m_Monitor && m_Monitor->IsActive()) { 03668 string msg = GetFastLocalTime().AsString(); 03669 msg += " Purge: DELETING \""; 03670 msg += blob_descr.key; 03671 msg += "\"-"; 03672 msg += NStr::UIntToString(blob_descr.version); 03673 msg += "-\""; 03674 msg += blob_descr.subkey; 03675 msg += "\"\n"; 03676 m_Monitor->Send(msg); 03677 } 03678 03679 bool blob_deleted = 03680 DropBlobWithExpCheck(blob_descr.key, 03681 blob_descr.version, 03682 blob_descr.subkey, 03683 trans); 03684 trans.Commit(); 03685 deleted_batch_cnt += blob_deleted; 03686 /* 03687 if (blob_deleted) { 03688 if (m_Monitor && m_Monitor->IsActive()) { 03689 string msg = 03690 "Purge: Timeline DELETE blob =" + blob_descr.key 03691 "\n"; 03692 m_Monitor->Send(msg); 03693 } 03694 } 03695 */ 03696 03697 } // for i 03698 03699 del_time = sw.Elapsed(); 03700 deleted_cnt += deleted_batch_cnt; 03701 } 03702 03703 if (m_Monitor && m_Monitor->IsActive()) { 03704 string msg; 03705 msg = "Purge: Timeline deleted_batch_cnt="; 03706 msg += NStr::UIntToString(deleted_batch_cnt); 03707 msg += " deleted_cnt="; 03708 msg += NStr::UIntToString(deleted_cnt); 03709 msg += " candidates="; 03710 msg += NStr::UIntToString(candidates); 03711 03712 msg += " id_trans_time="; 03713 msg += NStr::DoubleToString(id_trans_time, 5); 03714 msg += " exp_check_time="; 03715 msg += NStr::DoubleToString(exp_check_time, 5); 03716 msg += " del_time="; 03717 msg += NStr::DoubleToString(del_time, 5); 03718 msg += "\n"; 03719 03720 m_Monitor->Send(msg); 03721 } 03722 03723 m_GC_Deleted.optimize(); 03724 03725 // check for shutdown signal 03726 {{ 03727 unsigned delay = m_BatchSleep * 1000000; 03728 bool scan_stop = m_PurgeStopSignal.TryWait(0, delay); 03729 if (scan_stop) { 03730 if (interrupted) { 03731 *interrupted = true; 03732 return; 03733 } 03734 } 03735 }} 03736 03737 } // for en 03738 }} 03739 03740 /* 03741 for (; en.valid(); ++en) { 03742 unsigned blob_id = *en; 03743 try { 03744 
CBDB_Transaction trans(*m_Env, 03745 CBDB_Transaction::eTransASync, // async! 03746 CBDB_Transaction::eNoAssociation); 03747 03748 bool blob_deleted = DropBlobWithExpCheck(blob_id, trans); 03749 03750 if (blob_deleted) { 03751 trans.Commit(); 03752 ++deleted; 03753 if (m_Monitor && m_Monitor->IsActive()) { 03754 string msg = 03755 "Purge: Timeline DELETE blob id=" + 03756 NStr::UIntToString(blob_id)+ "\n"; 03757 m_Monitor->Send(msg); 03758 } 03759 } 03760 03761 // BLOB delete part of timeline can be quite expensive, lock 03762 // some resources needed by competing threads 03763 // so this code introduces delay to give way to others 03764 // ideally this delay should be adaptive dictated by current server load 03765 if (++cnt >= batch_size) { 03766 // get to nanoseconds as CSemaphore::TryWait() needs 03767 unsigned delay = m_BatchSleep * 1000000; 03768 bool scan_stop = m_PurgeStopSignal.TryWait(0, delay); 03769 if (scan_stop) { 03770 if (interrupted) { 03771 *interrupted = true; 03772 return; 03773 } 03774 } 03775 cnt = 0; 03776 } 03777 03778 } 03779 catch (CBDB_ErrnoException& ex) 03780 { 03781 if (ex.IsRecovery()) { // serious stuff! 
need to quit 03782 throw; 03783 } 03784 LOG_POST_X(20, Error 03785 << "Purge suppressed exception when deleting BLOB " 03786 << ex.what()); 03787 if (m_Monitor && m_Monitor->IsActive()) { 03788 m_Monitor->Send(ex.what()); 03789 } 03790 03791 } 03792 catch(exception& ex) { 03793 LOG_POST_X(21, Error 03794 << "Purge suppressed exception when deleting BLOB " 03795 << ex.what()); 03796 if (m_Monitor && m_Monitor->IsActive()) { 03797 m_Monitor->Send(ex.what()); 03798 } 03799 } 03800 03801 } // for en 03802 03803 if (interrupted) { 03804 *interrupted = false; 03805 } 03806 03807 if (m_Monitor && m_Monitor->IsActive()) { 03808 string msg = 03809 "Purge: Timeline deleted=" + NStr::UIntToString(deleted) 03810 + " out of candidates=" + NStr::UIntToString(candidates) 03811 + "\n"; 03812 m_Monitor->Send(msg); 03813 } 03814 */ 03815 } 03816 03817 void CBDB_Cache::Purge(time_t access_timeout, 03818 EKeepVersions keep_last_version) 03819 { 03820 // Protect Purge form double (run from different threads) 03821 CPurgeFlagGuard pf_guard; 03822 {{ 03823 CFastMutexGuard guard(m_DB_Lock); 03824 if (m_PurgeNowRunning) 03825 return; 03826 pf_guard.SetFlag(&m_PurgeNowRunning); 03827 }} 03828 03829 03830 if (IsReadOnly()) { 03831 return; 03832 } 03833 03834 if (keep_last_version == eDropAll && access_timeout == 0) { 03835 CFastMutexGuard guard(m_DB_Lock); 03836 // TODO: SetTransaction here ??? Which one, zero? 
03837 x_TruncateDB(); 03838 return; 03839 } 03840 purge_start: 03841 03842 bool scan_interrupted = false; 03843 EvaluateTimeLine(&scan_interrupted); 03844 if (scan_interrupted) { 03845 return; 03846 } 03847 03848 time_t gc_start = time(0); // time when we started GC scan 03849 03850 if (m_NextExpTime) { 03851 03852 // Mutiply to give new GC chance to kill first 03853 unsigned precision = m_TimeLine->GetDiscrFactor() * 3; 03854 03855 // [ precision ] 03856 // | 03857 // V 03858 // -----------*-------------------#<========>---------*-->> time 03859 // * # ^ 03860 // * # * 03861 // gc_start { Next Expiration } * 03862 // * * 03863 // ***************************************** 03864 // 03865 // wait until current time goes after next projected expiration, plus 03866 // timeline precision, otherwise no need to do scanning 03867 // 03868 if (gc_start < (m_NextExpTime + (time_t)precision)) { 03869 if (m_Monitor && m_Monitor->IsActive()) { 03870 unsigned remains = (unsigned)((m_NextExpTime + precision) - gc_start); 03871 unsigned rc = 0; 03872 {{ 03873 CFastMutexGuard guard(m_CARO2_Lock); 03874 m_CacheAttrDB_RO2->SetTransaction(0); 03875 rc = m_CacheAttrDB_RO2->CountRecs(); 03876 }} 03877 03878 TSplitStore::TBitVector bv; 03879 m_BLOB_SplitStore->GetIdVector(&bv); 03880 unsigned split_store_blobs = bv.count(); 03881 03882 TBitVector timeline_blobs; 03883 {{ 03884 CFastMutexGuard guard(m_TimeLine_Lock); 03885 m_TimeLine->EnumerateObjects(&timeline_blobs); 03886 }} 03887 unsigned tl_count = timeline_blobs.count(); 03888 03889 m_Monitor->Send( 03890 "Purge: scan skipped(nothing todo) remains=" 03891 + NStr::UIntToString(remains) 03892 + "s. precision=" + NStr::UIntToString(precision) 03893 + "s. 
RecCount=" + NStr::UIntToString(rc) 03894 + " SplitCount="+ NStr::UIntToString(split_store_blobs) 03895 + " TimeLineCount="+ NStr::UIntToString(tl_count) 03896 + "\n"); 03897 } 03898 return; 03899 } else { 03900 m_NextExpTime = 0; 03901 } 03902 03903 /* 03904 if (m_PurgeSkipCnt >= 50) { // do not yeild Purge more than N times 03905 m_PurgeSkipCnt = 0; 03906 } else { 03907 // Check if we nothing to purge 03908 if (!(m_NextExpTime <= gc_start)) { 03909 ++m_PurgeSkipCnt; 03910 if (m_Monitor && m_Monitor->IsActive()) { 03911 m_Monitor->Send("Purge: scan skipped(no delete candidates)\n"); 03912 } 03913 return; 03914 } 03915 } 03916 */ 03917 } 03918 03919 if (m_Monitor && m_Monitor->IsActive()) { 03920 m_Monitor->Send("Purge: scan started.\n"); 03921 } 03922 m_NextExpTime = 0; 03923 03924 03925 unsigned delay = 0; 03926 vector<SCacheDescr> cache_entries; 03927 cache_entries.reserve(1000); 03928 03929 unsigned db_recs = 0; 03930 03931 03932 // Search the database for obsolete cache entries 03933 string first_key, last_key; 03934 string next_exp_key; 03935 03936 for (bool flag = true; flag;) { 03937 unsigned batch_size = GetPurgeBatchSize(); 03938 unsigned rec_cnt; 03939 {{ 03940 time_t curr = time(0); // initial timepoint 03941 03942 CFastMutexGuard guard(m_CARO2_Lock); 03943 // get to nanoseconds as CSemaphore::TryWait() needs 03944 delay = m_BatchSleep * 1000000; 03945 03946 m_CacheAttrDB_RO2->SetTransaction(0); 03947 03948 CBDB_FileCursor cur(*m_CacheAttrDB_RO2); 03949 cur.SetCondition(CBDB_FileCursor::eGE); 03950 cur.From << last_key; 03951 03952 for (rec_cnt = 0; rec_cnt < batch_size; ++rec_cnt) { 03953 if (cur.Fetch() != eBDB_Ok) { 03954 flag = false; 03955 break; 03956 } 03957 03958 int version = m_CacheAttrDB_RO2->version; 03959 const char* key = m_CacheAttrDB_RO2->key; 03960 last_key = key; 03961 int overflow = m_CacheAttrDB_RO2->overflow; 03962 const char* subkey = m_CacheAttrDB_RO2->subkey; 03963 unsigned blob_id = m_CacheAttrDB_RO2->blob_id; 03964 03965 time_t 
exp_time; 03966 if (x_CheckTimeStampExpired(*m_CacheAttrDB_RO2, curr, &exp_time)) { 03967 03968 string owner_name; 03969 m_CacheAttrDB_RO2->owner_name.ToString(owner_name); 03970 03971 // FIXME: statistics, locking, etc 03972 if (IsSaveStatistics()) { 03973 CFastMutexGuard guard(m_DB_Lock); 03974 if (0 == m_CacheAttrDB_RO2->read_count) { 03975 m_Statistics.AddNeverRead(owner_name); 03976 } 03977 m_Statistics.AddPurgeDelete(owner_name); 03978 x_UpdateOwnerStatOnDelete(owner_name, 03979 false//non-expl-delete 03980 ); 03981 } 03982 03983 cache_entries.push_back( 03984 SCacheDescr(key, version, subkey, overflow, blob_id)); 03985 } else { 03986 ++db_recs; 03987 03988 {{ 03989 CFastMutexGuard guard(m_TimeLine_Lock); 03990 m_TimeLine->AddObject(exp_time, blob_id); 03991 }} // m_TimeLine_Lock 03992 03993 if (m_NextExpTime) { 03994 if (exp_time < m_NextExpTime) { 03995 // compute fraction of the difference 03996 // with sensitivity based on purge thread delay 03997 // we need this in order not to move point of restart 03998 // too agressively 03999 04000 double delta = fabs(double(m_NextExpTime - exp_time)); 04001 double fraction = fabs(double(delta / m_NextExpTime)); 04002 if (fraction > 0.1 && delta > m_PurgeThreadDelay) { 04003 next_exp_key = key; 04004 m_NextExpTime = exp_time; 04005 } 04006 } 04007 } else { 04008 m_NextExpTime = exp_time; 04009 next_exp_key = key; 04010 } 04011 04012 04013 } 04014 if (rec_cnt == 0) { // first record in the batch 04015 first_key = last_key; 04016 } else 04017 if (rec_cnt == (batch_size - 1)) { // last record 04018 // if batch stops in the same key we increase 04019 // the batch size to avoid the infinite loop 04020 // when new batch starts with the very same key 04021 // and never comes to the end 04022 // 04023 if (first_key == last_key) { 04024 batch_size += 10; 04025 } 04026 } 04027 04028 } // for i 04029 }} // m_CARO2_Lock 04030 04031 if (m_Monitor && m_Monitor->IsActive()) { 04032 string msg("Purge: Inspected "); 04033 msg += 
NStr::UIntToString(rec_cnt); 04034 msg += " records.\n"; 04035 04036 m_Monitor->Send(msg); 04037 } 04038 04039 04040 // Delete BLOBs if there are deletion candidates 04041 if (cache_entries.size() > 0) { 04042 if (m_Monitor && m_Monitor->IsActive()) { 04043 m_Monitor->Send("Purge:Deleting expired BLOBs...\n"); 04044 } 04045 04046 for (size_t i = 0; i < cache_entries.size(); ++i) { 04047 try { 04048 const SCacheDescr& it = cache_entries[i]; 04049 04050 CBDB_Transaction trans(*m_Env, 04051 CBDB_Transaction::eTransASync, // async! 04052 CBDB_Transaction::eNoAssociation); 04053 04054 if (m_Monitor && m_Monitor->IsActive()) { 04055 string msg = GetFastLocalTime().AsString(); 04056 msg += " Purge: DELETING \""; 04057 msg += it.key; 04058 msg += "\"-"; 04059 msg += NStr::UIntToString(it.version); 04060 msg += "-\""; 04061 msg += it.subkey; 04062 msg += "\"\n"; 04063 m_Monitor->Send(msg); 04064 } 04065 04066 {{ 04067 DropBlobWithExpCheck(it.key, 04068 it.version, 04069 it.subkey, 04070 trans); 04071 }} 04072 04073 /* 04074 {{ 04075 CFastMutexGuard guard(m_DB_Lock); 04076 m_BLOB_SplitStore->SetTransaction(&trans); 04077 04078 x_DropBlob(it.key, 04079 it.version, 04080 it.subkey, 04081 it.overflow, 04082 it.blob_id, 04083 trans); 04084 04085 }} // m_DB_Lock 04086 */ 04087 trans.Commit(); 04088 } 04089 catch (CBDB_ErrnoException& ex) 04090 { 04091 if (ex.IsRecovery()) { // serious stuff! 
need to quit 04092 throw; 04093 } 04094 LOG_POST_X(22, Error 04095 << "Purge suppressed exception when deleting BLOB " 04096 << ex.what()); 04097 if (m_Monitor && m_Monitor->IsActive()) { 04098 m_Monitor->Send(ex.what()); 04099 } 04100 04101 } 04102 catch(exception& ex) { 04103 LOG_POST_X(23, Error 04104 << "Purge suppressed exception when deleting BLOB " 04105 << ex.what()); 04106 if (m_Monitor && m_Monitor->IsActive()) { 04107 m_Monitor->Send(ex.what()); 04108 } 04109 } 04110 04111 } // for i 04112 04113 if (m_Monitor && m_Monitor->IsActive()) { 04114 string msg = 04115 "Purge: deleted " + 04116 NStr::SizetToString(cache_entries.size()) + 04117 " BLOBs\n"; 04118 m_Monitor->Send(msg); 04119 } 04120 04121 cache_entries.resize(0); 04122 } 04123 04124 bool purge_stop = m_PurgeStopSignal.TryWait(0, delay); 04125 if (purge_stop) { 04126 LOG_POST_X(24, Info << "BDB Cache: Stopping Purge execution."); 04127 return; 04128 } 04129 04130 EvaluateTimeLine(&scan_interrupted); 04131 if (scan_interrupted) { 04132 LOG_POST_X(25, Warning << "BDB Cache: Stopping Purge execution."); 04133 return; 04134 } 04135 04136 04137 } // for flag 04138 04139 04140 ++m_PurgeCount; 04141 04142 if (m_CleanLogOnPurge) { 04143 if ((m_PurgeCount % m_CleanLogOnPurge) == 0) { 04144 m_Env->CleanLog(); 04145 } 04146 } 04147 04148 if (IsSaveStatistics()) { 04149 CFastMutexGuard guard(m_DB_Lock); 04150 m_Statistics.GlobalStatistics().blobs_db = db_recs; 04151 } 04152 04153 if ((m_PurgeCount % 200) == 0) { 04154 m_BLOB_SplitStore->Save(); 04155 m_Env->ForceTransactionCheckpoint(); 04156 } 04157 else 04158 if ((m_PurgeCount % 50) == 0) { 04159 m_BLOB_SplitStore->FreeUnusedMem(); 04160 m_LockVector.FreeUnusedMem(); 04161 } 04162 04163 unsigned time_in_purge = 0; 04164 04165 // check if we want to rescan the database right away 04166 if (m_PurgeThreadDelay) { 04167 time_t curr = time(0); 04168 // we spent more time in Purge than we planned to sleep 04169 // it means we have a lot of records and need to run 
GC 04170 // continuosly 04171 time_in_purge = (unsigned)curr - (unsigned)gc_start; 04172 if (time_in_purge >= m_PurgeThreadDelay || 04173 (m_NextExpTime != 0 && (curr > m_NextExpTime))) { 04174 04175 if (m_Monitor && m_Monitor->IsActive()) { 04176 m_Monitor->Send("Purge: scan restarted. time=" + 04177 NStr::UIntToString(time_in_purge)+ "s.\n"); 04178 } 04179 04180 goto purge_start; 04181 } 04182 } 04183 04184 if (m_Monitor && m_Monitor->IsActive()) { 04185 m_Monitor->Send("Purge: scan ended. Scan time=" + 04186 NStr::UIntToString(time_in_purge)+ "s.\n"); 04187 } 04188 04189 } 04190 04191 04192 void CBDB_Cache::Purge(const string& key, 04193 const string& subkey, 04194 time_t access_timeout, 04195 EKeepVersions keep_last_version) 04196 { 04197 if (IsReadOnly()) { 04198 return; 04199 } 04200 04201 if (key.empty() && keep_last_version == eDropAll && access_timeout == 0) { 04202 CFastMutexGuard guard(m_DB_Lock); 04203 // TODO: SetTransaction ??? see TODO above 04204 x_TruncateDB(); 04205 return; 04206 } 04207 04208 // Search the database for obsolete cache entries 04209 vector<SCacheDescr> cache_entries; 04210 04211 unsigned recs_scanned = 0; 04212 04213 {{ 04214 CFastMutexGuard guard(m_DB_Lock); 04215 m_CacheAttrDB->SetTransaction(0); 04216 04217 CBDB_FileCursor cur(*m_CacheAttrDB); 04218 cur.SetCondition(CBDB_FileCursor::eEQ); 04219 04220 cur.From << key; 04221 04222 //CTime time_stamp(CTime::eCurrent); 04223 time_t curr = time(0); // (int)time_stamp.GetTimeT(); 04224 int timeout = GetTimeout(); 04225 if (access_timeout && access_timeout < timeout) { 04226 timeout = (int)access_timeout; 04227 } 04228 04229 while (cur.Fetch() == eBDB_Ok) { 04230 ++recs_scanned; 04231 04232 unsigned db_time_stamp = m_CacheAttrDB->time_stamp; 04233 int version = m_CacheAttrDB->version; 04234 const char* x_key = m_CacheAttrDB->key; 04235 int overflow = m_CacheAttrDB->overflow; 04236 string x_subkey = (const char*) m_CacheAttrDB->subkey; 04237 unsigned blob_id = m_CacheAttrDB->blob_id; 
04238 04239 unsigned ttl = m_CacheAttrDB->ttl; 04240 unsigned to = timeout; 04241 04242 if (ttl) { // individual timeout 04243 if (m_MaxTimeout && ttl > m_MaxTimeout) { 04244 to = max((unsigned)timeout, (unsigned)m_MaxTimeout); 04245 } else { 04246 to = ttl; 04247 } 04248 } 04249 04250 if (subkey.empty()) { 04251 } 04252 04253 // check if timeout control has been requested and stored element 04254 // should not be removed 04255 if (access_timeout && (!(curr - to > db_time_stamp))) { 04256 continue; 04257 } 04258 04259 if (subkey.empty() || (subkey == x_subkey)) { 04260 cache_entries.push_back( 04261 SCacheDescr(x_key, version, x_subkey, overflow, blob_id)); 04262 04263 if (IsSaveStatistics()) { 04264 unsigned read_count = m_CacheAttrDB->read_count; 04265 string owner_name; 04266 m_CacheAttrDB->owner_name.ToString(owner_name); 04267 if (0 == read_count) { 04268 m_Statistics.AddNeverRead(owner_name); 04269 } 04270 m_Statistics.AddPurgeDelete(owner_name); 04271 x_UpdateOwnerStatOnDelete(owner_name, 04272 false/*non-expl-delete*/); 04273 } 04274 04275 04276 continue; 04277 } 04278 } // while 04279 04280 }} // m_DB_Lock 04281 04282 04283 ITERATE(vector<SCacheDescr>, it, cache_entries) { 04284 CBDB_Transaction trans(*m_Env, 04285 CBDB_Transaction::eEnvDefault, 04286 CBDB_Transaction::eNoAssociation); 04287 {{ 04288 CFastMutexGuard guard(m_DB_Lock); 04289 m_BLOB_SplitStore->SetTransaction(&trans); 04290 x_DropBlob(it->key, 04291 it->version, 04292 it->subkey, 04293 it->overflow, 04294 it->blob_id, 04295 trans); 04296 }} // m_DB_Lock 04297 04298 trans.Commit(); 04299 } 04300 } 04301 04302 void CBDB_Cache::Verify(const string& cache_path, 04303 const string& cache_name, 04304 const string& err_file, 04305 bool force_remove) 04306 { 04307 Close(); 04308 04309 m_Path = CDirEntry::AddTrailingPathSeparator(cache_path); 04310 04311 m_Env = new CBDB_Env(); 04312 04313 m_Env->SetCacheSize(10 * 1024 * 1024); 04314 m_Env->OpenErrFile(!err_file.empty() ? 
err_file : string("stderr")); 04315 try { 04316 m_Env->Open(cache_path, DB_INIT_MPOOL | DB_USE_ENVIRON); 04317 } 04318 catch (CBDB_Exception& /*ex*/) 04319 { 04320 m_Env->Open(cache_path, 04321 DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_USE_ENVIRON); 04322 } 04323 04324 LOG_POST_X(26, "Cache location: " + string(cache_path)); 04325 04326 string cache_blob_db_name = 04327 string("lcs_") + string(cache_name) + string("_blob")+ string(".db"); 04328 string attr_db_name = 04329 string("lcs_") + string(cache_name) + string("_attr5") + string(".db"); 04330 04331 m_CacheAttrDB = new SCache_AttrDB(); 04332 m_CacheAttrDB->SetEnv(*m_Env); 04333 04334 string bak = m_Path + attr_db_name + ".bak"; 04335 FILE* fl = fopen(bak.c_str(), "wb"); 04336 04337 LOG_POST_X(27, "Running verification for: " + attr_db_name); 04338 m_CacheAttrDB->Verify(attr_db_name.c_str(), 0, fl); 04339 delete m_CacheAttrDB; m_CacheAttrDB = 0; 04340 04341 fclose(fl); 04342 04343 if (force_remove) { 04344 m_Env->ForceRemove(); 04345 } else { 04346 bool deleted = m_Env->Remove(); 04347 04348 if (!deleted) 04349 LOG_POST_X(28, "Cannot delete the environment (it is busy by another process)"); 04350 } 04351 delete m_Env; m_Env = 0; 04352 } 04353 04354 void CBDB_Cache::x_PerformCheckPointNoLock(unsigned bytes_written) 04355 { 04356 if (m_RunPurgeThread == false) { 04357 m_Env->TransactionCheckpoint(); 04358 } 04359 } 04360 04361 time_t 04362 CBDB_Cache::x_ComputeExpTime(int time_stamp, unsigned ttl, int timeout) 04363 { 04364 time_t exp_time; 04365 if (ttl) { // individual timeout 04366 if (m_MaxTimeout && ttl > m_MaxTimeout) { 04367 timeout = 04368 (int) max((unsigned)timeout, (unsigned)m_MaxTimeout); 04369 ttl = timeout; // used in diagnostics only 04370 } else { 04371 timeout = ttl; 04372 } 04373 } 04374 04375 // predicted job expiration time 04376 exp_time = time_stamp + timeout; 04377 return exp_time; 04378 } 04379 04380 bool CBDB_Cache::x_CheckTimeStampExpired(SCache_AttrDB& attr_db, 04381 time_t curr, 
04382 time_t* exp_time) 04383 { 04384 int timeout = GetTimeout(); 04385 04386 if (timeout) { 04387 04388 int db_time_stamp = attr_db.time_stamp; 04389 unsigned int ttl = attr_db.ttl; 04390 04391 if (ttl) { // individual timeout 04392 if (m_MaxTimeout && ttl > m_MaxTimeout) { 04393 timeout = 04394 (int) max((unsigned)timeout, (unsigned)m_MaxTimeout); 04395 ttl = timeout; // used in diagnostics only 04396 } else { 04397 timeout = ttl; 04398 } 04399 } 04400 04401 // predicted job expiration time 04402 if (exp_time) { 04403 *exp_time = db_time_stamp + timeout; 04404 } 04405 04406 if (curr - timeout > db_time_stamp) { 04407 return true; 04408 } 04409 } 04410 return false; 04411 } 04412 04413 04414 void CBDB_Cache::x_UpdateAccessTime(const string& key, 04415 int version, 04416 const string& subkey, 04417 EBlobAccessType access_type, 04418 CBDB_Transaction& trans) 04419 { 04420 if (IsReadOnly()) { 04421 return; 04422 } 04423 04424 unsigned timeout = (unsigned)time(0); 04425 {{ 04426 CBDB_FileCursor cur(*m_CacheAttrDB, trans, 04427 CBDB_FileCursor::eReadModifyUpdate); 04428 04429 cur.SetCondition(CBDB_FileCursor::eEQ); 04430 cur.From << key << version; 04431 04432 if (cur.Fetch() == eBDB_Ok) { 04433 unsigned old_ts = m_CacheAttrDB->time_stamp; 04434 if (old_ts < timeout) { 04435 unsigned max_time = m_CacheAttrDB->max_time; 04436 if (max_time == 0 || max_time >= timeout) { 04437 m_CacheAttrDB->time_stamp = timeout; 04438 04439 switch (access_type) { 04440 case eBlobRead: 04441 { 04442 unsigned read_count = m_CacheAttrDB->read_count; 04443 ++read_count; 04444 m_CacheAttrDB->read_count = read_count; 04445 } 04446 break; 04447 case eBlobUpdate: 04448 { 04449 unsigned upd_count = m_CacheAttrDB->upd_count; 04450 ++upd_count; 04451 m_CacheAttrDB->upd_count = upd_count; 04452 } 04453 break; 04454 case eBlobStore: 04455 break; 04456 default: 04457 _ASSERT(0); 04458 } 04459 04460 cur.Update(); 04461 } 04462 } 04463 } // if 04464 04465 }} // cursor 04466 } 04467 04468 void 
CBDB_Cache::x_TruncateDB() 04469 { 04470 if (IsReadOnly()) { 04471 return; 04472 } 04473 04474 // TODO: optimization of store truncate 04475 {{ 04476 CBDB_FileCursor cur(*m_CacheAttrDB); 04477 cur.SetCondition(CBDB_FileCursor::eFirst); 04478 while (cur.Fetch() == eBDB_Ok) { 04479 unsigned blob_id, volume_id, split_id; 04480 blob_id = m_CacheAttrDB->blob_id; 04481 volume_id = m_CacheAttrDB->volume_id; 04482 split_id = m_CacheAttrDB->split_id; 04483 04484 if (blob_id) { 04485 unsigned coords[2] = { volume_id, split_id }; 04486 m_BLOB_SplitStore->SetTransaction(0); 04487 m_BLOB_SplitStore->Delete(blob_id, coords); 04488 } 04489 } // while 04490 04491 }} 04492 04493 m_BLOB_SplitStore->Save(); 04494 04495 04496 LOG_POST_X(29, Info << "CBDB_BLOB_Cache:: cache database truncated"); 04497 m_CacheAttrDB->Truncate(); 04498 04499 CDir dir(m_Path); 04500 CMaskFileName mask; 04501 mask.Add(this->GetName() + "_*.ov_"); 04502 04503 string ext; 04504 string ov_(".ov_"); 04505 if (dir.Exists()) { 04506 CDir::TEntries content(dir.GetEntries(mask)); 04507 ITERATE(CDir::TEntries, it, content) { 04508 if ( (*it)->IsFile() ) { 04509 ext = (*it)->GetExt(); 04510 if (ext == ov_) { 04511 (*it)->Remove(); 04512 } 04513 } 04514 } 04515 } 04516 } 04517 04518 04519 bool CBDB_Cache::x_RetrieveBlobAttributes(const string& key, 04520 int version, 04521 const string& subkey, 04522 int& overflow, 04523 unsigned int& ttl, 04524 unsigned int& blob_id, 04525 unsigned int& volume_id, 04526 unsigned int& split_id) 04527 { 04528 if (!x_FetchBlobAttributes(key, version, subkey)) return false; 04529 04530 overflow = m_CacheAttrDB->overflow; 04531 ttl = m_CacheAttrDB->ttl; 04532 blob_id = m_CacheAttrDB->blob_id; 04533 volume_id = m_CacheAttrDB->volume_id; 04534 split_id = m_CacheAttrDB->split_id; 04535 04536 /* 04537 if (!(m_TimeStampFlag & fTrackSubKey)) { 04538 m_CacheAttrDB->subkey = ""; 04539 04540 EBDB_ErrCode err = m_CacheAttrDB->Fetch(); 04541 if (err != eBDB_Ok) { 04542 return false; 04543 } 04544 } 
04545 */ 04546 return true; 04547 } 04548 04549 bool CBDB_Cache::x_FetchBlobAttributes(const string& key, 04550 int version, 04551 const string& subkey) 04552 { 04553 m_CacheAttrDB->key = key; 04554 m_CacheAttrDB->version = version; 04555 m_CacheAttrDB->subkey = subkey; 04556 04557 EBDB_ErrCode ret = m_CacheAttrDB->Fetch(); 04558 if (ret != eBDB_Ok) { 04559 return false; 04560 } 04561 return true; 04562 } 04563 04564 04565 04566 void CBDB_Cache::x_DropOverflow(const string& key, 04567 int version, 04568 const string& subkey) 04569 { 04570 string path; 04571 try { 04572 s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey); 04573 x_DropOverflow(path); 04574 } catch (exception& ex) { 04575 ERR_POST_X(30, "Blob Store: Cannot remove file: " << path 04576 << " " << ex.what()); 04577 } 04578 } 04579 04580 void CBDB_Cache::x_DropOverflow(const string& file_path) 04581 { 04582 try { 04583 CDirEntry entry(file_path); 04584 if (entry.Exists()) { 04585 entry.Remove(); 04586 } 04587 } catch (exception& ex) { 04588 ERR_POST_X(31, "Blob Store: Cannot remove file: " << file_path 04589 << " " << ex.what()); 04590 } 04591 } 04592 04593 unsigned CBDB_Cache::GetNextBlobId(bool lock_id) 04594 { 04595 unsigned blob_id = m_BlobIdCounter.Add(1); 04596 if (blob_id >= kMax_UInt) { 04597 m_BlobIdCounter.Set(0); 04598 blob_id = m_BlobIdCounter.Add(1); 04599 m_GC_Deleted.clear(); 04600 } 04601 if (lock_id) { 04602 bool locked = m_LockVector.TryLock(blob_id); 04603 if (!locked) { 04604 BDB_THROW(eInvalidOperation, "Cannot lock new BLOB ID"); 04605 } 04606 } 04607 return blob_id; 04608 } 04609 04610 bool CBDB_Cache::DropBlobWithExpCheck(unsigned blob_id, 04611 CBDB_Transaction& trans) 04612 { 04613 string key, subkey; 04614 int version; 04615 EBDB_ErrCode ret; 04616 {{ 04617 CFastMutexGuard guard(m_IDIDX_Lock_RO); 04618 04619 m_CacheIdIDX_RO->blob_id = blob_id; 04620 ret = m_CacheIdIDX_RO->Fetch(); 04621 if (ret == eBDB_Ok) { 04622 key = m_CacheIdIDX_RO->key; 04623 version = 
m_CacheIdIDX_RO->version; 04624 subkey = m_CacheIdIDX_RO->subkey; 04625 } 04626 }} // m_IDIDX_Lock_RO 04627 04628 if (ret == eBDB_Ok) { 04629 return DropBlobWithExpCheck(key, version, subkey, trans); 04630 } 04631 return false; 04632 } 04633 04634 04635 04636 bool CBDB_Cache::DropBlobWithExpCheck(const string& key, 04637 int version, 04638 const string& subkey, 04639 CBDB_Transaction& trans) 04640 { 04641 if (IsReadOnly()) { 04642 return false; 04643 } 04644 04645 time_t curr = time(0); 04646 04647 unsigned coords[2] = {0,}; 04648 unsigned split_coord[2] = {0,}; 04649 int overflow; 04650 unsigned blob_id; 04651 04652 bool blob_expired = false; 04653 time_t exp_time; 04654 04655 {{ 04656 CFastMutexGuard guard(m_CARO2_Lock); 04657 04658 m_CacheAttrDB_RO2->SetTransaction(0); 04659 04660 m_CacheAttrDB_RO2->key = key; 04661 m_CacheAttrDB_RO2->version = version; 04662 m_CacheAttrDB_RO2->subkey = subkey; 04663 if (m_CacheAttrDB_RO2->Fetch() != eBDB_Ok) { 04664 return false; 04665 } 04666 04667 if (x_CheckTimeStampExpired(*m_CacheAttrDB_RO2, curr, &exp_time)) { 04668 blob_expired = true; 04669 overflow = m_CacheAttrDB_RO2->overflow; 04670 coords[0] = m_CacheAttrDB_RO2->volume_id; 04671 coords[1] = m_CacheAttrDB_RO2->split_id; 04672 blob_id = m_CacheAttrDB_RO2->blob_id; 04673 } else { 04674 blob_expired = false; 04675 blob_id = m_CacheAttrDB_RO2->blob_id; 04676 } 04677 04678 }} // m_CARO2_Lock 04679 04680 if (!blob_expired) { 04681 {{ 04682 CFastMutexGuard guard(m_TimeLine_Lock); 04683 _ASSERT(exp_time); 04684 m_TimeLine->AddObject(exp_time, blob_id); 04685 }} // m_TimeLine_Lock 04686 return false; 04687 } 04688 04689 // BLOB expired and needs to be deleted 04690 // 04691 04692 04693 // Delete the overflow file 04694 // 04695 if (overflow == 1) { 04696 x_DropOverflow(key, version, subkey); 04697 } 04698 04699 // Important implementation note: 04700 // There is a temptation to delete BLOB split store first 04701 // and registration record second to minimize time registration 
record 04702 // stays locked. We should NOT do it in ONE transaction 04703 // because in other places order of access is different and this may 04704 // create DEADLOCK 04705 04706 // Delete BLOB attributes and index 04707 // 04708 {{ 04709 CFastMutexGuard guard(m_DB_Lock); 04710 m_CacheAttrDB->SetTransaction(&trans); 04711 04712 m_CacheAttrDB->key = key; 04713 m_CacheAttrDB->version = version; 04714 m_CacheAttrDB->subkey = subkey; 04715 if (m_CacheAttrDB->Fetch() != eBDB_Ok) { 04716 return false; 04717 } 04718 m_CacheAttrDB->Delete(CBDB_RawFile::eThrowOnError); 04719 04720 m_CacheIdIDX->SetTransaction(&trans); 04721 m_CacheIdIDX->blob_id = blob_id; 04722 m_CacheIdIDX->Delete(CBDB_RawFile::eThrowOnError); 04723 04724 }} // m_DB_Lock 04725 04726 // Delete split store 04727 // 04728 EBDB_ErrCode ret = 04729 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord); 04730 m_BLOB_SplitStore->SetTransaction(&trans); 04731 if (ret == eBDB_Ok) { 04732 if (split_coord[0] != coords[0] || 04733 split_coord[1] != coords[1]) { 04734 // split coords un-sync: delete de-facto BLOB 04735 m_BLOB_SplitStore->Delete(blob_id, 04736 CBDB_RawFile::eThrowOnError); 04737 } 04738 } 04739 // Delete BLOB by known coordinates 04740 m_BLOB_SplitStore->Delete(blob_id, coords, 04741 CBDB_RawFile::eThrowOnError); 04742 04743 m_GC_Deleted.set(blob_id); 04744 04745 return true; 04746 } 04747 04748 04749 04750 void CBDB_Cache::x_DropBlob(const string& key, 04751 int version, 04752 const string& subkey, 04753 int overflow, 04754 unsigned blob_id, 04755 CBDB_Transaction& trans) 04756 { 04757 if (IsReadOnly()) { 04758 return; 04759 } 04760 04761 if (overflow == 1) { 04762 x_DropOverflow(key, version, subkey); 04763 } 04764 04765 unsigned coords[2] = {0,}; 04766 unsigned split_coord[2] = {0,}; 04767 04768 if (blob_id) { 04769 bool delete_split = false; 04770 {{ 04771 //m_CacheAttrDB->SetTransaction(0); 04772 04773 CBDB_FileCursor cur(*m_CacheAttrDB); 04774 cur.SetCondition(CBDB_FileCursor::eEQ); 04775 
cur.From << key << version << subkey; 04776 if (cur.Fetch() == eBDB_Ok) { 04777 coords[0] = m_CacheAttrDB->volume_id; 04778 coords[1] = m_CacheAttrDB->split_id; 04779 04780 EBDB_ErrCode ret = 04781 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord); 04782 if (ret == eBDB_Ok) { 04783 if (split_coord[0] != coords[0] || 04784 split_coord[1] != coords[1]) { 04785 delete_split = true; 04786 } 04787 } 04788 04789 } 04790 }} 04791 04792 // delete blob as pointed by de-mux splitter 04793 if (delete_split) { 04794 m_BLOB_SplitStore->Delete(blob_id, 04795 CBDB_RawFile::eThrowOnError); 04796 } 04797 // delete blob as accounted by meta-information 04798 m_BLOB_SplitStore->Delete(blob_id, coords, 04799 CBDB_RawFile::eThrowOnError); 04800 04801 } 04802 m_CacheAttrDB->SetTransaction(&trans); 04803 04804 m_CacheAttrDB->key = key; 04805 m_CacheAttrDB->version = version; 04806 m_CacheAttrDB->subkey = subkey; 04807 04808 m_CacheAttrDB->Delete(CBDB_RawFile::eThrowOnError); 04809 04810 m_CacheIdIDX->SetTransaction(&trans); 04811 04812 m_CacheIdIDX->blob_id = blob_id; 04813 m_CacheIdIDX->Delete(CBDB_RawFile::eThrowOnError); 04814 } 04815 04816 bool CBDB_Cache::IsLocked(unsigned blob_id) 04817 { 04818 return m_LockVector.IsLocked(blob_id); 04819 } 04820 04821 bool CBDB_Cache::IsLocked(const string& key, 04822 int version, 04823 const string& subkey) 04824 { 04825 unsigned blob_id = GetBlobId(key, version, subkey); 04826 if (!blob_id) return false; 04827 return IsLocked(blob_id); 04828 } 04829 04830 04831 CBDB_Cache::EBlobCheckinRes 04832 CBDB_Cache::BlobCheckIn(unsigned blob_id_ext, 04833 const string& key, 04834 int version, 04835 const string& subkey, 04836 EBlobCheckinMode mode, 04837 TBlobLock& blob_lock, 04838 bool do_id_lock, 04839 unsigned* volume_id, 04840 unsigned* split_id, 04841 unsigned* overflow) 04842 { 04843 _ASSERT(volume_id); 04844 _ASSERT(split_id); 04845 _ASSERT(overflow); 04846 04847 EBDB_ErrCode ret; 04848 unsigned blob_id = 0; 04849 04850 while (blob_id == 0) { 
04851 {{ 04852 CFastMutexGuard guard(m_DB_Lock); 04853 m_CacheAttrDB->SetTransaction(0); 04854 04855 CBDB_FileCursor cur(*m_CacheAttrDB); 04856 cur.SetCondition(CBDB_FileCursor::eEQ); 04857 cur.From << key << version << subkey; 04858 04859 ret = cur.Fetch(); 04860 if (ret == eBDB_Ok) { 04861 blob_id = m_CacheAttrDB->blob_id; 04862 *volume_id = m_CacheAttrDB->volume_id; 04863 *split_id = m_CacheAttrDB->split_id; 04864 *overflow = m_CacheAttrDB->overflow; 04865 } 04866 }} // m_DB_Lock 04867 04868 if (ret == eBDB_Ok) { 04869 _ASSERT(blob_id); 04870 if (do_id_lock) { 04871 blob_lock.Lock(blob_id); 04872 } else { 04873 blob_lock.SetId(blob_id); 04874 } 04875 return eBlobCheckIn_Found; 04876 } 04877 04878 *overflow = *volume_id = *split_id = 0; 04879 04880 switch (mode) { 04881 case eBlobCheckIn: 04882 break; 04883 case eBlobCheckIn_Create: 04884 { 04885 if (blob_id_ext == 0) { 04886 blob_id = GetNextBlobId(false/*do not lock*/); 04887 } else { 04888 blob_id = blob_id_ext; 04889 } 04890 04891 CBDB_Transaction trans(*m_Env, 04892 CBDB_Transaction::eTransASync, // async! 
04893 CBDB_Transaction::eNoAssociation); 04894 04895 // create registration record: use temp magic values 04896 // (and overide some later) 04897 // 04898 {{ 04899 CFastMutexGuard guard(m_DB_Lock); 04900 m_CacheAttrDB->SetTransaction(&trans); 04901 04902 m_CacheAttrDB->key = key; 04903 m_CacheAttrDB->version = version; 04904 m_CacheAttrDB->subkey = subkey; 04905 m_CacheAttrDB->time_stamp = Uint4(time(0)+1); 04906 m_CacheAttrDB->overflow = 0; 04907 m_CacheAttrDB->ttl = 77; 04908 m_CacheAttrDB->max_time = 77; 04909 m_CacheAttrDB->upd_count = 0; 04910 m_CacheAttrDB->read_count = 0; 04911 m_CacheAttrDB->owner_name = "BDB_cache"; 04912 m_CacheAttrDB->blob_id = blob_id; 04913 m_CacheAttrDB->volume_id = 0; 04914 m_CacheAttrDB->split_id = 0; 04915 04916 ret = m_CacheAttrDB->Insert(); 04917 04918 if (ret == eBDB_Ok) { 04919 m_CacheIdIDX->SetTransaction(&trans); 04920 04921 m_CacheIdIDX->blob_id = blob_id; 04922 m_CacheIdIDX->key = key; 04923 m_CacheIdIDX->version = version; 04924 m_CacheIdIDX->subkey = subkey; 04925 04926 ret = m_CacheIdIDX->Insert(); 04927 if (ret == eBDB_Ok) { 04928 trans.Commit(); 04929 if (do_id_lock) { 04930 blob_lock.Lock(blob_id); 04931 } else { 04932 blob_lock.SetId(blob_id); 04933 } 04934 return eBlobCheckIn_Created; 04935 } else { 04936 BDB_THROW(eInvalidOperation, 04937 "Cannot update blob id index"); 04938 } 04939 } 04940 }} // m_DB_Lock 04941 04942 // Insert failed (record exists) - re-read the record 04943 blob_id = 0; 04944 } 04945 break; 04946 default: 04947 _ASSERT(0); 04948 } 04949 } // while (!blob_id) 04950 04951 return EBlobCheckIn_NotFound; 04952 } 04953 04954 04955 04956 void CBDB_Cache::x_UpdateOwnerStatOnDelete(const string& owner, 04957 bool expl_delete) 04958 { 04959 /* 04960 SBDB_CacheStatistics& st = m_OwnerStatistics[owner]; 04961 04962 const char* key = m_CacheAttrDB->key; 04963 int version = m_CacheAttrDB->version; 04964 const char* subkey = m_CacheAttrDB->subkey; 04965 04966 unsigned upd_count = m_CacheAttrDB->upd_count; 
04967 unsigned read_count = m_CacheAttrDB->read_count; 04968 int overflow = m_CacheAttrDB->overflow; 04969 04970 size_t blob_size = x_GetBlobSize(key, version, subkey); 04971 04972 if (!blob_size) { 04973 return; 04974 } 04975 04976 ++st.blobs_stored_total; 04977 st.blobs_overflow_total += overflow; 04978 st.blobs_updates_total += upd_count; 04979 if (!read_count) { 04980 ++st.blobs_never_read_total; 04981 } 04982 st.blobs_read_total += read_count; 04983 st.blobs_expl_deleted_total += expl_delete; 04984 if (!expl_delete) { 04985 st.AddPurgeDelete(); 04986 } 04987 st.blobs_size_total += blob_size; 04988 if (blob_size > st.blob_size_max_total) { 04989 st.blob_size_max_total = blob_size; 04990 } 04991 04992 SBDB_CacheStatistics::AddToHistogram(&st.blob_size_hist, blob_size); 04993 */ 04994 } 04995 /* 04996 size_t CBDB_Cache::x_GetBlobSize(const char* key, 04997 int version, 04998 const char* subkey) 04999 { 05000 int overflow = m_CacheAttrDB->overflow; 05001 unsigned blob_id = m_CacheAttrDB->blob_id; 05002 if (overflow) { 05003 string path; 05004 s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey); 05005 CFile entry(path); 05006 05007 if (entry.Exists()) { 05008 return (size_t) entry.GetLength(); 05009 } 05010 } 05011 05012 // Regular inline BLOB 05013 05014 if (blob_id == 0) { 05015 return 0; 05016 } 05017 05018 m_CacheBLOB_DB->blob_id = blob_id; 05019 EBDB_ErrCode ret = m_CacheBLOB_DB->Fetch(); 05020 05021 if (ret != eBDB_Ok) { 05022 return 0; 05023 } 05024 return m_CacheBLOB_DB->LobSize(); 05025 } 05026 */ 05027 05028 void CBDB_Cache::Lock() 05029 { 05030 m_DB_Lock.Lock(); 05031 } 05032 05033 void CBDB_Cache::Unlock() 05034 { 05035 m_DB_Lock.Unlock(); 05036 } 05037 05038 05039 05040 void CBDB_Cache::RegisterInternalError( 05041 SBDB_CacheUnitStatistics::EErrGetPut operation, 05042 const string& owner) 05043 { 05044 if (IsSaveStatistics()) { 05045 CFastMutexGuard guard(m_DB_Lock); 05046 m_Statistics.AddInternalError(owner, operation); 05047 } 05048 
} 05049 05050 void CBDB_Cache::RegisterProtocolError( 05051 SBDB_CacheUnitStatistics::EErrGetPut operation, 05052 const string& owner) 05053 { 05054 if (IsSaveStatistics()) { 05055 CFastMutexGuard guard(m_DB_Lock); 05056 m_Statistics.AddProtocolError(owner, operation); 05057 } 05058 } 05059 05060 void CBDB_Cache::RegisterNoBlobError( 05061 SBDB_CacheUnitStatistics::EErrGetPut operation, 05062 const string& owner) 05063 { 05064 if (IsSaveStatistics()) { 05065 CFastMutexGuard guard(m_DB_Lock); 05066 m_Statistics.AddNoBlobError(owner, operation); 05067 } 05068 } 05069 05070 void CBDB_Cache::RegisterCommError( 05071 SBDB_CacheUnitStatistics::EErrGetPut operation, 05072 const string& owner) 05073 { 05074 if (IsSaveStatistics()) { 05075 CFastMutexGuard guard(m_DB_Lock); 05076 m_Statistics.AddCommError(owner, operation); 05077 } 05078 } 05079 05080 05081 05082 05083 05084 05085 05086 05087 CBDB_Cache::CacheKey::CacheKey(const string& x_key, 05088 int x_version, 05089 const string& x_subkey) 05090 : key(x_key), version(x_version), subkey(x_subkey) 05091 {} 05092 05093 05094 bool 05095 CBDB_Cache::CacheKey::operator < (const CBDB_Cache::CacheKey& cache_key) const 05096 { 05097 int cmp = NStr::Compare(key, cache_key.key); 05098 if (cmp != 0) 05099 return cmp < 0; 05100 if (version != cache_key.version) return (version < cache_key.version); 05101 cmp = NStr::Compare(subkey, cache_key.subkey); 05102 if (cmp != 0) 05103 return cmp < 0; 05104 return false; 05105 } 05106 05107 05108 void BDB_Register_Cache(void) 05109 { 05110 RegisterEntryPoint<ICache>(NCBI_BDB_ICacheEntryPoint); 05111 } 05112 05113 05114 const char* kBDBCacheDriverName = "bdb"; 05115 05116 /// Class factory for BDB implementation of ICache 05117 /// 05118 /// @internal 05119 /// 05120 class CBDB_CacheReaderCF : public CICacheCF<CBDB_Cache> 05121 { 05122 public: 05123 typedef CICacheCF<CBDB_Cache> TParent; 05124 public: 05125 CBDB_CacheReaderCF() : TParent(kBDBCacheDriverName, 0) 05126 { 05127 } 05128 
~CBDB_CacheReaderCF() 05129 { 05130 } 05131 05132 virtual 05133 ICache* CreateInstance( 05134 const string& driver = kEmptyStr, 05135 CVersionInfo version = NCBI_INTERFACE_VERSION(ICache), 05136 const TPluginManagerParamTree* params = 0) const; 05137 05138 }; 05139 05140 // List of parameters accepted by the CF 05141 05142 static const char* kCFParam_path = "path"; 05143 static const char* kCFParam_name = "name"; 05144 static const char* kCFParam_drop_if_dirty = "drop_if_dirty"; 05145 05146 static const char* kCFParam_lock = "lock"; 05147 static const char* kCFParam_lock_default = "no_lock"; 05148 static const char* kCFParam_lock_pid_lock = "pid_lock"; 05149 05150 static const char* kCFParam_mem_size = "mem_size"; 05151 static const char* kCFParam_log_mem_size = "log_mem_size"; 05152 static const char* kCFParam_read_only = "read_only"; 05153 static const char* kCFParam_write_sync = "write_sync"; 05154 static const char* kCFParam_use_transactions = "use_transactions"; 05155 static const char* kCFParam_direct_db = "direct_db"; 05156 static const char* kCFParam_direct_log = "direct_log"; 05157 static const char* kCFParam_transaction_log_path = "transaction_log_path"; 05158 05159 static const char* kCFParam_purge_batch_size = "purge_batch_size"; 05160 static const char* kCFParam_purge_batch_sleep = "purge_batch_sleep"; 05161 static const char* kCFParam_purge_clean_log = "purge_clean_log"; 05162 static const char* kCFParam_purge_thread = "purge_thread"; 05163 static const char* kCFParam_purge_thread_delay = "purge_thread_delay"; 05164 static const char* kCFParam_checkpoint_delay = "checkpoint_delay"; 05165 static const char* kCFParam_checkpoint_bytes = "checkpoint_bytes"; 05166 static const char* kCFParam_log_file_max = "log_file_max"; 05167 static const char* kCFParam_overflow_limit = "overflow_limit"; 05168 static const char* kCFParam_ttl_prolong = "ttl_prolong"; 05169 static const char* kCFParam_max_blob_size = "max_blob_size"; 05170 static const char* 
kCFParam_rr_volumes = "rr_volumes"; 05171 static const char* kCFParam_memp_trickle = "memp_trickle"; 05172 static const char* kCFParam_TAS_spins = "tas_spins"; 05173 05174 05175 bool CBDB_Cache::SameCacheParams(const TCacheParams* params) const 05176 { 05177 if ( !params ) { 05178 return false; 05179 } 05180 const TCacheParams* driver = params->FindNode("driver"); 05181 if (!driver || driver->GetValue().value != kBDBCacheDriverName) { 05182 return false; 05183 } 05184 const TCacheParams* driver_params = params->FindNode(kBDBCacheDriverName); 05185 if ( !driver_params ) { 05186 return false; 05187 } 05188 const TCacheParams* path = driver_params->FindNode(kCFParam_path); 05189 string str_path = path ? 05190 CDirEntry::AddTrailingPathSeparator( 05191 path->GetValue().value) : kEmptyStr; 05192 if (!path || str_path != m_Path) { 05193 return false; 05194 } 05195 const TCacheParams* name = driver_params->FindNode(kCFParam_name); 05196 return name && name->GetValue().value == m_Name; 05197 } 05198 05199 05200 ICache* CBDB_CacheReaderCF::CreateInstance( 05201 const string& driver, 05202 CVersionInfo version, 05203 const TPluginManagerParamTree* params) const 05204 { 05205 auto_ptr<CBDB_Cache> drv; 05206 if (driver.empty() || driver == m_DriverName) { 05207 if (version.Match(NCBI_INTERFACE_VERSION(ICache)) 05208 != CVersionInfo::eNonCompatible) { 05209 drv.reset(new CBDB_Cache()); 05210 } 05211 } else { 05212 return 0; 05213 } 05214 05215 if (!params) 05216 return drv.release(); 05217 05218 // cache configuration 05219 05220 const string& path = 05221 GetParam(params, kCFParam_path, true); 05222 string name = 05223 GetParam(params, kCFParam_name, false, "lcache"); 05224 string locking = 05225 GetParam(params, kCFParam_lock, false, kCFParam_lock_default); 05226 05227 CBDB_Cache::ELockMode lock = CBDB_Cache::eNoLock; 05228 if (NStr::CompareNocase(locking, kCFParam_lock_pid_lock) == 0) { 05229 lock = CBDB_Cache::ePidLock; 05230 } 05231 unsigned overflow_limit = (unsigned) 
05232 GetParamDataSize(params, kCFParam_overflow_limit, false, 0); 05233 if (overflow_limit) { 05234 drv->SetOverflowLimit(overflow_limit); 05235 } 05236 05237 drv->SetInitIfDirty(GetParamBool(params, kCFParam_drop_if_dirty, false, false)); 05238 05239 Uint8 mem_size = 05240 GetParamDataSize(params, kCFParam_mem_size, false, 0); 05241 05242 Uint8 log_mem_size = 05243 GetParamDataSize(params, kCFParam_log_mem_size, false, 0); 05244 05245 unsigned checkpoint_bytes = (unsigned) 05246 GetParamDataSize(params, kCFParam_checkpoint_bytes, 05247 false, 24 * 1024 * 1024); 05248 drv->SetCheckpoint(checkpoint_bytes); 05249 05250 unsigned checkpoint_delay = 05251 GetParamInt(params, kCFParam_checkpoint_delay, false, 15); 05252 drv->SetCheckpointDelay(checkpoint_delay); 05253 05254 unsigned log_file_max = (unsigned) 05255 GetParamDataSize(params, kCFParam_log_file_max, 05256 false, 200 * 1024 * 1024); 05257 drv->SetLogFileMax(log_file_max); 05258 05259 string transaction_log_path = 05260 GetParam(params, kCFParam_transaction_log_path, false, path); 05261 if (transaction_log_path != path) { 05262 drv->SetLogDir(transaction_log_path); 05263 } 05264 05265 05266 bool ro = 05267 GetParamBool(params, kCFParam_read_only, false, false); 05268 05269 bool w_sync = 05270 GetParamBool(params, kCFParam_write_sync, false, false); 05271 drv->SetWriteSync(w_sync ? 
05272 CBDB_Cache::eWriteSync : CBDB_Cache::eWriteNoSync); 05273 05274 unsigned ttl_prolong = 05275 GetParamInt(params, kCFParam_ttl_prolong, false, 0); 05276 drv->SetTTL_Prolongation(ttl_prolong); 05277 05278 unsigned max_blob_size =(unsigned) 05279 GetParamDataSize(params, kCFParam_max_blob_size, false, 0); 05280 drv->SetMaxBlobSize(max_blob_size); 05281 05282 unsigned rr_volumes = 05283 GetParamInt(params, kCFParam_rr_volumes, false, 3); 05284 drv->SetRR_Volumes(rr_volumes); 05285 05286 unsigned memp_trickle = 05287 GetParamInt(params, kCFParam_memp_trickle, false, 60); 05288 drv->SetMempTrickle(memp_trickle); 05289 05290 ConfigureICache(drv.get(), params); 05291 05292 bool use_trans = 05293 GetParamBool(params, kCFParam_use_transactions, false, true); 05294 05295 unsigned batch_size = 05296 GetParamInt(params, kCFParam_purge_batch_size, false, 70); 05297 drv->SetPurgeBatchSize(batch_size); 05298 05299 unsigned batch_sleep = 05300 GetParamInt(params, kCFParam_purge_batch_sleep, false, 0); 05301 drv->SetBatchSleep(batch_sleep); 05302 05303 unsigned purge_clean_factor = 05304 GetParamInt(params, kCFParam_purge_clean_log, false, 0); 05305 drv->CleanLogOnPurge(purge_clean_factor); 05306 05307 bool purge_thread = 05308 GetParamBool(params, kCFParam_purge_thread, false, false); 05309 unsigned purge_thread_delay = 05310 GetParamInt(params, kCFParam_purge_thread_delay, false, 30); 05311 05312 if (purge_thread) { 05313 drv->RunPurgeThread(purge_thread_delay); 05314 } 05315 05316 unsigned tas_spins = 05317 GetParamInt(params, kCFParam_TAS_spins, false, 200); 05318 05319 if (ro) { 05320 drv->OpenReadOnly(path.c_str(), name.c_str(), (unsigned)mem_size); 05321 } else { 05322 drv->Open(path, name, 05323 lock, mem_size, 05324 use_trans ? 
CBDB_Cache::eUseTrans : CBDB_Cache::eNoTrans, 05325 (unsigned)log_mem_size); 05326 } 05327 05328 if (!drv->IsJoinedEnv()) { 05329 bool direct_db = 05330 GetParamBool(params, kCFParam_direct_db, false, false); 05331 bool direct_log = 05332 GetParamBool(params, kCFParam_direct_log, false, false); 05333 05334 CBDB_Env* env = drv->GetEnv(); 05335 env->SetDirectDB(direct_db); 05336 env->SetDirectLog(direct_log); 05337 if (tas_spins) { 05338 env->SetTasSpins(tas_spins); 05339 } 05340 env->SetLkDetect(CBDB_Env::eDeadLock_Default); 05341 env->SetMpMaxWrite(0, 0); 05342 } 05343 05344 return drv.release(); 05345 05346 } 05347 05348 05349 void NCBI_BDB_ICacheEntryPoint( 05350 CPluginManager<ICache>::TDriverInfoList& info_list, 05351 CPluginManager<ICache>::EEntryPointRequest method) 05352 { 05353 CHostEntryPointImpl<CBDB_CacheReaderCF>:: 05354 NCBI_EntryPointImpl(info_list, method); 05355 } 05356 05357 void NCBI_EntryPoint_xcache_bdb( 05358 CPluginManager<ICache>::TDriverInfoList& info_list, 05359 CPluginManager<ICache>::EEntryPointRequest method) 05360 { 05361 NCBI_BDB_ICacheEntryPoint(info_list, method); 05362 } 05363 05364 05365 CBDB_CacheHolder::CBDB_CacheHolder(ICache* blob_cache, ICache* id_cache) 05366 : m_BlobCache(blob_cache), 05367 m_IdCache(id_cache) 05368 {} 05369 05370 CBDB_CacheHolder::~CBDB_CacheHolder() 05371 { 05372 delete m_BlobCache; 05373 delete m_IdCache; 05374 } 05375 05376 05377 void BDB_ConfigureCache(CBDB_Cache& bdb_cache, 05378 const string& path, 05379 const string& name, 05380 unsigned timeout, 05381 ICache::TTimeStampFlags tflags) 05382 { 05383 if (!tflags) { 05384 tflags = 05385 ICache::fTimeStampOnCreate | 05386 ICache::fExpireLeastFrequentlyUsed | 05387 ICache::fPurgeOnStartup | 05388 // ICache::fTrackSubKey | 05389 ICache::fCheckExpirationAlways; 05390 } 05391 if (timeout == 0) { 05392 timeout = 24 * 60 * 60; 05393 } 05394 05395 bdb_cache.SetTimeStampPolicy(tflags, timeout); 05396 bdb_cache.SetVersionRetention(ICache::eKeepAll); 05397 05398 
bdb_cache.Open(path.c_str(), 05399 name.c_str(), 05400 CBDB_Cache::eNoLock, 05401 10 * 1024 * 1024, 05402 CBDB_Cache::eUseTrans); 05403 05404 } 05405 05406 05407 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 13:10:12 2012 by modify_doxy.py rev. 337098