NCBI C++ ToolKit
bdb_blobcache.cpp
Go to the documentation of this file.
00001 /*  $Id: bdb_blobcache.cpp 52450 2011-12-27 20:42:58Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Author: Anatoliy Kuznetsov
00027  *
00028  * File Description:  BDB library BLOB cache implementation.
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include <corelib/ncbitime.hpp>
00035 #include <corelib/ncbifile.hpp>
00036 #include <corelib/ncbi_process.hpp>
00037 #include <corelib/plugin_manager_impl.hpp>
00038 #include <corelib/ncbi_system.hpp>
00039 #include <corelib/ncbi_limits.h>
00040 #include <corelib/ncbimtx.hpp>
00041 #include <corelib/ncbitime.hpp>
00042 #include <corelib/plugin_manager_store.hpp>
00043 #include <corelib/ncbiexpt.hpp>
00044 
00045 #include <connect/server_monitor.hpp>
00046 
00047 #include <db.h>
00048 
00049 #include <db/bdb/bdb_blobcache.hpp>
00050 #include <db/bdb/bdb_cursor.hpp>
00051 #include <db/bdb/bdb_trans.hpp>
00052 
00053 #include <db/error_codes.hpp>
00054 
00055 #include <util/cache/icache_cf.hpp>
00056 #include <util/cache/icache_clean_thread.hpp>
00057 #include <util/simple_buffer.hpp>
00058 
00059 #include <time.h>
00060 #include <math.h>
00061 
00062 
00063 #define NCBI_USE_ERRCODE_X   Db_Bdb_BlobCache
00064 
00065 
00066 BEGIN_NCBI_SCOPE
00067 
00068 
00069 // ===========================================================================
00070 //
00071 // Various implementation notes:
00072 //
00073 // The code below uses mixed transaction model, data store transactions
00074 // go in the default mode (sync or async) (configurable).
00075 // Miscellaneous service transactions (like those from GC (Purge)) are
00076 // explicitly async (for speed). It is not critical if we lose some of them.
00077 //
00078 //
00079 
/// File name constant "__ncbi_cache_started__".
/// NOTE(review): presumably the name of a marker file kept in the cache
/// directory while the cache is running -- the code that uses it is not
/// visible in this part of the file, confirm against the callers.
const char* kBDBCacheStartedFileName = "__ncbi_cache_started__";
00081 
00082 
00083 static void s_MakeOverflowFileName(string& buf,
00084                                    const string& path,
00085                                    const string& cache,
00086                                    const string& key,
00087                                    int           version,
00088                                    const string& subkey)
00089 {
00090     buf = path + cache + '_' + key + '_'
00091                + NStr::IntToString(version) + '_' + subkey + ".ov_";
00092 }
00093 
00094 
/// Value aggregate describing one cache record: key, version, subkey,
/// an overflow indicator and a BLOB id.
///
/// NOTE(review): "overflow" presumably tells whether the BLOB lives in an
/// overflow file -- the users of this struct are not visible here.
///
/// @internal
struct SCacheDescr
{
    string    key;
    int       version;
    string    subkey;
    int       overflow;
    unsigned  blob_id;

    /// Construct a fully specified descriptor.
    /// Strings are taken by const reference to avoid an extra copy.
    SCacheDescr(const string& x_key,
                int           x_version,
                const string& x_subkey,
                int           x_overflow,
                unsigned      x_blob_id)
    : key(x_key),
      version(x_version),
      subkey(x_subkey),
      overflow(x_overflow),
      blob_id(x_blob_id)
    {}

    /// Construct an empty descriptor: empty strings, all numbers zero
    /// (the numeric members used to be left uninitialized).
    SCacheDescr()
    : version(0),
      overflow(0),
      blob_id(0)
    {}
};
00118 
00119 
00120 /// @internal
00121 class CBDB_CacheIReader : public IReader
00122 {
00123 public:
00124     CBDB_CacheIReader(CBDB_Cache&                bdb_cache,
00125                       CNcbiIfstream*             overflow_file,
00126                       CBDB_Cache::TBlobLock&     blob_lock)
00127     : m_Cache(bdb_cache),
00128       m_OverflowFile(overflow_file),
00129       m_RawBuffer(0),
00130       m_BufferPtr(0),
00131       m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout())
00132     {
00133         m_BlobLock.TakeFrom(blob_lock);
00134     }
00135 
00136     CBDB_CacheIReader(CBDB_Cache&                bdb_cache,
00137                       CBDB_RawFile::TBuffer*     raw_buffer,
00138                       CBDB_Cache::TBlobLock&     blob_lock)
00139     : m_Cache(bdb_cache),
00140       m_OverflowFile(0),
00141       m_RawBuffer(raw_buffer),
00142       m_BufferPtr(raw_buffer->data()),
00143       m_BufferSize(raw_buffer->size()),
00144       m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout())
00145     {
00146         m_BlobLock.TakeFrom(blob_lock);
00147     }
00148 
00149     virtual ~CBDB_CacheIReader()
00150     {
00151         if ( m_RawBuffer ) {
00152             if ( m_BufferSize ) {
00153                 ERR_POST("CBDB_CacheIReader: detected unread input "<<
00154                          m_BufferSize);
00155             }
00156             delete m_RawBuffer;
00157         }
00158         if ( m_OverflowFile ) {
00159             CNcbiStreampos pos = m_OverflowFile->tellg();
00160             m_OverflowFile->seekg(0, IOS_BASE::end);
00161             CNcbiStreampos end = m_OverflowFile->tellg();
00162             if ( pos != end ) {
00163                 ERR_POST("CBDB_CacheIReader: detected unread input "<<
00164                          (end-pos)<<": "<<pos<<" of "<<end);
00165             }
00166             delete m_OverflowFile;
00167         }
00168     }
00169 
00170 
00171     virtual ERW_Result Read(void*   buf,
00172                             size_t  count,
00173                             size_t* bytes_read)
00174     {
00175         if (count == 0)
00176             return eRW_Success;
00177 
00178         // Check if BLOB is memory based...
00179         if (m_RawBuffer) {
00180             if (m_BufferSize == 0) {
00181                 *bytes_read = 0;
00182                 return eRW_Eof;
00183             }
00184             *bytes_read = min(count, m_BufferSize);
00185             ::memcpy(buf, m_BufferPtr, *bytes_read);
00186             m_BufferPtr += *bytes_read;
00187             m_BufferSize -= *bytes_read;
00188             return eRW_Success;
00189         }
00190 
00191         // Check if BLOB is file based...
00192         if (m_OverflowFile) {
00193             m_OverflowFile->read((char*)buf, count);
00194             *bytes_read = m_OverflowFile->gcount();
00195             if (*bytes_read == 0) {
00196                 return eRW_Eof;
00197             }
00198             return eRW_Success;
00199         }
00200 
00201         return eRW_Success;
00202     }
00203 
00204     virtual ERW_Result PendingCount(size_t* count)
00205     {
00206         if ( m_RawBuffer ) {
00207             *count = m_BufferSize;
00208             return eRW_Success;
00209         }
00210         else if ( m_OverflowFile ) {
00211             *count = m_OverflowFile->good()? 1: 0;
00212             return eRW_Success;
00213         }
00214         *count = 0;
00215         return eRW_Error;
00216     }
00217 
00218 
00219 private:
00220     CBDB_CacheIReader(const CBDB_CacheIReader&);
00221     CBDB_CacheIReader& operator=(const CBDB_CacheIReader&);
00222 
00223 private:
00224     CBDB_Cache&                 m_Cache;
00225     CNcbiIfstream*              m_OverflowFile;
00226     CBDB_RawFile::TBuffer*      m_RawBuffer;
00227     unsigned char*              m_BufferPtr;
00228     size_t                      m_BufferSize;
00229     CBDB_Cache::TBlobLock       m_BlobLock;
00230 };
00231 
/// Growth policy for the writer's memory buffer: trades the number of
/// reallocations against heap consumption.
///
/// The new capacity is the requested size plus a reserve of half that
/// size, where the reserve never exceeds 1 MB.
///
/// @internal
class CCacheBufferResizeStrategy
{
public:
    /// @param cur_capacity    Current capacity (not used by this policy)
    /// @param requested_size  Size the buffer has to accommodate
    /// @return Capacity to allocate
    static size_t GetNewCapacity(size_t cur_capacity, size_t requested_size)
    {
        const size_t max_reserve = 1 * 1024 * 1024;
        const size_t half        = requested_size / 2;
        return requested_size + (half > max_reserve ? max_reserve : half);
    }
};
00248 
00249 
00250 /// @internal
00251 class CBDB_CacheIWriter : public IWriter
00252 {
00253 public:
00254     typedef
00255         CSimpleBufferT<unsigned char, CCacheBufferResizeStrategy> TBuffer;
00256 
00257 public:
00258     CBDB_CacheIWriter(CBDB_Cache&                bdb_cache,
00259                       const char*                path,
00260                       unsigned                   blob_id_ext,
00261                       const string&              blob_key,
00262                       int                        version,
00263                       const string&              subkey,
00264                       SCache_AttrDB&             attr_db,
00265                       unsigned int               ttl,
00266                       time_t                     request_time,
00267                       const string&              owner,
00268                       CBDB_Cache::TBlobLock&     blob_lock)
00269     : m_Cache(bdb_cache),
00270       m_Path(path),
00271       m_BlobIdExt(blob_id_ext),
00272       m_BlobKey(blob_key),
00273       m_Version(version),
00274       m_SubKey(subkey),
00275       m_AttrDB(attr_db),
00276       m_Buffer(0),
00277 //      m_BytesInBuffer(0),
00278       m_OverflowFile(0),
00279       m_TTL(ttl),
00280       m_RequestTime(request_time),
00281       m_Flushed(false),
00282       m_BlobSize(0),
00283       m_Overflow(0),
00284       m_BlobStore(0),
00285       m_BlobUpdate(0),
00286       m_Owner(owner),
00287       m_BlobLock(blob_lock.GetLockVector(), blob_lock.GetTimeout())
00288     {
00289         //_TRACE("CBDB_CacheIWriter::CBDB_CacheIWriter point 1");
00290 //        m_Buffer = new unsigned char[m_Cache.GetOverflowLimit()];
00291         m_Buffer.reserve(4 * 1024);
00292         m_BlobLock.TakeFrom(blob_lock);
00293         //_TRACE("CBDB_CacheIWriter::CBDB_CacheIWriter point 2");
00294     }
00295 
00296     virtual ~CBDB_CacheIWriter()
00297     {
00298         //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 1");
00299         try {
00300             //bool upd_statistics = false;
00301 
00302             // Dumping the buffer
00303             try {
00304                 //if (m_Buffer.size()) {
00305                 if (!m_OverflowFile  &&  !m_Flushed) {
00306                     _TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 2");
00307                     m_Cache.x_Store(m_BlobIdExt,
00308                                     m_BlobKey,
00309                                     m_Version,
00310                                     m_SubKey,
00311                                     m_Buffer.data(),
00312                                     m_Buffer.size(), // m_BytesInBuffer,
00313                                     m_TTL,
00314                                     m_Owner,
00315                                     false // do not lock blob
00316                                     );
00317                     //delete[] m_Buffer; m_Buffer = 0;
00318                 }
00319             } catch (CBDB_Exception& ) {
00320                 //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 3");
00321                 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey,
00322                                  1,
00323                                  0);
00324                 throw;
00325             }
00326             //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 4");
00327 
00328             if (m_OverflowFile) {
00329                 if (m_OverflowFile->is_open()) {
00330                     //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 5");
00331                     m_OverflowFile->close();
00332                     try {
00333                         m_Cache.RegisterOverflow(m_BlobKey, m_Version, m_SubKey,
00334                                                  m_TTL, m_Owner);
00335                     } catch (exception& ) {
00336                         m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0);
00337                         throw;
00338                     }
00339                     //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 6");
00340                     //upd_statistics = true;
00341                 }
00342                 delete m_OverflowFile; m_OverflowFile = 0;
00343 
00344                 // statistics
00345                 // FIXME: ideally statistics should not sit on the same main mutex
00346                 if (/*upd_statistics && */m_Cache.IsSaveStatistics()) {
00347                     try {
00348                         CFastMutexGuard guard(m_Cache.m_DB_Lock);
00349                         m_Cache.m_Statistics.AddStore(m_Owner,
00350                                                       m_RequestTime,
00351                                                       m_BlobStore,
00352                                                       m_BlobUpdate,
00353                                                       m_BlobSize,
00354                                                       m_Overflow);
00355                     } catch (exception& ) {
00356                         // ignore non critical exceptions
00357                     }
00358                 }
00359             }
00360             //_TRACE("CBDB_CacheIWriter::~CBDB_CacheIWriter point 7");
00361 
00362         } catch (exception & ex) {
00363             ERR_POST_X(1, "Exception in ~CBDB_CacheIWriter() : " << ex.what()
00364                           << " " << m_BlobKey);
00365             // final attempt to avoid leaks
00366 //            delete[] m_Buffer;
00367             delete   m_OverflowFile;
00368         }
00369     }
00370 
00371     virtual ERW_Result Write(const void*  buf,
00372                              size_t       count,
00373                              size_t*      bytes_written = 0)
00374     {
00375         _ASSERT(m_Flushed == false);
00376 
00377         if (bytes_written)
00378             *bytes_written = 0;
00379         if (count == 0)
00380             return eRW_Success;
00381 //      m_AttrUpdFlag = false;
00382         m_BlobSize += count;
00383         m_Flushed = false;
00384 
00385         // check BLOB size quota
00386         if (m_Cache.GetMaxBlobSize()) {
00387             if (m_BlobSize > m_Cache.GetMaxBlobSize()) {
00388                 //delete m_Buffer; m_Buffer = 0;
00389                 m_Buffer.clear();
00390                 delete m_OverflowFile; m_OverflowFile = 0;
00391                 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0);
00392 // FIXME:
00393                 if (m_Cache.IsSaveStatistics()) {
00394                     CFastMutexGuard guard(m_Cache.m_DB_Lock);
00395                     m_Cache.m_Statistics.AddBlobQuotaError(m_Owner);
00396                 }
00397 
00398                 string msg("BLOB larger than allowed. size=");
00399                 msg.append(NStr::UIntToString(m_BlobSize));
00400                 msg.append(" quota=");
00401                 msg.append(NStr::UIntToString(m_Cache.GetMaxBlobSize()));
00402                 BDB_THROW(eQuotaLimit, msg);
00403             }
00404         }
00405 
00406 
00407 //        if (m_Buffer.size()) {
00408         if (!m_OverflowFile) {
00409             unsigned int new_buf_length = m_Buffer.size() + count;
00410             if (new_buf_length <= m_Cache.GetOverflowLimit()) {
00411                 size_t old_size = m_Buffer.size();
00412                 m_Buffer.resize(old_size + count);
00413                 ::memcpy(m_Buffer.data() + old_size, buf, count);
00414                 //m_BytesInBuffer = new_buf_length;
00415                 if (bytes_written) {
00416                     *bytes_written = count;
00417                 }
00418 
00419                 return eRW_Success;
00420             } else {
00421                 // Buffer overflow. Writing to file.
00422                 OpenOverflowFile();
00423                 if (m_OverflowFile) {
00424                     if (m_Buffer.size()) {
00425                         try {
00426                             x_WriteOverflow((const char*)m_Buffer.data(),
00427                                              m_Buffer.size());
00428                         }
00429                         catch (exception& ) {
00430                             m_Buffer.clear();
00431                             //delete[] m_Buffer; m_Buffer = 0;
00432                             delete m_OverflowFile; m_OverflowFile = 0;
00433                             throw;
00434                         }
00435                     }
00436                     //delete[] m_Buffer; m_Buffer = 0; m_BytesInBuffer = 0;
00437                     m_Buffer.clear();
00438                 }
00439             }
00440 
00441         }
00442         if (m_OverflowFile) {
00443 
00444             _ASSERT(m_Buffer.size() == 0);
00445 
00446             x_WriteOverflow((char*)buf, count);
00447             if (bytes_written) {
00448                 *bytes_written = count;
00449             }
00450             return eRW_Success;
00451         }
00452 
00453         return eRW_Error;
00454     }
00455 
00456     /// Flush pending data (if any) down to output device.
00457     virtual ERW_Result Flush(void)
00458     {
00459         if (m_Flushed)
00460             return eRW_Success;
00461 
00462         m_Flushed = true;
00463 
00464         // Dumping the buffer
00465 
00466 //        if (m_Buffer.size()) {
00467         if (!m_OverflowFile) {
00468 
00469             try {
00470                 m_Cache.x_Store(m_BlobIdExt,
00471                                 m_BlobKey,
00472                                 m_Version,
00473                                 m_SubKey,
00474                                 m_Buffer.data(),
00475                                 m_Buffer.size(), // m_BytesInBuffer,
00476                                 m_TTL,
00477                                 m_Owner,
00478                                 false // do not lock blob
00479                                 );
00480                 //m_BlobLock.Unlock();
00481                 //return eRW_Success;
00482             }
00483             catch (exception&) {
00484                 m_Buffer.clear();
00485 //              delete[] m_Buffer; m_Buffer = 0;
00486                 throw;
00487             }
00488 
00489             //delete[] m_Buffer; m_Buffer = 0;
00490             //m_BytesInBuffer = 0;
00491             //m_Buffer.clear();
00492             //return eRW_Success;
00493         }
00494 
00495         if ( m_OverflowFile ) {
00496 
00497             _ASSERT(m_Buffer.size() == 0);
00498 
00499             try {
00500                 m_OverflowFile->flush();
00501                 if ( m_OverflowFile->bad() ) {
00502                     m_OverflowFile->close();
00503                     BDB_THROW(eOverflowFileIO,
00504                               "Error trying to flush an overflow file");
00505                 }
00506 /*
00507                 m_Cache.RegisterOverflow(m_BlobKey, m_Version, m_SubKey,
00508                                          m_TTL, m_Owner);
00509 
00510                 // FIXME:
00511                 if (m_Cache.IsSaveStatistics()) {
00512                     CFastMutexGuard guard(m_Cache.m_DB_Lock);
00513                     try {
00514                         m_Cache.m_Statistics.AddStore(m_Owner,
00515                                                     m_RequestTime,
00516                                                     m_BlobStore,
00517                                                     m_BlobUpdate,
00518                                                     m_BlobSize,
00519                                                     m_Overflow);
00520                     } catch (exception&) {
00521                         // ignore
00522                     }
00523                 }
00524 */
00525             } catch (CBDB_Exception& ) {
00526                 m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey,
00527                                  1,
00528                                  0);
00529                 throw;
00530             }
00531         }
00532         m_BlobLock.Unlock();
00533         return eRW_Success;
00534     }
00535 private:
00536     void OpenOverflowFile()
00537     {
00538         s_MakeOverflowFileName(
00539             m_OverflowFilePath, m_Path, m_Cache.GetName(), m_BlobKey, m_Version, m_SubKey);
00540 
00541         _TRACE("LC: Making overflow file " << m_OverflowFilePath);
00542         m_OverflowFile =
00543             new CNcbiOfstream(m_OverflowFilePath.c_str(),
00544                               IOS_BASE::out |
00545                               IOS_BASE::trunc |
00546                               IOS_BASE::binary);
00547         if (!m_OverflowFile->is_open() || m_OverflowFile->bad()) {
00548             delete m_OverflowFile; m_OverflowFile = 0;
00549             string err = "LC: Cannot create overflow file ";
00550             err += m_OverflowFilePath;
00551             BDB_THROW(eCannotOpenOverflowFile, err);
00552         }
00553         m_Overflow = 1;
00554     }
00555 
00556     void x_WriteOverflow(const char* buf, streamsize count)
00557     {
00558         _ASSERT(m_OverflowFile);
00559 
00560         if (!m_OverflowFile->is_open()) {
00561             BDB_THROW(eOverflowFileIO,
00562                  "LC: Attempt to write to a non-open overflow file");
00563         }
00564         try {
00565             m_Cache.WriteOverflow(*m_OverflowFile,
00566                                    m_OverflowFilePath,
00567                                    buf, count);
00568         }
00569         catch (exception&) {
00570             // any error here is critical to the BLOB storage integrity
00571             // if file IO error happens we delete BLOB altogether
00572             m_Cache.KillBlob(m_BlobKey, m_Version, m_SubKey, 1, 0);
00573             throw;
00574         }
00575     }
00576 
00577 private:
00578     CBDB_CacheIWriter(const CBDB_CacheIWriter&);
00579     CBDB_CacheIWriter& operator=(const CBDB_CacheIWriter&);
00580 
00581 private:
00582     CBDB_Cache&           m_Cache;
00583     const char*           m_Path;
00584     unsigned              m_BlobIdExt;
00585     string                m_BlobKey;
00586     int                   m_Version;
00587     string                m_SubKey;
00588     SCache_AttrDB&        m_AttrDB;
00589 
00590     TBuffer               m_Buffer;
00591 //    unsigned char*        m_Buffer;
00592 //    unsigned int          m_BytesInBuffer;
00593     CNcbiOfstream*        m_OverflowFile;
00594     string                m_OverflowFilePath;
00595 
00596     int                   m_StampSubKey;
00597 //  bool                  m_AttrUpdFlag; ///< Flags attributes are up to date
00598     CBDB_Cache::EWriteSyncMode m_WSync;
00599     unsigned int          m_TTL;
00600     time_t                m_RequestTime;
00601     bool                  m_Flushed;     ///< FALSE until Flush() called
00602 
00603     unsigned              m_BlobSize;    ///< Number of bytes written
00604     unsigned              m_Overflow;    ///< Overflow file created
00605     unsigned              m_BlobStore;
00606     unsigned              m_BlobUpdate;
00607     string                m_Owner;
00608     CBDB_Cache::TBlobLock m_BlobLock;
00609 };
00610 
00611 
00612 SBDB_CacheUnitStatistics::SBDB_CacheUnitStatistics()
00613 {
00614     Init();
00615 }
00616 
00617 void SBDB_CacheUnitStatistics::Init()
00618 {
00619     blobs_stored_total = blobs_updates_total =
00620         blobs_never_read_total = blobs_expl_deleted_total =
00621         blobs_purge_deleted_total = blobs_db =
00622         blobs_overflow_total = blobs_read_total = blob_size_max_total = 0;
00623 
00624     err_protocol = err_internal = err_communication =
00625         err_no_blob = err_blob_get = err_blob_put = err_blob_over_quota = 0;
00626 
00627     blobs_size_total = blobs_size_db = 0.0;
00628     InitHistorgam(&blob_size_hist);
00629 
00630     time_access.erase(time_access.begin(), time_access.end());
00631 }
00632 
00633 void SBDB_CacheUnitStatistics::InitHistorgam(TBlobSizeHistogram* hist)
00634 {
00635     _ASSERT(hist);
00636 
00637     hist->clear();
00638     unsigned hist_value = 512;
00639     for (unsigned i = 0; i < 100; ++i) {
00640         (*hist)[hist_value] = 0;
00641         hist_value *= 2;
00642     }
00643     (*hist)[kMax_UInt] = 0;
00644 }
00645 
00646 void SBDB_CacheUnitStatistics::AddToHistogram(TBlobSizeHistogram* hist,
00647                                           unsigned            size)
00648 {
00649     if (hist->empty()) {
00650         return;
00651     }
00652     TBlobSizeHistogram::iterator it = hist->upper_bound(size);
00653     if (it == hist->end()) {
00654         return;
00655     }
00656     ++(it->second);
00657 }
00658 
00659 
/// Split a time stamp (in seconds) into a day number and an hour of
/// that day.
///
/// @param time_in_secs  Time in seconds
/// @param day           Receives time_in_secs divided by 86400 (whole days)
/// @param hour          Receives the hour within that day
static
void s_GetDayHour(time_t time_in_secs, unsigned* day, unsigned* hour)
{
    const int secs_per_day  = 24 * 60 * 60;
    const unsigned day_part =
        static_cast<unsigned>(time_in_secs % secs_per_day);

    *day  = static_cast<unsigned>(time_in_secs / secs_per_day);
    *hour = day_part / 3600;
}
00668 
00669 void SBDB_CacheUnitStatistics::AddStore(time_t   tm,
00670                                         unsigned store,
00671                                         unsigned update,
00672                                         unsigned blob_size,
00673                                         unsigned overflow)
00674 {
00675     blobs_stored_total += store;
00676     blobs_updates_total += update;
00677     blobs_size_total += blob_size;
00678     blobs_overflow_total += overflow;
00679     if (blob_size > blob_size_max_total) {
00680         blob_size_max_total = blob_size;
00681     }
00682     AddToHistogram(&blob_size_hist, blob_size);
00683 
00684     unsigned day, hour;
00685     s_GetDayHour(tm, &day, &hour);
00686     if (time_access.empty()) {
00687         time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 1, 0));
00688         return;
00689     }
00690     SBDB_TimeAccessStatistics& ta_stat = time_access.back();
00691     if (ta_stat.day == day && ta_stat.hour == hour) {
00692         ++ta_stat.put_count;
00693     } else {
00694         time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 1, 0));
00695         if (time_access.size() > 48) {
00696             time_access.pop_front();
00697         }
00698     }
00699 }
00700 
00701 void SBDB_CacheUnitStatistics::AddRead(time_t tm)
00702 {
00703     ++blobs_read_total;
00704 
00705     unsigned day, hour;
00706     s_GetDayHour(tm, &day, &hour);
00707     if (time_access.empty()) {
00708         time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 0, 1));
00709         return;
00710     }
00711     SBDB_TimeAccessStatistics& ta_stat = time_access.back();
00712     if (ta_stat.day == day && ta_stat.hour == hour) {
00713         ++ta_stat.get_count;
00714     } else {
00715         time_access.push_back(SBDB_TimeAccessStatistics(day, hour, 0, 1));
00716         if (time_access.size() > 48) {
00717             time_access.pop_front();
00718         }
00719     }
00720 }
00721 
00722 void SBDB_CacheUnitStatistics::x_AddErrGetPut(EErrGetPut operation)
00723 {
00724     switch (operation) {
00725     case eErr_Put:
00726         ++err_blob_put;
00727         break;
00728     case eErr_Get:
00729         ++err_blob_get;
00730         break;
00731     case eErr_Unknown:
00732         break;
00733     default:
00734         _ASSERT(0);
00735         break;
00736     } // switch
00737 }
00738 
00739 void SBDB_CacheUnitStatistics::AddInternalError(EErrGetPut operation)
00740 {
00741     ++err_internal;
00742     x_AddErrGetPut(operation);
00743 }
00744 
00745 void SBDB_CacheUnitStatistics::AddBlobQuotaError()
00746 {
00747     ++err_blob_over_quota;
00748     x_AddErrGetPut(eErr_Put);
00749 }
00750 
00751 void SBDB_CacheUnitStatistics::AddProtocolError(EErrGetPut operation)
00752 {
00753     ++err_protocol;
00754     x_AddErrGetPut(operation);
00755 }
00756 
00757 void SBDB_CacheUnitStatistics::AddNoBlobError(EErrGetPut operation)
00758 {
00759     ++err_no_blob;
00760     x_AddErrGetPut(operation);
00761 }
00762 
00763 void SBDB_CacheUnitStatistics::AddCommError(EErrGetPut operation)
00764 {
00765     ++err_communication;
00766     x_AddErrGetPut(operation);
00767 }
00768 
00769 void SBDB_CacheUnitStatistics::PrintStatistics(CNcbiOstream& out) const
00770 {
00771     out
00772         << "Total number of blobs ever stored                  " << "\t" << blobs_stored_total   << "\n"
00773         << "Total number of overflow blobs (large size)        " << "\t" << blobs_overflow_total << "\n"
00774         << "Total number of blobs updates                      " << "\t" << blobs_updates_total    << "\n"
00775         << "Total number of blobs stored but never read        " << "\t" << blobs_never_read_total << "\n"
00776         << "Total number of reads                              " << "\t" << blobs_read_total << "\n"
00777         << "Total number of explicit deletes                   " << "\t" << blobs_expl_deleted_total << "\n"
00778         << "Total number of BLOBs deletes by garbage collector " << "\t" << blobs_purge_deleted_total << "\n"
00779         << "Total size of all BLOBs ever stored                " << "\t" << blob_size_max_total << "\n"
00780 
00781         << "Current database number of records(BLOBs)          " << "\t" << blobs_db << "\n"
00782         << "Current size of all BLOBs                          "  << "\t" << blobs_size_db << "\n"
00783 
00784         << "Number of NetCache protocol errors                 " << "\t" << err_protocol << "\n"
00785         << "Number of communication errors                     " << "\t" << err_communication << "\n"
00786         << "Number of NetCache server internal errors          " << "\t" << err_internal << "\n"
00787         << "Number of BLOB not found situations                " << "\t" << err_no_blob << "\n"
00788         << "Number of errors when getting BLOBs                " << "\t" << err_blob_get << "\n"
00789         << "Number of errors when storing BLOBs                " << "\t" << err_blob_put << "\n"
00790         << "Number of errors when BLOB is over the size limit  " << "\t" << err_blob_over_quota << "\n";
00791 
00792     out << "\n\n";
00793 
00794     if (!time_access.empty()) {
00795         out << "# Time access statistics:" << "\n" << "\n";
00796         out << "# Hour \t Puts \t Gets" << "\n";
00797 
00798         ITERATE(TTimeAccess, it, time_access) {
00799             const SBDB_TimeAccessStatistics& ta_stat = *it;
00800             out << ta_stat.hour << "\t"
00801                 << ta_stat.put_count << "\t"
00802                 << ta_stat.get_count << "\n";
00803         }
00804     }
00805     out << "\n\n";
00806 
00807     if (!blob_size_hist.empty()) {
00808         out << "# BLOB size histogram:" << "\n" << "\n";
00809         out << "# Size \t Count" << "\n";
00810 
00811         const TBlobSizeHistogram& hist = blob_size_hist;
00812         SBDB_CacheUnitStatistics::TBlobSizeHistogram::const_iterator hist_end =
00813             hist.end();
00814 
00815         ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) {
00816             if (it->second > 0) {
00817                 hist_end = it;
00818             }
00819         }
00820         ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) {
00821             out << it->first << "\t" << it->second << "\n";
00822             if (it == hist_end) {
00823                 break;
00824             }
00825         } // for
00826     }
00827 
00828 }
00829 
00830 void SBDB_CacheUnitStatistics::ConvertToRegistry(IRWRegistry* reg,
00831                                                  const string& sect_name_postfix) const
00832 {
00833     string postfix(sect_name_postfix);
00834     postfix = NStr::Replace(postfix, " ", "_");
00835 
00836     // convert main parameters
00837 
00838     {{
00839     string sect_stat("bdb_stat");
00840      if (!postfix.empty()) {
00841         sect_stat.append("_");
00842         sect_stat.append(postfix);
00843     }
00844 
00845     reg->Set(sect_stat, "blobs_stored_total",
00846              NStr::UIntToString(blobs_stored_total), 0,
00847              "Total number of blobs ever stored");
00848     reg->Set(sect_stat, "blobs_overflow_total",
00849              NStr::UIntToString(blobs_overflow_total), 0,
00850              "Total number of overflow blobs (large size)");
00851     reg->Set(sect_stat, "blobs_updates_total",
00852              NStr::UIntToString(blobs_updates_total), 0,
00853              "Total number of blobs updates");
00854     reg->Set(sect_stat, "blobs_never_read_total",
00855              NStr::UIntToString(blobs_never_read_total), 0,
00856              "Total number of blobs stored but never read");
00857     reg->Set(sect_stat, "blobs_read_total",
00858              NStr::UIntToString(blobs_read_total), 0,
00859              "Total number of reads");
00860     reg->Set(sect_stat, "blobs_expl_deleted_total",
00861              NStr::UIntToString(blobs_expl_deleted_total), 0,
00862              "Total number of explicit deletes");
00863     reg->Set(sect_stat, "blobs_purge_deleted_total",
00864              NStr::UIntToString(blobs_purge_deleted_total), 0,
00865              "Total number of BLOBs deletes by garbage collector");
00866     reg->Set(sect_stat, "blobs_size_total",
00867              NStr::UIntToString(unsigned(blobs_size_total)), 0,
00868              "Total size of all BLOBs ever stored");
00869     reg->Set(sect_stat, "blob_size_max_total",
00870              NStr::UIntToString(blob_size_max_total), 0,
00871              "Size of the largest BLOB ever stored");
00872 
00873 
00874     reg->Set(sect_stat, "blobs_db",
00875              NStr::UIntToString(blobs_db), 0,
00876              "Current database number of records(BLOBs)");
00877     reg->Set(sect_stat, "blobs_size_db",
00878              NStr::UIntToString(unsigned(blobs_size_db)), 0,
00879              "Current size of all BLOBs");
00880 
00881     reg->Set(sect_stat, "err_protocol",
00882              NStr::UIntToString(err_protocol), 0,
00883              "Number of NetCache protocol errors");
00884     reg->Set(sect_stat, "err_communication",
00885              NStr::UIntToString(err_communication), 0,
00886              "Number of communication errors");
00887     reg->Set(sect_stat, "err_internal",
00888              NStr::UIntToString(err_internal), 0,
00889              "Number of NetCache server internal errors");
00890     reg->Set(sect_stat, "err_no_blob",
00891              NStr::UIntToString(err_no_blob), 0,
00892              "Number of BLOB not found situations");
00893     reg->Set(sect_stat, "err_blob_get",
00894              NStr::UIntToString(err_blob_get), 0,
00895              "Number of errors when getting BLOBs");
00896     reg->Set(sect_stat, "err_blob_put",
00897              NStr::UIntToString(err_blob_put), 0,
00898              "Number of errors when storing BLOBs");
00899     reg->Set(sect_stat, "err_blob_over_quota",
00900              NStr::UIntToString(err_blob_over_quota), 0,
00901              "Number of errors when BLOB is over the size limit");
00902 
00903     // convert time access statistics
00904     //
00905     if (!time_access.empty()) {
00906 
00907         string get_history;
00908         string put_history;
00909 
00910         ITERATE(TTimeAccess, it, time_access) {
00911             const SBDB_TimeAccessStatistics& ta_stat = *it;
00912             string hour = NStr::UIntToString(ta_stat.hour);
00913             string put_count = NStr::UIntToString(ta_stat.put_count);
00914             string get_count = NStr::UIntToString(ta_stat.get_count);
00915 
00916             string pair = hour;
00917             pair.append("=");
00918             pair.append(put_count);
00919 
00920             if (!put_history.empty()) {
00921                 put_history.append(";");
00922             }
00923             put_history.append(pair);
00924 
00925 
00926             pair = hour;
00927             pair.append("=");
00928             pair.append(get_count);
00929 
00930             if (!get_history.empty()) {
00931                 get_history.append(";");
00932             }
00933             get_history.append(pair);
00934 
00935         } // ITERATE
00936 
00937         reg->Set(sect_stat, "get_access",
00938              get_history, 0,
00939              "Read access by hours (hour=count)");
00940         reg->Set(sect_stat, "put_access",
00941              put_history, 0,
00942              "Write access by hours (hour=count)");
00943 
00944     }
00945 
00946 
00947     }}
00948 
00949     // convert historgam
00950     //
00951     if (!blob_size_hist.empty()) {
00952         string sect_hist("bdb_stat_hist");
00953         if (!postfix.empty()) {
00954             sect_hist.append("_");
00955             sect_hist.append(postfix);
00956         }
00957 
00958         const TBlobSizeHistogram& hist = blob_size_hist;
00959         SBDB_CacheUnitStatistics::TBlobSizeHistogram::const_iterator hist_end =
00960             hist.end();
00961 
00962         ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) {
00963             if (it->second > 0) {
00964                 hist_end = it;
00965             }
00966         }
00967         ITERATE(SBDB_CacheUnitStatistics::TBlobSizeHistogram, it, hist) {
00968             string var_name = "size_";
00969             var_name += NStr::UIntToString(it->first);
00970 
00971             reg->Set(sect_hist, var_name, NStr::UIntToString(it->second));
00972 
00973             if (it == hist_end) {
00974                 break;
00975             }
00976         } // for
00977     }
00978 
00979 }
00980 
00981 
00982 
00983 
00984 SBDB_CacheStatistics::SBDB_CacheStatistics()
00985 {
00986 }
00987 
00988 void SBDB_CacheStatistics::Init()
00989 {
00990     m_GlobalStat.Init();
00991     m_OwnerStatMap.erase(m_OwnerStatMap.begin(), m_OwnerStatMap.end());
00992 }
00993 
00994 
00995 void SBDB_CacheStatistics::AddStore(const string& client,
00996                                     time_t        tm,
00997                                     unsigned      store,
00998                                     unsigned      update,
00999                                     unsigned      blob_size,
01000                                     unsigned      overflow)
01001 {
01002     m_GlobalStat.AddStore(tm, store, update, blob_size, overflow);
01003     if (!client.empty()) {
01004         m_OwnerStatMap[client].AddStore(tm, store, update, blob_size, overflow);
01005     }
01006 }
01007 
01008 void SBDB_CacheStatistics::AddRead(const string&  client, time_t tm)
01009 {
01010     m_GlobalStat.AddRead(tm);
01011     if (!client.empty()) {
01012         m_OwnerStatMap[client].AddRead(tm);
01013     }
01014 }
01015 
01016 void SBDB_CacheStatistics::AddExplDelete(const string&  client)
01017 {
01018     m_GlobalStat.AddExplDelete();
01019     if (!client.empty()) {
01020         m_OwnerStatMap[client].AddExplDelete();
01021     }
01022 }
01023 
01024 void SBDB_CacheStatistics::AddPurgeDelete(const string& client)
01025 {
01026     m_GlobalStat.AddPurgeDelete();
01027     if (!client.empty()) {
01028         m_OwnerStatMap[client].AddPurgeDelete();
01029     }
01030 }
01031 
01032 void SBDB_CacheStatistics::AddNeverRead(const string&   client)
01033 {
01034     m_GlobalStat.AddNeverRead();
01035     if (!client.empty()) {
01036         m_OwnerStatMap[client].AddNeverRead();
01037     }
01038 }
01039 
01040 void SBDB_CacheStatistics::AddBlobQuotaError(const string& client)
01041 {
01042     m_GlobalStat.AddBlobQuotaError();
01043     if (!client.empty()) {
01044         m_OwnerStatMap[client].AddBlobQuotaError();
01045     }
01046 }
01047 
01048 void SBDB_CacheStatistics::AddInternalError(const string& client,
01049                      SBDB_CacheUnitStatistics::EErrGetPut operation)
01050 {
01051     m_GlobalStat.AddInternalError(operation);
01052     if (!client.empty()) {
01053         m_OwnerStatMap[client].AddInternalError(operation);
01054     }
01055 }
01056 
01057 void SBDB_CacheStatistics::AddProtocolError(const string& client,
01058                      SBDB_CacheUnitStatistics::EErrGetPut operation)
01059 {
01060     m_GlobalStat.AddProtocolError(operation);
01061     if (!client.empty()) {
01062         m_OwnerStatMap[client].AddProtocolError(operation);
01063     }
01064 }
01065 
01066 void SBDB_CacheStatistics::AddNoBlobError(const string& client,
01067                    SBDB_CacheUnitStatistics::EErrGetPut operation)
01068 {
01069     m_GlobalStat.AddNoBlobError(operation);
01070     if (!client.empty()) {
01071         m_OwnerStatMap[client].AddNoBlobError(operation);
01072     }
01073 }
01074 
01075 void SBDB_CacheStatistics::AddCommError(const string& client,
01076                  SBDB_CacheUnitStatistics::EErrGetPut operation)
01077 {
01078     m_GlobalStat.AddCommError(operation);
01079     if (!client.empty()) {
01080         m_OwnerStatMap[client].AddCommError(operation);
01081     }
01082 }
01083 
01084 void SBDB_CacheStatistics::ConvertToRegistry(IRWRegistry* reg) const
01085 {
01086     m_GlobalStat.ConvertToRegistry(reg, kEmptyStr);
01087     ITERATE(TOwnerStatMap, it, m_OwnerStatMap) {
01088         const string& client = it->first;
01089         const SBDB_CacheUnitStatistics& stat = it->second;
01090         stat.ConvertToRegistry(reg, client);
01091     }
01092 }
01093 
01094 void SBDB_CacheStatistics::PrintStatistics(CNcbiOstream& out) const
01095 {
01096     out << "## " << "\n"
01097         << "## Global statistics" << "\n"
01098         << "## " << "\n\n";
01099     m_GlobalStat.PrintStatistics(out);
01100     out << "\n\n";
01101 
01102     ITERATE(TOwnerStatMap, it, m_OwnerStatMap) {
01103         const string& client = it->first;
01104         const SBDB_CacheUnitStatistics& stat = it->second;
01105 
01106         out << "## " << "\n"
01107             << "## Owner statistics:" << client << "\n"
01108             << "## " << "\n\n";
01109 
01110         stat.PrintStatistics(out);
01111         out << "\n\n";
01112     }
01113 }
01114 
01115 
01116 CBDB_Cache::CBDB_Cache()
01117 : m_PidGuard(0),
01118   m_ReadOnly(false),
01119   m_InitIfDirty(false),
01120   m_JoinedEnv(false),
01121   m_LockTimeout(20 * 1000),
01122   m_Env(0),
01123   m_Closed(true),
01124   m_BLOB_SplitStore(0),
01125   m_CacheAttrDB(0),
01126   m_CacheIdIDX(0),
01127   m_CacheAttrDB_RO1(0),
01128   m_CacheAttrDB_RO2(0),
01129   m_CacheIdIDX_RO(0),
01130   m_Timeout(0),
01131   m_MaxTimeout(0),
01132   m_VersionFlag(eDropOlder),
01133   m_WSync(eWriteNoSync),
01134   m_PurgeBatchSize(150),
01135   m_BatchSleep(0),
01136   m_PurgeStopSignal(0, 100), // purge stop semaphore max 100
01137   m_CleanLogOnPurge(0),
01138   m_PurgeCount(0),
01139   m_LogSizeMax(0),
01140   m_PurgeNowRunning(false),
01141   m_RunPurgeThread(false),
01142   m_PurgeThreadDelay(10),
01143   m_CheckPointInterval(24 * (1024 * 1024)),
01144   m_OverflowLimit(512 * 1024),
01145   m_MaxTTL_prolong(0),
01146   m_SaveStatistics(false),
01147   m_MaxBlobSize(0),
01148   m_RoundRobinVolumes(0),
01149   m_MempTrickle(10),
01150   m_Monitor(0),
01151   m_TimeLine(0)
01152 {
01153     m_TimeStampFlag = fTimeStampOnRead |
01154                       fExpireLeastFrequentlyUsed |
01155                       fPurgeOnStartup;
01156 
01157 }
01158 
01159 CBDB_Cache::~CBDB_Cache()
01160 {
01161     try {
01162         Close();
01163     } catch (exception& )
01164     {}
01165 }
01166 
01167 void CBDB_Cache::SetTTL_Prolongation(unsigned ttl_prolong)
01168 {
01169     m_MaxTTL_prolong = ttl_prolong;
01170 }
01171 
01172 void CBDB_Cache::x_PidLock(ELockMode lm)
01173 {
01174     string lock_file = string("lcs_") + m_Name + string(".pid");
01175     string lock_file_path = m_Path + lock_file;
01176 
01177     switch (lm)
01178     {
01179     case ePidLock:
01180         _ASSERT(m_PidGuard == 0);
01181         m_PidGuard = new CPIDGuard(lock_file_path, m_Path);
01182         break;
01183     case eNoLock:
01184         break;
01185     default:
01186         break;
01187     }
01188 }
01189 
01190 void CBDB_Cache::SetOverflowLimit(unsigned limit)
01191 {
01192     _ASSERT(!IsOpen());
01193     m_OverflowLimit = limit;
01194 }
01195 
// Mount the cache in read-write mode.
//
// Steps, in order:
//   1. Close whatever was open before and mark the object as not closed.
//   2. If the directory looks like it was not closed cleanly, either remove
//      it (m_InitIfDirty) or run BDB recovery on it.
//   3. Try to join an existing BDB environment; otherwise configure and
//      open a new one (transactional or lock-only).
//   4. Open the BLOB split store, the attribute DB and the id index.
//   5. Create the expiration timeline.
//   6. Scan the attribute DB to restore BLOB coordinates, repair missing
//      id-index records and find the largest BLOB id.
//   7. Optionally purge on startup and start the background purge thread.
//
// cache_path     - cache directory
// cache_name     - cache name; used to build database and log file names
// lm             - lock mode handed to x_PidLock() (only when a new
//                  environment is opened, not when one is joined)
// cache_ram_size - BDB cache size; when 0, SetCacheSize() is not called
// use_trans      - eUseTrans opens a transactional environment,
//                  eNoTrans a lock-only one
// log_mem_size   - when non-zero, logging is done in memory with this
//                  log buffer size
01196 void CBDB_Cache::Open(const string& cache_path,
01197                       const string& cache_name,
01198                       ELockMode     lm,
01199                       Uint8         cache_ram_size,
01200                       ETRansact     use_trans,
01201                       unsigned int  log_mem_size)
01202 {
01203     {{
01204     m_NextExpTime = 0;
01205 //    m_PurgeSkipCnt = 0;
01206     m_LastTimeLineCheck = 0;
01207 
          // Close() runs before m_DB_Lock is taken below
01208     Close();
01209     m_Closed = false;
01210 
01211     CFastMutexGuard guard(m_DB_Lock);
01212 
01213     m_Path = CDirEntry::AddTrailingPathSeparator(cache_path);
01214     m_Name = cache_name;
01215 
          // "__db.*" files and/or the "started" marker file are taken as a sign
          // that a previous session did not shut down cleanly
01216     CDir dir(m_Path);
01217     CDir::TEntries fl = dir.GetEntries("__db.*", CDir::eIgnoreRecursive);
01218     CFile fl_clean(CDirEntry::MakePath(m_Path, kBDBCacheStartedFileName));
01219 
          // Directories where this process has written the marker file itself.
          // NOTE(review): this static set is shared by all instances while
          // m_DB_Lock is a member -- confirm concurrent Open() calls are safe.
01220     static set<string> s_OpenedDirs;
01221 
01222     if ((!fl.empty()  ||  fl_clean.Exists())
01223         &&  s_OpenedDirs.count(m_Path) == 0)
01224     {
01225         if (m_InitIfDirty) {
01226             LOG_POST("Database was closed uncleanly. Removing directory "
01227                      << m_Path);
01228             dir.Remove();
01229         }
01230         else {
01231             // Opening db with recover flags is unreliable.
01232             LOG_POST("Database was closed uncleanly. Running recovery...");
01233             BDB_RecoverEnv(m_Path, false);
01234             fl_clean.Remove();
01235         }
01236     }
01237 
01238 
01239     // Make sure our directory exists
01240     if ( !dir.Exists() ) {
01241         dir.Create();
01242     }
01243 
          // Write the marker file with our PID; Close() removes it again
01244     if (!fl_clean.Exists()) {
01245         CFileWriter writer(fl_clean.GetPath());
01246         string pid = NStr::NumericToString(CProcess::GetCurrentPid());
01247         writer.Write(pid.data(), pid.size());
01248         s_OpenedDirs.insert(m_Path);
01249     }
01250 
01251     m_Env = new CBDB_Env();
01252 
01253     string err_file = m_Path + "err" + string(cache_name) + ".log";
01254     m_Env->OpenErrFile(err_file.c_str());
01255 
01256     m_JoinedEnv = false;
01257     bool needs_recovery = false;
01258 
          // Try to join an environment that already exists.
          // NOTE(review): fl was collected before the cleanup above, so it can
          // still be non-empty after dir.Remove(); a failed join then only sets
          // needs_recovery.
01259     if (!fl.empty()) {
01260         try {
01261             m_Env->JoinEnv(cache_path, CBDB_Env::eThreaded);
01262             if (m_Env->IsTransactional()) {
01263                 LOG_POST_X(2, Info <<
01264                            "LC: '" << cache_name <<
01265                            "' Joined transactional environment ");
01266             } else {
01267                 LOG_POST_X(3, Info <<
01268                            "LC: '" << cache_name <<
01269                            "' Warning: Joined non-transactional environment ");
01270             }
01271             m_JoinedEnv = true;
01272         }
01273         catch (CBDB_ErrnoException& err_ex)
01274         {
01275             if (err_ex.IsRecovery()) {
01276                 LOG_POST_X(4, Warning <<
01277                            "LC: '" << cache_name <<
01278                            "'Warning: DB_ENV returned DB_RUNRECOVERY code."
01279                            " Running the recovery procedure.");
01280             }
01281             needs_recovery = true;
01282         }
01283         catch (CBDB_Exception&)
01284         {
01285             needs_recovery = true;
01286         }
01287     }
01288 
          // Not joined: configure and open our own environment
01289     if (!m_JoinedEnv) {
01290         if (log_mem_size == 0) {
                  // On-disk log: use m_LogSizeMax only when it is at least 20M,
                  // otherwise fall back to 200M
01291             if (m_LogSizeMax >= (20 * 1024 * 1024)) {
01292                 m_Env->SetLogFileMax(m_LogSizeMax);
01293             } else {
01294                 m_Env->SetLogFileMax(200 * 1024 * 1024);
01295             }
01296             m_Env->SetLogBSize(1 * 1024 * 1024);
01297         } else {
01298             m_Env->SetLogInMemory(true);
01299             m_Env->SetLogBSize(log_mem_size);
01300         }
01301         if (!m_LogDir.empty()) {
01302             m_Env->SetLogDir(m_LogDir);
01303         }
01304 
01305         if (cache_ram_size) {
01306             // empirically if total cache size is large
01307             // you Berkeley DB works faster if you break it into several
01308             // files. The sweetspot seems to be around 250M per file.
01309             // (but growing number of files is not a good idea as well)
01310             // so here I'm dancing to set some reasonable number of caches
01311             //
01312             int cache_num = 1;
01313             if (cache_ram_size > (500 * 1024 * 1024)) {
01314                 cache_num = (int)((cache_ram_size) / Uint8(250 * 1024 * 1024));
01315             }
01316             if (!cache_num) {
01317                 cache_num = 1;  // paranoid check
01318             }
01319             if (cache_num > 10) { // too many files?
01320                 cache_num = 10;
01321             }
01322             m_Env->SetCacheSize(cache_ram_size, cache_num);
01323         }
01324 
              // m_CheckPointInterval is converted to KB; 64 is used when the
              // result would be 0
01325         unsigned checkpoint_KB = m_CheckPointInterval / 1024;
01326         if (checkpoint_KB == 0) {
01327             checkpoint_KB = 64;
01328         }
01329         m_Env->SetCheckPointKB(checkpoint_KB);
01330 
01331         m_Env->SetLogAutoRemove(true);
01332 
01333         x_PidLock(lm);
01334         switch (use_trans)
01335         {
01336         case eUseTrans:
01337             {
01338             CBDB_Env::TEnvOpenFlags env_flags = CBDB_Env::eThreaded;
01339             if (needs_recovery) {
01340                 env_flags |= CBDB_Env::eRunRecovery;
01341             }
01342             m_Env->SetMaxLocks(50000);
01343             m_Env->OpenWithTrans(cache_path, env_flags);
01344             }
01345             break;
01346         case eNoTrans:
01347             LOG_POST_X(5, Info << "BDB_Cache: Creating locking environment");
01348             m_Env->OpenWithLocks(cache_path);
01349             break;
01350         default:
01351             _ASSERT(0);
01352         } // switch
01353 
01354         m_Env->SetLockTimeout(30 * 1000000); // 30 sec
01355         if (m_Env->IsTransactional()) {
01356             m_Env->SetTransactionTimeout(30 * 1000000); // 30 sec
01357         }
01358 
01359     }
01360 
01361     if (GetWriteSync() == eWriteSync) {
01362         m_Env->SetTransactionSync(CBDB_Transaction::eTransSync);
01363     } else {
01364         m_Env->SetTransactionSync(CBDB_Transaction::eTransASync);
01365     }
01366 
          // Database objects: one read-write and two read-only handles on the
          // attribute DB, one read-write and one read-only handle on the id index
01367     m_BLOB_SplitStore =
01368         new TSplitStore(new CBDB_BlobDeMux_RoundRobin(m_RoundRobinVolumes));
01369     m_CacheAttrDB = new SCache_AttrDB();
01370     m_CacheAttrDB_RO1 = new SCache_AttrDB();
01371     m_CacheAttrDB_RO2 = new SCache_AttrDB();
01372     m_CacheIdIDX = new SCache_IdIDX();
01373     m_CacheIdIDX_RO = new SCache_IdIDX();
01374 
01375     m_BLOB_SplitStore->SetEnv(*m_Env);
01376     m_CacheAttrDB->SetEnv(*m_Env);
01377     m_CacheAttrDB_RO1->SetEnv(*m_Env);
01378     m_CacheAttrDB_RO2->SetEnv(*m_Env);
01379     m_CacheIdIDX->SetEnv(*m_Env);
01380     m_CacheIdIDX_RO->SetEnv(*m_Env);
01381 
          // Names: lcs_<name>_blob, lcs_<name>_attr5.db, lcs_<name>_id.idx
01382     string cache_blob_db_name =
01383        string("lcs_") + string(cache_name) + string("_blob");
01384     string attr_db_name =
01385        string("lcs_") + string(cache_name) + string("_attr5") + string(".db");
01386     string id_idx_name =
01387        string("lcs_") + string(cache_name) + string("_id") + string(".idx");
01388 
01389     m_BLOB_SplitStore->Open(cache_blob_db_name, CBDB_RawFile::eReadWriteCreate);
01390     m_CacheAttrDB->Open(attr_db_name,        CBDB_RawFile::eReadWriteCreate);
01391     m_CacheAttrDB_RO1->Open(attr_db_name,    CBDB_RawFile::eReadOnly);
01392     m_CacheAttrDB_RO2->Open(attr_db_name,    CBDB_RawFile::eReadOnly);
01393     m_CacheIdIDX->Open(id_idx_name,          CBDB_RawFile::eReadWriteCreate);
01394     m_CacheIdIDX_RO->Open(id_idx_name,       CBDB_RawFile::eReadOnly);
01395 
01396     // try to open all databases
01397     //
01398     m_BLOB_SplitStore->OpenProjections();
01399 
01400     }}
01401 
01402     // Create timeline for BLOB expiration
01403     //
01404 
01405     {{
              // precision = m_Timeout / 10, clamped to the range [1 min, 5 min]
01406         unsigned timeline_precision = 5 * 60; // 5min default precision
01407         if (m_Timeout) {
01408             timeline_precision = m_Timeout / 10;
01409         }
01410         if (timeline_precision > (5 * 60)) { // 5 min max.
01411             timeline_precision = 5 * 60;
01412         }
01413         if (timeline_precision < (1 * 60)) {
01414             timeline_precision = 1 * 60;
01415         }
01416         m_TimeLine = new TTimeLine(timeline_precision, 0);
01417     }}
01418 
01419 
01420     // read cache attributes so we can adjust atomic counter
01421     //
01422 
01423     {{
01424     LOG_POST_X(6, Info << "Scanning cache content.");
01425 
01426     m_CacheAttrDB->SetTransaction(0);
01427 
01428     CBDB_FileCursor cur(*m_CacheAttrDB);
01429     cur.SetCondition(CBDB_FileCursor::eFirst);
01430     cur.InitMultiFetch(10 * 1024 * 1024);
01431     unsigned max_blob_id = 0;
01432     while (cur.Fetch() == eBDB_Ok) {
01433         unsigned blob_id = m_CacheAttrDB->blob_id;
01434         max_blob_id = max(max_blob_id, blob_id);
01435 
              // Tell the split store where this BLOB lives (volume, split)
01436         unsigned coord[2];
01437         coord[0] = m_CacheAttrDB->volume_id;
01438         coord[1] = m_CacheAttrDB->split_id;
01439 
01440         m_BLOB_SplitStore->AssignCoordinates(blob_id, coord);
01441 
01442         // check BLOB id index integrity
01443         {{
01444             m_CacheIdIDX->blob_id = blob_id;
01445             if (m_CacheIdIDX->Fetch() == eBDB_Ok) {
01446             } else {
01447                 // index not found... corruption?
                      // Re-create the index record from the attribute record
01448                 string key = m_CacheAttrDB->key.GetString();
01449                 int version = m_CacheAttrDB->version;
01450                 string subkey = m_CacheAttrDB->subkey.GetString();
01451 
01452                 m_CacheIdIDX->blob_id = blob_id;
01453                 m_CacheIdIDX->key = key;
01454                 m_CacheIdIDX->version = version;
01455                 m_CacheIdIDX->subkey = subkey;
01456 
01457                 m_CacheIdIDX->Insert();
01458             }
01459         }}
01460 
01461     }
          // The id counter is set to the largest BLOB id seen in the scan
01462     m_BlobIdCounter.Set(max_blob_id);
01463     }}
01464 
01465     if (m_TimeStampFlag & fPurgeOnStartup) {
01466         unsigned batch_sleep = m_BatchSleep;
01467         unsigned batch_size = m_PurgeBatchSize;
01468         unsigned purge_thread_delay = m_PurgeThreadDelay;
01469 
01470         // setup parameters which favor fast Purge execution
01471         // (Open purge needs to be fast because all waiting for it)
01472 
01473         if (m_PurgeBatchSize < 2500) {
01474             m_PurgeBatchSize = 2500;
01475         }
01476         m_BatchSleep = 0;
01477         m_PurgeThreadDelay = 0;
01478 
01479         Purge(GetTimeout());
01480 
01481         // Restore parameters
              // NOTE(review): not restored if Purge() throws
01482 
01483         m_BatchSleep = batch_sleep;
01484         m_PurgeBatchSize = batch_size;
01485         m_PurgeThreadDelay = purge_thread_delay;
01486     }
01487 
01488 
01489     if (m_RunPurgeThread) {
01490 # ifdef NCBI_THREADS
01491        LOG_POST_X(7, Info << "Starting cache cleaning thread.");
01492        m_PurgeThread.Reset(
01493            new CCacheCleanerThread(this, m_PurgeThreadDelay, 5));
01494        m_PurgeThread->Run();
01495 
              // The background writer is started only for an environment this
              // object opened itself (Close() stops it under the same condition).
              // NOTE(review): m_CheckPointDelay is not set by the constructor's
              // initializer list -- confirm it is always assigned before Open().
01496         if (!m_JoinedEnv) {
01497             CBDB_Env::TBackgroundFlags flags =
01498                 CBDB_Env::eBackground_MempTrickle  |
01499                 CBDB_Env::eBackground_Checkpoint   |
01500                 CBDB_Env::eBackground_DeadLockDetect;
01501             //_TRACE("Running background writer with delay " << m_PurgeThreadDelay);
01502             m_Env->RunBackgroundWriter(flags,
01503                                        m_CheckPointDelay,
01504                                        m_MempTrickle);
01505         }
01506 # else
01507        LOG_POST_X(8, Warning <<
01508                   "Cannot run background thread in non-MT configuration.");
01509        m_Env->TransactionCheckpoint();
01510 # endif
01511     }
01512 
01513     m_ReadOnly = false;
01514 
01515     LOG_POST_X(9, Info <<
01516                "LC: '" << cache_name <<
01517                "' Cache mount at: " << cache_path);
01518 
01519 }
01520 
01521 void CBDB_Cache::RunPurgeThread(unsigned purge_delay)
01522 {
01523     m_RunPurgeThread = true;
01524     m_PurgeThreadDelay = purge_delay;
01525 }
01526 
01527 void CBDB_Cache::StopPurgeThread()
01528 {
01529 # ifdef NCBI_THREADS
01530     if (!m_PurgeThread.Empty()) {
01531         LOG_POST_X(10, Info << "Stopping cache cleaning thread...");
01532         StopPurge();
01533         m_PurgeThread->RequestStop();
01534         m_PurgeThread->Join();
01535         LOG_POST_X(11, Info << "Stopped.");
01536     }
01537 # endif
01538 }
01539 
01540 void CBDB_Cache::OpenReadOnly(const string&  cache_path,
01541                               const string&  cache_name,
01542                               unsigned int   cache_ram_size)
01543 {
01544     {{
01545 
01546     Close();
01547 
01548     CFastMutexGuard guard(m_DB_Lock);
01549 
01550     m_Path = CDirEntry::AddTrailingPathSeparator(cache_path);
01551     m_Name = cache_name;
01552 
01553     m_BLOB_SplitStore = new TSplitStore(new CBDB_BlobDeMux_RoundRobin(0));
01554     m_CacheAttrDB = new SCache_AttrDB();
01555 /*
01556     if (cache_ram_size)
01557         m_CacheBLOB_DB->SetCacheSize(cache_ram_size);
01558 */
01559     string cache_blob_db_name =
01560        string("lcs_") + string(cache_name) + string("_blob");
01561     string attr_db_name =
01562        m_Path + string("lcs_") + string(cache_name) + string("_attr5")
01563               + string(".db");
01564 
01565 
01566     m_BLOB_SplitStore->Open(cache_blob_db_name,  CBDB_RawFile::eReadOnly);
01567     m_CacheAttrDB->Open(attr_db_name.c_str(),    CBDB_RawFile::eReadOnly);
01568 
01569     }}
01570 
01571     m_ReadOnly = true;
01572 
01573     LOG_POST_X(12, Info <<
01574                "LC: '" << cache_name <<
01575                "' Cache mount read-only at: " << cache_path);
01576 }
01577 
01578 
01579 void CBDB_Cache::Close()
01580 {
01581     if (m_Closed) return;
01582     // We mark cache as closed early to prevent double close attempt if
01583     // exception fires before completion. We MUST not StopPurgeThread twice.
01584     m_Closed = true;
01585     StopPurgeThread();
01586 
01587     if (m_Env && !m_JoinedEnv) {
01588         m_Env->StopBackgroundWriterThread();
01589     }
01590 
01591     if (m_BLOB_SplitStore) {
01592         m_BLOB_SplitStore->Save();
01593     }
01594 
01595     delete m_PidGuard;        m_PidGuard = 0;
01596 
01597     delete m_CacheAttrDB_RO1; m_CacheAttrDB_RO1 = 0;
01598     delete m_CacheAttrDB_RO2; m_CacheAttrDB_RO2 = 0;
01599     delete m_CacheIdIDX_RO;   m_CacheIdIDX_RO = 0;
01600 
01601     delete m_BLOB_SplitStore; m_BLOB_SplitStore = 0;
01602     delete m_CacheAttrDB;     m_CacheAttrDB = 0;
01603     delete m_CacheIdIDX;      m_CacheIdIDX = 0;
01604 
01605     delete m_TimeLine;        m_TimeLine = 0;
01606 
01607     if (m_Env == 0) {
01608         return;
01609     }
01610     try {
01611         m_Env->ForceTransactionCheckpoint();
01612         CleanLog();
01613 
01614         if (m_Env->CheckRemove()) {
01615             LOG_POST_X(13, Info <<
01616                         "LC: '" << m_Name << "' Unmounted. BDB ENV deleted.");
01617         } else {
01618             LOG_POST_X(14, Info << "LC: '" << m_Name
01619                                 << "' environment still in use.");
01620         }
01621     }
01622     catch (exception& ex) {
01623         LOG_POST_X(15, Warning << "LC: '" << m_Name
01624                          << "' Exception in Close() " << ex.what()
01625                          << " (ignored.)");
01626     }
01627 
01628     delete m_Env; m_Env = 0;
01629 
01630     CFile fl_clean(CDirEntry::MakePath(m_Path, kBDBCacheStartedFileName));
01631     fl_clean.Remove();
01632 }
01633 
01634 void CBDB_Cache::CleanLog()
01635 {
01636     if (m_Env) {
01637         if (m_Env->IsTransactional()) {
01638             m_Env->CleanLog();
01639         }
01640     }
01641 }
01642 
01643 void CBDB_Cache::x_Close()
01644 {
01645     delete m_PidGuard;    m_PidGuard = 0;
01646     delete m_CacheAttrDB; m_CacheAttrDB = 0;
01647     delete m_Env;         m_Env = 0;
01648 }
01649 
01650 ICache::TFlags CBDB_Cache::GetFlags()
01651 {
01652     return (TFlags) 0;
01653 }
01654 
01655 void CBDB_Cache::SetFlags(ICache::TFlags flags)
01656 {
01657 }
01658 
01659 void CBDB_Cache::SetTimeStampPolicy(TTimeStampFlags policy,
01660                                     unsigned int    timeout,
01661                                     unsigned int    max_timeout)
01662 {
01663     CFastMutexGuard guard(m_DB_Lock);
01664 
01665     m_TimeStampFlag = policy;
01666     m_Timeout = timeout;
01667 
01668     if (max_timeout) {
01669         m_MaxTimeout = max_timeout > timeout ? max_timeout : timeout;
01670     } else {
01671         if (m_MaxTTL_prolong) {
01672             m_MaxTimeout = timeout * m_MaxTTL_prolong;
01673         } else {
01674             m_MaxTimeout = 0;
01675         }
01676     }
01677 }
01678 
01679 CBDB_Cache::TTimeStampFlags CBDB_Cache::GetTimeStampPolicy() const
01680 {
01681     return m_TimeStampFlag;
01682 }
01683 
01684 int CBDB_Cache::GetTimeout() const
01685 {
01686     return m_Timeout;
01687 }
01688 
01689 void CBDB_Cache::SetVersionRetention(EKeepVersions policy)
01690 {
01691     CFastMutexGuard guard(m_DB_Lock);
01692     m_VersionFlag = policy;
01693 }
01694 
01695 CBDB_Cache::EKeepVersions CBDB_Cache::GetVersionRetention() const
01696 {
01697     return m_VersionFlag;
01698 }
01699 
01700 void CBDB_Cache::GetStatistics(SBDB_CacheStatistics* cache_stat) const
01701 {
01702     _ASSERT(cache_stat);
01703 
01704     CFastMutexGuard guard(m_DB_Lock);
01705     *cache_stat = m_Statistics;
01706 }
01707 
01708 void CBDB_Cache::InitStatistics()
01709 {
01710     CFastMutexGuard guard(m_DB_Lock);
01711     m_Statistics.Init();
01712 }
01713 
01714 void CBDB_Cache::KillBlob(const string&  key,
01715                           int            version,
01716                           const string&  subkey,
01717                           int            overflow,
01718                           unsigned       blob_id)
01719 {
01720 
01721     CBDB_Transaction trans(*m_Env,
01722                             CBDB_Transaction::eEnvDefault,
01723                             CBDB_Transaction::eNoAssociation);
01724     {{
01725         CFastMutexGuard guard(m_DB_Lock);
01726         m_BLOB_SplitStore->SetTransaction(&trans);
01727         x_DropBlob(key, version, subkey, overflow, blob_id, trans);
01728     }}
01729     trans.Commit();
01730 
01731 }
01732 
01733 void CBDB_Cache::DropBlob(const string&  key,
01734                           int            version,
01735                           const string&  subkey,
01736                           bool           for_update,
01737                           unsigned*      blob_id,
01738                           unsigned*      coord)
01739 {
01740     _ASSERT(blob_id);
01741     _ASSERT(coord);
01742 
01743     int overflow = 0;
01744 
01745     {{
01746 
01747     CBDB_Transaction trans(*m_Env,
01748                             CBDB_Transaction::eEnvDefault,
01749                             CBDB_Transaction::eNoAssociation);
01750 
01751     {{
01752 
01753         CFastMutexGuard guard(m_DB_Lock);
01754         m_CacheAttrDB->SetTransaction(&trans);
01755 
01756         CBDB_FileCursor cur(*m_CacheAttrDB, trans,
01757                             CBDB_FileCursor::eReadModifyUpdate);
01758         cur.SetCondition(CBDB_FileCursor::eEQ);
01759 
01760         cur.From << key << version << subkey;
01761 
01762         if (cur.Fetch() == eBDB_Ok) {
01763             overflow = m_CacheAttrDB->overflow;
01764             *blob_id = m_CacheAttrDB->blob_id;
01765 
01766             coord[0] = m_CacheAttrDB->volume_id;
01767             coord[1] = m_CacheAttrDB->split_id;
01768 
01769             if (!for_update) {   // permanent BLOB removal
01770                 string owner_name;
01771                 m_CacheAttrDB->owner_name.ToString(owner_name);
01772 
01773                 // FIXME:
01774                 if (IsSaveStatistics()) {
01775                     m_Statistics.AddExplDelete(owner_name);
01776                     if (0 == m_CacheAttrDB->read_count) {
01777                         m_Statistics.AddNeverRead(owner_name);
01778                     }
01779                     x_UpdateOwnerStatOnDelete(owner_name,
01780                                               true);//explicit del
01781                 }
01782 
01783                 cur.Delete();
01784             }
01785 
01786         } else {
01787             *blob_id = 0;
01788             return;
01789         }
01790     }}
01791 
01792     // delete the split store BLOB
01793     if (!for_update) {   // permanent BLOB removal
01794         unsigned split_coord[2];
01795         EBDB_ErrCode ret =
01796             m_BLOB_SplitStore->GetCoordinates(*blob_id, split_coord);
01797 
01798         m_BLOB_SplitStore->SetTransaction(&trans);
01799         if (ret == eBDB_Ok) {
01800             if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) {
01801                 m_BLOB_SplitStore->Delete(*blob_id, split_coord,
01802                                           CBDB_RawFile::eThrowOnError);
01803             }
01804         }
01805         m_BLOB_SplitStore->Delete(*blob_id, coord,
01806                                   CBDB_RawFile::eThrowOnError);
01807 
01808     }
01809 
01810     trans.Commit();
01811     }}
01812 
01813     if (overflow) {
01814         x_DropOverflow(key.c_str(), version, subkey.c_str());
01815     }
01816 
01817 }
01818 
01819 void CBDB_Cache::RegisterOverflow(const string&  key,
01820                                   int            version,
01821                                   const string&  subkey,
01822                                   unsigned       time_to_live,
01823                                   const string&  owner)
01824 {
01825     time_t curr = time(0);
01826 
01827     unsigned blob_id = 0;
01828     unsigned coord[2]={0,};
01829 
01830     {{
01831         CBDB_Transaction trans(*m_Env,
01832                             CBDB_Transaction::eEnvDefault,
01833                             CBDB_Transaction::eNoAssociation);
01834 
01835         {{
01836         CFastMutexGuard guard(m_DB_Lock);
01837         m_CacheAttrDB->SetTransaction(&trans);
01838         m_CacheIdIDX->SetTransaction(&trans);
01839 
01840         EBDB_ErrCode ret;
01841         {{
01842             CBDB_FileCursor cur(*m_CacheAttrDB,
01843                                 trans,
01844                                 CBDB_FileCursor::eReadModifyUpdate);
01845             cur.SetCondition(CBDB_FileCursor::eEQ);
01846             cur.From << key << version << subkey;
01847 
01848             ret = cur.Fetch();
01849             if (ret == eBDB_Ok) {
01850                 m_CacheAttrDB->time_stamp = (unsigned)curr;
01851                 m_CacheAttrDB->overflow = 1;
01852                 m_CacheAttrDB->ttl = time_to_live;
01853                 m_CacheAttrDB->max_time = (Uint4)ComputeMaxTime(curr);
01854                 unsigned upd_count = m_CacheAttrDB->upd_count;
01855                 ++upd_count;
01856                 m_CacheAttrDB->upd_count = upd_count;
01857                 m_CacheAttrDB->owner_name = owner;
01858 
01859                 blob_id = m_CacheAttrDB->blob_id;
01860                 coord[0] = m_CacheAttrDB->volume_id;
01861                 coord[1] = m_CacheAttrDB->split_id;
01862 
01863                 cur.Update();
01864             }
01865         }} // cursor
01866 
01867         if (ret != eBDB_Ok) { // record not found: re-registration
01868             unsigned blob_id = GetNextBlobId(false /*no locking*/);
01869             m_CacheAttrDB->key = key;
01870             m_CacheAttrDB->version = version;
01871             m_CacheAttrDB->subkey = subkey;
01872             m_CacheAttrDB->time_stamp = (unsigned)curr;
01873             m_CacheAttrDB->overflow = 1;
01874             m_CacheAttrDB->ttl = time_to_live;
01875             m_CacheAttrDB->max_time = (Uint4)ComputeMaxTime(curr);
01876             m_CacheAttrDB->upd_count  = 0;
01877             m_CacheAttrDB->read_count = 0;
01878             m_CacheAttrDB->owner_name = owner;
01879             m_CacheAttrDB->blob_id = blob_id;
01880             m_CacheAttrDB->volume_id = 0;
01881             m_CacheAttrDB->split_id = 0;
01882 
01883             ret = m_CacheAttrDB->Insert();
01884             if (ret != eBDB_Ok) {
01885                 LOG_POST_X(16, Error << "Failed to insert BLOB attributes "
01886                                << key << " " << version << " " << subkey);
01887             } else {
01888                 m_CacheIdIDX->blob_id = blob_id;
01889                 m_CacheIdIDX->key = key;
01890                 m_CacheIdIDX->version = version;
01891                 m_CacheIdIDX->subkey = subkey;
01892 
01893                 ret = m_CacheIdIDX->Insert();
01894                 if (ret != eBDB_Ok) {
01895                     LOG_POST_X(17, Error << "Failed to insert BLOB id index "
01896                                    << key << " " << version << " " << subkey);
01897                 }
01898             }
01899 
01900 
01901         }
01902 
01903         }} // m_DB_Lock
01904 
01905         trans.Commit();
01906     }} // trans
01907 
01908     if (blob_id) { // clean up the split store
01909         CBDB_Transaction trans(*m_Env,
01910                             CBDB_Transaction::eEnvDefault,
01911                             CBDB_Transaction::eNoAssociation);
01912 
01913         m_BLOB_SplitStore->SetTransaction(&trans);
01914 
01915         try {
01916             unsigned split_coord[2];
01917             EBDB_ErrCode ret =
01918                 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord);
01919             if (ret == eBDB_Ok) {
01920                 if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) {
01921                     m_BLOB_SplitStore->Delete(blob_id, split_coord,
01922                                                 CBDB_RawFile::eThrowOnError);
01923                 }
01924             }
01925             m_BLOB_SplitStore->Delete(blob_id, coord,
01926                                     CBDB_RawFile::eThrowOnError);
01927         }
01928         catch (CBDB_Exception& ex) {
01929             LOG_POST_X(18, Error << "Cannot delete BLOB from split store "
01930                            << ex.what());
01931             throw;
01932         }
01933         trans.Commit();
01934     }
01935 }
01936 
01937 void CBDB_Cache::x_Store(unsigned       blob_id,
01938                          const string&  key,
01939                          int            version,
01940                          const string&  subkey,
01941                          const void*    data,
01942                          size_t         size,
01943                          unsigned int   time_to_live,
01944                          const string&  owner,
01945                          bool           do_blob_lock)
01946 {
01947     if (IsReadOnly()) {
01948         return;
01949     }
01950     //_TRACE("CBDB_Cache::x_Store point 1 size=" << size);
01951     unsigned coord[2] = {0,};
01952     unsigned overflow = 0, old_overflow = 0;
01953     EBDB_ErrCode ret;
01954 
01955     time_t curr = time(0);
01956     int tz_delta = m_LocalTimer.GetLocalTimezone();
01957 
01958     // ----------------------------------------------------
01959     // check BLOB size quota
01960     // ----------------------------------------------------
01961     if (GetMaxBlobSize() && size > GetMaxBlobSize()) {
01962         // FIXME:
01963         if (IsSaveStatistics()) {
01964             CFastMutexGuard guard(m_DB_Lock);
01965             m_Statistics.AddBlobQuotaError(owner);
01966         }
01967 
01968         string msg("BLOB larger than allowed. size=");
01969         msg.append(NStr::SizetToString(size));
01970         msg.append(" quota=");
01971         msg.append(NStr::UIntToString(GetMaxBlobSize()));
01972         BDB_THROW(eQuotaLimit, msg);
01973     }
01974 
01975 
01976     // ----------------------------------------------------
01977     // Optional pre-cleaning
01978     // ----------------------------------------------------
01979 
01980     if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) {
01981         //_TRACE("CBDB_Cache::x_Store point 2");
01982         Purge(key, subkey, 0, m_VersionFlag);
01983     }
01984 
01985     TBlobLock blob_lock(m_LockVector, m_LockTimeout);
01986 
01987     // ----------------------------------------------------
01988     // BLOB check-in, read attributes, lock the id
01989     // ----------------------------------------------------
01990 
01991     EBlobCheckinRes check_res =
01992         BlobCheckIn(blob_id,
01993                     key, version, subkey,
01994                     eBlobCheckIn_Create,
01995                     blob_lock, do_blob_lock,
01996                     &coord[0], &coord[1],
01997                     &old_overflow);
01998     //_TRACE("CBDB_Cache::x_Store point 3");
01999     _ASSERT(check_res != EBlobCheckIn_NotFound);
02000     _ASSERT(blob_lock.GetId());
02001 
02002     blob_id = blob_lock.GetId();
02003 
02004     try {
02005 
02006         if (size > GetOverflowLimit()) {
02007             //_TRACE("CBDB_Cache::x_Store point 4");
02008             // ----------------------------------------------------
02009             // write overflow file
02010             // ----------------------------------------------------
02011             string path;
02012             s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey);
02013             {{
02014                 CNcbiOfstream oveflow_file(path.c_str(),
02015                                         IOS_BASE::out |
02016                                         IOS_BASE::trunc |
02017                                         IOS_BASE::binary);
02018                 if (!oveflow_file.is_open() || oveflow_file.bad()) {
02019                     string err = "LC: Cannot create overflow file ";
02020                     err += path;
02021                     BDB_THROW(eCannotOpenOverflowFile, err);
02022                 }
02023                 WriteOverflow(oveflow_file, path, (const char*)data, size);
02024             }}
02025             overflow = 1;
02026         }
02027 
02028         // ----------------------------------------------------
02029         // Check if BLOB changed overflow location
02030         // ----------------------------------------------------
02031 
02032         if (overflow != old_overflow && check_res == eBlobCheckIn_Found) {
02033             if (overflow) { // BLOB goes from split to file
02034                 //_TRACE("CBDB_Cache::x_Store point 5");
02035                 // ----------------------------------------------------
02036                 // Delete old split store content
02037                 // ----------------------------------------------------
02038                 CBDB_Transaction trans(*m_Env,
02039                                         CBDB_Transaction::eEnvDefault,
02040                                         CBDB_Transaction::eNoAssociation);
02041                 m_BLOB_SplitStore->SetTransaction(&trans);
02042 
02043                 try {
02044                     unsigned split_coord[2];
02045                     EBDB_ErrCode ret =
02046                         m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord);
02047                     if (ret == eBDB_Ok) {
02048                         if (coord[0] != split_coord[0] || coord[1] != split_coord[1]) {
02049                             m_BLOB_SplitStore->Delete(blob_id, split_coord,
02050                                                       CBDB_RawFile::eThrowOnError);
02051                         }
02052                     }
02053                     m_BLOB_SplitStore->Delete(blob_id, coord,
02054                                               CBDB_RawFile::eThrowOnError);
02055                 }
02056                 catch (CBDB_Exception& ex) {
02057                     LOG_POST_X(19, Error << "Cannot delete BLOB from split store "
02058                                    << ex.what());
02059                     throw;
02060                 }
02061                 trans.Commit();
02062                 coord[0] = coord[1] = 0;
02063             } else {
02064                 //_TRACE("CBDB_Cache::x_Store point 6");
02065                 // ----------------------------------------------------
02066                 // Delete old overflow file
02067                 // ----------------------------------------------------
02068 
02069                 x_DropOverflow(key, version, subkey);
02070             }
02071         }
02072 
02073         // ----------------------------------------------------
02074         // Update the split store content
02075         // ----------------------------------------------------
02076 
02077         {{
02078             //_TRACE("CBDB_Cache::x_Store point 7");
02079             CBDB_Transaction trans(*m_Env,
02080                                     CBDB_Transaction::eEnvDefault,
02081                                     CBDB_Transaction::eNoAssociation);
02082             m_BLOB_SplitStore->SetTransaction(&trans);
02083             //_TRACE("CBDB_Cache::x_Store point 32");
02084 
02085             m_BLOB_SplitStore->UpdateInsert(blob_id,
02086                                             coord,
02087                                             data, size,
02088                                             coord);
02089             //_TRACE("CBDB_Cache::x_Store point 33");
02090             trans.Commit();
02091         }}
02092 
02093 
02094         // ----------------------------------------------------
02095         // Update BLOB attributes
02096         // ----------------------------------------------------
02097         if (m_MaxTimeout) {
02098             if (time_to_live > m_MaxTimeout) {
02099                 time_to_live = m_MaxTimeout;
02100             }
02101         } else { // m_MaxTimeout == 0
02102             if (m_MaxTTL_prolong != 0 && m_Timeout != 0) {
02103                 time_to_live = min(m_Timeout * m_MaxTTL_prolong, time_to_live);
02104             }
02105         }
02106 
02107         time_t ttl_max = ComputeMaxTime(curr);
02108 
02109         {{
02110             CBDB_Transaction trans(*m_Env,
02111                                 CBDB_Transaction::eEnvDefault,
02112                                 CBDB_Transaction::eNoAssociation);
02113             {{
02114                 CFastMutexGuard guard(m_DB_Lock);
02115                 m_CacheAttrDB->SetTransaction(&trans);
02116 
02117                 CBDB_FileCursor cur(*m_CacheAttrDB,
02118                                     trans,
02119                                     CBDB_FileCursor::eReadModifyUpdate);
02120                 cur.SetCondition(CBDB_FileCursor::eEQ);
02121                 cur.From << key << version << subkey;
02122                 ret = cur.Fetch();
02123                 if (ret == eBDB_Ok) {
02124                     old_overflow = m_CacheAttrDB->overflow;
02125 
02126                     m_CacheAttrDB->time_stamp = (unsigned)curr;
02127                     m_CacheAttrDB->overflow = overflow;
02128                     m_CacheAttrDB->ttl = time_to_live;
02129                     m_CacheAttrDB->max_time = (Uint4)ttl_max;
02130                     unsigned upd_count = m_CacheAttrDB->upd_count;
02131                     m_CacheAttrDB->upd_count = ++upd_count;
02132                     m_CacheAttrDB->owner_name = owner;
02133 
02134                     // here is a paranoid check in case blob id ever changes
02135                     // this should never happen!
02136                     //
02137                     unsigned old_blob_id = m_CacheAttrDB->blob_id;
02138                     if (blob_id != old_blob_id) {
02139                         BDB_THROW(eRaceCondition,
02140                                   "BLOB id mutation detected!");
02141                     }
02142                     m_CacheAttrDB->volume_id = coord[0];
02143                     m_CacheAttrDB->split_id = coord[1];
02144 
02145                     ret = cur.Update();
02146                 }
02147             }} // m_DB_Lock
02148 
02149 
02150             if (ret == eBDB_Ok) {
02151                 trans.Commit();
02152             } else {
02153                 // BLOB got deleted!
02154                 string msg = "BLOB insert-delete race detected!";
02155                 if (m_Monitor && m_Monitor->IsActive()) {
02156                     msg += "\n";
02157                     m_Monitor->Send(msg);
02158                 }
02159                 BDB_THROW(eRaceCondition, msg);
02160             }
02161 
02162         }} // trans
02163 
02164         {{
02165             time_t exp_time =
02166                 x_ComputeExpTime((int)curr, time_to_live, GetTimeout());
02167 
02168             CFastMutexGuard guard(m_TimeLine_Lock);
02169             m_TimeLine->AddObject(exp_time, blob_id);
02170         }} // m_TimeLine_Lock
02171 
02172 
02173         // FIXME: locks, correct statistics, etc
02174         unsigned blob_updated = 0;
02175         unsigned blob_stored  = 0;
02176 
02177         if (check_res == eBlobCheckIn_Found) {
02178             blob_updated = 1;
02179         } else {
02180             blob_stored = 1;
02181         }
02182 
02183         if (IsSaveStatistics()) {
02184             CFastMutexGuard guard(m_DB_Lock);
02185             m_Statistics.AddStore(owner, curr - tz_delta,
02186                                   blob_stored, blob_updated, size, overflow);
02187         }
02188 
02189     }
02190     catch (exception)
02191     {
02192         // if things go wrong we do not want the database in an uncertain state, so
02193         // BLOB is getting killed here
02194         //
02195 
02196         KillBlob(key, version, subkey,
02197                  1, // try to delete overflow file too
02198                  0);
02199         throw;
02200     }
02201 }
02202 
02203 
02204 void CBDB_Cache::Store(unsigned       blob_id_ext,
02205                        const string&  key,
02206                        int            version,
02207                        const string&  subkey,
02208                        const void*    data,
02209                        size_t         size,
02210                        unsigned int   time_to_live,
02211                        const string&  owner)
02212 {
02213     x_Store(blob_id_ext, key, version, subkey, data, size, time_to_live, owner,
02214             true // do blob is locking
02215             );
02216 }
02217 
02218 
02219 void CBDB_Cache::Store(const string&  key,
02220                        int            version,
02221                        const string&  subkey,
02222                        const void*    data,
02223                        size_t         size,
02224                        unsigned int   time_to_live,
02225                        const string&  owner)
02226 {
02227     x_Store(0, key, version, subkey, data, size, time_to_live, owner,
02228             true // do blob is locking
02229             );
02230 }
02231 
02232 
02233 bool CBDB_Cache::GetSizeEx(const string&  key,
02234                            int            version,
02235                            const string&  subkey,
02236                            size_t*        size)
02237 {
02238     size_t blob_size = 0;
02239     int overflow;
02240     unsigned int ttl, blob_id, volume_id, split_id;
02241 
02242     blob_id = GetBlobId(key, version, subkey);
02243     if (!blob_id) return false;
02244     TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout);
02245 
02246     {{
02247         CFastMutexGuard guard(m_DB_Lock);
02248         m_CacheAttrDB->SetTransaction(0);
02249 
02250         bool rec_exists =
02251             x_RetrieveBlobAttributes(key, version, subkey,
02252                                     overflow, ttl,
02253                                     blob_id, volume_id, split_id);
02254         if (!rec_exists) return false;
02255 
02256         // check expiration here
02257         // joukovv 2006-06-24: expiration of WHAT? Apparently, not of a given
02258         // BLOB - no BLOB-specific parameters passed. Should we bother?
02259         if (m_TimeStampFlag & fCheckExpirationAlways) {
02260             if (x_CheckTimeStampExpired(*m_CacheAttrDB, time(0)))
02261                 return false;
02262         }
02263         overflow = m_CacheAttrDB->overflow;
02264     }}
02265 
02266     if (overflow) {
02267         string path;
02268         s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey);
02269         CFile entry(path);
02270 
02271         if (!entry.Exists())
02272             return false;
02273         blob_size = (size_t)entry.GetLength();
02274     } else {
02275         // Regular inline BLOB
02276         if (blob_id == 0) return false;
02277 
02278         unsigned coords[2];
02279         coords[0] = volume_id;
02280         coords[1] = split_id;
02281 
02282         m_BLOB_SplitStore->SetTransaction(0);
02283 
02284         EBDB_ErrCode ret =
02285             m_BLOB_SplitStore->BlobSize(blob_id, coords, &blob_size);
02286         if (ret != eBDB_Ok) return false;
02287     }
02288     if (size) *size = blob_size;
02289     return true;
02290 }
02291 
02292 
02293 size_t CBDB_Cache::GetSize(const string&  key,
02294                            int            version,
02295                            const string&  subkey)
02296 {
02297     size_t size;
02298     return GetSizeEx(key, version, subkey, &size) ? size : 0;
02299 }
02300 
02301 
02302 void CBDB_Cache::GetBlobOwner(const string&  key,
02303                               int            version,
02304                               const string&  subkey,
02305                               string*        owner)
02306 {
02307     _ASSERT(owner);
02308 
02309     CFastMutexGuard guard(m_DB_Lock);
02310     m_CacheAttrDB->SetTransaction(0);
02311 
02312     if (!x_FetchBlobAttributes(key, version, subkey)) {
02313         owner->erase();
02314         return;
02315     }
02316 
02317     m_CacheAttrDB->owner_name.ToString(*owner);
02318 }
02319 
02320 
02321 bool CBDB_Cache::HasBlobs(const string&  key,
02322                           const string&  subkey)
02323 {
02324     time_t curr = time(0);
02325 
02326     CFastMutexGuard guard(m_DB_Lock);
02327     m_CacheAttrDB->SetTransaction(0);
02328 
02329     CBDB_FileCursor cur(*m_CacheAttrDB);
02330 
02331     // Simple case - key only. Just look it up
02332     if (subkey.empty()) {
02333         cur.SetCondition(CBDB_FileCursor::eEQ);
02334         cur.From << key;
02335         return cur.FetchFirst() == eBDB_Ok &&
02336                !x_CheckTimeStampExpired(*m_CacheAttrDB, curr);
02337     }
02338 
02339     // More complicated case with subkey
02340     int version = 0;
02341     for (;;) {
02342         // Get version for key
02343         cur.SetCondition(CBDB_FileCursor::eGE);
02344         cur.From << key;
02345         cur.From << version;
02346         if (cur.FetchFirst() != eBDB_Ok)
02347             return false;
02348         const char* found_key = m_CacheAttrDB->key;
02349         if ( key != found_key )
02350             return false;
02351         // We have version, now fetch subkey
02352         version = m_CacheAttrDB->version;
02353         cur.SetCondition(CBDB_FileCursor::eEQ);
02354         cur.From << key;
02355         cur.From << version;
02356         cur.From << subkey;
02357         if (cur.FetchFirst() == eBDB_Ok &&
02358             !x_CheckTimeStampExpired(*m_CacheAttrDB, curr))
02359             return true;
02360         ++version;
02361     }
02362 
02363     return false;
02364 
02365     /* I don't know why we need this extra check if we already have
02366        a record in CacheAttrDB. Moreover, for overflown blobs there is
02367        NO record in SplitStore DB. So I am commenting this out.
02368        joukovv 2007-11-27
02369     TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout);
02370 
02371     _ASSERT(blob_id);
02372     unsigned split_coord[2];
02373     EBDB_ErrCode ret =
02374         m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord);
02375     if (ret != eBDB_Ok) {
02376         return false;
02377     } */
02378 }
02379 
02380 
02381 bool CBDB_Cache::Read(const string& key,
02382                       int           version,
02383                       const string& subkey,
02384                       void*         buf,
02385                       size_t        buf_size)
02386 {
02387     EBDB_ErrCode ret;
02388 
02389     time_t curr = time(0);
02390     int tz_delta = m_LocalTimer.GetLocalTimezone();
02391 
02392     int overflow = 0;
02393     unsigned volume_id = 0, split_id = 0;
02394 
02395     unsigned blob_id = GetBlobId(key, version, subkey);
02396     if (!blob_id) return false;
02397     TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout);
02398 
02399 
02400     CBDB_Transaction trans(*m_Env,
02401                         CBDB_Transaction::eTransASync, // async!
02402                         CBDB_Transaction::eNoAssociation);
02403 
02404 
02405     {{
02406         CFastMutexGuard guard(m_DB_Lock);
02407         m_CacheAttrDB->SetTransaction(&trans);
02408         CBDB_FileCursor cur(*m_CacheAttrDB, trans,
02409                             CBDB_FileCursor::eReadModifyUpdate);
02410         cur.SetCondition(CBDB_FileCursor::eEQ);
02411         cur.From << key << version << subkey;
02412         ret = cur.Fetch();
02413         if (ret == eBDB_Ok) {
02414             if (m_TimeStampFlag & fCheckExpirationAlways) {
02415                 if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) {
02416                     return false;
02417                 }
02418             }
02419             unsigned read_count = m_CacheAttrDB->read_count;
02420             m_CacheAttrDB->read_count = read_count + 1;
02421 
02422             unsigned max_time = m_CacheAttrDB->max_time;
02423             if (max_time == 0 || max_time >= (unsigned)curr) {
02424                 if ( m_TimeStampFlag & fTimeStampOnRead ) {
02425                     m_CacheAttrDB->time_stamp = (unsigned)curr;
02426                 }
02427             }
02428 
02429             blob_id = m_CacheAttrDB->blob_id;
02430             overflow = m_CacheAttrDB->overflow;
02431             volume_id = m_CacheAttrDB->volume_id;
02432             split_id = m_CacheAttrDB->split_id;
02433 
02434             ret = cur.Update();
02435 
02436         } else {
02437             return false;
02438         }
02439     }} // m_DB_Lock
02440 
02441     if (ret != eBDB_Ok) {
02442         return false;
02443     }
02444     _ASSERT(blob_id);
02445     trans.Commit();
02446 
02447     //  FIXME: locks, etc
02448     string owner_name;
02449     m_CacheAttrDB->owner_name.ToString(owner_name);
02450     if (IsSaveStatistics()) {
02451         CFastMutexGuard guard(m_DB_Lock);
02452         m_Statistics.AddRead(owner_name, curr - tz_delta);
02453     }
02454 
02455     // read the data
02456 
02457 
02458     if (overflow) {
02459         string path;
02460         s_MakeOverflowFileName(path, m_Path, GetName(),key, version, subkey);
02461 
02462         auto_ptr<CNcbiIfstream>
02463             overflow_file(new CNcbiIfstream(path.c_str(),
02464                                             IOS_BASE::in | IOS_BASE::binary));
02465         if (!overflow_file->is_open()) {
02466             // TODO: Kill the registration record
02467             return false;
02468         }
02469         overflow_file->read((char*)buf, buf_size);
02470         if (!*overflow_file) {
02471             return false;
02472         }
02473     }
02474     else {
02475         if (blob_id == 0) {
02476             return false;
02477         }
02478         m_BLOB_SplitStore->SetTransaction(0);
02479 
02480         unsigned coords[2];
02481         ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
02482         if (ret == eBDB_Ok) {
02483             if (coords[0] != volume_id ||
02484                 coords[1] != split_id) {
02485                 // TODO: restore de-mux mapping
02486             }
02487         } else {
02488             // TODO: restore de-mux mapping
02489         }
02490 
02491         coords[0] = volume_id;
02492         coords[1] = split_id;
02493 
02494         size_t blob_size;
02495         ret = m_BLOB_SplitStore->Fetch(blob_id, coords,
02496                                        &buf, buf_size,
02497                                        CBDB_RawFile::eReallocForbidden,
02498                                        &blob_size);
02499 
02500         if (ret != eBDB_Ok) {
02501             return false;
02502         }
02503     }
02504     return true;
02505 
02506 
02507 
02508 /*
02509 
02510     {{
02511         CFastMutexGuard guard(m_DB_Lock);
02512         m_CacheAttrDB->SetTransaction(0);
02513 
02514         int overflow;
02515         unsigned int ttl, blob_id, volume_id, split_id;
02516         bool rec_exists =
02517             x_RetrieveBlobAttributes(key, version, subkey,
02518                                     overflow, ttl,
02519                                     blob_id, volume_id, split_id);
02520         if (!rec_exists) {
02521             return false;
02522         }
02523 
02524         // check expiration
02525         if (m_TimeStampFlag & fCheckExpirationAlways) {
02526             if (x_CheckTimeStampExpired()) {
02527                 return false;
02528             }
02529         }
02530 
02531         m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName);
02532         m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta);
02533 
02534 
02535         if (overflow) {
02536             string path;
02537             s_MakeOverflowFileName(path, m_Path, GetName(),key, version, subkey);
02538 
02539             auto_ptr<CNcbiIfstream>
02540                 overflow_file(new CNcbiIfstream(path.c_str(),
02541                                                 IOS_BASE::in | IOS_BASE::binary));
02542             if (!overflow_file->is_open()) {
02543                 return false;
02544             }
02545             overflow_file->read((char*)buf, buf_size);
02546             if (!*overflow_file) {
02547                 return false;
02548             }
02549         }
02550         else {
02551 
02552             if (blob_id == 0) {
02553                 return false;
02554             }
02555 
02556             m_BLOB_SplitStore->SetTransaction(0);
02557 
02558 
02559             unsigned coords[2];
02560             ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
02561             if (ret == eBDB_Ok) {
02562                 if (coords[0] != volume_id ||
02563                     coords[1] != split_id) {
02564                     // TO DO: restore de-mux mapping
02565                 }
02566             } else {
02567                 // TODO: restore de-mux mapping
02568             }
02569 
02570             coords[0] = volume_id;
02571             coords[1] = split_id;
02572 
02573             size_t blob_size;
02574 
02575             ret =
02576                 m_BLOB_SplitStore->Fetch(blob_id, coords,
02577                                         &buf, buf_size,
02578                                         CBDB_RawFile::eReallocForbidden,
02579                                         &blob_size);
02580 
02581             if (ret != eBDB_Ok) {
02582                 return false;
02583             }
02584         }
02585 
02586 
02587 
02588         if ( m_TimeStampFlag & fTimeStampOnRead ) {
02589             m_CacheAttrDB->SetTransaction(&trans);
02590             x_UpdateReadAccessTime(key, version, subkey, trans);
02591         }
02592 
02593     }} // m_DB_Lock
02594 
02595     trans.Commit();
02596 
02597 
02598     return true;
02599 */
02600 
02601 }
02602 
02603 IReader* CBDB_Cache::x_CreateOverflowReader(const string&  key,
02604                                             int            version,
02605                                             const string&  subkey,
02606                                             size_t&        file_length,
02607                                             TBlobLock&     blob_lock)
02608 {
02609     string path;
02610     s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey);
02611     auto_ptr<CNcbiIfstream>
02612         overflow_file(new CNcbiIfstream(path.c_str(),
02613                                         IOS_BASE::in | IOS_BASE::binary));
02614     if (!overflow_file->is_open()) {
02615         return 0;
02616     }
02617     CFile entry(path);
02618     file_length = (size_t) entry.GetLength();
02619 
02620     return new CBDB_CacheIReader(*this, overflow_file.release(), blob_lock);
02621 }
02622 
02623 
02624 IReader* CBDB_Cache::GetReadStream(const string&  key,
02625                                    int            version,
02626                                    const string&  subkey)
02627 {
02628     EBDB_ErrCode ret;
02629 
02630     time_t curr = time(0);
02631     int tz_delta = m_LocalTimer.GetLocalTimezone();
02632     int overflow = 0;
02633     unsigned volume_id = 0, split_id = 0;
02634 
02635     unsigned blob_id = GetBlobId(key, version, subkey);
02636     if (!blob_id) return 0;
02637     TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout);
02638 
02639 
02640     // TODO: unify read prolog code for all read functions
02641 
02642     CBDB_Transaction trans(*m_Env,
02643                         CBDB_Transaction::eTransASync, // async!
02644                         CBDB_Transaction::eNoAssociation);
02645 
02646     {{
02647         CFastMutexGuard guard(m_DB_Lock);
02648         {{ // TODO: nested scope here is nonsensical
02649             m_CacheAttrDB->SetTransaction(&trans);
02650             CBDB_FileCursor cur(*m_CacheAttrDB, trans,
02651                                 CBDB_FileCursor::eReadModifyUpdate);
02652             cur.SetCondition(CBDB_FileCursor::eEQ);
02653             cur.From << key << version << subkey;
02654             ret = cur.Fetch();
02655             if (ret == eBDB_Ok) {
02656                 if (m_TimeStampFlag & fCheckExpirationAlways) {
02657                     if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) {
02658                         return false;
02659                     }
02660                 }
02661                 unsigned read_count = m_CacheAttrDB->read_count;
02662                 m_CacheAttrDB->read_count = read_count + 1;
02663 
02664                 unsigned max_time = m_CacheAttrDB->max_time;
02665                 if (max_time == 0 || max_time >= (unsigned) curr) {
02666                     if ( m_TimeStampFlag & fTimeStampOnRead ) {
02667                         m_CacheAttrDB->time_stamp = (unsigned)curr;
02668                     }
02669                 }
02670 
02671                 blob_id = m_CacheAttrDB->blob_id;
02672                 overflow = m_CacheAttrDB->overflow;
02673                 volume_id = m_CacheAttrDB->volume_id;
02674                 split_id = m_CacheAttrDB->split_id;
02675 
02676                 ret = cur.Update();
02677 
02678             } else {
02679                 return false;
02680             }
02681         }} // cursor
02682     }} // m_DB_Lock
02683 
02684     if (ret != eBDB_Ok) {
02685         return false;
02686     }
02687     _ASSERT(blob_id);
02688     trans.Commit();
02689 
02690     //  FIXME: locks, statistics, etc
02691     string owner_name;
02692     m_CacheAttrDB->owner_name.ToString(owner_name);
02693     if (IsSaveStatistics()) {
02694         CFastMutexGuard guard(m_DB_Lock);
02695         m_Statistics.AddRead(owner_name, curr - tz_delta);
02696     }
02697 
02698     if (overflow) {
02699         size_t bsize;
02700         auto_ptr<IReader> rd;
02701         rd.reset(
02702             x_CreateOverflowReader(key, version, subkey, bsize, blob_lock));
02703         return rd.release();
02704     }
02705 
02706     // Inline BLOB, reading from BDB storage
02707     if (blob_id == 0) {
02708         return 0;
02709     }
02710 
02711     unsigned coords[2];
02712     ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
02713     if (ret == eBDB_Ok) {
02714         if (coords[0] != volume_id ||
02715             coords[1] != split_id) {
02716             // TODO: restore de-mux mapping
02717         }
02718     } else {
02719         // TODO: restore de-mux mapping
02720     }
02721 
02722     coords[0] = volume_id;
02723     coords[1] = split_id;
02724 
02725     m_BLOB_SplitStore->SetTransaction(0);
02726 
02727     auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(8 * 1024));
02728     ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer);
02729     if (ret != eBDB_Ok) {
02730         return 0;
02731     }
02732     return new CBDB_CacheIReader(*this, buffer.release(), blob_lock);
02733 
02734 
02735 /*
02736     {{
02737         CFastMutexGuard guard(m_DB_Lock);
02738         m_CacheAttrDB->SetTransaction(0);
02739 
02740         bool rec_exists =
02741             x_RetrieveBlobAttributes(key, version, subkey,
02742                                     overflow, ttl,
02743                                     blob_id, volume_id, split_id);
02744         if (!rec_exists) {
02745             return 0;
02746         }
02747         // check expiration
02748         if (m_TimeStampFlag & fCheckExpirationAlways) {
02749             if (x_CheckTimeStampExpired()) {
02750                 return 0;
02751             }
02752         }
02753 
02754         m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName);
02755         m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta);
02756 
02757         // Update the time stamp
02758         if ( m_TimeStampFlag & fTimeStampOnRead ) {
02759             CBDB_Transaction trans(*m_Env,
02760                                     CBDB_Transaction::eEnvDefault,
02761                                     CBDB_Transaction::eNoAssociation);
02762             m_CacheAttrDB->SetTransaction(&trans);
02763 
02764             x_UpdateReadAccessTime(key, version, subkey, trans);
02765 
02766             trans.Commit();
02767         }
02768     }} // m_DB_Lock
02769 
02770 
02771 
02772     // Check if it's an overflow BLOB (external file)
02773     {{
02774         size_t bsize;
02775         auto_ptr<IReader> rd;
02776         rd.reset(x_CreateOverflowReader(overflow, key, version, subkey, bsize));
02777         if (rd.get()) {
02778             if ( m_TimeStampFlag & fTimeStampOnRead ) {
02779                 CBDB_Transaction trans(*m_Env,
02780                                         CBDB_Transaction::eEnvDefault,
02781                                         CBDB_Transaction::eNoAssociation);
02782                 {{
02783                     CFastMutexGuard guard(m_DB_Lock);
02784                     x_UpdateReadAccessTime(key, version, subkey, trans);
02785                 }} // m_DB_Lock
02786                 trans.Commit();
02787             }
02788             return rd.release();
02789         }
02790     }}
02791 
02792 
02793     // Inline BLOB, reading from BDB storage
02794     if (blob_id == 0) {
02795         return 0;
02796     }
02797 
02798     unsigned coords[2];
02799     ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
02800     if (ret == eBDB_Ok) {
02801         if (coords[0] != volume_id ||
02802             coords[1] != split_id) {
02803             // TO DO: restore de-mux mapping
02804         }
02805     } else {
02806         // TODO: restore de-mux mapping
02807     }
02808 
02809     coords[0] = volume_id;
02810     coords[1] = split_id;
02811 
02812     m_BLOB_SplitStore->SetTransaction(0);
02813 
02814     auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(4096));
02815     ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer);
02816     if (ret != eBDB_Ok) {
02817         return 0;
02818     }
02819     return new CBDB_CacheIReader(*this, buffer.release());
02820 */
02821 }
02822 
02823 IReader* CBDB_Cache::GetReadStream(const string&  /* key */,
02824                                    const string&  /* subkey */,
02825                                    int*           /* version */,
02826                                    ICache::EBlobVersionValidity* /* validity */)
02827 {
02828     NCBI_USER_THROW("CBDB_Cache::GetReadStream("
02829         "key, subkey, &version, &validity) is not implemented");
02830 }
02831 
02832 
02833 void CBDB_Cache::SetBlobVersionAsCurrent(const string&  /* key */,
02834                                        const string&  /* subkey */,
02835                                        int            /* version */)
02836 {
02837     NCBI_USER_THROW("CBDB_Cache::GetReadStream("
02838         "key, subkey, version) is not implemented");
02839 }
02840 
02841 
02842 void CBDB_Cache::GetBlobAccess(const string&     key,
02843                                int               version,
02844                                const string&     subkey,
02845                                SBlobAccessDescr* blob_descr)
02846 {
02847     _ASSERT(blob_descr);
02848 
02849     blob_descr->reader.reset();
02850     blob_descr->blob_size = 0;
02851     blob_descr->blob_found = false;
02852 
02853     EBDB_ErrCode ret;
02854 
02855     time_t curr = time(0);
02856     int tz_delta = m_LocalTimer.GetLocalTimezone();
02857     int overflow = 0;
02858     unsigned volume_id = 0, split_id = 0;
02859 
02860     unsigned blob_id = GetBlobId(key, version, subkey);
02861     if (!blob_id) {
02862         //_TRACE("CBDB_Cache::GetBlobAccess return 1");
02863         return;
02864     }
02865     TBlobLock blob_lock(m_LockVector, blob_id, m_LockTimeout);
02866 
02867 
02868     // TODO: unify read prolog code for all read functions
02869 
02870     CBDB_Transaction trans(*m_Env,
02871                         CBDB_Transaction::eTransASync, // async!
02872                         CBDB_Transaction::eNoAssociation);
02873 
02874     {{
02875         CFastMutexGuard guard(m_DB_Lock);
02876         {{ // TODO: nested scope here is nonsensical
02877             m_CacheAttrDB->SetTransaction(&trans);
02878 
02879             CBDB_FileCursor cur(*m_CacheAttrDB, trans,
02880                                 CBDB_FileCursor::eReadModifyUpdate);
02881             cur.SetCondition(CBDB_FileCursor::eEQ);
02882             cur.From << key << version << subkey;
02883             ret = cur.Fetch();
02884             if (ret == eBDB_Ok) {
02885                 if (m_TimeStampFlag & fCheckExpirationAlways) {
02886                     if (x_CheckTimeStampExpired(*m_CacheAttrDB, curr)) {
02887                         //_TRACE("CBDB_Cache::GetBlobAccess return 2");
02888                         return;
02889                     }
02890                 }
02891                 unsigned read_count = m_CacheAttrDB->read_count;
02892                 m_CacheAttrDB->read_count = read_count + 1;
02893 
02894                 unsigned max_time = m_CacheAttrDB->max_time;
02895                 if (max_time == 0 || max_time >= (unsigned) curr) {
02896                     if ( m_TimeStampFlag & fTimeStampOnRead ) {
02897                         m_CacheAttrDB->time_stamp = (unsigned)curr;
02898                     }
02899                 }
02900 
02901                 overflow = m_CacheAttrDB->overflow;
02902                 volume_id = m_CacheAttrDB->volume_id;
02903                 split_id = m_CacheAttrDB->split_id;
02904 
02905                 ret = cur.Update();
02906             } else {
02907                 //_TRACE("CBDB_Cache::GetBlobAccess return 3");
02908                 return;
02909             }
02910         }} // cursor
02911     }} // m_DB_Lock
02912 
02913     if (ret != eBDB_Ok) {
02914         //_TRACE("CBDB_Cache::GetBlobAccess return 4");
02915         return;
02916     }
02917     _ASSERT(blob_id);
02918     trans.Commit();
02919 
02920     //  FIXME: locks, statistics, etc
02921     string owner_name;
02922     m_CacheAttrDB->owner_name.ToString(owner_name);
02923     if (IsSaveStatistics()) {
02924         CFastMutexGuard guard(m_DB_Lock);
02925         m_Statistics.AddRead(owner_name, curr - tz_delta);
02926     }
02927 
02928 
02929 // Here we have a race(sic!) between overflow flag read/write and blob_id locking
02930 // same is in all read functions!
02931 // this race may affects coordinate mapping too (but it is auto-restored later)
02932 
02933     if (overflow) {
02934         blob_descr->reader.reset(
02935             x_CreateOverflowReader(key, version, subkey,
02936                                    blob_descr->blob_size, blob_lock));
02937         if (blob_descr->reader.get()) {
02938             blob_descr->blob_found = true;
02939             return;
02940         } else {
02941         }
02942     }
02943 
02944     // Inline BLOB, reading from BDB storage
02945     if (blob_id == 0) {
02946         //_TRACE("CBDB_Cache::GetBlobAccess return 5");
02947         return;
02948     }
02949 
02950     m_BLOB_SplitStore->SetTransaction(0);
02951 
02952     unsigned coords[2];
02953     ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
02954     if (ret == eBDB_Ok) {
02955         if (coords[0] != volume_id ||
02956             coords[1] != split_id) {
02957             // TO DO: restore de-mux mapping
02958         } else {
02959         // TODO: restore de-mux mapping
02960         }
02961     } else {
02962     }
02963 
02964     coords[0] = volume_id;
02965     coords[1] = split_id;
02966 
02967     if (blob_descr->buf && blob_descr->buf_size) {
02968         // use speculative read (hope provided buffer is sufficient)
02969         try {
02970 
02971             char** ptr = &blob_descr->buf;
02972             ret = m_BLOB_SplitStore->Fetch(blob_id, coords,
02973                                            (void**)ptr,
02974                                             blob_descr->buf_size,
02975                                             CBDB_RawFile::eReallocForbidden,
02976                                             &blob_descr->blob_size);
02977             if (ret == eBDB_Ok) {
02978                 blob_descr->blob_found = true;
02979                 return;
02980             } else {
02981             }
02982         } catch (CBDB_Exception&) {
02983         }
02984     }
02985 
02986     // speculative Fetch failed (or impossible), read it another way
02987     //
02988     // TODO: Add more realistic estimation of the BLOB size based on split
02989     //
02990     auto_ptr<CBDB_RawFile::TBuffer> buffer(
02991                         new CBDB_RawFile::TBuffer(8 * 1024));
02992     ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer);
02993     if (ret != eBDB_Ok) {
02994         //_TRACE("CBDB_Cache::GetBlobAccess return 6");
02995         return;
02996     }
02997     blob_descr->blob_found = true;
02998     blob_descr->blob_size = buffer->size();
02999     blob_descr->reader.reset(
03000         new CBDB_CacheIReader(*this, buffer.release(), blob_lock));
03001 
03002 
03003 /*
03004 
03005     EBDB_ErrCode ret;
03006     int overflow;
03007     unsigned int ttl, blob_id, volume_id, split_id;
03008 
03009     time_t curr = time(0);
03010     int tz_delta = m_LocalTimer.GetLocalTimezone();
03011 
03012     {{
03013         CBDB_Transaction trans(*m_Env,
03014                             CBDB_Transaction::eEnvDefault,
03015                             CBDB_Transaction::eNoAssociation);
03016 
03017         {{
03018         CFastMutexGuard guard(m_DB_Lock);
03019         m_CacheAttrDB->SetTransaction(0);
03020 
03021         bool rec_exists =
03022             x_RetrieveBlobAttributes(key, version, subkey,
03023                                     overflow, ttl,
03024                                     blob_id, volume_id, split_id);
03025         if (!rec_exists) {
03026             return;
03027         }
03028         // check expiration
03029         if (m_TimeStampFlag & fCheckExpirationAlways) {
03030             if (x_CheckTimeStampExpired()) {
03031                 return;
03032             }
03033         }
03034 
03035         m_CacheAttrDB->owner_name.ToString(m_TmpOwnerName);
03036         m_Statistics.AddRead(m_TmpOwnerName, curr - tz_delta);
03037 
03038         // Update the time stamp
03039         if ( m_TimeStampFlag & fTimeStampOnRead ) {
03040             x_UpdateReadAccessTime(key, version, subkey, trans);
03041         }
03042         }} // m_DB_Lock
03043 
03044         trans.Commit();
03045     }}
03046 
03047     // Check if it's an overflow BLOB (external file)
03048     //
03049     blob_descr->reader.reset(
03050         x_CreateOverflowReader(overflow,
03051                                key, version, subkey,
03052                                blob_descr->blob_size));
03053     if (blob_descr->reader.get()) {
03054           blob_descr->blob_found = true;
03055           if ( m_TimeStampFlag & fTimeStampOnRead ) {
03056             CBDB_Transaction trans(*m_Env,
03057                                     CBDB_Transaction::eEnvDefault,
03058                                     CBDB_Transaction::eNoAssociation);
03059             {{
03060                 CFastMutexGuard guard(m_DB_Lock);
03061                 x_UpdateReadAccessTime(key, version, subkey, trans);
03062             }} // m_DB_Lock
03063             trans.Commit();
03064         }
03065         return;
03066     }
03067 
03068 
03069     // Inline BLOB, reading from BDB storage
03070     //
03071     if (blob_id == 0) {
03072         blob_descr->blob_found = false;
03073         return;
03074     }
03075 
03076 
03077     m_BLOB_SplitStore->SetTransaction(0);
03078 
03079     unsigned coords[2];
03080     ret = m_BLOB_SplitStore->GetCoordinates(blob_id, coords);
03081     if (ret == eBDB_Ok) {
03082         if (coords[0] != volume_id ||
03083             coords[1] != split_id) {
03084             // TO DO: restore de-mux mapping
03085         }
03086     } else {
03087         // TODO: restore de-mux mapping
03088     }
03089 
03090     coords[0] = volume_id;
03091     coords[1] = split_id;
03092 
03093 
03094     if (blob_descr->buf && blob_descr->buf_size) {
03095         // use speculative read (hope provided buffer is sufficient)
03096         try {
03097 
03098             char** ptr = &blob_descr->buf;
03099             ret = m_BLOB_SplitStore->Fetch(blob_id, coords,
03100                                            (void**)ptr,
03101                                             blob_descr->buf_size,
03102                                             CBDB_RawFile::eReallocForbidden,
03103                                             &blob_descr->blob_size);
03104             if (ret == eBDB_Ok) {
03105                 blob_descr->blob_found = true;
03106                 return;
03107             }
03108         } catch (CBDB_Exception&) {
03109         }
03110     }
03111 
03112     // speculative Fetch failed (or impossible), read it another way
03113     //
03114     // TODO: Add more realistic estimation of the BLOB size based on split
03115     //
03116     auto_ptr<CBDB_RawFile::TBuffer> buffer(new CBDB_RawFile::TBuffer(4096));
03117     ret = m_BLOB_SplitStore->ReadRealloc(blob_id, coords, *buffer);
03118     if (ret != eBDB_Ok) {
03119         blob_descr->blob_found = false;
03120         return;
03121     }
03122     blob_descr->blob_found = true;
03123     blob_descr->blob_size = buffer->size();
03124     blob_descr->reader.reset(
03125         new CBDB_CacheIReader(*this, buffer.release()));
03126 */
03127 }
03128 
03129 unsigned CBDB_Cache::GetBlobId(const string&  key,
03130                                int            version,
03131                                const string&  subkey)
03132 {
03133     CFastMutexGuard guard(m_CARO1_Lock);
03134 
03135     m_CacheAttrDB_RO1->SetTransaction(0);
03136 
03137     m_CacheAttrDB_RO1->key = key;
03138     m_CacheAttrDB_RO1->version = version;
03139     m_CacheAttrDB_RO1->subkey = subkey;
03140 
03141     if (m_CacheAttrDB_RO1->Fetch() == eBDB_Ok) {
03142         return m_CacheAttrDB_RO1->blob_id;
03143     }
03144     return 0;
03145 }
03146 
03147 
03148 IWriter* CBDB_Cache::GetWriteStream(const string&    key,
03149                                     int              version,
03150                                     const string&    subkey,
03151                                     unsigned int     time_to_live,
03152                                     const string&    owner)
03153 {
03154     return GetWriteStream(0, key, version, subkey, true /*lock id*/,
03155                           time_to_live, owner);
03156 }
03157 
03158 
03159 IWriter* CBDB_Cache::GetWriteStream(unsigned         blob_id_ext,
03160                                     const string&    key,
03161                                     int              version,
03162                                     const string&    subkey,
03163                                     bool             do_id_lock,
03164                                     unsigned int     time_to_live,
03165                                     const string&    owner)
03166 {
03167     if (IsReadOnly()) {
03168         return 0;
03169     }
03170 
03171     //_TRACE("CBDB_Cache::GetWriteStream point 1");
03172     if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) {
03173         //_TRACE("CBDB_Cache::GetWriteStream point 2");
03174         Purge(key, subkey, 0, m_VersionFlag);
03175     }
03176 
03177     //_TRACE("CBDB_Cache::GetWriteStream point 3");
03178     if (m_MaxTimeout) {
03179         if (time_to_live > m_MaxTimeout) {
03180             time_to_live = m_MaxTimeout;
03181         }
03182     } else { // m_MaxTimeout == 0
03183         if (m_MaxTTL_prolong != 0 && m_Timeout != 0) {
03184             time_to_live = min(m_Timeout * m_MaxTTL_prolong, time_to_live);
03185         }
03186     }
03187     time_t curr = time(0);
03188     int tz_delta = m_LocalTimer.GetLocalTimezone();
03189 
03190 
03191     TBlobLock blob_lock(m_LockVector, m_LockTimeout);
03192 
03193     // ----------------------------------------------------
03194     // BLOB check-in, read attributes, lock the id
03195     // ----------------------------------------------------
03196     unsigned coord[2] = {0,};
03197     unsigned overflow;
03198     _VERIFY(
03199         BlobCheckIn(blob_id_ext,
03200                     key, version, subkey,
03201                     eBlobCheckIn_Create,
03202                     blob_lock, do_id_lock,
03203                     &coord[0], &coord[1],
03204                     &overflow)
03205             != EBlobCheckIn_NotFound);
03206     _ASSERT(blob_lock.GetId());
03207 
03208 
03209     return
03210         new CBDB_CacheIWriter(*this,
03211                               m_Path.c_str(),
03212                               blob_lock.GetId(),
03213                               key,
03214                               version,
03215                               subkey,
03216                               *m_CacheAttrDB,
03217                               time_to_live,
03218                               curr - tz_delta,
03219                               owner,
03220                               blob_lock);
03221 }
03222 
03223 
03224 void CBDB_Cache::Remove(const string& key)
03225 {
03226     if (IsReadOnly()) {
03227         return;
03228     }
03229 
03230     vector<SCacheDescr>  cache_elements;
03231 
03232     // Search the records to delete
03233 
03234     {{
03235         CFastMutexGuard guard(m_DB_Lock);
03236         m_CacheAttrDB->SetTransaction(0);
03237 
03238         CBDB_FileCursor cur(*m_CacheAttrDB);
03239         cur.SetCondition(CBDB_FileCursor::eEQ);
03240 
03241         cur.From << key;
03242         while (cur.Fetch() == eBDB_Ok) {
03243             int version = m_CacheAttrDB->version;
03244             const char* subkey = m_CacheAttrDB->subkey;
03245 
03246             int overflow = m_CacheAttrDB->overflow;
03247             unsigned blob_id = m_CacheAttrDB->blob_id;
03248 
03249             cache_elements.push_back(
03250                 SCacheDescr(key, version, subkey, overflow, blob_id));
03251 
03252             if (IsSaveStatistics()) {
03253                 unsigned read_count = m_CacheAttrDB->read_count;
03254                 string owner_name;
03255                 m_CacheAttrDB->owner_name.ToString(owner_name);
03256                 if (0 == read_count) {
03257                     m_Statistics.AddNeverRead(owner_name);
03258                 }
03259                 m_Statistics.AddExplDelete(owner_name);
03260                 x_UpdateOwnerStatOnDelete(owner_name, true/*expl-delete*/);
03261             }
03262         }
03263     }} // m_DB_Lock
03264 
03265     {{
03266     CBDB_Transaction trans(*m_Env,
03267                         CBDB_Transaction::eEnvDefault,
03268                         CBDB_Transaction::eNoAssociation);
03269 
03270     // Now delete all objects
03271 
03272     ITERATE(vector<SCacheDescr>, it, cache_elements) {
03273         {{
03274             CFastMutexGuard guard(m_DB_Lock);
03275             m_BLOB_SplitStore->SetTransaction(&trans);
03276 
03277             x_DropBlob(it->key,
03278                        it->version,
03279                        it->subkey,
03280                        it->overflow,
03281                        it->blob_id,
03282                        trans);
03283 
03284         }} // m_DB_Lock
03285 
03286         {{
03287             CFastMutexGuard guard(m_TimeLine_Lock);
03288             m_TimeLine->RemoveObject(it->blob_id);
03289         }} // m_TimeLine_Lock
03290 
03291     }
03292 
03293     trans.Commit();
03294     }}
03295 
03296 }
03297 
03298 void CBDB_Cache::Remove(const string&    key,
03299                         int              version,
03300                         const string&    subkey)
03301 {
03302     if (IsReadOnly()) {
03303         return;
03304     }
03305 
03306     // Search the records to delete
03307 
03308     vector<SCacheDescr>  cache_elements;
03309 
03310     {{
03311     CFastMutexGuard guard(m_DB_Lock);
03312     m_CacheAttrDB->SetTransaction(0);
03313 
03314     CBDB_FileCursor cur(*m_CacheAttrDB);
03315     cur.SetCondition(CBDB_FileCursor::eEQ);
03316 
03317     cur.From << key << version << subkey;
03318     while (cur.Fetch() == eBDB_Ok) {
03319         int overflow = m_CacheAttrDB->overflow;
03320         unsigned blob_id = m_CacheAttrDB->blob_id;
03321 
03322         cache_elements.push_back(
03323             SCacheDescr(key, version, subkey, overflow, blob_id));
03324 
03325         if (IsSaveStatistics()) {
03326             unsigned read_count = m_CacheAttrDB->read_count;
03327             string owner_name;
03328             m_CacheAttrDB->owner_name.ToString(owner_name);
03329             if (0 == read_count) {
03330                 m_Statistics.AddNeverRead(owner_name);
03331             }
03332             m_Statistics.AddExplDelete(owner_name);
03333             x_UpdateOwnerStatOnDelete(owner_name, true/*expl-delete*/);
03334         }
03335     }
03336 
03337     }} // m_DB_Lock
03338 
03339 
03340     // TODO: add blob id locking here
03341     //
03342     CBDB_Transaction trans(*m_Env,
03343                             CBDB_Transaction::eEnvDefault,
03344                             CBDB_Transaction::eNoAssociation);
03345 
03346     ITERATE(vector<SCacheDescr>, it, cache_elements) {
03347         {{
03348             CFastMutexGuard guard(m_DB_Lock);
03349             m_BLOB_SplitStore->SetTransaction(&trans);
03350 
03351             x_DropBlob(it->key,
03352                        it->version,
03353                        it->subkey,
03354                        it->overflow,
03355                        it->blob_id,
03356                        trans);
03357         }} // m_DB_Lock
03358 
03359         {{
03360             CFastMutexGuard guard(m_TimeLine_Lock);
03361             m_TimeLine->RemoveObject(it->blob_id);
03362         }} // m_TimeLine_Lock
03363     }
03364 
03365     trans.Commit();
03366 }
03367 
03368 
03369 time_t CBDB_Cache::GetAccessTime(const string&  key,
03370                                  int            version,
03371                                  const string&  subkey)
03372 {
03373     _ASSERT(m_CacheAttrDB);
03374     CFastMutexGuard guard(m_DB_Lock);
03375     m_CacheAttrDB->SetTransaction(0);
03376 
03377     m_CacheAttrDB->key = key;
03378     m_CacheAttrDB->version = version;
03379     m_CacheAttrDB->subkey = subkey;
03380         //(m_TimeStampFlag & fTrackSubKey) ? subkey : "";
03381 
03382     EBDB_ErrCode ret = m_CacheAttrDB->Fetch();
03383     if (ret != eBDB_Ok) {
03384         return 0;
03385     }
03386 
03387     return (int) m_CacheAttrDB->time_stamp;
03388 }
03389 
03390 void CBDB_Cache::SetPurgeBatchSize(unsigned batch_size)
03391 {
03392     CFastMutexGuard guard(m_DB_Lock);
03393     m_PurgeBatchSize = batch_size;
03394 }
03395 
03396 unsigned CBDB_Cache::GetPurgeBatchSize() const
03397 {
03398     CFastMutexGuard guard(m_DB_Lock);
03399     return m_PurgeBatchSize;
03400 }
03401 
03402 void CBDB_Cache::SetBatchSleep(unsigned sleep)
03403 {
03404     CFastMutexGuard guard(m_DB_Lock);
03405     m_BatchSleep = sleep;
03406 }
03407 
03408 unsigned CBDB_Cache::GetBatchSleep() const
03409 {
03410     CFastMutexGuard guard(m_DB_Lock);
03411     return m_BatchSleep;
03412 }
03413 
03414 void CBDB_Cache::StopPurge()
03415 {
03416     CFastMutexGuard guard(m_DB_Lock);
03417     m_PurgeStopSignal.Post();
03418 }
03419 
03420 
03421 void CBDB_Cache::WriteOverflow(CNcbiOfstream& overflow_file,
03422                                const string&  overflow_file_path,
03423                                const char*    buf,
03424                                streamsize     count)
03425 {
03426     overflow_file.write(buf, count);
03427     if (overflow_file.bad()) {
03428 
03429         overflow_file.close();
03430 
03431         string err("Overflow file IO error ");
03432         err += overflow_file_path;
03433 
03434         x_DropOverflow(overflow_file_path.c_str());
03435 
03436         BDB_THROW(eOverflowFileIO, err);
03437     }
03438 }
03439 
03440 void CBDB_Cache::AddToTimeLine(unsigned blob_id, time_t exp_time)
03441 {
03442     {{
03443         CFastMutexGuard guard(m_TimeLine_Lock);
03444         m_TimeLine->AddObject(exp_time, blob_id);
03445     }}
03446 }
03447 
/// @internal
///
/// Scoped guard for a boolean "now running" flag.
///
/// SetFlag() sets the given flag to TRUE and remembers it; the destructor
/// sets the remembered flag back to FALSE. A guard on which SetFlag() was
/// never called (or was called with a NULL pointer) touches nothing.
///
class CPurgeFlagGuard
{
public:
    CPurgeFlagGuard()
        : m_Flag(0)
    {}

