NCBI C++ ToolKit
wn_main_loop.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

00001 /*  $Id: wn_main_loop.cpp 64298 2014-09-02 20:23:37Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *   Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
00027  *
00028  * File Description:
00029  *    NetSchedule Worker Node implementation
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include "wn_commit_thread.hpp"
00035 #include "wn_cleanup.hpp"
00036 #include "grid_worker_impl.hpp"
00037 #include "netschedule_api_impl.hpp"
00038 
00039 #include <connect/services/grid_globals.hpp>
00040 #include <connect/services/grid_worker_app.hpp>
00041 #include <connect/services/grid_rw_impl.hpp>
00042 #include <connect/services/ns_job_serializer.hpp>
00043 
00044 #include <corelib/rwstream.hpp>
00045 
00046 
00047 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
00048 
00049 BEGIN_NCBI_SCOPE
00050 
00051 /////////////////////////////////////////////////////////////////////////////
00052 //
00053 //     CWorkerNodeJobContext     --
00054 
00055 
00056 const CNetScheduleJob& CWorkerNodeJobContext::GetJob() const
00057 {
00058     return m_Impl->m_Job;
00059 }
00060 
00061 CNetScheduleJob& CWorkerNodeJobContext::GetJob()
00062 {
00063     return m_Impl->m_Job;
00064 }
00065 
00066 const string& CWorkerNodeJobContext::GetJobKey() const
00067 {
00068     return m_Impl->m_Job.job_id;
00069 }
00070 
00071 const string& CWorkerNodeJobContext::GetJobInput() const
00072 {
00073     return m_Impl->m_Job.input;
00074 }
00075 
00076 void CWorkerNodeJobContext::SetJobOutput(const string& output)
00077 {
00078     m_Impl->m_Job.output = output;
00079 }
00080 
00081 void CWorkerNodeJobContext::SetJobRetCode(int ret_code)
00082 {
00083     m_Impl->m_Job.ret_code = ret_code;
00084 }
00085 
00086 size_t CWorkerNodeJobContext::GetInputBlobSize() const
00087 {
00088     return m_Impl->m_InputBlobSize;
00089 }
00090 
00091 const string& CWorkerNodeJobContext::GetJobOutput() const
00092 {
00093     return m_Impl->m_Job.output;
00094 }
00095 
00096 CNetScheduleAPI::TJobMask CWorkerNodeJobContext::GetJobMask() const
00097 {
00098     return m_Impl->m_Job.mask;
00099 }
00100 
00101 unsigned int CWorkerNodeJobContext::GetJobNumber() const
00102 {
00103     return m_Impl->m_JobNumber;
00104 }
00105 
00106 bool CWorkerNodeJobContext::IsJobCommitted() const
00107 {
00108     return m_Impl->m_JobCommitted != eNotCommitted;
00109 }
00110 
00111 CWorkerNodeJobContext::ECommitStatus
00112         CWorkerNodeJobContext::GetCommitStatus() const
00113 {
00114     return m_Impl->m_JobCommitted;
00115 }
00116 
00117 bool CWorkerNodeJobContext::IsCanceled() const
00118 {
00119     return m_Impl->m_JobCommitted == eCanceled;
00120 }
00121 
00122 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
00123 {
00124     return m_Impl->m_CleanupEventSource;
00125 }
00126 
00127 CGridWorkerNode CWorkerNodeJobContext::GetWorkerNode() const
00128 {
00129     return m_Impl->m_WorkerNode;
00130 }
00131 
00132 SWorkerNodeJobContextImpl::SWorkerNodeJobContextImpl(
00133         SGridWorkerNodeImpl* worker_node) :
00134     m_WorkerNode(worker_node),
00135     m_CleanupEventSource(
00136             new CWorkerNodeJobCleanup(worker_node->m_CleanupEventSource)),
00137     m_RequestContext(new CRequestContext),
00138     m_StatusThrottler(1, CTimeSpan(worker_node->m_CheckStatusPeriod, 0)),
00139     m_ProgressMsgThrottler(1),
00140     m_NetScheduleExecutor(worker_node->m_NSExecutor),
00141     m_NetCacheAPI(worker_node->m_NetCacheAPI),
00142     m_JobGeneration(worker_node->m_CurrentJobGeneration),
00143     m_CommitExpiration(0, 0)
00144 {
00145 }
00146 
00147 const string& CWorkerNodeJobContext::GetQueueName() const
00148 {
00149     return m_Impl->m_WorkerNode->GetQueueName();
00150 }
00151 const string& CWorkerNodeJobContext::GetClientName() const
00152 {
00153     return m_Impl->m_WorkerNode->GetClientName();
00154 }
00155 
00156 CNcbiIstream& CWorkerNodeJobContext::GetIStream()
00157 {
00158     IReader* reader = new CStringOrBlobStorageReader(GetJobInput(),
00159             m_Impl->m_NetCacheAPI, &m_Impl->m_InputBlobSize);
00160     m_Impl->m_RStream.reset(new CRStream(reader, 0, 0,
00161             CRWStreambuf::fOwnReader | CRWStreambuf::fLeakExceptions));
00162     m_Impl->m_RStream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
00163     return *m_Impl->m_RStream;
00164 }
00165 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
00166 {
00167     m_Impl->m_Writer.reset(new CStringOrBlobStorageWriter(
00168             m_Impl->m_WorkerNode->m_QueueEmbeddedOutputSize,
00169                     m_Impl->m_NetCacheAPI, GetJob().output));
00170     m_Impl->m_WStream.reset(new CWStream(m_Impl->m_Writer.get(), 0, 0,
00171             CRWStreambuf::fLeakExceptions));
00172     m_Impl->m_WStream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
00173     return *m_Impl->m_WStream;
00174 }
00175 
00176 void CWorkerNodeJobContext::CloseStreams()
00177 {
00178     try {
00179         m_Impl->m_ProgressMsgThrottler.Reset(1);
00180         m_Impl->m_StatusThrottler.Reset(1,
00181                 CTimeSpan(m_Impl->m_WorkerNode->m_CheckStatusPeriod, 0));
00182 
00183         m_Impl->m_RStream.reset();
00184         m_Impl->m_WStream.reset();
00185 
00186         if (m_Impl->m_Writer.get() != NULL) {
00187             m_Impl->m_Writer->Close();
00188             m_Impl->m_Writer.reset();
00189         }
00190     }
00191     NCBI_CATCH_ALL_X(61, "Could not close IO streams");
00192 }
00193 
00194 void CWorkerNodeJobContext::CommitJob()
00195 {
00196     m_Impl->CheckIfCanceled();
00197     m_Impl->m_JobCommitted = eDone;
00198 }
00199 
00200 void CWorkerNodeJobContext::CommitJobWithFailure(const string& err_msg)
00201 {
00202     m_Impl->CheckIfCanceled();
00203     m_Impl->m_JobCommitted = eFailure;
00204     m_Impl->m_Job.error_msg = err_msg;
00205 }
00206 
00207 void CWorkerNodeJobContext::ReturnJob()
00208 {
00209     m_Impl->CheckIfCanceled();
00210     m_Impl->m_JobCommitted = eReturn;
00211 }
00212 
00213 void CWorkerNodeJobContext::RescheduleJob(
00214         const string& affinity, const string& group)
00215 {
00216     m_Impl->CheckIfCanceled();
00217     m_Impl->m_JobCommitted = eRescheduled;
00218     m_Impl->m_Job.affinity = affinity;
00219     m_Impl->m_Job.group = group;
00220 }
00221 
00222 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
00223                                                bool send_immediately)
00224 {
00225     m_Impl->CheckIfCanceled();
00226     if (!send_immediately &&
00227             !m_Impl->m_ProgressMsgThrottler.Approve(
00228                     CRequestRateControl::eErrCode)) {
00229         LOG_POST(Warning << "Progress message \"" <<
00230                 msg << "\" has been suppressed.");
00231         return;
00232     }
00233 
00234     if (m_Impl->m_WorkerNode->m_ProgressLogRequested) {
00235         LOG_POST(GetJobKey() << " progress: " <<
00236                 NStr::TruncateSpaces(msg, NStr::eTrunc_End));
00237     }
00238 
00239     try {
00240         if (m_Impl->m_Job.progress_msg.empty() ) {
00241             m_Impl->m_NetScheduleExecutor.GetProgressMsg(m_Impl->m_Job);
00242         }
00243         if (!m_Impl->m_Job.progress_msg.empty())
00244             m_Impl->m_NetCacheAPI.PutData(m_Impl->m_Job.progress_msg,
00245                     msg.data(), msg.length());
00246         else {
00247             m_Impl->m_Job.progress_msg =
00248                     m_Impl->m_NetCacheAPI.PutData(msg.data(), msg.length());
00249 
00250             m_Impl->m_NetScheduleExecutor.PutProgressMsg(m_Impl->m_Job);
00251         }
00252     }
00253     catch (exception& ex) {
00254         ERR_POST_X(6, "Couldn't send a progress message: " << ex.what());
00255     }
00256 }
00257 
00258 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
00259 {
00260     try {
00261         m_Impl->m_NetScheduleExecutor.JobDelayExpiration(GetJobKey(),
00262                 runtime_inc);
00263     }
00264     catch (exception& ex) {
00265         ERR_POST_X(8, "CWorkerNodeJobContext::JobDelayExpiration: " <<
00266                 ex.what());
00267     }
00268 }
00269 
00270 bool CWorkerNodeJobContext::IsLogRequested() const
00271 {
00272     return m_Impl->m_WorkerNode->m_LogRequested;
00273 }
00274 
00275 CNetScheduleAdmin::EShutdownLevel CWorkerNodeJobContext::GetShutdownLevel()
00276 {
00277     if (m_Impl->m_StatusThrottler.Approve(CRequestRateControl::eErrCode))
00278         try {
00279             ENetScheduleQueuePauseMode pause_mode;
00280             switch (m_Impl->m_NetScheduleExecutor.GetJobStatus(GetJobKey(),
00281                     NULL, &pause_mode)) {
00282             case CNetScheduleAPI::eRunning:
00283                 if (pause_mode == eNSQ_WithPullback) {
00284                     m_Impl->m_WorkerNode->SetJobPullbackTimer(
00285                             m_Impl->m_WorkerNode->m_DefaultPullbackTimeout);
00286                     LOG_POST("Pullback request from the server, "
00287                             "(default) pullback timeout=" <<
00288                             m_Impl->m_WorkerNode->m_DefaultPullbackTimeout);
00289                 }
00290                 break;
00291 
00292             case CNetScheduleAPI::eCanceled:
00293                 m_Impl->x_SetCanceled();
00294                 /* FALL THROUGH */
00295 
00296             default:
00297                 return CNetScheduleAdmin::eShutdownImmediate;
00298             }
00299         }
00300         catch(exception& ex) {
00301             ERR_POST("Cannot retrieve job status for " << GetJobKey() <<
00302                     ": " << ex.what());
00303         }
00304 
00305     if (m_Impl->m_WorkerNode->CheckForPullback(m_Impl->m_JobGeneration)) {
00306         LOG_POST("Pullback timeout for " << m_Impl->m_Job.job_id);
00307         return CNetScheduleAdmin::eShutdownImmediate;
00308     }
00309 
00310     return CGridGlobals::GetInstance().GetShutdownLevel();
00311 }
00312 
00313 void SWorkerNodeJobContextImpl::CheckIfCanceled()
00314 {
00315     if (m_JobCommitted == CWorkerNodeJobContext::eCanceled) {
00316         NCBI_THROW_FMT(CGridWorkerNodeException, eJobIsCanceled,
00317             "Job " << m_Job.job_id << " has been canceled");
00318     }
00319 }
00320 
00321 void SWorkerNodeJobContextImpl::Reset()
00322 {
00323     m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
00324 
00325     m_JobCommitted = CWorkerNodeJobContext::eNotCommitted;
00326     m_InputBlobSize = 0;
00327     m_ExclusiveJob =
00328             (m_Job.mask & CNetScheduleAPI::eExclusiveJob) != 0;
00329 
00330     m_RequestContext->Reset();
00331 }
00332 
00333 void CWorkerNodeJobContext::RequestExclusiveMode()
00334 {
00335     if (!m_Impl->m_ExclusiveJob) {
00336         if (!m_Impl->m_WorkerNode->EnterExclusiveMode()) {
00337             NCBI_THROW(CGridWorkerNodeException,
00338                 eExclusiveModeIsAlreadySet, "");
00339         }
00340         m_Impl->m_ExclusiveJob = true;
00341     }
00342 }
00343 
00344 const char* CWorkerNodeJobContext::GetCommitStatusDescription(
00345         CWorkerNodeJobContext::ECommitStatus commit_status)
00346 {
00347     switch (commit_status) {
00348     case eDone:
00349         return "done";
00350     case eFailure:
00351         return "failed";
00352     case eReturn:
00353         return "returned";
00354     case eRescheduled:
00355         return "rescheduled";
00356     case eCanceled:
00357         return "canceled";
00358     default:
00359         return "not committed";
00360     }
00361 }
00362 
00363 void SWorkerNodeJobContextImpl::x_PrintRequestStop()
00364 {
00365     m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
00366 
00367     if (!m_RequestContext->IsSetRequestStatus())
00368         m_RequestContext->SetRequestStatus(
00369             m_JobCommitted == CWorkerNodeJobContext::eDone &&
00370                 m_Job.ret_code == 0 ? 200 : 500);
00371 
00372     if (m_RequestContext->GetAppState() == eDiagAppState_Request)
00373         m_RequestContext->SetAppState(eDiagAppState_RequestEnd);
00374 
00375     if (g_IsRequestStopEventEnabled())
00376         GetDiagContext().PrintRequestStop();
00377 }
00378 
00379 void SWorkerNodeJobContextImpl::x_RunJob()
00380 {
00381     CWorkerNodeJobContext this_job_context(this);
00382 
00383     m_RequestContext->SetRequestID((int) this_job_context.GetJobNumber());
00384 
00385     if (!m_Job.client_ip.empty())
00386         m_RequestContext->SetClientIP(m_Job.client_ip);
00387 
00388     if (!m_Job.session_id.empty())
00389         m_RequestContext->SetSessionID(m_Job.session_id);
00390 
00391     m_RequestContext->SetAppState(eDiagAppState_RequestBegin);
00392 
00393     CRequestContextSwitcher request_state_guard(m_RequestContext);
00394 
00395     if (g_IsRequestStartEventEnabled())
00396         GetDiagContext().PrintRequestStart().Print("jid", m_Job.job_id);
00397 
00398     m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00399             IWorkerNodeJobWatcher::eJobStarted);
00400 
00401     m_RequestContext->SetAppState(eDiagAppState_Request);
00402 
00403     try {
00404         this_job_context.SetJobRetCode(
00405                 m_WorkerNode->GetJobProcessor()->Do(this_job_context));
00406     }
00407     catch (CGridWorkerNodeException& ex) {
00408         switch (ex.GetErrCode()) {
00409         case CGridWorkerNodeException::eJobIsCanceled:
00410             x_SetCanceled();
00411             break;
00412 
00413         case CGridWorkerNodeException::eExclusiveModeIsAlreadySet:
00414             if (this_job_context.IsLogRequested()) {
00415                 LOG_POST_X(21, "Job " << m_Job.job_id <<
00416                     " will be returned back to the queue "
00417                     "because it requested exclusive mode while "
00418                     "another job already has the exclusive status.");
00419             }
00420             if (!this_job_context.IsJobCommitted())
00421                 this_job_context.ReturnJob();
00422             break;
00423 
00424         default:
00425             ERR_POST_X(62, ex);
00426             if (!this_job_context.IsJobCommitted())
00427                 this_job_context.ReturnJob();
00428         }
00429     }
00430     catch (CNetScheduleException& e) {
00431         ERR_POST_X(20, "job" << m_Job.job_id << " failed: " << e);
00432         if (e.GetErrCode() == CNetScheduleException::eJobNotFound)
00433             x_SetCanceled();
00434         else if (!this_job_context.IsJobCommitted())
00435             this_job_context.CommitJobWithFailure(e.what());
00436     }
00437     catch (exception& e) {
00438         ERR_POST_X(18, "job" << m_Job.job_id << " failed: " << e.what());
00439         if (!this_job_context.IsJobCommitted())
00440             this_job_context.CommitJobWithFailure(e.what());
00441     }
00442 
00443     this_job_context.CloseStreams();
00444 
00445     if (m_WorkerNode->IsExclusiveMode() && m_ExclusiveJob)
00446         m_WorkerNode->LeaveExclusiveMode();
00447 
00448     switch (this_job_context.GetCommitStatus()) {
00449     case CWorkerNodeJobContext::eDone:
00450         m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00451                 IWorkerNodeJobWatcher::eJobSucceeded);
00452         break;
00453 
00454     case CWorkerNodeJobContext::eNotCommitted:
00455         if (TWorkerNode_AllowImplicitJobReturn::GetDefault() ||
00456                 this_job_context.GetShutdownLevel() !=
00457                         CNetScheduleAdmin::eNoShutdown) {
00458             this_job_context.ReturnJob();
00459             m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00460                     IWorkerNodeJobWatcher::eJobReturned);
00461             break;
00462         }
00463 
00464         this_job_context.CommitJobWithFailure(
00465                 "Job was not explicitly committed");
00466         /* FALL THROUGH */
00467 
00468     case CWorkerNodeJobContext::eFailure:
00469         m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00470                 IWorkerNodeJobWatcher::eJobFailed);
00471         break;
00472 
00473     case CWorkerNodeJobContext::eReturn:
00474         m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00475                 IWorkerNodeJobWatcher::eJobReturned);
00476         break;
00477 
00478     case CWorkerNodeJobContext::eRescheduled:
00479         m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00480                 IWorkerNodeJobWatcher::eJobRescheduled);
00481         break;
00482 
00483     default: // eCanceled - will be processed in x_SendJobResults().
00484         break;
00485     }
00486 
00487     m_WorkerNode->x_NotifyJobWatchers(this_job_context,
00488             IWorkerNodeJobWatcher::eJobStopped);
00489 
00490     if (!CGridGlobals::GetInstance().IsShuttingDown())
00491         static_cast<CWorkerNodeJobCleanup*>(
00492                 this_job_context.GetCleanupEventSource())->CallEventHandlers();
00493 
00494     m_WorkerNode->m_JobCommitterThread->RecycleJobContextAndCommitJob(this);
00495 }
00496 
      // Job-fetching loop. Until shutdown is requested, it waits for room in
      // the thread pool, fetches the next job into a job context and submits
      // that context to the pool as a CWorkerNodeRequest. Network errors are
      // retried or escalated to an immediate shutdown (see the handlers
      // below). Always returns NULL.
