00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <ncbi_pch.hpp>
00032
00033 #include "ns_queue.hpp"
00034 #include "background_host.hpp"
00035 #include "ns_util.hpp"
00036 #include "ns_format.hpp"
00037
00038 #include <corelib/ncbi_system.hpp>
00039 #include <corelib/request_ctx.hpp>
00040 #include <db/bdb/bdb_trans.hpp>
00041 #include <util/qparse/query_parse.hpp>
00042 #include <util/qparse/query_exec.hpp>
00043 #include <util/qparse/query_exec_bv.hpp>
00044 #include <util/bitset/bmalgo.h>
00045
00046 BEGIN_NCBI_SCOPE
00047
00048
00049
00050
00051
00052 void SQueueDbBlock::Open(CBDB_Env& env, const string& path, int pos_)
00053 {
00054 pos = pos_;
00055 string prefix = string("jsq_") + NStr::IntToString(pos);
00056 allocated = false;
00057 bool group_tables_for_queue = false;
00058 try {
00059 string fname = prefix + ".db";
00060 string tname = "";
00061 job_db.SetEnv(env);
00062
00063
00064
00065
00066 job_db.RevSplitOff();
00067 if (group_tables_for_queue) tname = "job";
00068 job_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00069
00070 if (group_tables_for_queue)
00071 tname = "jobinfo";
00072 else
00073 fname = prefix + "_jobinfo.db";
00074 job_info_db.SetEnv(env);
00075 job_info_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00076
00077 if (group_tables_for_queue)
00078 tname = "runs";
00079 else
00080 fname = prefix + "_runs.db";
00081 runs_db.SetEnv(env);
00082 runs_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00083
00084 if (group_tables_for_queue)
00085 tname = "deleted";
00086 else
00087 fname = prefix + "_deleted.db";
00088 deleted_jobs_db.SetEnv(env);
00089 deleted_jobs_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00090
00091 if (group_tables_for_queue)
00092 tname = "affid";
00093 else
00094 fname = prefix + "_affid.idx";
00095 affinity_idx.SetEnv(env);
00096 affinity_idx.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00097
00098 if (group_tables_for_queue)
00099 tname = "affdict";
00100 else
00101 fname = prefix + "_affdict.db";
00102 aff_dict_db.SetEnv(env);
00103 aff_dict_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00104
00105 if (group_tables_for_queue)
00106 tname = "affdict_token";
00107 else
00108 fname = prefix + "_affdict_token.idx";
00109 aff_dict_token_idx.SetEnv(env);
00110 aff_dict_token_idx.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00111
00112 if (group_tables_for_queue)
00113 tname = "tag";
00114 else
00115 fname = prefix + "_tag.idx";
00116 tag_db.SetEnv(env);
00117 tag_db.SetPageSize(32*1024);
00118 tag_db.RevSplitOff();
00119 tag_db.Open(fname, tname, CBDB_RawFile::eReadWriteCreate);
00120
00121 } catch (CBDB_ErrnoException&) {
00122 throw;
00123 }
00124 }
00125
00126
00127 void SQueueDbBlock::Close()
00128 {
00129 tag_db.Close();
00130 aff_dict_token_idx.Close();
00131 aff_dict_db.Close();
00132 affinity_idx.Close();
00133 deleted_jobs_db.Close();
00134 runs_db.Close();
00135 job_info_db.Close();
00136 job_db.Close();
00137 }
00138
00139
00140 void SQueueDbBlock::Truncate()
00141 {
00142 tag_db.SafeTruncate();
00143 aff_dict_token_idx.SafeTruncate();
00144 aff_dict_db.SafeTruncate();
00145 affinity_idx.SafeTruncate();
00146 deleted_jobs_db.SafeTruncate();
00147 runs_db.SafeTruncate();
00148 job_info_db.SafeTruncate();
00149 job_db.SafeTruncate();
00150
00151 CBDB_Env& env = *job_db.GetEnv();
00152 env.ForceTransactionCheckpoint();
00153 env.CleanLog();
00154 }
00155
00156
00157
00158
00159 CQueueDbBlockArray::CQueueDbBlockArray()
00160 : m_Count(0), m_Array(0)
00161 {
00162 };
00163
00164
00165 CQueueDbBlockArray::~CQueueDbBlockArray()
00166 {
00167 }
00168
00169
00170 void CQueueDbBlockArray::Init(CBDB_Env& env, const string& path,
00171 unsigned count)
00172 {
00173 m_Count = count;
00174 m_Array = new SQueueDbBlock[m_Count];
00175 for (unsigned n = 0; n < m_Count; ++n) {
00176 m_Array[n].Open(env, path, n);
00177 }
00178 }
00179
00180
00181 void CQueueDbBlockArray::Close()
00182 {
00183 for (unsigned n = 0; n < m_Count; ++n) {
00184 m_Array[n].Close();
00185 }
00186 delete [] m_Array;
00187 m_Array = 0;
00188 m_Count = 0;
00189 }
00190
00191
00192 int CQueueDbBlockArray::Allocate()
00193 {
00194 for (unsigned n = 0; n < m_Count; ++n) {
00195 if (!m_Array[n].allocated) {
00196 m_Array[n].allocated = true;
00197 return n;
00198 }
00199 }
00200 return -1;
00201 }
00202
00203
00204 bool CQueueDbBlockArray::Allocate(int pos)
00205 {
00206 if (pos < 0 || pos >= int(m_Count)) return false;
00207 if (m_Array[pos].allocated) return false;
00208 m_Array[pos].allocated = true;
00209 return true;
00210 }
00211
00212
00213 SQueueDbBlock* CQueueDbBlockArray::Get(int pos)
00214 {
00215 if (pos < 0 || unsigned(pos) >= m_Count) return 0;
00216 return &m_Array[pos];
00217 }
00218
00219
00220
00221
00222 void SQueueParameters::Read(const IRegistry& reg, const string& sname)
00223 {
00224
00225 #define GetIntNoErr(name, dflt) reg.GetInt(sname, name, dflt, 0, IRegistry::eReturn)
00226
00227 timeout = GetIntNoErr("timeout", 3600);
00228
00229 notif_timeout = GetIntNoErr("notif_timeout", 7);
00230 run_timeout = GetIntNoErr("run_timeout", timeout);
00231 run_timeout_precision = GetIntNoErr("run_timeout_precision", run_timeout);
00232
00233 program_name = reg.GetString(sname, "program", kEmptyStr);
00234
00235 delete_done = reg.GetBool(sname, "delete_done", false,
00236 0, IRegistry::eReturn);
00237
00238 failed_retries = GetIntNoErr("failed_retries", 0);
00239
00240 blacklist_time = GetIntNoErr("blacklist_time", 0);
00241
00242 empty_lifetime = GetIntNoErr("empty_lifetime", -1);
00243
00244 string s = reg.GetString(sname, "max_input_size", kEmptyStr);
00245 max_input_size = kNetScheduleMaxDBDataSize;
00246 try {
00247 max_input_size = (unsigned) NStr::StringToUInt8_DataSize(s);
00248 }
00249 catch (CStringException&) {}
00250 max_input_size = min(kNetScheduleMaxOverflowSize, max_input_size);
00251
00252 s = reg.GetString(sname, "max_output_size", kEmptyStr);
00253 max_output_size = kNetScheduleMaxDBDataSize;
00254 try {
00255 max_output_size = (unsigned) NStr::StringToUInt8_DataSize(s);
00256 }
00257 catch (CStringException&) {}
00258 max_output_size = min(kNetScheduleMaxOverflowSize, max_output_size);
00259
00260 deny_access_violations = reg.GetBool(sname, "deny_access_violations", false,
00261 0, IRegistry::eReturn);
00262 log_access_violations = reg.GetBool(sname, "log_access_violations", true,
00263 0, IRegistry::eReturn);
00264 log_job_state = GetIntNoErr("log_job_state", 0);
00265
00266 subm_hosts = reg.GetString(sname, "subm_host", kEmptyStr);
00267 wnode_hosts = reg.GetString(sname, "wnode_host", kEmptyStr);
00268 }
00269
00270
00271
00272
00273
00274 CNSTagMap::CNSTagMap()
00275 {
00276 }
00277
00278
00279 CNSTagMap::~CNSTagMap()
00280 {
00281 NON_CONST_ITERATE(TNSTagMap, it, m_TagMap) {
00282 if (it->second) {
00283 delete it->second;
00284 it->second = 0;
00285 }
00286 }
00287 }
00288
00289
00290
00291 CQueueEnumCursor::CQueueEnumCursor(CQueue* queue)
00292 : CBDB_FileCursor(queue->m_JobDB)
00293 {
00294 SetCondition(CBDB_FileCursor::eFirst);
00295 }
00296
00297
00298
00299
00300
00301 CQueue::CQueue(CRequestExecutor& executor,
00302 const string& queue_name,
00303 const string& qclass_name,
00304 TQueueKind queue_kind)
00305 :
00306 m_RunTimeLine(NULL),
00307 m_HasNotificationPort(false),
00308 m_DeleteDatabase(false),
00309 m_Executor(executor),
00310 m_QueueName(queue_name),
00311 m_QueueClass(qclass_name),
00312 m_Kind(queue_kind),
00313 m_QueueDbBlock(0),
00314
00315 m_BecameEmpty(-1),
00316 m_WorkerNodeList(queue_name),
00317 m_AffWrapped(false),
00318 m_CurrAffId(0),
00319 m_LastAffId(0),
00320
00321 m_ParamLock(CRWLock::fFavorWriters),
00322 m_Timeout(3600),
00323 m_NotifyTimeout(7),
00324 m_DeleteDone(false),
00325 m_RunTimeout(3600),
00326 m_RunTimeoutPrecision(-1),
00327 m_FailedRetries(0),
00328 m_BlacklistTime(0),
00329 m_EmptyLifetime(0),
00330 m_MaxInputSize(kNetScheduleMaxDBDataSize),
00331 m_MaxOutputSize(kNetScheduleMaxDBDataSize),
00332 m_DenyAccessViolations(false),
00333 m_LogAccessViolations(true),
00334 m_LogJobState(0)
00335 {
00336 _ASSERT(!queue_name.empty());
00337 for (TStatEvent n = 0; n < eStatNumEvents; ++n) {
00338 m_EventCounter[n].Set(0);
00339 m_Average[n] = 0;
00340 }
00341 m_StatThread.Reset(new CStatisticsThread(*this));
00342 m_StatThread->Run();
00343 m_LastId.Set(0);
00344 m_GroupLastId.Set(0);
00345 }
00346
00347 CQueue::~CQueue()
00348 {
00349 delete m_RunTimeLine;
00350 Detach();
00351 m_StatThread->RequestStop();
00352 m_StatThread->Join(NULL);
00353 }
00354
00355
00356 void CQueue::Attach(SQueueDbBlock* block)
00357 {
00358 Detach();
00359 m_QueueDbBlock = block;
00360 m_AffinityDict.Attach(&m_QueueDbBlock->aff_dict_db,
00361 &m_QueueDbBlock->aff_dict_token_idx);
00362 }
00363
00364 class CTruncateRequest : public CStdRequest
00365 {
00366 public:
00367 CTruncateRequest(SQueueDbBlock* qdbblock)
00368 : m_QueueDbBlock(qdbblock)
00369 {}
00370 virtual void Process() {
00371
00372 CStopWatch sw(CStopWatch::eStart);
00373 m_QueueDbBlock->Truncate();
00374 m_QueueDbBlock->allocated = false;
00375 LOG_POST(Info << "Clean up of db block "
00376 << m_QueueDbBlock->pos
00377 << " complete, "
00378 << sw.Elapsed()
00379 << " elapsed");
00380 }
00381 private:
00382 SQueueDbBlock* m_QueueDbBlock;
00383
00384 };
00385
00386
00387 void CQueue::Detach()
00388 {
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398 m_AffinityDict.Detach();
00399 if (!m_QueueDbBlock) return;
00400 if (m_DeleteDatabase) {
00401 CRef<CStdRequest> request(new CTruncateRequest(m_QueueDbBlock));
00402 m_Executor.SubmitRequest(request);
00403 m_DeleteDatabase = false;
00404 } else
00405 m_QueueDbBlock->allocated = false;
00406 m_QueueDbBlock = 0;
00407 }
00408
00409
00410 void CQueue::SetParameters(const SQueueParameters& params)
00411 {
00412
00413 CWriteLockGuard guard(m_ParamLock);
00414 m_Timeout = params.timeout;
00415 m_NotifyTimeout = params.notif_timeout;
00416 m_DeleteDone = params.delete_done;
00417
00418 m_RunTimeout = params.run_timeout;
00419 if (params.run_timeout && !m_RunTimeLine) {
00420
00421 m_RunTimeLine =
00422 new CJobTimeLine(params.run_timeout_precision, 0);
00423 m_RunTimeoutPrecision = params.run_timeout_precision;
00424 }
00425
00426 m_FailedRetries = params.failed_retries;
00427 m_BlacklistTime = params.blacklist_time;
00428 m_EmptyLifetime = params.empty_lifetime;
00429 m_MaxInputSize = params.max_input_size;
00430 m_MaxOutputSize = params.max_output_size;
00431 m_DenyAccessViolations = params.deny_access_violations;
00432 m_LogAccessViolations = params.log_access_violations;
00433 m_LogJobState = params.log_job_state;
00434
00435 m_ProgramVersionList.Clear();
00436 if (!params.program_name.empty()) {
00437 m_ProgramVersionList.AddClientInfo(params.program_name);
00438 }
00439 m_SubmHosts.SetHosts(params.subm_hosts);
00440 m_WnodeHosts.SetHosts(params.wnode_hosts);
00441 }
00442
00443
00444 CQueue::TParameterList CQueue::GetParameters() const
00445 {
00446 TParameterList parameters;
00447 CQueueParamAccessor qp(*this);
00448 unsigned nParams = qp.GetNumParams();
00449 for (unsigned n = 0; n < nParams; ++n) {
00450 parameters.push_back(
00451 pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
00452 }
00453 return parameters;
00454 }
00455
00456
00457 unsigned CQueue::LoadStatusMatrix()
00458 {
00459 unsigned queue_run_timeout = GetRunTimeout();
00460 EBDB_ErrCode err;
00461
00462 static EVectorId all_ids[] = { eVIJob, eVITag, eVIAffinity };
00463 TNSBitVector all_vects[] =
00464 { m_JobsToDelete, m_DeletedJobs, m_AffJobsToDelete };
00465 for (size_t i = 0; i < sizeof(all_ids) / sizeof(all_ids[0]); ++i) {
00466 m_DeletedJobsDB.id = all_ids[i];
00467 err = m_DeletedJobsDB.ReadVector(&all_vects[i]);
00468 if (err != eBDB_Ok && err != eBDB_NotFound) {
00469
00470 }
00471 all_vects[i].optimize();
00472 }
00473
00474
00475 TNSBitVector running_jobs;
00476 CBDB_FileCursor cur(m_JobDB);
00477 cur.InitMultiFetch(1024*1024);
00478 cur.SetCondition(CBDB_FileCursor::eGE);
00479 cur.From << 0;
00480
00481 unsigned recs = 0;
00482
00483 unsigned last_id = 0;
00484 unsigned group_last_id = 0;
00485 for (; cur.Fetch() == eBDB_Ok; ) {
00486 unsigned job_id = m_JobDB.id;
00487 if (m_JobsToDelete.test(job_id)) continue;
00488 int i_status = m_JobDB.status;
00489 if (i_status < (int) CNetScheduleAPI::ePending ||
00490 i_status >= (int) CNetScheduleAPI::eLastStatus)
00491 {
00492
00493 LOG_POST(Warning << "Job " << job_id
00494 << " has invalid status " << i_status
00495 << ", ignored.");
00496 continue;
00497 }
00498 TJobStatus status = TJobStatus(i_status);
00499 m_StatusTracker.SetExactStatusNoLock(job_id, status, true);
00500
00501 if (status == CNetScheduleAPI::eReading) {
00502 unsigned group_id = m_JobDB.read_group;
00503 x_AddToReadGroupNoLock(group_id, job_id);
00504 if (group_last_id < group_id) group_last_id = group_id;
00505 }
00506 if ((status == CNetScheduleAPI::eRunning ||
00507 status == CNetScheduleAPI::eReading) &&
00508 m_RunTimeLine) {
00509
00510
00511
00512
00513
00514
00515 m_RunTimeLine->AddObject(m_RunTimeLine->GetHead(), job_id);
00516 }
00517 if (status == CNetScheduleAPI::eRunning)
00518 running_jobs.set(job_id);
00519 if (last_id < job_id) last_id = job_id;
00520 ++recs;
00521 }
00522
00523 TNSBitVector::enumerator en(running_jobs.first());
00524 for (; en.valid(); ++en) {
00525 unsigned job_id = *en;
00526 CJob job;
00527 CJob::EJobFetchResult res = job.Fetch(this, job_id);
00528 if (res != CJob::eJF_Ok) {
00529
00530 ERR_POST(Error << "Can't read job " << DecorateJobId(job_id)
00531 << " while loading status matrix");
00532 continue;
00533 }
00534 const CJobRun* run = job.GetLastRun();
00535 if (!run) {
00536 ERR_POST(Error << "No job run for Running job "
00537 << DecorateJobId(job_id)
00538 << " while loading status matrix");
00539 continue;
00540 }
00541 if (run->GetStatus() != CNetScheduleAPI::eRunning)
00542 continue;
00543
00544
00545 if (m_WorkerNodeList.FindJobById(job_id) != NULL)
00546 continue;
00547
00548
00549
00550
00551
00552
00553 TWorkerNodeRef worker_node(m_WorkerNodeList.FindWorkerNodeByAddress(
00554 run->GetNodeAddr(), run->GetNodePort()));
00555
00556 if (worker_node == NULL) {
00557 worker_node = new CWorkerNode_Real(run->GetNodeAddr());
00558
00559 m_WorkerNodeList.RegisterNode(worker_node);
00560 m_WorkerNodeList.SetId(worker_node, run->GetNodeId());
00561 m_WorkerNodeList.SetPort(worker_node, run->GetNodePort());
00562 }
00563
00564 unsigned run_timeout = job.GetRunTimeout();
00565 if (run_timeout == 0)
00566 run_timeout = queue_run_timeout;
00567 unsigned exp_time = run->GetTimeStart() + run_timeout;
00568
00569 m_WorkerNodeList.AddJob(worker_node, job, exp_time, 0, false);
00570 }
00571 m_LastId.Set(last_id);
00572 m_GroupLastId.Set(group_last_id);
00573 return recs;
00574 }
00575
00576
00577 static void s_LogSubmit(CQueue& q,
00578 CJob& job,
00579 SQueueDescription& qdesc)
00580 {
00581 CRequestContext& ctx = CDiagContext::GetRequestContext();
00582 if (!job.GetClientIP().empty())
00583 ctx.SetClientIP(job.GetClientIP());
00584 ctx.SetSessionID(job.GetClientSID());
00585
00586 CDiagContext_Extra extra = GetDiagContext().Extra()
00587 .Print("action", "submit")
00588 .Print("queue", q.GetQueueName())
00589 .Print("job_id", NStr::UIntToString(job.GetId()))
00590 .Print("input", job.GetInput())
00591 .Print("aff", job.GetAffinityToken())
00592 .Print("mask", NStr::UIntToString(job.GetMask()))
00593 .Print("progress_msg", job.GetProgressMsg())
00594 .Print("subm_addr", FormatHostName(job.GetSubmAddr(), &qdesc))
00595 .Print("subm_port", NStr::UIntToString(job.GetSubmPort()))
00596 .Print("subm_timeout", NStr::UIntToString(job.GetSubmTimeout()));
00597 ITERATE(TNSTagList, it, job.GetTags()) {
00598 extra.Print(string("tag_")+it->first, it->second);
00599 }
00600 }
00601
00602
00603 unsigned CQueue::Submit(CJob& job)
00604 {
00605 unsigned max_input_size;
00606 unsigned log_job_state;
00607 {{
00608 CQueueParamAccessor qp(*this);
00609 log_job_state = qp.GetLogJobState();
00610 max_input_size = qp.GetMaxInputSize();
00611 }}
00612 if (job.GetInput().size() > max_input_size)
00613 NCBI_THROW(CNetScheduleException, eDataTooLong,
00614 "Input is too long");
00615
00616 bool was_empty = !m_StatusTracker.AnyPending();
00617
00618 unsigned job_id = GetNextId();
00619 job.SetId(job_id);
00620 job.SetTimeSubmit(time(0));
00621
00622
00623 unsigned mask = job.GetMask();
00624 if (mask & CNetScheduleAPI::eOutOfOrder)
00625 {
00626
00627
00628
00629
00630
00631 }
00632
00633 CNS_Transaction trans(this);
00634 unsigned affinity_id;
00635 {{
00636 CAffinityDictGuard aff_dict_guard(GetAffinityDict(), trans);
00637 job.CheckAffinityToken(aff_dict_guard);
00638 affinity_id = job.GetAffinityId();
00639 CQueueGuard guard(this, &trans);
00640
00641 job.Flush(this);
00642
00643 CountEvent(CQueue::eStatDBWriteEvent, 1);
00644
00645
00646
00647 if (affinity_id)
00648 AddJobsToAffinity(trans, affinity_id, job_id);
00649
00650
00651 CNSTagMap tag_map;
00652 AppendTags(tag_map, job.GetTags(), job_id);
00653 FlushTags(tag_map, trans);
00654 }}
00655
00656 trans.Commit();
00657 m_StatusTracker.SetStatus(job_id, CNetScheduleAPI::ePending);
00658 if (log_job_state >= 1) {
00659 CRequestContext rq;
00660 rq.SetRequestID(CRequestContext::GetNextRequestID());
00661
00662 CDiagContext::SetRequestContext(&rq);
00663
00664 SQueueDescription qdesc;
00665 s_LogSubmit(*this, job, qdesc);
00666 CDiagContext::SetRequestContext(0);
00667 }
00668 if (was_empty) NotifyListeners(true, affinity_id);
00669
00670 return job_id;
00671 }
00672
00673
00674 unsigned CQueue::SubmitBatch(vector<CJob>& batch)
00675 {
00676 unsigned max_input_size;
00677 unsigned log_job_state;
00678 {{
00679 CQueueParamAccessor qp(*this);
00680 log_job_state = qp.GetLogJobState();
00681 max_input_size = qp.GetMaxInputSize();
00682 }}
00683
00684 SQueueDescription qdesc;
00685
00686 if (log_job_state >= 1) {
00687 GetDiagContext().PrintRequestStart()
00688 .Print("action", "submit_batch")
00689 .Print("size", NStr::UIntToString(batch.size()));
00690 }
00691
00692 bool was_empty = !m_StatusTracker.AnyPending();
00693
00694 unsigned job_id = GetNextIdBatch(batch.size());
00695
00696 unsigned batch_aff_id = 0;
00697 bool batch_has_aff = false;
00698
00699
00700 {{
00701 CNS_Transaction trans(this);
00702 CAffinityDictGuard aff_dict_guard(GetAffinityDict(), trans);
00703 for (unsigned i = 0; i < batch.size(); ++i) {
00704 CJob& job = batch[i];
00705 if (job.GetAffinityId() == (unsigned)kMax_I4) {
00706 _ASSERT(i > 0);
00707 unsigned prev_aff_id = 0;
00708 if (i > 0) {
00709 prev_aff_id = batch[i-1].GetAffinityId();
00710 if (!prev_aff_id) {
00711 prev_aff_id = 0;
00712 LOG_POST(Warning << "Reference to empty previous "
00713 "affinity token");
00714 }
00715 } else {
00716 LOG_POST(Warning << "First job in batch cannot have "
00717 "reference to previous affinity token");
00718 }
00719 job.SetAffinityId(prev_aff_id);
00720 } else {
00721 if (job.HasAffinityToken()) {
00722 job.CheckAffinityToken(aff_dict_guard);
00723 batch_has_aff = true;
00724 batch_aff_id = (i == 0 )? job.GetAffinityId() : 0;
00725 } else {
00726 batch_aff_id = 0;
00727 }
00728
00729 }
00730 }
00731 trans.Commit();
00732 }}
00733
00734 CNS_Transaction trans(this);
00735 {{
00736 CNSTagMap tag_map;
00737 CQueueGuard guard(this, &trans);
00738
00739 unsigned job_id_cnt = job_id;
00740 time_t now = time(0);
00741 int aux_inserts = 0;
00742 NON_CONST_ITERATE(vector<CJob>, it, batch) {
00743 if (it->GetInput().size() > max_input_size)
00744 NCBI_THROW(CNetScheduleException, eDataTooLong,
00745 "Input is too long");
00746 unsigned cur_job_id = job_id_cnt++;
00747 it->SetId(cur_job_id);
00748 it->SetTimeSubmit(now);
00749 it->Flush(this);
00750
00751 AppendTags(tag_map, it->GetTags(), cur_job_id);
00752 if (log_job_state >= 2) {
00753 s_LogSubmit(*this, *it, qdesc);
00754 }
00755 }
00756 CountEvent(CQueue::eStatDBWriteEvent,
00757 batch.size() + aux_inserts);
00758
00759
00760 if (batch_has_aff) {
00761 if (batch_aff_id) {
00762 AddJobsToAffinity(trans,
00763 batch_aff_id,
00764 job_id,
00765 job_id + batch.size() - 1);
00766 } else {
00767 AddJobsToAffinity(trans, batch);
00768 }
00769 }
00770 FlushTags(tag_map, trans);
00771 }}
00772 trans.Commit();
00773 m_StatusTracker.AddPendingBatch(job_id, job_id + batch.size() - 1);
00774 if (log_job_state >= 1)
00775 GetDiagContext().PrintRequestStop();
00776
00777
00778
00779
00780 if (was_empty) NotifyListeners(true, batch_aff_id);
00781
00782 return job_id;
00783 }
00784
00785 static const unsigned k_max_dead_locks = 100;
00786
00787 void CQueue::PutResultGetJob(
00788 CWorkerNode* worker_node,
00789
00790 unsigned done_job_id,
00791 int ret_code,
00792 const string* output,
00793
00794
00795 CRequestContextFactory* rec_ctx_f,
00796 const list<string>* aff_list,
00797
00798 CJob* new_job)
00799 {
00800
00801 _ASSERT(!done_job_id || output);
00802 _ASSERT(!new_job || (rec_ctx_f && aff_list));
00803
00804 bool delete_done;
00805 unsigned max_output_size;
00806 unsigned run_timeout;
00807 {{
00808 CQueueParamAccessor qp(*this);
00809 delete_done = qp.GetDeleteDone();
00810 max_output_size = qp.GetMaxOutputSize();
00811 run_timeout = qp.GetRunTimeout();
00812 }}
00813
00814 if (done_job_id && output->size() > max_output_size) {
00815 NCBI_THROW(CNetScheduleException, eDataTooLong,
00816 "Output is too long");
00817 }
00818
00819 unsigned dead_locks = 0;
00820
00821 time_t curr = time(0);
00822
00823
00824 bool need_update = false;
00825 CQueueJSGuard js_guard(this, done_job_id,
00826 CNetScheduleAPI::eDone,
00827 &need_update);
00828
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849 unsigned pending_job_id = 0;
00850 if (new_job)
00851 pending_job_id =
00852 FindPendingJob(worker_node, *aff_list, curr);
00853 bool done_rec_updated = false;
00854 CJob job;
00855
00856
00857
00858
00859
00860
00861
00862 for (;;) {
00863 try {
00864 CNS_Transaction trans(this);
00865
00866 EGetJobUpdateStatus upd_status = eGetJobUpdate_Ok;
00867 {{
00868 CQueueGuard guard(this, &trans);
00869
00870 if (need_update) {
00871 done_rec_updated = x_UpdateDB_PutResultNoLock(
00872 done_job_id, curr, delete_done,
00873 ret_code, *output, job);
00874 }
00875
00876 if (pending_job_id) {
00877
00878
00879
00880 upd_status =
00881 x_UpdateDB_GetJobNoLock(worker_node,
00882 curr, pending_job_id,
00883 *new_job);
00884 }
00885 }}
00886
00887 trans.Commit();
00888 js_guard.Commit();
00889
00890 switch (upd_status) {
00891 case eGetJobUpdate_JobFailed:
00892 m_StatusTracker.ChangeStatus(pending_job_id,
00893 CNetScheduleAPI::eFailed);
00894
00895 case eGetJobUpdate_JobStopped:
00896 case eGetJobUpdate_NotFound:
00897 pending_job_id = 0;
00898 break;
00899 case eGetJobUpdate_Ok:
00900 break;
00901 default:
00902 _ASSERT(0);
00903 }
00904
00905
00906 if (done_rec_updated)
00907 RemoveJobFromWorkerNode(job, eNSCDone);
00908 if (pending_job_id)
00909 AddJobToWorkerNode(worker_node, rec_ctx_f,
00910 *new_job, curr + run_timeout);
00911 break;
00912 }
00913 catch (CBDB_ErrnoException& ex) {
00914 if (ex.IsDeadLock()) {
00915 if (++dead_locks < k_max_dead_locks) {
00916 if (IsMonitoring()) {
00917 MonitorPost(
00918 "DeadLock repeat in CQueue::JobExchange");
00919 }
00920 SleepMilliSec(250);
00921 continue;
00922 }
00923 } else if (ex.IsNoMem()) {
00924 if (++dead_locks < k_max_dead_locks) {
00925 if (IsMonitoring()) {
00926 MonitorPost(
00927 "No resource repeat in CQueue::JobExchange");
00928 }
00929 SleepMilliSec(250);
00930 continue;
00931 }
00932 } else {
00933 throw;
00934 }
00935 ERR_POST("Too many transaction repeats in CQueue::JobExchange.");
00936 throw;
00937 }
00938 }
00939
00940 unsigned job_aff_id;
00941 if (new_job && (job_aff_id = new_job->GetAffinityId())) {
00942 CStopWatch sw(CStopWatch::eStart);
00943 time_t exp_time = run_timeout ? curr + 2*run_timeout : 0;
00944 AddAffinity(worker_node, job_aff_id, exp_time);
00945
00946 }
00947
00948 TimeLineExchange(done_job_id, pending_job_id, curr + run_timeout);
00949
00950 if (done_rec_updated && job.ShouldNotify(curr)) {
00951 Notify(job.GetSubmAddr(), job.GetSubmPort(), done_job_id);
00952 }
00953
00954 if (IsMonitoring()) {
00955 CTime tmp_t(CTime::eCurrent);
00956 string msg = tmp_t.AsString();
00957
00958 msg += " CQueue::PutResultGetJob()";
00959 if (done_job_id) {
00960 msg += " (PUT) job id=";
00961 msg += NStr::IntToString(done_job_id);
00962 msg += " ret_code=";
00963 msg += NStr::IntToString(ret_code);
00964 msg += " output=\"";
00965 msg += *output + '"';
00966 }
00967 if (pending_job_id) {
00968 msg += " (GET) job id=";
00969 msg += NStr::IntToString(pending_job_id);
00970 msg += " worker_node=";
00971
00972 msg += worker_node->GetId();
00973 }
00974 MonitorPost(msg);
00975 }
00976
00977
00978 if (!pending_job_id && aff_list && aff_list->size()) {
00979 NCBI_THROW(CNetScheduleException, eNoJobsWithAffinity,
00980 GetAffinityList());
00981 }
00982 }
00983
00984
00985 void CQueue::JobDelayExpiration(CWorkerNode* worker_node,
00986 unsigned job_id,
00987 time_t tm)
00988 {
00989 unsigned queue_run_timeout = GetRunTimeout();
00990
00991 if (tm <= 0) return;
00992
00993 time_t run_timeout = 0;
00994 time_t time_start = 0;
00995
00996 TJobStatus st = GetJobStatus(job_id);
00997 if (st != CNetScheduleAPI::eRunning)
00998 return;
00999
01000 CJob job;
01001 CNS_Transaction trans(this);
01002
01003 time_t exp_time = 0;
01004 time_t curr = time(0);
01005
01006 bool job_updated = false;
01007 {{
01008 CQueueGuard guard(this, &trans);
01009
01010 CJob::EJobFetchResult res;
01011 if ((res = job.Fetch(this, job_id)) != CJob::eJF_Ok)
01012 return;
01013
01014 CJobRun* run = job.GetLastRun();
01015 if (!run) {
01016 ERR_POST(Error << "No JobRun for running job "
01017 << DecorateJobId(job_id));
01018
01019 run = &job.AppendRun();
01020 job_updated = true;
01021 }
01022
01023 time_start = run->GetTimeStart();
01024 if (time_start == 0) {
01025
01026 ERR_POST(Error
01027 << "Internal error: time_start == 0 for running job id="
01028 << DecorateJobId(job_id));
01029
01030 time_start = curr;
01031 run->SetTimeStart(curr);
01032 job_updated = true;
01033 }
01034 run_timeout = job.GetRunTimeout();
01035 if (run_timeout == 0) run_timeout = queue_run_timeout;
01036
01037 if (time_start + run_timeout > curr + tm) {
01038
01039
01040 if (job_updated) job.Flush(this);
01041 return;
01042 }
01043
01044 job.SetRunTimeout(curr + tm - time_start);
01045 job.Flush(this);
01046 }}
01047
01048 trans.Commit();
01049 UpdateWorkerNodeJob(job_id, curr + tm);
01050
01051 exp_time = run_timeout == 0 ? 0 : time_start + run_timeout;
01052
01053 TimeLineMove(job_id, exp_time, curr + tm);
01054
01055 if (IsMonitoring()) {
01056 CTime tmp_t(CTime::eCurrent);
01057 string msg = tmp_t.AsString();
01058 msg += " CQueue::JobDelayExpiration: Job id=";
01059 msg += NStr::IntToString(job_id);
01060 tmp_t.SetTimeT(curr + tm);
01061 tmp_t.ToLocalTime();
01062 msg += " new_expiration_time=";
01063 msg += tmp_t.AsString();
01064 msg += " job_timeout(sec)=";
01065 msg += NStr::Int8ToString(run_timeout);
01066 msg += " job_timeout(minutes)=";
01067 msg += NStr::Int8ToString(run_timeout/60);
01068
01069 MonitorPost(msg);
01070 }
01071 return;
01072 }
01073
01074
01075 bool CQueue::PutProgressMessage(unsigned job_id,
01076 const string& msg)
01077 {
01078 CJob job;
01079 CNS_Transaction trans(this);
01080 {{
01081 CQueueGuard guard(this, &trans);
01082
01083 CJob::EJobFetchResult res = job.Fetch(this, job_id);
01084 if (res != CJob::eJF_Ok) return false;
01085 job.SetProgressMsg(msg);
01086 job.Flush(this);
01087 }}
01088 trans.Commit();
01089
01090 if (IsMonitoring()) {
01091 CTime tmp_t(CTime::eCurrent);
01092 string mmsg = tmp_t.AsString();
01093 mmsg += " CQueue::PutProgressMessage() job id=";
01094 mmsg += NStr::IntToString(job_id);
01095 mmsg += " msg=";
01096 mmsg += msg;
01097
01098 MonitorPost(mmsg);
01099 }
01100 return true;
01101 }
01102
01103
01104 void CQueue::ReturnJob(unsigned job_id)
01105 {
01106
01107
01108 if (!job_id) return;
01109
01110 CQueueJSGuard js_guard(this, job_id, CNetScheduleAPI::ePending);
01111 TJobStatus st = js_guard.GetOldStatus();
01112
01113 if (st != CNetScheduleAPI::eRunning)
01114 return;
01115
01116 CJob job;
01117 CNS_Transaction trans(this);
01118 {{
01119 CQueueGuard guard(this, &trans);
01120
01121 CJob::EJobFetchResult res = job.Fetch(this, job_id);
01122 if (res != CJob::eJF_Ok)
01123 return;
01124
01125 job.SetStatus(CNetScheduleAPI::ePending);
01126 unsigned run_count = job.GetRunCount();
01127 CJobRun* run = job.GetLastRun();
01128 if (!run) {
01129 ERR_POST(Error << "No JobRun for running job "
01130 << DecorateJobId(job_id));
01131 run = &job.AppendRun();
01132 }
01133
01134
01135 run->SetStatus(CNetScheduleAPI::eReturned);
01136 run->SetTimeDone(time(0));
01137
01138 if (run_count) {
01139 job.SetRunCount(run_count-1);
01140 }
01141 job.Flush(this);
01142 }}
01143 trans.Commit();
01144 js_guard.Commit();
01145 RemoveJobFromWorkerNode(job, eNSCReturned);
01146 TimeLineRemove(job_id);
01147
01148 if (IsMonitoring()) {
01149 CTime tmp_t(CTime::eCurrent);
01150 string msg = tmp_t.AsString();
01151 msg += " CQueue::ReturnJob: job id=";
01152 msg += NStr::IntToString(job_id);
01153 MonitorPost(msg);
01154 }
01155 }
01156
01157
01158 bool
01159 CQueue::GetJobDescr(unsigned job_id,
01160 int* ret_code,
01161 string* input,
01162 string* output,
01163 string* err_msg,
01164 string* progress_msg,
01165 TJobStatus expected_status)
01166 {
01167 for (unsigned i = 0; i < 3; ++i) {
01168 if (i) {
01169
01170
01171 if (IsMonitoring()) {
01172 MonitorPost(string("GetJobDescr sleep for job_id ") +
01173 NStr::IntToString(job_id));
01174 }
01175 SleepMilliSec(300);
01176 }
01177
01178 CJob job;
01179 CJob::EJobFetchResult res;
01180 {{
01181 CQueueGuard guard(this);
01182 res = job.Fetch(this, job_id);
01183 }}
01184 if (res == CJob::eJF_Ok) {
01185 if (expected_status != CNetScheduleAPI::eJobNotFound) {
01186 TJobStatus status = job.GetStatus();
01187 if (status != expected_status) {
01188
01189 continue;
01190 }
01191 }
01192 const CJobRun *last_run = job.GetLastRun();
01193 if (ret_code && last_run)
01194 *ret_code = last_run->GetRetCode();
01195 if (input)
01196 *input = job.GetInput();
01197 if (output)
01198 *output = job.GetOutput();
01199 if (err_msg && last_run)
01200 *err_msg = last_run->GetErrorMsg();
01201 if (progress_msg)
01202 *progress_msg = job.GetProgressMsg();
01203
01204 return true;
01205 } else if (res == CJob::eJF_NotFound) {
01206 return false;
01207 }
01208 }
01209
01210 return false;
01211 }
01212
01213
01214 void CQueue::Truncate(void)
01215 {
01216 Clear();
01217
01218 IsExpired();
01219 }
01220
01221
// Cancel job_id if it is currently pending or running.
// js_guard provisionally sets the tracker status to eCanceled and is
// Commit()ed only at the very end; every early return below skips that
// Commit() (presumably the guard then restores the old status -- confirm in
// CQueueJSGuard).
void CQueue::Cancel(unsigned job_id)
{
    CQueueJSGuard js_guard(this, job_id,
                           CNetScheduleAPI::eCanceled);
    TJobStatus st = js_guard.GetOldStatus();
    // Only pending and running jobs can be canceled.
    if (st != CNetScheduleAPI::ePending &&
        st != CNetScheduleAPI::eRunning) {
        return;
    }
    CJob job;
    CNS_Transaction trans(this);
    {{
        CQueueGuard guard(this, &trans);
        CJob::EJobFetchResult res = job.Fetch(this, job_id);
        if (res != CJob::eJF_Ok) {
            // Job record unavailable: nothing to cancel.
            return;
        }

        // The persisted status must match what the status tracker reported.
        TJobStatus status = job.GetStatus();
        if (status != st) {
            ERR_POST(Error
                     << "Status mismatch for job " << DecorateJobId(job_id)
                     << " matrix " << st
                     << " db " << status);
            // Inconsistent state: leave both the DB and the tracker untouched.
            return;
        }
        // Record the cancellation on the last run, creating one if the job
        // has no runs yet.
        CJobRun* run = job.GetLastRun();
        if (!run) run = &job.AppendRun();
        run->SetStatus(CNetScheduleAPI::eCanceled);
        run->SetTimeDone(time(0));
        run->SetErrorMsg(string("Job canceled from ")
                         + CNetScheduleAPI::StatusToString(status)
                         + " state");
        job.SetStatus(CNetScheduleAPI::eCanceled);
        job.Flush(this);
    }}
    // DB first, then the in-memory tracker.
    trans.Commit();
    js_guard.Commit();

    TimeLineRemove(job_id);
}
01265
01266
01267 void CQueue::ForceReschedule(unsigned job_id)
01268 {
01269 CJob job;
01270 CNS_Transaction trans(this);
01271 {{
01272 CQueueGuard guard(this, &trans);
01273 CJob::EJobFetchResult res = job.Fetch(this, job_id);
01274
01275 if (res == CJob::eJF_Ok) {
01276 job.SetStatus(CNetScheduleAPI::ePending);
01277 job.SetRunCount(0);
01278 } else {
01279
01280 return;
01281 }
01282 }}
01283 trans.Commit();
01284
01285 m_StatusTracker.SetStatus(job_id, CNetScheduleAPI::ePending);
01286 }
01287
01288
01289 TJobStatus
01290 CQueue::GetJobStatus(unsigned job_id) const
01291 {
01292 return m_StatusTracker.GetStatus(job_id);
01293 }
01294
01295
01296 bool
01297 CQueue::CountStatus(CJobStatusTracker::TStatusSummaryMap* status_map,
01298 const string& affinity_token)
01299 {
01300 unsigned aff_id = 0;
01301 TNSBitVector aff_jobs;
01302 if (!affinity_token.empty()) {
01303 aff_id = m_AffinityDict.GetTokenId(affinity_token);
01304 if (!aff_id)
01305 return false;
01306 GetJobsWithAffinity(aff_id, &aff_jobs);
01307 }
01308
01309 m_StatusTracker.CountStatus(status_map, aff_id!=0 ? &aff_jobs : 0);
01310
01311 return true;
01312 }
01313
01314
01315 void CQueue::SetPort(unsigned short port)
01316 {
01317 if (port) {
01318 m_UdpSocket.SetReuseAddress(eOn);
01319 m_UdpSocket.Bind(port);
01320 m_HasNotificationPort = true;
01321 }
01322 }
01323
01324
01325 bool CQueue::IsExpired()
01326 {
01327 time_t empty_lifetime = GetEmptyLifetime();
01328 CQueueGuard guard(this);
01329 if (m_Kind && empty_lifetime > 0) {
01330 unsigned cnt = m_StatusTracker.Count();
01331 if (cnt) {
01332 m_BecameEmpty = -1;
01333 } else {
01334 if (m_BecameEmpty != -1 &&
01335 m_BecameEmpty + empty_lifetime < time(0))
01336 {
01337 LOG_POST(Info << "Queue " << m_QueueName << " expired."
01338 << " Became empty: "
01339 << CTime(m_BecameEmpty).ToLocalTime().AsString()
01340 << " Empty lifetime: " << empty_lifetime
01341 << " sec." );
01342 return true;
01343 }
01344 if (m_BecameEmpty == -1)
01345 m_BecameEmpty = time(0);
01346 }
01347 }
01348 return false;
01349 }
01350
01351
01352 unsigned int CQueue::GetNextId()
01353 {
01354 if (m_LastId.Get() >= CAtomicCounter::TValue(kMax_I4))
01355 m_LastId.Set(0);
01356 return (unsigned) m_LastId.Add(1);
01357 }
01358
01359
01360 unsigned int CQueue::GetNextIdBatch(unsigned count)
01361 {
01362
01363
01364
01365 if (m_LastId.Get() >= CAtomicCounter::TValue(kMax_I4 - count))
01366 m_LastId.Set(0);
01367 unsigned int id = (unsigned) m_LastId.Add(count);
01368 id = id - count + 1;
01369 return id;
01370 }
01371
01372
// Hand done jobs over to a reader.
// Jobs are switched eDone -> eReading in the status tracker, a new run is
// appended to each job record, and all of them are placed into one freshly
// allocated read group.
//   peer_addr - address recorded as the node address of each new run
//   count     - number of jobs requested from the status tracker
//   timeout   - reading timeout in seconds; when non-zero (and a run time
//               line exists) each job is added to it at curr + timeout
//   read_id   - [out] id of the new read group, or 0 if no job record
//               could be read
//   jobs      - [out] ids switched to eReading
void CQueue::ReadJobs(unsigned peer_addr,
                      unsigned count, unsigned timeout,
                      unsigned& read_id, TNSBitVector& jobs)
{
    unsigned group_id = 0;
    time_t curr = time(0);
    time_t exp_time = timeout ? curr + timeout : 0;

    m_StatusTracker.SwitchJobs(count,
                               CNetScheduleAPI::eDone, CNetScheduleAPI::eReading,
                               jobs);
    // The status switch is committed only after the DB transaction below.
    CNetSchedule_JSGroupGuard gr_guard(m_StatusTracker, CNetScheduleAPI::eDone, jobs);


    CJob job;
    CNS_Transaction trans(this);
    {{
        CQueueGuard guard(this, &trans);
        TNSBitVector::enumerator en(jobs.first());
        for (; en.valid(); ++en) {
            unsigned job_id = *en;
            CJob::EJobFetchResult res = job.Fetch(this, job_id);
            if (res != CJob::eJF_Ok) {
                // NOTE(review): the job stays in `jobs` (and in the read
                // group created below) even though its record was not updated.
                ERR_POST(Error << "Can't read job " << DecorateJobId(job_id));
                continue;
            }
            // Allocate the group id lazily, on the first readable job. The
            // Get()/Set() wrap check is not atomic (same pattern as GetNextId).
            if (!group_id) {
                if (m_GroupLastId.Get() >= CAtomicCounter::TValue(kMax_I4))
                    m_GroupLastId.Set(0);
                group_id = (unsigned) m_GroupLastId.Add(1);
            }

            // Reading counts as a new run of the job.
            unsigned run_count = job.GetRunCount() + 1;
            CJobRun& run = job.AppendRun();
            run.SetStatus(CNetScheduleAPI::eReading);
            run.SetTimeStart(curr);
            run.SetNodeAddr(peer_addr);
            job.SetStatus(CNetScheduleAPI::eReading);
            job.SetRunTimeout(timeout);
            job.SetRunCount(run_count);
            job.SetReadGroup(group_id);

            job.Flush(this);
            if (exp_time && m_RunTimeLine) {
                // The run time line has its own lock, taken while the queue
                // guard is held.
                CWriteLockGuard rtl_guard(m_RunTimeLineLock);
                m_RunTimeLine->AddObject(exp_time, job_id);
            }
        }
    }}
    trans.Commit();
    gr_guard.Commit();

    if (group_id)
        x_CreateReadGroup(group_id, jobs);
    read_id = group_id;
}
01432
// Move `jobs` of read group group_id from eReading to `status` and remove
// them from the group (the group is dropped once it becomes empty).
// Throws CNetScheduleException(eInvalidParameter) if the group is unknown
// or if some of `jobs` are not members of it.
// NOTE(review): the group map is updated before the DB transaction; if the
// transaction fails, group membership is not restored.
void CQueue::x_ChangeGroupStatus(unsigned group_id,
                                 const TNSBitVector& jobs,
                                 TJobStatus status)
{
    time_t curr = time(0);
    {{
        CFastMutexGuard guard(m_GroupMapLock);

        TGroupMap::iterator i = m_GroupMap.find(group_id);
        if (i == m_GroupMap.end())
            NCBI_THROW(CNetScheduleException, eInvalidParameter,
                       "No such read group");

        TNSBitVector& bv = (*i).second;

        // Every requested job must belong to the group.
        TNSBitVector check(jobs);
        check -= bv;
        if (check.any())
            NCBI_THROW(CNetScheduleException, eInvalidParameter,
                       "Jobs do not belong to group");

        // Detach the jobs; erase the group when nothing is left in it.
        bv -= jobs;
        if (!bv.any())
            m_GroupMap.erase(i);
    }}

    // Switch eReading -> status in the tracker; committed only after the
    // DB transaction below.
    CNetSchedule_JSGroupGuard gr_guard(m_StatusTracker,
                                       CNetScheduleAPI::eReading,
                                       jobs,
                                       status);

    CJob job;
    CNS_Transaction trans(this);
    {{
        CQueueGuard guard(this, &trans);
        TNSBitVector::enumerator en(jobs.first());
        for (; en.valid(); ++en) {
            unsigned job_id = *en;
            CJob::EJobFetchResult res = job.Fetch(this, job_id);
            if (res != CJob::eJF_Ok) {
                ERR_POST(Error << "Can't read job " << DecorateJobId(job_id));
                continue;
            }

            // Close the reading run (ReadJobs appends one); synthesize a run
            // if it is missing.
            CJobRun* run = job.GetLastRun();
            if (!run) {
                ERR_POST(Error << "No JobRun for reading job "
                         << DecorateJobId(job_id));

                run = &job.AppendRun();
                run->SetTimeStart(curr);
            }
            run->SetStatus(status);
            run->SetTimeDone(curr);
            job.SetStatus(status);

            job.Flush(this);
            if (m_RunTimeLine) {
                // The reading timeout entry is no longer needed.
                CWriteLockGuard rtl_guard(m_RunTimeLineLock);
                m_RunTimeLine->RemoveObject(job_id);
            }
        }
    }}
    trans.Commit();
    gr_guard.Commit();

}
01511
01512
01513 void CQueue::ConfirmJobs(unsigned read_id, TNSBitVector& jobs)
01514 {
01515 x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eConfirmed);
01516 }
01517
01518
01519 void CQueue::FailReadingJobs(unsigned read_id, TNSBitVector& jobs)
01520 {
01521 x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eReadFailed);
01522 }
01523
01524
01525 void CQueue::ReturnReadingJobs(unsigned read_id, TNSBitVector& jobs)
01526 {
01527 x_ChangeGroupStatus(read_id, jobs, CNetScheduleAPI::eDone);
01528 }
01529
01530
// Erase a single job: drop it from the status tracker, mark it in the three
// pending-deletion vectors (persisted by FlushDeletedVectors() under the
// eVIJob / eVITag / eVIAffinity ids), and remove it from the run time line.
void CQueue::EraseJob(unsigned job_id)
{
    m_StatusTracker.Erase(job_id);

    {{
        // Pending-deletion vectors and the affinity scan state are all
        // protected by m_JobsToDeleteLock.
        CFastMutexGuard jtd_guard(m_JobsToDeleteLock);

        m_JobsToDelete.set_bit(job_id);
        m_DeletedJobs.set_bit(job_id);
        m_AffJobsToDelete.set_bit(job_id);

        // Restart the affinity-index cleanup window at the current scan
        // position (state consumed by ClearAffinityIdx()).
        m_AffWrapped = false;
        m_LastAffId = m_CurrAffId;

        // Called without an explicit vector id (presumably eVIAll, i.e.
        // all three vectors -- confirm the default in the header).
        FlushDeletedVectors();
    }}
    TimeLineRemove(job_id);
    if (IsMonitoring()) {
        CTime tmp_t(CTime::eCurrent);
        string msg = tmp_t.AsString();
        msg += " CQueue::EraseJob() job id=";
        msg += NStr::IntToString(job_id);
        MonitorPost(msg);
    }

}
01560
01561
01562 void CQueue::Erase(const TNSBitVector& job_ids)
01563 {
01564 CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01565 m_JobsToDelete |= job_ids;
01566 m_DeletedJobs |= job_ids;
01567 m_AffJobsToDelete |= job_ids;
01568
01569
01570 m_AffWrapped = false;
01571 m_LastAffId = m_CurrAffId;
01572
01573 FlushDeletedVectors();
01574 }
01575
01576
01577 void CQueue::Clear()
01578 {
01579 TNSBitVector bv;
01580
01581 {{
01582 CWriteLockGuard rtl_guard(m_RunTimeLineLock);
01583
01584 m_StatusTracker.ClearAll(&bv);
01585 m_RunTimeLine->ReInit(0);
01586 }}
01587
01588 Erase(bv);
01589 }
01590
01591
01592 void CQueue::FlushDeletedVectors(EVectorId vid)
01593 {
01594 static EVectorId all_ids[] = { eVIJob, eVITag, eVIAffinity };
01595 TNSBitVector all_vects[] =
01596 { m_JobsToDelete, m_DeletedJobs, m_AffJobsToDelete };
01597 for (size_t i = 0; i < sizeof(all_ids) / sizeof(all_ids[0]); ++i) {
01598 if (vid != eVIAll && vid != all_ids[i]) continue;
01599 m_DeletedJobsDB.id = all_ids[i];
01600 TNSBitVector& bv = all_vects[i];
01601 bv.optimize();
01602 m_DeletedJobsDB.WriteVector(bv, SDeletedJobsDB::eNoCompact);
01603 }
01604 }
01605
01606
01607 void CQueue::FilterJobs(TNSBitVector& ids)
01608 {
01609 TNSBitVector alive_jobs;
01610 m_StatusTracker.GetAliveJobs(alive_jobs);
01611 ids &= alive_jobs;
01612
01613
01614 }
01615
01616 void CQueue::ClearAffinityIdx()
01617 {
01618 const unsigned kDeletedJobsThreshold = 10000;
01619 const unsigned kAffBatchSize = 1000;
01620
01621 unsigned curr_aff_id = 0;
01622 unsigned last_aff_id = 0;
01623 {{
01624
01625 CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01626
01627
01628
01629 if (m_AffJobsToDelete.count() < kDeletedJobsThreshold)
01630 return;
01631 curr_aff_id = m_CurrAffId;
01632 last_aff_id = m_LastAffId;
01633 }}
01634
01635 TNSBitVector bv(bm::BM_GAP);
01636
01637
01638 bool wrapped = curr_aff_id < last_aff_id;
01639
01640
01641 {{
01642 CFastMutexGuard guard(m_AffinityIdxLock);
01643 m_AffinityIdx.SetTransaction(NULL);
01644 CBDB_FileCursor cur(m_AffinityIdx);
01645 cur.SetCondition(CBDB_FileCursor::eGE);
01646 cur.From << curr_aff_id;
01647
01648 unsigned n = 0;
01649 EBDB_ErrCode ret;
01650 for (; (ret = cur.Fetch()) == eBDB_Ok && n < kAffBatchSize; ++n) {
01651 curr_aff_id = m_AffinityIdx.aff_id;
01652 if (wrapped && curr_aff_id >= last_aff_id)
01653 break;
01654 bv.set(curr_aff_id);
01655 }
01656 if (ret != eBDB_Ok) {
01657 if (ret != eBDB_NotFound)
01658 ERR_POST(Error << "Error reading affinity index: " << ret);
01659 if (wrapped) {
01660 curr_aff_id = last_aff_id;
01661 } else {
01662
01663 curr_aff_id = 0;
01664 wrapped = true;
01665 cur.SetCondition(CBDB_FileCursor::eGE);
01666 cur.From << curr_aff_id;
01667 for (; n < kAffBatchSize && (ret = cur.Fetch()) == eBDB_Ok; ++n) {
01668 curr_aff_id = m_AffinityIdx.aff_id;
01669 if (curr_aff_id >= last_aff_id)
01670 break;
01671 bv.set(curr_aff_id);
01672 }
01673 if (ret != eBDB_NotFound)
01674 ERR_POST(Error << "Error reading affinity index");
01675 }
01676 }
01677 }}
01678
01679 {{
01680 CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01681 m_AffWrapped = wrapped;
01682 m_CurrAffId = curr_aff_id;
01683 }}
01684
01685
01686 TNSBitVector::enumerator en(bv.first());
01687 for (; en.valid(); ++en) {
01688 unsigned aff_id = *en;
01689 CNS_Transaction trans(this);
01690 CFastMutexGuard guard(m_AffinityIdxLock);
01691 m_AffinityIdx.SetTransaction(&trans);
01692
01693 TNSBitVector bvect(bm::BM_GAP);
01694 m_AffinityIdx.aff_id = aff_id;
01695 EBDB_ErrCode ret =
01696 m_AffinityIdx.ReadVector(&bvect, bm::set_OR, NULL);
01697 if (ret != eBDB_Ok) {
01698 if (ret != eBDB_NotFound)
01699 ERR_POST(Error << "Error reading affinity index: " << ret);
01700 continue;
01701 }
01702 unsigned old_count = bvect.count();
01703 bvect -= m_AffJobsToDelete;
01704 unsigned new_count = bvect.count();
01705 if (new_count == old_count) {
01706 continue;
01707 }
01708 m_AffinityIdx.aff_id = aff_id;
01709 if (bvect.any()) {
01710 bvect.optimize();
01711 m_AffinityIdx.WriteVector(bvect, SAffinityIdx::eNoCompact);
01712 } else {
01713
01714
01715
01716
01717
01718
01719
01720
01721
01722 m_AffinityIdx.Delete();
01723 }
01724 trans.Commit();
01725
01726 }
01727
01728 {{
01729 CFastMutexGuard jtd_guard(m_JobsToDeleteLock);
01730 if (m_AffWrapped && m_CurrAffId >= m_LastAffId) {
01731 m_AffJobsToDelete.clear(true);
01732 FlushDeletedVectors(eVIAffinity);
01733 }
01734 }}
01735 }
01736
01737
01738 void CQueue::Notify(unsigned addr, unsigned short port, unsigned job_id)
01739 {
01740 if (!m_HasNotificationPort) return;
01741 char msg[1024];
01742 sprintf(msg, "JNTF %u", job_id);
01743
01744 CFastMutexGuard guard(m_UdpSocketLock);
01745
01746 m_UdpSocket.Send(msg, strlen(msg)+1,
01747 CSocketAPI::ntoa(addr), port);
01748 }
01749
01750
01751 void CQueue::OptimizeMem()
01752 {
01753 m_StatusTracker.OptimizeMem();
01754 }
01755
01756
01757 void CQueue::AddJobsToAffinity(CBDB_Transaction& trans,
01758 unsigned aff_id,
01759 unsigned job_id_from,
01760 unsigned job_id_to)
01761
01762 {
01763 CFastMutexGuard guard(m_AffinityIdxLock);
01764 m_AffinityIdx.SetTransaction(&trans);
01765 TNSBitVector bv(bm::BM_GAP);
01766
01767
01768
01769
01770 m_AffinityIdx.aff_id = aff_id;
01771
01772 m_AffinityIdx.ReadVector(&bv, bm::set_OR, NULL);
01773 if (job_id_to == 0) {
01774 bv.set(job_id_from);
01775 } else {
01776 bv.set_range(job_id_from, job_id_to);
01777 }
01778 m_AffinityIdx.aff_id = aff_id;
01779 m_AffinityIdx.WriteVector(bv, SAffinityIdx::eNoCompact);
01780 }
01781
01782
01783 void CQueue::AddJobsToAffinity(CBDB_Transaction& trans,
01784 const vector<CJob>& batch)
01785 {
01786 CFastMutexGuard guard(m_AffinityIdxLock);
01787 m_AffinityIdx.SetTransaction(&trans);
01788
01789
01790 typedef map<unsigned, TNSBitVector*> TBVMap;
01791
01792 TBVMap bv_map;
01793 try {
01794 unsigned bsize = batch.size();
01795 for (unsigned i = 0; i < bsize; ++i) {
01796 const CJob& job = batch[i];
01797 unsigned aff_id = job.GetAffinityId();
01798 unsigned job_id_start = job.GetId();
01799
01800 TNSBitVector* aff_bv;
01801
01802 TBVMap::iterator aff_it = bv_map.find(aff_id);
01803 if (aff_it == bv_map.end()) {
01804 auto_ptr<TNSBitVector> bv(new TNSBitVector(bm::BM_GAP));
01805 m_AffinityIdx.aff_id = aff_id;
01806
01807 m_AffinityIdx.ReadVector(bv.get(), bm::set_OR, NULL);
01808 aff_bv = bv.get();
01809 bv_map[aff_id] = bv.release();
01810 } else {
01811 aff_bv = aff_it->second;
01812 }
01813
01814
01815
01816 unsigned j;
01817 for (j=i+1; j < bsize; ++j) {
01818 if (batch[j].GetAffinityId() != aff_id) {
01819 break;
01820 }
01821 _ASSERT(batch[j].GetId() == (batch[j-1].GetId()+1));
01822
01823 }
01824 --j;
01825
01826 if ((i!=j) && (aff_id == batch[j].GetAffinityId())) {
01827 unsigned job_id_end = batch[j].GetId();
01828 aff_bv->set_range(job_id_start, job_id_end);
01829 i = j;
01830 } else {
01831 aff_bv->set(job_id_start);
01832 }
01833
01834 }
01835
01836
01837 NON_CONST_ITERATE(TBVMap, it, bv_map) {
01838 unsigned aff_id = it->first;
01839 TNSBitVector* bv = it->second;
01840 bv->optimize();
01841
01842 m_AffinityIdx.aff_id = aff_id;
01843 m_AffinityIdx.WriteVector(*bv, SAffinityIdx::eNoCompact);
01844
01845 delete it->second; it->second = 0;
01846 }
01847 }
01848 catch (exception& )
01849 {
01850 NON_CONST_ITERATE(TBVMap, it, bv_map) {
01851 delete it->second; it->second = 0;
01852 }
01853 throw;
01854 }
01855
01856 }
01857
01858
01859 void
01860 CQueue::x_ReadAffIdx_NoLock(unsigned aff_id,
01861 TNSBitVector* jobs)
01862 {
01863 m_AffinityIdx.aff_id = aff_id;
01864 m_AffinityIdx.ReadVector(jobs, bm::set_OR);
01865 }
01866
01867
01868 void
01869 CQueue::GetJobsWithAffinities(const TNSBitVector& aff_id_set,
01870 TNSBitVector* jobs)
01871 {
01872 CFastMutexGuard guard(m_AffinityIdxLock);
01873 m_AffinityIdx.SetTransaction(NULL);
01874 TNSBitVector::enumerator en(aff_id_set.first());
01875 for (; en.valid(); ++en) {
01876 unsigned aff_id = *en;
01877 x_ReadAffIdx_NoLock(aff_id, jobs);
01878 }
01879 }
01880
01881
01882 void
01883 CQueue::GetJobsWithAffinity(unsigned aff_id,
01884 TNSBitVector* jobs)
01885 {
01886 CFastMutexGuard guard(m_AffinityIdxLock);
01887 m_AffinityIdx.SetTransaction(NULL);
01888 x_ReadAffIdx_NoLock(aff_id, jobs);
01889 }
01890
01891
01892 class CAffinityResolver : public IAffinityResolver
01893 {
01894 public:
01895 CAffinityResolver(CQueue& queue) : m_Queue(queue) {}
01896 virtual void GetJobsWithAffinities(const TNSBitVector& aff_id_set,
01897 TNSBitVector* jobs)
01898 {
01899 m_Queue.GetJobsWithAffinities(aff_id_set, jobs);
01900 }
01901 private:
01902 CQueue& m_Queue;
01903 };
01904
01905
// Pick a pending job for worker_node.
//   worker_node - requesting node (affinity candidates and blacklist)
//   aff_list    - affinity tokens requested by the node; empty means no
//                 specific affinity
//   curr        - current time, used when evaluating the node's blacklist
// Returns the chosen job id, or 0 if none is available.
unsigned
CQueue::FindPendingJob(CWorkerNode* worker_node,
                       const list<string>& aff_list,
                       time_t curr)
{
    unsigned job_id = 0;

    TNSBitVector blacklisted_jobs;
    TNSBitVector aff_ids;
    const TNSBitVector* effective_aff_ids = 0;

    // Translate the requested tokens to ids. If specific affinities were
    // requested but none is known, there is nothing to give.
    bool is_specific_aff = !aff_list.empty();
    m_AffinityDict.GetTokensIds(aff_list, aff_ids);
    if (is_specific_aff && !aff_ids.any())

        return 0;



    {{
        CWorkerNodeAffinityGuard na(*worker_node);

        if (is_specific_aff)
            na.CleanCandidates(aff_ids);


        // Restrict the affinity search only for specific requests.
        if (is_specific_aff) effective_aff_ids = &aff_ids;

        // Jobs this node must not get (as of curr) are excluded from every
        // search below.
        blacklisted_jobs = na.GetBlacklistedJobs(curr);
        CAffinityResolver affinity_resolver(*this);
        job_id = na.GetJobWithAffinities(effective_aff_ids,
                                         m_StatusTracker,
                                         affinity_resolver);
        // unsigned(-1) from GetJobWithAffinities means "give nothing"; a
        // specific request with no affinity match also yields nothing.
        if (job_id == unsigned(-1) || (!job_id && is_specific_aff)) return 0;
    }}

    // NOTE(review): this assigned-affinity fallback is disabled by the
    // `false &&` and never runs.
    if (false && !job_id) {
        TNSBitVector assigned_aff;
        GetAllAssignedAffinities(curr, assigned_aff);

        if (assigned_aff.any()) {
            TNSBitVector assigned_candidate_jobs(blacklisted_jobs);

            GetJobsWithAffinities(assigned_aff, &assigned_candidate_jobs);

            bool pending_jobs_avail =
                m_StatusTracker.GetPendingJob(assigned_candidate_jobs,
                                              &job_id);
            if (!job_id && !pending_jobs_avail)
                return 0;
        }
    }

    // No affinity match: fall back to any pending job not blacklisted for
    // this node.
    if (!job_id)
        m_StatusTracker.GetPendingJob(blacklisted_jobs, &job_id);

    return job_id;
}
01979
01980
01981 struct SAffinityJobs
01982 {
01983 string aff_token;
01984 unsigned job_count;
01985 list<TWorkerNodeRef> nodes;
01986 };
01987 typedef map<unsigned, SAffinityJobs> TAffinities;
01988
01989 string CQueue::GetAffinityList()
01990 {
01991 string aff_list;
01992 unsigned aff_id;
01993 unsigned bv_cnt;
01994 TAffinities affinities;
01995 time_t now = time(0);
01996 {{
01997 CFastMutexGuard guard(m_AffinityIdxLock);
01998 m_AffinityIdx.SetTransaction(NULL);
01999 CBDB_FileCursor cur(m_AffinityIdx);
02000 cur.SetCondition(CBDB_FileCursor::eGE);
02001 cur.From << 0;
02002
02003 EBDB_ErrCode ret;
02004
02005 for (; (ret = cur.Fetch()) == eBDB_Ok;) {
02006 aff_id = m_AffinityIdx.aff_id;
02007 TNSBitVector bvect(bm::BM_GAP);
02008 ret = m_AffinityIdx.ReadVector(&bvect, bm::set_OR);
02009 m_StatusTracker.PendingIntersect(&bvect);
02010
02011 bv_cnt = bvect.count();
02012 if (ret != eBDB_Ok) {
02013 if (ret != eBDB_NotFound)
02014 ERR_POST(Error << "Error reading affinity index: " << ret);
02015 continue;
02016 }
02017 SAffinityJobs aff_jobs;
02018
02019 aff_jobs.aff_token = m_AffinityDict.GetAffToken(aff_id);
02020 aff_jobs.job_count = bv_cnt;
02021 affinities[aff_id] = aff_jobs;
02022
02023 }
02024 }}
02025
02026 {{
02027 CQueueWorkerNodeListGuard wnl(m_WorkerNodeList);
02028 list<TWorkerNodeRef> nodes;
02029 wnl.GetNodes(now, nodes);
02030 NON_CONST_ITERATE(list<TWorkerNodeRef>, node_it, nodes) {
02031 TNSBitVector::enumerator
02032 en(wnl.GetNodeAffinities(now, *node_it).first());
02033 for (; en.valid(); ++en) {
02034 unsigned aff_id = *en;
02035 affinities[aff_id].nodes.push_back(*node_it);
02036 }
02037 }
02038 }}
02039
02040 ITERATE(TAffinities, it, affinities) {
02041 if (aff_list.size()) aff_list += ", ";
02042 aff_list += it->second.aff_token;
02043 aff_list += ' ';
02044 aff_list += NStr::UIntToString(it->second.job_count);
02045 ITERATE(list<TWorkerNodeRef>, node_it, it->second.nodes) {
02046 aff_list += ' ';
02047 aff_list += (*node_it)->GetId();
02048 }
02049 }
02050 return aff_list;
02051 }
02052
02053
// Record a failed run of job_id reported by worker_node.
// While the job's run count is within the queue's failed_retries limit,
// the job is blacklisted for this node and returned to ePending. Otherwise
// it becomes eFailed, and the submitter is notified if job.ShouldNotify()
// allows it.
//   worker_node - node that ran the job
//   job_id      - failed job
//   err_msg     - error text stored on the last run
//   output      - job output stored on the job record
//   ret_code    - return code stored on the last run
// Returns false if the job record cannot be fetched, true otherwise.
// Throws CNetScheduleException(eDataTooLong) if output is longer than the
// queue's max output size.
bool CQueue::FailJob(CWorkerNode* worker_node,
                     unsigned job_id,
                     const string& err_msg,
                     const string& output,
                     int ret_code)
{
    unsigned failed_retries;
    unsigned max_output_size;
    time_t blacklist_time;
    {{
        // Snapshot the queue parameters.
        CQueueParamAccessor qp(*this);
        failed_retries = qp.GetFailedRetries();
        max_output_size = qp.GetMaxOutputSize();
        blacklist_time = qp.GetBlacklistTime();
    }}

    if (output.size() > max_output_size) {
        NCBI_THROW(CNetScheduleException, eDataTooLong,
                   "Output is too long");
    }

    // Provisionally move the tracker status to eFailed. It may be switched
    // to ePending below, and it is committed only after the DB transaction.
    CQueueJSGuard js_guard(this, job_id,
                           CNetScheduleAPI::eFailed);

    CJob job;
    CNS_Transaction trans(this);

    time_t curr = time(0);

    bool rescheduled = false;
    {{
        CQueueGuard guard(this, &trans);

        CJob::EJobFetchResult res = job.Fetch(this, job_id);
        if (res != CJob::eJF_Ok) {
            // Neither trans nor js_guard is committed on this path.
            LOG_POST(Error << "Can not fetch job " << DecorateJobId(job_id));
            return false;
        }

        unsigned run_count = job.GetRunCount();
        if (run_count <= failed_retries) {
            // Retries remain: keep this node away from the job and requeue it.
            // exp_time is 0 when blacklist_time is 0 (presumably "no
            // expiration" -- confirm in BlacklistJob).
            time_t exp_time = blacklist_time ? curr + blacklist_time : 0;
            BlacklistJob(worker_node, job_id, exp_time);
            job.SetStatus(CNetScheduleAPI::ePending);
            js_guard.SetStatus(CNetScheduleAPI::ePending);
            rescheduled = true;
            LOG_POST(Warning << "Job " << DecorateJobId(job_id)
                     << " rescheduled with "
                     << (failed_retries - run_count)
                     << " retries left");
        } else {
            job.SetStatus(CNetScheduleAPI::eFailed);
            rescheduled = false;
            LOG_POST(Warning << "Job " << DecorateJobId(job_id)
                     << " failed");
        }

        // The run itself is always recorded as failed, even when the job is
        // rescheduled.
        CJobRun* run = job.GetLastRun();
        if (!run) {
            ERR_POST(Error << "No JobRun for running job "
                     << DecorateJobId(job_id));
            run = &job.AppendRun();
        }

        run->SetStatus(CNetScheduleAPI::eFailed);
        run->SetTimeDone(curr);
        run->SetErrorMsg(err_msg);
        run->SetRetCode(ret_code);
        job.SetOutput(output);

        job.Flush(this);
    }}
    RemoveJobFromWorkerNode(job, eNSCFailed);

    // DB first, then the in-memory tracker.
    trans.Commit();
    js_guard.Commit();

    // The job is no longer running: drop its run timeout entry.
    if (m_RunTimeLine) {
        CWriteLockGuard guard(m_RunTimeLineLock);
        m_RunTimeLine->RemoveObject(job_id);
    }

    // Only a final failure is reported to the submitter.
    if (!rescheduled && job.ShouldNotify(curr)) {
        Notify(job.GetSubmAddr(), job.GetSubmPort(), job_id);
    }

    if (IsMonitoring()) {
        CTime tmp_t(CTime::eCurrent);
        string msg = tmp_t.AsString();
        msg += " CQueue::JobFailed() job id=";
        msg += NStr::IntToString(job_id);
        msg += " err_msg=";
        msg += err_msg;
        msg += " output=";
        msg += output;
        if (rescheduled)
            msg += " rescheduled";
        MonitorPost(msg);
    }
    return true;
}
02160
02161
02162
02163 void CQueue::JobsWithStatus(TJobStatus status,
02164 TNSBitVector* bv) const
02165 {
02166 m_StatusTracker.StatusSnapshot(status, bv);
02167 }
02168
02169
02170
02171 class CQueryFunctionEQ : public CQueryFunction_BV_Base<TNSBitVector>
02172 {
02173 public:
02174 CQueryFunctionEQ(CQueue& queue) :
02175 m_Queue(queue)
02176 {}
02177 typedef CQueryFunction_BV_Base<TNSBitVector> TParent;
02178 typedef TParent::TBVContainer::TBuffer TBuffer;
02179 typedef TParent::TBVContainer::TBitVector TBitVector;
02180 virtual void Evaluate(CQueryParseTree::TNode& qnode);
02181 private:
02182 void x_CheckArgs(const CQueryFunctionBase::TArgVector& args);
02183 CQueue& m_Queue;
02184 };
02185
02186 void CQueryFunctionEQ::Evaluate(CQueryParseTree::TNode& qnode)
02187 {
02188
02189 CQueryFunctionBase::TArgVector args;
02190 this->MakeArgVector(qnode, args);
02191 x_CheckArgs(args);
02192 const string& key = args[0]->GetValue().GetStrValue();
02193 const string& val = args[1]->GetValue().GetStrValue();
02194 auto_ptr<TNSBitVector> bv;
02195 auto_ptr<TBuffer> buf;
02196 if (key == "status") {
02197
02198 CNetScheduleAPI::EJobStatus status =
02199 CNetScheduleAPI::StringToStatus(val);
02200 if (status == CNetScheduleAPI::eJobNotFound)
02201 NCBI_THROW(CNetScheduleException,
02202 eQuerySyntaxError, string("Unknown status: ") + val);
02203 bv.reset(new TNSBitVector);
02204 m_Queue.JobsWithStatus(status, bv.get());
02205 } else if (key == "id") {
02206 unsigned job_id = CNetScheduleKey(val).id;
02207 bv.reset(new TNSBitVector);
02208 bv->set(job_id);
02209 } else {
02210 if (val == "*") {
02211
02212 bv.reset(new TNSBitVector);
02213 m_Queue.ReadTags(key, bv.get());
02214 } else {
02215 buf.reset(new TBuffer);
02216 if (!m_Queue.ReadTag(key, val, buf.get())) {
02217
02218 bv.reset(new TNSBitVector());
02219 buf.reset(NULL);
02220 }
02221 }
02222 }
02223 if (qnode.GetValue().IsNot()) {
02224
02225 if (bv.get()) {
02226 bv->invert();
02227 } else if (buf.get()) {
02228 bv.reset(new TNSBitVector());
02229 bm::operation_deserializer<TNSBitVector>::deserialize(*bv,
02230 &((*buf.get())[0]),
02231 0,
02232 bm::set_ASSIGN);
02233 bv.get()->invert();
02234 }
02235 }
02236 if (bv.get())
02237 this->MakeContainer(qnode)->SetBV(bv.release());
02238 else if (buf.get())
02239 this->MakeContainer(qnode)->SetBuffer(buf.release());
02240 }
02241
02242
02243 void CQueryFunctionEQ::x_CheckArgs(const CQueryFunctionBase::TArgVector& args)
02244 {
02245 if (args.size() != 2 ||
02246 (args[0]->GetValue().GetType() != CQueryParseNode::eIdentifier &&
02247 args[0]->GetValue().GetType() != CQueryParseNode::eString)) {
02248 NCBI_THROW(CNetScheduleException,
02249 eQuerySyntaxError, "Wrong arguments for '='");
02250 }
02251 }
02252
02253
02254 TNSBitVector* CQueue::ExecSelect(const string& query, list<string>& fields)
02255 {
02256 CQueryParseTree qtree;
02257 try {
02258 qtree.Parse(query.c_str());
02259 } catch (CQueryParseException& ex) {
02260 NCBI_THROW(CNetScheduleException, eQuerySyntaxError, ex.GetMsg());
02261 }
02262 CQueryParseTree::TNode* top = qtree.GetQueryTree();
02263 if (!top)
02264 NCBI_THROW(CNetScheduleException,
02265 eQuerySyntaxError, "Query syntax error in parse");
02266
02267 if (top->GetValue().GetType() == CQueryParseNode::eSelect) {
02268
02269 typedef CQueryParseTree::TNode::TNodeList_I TNodeIterator;
02270 for (TNodeIterator it = top->SubNodeBegin();
02271 it != top->SubNodeEnd(); ++it) {
02272 CQueryParseTree::TNode* node = *it;
02273 CQueryParseNode::EType node_type = node->GetValue().GetType();
02274 if (node_type == CQueryParseNode::eList) {
02275 for (TNodeIterator it2 = node->SubNodeBegin();
02276 it2 != node->SubNodeEnd(); ++it2) {
02277 fields.push_back((*it2)->GetValue().GetStrValue());
02278 }
02279 }
02280 if (node_type == CQueryParseNode::eWhere) {
02281 TNodeIterator it2 = node->SubNodeBegin();
02282 if (it2 == node->SubNodeEnd())
02283 NCBI_THROW(CNetScheduleException,
02284 eQuerySyntaxError,
02285 "Query syntax error in WHERE clause");
02286 top = (*it2);
02287 break;
02288 }
02289 }
02290 }
02291
02292
02293 CQueryExec qexec;
02294 qexec.AddFunc(CQueryParseNode::eAnd,
02295 new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_AND));
02296 qexec.AddFunc(CQueryParseNode::eOr,
02297 new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_OR));
02298 qexec.AddFunc(CQueryParseNode::eSub,
02299 new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_SUB));
02300 qexec.AddFunc(CQueryParseNode::eXor,
02301 new CQueryFunction_BV_Logic<TNSBitVector>(bm::set_XOR));
02302 qexec.AddFunc(CQueryParseNode::eIn,
02303 new CQueryFunction_BV_In_Or<TNSBitVector>());
02304 qexec.AddFunc(CQueryParseNode::eNot,
02305 new CQueryFunction_BV_Not<TNSBitVector>());
02306
02307 qexec.AddFunc(CQueryParseNode::eEQ,
02308 new CQueryFunctionEQ(*this));
02309
02310 {{
02311 CFastMutexGuard guard(GetTagLock());
02312 SetTagDbTransaction(NULL);
02313 qexec.Evaluate(qtree, *top);
02314 }}
02315
02316 IQueryParseUserObject* uo = top->GetValue().GetUserObject();
02317 if (!uo)
02318 NCBI_THROW(CNetScheduleException,
02319 eQuerySyntaxError, "Query syntax error in eval");
02320 typedef CQueryEval_BV_Value<TNSBitVector> BV_UserObject;
02321 BV_UserObject* result =
02322 dynamic_cast<BV_UserObject*>(uo);
02323 _ASSERT(result);
02324 auto_ptr<TNSBitVector> bv(result->ReleaseBV());
02325 if (!bv.get()) {
02326 bv.reset(new TNSBitVector());
02327 BV_UserObject::TBuffer *buf = result->ReleaseBuffer();
02328 if (buf && buf->size()) {
02329 bm::operation_deserializer<TNSBitVector>::deserialize(*bv,
02330 &((*buf)[0]),
02331 0,
02332 bm::set_ASSIGN);
02333 }
02334 }
02335
02336 FilterJobs(*(bv.get()));
02337
02338 return bv.release();
02339 }
02340
// Translate the requested field names into field_descr (per-field type,
// index, formatter and tag name).
// Accepted names: CJob fields, CJobRun fields, "run" (run number), and
// "tag.<name>". Throws CNetScheduleException(eQuerySyntaxError) for
// anything else.
void CQueue::PrepareFields(SFieldsDescription& field_descr,
                           const list<string>& fields)
{
    field_descr.Init();

    ITERATE(list<string>, it, fields) {
        const string& field_name = NStr::TruncateSpaces(*it);
        string tag_name;
        SFieldsDescription::EFieldType ftype;
        // `i` is assigned inside the conditions below. For "run" and "tag.*"
        // fields it keeps the negative result of CJobRun::GetFieldIndex().
        int i;
        if ((i = CJob::GetFieldIndex(field_name)) >= 0) {
            ftype = SFieldsDescription::eFT_Job;
            if (field_name == "affinity")
                field_descr.has_affinity = true;
        } else if ((i = CJobRun::GetFieldIndex(field_name)) >= 0) {
            ftype = SFieldsDescription::eFT_Run;

        } else if (field_name == "run") {
            ftype = SFieldsDescription::eFT_RunNum;
            field_descr.has_run = true;
        } else if (NStr::StartsWith(field_name, "tag.")) {
            ftype = SFieldsDescription::eFT_Tag;
            tag_name = field_name.substr(4);
            field_descr.has_tags = true;
        } else {
            NCBI_THROW(CNetScheduleException, eQuerySyntaxError,
                       string("Unknown field: ") + (*it));
        }
        // Only job/run fields get a value formatter.
        SFieldsDescription::FFormatter formatter = NULL;
        if (i >= 0) {
            if (field_name == "id") {
                formatter = FormatNSId;
            } else if (field_name == "node_addr" ||
                       field_name == "subm_addr") {
                formatter = FormatHostName;
            }
        }
        // The four per-position vectors stay parallel.
        field_descr.field_types.push_back(ftype);
        field_descr.field_nums.push_back(i);
        field_descr.formatters.push_back(formatter);
        field_descr.pos_to_tag.push_back(tag_name);
    }
}
02385
02386
// Append one record per job in `ids` to record_set (or one record per run
// when field_descr.has_run is set). Jobs that cannot be fetched are skipped.
// Records for which x_FillRecord() returns false (a requested tag is
// missing) are dropped.
void CQueue::ExecProject(TRecordSet& record_set,
                         const TNSBitVector& ids,
                         const SFieldsDescription& field_descr)
{
    int record_size = field_descr.field_nums.size();

    CJob job;
    TNSBitVector::enumerator en(ids.first());
    // The whole scan runs under a single queue guard.
    CQueueGuard guard(this);
    for ( ; en.valid(); ++en) {
        map<string, string> tags;
        unsigned job_id = *en;

        CJob::EJobFetchResult res = job.Fetch(this, job_id);
        if (res != CJob::eJF_Ok) {
            // Missing jobs are skipped silently; other failures are logged.
            if (res != CJob::eJF_NotFound)
                ERR_POST(Error << "Error reading queue job db");
            continue;
        }
        // Load the affinity token and tags only when the projection needs them.
        if (field_descr.has_affinity)
            job.FetchAffinityToken(this);
        if (field_descr.has_tags) {
            // Index the job's tags by name for x_FillRecord().
            ITERATE(TNSTagList, it, job.GetTags()) {
                tags[it->first] = it->second;
            }
        }

        if (!field_descr.has_run) {
            // One record per job; run fields come from the last run.
            vector<string> record(record_size);
            if (x_FillRecord(record, field_descr, job, tags, job.GetLastRun(), 0))
                record_set.push_back(record);
        } else {
            // One record per run, numbered from 1.
            int run_num = 1;
            ITERATE(vector<CJobRun>, it, job.GetRuns()) {
                vector<string> record(record_size);
                if (x_FillRecord(record, field_descr, job, tags, &(*it), run_num++))
                    record_set.push_back(record);
            }
        }
    }
}
02431
02432
02433 bool CQueue::x_FillRecord(vector<string>& record,
02434 const SFieldsDescription& field_descr,
02435 const CJob& job,
02436 map<string, string>& tags,
02437 const CJobRun* run,
02438 int run_num)
02439 {
02440 int record_size = field_descr.field_nums.size();
02441
02442 bool complete = true;
02443 for (int i = 0; i < record_size; ++i) {
02444 int fnum = field_descr.field_nums[i];
02445 switch(field_descr.field_types[i]) {
02446 case SFieldsDescription::eFT_Job:
02447 record[i] = job.GetField(fnum);
02448 break;
02449 case SFieldsDescription::eFT_Run:
02450 if (run)
02451 record[i] = run->GetField(fnum);
02452 else
02453 record[i] = "NULL";
02454 break;
02455 case SFieldsDescription::eFT_Tag:
02456 {
02457 map<string, string>::iterator it =
02458 tags.find((field_descr.pos_to_tag)[i]);
02459 if (it == tags.end()) {
02460 complete = false;
02461 } else {
02462 record[i] = it->second;
02463 }
02464 }
02465 break;
02466 case SFieldsDescription::eFT_RunNum:
02467 record[i] = NStr::IntToString(run_num);
02468 break;
02469 }
02470 if (!complete) break;
02471 }
02472 return complete;
02473 }
02474
02475
02476
02477
02478
02479
02480 void CQueue::ClearWorkerNode(CWorkerNode* worker_node, const string& reason)
02481 {
02482 TJobList jobs;
02483 m_WorkerNodeList.ClearNode(worker_node, jobs);
02484 x_FailJobs(jobs, worker_node, "Node closed, " + reason);
02485 }
02486
02487
02488 void CQueue::ClearWorkerNode(const string& node_id, const string& reason)
02489 {
02490 TJobList jobs;
02491 TWorkerNodeRef worker_node = m_WorkerNodeList.ClearNode(node_id, jobs);
02492
02493 if (worker_node)
02494 x_FailJobs(jobs, worker_node, "Node closed, " + reason);
02495 }
02496
02497 void CQueue::x_FailJobs(const TJobList& jobs,
02498 CWorkerNode* worker_node,
02499 const string& err_msg)
02500 {
02501 ITERATE(TJobList, it, jobs) {
02502 FailJob(worker_node, *it, err_msg, "", 0);
02503 }
02504 }
02505
02506 void CQueue::NotifyListeners(bool unconditional, unsigned aff_id)
02507 {
02508
02509
02510
02511 if (!m_HasNotificationPort) return;
02512
02513 int notify_timeout = GetNotifyTimeout();
02514
02515 time_t curr = time(0);
02516
02517 list<TWorkerNodeHostPort> notify_list;
02518 if ((unconditional || m_StatusTracker.AnyPending()) &&
02519 m_WorkerNodeList.GetNotifyList(unconditional, curr,
02520 notify_timeout, notify_list)) {
02521
02522 #define JSQ_PREFIX "NCBI_JSQ_"
02523 char msg[256];
02524 strcpy(msg, JSQ_PREFIX);
02525 strncat(msg, m_QueueName.c_str(), sizeof(msg) - sizeof(JSQ_PREFIX) - 1);
02526 size_t msg_len = sizeof(JSQ_PREFIX) + m_QueueName.length();
02527
02528 unsigned i = 0;
02529 ITERATE(list<TWorkerNodeHostPort>, it, notify_list) {
02530 unsigned host = it->first;
02531 unsigned short port = it->second;
02532 {{
02533 CFastMutexGuard guard(m_UdpSocketLock);
02534
02535 m_UdpSocket.Send(msg, msg_len,
02536 CSocketAPI::ntoa(host), port);
02537 }}
02538
02539 if ((++i % 10 == 0) && !m_StatusTracker.AnyPending())
02540 break;
02541 }
02542 }
02543 }
02544
02545
02546 void CQueue::PrintWorkerNodeStat(CNcbiOstream& out,
02547 time_t curr,
02548 EWNodeFormat fmt) const
02549 {
02550 list<TWorkerNodeRef> nodes;
02551 m_WorkerNodeList.GetNodes(curr, nodes);
02552 ITERATE(list<TWorkerNodeRef>, it, nodes) {
02553 const CWorkerNode* wn = *it;
02554 out << wn->AsString(curr, fmt) << "\n";
02555 }
02556 }
02557
02558
02559 void CQueue::UnRegisterNotificationListener(CWorkerNode* worker_node)
02560 {
02561 if (m_WorkerNodeList.UnRegisterNotificationListener(worker_node)) {
02562
02563
02564 ClearWorkerNode(worker_node, "URGC");
02565 }
02566 }
02567
02568
02569 void CQueue::AddJobToWorkerNode(CWorkerNode* worker_node,
02570 CRequestContextFactory* rec_ctx_f,
02571 const CJob& job,
02572 time_t exp_time)
02573 {
02574 unsigned log_job_state = GetLogJobState();
02575 m_WorkerNodeList.AddJob(worker_node, job, exp_time,
02576 rec_ctx_f, log_job_state);
02577 CountEvent(eStatGetEvent);
02578 }
02579
02580
02581 void CQueue::UpdateWorkerNodeJob(unsigned job_id, time_t exp_time)
02582 {
02583 m_WorkerNodeList.UpdateJob(job_id, exp_time);
02584 }
02585
02586
02587 void CQueue::RemoveJobFromWorkerNode(const CJob& job,
02588 ENSCompletion reason)
02589 {
02590 unsigned log_job_state = GetLogJobState();
02591 m_WorkerNodeList.RemoveJob(job, reason, log_job_state);
02592 CountEvent(eStatPutEvent);
02593 }
02594
02595
02596
02597
02598 void CQueue::GetAllAssignedAffinities(time_t t, TNSBitVector& aff_ids)
02599 {
02600 CQueueWorkerNodeListGuard wnl(m_WorkerNodeList);
02601 wnl.GetAffinities(t, aff_ids);
02602 }
02603
02604
02605 void CQueue::AddAffinity(CWorkerNode* worker_node,
02606 unsigned aff_id,
02607 time_t exp_time)
02608 {
02609 CWorkerNodeAffinityGuard na(*worker_node);
02610 CStopWatch sw(CStopWatch::eStart);
02611 na.AddAffinity(aff_id,exp_time);
02612
02613 }
02614
02615
02616 void CQueue::BlacklistJob(CWorkerNode* worker_node,
02617 unsigned job_id,
02618 time_t exp_time)
02619 {
02620 CWorkerNodeAffinityGuard na(*worker_node);
02621 na.BlacklistJob(job_id, exp_time);
02622 }
02623
02624
02625
02626
02627 void CQueue::SetTagDbTransaction(CBDB_Transaction* trans)
02628 {
02629 m_TagDB.SetTransaction(trans);
02630 }
02631
02632
02633 void CQueue::AppendTags(CNSTagMap& tag_map,
02634 const TNSTagList& tags,
02635 unsigned job_id)
02636 {
02637 ITERATE(TNSTagList, it, tags) {
02638
02639
02640
02641
02642
02643
02644
02645
02646 TNSTagMap::iterator tag_it = (*tag_map).find((*it));
02647 if (tag_it == (*tag_map).end()) {
02648 pair<TNSTagMap::iterator, bool> tag_map_it =
02649
02650 (*tag_map).insert(TNSTagMap::value_type((*it), new TNSShortIntSet));
02651 tag_it = tag_map_it.first;
02652 }
02653
02654 tag_it->second->push_back(job_id);
02655 }
02656 }
02657
02658
02659 void CQueue::FlushTags(CNSTagMap& tag_map, CBDB_Transaction& trans)
02660 {
02661 CFastMutexGuard guard(m_TagLock);
02662 m_TagDB.SetTransaction(&trans);
02663 NON_CONST_ITERATE(TNSTagMap, it, *tag_map) {
02664 m_TagDB.key = it->first.first;
02665 m_TagDB.val = it->first.second;
02666
02667
02668
02669
02670
02671
02672
02673
02674
02675
02676
02677
02678
02679
02680 TNSBitVector bv_tmp(bm::BM_GAP);
02681 EBDB_ErrCode err = m_TagDB.ReadVector(&bv_tmp);
02682 if (err != eBDB_Ok && err != eBDB_NotFound) {
02683
02684 }
02685 bm::combine_or(bv_tmp, it->second->begin(), it->second->end());
02686
02687 m_TagDB.key = it->first.first;
02688 m_TagDB.val = it->first.second;
02689 m_TagDB.WriteVector(bv_tmp, STagDB::eNoCompact);
02690
02691 delete it->second;
02692 it->second = 0;
02693 }
02694 (*tag_map).clear();
02695 }
02696
02697
02698 bool CQueue::ReadTag(const string& key,
02699 const string& val,
02700 TBuffer* buf)
02701 {
02702
02703 CBDB_FileCursor cur(m_TagDB);
02704 cur.SetCondition(CBDB_FileCursor::eEQ);
02705 cur.From << key << val;
02706
02707 return cur.Fetch(buf) == eBDB_Ok;
02708 }
02709
02710
02711 void CQueue::ReadTags(const string& key, TNSBitVector* bv)
02712 {
02713
02714 CBDB_FileCursor cur(m_TagDB);
02715 cur.SetCondition(CBDB_FileCursor::eEQ);
02716 cur.From << key;
02717 TBuffer buf;
02718 bm::set_operation op_code = bm::set_ASSIGN;
02719 EBDB_ErrCode err;
02720 while ((err = cur.Fetch(&buf)) == eBDB_Ok) {
02721 bm::operation_deserializer<TNSBitVector>::deserialize(
02722 *bv, &(buf[0]), 0, op_code);
02723 op_code = bm::set_OR;
02724 }
02725 if (err != eBDB_Ok && err != eBDB_NotFound) {
02726
02727 }
02728 }
02729
02730
02731 void CQueue::x_RemoveTags(CBDB_Transaction& trans,
02732 const TNSBitVector& ids)
02733 {
02734 CFastMutexGuard guard(m_TagLock);
02735 m_TagDB.SetTransaction(&trans);
02736 CBDB_FileCursor cur(m_TagDB, trans,
02737 CBDB_FileCursor::eReadModifyUpdate);
02738
02739 cur.SetCondition(CBDB_FileCursor::eFirst);
02740 CBDB_RawFile::TBuffer buf;
02741 TNSBitVector bv;
02742 while (cur.Fetch(&buf) == eBDB_Ok) {
02743 bm::deserialize(bv, &buf[0]);
02744 unsigned before_remove = bv.count();
02745 bv -= ids;
02746 unsigned new_count;
02747 if ((new_count = bv.count()) != before_remove) {
02748 if (new_count) {
02749 TNSBitVector::statistics st;
02750 bv.optimize(0, TNSBitVector::opt_compress, &st);
02751 if (st.max_serialize_mem > buf.size()) {
02752 buf.resize(st.max_serialize_mem);
02753 }
02754
02755 size_t size = bm::serialize(bv, &buf[0]);
02756 cur.UpdateBlob(&buf[0], size);
02757 } else {
02758 cur.Delete(CBDB_File::eIgnoreError);
02759 }
02760 }
02761 bv.clear(true);
02762 }
02763 }
02764
02765
02766
02767
02768 void CQueue::CheckExecutionTimeout()
02769 {
02770 if (!m_RunTimeLine)
02771 return;
02772 unsigned queue_run_timeout = GetRunTimeout();
02773 time_t curr = time(0);
02774 TNSBitVector bv;
02775 {{
02776 CReadLockGuard guard(m_RunTimeLineLock);
02777 m_RunTimeLine->ExtractObjects(curr, &bv);
02778 }}
02779 TNSBitVector::enumerator en(bv.first());
02780 for ( ;en.valid(); ++en) {
02781 x_CheckExecutionTimeout(queue_run_timeout, *en, curr);
02782 }
02783 }
02784
02785
// Re-examine one job taken off the run timeline and, if its run deadline
// has really passed, move it out of Running/Reading:
//   Running -> Pending, last run marked eTimeout;
//   Reading -> Done,    last run marked eReadTimeout (and the job leaves
//                       its read group).
// Jobs in any other tracker status are ignored.
//
// queue_run_timeout - queue-wide run timeout, used when the job's own run
//                     timeout is 0.
// job_id            - job to examine.
// curr_time         - "now", as sampled by the caller.
//
// If the deadline has not been reached yet, the job is re-added to the
// timeline instead. The change is committed in its own transaction, after
// which the status tracker is updated and a warning is logged (and mirrored
// to the monitor, if one is attached).
void
CQueue::x_CheckExecutionTimeout(unsigned queue_run_timeout,
                                unsigned job_id,
                                time_t curr_time)
{
    unsigned log_job_state = GetLogJobState();
    // Status according to the in-memory status tracker.
    TJobStatus status = GetJobStatus(job_id);

    TJobStatus new_status, run_status;
    if (status == CNetScheduleAPI::eRunning) {
        new_status = CNetScheduleAPI::ePending;
        run_status = CNetScheduleAPI::eTimeout;
    } else if (status == CNetScheduleAPI::eReading) {
        new_status = CNetScheduleAPI::eDone;
        run_status = CNetScheduleAPI::eReadTimeout;
    } else {
        return;
    }

    CJob job;
    CNS_Transaction trans(this);

    unsigned time_start = 0;
    unsigned run_timeout = 0;
    // Assigned inside the guarded block on every path that reaches the
    // logging code below.
    time_t exp_time;
    {{
        CQueueGuard guard(this, &trans);

        // Note: the early returns in this block skip trans.Commit().
        CJob::EJobFetchResult res = job.Fetch(this, job_id);
        if (res != CJob::eJF_Ok)
            return;

        // Tracker and database must agree before anything is changed.
        if (job.GetStatus() != status) {
            ERR_POST(Error << "Job status mismatch between status tracker"
                     << " and database for job " << DecorateJobId(job_id));
            return;
        }

        CJobRun* run = job.GetLastRun();
        if (!run) {
            ERR_POST(Error << "No JobRun for running job "
                     << DecorateJobId(job_id));
            // Repair the record with a run starting at curr_time.
            run = &job.AppendRun();
            run->SetTimeStart(curr_time);
        }
        time_start = run->GetTimeStart();
        _ASSERT(time_start);
        // The job's own run timeout wins; fall back to the queue's.
        run_timeout = job.GetRunTimeout();
        if (run_timeout == 0) run_timeout = queue_run_timeout;

        // NOTE(review): when both timeouts are 0, exp_time is 0 and the
        // job is treated as timed out right away — confirm intended.
        exp_time = run_timeout ? time_start + run_timeout : 0;
        if (curr_time < exp_time) {
            // Deadline still in the future: put it back on the timeline.
            TimeLineAdd(job_id, exp_time);
            return;
        }

        if (status == CNetScheduleAPI::eReading) {
            unsigned group = job.GetReadGroup();
            x_RemoveFromReadGroup(group, job_id);
        }

        job.SetStatus(new_status);
        run->SetStatus(run_status);
        run->SetTimeDone(curr_time);

        job.Flush(this);
    }}

    trans.Commit();

    // Publish the new status only after the database change is committed.
    m_StatusTracker.SetStatus(job_id, new_status);

    if (status == CNetScheduleAPI::eRunning)
        m_WorkerNodeList.RemoveJob(job, eNSCTimeout, log_job_state);

    // Log the reschedule with start/expiration times in local time.
    {{
        CTime tm(CTime::eCurrent);
        string msg = tm.AsString();
        msg += " CQueue::CheckExecutionTimeout: Job rescheduled for ";
        if (status == CNetScheduleAPI::eRunning)
            msg += "execution";
        else
            msg += "reading";
        msg += " id=";
        msg += NStr::IntToString(job_id);
        tm.SetTimeT(time_start);
        tm.ToLocalTime();
        msg += " time_start=";
        msg += tm.AsString();

        tm.SetTimeT(exp_time);
        tm.ToLocalTime();
        msg += " exp_time=";
        msg += tm.AsString();
        msg += " run_timeout(sec)=";
        msg += NStr::IntToString(run_timeout);
        msg += " run_timeout(minutes)=";
        msg += NStr::IntToString(run_timeout/60);
        LOG_POST(Warning << msg);

        if (IsMonitoring()) {
            MonitorPost(msg);
        }
    }}
}
02893
02894
02895 unsigned CQueue::CheckJobsExpiry(unsigned batch_size, TJobStatus status)
02896 {
02897 unsigned queue_timeout, queue_run_timeout;
02898 {{
02899 CQueueParamAccessor qp(*this);
02900 queue_timeout = qp.GetTimeout();
02901 queue_run_timeout = qp.GetRunTimeout();
02902 }}
02903
02904 TNSBitVector job_ids;
02905 TNSBitVector not_found_jobs;
02906 time_t curr = time(0);
02907 CJob job;
02908 unsigned del_count = 0;
02909 bool has_db_error = false;
02910 #ifdef _DEBUG
02911 static bool debug_job_discrepancy = false;
02912 #endif
02913 {{
02914 CQueueGuard guard(this);
02915 unsigned job_id = 0;
02916 for (unsigned n = 0; n < batch_size; ++n) {
02917 job_id = m_StatusTracker.GetNext(status, job_id);
02918 if (job_id == 0)
02919 break;
02920
02921
02922 CJob::EJobFetchResult res = job.Fetch(this, job_id);
02923 if (res != CJob::eJF_Ok
02924 #ifdef _DEBUG
02925 || debug_job_discrepancy
02926 #endif
02927 ) {
02928 if (res != CJob::eJF_NotFound)
02929 has_db_error = true;
02930
02931 not_found_jobs.set_bit(job_id);
02932 m_StatusTracker.Erase(job_id);
02933
02934 continue;
02935 }
02936
02937
02938 time_t time_update, time_done;
02939 time_t timeout, run_timeout;
02940
02941 TJobStatus status = job.GetStatus();
02942
02943 timeout = job.GetTimeout();
02944 if (timeout == 0) timeout = queue_timeout;
02945 run_timeout = job.GetRunTimeout();
02946 if (run_timeout == 0) run_timeout = queue_run_timeout;
02947
02948
02949 const CJobRun* run = job.GetLastRun();
02950 time_done = 0;
02951 if (run) time_done = run->GetTimeDone();
02952 if (status == CNetScheduleAPI::eRunning ||
02953 status == CNetScheduleAPI::eReading) {
02954
02955 if (run) {
02956 time_update = run->GetTimeStart();
02957 } else {
02958 ERR_POST(Error << "No JobRun for running/reading job "
02959 << DecorateJobId(job_id));
02960 time_update = 0;
02961 }
02962 timeout += run_timeout;
02963 } else if (time_done == 0) {
02964
02965 time_update = job.GetTimeSubmit();
02966 } else {
02967
02968 time_update = time_done;
02969 }
02970
02971 if (time_update + timeout <= curr) {
02972 m_StatusTracker.Erase(job_id);
02973 job_ids.set_bit(job_id);
02974 if (status == CNetScheduleAPI::eReading) {
02975 unsigned group_id = job.GetReadGroup();
02976 x_RemoveFromReadGroup(group_id, job_id);
02977 }
02978 ++del_count;
02979 }
02980 }
02981 }}
02982 if (not_found_jobs.any()) {
02983 if (has_db_error) {
02984 LOG_POST(Error << "While deleting, errors in database"
02985 << ", status " << CNetScheduleAPI::StatusToString(status)
02986 << ", queue " << m_QueueName
02987 << ", job(s) " << NS_EncodeBitVector(not_found_jobs));
02988 } else {
02989 LOG_POST(Warning << "While deleting, jobs not found in database"
02990 << ", status " << CNetScheduleAPI::StatusToString(status)
02991 << ", queue " << m_QueueName
02992 << ", job(s) " << NS_EncodeBitVector(not_found_jobs));
02993 }
02994 }
02995 if (del_count)
02996 Erase(job_ids);
02997 return del_count;
02998 }
02999
03000
03001 void CQueue::TimeLineMove(unsigned job_id, time_t old_time, time_t new_time)
03002 {
03003 CWriteLockGuard guard(m_RunTimeLineLock);
03004 m_RunTimeLine->MoveObject(old_time, new_time, job_id);
03005 }
03006
03007
03008 void CQueue::TimeLineAdd(unsigned job_id, time_t timeout)
03009 {
03010 if (!job_id || !m_RunTimeLine || !timeout) return;
03011 CWriteLockGuard guard(m_RunTimeLineLock);
03012 m_RunTimeLine->AddObject(timeout, job_id);
03013 }
03014
03015
03016 void CQueue::TimeLineRemove(unsigned job_id)
03017 {
03018 if (!m_RunTimeLine) return;
03019
03020 CWriteLockGuard guard(m_RunTimeLineLock);
03021 m_RunTimeLine->RemoveObject(job_id);
03022
03023 if (IsMonitoring()) {
03024 CTime tmp_t(CTime::eCurrent);
03025 string msg = tmp_t.AsString();
03026 msg += " CQueue::RemoveFromTimeLine: job id=";
03027 msg += NStr::IntToString(job_id);
03028 MonitorPost(msg);
03029 }
03030 }
03031
03032
03033 void CQueue::TimeLineExchange(unsigned remove_job_id,
03034 unsigned add_job_id,
03035 time_t timeout)
03036 {
03037 if (!m_RunTimeLine) return;
03038
03039 CWriteLockGuard guard(m_RunTimeLineLock);
03040 if (remove_job_id)
03041 m_RunTimeLine->RemoveObject(remove_job_id);
03042 if (add_job_id)
03043 m_RunTimeLine->AddObject(timeout, add_job_id);
03044
03045 if (IsMonitoring()) {
03046 CTime tmp_t(CTime::eCurrent);
03047 string msg = tmp_t.AsString();
03048 msg += " CQueue::TimeLineExchange:";
03049 if (remove_job_id) {
03050 msg += " job removed=";
03051 msg += NStr::IntToString(remove_job_id);
03052 }
03053 if (add_job_id) {
03054 msg += " job added=";
03055 msg += NStr::IntToString(add_job_id);
03056 }
03057 MonitorPost(msg);
03058 }
03059 }
03060
03061
// Physically delete up to batch_size jobs queued in m_JobsToDelete from the
// job table and the job-info table.
// Returns the number of job-table records deleted without an exception.
//
// The ids are taken out of m_JobsToDelete under m_JobsToDeleteLock. The
// database work is split into transactions of at most ~1000 jobs each
// (presumably to keep individual transactions small). CBDB_ErrnoException
// from a single delete is logged and the batch continues.
unsigned CQueue::DeleteBatch(unsigned batch_size)
{
    TNSBitVector batch;
    {{
        CFastMutexGuard guard(m_JobsToDeleteLock);
        unsigned job_id = 0;
        // extract_next() returns the next set id after job_id (0 when none
        // is left) and clears it in m_JobsToDelete.
        for (unsigned n = 0; n < batch_size &&
                 (job_id = m_JobsToDelete.extract_next(job_id));
             ++n)
        {
            batch.set(job_id);
        }
        if (batch.any())
            FlushDeletedVectors(eVIJob);
    }}
    if (batch.none()) return 0;

    // Split the batch into `chunks` transactions of chunk_size jobs. The
    // first `residue` transactions each take one extra job.
    unsigned actual_batch_size = batch.count();
    unsigned chunks = (actual_batch_size + 999) / 1000;
    unsigned chunk_size = actual_batch_size / chunks;
    unsigned residue = actual_batch_size - chunks*chunk_size;

    TNSBitVector::enumerator en = batch.first();
    unsigned del_rec = 0;
    while (en.valid()) {
        unsigned txn_size = chunk_size;
        if (residue) {
            ++txn_size; --residue;
        }

        CNS_Transaction trans(this);
        CQueueGuard guard(this, &trans);

        unsigned n;
        for (n = 0; en.valid() && n < txn_size; ++en, ++n) {
            unsigned job_id = *en;
            m_JobDB.id = job_id;
            try {
                m_JobDB.Delete();
                ++del_rec;
            } catch (CBDB_ErrnoException& ex) {
                ERR_POST(Error << "BDB error " << ex.what());
            }

            m_JobInfoDB.id = job_id;
            try {
                m_JobInfoDB.Delete();
            } catch (CBDB_ErrnoException& ex) {
                ERR_POST(Error << "BDB error " << ex.what());
            }
        }
        trans.Commit();

    }
    if (del_rec > 0 && IsMonitoring()) {
        CTime tm(CTime::eCurrent);
        string msg = tm.AsString();
        msg += " CQueue::DeleteBatch: " +
            NStr::IntToString(del_rec) + " job(s) deleted";
        MonitorPost(msg);
    }
    return del_rec;
}
03125
03126
03127 CBDB_FileCursor& CQueue::GetRunsCursor()
03128 {
03129 CBDB_Transaction* trans = m_RunsDB.GetBDBTransaction();
03130 CBDB_FileCursor* cur = m_RunsCursor.get();
03131 if (!cur) {
03132 cur = new CBDB_FileCursor(m_RunsDB);
03133 m_RunsCursor.reset(cur);
03134 } else {
03135 cur->Close();
03136 }
03137 cur->ReOpen(trans);
03138 return *cur;
03139 }
03140
03141
03142 void CQueue::SetMonitorSocket(CSocket& socket)
03143 {
03144 m_Monitor.SetSocket(socket);
03145 }
03146
03147
03148 bool CQueue::IsMonitoring()
03149 {
03150 return m_Monitor.IsMonitorActive();
03151 }
03152
03153
03154 void CQueue::MonitorPost(const string& msg)
03155 {
03156 m_Monitor.SendString(msg+'\n');
03157 }
03158
03159
03160 void CQueue::PrintSubmHosts(CNcbiOstream& out) const
03161 {
03162 CQueueParamAccessor qp(*this);
03163 qp.GetSubmHosts().PrintHosts(out);
03164 }
03165
03166
03167 void CQueue::PrintWNodeHosts(CNcbiOstream& out) const
03168 {
03169 CQueueParamAccessor qp(*this);
03170 qp.GetWnodeHosts().PrintHosts(out);
03171 }
03172
03173
03174 void CQueue::PrintJobStatusMatrix(CNcbiOstream& out) const
03175 {
03176 m_StatusTracker.PrintStatusMatrix(out);
03177 }
03178
03179
// NS_PRINT_TIME(msg, t): write `msg`, then time `t` converted to local time
// (nothing when t is 0), then the separator `fsp`. Requires `out` and
// `fsp` to be in scope where the macro is expanded.
#define NS_PRINT_TIME(msg, t) \
    do \
    { time_t tt = t; \
      CTime _t(tt); _t.ToLocalTime(); \
      out << msg << (tt ? _t.AsString() : kEmptyStr) << fsp; \
    } while(0)

