|
NCBI Home IEB Home C++ Toolkit docs C Toolkit source browser C Toolkit source browser (2) |
NCBI C++ Toolkit Cross Reference: C++/src/util/thread_pool_ctrl.cpp |
source navigation diff markup identifier search freetext search file search |
1 /* $Id: thread_pool_ctrl.cpp 154105 2009-03-06 20:35:18Z ivanovp $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov
27 *
28 * File Description:
29 * Implementations of controllers for ThreadPool.
30 */
31
32
33 #include <ncbi_pch.hpp>
34 #include <util/thread_pool_ctrl.hpp>
35
36
37 BEGIN_NCBI_SCOPE
38
39
40 CThreadPool_Controller_PID::CThreadPool_Controller_PID
41 (
42 unsigned int max_threads,
43 unsigned int min_threads
44 )
45 : CThreadPool_Controller(max_threads, min_threads),
46 m_Timer(CStopWatch::eStart),
47 m_IntegrErr(0),
48 m_Threshold(3),
49 m_IntegrCoeff(0.2),
50 m_DerivCoeff(0.5),
51 m_DerivTime(0.3)
52 {
53 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(0, 0));
54 }
55
56 void
57 CThreadPool_Controller_PID::OnEvent(EEvent event)
58 {
59 if (event == eSuspend) {
60 return;
61 }
62
63 unsigned int threads_count = GetPool()->GetThreadsCount();
64 unsigned int queued_tasks = GetPool()->GetQueuedTasksCount();
65 unsigned int run_tasks = GetPool()->GetExecutingTasksCount();
66
67 if (threads_count == 0) {
68 EnsureLimits();
69 threads_count = GetMinThreads();
70
71 // Special case when MinThreads == 0
72 if (threads_count == 0) {
73 if (queued_tasks == 0) {
74 return;
75 }
76
77 threads_count = 1;
78 SetThreadsCount(threads_count);
79 }
80 }
81
82 double now_err = (double(queued_tasks + run_tasks) - threads_count)
83 / threads_count;
84 double now_time = m_Timer.Elapsed();
85
86 if (event == eResume) {
87 // When we resuming we need to avoid panic because of big changes in
88 // error value. So we will assume that current error value was began
89 // long time ago and didn't change afterwards.
90 m_ErrHistory.clear();
91 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time - m_DerivTime,
92 now_err));
93 }
94
95 double period = now_time - m_ErrHistory.back().call_time;
96
97 if (now_err < 0 && threads_count == GetMinThreads()
98 && m_IntegrErr <= 0)
99 {
100 now_err = 0;
101 }
102
103 double integr_err = m_IntegrErr + (now_err + m_ErrHistory.back().err) / 2
104 * period / m_IntegrCoeff;
105
106 while (m_ErrHistory.size() > 1
107 && now_time - m_ErrHistory[1].call_time >= m_DerivTime)
108 {
109 m_ErrHistory.pop_front();
110 }
111 if (now_time - m_ErrHistory.back().call_time >= m_DerivTime / 10) {
112 m_ErrHistory.push_back(SThreadPool_PID_ErrInfo(now_time, now_err));
113 if (threads_count == GetMaxThreads() && integr_err > m_Threshold) {
114 m_IntegrErr = m_Threshold;
115 }
116 else if (threads_count == GetMinThreads()
117 && integr_err < -m_Threshold)
118 {
119 m_IntegrErr = -m_Threshold;
120 }
121 else {
122 m_IntegrErr = integr_err;
123 }
124 }
125
126 double deriv_err = (now_err - m_ErrHistory[0].err)
127 / m_DerivTime * m_DerivCoeff;
128
129 double final_val = (now_err + integr_err + deriv_err) / m_Threshold;
130 /*
131 LOG_POST(CTime(CTime::eCurrent).AsString("M/D/Y h:m:s.l").c_str()
132 << " count=" << threads_count << ", queued=" << queued_tasks
133 << ", run=" << run_tasks << ", err=" << now_err << ", time=" << now_time
134 << ", intErr=" << m_IntegrErr << ", derivErr=" << deriv_err
135 << ", final=" << final_val << ", hist_size=" << m_ErrHistory.size());
136 */
137 if (final_val >= 1 || final_val <= -1) {
138 if (final_val < 0 && -final_val > threads_count)
139 SetThreadsCount(GetMinThreads());
140 else
141 SetThreadsCount(threads_count + int(final_val));
142 }
143 else {
144 EnsureLimits();
145 }
146 }
147
148 CTimeSpan
149 CThreadPool_Controller_PID::GetSafeSleepTime(void) const
150 {
151 double last_err = 0, last_time = 0, integr_err = 0;
152 {{
153 CMutexGuard guard(GetMainPoolMutex());
154
155 if (! GetPool()) {
156 return CTimeSpan(0, 0);
157 }
158
159 if (m_ErrHistory.size() == 0) {
160 return CThreadPool_Controller::GetSafeSleepTime();
161 }
162
163 last_err = m_ErrHistory.back().err;
164 last_time = m_ErrHistory.back().call_time;
165 integr_err = m_IntegrErr;
166 }}
167
168 unsigned int threads_cnt = GetPool()->GetThreadsCount();
169 if (last_err == 0
170 || last_err > 0 && threads_cnt == GetMaxThreads()
171 || last_err < 0 && threads_cnt == GetMinThreads())
172 {
173 return CThreadPool_Controller::GetSafeSleepTime();
174 }
175
176 double sleep_time = 0;
177 if (last_err > 0) {
178 sleep_time = (m_Threshold - last_err - integr_err)
179 * m_IntegrCoeff / last_err;
180 }
181 else {
182 sleep_time = (-m_Threshold - last_err - integr_err)
183 * m_IntegrCoeff / last_err;
184 }
185 if (sleep_time < 0)
186 sleep_time = 0;
187
188 return CTimeSpan(sleep_time);
189 }
190
191
192 END_NCBI_SCOPE
193 |
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more information. |