src/app/netschedule/ns_queue.cpp

Go to the documentation of this file.
00001 /*  $Id: ns_queue.cpp 177761 2009-12-03 22:15:58Z joukovv $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Victor Joukov
00027  *
00028  * File Description:
00029  *   NetSchedule queue structure and parameters
00030  */
00031 #include <ncbi_pch.hpp>
00032 
00033 #include "ns_queue.hpp"
00034 #include "background_host.hpp"
00035 #include "ns_util.hpp"
00036 #include "ns_format.hpp"
00037 
00038 #include <corelib/ncbi_system.hpp> // SleepMilliSec
00039 #include <corelib/request_ctx.hpp>
00040 #include <db/bdb/bdb_trans.hpp>
00041 #include <util/qparse/query_parse.hpp>
00042 #include <util/qparse/query_exec.hpp>
00043 #include <util/qparse/query_exec_bv.hpp>
00044 #include <util/bitset/bmalgo.h>
00045 
00046 BEGIN_NCBI_SCOPE
00047 
00048 
00049 //////////////////////////////////////////////////////////////////////////
00050 // SQueueDbBlock
00051 
00052 void SQueueDbBlock::Open(CBDB_Env& env, const string& path, int pos_)
00053 {
00054     pos = pos_;
00055     string prefix = string("jsq_") + NStr::IntToString(pos);
00056     allocated = false;
00057     bool group_tables_for_queue = false;
00058     try {
00059         string fname = prefix + ".db";
00060         string tname = "";
00061         job_db.SetEnv(env);
00062         // TODO: RevSplitOff make sense only for long living queues,
00063         // for dynamic ones it slows down the process, but because queue
00064         // if eventually is disposed of, it does not make sense to save
00065         // space here
00066         job_db.RevSplitOff();
00067         if (group_tables_for_queue) tname = "job";
00068         job_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00069 
00070         if (group_tables_for_queue)
00071             tname = "jobinfo";
00072         else
00073             fname = prefix + "_jobinfo.db";
00074         job_info_db.SetEnv(env);
00075         job_info_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00076 
00077         if (group_tables_for_queue)
00078             tname = "runs";
00079         else
00080             fname = prefix + "_runs.db";
00081         runs_db.SetEnv(env);
00082         runs_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00083 
00084         if (group_tables_for_queue)
00085             tname = "deleted";
00086         else
00087             fname = prefix + "_deleted.db";
00088         deleted_jobs_db.SetEnv(env);
00089         deleted_jobs_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00090 
00091         if (group_tables_for_queue)
00092             tname = "affid";
00093         else
00094             fname = prefix + "_affid.idx";
00095         affinity_idx.SetEnv(env);
00096         affinity_idx.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00097 
00098         if (group_tables_for_queue)
00099             tname = "affdict";
00100         else
00101             fname = prefix + "_affdict.db";
00102         aff_dict_db.SetEnv(env);
00103         aff_dict_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00104 
00105         if (group_tables_for_queue)
00106             tname = "affdict_token";
00107         else
00108             fname = prefix + "_affdict_token.idx";
00109         aff_dict_token_idx.SetEnv(env);
00110         aff_dict_token_idx.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00111 
00112         if (group_tables_for_queue)
00113             tname = "tag";
00114         else
00115             fname = prefix + "_tag.idx";
00116         tag_db.SetEnv(env);
00117         tag_db.SetPageSize(32*1024);
00118         tag_db.RevSplitOff();
00119         tag_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00120 
00121     } catch (CBDB_ErrnoException&) {
00122         throw;
00123     }
00124 }
00125 
00126 
00127 void SQueueDbBlock::Close()
00128 {
00129     tag_db.Close();
00130     aff_dict_token_idx.Close();
00131     aff_dict_db.Close();
00132     affinity_idx.Close();
00133     deleted_jobs_db.Close();
00134     runs_db.Close();
00135     job_info_db.Close();
00136     job_db.Close();
00137 }
00138 
00139 
00140 void SQueueDbBlock::Truncate()
00141 {
00142     tag_db.SafeTruncate();
00143     aff_dict_token_idx.SafeTruncate();
00144     aff_dict_db.SafeTruncate();
00145     affinity_idx.SafeTruncate();
00146     deleted_jobs_db.SafeTruncate();
00147     runs_db.SafeTruncate();
00148     job_info_db.SafeTruncate();
00149     job_db.SafeTruncate();
00150 
00151     CBDB_Env& env = *job_db.GetEnv();
00152     env.ForceTransactionCheckpoint();
00153     env.CleanLog();
00154 }
00155 
00156 //////////////////////////////////////////////////////////////////////////
00157 // CQueueDbBlockArray
00158 
00159 CQueueDbBlockArray::CQueueDbBlockArray()
00160   : m_Count(0), m_Array(0)
00161 {
00162 };
00163 
00164 
00165 CQueueDbBlockArray::~CQueueDbBlockArray()
00166 {
00167 }
00168 
00169 
00170 void CQueueDbBlockArray::Init(CBDB_Env& env, const string& path,
00171                              unsigned count)
00172 {
00173     m_Count = count;
00174     m_Array = new SQueueDbBlock[m_Count];
00175     for (unsigned n = 0; n < m_Count; ++n) {
00176         m_Array[n].Open(env, path, n);
00177     }
00178 }
00179 
00180 
00181 void CQueueDbBlockArray::Close()
00182 {
00183     for (unsigned n = 0; n < m_Count; ++n) {
00184         m_Array[n].Close();
00185     }
00186     delete [] m_Array;
00187     m_Array = 0;
00188     m_Count = 0;
00189 }
00190 
00191 
00192 int CQueueDbBlockArray::Allocate()
00193 {
00194     for (unsigned n = 0; n < m_Count; ++n) {
00195         if (!m_Array[n].allocated) {
00196             m_Array[n].allocated = true;
00197             return n;
00198         }
00199     }
00200     return -1;
00201 }
00202 
00203 
00204 bool CQueueDbBlockArray::Allocate(int pos)
00205 {
00206     if (pos < 0  || pos >= int(m_Count)) return false;
00207     if (m_Array[pos].allocated) return false;
00208     m_Array[pos].allocated = true;
00209     return true;
00210 }
00211 
00212 
00213 SQueueDbBlock* CQueueDbBlockArray::Get(int pos)
00214 {
00215     if (pos < 0 || unsigned(pos) >= m_Count) return 0;
00216     return &m_Array[pos];
00217 }
00218 
00219 //////////////////////////////////////////////////////////////////////////
00220 // SQueueParameters
00221 
00222 void SQueueParameters::Read(const IRegistry& reg, const string& sname)
00223 {
00224     // When modifying this, modify all places marked with PARAMETERS
00225 #define GetIntNoErr(name, dflt) reg.GetInt(sname, name, dflt, 0, IRegistry::eReturn)
00226     // Read parameters
00227     timeout = GetIntNoErr("timeout", 3600);
00228 
00229     notif_timeout = GetIntNoErr("notif_timeout", 7);
00230     run_timeout = GetIntNoErr("run_timeout", timeout);
00231     run_timeout_precision = GetIntNoErr("run_timeout_precision", run_timeout);
00232 
00233     program_name = reg.GetString(sname, "program", kEmptyStr);
00234 
00235     delete_done = reg.GetBool(sname, "delete_done", false,
00236         0, IRegistry::eReturn);
00237 
00238     failed_retries = GetIntNoErr("failed_retries", 0);
00239 
00240     blacklist_time = GetIntNoErr("blacklist_time", 0);
00241 
00242     empty_lifetime = GetIntNoErr("empty_lifetime", -1);
00243 
00244     string s = reg.GetString(sname, "max_input_size", kEmptyStr);
00245     max_input_size = kNetScheduleMaxDBDataSize;
00246     try {
00247         max_input_size = (unsigned) NStr::StringToUInt8_DataSize(s);
00248     }
00249     catch (CStringException&) {}
00250     max_input_size = min(kNetScheduleMaxOverflowSize, max_input_size);
00251 
00252     s =  reg.GetString(sname, "max_output_size", kEmptyStr);
00253     max_output_size = kNetScheduleMaxDBDataSize;
00254     try {
00255         max_output_size = (unsigned) NStr::StringToUInt8_DataSize(s);
00256     }
00257     catch (CStringException&) {}
00258     max_output_size = min(kNetScheduleMaxOverflowSize, max_output_size);
00259 
00260     deny_access_violations = reg.GetBool(sname, "deny_access_violations", false,
00261         0, IRegistry::eReturn);
00262     log_access_violations  = reg.GetBool(sname, "log_access_violations", true,
00263         0, IRegistry::eReturn);
00264     log_job_state          = GetIntNoErr("log_job_state", 0);
00265 
00266     subm_hosts = reg.GetString(sname,  "subm_host",  kEmptyStr);
00267     wnode_hosts = reg.GetString(sname, "wnode_host", kEmptyStr);
00268 }
00269 
00270 
00271 //////////////////////////////////////////////////////////////////////////
00272 // CNSTagMap
00273 
00274 CNSTagMap::CNSTagMap()
00275 {
00276 }
00277 
00278 
00279 CNSTagMap::~CNSTagMap()
00280 {
00281     NON_CONST_ITERATE(TNSTagMap, it, m_TagMap) {
00282         if (it->second) {
00283             delete it->second;
00284             it->second = 0;
00285         }
00286     }
00287 }
00288 
00289 
00290 //////////////////////////////////////////////////////////////////////////
00291 CQueueEnumCursor::CQueueEnumCursor(CQueue* queue)
00292   : CBDB_FileCursor(queue->m_JobDB)
00293 {
00294     SetCondition(CBDB_FileCursor::eFirst);
00295 }
00296 
00297 
00298 //////////////////////////////////////////////////////////////////////////
00299 // CQueue
00300 
00301 CQueue::CQueue(CRequestExecutor& executor,
00302                const string&     queue_name,
00303                const string&     qclass_name,
00304                TQueueKind        queue_kind)
00305   :
00306     m_RunTimeLine(NULL),
00307     m_HasNotificationPort(false),
00308     m_DeleteDatabase(false),
00309     m_Executor(executor),
00310     m_QueueName(queue_name),
00311     m_QueueClass(qclass_name),
00312     m_Kind(queue_kind),
00313     m_QueueDbBlock(0),
00314 
00315     m_BecameEmpty(-1),
00316     m_WorkerNodeList(queue_name),
00317     m_AffWrapped(false),
00318     m_CurrAffId(0),
00319     m_LastAffId(0),
00320 
00321     m_ParamLock(CRWLock::fFavorWriters),
00322     m_Timeout(3600),
00323     m_NotifyTimeout(7),
00324     m_DeleteDone(false),
00325     m_RunTimeout(3600),
00326     m_RunTimeoutPrecision(-1),
00327     m_FailedRetries(0),
00328     m_BlacklistTime(0),
00329     m_EmptyLifetime(0),
00330     m_MaxInputSize(kNetScheduleMaxDBDataSize),
00331     m_MaxOutputSize(kNetScheduleMaxDBDataSize),
00332     m_DenyAccessViolations(false),
00333     m_LogAccessViolations(true),
00334     m_LogJobState(0)
00335 {
00336     _ASSERT(!queue_name.empty());
00337     for (TStatEvent n = 0; n < eStatNumEvents; ++n) {
00338         m_EventCounter[n].Set(0);
00339         m_Average[n] = 0;
00340     }
00341     m_StatThread.Reset(new CStatisticsThread(*this));
00342     m_StatThread->Run();
00343     m_LastId.Set(0);
00344     m_GroupLastId.Set(0);
00345 }
00346 
00347 CQueue::~CQueue()
00348 {
00349     delete m_RunTimeLine;
00350     Detach();
00351     m_StatThread->RequestStop();
00352     m_StatThread->Join(NULL);
00353 }
00354 
00355 
00356 void CQueue::Attach(SQueueDbBlock* block)
00357 {
00358     Detach();
00359     m_QueueDbBlock = block;
00360     m_AffinityDict.Attach(&m_QueueDbBlock->aff_dict_db,
00361                           &m_QueueDbBlock->aff_dict_token_idx);
00362 }
00363 
00364 class CTruncateRequest : public CStdRequest
00365 {
00366 public:
00367     CTruncateRequest(SQueueDbBlock* qdbblock)
00368         : m_QueueDbBlock(qdbblock)
00369     {}
00370     virtual void Process() {
00371         // See CQueue::Detach why we can do this without locking.
00372         CStopWatch sw(CStopWatch::eStart);
00373         m_QueueDbBlock->Truncate();
00374         m_QueueDbBlock->allocated = false;
00375         LOG_POST(Info << "Clean up of db block "
00376                       << m_QueueDbBlock->pos
00377                       << " complete, "
00378                       << sw.Elapsed()
00379                       << " elapsed");
00380     }
00381 private:
00382     SQueueDbBlock* m_QueueDbBlock;
00383 
00384 };
00385 
00386 
00387 void CQueue::Detach()
00388 {
00389     // We are here have synchronized access to m_QueueDbBlock without mutex
00390     // because we are here only when the last reference to CQueue is
00391     // destroyed. So as long m_QueueDbBlock->allocated is true it cannot
00392     // be allocated again and thus cannot be accessed as well.
00393     // As soon as we're done with detaching (here) or truncating (in separate
00394     // request, submitted for execution to the server thread pool), we can
00395     // set m_QueueDbBlock->allocated to false. Boolean write is atomic by
00396     // definition and test-and-set is executed from synchronized code in
00397     // CQueueDbBlockArray::Allocate.
00398     m_AffinityDict.Detach();
00399     if (!m_QueueDbBlock) return;
00400     if (m_DeleteDatabase) {
00401         CRef<CStdRequest> request(new CTruncateRequest(m_QueueDbBlock));
00402         m_Executor.SubmitRequest(request);
00403         m_DeleteDatabase = false;
00404     } else 
00405         m_QueueDbBlock->allocated = false;
00406     m_QueueDbBlock = 0;
00407 }
00408 
00409 
00410 void CQueue::SetParameters(const SQueueParameters& params)
00411 {
00412     // When modifying this, modify all places marked with PARAMETERS
00413     CWriteLockGuard guard(m_ParamLock);
00414     m_Timeout = params.timeout;
00415     m_NotifyTimeout = params.notif_timeout;
00416     m_DeleteDone = params.delete_done;
00417 
00418     m_RunTimeout = params.run_timeout;
00419     if (params.run_timeout && !m_RunTimeLine) {
00420         // One time only. Precision can not be reset.
00421         m_RunTimeLine =
00422             new CJobTimeLine(params.run_timeout_precision, 0);
00423         m_RunTimeoutPrecision = params.run_timeout_precision;
00424     }
00425 
00426     m_FailedRetries = params.failed_retries;
00427     m_BlacklistTime = params.blacklist_time;
00428     m_EmptyLifetime = params.empty_lifetime;
00429     m_MaxInputSize  = params.max_input_size;
00430     m_MaxOutputSize = params.max_output_size;
00431     m_DenyAccessViolations = params.deny_access_violations;
00432     m_LogAccessViolations  = params.log_access_violations;
00433     m_LogJobState          = params.log_job_state;
00434     // program version control
00435     m_ProgramVersionList.Clear();
00436     if (!params.program_name.empty()) {
00437         m_ProgramVersionList.AddClientInfo(params.program_name);
00438     }
00439     m_SubmHosts.SetHosts(params.subm_hosts);
00440     m_WnodeHosts.SetHosts(params.wnode_hosts);
00441 }
00442 
00443 
00444 CQueue::TParameterList CQueue::GetParameters() const
00445 {
00446     TParameterList parameters;
00447     CQueueParamAccessor qp(*this);
00448     unsigned nParams = qp.GetNumParams();
00449     for (unsigned n = 0; n < nParams; ++n) {
00450         parameters.push_back(
00451             pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
00452     }
00453     return parameters;
00454 }
00455 
00456 
00457 unsigned CQueue::LoadStatusMatrix()
00458 {
00459     unsigned queue_run_timeout = GetRunTimeout();
00460     EBDB_ErrCode err;
00461 
00462     static EVectorId all_ids[] = { eVIJob, eVITag, eVIAffinity };
00463     TNSBitVector all_vects[] =
00464         { m_JobsToDelete, m_DeletedJobs, m_AffJobsToDelete };
00465     for (size_t i = 0; i < sizeof(all_ids) / sizeof(all_ids[0]); ++i) {
00466         m_DeletedJobsDB.id = all_ids[i];
00467         err = m_DeletedJobsDB.ReadVector(&all_vects[i]);
00468         if (err != eBDB_Ok && err != eBDB_NotFound) {
00469             // TODO: throw db error
00470         }
00471         all_vects[i].optimize();
00472     }
00473 
00474     // scan the queue, load the state machine from DB
00475     TNSBitVector running_jobs;
00476     CBDB_FileCursor cur(m_JobDB);
00477     cur.InitMultiFetch(1024*1024);
00478     cur.SetCondition(CBDB_FileCursor::eGE);
00479     cur.From << 0;
00480 
00481     unsigned recs = 0;
00482 
00483     unsigned last_id = 0;
00484     unsigned group_last_id = 0;
00485     for (; cur.Fetch() == eBDB_Ok; ) {
00486         unsigned job_id = m_JobDB.id;
00487         if (m_JobsToDelete.test(job_id)) continue;
00488         int i_status = m_JobDB.status;
00489         if (i_status <  (int) CNetScheduleAPI::ePending ||
00490             i_status >= (int) CNetScheduleAPI::eLastStatus)
00491         {
00492             // Invalid job, skip it
00493             LOG_POST(Warning << "Job " << job_id
00494                              << " has invalid status " << i_status
00495                              << ", ignored.");
00496             continue;
00497         }
00498         TJobStatus status = TJobStatus(i_status);
00499         m_StatusTracker.SetExactStatusNoLock(job_id, status, true);
00500 
00501         if (status == CNetScheduleAPI::eReading) {
00502             unsigned group_id = m_JobDB.read_group;
00503             x_AddToReadGroupNoLock(group_id, job_id);
00504             if (group_last_id < group_id) group_last_id = group_id;
00505         }
00506         if ((status == CNetScheduleAPI::eRunning ||
00507              status == CNetScheduleAPI::eReading) &&
00508             m_RunTimeLine) {
00509             // Add object to the first available slot;
00510             // it is going to be rescheduled or dropped
00511             // in the background control thread
00512             // We can use time line without lock here because
00513             // the queue is still in single-use mode while
00514             // being loaded.
00515             m_RunTimeLine->AddObject(m_RunTimeLine->GetHead(), job_id);
00516         }
00517         if (status == CNetScheduleAPI::eRunning)
00518             running_jobs.set(job_id);
00519         if (last_id < job_id) last_id = job_id;
00520         ++recs;
00521     }
00522     // Recover worker node info using job runs
00523     TNSBitVector::enumerator en(running_jobs.first());
00524     for (; en.valid(); ++en) {
00525         unsigned job_id = *en;
00526         CJob job;
00527         CJob::EJobFetchResult res = job.Fetch(this, job_id);
00528         if (res != CJob::eJF_Ok) {
00529             // TODO: check for db error here
00530             ERR_POST(Error << "Can't read job " << DecorateJobId(job_id)
00531                            << " while loading status matrix");
00532             continue;
00533         }
00534         const CJobRun* run = job.GetLastRun();
00535         if (!run) {
00536             ERR_POST(Error << "No job run for Running job "
00537                            << DecorateJobId(job_id)
00538                            << " while loading status matrix");
00539             continue;
00540         }
00541         if (run->GetStatus() != CNetScheduleAPI::eRunning)
00542             continue;
00543 
00544         // FIXME Most likely, the following is not needed.
00545         if (m_WorkerNodeList.FindJobById(job_id) != NULL)
00546             continue;
00547 
00548         // We don't have auth in job run, but it is not that crucial.
00549         // Either the node is the old style one, and jobs are going to
00550         // be failed (or retried) on another node registration with the
00551         // same host:port, or for the new style nodes with same node id,
00552         // auth will be fixed on the INIT command.
00553         TWorkerNodeRef worker_node(m_WorkerNodeList.FindWorkerNodeByAddress(
00554             run->GetNodeAddr(), run->GetNodePort()));
00555 
00556         if (worker_node == NULL) {
00557             worker_node = new CWorkerNode_Real(run->GetNodeAddr());
00558 
00559             m_WorkerNodeList.RegisterNode(worker_node);
00560             m_WorkerNodeList.SetId(worker_node, run->GetNodeId());
00561             m_WorkerNodeList.SetPort(worker_node, run->GetNodePort());
00562         }
00563 
00564         unsigned run_timeout = job.GetRunTimeout();
00565         if (run_timeout == 0)
00566             run_timeout = queue_run_timeout;
00567         unsigned exp_time = run->GetTimeStart() + run_timeout;
00568 
00569         m_WorkerNodeList.AddJob(worker_node, job, exp_time, 0, false);
00570     }
00571     m_LastId.Set(last_id);
00572     m_GroupLastId.Set(group_last_id);
00573     return recs;
00574 }
00575 
00576 
00577 static void s_LogSubmit(CQueue&            q,
00578                         CJob&              job,
00579                         SQueueDescription& qdesc)
00580 {
00581     CRequestContext& ctx = CDiagContext::GetRequestContext();
00582     if (!job.GetClientIP().empty())
00583         ctx.SetClientIP(job.GetClientIP());
00584     ctx.SetSessionID(job.GetClientSID());
00585 
00586     CDiagContext_Extra extra = GetDiagContext().Extra()
00587         .Print("action", "submit")
00588         .Print("queue", q.GetQueueName())
00589         .Print("job_id", NStr::UIntToString(job.GetId()))
00590         .Print("input", job.GetInput())
00591         .Print("aff", job.GetAffinityToken())
00592         .Print("mask", NStr::UIntToString(job.GetMask()))
00593         .Print("progress_msg", job.GetProgressMsg())
00594         .Print("subm_addr", FormatHostName(job.GetSubmAddr(), &qdesc))
00595         .Print("subm_port", NStr::UIntToString(job.GetSubmPort()))
00596         .Print("subm_timeout", NStr::UIntToString(job.GetSubmTimeout()));
00597     ITERATE(TNSTagList, it, job.GetTags()) {
00598         extra.Print(string("tag_")+it->first, it->second);
00599     }
00600 }
00601 
00602 
00603 unsigned CQueue::Submit(CJob& job)
00604 {
00605     unsigned max_input_size;
00606     unsigned log_job_state;
00607     {{
00608         CQueueParamAccessor qp(*this);
00609         log_job_state = qp.GetLogJobState();
00610         max_input_size = qp.GetMaxInputSize();
00611     }}
00612     if (job.GetInput().size() > max_input_size)
00613         NCBI_THROW(CNetScheduleException, eDataTooLong,
00614         "Input is too long");
00615 
00616     bool was_empty = !m_StatusTracker.AnyPending();
00617 
00618     unsigned job_id = GetNextId();
00619     job.SetId(job_id);
00620     job.SetTimeSubmit(time(0));
00621 
00622     // Special treatment for system job masks
00623     unsigned mask = job.GetMask();
00624     if (mask & CNetScheduleAPI::eOutOfOrder)
00625     {
00626         // NOT IMPLEMENTED YET: put job id into OutOfOrder list.
00627         // The idea is that there can be an urgent job, which
00628         // should be executed before jobs which were submitted
00629         // earlier, e.g. for some administrative purposes. See
00630         // CNetScheduleAPI::EJobMask in file netschedule_api.hpp
00631     }
00632 
00633     CNS_Transaction trans(this);
00634     unsigned affinity_id;
00635     {{
00636         CAffinityDictGuard aff_dict_guard(GetAffinityDict(), trans);
00637         job.CheckAffinityToken(aff_dict_guard);
00638         affinity_id = job.GetAffinityId();
00639         CQueueGuard guard(this, &trans);
00640 
00641         job.Flush(this);
00642         // TODO: move event count to Flush
00643         CountEvent(CQueue::eStatDBWriteEvent, 1);
00644         //            need_aux_insert ? 2 : 1);
00645 
00646         // update affinity index
00647         if (affinity_id)
00648             AddJobsToAffinity(trans, affinity_id, job_id);
00649 
00650         // update tags
00651         CNSTagMap tag_map;
00652         AppendTags(tag_map, job.GetTags(), job_id);
00653         FlushTags(tag_map, trans);
00654     }}
00655 
00656     trans.Commit();
00657     m_StatusTracker.SetStatus(job_id, CNetScheduleAPI::ePending);
00658     if (log_job_state >= 1) {
00659         CRequestContext rq;
00660         rq.SetRequestID(CRequestContext::GetNextRequestID());
00661 
00662         CDiagContext::SetRequestContext(&rq);
00663 
00664         SQueueDescription qdesc;
00665         s_LogSubmit(*this, job, qdesc);
00666         CDiagContext::SetRequestContext(0);
00667     }
00668     if (was_empty) NotifyListeners(true, affinity_id);
00669 
00670     return job_id;
00671 }
00672 
00673 
00674 unsigned CQueue::SubmitBatch(vector<CJob>& batch)
00675 {
00676     unsigned max_input_size;
00677     unsigned log_job_state;
00678     {{
00679         CQueueParamAccessor qp(*this);
00680         log_job_state = qp.GetLogJobState();
00681         max_input_size = qp.GetMaxInputSize();
00682     }}
00683 
00684     SQueueDescription qdesc;
00685 
00686     if (log_job_state >= 1) {
00687         GetDiagContext().PrintRequestStart()
00688             .Print("action", "submit_batch")
00689             .Print("size", NStr::UIntToString(batch.size()));
00690     }
00691 
00692     bool was_empty = !m_StatusTracker.AnyPending();
00693 
00694     unsigned job_id = GetNextIdBatch(batch.size());
00695 
00696     unsigned batch_aff_id = 0; // if batch comes with the same affinity
00697     bool     batch_has_aff = false;
00698 
00699     // process affinity ids
00700     {{
00701         CNS_Transaction trans(this);
00702         CAffinityDictGuard aff_dict_guard(GetAffinityDict(), trans);
00703         for (unsigned i = 0; i < batch.size(); ++i) {
00704             CJob& job = batch[i];
00705             if (job.GetAffinityId() == (unsigned)kMax_I4) { // take prev. token
00706                 _ASSERT(i > 0);
00707                 unsigned prev_aff_id = 0;
00708                 if (i > 0) {
00709                     prev_aff_id = batch[i-1].GetAffinityId();
00710                     if (!prev_aff_id) {
00711                         prev_aff_id = 0;
00712                         LOG_POST(Warning << "Reference to empty previous "
00713                             "affinity token");
00714                     }
00715                 } else {
00716                     LOG_POST(Warning << "First job in batch cannot have "
00717                         "reference to previous affinity token");
00718                 }
00719                 job.SetAffinityId(prev_aff_id);
00720             } else {
00721                 if (job.HasAffinityToken()) {
00722                     job.CheckAffinityToken(aff_dict_guard);
00723                     batch_has_aff = true;
00724                     batch_aff_id = (i == 0 )? job.GetAffinityId() : 0;
00725                 } else {
00726                     batch_aff_id = 0;
00727                 }
00728 
00729             }
00730         }
00731         trans.Commit();
00732     }}
00733 
00734     CNS_Transaction trans(this);
00735     {{
00736         CNSTagMap tag_map;
00737         CQueueGuard guard(this, &trans);
00738 
00739         unsigned job_id_cnt = job_id;
00740         time_t now = time(0);
00741         int aux_inserts = 0;
00742         NON_CONST_ITERATE(vector<CJob>, it, batch) {
00743             if (it->GetInput().size() > max_input_size)
00744                 NCBI_THROW(CNetScheduleException, eDataTooLong,
00745                 "Input is too long");
00746             unsigned cur_job_id = job_id_cnt++;
00747             it->SetId(cur_job_id);
00748             it->SetTimeSubmit(now);
00749             it->Flush(this);
00750 
00751             AppendTags(tag_map, it->GetTags(), cur_job_id);
00752             if (log_job_state >= 2) {
00753                 s_LogSubmit(*this, *it, qdesc);
00754             }
00755         }
00756         CountEvent(CQueue::eStatDBWriteEvent,
00757             batch.size() + aux_inserts);
00758 
00759         // Store the affinity index
00760         if (batch_has_aff) {
00761             if (batch_aff_id) {  // whole batch comes with the same affinity
00762                 AddJobsToAffinity(trans,
00763                     batch_aff_id,
00764                     job_id,
00765                     job_id + batch.size() - 1);
00766             } else {
00767                 AddJobsToAffinity(trans, batch);
00768             }
00769         }
00770         FlushTags(tag_map, trans);
00771     }}
00772     trans.Commit();
00773     m_StatusTracker.AddPendingBatch(job_id, job_id + batch.size() - 1);
00774     if (log_job_state >= 1)
00775         GetDiagContext().PrintRequestStop();
00776 
00777     // This case is a bit complicated. If whole batch has the same
00778     // affinity, we include it in notification broadcast.
00779     // If not, or it has no affinity at all - 0.
00780     if (was_empty) NotifyListeners(true, batch_aff_id);
00781 
00782     return job_id;
00783 }
00784 
/// Upper bound on how many times a transaction is repeated after a
/// Berkeley DB deadlock (DB_LOCK_DEADLOCK) before the error is given up on.
static const unsigned k_max_dead_locks = 100;
00786 
00787 void CQueue::PutResultGetJob(
00788     CWorkerNode* worker_node,
00789     // PutResult parameters
00790     unsigned         done_job_id,
00791     int              ret_code,
00792     const string*    output,
00793     // GetJob parameters
00794     // in
00795     CRequestContextFactory* rec_ctx_f,
00796     const list<string>* aff_list,
00797     // out
00798     CJob*            new_job)
00799 {
00800     // PutResult parameter check
00801     _ASSERT(!done_job_id || output);
00802     _ASSERT(!new_job || (rec_ctx_f && aff_list));
00803 
00804     bool delete_done;
00805     unsigned max_output_size;
00806     unsigned run_timeout;
00807     {{
00808         CQueueParamAccessor qp(*this);
00809         delete_done = qp.GetDeleteDone();
00810         max_output_size = qp.GetMaxOutputSize();
00811         run_timeout = qp.GetRunTimeout();
00812     }}
00813 
00814     if (done_job_id && output->size() > max_output_size) {
00815         NCBI_THROW(CNetScheduleException, eDataTooLong,
00816             "Output is too long");
00817     }
00818 
00819     unsigned dead_locks = 0; // dead lock counter
00820 
00821     time_t curr = time(0);
00822 
00823     //
00824     bool need_update = false;
00825     CQueueJSGuard js_guard(this, done_job_id,
00826         CNetScheduleAPI::eDone,
00827         &need_update);
00828 
00829     // FIXME: This code detects a case when a node reports results for the job
00830     // which already timeouted for the node and is being executed by another
00831     // node. This attempt succeeds, but the second legitimate result leads to
00832     // this error. It is not correct (but may be useful) to accept results from
00833     // the first node and we should better handle this case.
00834     // I commented out the code to reduce Error log output, but it needs to be
00835     // FIXED for real.
00836     // if (done_job_id  &&  ! need_update) {
00837     //    ERR_POST("Attempt to PUT already Done job "
00838     //             << q->DecorateJobId(done_job_id));
00839     //}
00840 
00841     // This is a HACK - if js_guard is not committed, it will rollback
00842     // to previous state, so it is safe to change status after the guard.
00843     //    if (delete_done) {
00844     //        q->Erase(done_job_id);
00845     //    }
00846     // TODO: implement transaction wrapper (a la js_guard above)
00847     // for FindPendingJob
00848     // TODO: move affinity assignment there as well
00849     unsigned pending_job_id = 0;
00850     if (new_job)
00851         pending_job_id =
00852             FindPendingJob(worker_node, *aff_list, curr);
00853     bool done_rec_updated = false;
00854     CJob job;
00855 
00856     // When working with the same database file concurrently there is
00857     // chance of internal Berkeley DB deadlock. (They say it's legal)
00858     // In this case Berkeley DB returns an error code(DB_LOCK_DEADLOCK)
00859     // and recovery is up to the application.
00860     // If it happens I repeat the transaction several times.
00861     //
00862     for (;;) {
00863         try {
00864             CNS_Transaction trans(this);
00865 
00866             EGetJobUpdateStatus upd_status = eGetJobUpdate_Ok;
00867             {{
00868                 CQueueGuard guard(this, &trans);
00869 
00870                 if (need_update) {
00871                     done_rec_updated = x_UpdateDB_PutResultNoLock(
00872                         done_job_id, curr, delete_done,
00873                         ret_code, *output, job);
00874                 }
00875 
00876                 if (pending_job_id) {
00877                     // NB Synchronized access to worker_node is required inside,
00878                     // and it's under queue lock already. Take out this access
00879                     // it is host, port, node_id read.
00880                     upd_status =
00881                         x_UpdateDB_GetJobNoLock(worker_node,
00882                         curr, pending_job_id,
00883                         *new_job);
00884                 }
00885             }}
00886 
00887             trans.Commit();
00888             js_guard.Commit();
00889             // TODO: commit FindPendingJob guard here
00890             switch (upd_status) {
00891             case eGetJobUpdate_JobFailed:
00892                 m_StatusTracker.ChangeStatus(pending_job_id,
00893                     CNetScheduleAPI::eFailed);
00894                 /* FALLTHROUGH */
00895             case eGetJobUpdate_JobStopped:
00896             case eGetJobUpdate_NotFound:
00897                 pending_job_id = 0;
00898                 break;
00899             case eGetJobUpdate_Ok:
00900                 break;
00901             default:
00902                 _ASSERT(0);
00903             }
00904 
00905             // NB BOTH Remove and Add lock worker node list
00906             if (done_rec_updated)
00907                 RemoveJobFromWorkerNode(job, eNSCDone);
00908             if (pending_job_id)
00909                 AddJobToWorkerNode(worker_node, rec_ctx_f,
00910                 *new_job, curr + run_timeout);
00911             break;
00912         }
00913         catch (CBDB_ErrnoException& ex) {
00914             if (ex.IsDeadLock()) {
00915                 if (++dead_locks < k_max_dead_locks) {
00916                     if (IsMonitoring()) {
00917                         MonitorPost(
00918                             "DeadLock repeat in CQueue::JobExchange");
00919                     }
00920                     SleepMilliSec(250);
00921                     continue;
00922                 }
00923             } else if (ex.IsNoMem()) {
00924                 if (++dead_locks < k_max_dead_locks) {
00925                     if (IsMonitoring()) {
00926                         MonitorPost(
00927                             "No resource repeat in CQueue::JobExchange");
00928                     }
00929                     SleepMilliSec(250);
00930                     continue;
00931                 }
00932             } else {
00933                 throw;
00934             }
00935             ERR_POST("Too many transaction repeats in CQueue::JobExchange.");
00936             throw;
00937         }
00938     }
00939 
00940     unsigned job_aff_id;
00941     if (new_job && (job_aff_id = new_job->GetAffinityId())) {
00942         CStopWatch sw(CStopWatch::eStart);
00943         time_t exp_time = run_timeout ? curr + 2*run_timeout : 0;
00944         AddAffinity(worker_node, job_aff_id, exp_time);
00945 //        LOG_POST(Warning << "Added affinity: " << sw.Elapsed() * 1000 << "ms");
00946     }
00947 
00948     TimeLineExchange(done_job_id, pending_job_id, curr + run_timeout);
00949 
00950     if (done_rec_updated  &&  job.ShouldNotify(curr)) {
00951         Notify(job.GetSubmAddr(), job.GetSubmPort(), done_job_id);
00952     }
00953 
00954     if (IsMonitoring()) {
00955         CTime tmp_t(CTime::eCurrent);
00956         string msg = tmp_t.AsString();
00957 
00958         msg += " CQueue::PutResultGetJob()";
00959         if (done_job_id) {
00960             msg += " (PUT) job id=";
00961             msg += NStr::IntToString(done_job_id);
00962             msg += " ret_code=";
00963             msg += NStr::IntToString(ret_code);
00964             msg += " output=\"";
00965             msg += *output + '"';
00966         }
00967         if (pending_job_id) {
00968             msg += " (GET) job id=";
00969             msg += NStr::IntToString(pending_job_id);
00970             msg += " worker_node=";
00971             // NB synchronized access?
00972             msg += worker_node->GetId();
00973         }
00974         MonitorPost(msg);
00975     }
00976     // Final touch, if we requested a job with specific affinity, and there was
00977     // no such job found, report it as an exception with affinity preference.
00978     if (!pending_job_id && aff_list && aff_list->size()) {
00979         NCBI_THROW(CNetScheduleException, eNoJobsWithAffinity,
00980             GetAffinityList());
00981     }
00982 }
00983 
00984 
00985 void CQueue::JobDelayExpiration(CWorkerNode*     worker_node,
00986                                 unsigned         job_id,
00987                                 time_t           tm)
00988 {
00989     unsigned queue_run_timeout = GetRunTimeout();
00990 
00991     if (tm <= 0) return;
00992 
00993     time_t run_timeout = 0;
00994     time_t time_start = 0;
00995 
00996     TJobStatus st = GetJobStatus(job_id);
00997     if (st != CNetScheduleAPI::eRunning)
00998         return;
00999 
01000     CJob job;
01001     CNS_Transaction trans(this);
01002 
01003     time_t exp_time = 0;
01004     time_t curr = time(0);
01005 
01006     bool job_updated = false;
01007     {{
01008         CQueueGuard guard(this, &trans);
01009 
01010         CJob::EJobFetchResult res;
01011         if ((res = job.Fetch(this, job_id)) != CJob::eJF_Ok)
01012             return;
01013 
01014         CJobRun* run = job.GetLastRun();
01015         if (!run) {
01016             ERR_POST(Error << "No JobRun for running job "
01017                            << DecorateJobId(job_id));
01018             // Fix it
01019             run = &job.AppendRun();
01020             job_updated = true;
01021         }
01022 
01023         time_start = run->GetTimeStart();
01024         if (time_start == 0) {
01025             // Impossible
01026             ERR_POST(Error
01027                 << "Internal error: time_start == 0 for running job id="
01028                 << DecorateJobId(job_id));
01029             // Fix it just in case
01030             time_start = curr;
01031             run->SetTimeStart(curr);
01032             job_updated = true;
01033         }
01034         run_timeout = job.GetRunTimeout();
01035         if (run_timeout == 0) run_timeout = queue_run_timeout;
01036 
01037         if (time_start + run_timeout > curr + tm) {
01038             // Old timeout is enough to cover this request, keep it.
01039             // If we already changed job object (fixing it), we flush it.
01040             if (job_updated) job.Flush(this);
01041             return;
01042         }
01043 
01044         job.SetRunTimeout(curr + tm - time_start);
01045         job.Flush(this);
01046     }}
01047 
01048     trans.Commit();
01049     UpdateWorkerNodeJob(job_id, curr + tm);
01050 
01051     exp_time = run_timeout == 0 ? 0 : time_start + run_timeout;
01052 
01053     TimeLineMove(job_id, exp_time, curr + tm);
01054 
01055     if (IsMonitoring()) {
01056         CTime tmp_t(CTime::eCurrent);
01057         string msg = tmp_t.AsString();
01058         msg += " CQueue::JobDelayExpiration: Job id=";
01059         msg += NStr::IntToString(job_id);
01060         tmp_t.SetTimeT(curr + tm);
01061         tmp_t.ToLocalTime();
01062         msg += " new_expiration_time=";
01063         msg += tmp_t.AsString();
01064         msg += " job_timeout(sec)=";
01065         msg += NStr::Int8ToString(run_timeout);
01066         msg += " job_timeout(minutes)=";
01067         msg += NStr::Int8ToString(run_timeout/60);
01068 
01069         MonitorPost(msg);
01070     }
01071     return;
01072 }
01073 
01074 
01075 bool CQueue::PutProgressMessage(unsigned      job_id,
01076                                 const string& msg)
01077 {
01078     CJob job;
01079     CNS_Transaction trans(this);
01080     {{
01081         CQueueGuard guard(this, &trans);
01082 
01083         CJob::EJobFetchResult res = job.Fetch(this, job_id);
01084         if (res != CJob::eJF_Ok) return false;
01085         job.SetProgressMsg(msg);
01086         job.Flush(this);
01087     }}
01088     trans.Commit();
01089 
01090     if (IsMonitoring()) {
01091         CTime tmp_t(CTime::eCurrent);
01092         string mmsg = tmp_t.AsString();
01093         mmsg += " CQueue::PutProgressMessage() job id=";
01094         mmsg += NStr::IntToString(job_id);
01095         mmsg += " msg=";
01096         mmsg += msg;
01097 
01098         MonitorPost(mmsg);
01099     }
01100     return true;
01101 }
01102 
01103 
01104 void CQueue::ReturnJob(unsigned job_id)
01105 {
01106     // FIXME: Provide fallback to
01107     // RegisterWorkerNodeVisit if unsuccessful
01108     if (!job_id) return;
01109 
01110     CQueueJSGuard js_guard(this, job_id, CNetScheduleAPI::ePending);
01111     TJobStatus st = js_guard.GetOldStatus();
01112 
01113     if (st != CNetScheduleAPI::eRunning)
01114         return;
01115 
01116     CJob job;
01117     CNS_Transaction trans(this);
01118     {{
01119         CQueueGuard guard(this, &trans);
01120 
01121         CJob::EJobFetchResult res = job.Fetch(this, job_id);
01122         if (res != CJob::eJF_Ok)
01123             return;
01124 
01125         job.SetStatus(CNetScheduleAPI::ePending);
01126         unsigned run_count = job.GetRunCount();
01127         CJobRun* run = job.GetLastRun();
01128         if (!run) {
01129             ERR_POST(Error << "No JobRun for running job "
01130                            << DecorateJobId(job_id));
01131             run = &job.AppendRun();
01132         }
01133         // This is the only legitimate place where Returned status appears
01134         // as a signal that the job was actually returned
01135         run->SetStatus(CNetScheduleAPI::eReturned);
01136         run->SetTimeDone(time(0));
01137 
01138         if (run_count) {
01139             job.SetRunCount(run_count-1);
01140         }
01141         job.Flush(this);
01142     }}
01143     trans.Commit();
01144     js_guard.Commit();
01145     RemoveJobFromWorkerNode(job, eNSCReturned);
01146     TimeLineRemove(job_id);
01147 
01148     if (IsMonitoring()) {
01149         CTime tmp_t(CTime::eCurrent);
01150         string msg = tmp_t.AsString();
01151         msg += " CQueue::ReturnJob: job id=";
01152         msg += NStr::IntToString(job_id);
01153         MonitorPost(msg);
01154     }
01155 }
01156 
01157 
01158 bool
01159 CQueue::GetJobDescr(unsigned   job_id,
01160                     int*       ret_code,
01161                     string*    input,
01162                     string*    output,
01163                     string*    err_msg,
01164                     string*    progress_msg,
01165                     TJobStatus expected_status)
01166 {
01167     for (unsigned i = 0; i < 3; ++i) {
01168         if (i) {
01169             // failed to read the record (maybe looks like writer is late,
01170             // so we need to retry a bit later)
01171             if (IsMonitoring()) {
01172                 MonitorPost(string("GetJobDescr sleep for job_id ") +
01173                     NStr::IntToString(job_id));
01174             }
01175             SleepMilliSec(300);
01176         }
01177 
01178         CJob job;
01179         CJob::EJobFetchResult res;
01180         {{
01181             CQueueGuard guard(this);
01182             res = job.Fetch(this, job_id);
01183         }}
01184         if (res == CJob::eJF_Ok) {
01185             if (expected_status != CNetScheduleAPI::eJobNotFound) {
01186                 TJobStatus status = job.GetStatus();
01187                 if (status != expected_status) {
01188                     // Retry after sleep
01189                     continue;
01190                 }
01191             }
01192             const CJobRun *last_run = job.GetLastRun();
01193             if (ret_code && last_run)
01194                 *ret_code = last_run->GetRetCode();
01195             if (input)
01196                 *input = job.GetInput();
01197             if (output)
01198                 *output = job.GetOutput();
01199             if (err_msg && last_run)
01200                 *err_msg = last_run->GetErrorMsg();
01201             if (progress_msg)
01202                 *progress_msg = job.GetProgressMsg();
01203 
01204             return true;
01205         } else if (res == CJob::eJF_NotFound) {
01206             return false;
01207         } // else retry, or ?throw exception
01208     }
01209 
01210     return false; // job not found
01211 }
01212 
01213 
01214 void CQueue::Truncate(void)
01215 {
01216     Clear();
01217     // Next call updates 'm_BecameEmpty' timestamp
01218     IsExpired(); // locks CQueue lock
01219 }
01220 
01221 
01222 void CQueue::Cancel(unsigned job_id)
01223 {
01224     CQueueJSGuard js_guard(this, job_id,
01225         CNetScheduleAPI::eCanceled);
01226     TJobStatus st = js_guard.GetOldStatus();
01227     if (st != CNetScheduleAPI::ePending &&
01228         st != CNetScheduleAPI::eRunning) {
01229             return;
01230     }
01231     CJob job;
01232     CNS_Transaction trans(this);
01233     {{
01234         CQueueGuard guard(this, &trans);
01235         CJob::EJobFetchResult res = job.Fetch(this, job_id);
01236         if (res != CJob::eJF_Ok) {
01237             // TODO: Integrity error or job just expired?
01238             return;
01239         }
01240 
01241         TJobStatus status = job.GetStatus();
01242         if (status != st) {
01243             ERR_POST(Error
01244                 << "Status mismatch for job " << DecorateJobId(job_id)
01245                 << " matrix " << st
01246                 << " db " << status);
01247             // TODO: server error exception?
01248             return;
01249         }
01250         CJobRun* run = job.GetLastRun();
01251         if (!run) run = &job.AppendRun();
01252         run->SetStatus(CNetScheduleAPI::eCanceled);
01253         run->SetTimeDone(time(0));
01254         run->SetErrorMsg(string("Job canceled from ")
01255             + CNetScheduleAPI::StatusToString(status)
01256             + " state");
01257         job.SetStatus(CNetScheduleAPI::eCanceled);
01258         job.Flush(this);
01259     }}
01260     trans.Commit();
01261     js_guard.Commit();
01262 
01263     TimeLineRemove(job_id);
01264 }
01265 
01266 
01267 void CQueue::ForceReschedule(unsigned job_id)
01268 {
01269     CJob job;
01270     CNS_Transaction trans(this);
01271     {{
01272         CQueueGuard guard(this, &trans);
01273         CJob::EJobFetchResult res = job.Fetch(this, job_id);
01274 
01275         if (res == CJob::eJF_Ok) {
01276             job.SetStatus(CNetScheduleAPI::ePending);
01277             job.SetRunCount(0);
01278         } else {
01279             // TODO: Integrity error or job just expired?
01280             return;
01281         }
01282     }}
01283     trans.Commit();
01284 
01285     m_StatusTracker.SetStatus(job_id, CNetScheduleAPI::ePending);
01286 }
01287 
01288 
01289 TJobStatus
01290 CQueue::GetJobStatus(unsigned job_id) const
01291 {
01292     return m_StatusTracker.GetStatus(job_id);
01293 }
01294 
01295 
01296 bool
01297 CQueue::CountStatus(CJobStatusTracker::TStatusSummaryMap* status_map,
01298                           const string&                         affinity_token)
01299 {
01300     unsigned aff_id = 0;
01301     TNSBitVector aff_jobs;
01302     if (!affinity_token.empty()) {
01303         aff_id = m_AffinityDict.GetTokenId(affinity_token);
01304         if (!aff_id)
01305             return false;
01306         GetJobsWithAffinity(aff_id, &aff_jobs);
01307     }
01308 
01309     m_StatusTracker.CountStatus(status_map, aff_id!=0 ? &aff_jobs : 0);
01310 
01311     return true;
01312 }
01313 
01314 
01315 void CQueue::SetPort(unsigned short port)
01316 {
01317     if (port) {
01318         m_UdpSocket.SetReuseAddress(eOn);
01319         m_UdpSocket.Bind(port);
01320         m_HasNotificationPort = true;
01321     }
01322 }
01323 
01324 
01325 bool CQueue::IsExpired()
01326 {
01327     time_t empty_lifetime = GetEmptyLifetime();
01328     CQueueGuard guard(this);
01329     if (m_Kind && empty_lifetime > 0) {
01330         unsigned cnt = m_StatusTracker.Count();
01331         if (cnt) {
01332             m_BecameEmpty = -1;
01333         } else {
01334             if (m_BecameEmpty != -1 &&
01335                 m_BecameEmpty + empty_lifetime < time(0))
01336             {
01337                 LOG_POST(Info << "Queue " << m_QueueName << " expired."
01338                     << " Became empty: "
01339                     << CTime(m_BecameEmpty).ToLocalTime().AsString()
01340                     << " Empty lifetime: " << empty_lifetime
01341                     << " sec." );
01342                 return true;
01343             }
01344             if (m_BecameEmpty == -1)
01345                 m_BecameEmpty = time(0);
01346         }
01347     }
01348     return false;
01349 }
01350 
01351 
01352 unsigned int CQueue::GetNextId()
01353 {
01354     if (m_LastId.Get() >= CAtomicCounter::TValue(kMax_I4))
01355         m_LastId.Set(0);
01356     return (unsigned) m_LastId.Add(1);
01357 }
01358 
01359 
01360 unsigned int CQueue::GetNextIdBatch(unsigned count)
01361 {
01362     // Modified wrap-around zero, so it will work in the
01363     // case of batch id - client expect monotonously
01364     // growing range of ids.
01365     if (m_LastId.Get() >= CAtomicCounter::TValue(kMax_I4 - count))
01366         m_LastId.Set(0);
01367     unsigned int id = (unsigned) m_LastId.Add(count);
01368     id = id - count + 1;
01369     return id;
01370 }
01371 
01372 
01373 void CQueue::ReadJobs(unsigned peer_addr,
01374                       unsigned count, unsigned timeout,
01375                       unsigned& read_id, TNSBitVector& jobs)
01376 {
01377     unsigned group_id = 0;
01378     time_t curr = time(0); // TODO: move it to parameter???
01379     time_t exp_time = timeout ? curr + timeout : 0;
01380     // Get jobs from status tracker here
01381     m_StatusTracker.SwitchJobs(count,
01382                               CNetScheduleAPI::eDone, CNetScheduleAPI::eReading,
01383                               jobs);
01384     CNetSchedule_JSGroupGuard gr_guard(m_StatusTracker, CNetScheduleAPI::eDone, jobs);
01385 
01386     // Fix the db objects
01387     CJob job;
01388     CNS_Transaction trans(this);
01389     {{
01390         CQueueGuard guard(this, &trans);
01391         TNSBitVector::enumerator en(jobs.first());
01392         for (; en.valid(); ++en) {
01393             unsigned job_id = *en;
01394             CJob::EJobFetchResult res = job.Fetch(this, job_id);
01395             if (res != CJob::eJF_Ok) {
01396                 // TODO: check for db error here
01397                 ERR_POST(Error << "Can't read job " << DecorateJobId(job_id));
01398                 continue;
01399             }
01400             if (!group_id) {
01401                 if (m_GroupLastId.Get() >= CAtomicCounter::TValue(kMax_I4))
01402                     m_GroupLastId.Set(0);
01403                 group_id = (unsigned) m_GroupLastId.Add(1);
01404             }
01405 
01406             unsigned run_count = job.GetRunCount() + 1;
01407             CJobRun& run = job.AppendRun();
01408             run.SetStatus(CNetScheduleAPI::eReading);
01409             run.SetTimeStart(curr);
01410             run.SetNodeAddr(peer_addr);
01411             job.SetStatus(CNetScheduleAPI::eReading);
01412             job.SetRunTimeout(timeout);
01413             job.SetRunCount(run_count);
01414             job.SetReadGroup(group_id);
01415 
01416             job.Flush(this);
01417             if (exp_time && m_RunTimeLine) {
01418                 // TODO: Optimize locking of rtl lock for every object
01419                 // hoist it out of the loop
01420                 CWriteLockGuard rtl_guard(m_RunTimeLineLock);
01421                 m_RunTimeLine->AddObject(exp_time, job_id);
01422             }
01423         }
01424     }}
01425     trans.Commit();
01426     gr_guard.Commit();
01427 
01428     if (group_id)
01429         x_CreateReadGroup(group_id, jobs);
01430     read_id = group_id;
01431 }
01432 
01433 void CQueue::x_ChangeGroupStatus(unsigned            group_id,
01434                                  const TNSBitVector& jobs,
01435                                  TJobStatus          status)
01436 {
01437     time_t curr = time(0);
01438     {{
01439         CFastMutexGuard guard(m_GroupMapLock);
01440         // Check that the group is valid
01441         TGroupMap::iterator i = m_GroupMap.find(group_id);
01442         if (i == m_GroupMap.end())
01443             NCBI_THROW(CNetScheduleException, eInvalidParameter,
01444                         "No such read group");
01445 
01446         TNSBitVector& bv = (*i).second;
01447 
01448         // Check that all jobs are in read group
01449         TNSBitVector check(jobs);
01450         check -= bv;
01451         if (check.any())
01452             NCBI_THROW(CNetScheduleException, eInvalidParameter,
01453                        "Jobs do not belong to group");
01454 
01455         // Remove jobs from read group, remove group if empty
01456         bv -= jobs;
01457         if (!bv.any())
01458             m_GroupMap.erase(i);
01459     }}
01460 
01461     // TODO: Introduce read group guard here
01462     // CNetSchedule_ReadGroupGuard rg_guard(this, jobs);
01463 
01464     // Switch state
01465     CNetSchedule_JSGroupGuard gr_guard(m_StatusTracker,
01466                                        CNetScheduleAPI::eReading,
01467                                        jobs,
01468                                        status);
01469     // Fix the db objects
01470     CJob job;
01471     CNS_Transaction trans(this);
01472     {{
01473         CQueueGuard guard(this, &trans);
01474         TNSBitVector::enumerator en(jobs.first());
01475         for (; en.valid(); ++en) {
01476             unsigned job_id = *en;
01477             CJob::EJobFetchResult res = job.Fetch(this, job_id);
01478             if (res != CJob::eJF_Ok) {
01479                 // TODO: check for db error here
01480                 ERR_POST(Error << "Can't read job " << DecorateJobId(job_id));
01481                 continue;
01482             }
01483 
01484             CJobRun* run = job.GetLastRun();
01485             if (!run) {
01486                 ERR_POST(Error << "No JobRun for reading job "
01487                                << DecorateJobId(job_id));
01488                 // fix it here as well as we can
01489                 run = &job.AppendRun();
01490                 run->SetTimeStart(curr);
01491             }
01492             run->SetStatus(status);
01493             run->SetTimeDone(curr);
01494             job.SetStatus(status);
01495 
01496             job.Flush(this);
01497             if (m_RunTimeLine) {
01498                 // TODO: Optimize locking of rtl lock for every object
01499                 // hoist it out of the loop
01500                 CWriteLockGuard rtl_guard(m_RunTimeLineLock);
01501                 // TODO: Ineffective, better learn expiration time from
01502                 // object, then remove it with another method
01503                 m_RunTimeLine->RemoveObject(job_id);
01504             }
01505         }
01506     }}
01507     trans.Commit();
01508     gr_guard.Commit();
01509     // rg_guard.Commit();
01510 }
01511 
01512 
01513 void CQueue::ConfirmJobs(unsigned read_id, TNSBitVector& jobs)
01514 {
01515     x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eConfirmed);
01516 }
01517 
01518 
01519 void CQueue::FailReadingJobs(unsigned read_id, TNSBitVector& jobs)
01520 {
01521     x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eReadFailed);
01522 }
01523 
01524 
01525 void CQueue::ReturnReadingJobs(unsigned read_id, TNSBitVector& jobs)
01526 {
01527     x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eDone);
01528 }
01529 
01530 
01531 void CQueue::EraseJob(unsigned job_id)
01532 {
01533     m_StatusTracker.Erase(job_id);
01534 
01535     {{
01536         // Request delayed record delete
01537         CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01538         // TODO: Check that object is not already in these vectors,
01539         // saving us DB flush
01540         m_JobsToDelete.set_bit(job_id);
01541         m_DeletedJobs.set_bit(job_id);
01542         m_AffJobsToDelete.set_bit(job_id);
01543 
01544         // start affinity erase process
01545         m_AffWrapped = false;
01546         m_LastAffId = m_CurrAffId;
01547 
01548         FlushDeletedVectors();
01549     }}
01550     TimeLineRemove(job_id);
01551     if (IsMonitoring()) {
01552         CTime tmp_t(CTime::eCurrent);
01553         string msg = tmp_t.AsString();
01554         msg += " CQueue::EraseJob() job id=";
01555         msg += NStr::IntToString(job_id);
01556         MonitorPost(msg);
01557     }
01558 
01559 }
01560 
01561 
01562 void CQueue::Erase(const TNSBitVector& job_ids)
01563 {
01564     CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01565     m_JobsToDelete    |= job_ids;
01566     m_DeletedJobs     |= job_ids;
01567     m_AffJobsToDelete |= job_ids;
01568 
01569     // start affinity erase process
01570     m_AffWrapped = false;
01571     m_LastAffId = m_CurrAffId;
01572 
01573     FlushDeletedVectors();
01574 }
01575 
01576 
01577 void CQueue::Clear()
01578 {
01579     TNSBitVector bv;
01580 
01581     {{
01582         CWriteLockGuard rtl_guard(m_RunTimeLineLock);
01583         // TODO: interdependency btw m_StatusTracker.lock and m_RunTimeLineLock
01584         m_StatusTracker.ClearAll(&bv);
01585         m_RunTimeLine->ReInit(0);
01586     }}
01587 
01588     Erase(bv);
01589 }
01590 
01591 
01592 void CQueue::FlushDeletedVectors(EVectorId vid)
01593 {
01594     static EVectorId all_ids[] = { eVIJob, eVITag, eVIAffinity };
01595     TNSBitVector all_vects[] =
01596         { m_JobsToDelete, m_DeletedJobs, m_AffJobsToDelete };
01597     for (size_t i = 0; i < sizeof(all_ids) / sizeof(all_ids[0]); ++i) {
01598         if (vid != eVIAll && vid != all_ids[i]) continue;
01599         m_DeletedJobsDB.id = all_ids[i];
01600         TNSBitVector& bv = all_vects[i];
01601         bv.optimize();
01602         m_DeletedJobsDB.WriteVector(bv, SDeletedJobsDB::eNoCompact);
01603     }
01604 }
01605 
01606 
01607 void CQueue::FilterJobs(TNSBitVector& ids)
01608 {
01609     TNSBitVector alive_jobs;
01610     m_StatusTracker.GetAliveJobs(alive_jobs);
01611     ids &= alive_jobs;
01612     //CFastMutexGuard guard(m_JobsToDeleteLock);
01613     //ids -= m_DeletedJobs;
01614 }
01615 
01616 void CQueue::ClearAffinityIdx()
01617 {
01618     const unsigned kDeletedJobsThreshold = 10000;
01619     const unsigned kAffBatchSize = 1000;
01620     // thread-safe copies of progress pointers
01621     unsigned curr_aff_id = 0;
01622     unsigned last_aff_id = 0;
01623     {{
01624         // Ensure that we have some job to do
01625         CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01626         // TODO: calculate (job_to_delete_count * maturity) instead
01627         // of just job count. Provides more safe version - even a
01628         // single deleted job will be eventually cleaned up.
01629         if (m_AffJobsToDelete.count() < kDeletedJobsThreshold)
01630             return;
01631         curr_aff_id = m_CurrAffId;
01632         last_aff_id = m_LastAffId;
01633     }}
01634 
01635     TNSBitVector bv(bm::BM_GAP);
01636 
01637     // mark if we are wrapped and chasing the "tail"
01638     bool wrapped = curr_aff_id < last_aff_id;
01639 
01640     // get batch of affinity tokens in the index
01641     {{
01642         CFastMutexGuard guard(m_AffinityIdxLock);
01643         m_AffinityIdx.SetTransaction(NULL);
01644         CBDB_FileCursor cur(m_AffinityIdx);
01645         cur.SetCondition(CBDB_FileCursor::eGE);
01646         cur.From << curr_aff_id;
01647 
01648         unsigned n = 0;
01649         EBDB_ErrCode ret;
01650         for (; (ret = cur.Fetch()) == eBDB_Ok && n < kAffBatchSize; ++n) {
01651             curr_aff_id = m_AffinityIdx.aff_id;
01652             if (wrapped && curr_aff_id >= last_aff_id) // run over the tail
01653                 break;
01654             bv.set(curr_aff_id);
01655         }
01656         if (ret != eBDB_Ok) {
01657             if (ret != eBDB_NotFound)
01658                 ERR_POST(Error << "Error reading affinity index: " << ret);
01659             if (wrapped) {
01660                 curr_aff_id = last_aff_id;
01661             } else {
01662                 // wrap-around
01663                 curr_aff_id = 0;
01664                 wrapped = true;
01665                 cur.SetCondition(CBDB_FileCursor::eGE);
01666                 cur.From << curr_aff_id;
01667                 for (; n < kAffBatchSize && (ret = cur.Fetch()) == eBDB_Ok; ++n) {
01668                     curr_aff_id = m_AffinityIdx.aff_id;
01669                     if (curr_aff_id >= last_aff_id) // run over the tail
01670                         break;
01671                     bv.set(curr_aff_id);
01672                 }
01673                 if (ret != eBDB_NotFound)
01674                     ERR_POST(Error << "Error reading affinity index");
01675             }
01676         }
01677     }}
01678 
01679     {{
01680         CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01681         m_AffWrapped = wrapped;
01682         m_CurrAffId = curr_aff_id;
01683     }}
01684 
01685     // clear all hanging references
01686     TNSBitVector::enumerator en(bv.first());
01687     for (; en.valid(); ++en) {
01688         unsigned aff_id = *en;
01689         CNS_Transaction trans(this);
01690         CFastMutexGuard guard(m_AffinityIdxLock);
01691         m_AffinityIdx.SetTransaction(&trans);
01692 
01693         TNSBitVector bvect(bm::BM_GAP);
01694         m_AffinityIdx.aff_id = aff_id;
01695         EBDB_ErrCode ret =
01696             m_AffinityIdx.ReadVector(&bvect, bm::set_OR, NULL);
01697         if (ret != eBDB_Ok) {
01698             if (ret != eBDB_NotFound)
01699                 ERR_POST(Error << "Error reading affinity index: " << ret);
01700             continue;
01701         }
01702         unsigned old_count = bvect.count();
01703         bvect -= m_AffJobsToDelete;
01704         unsigned new_count = bvect.count();
01705         if (new_count == old_count) {
01706             continue;
01707         }
01708         m_AffinityIdx.aff_id = aff_id;
01709         if (bvect.any()) {
01710             bvect.optimize();
01711             m_AffinityIdx.WriteVector(bvect, SAffinityIdx::eNoCompact);
01712         } else {
01713             // TODO: if there is no record in m_AffinityMap,
01714             // remove record from SAffinityDictDB
01715             // As of now, token stays indefinitely, even if empty.
01716             // NB: Potential DEADLOCK, see NB in FindPendingJob
01717             //{{
01718             //    CFastMutexGuard aff_guard(m_AffinityMapLock);
01719             //    if (!m_AffinityMap.CheckAffinity(aff_id);
01720             //        m_AffinityDict.RemoveToken(aff_id, trans);
01721             //}}
01722             m_AffinityIdx.Delete();
01723         }
01724         trans.Commit();
01725 //        cout << aff_id << " cleaned" << endl;
01726     } // for
01727 
01728     {{
01729         CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01730         if (m_AffWrapped && m_CurrAffId >= m_LastAffId) {
01731             m_AffJobsToDelete.clear(true);
01732             FlushDeletedVectors(eVIAffinity);
01733         }
01734     }}
01735 }
01736 
01737 
01738 void CQueue::Notify(unsigned addr, unsigned short port, unsigned job_id)
01739 {
01740     if (!m_HasNotificationPort) return;
01741     char msg[1024];
01742     sprintf(msg, "JNTF %u", job_id);
01743 
01744     CFastMutexGuard guard(m_UdpSocketLock);
01745 
01746     m_UdpSocket.Send(msg, strlen(msg)+1,
01747                     CSocketAPI::ntoa(addr), port);
01748 }
01749 
01750 
01751 void CQueue::OptimizeMem()
01752 {
01753     m_StatusTracker.OptimizeMem();
01754 }
01755 
01756 
01757 void CQueue::AddJobsToAffinity(CBDB_Transaction& trans,
01758                                unsigned aff_id,
01759                                unsigned job_id_from,
01760                                unsigned job_id_to)
01761 
01762 {
01763     CFastMutexGuard guard(m_AffinityIdxLock);
01764     m_AffinityIdx.SetTransaction(&trans);
01765     TNSBitVector bv(bm::BM_GAP);
01766 
01767     // check if vector is in the database
01768 
01769     // read vector from the file
01770     m_AffinityIdx.aff_id = aff_id;
01771     /*EBDB_ErrCode ret = */
01772     m_AffinityIdx.ReadVector(&bv, bm::set_OR, NULL);
01773     if (job_id_to == 0) {
01774         bv.set(job_id_from);
01775     } else {
01776         bv.set_range(job_id_from, job_id_to);
01777     }
01778     m_AffinityIdx.aff_id = aff_id;
01779     m_AffinityIdx.WriteVector(bv, SAffinityIdx::eNoCompact);
01780 }
01781 
01782 
01783 void CQueue::AddJobsToAffinity(CBDB_Transaction& trans,
01784                                const vector<CJob>& batch)
01785 {
01786     CFastMutexGuard guard(m_AffinityIdxLock);
01787     m_AffinityIdx.SetTransaction(&trans);
01788     // TODO: Is not it easier to have map with auto_ptrs?
01789     // In this case we don't need this clumsy map freeing.
01790     typedef map<unsigned, TNSBitVector*> TBVMap;
01791 
01792     TBVMap  bv_map;
01793     try {
01794         unsigned bsize = batch.size();
01795         for (unsigned i = 0; i < bsize; ++i) {
01796             const CJob& job = batch[i];
01797             unsigned aff_id = job.GetAffinityId();
01798             unsigned job_id_start = job.GetId();
01799 
01800             TNSBitVector* aff_bv;
01801 
01802             TBVMap::iterator aff_it = bv_map.find(aff_id);
01803             if (aff_it == bv_map.end()) { // new element
01804                 auto_ptr<TNSBitVector> bv(new TNSBitVector(bm::BM_GAP));
01805                 m_AffinityIdx.aff_id = aff_id;
01806                 /*EBDB_ErrCode ret = */
01807                 m_AffinityIdx.ReadVector(bv.get(), bm::set_OR, NULL);
01808                 aff_bv = bv.get();
01809                 bv_map[aff_id] = bv.release();
01810             } else {
01811                 aff_bv = aff_it->second;
01812             }
01813 
01814 
01815             // look ahead for the same affinity id
01816             unsigned j;
01817             for (j=i+1; j < bsize; ++j) {
01818                 if (batch[j].GetAffinityId() != aff_id) {
01819                     break;
01820                 }
01821                 _ASSERT(batch[j].GetId() == (batch[j-1].GetId()+1));
01822                 //job_id_end = batch[j].GetId();
01823             }
01824             --j;
01825 
01826             if ((i!=j) && (aff_id == batch[j].GetAffinityId())) {
01827                 unsigned job_id_end = batch[j].GetId();
01828                 aff_bv->set_range(job_id_start, job_id_end);
01829                 i = j;
01830             } else { // look ahead failed
01831                 aff_bv->set(job_id_start);
01832             }
01833 
01834         } // for
01835 
01836         // save all changes to the database
01837         NON_CONST_ITERATE(TBVMap, it, bv_map) {
01838             unsigned aff_id = it->first;
01839             TNSBitVector* bv = it->second;
01840             bv->optimize();
01841 
01842             m_AffinityIdx.aff_id = aff_id;
01843             m_AffinityIdx.WriteVector(*bv, SAffinityIdx::eNoCompact);
01844 
01845             delete it->second; it->second = 0;
01846         }
01847     }
01848     catch (exception& )
01849     {
01850         NON_CONST_ITERATE(TBVMap, it, bv_map) {
01851             delete it->second; it->second = 0;
01852         }
01853         throw;
01854     }
01855 
01856 }
01857 
01858 
01859 void
01860 CQueue::x_ReadAffIdx_NoLock(unsigned      aff_id,
01861                             TNSBitVector* jobs)
01862 {
01863     m_AffinityIdx.aff_id = aff_id;
01864     m_AffinityIdx.ReadVector(jobs, bm::set_OR);
01865 }
01866 
01867 
01868 void
01869 CQueue::GetJobsWithAffinities(const TNSBitVector& aff_id_set,
01870                               TNSBitVector*       jobs)
01871 {
01872     CFastMutexGuard guard(m_AffinityIdxLock);
01873     m_AffinityIdx.SetTransaction(NULL);
01874     TNSBitVector::enumerator en(aff_id_set.first());
01875     for (; en.valid(); ++en) {  // for each affinity id
01876         unsigned aff_id = *en;
01877         x_ReadAffIdx_NoLock(aff_id, jobs);
01878     }
01879 }
01880 
01881 
01882 void
01883 CQueue::GetJobsWithAffinity(unsigned      aff_id,
01884                             TNSBitVector* jobs)
01885 {
01886     CFastMutexGuard guard(m_AffinityIdxLock);
01887     m_AffinityIdx.SetTransaction(NULL);
01888     x_ReadAffIdx_NoLock(aff_id, jobs);
01889 }
01890 
01891 
01892 class CAffinityResolver : public IAffinityResolver
01893 {
01894 public:
01895     CAffinityResolver(CQueue& queue) : m_Queue(queue) {}
01896     virtual void GetJobsWithAffinities(const TNSBitVector& aff_id_set,
01897                                        TNSBitVector*       jobs)
01898     {
01899         m_Queue.GetJobsWithAffinities(aff_id_set, jobs);
01900     }
01901 private:
01902     CQueue& m_Queue;
01903 };
01904 
01905 
/// Pick a pending job for @a worker_node.
///
/// Search order:
///   1. If @a aff_list is non-empty (client-managed affinity), only jobs with
///      those affinities are considered. 0 is returned at once when none of
///      the tokens is known to the affinity dictionary.
///   2. The node's affinity state (precomputed candidates, blacklist) is
///      consulted via CWorkerNodeAffinityGuard::GetJobWithAffinities().
///   3. Only when no explicit affinity was requested: the first pending job
///      not in the node's blacklisted set is taken.
/// The "vacant affinity" search between steps 2 and 3 is currently
/// disabled (see the `false &&` below).
///
/// @param worker_node  node asking for work (dereferenced unconditionally)
/// @param aff_list     explicitly requested affinity tokens, may be empty
/// @param curr         current time, used for blacklist lookups
/// @return job id, or 0 if no suitable job was found
unsigned
CQueue::FindPendingJob(CWorkerNode* worker_node,
                       const list<string>& aff_list,
                       time_t curr)
{
    unsigned job_id = 0;

    TNSBitVector blacklisted_jobs;
    TNSBitVector aff_ids;
    const TNSBitVector* effective_aff_ids = 0;

    // request specified affinity explicitly - client managed affinity
    bool is_specific_aff = !aff_list.empty();
    m_AffinityDict.GetTokensIds(aff_list, aff_ids);
    if (is_specific_aff && !aff_ids.any())
        // Requested affinities are not known to the server at all
        return 0;

    // affinity: get list of job candidates
    // previous FindPendingJob() call may have precomputed candidate job ids
    {{
        CWorkerNodeAffinityGuard na(*worker_node);

        if (is_specific_aff)
            na.CleanCandidates(aff_ids);

        // If we have a specific affinity request, use it for job search,
        // otherwise use what we have for the node
        // NOTE(review): the 'else' alternative is commented out, so without
        // an explicit request effective_aff_ids stays null. How
        // GetJobWithAffinities() treats a null set is not visible here.
        if (is_specific_aff) effective_aff_ids = &aff_ids;
        // else effective_aff_ids = &na.GetAffinities(curr);

        // 'blacklisted_jobs' are also required in the code, finding new
        // affinity association below
        blacklisted_jobs = na.GetBlacklistedJobs(curr);
        CAffinityResolver affinity_resolver(*this);
        job_id = na.GetJobWithAffinities(effective_aff_ids,
                                         m_StatusTracker,
                                         affinity_resolver);
        // unsigned(-1) looks like a "stop searching" sentinel from
        // GetJobWithAffinities (TODO confirm). With an explicit affinity
        // request there is no fallback to non-affinity jobs.
        if (job_id == unsigned(-1) || (!job_id && is_specific_aff)) return 0;
    }}

    // No affinity association or there are no more jobs with
    // established affinity

    // try to find a vacant(not taken by any other worker node) affinity id
    if (false && !job_id) { // DEBUG <- this is the most expensive affinity op, disable it for now
        TNSBitVector assigned_aff;
        GetAllAssignedAffinities(curr, assigned_aff);

        if (assigned_aff.any()) {
            // get all jobs belonging to other (already assigned) affinities,
            // ORing them with our own blacklisted jobs
            TNSBitVector assigned_candidate_jobs(blacklisted_jobs);
            // GetJobsWithAffinity actually ORs into second argument
            GetJobsWithAffinities(assigned_aff, &assigned_candidate_jobs);
            // we got list of jobs we do NOT want to schedule, use them as
            // blacklisted to get a job with possibly with unassigned affinity
            // (or without affinity at all).
            bool pending_jobs_avail =
                m_StatusTracker.GetPendingJob(assigned_candidate_jobs,
                                             &job_id);
            if (!job_id && !pending_jobs_avail)
                return 0;
        }
    }

    // We just take the first available job in the queue, taking into account
    // blacklisted jobs as usual.
    if (!job_id)
        m_StatusTracker.GetPendingJob(blacklisted_jobs, &job_id);

    return job_id;
}
01979 
01980 
01981 struct SAffinityJobs
01982 {
01983     string aff_token;
01984     unsigned job_count;
01985     list<TWorkerNodeRef> nodes;
01986 };
01987 typedef map<unsigned, SAffinityJobs> TAffinities;
01988 
01989 string CQueue::GetAffinityList()
01990 {
01991     string aff_list;
01992     unsigned aff_id;
01993     unsigned bv_cnt;
01994     TAffinities affinities;
01995     time_t now = time(0);
01996     {{
01997     CFastMutexGuard guard(m_AffinityIdxLock);
01998     m_AffinityIdx.SetTransaction(NULL);
01999     CBDB_FileCursor cur(m_AffinityIdx);
02000     cur.SetCondition(CBDB_FileCursor::eGE);
02001     cur.From << 0;
02002 
02003     EBDB_ErrCode ret;
02004     // for every affinity
02005     for (; (ret = cur.Fetch()) == eBDB_Ok;) {
02006         aff_id = m_AffinityIdx.aff_id;
02007         TNSBitVector bvect(bm::BM_GAP);
02008         ret = m_AffinityIdx.ReadVector(&bvect, bm::set_OR);
02009         m_StatusTracker.PendingIntersect(&bvect);
02010         // how many pending tasks with this affinity
02011         bv_cnt = bvect.count();
02012         if (ret != eBDB_Ok) {
02013             if (ret != eBDB_NotFound)
02014                 ERR_POST(Error << "Error reading affinity index: " << ret);
02015             continue;
02016         }
02017         SAffinityJobs aff_jobs;
02018 
02019         aff_jobs.aff_token = m_AffinityDict.GetAffToken(aff_id);
02020         aff_jobs.job_count = bv_cnt;
02021         affinities[aff_id] = aff_jobs;
02022         // what are the nodes executing this affinity
02023     }
02024     }}
02025 
02026     {{
02027         CQueueWorkerNodeListGuard wnl(m_WorkerNodeList);
02028         list<TWorkerNodeRef> nodes;
02029         wnl.GetNodes(now, nodes);
02030         NON_CONST_ITERATE(list<TWorkerNodeRef>, node_it, nodes) {
02031             TNSBitVector::enumerator
02032                 en(wnl.GetNodeAffinities(now, *node_it).first());
02033             for (; en.valid(); ++en) {
02034                 unsigned aff_id = *en;
02035                 affinities[aff_id].nodes.push_back(*node_it);
02036             }
02037         }
02038     }}
02039 
02040     ITERATE(TAffinities, it, affinities) {
02041         if (aff_list.size()) aff_list += ", ";
02042         aff_list += it->second.aff_token;
02043         aff_list += ' ';
02044         aff_list += NStr::UIntToString(it->second.job_count);
02045         ITERATE(list<TWorkerNodeRef>, node_it, it->second.nodes) {
02046             aff_list += ' ';
02047             aff_list += (*node_it)->GetId();
02048         }
02049     }
02050     return aff_list;
02051 }
02052 
02053 
/// Record a failure reported for @a job_id by @a worker_node.
///
/// If the job's run count does not exceed the queue's "failed retries"
/// setting, the job is blacklisted for this node and returned to Pending
/// (rescheduled). Otherwise it becomes Failed. In both cases the last run
/// record is marked Failed with @a err_msg, @a ret_code and the current time,
/// and @a output is stored in the job.
///
/// @param worker_node  node that reported the failure
/// @param job_id       job to fail
/// @param err_msg      error message stored in the run record
/// @param output       job output to store; must not exceed the queue's
///                     maximum output size
/// @param ret_code     return code stored in the run record
/// @return false if the job could not be fetched (neither the transaction
///         nor the status guard is committed in that case), true otherwise
/// @throws CNetScheduleException (eDataTooLong) if @a output is too long
bool CQueue::FailJob(CWorkerNode*  worker_node,
                     unsigned      job_id,
                     const string& err_msg,
                     const string& output,
                     int           ret_code)
{
    unsigned failed_retries;
    unsigned max_output_size;
    time_t   blacklist_time;
    // Snapshot the queue parameters needed below
    {{
        CQueueParamAccessor qp(*this);
        failed_retries  = qp.GetFailedRetries();
        max_output_size = qp.GetMaxOutputSize();
        blacklist_time  = qp.GetBlacklistTime();
    }}

    if (output.size() > max_output_size) {
        NCBI_THROW(CNetScheduleException, eDataTooLong,
           "Output is too long");
    }
    // We first change memory state to "Failed", it is safer because
    // there is only danger to find job in inconsistent state, and because
    // Failed is terminal, usually you can not allocate job or do anything
    // disturbing from this state.
    CQueueJSGuard js_guard(this, job_id,
                           CNetScheduleAPI::eFailed);

    CJob job;
    CNS_Transaction trans(this);

    time_t curr = time(0);

    bool rescheduled = false;
    {{
        CQueueGuard guard(this, &trans);

        CJob::EJobFetchResult res = job.Fetch(this, job_id);
        if (res != CJob::eJF_Ok) {
            // TODO: Integrity error or job just expired?
            LOG_POST(Error << "Can not fetch job " << DecorateJobId(job_id));
            return false;
        }

        unsigned run_count = job.GetRunCount();
        if (run_count <= failed_retries) {
            // Retries left: blacklist the job for this node and put it back
            // to Pending. exp_time is 0 when no blacklist time is configured
            // (presumably "no expiration" -- confirm in BlacklistJob).
            time_t exp_time = blacklist_time ? curr + blacklist_time : 0;
            BlacklistJob(worker_node, job_id, exp_time);
            job.SetStatus(CNetScheduleAPI::ePending);
            js_guard.SetStatus(CNetScheduleAPI::ePending);
            rescheduled = true;
            LOG_POST(Warning << "Job " << DecorateJobId(job_id)
                             << " rescheduled with "
                             << (failed_retries - run_count)
                             << " retries left");
        } else {
            job.SetStatus(CNetScheduleAPI::eFailed);
            rescheduled = false;
            LOG_POST(Warning << "Job " << DecorateJobId(job_id)
                             << " failed");
        }

        CJobRun* run = job.GetLastRun();
        if (!run) {
            // Unexpected for a running job; create a run record so the
            // failure details below are still recorded
            ERR_POST(Error << "No JobRun for running job "
                           << DecorateJobId(job_id));
            run = &job.AppendRun();
        }

        run->SetStatus(CNetScheduleAPI::eFailed);
        run->SetTimeDone(curr);
        run->SetErrorMsg(err_msg);
        run->SetRetCode(ret_code);
        job.SetOutput(output);

        job.Flush(this);
    }}
    RemoveJobFromWorkerNode(job, eNSCFailed);


    trans.Commit();
    js_guard.Commit();

    // The job is no longer running: drop it from the run time line
    if (m_RunTimeLine) {
        CWriteLockGuard guard(m_RunTimeLineLock);
        m_RunTimeLine->RemoveObject(job_id);
    }

    // The submitter is notified only about a final failure, not a reschedule
    if (!rescheduled  &&  job.ShouldNotify(curr)) {
        Notify(job.GetSubmAddr(), job.GetSubmPort(), job_id);
    }

    if (IsMonitoring()) {
        CTime tmp_t(CTime::eCurrent);
        string msg = tmp_t.AsString();
        msg += " CQueue::JobFailed() job id=";
        msg += NStr::IntToString(job_id);
        msg += " err_msg=";
        msg += err_msg;
        msg += " output=";
        msg += output;
        if (rescheduled)
            msg += " rescheduled";
        MonitorPost(msg);
    }
    return true;
}
02160 
02161 
02162 /// Specified status is OR-ed with the target vector
02163 void CQueue::JobsWithStatus(TJobStatus    status,
02164                             TNSBitVector* bv) const
02165 {
02166     m_StatusTracker.StatusSnapshot(status, bv);
02167 }
02168 
02169 
02170 // Functor for EQ
02171 class CQueryFunctionEQ : public CQueryFunction_BV_Base<TNSBitVector>
02172 {
02173 public:
02174     CQueryFunctionEQ(CQueue& queue) :
02175       m_Queue(queue)
02176       {}
02177       typedef CQueryFunction_BV_Base<TNSBitVector> TParent;
02178       typedef TParent::TBVContainer::TBuffer TBuffer;
02179       typedef TParent::TBVContainer::TBitVector TBitVector;
02180       virtual void Evaluate(CQueryParseTree::TNode& qnode);
02181 private:
02182     void x_CheckArgs(const CQueryFunctionBase::TArgVector& args);
02183     CQueue& m_Queue;
02184 };
02185 
02186 void CQueryFunctionEQ::Evaluate(CQueryParseTree::TNode& qnode)
02187 {
02188     //NcbiCout << "Key: " << key << " Value: " << val << NcbiEndl;
02189     CQueryFunctionBase::TArgVector args;
02190     this->MakeArgVector(qnode, args);
02191     x_CheckArgs(args);
02192     const string& key = args[0]->GetValue().GetStrValue();
02193     const string& val = args[1]->GetValue().GetStrValue();
02194     auto_ptr<TNSBitVector> bv;
02195     auto_ptr<TBuffer> buf;
02196     if (key == "status") {
02197         // special case for status
02198         CNetScheduleAPI::EJobStatus status =
02199             CNetScheduleAPI::StringToStatus(val);
02200         if (status == CNetScheduleAPI::eJobNotFound)
02201             NCBI_THROW(CNetScheduleException,
02202             eQuerySyntaxError, string("Unknown status: ") + val);
02203         bv.reset(new TNSBitVector);
02204         m_Queue.JobsWithStatus(status, bv.get());
02205     } else if (key == "id") {
02206         unsigned job_id = CNetScheduleKey(val).id;
02207         bv.reset(new TNSBitVector);
02208         bv->set(job_id);
02209     } else {
02210         if (val == "*") {
02211             // wildcard
02212             bv.reset(new TNSBitVector);
02213             m_Queue.ReadTags(key, bv.get());
02214         } else {
02215             buf.reset(new TBuffer);
02216             if (!m_Queue.ReadTag(key, val, buf.get())) {
02217                 // Signal empty set by setting empty bitvector
02218                 bv.reset(new TNSBitVector());
02219                 buf.reset(NULL);
02220             }
02221         }
02222     }
02223     if (qnode.GetValue().IsNot()) {
02224         // Apply NOT here
02225         if (bv.get()) {
02226             bv->invert();
02227         } else if (buf.get()) {
02228             bv.reset(new TNSBitVector());
02229             bm::operation_deserializer<TNSBitVector>::deserialize(*bv,
02230                 &((*buf.get())[0]),
02231                 0,
02232                 bm::set_ASSIGN);
02233             bv.get()->invert();
02234         }
02235     }
02236     if (bv.get())
02237         this->MakeContainer(qnode)->SetBV(bv.release());
02238     else if (buf.get())
02239         this->MakeContainer(qnode)->SetBuffer(buf.release());
02240 }
02241 
02242 
02243 void CQueryFunctionEQ::x_CheckArgs(const CQueryFunctionBase::TArgVector& args)
02244 {
02245     if (args.size() != 2 ||
02246         (args[0]->GetValue().GetType() != CQueryParseNode::eIdentifier  &&
02247         args[0]->GetValue().GetType() != CQueryParseNode::eString)) {
02248             NCBI_THROW(CNetScheduleException,
02249                 eQuerySyntaxError, "Wrong arguments for '='");
02250     }
02251 }
02252 
02253 
02254 TNSBitVector* CQueue::ExecSelect(const string& query, list<string>& fields)
02255 {
02256     CQueryParseTree qtree;
02257     try {
02258         qtree.Parse(query.c_str());
02259     } catch (CQueryParseException& ex) {
02260         NCBI_THROW(CNetScheduleException, eQuerySyntaxError, ex.GetMsg());
02261     }
02262     CQueryParseTree::TNode* top = qtree.GetQueryTree();
02263     if (!top)
02264         NCBI_THROW(CNetScheduleException,
02265         eQuerySyntaxError, "Query syntax error in parse");
02266 
02267     if (top->GetValue().GetType() == CQueryParseNode::eSelect) {
02268         // Find where clause here
02269         typedef CQueryParseTree::TNode::TNodeList_I TNodeIterator;
02270         for (TNodeIterator it = top->SubNodeBegin();
02271             it != top->SubNodeEnd(); ++it) {
02272                 CQueryParseTree::TNode* node = *it;
02273                 CQueryParseNode::EType node_type = node->GetValue().GetType();
02274                 if (node_type == CQueryParseNode::eList) {
02275                     for (TNodeIterator it2 = node->SubNodeBegin();
02276                         it2 != node->SubNodeEnd(); ++it2) {
02277                             fields.push_back((*it2)->GetValue().GetStrValue());
02278                     }
02279                 }
02280                 if (node_type == CQueryParseNode::eWhere) {
02281                     TNodeIterator it2 = node->SubNodeBegin();
02282                     if (it2 == node->SubNodeEnd())
02283                         NCBI_THROW(CNetScheduleException,
02284                         eQuerySyntaxError,
02285                         "Query syntax error in WHERE clause");
02286                     top = (*it2);
02287                     break;
02288                 }
02289         }
02290     }
02291 
02292     // Execute 'select' phase.
02293     CQueryExec qexec;
02294     qexec.AddFunc(CQueryParseNode::eAnd,
02295         new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_AND));
02296     qexec.AddFunc(CQueryParseNode::eOr,
02297         new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_OR));
02298     qexec.AddFunc(CQueryParseNode::eSub,
02299         new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_SUB));
02300     qexec.AddFunc(CQueryParseNode::eXor,
02301         new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_XOR));
02302     qexec.AddFunc(CQueryParseNode::eIn,
02303         new CQueryFunction_BV_In_Or<TNSBitVector>());
02304     qexec.AddFunc(CQueryParseNode::eNot,
02305         new CQueryFunction_BV_Not<TNSBitVector>());
02306 
02307     qexec.AddFunc(CQueryParseNode::eEQ,
02308         new CQueryFunctionEQ(*this));
02309 
02310     {{
02311         CFastMutexGuard guard(GetTagLock());
02312         SetTagDbTransaction(NULL);
02313         qexec.Evaluate(qtree, *top);
02314     }}
02315 
02316     IQueryParseUserObject* uo = top->GetValue().GetUserObject();
02317     if (!uo)
02318         NCBI_THROW(CNetScheduleException,
02319         eQuerySyntaxError, "Query syntax error in eval");
02320     typedef CQueryEval_BV_Value<TNSBitVector> BV_UserObject;
02321     BV_UserObject* result =
02322         dynamic_cast<BV_UserObject*>(uo);
02323     _ASSERT(result);
02324     auto_ptr<TNSBitVector> bv(result->ReleaseBV());
02325     if (!bv.get()) {
02326         bv.reset(new TNSBitVector());
02327         BV_UserObject::TBuffer *buf = result->ReleaseBuffer();
02328         if (buf && buf->size()) {
02329             bm::operation_deserializer<TNSBitVector>::deserialize(*bv,
02330                 &((*buf)[0]),
02331                 0,
02332                 bm::set_ASSIGN);
02333         }
02334     }
02335     // Filter against deleted jobs
02336     FilterJobs(*(bv.get()));
02337 
02338     return bv.release();
02339 }
02340 
02341 void CQueue::PrepareFields(SFieldsDescription& field_descr,
02342                            const list<string>& fields)
02343 {
02344     field_descr.Init();
02345 
02346     // Verify the fields, and convert them into field numbers
02347     ITERATE(list<string>, it, fields) {
02348         const string& field_name = NStr::TruncateSpaces(*it);
02349         string tag_name;
02350         SFieldsDescription::EFieldType ftype;
02351         int i;
02352         if ((i = CJob::GetFieldIndex(field_name)) >= 0) {
02353             ftype = SFieldsDescription::eFT_Job;
02354             if (field_name == "affinity")
02355                 field_descr.has_affinity = true;
02356         } else if ((i = CJobRun::GetFieldIndex(field_name)) >= 0) {
02357             ftype = SFieldsDescription::eFT_Run;
02358             // field_descr.has_run = true;
02359         } else if (field_name == "run") {
02360             ftype = SFieldsDescription::eFT_RunNum;
02361             field_descr.has_run = true;
02362         } else if (NStr::StartsWith(field_name, "tag.")) {
02363             ftype = SFieldsDescription::eFT_Tag;
02364             tag_name = field_name.substr(4);
02365             field_descr.has_tags = true;
02366         } else {
02367             NCBI_THROW(CNetScheduleException, eQuerySyntaxError,
02368                 string("Unknown field: ") + (*it));
02369         }
02370         SFieldsDescription::FFormatter formatter = NULL;
02371         if (i >= 0) {
02372             if (field_name == "id") {
02373                 formatter = FormatNSId;
02374             } else if (field_name == "node_addr" ||
02375                 field_name == "subm_addr") {
02376                     formatter = FormatHostName;
02377             }
02378         }
02379         field_descr.field_types.push_back(ftype);
02380         field_descr.field_nums.push_back(i);
02381         field_descr.formatters.push_back(formatter);
02382         field_descr.pos_to_tag.push_back(tag_name);
02383     }
02384 }
02385 
02386 
02387 void CQueue::ExecProject(TRecordSet&               record_set,
02388                          const TNSBitVector&       ids,
02389                          const SFieldsDescription& field_descr)
02390 {
02391     int record_size = field_descr.field_nums.size();
02392     // Retrieve fields
02393     CJob job;
02394     TNSBitVector::enumerator en(ids.first());
02395     CQueueGuard guard(this);
02396     for ( ; en.valid(); ++en) {
02397         map<string, string> tags;
02398         unsigned job_id = *en;
02399 
02400         // FIXME: fetch minimal required part of the job based on
02401         // field_descr.has_* flags, may be add has_input_output flag
02402         CJob::EJobFetchResult res = job.Fetch(this, job_id);
02403         if (res != CJob::eJF_Ok) {
02404             if (res != CJob::eJF_NotFound)
02405                 ERR_POST(Error << "Error reading queue job db");
02406             continue;
02407         }
02408         if (field_descr.has_affinity)
02409             job.FetchAffinityToken(this);
02410         if (field_descr.has_tags) {
02411             // Parse tags record
02412             ITERATE(TNSTagList, it, job.GetTags()) {
02413                 tags[it->first] = it->second;
02414             }
02415         }
02416         // Fill out the record
02417         if (!field_descr.has_run) {
02418             vector<string> record(record_size);
02419             if (x_FillRecord(record, field_descr, job, tags, job.GetLastRun(), 0))
02420                 record_set.push_back(record);
02421         } else {
02422             int run_num = 1;
02423             ITERATE(vector<CJobRun>, it, job.GetRuns()) {
02424                 vector<string> record(record_size);
02425                 if (x_FillRecord(record, field_descr, job, tags, &(*it), run_num++))
02426                     record_set.push_back(record);
02427             }
02428         }
02429     }
02430 }
02431 
02432 
02433 bool CQueue::x_FillRecord(vector<string>&           record,
02434                           const SFieldsDescription& field_descr,
02435                           const CJob&               job,
02436                           map<string, string>&      tags,
02437                           const CJobRun*            run,
02438                           int                       run_num)
02439 {
02440     int record_size = field_descr.field_nums.size();
02441 
02442     bool complete = true;
02443     for (int i = 0; i < record_size; ++i) {
02444         int fnum = field_descr.field_nums[i];
02445         switch(field_descr.field_types[i]) {
02446         case SFieldsDescription::eFT_Job:
02447             record[i] = job.GetField(fnum);
02448             break;
02449         case SFieldsDescription::eFT_Run:
02450             if (run)
02451                 record[i] = run->GetField(fnum);
02452             else
02453                 record[i] = "NULL";
02454             break;
02455         case SFieldsDescription::eFT_Tag:
02456             {
02457                 map<string, string>::iterator it =
02458                     tags.find((field_descr.pos_to_tag)[i]);
02459                 if (it == tags.end()) {
02460                     complete = false;
02461                 } else {
02462                     record[i] = it->second;
02463                 }
02464             }
02465             break;
02466         case SFieldsDescription::eFT_RunNum:
02467             record[i] = NStr::IntToString(run_num);
02468             break;
02469         }
02470         if (!complete) break;
02471     }
02472     return complete;
02473 }
02474 
02475 
02476 //////////////////////////////////////////////////////////////////////////
02477 // Worker node
02478 
02479 
02480 void CQueue::ClearWorkerNode(CWorkerNode* worker_node, const string& reason)
02481 {
02482     TJobList jobs;
02483     m_WorkerNodeList.ClearNode(worker_node, jobs);
02484     x_FailJobs(jobs, worker_node, "Node closed, " + reason);
02485 }
02486 
02487 
02488 void CQueue::ClearWorkerNode(const string& node_id, const string& reason)
02489 {
02490     TJobList jobs;
02491     TWorkerNodeRef worker_node = m_WorkerNodeList.ClearNode(node_id, jobs);
02492 
02493     if (worker_node)
02494         x_FailJobs(jobs, worker_node, "Node closed, " + reason);
02495 }
02496 
02497 void CQueue::x_FailJobs(const TJobList& jobs,
02498                         CWorkerNode* worker_node,
02499                         const string& err_msg)
02500 {
02501     ITERATE(TJobList, it, jobs) {
02502         FailJob(worker_node, *it, err_msg, "", 0);
02503     }
02504 }
02505 
02506 void CQueue::NotifyListeners(bool unconditional, unsigned aff_id)
02507 {
02508     // TODO: if affinity valency is full for this aff_id, notify only nodes
02509     // with this aff_id, otherwise notify all nodes in the hope that some
02510     // of them will pick up the task with this aff_id
02511     if (!m_HasNotificationPort) return;
02512 
02513     int notify_timeout = GetNotifyTimeout();
02514 
02515     time_t curr = time(0);
02516 
02517     list<TWorkerNodeHostPort> notify_list;
02518     if ((unconditional || m_StatusTracker.AnyPending()) &&
02519         m_WorkerNodeList.GetNotifyList(unconditional, curr,
02520                                        notify_timeout, notify_list)) {
02521 
02522 #define JSQ_PREFIX "NCBI_JSQ_"
02523         char msg[256];
02524         strcpy(msg, JSQ_PREFIX);
02525         strncat(msg, m_QueueName.c_str(), sizeof(msg) - sizeof(JSQ_PREFIX) - 1);
02526         size_t msg_len = sizeof(JSQ_PREFIX) + m_QueueName.length();
02527 
02528         unsigned i = 0;
02529         ITERATE(list<TWorkerNodeHostPort>, it, notify_list) {
02530             unsigned host = it->first;
02531             unsigned short port = it->second;
02532             {{
02533                 CFastMutexGuard guard(m_UdpSocketLock);
02534                 //EIO_Status status =
02535                     m_UdpSocket.Send(msg, msg_len,
02536                                        CSocketAPI::ntoa(host), port);
02537             }}
02538             // periodically check if we have no more jobs left
02539             if ((++i % 10 == 0) && !m_StatusTracker.AnyPending())
02540                 break;
02541         }
02542     }
02543 }
02544 
02545 
02546 void CQueue::PrintWorkerNodeStat(CNcbiOstream& out,
02547                                  time_t curr,
02548                                  EWNodeFormat fmt) const
02549 {
02550     list<TWorkerNodeRef> nodes;
02551     m_WorkerNodeList.GetNodes(curr, nodes);
02552     ITERATE(list<TWorkerNodeRef>, it, nodes) {
02553         const CWorkerNode* wn = *it;
02554         out << wn->AsString(curr, fmt) << "\n";
02555     }
02556 }
02557 
02558 
02559 void CQueue::UnRegisterNotificationListener(CWorkerNode* worker_node)
02560 {
02561     if (m_WorkerNodeList.UnRegisterNotificationListener(worker_node)) {
02562         // Clean affinity association only for old style worker nodes
02563         // New style nodes should explicitly call ClearWorkerNode
02564         ClearWorkerNode(worker_node, "URGC");
02565     }
02566 }
02567 
02568 
02569 void CQueue::AddJobToWorkerNode(CWorkerNode*            worker_node,
02570                                 CRequestContextFactory* rec_ctx_f,
02571                                 const CJob&             job,
02572                                 time_t                  exp_time)
02573 {
02574     unsigned log_job_state = GetLogJobState();
02575     m_WorkerNodeList.AddJob(worker_node, job, exp_time,
02576                             rec_ctx_f, log_job_state);
02577     CountEvent(eStatGetEvent);
02578 }
02579 
02580 
02581 void CQueue::UpdateWorkerNodeJob(unsigned job_id, time_t exp_time)
02582 {
02583     m_WorkerNodeList.UpdateJob(job_id, exp_time);
02584 }
02585 
02586 
02587 void CQueue::RemoveJobFromWorkerNode(const CJob&   job,
02588                                      ENSCompletion reason)
02589 {
02590     unsigned log_job_state = GetLogJobState();
02591     m_WorkerNodeList.RemoveJob(job, reason, log_job_state);
02592     CountEvent(eStatPutEvent);
02593 }
02594 
02595 
02596 // Affinity
02597 
02598 void CQueue::GetAllAssignedAffinities(time_t t, TNSBitVector& aff_ids)
02599 {
02600     CQueueWorkerNodeListGuard wnl(m_WorkerNodeList);
02601     wnl.GetAffinities(t, aff_ids);
02602 }
02603 
02604 
02605 void CQueue::AddAffinity(CWorkerNode* worker_node,
02606                          unsigned     aff_id,
02607                          time_t       exp_time)
02608 {
02609     CWorkerNodeAffinityGuard na(*worker_node);
02610     CStopWatch sw(CStopWatch::eStart);
02611     na.AddAffinity(aff_id,exp_time);
02612 //    LOG_POST(Warning << "Added affinity1: " << sw.Elapsed() * 1000 << "ms");
02613 }
02614 
02615 
02616 void CQueue::BlacklistJob(CWorkerNode*  worker_node,
02617                           unsigned      job_id,
02618                           time_t        exp_time)
02619 {
02620     CWorkerNodeAffinityGuard na(*worker_node);
02621     na.BlacklistJob(job_id, exp_time);
02622 }
02623 
02624 
02625 // Tags
02626 
02627 void CQueue::SetTagDbTransaction(CBDB_Transaction* trans)
02628 {
02629     m_TagDB.SetTransaction(trans);
02630 }
02631 
02632 
02633 void CQueue::AppendTags(CNSTagMap&        tag_map,
02634                         const TNSTagList& tags,
02635                         unsigned          job_id)
02636 {
02637     ITERATE(TNSTagList, it, tags) {
02638         /*
02639         auto_ptr<TNSBitVector> bv(new TNSBitVector(bm::BM_GAP));
02640         pair<TNSTagMap::iterator, bool> tag_map_it =
02641             (*tag_map).insert(TNSTagMap::value_type((*it), bv.get()));
02642         if (tag_map_it.second) {
02643             bv.release();
02644         }
02645         */
02646         TNSTagMap::iterator tag_it = (*tag_map).find((*it));
02647         if (tag_it == (*tag_map).end()) {
02648             pair<TNSTagMap::iterator, bool> tag_map_it =
02649 //                (*tag_map).insert(TNSTagMap::value_type((*it), new TNSBitVector(bm::BM_GAP)));
02650                 (*tag_map).insert(TNSTagMap::value_type((*it), new TNSShortIntSet));
02651             tag_it = tag_map_it.first;
02652         }
02653 //        tag_it->second->set(job_id);
02654         tag_it->second->push_back(job_id);
02655     }
02656 }
02657 
02658 
02659 void CQueue::FlushTags(CNSTagMap& tag_map, CBDB_Transaction& trans)
02660 {
02661     CFastMutexGuard guard(m_TagLock);
02662     m_TagDB.SetTransaction(&trans);
02663     NON_CONST_ITERATE(TNSTagMap, it, *tag_map) {
02664         m_TagDB.key = it->first.first;
02665         m_TagDB.val = it->first.second;
02666         /*
02667         EBDB_ErrCode err = m_TagDB.ReadVector(it->second, bm::set_OR);
02668         if (err != eBDB_Ok && err != eBDB_NotFound) {
02669             // TODO: throw db error
02670         }
02671         m_TagDB.key = it->first.first;
02672         m_TagDB.val = it->first.second;
02673         it->second->optimize();
02674         if (it->first.first == "transcript") {
02675             it->second->stat();
02676         }
02677         m_TagDB.WriteVector(*(it->second), STagDB::eNoCompact);
02678         */
02679 
02680         TNSBitVector bv_tmp(bm::BM_GAP);
02681         EBDB_ErrCode err = m_TagDB.ReadVector(&bv_tmp);
02682         if (err != eBDB_Ok && err != eBDB_NotFound) {
02683             // TODO: throw db error
02684         }
02685         bm::combine_or(bv_tmp, it->second->begin(), it->second->end());
02686 
02687         m_TagDB.key = it->first.first;
02688         m_TagDB.val = it->first.second;
02689         m_TagDB.WriteVector(bv_tmp, STagDB::eNoCompact);
02690 
02691         delete it->second;
02692         it->second = 0;
02693     }
02694     (*tag_map).clear();
02695 }
02696 
02697 
02698 bool CQueue::ReadTag(const string& key,
02699                      const string& val,
02700                      TBuffer*      buf)
02701 {
02702     // Guarded by m_TagLock through GetTagLock()
02703     CBDB_FileCursor cur(m_TagDB);
02704     cur.SetCondition(CBDB_FileCursor::eEQ);
02705     cur.From << key << val;
02706 
02707     return cur.Fetch(buf) == eBDB_Ok;
02708 }
02709 
02710 
02711 void CQueue::ReadTags(const string& key, TNSBitVector* bv)
02712 {
02713     // Guarded by m_TagLock through GetTagLock()
02714     CBDB_FileCursor cur(m_TagDB);
02715     cur.SetCondition(CBDB_FileCursor::eEQ);
02716     cur.From << key;
02717     TBuffer buf;
02718     bm::set_operation op_code = bm::set_ASSIGN;
02719     EBDB_ErrCode err;
02720     while ((err = cur.Fetch(&buf)) == eBDB_Ok) {
02721         bm::operation_deserializer<TNSBitVector>::deserialize(
02722             *bv, &(buf[0]), 0, op_code);
02723         op_code = bm::set_OR;
02724     }
02725     if (err != eBDB_Ok  &&  err != eBDB_NotFound) {
02726         // TODO: signal disaster somehow, e.g. throw CBDB_Exception
02727     }
02728 }
02729 
02730 
02731 void CQueue::x_RemoveTags(CBDB_Transaction&   trans,
02732                           const TNSBitVector& ids)
02733 {
02734     CFastMutexGuard guard(m_TagLock);
02735     m_TagDB.SetTransaction(&trans);
02736     CBDB_FileCursor cur(m_TagDB, trans,
02737                         CBDB_FileCursor::eReadModifyUpdate);
02738     // iterate over tags database, deleting ids from every entry
02739     cur.SetCondition(CBDB_FileCursor::eFirst);
02740     CBDB_RawFile::TBuffer buf;
02741     TNSBitVector bv;
02742     while (cur.Fetch(&buf) == eBDB_Ok) {
02743         bm::deserialize(bv, &buf[0]);
02744         unsigned before_remove = bv.count();
02745         bv -= ids;
02746         unsigned new_count;
02747         if ((new_count = bv.count()) != before_remove) {
02748             if (new_count) {
02749                 TNSBitVector::statistics st;
02750                 bv.optimize(0, TNSBitVector::opt_compress, &st);
02751                 if (st.max_serialize_mem > buf.size()) {
02752                     buf.resize(st.max_serialize_mem);
02753                 }
02754 
02755                 size_t size = bm::serialize(bv, &buf[0]);
02756                 cur.UpdateBlob(&buf[0], size);
02757             } else {
02758                 cur.Delete(CBDB_File::eIgnoreError);
02759             }
02760         }
02761         bv.clear(true);
02762     }
02763 }
02764 
02765 
02766 //
02767 
02768 void CQueue::CheckExecutionTimeout()
02769 {
02770     if (!m_RunTimeLine)
02771         return;
02772     unsigned queue_run_timeout = GetRunTimeout();
02773     time_t curr = time(0);
02774     TNSBitVector bv;
02775     {{
02776         CReadLockGuard guard(m_RunTimeLineLock);
02777         m_RunTimeLine->ExtractObjects(curr, &bv);
02778     }}
02779     TNSBitVector::enumerator en(bv.first());
02780     for ( ;en.valid(); ++en) {
02781         x_CheckExecutionTimeout(queue_run_timeout, *en, curr);
02782     }
02783 }
02784 
02785 
02786 void
02787 CQueue::x_CheckExecutionTimeout(unsigned queue_run_timeout,
02788                                 unsigned job_id,
02789                                 time_t curr_time)
02790 {
02791     unsigned log_job_state = GetLogJobState();
02792     TJobStatus status = GetJobStatus(job_id);
02793 
02794     TJobStatus new_status, run_status;
02795     if (status == CNetScheduleAPI::eRunning) {
02796         new_status = CNetScheduleAPI::ePending;
02797         run_status = CNetScheduleAPI::eTimeout;
02798     } else if (status == CNetScheduleAPI::eReading) {
02799         new_status = CNetScheduleAPI::eDone;
02800         run_status = CNetScheduleAPI::eReadTimeout;
02801     } else {
02802         return;
02803     }
02804 
02805     CJob job;
02806     CNS_Transaction trans(this);
02807 
02808     unsigned time_start = 0;
02809     unsigned run_timeout = 0;
02810     time_t   exp_time;
02811     {{
02812         CQueueGuard guard(this, &trans);
02813 
02814         CJob::EJobFetchResult res = job.Fetch(this, job_id);
02815         if (res != CJob::eJF_Ok)
02816             return;
02817 
02818         if (job.GetStatus() != status) {
02819             ERR_POST(Error << "Job status mismatch between status tracker"
02820                            << " and database for job " << DecorateJobId(job_id));
02821             return;
02822         }
02823 
02824         CJobRun* run = job.GetLastRun();
02825         if (!run) {
02826             ERR_POST(Error << "No JobRun for running job "
02827                            << DecorateJobId(job_id));
02828             // fix it here as well as we can
02829             run = &job.AppendRun();
02830             run->SetTimeStart(curr_time);
02831         }
02832         time_start = run->GetTimeStart();
02833         _ASSERT(time_start);
02834         run_timeout = job.GetRunTimeout();
02835         if (run_timeout == 0) run_timeout = queue_run_timeout;
02836 
02837         exp_time = run_timeout ? time_start + run_timeout : 0;
02838         if (curr_time < exp_time) {
02839             // we need to register job in time line
02840             TimeLineAdd(job_id, exp_time);
02841             return;
02842         }
02843 
02844         if (status == CNetScheduleAPI::eReading) {
02845             unsigned group = job.GetReadGroup();
02846             x_RemoveFromReadGroup(group, job_id);
02847         }
02848 
02849         job.SetStatus(new_status);
02850         run->SetStatus(run_status);
02851         run->SetTimeDone(curr_time);
02852 
02853         job.Flush(this);
02854     }}
02855 
02856     trans.Commit();
02857 
02858     m_StatusTracker.SetStatus(job_id, new_status);
02859 
02860     if (status == CNetScheduleAPI::eRunning)
02861         m_WorkerNodeList.RemoveJob(job, eNSCTimeout, log_job_state);
02862 
02863     {{
02864         CTime tm(CTime::eCurrent);
02865         string msg = tm.AsString();
02866         msg += " CQueue::CheckExecutionTimeout: Job rescheduled for ";
02867         if (status == CNetScheduleAPI::eRunning)
02868             msg += "execution";
02869         else
02870             msg += "reading";
02871         msg += " id=";
02872         msg += NStr::IntToString(job_id);
02873         tm.SetTimeT(time_start);
02874         tm.ToLocalTime();
02875         msg += " time_start=";
02876         msg += tm.AsString();
02877 
02878         tm.SetTimeT(exp_time);
02879         tm.ToLocalTime();
02880         msg += " exp_time=";
02881         msg += tm.AsString();
02882         msg += " run_timeout(sec)=";
02883         msg += NStr::IntToString(run_timeout);
02884         msg += " run_timeout(minutes)=";
02885         msg += NStr::IntToString(run_timeout/60);
02886         LOG_POST(Warning << msg);
02887 
02888         if (IsMonitoring()) {
02889             MonitorPost(msg);
02890         }
02891     }}
02892 }
02893 
02894 
02895 unsigned CQueue::CheckJobsExpiry(unsigned batch_size, TJobStatus status)
02896 {
02897     unsigned queue_timeout, queue_run_timeout;
02898     {{
02899         CQueueParamAccessor qp(*this);
02900         queue_timeout = qp.GetTimeout();
02901         queue_run_timeout = qp.GetRunTimeout();
02902     }}
02903 
02904     TNSBitVector job_ids;
02905     TNSBitVector not_found_jobs;
02906     time_t curr = time(0);
02907     CJob job;
02908     unsigned del_count = 0;
02909     bool has_db_error = false;
02910 #ifdef _DEBUG
02911     static bool debug_job_discrepancy = false;
02912 #endif
02913     {{
02914         CQueueGuard guard(this);
02915         unsigned job_id = 0;
02916         for (unsigned n = 0; n < batch_size; ++n) {
02917             job_id = m_StatusTracker.GetNext(status, job_id);
02918             if (job_id == 0)
02919                 break;
02920             // FIXME: should fetch only main part of the job and run info,
02921             // does not need potentially huge tags and input/output
02922             CJob::EJobFetchResult res = job.Fetch(this, job_id);
02923             if (res != CJob::eJF_Ok
02924 #ifdef _DEBUG
02925                 || debug_job_discrepancy
02926 #endif
02927             ) {
02928                 if (res != CJob::eJF_NotFound)
02929                     has_db_error = true;
02930                 // ? already deleted - report as a batch later
02931                 not_found_jobs.set_bit(job_id);
02932                 m_StatusTracker.Erase(job_id);
02933                 // Do not break the process of erasing expired jobs
02934                 continue;
02935             }
02936 
02937             // Is the job expired?
02938             time_t time_update, time_done;
02939             time_t timeout, run_timeout;
02940 
02941             TJobStatus status = job.GetStatus();
02942 
02943             timeout = job.GetTimeout();
02944             if (timeout == 0) timeout = queue_timeout;
02945             run_timeout = job.GetRunTimeout();
02946             if (run_timeout == 0) run_timeout = queue_run_timeout;
02947 
02948             // Calculate time of last update and effective timeout
02949             const CJobRun* run = job.GetLastRun();
02950             time_done = 0;
02951             if (run) time_done = run->GetTimeDone();
02952             if (status == CNetScheduleAPI::eRunning ||
02953                 status == CNetScheduleAPI::eReading) {
02954                 // Running/reading job
02955                 if (run) {
02956                     time_update = run->GetTimeStart();
02957                 } else {
02958                     ERR_POST(Error << "No JobRun for running/reading job "
02959                                    << DecorateJobId(job_id));
02960                     time_update = 0; // ??? force job deletion
02961                 }
02962                 timeout += run_timeout;
02963             } else if (time_done == 0) {
02964                 // Submitted job
02965                 time_update = job.GetTimeSubmit();
02966             } else {
02967                 // Done, Failed, Canceled, Reading, Confirmed, ReadFailed
02968                 time_update = time_done;
02969             }
02970 
02971             if (time_update + timeout <= curr) {
02972                 m_StatusTracker.Erase(job_id);
02973                 job_ids.set_bit(job_id);
02974                 if (status == CNetScheduleAPI::eReading) {
02975                     unsigned group_id = job.GetReadGroup();
02976                     x_RemoveFromReadGroup(group_id, job_id);
02977                 }
02978                 ++del_count;
02979             }
02980         }
02981     }}
02982     if (not_found_jobs.any()) {
02983         if (has_db_error) {
02984             LOG_POST(Error << "While deleting, errors in database"
02985                     << ", status " << CNetScheduleAPI::StatusToString(status)
02986                     << ", queue "  << m_QueueName
02987                     << ", job(s) " << NS_EncodeBitVector(not_found_jobs));
02988         } else {
02989             LOG_POST(Warning << "While deleting, jobs not found in database"
02990                     << ", status " << CNetScheduleAPI::StatusToString(status)
02991                     << ", queue "  << m_QueueName
02992                     << ", job(s) " << NS_EncodeBitVector(not_found_jobs));
02993         }
02994     }
02995     if (del_count)
02996         Erase(job_ids);
02997     return del_count;
02998 }
02999 
03000 
03001 void CQueue::TimeLineMove(unsigned job_id, time_t old_time, time_t new_time)
03002 {
03003     CWriteLockGuard guard(m_RunTimeLineLock);
03004     m_RunTimeLine->MoveObject(old_time, new_time, job_id);
03005 }
03006 
03007 
03008 void CQueue::TimeLineAdd(unsigned job_id, time_t timeout)
03009 {
03010     if (!job_id  ||  !m_RunTimeLine  ||  !timeout) return;
03011     CWriteLockGuard guard(m_RunTimeLineLock);
03012     m_RunTimeLine->AddObject(timeout, job_id);
03013 }
03014 
03015 
03016 void CQueue::TimeLineRemove(unsigned job_id)
03017 {
03018     if (!m_RunTimeLine) return;
03019 
03020     CWriteLockGuard guard(m_RunTimeLineLock);
03021     m_RunTimeLine->RemoveObject(job_id);
03022 
03023     if (IsMonitoring()) {
03024         CTime tmp_t(CTime::eCurrent);
03025         string msg = tmp_t.AsString();
03026         msg += " CQueue::RemoveFromTimeLine: job id=";
03027         msg += NStr::IntToString(job_id);
03028         MonitorPost(msg);
03029     }
03030 }
03031 
03032 
03033 void CQueue::TimeLineExchange(unsigned remove_job_id,
03034                               unsigned add_job_id,
03035                               time_t   timeout)
03036 {
03037     if (!m_RunTimeLine) return;
03038 
03039     CWriteLockGuard guard(m_RunTimeLineLock);
03040     if (remove_job_id)
03041         m_RunTimeLine->RemoveObject(remove_job_id);
03042     if (add_job_id)
03043         m_RunTimeLine->AddObject(timeout, add_job_id);
03044 
03045     if (IsMonitoring()) {
03046         CTime tmp_t(CTime::eCurrent);
03047         string msg = tmp_t.AsString();
03048         msg += " CQueue::TimeLineExchange:";
03049         if (remove_job_id) {
03050             msg += " job removed=";
03051             msg += NStr::IntToString(remove_job_id);
03052         }
03053         if (add_job_id) {
03054             msg += " job added=";
03055             msg += NStr::IntToString(add_job_id);
03056         }
03057         MonitorPost(msg);
03058     }
03059 }
03060 
03061 
03062 unsigned CQueue::DeleteBatch(unsigned batch_size)
03063 {
03064     TNSBitVector batch;
03065     {{
03066         CFastMutexGuard guard(m_JobsToDeleteLock);
03067         unsigned job_id = 0;
03068         for (unsigned n = 0; n < batch_size &&
03069                              (job_id = m_JobsToDelete.extract_next(job_id));
03070              ++n)
03071         {
03072             batch.set(job_id);
03073         }
03074         if (batch.any())
03075             FlushDeletedVectors(eVIJob);
03076     }}
03077     if (batch.none()) return 0;
03078 
03079     unsigned actual_batch_size = batch.count();
03080     unsigned chunks = (actual_batch_size + 999) / 1000;
03081     unsigned chunk_size = actual_batch_size / chunks;
03082     unsigned residue = actual_batch_size - chunks*chunk_size;
03083 
03084     TNSBitVector::enumerator en = batch.first();
03085     unsigned del_rec = 0;
03086     while (en.valid()) {
03087         unsigned txn_size = chunk_size;
03088         if (residue) {
03089             ++txn_size; --residue;
03090         }
03091 
03092         CNS_Transaction trans(this);
03093         CQueueGuard guard(this, &trans);
03094 
03095         unsigned n;
03096         for (n = 0; en.valid() && n < txn_size; ++en, ++n) {
03097             unsigned job_id = *en;
03098             m_JobDB.id = job_id;
03099             try {
03100                 m_JobDB.Delete();
03101                 ++del_rec;
03102             } catch (CBDB_ErrnoException& ex) {
03103                 ERR_POST(Error << "BDB error " << ex.what());
03104             }
03105 
03106             m_JobInfoDB.id = job_id;
03107             try {
03108                 m_JobInfoDB.Delete();
03109             } catch (CBDB_ErrnoException& ex) {
03110                 ERR_POST(Error << "BDB error " << ex.what());
03111             }
03112         }
03113         trans.Commit();
03114         // x_RemoveTags(trans, batch);
03115     }
03116     if (del_rec > 0 && IsMonitoring()) {
03117         CTime tm(CTime::eCurrent);
03118         string msg = tm.AsString();
03119         msg += " CQueue::DeleteBatch: " +
03120             NStr::IntToString(del_rec) + " job(s) deleted";
03121         MonitorPost(msg);
03122     }
03123     return del_rec;
03124 }
03125 
03126 
03127 CBDB_FileCursor& CQueue::GetRunsCursor()
03128 {
03129     CBDB_Transaction* trans = m_RunsDB.GetBDBTransaction();
03130     CBDB_FileCursor* cur = m_RunsCursor.get();
03131     if (!cur) {
03132         cur = new CBDB_FileCursor(m_RunsDB);
03133         m_RunsCursor.reset(cur);
03134     } else {
03135         cur->Close();
03136     }
03137     cur->ReOpen(trans);
03138     return *cur;
03139 }
03140 
03141 
03142 void CQueue::SetMonitorSocket(CSocket& socket)
03143 {
03144     m_Monitor.SetSocket(socket);
03145 }
03146 
03147 
03148 bool CQueue::IsMonitoring()
03149 {
03150     return m_Monitor.IsMonitorActive();
03151 }
03152 
03153 
03154 void CQueue::MonitorPost(const string& msg)
03155 {
03156     m_Monitor.SendString(msg+'\n');
03157 }
03158 
03159 
03160 void CQueue::PrintSubmHosts(CNcbiOstream& out) const
03161 {
03162     CQueueParamAccessor qp(*this);
03163     qp.GetSubmHosts().PrintHosts(out);
03164 }
03165 
03166 
03167 void CQueue::PrintWNodeHosts(CNcbiOstream& out) const
03168 {
03169     CQueueParamAccessor qp(*this);
03170     qp.GetWnodeHosts().PrintHosts(out);
03171 }
03172 
03173 
03174 void CQueue::PrintJobStatusMatrix(CNcbiOstream& out) const
03175 {
03176     m_StatusTracker.PrintStatusMatrix(out);
03177 }
03178 
03179 
// NS_PRINT_TIME(msg, t): write label `msg`, then time `t` converted to local
// time (nothing when t is 0), then the field separator.
// NOTE: expands to code that uses the caller's locals `out` (output stream)
// and `fsp` (field separator), so it only works where both are in scope.
#define NS_PRINT_TIME(msg, t)                                          \
    do {                                                               \
        time_t ns_pt_value = (t);                                      \
        CTime  ns_pt_local(ns_pt_value);                               \
        ns_pt_local.ToLocalTime();                                     \
        out << msg                                                     \
            << (ns_pt_value ? ns_pt_local.AsString() : kEmptyStr)      \
            << fsp;                                                    \
    } while (0)

