NCBI C++ ToolKit
ns_queue.cpp
Go to the documentation of this file.
00001 /*  $Id: ns_queue.cpp 54574 2012-05-22 16:50:47Z satskyse $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Victor Joukov
00027  *
00028  * File Description:
00029  *   NetSchedule queue structure and parameters
00030  */
00031 #include <ncbi_pch.hpp>
00032 
00033 #include "ns_queue.hpp"
00034 #include "ns_queue_param_accessor.hpp"
00035 #include "background_host.hpp"
00036 #include "ns_util.hpp"
00037 #include "ns_format.hpp"
00038 #include "ns_server.hpp"
00039 #include "ns_handler.hpp"
00040 
00041 #include <corelib/ncbi_system.hpp> // SleepMilliSec
00042 #include <corelib/request_ctx.hpp>
00043 #include <db/bdb/bdb_trans.hpp>
00044 #include <util/qparse/query_parse.hpp>
00045 #include <util/qparse/query_exec.hpp>
00046 #include <util/qparse/query_exec_bv.hpp>
00047 #include <util/bitset/bmalgo.h>
00048 
00049 
00050 BEGIN_NCBI_SCOPE
00051 
00052 
00053 
00054 //////////////////////////////////////////////////////////////////////////
00055 CQueueEnumCursor::CQueueEnumCursor(CQueue *  queue, unsigned int  start_after)
00056   : CBDB_FileCursor(queue->m_QueueDbBlock->job_db)
00057 {
00058     SetCondition(CBDB_FileCursor::eGT);
00059     From << start_after;
00060 }
00061 
00062 
00063 //////////////////////////////////////////////////////////////////////////
00064 // CQueue
00065 
00066 
// Reservation step for the persisted job id counter, used together with
// m_SavedId. m_SavedId is stored in the DB and is the value a restarted
// NetSchedule starts numbering jobs from; reserving ids in chunks of
// s_ReserveDelta avoids updating the DB too often.
static const unsigned int       s_ReserveDelta(10000);
00071 
00072 
00073 CQueue::CQueue(CRequestExecutor&     executor,
00074                const string&         queue_name,
00075                const string&         qclass_name,
00076                TQueueKind            queue_kind,
00077                CNetScheduleServer *  server)
00078   :
00079     m_RunTimeLine(NULL),
00080     m_DeleteDatabase(false),
00081     m_Executor(executor),
00082     m_QueueName(queue_name),
00083     m_QueueClass(qclass_name),
00084     m_Kind(queue_kind),
00085     m_QueueDbBlock(0),
00086 
00087     m_BecameEmpty(-1),
00088 
00089     m_LastId(0),
00090     m_SavedId(s_ReserveDelta),
00091 
00092     m_ParamLock(CRWLock::fFavorWriters),
00093     m_Timeout(3600),
00094     m_RunTimeout(3600),
00095     m_RunTimeoutPrecision(-1),
00096     m_FailedRetries(0),
00097     m_BlacklistTime(0),
00098     m_EmptyLifetime(0),
00099     m_MaxInputSize(kNetScheduleMaxDBDataSize),
00100     m_MaxOutputSize(kNetScheduleMaxDBDataSize),
00101     m_DenyAccessViolations(false),
00102     m_WNodeTimeout(40),
00103     m_KeyGenerator(server->GetHost(), server->GetPort()),
00104     m_Log(server->IsLog()),
00105     m_LogBatchEachJob(server->IsLogBatchEachJob()),
00106     m_RefuseSubmits(false),
00107     m_LastAffinityGC(0),
00108     m_MaxAffinities(server->GetMaxAffinities()),
00109     m_AffinityHighMarkPercentage(server->GetAffinityHighMarkPercentage()),
00110     m_AffinityLowMarkPercentage(server->GetAffinityLowMarkPercentage()),
00111     m_AffinityHighRemoval(server->GetAffinityHighRemoval()),
00112     m_AffinityLowRemoval(server->GetAffinityLowRemoval()),
00113     m_AffinityDirtPercentage(server->GetAffinityDirtPercentage()),
00114     m_NotificationsList(server->GetNodeID(), queue_name),
00115     m_NotifHifreqInterval(0.1),
00116     m_NotifHifreqPeriod(5),
00117     m_NotifLofreqMult(50),
00118     m_DumpBufferSize(100)
00119 {
00120     _ASSERT(!queue_name.empty());
00121 }
00122 
00123 
00124 CQueue::~CQueue()
00125 {
00126     delete m_RunTimeLine;
00127     Detach();
00128 }
00129 
00130 
00131 void CQueue::Attach(SQueueDbBlock* block)
00132 {
00133     Detach();
00134     m_QueueDbBlock = block;
00135     m_AffinityRegistry.Attach(&m_QueueDbBlock->aff_dict_db);
00136     m_GroupRegistry.Attach(&m_QueueDbBlock->group_dict_db);
00137 
00138     // Here we have a db, so we can read the counter value we should start from
00139     m_LastId = x_ReadStartFromCounter();
00140     m_SavedId = m_LastId + s_ReserveDelta;
00141     if (m_SavedId < m_LastId) {
00142         // Overflow
00143         m_LastId = 0;
00144         m_SavedId = s_ReserveDelta;
00145     }
00146     x_UpdateStartFromCounter();
00147 }
00148 
00149 
00150 class CTruncateRequest : public CStdRequest
00151 {
00152 public:
00153     CTruncateRequest(SQueueDbBlock* qdbblock)
00154         : m_QueueDbBlock(qdbblock)
00155     {}
00156 
00157     virtual void Process()
00158     {
00159         // See CQueue::Detach why we can do this without locking.
00160         CStopWatch      sw(CStopWatch::eStart);
00161         m_QueueDbBlock->Truncate();
00162         m_QueueDbBlock->allocated = false;
00163         LOG_POST(Message << Warning << "Clean up of db block "
00164                          << m_QueueDbBlock->pos << " complete, "
00165                          << sw.Elapsed() << " elapsed");
00166     }
00167 private:
00168     SQueueDbBlock* m_QueueDbBlock;
00169 
00170 };
00171 
00172 
00173 void CQueue::Detach()
00174 {
00175     // We are here have synchronized access to m_QueueDbBlock without mutex
00176     // because we are here only when the last reference to CQueue is
00177     // destroyed. So as long m_QueueDbBlock->allocated is true it cannot
00178     // be allocated again and thus cannot be accessed as well.
00179     // As soon as we're done with detaching (here) or truncating (in separate
00180     // request, submitted for execution to the server thread pool), we can
00181     // set m_QueueDbBlock->allocated to false. Boolean write is atomic by
00182     // definition and test-and-set is executed from synchronized code in
00183     // CQueueDbBlockArray::Allocate.
00184     m_AffinityRegistry.Detach();
00185     if (!m_QueueDbBlock)
00186         return;
00187 
00188     if (m_DeleteDatabase) {
00189         CRef<CStdRequest> request(new CTruncateRequest(m_QueueDbBlock));
00190         m_Executor.SubmitRequest(request);
00191         m_DeleteDatabase = false;
00192     } else
00193         m_QueueDbBlock->allocated = false;
00194     m_QueueDbBlock = 0;
00195 }
00196 
00197 
00198 void CQueue::SetParameters(const SQueueParameters &  params)
00199 {
00200     // When modifying this, modify all places marked with PARAMETERS
00201     CWriteLockGuard     guard(m_ParamLock);
00202 
00203     m_Timeout    = params.timeout;
00204     m_RunTimeout = params.run_timeout;
00205     if (params.run_timeout && !m_RunTimeLine) {
00206         // One time only. Precision can not be reset.
00207         m_RunTimeLine =
00208             new CJobTimeLine(params.run_timeout_precision, 0);
00209         m_RunTimeoutPrecision = params.run_timeout_precision;
00210     }
00211 
00212     m_FailedRetries        = params.failed_retries;
00213     m_BlacklistTime        = params.blacklist_time;
00214     m_EmptyLifetime        = params.empty_lifetime;
00215     m_MaxInputSize         = params.max_input_size;
00216     m_MaxOutputSize        = params.max_output_size;
00217     m_DenyAccessViolations = params.deny_access_violations;
00218     m_WNodeTimeout         = params.wnode_timeout;
00219     m_NotifHifreqInterval  = params.notif_hifreq_interval;
00220     m_NotifHifreqPeriod    = params.notif_hifreq_period;
00221     m_NotifLofreqMult      = params.notif_lofreq_mult;
00222     m_DumpBufferSize       = params.dump_buffer_size;
00223 
00224     // program version control
00225     m_ProgramVersionList.Clear();
00226     if (!params.program_name.empty()) {
00227         m_ProgramVersionList.AddClientInfo(params.program_name);
00228     }
00229     m_SubmHosts.SetHosts(params.subm_hosts);
00230     m_WnodeHosts.SetHosts(params.wnode_hosts);
00231 }
00232 
00233 
00234 CQueue::TParameterList CQueue::GetParameters() const
00235 {
00236     TParameterList          parameters;
00237     CQueueParamAccessor     qp(*this);
00238     unsigned                nParams = qp.GetNumParams();
00239 
00240     for (unsigned n = 0; n < nParams; ++n) {
00241         parameters.push_back(
00242             pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
00243     }
00244     return parameters;
00245 }
00246 
00247 
00248 void CQueue::GetMaxIOSizes(unsigned int &  max_input_size,
00249                            unsigned int &  max_output_size) const
00250 {
00251     CQueueParamAccessor     qp(*this);
00252 
00253     max_input_size = qp.GetMaxInputSize();
00254     max_output_size = qp.GetMaxOutputSize();
00255     return;
00256 }
00257 
00258 
00259 unsigned CQueue::LoadStatusMatrix()
00260 {
00261     // Load the known affinities and groups
00262     m_AffinityRegistry.LoadAffinityDictionary();
00263     m_GroupRegistry.LoadGroupDictionary();
00264 
00265     // scan the queue, load the state machine from DB
00266     CBDB_FileCursor     cur(m_QueueDbBlock->job_db);
00267 
00268     cur.InitMultiFetch(1024*1024);
00269     cur.SetCondition(CBDB_FileCursor::eGE);
00270     cur.From << 0;
00271 
00272     unsigned int    recs = 0;
00273 
00274     for (; cur.Fetch() == eBDB_Ok; ) {
00275         unsigned int    job_id = m_QueueDbBlock->job_db.id;
00276         unsigned int    group_id = m_QueueDbBlock->job_db.group_id;
00277         unsigned int    aff_id = m_QueueDbBlock->job_db.aff_id;
00278         time_t          last_touch = m_QueueDbBlock->job_db.last_touch;
00279         time_t          job_timeout = m_QueueDbBlock->job_db.timeout;
00280         time_t          job_run_timeout = m_QueueDbBlock->job_db.run_timeout;
00281         TJobStatus      status = TJobStatus(static_cast<int>(m_QueueDbBlock->job_db.status));
00282 
00283         m_StatusTracker.SetExactStatusNoLock(job_id, status, true);
00284 
00285         if ((status == CNetScheduleAPI::eRunning ||
00286              status == CNetScheduleAPI::eReading) &&
00287             m_RunTimeLine) {
00288             // Add object to the first available slot;
00289             // it is going to be rescheduled or dropped
00290             // in the background control thread
00291             // We can use time line without lock here because
00292             // the queue is still in single-use mode while
00293             // being loaded.
00294             m_RunTimeLine->AddObject(m_RunTimeLine->GetHead(), job_id);
00295         }
00296 
00297         // Register the job for the affinity if so
00298         if (aff_id != 0)
00299             m_AffinityRegistry.AddJobToAffinity(job_id, aff_id);
00300 
00301         // Register the job in the group registry
00302         if (group_id != 0)
00303             m_GroupRegistry.AddJob(group_id, job_id);
00304 
00305         // Register the loaded job with the garbage collector
00306         m_GCRegistry.RegisterJob(job_id, aff_id, group_id,
00307                                  GetJobExpirationTime(last_touch, status,
00308                                                       job_timeout,
00309                                                       job_run_timeout,
00310                                                       m_Timeout, m_RunTimeout));
00311 
00312         ++recs;
00313     }
00314 
00315     // Make sure that there are no affinity IDs in the registry for which there
00316     // are no jobs and initialize the next affinity ID counter.
00317     m_AffinityRegistry.FinalizeAffinityDictionaryLoading();
00318 
00319     // Make sure that there are no group IDs in the registry for which there
00320     // are no jobs and initialize the next group ID counter.
00321     m_GroupRegistry.FinalizeGroupDictionaryLoading();
00322 
00323     return recs;
00324 }
00325 
00326 
00327 // Used to log a single job
00328 void CQueue::x_LogSubmit(const CJob &       job,
00329                          const string &     aff,
00330                          const string &     group)
00331 {
00332     if (!m_Log)
00333         return;
00334 
00335     string                  group_token(group);
00336 
00337     if (group_token.empty())
00338         group_token = "n/a";
00339 
00340     CDiagContext_Extra  extra = GetDiagContext().Extra()
00341         .Print("job_key", MakeKey(job.GetId()))
00342         .Print("queue", GetQueueName())
00343         .Print("input", job.GetInput())
00344         .Print("aff", aff)
00345         .Print("group", group_token)
00346         .Print("mask", job.GetMask())
00347         .Print("subm_addr", CSocketAPI::gethostbyaddr(job.GetSubmAddr()))
00348         .Print("subm_notif_port", job.GetSubmNotifPort())
00349         .Print("subm_notif_timeout", job.GetSubmNotifTimeout())
00350         .Print("timeout", Uint8(job.GetTimeout()))
00351         .Print("run_timeout", job.GetRunTimeout())
00352         .Print("progress_msg", job.GetProgressMsg());
00353 
00354     extra.Flush();
00355     return;
00356 }
00357 
00358 
00359 unsigned int  CQueue::Submit(const CNSClientId &  client,
00360                              CJob &               job,
00361                              const string &       aff_token,
00362                              const string &       group)
00363 {
00364     // the only config parameter used here is the max input size so there is no
00365     // need to have a safe parameters accessor.
00366 
00367     if (job.GetInput().size() > m_MaxInputSize)
00368         NCBI_THROW(CNetScheduleException, eDataTooLong, "Input is too long");
00369 
00370     unsigned int    aff_id = 0;
00371     unsigned int    group_id = 0;
00372     time_t          current_time = time(0);
00373     unsigned int    job_id = GetNextId();
00374     CJobEvent &     event = job.AppendEvent();
00375 
00376     job.SetId(job_id);
00377     job.SetPassport(rand());
00378     job.SetLastTouch(current_time);
00379 
00380     event.SetNodeAddr(client.GetAddress());
00381     event.SetStatus(CNetScheduleAPI::ePending);
00382     event.SetEvent(CJobEvent::eSubmit);
00383     event.SetTimestamp(current_time);
00384     event.SetClientNode(client.GetNode());
00385     event.SetClientSession(client.GetSession());
00386 
00387     // Special treatment for system job masks
00388     if (job.GetMask() & CNetScheduleAPI::eOutOfOrder)
00389     {
00390         // NOT IMPLEMENTED YET: put job id into OutOfOrder list.
00391         // The idea is that there can be an urgent job, which
00392         // should be executed before jobs which were submitted
00393         // earlier, e.g. for some administrative purposes. See
00394         // CNetScheduleAPI::EJobMask in file netschedule_api.hpp
00395     }
00396 
00397     // Take the queue lock and start the operation
00398     {{
00399         CFastMutexGuard     guard(m_OperationLock);
00400 
00401         {{
00402             CNSTransaction      transaction(this);
00403 
00404             if (!group.empty()) {
00405                 group_id = m_GroupRegistry.AddJob(group, job_id);
00406                 job.SetGroupId(group_id);
00407             }
00408             if (!aff_token.empty()) {
00409                 aff_id = m_AffinityRegistry.ResolveAffinityToken(aff_token,
00410                                                                  job_id, 0);
00411                 job.SetAffinityId(aff_id);
00412             }
00413             job.Flush(this);
00414 
00415             transaction.Commit();
00416         }}
00417 
00418         m_StatusTracker.AddPendingJob(job_id);
00419 
00420         // Register the job with the client
00421         m_ClientsRegistry.AddToSubmitted(client, 1);
00422 
00423         // Make the decision whether to send or not a notification
00424         m_NotificationsList.Notify(job_id, aff_id,
00425                                    m_ClientsRegistry,
00426                                    m_AffinityRegistry,
00427                                    m_NotifHifreqPeriod);
00428 
00429         m_GCRegistry.RegisterJob(job_id, aff_id, group_id,
00430                                  job.GetExpirationTime(m_Timeout,
00431                                                        m_RunTimeout));
00432     }}
00433 
00434     m_StatisticsCounters.CountSubmit(1);
00435     x_LogSubmit(job, aff_token, group);
00436     return job_id;
00437 }
00438 
00439 
00440 unsigned int  CQueue::SubmitBatch(const CNSClientId &             client,
00441                                   vector< pair<CJob, string> > &  batch,
00442                                   const string &                  group)
00443 {
00444     unsigned int    batch_size = batch.size();
00445     unsigned int    job_id = GetNextJobIdForBatch(batch_size);
00446     TNSBitVector    affinities;
00447 
00448     {{
00449         unsigned int        job_id_cnt = job_id;
00450         time_t              curr_time = time(0);
00451         unsigned int        group_id = 0;
00452         bool                no_aff_jobs = false;
00453 
00454         CFastMutexGuard     guard(m_OperationLock);
00455 
00456         {{
00457             CNSTransaction      transaction(this);
00458 
00459             // This might create a new record in the DB
00460             group_id = m_GroupRegistry.ResolveGroup(group);
00461 
00462             for (size_t  k = 0; k < batch_size; ++k) {
00463 
00464                 CJob &              job = batch[k].first;
00465                 const string &      aff_token = batch[k].second;
00466                 CJobEvent &         event = job.AppendEvent();
00467 
00468                 job.SetId(job_id_cnt);
00469                 job.SetPassport(rand());
00470                 job.SetGroupId(group_id);
00471                 job.SetLastTouch(curr_time);
00472 
00473                 event.SetNodeAddr(client.GetAddress());
00474                 event.SetStatus(CNetScheduleAPI::ePending);
00475                 event.SetEvent(CJobEvent::eBatchSubmit);
00476                 event.SetTimestamp(curr_time);
00477                 event.SetClientNode(client.GetNode());
00478                 event.SetClientSession(client.GetSession());
00479 
00480                 if (!aff_token.empty()) {
00481                     unsigned int    aff_id = m_AffinityRegistry.
00482                                                 ResolveAffinityToken(aff_token,
00483                                                                      job_id_cnt,
00484                                                                      0);
00485 
00486                     job.SetAffinityId(aff_id);
00487                     affinities.set_bit(aff_id, true);
00488                 }
00489                 else
00490                     no_aff_jobs = true;
00491 
00492                 job.Flush(this);
00493                 ++job_id_cnt;
00494             }
00495 
00496             transaction.Commit();
00497         }}
00498 
00499         m_GroupRegistry.AddJobs(group_id, job_id, batch_size);
00500         m_StatusTracker.AddPendingBatch(job_id, job_id + batch_size - 1);
00501         m_ClientsRegistry.AddToSubmitted(client, batch_size);
00502 
00503         // Make a decision whether to notify clients or not
00504         TNSBitVector        jobs;
00505         jobs.set_range(job_id, job_id + batch_size - 1);
00506         m_NotificationsList.Notify(jobs, affinities, no_aff_jobs,
00507                                    m_ClientsRegistry,
00508                                    m_AffinityRegistry,
00509                                    m_NotifHifreqPeriod);
00510 
00511         for (size_t  k = 0; k < batch_size; ++k)
00512             m_GCRegistry.RegisterJob(batch[k].first.GetId(),
00513                                      batch[k].first.GetAffinityId(),
00514                                      group_id,
00515                                      batch[k].first.GetExpirationTime(m_Timeout,
00516                                                                       m_RunTimeout));
00517     }}
00518 
00519     m_StatisticsCounters.CountSubmit(batch_size);
00520     if (m_LogBatchEachJob)
00521         for (size_t  k = 0; k < batch_size; ++k)
00522             x_LogSubmit(batch[k].first, batch[k].second, group);
00523 
00524     return job_id;
00525 }
00526 
00527 
00528 
// Limit on failed attempts of a DB operation that hit a deadlock or an
// out-of-memory BDB error; once this many attempts have failed the caller
// gets CNetScheduleException (eTryAgain).
static const size_t     k_max_dead_locks(10);
00530 
00531 TJobStatus  CQueue::PutResult(const CNSClientId &  client,
00532                               time_t               curr,
00533                               unsigned int         job_id,
00534                               const string &       auth_token,
00535                               int                  ret_code,
00536                               const string *       output)
00537 {
00538     _ASSERT(job_id && output);
00539 
00540     // The only one parameter (max output size) is required for the put
00541     // operation so there is no need to use CQueueParamAccessor
00542 
00543     if (output->size() > m_MaxOutputSize)
00544         NCBI_THROW(CNetScheduleException, eDataTooLong,
00545                    "Output is too long");
00546 
00547     CJob                job;
00548     size_t              dead_locks = 0; // dead lock counter
00549 
00550     for (;;) {
00551         try {
00552             CFastMutexGuard     guard(m_OperationLock);
00553             TJobStatus          old_status = GetJobStatus(job_id);
00554 
00555             if (old_status == CNetScheduleAPI::eDone) {
00556                 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eDone,
00557                                                      CNetScheduleAPI::eDone);
00558                 return old_status;
00559             }
00560 
00561             if (old_status != CNetScheduleAPI::ePending &&
00562                 old_status != CNetScheduleAPI::eRunning)
00563                 return old_status;
00564 
00565             {{
00566                 CNSTransaction      transaction(this);
00567                 x_UpdateDB_PutResultNoLock(job_id, auth_token, curr,
00568                                            ret_code, *output, job,
00569                                            client);
00570                 transaction.Commit();
00571             }}
00572 
00573             m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eDone);
00574             m_ClientsRegistry.ClearExecuting(job_id);
00575 
00576             m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
00577                                                                       m_RunTimeout));
00578 
00579             TimeLineRemove(job_id);
00580 
00581             if (job.ShouldNotify(curr))
00582                 m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
00583                                                     job.GetSubmNotifPort(),
00584                                                     MakeKey(job_id),
00585                                                     job.GetStatus());
00586             return old_status;
00587         }
00588         catch (CBDB_ErrnoException &  ex) {
00589             if (ex.IsDeadLock()) {
00590                 if (++dead_locks < k_max_dead_locks) {
00591                     ERR_POST("DeadLock repeat in CQueue::PutResult");
00592                     SleepMilliSec(100);
00593                     continue;
00594                 }
00595             } else if (ex.IsNoMem()) {
00596                 if (++dead_locks < k_max_dead_locks) {
00597                     ERR_POST("No resource repeat in CQueue::PutResult");
00598                     SleepMilliSec(100);
00599                     continue;
00600                 }
00601             }
00602 
00603             if (ex.IsDeadLock() || ex.IsNoMem()) {
00604                 string  message = "Too many transaction repeats in "
00605                                   "CQueue::PutResult";
00606                 ERR_POST(message);
00607                 NCBI_THROW(CNetScheduleException, eTryAgain, message);
00608             }
00609 
00610             throw;
00611         }
00612     }
00613 }
00614 
00615 
00616 bool  CQueue::GetJobOrWait(const CNSClientId &     client,
00617                            unsigned short          port, // Port the client
00618                                                          // will wait on
00619                            unsigned int            timeout, // If timeout != 0 => WGET
00620                            time_t                  curr,
00621                            const list<string> *    aff_list,
00622                            bool                    wnode_affinity,
00623                            bool                    any_affinity,
00624                            bool                    exclusive_new_affinity,
00625                            bool                    new_format,
00626                            CJob *                  new_job)
00627 {
00628     // We need exactly 1 parameter - m_RunTimeout, so we can access it without
00629     // CQueueParamAccessor
00630 
00631     size_t      dead_locks = 0;                 // dead lock counter
00632 
00633     for (;;) {
00634         try {
00635             TNSBitVector        aff_ids;
00636 
00637             {{
00638                 CFastMutexGuard     guard(m_OperationLock);
00639 
00640                 if (wnode_affinity) {
00641                     // Check first that the were no preferred affinities reset for
00642                     // the client
00643                     if (m_ClientsRegistry.GetAffinityReset(client))
00644                         return false;   // Affinity was reset, so no job for the client
00645                 }
00646 
00647                 if (timeout != 0) {
00648                     // WGET:
00649                     // The affinities has to be resolved straight away, however at this
00650                     // point it is still unknown that the client will wait for them, so
00651                     // the client ID is passed as 0. Later on the client info will be
00652                     // updated in the affinity registry if the client really waits for
00653                     // this affinities.
00654                     CNSTransaction      transaction(this);
00655                     aff_ids = m_AffinityRegistry.ResolveAffinitiesForWaitClient(*aff_list, 0);
00656                     transaction.Commit();
00657                 }
00658                 else
00659                     // GET:
00660                     // No need to create aff records if they are not known
00661                     aff_ids = m_AffinityRegistry.GetAffinityIDs(*aff_list);
00662 
00663                 x_UnregisterGetListener(client, port);
00664             }}
00665 
00666             for (;;) {
00667                 // No lock here to make it possible to pick a job
00668                 // simultaneously from many threads
00669                 x_SJobPick  job_pick = x_FindPendingJob(client, aff_ids,
00670                                                         wnode_affinity,
00671                                                         any_affinity,
00672                                                         exclusive_new_affinity);
00673                 {{
00674                     CFastMutexGuard     guard(m_OperationLock);
00675 
00676                     if (job_pick.job_id == 0) {
00677                         if (timeout != 0) {
00678                             // WGET:
00679                             // There is no job, so the client might need to be registered
00680                             // in the waiting list
00681                             if (port > 0)
00682                                 x_RegisterGetListener(client, port, timeout, aff_ids,
00683                                                       wnode_affinity, any_affinity,
00684                                                       exclusive_new_affinity,
00685                                                       new_format);
00686                         }
00687                         return true;
00688                     }
00689 
00690                     // Check that the job is still Pending
00691                     if (GetJobStatus(job_pick.job_id) !=
00692                             CNetScheduleAPI::ePending)
00693                         continue;   // Try to pick a job again
00694 
00695                     // the job is still pending, check if it was received as
00696                     // with exclusive affinity
00697                     if (job_pick.exclusive && job_pick.aff_id != 0) {
00698                         if (m_ClientsRegistry.IsPreferredByAny(job_pick.aff_id))
00699                             continue;   // Other WN grabbed this affinity already
00700                         m_ClientsRegistry.UpdatePreferredAffinities(client,
00701                                                                     job_pick.aff_id,
00702                                                                     0);
00703                     }
00704 
00705                     if (x_UpdateDB_GetJobNoLock(client, curr,
00706                                                 job_pick.job_id, *new_job)) {
00707                         // The job is not expired and successfully read
00708                         m_StatusTracker.ChangeStatus(this, job_pick.job_id,
00709                                                      CNetScheduleAPI::eRunning);
00710                         m_GCRegistry.UpdateLifetime(job_pick.job_id,
00711                                                     new_job->GetExpirationTime(m_Timeout,
00712                                                                                m_RunTimeout));
00713                         TimeLineAdd(job_pick.job_id, curr + m_RunTimeout);
00714                         m_ClientsRegistry.AddToRunning(client, job_pick.job_id);
00715                         return true;
00716                     }
00717 
00718                     // Reset job_id and try again
00719                     new_job->SetId(0);
00720                 }}
00721             }
00722         }
00723         catch (CBDB_ErrnoException &  ex) {
00724             if (ex.IsDeadLock()) {
00725                 if (++dead_locks < k_max_dead_locks) {
00726                     ERR_POST("DeadLock repeat in CQueue::GetJobOrWait");
00727                     SleepMilliSec(100);
00728                     continue;
00729                 }
00730             } else if (ex.IsNoMem()) {
00731                 if (++dead_locks < k_max_dead_locks) {
00732                     ERR_POST("No resource repeat in CQueue::GetJobOrWait");
00733                     SleepMilliSec(100);
00734                     continue;
00735                 }
00736             }
00737 
00738             if (ex.IsDeadLock() || ex.IsNoMem()) {
00739                 string  message = "Too many transaction repeats in "
00740                                   "CQueue::GetJobOrWait";
00741                 ERR_POST(message);
00742                 NCBI_THROW(CNetScheduleException, eTryAgain, message);
00743             }
00744 
00745             throw;
00746         }
00747     }
00748     return true;
00749 }
00750 
00751 
00752 void  CQueue::CancelWaitGet(const CNSClientId &  client)
00753 {
00754     bool    result;
00755 
00756     {{
00757         CFastMutexGuard     guard(m_OperationLock);
00758 
00759         result = x_UnregisterGetListener(client, 0);
00760     }}
00761 
00762     if (result == false)
00763         LOG_POST(Message << Warning << "Attempt to cancel WGET for the client "
00764                                        "which does not wait anything (node: "
00765                          << client.GetNode() << " session: "
00766                          << client.GetSession() << ")");
00767     return;
00768 }
00769 
00770 
00771 string  CQueue::ChangeAffinity(const CNSClientId &     client,
00772                                const list<string> &    aff_to_add,
00773                                const list<string> &    aff_to_del)
00774 {
00775     // It is guaranteed here that the client is a new style one.
00776     // I.e. it has both client_node and client_session.
00777 
00778     string          msg;    // Warning message for the socket
00779     unsigned int    client_id = client.GetID();
00780     TNSBitVector    current_affinities =
00781                          m_ClientsRegistry.GetPreferredAffinities(client);
00782     TNSBitVector    aff_id_to_add;
00783     TNSBitVector    aff_id_to_del;
00784     bool            any_to_add = false;
00785     bool            any_to_del = false;
00786 
00787     // Identify the affinities which should be deleted
00788     for (list<string>::const_iterator  k(aff_to_del.begin());
00789          k != aff_to_del.end(); ++k) {
00790         unsigned int    aff_id = m_AffinityRegistry.GetIDByToken(*k);
00791 
00792         if (aff_id == 0) {
00793             // The affinity is not known for NS at all
00794             LOG_POST(Message << Warning << "Client '" << client.GetNode()
00795                              << "' deletes unknown affinity '"
00796                              << *k << "'. Ignored.");
00797             if (!msg.empty())
00798                 msg += "; ";
00799             msg += "unknown affinity to delete: " + *k;
00800             continue;
00801         }
00802 
00803         if (current_affinities[aff_id] == false) {
00804             // This a try to delete something which has not been added or
00805             // deleted before.
00806             LOG_POST(Message << Warning << "Client '" << client.GetNode()
00807                              << "' deletes affinity '" << *k
00808                              << "' which is not in the list of the "
00809                                 "preferred client affinities. Ignored.");
00810             if (!msg.empty())
00811                 msg += "; ";
00812             msg += "not registered affinity to delete: " + *k;
00813             continue;
00814         }
00815 
00816         // The affinity will really be deleted
00817         aff_id_to_del.set(aff_id, true);
00818         any_to_del = true;
00819     }
00820 
00821 
00822     // Check that the update of the affinities list will not exceed the limit
00823     // for the max number of affinities per client.
00824     // Note: this is not precise check. There could be non-unique affinities in
00825     // the add list or some of affinities to add could already be in the list.
00826     // The precise checking however requires more CPU and blocking so only an
00827     // approximate (but fast) checking is done.
00828     if (current_affinities.count() + aff_to_add.size()
00829                                    - aff_id_to_del.count() >
00830                                          m_MaxAffinities) {
00831         NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities,
00832                    "The client '" + client.GetNode() +
00833                    "' exceeds the limit (" +
00834                    NStr::UIntToString(m_MaxAffinities) +
00835                    ") of the preferred affinities. Changed request ignored.");
00836     }
00837 
00838     // To avoid logging under the lock
00839     vector<string>  already_added_affinities;
00840 
00841     {{
00842 
00843         CFastMutexGuard     guard(m_OperationLock);
00844         CNSTransaction      transaction(this);
00845 
00846         // Convert the aff_to_add to the affinity IDs
00847         for (list<string>::const_iterator  k(aff_to_add.begin());
00848              k != aff_to_add.end(); ++k ) {
00849             unsigned int    aff_id =
00850                                  m_AffinityRegistry.ResolveAffinityToken(*k,
00851                                                                  0, client_id);
00852 
00853             if (current_affinities[aff_id] == true) {
00854                 already_added_affinities.push_back(*k);
00855                 continue;
00856             }
00857 
00858             aff_id_to_add.set(aff_id, true);
00859             any_to_add = true;
00860         }
00861 
00862         transaction.Commit();
00863     }}
00864 
00865     // Log the warnings and add it to the warning message
00866     for (vector<string>::const_iterator  j(already_added_affinities.begin());
00867          j != already_added_affinities.end(); ++j) {
00868         // That was a try to add something which has already been added
00869         LOG_POST(Message << Warning << "Client '" << client.GetNode()
00870                          << "' adds affinity '" << *j
00871                          << "' which is already in the list of the "
00872                             "preferred client affinities. Ignored.");
00873         if (!msg.empty())
00874             msg += "; ";
00875         msg += "already registered affinity to add: " + *j;
00876     }
00877 
00878     if (any_to_del)
00879         m_AffinityRegistry.RemoveClientFromAffinities(client_id,
00880                                                       aff_id_to_del);
00881 
00882     if (any_to_add || any_to_del)
00883         m_ClientsRegistry.UpdatePreferredAffinities(client,
00884                                                     aff_id_to_add,
00885                                                     aff_id_to_del);
00886     return msg;
00887 }
00888 
00889 
00890 TJobStatus  CQueue::JobDelayExpiration(unsigned int     job_id,
00891                                        time_t           tm)
00892 {
00893     CJob                job;
00894     unsigned            queue_run_timeout = GetRunTimeout();
00895     time_t              curr = time(0);
00896 
00897     {{
00898         CFastMutexGuard     guard(m_OperationLock);
00899         TJobStatus          status = GetJobStatus(job_id);
00900 
00901         if (status != CNetScheduleAPI::eRunning)
00902             return status;
00903 
00904         time_t          time_start = 0;
00905         time_t          run_timeout = 0;
00906         {{
00907             CNSTransaction      transaction(this);
00908 
00909             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
00910                 return CNetScheduleAPI::eJobNotFound;
00911 
00912             time_start = job.GetLastEvent()->GetTimestamp();
00913             run_timeout = job.GetRunTimeout();
00914             if (run_timeout == 0)
00915                 run_timeout = queue_run_timeout;
00916 
00917             if (time_start + run_timeout > curr + tm)
00918                 return CNetScheduleAPI::eRunning;   // Old timeout is enough to
00919                                                     // cover this request, so
00920                                                     // keep it.
00921 
00922             job.SetRunTimeout(curr + tm - time_start);
00923             job.SetLastTouch(curr);
00924             job.Flush(this);
00925             transaction.Commit();
00926         }}
00927 
00928         // No need to update the GC registry because the running (and reading)
00929         // jobs are skipped by GC
00930 
00931         time_t  exp_time = run_timeout == 0 ? 0 : time_start + run_timeout;
00932 
00933         TimeLineMove(job_id, exp_time, curr + tm);
00934     }}
00935 
00936     return CNetScheduleAPI::eRunning;
00937 }
00938 
00939 
00940 TJobStatus  CQueue::GetStatusAndLifetime(unsigned int  job_id,
00941                                          bool          need_touch,
00942                                          time_t *      lifetime)
00943 {
00944     CFastMutexGuard     guard(m_OperationLock);
00945     TJobStatus          status = GetJobStatus(job_id);
00946 
00947     if (status == CNetScheduleAPI::eJobNotFound)
00948         return status;
00949 
00950     if (need_touch) {
00951         CJob                job;
00952         time_t              curr = time(0);
00953 
00954         {{
00955             CNSTransaction      transaction(this);
00956 
00957             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
00958                 NCBI_THROW(CNetScheduleException, eInternalError,
00959                            "Error fetching job: " + DecorateJobId(job_id));
00960 
00961             job.SetLastTouch(curr);
00962             job.Flush(this);
00963             transaction.Commit();
00964         }}
00965 
00966         m_GCRegistry.UpdateLifetime(job_id,
00967                                     job.GetExpirationTime(m_Timeout,
00968                                                           m_RunTimeout,
00969                                                           curr));
00970     }
00971 
00972     *lifetime = x_GetEstimatedJobLifetime(job_id, status);
00973     return status;
00974 }
00975 
00976 
00977 bool CQueue::PutProgressMessage(unsigned int    job_id,
00978                                 const string &  msg)
00979 {
00980     CJob                job;
00981     time_t              curr = time(0);
00982 
00983     {{
00984         CFastMutexGuard     guard(m_OperationLock);
00985 
00986         {{
00987             CNSTransaction      transaction(this);
00988 
00989             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
00990                 return false;
00991 
00992             job.SetProgressMsg(msg);
00993             job.SetLastTouch(curr);
00994             job.Flush(this);
00995             transaction.Commit();
00996         }}
00997 
00998         m_GCRegistry.UpdateLifetime(job_id,
00999                                     job.GetExpirationTime(m_Timeout,
01000                                                           m_RunTimeout,
01001                                                           curr));
01002     }}
01003 
01004     return true;
01005 }
01006 
01007 
01008 TJobStatus  CQueue::ReturnJob(const CNSClientId &     client,
01009                               unsigned int            job_id,
01010                               const string &          auth_token,
01011                               string &                warning)
01012 {
01013     CJob                job;
01014     CFastMutexGuard     guard(m_OperationLock);
01015     TJobStatus          old_status = GetJobStatus(job_id);
01016 
01017     if (old_status != CNetScheduleAPI::eRunning)
01018         return old_status;
01019 
01020     {{
01021          CNSTransaction      transaction(this);
01022 
01023         if (job.Fetch(this, job_id) != CJob::eJF_Ok)
01024             NCBI_THROW(CNetScheduleException, eInternalError,
01025                        "Error fetching job: " + DecorateJobId(job_id));
01026         if (job.GetStatus() != CNetScheduleAPI::eRunning)
01027             NCBI_THROW(CNetScheduleException, eInvalidJobStatus,
01028                        "Operation is applicable to eRunning job state only");
01029 
01030         if (!auth_token.empty()) {
01031             // Need to check authorization token first
01032             CJob::EAuthTokenCompareResult   token_compare_result =
01033                                             job.CompareAuthToken(auth_token);
01034             if (token_compare_result == CJob::eInvalidTokenFormat)
01035                 NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01036                            "Invalid authorization token format");
01037             if (token_compare_result == CJob::eNoMatch)
01038                 NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01039                            "Authorization token does not match");
01040             if (token_compare_result == CJob::ePassportOnlyMatch) {
01041                 // That means the job has been given to another worker node
01042                 // by whatever reason (expired/failed/returned before)
01043                 LOG_POST(Message << Warning << "Received RETURN2 with only "
01044                                                "passport matched.");
01045                 warning = "Only job passport matched. Command is ignored.";
01046                 return old_status;
01047             }
01048             // Here: the authorization token is OK, we can continue
01049         }
01050 
01051 
01052         unsigned int    run_count = job.GetRunCount();
01053         CJobEvent *     event = job.GetLastEvent();
01054         time_t          current_time = time(0);
01055 
01056         if (!event)
01057             ERR_POST("No JobEvent for running job " << DecorateJobId(job_id));
01058 
01059         event = &job.AppendEvent();
01060         event->SetNodeAddr(client.GetAddress());
01061         event->SetStatus(CNetScheduleAPI::ePending);
01062         event->SetEvent(CJobEvent::eReturn);
01063         event->SetTimestamp(current_time);
01064         event->SetClientNode(client.GetNode());
01065         event->SetClientSession(client.GetSession());
01066 
01067         if (run_count)
01068             job.SetRunCount(run_count-1);
01069 
01070         job.SetStatus(CNetScheduleAPI::ePending);
01071         job.SetLastTouch(current_time);
01072         job.Flush(this);
01073 
01074         transaction.Commit();
01075     }}
01076 
01077     m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::ePending);
01078     TimeLineRemove(job_id);
01079     m_ClientsRegistry.ClearExecuting(job_id);
01080     m_ClientsRegistry.AddToBlacklist(client, job_id);
01081     m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
01082                                                               m_RunTimeout));
01083 
01084     m_NotificationsList.Notify(job_id,
01085                                job.GetAffinityId(), m_ClientsRegistry,
01086                                m_AffinityRegistry, m_NotifHifreqPeriod);
01087     return old_status;
01088 }
01089 
01090 
01091 TJobStatus  CQueue::ReadAndTouchJob(unsigned int  job_id,
01092                                     CJob &        job,
01093                                     time_t *      lifetime)
01094 {
01095     CFastMutexGuard         guard(m_OperationLock);
01096     TJobStatus              status = GetJobStatus(job_id);
01097 
01098     if (status == CNetScheduleAPI::eJobNotFound)
01099         return status;
01100 
01101     time_t                  curr = time(0);
01102     {{
01103         CNSTransaction      transaction(this);
01104 
01105         if (job.Fetch(this, job_id) != CJob::eJF_Ok)
01106             NCBI_THROW(CNetScheduleException, eInternalError,
01107                        "Error fetching job: " + DecorateJobId(job_id));
01108 
01109         job.SetLastTouch(curr);
01110         job.Flush(this);
01111         transaction.Commit();
01112     }}
01113 
01114     m_GCRegistry.UpdateLifetime(job_id,
01115                                 job.GetExpirationTime(m_Timeout,
01116                                                       m_RunTimeout,
01117                                                       curr));
01118     *lifetime = x_GetEstimatedJobLifetime(job_id, status);
01119     return status;
01120 }
01121 
01122 
01123 // Deletes all the jobs from the queue
01124 void CQueue::Truncate(void)
01125 {
01126     CJob                                job;
01127     TNSBitVector                        bv;
01128     vector<CNetScheduleAPI::EJobStatus> statuses;
01129     TNSBitVector                        jobs_to_notify;
01130     time_t                              current_time = time(0);
01131 
01132     // Pending and running jobs should be notified if requested
01133     statuses.push_back(CNetScheduleAPI::ePending);
01134     statuses.push_back(CNetScheduleAPI::eRunning);
01135 
01136     {{
01137         CFastMutexGuard     guard(m_OperationLock);
01138 
01139         jobs_to_notify = m_StatusTracker.GetJobs(statuses);
01140 
01141         // Make a decision if the jobs should really be notified
01142         {{
01143             CNSTransaction              transaction(this);
01144             TNSBitVector::enumerator    en(jobs_to_notify.first());
01145             for (; en.valid(); ++en) {
01146                 unsigned int    job_id = *en;
01147 
01148                 if (job.Fetch(this, job_id) != CJob::eJF_Ok) {
01149                     ERR_POST("Cannot fetch job " << MakeKey(job_id) <<
01150                              " while dropping all jobs in the queue");
01151                     continue;
01152                 }
01153 
01154                 if (job.ShouldNotify(current_time))
01155                     m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
01156                                                         job.GetSubmNotifPort(),
01157                                                         MakeKey(job_id),
01158                                                         job.GetStatus());
01159             }
01160 
01161             // There is no need to commit transaction - there were no changes
01162         }}
01163 
01164         m_StatusTracker.ClearAll(&bv);
01165         m_AffinityRegistry.ClearMemoryAndDatabase();
01166         m_GroupRegistry.ClearMemoryAndDatabase();
01167 
01168         CWriteLockGuard     rtl_guard(m_RunTimeLineLock);
01169         m_RunTimeLine->ReInit(0);
01170     }}
01171 
01172     x_Erase(bv);
01173 
01174     // Next call updates 'm_BecameEmpty' timestamp
01175     IsExpired(); // locks CQueue lock
01176 }
01177 
01178 
01179 TJobStatus  CQueue::Cancel(const CNSClientId &  client,
01180                            unsigned int         job_id)
01181 {
01182     TJobStatus          old_status;
01183     CJob                job;
01184     time_t              current_time = time(0);
01185 
01186     {{
01187         CFastMutexGuard     guard(m_OperationLock);
01188 
01189         old_status = m_StatusTracker.GetStatus(job_id);
01190         if (old_status == CNetScheduleAPI::eJobNotFound)
01191             return CNetScheduleAPI::eJobNotFound;
01192         if (old_status == CNetScheduleAPI::eCanceled) {
01193             m_StatisticsCounters.CountTransition(
01194                                         CNetScheduleAPI::eCanceled,
01195                                         CNetScheduleAPI::eCanceled);
01196             return CNetScheduleAPI::eCanceled;
01197         }
01198 
01199         {{
01200             CNSTransaction      transaction(this);
01201             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
01202                 return CNetScheduleAPI::eJobNotFound;
01203 
01204             CJobEvent *     event = &job.AppendEvent();
01205 
01206             event->SetNodeAddr(client.GetAddress());
01207             event->SetStatus(CNetScheduleAPI::eCanceled);
01208             event->SetEvent(CJobEvent::eCancel);
01209             event->SetTimestamp(current_time);
01210             event->SetClientNode(client.GetNode());
01211             event->SetClientSession(client.GetSession());
01212 
01213             job.SetStatus(CNetScheduleAPI::eCanceled);
01214             job.SetLastTouch(current_time);
01215             job.Flush(this);
01216 
01217             transaction.Commit();
01218         }}
01219 
01220         m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eCanceled);
01221 
01222         TimeLineRemove(job_id);
01223         if (old_status == CNetScheduleAPI::eRunning)
01224             m_ClientsRegistry.ClearExecuting(job_id);
01225         else if (old_status == CNetScheduleAPI::eReading)
01226             m_ClientsRegistry.ClearReading(job_id);
01227 
01228         m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
01229                                                                   m_RunTimeout));
01230 
01231     }}
01232 
01233     if ((old_status == CNetScheduleAPI::eRunning ||
01234          old_status == CNetScheduleAPI::ePending) && job.ShouldNotify(current_time))
01235         m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
01236                                             job.GetSubmNotifPort(),
01237                                             MakeKey(job_id),
01238                                             job.GetStatus());
01239 
01240     return old_status;
01241 }
01242 
01243 
01244 void  CQueue::CancelAllJobs(const CNSClientId &  client)
01245 {
01246     vector<CNetScheduleAPI::EJobStatus> statuses;
01247 
01248     // All except cancelled
01249     statuses.push_back(CNetScheduleAPI::ePending);
01250     statuses.push_back(CNetScheduleAPI::eRunning);
01251     statuses.push_back(CNetScheduleAPI::eFailed);
01252     statuses.push_back(CNetScheduleAPI::eDone);
01253     statuses.push_back(CNetScheduleAPI::eReading);
01254     statuses.push_back(CNetScheduleAPI::eConfirmed);
01255     statuses.push_back(CNetScheduleAPI::eReadFailed);
01256 
01257     CFastMutexGuard     guard(m_OperationLock);
01258     x_CancelJobs(client, m_StatusTracker.GetJobs(statuses));
01259     return;
01260 }
01261 
01262 
01263 void CQueue::x_CancelJobs(const CNSClientId &   client,
01264                           const TNSBitVector &  jobs_to_cancel)
01265 {
01266     CJob                        job;
01267     time_t                      current_time = time(0);
01268     TNSBitVector::enumerator    en(jobs_to_cancel.first());
01269     for (; en.valid(); ++en) {
01270         unsigned int    job_id = *en;
01271         TJobStatus      old_status = m_StatusTracker.GetStatus(job_id);
01272 
01273         {{
01274             CNSTransaction              transaction(this);
01275             if (job.Fetch(this, job_id) != CJob::eJF_Ok) {
01276                 ERR_POST("Cannot fetch job " << MakeKey(job_id) <<
01277                          " while cancelling jobs");
01278                 continue;
01279             }
01280 
01281             CJobEvent *     event = &job.AppendEvent();
01282 
01283             event->SetNodeAddr(client.GetAddress());
01284             event->SetStatus(CNetScheduleAPI::eCanceled);
01285             event->SetEvent(CJobEvent::eCancel);
01286             event->SetTimestamp(current_time);
01287             event->SetClientNode(client.GetNode());
01288             event->SetClientSession(client.GetSession());
01289 
01290             job.SetStatus(CNetScheduleAPI::eCanceled);
01291             job.SetLastTouch(current_time);
01292             job.Flush(this);
01293 
01294             transaction.Commit();
01295         }}
01296 
01297         m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eCanceled);
01298 
01299         TimeLineRemove(job_id);
01300         if (old_status == CNetScheduleAPI::eRunning)
01301             m_ClientsRegistry.ClearExecuting(job_id);
01302         else if (old_status == CNetScheduleAPI::eReading)
01303             m_ClientsRegistry.ClearReading(job_id);
01304 
01305         m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
01306                                                                   m_RunTimeout));
01307         if ((old_status == CNetScheduleAPI::eRunning ||
01308              old_status == CNetScheduleAPI::ePending) && job.ShouldNotify(current_time))
01309             m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
01310                                                 job.GetSubmNotifPort(),
01311                                                 MakeKey(job_id),
01312                                                 job.GetStatus());
01313     }
01314 
01315     return;
01316 }
01317 
01318 
01319 // This function must be called under the operations lock.
01320 // If called for not existing job then an exception is generated
01321 time_t
01322 CQueue::x_GetEstimatedJobLifetime(unsigned int   job_id,
01323                                   TJobStatus     status) const
01324 {
01325     if (status == CNetScheduleAPI::eRunning ||
01326         status == CNetScheduleAPI::eReading)
01327         return time(0) + GetTimeout();
01328     return m_GCRegistry.GetLifetime(job_id);
01329 }
01330 
01331 
01332 void  CQueue::CancelGroup(const CNSClientId &  client,
01333                           const string &       group)
01334 {
01335     CFastMutexGuard     guard(m_OperationLock);
01336     x_CancelJobs(client, m_GroupRegistry.GetJobs(group));
01337     return;
01338 }
01339 
01340 
01341 TJobStatus  CQueue::GetJobStatus(unsigned int  job_id) const
01342 {
01343     return m_StatusTracker.GetStatus(job_id);
01344 }
01345 
01346 
01347 bool CQueue::IsExpired()
01348 {
01349     time_t              empty_lifetime = GetEmptyLifetime();
01350     CFastMutexGuard     guard(m_OperationLock);
01351 
01352     if (m_Kind && empty_lifetime > 0) {
01353         if (m_StatusTracker.Count()) {
01354             m_BecameEmpty = -1;
01355         } else {
01356             if (m_BecameEmpty != -1 &&
01357                 m_BecameEmpty + empty_lifetime < time(0))
01358             {
01359                 LOG_POST(Message << Warning << "Queue " << m_QueueName
01360                                  << " expired. Became empty: "
01361                                  << CTime(m_BecameEmpty).ToLocalTime().AsString()
01362                                  << " Empty lifetime: " << empty_lifetime
01363                                  << " sec." );
01364                 return true;
01365             }
01366             if (m_BecameEmpty == -1)
01367                 m_BecameEmpty = time(0);
01368         }
01369     }
01370     return false;
01371 }
01372 
01373 
01374 unsigned int CQueue::GetNextId()
01375 {
01376     CFastMutexGuard     guard(m_LastIdLock);
01377 
01378     // Job indexes are expected to start from 1,
01379     // the m_LastId is 0 at the very beginning
01380     ++m_LastId;
01381     if (m_LastId >= m_SavedId) {
01382         m_SavedId += s_ReserveDelta;
01383         if (m_SavedId < m_LastId) {
01384             // Overflow for the saved ID
01385             m_LastId = 1;
01386             m_SavedId = s_ReserveDelta;
01387         }
01388         x_UpdateStartFromCounter();
01389     }
01390     return m_LastId;
01391 }
01392 
01393 
01394 // Reserves the given number of the job IDs
01395 unsigned int CQueue::GetNextJobIdForBatch(unsigned int  count)
01396 {
01397     CFastMutexGuard     guard(m_LastIdLock);
01398 
01399     // Job indexes are expected to start from 1 and be monotonously growing
01400     unsigned int    start_index = m_LastId;
01401 
01402     m_LastId += count;
01403     if (m_LastId < start_index ) {
01404         // Overflow
01405         m_LastId = count;
01406         m_SavedId = count + s_ReserveDelta;
01407         x_UpdateStartFromCounter();
01408     }
01409 
01410     // There were no overflow, check the reserved value
01411     if (m_LastId >= m_SavedId) {
01412         m_SavedId += s_ReserveDelta;
01413         if (m_SavedId < m_LastId) {
01414             // Overflow for the saved ID
01415             m_LastId = count;
01416             m_SavedId = count + s_ReserveDelta;
01417         }
01418         x_UpdateStartFromCounter();
01419     }
01420 
01421     return m_LastId - count + 1;
01422 }
01423 
01424 
01425 void CQueue::x_UpdateStartFromCounter(void)
01426 {
01427     // Updates the start_from value in the Berkley DB file
01428     if (m_QueueDbBlock) {
01429         // The only record is expected and its key is always 1
01430         m_QueueDbBlock->start_from_db.pseudo_key = 1;
01431         m_QueueDbBlock->start_from_db.start_from = m_SavedId;
01432         m_QueueDbBlock->start_from_db.UpdateInsert();
01433     }
01434     return;
01435 }
01436 
01437 
01438 unsigned int  CQueue::x_ReadStartFromCounter(void)
01439 {
01440     // Reads the start_from value from the Berkley DB file
01441     if (m_QueueDbBlock) {
01442         // The only record is expected and its key is always 1
01443         m_QueueDbBlock->start_from_db.pseudo_key = 1;
01444         m_QueueDbBlock->start_from_db.Fetch();
01445         return m_QueueDbBlock->start_from_db.start_from;
01446     }
01447     return 1;
01448 }
01449 
01450 
01451 void CQueue::GetJobForReading(const CNSClientId &   client,
01452                               unsigned int          read_timeout,
01453                               const string &        group,
01454                               CJob *                job)
01455 {
01456     time_t                                  curr(time(0));
01457     vector<CNetScheduleAPI::EJobStatus>     from_state;
01458 
01459     from_state.push_back(CNetScheduleAPI::eDone);
01460     from_state.push_back(CNetScheduleAPI::eFailed);
01461 
01462     CFastMutexGuard     guard(m_OperationLock);
01463     TNSBitVector        candidates = m_StatusTracker.GetJobs(from_state);
01464 
01465     // Exclude blacklisted jobs
01466     candidates -= m_ClientsRegistry.GetBlacklistedJobs(client);
01467 
01468     if (!group.empty())
01469         // Apply restrictions on the group jobs if so
01470         candidates &= m_GroupRegistry.GetJobs(group);
01471 
01472     if (!candidates.any())
01473         return;
01474 
01475     unsigned int        job_id = *candidates.first();
01476     {{
01477          CNSTransaction     transaction(this);
01478 
01479         // Fetch the job and update it and flush
01480         job->Fetch(this, job_id);
01481 
01482         unsigned int    read_count = job->GetReadCount() + 1;
01483         CJobEvent *     event = &job->AppendEvent();
01484 
01485         event->SetStatus(CNetScheduleAPI::eReading);
01486         event->SetEvent(CJobEvent::eRead);
01487         event->SetTimestamp(curr);
01488         event->SetNodeAddr(client.GetAddress());
01489         event->SetClientNode(client.GetNode());
01490         event->SetClientSession(client.GetSession());
01491 
01492         job->SetRunTimeout(read_timeout);
01493         job->SetStatus(CNetScheduleAPI::eReading);
01494         job->SetReadCount(read_count);
01495         job->SetLastTouch(curr);
01496         job->Flush(this);
01497 
01498         transaction.Commit();
01499     }}
01500 
01501     if (read_timeout == 0)
01502         read_timeout = m_RunTimeout;
01503 
01504     if (read_timeout != 0)
01505         TimeLineAdd(job_id, curr + read_timeout);
01506 
01507     // Update the memory cache
01508     m_StatusTracker.ChangeStatus(this, job_id, CNetScheduleAPI::eReading);
01509     m_ClientsRegistry.AddToReading(client, job_id);
01510 
01511     m_GCRegistry.UpdateLifetime(job_id, job->GetExpirationTime(m_Timeout,
01512                                                                m_RunTimeout));
01513     return;
01514 }
01515 
01516 
01517 TJobStatus  CQueue::ConfirmReadingJob(const CNSClientId &  client,
01518                                       unsigned int         job_id,
01519                                       const string &       auth_token)
01520 {
01521     TJobStatus      old_status = x_ChangeReadingStatus(
01522                                                 client, job_id,
01523                                                 auth_token,
01524                                                 CNetScheduleAPI::eConfirmed);
01525     m_ClientsRegistry.ClearReading(client, job_id);
01526     return old_status;
01527 }
01528 
01529 
01530 TJobStatus  CQueue::FailReadingJob(const CNSClientId &  client,
01531                                    unsigned int         job_id,
01532                                    const string &       auth_token)
01533 {
01534     TJobStatus      old_status = x_ChangeReadingStatus(
01535                                                 client, job_id,
01536                                                 auth_token,
01537                                                 CNetScheduleAPI::eReadFailed);
01538     m_ClientsRegistry.ClearReadingSetBlacklist(job_id);
01539     return old_status;
01540 }
01541 
01542 
01543 TJobStatus  CQueue::ReturnReadingJob(const CNSClientId &  client,
01544                                      unsigned int         job_id,
01545                                      const string &       auth_token)
01546 {
01547     TJobStatus      old_status = x_ChangeReadingStatus(
01548                                                 client, job_id,
01549                                                 auth_token,
01550                                                 CNetScheduleAPI::eDone);
01551     m_ClientsRegistry.ClearReadingSetBlacklist(job_id);
01552     return old_status;
01553 }
01554 
01555 
// Moves a job that is currently being read (eReading) out of that state.
//
// Shared implementation of ConfirmReadingJob(), FailReadingJob() and
// ReturnReadingJob(). The whole operation runs under m_OperationLock.
//
// target_status selects the transition:
//   eConfirmed  - an eReadDone event is recorded; the job becomes eConfirmed.
//   eReadFailed - an eReadFail event is recorded. If the job's read count is
//                 still <= m_FailedRetries, the job goes back to eDone so it
//                 can be read again, and the transition is counted with the
//                 eFail path option. Otherwise it becomes eReadFailed.
//   eDone       - an eReadRollback event is recorded and the read count is
//                 decremented; the job becomes eDone.
// Any other value trips _ASSERT(0) in debug builds. In release builds the
// job is then simply set to target_status.
//
// Returns the job status before the call. If it was not eReading, nothing
// is changed and that status is returned; otherwise eReading is returned.
// Throws CNetScheduleException:
//   eInternalError    - the job cannot be fetched, or its stored status
//                       disagrees with the in-memory status tracker;
//   eInvalidAuthToken - auth_token is malformed or does not match.
01556 TJobStatus  CQueue::x_ChangeReadingStatus(const CNSClientId &  client,
01557                                           unsigned int         job_id,
01558                                           const string &       auth_token,
01559                                           TJobStatus           target_status)
01560 {
01561     time_t                                      current_time = time(0);
01562     CJob                                        job;
01563     TJobStatus                                  new_status = target_status;
01564     CStatisticsCounters::ETransitionPathOption  path_option =
01565                                                     CStatisticsCounters::eNone;
01566     CFastMutexGuard                             guard(m_OperationLock);
01567     TJobStatus                                  old_status = GetJobStatus(job_id);
01568
            // Only jobs that are being read can change their reading status
01569     if (old_status != CNetScheduleAPI::eReading)
01570         return old_status;
01571
01572     {{
01573         CNSTransaction      transaction(this);
01574
01575         if (job.Fetch(this, job_id) != CJob::eJF_Ok)
01576             NCBI_THROW(CNetScheduleException, eInternalError,
01577                        "Error fetching job: " + DecorateJobId(job_id));
01578
01579         // Check that authorization token matches
01580         CJob::EAuthTokenCompareResult   token_compare_result =
01581                                             job.CompareAuthToken(auth_token);
01582         if (token_compare_result == CJob::eInvalidTokenFormat)
01583             NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01584                        "Invalid authorization token format");
01585         if (token_compare_result == CJob::eNoMatch)
01586             NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01587                        "Authorization token does not match");
01588
01589         // Sanity check of the current job state
01590         if (job.GetStatus() != CNetScheduleAPI::eReading)
01591             NCBI_THROW(CNetScheduleException, eInternalError,
01592                 "Internal inconsistency detected. The job " +
01593                 DecorateJobId(job_id) + " state in memory is " +
01594                 CNetScheduleAPI::StatusToString(CNetScheduleAPI::eReading) +
01595                 " while in database it is " +
01596                 CNetScheduleAPI::StatusToString(job.GetStatus()));
01597
01598         // Add an event
01599         CJobEvent &     event = job.AppendEvent();
01600         event.SetTimestamp(current_time);
01601         event.SetNodeAddr(client.GetAddress());
01602         event.SetClientNode(client.GetNode());
01603         event.SetClientSession(client.GetSession());
01604
01605         switch (target_status) {
01606             case CNetScheduleAPI::eDone:
                        // Rollback: the read attempt does not count
01607                 event.SetEvent(CJobEvent::eReadRollback);
01608                 job.SetReadCount(job.GetReadCount() - 1);
01609                 break;
01610             case CNetScheduleAPI::eReadFailed:
01611                 event.SetEvent(CJobEvent::eReadFail);
01612                 // Check the number of tries first
01613                 if (job.GetReadCount() <= m_FailedRetries) {
01614                     // The job needs to be re-scheduled for reading
01615                     new_status = CNetScheduleAPI::eDone;
01616                     path_option = CStatisticsCounters::eFail;
01617                 }
01618                 break;
01619             case CNetScheduleAPI::eConfirmed:
01620                 event.SetEvent(CJobEvent::eReadDone);
01621                 break;
01622             default:
01623                 _ASSERT(0);
01624                 break;
01625         }
01626
01627         event.SetStatus(new_status);
01628         job.SetStatus(new_status);
01629         job.SetLastTouch(current_time);
01630
01631         job.Flush(this);
01632         transaction.Commit();
01633     }}
01634
            // The job is no longer being read: drop its read timeout