// NS_PFNAME(x_fname): the field label `x_fname` when `fflag` (which must be
// in scope) is true, otherwise an empty string.
#define NS_PFNAME(x_fname) \
    (fflag ? x_fname : "")
03189
// Print a full description of `job` as a single line on `out`.
// Fields are separated by `fsp`; when `fflag` is true each field is
// prefixed with its name ("id: ", "status: ", ...).
// queue_run_timeout is used to compute time_run_expire when the job's own
// run timeout is 0. Run-specific fields (times, ret_code, err_msg) are
// printed only if the job has at least one run.
void CQueue::x_PrintJobStat(const CJob& job,
                            unsigned queue_run_timeout,
                            CNcbiOstream& out,
                            const char* fsp,
                            bool fflag)
{
    out << fsp << NS_PFNAME("id: ") << job.GetId() << fsp;
    TJobStatus status = job.GetStatus();
    out << NS_PFNAME("status: ") << CNetScheduleAPI::StatusToString(status)
        << fsp;

    const CJobRun* last_run = job.GetLastRun();

    NS_PRINT_TIME(NS_PFNAME("time_submit: "), job.GetTimeSubmit());
    if (last_run) {
        NS_PRINT_TIME(NS_PFNAME("time_start: "), last_run->GetTimeStart());
        NS_PRINT_TIME(NS_PFNAME("time_done: "), last_run->GetTimeDone());
    }

    out << NS_PFNAME("timeout: ") << job.GetTimeout() << fsp;
    unsigned run_timeout = job.GetRunTimeout();
    // The job's own run timeout is printed before the queue fallback below.
    out << NS_PFNAME("run_timeout: ") << run_timeout << fsp;

    if (last_run) {
        if (run_timeout == 0) run_timeout = queue_run_timeout;
        // Expiration of the last run; 0 (printed empty) if no run timeout.
        time_t exp_time =
            run_timeout == 0 ? 0 : last_run->GetTimeStart() + run_timeout;
        NS_PRINT_TIME(NS_PFNAME("time_run_expire: "), exp_time);
    }

    // Submitter address is resolved to a host name (can be slow).
    unsigned subm_addr = job.GetSubmAddr();
    out << NS_PFNAME("subm_addr: ")
        << (subm_addr ? CSocketAPI::gethostbyaddr(subm_addr) : kEmptyStr) << fsp;
    out << NS_PFNAME("subm_port: ") << job.GetSubmPort() << fsp;
    out << NS_PFNAME("subm_timeout: ") << job.GetSubmTimeout() << fsp;

    // One "worker_nodeN: " entry per run, in run order.
    int node_num = 1;
    ITERATE(vector<CJobRun>, it, job.GetRuns()) {
        unsigned addr = it->GetNodeAddr();
        string node_name = "worker_node" + NStr::IntToString(node_num++) + ": ";
        out << NS_PFNAME(node_name)
            << (addr ? CSocketAPI::gethostbyaddr(addr) : kEmptyStr) << fsp;
    }

    out << NS_PFNAME("run_counter: ") << job.GetRunCount() << fsp;
    if (last_run)
        out << NS_PFNAME("ret_code: ") << last_run->GetRetCode() << fsp;

    unsigned aff_id = job.GetAffinityId();
    if (aff_id) {
        out << NS_PFNAME("aff_token: ") << "'" << job.GetAffinityToken()
            << "'" << fsp;
    }
    out << NS_PFNAME("aff_id: ") << aff_id << fsp;
    out << NS_PFNAME("mask: ") << job.GetMask() << fsp;

    out << NS_PFNAME("input: ") << "'" << job.GetInput() << "'" << fsp;
    out << NS_PFNAME("output: ") << "'" << job.GetOutput() << "'" << fsp;

    if (last_run) {
        out << NS_PFNAME("err_msg: ")
            << "'" << last_run->GetErrorMsg() << "'" << fsp;
    }
    out << NS_PFNAME("progress_msg: ")
        << "'" << job.GetProgressMsg() << "'" << fsp;
    out << "\n";
}
03257
03258
03259 void CQueue::x_PrintShortJobStat(const CJob& job,
03260 const string& host,
03261 unsigned port,
03262 CNcbiOstream& out,
03263 const char* fsp)
03264 {
03265 out << string(CNetScheduleKey(job.GetId(), host, port)) << fsp;
03266 TJobStatus status = job.GetStatus();
03267 out << CNetScheduleAPI::StatusToString(status) << fsp;
03268
03269 out << "'" << job.GetInput() << "'" << fsp;
03270 out << "'" << job.GetOutput() << "'" << fsp;
03271 const CJobRun* last_run = job.GetLastRun();
03272 if (last_run)
03273 out << "'" << last_run->GetErrorMsg() << "'" << fsp;
03274 else
03275 out << "''" << fsp;
03276
03277 out << "\n";
03278 }
03279
03280
03281 void
03282 CQueue::PrintJobDbStat(unsigned job_id,
03283 CNcbiOstream& out,
03284 TJobStatus status)
03285 {
03286 unsigned queue_run_timeout = GetRunTimeout();
03287
03288 CJob job;
03289 if (status == CNetScheduleAPI::eJobNotFound) {
03290 CQueueGuard guard(this);
03291 CJob::EJobFetchResult res = job.Fetch(this, job_id);
03292
03293 if (res == CJob::eJF_Ok) {
03294 job.FetchAffinityToken(this);
03295 x_PrintJobStat(job, queue_run_timeout, out);
03296 } else {
03297 out << "Job not found id=" << DecorateJobId(job_id);
03298 }
03299 out << "\n";
03300 } else {
03301 TNSBitVector bv;
03302 JobsWithStatus(status, &bv);
03303
03304 TNSBitVector::enumerator en(bv.first());
03305 for (;en.valid(); ++en) {
03306 CQueueGuard guard(this);
03307 CJob::EJobFetchResult res = job.Fetch(this, *en);
03308 if (res == CJob::eJF_Ok) {
03309 job.FetchAffinityToken(this);
03310 x_PrintJobStat(job, queue_run_timeout, out);
03311 }
03312 }
03313 out << "\n";
03314 }
03315 }
03316
03317
03318 void CQueue::PrintAllJobDbStat(CNcbiOstream& out)
03319 {
03320 unsigned queue_run_timeout = GetRunTimeout();
03321
03322 CJob job;
03323 CQueueGuard guard(this);
03324
03325 CQueueEnumCursor cur(this);
03326 while (cur.Fetch() == eBDB_Ok) {
03327 CJob::EJobFetchResult res = job.Fetch(this);
03328 if (res == CJob::eJF_Ok) {
03329 job.FetchAffinityToken(this);
03330 x_PrintJobStat(job, queue_run_timeout, out);
03331 }
03332 if (!out.good()) break;
03333 }
03334 }
03335
03336
03337 void CQueue::PrintQueue(CNcbiOstream& out,
03338 TJobStatus job_status,
03339 const string& host,
03340 unsigned port)
03341 {
03342 TNSBitVector bv;
03343 JobsWithStatus(job_status, &bv);
03344
03345 CJob job;
03346 TNSBitVector::enumerator en(bv.first());
03347 for (;en.valid(); ++en) {
03348 CQueueGuard guard(this);
03349
03350 CJob::EJobFetchResult res = job.Fetch(this, *en);
03351 if (res == CJob::eJF_Ok)
03352 x_PrintShortJobStat(job, host, port, out);
03353 }
03354 }
03355
03356
03357 unsigned CQueue::CountStatus(TJobStatus st) const
03358 {
03359 return m_StatusTracker.CountStatus(st);
03360 }
03361
03362
03363 void
03364 CQueue::StatusStatistics(TJobStatus status,
03365 TNSBitVector::statistics* st) const
03366 {
03367 m_StatusTracker.StatusStatistics(status, st);
03368 }
03369
03370
03371 unsigned CQueue::CountRecs()
03372 {
03373 CQueueGuard guard(this);
03374 return m_JobDB.CountRecs();
03375 }
03376
03377
03378 void CQueue::PrintStat(CNcbiOstream& out)
03379 {
03380 CQueueGuard guard(this);
03381 m_JobDB.PrintStat(out);
03382 }
03383
03384
// Constants for CStatisticsThread's exponentially decaying event averages.