00497 void* CMainLoopThread::Main()
00498 {
      // Deadline for retrying connection failures; re-armed after every
      // iteration that completes without an exception.
00499     CDeadline max_wait_for_servers(
00500             TWorkerNode_MaxWaitForServers::GetDefault());
00501 
00502     CWorkerNodeJobContext job_context(
00503             m_WorkerNode->m_JobCommitterThread->AllocJobContext());
00504 
      // Consecutive CNetServiceException retries; reset to zero whenever an
      // iteration ends without `continue`.
00505     unsigned try_count = 0;
00506     while (!CGridGlobals::GetInstance().IsShuttingDown()) {
00507         try {
00508             try {
00509                 m_WorkerNode->m_ThreadPool->WaitForRoom(
00510                         m_WorkerNode->m_ThreadPoolTimeout);
00511             }
00512             catch (CBlockingQueueException&) {
00513                 // threaded pool is busy
00514                 continue;
00515             }
00516 
00517             if (x_GetNextJob(job_context->m_Job)) {
00518                 job_context->Reset();
00519 
00520                 try {
00521                     m_WorkerNode->m_ThreadPool->AcceptRequest(CRef<CStdRequest>(
00522                             new CWorkerNodeRequest(job_context)));
00523                 }
00524                 catch (CBlockingQueueException& ex) {
                          // The pool rejected the request: mark the job as
                          // returned and pass the context to the committer.
                          // NOTE(review): if x_GetNextJob() entered exclusive
                          // mode for this job, nothing on this path leaves it
                          // (x_RunJob() normally does).
00525                     ERR_POST_X(28, ex);
00526                     // that must not happen after CBlockingQueue is fixed
00527                     _ASSERT(0);
00528                     job_context->m_JobCommitted =
00529                             CWorkerNodeJobContext::eReturn;
00530                     m_WorkerNode->m_JobCommitterThread->
00531                             RecycleJobContextAndCommitJob(job_context);
00532                 }
                  // The previous context now belongs to the submitted request
                  // (or was recycled above); take a fresh one for the next job.
00533                 job_context =
00534                         m_WorkerNode->m_JobCommitterThread->AllocJobContext();
00535             }
00536             max_wait_for_servers =
00537                 CDeadline(TWorkerNode_MaxWaitForServers::GetDefault());
00538         }
00539         catch (CNetSrvConnException& e) {
              // Connection failures are retried until max_wait_for_servers
              // expires. Any other connection error, or an expired deadline,
              // triggers an immediate shutdown.
00540             SleepMilliSec(s_GetRetryDelay());
00541             if (e.GetErrCode() == CNetSrvConnException::eConnectionFailure &&
00542                     !max_wait_for_servers.GetRemainingTime().IsZero())
00543                 continue;
00544             ERR_POST(Critical << "Could not connect to the "
00545                     "configured servers, exiting...");
00546             CGridGlobals::GetInstance().RequestShutdown(
00547                     CNetScheduleAdmin::eShutdownImmediate);
00548         }
00549         catch (CNetServiceException& ex) {
              // Other service errors are retried until try_count reaches
              // TServConn_ConnMaxRetries, then the node shuts down.
00550             ERR_POST_X(40, ex);
00551             if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
00552                 CGridGlobals::GetInstance().RequestShutdown(
00553                     CNetScheduleAdmin::eShutdownImmediate);
00554             } else {
00555                 SleepMilliSec(s_GetRetryDelay());
00556                 continue;
00557             }
00558         }
00559         catch (exception& ex) {
              // Anything else is logged; the node shuts down only when
              // configured to stop on job errors.
00560             ERR_POST_X(29, ex.what());
00561             if (TWorkerNode_StopOnJobErrors::GetDefault()) {
00562                 CGridGlobals::GetInstance().RequestShutdown(
00563                     CNetScheduleAdmin::eShutdownImmediate);
00564             }
00565         }
00566         try_count = 0;
00567     }
00568 
      // NOTE(review): the job context allocated last is not recycled here.