    ~CPurgeFlagGuard()
    {
        if (m_Flag) {
            *m_Flag = false;
        }
    }

    /// Take charge of the flag and raise it.
    ///
    /// @param flag  flag to guard; NULL is accepted and leaves the guard idle
    void SetFlag(bool* flag)
    {
        m_Flag = flag;
        if (m_Flag) {
            *m_Flag = true;
        }
    }
private:
    // Not copyable: a copy would clear the flag while the original guard
    // is still alive. Declared, intentionally not defined.
    CPurgeFlagGuard(const CPurgeFlagGuard&);
    CPurgeFlagGuard& operator=(const CPurgeFlagGuard&);
private:
    bool*  m_Flag;   ///< guarded flag, 0 while the guard is idle
};
03474 
03475 
03476 void CBDB_Cache::EvaluateTimeLine(bool* interrupted)
03477 {
03478     time_t curr = time(0);
03479 
03480     if (m_LastTimeLineCheck) {
03481         if ((curr - m_LastTimeLineCheck) < (time_t)m_TimeLine->GetDiscrFactor()) {
03482             if (m_Monitor && m_Monitor->IsActive()) {
03483                 string msg = "Purge: Timeline evaluation skiped ";
03484                 msg += "(early wakeup for this precision) remains=";
03485                 msg += NStr::ULongToString(
03486                         (unsigned long)(m_TimeLine->GetDiscrFactor() -
03487                                         (curr - m_LastTimeLineCheck)));
03488                 msg += " precision=";
03489                 msg += NStr::UIntToString(m_TimeLine->GetDiscrFactor());
03490                 msg += "\n";
03491                 m_Monitor->Send(msg);
03492             }
03493 
03494             return;
03495         }
03496     }
03497     m_LastTimeLineCheck = curr;
03498 
03499     TBitVector delete_candidates;
03500     {{
03501         CFastMutexGuard guard(m_TimeLine_Lock);
03502         m_TimeLine->ExtractObjects(curr, &delete_candidates);
03503         if (!delete_candidates.any()) {
03504             if (m_Monitor && m_Monitor->IsActive()) {
03505                 string msg =
03506                     "Purge: Timeline evaluation exits "
03507                     "(no candidates) \n";
03508                 m_Monitor->Send(msg);
03509             }
03510 
03511             return;
03512         }
03513     }}
03514 
03515     unsigned batch_size = m_PurgeBatchSize;
03516     if (batch_size < 10) {
03517         batch_size = 10;
03518     }
03519 
03520     unsigned candidates = delete_candidates.count();
03521 
03522     delete_candidates -= m_GC_Deleted;
03523     unsigned cleaned_candidates = delete_candidates.count();
03524     if (cleaned_candidates != candidates) {
03525         if (m_Monitor && m_Monitor->IsActive()) {
03526             string msg;
03527             msg = "Purge: Timeline total timeline candidates=";
03528             msg += NStr::UIntToString(candidates);
03529             msg += " cleaned_candidates=";
03530             msg += NStr::UIntToString(cleaned_candidates);
03531             msg += "\n";
03532 
03533             m_Monitor->Send(msg);
03534         }
03535 
03536         candidates = cleaned_candidates;
03537     }
03538 
03539 
03540     {{
03541     double id_trans_time = 0.0;  // id translation time
03542     double exp_check_time = 0.0; // expiration check time
03543     double del_time = 0.0;       // BLOB delete time
03544     unsigned deleted_cnt = 0;    // total number of deleted BLOBs
03545 
03546     vector<unsigned> blob_id_vect(batch_size);
03547     vector<SCacheDescr> blob_batch_vect(batch_size);
03548     vector<SCacheDescr> blob_exp_vect(batch_size);
03549 
03550     for (TBitVector::enumerator en = delete_candidates.first(); en.valid();) {
03551 
03552         // extract batch of blob ids out of the bit-vector
03553         //
03554         blob_id_vect.resize(0);
03555         blob_batch_vect.resize(0);
03556         blob_exp_vect.resize(0);
03557         for (unsigned i = 0; en.valid() && i < batch_size; ++i) {
03558             unsigned blob_id = *en;
03559             blob_id_vect.push_back(blob_id);
03560             ++en;
03561         } // for i
03562 
03563         // Translate IDs to keys
03564         //
03565         if (blob_id_vect.size()) {
03566             CStopWatch  sw(CStopWatch::eStart);
03567             vector<unsigned>::const_iterator it = blob_id_vect.begin();
03568             unsigned blob_id = *it;
03569 
03570             CFastMutexGuard guard(m_IDIDX_Lock_RO);
03571             m_CacheIdIDX_RO->SetTransaction(0);
03572             CBDB_FileCursor cur(*m_CacheIdIDX_RO);
03573             cur.SetCondition(CBDB_FileCursor::eGE);
03574             cur.From << blob_id;
03575 
03576             while (true) {
03577                 if (cur.Fetch() == eBDB_Ok) {
03578                     unsigned bid = m_CacheIdIDX_RO->blob_id;
03579                     if (blob_id != bid) { // record has been deleted by now
03580                         if ((++it) == blob_id_vect.end())
03581                             break;
03582                         cur.SetCondition(CBDB_FileCursor::eGE);
03583                         cur.From << (blob_id = *it);
03584 
03585                     } else {
03586                         int version = m_CacheIdIDX_RO->version;
03587                         const char* key = m_CacheIdIDX_RO->key;
03588                         const char* subkey = m_CacheIdIDX_RO->subkey;
03589                         blob_batch_vect.push_back(
03590                             SCacheDescr(key, version, subkey, 0, blob_id));
03591                     }
03592                 }
03593                 ++it;
03594                 if (it == blob_id_vect.end()) {
03595                     break;
03596                 }
03597                 // check if next blob can be get by fetch or we need to restart
03598                 // the cursor
03599                 //
03600                 if ((blob_id + 1) == *it) {
03601                     ++blob_id;
03602                 } else {
03603                     // cursor restart
03604                     //
03605                     cur.SetCondition(CBDB_FileCursor::eGE);
03606                     cur.From << (blob_id = *it);
03607                 }
03608             } // while
03609 
03610             id_trans_time = sw.Elapsed();
03611 
03612         } // if
03613 
03614         // Check expiration of BLOB keys
03615         //
03616         if (blob_batch_vect.size()) {
03617             time_t curr = time(0); // initial timepoint
03618             CStopWatch  sw(CStopWatch::eStart);
03619 
03620             {{
03621             CFastMutexGuard guard(m_CARO2_Lock);
03622 
03623             m_CacheAttrDB_RO2->SetTransaction(0);
03624             CBDB_FileCursor cur(*m_CacheAttrDB_RO2);
03625 
03626             for (size_t i = 0; i < blob_batch_vect.size(); ++i) {
03627                 SCacheDescr& blob_descr = blob_batch_vect[i];
03628                 cur.SetCondition(CBDB_FileCursor::eEQ);
03629                 cur.From << blob_descr.key
03630                          << blob_descr.version
03631                          << blob_descr.subkey;
03632                 if (cur.Fetch() == eBDB_Ok) {
03633                     blob_descr.overflow = m_CacheAttrDB_RO2->overflow;
03634                     _ASSERT(blob_descr.blob_id == m_CacheAttrDB_RO2->blob_id);
03635                     blob_descr.blob_id = m_CacheAttrDB_RO2->blob_id;
03636                     time_t exp_time;
03637                     if (x_CheckTimeStampExpired(
03638                         *m_CacheAttrDB_RO2, curr, &exp_time)) {
03639 
03640                          blob_exp_vect.push_back(blob_descr);
03641 
03642                     } else {
03643                         {{
03644                             CFastMutexGuard guard(m_TimeLine_Lock);
03645                             m_TimeLine->AddObject(exp_time, blob_descr.blob_id);
03646                         }} // m_TimeLine_Lock
03647                     }
03648                 }
03649             } // for
03650             }} // m_CARO2_Lock
03651 
03652             exp_check_time = sw.Elapsed();
03653         }
03654 
03655         unsigned deleted_batch_cnt = 0;
03656         // Delete the batch of expired BLOBs
03657         //
03658         if (blob_exp_vect.size()) {
03659             CStopWatch  sw(CStopWatch::eStart);
03660             for (size_t i = 0; i < blob_exp_vect.size(); ++i) {
03661 
03662                 const SCacheDescr& blob_descr = blob_exp_vect[i];
03663                 CBDB_Transaction trans(*m_Env,
03664                                        CBDB_Transaction::eTransASync, // async!
03665                                        CBDB_Transaction::eNoAssociation);
03666 
03667                 if (m_Monitor && m_Monitor->IsActive()) {
03668                     string msg = GetFastLocalTime().AsString();
03669                     msg += " Purge: DELETING \"";
03670                     msg += blob_descr.key;
03671                     msg += "\"-";
03672                     msg += NStr::UIntToString(blob_descr.version);
03673                     msg += "-\"";
03674                     msg += blob_descr.subkey;
03675                     msg += "\"\n";
03676                     m_Monitor->Send(msg);
03677                 }
03678 
03679                 bool blob_deleted =
03680                     DropBlobWithExpCheck(blob_descr.key,
03681                                          blob_descr.version,
03682                                          blob_descr.subkey,
03683                                          trans);
03684                 trans.Commit();
03685                 deleted_batch_cnt += blob_deleted;
03686 /*
03687                 if (blob_deleted) {
03688                     if (m_Monitor && m_Monitor->IsActive()) {
03689                         string msg =
03690                             "Purge: Timeline DELETE blob =" + blob_descr.key
03691                             "\n";
03692                         m_Monitor->Send(msg);
03693                     }
03694                 }
03695 */
03696 
03697             } // for i
03698 
03699             del_time = sw.Elapsed();
03700             deleted_cnt += deleted_batch_cnt;
03701         }
03702 
03703         if (m_Monitor && m_Monitor->IsActive()) {
03704             string msg;
03705             msg = "Purge: Timeline deleted_batch_cnt=";
03706             msg += NStr::UIntToString(deleted_batch_cnt);
03707             msg += " deleted_cnt=";
03708             msg += NStr::UIntToString(deleted_cnt);
03709             msg += " candidates=";
03710             msg += NStr::UIntToString(candidates);
03711 
03712             msg += " id_trans_time=";
03713             msg += NStr::DoubleToString(id_trans_time, 5);
03714             msg += " exp_check_time=";
03715             msg += NStr::DoubleToString(exp_check_time, 5);
03716             msg += " del_time=";
03717             msg += NStr::DoubleToString(del_time, 5);
03718             msg += "\n";
03719 
03720             m_Monitor->Send(msg);
03721         }
03722 
03723         m_GC_Deleted.optimize();
03724 
03725         // check for shutdown signal
03726         {{
03727             unsigned delay = m_BatchSleep * 1000000;
03728             bool scan_stop = m_PurgeStopSignal.TryWait(0, delay);
03729             if (scan_stop) {
03730                 if (interrupted) {
03731                     *interrupted = true;
03732                     return;
03733                 }
03734             }
03735         }}
03736 
03737     } // for en
03738     }}
03739 
03740 /*
03741     for (; en.valid(); ++en) {
03742         unsigned blob_id = *en;
03743         try {
03744             CBDB_Transaction trans(*m_Env,
03745                                     CBDB_Transaction::eTransASync, // async!
03746                                     CBDB_Transaction::eNoAssociation);
03747 
03748             bool blob_deleted = DropBlobWithExpCheck(blob_id, trans);
03749 
03750             if (blob_deleted) {
03751                 trans.Commit();
03752                 ++deleted;
03753                 if (m_Monitor && m_Monitor->IsActive()) {
03754                     string msg =
03755                         "Purge: Timeline DELETE blob id=" +
03756                         NStr::UIntToString(blob_id)+ "\n";
03757                     m_Monitor->Send(msg);
03758                 }
03759             }
03760 
03761             // BLOB delete part of timeline can be quite expensive, lock
03762             // some resources needed by competing threads
03763             // so this code introduces delay to give way to others
03764             // ideally this delay should be adaptive dictated by current server load
03765             if (++cnt >= batch_size) {
03766                 // get to nanoseconds as CSemaphore::TryWait() needs
03767                 unsigned delay = m_BatchSleep * 1000000;
03768                 bool scan_stop = m_PurgeStopSignal.TryWait(0, delay);
03769                 if (scan_stop) {
03770                     if (interrupted) {
03771                         *interrupted = true;
03772                         return;
03773                     }
03774                 }
03775                 cnt = 0;
03776             }
03777 
03778         }
03779         catch (CBDB_ErrnoException& ex)
03780         {
03781             if (ex.IsRecovery()) {  // serious stuff! need to quit
03782                 throw;
03783             }
03784             LOG_POST_X(20, Error
03785                 <<  "Purge suppressed exception when deleting BLOB "
03786                 << ex.what());
03787             if (m_Monitor && m_Monitor->IsActive()) {
03788                 m_Monitor->Send(ex.what());
03789             }
03790 
03791         }
03792         catch(exception& ex) {
03793             LOG_POST_X(21, Error
03794                 <<  "Purge suppressed exception when deleting BLOB "
03795                 << ex.what());
03796             if (m_Monitor && m_Monitor->IsActive()) {
03797                 m_Monitor->Send(ex.what());
03798             }
03799         }
03800 
03801     } // for en
03802 
03803     if (interrupted) {
03804         *interrupted = false;
03805     }
03806 
03807     if (m_Monitor && m_Monitor->IsActive()) {
03808         string msg =
03809             "Purge: Timeline deleted=" + NStr::UIntToString(deleted)
03810             + " out of candidates=" + NStr::UIntToString(candidates)
03811             + "\n";
03812         m_Monitor->Send(msg);
03813     }
03814 */
03815 }
03816 
03817 void CBDB_Cache::Purge(time_t           access_timeout,
03818                        EKeepVersions    keep_last_version)
03819 {
03820     // Protect Purge form double (run from different threads)
03821     CPurgeFlagGuard pf_guard;
03822     {{
03823         CFastMutexGuard guard(m_DB_Lock);
03824         if (m_PurgeNowRunning)
03825             return;
03826         pf_guard.SetFlag(&m_PurgeNowRunning);
03827     }}
03828 
03829 
03830     if (IsReadOnly()) {
03831         return;
03832     }
03833 
03834     if (keep_last_version == eDropAll && access_timeout == 0) {
03835         CFastMutexGuard guard(m_DB_Lock);
03836         // TODO: SetTransaction here ??? Which one, zero?
03837         x_TruncateDB();
03838         return;
03839     }
03840 purge_start:
03841 
03842     bool scan_interrupted = false;
03843     EvaluateTimeLine(&scan_interrupted);
03844     if (scan_interrupted) {
03845         return;
03846     }
03847 
03848     time_t gc_start = time(0); // time when we started GC scan
03849 
03850     if (m_NextExpTime) {
03851 
03852         // Mutiply to give new GC chance to kill first
03853         unsigned precision = m_TimeLine->GetDiscrFactor() * 3;
03854 
03855         //                                 [ precision ]
03856         //                                   |
03857         //                                   V
03858         // -----------*-------------------#<========>---------*-->> time
03859         //            *                   #                   ^
03860         //            *                   #                   *
03861         //           gc_start             { Next Expiration } *
03862         //            *                                       *
03863         //            *****************************************
03864         //
03865         // wait until current time goes after next projected expiration, plus
03866         // timeline precision, otherwise no need to do scanning
03867         //
03868         if (gc_start < (m_NextExpTime + (time_t)precision)) {
03869             if (m_Monitor && m_Monitor->IsActive()) {
03870                 unsigned remains = (unsigned)((m_NextExpTime + precision) - gc_start);
03871                 unsigned rc = 0;
03872                 {{
03873                     CFastMutexGuard guard(m_CARO2_Lock);
03874                     m_CacheAttrDB_RO2->SetTransaction(0);
03875                     rc = m_CacheAttrDB_RO2->CountRecs();
03876                 }}
03877 
03878                 TSplitStore::TBitVector bv;
03879                 m_BLOB_SplitStore->GetIdVector(&bv);
03880                 unsigned split_store_blobs = bv.count();
03881 
03882                 TBitVector timeline_blobs;
03883                 {{
03884                     CFastMutexGuard guard(m_TimeLine_Lock);
03885                     m_TimeLine->EnumerateObjects(&timeline_blobs);
03886                 }}
03887                 unsigned tl_count = timeline_blobs.count();
03888 
03889                 m_Monitor->Send(
03890                     "Purge: scan skipped(nothing todo) remains="
03891                     + NStr::UIntToString(remains)
03892                     + "s. precision=" + NStr::UIntToString(precision)
03893                     + "s. RecCount="  + NStr::UIntToString(rc)
03894                     + " SplitCount="+ NStr::UIntToString(split_store_blobs)
03895                     + " TimeLineCount="+ NStr::UIntToString(tl_count)
03896                     + "\n");
03897             }
03898             return;
03899         } else {
03900             m_NextExpTime = 0;
03901         }
03902 
03903 /*
03904         if (m_PurgeSkipCnt >= 50) { // do not yeild Purge more than N times
03905             m_PurgeSkipCnt = 0;
03906         } else {
03907             // Check if we nothing to purge
03908             if (!(m_NextExpTime <= gc_start)) {
03909                 ++m_PurgeSkipCnt;
03910                 if (m_Monitor && m_Monitor->IsActive()) {
03911                     m_Monitor->Send("Purge: scan skipped(no delete candidates)\n");
03912                 }
03913                 return;
03914             }
03915         }
03916 */
03917     }
03918 
03919     if (m_Monitor && m_Monitor->IsActive()) {
03920         m_Monitor->Send("Purge: scan started.\n");
03921     }
03922     m_NextExpTime = 0;
03923 
03924 
03925     unsigned delay = 0;
03926     vector<SCacheDescr> cache_entries;
03927     cache_entries.reserve(1000);
03928 
03929     unsigned db_recs = 0;
03930 
03931 
03932     // Search the database for obsolete cache entries
03933     string first_key, last_key;
03934     string next_exp_key;
03935 
03936     for (bool flag = true; flag;) {
03937         unsigned batch_size = GetPurgeBatchSize();
03938         unsigned rec_cnt;
03939         {{
03940             time_t curr = time(0); // initial timepoint
03941 
03942             CFastMutexGuard guard(m_CARO2_Lock);
03943             // get to nanoseconds as CSemaphore::TryWait() needs
03944             delay = m_BatchSleep * 1000000;
03945 
03946             m_CacheAttrDB_RO2->SetTransaction(0);
03947 
03948             CBDB_FileCursor cur(*m_CacheAttrDB_RO2);
03949             cur.SetCondition(CBDB_FileCursor::eGE);
03950             cur.From << last_key;
03951 
03952             for (rec_cnt = 0; rec_cnt < batch_size; ++rec_cnt) {
03953                 if (cur.Fetch() != eBDB_Ok) {
03954                     flag = false;
03955                     break;
03956                 }
03957 
03958                 int version = m_CacheAttrDB_RO2->version;
03959                 const char* key = m_CacheAttrDB_RO2->key;
03960                 last_key = key;
03961                 int overflow = m_CacheAttrDB_RO2->overflow;
03962                 const char* subkey = m_CacheAttrDB_RO2->subkey;
03963                 unsigned blob_id = m_CacheAttrDB_RO2->blob_id;
03964 
03965                 time_t exp_time;
03966                 if (x_CheckTimeStampExpired(*m_CacheAttrDB_RO2, curr, &exp_time)) {
03967 
03968                     string owner_name;
03969                     m_CacheAttrDB_RO2->owner_name.ToString(owner_name);
03970 
03971                     // FIXME: statistics, locking, etc
03972                     if (IsSaveStatistics()) {
03973                         CFastMutexGuard guard(m_DB_Lock);
03974                         if (0 == m_CacheAttrDB_RO2->read_count) {
03975                             m_Statistics.AddNeverRead(owner_name);
03976                         }
03977                         m_Statistics.AddPurgeDelete(owner_name);
03978                         x_UpdateOwnerStatOnDelete(owner_name,
03979                                                   false//non-expl-delete
03980                                                   );
03981                     }
03982 
03983                     cache_entries.push_back(
03984                          SCacheDescr(key, version, subkey, overflow, blob_id));
03985                 } else {
03986                     ++db_recs;
03987 
03988                     {{
03989                         CFastMutexGuard guard(m_TimeLine_Lock);
03990                         m_TimeLine->AddObject(exp_time, blob_id);
03991                     }} // m_TimeLine_Lock
03992 
03993                     if (m_NextExpTime) {
03994                         if (exp_time < m_NextExpTime) {
03995                             // compute fraction of the difference
03996                             // with sensitivity based on purge thread delay
03997                             // we need this in order not to move point of restart
03998                             // too agressively
03999 
04000                             double delta = fabs(double(m_NextExpTime - exp_time));
04001                             double fraction = fabs(double(delta / m_NextExpTime));
04002                             if (fraction > 0.1 && delta > m_PurgeThreadDelay) {
04003                                 next_exp_key = key;
04004                                 m_NextExpTime = exp_time;
04005                             }
04006                         }
04007                     } else {
04008                         m_NextExpTime = exp_time;
04009                         next_exp_key = key;
04010                     }
04011 
04012 
04013                 }
04014                 if (rec_cnt == 0) { // first record in the batch
04015                     first_key = last_key;
04016                 } else
04017                 if (rec_cnt == (batch_size - 1)) { // last record
04018                     // if batch stops in the same key we increase
04019                     // the batch size to avoid the infinite loop
04020                     // when new batch starts with the very same key
04021                     // and never comes to the end
04022                     //
04023                     if (first_key == last_key) {
04024                         batch_size += 10;
04025                     }
04026                 }
04027 
04028             } // for i
04029         }} // m_CARO2_Lock
04030 
04031         if (m_Monitor && m_Monitor->IsActive()) {
04032             string msg("Purge: Inspected ");
04033             msg += NStr::UIntToString(rec_cnt);
04034             msg += " records.\n";
04035 
04036             m_Monitor->Send(msg);
04037         }
04038 
04039 
04040         // Delete BLOBs if there are deletion candidates
04041         if (cache_entries.size() > 0) {
04042             if (m_Monitor && m_Monitor->IsActive()) {
04043                 m_Monitor->Send("Purge:Deleting expired BLOBs...\n");
04044             }
04045 
04046             for (size_t i = 0; i < cache_entries.size(); ++i) {
04047                 try {
04048                     const SCacheDescr& it = cache_entries[i];
04049 
04050                     CBDB_Transaction trans(*m_Env,
04051                                             CBDB_Transaction::eTransASync, // async!
04052                                             CBDB_Transaction::eNoAssociation);
04053 
04054                     if (m_Monitor && m_Monitor->IsActive()) {
04055                         string msg = GetFastLocalTime().AsString();
04056                         msg += " Purge: DELETING \"";
04057                         msg += it.key;
04058                         msg += "\"-";
04059                         msg += NStr::UIntToString(it.version);
04060                         msg += "-\"";
04061                         msg += it.subkey;
04062                         msg += "\"\n";
04063                         m_Monitor->Send(msg);
04064                     }
04065 
04066                     {{
04067                         DropBlobWithExpCheck(it.key,
04068                                              it.version,
04069                                              it.subkey,
04070                                              trans);
04071                     }}
04072 
04073                     /*
04074                     {{
04075                         CFastMutexGuard guard(m_DB_Lock);
04076                         m_BLOB_SplitStore->SetTransaction(&trans);
04077 
04078                         x_DropBlob(it.key,
04079                                    it.version,
04080                                    it.subkey,
04081                                    it.overflow,
04082                                    it.blob_id,
04083                                    trans);
04084 
04085                     }} // m_DB_Lock
04086                     */
04087                     trans.Commit();
04088                 }
04089                 catch (CBDB_ErrnoException& ex)
04090                 {
04091                     if (ex.IsRecovery()) {  // serious stuff! need to quit
04092                         throw;
04093                     }
04094                     LOG_POST_X(22, Error
04095                         <<  "Purge suppressed exception when deleting BLOB "
04096                         << ex.what());
04097                     if (m_Monitor && m_Monitor->IsActive()) {
04098                         m_Monitor->Send(ex.what());
04099                     }
04100 
04101                 }
04102                 catch(exception& ex) {
04103                     LOG_POST_X(23, Error
04104                         <<  "Purge suppressed exception when deleting BLOB "
04105                         << ex.what());
04106                     if (m_Monitor && m_Monitor->IsActive()) {
04107                         m_Monitor->Send(ex.what());
04108                     }
04109                 }
04110 
04111             } // for i
04112 
04113             if (m_Monitor && m_Monitor->IsActive()) {
04114                 string msg =
04115                     "Purge: deleted " +
04116                     NStr::SizetToString(cache_entries.size()) +
04117                     " BLOBs\n";
04118                 m_Monitor->Send(msg);
04119             }
04120 
04121             cache_entries.resize(0);
04122         }
04123 
04124         bool purge_stop = m_PurgeStopSignal.TryWait(0, delay);
04125         if (purge_stop) {
04126             LOG_POST_X(24, Info << "BDB Cache: Stopping Purge execution.");
04127             return;
04128         }
04129 
04130         EvaluateTimeLine(&scan_interrupted);
04131         if (scan_interrupted) {
04132             LOG_POST_X(25, Warning << "BDB Cache: Stopping Purge execution.");
04133             return;
04134         }
04135 
04136 
04137     } // for flag
04138 
04139 
04140     ++m_PurgeCount;
04141 
04142     if (m_CleanLogOnPurge) {
04143         if ((m_PurgeCount % m_CleanLogOnPurge) == 0) {
04144             m_Env->CleanLog();
04145         }
04146     }
04147 
04148     if (IsSaveStatistics()) {
04149         CFastMutexGuard guard(m_DB_Lock);
04150         m_Statistics.GlobalStatistics().blobs_db = db_recs;
04151     }
04152 
04153     if ((m_PurgeCount % 200) == 0) {
04154         m_BLOB_SplitStore->Save();
04155         m_Env->ForceTransactionCheckpoint();
04156     }
04157     else
04158     if ((m_PurgeCount % 50) == 0) {
04159         m_BLOB_SplitStore->FreeUnusedMem();
04160         m_LockVector.FreeUnusedMem();
04161     }
04162 
04163     unsigned time_in_purge = 0;
04164 
04165     // check if we want to rescan the database right away
04166     if (m_PurgeThreadDelay) {
04167         time_t curr = time(0);
04168         // we spent more time in Purge than we planned to sleep
04169         // it means we have a lot of records and need to run GC
04170         // continuosly
04171         time_in_purge = (unsigned)curr - (unsigned)gc_start;
04172         if (time_in_purge >= m_PurgeThreadDelay ||
04173             (m_NextExpTime != 0 && (curr > m_NextExpTime))) {
04174 
04175             if (m_Monitor && m_Monitor->IsActive()) {
04176                 m_Monitor->Send("Purge: scan restarted. time=" +
04177                                 NStr::UIntToString(time_in_purge)+ "s.\n");
04178             }
04179 
04180             goto purge_start;
04181         }
04182     }
04183 
04184     if (m_Monitor && m_Monitor->IsActive()) {
04185         m_Monitor->Send("Purge: scan ended. Scan time=" +
04186                         NStr::UIntToString(time_in_purge)+ "s.\n");
04187     }
04188 
04189 }
04190 
04191 
04192 void CBDB_Cache::Purge(const string&    key,
04193                        const string&    subkey,
04194                        time_t           access_timeout,
04195                        EKeepVersions    keep_last_version)
04196 {
04197     if (IsReadOnly()) {
04198         return;
04199     }
04200 
04201     if (key.empty() && keep_last_version == eDropAll && access_timeout == 0) {
04202         CFastMutexGuard guard(m_DB_Lock);
04203         // TODO: SetTransaction ??? see TODO above
04204         x_TruncateDB();
04205         return;
04206     }
04207 
04208     // Search the database for obsolete cache entries
04209     vector<SCacheDescr> cache_entries;
04210 
04211     unsigned recs_scanned = 0;
04212 
04213     {{
04214     CFastMutexGuard guard(m_DB_Lock);
04215     m_CacheAttrDB->SetTransaction(0);
04216 
04217     CBDB_FileCursor cur(*m_CacheAttrDB);
04218     cur.SetCondition(CBDB_FileCursor::eEQ);
04219 
04220     cur.From << key;
04221 
04222     //CTime time_stamp(CTime::eCurrent);
04223     time_t curr = time(0); // (int)time_stamp.GetTimeT();
04224     int timeout = GetTimeout();
04225     if (access_timeout && access_timeout < timeout) {
04226         timeout = (int)access_timeout;
04227     }
04228 
04229     while (cur.Fetch() == eBDB_Ok) {
04230         ++recs_scanned;
04231 
04232         unsigned    db_time_stamp = m_CacheAttrDB->time_stamp;
04233         int         version       = m_CacheAttrDB->version;
04234         const char* x_key         = m_CacheAttrDB->key;
04235         int         overflow      = m_CacheAttrDB->overflow;
04236         string      x_subkey      = (const char*) m_CacheAttrDB->subkey;
04237         unsigned    blob_id       = m_CacheAttrDB->blob_id;
04238 
04239         unsigned ttl = m_CacheAttrDB->ttl;
04240         unsigned to = timeout;
04241 
04242         if (ttl) {  // individual timeout
04243             if (m_MaxTimeout && ttl > m_MaxTimeout) {
04244                 to = max((unsigned)timeout, (unsigned)m_MaxTimeout);
04245             } else {
04246                 to = ttl;
04247             }
04248         }
04249 
04250         if (subkey.empty()) {
04251         }
04252 
04253         // check if timeout control has been requested and stored element
04254         // should not be removed
04255         if (access_timeout && (!(curr - to > db_time_stamp))) {
04256             continue;
04257         }
04258 
04259         if (subkey.empty() || (subkey == x_subkey)) {
04260             cache_entries.push_back(
04261                      SCacheDescr(x_key, version, x_subkey, overflow, blob_id));
04262 
04263             if (IsSaveStatistics()) {
04264                 unsigned read_count = m_CacheAttrDB->read_count;
04265                 string owner_name;
04266                 m_CacheAttrDB->owner_name.ToString(owner_name);
04267                 if (0 == read_count) {
04268                     m_Statistics.AddNeverRead(owner_name);
04269                 }
04270                 m_Statistics.AddPurgeDelete(owner_name);
04271                 x_UpdateOwnerStatOnDelete(owner_name,
04272                                           false/*non-expl-delete*/);
04273             }
04274 
04275 
04276             continue;
04277         }
04278     } // while
04279 
04280     }} // m_DB_Lock
04281 
04282 
04283     ITERATE(vector<SCacheDescr>, it, cache_entries) {
04284         CBDB_Transaction trans(*m_Env,
04285                             CBDB_Transaction::eEnvDefault,
04286                             CBDB_Transaction::eNoAssociation);
04287         {{
04288             CFastMutexGuard guard(m_DB_Lock);
04289             m_BLOB_SplitStore->SetTransaction(&trans);
04290             x_DropBlob(it->key,
04291                        it->version,
04292                        it->subkey,
04293                        it->overflow,
04294                        it->blob_id,
04295                        trans);
04296         }} // m_DB_Lock
04297 
04298         trans.Commit();
04299     }
04300 }
04301 
04302 void CBDB_Cache::Verify(const string&  cache_path,
04303                         const string&  cache_name,
04304                         const string&  err_file,
04305                         bool           force_remove)
04306 {
04307     Close();
04308 
04309     m_Path = CDirEntry::AddTrailingPathSeparator(cache_path);
04310 
04311     m_Env = new CBDB_Env();
04312 
04313     m_Env->SetCacheSize(10 * 1024 * 1024);
04314     m_Env->OpenErrFile(!err_file.empty() ? err_file : string("stderr"));
04315     try {
04316         m_Env->Open(cache_path, DB_INIT_MPOOL | DB_USE_ENVIRON);
04317     }
04318     catch (CBDB_Exception& /*ex*/)
04319     {
04320         m_Env->Open(cache_path,
04321                     DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_USE_ENVIRON);
04322     }
04323 
04324     LOG_POST_X(26, "Cache location: " + string(cache_path));
04325 
04326     string cache_blob_db_name =
04327        string("lcs_") + string(cache_name) + string("_blob")+ string(".db");
04328     string attr_db_name =
04329        string("lcs_") + string(cache_name) + string("_attr5") + string(".db");
04330 
04331     m_CacheAttrDB = new SCache_AttrDB();
04332     m_CacheAttrDB->SetEnv(*m_Env);
04333 
04334     string bak = m_Path + attr_db_name + ".bak";
04335     FILE* fl = fopen(bak.c_str(), "wb");
04336 
04337     LOG_POST_X(27, "Running verification for: " + attr_db_name);
04338     m_CacheAttrDB->Verify(attr_db_name.c_str(), 0, fl);
04339     delete m_CacheAttrDB; m_CacheAttrDB = 0;
04340 
04341     fclose(fl);
04342 
04343     if (force_remove) {
04344         m_Env->ForceRemove();
04345     } else {
04346         bool deleted = m_Env->Remove();
04347 
04348         if (!deleted)
04349             LOG_POST_X(28, "Cannot delete the environment (it is busy by another process)");
04350     }
04351     delete m_Env; m_Env = 0;
04352 }
04353 
04354 void CBDB_Cache::x_PerformCheckPointNoLock(unsigned bytes_written)
04355 {
04356     if (m_RunPurgeThread == false) {
04357         m_Env->TransactionCheckpoint();
04358     }
04359 }
04360 
04361 time_t
04362 CBDB_Cache::x_ComputeExpTime(int time_stamp, unsigned ttl, int timeout)
04363 {
04364     time_t exp_time;
04365     if (ttl) {  // individual timeout
04366         if (m_MaxTimeout && ttl > m_MaxTimeout) {
04367             timeout =
04368                 (int) max((unsigned)timeout, (unsigned)m_MaxTimeout);
04369             ttl = timeout; // used in diagnostics only
04370         } else {
04371             timeout = ttl;
04372         }
04373     }
04374 
04375     // predicted job expiration time
04376     exp_time = time_stamp + timeout;
04377     return exp_time;
04378 }
04379 
04380 bool CBDB_Cache::x_CheckTimeStampExpired(SCache_AttrDB& attr_db,
04381                                          time_t  curr,
04382                                          time_t* exp_time)
04383 {
04384     int timeout = GetTimeout();
04385 
04386     if (timeout) {
04387 
04388         int db_time_stamp = attr_db.time_stamp;
04389         unsigned int ttl = attr_db.ttl;
04390 
04391         if (ttl) {  // individual timeout
04392             if (m_MaxTimeout && ttl > m_MaxTimeout) {
04393                 timeout =
04394                     (int) max((unsigned)timeout, (unsigned)m_MaxTimeout);
04395                 ttl = timeout; // used in diagnostics only
04396             } else {
04397                 timeout = ttl;
04398             }
04399         }
04400 
04401         // predicted job expiration time
04402         if (exp_time) {
04403             *exp_time = db_time_stamp + timeout;
04404         }
04405 
04406         if (curr - timeout > db_time_stamp) {
04407             return true;
04408         }
04409     }
04410     return false;
04411 }
04412 
04413 
04414 void CBDB_Cache::x_UpdateAccessTime(const string&  key,
04415                                     int            version,
04416                                     const string&  subkey,
04417                                     EBlobAccessType access_type,
04418                                     CBDB_Transaction& trans)
04419 {
04420     if (IsReadOnly()) {
04421         return;
04422     }
04423 
04424     unsigned   timeout = (unsigned)time(0);
04425     {{
04426         CBDB_FileCursor cur(*m_CacheAttrDB, trans,
04427                             CBDB_FileCursor::eReadModifyUpdate);
04428 
04429         cur.SetCondition(CBDB_FileCursor::eEQ);
04430         cur.From << key << version;
04431 
04432         if (cur.Fetch() == eBDB_Ok) {
04433             unsigned old_ts = m_CacheAttrDB->time_stamp;
04434             if (old_ts < timeout) {
04435                 unsigned max_time = m_CacheAttrDB->max_time;
04436                 if (max_time == 0 || max_time >= timeout) {
04437                     m_CacheAttrDB->time_stamp = timeout;
04438 
04439                     switch (access_type) {
04440                     case eBlobRead:
04441                         {
04442                         unsigned read_count = m_CacheAttrDB->read_count;
04443                         ++read_count;
04444                         m_CacheAttrDB->read_count = read_count;
04445                         }
04446                         break;
04447                     case eBlobUpdate:
04448                         {
04449                         unsigned upd_count = m_CacheAttrDB->upd_count;
04450                         ++upd_count;
04451                         m_CacheAttrDB->upd_count = upd_count;
04452                         }
04453                         break;
04454                     case eBlobStore:
04455                         break;
04456                     default:
04457                         _ASSERT(0);
04458                     }
04459 
04460                     cur.Update();
04461                 }
04462             }
04463         } // if
04464 
04465     }} // cursor
04466 }
04467 
04468 void CBDB_Cache::x_TruncateDB()
04469 {
04470     if (IsReadOnly()) {
04471         return;
04472     }
04473 
04474     // TODO: optimization of store truncate
04475     {{
04476     CBDB_FileCursor cur(*m_CacheAttrDB);
04477     cur.SetCondition(CBDB_FileCursor::eFirst);
04478     while (cur.Fetch() == eBDB_Ok) {
04479         unsigned blob_id, volume_id, split_id;
04480         blob_id = m_CacheAttrDB->blob_id;
04481         volume_id = m_CacheAttrDB->volume_id;
04482         split_id = m_CacheAttrDB->split_id;
04483 
04484         if (blob_id) {
04485             unsigned coords[2] = { volume_id, split_id };
04486             m_BLOB_SplitStore->SetTransaction(0);
04487             m_BLOB_SplitStore->Delete(blob_id, coords);
04488         }
04489     } // while
04490 
04491     }}
04492 
04493     m_BLOB_SplitStore->Save();
04494 
04495 
04496     LOG_POST_X(29, Info << "CBDB_BLOB_Cache:: cache database truncated");
04497     m_CacheAttrDB->Truncate();
04498 
04499     CDir dir(m_Path);
04500     CMaskFileName mask;
04501     mask.Add(this->GetName() + "_*.ov_");
04502 
04503     string ext;
04504     string ov_(".ov_");
04505     if (dir.Exists()) {
04506         CDir::TEntries  content(dir.GetEntries(mask));
04507         ITERATE(CDir::TEntries, it, content) {
04508             if ( (*it)->IsFile() ) {
04509                 ext = (*it)->GetExt();
04510                 if (ext == ov_) {
04511                     (*it)->Remove();
04512                 }
04513             }
04514         }
04515     }
04516 }
04517 
04518 
04519 bool CBDB_Cache::x_RetrieveBlobAttributes(const string&  key,
04520                                           int            version,
04521                                           const string&  subkey,
04522                                           int&           overflow,
04523                                           unsigned int&  ttl,
04524                                           unsigned int&  blob_id,
04525                                           unsigned int&  volume_id,
04526                                           unsigned int&  split_id)
04527 {
04528     if (!x_FetchBlobAttributes(key, version, subkey)) return false;
04529 
04530     overflow  = m_CacheAttrDB->overflow;
04531     ttl       = m_CacheAttrDB->ttl;
04532     blob_id   = m_CacheAttrDB->blob_id;
04533     volume_id = m_CacheAttrDB->volume_id;
04534     split_id  = m_CacheAttrDB->split_id;
04535 
04536 /*
04537     if (!(m_TimeStampFlag & fTrackSubKey)) {
04538         m_CacheAttrDB->subkey = "";
04539 
04540         EBDB_ErrCode err = m_CacheAttrDB->Fetch();
04541         if (err != eBDB_Ok) {
04542             return false;
04543         }
04544     }
04545 */
04546     return true;
04547 }
04548 
04549 bool CBDB_Cache::x_FetchBlobAttributes(const string&  key,
04550                                        int            version,
04551                                        const string&  subkey)
04552 {
04553     m_CacheAttrDB->key = key;
04554     m_CacheAttrDB->version = version;
04555     m_CacheAttrDB->subkey = subkey;
04556 
04557     EBDB_ErrCode ret = m_CacheAttrDB->Fetch();
04558     if (ret != eBDB_Ok) {
04559         return false;
04560     }
04561     return true;
04562 }
04563 
04564 
04565 
04566 void CBDB_Cache::x_DropOverflow(const string&  key,
04567                                 int            version,
04568                                 const string&  subkey)
04569 {
04570     string path;
04571     try {
04572         s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey);
04573         x_DropOverflow(path);
04574     } catch (exception& ex) {
04575         ERR_POST_X(30, "Blob Store: Cannot remove file: " << path
04576                    << " " << ex.what());
04577     }
04578 }
04579 
04580 void CBDB_Cache::x_DropOverflow(const string&  file_path)
04581 {
04582     try {
04583         CDirEntry entry(file_path);
04584         if (entry.Exists()) {
04585             entry.Remove();
04586         }
04587     } catch (exception& ex) {
04588         ERR_POST_X(31, "Blob Store: Cannot remove file: " << file_path
04589                    << " " << ex.what());
04590     }
04591 }
04592 
04593 unsigned CBDB_Cache::GetNextBlobId(bool lock_id)
04594 {
04595     unsigned blob_id = m_BlobIdCounter.Add(1);
04596     if (blob_id >= kMax_UInt) {
04597         m_BlobIdCounter.Set(0);
04598         blob_id = m_BlobIdCounter.Add(1);
04599         m_GC_Deleted.clear();
04600     }
04601     if (lock_id) {
04602         bool locked = m_LockVector.TryLock(blob_id);
04603         if (!locked) {
04604             BDB_THROW(eInvalidOperation, "Cannot lock new BLOB ID");
04605         }
04606     }
04607     return blob_id;
04608 }
04609 
04610 bool CBDB_Cache::DropBlobWithExpCheck(unsigned           blob_id,
04611                                       CBDB_Transaction&  trans)
04612 {
04613     string key, subkey;
04614     int version;
04615     EBDB_ErrCode ret;
04616     {{
04617         CFastMutexGuard guard(m_IDIDX_Lock_RO);
04618 
04619         m_CacheIdIDX_RO->blob_id = blob_id;
04620         ret = m_CacheIdIDX_RO->Fetch();
04621         if (ret == eBDB_Ok) {
04622             key = m_CacheIdIDX_RO->key;
04623             version = m_CacheIdIDX_RO->version;
04624             subkey = m_CacheIdIDX_RO->subkey;
04625         }
04626     }} // m_IDIDX_Lock_RO
04627 
04628     if (ret == eBDB_Ok) {
04629         return DropBlobWithExpCheck(key, version, subkey, trans);
04630     }
04631     return false;
04632 }
04633 
04634 
04635 
04636 bool CBDB_Cache::DropBlobWithExpCheck(const string&      key,
04637                                       int                version,
04638                                       const string&      subkey,
04639                                       CBDB_Transaction&  trans)
04640 {
04641     if (IsReadOnly()) {
04642         return false;
04643     }
04644 
04645     time_t curr = time(0);
04646 
04647     unsigned coords[2] = {0,};
04648     unsigned split_coord[2] = {0,};
04649     int overflow;
04650     unsigned  blob_id;
04651 
04652     bool blob_expired = false;
04653     time_t exp_time;
04654 
04655     {{
04656         CFastMutexGuard guard(m_CARO2_Lock);
04657 
04658         m_CacheAttrDB_RO2->SetTransaction(0);
04659 
04660         m_CacheAttrDB_RO2->key = key;
04661         m_CacheAttrDB_RO2->version = version;
04662         m_CacheAttrDB_RO2->subkey = subkey;
04663         if (m_CacheAttrDB_RO2->Fetch() != eBDB_Ok) {
04664             return false;
04665         }
04666 
04667         if (x_CheckTimeStampExpired(*m_CacheAttrDB_RO2, curr, &exp_time)) {
04668             blob_expired = true;
04669             overflow  = m_CacheAttrDB_RO2->overflow;
04670             coords[0] = m_CacheAttrDB_RO2->volume_id;
04671             coords[1] = m_CacheAttrDB_RO2->split_id;
04672             blob_id   = m_CacheAttrDB_RO2->blob_id;
04673         } else {
04674             blob_expired = false;
04675             blob_id   = m_CacheAttrDB_RO2->blob_id;
04676         }
04677 
04678     }} // m_CARO2_Lock
04679 
04680     if (!blob_expired) {
04681         {{
04682             CFastMutexGuard guard(m_TimeLine_Lock);
04683             _ASSERT(exp_time);
04684             m_TimeLine->AddObject(exp_time, blob_id);
04685         }} // m_TimeLine_Lock
04686         return false;
04687     }
04688 
04689     // BLOB expired and needs to be deleted
04690     //
04691 
04692 
04693     // Delete the overflow file
04694     //
04695     if (overflow == 1) {
04696         x_DropOverflow(key, version, subkey);
04697     }
04698 
04699     // Important implementation note:
04700     // There is a temptation to delete BLOB split store first
04701     // and registration record second to minimize time registration record
04702     // stays locked. We should NOT do it in ONE transaction
04703     // because in other places order of access is different and this may
04704     // create DEADLOCK
04705 
04706     // Delete BLOB attributes and index
04707     //
04708     {{
04709         CFastMutexGuard guard(m_DB_Lock);
04710         m_CacheAttrDB->SetTransaction(&trans);
04711 
04712         m_CacheAttrDB->key     = key;
04713         m_CacheAttrDB->version = version;
04714         m_CacheAttrDB->subkey  = subkey;
04715         if (m_CacheAttrDB->Fetch() != eBDB_Ok) {
04716             return false;
04717         }
04718         m_CacheAttrDB->Delete(CBDB_RawFile::eThrowOnError);
04719 
04720         m_CacheIdIDX->SetTransaction(&trans);
04721         m_CacheIdIDX->blob_id = blob_id;
04722         m_CacheIdIDX->Delete(CBDB_RawFile::eThrowOnError);
04723 
04724     }} // m_DB_Lock
04725 
04726     // Delete split store
04727     //
04728     EBDB_ErrCode ret =
04729         m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord);
04730     m_BLOB_SplitStore->SetTransaction(&trans);
04731     if (ret == eBDB_Ok) {
04732         if (split_coord[0] != coords[0] ||
04733             split_coord[1] != coords[1]) {
04734             // split coords un-sync: delete de-facto BLOB
04735             m_BLOB_SplitStore->Delete(blob_id,
04736                                       CBDB_RawFile::eThrowOnError);
04737         }
04738     }
04739     // Delete BLOB by known coordinates
04740     m_BLOB_SplitStore->Delete(blob_id, coords,
04741                               CBDB_RawFile::eThrowOnError);
04742 
04743     m_GC_Deleted.set(blob_id);
04744 
04745     return true;
04746 }
04747 
04748 
04749 
04750 void CBDB_Cache::x_DropBlob(const string&      key,
04751                             int                version,
04752                             const string&      subkey,
04753                             int                overflow,
04754                             unsigned           blob_id,
04755                             CBDB_Transaction&  trans)
04756 {
04757     if (IsReadOnly()) {
04758         return;
04759     }
04760 
04761     if (overflow == 1) {
04762         x_DropOverflow(key, version, subkey);
04763     }
04764 
04765     unsigned coords[2] = {0,};
04766     unsigned split_coord[2] = {0,};
04767 
04768     if (blob_id) {
04769         bool delete_split = false;
04770         {{
04771         //m_CacheAttrDB->SetTransaction(0);
04772 
04773         CBDB_FileCursor cur(*m_CacheAttrDB);
04774         cur.SetCondition(CBDB_FileCursor::eEQ);
04775         cur.From << key << version << subkey;
04776         if (cur.Fetch() == eBDB_Ok) {
04777             coords[0] = m_CacheAttrDB->volume_id;
04778             coords[1] = m_CacheAttrDB->split_id;
04779 
04780             EBDB_ErrCode ret =
04781                 m_BLOB_SplitStore->GetCoordinates(blob_id, split_coord);
04782             if (ret == eBDB_Ok) {
04783                 if (split_coord[0] != coords[0] ||
04784                     split_coord[1] != coords[1]) {
04785                     delete_split = true;
04786                 }
04787             }
04788 
04789         }
04790         }}
04791 
04792         // delete blob as pointed by de-mux splitter
04793         if (delete_split) {
04794             m_BLOB_SplitStore->Delete(blob_id,
04795                                       CBDB_RawFile::eThrowOnError);
04796         }
04797         // delete blob as accounted by meta-information
04798         m_BLOB_SplitStore->Delete(blob_id, coords,
04799                                   CBDB_RawFile::eThrowOnError);
04800 
04801     }
04802     m_CacheAttrDB->SetTransaction(&trans);
04803 
04804     m_CacheAttrDB->key = key;
04805     m_CacheAttrDB->version = version;
04806     m_CacheAttrDB->subkey = subkey;
04807 
04808     m_CacheAttrDB->Delete(CBDB_RawFile::eThrowOnError);
04809 
04810     m_CacheIdIDX->SetTransaction(&trans);
04811 
04812     m_CacheIdIDX->blob_id = blob_id;
04813     m_CacheIdIDX->Delete(CBDB_RawFile::eThrowOnError);
04814 }
04815 
04816 bool CBDB_Cache::IsLocked(unsigned blob_id)
04817 {
04818     return m_LockVector.IsLocked(blob_id);
04819 }
04820 
04821 bool CBDB_Cache::IsLocked(const string&  key,
04822                           int            version,
04823                           const string&  subkey)
04824 {
04825     unsigned blob_id = GetBlobId(key, version, subkey);
04826     if (!blob_id) return false;
04827     return IsLocked(blob_id);
04828 }
04829 
04830 
04831 CBDB_Cache::EBlobCheckinRes
04832 CBDB_Cache::BlobCheckIn(unsigned         blob_id_ext,
04833                         const string&    key,
04834                         int              version,
04835                         const string&    subkey,
04836                         EBlobCheckinMode mode,
04837                         TBlobLock&       blob_lock,
04838                         bool             do_id_lock,
04839                         unsigned*        volume_id,
04840                         unsigned*        split_id,
04841                         unsigned*        overflow)
04842 {
04843     _ASSERT(volume_id);
04844     _ASSERT(split_id);
04845     _ASSERT(overflow);
04846 
04847     EBDB_ErrCode ret;
04848     unsigned blob_id = 0;
04849 
04850     while (blob_id == 0) {
04851         {{
04852             CFastMutexGuard guard(m_DB_Lock);
04853             m_CacheAttrDB->SetTransaction(0);
04854 
04855             CBDB_FileCursor cur(*m_CacheAttrDB);
04856             cur.SetCondition(CBDB_FileCursor::eEQ);
04857             cur.From << key << version << subkey;
04858 
04859             ret = cur.Fetch();
04860             if (ret == eBDB_Ok) {
04861                 blob_id = m_CacheAttrDB->blob_id;
04862                 *volume_id = m_CacheAttrDB->volume_id;
04863                 *split_id = m_CacheAttrDB->split_id;
04864                 *overflow = m_CacheAttrDB->overflow;
04865             }
04866         }} // m_DB_Lock
04867 
04868         if (ret == eBDB_Ok) {
04869             _ASSERT(blob_id);
04870             if (do_id_lock) {
04871                 blob_lock.Lock(blob_id);
04872             } else {
04873                 blob_lock.SetId(blob_id);
04874             }
04875             return eBlobCheckIn_Found;
04876         }
04877 
04878         *overflow = *volume_id = *split_id = 0;
04879 
04880         switch (mode) {
04881         case eBlobCheckIn:
04882             break;
04883         case eBlobCheckIn_Create:
04884             {
04885             if (blob_id_ext == 0) {
04886                 blob_id = GetNextBlobId(false/*do not lock*/);
04887             } else {
04888                 blob_id = blob_id_ext;
04889             }
04890 
04891             CBDB_Transaction trans(*m_Env,
04892                                 CBDB_Transaction::eTransASync,  // async!
04893                                 CBDB_Transaction::eNoAssociation);
04894 
04895             // create registration record: use temp magic values
04896             // (and overide some later)
04897             //
04898             {{
04899                 CFastMutexGuard guard(m_DB_Lock);
04900                 m_CacheAttrDB->SetTransaction(&trans);
04901 
04902                 m_CacheAttrDB->key = key;
04903                 m_CacheAttrDB->version = version;
04904                 m_CacheAttrDB->subkey = subkey;
04905                 m_CacheAttrDB->time_stamp = Uint4(time(0)+1);
04906                 m_CacheAttrDB->overflow = 0;
04907                 m_CacheAttrDB->ttl = 77;
04908                 m_CacheAttrDB->max_time = 77;
04909                 m_CacheAttrDB->upd_count  = 0;
04910                 m_CacheAttrDB->read_count = 0;
04911                 m_CacheAttrDB->owner_name = "BDB_cache";
04912                 m_CacheAttrDB->blob_id = blob_id;
04913                 m_CacheAttrDB->volume_id = 0;
04914                 m_CacheAttrDB->split_id = 0;
04915 
04916                 ret = m_CacheAttrDB->Insert();
04917 
04918                 if (ret == eBDB_Ok) {
04919                     m_CacheIdIDX->SetTransaction(&trans);
04920 
04921                     m_CacheIdIDX->blob_id = blob_id;
04922                     m_CacheIdIDX->key = key;
04923                     m_CacheIdIDX->version = version;
04924                     m_CacheIdIDX->subkey = subkey;
04925 
04926                     ret = m_CacheIdIDX->Insert();
04927                     if (ret == eBDB_Ok) {
04928                         trans.Commit();
04929                         if (do_id_lock) {
04930                             blob_lock.Lock(blob_id);
04931                         } else {
04932                             blob_lock.SetId(blob_id);
04933                         }
04934                         return eBlobCheckIn_Created;
04935                     } else {
04936                         BDB_THROW(eInvalidOperation,
04937                                   "Cannot update blob id index");
04938                     }
04939                 }
04940             }} // m_DB_Lock
04941 
04942             // Insert failed (record exists) - re-read the record
04943             blob_id = 0;
04944             }
04945             break;
04946         default:
04947             _ASSERT(0);
04948         }
04949     } // while (!blob_id)
04950 
04951     return EBlobCheckIn_NotFound;
04952 }
04953 
04954 
04955 
04956 void CBDB_Cache::x_UpdateOwnerStatOnDelete(const string& owner,
04957                                            bool          expl_delete)
04958 {
04959 /*
04960     SBDB_CacheStatistics& st = m_OwnerStatistics[owner];
04961 
04962     const char* key = m_CacheAttrDB->key;
04963     int version = m_CacheAttrDB->version;
04964     const char* subkey = m_CacheAttrDB->subkey;
04965 
04966     unsigned upd_count = m_CacheAttrDB->upd_count;
04967     unsigned read_count = m_CacheAttrDB->read_count;
04968     int overflow = m_CacheAttrDB->overflow;
04969 
04970     size_t blob_size = x_GetBlobSize(key, version, subkey);
04971 
04972     if (!blob_size) {
04973         return;
04974     }
04975 
04976     ++st.blobs_stored_total;
04977     st.blobs_overflow_total += overflow;
04978     st.blobs_updates_total += upd_count;
04979     if (!read_count) {
04980         ++st.blobs_never_read_total;
04981     }
04982     st.blobs_read_total += read_count;
04983     st.blobs_expl_deleted_total += expl_delete;
04984     if (!expl_delete) {
04985         st.AddPurgeDelete();
04986     }
04987     st.blobs_size_total += blob_size;
04988     if (blob_size > st.blob_size_max_total) {
04989         st.blob_size_max_total = blob_size;
04990     }
04991 
04992     SBDB_CacheStatistics::AddToHistogram(&st.blob_size_hist, blob_size);
04993 */
04994 }
04995 /*
04996 size_t CBDB_Cache::x_GetBlobSize(const char* key,
04997                                  int         version,
04998                                  const char* subkey)
04999 {
05000     int overflow = m_CacheAttrDB->overflow;
05001     unsigned blob_id = m_CacheAttrDB->blob_id;
05002     if (overflow) {
05003         string path;
05004         s_MakeOverflowFileName(path, m_Path, GetName(), key, version, subkey);
05005         CFile entry(path);
05006 
05007         if (entry.Exists()) {
05008             return (size_t) entry.GetLength();
05009         }
05010     }
05011 
05012     // Regular inline BLOB
05013 
05014     if (blob_id == 0) {
05015         return 0;
05016     }
05017 
05018     m_CacheBLOB_DB->blob_id = blob_id;
05019     EBDB_ErrCode ret = m_CacheBLOB_DB->Fetch();
05020 
05021     if (ret != eBDB_Ok) {
05022         return 0;
05023     }
05024     return m_CacheBLOB_DB->LobSize();
05025 }
05026 */
05027 
05028 void CBDB_Cache::Lock()
05029 {
05030     m_DB_Lock.Lock();
05031 }
05032 
05033 void CBDB_Cache::Unlock()
05034 {
05035     m_DB_Lock.Unlock();
05036 }
05037 
05038 
05039 
05040 void CBDB_Cache::RegisterInternalError(
05041         SBDB_CacheUnitStatistics::EErrGetPut operation,
05042         const string&                        owner)
05043 {
05044     if (IsSaveStatistics()) {
05045         CFastMutexGuard guard(m_DB_Lock);
05046         m_Statistics.AddInternalError(owner, operation);
05047     }
05048 }
05049 
05050 void CBDB_Cache::RegisterProtocolError(
05051                             SBDB_CacheUnitStatistics::EErrGetPut operation,
05052                             const string&                        owner)
05053 {
05054     if (IsSaveStatistics()) {
05055         CFastMutexGuard guard(m_DB_Lock);
05056         m_Statistics.AddProtocolError(owner, operation);
05057     }
05058 }
05059 
05060 void CBDB_Cache::RegisterNoBlobError(
05061                              SBDB_CacheUnitStatistics::EErrGetPut operation,
05062                              const string&                        owner)
05063 {
05064     if (IsSaveStatistics()) {
05065         CFastMutexGuard guard(m_DB_Lock);
05066         m_Statistics.AddNoBlobError(owner, operation);
05067     }
05068 }
05069 
05070 void CBDB_Cache::RegisterCommError(
05071                               SBDB_CacheUnitStatistics::EErrGetPut operation,
05072                               const string&                        owner)
05073 {
05074     if (IsSaveStatistics()) {
05075         CFastMutexGuard guard(m_DB_Lock);
05076         m_Statistics.AddCommError(owner, operation);
05077     }
05078 }
05079 
05080 
05081 
05082 
05083 
05084 
05085 
05086 
05087 CBDB_Cache::CacheKey::CacheKey(const string& x_key,
05088                                int           x_version,
05089                                const string& x_subkey)
05090 : key(x_key), version(x_version), subkey(x_subkey)
05091 {}
05092 
05093 
05094 bool
05095 CBDB_Cache::CacheKey::operator < (const CBDB_Cache::CacheKey& cache_key) const
05096 {
05097     int cmp = NStr::Compare(key, cache_key.key);
05098     if (cmp != 0)
05099         return cmp < 0;
05100     if (version != cache_key.version) return (version < cache_key.version);
05101     cmp = NStr::Compare(subkey, cache_key.subkey);
05102     if (cmp != 0)
05103         return cmp < 0;
05104     return false;
05105 }
05106 
05107 
05108 void BDB_Register_Cache(void)
05109 {
05110     RegisterEntryPoint<ICache>(NCBI_BDB_ICacheEntryPoint);
05111 }
05112 
05113 
/// Driver name of the BDB ICache implementation; passed to the class
/// factory and compared against the "driver" parameter in
/// CBDB_Cache::SameCacheParams().
const char* kBDBCacheDriverName = "bdb";
05115 
05116 /// Class factory for BDB implementation of ICache
05117 ///
05118 /// @internal
05119 ///
05120 class CBDB_CacheReaderCF : public CICacheCF<CBDB_Cache>
05121 {
05122 public:
05123     typedef CICacheCF<CBDB_Cache> TParent;
05124 public:
05125     CBDB_CacheReaderCF() : TParent(kBDBCacheDriverName, 0)
05126     {
05127     }
05128     ~CBDB_CacheReaderCF()
05129     {
05130     }
05131 
05132     virtual
05133     ICache* CreateInstance(
05134                    const string&    driver  = kEmptyStr,
05135                    CVersionInfo     version = NCBI_INTERFACE_VERSION(ICache),
05136                    const TPluginManagerParamTree* params = 0) const;
05137 
05138 };
05139 
// Names of the configuration parameters accepted by the class factory