01635     TimeLineRemove(job_id);
01636
            // Mirror the committed change in the in-memory structures
01637     m_StatusTracker.SetStatus(job_id, new_status);
01638     m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
01639                                                               m_RunTimeout));
01640     m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading,
01641                                          new_status,
01642                                          path_option);
01643     return CNetScheduleAPI::eReading;
01644 }
01645 
01646 
01647 // This function is called from places where the operations lock has been
01648 // already taken. So there is no lock around memory status tracker
01649 void CQueue::EraseJob(unsigned int  job_id)
01650 {
01651     m_StatusTracker.Erase(job_id);
01652 
01653     {{
01654         // Request delayed record delete
01655         CFastMutexGuard     jtd_guard(m_JobsToDeleteLock);
01656 
01657         m_JobsToDelete.set_bit(job_id);
01658     }}
01659     TimeLineRemove(job_id);
01660 }
01661 
01662 
01663 void CQueue::x_Erase(const TNSBitVector &  job_ids)
01664 {
01665     CFastMutexGuard     jtd_guard(m_JobsToDeleteLock);
01666 
01667     m_JobsToDelete |= job_ids;
01668 }
01669 
01670 
01671 void CQueue::OptimizeMem()
01672 {
01673     m_StatusTracker.OptimizeMem();
01674 }
01675 
01676 
// Picks a pending job for the given client; job_id == 0 means nothing fits.
//
// Jobs blacklisted for this client (m_ClientsRegistry.GetBlacklistedJobs)
// are skipped. Pending jobs are walked in bit vector enumeration order, and
// a job is matched against three categories:
//   1. explicit affinity: the job's affinity is in aff_ids;
//   2. worker node affinity (wnode_affinity and the client has at least one
//      preferred affinity): the job's affinity is one of the client's
//      preferred affinities;
//   3. exclusive new affinity (exclusive_new_affinity): the job has no
//      affinity, or an affinity that no client has as preferred. Such a
//      pick is reported with exclusive == true.
// Category 1 is returned immediately. A category 2 / 3 match is returned
// immediately only when no higher category was requested. Otherwise the
// first such job is kept as a fallback and used after the scan (category 2
// before category 3).
// Finally, if any_affinity is set, or neither explicit nor worker node
// affinity applies, any pending job is taken via GetJobByStatus().
01677 CQueue::x_SJobPick
01678 CQueue::x_FindPendingJob(const CNSClientId  &  client,
01679                          const TNSBitVector &  aff_ids,
01680                          bool                  wnode_affinity,
01681                          bool                  any_affinity,
01682                          bool                  exclusive_new_affinity)
01683 {
01684     x_SJobPick      job_pick;
01685     bool            explicit_aff = aff_ids.any();
01686     unsigned int    wnode_aff_candidate = 0;
01687     unsigned int    exclusive_aff_candidate = 0;
01688     TNSBitVector    blacklisted_jobs =
01689                     m_ClientsRegistry.GetBlacklistedJobs(client);
01690
01691     TNSBitVector    pref_aff;
01692     if (wnode_affinity) {
01693         pref_aff = m_ClientsRegistry.GetPreferredAffinities(client);
                // Worker node affinity is meaningless without preferred affinities
01694         wnode_affinity = wnode_affinity && pref_aff.any();
01695     }
01696
01697     TNSBitVector    all_pref_aff;
01698     if (exclusive_new_affinity)
01699         all_pref_aff = m_ClientsRegistry.GetAllPreferredAffinities();
01700
01701     if (explicit_aff || wnode_affinity || exclusive_new_affinity) {
01702         // Check all pending jobs
01703         TNSBitVector                pending_jobs =
01704                             m_StatusTracker.GetJobs(CNetScheduleAPI::ePending);
01705         TNSBitVector::enumerator    en(pending_jobs.first());
01706
01707         for (; en.valid(); ++en) {
01708             unsigned int    job_id = *en;
01709
01710             if (blacklisted_jobs[job_id] == true)
01711                 continue;
01712
                    // Category 1: explicitly requested affinity
01713             unsigned int    aff_id = m_GCRegistry.GetAffinityID(job_id);
01714             if (aff_id != 0 && explicit_aff) {
01715                 if (aff_ids[aff_id] == true) {
01716                     job_pick.job_id = job_id;
01717                     job_pick.exclusive = false;
01718                     job_pick.aff_id = aff_id;
01719                     return job_pick;
01720                 }
01721             }
01722
                    // Category 2: one of the client's preferred affinities
01723             if (aff_id != 0 && wnode_affinity) {
01724                 if (pref_aff[aff_id] == true) {
01725                     if (explicit_aff == false) {
01726                         job_pick.job_id = job_id;
01727                         job_pick.exclusive = false;
01728                         job_pick.aff_id = aff_id;
01729                         return job_pick;
01730                     }
01731                     if (wnode_aff_candidate == 0)
01732                         wnode_aff_candidate = job_id;
01733                     continue;
01734                 }
01735             }
01736
                    // Category 3: no affinity, or an affinity nobody prefers yet
01737             if (exclusive_new_affinity) {
01738                 if (aff_id == 0 || all_pref_aff[aff_id] == false) {
01739                     if (explicit_aff == false && wnode_affinity == false) {
01740                         job_pick.job_id = job_id;
01741                         job_pick.exclusive = true;
01742                         job_pick.aff_id = aff_id;
01743                         return job_pick;
01744                     }
01745                     if (exclusive_aff_candidate == 0)
01746                         exclusive_aff_candidate = job_id;
01747                 }
01748             }
01749         }
01750
                // No category 1 match: fall back to the remembered candidates
01751         if (wnode_aff_candidate != 0) {
01752             job_pick.job_id = wnode_aff_candidate;
01753             job_pick.exclusive = false;
                    // NOTE(review): unlike the immediate category 2 pick above, the
                    // candidate's affinity id is not reported here; confirm intended.
01754             job_pick.aff_id = 0;
01755             return job_pick;
01756         }
01757
01758         if (exclusive_aff_candidate != 0) {
01759             job_pick.job_id = exclusive_aff_candidate;
01760             job_pick.exclusive = true;
01761             job_pick.aff_id = m_GCRegistry.GetAffinityID(exclusive_aff_candidate);
01762             return job_pick;
01763         }
01764     }
01765
01766
            // Any pending job; presumably blacklisted_jobs is the exclusion set and
            // the empty vector means "no affinity restriction" (verify in the tracker)
01767     if (any_affinity || (!explicit_aff && !wnode_affinity)) {
01768         job_pick.job_id = m_StatusTracker.GetJobByStatus(
01769                                     CNetScheduleAPI::ePending,
01770                                     blacklisted_jobs,
01771                                     TNSBitVector());
01772         job_pick.exclusive = false;
01773         job_pick.aff_id = 0;
01774         return job_pick;
01775     }
01776
            // Nothing suitable
01777     job_pick.job_id = 0;
01778     job_pick.exclusive = false;
01779     job_pick.aff_id = 0;
01780     return job_pick;
01781 }
01782 
01783 
01784 
01785 string CQueue::GetAffinityList()
01786 {
01787     list< SAffinityStatistics >
01788                 statistics = m_AffinityRegistry.GetAffinityStatistics(m_StatusTracker);
01789 
01790     string          aff_list;
01791     for (list< SAffinityStatistics >::const_iterator  k = statistics.begin();
01792          k != statistics.end(); ++k) {
01793         if (!aff_list.empty())
01794             aff_list += "&";
01795 
01796         aff_list += NStr::URLEncode(k->m_Token) + '=' +
01797                     NStr::SizetToString(k->m_NumberOfPendingJobs) + "," +
01798                     NStr::SizetToString(k->m_NumberOfRunningJobs) + "," +
01799                     NStr::SizetToString(k->m_NumberOfPreferred) + "," +
01800                     NStr::SizetToString(k->m_NumberOfWaitGet);
01801     }
01802     return aff_list;
01803 }
01804 
01805 
// Handles a worker node report that a running job has failed.
//
// Only a job in the eRunning state is changed. An eFailed job only has an
// eFailed->eFailed transition counted; any other status is returned as is.
// If the job's run count is still <= the queue's failed-retries setting,
// the job goes back to ePending and listeners are notified. Otherwise it
// becomes eFailed and, if job.ShouldNotify() agrees, the submitter gets a
// status notification. In both cases the job is removed from the time
// line, cleared from the executing set and blacklisted for this client.
//
// An empty auth_token skips the token check. A token that matches only the
// job passport makes the call a no-op (see the warning parameter).
//
// Returns the job status before the call.
// Throws CNetScheduleException:
//   eDataTooLong      - output exceeds the queue's max output size;
//   eInternalError    - the job cannot be fetched;
//   eInvalidAuthToken - auth_token is malformed or does not match.
//
// NOTE(review): `warning` is taken by value, so the text assigned to it in
// the ePassportOnlyMatch branch is discarded on return and never reaches
// the caller. It most likely should be `string &` (needs the matching change
// in the class declaration).
01806 TJobStatus CQueue::FailJob(const CNSClientId &    client,
01807                            unsigned int           job_id,
01808                            const string &         auth_token,
01809                            const string &         err_msg,
01810                            const string &         output,
01811                            int                    ret_code,
01812                            string                 warning)
01813 {
01814     unsigned        failed_retries;
01815     unsigned        max_output_size;
01816     // time_t          blacklist_time;
01817     {{
01818         CQueueParamAccessor     qp(*this);
01819         failed_retries  = qp.GetFailedRetries();
01820         max_output_size = qp.GetMaxOutputSize();
01821         // blacklist_time  = qp.GetBlacklistTime();
01822     }}
01823
01824     if (output.size() > max_output_size) {
01825         NCBI_THROW(CNetScheduleException, eDataTooLong,
01826                    "Output is too long");
01827     }
01828
01829     CJob                job;
01830     time_t              curr = time(0);
01831     bool                rescheduled = false;
01832     TJobStatus          old_status;
01833
01834     {{
01835         CFastMutexGuard     guard(m_OperationLock);
01836         TJobStatus          new_status = CNetScheduleAPI::eFailed;
01837
01838         old_status = GetJobStatus(job_id);
01839         if (old_status == CNetScheduleAPI::eFailed) {
                    // Repeated failure report: only account for it in statistics
01840             m_StatisticsCounters.CountTransition(CNetScheduleAPI::eFailed,
01841                                                  CNetScheduleAPI::eFailed);
01842             return old_status;
01843         }
01844
01845         if (old_status != CNetScheduleAPI::eRunning) {
01846             // No job state change
01847             return old_status;
01848         }
01849
01850         {{
01851             CNSTransaction     transaction(this);
01852
01853             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
01854                 NCBI_THROW(CNetScheduleException, eInternalError,
01855                            "Error fetching job: " + DecorateJobId(job_id));
01856
01857             if (!auth_token.empty()) {
01858                 // Need to check authorization token first
01859                 CJob::EAuthTokenCompareResult   token_compare_result =
01860                                             job.CompareAuthToken(auth_token);
01861                 if (token_compare_result == CJob::eInvalidTokenFormat)
01862                     NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01863                                "Invalid authorization token format");
01864                 if (token_compare_result == CJob::eNoMatch)
01865                     NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
01866                                "Authorization token does not match");
01867                 if (token_compare_result == CJob::ePassportOnlyMatch) {
01868                     // That means the job has been given to another worker node
01869                     // by whatever reason (expired/failed/returned before)
01870                     LOG_POST(Message << Warning << "Received FPUT2 with only "
01871                                                    "passport matched.");
                            // NOTE(review): assigns the by-value parameter copy only;
                            // the caller does not see this text.
01872                     warning = "Only job passport matched. Command is ignored.";
01873                     return old_status;
01874                 }
01875                 // Here: the authorization token is OK, we can continue
01876             }
01877
                    // A running job is expected to have at least one event; a missing
                    // one is only logged because a new event is appended below anyway.
01878             CJobEvent *     event = job.GetLastEvent();
01879             if (!event)
01880                 ERR_POST("No JobEvent for running job " << DecorateJobId(job_id));
01881
01882             event = &job.AppendEvent();
01883             event->SetEvent(CJobEvent::eFail);
01884             event->SetStatus(CNetScheduleAPI::eFailed);
01885             event->SetTimestamp(curr);
01886             event->SetErrorMsg(err_msg);
01887             event->SetRetCode(ret_code);
01888             event->SetNodeAddr(client.GetAddress());
01889             event->SetClientNode(client.GetNode());
01890             event->SetClientSession(client.GetSession());
01891
                    // Retries left: reschedule the job instead of failing it for good
01892             unsigned                run_count = job.GetRunCount();
01893             if (run_count <= failed_retries) {
01894                 job.SetStatus(CNetScheduleAPI::ePending);
01895                 event->SetStatus(CNetScheduleAPI::ePending);
01896
01897                 new_status = CNetScheduleAPI::ePending;
01898
01899                 rescheduled = true;
01900             } else {
01901                 job.SetStatus(CNetScheduleAPI::eFailed);
01902                 event->SetStatus(CNetScheduleAPI::eFailed);
01903                 new_status = CNetScheduleAPI::eFailed;
01904                 rescheduled = false;
01905                 if (m_Log)
01906                     LOG_POST(Message << Warning << "Job " << DecorateJobId(job_id)
01907                                      << " failed, exceeded max number of retries ("
01908                                      << failed_retries << ")");
01909             }
01910
01911             job.SetOutput(output);
01912             job.SetLastTouch(curr);
01913             job.Flush(this);
01914             transaction.Commit();
01915         }}
01916
                // Mirror the committed change in the in-memory structures
01917         m_StatusTracker.SetStatus(job_id, new_status);
01918         if (new_status == CNetScheduleAPI::ePending)
01919             m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning,
01920                                                  new_status,
01921                                                  CStatisticsCounters::eFail);
01922         else
01923             m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning,
01924                                                  new_status,
01925                                                  CStatisticsCounters::eNone);
01926
01927         TimeLineRemove(job_id);
01928
01929         // Replace it with ClearExecuting(client, job_id) when all clients provide
01930         // their credentials and job passport is checked strictly
01931         m_ClientsRegistry.ClearExecuting(job_id);
01932         m_ClientsRegistry.AddToBlacklist(client, job_id);
01933
01934         m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
01935                                                                   m_RunTimeout));
01936
01937         if (!rescheduled  &&  job.ShouldNotify(curr)) {
01938             m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
01939                                                 job.GetSubmNotifPort(),
01940                                                 MakeKey(job_id),
01941                                                 job.GetStatus());
01942         }
01943
01944         if (rescheduled)
01945             m_NotificationsList.Notify(job_id,
01946                                        job.GetAffinityId(), m_ClientsRegistry,
01947                                        m_AffinityRegistry, m_NotifHifreqPeriod);
01948
01949     }}
01950
01951     return old_status;
01952 }
01953 
01954 
01955 string  CQueue::GetAffinityTokenByID(unsigned int  aff_id) const
01956 {
01957     return m_AffinityRegistry.GetTokenByID(aff_id);
01958 }
01959 
01960 
01961 
01962 /// Specified status is OR-ed with the target vector
01963 void CQueue::JobsWithStatus(TJobStatus    status,
01964                             TNSBitVector* bv) const
01965 {
01966     m_StatusTracker.StatusSnapshot(status, bv);
01967 }
01968 
01969 
01970 void CQueue::ClearWorkerNode(const CNSClientId &  client)
01971 {
01972     // Get the running and reading jobs and move them to the corresponding
01973     // states (pending and done)
01974 
01975     TNSBitVector    running_jobs;
01976     TNSBitVector    reading_jobs;
01977 
01978     {{
01979         CFastMutexGuard     guard(m_OperationLock);
01980         unsigned short      wait_port = m_ClientsRegistry.ClearWorkerNode(
01981                                                 client,
01982                                                 m_AffinityRegistry,
01983                                                 running_jobs,
01984                                                 reading_jobs);
01985 
01986         if (wait_port != 0)
01987             m_NotificationsList.UnregisterListener(client, wait_port);
01988     }}
01989 
01990     if (running_jobs.any())
01991         x_ResetRunningDueToClear(client, running_jobs);
01992     if (reading_jobs.any())
01993         x_ResetReadingDueToClear(client, reading_jobs);
01994     return;
01995 }
01996 
01997 
01998 void CQueue::NotifyListenersPeriodically(time_t  current_time)
01999 {
02000     // Triggered from a notification thread only, so check first the
02001     // configured notification interval
02002     static double   last_notif_timeout = -1.0;
02003     static size_t   skip_limit = 0;
02004     static size_t   skip_count;
02005 
02006     if (m_NotifHifreqInterval != last_notif_timeout) {
02007         last_notif_timeout = m_NotifHifreqInterval;
02008         skip_count = 0;
02009         skip_limit = size_t(m_NotifHifreqInterval/0.1);
02010     }
02011 
02012     ++skip_count;
02013     if (skip_count < skip_limit)
02014         return;
02015 
02016     skip_count = 0;
02017 
02018     // The NotifyPeriodically() and CheckTimeout() calls may need to modify
02019     // the clients and affinity registry so it is safer to take the queue lock.
02020     CFastMutexGuard     guard(m_OperationLock);
02021     if (m_StatusTracker.AnyPending())
02022         m_NotificationsList.NotifyPeriodically(current_time,
02023                                                m_NotifLofreqMult,
02024                                                m_ClientsRegistry,
02025                                                m_AffinityRegistry);
02026     else
02027         m_NotificationsList.CheckTimeout(current_time,
02028                                          m_ClientsRegistry,
02029                                          m_AffinityRegistry);
02030 
02031     return;
02032 }
02033 
02034 
02035 void CQueue::PrintClientsList(CNetScheduleHandler &  handler,
02036                               bool                   verbose) const
02037 {
02038     m_ClientsRegistry.PrintClientsList(this, handler,
02039                                        m_AffinityRegistry, verbose);
02040 }
02041 
02042 
02043 void CQueue::PrintNotificationsList(CNetScheduleHandler &  handler,
02044                                     bool                   verbose) const
02045 {
02046     m_NotificationsList.Print(handler,
02047                               m_ClientsRegistry,
02048                               m_AffinityRegistry, verbose);
02049 }
02050 
02051 
02052 void CQueue::PrintAffinitiesList(CNetScheduleHandler &  handler,
02053                                  bool                   verbose) const
02054 {
02055     m_AffinityRegistry.Print(this,
02056                              handler,
02057                              m_ClientsRegistry,
02058                              verbose);
02059 }
02060 
02061 
02062 void CQueue::PrintGroupsList(CNetScheduleHandler &  handler,
02063                              bool                   verbose) const
02064 {
02065     m_GroupRegistry.Print(this, handler, verbose);
02066 }
02067 
02068 
02069 void CQueue::CheckExecutionTimeout(bool  logging)
02070 {
02071     if (!m_RunTimeLine)
02072         return;
02073 
02074     unsigned        queue_run_timeout = GetRunTimeout();
02075     time_t          curr = time(0);
02076     TNSBitVector    bv;
02077     {{
02078         CReadLockGuard  guard(m_RunTimeLineLock);
02079         m_RunTimeLine->ExtractObjects(curr, &bv);
02080     }}
02081 
02082     TNSBitVector::enumerator en(bv.first());
02083     for ( ;en.valid(); ++en) {
02084         x_CheckExecutionTimeout(queue_run_timeout, *en, curr, logging);
02085     }
02086 }
02087 
02088 
02089 void CQueue::x_CheckExecutionTimeout(unsigned   queue_run_timeout,
02090                                      unsigned   job_id,
02091                                      time_t     curr_time,
02092                                      bool       logging)
02093 {
02094     CJob                    job;
02095     unsigned                time_start = 0;
02096     unsigned                run_timeout = 0;
02097     time_t                  exp_time = 0;
02098     TJobStatus              status;
02099     TJobStatus              new_status;
02100     CJobEvent::EJobEvent    event_type;
02101 
02102 
02103     {{
02104         CFastMutexGuard         guard(m_OperationLock);
02105 
02106         status = GetJobStatus(job_id);
02107         if (status == CNetScheduleAPI::eRunning) {
02108             new_status = CNetScheduleAPI::ePending;
02109             event_type = CJobEvent::eTimeout;
02110         } else if (status == CNetScheduleAPI::eReading) {
02111             new_status = CNetScheduleAPI::eDone;
02112             event_type = CJobEvent::eReadTimeout;
02113         } else
02114             return; // Execution timeout is for Running and Reading jobs only
02115 
02116 
02117         {{
02118             CNSTransaction      transaction(this);
02119 
02120             if (job.Fetch(this, job_id) != CJob::eJF_Ok)
02121                 return;
02122 
02123             CJobEvent *     event = job.GetLastEvent();
02124             time_start = event->GetTimestamp();
02125             run_timeout = job.GetRunTimeout();
02126             if (run_timeout == 0)
02127                 run_timeout = queue_run_timeout;
02128 
02129             exp_time = run_timeout ? time_start + run_timeout : 0;
02130             if (curr_time < exp_time) {
02131                 // we need to register job in time line
02132                 TimeLineAdd(job_id, exp_time);
02133                 return;
02134             }
02135 
02136             // The job timeout (running or reading) is expired.
02137             // Check the try counter, we may need to fail the job.
02138             if (status == CNetScheduleAPI::eRunning) {
02139                 // Running state
02140                 if (job.GetRunCount() > m_FailedRetries)
02141                     new_status = CNetScheduleAPI::eFailed;
02142             } else {
02143                 // Reading state
02144                 if (job.GetReadCount() > m_FailedRetries)
02145                     new_status = CNetScheduleAPI::eReadFailed;
02146             }
02147 
02148             job.SetStatus(new_status);
02149             job.SetLastTouch(curr_time);
02150 
02151             event = &job.AppendEvent();
02152             event->SetStatus(new_status);
02153             event->SetEvent(event_type);
02154             event->SetTimestamp(curr_time);
02155 
02156             job.Flush(this);
02157             transaction.Commit();
02158         }}
02159 
02160 
02161         m_StatusTracker.SetStatus(job_id, new_status);
02162         m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
02163                                                                   m_RunTimeout));
02164 
02165         if (status == CNetScheduleAPI::eRunning) {
02166             if (new_status == CNetScheduleAPI::ePending) {
02167                 // Timeout and reschedule, put to blacklist as well
02168                 m_ClientsRegistry.ClearExecutingSetBlacklist(job_id);
02169                 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning,
02170                                                      CNetScheduleAPI::ePending,
02171                                                      CStatisticsCounters::eTimeout);
02172             }
02173             else {
02174                 m_ClientsRegistry.ClearExecuting(job_id);
02175                 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eRunning,
02176                                                      CNetScheduleAPI::eFailed,
02177                                                      CStatisticsCounters::eTimeout);
02178             }
02179         } else {
02180             if (new_status == CNetScheduleAPI::eDone) {
02181                 // Timeout and reschedule for reading, put to the blacklist
02182                 m_ClientsRegistry.ClearReadingSetBlacklist(job_id);
02183                 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading,
02184                                                      CNetScheduleAPI::eDone,
02185                                                      CStatisticsCounters::eTimeout);
02186             }
02187             else {
02188                 m_ClientsRegistry.ClearReading(job_id);
02189                 m_StatisticsCounters.CountTransition(CNetScheduleAPI::eReading,
02190                                                      CNetScheduleAPI::eReadFailed,
02191                                                      CStatisticsCounters::eTimeout);
02192             }
02193         }
02194 
02195         if (new_status == CNetScheduleAPI::ePending)
02196             m_NotificationsList.Notify(job_id,
02197                                        job.GetAffinityId(), m_ClientsRegistry,
02198                                        m_AffinityRegistry, m_NotifHifreqPeriod);
02199 
02200     }}
02201 
02202     if (new_status == CNetScheduleAPI::eFailed && job.ShouldNotify(time(0)))
02203         m_NotificationsList.NotifyJobStatus(job.GetSubmAddr(),
02204                                             job.GetSubmNotifPort(),
02205                                             MakeKey(job_id),
02206                                             job.GetStatus());
02207 
02208     if (logging)
02209     {
02210         CTime       tm_start;
02211         tm_start.SetTimeT(time_start);
02212         tm_start.ToLocalTime();
02213 
02214         CTime       tm_exp;
02215         tm_exp.SetTimeT(exp_time);
02216         tm_exp.ToLocalTime();
02217 
02218         string      purpose;
02219         if (status == CNetScheduleAPI::eRunning)
02220             purpose = "execution";
02221         else
02222             purpose = "reading";
02223 
02224         GetDiagContext().Extra()
02225                 .Print("msg", "Timeout expired, rescheduled for " + purpose)
02226                 .Print("msg_code", "410")   // The code is for searching in applog
02227                 .Print("job_key", MakeKey(job.GetId()))
02228                 .Print("queue", m_QueueName)
02229                 .Print("run_counter", job.GetRunCount())
02230                 .Print("read_counter", job.GetReadCount())
02231                 .Print("time_start", tm_start.AsString())
02232                 .Print("exp_time", tm_exp.AsString())
02233                 .Print("run_timeout", run_timeout);
02234     }
02235 }
02236 
02237 
02238 // Checks up to given # of jobs at the given status for expiration and
02239 // marks up to given # of jobs for deletion.
02240 // Returns the # of performed scans, the # of jobs marked for deletion and
02241 // the last scanned job id.
02242 SPurgeAttributes  CQueue::CheckJobsExpiry(time_t             current_time,
02243                                           SPurgeAttributes   attributes,
02244                                           unsigned int       last_job,
02245                                           TJobStatus         status)
02246 {
02247     TNSBitVector        job_ids;
02248     SPurgeAttributes    result;
02249     unsigned int        job_id;
02250     unsigned int        aff;
02251     unsigned int        group;
02252 
02253     result.job_id = attributes.job_id;
02254     result.deleted = 0;
02255     {{
02256         CFastMutexGuard     guard(m_OperationLock);
02257 
02258         for (result.scans = 0; result.scans < attributes.scans; ++result.scans) {
02259             job_id = m_StatusTracker.GetNext(status, result.job_id);
02260             if (job_id == 0)
02261                 break;  // No more jobs in the state
02262             if (last_job != 0 && job_id >= last_job)
02263                 break;  // The job in the state is above the limit
02264 
02265             result.job_id = job_id;
02266 
02267             if (m_GCRegistry.DeleteIfTimedOut(job_id, current_time,
02268                                               &aff, &group))
02269             {
02270                 // The job is expired and needs to be marked for deletion
02271                 m_StatusTracker.Erase(job_id);
02272                 job_ids.set_bit(job_id);
02273                 ++result.deleted;
02274 
02275                 // check if the affinity should also be updated
02276                 if (aff != 0)
02277                     m_AffinityRegistry.RemoveJobFromAffinity(job_id, aff);
02278 
02279                 // Check if the group registry should also be updated
02280                 if (group != 0)
02281                     m_GroupRegistry.RemoveJob(group, job_id);
02282 
02283                 if (result.deleted >= attributes.deleted)
02284                     break;
02285             }
02286         }
02287     }}
02288 
02289     if (result.deleted > 0)
02290         x_Erase(job_ids);
02291 
02292     return result;
02293 }
02294 
02295 
02296 void CQueue::TimeLineMove(unsigned job_id, time_t old_time, time_t new_time)
02297 {
02298     if (!job_id || !m_RunTimeLine)
02299         return;
02300 
02301     CWriteLockGuard guard(m_RunTimeLineLock);
02302     m_RunTimeLine->MoveObject(old_time, new_time, job_id);
02303 }
02304 
02305 
02306 void CQueue::TimeLineAdd(unsigned job_id, time_t job_time)
02307 {
02308     if (!job_id  ||  !m_RunTimeLine  ||  !job_time)
02309         return;
02310 
02311     CWriteLockGuard guard(m_RunTimeLineLock);
02312     m_RunTimeLine->AddObject(job_time, job_id);
02313 }
02314 
02315 
02316 void CQueue::TimeLineRemove(unsigned job_id)
02317 {
02318     if (!m_RunTimeLine)
02319         return;
02320 
02321     CWriteLockGuard guard(m_RunTimeLineLock);
02322     m_RunTimeLine->RemoveObject(job_id);
02323 }
02324 
02325 
02326 void CQueue::TimeLineExchange(unsigned  remove_job_id,
02327                               unsigned  add_job_id,
02328                               time_t    new_time)
02329 {
02330     if (!m_RunTimeLine)
02331         return;
02332 
02333     CWriteLockGuard guard(m_RunTimeLineLock);
02334     if (remove_job_id)
02335         m_RunTimeLine->RemoveObject(remove_job_id);
02336     if (add_job_id)
02337         m_RunTimeLine->AddObject(new_time, add_job_id);
02338 }
02339 
02340 
02341 unsigned int  CQueue::DeleteBatch(unsigned int  max_deleted)
02342 {
02343     // Copy the vector with deleted jobs
02344     TNSBitVector    jobs_to_delete;
02345 
02346     {{
02347          CFastMutexGuard     guard(m_JobsToDeleteLock);
02348          jobs_to_delete = m_JobsToDelete;
02349     }}
02350 
02351     static const size_t         chunk_size = 100;
02352     unsigned int                del_rec = 0;
02353     TNSBitVector::enumerator    en = jobs_to_delete.first();
02354     TNSBitVector                deleted_jobs;
02355 
02356     while (en.valid() && del_rec < max_deleted) {
02357         {{
02358             CFastMutexGuard     guard(m_OperationLock);
02359             CNSTransaction      transaction(this);
02360 
02361             for (size_t n = 0;
02362                  en.valid() && n < chunk_size && del_rec < max_deleted;
02363                  ++en, ++n) {
02364                 unsigned int    job_id = *en;
02365 
02366                 try {
02367                     m_QueueDbBlock->job_db.id = job_id;
02368                     m_QueueDbBlock->job_db.Delete();
02369                     ++del_rec;
02370                     deleted_jobs.set_bit(job_id, true);
02371                 } catch (CBDB_ErrnoException& ex) {
02372                     ERR_POST("BDB error " << ex.what());
02373                 }
02374 
02375                 try {
02376                     m_QueueDbBlock->job_info_db.id = job_id;
02377                     m_QueueDbBlock->job_info_db.Delete();
02378                 } catch (CBDB_ErrnoException& ex) {
02379                     ERR_POST("BDB error " << ex.what());
02380                 }
02381 
02382                 x_DeleteJobEvents(job_id);
02383             }
02384 
02385             transaction.Commit();
02386         }}
02387     }
02388 
02389     if (del_rec > 0) {
02390         m_StatisticsCounters.CountDBDeletion(del_rec);
02391 
02392         TNSBitVector::enumerator    en = deleted_jobs.first();
02393         CFastMutexGuard             guard(m_JobsToDeleteLock);
02394         for (; en.valid(); ++en)
02395             m_JobsToDelete.set_bit(*en, false);
02396     }
02397     return del_rec;
02398 }
02399 
02400 
02401 // See CXX-2838 for the description of how affinities garbage collection is
02402 // going  to work.
02403 unsigned int  CQueue::PurgeAffinities(void)
02404 {
02405     unsigned int    aff_dict_size = m_AffinityRegistry.size();
02406 
02407     if (aff_dict_size < (m_AffinityLowMarkPercentage / 100.0) * m_MaxAffinities)
02408         // Did not reach the dictionary low mark
02409         return 0;
02410 
02411     unsigned int    del_limit = m_AffinityHighRemoval;
02412     if (aff_dict_size < (m_AffinityHighMarkPercentage / 100.0) * m_MaxAffinities) {
02413         // Here: check the percentage of the affinities that have no references
02414         // to them
02415         unsigned int    candidates_size = m_AffinityRegistry.CheckRemoveCandidates();
02416 
02417         if (candidates_size < (m_AffinityDirtPercentage / 100.0) * m_MaxAffinities)
02418             // The number of candidates to be deleted is low
02419             return 0;
02420 
02421         del_limit = m_AffinityLowRemoval;
02422     }
02423 
02424 
02425     // Here: need to delete affinities from the memory and DB
02426     CFastMutexGuard     guard(m_OperationLock);
02427     CNSTransaction      transaction(this);
02428 
02429     unsigned int        del_count = m_AffinityRegistry.CollectGarbage(del_limit);
02430     transaction.Commit();
02431 
02432     return del_count;
02433 }
02434 
02435 
02436 unsigned int  CQueue::PurgeGroups(void)
02437 {
02438     CFastMutexGuard     guard(m_OperationLock);
02439     CNSTransaction      transaction(this);
02440 
02441     unsigned int        del_count = m_GroupRegistry.CollectGarbage(100);
02442     transaction.Commit();
02443 
02444     return del_count;
02445 }
02446 
02447 
02448 void  CQueue::PurgeWNodes(time_t  current_time)
02449 {
02450     // Clears the worker nodes affinities if the workers are inactive for
02451     // certain time
02452     CFastMutexGuard     guard(m_OperationLock);
02453 
02454     vector< pair< unsigned int,
02455                   unsigned short > >  notif_to_reset =
02456                                         m_ClientsRegistry.Purge(
02457                                                 current_time, m_WNodeTimeout,
02458                                                 m_AffinityRegistry);
02459     for (vector< pair< unsigned int,
02460                        unsigned short > >::const_iterator  k = notif_to_reset.begin();
02461          k != notif_to_reset.end(); ++k)
02462         m_NotificationsList.UnregisterListener(k->first, k->second);
02463 
02464     return;
02465 }
02466 
02467 
02468 void CQueue::x_DeleteJobEvents(unsigned int  job_id)
02469 {
02470     try {
02471         for (unsigned int  event_number = 0; ; ++event_number) {
02472             m_QueueDbBlock->events_db.id = job_id;
02473             m_QueueDbBlock->events_db.event_id = event_number;
02474             if (m_QueueDbBlock->events_db.Delete() == eBDB_NotFound)
02475                 break;
02476         }
02477     } catch (CBDB_ErrnoException& ex) {
02478         ERR_POST("BDB error " << ex.what());
02479     }
02480 }
02481 
02482 
02483 CBDB_FileCursor& CQueue::GetEventsCursor()
02484 {
02485     CBDB_Transaction* trans = m_QueueDbBlock->events_db.GetBDBTransaction();
02486     CBDB_FileCursor* cur = m_EventsCursor.get();
02487     if (!cur) {
02488         cur = new CBDB_FileCursor(m_QueueDbBlock->events_db);
02489         m_EventsCursor.reset(cur);
02490     } else {
02491         cur->Close();
02492     }
02493     cur->ReOpen(trans);
02494     return *cur;
02495 }
02496 
02497 
02498 void CQueue::PrintSubmHosts(CNetScheduleHandler &  handler) const
02499 {
02500     string      hosts;
02501 
02502     {{
02503         CQueueParamAccessor     qp(*this);
02504         hosts = qp.GetSubmHosts().Print("OK:", "\n");
02505     }}
02506 
02507     if (!hosts.empty())
02508         hosts += "\n";
02509     handler.WriteMessage(hosts);
02510 }
02511 
02512 
02513 void CQueue::PrintWNodeHosts(CNetScheduleHandler &  handler) const
02514 {
02515     string      hosts;
02516 
02517     {{
02518          CQueueParamAccessor     qp(*this);
02519          hosts = qp.GetWnodeHosts().Print("OK:", "\n");
02520     }}
02521 
02522     if (!hosts.empty())
02523         hosts += "\n";
02524     handler.WriteMessage(hosts);
02525 }
02526 
02527 
02528 void CQueue::x_PrintShortJobStat(CNetScheduleHandler &  handler,
02529                                  const CJob &           job)
02530 {
02531     string      reply = MakeKey(job.GetId()) + "\t" +
02532                         CNetScheduleAPI::StatusToString(job.GetStatus()) + "\t" +
02533                         job.GetQuotedInput() + "\t" +
02534                         job.GetQuotedOutput() + "\t";
02535 
02536     const CJobEvent *   last_event = job.GetLastEvent();
02537     if (last_event)
02538         reply += last_event->GetQuotedErrorMsg();
02539     else
02540         reply += "''";
02541     handler.WriteMessage("OK:", reply);
02542 }
02543 
02544 
02545 size_t CQueue::PrintJobDbStat(CNetScheduleHandler &     handler,
02546                               unsigned                  job_id)
02547 {
02548     // Check first that the job has not been deleted yet
02549     {{
02550         CFastMutexGuard     guard(m_JobsToDeleteLock);
02551         if (m_JobsToDelete[job_id])
02552             return 0;
02553     }}
02554 
02555     CJob                job;
02556     {{
02557         CFastMutexGuard     guard(m_OperationLock);
02558 
02559         m_QueueDbBlock->job_db.SetTransaction(NULL);
02560         m_QueueDbBlock->events_db.SetTransaction(NULL);
02561         m_QueueDbBlock->job_info_db.SetTransaction(NULL);
02562 
02563         CJob::EJobFetchResult   res = job.Fetch(this, job_id);
02564         if (res != CJob::eJF_Ok)
02565             return 0;
02566     }}
02567 
02568     job.Print(handler, *this, m_AffinityRegistry, m_GroupRegistry);
02569     return 1;
02570 }
02571 
02572 
02573 void CQueue::PrintAllJobDbStat(CNetScheduleHandler &   handler,
02574                                const string &          group,
02575                                TJobStatus              job_status,
02576                                unsigned int            start_after_job_id,
02577                                unsigned int            count)
02578 {
02579     // Form a bit vector of all jobs to dump
02580     vector<CNetScheduleAPI::EJobStatus>     statuses;
02581     TNSBitVector                            jobs_to_dump;
02582 
02583     if (job_status == CNetScheduleAPI::eJobNotFound) {
02584         // All statuses
02585         statuses.push_back(CNetScheduleAPI::ePending);
02586         statuses.push_back(CNetScheduleAPI::eRunning);
02587         statuses.push_back(CNetScheduleAPI::eCanceled);
02588         statuses.push_back(CNetScheduleAPI::eFailed);
02589         statuses.push_back(CNetScheduleAPI::eDone);
02590         statuses.push_back(CNetScheduleAPI::eReading);
02591         statuses.push_back(CNetScheduleAPI::eConfirmed);
02592         statuses.push_back(CNetScheduleAPI::eReadFailed);
02593     }
02594     else {
02595         // The user specified one state explicitly
02596         statuses.push_back(job_status);
02597     }
02598 
02599 
02600     {{
02601         CFastMutexGuard     guard(m_OperationLock);
02602 
02603         jobs_to_dump = m_StatusTracker.GetJobs(statuses);
02604     }}
02605 
02606     // Check if a certain group has been specified
02607     if (!group.empty())
02608         jobs_to_dump &= m_GroupRegistry.GetJobs(group);
02609 
02610     x_DumpJobs(handler, jobs_to_dump, start_after_job_id, count);
02611     return;
02612 }
02613 
02614 
02615 void CQueue::x_DumpJobs(CNetScheduleHandler &   handler,
02616                         const TNSBitVector &    jobs_to_dump,
02617                         unsigned int            start_after_job_id,
02618                         unsigned int            count)
02619 {
02620     // Skip the jobs which should not be dumped
02621     TNSBitVector::enumerator    en(jobs_to_dump.first());
02622     while (en.valid() && *en <= start_after_job_id)
02623         ++en;
02624 
02625     // Identify the required buffer size for jobs
02626     size_t      buffer_size = m_DumpBufferSize;
02627     if (count != 0 && count < buffer_size)
02628         buffer_size = count;
02629 
02630     {{
02631         CJob    buffer[buffer_size];
02632 
02633         size_t      read_jobs = 0;
02634         size_t      printed_count = 0;
02635 
02636         for ( ; en.valid(); ) {
02637             {{
02638                 CFastMutexGuard     guard(m_OperationLock);
02639                 m_QueueDbBlock->job_db.SetTransaction(NULL);
02640                 m_QueueDbBlock->events_db.SetTransaction(NULL);
02641                 m_QueueDbBlock->job_info_db.SetTransaction(NULL);
02642 
02643                 for ( ; en.valid() && read_jobs < buffer_size; ++en )
02644                     if (buffer[read_jobs].Fetch(this, *en) == CJob::eJF_Ok) {
02645                         ++read_jobs;
02646                         ++printed_count;
02647 
02648                         if (count != 0)
02649                             if (printed_count >= count)
02650                                 break;
02651                     }
02652             }}
02653 
02654             // Print what was read
02655             for (size_t  index = 0; index < read_jobs; ++index) {
02656                 handler.WriteMessage("");
02657                 buffer[index].Print(handler, *this,
02658                                     m_AffinityRegistry, m_GroupRegistry);
02659             }
02660 
02661             if (count != 0)
02662                 if (printed_count >= count)
02663                     break;
02664 
02665             read_jobs = 0;
02666         }
02667     }}
02668 
02669     return;
02670 }
02671 
02672 
02673 unsigned CQueue::CountStatus(TJobStatus st) const
02674 {
02675     return m_StatusTracker.CountStatus(st);
02676 }
02677 
02678 
02679 void CQueue::StatusStatistics(TJobStatus                status,
02680                               TNSBitVector::statistics* st) const
02681 {
02682     m_StatusTracker.StatusStatistics(status, st);
02683 }
02684 
02685 
02686 void CQueue::TouchClientsRegistry(CNSClientId &  client)
02687 {
02688     TNSBitVector        running_jobs;
02689     TNSBitVector        reading_jobs;
02690 
02691     {{
02692         CFastMutexGuard     guard(m_OperationLock);
02693         unsigned short      wait_port = m_ClientsRegistry.Touch(
02694                                                 client,
02695                                                 m_AffinityRegistry,
02696                                                 running_jobs,
02697                                                 reading_jobs);
02698         if (wait_port != 0)
02699             m_NotificationsList.UnregisterListener(client, wait_port);
02700     }}
02701 
02702 
02703     if (running_jobs.any())
02704         x_ResetRunningDueToNewSession(client, running_jobs);
02705     if (reading_jobs.any())
02706         x_ResetReadingDueToNewSession(client, reading_jobs);
02707     return;
02708 }
02709 
02710 
02711 // Moves the job to Pending/Failed or to Done/ReadFailed
02712 // when event event_type has come
02713 TJobStatus  CQueue::x_ResetDueTo(const CNSClientId &   client,
02714                                  unsigned int          job_id,
02715                                  time_t                current_time,
02716                                  TJobStatus            status_from,
02717                                  CJobEvent::EJobEvent  event_type)
02718 {
02719     CJob                job;
02720     TJobStatus          new_status;
02721 
02722     CFastMutexGuard     guard(m_OperationLock);
02723 
02724     {{
02725         CNSTransaction      transaction(this);
02726 
02727         if (job.Fetch(this, job_id) != CJob::eJF_Ok) {
02728             ERR_POST("Cannot fetch job to reset it due to " <<
02729                      CJobEvent::EventToString(event_type) <<
02730                      ". Job: " << DecorateJobId(job_id));
02731             return CNetScheduleAPI::eJobNotFound;
02732         }
02733 
02734         if (status_from == CNetScheduleAPI::eRunning) {
02735             // The job was running
02736             if (job.GetRunCount() > m_FailedRetries)
02737                 new_status = CNetScheduleAPI::eFailed;
02738             else
02739                 new_status = CNetScheduleAPI::ePending;
02740         } else {
02741             // The job was reading
02742             if (job.GetReadCount() > m_FailedRetries)
02743                 new_status = CNetScheduleAPI::eReadFailed;
02744             else
02745                 new_status = CNetScheduleAPI::eDone;
02746         }
02747 
02748         job.SetStatus(new_status);
02749         job.SetLastTouch(current_time);
02750 
02751         CJobEvent *     event = &job.AppendEvent();
02752         event->SetStatus(new_status);
02753         event->SetEvent(event_type);
02754         event->SetTimestamp(current_time);
02755         event->SetClientNode(client.GetNode());
02756         event->SetClientSession(client.GetSession());
02757 
02758         job.Flush(this);
02759         transaction.Commit();
02760     }}
02761 
02762     // Update the memory map
02763     m_StatusTracker.SetStatus(job_id, new_status);
02764 
02765     m_GCRegistry.UpdateLifetime(job_id, job.GetExpirationTime(m_Timeout,
02766                                                               m_RunTimeout));
02767 
02768     // remove the job from the time line
02769     TimeLineRemove(job_id);
02770 
02771     // Notify those who wait for the jobs if needed
02772     if (new_status == CNetScheduleAPI::ePending)
02773         m_NotificationsList.Notify(job_id, job.GetAffinityId(),
02774                                    m_ClientsRegistry,
02775                                    m_AffinityRegistry,
02776                                    m_NotifHifreqPeriod);
02777 
02778     return new_status;
02779 }
02780 
02781 
02782 void CQueue::x_ResetRunningDueToClear(const CNSClientId &   client,
02783                                       const TNSBitVector &  jobs)
02784 {
02785     time_t      current_time = time(0);
02786     for (TNSBitVector::enumerator  en(jobs.first()); en.valid(); ++en) {
02787         try {
02788             TJobStatus  new_status = x_ResetDueTo(client, *en,
02789                                                   current_time,
02790                                                   CNetScheduleAPI::eRunning,
02791                                                   CJobEvent::eClear);
02792             if (new_status != CNetScheduleAPI::eJobNotFound)
02793                 m_StatisticsCounters.CountTransition(
02794                                             CNetScheduleAPI::eRunning,
02795                                             new_status,
02796                                             CStatisticsCounters::eClear);
02797         } catch (...) {
02798             ERR_POST("Error resetting a running job when worker node is "
02799                      "cleared. Job: " << DecorateJobId(*en));
02800         }
02801     }
02802 }
02803 
02804 
02805 void CQueue::x_ResetReadingDueToClear(const CNSClientId &   client,
02806                                       const TNSBitVector &  jobs)
02807 {
02808     time_t      current_time = time(0);
02809     for (TNSBitVector::enumerator  en(jobs.first()); en.valid(); ++en) {
02810         try {
02811             TJobStatus  new_status = x_ResetDueTo(client, *en,
02812                                                   current_time,
02813                                                   CNetScheduleAPI::eReading,
02814                                                   CJobEvent::eClear);
02815             if (new_status != CNetScheduleAPI::eJobNotFound)
02816                 m_StatisticsCounters.CountTransition(
02817                                             CNetScheduleAPI::eReading,
02818                                             new_status,
02819                                             CStatisticsCounters::eClear);
02820         } catch (...) {
02821             ERR_POST("Error resetting a reading job when worker node is "
02822                      "cleared. Job: " << DecorateJobId(*en));
02823         }
02824     }
02825 }
02826 
02827 
02828 void CQueue::x_ResetRunningDueToNewSession(const CNSClientId &   client,
02829                                            const TNSBitVector &  jobs)
02830 {
02831     time_t      current_time = time(0);
02832     for (TNSBitVector::enumerator  en(jobs.first()); en.valid(); ++en) {
02833         try {
02834             TJobStatus  new_status = x_ResetDueTo(client, *en,
02835                                                   current_time,
02836                                                   CNetScheduleAPI::eRunning,
02837                                                   CJobEvent::eSessionChanged);
02838             if (new_status != CNetScheduleAPI::eJobNotFound)
02839                 m_StatisticsCounters.CountTransition(
02840                                             CNetScheduleAPI::eRunning,
02841                                             new_status,
02842                                             CStatisticsCounters::eNewSession);
02843         } catch (...) {
02844             ERR_POST("Error resetting a running job when worker node "
02845                      "changed session. Job: " << DecorateJobId(*en));
02846         }
02847     }
02848 }
02849 
02850 
02851 void CQueue::x_ResetReadingDueToNewSession(const CNSClientId &   client,
02852                                            const TNSBitVector &  jobs)
02853 {
02854     time_t      current_time = time(0);
02855     for (TNSBitVector::enumerator  en(jobs.first()); en.valid(); ++en) {
02856         try {
02857             TJobStatus  new_status = x_ResetDueTo(client, *en,
02858                                                   current_time,
02859                                                   CNetScheduleAPI::eReading,
02860                                                   CJobEvent::eSessionChanged);
02861             if (new_status != CNetScheduleAPI::eJobNotFound)
02862                 m_StatisticsCounters.CountTransition(
02863                                             CNetScheduleAPI::eReading,
02864                                             new_status,
02865                                             CStatisticsCounters::eNewSession);
02866         } catch (...) {
02867             ERR_POST("Error resetting a reading job when worker node "
02868                      "changed session. Job: " << DecorateJobId(*en));
02869         }
02870     }
02871 }
02872 
02873 
02874 void CQueue::x_RegisterGetListener(const CNSClientId &   client,
02875                                    unsigned short        port,
02876                                    unsigned int          timeout,
02877                                    const TNSBitVector &  aff_ids,
02878                                    bool                  wnode_aff,
02879                                    bool                  any_aff,
02880                                    bool                  exclusive_new_affinity,
02881                                    bool                  new_format)
02882 {
02883     // Add to the notification list and save the wait port
02884     m_NotificationsList.RegisterListener(client, port, timeout,
02885                                          wnode_aff, any_aff,
02886                                          exclusive_new_affinity, new_format);
02887     if (client.IsComplete())
02888         m_ClientsRegistry.SetWaiting(client, port, aff_ids, m_AffinityRegistry);
02889     return;
02890 }
02891 
02892 
02893 bool CQueue::x_UnregisterGetListener(const CNSClientId &  client,
02894                                      unsigned short       port)
02895 {
02896     unsigned short      notif_port = port;
02897 
02898     if (client.IsComplete())
02899         // New clients have the port in their registry record
02900         notif_port = m_ClientsRegistry.ResetWaiting(client, m_AffinityRegistry);
02901 
02902     if (notif_port > 0) {
02903         m_NotificationsList.UnregisterListener(client, notif_port);
02904         return true;
02905     }
02906 
02907     return false;
02908 }
02909 
02910 
// Logs one diagnostic "extra" record with the queue name, the affinity
// registry size and the number of jobs in each status, followed by the
// transition counters. The affinity registry size is also added to
// aff_count, so the caller can accumulate it.
void CQueue::PrintStatistics(size_t &  aff_count)
{
    size_t      affinities = m_AffinityRegistry.size();
    aff_count += affinities;

    // The member is called only if there is a request context
    // NOTE: the Extra() chain is copied into 'extra' as one expression on
    // purpose; the transitions are appended to the same record before Flush()
    CDiagContext_Extra extra = GetDiagContext().Extra()
        .Print("queue", GetQueueName())
        .Print("affinities", affinities)
        .Print("pending", CountStatus(CNetScheduleAPI::ePending))
        .Print("running", CountStatus(CNetScheduleAPI::eRunning))
        .Print("canceled", CountStatus(CNetScheduleAPI::eCanceled))
        .Print("failed", CountStatus(CNetScheduleAPI::eFailed))
        .Print("done", CountStatus(CNetScheduleAPI::eDone))
        .Print("reading", CountStatus(CNetScheduleAPI::eReading))
        .Print("confirmed", CountStatus(CNetScheduleAPI::eConfirmed))
        .Print("readfailed", CountStatus(CNetScheduleAPI::eReadFailed));
    m_StatisticsCounters.PrintTransitions(extra);
    extra.Flush();
}
02931 
02932 
02933 void CQueue::PrintTransitionCounters(CNetScheduleHandler &  handler)
02934 {
02935     m_StatisticsCounters.PrintTransitions(handler);
02936     handler.WriteMessage("OK:garbage_jobs: " +
02937                          NStr::IntToString(m_JobsToDelete.count()));
02938     handler.WriteMessage("OK:affinity_registry_size: " +
02939                          NStr::SizetToString(m_AffinityRegistry.size()));
02940     handler.WriteMessage("OK:client_registry_size: " +
02941                          NStr::SizetToString(m_ClientsRegistry.size()));
02942     return;
02943 }
02944 
02945 
02946 void CQueue::PrintJobsStat(CNetScheduleHandler &  handler,
02947                            const string &         group_token,
02948                            const string &         aff_token)
02949 {
02950     size_t              jobs_per_state[g_ValidJobStatusesSize];
02951     TNSBitVector        group_jobs;
02952     TNSBitVector        aff_jobs;
02953 
02954     {{
02955         CFastMutexGuard     guard(m_OperationLock);
02956 
02957         if (!group_token.empty())
02958             group_jobs = m_GroupRegistry.GetJobs(group_token);
02959         if (!aff_token.empty()) {
02960             unsigned int  aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
02961             if (aff_id == 0)
02962                 NCBI_THROW(CNetScheduleException, eAffinityNotFound,
02963                            "Unknown affinity token \"" + aff_token + "\"");
02964 
02965             aff_jobs = m_AffinityRegistry.GetJobsWithAffinity(aff_id);
02966         }
02967 
02968         for (size_t  index(0); index < g_ValidJobStatusesSize; ++index) {
02969             TNSBitVector  candidates =
02970                             m_StatusTracker.GetJobs(g_ValidJobStatuses[index]);
02971 
02972             if (!group_token.empty())
02973                 candidates &= group_jobs;
02974             if (!aff_token.empty())
02975                 candidates &= aff_jobs;
02976 
02977             jobs_per_state[index] = candidates.count();
02978         }
02979     }}
02980 
02981     size_t              total = 0;
02982     for (size_t  index(0); index < g_ValidJobStatusesSize; ++index) {
02983         handler.WriteMessage("OK:" + CNetScheduleAPI::StatusToString(g_ValidJobStatuses[index]) +
02984                              ": " + NStr::SizetToString(jobs_per_state[index]));
02985         total += jobs_per_state[index];
02986     }
02987     handler.WriteMessage("OK:Total: " +
02988                          NStr::SizetToString(total));
02989     return;
02990 }
02991 
02992 
02993 unsigned int CQueue::CountActiveJobs(void) const
02994 {
02995     vector<CNetScheduleAPI::EJobStatus>     statuses;
02996 
02997     statuses.push_back(CNetScheduleAPI::ePending);
02998     statuses.push_back(CNetScheduleAPI::eRunning);
02999     return m_StatusTracker.CountStatus(statuses);
03000 }
03001 
03002 
03003 void CQueue::x_UpdateDB_PutResultNoLock(unsigned             job_id,
03004                                         const string &       auth_token,
03005                                         time_t               curr,
03006                                         int                  ret_code,
03007                                         const string &       output,
03008                                         CJob &               job,
03009                                         const CNSClientId &  client)
03010 {
03011     if (job.Fetch(this, job_id) != CJob::eJF_Ok)
03012         NCBI_THROW(CNetScheduleException, eInternalError,
03013                    "Error fetching job: " + DecorateJobId(job_id));
03014 
03015     if (!auth_token.empty()) {
03016         // Need to check authorization token first
03017         CJob::EAuthTokenCompareResult   token_compare_result =
03018                                         job.CompareAuthToken(auth_token);
03019         if (token_compare_result == CJob::eInvalidTokenFormat)
03020             NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
03021                        "Invalid authorization token format");
03022         if (token_compare_result == CJob::eNoMatch)
03023             NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
03024                        "Authorization token does not match");
03025         if (token_compare_result == CJob::ePassportOnlyMatch) {
03026             // That means that the job has been executing by another worker
03027             // node at the moment, but we can accept the results anyway
03028             LOG_POST(Message << Warning << "Received PUT2 with only "
03029                                            "passport matched.");
03030         }
03031         // Here: the authorization token is OK, we can continue
03032     }
03033 
03034     // Append the event
03035     CJobEvent *     event = &job.AppendEvent();
03036     event->SetStatus(CNetScheduleAPI::eDone);
03037     event->SetEvent(CJobEvent::eDone);
03038     event->SetTimestamp(curr);
03039     event->SetRetCode(ret_code);
03040 
03041     event->SetClientNode(client.GetNode());
03042     event->SetClientSession(client.GetSession());
03043     event->SetNodeAddr(client.GetAddress());
03044 
03045     job.SetStatus(CNetScheduleAPI::eDone);
03046     job.SetOutput(output);
03047     job.SetLastTouch(curr);
03048 
03049     job.Flush(this);
03050     return;
03051 }
03052 
03053 
03054 // If the job.job_id != 0 => the job has been read successfully
03055 // Exception => DB errors
03056 // Return: true  => job is ok to get
03057 //         false => job is expired and was marked for deletion,
03058 //                  need to get another job
03059 bool  CQueue::x_UpdateDB_GetJobNoLock(const CNSClientId &  client,
03060                                       time_t               curr,
03061                                       unsigned int         job_id,
03062                                       CJob &               job)
03063 {
03064     CNSTransaction      transaction(this);
03065 
03066     if (job.Fetch(this, job_id) != CJob::eJF_Ok)
03067         NCBI_THROW(CNetScheduleException, eInternalError,
03068                    "Cannot read job info from DB");
03069 
03070     // The job has been read successfully, check if it is still within the
03071     // expiration timeout; note: run_timeout is not applicable because the job
03072     // is guaranteed in the pending state
03073     if (job.GetExpirationTime(m_Timeout, 0) <= curr) {
03074         // The job has expired, so mark it for deletion
03075         EraseJob(job_id);
03076 
03077         if (job.GetAffinityId() != 0)
03078             m_AffinityRegistry.RemoveJobFromAffinity(
03079                                     job_id, job.GetAffinityId());
03080         if (job.GetGroupId() != 0)
03081             m_GroupRegistry.RemoveJob(job.GetGroupId(), job_id);
03082 
03083         return false;
03084     }
03085 
03086     unsigned        run_count = job.GetRunCount() + 1;
03087     CJobEvent &     event = job.AppendEvent();
03088 
03089     event.SetStatus(CNetScheduleAPI::eRunning);
03090     event.SetEvent(CJobEvent::eRequest);
03091     event.SetTimestamp(curr);
03092 
03093     event.SetClientNode(client.GetNode());
03094     event.SetClientSession(client.GetSession());
03095     event.SetNodeAddr(client.GetAddress());
03096 
03097     job.SetStatus(CNetScheduleAPI::eRunning);
03098     job.SetRunTimeout(0);
03099     job.SetRunCount(run_count);
03100     job.SetLastTouch(curr);
03101 
03102     job.Flush(this);
03103     transaction.Commit();
03104 
03105     return true;
03106 }
03107 
03108 
03109 END_NCBI_SCOPE
03110 
Modified on Wed May 23 12:58:46 2012 by modify_doxy.py rev. 337098