00569     return NULL;
00570 }
00571 
00572 CMainLoopThread::~CMainLoopThread()
00573 {
00574     ITERATE(TTimelineEntries, it, m_TimelineEntryByAddress) {
00575         (*it)->RemoveReference();
00576     }
00577 }
00578 
00579 bool CMainLoopThread::x_GetJobWithAffinityList(SNetServerImpl* server,
00580         const CDeadline* timeout, CNetScheduleJob& job,
00581         CNetScheduleExecutor::EJobAffinityPreference affinity_preference,
00582         const string& affinity_list)
00583 {
00584     string cmd(m_WorkerNode->m_NSExecutor->
00585             m_NotificationHandler.CmdAppendTimeoutAndClientInfo(
00586                     CNetScheduleNotificationHandler::MkBaseGETCmd(
00587                             affinity_preference, affinity_list), timeout));
00588 
00589     return m_WorkerNode->m_NSExecutor->ExecGET(server, cmd, job);
00590 }
00591 
00592 bool CMainLoopThread::x_GetJobWithAffinityLadder(SNetServerImpl* server,
00593         const CDeadline* timeout, CNetScheduleJob& job)
00594 {
00595     if (m_WorkerNode->m_NSExecutor->m_API->m_AffinityLadder.empty())
00596         return x_GetJobWithAffinityList(server, timeout, job,
00597                 m_WorkerNode->m_NSExecutor->m_AffinityPreference, kEmptyStr);
00598 
00599     list<string>::const_iterator it =
00600             m_WorkerNode->m_NSExecutor->m_API->m_AffinityLadder.begin();
00601 
00602     for (;;) {
00603         string affinity_list = *it;
00604         if (++it == m_WorkerNode->m_NSExecutor->m_API->m_AffinityLadder.end())
00605             return x_GetJobWithAffinityList(server, timeout, job,
00606                     m_WorkerNode->m_NSExecutor->m_AffinityPreference,
00607                             affinity_list);
00608         else if (x_GetJobWithAffinityList(server, NULL, job,
00609                 CNetScheduleExecutor::ePreferredAffinities, affinity_list))
00610             return true;
00611     }
00612 }
00613 
00614 SNotificationTimelineEntry*
00615     CMainLoopThread::x_GetTimelineEntry(SNetServerImpl* server_impl)
00616 {
00617     SNotificationTimelineEntry search_pattern(
00618             server_impl->m_ServerInPool->m_Address, 0);
00619 
00620     TTimelineEntries::iterator it(
00621             m_TimelineEntryByAddress.find(&search_pattern));
00622 
00623     if (it != m_TimelineEntryByAddress.end())
00624         return *it;
00625 
00626     SNotificationTimelineEntry* new_entry = new SNotificationTimelineEntry(
00627             search_pattern.m_ServerAddress, m_DiscoveryIteration);
00628 
00629     m_TimelineEntryByAddress.insert(new_entry);
00630     new_entry->AddReference();
00631 
00632     return new_entry;
00633 }
00634 
      // Takes the next entry from `timeline` (via Shift()) and performs its
      // action. Returns true only when a job was obtained into `job`.
      // - Discovery action: unless suspended, bumps m_DiscoveryIteration and
      //   registers every service server (penalized ones included). Servers
      //   not currently in a timeline are queued for immediate polling. The
      //   discovery action is then rescheduled into m_Timeline after
      //   m_NSTimeout.
      // - Server entry: asks the server for a job via the affinity ladder.
      //   Entries are dropped while suspended or when stale (not seen in the
      //   latest discovery iteration).
