NCBI C++ ToolKit
thread_pool.cpp
Go to the documentation of this file.
00001 /*  $Id: thread_pool.cpp 53797 2012-04-18 19:34:34Z ivanovp $
00002 * ===========================================================================
00003 *
00004 *                            PUBLIC DOMAIN NOTICE
00005 *               National Center for Biotechnology Information
00006 *
00007 *  This software/database is a "United States Government Work" under the
00008 *  terms of the United States Copyright Act.  It was written as part of
00009 *  the author's official duties as a United States Government employee and
00010 *  thus cannot be copyrighted.  This software/database is freely available
00011 *  to the public for use. The National Library of Medicine and the U.S.
00012 *  Government have not placed any restriction on its use or reproduction.
00013 *
00014 *  Although all reasonable efforts have been taken to ensure the accuracy
00015 *  and reliability of the software and data, the NLM and the U.S.
00016 *  Government do not and cannot warrant the performance or results that
00017 *  may be obtained by using this software or data. The NLM and the U.S.
00018 *  Government disclaim all warranties, express or implied, including
00019 *  warranties of performance, merchantability or fitness for any particular
00020 *  purpose.
00021 *
00022 *  Please cite the author in any work or product based on this material.
00023 *
00024 * ===========================================================================
00025 *
00026 * Author:  Pavel Ivanov
00027 *
00028 * File Description:
00029 *   Pool of threads.
00030 */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include <util/thread_pool.hpp>
00034 #include <util/thread_pool_ctrl.hpp>
00035 #include <util/sync_queue.hpp>
00036 #include <util/error_codes.hpp>
00037 
00038 #define NCBI_USE_ERRCODE_X  Util_Thread
00039 
00040 BEGIN_NCBI_SCOPE
00041 
00042 
00043 class CThreadPool_Guard;          // guard locking the pool's main mutex; defined later in this file
00044 class CThreadPool_ServiceThread;  // internal service thread of the pool; defined later in this file
00045 
00046 
00047 /// Functor to compare tasks by priority
00048 struct SThreadPool_TaskCompare {
00049     bool operator() (const CRef<CThreadPool_Task>& left,
00050                      const CRef<CThreadPool_Task>& right)
00051     {
00052         return left->GetPriority() < right->GetPriority();
00053     }
00054 };
00055 
00056 
00057 /// Real implementation of all ThreadPool functions
00058 class CThreadPool_Impl : public CObject
00059 {
00060 public:
00061     typedef CThreadPool::TExclusiveFlags  TExclusiveFlags;
00062 
00063     /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl
00064     /// object. Can be done only here to avoid excessive friendship to
00065     /// CThreadPool class.
00066     static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool);
00067 
00068     /// Call x_SetStatus() for the given task.
00069     /// Method introduced to avoid excessive friendship to CThreadPool_Task
00070     /// class.
00071     ///
00072     /// @sa CThreadPool_Task::x_SetStatus()
00073     static void sx_SetTaskStatus(CThreadPool_Task*          task,
00074                                  CThreadPool_Task::EStatus  status);
00075 
00076     /// Call x_RequestToCancel() for the given task.
00077     /// Method introduced to avoid excessive friendship to CThreadPool_Task
00078     /// class.
00079     ///
00080     /// @sa CThreadPool_Task::x_RequestToCancel()
00081     static void sx_RequestToCancel(CThreadPool_Task* task);
00082 
00083 
00084     /// Constructor with default controller
00085     /// @param pool_intf
00086     ///   ThreadPool interface object attached to this implementation
00087     ///
00088     /// @sa CThreadPool::CThreadPool()
00089     CThreadPool_Impl(CThreadPool*      pool_intf,
00090                      unsigned int      queue_size,
00091                      unsigned int      max_threads,
00092                      unsigned int      min_threads,
00093                      CThread::TRunMode threads_mode = CThread::fRunDefault);
00094 
00095     /// Constructor with explicitly given controller
00096     /// @param pool_intf
00097     ///   ThreadPool interface object attached to this implementation
00098     ///
00099     /// @sa CThreadPool::CThreadPool()
00100     CThreadPool_Impl(CThreadPool*        pool_intf,
00101                      unsigned int        queue_size,
00102                      CThreadPool_Controller* controller,
00103                      CThread::TRunMode   threads_mode = CThread::fRunDefault);
00104 
00105     /// Get pointer to ThreadPool interface object
00106     CThreadPool* GetPoolInterface(void) const;
00107 
00108     /// Set destroy timeout for the pool
00109     ///
00110     /// @sa CThreadPool::SetDestroyTimeout()
00111     void SetDestroyTimeout(const CTimeSpan& timeout);
00112 
00113     /// Get destroy timeout for the pool
00114     ///
00115     /// @sa CThreadPool::GetDestroyTimeout()
00116     const CTimeSpan& GetDestroyTimeout(void) const;
00117 
00118     /// Destroy reference to this object
00119     /// Method is called when CThreadPool object is destroyed which means
00120     /// that implementation can be destroyed too if there are no references
00121     /// to it left.
00122     void DestroyReference(void);
00123 
00124     /// Get main pool mutex
00125     ///
00126     /// @sa CThreadPool::GetMainPoolMutex()
00127     CMutex& GetMainPoolMutex(void);
00128 
00129     /// Add task to the pool
00130     ///
00131     /// @sa CThreadPool::AddTask()
00132     void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
00133 
00134     /// Request to cancel the task
00135     ///
00136     /// @sa CThreadPool::CancelTask()
00137     void CancelTask(CThreadPool_Task* task);
00138 
00139     /// Cancel the selected groups of tasks in the pool
00140     ///
00141     /// @sa CThreadPool::CancelTasks()
00142     void CancelTasks(TExclusiveFlags tasks_group);
00143 
00144     /// Add the task for exclusive execution in the pool
00145     ///
00146     /// @sa CThreadPool::RequestExclusiveExecution()
00147     void RequestExclusiveExecution(CThreadPool_Task*  task,
00148                                    TExclusiveFlags    flags);
00149 
00150     /// Launch new threads in pool
00151     /// @param count
00152     ///   Number of threads to launch
00153     void LaunchThreads(unsigned int count);
00154 
00155     /// Finish threads in pool
00156     /// Stop first all idle threads then stop busy threads without
00157     /// cancellation of currently executing tasks.
00158     /// @param count
00159     ///   Number of threads to finish
00160     void FinishThreads(unsigned int count);
00161 
00162     /// Get number of threads running in the pool
00163     unsigned int GetThreadsCount(void) const;
00164 
00165     /// Mark thread as idle or non-idle
00166     /// @param thread
00167     ///   Thread to mark
00168     /// @param is_idle
00169     ///   If thread should be marked as idle or not
00170     void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
00171 
00172     /// Callback from working thread when it finished its Main() method
00173     void ThreadStopped(CThreadPool_ThreadImpl* thread);
00174 
00175     /// Callback when some thread changed its idleness or finished
00176     /// (including service thread)
00177     void ThreadStateChanged(void);
00178 
00179     /// Get next task from queue if there is one
00180     /// If the queue is empty then return NULL.
00181     CRef<CThreadPool_Task> TryGetNextTask(void);
00182 
00183     /// Callback from thread when it is starting to execute task
00184     void TaskStarting(void);
00185 
00186     /// Callback from thread when it has finished executing a task
00187     void TaskFinished(void);
00188 
00189     /// Get the number of tasks currently waiting in queue
00190     unsigned int GetQueuedTasksCount(void) const;
00191 
00192     /// Get the number of currently executing tasks
00193     unsigned int GetExecutingTasksCount(void) const;
00194 
00195     /// Type for storing information about exclusive task launching
00196     typedef pair< TExclusiveFlags,
00197                   CRef<CThreadPool_Task> > TExclusiveTaskInfo;
00198 
00199     /// Get information about next exclusive task to execute
00200     TExclusiveTaskInfo TryGetExclusiveTask(void);
00201 
00202     /// Request suspension of the pool
00203     /// @param flags
00204     ///   Parameters for necessary exclusive execution environment
00205     void RequestSuspend(TExclusiveFlags flags);
00206 
00207     /// Resume the pool operation after exclusive task execution
00208     void ResumeWork(void);
00209 
00210     /// Check if the pool is suspended for exclusive execution
00211     bool IsSuspended(void) const;
00212 
00213     /// Check if it is already allowed to execute exclusive task
00214     bool CanDoExclusiveTask(void) const;
00215 
00216     /// Abort the pool operation
00217     ///
00218     /// @sa CThreadPool::Abort()
00219     void Abort(const CTimeSpan* timeout);
00220 
00221     /// Check if the pool is already aborted
00222     bool IsAborted(void) const;
00223 
00224     /// Finish all current threads and replace them with new ones
00225     ///
00226     /// @sa CThreadPool::FlushThreads()
00227     void FlushThreads(CThreadPool::EFlushType flush_type);
00228 
00229     /// Call the CThreadPool_Controller::HandleEvent() method of the pool
00230     /// controller with the given event type. If ThreadPool is already aborted
00231     /// and controller is reset then do nothing.
00232     void CallController(CThreadPool_Controller::EEvent event);
00233 
00234     /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther
00235     /// event type
00236     void CallControllerOther(void);
00237 
00238     /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool
00239     /// controller. If ThreadPool is already aborted or controller is reset
00240     /// then return a zero-length time span (CTimeSpan(0, 0)).
00241     CTimeSpan GetSafeSleepTime(void) const;
00242 
00243     /// Mark that initialization of the interface was finished
00244     void SetInterfaceStarted(void);
00245 
00246 
00247 private:
00248     /// Type of queue used for storing tasks
00249     typedef CSyncQueue< CRef<CThreadPool_Task>,
00250                         CSyncQueue_multiset< CRef<CThreadPool_Task>,
00251                                              SThreadPool_TaskCompare > >
00252             TQueue;
00253     /// Type of queue used for storing information about exclusive tasks
00254     typedef CSyncQueue<TExclusiveTaskInfo>                 TExclusiveQueue;
00255     /// Type of list of all pooled threads
00256     typedef set<CThreadPool_ThreadImpl*> TThreadsList;
00257 
00258 
00259     /// Prohibit copying and assigning
00260     CThreadPool_Impl(const CThreadPool_Impl&);
00261     CThreadPool_Impl& operator= (const CThreadPool_Impl&);
00262 
00263     /// Transform size of queue given in constructor to the size passed to
00264     /// CSyncQueue constructor.
00265     /// Method can be called only from constructor because it initializes
00266     /// value of m_IsQueueAllowed member variable.
00267     unsigned int x_GetQueueSize(unsigned int queue_size);
00268 
00269     /// Initialization of all class member variables that can be initialized
00270     /// outside of constructor
00271     /// @param pool_intf
00272     ///   ThreadPool interface object attached to this implementation
00273     /// @param controller
00274     ///   Controller for the pool
00275     void x_Init(CThreadPool*            pool_intf,
00276                 CThreadPool_Controller* controller,
00277                 CThread::TRunMode       threads_mode);
00278 
00279     /// Destructor. Will be called from CRef
00280     ~CThreadPool_Impl(void);
00281 
00282     /// Delete task from the queue
00283     /// If task does not exist in queue then does nothing.
00284     void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
00285 
00286     /// Cancel all tasks waiting in the queue
00287     void x_CancelQueuedTasks(void);
00288 
00289     /// Cancel all currently executing tasks
00290     void x_CancelExecutingTasks(void);
00291 
00292     /// Type of some simple predicate
00293     ///
00294     /// @sa x_WaitForPredicate
00295     typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
00296 
00297     /// Check if new task can be added to the pool
00298     bool x_IsNewTaskAllowed(void) const;
00299 
00300     /// Check if new task can be added to the pool when queuing is disabled
00301     bool x_CanAddImmediateTask(void) const;
00302 
00303     /// Check if all threads in pool finished their work
00304     bool x_HasNoThreads(void) const;
00305 
00306     /// Wait for some predicate to be true
00307     /// @param wait_func
00308     ///   Predicate to wait for
00309     /// @param pool_guard
00310     ///   Guardian that locks main pool mutex at the time of method call and
00311     ///   that has to be unlocked for the time of waiting
00312     /// @param wait_sema
00313     ///   Semaphore which will be posted when predicate becomes true
00314     /// @param timeout
00315     ///   Maximum amount of time to wait
00316     /// @param timer
00317     ///   Timer for measuring elapsed time. Method assumes that timer is
00318     ///   started at the moment from which timeout should be calculated.
00319     bool x_WaitForPredicate(TWaitPredicate      wait_func,
00320                             CThreadPool_Guard*  pool_guard,
00321                             CSemaphore*         wait_sema,
00322                             const CTimeSpan*    timeout,
00323                             const CStopWatch*   timer);
00324 
00325 
00326 private:
00327     /// ThreadPool interface object attached to this implementation
00328     CThreadPool*                     m_Interface;
00329     /// Reference to this pool to prevent its destruction earlier than we
00330     /// allow it to
00331     CRef<CThreadPool_Impl>           m_SelfRef;
00332     /// Timeout to wait for all threads to finish before the ThreadPool
00333     /// interface object can be destroyed
00334     CTimeSpan                        m_DestroyTimeout;
00335     /// Queue for storing tasks
00336     TQueue                           m_Queue;
00337     /// Mutex for guarding all changes in the pool, its threads and controller
00338     CMutex                           m_MainPoolMutex;
00339     /// Semaphore for waiting for available threads to process task when
00340     /// queuing is disabled.
00341     CSemaphore                       m_RoomWait;
00342     /// Controller managing count of threads in pool
00343     CRef<CThreadPool_Controller>     m_Controller;
00344     /// List of all idle threads
00345     TThreadsList                     m_IdleThreads;
00346     /// List of all threads currently executing some tasks
00347     TThreadsList                     m_WorkingThreads;
00348     /// Running mode of all threads
00349     CThread::TRunMode                m_ThreadsMode;
00350     /// Total number of threads
00351     /// Kept as a separate counter so that thread starting and stopping
00352     /// events are reflected more adequately and quickly
00353     CAtomicCounter                   m_ThreadsCount;
00354     /// Number of tasks executing now
00355     /// Kept as a separate counter so that task execution start and finish
00356     /// events are reflected more adequately and quickly
00357     CAtomicCounter                   m_ExecutingTasks;
00358     /// Total number of tasks acquired by pool
00359     /// Includes queued tasks and executing tasks. Introduced for
00360     /// maintaining atomicity of changes to this number
00361     CAtomicCounter                   m_TotalTasks;
00362     /// Flag about working with special case:
00363     /// FALSE - queue_size == 0, TRUE - queue_size > 0
00364     bool                             m_IsQueueAllowed;
00365     /// Whether the pool is already aborted
00366     volatile bool                    m_Aborted;
00367     /// Semaphore for waiting for threads finishing in Abort() method
00368     ///
00369     /// @sa Abort()
00370     CSemaphore                       m_AbortWait;
00371     /// If pool is suspended for exclusive task execution or not.
00372     /// Thread Checker can complain that access to this variable everywhere is
00373     /// not guarded by some mutex. But it's okay because special care is taken
00374     /// to make any race a matter of timing - suspend will happen properly in
00375     /// any case. Also everything is written with the assumption that there are
00376     /// no other threads (besides this very thread pool) that could call any
00377     /// methods here.
00378     volatile bool                    m_Suspended;
00379     /// Requested requirements for the exclusive execution environment
00380     volatile TExclusiveFlags         m_SuspendFlags;
00381     /// Flag indicating if flush of threads was requested after adding an
00382     /// exclusive task but before it has started its execution.
00383     volatile bool                    m_FlushRequested;
00384     /// Thread for execution of exclusive tasks and passing of events
00385     /// to the controller
00386     CRef<CThreadPool_ServiceThread>  m_ServiceThread;
00387     /// Queue for information about exclusive tasks
00388     TExclusiveQueue                  m_ExclusiveQueue;
00389 };
00390 
00391 
00392 
00393 /// Real implementation of all CThreadPool_Thread functions
00394 class CThreadPool_ThreadImpl
00395 {
00396 public:
00397     /// Convert pointer to CThreadPool_Thread object into pointer
00398     /// to CThreadPool_ThreadImpl object. Can be done only here to avoid
00399     /// excessive friendship to CThreadPool_Thread class.
00400     static CThreadPool_ThreadImpl*
00401     s_GetImplPointer(CThreadPool_Thread* thread);
00402 
00403     /// Create new CThreadPool_Thread object
00404     /// Method introduced to avoid excessive friendship to CThreadPool_Thread
00405     /// class.
00406     ///
00407     /// @sa CThreadPool_Thread::CThreadPool_Thread()
00408     static CThreadPool_Thread* s_CreateThread(CThreadPool* pool);
00409 
00410     /// Constructor
00411     /// @param thread_intf
00412     ///   ThreadPool_Thread interface object attached to this implementation
00413     /// @param pool
00414     ///   Pool implementation owning this thread
00415     CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf,
00416                            CThreadPool_Impl*   pool);
00417 
00418     /// Destructor
00419     /// Called directly from CThreadPool destructor
00420     ~CThreadPool_ThreadImpl(void);
00421 
00422     /// Get ThreadPool interface object owning this thread
00423     ///
00424     /// @sa CThreadPool_Thread::GetPool()
00425     CThreadPool* GetPool(void) const;
00426 
00427     /// Request this thread to finish its operation.
00428     /// It renders the thread unusable and eventually ready for destruction
00429     /// (as soon as its current task is finished and there are no CRefs to
00430     /// this thread left).
00431     void RequestToFinish(void);
00432 
00433     /// Whether finishing of this thread is already in progress
00434     bool IsFinishing(void) const;
00435 
00436     /// Wake up the thread from idle state
00437     ///
00438     /// @sa x_Idle
00439     void WakeUp(void);
00440 
00441     /// Get task currently executing in the thread
00442     /// May be NULL if thread is idle or is in the middle of changing its
00443     /// current task
00444     ///
00445     /// @sa CThreadPool_Thread::GetCurrentTask()
00446     CRef<CThreadPool_Task> GetCurrentTask(void) const;
00447 
00448     /// Request to cancel current task execution
00449     void CancelCurrentTask(void);
00450 
00451     /// Implementation of thread Main() method
00452     ///
00453     /// @sa CThreadPool_Thread::Main()
00454     void Main(void);
00455 
00456     /// Implementation of thread OnExit() method
00457     ///
00458     /// @sa CThreadPool_Thread::OnExit()
00459     void OnExit(void);
00460 
00461 private:
00462     /// Prohibit copying and assigning
00463     CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&);
00464     CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&);
00465 
00466     /// Suspend until the wake up signal.
00467     ///
00468     /// @sa WakeUp()
00469     void x_Idle(void);
00470 
00471     /// Mark the thread idle or non-idle
00472     void x_SetIdleState(bool is_idle);
00473 
00474     /// Do finalizing when task finished its execution
00475     /// @param status
00476     ///   Status that the task must get
00477     void x_TaskFinished(CThreadPool_Task::EStatus status);
00478 
00479 
00480     /// ThreadPool_Thread interface object attached to this implementation
00481     CThreadPool_Thread*          m_Interface;
00482     /// Pool running the thread
00483     CRef<CThreadPool_Impl>       m_Pool;
00484     /// Whether the thread has already been asked to finish
00485     volatile bool                m_Finishing;
00486     /// Whether cancel of the currently executing task has been requested
00487     volatile bool                m_CancelRequested;
00488     /// Idleness of the thread
00489     bool                         m_IsIdle;
00490     /// Task currently executing in the thread
00491     CRef<CThreadPool_Task>       m_CurrentTask;
00492     /// Semaphore for waking up from idle waiting; posted by WakeUp()
00493     CSemaphore                   m_IdleTrigger;
00494 };
00495 
00496 
00497 
00498 /// Thread used in pool for different internal needs: execution of exclusive
00499 /// tasks and passing events to controller
00500 class CThreadPool_ServiceThread : public CThread
00501 {
00502 public:
00503     /// Constructor
00504     /// @param pool
00505     ///   ThreadPool owning this thread
00506     CThreadPool_ServiceThread(CThreadPool_Impl* pool);
00507 
00508     /// Wake up from idle waiting or from waiting for the pool to prepare
00509     /// the exclusive environment
00510     void WakeUp(void);
00511 
00512     /// Request finishing of the thread
00513     void RequestToFinish(void);
00514 
00515     /// Check if this thread has already finished or not
00516     bool IsFinished(void);
00517 
00518     /// Tell the thread that controller should handle eOther event
00519     ///
00520     /// @sa CThreadPool_Controller::HandleEvent()
00521     void NeedCallController(void);
00522 
00523 protected:
00524     /// Destructor. Will be called from CRef
00525     virtual ~CThreadPool_ServiceThread(void);
00526 
00527 private:
00528     /// Main thread execution
00529     virtual void* Main(void);
00530 
00531     /// Do "idle" work when thread is not busy executing exclusive tasks
00532     void x_Idle(void);
00533 
00534     /// Wait until pool is ready for execution of exclusive task
00535     void x_WaitForPoolStop(CThreadPool_Guard* pool_guard);
00536 
00537     /// Pool owning this thread
00538     CRef<CThreadPool_Impl>  m_Pool;
00539     /// Semaphore for idle sleeping; posted by WakeUp()
00540     CSemaphore              m_IdleTrigger;
00541     /// If finishing of the thread is already requested
00542     volatile bool           m_Finishing;
00543     /// If the thread has already finished its Main() method
00544     volatile bool           m_Finished;
00545     /// Currently executing exclusive task
00546     CRef<CThreadPool_Task>  m_CurrentTask;
00547     /// Atomic counter used as a flag indicating that thread should pass
00548     /// eOther event to the controller; incremented by NeedCallController()
00549     CAtomicCounter          m_NeedCallController;
00550 };
00550 
00551 
00552 
00553 /// Guardian for protecting pool by locking its main mutex
00554 class CThreadPool_Guard : private CMutexGuard
00555 {
00556 public:
00557     /// Constructor
00558     /// @param pool
00559     ///   Pool to protect (must not be NULL)
00560     /// @param is_active
00561     ///   If the mutex should be locked in constructor or not
00562     CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
00563 
00564     /// Turn this guardian on (lock the main mutex of the pool)
00565     void Guard(void);
00566 
00567     /// Turn this guardian off (forwards to CMutexGuard::Release())
00568     void Release(void);
00569 
00570 private:
00571     /// Pool protected by the guardian
00572     CThreadPool_Impl* m_Pool;
00573 };
00574 
00575 
00576 
00577 /// Special task which does nothing
00578 /// It's used in FlushThreads to force pool to wait while all old threads
00579 /// finish their operation to start new ones.
00580 ///
00581 /// @sa CThreadPool_Impl::FlushThreads()
00582 class CThreadPool_EmptyTask : public CThreadPool_Task
00583 {
00584 public:
00585     /// Empty main method
00586     virtual EStatus Execute(void) { return eCompleted; }
00587 
00588     // In the absence of the following constructor, new compilers (as required
00589     // by the new C++ standard) may fill the object memory with zeros,
00590     // erasing flags set by CObject::operator new (see CXX-1808)
00591     CThreadPool_EmptyTask(void) {}
00592 };
00593 
00594 
00595 
00596 /// Check if status returned from CThreadPool_Task::Execute() is allowed
00597 /// and change it to eCompleted value if it is invalid
00598 static inline CThreadPool_Task::EStatus
00599 s_ConvertTaskResult(CThreadPool_Task::EStatus status)
00600 {
00601     _ASSERT(status == CThreadPool_Task::eCompleted
00602             ||  status == CThreadPool_Task::eFailed
00603             ||  status == CThreadPool_Task::eCanceled);
00604 
00605     if (status != CThreadPool_Task::eCompleted
00606         &&  status != CThreadPool_Task::eFailed
00607         &&  status != CThreadPool_Task::eCanceled)
00608     {
00609         ERR_POST_X(9, Critical
00610                       << "Wrong status returned from "
00611                          "CThreadPool_Task::Execute(): "
00612                       << status);
00613         status = CThreadPool_Task::eCompleted;
00614     }
00615 
00616     return status;
00617 }
00618 
00619 
00620 
/// Threshold for the m_NeedCallController counter of the service thread:
/// NeedCallController() rolls its increment back when the incremented value
/// exceeds kNeedCallController_Shift + 1.
/// NOTE(review): other uses of this constant are outside the visible part of
/// the file - confirm its full meaning there.
00621 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF;
00622 
00623 
00624 inline void
00625 CThreadPool_ServiceThread::WakeUp(void)
00626 {
00627     m_IdleTrigger.Post();
00628 }
00629 
00630 inline void
00631 CThreadPool_ServiceThread::NeedCallController(void)
00632 {
00633     if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) {
00634         m_NeedCallController.Add(-1);
00635     }
00636     else {
00637         WakeUp();
00638     }
00639 }
00640 
00641 
00642 
00643 inline void
00644 CThreadPool_ThreadImpl::WakeUp(void)
00645 {
00646     m_IdleTrigger.Post();
00647 }
00648 
00649 
00650 
00651 inline CMutex&
00652 CThreadPool_Impl::GetMainPoolMutex(void)
00653 {
00654     return m_MainPoolMutex;
00655 }
00656 
00657 
00658 
00659 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active)
00660     : CMutexGuard(eEmptyGuard),
00661       m_Pool(pool)
00662 {
00663     _ASSERT(pool);
00664 
00665     if (is_active)
00666         Guard();
00667 }
00668 
00669 void
00670 CThreadPool_Guard::Guard(void)
00671 {
00672     CMutexGuard::Guard(m_Pool->GetMainPoolMutex());
00673 }
00674 
00675 void
00676 CThreadPool_Guard::Release(void)
00677 {
00678     CMutexGuard::Release();
00679 }
00680 
00681 
00682 
00683 inline void
00684 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task*          task,
00685                                    CThreadPool_Task::EStatus  status)
00686 {
00687     task->x_SetStatus(status);
00688 }
00689 
00690 inline void
00691 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task)
00692 {
00693     task->x_RequestToCancel();
00694 }
00695 
00696 inline CThreadPool*
00697 CThreadPool_Impl::GetPoolInterface(void) const
00698 {
00699     return m_Interface;
00700 }
00701 
00702 inline void
00703 CThreadPool_Impl::SetInterfaceStarted(void)
00704 {
00705     m_ServiceThread->Run(CThread::fRunDetached);
00706 }
00707 
00708 inline bool
00709 CThreadPool_Impl::IsAborted(void) const
00710 {
00711     return m_Aborted;
00712 }
00713 
00714 inline bool
00715 CThreadPool_Impl::IsSuspended(void) const
00716 {
00717     return m_Suspended;
00718 }
00719 
00720 inline unsigned int
00721 CThreadPool_Impl::GetThreadsCount(void) const
00722 {
00723     return (unsigned int)m_ThreadsCount.Get();
00724 }
00725 
00726 inline unsigned int
00727 CThreadPool_Impl::GetQueuedTasksCount(void) const
00728 {
00729     return (unsigned int)m_Queue.GetSize();
00730 }
00731 
00732 inline unsigned int
00733 CThreadPool_Impl::GetExecutingTasksCount(void) const
00734 {
00735     return (unsigned int)m_ExecutingTasks.Get();
00736 }
00737 
00738 inline CTimeSpan
00739 CThreadPool_Impl::GetSafeSleepTime(void) const
00740 {
00741     // m_Controller variable can be uninitialized in only when ThreadPool
00742     // is already aborted
00743     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00744     if (controller  &&  ! m_Aborted) {
00745         return controller->GetSafeSleepTime();
00746     }
00747     else {
00748         return CTimeSpan(0, 0);
00749     }
00750 }
00751 
00752 inline void
00753 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event)
00754 {
00755     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00756     if (controller  &&  ! m_Aborted  &&
00757         (! m_Suspended  ||  event == CThreadPool_Controller::eSuspend))
00758     {
00759         controller->HandleEvent(event);
00760     }
00761 }
00762 
00763 inline void
00764 CThreadPool_Impl::CallControllerOther(void)
00765 {
00766     CThreadPool_ServiceThread* thread = m_ServiceThread;
00767     if (thread) {
00768         thread->NeedCallController();
00769     }
00770 }
00771 
00772 inline void
00773 CThreadPool_Impl::TaskStarting(void)
00774 {
00775     m_ExecutingTasks.Add(1);
00776     // In current implementation controller operation doesn't depend on this
00777     // action. So we will save mutex locks for the sake of performance
00778     //CallControllerOther();
00779 }
00780 
00781 inline void
00782 CThreadPool_Impl::TaskFinished(void)
00783 {
00784     m_ExecutingTasks.Add(-1);
00785     m_TotalTasks.Add(-1);
00786     m_RoomWait.Post();
00787     CallControllerOther();
00788 }
00789 
00790 inline void
00791 CThreadPool_Impl::ThreadStateChanged(void)
00792 {
00793     if (m_Aborted) {
00794         if (x_HasNoThreads()) {
00795             m_AbortWait.Post();
00796         }
00797     }
00798     else if (m_Suspended) {
00799         if (((m_SuspendFlags & CThreadPool::fFlushThreads)
00800                  &&  GetThreadsCount() == 0)
00801             ||  (! (m_SuspendFlags & CThreadPool::fFlushThreads)
00802                  &&  m_WorkingThreads.size() == 0))
00803         {
00804             m_ServiceThread->WakeUp();
00805         }
00806     }
00807 }
00808 
00809 inline void
00810 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread)
00811 {
00812     m_ThreadsCount.Add(-1);
00813 
00814     CThreadPool_Guard guard(this);
00815 
00816     m_IdleThreads.erase(thread);
00817     m_WorkingThreads.erase(thread);
00818 
00819     CallControllerOther();
00820 
00821     ThreadStateChanged();
00822 }
00823 
00824 inline CRef<CThreadPool_Task>
00825 CThreadPool_Impl::TryGetNextTask(void)
00826 {
00827     if (!m_Suspended  ||  (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) {
00828         TQueue::TAccessGuard guard(m_Queue);
00829 
00830         if (m_Queue.GetSize() != 0) {
00831             return m_Queue.Pop();
00832         }
00833     }
00834 
00835     return CRef<CThreadPool_Task>();
00836 }
00837 
00838 inline CThreadPool_Impl::TExclusiveTaskInfo
00839 CThreadPool_Impl::TryGetExclusiveTask(void)
00840 {
00841     TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue);
00842 
00843     if (m_ExclusiveQueue.GetSize() != 0) {
00844         CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop();
00845         if (m_FlushRequested) {
00846             info.first |= CThreadPool::fFlushThreads;
00847             m_FlushRequested = false;
00848         }
00849         return info;
00850     }
00851 
00852     return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>());
00853 }
00854 
00855 inline bool
00856 CThreadPool_Impl::CanDoExclusiveTask(void) const
00857 {
00858     if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)
00859         &&  GetQueuedTasksCount() != 0)
00860     {
00861         return false;
00862     }
00863 
00864     if ((m_SuspendFlags & CThreadPool::fFlushThreads)
00865         &&  GetThreadsCount() != 0)
00866     {
00867         return false;
00868     }
00869 
00870     return m_WorkingThreads.size() == 0;
00871 }
00872 
00873 inline void
00874 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags)
00875 {
00876     m_SuspendFlags = flags;
00877     m_Suspended = true;
00878     if (flags & CThreadPool::fCancelQueuedTasks) {
00879         x_CancelQueuedTasks();
00880     }
00881     if (flags & CThreadPool::fCancelExecutingTasks) {
00882         x_CancelExecutingTasks();
00883     }
00884 
00885     if (flags & CThreadPool::fFlushThreads) {
00886         FinishThreads((unsigned int)m_IdleThreads.size());
00887     }
00888 
00889     CallController(CThreadPool_Controller::eSuspend);
00890 }
00891 
00892 inline void
00893 CThreadPool_Impl::ResumeWork(void)
00894 {
00895     m_Suspended = false;
00896 
00897     CallController(CThreadPool_Controller::eResume);
00898 
00899     ITERATE(TThreadsList, it, m_IdleThreads) {
00900         (*it)->WakeUp();
00901     }
00902 }
00903 
00904 
00905 
00906 inline void
00907 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool)
00908 {
00909     if (m_Pool != NULL) {
00910         NCBI_THROW(CThreadPoolException, eControllerBusy,
00911                    "Cannot attach Controller to several ThreadPools.");
00912     }
00913 
00914     m_Pool = pool;
00915 }
00916 
00917 inline void
00918 CThreadPool_Controller::x_DetachFromPool(void)
00919 {
00920     m_Pool = NULL;
00921 }
00922 
00923 
00924 
00925 CThreadPool_Task::CThreadPool_Task(unsigned int priority)
00926 {
00927     x_Init(priority);
00928 }
00929 
00930 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other)
00931 {
00932     x_Init(other.m_Priority);
00933 }
00934 
00935 void
00936 CThreadPool_Task::x_Init(unsigned int priority)
00937 {
00938     m_Pool = NULL;
00939     m_Priority = priority;
00940     // Thread Checker complains here but this code is called only from
00941     // constructor, so no one else can reference this task yet.
00942     m_Status = eIdle;
00943     m_CancelRequested = false;
00944 }
00945 
00946 CThreadPool_Task::~CThreadPool_Task(void)
00947 {}
00948 
00949 CThreadPool_Task&
00950 CThreadPool_Task::operator= (const CThreadPool_Task& other)
00951 {
00952     if (m_IsBusy.Get() != 0) {
00953         NCBI_THROW(CThreadPoolException, eTaskBusy,
00954                    "Cannot change task when it is already added "
00955                    "to ThreadPool");
00956     }
00957 
00958     CObject::operator= (other);
00959     // There can be race with CThreadPool_Impl::AddTask()
00960     // If task will be already added to queue and priority will be then
00961     // changed queue can crush later
00962     m_Priority = other.m_Priority;
00963     return *this;
00964 }
00965 
00966 void
00967 CThreadPool_Task::OnStatusChange(EStatus /* old */)
00968 {}
00969 
00970 void
00971 CThreadPool_Task::OnCancelRequested(void)
00972 {}
00973 
00974 inline void
00975 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool)
00976 {
00977     if (m_IsBusy.Add(1) != 1) {
00978         m_IsBusy.Add(-1);
00979         NCBI_THROW(CThreadPoolException, eTaskBusy,
00980                    "Cannot add task in ThreadPool several times");
00981     }
00982 
00983     // Thread Checker complains that this races with task canceling and
00984     // resetting m_Pool below. But it's an thread pool usage error if
00985     // someone tries to call concurrently AddTask and CancelTask. With a proper
00986     // workflow CancelTask shouldn't be called until AddTask has returned.
00987     m_Pool = pool;
00988 }
00989 
00990 inline void
00991 CThreadPool_Task::x_ResetOwner(void)
00992 {
00993     m_Pool = NULL;
00994     m_IsBusy.Add(-1);
00995 }
00996 
00997 void
00998 CThreadPool_Task::x_SetStatus(EStatus new_status)
00999 {
01000     EStatus old_status = m_Status;
01001     if (old_status != new_status  &&  old_status != eCanceled) {
01002         // Thread Checker complains here, but all status transitions are
01003         // properly guarded with different mutexes and they cannot mix with
01004         // each other.
01005         m_Status = new_status;
01006         OnStatusChange(old_status);
01007     }
01008 
01009     if (IsFinished()) {
01010         // Thread Checker complains here. See comment in x_SetOwner above for
01011         // details.
01012         m_Pool = NULL;
01013     }
01014 }
01015 
01016 inline void
01017 CThreadPool_Task::x_RequestToCancel(void)
01018 {
01019     m_CancelRequested = true;
01020 
01021     OnCancelRequested();
01022 
01023     if (GetStatus() <= eQueued) {
01024         // This can race with calling task's Execute() method but it's okay.
01025         // For details see comment in CThreadPool_ThreadImpl::Main().
01026         x_SetStatus(eCanceled);
01027     }
01028 }
01029 
01030 void
01031 CThreadPool_Task::RequestToCancel(void)
01032 {
01033     // Protect from possible reseting of the pool variable during execution
01034     CThreadPool_Impl* pool = m_Pool;
01035     if (IsFinished()) {
01036         return;
01037     }
01038     else if (!pool) {
01039         x_RequestToCancel();
01040     }
01041     else {
01042         pool->CancelTask(this);
01043     }
01044 }
01045 
01046 CThreadPool*
01047 CThreadPool_Task::GetPool(void) const
01048 {
01049     // Protect from possible reseting of the pool variable during execution
01050     CThreadPool_Impl* pool_impl = m_Pool;
01051     return pool_impl? pool_impl->GetPoolInterface(): NULL;
01052 }
01053 
01054 
01055 
01056 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool)
01057     : m_Pool(pool),
01058       m_IdleTrigger(0, kMax_Int),
01059       m_Finishing(false),
01060       m_Finished(false)
01061 {
01062     _ASSERT(pool);
01063 
01064     m_NeedCallController.Set(kNeedCallController_Shift);
01065 }
01066 
01067 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void)
01068 {}
01069 
01070 inline bool
01071 CThreadPool_ServiceThread::IsFinished(void)
01072 {
01073     return m_Finished;
01074 }
01075 
01076 inline void
01077 CThreadPool_ServiceThread::x_Idle(void)
01078 {
01079     if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) {
01080         m_NeedCallController.Add(1);
01081     }
01082     m_Pool->CallController(CThreadPool_Controller::eOther);
01083 
01084     CTimeSpan timeout = m_Pool->GetSafeSleepTime();
01085     m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(),
01086                           timeout.GetNanoSecondsAfterSecond());
01087 }
01088 
01089 inline void
01090 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard)
01091 {
01092     while (! m_Pool->IsAborted()  &&  ! m_Pool->CanDoExclusiveTask()) {
01093         pool_guard->Release();
01094         m_IdleTrigger.Wait();
01095         pool_guard->Guard();
01096     }
01097 }
01098 
01099 inline void
01100 CThreadPool_ServiceThread::RequestToFinish(void)
01101 {
01102     m_Finishing = true;
01103     WakeUp();
01104 
01105     CThreadPool_Task* task = m_CurrentTask;
01106     if (task) {
01107         CThreadPool_Impl::sx_RequestToCancel(task);
01108     }
01109 }
01110 
01111 void*
01112 CThreadPool_ServiceThread::Main(void)
01113 {
01114     while (! m_Finishing) {
01115         CThreadPool_Impl::TExclusiveTaskInfo task_info =
01116                                               m_Pool->TryGetExclusiveTask();
01117         m_CurrentTask = task_info.second;
01118 
01119         if (m_CurrentTask.IsNull()) {
01120             x_Idle();
01121         }
01122         else {
01123             CThreadPool_Guard guard(m_Pool);
01124 
01125             if (m_Finishing) {
01126                 if (! m_CurrentTask->IsCancelRequested()) {
01127                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01128                 }
01129                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01130                                                  CThreadPool_Task::eCanceled);
01131                 break;
01132             }
01133 
01134             m_Pool->RequestSuspend(task_info.first);
01135             x_WaitForPoolStop(&guard);
01136 
01137             if (m_Finishing) {
01138                 if (!m_CurrentTask->IsCancelRequested()) {
01139                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01140                 }
01141                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01142                                                  CThreadPool_Task::eCanceled);
01143                 break;
01144             }
01145 
01146             guard.Release();
01147 
01148             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01149                                                CThreadPool_Task::eExecuting);
01150             try {
01151                 CThreadPool_Task::EStatus status =
01152                                 s_ConvertTaskResult(m_CurrentTask->Execute());
01153                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01154             }
01155             catch (exception& e) {
01156                 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: "
01157                                << e.what());
01158                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01159                                                    CThreadPool_Task::eFailed);
01160             }
01161             catch (...) {
01162                 ERR_POST_X(12, "Unknown exception from exclusive task "
01163                                "in ThreadPool.");
01164                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01165                                                    CThreadPool_Task::eFailed);
01166             }
01167 
01168             guard.Guard();
01169             m_Pool->ResumeWork();
01170         }
01171     }
01172 
01173     m_Finished = true;
01174     m_Pool->ThreadStateChanged();
01175 
01176     return NULL;
01177 }
01178 
01179 
01180 
01181 inline CThreadPool_ThreadImpl*
01182 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread)
01183 {
01184     return thread->m_Impl;
01185 }
01186 
01187 inline CThreadPool_Thread*
01188 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool)
01189 {
01190     return new CThreadPool_Thread(pool);
01191 }
01192 
01193 inline
01194 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl
01195 (
01196     CThreadPool_Thread*  thread_intf,
01197     CThreadPool_Impl*    pool
01198 )
01199   : m_Interface(thread_intf),
01200     m_Pool(pool),
01201     m_Finishing(false),
01202     m_CancelRequested(false),
01203     m_IsIdle(true),
01204     m_IdleTrigger(0, kMax_Int)
01205 {}
01206 
01207 inline
01208 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void)
01209 {}
01210 
01211 inline CThreadPool*
01212 CThreadPool_ThreadImpl::GetPool(void) const
01213 {
01214     return m_Pool->GetPoolInterface();
01215 }
01216 
01217 inline bool
01218 CThreadPool_ThreadImpl::IsFinishing(void) const
01219 {
01220     return m_Finishing;
01221 }
01222 
01223 inline CRef<CThreadPool_Task>
01224 CThreadPool_ThreadImpl::GetCurrentTask(void) const
01225 {
01226     return m_CurrentTask;
01227 }
01228 
01229 inline void
01230 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle)
01231 {
01232     if (m_IsIdle != is_idle) {
01233         m_IsIdle = is_idle;
01234         m_Pool->SetThreadIdle(this, is_idle);
01235     }
01236 }
01237 
01238 inline void
01239 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status)
01240 {
01241     if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) {
01242         CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01243     }
01244 
01245     m_CurrentTask.Reset();
01246     m_Pool->TaskFinished();
01247 }
01248 
01249 inline void
01250 CThreadPool_ThreadImpl::x_Idle(void)
01251 {
01252     x_SetIdleState(true);
01253 
01254     m_IdleTrigger.Wait();
01255 }
01256 
01257 inline void
01258 CThreadPool_ThreadImpl::RequestToFinish(void)
01259 {
01260     m_Finishing = true;
01261     WakeUp();
01262 }
01263 
01264 inline void
01265 CThreadPool_ThreadImpl::CancelCurrentTask(void)
01266 {
01267     // Avoid resetting of the pointer during execution
01268     // TODO: there's possible race if before we add reference on the task
01269     // m_CurrentTask will be reset to NULL, last reference will be removed and
01270     // task will be deleted. But nobody uses CThreadPool in this way, thus
01271     // this assignment is safe (ThreadPool won't own the last reference to
01272     // the task).
01273     CRef<CThreadPool_Task> task = m_CurrentTask;
01274     if (task.NotNull()) {
01275         CThreadPool_Impl::sx_RequestToCancel(task);
01276     }
01277     else {
01278         m_CancelRequested = true;
01279     }
01280 }
01281 
01282 inline void
01283 CThreadPool_ThreadImpl::Main(void)
01284 {
01285     m_Interface->Initialize();
01286 
01287     while (!m_Finishing) {
01288         // We have to heed call to CancelCurrentTask() only after this point.
01289         // So we reset value of m_CancelRequested here without any mutexes.
01290         // If CancelCurrentTask() is called earlier or this assignment races
01291         // with assignment in CancelCurrentTask() then caller of
01292         // CancelCurrentTask() will make sure that TryGetNextTask() returns
01293         // NULL.
01294         m_CancelRequested = false;
01295         m_CurrentTask = m_Pool->TryGetNextTask();
01296 
01297         if (m_CurrentTask.IsNull()) {
01298             x_Idle();
01299         }
01300         else {
01301             if (m_CurrentTask->IsCancelRequested()  ||  m_CancelRequested) {
01302                 // Some race can appear if task is canceled at the time
01303                 // when it's being queued or at the time when it's being
01304                 // unqueued
01305                 if (! m_CurrentTask->IsCancelRequested()) {
01306                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01307                 }
01308                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01309                                                  CThreadPool_Task::eCanceled);
01310                 m_CurrentTask = NULL;
01311                 continue;
01312             }
01313 
01314             x_SetIdleState(false);
01315             m_Pool->TaskStarting();
01316 
01317             // This can race with canceling of the task. This can result in
01318             // task's Execute() method called with the state of eCanceled
01319             // already set or cancellation being totally ignored in the task's
01320             // status (m_CancelRequested will be still set). Both outcomes are
01321             // simple timing and cancellation should be checked in the task's
01322             // Execute() method anyways. The worst outcome here is that task
01323             // can be marked as eCanceled when it's completely and successfully
01324             // executed. I don't think it's too bad though.
01325             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01326                                                CThreadPool_Task::eExecuting);
01327 
01328             try {
01329                 CThreadPool_Task::EStatus status =
01330                                 s_ConvertTaskResult(m_CurrentTask->Execute());
01331                 x_TaskFinished(status);
01332             }
01333             catch (exception& e) {
01334                 ERR_POST_X(7, "Exception from task in ThreadPool: "
01335                               << e.what());
01336                 x_TaskFinished(CThreadPool_Task::eFailed);
01337             }
01338             catch (...) {
01339                 x_TaskFinished(CThreadPool_Task::eFailed);
01340                 throw;
01341             }
01342         }
01343     }
01344 }
01345 
01346 inline void
01347 CThreadPool_ThreadImpl::OnExit(void)
01348 {
01349     try {
01350         m_Interface->Finalize();
01351     } STD_CATCH_ALL_X(8, "Finalize")
01352 
01353     m_Pool->ThreadStopped(this);
01354 }
01355 
01356 
01357 
01358 inline CThreadPool_Impl*
01359 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool)
01360 {
01361     return pool->m_Impl;
01362 }
01363 
01364 inline unsigned int
01365 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
01366 {
01367     if (queue_size == 0) {
01368         // 10 is just in case, in fact when queue_size == 0 pool will always
01369         // check for idle threads, so tasks will never crowd in the queue
01370         queue_size = 10;
01371         m_IsQueueAllowed = false;
01372     }
01373     else {
01374         m_IsQueueAllowed = true;
01375     }
01376 
01377     return queue_size;
01378 }
01379 
01380 inline
01381 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*      pool_intf,
01382                                    unsigned int      queue_size,
01383                                    unsigned int      max_threads,
01384                                    unsigned int      min_threads,
01385                                    CThread::TRunMode threads_mode)
01386     : m_Queue(x_GetQueueSize(queue_size)),
01387       m_RoomWait(0, kMax_Int),
01388       m_AbortWait(0, kMax_Int)
01389 {
01390     x_Init(pool_intf,
01391            new CThreadPool_Controller_PID(max_threads, min_threads),
01392            threads_mode);
01393 }
01394 
01395 inline
01396 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*            pool_intf,
01397                                    unsigned int            queue_size,
01398                                    CThreadPool_Controller* controller,
01399                                    CThread::TRunMode       threads_mode)
01400     : m_Queue(x_GetQueueSize(queue_size)),
01401       m_RoomWait(0, kMax_Int),
01402       m_AbortWait(0, kMax_Int)
01403 {
01404     x_Init(pool_intf, controller, threads_mode);
01405 }
01406 
01407 void
01408 CThreadPool_Impl::x_Init(CThreadPool*             pool_intf,
01409                          CThreadPool_Controller*  controller,
01410                          CThread::TRunMode        threads_mode)
01411 {
01412     m_Interface = pool_intf;
01413     m_SelfRef = this;
01414     m_DestroyTimeout = CTimeSpan(10, 0);
01415     m_ThreadsCount.Set(0);
01416     m_ExecutingTasks.Set(0);
01417     m_TotalTasks.Set(0);
01418     m_Aborted = false;
01419     m_Suspended = false;
01420     m_ThreadsMode = (threads_mode | CThread::fRunDetached)
01421                      & ~CThread::fRunAllowST;
01422 
01423     controller->x_AttachToPool(this);
01424     m_Controller = controller;
01425 
01426     m_ServiceThread = new CThreadPool_ServiceThread(this);
01427 }
01428 
01429 CThreadPool_Impl::~CThreadPool_Impl(void)
01430 {}
01431 
01432 inline void
01433 CThreadPool_Impl::DestroyReference(void)
01434 {
01435     // Abort even if m_Aborted == true because threads can still be running
01436     // and we have to wait for their termination
01437     Abort(&m_DestroyTimeout);
01438 
01439     m_Interface = NULL;
01440     m_ServiceThread = NULL;
01441     m_SelfRef = NULL;
01442 }
01443 
01444 inline void
01445 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout)
01446 {
01447     m_DestroyTimeout = timeout;
01448 }
01449 
01450 inline const CTimeSpan&
01451 CThreadPool_Impl::GetDestroyTimeout(void) const
01452 {
01453     return m_DestroyTimeout;
01454 }
01455 
01456 void
01457 CThreadPool_Impl::LaunchThreads(unsigned int count)
01458 {
01459     if (count == 0)
01460         return;
01461 
01462     CThreadPool_Guard guard(this);
01463 
01464     for (unsigned int i = 0; i < count; ++i) {
01465         CRef<CThreadPool_Thread> thread(m_Interface->CreateThread());
01466         m_IdleThreads.insert(
01467                         CThreadPool_ThreadImpl::s_GetImplPointer(thread));
01468         thread->Run(m_ThreadsMode);
01469     }
01470 
01471     m_ThreadsCount.Add(count);
01472     CallControllerOther();
01473 }
01474 
01475 void
01476 CThreadPool_Impl::FinishThreads(unsigned int count)
01477 {
01478     if (count == 0)
01479         return;
01480 
01481     CThreadPool_Guard guard(this);
01482 
01483     // The cast is theoretically extraneous, but Sun's WorkShop
01484     // compiler otherwise calls the wrong versions of begin() and
01485     // end() and refuses to convert the resulting iterators.
01486     REVERSE_ITERATE(TThreadsList, it,
01487                     static_cast<const TThreadsList&>(m_IdleThreads))
01488     {
01489         // Maybe in case of several quick consecutive calls we should favor
01490         // the willing to finish several threads.
01491         //if ((*it)->IsFinishing())
01492         //    continue;
01493 
01494         (*it)->RequestToFinish();
01495         --count;
01496         if (count == 0)
01497             break;
01498     }
01499 
01500     REVERSE_ITERATE(TThreadsList, it,
01501                     static_cast<const TThreadsList&>(m_WorkingThreads))
01502     {
01503         if (count == 0)
01504             break;
01505 
01506         (*it)->RequestToFinish();
01507         --count;
01508     }
01509 }
01510 
01511 void
01512 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle)
01513 {
01514     CThreadPool_Guard guard(this);
01515 
01516     if (is_idle  &&  !m_Suspended  &&  m_Queue.GetSize() != 0) {
01517         thread->WakeUp();
01518         return;
01519     }
01520 
01521     TThreadsList* to_del;
01522     TThreadsList* to_ins;
01523     if (is_idle) {
01524         to_del = &m_WorkingThreads;
01525         to_ins = &m_IdleThreads;
01526     }
01527     else {
01528         to_del = &m_IdleThreads;
01529         to_ins = &m_WorkingThreads;
01530     }
01531 
01532     TThreadsList::iterator it = to_del->find(thread);
01533     if (it != to_del->end()) {
01534         to_del->erase(it);
01535     }
01536     to_ins->insert(thread);
01537 
01538     if (is_idle  &&  m_Suspended
01539         &&  (m_SuspendFlags & CThreadPool::fFlushThreads))
01540     {
01541         thread->RequestToFinish();
01542     }
01543 
01544     ThreadStateChanged();
01545 }
01546 
01547 inline bool
01548 CThreadPool_Impl::x_IsNewTaskAllowed(void) const
01549 {
01550     return !m_Aborted
01551             &&  (!m_Suspended
01552                   ||  !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks));
01553 }
01554 
01555 bool
01556 CThreadPool_Impl::x_CanAddImmediateTask(void) const
01557 {
01558     // If pool aborts at some point in waiting it has to stop waiting
01559     // immediately
01560     return !x_IsNewTaskAllowed()
01561            ||  (!m_Suspended  &&  (unsigned int)m_TotalTasks.Get()
01562                                               < m_Controller->GetMaxThreads());
01563 }
01564 
01565 bool
01566 CThreadPool_Impl::x_HasNoThreads(void) const
01567 {
01568     CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull();
01569     return m_IdleThreads.size() + m_WorkingThreads.size() == 0
01570            &&  (! thread  ||  thread->IsFinished());
01571 }
01572 
01573 bool
01574 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate      wait_func,
01575                                      CThreadPool_Guard*  pool_guard,
01576                                      CSemaphore*         wait_sema,
01577                                      const CTimeSpan*    timeout,
01578                                      const CStopWatch*   timer)
01579 {
01580     while (!(this->*wait_func)()) {
01581         pool_guard->Release();
01582 
01583         if (timeout) {
01584             CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble()
01585                                               - timer->Elapsed());
01586             if (next_tm.GetSign() == eNegative) {
01587                 return false;
01588             }
01589 
01590             if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(),
01591                                      next_tm.GetNanoSecondsAfterSecond()))
01592             {
01593                 return false;
01594             }
01595         }
01596         else {
01597             wait_sema->Wait();
01598         }
01599 
01600         pool_guard->Guard();
01601     }
01602 
01603     return true;
01604 }
01605 
01606 /// Throw an exception with standard message when AddTask() is called
01607 /// but ThreadPool is aborted or do not allow new tasks
01608 NCBI_NORETURN
01609 static inline void
01610 ThrowAddProhibited(void)
01611 {
01612     NCBI_THROW(CThreadPoolException, eProhibited,
01613                "Adding of new tasks is prohibited");
01614 }
01615 
01616  inline void
01617 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
01618 {
01619     _ASSERT(task);
01620 
01621     // To be sure that if simple new operator was passed as argument the task
01622     // will still be referenced even if some exception happen in this method
01623     CRef<CThreadPool_Task> task_ref(task);
01624 
01625     if (!x_IsNewTaskAllowed()) {
01626         ThrowAddProhibited();
01627     }
01628 
01629     CThreadPool_Guard guard(this, false);
01630     auto_ptr<CTimeSpan> real_timeout;
01631 
01632     if (!m_IsQueueAllowed) {
01633         guard.Guard();
01634 
01635         CStopWatch timer(CStopWatch::eStart);
01636         if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask,
01637                                  &guard, &m_RoomWait, timeout, &timer))
01638         {
01639             NCBI_THROW(CSyncQueueException, eNoRoom,
01640                        "Cannot add task - all threads are busy");
01641         }
01642 
01643         if (!x_IsNewTaskAllowed()) {
01644             ThrowAddProhibited();
01645         }
01646 
01647         if (timeout) {
01648             real_timeout.reset(new CTimeSpan(timeout->GetAsDouble()
01649                                                   - timer.Elapsed()));
01650         }
01651     }
01652 
01653     task->x_SetOwner(this);
01654     task->x_SetStatus(CThreadPool_Task::eQueued);
01655     try {
01656         // Pushing to queue must be out of mutex to be able to wait
01657         // for available space.
01658         m_Queue.Push(Ref(task), real_timeout.get());
01659     }
01660     catch (...) {
01661         task->x_SetStatus(CThreadPool_Task::eIdle);
01662         task->x_ResetOwner();
01663         throw;
01664     }
01665 
01666     if (m_IsQueueAllowed) {
01667         guard.Guard();
01668     }
01669 
01670     // Check if someone aborted the pool or suspended it with cacelation of
01671     // queued tasks after we added this task to the queue but before we were
01672     // able to acquire the mutex
01673     CThreadPool::TExclusiveFlags check_flags
01674         = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks;
01675     if (m_Aborted  ||  (m_Suspended
01676                         &&  (m_SuspendFlags & check_flags)  == check_flags))
01677     {
01678         if (m_Queue.GetSize() != 0) {
01679             x_CancelQueuedTasks();
01680         }
01681         return;
01682     }
01683 
01684     unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1);
01685 
01686     if (!m_IsQueueAllowed  &&  cnt_req > GetThreadsCount()) {
01687         LaunchThreads(cnt_req - GetThreadsCount());
01688     }
01689 
01690     if (! m_Suspended) {
01691         int count = GetQueuedTasksCount();
01692         ITERATE(TThreadsList, it, m_IdleThreads) {
01693             if (! (*it)->IsFinishing()) {
01694                 (*it)->WakeUp();
01695                 --count;
01696                 if (count == 0)
01697                     break;
01698             }
01699         }
01700     }
01701 
01702     CallControllerOther();
01703 }
01704 
01705 inline void
01706 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task)
01707 {
01708     TQueue::TAccessGuard q_guard(m_Queue);
01709 
01710     TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01711     while (it != q_guard.End()  &&  *it != task) {
01712         ++it;
01713     }
01714 
01715     if (it != q_guard.End()) {
01716         q_guard.Erase(it);
01717     }
01718 }
01719 
01720 void
01721 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task*  task,
01722                                             TExclusiveFlags    flags)
01723 {
01724     _ASSERT(task);
01725 
01726     // To be sure that if simple new operator was passed as argument the task
01727     // will still be referenced even if some exception happen in this method
01728     CRef<CThreadPool_Task> task_ref(task);
01729 
01730     if (m_Aborted) {
01731         NCBI_THROW(CThreadPoolException, eProhibited,
01732                    "Cannot add exclusive task when ThreadPool is aborted");
01733     }
01734 
01735     task->x_SetOwner(this);
01736     task->x_SetStatus(CThreadPool_Task::eQueued);
01737     m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task)));
01738 
01739     CThreadPool_ServiceThread* thread = m_ServiceThread;
01740     if (thread) {
01741         thread->WakeUp();
01742     }
01743 }
01744 
01745 void
01746 CThreadPool_Impl::CancelTask(CThreadPool_Task* task)
01747 {
01748     _ASSERT(task);
01749 
01750     if (task->IsFinished()) {
01751         return;
01752     }
01753     // Some race can happen here if the task is being queued now
01754     if (task->GetStatus() == CThreadPool_Task::eIdle) {
01755         task->x_RequestToCancel();
01756         return;
01757     }
01758 
01759     CThreadPool* task_pool = task->GetPool();
01760     if (task_pool != m_Interface) {
01761         if (!task_pool) {
01762             // Task have just finished - we can do nothing
01763             return;
01764         }
01765 
01766         NCBI_THROW(CThreadPoolException, eInvalid,
01767                    "Cannot cancel task execution "
01768                    "if it is inserted in another ThreadPool");
01769     }
01770 
01771     task->x_RequestToCancel();
01772     x_RemoveTaskFromQueue(task);
01773 
01774     CallControllerOther();
01775 }
01776 
01777 inline void
01778 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group)
01779 {
01780     _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks
01781                              + CThreadPool::fCancelQueuedTasks))
01782                   == tasks_group
01783              &&  tasks_group != 0);
01784 
01785     if (tasks_group & CThreadPool::fCancelQueuedTasks) {
01786         x_CancelQueuedTasks();
01787     }
01788     if (tasks_group & CThreadPool::fCancelExecutingTasks) {
01789         x_CancelExecutingTasks();
01790     }
01791 
01792     CallControllerOther();
01793 }
01794 
01795 void
01796 CThreadPool_Impl::x_CancelExecutingTasks(void)
01797 {
01798     CThreadPool_Guard guard(this);
01799 
01800     ITERATE(TThreadsList, it, m_WorkingThreads) {
01801         (*it)->CancelCurrentTask();
01802     }
01803 
01804     // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be
01805     // sure that it doesn't have already task to execute before it marked
01806     // itself as working
01807     ITERATE(TThreadsList, it, m_IdleThreads) {
01808         (*it)->CancelCurrentTask();
01809     }
01810 }
01811 
01812 void
01813 CThreadPool_Impl::x_CancelQueuedTasks(void)
01814 {
01815     TQueue::TAccessGuard q_guard(m_Queue);
01816 
01817     for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01818                                          it != q_guard.End(); ++it)
01819     {
01820         it->GetNCPointer()->x_RequestToCancel();
01821     }
01822 
01823     m_Queue.Clear();
01824 }
01825 
01826 inline void
01827 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type)
01828 {
01829     CThreadPool_Guard guard(this);
01830 
01831     if (m_Aborted) {
01832         NCBI_THROW(CThreadPoolException, eProhibited,
01833                    "Cannot flush threads when ThreadPool aborted");
01834     }
01835 
01836     if (flush_type == CThreadPool::eStartImmediately
01837         ||  (flush_type == CThreadPool::eWaitToFinish  &&  m_Suspended))
01838     {
01839         FinishThreads(GetThreadsCount());
01840     }
01841     else if (flush_type == CThreadPool::eWaitToFinish) {
01842         bool need_add = true;
01843 
01844         {{
01845             // To avoid races with TryGetExclusiveTask() we need to put
01846             // guard here
01847             TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01848 
01849             if (m_ExclusiveQueue.GetSize() != 0) {
01850                 m_FlushRequested = true;
01851                 need_add = false;
01852             }
01853         }}
01854 
01855         if (need_add) {
01856             RequestExclusiveExecution(new CThreadPool_EmptyTask(),
01857                                       CThreadPool::fFlushThreads);
01858         }
01859     }
01860 }
01861 
01862 inline void
01863 CThreadPool_Impl::Abort(const CTimeSpan* timeout)
01864 {
01865     CThreadPool_Guard guard(this);
01866 
01867     // Method can be called several times in a row and every time we need
01868     // to wait for threads to finish operation
01869     m_Aborted = true;
01870 
01871     x_CancelQueuedTasks();
01872     x_CancelExecutingTasks();
01873 
01874     {{
01875         TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01876 
01877         for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
01878                                                 it != q_guard.End(); ++it)
01879         {
01880             it->second->x_RequestToCancel();
01881         }
01882 
01883         m_ExclusiveQueue.Clear();
01884     }}
01885 
01886     if (m_ServiceThread.NotNull()) {
01887         m_ServiceThread->RequestToFinish();
01888     }
01889 
01890     FinishThreads(GetThreadsCount());
01891 
01892     if (m_Controller.NotNull()) {
01893         m_Controller->x_DetachFromPool();
01894     }
01895 
01896     CStopWatch timer(CStopWatch::eStart);
01897     x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads,
01898                        &guard, &m_AbortWait, timeout, &timer);
01899     m_AbortWait.Post();
01900 
01901     // This assigning can destroy the controller. If some threads are not
01902     // finished yet and at this very moment will call controller it can crash.
01903     //m_Controller = NULL;
01904 }
01905 
01906 
01907 
01908 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads,
01909                                                unsigned int min_threads)
01910     : m_Pool(NULL),
01911       m_MinThreads(min_threads),
01912       m_MaxThreads(max_threads),
01913       m_InHandleEvent(false)
01914 {
01915     if (max_threads < min_threads  ||  max_threads == 0) {
01916         NCBI_THROW_FMT(CThreadPoolException, eInvalid,
01917                        "Invalid numbers of min and max number of threads:"
01918                        " min=" << min_threads << ", max=" << max_threads);
01919     }
01920 }
01921 
01922 CThreadPool_Controller::~CThreadPool_Controller(void)
01923 {}
01924 
01925 CThreadPool*
01926 CThreadPool_Controller::GetPool(void) const
01927 {
01928     // Avoid changing of pointer during method execution
01929     CThreadPool_Impl* pool = m_Pool;
01930     return pool? pool->GetPoolInterface(): NULL;
01931 }
01932 
01933 CMutex&
01934 CThreadPool_Controller::GetMainPoolMutex(CThreadPool* pool) const
01935 {
01936     CThreadPool_Impl* impl = CThreadPool_Impl::s_GetImplPointer(pool);
01937     if (!impl) {
01938         NCBI_THROW(CThreadPoolException, eInactive,
01939                    "Cannot do active work when not attached "
01940                    "to some ThreadPool");
01941     }
01942     return impl->GetMainPoolMutex();
01943 }
01944 
01945 void
01946 CThreadPool_Controller::EnsureLimits(void)
01947 {
01948     CThreadPool_Impl* pool = m_Pool;
01949 
01950     if (! pool)
01951         return;
01952 
01953     Uint4 count = pool->GetThreadsCount();
01954     if (count > m_MaxThreads) {
01955         pool->FinishThreads(count - m_MaxThreads);
01956     }
01957     if (count < m_MinThreads) {
01958         pool->LaunchThreads(m_MinThreads - count);
01959     }
01960 }
01961 
01962 void
01963 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
01964 {
01965     CThreadPool_Guard guard(m_Pool, false);
01966     if (m_Pool)
01967         guard.Guard();
01968 
01969     m_MinThreads = min_threads;
01970 
01971     EnsureLimits();
01972 }
01973 
01974 void
01975 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
01976 {
01977     CThreadPool_Guard guard(m_Pool, false);
01978     if (m_Pool)
01979         guard.Guard();
01980 
01981     m_MaxThreads = max_threads;
01982 
01983     EnsureLimits();
01984 }
01985 
01986 void
01987 CThreadPool_Controller::SetThreadsCount(unsigned int count)
01988 {
01989     if (count > GetMaxThreads())
01990         count = GetMaxThreads();
01991     if (count < GetMinThreads())
01992         count = GetMinThreads();
01993 
01994     CThreadPool_Impl* pool = m_Pool;
01995 
01996     unsigned int now_cnt = pool->GetThreadsCount();
01997     if (count > now_cnt) {
01998         pool->LaunchThreads(count - now_cnt);
01999     }
02000     else if (count < now_cnt) {
02001         pool->FinishThreads(now_cnt - count);
02002     }
02003 }
02004 
02005 void
02006 CThreadPool_Controller::HandleEvent(EEvent event)
02007 {
02008     CThreadPool_Impl* pool = m_Pool;
02009     if (! pool)
02010         return;
02011 
02012     CThreadPool_Guard guard(pool);
02013 
02014     if (m_InHandleEvent  ||  pool->IsAborted()  ||  pool->IsSuspended())
02015         return;
02016 
02017     m_InHandleEvent = true;
02018 
02019     try {
02020         OnEvent(event);
02021         m_InHandleEvent = false;
02022     }
02023     catch (...) {
02024         m_InHandleEvent = false;
02025         throw;
02026     }
02027 }
02028 
02029 CTimeSpan
02030 CThreadPool_Controller::GetSafeSleepTime(void) const
02031 {
02032     if (m_Pool) {
02033         return CTimeSpan(1, 0);
02034     }
02035     else {
02036         return CTimeSpan(0, 0);
02037     }
02038 }
02039 
02040 
02041 
02042 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool)
02043 {
02044     _ASSERT(pool);
02045 
02046     m_Impl = new CThreadPool_ThreadImpl(this,
02047                                     CThreadPool_Impl::s_GetImplPointer(pool));
02048 }
02049 
02050 CThreadPool_Thread::~CThreadPool_Thread(void)
02051 {
02052     delete m_Impl;
02053 }
02054 
02055 void
02056 CThreadPool_Thread::Initialize(void)
02057 {}
02058 
02059 void
02060 CThreadPool_Thread::Finalize(void)
02061 {}
02062 
02063 CThreadPool*
02064 CThreadPool_Thread::GetPool(void) const
02065 {
02066     return m_Impl->GetPool();
02067 }
02068 
02069 CRef<CThreadPool_Task>
02070 CThreadPool_Thread::GetCurrentTask(void) const
02071 {
02072     return m_Impl->GetCurrentTask();
02073 }
02074 
02075 void*
02076 CThreadPool_Thread::Main(void)
02077 {
02078     m_Impl->Main();
02079     return NULL;
02080 }
02081 
02082 void
02083 CThreadPool_Thread::OnExit(void)
02084 {
02085     m_Impl->OnExit();
02086 }
02087 
02088 
02089 
02090 CThreadPool::CThreadPool(unsigned int      queue_size,
02091                          unsigned int      max_threads,
02092                          unsigned int      min_threads,
02093                          CThread::TRunMode threads_mode)
02094 {
02095     m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
02096                                   threads_mode);
02097     m_Impl->SetInterfaceStarted();
02098 }
02099 
02100 CThreadPool::CThreadPool(unsigned int            queue_size,
02101                          CThreadPool_Controller* controller,
02102                          CThread::TRunMode       threads_mode)
02103 {
02104     m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
02105     m_Impl->SetInterfaceStarted();
02106 }
02107 
02108 CThreadPool::~CThreadPool(void)
02109 {
02110     m_Impl->DestroyReference();
02111 }
02112 
02113 CMutex&
02114 CThreadPool::GetMainPoolMutex(void)
02115 {
02116     return m_Impl->GetMainPoolMutex();
02117 }
02118 
02119 CThreadPool_Thread*
02120 CThreadPool::CreateThread(void)
02121 {
02122     return CThreadPool_ThreadImpl::s_CreateThread(this);
02123 }
02124 
02125 void
02126 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
02127 {
02128     m_Impl->AddTask(task, timeout);
02129 }
02130 
02131 void
02132 CThreadPool::CancelTask(CThreadPool_Task* task)
02133 {
02134     m_Impl->CancelTask(task);
02135 }
02136 
02137 void
02138 CThreadPool::Abort(const CTimeSpan* timeout)
02139 {
02140     m_Impl->Abort(timeout);
02141 }
02142 
02143 bool
02144 CThreadPool::IsAborted(void) const
02145 {
02146     return m_Impl->IsAborted();
02147 }
02148 
02149 void
02150 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout)
02151 {
02152     m_Impl->SetDestroyTimeout(timeout);
02153 }
02154 
02155 const CTimeSpan&
02156 CThreadPool::GetDestroyTimeout(void) const
02157 {
02158     return m_Impl->GetDestroyTimeout();
02159 }
02160 
02161 void
02162 CThreadPool::RequestExclusiveExecution(CThreadPool_Task*  task,
02163                                        TExclusiveFlags    flags)
02164 {
02165     m_Impl->RequestExclusiveExecution(task, flags);
02166 }
02167 
02168 void
02169 CThreadPool::CancelTasks(TExclusiveFlags tasks_group)
02170 {
02171     m_Impl->CancelTasks(tasks_group);
02172 }
02173 
02174 void
02175 CThreadPool::FlushThreads(EFlushType flush_type)
02176 {
02177     m_Impl->FlushThreads(flush_type);
02178 }
02179 
02180 unsigned int
02181 CThreadPool::GetThreadsCount(void) const
02182 {
02183     return m_Impl->GetThreadsCount();
02184 }
02185 
02186 unsigned int
02187 CThreadPool::GetQueuedTasksCount(void) const
02188 {
02189     return m_Impl->GetQueuedTasksCount();
02190 }
02191 
02192 unsigned int
02193 CThreadPool::GetExecutingTasksCount(void) const
02194 {
02195     return m_Impl->GetExecutingTasksCount();
02196 }
02197 
02198 
02199 
02200 END_NCBI_SCOPE
Modified on Wed May 23 12:58:16 2012 by modify_doxy.py rev. 337098