src/connect/services/grid_worker.cpp

Go to the documentation of this file.
00001 /*  $Id: grid_worker.cpp 177980 2009-12-07 20:09:42Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *   Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
00027  *
00028  * File Description:
00029  *    NetSchedule Worker Node implementation
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include "grid_thread_context.hpp"
00035 #include "netservice_params.hpp"
00036 #include "balancing.hpp"
00037 
00038 #include <connect/services/grid_globals.hpp>
00039 #include <connect/services/error_codes.hpp>
00040 #include <connect/services/grid_debug_context.hpp>
00041 #include <connect/services/netschedule_api_expt.hpp>
00042 #include <connect/services/grid_worker_app.hpp>
00043 #include <connect/services/grid_control_thread.hpp>
00044 
00045 #include <connect/ncbi_socket.hpp>
00046 
00047 #include <util/thread_pool.hpp>
00048 
00049 #include <corelib/ncbireg.hpp>
00050 #include <corelib/ncbithr.hpp>
00051 #include <corelib/ncbitime.hpp>
00052 #include <corelib/ncbi_config.hpp>
00053 #include <corelib/ncbi_process.hpp>
00054 #include <corelib/ncbiexpt.hpp>
00055 #include <corelib/ncbi_system.hpp>
00056 #include <corelib/ncbi_safe_static.hpp>
00057 #include <corelib/request_ctx.hpp>
00058 #include <corelib/blob_storage.hpp>
00059 
00060 #ifdef NCBI_OS_UNIX
00061 #include <unistd.h>
00062 #endif
00063 
00064 
00065 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
00066 
00067 
00068 BEGIN_NCBI_SCOPE
00069 
00070 /////////////////////////////////////////////////////////////////////////////
00071 //
00072 IWorkerNodeJobWatcher::~IWorkerNodeJobWatcher()
00073 {
00074 }
00075 
00076 /////////////////////////////////////////////////////////////////////////////
00077 //
00078 //     CWorkerNodeJobContext     --
00079 
00080 namespace {
00081 
00082 class CWorkerNodeCleanup : public IWorkerNodeCleanupEventSource
00083 {
00084 public:
00085     typedef set<IWorkerNodeCleanupEventListener*> TListeners;
00086 
00087     virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00088     virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00089 
00090     virtual void CallEventHandlers();
00091 
00092     void RemoveListeners(const TListeners& listeners);
00093 
00094 protected:
00095     TListeners m_Listeners;
00096     CFastMutex m_ListenersLock;
00097 };
00098 
00099 void CWorkerNodeCleanup::AddListener(IWorkerNodeCleanupEventListener* listener)
00100 {
00101     CFastMutexGuard g(m_ListenersLock);
00102     m_Listeners.insert(listener);
00103 }
00104 
00105 void CWorkerNodeCleanup::RemoveListener(
00106     IWorkerNodeCleanupEventListener* listener)
00107 {
00108     CFastMutexGuard g(m_ListenersLock);
00109     m_Listeners.erase(listener);
00110 }
00111 
00112 void CWorkerNodeCleanup::CallEventHandlers()
00113 {
00114     TListeners listeners;
00115     {
00116         CFastMutexGuard g(m_ListenersLock);
00117         listeners.swap(m_Listeners);
00118     }
00119 
00120     ITERATE(TListeners, it, listeners) {
00121         try {
00122             (*it)->HandleEvent(
00123                 IWorkerNodeCleanupEventListener::eRegularCleanup);
00124             delete *it;
00125         }
00126         NCBI_CATCH_ALL_X(39, "Job clean-up error");
00127     }
00128 }
00129 
00130 void CWorkerNodeCleanup::RemoveListeners(
00131     const CWorkerNodeCleanup::TListeners& listeners)
00132 {
00133     CFastMutexGuard g(m_ListenersLock);
00134     ITERATE(TListeners, it, listeners) {
00135         m_Listeners.erase(*it);
00136     }
00137 }
00138 
00139 class CWorkerNodeJobCleanup : public CWorkerNodeCleanup
00140 {
00141 public:
00142     CWorkerNodeJobCleanup(CWorkerNodeCleanup* worker_node_cleanup);
00143 
00144     virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00145     virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00146 
00147     virtual void CallEventHandlers();
00148 
00149 private:
00150     CWorkerNodeCleanup* m_WorkerNodeCleanup;
00151 };
00152 
00153 CWorkerNodeJobCleanup::CWorkerNodeJobCleanup(
00154         CWorkerNodeCleanup* worker_node_cleanup) :
00155     m_WorkerNodeCleanup(worker_node_cleanup)
00156 {
00157 }
00158 
00159 void CWorkerNodeJobCleanup::AddListener(
00160     IWorkerNodeCleanupEventListener* listener)
00161 {
00162     CWorkerNodeCleanup::AddListener(listener);
00163     m_WorkerNodeCleanup->AddListener(listener);
00164 }
00165 
00166 void CWorkerNodeJobCleanup::RemoveListener(
00167     IWorkerNodeCleanupEventListener* listener)
00168 {
00169     CWorkerNodeCleanup::RemoveListener(listener);
00170     m_WorkerNodeCleanup->RemoveListener(listener);
00171 }
00172 
00173 void CWorkerNodeJobCleanup::CallEventHandlers()
00174 {
00175     {
00176         CFastMutexGuard g(m_ListenersLock);
00177         m_WorkerNodeCleanup->RemoveListeners(m_Listeners);
00178     }
00179     CWorkerNodeCleanup::CallEventHandlers();
00180 }
00181 
00182 } // end of anonymous namespace
00183 
00184 CWorkerNodeJobContext::CWorkerNodeJobContext(CGridWorkerNode& worker_node,
00185                                              const CNetScheduleJob&    job,
00186                                              bool             log_requested)
00187     : m_WorkerNode(worker_node),
00188       m_LogRequested(log_requested),
00189       m_ThreadContext(NULL),
00190       m_CleanupEventSource(new CWorkerNodeJobCleanup(
00191         static_cast<CWorkerNodeCleanup*>(worker_node.GetCleanupEventSource())))
00192 {
00193     Reset(job);
00194 }
00195 
00196 const string& CWorkerNodeJobContext::GetQueueName() const
00197 {
00198     return m_WorkerNode.GetQueueName();
00199 }
00200 const string& CWorkerNodeJobContext::GetClientName() const
00201 {
00202     return m_WorkerNode.GetClientName();
00203 }
00204 
00205 CNcbiIstream& CWorkerNodeJobContext::GetIStream(IBlobStorage::ELockMode mode)
00206 {
00207     _ASSERT(m_ThreadContext);
00208     return m_ThreadContext->GetIStream(mode);
00209 }
00210 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
00211 {
00212     _ASSERT(m_ThreadContext);
00213     return m_ThreadContext->GetOStream();
00214 }
00215 
00216 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
00217                                                bool send_immediately)
00218 {
00219     _ASSERT(m_ThreadContext);
00220     m_ThreadContext->PutProgressMessage(msg, send_immediately);
00221 }
00222 
00223 void CWorkerNodeJobContext::SetThreadContext(CGridThreadContext* thr_context)
00224 {
00225     m_ThreadContext = thr_context;
00226 }
00227 
00228 void CWorkerNodeJobContext::SetJobRunTimeout(unsigned time_to_run)
00229 {
00230     _ASSERT(m_ThreadContext);
00231     m_ThreadContext->SetJobRunTimeout(time_to_run);
00232 }
00233 
00234 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
00235 {
00236     _ASSERT(m_ThreadContext);
00237     m_ThreadContext->JobDelayExpiration(runtime_inc);
00238 }
00239 
00240 CNetScheduleAdmin::EShutdownLevel
00241 CWorkerNodeJobContext::GetShutdownLevel(void) const
00242 {
00243     _ASSERT(m_ThreadContext);
00244     if (m_ThreadContext->IsJobCanceled())
00245         return CNetScheduleAdmin::eShutdownImmediate;
00246     return CGridGlobals::GetInstance().GetShutdownLevel();
00247 }
00248 
00249 void CWorkerNodeJobContext::Reset(const CNetScheduleJob& job)
00250 {
00251     m_Job = job;
00252     m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
00253 
00254     m_JobCommitted = eNotCommitted;
00255     m_InputBlobSize = 0;
00256     m_ExclusiveJob = m_Job.mask & CNetScheduleAPI::eExclusiveJob;
00257 }
00258 
00259 void CWorkerNodeJobContext::RequestExclusiveMode()
00260 {
00261     if (!m_ExclusiveJob) {
00262         if (!m_WorkerNode.EnterExclusiveMode()) {
00263             NCBI_THROW(CGridWorkerNodeException,
00264                 eExclusiveModeIsAlreadySet, "");
00265         }
00266         m_ExclusiveJob = true;
00267     }
00268 }
00269 
00270 size_t CWorkerNodeJobContext::GetMaxServerOutputSize() const
00271 {
00272     return m_WorkerNode.GetServerOutputSize();
00273 }
00274 
00275 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
00276 {
00277     return m_CleanupEventSource;
00278 }
00279 
00280 
00281 /////////////////////////////////////////////////////////////////////////////
00282 //
00283 ///@internal
00284 static void s_TlsCleanup(CGridThreadContext* p_value, void* /* data */ )
00285 {
00286     delete p_value;
00287 }
00288 /// @internal
00289 static CStaticTls<CGridThreadContext> s_tls;
00290 
00291 ///@internal
00292 class CWorkerNodeRequest : public CStdRequest
00293 {
00294 public:
00295     CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context);
00296 
00297     virtual void Process();
00298 
00299 private:
00300     void x_HandleProcessError(exception* ex = NULL);
00301 
00302     auto_ptr<CWorkerNodeJobContext> m_JobContext;
00303 };
00304 
00305 
00306 CWorkerNodeRequest::CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context)
00307   : m_JobContext(context)
00308 {
00309 }
00310 
00311 
00312 void CGridThreadContext::x_HandleRunJobError(exception* ex /*= NULL*/)
00313 {
00314     string msg = " Error in Job execution";
00315     if (ex) {
00316         msg += ": ";
00317         msg += ex->what();
00318     }
00319     ERR_POST_X(18, m_JobContext->GetJobKey() << msg);
00320     try {
00321         PutFailure(ex ? ex->what() : "Unknown error");
00322     } catch (exception& ex1) {
00323         ERR_POST_X(19, "Failed to report exception: " <<
00324             m_JobContext->GetJobKey() << " " << ex1.what());
00325     } catch (...) {
00326         ERR_POST_X(20, "Failed to report exception while processing " <<
00327             m_JobContext->GetJobKey());
00328     }
00329 }
00330 
// When true, CRequestStateGuard skips printing request start/stop records.
static bool s_ReqEventsDisabled(false);
00332 
00333 class CRequestStateGuard
00334 {
00335 public:
00336     CRequestStateGuard(CWorkerNodeJobContext& job_context);
00337 
00338     void RequestStart();
00339     void RequestStop();
00340 
00341     ~CRequestStateGuard();
00342 
00343 private:
00344     CWorkerNodeJobContext& m_JobContext;
00345 };
00346 
00347 CRequestStateGuard::CRequestStateGuard(CWorkerNodeJobContext& job_context) :
00348     m_JobContext(job_context)
00349 {
00350     CRequestContext& request_context = CDiagContext::GetRequestContext();
00351 
00352     request_context.SetRequestID((int) job_context.GetJobNumber());
00353 
00354     const CNetScheduleJob& job = job_context.GetJob();
00355 
00356     if (!job.client_ip.empty())
00357         request_context.SetClientIP(job.client_ip);
00358 
00359     if (!job.session_id.empty())
00360         request_context.SetSessionID(job.session_id);
00361 
00362     request_context.SetAppState(eDiagAppState_RequestBegin);
00363 
00364     if (!s_ReqEventsDisabled)
00365         GetDiagContext().PrintRequestStart().Print("jid", job.job_id);
00366 }
00367 
00368 inline void CRequestStateGuard::RequestStart()
00369 {
00370     CDiagContext::GetRequestContext().SetAppState(eDiagAppState_Request);
00371 }
00372 
00373 inline void CRequestStateGuard::RequestStop()
00374 {
00375     CDiagContext::GetRequestContext().SetAppState(eDiagAppState_RequestEnd);
00376 }
00377 
00378 CRequestStateGuard::~CRequestStateGuard()
00379 {
00380     CRequestContext& request_context = CDiagContext::GetRequestContext();
00381 
00382     if (!request_context.IsSetRequestStatus())
00383         request_context.SetRequestStatus(
00384             m_JobContext.GetCommitStatus() == CWorkerNodeJobContext::eDone &&
00385                 m_JobContext.GetJob().ret_code == 0 ? 200 : 500);
00386 
00387     switch (request_context.GetAppState()) {
00388     case eDiagAppState_Request:
00389         request_context.SetAppState(eDiagAppState_RequestEnd);
00390         /* FALL THROUGH */
00391 
00392     default:
00393         if (!s_ReqEventsDisabled)
00394             GetDiagContext().PrintRequestStop();
00395         request_context.SetAppState(eDiagAppState_NotSet);
00396         request_context.UnsetSessionID();
00397         request_context.UnsetClientIP();
00398     }
00399 }
00400 
00401 void CGridThreadContext::RunJobs(CWorkerNodeJobContext& job_context)
00402 {
00403     _ASSERT(!m_JobContext);
00404     m_JobContext = &job_context;
00405     job_context.SetThreadContext(this);
00406 
00407     bool more_jobs;
00408 
00409     do {
00410         more_jobs = false;
00411 
00412         CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
00413         if (debug_context) {
00414             debug_context->DumpInput(m_JobContext->GetJobInput(),
00415                 m_JobContext->GetJobNumber());
00416         }
00417         m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00418             IWorkerNodeJobWatcher::eJobStarted);
00419 
00420         CRequestStateGuard request_state_guard(job_context);
00421 
00422         request_state_guard.RequestStart();
00423 
00424         CNetScheduleJob new_job;
00425         try {
00426             CRef<IWorkerNodeJob> job(GetJob());
00427             try {
00428                 job_context.SetJobRetCode(job->Do(job_context));
00429             } catch (CGridWorkerNodeException& ex) {
00430                 if (ex.GetErrCode() !=
00431                     CGridWorkerNodeException::eExclusiveModeIsAlreadySet)
00432                     throw;
00433 
00434                 if (job_context.IsLogRequested()) {
00435                     LOG_POST_X(21, "Job " << job_context.GetJobKey() <<
00436                         " has been returned back to the queue "
00437                         "because it requested exclusive mode but "
00438                         "another job already has the exclusive status.");
00439                 }
00440             }
00441             CloseStreams();
00442             unsigned try_count = 0;
00443             for (;;) {
00444                 try {
00445                     switch (job_context.GetCommitStatus()) {
00446                         case CWorkerNodeJobContext::eDone:
00447                             more_jobs = PutResult(new_job);
00448                             break;
00449 
00450                         case CWorkerNodeJobContext::eFailure:
00451                             PutFailure(kEmptyStr);
00452                             break;
00453 
00454                         case CWorkerNodeJobContext::eNotCommitted:
00455                             if (!TWorkerNode_AllowImplicitJobReturn::
00456                                         GetDefault() &&
00457                                     job_context.GetShutdownLevel() ==
00458                                         CNetScheduleAdmin::eNoShutdown) {
00459                                 PutFailure("Job was not explicitly committed");
00460                                 break;
00461                             }
00462                             /* FALL THROUGH */
00463 
00464                         case CWorkerNodeJobContext::eReturn:
00465                             ReturnJob();
00466                     }
00467                     break;
00468                 } catch (CNetServiceException& ex) {
00469                     if (++try_count >= TServConn_ConnMaxRetries::GetDefault())
00470                         throw;
00471                     ERR_POST_X(22, "Communication Error : " << ex.what());
00472                     SleepMilliSec(s_GetRetryDelay());
00473                 }
00474             }
00475 
00476             if (!CGridGlobals::GetInstance().IsShuttingDown())
00477                 static_cast<CWorkerNodeJobCleanup*>(
00478                     job_context.GetCleanupEventSource())->CallEventHandlers();
00479 
00480             request_state_guard.RequestStop();
00481         }
00482         catch (exception& ex) {
00483             x_HandleRunJobError(&ex);
00484         }
00485         catch (...) {
00486             x_HandleRunJobError();
00487         }
00488 
00489         CloseStreams();
00490 
00491         _ASSERT(m_JobContext);
00492         m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00493             IWorkerNodeJobWatcher::eJobStopped);
00494 
00495         if (more_jobs)
00496             job_context.Reset(new_job);
00497     } while (more_jobs);
00498 
00499     m_JobContext->SetThreadContext(NULL);
00500     m_JobContext = NULL;
00501 }
00502 
00503 void CWorkerNodeRequest::x_HandleProcessError(exception* ex)
00504 {
00505     string msg = " Error during job run";
00506     if (ex) {
00507         msg += ": ";
00508         msg += ex->what();
00509     }
00510     ERR_POST_X(23, msg);
00511     try {
00512         m_JobContext->GetWorkerNode().x_ReturnJob(m_JobContext->GetJobKey());
00513     } catch (exception& ex1) {
00514         ERR_POST_X(24, "Could not return job back to queue: " << ex1.what());
00515     } catch (...) {
00516         ERR_POST_X(25, "Could not return job back to queue.");
00517     }
00518 }
00519 
00520 void CWorkerNodeRequest::Process()
00521 {
00522     try {
00523         CGridThreadContext* thread_context = s_tls.GetValue();
00524         if (!thread_context) {
00525             thread_context =
00526                 new CGridThreadContext(m_JobContext->GetWorkerNode());
00527             s_tls.SetValue(thread_context, s_TlsCleanup);
00528         }
00529         thread_context->RunJobs(*m_JobContext);
00530     }
00531     catch (exception& ex) { x_HandleProcessError(&ex);  }
00532     catch (...) { x_HandleProcessError(); }
00533 }
00534 
00535 /////////////////////////////////////////////////////////////////////////////
00536 //
00537 //     CWorkerNodeJobWatchers
00538 /// @internal
00539 
00540 class CWorkerNodeJobWatchers : public IWorkerNodeJobWatcher
00541 {
00542 public:
00543     virtual ~CWorkerNodeJobWatchers() {};
00544     virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00545     {
00546         NON_CONST_ITERATE(TCont, it, m_Watchers) {
00547             IWorkerNodeJobWatcher* watcher =
00548                 const_cast<IWorkerNodeJobWatcher*>(it->first);
00549             watcher->Notify(job, event);
00550         }
00551     }
00552 
00553     void AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher, EOwnership owner)
00554     {
00555         TCont::const_iterator it = m_Watchers.find(&job_watcher);
00556         if (it == m_Watchers.end()) {
00557             if (owner == eTakeOwnership)
00558                 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>(&job_watcher);
00559             else
00560                 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>();
00561         }
00562     }
00563 
00564 private:
00565     typedef map<IWorkerNodeJobWatcher*, AutoPtr<IWorkerNodeJobWatcher> > TCont;
00566     TCont m_Watchers;
00567 };
00568 
00569 /////////////////////////////////////////////////////////////////////////////
00570 //
00571 //     CGridControlThread
00572 /// @internal
00573 class CGridControlThread : public CThread
00574 {
00575 public:
00576     CGridControlThread(CGridWorkerNode* worker_node,
00577         unsigned int start_port, unsigned int end_port) : m_Control(
00578             new CWorkerNodeControlServer(worker_node, start_port, end_port)) {}
00579 
00580     void Prepare() { m_Control->StartListening(); }
00581 
00582     unsigned short GetControlPort() { return m_Control->GetControlPort(); }
00583     void Stop() { if (m_Control.get()) m_Control->RequestShutdown(); }
00584 
00585 protected:
00586     virtual void* Main(void)
00587     {
00588         m_Control->Run();
00589         return NULL;
00590     }
00591     virtual void OnExit(void)
00592     {
00593         CThread::OnExit();
00594         CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00595         LOG_POST_X(46, "Control Thread has been stopped.");
00596     }
00597 
00598 private:
00599     auto_ptr<CWorkerNodeControlServer> m_Control;
00600 };
00601 
00602 class CGridCleanupThread : public CThread
00603 {
00604 public:
00605     CGridCleanupThread(CGridWorkerNode* worker_node,
00606         IGridWorkerNodeApp_Listener* listener) :
00607             m_WorkerNode(worker_node),
00608             m_Listener(listener),
00609             m_Semaphore(0, 1)
00610     {
00611     }
00612 
00613     bool Wait(unsigned seconds) {return m_Semaphore.TryWait(seconds);}
00614 
00615 protected:
00616     virtual void* Main();
00617 
00618 private:
00619     CGridWorkerNode* m_WorkerNode;
00620     IGridWorkerNodeApp_Listener* m_Listener;
00621     CSemaphore m_Semaphore;
00622 };
00623 
00624 void* CGridCleanupThread::Main()
00625 {
00626     m_WorkerNode->GetCleanupEventSource()->CallEventHandlers();
00627     m_Listener->OnGridWorkerStop();
00628     m_Semaphore.Post();
00629 
00630     return NULL;
00631 }
00632 
00633 /////////////////////////////////////////////////////////////////////////////
00634 //
00635 //     CWorkerNodeIdleThread      --
00636 class CWorkerNodeIdleThread : public CThread
00637 {
00638 public:
00639     CWorkerNodeIdleThread(IWorkerNodeIdleTask*,
00640                           CGridWorkerNode& worker_node,
00641                           unsigned run_delay,
00642                           unsigned int auto_shutdown);
00643 
00644     void RequestShutdown()
00645     {
00646         m_ShutdownFlag = true;
00647         m_Wait1.Post();
00648         m_Wait2.Post();
00649     }
00650     void Schedule()
00651     {
00652         CFastMutexGuard guard(m_Mutext);
00653         m_AutoShutdownSW.Restart();
00654         if (m_StopFlag) {
00655             m_StopFlag = false;
00656             m_Wait1.Post();
00657         }
00658     }
00659     void Suspend()
00660     {
00661         CFastMutexGuard guard(m_Mutext);
00662         m_AutoShutdownSW.Restart();
00663         m_AutoShutdownSW.Stop();
00664         if (!m_StopFlag) {
00665             m_StopFlag = true;
00666             m_Wait2.Post();
00667         }
00668     }
00669 
00670     bool IsShutdownRequested() const { return m_ShutdownFlag; }
00671 
00672 
00673 protected:
00674     virtual void* Main(void);
00675     virtual void OnExit(void);
00676 
00677     CWorkerNodeIdleTaskContext& GetContext();
00678 
00679 private:
00680 
00681     unsigned int x_GetInterval() const
00682     {
00683         CFastMutexGuard guard(m_Mutext);
00684         return  m_AutoShutdown > 0 ?
00685                 min( m_AutoShutdown - (unsigned int)m_AutoShutdownSW.Elapsed(), m_RunInterval )
00686                 : m_RunInterval;
00687     }
00688     bool x_GetStopFlag() const
00689     {
00690         CFastMutexGuard guard(m_Mutext);
00691         return m_StopFlag;
00692     }
00693     bool x_IsAutoShutdownTime() const
00694     {
00695         CFastMutexGuard guard(m_Mutext);
00696         return m_AutoShutdown > 0 ? m_AutoShutdownSW.Elapsed() > m_AutoShutdown : false;
00697     }
00698 
00699     IWorkerNodeIdleTask* m_Task;
00700     CGridWorkerNode& m_WorkerNode;
00701     auto_ptr<CWorkerNodeIdleTaskContext> m_TaskContext;
00702     mutable CSemaphore  m_Wait1;
00703     mutable CSemaphore  m_Wait2;
00704     volatile bool       m_StopFlag;
00705     volatile bool       m_ShutdownFlag;
00706     unsigned int        m_RunInterval;
00707     unsigned int        m_AutoShutdown;
00708     CStopWatch          m_AutoShutdownSW;
00709     mutable CFastMutex  m_Mutext;
00710 
00711     CWorkerNodeIdleThread(const CWorkerNodeIdleThread&);
00712     CWorkerNodeIdleThread& operator=(const CWorkerNodeIdleThread&);
00713 };
00714 
00715 CWorkerNodeIdleThread::CWorkerNodeIdleThread(IWorkerNodeIdleTask* task,
00716                                              CGridWorkerNode& worker_node,
00717                                              unsigned run_delay,
00718                                              unsigned int auto_shutdown)
00719     : m_Task(task), m_WorkerNode(worker_node),
00720       m_Wait1(0,100000), m_Wait2(0,1000000),
00721       m_StopFlag(false), m_ShutdownFlag(false),
00722       m_RunInterval(run_delay),
00723       m_AutoShutdown(auto_shutdown), m_AutoShutdownSW(CStopWatch::eStart)
00724 {
00725 }
00726 void* CWorkerNodeIdleThread::Main()
00727 {
00728     while (!m_ShutdownFlag) {
00729         if ( x_IsAutoShutdownTime() ) {
00730             LOG_POST_X(47, "There are no more jobs to be done. Exiting.");
00731             CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00732             break;
00733         }
00734         unsigned int interval = m_AutoShutdown > 0 ? min (m_RunInterval,m_AutoShutdown) : m_RunInterval;
00735         if (m_Wait1.TryWait(interval, 0)) {
00736             if (m_ShutdownFlag)
00737                 continue;
00738             interval = x_GetInterval();
00739             if (m_Wait2.TryWait(interval, 0)) {
00740                 continue;
00741             }
00742         }
00743         if (m_Task && !x_GetStopFlag()) {
00744             try {
00745                 do {
00746                     if ( x_IsAutoShutdownTime() ) {
00747                         LOG_POST_X(48,
00748                             "There are no more jobs to be done. Exiting.");
00749                         CGridGlobals::GetInstance().RequestShutdown(
00750                             CNetScheduleAdmin::eShutdownImmediate);
00751                         m_ShutdownFlag = true;
00752                         break;
00753                     }
00754                     GetContext().Reset();
00755                     m_Task->Run(GetContext());
00756                 } while( GetContext().NeedRunAgain() && !m_ShutdownFlag);
00757             } NCBI_CATCH_ALL_X(58,
00758                 "CWorkerNodeIdleThread::Main: Idle Task failed");
00759         }
00760     }
00761     return 0;
00762 }
00763 
00764 void CWorkerNodeIdleThread::OnExit(void)
00765 {
00766     LOG_POST_X(49, "Idle Thread has been stopped.");
00767 }
00768 
00769 CWorkerNodeIdleTaskContext& CWorkerNodeIdleThread::GetContext()
00770 {
00771     if (!m_TaskContext.get())
00772         m_TaskContext.reset(new CWorkerNodeIdleTaskContext(*this));
00773     return *m_TaskContext;
00774 }
00775 
00776 /////////////////////////////////////////////////////////////////////////////
00777 //
00778 //     CWorkerNodeIdleTaskContext      --
00779 CWorkerNodeIdleTaskContext::
00780 CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread& thread)
00781     : m_Thread(thread), m_RunAgain(false)
00782 {
00783 }
00784 bool CWorkerNodeIdleTaskContext::IsShutdownRequested() const
00785 {
00786     return m_Thread.IsShutdownRequested();
00787 }
00788 void CWorkerNodeIdleTaskContext::Reset()
00789 {
00790     m_RunAgain = false;
00791 }
00792 
00793 void CWorkerNodeIdleTaskContext::RequestShutdown()
00794 {
00795     m_Thread.RequestShutdown();
00796     CGridGlobals::GetInstance().
00797         RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00798 }
00799 
00800 /////////////////////////////////////////////////////////////////////////////
00801 //
00802 //    CIdleWatcher
00803 /// @internal
00804 class CIdleWatcher : public IWorkerNodeJobWatcher
00805 {
00806 public:
00807     CIdleWatcher(CWorkerNodeIdleThread& idle)
00808         : m_Idle(idle), m_RunningJobs(0) {}
00809     virtual ~CIdleWatcher() {};
00810     virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00811     {
00812         if (event == eJobStarted) {
00813             ++m_RunningJobs;
00814             m_Idle.Suspend();
00815         } else if (event == eJobStopped) {
00816             --m_RunningJobs;
00817             if (m_RunningJobs == 0)
00818                 m_Idle.Schedule();
00819         }
00820     }
00821 
00822 private:
00823     CWorkerNodeIdleThread& m_Idle;
00824     volatile int m_RunningJobs;
00825 };
00826 
00827 
00828 /////////////////////////////////////////////////////////////////////////////
00829 //
00830 
00831 CGridWorkerNode::CGridWorkerNode(
00832                                CNcbiApplication&          app,
00833                                IWorkerNodeJobFactory*     job_factory,
00834                                IBlobStorageFactory*       storage_factory,
00835                                INetScheduleClientFactory* client_factory) :
00836     m_JobFactory(job_factory),
00837     m_StorageFactory(storage_factory),
00838     m_ClientFactory(client_factory),
00839     m_MaxThreads(1),
00840     m_NSTimeout(30),
00841     m_CheckStatusPeriod(2),
00842     m_ExclusiveJobSemaphore(1, 1),
00843     m_IsProcessingExclusiveJob(false),
00844     m_UseEmbeddedStorage(false),
00845     m_CleanupEventSource(new CWorkerNodeCleanup()),
00846     m_Listener(new CGridWorkerNodeApp_Listener()),
00847     m_App(app),
00848     m_SingleThreadForced(false)
00849 {
00850     if (!m_JobFactory.get())
00851         NCBI_THROW(CGridWorkerAppException,
00852                  eJobFactoryIsNotSet, "The JobFactory is not set.");
00853 }
00854 
00855 CGridWorkerNode::~CGridWorkerNode()
00856 {
00857 }
00858 
00859 void CGridWorkerNode::Init(bool default_merge_lines_value)
00860 {
00861     IRWRegistry& reg = m_App.GetConfig();
00862 
00863     if (reg.GetBool("log", "merge_lines", default_merge_lines_value)) {
00864         SetDiagPostFlag(eDPF_PreMergeLines);
00865         SetDiagPostFlag(eDPF_MergeLines);
00866     }
00867 
00868     reg.Set(kNetScheduleAPIDriverName, "discover_low_priority_servers", "true");
00869 
00870     if (!m_StorageFactory.get())
00871         m_StorageFactory.reset(new CBlobStorageFactory(reg));
00872     if (!m_ClientFactory.get())
00873         m_ClientFactory.reset(new CNetScheduleClientFactory(reg));
00874 }
00875 
// Registry section holding the worker node's server settings.
const char* kServerSec("server");
00877 
00878 int CGridWorkerNode::Run()
00879 {
00880     LOG_POST_X(50, GetJobFactory().GetJobVersion() << WN_BUILD_DATE);
00881 
00882     const IRegistry& reg = m_App.GetConfig();
00883     CConfig conf(reg);
00884 
00885     unsigned init_threads = 1;
00886 
00887     if (!m_SingleThreadForced) {
00888         string max_threads = reg.GetString(kServerSec, "max_threads", "auto");
00889         if (NStr::CompareNocase(max_threads, "auto") == 0)
00890             m_MaxThreads = GetCpuCount();
00891         else {
00892             try {
00893                 m_MaxThreads = NStr::StringToUInt(max_threads);
00894             } catch (...) {
00895                 m_MaxThreads = GetCpuCount();
00896                 ERR_POST_X(51, "Could not convert [" << kServerSec <<
00897                     "] max_threads parameter to number.\n"
00898                     "Using \'auto\' option (" << m_MaxThreads << ").");
00899             }
00900         }
00901         init_threads =
00902             reg.GetInt(kServerSec, "init_threads", 1, 0, IRegistry::eReturn);
00903     }
00904     if (init_threads > m_MaxThreads)
00905         init_threads = m_MaxThreads;
00906 
00907     m_NSTimeout = reg.GetInt(kServerSec,
00908         "job_wait_timeout", 30, 0, IRegistry::eReturn);
00909 
00910     unsigned thread_pool_timeout = reg.GetInt(kServerSec,
00911         "thread_pool_timeout", 30, 0, IRegistry::eReturn);
00912 
00913     const CArgs& args = m_App.GetArgs();
00914 
00915     unsigned int start_port, end_port;
00916 
00917     string sport1, sport2;
00918     NStr::SplitInTwo(args["control_port"] ? args["control_port"].AsString() :
00919         reg.GetString(kServerSec, "control_port", "9300"), "-", sport1, sport2);
00920     start_port = NStr::StringToUInt(sport1);
00921     end_port = sport2.empty() ? start_port : NStr::StringToUInt(sport2);
00922 
00923     bool log_requested = reg.GetBool(kServerSec,
00924         "log", false, 0, IRegistry::eReturn);
00925 
00926     unsigned int idle_run_delay =
00927         reg.GetInt(kServerSec,"idle_run_delay",30,0,IRegistry::eReturn);
00928 
00929     unsigned int auto_shutdown =
00930         reg.GetInt(kServerSec,"auto_shutdown_if_idle",0,0,IRegistry::eReturn);
00931 
00932     unsigned int infinite_loop_time =
00933         reg.HasEntry(kServerSec, "infinite_loop_time") ?
00934         reg.GetInt(kServerSec, "infinite_loop_time", 0, 0, IRegistry::eReturn) :
00935         reg.GetInt(kServerSec, "infinit_loop_time", 0, 0, IRegistry::eReturn);
00936 
00937     bool reuse_job_object =
00938         reg.GetBool(kServerSec, "reuse_job_object", false, 0,
00939                     CNcbiRegistry::eReturn);
00940 
00941     unsigned int max_total_jobs =
00942         reg.GetInt(kServerSec,"max_total_jobs",0,0,IRegistry::eReturn);
00943 
00944     unsigned int max_failed_jobs =
00945         reg.GetInt(kServerSec,"max_failed_jobs",0,0,IRegistry::eReturn);
00946 
00947     bool is_daemon =
00948         reg.GetBool(kServerSec, "daemon", false, 0, CNcbiRegistry::eReturn);
00949 
00950     vector<string> vhosts;
00951 
00952     NStr::Tokenize(reg.GetString(kServerSec,
00953         "master_nodes", kEmptyStr), " ;,", vhosts);
00954 
00955     ITERATE(vector<string>, it, vhosts) {
00956         string host, port;
00957         NStr::SplitInTwo(NStr::TruncateSpaces(*it), ":", host, port);
00958         if (host.empty() || port.empty())
00959             continue;
00960         try {
00961             m_Masters.insert(SServerAddress(NStr::ToLower(host),
00962                 (unsigned short) NStr::StringToUInt(port)));
00963         } catch(...) {}
00964     }
00965 
00966     vhosts.clear();
00967 
00968     NStr::Tokenize(reg.GetString(kServerSec,
00969         "admin_hosts", kEmptyStr), " ;,", vhosts);
00970 
00971     ITERATE(vector<string>, it, vhosts) {
00972         unsigned int ha = CSocketAPI::gethostbyname(*it);
00973         if (ha != 0)
00974             m_AdminHosts.insert(ha);
00975     }
00976 
00977     m_CheckStatusPeriod =
00978         reg.GetInt(kServerSec, "check_status_period", 2, 0, IRegistry::eReturn);
00979 
00980     if (reg.HasEntry(kServerSec,"wait_server_timeout")) {
00981         ERR_POST_X(52, "[" << kServerSec <<
00982             "] \"wait_server_timeout\" is not used anymore.\n"
00983             "Use [" << kNetScheduleAPIDriverName <<
00984             "] \"communication_timeout\" parameter instead.");
00985     }
00986 
00987     if (reg.HasEntry(kNetScheduleAPIDriverName, "use_embedded_storage"))
00988         m_UseEmbeddedStorage = reg.
00989             GetBool(kNetScheduleAPIDriverName, "use_embedded_storage", false, 0,
00990                     CNcbiRegistry::eReturn);
00991     else
00992         m_UseEmbeddedStorage = reg.
00993             GetBool(kNetScheduleAPIDriverName, "use_embedded_input", false, 0,
00994                     CNcbiRegistry::eReturn);
00995 
00996     CGridDebugContext::eMode debug_mode = CGridDebugContext::eGDC_NoDebug;
00997     string dbg_mode = reg.GetString("gw_debug", "mode", kEmptyStr);
00998     if (NStr::CompareNocase(dbg_mode, "gather")==0) {
00999         debug_mode = CGridDebugContext::eGDC_Gather;
01000     } else if (NStr::CompareNocase(dbg_mode, "execute")==0) {
01001         debug_mode = CGridDebugContext::eGDC_Execute;
01002     }
01003     if (debug_mode != CGridDebugContext::eGDC_NoDebug) {
01004         CGridDebugContext& debug_context =
01005             CGridDebugContext::Create(debug_mode, *m_StorageFactory);
01006         string run_name =
01007             reg.GetString("gw_debug", "run_name", m_App.GetProgramDisplayName());
01008         debug_context.SetRunName(run_name);
01009         if (debug_mode == CGridDebugContext::eGDC_Gather) {
01010             max_total_jobs =
01011                 reg.GetInt("gw_debug","gather_nrequests",1,0,IRegistry::eReturn);
01012         } else if (debug_mode == CGridDebugContext::eGDC_Execute) {
01013             string files =
01014                 reg.GetString("gw_debug", "execute_requests", kEmptyStr);
01015             max_total_jobs = 0;
01016             debug_context.SetExecuteList(files);
01017         }
01018         is_daemon = false;
01019     }
01020 
01021 #if defined(NCBI_OS_UNIX)
01022     if (is_daemon) {
01023         LOG_POST_X(53, "Entering UNIX daemon mode...");
01024         bool daemon = CProcess::Daemonize("/dev/null",
01025                                           CProcess::fDontChroot |
01026                                           CProcess::fKeepStdin  |
01027                                           CProcess::fKeepStdout);
01028         if (!daemon) {
01029             return 0;
01030         }
01031     }
01032 #endif
01033 
01034     AttachJobWatcher(CGridGlobals::GetInstance().GetJobsWatcher());
01035 
01036     CRef<CGridControlThread> control_thread(
01037         new CGridControlThread(this, start_port, end_port));
01038 
01039     control_thread->Prepare();
01040 
01041     m_RebalanceStrategy = CreateSimpleRebalanceStrategy(conf, "server");
01042 
01043     m_SharedNSClient = m_ClientFactory->CreateInstance();
01044     m_NSExecuter =
01045         m_SharedNSClient.GetExecuter(control_thread->GetControlPort());
01046 
01047     m_SharedNSClient.SetProgramVersion(m_JobFactory->GetJobVersion());
01048 
01049     CGridGlobals::GetInstance().SetReuseJobObject(reuse_job_object);
01050     CGridGlobals::GetInstance().GetJobsWatcher().SetMaxJobsAllowed(max_total_jobs);
01051     CGridGlobals::GetInstance().GetJobsWatcher().SetMaxFailuresAllowed(max_failed_jobs);
01052     CGridGlobals::GetInstance().GetJobsWatcher().SetInfinitLoopTime(infinite_loop_time);
01053     CGridGlobals::GetInstance().SetWorker(*this);
01054 
01055     IWorkerNodeIdleTask* task = NULL;
01056     if (idle_run_delay > 0)
01057         task = GetJobFactory().GetIdleTask();
01058     if (task || auto_shutdown > 0 ) {
01059         m_IdleThread.Reset(new CWorkerNodeIdleThread(task, *this,
01060                                                      task ? idle_run_delay : auto_shutdown,
01061                                                      auto_shutdown));
01062         m_IdleThread->Run();
01063         AttachJobWatcher(*(new CIdleWatcher(*m_IdleThread)), eTakeOwnership);
01064     }
01065 
01066     control_thread->Run();
01067 
01068     LOG_POST_X(54, "\n=================== NEW RUN : " <<
01069         CGridGlobals::GetInstance().GetStartTime().AsString() <<
01070             " ===================\n" <<
01071         GetJobFactory().GetJobVersion() << WN_BUILD_DATE << " is started.\n"
01072         "Waiting for control commands on " << CSocketAPI::gethostname() <<
01073             ":" << control_thread->GetControlPort() << "\n"
01074         "Queue name: " << GetQueueName() << "\n"
01075         "Maximum job threads: " << m_MaxThreads << "\n");
01076 
01077     try {
01078         GetNSExecuter().RegisterClient();
01079     } catch (CNetServiceException& ex) {
01080         // If the server does not understand this new command, ignore the error.
01081         if (ex.GetErrCode() != CNetServiceException::eCommunicationError ||
01082                 NStr::Find(ex.what(), "Server error:Unknown request") == NPOS)
01083             throw;
01084     }
01085 
01086     m_Listener->OnGridWorkerStart();
01087 
01088     auto_ptr<CGridThreadContext> single_thread_context;
01089     auto_ptr<CStdPoolOfThreads> thread_pool;
01090 
01091     _ASSERT(m_MaxThreads > 0);
01092 
01093     if (m_MaxThreads <= 1)
01094         single_thread_context.reset(new CGridThreadContext(*this));
01095     else {
01096         thread_pool.reset(new CStdPoolOfThreads(m_MaxThreads, 0));
01097         try {
01098             thread_pool->Spawn(init_threads);
01099         }
01100         catch (exception& ex) {
01101             ERR_POST_X(26, ex.what());
01102             CGridGlobals::GetInstance().RequestShutdown(
01103                 CNetScheduleAdmin::eShutdownImmediate);
01104             return 2;
01105         }
01106         catch (...) {
01107             ERR_POST_X(27, "Unknown error");
01108             CGridGlobals::GetInstance().RequestShutdown(
01109                 CNetScheduleAdmin::eShutdownImmediate);
01110             return 3;
01111         }
01112     }
01113 
01114     CNetScheduleJob job;
01115 
01116     unsigned try_count = 0;
01117     while (!CGridGlobals::GetInstance().IsShuttingDown()) {
01118         try {
01119 
01120             if (m_MaxThreads > 1) {
01121                 try {
01122                     thread_pool->WaitForRoom(thread_pool_timeout);
01123                 } catch (CBlockingQueueException&) {
01124                     // threaded pool is busy
01125                     continue;
01126                 }
01127             }
01128 
01129             if (x_GetNextJob(job)) {
01130                 auto_ptr<CWorkerNodeJobContext>
01131                     job_context(new CWorkerNodeJobContext(*this,
01132                                                           job,
01133                                                           log_requested));
01134                 job_context->Reset(job);
01135                 if (m_MaxThreads > 1 ) {
01136                     CRef<CStdRequest> job_req(
01137                                     new CWorkerNodeRequest(job_context));
01138                     try {
01139                         thread_pool->AcceptRequest(job_req);
01140                     } catch (CBlockingQueueException& ex) {
01141                         ERR_POST_X(28, ex.what());
01142                         // that must not happen after CBlockingQueue is fixed
01143                         _ASSERT(0);
01144                         x_ReturnJob(job.job_id);
01145                     }
01146                 } else {
01147                     try {
01148                         single_thread_context->RunJobs(*job_context);
01149                     } catch (...) {
01150                         x_ReturnJob(job_context->GetJobKey());
01151                         throw;
01152                     }
01153                 }
01154             }
01155         } catch (CNetServiceException& ex) {
01156             ERR_POST_X(40, ex.what());
01157             if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
01158                 CGridGlobals::GetInstance().RequestShutdown(
01159                     CNetScheduleAdmin::eShutdownImmediate);
01160             } else {
01161                 SleepMilliSec(s_GetRetryDelay());
01162                 continue;
01163             }
01164         } catch (exception& ex) {
01165             if (TWorkerNode_StopOnJobErrors::GetDefault()) {
01166                 ERR_POST_X(29, ex.what());
01167                 CGridGlobals::GetInstance().RequestShutdown(
01168                     CNetScheduleAdmin::eShutdownImmediate);
01169             }
01170         } catch (...) {
01171             if (TWorkerNode_StopOnJobErrors::GetDefault()) {
01172                 ERR_POST_X(30, "Unknown error");
01173                 CGridGlobals::GetInstance().RequestShutdown(
01174                     CNetScheduleAdmin::eShutdownImmediate);
01175             }
01176         }
01177         try_count = 0;
01178     }
01179     LOG_POST_X(31, "Shutting down...");
01180     if (reg.GetBool(kServerSec,
01181             "force_exit", false, 0, CNcbiRegistry::eReturn)) {
01182         ERR_POST_X(45, "Force exit");
01183     } else if (m_MaxThreads > 1) {
01184         try {
01185             LOG_POST_X(32, "Stopping worker threads...");
01186             thread_pool->KillAllThreads(true);
01187             thread_pool.reset(0);
01188         } catch (exception& ex) {
01189             ERR_POST_X(33, "Could not stop worker threads: " << ex.what());
01190         } catch (...) {
01191             ERR_POST_X(34, "Could not stop worker threads: Unknown error");
01192         }
01193     }
01194     try {
01195         GetNSExecuter().UnRegisterClient();
01196     } catch (CNetServiceException& ex) {
01197         // if server does not understand this new command just ignore the error
01198         if (ex.GetErrCode() != CNetServiceException::eCommunicationError
01199             || NStr::Find(ex.what(),"Server error:Unknown request") == NPOS) {
01200             ERR_POST_X(35, "Could unregister from NetScehdule services: "
01201                        << ex.what());
01202         }
01203     } catch(exception& ex) {
01204         ERR_POST_X(36, "Could unregister from NetScehdule services: "
01205                    << ex.what());
01206     } catch(...) {
01207         ERR_POST_X(37, "Could unregister from NetScehdule services: "
01208             "Unknown error." );
01209     }
01210 
01211     LOG_POST_X(38, "Worker Node has been stopped.");
01212 
01213 
01214     CRef<CGridCleanupThread> cleanup_thread(
01215         new CGridCleanupThread(this, m_Listener.get()));
01216 
01217     cleanup_thread->Run();
01218 
01219     LOG_POST_X(55, "Stopping Control thread...");
01220     control_thread->Stop();
01221     control_thread->Join();
01222 
01223     CNcbiOstrstream os;
01224     CGridGlobals::GetInstance().GetJobsWatcher().Print(os);
01225     LOG_POST_X(56, string(CNcbiOstrstreamToString(os)));
01226 
01227     if (m_IdleThread) {
01228         if (!m_IdleThread->IsShutdownRequested()) {
01229             LOG_POST_X(57, "Stopping Idle thread...");
01230             m_IdleThread->RequestShutdown();
01231         }
01232         m_IdleThread->Join();
01233     }
01234 
01235     if (cleanup_thread->Wait(thread_pool_timeout)) {
01236         cleanup_thread->Join();
01237         LOG_POST_X(58, "Cleanup thread finished");
01238     } else {
01239         ERR_POST_X(59, "Clean-up thread timed out");
01240     }
01241 
01242     return 0;
01243 }
01244 
01245 void CGridWorkerNode::RequestShutdown()
01246 {
01247     CGridGlobals::GetInstance().
01248         RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
01249 }
01250 
01251 
01252 void CGridWorkerNode::AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher,
01253                                            EOwnership owner)
01254 {
01255     if (!m_JobWatchers.get())
01256         m_JobWatchers.reset(new CWorkerNodeJobWatchers);
01257     m_JobWatchers->AttachJobWatcher(job_watcher, owner);
01258 };
01259 
01260 void CGridWorkerNode::SetListener(IGridWorkerNodeApp_Listener* listener)
01261 {
01262     m_Listener.reset(listener ? listener : new CGridWorkerNodeApp_Listener());
01263 }
01264 
01265 
01266 void CGridWorkerNode::x_NotifyJobWatcher(const CWorkerNodeJobContext& job,
01267     IWorkerNodeJobWatcher::EEvent event)
01268 {
01269     if (m_JobWatchers.get() != NULL) {
01270         CFastMutexGuard guard(m_JobWatcherMutex);
01271         m_JobWatchers->Notify(job, event);
01272     }
01273 }
01274 
01275 bool CGridWorkerNode::x_GetNextJob(CNetScheduleJob& job)
01276 {
01277     bool job_exists = false;
01278 
01279     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01280 
01281     if (debug_context &&
01282         debug_context->GetDebugMode() == CGridDebugContext::eGDC_Execute) {
01283         job_exists = debug_context->GetNextJob(job.job_id, job.input);
01284         if (!job_exists) {
01285             CGridGlobals::GetInstance().
01286                 RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
01287         }
01288     } else {
01289         if (!x_AreMastersBusy()) {
01290             SleepSec(m_NSTimeout);
01291             return false;
01292         }
01293 
01294         if (!WaitForExclusiveJobToFinish())
01295             return false;
01296 
01297         job_exists = GetNSExecuter().WaitJob(job, m_NSTimeout);
01298 
01299         if (job_exists && job.mask & CNetScheduleAPI::eExclusiveJob)
01300             job_exists = EnterExclusiveModeOrReturnJob(job);
01301     }
01302     if (job_exists && CGridGlobals::GetInstance().IsShuttingDown()) {
01303         x_ReturnJob(job.job_id);
01304         return false;
01305     }
01306     return job_exists;
01307 }
01308 
01309 void CGridWorkerNode::x_ReturnJob(const string& job_key)
01310 {
01311     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01312     if (!debug_context || debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01313          GetNSExecuter().ReturnJob(job_key);
01314     }
01315 }
01316 
01317 void CGridWorkerNode::x_FailJob(const string& job_key, const string& reason)
01318 {
01319     CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01320     if (!debug_context ||
01321             debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01322          CNetScheduleJob job(job_key);
01323          job.error_msg = reason;
01324          GetNSExecuter().PutFailure(job);
01325     }
01326 }
01327 
01328 size_t CGridWorkerNode::GetServerOutputSize() const
01329 {
01330     return IsEmeddedStorageUsed() ?
01331         GetNSClient().GetServerParams().max_output_size : 0;
01332 }
01333 
01334 bool CGridWorkerNode::IsHostInAdminHostsList(const string& host) const
01335 {
01336     if (m_AdminHosts.empty())
01337         return true;
01338     unsigned int ha = CSocketAPI::gethostbyname(host);
01339     if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01340         return true;
01341     unsigned int ha_lh = CSocketAPI::gethostbyname("");
01342     if (ha == ha_lh) {
01343         ha = CSocketAPI::gethostbyname("localhost");
01344         if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01345             return true;
01346     }
01347     return false;
01348 }
01349 
01350 bool CGridWorkerNode::x_AreMastersBusy() const
01351 {
01352     ITERATE(set<SServerAddress>, it, m_Masters) {
01353         STimeout tmo = {0, 500};
01354         CSocket socket(it->host, it->port, &tmo, eOff);
01355         if (socket.GetStatus(eIO_Open) != eIO_Success)
01356             continue;
01357 
01358         CNcbiOstrstream os;
01359         os << GetJobVersion() << endl;
01360         os << GetNSClient().GetQueueName()  <<";"
01361            << GetNSClient().GetService().GetServiceName();
01362         os << endl;
01363         os << "GETLOAD" << ends;
01364         if (socket.Write(os.str(), os.pcount()) != eIO_Success) {
01365             os.freeze(false);
01366             continue;
01367         }
01368         os.freeze(false);
01369         string reply;
01370         if (socket.ReadLine(reply) != eIO_Success)
01371             continue;
01372         if (NStr::StartsWith(reply, "ERR:")) {
01373             string msg;
01374             NStr::Replace(reply, "ERR:", "", msg);
01375             ERR_POST_X(43, "Worker Node at " << it->AsString() <<
01376                 " returned error: " << msg);
01377         } else if (NStr::StartsWith(reply, "OK:")) {
01378             string msg;
01379             NStr::Replace(reply, "OK:", "", msg);
01380             try {
01381                 int load = NStr::StringToInt(msg);
01382                 if (load > 0)
01383                     return false;
01384             } catch (...) {}
01385         } else {
01386             ERR_POST_X(44, "Worker Node at " << it->AsString() <<
01387                 " returned unknown reply: " << reply);
01388         }
01389     }
01390     return true;
01391 }
01392 
01393 bool CGridWorkerNode::IsTimeToRebalance()
01394 {
01395     if (!m_SharedNSClient.GetService().IsLoadBalanced() ||
01396             TWorkerNode_DoNotRebalance::GetDefault())
01397         return false;
01398 
01399     m_RebalanceStrategy->OnResourceRequested();
01400     return m_RebalanceStrategy->NeedRebalance();
01401 }
01402 
01403 bool CGridWorkerNode::EnterExclusiveMode()
01404 {
01405     if (m_ExclusiveJobSemaphore.TryWait()) {
01406         _ASSERT(!m_IsProcessingExclusiveJob);
01407 
01408         m_IsProcessingExclusiveJob = true;
01409         return true;
01410     }
01411     return false;
01412 }
01413 
01414 bool CGridWorkerNode::EnterExclusiveModeOrReturnJob(
01415     CNetScheduleJob& exclusive_job)
01416 {
01417     _ASSERT(exclusive_job.mask & CNetScheduleAPI::eExclusiveJob);
01418 
01419     if (!EnterExclusiveMode()) {
01420         x_ReturnJob(exclusive_job.job_id);
01421         return false;
01422     }
01423     return true;
01424 }
01425 
// Leave the exclusive mode entered by EnterExclusiveMode(): clear the
// "processing exclusive job" flag and release m_ExclusiveJobSemaphore.
// Must only be called while in exclusive mode (asserted).
void CGridWorkerNode::LeaveExclusiveMode()
{
    _ASSERT(m_IsProcessingExclusiveJob);

    // Clear the flag before Post(): the next caller of EnterExclusiveMode()
    // that acquires the semaphore asserts that the flag is false.
    m_IsProcessingExclusiveJob = false;
    m_ExclusiveJobSemaphore.Post();
}
01433 
01434 bool CGridWorkerNode::WaitForExclusiveJobToFinish()
01435 {
01436     if (m_ExclusiveJobSemaphore.TryWait(m_NSTimeout)) {
01437         m_ExclusiveJobSemaphore.Post();
01438         return true;
01439     }
01440     return false;
01441 }
01442 
// Set the file-level flag s_ReqEventsDisabled.
// NOTE(review): the code reading this flag is outside this chunk; presumably
// it suppresses the default per-request event logging -- confirm.
void CGridWorkerNode::DisableDefaultRequestEventLogging()
{
    s_ReqEventsDisabled = true;
}
01447 
// Accessor for the clean-up event source created in the constructor
// (a CWorkerNodeCleanup instance).
// NOTE(review): the object stays owned by this worker node; callers
// presumably must not delete the returned pointer -- confirm.
IWorkerNodeCleanupEventSource* CGridWorkerNode::GetCleanupEventSource()
{
    return m_CleanupEventSource;
}
01452 
01453 END_NCBI_SCOPE
01454 
01455 

Generated on Wed Dec 9 04:14:05 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Wed Dec 09 08:17:56 2009 by modify_doxy.py rev. 173732