00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <ncbi_pch.hpp>
00032 #include <corelib/ncbi_system.hpp>
00033
00034 #include <connect/services/netschedule_api.hpp>
00035 #include <util/bitset/bmalgo.h>
00036
00037 #include "job_status.hpp"
00038
00039 BEGIN_NCBI_SCOPE
00040
00041
00042 CJobStatusTracker::CJobStatusTracker()
00043 : m_LastPending(0),
00044 m_DoneCnt(0)
00045 {
00046 for (int i = 0; i < CNetScheduleAPI::eLastStatus; ++i) {
00047 TNSBitVector* bv;
00048 bv = new TNSBitVector(bm::BM_GAP);
00049 m_StatusStor.push_back(bv);
00050 }
00051 }
00052
00053
00054 CJobStatusTracker::~CJobStatusTracker()
00055 {
00056 for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00057 TNSBitVector* bv = m_StatusStor[i];
00058 delete bv;
00059 }
00060 }
00061
00062
00063 TJobStatus
00064 CJobStatusTracker::GetStatus(unsigned job_id) const
00065 {
00066 CReadLockGuard guard(m_Lock);
00067 return x_GetStatusNoLock(job_id);
00068 }
00069
00070
00071 TJobStatus
00072 CJobStatusTracker::x_GetStatusNoLock(unsigned job_id) const
00073 {
00074 for (int i = m_StatusStor.size()-1; i >= 0; --i) {
00075 const TNSBitVector& bv = *m_StatusStor[i];
00076 bool b = bv[job_id];
00077 if (b) return TJobStatus(i);
00078 }
00079 return CNetScheduleAPI::eJobNotFound;
00080 }
00081
00082
00083 unsigned
00084 CJobStatusTracker::CountStatus(TJobStatus status) const
00085 {
00086 CReadLockGuard guard(m_Lock);
00087 return m_StatusStor[(int)status]->count();
00088 }
00089
00090
00091 void CJobStatusTracker::CountStatus(TStatusSummaryMap* status_map,
00092 const TNSBitVector* candidate_set)
00093 {
00094 _ASSERT(status_map);
00095 status_map->clear();
00096
00097 CReadLockGuard guard(m_Lock);
00098
00099 for (size_t i = 0; i < m_StatusStor.size(); ++i) {
00100 const TNSBitVector& bv = *m_StatusStor[i];
00101 unsigned cnt;
00102 if (candidate_set) {
00103 cnt = bm::count_and(bv, *candidate_set);
00104 } else {
00105 cnt = bv.count();
00106 }
00107 (*status_map)[(TJobStatus) i] = cnt;
00108 }
00109 }
00110
00111
00112 unsigned CJobStatusTracker::Count(void)
00113 {
00114 CReadLockGuard guard(m_Lock);
00115
00116 unsigned cnt = 0;
00117 for (size_t i = 0; i < m_StatusStor.size(); ++i) {
00118 const TNSBitVector& bv = *m_StatusStor[i];
00119 cnt += bv.count();
00120 }
00121 return cnt;
00122 }
00123
00124
00125 void
00126 CJobStatusTracker::StatusStatistics(TJobStatus status,
00127 TNSBitVector::statistics* st) const
00128 {
00129 _ASSERT(st);
00130 CReadLockGuard guard(m_Lock);
00131
00132 const TNSBitVector& bv = *m_StatusStor[(int)status];
00133 bv.calc_stat(st);
00134
00135 }
00136
00137
00138 void CJobStatusTracker::StatusSnapshot(TJobStatus status,
00139 TNSBitVector* bv) const
00140 {
00141 _ASSERT(bv);
00142 CReadLockGuard guard(m_Lock);
00143
00144 const TNSBitVector& bv_s = *m_StatusStor[(int)status];
00145 *bv |= bv_s;
00146 }
00147
00148
00149 void
00150 CJobStatusTracker::SetStatus(unsigned job_id,
00151 TJobStatus status)
00152 {
00153 CWriteLockGuard guard(m_Lock);
00154
00155 TJobStatus old_status = TJobStatus(-1);
00156 for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00157 TNSBitVector& bv = *m_StatusStor[i];
00158 if (bv[job_id]) {
00159 if (old_status != TJobStatus(-1))
00160 ERR_POST("State matrix was damaged, "
00161 "more than one status active for job " << job_id);
00162 old_status = TJobStatus(i);
00163 }
00164 bv.set(job_id, (int)status == (int)i);
00165 }
00166 if (old_status == CNetScheduleAPI::eDone &&
00167 status == CNetScheduleAPI::ePending)
00168 ERR_POST("Illegal status change from Done to Pending for job " <<
00169 job_id);
00170 if (status == CNetScheduleAPI::eDone) {
00171 IncDoneJobs();
00172 }
00173 }
00174
00175 void CJobStatusTracker::Erase(unsigned job_id)
00176 {
00177 SetStatus(job_id, CNetScheduleAPI::eJobNotFound);
00178 }
00179
00180
00181 void CJobStatusTracker::ClearAll(TNSBitVector* bv)
00182 {
00183 CWriteLockGuard guard(m_Lock);
00184
00185 for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00186 TNSBitVector& bv1 = *m_StatusStor[i];
00187 if (bv) {
00188 *bv |= bv1;
00189 }
00190 bv1.clear(true);
00191 }
00192 }
00193
00194
00195 void CJobStatusTracker::OptimizeMem()
00196 {
00197 for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00198 TNSBitVector& bv = *m_StatusStor[i];
00199 {{
00200 CWriteLockGuard guard(m_Lock);
00201 bv.optimize(0, TNSBitVector::opt_free_0);
00202 }}
00203 }
00204 }
00205
00206
00207 void
00208 CJobStatusTracker::SetExactStatusNoLock(unsigned job_id,
00209 TJobStatus status,
00210 bool set_clear)
00211 {
00212 TNSBitVector& bv = *m_StatusStor[(int)status];
00213 bv.set(job_id, set_clear);
00214
00215 if ((status == CNetScheduleAPI::ePending) &&
00216 (job_id < m_LastPending)) {
00217 m_LastPending = job_id - 1;
00218 }
00219 }
00220
00221
00222 TJobStatus
00223 CJobStatusTracker::ChangeStatus(unsigned job_id,
00224 TJobStatus status,
00225 bool* updated)
00226 {
00227 CWriteLockGuard guard(m_Lock);
00228 bool status_updated = false;
00229 TJobStatus old_status = CNetScheduleAPI::eJobNotFound;
00230
00231 switch (status) {
00232
00233 case CNetScheduleAPI::ePending:
00234 old_status = x_GetStatusNoLock(job_id);
00235 if (old_status == CNetScheduleAPI::eJobNotFound) {
00236 SetExactStatusNoLock(job_id, status, true);
00237 status_updated = true;
00238 break;
00239 }
00240 if (old_status == CNetScheduleAPI::eRunning) {
00241 x_SetClearStatusNoLock(job_id, status, old_status);
00242 status_updated = true;
00243 break;
00244 }
00245 ReportInvalidStatus(job_id, status, old_status);
00246 break;
00247
00248 case CNetScheduleAPI::eRunning:
00249 old_status = IsStatusNoLock(job_id,
00250 CNetScheduleAPI::ePending,
00251 CNetScheduleAPI::eCanceled);
00252 if (old_status != CNetScheduleAPI::eJobNotFound) {
00253 if (IsCancelCode(old_status)) {
00254 break;
00255 }
00256 x_SetClearStatusNoLock(job_id, status, old_status);
00257 status_updated = true;
00258 break;
00259 }
00260
00261 ReportInvalidStatus(job_id, status, old_status);
00262 break;
00263
00264 case CNetScheduleAPI::eReturned:
00265 _ASSERT(0);
00266 break;
00267
00268 case CNetScheduleAPI::eCanceled:
00269 old_status = IsStatusNoLock(job_id,
00270 CNetScheduleAPI::ePending,
00271 CNetScheduleAPI::eRunning);
00272 if (old_status != CNetScheduleAPI::eJobNotFound) {
00273 x_SetClearStatusNoLock(job_id, status, old_status);
00274 status_updated = true;
00275 break;
00276 }
00277
00278 old_status = CNetScheduleAPI::eCanceled;
00279 break;
00280
00281 case CNetScheduleAPI::eFailed:
00282 old_status = IsStatusNoLock(job_id,
00283 CNetScheduleAPI::eRunning);
00284 if (old_status != CNetScheduleAPI::eJobNotFound) {
00285 x_SetClearStatusNoLock(job_id, status, old_status);
00286 status_updated = true;
00287 break;
00288 }
00289
00290 old_status = CNetScheduleAPI::eFailed;
00291 break;
00292
00293 case CNetScheduleAPI::eDone:
00294
00295 old_status = IsStatusNoLock(job_id,
00296 CNetScheduleAPI::eRunning,
00297 CNetScheduleAPI::ePending);
00298 if (old_status != CNetScheduleAPI::eJobNotFound) {
00299 x_SetClearStatusNoLock(job_id, status, old_status);
00300 status_updated = true;
00301 IncDoneJobs();
00302 break;
00303 }
00304 old_status = IsStatusNoLock(job_id,
00305 CNetScheduleAPI::eCanceled,
00306 CNetScheduleAPI::eFailed);
00307 if (IsCancelCode(old_status)) {
00308 break;
00309 }
00310 old_status = x_GetStatusNoLock(job_id);
00311 if (old_status == CNetScheduleAPI::eDone) {
00312 break;
00313 }
00314 ReportInvalidStatus(job_id, status, old_status);
00315 break;
00316
00317 default:
00318 _ASSERT(0);
00319 }
00320
00321 if (updated) *updated = status_updated;
00322 return old_status;
00323 }
00324
00325
00326 void
00327 CJobStatusTracker::AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
00328 {
00329 CWriteLockGuard guard(m_Lock);
00330
00331 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00332 bv.set_range(job_id_from, job_id_to);
00333 }
00334
00335
00336 bool
00337 CJobStatusTracker::GetPendingJobFromSet(TNSBitVector* candidate_set,
00338 unsigned* job_id)
00339 {
00340 *job_id = 0;
00341 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00342
00343 unsigned candidate_id = 0;
00344 while ( 0 == candidate_id ) {
00345
00346
00347 {{
00348 CReadLockGuard guard(m_Lock);
00349 TNSBitVector::enumerator en(candidate_set->first());
00350 if (!en.valid()) {
00351 return bv.any();
00352 }
00353 for (; en.valid(); ++en) {
00354 unsigned id = *en;
00355 if (bv[id]) {
00356 candidate_id = id;
00357 break;
00358 }
00359 }
00360 }}
00361
00362
00363
00364
00365
00366
00367 if (candidate_id) {
00368 CWriteLockGuard guard(m_Lock);
00369
00370 candidate_set->set_range(0, candidate_id, false);
00371 if (bv[candidate_id]) {
00372 x_SetClearStatusNoLock(candidate_id,
00373 CNetScheduleAPI::eRunning,
00374 CNetScheduleAPI::ePending);
00375 *job_id = candidate_id;
00376 return true;
00377 } else {
00378
00379 candidate_id = 0;
00380 }
00381 } else {
00382
00383
00384 candidate_set->clear(true);
00385 break;
00386 }
00387 }
00388
00389 CReadLockGuard guard(m_Lock);
00390 return bv.any();
00391 }
00392
00393
00394 bool
00395 CJobStatusTracker::GetPendingJob(const TNSBitVector& unwanted_jobs,
00396 unsigned* job_id)
00397 {
00398 CWriteLockGuard guard(m_Lock);
00399 *job_id = 0;
00400 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00401 if (!bv.any())
00402 return false;
00403 TNSBitVector bv_pending(bv);
00404 bv_pending -= unwanted_jobs;
00405 TNSBitVector::enumerator en(bv_pending.first());
00406 if (!en.valid()) {
00407 return bv.any();
00408 }
00409
00410 unsigned candidate_id = *en;
00411 x_SetClearStatusNoLock(candidate_id,
00412 CNetScheduleAPI::eRunning,
00413 CNetScheduleAPI::ePending);
00414 *job_id = candidate_id;
00415 return true;
00416 }
00417
00418
00419 void
00420 CJobStatusTracker::PendingIntersect(TNSBitVector* candidate_set)
00421 {
00422 CReadLockGuard guard(m_Lock);
00423
00424 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00425 *candidate_set &= bv;
00426 }
00427
00428
00429 void CJobStatusTracker::SwitchJobs(unsigned count,
00430 TJobStatus old_status, TJobStatus new_status,
00431 TNSBitVector& jobs,
00432 const TNSBitVector* unwanted_jobs)
00433 {
00434 CReadLockGuard guard(m_Lock);
00435 TNSBitVector& bv_old = *m_StatusStor[(int) old_status];
00436 TNSBitVector& bv_new = *m_StatusStor[(int) new_status];
00437 TNSBitVector::enumerator en(bv_old.first());
00438 for (unsigned n = 0; en.valid() && n < count; ++en) {
00439 unsigned id = *en;
00440 if (unwanted_jobs && (*unwanted_jobs)[id]) continue;
00441 jobs.set(id);
00442 ++n;
00443 }
00444 bv_old -= jobs;
00445 bv_new |= jobs;
00446 }
00447
00448
00449 void CJobStatusTracker::GetAliveJobs(TNSBitVector& ids)
00450 {
00451 CReadLockGuard guard(m_Lock);
00452 for (TStatusStorage::size_type i = 0; i < m_StatusStor.size(); ++i) {
00453 TNSBitVector& bv = *m_StatusStor[i];
00454 ids |= bv;
00455 }
00456 }
00457
00458
00459 bool CJobStatusTracker::AnyPending() const
00460 {
00461 const TNSBitVector& bv =
00462 *m_StatusStor[(int) CNetScheduleAPI::ePending];
00463
00464 CReadLockGuard guard(m_Lock);
00465 return bv.any();
00466 }
00467
00468
00469 unsigned CJobStatusTracker::GetFirstDone() const
00470 {
00471 return GetFirst(CNetScheduleAPI::eDone);
00472 }
00473
00474
00475 unsigned
00476 CJobStatusTracker::GetFirst(TJobStatus status) const
00477 {
00478 const TNSBitVector& bv = *m_StatusStor[(int)status];
00479
00480 CReadLockGuard guard(m_Lock);
00481 return bv.get_first();
00482 }
00483
00484
00485 unsigned
00486 CJobStatusTracker::GetNext(TJobStatus status, unsigned job_id) const
00487 {
00488 const TNSBitVector& bv = *m_StatusStor[(int)status];
00489
00490 CReadLockGuard guard(m_Lock);
00491 return bv.get_next(job_id);
00492 }
00493
00494
00495 void
00496 CJobStatusTracker::x_SetClearStatusNoLock(unsigned job_id,
00497 TJobStatus status,
00498 TJobStatus old_status)
00499 {
00500 SetExactStatusNoLock(job_id, status, true);
00501 SetExactStatusNoLock(job_id, old_status, false);
00502 }
00503
00504
00505 void
00506 CJobStatusTracker::ReportInvalidStatus(unsigned job_id,
00507 TJobStatus status,
00508 TJobStatus old_status)
00509 {
00510 string msg = "Job status cannot be changed. ";
00511 msg += "Old status ";
00512 msg += CNetScheduleAPI::StatusToString(old_status);
00513 msg += ". New status ";
00514 msg += CNetScheduleAPI::StatusToString(status);
00515 NCBI_THROW(CNetScheduleException, eInvalidJobStatus, msg);
00516 }
00517
00518
00519 TJobStatus
00520 CJobStatusTracker::IsStatusNoLock(unsigned job_id,
00521 TJobStatus st1,
00522 TJobStatus st2,
00523 TJobStatus st3) const
00524 {
00525 if (st1 == CNetScheduleAPI::eJobNotFound) {
00526 return CNetScheduleAPI::eJobNotFound;;
00527 } else {
00528 const TNSBitVector& bv = *m_StatusStor[(int)st1];
00529 if (bv[job_id]) {
00530 return st1;
00531 }
00532 }
00533
00534 if (st2 == CNetScheduleAPI::eJobNotFound) {
00535 return CNetScheduleAPI::eJobNotFound;;
00536 } else {
00537 const TNSBitVector& bv = *m_StatusStor[(int)st2];
00538 if (bv[job_id]) {
00539 return st2;
00540 }
00541 }
00542
00543 if (st3 == CNetScheduleAPI::eJobNotFound) {
00544 return CNetScheduleAPI::eJobNotFound;;
00545 } else {
00546 const TNSBitVector& bv = *m_StatusStor[(int)st3];
00547 if (bv[job_id]) {
00548 return st3;
00549 }
00550 }
00551
00552 return CNetScheduleAPI::eJobNotFound;
00553 }
00554
00555
00556 void
00557 CJobStatusTracker::PrintStatusMatrix(CNcbiOstream& out) const
00558 {
00559 CReadLockGuard guard(m_Lock);
00560 for (size_t i = CNetScheduleAPI::ePending;
00561 i < m_StatusStor.size(); ++i) {
00562 out << "status:"
00563 << CNetScheduleAPI::StatusToString(TJobStatus(i))
00564 << "\n\n";
00565 const TNSBitVector& bv = *m_StatusStor[i];
00566 TNSBitVector::enumerator en(bv.first());
00567 for (int cnt = 0; en.valid(); ++en, ++cnt) {
00568 out << *en << ", ";
00569 if (cnt % 10 == 0) {
00570 out << "\n";
00571 }
00572 }
00573 out << "\n\n";
00574 if (!out.good()) break;
00575 }
00576 out << "\n\n";
00577 }
00578
00579
00580 void CJobStatusTracker::IncDoneJobs()
00581 {
00582 ++m_DoneCnt;
00583 if (m_DoneCnt == 65535 * 2) {
00584 m_DoneCnt = 0;
00585 {{
00586 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::eDone];
00587 bv.optimize(0, TNSBitVector::opt_free_01);
00588 }}
00589 {{
00590 TNSBitVector& bv = *m_StatusStor[(int) CNetScheduleAPI::ePending];
00591 bv.optimize(0, TNSBitVector::opt_free_0);
00592 }}
00593 }
00594 }
00595
00596
00597
00598
00599 CNetSchedule_JSGroupGuard::CNetSchedule_JSGroupGuard(
00600 CJobStatusTracker& strack,
00601 TJobStatus old_status,
00602 const TNSBitVector& jobs,
00603 TJobStatus new_status)
00604 : m_Tracker(strack),
00605 m_Commited(false),
00606 m_OldStatus(old_status),
00607 m_Jobs(jobs)
00608 {
00609 if (new_status != CNetScheduleAPI::eJobNotFound) {
00610 CReadLockGuard guard(m_Tracker.m_Lock);
00611 CJobStatusTracker::TStatusStorage sstor = m_Tracker.m_StatusStor;
00612 for (size_t i = 0; i < sstor.size(); ++i) {
00613 TNSBitVector& bv = *sstor[i];
00614 if ((size_t) new_status == i)
00615 bv |= m_Jobs;
00616 else
00617 bv -= m_Jobs;
00618 }
00619 }
00620 }
00621
00622 CNetSchedule_JSGroupGuard::~CNetSchedule_JSGroupGuard()
00623 {
00624 if (m_Commited) return;
00625 CReadLockGuard guard(m_Tracker.m_Lock);
00626 CJobStatusTracker::TStatusStorage sstor = m_Tracker.m_StatusStor;
00627 for (size_t i = 0; i < sstor.size(); ++i) {
00628 TNSBitVector& bv = *sstor[i];
00629 if ((size_t) m_OldStatus == i)
00630 bv |= m_Jobs;
00631 else
00632 bv -= m_Jobs;
00633 }
00634 }
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670 END_NCBI_SCOPE
00671
00672