00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <ncbi_pch.hpp>
00033
00034 #include "grid_thread_context.hpp"
00035 #include "netservice_params.hpp"
00036 #include "balancing.hpp"
00037
00038 #include <connect/services/grid_globals.hpp>
00039 #include <connect/services/error_codes.hpp>
00040 #include <connect/services/grid_debug_context.hpp>
00041 #include <connect/services/netschedule_api_expt.hpp>
00042 #include <connect/services/grid_worker_app.hpp>
00043 #include <connect/services/grid_control_thread.hpp>
00044
00045 #include <connect/ncbi_socket.hpp>
00046
00047 #include <util/thread_pool.hpp>
00048
00049 #include <corelib/ncbireg.hpp>
00050 #include <corelib/ncbithr.hpp>
00051 #include <corelib/ncbitime.hpp>
00052 #include <corelib/ncbi_config.hpp>
00053 #include <corelib/ncbi_process.hpp>
00054 #include <corelib/ncbiexpt.hpp>
00055 #include <corelib/ncbi_system.hpp>
00056 #include <corelib/ncbi_safe_static.hpp>
00057 #include <corelib/request_ctx.hpp>
00058 #include <corelib/blob_storage.hpp>
00059
00060 #ifdef NCBI_OS_UNIX
00061 #include <unistd.h>
00062 #endif
00063
00064
00065 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
00066
00067
00068 BEGIN_NCBI_SCOPE
00069
00070
00071
00072 IWorkerNodeJobWatcher::~IWorkerNodeJobWatcher()
00073 {
00074 }
00075
00076
00077
00078
00079
00080 namespace {
00081
00082 class CWorkerNodeCleanup : public IWorkerNodeCleanupEventSource
00083 {
00084 public:
00085 typedef set<IWorkerNodeCleanupEventListener*> TListeners;
00086
00087 virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00088 virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00089
00090 virtual void CallEventHandlers();
00091
00092 void RemoveListeners(const TListeners& listeners);
00093
00094 protected:
00095 TListeners m_Listeners;
00096 CFastMutex m_ListenersLock;
00097 };
00098
00099 void CWorkerNodeCleanup::AddListener(IWorkerNodeCleanupEventListener* listener)
00100 {
00101 CFastMutexGuard g(m_ListenersLock);
00102 m_Listeners.insert(listener);
00103 }
00104
00105 void CWorkerNodeCleanup::RemoveListener(
00106 IWorkerNodeCleanupEventListener* listener)
00107 {
00108 CFastMutexGuard g(m_ListenersLock);
00109 m_Listeners.erase(listener);
00110 }
00111
00112 void CWorkerNodeCleanup::CallEventHandlers()
00113 {
00114 TListeners listeners;
00115 {
00116 CFastMutexGuard g(m_ListenersLock);
00117 listeners.swap(m_Listeners);
00118 }
00119
00120 ITERATE(TListeners, it, listeners) {
00121 try {
00122 (*it)->HandleEvent(
00123 IWorkerNodeCleanupEventListener::eRegularCleanup);
00124 delete *it;
00125 }
00126 NCBI_CATCH_ALL_X(39, "Job clean-up error");
00127 }
00128 }
00129
00130 void CWorkerNodeCleanup::RemoveListeners(
00131 const CWorkerNodeCleanup::TListeners& listeners)
00132 {
00133 CFastMutexGuard g(m_ListenersLock);
00134 ITERATE(TListeners, it, listeners) {
00135 m_Listeners.erase(*it);
00136 }
00137 }
00138
00139 class CWorkerNodeJobCleanup : public CWorkerNodeCleanup
00140 {
00141 public:
00142 CWorkerNodeJobCleanup(CWorkerNodeCleanup* worker_node_cleanup);
00143
00144 virtual void AddListener(IWorkerNodeCleanupEventListener* listener);
00145 virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener);
00146
00147 virtual void CallEventHandlers();
00148
00149 private:
00150 CWorkerNodeCleanup* m_WorkerNodeCleanup;
00151 };
00152
00153 CWorkerNodeJobCleanup::CWorkerNodeJobCleanup(
00154 CWorkerNodeCleanup* worker_node_cleanup) :
00155 m_WorkerNodeCleanup(worker_node_cleanup)
00156 {
00157 }
00158
00159 void CWorkerNodeJobCleanup::AddListener(
00160 IWorkerNodeCleanupEventListener* listener)
00161 {
00162 CWorkerNodeCleanup::AddListener(listener);
00163 m_WorkerNodeCleanup->AddListener(listener);
00164 }
00165
00166 void CWorkerNodeJobCleanup::RemoveListener(
00167 IWorkerNodeCleanupEventListener* listener)
00168 {
00169 CWorkerNodeCleanup::RemoveListener(listener);
00170 m_WorkerNodeCleanup->RemoveListener(listener);
00171 }
00172
00173 void CWorkerNodeJobCleanup::CallEventHandlers()
00174 {
00175 {
00176 CFastMutexGuard g(m_ListenersLock);
00177 m_WorkerNodeCleanup->RemoveListeners(m_Listeners);
00178 }
00179 CWorkerNodeCleanup::CallEventHandlers();
00180 }
00181
00182 }
00183
00184 CWorkerNodeJobContext::CWorkerNodeJobContext(CGridWorkerNode& worker_node,
00185 const CNetScheduleJob& job,
00186 bool log_requested)
00187 : m_WorkerNode(worker_node),
00188 m_LogRequested(log_requested),
00189 m_ThreadContext(NULL),
00190 m_CleanupEventSource(new CWorkerNodeJobCleanup(
00191 static_cast<CWorkerNodeCleanup*>(worker_node.GetCleanupEventSource())))
00192 {
00193 Reset(job);
00194 }
00195
00196 const string& CWorkerNodeJobContext::GetQueueName() const
00197 {
00198 return m_WorkerNode.GetQueueName();
00199 }
00200 const string& CWorkerNodeJobContext::GetClientName() const
00201 {
00202 return m_WorkerNode.GetClientName();
00203 }
00204
00205 CNcbiIstream& CWorkerNodeJobContext::GetIStream(IBlobStorage::ELockMode mode)
00206 {
00207 _ASSERT(m_ThreadContext);
00208 return m_ThreadContext->GetIStream(mode);
00209 }
00210 CNcbiOstream& CWorkerNodeJobContext::GetOStream()
00211 {
00212 _ASSERT(m_ThreadContext);
00213 return m_ThreadContext->GetOStream();
00214 }
00215
00216 void CWorkerNodeJobContext::PutProgressMessage(const string& msg,
00217 bool send_immediately)
00218 {
00219 _ASSERT(m_ThreadContext);
00220 m_ThreadContext->PutProgressMessage(msg, send_immediately);
00221 }
00222
00223 void CWorkerNodeJobContext::SetThreadContext(CGridThreadContext* thr_context)
00224 {
00225 m_ThreadContext = thr_context;
00226 }
00227
00228 void CWorkerNodeJobContext::SetJobRunTimeout(unsigned time_to_run)
00229 {
00230 _ASSERT(m_ThreadContext);
00231 m_ThreadContext->SetJobRunTimeout(time_to_run);
00232 }
00233
00234 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc)
00235 {
00236 _ASSERT(m_ThreadContext);
00237 m_ThreadContext->JobDelayExpiration(runtime_inc);
00238 }
00239
00240 CNetScheduleAdmin::EShutdownLevel
00241 CWorkerNodeJobContext::GetShutdownLevel(void) const
00242 {
00243 _ASSERT(m_ThreadContext);
00244 if (m_ThreadContext->IsJobCanceled())
00245 return CNetScheduleAdmin::eShutdownImmediate;
00246 return CGridGlobals::GetInstance().GetShutdownLevel();
00247 }
00248
00249 void CWorkerNodeJobContext::Reset(const CNetScheduleJob& job)
00250 {
00251 m_Job = job;
00252 m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber();
00253
00254 m_JobCommitted = eNotCommitted;
00255 m_InputBlobSize = 0;
00256 m_ExclusiveJob = m_Job.mask & CNetScheduleAPI::eExclusiveJob;
00257 }
00258
00259 void CWorkerNodeJobContext::RequestExclusiveMode()
00260 {
00261 if (!m_ExclusiveJob) {
00262 if (!m_WorkerNode.EnterExclusiveMode()) {
00263 NCBI_THROW(CGridWorkerNodeException,
00264 eExclusiveModeIsAlreadySet, "");
00265 }
00266 m_ExclusiveJob = true;
00267 }
00268 }
00269
00270 size_t CWorkerNodeJobContext::GetMaxServerOutputSize() const
00271 {
00272 return m_WorkerNode.GetServerOutputSize();
00273 }
00274
00275 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource()
00276 {
00277 return m_CleanupEventSource;
00278 }
00279
00280
00281
00282
00283
00284 static void s_TlsCleanup(CGridThreadContext* p_value, void* )
00285 {
00286 delete p_value;
00287 }
00288
00289 static CStaticTls<CGridThreadContext> s_tls;
00290
00291
00292 class CWorkerNodeRequest : public CStdRequest
00293 {
00294 public:
00295 CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context);
00296
00297 virtual void Process();
00298
00299 private:
00300 void x_HandleProcessError(exception* ex = NULL);
00301
00302 auto_ptr<CWorkerNodeJobContext> m_JobContext;
00303 };
00304
00305
00306 CWorkerNodeRequest::CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context)
00307 : m_JobContext(context)
00308 {
00309 }
00310
00311
00312 void CGridThreadContext::x_HandleRunJobError(exception* ex )
00313 {
00314 string msg = " Error in Job execution";
00315 if (ex) {
00316 msg += ": ";
00317 msg += ex->what();
00318 }
00319 ERR_POST_X(18, m_JobContext->GetJobKey() << msg);
00320 try {
00321 PutFailure(ex ? ex->what() : "Unknown error");
00322 } catch (exception& ex1) {
00323 ERR_POST_X(19, "Failed to report exception: " <<
00324 m_JobContext->GetJobKey() << " " << ex1.what());
00325 } catch (...) {
00326 ERR_POST_X(20, "Failed to report exception while processing " <<
00327 m_JobContext->GetJobKey());
00328 }
00329 }
00330
/// When true, CRequestStateGuard does not print request start/stop events.
static bool s_ReqEventsDisabled = false;
00332
00333 class CRequestStateGuard
00334 {
00335 public:
00336 CRequestStateGuard(CWorkerNodeJobContext& job_context);
00337
00338 void RequestStart();
00339 void RequestStop();
00340
00341 ~CRequestStateGuard();
00342
00343 private:
00344 CWorkerNodeJobContext& m_JobContext;
00345 };
00346
00347 CRequestStateGuard::CRequestStateGuard(CWorkerNodeJobContext& job_context) :
00348 m_JobContext(job_context)
00349 {
00350 CRequestContext& request_context = CDiagContext::GetRequestContext();
00351
00352 request_context.SetRequestID((int) job_context.GetJobNumber());
00353
00354 const CNetScheduleJob& job = job_context.GetJob();
00355
00356 if (!job.client_ip.empty())
00357 request_context.SetClientIP(job.client_ip);
00358
00359 if (!job.session_id.empty())
00360 request_context.SetSessionID(job.session_id);
00361
00362 request_context.SetAppState(eDiagAppState_RequestBegin);
00363
00364 if (!s_ReqEventsDisabled)
00365 GetDiagContext().PrintRequestStart().Print("jid", job.job_id);
00366 }
00367
00368 inline void CRequestStateGuard::RequestStart()
00369 {
00370 CDiagContext::GetRequestContext().SetAppState(eDiagAppState_Request);
00371 }
00372
00373 inline void CRequestStateGuard::RequestStop()
00374 {
00375 CDiagContext::GetRequestContext().SetAppState(eDiagAppState_RequestEnd);
00376 }
00377
00378 CRequestStateGuard::~CRequestStateGuard()
00379 {
00380 CRequestContext& request_context = CDiagContext::GetRequestContext();
00381
00382 if (!request_context.IsSetRequestStatus())
00383 request_context.SetRequestStatus(
00384 m_JobContext.GetCommitStatus() == CWorkerNodeJobContext::eDone &&
00385 m_JobContext.GetJob().ret_code == 0 ? 200 : 500);
00386
00387 switch (request_context.GetAppState()) {
00388 case eDiagAppState_Request:
00389 request_context.SetAppState(eDiagAppState_RequestEnd);
00390
00391
00392 default:
00393 if (!s_ReqEventsDisabled)
00394 GetDiagContext().PrintRequestStop();
00395 request_context.SetAppState(eDiagAppState_NotSet);
00396 request_context.UnsetSessionID();
00397 request_context.UnsetClientIP();
00398 }
00399 }
00400
00401 void CGridThreadContext::RunJobs(CWorkerNodeJobContext& job_context)
00402 {
00403 _ASSERT(!m_JobContext);
00404 m_JobContext = &job_context;
00405 job_context.SetThreadContext(this);
00406
00407 bool more_jobs;
00408
00409 do {
00410 more_jobs = false;
00411
00412 CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
00413 if (debug_context) {
00414 debug_context->DumpInput(m_JobContext->GetJobInput(),
00415 m_JobContext->GetJobNumber());
00416 }
00417 m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00418 IWorkerNodeJobWatcher::eJobStarted);
00419
00420 CRequestStateGuard request_state_guard(job_context);
00421
00422 request_state_guard.RequestStart();
00423
00424 CNetScheduleJob new_job;
00425 try {
00426 CRef<IWorkerNodeJob> job(GetJob());
00427 try {
00428 job_context.SetJobRetCode(job->Do(job_context));
00429 } catch (CGridWorkerNodeException& ex) {
00430 if (ex.GetErrCode() !=
00431 CGridWorkerNodeException::eExclusiveModeIsAlreadySet)
00432 throw;
00433
00434 if (job_context.IsLogRequested()) {
00435 LOG_POST_X(21, "Job " << job_context.GetJobKey() <<
00436 " has been returned back to the queue "
00437 "because it requested exclusive mode but "
00438 "another job already has the exclusive status.");
00439 }
00440 }
00441 CloseStreams();
00442 unsigned try_count = 0;
00443 for (;;) {
00444 try {
00445 switch (job_context.GetCommitStatus()) {
00446 case CWorkerNodeJobContext::eDone:
00447 more_jobs = PutResult(new_job);
00448 break;
00449
00450 case CWorkerNodeJobContext::eFailure:
00451 PutFailure(kEmptyStr);
00452 break;
00453
00454 case CWorkerNodeJobContext::eNotCommitted:
00455 if (!TWorkerNode_AllowImplicitJobReturn::
00456 GetDefault() &&
00457 job_context.GetShutdownLevel() ==
00458 CNetScheduleAdmin::eNoShutdown) {
00459 PutFailure("Job was not explicitly committed");
00460 break;
00461 }
00462
00463
00464 case CWorkerNodeJobContext::eReturn:
00465 ReturnJob();
00466 }
00467 break;
00468 } catch (CNetServiceException& ex) {
00469 if (++try_count >= TServConn_ConnMaxRetries::GetDefault())
00470 throw;
00471 ERR_POST_X(22, "Communication Error : " << ex.what());
00472 SleepMilliSec(s_GetRetryDelay());
00473 }
00474 }
00475
00476 if (!CGridGlobals::GetInstance().IsShuttingDown())
00477 static_cast<CWorkerNodeJobCleanup*>(
00478 job_context.GetCleanupEventSource())->CallEventHandlers();
00479
00480 request_state_guard.RequestStop();
00481 }
00482 catch (exception& ex) {
00483 x_HandleRunJobError(&ex);
00484 }
00485 catch (...) {
00486 x_HandleRunJobError();
00487 }
00488
00489 CloseStreams();
00490
00491 _ASSERT(m_JobContext);
00492 m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext,
00493 IWorkerNodeJobWatcher::eJobStopped);
00494
00495 if (more_jobs)
00496 job_context.Reset(new_job);
00497 } while (more_jobs);
00498
00499 m_JobContext->SetThreadContext(NULL);
00500 m_JobContext = NULL;
00501 }
00502
00503 void CWorkerNodeRequest::x_HandleProcessError(exception* ex)
00504 {
00505 string msg = " Error during job run";
00506 if (ex) {
00507 msg += ": ";
00508 msg += ex->what();
00509 }
00510 ERR_POST_X(23, msg);
00511 try {
00512 m_JobContext->GetWorkerNode().x_ReturnJob(m_JobContext->GetJobKey());
00513 } catch (exception& ex1) {
00514 ERR_POST_X(24, "Could not return job back to queue: " << ex1.what());
00515 } catch (...) {
00516 ERR_POST_X(25, "Could not return job back to queue.");
00517 }
00518 }
00519
00520 void CWorkerNodeRequest::Process()
00521 {
00522 try {
00523 CGridThreadContext* thread_context = s_tls.GetValue();
00524 if (!thread_context) {
00525 thread_context =
00526 new CGridThreadContext(m_JobContext->GetWorkerNode());
00527 s_tls.SetValue(thread_context, s_TlsCleanup);
00528 }
00529 thread_context->RunJobs(*m_JobContext);
00530 }
00531 catch (exception& ex) { x_HandleProcessError(&ex); }
00532 catch (...) { x_HandleProcessError(); }
00533 }
00534
00535
00536
00537
00538
00539
00540 class CWorkerNodeJobWatchers : public IWorkerNodeJobWatcher
00541 {
00542 public:
00543 virtual ~CWorkerNodeJobWatchers() {};
00544 virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00545 {
00546 NON_CONST_ITERATE(TCont, it, m_Watchers) {
00547 IWorkerNodeJobWatcher* watcher =
00548 const_cast<IWorkerNodeJobWatcher*>(it->first);
00549 watcher->Notify(job, event);
00550 }
00551 }
00552
00553 void AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher, EOwnership owner)
00554 {
00555 TCont::const_iterator it = m_Watchers.find(&job_watcher);
00556 if (it == m_Watchers.end()) {
00557 if (owner == eTakeOwnership)
00558 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>(&job_watcher);
00559 else
00560 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>();
00561 }
00562 }
00563
00564 private:
00565 typedef map<IWorkerNodeJobWatcher*, AutoPtr<IWorkerNodeJobWatcher> > TCont;
00566 TCont m_Watchers;
00567 };
00568
00569
00570
00571
00572
00573 class CGridControlThread : public CThread
00574 {
00575 public:
00576 CGridControlThread(CGridWorkerNode* worker_node,
00577 unsigned int start_port, unsigned int end_port) : m_Control(
00578 new CWorkerNodeControlServer(worker_node, start_port, end_port)) {}
00579
00580 void Prepare() { m_Control->StartListening(); }
00581
00582 unsigned short GetControlPort() { return m_Control->GetControlPort(); }
00583 void Stop() { if (m_Control.get()) m_Control->RequestShutdown(); }
00584
00585 protected:
00586 virtual void* Main(void)
00587 {
00588 m_Control->Run();
00589 return NULL;
00590 }
00591 virtual void OnExit(void)
00592 {
00593 CThread::OnExit();
00594 CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00595 LOG_POST_X(46, "Control Thread has been stopped.");
00596 }
00597
00598 private:
00599 auto_ptr<CWorkerNodeControlServer> m_Control;
00600 };
00601
00602 class CGridCleanupThread : public CThread
00603 {
00604 public:
00605 CGridCleanupThread(CGridWorkerNode* worker_node,
00606 IGridWorkerNodeApp_Listener* listener) :
00607 m_WorkerNode(worker_node),
00608 m_Listener(listener),
00609 m_Semaphore(0, 1)
00610 {
00611 }
00612
00613 bool Wait(unsigned seconds) {return m_Semaphore.TryWait(seconds);}
00614
00615 protected:
00616 virtual void* Main();
00617
00618 private:
00619 CGridWorkerNode* m_WorkerNode;
00620 IGridWorkerNodeApp_Listener* m_Listener;
00621 CSemaphore m_Semaphore;
00622 };
00623
00624 void* CGridCleanupThread::Main()
00625 {
00626 m_WorkerNode->GetCleanupEventSource()->CallEventHandlers();
00627 m_Listener->OnGridWorkerStop();
00628 m_Semaphore.Post();
00629
00630 return NULL;
00631 }
00632
00633
00634
00635
00636 class CWorkerNodeIdleThread : public CThread
00637 {
00638 public:
00639 CWorkerNodeIdleThread(IWorkerNodeIdleTask*,
00640 CGridWorkerNode& worker_node,
00641 unsigned run_delay,
00642 unsigned int auto_shutdown);
00643
00644 void RequestShutdown()
00645 {
00646 m_ShutdownFlag = true;
00647 m_Wait1.Post();
00648 m_Wait2.Post();
00649 }
00650 void Schedule()
00651 {
00652 CFastMutexGuard guard(m_Mutext);
00653 m_AutoShutdownSW.Restart();
00654 if (m_StopFlag) {
00655 m_StopFlag = false;
00656 m_Wait1.Post();
00657 }
00658 }
00659 void Suspend()
00660 {
00661 CFastMutexGuard guard(m_Mutext);
00662 m_AutoShutdownSW.Restart();
00663 m_AutoShutdownSW.Stop();
00664 if (!m_StopFlag) {
00665 m_StopFlag = true;
00666 m_Wait2.Post();
00667 }
00668 }
00669
00670 bool IsShutdownRequested() const { return m_ShutdownFlag; }
00671
00672
00673 protected:
00674 virtual void* Main(void);
00675 virtual void OnExit(void);
00676
00677 CWorkerNodeIdleTaskContext& GetContext();
00678
00679 private:
00680
00681 unsigned int x_GetInterval() const
00682 {
00683 CFastMutexGuard guard(m_Mutext);
00684 return m_AutoShutdown > 0 ?
00685 min( m_AutoShutdown - (unsigned int)m_AutoShutdownSW.Elapsed(), m_RunInterval )
00686 : m_RunInterval;
00687 }
00688 bool x_GetStopFlag() const
00689 {
00690 CFastMutexGuard guard(m_Mutext);
00691 return m_StopFlag;
00692 }
00693 bool x_IsAutoShutdownTime() const
00694 {
00695 CFastMutexGuard guard(m_Mutext);
00696 return m_AutoShutdown > 0 ? m_AutoShutdownSW.Elapsed() > m_AutoShutdown : false;
00697 }
00698
00699 IWorkerNodeIdleTask* m_Task;
00700 CGridWorkerNode& m_WorkerNode;
00701 auto_ptr<CWorkerNodeIdleTaskContext> m_TaskContext;
00702 mutable CSemaphore m_Wait1;
00703 mutable CSemaphore m_Wait2;
00704 volatile bool m_StopFlag;
00705 volatile bool m_ShutdownFlag;
00706 unsigned int m_RunInterval;
00707 unsigned int m_AutoShutdown;
00708 CStopWatch m_AutoShutdownSW;
00709 mutable CFastMutex m_Mutext;
00710
00711 CWorkerNodeIdleThread(const CWorkerNodeIdleThread&);
00712 CWorkerNodeIdleThread& operator=(const CWorkerNodeIdleThread&);
00713 };
00714
00715 CWorkerNodeIdleThread::CWorkerNodeIdleThread(IWorkerNodeIdleTask* task,
00716 CGridWorkerNode& worker_node,
00717 unsigned run_delay,
00718 unsigned int auto_shutdown)
00719 : m_Task(task), m_WorkerNode(worker_node),
00720 m_Wait1(0,100000), m_Wait2(0,1000000),
00721 m_StopFlag(false), m_ShutdownFlag(false),
00722 m_RunInterval(run_delay),
00723 m_AutoShutdown(auto_shutdown), m_AutoShutdownSW(CStopWatch::eStart)
00724 {
00725 }
00726 void* CWorkerNodeIdleThread::Main()
00727 {
00728 while (!m_ShutdownFlag) {
00729 if ( x_IsAutoShutdownTime() ) {
00730 LOG_POST_X(47, "There are no more jobs to be done. Exiting.");
00731 CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00732 break;
00733 }
00734 unsigned int interval = m_AutoShutdown > 0 ? min (m_RunInterval,m_AutoShutdown) : m_RunInterval;
00735 if (m_Wait1.TryWait(interval, 0)) {
00736 if (m_ShutdownFlag)
00737 continue;
00738 interval = x_GetInterval();
00739 if (m_Wait2.TryWait(interval, 0)) {
00740 continue;
00741 }
00742 }
00743 if (m_Task && !x_GetStopFlag()) {
00744 try {
00745 do {
00746 if ( x_IsAutoShutdownTime() ) {
00747 LOG_POST_X(48,
00748 "There are no more jobs to be done. Exiting.");
00749 CGridGlobals::GetInstance().RequestShutdown(
00750 CNetScheduleAdmin::eShutdownImmediate);
00751 m_ShutdownFlag = true;
00752 break;
00753 }
00754 GetContext().Reset();
00755 m_Task->Run(GetContext());
00756 } while( GetContext().NeedRunAgain() && !m_ShutdownFlag);
00757 } NCBI_CATCH_ALL_X(58,
00758 "CWorkerNodeIdleThread::Main: Idle Task failed");
00759 }
00760 }
00761 return 0;
00762 }
00763
00764 void CWorkerNodeIdleThread::OnExit(void)
00765 {
00766 LOG_POST_X(49, "Idle Thread has been stopped.");
00767 }
00768
00769 CWorkerNodeIdleTaskContext& CWorkerNodeIdleThread::GetContext()
00770 {
00771 if (!m_TaskContext.get())
00772 m_TaskContext.reset(new CWorkerNodeIdleTaskContext(*this));
00773 return *m_TaskContext;
00774 }
00775
00776
00777
00778
00779 CWorkerNodeIdleTaskContext::
00780 CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread& thread)
00781 : m_Thread(thread), m_RunAgain(false)
00782 {
00783 }
00784 bool CWorkerNodeIdleTaskContext::IsShutdownRequested() const
00785 {
00786 return m_Thread.IsShutdownRequested();
00787 }
00788 void CWorkerNodeIdleTaskContext::Reset()
00789 {
00790 m_RunAgain = false;
00791 }
00792
00793 void CWorkerNodeIdleTaskContext::RequestShutdown()
00794 {
00795 m_Thread.RequestShutdown();
00796 CGridGlobals::GetInstance().
00797 RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
00798 }
00799
00800
00801
00802
00803
00804 class CIdleWatcher : public IWorkerNodeJobWatcher
00805 {
00806 public:
00807 CIdleWatcher(CWorkerNodeIdleThread& idle)
00808 : m_Idle(idle), m_RunningJobs(0) {}
00809 virtual ~CIdleWatcher() {};
00810 virtual void Notify(const CWorkerNodeJobContext& job, EEvent event)
00811 {
00812 if (event == eJobStarted) {
00813 ++m_RunningJobs;
00814 m_Idle.Suspend();
00815 } else if (event == eJobStopped) {
00816 --m_RunningJobs;
00817 if (m_RunningJobs == 0)
00818 m_Idle.Schedule();
00819 }
00820 }
00821
00822 private:
00823 CWorkerNodeIdleThread& m_Idle;
00824 volatile int m_RunningJobs;
00825 };
00826
00827
00828
00829
00830
00831 CGridWorkerNode::CGridWorkerNode(
00832 CNcbiApplication& app,
00833 IWorkerNodeJobFactory* job_factory,
00834 IBlobStorageFactory* storage_factory,
00835 INetScheduleClientFactory* client_factory) :
00836 m_JobFactory(job_factory),
00837 m_StorageFactory(storage_factory),
00838 m_ClientFactory(client_factory),
00839 m_MaxThreads(1),
00840 m_NSTimeout(30),
00841 m_CheckStatusPeriod(2),
00842 m_ExclusiveJobSemaphore(1, 1),
00843 m_IsProcessingExclusiveJob(false),
00844 m_UseEmbeddedStorage(false),
00845 m_CleanupEventSource(new CWorkerNodeCleanup()),
00846 m_Listener(new CGridWorkerNodeApp_Listener()),
00847 m_App(app),
00848 m_SingleThreadForced(false)
00849 {
00850 if (!m_JobFactory.get())
00851 NCBI_THROW(CGridWorkerAppException,
00852 eJobFactoryIsNotSet, "The JobFactory is not set.");
00853 }
00854
00855 CGridWorkerNode::~CGridWorkerNode()
00856 {
00857 }
00858
00859 void CGridWorkerNode::Init(bool default_merge_lines_value)
00860 {
00861 IRWRegistry& reg = m_App.GetConfig();
00862
00863 if (reg.GetBool("log", "merge_lines", default_merge_lines_value)) {
00864 SetDiagPostFlag(eDPF_PreMergeLines);
00865 SetDiagPostFlag(eDPF_MergeLines);
00866 }
00867
00868 reg.Set(kNetScheduleAPIDriverName, "discover_low_priority_servers", "true");
00869
00870 if (!m_StorageFactory.get())
00871 m_StorageFactory.reset(new CBlobStorageFactory(reg));
00872 if (!m_ClientFactory.get())
00873 m_ClientFactory.reset(new CNetScheduleClientFactory(reg));
00874 }
00875
/// Name of the registry section that holds the worker node settings.
const char* kServerSec = "server";
00877
00878 int CGridWorkerNode::Run()
00879 {
00880 LOG_POST_X(50, GetJobFactory().GetJobVersion() << WN_BUILD_DATE);
00881
00882 const IRegistry& reg = m_App.GetConfig();
00883 CConfig conf(reg);
00884
00885 unsigned init_threads = 1;
00886
00887 if (!m_SingleThreadForced) {
00888 string max_threads = reg.GetString(kServerSec, "max_threads", "auto");
00889 if (NStr::CompareNocase(max_threads, "auto") == 0)
00890 m_MaxThreads = GetCpuCount();
00891 else {
00892 try {
00893 m_MaxThreads = NStr::StringToUInt(max_threads);
00894 } catch (...) {
00895 m_MaxThreads = GetCpuCount();
00896 ERR_POST_X(51, "Could not convert [" << kServerSec <<
00897 "] max_threads parameter to number.\n"
00898 "Using \'auto\' option (" << m_MaxThreads << ").");
00899 }
00900 }
00901 init_threads =
00902 reg.GetInt(kServerSec, "init_threads", 1, 0, IRegistry::eReturn);
00903 }
00904 if (init_threads > m_MaxThreads)
00905 init_threads = m_MaxThreads;
00906
00907 m_NSTimeout = reg.GetInt(kServerSec,
00908 "job_wait_timeout", 30, 0, IRegistry::eReturn);
00909
00910 unsigned thread_pool_timeout = reg.GetInt(kServerSec,
00911 "thread_pool_timeout", 30, 0, IRegistry::eReturn);
00912
00913 const CArgs& args = m_App.GetArgs();
00914
00915 unsigned int start_port, end_port;
00916
00917 string sport1, sport2;
00918 NStr::SplitInTwo(args["control_port"] ? args["control_port"].AsString() :
00919 reg.GetString(kServerSec, "control_port", "9300"), "-", sport1, sport2);
00920 start_port = NStr::StringToUInt(sport1);
00921 end_port = sport2.empty() ? start_port : NStr::StringToUInt(sport2);
00922
00923 bool log_requested = reg.GetBool(kServerSec,
00924 "log", false, 0, IRegistry::eReturn);
00925
00926 unsigned int idle_run_delay =
00927 reg.GetInt(kServerSec,"idle_run_delay",30,0,IRegistry::eReturn);
00928
00929 unsigned int auto_shutdown =
00930 reg.GetInt(kServerSec,"auto_shutdown_if_idle",0,0,IRegistry::eReturn);
00931
00932 unsigned int infinite_loop_time =
00933 reg.HasEntry(kServerSec, "infinite_loop_time") ?
00934 reg.GetInt(kServerSec, "infinite_loop_time", 0, 0, IRegistry::eReturn) :
00935 reg.GetInt(kServerSec, "infinit_loop_time", 0, 0, IRegistry::eReturn);
00936
00937 bool reuse_job_object =
00938 reg.GetBool(kServerSec, "reuse_job_object", false, 0,
00939 CNcbiRegistry::eReturn);
00940
00941 unsigned int max_total_jobs =
00942 reg.GetInt(kServerSec,"max_total_jobs",0,0,IRegistry::eReturn);
00943
00944 unsigned int max_failed_jobs =
00945 reg.GetInt(kServerSec,"max_failed_jobs",0,0,IRegistry::eReturn);
00946
00947 bool is_daemon =
00948 reg.GetBool(kServerSec, "daemon", false, 0, CNcbiRegistry::eReturn);
00949
00950 vector<string> vhosts;
00951
00952 NStr::Tokenize(reg.GetString(kServerSec,
00953 "master_nodes", kEmptyStr), " ;,", vhosts);
00954
00955 ITERATE(vector<string>, it, vhosts) {
00956 string host, port;
00957 NStr::SplitInTwo(NStr::TruncateSpaces(*it), ":", host, port);
00958 if (host.empty() || port.empty())
00959 continue;
00960 try {
00961 m_Masters.insert(SServerAddress(NStr::ToLower(host),
00962 (unsigned short) NStr::StringToUInt(port)));
00963 } catch(...) {}
00964 }
00965
00966 vhosts.clear();
00967
00968 NStr::Tokenize(reg.GetString(kServerSec,
00969 "admin_hosts", kEmptyStr), " ;,", vhosts);
00970
00971 ITERATE(vector<string>, it, vhosts) {
00972 unsigned int ha = CSocketAPI::gethostbyname(*it);
00973 if (ha != 0)
00974 m_AdminHosts.insert(ha);
00975 }
00976
00977 m_CheckStatusPeriod =
00978 reg.GetInt(kServerSec, "check_status_period", 2, 0, IRegistry::eReturn);
00979
00980 if (reg.HasEntry(kServerSec,"wait_server_timeout")) {
00981 ERR_POST_X(52, "[" << kServerSec <<
00982 "] \"wait_server_timeout\" is not used anymore.\n"
00983 "Use [" << kNetScheduleAPIDriverName <<
00984 "] \"communication_timeout\" parameter instead.");
00985 }
00986
00987 if (reg.HasEntry(kNetScheduleAPIDriverName, "use_embedded_storage"))
00988 m_UseEmbeddedStorage = reg.
00989 GetBool(kNetScheduleAPIDriverName, "use_embedded_storage", false, 0,
00990 CNcbiRegistry::eReturn);
00991 else
00992 m_UseEmbeddedStorage = reg.
00993 GetBool(kNetScheduleAPIDriverName, "use_embedded_input", false, 0,
00994 CNcbiRegistry::eReturn);
00995
00996 CGridDebugContext::eMode debug_mode = CGridDebugContext::eGDC_NoDebug;
00997 string dbg_mode = reg.GetString("gw_debug", "mode", kEmptyStr);
00998 if (NStr::CompareNocase(dbg_mode, "gather")==0) {
00999 debug_mode = CGridDebugContext::eGDC_Gather;
01000 } else if (NStr::CompareNocase(dbg_mode, "execute")==0) {
01001 debug_mode = CGridDebugContext::eGDC_Execute;
01002 }
01003 if (debug_mode != CGridDebugContext::eGDC_NoDebug) {
01004 CGridDebugContext& debug_context =
01005 CGridDebugContext::Create(debug_mode, *m_StorageFactory);
01006 string run_name =
01007 reg.GetString("gw_debug", "run_name", m_App.GetProgramDisplayName());
01008 debug_context.SetRunName(run_name);
01009 if (debug_mode == CGridDebugContext::eGDC_Gather) {
01010 max_total_jobs =
01011 reg.GetInt("gw_debug","gather_nrequests",1,0,IRegistry::eReturn);
01012 } else if (debug_mode == CGridDebugContext::eGDC_Execute) {
01013 string files =
01014 reg.GetString("gw_debug", "execute_requests", kEmptyStr);
01015 max_total_jobs = 0;
01016 debug_context.SetExecuteList(files);
01017 }
01018 is_daemon = false;
01019 }
01020
01021 #if defined(NCBI_OS_UNIX)
01022 if (is_daemon) {
01023 LOG_POST_X(53, "Entering UNIX daemon mode...");
01024 bool daemon = CProcess::Daemonize("/dev/null",
01025 CProcess::fDontChroot |
01026 CProcess::fKeepStdin |
01027 CProcess::fKeepStdout);
01028 if (!daemon) {
01029 return 0;
01030 }
01031 }
01032 #endif
01033
01034 AttachJobWatcher(CGridGlobals::GetInstance().GetJobsWatcher());
01035
01036 CRef<CGridControlThread> control_thread(
01037 new CGridControlThread(this, start_port, end_port));
01038
01039 control_thread->Prepare();
01040
01041 m_RebalanceStrategy = CreateSimpleRebalanceStrategy(conf, "server");
01042
01043 m_SharedNSClient = m_ClientFactory->CreateInstance();
01044 m_NSExecuter =
01045 m_SharedNSClient.GetExecuter(control_thread->GetControlPort());
01046
01047 m_SharedNSClient.SetProgramVersion(m_JobFactory->GetJobVersion());
01048
01049 CGridGlobals::GetInstance().SetReuseJobObject(reuse_job_object);
01050 CGridGlobals::GetInstance().GetJobsWatcher().SetMaxJobsAllowed(max_total_jobs);
01051 CGridGlobals::GetInstance().GetJobsWatcher().SetMaxFailuresAllowed(max_failed_jobs);
01052 CGridGlobals::GetInstance().GetJobsWatcher().SetInfinitLoopTime(infinite_loop_time);
01053 CGridGlobals::GetInstance().SetWorker(*this);
01054
01055 IWorkerNodeIdleTask* task = NULL;
01056 if (idle_run_delay > 0)
01057 task = GetJobFactory().GetIdleTask();
01058 if (task || auto_shutdown > 0 ) {
01059 m_IdleThread.Reset(new CWorkerNodeIdleThread(task, *this,
01060 task ? idle_run_delay : auto_shutdown,
01061 auto_shutdown));
01062 m_IdleThread->Run();
01063 AttachJobWatcher(*(new CIdleWatcher(*m_IdleThread)), eTakeOwnership);
01064 }
01065
01066 control_thread->Run();
01067
01068 LOG_POST_X(54, "\n=================== NEW RUN : " <<
01069 CGridGlobals::GetInstance().GetStartTime().AsString() <<
01070 " ===================\n" <<
01071 GetJobFactory().GetJobVersion() << WN_BUILD_DATE << " is started.\n"
01072 "Waiting for control commands on " << CSocketAPI::gethostname() <<
01073 ":" << control_thread->GetControlPort() << "\n"
01074 "Queue name: " << GetQueueName() << "\n"
01075 "Maximum job threads: " << m_MaxThreads << "\n");
01076
01077 try {
01078 GetNSExecuter().RegisterClient();
01079 } catch (CNetServiceException& ex) {
01080
01081 if (ex.GetErrCode() != CNetServiceException::eCommunicationError ||
01082 NStr::Find(ex.what(), "Server error:Unknown request") == NPOS)
01083 throw;
01084 }
01085
01086 m_Listener->OnGridWorkerStart();
01087
01088 auto_ptr<CGridThreadContext> single_thread_context;
01089 auto_ptr<CStdPoolOfThreads> thread_pool;
01090
01091 _ASSERT(m_MaxThreads > 0);
01092
01093 if (m_MaxThreads <= 1)
01094 single_thread_context.reset(new CGridThreadContext(*this));
01095 else {
01096 thread_pool.reset(new CStdPoolOfThreads(m_MaxThreads, 0));
01097 try {
01098 thread_pool->Spawn(init_threads);
01099 }
01100 catch (exception& ex) {
01101 ERR_POST_X(26, ex.what());
01102 CGridGlobals::GetInstance().RequestShutdown(
01103 CNetScheduleAdmin::eShutdownImmediate);
01104 return 2;
01105 }
01106 catch (...) {
01107 ERR_POST_X(27, "Unknown error");
01108 CGridGlobals::GetInstance().RequestShutdown(
01109 CNetScheduleAdmin::eShutdownImmediate);
01110 return 3;
01111 }
01112 }
01113
01114 CNetScheduleJob job;
01115
01116 unsigned try_count = 0;
01117 while (!CGridGlobals::GetInstance().IsShuttingDown()) {
01118 try {
01119
01120 if (m_MaxThreads > 1) {
01121 try {
01122 thread_pool->WaitForRoom(thread_pool_timeout);
01123 } catch (CBlockingQueueException&) {
01124
01125 continue;
01126 }
01127 }
01128
01129 if (x_GetNextJob(job)) {
01130 auto_ptr<CWorkerNodeJobContext>
01131 job_context(new CWorkerNodeJobContext(*this,
01132 job,
01133 log_requested));
01134 job_context->Reset(job);
01135 if (m_MaxThreads > 1 ) {
01136 CRef<CStdRequest> job_req(
01137 new CWorkerNodeRequest(job_context));
01138 try {
01139 thread_pool->AcceptRequest(job_req);
01140 } catch (CBlockingQueueException& ex) {
01141 ERR_POST_X(28, ex.what());
01142
01143 _ASSERT(0);
01144 x_ReturnJob(job.job_id);
01145 }
01146 } else {
01147 try {
01148 single_thread_context->RunJobs(*job_context);
01149 } catch (...) {
01150 x_ReturnJob(job_context->GetJobKey());
01151 throw;
01152 }
01153 }
01154 }
01155 } catch (CNetServiceException& ex) {
01156 ERR_POST_X(40, ex.what());
01157 if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
01158 CGridGlobals::GetInstance().RequestShutdown(
01159 CNetScheduleAdmin::eShutdownImmediate);
01160 } else {
01161 SleepMilliSec(s_GetRetryDelay());
01162 continue;
01163 }
01164 } catch (exception& ex) {
01165 if (TWorkerNode_StopOnJobErrors::GetDefault()) {
01166 ERR_POST_X(29, ex.what());
01167 CGridGlobals::GetInstance().RequestShutdown(
01168 CNetScheduleAdmin::eShutdownImmediate);
01169 }
01170 } catch (...) {
01171 if (TWorkerNode_StopOnJobErrors::GetDefault()) {
01172 ERR_POST_X(30, "Unknown error");
01173 CGridGlobals::GetInstance().RequestShutdown(
01174 CNetScheduleAdmin::eShutdownImmediate);
01175 }
01176 }
01177 try_count = 0;
01178 }
01179 LOG_POST_X(31, "Shutting down...");
01180 if (reg.GetBool(kServerSec,
01181 "force_exit", false, 0, CNcbiRegistry::eReturn)) {
01182 ERR_POST_X(45, "Force exit");
01183 } else if (m_MaxThreads > 1) {
01184 try {
01185 LOG_POST_X(32, "Stopping worker threads...");
01186 thread_pool->KillAllThreads(true);
01187 thread_pool.reset(0);
01188 } catch (exception& ex) {
01189 ERR_POST_X(33, "Could not stop worker threads: " << ex.what());
01190 } catch (...) {
01191 ERR_POST_X(34, "Could not stop worker threads: Unknown error");
01192 }
01193 }
01194 try {
01195 GetNSExecuter().UnRegisterClient();
01196 } catch (CNetServiceException& ex) {
01197
01198 if (ex.GetErrCode() != CNetServiceException::eCommunicationError
01199 || NStr::Find(ex.what(),"Server error:Unknown request") == NPOS) {
01200 ERR_POST_X(35, "Could unregister from NetScehdule services: "
01201 << ex.what());
01202 }
01203 } catch(exception& ex) {
01204 ERR_POST_X(36, "Could unregister from NetScehdule services: "
01205 << ex.what());
01206 } catch(...) {
01207 ERR_POST_X(37, "Could unregister from NetScehdule services: "
01208 "Unknown error." );
01209 }
01210
01211 LOG_POST_X(38, "Worker Node has been stopped.");
01212
01213
01214 CRef<CGridCleanupThread> cleanup_thread(
01215 new CGridCleanupThread(this, m_Listener.get()));
01216
01217 cleanup_thread->Run();
01218
01219 LOG_POST_X(55, "Stopping Control thread...");
01220 control_thread->Stop();
01221 control_thread->Join();
01222
01223 CNcbiOstrstream os;
01224 CGridGlobals::GetInstance().GetJobsWatcher().Print(os);
01225 LOG_POST_X(56, string(CNcbiOstrstreamToString(os)));
01226
01227 if (m_IdleThread) {
01228 if (!m_IdleThread->IsShutdownRequested()) {
01229 LOG_POST_X(57, "Stopping Idle thread...");
01230 m_IdleThread->RequestShutdown();
01231 }
01232 m_IdleThread->Join();
01233 }
01234
01235 if (cleanup_thread->Wait(thread_pool_timeout)) {
01236 cleanup_thread->Join();
01237 LOG_POST_X(58, "Cleanup thread finished");
01238 } else {
01239 ERR_POST_X(59, "Clean-up thread timed out");
01240 }
01241
01242 return 0;
01243 }
01244
01245 void CGridWorkerNode::RequestShutdown()
01246 {
01247 CGridGlobals::GetInstance().
01248 RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
01249 }
01250
01251
01252 void CGridWorkerNode::AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher,
01253 EOwnership owner)
01254 {
01255 if (!m_JobWatchers.get())
01256 m_JobWatchers.reset(new CWorkerNodeJobWatchers);
01257 m_JobWatchers->AttachJobWatcher(job_watcher, owner);
01258 };
01259
01260 void CGridWorkerNode::SetListener(IGridWorkerNodeApp_Listener* listener)
01261 {
01262 m_Listener.reset(listener ? listener : new CGridWorkerNodeApp_Listener());
01263 }
01264
01265
01266 void CGridWorkerNode::x_NotifyJobWatcher(const CWorkerNodeJobContext& job,
01267 IWorkerNodeJobWatcher::EEvent event)
01268 {
01269 if (m_JobWatchers.get() != NULL) {
01270 CFastMutexGuard guard(m_JobWatcherMutex);
01271 m_JobWatchers->Notify(job, event);
01272 }
01273 }
01274
01275 bool CGridWorkerNode::x_GetNextJob(CNetScheduleJob& job)
01276 {
01277 bool job_exists = false;
01278
01279 CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01280
01281 if (debug_context &&
01282 debug_context->GetDebugMode() == CGridDebugContext::eGDC_Execute) {
01283 job_exists = debug_context->GetNextJob(job.job_id, job.input);
01284 if (!job_exists) {
01285 CGridGlobals::GetInstance().
01286 RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
01287 }
01288 } else {
01289 if (!x_AreMastersBusy()) {
01290 SleepSec(m_NSTimeout);
01291 return false;
01292 }
01293
01294 if (!WaitForExclusiveJobToFinish())
01295 return false;
01296
01297 job_exists = GetNSExecuter().WaitJob(job, m_NSTimeout);
01298
01299 if (job_exists && job.mask & CNetScheduleAPI::eExclusiveJob)
01300 job_exists = EnterExclusiveModeOrReturnJob(job);
01301 }
01302 if (job_exists && CGridGlobals::GetInstance().IsShuttingDown()) {
01303 x_ReturnJob(job.job_id);
01304 return false;
01305 }
01306 return job_exists;
01307 }
01308
01309 void CGridWorkerNode::x_ReturnJob(const string& job_key)
01310 {
01311 CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01312 if (!debug_context || debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01313 GetNSExecuter().ReturnJob(job_key);
01314 }
01315 }
01316
01317 void CGridWorkerNode::x_FailJob(const string& job_key, const string& reason)
01318 {
01319 CGridDebugContext* debug_context = CGridDebugContext::GetInstance();
01320 if (!debug_context ||
01321 debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) {
01322 CNetScheduleJob job(job_key);
01323 job.error_msg = reason;
01324 GetNSExecuter().PutFailure(job);
01325 }
01326 }
01327
01328 size_t CGridWorkerNode::GetServerOutputSize() const
01329 {
01330 return IsEmeddedStorageUsed() ?
01331 GetNSClient().GetServerParams().max_output_size : 0;
01332 }
01333
01334 bool CGridWorkerNode::IsHostInAdminHostsList(const string& host) const
01335 {
01336 if (m_AdminHosts.empty())
01337 return true;
01338 unsigned int ha = CSocketAPI::gethostbyname(host);
01339 if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01340 return true;
01341 unsigned int ha_lh = CSocketAPI::gethostbyname("");
01342 if (ha == ha_lh) {
01343 ha = CSocketAPI::gethostbyname("localhost");
01344 if (m_AdminHosts.find(ha) != m_AdminHosts.end())
01345 return true;
01346 }
01347 return false;
01348 }
01349
01350 bool CGridWorkerNode::x_AreMastersBusy() const
01351 {
01352 ITERATE(set<SServerAddress>, it, m_Masters) {
01353 STimeout tmo = {0, 500};
01354 CSocket socket(it->host, it->port, &tmo, eOff);
01355 if (socket.GetStatus(eIO_Open) != eIO_Success)
01356 continue;
01357
01358 CNcbiOstrstream os;
01359 os << GetJobVersion() << endl;
01360 os << GetNSClient().GetQueueName() <<";"
01361 << GetNSClient().GetService().GetServiceName();
01362 os << endl;
01363 os << "GETLOAD" << ends;
01364 if (socket.Write(os.str(), os.pcount()) != eIO_Success) {
01365 os.freeze(false);
01366 continue;
01367 }
01368 os.freeze(false);
01369 string reply;
01370 if (socket.ReadLine(reply) != eIO_Success)
01371 continue;
01372 if (NStr::StartsWith(reply, "ERR:")) {
01373 string msg;
01374 NStr::Replace(reply, "ERR:", "", msg);
01375 ERR_POST_X(43, "Worker Node at " << it->AsString() <<
01376 " returned error: " << msg);
01377 } else if (NStr::StartsWith(reply, "OK:")) {
01378 string msg;
01379 NStr::Replace(reply, "OK:", "", msg);
01380 try {
01381 int load = NStr::StringToInt(msg);
01382 if (load > 0)
01383 return false;
01384 } catch (...) {}
01385 } else {
01386 ERR_POST_X(44, "Worker Node at " << it->AsString() <<
01387 " returned unknown reply: " << reply);
01388 }
01389 }
01390 return true;
01391 }
01392
01393 bool CGridWorkerNode::IsTimeToRebalance()
01394 {
01395 if (!m_SharedNSClient.GetService().IsLoadBalanced() ||
01396 TWorkerNode_DoNotRebalance::GetDefault())
01397 return false;
01398
01399 m_RebalanceStrategy->OnResourceRequested();
01400 return m_RebalanceStrategy->NeedRebalance();
01401 }
01402
01403 bool CGridWorkerNode::EnterExclusiveMode()
01404 {
01405 if (m_ExclusiveJobSemaphore.TryWait()) {
01406 _ASSERT(!m_IsProcessingExclusiveJob);
01407
01408 m_IsProcessingExclusiveJob = true;
01409 return true;
01410 }
01411 return false;
01412 }
01413
01414 bool CGridWorkerNode::EnterExclusiveModeOrReturnJob(
01415 CNetScheduleJob& exclusive_job)
01416 {
01417 _ASSERT(exclusive_job.mask & CNetScheduleAPI::eExclusiveJob);
01418
01419 if (!EnterExclusiveMode()) {
01420 x_ReturnJob(exclusive_job.job_id);
01421 return false;
01422 }
01423 return true;
01424 }
01425
01426 void CGridWorkerNode::LeaveExclusiveMode()
01427 {
01428 _ASSERT(m_IsProcessingExclusiveJob);
01429
01430 m_IsProcessingExclusiveJob = false;
01431 m_ExclusiveJobSemaphore.Post();
01432 }
01433
01434 bool CGridWorkerNode::WaitForExclusiveJobToFinish()
01435 {
01436 if (m_ExclusiveJobSemaphore.TryWait(m_NSTimeout)) {
01437 m_ExclusiveJobSemaphore.Post();
01438 return true;
01439 }
01440 return false;
01441 }
01442
01443 void CGridWorkerNode::DisableDefaultRequestEventLogging()
01444 {
01445 s_ReqEventsDisabled = true;
01446 }
01447
01448 IWorkerNodeCleanupEventSource* CGridWorkerNode::GetCleanupEventSource()
01449 {
01450 return m_CleanupEventSource;
01451 }
01452
01453 END_NCBI_SCOPE
01454
01455