NCBI C++ Toolkit Cross Reference

C++/src/util/thread_pool.cpp


  1 /*  $Id: thread_pool.cpp 164845 2009-07-01 16:44:27Z satskyse $
  2 * ===========================================================================
  3 *
  4 *                            PUBLIC DOMAIN NOTICE
  5 *               National Center for Biotechnology Information
  6 *
  7 *  This software/database is a "United States Government Work" under the
  8 *  terms of the United States Copyright Act.  It was written as part of
  9 *  the author's official duties as a United States Government employee and
 10 *  thus cannot be copyrighted.  This software/database is freely available
 11 *  to the public for use. The National Library of Medicine and the U.S.
 12 *  Government have not placed any restriction on its use or reproduction.
 13 *
 14 *  Although all reasonable efforts have been taken to ensure the accuracy
 15 *  and reliability of the software and data, the NLM and the U.S.
 16 *  Government do not and cannot warrant the performance or results that
 17 *  may be obtained by using this software or data. The NLM and the U.S.
 18 *  Government disclaim all warranties, express or implied, including
 19 *  warranties of performance, merchantability or fitness for any particular
 20 *  purpose.
 21 *
 22 *  Please cite the author in any work or product based on this material.
 23 *
 24 * ===========================================================================
 25 *
 26 * Author:  Pavel Ivanov
 27 *
 28 * File Description:
 29 *   Pool of threads.
 30 */
 31 
 32 #include <ncbi_pch.hpp>
 33 #include <util/thread_pool.hpp>
 34 #include <util/thread_pool_ctrl.hpp>
 35 #include <util/sync_queue.hpp>
 36 #include <util/error_codes.hpp>
 37 
 38 #define NCBI_USE_ERRCODE_X  Util_Thread
 39 
 40 BEGIN_NCBI_SCOPE
 41 
 42 
 43 class CThreadPool_Guard;
 44 class CThreadPool_ServiceThread;
 45 
 46 
 47 /// Functor to compare tasks by priority
 48 struct SThreadPool_TaskCompare {
 49     bool operator() (const CRef<CThreadPool_Task>& left,
 50                      const CRef<CThreadPool_Task>& right)
 51     {
 52         return left->GetPriority() < right->GetPriority();
 53     }
 54 };
 55 
 56 
 57 /// Real implementation of all ThreadPool functions
 58 class CThreadPool_Impl : public CObject
 59 {
 60 public:
 61     typedef CThreadPool::TExclusiveFlags  TExclusiveFlags;
 62 
 63     /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl
 64     /// object. Can be done only here to avoid excessive friendship to
 65     /// CThreadPool class.
 66     static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool);
 67 
 68     /// Call x_SetTaskStatus() for the given task.
 69     /// Method introduced to avoid excessive friendship to CThreadPool_Task
 70     /// class.
 71     ///
 72     /// @sa CThreadPool_Task::x_SetTaskStatus()
 73     static void sx_SetTaskStatus(CThreadPool_Task*          task,
 74                                  CThreadPool_Task::EStatus  status);
 75 
 76     /// Call x_RequestToCancel() for the given task.
 77     /// Method introduced to avoid excessive friendship to CThreadPool_Task
 78     /// class.
 79     ///
 80     /// @sa CThreadPool_Task::x_RequestToCancel()
 81     static void sx_RequestToCancel(CThreadPool_Task* task);
 82 
 83 
 84     /// Constructor with default controller
 85     /// @param pool_intf
 86     ///   ThreadPool interface object attached to this implementation
 87     ///
 88     /// @sa CThreadPool::CThreadPool()
 89     CThreadPool_Impl(CThreadPool*      pool_intf,
 90                      unsigned int      queue_size,
 91                      unsigned int      max_threads,
 92                      unsigned int      min_threads,
 93                      CThread::TRunMode threads_mode = CThread::fRunDefault);
 94 
 95     /// Constructor with explicitly given controller
 96     /// @param pool_intf
 97     ///   ThreadPool interface object attached to this implementation
 98     ///
 99     /// @sa CThreadPool::CThreadPool()
