|
NCBI C++ ToolKit
|
00001 /* $Id: ns_queue.cpp 54574 2012-05-22 16:50:47Z satskyse $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 
00023 * 00024 * =========================================================================== 00025 * 00026 * Authors: Victor Joukov 00027 * 00028 * File Description: 00029 * NetSchedule queue structure and parameters 00030 */ 00031 #include <ncbi_pch.hpp> 00032 00033 #include "ns_queue.hpp" 00034 #include "ns_queue_param_accessor.hpp" 00035 #include "background_host.hpp" 00036 #include "ns_util.hpp" 00037 #include "ns_format.hpp" 00038 #include "ns_server.hpp" 00039 #include "ns_handler.hpp" 00040 00041 #include <corelib/ncbi_system.hpp> // SleepMilliSec 00042 #include <corelib/request_ctx.hpp> 00043 #include <db/bdb/bdb_trans.hpp> 00044 #include <util/qparse/query_parse.hpp> 00045 #include <util/qparse/query_exec.hpp> 00046 #include <util/qparse/query_exec_bv.hpp> 00047 #include <util/bitset/bmalgo.h> 00048 00049 00050 BEGIN_NCBI_SCOPE 00051 00052 00053 00054 ////////////////////////////////////////////////////////////////////////// 00055 CQueueEnumCursor::CQueueEnumCursor(CQueue * queue, unsigned int start_after) 00056 : CBDB_FileCursor(queue->m_QueueDbBlock->job_db) 00057 { 00058 SetCondition(CBDB_FileCursor::eGT); 00059 From << start_after; 00060 } 00061 00062 00063 ////////////////////////////////////////////////////////////////////////// 00064 // CQueue 00065 00066 00067 // Used together with m_SavedId. m_SavedId is saved in a DB and is used 00068 // as to start from value for the restarted neschedule. 
// s_ReserveDelta value is used to avoid too often DB updates: ids are
// reserved in chunks of this size ahead of the last issued id.
static const unsigned int s_ReserveDelta = 10000;


/// Construct a queue object which is not yet attached to a DB block.
///
/// Tunables get hard-coded defaults here; SetParameters() overwrites most
/// of them from a SQueueParameters instance. Server-wide settings (logging
/// flags, affinity limits, node id, host/port for key generation) are
/// copied from the server.
///
/// @param executor
///   Executor used by Detach() to run the DB truncation request.
/// @param queue_name
///   Queue name; must not be empty (asserted below).
/// @param qclass_name
///   Name of the queue class.
/// @param queue_kind
///   Kind of the queue.
/// @param server
///   Owning server; dereferenced in the initializer list, so must not be NULL.
CQueue::CQueue(CRequestExecutor& executor,
               const string& queue_name,
               const string& qclass_name,
               TQueueKind queue_kind,
               CNetScheduleServer * server)
  :
    m_RunTimeLine(NULL),
    m_DeleteDatabase(false),
    m_Executor(executor),
    m_QueueName(queue_name),
    m_QueueClass(qclass_name),
    m_Kind(queue_kind),
    m_QueueDbBlock(0),

    m_BecameEmpty(-1),

    m_LastId(0),
    m_SavedId(s_ReserveDelta),

    m_ParamLock(CRWLock::fFavorWriters),
    m_Timeout(3600),
    m_RunTimeout(3600),
    m_RunTimeoutPrecision(-1),
    m_FailedRetries(0),
    m_BlacklistTime(0),
    m_EmptyLifetime(0),
    m_MaxInputSize(kNetScheduleMaxDBDataSize),
    m_MaxOutputSize(kNetScheduleMaxDBDataSize),
    m_DenyAccessViolations(false),
    m_WNodeTimeout(40),
    m_KeyGenerator(server->GetHost(), server->GetPort()),
    m_Log(server->IsLog()),
    m_LogBatchEachJob(server->IsLogBatchEachJob()),
    m_RefuseSubmits(false),
    m_LastAffinityGC(0),
    m_MaxAffinities(server->GetMaxAffinities()),
    m_AffinityHighMarkPercentage(server->GetAffinityHighMarkPercentage()),
    m_AffinityLowMarkPercentage(server->GetAffinityLowMarkPercentage()),
    m_AffinityHighRemoval(server->GetAffinityHighRemoval()),
    m_AffinityLowRemoval(server->GetAffinityLowRemoval()),
    m_AffinityDirtPercentage(server->GetAffinityDirtPercentage()),
    m_NotificationsList(server->GetNodeID(), queue_name),
    m_NotifHifreqInterval(0.1),
    m_NotifHifreqPeriod(5),
    m_NotifLofreqMult(50),
    m_DumpBufferSize(100)
{
    _ASSERT(!queue_name.empty());
}


/// Free the run time line (created lazily by SetParameters()) and release
/// the DB block via Detach().
CQueue::~CQueue()
{
    delete m_RunTimeLine;
    Detach();
}


/// Bind the queue to a DB block.
///
/// Any previously attached block is released first (Detach()). The affinity
/// and group registries are pointed at the block's dictionaries, and the id
/// counters are initialized from the persisted start-from counter: m_LastId
/// gets the stored value and m_SavedId reserves s_ReserveDelta ids ahead.
/// The reservation is written back via x_UpdateStartFromCounter().
void CQueue::Attach(SQueueDbBlock* block)
{
    Detach();
    m_QueueDbBlock = block;
    m_AffinityRegistry.Attach(&m_QueueDbBlock->aff_dict_db);
    m_GroupRegistry.Attach(&m_QueueDbBlock->group_dict_db);

    // Here we have a db, so we can read the counter value we should start from
    m_LastId = x_ReadStartFromCounter();
    m_SavedId = m_LastId + s_ReserveDelta;
    if (m_SavedId < m_LastId) {
        // Overflow: unsigned addition wrapped around, restart ids from 0
        m_LastId = 0;
        m_SavedId = s_ReserveDelta;
    }
    x_UpdateStartFromCounter();
}


/// Request (run by the executor passed to CQueue) that truncates a DB
/// block and then marks it as free for reuse.
class CTruncateRequest : public CStdRequest
{
public:
    CTruncateRequest(SQueueDbBlock* qdbblock)
        : m_QueueDbBlock(qdbblock)
    {}

    /// Truncate the block, clear its 'allocated' flag and log the elapsed
    /// time. The flag is cleared only after truncation has finished.
    virtual void Process()
    {
        // See CQueue::Detach why we can do this without locking.
        CStopWatch sw(CStopWatch::eStart);
        m_QueueDbBlock->Truncate();
        m_QueueDbBlock->allocated = false;
        LOG_POST(Message << Warning << "Clean up of db block "
                 << m_QueueDbBlock->pos << " complete, "
                 << sw.Elapsed() << " elapsed");
    }
private:
    SQueueDbBlock* m_QueueDbBlock;  // not owned

};


/// Release the attached DB block, if any.
///
/// If m_DeleteDatabase is set, the block is truncated asynchronously by a
/// CTruncateRequest submitted to m_Executor, and that request frees the
/// block. Otherwise the block is marked free immediately. In both cases
/// the queue forgets the block (m_QueueDbBlock = 0). Calling it when
/// nothing is attached only detaches the affinity registry.
void CQueue::Detach()
{
    // Access to m_QueueDbBlock is synchronized here without a mutex because
    // we get here only when the last reference to CQueue is destroyed. As
    // long as m_QueueDbBlock->allocated is true, the block cannot be
    // allocated again and thus cannot be accessed by anybody else either.
    // As soon as we are done with detaching (here) or truncating (in a
    // separate request submitted for execution to the server thread pool),
    // we can set m_QueueDbBlock->allocated to false. Boolean write is atomic
    // by definition and test-and-set is executed from synchronized code in
    // CQueueDbBlockArray::Allocate.
    // NOTE(review): Attach() also calls Detach(), so this is not reached
    // exclusively on destruction -- confirm Attach() is only called before
    // the queue is shared. Also, a plain bool write is not formally atomic
    // under the C++ memory model; confirm how 'allocated' is declared.
    // NOTE(review): only the affinity registry is detached here, although
    // Attach() attaches m_GroupRegistry as well -- confirm whether the group
    // registry needs a matching detach.
    m_AffinityRegistry.Detach();
    if (!m_QueueDbBlock)
        return;

    if (m_DeleteDatabase) {
        // Ownership of freeing the block passes to the truncate request
        CRef<CStdRequest> request(new CTruncateRequest(m_QueueDbBlock));
        m_Executor.SubmitRequest(request);
        m_DeleteDatabase = false;
    } else
        m_QueueDbBlock->allocated = false;
    m_QueueDbBlock = 0;
}


/// Copy queue parameters into the queue under the parameters write lock.
///
/// The run time line is created only on the first call that provides a
/// non-zero run_timeout; its precision is fixed from then on, and later
/// calls do not change m_RunTimeoutPrecision. The program version list and
/// the submitter / worker node host lists are rebuilt on each call.
void CQueue::SetParameters(const SQueueParameters & params)
{
    // When modifying this, modify all places marked with PARAMETERS
    CWriteLockGuard guard(m_ParamLock);

    m_Timeout = params.timeout;
    m_RunTimeout = params.run_timeout;
    if (params.run_timeout && !m_RunTimeLine) {
        // One time only. Precision can not be reset.
        m_RunTimeLine =
            new CJobTimeLine(params.run_timeout_precision, 0);
        m_RunTimeoutPrecision = params.run_timeout_precision;
    }

    m_FailedRetries = params.failed_retries;
    m_BlacklistTime = params.blacklist_time;
    m_EmptyLifetime = params.empty_lifetime;
    m_MaxInputSize = params.max_input_size;
    m_MaxOutputSize = params.max_output_size;
    m_DenyAccessViolations = params.deny_access_violations;
    m_WNodeTimeout = params.wnode_timeout;
    m_NotifHifreqInterval = params.notif_hifreq_interval;
    m_NotifHifreqPeriod = params.notif_hifreq_period;
    m_NotifLofreqMult = params.notif_lofreq_mult;
    m_DumpBufferSize = params.dump_buffer_size;

    // program version control
    m_ProgramVersionList.Clear();
    if (!params.program_name.empty()) {
        m_ProgramVersionList.AddClientInfo(params.program_name);
    }
    m_SubmHosts.SetHosts(params.subm_hosts);
    m_WnodeHosts.SetHosts(params.wnode_hosts);
}


