|
NCBI C++ ToolKit
|
00001 /* $Id: thread_pool_ctrl.cpp 52442 2011-12-27 16:21:37Z ivanovp $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Author: Pavel Ivanov 00027 * 00028 * File Description: 00029 * Implementations of controllers for ThreadPool. 00030 */ 00031 00032 00033 #include <ncbi_pch.hpp> 00034 #include <util/thread_pool_ctrl.hpp> 00035 00036 00037 BEGIN_NCBI_SCOPE 00038 00039 00040 CThreadPool_Controller_PID::CThreadPool_Controller_PID 00041 ( 00042 unsigned int max_threads, 00043 unsigned int min_threads 00044 ) 00045 : CThreadPool_Controller(max_threads, min_threads), 00046 m_Timer(CStopWatch::eStart), 00047 m_IntegrErr(0), 00048 m_Threshold(3), 00049 m_IntegrCoeff(0.2), 00050 m_DerivCoeff(0.5), 00051 m_DerivTime(0.3) 00052 { 00053 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(0, 0)); 00054 } 00055 00056 void 00057 CThreadPool_Controller_PID::OnEvent(EEvent event) 00058 { 00059 if (event == eSuspend) { 00060 return; 00061 } 00062 00063 // All reads below are atomic reads of one variable, thus they cannot 00064 // return bad results. They can be a little bit inconsistent with each 00065 // other because of races with other threads but that's okay for the 00066 // purposes of this controller. 00067 unsigned int threads_count = GetPool()->GetThreadsCount(); 00068 unsigned int queued_tasks = GetPool()->GetQueuedTasksCount(); 00069 unsigned int run_tasks = GetPool()->GetExecutingTasksCount(); 00070 00071 if (threads_count == 0) { 00072 EnsureLimits(); 00073 threads_count = GetMinThreads(); 00074 00075 // Special case when MinThreads == 0 00076 if (threads_count == 0) { 00077 if (queued_tasks == 0) { 00078 return; 00079 } 00080 00081 threads_count = 1; 00082 SetThreadsCount(threads_count); 00083 } 00084 } 00085 00086 double now_err = (double(queued_tasks + run_tasks) - threads_count) 00087 / threads_count; 00088 double now_time = m_Timer.Elapsed(); 00089 00090 if (event == eResume) { 00091 // When we resuming we need to avoid panic because of big changes in 00092 // error value. So we will assume that current error value was began 00093 // long time ago and didn't change afterwards. 00094 m_ErrHistory.clear(); 00095 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time - m_DerivTime, 00096 now_err)); 00097 } 00098 00099 double period = now_time - m_ErrHistory.back().call_time; 00100 00101 if (now_err < 0 && threads_count == GetMinThreads() 00102 && m_IntegrErr <= 0) 00103 { 00104 now_err = 0; 00105 } 00106 00107 double integr_err = m_IntegrErr + (now_err + m_ErrHistory.back().err) / 2 00108 * period / m_IntegrCoeff; 00109 00110 while (m_ErrHistory.size() > 1 00111 && now_time - m_ErrHistory[1].call_time >= m_DerivTime) 00112 { 00113 m_ErrHistory.pop_front(); 00114 } 00115 if (now_time - m_ErrHistory.back().call_time >= m_DerivTime / 10) { 00116 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time, now_err)); 00117 if (threads_count == GetMaxThreads() && integr_err > m_Threshold) { 00118 m_IntegrErr = m_Threshold; 00119 } 00120 else if (threads_count == GetMinThreads() 00121 && integr_err < -m_Threshold) 00122 { 00123 m_IntegrErr = -m_Threshold; 00124 } 00125 else { 00126 m_IntegrErr = integr_err; 00127 } 00128 } 00129 00130 double deriv_err = (now_err - m_ErrHistory[0].err) 00131 / m_DerivTime * m_DerivCoeff; 00132 00133 double final_val = (now_err + integr_err + deriv_err) / m_Threshold; 00134 /* 00135 LOG_POST(CTime(CTime::eCurrent).AsString("M/D/Y h:m:s.l").c_str() 00136 << " count=" << threads_count << ", queued=" << queued_tasks 00137 << ", run=" << run_tasks << ", err=" << now_err << ", time=" << now_time 00138 << ", intErr=" << m_IntegrErr << ", derivErr=" << deriv_err 00139 << ", final=" << final_val << ", hist_size=" << m_ErrHistory.size()); 00140 */ 00141 if (final_val >= 1 || final_val <= -1) { 00142 if (final_val < 0 && -final_val > threads_count) 00143 SetThreadsCount(GetMinThreads()); 00144 else 00145 SetThreadsCount(threads_count + int(final_val)); 00146 } 00147 else { 00148 EnsureLimits(); 00149 } 00150 } 00151 00152 CTimeSpan 00153 CThreadPool_Controller_PID::GetSafeSleepTime(void) const 00154 { 00155 double last_err = 0, integr_err = 0; 00156 CThreadPool* pool = GetPool(); 00157 if (!pool) { 00158 return CTimeSpan(0, 0); 00159 } 00160 {{ 00161 CMutexGuard guard(GetMainPoolMutex(pool)); 00162 00163 if (m_ErrHistory.size() == 0) { 00164 return CThreadPool_Controller::GetSafeSleepTime(); 00165 } 00166 00167 last_err = m_ErrHistory.back().err; 00168 integr_err = m_IntegrErr; 00169 }} 00170 00171 unsigned int threads_cnt = pool->GetThreadsCount(); 00172 if (last_err == 0 00173 || (last_err > 0 && threads_cnt == GetMaxThreads()) 00174 || (last_err < 0 && threads_cnt == GetMinThreads())) 00175 { 00176 return CThreadPool_Controller::GetSafeSleepTime(); 00177 } 00178 00179 double sleep_time = 0; 00180 if (last_err > 0) { 00181 sleep_time = (m_Threshold - last_err - integr_err) 00182 * m_IntegrCoeff / last_err; 00183 } 00184 else { 00185 sleep_time = (-m_Threshold - last_err - integr_err) 00186 * m_IntegrCoeff / last_err; 00187 } 00188 if (sleep_time < 0) 00189 sleep_time = 0; 00190 00191 return CTimeSpan(sleep_time); 00192 } 00193 00194 00195 END_NCBI_SCOPE
1.7.5.1
Modified on Wed May 23 13:32:03 2012 by modify_doxy.py rev. 337098