NCBI C++ Toolkit Cross Reference

C++/src/util/thread_pool_ctrl.cpp


  1 /*  $Id: thread_pool_ctrl.cpp 154105 2009-03-06 20:35:18Z ivanovp $
  2 * ===========================================================================
  3 *
  4 *                            PUBLIC DOMAIN NOTICE
  5 *               National Center for Biotechnology Information
  6 *
  7 *  This software/database is a "United States Government Work" under the
  8 *  terms of the United States Copyright Act.  It was written as part of
  9 *  the author's official duties as a United States Government employee and
 10 *  thus cannot be copyrighted.  This software/database is freely available
 11 *  to the public for use. The National Library of Medicine and the U.S.
 12 *  Government have not placed any restriction on its use or reproduction.
 13 *
 14 *  Although all reasonable efforts have been taken to ensure the accuracy
 15 *  and reliability of the software and data, the NLM and the U.S.
 16 *  Government do not and cannot warrant the performance or results that
 17 *  may be obtained by using this software or data. The NLM and the U.S.
 18 *  Government disclaim all warranties, express or implied, including
 19 *  warranties of performance, merchantability or fitness for any particular
 20 *  purpose.
 21 *
 22 *  Please cite the author in any work or product based on this material.
 23 *
 24 * ===========================================================================
 25 *
 26 * Author:  Pavel Ivanov
 27 *
 28 * File Description:
 29 *   Implementations of controllers for ThreadPool.
 30 */
 31 
 32 
 33 #include <ncbi_pch.hpp>
 34 #include <util/thread_pool_ctrl.hpp>
 35 
 36 
 37 BEGIN_NCBI_SCOPE
 38 
 39 
 40 CThreadPool_Controller_PID::CThreadPool_Controller_PID
 41 (
 42     unsigned int max_threads,
 43     unsigned int min_threads
 44 )
 45   : CThreadPool_Controller(max_threads, min_threads),
 46     m_Timer(CStopWatch::eStart),
 47     m_IntegrErr(0),
 48     m_Threshold(3),
 49     m_IntegrCoeff(0.2),
 50     m_DerivCoeff(0.5),
 51     m_DerivTime(0.3)
 52 {
 53     m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(0, 0));
 54 }
 55 
 56 void
 57 CThreadPool_Controller_PID::OnEvent(EEvent event)
 58 {
 59     if (event == eSuspend) {
 60         return;
 61     }
 62 
 63     unsigned int threads_count = GetPool()->GetThreadsCount();
 64     unsigned int queued_tasks  = GetPool()->GetQueuedTasksCount();
 65     unsigned int run_tasks     = GetPool()->GetExecutingTasksCount();
 66 
 67     if (threads_count == 0) {
 68         EnsureLimits();
 69         threads_count = GetMinThreads();
 70 
 71         // Special case when MinThreads == 0
 72         if (threads_count == 0) {
 73             if (queued_tasks == 0) {
 74                 return;
 75             }
 76 
 77             threads_count = 1;
 78             SetThreadsCount(threads_count);
 79         }
 80     }
 81 
 82     double now_err = (double(queued_tasks + run_tasks) - threads_count)
 83                             / threads_count;
 84     double now_time = m_Timer.Elapsed();
 85 
 86     if (event == eResume) {
 87         // When we resuming we need to avoid panic because of big changes in
 88         // error value. So we will assume that current error value was began
 89         // long time ago and didn't change afterwards.
 90         m_ErrHistory.clear();
 91         m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time - m_DerivTime,
 92                                                        now_err));
 93     }
 94 
 95     double period = now_time - m_ErrHistory.back().call_time;
 96 
 97     if (now_err < 0  &&  threads_count == GetMinThreads()
 98         &&  m_IntegrErr <= 0)
 99     {
100         now_err = 0;
101     }
102 
103     double integr_err = m_IntegrErr + (now_err + m_ErrHistory.back().err) / 2
104                                        * period / m_IntegrCoeff;
105 
106     while (m_ErrHistory.size() > 1
107            &&  now_time - m_ErrHistory[1].call_time >= m_DerivTime)
108     {
109         m_ErrHistory.pop_front();
110     }
111     if (now_time - m_ErrHistory.back().call_time >= m_DerivTime / 10) {
112         m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time, now_err));
113         if (threads_count == GetMaxThreads()  &&  integr_err > m_Threshold) {
114             m_IntegrErr = m_Threshold;
115         }
116         else if (threads_count == GetMinThreads()
117                  &&  integr_err < -m_Threshold)
118         {
119             m_IntegrErr = -m_Threshold;
120         }
121         else {
122             m_IntegrErr = integr_err;
123         }
124     }
125 
126     double deriv_err = (now_err - m_ErrHistory[0].err)
127                         / m_DerivTime * m_DerivCoeff;
128 
129     double final_val = (now_err + integr_err + deriv_err) / m_Threshold;
130 /*
131     LOG_POST(CTime(CTime::eCurrent).AsString("M/D/Y h:m:s.l").c_str()
132              << "  count=" << threads_count << ", queued=" << queued_tasks
133              << ", run=" << run_tasks << ", err=" << now_err << ", time=" << now_time
134              << ", intErr=" << m_IntegrErr << ", derivErr=" << deriv_err
135              << ", final=" << final_val << ", hist_size=" << m_ErrHistory.size());
136 */
137     if (final_val >= 1  ||  final_val <= -1) {
138         if (final_val < 0 && -final_val > threads_count)
139             SetThreadsCount(GetMinThreads());
140         else
141             SetThreadsCount(threads_count + int(final_val));
142     }
143     else {
144         EnsureLimits();
145     }
146 }
147 
148 CTimeSpan
149 CThreadPool_Controller_PID::GetSafeSleepTime(void) const
150 {
151     double last_err = 0, last_time = 0, integr_err = 0;
152     {{
153         CMutexGuard guard(GetMainPoolMutex());
154 
155         if (! GetPool()) {
156             return CTimeSpan(0, 0);
157         }
158 
159         if (m_ErrHistory.size() == 0) {
160             return CThreadPool_Controller::GetSafeSleepTime();
161         }
162 
163         last_err = m_ErrHistory.back().err;
164         last_time = m_ErrHistory.back().call_time;
165         integr_err = m_IntegrErr;
166     }}
167 
168     unsigned int threads_cnt = GetPool()->GetThreadsCount();
169     if (last_err == 0
170         ||  last_err > 0  &&  threads_cnt == GetMaxThreads()
171         ||  last_err < 0  &&  threads_cnt == GetMinThreads())
172     {
173         return CThreadPool_Controller::GetSafeSleepTime();
174     }
175 
176     double sleep_time = 0;
177     if (last_err > 0) {
178         sleep_time = (m_Threshold - last_err - integr_err)
179                      * m_IntegrCoeff / last_err;
180     }
181     else {
182         sleep_time = (-m_Threshold - last_err - integr_err)
183                      * m_IntegrCoeff / last_err;
184     }
185     if (sleep_time < 0)
186         sleep_time = 0;
187 
188     return CTimeSpan(sleep_time);
189 }
190 
191 
192 END_NCBI_SCOPE
193 

source navigation ]   [ diff markup ]   [ identifier search ]   [ freetext search ]   [ file search ]  

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.