/// Return all queue parameters as (name, value) string pairs, in the order
/// defined by CQueueParamAccessor.
CQueue::TParameterList CQueue::GetParameters() const
{
    TParameterList parameters;
    CQueueParamAccessor qp(*this);
    unsigned nParams = qp.GetNumParams();

    for (unsigned n = 0; n < nParams; ++n) {
        parameters.push_back(
            pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
    }
    return parameters;
}


/// Fetch the max input and output sizes through CQueueParamAccessor.
///
/// @param max_input_size   [out] receives the configured max input size
/// @param max_output_size  [out] receives the configured max output size
void CQueue::GetMaxIOSizes(unsigned int & max_input_size,
                           unsigned int & max_output_size) const
{
    CQueueParamAccessor qp(*this);

    max_input_size = qp.GetMaxInputSize();
    max_output_size = qp.GetMaxOutputSize();
    return;
}


/// Rebuild the in-memory job state from the attached DB block.
///
/// Loads the affinity and group dictionaries, then scans every record of
/// job_db. For each job it:
/// - sets its status in the status tracker,
/// - puts Running/Reading jobs on the run time line (when one exists),
/// - registers it with the affinity and group registries (non-zero ids only),
/// - registers it with the GC registry.
/// Finally the dictionaries are finalized.
///
/// @return number of job records read from the DB
unsigned CQueue::LoadStatusMatrix()
{
    // Load the known affinities and groups
    m_AffinityRegistry.LoadAffinityDictionary();
    m_GroupRegistry.LoadGroupDictionary();

    // scan the queue, load the state machine from DB
    CBDB_FileCursor cur(m_QueueDbBlock->job_db);

    cur.InitMultiFetch(1024*1024);
    cur.SetCondition(CBDB_FileCursor::eGE);
    cur.From << 0;

    unsigned int recs = 0;

    for (; cur.Fetch() == eBDB_Ok; ) {
        unsigned int job_id = m_QueueDbBlock->job_db.id;
        unsigned int group_id = m_QueueDbBlock->job_db.group_id;
        unsigned int aff_id = m_QueueDbBlock->job_db.aff_id;
        time_t last_touch = m_QueueDbBlock->job_db.last_touch;
        time_t job_timeout = m_QueueDbBlock->job_db.timeout;
        time_t job_run_timeout = m_QueueDbBlock->job_db.run_timeout;
        TJobStatus status = TJobStatus(static_cast<int>(m_QueueDbBlock->job_db.status));

        m_StatusTracker.SetExactStatusNoLock(job_id, status, true);

        if ((status == CNetScheduleAPI::eRunning ||
             status == CNetScheduleAPI::eReading) &&
            m_RunTimeLine) {
            // Add object to the first available slot;
            // it is going to be rescheduled or dropped
            // in the background control thread
            // We can use time line without lock here because
            // the queue is still in single-use mode while
            // being loaded.
            m_RunTimeLine->AddObject(m_RunTimeLine->GetHead(), job_id);
        }

        // Register the job for the affinity if so
        if (aff_id != 0)
            m_AffinityRegistry.AddJobToAffinity(job_id, aff_id);

        // Register the job in the group registry
        if (group_id != 0)
            m_GroupRegistry.AddJob(group_id, job_id);

        // Register the loaded job with the garbage collector
        m_GCRegistry.RegisterJob(job_id, aff_id, group_id,
                                 GetJobExpirationTime(last_touch, status,
                                                      job_timeout,
                                                      job_run_timeout,
                                                      m_Timeout, m_RunTimeout));

        ++recs;
    }

    // Make sure that there are no affinity IDs in the registry for which there
    // are no jobs and initialize the next affinity ID counter.
    m_AffinityRegistry.FinalizeAffinityDictionaryLoading();

    // Make sure that there are no group IDs in the registry for which there
    // are no jobs and initialize the next group ID counter.
    m_GroupRegistry.FinalizeGroupDictionaryLoading();

    return recs;
}


// Used to log a single job.
// Does nothing unless logging is enabled (m_Log). Emits one diag "extra"
// record with the job key, queue name, input, affinity token, group token
// ("n/a" when empty), mask, submitter address/notification settings,
// timeouts and progress message.
void CQueue::x_LogSubmit(const CJob & job,
                         const string & aff,
                         const string & group)
{
    if (!m_Log)
        return;

    string group_token(group);

    if (group_token.empty())
        group_token = "n/a";

    // NOTE(review): gethostbyaddr() below performs a host name lookup for
    // each logged job -- confirm this cost is acceptable on the submit path.
    CDiagContext_Extra extra = GetDiagContext().Extra()
        .Print("job_key", MakeKey(job.GetId()))
        .Print("queue", GetQueueName())
        .Print("input", job.GetInput())
        .Print("aff", aff)
        .Print("group", group_token)
        .Print("mask", job.GetMask())
        .Print("subm_addr", CSocketAPI::gethostbyaddr(job.GetSubmAddr()))
        .Print("subm_notif_port", job.GetSubmNotifPort())
        .Print("subm_notif_timeout", job.GetSubmNotifTimeout())
        .Print("timeout", Uint8(job.GetTimeout()))
        .Print("run_timeout", job.GetRunTimeout())
        .Print("progress_msg", job.GetProgressMsg());

    extra.Flush();
    return;
}


/// Submit a single job into the queue.
///
/// The job gets a new id, a random passport, the current time as last touch
/// and a 'submit' event describing the client. Under m_OperationLock, the
/// group and affinity are resolved and the job is stored inside one DB
/// transaction. Only after commit is the job added to the status tracker
/// (Pending), the clients registry, the notification list and the GC
/// registry. Statistics and logging happen outside the lock.
///
/// @param client     submitting client
/// @param job        [in/out] job to store; id, passport, group and
///                   affinity ids are filled in
/// @param aff_token  affinity token; empty means no affinity
/// @param group      group name; empty means no group
/// @return the id assigned to the job
/// @throws CNetScheduleException (eDataTooLong) if the input exceeds
///         m_MaxInputSize
unsigned int CQueue::Submit(const CNSClientId & client,
                            CJob & job,
                            const string & aff_token,
                            const string & group)
{
    // the only config parameter used here is the max input size so there is no
    // need to have a safe parameters accessor.
    // NOTE(review): m_Timeout, m_RunTimeout and m_NotifHifreqPeriod are also
    // read below without m_ParamLock, while SetParameters() writes them under
    // that lock -- confirm this unsynchronized read is intended.

    if (job.GetInput().size() > m_MaxInputSize)
        NCBI_THROW(CNetScheduleException, eDataTooLong, "Input is too long");

    unsigned int aff_id = 0;
    unsigned int group_id = 0;
    time_t current_time = time(0);
    unsigned int job_id = GetNextId();
    CJobEvent & event = job.AppendEvent();

    job.SetId(job_id);
    job.SetPassport(rand());
    job.SetLastTouch(current_time);

    event.SetNodeAddr(client.GetAddress());
    event.SetStatus(CNetScheduleAPI::ePending);
    event.SetEvent(CJobEvent::eSubmit);
    event.SetTimestamp(current_time);
    event.SetClientNode(client.GetNode());
    event.SetClientSession(client.GetSession());

    // Special treatment for system job masks
    if (job.GetMask() & CNetScheduleAPI::eOutOfOrder)
    {
        // NOT IMPLEMENTED YET: put job id into OutOfOrder list.
        // The idea is that there can be an urgent job, which
        // should be executed before jobs which were submitted
        // earlier, e.g. for some administrative purposes. See
        // CNetScheduleAPI::EJobMask in file netschedule_api.hpp
    }

    // Take the queue lock and start the operation
    {{
        CFastMutexGuard guard(m_OperationLock);

        {{
            // DB changes (group/affinity records and the job itself) are
            // committed together; in-memory registries are updated only
            // after the commit below.
            CNSTransaction transaction(this);

            if (!group.empty()) {
                group_id = m_GroupRegistry.AddJob(group, job_id);
                job.SetGroupId(group_id);
            }
            if (!aff_token.empty()) {
                aff_id = m_AffinityRegistry.ResolveAffinityToken(aff_token,
                                                                 job_id, 0);
                job.SetAffinityId(aff_id);
            }
            job.Flush(this);

            transaction.Commit();
        }}

        m_StatusTracker.AddPendingJob(job_id);

        // Register the job with the client
        m_ClientsRegistry.AddToSubmitted(client, 1);

        // Make the decision whether to send or not a notification
        m_NotificationsList.Notify(job_id, aff_id,
                                   m_ClientsRegistry,
                                   m_AffinityRegistry,
                                   m_NotifHifreqPeriod);

        m_GCRegistry.RegisterJob(job_id, aff_id, group_id,
                                 job.GetExpirationTime(m_Timeout,
                                                       m_RunTimeout));
    }}

    m_StatisticsCounters.CountSubmit(1);
    x_LogSubmit(job, aff_token, group);
    return job_id;
}


/// Submit a batch of jobs sharing one group.
///
/// The batch gets a contiguous id range starting at the returned id
/// (job_id .. job_id + batch.size() - 1). All jobs are stored in a single
/// DB transaction under m_OperationLock; the group is resolved once for
/// the whole batch. After commit the jobs are registered with the group
/// registry, status tracker (Pending), clients registry, notification
/// list and GC registry. Each job is logged individually only when
/// m_LogBatchEachJob is set.
///
/// @param client  submitting client
/// @param batch   [in/out] (job, affinity token) pairs; each job's id,
///                passport, group and affinity ids are filled in
/// @param group   group name shared by all jobs in the batch
/// @return id of the first job in the batch
unsigned int CQueue::SubmitBatch(const CNSClientId & client,
                                 vector< pair<CJob, string> > & batch,
                                 const string & group)
{
    // NOTE(review): assumes batch is non-empty. With batch_size == 0 the
    // ranges passed to AddPendingBatch() and set_range() below would be
    // (job_id, job_id - 1) -- confirm callers reject empty batches.
    unsigned int batch_size = batch.size();
    unsigned int job_id = GetNextJobIdForBatch(batch_size);
    TNSBitVector affinities;

    {{
        unsigned int job_id_cnt = job_id;
        time_t curr_time = time(0);
        unsigned int group_id = 0;
        bool no_aff_jobs = false;   // true if any job has no affinity

        CFastMutexGuard guard(m_OperationLock);

        {{
            CNSTransaction transaction(this);

            // This might create a new record in the DB
            group_id = m_GroupRegistry.ResolveGroup(group);

            for (size_t k = 0; k < batch_size; ++k) {

                CJob & job = batch[k].first;
                const string & aff_token = batch[k].second;
                CJobEvent & event = job.AppendEvent();

                job.SetId(job_id_cnt);
                job.SetPassport(rand());
                job.SetGroupId(group_id);
                job.SetLastTouch(curr_time);

                event.SetNodeAddr(client.GetAddress());
                event.SetStatus(CNetScheduleAPI::ePending);
                event.SetEvent(CJobEvent::eBatchSubmit);
                event.SetTimestamp(curr_time);
                event.SetClientNode(client.GetNode());
                event.SetClientSession(client.GetSession());

                if (!aff_token.empty()) {
                    unsigned int aff_id = m_AffinityRegistry.
                                ResolveAffinityToken(aff_token,
                                                     job_id_cnt,
                                                     0);

                    job.SetAffinityId(aff_id);
                    affinities.set_bit(aff_id, true);
                }
                else
                    no_aff_jobs = true;

                job.Flush(this);
                ++job_id_cnt;
            }

            transaction.Commit();
        }}

        m_GroupRegistry.AddJobs(group_id, job_id, batch_size);
        m_StatusTracker.AddPendingBatch(job_id, job_id + batch_size - 1);
        m_ClientsRegistry.AddToSubmitted(client, batch_size);

        // Make a decision whether to notify clients or not
        TNSBitVector jobs;
        jobs.set_range(job_id, job_id + batch_size - 1);
        m_NotificationsList.Notify(jobs, affinities, no_aff_jobs,
                                   m_ClientsRegistry,
                                   m_AffinityRegistry,
                                   m_NotifHifreqPeriod);

        for (size_t k = 0; k < batch_size; ++k)
            m_GCRegistry.RegisterJob(batch[k].first.GetId(),
                                     batch[k].first.GetAffinityId(),
                                     group_id,
                                     batch[k].first.GetExpirationTime(m_Timeout,
                                                                      m_RunTimeout));
    }}

    m_StatisticsCounters.CountSubmit(batch_size);
    if (m_LogBatchEachJob)
        for (size_t k = 0; k < batch_size; ++k)
            x_LogSubmit(batch[k].first, batch[k].second, group);

    return job_id;
}



static const size_t k_max_dead_locks = 10; // max.
dead lock repeats 00530 00531 TJobStatus CQueue::PutResult(const CNSClientId & client, 00532 time_t curr, 00533 unsigned int job_id, 00534 const string & auth_token, 00535 int ret_code, 00536 const string * output) 00537 { 00538 _ASSERT(job_id && output); 00539 00540 // The only one parameter (max output size) is required for the put 00541 // operation so there is no need to use CQueueParamAccessor 00542 00543 if (output->size() > m_MaxOutputSize) 00544 NCBI_THROW(CNetScheduleException, eDataTooLong, 00545 "Output is too long"); 00546 00547 CJob job; 00548 size_t dead_locks = 0; // dead lock counter 00549 00550 for (;;) { 00551 try { 00552 CFastMutexGuard guard(m_OperationLock); 00553 TJobStatus old_status = GetJobStatus(job_id); 00554 00555 if (old_status == CNetScheduleAPI::eDone) { 00556 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eDone, 00557 CNetScheduleAPI::eDone); 00558 return old_status; 00559 } 00560 00561 if (old_status != CNetScheduleAPI::ePending && 00562 old_status != CNetScheduleAPI::eRunning) 00563 return old_status; 00564 00565 {{ 00566 CNSTransaction transaction(this); 00567 x_UpdateDB_PutResultNoLock(job_id, auth_token, curr, 00568 ret_code, *output, job, 00569 client); 00570 transaction.Commit(); 00571 }} 00572 00573 m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eDone); 00574 m_ClientsRegistry.ClearExecuting(job_id); 00575 00576 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 00577 m_RunTimeout)); 00578 00579 TimeLineRemove(job_id); 00580 00581 if (job.ShouldNotify(curr)) 00582 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 00583 job.GetSubmNotifPort(), 00584 MakeKey(job_id), 00585 job.GetStatus()); 00586 return old_status; 00587 } 00588 catch (CBDB_ErrnoException & ex) { 00589 if (ex.IsDeadLock()) { 00590 if (++dead_locks < k_max_dead_locks) { 00591 ERR_POST("DeadLock repeat in CQueue::PutResult"); 00592 SleepMilliSec(100); 00593 continue; 00594 } 00595 } else if (ex.IsNoMem()) { 00596 if 
(++dead_locks < k_max_dead_locks) { 00597 ERR_POST("No resource repeat in CQueue::PutResult"); 00598 SleepMilliSec(100); 00599 continue; 00600 } 00601 } 00602 00603 if (ex.IsDeadLock() || ex.IsNoMem()) { 00604 string message = "Too many transaction repeats in " 00605 "CQueue::PutResult"; 00606 ERR_POST(message); 00607 NCBI_THROW(CNetScheduleException, eTryAgain, message); 00608 } 00609 00610 throw; 00611 } 00612 } 00613 } 00614 00615 00616 bool CQueue::GetJobOrWait(const CNSClientId & client, 00617 unsigned short port, // Port the client 00618 // will wait on 00619 unsigned int timeout, // If timeout != 0 => WGET 00620 time_t curr, 00621 const list<string> * aff_list, 00622 bool wnode_affinity, 00623 bool any_affinity, 00624 bool exclusive_new_affinity, 00625 bool new_format, 00626 CJob * new_job) 00627 { 00628 // We need exactly 1 parameter - m_RunTimeout, so we can access it without 00629 // CQueueParamAccessor 00630 00631 size_t dead_locks = 0; // dead lock counter 00632 00633 for (;;) { 00634 try { 00635 TNSBitVector aff_ids; 00636 00637 {{ 00638 CFastMutexGuard guard(m_OperationLock); 00639 00640 if (wnode_affinity) { 00641 // Check first that the were no preferred affinities reset for 00642 // the client 00643 if (m_ClientsRegistry.GetAffinityReset(client)) 00644 return false; // Affinity was reset, so no job for the client 00645 } 00646 00647 if (timeout != 0) { 00648 // WGET: 00649 // The affinities has to be resolved straight away, however at this 00650 // point it is still unknown that the client will wait for them, so 00651 // the client ID is passed as 0. Later on the client info will be 00652 // updated in the affinity registry if the client really waits for 00653 // this affinities. 
00654 CNSTransaction transaction(this); 00655 aff_ids = m_AffinityRegistry.ResolveAffinitiesForWaitClient(*aff_list, 0); 00656 transaction.Commit(); 00657 } 00658 else 00659 // GET: 00660 // No need to create aff records if they are not known 00661 aff_ids = m_AffinityRegistry.GetAffinityIDs(*aff_list); 00662 00663 x_UnregisterGetListener(client, port); 00664 }} 00665 00666 for (;;) { 00667 // No lock here to make it possible to pick a job 00668 // simultaneously from many threads 00669 x_SJobPick job_pick = x_FindPendingJob(client, aff_ids, 00670 wnode_affinity, 00671 any_affinity, 00672 exclusive_new_affinity); 00673 {{ 00674 CFastMutexGuard guard(m_OperationLock); 00675 00676 if (job_pick.job_id == 0) { 00677 if (timeout != 0) { 00678 // WGET: 00679 // There is no job, so the client might need to be registered 00680 // in the waiting list 00681 if (port > 0) 00682 x_RegisterGetListener(client, port, timeout, aff_ids, 00683 wnode_affinity, any_affinity, 00684 exclusive_new_affinity, 00685 new_format); 00686 } 00687 return true; 00688 } 00689 00690 // Check that the job is still Pending 00691 if (GetJobStatus(job_pick.job_id) != 00692 CNetScheduleAPI::ePending) 00693 continue; // Try to pick a job again 00694 00695 // the job is still pending, check if it was received as 00696 // with exclusive affinity 00697 if (job_pick.exclusive && job_pick.aff_id != 0) { 00698 if (m_ClientsRegistry.IsPreferredByAny(job_pick.aff_id)) 00699 continue; // Other WN grabbed this affinity already 00700 m_ClientsRegistry.UpdatePreferredAffinities(client, 00701 job_pick.aff_id, 00702 0); 00703 } 00704 00705 if (x_UpdateDB_GetJobNoLock(client, curr, 00706 job_pick.job_id, *new_job)) { 00707 // The job is not expired and successfully read 00708 m_StatusTracker.ChangeStatus(this, job_pick.job_id, 00709 CNetScheduleAPI::eRunning); 00710 m_GCRegistry.UpdateLifetime(job_pick.job_id, 00711 new_job->GetExpirationTime(m_Timeout, 00712 m_RunTimeout)); 00713 TimeLineAdd(job_pick.job_id, curr + 
m_RunTimeout); 00714 m_ClientsRegistry.AddToRunning(client, job_pick.job_id); 00715 return true; 00716 } 00717 00718 // Reset job_id and try again 00719 new_job->SetId(0); 00720 }} 00721 } 00722 } 00723 catch (CBDB_ErrnoException & ex) { 00724 if (ex.IsDeadLock()) { 00725 if (++dead_locks < k_max_dead_locks) { 00726 ERR_POST("DeadLock repeat in CQueue::GetJobOrWait"); 00727 SleepMilliSec(100); 00728 continue; 00729 } 00730 } else if (ex.IsNoMem()) { 00731 if (++dead_locks < k_max_dead_locks) { 00732 ERR_POST("No resource repeat in CQueue::GetJobOrWait"); 00733 SleepMilliSec(100); 00734 continue; 00735 } 00736 } 00737 00738 if (ex.IsDeadLock() || ex.IsNoMem()) { 00739 string message = "Too many transaction repeats in " 00740 "CQueue::GetJobOrWait"; 00741 ERR_POST(message); 00742 NCBI_THROW(CNetScheduleException, eTryAgain, message); 00743 } 00744 00745 throw; 00746 } 00747 } 00748 return true; 00749 } 00750 00751 00752 void CQueue::CancelWaitGet(const CNSClientId & client) 00753 { 00754 bool result; 00755 00756 {{ 00757 CFastMutexGuard guard(m_OperationLock); 00758 00759 result = x_UnregisterGetListener(client, 0); 00760 }} 00761 00762 if (result == false) 00763 LOG_POST(Message << Warning << "Attempt to cancel WGET for the client " 00764 "which does not wait anything (node: " 00765 << client.GetNode() << " session: " 00766 << client.GetSession() << ")"); 00767 return; 00768 } 00769 00770 00771 string CQueue::ChangeAffinity(const CNSClientId & client, 00772 const list<string> & aff_to_add, 00773 const list<string> & aff_to_del) 00774 { 00775 // It is guaranteed here that the client is a new style one. 00776 // I.e. it has both client_node and client_session. 
00777 00778 string msg; // Warning message for the socket 00779 unsigned int client_id = client.GetID(); 00780 TNSBitVector current_affinities = 00781 m_ClientsRegistry.GetPreferredAffinities(client); 00782 TNSBitVector aff_id_to_add; 00783 TNSBitVector aff_id_to_del; 00784 bool any_to_add = false; 00785 bool any_to_del = false; 00786 00787 // Identify the affinities which should be deleted 00788 for (list<string>::const_iterator k(aff_to_del.begin()); 00789 k != aff_to_del.end(); ++k) { 00790 unsigned int aff_id = m_AffinityRegistry.GetIDByToken(*k); 00791 00792 if (aff_id == 0) { 00793 // The affinity is not known for NS at all 00794 LOG_POST(Message << Warning << "Client '" << client.GetNode() 00795 << "' deletes unknown affinity '" 00796 << *k << "'. Ignored."); 00797 if (!msg.empty()) 00798 msg += "; "; 00799 msg += "unknown affinity to delete: " + *k; 00800 continue; 00801 } 00802 00803 if (current_affinities[aff_id] == false) { 00804 // This a try to delete something which has not been added or 00805 // deleted before. 00806 LOG_POST(Message << Warning << "Client '" << client.GetNode() 00807 << "' deletes affinity '" << *k 00808 << "' which is not in the list of the " 00809 "preferred client affinities. Ignored."); 00810 if (!msg.empty()) 00811 msg += "; "; 00812 msg += "not registered affinity to delete: " + *k; 00813 continue; 00814 } 00815 00816 // The affinity will really be deleted 00817 aff_id_to_del.set(aff_id, true); 00818 any_to_del = true; 00819 } 00820 00821 00822 // Check that the update of the affinities list will not exceed the limit 00823 // for the max number of affinities per client. 00824 // Note: this is not precise check. There could be non-unique affinities in 00825 // the add list or some of affinities to add could already be in the list. 00826 // The precise checking however requires more CPU and blocking so only an 00827 // approximate (but fast) checking is done. 
00828 if (current_affinities.count() + aff_to_add.size() 00829 - aff_id_to_del.count() > 00830 m_MaxAffinities) { 00831 NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities, 00832 "The client '" + client.GetNode() + 00833 "' exceeds the limit (" + 00834 NStr::UIntToString(m_MaxAffinities) + 00835 ") of the preferred affinities. Changed request ignored."); 00836 } 00837 00838 // To avoid logging under the lock 00839 vector<string> already_added_affinities; 00840 00841 {{ 00842 00843 CFastMutexGuard guard(m_OperationLock); 00844 CNSTransaction transaction(this); 00845 00846 // Convert the aff_to_add to the affinity IDs 00847 for (list<string>::const_iterator k(aff_to_add.begin()); 00848 k != aff_to_add.end(); ++k ) { 00849 unsigned int aff_id = 00850 m_AffinityRegistry.ResolveAffinityToken(*k, 00851 0, client_id); 00852 00853 if (current_affinities[aff_id] == true) { 00854 already_added_affinities.push_back(*k); 00855 continue; 00856 } 00857 00858 aff_id_to_add.set(aff_id, true); 00859 any_to_add = true; 00860 } 00861 00862 transaction.Commit(); 00863 }} 00864 00865 // Log the warnings and add it to the warning message 00866 for (vector<string>::const_iterator j(already_added_affinities.begin()); 00867 j != already_added_affinities.end(); ++j) { 00868 // That was a try to add something which has already been added 00869 LOG_POST(Message << Warning << "Client '" << client.GetNode() 00870 << "' adds affinity '" << *j 00871 << "' which is already in the list of the " 00872 "preferred client affinities. 
Ignored."); 00873 if (!msg.empty()) 00874 msg += "; "; 00875 msg += "already registered affinity to add: " + *j; 00876 } 00877 00878 if (any_to_del) 00879 m_AffinityRegistry.RemoveClientFromAffinities(client_id, 00880 aff_id_to_del); 00881 00882 if (any_to_add || any_to_del) 00883 m_ClientsRegistry.UpdatePreferredAffinities(client, 00884 aff_id_to_add, 00885 aff_id_to_del); 00886 return msg; 00887 } 00888 00889 00890 TJobStatus CQueue::JobDelayExpiration(unsigned int job_id, 00891 time_t tm) 00892 { 00893 CJob job; 00894 unsigned queue_run_timeout = GetRunTimeout(); 00895 time_t curr = time(0); 00896 00897 {{ 00898 CFastMutexGuard guard(m_OperationLock); 00899 TJobStatus status = GetJobStatus(job_id); 00900 00901 if (status != CNetScheduleAPI::eRunning) 00902 return status; 00903 00904 time_t time_start = 0; 00905 time_t run_timeout = 0; 00906 {{ 00907 CNSTransaction transaction(this); 00908 00909 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 00910 return CNetScheduleAPI::eJobNotFound; 00911 00912 time_start = job.GetLastEvent()->GetTimestamp(); 00913 run_timeout = job.GetRunTimeout(); 00914 if (run_timeout == 0) 00915 run_timeout = queue_run_timeout; 00916 00917 if (time_start + run_timeout > curr + tm) 00918 return CNetScheduleAPI::eRunning; // Old timeout is enough to 00919 // cover this request, so 00920 // keep it. 00921 00922 job.SetRunTimeout(curr + tm - time_start); 00923 job.SetLastTouch(curr); 00924 job.Flush(this); 00925 transaction.Commit(); 00926 }} 00927 00928 // No need to update the GC registry because the running (and reading) 00929 // jobs are skipped by GC 00930 00931 time_t exp_time = run_timeout == 0 ? 
0 : time_start + run_timeout; 00932 00933 TimeLineMove(job_id, exp_time, curr + tm); 00934 }} 00935 00936 return CNetScheduleAPI::eRunning; 00937 } 00938 00939 00940 TJobStatus CQueue::GetStatusAndLifetime(unsigned int job_id, 00941 bool need_touch, 00942 time_t * lifetime) 00943 { 00944 CFastMutexGuard guard(m_OperationLock); 00945 TJobStatus status = GetJobStatus(job_id); 00946 00947 if (status == CNetScheduleAPI::eJobNotFound) 00948 return status; 00949 00950 if (need_touch) { 00951 CJob job; 00952 time_t curr = time(0); 00953 00954 {{ 00955 CNSTransaction transaction(this); 00956 00957 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 00958 NCBI_THROW(CNetScheduleException, eInternalError, 00959 "Error fetching job: " + DecorateJobId(job_id)); 00960 00961 job.SetLastTouch(curr); 00962 job.Flush(this); 00963 transaction.Commit(); 00964 }} 00965 00966 m_GCRegistry.UpdateLifetime(job_id, 00967 job.GetExpirationTime(m_Timeout, 00968 m_RunTimeout, 00969 curr)); 00970 } 00971 00972 *lifetime = x_GetEstimatedJobLifetime(job_id, status); 00973 return status; 00974 } 00975 00976 00977 bool CQueue::PutProgressMessage(unsigned int job_id, 00978 const string & msg) 00979 { 00980 CJob job; 00981 time_t curr = time(0); 00982 00983 {{ 00984 CFastMutexGuard guard(m_OperationLock); 00985 00986 {{ 00987 CNSTransaction transaction(this); 00988 00989 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 00990 return false; 00991 00992 job.SetProgressMsg(msg); 00993 job.SetLastTouch(curr); 00994 job.Flush(this); 00995 transaction.Commit(); 00996 }} 00997 00998 m_GCRegistry.UpdateLifetime(job_id, 00999 job.GetExpirationTime(m_Timeout, 01000 m_RunTimeout, 01001 curr)); 01002 }} 01003 01004 return true; 01005 } 01006 01007 01008 TJobStatus CQueue::ReturnJob(const CNSClientId & client, 01009 unsigned int job_id, 01010 const string & auth_token, 01011 string & warning) 01012 { 01013 CJob job; 01014 CFastMutexGuard guard(m_OperationLock); 01015 TJobStatus old_status = GetJobStatus(job_id); 01016 01017 
if (old_status != CNetScheduleAPI::eRunning) 01018 return old_status; 01019 01020 {{ 01021 CNSTransaction transaction(this); 01022 01023 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 01024 NCBI_THROW(CNetScheduleException, eInternalError, 01025 "Error fetching job: " + DecorateJobId(job_id)); 01026 if (job.GetStatus() != CNetScheduleAPI::eRunning) 01027 NCBI_THROW(CNetScheduleException, eInvalidJobStatus, 01028 "Operation is applicable to eRunning job state only"); 01029 01030 if (!auth_token.empty()) { 01031 // Need to check authorization token first 01032 CJob::EAuthTokenCompareResult token_compare_result = 01033 job.CompareAuthToken(auth_token); 01034 if (token_compare_result == CJob::eInvalidTokenFormat) 01035 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01036 "Invalid authorization token format"); 01037 if (token_compare_result == CJob::eNoMatch) 01038 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01039 "Authorization token does not match"); 01040 if (token_compare_result == CJob::ePassportOnlyMatch) { 01041 // That means the job has been given to another worker node 01042 // by whatever reason (expired/failed/returned before) 01043 LOG_POST(Message << Warning << "Received RETURN2 with only " 01044 "passport matched."); 01045 warning = "Only job passport matched. 
Command is ignored."; 01046 return old_status; 01047 } 01048 // Here: the authorization token is OK, we can continue 01049 } 01050 01051 01052 unsigned int run_count = job.GetRunCount(); 01053 CJobEvent * event = job.GetLastEvent(); 01054 time_t current_time = time(0); 01055 01056 if (!event) 01057 ERR_POST("No JobEvent for running job " << DecorateJobId(job_id)); 01058 01059 event = &job.AppendEvent(); 01060 event->SetNodeAddr(client.GetAddress()); 01061 event->SetStatus(CNetScheduleAPI::ePending); 01062 event->SetEvent(CJobEvent::eReturn); 01063 event->SetTimestamp(current_time); 01064 event->SetClientNode(client.GetNode()); 01065 event->SetClientSession(client.GetSession()); 01066 01067 if (run_count) 01068 job.SetRunCount(run_count-1); 01069 01070 job.SetStatus(CNetScheduleAPI::ePending); 01071 job.SetLastTouch(current_time); 01072 job.Flush(this); 01073 01074 transaction.Commit(); 01075 }} 01076 01077 m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::ePending); 01078 TimeLineRemove(job_id); 01079 m_ClientsRegistry.ClearExecuting(job_id); 01080 m_ClientsRegistry.AddToBlacklist(client, job_id); 01081 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 01082 m_RunTimeout)); 01083 01084 m_NotificationsList.Notify(job_id, 01085 job.GetAffinityId(), m_ClientsRegistry, 01086 m_AffinityRegistry, m_NotifHifreqPeriod); 01087 return old_status; 01088 } 01089 01090 01091 TJobStatus CQueue::ReadAndTouchJob(unsigned int job_id, 01092 CJob & job, 01093 time_t * lifetime) 01094 { 01095 CFastMutexGuard guard(m_OperationLock); 01096 TJobStatus status = GetJobStatus(job_id); 01097 01098 if (status == CNetScheduleAPI::eJobNotFound) 01099 return status; 01100 01101 time_t curr = time(0); 01102 {{ 01103 CNSTransaction transaction(this); 01104 01105 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 01106 NCBI_THROW(CNetScheduleException, eInternalError, 01107 "Error fetching job: " + DecorateJobId(job_id)); 01108 01109 job.SetLastTouch(curr); 01110 
job.Flush(this); 01111 transaction.Commit(); 01112 }} 01113 01114 m_GCRegistry.UpdateLifetime(job_id, 01115 job.GetExpirationTime(m_Timeout, 01116 m_RunTimeout, 01117 curr)); 01118 *lifetime = x_GetEstimatedJobLifetime(job_id, status); 01119 return status; 01120 } 01121 01122 01123 // Deletes all the jobs from the queue 01124 void CQueue::Truncate(void) 01125 { 01126 CJob job; 01127 TNSBitVector bv; 01128 vector<CNetScheduleAPI::EJobStatus> statuses; 01129 TNSBitVector jobs_to_notify; 01130 time_t current_time = time(0); 01131 01132 // Pending and running jobs should be notified if requested 01133 statuses.push_back(CNetScheduleAPI::ePending); 01134 statuses.push_back(CNetScheduleAPI::eRunning); 01135 01136 {{ 01137 CFastMutexGuard guard(m_OperationLock); 01138 01139 jobs_to_notify = m_StatusTracker.GetJobs(statuses); 01140 01141 // Make a decision if the jobs should really be notified 01142 {{ 01143 CNSTransaction transaction(this); 01144 TNSBitVector::enumerator en(jobs_to_notify.first()); 01145 for (; en.valid(); ++en) { 01146 unsigned int job_id = *en; 01147 01148 if (job.Fetch(this, job_id) != CJob::eJF_Ok) { 01149 ERR_POST("Cannot fetch job " << MakeKey(job_id) << 01150 " while dropping all jobs in the queue"); 01151 continue; 01152 } 01153 01154 if (job.ShouldNotify(current_time)) 01155 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 01156 job.GetSubmNotifPort(), 01157 MakeKey(job_id), 01158 job.GetStatus()); 01159 } 01160 01161 // There is no need to commit transaction - there were no changes 01162 }} 01163 01164 m_StatusTracker.ClearAll(&bv); 01165 m_AffinityRegistry.ClearMemoryAndDatabase(); 01166 m_GroupRegistry.ClearMemoryAndDatabase(); 01167 01168 CWriteLockGuard rtl_guard(m_RunTimeLineLock); 01169 m_RunTimeLine->ReInit(0); 01170 }} 01171 01172 x_Erase(bv); 01173 01174 // Next call updates 'm_BecameEmpty' timestamp 01175 IsExpired(); // locks CQueue lock 01176 } 01177 01178 01179 TJobStatus CQueue::Cancel(const CNSClientId & client, 01180 
unsigned int job_id) 01181 { 01182 TJobStatus old_status; 01183 CJob job; 01184 time_t current_time = time(0); 01185 01186 {{ 01187 CFastMutexGuard guard(m_OperationLock); 01188 01189 old_status = m_StatusTracker.GetStatus(job_id); 01190 if (old_status == CNetScheduleAPI::eJobNotFound) 01191 return CNetScheduleAPI::eJobNotFound; 01192 if (old_status == CNetScheduleAPI::eCanceled) { 01193 m_StatisticsCounters.CountTransition( 01194 CNetScheduleAPI::eCanceled, 01195 CNetScheduleAPI::eCanceled); 01196 return CNetScheduleAPI::eCanceled; 01197 } 01198 01199 {{ 01200 CNSTransaction transaction(this); 01201 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 01202 return CNetScheduleAPI::eJobNotFound; 01203 01204 CJobEvent * event = &job.AppendEvent(); 01205 01206 event->SetNodeAddr(client.GetAddress()); 01207 event->SetStatus(CNetScheduleAPI::eCanceled); 01208 event->SetEvent(CJobEvent::eCancel); 01209 event->SetTimestamp(current_time); 01210 event->SetClientNode(client.GetNode()); 01211 event->SetClientSession(client.GetSession()); 01212 01213 job.SetStatus(CNetScheduleAPI::eCanceled); 01214 job.SetLastTouch(current_time); 01215 job.Flush(this); 01216 01217 transaction.Commit(); 01218 }} 01219 01220 m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eCanceled); 01221 01222 TimeLineRemove(job_id); 01223 if (old_status == CNetScheduleAPI::eRunning) 01224 m_ClientsRegistry.ClearExecuting(job_id); 01225 else if (old_status == CNetScheduleAPI::eReading) 01226 m_ClientsRegistry.ClearReading(job_id); 01227 01228 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 01229 m_RunTimeout)); 01230 01231 }} 01232 01233 if ((old_status == CNetScheduleAPI::eRunning || 01234 old_status == CNetScheduleAPI::ePending) && job.ShouldNotify(current_time)) 01235 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 01236 job.GetSubmNotifPort(), 01237 MakeKey(job_id), 01238 job.GetStatus()); 01239 01240 return old_status; 01241 } 01242 01243 01244 void 
CQueue::CancelAllJobs(const CNSClientId & client) 01245 { 01246 vector<CNetScheduleAPI::EJobStatus> statuses; 01247 01248 // All except cancelled 01249 statuses.push_back(CNetScheduleAPI::ePending); 01250 statuses.push_back(CNetScheduleAPI::eRunning); 01251 statuses.push_back(CNetScheduleAPI::eFailed); 01252 statuses.push_back(CNetScheduleAPI::eDone); 01253 statuses.push_back(CNetScheduleAPI::eReading); 01254 statuses.push_back(CNetScheduleAPI::eConfirmed); 01255 statuses.push_back(CNetScheduleAPI::eReadFailed); 01256 01257 CFastMutexGuard guard(m_OperationLock); 01258 x_CancelJobs(client, m_StatusTracker.GetJobs(statuses)); 01259 return; 01260 } 01261 01262 01263 void CQueue::x_CancelJobs(const CNSClientId & client, 01264 const TNSBitVector & jobs_to_cancel) 01265 { 01266 CJob job; 01267 time_t current_time = time(0); 01268 TNSBitVector::enumerator en(jobs_to_cancel.first()); 01269 for (; en.valid(); ++en) { 01270 unsigned int job_id = *en; 01271 TJobStatus old_status = m_StatusTracker.GetStatus(job_id); 01272 01273 {{ 01274 CNSTransaction transaction(this); 01275 if (job.Fetch(this, job_id) != CJob::eJF_Ok) { 01276 ERR_POST("Cannot fetch job " << MakeKey(job_id) << 01277 " while cancelling jobs"); 01278 continue; 01279 } 01280 01281 CJobEvent * event = &job.AppendEvent(); 01282 01283 event->SetNodeAddr(client.GetAddress()); 01284 event->SetStatus(CNetScheduleAPI::eCanceled); 01285 event->SetEvent(CJobEvent::eCancel); 01286 event->SetTimestamp(current_time); 01287 event->SetClientNode(client.GetNode()); 01288 event->SetClientSession(client.GetSession()); 01289 01290 job.SetStatus(CNetScheduleAPI::eCanceled); 01291 job.SetLastTouch(current_time); 01292 job.Flush(this); 01293 01294 transaction.Commit(); 01295 }} 01296 01297 m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eCanceled); 01298 01299 TimeLineRemove(job_id); 01300 if (old_status == CNetScheduleAPI::eRunning) 01301 m_ClientsRegistry.ClearExecuting(job_id); 01302 else if (old_status == 
CNetScheduleAPI::eReading) 01303 m_ClientsRegistry.ClearReading(job_id); 01304 01305 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 01306 m_RunTimeout)); 01307 if ((old_status == CNetScheduleAPI::eRunning || 01308 old_status == CNetScheduleAPI::ePending) && job.ShouldNotify(current_time)) 01309 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 01310 job.GetSubmNotifPort(), 01311 MakeKey(job_id), 01312 job.GetStatus()); 01313 } 01314 01315 return; 01316 } 01317 01318 01319 // This function must be called under the operations lock. 01320 // If called for not existing job then an exception is generated 01321 time_t 01322 CQueue::x_GetEstimatedJobLifetime(unsigned int job_id, 01323 TJobStatus status) const 01324 { 01325 if (status == CNetScheduleAPI::eRunning || 01326 status == CNetScheduleAPI::eReading) 01327 return time(0) + GetTimeout(); 01328 return m_GCRegistry.GetLifetime(job_id); 01329 } 01330 01331 01332 void CQueue::CancelGroup(const CNSClientId & client, 01333 const string & group) 01334 { 01335 CFastMutexGuard guard(m_OperationLock); 01336 x_CancelJobs(client, m_GroupRegistry.GetJobs(group)); 01337 return; 01338 } 01339 01340 01341 TJobStatus CQueue::GetJobStatus(unsigned int job_id) const 01342 { 01343 return m_StatusTracker.GetStatus(job_id); 01344 } 01345 01346 01347 bool CQueue::IsExpired() 01348 { 01349 time_t empty_lifetime = GetEmptyLifetime(); 01350 CFastMutexGuard guard(m_OperationLock); 01351 01352 if (m_Kind && empty_lifetime > 0) { 01353 if (m_StatusTracker.Count()) { 01354 m_BecameEmpty = -1; 01355 } else { 01356 if (m_BecameEmpty != -1 && 01357 m_BecameEmpty + empty_lifetime < time(0)) 01358 { 01359 LOG_POST(Message << Warning << "Queue " << m_QueueName 01360 << " expired. Became empty: " 01361 << CTime(m_BecameEmpty).ToLocalTime().AsString() 01362 << " Empty lifetime: " << empty_lifetime 01363 << " sec." 
); 01364 return true; 01365 } 01366 if (m_BecameEmpty == -1) 01367 m_BecameEmpty = time(0); 01368 } 01369 } 01370 return false; 01371 } 01372 01373 01374 unsigned int CQueue::GetNextId() 01375 { 01376 CFastMutexGuard guard(m_LastIdLock); 01377 01378 // Job indexes are expected to start from 1, 01379 // the m_LastId is 0 at the very beginning 01380 ++m_LastId; 01381 if (m_LastId >= m_SavedId) { 01382 m_SavedId += s_ReserveDelta; 01383 if (m_SavedId < m_LastId) { 01384 // Overflow for the saved ID 01385 m_LastId = 1; 01386 m_SavedId = s_ReserveDelta; 01387 } 01388 x_UpdateStartFromCounter(); 01389 } 01390 return m_LastId; 01391 } 01392 01393 01394 // Reserves the given number of the job IDs 01395 unsigned int CQueue::GetNextJobIdForBatch(unsigned int count) 01396 { 01397 CFastMutexGuard guard(m_LastIdLock); 01398 01399 // Job indexes are expected to start from 1 and be monotonously growing 01400 unsigned int start_index = m_LastId; 01401 01402 m_LastId += count; 01403 if (m_LastId < start_index ) { 01404 // Overflow 01405 m_LastId = count; 01406 m_SavedId = count + s_ReserveDelta; 01407 x_UpdateStartFromCounter(); 01408 } 01409 01410 // There were no overflow, check the reserved value 01411 if (m_LastId >= m_SavedId) { 01412 m_SavedId += s_ReserveDelta; 01413 if (m_SavedId < m_LastId) { 01414 // Overflow for the saved ID 01415 m_LastId = count; 01416 m_SavedId = count + s_ReserveDelta; 01417 } 01418 x_UpdateStartFromCounter(); 01419 } 01420 01421 return m_LastId - count + 1; 01422 } 01423 01424 01425 void CQueue::x_UpdateStartFromCounter(void) 01426 { 01427 // Updates the start_from value in the Berkley DB file 01428 if (m_QueueDbBlock) { 01429 // The only record is expected and its key is always 1 01430 m_QueueDbBlock->start_from_db.pseudo_key = 1; 01431 m_QueueDbBlock->start_from_db.start_from = m_SavedId; 01432 m_QueueDbBlock->start_from_db.UpdateInsert(); 01433 } 01434 return; 01435 } 01436 01437 01438 unsigned int CQueue::x_ReadStartFromCounter(void) 01439 { 
01440 // Reads the start_from value from the Berkley DB file 01441 if (m_QueueDbBlock) { 01442 // The only record is expected and its key is always 1 01443 m_QueueDbBlock->start_from_db.pseudo_key = 1; 01444 m_QueueDbBlock->start_from_db.Fetch(); 01445 return m_QueueDbBlock->start_from_db.start_from; 01446 } 01447 return 1; 01448 } 01449 01450 01451 void CQueue::GetJobForReading(const CNSClientId & client, 01452 unsigned int read_timeout, 01453 const string & group, 01454 CJob * job) 01455 { 01456 time_t curr(time(0)); 01457 vector<CNetScheduleAPI::EJobStatus> from_state; 01458 01459 from_state.push_back(CNetScheduleAPI::eDone); 01460 from_state.push_back(CNetScheduleAPI::eFailed); 01461 01462 CFastMutexGuard guard(m_OperationLock); 01463 TNSBitVector candidates = m_StatusTracker.GetJobs(from_state); 01464 01465 // Exclude blacklisted jobs 01466 candidates -= m_ClientsRegistry.GetBlacklistedJobs(client); 01467 01468 if (!group.empty()) 01469 // Apply restrictions on the group jobs if so 01470 candidates &= m_GroupRegistry.GetJobs(group); 01471 01472 if (!candidates.any()) 01473 return; 01474 01475 unsigned int job_id = *candidates.first(); 01476 {{ 01477 CNSTransaction transaction(this); 01478 01479 // Fetch the job and update it and flush 01480 job->Fetch(this, job_id); 01481 01482 unsigned int read_count = job->GetReadCount() + 1; 01483 CJobEvent * event = &job->AppendEvent(); 01484 01485 event->SetStatus(CNetScheduleAPI::eReading); 01486 event->SetEvent(CJobEvent::eRead); 01487 event->SetTimestamp(curr); 01488 event->SetNodeAddr(client.GetAddress()); 01489 event->SetClientNode(client.GetNode()); 01490 event->SetClientSession(client.GetSession()); 01491 01492 job->SetRunTimeout(read_timeout); 01493 job->SetStatus(CNetScheduleAPI::eReading); 01494 job->SetReadCount(read_count); 01495 job->SetLastTouch(curr); 01496 job->Flush(this); 01497 01498 transaction.Commit(); 01499 }} 01500 01501 if (read_timeout == 0) 01502 read_timeout = m_RunTimeout; 01503 01504 if 
(read_timeout != 0) 01505 TimeLineAdd(job_id, curr + read_timeout); 01506 01507 // Update the memory cache 01508 m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eReading); 01509 m_ClientsRegistry.AddToReading(client, job_id); 01510 01511 m_GCRegistry.UpdateLifetime(job_id, job->GetExpirationTime(m_Timeout, 01512 m_RunTimeout)); 01513 return; 01514 } 01515 01516 01517 TJobStatus CQueue::ConfirmReadingJob(const CNSClientId & client, 01518 unsigned int job_id, 01519 const string & auth_token) 01520 { 01521 TJobStatus old_status = x_ChangeReadingStatus( 01522 client, job_id, 01523 auth_token, 01524 CNetScheduleAPI::eConfirmed); 01525 m_ClientsRegistry.ClearReading(client, job_id); 01526 return old_status; 01527 } 01528 01529 01530 TJobStatus CQueue::FailReadingJob(const CNSClientId & client, 01531 unsigned int job_id, 01532 const string & auth_token) 01533 { 01534 TJobStatus old_status = x_ChangeReadingStatus( 01535 client, job_id, 01536 auth_token, 01537 CNetScheduleAPI::eReadFailed); 01538 m_ClientsRegistry.ClearReadingSetBlacklist(job_id); 01539 return old_status; 01540 } 01541 01542 01543 TJobStatus CQueue::ReturnReadingJob(const CNSClientId & client, 01544 unsigned int job_id, 01545 const string & auth_token) 01546 { 01547 TJobStatus old_status = x_ChangeReadingStatus( 01548 client, job_id, 01549 auth_token, 01550 CNetScheduleAPI::eDone); 01551 m_ClientsRegistry.ClearReadingSetBlacklist(job_id); 01552 return old_status; 01553 } 01554 01555 01556 TJobStatus CQueue::x_ChangeReadingStatus(const CNSClientId & client, 01557 unsigned int job_id, 01558 const string & auth_token, 01559 TJobStatus target_status) 01560 { 01561 time_t current_time = time(0); 01562 CJob job; 01563 TJobStatus new_status = target_status; 01564 CStatisticsCounters::ETransitionPathOption path_option = 01565 CStatisticsCounters::eNone; 01566 CFastMutexGuard guard(m_OperationLock); 01567 TJobStatus old_status = GetJobStatus(job_id); 01568 01569 if (old_status != 
CNetScheduleAPI::eReading) 01570 return old_status; 01571 01572 {{ 01573 CNSTransaction transaction(this); 01574 01575 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 01576 NCBI_THROW(CNetScheduleException, eInternalError, 01577 "Error fetching job: " + DecorateJobId(job_id)); 01578 01579 // Check that authorization token matches 01580 CJob::EAuthTokenCompareResult token_compare_result = 01581 job.CompareAuthToken(auth_token); 01582 if (token_compare_result == CJob::eInvalidTokenFormat) 01583 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01584 "Invalid authorization token format"); 01585 if (token_compare_result == CJob::eNoMatch) 01586 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01587 "Authorization token does not match"); 01588 01589 // Sanity check of the current job state 01590 if (job.GetStatus() != CNetScheduleAPI::eReading) 01591 NCBI_THROW(CNetScheduleException, eInternalError, 01592 "Internal inconsistency detected. The job " + 01593 DecorateJobId(job_id) + " state in memory is " + 01594 CNetScheduleAPI::StatusToString(CNetScheduleAPI::eReading) + 01595 " while in database it is " + 01596 CNetScheduleAPI::StatusToString(job.GetStatus())); 01597 01598 // Add an event 01599 CJobEvent & event = job.AppendEvent(); 01600 event.SetTimestamp(current_time); 01601 event.SetNodeAddr(client.GetAddress()); 01602 event.SetClientNode(client.GetNode()); 01603 event.SetClientSession(client.GetSession()); 01604 01605 switch (target_status) { 01606 case CNetScheduleAPI::eDone: 01607 event.SetEvent(CJobEvent::eReadRollback); 01608 job.SetReadCount(job.GetReadCount() - 1); 01609 break; 01610 case CNetScheduleAPI::eReadFailed: 01611 event.SetEvent(CJobEvent::eReadFail); 01612 // Check the number of tries first 01613 if (job.GetReadCount() <= m_FailedRetries) { 01614 // The job needs to be re-scheduled for reading 01615 new_status = CNetScheduleAPI::eDone; 01616 path_option = CStatisticsCounters::eFail; 01617 } 01618 break; 01619 case CNetScheduleAPI::eConfirmed: 
01620 event.SetEvent(CJobEvent::eReadDone); 01621 break; 01622 default: 01623 _ASSERT(0); 01624 break; 01625 } 01626 01627 event.SetStatus(new_status); 01628 job.SetStatus(new_status); 01629 job.SetLastTouch(current_time); 01630 01631 job.Flush(this); 01632 transaction.Commit(); 01633 }} 01634 01635 TimeLineRemove(job_id); 01636 01637 m_StatusTracker.SetStatus(job_id, new_status); 01638 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 01639 m_RunTimeout)); 01640 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading, 01641 new_status, 01642 path_option); 01643 return CNetScheduleAPI::eReading; 01644 } 01645 01646 01647 // This function is called from places where the operations lock has been 01648 // already taken. So there is no lock around memory status tracker 01649 void CQueue::EraseJob(unsigned int job_id) 01650 { 01651 m_StatusTracker.Erase(job_id); 01652 01653 {{ 01654 // Request delayed record delete 01655 CFastMutexGuard jtd_guard(m_JobsToDeleteLock); 01656 01657 m_JobsToDelete.set_bit(job_id); 01658 }} 01659 TimeLineRemove(job_id); 01660 } 01661 01662 01663 void CQueue::x_Erase(const TNSBitVector & job_ids) 01664 { 01665 CFastMutexGuard jtd_guard(m_JobsToDeleteLock); 01666 01667 m_JobsToDelete |= job_ids; 01668 } 01669 01670 01671 void CQueue::OptimizeMem() 01672 { 01673 m_StatusTracker.OptimizeMem(); 01674 } 01675 01676 01677 CQueue::x_SJobPick 01678 CQueue::x_FindPendingJob(const CNSClientId & client, 01679 const TNSBitVector & aff_ids, 01680 bool wnode_affinity, 01681 bool any_affinity, 01682 bool exclusive_new_affinity) 01683 { 01684 x_SJobPick job_pick; 01685 bool explicit_aff = aff_ids.any(); 01686 unsigned int wnode_aff_candidate = 0; 01687 unsigned int exclusive_aff_candidate = 0; 01688 TNSBitVector blacklisted_jobs = 01689 m_ClientsRegistry.GetBlacklistedJobs(client); 01690 01691 TNSBitVector pref_aff; 01692 if (wnode_affinity) { 01693 pref_aff = m_ClientsRegistry.GetPreferredAffinities(client); 01694 wnode_affinity = 
wnode_affinity && pref_aff.any(); 01695 } 01696 01697 TNSBitVector all_pref_aff; 01698 if (exclusive_new_affinity) 01699 all_pref_aff = m_ClientsRegistry.GetAllPreferredAffinities(); 01700 01701 if (explicit_aff || wnode_affinity || exclusive_new_affinity) { 01702 // Check all pending jobs 01703 TNSBitVector pending_jobs = 01704 m_StatusTracker.GetJobs(CNetScheduleAPI::ePending); 01705 TNSBitVector::enumerator en(pending_jobs.first()); 01706 01707 for (; en.valid(); ++en) { 01708 unsigned int job_id = *en; 01709 01710 if (blacklisted_jobs[job_id] == true) 01711 continue; 01712 01713 unsigned int aff_id = m_GCRegistry.GetAffinityID(job_id); 01714 if (aff_id != 0 && explicit_aff) { 01715 if (aff_ids[aff_id] == true) { 01716 job_pick.job_id = job_id; 01717 job_pick.exclusive = false; 01718 job_pick.aff_id = aff_id; 01719 return job_pick; 01720 } 01721 } 01722 01723 if (aff_id != 0 && wnode_affinity) { 01724 if (pref_aff[aff_id] == true) { 01725 if (explicit_aff == false) { 01726 job_pick.job_id = job_id; 01727 job_pick.exclusive = false; 01728 job_pick.aff_id = aff_id; 01729 return job_pick; 01730 } 01731 if (wnode_aff_candidate == 0) 01732 wnode_aff_candidate = job_id; 01733 continue; 01734 } 01735 } 01736 01737 if (exclusive_new_affinity) { 01738 if (aff_id == 0 || all_pref_aff[aff_id] == false) { 01739 if (explicit_aff == false && wnode_affinity == false) { 01740 job_pick.job_id = job_id; 01741 job_pick.exclusive = true; 01742 job_pick.aff_id = aff_id; 01743 return job_pick; 01744 } 01745 if (exclusive_aff_candidate == 0) 01746 exclusive_aff_candidate = job_id; 01747 } 01748 } 01749 } 01750 01751 if (wnode_aff_candidate != 0) { 01752 job_pick.job_id = wnode_aff_candidate; 01753 job_pick.exclusive = false; 01754 job_pick.aff_id = 0; 01755 return job_pick; 01756 } 01757 01758 if (exclusive_aff_candidate != 0) { 01759 job_pick.job_id = exclusive_aff_candidate; 01760 job_pick.exclusive = true; 01761 job_pick.aff_id = m_GCRegistry.GetAffinityID(exclusive_aff_candidate); 
01762 return job_pick; 01763 } 01764 } 01765 01766 01767 if (any_affinity || (!explicit_aff && !wnode_affinity)) { 01768 job_pick.job_id = m_StatusTracker.GetJobByStatus( 01769 CNetScheduleAPI::ePending, 01770 blacklisted_jobs, 01771 TNSBitVector()); 01772 job_pick.exclusive = false; 01773 job_pick.aff_id = 0; 01774 return job_pick; 01775 } 01776 01777 job_pick.job_id = 0; 01778 job_pick.exclusive = false; 01779 job_pick.aff_id = 0; 01780 return job_pick; 01781 } 01782 01783 01784 01785 string CQueue::GetAffinityList() 01786 { 01787 list< SAffinityStatistics > 01788 statistics = m_AffinityRegistry.GetAffinityStatistics(m_StatusTracker); 01789 01790 string aff_list; 01791 for (list< SAffinityStatistics >::const_iterator k = statistics.begin(); 01792 k != statistics.end(); ++k) { 01793 if (!aff_list.empty()) 01794 aff_list += "&"; 01795 01796 aff_list += NStr::URLEncode(k->m_Token) + '=' + 01797 NStr::SizetToString(k->m_NumberOfPendingJobs) + "," + 01798 NStr::SizetToString(k->m_NumberOfRunningJobs) + "," + 01799 NStr::SizetToString(k->m_NumberOfPreferred) + "," + 01800 NStr::SizetToString(k->m_NumberOfWaitGet); 01801 } 01802 return aff_list; 01803 } 01804 01805 01806 TJobStatus CQueue::FailJob(const CNSClientId & client, 01807 unsigned int job_id, 01808 const string & auth_token, 01809 const string & err_msg, 01810 const string & output, 01811 int ret_code, 01812 string warning) 01813 { 01814 unsigned failed_retries; 01815 unsigned max_output_size; 01816 // time_t blacklist_time; 01817 {{ 01818 CQueueParamAccessor qp(*this); 01819 failed_retries = qp.GetFailedRetries(); 01820 max_output_size = qp.GetMaxOutputSize(); 01821 // blacklist_time = qp.GetBlacklistTime(); 01822 }} 01823 01824 if (output.size() > max_output_size) { 01825 NCBI_THROW(CNetScheduleException, eDataTooLong, 01826 "Output is too long"); 01827 } 01828 01829 CJob job; 01830 time_t curr = time(0); 01831 bool rescheduled = false; 01832 TJobStatus old_status; 01833 01834 {{ 01835 CFastMutexGuard 
guard(m_OperationLock); 01836 TJobStatus new_status = CNetScheduleAPI::eFailed; 01837 01838 old_status = GetJobStatus(job_id); 01839 if (old_status == CNetScheduleAPI::eFailed) { 01840 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eFailed, 01841 CNetScheduleAPI::eFailed); 01842 return old_status; 01843 } 01844 01845 if (old_status != CNetScheduleAPI::eRunning) { 01846 // No job state change 01847 return old_status; 01848 } 01849 01850 {{ 01851 CNSTransaction transaction(this); 01852 01853 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 01854 NCBI_THROW(CNetScheduleException, eInternalError, 01855 "Error fetching job: " + DecorateJobId(job_id)); 01856 01857 if (!auth_token.empty()) { 01858 // Need to check authorization token first 01859 CJob::EAuthTokenCompareResult token_compare_result = 01860 job.CompareAuthToken(auth_token); 01861 if (token_compare_result == CJob::eInvalidTokenFormat) 01862 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01863 "Invalid authorization token format"); 01864 if (token_compare_result == CJob::eNoMatch) 01865 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 01866 "Authorization token does not match"); 01867 if (token_compare_result == CJob::ePassportOnlyMatch) { 01868 // That means the job has been given to another worker node 01869 // by whatever reason (expired/failed/returned before) 01870 LOG_POST(Message << Warning << "Received FPUT2 with only " 01871 "passport matched."); 01872 warning = "Only job passport matched. 
Command is ignored."; 01873 return old_status; 01874 } 01875 // Here: the authorization token is OK, we can continue 01876 } 01877 01878 CJobEvent * event = job.GetLastEvent(); 01879 if (!event) 01880 ERR_POST("No JobEvent for running job " << DecorateJobId(job_id)); 01881 01882 event = &job.AppendEvent(); 01883 event->SetEvent(CJobEvent::eFail); 01884 event->SetStatus(CNetScheduleAPI::eFailed); 01885 event->SetTimestamp(curr); 01886 event->SetErrorMsg(err_msg); 01887 event->SetRetCode(ret_code); 01888 event->SetNodeAddr(client.GetAddress()); 01889 event->SetClientNode(client.GetNode()); 01890 event->SetClientSession(client.GetSession()); 01891 01892 unsigned run_count = job.GetRunCount(); 01893 if (run_count <= failed_retries) { 01894 job.SetStatus(CNetScheduleAPI::ePending); 01895 event->SetStatus(CNetScheduleAPI::ePending); 01896 01897 new_status = CNetScheduleAPI::ePending; 01898 01899 rescheduled = true; 01900 } else { 01901 job.SetStatus(CNetScheduleAPI::eFailed); 01902 event->SetStatus(CNetScheduleAPI::eFailed); 01903 new_status = CNetScheduleAPI::eFailed; 01904 rescheduled = false; 01905 if (m_Log) 01906 LOG_POST(Message << Warning << "Job " << DecorateJobId(job_id) 01907 << " failed, exceeded max number of retries (" 01908 << failed_retries << ")"); 01909 } 01910 01911 job.SetOutput(output); 01912 job.SetLastTouch(curr); 01913 job.Flush(this); 01914 transaction.Commit(); 01915 }} 01916 01917 m_StatusTracker.SetStatus(job_id, new_status); 01918 if (new_status == CNetScheduleAPI::ePending) 01919 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning, 01920 new_status, 01921 CStatisticsCounters::eFail); 01922 else 01923 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning, 01924 new_status, 01925 CStatisticsCounters::eNone); 01926 01927 TimeLineRemove(job_id); 01928 01929 // Replace it with ClearExecuting(client, job_id) when all clients provide 01930 // their credentials and job passport is checked strictly 01931 
m_ClientsRegistry.ClearExecuting(job_id); 01932 m_ClientsRegistry.AddToBlacklist(client, job_id); 01933 01934 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 01935 m_RunTimeout)); 01936 01937 if (!rescheduled && job.ShouldNotify(curr)) { 01938 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 01939 job.GetSubmNotifPort(), 01940 MakeKey(job_id), 01941 job.GetStatus()); 01942 } 01943 01944 if (rescheduled) 01945 m_NotificationsList.Notify(job_id, 01946 job.GetAffinityId(), m_ClientsRegistry, 01947 m_AffinityRegistry, m_NotifHifreqPeriod); 01948 01949 }} 01950 01951 return old_status; 01952 } 01953 01954 01955 string CQueue::GetAffinityTokenByID(unsigned int aff_id) const 01956 { 01957 return m_AffinityRegistry.GetTokenByID(aff_id); 01958 } 01959 01960 01961 01962 /// Specified status is OR-ed with the target vector 01963 void CQueue::JobsWithStatus(TJobStatus status, 01964 TNSBitVector* bv) const 01965 { 01966 m_StatusTracker.StatusSnapshot(status, bv); 01967 } 01968 01969 01970 void CQueue::ClearWorkerNode(const CNSClientId & client) 01971 { 01972 // Get the running and reading jobs and move them to the corresponding 01973 // states (pending and done) 01974 01975 TNSBitVector running_jobs; 01976 TNSBitVector reading_jobs; 01977 01978 {{ 01979 CFastMutexGuard guard(m_OperationLock); 01980 unsigned short wait_port = m_ClientsRegistry.ClearWorkerNode( 01981 client, 01982 m_AffinityRegistry, 01983 running_jobs, 01984 reading_jobs); 01985 01986 if (wait_port != 0) 01987 m_NotificationsList.UnregisterListener(client, wait_port); 01988 }} 01989 01990 if (running_jobs.any()) 01991 x_ResetRunningDueToClear(client, running_jobs); 01992 if (reading_jobs.any()) 01993 x_ResetReadingDueToClear(client, reading_jobs); 01994 return; 01995 } 01996 01997 01998 void CQueue::NotifyListenersPeriodically(time_t current_time) 01999 { 02000 // Triggered from a notification thread only, so check first the 02001 // configured notification interval 02002 static double 
last_notif_timeout = -1.0; 02003 static size_t skip_limit = 0; 02004 static size_t skip_count; 02005 02006 if (m_NotifHifreqInterval != last_notif_timeout) { 02007 last_notif_timeout = m_NotifHifreqInterval; 02008 skip_count = 0; 02009 skip_limit = size_t(m_NotifHifreqInterval/0.1); 02010 } 02011 02012 ++skip_count; 02013 if (skip_count < skip_limit) 02014 return; 02015 02016 skip_count = 0; 02017 02018 // The NotifyPeriodically() and CheckTimeout() calls may need to modify 02019 // the clients and affinity registry so it is safer to take the queue lock. 02020 CFastMutexGuard guard(m_OperationLock); 02021 if (m_StatusTracker.AnyPending()) 02022 m_NotificationsList.NotifyPeriodically(current_time, 02023 m_NotifLofreqMult, 02024 m_ClientsRegistry, 02025 m_AffinityRegistry); 02026 else 02027 m_NotificationsList.CheckTimeout(current_time, 02028 m_ClientsRegistry, 02029 m_AffinityRegistry); 02030 02031 return; 02032 } 02033 02034 02035 void CQueue::PrintClientsList(CNetScheduleHandler & handler, 02036 bool verbose) const 02037 { 02038 m_ClientsRegistry.PrintClientsList(this, handler, 02039 m_AffinityRegistry, verbose); 02040 } 02041 02042 02043 void CQueue::PrintNotificationsList(CNetScheduleHandler & handler, 02044 bool verbose) const 02045 { 02046 m_NotificationsList.Print(handler, 02047 m_ClientsRegistry, 02048 m_AffinityRegistry, verbose); 02049 } 02050 02051 02052 void CQueue::PrintAffinitiesList(CNetScheduleHandler & handler, 02053 bool verbose) const 02054 { 02055 m_AffinityRegistry.Print(this, 02056 handler, 02057 m_ClientsRegistry, 02058 verbose); 02059 } 02060 02061 02062 void CQueue::PrintGroupsList(CNetScheduleHandler & handler, 02063 bool verbose) const 02064 { 02065 m_GroupRegistry.Print(this, handler, verbose); 02066 } 02067 02068 02069 void CQueue::CheckExecutionTimeout(bool logging) 02070 { 02071 if (!m_RunTimeLine) 02072 return; 02073 02074 unsigned queue_run_timeout = GetRunTimeout(); 02075 time_t curr = time(0); 02076 TNSBitVector bv; 02077 {{ 02078 
CReadLockGuard guard(m_RunTimeLineLock); 02079 m_RunTimeLine->ExtractObjects(curr, &bv); 02080 }} 02081 02082 TNSBitVector::enumerator en(bv.first()); 02083 for ( ;en.valid(); ++en) { 02084 x_CheckExecutionTimeout(queue_run_timeout, *en, curr, logging); 02085 } 02086 } 02087 02088 02089 void CQueue::x_CheckExecutionTimeout(unsigned queue_run_timeout, 02090 unsigned job_id, 02091 time_t curr_time, 02092 bool logging) 02093 { 02094 CJob job; 02095 unsigned time_start = 0; 02096 unsigned run_timeout = 0; 02097 time_t exp_time = 0; 02098 TJobStatus status; 02099 TJobStatus new_status; 02100 CJobEvent::EJobEvent event_type; 02101 02102 02103 {{ 02104 CFastMutexGuard guard(m_OperationLock); 02105 02106 status = GetJobStatus(job_id); 02107 if (status == CNetScheduleAPI::eRunning) { 02108 new_status = CNetScheduleAPI::ePending; 02109 event_type = CJobEvent::eTimeout; 02110 } else if (status == CNetScheduleAPI::eReading) { 02111 new_status = CNetScheduleAPI::eDone; 02112 event_type = CJobEvent::eReadTimeout; 02113 } else 02114 return; // Execution timeout is for Running and Reading jobs only 02115 02116 02117 {{ 02118 CNSTransaction transaction(this); 02119 02120 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 02121 return; 02122 02123 CJobEvent * event = job.GetLastEvent(); 02124 time_start = event->GetTimestamp(); 02125 run_timeout = job.GetRunTimeout(); 02126 if (run_timeout == 0) 02127 run_timeout = queue_run_timeout; 02128 02129 exp_time = run_timeout ? time_start + run_timeout : 0; 02130 if (curr_time < exp_time) { 02131 // we need to register job in time line 02132 TimeLineAdd(job_id, exp_time); 02133 return; 02134 } 02135 02136 // The job timeout (running or reading) is expired. 02137 // Check the try counter, we may need to fail the job. 
02138 if (status == CNetScheduleAPI::eRunning) { 02139 // Running state 02140 if (job.GetRunCount() > m_FailedRetries) 02141 new_status = CNetScheduleAPI::eFailed; 02142 } else { 02143 // Reading state 02144 if (job.GetReadCount() > m_FailedRetries) 02145 new_status = CNetScheduleAPI::eReadFailed; 02146 } 02147 02148 job.SetStatus(new_status); 02149 job.SetLastTouch(curr_time); 02150 02151 event = &job.AppendEvent(); 02152 event->SetStatus(new_status); 02153 event->SetEvent(event_type); 02154 event->SetTimestamp(curr_time); 02155 02156 job.Flush(this); 02157 transaction.Commit(); 02158 }} 02159 02160 02161 m_StatusTracker.SetStatus(job_id, new_status); 02162 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 02163 m_RunTimeout)); 02164 02165 if (status == CNetScheduleAPI::eRunning) { 02166 if (new_status == CNetScheduleAPI::ePending) { 02167 // Timeout and reschedule, put to blacklist as well 02168 m_ClientsRegistry.ClearExecutingSetBlacklist(job_id); 02169 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning, 02170 CNetScheduleAPI::ePending, 02171 CStatisticsCounters::eTimeout); 02172 } 02173 else { 02174 m_ClientsRegistry.ClearExecuting(job_id); 02175 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning, 02176 CNetScheduleAPI::eFailed, 02177 CStatisticsCounters::eTimeout); 02178 } 02179 } else { 02180 if (new_status == CNetScheduleAPI::eDone) { 02181 // Timeout and reschedule for reading, put to the blacklist 02182 m_ClientsRegistry.ClearReadingSetBlacklist(job_id); 02183 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading, 02184 CNetScheduleAPI::eDone, 02185 CStatisticsCounters::eTimeout); 02186 } 02187 else { 02188 m_ClientsRegistry.ClearReading(job_id); 02189 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading, 02190 CNetScheduleAPI::eReadFailed, 02191 CStatisticsCounters::eTimeout); 02192 } 02193 } 02194 02195 if (new_status == CNetScheduleAPI::ePending) 02196 m_NotificationsList.Notify(job_id, 02197 
job.GetAffinityId(), m_ClientsRegistry, 02198 m_AffinityRegistry, m_NotifHifreqPeriod); 02199 02200 }} 02201 02202 if (new_status == CNetScheduleAPI::eFailed && job.ShouldNotify(time(0))) 02203 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(), 02204 job.GetSubmNotifPort(), 02205 MakeKey(job_id), 02206 job.GetStatus()); 02207 02208 if (logging) 02209 { 02210 CTime tm_start; 02211 tm_start.SetTimeT(time_start); 02212 tm_start.ToLocalTime(); 02213 02214 CTime tm_exp; 02215 tm_exp.SetTimeT(exp_time); 02216 tm_exp.ToLocalTime(); 02217 02218 string purpose; 02219 if (status == CNetScheduleAPI::eRunning) 02220 purpose = "execution"; 02221 else 02222 purpose = "reading"; 02223 02224 GetDiagContext().Extra() 02225 .Print("msg", "Timeout expired, rescheduled for " + purpose) 02226 .Print("msg_code", "410") // The code is for searching in applog 02227 .Print("job_key", MakeKey(job.GetId())) 02228 .Print("queue", m_QueueName) 02229 .Print("run_counter", job.GetRunCount()) 02230 .Print("read_counter", job.GetReadCount()) 02231 .Print("time_start", tm_start.AsString()) 02232 .Print("exp_time", tm_exp.AsString()) 02233 .Print("run_timeout", run_timeout); 02234 } 02235 } 02236 02237 02238 // Checks up to given # of jobs at the given status for expiration and 02239 // marks up to given # of jobs for deletion. 02240 // Returns the # of performed scans, the # of jobs marked for deletion and 02241 // the last scanned job id. 
02242 SPurgeAttributes CQueue::CheckJobsExpiry(time_t current_time, 02243 SPurgeAttributes attributes, 02244 unsigned int last_job, 02245 TJobStatus status) 02246 { 02247 TNSBitVector job_ids; 02248 SPurgeAttributes result; 02249 unsigned int job_id; 02250 unsigned int aff; 02251 unsigned int group; 02252 02253 result.job_id = attributes.job_id; 02254 result.deleted = 0; 02255 {{ 02256 CFastMutexGuard guard(m_OperationLock); 02257 02258 for (result.scans = 0; result.scans < attributes.scans; ++result.scans) { 02259 job_id = m_StatusTracker.GetNext(status, result.job_id); 02260 if (job_id == 0) 02261 break; // No more jobs in the state 02262 if (last_job != 0 && job_id >= last_job) 02263 break; // The job in the state is above the limit 02264 02265 result.job_id = job_id; 02266 02267 if (m_GCRegistry.DeleteIfTimedOut(job_id, current_time, 02268 &aff, &group)) 02269 { 02270 // The job is expired and needs to be marked for deletion 02271 m_StatusTracker.Erase(job_id); 02272 job_ids.set_bit(job_id); 02273 ++result.deleted; 02274 02275 // check if the affinity should also be updated 02276 if (aff != 0) 02277 m_AffinityRegistry.RemoveJobFromAffinity(job_id, aff); 02278 02279 // Check if the group registry should also be updated 02280 if (group != 0) 02281 m_GroupRegistry.RemoveJob(group, job_id); 02282 02283 if (result.deleted >= attributes.deleted) 02284 break; 02285 } 02286 } 02287 }} 02288 02289 if (result.deleted > 0) 02290 x_Erase(job_ids); 02291 02292 return result; 02293 } 02294 02295 02296 void CQueue::TimeLineMove(unsigned job_id, time_t old_time, time_t new_time) 02297 { 02298 if (!job_id || !m_RunTimeLine) 02299 return; 02300 02301 CWriteLockGuard guard(m_RunTimeLineLock); 02302 m_RunTimeLine->MoveObject(old_time, new_time, job_id); 02303 } 02304 02305 02306 void CQueue::TimeLineAdd(unsigned job_id, time_t job_time) 02307 { 02308 if (!job_id || !m_RunTimeLine || !job_time) 02309 return; 02310 02311 CWriteLockGuard guard(m_RunTimeLineLock); 02312 
m_RunTimeLine->AddObject(job_time, job_id); 02313 } 02314 02315 02316 void CQueue::TimeLineRemove(unsigned job_id) 02317 { 02318 if (!m_RunTimeLine) 02319 return; 02320 02321 CWriteLockGuard guard(m_RunTimeLineLock); 02322 m_RunTimeLine->RemoveObject(job_id); 02323 } 02324 02325 02326 void CQueue::TimeLineExchange(unsigned remove_job_id, 02327 unsigned add_job_id, 02328 time_t new_time) 02329 { 02330 if (!m_RunTimeLine) 02331 return; 02332 02333 CWriteLockGuard guard(m_RunTimeLineLock); 02334 if (remove_job_id) 02335 m_RunTimeLine->RemoveObject(remove_job_id); 02336 if (add_job_id) 02337 m_RunTimeLine->AddObject(new_time, add_job_id); 02338 } 02339 02340 02341 unsigned int CQueue::DeleteBatch(unsigned int max_deleted) 02342 { 02343 // Copy the vector with deleted jobs 02344 TNSBitVector jobs_to_delete; 02345 02346 {{ 02347 CFastMutexGuard guard(m_JobsToDeleteLock); 02348 jobs_to_delete = m_JobsToDelete; 02349 }} 02350 02351 static const size_t chunk_size = 100; 02352 unsigned int del_rec = 0; 02353 TNSBitVector::enumerator en = jobs_to_delete.first(); 02354 TNSBitVector deleted_jobs; 02355 02356 while (en.valid() && del_rec < max_deleted) { 02357 {{ 02358 CFastMutexGuard guard(m_OperationLock); 02359 CNSTransaction transaction(this); 02360 02361 for (size_t n = 0; 02362 en.valid() && n < chunk_size && del_rec < max_deleted; 02363 ++en, ++n) { 02364 unsigned int job_id = *en; 02365 02366 try { 02367 m_QueueDbBlock->job_db.id = job_id; 02368 m_QueueDbBlock->job_db.Delete(); 02369 ++del_rec; 02370 deleted_jobs.set_bit(job_id, true); 02371 } catch (CBDB_ErrnoException& ex) { 02372 ERR_POST("BDB error " << ex.what()); 02373 } 02374 02375 try { 02376 m_QueueDbBlock->job_info_db.id = job_id; 02377 m_QueueDbBlock->job_info_db.Delete(); 02378 } catch (CBDB_ErrnoException& ex) { 02379 ERR_POST("BDB error " << ex.what()); 02380 } 02381 02382 x_DeleteJobEvents(job_id); 02383 } 02384 02385 transaction.Commit(); 02386 }} 02387 } 02388 02389 if (del_rec > 0) { 02390 
m_StatisticsCounters.CountDBDeletion(del_rec); 02391 02392 TNSBitVector::enumerator en = deleted_jobs.first(); 02393 CFastMutexGuard guard(m_JobsToDeleteLock); 02394 for (; en.valid(); ++en) 02395 m_JobsToDelete.set_bit(*en, false); 02396 } 02397 return del_rec; 02398 } 02399 02400 02401 // See CXX-2838 for the description of how affinities garbage collection is 02402 // going to work. 02403 unsigned int CQueue::PurgeAffinities(void) 02404 { 02405 unsigned int aff_dict_size = m_AffinityRegistry.size(); 02406 02407 if (aff_dict_size < (m_AffinityLowMarkPercentage / 100.0) * m_MaxAffinities) 02408 // Did not reach the dictionary low mark 02409 return 0; 02410 02411 unsigned int del_limit = m_AffinityHighRemoval; 02412 if (aff_dict_size < (m_AffinityHighMarkPercentage / 100.0) * m_MaxAffinities) { 02413 // Here: check the percentage of the affinities that have no references 02414 // to them 02415 unsigned int candidates_size = m_AffinityRegistry.CheckRemoveCandidates(); 02416 02417 if (candidates_size < (m_AffinityDirtPercentage / 100.0) * m_MaxAffinities) 02418 // The number of candidates to be deleted is low 02419 return 0; 02420 02421 del_limit = m_AffinityLowRemoval; 02422 } 02423 02424 02425 // Here: need to delete affinities from the memory and DB 02426 CFastMutexGuard guard(m_OperationLock); 02427 CNSTransaction transaction(this); 02428 02429 unsigned int del_count = m_AffinityRegistry.CollectGarbage(del_limit); 02430 transaction.Commit(); 02431 02432 return del_count; 02433 } 02434 02435 02436 unsigned int CQueue::PurgeGroups(void) 02437 { 02438 CFastMutexGuard guard(m_OperationLock); 02439 CNSTransaction transaction(this); 02440 02441 unsigned int del_count = m_GroupRegistry.CollectGarbage(100); 02442 transaction.Commit(); 02443 02444 return del_count; 02445 } 02446 02447 02448 void CQueue::PurgeWNodes(time_t current_time) 02449 { 02450 // Clears the worker nodes affinities if the workers are inactive for 02451 // certain time 02452 CFastMutexGuard 
guard(m_OperationLock); 02453 02454 vector< pair< unsigned int, 02455 unsigned short > > notif_to_reset = 02456 m_ClientsRegistry.Purge( 02457 current_time, m_WNodeTimeout, 02458 m_AffinityRegistry); 02459 for (vector< pair< unsigned int, 02460 unsigned short > >::const_iterator k = notif_to_reset.begin(); 02461 k != notif_to_reset.end(); ++k) 02462 m_NotificationsList.UnregisterListener(k->first, k->second); 02463 02464 return; 02465 } 02466 02467 02468 void CQueue::x_DeleteJobEvents(unsigned int job_id) 02469 { 02470 try { 02471 for (unsigned int event_number = 0; ; ++event_number) { 02472 m_QueueDbBlock->events_db.id = job_id; 02473 m_QueueDbBlock->events_db.event_id = event_number; 02474 if (m_QueueDbBlock->events_db.Delete() == eBDB_NotFound) 02475 break; 02476 } 02477 } catch (CBDB_ErrnoException& ex) { 02478 ERR_POST("BDB error " << ex.what()); 02479 } 02480 } 02481 02482 02483 CBDB_FileCursor& CQueue::GetEventsCursor() 02484 { 02485 CBDB_Transaction* trans = m_QueueDbBlock->events_db.GetBDBTransaction(); 02486 CBDB_FileCursor* cur = m_EventsCursor.get(); 02487 if (!cur) { 02488 cur = new CBDB_FileCursor(m_QueueDbBlock->events_db); 02489 m_EventsCursor.reset(cur); 02490 } else { 02491 cur->Close(); 02492 } 02493 cur->ReOpen(trans); 02494 return *cur; 02495 } 02496 02497 02498 void CQueue::PrintSubmHosts(CNetScheduleHandler & handler) const 02499 { 02500 string hosts; 02501 02502 {{ 02503 CQueueParamAccessor qp(*this); 02504 hosts = qp.GetSubmHosts().Print("OK:", "\n"); 02505 }} 02506 02507 if (!hosts.empty()) 02508 hosts += "\n"; 02509 handler.WriteMessage(hosts); 02510 } 02511 02512 02513 void CQueue::PrintWNodeHosts(CNetScheduleHandler & handler) const 02514 { 02515 string hosts; 02516 02517 {{ 02518 CQueueParamAccessor qp(*this); 02519 hosts = qp.GetWnodeHosts().Print("OK:", "\n"); 02520 }} 02521 02522 if (!hosts.empty()) 02523 hosts += "\n"; 02524 handler.WriteMessage(hosts); 02525 } 02526 02527 02528 void CQueue::x_PrintShortJobStat(CNetScheduleHandler 
& handler, 02529 const CJob & job) 02530 { 02531 string reply = MakeKey(job.GetId()) + "\t" + 02532 CNetScheduleAPI::StatusToString(job.GetStatus()) + "\t" + 02533 job.GetQuotedInput() + "\t" + 02534 job.GetQuotedOutput() + "\t"; 02535 02536 const CJobEvent * last_event = job.GetLastEvent(); 02537 if (last_event) 02538 reply += last_event->GetQuotedErrorMsg(); 02539 else 02540 reply += "''"; 02541 handler.WriteMessage("OK:", reply); 02542 } 02543 02544 02545 size_t CQueue::PrintJobDbStat(CNetScheduleHandler & handler, 02546 unsigned job_id) 02547 { 02548 // Check first that the job has not been deleted yet 02549 {{ 02550 CFastMutexGuard guard(m_JobsToDeleteLock); 02551 if (m_JobsToDelete[job_id]) 02552 return 0; 02553 }} 02554 02555 CJob job; 02556 {{ 02557 CFastMutexGuard guard(m_OperationLock); 02558 02559 m_QueueDbBlock->job_db.SetTransaction(NULL); 02560 m_QueueDbBlock->events_db.SetTransaction(NULL); 02561 m_QueueDbBlock->job_info_db.SetTransaction(NULL); 02562 02563 CJob::EJobFetchResult res = job.Fetch(this, job_id); 02564 if (res != CJob::eJF_Ok) 02565 return 0; 02566 }} 02567 02568 job.Print(handler, *this, m_AffinityRegistry, m_GroupRegistry); 02569 return 1; 02570 } 02571 02572 02573 void CQueue::PrintAllJobDbStat(CNetScheduleHandler & handler, 02574 const string & group, 02575 TJobStatus job_status, 02576 unsigned int start_after_job_id, 02577 unsigned int count) 02578 { 02579 // Form a bit vector of all jobs to dump 02580 vector<CNetScheduleAPI::EJobStatus> statuses; 02581 TNSBitVector jobs_to_dump; 02582 02583 if (job_status == CNetScheduleAPI::eJobNotFound) { 02584 // All statuses 02585 statuses.push_back(CNetScheduleAPI::ePending); 02586 statuses.push_back(CNetScheduleAPI::eRunning); 02587 statuses.push_back(CNetScheduleAPI::eCanceled); 02588 statuses.push_back(CNetScheduleAPI::eFailed); 02589 statuses.push_back(CNetScheduleAPI::eDone); 02590 statuses.push_back(CNetScheduleAPI::eReading); 02591 statuses.push_back(CNetScheduleAPI::eConfirmed); 02592 
statuses.push_back(CNetScheduleAPI::eReadFailed); 02593 } 02594 else { 02595 // The user specified one state explicitly 02596 statuses.push_back(job_status); 02597 } 02598 02599 02600 {{ 02601 CFastMutexGuard guard(m_OperationLock); 02602 02603 jobs_to_dump = m_StatusTracker.GetJobs(statuses); 02604 }} 02605 02606 // Check if a certain group has been specified 02607 if (!group.empty()) 02608 jobs_to_dump &= m_GroupRegistry.GetJobs(group); 02609 02610 x_DumpJobs(handler, jobs_to_dump, start_after_job_id, count); 02611 return; 02612 } 02613 02614 02615 void CQueue::x_DumpJobs(CNetScheduleHandler & handler, 02616 const TNSBitVector & jobs_to_dump, 02617 unsigned int start_after_job_id, 02618 unsigned int count) 02619 { 02620 // Skip the jobs which should not be dumped 02621 TNSBitVector::enumerator en(jobs_to_dump.first()); 02622 while (en.valid() && *en <= start_after_job_id) 02623 ++en; 02624 02625 // Identify the required buffer size for jobs 02626 size_t buffer_size = m_DumpBufferSize; 02627 if (count != 0 && count < buffer_size) 02628 buffer_size = count; 02629 02630 {{ 02631 CJob buffer[buffer_size]; 02632 02633 size_t read_jobs = 0; 02634 size_t printed_count = 0; 02635 02636 for ( ; en.valid(); ) { 02637 {{ 02638 CFastMutexGuard guard(m_OperationLock); 02639 m_QueueDbBlock->job_db.SetTransaction(NULL); 02640 m_QueueDbBlock->events_db.SetTransaction(NULL); 02641 m_QueueDbBlock->job_info_db.SetTransaction(NULL); 02642 02643 for ( ; en.valid() && read_jobs < buffer_size; ++en ) 02644 if (buffer[read_jobs].Fetch(this, *en) == CJob::eJF_Ok) { 02645 ++read_jobs; 02646 ++printed_count; 02647 02648 if (count != 0) 02649 if (printed_count >= count) 02650 break; 02651 } 02652 }} 02653 02654 // Print what was read 02655 for (size_t index = 0; index < read_jobs; ++index) { 02656 handler.WriteMessage(""); 02657 buffer[index].Print(handler, *this, 02658 m_AffinityRegistry, m_GroupRegistry); 02659 } 02660 02661 if (count != 0) 02662 if (printed_count >= count) 02663 break; 
02664 02665 read_jobs = 0; 02666 } 02667 }} 02668 02669 return; 02670 } 02671 02672 02673 unsigned CQueue::CountStatus(TJobStatus st) const 02674 { 02675 return m_StatusTracker.CountStatus(st); 02676 } 02677 02678 02679 void CQueue::StatusStatistics(TJobStatus status, 02680 TNSBitVector::statistics* st) const 02681 { 02682 m_StatusTracker.StatusStatistics(status, st); 02683 } 02684 02685 02686 void CQueue::TouchClientsRegistry(CNSClientId & client) 02687 { 02688 TNSBitVector running_jobs; 02689 TNSBitVector reading_jobs; 02690 02691 {{ 02692 CFastMutexGuard guard(m_OperationLock); 02693 unsigned short wait_port = m_ClientsRegistry.Touch( 02694 client, 02695 m_AffinityRegistry, 02696 running_jobs, 02697 reading_jobs); 02698 if (wait_port != 0) 02699 m_NotificationsList.UnregisterListener(client, wait_port); 02700 }} 02701 02702 02703 if (running_jobs.any()) 02704 x_ResetRunningDueToNewSession(client, running_jobs); 02705 if (reading_jobs.any()) 02706 x_ResetReadingDueToNewSession(client, reading_jobs); 02707 return; 02708 } 02709 02710 02711 // Moves the job to Pending/Failed or to Done/ReadFailed 02712 // when event event_type has come 02713 TJobStatus CQueue::x_ResetDueTo(const CNSClientId & client, 02714 unsigned int job_id, 02715 time_t current_time, 02716 TJobStatus status_from, 02717 CJobEvent::EJobEvent event_type) 02718 { 02719 CJob job; 02720 TJobStatus new_status; 02721 02722 CFastMutexGuard guard(m_OperationLock); 02723 02724 {{ 02725 CNSTransaction transaction(this); 02726 02727 if (job.Fetch(this, job_id) != CJob::eJF_Ok) { 02728 ERR_POST("Cannot fetch job to reset it due to " << 02729 CJobEvent::EventToString(event_type) << 02730 ". 
Job: " << DecorateJobId(job_id)); 02731 return CNetScheduleAPI::eJobNotFound; 02732 } 02733 02734 if (status_from == CNetScheduleAPI::eRunning) { 02735 // The job was running 02736 if (job.GetRunCount() > m_FailedRetries) 02737 new_status = CNetScheduleAPI::eFailed; 02738 else 02739 new_status = CNetScheduleAPI::ePending; 02740 } else { 02741 // The job was reading 02742 if (job.GetReadCount() > m_FailedRetries) 02743 new_status = CNetScheduleAPI::eReadFailed; 02744 else 02745 new_status = CNetScheduleAPI::eDone; 02746 } 02747 02748 job.SetStatus(new_status); 02749 job.SetLastTouch(current_time); 02750 02751 CJobEvent * event = &job.AppendEvent(); 02752 event->SetStatus(new_status); 02753 event->SetEvent(event_type); 02754 event->SetTimestamp(current_time); 02755 event->SetClientNode(client.GetNode()); 02756 event->SetClientSession(client.GetSession()); 02757 02758 job.Flush(this); 02759 transaction.Commit(); 02760 }} 02761 02762 // Update the memory map 02763 m_StatusTracker.SetStatus(job_id, new_status); 02764 02765 m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout, 02766 m_RunTimeout)); 02767 02768 // remove the job from the time line 02769 TimeLineRemove(job_id); 02770 02771 // Notify those who wait for the jobs if needed 02772 if (new_status == CNetScheduleAPI::ePending) 02773 m_NotificationsList.Notify(job_id, job.GetAffinityId(), 02774 m_ClientsRegistry, 02775 m_AffinityRegistry, 02776 m_NotifHifreqPeriod); 02777 02778 return new_status; 02779 } 02780 02781 02782 void CQueue::x_ResetRunningDueToClear(const CNSClientId & client, 02783 const TNSBitVector & jobs) 02784 { 02785 time_t current_time = time(0); 02786 for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) { 02787 try { 02788 TJobStatus new_status = x_ResetDueTo(client, *en, 02789 current_time, 02790 CNetScheduleAPI::eRunning, 02791 CJobEvent::eClear); 02792 if (new_status != CNetScheduleAPI::eJobNotFound) 02793 m_StatisticsCounters.CountTransition( 02794 
CNetScheduleAPI::eRunning, 02795 new_status, 02796 CStatisticsCounters::eClear); 02797 } catch (...) { 02798 ERR_POST("Error resetting a running job when worker node is " 02799 "cleared. Job: " << DecorateJobId(*en)); 02800 } 02801 } 02802 } 02803 02804 02805 void CQueue::x_ResetReadingDueToClear(const CNSClientId & client, 02806 const TNSBitVector & jobs) 02807 { 02808 time_t current_time = time(0); 02809 for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) { 02810 try { 02811 TJobStatus new_status = x_ResetDueTo(client, *en, 02812 current_time, 02813 CNetScheduleAPI::eReading, 02814 CJobEvent::eClear); 02815 if (new_status != CNetScheduleAPI::eJobNotFound) 02816 m_StatisticsCounters.CountTransition( 02817 CNetScheduleAPI::eReading, 02818 new_status, 02819 CStatisticsCounters::eClear); 02820 } catch (...) { 02821 ERR_POST("Error resetting a reading job when worker node is " 02822 "cleared. Job: " << DecorateJobId(*en)); 02823 } 02824 } 02825 } 02826 02827 02828 void CQueue::x_ResetRunningDueToNewSession(const CNSClientId & client, 02829 const TNSBitVector & jobs) 02830 { 02831 time_t current_time = time(0); 02832 for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) { 02833 try { 02834 TJobStatus new_status = x_ResetDueTo(client, *en, 02835 current_time, 02836 CNetScheduleAPI::eRunning, 02837 CJobEvent::eSessionChanged); 02838 if (new_status != CNetScheduleAPI::eJobNotFound) 02839 m_StatisticsCounters.CountTransition( 02840 CNetScheduleAPI::eRunning, 02841 new_status, 02842 CStatisticsCounters::eNewSession); 02843 } catch (...) { 02844 ERR_POST("Error resetting a running job when worker node " 02845 "changed session. 
Job: " << DecorateJobId(*en)); 02846 } 02847 } 02848 } 02849 02850 02851 void CQueue::x_ResetReadingDueToNewSession(const CNSClientId & client, 02852 const TNSBitVector & jobs) 02853 { 02854 time_t current_time = time(0); 02855 for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) { 02856 try { 02857 TJobStatus new_status = x_ResetDueTo(client, *en, 02858 current_time, 02859 CNetScheduleAPI::eReading, 02860 CJobEvent::eSessionChanged); 02861 if (new_status != CNetScheduleAPI::eJobNotFound) 02862 m_StatisticsCounters.CountTransition( 02863 CNetScheduleAPI::eReading, 02864 new_status, 02865 CStatisticsCounters::eNewSession); 02866 } catch (...) { 02867 ERR_POST("Error resetting a reading job when worker node " 02868 "changed session. Job: " << DecorateJobId(*en)); 02869 } 02870 } 02871 } 02872 02873 02874 void CQueue::x_RegisterGetListener(const CNSClientId & client, 02875 unsigned short port, 02876 unsigned int timeout, 02877 const TNSBitVector & aff_ids, 02878 bool wnode_aff, 02879 bool any_aff, 02880 bool exclusive_new_affinity, 02881 bool new_format) 02882 { 02883 // Add to the notification list and save the wait port 02884 m_NotificationsList.RegisterListener(client, port, timeout, 02885 wnode_aff, any_aff, 02886 exclusive_new_affinity, new_format); 02887 if (client.IsComplete()) 02888 m_ClientsRegistry.SetWaiting(client, port, aff_ids, m_AffinityRegistry); 02889 return; 02890 } 02891 02892 02893 bool CQueue::x_UnregisterGetListener(const CNSClientId & client, 02894 unsigned short port) 02895 { 02896 unsigned short notif_port = port; 02897 02898 if (client.IsComplete()) 02899 // New clients have the port in their registry record 02900 notif_port = m_ClientsRegistry.ResetWaiting(client, m_AffinityRegistry); 02901 02902 if (notif_port > 0) { 02903 m_NotificationsList.UnregisterListener(client, notif_port); 02904 return true; 02905 } 02906 02907 return false; 02908 } 02909 02910 02911 void CQueue::PrintStatistics(size_t & aff_count) 02912 { 02913 size_t 
affinities = m_AffinityRegistry.size(); 02914 aff_count += affinities; 02915 02916 // The member is called only if there is a request context 02917 CDiagContext_Extra extra = GetDiagContext().Extra() 02918 .Print("queue", GetQueueName()) 02919 .Print("affinities", affinities) 02920 .Print("pending", CountStatus(CNetScheduleAPI::ePending)) 02921 .Print("running", CountStatus(CNetScheduleAPI::eRunning)) 02922 .Print("canceled", CountStatus(CNetScheduleAPI::eCanceled)) 02923 .Print("failed", CountStatus(CNetScheduleAPI::eFailed)) 02924 .Print("done", CountStatus(CNetScheduleAPI::eDone)) 02925 .Print("reading", CountStatus(CNetScheduleAPI::eReading)) 02926 .Print("confirmed", CountStatus(CNetScheduleAPI::eConfirmed)) 02927 .Print("readfailed", CountStatus(CNetScheduleAPI::eReadFailed)); 02928 m_StatisticsCounters.PrintTransitions(extra); 02929 extra.Flush(); 02930 } 02931 02932 02933 void CQueue::PrintTransitionCounters(CNetScheduleHandler & handler) 02934 { 02935 m_StatisticsCounters.PrintTransitions(handler); 02936 handler.WriteMessage("OK:garbage_jobs: " + 02937 NStr::IntToString(m_JobsToDelete.count())); 02938 handler.WriteMessage("OK:affinity_registry_size: " + 02939 NStr::SizetToString(m_AffinityRegistry.size())); 02940 handler.WriteMessage("OK:client_registry_size: " + 02941 NStr::SizetToString(m_ClientsRegistry.size())); 02942 return; 02943 } 02944 02945 02946 void CQueue::PrintJobsStat(CNetScheduleHandler & handler, 02947 const string & group_token, 02948 const string & aff_token) 02949 { 02950 size_t jobs_per_state[g_ValidJobStatusesSize]; 02951 TNSBitVector group_jobs; 02952 TNSBitVector aff_jobs; 02953 02954 {{ 02955 CFastMutexGuard guard(m_OperationLock); 02956 02957 if (!group_token.empty()) 02958 group_jobs = m_GroupRegistry.GetJobs(group_token); 02959 if (!aff_token.empty()) { 02960 unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token); 02961 if (aff_id == 0) 02962 NCBI_THROW(CNetScheduleException, eAffinityNotFound, 02963 "Unknown affinity 
token \"" + aff_token + "\""); 02964 02965 aff_jobs = m_AffinityRegistry.GetJobsWithAffinity(aff_id); 02966 } 02967 02968 for (size_t index(0); index < g_ValidJobStatusesSize; ++index) { 02969 TNSBitVector candidates = 02970 m_StatusTracker.GetJobs(g_ValidJobStatuses[index]); 02971 02972 if (!group_token.empty()) 02973 candidates &= group_jobs; 02974 if (!aff_token.empty()) 02975 candidates &= aff_jobs; 02976 02977 jobs_per_state[index] = candidates.count(); 02978 } 02979 }} 02980 02981 size_t total = 0; 02982 for (size_t index(0); index < g_ValidJobStatusesSize; ++index) { 02983 handler.WriteMessage("OK:" + CNetScheduleAPI::StatusToString(g_ValidJobStatuses[index]) + 02984 ": " + NStr::SizetToString(jobs_per_state[index])); 02985 total += jobs_per_state[index]; 02986 } 02987 handler.WriteMessage("OK:Total: " + 02988 NStr::SizetToString(total)); 02989 return; 02990 } 02991 02992 02993 unsigned int CQueue::CountActiveJobs(void) const 02994 { 02995 vector<CNetScheduleAPI::EJobStatus> statuses; 02996 02997 statuses.push_back(CNetScheduleAPI::ePending); 02998 statuses.push_back(CNetScheduleAPI::eRunning); 02999 return m_StatusTracker.CountStatus(statuses); 03000 } 03001 03002 03003 void CQueue::x_UpdateDB_PutResultNoLock(unsigned job_id, 03004 const string & auth_token, 03005 time_t curr, 03006 int ret_code, 03007 const string & output, 03008 CJob & job, 03009 const CNSClientId & client) 03010 { 03011 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 03012 NCBI_THROW(CNetScheduleException, eInternalError, 03013 "Error fetching job: " + DecorateJobId(job_id)); 03014 03015 if (!auth_token.empty()) { 03016 // Need to check authorization token first 03017 CJob::EAuthTokenCompareResult token_compare_result = 03018 job.CompareAuthToken(auth_token); 03019 if (token_compare_result == CJob::eInvalidTokenFormat) 03020 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 03021 "Invalid authorization token format"); 03022 if (token_compare_result == CJob::eNoMatch) 03023 
NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 03024 "Authorization token does not match"); 03025 if (token_compare_result == CJob::ePassportOnlyMatch) { 03026 // That means that the job has been executing by another worker 03027 // node at the moment, but we can accept the results anyway 03028 LOG_POST(Message << Warning << "Received PUT2 with only " 03029 "passport matched."); 03030 } 03031 // Here: the authorization token is OK, we can continue 03032 } 03033 03034 // Append the event 03035 CJobEvent * event = &job.AppendEvent(); 03036 event->SetStatus(CNetScheduleAPI::eDone); 03037 event->SetEvent(CJobEvent::eDone); 03038 event->SetTimestamp(curr); 03039 event->SetRetCode(ret_code); 03040 03041 event->SetClientNode(client.GetNode()); 03042 event->SetClientSession(client.GetSession()); 03043 event->SetNodeAddr(client.GetAddress()); 03044 03045 job.SetStatus(CNetScheduleAPI::eDone); 03046 job.SetOutput(output); 03047 job.SetLastTouch(curr); 03048 03049 job.Flush(this); 03050 return; 03051 } 03052 03053 03054 // If the job.job_id != 0 => the job has been read successfully 03055 // Exception => DB errors 03056 // Return: true => job is ok to get 03057 // false => job is expired and was marked for deletion, 03058 // need to get another job 03059 bool CQueue::x_UpdateDB_GetJobNoLock(const CNSClientId & client, 03060 time_t curr, 03061 unsigned int job_id, 03062 CJob & job) 03063 { 03064 CNSTransaction transaction(this); 03065 03066 if (job.Fetch(this, job_id) != CJob::eJF_Ok) 03067 NCBI_THROW(CNetScheduleException, eInternalError, 03068 "Cannot read job info from DB"); 03069 03070 // The job has been read successfully, check if it is still within the 03071 // expiration timeout; note: run_timeout is not applicable because the job 03072 // is guaranteed in the pending state 03073 if (job.GetExpirationTime(m_Timeout, 0) <= curr) { 03074 // The job has expired, so mark it for deletion 03075 EraseJob(job_id); 03076 03077 if (job.GetAffinityId() != 0) 03078 
m_AffinityRegistry.RemoveJobFromAffinity( 03079 job_id, job.GetAffinityId()); 03080 if (job.GetGroupId() != 0) 03081 m_GroupRegistry.RemoveJob(job.GetGroupId(), job_id); 03082 03083 return false; 03084 } 03085 03086 unsigned run_count = job.GetRunCount() + 1; 03087 CJobEvent & event = job.AppendEvent(); 03088 03089 event.SetStatus(CNetScheduleAPI::eRunning); 03090 event.SetEvent(CJobEvent::eRequest); 03091 event.SetTimestamp(curr); 03092 03093 event.SetClientNode(client.GetNode()); 03094 event.SetClientSession(client.GetSession()); 03095 event.SetNodeAddr(client.GetAddress()); 03096 03097 job.SetStatus(CNetScheduleAPI::eRunning); 03098 job.SetRunTimeout(0); 03099 job.SetRunCount(run_count); 03100 job.SetLastTouch(curr); 03101 03102 job.Flush(this); 03103 transaction.Commit(); 03104 03105 return true; 03106 } 03107 03108 03109 END_NCBI_SCOPE 03110
1.7.5.1
Modified on Wed May 23 12:58:46 2012 by modify_doxy.py rev. 337098