NCBI C++ Toolkit Cross Reference

C++/src/util/scheduler.cpp


  1 /*  $Id: scheduler.cpp 166123 2009-07-17 16:19:20Z satskyse $
  2  * ===========================================================================
  3  *
  4  *                            PUBLIC DOMAIN NOTICE
  5  *               National Center for Biotechnology Information
  6  *
  7  *  This software/database is a "United States Government Work" under the
  8  *  terms of the United States Copyright Act.  It was written as part of
  9  *  the author's official duties as a United States Government employee and
 10  *  thus cannot be copyrighted.  This software/database is freely available
 11  *  to the public for use. The National Library of Medicine and the U.S.
 12  *  Government have not placed any restriction on its use or reproduction.
 13  *
 14  *  Although all reasonable efforts have been taken to ensure the accuracy
 15  *  and reliability of the software and data, the NLM and the U.S.
 16  *  Government do not and cannot warrant the performance or results that
 17  *  may be obtained by using this software or data. The NLM and the U.S.
 18  *  Government disclaim all warranties, express or implied, including
 19  *  warranties of performance, merchantability or fitness for any particular
 20  *  purpose.
 21  *
 22  *  Please cite the author in any work or product based on this material.
 23  *
 24  * ===========================================================================
 25  *
 26  * Authors:  Pavel Ivanov
 27  *
 28  * File Description:
 29  *   Implementation of Scheduler-related classes.
 30  *
 31  */
 32 
 33 #include <ncbi_pch.hpp>
 34 #include <corelib/ncbimtx.hpp>
 35 #include <corelib/ncbi_limits.hpp>
 36 #include <util/scheduler.hpp>
 37 #include <util/sync_queue.hpp>
 38 #include <util/error_codes.hpp>
 39 
 40 
 41 #define NCBI_USE_ERRCODE_X   Util_Scheduler
 42 
 43 
 44 BEGIN_NCBI_SCOPE
 45 
 46 
 47 /// Class storing full information about scheduled event for task execution
 48 class CScheduler_QueueEvent : public CObject
 49 {
 50 public:
 51     /// Id of the series
 52     TScheduler_SeriesID     id;
 53     /// Task itself
 54     CIRef<IScheduler_Task>  task;
 55     /// Time when this event will be executed
 56     CTime                   exec_time;
 57     /// Period of repetitive execution of the task
 58     CTimeSpan               period;
 59 
 60     /// How to run repetitive tasks including not repeating at all
 61     /// @sa IScheduler::ERepeatPattern
 62     enum ERepeatPattern
 63     {
 64         /// Execute tasks in the specified period of time after the *START*
 65         /// of previous task's execution
 66         eWithRate = IScheduler::eWithRate,
 67         /// Execute tasks in the specified period of time after the *END*
 68         /// of previous task's execution
 69         eWithDelay = IScheduler::eWithDelay,
 70         /// Execute the task only once
 71         eNoRepeat
 72     };
 73 
 74     /// Repeating pattern of the task
 75     ERepeatPattern          repeat_pattern;
 76 
 77 
 78     /// Check if this event matches given series id
 79     bool IsMatch(TScheduler_SeriesID id) const
 80     {
 81         return this->id == id;
 82     }
 83 
 84     /// Check if this event matches given task
 85     bool IsMatch(IScheduler_Task* task) const
 86     {
 87         return &*this->task == task;
 88     }
 89 
 90     /// Dummy function to support code templates and avoid duplication of code
 91     bool IsMatch(bool dummy_val) const
 92     {
 93         return dummy_val;
 94     }
 95 
 96 #ifdef NCBI_COMPILER_ICC
 97     // In the absence of the following constructor,
 98     // ICC fills the object memory with zeros,
 99     // erasing flags set by CObject::operator new
100     CScheduler_QueueEvent() {}
101 #endif
102 };
103 
104 
105 
106 /// Class for comparing references to CSchedQueueTask by its execution time
107 struct PScheduler_QueueEvent_Compare
108 {
109     bool operator() (const CRef<CScheduler_QueueEvent>&  left,
110                      const CRef<CScheduler_QueueEvent>&  right)
111     {
112         return left->exec_time < right->exec_time;
113     }
114 };
115 
116 
117 
118 /// Thread-safe implementation of IScheduler interface
119 class CScheduler_MT
120     : public IScheduler,
121       public CObject
122 {
123 public:
124     /// Schedule task for one-time execution
125     virtual
126     TScheduler_SeriesID AddTask(IScheduler_Task* task,
127                                 const CTime&     exec_time);
128 
129     /// Schedule task for repetitive execution
130     virtual
131     TScheduler_SeriesID AddRepetitiveTask(IScheduler_Task*  task,
132                                           const CTime&      start_time,
133                                           const CTimeSpan&  period,
134                                           ERepeatPattern    repeat_pattern);
135 
136     /// Remove series from scheduler queue
137     virtual
138     void RemoveSeries(TScheduler_SeriesID series_id);
139 
140     /// Remove task from scheduler queue
141     virtual
142     void RemoveTask(IScheduler_Task* task);
143 
144     /// Unschedule all series waiting in scheduler queue
145     virtual
146     void RemoveAllSeries(void);
147 
148     /// Get full scheduler series list
149     virtual
150     void GetScheduledSeries(vector<SScheduler_SeriesInfo>* series) const;
151 
152     /// Add listener which will be notified about changing in time
153     /// of availability of next scheduled task
154     virtual
155     void RegisterListener(IScheduler_Listener* listener);
156 
157     /// Remove scheduler listener
158     virtual
159     void UnregisterListener(IScheduler_Listener* listener);
160 
161     /// Get next time point when scheduler will be ready to execute some task
162     /// If there are already tasks to execute then return current time.
163     virtual
164     CTime GetNextExecutionTime(void) const;
165 
166     /// Check if there are tasks in scheduler queue (if it is not empty)
167     virtual
168     bool IsEmpty(void) const;
169 
170     /// Check if there are tasks ready to execute
171     virtual
172     bool HasTasksToExecute(const CTime& now) const;
173 
174     /// Get information about next task that is ready to execute
175     /// If there are no tasks to execute then return id = 0 and task = NULL
176     virtual
177     SScheduler_SeriesInfo GetNextTaskToExecute(const CTime& now);
178 
179     /// Be aware that task was just finished its execution
180     virtual
181     void TaskExecuted(TScheduler_SeriesID series_id, const CTime& now);
182 
183     /// Constructor
184     CScheduler_MT(void);
185 
186 protected:
187     /// Destructor. To be called from CRef.
188     virtual ~CScheduler_MT(void);
189 
190 private:
191     /// Prohibit copying and assigning
192     CScheduler_MT(const CScheduler_MT&);
193     CScheduler_MT& operator= (const CScheduler_MT&);
194 
195     /// Schedule task execution
196     /// @param id
197     ///   id of the scheduler series. if 0 then it is assigned automatically
198     /// @param task
199     ///   Task to execute
200     /// @param exec_time
201     ///   Time when task will be executed
202     /// @param num_repeats
203     ///   Total number of task executions
204     /// @param period
205     ///   Period between task executions
206     /// @param isDelay
207     ///   Whether period is executed from the beginning oor from the ending
208     ///   of the task execution
209     /// @param guard
210     ///   Guard for the main mutex which will be released at the end of method
211     TScheduler_SeriesID x_AddQueueTask
212     (
213         TScheduler_SeriesID                    id,
214         IScheduler_Task*                       task,
215         const CTime&                           exec_time,
216         const CTimeSpan&                       period,
217         CScheduler_QueueEvent::ERepeatPattern  repeat_pattern,
218         CMutexGuard*                           guard
219     );
220 
221     /// Change next execution time when queue of scheduled tasks is changed.
222     /// Notify all listeners about change if needed.
223     /// @param guard
224     ///   Guardian locking main scheduler mutex which must be unlocked before
225     ///   notification of listeners. NB: after method execution mutex is not
226     ///   locked anymore.
227     void x_SchedQueueChanged(CMutexGuard* guard);
228 
229     /// Implementation of removing task from queue.
230     /// The task is searched by criteria given as a parameter. Parameter
231     /// can be of any type that is accepted by
232     /// CScheduler_QueueEvent::IsMatch().
233     ///
234     /// @sa CScheduler_QueueEvent::IsMatch(), RemoveSeries(), RemoveTask()
235     template <class T>
236     void x_RemoveTaskImpl(T task);
237 
238 
239     /// Type of queue for information about scheduled tasks
240     typedef CSyncQueue_multiset<CRef<CScheduler_QueueEvent>,
241                                 PScheduler_QueueEvent_Compare> TSchedQueue;
242     /// Type of list of information about currently executing tasks
243     typedef deque< CRef<CScheduler_QueueEvent> >               TExecList;
244     /// Type of list of all scheduler listeners
245     typedef vector<IScheduler_Listener*>                       TListenersList;
246 
247     /// Queue of scheduled tasks
248     TSchedQueue     m_ScheduledTasks;
249     /// List of executing tasks
250     TExecList       m_ExecutingTasks;
251     /// Counter for generating task id
252     CAtomicCounter  m_IDCounter;
253     /// Main mutex for protection of changes in scheduler
254     mutable CMutex  m_MainMutex;
255     /// List of all scheduler listeners
256     TListenersList  m_Listeners;
257     /// Time of execution of nearest task
258     CTime           m_NextExecTime;
259 };
260 
261 
262 
263 // Max time_t minus a couple days to avoid any possible problems related to
264 // conversions to local time etc.
265 static const CTime kInfinityTime(0x7FFB0000);
266 
267 
268 CScheduler_MT::CScheduler_MT(void)
269     : m_NextExecTime(kInfinityTime)
270 {
271     m_IDCounter.Set(0);
272 }
273 
274 CScheduler_MT::~CScheduler_MT(void)
275 {
276 }
277 
278 TScheduler_SeriesID
279 CScheduler_MT::x_AddQueueTask
280 (
281     TScheduler_SeriesID                    id,
282     IScheduler_Task*                       task,
283     const CTime&                           exec_time,
284     const CTimeSpan&                       period,
285     CScheduler_QueueEvent::ERepeatPattern  repeat_pattern,
286     CMutexGuard*                           guard
287 )
288 {
289     // Be sure that task is referenced and will be destroyed in case
290     // of any exception
291     CIRef<IScheduler_Task> task_ref(task);
292 
293     CRef<CScheduler_QueueEvent> event_info(new CScheduler_QueueEvent());
294 
295     if (id == 0) {
296         id = m_IDCounter.Add(1);
297     }
298 
299     event_info->id = id;
300     event_info->task = task;
301     event_info->exec_time = exec_time;
302     event_info->period = period;
303     event_info->repeat_pattern = repeat_pattern;
304 
305     m_ScheduledTasks.push_back(event_info);
306 
307     x_SchedQueueChanged(guard);
308     // Mutex unlocked!!!
309 
310     return id;
311 }
312 
313 void
314 CScheduler_MT::x_SchedQueueChanged(CMutexGuard* guard)
315 {
316     TListenersList listeners;
317 
318     {{
319         // This part will be guarded by the mutex
320 
321         CTime next_time;
322 
323         if (m_ScheduledTasks.size() == 0) {
324             next_time = kInfinityTime;
325         }
326         else {
327             next_time = (*m_ScheduledTasks.begin())->exec_time;
328         }
329 
330         if (next_time != m_NextExecTime) {
331             m_NextExecTime = next_time;
332             listeners = m_Listeners;
333         }
334 
335         guard->Release();
336     }}
337 
338     NON_CONST_ITERATE(TListenersList, it, listeners) {
339         (*it)->OnNextExecutionTimeChange(this);
340     }
341 }
342 
343 TScheduler_SeriesID
344 CScheduler_MT::AddTask(IScheduler_Task* task, const CTime& exec_time)
345 {
346     CMutexGuard guard(m_MainMutex);
347 
348     return x_AddQueueTask(0, task, exec_time, CTimeSpan(),
349                           CScheduler_QueueEvent::eNoRepeat, &guard);
350 }
351 
352 TScheduler_SeriesID
353 CScheduler_MT::AddRepetitiveTask(IScheduler_Task*  task,
354                                  const CTime&      start_time,
355                                  const CTimeSpan&  period,
356                                  ERepeatPattern    repeat_pattern)
357 {
358     CMutexGuard guard(m_MainMutex);
359 
360     return x_AddQueueTask(0, task, start_time, period,
361                        CScheduler_QueueEvent::ERepeatPattern(repeat_pattern),
362                        &guard);
363 }
364 
365 template <class T>
366 inline void
367 CScheduler_MT::x_RemoveTaskImpl(T task)
368 {
369     CMutexGuard guard(m_MainMutex);
370 
371     bool is_begin_removed = false;
372 
373     for (TSchedQueue::iterator it = m_ScheduledTasks.begin();
374                                 it != m_ScheduledTasks.end(); )
375     {
376         if ((*it)->IsMatch(task)) {
377             if (it == m_ScheduledTasks.begin()) {
378                 is_begin_removed = true;
379             }
380             it = m_ScheduledTasks.erase(it);
381         }
382         else {
383             ++it;
384         }
385     }
386 
387     ITERATE(TExecList, it, m_ExecutingTasks) {
388         if ((*it)->IsMatch(task)) {
389             it->GetNCPointer()->repeat_pattern = CScheduler_QueueEvent::eNoRepeat;
390         }
391     }
392 
393     if (is_begin_removed) {
394         x_SchedQueueChanged(&guard);
395         // Mutex unlocked!!!
396     }
397 }
398 
399 void
400 CScheduler_MT::RemoveSeries(TScheduler_SeriesID series_id)
401 {
402     x_RemoveTaskImpl(series_id);
403 }
404 
405 void
406 CScheduler_MT::RemoveTask(IScheduler_Task* task)
407 {
408     x_RemoveTaskImpl(task);
409 }
410 
411 void
412 CScheduler_MT::RemoveAllSeries(void)
413 {
414     x_RemoveTaskImpl(true);
415 }
416 
417 void
418 CScheduler_MT::GetScheduledSeries(vector<SScheduler_SeriesInfo>* series) const
419 {
420     series->clear();
421 
422     {{
423         CMutexGuard guard(m_MainMutex);
424 
425         series->resize(m_ScheduledTasks.size());
426         size_t ind = 0;
427         ITERATE (TSchedQueue, it, m_ScheduledTasks) {
428             (*series)[ind].id   = (*it)->id;
429             (*series)[ind].task = (*it)->task;
430             ++ind;
431         }
432 
433         ITERATE(TExecList, it, m_ExecutingTasks) {
434             if ((*it)->repeat_pattern != CScheduler_QueueEvent::eNoRepeat) {
435                 series->resize(ind + 1);
436                 (*series)[ind].id   = (*it)->id;
437                 (*series)[ind].task = (*it)->task;
438                 ++ind;
439             }
440         }
441     }}
442 }
443 
444 void
445 CScheduler_MT::RegisterListener(IScheduler_Listener* listener)
446 {
447     CMutexGuard guard(m_MainMutex);
448 
449     m_Listeners.push_back(listener);
450 }
451 
452 void
453 CScheduler_MT::UnregisterListener(IScheduler_Listener* listener)
454 {
455     CMutexGuard guard(m_MainMutex);
456 
457     TListenersList::iterator it = find(m_Listeners.begin(), m_Listeners.end(),
458                                        listener);
459 
460     if (it != m_Listeners.end()) {
461         m_Listeners.erase(it);
462     }
463 }
464 
465 CTime
466 CScheduler_MT::GetNextExecutionTime(void) const
467 {
468     CMutexGuard guard(m_MainMutex);
469 
470     return m_NextExecTime;
471 }
472 
473 bool
474 CScheduler_MT::IsEmpty(void) const
475 {
476     CMutexGuard guard(m_MainMutex);
477 
478     bool result = m_ScheduledTasks.empty();
479 
480     if (result) {
481         ITERATE(TExecList, it, m_ExecutingTasks) {
482             if ((*it)->repeat_pattern != CScheduler_QueueEvent::eNoRepeat)
483             {
484                 result = false;
485                 break;
486             }
487         }
488     }
489 
490     return result;
491 }
492 
493 bool
494 CScheduler_MT::HasTasksToExecute(const CTime& now) const
495 {
496     CMutexGuard guard(m_MainMutex);
497 
498     return m_NextExecTime <= now;
499 }
500 
501 SScheduler_SeriesInfo
502 CScheduler_MT::GetNextTaskToExecute(const CTime& now)
503 {
504     SScheduler_SeriesInfo res_info;
505     res_info.id = 0;
506     CRef<CScheduler_QueueEvent> event_info;
507 
508     {{
509         CMutexGuard guard(m_MainMutex);
510 
511         if (m_ScheduledTasks.size() == 0
512             ||  (*m_ScheduledTasks.begin())->exec_time > now)
513         {
514             return res_info;
515         }
516 
517         event_info = m_ScheduledTasks.front();
518         m_ScheduledTasks.pop_front();
519         m_ExecutingTasks.push_back(event_info);
520 
521         res_info.id   = event_info->id;
522         res_info.task = event_info->task;
523 
524         if (event_info->repeat_pattern == CScheduler_QueueEvent::eWithRate) {
525             x_AddQueueTask(event_info->id,
526                            event_info->task,
527                            event_info->exec_time + event_info->period,
528                            event_info->period,
529                            event_info->repeat_pattern,
530                            &guard);
531             // Mutex unlocked!!!
532             // x_SchedQueueChanged() is called inside x_AddQueueTask()
533         }
534         else {
535             // x_SchedQueueChanged() should be called anyway because we've changed
536             // the beginning of the queue
537             x_SchedQueueChanged(&guard);
538             // Mutex unlocked!!!
539         }
540     }}
541 
542     return res_info;
543 }
544 
545 void
546 CScheduler_MT::TaskExecuted(TScheduler_SeriesID series_id, const CTime& now)
547 {
548     CMutexGuard guard(m_MainMutex);
549 
550     CRef<CScheduler_QueueEvent> event_info;
551 
552     NON_CONST_ITERATE(TExecList, it, m_ExecutingTasks) {
553         if ((*it)->IsMatch(series_id)) {
554             event_info = *it;
555             m_ExecutingTasks.erase(it);
556             break;
557         }
558     }
559 
560     if (event_info.IsNull()) {
561         return;
562     }
563 
564     if (event_info->repeat_pattern == CScheduler_QueueEvent::eWithDelay) {
565         x_AddQueueTask(event_info->id,
566                        event_info->task,
567                        now + event_info->period,
568                        event_info->period,
569                        event_info->repeat_pattern,
570                        &guard);
571         // Mutex unlocked!!!
572     }
573 }
574 
575 
576 
577 CIRef<IScheduler>
578 IScheduler::Create(void)
579 {
580     return CIRef<IScheduler>(new CScheduler_MT());
581 }
582 
583 
584 
585 /// Standalone thread to execute scheduled tasks - implementation
586 class CScheduler_ExecThread_Impl
587     : public IScheduler_Listener,
588       public CThread
589 {
590 public:
591     /// Constructor
592     /// @param scheduler
593     ///   Scheduler which tasks will be executed
594     CScheduler_ExecThread_Impl(IScheduler* scheduler);
595 
596     /// Callback from the scheduler -- about changes in the execution timeline
597     virtual void OnNextExecutionTimeChange(IScheduler*);
598 
599     /// Stop executing the tasks, and finish the thread.
600     /// This method should be called to force executor to finish its work
601     /// and destroy. Without it destruction will be impossible.
602     void Stop(void);
603 
604 protected:
605     /// Destructor, to be called from CRef
606     virtual ~CScheduler_ExecThread_Impl();
607 
608     /// Main thread function
609     virtual void* Main(void);
610 
611 private:
612     /// Prohibit copying and assignment
613     CScheduler_ExecThread_Impl(const CScheduler_ExecThread_Impl&);
614     CScheduler_ExecThread_Impl& operator= (const CScheduler_ExecThread_Impl&);
615 
616     /// Scheduler controlled by the executor
617     CIRef<IScheduler>                 m_Scheduler;
618     /// Reference to self to avoid destruction earlier than needed
619     CRef<CScheduler_ExecThread_Impl>  m_SelfRef;
620     /// Semaphore for handling idle waiting
621     CSemaphore                        m_WaitTrigger;
622     /// If the thread has been requested to stop
623     volatile bool                     m_Stopped;
624 };
625 
626 
627 
628 CScheduler_ExecThread_Impl::CScheduler_ExecThread_Impl(IScheduler*  scheduler)
629     : m_Scheduler(scheduler),
630       m_WaitTrigger(0, kMax_Int),
631       m_Stopped(false)
632 {
633     m_SelfRef = this;
634     m_Scheduler->RegisterListener(this);
635     Run(CThread::fRunDetached);
636 }
637 
638 CScheduler_ExecThread_Impl::~CScheduler_ExecThread_Impl(void)
639 {}
640 
641 void
642 CScheduler_ExecThread_Impl::OnNextExecutionTimeChange(IScheduler*)
643 {
644     m_WaitTrigger.Post();
645 }
646 
647 void
648 CScheduler_ExecThread_Impl::Stop(void)
649 {
650     m_Stopped = true;
651     m_WaitTrigger.Post();
652     m_SelfRef = NULL;
653 }
654 
655 void*
656 CScheduler_ExecThread_Impl::Main(void)
657 {
658     CTime cur_time(CTime::eCurrent);
659     while ( !m_Stopped ) {
660         CTimeSpan timeout = m_Scheduler->GetNextExecutionTime() - cur_time;
661         m_WaitTrigger.TryWait(timeout.GetCompleteSeconds(),
662                               timeout.GetNanoSecondsAfterSecond());
663 
664         // If we are already stopped we will not do unnecessary work
665         if ( !m_Stopped ) {
666             cur_time.SetCurrent();
667             for (;;) {
668                 SScheduler_SeriesInfo task_info =
669                     m_Scheduler->GetNextTaskToExecute(cur_time);
670 
671                 if (task_info.task.IsNull())
672                     break;
673 
674                 try {
675                     task_info.task->Execute();
676                 }
677                 catch (exception& e) {
678                     ERR_POST_X(1, "Exception in scheduler task execution: "
679                                   << e.what());
680                 }
681 
682                 if (m_Stopped)
683                     break;
684 
685                 cur_time.SetCurrent();
686                 m_Scheduler->TaskExecuted(task_info.id, cur_time);
687             }
688         }
689     }
690 
691     return NULL;
692 }
693 
694 
695 
696 CScheduler_ExecutionThread::CScheduler_ExecutionThread(IScheduler* scheduler)
697     : m_Impl(new CScheduler_ExecThread_Impl(scheduler))
698 {
699 }
700 
701 CScheduler_ExecutionThread::~CScheduler_ExecutionThread(void)
702 {
703     m_Impl->Stop();
704 }
705 
706 END_NCBI_SCOPE
707 

source navigation ]   [ diff markup ]   [ identifier search ]   [ freetext search ]   [ file search ]  

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.