NCBI C++ ToolKit
grid_worker.cpp
Go to the documentation of this file.
00001 /*  $Id: grid_worker.cpp 54418 2012-05-11 22:04:54Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *   Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
00027  *
00028  * File Description:
00029  *    NetSchedule Worker Node implementation
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include "grid_thread_context.hpp"
00035 #include "grid_debug_context.hpp"
00036 #include "netschedule_api_impl.hpp"
00037 
00038 #include <connect/services/grid_globals.hpp>
00039 #include <connect/services/error_codes.hpp>
00040 #include <connect/services/netschedule_api_expt.hpp>
00041 #include <connect/services/grid_worker_app.hpp>
00042 #include <connect/services/grid_control_thread.hpp>
00043 
00044 #include <connect/ncbi_socket.hpp>
00045 
00046 #include <util/thread_pool.hpp>
00047 
00048 #include <corelib/ncbireg.hpp>
00049 #include <corelib/ncbithr.hpp>
00050 #include <corelib/ncbitime.hpp>
00051 #include <corelib/ncbi_process.hpp>
00052 #include <corelib/ncbiexpt.hpp>
00053 #include <corelib/ncbi_system.hpp>
00054 #include <corelib/ncbi_safe_static.hpp>
00055 #include <corelib/request_ctx.hpp>
00056 
00057 #ifdef NCBI_OS_UNIX
00058 #include <unistd.h>
00059 #endif
00060 
00061 
00062 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
00063 
00064 
00065 BEGIN_NCBI_SCOPE
00066 
00067 /////////////////////////////////////////////////////////////////////////////
00068 //
00069 IWorkerNodeJobWatcher::~IWorkerNodeJobWatcher()
00070 {
00071 }
00072 
00073 /////////////////////////////////////////////////////////////////////////////
00074 //
00075 //     CWorkerNodeJobContext     --
00076 
00077 namespace {
00078 
00079 class CWorkerNodeCleanup : public IWorkerNodeCleanupEventSource
00080 {
00081 public:
00082     typedef set<IWorkerNodeCleanupEventListener*> TListeners;
00083 
00084     virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00085     virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00086 
00087     virtual void CallEventHandlers();
00088 
00089     void RemoveListeners(const TListeners& listeners);
00090 
00091 protected:
00092     TListeners m_Listeners;
00093     CFastMutex m_ListenersLock;
00094 };
00095 
00096 void CWorkerNodeCleanup::AddListener(IWorkerNodeCleanupEventListener* listener)
00097 {
00098     CFastMutexGuard g(m_ListenersLock);
00099     m_Listeners.insert(listener);
00100 }
00101 
00102 void CWorkerNodeCleanup::RemoveListener(
00103     IWorkerNodeCleanupEventListener* listener)
00104 {
00105     CFastMutexGuard g(m_ListenersLock);
00106     m_Listeners.erase(listener);
00107 }
00108 
00109 void CWorkerNodeCleanup::CallEventHandlers()
00110 {
00111     TListeners listeners;
00112     {
00113         CFastMutexGuard g(m_ListenersLock);
00114         listeners.swap(m_Listeners);
00115     }
00116 
00117     ITERATE(TListeners, it, listeners) {
00118         try {
00119             (*it)->HandleEvent(
00120                 IWorkerNodeCleanupEventListener::eRegularCleanup);
00121             delete *it;
00122         }
00123         NCBI_CATCH_ALL_X(39, "Job clean-up error");
00124     }
00125 }
00126 
00127 void CWorkerNodeCleanup::RemoveListeners(
00128     const CWorkerNodeCleanup::TListeners& listeners)
00129 {
00130     CFastMutexGuard g(m_ListenersLock);
00131     ITERATE(TListeners, it, listeners) {
00132         m_Listeners.erase(*it);
00133     }
00134 }
00135 
00136 class CWorkerNodeJobCleanup : public CWorkerNodeCleanup
00137 {
00138 public:
00139     CWorkerNodeJobCleanup(CWorkerNodeCleanup* worker_node_cleanup);
00140 
00141     virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00142     virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00143 
00144     virtual void CallEventHandlers();
00145 
00146 private:
00147     CWorkerNodeCleanup* m_WorkerNodeCleanup;
00148 };
00149 
00150 CWorkerNodeJobCleanup::CWorkerNodeJobCleanup(
00151         CWorkerNodeCleanup* worker_node_cleanup) :
00152     m_WorkerNodeCleanup(worker_node_cleanup)
00153 {
00154 }
00155 
00156 void CWorkerNodeJobCleanup::AddListener(
00157     IWorkerNodeCleanupEventListener* listener)
00158 {
00159     CWorkerNodeCleanup::AddListener(listener);
00160     m_WorkerNodeCleanup->AddListener(listener);
00161 }
00162 
00163 void CWorkerNodeJobCleanup::RemoveListener(
00164     IWorkerNodeCleanupEventListener* listener)
00165 {
00166     CWorkerNodeCleanup::RemoveListener(listener);
00167     m_WorkerNodeCleanup->RemoveListener(listener);
00168 }
00169 
00170 void CWorkerNodeJobCleanup::CallEventHandlers()
00171 {
00172     {
00173         CFastMutexGuard g(m_ListenersLock);
00174         m_WorkerNodeCleanup->RemoveListeners(m_Listeners);
00175     }
00176     CWorkerNodeCleanup::CallEventHandlers();
00177 }
00178 
00179 } // end of anonymous namespace
00180 
/// Construct the context for @a job processed by @a worker_node.
///
/// The context gets its own clean-up event source (CWorkerNodeJobCleanup)
/// that forwards listener registrations to the worker node's source; that
/// source is static_cast to CWorkerNodeCleanup, so it is assumed to be the
/// one created in the CGridWorkerNode constructor.  All per-job state
/// (m_Job, m_JobNumber, m_JobCommitted, ...) is initialized by Reset().
CWorkerNodeJobContext::CWorkerNodeJobContext(CGridWorkerNode& worker_node,
                                             const CNetScheduleJob&    job,
                                             bool             log_requested)
    : m_WorkerNode(worker_node),
      m_LogRequested(log_requested),
      m_ThreadContext(NULL),
      m_CleanupEventSource(new CWorkerNodeJobCleanup(
        static_cast<CWorkerNodeCleanup*>(worker_node.GetCleanupEventSource())))
{
    Reset(job);
}
00192 
00193 const string& CWorkerNodeJobContext::GetQueueName() const
00194 {
00195     return m_WorkerNode.GetQueueName();
00196 }
00197 const string& CWorkerNodeJobContext::GetClientName() const
00198 {
00199     return m_WorkerNode.GetClientName();
00200 }
00201 
00202 CNcbiIstream& CWorkerNodeJobContext::GetIStream()
00203 {
00204     _ASSERT(m_ThreadContext);
00205     return m_ThreadContext->GetIStream();
00206 }
00207 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
00208 {
00209     _ASSERT(m_ThreadContext);
00210     return m_ThreadContext->GetOStream();
00211 }
00212 
00213 void CWorkerNodeJobContext::CommitJob()
00214 {
00215     CheckIfCanceled();
00216     m_JobCommitted = eDone;
00217 }
00218 
00219 void CWorkerNodeJobContext::CommitJobWithFailure(const string& err_msg)
00220 {
00221     CheckIfCanceled();
00222     m_JobCommitted = eFailure;
00223     m_Job.error_msg = err_msg;
00224 }
00225 
00226 void CWorkerNodeJobContext::ReturnJob()
00227 {
00228     CheckIfCanceled();
00229     m_JobCommitted = eReturn;
00230 }
00231 
00232 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
00233                                                bool send_immediately)
00234 {
00235     _ASSERT(m_ThreadContext);
00236     CheckIfCanceled();
00237     m_ThreadContext->PutProgressMessage(msg, send_immediately);
00238 }
00239 
00240 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
00241 {
00242     _ASSERT(m_ThreadContext);
00243     m_ThreadContext->JobDelayExpiration(runtime_inc);
00244 }
00245 
00246 CNetScheduleAdmin::EShutdownLevel
00247 CWorkerNodeJobContext::GetShutdownLevel(void) const
00248 {
00249     _ASSERT(m_ThreadContext);
00250     if (m_ThreadContext->IsJobCanceled())
00251         return CNetScheduleAdmin::eShutdownImmediate;
00252     return CGridGlobals::GetInstance().GetShutdownLevel();
00253 }
00254 
00255 void CWorkerNodeJobContext::CheckIfCanceled()
00256 {
00257     if (IsCanceled()) {
00258         NCBI_THROW_FMT(CGridWorkerNodeException, eJobIsCanceled,
00259             "Job " << m_Job.job_id << " has been canceled");
00260     }
00261 }
00262 
00263 void CWorkerNodeJobContext::Reset(const CNetScheduleJob& job)
00264 {
00265     m_Job = job;
00266     m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
00267 
00268     m_JobCommitted = eNotCommitted;
00269     m_InputBlobSize = 0;
00270     m_ExclusiveJob = m_Job.mask & CNetScheduleAPI::eExclusiveJob;
00271 }
00272 
00273 void CWorkerNodeJobContext::RequestExclusiveMode()
00274 {
00275     if (!m_ExclusiveJob) {
00276         if (!m_WorkerNode.EnterExclusiveMode()) {
00277             NCBI_THROW(CGridWorkerNodeException,
00278                 eExclusiveModeIsAlreadySet, "");
00279         }
00280         m_ExclusiveJob = true;
00281     }
00282 }
00283 
00284 size_t CWorkerNodeJobContext::GetMaxServerOutputSize()
00285 {
00286     return m_WorkerNode.GetServerOutputSize();
00287 }
00288 
00289 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
00290 {
00291     return m_CleanupEventSource;
00292 }
00293 
00294 
00295 /////////////////////////////////////////////////////////////////////////////
00296 //
00297 ///@internal
00298 static void s_TlsCleanup(CGridThreadContext* p_value, void* /* data */ )
00299 {
00300     delete p_value;
00301 }
00302 /// @internal
00303 static CStaticTls<CGridThreadContext> s_tls;
00304 
00305 ///@internal
00306 class CWorkerNodeRequest : public CStdRequest
00307 {
00308 public:
00309     CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context);
00310 
00311     virtual void Process();
00312 
00313 private:
00314     void x_HandleProcessError(exception* ex = NULL);
00315 
00316     auto_ptr<CWorkerNodeJobContext> m_JobContext;
00317 };
00318 
00319 
00320 CWorkerNodeRequest::CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context)
00321   : m_JobContext(context)
00322 {
00323 }
00324 
00325 
// Selects which per-job diagnostic request events CRequestStateGuard
// prints: the request-start event only for eEnableStartStop, the
// request-stop event for eEnableStartStop or eDisableStartOnly.
// Defaults to printing both.
// NOTE(review): presumably changed through a CGridWorkerNode setter defined
// elsewhere -- not visible here.
static CGridWorkerNode::EDisabledRequestEvents s_ReqEventsDisabled =
    CGridWorkerNode::eEnableStartStop;
