|
NCBI C++ ToolKit
|
00001 /* $Id: thread_pool.cpp 53797 2012-04-18 19:34:34Z ivanovp $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Author: Pavel Ivanov 00027 * 00028 * File Description: 00029 * Pool of threads. 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 #include <util/thread_pool.hpp> 00034 #include <util/thread_pool_ctrl.hpp> 00035 #include <util/sync_queue.hpp> 00036 #include <util/error_codes.hpp> 00037 00038 #define NCBI_USE_ERRCODE_X Util_Thread 00039 00040 BEGIN_NCBI_SCOPE 00041 00042 00043 class CThreadPool_Guard; 00044 class CThreadPool_ServiceThread; 00045 00046 00047 /// Functor to compare tasks by priority 00048 struct SThreadPool_TaskCompare { 00049 bool operator() (const CRef<CThreadPool_Task>& left, 00050 const CRef<CThreadPool_Task>& right) 00051 { 00052 return left->GetPriority() < right->GetPriority(); 00053 } 00054 }; 00055 00056 00057 /// Real implementation of all ThreadPool functions 00058 class CThreadPool_Impl : public CObject 00059 { 00060 public: 00061 typedef CThreadPool::TExclusiveFlags TExclusiveFlags; 00062 00063 /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl 00064 /// object. Can be done only here to avoid excessive friendship to 00065 /// CThreadPool class. 00066 static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool); 00067 00068 /// Call x_SetTaskStatus() for the given task. 00069 /// Method introduced to avoid excessive friendship to CThreadPool_Task 00070 /// class. 00071 /// 00072 /// @sa CThreadPool_Task::x_SetTaskStatus() 00073 static void sx_SetTaskStatus(CThreadPool_Task* task, 00074 CThreadPool_Task::EStatus status); 00075 00076 /// Call x_RequestToCancel() for the given task. 00077 /// Method introduced to avoid excessive friendship to CThreadPool_Task 00078 /// class. 00079 /// 00080 /// @sa CThreadPool_Task::x_RequestToCancel() 00081 static void sx_RequestToCancel(CThreadPool_Task* task); 00082 00083 00084 /// Constructor with default controller 00085 /// @param pool_intf 00086 /// ThreadPool interface object attached to this implementation 00087 /// 00088 /// @sa CThreadPool::CThreadPool() 00089 CThreadPool_Impl(CThreadPool* pool_intf, 00090 unsigned int queue_size, 00091 unsigned int max_threads, 00092 unsigned int min_threads, 00093 CThread::TRunMode threads_mode = CThread::fRunDefault); 00094 00095 /// Constructor with explicitly given controller 00096 /// @param pool_intf 00097 /// ThreadPool interface object attached to this implementation 00098 /// 00099 /// @sa CThreadPool::CThreadPool() 00100 CThreadPool_Impl(CThreadPool* pool_intf, 00101 unsigned int queue_size, 00102 CThreadPool_Controller* controller, 00103 CThread::TRunMode threads_mode = CThread::fRunDefault); 00104 00105 /// Get pointer to ThreadPool interface object 00106 CThreadPool* GetPoolInterface(void) const; 00107 00108 /// Set destroy timeout for the pool 00109 /// 00110 /// @sa CThreadPool::SetDestroyTimeout() 00111 void SetDestroyTimeout(const CTimeSpan& timeout); 00112 00113 /// Get destroy timeout for the pool 00114 /// 00115 /// @sa CThreadPool::GetDestroyTimeout() 00116 const CTimeSpan& GetDestroyTimeout(void) const; 00117 00118 /// Destroy reference to this object 00119 /// Method is called when CThreadPool object is destroyed which means 00120 /// that implementation can be destroyed too if there is no references 00121 /// to it left. 00122 void DestroyReference(void); 00123 00124 /// Get main pool mutex 00125 /// 00126 /// @sa CThreadPool::GetMainPoolMutex() 00127 CMutex& GetMainPoolMutex(void); 00128 00129 /// Add task to the pool 00130 /// 00131 /// @sa CThreadPool::AddTask() 00132 void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout); 00133 00134 /// Request to cancel the task 00135 /// 00136 /// @sa CThreadPool::CancelTask() 00137 void CancelTask(CThreadPool_Task* task); 00138 00139 /// Cancel the selected groups of tasks in the pool 00140 /// 00141 /// @sa CThreadPool::CancelTasks() 00142 void CancelTasks(TExclusiveFlags tasks_group); 00143 00144 /// Add the task for exclusive execution in the pool 00145 /// 00146 /// @sa CThreadPool::RequestExclusiveExecution() 00147 void RequestExclusiveExecution(CThreadPool_Task* task, 00148 TExclusiveFlags flags); 00149 00150 /// Launch new threads in pool 00151 /// @param count 00152 /// Number of threads to launch 00153 void LaunchThreads(unsigned int count); 00154 00155 /// Finish threads in pool 00156 /// Stop first all idle threads then stop busy threads without 00157 /// cancelation of currently executing tasks. 00158 /// @param count 00159 /// Number of threads to finish 00160 void FinishThreads(unsigned int count); 00161 00162 /// Get number of threads running in the pool 00163 unsigned int GetThreadsCount(void) const; 00164 00165 /// Mark thread as idle or non-idle 00166 /// @param thread 00167 /// Thread to mark 00168 /// @param is_idle 00169 /// If thread should be marked as idle or not 00170 void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle); 00171 00172 /// Callback from working thread when it finished its Main() method 00173 void ThreadStopped(CThreadPool_ThreadImpl* thread); 00174 00175 /// Callback when some thread changed its idleness or finished 00176 /// (including service thread) 00177 void ThreadStateChanged(void); 00178 00179 /// Get next task from queue if there is one 00180 /// If the queue is empty then return NULL. 00181 CRef<CThreadPool_Task> TryGetNextTask(void); 00182 00183 /// Callback from thread when it is starting to execute task 00184 void TaskStarting(void); 00185 00186 /// Callback from thread when it has finished to execute task 00187 void TaskFinished(void); 00188 00189 /// Get the number of tasks currently waiting in queue 00190 unsigned int GetQueuedTasksCount(void) const; 00191 00192 /// Get the number of currently executing tasks 00193 unsigned int GetExecutingTasksCount(void) const; 00194 00195 /// Type for storing information about exclusive task launching 00196 typedef pair< TExclusiveFlags, 00197 CRef<CThreadPool_Task> > TExclusiveTaskInfo; 00198 00199 /// Get information about next exclusive task to execute 00200 TExclusiveTaskInfo TryGetExclusiveTask(void); 00201 00202 /// Request suspension of the pool 00203 /// @param flags 00204 /// Parameters for necessary exclusive execution environment 00205 void RequestSuspend(TExclusiveFlags flags); 00206 00207 /// Resume the pool operation after exclusive task execution 00208 void ResumeWork(void); 00209 00210 /// Check if the pool is suspended for exclusive execution 00211 bool IsSuspended(void) const; 00212 00213 /// Check if it is already allowed to execute exclusive task 00214 bool CanDoExclusiveTask(void) const; 00215 00216 /// Abort the pool operation 00217 /// 00218 /// @sa CThreadPool::Abort() 00219 void Abort(const CTimeSpan* timeout); 00220 00221 /// Check if the pool is already aborted 00222 bool IsAborted(void) const; 00223 00224 /// Finish all current threads and replace them with new ones 00225 /// 00226 /// @sa CThreadPool::FlushThreads() 00227 void FlushThreads(CThreadPool::EFlushType flush_type); 00228 00229 /// Call the CThreadPool_Controller::HandleEvent() method of the pool 00230 /// controller with the given event type. If ThreadPool is already aborted 00231 /// and controller is reset then do nothing. 00232 void CallController(CThreadPool_Controller::EEvent event); 00233 00234 /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther 00235 /// event type 00236 void CallControllerOther(void); 00237 00238 /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool 00239 /// controller. If ThreadPool is already aborted and controller is reset 00240 /// then return time period of 1 second. 00241 CTimeSpan GetSafeSleepTime(void) const; 00242 00243 /// Mark that initialization of the interface was finished 00244 void SetInterfaceStarted(void); 00245 00246 00247 private: 00248 /// Type of queue used for storing tasks 00249 typedef CSyncQueue< CRef<CThreadPool_Task>, 00250 CSyncQueue_multiset< CRef<CThreadPool_Task>, 00251 SThreadPool_TaskCompare > > 00252 TQueue; 00253 /// Type of queue used for storing information about exclusive tasks 00254 typedef CSyncQueue<TExclusiveTaskInfo> TExclusiveQueue; 00255 /// Type of list of all poolled threads 00256 typedef set<CThreadPool_ThreadImpl*> TThreadsList; 00257 00258 00259 /// Prohibit copying and assigning 00260 CThreadPool_Impl(const CThreadPool_Impl&); 00261 CThreadPool_Impl& operator= (const CThreadPool_Impl&); 00262 00263 /// Transform size of queue given in constructor to the size passed to 00264 /// CSyncQueue constructor. 00265 /// Method can be called only from constructor because it initializes 00266 /// value of m_IsQueueAllowed member variable. 00267 unsigned int x_GetQueueSize(unsigned int queue_size); 00268 00269 /// Initialization of all class member variables that can be initialized 00270 /// outside of constructor 00271 /// @param pool_intf 00272 /// ThreadPool interface object attached to this implementation 00273 /// @param controller 00274 /// Controller for the pool 00275 void x_Init(CThreadPool* pool_intf, 00276 CThreadPool_Controller* controller, 00277 CThread::TRunMode threads_mode); 00278 00279 /// Destructor. Will be called from CRef 00280 ~CThreadPool_Impl(void); 00281 00282 /// Delete task from the queue 00283 /// If task does not exist in queue then does nothing. 00284 void x_RemoveTaskFromQueue(const CThreadPool_Task* task); 00285 00286 /// Cancel all tasks waiting in the queue 00287 void x_CancelQueuedTasks(void); 00288 00289 /// Cancel all currently executing tasks 00290 void x_CancelExecutingTasks(void); 00291 00292 /// Type of some simple predicate 00293 /// 00294 /// @sa x_WaitForPredicate 00295 typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const; 00296 00297 /// Check if new task can be added to the pool 00298 bool x_IsNewTaskAllowed(void) const; 00299 00300 /// Check if new task can be added to the pool when queuing is disabled 00301 bool x_CanAddImmediateTask(void) const; 00302 00303 /// Check if all threads in pool finished their work 00304 bool x_HasNoThreads(void) const; 00305 00306 /// Wait for some predicate to be true 00307 /// @param wait_func 00308 /// Predicate to wait for 00309 /// @param pool_guard 00310 /// Guardian that locks main pool mutex at the time of method call and 00311 /// that have to be unlocked for the time of waiting 00312 /// @param wait_sema 00313 /// Semaphore which will be posted when predicate become true 00314 /// @param timeout 00315 /// Maximum amount of time to wait 00316 /// @param timer 00317 /// Timer for mesuring elapsed time. Method assumes that timer is 00318 /// started at the moment from which timeout should be calculated. 00319 bool x_WaitForPredicate(TWaitPredicate wait_func, 00320 CThreadPool_Guard* pool_guard, 00321 CSemaphore* wait_sema, 00322 const CTimeSpan* timeout, 00323 const CStopWatch* timer); 00324 00325 00326 private: 00327 /// ThreadPool interface object attached to this implementation 00328 CThreadPool* m_Interface; 00329 /// Reference to this pool to prevent its destroying earlier than we 00330 /// allow it to 00331 CRef<CThreadPool_Impl> m_SelfRef; 00332 /// Timeout to wait for all threads to finish before the ThreadPool 00333 /// interface object will be able to destroy 00334 CTimeSpan m_DestroyTimeout; 00335 /// Queue for storing tasks 00336 TQueue m_Queue; 00337 /// Mutex for guarding all changes in the pool, its threads and controller 00338 CMutex m_MainPoolMutex; 00339 /// Semaphore for waiting for available threads to process task when 00340 /// queuing is disabled. 00341 CSemaphore m_RoomWait; 00342 /// Controller managing count of threads in pool 00343 CRef<CThreadPool_Controller> m_Controller; 00344 /// List of all idle threads 00345 TThreadsList m_IdleThreads; 00346 /// List of all threads currently executing some tasks 00347 TThreadsList m_WorkingThreads; 00348 /// Running mode of all threads 00349 CThread::TRunMode m_ThreadsMode; 00350 /// Total number of threads 00351 /// Introduced for more adequate and fast reflecting to threads starting 00352 /// and stopping events 00353 CAtomicCounter m_ThreadsCount; 00354 /// Number of tasks executing now 00355 /// Introduced for more adequate and fast reflecting to task executing 00356 /// start and finish events 00357 CAtomicCounter m_ExecutingTasks; 00358 /// Total number of tasks acquired by pool 00359 /// Includes queued tasks and executing tasks. Introduced for 00360 /// maintaining atomicity of this number changing 00361 CAtomicCounter m_TotalTasks; 00362 /// Flag about working with special case: 00363 /// FALSE - queue_size == 0, TRUE - queue_size > 0 00364 bool m_IsQueueAllowed; 00365 /// If pool is already aborted or not 00366 volatile bool m_Aborted; 00367 /// Semaphore for waiting for threads finishing in Abort() method 00368 /// 00369 /// @sa Abort() 00370 CSemaphore m_AbortWait; 00371 /// If pool is suspended for exclusive task execution or not. 00372 /// Thread Checker can complain that access to this variable everywhere is 00373 /// not guarded by some mutex. But it's okay because special care is taken 00374 /// to make any race a matter of timing - suspend will happen properly in 00375 /// any case. Also everything is written with the assumption that there's 00376 /// no other threads (besides this very thread pool) that could call any 00377 /// methods here. 00378 volatile bool m_Suspended; 00379 /// Requested requirements for the exclusive execution environment 00380 volatile TExclusiveFlags m_SuspendFlags; 00381 /// Flag indicating if flush of threads requested after adding exclusive 00382 /// task but before it is started its execution. 00383 volatile bool m_FlushRequested; 00384 /// Thread for execution of exclusive tasks and passing of events 00385 /// to the controller 00386 CRef<CThreadPool_ServiceThread> m_ServiceThread; 00387 /// Queue for information about exclusive tasks 00388 TExclusiveQueue m_ExclusiveQueue; 00389 }; 00390 00391 00392 00393 /// Real implementation of all CThreadPool_Thread functions 00394 class CThreadPool_ThreadImpl 00395 { 00396 public: 00397 /// Convert pointer to CThreadPool_Thread object into pointer 00398 /// to CThreadPool_ThreadImpl object. Can be done only here to avoid 00399 /// excessive friendship to CThreadPool_Thread class. 00400 static CThreadPool_ThreadImpl* 00401 s_GetImplPointer(CThreadPool_Thread* thread); 00402 00403 /// Create new CThreadPool_Thread object 00404 /// Method introduced to avoid excessive friendship to CThreadPool_Thread 00405 /// class. 00406 /// 00407 /// @sa CThreadPool_Thread::CThreadPool_Thread() 00408 static CThreadPool_Thread* s_CreateThread(CThreadPool* pool); 00409 00410 /// Constructor 00411 /// @param thread_intf 00412 /// ThreadPool_Thread interface object attached to this implementation 00413 /// @param pool 00414 /// Pool implementation owning this thread 00415 CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf, 00416 CThreadPool_Impl* pool); 00417 00418 /// Destructor 00419 /// Called directly from CThreadPool destructor 00420 ~CThreadPool_ThreadImpl(void); 00421 00422 /// Get ThreadPool interface object owning this thread 00423 /// 00424 /// @sa CThreadPool_Thread::GetPool() 00425 CThreadPool* GetPool(void) const; 00426 00427 /// Request this thread to finish its operation. 00428 /// It renders the thread unusable and eventually ready for destruction 00429 /// (as soon as its current task is finished and there are no CRefs to 00430 /// this thread left). 00431 void RequestToFinish(void); 00432 00433 /// If finishing of this thread is already in progress or not 00434 bool IsFinishing(void) const; 00435 00436 /// Wake up the thread from idle state 00437 /// 00438 /// @sa x_Idle 00439 void WakeUp(void); 00440 00441 /// Get task currently executing in the thread 00442 /// May be NULL if thread is idle or is in the middle of changing of 00443 /// current task 00444 /// 00445 /// @sa CThreadPool_Thread::GetCurrentTask() 00446 CRef<CThreadPool_Task> GetCurrentTask(void) const; 00447 00448 /// Request to cancel current task execution 00449 void CancelCurrentTask(void); 00450 00451 /// Implementation of thread Main() method 00452 /// 00453 /// @sa CThreadPool_Thread::Main() 00454 void Main(void); 00455 00456 /// Implementation of threadOnExit() method 00457 /// 00458 /// @sa CThreadPool_Thread::OnExit() 00459 void OnExit(void); 00460 00461 private: 00462 /// Prohibit copying and assigning 00463 CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&); 00464 CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&); 00465 00466 /// Suspend until the wake up signal. 00467 /// 00468 /// @sa WakeUp() 00469 void x_Idle(void); 00470 00471 /// Mark the thread idle or non-idle 00472 void x_SetIdleState(bool is_idle); 00473 00474 /// Do finalizing when task finished its execution 00475 /// @param status 00476 /// Status that the task must get 00477 void x_TaskFinished(CThreadPool_Task::EStatus status); 00478 00479 00480 /// ThreadPool_Thread interface object attached to this implementation 00481 CThreadPool_Thread* m_Interface; 00482 /// Pool running the thread 00483 CRef<CThreadPool_Impl> m_Pool; 00484 /// If the thread is already asked to finish or not 00485 volatile bool m_Finishing; 00486 /// If cancel of the currently executing task is requested or not 00487 volatile bool m_CancelRequested; 00488 /// Idleness of the thread 00489 bool m_IsIdle; 00490 /// Task currently executing in the thread 00491 CRef<CThreadPool_Task> m_CurrentTask; 00492 /// Semaphore for waking up from idle waiting 00493 CSemaphore m_IdleTrigger; 00494 }; 00495 00496 00497 00498 /// Thread used in pool for different internal needs: execution of exclusive 00499 /// tasks and passing events to controller 00500 class CThreadPool_ServiceThread : public CThread 00501 { 00502 public: 00503 /// Constructor 00504 /// @param pool 00505 /// ThreadPool owning this thread 00506 CThreadPool_ServiceThread(CThreadPool_Impl* pool); 00507 00508 /// Wake up from idle waiting or waiting of pool preparing exclusive 00509 /// environment 00510 void WakeUp(void); 00511 00512 /// Request finishing of the thread 00513 void RequestToFinish(void); 00514 00515 /// Check if this thread have already finished or not 00516 bool IsFinished(void); 00517 00518 /// Tell the thread that controller should handle eOther event 00519 /// 00520 /// @sa CThreadPool_Controller::HandleEvent() 00521 void NeedCallController(void); 00522 00523 protected: 00524 /// Destructor. Will be called from CRef 00525 virtual ~CThreadPool_ServiceThread(void); 00526 00527 private: 00528 /// Main thread execution 00529 virtual void* Main(void); 00530 00531 /// Do "idle" work when thread is not busy executing exclusive tasks 00532 void x_Idle(void); 00533 00534 /// Wait until pool is ready for execution of exclusive task 00535 void x_WaitForPoolStop(CThreadPool_Guard* pool_guard); 00536 00537 /// Pool owning this thread 00538 CRef<CThreadPool_Impl> m_Pool; 00539 /// Semaphore for idle sleeping 00540 CSemaphore m_IdleTrigger; 00541 /// If finishing of the thread is already requested 00542 volatile bool m_Finishing; 00543 /// If the thread has already finished its Main() method 00544 volatile bool m_Finished; 00545 /// Currently executing exclusive task 00546 CRef<CThreadPool_Task> m_CurrentTask; 00547 /// Flag indicating that thread should pass eOther event to the controller 00548 CAtomicCounter m_NeedCallController; 00549 }; 00550 00551 00552 00553 /// Guardian for protecting pool by locking its main mutex 00554 class CThreadPool_Guard : private CMutexGuard 00555 { 00556 public: 00557 /// Constructor 00558 /// @param pool 00559 /// Pool to protect 00560 /// @param is_active 00561 /// If the mutex should be locked in constructor or not 00562 CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true); 00563 00564 /// Turn this guardian on 00565 void Guard(void); 00566 00567 /// Turn this guardian off 00568 void Release(void); 00569 00570 private: 00571 /// Pool protected by the guardian 00572 CThreadPool_Impl* m_Pool; 00573 }; 00574 00575 00576 00577 /// Special task which does nothing 00578 /// It's used in FlushThreads to force pool to wait while all old threads 00579 /// finish their operation to start new ones. 00580 /// 00581 /// @sa CThreadPool_Impl::FlushThreads() 00582 class CThreadPool_EmptyTask : public CThreadPool_Task 00583 { 00584 public: 00585 /// Empty main method 00586 virtual EStatus Execute(void) { return eCompleted; } 00587 00588 // In the absence of the following constructor, new compilers (as required 00589 // by the new C++ standard) may fill the object memory with zeros, 00590 // erasing flags set by CObject::operator new (see CXX-1808) 00591 CThreadPool_EmptyTask(void) {} 00592 }; 00593 00594 00595 00596 /// Check if status returned from CThreadPool_Task::Execute() is allowed 00597 /// and change it to eCompleted value if it is invalid 00598 static inline CThreadPool_Task::EStatus 00599 s_ConvertTaskResult(CThreadPool_Task::EStatus status) 00600 { 00601 _ASSERT(status == CThreadPool_Task::eCompleted 00602 || status == CThreadPool_Task::eFailed 00603 || status == CThreadPool_Task::eCanceled); 00604 00605 if (status != CThreadPool_Task::eCompleted 00606 && status != CThreadPool_Task::eFailed 00607 && status != CThreadPool_Task::eCanceled) 00608 { 00609 ERR_POST_X(9, Critical 00610 << "Wrong status returned from " 00611 "CThreadPool_Task::Execute(): " 00612 << status); 00613 status = CThreadPool_Task::eCompleted; 00614 } 00615 00616 return status; 00617 } 00618 00619 00620 00621 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF; 00622 00623 00624 inline void 00625 CThreadPool_ServiceThread::WakeUp(void) 00626 { 00627 m_IdleTrigger.Post(); 00628 } 00629 00630 inline void 00631 CThreadPool_ServiceThread::NeedCallController(void) 00632 { 00633 if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) { 00634 m_NeedCallController.Add(-1); 00635 } 00636 else { 00637 WakeUp(); 00638 } 00639 } 00640 00641 00642 00643 inline void 00644 CThreadPool_ThreadImpl::WakeUp(void) 00645 { 00646 m_IdleTrigger.Post(); 00647 } 00648 00649 00650 00651 inline CMutex& 00652 CThreadPool_Impl::GetMainPoolMutex(void) 00653 { 00654 return m_MainPoolMutex; 00655 } 00656 00657 00658 00659 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active) 00660 : CMutexGuard(eEmptyGuard), 00661 m_Pool(pool) 00662 { 00663 _ASSERT(pool); 00664 00665 if (is_active) 00666 Guard(); 00667 } 00668 00669 void 00670 CThreadPool_Guard::Guard(void) 00671 { 00672 CMutexGuard::Guard(m_Pool->GetMainPoolMutex()); 00673 } 00674 00675 void 00676 CThreadPool_Guard::Release(void) 00677 { 00678 CMutexGuard::Release(); 00679 } 00680 00681 00682 00683 inline void 00684 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task* task, 00685 CThreadPool_Task::EStatus status) 00686 { 00687 task->x_SetStatus(status); 00688 } 00689 00690 inline void 00691 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task) 00692 { 00693 task->x_RequestToCancel(); 00694 } 00695 00696 inline CThreadPool* 00697 CThreadPool_Impl::GetPoolInterface(void) const 00698 { 00699 return m_Interface; 00700 } 00701 00702 inline void 00703 CThreadPool_Impl::SetInterfaceStarted(void) 00704 { 00705 m_ServiceThread->Run(CThread::fRunDetached); 00706 } 00707 00708 inline bool 00709 CThreadPool_Impl::IsAborted(void) const 00710 { 00711 return m_Aborted; 00712 } 00713 00714 inline bool 00715 CThreadPool_Impl::IsSuspended(void) const 00716 { 00717 return m_Suspended; 00718 } 00719 00720 inline unsigned int 00721 CThreadPool_Impl::GetThreadsCount(void) const 00722 { 00723 return (unsigned int)m_ThreadsCount.Get(); 00724 } 00725 00726 inline unsigned int 00727 CThreadPool_Impl::GetQueuedTasksCount(void) const 00728 { 00729 return (unsigned int)m_Queue.GetSize(); 00730 } 00731 00732 inline unsigned int 00733 CThreadPool_Impl::GetExecutingTasksCount(void) const 00734 { 00735 return (unsigned int)m_ExecutingTasks.Get(); 00736 } 00737 00738 inline CTimeSpan 00739 CThreadPool_Impl::GetSafeSleepTime(void) const 00740 { 00741 // m_Controller variable can be uninitialized in only when ThreadPool 00742 // is already aborted 00743 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull(); 00744 if (controller && ! m_Aborted) { 00745 return controller->GetSafeSleepTime(); 00746 } 00747 else { 00748 return CTimeSpan(0, 0); 00749 } 00750 } 00751 00752 inline void 00753 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event) 00754 { 00755 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull(); 00756 if (controller && ! m_Aborted && 00757 (! m_Suspended || event == CThreadPool_Controller::eSuspend)) 00758 { 00759 controller->HandleEvent(event); 00760 } 00761 } 00762 00763 inline void 00764 CThreadPool_Impl::CallControllerOther(void) 00765 { 00766 CThreadPool_ServiceThread* thread = m_ServiceThread; 00767 if (thread) { 00768 thread->NeedCallController(); 00769 } 00770 } 00771 00772 inline void 00773 CThreadPool_Impl::TaskStarting(void) 00774 { 00775 m_ExecutingTasks.Add(1); 00776 // In current implementation controller operation doesn't depend on this 00777 // action. So we will save mutex locks for the sake of performance 00778 //CallControllerOther(); 00779 } 00780 00781 inline void 00782 CThreadPool_Impl::TaskFinished(void) 00783 { 00784 m_ExecutingTasks.Add(-1); 00785 m_TotalTasks.Add(-1); 00786 m_RoomWait.Post(); 00787 CallControllerOther(); 00788 } 00789 00790 inline void 00791 CThreadPool_Impl::ThreadStateChanged(void) 00792 { 00793 if (m_Aborted) { 00794 if (x_HasNoThreads()) { 00795 m_AbortWait.Post(); 00796 } 00797 } 00798 else if (m_Suspended) { 00799 if (((m_SuspendFlags & CThreadPool::fFlushThreads) 00800 && GetThreadsCount() == 0) 00801 || (! (m_SuspendFlags & CThreadPool::fFlushThreads) 00802 && m_WorkingThreads.size() == 0)) 00803 { 00804 m_ServiceThread->WakeUp(); 00805 } 00806 } 00807 } 00808 00809 inline void 00810 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread) 00811 { 00812 m_ThreadsCount.Add(-1); 00813 00814 CThreadPool_Guard guard(this); 00815 00816 m_IdleThreads.erase(thread); 00817 m_WorkingThreads.erase(thread); 00818 00819 CallControllerOther(); 00820 00821 ThreadStateChanged(); 00822 } 00823 00824 inline CRef<CThreadPool_Task> 00825 CThreadPool_Impl::TryGetNextTask(void) 00826 { 00827 if (!m_Suspended || (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) { 00828 TQueue::TAccessGuard guard(m_Queue); 00829 00830 if (m_Queue.GetSize() != 0) { 00831 return m_Queue.Pop(); 00832 } 00833 } 00834 00835 return CRef<CThreadPool_Task>(); 00836 } 00837 00838 inline CThreadPool_Impl::TExclusiveTaskInfo 00839 CThreadPool_Impl::TryGetExclusiveTask(void) 00840 { 00841 TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue); 00842 00843 if (m_ExclusiveQueue.GetSize() != 0) { 00844 CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop(); 00845 if (m_FlushRequested) { 00846 info.first |= CThreadPool::fFlushThreads; 00847 m_FlushRequested = false; 00848 } 00849 return info; 00850 } 00851 00852 return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>()); 00853 } 00854 00855 inline bool 00856 CThreadPool_Impl::CanDoExclusiveTask(void) const 00857 { 00858 if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks) 00859 && GetQueuedTasksCount() != 0) 00860 { 00861 return false; 00862 } 00863 00864 if ((m_SuspendFlags & CThreadPool::fFlushThreads) 00865 && GetThreadsCount() != 0) 00866 { 00867 return false; 00868 } 00869 00870 return m_WorkingThreads.size() == 0; 00871 } 00872 00873 inline void 00874 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags) 00875 { 00876 m_SuspendFlags = flags; 00877 m_Suspended = true; 00878 if (flags & CThreadPool::fCancelQueuedTasks) { 00879 x_CancelQueuedTasks(); 00880 } 00881 if (flags & CThreadPool::fCancelExecutingTasks) { 00882 x_CancelExecutingTasks(); 00883 } 00884 00885 if (flags & CThreadPool::fFlushThreads) { 00886 FinishThreads((unsigned int)m_IdleThreads.size()); 00887 } 00888 00889 CallController(CThreadPool_Controller::eSuspend); 00890 } 00891 00892 inline void 00893 CThreadPool_Impl::ResumeWork(void) 00894 { 00895 m_Suspended = false; 00896 00897 CallController(CThreadPool_Controller::eResume); 00898 00899 ITERATE(TThreadsList, it, m_IdleThreads) { 00900 (*it)->WakeUp(); 00901 } 00902 } 00903 00904 00905 00906 inline void 00907 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool) 00908 { 00909 if (m_Pool != NULL) { 00910 NCBI_THROW(CThreadPoolException, eControllerBusy, 00911 "Cannot attach Controller to several ThreadPools."); 00912 } 00913 00914 m_Pool = pool; 00915 } 00916 00917 inline void 00918 CThreadPool_Controller::x_DetachFromPool(void) 00919 { 00920 m_Pool = NULL; 00921 } 00922 00923 00924 00925 CThreadPool_Task::CThreadPool_Task(unsigned int priority) 00926 { 00927 x_Init(priority); 00928 } 00929 00930 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other) 00931 { 00932 x_Init(other.m_Priority); 00933 } 00934 00935 void 00936 CThreadPool_Task::x_Init(unsigned int priority) 00937 { 00938 m_Pool = NULL; 00939 m_Priority = priority; 00940 // Thread Checker complains here but this code is called only from 00941 // constructor, so no one else can reference this task yet. 00942 m_Status = eIdle; 00943 m_CancelRequested = false; 00944 } 00945 00946 CThreadPool_Task::~CThreadPool_Task(void) 00947 {} 00948 00949 CThreadPool_Task& 00950 CThreadPool_Task::operator= (const CThreadPool_Task& other) 00951 { 00952 if (m_IsBusy.Get() != 0) { 00953 NCBI_THROW(CThreadPoolException, eTaskBusy, 00954 "Cannot change task when it is already added " 00955 "to ThreadPool"); 00956 } 00957 00958 CObject::operator= (other); 00959 // There can be race with CThreadPool_Impl::AddTask() 00960 // If task will be already added to queue and priority will be then 00961 // changed queue can crush later 00962 m_Priority = other.m_Priority; 00963 return *this; 00964 } 00965 00966 void 00967 CThreadPool_Task::OnStatusChange(EStatus /* old */) 00968 {} 00969 00970 void 00971 CThreadPool_Task::OnCancelRequested(void) 00972 {} 00973 00974 inline void 00975 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool) 00976 { 00977 if (m_IsBusy.Add(1) != 1) { 00978 m_IsBusy.Add(-1); 00979 NCBI_THROW(CThreadPoolException, eTaskBusy, 00980 "Cannot add task in ThreadPool several times"); 00981 } 00982 00983 // Thread Checker complains that this races with task canceling and 00984 // resetting m_Pool below. But it's an thread pool usage error if 00985 // someone tries to call concurrently AddTask and CancelTask. With a proper 00986 // workflow CancelTask shouldn't be called until AddTask has returned. 00987 m_Pool = pool; 00988 } 00989 00990 inline void 00991 CThreadPool_Task::x_ResetOwner(void) 00992 { 00993 m_Pool = NULL; 00994 m_IsBusy.Add(-1); 00995 } 00996 00997 void 00998 CThreadPool_Task::x_SetStatus(EStatus new_status) 00999 { 01000 EStatus old_status = m_Status; 01001 if (old_status != new_status && old_status != eCanceled) { 01002 // Thread Checker complains here, but all status transitions are 01003 // properly guarded with different mutexes and they cannot mix with 01004 // each other. 01005 m_Status = new_status; 01006 OnStatusChange(old_status); 01007 } 01008 01009 if (IsFinished()) { 01010 // Thread Checker complains here. See comment in x_SetOwner above for 01011 // details. 01012 m_Pool = NULL; 01013 } 01014 } 01015 01016 inline void 01017 CThreadPool_Task::x_RequestToCancel(void) 01018 { 01019 m_CancelRequested = true; 01020 01021 OnCancelRequested(); 01022 01023 if (GetStatus() <= eQueued) { 01024 // This can race with calling task's Execute() method but it's okay. 01025 // For details see comment in CThreadPool_ThreadImpl::Main(). 01026 x_SetStatus(eCanceled); 01027 } 01028 } 01029 01030 void 01031 CThreadPool_Task::RequestToCancel(void) 01032 { 01033 // Protect from possible reseting of the pool variable during execution 01034 CThreadPool_Impl* pool = m_Pool; 01035 if (IsFinished()) { 01036 return; 01037 } 01038 else if (!pool) { 01039 x_RequestToCancel(); 01040 } 01041 else { 01042 pool->CancelTask(this); 01043 } 01044 } 01045 01046 CThreadPool* 01047 CThreadPool_Task::GetPool(void) const 01048 { 01049 // Protect from possible reseting of the pool variable during execution 01050 CThreadPool_Impl* pool_impl = m_Pool; 01051 return pool_impl? pool_impl->GetPoolInterface(): NULL; 01052 } 01053 01054 01055 01056 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool) 01057 : m_Pool(pool), 01058 m_IdleTrigger(0, kMax_Int), 01059 m_Finishing(false), 01060 m_Finished(false) 01061 { 01062 _ASSERT(pool); 01063 01064 m_NeedCallController.Set(kNeedCallController_Shift); 01065 } 01066 01067 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void) 01068 {} 01069 01070 inline bool 01071 CThreadPool_ServiceThread::IsFinished(void) 01072 { 01073 return m_Finished; 01074 } 01075 01076 inline void 01077 CThreadPool_ServiceThread::x_Idle(void) 01078 { 01079 if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) { 01080 m_NeedCallController.Add(1); 01081 } 01082 m_Pool->CallController(CThreadPool_Controller::eOther); 01083 01084 CTimeSpan timeout = m_Pool->GetSafeSleepTime(); 01085 m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(), 01086 timeout.GetNanoSecondsAfterSecond()); 01087 } 01088 01089 inline void 01090 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard) 01091 { 01092 while (! m_Pool->IsAborted() && ! m_Pool->CanDoExclusiveTask()) { 01093 pool_guard->Release(); 01094 m_IdleTrigger.Wait(); 01095 pool_guard->Guard(); 01096 } 01097 } 01098 01099 inline void 01100 CThreadPool_ServiceThread::RequestToFinish(void) 01101 { 01102 m_Finishing = true; 01103 WakeUp(); 01104 01105 CThreadPool_Task* task = m_CurrentTask; 01106 if (task) { 01107 CThreadPool_Impl::sx_RequestToCancel(task); 01108 } 01109 } 01110 01111 void* 01112 CThreadPool_ServiceThread::Main(void) 01113 { 01114 while (! m_Finishing) { 01115 CThreadPool_Impl::TExclusiveTaskInfo task_info = 01116 m_Pool->TryGetExclusiveTask(); 01117 m_CurrentTask = task_info.second; 01118 01119 if (m_CurrentTask.IsNull()) { 01120 x_Idle(); 01121 } 01122 else { 01123 CThreadPool_Guard guard(m_Pool); 01124 01125 if (m_Finishing) { 01126 if (! m_CurrentTask->IsCancelRequested()) { 01127 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask); 01128 } 01129 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01130 CThreadPool_Task::eCanceled); 01131 break; 01132 } 01133 01134 m_Pool->RequestSuspend(task_info.first); 01135 x_WaitForPoolStop(&guard); 01136 01137 if (m_Finishing) { 01138 if (!m_CurrentTask->IsCancelRequested()) { 01139 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask); 01140 } 01141 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01142 CThreadPool_Task::eCanceled); 01143 break; 01144 } 01145 01146 guard.Release(); 01147 01148 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01149 CThreadPool_Task::eExecuting); 01150 try { 01151 CThreadPool_Task::EStatus status = 01152 s_ConvertTaskResult(m_CurrentTask->Execute()); 01153 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status); 01154 } 01155 catch (exception& e) { 01156 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: " 01157 << e.what()); 01158 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01159 CThreadPool_Task::eFailed); 01160 } 01161 catch (...) { 01162 ERR_POST_X(12, "Unknown exception from exclusive task " 01163 "in ThreadPool."); 01164 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01165 CThreadPool_Task::eFailed); 01166 } 01167 01168 guard.Guard(); 01169 m_Pool->ResumeWork(); 01170 } 01171 } 01172 01173 m_Finished = true; 01174 m_Pool->ThreadStateChanged(); 01175 01176 return NULL; 01177 } 01178 01179 01180 01181 inline CThreadPool_ThreadImpl* 01182 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread) 01183 { 01184 return thread->m_Impl; 01185 } 01186 01187 inline CThreadPool_Thread* 01188 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool) 01189 { 01190 return new CThreadPool_Thread(pool); 01191 } 01192 01193 inline 01194 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl 01195 ( 01196 CThreadPool_Thread* thread_intf, 01197 CThreadPool_Impl* pool 01198 ) 01199 : m_Interface(thread_intf), 01200 m_Pool(pool), 01201 m_Finishing(false), 01202 m_CancelRequested(false), 01203 m_IsIdle(true), 01204 m_IdleTrigger(0, kMax_Int) 01205 {} 01206 01207 inline 01208 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void) 01209 {} 01210 01211 inline CThreadPool* 01212 CThreadPool_ThreadImpl::GetPool(void) const 01213 { 01214 return m_Pool->GetPoolInterface(); 01215 } 01216 01217 inline bool 01218 CThreadPool_ThreadImpl::IsFinishing(void) const 01219 { 01220 return m_Finishing; 01221 } 01222 01223 inline CRef<CThreadPool_Task> 01224 CThreadPool_ThreadImpl::GetCurrentTask(void) const 01225 { 01226 return m_CurrentTask; 01227 } 01228 01229 inline void 01230 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle) 01231 { 01232 if (m_IsIdle != is_idle) { 01233 m_IsIdle = is_idle; 01234 m_Pool->SetThreadIdle(this, is_idle); 01235 } 01236 } 01237 01238 inline void 01239 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status) 01240 { 01241 if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) { 01242 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status); 01243 } 01244 01245 m_CurrentTask.Reset(); 01246 m_Pool->TaskFinished(); 01247 } 01248 01249 inline void 01250 CThreadPool_ThreadImpl::x_Idle(void) 01251 { 01252 x_SetIdleState(true); 01253 01254 m_IdleTrigger.Wait(); 01255 } 01256 01257 inline void 01258 CThreadPool_ThreadImpl::RequestToFinish(void) 01259 { 01260 m_Finishing = true; 01261 WakeUp(); 01262 } 01263 01264 inline void 01265 CThreadPool_ThreadImpl::CancelCurrentTask(void) 01266 { 01267 // Avoid resetting of the pointer during execution 01268 // TODO: there's possible race if before we add reference on the task 01269 // m_CurrentTask will be reset to NULL, last reference will be removed and 01270 // task will be deleted. But nobody uses CThreadPool in this way, thus 01271 // this assignment is safe (ThreadPool won't own the last reference to 01272 // the task). 01273 CRef<CThreadPool_Task> task = m_CurrentTask; 01274 if (task.NotNull()) { 01275 CThreadPool_Impl::sx_RequestToCancel(task); 01276 } 01277 else { 01278 m_CancelRequested = true; 01279 } 01280 } 01281 01282 inline void 01283 CThreadPool_ThreadImpl::Main(void) 01284 { 01285 m_Interface->Initialize(); 01286 01287 while (!m_Finishing) { 01288 // We have to heed call to CancelCurrentTask() only after this point. 01289 // So we reset value of m_CancelRequested here without any mutexes. 01290 // If CancelCurrentTask() is called earlier or this assignment races 01291 // with assignment in CancelCurrentTask() then caller of 01292 // CancelCurrentTask() will make sure that TryGetNextTask() returns 01293 // NULL. 01294 m_CancelRequested = false; 01295 m_CurrentTask = m_Pool->TryGetNextTask(); 01296 01297 if (m_CurrentTask.IsNull()) { 01298 x_Idle(); 01299 } 01300 else { 01301 if (m_CurrentTask->IsCancelRequested() || m_CancelRequested) { 01302 // Some race can appear if task is canceled at the time 01303 // when it's being queued or at the time when it's being 01304 // unqueued 01305 if (! m_CurrentTask->IsCancelRequested()) { 01306 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask); 01307 } 01308 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01309 CThreadPool_Task::eCanceled); 01310 m_CurrentTask = NULL; 01311 continue; 01312 } 01313 01314 x_SetIdleState(false); 01315 m_Pool->TaskStarting(); 01316 01317 // This can race with canceling of the task. This can result in 01318 // task's Execute() method called with the state of eCanceled 01319 // already set or cancellation being totally ignored in the task's 01320 // status (m_CancelRequested will be still set). Both outcomes are 01321 // simple timing and cancellation should be checked in the task's 01322 // Execute() method anyways. The worst outcome here is that task 01323 // can be marked as eCanceled when it's completely and successfully 01324 // executed. I don't think it's too bad though. 01325 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, 01326 CThreadPool_Task::eExecuting); 01327 01328 try { 01329 CThreadPool_Task::EStatus status = 01330 s_ConvertTaskResult(m_CurrentTask->Execute()); 01331 x_TaskFinished(status); 01332 } 01333 catch (exception& e) { 01334 ERR_POST_X(7, "Exception from task in ThreadPool: " 01335 << e.what()); 01336 x_TaskFinished(CThreadPool_Task::eFailed); 01337 } 01338 catch (...) { 01339 x_TaskFinished(CThreadPool_Task::eFailed); 01340 throw; 01341 } 01342 } 01343 } 01344 } 01345 01346 inline void 01347 CThreadPool_ThreadImpl::OnExit(void) 01348 { 01349 try { 01350 m_Interface->Finalize(); 01351 } STD_CATCH_ALL_X(8, "Finalize") 01352 01353 m_Pool->ThreadStopped(this); 01354 } 01355 01356 01357 01358 inline CThreadPool_Impl* 01359 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool) 01360 { 01361 return pool->m_Impl; 01362 } 01363 01364 inline unsigned int 01365 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size) 01366 { 01367 if (queue_size == 0) { 01368 // 10 is just in case, in fact when queue_size == 0 pool will always 01369 // check for idle threads, so tasks will never crowd in the queue 01370 queue_size = 10; 01371 m_IsQueueAllowed = false; 01372 } 01373 else { 01374 m_IsQueueAllowed = true; 01375 } 01376 01377 return queue_size; 01378 } 01379 01380 inline 01381 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf, 01382 unsigned int queue_size, 01383 unsigned int max_threads, 01384 unsigned int min_threads, 01385 CThread::TRunMode threads_mode) 01386 : m_Queue(x_GetQueueSize(queue_size)), 01387 m_RoomWait(0, kMax_Int), 01388 m_AbortWait(0, kMax_Int) 01389 { 01390 x_Init(pool_intf, 01391 new CThreadPool_Controller_PID(max_threads, min_threads), 01392 threads_mode); 01393 } 01394 01395 inline 01396 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf, 01397 unsigned int queue_size, 01398 CThreadPool_Controller* controller, 01399 CThread::TRunMode threads_mode) 01400 : m_Queue(x_GetQueueSize(queue_size)), 01401 m_RoomWait(0, kMax_Int), 01402 m_AbortWait(0, kMax_Int) 01403 { 01404 x_Init(pool_intf, controller, threads_mode); 01405 } 01406 01407 void 01408 CThreadPool_Impl::x_Init(CThreadPool* pool_intf, 01409 CThreadPool_Controller* controller, 01410 CThread::TRunMode threads_mode) 01411 { 01412 m_Interface = pool_intf; 01413 m_SelfRef = this; 01414 m_DestroyTimeout = CTimeSpan(10, 0); 01415 m_ThreadsCount.Set(0); 01416 m_ExecutingTasks.Set(0); 01417 m_TotalTasks.Set(0); 01418 m_Aborted = false; 01419 m_Suspended = false; 01420 m_ThreadsMode = (threads_mode | CThread::fRunDetached) 01421 & ~CThread::fRunAllowST; 01422 01423 controller->x_AttachToPool(this); 01424 m_Controller = controller; 01425 01426 m_ServiceThread = new CThreadPool_ServiceThread(this); 01427 } 01428 01429 CThreadPool_Impl::~CThreadPool_Impl(void) 01430 {} 01431 01432 inline void 01433 CThreadPool_Impl::DestroyReference(void) 01434 { 01435 // Abort even if m_Aborted == true because threads can still be running 01436 // and we have to wait for their termination 01437 Abort(&m_DestroyTimeout); 01438 01439 m_Interface = NULL; 01440 m_ServiceThread = NULL; 01441 m_SelfRef = NULL; 01442 } 01443 01444 inline void 01445 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout) 01446 { 01447 m_DestroyTimeout = timeout; 01448 } 01449 01450 inline const CTimeSpan& 01451 CThreadPool_Impl::GetDestroyTimeout(void) const 01452 { 01453 return m_DestroyTimeout; 01454 } 01455 01456 void 01457 CThreadPool_Impl::LaunchThreads(unsigned int count) 01458 { 01459 if (count == 0) 01460 return; 01461 01462 CThreadPool_Guard guard(this); 01463 01464 for (unsigned int i = 0; i < count; ++i) { 01465 CRef<CThreadPool_Thread> thread(m_Interface->CreateThread()); 01466 m_IdleThreads.insert( 01467 CThreadPool_ThreadImpl::s_GetImplPointer(thread)); 01468 thread->Run(m_ThreadsMode); 01469 } 01470 01471 m_ThreadsCount.Add(count); 01472 CallControllerOther(); 01473 } 01474 01475 void 01476 CThreadPool_Impl::FinishThreads(unsigned int count) 01477 { 01478 if (count == 0) 01479 return; 01480 01481 CThreadPool_Guard guard(this); 01482 01483 // The cast is theoretically extraneous, but Sun's WorkShop 01484 // compiler otherwise calls the wrong versions of begin() and 01485 // end() and refuses to convert the resulting iterators. 01486 REVERSE_ITERATE(TThreadsList, it, 01487 static_cast<const TThreadsList&>(m_IdleThreads)) 01488 { 01489 // Maybe in case of several quick consecutive calls we should favor 01490 // the willing to finish several threads. 01491 //if ((*it)->IsFinishing()) 01492 // continue; 01493 01494 (*it)->RequestToFinish(); 01495 --count; 01496 if (count == 0) 01497 break; 01498 } 01499 01500 REVERSE_ITERATE(TThreadsList, it, 01501 static_cast<const TThreadsList&>(m_WorkingThreads)) 01502 { 01503 if (count == 0) 01504 break; 01505 01506 (*it)->RequestToFinish(); 01507 --count; 01508 } 01509 } 01510 01511 void 01512 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle) 01513 { 01514 CThreadPool_Guard guard(this); 01515 01516 if (is_idle && !m_Suspended && m_Queue.GetSize() != 0) { 01517 thread->WakeUp(); 01518 return; 01519 } 01520 01521 TThreadsList* to_del; 01522 TThreadsList* to_ins; 01523 if (is_idle) { 01524 to_del = &m_WorkingThreads; 01525 to_ins = &m_IdleThreads; 01526 } 01527 else { 01528 to_del = &m_IdleThreads; 01529 to_ins = &m_WorkingThreads; 01530 } 01531 01532 TThreadsList::iterator it = to_del->find(thread); 01533 if (it != to_del->end()) { 01534 to_del->erase(it); 01535 } 01536 to_ins->insert(thread); 01537 01538 if (is_idle && m_Suspended 01539 && (m_SuspendFlags & CThreadPool::fFlushThreads)) 01540 { 01541 thread->RequestToFinish(); 01542 } 01543 01544 ThreadStateChanged(); 01545 } 01546 01547 inline bool 01548 CThreadPool_Impl::x_IsNewTaskAllowed(void) const 01549 { 01550 return !m_Aborted 01551 && (!m_Suspended 01552 || !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks)); 01553 } 01554 01555 bool 01556 CThreadPool_Impl::x_CanAddImmediateTask(void) const 01557 { 01558 // If pool aborts at some point in waiting it has to stop waiting 01559 // immediately 01560 return !x_IsNewTaskAllowed() 01561 || (!m_Suspended && (unsigned int)m_TotalTasks.Get() 01562 < m_Controller->GetMaxThreads()); 01563 } 01564 01565 bool 01566 CThreadPool_Impl::x_HasNoThreads(void) const 01567 { 01568 CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull(); 01569 return m_IdleThreads.size() + m_WorkingThreads.size() == 0 01570 && (! thread || thread->IsFinished()); 01571 } 01572 01573 bool 01574 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate wait_func, 01575 CThreadPool_Guard* pool_guard, 01576 CSemaphore* wait_sema, 01577 const CTimeSpan* timeout, 01578 const CStopWatch* timer) 01579 { 01580 while (!(this->*wait_func)()) { 01581 pool_guard->Release(); 01582 01583 if (timeout) { 01584 CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble() 01585 - timer->Elapsed()); 01586 if (next_tm.GetSign() == eNegative) { 01587 return false; 01588 } 01589 01590 if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(), 01591 next_tm.GetNanoSecondsAfterSecond())) 01592 { 01593 return false; 01594 } 01595 } 01596 else { 01597 wait_sema->Wait(); 01598 } 01599 01600 pool_guard->Guard(); 01601 } 01602 01603 return true; 01604 } 01605 01606 /// Throw an exception with standard message when AddTask() is called 01607 /// but ThreadPool is aborted or do not allow new tasks 01608 NCBI_NORETURN 01609 static inline void 01610 ThrowAddProhibited(void) 01611 { 01612 NCBI_THROW(CThreadPoolException, eProhibited, 01613 "Adding of new tasks is prohibited"); 01614 } 01615 01616 inline void 01617 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout) 01618 { 01619 _ASSERT(task); 01620 01621 // To be sure that if simple new operator was passed as argument the task 01622 // will still be referenced even if some exception happen in this method 01623 CRef<CThreadPool_Task> task_ref(task); 01624 01625 if (!x_IsNewTaskAllowed()) { 01626 ThrowAddProhibited(); 01627 } 01628 01629 CThreadPool_Guard guard(this, false); 01630 auto_ptr<CTimeSpan> real_timeout; 01631 01632 if (!m_IsQueueAllowed) { 01633 guard.Guard(); 01634 01635 CStopWatch timer(CStopWatch::eStart); 01636 if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask, 01637 &guard, &m_RoomWait, timeout, &timer)) 01638 { 01639 NCBI_THROW(CSyncQueueException, eNoRoom, 01640 "Cannot add task - all threads are busy"); 01641 } 01642 01643 if (!x_IsNewTaskAllowed()) { 01644 ThrowAddProhibited(); 01645 } 01646 01647 if (timeout) { 01648 real_timeout.reset(new CTimeSpan(timeout->GetAsDouble() 01649 - timer.Elapsed())); 01650 } 01651 } 01652 01653 task->x_SetOwner(this); 01654 task->x_SetStatus(CThreadPool_Task::eQueued); 01655 try { 01656 // Pushing to queue must be out of mutex to be able to wait 01657 // for available space. 01658 m_Queue.Push(Ref(task), real_timeout.get()); 01659 } 01660 catch (...) { 01661 task->x_SetStatus(CThreadPool_Task::eIdle); 01662 task->x_ResetOwner(); 01663 throw; 01664 } 01665 01666 if (m_IsQueueAllowed) { 01667 guard.Guard(); 01668 } 01669 01670 // Check if someone aborted the pool or suspended it with cacelation of 01671 // queued tasks after we added this task to the queue but before we were 01672 // able to acquire the mutex 01673 CThreadPool::TExclusiveFlags check_flags 01674 = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks; 01675 if (m_Aborted || (m_Suspended 01676 && (m_SuspendFlags & check_flags) == check_flags)) 01677 { 01678 if (m_Queue.GetSize() != 0) { 01679 x_CancelQueuedTasks(); 01680 } 01681 return; 01682 } 01683 01684 unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1); 01685 01686 if (!m_IsQueueAllowed && cnt_req > GetThreadsCount()) { 01687 LaunchThreads(cnt_req - GetThreadsCount()); 01688 } 01689 01690 if (! m_Suspended) { 01691 int count = GetQueuedTasksCount(); 01692 ITERATE(TThreadsList, it, m_IdleThreads) { 01693 if (! (*it)->IsFinishing()) { 01694 (*it)->WakeUp(); 01695 --count; 01696 if (count == 0) 01697 break; 01698 } 01699 } 01700 } 01701 01702 CallControllerOther(); 01703 } 01704 01705 inline void 01706 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task) 01707 { 01708 TQueue::TAccessGuard q_guard(m_Queue); 01709 01710 TQueue::TAccessGuard::TIterator it = q_guard.Begin(); 01711 while (it != q_guard.End() && *it != task) { 01712 ++it; 01713 } 01714 01715 if (it != q_guard.End()) { 01716 q_guard.Erase(it); 01717 } 01718 } 01719 01720 void 01721 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task* task, 01722 TExclusiveFlags flags) 01723 { 01724 _ASSERT(task); 01725 01726 // To be sure that if simple new operator was passed as argument the task 01727 // will still be referenced even if some exception happen in this method 01728 CRef<CThreadPool_Task> task_ref(task); 01729 01730 if (m_Aborted) { 01731 NCBI_THROW(CThreadPoolException, eProhibited, 01732 "Cannot add exclusive task when ThreadPool is aborted"); 01733 } 01734 01735 task->x_SetOwner(this); 01736 task->x_SetStatus(CThreadPool_Task::eQueued); 01737 m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task))); 01738 01739 CThreadPool_ServiceThread* thread = m_ServiceThread; 01740 if (thread) { 01741 thread->WakeUp(); 01742 } 01743 } 01744 01745 void 01746 CThreadPool_Impl::CancelTask(CThreadPool_Task* task) 01747 { 01748 _ASSERT(task); 01749 01750 if (task->IsFinished()) { 01751 return; 01752 } 01753 // Some race can happen here if the task is being queued now 01754 if (task->GetStatus() == CThreadPool_Task::eIdle) { 01755 task->x_RequestToCancel(); 01756 return; 01757 } 01758 01759 CThreadPool* task_pool = task->GetPool(); 01760 if (task_pool != m_Interface) { 01761 if (!task_pool) { 01762 // Task have just finished - we can do nothing 01763 return; 01764 } 01765 01766 NCBI_THROW(CThreadPoolException, eInvalid, 01767 "Cannot cancel task execution " 01768 "if it is inserted in another ThreadPool"); 01769 } 01770 01771 task->x_RequestToCancel(); 01772 x_RemoveTaskFromQueue(task); 01773 01774 CallControllerOther(); 01775 } 01776 01777 inline void 01778 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group) 01779 { 01780 _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks 01781 + CThreadPool::fCancelQueuedTasks)) 01782 == tasks_group 01783 && tasks_group != 0); 01784 01785 if (tasks_group & CThreadPool::fCancelQueuedTasks) { 01786 x_CancelQueuedTasks(); 01787 } 01788 if (tasks_group & CThreadPool::fCancelExecutingTasks) { 01789 x_CancelExecutingTasks(); 01790 } 01791 01792 CallControllerOther(); 01793 } 01794 01795 void 01796 CThreadPool_Impl::x_CancelExecutingTasks(void) 01797 { 01798 CThreadPool_Guard guard(this); 01799 01800 ITERATE(TThreadsList, it, m_WorkingThreads) { 01801 (*it)->CancelCurrentTask(); 01802 } 01803 01804 // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be 01805 // sure that it doesn't have already task to execute before it marked 01806 // itself as working 01807 ITERATE(TThreadsList, it, m_IdleThreads) { 01808 (*it)->CancelCurrentTask(); 01809 } 01810 } 01811 01812 void 01813 CThreadPool_Impl::x_CancelQueuedTasks(void) 01814 { 01815 TQueue::TAccessGuard q_guard(m_Queue); 01816 01817 for (TQueue::TAccessGuard::TIterator it = q_guard.Begin(); 01818 it != q_guard.End(); ++it) 01819 { 01820 it->GetNCPointer()->x_RequestToCancel(); 01821 } 01822 01823 m_Queue.Clear(); 01824 } 01825 01826 inline void 01827 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type) 01828 { 01829 CThreadPool_Guard guard(this); 01830 01831 if (m_Aborted) { 01832 NCBI_THROW(CThreadPoolException, eProhibited, 01833 "Cannot flush threads when ThreadPool aborted"); 01834 } 01835 01836 if (flush_type == CThreadPool::eStartImmediately 01837 || (flush_type == CThreadPool::eWaitToFinish && m_Suspended)) 01838 { 01839 FinishThreads(GetThreadsCount()); 01840 } 01841 else if (flush_type == CThreadPool::eWaitToFinish) { 01842 bool need_add = true; 01843 01844 {{ 01845 // To avoid races with TryGetExclusiveTask() we need to put 01846 // guard here 01847 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue); 01848 01849 if (m_ExclusiveQueue.GetSize() != 0) { 01850 m_FlushRequested = true; 01851 need_add = false; 01852 } 01853 }} 01854 01855 if (need_add) { 01856 RequestExclusiveExecution(new CThreadPool_EmptyTask(), 01857 CThreadPool::fFlushThreads); 01858 } 01859 } 01860 } 01861 01862 inline void 01863 CThreadPool_Impl::Abort(const CTimeSpan* timeout) 01864 { 01865 CThreadPool_Guard guard(this); 01866 01867 // Method can be called several times in a row and every time we need 01868 // to wait for threads to finish operation 01869 m_Aborted = true; 01870 01871 x_CancelQueuedTasks(); 01872 x_CancelExecutingTasks(); 01873 01874 {{ 01875 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue); 01876 01877 for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin(); 01878 it != q_guard.End(); ++it) 01879 { 01880 it->second->x_RequestToCancel(); 01881 } 01882 01883 m_ExclusiveQueue.Clear(); 01884 }} 01885 01886 if (m_ServiceThread.NotNull()) { 01887 m_ServiceThread->RequestToFinish(); 01888 } 01889 01890 FinishThreads(GetThreadsCount()); 01891 01892 if (m_Controller.NotNull()) { 01893 m_Controller->x_DetachFromPool(); 01894 } 01895 01896 CStopWatch timer(CStopWatch::eStart); 01897 x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads, 01898 &guard, &m_AbortWait, timeout, &timer); 01899 m_AbortWait.Post(); 01900 01901 // This assigning can destroy the controller. If some threads are not 01902 // finished yet and at this very moment will call controller it can crash. 01903 //m_Controller = NULL; 01904 } 01905 01906 01907 01908 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads, 01909 unsigned int min_threads) 01910 : m_Pool(NULL), 01911 m_MinThreads(min_threads), 01912 m_MaxThreads(max_threads), 01913 m_InHandleEvent(false) 01914 { 01915 if (max_threads < min_threads || max_threads == 0) { 01916 NCBI_THROW_FMT(CThreadPoolException, eInvalid, 01917 "Invalid numbers of min and max number of threads:" 01918 " min=" << min_threads << ", max=" << max_threads); 01919 } 01920 } 01921 01922 CThreadPool_Controller::~CThreadPool_Controller(void) 01923 {} 01924 01925 CThreadPool* 01926 CThreadPool_Controller::GetPool(void) const 01927 { 01928 // Avoid changing of pointer during method execution 01929 CThreadPool_Impl* pool = m_Pool; 01930 return pool? pool->GetPoolInterface(): NULL; 01931 } 01932 01933 CMutex& 01934 CThreadPool_Controller::GetMainPoolMutex(CThreadPool* pool) const 01935 { 01936 CThreadPool_Impl* impl = CThreadPool_Impl::s_GetImplPointer(pool); 01937 if (!impl) { 01938 NCBI_THROW(CThreadPoolException, eInactive, 01939 "Cannot do active work when not attached " 01940 "to some ThreadPool"); 01941 } 01942 return impl->GetMainPoolMutex(); 01943 } 01944 01945 void 01946 CThreadPool_Controller::EnsureLimits(void) 01947 { 01948 CThreadPool_Impl* pool = m_Pool; 01949 01950 if (! pool) 01951 return; 01952 01953 Uint4 count = pool->GetThreadsCount(); 01954 if (count > m_MaxThreads) { 01955 pool->FinishThreads(count - m_MaxThreads); 01956 } 01957 if (count < m_MinThreads) { 01958 pool->LaunchThreads(m_MinThreads - count); 01959 } 01960 } 01961 01962 void 01963 CThreadPool_Controller::SetMinThreads(unsigned int min_threads) 01964 { 01965 CThreadPool_Guard guard(m_Pool, false); 01966 if (m_Pool) 01967 guard.Guard(); 01968 01969 m_MinThreads = min_threads; 01970 01971 EnsureLimits(); 01972 } 01973 01974 void 01975 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads) 01976 { 01977 CThreadPool_Guard guard(m_Pool, false); 01978 if (m_Pool) 01979 guard.Guard(); 01980 01981 m_MaxThreads = max_threads; 01982 01983 EnsureLimits(); 01984 } 01985 01986 void 01987 CThreadPool_Controller::SetThreadsCount(unsigned int count) 01988 { 01989 if (count > GetMaxThreads()) 01990 count = GetMaxThreads(); 01991 if (count < GetMinThreads()) 01992 count = GetMinThreads(); 01993 01994 CThreadPool_Impl* pool = m_Pool; 01995 01996 unsigned int now_cnt = pool->GetThreadsCount(); 01997 if (count > now_cnt) { 01998 pool->LaunchThreads(count - now_cnt); 01999 } 02000 else if (count < now_cnt) { 02001 pool->FinishThreads(now_cnt - count); 02002 } 02003 } 02004 02005 void 02006 CThreadPool_Controller::HandleEvent(EEvent event) 02007 { 02008 CThreadPool_Impl* pool = m_Pool; 02009 if (! pool) 02010 return; 02011 02012 CThreadPool_Guard guard(pool); 02013 02014 if (m_InHandleEvent || pool->IsAborted() || pool->IsSuspended()) 02015 return; 02016 02017 m_InHandleEvent = true; 02018 02019 try { 02020 OnEvent(event); 02021 m_InHandleEvent = false; 02022 } 02023 catch (...) { 02024 m_InHandleEvent = false; 02025 throw; 02026 } 02027 } 02028 02029 CTimeSpan 02030 CThreadPool_Controller::GetSafeSleepTime(void) const 02031 { 02032 if (m_Pool) { 02033 return CTimeSpan(1, 0); 02034 } 02035 else { 02036 return CTimeSpan(0, 0); 02037 } 02038 } 02039 02040 02041 02042 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool) 02043 { 02044 _ASSERT(pool); 02045 02046 m_Impl = new CThreadPool_ThreadImpl(this, 02047 CThreadPool_Impl::s_GetImplPointer(pool)); 02048 } 02049 02050 CThreadPool_Thread::~CThreadPool_Thread(void) 02051 { 02052 delete m_Impl; 02053 } 02054 02055 void 02056 CThreadPool_Thread::Initialize(void) 02057 {} 02058 02059 void 02060 CThreadPool_Thread::Finalize(void) 02061 {} 02062 02063 CThreadPool* 02064 CThreadPool_Thread::GetPool(void) const 02065 { 02066 return m_Impl->GetPool(); 02067 } 02068 02069 CRef<CThreadPool_Task> 02070 CThreadPool_Thread::GetCurrentTask(void) const 02071 { 02072 return m_Impl->GetCurrentTask(); 02073 } 02074 02075 void* 02076 CThreadPool_Thread::Main(void) 02077 { 02078 m_Impl->Main(); 02079 return NULL; 02080 } 02081 02082 void 02083 CThreadPool_Thread::OnExit(void) 02084 { 02085 m_Impl->OnExit(); 02086 } 02087 02088 02089 02090 CThreadPool::CThreadPool(unsigned int queue_size, 02091 unsigned int max_threads, 02092 unsigned int min_threads, 02093 CThread::TRunMode threads_mode) 02094 { 02095 m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads, 02096 threads_mode); 02097 m_Impl->SetInterfaceStarted(); 02098 } 02099 02100 CThreadPool::CThreadPool(unsigned int queue_size, 02101 CThreadPool_Controller* controller, 02102 CThread::TRunMode threads_mode) 02103 { 02104 m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode); 02105 m_Impl->SetInterfaceStarted(); 02106 } 02107 02108 CThreadPool::~CThreadPool(void) 02109 { 02110 m_Impl->DestroyReference(); 02111 } 02112 02113 CMutex& 02114 CThreadPool::GetMainPoolMutex(void) 02115 { 02116 return m_Impl->GetMainPoolMutex(); 02117 } 02118 02119 CThreadPool_Thread* 02120 CThreadPool::CreateThread(void) 02121 { 02122 return CThreadPool_ThreadImpl::s_CreateThread(this); 02123 } 02124 02125 void 02126 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout) 02127 { 02128 m_Impl->AddTask(task, timeout); 02129 } 02130 02131 void 02132 CThreadPool::CancelTask(CThreadPool_Task* task) 02133 { 02134 m_Impl->CancelTask(task); 02135 } 02136 02137 void 02138 CThreadPool::Abort(const CTimeSpan* timeout) 02139 { 02140 m_Impl->Abort(timeout); 02141 } 02142 02143 bool 02144 CThreadPool::IsAborted(void) const 02145 { 02146 return m_Impl->IsAborted(); 02147 } 02148 02149 void 02150 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout) 02151 { 02152 m_Impl->SetDestroyTimeout(timeout); 02153 } 02154 02155 const CTimeSpan& 02156 CThreadPool::GetDestroyTimeout(void) const 02157 { 02158 return m_Impl->GetDestroyTimeout(); 02159 } 02160 02161 void 02162 CThreadPool::RequestExclusiveExecution(CThreadPool_Task* task, 02163 TExclusiveFlags flags) 02164 { 02165 m_Impl->RequestExclusiveExecution(task, flags); 02166 } 02167 02168 void 02169 CThreadPool::CancelTasks(TExclusiveFlags tasks_group) 02170 { 02171 m_Impl->CancelTasks(tasks_group); 02172 } 02173 02174 void 02175 CThreadPool::FlushThreads(EFlushType flush_type) 02176 { 02177 m_Impl->FlushThreads(flush_type); 02178 } 02179 02180 unsigned int 02181 CThreadPool::GetThreadsCount(void) const 02182 { 02183 return m_Impl->GetThreadsCount(); 02184 } 02185 02186 unsigned int 02187 CThreadPool::GetQueuedTasksCount(void) const 02188 { 02189 return m_Impl->GetQueuedTasksCount(); 02190 } 02191 02192 unsigned int 02193 CThreadPool::GetExecutingTasksCount(void) const 02194 { 02195 return m_Impl->GetExecutingTasksCount(); 02196 } 02197 02198 02199 02200 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 12:58:16 2012 by modify_doxy.py rev. 337098