|
NCBI C++ ToolKit
|
00001 /* $Id: grid_worker.cpp 54418 2012-05-11 22:04:54Z kazimird $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 
00023 * 00024 * =========================================================================== 00025 * 00026 * Authors: Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov 00027 * 00028 * File Description: 00029 * NetSchedule Worker Node implementation 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 00034 #include "grid_thread_context.hpp" 00035 #include "grid_debug_context.hpp" 00036 #include "netschedule_api_impl.hpp" 00037 00038 #include <connect/services/grid_globals.hpp> 00039 #include <connect/services/error_codes.hpp> 00040 #include <connect/services/netschedule_api_expt.hpp> 00041 #include <connect/services/grid_worker_app.hpp> 00042 #include <connect/services/grid_control_thread.hpp> 00043 00044 #include <connect/ncbi_socket.hpp> 00045 00046 #include <util/thread_pool.hpp> 00047 00048 #include <corelib/ncbireg.hpp> 00049 #include <corelib/ncbithr.hpp> 00050 #include <corelib/ncbitime.hpp> 00051 #include <corelib/ncbi_process.hpp> 00052 #include <corelib/ncbiexpt.hpp> 00053 #include <corelib/ncbi_system.hpp> 00054 #include <corelib/ncbi_safe_static.hpp> 00055 #include <corelib/request_ctx.hpp> 00056 00057 #ifdef NCBI_OS_UNIX 00058 #include <unistd.h> 00059 #endif 00060 00061 00062 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode 00063 00064 00065 BEGIN_NCBI_SCOPE 00066 00067 ///////////////////////////////////////////////////////////////////////////// 00068 // 00069 IWorkerNodeJobWatcher::~IWorkerNodeJobWatcher() 00070 { 00071 } 00072 00073 ///////////////////////////////////////////////////////////////////////////// 00074 // 00075 // CWorkerNodeJobContext -- 00076 00077 namespace { 00078 00079 class CWorkerNodeCleanup : public IWorkerNodeCleanupEventSource 00080 { 00081 public: 00082 typedef set<IWorkerNodeCleanupEventListener*> TListeners; 00083 00084 virtual void AddListener(IWorkerNodeCleanupEventListener* listener); 00085 virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener); 00086 00087 virtual void CallEventHandlers(); 00088 00089 void 
RemoveListeners(const TListeners& listeners); 00090 00091 protected: 00092 TListeners m_Listeners; 00093 CFastMutex m_ListenersLock; 00094 }; 00095 00096 void CWorkerNodeCleanup::AddListener(IWorkerNodeCleanupEventListener* listener) 00097 { 00098 CFastMutexGuard g(m_ListenersLock); 00099 m_Listeners.insert(listener); 00100 } 00101 00102 void CWorkerNodeCleanup::RemoveListener( 00103 IWorkerNodeCleanupEventListener* listener) 00104 { 00105 CFastMutexGuard g(m_ListenersLock); 00106 m_Listeners.erase(listener); 00107 } 00108 00109 void CWorkerNodeCleanup::CallEventHandlers() 00110 { 00111 TListeners listeners; 00112 { 00113 CFastMutexGuard g(m_ListenersLock); 00114 listeners.swap(m_Listeners); 00115 } 00116 00117 ITERATE(TListeners, it, listeners) { 00118 try { 00119 (*it)->HandleEvent( 00120 IWorkerNodeCleanupEventListener::eRegularCleanup); 00121 delete *it; 00122 } 00123 NCBI_CATCH_ALL_X(39, "Job clean-up error"); 00124 } 00125 } 00126 00127 void CWorkerNodeCleanup::RemoveListeners( 00128 const CWorkerNodeCleanup::TListeners& listeners) 00129 { 00130 CFastMutexGuard g(m_ListenersLock); 00131 ITERATE(TListeners, it, listeners) { 00132 m_Listeners.erase(*it); 00133 } 00134 } 00135 00136 class CWorkerNodeJobCleanup : public CWorkerNodeCleanup 00137 { 00138 public: 00139 CWorkerNodeJobCleanup(CWorkerNodeCleanup* worker_node_cleanup); 00140 00141 virtual void AddListener(IWorkerNodeCleanupEventListener* listener); 00142 virtual void RemoveListener(IWorkerNodeCleanupEventListener* listener); 00143 00144 virtual void CallEventHandlers(); 00145 00146 private: 00147 CWorkerNodeCleanup* m_WorkerNodeCleanup; 00148 }; 00149 00150 CWorkerNodeJobCleanup::CWorkerNodeJobCleanup( 00151 CWorkerNodeCleanup* worker_node_cleanup) : 00152 m_WorkerNodeCleanup(worker_node_cleanup) 00153 { 00154 } 00155 00156 void CWorkerNodeJobCleanup::AddListener( 00157 IWorkerNodeCleanupEventListener* listener) 00158 { 00159 CWorkerNodeCleanup::AddListener(listener); 00160 
m_WorkerNodeCleanup->AddListener(listener); 00161 } 00162 00163 void CWorkerNodeJobCleanup::RemoveListener( 00164 IWorkerNodeCleanupEventListener* listener) 00165 { 00166 CWorkerNodeCleanup::RemoveListener(listener); 00167 m_WorkerNodeCleanup->RemoveListener(listener); 00168 } 00169 00170 void CWorkerNodeJobCleanup::CallEventHandlers() 00171 { 00172 { 00173 CFastMutexGuard g(m_ListenersLock); 00174 m_WorkerNodeCleanup->RemoveListeners(m_Listeners); 00175 } 00176 CWorkerNodeCleanup::CallEventHandlers(); 00177 } 00178 00179 } // end of anonymous namespace 00180 00181 CWorkerNodeJobContext::CWorkerNodeJobContext(CGridWorkerNode& worker_node, 00182 const CNetScheduleJob& job, 00183 bool log_requested) 00184 : m_WorkerNode(worker_node), 00185 m_LogRequested(log_requested), 00186 m_ThreadContext(NULL), 00187 m_CleanupEventSource(new CWorkerNodeJobCleanup( 00188 static_cast<CWorkerNodeCleanup*>(worker_node.GetCleanupEventSource()))) 00189 { 00190 Reset(job); 00191 } 00192 00193 const string& CWorkerNodeJobContext::GetQueueName() const 00194 { 00195 return m_WorkerNode.GetQueueName(); 00196 } 00197 const string& CWorkerNodeJobContext::GetClientName() const 00198 { 00199 return m_WorkerNode.GetClientName(); 00200 } 00201 00202 CNcbiIstream& CWorkerNodeJobContext::GetIStream() 00203 { 00204 _ASSERT(m_ThreadContext); 00205 return m_ThreadContext->GetIStream(); 00206 } 00207 CNcbiOstream& CWorkerNodeJobContext::GetOStream() 00208 { 00209 _ASSERT(m_ThreadContext); 00210 return m_ThreadContext->GetOStream(); 00211 } 00212 00213 void CWorkerNodeJobContext::CommitJob() 00214 { 00215 CheckIfCanceled(); 00216 m_JobCommitted = eDone; 00217 } 00218 00219 void CWorkerNodeJobContext::CommitJobWithFailure(const string& err_msg) 00220 { 00221 CheckIfCanceled(); 00222 m_JobCommitted = eFailure; 00223 m_Job.error_msg = err_msg; 00224 } 00225 00226 void CWorkerNodeJobContext::ReturnJob() 00227 { 00228 CheckIfCanceled(); 00229 m_JobCommitted = eReturn; 00230 } 00231 00232 void 
CWorkerNodeJobContext::PutProgressMessage(const string& msg, 00233 bool send_immediately) 00234 { 00235 _ASSERT(m_ThreadContext); 00236 CheckIfCanceled(); 00237 m_ThreadContext->PutProgressMessage(msg, send_immediately); 00238 } 00239 00240 void CWorkerNodeJobContext::JobDelayExpiration(unsigned runtime_inc) 00241 { 00242 _ASSERT(m_ThreadContext); 00243 m_ThreadContext->JobDelayExpiration(runtime_inc); 00244 } 00245 00246 CNetScheduleAdmin::EShutdownLevel 00247 CWorkerNodeJobContext::GetShutdownLevel(void) const 00248 { 00249 _ASSERT(m_ThreadContext); 00250 if (m_ThreadContext->IsJobCanceled()) 00251 return CNetScheduleAdmin::eShutdownImmediate; 00252 return CGridGlobals::GetInstance().GetShutdownLevel(); 00253 } 00254 00255 void CWorkerNodeJobContext::CheckIfCanceled() 00256 { 00257 if (IsCanceled()) { 00258 NCBI_THROW_FMT(CGridWorkerNodeException, eJobIsCanceled, 00259 "Job " << m_Job.job_id << " has been canceled"); 00260 } 00261 } 00262 00263 void CWorkerNodeJobContext::Reset(const CNetScheduleJob& job) 00264 { 00265 m_Job = job; 00266 m_JobNumber = CGridGlobals::GetInstance().GetNewJobNumber(); 00267 00268 m_JobCommitted = eNotCommitted; 00269 m_InputBlobSize = 0; 00270 m_ExclusiveJob = m_Job.mask & CNetScheduleAPI::eExclusiveJob; 00271 } 00272 00273 void CWorkerNodeJobContext::RequestExclusiveMode() 00274 { 00275 if (!m_ExclusiveJob) { 00276 if (!m_WorkerNode.EnterExclusiveMode()) { 00277 NCBI_THROW(CGridWorkerNodeException, 00278 eExclusiveModeIsAlreadySet, ""); 00279 } 00280 m_ExclusiveJob = true; 00281 } 00282 } 00283 00284 size_t CWorkerNodeJobContext::GetMaxServerOutputSize() 00285 { 00286 return m_WorkerNode.GetServerOutputSize(); 00287 } 00288 00289 IWorkerNodeCleanupEventSource* CWorkerNodeJobContext::GetCleanupEventSource() 00290 { 00291 return m_CleanupEventSource; 00292 } 00293 00294 00295 ///////////////////////////////////////////////////////////////////////////// 00296 // 00297 ///@internal 00298 static void s_TlsCleanup(CGridThreadContext* 
p_value, void* /* data */ ) 00299 { 00300 delete p_value; 00301 } 00302 /// @internal 00303 static CStaticTls<CGridThreadContext> s_tls; 00304 00305 ///@internal 00306 class CWorkerNodeRequest : public CStdRequest 00307 { 00308 public: 00309 CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context); 00310 00311 virtual void Process(); 00312 00313 private: 00314 void x_HandleProcessError(exception* ex = NULL); 00315 00316 auto_ptr<CWorkerNodeJobContext> m_JobContext; 00317 }; 00318 00319 00320 CWorkerNodeRequest::CWorkerNodeRequest(auto_ptr<CWorkerNodeJobContext> context) 00321 : m_JobContext(context) 00322 { 00323 } 00324 00325 00326 static CGridWorkerNode::EDisabledRequestEvents s_ReqEventsDisabled = 00327 CGridWorkerNode::eEnableStartStop; 00328 00329 class CRequestStateGuard 00330 { 00331 public: 00332 CRequestStateGuard(CWorkerNodeJobContext& job_context); 00333 00334 ~CRequestStateGuard(); 00335 00336 private: 00337 CWorkerNodeJobContext& m_JobContext; 00338 }; 00339 00340 CRequestStateGuard::CRequestStateGuard(CWorkerNodeJobContext& job_context) : 00341 m_JobContext(job_context) 00342 { 00343 CRequestContext& request_context = CDiagContext::GetRequestContext(); 00344 00345 request_context.SetRequestID((int) job_context.GetJobNumber()); 00346 00347 const CNetScheduleJob& job = job_context.GetJob(); 00348 00349 if (!job.client_ip.empty()) 00350 request_context.SetClientIP(job.client_ip); 00351 00352 if (!job.session_id.empty()) 00353 request_context.SetSessionID(job.session_id); 00354 00355 request_context.SetAppState(eDiagAppState_RequestBegin); 00356 00357 if (s_ReqEventsDisabled == CGridWorkerNode::eEnableStartStop) 00358 GetDiagContext().PrintRequestStart().Print("jid", job.job_id); 00359 00360 request_context.SetAppState(eDiagAppState_Request); 00361 } 00362 00363 CRequestStateGuard::~CRequestStateGuard() 00364 { 00365 CRequestContext& request_context = CDiagContext::GetRequestContext(); 00366 00367 request_context.SetAppState(eDiagAppState_RequestEnd); 
00368 00369 if (!request_context.IsSetRequestStatus()) 00370 request_context.SetRequestStatus( 00371 m_JobContext.GetCommitStatus() == CWorkerNodeJobContext::eDone && 00372 m_JobContext.GetJob().ret_code == 0 ? 200 : 500); 00373 00374 switch (request_context.GetAppState()) { 00375 case eDiagAppState_Request: 00376 request_context.SetAppState(eDiagAppState_RequestEnd); 00377 /* FALL THROUGH */ 00378 00379 default: 00380 switch (s_ReqEventsDisabled) { 00381 case CGridWorkerNode::eEnableStartStop: 00382 case CGridWorkerNode::eDisableStartOnly: 00383 GetDiagContext().PrintRequestStop(); 00384 00385 default: 00386 break; 00387 } 00388 request_context.SetAppState(eDiagAppState_NotSet); 00389 request_context.UnsetSessionID(); 00390 request_context.UnsetClientIP(); 00391 } 00392 } 00393 00394 void CGridThreadContext::RunJob(CWorkerNodeJobContext& job_context) 00395 { 00396 _ASSERT(!m_JobContext); 00397 m_JobContext = &job_context; 00398 job_context.SetThreadContext(this); 00399 00400 CGridDebugContext* debug_context = CGridDebugContext::GetInstance(); 00401 if (debug_context) { 00402 debug_context->DumpInput(m_JobContext->GetJobInput(), 00403 m_JobContext->GetJobNumber()); 00404 } 00405 m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext, 00406 IWorkerNodeJobWatcher::eJobStarted); 00407 00408 {{ // CRequestStateGuard scope. 
00409 00410 CRequestStateGuard request_state_guard(job_context); 00411 00412 try { 00413 CRef<IWorkerNodeJob> job(GetJob()); 00414 try { 00415 job_context.SetJobRetCode(job->Do(job_context)); 00416 } catch (CGridWorkerNodeException& ex) { 00417 if (ex.GetErrCode() != 00418 CGridWorkerNodeException::eExclusiveModeIsAlreadySet) { 00419 try { 00420 CloseStreams(); 00421 } 00422 NCBI_CATCH_ALL_X(11, "Could not close IO streams"); 00423 throw; 00424 } 00425 00426 if (job_context.IsLogRequested()) { 00427 LOG_POST_X(21, "Job " << job_context.GetJobKey() << 00428 " has been returned back to the queue " 00429 "because it requested exclusive mode but " 00430 "another job already has the exclusive status."); 00431 } 00432 } 00433 CloseStreams(); 00434 unsigned try_count = 0; 00435 for (;;) { 00436 try { 00437 switch (job_context.GetCommitStatus()) { 00438 case CWorkerNodeJobContext::eDone: 00439 PutResult(); 00440 break; 00441 00442 case CWorkerNodeJobContext::eFailure: 00443 PutFailure(); 00444 break; 00445 00446 case CWorkerNodeJobContext::eNotCommitted: 00447 if (!TWorkerNode_AllowImplicitJobReturn:: 00448 GetDefault() && 00449 job_context.GetShutdownLevel() == 00450 CNetScheduleAdmin::eNoShutdown) { 00451 job_context.m_Job.error_msg = 00452 "Job was not explicitly committed"; 00453 PutFailure(); 00454 break; 00455 } 00456 /* FALL THROUGH */ 00457 00458 case CWorkerNodeJobContext::eReturn: 00459 ReturnJob(); 00460 break; 00461 00462 case CWorkerNodeJobContext::eCanceled: 00463 ERR_POST("Job " << job_context.GetJobKey() << 00464 " has been canceled"); 00465 } 00466 break; 00467 } catch (CNetServiceException& ex) { 00468 if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) 00469 throw; 00470 ERR_POST_X(22, "Communication Error : " << ex.what()); 00471 SleepMilliSec(s_GetRetryDelay()); 00472 } 00473 } 00474 00475 if (!CGridGlobals::GetInstance().IsShuttingDown()) 00476 static_cast<CWorkerNodeJobCleanup*>( 00477 job_context.GetCleanupEventSource())->CallEventHandlers(); 
00478 } 00479 catch (CNetScheduleException& e) { 00480 ERR_POST_X(20, m_JobContext->GetJobKey() << 00481 " Error in Job execution: " << e.what()); 00482 if (e.GetErrCode() != CNetScheduleException::eJobNotFound) 00483 PutFailureAndIgnoreErrors(e.what()); 00484 } 00485 catch (exception& ex) { 00486 ERR_POST_X(18, m_JobContext->GetJobKey() << 00487 " Error in Job execution: " << ex.what()); 00488 PutFailureAndIgnoreErrors(ex.what()); 00489 } 00490 00491 m_JobContext->GetWorkerNode().x_NotifyJobWatcher(*m_JobContext, 00492 IWorkerNodeJobWatcher::eJobStopped); 00493 00494 }} // Call CRequestStateGuard destructor before job_context is reset. 00495 00496 m_JobContext->SetThreadContext(NULL); 00497 m_JobContext = NULL; 00498 } 00499 00500 void CWorkerNodeRequest::x_HandleProcessError(exception* ex) 00501 { 00502 string msg = " Error during job run"; 00503 if (ex) { 00504 msg += ": "; 00505 msg += ex->what(); 00506 } 00507 ERR_POST_X(23, msg); 00508 try { 00509 const CNetScheduleJob& job = m_JobContext->GetJob(); 00510 m_JobContext->GetWorkerNode().x_ReturnJob(job.job_id, job.auth_token); 00511 } catch (exception& ex1) { 00512 ERR_POST_X(24, "Could not return job back to queue: " << ex1.what()); 00513 } 00514 } 00515 00516 void CWorkerNodeRequest::Process() 00517 { 00518 try { 00519 CGridThreadContext* thread_context = s_tls.GetValue(); 00520 if (!thread_context) { 00521 thread_context = 00522 new CGridThreadContext(m_JobContext->GetWorkerNode()); 00523 s_tls.SetValue(thread_context, s_TlsCleanup); 00524 } 00525 thread_context->RunJob(*m_JobContext); 00526 } 00527 catch (exception& ex) { x_HandleProcessError(&ex); } 00528 } 00529 00530 ///////////////////////////////////////////////////////////////////////////// 00531 // 00532 // CWorkerNodeJobWatchers 00533 /// @internal 00534 00535 class CWorkerNodeJobWatchers : public IWorkerNodeJobWatcher 00536 { 00537 public: 00538 virtual ~CWorkerNodeJobWatchers() {}; 00539 virtual void Notify(const CWorkerNodeJobContext& job, EEvent 
event) 00540 { 00541 NON_CONST_ITERATE(TCont, it, m_Watchers) { 00542 IWorkerNodeJobWatcher* watcher = 00543 const_cast<IWorkerNodeJobWatcher*>(it->first); 00544 watcher->Notify(job, event); 00545 } 00546 } 00547 00548 void AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher, EOwnership owner) 00549 { 00550 TCont::const_iterator it = m_Watchers.find(&job_watcher); 00551 if (it == m_Watchers.end()) { 00552 if (owner == eTakeOwnership) 00553 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>(&job_watcher); 00554 else 00555 m_Watchers[&job_watcher] = AutoPtr<IWorkerNodeJobWatcher>(); 00556 } 00557 } 00558 00559 private: 00560 typedef map<IWorkerNodeJobWatcher*, AutoPtr<IWorkerNodeJobWatcher> > TCont; 00561 TCont m_Watchers; 00562 }; 00563 00564 ///////////////////////////////////////////////////////////////////////////// 00565 // 00566 // CGridControleThread 00567 /// @internal 00568 class CGridControlThread : public CThread 00569 { 00570 public: 00571 CGridControlThread(CGridWorkerNode* worker_node, 00572 unsigned int start_port, unsigned int end_port) : m_Control( 00573 new CWorkerNodeControlServer(worker_node, start_port, end_port)) {} 00574 00575 void Prepare() { m_Control->StartListening(); } 00576 00577 unsigned short GetControlPort() { return m_Control->GetControlPort(); } 00578 void Stop() { if (m_Control.get()) m_Control->RequestShutdown(); } 00579 00580 protected: 00581 virtual void* Main(void) 00582 { 00583 m_Control->Run(); 00584 return NULL; 00585 } 00586 virtual void OnExit(void) 00587 { 00588 CThread::OnExit(); 00589 CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate); 00590 LOG_POST_X(46, Info << "Control Thread has been stopped."); 00591 } 00592 00593 private: 00594 auto_ptr<CWorkerNodeControlServer> m_Control; 00595 }; 00596 00597 class CGridCleanupThread : public CThread 00598 { 00599 public: 00600 CGridCleanupThread(CGridWorkerNode* worker_node, 00601 IGridWorkerNodeApp_Listener* listener) : 00602 
m_WorkerNode(worker_node), 00603 m_Listener(listener), 00604 m_Semaphore(0, 1) 00605 { 00606 } 00607 00608 bool Wait(unsigned seconds) {return m_Semaphore.TryWait(seconds);} 00609 00610 protected: 00611 virtual void* Main(); 00612 00613 private: 00614 CGridWorkerNode* m_WorkerNode; 00615 IGridWorkerNodeApp_Listener* m_Listener; 00616 CSemaphore m_Semaphore; 00617 }; 00618 00619 void* CGridCleanupThread::Main() 00620 { 00621 m_WorkerNode->GetCleanupEventSource()->CallEventHandlers(); 00622 m_Listener->OnGridWorkerStop(); 00623 m_Semaphore.Post(); 00624 00625 return NULL; 00626 } 00627 00628 ///////////////////////////////////////////////////////////////////////////// 00629 // 00630 // CWorkerNodeIdleThread -- 00631 class CWorkerNodeIdleThread : public CThread 00632 { 00633 public: 00634 CWorkerNodeIdleThread(IWorkerNodeIdleTask*, 00635 CGridWorkerNode& worker_node, 00636 unsigned run_delay, 00637 unsigned int auto_shutdown); 00638 00639 void RequestShutdown() 00640 { 00641 m_ShutdownFlag = true; 00642 m_Wait1.Post(); 00643 m_Wait2.Post(); 00644 } 00645 void Schedule() 00646 { 00647 CFastMutexGuard guard(m_Mutext); 00648 m_AutoShutdownSW.Restart(); 00649 if (m_StopFlag) { 00650 m_StopFlag = false; 00651 m_Wait1.Post(); 00652 } 00653 } 00654 void Suspend() 00655 { 00656 CFastMutexGuard guard(m_Mutext); 00657 m_AutoShutdownSW.Restart(); 00658 m_AutoShutdownSW.Stop(); 00659 if (!m_StopFlag) { 00660 m_StopFlag = true; 00661 m_Wait2.Post(); 00662 } 00663 } 00664 00665 bool IsShutdownRequested() const { return m_ShutdownFlag; } 00666 00667 00668 protected: 00669 virtual void* Main(void); 00670 virtual void OnExit(void); 00671 00672 CWorkerNodeIdleTaskContext& GetContext(); 00673 00674 private: 00675 00676 unsigned int x_GetInterval() const 00677 { 00678 CFastMutexGuard guard(m_Mutext); 00679 return m_AutoShutdown > 0 ? 
00680 min( m_AutoShutdown - (unsigned int)m_AutoShutdownSW.Elapsed(), m_RunInterval ) 00681 : m_RunInterval; 00682 } 00683 bool x_GetStopFlag() const 00684 { 00685 CFastMutexGuard guard(m_Mutext); 00686 return m_StopFlag; 00687 } 00688 bool x_IsAutoShutdownTime() const 00689 { 00690 CFastMutexGuard guard(m_Mutext); 00691 return m_AutoShutdown > 0 ? m_AutoShutdownSW.Elapsed() > m_AutoShutdown : false; 00692 } 00693 00694 IWorkerNodeIdleTask* m_Task; 00695 CGridWorkerNode& m_WorkerNode; 00696 auto_ptr<CWorkerNodeIdleTaskContext> m_TaskContext; 00697 mutable CSemaphore m_Wait1; 00698 mutable CSemaphore m_Wait2; 00699 volatile bool m_StopFlag; 00700 volatile bool m_ShutdownFlag; 00701 unsigned int m_RunInterval; 00702 unsigned int m_AutoShutdown; 00703 CStopWatch m_AutoShutdownSW; 00704 mutable CFastMutex m_Mutext; 00705 00706 CWorkerNodeIdleThread(const CWorkerNodeIdleThread&); 00707 CWorkerNodeIdleThread& operator=(const CWorkerNodeIdleThread&); 00708 }; 00709 00710 CWorkerNodeIdleThread::CWorkerNodeIdleThread(IWorkerNodeIdleTask* task, 00711 CGridWorkerNode& worker_node, 00712 unsigned run_delay, 00713 unsigned int auto_shutdown) 00714 : m_Task(task), m_WorkerNode(worker_node), 00715 m_Wait1(0,100000), m_Wait2(0,1000000), 00716 m_StopFlag(false), m_ShutdownFlag(false), 00717 m_RunInterval(run_delay), 00718 m_AutoShutdown(auto_shutdown), m_AutoShutdownSW(CStopWatch::eStart) 00719 { 00720 } 00721 void* CWorkerNodeIdleThread::Main() 00722 { 00723 while (!m_ShutdownFlag) { 00724 if ( x_IsAutoShutdownTime() ) { 00725 LOG_POST_X(47, Info << 00726 "There are no more jobs to be done. Exiting."); 00727 CGridGlobals::GetInstance().RequestShutdown(CNetScheduleAdmin::eShutdownImmediate); 00728 break; 00729 } 00730 unsigned int interval = m_AutoShutdown > 0 ? 
min (m_RunInterval,m_AutoShutdown) : m_RunInterval; 00731 if (m_Wait1.TryWait(interval, 0)) { 00732 if (m_ShutdownFlag) 00733 continue; 00734 interval = x_GetInterval(); 00735 if (m_Wait2.TryWait(interval, 0)) { 00736 continue; 00737 } 00738 } 00739 if (m_Task && !x_GetStopFlag()) { 00740 try { 00741 do { 00742 if ( x_IsAutoShutdownTime() ) { 00743 LOG_POST_X(48, Info << 00744 "There are no more jobs to be done. Exiting."); 00745 CGridGlobals::GetInstance().RequestShutdown( 00746 CNetScheduleAdmin::eShutdownImmediate); 00747 m_ShutdownFlag = true; 00748 break; 00749 } 00750 GetContext().Reset(); 00751 m_Task->Run(GetContext()); 00752 } while( GetContext().NeedRunAgain() && !m_ShutdownFlag); 00753 } NCBI_CATCH_ALL_X(58, 00754 "CWorkerNodeIdleThread::Main: Idle Task failed"); 00755 } 00756 } 00757 return 0; 00758 } 00759 00760 void CWorkerNodeIdleThread::OnExit(void) 00761 { 00762 LOG_POST_X(49, Info << "Idle Thread has been stopped."); 00763 } 00764 00765 CWorkerNodeIdleTaskContext& CWorkerNodeIdleThread::GetContext() 00766 { 00767 if (!m_TaskContext.get()) 00768 m_TaskContext.reset(new CWorkerNodeIdleTaskContext(*this)); 00769 return *m_TaskContext; 00770 } 00771 00772 ///////////////////////////////////////////////////////////////////////////// 00773 // 00774 // CWorkerNodeIdleTaskContext -- 00775 CWorkerNodeIdleTaskContext:: 00776 CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread& thread) 00777 : m_Thread(thread), m_RunAgain(false) 00778 { 00779 } 00780 bool CWorkerNodeIdleTaskContext::IsShutdownRequested() const 00781 { 00782 return m_Thread.IsShutdownRequested(); 00783 } 00784 void CWorkerNodeIdleTaskContext::Reset() 00785 { 00786 m_RunAgain = false; 00787 } 00788 00789 void CWorkerNodeIdleTaskContext::RequestShutdown() 00790 { 00791 m_Thread.RequestShutdown(); 00792 CGridGlobals::GetInstance(). 
00793 RequestShutdown(CNetScheduleAdmin::eShutdownImmediate); 00794 } 00795 00796 ///////////////////////////////////////////////////////////////////////////// 00797 // 00798 // CIdleWatcher 00799 /// @internal 00800 class CIdleWatcher : public IWorkerNodeJobWatcher 00801 { 00802 public: 00803 CIdleWatcher(CWorkerNodeIdleThread& idle) 00804 : m_Idle(idle), m_RunningJobs(0) {} 00805 virtual ~CIdleWatcher() {}; 00806 virtual void Notify(const CWorkerNodeJobContext& job, EEvent event) 00807 { 00808 if (event == eJobStarted) { 00809 ++m_RunningJobs; 00810 m_Idle.Suspend(); 00811 } else if (event == eJobStopped) { 00812 --m_RunningJobs; 00813 if (m_RunningJobs == 0) 00814 m_Idle.Schedule(); 00815 } 00816 } 00817 00818 private: 00819 CWorkerNodeIdleThread& m_Idle; 00820 volatile int m_RunningJobs; 00821 }; 00822 00823 00824 ///////////////////////////////////////////////////////////////////////////// 00825 // 00826 00827 CGridWorkerNode::CGridWorkerNode(CNcbiApplication& app, 00828 IWorkerNodeJobFactory* job_factory) : 00829 m_JobFactory(job_factory), 00830 m_MaxThreads(1), 00831 m_NSTimeout(30), 00832 m_CheckStatusPeriod(2), 00833 m_ExclusiveJobSemaphore(1, 1), 00834 m_IsProcessingExclusiveJob(false), 00835 m_TotalMemoryLimit(0), 00836 m_TotalTimeLimit(0), 00837 m_StartupTime(0), 00838 m_CleanupEventSource(new CWorkerNodeCleanup()), 00839 m_Listener(new CGridWorkerNodeApp_Listener()), 00840 m_App(app), 00841 m_SingleThreadForced(false) 00842 { 00843 if (!m_JobFactory.get()) 00844 NCBI_THROW(CGridWorkerNodeException, 00845 eJobFactoryIsNotSet, "The JobFactory is not set."); 00846 } 00847 00848 CGridWorkerNode::~CGridWorkerNode() 00849 { 00850 } 00851 00852 void CGridWorkerNode::Init(bool default_merge_lines_value) 00853 { 00854 IRWRegistry& reg = m_App.GetConfig(); 00855 00856 if (reg.GetBool("log", "merge_lines", default_merge_lines_value)) { 00857 SetDiagPostFlag(eDPF_PreMergeLines); 00858 SetDiagPostFlag(eDPF_MergeLines); 00859 } 00860 00861 
reg.Set(kNetScheduleAPIDriverName, "discover_low_priority_servers", "true"); 00862 00863 m_NetScheduleAPI = CNetScheduleAPI(reg); 00864 m_NetCacheAPI = CNetCacheAPI(reg); 00865 } 00866 00867 const char* kServerSec = "server"; 00868 00869 int CGridWorkerNode::Run() 00870 { 00871 LOG_POST_X(50, Info << GetJobFactory().GetJobVersion() << 00872 " build " WN_BUILD_DATE); 00873 00874 const IRegistry& reg = m_App.GetConfig(); 00875 CConfig conf(reg); 00876 00877 unsigned init_threads = 1; 00878 00879 if (!m_SingleThreadForced) { 00880 string max_threads = reg.GetString(kServerSec, "max_threads", "auto"); 00881 if (NStr::CompareNocase(max_threads, "auto") == 0) 00882 m_MaxThreads = GetCpuCount(); 00883 else { 00884 try { 00885 m_MaxThreads = NStr::StringToUInt(max_threads); 00886 } 00887 catch (exception&) { 00888 m_MaxThreads = GetCpuCount(); 00889 ERR_POST_X(51, "Could not convert [" << kServerSec << 00890 "] max_threads parameter to number.\n" 00891 "Using \'auto\' option (" << m_MaxThreads << ")."); 00892 } 00893 } 00894 init_threads = 00895 reg.GetInt(kServerSec, "init_threads", 1, 0, IRegistry::eReturn); 00896 } 00897 if (init_threads > m_MaxThreads) 00898 init_threads = m_MaxThreads; 00899 00900 m_NSTimeout = reg.GetInt(kServerSec, 00901 "job_wait_timeout", 30, 0, IRegistry::eReturn); 00902 00903 unsigned thread_pool_timeout = reg.GetInt(kServerSec, 00904 "thread_pool_timeout", 30, 0, IRegistry::eReturn); 00905 00906 const CArgs& args = m_App.GetArgs(); 00907 00908 unsigned int start_port, end_port; 00909 00910 string sport1, sport2; 00911 NStr::SplitInTwo(args["control_port"] ? args["control_port"].AsString() : 00912 reg.GetString(kServerSec, "control_port", "9300"), "-", sport1, sport2); 00913 start_port = NStr::StringToUInt(sport1); 00914 end_port = sport2.empty() ? 
start_port : NStr::StringToUInt(sport2); 00915 00916 bool log_requested = reg.GetBool(kServerSec, 00917 "log", false, 0, IRegistry::eReturn); 00918 00919 unsigned int idle_run_delay = 00920 reg.GetInt(kServerSec,"idle_run_delay",30,0,IRegistry::eReturn); 00921 00922 unsigned int auto_shutdown = 00923 reg.GetInt(kServerSec,"auto_shutdown_if_idle",0,0,IRegistry::eReturn); 00924 00925 unsigned int infinite_loop_time = 00926 reg.HasEntry(kServerSec, "infinite_loop_time") ? 00927 reg.GetInt(kServerSec, "infinite_loop_time", 0, 0, IRegistry::eReturn) : 00928 reg.GetInt(kServerSec, "infinit_loop_time", 0, 0, IRegistry::eReturn); 00929 00930 bool reuse_job_object = 00931 reg.GetBool(kServerSec, "reuse_job_object", false, 0, 00932 CNcbiRegistry::eReturn); 00933 00934 unsigned int max_total_jobs = 00935 reg.GetInt(kServerSec,"max_total_jobs",0,0,IRegistry::eReturn); 00936 00937 unsigned int max_failed_jobs = 00938 reg.GetInt(kServerSec,"max_failed_jobs",0,0,IRegistry::eReturn); 00939 00940 bool is_daemon = 00941 reg.GetBool(kServerSec, "daemon", false, 0, CNcbiRegistry::eReturn); 00942 00943 {{ 00944 string memlimitstr = 00945 reg.GetString(kServerSec,"total_memory_limit","",IRegistry::eReturn); 00946 if (!memlimitstr.empty()) { 00947 m_TotalMemoryLimit = NStr::StringToUInt8_DataSize(memlimitstr); 00948 } 00949 }} 00950 00951 m_TotalTimeLimit = 00952 reg.GetInt(kServerSec,"total_time_limit", 0, 0, IRegistry::eReturn); 00953 m_StartupTime = time(0); 00954 00955 vector<string> vhosts; 00956 00957 NStr::Tokenize(reg.GetString(kServerSec, 00958 "master_nodes", kEmptyStr), " ;,", vhosts); 00959 00960 ITERATE(vector<string>, it, vhosts) { 00961 string host, port; 00962 NStr::SplitInTwo(NStr::TruncateSpaces(*it), ":", host, port); 00963 if (host.empty() || port.empty()) 00964 continue; 00965 m_Masters.insert(SServerAddress(NStr::ToLower(host), 00966 (unsigned short) NStr::StringToUInt(port))); 00967 } 00968 00969 vhosts.clear(); 00970 00971 NStr::Tokenize(reg.GetString(kServerSec, 
00972 "admin_hosts", kEmptyStr), " ;,", vhosts); 00973 00974 ITERATE(vector<string>, it, vhosts) { 00975 unsigned int ha = CSocketAPI::gethostbyname(*it); 00976 if (ha != 0) 00977 m_AdminHosts.insert(ha); 00978 } 00979 00980 m_CheckStatusPeriod = 00981 reg.GetInt(kServerSec, "check_status_period", 2, 0, IRegistry::eReturn); 00982 00983 if (reg.HasEntry(kServerSec,"wait_server_timeout")) { 00984 ERR_POST_X(52, "[" << kServerSec << 00985 "] \"wait_server_timeout\" is not used anymore.\n" 00986 "Use [" << kNetScheduleAPIDriverName << 00987 "] \"communication_timeout\" parameter instead."); 00988 } 00989 00990 CGridDebugContext::eMode debug_mode = CGridDebugContext::eGDC_NoDebug; 00991 string dbg_mode = reg.GetString("gw_debug", "mode", kEmptyStr); 00992 if (NStr::CompareNocase(dbg_mode, "gather")==0) { 00993 debug_mode = CGridDebugContext::eGDC_Gather; 00994 } else if (NStr::CompareNocase(dbg_mode, "execute")==0) { 00995 debug_mode = CGridDebugContext::eGDC_Execute; 00996 } 00997 if (debug_mode != CGridDebugContext::eGDC_NoDebug) { 00998 CGridDebugContext& debug_context = CGridDebugContext::Create( 00999 debug_mode, m_NetCacheAPI); 01000 string run_name = 01001 reg.GetString("gw_debug", "run_name", m_App.GetProgramDisplayName()); 01002 debug_context.SetRunName(run_name); 01003 if (debug_mode == CGridDebugContext::eGDC_Gather) { 01004 max_total_jobs = 01005 reg.GetInt("gw_debug","gather_nrequests",1,0,IRegistry::eReturn); 01006 } else if (debug_mode == CGridDebugContext::eGDC_Execute) { 01007 string files = 01008 reg.GetString("gw_debug", "execute_requests", kEmptyStr); 01009 max_total_jobs = 0; 01010 debug_context.SetExecuteList(files); 01011 } 01012 is_daemon = false; 01013 } 01014 01015 #if defined(NCBI_OS_UNIX) 01016 if (is_daemon) { 01017 LOG_POST_X(53, "Entering UNIX daemon mode..."); 01018 bool daemon = CProcess::Daemonize("/dev/null", 01019 CProcess::fDontChroot | 01020 CProcess::fKeepStdin | 01021 CProcess::fKeepStdout); 01022 if (!daemon) { 01023 return 0; 
01024 } 01025 } 01026 #endif 01027 01028 AttachJobWatcher(CGridGlobals::GetInstance().GetJobsWatcher()); 01029 01030 CRef<CGridControlThread> control_thread( 01031 new CGridControlThread(this, start_port, end_port)); 01032 01033 m_NetScheduleAPI.SetAuthParam("control_port", 01034 NStr::NumericToString(control_thread->GetControlPort())); 01035 m_NetScheduleAPI.SetAuthParam("client_host", CSocketAPI::gethostname()); 01036 01037 try { 01038 control_thread->Prepare(); 01039 } 01040 catch (CServer_Exception& e) { 01041 if (e.GetErrCode() != CServer_Exception::eCouldntListen) 01042 throw; 01043 NCBI_THROW_FMT(CGridWorkerNodeException, ePortBusy, 01044 "Couldn't start a listener on a port from the specified " 01045 "control port range; last port tried: " << 01046 control_thread->GetControlPort() << ". Another process " 01047 "(probably another instance of this worker node) is occupying " 01048 "the port(s)."); 01049 } 01050 01051 if (m_NetScheduleAPI->m_ClientNode.empty()) { 01052 string client_node(m_NetScheduleAPI-> 01053 m_Service->m_ServerPool->m_ClientName); 01054 client_node.append(2, ':'); 01055 client_node.append(CSocketAPI::gethostname()); 01056 client_node.append(1, ':'); 01057 client_node.append(NStr::NumericToString( 01058 control_thread->GetControlPort())); 01059 m_NetScheduleAPI.SetClientNode(client_node); 01060 01061 m_NetScheduleAPI.SetClientSession(GetDiagContext().GetStringUID()); 01062 } 01063 01064 m_NSExecutor = m_NetScheduleAPI.GetExecutor(); 01065 01066 m_NetScheduleAPI.SetProgramVersion(m_JobFactory->GetJobVersion()); 01067 01068 CGridGlobals::GetInstance().SetReuseJobObject(reuse_job_object); 01069 CGridGlobals::GetInstance().GetJobsWatcher().SetMaxJobsAllowed(max_total_jobs); 01070 CGridGlobals::GetInstance().GetJobsWatcher().SetMaxFailuresAllowed(max_failed_jobs); 01071 CGridGlobals::GetInstance().GetJobsWatcher().SetInfinitLoopTime(infinite_loop_time); 01072 CGridGlobals::GetInstance().SetWorker(*this); 01073 01074 IWorkerNodeIdleTask* task = 
NULL; 01075 if (idle_run_delay > 0) 01076 task = GetJobFactory().GetIdleTask(); 01077 if (task || auto_shutdown > 0 ) { 01078 m_IdleThread.Reset(new CWorkerNodeIdleThread(task, *this, 01079 task ? idle_run_delay : auto_shutdown, 01080 auto_shutdown)); 01081 m_IdleThread->Run(); 01082 AttachJobWatcher(*(new CIdleWatcher(*m_IdleThread)), eTakeOwnership); 01083 } 01084 01085 control_thread->Run(); 01086 01087 LOG_POST_X(54, Info << "\n=================== NEW RUN : " << 01088 CGridGlobals::GetInstance().GetStartTime().AsString() << 01089 " ===================\n" << 01090 GetJobFactory().GetJobVersion() << " build " WN_BUILD_DATE << 01091 " is started.\n" 01092 "Waiting for control commands on " << CSocketAPI::gethostname() << 01093 ":" << control_thread->GetControlPort() << "\n" 01094 "Queue name: " << GetQueueName() << "\n" 01095 "Maximum job threads: " << m_MaxThreads << "\n"); 01096 01097 m_Listener->OnGridWorkerStart(); 01098 01099 auto_ptr<CGridThreadContext> single_thread_context; 01100 auto_ptr<CStdPoolOfThreads> thread_pool; 01101 01102 _ASSERT(m_MaxThreads > 0); 01103 01104 if (m_MaxThreads <= 1) 01105 single_thread_context.reset(new CGridThreadContext(*this)); 01106 else { 01107 thread_pool.reset(new CStdPoolOfThreads(m_MaxThreads, 0)); 01108 try { 01109 thread_pool->Spawn(init_threads); 01110 } 01111 catch (exception& ex) { 01112 ERR_POST_X(26, ex.what()); 01113 CGridGlobals::GetInstance().RequestShutdown( 01114 CNetScheduleAdmin::eShutdownImmediate); 01115 return 2; 01116 } 01117 } 01118 01119 CNetScheduleJob job; 01120 01121 unsigned try_count = 0; 01122 while (!CGridGlobals::GetInstance().IsShuttingDown()) { 01123 try { 01124 01125 if (m_MaxThreads > 1) { 01126 try { 01127 thread_pool->WaitForRoom(thread_pool_timeout); 01128 } catch (CBlockingQueueException&) { 01129 // threaded pool is busy 01130 continue; 01131 } 01132 } 01133 01134 if (x_GetNextJob(job)) { 01135 auto_ptr<CWorkerNodeJobContext> 01136 job_context(new CWorkerNodeJobContext(*this, 01137 
job, 01138 log_requested)); 01139 job_context->Reset(job); 01140 if (m_MaxThreads > 1) { 01141 try { 01142 thread_pool->AcceptRequest(CRef<CStdRequest>( 01143 new CWorkerNodeRequest(job_context))); 01144 } catch (CBlockingQueueException& ex) { 01145 ERR_POST_X(28, ex.what()); 01146 // that must not happen after CBlockingQueue is fixed 01147 _ASSERT(0); 01148 x_ReturnJob(job.job_id, job.auth_token); 01149 } 01150 } else { 01151 try { 01152 single_thread_context->RunJob(*job_context); 01153 } catch (exception&) { 01154 x_ReturnJob(job.job_id, job.auth_token); 01155 throw; 01156 } 01157 } 01158 } 01159 } catch (CNetServiceException& ex) { 01160 ERR_POST_X(40, ex.what()); 01161 if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) { 01162 CGridGlobals::GetInstance().RequestShutdown( 01163 CNetScheduleAdmin::eShutdownImmediate); 01164 } else { 01165 SleepMilliSec(s_GetRetryDelay()); 01166 continue; 01167 } 01168 } catch (exception& ex) { 01169 if (TWorkerNode_StopOnJobErrors::GetDefault()) { 01170 ERR_POST_X(29, ex.what()); 01171 CGridGlobals::GetInstance().RequestShutdown( 01172 CNetScheduleAdmin::eShutdownImmediate); 01173 } 01174 } 01175 try_count = 0; 01176 } 01177 LOG_POST_X(31, Info << "Shutting down..."); 01178 if (reg.GetBool(kServerSec, 01179 "force_exit", false, 0, CNcbiRegistry::eReturn)) { 01180 ERR_POST_X(45, "Force exit"); 01181 } else if (m_MaxThreads > 1) { 01182 try { 01183 LOG_POST_X(32, Info << "Stopping worker threads..."); 01184 thread_pool->KillAllThreads(true); 01185 thread_pool.reset(0); 01186 } 01187 catch (exception& ex) { 01188 ERR_POST_X(33, "Could not stop worker threads: " << ex.what()); 01189 } 01190 } 01191 try { 01192 GetNSExecutor().UnRegisterClient(); 01193 } 01194 catch (CNetServiceException& ex) { 01195 // if server does not understand this new command just ignore the error 01196 if (ex.GetErrCode() != CNetServiceException::eCommunicationError 01197 || NStr::Find(ex.what(),"Server error:Unknown request") == NPOS) { 01198 
ERR_POST_X(35, "Could not unregister from NetSchedule services: " 01199 << ex.what()); 01200 } 01201 } 01202 catch (exception& ex) { 01203 ERR_POST_X(36, "Could not unregister from NetSchedule services: " 01204 << ex.what()); 01205 } 01206 01207 LOG_POST_X(38, Info << "Worker Node has been stopped."); 01208 01209 01210 CRef<CGridCleanupThread> cleanup_thread( 01211 new CGridCleanupThread(this, m_Listener.get())); 01212 01213 cleanup_thread->Run(); 01214 01215 LOG_POST_X(55, Info << "Stopping Control thread..."); 01216 control_thread->Stop(); 01217 control_thread->Join(); 01218 01219 CNcbiOstrstream os; 01220 CGridGlobals::GetInstance().GetJobsWatcher().Print(os); 01221 LOG_POST_X(56, Info << string(CNcbiOstrstreamToString(os))); 01222 01223 if (m_IdleThread) { 01224 if (!m_IdleThread->IsShutdownRequested()) { 01225 LOG_POST_X(57, "Stopping Idle thread..."); 01226 m_IdleThread->RequestShutdown(); 01227 } 01228 m_IdleThread->Join(); 01229 } 01230 01231 if (cleanup_thread->Wait(thread_pool_timeout)) { 01232 cleanup_thread->Join(); 01233 LOG_POST_X(58, Info << "Cleanup thread finished"); 01234 } else { 01235 ERR_POST_X(59, "Clean-up thread timed out"); 01236 } 01237 01238 return CGridGlobals::GetInstance().GetExitCode(); 01239 } 01240
/// Request an immediate shutdown of this worker node.
///
/// Delegates to the process-wide CGridGlobals instance with
/// CNetScheduleAdmin::eShutdownImmediate; code that polls
/// CGridGlobals::IsShuttingDown() observes the request.
01241 void CGridWorkerNode::RequestShutdown() 01242 { 01243 CGridGlobals::GetInstance(). 01244 RequestShutdown(CNetScheduleAdmin::eShutdownImmediate); 01245 } 01246 01247
/// Attach a watcher to be notified of job events (see x_NotifyJobWatcher).
///
/// The CWorkerNodeJobWatchers collection is created lazily on the first
/// call. @p owner is forwarded unchanged to
/// CWorkerNodeJobWatchers::AttachJobWatcher; presumably eTakeOwnership
/// makes the collection delete the watcher -- confirm there.
///
/// NOTE(review): unlike x_NotifyJobWatcher, this does not lock
/// m_JobWatcherMutex, so it appears to rely on being called before job
/// threads start. The ';' after the closing brace is a redundant empty
/// declaration.
01248 void CGridWorkerNode::AttachJobWatcher(IWorkerNodeJobWatcher& job_watcher, 01249 EOwnership owner) 01250 { 01251 if (!m_JobWatchers.get()) 01252 m_JobWatchers.reset(new CWorkerNodeJobWatchers); 01253 m_JobWatchers->AttachJobWatcher(job_watcher, owner); 01254 }; 01255
/// Install the listener for worker-node lifecycle events (such as
/// OnGridWorkerStart()).
///
/// Passing NULL installs a default CGridWorkerNodeApp_Listener, so
/// m_Listener is never left empty. m_Listener is reset() with the pointer,
/// so the node presumably takes ownership of @p listener -- verify against
/// the member's declared smart-pointer type.
01256 void CGridWorkerNode::SetListener(IGridWorkerNodeApp_Listener* listener) 01257 { 01258 m_Listener.reset(listener ? 
listener : new CGridWorkerNodeApp_Listener()); 01259 } 01260 01261
/// Forward a job event to every attached watcher, serialized by
/// m_JobWatcherMutex. Does nothing if no watcher was ever attached
/// (m_JobWatchers still null).
01262 void CGridWorkerNode::x_NotifyJobWatcher(const CWorkerNodeJobContext& job, 01263 IWorkerNodeJobWatcher::EEvent event) 01264 { 01265 if (m_JobWatchers.get() != NULL) { 01266 CFastMutexGuard guard(m_JobWatcherMutex); 01267 m_JobWatchers->Notify(job, event); 01268 } 01269 } 01270
/// Try to obtain the next job to execute into @p job.
///
/// In gw_debug "execute" mode the job comes from the debug context; once
/// it has no more jobs, a normal shutdown is requested. Otherwise:
///   - if x_AreMastersBusy() returns false, sleep m_NSTimeout seconds and
///     report no job;
///   - wait for a running exclusive job (if any) to finish;
///   - ask the NetSchedule executor for a job (GetJob, m_NSTimeout);
///   - a job flagged eExclusiveJob is returned to the queue unless
///     exclusive mode can be entered. (`job.mask & eExclusiveJob` binds
///     tighter than `&&`, so the condition is as intended.)
/// A job obtained after shutdown has started is returned as well.
///
/// @return true if @p job now holds a job to run.
///
/// NOTE(review): if an exclusive job is returned because shutdown started,
/// EnterExclusiveMode() has already succeeded and LeaveExclusiveMode() is
/// not called here -- harmless only if nothing waits on the exclusive
/// semaphore after shutdown; confirm.
01271 bool CGridWorkerNode::x_GetNextJob(CNetScheduleJob& job) 01272 { 01273 bool job_exists = false; 01274 01275 CGridDebugContext* debug_context = CGridDebugContext::GetInstance(); 01276 01277 if (debug_context && 01278 debug_context->GetDebugMode() == CGridDebugContext::eGDC_Execute) { 01279 job_exists = debug_context->GetNextJob(job.job_id, job.input); 01280 if (!job_exists) { 01281 CGridGlobals::GetInstance(). 01282 RequestShutdown(CNetScheduleAdmin::eNormalShutdown); 01283 } 01284 } else { 01285 if (!x_AreMastersBusy()) { 01286 SleepSec(m_NSTimeout); 01287 return false; 01288 } 01289 01290 if (!WaitForExclusiveJobToFinish()) 01291 return false; 01292 01293 job_exists = GetNSExecutor().GetJob(job, m_NSTimeout); 01294 01295 if (job_exists && job.mask & CNetScheduleAPI::eExclusiveJob) { 01296 if (EnterExclusiveMode()) 01297 job_exists = true; 01298 else { 01299 x_ReturnJob(job.job_id, job.auth_token); 01300 job_exists = false; 01301 } 01302 } 01303 } 01304 if (job_exists && CGridGlobals::GetInstance().IsShuttingDown()) { 01305 x_ReturnJob(job.job_id, job.auth_token); 01306 return false; 01307 } 01308 return job_exists; 01309 } 01310
/// Give a job back to NetSchedule through the executor's ReturnJob().
/// Skipped in gw_debug "execute" mode, where jobs come from the debug
/// context rather than from the server.
01311 void CGridWorkerNode::x_ReturnJob(const string& job_key, 01312 const string& auth_token) 01313 { 01314 CGridDebugContext* debug_context = CGridDebugContext::GetInstance(); 01315 if (!debug_context || 01316 debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) { 01317 GetNSExecutor().ReturnJob(job_key, auth_token); 01318 } 01319 } 01320
/// Report @p job as failed to NetSchedule (executor PutFailure) with
/// @p reason as its error message. The message is set on a copy, so the
/// caller's job object is left unchanged. Skipped in gw_debug "execute"
/// mode.
01321 void CGridWorkerNode::x_FailJob(const CNetScheduleJob& job, const string& reason) 01322 { 01323 CGridDebugContext* debug_context = 
CGridDebugContext::GetInstance(); 01324 if (!debug_context || 01325 debug_context->GetDebugMode() != CGridDebugContext::eGDC_Execute) { 01326 CNetScheduleJob job_copy(job); 01327 job_copy.error_msg = reason; 01328 GetNSExecutor().PutFailure(job_copy); 01329 } 01330 } 01331
/// Size limit for job output that the NetSchedule server stores itself.
///
/// Returns the server's max_output_size when embedded storage is in use,
/// and 0 otherwise. The 0 presumably means output must go to external
/// storage -- confirm with callers.
01332 size_t CGridWorkerNode::GetServerOutputSize() 01333 { 01334 return m_NetScheduleAPI->m_UseEmbeddedStorage ? 01335 m_NetScheduleAPI.GetServerParams().max_output_size : 0; 01336 } 01337
/// Check whether @p host may send administrative commands.
///
/// An empty admin_hosts list allows every host. Otherwise the resolved
/// address of @p host must be in m_AdminHosts. If @p host resolves to this
/// machine's own address (gethostbyname("")), the address of "localhost"
/// is accepted as well.
01338 bool CGridWorkerNode::IsHostInAdminHostsList(const string& host) const 01339 { 01340 if (m_AdminHosts.empty()) 01341 return true; 01342 unsigned int ha = CSocketAPI::gethostbyname(host); 01343 if (m_AdminHosts.find(ha) != m_AdminHosts.end()) 01344 return true; 01345 unsigned int ha_lh = CSocketAPI::gethostbyname(""); 01346 if (ha == ha_lh) { 01347 ha = CSocketAPI::gethostbyname("localhost"); 01348 if (m_AdminHosts.find(ha) != m_AdminHosts.end()) 01349 return true; 01350 } 01351 return false; 01352 } 01353
/// Poll each configured master worker node with a GETLOAD request.
///
/// The request consists of the job version line, a "<queue>;<service>"
/// line, then "GETLOAD" plus a terminating NUL (`ends`; pcount() counts it,
/// so the NUL is sent). A reply "OK:<n>" with n > 0 makes this return
/// false immediately. The following do not count:
///   - masters that cannot be reached;
///   - masters that reply "ERR:..." or an unknown reply (both logged);
///   - masters that report a non-numeric load (silently ignored).
///
/// @return false if some master reported a positive load, true otherwise
///         (including when m_Masters is empty).
///
/// NOTE(review): STimeout is {sec, usec}, so the {0, 500} passed to the
/// CSocket constructor is 0.5 ms, not 500 ms -- confirm this is intended.
01354 bool CGridWorkerNode::x_AreMastersBusy() const 01355 { 01356 ITERATE(set<SServerAddress>, it, m_Masters) { 01357 STimeout tmo = {0, 500}; 01358 CSocket socket(it->host, it->port, &tmo, eOff); 01359 if (socket.GetStatus(eIO_Open) != eIO_Success) 01360 continue; 01361 01362 CNcbiOstrstream os; 01363 os << GetJobVersion() << endl; 01364 os << GetNetScheduleAPI().GetQueueName() <<";" 01365 << GetServiceName(); 01366 os << endl; 01367 os << "GETLOAD" << ends; 01368 if (socket.Write(os.str(), (size_t)os.pcount()) != eIO_Success) { 01369 os.freeze(false); 01370 continue; 01371 } 01372 os.freeze(false); 01373 string reply; 01374 if (socket.ReadLine(reply) != eIO_Success) 01375 continue; 01376 if (NStr::StartsWith(reply, "ERR:")) { 01377 string msg; 01378 NStr::Replace(reply, "ERR:", "", msg); 01379 ERR_POST_X(43, "Worker Node at " << it->AsString() << 01380 " returned error: " << msg); 01381 } else if (NStr::StartsWith(reply, "OK:")) { 01382 string msg; 01383 NStr::Replace(reply, "OK:", "", msg); 01384 
try { 01385 int load = NStr::StringToInt(msg); 01386 if (load > 0) 01387 return false; 01388 } catch (exception&) {} 01389 } else { 01390 ERR_POST_X(44, "Worker Node at " << it->AsString() << 01391 " returned unknown reply: " << reply); 01392 } 01393 } 01394 return true; 01395 } 01396
/// Try, without blocking, to take the exclusive-job slot
/// (m_ExclusiveJobSemaphore) and mark the node as processing an exclusive
/// job.
///
/// @return false if the slot is already taken. The semaphore is presumably
///         created with a count of 1; its construction is not visible here.
01397 bool CGridWorkerNode::EnterExclusiveMode() 01398 { 01399 if (m_ExclusiveJobSemaphore.TryWait()) { 01400 _ASSERT(!m_IsProcessingExclusiveJob); 01401 01402 m_IsProcessingExclusiveJob = true; 01403 return true; 01404 } 01405 return false; 01406 } 01407
/// Clear the exclusive-job flag and release the slot taken by a successful
/// EnterExclusiveMode().
01408 void CGridWorkerNode::LeaveExclusiveMode() 01409 { 01410 _ASSERT(m_IsProcessingExclusiveJob); 01411 01412 m_IsProcessingExclusiveJob = false; 01413 m_ExclusiveJobSemaphore.Post(); 01414 } 01415
/// Wait up to m_NSTimeout (the seconds argument of CSemaphore::TryWait)
/// for the exclusive-job slot to become free.
///
/// The slot is posted back immediately, so this only checks that no
/// exclusive job is running; it does not reserve the slot.
///
/// @return false if the wait timed out.
01416 bool CGridWorkerNode::WaitForExclusiveJobToFinish() 01417 { 01418 if (m_ExclusiveJobSemaphore.TryWait(m_NSTimeout)) { 01419 m_ExclusiveJobSemaphore.Post(); 01420 return true; 01421 } 01422 return false; 01423 } 01424
/// Select which default request events should not be logged.
///
/// The value is stored in s_ReqEventsDisabled rather than in a member, so
/// it presumably affects every worker node in the process.
01425 void CGridWorkerNode::DisableDefaultRequestEventLogging( 01426 CGridWorkerNode::EDisabledRequestEvents disabled_events) 01427 { 01428 s_ReqEventsDisabled = disabled_events; 01429 } 01430
/// Return the event source (m_CleanupEventSource) with which cleanup
/// listeners can be added or removed. Presumably the node keeps ownership
/// of the returned object.
01431 IWorkerNodeCleanupEventSource* CGridWorkerNode::GetCleanupEventSource() 01432 { 01433 return m_CleanupEventSource; 01434 } 01435 01436 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 12:58:07 2012 by modify_doxy.py rev. 337098