// Period passed to CThreadNonStop, i.e. how often DoJob() runs
// (presumably in seconds — confirm against CThreadNonStop). GetAverage()
// also divides by it.
const unsigned kMeasureInterval = 1;

// Averaging horizon in measurement intervals. Not referenced directly in
// this part of the file; kDecayExp below appears to be derived from it.
const unsigned kDecayInterval = 10;
// Averages are stored in fixed point with kFixedShift fractional bits.
const unsigned kFixedShift = 7;
// 1.0 in that fixed-point format (128).
const unsigned kFixed_1 = 1 << kFixedShift;

// Per-tick decay factor in fixed point: 116/128 ~= 0.906, which is close to
// exp(-kMeasureInterval / kDecayInterval) = exp(-0.1) ~= 0.905.
const unsigned kDecayExp = 116;
03392
03393 CQueue::CStatisticsThread::CStatisticsThread(TContainer& container)
03394 : CThreadNonStop(kMeasureInterval),
03395 m_Container(container)
03396 {
03397 }
03398
03399
03400 void CQueue::CStatisticsThread::DoJob(void) {
03401 unsigned counter;
03402 for (TStatEvent n = 0; n < eStatNumEvents; ++n) {
03403 counter = m_Container.m_EventCounter[n].Get();
03404 m_Container.m_EventCounter[n].Add(-counter);
03405 m_Container.m_Average[n] = (kDecayExp * m_Container.m_Average[n] +
03406 (kFixed_1-kDecayExp) * (counter << kFixedShift)
03407 ) >> kFixedShift;
03408 }
03409 }
03410
03411
03412 void CQueue::CountEvent(TStatEvent event, int num)
03413 {
03414 m_EventCounter[event].Add(num);
03415 }
03416
03417
03418 double CQueue::GetAverage(TStatEvent n_event)
03419 {
03420 return m_Average[n_event] / double(kFixed_1 * kMeasureInterval);
03421 }
03422
03423
03424 void CQueue::x_AddToReadGroupNoLock(unsigned group_id, unsigned job_id)
03425 {
03426 TGroupMap::iterator i = m_GroupMap.find(group_id);
03427 if (i != m_GroupMap.end()) {
03428 TNSBitVector& bv = (*i).second;
03429 bv.set(job_id);
03430 } else {
03431 TNSBitVector bv;
03432 bv.set(job_id);
03433 m_GroupMap[group_id] = bv;
03434 }
03435 }
03436
03437
03438 void CQueue::x_CreateReadGroup(unsigned group_id, const TNSBitVector& bv_jobs)
03439 {
03440 CFastMutexGuard guard(m_GroupMapLock);
03441 m_GroupMap[group_id] = bv_jobs;
03442 }
03443
03444
03445 void CQueue::x_RemoveFromReadGroup(unsigned group_id, unsigned job_id)
03446 {
03447 CFastMutexGuard guard(m_GroupMapLock);
03448 TGroupMap::iterator i = m_GroupMap.find(group_id);
03449 if (i != m_GroupMap.end()) {
03450 TNSBitVector& bv = (*i).second;
03451 bv.set(job_id, false);
03452 if (!bv.any())
03453 m_GroupMap.erase(i);
03454 }
03455 }
03456
03457
03458 bool
03459 CQueue::x_UpdateDB_PutResultNoLock(
03460 unsigned job_id,
03461 time_t curr,
03462 bool delete_done,
03463 int ret_code,
03464 const string& output,
03465 CJob& job)
03466 {
03467 CJob::EJobFetchResult res = job.Fetch(this, job_id);
03468 if (res != CJob::eJF_Ok) {
03469
03470 return false;
03471 }
03472
03473 CJobRun* run = job.GetLastRun();
03474 if (!run) {
03475 ERR_POST(Error << "No JobRun for running job "
03476 << DecorateJobId(job_id));
03477 NCBI_THROW(CNetScheduleException, eInvalidJobStatus, "Job never ran");
03478 }
03479
03480 if (delete_done) {
03481 job.Delete();
03482 } else {
03483 run->SetStatus(CNetScheduleAPI::eDone);
03484 run->SetTimeDone(curr);
03485 run->SetRetCode(ret_code);
03486 job.SetStatus(CNetScheduleAPI::eDone);
03487 job.SetOutput(output);
03488
03489 job.Flush(this);
03490 }
03491 return true;
03492 }
03493
03494
// Hand pending job job_id to worker_node in the database (caller holds the
// queue lock).
// Returns:
//   eGetJobUpdate_Ok        - job switched to Running with a new run that
//                             records the worker node and start time `curr`;
//   eGetJobUpdate_JobFailed - job had outlived its timeout (job's own or
//                             the queue's) and was marked Failed instead;
//   eGetJobUpdate_JobStopped- job is no longer Pending (canceled, or a
//                             status integrity error, which is logged);
//   eGetJobUpdate_NotFound  - job not in the database, or every fetch
//                             attempt failed.
// worker_node must be non-null (it is dereferenced).
CQueue::EGetJobUpdateStatus
CQueue::x_UpdateDB_GetJobNoLock(
            CWorkerNode* worker_node,
            time_t       curr,
            unsigned     job_id,
            CJob&        job)
{
    unsigned queue_timeout = GetTimeout();

    const unsigned kMaxGetAttempts = 100;

    // Fetch errors other than "not found" are retried, up to
    // kMaxGetAttempts times.
    for (unsigned fetch_attempts = 0; fetch_attempts < kMaxGetAttempts;
         ++fetch_attempts)
    {
        CJob::EJobFetchResult res;
        if ((res = job.Fetch(this, job_id)) != CJob::eJF_Ok) {
            if (res == CJob::eJF_NotFound) return eGetJobUpdate_NotFound;

            continue;
        }
        TJobStatus status = job.GetStatus();

        // Only Pending jobs may be handed out.
        if (status != CNetScheduleAPI::ePending) {
            if (CJobStatusTracker::IsCancelCode(status)) {

                return eGetJobUpdate_JobStopped;
            }
            ERR_POST(Error
                     << "x_UpdateDB_GetJobNoLock: Status integrity violation "
                     << " job = " << DecorateJobId(job_id)
                     << " status = " << status
                     << " expected status = "
                     << (int)CNetScheduleAPI::ePending);
            return eGetJobUpdate_JobStopped;
        }

        time_t time_submit = job.GetTimeSubmit();
        time_t timeout = job.GetTimeout();
        if (timeout == 0) timeout = queue_timeout;

        _ASSERT(timeout);

        // Expired while waiting in the queue: fail it instead of running it.
        if (timeout && (time_submit + timeout < curr)) {

            CJobRun& run = job.AppendRun();
            run.SetStatus(CNetScheduleAPI::eFailed);
            run.SetTimeDone(curr);
            run.SetErrorMsg("Job expired and cannot be scheduled.");
            job.SetStatus(CNetScheduleAPI::eFailed);
            m_StatusTracker.ChangeStatus(job_id, CNetScheduleAPI::eFailed);
            job.Flush(this);

            if (IsMonitoring()) {
                CTime tmp_t(CTime::eCurrent);
                string msg = tmp_t.AsString();
                msg +=
                 " CQueue::x_UpdateDB_GetJobNoLock() timeout expired job id=";
                msg += NStr::IntToString(job_id);
                MonitorPost(msg);
            }
            return eGetJobUpdate_JobFailed;
        }

        job.FetchAffinityToken(this);

        // Start a new run attributed to worker_node.
        unsigned run_count = job.GetRunCount() + 1;
        CJobRun& run = job.AppendRun();
        run.SetStatus(CNetScheduleAPI::eRunning);
        run.SetTimeStart(curr);

        run.SetNodeId(worker_node->GetId());
        run.SetNodeAddr(worker_node->GetHost());
        run.SetNodePort(worker_node->GetPort());

        job.SetStatus(CNetScheduleAPI::eRunning);
        // Reset to 0, so the queue-wide run timeout applies to this run.
        job.SetRunTimeout(0);
        job.SetRunCount(run_count);

        job.Flush(this);
        return eGetJobUpdate_Ok;
    }

    // All fetch attempts failed with errors.
    return eGetJobUpdate_NotFound;
}
03585
03586
03587 END_NCBI_SCOPE
03588
03589