NCBI C++ ToolKit
thread_pool_ctrl.cpp
Go to the documentation of this file.
00001 /*  $Id: thread_pool_ctrl.cpp 52442 2011-12-27 16:21:37Z ivanovp $
00002 * ===========================================================================
00003 *
00004 *                            PUBLIC DOMAIN NOTICE
00005 *               National Center for Biotechnology Information
00006 *
00007 *  This software/database is a "United States Government Work" under the
00008 *  terms of the United States Copyright Act.  It was written as part of
00009 *  the author's official duties as a United States Government employee and
00010 *  thus cannot be copyrighted.  This software/database is freely available
00011 *  to the public for use. The National Library of Medicine and the U.S.
00012 *  Government have not placed any restriction on its use or reproduction.
00013 *
00014 *  Although all reasonable efforts have been taken to ensure the accuracy
00015 *  and reliability of the software and data, the NLM and the U.S.
00016 *  Government do not and cannot warrant the performance or results that
00017 *  may be obtained by using this software or data. The NLM and the U.S.
00018 *  Government disclaim all warranties, express or implied, including
00019 *  warranties of performance, merchantability or fitness for any particular
00020 *  purpose.
00021 *
00022 *  Please cite the author in any work or product based on this material.
00023 *
00024 * ===========================================================================
00025 *
00026 * Author:  Pavel Ivanov
00027 *
00028 * File Description:
00029 *   Implementations of controllers for ThreadPool.
00030 */
00031 
00032 
00033 #include <ncbi_pch.hpp>
00034 #include <util/thread_pool_ctrl.hpp>
00035 
00036 
00037 BEGIN_NCBI_SCOPE
00038 
00039 
00040 CThreadPool_Controller_PID::CThreadPool_Controller_PID
00041 (
00042     unsigned int max_threads,
00043     unsigned int min_threads
00044 )
00045   : CThreadPool_Controller(max_threads, min_threads),
00046     m_Timer(CStopWatch::eStart),
00047     m_IntegrErr(0),
00048     m_Threshold(3),
00049     m_IntegrCoeff(0.2),
00050     m_DerivCoeff(0.5),
00051     m_DerivTime(0.3)
00052 {
00053     m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(0, 0));
00054 }
00055 
00056 void
00057 CThreadPool_Controller_PID::OnEvent(EEvent event)
00058 {
00059     if (event == eSuspend) {
00060         return;
00061     }
00062 
00063     // All reads below are atomic reads of one variable, thus they cannot
00064     // return bad results. They can be a little bit inconsistent with each
00065     // other because of races with other threads but that's okay for the
00066     // purposes of this controller.
00067     unsigned int threads_count = GetPool()->GetThreadsCount();
00068     unsigned int queued_tasks  = GetPool()->GetQueuedTasksCount();
00069     unsigned int run_tasks     = GetPool()->GetExecutingTasksCount();
00070 
00071     if (threads_count == 0) {
00072         EnsureLimits();
00073         threads_count = GetMinThreads();
00074 
00075         // Special case when MinThreads == 0
00076         if (threads_count == 0) {
00077             if (queued_tasks == 0) {
00078                 return;
00079             }
00080 
00081             threads_count = 1;
00082             SetThreadsCount(threads_count);
00083         }
00084     }
00085 
00086     double now_err = (double(queued_tasks + run_tasks) - threads_count)
00087                             / threads_count;
00088     double now_time = m_Timer.Elapsed();
00089 
00090     if (event == eResume) {
00091         // When we resuming we need to avoid panic because of big changes in
00092         // error value. So we will assume that current error value was began
00093         // long time ago and didn't change afterwards.
00094         m_ErrHistory.clear();
00095         m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time - m_DerivTime,
00096                                                        now_err));
00097     }
00098 
00099     double period = now_time - m_ErrHistory.back().call_time;
00100 
00101     if (now_err < 0  &&  threads_count == GetMinThreads()
00102         &&  m_IntegrErr <= 0)
00103     {
00104         now_err = 0;
00105     }
00106 
00107     double integr_err = m_IntegrErr + (now_err + m_ErrHistory.back().err) / 2
00108                                        * period / m_IntegrCoeff;
00109 
00110     while (m_ErrHistory.size() > 1
00111            &&  now_time - m_ErrHistory[1].call_time >= m_DerivTime)
00112     {
00113         m_ErrHistory.pop_front();
00114     }
00115     if (now_time - m_ErrHistory.back().call_time >= m_DerivTime / 10) {
00116         m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time, now_err));
00117         if (threads_count == GetMaxThreads()  &&  integr_err > m_Threshold) {
00118             m_IntegrErr = m_Threshold;
00119         }
00120         else if (threads_count == GetMinThreads()
00121                  &&  integr_err < -m_Threshold)
00122         {
00123             m_IntegrErr = -m_Threshold;
00124         }
00125         else {
00126             m_IntegrErr = integr_err;
00127         }
00128     }
00129 
00130     double deriv_err = (now_err - m_ErrHistory[0].err)
00131                         / m_DerivTime * m_DerivCoeff;
00132 
00133     double final_val = (now_err + integr_err + deriv_err) / m_Threshold;
00134 /*
00135     LOG_POST(CTime(CTime::eCurrent).AsString("M/D/Y h:m:s.l").c_str()
00136              << "  count=" << threads_count << ", queued=" << queued_tasks
00137              << ", run=" << run_tasks << ", err=" << now_err << ", time=" << now_time
00138              << ", intErr=" << m_IntegrErr << ", derivErr=" << deriv_err
00139              << ", final=" << final_val << ", hist_size=" << m_ErrHistory.size());
00140 */
00141     if (final_val >= 1  ||  final_val <= -1) {
00142         if (final_val < 0 && -final_val > threads_count)
00143             SetThreadsCount(GetMinThreads());
00144         else
00145             SetThreadsCount(threads_count + int(final_val));
00146     }
00147     else {
00148         EnsureLimits();
00149     }
00150 }
00151 
00152 CTimeSpan
00153 CThreadPool_Controller_PID::GetSafeSleepTime(void) const
00154 {
00155     double last_err = 0, integr_err = 0;
00156     CThreadPool* pool = GetPool();
00157     if (!pool) {
00158         return CTimeSpan(0, 0);
00159     }
00160     {{
00161         CMutexGuard guard(GetMainPoolMutex(pool));
00162 
00163         if (m_ErrHistory.size() == 0) {
00164             return CThreadPool_Controller::GetSafeSleepTime();
00165         }
00166 
00167         last_err = m_ErrHistory.back().err;
00168         integr_err = m_IntegrErr;
00169     }}
00170 
00171     unsigned int threads_cnt = pool->GetThreadsCount();
00172     if (last_err == 0
00173         ||  (last_err > 0  &&  threads_cnt == GetMaxThreads())
00174         ||  (last_err < 0  &&  threads_cnt == GetMinThreads()))
00175     {
00176         return CThreadPool_Controller::GetSafeSleepTime();
00177     }
00178 
00179     double sleep_time = 0;
00180     if (last_err > 0) {
00181         sleep_time = (m_Threshold - last_err - integr_err)
00182                      * m_IntegrCoeff / last_err;
00183     }
00184     else {
00185         sleep_time = (-m_Threshold - last_err - integr_err)
00186                      * m_IntegrCoeff / last_err;
00187     }
00188     if (sleep_time < 0)
00189         sleep_time = 0;
00190 
00191     return CTimeSpan(sleep_time);
00192 }
00193 
00194 
00195 END_NCBI_SCOPE
Modified on Wed May 23 13:32:03 2012 by modify_doxy.py rev. 337098