src/app/netschedule/job_status.cpp

Go to the documentation of this file.
00001 /*  $Id: job_status.cpp 157161 2009-04-13 15:39:48Z joukovv $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Anatoliy Kuznetsov, Victor Joukov
00027  *
00028  * File Description: Network scheduler job status store
00029  *
00030  */
00031 #include <ncbi_pch.hpp>
00032 #include <corelib/ncbi_system.hpp>
00033 
00034 #include <connect/services/netschedule_api.hpp>
00035 #include <util/bitset/bmalgo.h>
00036 
00037 #include "job_status.hpp"
00038 
00039 BEGIN_NCBI_SCOPE
00040 
00041 
00042 CJobStatusTracker::CJobStatusTracker()
00043  : m_LastPending(0),
00044    m_DoneCnt(0)
00045 {
00046     for (int i = 0; i < CNetScheduleAPI::eLastStatus; ++i) {
00047         TNSBitVector* bv;
00048         bv = new TNSBitVector(bm::BM_GAP);
00049         m_StatusStor.push_back(bv);
00050     }
00051 }
00052 
00053 
00054 CJobStatusTracker::~CJobStatusTracker()
00055 {
00056     for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00057         TNSBitVector* bv = m_StatusStor[i];
00058         delete bv;
00059     }
00060 }
00061 
00062 
00063 TJobStatus 
00064 CJobStatusTracker::GetStatus(unsigned job_id) const
00065 {
00066     CReadLockGuard guard(m_Lock);
00067     return x_GetStatusNoLock(job_id);
00068 }
00069 
00070 
00071 TJobStatus 
00072 CJobStatusTracker::x_GetStatusNoLock(unsigned job_id) const
00073 {
00074     for (int i = m_StatusStor.size()-1; i >= 0; --i) {
00075         const TNSBitVector& bv = *m_StatusStor[i];
00076         bool b = bv[job_id];
00077         if (b) return TJobStatus(i);
00078     }
00079     return CNetScheduleAPI::eJobNotFound;
00080 }
00081 
00082 
00083 unsigned
00084 CJobStatusTracker::CountStatus(TJobStatus status) const
00085 {
00086     CReadLockGuard guard(m_Lock);
00087     return m_StatusStor[(int)status]->count();
00088 }
00089 
00090 
00091 void CJobStatusTracker::CountStatus(TStatusSummaryMap*  status_map,
00092                                     const TNSBitVector* candidate_set)
00093 {
00094     _ASSERT(status_map);
00095     status_map->clear();
00096 
00097     CReadLockGuard guard(m_Lock);
00098 
00099     for (size_t i = 0; i < m_StatusStor.size(); ++i) {
00100         const TNSBitVector& bv = *m_StatusStor[i];
00101         unsigned cnt;
00102         if (candidate_set) {
00103             cnt = bm::count_and(bv, *candidate_set);
00104         } else {
00105             cnt = bv.count();
00106         }
00107         (*status_map)[(TJobStatus) i] = cnt;
00108     }
00109 }
00110 
00111 
00112 unsigned CJobStatusTracker::Count(void)
00113 {
00114     CReadLockGuard guard(m_Lock);
00115 
00116     unsigned cnt = 0;
00117     for (size_t i = 0; i < m_StatusStor.size(); ++i) {
00118         const TNSBitVector& bv = *m_StatusStor[i];
00119         cnt += bv.count();
00120     }
00121     return cnt;
00122 }
00123 
00124 
00125 void
00126 CJobStatusTracker::StatusStatistics(TJobStatus                status,
00127                                     TNSBitVector::statistics* st) const
00128 {
00129     _ASSERT(st);
00130     CReadLockGuard guard(m_Lock);
00131 
00132     const TNSBitVector& bv = *m_StatusStor[(int)status];
00133     bv.calc_stat(st);
00134 
00135 }
00136 
00137 
00138 void CJobStatusTracker::StatusSnapshot(TJobStatus    status,
00139                                        TNSBitVector* bv) const
00140 {
00141     _ASSERT(bv);
00142     CReadLockGuard guard(m_Lock);
00143 
00144     const TNSBitVector& bv_s = *m_StatusStor[(int)status];
00145     *bv |= bv_s;
00146 }
00147 
00148 
00149 void 
00150 CJobStatusTracker::SetStatus(unsigned   job_id,
00151                              TJobStatus status)
00152 {
00153     CWriteLockGuard guard(m_Lock);
00154 
00155     TJobStatus old_status = TJobStatus(-1);
00156     for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00157         TNSBitVector& bv = *m_StatusStor[i];
00158         if (bv[job_id]) {
00159             if (old_status != TJobStatus(-1))
00160                 ERR_POST("State matrix was damaged, "
00161                          "more than one status active for job " << job_id);
00162             old_status = TJobStatus(i);
00163         }
00164         bv.set(job_id, (int)status == (int)i);
00165     }
00166     if (old_status == CNetScheduleAPI::eDone &&
00167         status     == CNetScheduleAPI::ePending)
00168         ERR_POST("Illegal status change from Done to Pending for job " <<
00169                  job_id);
00170     if (status == CNetScheduleAPI::eDone) {
00171         IncDoneJobs();
00172     }
00173 }
00174 
00175 void CJobStatusTracker::Erase(unsigned job_id)
00176 {
00177     SetStatus(job_id, CNetScheduleAPI::eJobNotFound);
00178 }
00179 
00180 
00181 void CJobStatusTracker::ClearAll(TNSBitVector* bv)
00182 {
00183     CWriteLockGuard guard(m_Lock);
00184 
00185     for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00186         TNSBitVector& bv1 = *m_StatusStor[i];
00187         if (bv) {
00188             *bv |= bv1;
00189         }
00190         bv1.clear(true);
00191     }
00192 }
00193 
00194 
00195 void CJobStatusTracker::OptimizeMem()
00196 {
00197     for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00198         TNSBitVector& bv = *m_StatusStor[i];
00199         {{
00200             CWriteLockGuard guard(m_Lock);
00201             bv.optimize(0, TNSBitVector::opt_free_0);
00202         }}
00203     }
00204 }
00205 
00206 
00207 void
00208 CJobStatusTracker::SetExactStatusNoLock(unsigned   job_id,
00209                                         TJobStatus status,
00210                                         bool       set_clear)
00211 {
00212     TNSBitVector& bv = *m_StatusStor[(int)status];
00213     bv.set(job_id, set_clear);
00214 
00215     if ((status == CNetScheduleAPI::ePending) &&
00216         (job_id < m_LastPending)) {
00217         m_LastPending = job_id - 1;
00218     }
00219 }
00220 
00221 
00222 TJobStatus 
00223 CJobStatusTracker::ChangeStatus(unsigned   job_id, 
00224                                 TJobStatus status,
00225                                 bool*      updated)
00226 {
00227     CWriteLockGuard guard(m_Lock);
00228     bool status_updated = false;
00229     TJobStatus old_status = CNetScheduleAPI::eJobNotFound;
00230 
00231     switch (status) {
00232 
00233     case CNetScheduleAPI::ePending:
00234         old_status = x_GetStatusNoLock(job_id);
00235         if (old_status == CNetScheduleAPI::eJobNotFound) { // new job
00236             SetExactStatusNoLock(job_id, status, true);
00237             status_updated = true;
00238             break;
00239         }
00240         if (old_status == CNetScheduleAPI::eRunning) {
00241             x_SetClearStatusNoLock(job_id, status, old_status);
00242             status_updated = true;
00243             break;
00244         }
00245         ReportInvalidStatus(job_id, status, old_status);
00246         break;
00247 
00248     case CNetScheduleAPI::eRunning:
00249         old_status = IsStatusNoLock(job_id,
00250                                     CNetScheduleAPI::ePending,
00251                                     CNetScheduleAPI::eCanceled);
00252         if (old_status != CNetScheduleAPI::eJobNotFound) {
00253             if (IsCancelCode(old_status)) {
00254                 break;
00255             }
00256             x_SetClearStatusNoLock(job_id, status, old_status);
00257             status_updated = true;
00258             break;
00259         }
00260 
00261         ReportInvalidStatus(job_id, status, old_status);
00262         break;
00263 
00264     case CNetScheduleAPI::eReturned:
00265         _ASSERT(0);
00266         break;
00267 
00268     case CNetScheduleAPI::eCanceled:
00269         old_status = IsStatusNoLock(job_id,
00270                                     CNetScheduleAPI::ePending,
00271                                     CNetScheduleAPI::eRunning);
00272         if (old_status != CNetScheduleAPI::eJobNotFound) {
00273             x_SetClearStatusNoLock(job_id, status, old_status);
00274             status_updated = true;
00275             break;
00276         }
00277         // in this case (failed, done) we just do nothing.
00278         old_status = CNetScheduleAPI::eCanceled;
00279         break;
00280 
00281     case CNetScheduleAPI::eFailed:
00282         old_status = IsStatusNoLock(job_id,
00283                                     CNetScheduleAPI::eRunning);
00284         if (old_status != CNetScheduleAPI::eJobNotFound) {
00285             x_SetClearStatusNoLock(job_id, status, old_status);
00286             status_updated = true;
00287             break;
00288         }
00289 
00290         old_status = CNetScheduleAPI::eFailed;
00291         break;
00292 
00293     case CNetScheduleAPI::eDone:
00294 
00295         old_status = IsStatusNoLock(job_id,
00296                                     CNetScheduleAPI::eRunning,
00297                                     CNetScheduleAPI::ePending);
00298         if (old_status != CNetScheduleAPI::eJobNotFound) {
00299             x_SetClearStatusNoLock(job_id, status, old_status);
00300             status_updated = true;
00301             IncDoneJobs();
00302             break;
00303         }
00304         old_status = IsStatusNoLock(job_id,
00305                                     CNetScheduleAPI::eCanceled,
00306                                     CNetScheduleAPI::eFailed);
00307         if (IsCancelCode(old_status)) {
00308             break;
00309         }
00310         old_status = x_GetStatusNoLock(job_id);
00311         if (old_status == CNetScheduleAPI::eDone) {
00312             break;
00313         }
00314         ReportInvalidStatus(job_id, status, old_status);
00315         break;
00316 
00317     default:
00318         _ASSERT(0);
00319     }
00320 
00321     if (updated) *updated = status_updated;
00322     return old_status;
00323 }
00324 
00325 
00326 void
00327 CJobStatusTracker::AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
00328 {
00329     CWriteLockGuard guard(m_Lock);
00330 
00331     TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00332     bv.set_range(job_id_from, job_id_to);
00333 }
00334 
00335 
00336 bool
00337 CJobStatusTracker::GetPendingJobFromSet(TNSBitVector* candidate_set,
00338                                         unsigned* job_id)
00339 {
00340     *job_id = 0;
00341     TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00342 
00343     unsigned candidate_id = 0;
00344     while ( 0 == candidate_id ) {
00345         // STAGE 1: (read lock)
00346         // look for the first pending candidate bit
00347         {{
00348             CReadLockGuard guard(m_Lock);
00349             TNSBitVector::enumerator en(candidate_set->first());
00350             if (!en.valid()) { // no more candidates
00351                 return bv.any();
00352             }
00353             for (; en.valid(); ++en) {
00354                 unsigned id = *en;
00355                 if (bv[id]) { // check if candidate is pending
00356                     candidate_id = id;
00357                     break;
00358                 }
00359             }
00360         }}
00361 
00362         // STAGE 2: (write lock)
00363         // candidate job goes to running status
00364         // set of candidates is corrected to reflect new disposition
00365         // (clear all non-pending candidates)
00366         //
00367         if (candidate_id) {
00368             CWriteLockGuard guard(m_Lock);
00369             // clean the candidate set, to reflect stage 1 scan
00370             candidate_set->set_range(0, candidate_id, false);
00371             if (bv[candidate_id]) { // still pending?
00372                 x_SetClearStatusNoLock(candidate_id,
00373                                        CNetScheduleAPI::eRunning,
00374                                        CNetScheduleAPI::ePending);
00375                 *job_id = candidate_id;
00376                 return true;
00377             } else {
00378                 // somebody picked up this id already
00379                 candidate_id = 0;
00380             }
00381         } else {
00382             // previous step did not pick up a sutable(pending) candidate
00383             // candidate set is dismissed, pending search stopped
00384             candidate_set->clear(true); // clear with memfree
00385             break;
00386         }
00387     } // while
00388 
00389     CReadLockGuard guard(m_Lock);
00390     return bv.any();
00391 }
00392 
00393 
00394 bool
00395 CJobStatusTracker::GetPendingJob(const TNSBitVector& unwanted_jobs,
00396                                  unsigned*           job_id)
00397 {
00398     CWriteLockGuard guard(m_Lock);
00399     *job_id = 0;
00400     TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00401     if (!bv.any())
00402         return false;
00403     TNSBitVector  bv_pending(bv);
00404     bv_pending -= unwanted_jobs;
00405     TNSBitVector::enumerator en(bv_pending.first());
00406     if (!en.valid()) { // no more candidates
00407         return bv.any();
00408     }
00409 
00410     unsigned candidate_id = *en;
00411     x_SetClearStatusNoLock(candidate_id,
00412                             CNetScheduleAPI::eRunning,
00413                             CNetScheduleAPI::ePending);
00414     *job_id = candidate_id;
00415     return true;
00416 }
00417 
00418 
00419 void
00420 CJobStatusTracker::PendingIntersect(TNSBitVector* candidate_set)
00421 {
00422     CReadLockGuard guard(m_Lock);
00423 
00424     TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00425     *candidate_set &= bv;
00426 }
00427 
00428 
00429 void CJobStatusTracker::SwitchJobs(unsigned count,
00430                                    TJobStatus old_status, TJobStatus new_status,
00431                                    TNSBitVector& jobs,
00432                                    const TNSBitVector* unwanted_jobs)
00433 {
00434     CReadLockGuard guard(m_Lock);
00435     TNSBitVector& bv_old = *m_StatusStor[(int) old_status];
00436     TNSBitVector& bv_new = *m_StatusStor[(int) new_status];
00437     TNSBitVector::enumerator en(bv_old.first());
00438     for (unsigned n = 0; en.valid() && n < count; ++en) {
00439         unsigned id = *en;
00440         if (unwanted_jobs && (*unwanted_jobs)[id]) continue;
00441         jobs.set(id);
00442         ++n;
00443     }
00444     bv_old -= jobs;
00445     bv_new |= jobs;
00446 }
00447 
00448 
00449 void CJobStatusTracker::GetAliveJobs(TNSBitVector& ids)
00450 {
00451     CReadLockGuard guard(m_Lock);
00452     for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00453         TNSBitVector& bv = *m_StatusStor[i];
00454         ids |= bv;
00455     }
00456 }
00457 
00458 
00459 bool CJobStatusTracker::AnyPending() const
00460 {
00461     const TNSBitVector& bv =
00462         *m_StatusStor[(int) CNetScheduleAPI::ePending];
00463 
00464     CReadLockGuard guard(m_Lock);
00465     return bv.any();
00466 }
00467 
00468 
00469 unsigned CJobStatusTracker::GetFirstDone() const
00470 {
00471     return GetFirst(CNetScheduleAPI::eDone);
00472 }
00473 
00474 
00475 unsigned 
00476 CJobStatusTracker::GetFirst(TJobStatus status) const
00477 {
00478     const TNSBitVector& bv = *m_StatusStor[(int)status];
00479 
00480     CReadLockGuard guard(m_Lock);
00481     return bv.get_first();
00482 }
00483 
00484 
00485 unsigned 
00486 CJobStatusTracker::GetNext(TJobStatus status, unsigned job_id) const
00487 {
00488     const TNSBitVector& bv = *m_StatusStor[(int)status];
00489 
00490     CReadLockGuard guard(m_Lock);
00491     return bv.get_next(job_id);
00492 }
00493 
00494 
00495 void
00496 CJobStatusTracker::x_SetClearStatusNoLock(unsigned   job_id,
00497                                           TJobStatus status,
00498                                           TJobStatus old_status)
00499 {
00500     SetExactStatusNoLock(job_id, status, true);
00501     SetExactStatusNoLock(job_id, old_status, false);
00502 }
00503 
00504 
00505 void 
00506 CJobStatusTracker::ReportInvalidStatus(unsigned   job_id, 
00507                                        TJobStatus status,
00508                                        TJobStatus old_status)
00509 {
00510     string msg = "Job status cannot be changed. ";
00511     msg += "Old status ";
00512     msg += CNetScheduleAPI::StatusToString(old_status);
00513     msg += ". New status ";
00514     msg += CNetScheduleAPI::StatusToString(status);
00515     NCBI_THROW(CNetScheduleException, eInvalidJobStatus, msg);
00516 }
00517 
00518 
00519 TJobStatus 
00520 CJobStatusTracker::IsStatusNoLock(unsigned job_id, 
00521                                   TJobStatus st1,
00522                                   TJobStatus st2,
00523                                   TJobStatus st3) const
00524 {
00525     if (st1 == CNetScheduleAPI::eJobNotFound) {
00526         return CNetScheduleAPI::eJobNotFound;;
00527     } else {
00528         const TNSBitVector& bv = *m_StatusStor[(int)st1];
00529         if (bv[job_id]) {
00530             return st1;
00531         }
00532     }
00533 
00534     if (st2 == CNetScheduleAPI::eJobNotFound) {
00535         return CNetScheduleAPI::eJobNotFound;;
00536     } else {
00537         const TNSBitVector& bv = *m_StatusStor[(int)st2];
00538         if (bv[job_id]) {
00539             return st2;
00540         }
00541     }
00542 
00543     if (st3 == CNetScheduleAPI::eJobNotFound) {
00544         return CNetScheduleAPI::eJobNotFound;;
00545     } else {
00546         const TNSBitVector& bv = *m_StatusStor[(int)st3];
00547         if (bv[job_id]) {
00548             return st3;
00549         }
00550     }
00551 
00552     return CNetScheduleAPI::eJobNotFound;
00553 }
00554 
00555 
00556 void 
00557 CJobStatusTracker::PrintStatusMatrix(CNcbiOstream& out) const
00558 {
00559     CReadLockGuard guard(m_Lock);
00560     for (size_t i = CNetScheduleAPI::ePending;
00561                 i < m_StatusStor.size(); ++i) {
00562         out << "status:"
00563             << CNetScheduleAPI::StatusToString(TJobStatus(i))
00564             << "\n\n";
00565         const TNSBitVector& bv = *m_StatusStor[i];
00566         TNSBitVector::enumerator en(bv.first());
00567         for (int cnt = 0; en.valid(); ++en, ++cnt) {
00568             out << *en << ", ";
00569             if (cnt % 10 == 0) {
00570                 out << "\n";
00571             }
00572         }
00573         out << "\n\n";
00574         if (!out.good()) break;
00575     }
00576     out << "\n\n";
00577 }
00578 
00579 
00580 void CJobStatusTracker::IncDoneJobs()
00581 {
00582     ++m_DoneCnt;
00583     if (m_DoneCnt == 65535 * 2) {
00584         m_DoneCnt = 0;
00585         {{
00586         TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::eDone];
00587         bv.optimize(0, TNSBitVector::opt_free_01);
00588         }}
00589         {{
00590         TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00591         bv.optimize(0, TNSBitVector::opt_free_0);
00592         }}
00593     }
00594 }
00595 
00596 ///////////////////////////////////////////////////////////////////////
00597 // CNetSchedule_JSGroupGuard
00598 
00599 CNetSchedule_JSGroupGuard::CNetSchedule_JSGroupGuard(
00600     CJobStatusTracker&  strack,
00601     TJobStatus          old_status,
00602     const TNSBitVector& jobs,
00603     TJobStatus          new_status)
00604     : m_Tracker(strack),
00605     m_Commited(false),
00606     m_OldStatus(old_status),
00607     m_Jobs(jobs)
00608 {
00609     if (new_status != CNetScheduleAPI::eJobNotFound) {
00610         CReadLockGuard guard(m_Tracker.m_Lock);
00611         CJobStatusTracker::TStatusStorage sstor = m_Tracker.m_StatusStor;
00612         for (size_t i = 0; i < sstor.size(); ++i) {
00613             TNSBitVector& bv = *sstor[i];
00614             if ((size_t) new_status == i)
00615                 bv |= m_Jobs;
00616             else
00617                 bv -= m_Jobs;
00618         }
00619     }
00620 }
00621 
00622 CNetSchedule_JSGroupGuard::~CNetSchedule_JSGroupGuard()
00623 {
00624     if (m_Commited) return;
00625     CReadLockGuard guard(m_Tracker.m_Lock);
00626     CJobStatusTracker::TStatusStorage sstor = m_Tracker.m_StatusStor;
00627     for (size_t i = 0; i < sstor.size(); ++i) {
00628         TNSBitVector& bv = *sstor[i];
00629         if ((size_t) m_OldStatus == i)
00630             bv |= m_Jobs;
00631         else
00632             bv -= m_Jobs;
00633     }
00634 }
00635 
00636 
00637 ///////////////////////////////////////////////////////////////////////
00638 // CJSGuard
00639 /*
00640 CJSGuard::CJSGuard(CJobStatusTracker& strack,
00641                    unsigned           job_id,
00642                    TJobStatus         status,
00643                    int                timeout_ms)
00644     : m_Tracker(strack),
00645     m_JobId(job_id),
00646     m_NewStatus(status)
00647 {
00648     _ASSERT(job_id);
00649     unsigned cnt = 0; unsigned sleep_ms = 10;
00650     while (true) {
00651         {{
00652             CWriteLockGuard guard(m_Tracker.m_Lock);
00653             if (!m_Tracker.m_UsedIds[job_id]) {
00654                 m_OldStatus = (TJobStatus)
00655                     m_Tracker.x_GetStatusNoLock(job_id);
00656                 m_Tracker.m_UsedIds.set(job_id);
00657                 break;
00658             }
00659         }}
00660         cnt += sleep_ms;
00661         if (timeout_ms >= 0 && cnt > (unsigned) timeout_ms) {
00662             NCBI_THROW(CNetServiceException,
00663                     eTimeout, "Failed to lock object");
00664         }
00665         SleepMilliSec(sleep_ms);
00666     } // while
00667 }
00668 */
00669 
00670 END_NCBI_SCOPE
00671 
00672 

Generated on Wed Dec 9 04:08:48 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Wed Dec 09 08:17:54 2009 by modify_doxy.py rev. 173732