00635 bool CMainLoopThread::x_PerformTimelineAction(
00636         CMainLoopThread::TNotificationTimeline& timeline, CNetScheduleJob& job)
00637 {
00638     SNotificationTimelineEntry::TRef timeline_entry;
00639 
00640     timeline.Shift(timeline_entry);
00641 
00642     if (timeline_entry->IsDiscoveryAction()) {
00643         if (!x_EnterSuspendedState()) {
00644             ++m_DiscoveryIteration;
00645             for (CNetServiceIterator it =
00646                     m_WorkerNode->m_NetScheduleAPI.GetService().Iterate(
00647                             CNetService::eIncludePenalized); it; ++it) {
00648                 SNotificationTimelineEntry* srv_entry = x_GetTimelineEntry(*it);
                      // Mark the server as present in this discovery round.
00649                 srv_entry->m_DiscoveryIteration = m_DiscoveryIteration;
00650                 if (!srv_entry->IsInTimeline())
00651                     m_ImmediateActions.Push(srv_entry);
00652             }
00653         }
00654 
00655         timeline_entry->ResetTimeout(m_WorkerNode->m_NSTimeout);
00656         m_Timeline.Push(timeline_entry);
00657         return false;
00658     }
00659 
00660     if (x_EnterSuspendedState() ||
00661             // Skip servers that disappeared from LBSM.
00662             timeline_entry->m_DiscoveryIteration != m_DiscoveryIteration)
00663         return false;
00664 
00665     CNetServer server(m_WorkerNode->m_NetScheduleAPI->m_Service.GetServer(
00666             timeline_entry->m_ServerAddress));
00667 
      // The refreshed timeout doubles as the GET wait deadline below.
00668     timeline_entry->ResetTimeout(m_WorkerNode->m_NSTimeout);
00669 
00670     try {
00671         if (x_GetJobWithAffinityLadder(server,
00672                 &timeline_entry->GetTimeout(), job)) {
00673             // A job has been returned; add the server to
00674             // m_ImmediateActions because there can be more
00675             // jobs in the queue.
00676             m_ImmediateActions.Push(timeline_entry);
00677             return true;
00678         } else {
00679             // No job has been returned by this server;
00680             // query the server later.
00681             m_Timeline.Push(timeline_entry);
00682             return false;
00683         }
00684     }
00685     catch (CNetSrvConnException& e) {
00686         // Because a connection error has occurred, do not
00687         // put this server back to the timeline.
00688         LOG_POST(Warning << e.GetMsg());
00689         return false;
00690     }
00691 }
00692 
00693 bool CMainLoopThread::x_EnterSuspendedState()
00694 {
00695     if (CGridGlobals::GetInstance().IsShuttingDown())
00696         return true;
00697 
00698     void* event;
00699 
00700     while ((event = SwapPointers(&m_WorkerNode->m_SuspendResumeEvent,
00701             NO_EVENT)) != NO_EVENT) {
00702         if (event == SUSPEND_EVENT) {
00703             if (!m_WorkerNode->m_TimelineIsSuspended) {
00704                 // Stop the timeline.
00705                 m_WorkerNode->m_TimelineIsSuspended = true;
00706                 m_ImmediateActions.Clear();
00707                 m_Timeline.Clear();
00708                 m_DiscoveryAction->ResetTimeout(m_WorkerNode->m_NSTimeout);
00709                 m_Timeline.Push(m_DiscoveryAction);
00710             }
00711         } else { /* event == RESUME_EVENT */
00712             if (m_WorkerNode->m_TimelineIsSuspended) {
00713                 // Resume the timeline.
00714                 m_WorkerNode->m_TimelineIsSuspended = false;
00715                 m_DiscoveryAction->MoveTo(&m_ImmediateActions);
00716             }
00717         }
00718     }
00719 
00720     return m_WorkerNode->m_TimelineIsSuspended;
00721 }
00722 
00723 void CMainLoopThread::x_ProcessRequestJobNotification()
00724 {
00725     if (!x_EnterSuspendedState()) {
00726         CNetServer server;
00727 
00728         if (m_WorkerNode->m_NSExecutor->
00729                 m_NotificationHandler.CheckRequestJobNotification(
00730                         m_WorkerNode->m_NSExecutor, &server))
00731             x_GetTimelineEntry(server)->MoveTo(&m_ImmediateActions);
00732     }
00733 }
00734 
      // Blocks until a job is obtained into `job` (returns true) or shutdown
      // is requested (returns false).
      // Immediate actions are drained first. After each one, timeline
      // entries that have come due are promoted and UDP notifications
      // already received are processed. When nothing is immediate, it waits
      // for a notification until the earliest timeline entry is due; if
      // none arrives in time, it performs that entry's action.
      // NOTE(review): if both queues are empty this loop spins without
      // blocking; presumably the discovery action keeps m_Timeline
      // non-empty - confirm.