// NS_PFNAME(x_fname): the field label x_fname when the caller's local
// `fflag` is set, otherwise an empty label.
#define NS_PFNAME(x_fname) \
    (fflag ? x_fname : "")
03189 
03190 void CQueue::x_PrintJobStat(const CJob&   job,
03191                             unsigned      queue_run_timeout,
03192                             CNcbiOstream& out,
03193                             const char*   fsp,
03194                             bool          fflag)
03195 {
03196     out << fsp << NS_PFNAME("id: ") << job.GetId() << fsp;
03197     TJobStatus status = job.GetStatus();
03198     out << NS_PFNAME("status: ") << CNetScheduleAPI::StatusToString(status)
03199         << fsp;
03200 
03201     const CJobRun* last_run = job.GetLastRun();
03202 
03203     NS_PRINT_TIME(NS_PFNAME("time_submit: "), job.GetTimeSubmit());
03204     if (last_run) {
03205         NS_PRINT_TIME(NS_PFNAME("time_start: "), last_run->GetTimeStart());
03206         NS_PRINT_TIME(NS_PFNAME("time_done: "), last_run->GetTimeDone());
03207     }
03208 
03209     out << NS_PFNAME("timeout: ") << job.GetTimeout() << fsp;
03210     unsigned run_timeout = job.GetRunTimeout();
03211     out << NS_PFNAME("run_timeout: ") << run_timeout << fsp;
03212 
03213     if (last_run) {
03214         if (run_timeout == 0) run_timeout = queue_run_timeout;
03215         time_t exp_time =
03216             run_timeout == 0 ? 0 : last_run->GetTimeStart() + run_timeout;
03217         NS_PRINT_TIME(NS_PFNAME("time_run_expire: "), exp_time);
03218     }
03219 
03220     unsigned subm_addr = job.GetSubmAddr();
03221     out << NS_PFNAME("subm_addr: ")
03222         << (subm_addr ? CSocketAPI::gethostbyaddr(subm_addr) : kEmptyStr) << fsp;
03223     out << NS_PFNAME("subm_port: ") << job.GetSubmPort() << fsp;
03224     out << NS_PFNAME("subm_timeout: ") << job.GetSubmTimeout() << fsp;
03225 
03226     int node_num = 1;
03227     ITERATE(vector<CJobRun>, it, job.GetRuns()) {
03228         unsigned addr = it->GetNodeAddr();
03229         string node_name = "worker_node" + NStr::IntToString(node_num++) + ": ";
03230         out << NS_PFNAME(node_name)
03231             << (addr ? CSocketAPI::gethostbyaddr(addr) : kEmptyStr) << fsp;
03232     }
03233 
03234     out << NS_PFNAME("run_counter: ") << job.GetRunCount() << fsp;
03235     if (last_run)
03236         out << NS_PFNAME("ret_code: ") << last_run->GetRetCode() << fsp;
03237 
03238     unsigned aff_id = job.GetAffinityId();
03239     if (aff_id) {
03240         out << NS_PFNAME("aff_token: ") << "'" << job.GetAffinityToken()
03241             << "'" << fsp;
03242     }
03243     out << NS_PFNAME("aff_id: ") << aff_id << fsp;
03244     out << NS_PFNAME("mask: ") << job.GetMask() << fsp;
03245 
03246     out << NS_PFNAME("input: ")  << "'" << job.GetInput()  << "'" << fsp;
03247     out << NS_PFNAME("output: ") << "'" << job.GetOutput() << "'" << fsp;
03248     //out << NS_PFNAME("tags: ") << "'" << job.GetTags() << "'" << fsp;
03249     if (last_run) {
03250         out << NS_PFNAME("err_msg: ")
03251             << "'" << last_run->GetErrorMsg() << "'" << fsp;
03252     }
03253     out << NS_PFNAME("progress_msg: ")
03254         << "'" << job.GetProgressMsg() << "'" << fsp;
03255     out << "\n";
03256 }
03257 
03258 
03259 void CQueue::x_PrintShortJobStat(const CJob&   job,
03260                                  const string& host,
03261                                  unsigned      port,
03262                                  CNcbiOstream& out,
03263                                  const char*   fsp)
03264 {
03265     out << string(CNetScheduleKey(job.GetId(), host, port)) << fsp;
03266     TJobStatus status = job.GetStatus();
03267     out << CNetScheduleAPI::StatusToString(status) << fsp;
03268 
03269     out << "'" << job.GetInput()    << "'" << fsp;
03270     out << "'" << job.GetOutput()   << "'" << fsp;
03271     const CJobRun* last_run = job.GetLastRun();
03272     if (last_run)
03273         out << "'" << last_run->GetErrorMsg() << "'" << fsp;
03274     else
03275         out << "''" << fsp;
03276 
03277     out << "\n";
03278 }
03279 
03280 
03281 void
03282 CQueue::PrintJobDbStat(unsigned      job_id,
03283                        CNcbiOstream& out,
03284                        TJobStatus    status)
03285 {
03286     unsigned queue_run_timeout = GetRunTimeout();
03287 
03288     CJob job;
03289     if (status == CNetScheduleAPI::eJobNotFound) {
03290         CQueueGuard guard(this);
03291         CJob::EJobFetchResult res = job.Fetch(this, job_id);
03292 
03293         if (res == CJob::eJF_Ok) {
03294             job.FetchAffinityToken(this);
03295             x_PrintJobStat(job, queue_run_timeout, out);
03296         } else {
03297             out << "Job not found id=" << DecorateJobId(job_id);
03298         }
03299         out << "\n";
03300     } else {
03301         TNSBitVector bv;
03302         JobsWithStatus(status, &bv);
03303 
03304         TNSBitVector::enumerator en(bv.first());
03305         for (;en.valid(); ++en) {
03306             CQueueGuard guard(this);
03307             CJob::EJobFetchResult res = job.Fetch(this, *en);
03308             if (res == CJob::eJF_Ok) {
03309                 job.FetchAffinityToken(this);
03310                 x_PrintJobStat(job, queue_run_timeout, out);
03311             }
03312         }
03313         out << "\n";
03314     }
03315 }
03316 
03317 
03318 void CQueue::PrintAllJobDbStat(CNcbiOstream& out)
03319 {
03320     unsigned queue_run_timeout = GetRunTimeout();
03321 
03322     CJob job;
03323     CQueueGuard guard(this);
03324 
03325     CQueueEnumCursor cur(this);
03326     while (cur.Fetch() == eBDB_Ok) {
03327         CJob::EJobFetchResult res = job.Fetch(this);
03328         if (res == CJob::eJF_Ok) {
03329             job.FetchAffinityToken(this);
03330             x_PrintJobStat(job, queue_run_timeout, out);
03331         }
03332         if (!out.good()) break;
03333     }
03334 }
03335 
03336 
03337 void CQueue::PrintQueue(CNcbiOstream& out,
03338                         TJobStatus    job_status,
03339                         const string& host,
03340                         unsigned      port)
03341 {
03342     TNSBitVector bv;
03343     JobsWithStatus(job_status, &bv);
03344 
03345     CJob job;
03346     TNSBitVector::enumerator en(bv.first());
03347     for (;en.valid(); ++en) {
03348         CQueueGuard guard(this);
03349 
03350         CJob::EJobFetchResult res = job.Fetch(this, *en);
03351         if (res == CJob::eJF_Ok)
03352             x_PrintShortJobStat(job, host, port, out);
03353     }
03354 }
03355 
03356 
03357 unsigned CQueue::CountStatus(TJobStatus st) const
03358 {
03359     return m_StatusTracker.CountStatus(st);
03360 }
03361 
03362 
03363 void
03364 CQueue::StatusStatistics(TJobStatus status,
03365                          TNSBitVector::statistics* st) const
03366 {
03367     m_StatusTracker.StatusStatistics(status, st);
03368 }
03369 
03370 
03371 unsigned CQueue::CountRecs()
03372 {
03373     CQueueGuard guard(this);
03374     return m_JobDB.CountRecs();
03375 }
03376 
03377 
03378 void CQueue::PrintStat(CNcbiOstream& out)
03379 {
03380     CQueueGuard guard(this);
03381     m_JobDB.PrintStat(out);
03382 }
03383 
03384 
// Parameters of the fixed-point exponential moving average that
// CStatisticsThread keeps for each statistics event counter.