00328 
00329 class CRequestStateGuard
00330 {
00331 public:
00332     CRequestStateGuard(CWorkerNodeJobContext& job_context);
00333 
00334     ~CRequestStateGuard();
00335 
00336 private:
00337     CWorkerNodeJobContext& m_JobContext;
00338 };
00339 
00340 CRequestStateGuard::CRequestStateGuard(CWorkerNodeJobContext& job_context) :
00341     m_JobContext(job_context)
00342 {
00343     CRequestContext& request_context = CDiagContext::GetRequestContext();
00344 
00345     request_context.SetRequestID((int) job_context.GetJobNumber());
00346 
00347     const CNetScheduleJob& job = job_context.GetJob();
00348 
00349     if (!job.client_ip.empty())
00350         request_context.SetClientIP(job.client_ip);
00351 
00352     if (!job.session_id.empty())
00353         request_context.SetSessionID(job.session_id);
00354 
00355     request_context.SetAppState(eDiagAppState_RequestBegin);
00356 
00357     if (s_ReqEventsDisabled == CGridWorkerNode::eEnableStartStop)
00358         GetDiagContext().PrintRequestStart().Print("jid", job.job_id);
00359 
00360     request_context.SetAppState(eDiagAppState_Request);
00361 }
00362 
00363 CRequestStateGuard::~CRequestStateGuard()
00364 {
00365     CRequestContext& request_context = CDiagContext::GetRequestContext();
00366 
00367     request_context.SetAppState(eDiagAppState_RequestEnd);
00368 
00369     if (!request_context.IsSetRequestStatus())
00370         request_context.SetRequestStatus(
00371             m_JobContext.GetCommitStatus() == CWorkerNodeJobContext::eDone &&
00372                 m_JobContext.GetJob().ret_code == 0 ? 200 : 500);
00373 
00374     switch (request_context.GetAppState()) {
00375     case eDiagAppState_Request:
00376         request_context.SetAppState(eDiagAppState_RequestEnd);
00377         /* FALL THROUGH */
00378 
00379     default:
00380         switch (s_ReqEventsDisabled) {
00381         case CGridWorkerNode::eEnableStartStop:
00382         case CGridWorkerNode::eDisableStartOnly:
00383             GetDiagContext().PrintRequestStop();
00384 
00385         default:
00386             break;
00387         }
00388         request_context.SetAppState(eDiagAppState_NotSet);
00389         request_context.UnsetSessionID();
00390         request_context.UnsetClientIP();
00391     }
00392 }
00393 
00394 void CGridThreadContext::RunJob(CWorkerNodeJobContext& job_context)
00395 {
00396     _ASSERT(!m_JobContext);
00397     m_JobContext = &job_context;
00398     job_context.SetThreadContext(this);
00399 
00400     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
00401     if (debug_context) {
00402         debug_context->DumpInput(m_JobContext->GetJobInput(),
00403             m_JobContext->GetJobNumber());
00404     }
00405     m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00406         IWorkerNodeJobWatcher::eJobStarted);
00407 
00408     {{ // CRequestStateGuard scope.
00409 
00410     CRequestStateGuard request_state_guard(job_context);
00411 
00412     try {
00413         CRef<IWorkerNodeJob> job(GetJob());
00414         try {
00415             job_context.SetJobRetCode(job->Do(job_context));
00416         } catch (CGridWorkerNodeException& ex) {
00417             if (ex.GetErrCode() !=
00418                     CGridWorkerNodeException::eExclusiveModeIsAlreadySet) {
00419                 try {
00420                     CloseStreams();
00421                 }
00422                 NCBI_CATCH_ALL_X(11, "Could not close IO streams");
00423                 throw;
00424             }
00425 
00426             if (job_context.IsLogRequested()) {
00427                 LOG_POST_X(21, "Job " << job_context.GetJobKey() <<
00428                     " has been returned back to the queue "
00429                     "because it requested exclusive mode but "
00430                     "another job already has the exclusive status.");
00431             }
00432         }
00433         CloseStreams();
00434         unsigned try_count = 0;
00435         for (;;) {
00436             try {
00437                 switch (job_context.GetCommitStatus()) {
00438                     case CWorkerNodeJobContext::eDone:
00439                         PutResult();
00440                         break;
00441 
00442                     case CWorkerNodeJobContext::eFailure:
00443                         PutFailure();
00444                         break;
00445 
00446                     case CWorkerNodeJobContext::eNotCommitted:
00447                         if (!TWorkerNode_AllowImplicitJobReturn::
00448                                     GetDefault() &&
00449                                 job_context.GetShutdownLevel() ==
00450                                     CNetScheduleAdmin::eNoShutdown) {
00451                             job_context.m_Job.error_msg =
00452                                 "Job was not explicitly committed";
00453                             PutFailure();
00454                             break;
00455                         }
00456                         /* FALL THROUGH */
00457 
00458                     case CWorkerNodeJobContext::eReturn:
00459                         ReturnJob();
00460                         break;
00461 
00462                     case CWorkerNodeJobContext::eCanceled:
00463                         ERR_POST("Job " << job_context.GetJobKey() <<
00464                             " has been canceled");
00465                 }
00466                 break;
00467             } catch (CNetServiceException& ex) {
00468                 if (++try_count >= TServConn_ConnMaxRetries::GetDefault())
00469                     throw;
00470                 ERR_POST_X(22, "Communication Error : " << ex.what());
00471                 SleepMilliSec(s_GetRetryDelay());
00472             }
00473         }
00474 
00475         if (!CGridGlobals::GetInstance().IsShuttingDown())
00476             static_cast<CWorkerNodeJobCleanup*>(
00477                 job_context.GetCleanupEventSource())->CallEventHandlers();
00478     }
00479     catch (CNetScheduleException& e) {
00480         ERR_POST_X(20, m_JobContext->GetJobKey() <<
00481             " Error in Job execution: " << e.what());
00482         if (e.GetErrCode() != CNetScheduleException::eJobNotFound)
00483             PutFailureAndIgnoreErrors(e.what());
00484     }
00485     catch (exception& ex) {
00486         ERR_POST_X(18, m_JobContext->GetJobKey() <<
00487             " Error in Job execution: " << ex.what());
00488         PutFailureAndIgnoreErrors(ex.what());
00489     }
00490 
00491     m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00492         IWorkerNodeJobWatcher::eJobStopped);
00493 
00494     }} // Call CRequestStateGuard destructor before job_context is reset.
00495 
00496     m_JobContext->SetThreadContext(NULL);
00497     m_JobContext = NULL;
00498 }
00499 
00500 void CWorkerNodeRequest::x_HandleProcessError(exception* ex)
00501 {
00502     string msg = " Error during job run";
00503     if (ex) {
00504         msg += ": ";
00505         msg += ex->what();
00506     }
00507     ERR_POST_X(23, msg);
00508     try {
00509         const CNetScheduleJob& job = m_JobContext->GetJob();
00510         m_JobContext->GetWorkerNode().x_ReturnJob(job.job_id, job.auth_token);
00511     } catch (exception& ex1) {
00512         ERR_POST_X(24, "Could not return job back to queue: " << ex1.what());
00513     }
00514 }
00515 
00516 void CWorkerNodeRequest::Process()
00517 {
00518     try {
00519         CGridThreadContext* thread_context = s_tls.GetValue();
00520         if (!thread_context) {
00521             thread_context =
00522                 new CGridThreadContext(m_JobContext->GetWorkerNode());
00523             s_tls.SetValue(thread_context, s_TlsCleanup);
00524         }
00525         thread_context->RunJob(*m_JobContext);
00526     }
00527     catch (exception& ex) { x_HandleProcessError(&ex);  }
00528 }
00529 
00530 /////////////////////////////////////////////////////////////////////////////
00531 //
00532 //     CWorkerNodeJobWatchers
00533 /// @internal
00534 
00535 class CWorkerNodeJobWatchers : public IWorkerNodeJobWatcher
00536 {
00537 public:
00538     virtual ~CWorkerNodeJobWatchers() {};
00539     virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00540     {
00541         NON_CONST_ITERATE(TCont, it, m_Watchers) {
00542             IWorkerNodeJobWatcher* watcher =
00543                 const_cast<IWorkerNodeJobWatcher*>(it->first);
00544             watcher->Notify(job, event);
00545         }
00546     }
00547 
00548     void AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher, EOwnership owner)
00549     {
00550         TCont::const_iterator it = m_Watchers.find(&job_watcher);
00551         if (it == m_Watchers.end()) {
00552             if (owner == eTakeOwnership)
00553                 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>(&job_watcher);
00554             else
00555                 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>();
00556         }
00557     }
00558 
00559 private:
00560     typedef map<IWorkerNodeJobWatcher*, AutoPtr<IWorkerNodeJobWatcher> > TCont;
00561     TCont m_Watchers;
00562 };
00563 
00564 /////////////////////////////////////////////////////////////////////////////
00565 //
00566 //     CGridControleThread
00567 /// @internal
00568 class CGridControlThread : public CThread
00569 {
00570 public:
00571     CGridControlThread(CGridWorkerNode* worker_node,
00572         unsigned int start_port, unsigned int end_port) : m_Control(
00573             new CWorkerNodeControlServer(worker_node, start_port, end_port)) {}
00574 
00575     void Prepare() { m_Control->StartListening(); }
00576 
00577     unsigned short GetControlPort() { return m_Control->GetControlPort(); }
00578     void Stop() { if (m_Control.get()) m_Control->RequestShutdown(); }
00579 
00580 protected:
00581     virtual void* Main(void)
00582     {
00583         m_Control->Run();
00584         return NULL;
00585     }
00586     virtual void OnExit(void)
00587     {
00588         CThread::OnExit();
00589         CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00590         LOG_POST_X(46, Info << "Control Thread has been stopped.");
00591     }
00592 
00593 private:
00594     auto_ptr<CWorkerNodeControlServer> m_Control;
00595 };
00596 
00597 class CGridCleanupThread : public CThread
00598 {
00599 public:
00600     CGridCleanupThread(CGridWorkerNode* worker_node,
00601         IGridWorkerNodeApp_Listener* listener) :
00602             m_WorkerNode(worker_node),
00603             m_Listener(listener),
00604             m_Semaphore(0, 1)
00605     {
00606     }
00607 
00608     bool Wait(unsigned seconds) {return m_Semaphore.TryWait(seconds);}
00609 
00610 protected:
00611     virtual void* Main();
00612 
00613 private:
00614     CGridWorkerNode* m_WorkerNode;
00615     IGridWorkerNodeApp_Listener* m_Listener;
00616     CSemaphore m_Semaphore;
00617 };
00618 
00619 void* CGridCleanupThread::Main()
00620 {
00621     m_WorkerNode->GetCleanupEventSource()->CallEventHandlers();
00622     m_Listener->OnGridWorkerStop();
00623     m_Semaphore.Post();
00624 
00625     return NULL;
00626 }
00627 
00628 /////////////////////////////////////////////////////////////////////////////
00629 //
00630 //     CWorkerNodeIdleThread      --
00631 class CWorkerNodeIdleThread : public CThread
00632 {
00633 public:
00634     CWorkerNodeIdleThread(IWorkerNodeIdleTask*,
00635                           CGridWorkerNode& worker_node,
00636                           unsigned run_delay,
00637                           unsigned int auto_shutdown);
00638 
00639     void RequestShutdown()
00640     {
00641         m_ShutdownFlag = true;
00642         m_Wait1.Post();
00643         m_Wait2.Post();
00644     }
00645     void Schedule()
00646     {
00647         CFastMutexGuard guard(m_Mutext);
00648         m_AutoShutdownSW.Restart();
00649         if (m_StopFlag) {
00650             m_StopFlag = false;
00651             m_Wait1.Post();
00652         }
00653     }
00654     void Suspend()
00655     {
00656         CFastMutexGuard guard(m_Mutext);
00657         m_AutoShutdownSW.Restart();
00658         m_AutoShutdownSW.Stop();
00659         if (!m_StopFlag) {
00660             m_StopFlag = true;
00661             m_Wait2.Post();
00662         }
00663     }
00664 
00665     bool IsShutdownRequested() const { return m_ShutdownFlag; }
00666 
00667 
00668 protected:
00669     virtual void* Main(void);
00670     virtual void OnExit(void);
00671 
00672     CWorkerNodeIdleTaskContext& GetContext();
00673 
00674 private:
00675 
00676     unsigned int x_GetInterval() const
00677     {
00678         CFastMutexGuard guard(m_Mutext);
00679         return  m_AutoShutdown > 0 ?
00680                 min( m_AutoShutdown - (unsigned int)m_AutoShutdownSW.Elapsed(), m_RunInterval )
00681                 : m_RunInterval;
00682     }
00683     bool x_GetStopFlag() const
00684     {
00685         CFastMutexGuard guard(m_Mutext);
00686         return m_StopFlag;
00687     }
00688     bool x_IsAutoShutdownTime() const
00689     {
00690         CFastMutexGuard guard(m_Mutext);
00691         return m_AutoShutdown > 0 ? m_AutoShutdownSW.Elapsed() > m_AutoShutdown : false;
00692     }
00693 
00694     IWorkerNodeIdleTask* m_Task;
00695     CGridWorkerNode& m_WorkerNode;
00696     auto_ptr<CWorkerNodeIdleTaskContext> m_TaskContext;
00697     mutable CSemaphore  m_Wait1;
00698     mutable CSemaphore  m_Wait2;
00699     volatile bool       m_StopFlag;
00700     volatile bool       m_ShutdownFlag;
00701     unsigned int        m_RunInterval;
00702     unsigned int        m_AutoShutdown;
00703     CStopWatch          m_AutoShutdownSW;
00704     mutable CFastMutex  m_Mutext;
00705 
00706     CWorkerNodeIdleThread(const CWorkerNodeIdleThread&);
00707     CWorkerNodeIdleThread& operator=(const CWorkerNodeIdleThread&);
00708 };
00709 
00710 CWorkerNodeIdleThread::CWorkerNodeIdleThread(IWorkerNodeIdleTask* task,
00711                                              CGridWorkerNode& worker_node,
00712                                              unsigned run_delay,
00713                                              unsigned int auto_shutdown)
00714     : m_Task(task), m_WorkerNode(worker_node),
00715       m_Wait1(0,100000), m_Wait2(0,1000000),
00716       m_StopFlag(false), m_ShutdownFlag(false),
00717       m_RunInterval(run_delay),
00718       m_AutoShutdown(auto_shutdown), m_AutoShutdownSW(CStopWatch::eStart)
00719 {
00720 }
/// Idle-thread loop.
///
/// Each iteration waits on m_Wait1 (posted by Schedule() and
/// RequestShutdown()).  If it was posted, it then waits on m_Wait2 (posted
/// by Suspend() and RequestShutdown()) for x_GetInterval() seconds and
/// starts over if that was posted.  Otherwise, unless stopped, the idle
/// task is run, repeatedly while it asks to run again.  Once the
/// auto-shutdown period has elapsed, an immediate node shutdown is
/// requested and the loop ends.
void* CWorkerNodeIdleThread::Main()
{
    while (!m_ShutdownFlag) {
        if ( x_IsAutoShutdownTime() ) {
            LOG_POST_X(47, Info <<
                "There are no more jobs to be done. Exiting.");
            CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
            break;
        }
        // Never sleep past the auto-shutdown period (when it is enabled).
        unsigned int interval = m_AutoShutdown > 0 ? min (m_RunInterval,m_AutoShutdown) : m_RunInterval;
        if (m_Wait1.TryWait(interval, 0)) {
            if (m_ShutdownFlag)
                continue;
            // Woken by Schedule(): wait out the delay again; a Suspend()
            // during that wait (a job started) cancels this run.
            interval = x_GetInterval();
            if (m_Wait2.TryWait(interval, 0)) {
                continue;
            }
        }
        if (m_Task && !x_GetStopFlag()) {
            try {
                do {
                    if ( x_IsAutoShutdownTime() ) {
                        LOG_POST_X(48, Info <<
                            "There are no more jobs to be done. Exiting.");
                        CGridGlobals::GetInstance().RequestShutdown(
                            CNetScheduleAdmin::eShutdownImmediate);
                        m_ShutdownFlag = true;
                        break;
                    }
                    GetContext().Reset();
                    m_Task->Run(GetContext());
                } while( GetContext().NeedRunAgain() && !m_ShutdownFlag);
            } NCBI_CATCH_ALL_X(58,
                "CWorkerNodeIdleThread::Main: Idle Task failed");
        }
    }
    return 0;
}
00759 
00760 void CWorkerNodeIdleThread::OnExit(void)
00761 {
00762     LOG_POST_X(49, Info << "Idle Thread has been stopped.");
00763 }
00764 
00765 CWorkerNodeIdleTaskContext& CWorkerNodeIdleThread::GetContext()
00766 {
00767     if (!m_TaskContext.get())
00768         m_TaskContext.reset(new CWorkerNodeIdleTaskContext(*this));
00769     return *m_TaskContext;
00770 }
00771 
00772 /////////////////////////////////////////////////////////////////////////////
00773 //
00774 //     CWorkerNodeIdleTaskContext      --
00775 CWorkerNodeIdleTaskContext::
00776 CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread& thread)
00777     : m_Thread(thread), m_RunAgain(false)
00778 {
00779 }
00780 bool CWorkerNodeIdleTaskContext::IsShutdownRequested() const
00781 {
00782     return m_Thread.IsShutdownRequested();
00783 }
00784 void CWorkerNodeIdleTaskContext::Reset()
00785 {
00786     m_RunAgain = false;
00787 }
00788 
00789 void CWorkerNodeIdleTaskContext::RequestShutdown()
00790 {
00791     m_Thread.RequestShutdown();
00792     CGridGlobals::GetInstance().
00793         RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00794 }
00795 
00796 /////////////////////////////////////////////////////////////////////////////
00797 //
00798 //    CIdleWatcher
00799 /// @internal
00800 class CIdleWatcher : public IWorkerNodeJobWatcher
00801 {
00802 public:
00803     CIdleWatcher(CWorkerNodeIdleThread& idle)
00804         : m_Idle(idle), m_RunningJobs(0) {}
00805     virtual ~CIdleWatcher() {};
00806     virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00807     {
00808         if (event == eJobStarted) {
00809             ++m_RunningJobs;
00810             m_Idle.Suspend();
00811         } else if (event == eJobStopped) {
00812             --m_RunningJobs;
00813             if (m_RunningJobs == 0)
00814                 m_Idle.Schedule();
00815         }
00816     }
00817 
00818 private:
00819     CWorkerNodeIdleThread& m_Idle;
00820     volatile int m_RunningJobs;
00821 };
00822 
00823 
00824 /////////////////////////////////////////////////////////////////////////////
00825 //
00826 
00827 CGridWorkerNode::CGridWorkerNode(CNcbiApplication& app,
00828         IWorkerNodeJobFactory* job_factory) :
00829     m_JobFactory(job_factory),
00830     m_MaxThreads(1),
00831     m_NSTimeout(30),
00832     m_CheckStatusPeriod(2),
00833     m_ExclusiveJobSemaphore(1, 1),
00834     m_IsProcessingExclusiveJob(false),
00835     m_TotalMemoryLimit(0),
00836     m_TotalTimeLimit(0),
00837     m_StartupTime(0),
00838     m_CleanupEventSource(new CWorkerNodeCleanup()),
00839     m_Listener(new CGridWorkerNodeApp_Listener()),
00840     m_App(app),
00841     m_SingleThreadForced(false)
00842 {
00843     if (!m_JobFactory.get())
00844         NCBI_THROW(CGridWorkerNodeException,
00845                  eJobFactoryIsNotSet, "The JobFactory is not set.");
00846 }
00847 
00848 CGridWorkerNode::~CGridWorkerNode()
00849 {
00850 }
00851 
00852 void CGridWorkerNode::Init(bool default_merge_lines_value)
00853 {
00854     IRWRegistry& reg = m_App.GetConfig();
00855 
00856     if (reg.GetBool("log", "merge_lines", default_merge_lines_value)) {
00857         SetDiagPostFlag(eDPF_PreMergeLines);
00858         SetDiagPostFlag(eDPF_MergeLines);
00859     }
00860 
00861     reg.Set(kNetScheduleAPIDriverName, "discover_low_priority_servers", "true");
00862 
00863     m_NetScheduleAPI = CNetScheduleAPI(reg);
00864     m_NetCacheAPI = CNetCacheAPI(reg);
00865 }
00866 
// Name of the configuration section holding the worker node's own settings
// (max_threads, init_threads, job_wait_timeout, control_port, ...).
// NOTE(review): the pointer itself is non-const and has external linkage;
// consider `static const char* const` unless another translation unit
// refers to this symbol -- confirm before changing.
const char* kServerSec = "server";
00868 
00869 int CGridWorkerNode::Run()
00870 {
00871     LOG_POST_X(50, Info << GetJobFactory().GetJobVersion() <<
00872             " build " WN_BUILD_DATE);
00873 
00874     const IRegistry& reg = m_App.GetConfig();
00875     CConfig conf(reg);
00876 
00877     unsigned init_threads = 1;
00878 
00879     if (!m_SingleThreadForced) {
00880         string max_threads = reg.GetString(kServerSec, "max_threads", "auto");
00881         if (NStr::CompareNocase(max_threads, "auto") == 0)
00882             m_MaxThreads = GetCpuCount();
00883         else {
00884             try {
00885                 m_MaxThreads = NStr::StringToUInt(max_threads);
00886             }
00887             catch (exception&) {
00888                 m_MaxThreads = GetCpuCount();
00889                 ERR_POST_X(51, "Could not convert [" << kServerSec <<
00890                     "] max_threads parameter to number.\n"
00891                     "Using \'auto\' option (" << m_MaxThreads << ").");
00892             }
00893         }
00894         init_threads =
00895             reg.GetInt(kServerSec, "init_threads", 1, 0, IRegistry::eReturn);
00896     }
00897     if (init_threads > m_MaxThreads)
00898         init_threads = m_MaxThreads;
00899 
00900     m_NSTimeout = reg.GetInt(kServerSec,
00901         "job_wait_timeout", 30, 0, IRegistry::eReturn);
00902 
00903     unsigned thread_pool_timeout = reg.GetInt(kServerSec,
00904         "thread_pool_timeout", 30, 0, IRegistry::eReturn);
00905 
00906     const CArgs& args = m_App.GetArgs();
00907 
00908     unsigned int start_port, end_port;
00909 
00910     string sport1, sport2;
00911     NStr::SplitInTwo(args["control_port"] ? args["control_port"].AsString() :
00912         reg.GetString(kServerSec, "control_port", "9300"), "-", sport1, sport2);
00913     start_port = NStr::StringToUInt(sport1);
00914     end_port = sport2.empty() ? start_port : NStr::StringToUInt(sport2);
00915 
00916     bool log_requested = reg.GetBool(kServerSec,
00917         "log", false, 0, IRegistry::eReturn);
00918 
00919     unsigned int idle_run_delay =
00920         reg.GetInt(kServerSec,"idle_run_delay",30,0,IRegistry::eReturn);
00921 
00922     unsigned int auto_shutdown =
00923         reg.GetInt(kServerSec,"auto_shutdown_if_idle",0,0,IRegistry::eReturn);
00924 
00925     unsigned int infinite_loop_time =
00926         reg.HasEntry(kServerSec, "infinite_loop_time") ?
00927         reg.GetInt(kServerSec, "infinite_loop_time", 0, 0, IRegistry::eReturn) :
00928         reg.GetInt(kServerSec, "infinit_loop_time", 0, 0, IRegistry::eReturn);
00929 
00930     bool reuse_job_object =
00931         reg.GetBool(kServerSec, "reuse_job_object", false, 0,
00932                     CNcbiRegistry::eReturn);
00933 
00934     unsigned int max_total_jobs =
00935         reg.GetInt(kServerSec,"max_total_jobs",0,0,IRegistry::eReturn);
00936 
00937     unsigned int max_failed_jobs =
00938         reg.GetInt(kServerSec,"max_failed_jobs",0,0,IRegistry::eReturn);
00939 
00940     bool is_daemon =
00941         reg.GetBool(kServerSec, "daemon", false, 0, CNcbiRegistry::eReturn);
00942 
00943     {{
00944     string memlimitstr =
00945         reg.GetString(kServerSec,"total_memory_limit","",IRegistry::eReturn);
00946     if (!memlimitstr.empty()) {
00947         m_TotalMemoryLimit = NStr::StringToUInt8_DataSize(memlimitstr);
00948     }
00949     }}
00950 
00951     m_TotalTimeLimit =
00952         reg.GetInt(kServerSec,"total_time_limit", 0, 0, IRegistry::eReturn);
00953     m_StartupTime = time(0);
00954 
00955     vector<string> vhosts;
00956 
00957     NStr::Tokenize(reg.GetString(kServerSec,
00958         "master_nodes", kEmptyStr), " ;,", vhosts);
00959 
00960     ITERATE(vector<string>, it, vhosts) {
00961         string host, port;
00962         NStr::SplitInTwo(NStr::TruncateSpaces(*it), ":", host, port);
00963         if (host.empty() || port.empty())
00964             continue;
00965         m_Masters.insert(SServerAddress(NStr::ToLower(host),
00966             (unsigned short) NStr::StringToUInt(port)));
00967     }
00968 
00969     vhosts.clear();
00970 
00971     NStr::Tokenize(reg.GetString(kServerSec,
00972         "admin_hosts", kEmptyStr), " ;,", vhosts);
00973 
00974     ITERATE(vector<string>, it, vhosts) {
00975         unsigned int ha = CSocketAPI::gethostbyname(*it);
00976         if (ha != 0)
00977             m_AdminHosts.insert(ha);
00978     }
00979 
00980     m_CheckStatusPeriod =
00981         reg.GetInt(kServerSec, "check_status_period", 2, 0, IRegistry::eReturn);
00982 
00983     if (reg.HasEntry(kServerSec,"wait_server_timeout")) {
00984         ERR_POST_X(52, "[" << kServerSec <<
00985             "] \"wait_server_timeout\" is not used anymore.\n"
00986             "Use [" << kNetScheduleAPIDriverName <<
00987             "] \"communication_timeout\" parameter instead.");
00988     }
00989 
00990     CGridDebugContext::eMode debug_mode = CGridDebugContext::eGDC_NoDebug;
00991     string dbg_mode = reg.GetString("gw_debug", "mode", kEmptyStr);
00992     if (NStr::CompareNocase(dbg_mode, "gather")==0) {
00993         debug_mode = CGridDebugContext::eGDC_Gather;
00994     } else if (NStr::CompareNocase(dbg_mode, "execute")==0) {
00995         debug_mode = CGridDebugContext::eGDC_Execute;
00996     }
00997     if (debug_mode != CGridDebugContext::eGDC_NoDebug) {
00998         CGridDebugContext& debug_context = CGridDebugContext::Create(
00999             debug_mode, m_NetCacheAPI);
01000         string run_name =
01001             reg.GetString("gw_debug", "run_name", m_App.GetProgramDisplayName());
01002         debug_context.SetRunName(run_name);
01003         if (debug_mode == CGridDebugContext::eGDC_Gather) {
01004             max_total_jobs =
01005                 reg.GetInt("gw_debug","gather_nrequests",1,0,IRegistry::eReturn);
01006         } else if (debug_mode == CGridDebugContext::eGDC_Execute) {
01007             string files =
01008                 reg.GetString("gw_debug", "execute_requests", kEmptyStr);
01009             max_total_jobs = 0;
01010             debug_context.SetExecuteList(files);
01011         }
01012         is_daemon = false;
01013     }
01014 
01015 #if defined(NCBI_OS_UNIX)
01016     if (is_daemon) {
01017         LOG_POST_X(53, "Entering UNIX daemon mode...");
01018         bool daemon = CProcess::Daemonize("/dev/null",
01019                                           CProcess::fDontChroot |
01020                                           CProcess::fKeepStdin  |
01021                                           CProcess::fKeepStdout);
01022         if (!daemon) {
01023             return 0;
01024         }
01025     }
01026 #endif
01027 
01028     AttachJobWatcher(CGridGlobals::GetInstance().GetJobsWatcher());
01029 
01030     CRef<CGridControlThread> control_thread(
01031         new CGridControlThread(this, start_port, end_port));
01032 
01033     m_NetScheduleAPI.SetAuthParam("control_port",
01034             NStr::NumericToString(control_thread->GetControlPort()));
01035     m_NetScheduleAPI.SetAuthParam("client_host", CSocketAPI::gethostname());
01036 
01037     try {
01038         control_thread->Prepare();
01039     }
01040     catch (CServer_Exception& e) {
01041         if (e.GetErrCode() != CServer_Exception::eCouldntListen)
01042             throw;
01043         NCBI_THROW_FMT(CGridWorkerNodeException, ePortBusy,
01044             "Couldn't start a listener on a port from the specified "
01045             "control port range; last port tried: " <<
01046             control_thread->GetControlPort() << ". Another process "
01047             "(probably another instance of this worker node) is occupying "
01048             "the port(s).");
01049     }
01050 
01051     if (m_NetScheduleAPI->m_ClientNode.empty()) {
01052         string client_node(m_NetScheduleAPI->
01053                 m_Service->m_ServerPool->m_ClientName);
01054         client_node.append(2, ':');
01055         client_node.append(CSocketAPI::gethostname());
01056         client_node.append(1, ':');
01057         client_node.append(NStr::NumericToString(
01058             control_thread->GetControlPort()));
01059         m_NetScheduleAPI.SetClientNode(client_node);
01060 
01061         m_NetScheduleAPI.SetClientSession(GetDiagContext().GetStringUID());
01062     }
01063 
01064     m_NSExecutor = m_NetScheduleAPI.GetExecutor();
01065 
01066     m_NetScheduleAPI.SetProgramVersion(m_JobFactory->GetJobVersion());
01067 
01068     CGridGlobals::GetInstance().SetReuseJobObject(reuse_job_object);
01069     CGridGlobals::GetInstance().GetJobsWatcher().SetMaxJobsAllowed(max_total_jobs);
01070     CGridGlobals::GetInstance().GetJobsWatcher().SetMaxFailuresAllowed(max_failed_jobs);
01071     CGridGlobals::GetInstance().GetJobsWatcher().SetInfinitLoopTime(infinite_loop_time);
01072     CGridGlobals::GetInstance().SetWorker(*this);
01073 
01074     IWorkerNodeIdleTask* task = NULL;
01075     if (idle_run_delay > 0)
01076         task = GetJobFactory().GetIdleTask();
01077     if (task || auto_shutdown > 0 ) {
01078         m_IdleThread.Reset(new CWorkerNodeIdleThread(task, *this,
01079                                                      task ? idle_run_delay : auto_shutdown,
01080                                                      auto_shutdown));
01081         m_IdleThread->Run();
01082         AttachJobWatcher(*(new CIdleWatcher(*m_IdleThread)), eTakeOwnership);
01083     }
01084 
01085     control_thread->Run();
01086 
01087     LOG_POST_X(54, Info << "\n=================== NEW RUN : " <<
01088         CGridGlobals::GetInstance().GetStartTime().AsString() <<
01089             " ===================\n" <<
01090         GetJobFactory().GetJobVersion() << " build " WN_BUILD_DATE <<
01091         " is started.\n"
01092         "Waiting for control commands on " << CSocketAPI::gethostname() <<
01093             ":" << control_thread->GetControlPort() << "\n"
01094         "Queue name: " << GetQueueName() << "\n"
01095         "Maximum job threads: " << m_MaxThreads << "\n");
01096 
01097     m_Listener->OnGridWorkerStart();
01098 
01099     auto_ptr<CGridThreadContext> single_thread_context;
01100     auto_ptr<CStdPoolOfThreads> thread_pool;
01101 
01102     _ASSERT(m_MaxThreads > 0);
01103 
01104     if (m_MaxThreads <= 1)
01105         single_thread_context.reset(new CGridThreadContext(*this));
01106     else {
01107         thread_pool.reset(new CStdPoolOfThreads(m_MaxThreads, 0));
01108         try {
01109             thread_pool->Spawn(init_threads);
01110         }
01111         catch (exception& ex) {
01112             ERR_POST_X(26, ex.what());
01113             CGridGlobals::GetInstance().RequestShutdown(
01114                 CNetScheduleAdmin::eShutdownImmediate);
01115             return 2;
01116         }
01117     }
01118 
01119     CNetScheduleJob job;
01120 
01121     unsigned try_count = 0;
01122     while (!CGridGlobals::GetInstance().IsShuttingDown()) {
01123         try {
01124 
01125             if (m_MaxThreads > 1) {
01126                 try {
01127                     thread_pool->WaitForRoom(thread_pool_timeout);
01128                 } catch (CBlockingQueueException&) {
01129                     // threaded pool is busy
01130                     continue;
01131                 }
01132             }
01133 
01134             if (x_GetNextJob(job)) {
01135                 auto_ptr<CWorkerNodeJobContext>
01136                     job_context(new CWorkerNodeJobContext(*this,
01137                                                           job,
01138                                                           log_requested));
01139                 job_context->Reset(job);
01140                 if (m_MaxThreads > 1) {
01141                     try {
01142                         thread_pool->AcceptRequest(CRef<CStdRequest>(
01143                                 new CWorkerNodeRequest(job_context)));
01144                     } catch (CBlockingQueueException& ex) {
01145                         ERR_POST_X(28, ex.what());
01146                         // that must not happen after CBlockingQueue is fixed
01147                         _ASSERT(0);
01148                         x_ReturnJob(job.job_id, job.auth_token);
01149                     }
01150                 } else {
01151                     try {
01152                         single_thread_context->RunJob(*job_context);
01153                     } catch (exception&) {
01154                         x_ReturnJob(job.job_id, job.auth_token);
01155                         throw;
01156                     }
01157                 }
01158             }
01159         } catch (CNetServiceException& ex) {
01160             ERR_POST_X(40, ex.what());
01161             if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
01162                 CGridGlobals::GetInstance().RequestShutdown(
01163                     CNetScheduleAdmin::eShutdownImmediate);
01164             } else {
01165                 SleepMilliSec(s_GetRetryDelay());
01166                 continue;
01167             }
01168         } catch (exception& ex) {
01169             if (TWorkerNode_StopOnJobErrors::GetDefault()) {
01170                 ERR_POST_X(29, ex.what());
01171                 CGridGlobals::GetInstance().RequestShutdown(
01172                     CNetScheduleAdmin::eShutdownImmediate);
01173             }
01174         }
01175         try_count = 0;
01176     }
01177     LOG_POST_X(31, Info << "Shutting down...");
01178     if (reg.GetBool(kServerSec,
01179             "force_exit", false, 0, CNcbiRegistry::eReturn)) {
01180         ERR_POST_X(45, "Force exit");
01181     } else if (m_MaxThreads > 1) {
01182         try {
01183             LOG_POST_X(32, Info << "Stopping worker threads...");
01184             thread_pool->KillAllThreads(true);
01185             thread_pool.reset(0);
01186         }
01187         catch (exception& ex) {
01188             ERR_POST_X(33, "Could not stop worker threads: " << ex.what());
01189         }
01190     }
01191     try {
01192         GetNSExecutor().UnRegisterClient();
01193     }
01194     catch (CNetServiceException& ex) {
01195         // if server does not understand this new command just ignore the error
01196         if (ex.GetErrCode() != CNetServiceException::eCommunicationError
01197             || NStr::Find(ex.what(),"Server error:Unknown request") == NPOS) {
01198             ERR_POST_X(35, "Could not unregister from NetSchedule services: "
01199                        << ex.what());
01200         }
01201     }
01202     catch (exception& ex) {
01203         ERR_POST_X(36, "Could not unregister from NetSchedule services: "
01204                    << ex.what());
01205     }
01206 
01207     LOG_POST_X(38, Info << "Worker Node has been stopped.");
01208 
01209 
01210     CRef<CGridCleanupThread> cleanup_thread(
01211         new CGridCleanupThread(this, m_Listener.get()));
01212 
01213     cleanup_thread->Run();
01214 
01215     LOG_POST_X(55, Info << "Stopping Control thread...");
01216     control_thread->Stop();
01217     control_thread->Join();
01218 
01219     CNcbiOstrstream os;
01220     CGridGlobals::GetInstance().GetJobsWatcher().Print(os);
01221     LOG_POST_X(56, Info << string(CNcbiOstrstreamToString(os)));
01222 
01223     if (m_IdleThread) {
01224         if (!m_IdleThread->IsShutdownRequested()) {
01225             LOG_POST_X(57, "Stopping Idle thread...");
01226             m_IdleThread->RequestShutdown();
01227         }
01228         m_IdleThread->Join();
01229     }
01230 
01231     if (cleanup_thread->Wait(thread_pool_timeout)) {
01232         cleanup_thread->Join();
01233         LOG_POST_X(58, Info << "Cleanup thread finished");
01234     } else {
01235         ERR_POST_X(59, "Clean-up thread timed out");
01236     }
01237 
01238     return CGridGlobals::GetInstance().GetExitCode();
01239 }
01240 
01241 void CGridWorkerNode::RequestShutdown()
01242 {
01243     CGridGlobals::GetInstance().
01244         RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
01245 }
01246 
01247 
01248 void CGridWorkerNode::AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher,
01249                                            EOwnership owner)
01250 {
01251     if (!m_JobWatchers.get())
01252         m_JobWatchers.reset(new CWorkerNodeJobWatchers);
01253     m_JobWatchers->AttachJobWatcher(job_watcher, owner);
01254 };
01255 
01256 void CGridWorkerNode::SetListener(IGridWorkerNodeApp_Listener* listener)
01257 {
01258     m_Listener.reset(listener ? listener : new CGridWorkerNodeApp_Listener());
01259 }
01260 
01261 
01262 void CGridWorkerNode::x_NotifyJobWatcher(const CWorkerNodeJobContext& job,
01263     IWorkerNodeJobWatcher::EEvent event)
01264 {
01265     if (m_JobWatchers.get() != NULL) {
01266         CFastMutexGuard guard(m_JobWatcherMutex);
01267         m_JobWatchers->Notify(job, event);
01268     }
01269 }
01270 
01271 bool CGridWorkerNode::x_GetNextJob(CNetScheduleJob& job)
01272 {
01273     bool job_exists = false;
01274 
01275     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01276 
01277     if (debug_context &&
01278         debug_context->GetDebugMode() == CGridDebugContext::eGDC_Execute) {
01279         job_exists = debug_context->GetNextJob(job.job_id, job.input);
01280         if (!job_exists) {
01281             CGridGlobals::GetInstance().
01282                 RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
01283         }
01284     } else {
01285         if (!x_AreMastersBusy()) {
01286             SleepSec(m_NSTimeout);
01287             return false;
01288         }
01289 
01290         if (!WaitForExclusiveJobToFinish())
01291             return false;
01292 
01293         job_exists = GetNSExecutor().GetJob(job, m_NSTimeout);
01294 
01295         if (job_exists && job.mask & CNetScheduleAPI::eExclusiveJob) {
01296             if (EnterExclusiveMode())
01297                 job_exists = true;
01298             else {
01299                 x_ReturnJob(job.job_id, job.auth_token);
01300                 job_exists = false;
01301             }
01302         }
01303     }
01304     if (job_exists && CGridGlobals::GetInstance().IsShuttingDown()) {
01305         x_ReturnJob(job.job_id, job.auth_token);
01306         return false;
01307     }
01308     return job_exists;
01309 }
01310 
01311 void CGridWorkerNode::x_ReturnJob(const string& job_key,
01312         const string& auth_token)
01313 {
01314     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01315     if (!debug_context ||
01316             debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01317          GetNSExecutor().ReturnJob(job_key, auth_token);
01318     }
01319 }
01320 
01321 void CGridWorkerNode::x_FailJob(const CNetScheduleJob& job, const string& reason)
01322 {
01323     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01324     if (!debug_context ||
01325             debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01326         CNetScheduleJob job_copy(job);
01327         job_copy.error_msg = reason;
01328         GetNSExecutor().PutFailure(job_copy);
01329     }
01330 }
01331 
01332 size_t CGridWorkerNode::GetServerOutputSize()
01333 {
01334     return m_NetScheduleAPI->m_UseEmbeddedStorage ?
01335         m_NetScheduleAPI.GetServerParams().max_output_size : 0;
01336 }
01337 
01338 bool CGridWorkerNode::IsHostInAdminHostsList(const string& host) const
01339 {
01340     if (m_AdminHosts.empty())
01341         return true;
01342     unsigned int ha = CSocketAPI::gethostbyname(host);
01343     if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01344         return true;
01345     unsigned int ha_lh = CSocketAPI::gethostbyname("");
01346     if (ha == ha_lh) {
01347         ha = CSocketAPI::gethostbyname("localhost");
01348         if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01349             return true;
01350     }
01351     return false;
01352 }
01353 
01354 bool CGridWorkerNode::x_AreMastersBusy() const
01355 {
01356     ITERATE(set<SServerAddress>, it, m_Masters) {
01357         STimeout tmo = {0, 500};
01358         CSocket socket(it->host, it->port, &tmo, eOff);
01359         if (socket.GetStatus(eIO_Open) != eIO_Success)
01360             continue;
01361 
01362         CNcbiOstrstream os;
01363         os << GetJobVersion() << endl;
01364         os << GetNetScheduleAPI().GetQueueName()  <<";"
01365            << GetServiceName();
01366         os << endl;
01367         os << "GETLOAD" << ends;
01368         if (socket.Write(os.str(), (size_t)os.pcount()) != eIO_Success) {
01369             os.freeze(false);
01370             continue;
01371         }
01372         os.freeze(false);
01373         string reply;
01374         if (socket.ReadLine(reply) != eIO_Success)
01375             continue;
01376         if (NStr::StartsWith(reply, "ERR:")) {
01377             string msg;
01378             NStr::Replace(reply, "ERR:", "", msg);
01379             ERR_POST_X(43, "Worker Node at " << it->AsString() <<
01380                 " returned error: " << msg);
01381         } else if (NStr::StartsWith(reply, "OK:")) {
01382             string msg;
01383             NStr::Replace(reply, "OK:", "", msg);
01384             try {
01385                 int load = NStr::StringToInt(msg);
01386                 if (load > 0)
01387                     return false;
01388             } catch (exception&) {}
01389         } else {
01390             ERR_POST_X(44, "Worker Node at " << it->AsString() <<
01391                 " returned unknown reply: " << reply);
01392         }
01393     }
01394     return true;
01395 }
01396 
01397 bool CGridWorkerNode::EnterExclusiveMode()
01398 {
01399     if (m_ExclusiveJobSemaphore.TryWait()) {
01400         _ASSERT(!m_IsProcessingExclusiveJob);
01401 
01402         m_IsProcessingExclusiveJob = true;
01403         return true;
01404     }
01405     return false;
01406 }
01407 
01408 void CGridWorkerNode::LeaveExclusiveMode()
01409 {
01410     _ASSERT(m_IsProcessingExclusiveJob);
01411 
01412     m_IsProcessingExclusiveJob = false;
01413     m_ExclusiveJobSemaphore.Post();
01414 }
01415 
01416 bool CGridWorkerNode::WaitForExclusiveJobToFinish()
01417 {
01418     if (m_ExclusiveJobSemaphore.TryWait(m_NSTimeout)) {
01419         m_ExclusiveJobSemaphore.Post();
01420         return true;
01421     }
01422     return false;
01423 }
01424 
01425 void CGridWorkerNode::DisableDefaultRequestEventLogging(
01426     CGridWorkerNode::EDisabledRequestEvents disabled_events)
01427 {
01428     s_ReqEventsDisabled = disabled_events;
01429 }
01430 
01431 IWorkerNodeCleanupEventSource* CGridWorkerNode::GetCleanupEventSource()
01432 {
01433     return m_CleanupEventSource;
01434 }
01435 
01436 END_NCBI_SCOPE
Modified on Wed May 23 12:58:07 2012 by modify_doxy.py rev. 337098