src/util/thread_pool.cpp

Go to the documentation of this file.
00001 /*  $Id: thread_pool.cpp 164845 2009-07-01 16:44:27Z satskyse $
00002 * ===========================================================================
00003 *
00004 *                            PUBLIC DOMAIN NOTICE
00005 *               National Center for Biotechnology Information
00006 *
00007 *  This software/database is a "United States Government Work" under the
00008 *  terms of the United States Copyright Act.  It was written as part of
00009 *  the author's official duties as a United States Government employee and
00010 *  thus cannot be copyrighted.  This software/database is freely available
00011 *  to the public for use. The National Library of Medicine and the U.S.
00012 *  Government have not placed any restriction on its use or reproduction.
00013 *
00014 *  Although all reasonable efforts have been taken to ensure the accuracy
00015 *  and reliability of the software and data, the NLM and the U.S.
00016 *  Government do not and cannot warrant the performance or results that
00017 *  may be obtained by using this software or data. The NLM and the U.S.
00018 *  Government disclaim all warranties, express or implied, including
00019 *  warranties of performance, merchantability or fitness for any particular
00020 *  purpose.
00021 *
00022 *  Please cite the author in any work or product based on this material.
00023 *
00024 * ===========================================================================
00025 *
00026 * Author:  Pavel Ivanov
00027 *
00028 * File Description:
00029 *   Pool of threads.
00030 */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include <util/thread_pool.hpp>
00034 #include <util/thread_pool_ctrl.hpp>
00035 #include <util/sync_queue.hpp>
00036 #include <util/error_codes.hpp>
00037 
00038 #define NCBI_USE_ERRCODE_X  Util_Thread
00039 
00040 BEGIN_NCBI_SCOPE
00041 
00042 
00043 class CThreadPool_Guard;
00044 class CThreadPool_ServiceThread;
00045 
00046 
00047 /// Functor to compare tasks by priority
00048 struct SThreadPool_TaskCompare {
00049     bool operator() (const CRef<CThreadPool_Task>& left,
00050                      const CRef<CThreadPool_Task>& right)
00051     {
00052         return left->GetPriority() < right->GetPriority();
00053     }
00054 };
00055 
00056 
00057 /// Real implementation of all ThreadPool functions
00058 class CThreadPool_Impl : public CObject
00059 {
00060 public:
00061     typedef CThreadPool::TExclusiveFlags  TExclusiveFlags;
00062 
00063     /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl
00064     /// object. Can be done only here to avoid excessive friendship to
00065     /// CThreadPool class.
00066     static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool);
00067 
00068     /// Call x_SetTaskStatus() for the given task.
00069     /// Method introduced to avoid excessive friendship to CThreadPool_Task
00070     /// class.
00071     ///
00072     /// @sa CThreadPool_Task::x_SetTaskStatus()
00073     static void sx_SetTaskStatus(CThreadPool_Task*          task,
00074                                  CThreadPool_Task::EStatus  status);
00075 
00076     /// Call x_RequestToCancel() for the given task.
00077     /// Method introduced to avoid excessive friendship to CThreadPool_Task
00078     /// class.
00079     ///
00080     /// @sa CThreadPool_Task::x_RequestToCancel()
00081     static void sx_RequestToCancel(CThreadPool_Task* task);
00082 
00083 
00084     /// Constructor with default controller
00085     /// @param pool_intf
00086     ///   ThreadPool interface object attached to this implementation
00087     ///
00088     /// @sa CThreadPool::CThreadPool()
00089     CThreadPool_Impl(CThreadPool*      pool_intf,
00090                      unsigned int      queue_size,
00091                      unsigned int      max_threads,
00092                      unsigned int      min_threads,
00093                      CThread::TRunMode threads_mode = CThread::fRunDefault);
00094 
00095     /// Constructor with explicitly given controller
00096     /// @param pool_intf
00097     ///   ThreadPool interface object attached to this implementation
00098     ///
00099     /// @sa CThreadPool::CThreadPool()
00100     CThreadPool_Impl(CThreadPool*        pool_intf,
00101                      unsigned int        queue_size,
00102                      CThreadPool_Controller* controller,
00103                      CThread::TRunMode   threads_mode = CThread::fRunDefault);
00104 
00105     /// Get pointer to ThreadPool interface object
00106     CThreadPool* GetPoolInterface(void) const;
00107 
00108     /// Set destroy timeout for the pool
00109     ///
00110     /// @sa CThreadPool::SetDestroyTimeout()
00111     void SetDestroyTimeout(const CTimeSpan& timeout);
00112 
00113     /// Get destroy timeout for the pool
00114     ///
00115     /// @sa CThreadPool::GetDestroyTimeout()
00116     const CTimeSpan& GetDestroyTimeout(void) const;
00117 
00118     /// Destroy reference to this object
00119     /// Method is called when CThreadPool object is destroyed which means
00120     /// that implementation can be destroyed too if there is no references
00121     /// to it left.
00122     void DestroyReference(void);
00123 
00124     /// Get main pool mutex
00125     ///
00126     /// @sa CThreadPool::GetMainPoolMutex()
00127     CMutex& GetMainPoolMutex(void);
00128 
00129     /// Add task to the pool
00130     ///
00131     /// @sa CThreadPool::AddTask()
00132     void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
00133 
00134     /// Request to cancel the task
00135     ///
00136     /// @sa CThreadPool::CancelTask()
00137     void CancelTask(CThreadPool_Task* task);
00138 
00139     /// Cancel the selected groups of tasks in the pool
00140     ///
00141     /// @sa CThreadPool::CancelTasks()
00142     void CancelTasks(TExclusiveFlags tasks_group);
00143 
00144     /// Add the task for exclusive execution in the pool
00145     ///
00146     /// @sa CThreadPool::RequestExclusiveExecution()
00147     void RequestExclusiveExecution(CThreadPool_Task*  task,
00148                                    TExclusiveFlags    flags);
00149 
00150     /// Launch new threads in pool
00151     /// @param count
00152     ///   Number of threads to launch
00153     void LaunchThreads(unsigned int count);
00154 
00155     /// Finish threads in pool
00156     /// Stop first all idle threads then stop busy threads without
00157     /// cancelation of currently executing tasks.
00158     /// @param count
00159     ///   Number of threads to finish
00160     void FinishThreads(unsigned int count);
00161 
00162     /// Get number of threads running in the pool
00163     unsigned int GetThreadsCount(void) const;
00164 
00165     /// Mark thread as idle or non-idle
00166     /// @param thread
00167     ///   Thread to mark
00168     /// @param is_idle
00169     ///   If thread should be marked as idle or not
00170     void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
00171 
00172     /// Callback from working thread when it finished its Main() method
00173     void ThreadStopped(CThreadPool_ThreadImpl* thread);
00174 
00175     /// Callback when some thread changed its idleness or finished
00176     /// (including service thread)
00177     void ThreadStateChanged(void);
00178 
00179     /// Get next task from queue if there is one
00180     /// If the queue is empty then return NULL.
00181     CRef<CThreadPool_Task> TryGetNextTask(void);
00182 
00183     /// Callback from thread when it is starting to execute task
00184     void TaskStarting(void);
00185 
00186     /// Callback from thread when it has finished to execute task
00187     void TaskFinished(void);
00188 
00189     /// Get the number of tasks currently waiting in queue
00190     unsigned int GetQueuedTasksCount(void) const;
00191 
00192     /// Get the number of currently executing tasks
00193     unsigned int GetExecutingTasksCount(void) const;
00194 
00195     /// Type for storing information about exclusive task launching
00196     typedef pair< TExclusiveFlags,
00197                   CRef<CThreadPool_Task> > TExclusiveTaskInfo;
00198 
00199     /// Get information about next exclusive task to execute
00200     TExclusiveTaskInfo TryGetExclusiveTask(void);
00201 
00202     /// Request suspension of the pool
00203     /// @param flags
00204     ///   Parameters for necessary exclusive execution environment
00205     void RequestSuspend(TExclusiveFlags flags);
00206 
00207     /// Resume the pool operation after exclusive task execution
00208     void ResumeWork(void);
00209 
00210     /// Check if the pool is suspended for exclusive execution
00211     bool IsSuspended(void) const;
00212 
00213     /// Check if it is already allowed to execute exclusive task
00214     bool CanDoExclusiveTask(void) const;
00215 
00216     /// Abort the pool operation
00217     ///
00218     /// @sa CThreadPool::Abort()
00219     void Abort(const CTimeSpan* timeout);
00220 
00221     /// Check if the pool is already aborted
00222     bool IsAborted(void) const;
00223 
00224     /// Finish all current threads and replace them with new ones
00225     ///
00226     /// @sa CThreadPool::FlushThreads()
00227     void FlushThreads(CThreadPool::EFlushType flush_type);
00228 
00229     /// Call the CThreadPool_Controller::HandleEvent() method of the pool
00230     /// controller with the given event type. If ThreadPool is already aborted
00231     /// and controller is reset then do nothing.
00232     void CallController(CThreadPool_Controller::EEvent event);
00233 
00234     /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther
00235     /// event type
00236     void CallControllerOther(void);
00237 
00238     /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool
00239     /// controller. If ThreadPool is already aborted and controller is reset
00240     /// then return time period of 1 second.
00241     CTimeSpan GetSafeSleepTime(void) const;
00242 
00243     /// Mark that initialization of the interface was finished
00244     void SetInterfaceStarted(void);
00245 
00246 
00247 private:
00248     /// Type of queue used for storing tasks
00249     typedef CSyncQueue< CRef<CThreadPool_Task>,
00250                         CSyncQueue_multiset< CRef<CThreadPool_Task>,
00251                                              SThreadPool_TaskCompare > >
00252             TQueue;
00253     /// Type of queue used for storing information about exclusive tasks
00254     typedef CSyncQueue<TExclusiveTaskInfo>                 TExclusiveQueue;
00255     /// Type of list of all poolled threads
00256     typedef set<CThreadPool_ThreadImpl*> TThreadsList;
00257 
00258 
00259     /// Prohibit copying and assigning
00260     CThreadPool_Impl(const CThreadPool_Impl&);
00261     CThreadPool_Impl& operator= (const CThreadPool_Impl&);
00262 
00263     /// Transform size of queue given in constructor to the size passed to
00264     /// CSyncQueue constructor.
00265     /// Method can be called only from constructor because it initializes
00266     /// value of m_IsQueueAllowed member variable.
00267     unsigned int x_GetQueueSize(unsigned int queue_size);
00268 
00269     /// Initialization of all class member variables that can be initialized
00270     /// outside of constructor
00271     /// @param pool_intf
00272     ///   ThreadPool interface object attached to this implementation
00273     /// @param controller
00274     ///   Controller for the pool
00275     void x_Init(CThreadPool*            pool_intf,
00276                 CThreadPool_Controller* controller,
00277                 CThread::TRunMode       threads_mode);
00278 
00279     /// Destructor. Will be called from CRef
00280     ~CThreadPool_Impl(void);
00281 
00282     /// Delete task from the queue
00283     /// If task does not exist in queue then does nothing.
00284     void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
00285 
00286     /// Cancel all tasks waiting in the queue
00287     void x_CancelQueuedTasks(void);
00288 
00289     /// Cancel all currently executing tasks
00290     void x_CancelExecutingTasks(void);
00291 
00292     /// Type of some simple predicate
00293     ///
00294     /// @sa x_WaitForPredicate
00295     typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
00296 
00297     /// Check if new task can be added to the pool
00298     bool x_IsNewTaskAllowed(void) const;
00299 
00300     /// Check if new task can be added to the pool when queuing is disabled
00301     bool x_CanAddImmediateTask(void) const;
00302 
00303     /// Check if all threads in pool finished their work
00304     bool x_HasNoThreads(void) const;
00305 
00306     /// Wait for some predicate to be true
00307     /// @param wait_func
00308     ///   Predicate to wait for
00309     /// @param pool_guard
00310     ///   Guardian that locks main pool mutex at the time of method call and
00311     ///   that have to be unlocked for the time of waiting
00312     /// @param wait_sema
00313     ///   Semaphore which will be posted when predicate become true
00314     /// @param timeout
00315     ///   Maximum amount of time to wait
00316     /// @param timer
00317     ///   Timer for mesuring elapsed time. Method assumes that timer is
00318     ///   started at the moment from which timeout should be calculated.
00319     bool x_WaitForPredicate(TWaitPredicate      wait_func,
00320                             CThreadPool_Guard*  pool_guard,
00321                             CSemaphore*         wait_sema,
00322                             const CTimeSpan*    timeout,
00323                             const CStopWatch*   timer);
00324 
00325 
00326 private:
00327     /// ThreadPool interface object attached to this implementation
00328     CThreadPool*                     m_Interface;
00329     /// Reference to this pool to prevent its destroying earlier than we
00330     /// allow it to
00331     CRef<CThreadPool_Impl>           m_SelfRef;
00332     /// Timeout to wait for all threads to finish before the ThreadPool
00333     /// interface object will be able to destroy
00334     CTimeSpan                        m_DestroyTimeout;
00335     /// Queue for storing tasks
00336     TQueue                           m_Queue;
00337     /// Mutex for guarding all changes in the pool, its threads and controller
00338     CMutex                           m_MainPoolMutex;
00339     /// Semaphore for waiting for available threads to process task when
00340     /// queuing is disabled.
00341     CSemaphore                       m_RoomWait;
00342     /// Controller managing count of threads in pool
00343     CRef<CThreadPool_Controller>     m_Controller;
00344     /// List of all idle threads
00345     TThreadsList                     m_IdleThreads;
00346     /// List of all threads currently executing some tasks
00347     TThreadsList                     m_WorkingThreads;
00348     /// Running mode of all threads
00349     CThread::TRunMode                m_ThreadsMode;
00350     /// Total number of threads
00351     /// Introduced for more adequate and fast reflecting to threads starting
00352     /// and stopping events
00353     CAtomicCounter                   m_ThreadsCount;
00354     /// Number of tasks executing now
00355     /// Introduced for more adequate and fast reflecting to task executing
00356     /// start and finish events
00357     CAtomicCounter                   m_ExecutingTasks;
00358     /// Total number of tasks acquired by pool
00359     /// Includes queued tasks and executing tasks. Introduced for
00360     /// maintaining atomicity of this number changing
00361     CAtomicCounter                   m_TotalTasks;
00362     /// Flag about working with special case:
00363     /// FALSE - queue_size == 0, TRUE - queue_size > 0
00364     bool                             m_IsQueueAllowed;
00365     /// If pool is already aborted or not
00366     volatile bool                    m_Aborted;
00367     /// Semaphore for waiting for threads finishing in Abort() method
00368     ///
00369     /// @sa Abort()
00370     CSemaphore                       m_AbortWait;
00371     /// If pool is suspended for exclusive task execution or not
00372     volatile bool                    m_Suspended;
00373     /// Requested requirements for the exclusive execution environment
00374     volatile TExclusiveFlags         m_SuspendFlags;
00375     /// Flag indicating if flush of threads requested after adding exclusive
00376     /// task but before it is started its execution.
00377     volatile bool                    m_FlushRequested;
00378     /// Thread for execution of exclusive tasks and passing of events
00379     /// to the controller
00380     CRef<CThreadPool_ServiceThread>  m_ServiceThread;
00381     /// Queue for information about exclusive tasks
00382     TExclusiveQueue                  m_ExclusiveQueue;
00383 };
00384 
00385 
00386 
00387 /// Real implementation of all CThreadPool_Thread functions
00388 class CThreadPool_ThreadImpl
00389 {
00390 public:
00391     /// Convert pointer to CThreadPool_Thread object into pointer
00392     /// to CThreadPool_ThreadImpl object. Can be done only here to avoid
00393     /// excessive friendship to CThreadPool_Thread class.
00394     static CThreadPool_ThreadImpl*
00395     s_GetImplPointer(CThreadPool_Thread* thread);
00396 
00397     /// Create new CThreadPool_Thread object
00398     /// Method introduced to avoid excessive friendship to CThreadPool_Thread
00399     /// class.
00400     ///
00401     /// @sa CThreadPool_Thread::CThreadPool_Thread()
00402     static CThreadPool_Thread* s_CreateThread(CThreadPool* pool);
00403 
00404     /// Constructor
00405     /// @param thread_intf
00406     ///   ThreadPool_Thread interface object attached to this implementation
00407     /// @param pool
00408     ///   Pool implementation owning this thread
00409     CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf,
00410                            CThreadPool_Impl*   pool);
00411 
00412     /// Destructor
00413     /// Called directly from CThreadPool destructor
00414     ~CThreadPool_ThreadImpl(void);
00415 
00416     /// Get ThreadPool interface object owning this thread
00417     ///
00418     /// @sa CThreadPool_Thread::GetPool()
00419     CThreadPool* GetPool(void) const;
00420 
00421     /// Request this thread to finish its operation.
00422     /// It renders the thread unusable and eventually ready for destruction
00423     /// (as soon as its current task is finished and there are no CRefs to
00424     /// this thread left).
00425     void RequestToFinish(void);
00426 
00427     /// If finishing of this thread is already in progress or not
00428     bool IsFinishing(void) const;
00429 
00430     /// Wake up the thread from idle state
00431     ///
00432     /// @sa x_Idle
00433     void WakeUp(void);
00434 
00435     /// Get task currently executing in the thread
00436     /// May be NULL if thread is idle or is in the middle of changing of
00437     /// current task
00438     ///
00439     /// @sa CThreadPool_Thread::GetCurrentTask()
00440     CRef<CThreadPool_Task> GetCurrentTask(void) const;
00441 
00442     /// Request to cancel current task execution
00443     void CancelCurrentTask(void);
00444 
00445     /// Implementation of thread Main() method
00446     ///
00447     /// @sa CThreadPool_Thread::Main()
00448     void Main(void);
00449 
00450     /// Implementation of threadOnExit() method
00451     ///
00452     /// @sa CThreadPool_Thread::OnExit()
00453     void OnExit(void);
00454 
00455 private:
00456     /// Prohibit copying and assigning
00457     CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&);
00458     CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&);
00459 
00460     /// Suspend until the wake up signal.
00461     ///
00462     /// @sa WakeUp()
00463     void x_Idle(void);
00464 
00465     /// Mark the thread idle or non-idle
00466     void x_SetIdleState(bool is_idle);
00467 
00468     /// Do finalizing when task finished its execution
00469     /// @param status
00470     ///   Status that the task must get
00471     void x_TaskFinished(CThreadPool_Task::EStatus status);
00472 
00473 
00474     /// ThreadPool_Thread interface object attached to this implementation
00475     CThreadPool_Thread*          m_Interface;
00476     /// Pool running the thread
00477     CRef<CThreadPool_Impl>       m_Pool;
00478     /// If the thread is already asked to finish or not
00479     volatile bool                m_Finishing;
00480     /// If cancel of the currently executing task is requested or not
00481     volatile bool                m_CancelRequested;
00482     /// Idleness of the thread
00483     bool                         m_IsIdle;
00484     /// Task currently executing in the thread
00485     CRef<CThreadPool_Task>       m_CurrentTask;
00486     /// Semaphore for waking up from idle waiting
00487     CSemaphore                   m_IdleTrigger;
00488 };
00489 
00490 
00491 
00492 /// Thread used in pool for different internal needs: execution of exclusive
00493 /// tasks and passing events to controller
00494 class CThreadPool_ServiceThread : public CThread
00495 {
00496 public:
00497     /// Constructor
00498     /// @param pool
00499     ///   ThreadPool owning this thread
00500     CThreadPool_ServiceThread(CThreadPool_Impl* pool);
00501 
00502     /// Wake up from idle waiting or waiting of pool preparing exclusive
00503     /// environment
00504     void WakeUp(void);
00505 
00506     /// Request finishing of the thread
00507     void RequestToFinish(void);
00508 
00509     /// Check if this thread have already finished or not
00510     bool IsFinished(void);
00511 
00512     /// Tell the thread that controller should handle eOther event
00513     ///
00514     /// @sa CThreadPool_Controller::HandleEvent()
00515     void NeedCallController(void);
00516 
00517 protected:
00518     /// Destructor. Will be called from CRef
00519     virtual ~CThreadPool_ServiceThread(void);
00520 
00521 private:
00522     /// Main thread execution
00523     virtual void* Main(void);
00524 
00525     /// Do "idle" work when thread is not busy executing exclusive tasks
00526     void x_Idle(void);
00527 
00528     /// Wait until pool is ready for execution of exclusive task
00529     void x_WaitForPoolStop(CThreadPool_Guard* pool_guard);
00530 
00531     /// Pool owning this thread
00532     CRef<CThreadPool_Impl>  m_Pool;
00533     /// Semaphore for idle sleeping
00534     CSemaphore              m_IdleTrigger;
00535     /// If finishing of the thread is already requested
00536     volatile bool           m_Finishing;
00537     /// If the thread has already finished its Main() method
00538     volatile bool           m_Finished;
00539     /// Currently executing exclusive task
00540     CRef<CThreadPool_Task>  m_CurrentTask;
00541     /// Flag indicating that thread should pass eOther event to the controller
00542     CAtomicCounter          m_NeedCallController;
00543 };
00544 
00545 
00546 
00547 /// Guardian for protecting pool by locking its main mutex
00548 class CThreadPool_Guard : private CMutexGuard
00549 {
00550 public:
00551     /// Constructor
00552     /// @param pool
00553     ///   Pool to protect
00554     /// @param is_active
00555     ///   If the mutex should be locked in constructor or not
00556     CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
00557 
00558     /// Turn this guardian on
00559     void Guard(void);
00560 
00561     /// Turn this guardian off
00562     void Release(void);
00563 
00564 private:
00565     /// Pool protected by the guardian
00566     CThreadPool_Impl* m_Pool;
00567 };
00568 
00569 
00570 
00571 /// Special task which does nothing
00572 /// It's used in FlushThreads to force pool to wait while all old threads
00573 /// finish their operation to start new ones.
00574 ///
00575 /// @sa CThreadPool_Impl::FlushThreads()
00576 class CThreadPool_EmptyTask : public CThreadPool_Task
00577 {
00578 public:
00579     /// Empty main method
00580     virtual EStatus Execute(void) { return eCompleted; }
00581 
00582 #ifdef NCBI_COMPILER_ICC
00583     // In the absence of the following constructor,
00584     // ICC fills the object memory with zeros,
00585     // erasing flags set by CObject::operator new
00586     CThreadPool_EmptyTask(void) {}
00587 #endif
00588 };
00589 
00590 
00591 
00592 /// Check if status returned from CThreadPool_Task::Execute() is allowed
00593 /// and change it to eCompleted value if it is invalid
00594 static inline CThreadPool_Task::EStatus
00595 s_ConvertTaskResult(CThreadPool_Task::EStatus status)
00596 {
00597     _ASSERT(status == CThreadPool_Task::eCompleted
00598             ||  status == CThreadPool_Task::eFailed
00599             ||  status == CThreadPool_Task::eCanceled);
00600 
00601     if (status != CThreadPool_Task::eCompleted
00602         &&  status != CThreadPool_Task::eFailed
00603         &&  status != CThreadPool_Task::eCanceled)
00604     {
00605         ERR_POST_X(9, Critical
00606                       << "Wrong status returned from "
00607                          "CThreadPool_Task::Execute(): "
00608                       << status);
00609         status = CThreadPool_Task::eCompleted;
00610     }
00611 
00612     return status;
00613 }
00614 
00615 
00616 
00617 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF;
00618 
00619 
00620 inline void
00621 CThreadPool_ServiceThread::WakeUp(void)
00622 {
00623     m_IdleTrigger.Post();
00624 }
00625 
00626 inline void
00627 CThreadPool_ServiceThread::NeedCallController(void)
00628 {
00629     if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) {
00630         m_NeedCallController.Add(-1);
00631     }
00632     else {
00633         WakeUp();
00634     }
00635 }
00636 
00637 
00638 
00639 inline void
00640 CThreadPool_ThreadImpl::WakeUp(void)
00641 {
00642     m_IdleTrigger.Post();
00643 }
00644 
00645 
00646 
00647 inline CMutex&
00648 CThreadPool_Impl::GetMainPoolMutex(void)
00649 {
00650     return m_MainPoolMutex;
00651 }
00652 
00653 
00654 
00655 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active)
00656     : CMutexGuard(eEmptyGuard),
00657       m_Pool(pool)
00658 {
00659     _ASSERT(pool);
00660 
00661     if (is_active)
00662         Guard();
00663 }
00664 
00665 void
00666 CThreadPool_Guard::Guard(void)
00667 {
00668     CMutexGuard::Guard(m_Pool->GetMainPoolMutex());
00669 }
00670 
00671 void
00672 CThreadPool_Guard::Release(void)
00673 {
00674     CMutexGuard::Release();
00675 }
00676 
00677 
00678 
00679 inline void
00680 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task*          task,
00681                                    CThreadPool_Task::EStatus  status)
00682 {
00683     task->x_SetStatus(status);
00684 }
00685 
00686 inline void
00687 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task)
00688 {
00689     task->x_RequestToCancel();
00690 }
00691 
00692 inline CThreadPool*
00693 CThreadPool_Impl::GetPoolInterface(void) const
00694 {
00695     return m_Interface;
00696 }
00697 
00698 inline void
00699 CThreadPool_Impl::SetInterfaceStarted(void)
00700 {
00701     m_ServiceThread->Run(CThread::fRunDetached);
00702 }
00703 
00704 inline bool
00705 CThreadPool_Impl::IsAborted(void) const
00706 {
00707     return m_Aborted;
00708 }
00709 
00710 inline bool
00711 CThreadPool_Impl::IsSuspended(void) const
00712 {
00713     return m_Suspended;
00714 }
00715 
00716 inline unsigned int
00717 CThreadPool_Impl::GetThreadsCount(void) const
00718 {
00719     return (unsigned int)m_ThreadsCount.Get();
00720 }
00721 
00722 inline unsigned int
00723 CThreadPool_Impl::GetQueuedTasksCount(void) const
00724 {
00725     return (unsigned int)m_Queue.GetSize();
00726 }
00727 
00728 inline unsigned int
00729 CThreadPool_Impl::GetExecutingTasksCount(void) const
00730 {
00731     return (unsigned int)m_ExecutingTasks.Get();
00732 }
00733 
00734 inline CTimeSpan
00735 CThreadPool_Impl::GetSafeSleepTime(void) const
00736 {
00737     // m_Controller variable can be uninitialized in only when ThreadPool
00738     // is already aborted
00739     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00740     if (controller  &&  ! m_Aborted) {
00741         return controller->GetSafeSleepTime();
00742     }
00743     else {
00744         return CTimeSpan(0, 0);
00745     }
00746 }
00747 
00748 inline void
00749 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event)
00750 {
00751     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00752     if (controller  &&  ! m_Aborted  &&
00753         (! m_Suspended  ||  event == CThreadPool_Controller::eSuspend))
00754     {
00755         controller->HandleEvent(event);
00756     }
00757 }
00758 
00759 inline void
00760 CThreadPool_Impl::CallControllerOther(void)
00761 {
00762     CThreadPool_ServiceThread* thread = m_ServiceThread;
00763     if (thread) {
00764         thread->NeedCallController();
00765     }
00766 }
00767 
00768 inline void
00769 CThreadPool_Impl::TaskStarting(void)
00770 {
00771     m_ExecutingTasks.Add(1);
00772     // In current implementation controller operation doesn't depend on this
00773     // action. So we will save mutex locks for the sake of performance
00774     //CallControllerOther();
00775 }
00776 
00777 inline void
00778 CThreadPool_Impl::TaskFinished(void)
00779 {
00780     m_ExecutingTasks.Add(-1);
00781     m_TotalTasks.Add(-1);
00782     m_RoomWait.Post();
00783     CallControllerOther();
00784 }
00785 
00786 inline void
00787 CThreadPool_Impl::ThreadStateChanged(void)
00788 {
00789     if (m_Aborted) {
00790         if (x_HasNoThreads()) {
00791             m_AbortWait.Post();
00792         }
00793     }
00794     else if (m_Suspended) {
00795         if ((m_SuspendFlags & CThreadPool::fFlushThreads)
00796                  &&  GetThreadsCount() == 0
00797             ||  ! (m_SuspendFlags & CThreadPool::fFlushThreads)
00798                  &&  m_WorkingThreads.size() == 0)
00799         {
00800             m_ServiceThread->WakeUp();
00801         }
00802     }
00803 }
00804 
00805 inline void
00806 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread)
00807 {
00808     m_ThreadsCount.Add(-1);
00809 
00810     CThreadPool_Guard guard(this);
00811 
00812     m_IdleThreads.erase(thread);
00813     m_WorkingThreads.erase(thread);
00814 
00815     CallControllerOther();
00816 
00817     ThreadStateChanged();
00818 }
00819 
00820 inline CRef<CThreadPool_Task>
00821 CThreadPool_Impl::TryGetNextTask(void)
00822 {
00823     if (!m_Suspended  ||  (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) {
00824         TQueue::TAccessGuard guard(m_Queue);
00825 
00826         if (m_Queue.GetSize() != 0) {
00827             return m_Queue.Pop();
00828         }
00829     }
00830 
00831     return CRef<CThreadPool_Task>();
00832 }
00833 
00834 inline CThreadPool_Impl::TExclusiveTaskInfo
00835 CThreadPool_Impl::TryGetExclusiveTask(void)
00836 {
00837     TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue);
00838 
00839     if (m_ExclusiveQueue.GetSize() != 0) {
00840         CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop();
00841         if (m_FlushRequested) {
00842             info.first |= CThreadPool::fFlushThreads;
00843             m_FlushRequested = false;
00844         }
00845         return info;
00846     }
00847 
00848     return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>());
00849 }
00850 
00851 inline bool
00852 CThreadPool_Impl::CanDoExclusiveTask(void) const
00853 {
00854     if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)
00855         &&  GetQueuedTasksCount() != 0)
00856     {
00857         return false;
00858     }
00859 
00860     if ((m_SuspendFlags & CThreadPool::fFlushThreads)
00861         &&  GetThreadsCount() != 0)
00862     {
00863         return false;
00864     }
00865 
00866     return m_WorkingThreads.size() == 0;
00867 }
00868 
00869 inline void
00870 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags)
00871 {
00872     m_SuspendFlags = flags;
00873     m_Suspended = true;
00874     if (flags & CThreadPool::fCancelQueuedTasks) {
00875         x_CancelQueuedTasks();
00876     }
00877     if (flags & CThreadPool::fCancelExecutingTasks) {
00878         x_CancelExecutingTasks();
00879     }
00880 
00881     if (flags & CThreadPool::fFlushThreads) {
00882         FinishThreads(m_IdleThreads.size());
00883     }
00884 
00885     CallController(CThreadPool_Controller::eSuspend);
00886 }
00887 
00888 inline void
00889 CThreadPool_Impl::ResumeWork(void)
00890 {
00891     m_Suspended = false;
00892 
00893     CallController(CThreadPool_Controller::eResume);
00894 
00895     ITERATE(TThreadsList, it, m_IdleThreads) {
00896         (*it)->WakeUp();
00897     }
00898 }
00899 
00900 
00901 
00902 inline void
00903 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool)
00904 {
00905     if (m_Pool != NULL) {
00906         NCBI_THROW(CThreadPoolException, eControllerBusy,
00907                    "Cannot attach Controller to several ThreadPools.");
00908     }
00909 
00910     m_Pool = pool;
00911 }
00912 
00913 inline void
00914 CThreadPool_Controller::x_DetachFromPool(void)
00915 {
00916     m_Pool = NULL;
00917 }
00918 
00919 
00920 
00921 CThreadPool_Task::CThreadPool_Task(unsigned int priority)
00922 {
00923     x_Init(priority);
00924 }
00925 
00926 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other)
00927 {
00928     x_Init(other.m_Priority);
00929 }
00930 
00931 void
00932 CThreadPool_Task::x_Init(unsigned int priority)
00933 {
00934     m_Pool = NULL;
00935     m_Priority = priority;
00936     m_Status = eIdle;
00937     m_CancelRequested = false;
00938 }
00939 
00940 CThreadPool_Task::~CThreadPool_Task(void)
00941 {}
00942 
00943 CThreadPool_Task&
00944 CThreadPool_Task::operator= (const CThreadPool_Task& other)
00945 {
00946     if (m_IsBusy.Get() != 0) {
00947         NCBI_THROW(CThreadPoolException, eTaskBusy,
00948                    "Cannot change task when it is already added "
00949                    "to ThreadPool");
00950     }
00951 
00952     CObject::operator= (other);
00953     // There can be race with CThreadPool_Impl::AddTask()
00954     // If task will be already added to queue and priority will be then
00955     // changed queue can crush later
00956     m_Priority = other.m_Priority;
00957     return *this;
00958 }
00959 
00960 void
00961 CThreadPool_Task::OnStatusChange(EStatus /* old */)
00962 {}
00963 
00964 void
00965 CThreadPool_Task::OnCancelRequested(void)
00966 {}
00967 
00968 inline void
00969 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool)
00970 {
00971     if (m_IsBusy.Add(1) != 1) {
00972         m_IsBusy.Add(-1);
00973         NCBI_THROW(CThreadPoolException, eTaskBusy,
00974                    "Cannot add task in ThreadPool several times");
00975     }
00976 
00977     m_Pool = pool;
00978 }
00979 
00980 inline void
00981 CThreadPool_Task::x_ResetOwner(void)
00982 {
00983     m_Pool = NULL;
00984     m_IsBusy.Add(-1);
00985 }
00986 
00987 void
00988 CThreadPool_Task::x_SetStatus(EStatus new_status)
00989 {
00990     EStatus old_status = m_Status;
00991     if (old_status != new_status  &&  old_status != eCanceled) {
00992         m_Status = new_status;
00993         OnStatusChange(old_status);
00994     }
00995 
00996     if (IsFinished()) {
00997         m_Pool = NULL;
00998     }
00999 }
01000 
01001 inline void
01002 CThreadPool_Task::x_RequestToCancel(void)
01003 {
01004     m_CancelRequested = true;
01005 
01006     OnCancelRequested();
01007 
01008     if (GetStatus() <= eQueued) {
01009         x_SetStatus(eCanceled);
01010     }
01011 }
01012 
01013 void
01014 CThreadPool_Task::RequestToCancel(void)
01015 {
01016     // Protect from possible reseting of the pool variable during execution
01017     CThreadPool_Impl* pool = m_Pool;
01018     if (IsFinished()) {
01019         return;
01020     }
01021     else if (!pool) {
01022         x_RequestToCancel();
01023     }
01024     else {
01025         pool->CancelTask(this);
01026     }
01027 }
01028 
01029 CThreadPool*
01030 CThreadPool_Task::GetPool(void) const
01031 {
01032     // Protect from possible reseting of the pool variable during execution
01033     CThreadPool_Impl* pool_impl = m_Pool;
01034     return pool_impl? pool_impl->GetPoolInterface(): NULL;
01035 }
01036 
01037 
01038 
01039 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool)
01040     : m_Pool(pool),
01041       m_IdleTrigger(0, kMax_Int),
01042       m_Finishing(false),
01043       m_Finished(false)
01044 {
01045     _ASSERT(pool);
01046 
01047     m_NeedCallController.Set(kNeedCallController_Shift);
01048 }
01049 
01050 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void)
01051 {}
01052 
01053 inline bool
01054 CThreadPool_ServiceThread::IsFinished(void)
01055 {
01056     return m_Finished;
01057 }
01058 
01059 inline void
01060 CThreadPool_ServiceThread::x_Idle(void)
01061 {
01062     if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) {
01063         m_NeedCallController.Add(1);
01064     }
01065     m_Pool->CallController(CThreadPool_Controller::eOther);
01066 
01067     CTimeSpan timeout = m_Pool->GetSafeSleepTime();
01068     m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(),
01069                           timeout.GetNanoSecondsAfterSecond());
01070 }
01071 
01072 inline void
01073 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard)
01074 {
01075     while (! m_Pool->IsAborted()  &&  ! m_Pool->CanDoExclusiveTask()) {
01076         pool_guard->Release();
01077         m_IdleTrigger.Wait();
01078         pool_guard->Guard();
01079     }
01080 }
01081 
01082 inline void
01083 CThreadPool_ServiceThread::RequestToFinish(void)
01084 {
01085     m_Finishing = true;
01086     WakeUp();
01087 
01088     CThreadPool_Task* task = m_CurrentTask;
01089     if (task) {
01090         CThreadPool_Impl::sx_RequestToCancel(task);
01091     }
01092 }
01093 
01094 void*
01095 CThreadPool_ServiceThread::Main(void)
01096 {
01097     while (! m_Finishing) {
01098         CThreadPool_Impl::TExclusiveTaskInfo task_info =
01099                                               m_Pool->TryGetExclusiveTask();
01100         m_CurrentTask = task_info.second;
01101 
01102         if (m_CurrentTask.IsNull()) {
01103             x_Idle();
01104         }
01105         else {
01106             CThreadPool_Guard guard(m_Pool);
01107 
01108             if (m_Finishing) {
01109                 if (! m_CurrentTask->IsCancelRequested()) {
01110                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01111                 }
01112                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01113                                                  CThreadPool_Task::eCanceled);
01114                 break;
01115             }
01116 
01117             m_Pool->RequestSuspend(task_info.first);
01118             x_WaitForPoolStop(&guard);
01119 
01120             if (m_Finishing) {
01121                 if (!m_CurrentTask->IsCancelRequested()) {
01122                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01123                 }
01124                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01125                                                  CThreadPool_Task::eCanceled);
01126                 break;
01127             }
01128 
01129             guard.Release();
01130 
01131             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01132                                                CThreadPool_Task::eExecuting);
01133             try {
01134                 CThreadPool_Task::EStatus status =
01135                                 s_ConvertTaskResult(m_CurrentTask->Execute());
01136                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01137             }
01138             catch (exception& e) {
01139                 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: "
01140                                << e.what());
01141                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01142                                                    CThreadPool_Task::eFailed);
01143             }
01144             catch (...) {
01145                 ERR_POST_X(12, "Unknown exception from exclusive task "
01146                                "in ThreadPool.");
01147                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01148                                                    CThreadPool_Task::eFailed);
01149             }
01150 
01151             guard.Guard();
01152             m_Pool->ResumeWork();
01153         }
01154     }
01155 
01156     m_Finished = true;
01157     m_Pool->ThreadStateChanged();
01158 
01159     return NULL;
01160 }
01161 
01162 
01163 
01164 inline CThreadPool_ThreadImpl*
01165 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread)
01166 {
01167     return thread->m_Impl;
01168 }
01169 
01170 inline CThreadPool_Thread*
01171 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool)
01172 {
01173     return new CThreadPool_Thread(pool);
01174 }
01175 
01176 inline
01177 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl
01178 (
01179     CThreadPool_Thread*  thread_intf,
01180     CThreadPool_Impl*    pool
01181 )
01182   : m_Interface(thread_intf),
01183     m_Pool(pool),
01184     m_Finishing(false),
01185     m_CancelRequested(false),
01186     m_IsIdle(true),
01187     m_IdleTrigger(0, kMax_Int)
01188 {}
01189 
01190 inline
01191 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void)
01192 {}
01193 
01194 inline CThreadPool*
01195 CThreadPool_ThreadImpl::GetPool(void) const
01196 {
01197     return m_Pool->GetPoolInterface();
01198 }
01199 
01200 inline bool
01201 CThreadPool_ThreadImpl::IsFinishing(void) const
01202 {
01203     return m_Finishing;
01204 }
01205 
01206 inline CRef<CThreadPool_Task>
01207 CThreadPool_ThreadImpl::GetCurrentTask(void) const
01208 {
01209     return m_CurrentTask;
01210 }
01211 
01212 inline void
01213 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle)
01214 {
01215     if (m_IsIdle != is_idle) {
01216         m_IsIdle = is_idle;
01217         m_Pool->SetThreadIdle(this, is_idle);
01218     }
01219 }
01220 
01221 inline void
01222 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status)
01223 {
01224     if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) {
01225         CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01226     }
01227 
01228     m_CurrentTask.Reset();
01229     m_Pool->TaskFinished();
01230 }
01231 
01232 inline void
01233 CThreadPool_ThreadImpl::x_Idle(void)
01234 {
01235     x_SetIdleState(true);
01236 
01237     m_IdleTrigger.Wait();
01238 }
01239 
01240 inline void
01241 CThreadPool_ThreadImpl::RequestToFinish(void)
01242 {
01243     m_Finishing = true;
01244     WakeUp();
01245 }
01246 
01247 inline void
01248 CThreadPool_ThreadImpl::CancelCurrentTask(void)
01249 {
01250     // Avoid resetting of the pointer during execution
01251     CRef<CThreadPool_Task> task = m_CurrentTask;
01252     if (task.NotNull()) {
01253         CThreadPool_Impl::sx_RequestToCancel(task);
01254     }
01255     else {
01256         m_CancelRequested = true;
01257     }
01258 }
01259 
01260 inline void
01261 CThreadPool_ThreadImpl::Main(void)
01262 {
01263     m_Interface->Initialize();
01264 
01265     while (!m_Finishing) {
01266         m_CancelRequested = false;
01267         m_CurrentTask = m_Pool->TryGetNextTask();
01268 
01269         if (m_CurrentTask.IsNull()) {
01270             x_Idle();
01271         }
01272         else {
01273             if (m_CurrentTask->IsCancelRequested()  ||  m_CancelRequested) {
01274                 // Some race can appear if task is canceled at the time
01275                 // when it's being queued or at the time when it's being
01276                 // unqueued
01277                 if (! m_CurrentTask->IsCancelRequested()) {
01278                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01279                 }
01280                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01281                                                  CThreadPool_Task::eCanceled);
01282                 m_CurrentTask = NULL;
01283                 continue;
01284             }
01285 
01286             x_SetIdleState(false);
01287             m_Pool->TaskStarting();
01288 
01289             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01290                                                CThreadPool_Task::eExecuting);
01291 
01292             try {
01293                 CThreadPool_Task::EStatus status =
01294                                 s_ConvertTaskResult(m_CurrentTask->Execute());
01295                 x_TaskFinished(status);
01296             }
01297             catch (exception& e) {
01298                 ERR_POST_X(7, "Exception from task in ThreadPool: "
01299                               << e.what());
01300                 x_TaskFinished(CThreadPool_Task::eFailed);
01301             }
01302             catch (...) {
01303                 x_TaskFinished(CThreadPool_Task::eFailed);
01304                 throw;
01305             }
01306         }
01307     }
01308 }
01309 
01310 inline void
01311 CThreadPool_ThreadImpl::OnExit(void)
01312 {
01313     try {
01314         m_Interface->Finalize();
01315     } STD_CATCH_ALL_X(8, "Finalize")
01316 
01317     m_Pool->ThreadStopped(this);
01318 }
01319 
01320 
01321 
01322 inline CThreadPool_Impl*
01323 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool)
01324 {
01325     return pool->m_Impl;
01326 }
01327 
01328 inline unsigned int
01329 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
01330 {
01331     if (queue_size == 0) {
01332         // 10 is just in case, in fact when queue_size == 0 pool will always
01333         // check for idle threads, so tasks will never crowd in the queue
01334         queue_size = 10;
01335         m_IsQueueAllowed = false;
01336     }
01337     else {
01338         m_IsQueueAllowed = true;
01339     }
01340 
01341     return queue_size;
01342 }
01343 
01344 inline
01345 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*      pool_intf,
01346                                    unsigned int      queue_size,
01347                                    unsigned int      max_threads,
01348                                    unsigned int      min_threads,
01349                                    CThread::TRunMode threads_mode)
01350     : m_Queue(x_GetQueueSize(queue_size)),
01351       m_RoomWait(0, kMax_Int),
01352       m_AbortWait(0, kMax_Int)
01353 {
01354     x_Init(pool_intf,
01355            new CThreadPool_Controller_PID(max_threads, min_threads),
01356            threads_mode);
01357 }
01358 
01359 inline
01360 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*            pool_intf,
01361                                    unsigned int            queue_size,
01362                                    CThreadPool_Controller* controller,
01363                                    CThread::TRunMode       threads_mode)
01364     : m_Queue(x_GetQueueSize(queue_size)),
01365       m_RoomWait(0, kMax_Int),
01366       m_AbortWait(0, kMax_Int)
01367 {
01368     x_Init(pool_intf, controller, threads_mode);
01369 }
01370 
01371 void
01372 CThreadPool_Impl::x_Init(CThreadPool*             pool_intf,
01373                          CThreadPool_Controller*  controller,
01374                          CThread::TRunMode        threads_mode)
01375 {
01376     m_Interface = pool_intf;
01377     m_SelfRef = this;
01378     m_DestroyTimeout = CTimeSpan(10, 0);
01379     m_ThreadsCount.Set(0);
01380     m_ExecutingTasks.Set(0);
01381     m_TotalTasks.Set(0);
01382     m_Aborted = false;
01383     m_Suspended = false;
01384     m_ThreadsMode = (threads_mode | CThread::fRunDetached)
01385                      & ~CThread::fRunAllowST;
01386 
01387     controller->x_AttachToPool(this);
01388     m_Controller = controller;
01389 
01390     m_ServiceThread = new CThreadPool_ServiceThread(this);
01391 }
01392 
01393 CThreadPool_Impl::~CThreadPool_Impl(void)
01394 {}
01395 
01396 inline void
01397 CThreadPool_Impl::DestroyReference(void)
01398 {
01399     // Abort even if m_Aborted == true because threads can still be running
01400     // and we have to wait for their termination
01401     Abort(&m_DestroyTimeout);
01402 
01403     m_Interface = NULL;
01404     m_ServiceThread = NULL;
01405     m_SelfRef = NULL;
01406 }
01407 
01408 inline void
01409 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout)
01410 {
01411     m_DestroyTimeout = timeout;
01412 }
01413 
01414 inline const CTimeSpan&
01415 CThreadPool_Impl::GetDestroyTimeout(void) const
01416 {
01417     return m_DestroyTimeout;
01418 }
01419 
01420 void
01421 CThreadPool_Impl::LaunchThreads(unsigned int count)
01422 {
01423     if (count == 0)
01424         return;
01425 
01426     CThreadPool_Guard guard(this);
01427 
01428     for (unsigned int i = 0; i < count; ++i) {
01429         CRef<CThreadPool_Thread> thread(m_Interface->CreateThread());
01430         m_IdleThreads.insert(
01431                         CThreadPool_ThreadImpl::s_GetImplPointer(thread));
01432         thread->Run(m_ThreadsMode);
01433     }
01434 
01435     m_ThreadsCount.Add(count);
01436     CallControllerOther();
01437 }
01438 
01439 void
01440 CThreadPool_Impl::FinishThreads(unsigned int count)
01441 {
01442     if (count == 0)
01443         return;
01444 
01445     CThreadPool_Guard guard(this);
01446 
01447     // The cast is theoretically extraneous, but Sun's WorkShop
01448     // compiler otherwise calls the wrong versions of begin() and
01449     // end() and refuses to convert the resulting iterators.
01450     REVERSE_ITERATE(TThreadsList, it,
01451                     static_cast<const TThreadsList&>(m_IdleThreads))
01452     {
01453         // Maybe in case of several quick consecutive calls we should favor
01454         // the willing to finish several threads.
01455         //if ((*it)->IsFinishing())
01456         //    continue;
01457 
01458         (*it)->RequestToFinish();
01459         --count;
01460         if (count == 0)
01461             break;
01462     }
01463 
01464     REVERSE_ITERATE(TThreadsList, it,
01465                     static_cast<const TThreadsList&>(m_WorkingThreads))
01466     {
01467         if (count == 0)
01468             break;
01469 
01470         (*it)->RequestToFinish();
01471         --count;
01472     }
01473 }
01474 
01475 void
01476 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle)
01477 {
01478     CThreadPool_Guard guard(this);
01479 
01480     TThreadsList* to_del;
01481     TThreadsList* to_ins;
01482     if (is_idle) {
01483         to_del = &m_WorkingThreads;
01484         to_ins = &m_IdleThreads;
01485     }
01486     else {
01487         to_del = &m_IdleThreads;
01488         to_ins = &m_WorkingThreads;
01489     }
01490 
01491     TThreadsList::iterator it = to_del->find(thread);
01492     if (it != to_del->end()) {
01493         to_del->erase(it);
01494     }
01495     to_ins->insert(thread);
01496 
01497     if (is_idle  &&  m_Suspended
01498         &&  (m_SuspendFlags & CThreadPool::fFlushThreads))
01499     {
01500         thread->RequestToFinish();
01501     }
01502 
01503     ThreadStateChanged();
01504 }
01505 
01506 inline bool
01507 CThreadPool_Impl::x_IsNewTaskAllowed(void) const
01508 {
01509     return !m_Aborted
01510             &&  (!m_Suspended
01511                   ||  !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks));
01512 }
01513 
01514 bool
01515 CThreadPool_Impl::x_CanAddImmediateTask(void) const
01516 {
01517     // If pool aborts at some point in waiting it has to stop waiting
01518     // immediately
01519     return !x_IsNewTaskAllowed()
01520         ||  !m_Suspended  &&  (unsigned int)m_TotalTasks.Get()
01521                                               < m_Controller->GetMaxThreads();
01522 }
01523 
01524 bool
01525 CThreadPool_Impl::x_HasNoThreads(void) const
01526 {
01527     CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull();
01528     return m_IdleThreads.size() + m_WorkingThreads.size() == 0
01529            &&  (! thread  ||  thread->IsFinished());
01530 }
01531 
01532 bool
01533 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate      wait_func,
01534                                      CThreadPool_Guard*  pool_guard,
01535                                      CSemaphore*         wait_sema,
01536                                      const CTimeSpan*    timeout,
01537                                      const CStopWatch*   timer)
01538 {
01539     while (!(this->*wait_func)()) {
01540         pool_guard->Release();
01541 
01542         if (timeout) {
01543             CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble()
01544                                               - timer->Elapsed());
01545             if (next_tm.GetSign() == eNegative) {
01546                 return false;
01547             }
01548 
01549             if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(),
01550                                      next_tm.GetNanoSecondsAfterSecond()))
01551             {
01552                 return false;
01553             }
01554         }
01555         else {
01556             wait_sema->Wait();
01557         }
01558 
01559         pool_guard->Guard();
01560     }
01561 
01562     return true;
01563 }
01564 
01565 /// Throw an exception with standard message when AddTask() is called
01566 /// but ThreadPool is aborted or do not allow new tasks
01567 static inline void
01568 ThrowAddProhibited(void)
01569 {
01570     NCBI_THROW(CThreadPoolException, eProhibited,
01571                "Adding of new tasks is prohibited");
01572 }
01573 
01574  inline void
01575 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
01576 {
01577     _ASSERT(task);
01578 
01579     // To be sure that if simple new operator was passed as argument the task
01580     // will still be referenced even if some exception happen in this method
01581     CRef<CThreadPool_Task> task_ref(task);
01582 
01583     if (!x_IsNewTaskAllowed()) {
01584         ThrowAddProhibited();
01585     }
01586 
01587     CThreadPool_Guard guard(this, false);
01588     auto_ptr<CTimeSpan> real_timeout;
01589 
01590     if (!m_IsQueueAllowed) {
01591         guard.Guard();
01592 
01593         CStopWatch timer(CStopWatch::eStart);
01594         if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask,
01595                                  &guard, &m_RoomWait, timeout, &timer))
01596         {
01597             NCBI_THROW(CSyncQueueException, eNoRoom,
01598                        "Cannot add task - all threads are busy");
01599         }
01600 
01601         if (!x_IsNewTaskAllowed()) {
01602             ThrowAddProhibited();
01603         }
01604 
01605         if (timeout) {
01606             real_timeout.reset(new CTimeSpan(timeout->GetAsDouble()
01607                                                   - timer.Elapsed()));
01608         }
01609     }
01610 
01611     task->x_SetOwner(this);
01612     task->x_SetStatus(CThreadPool_Task::eQueued);
01613     try {
01614         // Pushing to queue must be out of mutex to be able to wait
01615         // for available space.
01616         m_Queue.Push(Ref(task), real_timeout.get());
01617     }
01618     catch (...) {
01619         task->x_SetStatus(CThreadPool_Task::eIdle);
01620         task->x_ResetOwner();
01621         throw;
01622     }
01623 
01624     if (m_IsQueueAllowed) {
01625         guard.Guard();
01626     }
01627 
01628     // Check if someone aborted the pool or suspended it with cacelation of
01629     // queued tasks after we added this task to the queue but before we were
01630     // able to acquire the mutex
01631     CThreadPool::TExclusiveFlags check_flags
01632         = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks;
01633     if (m_Aborted  ||  m_Suspended
01634                        &&  (m_SuspendFlags & check_flags)  == check_flags)
01635     {
01636         if (m_Queue.GetSize() != 0) {
01637             x_CancelQueuedTasks();
01638         }
01639         return;
01640     }
01641 
01642     unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1);
01643 
01644     if (!m_IsQueueAllowed  &&  cnt_req > GetThreadsCount()) {
01645         LaunchThreads(cnt_req - GetThreadsCount());
01646     }
01647 
01648     if (! m_Suspended) {
01649         int count = GetQueuedTasksCount();
01650         ITERATE(TThreadsList, it, m_IdleThreads) {
01651             if (! (*it)->IsFinishing()) {
01652                 (*it)->WakeUp();
01653                 --count;
01654                 if (count == 0)
01655                     break;
01656             }
01657         }
01658     }
01659 
01660     CallControllerOther();
01661 }
01662 
01663 inline void
01664 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task)
01665 {
01666     TQueue::TAccessGuard q_guard(m_Queue);
01667 
01668     TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01669     while (it != q_guard.End()  &&  *it != task) {
01670         ++it;
01671     }
01672 
01673     if (it != q_guard.End()) {
01674         q_guard.Erase(it);
01675     }
01676 }
01677 
01678 void
01679 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task*  task,
01680                                             TExclusiveFlags    flags)
01681 {
01682     _ASSERT(task);
01683 
01684     // To be sure that if simple new operator was passed as argument the task
01685     // will still be referenced even if some exception happen in this method
01686     CRef<CThreadPool_Task> task_ref(task);
01687 
01688     if (m_Aborted) {
01689         NCBI_THROW(CThreadPoolException, eProhibited,
01690                    "Cannot add exclusive task when ThreadPool is aborted");
01691     }
01692 
01693     task->x_SetOwner(this);
01694     task->x_SetStatus(CThreadPool_Task::eQueued);
01695     m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task)));
01696 
01697     CThreadPool_ServiceThread* thread = m_ServiceThread;
01698     if (thread) {
01699         thread->WakeUp();
01700     }
01701 }
01702 
01703 void
01704 CThreadPool_Impl::CancelTask(CThreadPool_Task* task)
01705 {
01706     _ASSERT(task);
01707 
01708     if (task->IsFinished()) {
01709         return;
01710     }
01711     // Some race can happen here if the task is being queued now
01712     if (task->GetStatus() == CThreadPool_Task::eIdle) {
01713         task->x_RequestToCancel();
01714         return;
01715     }
01716 
01717     CThreadPool* task_pool = task->GetPool();
01718     if (task_pool != m_Interface) {
01719         if (!task_pool) {
01720             // Task have just finished - we can do nothing
01721             return;
01722         }
01723 
01724         NCBI_THROW(CThreadPoolException, eInvalid,
01725                    "Cannot cancel task execution "
01726                    "if it is inserted in another ThreadPool");
01727     }
01728 
01729     task->x_RequestToCancel();
01730     x_RemoveTaskFromQueue(task);
01731 
01732     CallControllerOther();
01733 }
01734 
01735 inline void
01736 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group)
01737 {
01738     _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks
01739                              + CThreadPool::fCancelQueuedTasks))
01740                   == tasks_group
01741              &&  tasks_group != 0);
01742 
01743     if (tasks_group & CThreadPool::fCancelQueuedTasks) {
01744         x_CancelQueuedTasks();
01745     }
01746     if (tasks_group & CThreadPool::fCancelExecutingTasks) {
01747         x_CancelExecutingTasks();
01748     }
01749 
01750     CallControllerOther();
01751 }
01752 
01753 void
01754 CThreadPool_Impl::x_CancelExecutingTasks(void)
01755 {
01756     CThreadPool_Guard guard(this);
01757 
01758     ITERATE(TThreadsList, it, m_WorkingThreads) {
01759         (*it)->CancelCurrentTask();
01760     }
01761 
01762     // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be
01763     // sure that it doesn't have already task to execute before it marked
01764     // itself as working
01765     ITERATE(TThreadsList, it, m_IdleThreads) {
01766         (*it)->CancelCurrentTask();
01767     }
01768 }
01769 
01770 void
01771 CThreadPool_Impl::x_CancelQueuedTasks(void)
01772 {
01773     TQueue::TAccessGuard q_guard(m_Queue);
01774 
01775     for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01776                                          it != q_guard.End(); ++it)
01777     {
01778         it->GetNCPointer()->x_RequestToCancel();
01779     }
01780 
01781     m_Queue.Clear();
01782 }
01783 
01784 inline void
01785 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type)
01786 {
01787     CThreadPool_Guard guard(this);
01788 
01789     if (m_Aborted) {
01790         NCBI_THROW(CThreadPoolException, eProhibited,
01791                    "Cannot flush threads when ThreadPool aborted");
01792     }
01793 
01794     if (flush_type == CThreadPool::eStartImmediately
01795         ||  flush_type == CThreadPool::eWaitToFinish  &&  m_Suspended)
01796     {
01797         FinishThreads(GetThreadsCount());
01798     }
01799     else if (flush_type == CThreadPool::eWaitToFinish) {
01800         bool need_add = true;
01801 
01802         {{
01803             // To avoid races with TryGetExclusiveTask() we need to put
01804             // guard here
01805             TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01806 
01807             if (m_ExclusiveQueue.GetSize() != 0) {
01808                 m_FlushRequested = true;
01809                 need_add = false;
01810             }
01811         }}
01812 
01813         if (need_add) {
01814             RequestExclusiveExecution(new CThreadPool_EmptyTask(),
01815                                       CThreadPool::fFlushThreads);
01816         }
01817     }
01818 }
01819 
01820 inline void
01821 CThreadPool_Impl::Abort(const CTimeSpan* timeout)
01822 {
01823     CThreadPool_Guard guard(this);
01824 
01825     // Method can be called several times in a row and every time we need
01826     // to wait for threads to finish operation
01827     m_Aborted = true;
01828 
01829     x_CancelQueuedTasks();
01830     x_CancelExecutingTasks();
01831 
01832     {{
01833         TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01834 
01835         for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
01836                                                 it != q_guard.End(); ++it)
01837         {
01838             it->second->x_RequestToCancel();
01839         }
01840 
01841         m_ExclusiveQueue.Clear();
01842     }}
01843 
01844     if (m_ServiceThread.NotNull()) {
01845         m_ServiceThread->RequestToFinish();
01846     }
01847 
01848     FinishThreads(GetThreadsCount());
01849 
01850     if (m_Controller.NotNull()) {
01851         m_Controller->x_DetachFromPool();
01852     }
01853 
01854     CStopWatch timer(CStopWatch::eStart);
01855     x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads,
01856                        &guard, &m_AbortWait, timeout, &timer);
01857     m_AbortWait.Post();
01858 
01859     // This assigning can destroy the controller. If some threads are not
01860     // finished yet and at this very moment will call controller it can crush.
01861     //m_Controller = NULL;
01862 }
01863 
01864 
01865 
01866 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads,
01867                                                unsigned int min_threads)
01868     : m_Pool(NULL),
01869       m_MinThreads(min_threads),
01870       m_MaxThreads(max_threads),
01871       m_InHandleEvent(false)
01872 {
01873 }
01874 
01875 CThreadPool_Controller::~CThreadPool_Controller(void)
01876 {}
01877 
01878 CThreadPool*
01879 CThreadPool_Controller::GetPool(void) const
01880 {
01881     // Avoid changing of pointer during method execution
01882     CThreadPool_Impl* pool = m_Pool;
01883     return pool? pool->GetPoolInterface(): NULL;
01884 }
01885 
01886 CMutex&
01887 CThreadPool_Controller::GetMainPoolMutex(void) const
01888 {
01889     CThreadPool_Impl* pool = m_Pool;
01890     if (!pool) {
01891         NCBI_THROW(CThreadPoolException, eInactive,
01892                    "Cannot do active work when not attached "
01893                    "to some ThreadPool");
01894     }
01895     return pool->GetMainPoolMutex();
01896 }
01897 
01898 void
01899 CThreadPool_Controller::EnsureLimits(void)
01900 {
01901     CThreadPool_Impl* pool = m_Pool;
01902 
01903     if (! pool)
01904         return;
01905 
01906     Uint4 count = pool->GetThreadsCount();
01907     if (count > m_MaxThreads) {
01908         pool->FinishThreads(count - m_MaxThreads);
01909     }
01910     if (count < m_MinThreads) {
01911         pool->LaunchThreads(m_MinThreads - count);
01912     }
01913 }
01914 
01915 void
01916 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
01917 {
01918     CThreadPool_Guard guard(m_Pool, false);
01919     if (m_Pool)
01920         guard.Guard();
01921 
01922     m_MinThreads = min_threads;
01923 
01924     EnsureLimits();
01925 }
01926 
01927 void
01928 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
01929 {
01930     CThreadPool_Guard guard(m_Pool, false);
01931     if (m_Pool)
01932         guard.Guard();
01933 
01934     m_MaxThreads = max_threads;
01935 
01936     EnsureLimits();
01937 }
01938 
01939 void
01940 CThreadPool_Controller::SetThreadsCount(unsigned int count)
01941 {
01942     if (count > GetMaxThreads())
01943         count = GetMaxThreads();
01944     if (count < GetMinThreads())
01945         count = GetMinThreads();
01946 
01947     CThreadPool_Impl* pool = m_Pool;
01948 
01949     unsigned int now_cnt = pool->GetThreadsCount();
01950     if (count > now_cnt) {
01951         pool->LaunchThreads(count - now_cnt);
01952     }
01953     else if (count < now_cnt) {
01954         pool->FinishThreads(now_cnt - count);
01955     }
01956 }
01957 
01958 void
01959 CThreadPool_Controller::HandleEvent(EEvent event)
01960 {
01961     CThreadPool_Impl* pool = m_Pool;
01962     if (! pool)
01963         return;
01964 
01965     CThreadPool_Guard guard(pool);
01966 
01967     if (m_InHandleEvent  ||  pool->IsAborted()  ||  pool->IsSuspended())
01968         return;
01969 
01970     m_InHandleEvent = true;
01971 
01972     try {
01973         OnEvent(event);
01974         m_InHandleEvent = false;
01975     }
01976     catch (...) {
01977         m_InHandleEvent = false;
01978         throw;
01979     }
01980 }
01981 
01982 CTimeSpan
01983 CThreadPool_Controller::GetSafeSleepTime(void) const
01984 {
01985     if (m_Pool) {
01986         return CTimeSpan(1, 0);
01987     }
01988     else {
01989         return CTimeSpan(0, 0);
01990     }
01991 }
01992 
01993 
01994 
01995 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool)
01996 {
01997     _ASSERT(pool);
01998 
01999     m_Impl = new CThreadPool_ThreadImpl(this,
02000                                     CThreadPool_Impl::s_GetImplPointer(pool));
02001 }
02002 
02003 CThreadPool_Thread::~CThreadPool_Thread(void)
02004 {
02005     delete m_Impl;
02006 }
02007 
02008 void
02009 CThreadPool_Thread::Initialize(void)
02010 {}
02011 
02012 void
02013 CThreadPool_Thread::Finalize(void)
02014 {}
02015 
02016 CThreadPool*
02017 CThreadPool_Thread::GetPool(void) const
02018 {
02019     return m_Impl->GetPool();
02020 }
02021 
02022 CRef<CThreadPool_Task>
02023 CThreadPool_Thread::GetCurrentTask(void) const
02024 {
02025     return m_Impl->GetCurrentTask();
02026 }
02027 
02028 void*
02029 CThreadPool_Thread::Main(void)
02030 {
02031     m_Impl->Main();
02032     return NULL;
02033 }
02034 
02035 void
02036 CThreadPool_Thread::OnExit(void)
02037 {
02038     m_Impl->OnExit();
02039 }
02040 
02041 
02042 
02043 CThreadPool::CThreadPool(unsigned int      queue_size,
02044                          unsigned int      max_threads,
02045                          unsigned int      min_threads,
02046                          CThread::TRunMode threads_mode)
02047 {
02048     m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
02049                                   threads_mode);
02050     m_Impl->SetInterfaceStarted();
02051 }
02052 
02053 CThreadPool::CThreadPool(unsigned int            queue_size,
02054                          CThreadPool_Controller* controller,
02055                          CThread::TRunMode       threads_mode)
02056 {
02057     m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
02058     m_Impl->SetInterfaceStarted();
02059 }
02060 
02061 CThreadPool::~CThreadPool(void)
02062 {
02063     m_Impl->DestroyReference();
02064 }
02065 
02066 CMutex&
02067 CThreadPool::GetMainPoolMutex(void)
02068 {
02069     return m_Impl->GetMainPoolMutex();
02070 }
02071 
02072 CThreadPool_Thread*
02073 CThreadPool::CreateThread(void)
02074 {
02075     return CThreadPool_ThreadImpl::s_CreateThread(this);
02076 }
02077 
02078 void
02079 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
02080 {
02081     m_Impl->AddTask(task, timeout);
02082 }
02083 
02084 void
02085 CThreadPool::CancelTask(CThreadPool_Task* task)
02086 {
02087     m_Impl->CancelTask(task);
02088 }
02089 
02090 void
02091 CThreadPool::Abort(const CTimeSpan* timeout)
02092 {
02093     m_Impl->Abort(timeout);
02094 }
02095 
02096 bool
02097 CThreadPool::IsAborted(void) const
02098 {
02099     return m_Impl->IsAborted();
02100 }
02101 
02102 void
02103 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout)
02104 {
02105     m_Impl->SetDestroyTimeout(timeout);
02106 }
02107 
02108 const CTimeSpan&
02109 CThreadPool::GetDestroyTimeout(void) const
02110 {
02111     return m_Impl->GetDestroyTimeout();
02112 }
02113 
02114 void
02115 CThreadPool::RequestExclusiveExecution(CThreadPool_Task*  task,
02116                                        TExclusiveFlags    flags)
02117 {
02118     m_Impl->RequestExclusiveExecution(task, flags);
02119 }
02120 
02121 void
02122 CThreadPool::CancelTasks(TExclusiveFlags tasks_group)
02123 {
02124     m_Impl->CancelTasks(tasks_group);
02125 }
02126 
02127 void
02128 CThreadPool::FlushThreads(EFlushType flush_type)
02129 {
02130     m_Impl->FlushThreads(flush_type);
02131 }
02132 
02133 unsigned int
02134 CThreadPool::GetThreadsCount(void) const
02135 {
02136     return m_Impl->GetThreadsCount();
02137 }
02138 
02139 unsigned int
02140 CThreadPool::GetQueuedTasksCount(void) const
02141 {
02142     return m_Impl->GetQueuedTasksCount();
02143 }
02144 
02145 unsigned int
02146 CThreadPool::GetExecutingTasksCount(void) const
02147 {
02148     return m_Impl->GetExecutingTasksCount();
02149 }
02150 
02151 
02152 
02153 END_NCBI_SCOPE
02154 
02155 

Generated on Sun Dec 6 22:44:29 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Dec 07 16:21:15 2009 by modify_doxy.py rev. 173732