// Sampling period; also the period passed to CThreadNonStop
// (presumably seconds - confirm against CThreadNonStop).
const unsigned kMeasureInterval = 1;
// Averaging time constant. Informational only: it is folded into kDecayExp.
const unsigned kDecayInterval = 10;
// Number of fractional bits in the fixed-point representation.
const unsigned kFixedShift = 7;
// 1.0 in fixed point (128).
const unsigned kFixed_1 = 1U << kFixedShift;
// Per-sample decay factor in fixed point:
//   2^kFixedShift * exp(-kMeasureInterval / kDecayInterval)
//   = 2^kFixedShift / 2^(kMeasureInterval * log2(e) / kDecayInterval)
//   = 128 * exp(-0.1) ~= 115.8, rounded to 116.
const unsigned kDecayExp = 116;
03392 
03393 CQueue::CStatisticsThread::CStatisticsThread(TContainer& container)
03394     : CThreadNonStop(kMeasureInterval),
03395         m_Container(container)
03396 {
03397 }
03398 
03399 
03400 void CQueue::CStatisticsThread::DoJob(void) {
03401     unsigned counter;
03402     for (TStatEvent n = 0; n < eStatNumEvents; ++n) {
03403         counter = m_Container.m_EventCounter[n].Get();
03404         m_Container.m_EventCounter[n].Add(-counter);
03405         m_Container.m_Average[n] = (kDecayExp * m_Container.m_Average[n] +
03406                                 (kFixed_1-kDecayExp) * (counter << kFixedShift)
03407                                    ) >> kFixedShift;
03408     }
03409 }
03410 
03411 
03412 void CQueue::CountEvent(TStatEvent event, int num)
03413 {
03414     m_EventCounter[event].Add(num);
03415 }
03416 
03417 
03418 double CQueue::GetAverage(TStatEvent n_event)
03419 {
03420     return m_Average[n_event] / double(kFixed_1 * kMeasureInterval);
03421 }
03422 
03423 
03424 void CQueue::x_AddToReadGroupNoLock(unsigned group_id, unsigned job_id)
03425 {
03426     TGroupMap::iterator i = m_GroupMap.find(group_id);
03427     if (i != m_GroupMap.end()) {
03428         TNSBitVector& bv = (*i).second;
03429         bv.set(job_id);
03430     } else {
03431         TNSBitVector bv;
03432         bv.set(job_id);
03433         m_GroupMap[group_id] = bv;
03434     }
03435 }
03436 
03437 
03438 void CQueue::x_CreateReadGroup(unsigned group_id, const TNSBitVector& bv_jobs)
03439 {
03440     CFastMutexGuard guard(m_GroupMapLock);
03441     m_GroupMap[group_id] = bv_jobs;
03442 }
03443 
03444 
03445 void CQueue::x_RemoveFromReadGroup(unsigned group_id, unsigned job_id)
03446 {
03447     CFastMutexGuard guard(m_GroupMapLock);
03448     TGroupMap::iterator i = m_GroupMap.find(group_id);
03449     if (i != m_GroupMap.end()) {
03450         TNSBitVector& bv = (*i).second;
03451         bv.set(job_id, false);
03452         if (!bv.any())
03453             m_GroupMap.erase(i);
03454     }
03455 }
03456 
03457 
03458 bool
03459 CQueue::x_UpdateDB_PutResultNoLock(
03460     unsigned             job_id,
03461     time_t               curr,
03462     bool                 delete_done,
03463     int                  ret_code,
03464     const string&        output,
03465     CJob&                job)
03466 {
03467     CJob::EJobFetchResult res = job.Fetch(this, job_id);
03468     if (res != CJob::eJF_Ok) {
03469         // TODO: Integrity error or job just expired?
03470         return false;
03471     }
03472 
03473     CJobRun* run = job.GetLastRun();
03474     if (!run) {
03475         ERR_POST(Error << "No JobRun for running job "
03476             << DecorateJobId(job_id));
03477         NCBI_THROW(CNetScheduleException, eInvalidJobStatus, "Job never ran");
03478     }
03479 
03480     if (delete_done) {
03481         job.Delete();
03482     } else {
03483         run->SetStatus(CNetScheduleAPI::eDone);
03484         run->SetTimeDone(curr);
03485         run->SetRetCode(ret_code);
03486         job.SetStatus(CNetScheduleAPI::eDone);
03487         job.SetOutput(output);
03488 
03489         job.Flush(this);
03490     }
03491     return true;
03492 }
03493 
03494 
03495 CQueue::EGetJobUpdateStatus
03496 CQueue::x_UpdateDB_GetJobNoLock(
03497     CWorkerNode*      worker_node,
03498     time_t            curr,
03499     unsigned          job_id,
03500     CJob&             job)
03501 {
03502     unsigned queue_timeout = GetTimeout();
03503 
03504     const unsigned kMaxGetAttempts = 100;
03505 
03506 
03507     for (unsigned fetch_attempts = 0; fetch_attempts < kMaxGetAttempts;
03508         ++fetch_attempts)
03509     {
03510         CJob::EJobFetchResult res;
03511         if ((res = job.Fetch(this, job_id)) != CJob::eJF_Ok) {
03512             if (res == CJob::eJF_NotFound) return eGetJobUpdate_NotFound;
03513             // FIXME: does it make sense to retry right after DB error?
03514             continue;
03515         }
03516         TJobStatus status = job.GetStatus();
03517 
03518         // integrity check
03519         if (status != CNetScheduleAPI::ePending) {
03520             if (CJobStatusTracker::IsCancelCode(status)) {
03521                 // this job has been canceled while I was fetching it
03522                 return eGetJobUpdate_JobStopped;
03523             }
03524             ERR_POST(Error
03525                 << "x_UpdateDB_GetJobNoLock: Status integrity violation "
03526                 << " job = "     << DecorateJobId(job_id)
03527                 << " status = "  << status
03528                 << " expected status = "
03529                 << (int)CNetScheduleAPI::ePending);
03530             return eGetJobUpdate_JobStopped;
03531         }
03532 
03533         time_t time_submit = job.GetTimeSubmit();
03534         time_t timeout = job.GetTimeout();
03535         if (timeout == 0) timeout = queue_timeout;
03536 
03537         _ASSERT(timeout);
03538         // check if job already expired
03539         if (timeout && (time_submit + timeout < curr)) {
03540             // Job expired, fail it
03541             CJobRun& run = job.AppendRun();
03542             run.SetStatus(CNetScheduleAPI::eFailed);
03543             run.SetTimeDone(curr);
03544             run.SetErrorMsg("Job expired and cannot be scheduled.");
03545             job.SetStatus(CNetScheduleAPI::eFailed);
03546             m_StatusTracker.ChangeStatus(job_id, CNetScheduleAPI::eFailed);
03547             job.Flush(this);
03548 
03549             if (IsMonitoring()) {
03550                 CTime tmp_t(CTime::eCurrent);
03551                 string msg = tmp_t.AsString();
03552                 msg +=
03553                     " CQueue::x_UpdateDB_GetJobNoLock() timeout expired job id=";
03554                 msg += NStr::IntToString(job_id);
03555                 MonitorPost(msg);
03556             }
03557             return eGetJobUpdate_JobFailed;
03558         }
03559 
03560         // job not expired
03561         job.FetchAffinityToken(this);
03562 
03563         unsigned run_count = job.GetRunCount() + 1;
03564         CJobRun& run = job.AppendRun();
03565         run.SetStatus(CNetScheduleAPI::eRunning);
03566         run.SetTimeStart(curr);
03567 
03568         // We're setting host:port here using worker_node. It is faster
03569         // than looking up this info by node_id in worker node list and
03570         // should provide exactly same info.
03571         run.SetNodeId(worker_node->GetId());
03572         run.SetNodeAddr(worker_node->GetHost());
03573         run.SetNodePort(worker_node->GetPort());
03574 
03575         job.SetStatus(CNetScheduleAPI::eRunning);
03576         job.SetRunTimeout(0);
03577         job.SetRunCount(run_count);
03578 
03579         job.Flush(this);
03580         return eGetJobUpdate_Ok;
03581     }
03582 
03583     return eGetJobUpdate_NotFound;
03584 }
03585 
03586 
03587 END_NCBI_SCOPE
03588 
03589 

Generated on Sun Dec 6 22:20:46 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Dec 07 16:20:55 2009 by modify_doxy.py rev. 173732