00735 bool CMainLoopThread::x_WaitForNewJob(CNetScheduleJob& job)
00736 {
00737     for (;;) {
00738         while (!m_ImmediateActions.IsEmpty()) {
00739             if (x_PerformTimelineAction(m_ImmediateActions, job))
00740                 return true;
00741 
              // Promote timeline entries whose time has come.
00742             while (!m_Timeline.IsEmpty() && m_Timeline.GetHead()->TimeHasCome())
00743                 m_Timeline.MoveHeadTo(&m_ImmediateActions);
00744 
00745             // Check if there's a notification in the UDP socket.
00746             while (m_WorkerNode->m_NSExecutor->
00747                     m_NotificationHandler.ReceiveNotification())
00748                 x_ProcessRequestJobNotification();
00749         }
00750 
00751         if (CGridGlobals::GetInstance().IsShuttingDown())
00752             return false;
00753 
          // Sleep on the notification socket until the earliest entry is due.
00754         if (!m_Timeline.IsEmpty()) {
00755             if (m_WorkerNode->m_NSExecutor->
00756                     m_NotificationHandler.WaitForNotification(
00757                             m_Timeline.GetHead()->GetTimeout()))
00758                 x_ProcessRequestJobNotification();
00759             else if (x_PerformTimelineAction(m_Timeline, job))
00760                 return true;
00761         }
00762     }
00763 }
00764 
00765 bool CMainLoopThread::x_GetNextJob(CNetScheduleJob& job)
00766 {
00767     if (!m_WorkerNode->x_AreMastersBusy()) {
00768         SleepSec(m_WorkerNode->m_NSTimeout);
00769         return false;
00770     }
00771 
00772     if (!m_WorkerNode->WaitForExclusiveJobToFinish())
00773         return false;
00774 
00775     bool job_exists = x_WaitForNewJob(job);
00776 
00777     if (job_exists && job.mask & CNetScheduleAPI::eExclusiveJob) {
00778         if (!m_WorkerNode->EnterExclusiveMode()) {
00779             x_ReturnJob(job);
00780             job_exists = false;
00781         }
00782     }
00783     if (job_exists && x_EnterSuspendedState()) {
00784         x_ReturnJob(job);
00785         return false;
00786     }
00787     return job_exists;
00788 }
00789 
00790 void CMainLoopThread::x_ReturnJob(const CNetScheduleJob& job)
00791 {
00792     m_WorkerNode->m_NSExecutor.ReturnJob(job.job_id, job.auth_token);
00793 }
00794 
00795 size_t CGridWorkerNode::GetServerOutputSize()
00796 {
00797     return m_Impl->m_QueueEmbeddedOutputSize;
00798 }
00799 
00800 END_NCBI_SCOPE
Modified on Mon Sep 15 17:17:29 2014 by modify_doxy.py rev. 426318