// Cache location and identity
static const char* kCFParam_path = "path";
static const char* kCFParam_name = "name";
static const char* kCFParam_drop_if_dirty = "drop_if_dirty";

// Locking ("no_lock" / "pid_lock" are presumably the values accepted by
// the "lock" parameter -- confirm against CreateInstance())
static const char* kCFParam_lock = "lock";
static const char* kCFParam_lock_default = "no_lock";
static const char* kCFParam_lock_pid_lock = "pid_lock";

// Environment / storage settings
static const char* kCFParam_mem_size = "mem_size";
static const char* kCFParam_log_mem_size = "log_mem_size";
static const char* kCFParam_read_only = "read_only";
static const char* kCFParam_write_sync = "write_sync";
static const char* kCFParam_use_transactions = "use_transactions";
static const char* kCFParam_direct_db = "direct_db";
static const char* kCFParam_direct_log = "direct_log";
static const char* kCFParam_transaction_log_path = "transaction_log_path";

// Purge, checkpoint and miscellaneous settings
static const char* kCFParam_purge_batch_size = "purge_batch_size";
static const char* kCFParam_purge_batch_sleep = "purge_batch_sleep";
static const char* kCFParam_purge_clean_log = "purge_clean_log";
static const char* kCFParam_purge_thread = "purge_thread";
static const char* kCFParam_purge_thread_delay = "purge_thread_delay";
static const char* kCFParam_checkpoint_delay = "checkpoint_delay";
static const char* kCFParam_checkpoint_bytes = "checkpoint_bytes";
static const char* kCFParam_log_file_max = "log_file_max";
static const char* kCFParam_overflow_limit = "overflow_limit";
static const char* kCFParam_ttl_prolong = "ttl_prolong";
static const char* kCFParam_max_blob_size = "max_blob_size";
static const char* kCFParam_rr_volumes = "rr_volumes";
static const char* kCFParam_memp_trickle = "memp_trickle";
static const char* kCFParam_TAS_spins = "tas_spins";
05173 
05174 
05175 bool CBDB_Cache::SameCacheParams(const TCacheParams* params) const
05176 {
05177     if ( !params ) {
05178         return false;
05179     }
05180     const TCacheParams* driver = params->FindNode("driver");
05181     if (!driver  ||  driver->GetValue().value != kBDBCacheDriverName) {
05182         return false;
05183     }
05184     const TCacheParams* driver_params = params->FindNode(kBDBCacheDriverName);
05185     if ( !driver_params ) {
05186         return false;
05187     }
05188     const TCacheParams* path = driver_params->FindNode(kCFParam_path);
05189     string str_path = path ?
05190         CDirEntry::AddTrailingPathSeparator(
05191         path->GetValue().value) : kEmptyStr;
05192     if (!path  || str_path != m_Path) {
05193         return false;
05194     }
05195     const TCacheParams* name = driver_params->FindNode(kCFParam_name);
05196     return name  &&  name->GetValue().value == m_Name;
05197 }
05198 
05199 
05200 ICache* CBDB_CacheReaderCF::CreateInstance(
05201            const string&                  driver,
05202            CVersionInfo                   version,
05203            const TPluginManagerParamTree* params) const
05204 {
05205     auto_ptr<CBDB_Cache> drv;
05206     if (driver.empty() || driver == m_DriverName) {
05207         if (version.Match(NCBI_INTERFACE_VERSION(ICache))
05208                             != CVersionInfo::eNonCompatible) {
05209             drv.reset(new CBDB_Cache());
05210         }
05211     } else {
05212         return 0;
05213     }
05214 
05215     if (!params)
05216         return drv.release();
05217 
05218     // cache configuration
05219 
05220     const string& path =
05221         GetParam(params, kCFParam_path, true);
05222     string name =
05223         GetParam(params, kCFParam_name, false, "lcache");
05224     string locking =
05225         GetParam(params, kCFParam_lock, false, kCFParam_lock_default);
05226 
05227     CBDB_Cache::ELockMode lock = CBDB_Cache::eNoLock;
05228     if (NStr::CompareNocase(locking, kCFParam_lock_pid_lock) == 0) {
05229         lock = CBDB_Cache::ePidLock;
05230     }
05231     unsigned overflow_limit = (unsigned)
05232         GetParamDataSize(params, kCFParam_overflow_limit, false, 0);
05233     if (overflow_limit) {
05234         drv->SetOverflowLimit(overflow_limit);
05235     }
05236 
05237     drv->SetInitIfDirty(GetParamBool(params, kCFParam_drop_if_dirty, false, false));
05238 
05239     Uint8 mem_size =
05240         GetParamDataSize(params, kCFParam_mem_size, false, 0);
05241 
05242     Uint8 log_mem_size =
05243         GetParamDataSize(params, kCFParam_log_mem_size, false, 0);
05244 
05245     unsigned checkpoint_bytes = (unsigned)
05246         GetParamDataSize(params, kCFParam_checkpoint_bytes,
05247                          false, 24 * 1024 * 1024);
05248     drv->SetCheckpoint(checkpoint_bytes);
05249 
05250     unsigned checkpoint_delay =
05251         GetParamInt(params, kCFParam_checkpoint_delay, false, 15);
05252     drv->SetCheckpointDelay(checkpoint_delay);
05253 
05254     unsigned log_file_max = (unsigned)
05255         GetParamDataSize(params, kCFParam_log_file_max,
05256                          false, 200 * 1024 * 1024);
05257     drv->SetLogFileMax(log_file_max);
05258 
05259     string transaction_log_path =
05260         GetParam(params, kCFParam_transaction_log_path, false, path);
05261     if (transaction_log_path != path) {
05262         drv->SetLogDir(transaction_log_path);
05263     }
05264 
05265 
05266     bool ro =
05267         GetParamBool(params, kCFParam_read_only, false, false);
05268 
05269     bool w_sync =
05270         GetParamBool(params, kCFParam_write_sync, false, false);
05271     drv->SetWriteSync(w_sync ?
05272                         CBDB_Cache::eWriteSync : CBDB_Cache::eWriteNoSync);
05273 
05274     unsigned ttl_prolong =
05275         GetParamInt(params, kCFParam_ttl_prolong, false, 0);
05276     drv->SetTTL_Prolongation(ttl_prolong);
05277 
05278     unsigned max_blob_size =(unsigned)
05279         GetParamDataSize(params, kCFParam_max_blob_size, false, 0);
05280     drv->SetMaxBlobSize(max_blob_size);
05281 
05282     unsigned rr_volumes =
05283         GetParamInt(params, kCFParam_rr_volumes, false, 3);
05284     drv->SetRR_Volumes(rr_volumes);
05285 
05286     unsigned memp_trickle =
05287         GetParamInt(params, kCFParam_memp_trickle, false, 60);
05288     drv->SetMempTrickle(memp_trickle);
05289 
05290     ConfigureICache(drv.get(), params);
05291 
05292     bool use_trans =
05293         GetParamBool(params, kCFParam_use_transactions, false, true);
05294 
05295     unsigned batch_size =
05296         GetParamInt(params, kCFParam_purge_batch_size, false, 70);
05297     drv->SetPurgeBatchSize(batch_size);
05298 
05299     unsigned batch_sleep =
05300         GetParamInt(params, kCFParam_purge_batch_sleep, false, 0);
05301     drv->SetBatchSleep(batch_sleep);
05302 
05303     unsigned purge_clean_factor =
05304         GetParamInt(params, kCFParam_purge_clean_log, false, 0);
05305     drv->CleanLogOnPurge(purge_clean_factor);
05306 
05307     bool purge_thread =
05308         GetParamBool(params, kCFParam_purge_thread, false, false);
05309     unsigned purge_thread_delay =
05310         GetParamInt(params, kCFParam_purge_thread_delay, false, 30);
05311 
05312     if (purge_thread) {
05313         drv->RunPurgeThread(purge_thread_delay);
05314     }
05315 
05316     unsigned tas_spins =
05317         GetParamInt(params, kCFParam_TAS_spins, false, 200);
05318 
05319     if (ro) {
05320         drv->OpenReadOnly(path.c_str(), name.c_str(), (unsigned)mem_size);
05321     } else {
05322         drv->Open(path, name,
05323                   lock, mem_size,
05324                   use_trans ? CBDB_Cache::eUseTrans : CBDB_Cache::eNoTrans,
05325                   (unsigned)log_mem_size);
05326     }
05327 
05328     if (!drv->IsJoinedEnv()) {
05329         bool direct_db =
05330             GetParamBool(params, kCFParam_direct_db, false, false);
05331         bool direct_log =
05332             GetParamBool(params, kCFParam_direct_log, false, false);
05333 
05334         CBDB_Env* env = drv->GetEnv();
05335         env->SetDirectDB(direct_db);
05336         env->SetDirectLog(direct_log);
05337         if (tas_spins) {
05338             env->SetTasSpins(tas_spins);
05339         }
05340         env->SetLkDetect(CBDB_Env::eDeadLock_Default);
05341         env->SetMpMaxWrite(0, 0);
05342     }
05343 
05344     return drv.release();
05345 
05346 }
05347 
05348 
05349 void NCBI_BDB_ICacheEntryPoint(
05350      CPluginManager<ICache>::TDriverInfoList&   info_list,
05351      CPluginManager<ICache>::EEntryPointRequest method)
05352 {
05353     CHostEntryPointImpl<CBDB_CacheReaderCF>::
05354        NCBI_EntryPointImpl(info_list, method);
05355 }
05356 
05357 void NCBI_EntryPoint_xcache_bdb(
05358      CPluginManager<ICache>::TDriverInfoList&   info_list,
05359      CPluginManager<ICache>::EEntryPointRequest method)
05360 {
05361     NCBI_BDB_ICacheEntryPoint(info_list, method);
05362 }
05363 
05364 
05365 CBDB_CacheHolder::CBDB_CacheHolder(ICache* blob_cache, ICache* id_cache)
05366 : m_BlobCache(blob_cache),
05367   m_IdCache(id_cache)
05368 {}
05369 
05370 CBDB_CacheHolder::~CBDB_CacheHolder()
05371 {
05372     delete m_BlobCache;
05373     delete m_IdCache;
05374 }
05375 
05376 
05377 void BDB_ConfigureCache(CBDB_Cache&             bdb_cache,
05378                         const string&           path,
05379                         const string&           name,
05380                         unsigned                timeout,
05381                         ICache::TTimeStampFlags tflags)
05382 {
05383     if (!tflags) {
05384         tflags =
05385             ICache::fTimeStampOnCreate         |
05386             ICache::fExpireLeastFrequentlyUsed |
05387             ICache::fPurgeOnStartup            |
05388 //            ICache::fTrackSubKey               |
05389             ICache::fCheckExpirationAlways;
05390     }
05391     if (timeout == 0) {
05392         timeout = 24 * 60 * 60;
05393     }
05394 
05395     bdb_cache.SetTimeStampPolicy(tflags, timeout);
05396     bdb_cache.SetVersionRetention(ICache::eKeepAll);
05397 
05398     bdb_cache.Open(path.c_str(),
05399                    name.c_str(),
05400                    CBDB_Cache::eNoLock,
05401                    10 * 1024 * 1024,
05402                    CBDB_Cache::eUseTrans);
05403 
05404 }
05405 
05406 
05407 END_NCBI_SCOPE
Modified on Wed May 23 13:10:12 2012 by modify_doxy.py rev. 337098