|
NCBI Home IEB Home C++ Toolkit docs C Toolkit source browser C Toolkit source browser (2) |
NCBI C++ Toolkit Cross ReferenceC++/src/util/scheduler.cpp |
source navigation diff markup identifier search freetext search file search |
1 /* $Id: scheduler.cpp 166123 2009-07-17 16:19:20Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Pavel Ivanov
27 *
28 * File Description:
29 * Implementation of Scheduler-related classes.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbimtx.hpp>
35 #include <corelib/ncbi_limits.hpp>
36 #include <util/scheduler.hpp>
37 #include <util/sync_queue.hpp>
38 #include <util/error_codes.hpp>
39
40
41 #define NCBI_USE_ERRCODE_X Util_Scheduler
42
43
44 BEGIN_NCBI_SCOPE
45
46
47 /// Class storing full information about scheduled event for task execution
48 class CScheduler_QueueEvent : public CObject
49 {
50 public:
51 /// Id of the series
52 TScheduler_SeriesID id;
53 /// Task itself
54 CIRef<IScheduler_Task> task;
55 /// Time when this event will be executed
56 CTime exec_time;
57 /// Period of repetitive execution of the task
58 CTimeSpan period;
59
60 /// How to run repetitive tasks including not repeating at all
61 /// @sa IScheduler::ERepeatPattern
62 enum ERepeatPattern
63 {
64 /// Execute tasks in the specified period of time after the *START*
65 /// of previous task's execution
66 eWithRate = IScheduler::eWithRate,
67 /// Execute tasks in the specified period of time after the *END*
68 /// of previous task's execution
69 eWithDelay = IScheduler::eWithDelay,
70 /// Execute the task only once
71 eNoRepeat
72 };
73
74 /// Repeating pattern of the task
75 ERepeatPattern repeat_pattern;
76
77
78 /// Check if this event matches given series id
79 bool IsMatch(TScheduler_SeriesID id) const
80 {
81 return this->id == id;
82 }
83
84 /// Check if this event matches given task
85 bool IsMatch(IScheduler_Task* task) const
86 {
87 return &*this->task == task;
88 }
89
90 /// Dummy function to support code templates and avoid duplication of code
91 bool IsMatch(bool dummy_val) const
92 {
93 return dummy_val;
94 }
95
96 #ifdef NCBI_COMPILER_ICC
97 // In the absence of the following constructor,
98 // ICC fills the object memory with zeros,
99 // erasing flags set by CObject::operator new
100 CScheduler_QueueEvent() {}
101 #endif
102 };
103
104
105
106 /// Class for comparing references to CSchedQueueTask by its execution time
107 struct PScheduler_QueueEvent_Compare
108 {
109 bool operator() (const CRef<CScheduler_QueueEvent>& left,
110 const CRef<CScheduler_QueueEvent>& right)
111 {
112 return left->exec_time < right->exec_time;
113 }
114 };
115
116
117
118 /// Thread-safe implementation of IScheduler interface
119 class CScheduler_MT
120 : public IScheduler,
121 public CObject
122 {
123 public:
124 /// Schedule task for one-time execution
125 virtual
126 TScheduler_SeriesID AddTask(IScheduler_Task* task,
127 const CTime& exec_time);
128
129 /// Schedule task for repetitive execution
130 virtual
131 TScheduler_SeriesID AddRepetitiveTask(IScheduler_Task* task,
132 const CTime& start_time,
133 const CTimeSpan& period,
134 ERepeatPattern repeat_pattern);
135
136 /// Remove series from scheduler queue
137 virtual
138 void RemoveSeries(TScheduler_SeriesID series_id);
139
140 /// Remove task from scheduler queue
141 virtual
142 void RemoveTask(IScheduler_Task* task);
143
144 /// Unschedule all series waiting in scheduler queue
145 virtual
146 void RemoveAllSeries(void);
147
148 /// Get full scheduler series list
149 virtual
150 void GetScheduledSeries(vector<SScheduler_SeriesInfo>* series) const;
151
152 /// Add listener which will be notified about changing in time
153 /// of availability of next scheduled task
154 virtual
155 void RegisterListener(IScheduler_Listener* listener);
156
157 /// Remove scheduler listener
158 virtual
159 void UnregisterListener(IScheduler_Listener* listener);
160
161 /// Get next time point when scheduler will be ready to execute some task
162 /// If there are already tasks to execute then return current time.
163 virtual
164 CTime GetNextExecutionTime(void) const;
165
166 /// Check if there are tasks in scheduler queue (if it is not empty)
167 virtual
168 bool IsEmpty(void) const;
169
170 /// Check if there are tasks ready to execute
171 virtual
172 bool HasTasksToExecute(const CTime& now) const;
173
174 /// Get information about next task that is ready to execute
175 /// If there are no tasks to execute then return id = 0 and task = NULL
176 virtual
177 SScheduler_SeriesInfo GetNextTaskToExecute(const CTime& now);
178
179 /// Be aware that task was just finished its execution
180 virtual
181 void TaskExecuted(TScheduler_SeriesID series_id, const CTime& now);
182
183 /// Constructor
184 CScheduler_MT(void);
185
186 protected:
187 /// Destructor. To be called from CRef.
188 virtual ~CScheduler_MT(void);
189
190 private:
191 /// Prohibit copying and assigning
192 CScheduler_MT(const CScheduler_MT&);
193 CScheduler_MT& operator= (const CScheduler_MT&);
194
195 /// Schedule task execution
196 /// @param id
197 /// id of the scheduler series. if 0 then it is assigned automatically
198 /// @param task
199 /// Task to execute
200 /// @param exec_time
201 /// Time when task will be executed
202 /// @param num_repeats
203 /// Total number of task executions
204 /// @param period
205 /// Period between task executions
206 /// @param isDelay
207 /// Whether period is executed from the beginning oor from the ending
208 /// of the task execution
209 /// @param guard
210 /// Guard for the main mutex which will be released at the end of method
211 TScheduler_SeriesID x_AddQueueTask
212 (
213 TScheduler_SeriesID id,
214 IScheduler_Task* task,
215 const CTime& exec_time,
216 const CTimeSpan& period,
217 CScheduler_QueueEvent::ERepeatPattern repeat_pattern,
218 CMutexGuard* guard
219 );
220
221 /// Change next execution time when queue of scheduled tasks is changed.
222 /// Notify all listeners about change if needed.
223 /// @param guard
224 /// Guardian locking main scheduler mutex which must be unlocked before
225 /// notification of listeners. NB: after method execution mutex is not
226 /// locked anymore.
227 void x_SchedQueueChanged(CMutexGuard* guard);
228
229 /// Implementation of removing task from queue.
230 /// The task is searched by criteria given as a parameter. Parameter
231 /// can be of any type that is accepted by
232 /// CScheduler_QueueEvent::IsMatch().
233 ///
234 /// @sa CScheduler_QueueEvent::IsMatch(), RemoveSeries(), RemoveTask()
235 template <class T>
236 void x_RemoveTaskImpl(T task);
237
238
239 /// Type of queue for information about scheduled tasks
240 typedef CSyncQueue_multiset<CRef<CScheduler_QueueEvent>,
241 PScheduler_QueueEvent_Compare> TSchedQueue;
242 /// Type of list of information about currently executing tasks
243 typedef deque< CRef<CScheduler_QueueEvent> > TExecList;
244 /// Type of list of all scheduler listeners
245 typedef vector<IScheduler_Listener*> TListenersList;
246
247 /// Queue of scheduled tasks
248 TSchedQueue m_ScheduledTasks;
249 /// List of executing tasks
250 TExecList m_ExecutingTasks;
251 /// Counter for generating task id
252 CAtomicCounter m_IDCounter;
253 /// Main mutex for protection of changes in scheduler
254 mutable CMutex m_MainMutex;
255 /// List of all scheduler listeners
256 TListenersList m_Listeners;
257 /// Time of execution of nearest task
258 CTime m_NextExecTime;
259 };
260
261
262
263 // Max time_t minus a couple days to avoid any possible problems related to
264 // conversions to local time etc.
265 static const CTime kInfinityTime(0x7FFB0000);
266
267
268 CScheduler_MT::CScheduler_MT(void)
269 : m_NextExecTime(kInfinityTime)
270 {
271 m_IDCounter.Set(0);
272 }
273
274 CScheduler_MT::~CScheduler_MT(void)
275 {
276 }
277
278 TScheduler_SeriesID
279 CScheduler_MT::x_AddQueueTask
280 (
281 TScheduler_SeriesID id,
282 IScheduler_Task* task,
283 const CTime& exec_time,
284 const CTimeSpan& period,
285 CScheduler_QueueEvent::ERepeatPattern repeat_pattern,
286 CMutexGuard* guard
287 )
288 {
289 // Be sure that task is referenced and will be destroyed in case
290 // of any exception
291 CIRef<IScheduler_Task> task_ref(task);
292
293 CRef<CScheduler_QueueEvent> event_info(new CScheduler_QueueEvent());
294
295 if (id == 0) {
296 id = m_IDCounter.Add(1);
297 }
298
299 event_info->id = id;
300 event_info->task = task;
301 event_info->exec_time = exec_time;
302 event_info->period = period;
303 event_info->repeat_pattern = repeat_pattern;
304
305 m_ScheduledTasks.push_back(event_info);
306
307 x_SchedQueueChanged(guard);
308 // Mutex unlocked!!!
309
310 return id;
311 }
312
313 void
314 CScheduler_MT::x_SchedQueueChanged(CMutexGuard* guard)
315 {
316 TListenersList listeners;
317
318 {{
319 // This part will be guarded by the mutex
320
321 CTime next_time;
322
323 if (m_ScheduledTasks.size() == 0) {
324 next_time = kInfinityTime;
325 }
326 else {
327 next_time = (*m_ScheduledTasks.begin())->exec_time;
328 }
329
330 if (next_time != m_NextExecTime) {
331 m_NextExecTime = next_time;
332 listeners = m_Listeners;
333 }
334
335 guard->Release();
336 }}
337
338 NON_CONST_ITERATE(TListenersList, it, listeners) {
339 (*it)->OnNextExecutionTimeChange(this);
340 }
341 }
342
343 TScheduler_SeriesID
344 CScheduler_MT::AddTask(IScheduler_Task* task, const CTime& exec_time)
345 {
346 CMutexGuard guard(m_MainMutex);
347
348 return x_AddQueueTask(0, task, exec_time, CTimeSpan(),
349 CScheduler_QueueEvent::eNoRepeat, &guard);
350 }
351
352 TScheduler_SeriesID
353 CScheduler_MT::AddRepetitiveTask(IScheduler_Task* task,
354 const CTime& start_time,
355 const CTimeSpan& period,
356 ERepeatPattern repeat_pattern)
357 {
358 CMutexGuard guard(m_MainMutex);
359
360 return x_AddQueueTask(0, task, start_time, period,
361 CScheduler_QueueEvent::ERepeatPattern(repeat_pattern),
362 &guard);
363 }
364
365 template <class T>
366 inline void
367 CScheduler_MT::x_RemoveTaskImpl(T task)
368 {
369 CMutexGuard guard(m_MainMutex);
370
371 bool is_begin_removed = false;
372
373 for (TSchedQueue::iterator it = m_ScheduledTasks.begin();
374 it != m_ScheduledTasks.end(); )
375 {
376 if ((*it)->IsMatch(task)) {
377 if (it == m_ScheduledTasks.begin()) {
378 is_begin_removed = true;
379 }
380 it = m_ScheduledTasks.erase(it);
381 }
382 else {
383 ++it;
384 }
385 }
386
387 ITERATE(TExecList, it, m_ExecutingTasks) {
388 if ((*it)->IsMatch(task)) {
389 it->GetNCPointer()->repeat_pattern = CScheduler_QueueEvent::eNoRepeat;
390 }
391 }
392
393 if (is_begin_removed) {
394 x_SchedQueueChanged(&guard);
395 // Mutex unlocked!!!
396 }
397 }
398
399 void
400 CScheduler_MT::RemoveSeries(TScheduler_SeriesID series_id)
401 {
402 x_RemoveTaskImpl(series_id);
403 }
404
405 void
406 CScheduler_MT::RemoveTask(IScheduler_Task* task)
407 {
408 x_RemoveTaskImpl(task);
409 }
410
411 void
412 CScheduler_MT::RemoveAllSeries(void)
413 {
414 x_RemoveTaskImpl(true);
415 }
416
417 void
418 CScheduler_MT::GetScheduledSeries(vector<SScheduler_SeriesInfo>* series) const
419 {
420 series->clear();
421
422 {{
423 CMutexGuard guard(m_MainMutex);
424
425 series->resize(m_ScheduledTasks.size());
426 size_t ind = 0;
427 ITERATE (TSchedQueue, it, m_ScheduledTasks) {
428 (*series)[ind].id = (*it)->id;
429 (*series)[ind].task = (*it)->task;
430 ++ind;
431 }
432
433 ITERATE(TExecList, it, m_ExecutingTasks) {
434 if ((*it)->repeat_pattern != CScheduler_QueueEvent::eNoRepeat) {
435 series->resize(ind + 1);
436 (*series)[ind].id = (*it)->id;
437 (*series)[ind].task = (*it)->task;
438 ++ind;
439 }
440 }
441 }}
442 }
443
444 void
445 CScheduler_MT::RegisterListener(IScheduler_Listener* listener)
446 {
447 CMutexGuard guard(m_MainMutex);
448
449 m_Listeners.push_back(listener);
450 }
451
452 void
453 CScheduler_MT::UnregisterListener(IScheduler_Listener* listener)
454 {
455 CMutexGuard guard(m_MainMutex);
456
457 TListenersList::iterator it = find(m_Listeners.begin(), m_Listeners.end(),
458 listener);
459
460 if (it != m_Listeners.end()) {
461 m_Listeners.erase(it);
462 }
463 }
464
465 CTime
466 CScheduler_MT::GetNextExecutionTime(void) const
467 {
468 CMutexGuard guard(m_MainMutex);
469
470 return m_NextExecTime;
471 }
472
473 bool
474 CScheduler_MT::IsEmpty(void) const
475 {
476 CMutexGuard guard(m_MainMutex);
477
478 bool result = m_ScheduledTasks.empty();
479
480 if (result) {
481 ITERATE(TExecList, it, m_ExecutingTasks) {
482 if ((*it)->repeat_pattern != CScheduler_QueueEvent::eNoRepeat)
483 {
484 result = false;
485 break;
486 }
487 }
488 }
489
490 return result;
491 }
492
493 bool
494 CScheduler_MT::HasTasksToExecute(const CTime& now) const
495 {
496 CMutexGuard guard(m_MainMutex);
497
498 return m_NextExecTime <= now;
499 }
500
501 SScheduler_SeriesInfo
502 CScheduler_MT::GetNextTaskToExecute(const CTime& now)
503 {
504 SScheduler_SeriesInfo res_info;
505 res_info.id = 0;
506 CRef<CScheduler_QueueEvent> event_info;
507
508 {{
509 CMutexGuard guard(m_MainMutex);
510
511 if (m_ScheduledTasks.size() == 0
512 || (*m_ScheduledTasks.begin())->exec_time > now)
513 {
514 return res_info;
515 }
516
517 event_info = m_ScheduledTasks.front();
518 m_ScheduledTasks.pop_front();
519 m_ExecutingTasks.push_back(event_info);
520
521 res_info.id = event_info->id;
522 res_info.task = event_info->task;
523
524 if (event_info->repeat_pattern == CScheduler_QueueEvent::eWithRate) {
525 x_AddQueueTask(event_info->id,
526 event_info->task,
527 event_info->exec_time + event_info->period,
528 event_info->period,
529 event_info->repeat_pattern,
530 &guard);
531 // Mutex unlocked!!!
532 // x_SchedQueueChanged() is called inside x_AddQueueTask()
533 }
534 else {
535 // x_SchedQueueChanged() should be called anyway because we've changed
536 // the beginning of the queue
537 x_SchedQueueChanged(&guard);
538 // Mutex unlocked!!!
539 }
540 }}
541
542 return res_info;
543 }
544
545 void
546 CScheduler_MT::TaskExecuted(TScheduler_SeriesID series_id, const CTime& now)
547 {
548 CMutexGuard guard(m_MainMutex);
549
550 CRef<CScheduler_QueueEvent> event_info;
551
552 NON_CONST_ITERATE(TExecList, it, m_ExecutingTasks) {
553 if ((*it)->IsMatch(series_id)) {
554 event_info = *it;
555 m_ExecutingTasks.erase(it);
556 break;
557 }
558 }
559
560 if (event_info.IsNull()) {
561 return;
562 }
563
564 if (event_info->repeat_pattern == CScheduler_QueueEvent::eWithDelay) {
565 x_AddQueueTask(event_info->id,
566 event_info->task,
567 now + event_info->period,
568 event_info->period,
569 event_info->repeat_pattern,
570 &guard);
571 // Mutex unlocked!!!
572 }
573 }
574
575
576
577 CIRef<IScheduler>
578 IScheduler::Create(void)
579 {
580 return CIRef<IScheduler>(new CScheduler_MT());
581 }
582
583
584
585 /// Standalone thread to execute scheduled tasks - implementation
586 class CScheduler_ExecThread_Impl
587 : public IScheduler_Listener,
588 public CThread
589 {
590 public:
591 /// Constructor
592 /// @param scheduler
593 /// Scheduler which tasks will be executed
594 CScheduler_ExecThread_Impl(IScheduler* scheduler);
595
596 /// Callback from the scheduler -- about changes in the execution timeline
597 virtual void OnNextExecutionTimeChange(IScheduler*);
598
599 /// Stop executing the tasks, and finish the thread.
600 /// This method should be called to force executor to finish its work
601 /// and destroy. Without it destruction will be impossible.
602 void Stop(void);
603
604 protected:
605 /// Destructor, to be called from CRef
606 virtual ~CScheduler_ExecThread_Impl();
607
608 /// Main thread function
609 virtual void* Main(void);
610
611 private:
612 /// Prohibit copying and assignment
613 CScheduler_ExecThread_Impl(const CScheduler_ExecThread_Impl&);
614 CScheduler_ExecThread_Impl& operator= (const CScheduler_ExecThread_Impl&);
615
616 /// Scheduler controlled by the executor
617 CIRef<IScheduler> m_Scheduler;
618 /// Reference to self to avoid destruction earlier than needed
619 CRef<CScheduler_ExecThread_Impl> m_SelfRef;
620 /// Semaphore for handling idle waiting
621 CSemaphore m_WaitTrigger;
622 /// If the thread has been requested to stop
623 volatile bool m_Stopped;
624 };
625
626
627
628 CScheduler_ExecThread_Impl::CScheduler_ExecThread_Impl(IScheduler* scheduler)
629 : m_Scheduler(scheduler),
630 m_WaitTrigger(0, kMax_Int),
631 m_Stopped(false)
632 {
633 m_SelfRef = this;
634 m_Scheduler->RegisterListener(this);
635 Run(CThread::fRunDetached);
636 }
637
638 CScheduler_ExecThread_Impl::~CScheduler_ExecThread_Impl(void)
639 {}
640
641 void
642 CScheduler_ExecThread_Impl::OnNextExecutionTimeChange(IScheduler*)
643 {
644 m_WaitTrigger.Post();
645 }
646
647 void
648 CScheduler_ExecThread_Impl::Stop(void)
649 {
650 m_Stopped = true;
651 m_WaitTrigger.Post();
652 m_SelfRef = NULL;
653 }
654
655 void*
656 CScheduler_ExecThread_Impl::Main(void)
657 {
658 CTime cur_time(CTime::eCurrent);
659 while ( !m_Stopped ) {
660 CTimeSpan timeout = m_Scheduler->GetNextExecutionTime() - cur_time;
661 m_WaitTrigger.TryWait(timeout.GetCompleteSeconds(),
662 timeout.GetNanoSecondsAfterSecond());
663
664 // If we are already stopped we will not do unnecessary work
665 if ( !m_Stopped ) {
666 cur_time.SetCurrent();
667 for (;;) {
668 SScheduler_SeriesInfo task_info =
669 m_Scheduler->GetNextTaskToExecute(cur_time);
670
671 if (task_info.task.IsNull())
672 break;
673
674 try {
675 task_info.task->Execute();
676 }
677 catch (exception& e) {
678 ERR_POST_X(1, "Exception in scheduler task execution: "
679 << e.what());
680 }
681
682 if (m_Stopped)
683 break;
684
685 cur_time.SetCurrent();
686 m_Scheduler->TaskExecuted(task_info.id, cur_time);
687 }
688 }
689 }
690
691 return NULL;
692 }
693
694
695
696 CScheduler_ExecutionThread::CScheduler_ExecutionThread(IScheduler* scheduler)
697 : m_Impl(new CScheduler_ExecThread_Impl(scheduler))
698 {
699 }
700
701 CScheduler_ExecutionThread::~CScheduler_ExecutionThread(void)
702 {
703 m_Impl->Stop();
704 }
705
706 END_NCBI_SCOPE
707 |
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more information. |