NCBI C++ ToolKit
threads_man.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: threads_man.cpp 76997 2017-03-17 13:19:30Z gouriano $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "task_server_pch.hpp"
31 
32 #include <corelib/ncbireg.hpp>
33 
34 #include "threads_man.hpp"
35 #include "timers.hpp"
36 #include "sockets_man.hpp"
37 #include "memory_man.hpp"
38 #include "logging.hpp"
39 #include "time_man.hpp"
40 #include "server_core.hpp"
41 #include "rcu.hpp"
42 #include "scheduler.hpp"
43 #include "srv_stat.hpp"
44 
45 #ifdef NCBI_OS_LINUX
46 # include <sys/prctl.h>
47 #endif
48 
49 
51 
52 
53 // We should reserve space for main thread and service thread, so that total
54 // amount doesn't overflow.
57 
58 
// Table of all thread descriptors. InitThreadsMan() allocates it with
// s_MaxRunningThreads + 2 slots: index 0 is the main thread, indexes
// 1..s_MaxRunningThreads are workers, index s_MaxRunningThreads + 1 is the
// service thread.
59 SSrvThread** s_Threads = NULL;
// Thread that the currently pending thread-manager request refers to;
// reset to NULL whenever s_ThreadMgrState goes back to eThrMgrIdle.
62 static SSrvThread* s_CurMgrThread = NULL;
// Shortcut for s_Threads[0] (assigned in InitThreadsMan()).
63 static SSrvThread* s_MainThr = NULL;
// Shortcut for s_Threads[s_MaxRunningThreads + 1] (assigned in InitThreadsMan()).
64 static SSrvThread* s_SvcThr = NULL;
65 
66 #ifdef NCBI_OS_LINUX
67 static pthread_key_t s_CurThreadKey;
68 #endif
70 
72 
73 
74 extern Uint8 s_CurJiffies;
75 extern CSrvTime s_JiffyTime;
76 extern string s_AppBaseName;
77 
78 
79 
80 
// Body of GetCurThread() (the listing's symbol index places its definition at
// threads_man.cpp:82; the signature line itself is absent from this extract).
// Returns the SSrvThread* that s_SetCurThread() stored under s_CurThreadKey
// for the calling thread.
83 {
84  SSrvThread* thr = NULL;
85 #ifdef NCBI_OS_LINUX
86  thr = (SSrvThread*)pthread_getspecific(s_CurThreadKey);
87 #endif
// Without NCBI_OS_LINUX nothing is looked up, so the result stays NULL.
88  return thr;
89 }
90 
// Body of GetCurThreadNum() (symbol index: "Returns number of current worker
// thread. Number is 0-based."; the signature line is absent from this extract).
93 {
94  SSrvThread* thr = GetCurThread();
// thread_num 0 and s_MaxRunningThreads + 1 are the slots InitThreadsMan()
// assigns to the main and service threads, so only worker threads may ask
// for their number.
// NOTE(review): thr is dereferenced without a NULL check, while GetCurThread()
// can return NULL (e.g. when NCBI_OS_LINUX is not defined) -- confirm callers.
95  if (thr->thread_num == 0 || thr->thread_num == s_MaxRunningThreads + 1) {
96  SRV_FATAL("Unexpected ThreadNum: " << thr->thread_num);
97  }
// Worker slots start at 1; shift to a 0-based number.
98  return thr->thread_num - 1;
99 }
100 
// Body of what is presumably GetCntRunningThreads() (it is listed in the
// symbol index as returning TSrvThreadNum; the signature line is absent here).
// Counts worker slots, starting at index 1, for which IsThreadRunning() holds,
// and stops at the first slot that is not running -- so the result is the
// length of the leading run of running workers, not a total over all slots.
103 {
104  TSrvThreadNum result = 0;
105  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads; ++i, ++result) {
106  if (!IsThreadRunning(s_Threads[i]))
107  break;
108  }
109  return result;
110 }
111 
// s_SetCurThread(SSrvThread* thr) per the symbol index; the name line (listing
// line 113) is absent from this extract.
// Binds thr to the calling OS thread and gives that thread a readable name.
// Does nothing when NCBI_OS_LINUX is not defined.
112 static void
114 {
115 #ifdef NCBI_OS_LINUX
// Make thr retrievable later through GetCurThread().
// NOTE(review): the return value of pthread_setspecific() is not checked.
116  pthread_setspecific(s_CurThreadKey, thr);
117  char buf[20];
// Naming scheme:
//   non-dormant thread        -> "<app>_<thread_num>"
//   dormant thread number 0   -> "<app>"      (InitThreadsMan() marks the main thread dormant)
//   any other dormant thread  -> "<app>_S"    (InitThreadsMan() marks the service thread dormant)
// snprintf() truncates to the 20-byte buffer.
118  if (thr->thread_state != eThreadDormant)
119  snprintf(buf, 20, "%s_%d", s_AppBaseName.c_str(), thr->thread_num);
120  else if (thr->thread_num == 0)
121  snprintf(buf, 20, "%s", s_AppBaseName.c_str());
122  else
123  snprintf(buf, 20, "%s_S", s_AppBaseName.c_str());
// NOTE(review): per prctl(2) the kernel keeps at most 16 bytes of the name
// (including the terminator), so long application names may lose the suffix
// -- confirm this is acceptable.
124  prctl(PR_SET_NAME, (unsigned long)buf, 0, 0, 0);
125 #endif
126 }
127 
// s_RegisterNewThread(SSrvThread* thr) per the symbol index; listing lines 129
// (name) and 131 are absent from this extract.
// Called first thing by a freshly started worker (see s_WorkerThreadMain).
128 static void
130 {
132  s_SetCurThread(thr);
// eThrMgrStarting is set by s_StartCurMgrThread() before it launches the
// thread; in that case this thread completes the manager request.
// NOTE(review): s_ThreadMgrState is read here without holding s_ThrMgrLock.
133  if (s_ThreadMgrState == eThrMgrStarting) {
134  thr->stat->ThreadStarted();
135  s_ThrMgrLock.Lock();
// The thread being started must be the one the manager request is about.
136  if (s_CurMgrThread != thr) {
137  SRV_FATAL("Invalid SrvThread state");
138  }
// Request fulfilled: return the manager to idle and drop the reference.
139  s_ThreadMgrState = eThrMgrIdle;
140  s_CurMgrThread = NULL;
141  s_ThrMgrLock.Unlock();
142  }
143 }
144 
// Presumably s_PerJiffyTasks_Main(SSrvThread* thr) (first of the three
// s_PerJiffyTasks_* functions in the symbol index); listing lines 146 (name)
// and 148 are absent from this extract.
145 static void
147 {
// The missing line 148 apparently opens a conditional block that is closed
// at line 151: inside it the thread only passes an RCU quiescent state and
// returns. TODO confirm the condition against the real source.
149  RCUPassQS(thr->rcu);
150  return;
151  }
152 
// Once-per-jiffy work: skip if this thread already handled the current jiffy.
153  if (thr->seen_jiffy == s_CurJiffies)
154  return;
155  thr->seen_jiffy = s_CurJiffies;
156  RCUPassQS(thr->rcu);
157 
// Once-per-second work: skip if this second has already been handled.
158  if (thr->seen_secs == CSrvTime::CurSecs())
159  return;
160  thr->seen_secs = CSrvTime::CurSecs();
161  CheckLoggingFlush(thr);
162 }
163 
// Presumably s_PerJiffyTasks_Service(SSrvThread* thr) (it is the variant that
// calls TimerTick(), and s_ServiceThreadMain invokes a function of that name);
// listing lines 165 (name), 167 and 176 are absent from this extract.
164 static void
166 {
// The missing line 167 apparently opens a conditional block closed at line
// 170: inside it the thread only passes an RCU quiescent state and returns.
// TODO confirm the condition against the real source.
168  RCUPassQS(thr->rcu);
169  return;
170  }
171 
// Once-per-jiffy work: skip if this thread already handled the current jiffy.
172  if (thr->seen_jiffy == s_CurJiffies)
173  return;
174  thr->seen_jiffy = s_CurJiffies;
175  RCUPassQS(thr->rcu);
// (listing line 176 is missing here)
177 
// Once-per-second work: flush logs if needed and advance the timers.
178  if (thr->seen_secs == CSrvTime::CurSecs())
179  return;
180  thr->seen_secs = CSrvTime::CurSecs();
181  CheckLoggingFlush(thr);
182  TimerTick();
183 }
184 
// Presumably s_PerJiffyTasks_Worker(SSrvThread* thr) (the variant that touches
// the thread's scheduler and socket data); listing lines 186 (name), 188, 191,
// 195, 197 and 207 are absent from this extract, so the control flow of the
// first part cannot be fully reconstructed from here.
185 static void
187 {
// Track changes of the global server state per thread; the action taken on a
// change (line 191) is missing from this extract.
189  if (thr->seen_srv_state != s_SrvState) {
190  thr->seen_srv_state = s_SrvState;
192  }
193  RCUPassQS(thr->rcu);
194  PromoteSockAmount(thr->socks);
// When the server is stopping and this thread has no pending RCU calls the
// function ends early; the statement preceding the return (line 197) and the
// opening of the block closed at line 199 are missing. TODO confirm.
196  if (IsServerStopping() && !RCUHasCalls(thr->rcu))
198  return;
199  }
200 
// Once-per-jiffy work: skip if this thread already handled the current jiffy.
201  if (thr->seen_jiffy == s_CurJiffies)
202  return;
203  thr->seen_jiffy = s_CurJiffies;
204  RCUPassQS(thr->rcu);
205  SchedStartJiffy(thr);
206  PromoteSockAmount(thr->socks);
208 
// Once-per-second work.
209  if (thr->seen_secs == CSrvTime::CurSecs())
210  return;
211  thr->seen_secs = CSrvTime::CurSecs();
212  CheckLoggingFlush(thr);
213  if (thr->thread_state != eThreadLockedForStop) {
214  CleanSocketList(thr->socks);
// Worker number 1 reacts to an exited thread (state set at the end of
// s_WorkerThreadMain) by calling MoveAllSockets() with its own socket data
// and that of the exited thread, then reports eThrMgrSocksMoved so that the
// service thread can finish stopping it.
// NOTE(review): s_ThreadMgrState is tested before s_ThrMgrLock is taken.
215  if (thr->thread_num == 1 && s_ThreadMgrState == eThrMgrThreadExited) {
216  MoveAllSockets(thr->socks, s_CurMgrThread->socks);
217  s_ThrMgrLock.Lock();
218  s_ThreadMgrState = eThrMgrSocksMoved;
219  s_ThrMgrLock.Unlock();
220  }
221  }
222 }
223 
// s_AllocThread(TSrvThreadNum thread_num) per the symbol index; the name line
// (listing line 225) is absent from this extract.
// Creates the descriptor for slot thread_num, attaches its statistics object
// and per-thread subsystems, and registers it in s_Threads.
// Returns the new descriptor. Nothing in this file frees it.
224 static SSrvThread*
226 {
227  SSrvThread* new_thr = new SSrvThread();
228  new_thr->thread_num = thread_num;
229  new_thr->stat = new CSrvStat();
// Let each subsystem attach its per-thread data to the descriptor.
230  AssignThreadMemMgr(new_thr);
231  AssignThreadLogging(new_thr);
232  AssignThreadSched(new_thr);
233  AssignThreadSocks(new_thr);
// Caller (InitThreadsMan) guarantees s_Threads is large enough for thread_num.
234  s_Threads[thread_num] = new_thr;
235  return new_thr;
236 }
237 
// s_WorkerThreadMain(void* data) per the symbol index; listing lines 239
// (name) and 247 are absent from this extract.
// pthread entry point of a worker thread; data is the thread's SSrvThread*.
// Always returns NULL.
238 static void*
240 {
241  SSrvThread* thr = (SSrvThread*)data;
242  s_RegisterNewThread(thr);
243  RCUInitNewThread(thr);
244 
// Main loop: run scheduled tasks until the thread is marked stopped.
// The second statement of the loop body (line 247) is missing from this
// extract -- presumably the per-jiffy housekeeping call; TODO confirm.
245  while (thr->thread_state != eThreadStopped) {
246  SchedExecuteTask(thr);
248  }
249 
250  RCUFinalizeThread(thr);
251 
// A worker that exits while the server keeps running must be the thread the
// manager is currently stopping; announce the exit so that the sockets can be
// moved (see the eThrMgrThreadExited check in the worker per-jiffy code).
252  if (!IsServerStopping()) {
253  s_ThrMgrLock.Lock();
254  if(s_CurMgrThread != thr || s_ThreadMgrState != eThrMgrPreparesToStop) {
255  SRV_FATAL("Invalid WorkerThread state");
256  }
257  s_ThreadMgrState = eThrMgrThreadExited;
258  s_ThrMgrLock.Unlock();
259  }
260  return NULL;
261 }
262 
263 static bool
264 s_StartThread(SSrvThread* thr, void* (*thr_func)(void*))
265 {
266 #ifdef NCBI_OS_LINUX
267  int res = pthread_create(&thr->thread_handle, NULL, thr_func, (void*)thr);
268  if (res != 0) {
269  SRV_LOG(Critical, "Unable to create new thread, result=" << res);
270  return false;
271  }
272 #endif
273  return true;
274 }
275 
// s_StopCurMgrThread(void) per the symbol index; the name line (listing line
// 277) is absent from this extract.
// Finishes stopping the thread referenced by s_CurMgrThread: waits for it to
// terminate, releases its per-thread resources and returns the manager to idle.
276 static void
278 {
279 #ifdef NCBI_OS_LINUX
// A join failure is only logged; cleanup below proceeds regardless.
280  int res = pthread_join(s_CurMgrThread->thread_handle, NULL);
281  if (res != 0) {
282  SRV_LOG(Critical, "Cannot join the thread, res=" << res);
283  }
284 #endif
285 
286  ReleaseThreadMemMgr(s_CurMgrThread);
287  ReleaseThreadSched(s_CurMgrThread);
288  ReleaseThreadSocks(s_CurMgrThread);
289  StopThreadLogging(s_CurMgrThread);
290  s_CurMgrThread->stat->ThreadStopped();
291 
// Mark the slot as released (eligible for a later start request) and clear
// the pending manager request.
292  s_ThrMgrLock.Lock();
293  s_ThreadMgrState = eThrMgrIdle;
294  s_CurMgrThread->thread_state = eThreadReleased;
295  s_CurMgrThread = NULL;
296  s_ThrMgrLock.Unlock();
297 }
298 
// s_StartCurMgrThread(void) per the symbol index; the name line (listing line
// 300) is absent from this extract.
// Launches the worker referenced by s_CurMgrThread.
299 static void
301 {
// Switch to eThrMgrStarting first: the new thread checks for this state in
// s_RegisterNewThread() and completes the request itself.
302  s_ThrMgrLock.Lock();
303  s_ThreadMgrState = eThrMgrStarting;
304  s_ThrMgrLock.Unlock();
305  StartThreadLogging(s_CurMgrThread);
// On failure roll back: the slot becomes released again and the manager idle.
// NOTE(review): thread logging started above is not stopped on this path --
// confirm whether StopThreadLogging() is needed here.
306  if (!s_StartThread(s_CurMgrThread, &s_WorkerThreadMain)) {
307  s_ThrMgrLock.Lock();
308  s_ThreadMgrState = eThrMgrIdle;
309  s_CurMgrThread->thread_state = eThreadReleased;
310  s_CurMgrThread = NULL;
311  s_ThrMgrLock.Unlock();
312  }
313 }
314 
// s_ServiceThreadMain(void*) per the symbol index; listing lines 316 (name),
// 327, 329, 331 and 332 are absent from this extract.
// pthread entry point of the service thread: advances the jiffy counter at
// s_JiffyTime intervals and serves thread-manager requests. Always returns NULL.
315 static void*
317 {
318  s_SetCurThread(s_SvcThr);
319  RCUInitNewThread(s_SvcThr);
320 
// Time at which the next jiffy is due.
321  CSrvTime next_jfy_time = CSrvTime::Current();
322  next_jfy_time += s_JiffyTime;
// Keep running after a stop request until this thread's RCU calls are drained.
323  while (!IsServerStopping() || RCUHasCalls(s_SvcThr->rcu)) {
324  s_PerJiffyTasks_Service(s_SvcThr);
325 
// The statements executed by the two branches below (lines 327 and 329) are
// missing from this extract -- presumably s_StartCurMgrThread() and
// s_StopCurMgrThread() respectively; TODO confirm. Lines 331-332 are missing
// as well.
326  if (s_ThreadMgrState == eThrMgrNeedNewThread)
328  else if (s_ThreadMgrState == eThrMgrSocksMoved)
330 
333 
// Sleep for the remainder of the current jiffy; the wait on s_SvcSignal is
// bounded by wait_time.
334  CSrvTime cur_time = CSrvTime::Current();
335  if (next_jfy_time > cur_time) {
336  CSrvTime wait_time = next_jfy_time;
337  wait_time -= cur_time;
338  s_SvcSignal.WaitValueChange(0, wait_time);
339  }
340  IncCurJiffies();
341 
// Schedule the next jiffy relative to s_LastJiffyTime (defined in
// time_man.cpp; presumably updated by IncCurJiffies()) rather than to the
// previous target.
342  next_jfy_time = s_LastJiffyTime;
343  next_jfy_time += s_JiffyTime;
344  }
345 
346  RCUFinalizeThread(s_SvcThr);
347  return NULL;
348 }
349 
// Presumably RequestThreadStart(SSrvThread* thr) (it files an
// eThrMgrNeedNewThread request); listing lines 351 (name) and 360 are absent
// from this extract.
// Asks the thread manager to start the released thread thr. The request is
// silently dropped if the manager is busy, the server is shutting down, or thr
// is not in eThreadReleased state.
350 void
352 {
// Cheap unlocked pre-check; the state is re-validated under the lock below.
353  if (s_ThreadMgrState != eThrMgrIdle || CTaskServer::IsInShutdown())
354  return;
355 
356  s_ThrMgrLock.Lock();
357  if (s_ThreadMgrState == eThrMgrIdle && thr->thread_state == eThreadReleased) {
358  s_ThreadMgrState = eThrMgrNeedNewThread;
359  s_CurMgrThread = thr;
// (listing line 360 is missing here)
361  }
362  s_ThrMgrLock.Unlock();
363 }
364 
// Presumably RequestThreadStop(SSrvThread* thr) (it files an
// eThrMgrPreparesToStop request); listing lines 366 (name) and 375 are absent
// from this extract.
// Asks the thread manager to stop the running thread thr. The request is
// silently dropped if the manager is busy, the server is shutting down, or thr
// is not in eThreadRunning state.
365 void
367 {
// Cheap unlocked pre-check; the state is re-validated under the lock below.
368  if (s_ThreadMgrState != eThrMgrIdle || CTaskServer::IsInShutdown())
369  return;
370 
371  s_ThrMgrLock.Lock();
372  if (s_ThreadMgrState == eThrMgrIdle && thr->thread_state == eThreadRunning) {
373  s_ThreadMgrState = eThrMgrPreparesToStop;
374  s_CurMgrThread = thr;
// (listing line 375 is missing here)
// Schedule thr's ExecuteRCU() to run at an appropriate time (see CallRCU in
// the symbol index).
376  thr->CallRCU();
377  }
378  s_ThrMgrLock.Unlock();
379 }
380 
// Presumably RequestThreadRevive(SSrvThread* thr) (it cancels a pending stop);
// listing lines 382 (name) and 389 are absent from this extract.
// Valid only while thr is locked for stop and the manager is preparing to stop
// a thread; any other combination is treated as a fatal error.
381 void
383 {
384  s_ThrMgrLock.Lock();
385  if (thr->thread_state != eThreadLockedForStop ||
386  s_ThreadMgrState != eThrMgrPreparesToStop) {
387  SRV_FATAL("Invalid thread state");
388  }
// (listing line 389 is missing here -- presumably it changes thr's state;
// TODO confirm)
// Drop the pending stop request.
390  s_ThreadMgrState = eThrMgrIdle;
391  s_CurMgrThread = NULL;
392  s_ThrMgrLock.Unlock();
393 }
394 
// s_StartAllThreads(void) per the symbol index; listing lines 396 (name) and
// 398 are absent from this extract.
// Starts the service thread and then the worker threads 1..s_MaxRunningThreads.
// Returns false only when the service thread cannot be started.
395 static bool
397 {
// (listing line 398 is missing here)
399 
400  if (!s_StartThread(s_SvcThr, &s_ServiceThreadMain))
401  return false;
402  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads; ++i) {
// If a worker fails to start, give up on it and on all higher-numbered
// workers: mark their slots released and stop trying. This is not reported
// as a failure -- the function still returns true.
403  if (!s_StartThread(s_Threads[i], &s_WorkerThreadMain)) {
404  for (TSrvThreadNum j = i; j <= s_MaxRunningThreads; ++j) {
405  s_Threads[j]->thread_state = eThreadReleased;
406  }
407  break;
408  }
409  }
410  return true;
411 }
412 
// s_JoinAllThreads(void) per the symbol index; listing lines 414 (name) and
// 423 are absent from this extract.
// Finalizes RCU for the main thread, then waits for the worker and service
// threads to terminate.
413 static void
415 {
416  RCUFinalizeThread(s_MainThr);
417 
418 #ifdef NCBI_OS_LINUX
// Slots 1..s_MaxRunningThreads are workers, slot s_MaxRunningThreads + 1 is
// the service thread; slot 0 (the main thread, i.e. the caller) is skipped.
419  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads + 1; ++i) {
420  SSrvThread* thr = s_Threads[i];
// Only threads whose state precedes eThreadReleased are joined.
// NOTE(review): the return value of pthread_join() is ignored here.
421  if (thr->thread_state < eThreadReleased) {
422  pthread_join(thr->thread_handle, NULL);
// (listing line 423 is missing here)
424  }
425  }
426  ReleaseThreadLogging(s_SvcThr);
427 #endif
428 }
429 
// Presumably InitCurThreadStorage(void) (listed in the symbol index; the name
// line, listing line 431, is absent from this extract).
// Creates the thread-specific-data key used by s_SetCurThread() and
// GetCurThread(). Does nothing when NCBI_OS_LINUX is not defined.
430 void
432 {
433 #ifdef NCBI_OS_LINUX
// No destructor is registered for the key (second argument is NULL).
434  int res = pthread_key_create(&s_CurThreadKey, NULL);
435  if (res) {
// The error is written to stdout as well as reported through SRV_FATAL.
436  printf("terminating after pthread_key_create returned error %d\n", res);
437  SRV_FATAL("pthread_key_create returned error: " << res);
438  }
439 #endif
440 }
441 
// ConfigureThreads(const CNcbiRegistry* reg, CTempString section) per the
// symbol index; the signature line (listing line 443) is absent from this
// extract.
// Reads "max_threads" from the given registry section (default 20) into
// s_MaxRunningThreads, capped at kMaxNumberOfThreads.
442 void
444 {
// NOTE(review): GetInt() returns int but the value is narrowed to
// TSrvThreadNum (Uint2 per the symbol index) before the cap is applied, so a
// configured value above 65535 wraps around instead of being capped, a
// negative value wraps to a large number and ends up as kMaxNumberOfThreads,
// and 0 is accepted as is. Consider validating in int before narrowing.
445  s_MaxRunningThreads = TSrvThreadNum(reg->GetInt(section, "max_threads", 20));
446  if (s_MaxRunningThreads > kMaxNumberOfThreads)
447  s_MaxRunningThreads = kMaxNumberOfThreads;
448 }
449 
450 bool ReConfig_Threads(const CTempString&, const CNcbiRegistry&, string&)
451 {
452  return true;
453 }
454 
// Body of WriteSetup_Threads(CSrvSocketTask& task) per the symbol index; the
// signature line (listing line 455) is absent from this extract.
// Appends the current thread limit to the text written into task. The emitted
// fragment is:  ,<newline>"max_threads": <s_MaxRunningThreads>
456 {
// "is" closes the quoted key and adds the separator; "eol" ends the previous
// entry and opens the quote of the next key.
457  string is("\": "), eol(",\n\"");
458  task.WriteText(eol).WriteText("max_threads").WriteText(is ).WriteNumber( s_MaxRunningThreads);
459 }
460 
// Presumably InitThreadsMan(void) (listed in the symbol index as returning
// bool; the name line, listing line 462, is absent from this extract).
// Allocates the descriptor table and every descriptor, then registers the
// calling thread as the main thread. Always returns true.
461 bool
463 {
// Two extra slots beyond the workers: [0] for the main thread and
// [s_MaxRunningThreads + 1] for the service thread.
// NOTE(review): the calloc() result is not checked before s_AllocThread()
// writes into the table.
464  s_Threads = (SSrvThread**)calloc(s_MaxRunningThreads + 2, sizeof(s_Threads[0]));
465  for (TSrvThreadNum i = 0; i <= s_MaxRunningThreads + 1; ++i) {
466  s_AllocThread(i);
467  }
468 
469  s_MainThr = s_Threads[0];
470  s_SvcThr = s_Threads[s_MaxRunningThreads + 1];
471 
// Main and service threads are marked dormant; s_SetCurThread() relies on
// this state when choosing the thread name.
472  s_MainThr->thread_state = eThreadDormant;
473  s_SvcThr->thread_state = eThreadDormant;
474  s_SetCurThread(s_MainThr);
475  RCUInitNewThread(s_MainThr);
476 
477  return true;
478 }
479 
// Presumably RunMainThread(void) (listed in the symbol index; listing lines
// 481 (name), 486 and 493 are absent from this extract).
// Starts all threads and then runs the main thread's loop until the server
// begins stopping.
480 void
482 {
483  if (!s_StartAllThreads())
484  return;
// The statement executed when the first worker is not running (line 486) is
// missing from this extract; TODO confirm against the real source.
485  if (!IsThreadRunning(s_Threads[1]))
487 
// Main loop: per-jiffy housekeeping followed by a socket wait.
488  while (!IsServerStopping()) {
489  s_PerJiffyTasks_Main(s_MainThr);
490  DoSocketWait();
491  }
492 
// (listing line 493 is missing here -- presumably the call that joins the
// other threads; TODO confirm)
494 }
495 
// Presumably FinalizeThreadsMan(void) (listed in the symbol index; the name
// line, listing line 497, is absent from this extract).
// Intentionally empty: nothing allocated by InitThreadsMan() is released here.
496 void
498 {}
499 
500 
501 
// Member-initializer list and body of what is presumably the
// SSrvThread::SSrvThread(void) constructor (listed in the symbol index; the
// signature line, listing line 502, is absent from this extract).
// A new descriptor starts in eThreadStarting state with eSrvRunning as the
// last seen server state, zeroed jiffy/second markers and no per-thread
// subsystem data attached. thread_num and stat are not initialized here;
// s_AllocThread() assigns them right after construction.
503  : seen_jiffy(0),
504  seen_secs(0),
505  thread_state(eThreadStarting),
506  seen_srv_state(eSrvRunning),
507  cur_task(NULL),
508  mm_pool(NULL),
509  sched(NULL),
510  log_data(NULL),
511  rcu(NULL),
512  socks(NULL)
513 {}
514 
// Empty body of what is presumably the SSrvThread destructor (listed as
// "virtual ~SSrvThread(void)" in the symbol index; the signature line is
// absent from this extract). It releases nothing itself.
516 {}
517 
// Presumably SSrvThread::ExecuteRCU(void) -- per the symbol index, the RCU job
// scheduled earlier by CallRCU(). Listing lines 519 (name), 522, 523 and 526
// are absent from this extract, so the first case label and the statements of
// both handled cases are not visible here.
518 void
520 {
521  switch (thread_state) {
// (first case label and its action, lines 522-523, are missing)
524  break;
525  case eThreadRevived:
// (action for a revived thread, line 526, is missing)
527  break;
// Any other state at this point is a programming error.
528  default:
529  SRV_FATAL("Unexpected thread state: " << thread_state);
530  }
531 }
532 
void ReleaseThreadMemMgr(SSrvThread *thr)
SSrvThread * GetCurThread(void)
Definition: threads_man.cpp:82
void AssignThreadSocks(SSrvThread *thr)
#define SRV_FATAL(msg)
Definition: srv_diag.hpp:173
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
void CallRCU(void)
Method to be called to schedule call of ExecuteRCU() at appropriate time.
Definition: rcu.cpp:194
void Lock(void)
Lock the mutex.
Definition: srv_sync.cpp:108
char * buf
void CheckLoggingFlush(SSrvThread *thr)
Definition: logging.cpp:619
SRCUInfo * rcu
Definition: threads_man.hpp:91
void ConfigureThreads(const CNcbiRegistry *reg, CTempString section)
void CleanSocketList(SSocketsData *socks)
unsigned NCBI_INT8_TYPE Uint8
Unsigned 8 byte sized integer.
Definition: ncbitype.h:146
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:62
static void s_PerJiffyTasks_Worker(SSrvThread *thr)
virtual void ExecuteRCU(void)
Method implementing RCU job that was scheduled earlier by CallRCU().
TSrvThreadNum s_MaxRunningThreads
Definition: threads_man.cpp:71
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
void IncCurJiffies(void)
Definition: time_man.cpp:110
void RCUFinalizeThread(SSrvThread *thr)
Definition: rcu.cpp:160
void TimerTick(void)
Definition: timers.cpp:177
void WriteSetup_Threads(CSrvSocketTask &task)
static EThreadMgrState s_ThreadMgrState
Definition: threads_man.cpp:61
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
Definition: task_server.hpp:42
void RequestThreadStop(SSrvThread *thr)
void AssignThreadMemMgr(SSrvThread *thr)
CSrvTime s_LastJiffyTime
Definition: time_man.cpp:52
void ReleaseThreadSocks(SSrvThread *thr)
void MoveAllSockets(SSocketsData *dst_socks, SSocketsData *src_socks)
void TrackShuttingDown(void)
void CheckConnectsTimeout(SSocketsData *socks)
#define NULL
Definition: ncbistd.hpp:225
static const unsigned char res[3][32]
Definition: ccm.c:389
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
EThreadMgrState
static void s_PerJiffyTasks_Main(SSrvThread *thr)
static bool s_StartAllThreads(void)
EWaitResult WaitValueChange(int old_value)
Wait for futex's value to change (with and without timeout).
Definition: srv_sync.cpp:44
bool IsThreadRunning(SSrvThread *thr)
static void s_StopCurMgrThread(void)
static void s_RegisterNewThread(SSrvThread *thr)
string s_AppBaseName
Definition: server_core.cpp:77
int i
void SchedExecuteTask(SSrvThread *thr)
Definition: scheduler.cpp:520
static CMiniMutex s_ThrMgrLock
Definition: threads_man.cpp:60
void ThreadStarted(void)
Definition: srv_stat.cpp:195
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:101
static TSrvThreadNum GetCurThreadNum(void)
Returns number of current worker thread. Number is 0-based.
Definition: threads_man.cpp:92
static SSrvThread * s_AllocThread(TSrvThreadNum thread_num)
SSrvThread(void)
void StopThreadLogging(SSrvThread *thr)
Definition: logging.cpp:641
static void s_SetCurThread(SSrvThread *thr)
TSrvThreadNum thread_num
Definition: threads_man.hpp:79
void ThreadStopped(void)
Definition: srv_stat.cpp:201
EServerState s_SrvState
Definition: server_core.cpp:68
void SchedStartJiffy(SSrvThread *thr)
Definition: scheduler.cpp:388
Wrapper around Linux's futex.
Definition: srv_sync.hpp:140
void StartThreadLogging(SSrvThread *thr)
Definition: logging.cpp:635
void FinalizeThreadsMan(void)
static const TSrvThreadNum kMaxNumberOfThreads
Definition: threads_man.cpp:56
static void s_PerJiffyTasks_Service(SSrvThread *thr)
void InitCurThreadStorage(void)
static CFutex s_SvcSignal
Definition: threads_man.cpp:69
static void s_StartCurMgrThread(void)
bool InitThreadsMan(void)
bool RCUHasCalls(SRCUInfo *rcu)
Definition: rcu.cpp:137
static bool s_StartThread(SSrvThread *thr, void *(*thr_func)(void *))
void RunMainThread(void)
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
Definition: srv_diag.hpp:162
T max(T x_, T y_)
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
void PromoteSockAmount(SSocketsData *socks)
EServerState seen_srv_state
Definition: threads_man.hpp:83
void AssignThreadSched(SSrvThread *thr)
Definition: scheduler.cpp:584
Class incorporating convenient methods to work with struct timespec.
Definition: srv_time.hpp:59
bool IsServerStopping(void)
Process information in the NCBI Registry, including working with configuration files.
void ReleaseThreadSched(SSrvThread *thr)
Definition: scheduler.cpp:593
void RCUPassQS(SRCUInfo *rcu)
Definition: rcu.cpp:122
static void RequestShutdown(ESrvShutdownType shutdown_type)
Asks server to start shutdown procedures.
virtual ~SSrvThread(void)
bool ReConfig_Threads(const CTempString &, const CNcbiRegistry &, string &)
Uint8 seen_jiffy
Definition: threads_man.hpp:80
void DoSocketWait(void)
else result
Definition: token2.c:20
CNcbiRegistry –.
Definition: ncbireg.hpp:914
void AssignThreadLogging(SSrvThread *thr)
Definition: logging.cpp:606
SSocketsData * socks
Definition: threads_man.hpp:92
static void s_JoinAllThreads(void)
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1047
void RCUInitNewThread(SSrvThread *thr)
Definition: rcu.cpp:143
CSrvStat * stat
Definition: threads_man.hpp:93
void SchedCheckOverloads(void)
Definition: scheduler.cpp:350
static int CurSecs(void)
Current time in seconds since epoch (time_t).
void ReleaseThreadLogging(SSrvThread *thr)
Definition: logging.cpp:653
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
Definition: srv_sync.hpp:192
EThreadState thread_state
Definition: threads_man.hpp:82
CSrvTime s_JiffyTime
Definition: time_man.cpp:54
static void * s_WorkerThreadMain(void *data)
void SetAllSocksRunnable(SSocketsData *socks)
TSrvThreadNum GetCntRunningThreads(void)
static void * s_ServiceThreadMain(void *)
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
void LogNoteThreadsStarted(void)
Definition: logging.cpp:517
Uint8 s_CurJiffies
Definition: time_man.cpp:53
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:245
void RequestThreadStart(SSrvThread *thr)
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:98
Task controlling a socket.
void RequestThreadRevive(SSrvThread *thr)
void Unlock(void)
Unlock the mutex.
Definition: srv_sync.cpp:119
Modified on Sun Nov 19 16:42:18 2017 by modify_doxy.py rev. 546573