|
NCBI Home IEB Home C++ Toolkit docs C Toolkit source browser C Toolkit source browser (2) |
NCBI C++ Toolkit Cross ReferenceC++/src/util/thread_pool.cpp |
source navigation diff markup identifier search freetext search file search |
1 /* $Id: thread_pool.cpp 164845 2009-07-01 16:44:27Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 *
28 * File Description:
29 * Pool of threads.
30 */
31
32 #include <ncbi_pch.hpp>
33 #include <util/thread_pool.hpp>
34 #include <util/thread_pool_ctrl.hpp>
35 #include <util/sync_queue.hpp>
36 #include <util/error_codes.hpp>
37
38 #define NCBI_USE_ERRCODE_X Util_Thread
39
40 BEGIN_NCBI_SCOPE
41
42
43 class CThreadPool_Guard;
44 class CThreadPool_ServiceThread;
45
46
47 /// Functor to compare tasks by priority
48 struct SThreadPool_TaskCompare {
49 bool operator() (const CRef<CThreadPool_Task>& left,
50 const CRef<CThreadPool_Task>& right)
51 {
52 return left->GetPriority() < right->GetPriority();
53 }
54 };
55
56
57 /// Real implementation of all ThreadPool functions
58 class CThreadPool_Impl : public CObject
59 {
60 public:
61 typedef CThreadPool::TExclusiveFlags TExclusiveFlags;
62
63 /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl
64 /// object. Can be done only here to avoid excessive friendship to
65 /// CThreadPool class.
66 static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool);
67
68 /// Call x_SetTaskStatus() for the given task.
69 /// Method introduced to avoid excessive friendship to CThreadPool_Task
70 /// class.
71 ///
72 /// @sa CThreadPool_Task::x_SetTaskStatus()
73 static void sx_SetTaskStatus(CThreadPool_Task* task,
74 CThreadPool_Task::EStatus status);
75
76 /// Call x_RequestToCancel() for the given task.
77 /// Method introduced to avoid excessive friendship to CThreadPool_Task
78 /// class.
79 ///
80 /// @sa CThreadPool_Task::x_RequestToCancel()
81 static void sx_RequestToCancel(CThreadPool_Task* task);
82
83
84 /// Constructor with default controller
85 /// @param pool_intf
86 /// ThreadPool interface object attached to this implementation
87 ///
88 /// @sa CThreadPool::CThreadPool()
89 CThreadPool_Impl(CThreadPool* pool_intf,
90 unsigned int queue_size,
91 unsigned int max_threads,
92 unsigned int min_threads,
93 CThread::TRunMode threads_mode = CThread::fRunDefault);
94
95 /// Constructor with explicitly given controller
96 /// @param pool_intf
97 /// ThreadPool interface object attached to this implementation
98 ///
99 /// @sa CThreadPool::CThreadPool()
100 CThreadPool_Impl(CThreadPool* pool_intf,
101 unsigned int queue_size,
102 CThreadPool_Controller* controller,
103 CThread::TRunMode threads_mode = CThread::fRunDefault);
104
105 /// Get pointer to ThreadPool interface object
106 CThreadPool* GetPoolInterface(void) const;
107
108 /// Set destroy timeout for the pool
109 ///
110 /// @sa CThreadPool::SetDestroyTimeout()
111 void SetDestroyTimeout(const CTimeSpan& timeout);
112
113 /// Get destroy timeout for the pool
114 ///
115 /// @sa CThreadPool::GetDestroyTimeout()
116 const CTimeSpan& GetDestroyTimeout(void) const;
117
118 /// Destroy reference to this object
119 /// Method is called when CThreadPool object is destroyed which means
120 /// that implementation can be destroyed too if there is no references
121 /// to it left.
122 void DestroyReference(void);
123
124 /// Get main pool mutex
125 ///
126 /// @sa CThreadPool::GetMainPoolMutex()
127 CMutex& GetMainPoolMutex(void);
128
129 /// Add task to the pool
130 ///
131 /// @sa CThreadPool::AddTask()
132 void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
133
134 /// Request to cancel the task
135 ///
136 /// @sa CThreadPool::CancelTask()
137 void CancelTask(CThreadPool_Task* task);
138
139 /// Cancel the selected groups of tasks in the pool
140 ///
141 /// @sa CThreadPool::CancelTasks()
142 void CancelTasks(TExclusiveFlags tasks_group);
143
144 /// Add the task for exclusive execution in the pool
145 ///
146 /// @sa CThreadPool::RequestExclusiveExecution()
147 void RequestExclusiveExecution(CThreadPool_Task* task,
148 TExclusiveFlags flags);
149
150 /// Launch new threads in pool
151 /// @param count
152 /// Number of threads to launch
153 void LaunchThreads(unsigned int count);
154
155 /// Finish threads in pool
156 /// Stop first all idle threads then stop busy threads without
157 /// cancelation of currently executing tasks.
158 /// @param count
159 /// Number of threads to finish
160 void FinishThreads(unsigned int count);
161
162 /// Get number of threads running in the pool
163 unsigned int GetThreadsCount(void) const;
164
165 /// Mark thread as idle or non-idle
166 /// @param thread
167 /// Thread to mark
168 /// @param is_idle
169 /// If thread should be marked as idle or not
170 void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
171
172 /// Callback from working thread when it finished its Main() method
173 void ThreadStopped(CThreadPool_ThreadImpl* thread);
174
175 /// Callback when some thread changed its idleness or finished
176 /// (including service thread)
177 void ThreadStateChanged(void);
178
179 /// Get next task from queue if there is one
180 /// If the queue is empty then return NULL.
181 CRef<CThreadPool_Task> TryGetNextTask(void);
182
183 /// Callback from thread when it is starting to execute task
184 void TaskStarting(void);
185
186 /// Callback from thread when it has finished to execute task
187 void TaskFinished(void);
188
189 /// Get the number of tasks currently waiting in queue
190 unsigned int GetQueuedTasksCount(void) const;
191
192 /// Get the number of currently executing tasks
193 unsigned int GetExecutingTasksCount(void) const;
194
195 /// Type for storing information about exclusive task launching
196 typedef pair< TExclusiveFlags,
197 CRef<CThreadPool_Task> > TExclusiveTaskInfo;
198
199 /// Get information about next exclusive task to execute
200 TExclusiveTaskInfo TryGetExclusiveTask(void);
201
202 /// Request suspension of the pool
203 /// @param flags
204 /// Parameters for necessary exclusive execution environment
205 void RequestSuspend(TExclusiveFlags flags);
206
207 /// Resume the pool operation after exclusive task execution
208 void ResumeWork(void);
209
210 /// Check if the pool is suspended for exclusive execution
211 bool IsSuspended(void) const;
212
213 /// Check if it is already allowed to execute exclusive task
214 bool CanDoExclusiveTask(void) const;
215
216 /// Abort the pool operation
217 ///
218 /// @sa CThreadPool::Abort()
219 void Abort(const CTimeSpan* timeout);
220
221 /// Check if the pool is already aborted
222 bool IsAborted(void) const;
223
224 /// Finish all current threads and replace them with new ones
225 ///
226 /// @sa CThreadPool::FlushThreads()
227 void FlushThreads(CThreadPool::EFlushType flush_type);
228
229 /// Call the CThreadPool_Controller::HandleEvent() method of the pool
230 /// controller with the given event type. If ThreadPool is already aborted
231 /// and controller is reset then do nothing.
232 void CallController(CThreadPool_Controller::EEvent event);
233
234 /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther
235 /// event type
236 void CallControllerOther(void);
237
238 /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool
239 /// controller. If ThreadPool is already aborted and controller is reset
240 /// then return time period of 1 second.
241 CTimeSpan GetSafeSleepTime(void) const;
242
243 /// Mark that initialization of the interface was finished
244 void SetInterfaceStarted(void);
245
246
247 private:
248 /// Type of queue used for storing tasks
249 typedef CSyncQueue< CRef<CThreadPool_Task>,
250 CSyncQueue_multiset< CRef<CThreadPool_Task>,
251 SThreadPool_TaskCompare > >
252 TQueue;
253 /// Type of queue used for storing information about exclusive tasks
254 typedef CSyncQueue<TExclusiveTaskInfo> TExclusiveQueue;
255 /// Type of list of all poolled threads
256 typedef set<CThreadPool_ThreadImpl*> TThreadsList;
257
258
259 /// Prohibit copying and assigning
260 CThreadPool_Impl(const CThreadPool_Impl&);
261 CThreadPool_Impl& operator= (const CThreadPool_Impl&);
262
263 /// Transform size of queue given in constructor to the size passed to
264 /// CSyncQueue constructor.
265 /// Method can be called only from constructor because it initializes
266 /// value of m_IsQueueAllowed member variable.
267 unsigned int x_GetQueueSize(unsigned int queue_size);
268
269 /// Initialization of all class member variables that can be initialized
270 /// outside of constructor
271 /// @param pool_intf
272 /// ThreadPool interface object attached to this implementation
273 /// @param controller
274 /// Controller for the pool
275 void x_Init(CThreadPool* pool_intf,
276 CThreadPool_Controller* controller,
277 CThread::TRunMode threads_mode);
278
279 /// Destructor. Will be called from CRef
280 ~CThreadPool_Impl(void);
281
282 /// Delete task from the queue
283 /// If task does not exist in queue then does nothing.
284 void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
285
286 /// Cancel all tasks waiting in the queue
287 void x_CancelQueuedTasks(void);
288
289 /// Cancel all currently executing tasks
290 void x_CancelExecutingTasks(void);
291
292 /// Type of some simple predicate
293 ///
294 /// @sa x_WaitForPredicate
295 typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
296
297 /// Check if new task can be added to the pool
298 bool x_IsNewTaskAllowed(void) const;
299
300 /// Check if new task can be added to the pool when queuing is disabled
301 bool x_CanAddImmediateTask(void) const;
302
303 /// Check if all threads in pool finished their work
304 bool x_HasNoThreads(void) const;
305
306 /// Wait for some predicate to be true
307 /// @param wait_func
308 /// Predicate to wait for
309 /// @param pool_guard
310 /// Guardian that locks main pool mutex at the time of method call and
311 /// that have to be unlocked for the time of waiting
312 /// @param wait_sema
313 /// Semaphore which will be posted when predicate become true
314 /// @param timeout
315 /// Maximum amount of time to wait
316 /// @param timer
317 /// Timer for mesuring elapsed time. Method assumes that timer is
318 /// started at the moment from which timeout should be calculated.
319 bool x_WaitForPredicate(TWaitPredicate wait_func,
320 CThreadPool_Guard* pool_guard,
321 CSemaphore* wait_sema,
322 const CTimeSpan* timeout,
323 const CStopWatch* timer);
324
325
326 private:
327 /// ThreadPool interface object attached to this implementation
328 CThreadPool* m_Interface;
329 /// Reference to this pool to prevent its destroying earlier than we
330 /// allow it to
331 CRef<CThreadPool_Impl> m_SelfRef;
332 /// Timeout to wait for all threads to finish before the ThreadPool
333 /// interface object will be able to destroy
334 CTimeSpan m_DestroyTimeout;
335 /// Queue for storing tasks
336 TQueue m_Queue;
337 /// Mutex for guarding all changes in the pool, its threads and controller
338 CMutex m_MainPoolMutex;
339 /// Semaphore for waiting for available threads to process task when
340 /// queuing is disabled.
341 CSemaphore m_RoomWait;
342 /// Controller managing count of threads in pool
343 CRef<CThreadPool_Controller> m_Controller;
344 /// List of all idle threads
345 TThreadsList m_IdleThreads;
346 /// List of all threads currently executing some tasks
347 TThreadsList m_WorkingThreads;
348 /// Running mode of all threads
349 CThread::TRunMode m_ThreadsMode;
350 /// Total number of threads
351 /// Introduced for more adequate and fast reflecting to threads starting
352 /// and stopping events
353 CAtomicCounter m_ThreadsCount;
354 /// Number of tasks executing now
355 /// Introduced for more adequate and fast reflecting to task executing
356 /// start and finish events
357 CAtomicCounter m_ExecutingTasks;
358 /// Total number of tasks acquired by pool
359 /// Includes queued tasks and executing tasks. Introduced for
360 /// maintaining atomicity of this number changing
361 CAtomicCounter m_TotalTasks;
362 /// Flag about working with special case:
363 /// FALSE - queue_size == 0, TRUE - queue_size > 0
364 bool m_IsQueueAllowed;
365 /// If pool is already aborted or not
366 volatile bool m_Aborted;
367 /// Semaphore for waiting for threads finishing in Abort() method
368 ///
369 /// @sa Abort()
370 CSemaphore m_AbortWait;
371 /// If pool is suspended for exclusive task execution or not
372 volatile bool m_Suspended;
373 /// Requested requirements for the exclusive execution environment
374 volatile TExclusiveFlags m_SuspendFlags;
375 /// Flag indicating if flush of threads requested after adding exclusive
376 /// task but before it is started its execution.
377 volatile bool m_FlushRequested;
378 /// Thread for execution of exclusive tasks and passing of events
379 /// to the controller
380 CRef<CThreadPool_ServiceThread> m_ServiceThread;
381 /// Queue for information about exclusive tasks
382 TExclusiveQueue m_ExclusiveQueue;
383 };
384
385
386
387 /// Real implementation of all CThreadPool_Thread functions
388 class CThreadPool_ThreadImpl
389 {
390 public:
391 /// Convert pointer to CThreadPool_Thread object into pointer
392 /// to CThreadPool_ThreadImpl object. Can be done only here to avoid
393 /// excessive friendship to CThreadPool_Thread class.
394 static CThreadPool_ThreadImpl*
395 s_GetImplPointer(CThreadPool_Thread* thread);
396
397 /// Create new CThreadPool_Thread object
398 /// Method introduced to avoid excessive friendship to CThreadPool_Thread
399 /// class.
400 ///
401 /// @sa CThreadPool_Thread::CThreadPool_Thread()
402 static CThreadPool_Thread* s_CreateThread(CThreadPool* pool);
403
404 /// Constructor
405 /// @param thread_intf
406 /// ThreadPool_Thread interface object attached to this implementation
407 /// @param pool
408 /// Pool implementation owning this thread
409 CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf,
410 CThreadPool_Impl* pool);
411
412 /// Destructor
413 /// Called directly from CThreadPool destructor
414 ~CThreadPool_ThreadImpl(void);
415
416 /// Get ThreadPool interface object owning this thread
417 ///
418 /// @sa CThreadPool_Thread::GetPool()
419 CThreadPool* GetPool(void) const;
420
421 /// Request this thread to finish its operation.
422 /// It renders the thread unusable and eventually ready for destruction
423 /// (as soon as its current task is finished and there are no CRefs to
424 /// this thread left).
425 void RequestToFinish(void);
426
427 /// If finishing of this thread is already in progress or not
428 bool IsFinishing(void) const;
429
430 /// Wake up the thread from idle state
431 ///
432 /// @sa x_Idle
433 void WakeUp(void);
434
435 /// Get task currently executing in the thread
436 /// May be NULL if thread is idle or is in the middle of changing of
437 /// current task
438 ///
439 /// @sa CThreadPool_Thread::GetCurrentTask()
440 CRef<CThreadPool_Task> GetCurrentTask(void) const;
441
442 /// Request to cancel current task execution
443 void CancelCurrentTask(void);
444
445 /// Implementation of thread Main() method
446 ///
447 /// @sa CThreadPool_Thread::Main()
448 void Main(void);
449
450 /// Implementation of threadOnExit() method
451 ///
452 /// @sa CThreadPool_Thread::OnExit()
453 void OnExit(void);
454
455 private:
456 /// Prohibit copying and assigning
457 CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&);
458 CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&);
459
460 /// Suspend until the wake up signal.
461 ///
462 /// @sa WakeUp()
463 void x_Idle(void);
464
465 /// Mark the thread idle or non-idle
466 void x_SetIdleState(bool is_idle);
467
468 /// Do finalizing when task finished its execution
469 /// @param status
470 /// Status that the task must get
471 void x_TaskFinished(CThreadPool_Task::EStatus status);
472
473
474 /// ThreadPool_Thread interface object attached to this implementation
475 CThreadPool_Thread* m_Interface;
476 /// Pool running the thread
477 CRef<CThreadPool_Impl> m_Pool;
478 /// If the thread is already asked to finish or not
479 volatile bool m_Finishing;
480 /// If cancel of the currently executing task is requested or not
481 volatile bool m_CancelRequested;
482 /// Idleness of the thread
483 bool m_IsIdle;
484 /// Task currently executing in the thread
485 CRef<CThreadPool_Task> m_CurrentTask;
486 /// Semaphore for waking up from idle waiting
487 CSemaphore m_IdleTrigger;
488 };
489
490
491
492 /// Thread used in pool for different internal needs: execution of exclusive
493 /// tasks and passing events to controller
494 class CThreadPool_ServiceThread : public CThread
495 {
496 public:
497 /// Constructor
498 /// @param pool
499 /// ThreadPool owning this thread
500 CThreadPool_ServiceThread(CThreadPool_Impl* pool);
501
502 /// Wake up from idle waiting or waiting of pool preparing exclusive
503 /// environment
504 void WakeUp(void);
505
506 /// Request finishing of the thread
507 void RequestToFinish(void);
508
509 /// Check if this thread have already finished or not
510 bool IsFinished(void);
511
512 /// Tell the thread that controller should handle eOther event
513 ///
514 /// @sa CThreadPool_Controller::HandleEvent()
515 void NeedCallController(void);
516
517 protected:
518 /// Destructor. Will be called from CRef
519 virtual ~CThreadPool_ServiceThread(void);
520
521 private:
522 /// Main thread execution
523 virtual void* Main(void);
524
525 /// Do "idle" work when thread is not busy executing exclusive tasks
526 void x_Idle(void);
527
528 /// Wait until pool is ready for execution of exclusive task
529 void x_WaitForPoolStop(CThreadPool_Guard* pool_guard);
530
531 /// Pool owning this thread
532 CRef<CThreadPool_Impl> m_Pool;
533 /// Semaphore for idle sleeping
534 CSemaphore m_IdleTrigger;
535 /// If finishing of the thread is already requested
536 volatile bool m_Finishing;
537 /// If the thread has already finished its Main() method
538 volatile bool m_Finished;
539 /// Currently executing exclusive task
540 CRef<CThreadPool_Task> m_CurrentTask;
541 /// Flag indicating that thread should pass eOther event to the controller
542 CAtomicCounter m_NeedCallController;
543 };
544
545
546
547 /// Guardian for protecting pool by locking its main mutex
548 class CThreadPool_Guard : private CMutexGuard
549 {
550 public:
551 /// Constructor
552 /// @param pool
553 /// Pool to protect
554 /// @param is_active
555 /// If the mutex should be locked in constructor or not
556 CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
557
558 /// Turn this guardian on
559 void Guard(void);
560
561 /// Turn this guardian off
562 void Release(void);
563
564 private:
565 /// Pool protected by the guardian
566 CThreadPool_Impl* m_Pool;
567 };
568
569
570
571 /// Special task which does nothing
572 /// It's used in FlushThreads to force pool to wait while all old threads
573 /// finish their operation to start new ones.
574 ///
575 /// @sa CThreadPool_Impl::FlushThreads()
576 class CThreadPool_EmptyTask : public CThreadPool_Task
577 {
578 public:
579 /// Empty main method
580 virtual EStatus Execute(void) { return eCompleted; }
581
582 #ifdef NCBI_COMPILER_ICC
583 // In the absence of the following constructor,
584 // ICC fills the object memory with zeros,
585 // erasing flags set by CObject::operator new
586 CThreadPool_EmptyTask(void) {}
587 #endif
588 };
589
590
591
592 /// Check if status returned from CThreadPool_Task::Execute() is allowed
593 /// and change it to eCompleted value if it is invalid
594 static inline CThreadPool_Task::EStatus
595 s_ConvertTaskResult(CThreadPool_Task::EStatus status)
596 {
597 _ASSERT(status == CThreadPool_Task::eCompleted
598 || status == CThreadPool_Task::eFailed
599 || status == CThreadPool_Task::eCanceled);
600
601 if (status != CThreadPool_Task::eCompleted
602 && status != CThreadPool_Task::eFailed
603 && status != CThreadPool_Task::eCanceled)
604 {
605 ERR_POST_X(9, Critical
606 << "Wrong status returned from "
607 "CThreadPool_Task::Execute(): "
608 << status);
609 status = CThreadPool_Task::eCompleted;
610 }
611
612 return status;
613 }
614
615
616
617 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF;
618
619
620 inline void
621 CThreadPool_ServiceThread::WakeUp(void)
622 {
623 m_IdleTrigger.Post();
624 }
625
626 inline void
627 CThreadPool_ServiceThread::NeedCallController(void)
628 {
629 if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) {
630 m_NeedCallController.Add(-1);
631 }
632 else {
633 WakeUp();
634 }
635 }
636
637
638
639 inline void
640 CThreadPool_ThreadImpl::WakeUp(void)
641 {
642 m_IdleTrigger.Post();
643 }
644
645
646
647 inline CMutex&
648 CThreadPool_Impl::GetMainPoolMutex(void)
649 {
650 return m_MainPoolMutex;
651 }
652
653
654
655 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active)
656 : CMutexGuard(eEmptyGuard),
657 m_Pool(pool)
658 {
659 _ASSERT(pool);
660
661 if (is_active)
662 Guard();
663 }
664
665 void
666 CThreadPool_Guard::Guard(void)
667 {
668 CMutexGuard::Guard(m_Pool->GetMainPoolMutex());
669 }
670
671 void
672 CThreadPool_Guard::Release(void)
673 {
674 CMutexGuard::Release();
675 }
676
677
678
679 inline void
680 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task* task,
681 CThreadPool_Task::EStatus status)
682 {
683 task->x_SetStatus(status);
684 }
685
686 inline void
687 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task)
688 {
689 task->x_RequestToCancel();
690 }
691
692 inline CThreadPool*
693 CThreadPool_Impl::GetPoolInterface(void) const
694 {
695 return m_Interface;
696 }
697
698 inline void
699 CThreadPool_Impl::SetInterfaceStarted(void)
700 {
701 m_ServiceThread->Run(CThread::fRunDetached);
702 }
703
704 inline bool
705 CThreadPool_Impl::IsAborted(void) const
706 {
707 return m_Aborted;
708 }
709
710 inline bool
711 CThreadPool_Impl::IsSuspended(void) const
712 {
713 return m_Suspended;
714 }
715
716 inline unsigned int
717 CThreadPool_Impl::GetThreadsCount(void) const
718 {
719 return (unsigned int)m_ThreadsCount.Get();
720 }
721
722 inline unsigned int
723 CThreadPool_Impl::GetQueuedTasksCount(void) const
724 {
725 return (unsigned int)m_Queue.GetSize();
726 }
727
728 inline unsigned int
729 CThreadPool_Impl::GetExecutingTasksCount(void) const
730 {
731 return (unsigned int)m_ExecutingTasks.Get();
732 }
733
734 inline CTimeSpan
735 CThreadPool_Impl::GetSafeSleepTime(void) const
736 {
737 // m_Controller variable can be uninitialized in only when ThreadPool
738 // is already aborted
739 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
740 if (controller && ! m_Aborted) {
741 return controller->GetSafeSleepTime();
742 }
743 else {
744 return CTimeSpan(0, 0);
745 }
746 }
747
748 inline void
749 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event)
750 {
751 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
752 if (controller && ! m_Aborted &&
753 (! m_Suspended || event == CThreadPool_Controller::eSuspend))
754 {
755 controller->HandleEvent(event);
756 }
757 }
758
759 inline void
760 CThreadPool_Impl::CallControllerOther(void)
761 {
762 CThreadPool_ServiceThread* thread = m_ServiceThread;
763 if (thread) {
764 thread->NeedCallController();
765 }
766 }
767
768 inline void
769 CThreadPool_Impl::TaskStarting(void)
770 {
771 m_ExecutingTasks.Add(1);
772 // In current implementation controller operation doesn't depend on this
773 // action. So we will save mutex locks for the sake of performance
774 //CallControllerOther();
775 }
776
777 inline void
778 CThreadPool_Impl::TaskFinished(void)
779 {
780 m_ExecutingTasks.Add(-1);
781 m_TotalTasks.Add(-1);
782 m_RoomWait.Post();
783 CallControllerOther();
784 }
785
786 inline void
787 CThreadPool_Impl::ThreadStateChanged(void)
788 {
789 if (m_Aborted) {
790 if (x_HasNoThreads()) {
791 m_AbortWait.Post();
792 }
793 }
794 else if (m_Suspended) {
795 if ((m_SuspendFlags & CThreadPool::fFlushThreads)
796 && GetThreadsCount() == 0
797 || ! (m_SuspendFlags & CThreadPool::fFlushThreads)
798 && m_WorkingThreads.size() == 0)
799 {
800 m_ServiceThread->WakeUp();
801 }
802 }
803 }
804
805 inline void
806 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread)
807 {
808 m_ThreadsCount.Add(-1);
809
810 CThreadPool_Guard guard(this);
811
812 m_IdleThreads.erase(thread);
813 m_WorkingThreads.erase(thread);
814
815 CallControllerOther();
816
817 ThreadStateChanged();
818 }
819
820 inline CRef<CThreadPool_Task>
821 CThreadPool_Impl::TryGetNextTask(void)
822 {
823 if (!m_Suspended || (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) {
824 TQueue::TAccessGuard guard(m_Queue);
825
826 if (m_Queue.GetSize() != 0) {
827 return m_Queue.Pop();
828 }
829 }
830
831 return CRef<CThreadPool_Task>();
832 }
833
834 inline CThreadPool_Impl::TExclusiveTaskInfo
835 CThreadPool_Impl::TryGetExclusiveTask(void)
836 {
837 TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue);
838
839 if (m_ExclusiveQueue.GetSize() != 0) {
840 CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop();
841 if (m_FlushRequested) {
842 info.first |= CThreadPool::fFlushThreads;
843 m_FlushRequested = false;
844 }
845 return info;
846 }
847
848 return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>());
849 }
850
851 inline bool
852 CThreadPool_Impl::CanDoExclusiveTask(void) const
853 {
854 if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)
855 && GetQueuedTasksCount() != 0)
856 {
857 return false;
858 }
859
860 if ((m_SuspendFlags & CThreadPool::fFlushThreads)
861 && GetThreadsCount() != 0)
862 {
863 return false;
864 }
865
866 return m_WorkingThreads.size() == 0;
867 }
868
869 inline void
870 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags)
871 {
872 m_SuspendFlags = flags;
873 m_Suspended = true;
874 if (flags & CThreadPool::fCancelQueuedTasks) {
875 x_CancelQueuedTasks();
876 }
877 if (flags & CThreadPool::fCancelExecutingTasks) {
878 x_CancelExecutingTasks();
879 }
880
881 if (flags & CThreadPool::fFlushThreads) {
882 FinishThreads(m_IdleThreads.size());
883 }
884
885 CallController(CThreadPool_Controller::eSuspend);
886 }
887
888 inline void
889 CThreadPool_Impl::ResumeWork(void)
890 {
891 m_Suspended = false;
892
893 CallController(CThreadPool_Controller::eResume);
894
895 ITERATE(TThreadsList, it, m_IdleThreads) {
896 (*it)->WakeUp();
897 }
898 }
899
900
901
902 inline void
903 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool)
904 {
905 if (m_Pool != NULL) {
906 NCBI_THROW(CThreadPoolException, eControllerBusy,
907 "Cannot attach Controller to several ThreadPools.");
908 }
909
910 m_Pool = pool;
911 }
912
913 inline void
914 CThreadPool_Controller::x_DetachFromPool(void)
915 {
916 m_Pool = NULL;
917 }
918
919
920
921 CThreadPool_Task::CThreadPool_Task(unsigned int priority)
922 {
923 x_Init(priority);
924 }
925
926 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other)
927 {
928 x_Init(other.m_Priority);
929 }
930
931 void
932 CThreadPool_Task::x_Init(unsigned int priority)
933 {
934 m_Pool = NULL;
935 m_Priority = priority;
936 m_Status = eIdle;
937 m_CancelRequested = false;
938 }
939
940 CThreadPool_Task::~CThreadPool_Task(void)
941 {}
942
943 CThreadPool_Task&
944 CThreadPool_Task::operator= (const CThreadPool_Task& other)
945 {
946 if (m_IsBusy.Get() != 0) {
947 NCBI_THROW(CThreadPoolException, eTaskBusy,
948 "Cannot change task when it is already added "
949 "to ThreadPool");
950 }
951
952 CObject::operator= (other);
953 // There can be race with CThreadPool_Impl::AddTask()
954 // If task will be already added to queue and priority will be then
955 // changed queue can crush later
956 m_Priority = other.m_Priority;
957 return *this;
958 }
959
960 void
961 CThreadPool_Task::OnStatusChange(EStatus /* old */)
962 {}
963
964 void
965 CThreadPool_Task::OnCancelRequested(void)
966 {}
967
968 inline void
969 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool)
970 {
971 if (m_IsBusy.Add(1) != 1) {
972 m_IsBusy.Add(-1);
973 NCBI_THROW(CThreadPoolException, eTaskBusy,
974 "Cannot add task in ThreadPool several times");
975 }
976
977 m_Pool = pool;
978 }
979
980 inline void
981 CThreadPool_Task::x_ResetOwner(void)
982 {
983 m_Pool = NULL;
984 m_IsBusy.Add(-1);
985 }
986
987 void
988 CThreadPool_Task::x_SetStatus(EStatus new_status)
989 {
990 EStatus old_status = m_Status;
991 if (old_status != new_status && old_status != eCanceled) {
992 m_Status = new_status;
993 OnStatusChange(old_status);
994 }
995
996 if (IsFinished()) {
997 m_Pool = NULL;
998 }
999 }
1000
1001 inline void
1002 CThreadPool_Task::x_RequestToCancel(void)
1003 {
1004 m_CancelRequested = true;
1005
1006 OnCancelRequested();
1007
1008 if (GetStatus() <= eQueued) {
1009 x_SetStatus(eCanceled);
1010 }
1011 }
1012
1013 void
1014 CThreadPool_Task::RequestToCancel(void)
1015 {
1016 // Protect from possible reseting of the pool variable during execution
1017 CThreadPool_Impl* pool = m_Pool;
1018 if (IsFinished()) {
1019 return;
1020 }
1021 else if (!pool) {
1022 x_RequestToCancel();
1023 }
1024 else {
1025 pool->CancelTask(this);
1026 }
1027 }
1028
1029 CThreadPool*
1030 CThreadPool_Task::GetPool(void) const
1031 {
1032 // Protect from possible reseting of the pool variable during execution
1033 CThreadPool_Impl* pool_impl = m_Pool;
1034 return pool_impl? pool_impl->GetPoolInterface(): NULL;
1035 }
1036
1037
1038
1039 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool)
1040 : m_Pool(pool),
1041 m_IdleTrigger(0, kMax_Int),
1042 m_Finishing(false),
1043 m_Finished(false)
1044 {
1045 _ASSERT(pool);
1046
1047 m_NeedCallController.Set(kNeedCallController_Shift);
1048 }
1049
1050 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void)
1051 {}
1052
1053 inline bool
1054 CThreadPool_ServiceThread::IsFinished(void)
1055 {
1056 return m_Finished;
1057 }
1058
1059 inline void
1060 CThreadPool_ServiceThread::x_Idle(void)
1061 {
1062 if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) {
1063 m_NeedCallController.Add(1);
1064 }
1065 m_Pool->CallController(CThreadPool_Controller::eOther);
1066
1067 CTimeSpan timeout = m_Pool->GetSafeSleepTime();
1068 m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(),
1069 timeout.GetNanoSecondsAfterSecond());
1070 }
1071
1072 inline void
1073 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard)
1074 {
1075 while (! m_Pool->IsAborted() && ! m_Pool->CanDoExclusiveTask()) {
1076 pool_guard->Release();
1077 m_IdleTrigger.Wait();
1078 pool_guard->Guard();
1079 }
1080 }
1081
1082 inline void
1083 CThreadPool_ServiceThread::RequestToFinish(void)
1084 {
1085 m_Finishing = true;
1086 WakeUp();
1087
1088 CThreadPool_Task* task = m_CurrentTask;
1089 if (task) {
1090 CThreadPool_Impl::sx_RequestToCancel(task);
1091 }
1092 }
1093
1094 void*
1095 CThreadPool_ServiceThread::Main(void)
1096 {
1097 while (! m_Finishing) {
1098 CThreadPool_Impl::TExclusiveTaskInfo task_info =
1099 m_Pool->TryGetExclusiveTask();
1100 m_CurrentTask = task_info.second;
1101
1102 if (m_CurrentTask.IsNull()) {
1103 x_Idle();
1104 }
1105 else {
1106 CThreadPool_Guard guard(m_Pool);
1107
1108 if (m_Finishing) {
1109 if (! m_CurrentTask->IsCancelRequested()) {
1110 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1111 }
1112 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1113 CThreadPool_Task::eCanceled);
1114 break;
1115 }
1116
1117 m_Pool->RequestSuspend(task_info.first);
1118 x_WaitForPoolStop(&guard);
1119
1120 if (m_Finishing) {
1121 if (!m_CurrentTask->IsCancelRequested()) {
1122 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1123 }
1124 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1125 CThreadPool_Task::eCanceled);
1126 break;
1127 }
1128
1129 guard.Release();
1130
1131 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1132 CThreadPool_Task::eExecuting);
1133 try {
1134 CThreadPool_Task::EStatus status =
1135 s_ConvertTaskResult(m_CurrentTask->Execute());
1136 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
1137 }
1138 catch (exception& e) {
1139 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: "
1140 << e.what());
1141 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1142 CThreadPool_Task::eFailed);
1143 }
1144 catch (...) {
1145 ERR_POST_X(12, "Unknown exception from exclusive task "
1146 "in ThreadPool.");
1147 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1148 CThreadPool_Task::eFailed);
1149 }
1150
1151 guard.Guard();
1152 m_Pool->ResumeWork();
1153 }
1154 }
1155
1156 m_Finished = true;
1157 m_Pool->ThreadStateChanged();
1158
1159 return NULL;
1160 }
1161
1162
1163
1164 inline CThreadPool_ThreadImpl*
1165 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread)
1166 {
1167 return thread->m_Impl;
1168 }
1169
1170 inline CThreadPool_Thread*
1171 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool)
1172 {
1173 return new CThreadPool_Thread(pool);
1174 }
1175
1176 inline
1177 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl
1178 (
1179 CThreadPool_Thread* thread_intf,
1180 CThreadPool_Impl* pool
1181 )
1182 : m_Interface(thread_intf),
1183 m_Pool(pool),
1184 m_Finishing(false),
1185 m_CancelRequested(false),
1186 m_IsIdle(true),
1187 m_IdleTrigger(0, kMax_Int)
1188 {}
1189
1190 inline
1191 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void)
1192 {}
1193
1194 inline CThreadPool*
1195 CThreadPool_ThreadImpl::GetPool(void) const
1196 {
1197 return m_Pool->GetPoolInterface();
1198 }
1199
1200 inline bool
1201 CThreadPool_ThreadImpl::IsFinishing(void) const
1202 {
1203 return m_Finishing;
1204 }
1205
1206 inline CRef<CThreadPool_Task>
1207 CThreadPool_ThreadImpl::GetCurrentTask(void) const
1208 {
1209 return m_CurrentTask;
1210 }
1211
1212 inline void
1213 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle)
1214 {
1215 if (m_IsIdle != is_idle) {
1216 m_IsIdle = is_idle;
1217 m_Pool->SetThreadIdle(this, is_idle);
1218 }
1219 }
1220
1221 inline void
1222 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status)
1223 {
1224 if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) {
1225 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
1226 }
1227
1228 m_CurrentTask.Reset();
1229 m_Pool->TaskFinished();
1230 }
1231
1232 inline void
1233 CThreadPool_ThreadImpl::x_Idle(void)
1234 {
1235 x_SetIdleState(true);
1236
1237 m_IdleTrigger.Wait();
1238 }
1239
1240 inline void
1241 CThreadPool_ThreadImpl::RequestToFinish(void)
1242 {
1243 m_Finishing = true;
1244 WakeUp();
1245 }
1246
1247 inline void
1248 CThreadPool_ThreadImpl::CancelCurrentTask(void)
1249 {
1250 // Avoid resetting of the pointer during execution
1251 CRef<CThreadPool_Task> task = m_CurrentTask;
1252 if (task.NotNull()) {
1253 CThreadPool_Impl::sx_RequestToCancel(task);
1254 }
1255 else {
1256 m_CancelRequested = true;
1257 }
1258 }
1259
1260 inline void
1261 CThreadPool_ThreadImpl::Main(void)
1262 {
1263 m_Interface->Initialize();
1264
1265 while (!m_Finishing) {
1266 m_CancelRequested = false;
1267 m_CurrentTask = m_Pool->TryGetNextTask();
1268
1269 if (m_CurrentTask.IsNull()) {
1270 x_Idle();
1271 }
1272 else {
1273 if (m_CurrentTask->IsCancelRequested() || m_CancelRequested) {
1274 // Some race can appear if task is canceled at the time
1275 // when it's being queued or at the time when it's being
1276 // unqueued
1277 if (! m_CurrentTask->IsCancelRequested()) {
1278 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
1279 }
1280 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1281 CThreadPool_Task::eCanceled);
1282 m_CurrentTask = NULL;
1283 continue;
1284 }
1285
1286 x_SetIdleState(false);
1287 m_Pool->TaskStarting();
1288
1289 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
1290 CThreadPool_Task::eExecuting);
1291
1292 try {
1293 CThreadPool_Task::EStatus status =
1294 s_ConvertTaskResult(m_CurrentTask->Execute());
1295 x_TaskFinished(status);
1296 }
1297 catch (exception& e) {
1298 ERR_POST_X(7, "Exception from task in ThreadPool: "
1299 << e.what());
1300 x_TaskFinished(CThreadPool_Task::eFailed);
1301 }
1302 catch (...) {
1303 x_TaskFinished(CThreadPool_Task::eFailed);
1304 throw;
1305 }
1306 }
1307 }
1308 }
1309
1310 inline void
1311 CThreadPool_ThreadImpl::OnExit(void)
1312 {
1313 try {
1314 m_Interface->Finalize();
1315 } STD_CATCH_ALL_X(8, "Finalize")
1316
1317 m_Pool->ThreadStopped(this);
1318 }
1319
1320
1321
1322 inline CThreadPool_Impl*
1323 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool)
1324 {
1325 return pool->m_Impl;
1326 }
1327
1328 inline unsigned int
1329 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
1330 {
1331 if (queue_size == 0) {
1332 // 10 is just in case, in fact when queue_size == 0 pool will always
1333 // check for idle threads, so tasks will never crowd in the queue
1334 queue_size = 10;
1335 m_IsQueueAllowed = false;
1336 }
1337 else {
1338 m_IsQueueAllowed = true;
1339 }
1340
1341 return queue_size;
1342 }
1343
1344 inline
1345 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf,
1346 unsigned int queue_size,
1347 unsigned int max_threads,
1348 unsigned int min_threads,
1349 CThread::TRunMode threads_mode)
1350 : m_Queue(x_GetQueueSize(queue_size)),
1351 m_RoomWait(0, kMax_Int),
1352 m_AbortWait(0, kMax_Int)
1353 {
1354 x_Init(pool_intf,
1355 new CThreadPool_Controller_PID(max_threads, min_threads),
1356 threads_mode);
1357 }
1358
1359 inline
1360 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf,
1361 unsigned int queue_size,
1362 CThreadPool_Controller* controller,
1363 CThread::TRunMode threads_mode)
1364 : m_Queue(x_GetQueueSize(queue_size)),
1365 m_RoomWait(0, kMax_Int),
1366 m_AbortWait(0, kMax_Int)
1367 {
1368 x_Init(pool_intf, controller, threads_mode);
1369 }
1370
1371 void
1372 CThreadPool_Impl::x_Init(CThreadPool* pool_intf,
1373 CThreadPool_Controller* controller,
1374 CThread::TRunMode threads_mode)
1375 {
1376 m_Interface = pool_intf;
1377 m_SelfRef = this;
1378 m_DestroyTimeout = CTimeSpan(10, 0);
1379 m_ThreadsCount.Set(0);
1380 m_ExecutingTasks.Set(0);
1381 m_TotalTasks.Set(0);
1382 m_Aborted = false;
1383 m_Suspended = false;
1384 m_ThreadsMode = (threads_mode | CThread::fRunDetached)
1385 & ~CThread::fRunAllowST;
1386
1387 controller->x_AttachToPool(this);
1388 m_Controller = controller;
1389
1390 m_ServiceThread = new CThreadPool_ServiceThread(this);
1391 }
1392
1393 CThreadPool_Impl::~CThreadPool_Impl(void)
1394 {}
1395
1396 inline void
1397 CThreadPool_Impl::DestroyReference(void)
1398 {
1399 // Abort even if m_Aborted == true because threads can still be running
1400 // and we have to wait for their termination
1401 Abort(&m_DestroyTimeout);
1402
1403 m_Interface = NULL;
1404 m_ServiceThread = NULL;
1405 m_SelfRef = NULL;
1406 }
1407
1408 inline void
1409 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout)
1410 {
1411 m_DestroyTimeout = timeout;
1412 }
1413
1414 inline const CTimeSpan&
1415 CThreadPool_Impl::GetDestroyTimeout(void) const
1416 {
1417 return m_DestroyTimeout;
1418 }
1419
1420 void
1421 CThreadPool_Impl::LaunchThreads(unsigned int count)
1422 {
1423 if (count == 0)
1424 return;
1425
1426 CThreadPool_Guard guard(this);
1427
1428 for (unsigned int i = 0; i < count; ++i) {
1429 CRef<CThreadPool_Thread> thread(m_Interface->CreateThread());
1430 m_IdleThreads.insert(
1431 CThreadPool_ThreadImpl::s_GetImplPointer(thread));
1432 thread->Run(m_ThreadsMode);
1433 }
1434
1435 m_ThreadsCount.Add(count);
1436 CallControllerOther();
1437 }
1438
1439 void
1440 CThreadPool_Impl::FinishThreads(unsigned int count)
1441 {
1442 if (count == 0)
1443 return;
1444
1445 CThreadPool_Guard guard(this);
1446
1447 // The cast is theoretically extraneous, but Sun's WorkShop
1448 // compiler otherwise calls the wrong versions of begin() and
1449 // end() and refuses to convert the resulting iterators.
1450 REVERSE_ITERATE(TThreadsList, it,
1451 static_cast<const TThreadsList&>(m_IdleThreads))
1452 {
1453 // Maybe in case of several quick consecutive calls we should favor
1454 // the willing to finish several threads.
1455 //if ((*it)->IsFinishing())
1456 // continue;
1457
1458 (*it)->RequestToFinish();
1459 --count;
1460 if (count == 0)
1461 break;
1462 }
1463
1464 REVERSE_ITERATE(TThreadsList, it,
1465 static_cast<const TThreadsList&>(m_WorkingThreads))
1466 {
1467 if (count == 0)
1468 break;
1469
1470 (*it)->RequestToFinish();
1471 --count;
1472 }
1473 }
1474
1475 void
1476 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle)
1477 {
1478 CThreadPool_Guard guard(this);
1479
1480 TThreadsList* to_del;
1481 TThreadsList* to_ins;
1482 if (is_idle) {
1483 to_del = &m_WorkingThreads;
1484 to_ins = &m_IdleThreads;
1485 }
1486 else {
1487 to_del = &m_IdleThreads;
1488 to_ins = &m_WorkingThreads;
1489 }
1490
1491 TThreadsList::iterator it = to_del->find(thread);
1492 if (it != to_del->end()) {
1493 to_del->erase(it);
1494 }
1495 to_ins->insert(thread);
1496
1497 if (is_idle && m_Suspended
1498 && (m_SuspendFlags & CThreadPool::fFlushThreads))
1499 {
1500 thread->RequestToFinish();
1501 }
1502
1503 ThreadStateChanged();
1504 }
1505
1506 inline bool
1507 CThreadPool_Impl::x_IsNewTaskAllowed(void) const
1508 {
1509 return !m_Aborted
1510 && (!m_Suspended
1511 || !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks));
1512 }
1513
1514 bool
1515 CThreadPool_Impl::x_CanAddImmediateTask(void) const
1516 {
1517 // If pool aborts at some point in waiting it has to stop waiting
1518 // immediately
1519 return !x_IsNewTaskAllowed()
1520 || !m_Suspended && (unsigned int)m_TotalTasks.Get()
1521 < m_Controller->GetMaxThreads();
1522 }
1523
1524 bool
1525 CThreadPool_Impl::x_HasNoThreads(void) const
1526 {
1527 CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull();
1528 return m_IdleThreads.size() + m_WorkingThreads.size() == 0
1529 && (! thread || thread->IsFinished());
1530 }
1531
1532 bool
1533 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate wait_func,
1534 CThreadPool_Guard* pool_guard,
1535 CSemaphore* wait_sema,
1536 const CTimeSpan* timeout,
1537 const CStopWatch* timer)
1538 {
1539 while (!(this->*wait_func)()) {
1540 pool_guard->Release();
1541
1542 if (timeout) {
1543 CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble()
1544 - timer->Elapsed());
1545 if (next_tm.GetSign() == eNegative) {
1546 return false;
1547 }
1548
1549 if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(),
1550 next_tm.GetNanoSecondsAfterSecond()))
1551 {
1552 return false;
1553 }
1554 }
1555 else {
1556 wait_sema->Wait();
1557 }
1558
1559 pool_guard->Guard();
1560 }
1561
1562 return true;
1563 }
1564
1565 /// Throw an exception with standard message when AddTask() is called
1566 /// but ThreadPool is aborted or do not allow new tasks
1567 static inline void
1568 ThrowAddProhibited(void)
1569 {
1570 NCBI_THROW(CThreadPoolException, eProhibited,
1571 "Adding of new tasks is prohibited");
1572 }
1573
1574 inline void
1575 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
1576 {
1577 _ASSERT(task);
1578
1579 // To be sure that if simple new operator was passed as argument the task
1580 // will still be referenced even if some exception happen in this method
1581 CRef<CThreadPool_Task> task_ref(task);
1582
1583 if (!x_IsNewTaskAllowed()) {
1584 ThrowAddProhibited();
1585 }
1586
1587 CThreadPool_Guard guard(this, false);
1588 auto_ptr<CTimeSpan> real_timeout;
1589
1590 if (!m_IsQueueAllowed) {
1591 guard.Guard();
1592
1593 CStopWatch timer(CStopWatch::eStart);
1594 if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask,
1595 &guard, &m_RoomWait, timeout, &timer))
1596 {
1597 NCBI_THROW(CSyncQueueException, eNoRoom,
1598 "Cannot add task - all threads are busy");
1599 }
1600
1601 if (!x_IsNewTaskAllowed()) {
1602 ThrowAddProhibited();
1603 }
1604
1605 if (timeout) {
1606 real_timeout.reset(new CTimeSpan(timeout->GetAsDouble()
1607 - timer.Elapsed()));
1608 }
1609 }
1610
1611 task->x_SetOwner(this);
1612 task->x_SetStatus(CThreadPool_Task::eQueued);
1613 try {
1614 // Pushing to queue must be out of mutex to be able to wait
1615 // for available space.
1616 m_Queue.Push(Ref(task), real_timeout.get());
1617 }
1618 catch (...) {
1619 task->x_SetStatus(CThreadPool_Task::eIdle);
1620 task->x_ResetOwner();
1621 throw;
1622 }
1623
1624 if (m_IsQueueAllowed) {
1625 guard.Guard();
1626 }
1627
1628 // Check if someone aborted the pool or suspended it with cacelation of
1629 // queued tasks after we added this task to the queue but before we were
1630 // able to acquire the mutex
1631 CThreadPool::TExclusiveFlags check_flags
1632 = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks;
1633 if (m_Aborted || m_Suspended
1634 && (m_SuspendFlags & check_flags) == check_flags)
1635 {
1636 if (m_Queue.GetSize() != 0) {
1637 x_CancelQueuedTasks();
1638 }
1639 return;
1640 }
1641
1642 unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1);
1643
1644 if (!m_IsQueueAllowed && cnt_req > GetThreadsCount()) {
1645 LaunchThreads(cnt_req - GetThreadsCount());
1646 }
1647
1648 if (! m_Suspended) {
1649 int count = GetQueuedTasksCount();
1650 ITERATE(TThreadsList, it, m_IdleThreads) {
1651 if (! (*it)->IsFinishing()) {
1652 (*it)->WakeUp();
1653 --count;
1654 if (count == 0)
1655 break;
1656 }
1657 }
1658 }
1659
1660 CallControllerOther();
1661 }
1662
1663 inline void
1664 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task)
1665 {
1666 TQueue::TAccessGuard q_guard(m_Queue);
1667
1668 TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1669 while (it != q_guard.End() && *it != task) {
1670 ++it;
1671 }
1672
1673 if (it != q_guard.End()) {
1674 q_guard.Erase(it);
1675 }
1676 }
1677
1678 void
1679 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task* task,
1680 TExclusiveFlags flags)
1681 {
1682 _ASSERT(task);
1683
1684 // To be sure that if simple new operator was passed as argument the task
1685 // will still be referenced even if some exception happen in this method
1686 CRef<CThreadPool_Task> task_ref(task);
1687
1688 if (m_Aborted) {
1689 NCBI_THROW(CThreadPoolException, eProhibited,
1690 "Cannot add exclusive task when ThreadPool is aborted");
1691 }
1692
1693 task->x_SetOwner(this);
1694 task->x_SetStatus(CThreadPool_Task::eQueued);
1695 m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task)));
1696
1697 CThreadPool_ServiceThread* thread = m_ServiceThread;
1698 if (thread) {
1699 thread->WakeUp();
1700 }
1701 }
1702
1703 void
1704 CThreadPool_Impl::CancelTask(CThreadPool_Task* task)
1705 {
1706 _ASSERT(task);
1707
1708 if (task->IsFinished()) {
1709 return;
1710 }
1711 // Some race can happen here if the task is being queued now
1712 if (task->GetStatus() == CThreadPool_Task::eIdle) {
1713 task->x_RequestToCancel();
1714 return;
1715 }
1716
1717 CThreadPool* task_pool = task->GetPool();
1718 if (task_pool != m_Interface) {
1719 if (!task_pool) {
1720 // Task have just finished - we can do nothing
1721 return;
1722 }
1723
1724 NCBI_THROW(CThreadPoolException, eInvalid,
1725 "Cannot cancel task execution "
1726 "if it is inserted in another ThreadPool");
1727 }
1728
1729 task->x_RequestToCancel();
1730 x_RemoveTaskFromQueue(task);
1731
1732 CallControllerOther();
1733 }
1734
1735 inline void
1736 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group)
1737 {
1738 _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks
1739 + CThreadPool::fCancelQueuedTasks))
1740 == tasks_group
1741 && tasks_group != 0);
1742
1743 if (tasks_group & CThreadPool::fCancelQueuedTasks) {
1744 x_CancelQueuedTasks();
1745 }
1746 if (tasks_group & CThreadPool::fCancelExecutingTasks) {
1747 x_CancelExecutingTasks();
1748 }
1749
1750 CallControllerOther();
1751 }
1752
1753 void
1754 CThreadPool_Impl::x_CancelExecutingTasks(void)
1755 {
1756 CThreadPool_Guard guard(this);
1757
1758 ITERATE(TThreadsList, it, m_WorkingThreads) {
1759 (*it)->CancelCurrentTask();
1760 }
1761
1762 // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be
1763 // sure that it doesn't have already task to execute before it marked
1764 // itself as working
1765 ITERATE(TThreadsList, it, m_IdleThreads) {
1766 (*it)->CancelCurrentTask();
1767 }
1768 }
1769
1770 void
1771 CThreadPool_Impl::x_CancelQueuedTasks(void)
1772 {
1773 TQueue::TAccessGuard q_guard(m_Queue);
1774
1775 for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1776 it != q_guard.End(); ++it)
1777 {
1778 it->GetNCPointer()->x_RequestToCancel();
1779 }
1780
1781 m_Queue.Clear();
1782 }
1783
1784 inline void
1785 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type)
1786 {
1787 CThreadPool_Guard guard(this);
1788
1789 if (m_Aborted) {
1790 NCBI_THROW(CThreadPoolException, eProhibited,
1791 "Cannot flush threads when ThreadPool aborted");
1792 }
1793
1794 if (flush_type == CThreadPool::eStartImmediately
1795 || flush_type == CThreadPool::eWaitToFinish && m_Suspended)
1796 {
1797 FinishThreads(GetThreadsCount());
1798 }
1799 else if (flush_type == CThreadPool::eWaitToFinish) {
1800 bool need_add = true;
1801
1802 {{
1803 // To avoid races with TryGetExclusiveTask() we need to put
1804 // guard here
1805 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
1806
1807 if (m_ExclusiveQueue.GetSize() != 0) {
1808 m_FlushRequested = true;
1809 need_add = false;
1810 }
1811 }}
1812
1813 if (need_add) {
1814 RequestExclusiveExecution(new CThreadPool_EmptyTask(),
1815 CThreadPool::fFlushThreads);
1816 }
1817 }
1818 }
1819
1820 inline void
1821 CThreadPool_Impl::Abort(const CTimeSpan* timeout)
1822 {
1823 CThreadPool_Guard guard(this);
1824
1825 // Method can be called several times in a row and every time we need
1826 // to wait for threads to finish operation
1827 m_Aborted = true;
1828
1829 x_CancelQueuedTasks();
1830 x_CancelExecutingTasks();
1831
1832 {{
1833 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
1834
1835 for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
1836 it != q_guard.End(); ++it)
1837 {
1838 it->second->x_RequestToCancel();
1839 }
1840
1841 m_ExclusiveQueue.Clear();
1842 }}
1843
1844 if (m_ServiceThread.NotNull()) {
1845 m_ServiceThread->RequestToFinish();
1846 }
1847
1848 FinishThreads(GetThreadsCount());
1849
1850 if (m_Controller.NotNull()) {
1851 m_Controller->x_DetachFromPool();
1852 }
1853
1854 CStopWatch timer(CStopWatch::eStart);
1855 x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads,
1856 &guard, &m_AbortWait, timeout, &timer);
1857 m_AbortWait.Post();
1858
1859 // This assigning can destroy the controller. If some threads are not
1860 // finished yet and at this very moment will call controller it can crush.
1861 //m_Controller = NULL;
1862 }
1863
1864
1865
1866 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads,
1867 unsigned int min_threads)
1868 : m_Pool(NULL),
1869 m_MinThreads(min_threads),
1870 m_MaxThreads(max_threads),
1871 m_InHandleEvent(false)
1872 {
1873 }
1874
1875 CThreadPool_Controller::~CThreadPool_Controller(void)
1876 {}
1877
1878 CThreadPool*
1879 CThreadPool_Controller::GetPool(void) const
1880 {
1881 // Avoid changing of pointer during method execution
1882 CThreadPool_Impl* pool = m_Pool;
1883 return pool? pool->GetPoolInterface(): NULL;
1884 }
1885
1886 CMutex&
1887 CThreadPool_Controller::GetMainPoolMutex(void) const
1888 {
1889 CThreadPool_Impl* pool = m_Pool;
1890 if (!pool) {
1891 NCBI_THROW(CThreadPoolException, eInactive,
1892 "Cannot do active work when not attached "
1893 "to some ThreadPool");
1894 }
1895 return pool->GetMainPoolMutex();
1896 }
1897
1898 void
1899 CThreadPool_Controller::EnsureLimits(void)
1900 {
1901 CThreadPool_Impl* pool = m_Pool;
1902
1903 if (! pool)
1904 return;
1905
1906 Uint4 count = pool->GetThreadsCount();
1907 if (count > m_MaxThreads) {
1908 pool->FinishThreads(count - m_MaxThreads);
1909 }
1910 if (count < m_MinThreads) {
1911 pool->LaunchThreads(m_MinThreads - count);
1912 }
1913 }
1914
1915 void
1916 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
1917 {
1918 CThreadPool_Guard guard(m_Pool, false);
1919 if (m_Pool)
1920 guard.Guard();
1921
1922 m_MinThreads = min_threads;
1923
1924 EnsureLimits();
1925 }
1926
1927 void
1928 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
1929 {
1930 CThreadPool_Guard guard(m_Pool, false);
1931 if (m_Pool)
1932 guard.Guard();
1933
1934 m_MaxThreads = max_threads;
1935
1936 EnsureLimits();
1937 }
1938
1939 void
1940 CThreadPool_Controller::SetThreadsCount(unsigned int count)
1941 {
1942 if (count > GetMaxThreads())
1943 count = GetMaxThreads();
1944 if (count < GetMinThreads())
1945 count = GetMinThreads();
1946
1947 CThreadPool_Impl* pool = m_Pool;
1948
1949 unsigned int now_cnt = pool->GetThreadsCount();
1950 if (count > now_cnt) {
1951 pool->LaunchThreads(count - now_cnt);
1952 }
1953 else if (count < now_cnt) {
1954 pool->FinishThreads(now_cnt - count);
1955 }
1956 }
1957
1958 void
1959 CThreadPool_Controller::HandleEvent(EEvent event)
1960 {
1961 CThreadPool_Impl* pool = m_Pool;
1962 if (! pool)
1963 return;
1964
1965 CThreadPool_Guard guard(pool);
1966
1967 if (m_InHandleEvent || pool->IsAborted() || pool->IsSuspended())
1968 return;
1969
1970 m_InHandleEvent = true;
1971
1972 try {
1973 OnEvent(event);
1974 m_InHandleEvent = false;
1975 }
1976 catch (...) {
1977 m_InHandleEvent = false;
1978 throw;
1979 }
1980 }
1981
1982 CTimeSpan
1983 CThreadPool_Controller::GetSafeSleepTime(void) const
1984 {
1985 if (m_Pool) {
1986 return CTimeSpan(1, 0);
1987 }
1988 else {
1989 return CTimeSpan(0, 0);
1990 }
1991 }
1992
1993
1994
1995 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool)
1996 {
1997 _ASSERT(pool);
1998
1999 m_Impl = new CThreadPool_ThreadImpl(this,
2000 CThreadPool_Impl::s_GetImplPointer(pool));
2001 }
2002
2003 CThreadPool_Thread::~CThreadPool_Thread(void)
2004 {
2005 delete m_Impl;
2006 }
2007
2008 void
2009 CThreadPool_Thread::Initialize(void)
2010 {}
2011
2012 void
2013 CThreadPool_Thread::Finalize(void)
2014 {}
2015
2016 CThreadPool*
2017 CThreadPool_Thread::GetPool(void) const
2018 {
2019 return m_Impl->GetPool();
2020 }
2021
2022 CRef<CThreadPool_Task>
2023 CThreadPool_Thread::GetCurrentTask(void) const
2024 {
2025 return m_Impl->GetCurrentTask();
2026 }
2027
2028 void*
2029 CThreadPool_Thread::Main(void)
2030 {
2031 m_Impl->Main();
2032 return NULL;
2033 }
2034
2035 void
2036 CThreadPool_Thread::OnExit(void)
2037 {
2038 m_Impl->OnExit();
2039 }
2040
2041
2042
2043 CThreadPool::CThreadPool(unsigned int queue_size,
2044 unsigned int max_threads,
2045 unsigned int min_threads,
2046 CThread::TRunMode threads_mode)
2047 {
2048 m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
2049 threads_mode);
2050 m_Impl->SetInterfaceStarted();
2051 }
2052
2053 CThreadPool::CThreadPool(unsigned int queue_size,
2054 CThreadPool_Controller* controller,
2055 CThread::TRunMode threads_mode)
2056 {
2057 m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
2058 m_Impl->SetInterfaceStarted();
2059 }
2060
2061 CThreadPool::~CThreadPool(void)
2062 {
2063 m_Impl->DestroyReference();
2064 }
2065
2066 CMutex&
2067 CThreadPool::GetMainPoolMutex(void)
2068 {
2069 return m_Impl->GetMainPoolMutex();
2070 }
2071
2072 CThreadPool_Thread*
2073 CThreadPool::CreateThread(void)
2074 {
2075 return CThreadPool_ThreadImpl::s_CreateThread(this);
2076 }
2077
2078 void
2079 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
2080 {
2081 m_Impl->AddTask(task, timeout);
2082 }
2083
2084 void
2085 CThreadPool::CancelTask(CThreadPool_Task* task)
2086 {
2087 m_Impl->CancelTask(task);
2088 }
2089
2090 void
2091 CThreadPool::Abort(const CTimeSpan* timeout)
2092 {
2093 m_Impl->Abort(timeout);
2094 }
2095
2096 bool
2097 CThreadPool::IsAborted(void) const
2098 {
2099 return m_Impl->IsAborted();
2100 }
2101
2102 void
2103 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout)
2104 {
2105 m_Impl->SetDestroyTimeout(timeout);
2106 }
2107
2108 const CTimeSpan&
2109 CThreadPool::GetDestroyTimeout(void) const
2110 {
2111 return m_Impl->GetDestroyTimeout();
2112 }
2113
2114 void
2115 CThreadPool::RequestExclusiveExecution(CThreadPool_Task* task,
2116 TExclusiveFlags flags)
2117 {
2118 m_Impl->RequestExclusiveExecution(task, flags);
2119 }
2120
2121 void
2122 CThreadPool::CancelTasks(TExclusiveFlags tasks_group)
2123 {
2124 m_Impl->CancelTasks(tasks_group);
2125 }
2126
2127 void
2128 CThreadPool::FlushThreads(EFlushType flush_type)
2129 {
2130 m_Impl->FlushThreads(flush_type);
2131 }
2132
2133 unsigned int
2134 CThreadPool::GetThreadsCount(void) const
2135 {
2136 return m_Impl->GetThreadsCount();
2137 }
2138
2139 unsigned int
2140 CThreadPool::GetQueuedTasksCount(void) const
2141 {
2142 return m_Impl->GetQueuedTasksCount();
2143 }
2144
2145 unsigned int
2146 CThreadPool::GetExecutingTasksCount(void) const
2147 {
2148 return m_Impl->GetExecutingTasksCount();
2149 }
2150
2151
2152
2153 END_NCBI_SCOPE
2154 |
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more information. |