NCBI C++ ToolKit
sqlite_cache.cpp
Go to the documentation of this file.
00001 /*  $Id: sqlite_cache.cpp 24283 2011-09-01 17:38:45Z kuznets $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Author: Mike DiCuccio
00027  *
00028  * File Description:  SQLITE3 based ICache interface
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include <corelib/ncbistr.hpp>
00034 #include <corelib/ncbimtx.hpp>
00035 #include <corelib/ncbitime.hpp>
00036 #include <corelib/ncbifile.hpp>
00037 #include <util/simple_buffer.hpp>
00038 
00039 #include <gui/cache/sqlite_cache.hpp>
00040 
00041 #include <sqlite3.h>
00042 
00043 
00044 BEGIN_NCBI_SCOPE
00045 
00046 
00047 /////////////////////////////////////////////////////////////////////////////
00048 
00049 CSQLITE3_Cache::CWriterThread::CWriterThread(CSQLITE3_Cache& cache,
00050                                              CSQLITE3_Cache::TWriteQueue& write_q)
00051     : m_StopRequest(new SWriteRequest),
00052       m_Cache(cache),
00053       m_WriteQueue(write_q)
00054 {
00055 }
00056 
00057 void CSQLITE3_Cache::CWriterThread::Stop()
00058 {
00059     LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: shutting down writer thread...");
00060     m_WriteQueue.Push(m_StopRequest);
00061 }
00062 
00063 
00064 void* CSQLITE3_Cache::CWriterThread::Main()
00065 {
00066     m_Cache.Purge(m_Cache.GetTimeout());
00067 
00068     bool vac_ok = m_Cache.Vacuum();
00069     if (!vac_ok) {
00070         // re-open db with file removal
00071         LOG_POST("SQLLite Vacuum failed use recovery procedure");
00072         m_Cache.Open(m_Cache.GetDatabase(), true);
00073     }
00074     
00075 
00076     bool done = false;
00077     while ( !done ) {
00078         CRef<SWriteRequest> req;
00079         try {
00080             req = m_WriteQueue.Pop();
00081         }
00082         catch (CException& ex) {
00083             LOG_POST(Error << ex);
00084         }
00085 
00086         if ( !req ) {
00087             continue;
00088         }
00089 
00090         if (m_StopRequest == req) {
00091             done = true;
00092             continue;
00093         }
00094 
00095         //
00096         // write it!
00097         //
00098         m_Cache.StoreSynchronous(req->key, req->version, req->subkey,
00099                                  &req->buffer[0], req->buffer.size());
00100 /*
00101 #ifdef _DEBUG
00102         size_t size = m_WriteQueue.GetSize();
00103         if (size > 0) {
00104             _TRACE("CSQLITE3_Cache::CWriterThread::Main(): "
00105                 << size << " requests pending...");
00106         }
00107 #endif
00108 */
00109     }
00110     LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: writer thread stopped");
00111     return NULL;
00112 }
00113 
00114 
00115 /////////////////////////////////////////////////////////////////////////////
00116 
00117 struct SStats {
00118     CAtomicCounter bytes_read;
00119     CAtomicCounter objects_read;
00120     CAtomicCounter total_time;
00121 };
00122 
00123 static SStats s_CacheStats;
00124 
00125 /// Add BLOB key specific where condition
00126 static
00127 void s_MakeKeyCondition(const string&  key,
00128                         int            version,
00129                         const string&  subkey,
00130                         string*        out_str)
00131 {
00132     *out_str += " key = ";
00133     *out_str += "'" + key + "'";
00134     *out_str += " AND version = ";
00135     *out_str += NStr::IntToString(version);
00136     *out_str += " AND subkey = ";
00137     *out_str += "'" + subkey + "'";
00138 }
00139 
00140 ///////////////////////////////////////////////////////////////////////////////
00141 
00142 class CSQLITE3_Statement
00143 {
00144 public:
00145     /// compile a statement
00146     CSQLITE3_Statement(sqlite3* db, const string& sql);
00147     ~CSQLITE3_Statement();
00148 
00149     sqlite3_stmt* GetStatement();
00150 
00151     void Bind(int col_or_id, const void* data, size_t size);
00152     void Bind(int col_or_id, const string& val);
00153     void Bind(int col_or_id, int val);
00154 
00155     bool Execute();
00156     void Reset();
00157     int Step();
00158 
00159     /// return / extract an integer value for a column
00160     int GetInt(int col);
00161     CSQLITE3_Statement& operator>> (int& i);
00162 
00163 private:
00164     sqlite3*      m_DB;
00165     sqlite3_stmt* m_Stmt;
00166     string        m_Sql;
00167 
00168     void x_Log(int ret, const string& msg)
00169     {
00170         LOG_POST(Error << msg << ": [" << ret << "] "
00171                  << sqlite3_errmsg(m_DB));
00172     }
00173 
00174     void x_Throw(int ret, const string& msg)
00175     {
00176         CNcbiOstrstream ostr;
00177         ostr << msg << ": [" << ret << "] "
00178             << sqlite3_errmsg(m_DB);
00179         string s = string(CNcbiOstrstreamToString(ostr));
00180         NCBI_THROW(CSQLITE3_ICacheException, eUnknown, s);
00181     }
00182 
00183 private:
00184     CSQLITE3_Statement(const CSQLITE3_Statement&);
00185     CSQLITE3_Statement& operator=(const CSQLITE3_Statement&);
00186 };
00187 
00188 
00189 CSQLITE3_Statement::CSQLITE3_Statement(sqlite3* db, const string& sql)
00190 : m_DB(db), m_Stmt(NULL), m_Sql(sql)
00191 {
00192     _TRACE("sql: " << sql);
00193     int ret = 0;
00194 #if (SQLITE_VERSION_NUMBER > 3005001)
00195     if ( (ret = sqlite3_prepare_v2(m_DB, sql.c_str(), -1,
00196                                    &m_Stmt, NULL)) != SQLITE_OK) {
00197 #else
00198     if ( (ret = sqlite3_prepare(m_DB, sql.c_str(), -1,
00199                                 &m_Stmt, NULL)) != SQLITE_OK) {
00200 #endif
00201         m_Stmt = NULL;
00202         x_Throw(ret, "error preparing statement for \"" + sql + "\"");
00203     }
00204     //_TRACE("exec sql: " << sql);
00205 }
00206 
00207 CSQLITE3_Statement::~CSQLITE3_Statement()
00208 {
00209     if (m_Stmt) {
00210         sqlite3_finalize(m_Stmt);
00211     }
00212 }
00213 
00214 sqlite3_stmt* CSQLITE3_Statement::GetStatement()
00215 {
00216     return m_Stmt;
00217 }
00218 
00219 int CSQLITE3_Statement::GetInt(int col)
00220 {
00221     return sqlite3_column_int(m_Stmt, col);
00222 }
00223 
00224 CSQLITE3_Statement& CSQLITE3_Statement::operator>>(int& i)
00225 {
00226     i = GetInt(0);
00227     return *this;
00228 }
00229 
00230 void CSQLITE3_Statement::Bind(int col_or_id, const void* data, size_t size)
00231 {
00232     _ASSERT(m_Stmt);
00233     int ret = 0;
00234     if ( (ret = sqlite3_bind_blob(m_Stmt, col_or_id,
00235                                   data, size, NULL)) != SQLITE_OK) {
00236         x_Throw(ret, "error binding blob");
00237     }
00238 }
00239 
00240 void CSQLITE3_Statement::Bind(int col_or_id, const string& val)
00241 {
00242     _ASSERT(m_Stmt);
00243     int ret = 0;
00244     if ( (ret = sqlite3_bind_text(m_Stmt, col_or_id,
00245                                   val.data(), val.size(), NULL)) != SQLITE_OK) {
00246         x_Throw(ret, "error binding string");
00247     }
00248 }
00249 
00250 void CSQLITE3_Statement::Bind(int col_or_id, int val)
00251 {
00252     _ASSERT(m_Stmt);
00253     int ret = 0;
00254     if ( (ret = sqlite3_bind_int(m_Stmt, col_or_id, val)) != SQLITE_OK) {
00255         x_Throw(ret, "error binding int");
00256     }
00257 }
00258 
00259 bool CSQLITE3_Statement::Execute()
00260 {
00261     size_t count = 0;
00262     for (;;  ++count) {
00263         int ret = 0;
00264         switch ( (ret = sqlite3_step(m_Stmt)) ) {
00265         case SQLITE_ROW:
00266             break;
00267 
00268         case SQLITE_DONE:
00269             return true;
00270 
00271         default:
00272             return false;
00273         }
00274     }
00275 
00276     return true;
00277 }
00278 
00279 int CSQLITE3_Statement::Step()
00280 {
00281     return sqlite3_step(m_Stmt);
00282 }
00283 
00284 void CSQLITE3_Statement::Reset()
00285 {
00286     sqlite3_reset(m_Stmt);
00287 #if (SQLITE_VERSION_NUMBER > 3005001)
00288     sqlite3_clear_bindings(m_Stmt);
00289 #endif
00290 }
00291 
00292 
00293 //////////////////////////////////////////////////////////////////////////////
00294 
00295 static IReader* GetBlobReader(CSQLITE3_Statement& stmt, int col)
00296 {
00297     /// local IReader implementation
00298     class CBlobReader : public IReader
00299     {
00300     public:
00301         CBlobReader(const unsigned char* buf, size_t size)
00302             : m_Pos(0)
00303         {
00304             m_Buf.resize(size);
00305             memcpy(&m_Buf[0], buf, size);
00306         }
00307 
00308         virtual ~CBlobReader()
00309         {
00310             /**
00311             _TRACE(NCBI_CURRENT_FUNCTION << ": read "
00312                    << m_Pos << "/" << m_Buf.size() << " bytes");
00313                    **/
00314         }
00315 
00316         ERW_Result Read(void*   buf,
00317                         size_t  count,
00318                         size_t* bytes_read = 0)
00319         {
00320             if ( !bytes_read ) {
00321                 return eRW_Success;
00322             }
00323 
00324             count = min(count, m_Buf.size() - m_Pos);
00325             *bytes_read = count;
00326             if (count) {
00327                 memcpy(buf, &m_Buf[m_Pos], count);
00328                 m_Pos += count;
00329                 return eRW_Success;
00330             } else {
00331                 return eRW_Eof;
00332             }
00333         }
00334 
00335         ERW_Result PendingCount(size_t* count)
00336         {
00337             *count = m_Buf.size() - m_Pos;
00338             return eRW_Success;
00339         }
00340 
00341     private:
00342         CSimpleBuffer m_Buf;
00343         size_t m_Pos;
00344     };
00345 
00346     ///
00347     /// retrieve our row and create a blob reader
00348     /// FIXME: use incremental I/O in the future
00349     ///
00350 
00351     CStopWatch sw;
00352     sw.Start();
00353 
00354     auto_ptr<IReader> reader;
00355     int size = sqlite3_column_bytes(stmt.GetStatement(), 0);
00356     const void* data = sqlite3_column_blob(stmt.GetStatement(), 0);
00357     if (data) {
00358         reader.reset(new CBlobReader((const unsigned char*)data, size));
00359         s_CacheStats.bytes_read.Add(size);
00360         s_CacheStats.objects_read.Add(1);
00361     }
00362     double e = sw.Elapsed();
00363     s_CacheStats.total_time.Add((int)(e * 1000));
00364     //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): read = " << sw.Elapsed() * 1000 << " msec");
00365 
00366     return reader.release();
00367 }
00368 
00369 
00370 ///////////////////////////////////////////////////////////////////////////////
00371 const char* CSQLITE3_ICacheException::GetErrCodeString(void) const
00372 {
00373     switch (GetErrCode())
00374     {
00375     case eUnknown:          return "eUnknown";
00376     case eInitError:        return "eInitError";
00377     case eNotImplemented:   return "eNotImplemented";
00378     default:                return  CException::GetErrCodeString();
00379     }
00380 }
00381 
00382 ///////////////////////////////////////////////////////////////////////////////
00383 
00384 CSQLITE3_Cache::CSQLITE3_Cache()
00385     : m_Timeout(7 * (24*60*60))
00386     , m_TimeStampFlag(kDefaultTimestampPolicy)
00387     , m_VersionFlag(eKeepAll)
00388     , m_DB(NULL)
00389 {
00390 //    m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue));
00391 //    m_WriterThread->Run();
00392 }
00393 
00394 CSQLITE3_Cache::~CSQLITE3_Cache()
00395 {
00396     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00397 
00398     // stop the write thread and queue
00399     if (!m_WriterThread.IsNull()) {
00400         m_WriterThread->Stop();
00401         m_WriterThread->Join();
00402     }
00403 
00404     if (m_DB) {
00405         
00406         m_Stmt_Store.release();
00407         m_Stmt_HasBlobs_key.release();
00408         m_Stmt_HasBlobs_key_subkey.release();
00409         m_Stmt_GetBlobAccess.release();
00410         m_Stmt_GetReadStream.release();
00411         m_Stmt_SetTimestamp.release();
00412 
00413         
00414         // close all pending statements
00415         size_t count = 0;
00416         sqlite3_stmt *stmt = NULL;
00417         while ( (stmt = sqlite3_next_stmt(m_DB, 0))!=0 ){
00418             sqlite3_finalize(stmt);
00419             ++count;
00420         }
00421         if (count) {
00422             LOG_POST(Warning << "CSQLITE3_Cache::~CSQLITE3_Cache(): flushed "
00423                      << count << " pending statements");
00424         } else {
00425             _TRACE("CSQLITE3_Cache::~CSQLITE3_Cache(): no pending statements");
00426         }
00427         
00428 
00429         // close the database
00430         int ret = sqlite3_close(m_DB);
00431         m_DB = NULL;
00432         if (ret != SQLITE_OK) {
00433             _ASSERT(ret != SQLITE_BUSY);
00434             LOG_POST(Error << "CSQLITE3_Cache::~CSQLITE3_Cache(): "
00435                      "error closing database '" << m_Database << "'");
00436         }
00437     }
00438 
00439     size_t items = s_CacheStats.objects_read.Get();
00440     size_t bytes = s_CacheStats.bytes_read.Get();
00441     size_t msec  = s_CacheStats.total_time.Get();
00442 
00443     LOG_POST(Info << "CSQLITE3_Cache::~CSQLITE3_Cache(): read "
00444         << items << " items / "
00445         << bytes << " bytes / "
00446         << msec << " msec / "
00447         << bytes / double(items) << " bytes/item / "
00448         << msec / double(items) << " msec/item / "
00449         );
00450 }
00451 
00452 bool CSQLITE3_Cache::Vacuum()
00453 {
00454     CSQLITE3_Statement stmt(m_DB, "VACUUM");
00455     if (!stmt.Execute() ) {
00456         LOG_POST(Warning << "Failed to vacuum the sqllite3 database:" << m_Database);
00457         return false;
00458     }
00459     return true;
00460 }
00461 
00462 
00463 void CSQLITE3_Cache::Open(const string& database, bool remove)
00464 {
00465     CStopWatch sw;
00466     sw.Start();
00467 
00468     LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << database << ")");
00469 
00470 
00471 
00472     //
00473     // open the database
00474     m_Database = database;
00475 
00476     if (m_DB) {
00477         /*int ret = */sqlite3_close(m_DB);
00478         m_DB = 0;
00479     }
00480 
00481     if (remove) {
00482         CDirEntry de(m_Database);
00483         de.Remove();
00484     }
00485 
00486     // check if target dir is present
00487     //
00488     {
00489         string dir;
00490         CDirEntry::SplitPath(m_Database, &dir);
00491 
00492         CDir de(dir);
00493         if (!de.Exists()) {
00494             de.Create();
00495         }
00496     }
00497 
00498 
00499     #if (SQLITE_VERSION_NUMBER > 3005001)
00500         int ret = sqlite3_open_v2(m_Database.c_str(), &m_DB,
00501                                   SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
00502                                   NULL);
00503     #else
00504         int ret = sqlite3_open(m_Database.c_str(), &m_DB);
00505     #endif
00506     if (ret != SQLITE_OK) {
00507         string msg("error opening database '");
00508         msg += m_Database;
00509         msg += "': ";
00510         msg += sqlite3_errmsg(m_DB);
00511 
00512         try {
00513             CDirEntry de(m_Database);
00514             de.Remove();
00515         } 
00516         catch (std::exception& e) 
00517         {
00518             LOG_POST(Error << "Attempt to delete damaged DB failed: " << e.what());
00519         }
00520 
00521         NCBI_THROW(CSQLITE3_ICacheException, eInitError, msg);
00522     }
00523 
00524 
00525 
00526     //
00527     // standard db connection setup fior SQLite
00528     //
00529     if ( !CSQLITE3_Statement(m_DB, "PRAGMA journal_mode = OFF").Execute() ) {
00530         NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00531                    "failed to set journaling mode");
00532     }
00533 
00534     if ( !CSQLITE3_Statement(m_DB, "PRAGMA temp_store = MEMORY").Execute() ) {
00535         NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00536                    "failed to set temp store");
00537     }
00538 
00539     if ( !CSQLITE3_Statement(m_DB, "PRAGMA page_size = 32768").Execute() ) {
00540         NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00541                    "failed to set page_size");
00542     }
00543 
00544     // synchronous = OFF gives major speed difference when we update timestamps
00545     //
00546     if ( !CSQLITE3_Statement(m_DB, "PRAGMA synchronous = OFF").Execute() ) {
00547         NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00548                    "failed to set synchronous mode");
00549     }
00550     if ( !CSQLITE3_Statement(m_DB, "PRAGMA count_changes = OFF").Execute() ) {
00551         NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00552                    "failed to disable count chnages mode");
00553     }
00554 
00555 
00556     //
00557     // we now have an open connection
00558     // we can try to purge some data
00559     //
00560     if (CSQLITE3_Statement(m_DB, "PRAGMA table_info(CacheBlobs)").Step() == SQLITE_ROW) {
00561     } else {
00562         // create our table
00563         CSQLITE3_Statement stmt
00564             (m_DB,
00565              "CREATE TABLE CacheBlobs " \
00566              "(" \
00567              "   key       varchar(256) NOT NULL, " \
00568              "   version   int          NOT NULL, " \
00569              "   subkey    varchar(256) NOT NULL, " \
00570              "" \
00571              "   timestamp int          NOT NULL, " \
00572              "   data      blob         NULL " \
00573              ")");
00574         if ( !stmt.Execute() ) {
00575             NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00576                        "failed to initialize cache");
00577         }
00578     }
00579 
00580     // check our indices as well
00581     // some earlier versions were created without indices
00582     if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_pk)").Step() != SQLITE_ROW) {
00583         CSQLITE3_Statement stmt
00584             (m_DB,
00585             "CREATE UNIQUE INDEX CacheBlobs_pk ON CacheBlobs(key, version, subkey)");
00586         if ( !stmt.Execute() ) {
00587             NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00588                        "failed to initialize cache: failed to create PK index");
00589         }
00590     }
00591 
00592     if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_timestamp)").Step() != SQLITE_ROW) {
00593         CSQLITE3_Statement stmt
00594             (m_DB,
00595             "CREATE INDEX CacheBlobs_timestamp ON CacheBlobs(timestamp)");
00596         if ( !stmt.Execute() ) {
00597             NCBI_THROW(CSQLITE3_ICacheException, eInitError,
00598                 "failed to initialize cache: failed to create timestamp index");
00599         }
00600     }
00601 
00602     // start background writer (if it is not started yet)
00603     if (m_WriterThread.IsNull()) {
00604         m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue));
00605         m_WriterThread->Run();
00606     }
00607 
00608     LOG_POST(Info << "CSQLITE3_Cache::Open(): " << sw.Elapsed() << " seconds");
00609 }
00610 
00611 
00612 bool CSQLITE3_Cache::IsOpen() const
00613 {
00614     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00615     return (m_DB != NULL);
00616 }
00617 
00618 
00619 string CSQLITE3_Cache::GetCacheName() const
00620 {
00621     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00622     return m_Database;
00623 }
00624 
00625 
00626 void CSQLITE3_Cache::SetTimeStampPolicy(TTimeStampFlags policy,
00627                                         unsigned int    timeout,
00628                                         unsigned int    max_timeout)
00629 {
00630     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00631     CMutexGuard guard(m_Mutex);
00632     if (policy) {
00633         m_TimeStampFlag = policy;
00634     } else {
00635         m_TimeStampFlag = kDefaultTimestampPolicy;
00636     }
00637 
00638     m_Timeout = timeout;
00639 }
00640 
00641 
00642 CSQLITE3_Cache::TTimeStampFlags CSQLITE3_Cache::GetTimeStampPolicy() const
00643 {
00644     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00645     return m_TimeStampFlag;
00646 }
00647 
00648 
00649 int CSQLITE3_Cache::GetTimeout() const
00650 {
00651     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00652     return m_Timeout;
00653 }
00654 
00655 
00656 void CSQLITE3_Cache::SetVersionRetention(EKeepVersions policy)
00657 {
00658     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00659     m_VersionFlag = policy;
00660 }
00661 
00662 
00663 CSQLITE3_Cache::EKeepVersions CSQLITE3_Cache::GetVersionRetention() const
00664 {
00665     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00666     return m_VersionFlag;
00667 }
00668 
00669 
00670 void CSQLITE3_Cache::x_SetTimestamp(const string& key,
00671                                     int           version,
00672                                     const string& subkey)
00673 {
00674     CMutexGuard LOCK(m_Mutex);
00675     time_t time = CTime(CTime::eCurrent).GetTimeT();
00676 
00677     if ( !m_Stmt_SetTimestamp.get() ) {
00678         string sql =
00679             "UPDATE CacheBlobs SET timestamp = ?1 WHERE "
00680             "key = ?2 AND version = ?3 AND subkey = ?4";
00681         //s_MakeKeyCondition(key, version, subkey, &sql);
00682         m_Stmt_SetTimestamp.reset(new CSQLITE3_Statement(m_DB, sql));
00683     } else {
00684         m_Stmt_SetTimestamp->Reset();
00685     }
00686 
00687     //CSQLITE3_Statement stmt2(m_DB, sql);
00688     m_Stmt_SetTimestamp->Bind(1, int(time));
00689     m_Stmt_SetTimestamp->Bind(2, key);
00690     m_Stmt_SetTimestamp->Bind(3, version);
00691     m_Stmt_SetTimestamp->Bind(4, subkey);
00692     if ( !m_Stmt_SetTimestamp->Execute() ) {
00693         LOG_POST(Error << "failed to update timestamp on cache blob: "
00694             << "\"" << key << "\", " << version << ", \""
00695             << subkey << "\": " << sqlite3_errmsg(m_DB));
00696     } else {
00697         /**
00698         LOG_POST(Info
00699             << "\"" << key << "\", " << version << ", \""
00700             << subkey << "\": timestamp=" << time);
00701             **/
00702     }
00703 }
00704 
00705 void CSQLITE3_Cache::Store(const string&  key,
00706                            int            version,
00707                            const string&  subkey,
00708                            const void*    data,
00709                            size_t         size,
00710                            unsigned int   time_to_live,
00711                            const string&  owner)
00712 {
00713     //
00714     // prepare our write reque    
00715     CRef<SWriteRequest> req(new SWriteRequest);
00716     req->key = key;
00717     req->version = version;
00718     req->subkey = subkey;
00719     req->buffer.resize(size);
00720     memcpy(&req->buffer[0], data, size);
00721 
00722     // push to our queue
00723     m_WriteQueue.Push(req);
00724 }
00725 
00726 
00727 
00728 void CSQLITE3_Cache::StoreSynchronous(const string&  key,
00729                                       int            version,
00730                                       const string&  subkey,
00731                                       const void*    data,
00732                                       size_t         size)
00733 {
00734     CMutexGuard LOCK(m_Mutex);
00735 
00736     /**
00737     LOG_POST(Info << NCBI_CURRENT_FUNCTION
00738         << "(\"" << key << "\", " << version << ", \""
00739         << subkey << "\", " << data << ", " << size << "): thread="
00740         << CThread::GetSelf());
00741         **/
00742     _ASSERT(m_DB);
00743 
00744     if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) {
00745         Purge(key, subkey, 0, m_VersionFlag);
00746     }
00747 
00748     string sql;
00749     int ret = 0;
00750     time_t timestamp = CTime(CTime::eCurrent).GetTimeT();
00751 
00752     //
00753     // insert the row into cache data
00754     // we scan first to see if the row already exists
00755     //
00756     if ( !m_Stmt_Store.get() ) {
00757         m_Stmt_Store.reset(new CSQLITE3_Statement(m_DB,
00758             "INSERT OR REPLACE INTO CacheBlobs (key, version, subkey, timestamp, data) "
00759             "VALUES( ?1, ?2, ?3, ?4, ?5 )"));
00760     }
00761 
00762     m_Stmt_Store->Reset();
00763     m_Stmt_Store->Bind(1, key);
00764     m_Stmt_Store->Bind(2, version);
00765     m_Stmt_Store->Bind(3, subkey);
00766     m_Stmt_Store->Bind(4, int(timestamp));
00767     m_Stmt_Store->Bind(5, data, size);
00768 
00769      // execute...
00770      if ( (ret = m_Stmt_Store->Step()) != SQLITE_DONE) {
00771          LOG_POST(Error << "failed to write " << size << " bytes: "
00772                   << sql << ": [" << ret << "] " << sqlite3_errmsg(m_DB));
00773      }
00774 }
00775 
00776 
00777 size_t CSQLITE3_Cache::GetSize(const string&  key,
00778                                int            version,
00779                                const string&  subkey)
00780 {
00781     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00782 
00783     string sql = "SELECT data FROM CacheBlobs WHERE ";
00784     s_MakeKeyCondition(key, version, subkey, &sql);
00785     CSQLITE3_Statement stmt(m_DB, sql);
00786     if (stmt.Step() == SQLITE_ROW) {
00787         return sqlite3_column_bytes(stmt.GetStatement(), 0);
00788     }
00789 
00790     return 0;
00791 }
00792 
00793 
00794 bool CSQLITE3_Cache::Read(const string& key,
00795                           int           version,
00796                           const string& subkey,
00797                           void*         buf,
00798                           size_t        buf_size)
00799 {
00800     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00801 
00802     string sql = "SELECT data FROM CacheBlobs WHERE ";
00803     s_MakeKeyCondition(key, version, subkey, &sql);
00804 
00805     CSQLITE3_Statement stmt(m_DB, sql);
00806     if (stmt.Step() == SQLITE_ROW) {
00807         size_t size = sqlite3_column_bytes(stmt.GetStatement(), 0);
00808         size = min(size, buf_size);
00809         memcpy(buf,
00810                sqlite3_column_blob(stmt.GetStatement(), 0),
00811                size);
00812 
00813         /// set timestamp
00814         if (m_TimeStampFlag & fTimeStampOnRead) {
00815             x_SetTimestamp(key, version, subkey);
00816         }
00817         return true;
00818     }
00819 
00820     return false;
00821 }
00822 
00823 
00824 IReader* CSQLITE3_Cache::GetReadStream(const string&  key,
00825                                        int            version,
00826                                        const string&  subkey)
00827 {
00828     CMutexGuard LOCK(m_Mutex);
00829     //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ")");
00830 
00831     //
00832     // retrieve our row and create a blob reader
00833     // FIXME: use incremental I/O in the future
00834     //
00835     if ( !m_Stmt_GetReadStream.get() ) {
00836         string sql =
00837             "SELECT data FROM CacheBlobs WHERE "
00838             "key = ?1 AND version = ?2 AND subkey = ?3"
00839             ;
00840         m_Stmt_GetReadStream.reset(new CSQLITE3_Statement(m_DB, sql));
00841     } else {
00842         m_Stmt_GetReadStream->Reset();
00843     }
00844     //s_MakeKeyCondition(key, version, subkey, &sql);
00845     //CSQLITE3_Statement stmt(m_DB, sql);
00846     m_Stmt_GetReadStream->Bind(1, key);
00847     m_Stmt_GetReadStream->Bind(2, version);
00848     m_Stmt_GetReadStream->Bind(3, subkey);
00849     if (m_Stmt_GetReadStream->Step() == SQLITE_ROW) {
00850         auto_ptr<IReader> reader(GetBlobReader(*m_Stmt_GetReadStream, 0));
00851 
00852         /// set timestamp
00853         if (m_TimeStampFlag & fTimeStampOnRead) {
00854             x_SetTimestamp(key, version, subkey);
00855         }
00856         return reader.release();
00857     }
00858 
00859     return NULL;
00860 }
00861 
00862 IReader* CSQLITE3_Cache::GetReadStream(
00863     const string&  /*key*/,
00864     const string&  /*subkey*/,
00865     int*           /*version*/,
00866     EBlobValidity* /*validity*/)
00867 {
00868     // ICache last valid version protocol is not implemented in GBench
00869     // (this is ok it is optimization only important for network cache)
00870     NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented,
00871         "CSQLITE3_Cache::GetReadStream(key, subkey, version, validity) "
00872         "is not implemented");
00873 }
00874 
00875 void CSQLITE3_Cache::SetBlobVersionAsValid(const string&  /* key */,
00876                                            const string&  /* subkey */,
00877                                            int            /* version */)
00878 {
00879     // ICache last valid version protocol is not implemented in GBench
00880     // (this is ok it is optimization only important for network cache)
00881     NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented,
00882         "CSQLITE3_Cache::SetBlobVersionAsValid(key, subkey, version) "
00883         "is not implemented");
00884 }
00885 
00886 
00887 IWriter* CSQLITE3_Cache::GetWriteStream(const string& key,
00888                                         int           version,
00889                                         const string& subkey,
00890                                         unsigned int  time_to_live,
00891                                         const string& owner)
00892 {
00893     //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ", " << time_to_live << ", " << owner << ")");
00894 
00895     class CBlobWriter : public IWriter
00896     {
00897     public:
00898         CBlobWriter(CSQLITE3_Cache& cache,
00899                     const string&   key,
00900                     int             version,
00901                     const string&   subkey)
00902             : m_Cache(cache)
00903             , m_Key(key)
00904             , m_Version(version)
00905             , m_Subkey(subkey)
00906             , m_Flushed(false)
00907         {
00908         }
00909 
00910         ~CBlobWriter()
00911         {
00912             if ( !m_Flushed ) {
00913                 Flush();
00914             }
00915         }
00916 
00917         ERW_Result Write(const void* buf,
00918                          size_t      count,
00919                          size_t*     bytes_written = 0)
00920         {
00921             if ( !m_Flushed ) {
00922                 m_Data.insert(m_Data.end(),
00923                               (const unsigned char*)(buf),
00924                               (const unsigned char*)(buf) + count);
00925                 if (bytes_written) {
00926                     *bytes_written = count;
00927                 }
00928                 return eRW_Success;
00929             } else {
00930                 return eRW_Error;
00931             }
00932         }
00933 
00934         ERW_Result Flush()
00935         {
00936             if (m_Data.size()) {
00937                 m_Cache.Store(m_Key, m_Version, m_Subkey,
00938                               &m_Data[0], m_Data.size());
00939                 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): wrote " << m_Data.size() << " bytes");
00940             }
00941             m_Flushed = true;
00942             return eRW_Success;
00943         }
00944 
00945     private:
00946         CSQLITE3_Cache& m_Cache;
00947         string m_Key;
00948         int m_Version;
00949         string m_Subkey;
00950         vector<unsigned char> m_Data;
00951         bool m_Flushed;
00952     };
00953 
00954     return new CBlobWriter(*this, key, version, subkey);
00955 }
00956 
00957 
00958 void CSQLITE3_Cache::Remove(const string& key)
00959 {
00960     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00961 
00962     // create transaction
00963     string sql = "DELETE FROM CacheBlobs WHERE key = '";
00964     sql += key;
00965     sql += "'";
00966     CSQLITE3_Statement stmt(m_DB, sql);
00967     stmt.Execute();
00968 }
00969 
00970 
00971 void CSQLITE3_Cache::Remove(const string&    key,
00972                             int              version,
00973                             const string&    subkey)
00974 {
00975     //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00976 
00977     /// create transaction
00978     string sql = "DELETE FROM CacheBlobs WHERE ";
00979     s_MakeKeyCondition(key, version, subkey, &sql);
00980     CSQLITE3_Statement stmt(m_DB, sql);
00981     stmt.Execute();
00982 }
00983 
00984 
00985 time_t CSQLITE3_Cache::GetAccessTime(const string&  key,
00986                                      int            version,
00987                                      const string&  subkey)
00988 {
00989     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
00990 
00991     string sql = "SELECT timestamp FROM CacheBlobs WHERE ";
00992     s_MakeKeyCondition(key, version, subkey, &sql);
00993     CSQLITE3_Statement stmt(m_DB, sql);
00994     if (stmt.Step() == SQLITE_ROW) {
00995         return stmt.GetInt(0);
00996     }
00997 
00998     return 0;
00999 }
01000 
01001 
01002 void CSQLITE3_Cache::Purge(time_t           access_timeout,
01003                            EKeepVersions    keep_last_version)
01004 {
01005     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
01006 
01007     if (keep_last_version == eDropAll && access_timeout == 0) {
01008         //x_TruncateDB();
01009         return;
01010     }
01011 
01012     CTime time_stamp(CTime::eCurrent);
01013     time_t curr = time_stamp.GetTimeT();
01014     int timeout = GetTimeout();
01015     curr -= timeout;
01016 
01017     string sql = "DELETE FROM CacheBlobs WHERE timestamp < ?1";
01018     CSQLITE3_Statement stmt(m_DB, sql);
01019     stmt.Bind(1, int(curr));
01020     if (stmt.Step() == SQLITE_DONE) {
01021         int count = sqlite3_changes(m_DB);
01022         LOG_POST(Info << "CSQLITE3_Cache::Purge(): "
01023             << count << " items purged");
01024     }
01025 }
01026 
01027 
01028 void CSQLITE3_Cache::Purge(const string&    key,
01029                            const string&    subkey,
01030                            time_t           access_timeout,
01031                            EKeepVersions    keep_last_version)
01032 {
01033     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
01034 
01035     if (keep_last_version == eDropAll && access_timeout == 0) {
01036         //x_TruncateDB();
01037         return;
01038     }
01039 
01040     CTime time_stamp(CTime::eCurrent);
01041     time_t curr = time_stamp.GetTimeT();
01042     int timeout = GetTimeout();
01043     curr -= timeout;
01044 
01045     string sql =
01046         "DELETE FROM CacheBlobs WHERE "
01047         " timestamp < ?1 ";
01048 
01049     if (!key.empty()) {
01050         sql += " AND key = '";
01051         sql += key;
01052         sql += "'";
01053     }
01054 
01055     if (!subkey.empty()) {
01056         sql += " AND subkey = '";
01057         sql += subkey;
01058         sql += "'";
01059     }
01060     CSQLITE3_Statement stmt(m_DB, sql);
01061     stmt.Bind(1, int(curr));
01062     if (stmt.Step() == SQLITE_DONE) {
01063         int count = sqlite3_changes(m_DB);
01064         LOG_POST(Info << "CSQLITE3_Cache::Purge(): "
01065             << count << " items purged");
01066     }
01067 }
01068 
01069 
01070 bool CSQLITE3_Cache::SameCacheParams(const TCacheParams* params) const
01071 {
01072     if ( !params ) {
01073         return false;
01074     }
01075     const TCacheParams* driver = params->FindNode("driver");
01076     if (!driver  ||  driver->GetValue().value != kSQLITE3_BlobCacheDriverName) {
01077         return false;
01078     }
01079     const TCacheParams* driver_params =
01080         params->FindNode(kSQLITE3_BlobCacheDriverName);
01081     if ( !driver_params ) {
01082         return false;
01083     }
01084     const TCacheParams* path = driver_params->FindNode("database");
01085     if (!path) {
01086         return false;
01087     }
01088     const string& database = path->GetValue().value;
01089 
01090     string base1, base2;
01091     CDirEntry::SplitPath(database, 0, &base1, 0);
01092     CDirEntry::SplitPath(m_Database, 0, &base2, 0);
01093 
01094     if (base1 == base2) 
01095         return true;
01096     return false;
01097 }
01098 
01099 
01100 bool CSQLITE3_Cache::HasBlobs(const string&  key,
01101                               const string&  subkey)
01102 {
01103     CMutexGuard LOCK(m_Mutex);
01104     //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(\"" << key << "\", \"" << subkey << "\")");
01105 
01106     CSQLITE3_Statement* stmt = NULL;
01107     if ( !m_Stmt_HasBlobs_key.get() ) {
01108         m_Stmt_HasBlobs_key.reset
01109             (new CSQLITE3_Statement(m_DB,
01110                                     "SELECT timestamp FROM CacheBlobs WHERE "
01111                                     "key = ?1"));
01112     }
01113 
01114     if ( !m_Stmt_HasBlobs_key_subkey.get() ) {
01115         m_Stmt_HasBlobs_key_subkey.reset
01116             (new CSQLITE3_Statement(m_DB,
01117                                     "SELECT timestamp FROM CacheBlobs WHERE "
01118                                     "key = ?1 AND subkey = ?2"));
01119     }
01120 
01121     if (subkey.empty()) {
01122         stmt = &*m_Stmt_HasBlobs_key;
01123     } else {
01124         stmt = &*m_Stmt_HasBlobs_key_subkey;
01125     }
01126 
01127     stmt->Reset();
01128     stmt->Bind(1, key);
01129     if ( !subkey.empty() ) {
01130         stmt->Bind(2, subkey);
01131     }
01132     return (stmt->Step() == SQLITE_ROW);
01133 }
01134 
01135 
01136 void CSQLITE3_Cache::GetBlobAccess(const string&     key,
01137                                    int               version,
01138                                    const string&     subkey,
01139                                    SBlobAccessDescr*  blob_descr)
01140 {
01141     CMutexGuard LOCK(m_Mutex);
01142     /**
01143     LOG_POST(Info << NCBI_CURRENT_FUNCTION
01144              << "(\"" << key << "\", " << version << ", \"" << subkey << "\"): thread="
01145              << CThread::GetSelf());
01146              **/
01147 
01148     blob_descr->reader.reset();
01149     blob_descr->blob_size = 0;
01150     blob_descr->blob_found = false;
01151 
01152     string sql;
01153     time_t time    = CTime(CTime::eCurrent).GetTimeT();
01154     time_t timeout = time - GetTimeout();
01155 
01156     // check to see if the row exists in the cache attributes
01157     // if so, we intend on updating the timestamp to be now
01158     int this_timestamp = 0;
01159 
01160     // we have a blob
01161     if ( !m_Stmt_GetBlobAccess.get() ) {
01162         sql =
01163             "SELECT timestamp, data FROM CacheBlobs WHERE "
01164             "key = ?1 AND version = ?2 AND subkey = ?3";
01165         //s_MakeKeyCondition(key, version, subkey, &sql);
01166         m_Stmt_GetBlobAccess.reset(new CSQLITE3_Statement(m_DB, sql));
01167     } else {
01168         m_Stmt_GetBlobAccess->Reset();
01169     }
01170 
01171     m_Stmt_GetBlobAccess->Bind(1, key);
01172     m_Stmt_GetBlobAccess->Bind(2, version);
01173     m_Stmt_GetBlobAccess->Bind(3, subkey);
01174     if (m_Stmt_GetBlobAccess->Step() == SQLITE_ROW) {
01175         // read and process our timestamp
01176         this_timestamp = m_Stmt_GetBlobAccess->GetInt(0);
01177         if (this_timestamp < timeout) {
01178             Remove(key, version, subkey);
01179         } else {
01180             // read and process our blob
01181             blob_descr->blob_size =
01182                 sqlite3_column_bytes(m_Stmt_GetBlobAccess->GetStatement(), 1);
01183             blob_descr->blob_found = true;
01184 
01185             if (blob_descr->buf  &&
01186                 blob_descr->buf_size >= blob_descr->blob_size) {
01187                 memcpy(blob_descr->buf,
01188                        sqlite3_column_blob(m_Stmt_GetBlobAccess->GetStatement(), 1),
01189                        blob_descr->blob_size);
01190                 //LOG_POST(Info << "  direct copy: " << blob_descr->blob_size << " bytes");
01191             } else {
01192                 blob_descr->reader.reset(GetBlobReader(*m_Stmt_GetBlobAccess, 1));
01193                 /**
01194                 LOG_POST(Info << "  stream read: " << blob_descr->blob_size
01195                          << " bytes / " << blob_descr->buf_size << " bytes in buffer");
01196                          **/
01197             }
01198         }
01199 
01200         // set timestamp
01201         if (m_TimeStampFlag & fTimeStampOnRead) {
01202             x_SetTimestamp(key, version, subkey);
01203         }
01204     }
01205 }
01206 
01207 
01208 /// @name unimplemented
01209 /// @{
01210 void CSQLITE3_Cache::GetBlobOwner(const string&  key,
01211                                   int            version,
01212                                   const string&  subkey,
01213                                   string*        owner)
01214 {
01215     LOG_POST(Info << NCBI_CURRENT_FUNCTION);
01216     _ASSERT(owner);
01217     owner->erase(); // not supported in this implementation
01218 }
01219 
01220 void CSQLITE3_Cache::SetMemBufferSize(unsigned int buf_size)
01221 {
01222     /// noop
01223 }
01224 
01225 
01226 /// @}
01227 
01228 END_NCBI_SCOPE
Modified on Wed May 23 13:10:38 2012 by modify_doxy.py rev. 337098