00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <ncbi_pch.hpp>
00033 #include <util/thread_pool.hpp>
00034 #include <util/thread_pool_ctrl.hpp>
00035 #include <util/sync_queue.hpp>
00036 #include <util/error_codes.hpp>
00037
00038 #define NCBI_USE_ERRCODE_X Util_Thread
00039
00040 BEGIN_NCBI_SCOPE
00041
00042
00043 class CThreadPool_Guard;
00044 class CThreadPool_ServiceThread;
00045
00046
00047
00048 struct SThreadPool_TaskCompare {
00049 bool operator() (const CRef<CThreadPool_Task>& left,
00050 const CRef<CThreadPool_Task>& right)
00051 {
00052 return left->GetPriority() < right->GetPriority();
00053 }
00054 };
00055
00056
00057
00058 class CThreadPool_Impl : public CObject
00059 {
00060 public:
00061 typedef CThreadPool::TExclusiveFlags TExclusiveFlags;
00062
00063
00064
00065
00066 static CThreadPool_Impl* s_GetImplPointer(CThreadPool* pool);
00067
00068
00069
00070
00071
00072
00073 static void sx_SetTaskStatus(CThreadPool_Task* task,
00074 CThreadPool_Task::EStatus status);
00075
00076
00077
00078
00079
00080
00081 static void sx_RequestToCancel(CThreadPool_Task* task);
00082
00083
00084
00085
00086
00087
00088
00089 CThreadPool_Impl(CThreadPool* pool_intf,
00090 unsigned int queue_size,
00091 unsigned int max_threads,
00092 unsigned int min_threads,
00093 CThread::TRunMode threads_mode = CThread::fRunDefault);
00094
00095
00096
00097
00098
00099
00100 CThreadPool_Impl(CThreadPool* pool_intf,
00101 unsigned int queue_size,
00102 CThreadPool_Controller* controller,
00103 CThread::TRunMode threads_mode = CThread::fRunDefault);
00104
00105
00106 CThreadPool* GetPoolInterface(void) const;
00107
00108
00109
00110
00111 void SetDestroyTimeout(const CTimeSpan& timeout);
00112
00113
00114
00115
00116 const CTimeSpan& GetDestroyTimeout(void) const;
00117
00118
00119
00120
00121
00122 void DestroyReference(void);
00123
00124
00125
00126
00127 CMutex& GetMainPoolMutex(void);
00128
00129
00130
00131
00132 void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
00133
00134
00135
00136
00137 void CancelTask(CThreadPool_Task* task);
00138
00139
00140
00141
00142 void CancelTasks(TExclusiveFlags tasks_group);
00143
00144
00145
00146
00147 void RequestExclusiveExecution(CThreadPool_Task* task,
00148 TExclusiveFlags flags);
00149
00150
00151
00152
00153 void LaunchThreads(unsigned int count);
00154
00155
00156
00157
00158
00159
00160 void FinishThreads(unsigned int count);
00161
00162
00163 unsigned int GetThreadsCount(void) const;
00164
00165
00166
00167
00168
00169
00170 void SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
00171
00172
00173 void ThreadStopped(CThreadPool_ThreadImpl* thread);
00174
00175
00176
00177 void ThreadStateChanged(void);
00178
00179
00180
00181 CRef<CThreadPool_Task> TryGetNextTask(void);
00182
00183
00184 void TaskStarting(void);
00185
00186
00187 void TaskFinished(void);
00188
00189
00190 unsigned int GetQueuedTasksCount(void) const;
00191
00192
00193 unsigned int GetExecutingTasksCount(void) const;
00194
00195
00196 typedef pair< TExclusiveFlags,
00197 CRef<CThreadPool_Task> > TExclusiveTaskInfo;
00198
00199
00200 TExclusiveTaskInfo TryGetExclusiveTask(void);
00201
00202
00203
00204
00205 void RequestSuspend(TExclusiveFlags flags);
00206
00207
00208 void ResumeWork(void);
00209
00210
00211 bool IsSuspended(void) const;
00212
00213
00214 bool CanDoExclusiveTask(void) const;
00215
00216
00217
00218
00219 void Abort(const CTimeSpan* timeout);
00220
00221
00222 bool IsAborted(void) const;
00223
00224
00225
00226
00227 void FlushThreads(CThreadPool::EFlushType flush_type);
00228
00229
00230
00231
00232 void CallController(CThreadPool_Controller::EEvent event);
00233
00234
00235
00236 void CallControllerOther(void);
00237
00238
00239
00240
00241 CTimeSpan GetSafeSleepTime(void) const;
00242
00243
00244 void SetInterfaceStarted(void);
00245
00246
00247 private:
00248
00249 typedef CSyncQueue< CRef<CThreadPool_Task>,
00250 CSyncQueue_multiset< CRef<CThreadPool_Task>,
00251 SThreadPool_TaskCompare > >
00252 TQueue;
00253
00254 typedef CSyncQueue<TExclusiveTaskInfo> TExclusiveQueue;
00255
00256 typedef set<CThreadPool_ThreadImpl*> TThreadsList;
00257
00258
00259
00260 CThreadPool_Impl(const CThreadPool_Impl&);
00261 CThreadPool_Impl& operator= (const CThreadPool_Impl&);
00262
00263
00264
00265
00266
00267 unsigned int x_GetQueueSize(unsigned int queue_size);
00268
00269
00270
00271
00272
00273
00274
00275 void x_Init(CThreadPool* pool_intf,
00276 CThreadPool_Controller* controller,
00277 CThread::TRunMode threads_mode);
00278
00279
00280 ~CThreadPool_Impl(void);
00281
00282
00283
00284 void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
00285
00286
00287 void x_CancelQueuedTasks(void);
00288
00289
00290 void x_CancelExecutingTasks(void);
00291
00292
00293
00294
00295 typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
00296
00297
00298 bool x_IsNewTaskAllowed(void) const;
00299
00300
00301 bool x_CanAddImmediateTask(void) const;
00302
00303
00304 bool x_HasNoThreads(void) const;
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319 bool x_WaitForPredicate(TWaitPredicate wait_func,
00320 CThreadPool_Guard* pool_guard,
00321 CSemaphore* wait_sema,
00322 const CTimeSpan* timeout,
00323 const CStopWatch* timer);
00324
00325
00326 private:
00327
00328 CThreadPool* m_Interface;
00329
00330
00331 CRef<CThreadPool_Impl> m_SelfRef;
00332
00333
00334 CTimeSpan m_DestroyTimeout;
00335
00336 TQueue m_Queue;
00337
00338 CMutex m_MainPoolMutex;
00339
00340
00341 CSemaphore m_RoomWait;
00342
00343 CRef<CThreadPool_Controller> m_Controller;
00344
00345 TThreadsList m_IdleThreads;
00346
00347 TThreadsList m_WorkingThreads;
00348
00349 CThread::TRunMode m_ThreadsMode;
00350
00351
00352
00353 CAtomicCounter m_ThreadsCount;
00354
00355
00356
00357 CAtomicCounter m_ExecutingTasks;
00358
00359
00360
00361 CAtomicCounter m_TotalTasks;
00362
00363
00364 bool m_IsQueueAllowed;
00365
00366 volatile bool m_Aborted;
00367
00368
00369
00370 CSemaphore m_AbortWait;
00371
00372 volatile bool m_Suspended;
00373
00374 volatile TExclusiveFlags m_SuspendFlags;
00375
00376
00377 volatile bool m_FlushRequested;
00378
00379
00380 CRef<CThreadPool_ServiceThread> m_ServiceThread;
00381
00382 TExclusiveQueue m_ExclusiveQueue;
00383 };
00384
00385
00386
00387
00388 class CThreadPool_ThreadImpl
00389 {
00390 public:
00391
00392
00393
00394 static CThreadPool_ThreadImpl*
00395 s_GetImplPointer(CThreadPool_Thread* thread);
00396
00397
00398
00399
00400
00401
00402 static CThreadPool_Thread* s_CreateThread(CThreadPool* pool);
00403
00404
00405
00406
00407
00408
00409 CThreadPool_ThreadImpl(CThreadPool_Thread* thread_intf,
00410 CThreadPool_Impl* pool);
00411
00412
00413
00414 ~CThreadPool_ThreadImpl(void);
00415
00416
00417
00418
00419 CThreadPool* GetPool(void) const;
00420
00421
00422
00423
00424
00425 void RequestToFinish(void);
00426
00427
00428 bool IsFinishing(void) const;
00429
00430
00431
00432
00433 void WakeUp(void);
00434
00435
00436
00437
00438
00439
00440 CRef<CThreadPool_Task> GetCurrentTask(void) const;
00441
00442
00443 void CancelCurrentTask(void);
00444
00445
00446
00447
00448 void Main(void);
00449
00450
00451
00452
00453 void OnExit(void);
00454
00455 private:
00456
00457 CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl&);
00458 CThreadPool_ThreadImpl& operator= (const CThreadPool_ThreadImpl&);
00459
00460
00461
00462
00463 void x_Idle(void);
00464
00465
00466 void x_SetIdleState(bool is_idle);
00467
00468
00469
00470
00471 void x_TaskFinished(CThreadPool_Task::EStatus status);
00472
00473
00474
00475 CThreadPool_Thread* m_Interface;
00476
00477 CRef<CThreadPool_Impl> m_Pool;
00478
00479 volatile bool m_Finishing;
00480
00481 volatile bool m_CancelRequested;
00482
00483 bool m_IsIdle;
00484
00485 CRef<CThreadPool_Task> m_CurrentTask;
00486
00487 CSemaphore m_IdleTrigger;
00488 };
00489
00490
00491
00492
00493
00494 class CThreadPool_ServiceThread : public CThread
00495 {
00496 public:
00497
00498
00499
00500 CThreadPool_ServiceThread(CThreadPool_Impl* pool);
00501
00502
00503
00504 void WakeUp(void);
00505
00506
00507 void RequestToFinish(void);
00508
00509
00510 bool IsFinished(void);
00511
00512
00513
00514
00515 void NeedCallController(void);
00516
00517 protected:
00518
00519 virtual ~CThreadPool_ServiceThread(void);
00520
00521 private:
00522
00523 virtual void* Main(void);
00524
00525
00526 void x_Idle(void);
00527
00528
00529 void x_WaitForPoolStop(CThreadPool_Guard* pool_guard);
00530
00531
00532 CRef<CThreadPool_Impl> m_Pool;
00533
00534 CSemaphore m_IdleTrigger;
00535
00536 volatile bool m_Finishing;
00537
00538 volatile bool m_Finished;
00539
00540 CRef<CThreadPool_Task> m_CurrentTask;
00541
00542 CAtomicCounter m_NeedCallController;
00543 };
00544
00545
00546
00547
00548 class CThreadPool_Guard : private CMutexGuard
00549 {
00550 public:
00551
00552
00553
00554
00555
00556 CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
00557
00558
00559 void Guard(void);
00560
00561
00562 void Release(void);
00563
00564 private:
00565
00566 CThreadPool_Impl* m_Pool;
00567 };
00568
00569
00570
00571
00572
00573
00574
00575
00576 class CThreadPool_EmptyTask : public CThreadPool_Task
00577 {
00578 public:
00579
00580 virtual EStatus Execute(void) { return eCompleted; }
00581
00582 #ifdef NCBI_COMPILER_ICC
00583
00584
00585
00586 CThreadPool_EmptyTask(void) {}
00587 #endif
00588 };
00589
00590
00591
00592
00593
00594 static inline CThreadPool_Task::EStatus
00595 s_ConvertTaskResult(CThreadPool_Task::EStatus status)
00596 {
00597 _ASSERT(status == CThreadPool_Task::eCompleted
00598 || status == CThreadPool_Task::eFailed
00599 || status == CThreadPool_Task::eCanceled);
00600
00601 if (status != CThreadPool_Task::eCompleted
00602 && status != CThreadPool_Task::eFailed
00603 && status != CThreadPool_Task::eCanceled)
00604 {
00605 ERR_POST_X(9, Critical
00606 << "Wrong status returned from "
00607 "CThreadPool_Task::Execute(): "
00608 << status);
00609 status = CThreadPool_Task::eCompleted;
00610 }
00611
00612 return status;
00613 }
00614
00615
00616
00617 const CAtomicCounter::TValue kNeedCallController_Shift = 0x0FFFFFFF;
00618
00619
00620 inline void
00621 CThreadPool_ServiceThread::WakeUp(void)
00622 {
00623 m_IdleTrigger.Post();
00624 }
00625
00626 inline void
00627 CThreadPool_ServiceThread::NeedCallController(void)
00628 {
00629 if (m_NeedCallController.Add(1) > kNeedCallController_Shift + 1) {
00630 m_NeedCallController.Add(-1);
00631 }
00632 else {
00633 WakeUp();
00634 }
00635 }
00636
00637
00638
00639 inline void
00640 CThreadPool_ThreadImpl::WakeUp(void)
00641 {
00642 m_IdleTrigger.Post();
00643 }
00644
00645
00646
00647 inline CMutex&
00648 CThreadPool_Impl::GetMainPoolMutex(void)
00649 {
00650 return m_MainPoolMutex;
00651 }
00652
00653
00654
00655 CThreadPool_Guard::CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active)
00656 : CMutexGuard(eEmptyGuard),
00657 m_Pool(pool)
00658 {
00659 _ASSERT(pool);
00660
00661 if (is_active)
00662 Guard();
00663 }
00664
00665 void
00666 CThreadPool_Guard::Guard(void)
00667 {
00668 CMutexGuard::Guard(m_Pool->GetMainPoolMutex());
00669 }
00670
00671 void
00672 CThreadPool_Guard::Release(void)
00673 {
00674 CMutexGuard::Release();
00675 }
00676
00677
00678
00679 inline void
00680 CThreadPool_Impl::sx_SetTaskStatus(CThreadPool_Task* task,
00681 CThreadPool_Task::EStatus status)
00682 {
00683 task->x_SetStatus(status);
00684 }
00685
00686 inline void
00687 CThreadPool_Impl::sx_RequestToCancel(CThreadPool_Task* task)
00688 {
00689 task->x_RequestToCancel();
00690 }
00691
00692 inline CThreadPool*
00693 CThreadPool_Impl::GetPoolInterface(void) const
00694 {
00695 return m_Interface;
00696 }
00697
00698 inline void
00699 CThreadPool_Impl::SetInterfaceStarted(void)
00700 {
00701 m_ServiceThread->Run(CThread::fRunDetached);
00702 }
00703
00704 inline bool
00705 CThreadPool_Impl::IsAborted(void) const
00706 {
00707 return m_Aborted;
00708 }
00709
00710 inline bool
00711 CThreadPool_Impl::IsSuspended(void) const
00712 {
00713 return m_Suspended;
00714 }
00715
00716 inline unsigned int
00717 CThreadPool_Impl::GetThreadsCount(void) const
00718 {
00719 return (unsigned int)m_ThreadsCount.Get();
00720 }
00721
00722 inline unsigned int
00723 CThreadPool_Impl::GetQueuedTasksCount(void) const
00724 {
00725 return (unsigned int)m_Queue.GetSize();
00726 }
00727
00728 inline unsigned int
00729 CThreadPool_Impl::GetExecutingTasksCount(void) const
00730 {
00731 return (unsigned int)m_ExecutingTasks.Get();
00732 }
00733
00734 inline CTimeSpan
00735 CThreadPool_Impl::GetSafeSleepTime(void) const
00736 {
00737
00738
00739 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00740 if (controller && ! m_Aborted) {
00741 return controller->GetSafeSleepTime();
00742 }
00743 else {
00744 return CTimeSpan(0, 0);
00745 }
00746 }
00747
00748 inline void
00749 CThreadPool_Impl::CallController(CThreadPool_Controller::EEvent event)
00750 {
00751 CThreadPool_Controller* controller = m_Controller.GetNCPointerOrNull();
00752 if (controller && ! m_Aborted &&
00753 (! m_Suspended || event == CThreadPool_Controller::eSuspend))
00754 {
00755 controller->HandleEvent(event);
00756 }
00757 }
00758
00759 inline void
00760 CThreadPool_Impl::CallControllerOther(void)
00761 {
00762 CThreadPool_ServiceThread* thread = m_ServiceThread;
00763 if (thread) {
00764 thread->NeedCallController();
00765 }
00766 }
00767
00768 inline void
00769 CThreadPool_Impl::TaskStarting(void)
00770 {
00771 m_ExecutingTasks.Add(1);
00772
00773
00774
00775 }
00776
00777 inline void
00778 CThreadPool_Impl::TaskFinished(void)
00779 {
00780 m_ExecutingTasks.Add(-1);
00781 m_TotalTasks.Add(-1);
00782 m_RoomWait.Post();
00783 CallControllerOther();
00784 }
00785
00786 inline void
00787 CThreadPool_Impl::ThreadStateChanged(void)
00788 {
00789 if (m_Aborted) {
00790 if (x_HasNoThreads()) {
00791 m_AbortWait.Post();
00792 }
00793 }
00794 else if (m_Suspended) {
00795 if ((m_SuspendFlags & CThreadPool::fFlushThreads)
00796 && GetThreadsCount() == 0
00797 || ! (m_SuspendFlags & CThreadPool::fFlushThreads)
00798 && m_WorkingThreads.size() == 0)
00799 {
00800 m_ServiceThread->WakeUp();
00801 }
00802 }
00803 }
00804
00805 inline void
00806 CThreadPool_Impl::ThreadStopped(CThreadPool_ThreadImpl* thread)
00807 {
00808 m_ThreadsCount.Add(-1);
00809
00810 CThreadPool_Guard guard(this);
00811
00812 m_IdleThreads.erase(thread);
00813 m_WorkingThreads.erase(thread);
00814
00815 CallControllerOther();
00816
00817 ThreadStateChanged();
00818 }
00819
00820 inline CRef<CThreadPool_Task>
00821 CThreadPool_Impl::TryGetNextTask(void)
00822 {
00823 if (!m_Suspended || (m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)) {
00824 TQueue::TAccessGuard guard(m_Queue);
00825
00826 if (m_Queue.GetSize() != 0) {
00827 return m_Queue.Pop();
00828 }
00829 }
00830
00831 return CRef<CThreadPool_Task>();
00832 }
00833
00834 inline CThreadPool_Impl::TExclusiveTaskInfo
00835 CThreadPool_Impl::TryGetExclusiveTask(void)
00836 {
00837 TExclusiveQueue::TAccessGuard guard(m_ExclusiveQueue);
00838
00839 if (m_ExclusiveQueue.GetSize() != 0) {
00840 CThreadPool_Impl::TExclusiveTaskInfo info = m_ExclusiveQueue.Pop();
00841 if (m_FlushRequested) {
00842 info.first |= CThreadPool::fFlushThreads;
00843 m_FlushRequested = false;
00844 }
00845 return info;
00846 }
00847
00848 return TExclusiveTaskInfo(0, CRef<CThreadPool_Task>());
00849 }
00850
00851 inline bool
00852 CThreadPool_Impl::CanDoExclusiveTask(void) const
00853 {
00854 if ((m_SuspendFlags & CThreadPool::fExecuteQueuedTasks)
00855 && GetQueuedTasksCount() != 0)
00856 {
00857 return false;
00858 }
00859
00860 if ((m_SuspendFlags & CThreadPool::fFlushThreads)
00861 && GetThreadsCount() != 0)
00862 {
00863 return false;
00864 }
00865
00866 return m_WorkingThreads.size() == 0;
00867 }
00868
00869 inline void
00870 CThreadPool_Impl::RequestSuspend(TExclusiveFlags flags)
00871 {
00872 m_SuspendFlags = flags;
00873 m_Suspended = true;
00874 if (flags & CThreadPool::fCancelQueuedTasks) {
00875 x_CancelQueuedTasks();
00876 }
00877 if (flags & CThreadPool::fCancelExecutingTasks) {
00878 x_CancelExecutingTasks();
00879 }
00880
00881 if (flags & CThreadPool::fFlushThreads) {
00882 FinishThreads(m_IdleThreads.size());
00883 }
00884
00885 CallController(CThreadPool_Controller::eSuspend);
00886 }
00887
00888 inline void
00889 CThreadPool_Impl::ResumeWork(void)
00890 {
00891 m_Suspended = false;
00892
00893 CallController(CThreadPool_Controller::eResume);
00894
00895 ITERATE(TThreadsList, it, m_IdleThreads) {
00896 (*it)->WakeUp();
00897 }
00898 }
00899
00900
00901
00902 inline void
00903 CThreadPool_Controller::x_AttachToPool(CThreadPool_Impl* pool)
00904 {
00905 if (m_Pool != NULL) {
00906 NCBI_THROW(CThreadPoolException, eControllerBusy,
00907 "Cannot attach Controller to several ThreadPools.");
00908 }
00909
00910 m_Pool = pool;
00911 }
00912
00913 inline void
00914 CThreadPool_Controller::x_DetachFromPool(void)
00915 {
00916 m_Pool = NULL;
00917 }
00918
00919
00920
00921 CThreadPool_Task::CThreadPool_Task(unsigned int priority)
00922 {
00923 x_Init(priority);
00924 }
00925
00926 CThreadPool_Task::CThreadPool_Task(const CThreadPool_Task& other)
00927 {
00928 x_Init(other.m_Priority);
00929 }
00930
00931 void
00932 CThreadPool_Task::x_Init(unsigned int priority)
00933 {
00934 m_Pool = NULL;
00935 m_Priority = priority;
00936 m_Status = eIdle;
00937 m_CancelRequested = false;
00938 }
00939
00940 CThreadPool_Task::~CThreadPool_Task(void)
00941 {}
00942
00943 CThreadPool_Task&
00944 CThreadPool_Task::operator= (const CThreadPool_Task& other)
00945 {
00946 if (m_IsBusy.Get() != 0) {
00947 NCBI_THROW(CThreadPoolException, eTaskBusy,
00948 "Cannot change task when it is already added "
00949 "to ThreadPool");
00950 }
00951
00952 CObject::operator= (other);
00953
00954
00955
00956 m_Priority = other.m_Priority;
00957 return *this;
00958 }
00959
00960 void
00961 CThreadPool_Task::OnStatusChange(EStatus )
00962 {}
00963
00964 void
00965 CThreadPool_Task::OnCancelRequested(void)
00966 {}
00967
00968 inline void
00969 CThreadPool_Task::x_SetOwner(CThreadPool_Impl* pool)
00970 {
00971 if (m_IsBusy.Add(1) != 1) {
00972 m_IsBusy.Add(-1);
00973 NCBI_THROW(CThreadPoolException, eTaskBusy,
00974 "Cannot add task in ThreadPool several times");
00975 }
00976
00977 m_Pool = pool;
00978 }
00979
00980 inline void
00981 CThreadPool_Task::x_ResetOwner(void)
00982 {
00983 m_Pool = NULL;
00984 m_IsBusy.Add(-1);
00985 }
00986
00987 void
00988 CThreadPool_Task::x_SetStatus(EStatus new_status)
00989 {
00990 EStatus old_status = m_Status;
00991 if (old_status != new_status && old_status != eCanceled) {
00992 m_Status = new_status;
00993 OnStatusChange(old_status);
00994 }
00995
00996 if (IsFinished()) {
00997 m_Pool = NULL;
00998 }
00999 }
01000
01001 inline void
01002 CThreadPool_Task::x_RequestToCancel(void)
01003 {
01004 m_CancelRequested = true;
01005
01006 OnCancelRequested();
01007
01008 if (GetStatus() <= eQueued) {
01009 x_SetStatus(eCanceled);
01010 }
01011 }
01012
01013 void
01014 CThreadPool_Task::RequestToCancel(void)
01015 {
01016
01017 CThreadPool_Impl* pool = m_Pool;
01018 if (IsFinished()) {
01019 return;
01020 }
01021 else if (!pool) {
01022 x_RequestToCancel();
01023 }
01024 else {
01025 pool->CancelTask(this);
01026 }
01027 }
01028
01029 CThreadPool*
01030 CThreadPool_Task::GetPool(void) const
01031 {
01032
01033 CThreadPool_Impl* pool_impl = m_Pool;
01034 return pool_impl? pool_impl->GetPoolInterface(): NULL;
01035 }
01036
01037
01038
01039 CThreadPool_ServiceThread::CThreadPool_ServiceThread(CThreadPool_Impl* pool)
01040 : m_Pool(pool),
01041 m_IdleTrigger(0, kMax_Int),
01042 m_Finishing(false),
01043 m_Finished(false)
01044 {
01045 _ASSERT(pool);
01046
01047 m_NeedCallController.Set(kNeedCallController_Shift);
01048 }
01049
01050 CThreadPool_ServiceThread::~CThreadPool_ServiceThread(void)
01051 {}
01052
01053 inline bool
01054 CThreadPool_ServiceThread::IsFinished(void)
01055 {
01056 return m_Finished;
01057 }
01058
01059 inline void
01060 CThreadPool_ServiceThread::x_Idle(void)
01061 {
01062 if (m_NeedCallController.Add(-1) < kNeedCallController_Shift) {
01063 m_NeedCallController.Add(1);
01064 }
01065 m_Pool->CallController(CThreadPool_Controller::eOther);
01066
01067 CTimeSpan timeout = m_Pool->GetSafeSleepTime();
01068 m_IdleTrigger.TryWait(timeout.GetCompleteSeconds(),
01069 timeout.GetNanoSecondsAfterSecond());
01070 }
01071
01072 inline void
01073 CThreadPool_ServiceThread::x_WaitForPoolStop(CThreadPool_Guard* pool_guard)
01074 {
01075 while (! m_Pool->IsAborted() && ! m_Pool->CanDoExclusiveTask()) {
01076 pool_guard->Release();
01077 m_IdleTrigger.Wait();
01078 pool_guard->Guard();
01079 }
01080 }
01081
01082 inline void
01083 CThreadPool_ServiceThread::RequestToFinish(void)
01084 {
01085 m_Finishing = true;
01086 WakeUp();
01087
01088 CThreadPool_Task* task = m_CurrentTask;
01089 if (task) {
01090 CThreadPool_Impl::sx_RequestToCancel(task);
01091 }
01092 }
01093
01094 void*
01095 CThreadPool_ServiceThread::Main(void)
01096 {
01097 while (! m_Finishing) {
01098 CThreadPool_Impl::TExclusiveTaskInfo task_info =
01099 m_Pool->TryGetExclusiveTask();
01100 m_CurrentTask = task_info.second;
01101
01102 if (m_CurrentTask.IsNull()) {
01103 x_Idle();
01104 }
01105 else {
01106 CThreadPool_Guard guard(m_Pool);
01107
01108 if (m_Finishing) {
01109 if (! m_CurrentTask->IsCancelRequested()) {
01110 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01111 }
01112 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01113 CThreadPool_Task::eCanceled);
01114 break;
01115 }
01116
01117 m_Pool->RequestSuspend(task_info.first);
01118 x_WaitForPoolStop(&guard);
01119
01120 if (m_Finishing) {
01121 if (!m_CurrentTask->IsCancelRequested()) {
01122 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01123 }
01124 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01125 CThreadPool_Task::eCanceled);
01126 break;
01127 }
01128
01129 guard.Release();
01130
01131 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01132 CThreadPool_Task::eExecuting);
01133 try {
01134 CThreadPool_Task::EStatus status =
01135 s_ConvertTaskResult(m_CurrentTask->Execute());
01136 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01137 }
01138 catch (exception& e) {
01139 ERR_POST_X(11, "Exception from exclusive task in ThreadPool: "
01140 << e.what());
01141 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01142 CThreadPool_Task::eFailed);
01143 }
01144 catch (...) {
01145 ERR_POST_X(12, "Unknown exception from exclusive task "
01146 "in ThreadPool.");
01147 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01148 CThreadPool_Task::eFailed);
01149 }
01150
01151 guard.Guard();
01152 m_Pool->ResumeWork();
01153 }
01154 }
01155
01156 m_Finished = true;
01157 m_Pool->ThreadStateChanged();
01158
01159 return NULL;
01160 }
01161
01162
01163
01164 inline CThreadPool_ThreadImpl*
01165 CThreadPool_ThreadImpl::s_GetImplPointer(CThreadPool_Thread* thread)
01166 {
01167 return thread->m_Impl;
01168 }
01169
01170 inline CThreadPool_Thread*
01171 CThreadPool_ThreadImpl::s_CreateThread(CThreadPool* pool)
01172 {
01173 return new CThreadPool_Thread(pool);
01174 }
01175
01176 inline
01177 CThreadPool_ThreadImpl::CThreadPool_ThreadImpl
01178 (
01179 CThreadPool_Thread* thread_intf,
01180 CThreadPool_Impl* pool
01181 )
01182 : m_Interface(thread_intf),
01183 m_Pool(pool),
01184 m_Finishing(false),
01185 m_CancelRequested(false),
01186 m_IsIdle(true),
01187 m_IdleTrigger(0, kMax_Int)
01188 {}
01189
01190 inline
01191 CThreadPool_ThreadImpl::~CThreadPool_ThreadImpl(void)
01192 {}
01193
01194 inline CThreadPool*
01195 CThreadPool_ThreadImpl::GetPool(void) const
01196 {
01197 return m_Pool->GetPoolInterface();
01198 }
01199
01200 inline bool
01201 CThreadPool_ThreadImpl::IsFinishing(void) const
01202 {
01203 return m_Finishing;
01204 }
01205
01206 inline CRef<CThreadPool_Task>
01207 CThreadPool_ThreadImpl::GetCurrentTask(void) const
01208 {
01209 return m_CurrentTask;
01210 }
01211
01212 inline void
01213 CThreadPool_ThreadImpl::x_SetIdleState(bool is_idle)
01214 {
01215 if (m_IsIdle != is_idle) {
01216 m_IsIdle = is_idle;
01217 m_Pool->SetThreadIdle(this, is_idle);
01218 }
01219 }
01220
01221 inline void
01222 CThreadPool_ThreadImpl::x_TaskFinished(CThreadPool_Task::EStatus status)
01223 {
01224 if (m_CurrentTask->GetStatus() == CThreadPool_Task::eExecuting) {
01225 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask, status);
01226 }
01227
01228 m_CurrentTask.Reset();
01229 m_Pool->TaskFinished();
01230 }
01231
01232 inline void
01233 CThreadPool_ThreadImpl::x_Idle(void)
01234 {
01235 x_SetIdleState(true);
01236
01237 m_IdleTrigger.Wait();
01238 }
01239
01240 inline void
01241 CThreadPool_ThreadImpl::RequestToFinish(void)
01242 {
01243 m_Finishing = true;
01244 WakeUp();
01245 }
01246
01247 inline void
01248 CThreadPool_ThreadImpl::CancelCurrentTask(void)
01249 {
01250
01251 CRef<CThreadPool_Task> task = m_CurrentTask;
01252 if (task.NotNull()) {
01253 CThreadPool_Impl::sx_RequestToCancel(task);
01254 }
01255 else {
01256 m_CancelRequested = true;
01257 }
01258 }
01259
01260 inline void
01261 CThreadPool_ThreadImpl::Main(void)
01262 {
01263 m_Interface->Initialize();
01264
01265 while (!m_Finishing) {
01266 m_CancelRequested = false;
01267 m_CurrentTask = m_Pool->TryGetNextTask();
01268
01269 if (m_CurrentTask.IsNull()) {
01270 x_Idle();
01271 }
01272 else {
01273 if (m_CurrentTask->IsCancelRequested() || m_CancelRequested) {
01274
01275
01276
01277 if (! m_CurrentTask->IsCancelRequested()) {
01278 CThreadPool_Impl::sx_RequestToCancel(m_CurrentTask);
01279 }
01280 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01281 CThreadPool_Task::eCanceled);
01282 m_CurrentTask = NULL;
01283 continue;
01284 }
01285
01286 x_SetIdleState(false);
01287 m_Pool->TaskStarting();
01288
01289 CThreadPool_Impl::sx_SetTaskStatus(m_CurrentTask,
01290 CThreadPool_Task::eExecuting);
01291
01292 try {
01293 CThreadPool_Task::EStatus status =
01294 s_ConvertTaskResult(m_CurrentTask->Execute());
01295 x_TaskFinished(status);
01296 }
01297 catch (exception& e) {
01298 ERR_POST_X(7, "Exception from task in ThreadPool: "
01299 << e.what());
01300 x_TaskFinished(CThreadPool_Task::eFailed);
01301 }
01302 catch (...) {
01303 x_TaskFinished(CThreadPool_Task::eFailed);
01304 throw;
01305 }
01306 }
01307 }
01308 }
01309
01310 inline void
01311 CThreadPool_ThreadImpl::OnExit(void)
01312 {
01313 try {
01314 m_Interface->Finalize();
01315 } STD_CATCH_ALL_X(8, "Finalize")
01316
01317 m_Pool->ThreadStopped(this);
01318 }
01319
01320
01321
01322 inline CThreadPool_Impl*
01323 CThreadPool_Impl::s_GetImplPointer(CThreadPool* pool)
01324 {
01325 return pool->m_Impl;
01326 }
01327
01328 inline unsigned int
01329 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
01330 {
01331 if (queue_size == 0) {
01332
01333
01334 queue_size = 10;
01335 m_IsQueueAllowed = false;
01336 }
01337 else {
01338 m_IsQueueAllowed = true;
01339 }
01340
01341 return queue_size;
01342 }
01343
01344 inline
01345 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf,
01346 unsigned int queue_size,
01347 unsigned int max_threads,
01348 unsigned int min_threads,
01349 CThread::TRunMode threads_mode)
01350 : m_Queue(x_GetQueueSize(queue_size)),
01351 m_RoomWait(0, kMax_Int),
01352 m_AbortWait(0, kMax_Int)
01353 {
01354 x_Init(pool_intf,
01355 new CThreadPool_Controller_PID(max_threads, min_threads),
01356 threads_mode);
01357 }
01358
01359 inline
01360 CThreadPool_Impl::CThreadPool_Impl(CThreadPool* pool_intf,
01361 unsigned int queue_size,
01362 CThreadPool_Controller* controller,
01363 CThread::TRunMode threads_mode)
01364 : m_Queue(x_GetQueueSize(queue_size)),
01365 m_RoomWait(0, kMax_Int),
01366 m_AbortWait(0, kMax_Int)
01367 {
01368 x_Init(pool_intf, controller, threads_mode);
01369 }
01370
01371 void
01372 CThreadPool_Impl::x_Init(CThreadPool* pool_intf,
01373 CThreadPool_Controller* controller,
01374 CThread::TRunMode threads_mode)
01375 {
01376 m_Interface = pool_intf;
01377 m_SelfRef = this;
01378 m_DestroyTimeout = CTimeSpan(10, 0);
01379 m_ThreadsCount.Set(0);
01380 m_ExecutingTasks.Set(0);
01381 m_TotalTasks.Set(0);
01382 m_Aborted = false;
01383 m_Suspended = false;
01384 m_ThreadsMode = (threads_mode | CThread::fRunDetached)
01385 & ~CThread::fRunAllowST;
01386
01387 controller->x_AttachToPool(this);
01388 m_Controller = controller;
01389
01390 m_ServiceThread = new CThreadPool_ServiceThread(this);
01391 }
01392
01393 CThreadPool_Impl::~CThreadPool_Impl(void)
01394 {}
01395
01396 inline void
01397 CThreadPool_Impl::DestroyReference(void)
01398 {
01399
01400
01401 Abort(&m_DestroyTimeout);
01402
01403 m_Interface = NULL;
01404 m_ServiceThread = NULL;
01405 m_SelfRef = NULL;
01406 }
01407
01408 inline void
01409 CThreadPool_Impl::SetDestroyTimeout(const CTimeSpan& timeout)
01410 {
01411 m_DestroyTimeout = timeout;
01412 }
01413
01414 inline const CTimeSpan&
01415 CThreadPool_Impl::GetDestroyTimeout(void) const
01416 {
01417 return m_DestroyTimeout;
01418 }
01419
01420 void
01421 CThreadPool_Impl::LaunchThreads(unsigned int count)
01422 {
01423 if (count == 0)
01424 return;
01425
01426 CThreadPool_Guard guard(this);
01427
01428 for (unsigned int i = 0; i < count; ++i) {
01429 CRef<CThreadPool_Thread> thread(m_Interface->CreateThread());
01430 m_IdleThreads.insert(
01431 CThreadPool_ThreadImpl::s_GetImplPointer(thread));
01432 thread->Run(m_ThreadsMode);
01433 }
01434
01435 m_ThreadsCount.Add(count);
01436 CallControllerOther();
01437 }
01438
01439 void
01440 CThreadPool_Impl::FinishThreads(unsigned int count)
01441 {
01442 if (count == 0)
01443 return;
01444
01445 CThreadPool_Guard guard(this);
01446
01447
01448
01449
01450 REVERSE_ITERATE(TThreadsList, it,
01451 static_cast<const TThreadsList&>(m_IdleThreads))
01452 {
01453
01454
01455
01456
01457
01458 (*it)->RequestToFinish();
01459 --count;
01460 if (count == 0)
01461 break;
01462 }
01463
01464 REVERSE_ITERATE(TThreadsList, it,
01465 static_cast<const TThreadsList&>(m_WorkingThreads))
01466 {
01467 if (count == 0)
01468 break;
01469
01470 (*it)->RequestToFinish();
01471 --count;
01472 }
01473 }
01474
01475 void
01476 CThreadPool_Impl::SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle)
01477 {
01478 CThreadPool_Guard guard(this);
01479
01480 TThreadsList* to_del;
01481 TThreadsList* to_ins;
01482 if (is_idle) {
01483 to_del = &m_WorkingThreads;
01484 to_ins = &m_IdleThreads;
01485 }
01486 else {
01487 to_del = &m_IdleThreads;
01488 to_ins = &m_WorkingThreads;
01489 }
01490
01491 TThreadsList::iterator it = to_del->find(thread);
01492 if (it != to_del->end()) {
01493 to_del->erase(it);
01494 }
01495 to_ins->insert(thread);
01496
01497 if (is_idle && m_Suspended
01498 && (m_SuspendFlags & CThreadPool::fFlushThreads))
01499 {
01500 thread->RequestToFinish();
01501 }
01502
01503 ThreadStateChanged();
01504 }
01505
01506 inline bool
01507 CThreadPool_Impl::x_IsNewTaskAllowed(void) const
01508 {
01509 return !m_Aborted
01510 && (!m_Suspended
01511 || !(m_SuspendFlags & CThreadPool::fDoNotAllowNewTasks));
01512 }
01513
01514 bool
01515 CThreadPool_Impl::x_CanAddImmediateTask(void) const
01516 {
01517
01518
01519 return !x_IsNewTaskAllowed()
01520 || !m_Suspended && (unsigned int)m_TotalTasks.Get()
01521 < m_Controller->GetMaxThreads();
01522 }
01523
01524 bool
01525 CThreadPool_Impl::x_HasNoThreads(void) const
01526 {
01527 CThreadPool_ServiceThread* thread = m_ServiceThread.GetNCPointerOrNull();
01528 return m_IdleThreads.size() + m_WorkingThreads.size() == 0
01529 && (! thread || thread->IsFinished());
01530 }
01531
01532 bool
01533 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate wait_func,
01534 CThreadPool_Guard* pool_guard,
01535 CSemaphore* wait_sema,
01536 const CTimeSpan* timeout,
01537 const CStopWatch* timer)
01538 {
01539 while (!(this->*wait_func)()) {
01540 pool_guard->Release();
01541
01542 if (timeout) {
01543 CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble()
01544 - timer->Elapsed());
01545 if (next_tm.GetSign() == eNegative) {
01546 return false;
01547 }
01548
01549 if (! wait_sema->TryWait(next_tm.GetCompleteSeconds(),
01550 next_tm.GetNanoSecondsAfterSecond()))
01551 {
01552 return false;
01553 }
01554 }
01555 else {
01556 wait_sema->Wait();
01557 }
01558
01559 pool_guard->Guard();
01560 }
01561
01562 return true;
01563 }
01564
01565
01566
01567 static inline void
01568 ThrowAddProhibited(void)
01569 {
01570 NCBI_THROW(CThreadPoolException, eProhibited,
01571 "Adding of new tasks is prohibited");
01572 }
01573
01574 inline void
01575 CThreadPool_Impl::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
01576 {
01577 _ASSERT(task);
01578
01579
01580
01581 CRef<CThreadPool_Task> task_ref(task);
01582
01583 if (!x_IsNewTaskAllowed()) {
01584 ThrowAddProhibited();
01585 }
01586
01587 CThreadPool_Guard guard(this, false);
01588 auto_ptr<CTimeSpan> real_timeout;
01589
01590 if (!m_IsQueueAllowed) {
01591 guard.Guard();
01592
01593 CStopWatch timer(CStopWatch::eStart);
01594 if (! x_WaitForPredicate(&CThreadPool_Impl::x_CanAddImmediateTask,
01595 &guard, &m_RoomWait, timeout, &timer))
01596 {
01597 NCBI_THROW(CSyncQueueException, eNoRoom,
01598 "Cannot add task - all threads are busy");
01599 }
01600
01601 if (!x_IsNewTaskAllowed()) {
01602 ThrowAddProhibited();
01603 }
01604
01605 if (timeout) {
01606 real_timeout.reset(new CTimeSpan(timeout->GetAsDouble()
01607 - timer.Elapsed()));
01608 }
01609 }
01610
01611 task->x_SetOwner(this);
01612 task->x_SetStatus(CThreadPool_Task::eQueued);
01613 try {
01614
01615
01616 m_Queue.Push(Ref(task), real_timeout.get());
01617 }
01618 catch (...) {
01619 task->x_SetStatus(CThreadPool_Task::eIdle);
01620 task->x_ResetOwner();
01621 throw;
01622 }
01623
01624 if (m_IsQueueAllowed) {
01625 guard.Guard();
01626 }
01627
01628
01629
01630
01631 CThreadPool::TExclusiveFlags check_flags
01632 = CThreadPool::fDoNotAllowNewTasks + CThreadPool::fCancelQueuedTasks;
01633 if (m_Aborted || m_Suspended
01634 && (m_SuspendFlags & check_flags) == check_flags)
01635 {
01636 if (m_Queue.GetSize() != 0) {
01637 x_CancelQueuedTasks();
01638 }
01639 return;
01640 }
01641
01642 unsigned int cnt_req = (unsigned int)m_TotalTasks.Add(1);
01643
01644 if (!m_IsQueueAllowed && cnt_req > GetThreadsCount()) {
01645 LaunchThreads(cnt_req - GetThreadsCount());
01646 }
01647
01648 if (! m_Suspended) {
01649 int count = GetQueuedTasksCount();
01650 ITERATE(TThreadsList, it, m_IdleThreads) {
01651 if (! (*it)->IsFinishing()) {
01652 (*it)->WakeUp();
01653 --count;
01654 if (count == 0)
01655 break;
01656 }
01657 }
01658 }
01659
01660 CallControllerOther();
01661 }
01662
01663 inline void
01664 CThreadPool_Impl::x_RemoveTaskFromQueue(const CThreadPool_Task* task)
01665 {
01666 TQueue::TAccessGuard q_guard(m_Queue);
01667
01668 TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01669 while (it != q_guard.End() && *it != task) {
01670 ++it;
01671 }
01672
01673 if (it != q_guard.End()) {
01674 q_guard.Erase(it);
01675 }
01676 }
01677
01678 void
01679 CThreadPool_Impl::RequestExclusiveExecution(CThreadPool_Task* task,
01680 TExclusiveFlags flags)
01681 {
01682 _ASSERT(task);
01683
01684
01685
01686 CRef<CThreadPool_Task> task_ref(task);
01687
01688 if (m_Aborted) {
01689 NCBI_THROW(CThreadPoolException, eProhibited,
01690 "Cannot add exclusive task when ThreadPool is aborted");
01691 }
01692
01693 task->x_SetOwner(this);
01694 task->x_SetStatus(CThreadPool_Task::eQueued);
01695 m_ExclusiveQueue.Push(TExclusiveTaskInfo(flags, Ref(task)));
01696
01697 CThreadPool_ServiceThread* thread = m_ServiceThread;
01698 if (thread) {
01699 thread->WakeUp();
01700 }
01701 }
01702
01703 void
01704 CThreadPool_Impl::CancelTask(CThreadPool_Task* task)
01705 {
01706 _ASSERT(task);
01707
01708 if (task->IsFinished()) {
01709 return;
01710 }
01711
01712 if (task->GetStatus() == CThreadPool_Task::eIdle) {
01713 task->x_RequestToCancel();
01714 return;
01715 }
01716
01717 CThreadPool* task_pool = task->GetPool();
01718 if (task_pool != m_Interface) {
01719 if (!task_pool) {
01720
01721 return;
01722 }
01723
01724 NCBI_THROW(CThreadPoolException, eInvalid,
01725 "Cannot cancel task execution "
01726 "if it is inserted in another ThreadPool");
01727 }
01728
01729 task->x_RequestToCancel();
01730 x_RemoveTaskFromQueue(task);
01731
01732 CallControllerOther();
01733 }
01734
01735 inline void
01736 CThreadPool_Impl::CancelTasks(TExclusiveFlags tasks_group)
01737 {
01738 _ASSERT( (tasks_group & (CThreadPool::fCancelExecutingTasks
01739 + CThreadPool::fCancelQueuedTasks))
01740 == tasks_group
01741 && tasks_group != 0);
01742
01743 if (tasks_group & CThreadPool::fCancelQueuedTasks) {
01744 x_CancelQueuedTasks();
01745 }
01746 if (tasks_group & CThreadPool::fCancelExecutingTasks) {
01747 x_CancelExecutingTasks();
01748 }
01749
01750 CallControllerOther();
01751 }
01752
01753 void
01754 CThreadPool_Impl::x_CancelExecutingTasks(void)
01755 {
01756 CThreadPool_Guard guard(this);
01757
01758 ITERATE(TThreadsList, it, m_WorkingThreads) {
01759 (*it)->CancelCurrentTask();
01760 }
01761
01762
01763
01764
01765 ITERATE(TThreadsList, it, m_IdleThreads) {
01766 (*it)->CancelCurrentTask();
01767 }
01768 }
01769
01770 void
01771 CThreadPool_Impl::x_CancelQueuedTasks(void)
01772 {
01773 TQueue::TAccessGuard q_guard(m_Queue);
01774
01775 for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
01776 it != q_guard.End(); ++it)
01777 {
01778 it->GetNCPointer()->x_RequestToCancel();
01779 }
01780
01781 m_Queue.Clear();
01782 }
01783
01784 inline void
01785 CThreadPool_Impl::FlushThreads(CThreadPool::EFlushType flush_type)
01786 {
01787 CThreadPool_Guard guard(this);
01788
01789 if (m_Aborted) {
01790 NCBI_THROW(CThreadPoolException, eProhibited,
01791 "Cannot flush threads when ThreadPool aborted");
01792 }
01793
01794 if (flush_type == CThreadPool::eStartImmediately
01795 || flush_type == CThreadPool::eWaitToFinish && m_Suspended)
01796 {
01797 FinishThreads(GetThreadsCount());
01798 }
01799 else if (flush_type == CThreadPool::eWaitToFinish) {
01800 bool need_add = true;
01801
01802 {{
01803
01804
01805 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01806
01807 if (m_ExclusiveQueue.GetSize() != 0) {
01808 m_FlushRequested = true;
01809 need_add = false;
01810 }
01811 }}
01812
01813 if (need_add) {
01814 RequestExclusiveExecution(new CThreadPool_EmptyTask(),
01815 CThreadPool::fFlushThreads);
01816 }
01817 }
01818 }
01819
01820 inline void
01821 CThreadPool_Impl::Abort(const CTimeSpan* timeout)
01822 {
01823 CThreadPool_Guard guard(this);
01824
01825
01826
01827 m_Aborted = true;
01828
01829 x_CancelQueuedTasks();
01830 x_CancelExecutingTasks();
01831
01832 {{
01833 TExclusiveQueue::TAccessGuard q_guard(m_ExclusiveQueue);
01834
01835 for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
01836 it != q_guard.End(); ++it)
01837 {
01838 it->second->x_RequestToCancel();
01839 }
01840
01841 m_ExclusiveQueue.Clear();
01842 }}
01843
01844 if (m_ServiceThread.NotNull()) {
01845 m_ServiceThread->RequestToFinish();
01846 }
01847
01848 FinishThreads(GetThreadsCount());
01849
01850 if (m_Controller.NotNull()) {
01851 m_Controller->x_DetachFromPool();
01852 }
01853
01854 CStopWatch timer(CStopWatch::eStart);
01855 x_WaitForPredicate(&CThreadPool_Impl::x_HasNoThreads,
01856 &guard, &m_AbortWait, timeout, &timer);
01857 m_AbortWait.Post();
01858
01859
01860
01861
01862 }
01863
01864
01865
01866 CThreadPool_Controller::CThreadPool_Controller(unsigned int max_threads,
01867 unsigned int min_threads)
01868 : m_Pool(NULL),
01869 m_MinThreads(min_threads),
01870 m_MaxThreads(max_threads),
01871 m_InHandleEvent(false)
01872 {
01873 }
01874
01875 CThreadPool_Controller::~CThreadPool_Controller(void)
01876 {}
01877
01878 CThreadPool*
01879 CThreadPool_Controller::GetPool(void) const
01880 {
01881
01882 CThreadPool_Impl* pool = m_Pool;
01883 return pool? pool->GetPoolInterface(): NULL;
01884 }
01885
01886 CMutex&
01887 CThreadPool_Controller::GetMainPoolMutex(void) const
01888 {
01889 CThreadPool_Impl* pool = m_Pool;
01890 if (!pool) {
01891 NCBI_THROW(CThreadPoolException, eInactive,
01892 "Cannot do active work when not attached "
01893 "to some ThreadPool");
01894 }
01895 return pool->GetMainPoolMutex();
01896 }
01897
01898 void
01899 CThreadPool_Controller::EnsureLimits(void)
01900 {
01901 CThreadPool_Impl* pool = m_Pool;
01902
01903 if (! pool)
01904 return;
01905
01906 Uint4 count = pool->GetThreadsCount();
01907 if (count > m_MaxThreads) {
01908 pool->FinishThreads(count - m_MaxThreads);
01909 }
01910 if (count < m_MinThreads) {
01911 pool->LaunchThreads(m_MinThreads - count);
01912 }
01913 }
01914
01915 void
01916 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
01917 {
01918 CThreadPool_Guard guard(m_Pool, false);
01919 if (m_Pool)
01920 guard.Guard();
01921
01922 m_MinThreads = min_threads;
01923
01924 EnsureLimits();
01925 }
01926
01927 void
01928 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
01929 {
01930 CThreadPool_Guard guard(m_Pool, false);
01931 if (m_Pool)
01932 guard.Guard();
01933
01934 m_MaxThreads = max_threads;
01935
01936 EnsureLimits();
01937 }
01938
01939 void
01940 CThreadPool_Controller::SetThreadsCount(unsigned int count)
01941 {
01942 if (count > GetMaxThreads())
01943 count = GetMaxThreads();
01944 if (count < GetMinThreads())
01945 count = GetMinThreads();
01946
01947 CThreadPool_Impl* pool = m_Pool;
01948
01949 unsigned int now_cnt = pool->GetThreadsCount();
01950 if (count > now_cnt) {
01951 pool->LaunchThreads(count - now_cnt);
01952 }
01953 else if (count < now_cnt) {
01954 pool->FinishThreads(now_cnt - count);
01955 }
01956 }
01957
01958 void
01959 CThreadPool_Controller::HandleEvent(EEvent event)
01960 {
01961 CThreadPool_Impl* pool = m_Pool;
01962 if (! pool)
01963 return;
01964
01965 CThreadPool_Guard guard(pool);
01966
01967 if (m_InHandleEvent || pool->IsAborted() || pool->IsSuspended())
01968 return;
01969
01970 m_InHandleEvent = true;
01971
01972 try {
01973 OnEvent(event);
01974 m_InHandleEvent = false;
01975 }
01976 catch (...) {
01977 m_InHandleEvent = false;
01978 throw;
01979 }
01980 }
01981
01982 CTimeSpan
01983 CThreadPool_Controller::GetSafeSleepTime(void) const
01984 {
01985 if (m_Pool) {
01986 return CTimeSpan(1, 0);
01987 }
01988 else {
01989 return CTimeSpan(0, 0);
01990 }
01991 }
01992
01993
01994
01995 CThreadPool_Thread::CThreadPool_Thread(CThreadPool* pool)
01996 {
01997 _ASSERT(pool);
01998
01999 m_Impl = new CThreadPool_ThreadImpl(this,
02000 CThreadPool_Impl::s_GetImplPointer(pool));
02001 }
02002
02003 CThreadPool_Thread::~CThreadPool_Thread(void)
02004 {
02005 delete m_Impl;
02006 }
02007
02008 void
02009 CThreadPool_Thread::Initialize(void)
02010 {}
02011
02012 void
02013 CThreadPool_Thread::Finalize(void)
02014 {}
02015
02016 CThreadPool*
02017 CThreadPool_Thread::GetPool(void) const
02018 {
02019 return m_Impl->GetPool();
02020 }
02021
02022 CRef<CThreadPool_Task>
02023 CThreadPool_Thread::GetCurrentTask(void) const
02024 {
02025 return m_Impl->GetCurrentTask();
02026 }
02027
02028 void*
02029 CThreadPool_Thread::Main(void)
02030 {
02031 m_Impl->Main();
02032 return NULL;
02033 }
02034
02035 void
02036 CThreadPool_Thread::OnExit(void)
02037 {
02038 m_Impl->OnExit();
02039 }
02040
02041
02042
02043 CThreadPool::CThreadPool(unsigned int queue_size,
02044 unsigned int max_threads,
02045 unsigned int min_threads,
02046 CThread::TRunMode threads_mode)
02047 {
02048 m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
02049 threads_mode);
02050 m_Impl->SetInterfaceStarted();
02051 }
02052
02053 CThreadPool::CThreadPool(unsigned int queue_size,
02054 CThreadPool_Controller* controller,
02055 CThread::TRunMode threads_mode)
02056 {
02057 m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
02058 m_Impl->SetInterfaceStarted();
02059 }
02060
02061 CThreadPool::~CThreadPool(void)
02062 {
02063 m_Impl->DestroyReference();
02064 }
02065
02066 CMutex&
02067 CThreadPool::GetMainPoolMutex(void)
02068 {
02069 return m_Impl->GetMainPoolMutex();
02070 }
02071
02072 CThreadPool_Thread*
02073 CThreadPool::CreateThread(void)
02074 {
02075 return CThreadPool_ThreadImpl::s_CreateThread(this);
02076 }
02077
02078 void
02079 CThreadPool::AddTask(CThreadPool_Task* task, const CTimeSpan* timeout)
02080 {
02081 m_Impl->AddTask(task, timeout);
02082 }
02083
02084 void
02085 CThreadPool::CancelTask(CThreadPool_Task* task)
02086 {
02087 m_Impl->CancelTask(task);
02088 }
02089
02090 void
02091 CThreadPool::Abort(const CTimeSpan* timeout)
02092 {
02093 m_Impl->Abort(timeout);
02094 }
02095
02096 bool
02097 CThreadPool::IsAborted(void) const
02098 {
02099 return m_Impl->IsAborted();
02100 }
02101
02102 void
02103 CThreadPool::SetDestroyTimeout(const CTimeSpan& timeout)
02104 {
02105 m_Impl->SetDestroyTimeout(timeout);
02106 }
02107
02108 const CTimeSpan&
02109 CThreadPool::GetDestroyTimeout(void) const
02110 {
02111 return m_Impl->GetDestroyTimeout();
02112 }
02113
02114 void
02115 CThreadPool::RequestExclusiveExecution(CThreadPool_Task* task,
02116 TExclusiveFlags flags)
02117 {
02118 m_Impl->RequestExclusiveExecution(task, flags);
02119 }
02120
02121 void
02122 CThreadPool::CancelTasks(TExclusiveFlags tasks_group)
02123 {
02124 m_Impl->CancelTasks(tasks_group);
02125 }
02126
02127 void
02128 CThreadPool::FlushThreads(EFlushType flush_type)
02129 {
02130 m_Impl->FlushThreads(flush_type);
02131 }
02132
02133 unsigned int
02134 CThreadPool::GetThreadsCount(void) const
02135 {
02136 return m_Impl->GetThreadsCount();
02137 }
02138
02139 unsigned int
02140 CThreadPool::GetQueuedTasksCount(void) const
02141 {
02142 return m_Impl->GetQueuedTasksCount();
02143 }
02144
02145 unsigned int
02146 CThreadPool::GetExecutingTasksCount(void) const
02147 {
02148 return m_Impl->GetExecutingTasksCount();
02149 }
02150
02151
02152
02153 END_NCBI_SCOPE
02154
02155