|
NCBI C++ ToolKit
|
00001 /* $Id: sqlite_cache.cpp 24283 2011-09-01 17:38:45Z kuznets $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Author: Mike DiCuccio 00027 * 00028 * File Description: SQLITE3 based ICache interface 00029 * 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 #include <corelib/ncbistr.hpp> 00034 #include <corelib/ncbimtx.hpp> 00035 #include <corelib/ncbitime.hpp> 00036 #include <corelib/ncbifile.hpp> 00037 #include <util/simple_buffer.hpp> 00038 00039 #include <gui/cache/sqlite_cache.hpp> 00040 00041 #include <sqlite3.h> 00042 00043 00044 BEGIN_NCBI_SCOPE 00045 00046 00047 ///////////////////////////////////////////////////////////////////////////// 00048 00049 CSQLITE3_Cache::CWriterThread::CWriterThread(CSQLITE3_Cache& cache, 00050 CSQLITE3_Cache::TWriteQueue& write_q) 00051 : m_StopRequest(new SWriteRequest), 00052 m_Cache(cache), 00053 m_WriteQueue(write_q) 00054 { 00055 } 00056 00057 void CSQLITE3_Cache::CWriterThread::Stop() 00058 { 00059 LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: shutting down writer thread..."); 00060 m_WriteQueue.Push(m_StopRequest); 00061 } 00062 00063 00064 void* CSQLITE3_Cache::CWriterThread::Main() 00065 { 00066 m_Cache.Purge(m_Cache.GetTimeout()); 00067 00068 bool vac_ok = m_Cache.Vacuum(); 00069 if (!vac_ok) { 00070 // re-open db with file removal 00071 LOG_POST("SQLLite Vacuum failed use recovery procedure"); 00072 m_Cache.Open(m_Cache.GetDatabase(), true); 00073 } 00074 00075 00076 bool done = false; 00077 while ( !done ) { 00078 CRef<SWriteRequest> req; 00079 try { 00080 req = m_WriteQueue.Pop(); 00081 } 00082 catch (CException& ex) { 00083 LOG_POST(Error << ex); 00084 } 00085 00086 if ( !req ) { 00087 continue; 00088 } 00089 00090 if (m_StopRequest == req) { 00091 done = true; 00092 continue; 00093 } 00094 00095 // 00096 // write it! 00097 // 00098 m_Cache.StoreSynchronous(req->key, req->version, req->subkey, 00099 &req->buffer[0], req->buffer.size()); 00100 /* 00101 #ifdef _DEBUG 00102 size_t size = m_WriteQueue.GetSize(); 00103 if (size > 0) { 00104 _TRACE("CSQLITE3_Cache::CWriterThread::Main(): " 00105 << size << " requests pending..."); 00106 } 00107 #endif 00108 */ 00109 } 00110 LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: writer thread stopped"); 00111 return NULL; 00112 } 00113 00114 00115 ///////////////////////////////////////////////////////////////////////////// 00116 00117 struct SStats { 00118 CAtomicCounter bytes_read; 00119 CAtomicCounter objects_read; 00120 CAtomicCounter total_time; 00121 }; 00122 00123 static SStats s_CacheStats; 00124 00125 /// Add BLOB key specific where condition 00126 static 00127 void s_MakeKeyCondition(const string& key, 00128 int version, 00129 const string& subkey, 00130 string* out_str) 00131 { 00132 *out_str += " key = "; 00133 *out_str += "'" + key + "'"; 00134 *out_str += " AND version = "; 00135 *out_str += NStr::IntToString(version); 00136 *out_str += " AND subkey = "; 00137 *out_str += "'" + subkey + "'"; 00138 } 00139 00140 /////////////////////////////////////////////////////////////////////////////// 00141 00142 class CSQLITE3_Statement 00143 { 00144 public: 00145 /// compile a statement 00146 CSQLITE3_Statement(sqlite3* db, const string& sql); 00147 ~CSQLITE3_Statement(); 00148 00149 sqlite3_stmt* GetStatement(); 00150 00151 void Bind(int col_or_id, const void* data, size_t size); 00152 void Bind(int col_or_id, const string& val); 00153 void Bind(int col_or_id, int val); 00154 00155 bool Execute(); 00156 void Reset(); 00157 int Step(); 00158 00159 /// return / extract an integer value for a column 00160 int GetInt(int col); 00161 CSQLITE3_Statement& operator>> (int& i); 00162 00163 private: 00164 sqlite3* m_DB; 00165 sqlite3_stmt* m_Stmt; 00166 string m_Sql; 00167 00168 void x_Log(int ret, const string& msg) 00169 { 00170 LOG_POST(Error << msg << ": [" << ret << "] " 00171 << sqlite3_errmsg(m_DB)); 00172 } 00173 00174 void x_Throw(int ret, const string& msg) 00175 { 00176 CNcbiOstrstream ostr; 00177 ostr << msg << ": [" << ret << "] " 00178 << sqlite3_errmsg(m_DB); 00179 string s = string(CNcbiOstrstreamToString(ostr)); 00180 NCBI_THROW(CSQLITE3_ICacheException, eUnknown, s); 00181 } 00182 00183 private: 00184 CSQLITE3_Statement(const CSQLITE3_Statement&); 00185 CSQLITE3_Statement& operator=(const CSQLITE3_Statement&); 00186 }; 00187 00188 00189 CSQLITE3_Statement::CSQLITE3_Statement(sqlite3* db, const string& sql) 00190 : m_DB(db), m_Stmt(NULL), m_Sql(sql) 00191 { 00192 _TRACE("sql: " << sql); 00193 int ret = 0; 00194 #if (SQLITE_VERSION_NUMBER > 3005001) 00195 if ( (ret = sqlite3_prepare_v2(m_DB, sql.c_str(), -1, 00196 &m_Stmt, NULL)) != SQLITE_OK) { 00197 #else 00198 if ( (ret = sqlite3_prepare(m_DB, sql.c_str(), -1, 00199 &m_Stmt, NULL)) != SQLITE_OK) { 00200 #endif 00201 m_Stmt = NULL; 00202 x_Throw(ret, "error preparing statement for \"" + sql + "\""); 00203 } 00204 //_TRACE("exec sql: " << sql); 00205 } 00206 00207 CSQLITE3_Statement::~CSQLITE3_Statement() 00208 { 00209 if (m_Stmt) { 00210 sqlite3_finalize(m_Stmt); 00211 } 00212 } 00213 00214 sqlite3_stmt* CSQLITE3_Statement::GetStatement() 00215 { 00216 return m_Stmt; 00217 } 00218 00219 int CSQLITE3_Statement::GetInt(int col) 00220 { 00221 return sqlite3_column_int(m_Stmt, col); 00222 } 00223 00224 CSQLITE3_Statement& CSQLITE3_Statement::operator>>(int& i) 00225 { 00226 i = GetInt(0); 00227 return *this; 00228 } 00229 00230 void CSQLITE3_Statement::Bind(int col_or_id, const void* data, size_t size) 00231 { 00232 _ASSERT(m_Stmt); 00233 int ret = 0; 00234 if ( (ret = sqlite3_bind_blob(m_Stmt, col_or_id, 00235 data, size, NULL)) != SQLITE_OK) { 00236 x_Throw(ret, "error binding blob"); 00237 } 00238 } 00239 00240 void CSQLITE3_Statement::Bind(int col_or_id, const string& val) 00241 { 00242 _ASSERT(m_Stmt); 00243 int ret = 0; 00244 if ( (ret = sqlite3_bind_text(m_Stmt, col_or_id, 00245 val.data(), val.size(), NULL)) != SQLITE_OK) { 00246 x_Throw(ret, "error binding string"); 00247 } 00248 } 00249 00250 void CSQLITE3_Statement::Bind(int col_or_id, int val) 00251 { 00252 _ASSERT(m_Stmt); 00253 int ret = 0; 00254 if ( (ret = sqlite3_bind_int(m_Stmt, col_or_id, val)) != SQLITE_OK) { 00255 x_Throw(ret, "error binding int"); 00256 } 00257 } 00258 00259 bool CSQLITE3_Statement::Execute() 00260 { 00261 size_t count = 0; 00262 for (;; ++count) { 00263 int ret = 0; 00264 switch ( (ret = sqlite3_step(m_Stmt)) ) { 00265 case SQLITE_ROW: 00266 break; 00267 00268 case SQLITE_DONE: 00269 return true; 00270 00271 default: 00272 return false; 00273 } 00274 } 00275 00276 return true; 00277 } 00278 00279 int CSQLITE3_Statement::Step() 00280 { 00281 return sqlite3_step(m_Stmt); 00282 } 00283 00284 void CSQLITE3_Statement::Reset() 00285 { 00286 sqlite3_reset(m_Stmt); 00287 #if (SQLITE_VERSION_NUMBER > 3005001) 00288 sqlite3_clear_bindings(m_Stmt); 00289 #endif 00290 } 00291 00292 00293 ////////////////////////////////////////////////////////////////////////////// 00294 00295 static IReader* GetBlobReader(CSQLITE3_Statement& stmt, int col) 00296 { 00297 /// local IReader implementation 00298 class CBlobReader : public IReader 00299 { 00300 public: 00301 CBlobReader(const unsigned char* buf, size_t size) 00302 : m_Pos(0) 00303 { 00304 m_Buf.resize(size); 00305 memcpy(&m_Buf[0], buf, size); 00306 } 00307 00308 virtual ~CBlobReader() 00309 { 00310 /** 00311 _TRACE(NCBI_CURRENT_FUNCTION << ": read " 00312 << m_Pos << "/" << m_Buf.size() << " bytes"); 00313 **/ 00314 } 00315 00316 ERW_Result Read(void* buf, 00317 size_t count, 00318 size_t* bytes_read = 0) 00319 { 00320 if ( !bytes_read ) { 00321 return eRW_Success; 00322 } 00323 00324 count = min(count, m_Buf.size() - m_Pos); 00325 *bytes_read = count; 00326 if (count) { 00327 memcpy(buf, &m_Buf[m_Pos], count); 00328 m_Pos += count; 00329 return eRW_Success; 00330 } else { 00331 return eRW_Eof; 00332 } 00333 } 00334 00335 ERW_Result PendingCount(size_t* count) 00336 { 00337 *count = m_Buf.size() - m_Pos; 00338 return eRW_Success; 00339 } 00340 00341 private: 00342 CSimpleBuffer m_Buf; 00343 size_t m_Pos; 00344 }; 00345 00346 /// 00347 /// retrieve our row and create a blob reader 00348 /// FIXME: use incremental I/O in the future 00349 /// 00350 00351 CStopWatch sw; 00352 sw.Start(); 00353 00354 auto_ptr<IReader> reader; 00355 int size = sqlite3_column_bytes(stmt.GetStatement(), 0); 00356 const void* data = sqlite3_column_blob(stmt.GetStatement(), 0); 00357 if (data) { 00358 reader.reset(new CBlobReader((const unsigned char*)data, size)); 00359 s_CacheStats.bytes_read.Add(size); 00360 s_CacheStats.objects_read.Add(1); 00361 } 00362 double e = sw.Elapsed(); 00363 s_CacheStats.total_time.Add((int)(e * 1000)); 00364 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): read = " << sw.Elapsed() * 1000 << " msec"); 00365 00366 return reader.release(); 00367 } 00368 00369 00370 /////////////////////////////////////////////////////////////////////////////// 00371 const char* CSQLITE3_ICacheException::GetErrCodeString(void) const 00372 { 00373 switch (GetErrCode()) 00374 { 00375 case eUnknown: return "eUnknown"; 00376 case eInitError: return "eInitError"; 00377 case eNotImplemented: return "eNotImplemented"; 00378 default: return CException::GetErrCodeString(); 00379 } 00380 } 00381 00382 /////////////////////////////////////////////////////////////////////////////// 00383 00384 CSQLITE3_Cache::CSQLITE3_Cache() 00385 : m_Timeout(7 * (24*60*60)) 00386 , m_TimeStampFlag(kDefaultTimestampPolicy) 00387 , m_VersionFlag(eKeepAll) 00388 , m_DB(NULL) 00389 { 00390 // m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue)); 00391 // m_WriterThread->Run(); 00392 } 00393 00394 CSQLITE3_Cache::~CSQLITE3_Cache() 00395 { 00396 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00397 00398 // stop the write thread and queue 00399 if (!m_WriterThread.IsNull()) { 00400 m_WriterThread->Stop(); 00401 m_WriterThread->Join(); 00402 } 00403 00404 if (m_DB) { 00405 00406 m_Stmt_Store.release(); 00407 m_Stmt_HasBlobs_key.release(); 00408 m_Stmt_HasBlobs_key_subkey.release(); 00409 m_Stmt_GetBlobAccess.release(); 00410 m_Stmt_GetReadStream.release(); 00411 m_Stmt_SetTimestamp.release(); 00412 00413 00414 // close all pending statements 00415 size_t count = 0; 00416 sqlite3_stmt *stmt = NULL; 00417 while ( (stmt = sqlite3_next_stmt(m_DB, 0))!=0 ){ 00418 sqlite3_finalize(stmt); 00419 ++count; 00420 } 00421 if (count) { 00422 LOG_POST(Warning << "CSQLITE3_Cache::~CSQLITE3_Cache(): flushed " 00423 << count << " pending statements"); 00424 } else { 00425 _TRACE("CSQLITE3_Cache::~CSQLITE3_Cache(): no pending statements"); 00426 } 00427 00428 00429 // close the database 00430 int ret = sqlite3_close(m_DB); 00431 m_DB = NULL; 00432 if (ret != SQLITE_OK) { 00433 _ASSERT(ret != SQLITE_BUSY); 00434 LOG_POST(Error << "CSQLITE3_Cache::~CSQLITE3_Cache(): " 00435 "error closing database '" << m_Database << "'"); 00436 } 00437 } 00438 00439 size_t items = s_CacheStats.objects_read.Get(); 00440 size_t bytes = s_CacheStats.bytes_read.Get(); 00441 size_t msec = s_CacheStats.total_time.Get(); 00442 00443 LOG_POST(Info << "CSQLITE3_Cache::~CSQLITE3_Cache(): read " 00444 << items << " items / " 00445 << bytes << " bytes / " 00446 << msec << " msec / " 00447 << bytes / double(items) << " bytes/item / " 00448 << msec / double(items) << " msec/item / " 00449 ); 00450 } 00451 00452 bool CSQLITE3_Cache::Vacuum() 00453 { 00454 CSQLITE3_Statement stmt(m_DB, "VACUUM"); 00455 if (!stmt.Execute() ) { 00456 LOG_POST(Warning << "Failed to vacuum the sqllite3 database:" << m_Database); 00457 return false; 00458 } 00459 return true; 00460 } 00461 00462 00463 void CSQLITE3_Cache::Open(const string& database, bool remove) 00464 { 00465 CStopWatch sw; 00466 sw.Start(); 00467 00468 LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << database << ")"); 00469 00470 00471 00472 // 00473 // open the database 00474 m_Database = database; 00475 00476 if (m_DB) { 00477 /*int ret = */sqlite3_close(m_DB); 00478 m_DB = 0; 00479 } 00480 00481 if (remove) { 00482 CDirEntry de(m_Database); 00483 de.Remove(); 00484 } 00485 00486 // check if target dir is present 00487 // 00488 { 00489 string dir; 00490 CDirEntry::SplitPath(m_Database, &dir); 00491 00492 CDir de(dir); 00493 if (!de.Exists()) { 00494 de.Create(); 00495 } 00496 } 00497 00498 00499 #if (SQLITE_VERSION_NUMBER > 3005001) 00500 int ret = sqlite3_open_v2(m_Database.c_str(), &m_DB, 00501 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 00502 NULL); 00503 #else 00504 int ret = sqlite3_open(m_Database.c_str(), &m_DB); 00505 #endif 00506 if (ret != SQLITE_OK) { 00507 string msg("error opening database '"); 00508 msg += m_Database; 00509 msg += "': "; 00510 msg += sqlite3_errmsg(m_DB); 00511 00512 try { 00513 CDirEntry de(m_Database); 00514 de.Remove(); 00515 } 00516 catch (std::exception& e) 00517 { 00518 LOG_POST(Error << "Attempt to delete damaged DB failed: " << e.what()); 00519 } 00520 00521 NCBI_THROW(CSQLITE3_ICacheException, eInitError, msg); 00522 } 00523 00524 00525 00526 // 00527 // standard db connection setup fior SQLite 00528 // 00529 if ( !CSQLITE3_Statement(m_DB, "PRAGMA journal_mode = OFF").Execute() ) { 00530 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00531 "failed to set journaling mode"); 00532 } 00533 00534 if ( !CSQLITE3_Statement(m_DB, "PRAGMA temp_store = MEMORY").Execute() ) { 00535 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00536 "failed to set temp store"); 00537 } 00538 00539 if ( !CSQLITE3_Statement(m_DB, "PRAGMA page_size = 32768").Execute() ) { 00540 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00541 "failed to set page_size"); 00542 } 00543 00544 // synchronous = OFF gives major speed difference when we update timestamps 00545 // 00546 if ( !CSQLITE3_Statement(m_DB, "PRAGMA synchronous = OFF").Execute() ) { 00547 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00548 "failed to set synchronous mode"); 00549 } 00550 if ( !CSQLITE3_Statement(m_DB, "PRAGMA count_changes = OFF").Execute() ) { 00551 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00552 "failed to disable count chnages mode"); 00553 } 00554 00555 00556 // 00557 // we now have an open connection 00558 // we can try to purge some data 00559 // 00560 if (CSQLITE3_Statement(m_DB, "PRAGMA table_info(CacheBlobs)").Step() == SQLITE_ROW) { 00561 } else { 00562 // create our table 00563 CSQLITE3_Statement stmt 00564 (m_DB, 00565 "CREATE TABLE CacheBlobs " \ 00566 "(" \ 00567 " key varchar(256) NOT NULL, " \ 00568 " version int NOT NULL, " \ 00569 " subkey varchar(256) NOT NULL, " \ 00570 "" \ 00571 " timestamp int NOT NULL, " \ 00572 " data blob NULL " \ 00573 ")"); 00574 if ( !stmt.Execute() ) { 00575 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00576 "failed to initialize cache"); 00577 } 00578 } 00579 00580 // check our indices as well 00581 // some earlier versions were created without indices 00582 if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_pk)").Step() != SQLITE_ROW) { 00583 CSQLITE3_Statement stmt 00584 (m_DB, 00585 "CREATE UNIQUE INDEX CacheBlobs_pk ON CacheBlobs(key, version, subkey)"); 00586 if ( !stmt.Execute() ) { 00587 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00588 "failed to initialize cache: failed to create PK index"); 00589 } 00590 } 00591 00592 if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_timestamp)").Step() != SQLITE_ROW) { 00593 CSQLITE3_Statement stmt 00594 (m_DB, 00595 "CREATE INDEX CacheBlobs_timestamp ON CacheBlobs(timestamp)"); 00596 if ( !stmt.Execute() ) { 00597 NCBI_THROW(CSQLITE3_ICacheException, eInitError, 00598 "failed to initialize cache: failed to create timestamp index"); 00599 } 00600 } 00601 00602 // start background writer (if it is not started yet) 00603 if (m_WriterThread.IsNull()) { 00604 m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue)); 00605 m_WriterThread->Run(); 00606 } 00607 00608 LOG_POST(Info << "CSQLITE3_Cache::Open(): " << sw.Elapsed() << " seconds"); 00609 } 00610 00611 00612 bool CSQLITE3_Cache::IsOpen() const 00613 { 00614 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00615 return (m_DB != NULL); 00616 } 00617 00618 00619 string CSQLITE3_Cache::GetCacheName() const 00620 { 00621 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00622 return m_Database; 00623 } 00624 00625 00626 void CSQLITE3_Cache::SetTimeStampPolicy(TTimeStampFlags policy, 00627 unsigned int timeout, 00628 unsigned int max_timeout) 00629 { 00630 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00631 CMutexGuard guard(m_Mutex); 00632 if (policy) { 00633 m_TimeStampFlag = policy; 00634 } else { 00635 m_TimeStampFlag = kDefaultTimestampPolicy; 00636 } 00637 00638 m_Timeout = timeout; 00639 } 00640 00641 00642 CSQLITE3_Cache::TTimeStampFlags CSQLITE3_Cache::GetTimeStampPolicy() const 00643 { 00644 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00645 return m_TimeStampFlag; 00646 } 00647 00648 00649 int CSQLITE3_Cache::GetTimeout() const 00650 { 00651 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00652 return m_Timeout; 00653 } 00654 00655 00656 void CSQLITE3_Cache::SetVersionRetention(EKeepVersions policy) 00657 { 00658 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00659 m_VersionFlag = policy; 00660 } 00661 00662 00663 CSQLITE3_Cache::EKeepVersions CSQLITE3_Cache::GetVersionRetention() const 00664 { 00665 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00666 return m_VersionFlag; 00667 } 00668 00669 00670 void CSQLITE3_Cache::x_SetTimestamp(const string& key, 00671 int version, 00672 const string& subkey) 00673 { 00674 CMutexGuard LOCK(m_Mutex); 00675 time_t time = CTime(CTime::eCurrent).GetTimeT(); 00676 00677 if ( !m_Stmt_SetTimestamp.get() ) { 00678 string sql = 00679 "UPDATE CacheBlobs SET timestamp = ?1 WHERE " 00680 "key = ?2 AND version = ?3 AND subkey = ?4"; 00681 //s_MakeKeyCondition(key, version, subkey, &sql); 00682 m_Stmt_SetTimestamp.reset(new CSQLITE3_Statement(m_DB, sql)); 00683 } else { 00684 m_Stmt_SetTimestamp->Reset(); 00685 } 00686 00687 //CSQLITE3_Statement stmt2(m_DB, sql); 00688 m_Stmt_SetTimestamp->Bind(1, int(time)); 00689 m_Stmt_SetTimestamp->Bind(2, key); 00690 m_Stmt_SetTimestamp->Bind(3, version); 00691 m_Stmt_SetTimestamp->Bind(4, subkey); 00692 if ( !m_Stmt_SetTimestamp->Execute() ) { 00693 LOG_POST(Error << "failed to update timestamp on cache blob: " 00694 << "\"" << key << "\", " << version << ", \"" 00695 << subkey << "\": " << sqlite3_errmsg(m_DB)); 00696 } else { 00697 /** 00698 LOG_POST(Info 00699 << "\"" << key << "\", " << version << ", \"" 00700 << subkey << "\": timestamp=" << time); 00701 **/ 00702 } 00703 } 00704 00705 void CSQLITE3_Cache::Store(const string& key, 00706 int version, 00707 const string& subkey, 00708 const void* data, 00709 size_t size, 00710 unsigned int time_to_live, 00711 const string& owner) 00712 { 00713 // 00714 // prepare our write reque 00715 CRef<SWriteRequest> req(new SWriteRequest); 00716 req->key = key; 00717 req->version = version; 00718 req->subkey = subkey; 00719 req->buffer.resize(size); 00720 memcpy(&req->buffer[0], data, size); 00721 00722 // push to our queue 00723 m_WriteQueue.Push(req); 00724 } 00725 00726 00727 00728 void CSQLITE3_Cache::StoreSynchronous(const string& key, 00729 int version, 00730 const string& subkey, 00731 const void* data, 00732 size_t size) 00733 { 00734 CMutexGuard LOCK(m_Mutex); 00735 00736 /** 00737 LOG_POST(Info << NCBI_CURRENT_FUNCTION 00738 << "(\"" << key << "\", " << version << ", \"" 00739 << subkey << "\", " << data << ", " << size << "): thread=" 00740 << CThread::GetSelf()); 00741 **/ 00742 _ASSERT(m_DB); 00743 00744 if (m_VersionFlag == eDropAll || m_VersionFlag == eDropOlder) { 00745 Purge(key, subkey, 0, m_VersionFlag); 00746 } 00747 00748 string sql; 00749 int ret = 0; 00750 time_t timestamp = CTime(CTime::eCurrent).GetTimeT(); 00751 00752 // 00753 // insert the row into cache data 00754 // we scan first to see if the row already exists 00755 // 00756 if ( !m_Stmt_Store.get() ) { 00757 m_Stmt_Store.reset(new CSQLITE3_Statement(m_DB, 00758 "INSERT OR REPLACE INTO CacheBlobs (key, version, subkey, timestamp, data) " 00759 "VALUES( ?1, ?2, ?3, ?4, ?5 )")); 00760 } 00761 00762 m_Stmt_Store->Reset(); 00763 m_Stmt_Store->Bind(1, key); 00764 m_Stmt_Store->Bind(2, version); 00765 m_Stmt_Store->Bind(3, subkey); 00766 m_Stmt_Store->Bind(4, int(timestamp)); 00767 m_Stmt_Store->Bind(5, data, size); 00768 00769 // execute... 00770 if ( (ret = m_Stmt_Store->Step()) != SQLITE_DONE) { 00771 LOG_POST(Error << "failed to write " << size << " bytes: " 00772 << sql << ": [" << ret << "] " << sqlite3_errmsg(m_DB)); 00773 } 00774 } 00775 00776 00777 size_t CSQLITE3_Cache::GetSize(const string& key, 00778 int version, 00779 const string& subkey) 00780 { 00781 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00782 00783 string sql = "SELECT data FROM CacheBlobs WHERE "; 00784 s_MakeKeyCondition(key, version, subkey, &sql); 00785 CSQLITE3_Statement stmt(m_DB, sql); 00786 if (stmt.Step() == SQLITE_ROW) { 00787 return sqlite3_column_bytes(stmt.GetStatement(), 0); 00788 } 00789 00790 return 0; 00791 } 00792 00793 00794 bool CSQLITE3_Cache::Read(const string& key, 00795 int version, 00796 const string& subkey, 00797 void* buf, 00798 size_t buf_size) 00799 { 00800 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00801 00802 string sql = "SELECT data FROM CacheBlobs WHERE "; 00803 s_MakeKeyCondition(key, version, subkey, &sql); 00804 00805 CSQLITE3_Statement stmt(m_DB, sql); 00806 if (stmt.Step() == SQLITE_ROW) { 00807 size_t size = sqlite3_column_bytes(stmt.GetStatement(), 0); 00808 size = min(size, buf_size); 00809 memcpy(buf, 00810 sqlite3_column_blob(stmt.GetStatement(), 0), 00811 size); 00812 00813 /// set timestamp 00814 if (m_TimeStampFlag & fTimeStampOnRead) { 00815 x_SetTimestamp(key, version, subkey); 00816 } 00817 return true; 00818 } 00819 00820 return false; 00821 } 00822 00823 00824 IReader* CSQLITE3_Cache::GetReadStream(const string& key, 00825 int version, 00826 const string& subkey) 00827 { 00828 CMutexGuard LOCK(m_Mutex); 00829 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ")"); 00830 00831 // 00832 // retrieve our row and create a blob reader 00833 // FIXME: use incremental I/O in the future 00834 // 00835 if ( !m_Stmt_GetReadStream.get() ) { 00836 string sql = 00837 "SELECT data FROM CacheBlobs WHERE " 00838 "key = ?1 AND version = ?2 AND subkey = ?3" 00839 ; 00840 m_Stmt_GetReadStream.reset(new CSQLITE3_Statement(m_DB, sql)); 00841 } else { 00842 m_Stmt_GetReadStream->Reset(); 00843 } 00844 //s_MakeKeyCondition(key, version, subkey, &sql); 00845 //CSQLITE3_Statement stmt(m_DB, sql); 00846 m_Stmt_GetReadStream->Bind(1, key); 00847 m_Stmt_GetReadStream->Bind(2, version); 00848 m_Stmt_GetReadStream->Bind(3, subkey); 00849 if (m_Stmt_GetReadStream->Step() == SQLITE_ROW) { 00850 auto_ptr<IReader> reader(GetBlobReader(*m_Stmt_GetReadStream, 0)); 00851 00852 /// set timestamp 00853 if (m_TimeStampFlag & fTimeStampOnRead) { 00854 x_SetTimestamp(key, version, subkey); 00855 } 00856 return reader.release(); 00857 } 00858 00859 return NULL; 00860 } 00861 00862 IReader* CSQLITE3_Cache::GetReadStream( 00863 const string& /*key*/, 00864 const string& /*subkey*/, 00865 int* /*version*/, 00866 EBlobValidity* /*validity*/) 00867 { 00868 // ICache last valid version protocol is not implemented in GBench 00869 // (this is ok it is optimization only important for network cache) 00870 NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented, 00871 "CSQLITE3_Cache::GetReadStream(key, subkey, version, validity) " 00872 "is not implemented"); 00873 } 00874 00875 void CSQLITE3_Cache::SetBlobVersionAsValid(const string& /* key */, 00876 const string& /* subkey */, 00877 int /* version */) 00878 { 00879 // ICache last valid version protocol is not implemented in GBench 00880 // (this is ok it is optimization only important for network cache) 00881 NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented, 00882 "CSQLITE3_Cache::SetBlobVersionAsValid(key, subkey, version) " 00883 "is not implemented"); 00884 } 00885 00886 00887 IWriter* CSQLITE3_Cache::GetWriteStream(const string& key, 00888 int version, 00889 const string& subkey, 00890 unsigned int time_to_live, 00891 const string& owner) 00892 { 00893 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ", " << time_to_live << ", " << owner << ")"); 00894 00895 class CBlobWriter : public IWriter 00896 { 00897 public: 00898 CBlobWriter(CSQLITE3_Cache& cache, 00899 const string& key, 00900 int version, 00901 const string& subkey) 00902 : m_Cache(cache) 00903 , m_Key(key) 00904 , m_Version(version) 00905 , m_Subkey(subkey) 00906 , m_Flushed(false) 00907 { 00908 } 00909 00910 ~CBlobWriter() 00911 { 00912 if ( !m_Flushed ) { 00913 Flush(); 00914 } 00915 } 00916 00917 ERW_Result Write(const void* buf, 00918 size_t count, 00919 size_t* bytes_written = 0) 00920 { 00921 if ( !m_Flushed ) { 00922 m_Data.insert(m_Data.end(), 00923 (const unsigned char*)(buf), 00924 (const unsigned char*)(buf) + count); 00925 if (bytes_written) { 00926 *bytes_written = count; 00927 } 00928 return eRW_Success; 00929 } else { 00930 return eRW_Error; 00931 } 00932 } 00933 00934 ERW_Result Flush() 00935 { 00936 if (m_Data.size()) { 00937 m_Cache.Store(m_Key, m_Version, m_Subkey, 00938 &m_Data[0], m_Data.size()); 00939 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): wrote " << m_Data.size() << " bytes"); 00940 } 00941 m_Flushed = true; 00942 return eRW_Success; 00943 } 00944 00945 private: 00946 CSQLITE3_Cache& m_Cache; 00947 string m_Key; 00948 int m_Version; 00949 string m_Subkey; 00950 vector<unsigned char> m_Data; 00951 bool m_Flushed; 00952 }; 00953 00954 return new CBlobWriter(*this, key, version, subkey); 00955 } 00956 00957 00958 void CSQLITE3_Cache::Remove(const string& key) 00959 { 00960 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00961 00962 // create transaction 00963 string sql = "DELETE FROM CacheBlobs WHERE key = '"; 00964 sql += key; 00965 sql += "'"; 00966 CSQLITE3_Statement stmt(m_DB, sql); 00967 stmt.Execute(); 00968 } 00969 00970 00971 void CSQLITE3_Cache::Remove(const string& key, 00972 int version, 00973 const string& subkey) 00974 { 00975 //LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00976 00977 /// create transaction 00978 string sql = "DELETE FROM CacheBlobs WHERE "; 00979 s_MakeKeyCondition(key, version, subkey, &sql); 00980 CSQLITE3_Statement stmt(m_DB, sql); 00981 stmt.Execute(); 00982 } 00983 00984 00985 time_t CSQLITE3_Cache::GetAccessTime(const string& key, 00986 int version, 00987 const string& subkey) 00988 { 00989 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 00990 00991 string sql = "SELECT timestamp FROM CacheBlobs WHERE "; 00992 s_MakeKeyCondition(key, version, subkey, &sql); 00993 CSQLITE3_Statement stmt(m_DB, sql); 00994 if (stmt.Step() == SQLITE_ROW) { 00995 return stmt.GetInt(0); 00996 } 00997 00998 return 0; 00999 } 01000 01001 01002 void CSQLITE3_Cache::Purge(time_t access_timeout, 01003 EKeepVersions keep_last_version) 01004 { 01005 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 01006 01007 if (keep_last_version == eDropAll && access_timeout == 0) { 01008 //x_TruncateDB(); 01009 return; 01010 } 01011 01012 CTime time_stamp(CTime::eCurrent); 01013 time_t curr = time_stamp.GetTimeT(); 01014 int timeout = GetTimeout(); 01015 curr -= timeout; 01016 01017 string sql = "DELETE FROM CacheBlobs WHERE timestamp < ?1"; 01018 CSQLITE3_Statement stmt(m_DB, sql); 01019 stmt.Bind(1, int(curr)); 01020 if (stmt.Step() == SQLITE_DONE) { 01021 int count = sqlite3_changes(m_DB); 01022 LOG_POST(Info << "CSQLITE3_Cache::Purge(): " 01023 << count << " items purged"); 01024 } 01025 } 01026 01027 01028 void CSQLITE3_Cache::Purge(const string& key, 01029 const string& subkey, 01030 time_t access_timeout, 01031 EKeepVersions keep_last_version) 01032 { 01033 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 01034 01035 if (keep_last_version == eDropAll && access_timeout == 0) { 01036 //x_TruncateDB(); 01037 return; 01038 } 01039 01040 CTime time_stamp(CTime::eCurrent); 01041 time_t curr = time_stamp.GetTimeT(); 01042 int timeout = GetTimeout(); 01043 curr -= timeout; 01044 01045 string sql = 01046 "DELETE FROM CacheBlobs WHERE " 01047 " timestamp < ?1 "; 01048 01049 if (!key.empty()) { 01050 sql += " AND key = '"; 01051 sql += key; 01052 sql += "'"; 01053 } 01054 01055 if (!subkey.empty()) { 01056 sql += " AND subkey = '"; 01057 sql += subkey; 01058 sql += "'"; 01059 } 01060 CSQLITE3_Statement stmt(m_DB, sql); 01061 stmt.Bind(1, int(curr)); 01062 if (stmt.Step() == SQLITE_DONE) { 01063 int count = sqlite3_changes(m_DB); 01064 LOG_POST(Info << "CSQLITE3_Cache::Purge(): " 01065 << count << " items purged"); 01066 } 01067 } 01068 01069 01070 bool CSQLITE3_Cache::SameCacheParams(const TCacheParams* params) const 01071 { 01072 if ( !params ) { 01073 return false; 01074 } 01075 const TCacheParams* driver = params->FindNode("driver"); 01076 if (!driver || driver->GetValue().value != kSQLITE3_BlobCacheDriverName) { 01077 return false; 01078 } 01079 const TCacheParams* driver_params = 01080 params->FindNode(kSQLITE3_BlobCacheDriverName); 01081 if ( !driver_params ) { 01082 return false; 01083 } 01084 const TCacheParams* path = driver_params->FindNode("database"); 01085 if (!path) { 01086 return false; 01087 } 01088 const string& database = path->GetValue().value; 01089 01090 string base1, base2; 01091 CDirEntry::SplitPath(database, 0, &base1, 0); 01092 CDirEntry::SplitPath(m_Database, 0, &base2, 0); 01093 01094 if (base1 == base2) 01095 return true; 01096 return false; 01097 } 01098 01099 01100 bool CSQLITE3_Cache::HasBlobs(const string& key, 01101 const string& subkey) 01102 { 01103 CMutexGuard LOCK(m_Mutex); 01104 //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(\"" << key << "\", \"" << subkey << "\")"); 01105 01106 CSQLITE3_Statement* stmt = NULL; 01107 if ( !m_Stmt_HasBlobs_key.get() ) { 01108 m_Stmt_HasBlobs_key.reset 01109 (new CSQLITE3_Statement(m_DB, 01110 "SELECT timestamp FROM CacheBlobs WHERE " 01111 "key = ?1")); 01112 } 01113 01114 if ( !m_Stmt_HasBlobs_key_subkey.get() ) { 01115 m_Stmt_HasBlobs_key_subkey.reset 01116 (new CSQLITE3_Statement(m_DB, 01117 "SELECT timestamp FROM CacheBlobs WHERE " 01118 "key = ?1 AND subkey = ?2")); 01119 } 01120 01121 if (subkey.empty()) { 01122 stmt = &*m_Stmt_HasBlobs_key; 01123 } else { 01124 stmt = &*m_Stmt_HasBlobs_key_subkey; 01125 } 01126 01127 stmt->Reset(); 01128 stmt->Bind(1, key); 01129 if ( !subkey.empty() ) { 01130 stmt->Bind(2, subkey); 01131 } 01132 return (stmt->Step() == SQLITE_ROW); 01133 } 01134 01135 01136 void CSQLITE3_Cache::GetBlobAccess(const string& key, 01137 int version, 01138 const string& subkey, 01139 SBlobAccessDescr* blob_descr) 01140 { 01141 CMutexGuard LOCK(m_Mutex); 01142 /** 01143 LOG_POST(Info << NCBI_CURRENT_FUNCTION 01144 << "(\"" << key << "\", " << version << ", \"" << subkey << "\"): thread=" 01145 << CThread::GetSelf()); 01146 **/ 01147 01148 blob_descr->reader.reset(); 01149 blob_descr->blob_size = 0; 01150 blob_descr->blob_found = false; 01151 01152 string sql; 01153 time_t time = CTime(CTime::eCurrent).GetTimeT(); 01154 time_t timeout = time - GetTimeout(); 01155 01156 // check to see if the row exists in the cache attributes 01157 // if so, we intend on updating the timestamp to be now 01158 int this_timestamp = 0; 01159 01160 // we have a blob 01161 if ( !m_Stmt_GetBlobAccess.get() ) { 01162 sql = 01163 "SELECT timestamp, data FROM CacheBlobs WHERE " 01164 "key = ?1 AND version = ?2 AND subkey = ?3"; 01165 //s_MakeKeyCondition(key, version, subkey, &sql); 01166 m_Stmt_GetBlobAccess.reset(new CSQLITE3_Statement(m_DB, sql)); 01167 } else { 01168 m_Stmt_GetBlobAccess->Reset(); 01169 } 01170 01171 m_Stmt_GetBlobAccess->Bind(1, key); 01172 m_Stmt_GetBlobAccess->Bind(2, version); 01173 m_Stmt_GetBlobAccess->Bind(3, subkey); 01174 if (m_Stmt_GetBlobAccess->Step() == SQLITE_ROW) { 01175 // read and process our timestamp 01176 this_timestamp = m_Stmt_GetBlobAccess->GetInt(0); 01177 if (this_timestamp < timeout) { 01178 Remove(key, version, subkey); 01179 } else { 01180 // read and process our blob 01181 blob_descr->blob_size = 01182 sqlite3_column_bytes(m_Stmt_GetBlobAccess->GetStatement(), 1); 01183 blob_descr->blob_found = true; 01184 01185 if (blob_descr->buf && 01186 blob_descr->buf_size >= blob_descr->blob_size) { 01187 memcpy(blob_descr->buf, 01188 sqlite3_column_blob(m_Stmt_GetBlobAccess->GetStatement(), 1), 01189 blob_descr->blob_size); 01190 //LOG_POST(Info << " direct copy: " << blob_descr->blob_size << " bytes"); 01191 } else { 01192 blob_descr->reader.reset(GetBlobReader(*m_Stmt_GetBlobAccess, 1)); 01193 /** 01194 LOG_POST(Info << " stream read: " << blob_descr->blob_size 01195 << " bytes / " << blob_descr->buf_size << " bytes in buffer"); 01196 **/ 01197 } 01198 } 01199 01200 // set timestamp 01201 if (m_TimeStampFlag & fTimeStampOnRead) { 01202 x_SetTimestamp(key, version, subkey); 01203 } 01204 } 01205 } 01206 01207 01208 /// @name unimplemented 01209 /// @{ 01210 void CSQLITE3_Cache::GetBlobOwner(const string& key, 01211 int version, 01212 const string& subkey, 01213 string* owner) 01214 { 01215 LOG_POST(Info << NCBI_CURRENT_FUNCTION); 01216 _ASSERT(owner); 01217 owner->erase(); // not supported in this implementation 01218 } 01219 01220 void CSQLITE3_Cache::SetMemBufferSize(unsigned int buf_size) 01221 { 01222 /// noop 01223 } 01224 01225 01226 /// @} 01227 01228 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 13:10:38 2012 by modify_doxy.py rev. 337098