100     CThreadPool_Impl(CThreadPool*        pool_intf,
101                      unsigned int        queue_size,
102                      CThreadPool_Controller* controller,
103                      CThread::TRunMode   threads_mode = CThread::fRunDefault);
104 
105     /// Get pointer to ThreadPool interface object
106     CThreadPool* GetPoolInterface(void) const;
107 
108     /// Set destroy timeout for the pool
109     ///
110     /// @sa CThreadPool::SetDestroyTimeout()
111     void SetDestroyTimeout(const CTimeSpan& timeout);
112 
113     /// Get destroy timeout for the pool
114     ///
115     /// @sa CThreadPool::GetDestroyTimeout()
116     const CTimeSpan& GetDestroyTimeout(void) const;
117 
118     /// Destroy reference to this object
119     /// Method is called when CThreadPool object is destroyed which means
120     /// that implementation can be destroyed too if there is no references
121     /// to it left.
122     void DestroyReference(void);
123 
124     /// Get main pool mutex
125     ///
126     /// @sa CThreadPool::GetMainPoolMutex()
127     CMutex& GetMainPoolMutex(void);
128 
129     /// Add task to the pool
130     ///
131     /// @sa CThreadPool::AddTask()
132     void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
133 
134     /// Request to cancel the task
135     ///
136     /// @sa CThreadPool::CancelTask()
137     void CancelTask(CThreadPool_Task* task);
138 
139     /// Cancel the selected groups of tasks in the pool
140     ///
141     /// @sa CThreadPool::CancelTasks()
142     void CancelTasks(TExclusiveFlags tasks_group);
143 
144     /// Add the task for exclusive execution in the pool
145     ///
146     /// @sa CThreadPool::RequestExclusiveExecution()
147     void RequestExclusiveExecution(CThreadPool_Task*  task,
148                                    TExclusiveFlags    flags);
149 
150     /// Launch new threads in pool
151     /// @param count
152     ///   Number of threads to launch
153     void LaunchThreads(unsigned int count);
154 
155     /// Finish threads in pool
156     /// Stop first all idle threads then stop busy threads without
157     /// cancelation of currently executing tasks.
158     /// @param count
159     ///   Number of threads to finish
160     void FinishThreads(unsigned int count);
161 
162     /// Get number of threads running in the pool
163     unsigned int GetThreadsCount(void) const;
164 
165     /// Mark thread as idle or non-idle
166     /// @param thread
167     ///   Thread to mark
168     /// @param is_idle
169     ///   If thread should be marked as idle or not
170     void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
171 
172     /// Callback from working thread when it finished its Main() method
173     void ThreadStopped(CThreadPool_ThreadImpl* thread);
174 
175     /// Callback when some thread changed its idleness or finished
176     /// (including service thread)
177     void ThreadStateChanged(void);
178 
179     /// Get next task from queue if there is one
180     /// If the queue is empty then return NULL.
181     CRef<CThreadPool_Task> TryGetNextTask(void);
182 
183     /// Callback from thread when it is starting to execute task
184     void TaskStarting(void);
185 
186     /// Callback from thread when it has finished to execute task
187     void TaskFinished(void);
188 
189     /// Get the number of tasks currently waiting in queue
190     unsigned int GetQueuedTasksCount(void) const;
191 
192     /// Get the number of currently executing tasks
193     unsigned int GetExecutingTasksCount(void) const;
194 
195     /// Type for storing information about exclusive task launching
196     typedef pair< TExclusiveFlags,
197                   CRef<CThreadPool_Task> > TExclusiveTaskInfo;
198 
199     /// Get information about next exclusive task to execute
200     TExclusiveTaskInfo TryGetExclusiveTask(void);
201 
202     /// Request suspension of the pool
203     /// @param flags
204     ///   Parameters for necessary exclusive execution environment
205     void RequestSuspend(TExclusiveFlags flags);
206 
207     /// Resume the pool operation after exclusive task execution
208     void ResumeWork(void);
209 
210     /// Check if the pool is suspended for exclusive execution
211     bool IsSuspended(void) const;
212 
213     /// Check if it is already allowed to execute exclusive task
214     bool CanDoExclusiveTask(void) const;
215 
216     /// Abort the pool operation
217     ///
218     /// @sa CThreadPool::Abort()
219     void Abort(const CTimeSpan* timeout);
220 
221     /// Check if the pool is already aborted
222     bool IsAborted(void) const;
223 
224     /// Finish all current threads and replace them with new ones
225     ///
226     /// @sa CThreadPool::FlushThreads()
227     void FlushThreads(CThreadPool::EFlushType flush_type);
228 
229     /// Call the CThreadPool_Controller::HandleEvent() method of the pool
230     /// controller with the given event type. If ThreadPool is already aborted
231     /// and controller is reset then do nothing.
232     void CallController(CThreadPool_Controller::EEvent event);
233 
234     /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther
235     /// event type
236     void CallControllerOther(void);
237 
238     /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool
239     /// controller. If ThreadPool is already aborted and controller is reset
240     /// then return time period of 1 second.
241     CTimeSpan GetSafeSleepTime(void) const;
242 
243     /// Mark that initialization of the interface was finished
244     void SetInterfaceStarted(void);
245 
246 
247 private:
248     /// Type of queue used for storing tasks
249     typedef CSyncQueue< CRef<CThreadPool_Task>,
250                         CSyncQueue_multiset< CRef<CThreadPool_Task>,
251                                              SThreadPool_TaskCompare > >
252             TQueue;
253     /// Type of queue used for storing information about exclusive tasks
254     typedef CSyncQueue<TExclusiveTaskInfo>                 TExclusiveQueue;
255     /// Type of list of all poolled threads
256     typedef set<CThreadPool_ThreadImpl*> TThreadsList;
257 
258 
259     /// Prohibit copying and assigning
260     CThreadPool_Impl(const CThreadPool_Impl&);
261     CThreadPool_Impl& operator= (const CThreadPool_Impl&);
262 
263     /// Transform size of queue given in constructor to the size passed to
264     /// CSyncQueue constructor.
265     /// Method can be called only from constructor because it initializes
266     /// value of m_IsQueueAllowed member variable.
267     unsigned int x_GetQueueSize(unsigned int queue_size);
268 
269     /// Initialization of all class member variables that can be initialized
270     /// outside of constructor
271     /// @param pool_intf
272     ///   ThreadPool interface object attached to this implementation
273     /// @param controller
274     ///   Controller for the pool
275     void x_Init(CThreadPool*            pool_intf,
276                 CThreadPool_Controller* controller,
277                 CThread::TRunMode       threads_mode);
278 
279     /// Destructor. Will be called from CRef
280     ~CThreadPool_Impl(void);
281 
282     /// Delete task from the queue
283     /// If task does not exist in queue then does nothing.
284     void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
285 
286     /// Cancel all tasks waiting in the queue
287     void x_CancelQueuedTasks(void);
288 
289     /// Cancel all currently executing tasks
290     void x_CancelExecutingTasks(void);
291 
292     /// Type of some simple predicate
293     ///
294     /// @sa x_WaitForPredicate
295     typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
296 
297     /// Check if new task can be added to the pool
298     bool x_IsNewTaskAllowed(void) const;
299 
300     /// Check if new task can be added to the pool when queuing is disabled
301     bool x_CanAddImmediateTask(void) const;
302 
303     /// Check if all threads in pool finished their work
304     bool x_HasNoThreads(void) const;
305 
306     /// Wait for some predicate to be true
307     /// @param wait_func
308     ///   Predicate to wait for
309     /// @param pool_guard
310     ///   Guardian that locks main pool mutex at the time of method call and
311     ///   that have to be unlocked for the time of waiting
312     /// @param wait_sema
313     ///   Semaphore which will be posted when predicate become true
314     /// @param timeout
315     ///   Maximum amount of time to wait
316     /// @param timer
317     ///   Timer for mesuring elapsed time. Method assumes that timer is
318     ///   started at the moment from which timeout should be calculated.
319     bool x_WaitForPredicate(TWaitPredicate      wait_func,
320                             CThreadPool_Guard*  pool_guard,
321                             CSemaphore*         wait_sema,
322                             const CTimeSpan*    timeout,
323                             const CStopWatch*   timer);
324 
325 
326 private:
327     /// ThreadPool interface object attached to this implementation
328     CThreadPool*                     m_Interface;
329     /// Reference to this pool to prevent its destroying earlier than we
330     /// allow it to
331     CRef<CThreadPool_Impl>           m_SelfRef;
332     /// Timeout to wait for all threads to finish before the ThreadPool
333     /// interface object will be able to destroy
334     CTimeSpan                        m_DestroyTimeout;
335     /// Queue for storing tasks
336     TQueue                           m_Queue;
337     /// Mutex for guarding all changes in the pool, its threads and controller
338     CMutex                           m_MainPoolMutex;
339     /// Semaphore for waiting for available threads to process task when
340     /// queuing is disabled.
341     CSemaphore                       m_RoomWait;
342     /// Controller managing count of threads in pool
343     CRef<CThreadPool_Controller>     m_Controller;
344     /// List of all idle threads
345     TThreadsList                     m_IdleThreads;
346     /// List of all threads currently executing some tasks
347     TThreadsList                     m_WorkingThreads;
348     /// Running mode of all threads
349     CThread::TRunMode                m_ThreadsMode;
350     /// Total number of threads
351     /// Introduced for more adequate and fast reflecting to threads starting
352     /// and stopping events
353     CAtomicCounter                   m_ThreadsCount;
354     /// Number of tasks executing now
355     /// Introduced for more adequate and fast reflecting to task executing
356     /// start and finish events
357     CAtomicCounter                   m_ExecutingTasks;
358     /// Total number of tasks acquired by pool
359     /// Includes queued tasks and executing tasks. Introduced for
360     /// maintaining atomicity of this number changing
361     CAtomicCounter                   m_TotalTasks;
362     /// Flag about working with special case:
363     /// FALSE - queue_size == 0, TRUE - queue_size > 0
364     bool                             m_IsQueueAllowed;
365     /// If pool is already aborted or not
366     volatile bool                    m_Aborted;
367     /// Semaphore for waiting for threads finishing in Abort() method
368     ///
369     /// @sa Abort()
370     CSemaphore                       m_AbortWait;
371     /// If pool is suspended for exclusive task execution or not
372     volatile bool                    m_Suspended;
373     /// Requested requirements for the exclusive execution environment
374     volatile TExclusiveFlags         m_SuspendFlags;
375     /// Flag indicating if flush of threads requested after adding exclusive
376     /// task but before it is started its execution.
377     volatile bool                    m_FlushRequested;
378     /// Thread for execution of exclusive tasks and passing of events
379     /// to the controller
380     CRef<CThreadPool_ServiceThread>  m_ServiceThread;
381     /// Queue for information about exclusive tasks
382     TExclusiveQueue                  m_ExclusiveQueue;
383 };
384 
385 
386 
387 /// Real implementation of all CThreadPool_Thread functions
388 class CThreadPool_ThreadImpl
389 {
390 public:
391     /// Convert pointer to CThreadPool_Thread object into pointer
392     /// to CThreadPool_ThreadImpl object. Can be done only here to avoid
393     /// excessive friendship to CThreadPool_Thread class.
394     static CThreadPool_ThreadImpl*
395     s_GetImplPointer(CThreadPool_Thread* thread);
396 
397     /// Create new CThreadPool_Thread object
398     /// Method introduced to avoid excessive friendship to CThreadPool_Thread
399     /// class.
400     ///
401     /// @sa CThreadPool_Thread::CThreadPool_Thread()
402     static CThreadPool_Thread* s_CreateThread(CThreadPool* pool);
403 
404     /// Constructor
405     /// @param thread_intf
406     ///   ThreadPool_Thread interface object attached to this implementation
407     /// @param pool
408     ///   Pool implementation owning this thread
409     CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf,
410                            CThreadPool_Impl*   pool);
411 
412     /// Destructor
413     /// Called directly from CThreadPool destructor
414     ~CThreadPool_ThreadImpl(void);
415 
416     /// Get ThreadPool interface object owning this thread
417     ///
418     /// @sa CThreadPool_Thread::GetPool()
419     CThreadPool* GetPool(void) const;
420 
421     /// Request this thread to finish its operation.
422     /// It renders the thread unusable and eventually ready for destruction
423     /// (as soon as its current task is finished and there are no CRefs to
424     /// this thread left).
425     void RequestToFinish(void);
426 
427     /// If finishing of this thread is already in progress or not
428     bool IsFinishing(void) const;
429 
430     /// Wake up the thread from idle state
431     ///
432     /// @sa x_Idle
433     void WakeUp(void);
434 
435     /// Get task currently executing in the thread
436     /// May be NULL if thread is idle or is in the middle of changing of
437     /// current task
438     ///
439     /// @sa CThreadPool_Thread::GetCurrentTask()
440     CRef<CThreadPool_Task> GetCurrentTask(void) const;
441 
442     /// Request to cancel current task execution
443     void CancelCurrentTask(void);
444 
445     /// Implementation of thread Main() method
446     ///
447     /// @sa CThreadPool_Thread::Main()
448     void Main(void);
449 
450     /// Implementation of threadOnExit() method
451     ///
452     /// @sa CThreadPool_Thread::OnExit()
453     void OnExit(void);
454 
455 private:
456     /// Prohibit copying and assigning
457     CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&);
458     CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&);
459 
460     /// Suspend until the wake up signal.
461     ///
462     /// @sa WakeUp()
463     void x_Idle(void);
464 
465     /// Mark the thread idle or non-idle
466     void x_SetIdleState(bool is_idle);
467 
468     /// Do finalizing when task finished its execution
469     /// @param status
470     ///   Status that the task must get
471     void x_TaskFinished(CThreadPool_Task::EStatus status);
472 
473 
474     /// ThreadPool_Thread interface object attached to this implementation
475     CThreadPool_Thread*          m_Interface;
476     /// Pool running the thread
477     CRef<CThreadPool_Impl>       m_Pool;
478     /// If the thread is already asked to finish or not
479     volatile bool                m_Finishing;
480     /// If cancel of the currently executing task is requested or not
481     volatile bool                m_CancelRequested;
482     /// Idleness of the thread
483     bool                         m_IsIdle;
484     /// Task currently executing in the thread
485     CRef<CThreadPool_Task>       m_CurrentTask;
486     /// Semaphore for waking up from idle waiting
487     CSemaphore                   m_IdleTrigger;
488 };
489 
490 
491 
492 /// Thread used in pool for different internal needs: execution of exclusive
493 /// tasks and passing events to controller
494 class CThreadPool_ServiceThread : public CThread
495 {
496 public:
497     /// Constructor
498     /// @param pool
499     ///   ThreadPool owning this thread
500     CThreadPool_ServiceThread(CThreadPool_Impl* pool);
501 
502     /// Wake up from idle waiting or waiting of pool preparing exclusive
503     /// environment
504     void WakeUp(void);
505 
506     /// Request finishing of the thread
507     void RequestToFinish(void);
508 
509     /// Check if this thread have already finished or not
510     bool IsFinished(void);
511 
512     /// Tell the thread that controller should handle eOther event
513     ///
514     /// @sa CThreadPool_Controller::HandleEvent()
515     void NeedCallController(void);
516 
517 protected:
518     /// Destructor. Will be called from CRef
519     virtual ~CThreadPool_ServiceThread(void);
520 
521 private:
522     /// Main thread execution
523     virtual void* Main(void);
524 
525     /// Do "idle" work when thread is not busy executing exclusive tasks
526     void x_Idle(void);
527 
528     /// Wait until pool is ready for execution of exclusive task
529     void x_WaitForPoolStop(CThreadPool_Guard* pool_guard);
530 
531     /// Pool owning this thread
532     CRef<CThreadPool_Impl>  m_Pool;
533     /// Semaphore for idle sleeping
534     CSemaphore              m_IdleTrigger;
535     /// If finishing of the thread is already requested
536     volatile bool           m_Finishing;
537     /// If the thread has already finished its Main() method
538     volatile bool           m_Finished;
539     /// Currently executing exclusive task
540     CRef<CThreadPool_Task>  m_CurrentTask;
541     /// Flag indicating that thread should pass eOther event to the controller
542     CAtomicCounter          m_NeedCallController;
543 };
544 
545 
546 
547 /// Guardian for protecting pool by locking its main mutex
548 class CThreadPool_Guard : private CMutexGuard
549 {
550 public:
551     /// Constructor
552     /// @param pool
553     ///   Pool to protect
554     /// @param is_active
555     ///   If the mutex should be locked in constructor or not
556     CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
557 
558     /// Turn this guardian on
559     void Guard(void);
560 
561     /// Turn this guardian off
562     void Release(void);
563 
564 private:
565     /// Pool protected by the guardian
566     CThreadPool_Impl* m_Pool;
567 };
568 
569 
570 
571 /// Special task which does nothing
572 /// It's used in FlushThreads to force pool to wait while all old threads
573 /// finish their operation to start new ones.
574 ///
575 /// @sa CThreadPool_Impl::FlushThreads()
576 class CThreadPool_EmptyTask : public CThreadPool_Task
577 {
578 public:
579     /// Empty main method
580     virtual EStatus Execute(void) { return eCompleted; }
581 
582 #ifdef NCBI_COMPILER_ICC
583     // In the absence of the following constructor,
584     // ICC fills the object memory with zeros,
585     // erasing flags set by CObject::operator new
586     CThreadPool_EmptyTask(void) {}
587 #endif
588 };
589 
590 
591 
592 /// Check if status returned from CThreadPool_Task::Execute() is allowed
593 /// and change it to eCompleted value if it is invalid
594 static inline CThreadPool_Task::EStatus
595 s_ConvertTaskResult(CThreadPool_Task::EStatus status)
596 {
597     _ASSERT(status == CThreadPool_Task::eCompleted
598             ||  status == CThreadPool_Task::eFailed
599             ||  status == CThreadPool_Task::eCanceled);
600 
601     if (status != CThreadPool_Task::eCompleted
602         &&  status != CThreadPool_Task::eFailed
603         &&  status != CThreadPool_Task::eCanceled)
604     {
605         ERR_POST_X(9, Critical
606                       << "Wrong status returned from "
607                          "CThreadPool_Task::Execute(): "
608                       << status);
609         status = CThreadPool_Task::eCompleted;
610     }
611 
612     return status;
613 }
614 
615 
616 
617 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF;
618 
619 
620 inline void
621 CThreadPool_ServiceThread::WakeUp(void)
622 {
623     m_IdleTrigger.Post();
624 }
625 
626 inline void
627 CThreadPool_ServiceThread::NeedCallController(void)
628 {
629     if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) {
630         m_NeedCallController.Add(-1);
631     }
632     else {
633         WakeUp();
634     }
635 }
636 
637 
638 
639 inline void
640 CThreadPool_ThreadImpl::WakeUp(void)
641 {
642     m_IdleTrigger.Post();
643 }
644 
645 
646 
647 inline CMutex&
648 CThreadPool_Impl::GetMainPoolMutex(void)
649 {
650     return m_MainPoolMutex;
651 }
652 
653 
654 
655 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active)
656     : CMutexGuard(eEmptyGuard),
657       m_Pool(pool)
658 {
659     _ASSERT(pool);
660 
661     if (is_active)
662         Guard();
663 }
664 
665 void
666 CThreadPool_Guard::Guard(void)
667 {
668     CMutexGuard::Guard(m_Pool->GetMainPoolMutex());
669 }
670 
671 void
672 CThreadPool_Guard::Release(void)
673 {
674     CMutexGuard::Release();
675 }
676 
677 
678 
679 inline void
680 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task*          task,
681                                    CThreadPool_Task::EStatus  status)
682 {
683     task->x_SetStatus(status);
684 }
685 
686 inline void
687 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task)
688 {
689     task->x_RequestToCancel();
690 }
691 
692 inline CThreadPool*
693 CThreadPool_Impl::GetPoolInterface(void) const
694 {
695     return m_Interface;
696 }
697 
698 inline void
699 CThreadPool_Impl::SetInterfaceStarted(void)
700 {
701     m_ServiceThread->Run(CThread::fRunDetached);
702 }
703 
704 inline bool
705 CThreadPool_Impl::IsAborted(void) const
706 {
707     return m_Aborted;
708 }
709 
710 inline bool
711 CThreadPool_Impl::IsSuspended(void) const
712 {
713     return m_Suspended;
714 }
715 
716 inline unsigned int
717 CThreadPool_Impl::GetThreadsCount(void) const
718 {
719     return (unsigned int)m_ThreadsCount.Get();
720 }
721 
722 inline unsigned int
723 CThreadPool_Impl::GetQueuedTasksCount(void) const
724 {
725     return (unsigned int)m_Queue.GetSize();
726 }
727 
728 inline unsigned int
729 CThreadPool_Impl::GetExecutingTasksCount(void) const
730 {
731     return (unsigned int)m_ExecutingTasks.Get();
732 }
733 
734 inline CTimeSpan
735 CThreadPool_Impl::GetSafeSleepTime(void) const
736 {
737     // m_Controller variable can be uninitialized in only when ThreadPool
738     // is already aborted
739     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
740     if (controller  &&  ! m_Aborted) {
741         return controller->GetSafeSleepTime();
742     }
743     else {
744         return CTimeSpan(0, 0);
745     }
746 }
747 
748 inline void
749 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event)
750 {
751     CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
752     if (controller  &&  ! m_Aborted  &&
753         (! m_Suspended  ||  event == CThreadPool_Controller::eSuspend))
754     {
755         controller->HandleEvent(event);
756     }
757 }
758 
759 inline void
760 CThreadPool_Impl::CallControllerOther(void)
761 {
762     CThreadPool_ServiceThread* thread = m_ServiceThread;
763     if (thread) {
764         thread->NeedCallController();
765     }
766 }
767 
768 inline void
769 CThreadPool_Impl::TaskStarting(void)
770 {
771     m_ExecutingTasks.Add(1);
772     // In current implementation controller operation doesn't depend on this
773     // action. So we will save mutex locks for the sake of performance
774     //CallControllerOther();
775 }
776 
777 inline void
778 CThreadPool_Impl::TaskFinished(void)
779 {
780     m_ExecutingTasks.Add(-1);
781     m_TotalTasks.Add(-1);
782     m_RoomWait.Post();
783     CallControllerOther();
784 }
785 
786 inline void
787 CThreadPool_Impl::ThreadStateChanged(void)
788 {
789     if (m_Aborted) {
790         if (x_HasNoThreads()) {
791             m_AbortWait.Post();
792         }
793     }
794     else if (m_Suspended) {
795         if ((m_SuspendFlags & CThreadPool::fFlushThreads)
796                  &&  GetThreadsCount() == 0
797             ||  ! (m_SuspendFlags & CThreadPool::fFlushThreads)
798                  &&  m_WorkingThreads.size() == 0)
799         {
800             m_ServiceThread->WakeUp();
801         }
802     }
803 }
804 
805 inline void
806 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread)
807 {
808     m_ThreadsCount.Add(-1);
809 
810     CThreadPool_Guard guard(this);
811 
812     m_IdleThreads.erase(thread);
813     m_WorkingThreads.erase(thread);
814 
815     CallControllerOther();
816 
817     ThreadStateChanged();
818 }
819 
820 inline CRef<CThreadPool_Task>
821 CThreadPool_Impl::TryGetNextTask(void)
822 {
823     if (!m_Suspended  ||  (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) {
824         TQueue::TAccessGuard guard(m_Queue);
825 
826         if (m_Queue.GetSize() != 0) {
827             return m_Queue.Pop();
828         }
829     }
830 
831     return CRef<CThreadPool_Task>();
832 }
833 
834 inline CThreadPool_Impl::TExclusiveTaskInfo
835 CThreadPool_Impl::TryGetExclusiveTask(void)
836 {
837     TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue);
838 
839     if (m_ExclusiveQueue.GetSize() != 0) {
840         CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop();
841         if (m_FlushRequested) {
842             info.first |= CThreadPool::fFlushThreads;
843             m_FlushRequested = false;
844         }
845         return info;
846     }
847 
848     return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>());
849 }
850 
851 inline bool
852 CThreadPool_Impl::CanDoExclusiveTask(void) const
853 {
854     if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)
855         &&  GetQueuedTasksCount() != 0)
856     {
857         return false;
858     }
859 
860     if ((m_SuspendFlags & CThreadPool::fFlushThreads)
861         &&  GetThreadsCount() != 0)
862     {
863         return false;
864     }
865 
866     return m_WorkingThreads.size() == 0;
867 }
868 
869 inline void
870 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags)
871 {
872     m_SuspendFlags = flags;
873     m_Suspended = true;
874     if (flags & CThreadPool::fCancelQueuedTasks) {
875         x_CancelQueuedTasks();
876     }
877     if (flags & CThreadPool::fCancelExecutingTasks) {
878         x_CancelExecutingTasks();
879     }
880 
881     if (flags & CThreadPool::fFlushThreads) {
882         FinishThreads(m_IdleThreads.size());
883     }
884 
885     CallController(CThreadPool_Controller::eSuspend);
886 }
887 
888 inline void
889 CThreadPool_Impl::ResumeWork(void)
890 {
891     m_Suspended = false;
892 
893     CallController(CThreadPool_Controller::eResume);
894 
895     ITERATE(TThreadsList, it, m_IdleThreads) {
896         (*it)->WakeUp();
897     }
898 }
899 
900 
901 
902 inline void
903 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool)
904 {
905     if (m_Pool != NULL) {
906         NCBI_THROW(CThreadPoolException, eControllerBusy,
907                    "Cannot attach Controller to several ThreadPools.");
908     }
909 
910     m_Pool = pool;
911 }
912 
913 inline void
914 CThreadPool_Controller::x_DetachFromPool(void)
915 {
916     m_Pool = NULL;
917 }
918 
919 
920 
921 CThreadPool_Task::CThreadPool_Task(unsigned int priority)
922 {
923     x_Init(priority);
924 }
925 
926 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other)
927 {
928     x_Init(other.m_Priority);
929 }
930 
931 void
932 CThreadPool_Task::x_Init(unsigned int priority)
933 {
934     m_Pool = NULL;
935     m_Priority = priority;
936     m_Status = eIdle;
937     m_CancelRequested = false;
938 }
939 
940 CThreadPool_Task::~CThreadPool_Task(void)
941 {}
942 
943 CThreadPool_Task&
944 CThreadPool_Task::operator= (const CThreadPool_Task& other)
945 {
946     if (m_IsBusy.Get() != 0) {
947         NCBI_THROW(CThreadPoolException, eTaskBusy,
948                    "Cannot change task when it is already added "
949                    "to ThreadPool");
950     }
951 
952     CObject::operator= (other);
953     // There can be race with CThreadPool_Impl::AddTask()
954     // If task will be already added to queue and priority will be then
955     // changed queue can crush later
956     m_Priority = other.m_Priority;
957     return *this;
958 }
959 
960 void
961 CThreadPool_Task::OnStatusChange(EStatus /* old */)
962 {}
963 
964 void
965 CThreadPool_Task::OnCancelRequested(void)
966 {}
967 
968 inline void
969 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool)
970 {
971     if (m_IsBusy.Add(1) != 1) {
972         m_IsBusy.Add(-1);
973         NCBI_THROW(CThreadPoolException, eTaskBusy,
974                    "Cannot add task in ThreadPool several times");
975     }
976 
977     m_Pool = pool;
978 }
979 
980 inline void
981 CThreadPool_Task::x_ResetOwner(void)
982 {
983     m_Pool = NULL;
984     m_IsBusy.Add(-1);
985 }
986 
987 void
988 CThreadPool_Task::x_SetStatus(EStatus new_status)
989 {
990     EStatus old_status = m_Status;
991     if (old_status != new_status  &&  old_status != eCanceled) {
992         m_Status = new_status;
993         OnStatusChange(old_status);
994     }
995 
996     if (IsFinished()) {
997         m_Pool = NULL;
998     }
999 }
1000 
1001 inline void
1002 CThreadPool_Task::x_RequestToCancel(void)
1003 {
1004     m_CancelRequested = true;
1005 
1006     OnCancelRequested();
1007 
1008     if (GetStatus() <= eQueued) {
1009         x_SetStatus(eCanceled);
1010     }
1011 }
1012 
1013 void
1014 CThreadPool_Task::RequestToCancel(void)
1015 {
1016     // Protect from possible reseting of the pool variable during execution
1017     CThreadPool_Impl* pool = m_Pool;
1018     if (IsFinished()) {
1019         return;
1020     }
1021     else if (!pool) {
1022         x_RequestToCancel();
1023     }
1024     else {
1025         pool->CancelTask(this);
1026     }
1027 }
1028 
1029 CThreadPool*
1030 CThreadPool_Task::GetPool(void) const
1031 {
1032     // Protect from possible reseting of the pool variable during execution
1033     CThreadPool_Impl* pool_impl = m_Pool;
1034     return pool_impl? pool_impl->GetPoolInterface(): NULL;
1035 }
1036 
1037 
1038 
1039 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool)
1040     : m_Pool(pool),
1041       m_IdleTrigger(0, kMax_Int),
1042       m_Finishing(false),
1043       m_Finished(false)
1044 {
1045     _ASSERT(pool);
1046 
1047     m_NeedCallController.Set(kNeedCallController_Shift);
1048 }
1049 
1050 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void)
1051 {}
1052 
1053 inline bool
1054 CThreadPool_ServiceThread::IsFinished(void)
1055 {
1056     return m_Finished;
1057 }
1058 
1059 inline void
1060 CThreadPool_ServiceThread::x_Idle(void)
1061 {
1062     if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) {
1063         m_NeedCallController.Add(1);
1064     }
1065     m_Pool->CallController(CThreadPool_Controller::eOther);
1066 
1067     CTimeSpan timeout = m_Pool->GetSafeSleepTime();
1068     m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(),
1069                           timeout.GetNanoSecondsAfterSecond());
1070 }
1071 
1072 inline void
1073 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard)
1074 {
1075     while (! m_Pool->IsAborted()  &&  ! m_Pool->CanDoExclusiveTask()) {
1076         pool_guard->Release();
1077         m_IdleTrigger.Wait();
1078         pool_guard->Guard();
1079     }
1080 }
1081 
1082 inline void
1083 CThreadPool_ServiceThread::RequestToFinish(void)
1084 {
1085     m_Finishing = true;
1086     WakeUp();
1087 
1088     CThreadPool_Task* task = m_CurrentTask;
1089     if (task) {
1090         CThreadPool_Impl::sx_RequestToCancel(task);
1091     }
1092 }
1093 
1094 void*
1095 CThreadPool_ServiceThread::Main(void)
1096 {
1097     while (! m_Finishing) {
1098         CThreadPool_Impl::TExclusiveTaskInfo task_info =
1099                                               m_Pool->TryGetExclusiveTask();
1100         m_CurrentTask = task_info.second;
1101 
1102         if (m_CurrentTask.IsNull()) {
1103             x_Idle();
1104         }
1105         else {
1106             CThreadPool_Guard guard(m_Pool);
1107 
1108             if (m_Finishing) {
1109                 if (! m_CurrentTask->IsCancelRequested()) {
1110                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1111                 }
1112                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1113                                                  CThreadPool_Task::eCanceled);
1114                 break;
1115             }
1116 
1117             m_Pool->RequestSuspend(task_info.first);
1118             x_WaitForPoolStop(&guard);
1119 
1120             if (m_Finishing) {
1121                 if (!m_CurrentTask->IsCancelRequested()) {
1122                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1123                 }
1124                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1125                                                  CThreadPool_Task::eCanceled);
1126                 break;
1127             }
1128 
1129             guard.Release();
1130 
1131             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1132                                                CThreadPool_Task::eExecuting);
1133             try {
1134                 CThreadPool_Task::EStatus status =
1135                                 s_ConvertTaskResult(m_CurrentTask->Execute());
1136                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
1137             }
1138             catch (exception& e) {
1139                 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: "
1140                                << e.what());
1141                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1142                                                    CThreadPool_Task::eFailed);
1143             }
1144             catch (...) {
1145                 ERR_POST_X(12, "Unknown exception from exclusive task "
1146                                "in ThreadPool.");
1147                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1148                                                    CThreadPool_Task::eFailed);
1149             }
1150 
1151             guard.Guard();
1152             m_Pool->ResumeWork();
1153         }
1154     }
1155 
1156     m_Finished = true;
1157     m_Pool->ThreadStateChanged();
1158 
1159     return NULL;
1160 }
1161 
1162 
1163 
1164 inline CThreadPool_ThreadImpl*
1165 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread)
1166 {
1167     return thread->m_Impl;
1168 }
1169 
1170 inline CThreadPool_Thread*
1171 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool)
1172 {
1173     return new CThreadPool_Thread(pool);
1174 }
1175 
1176 inline
1177 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl
1178 (
1179     CThreadPool_Thread*  thread_intf,
1180     CThreadPool_Impl*    pool
1181 )
1182   : m_Interface(thread_intf),
1183     m_Pool(pool),
1184     m_Finishing(false),
1185     m_CancelRequested(false),
1186     m_IsIdle(true),
1187     m_IdleTrigger(0, kMax_Int)
1188 {}
1189 
1190 inline
1191 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void)
1192 {}
1193 
1194 inline CThreadPool*
1195 CThreadPool_ThreadImpl::GetPool(void) const
1196 {
1197     return m_Pool->GetPoolInterface();
1198 }
1199 
1200 inline bool
1201 CThreadPool_ThreadImpl::IsFinishing(void) const
1202 {
1203     return m_Finishing;
1204 }
1205 
1206 inline CRef<CThreadPool_Task>
1207 CThreadPool_ThreadImpl::GetCurrentTask(void) const
1208 {
1209     return m_CurrentTask;
1210 }
1211 
1212 inline void
1213 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle)
1214 {
1215     if (m_IsIdle != is_idle) {
1216         m_IsIdle = is_idle;
1217         m_Pool->SetThreadIdle(this, is_idle);
1218     }
1219 }
1220 
1221 inline void
1222 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status)
1223 {
1224     if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) {
1225         CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
1226     }
1227 
1228     m_CurrentTask.Reset();
1229     m_Pool->TaskFinished();
1230 }
1231 
1232 inline void
1233 CThreadPool_ThreadImpl::x_Idle(void)
1234 {
1235     x_SetIdleState(true);
1236 
1237     m_IdleTrigger.Wait();
1238 }
1239 
1240 inline void
1241 CThreadPool_ThreadImpl::RequestToFinish(void)
1242 {
1243     m_Finishing = true;
1244     WakeUp();
1245 }
1246 
1247 inline void
1248 CThreadPool_ThreadImpl::CancelCurrentTask(void)
1249 {
1250     // Avoid resetting of the pointer during execution
1251     CRef<CThreadPool_Task> task = m_CurrentTask;
1252     if (task.NotNull()) {
1253         CThreadPool_Impl::sx_RequestToCancel(task);
1254     }
1255     else {
1256         m_CancelRequested = true;
1257     }
1258 }
1259 
1260 inline void
1261 CThreadPool_ThreadImpl::Main(void)
1262 {
1263     m_Interface->Initialize();
1264 
1265     while (!m_Finishing) {
1266         m_CancelRequested = false;
1267         m_CurrentTask = m_Pool->TryGetNextTask();
1268 
1269         if (m_CurrentTask.IsNull()) {
1270             x_Idle();
1271         }
1272         else {
1273             if (m_CurrentTask->IsCancelRequested()  ||  m_CancelRequested) {
1274                 // Some race can appear if task is canceled at the time
1275                 // when it's being queued or at the time when it's being
1276                 // unqueued
1277                 if (! m_CurrentTask->IsCancelRequested()) {
1278                     CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1279                 }
1280                 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1281                                                  CThreadPool_Task::eCanceled);
1282                 m_CurrentTask = NULL;
1283                 continue;
1284             }
1285 
1286             x_SetIdleState(false);
1287             m_Pool->TaskStarting();
1288 
1289             CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1290                                                CThreadPool_Task::eExecuting);
1291 
1292             try {
1293                 CThreadPool_Task::EStatus status =
1294                                 s_ConvertTaskResult(m_CurrentTask->Execute());
1295                 x_TaskFinished(status);
1296             }
1297             catch (exception& e) {
1298                 ERR_POST_X(7, "Exception from task in ThreadPool: "
1299                               << e.what());
1300                 x_TaskFinished(CThreadPool_Task::eFailed);
1301             }
1302             catch (...) {
1303                 x_TaskFinished(CThreadPool_Task::eFailed);
1304                 throw;
1305             }
1306         }
1307     }
1308 }
1309 
1310 inline void
1311 CThreadPool_ThreadImpl::OnExit(void)
1312 {
1313     try {
1314         m_Interface->Finalize();
1315     } STD_CATCH_ALL_X(8, "Finalize")
1316 
1317     m_Pool->ThreadStopped(this);
1318 }
1319 
1320 
1321 
1322 inline CThreadPool_Impl*
1323 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool)
1324 {
1325     return pool->m_Impl;
1326 }
1327 
1328 inline unsigned int
1329 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
1330 {
1331     if (queue_size == 0) {
1332         // 10 is just in case, in fact when queue_size == 0 pool will always
1333         // check for idle threads, so tasks will never crowd in the queue
1334         queue_size = 10;
1335         m_IsQueueAllowed = false;
1336     }
1337     else {
1338         m_IsQueueAllowed = true;
1339     }
1340 
1341     return queue_size;
1342 }
1343 
1344 inline
1345 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*      pool_intf,
1346                                    unsigned int      queue_size,
1347                                    unsigned int      max_threads,
1348                                    unsigned int      min_threads,
1349                                    CThread::TRunMode threads_mode)
1350     : m_Queue(x_GetQueueSize(queue_size)),
1351       m_RoomWait(0, kMax_Int),
1352       m_AbortWait(0, kMax_Int)
1353 {
1354     x_Init(pool_intf,
1355            new CThreadPool_Controller_PID(max_threads, min_threads),
1356            threads_mode);
1357 }
1358 
1359 inline
1360 CThreadPool_Impl::CThreadPool_Impl(CThreadPool*            pool_intf,
1361                                    unsigned int            queue_size,
1362                                    CThreadPool_Controller* controller,
1363                                    CThread::TRunMode       threads_mode)
1364     : m_Queue(x_GetQueueSize(queue_size)),
1365       m_RoomWait(0, kMax_Int),
1366       m_AbortWait(0, kMax_Int)
1367 {
1368     x_Init(pool_intf, controller, threads_mode);
1369 }
1370 
1371 void
1372 CThreadPool_Impl::x_Init(CThreadPool*             pool_intf,
1373                          CThreadPool_Controller*  controller,
1374                          CThread::TRunMode        threads_mode)
1375 {
1376     m_Interface = pool_intf;
1377     m_SelfRef = this;
1378     m_DestroyTimeout = CTimeSpan(10, 0);
1379     m_ThreadsCount.Set(0);
1380     m_ExecutingTasks.Set(0);
1381     m_TotalTasks.Set(0);
1382     m_Aborted = false;
1383     m_Suspended = false;
1384     m_ThreadsMode = (threads_mode | CThread::fRunDetached)
1385                      & ~CThread::fRunAllowST;
1386 
1387     controller->x_AttachToPool(this);
1388     m_Controller = controller;
1389 
1390     m_ServiceThread = new CThreadPool_ServiceThread(this);
1391 }
1392 
1393 CThreadPool_Impl::~CThreadPool_Impl(void)
1394 {}
1395 
1396 inline void
1397 CThreadPool_Impl::DestroyReference(void)
1398 {
1399     // Abort even if m_Aborted == true because threads can still be running
1400     // and we have to wait for their termination
1401     Abort(&m_DestroyTimeout);
1402 
1403     m_Interface = NULL;
1404     m_ServiceThread = NULL;
1405     m_SelfRef = NULL;
1406 }
1407 
1408 inline void
1409 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout)
1410 {
1411     m_DestroyTimeout = timeout;
1412 }
1413 
1414 inline const CTimeSpan&
1415 CThreadPool_Impl::GetDestroyTimeout(void) const
1416 {
1417     return m_DestroyTimeout;
1418 }
1419 
1420 void
1421 CThreadPool_Impl::LaunchThreads(unsigned int count)
1422 {
1423     if (count == 0)
1424         return;
1425 
1426     CThreadPool_Guard guard(this);
1427 
1428     for (unsigned int i = 0; i < count; ++i) {
1429         CRef<CThreadPool_Thread> thread(m_Interface->CreateThread());
1430         m_IdleThreads.insert(
1431                         CThreadPool_ThreadImpl::s_GetImplPointer(thread));
1432         thread->Run(m_ThreadsMode);
1433     }
1434 
1435     m_ThreadsCount.Add(count);
1436     CallControllerOther();
1437 }
1438 
1439 void
1440 CThreadPool_Impl::FinishThreads(unsigned int count)
1441 {
1442     if (count == 0)
1443         return;
1444 
1445     CThreadPool_Guard guard(this);
1446 
1447     // The cast is theoretically extraneous, but Sun's WorkShop
1448     // compiler otherwise calls the wrong versions of begin() and
1449     // end() and refuses to convert the resulting iterators.
1450     REVERSE_ITERATE(TThreadsList, it,
1451                     static_cast<const TThreadsList&>(m_IdleThreads))
1452     {
1453         // Maybe in case of several quick consecutive calls we should favor
1454         // the willing to finish several threads.
1455         //if ((*it)->IsFinishing())
1456         //    continue;
1457 
1458         (*it)->RequestToFinish();
1459         --count;
1460         if (count == 0)
1461             break;
1462     }
1463 
1464     REVERSE_ITERATE(TThreadsList, it,
1465                     static_cast<const TThreadsList&>(m_WorkingThreads))
1466     {
1467         if (count == 0)
1468             break;
1469 
1470         (*it)->RequestToFinish();
1471         --count;
1472     }
1473 }
1474 
1475 void
1476 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle)
1477 {
1478     CThreadPool_Guard guard(this);
1479 
1480     TThreadsList* to_del;
1481     TThreadsList* to_ins;
1482     if (is_idle) {
1483         to_del = &m_WorkingThreads;
1484         to_ins = &m_IdleThreads;
1485     }
1486     else {
1487         to_del = &m_IdleThreads;
1488         to_ins = &m_WorkingThreads;
1489     }
1490 
1491     TThreadsList::iterator it = to_del->find(thread);
1492     if (it != to_del->end()) {
1493         to_del->erase(it);
1494     }
1495     to_ins->insert(thread);
1496 
1497     if (is_idle  &&  m_Suspended
1498         &&  (m_SuspendFlags & CThreadPool::fFlushThreads))
1499     {
1500         thread->RequestToFinish();
1501     }
1502 
1503     ThreadStateChanged();
1504 }
1505 
1506 inline bool
1507 CThreadPool_Impl::x_IsNewTaskAllowed(void) const
1508 {
1509     return !m_Aborted
1510             &&  (!m_Suspended
1511                   ||  !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks));
1512 }
1513 
1514 bool
1515 CThreadPool_Impl::x_CanAddImmediateTask(void) const
1516 {
1517     // If pool aborts at some point in waiting it has to stop waiting
1518     // immediately
1519     return !x_IsNewTaskAllowed()
1520         ||  !m_Suspended  &&  (unsigned int)m_TotalTasks.Get()
1521                                               < m_Controller->GetMaxThreads();
1522 }
1523 
1524 bool
1525 CThreadPool_Impl::x_HasNoThreads(void) const
1526 {
1527     CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull();
1528     return m_IdleThreads.size() + m_WorkingThreads.size() == 0
1529            &&  (! thread  ||  thread->IsFinished());
1530 }
1531 
1532 bool
1533 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate      wait_func,
1534                                      CThreadPool_Guard*  pool_guard,
1535                                      CSemaphore*         wait_sema,
1536                                      const CTimeSpan*    timeout,
1537                                      const CStopWatch*   timer)
1538 {
1539     while (!(this->*wait_func)()) {
1540         pool_guard->Release();
1541 
1542         if (timeout) {
1543             CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble()
1544                                               - timer->Elapsed());
1545             if (next_tm.GetSign() == eNegative) {
1546                 return false;
1547             }
1548 
1549             if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(),
1550                                      next_tm.GetNanoSecondsAfterSecond()))
1551             {
1552                 return false;
1553             }
1554         }
1555         else {
1556             wait_sema->Wait();
1557         }
1558 
1559         pool_guard->Guard();
1560     }
1561 
1562     return true;
1563 }
1564 
1565 /// Throw an exception with standard message when AddTask() is called
1566 /// but ThreadPool is aborted or do not allow new tasks
1567 static inline void
1568 ThrowAddProhibited(void)
1569 {
1570     NCBI_THROW(CThreadPoolException, eProhibited,
1571                "Adding of new tasks is prohibited");
1572 }
1573 
1574  inline void
1575 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
1576 {
1577     _ASSERT(task);
1578 
1579     // To be sure that if simple new operator was passed as argument the task
1580     // will still be referenced even if some exception happen in this method
1581     CRef<CThreadPool_Task> task_ref(task);
1582 
1583     if (!x_IsNewTaskAllowed()) {
1584         ThrowAddProhibited();
1585     }
1586 
1587     CThreadPool_Guard guard(this, false);
1588     auto_ptr<CTimeSpan> real_timeout;
1589 
1590     if (!m_IsQueueAllowed) {
1591         guard.Guard();
1592 
1593         CStopWatch timer(CStopWatch::eStart);
1594         if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask,
1595                                  &guard, &m_RoomWait, timeout, &timer))
1596         {
1597             NCBI_THROW(CSyncQueueException, eNoRoom,
1598                        "Cannot add task - all threads are busy");
1599         }
1600 
1601         if (!x_IsNewTaskAllowed()) {
1602             ThrowAddProhibited();
1603         }
1604 
1605         if (timeout) {
1606             real_timeout.reset(new CTimeSpan(timeout->GetAsDouble()
1607                                                   - timer.Elapsed()));
1608         }
1609     }
1610 
1611     task->x_SetOwner(this);
1612     task->x_SetStatus(CThreadPool_Task::eQueued);
1613     try {
1614         // Pushing to queue must be out of mutex to be able to wait
1615         // for available space.
1616         m_Queue.Push(Ref(task), real_timeout.get());
1617     }
1618     catch (...) {
1619         task->x_SetStatus(CThreadPool_Task::eIdle);
1620         task->x_ResetOwner();
1621         throw;
1622     }
1623 
1624     if (m_IsQueueAllowed) {
1625         guard.Guard();
1626     }
1627 
1628     // Check if someone aborted the pool or suspended it with cacelation of
1629     // queued tasks after we added this task to the queue but before we were
1630     // able to acquire the mutex
1631     CThreadPool::TExclusiveFlags check_flags
1632         = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks;
1633     if (m_Aborted  ||  m_Suspended
1634                        &&  (m_SuspendFlags & check_flags)  == check_flags)
1635     {
1636         if (m_Queue.GetSize() != 0) {
1637             x_CancelQueuedTasks();
1638         }
1639         return;
1640     }
1641 
1642     unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1);
1643 
1644     if (!m_IsQueueAllowed  &&  cnt_req > GetThreadsCount()) {
1645         LaunchThreads(cnt_req - GetThreadsCount());
1646     }
1647 
1648     if (! m_Suspended) {
1649         int count = GetQueuedTasksCount();
1650         ITERATE(TThreadsList, it, m_IdleThreads) {
1651             if (! (*it)->IsFinishing()) {
1652                 (*it)->WakeUp();
1653                 --count;
1654                 if (count == 0)
1655                     break;
1656             }
1657         }
1658     }
1659 
1660     CallControllerOther();
1661 }
1662 
1663 inline void
1664 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task)
1665 {
1666     TQueue::TAccessGuard q_guard(m_Queue);
1667 
1668     TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1669     while (it != q_guard.End()  &&  *it != task) {
1670         ++it;
1671     }
1672 
1673     if (it != q_guard.End()) {
1674         q_guard.Erase(it);
1675     }
1676 }
1677 
1678 void
1679 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task*  task,
1680                                             TExclusiveFlags    flags)
1681 {
1682     _ASSERT(task);
1683 
1684     // To be sure that if simple new operator was passed as argument the task
1685     // will still be referenced even if some exception happen in this method
1686     CRef<CThreadPool_Task> task_ref(task);
1687 
1688     if (m_Aborted) {
1689         NCBI_THROW(CThreadPoolException, eProhibited,
1690                    "Cannot add exclusive task when ThreadPool is aborted");
1691     }
1692 
1693     task->x_SetOwner(this);
1694     task->x_SetStatus(CThreadPool_Task::eQueued);
1695     m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task)));
1696 
1697     CThreadPool_ServiceThread* thread = m_ServiceThread;
1698     if (thread) {
1699         thread->WakeUp();
1700     }
1701 }
1702 
1703 void
1704 CThreadPool_Impl::CancelTask(CThreadPool_Task* task)
1705 {
1706     _ASSERT(task);
1707 
1708     if (task->IsFinished()) {
1709         return;
1710     }
1711     // Some race can happen here if the task is being queued now
1712     if (task->GetStatus() == CThreadPool_Task::eIdle) {
1713         task->x_RequestToCancel();
1714         return;
1715     }
1716 
1717     CThreadPool* task_pool = task->GetPool();
1718     if (task_pool != m_Interface) {
1719         if (!task_pool) {
1720             // Task have just finished - we can do nothing
1721             return;
1722         }
1723 
1724         NCBI_THROW(CThreadPoolException, eInvalid,
1725                    "Cannot cancel task execution "
1726                    "if it is inserted in another ThreadPool");
1727     }
1728 
1729     task->x_RequestToCancel();
1730     x_RemoveTaskFromQueue(task);
1731 
1732     CallControllerOther();
1733 }
1734 
1735 inline void
1736 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group)
1737 {
1738     _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks
1739                              + CThreadPool::fCancelQueuedTasks))
1740                   == tasks_group
1741              &&  tasks_group != 0);
1742 
1743     if (tasks_group & CThreadPool::fCancelQueuedTasks) {
1744         x_CancelQueuedTasks();
1745     }
1746     if (tasks_group & CThreadPool::fCancelExecutingTasks) {
1747         x_CancelExecutingTasks();
1748     }
1749 
1750     CallControllerOther();
1751 }
1752 
1753 void
1754 CThreadPool_Impl::x_CancelExecutingTasks(void)
1755 {
1756     CThreadPool_Guard guard(this);
1757 
1758     ITERATE(TThreadsList, it, m_WorkingThreads) {
1759         (*it)->CancelCurrentTask();
1760     }
1761 
1762     // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be
1763     // sure that it doesn't have already task to execute before it marked
1764     // itself as working
1765     ITERATE(TThreadsList, it, m_IdleThreads) {
1766         (*it)->CancelCurrentTask();
1767     }
1768 }
1769 
1770 void
1771 CThreadPool_Impl::x_CancelQueuedTasks(void)
1772 {
1773     TQueue::TAccessGuard q_guard(m_Queue);
1774 
1775     for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1776                                          it != q_guard.End(); ++it)
1777     {
1778         it->GetNCPointer()->x_RequestToCancel();
1779     }
1780 
1781     m_Queue.Clear();
1782 }
1783 
1784 inline void
1785 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type)
1786 {
1787     CThreadPool_Guard guard(this);
1788 
1789     if (m_Aborted) {
1790         NCBI_THROW(CThreadPoolException, eProhibited,
1791                    "Cannot flush threads when ThreadPool aborted");
1792     }
1793 
1794     if (flush_type == CThreadPool::eStartImmediately
1795         ||  flush_type == CThreadPool::eWaitToFinish  &&  m_Suspended)
1796     {
1797         FinishThreads(GetThreadsCount());
1798     }
1799     else if (flush_type == CThreadPool::eWaitToFinish) {
1800         bool need_add = true;
1801 
1802         {{
1803             // To avoid races with TryGetExclusiveTask() we need to put
1804             // guard here
1805             TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
1806 
1807             if (m_ExclusiveQueue.GetSize() != 0) {
1808                 m_FlushRequested = true;
1809                 need_add = false;
1810             }
1811         }}
1812 
1813         if (need_add) {
1814             RequestExclusiveExecution(new CThreadPool_EmptyTask(),
1815                                       CThreadPool::fFlushThreads);
1816         }
1817     }
1818 }
1819 
1820 inline void
1821 CThreadPool_Impl::Abort(const CTimeSpan* timeout)
1822 {
1823     CThreadPool_Guard guard(this);
1824 
1825     // Method can be called several times in a row and every time we need
1826     // to wait for threads to finish operation
1827     m_Aborted = true;
1828 
1829     x_CancelQueuedTasks();
1830     x_CancelExecutingTasks();
1831 
1832     {{
1833         TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
1834 
1835         for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
1836                                                 it != q_guard.End(); ++it)
1837         {
1838             it->second->x_RequestToCancel();
1839         }
1840 
1841         m_ExclusiveQueue.Clear();
1842     }}
1843 
1844     if (m_ServiceThread.NotNull()) {
1845         m_ServiceThread->RequestToFinish();
1846     }
1847 
1848     FinishThreads(GetThreadsCount());
1849 
1850     if (m_Controller.NotNull()) {
1851         m_Controller->x_DetachFromPool();
1852     }
1853 
1854     CStopWatch timer(CStopWatch::eStart);
1855     x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads,
1856                        &guard, &m_AbortWait, timeout, &timer);
1857     m_AbortWait.Post();
1858 
1859     // This assigning can destroy the controller. If some threads are not
1860     // finished yet and at this very moment will call controller it can crush.
1861     //m_Controller = NULL;
1862 }
1863 
1864 
1865 
1866 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads,
1867                                                unsigned int min_threads)
1868     : m_Pool(NULL),
1869       m_MinThreads(min_threads),
1870       m_MaxThreads(max_threads),
1871       m_InHandleEvent(false)
1872 {
1873 }
1874 
1875 CThreadPool_Controller::~CThreadPool_Controller(void)
1876 {}
1877 
1878 CThreadPool*
1879 CThreadPool_Controller::GetPool(void) const
1880 {
1881     // Avoid changing of pointer during method execution
1882     CThreadPool_Impl* pool = m_Pool;
1883     return pool? pool->GetPoolInterface(): NULL;
1884 }
1885 
1886 CMutex&
1887 CThreadPool_Controller::GetMainPoolMutex(void) const
1888 {
1889     CThreadPool_Impl* pool = m_Pool;
1890     if (!pool) {
1891         NCBI_THROW(CThreadPoolException, eInactive,
1892                    "Cannot do active work when not attached "
1893                    "to some ThreadPool");
1894     }
1895     return pool->GetMainPoolMutex();
1896 }
1897 
1898 void
1899 CThreadPool_Controller::EnsureLimits(void)
1900 {
1901     CThreadPool_Impl* pool = m_Pool;
1902 
1903     if (! pool)
1904         return;
1905 
1906     Uint4 count = pool->GetThreadsCount();
1907     if (count > m_MaxThreads) {
1908         pool->FinishThreads(count - m_MaxThreads);
1909     }
1910     if (count < m_MinThreads) {
1911         pool->LaunchThreads(m_MinThreads - count);
1912     }
1913 }
1914 
1915 void
1916 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
1917 {
1918     CThreadPool_Guard guard(m_Pool, false);
1919     if (m_Pool)
1920         guard.Guard();
1921 
1922     m_MinThreads = min_threads;
1923 
1924     EnsureLimits();
1925 }
1926 
1927 void
1928 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
1929 {
1930     CThreadPool_Guard guard(m_Pool, false);
1931     if (m_Pool)
1932         guard.Guard();
1933 
1934     m_MaxThreads = max_threads;
1935 
1936     EnsureLimits();
1937 }
1938 
1939 void
1940 CThreadPool_Controller::SetThreadsCount(unsigned int count)
1941 {
1942     if (count > GetMaxThreads())
1943         count = GetMaxThreads();
1944     if (count < GetMinThreads())
1945         count = GetMinThreads();
1946 
1947     CThreadPool_Impl* pool = m_Pool;
1948 
1949     unsigned int now_cnt = pool->GetThreadsCount();
1950     if (count > now_cnt) {
1951         pool->LaunchThreads(count - now_cnt);
1952     }
1953     else if (count < now_cnt) {
1954         pool->FinishThreads(now_cnt - count);
1955     }
1956 }
1957 
1958 void
1959 CThreadPool_Controller::HandleEvent(EEvent event)
1960 {
1961     CThreadPool_Impl* pool = m_Pool;
1962     if (! pool)
1963         return;
1964 
1965     CThreadPool_Guard guard(pool);
1966 
1967     if (m_InHandleEvent  ||  pool->IsAborted()  ||  pool->IsSuspended())
1968         return;
1969 
1970     m_InHandleEvent = true;
1971 
1972     try {
1973         OnEvent(event);
1974         m_InHandleEvent = false;
1975     }
1976     catch (...) {
1977         m_InHandleEvent = false;
1978         throw;
1979     }
1980 }
1981 
1982 CTimeSpan
1983 CThreadPool_Controller::GetSafeSleepTime(void) const
1984 {
1985     if (m_Pool) {
1986         return CTimeSpan(1, 0);
1987     }
1988     else {
1989         return CTimeSpan(0, 0);
1990     }
1991 }
1992 
1993 
1994 
1995 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool)
1996 {
1997     _ASSERT(pool);
1998 
1999     m_Impl = new CThreadPool_ThreadImpl(this,
2000                                     CThreadPool_Impl::s_GetImplPointer(pool));
2001 }
2002 
2003 CThreadPool_Thread::~CThreadPool_Thread(void)
2004 {
2005     delete m_Impl;
2006 }
2007 
2008 void
2009 CThreadPool_Thread::Initialize(void)
2010 {}
2011 
2012 void
2013 CThreadPool_Thread::Finalize(void)
2014 {}
2015 
2016 CThreadPool*
2017 CThreadPool_Thread::GetPool(void) const
2018 {
2019     return m_Impl->GetPool();
2020 }
2021 
2022 CRef<CThreadPool_Task>
2023 CThreadPool_Thread::GetCurrentTask(void) const
2024 {
2025     return m_Impl->GetCurrentTask();
2026 }
2027 
2028 void*
2029 CThreadPool_Thread::Main(void)
2030 {
2031     m_Impl->Main();
2032     return NULL;
2033 }
2034 
2035 void
2036 CThreadPool_Thread::OnExit(void)
2037 {
2038     m_Impl->OnExit();
2039 }
2040 
2041 
2042 
2043 CThreadPool::CThreadPool(unsigned int      queue_size,
2044                          unsigned int      max_threads,
2045                          unsigned int      min_threads,
2046                          CThread::TRunMode threads_mode)
2047 {
2048     m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
2049                                   threads_mode);
2050     m_Impl->SetInterfaceStarted();
2051 }
2052 
2053 CThreadPool::CThreadPool(unsigned int            queue_size,
2054                          CThreadPool_Controller* controller,
2055                          CThread::TRunMode       threads_mode)
2056 {
2057     m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
2058     m_Impl->SetInterfaceStarted();
2059 }
2060 
2061 CThreadPool::~CThreadPool(void)
2062 {
2063     m_Impl->DestroyReference();
2064 }
2065 
2066 CMutex&
2067 CThreadPool::GetMainPoolMutex(void)
2068 {
2069     return m_Impl->GetMainPoolMutex();
2070 }
2071 
2072 CThreadPool_Thread*
2073 CThreadPool::CreateThread(void)
2074 {
2075     return CThreadPool_ThreadImpl::s_CreateThread(this);
2076 }
2077 
2078 void
2079 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
2080 {
2081     m_Impl->AddTask(task, timeout);
2082 }
2083 
2084 void
2085 CThreadPool::CancelTask(CThreadPool_Task* task)
2086 {
2087     m_Impl->CancelTask(task);
2088 }
2089 
2090 void
2091 CThreadPool::Abort(const CTimeSpan* timeout)
2092 {
2093     m_Impl->Abort(timeout);
2094 }
2095 
2096 bool
2097 CThreadPool::IsAborted(void) const
2098 {
2099     return m_Impl->IsAborted();
2100 }
2101 
2102 void
2103 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout)
2104 {
2105     m_Impl->SetDestroyTimeout(timeout);
2106 }
2107 
2108 const CTimeSpan&
2109 CThreadPool::GetDestroyTimeout(void) const
2110 {
2111     return m_Impl->GetDestroyTimeout();
2112 }
2113 
2114 void
2115 CThreadPool::RequestExclusiveExecution(CThreadPool_Task*  task,
2116                                        TExclusiveFlags    flags)
2117 {
2118     m_Impl->RequestExclusiveExecution(task, flags);
2119 }
2120 
2121 void
2122 CThreadPool::CancelTasks(TExclusiveFlags tasks_group)
2123 {
2124     m_Impl->CancelTasks(tasks_group);
2125 }
2126 
2127 void
2128 CThreadPool::FlushThreads(EFlushType flush_type)
2129 {
2130     m_Impl->FlushThreads(flush_type);
2131 }
2132 
2133 unsigned int
2134 CThreadPool::GetThreadsCount(void) const
2135 {
2136     return m_Impl->GetThreadsCount();
2137 }
2138 
2139 unsigned int
2140 CThreadPool::GetQueuedTasksCount(void) const
2141 {
2142     return m_Impl->GetQueuedTasksCount();
2143 }
2144 
2145 unsigned int
2146 CThreadPool::GetExecutingTasksCount(void) const
2147 {
2148     return m_Impl->GetExecutingTasksCount();
2149 }
2150 
2151 
2152 
2153 END_NCBI_SCOPE
2154 

[ source navigation ]   [ diff markup ]   [ identifier search ]   [ freetext search ]   [ file search ]

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.