NCBI C++ ToolKit
netschedule_client_sample.cpp
Go to the documentation of this file.
00001 /*  $Id: netschedule_client_sample.cpp 52403 2011-12-21 21:36:31Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Anatoliy Kuznetsov
00027  *
00028  * File Description:  NetSchedule client test
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include <corelib/ncbiapp.hpp>
00034 #include <corelib/ncbiargs.hpp>
00035 #include <corelib/ncbienv.hpp>
00036 #include <corelib/ncbireg.hpp>
00037 #include <corelib/ncbi_system.hpp>
00038 #include <corelib/ncbimisc.hpp>
00039 
00040 #include <connect/services/netschedule_api.hpp>
00041 #include <connect/ncbi_socket.hpp>
00042 #include <connect/ncbi_core_cxx.hpp>
00043 #include <connect/ncbi_types.h>
00044 
00045 
00046 USING_NCBI_SCOPE;
00047 
00048     
00049 ///////////////////////////////////////////////////////////////////////
00050 
00051 
00052 /// Sample application
00053 ///
00054 /// @internal
00055 ///
00056 class CSampleNetScheduleClient : public CNcbiApplication
00057 {
00058 public:
00059     void Init(void);
00060     int Run(void);
00061 };
00062 
00063 
00064 
00065 void CSampleNetScheduleClient::Init(void)
00066 {
00067     SetDiagPostFlag(eDPF_Trace);
00068     SetDiagPostLevel(eDiag_Info);
00069     
00070     // Setup command line arguments and parameters
00071 
00072     // Create command-line argument descriptions class
00073     auto_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
00074 
00075     // Specify USAGE context
00076     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
00077                               "NetSchedule client");
00078     
00079     arg_desc->AddPositional("service", 
00080                             "NetSchedule service name", 
00081                             CArgDescriptions::eString);
00082 
00083     arg_desc->AddPositional("queue", 
00084                             "NetSchedule queue name (like: noname).",
00085                             CArgDescriptions::eString);
00086 
00087 
00088     arg_desc->AddOptionalKey("jcount", 
00089                              "jcount",
00090                              "Number of jobs to submit",
00091                              CArgDescriptions::eInteger);
00092 
00093 
00094     
00095     // Setup arg.descriptions for this application
00096     SetupArgDescriptions(arg_desc.release());
00097 }
00098 
00099 
00100 int CSampleNetScheduleClient::Run(void)
00101 {
00102     CArgs args = GetArgs();
00103     const string&  service  = args["service"].AsString();
00104     const string&  queue_name = args["queue"].AsString();  
00105 
00106     unsigned jcount = 100;
00107     if (args["jcount"]) {
00108         jcount = args["jcount"].AsInteger();
00109     }
00110     CNetScheduleAPI::EJobStatus status;
00111     CNetScheduleAPI cl(service, "client_sample", queue_name);
00112 
00113     CNetScheduleSubmitter submitter = cl.GetSubmitter();
00114 
00115     const string input = "Hello " + queue_name;
00116 
00117     CNetScheduleJob job(input);
00118     submitter.SubmitJob(job);
00119     NcbiCout << job.job_id << NcbiEndl;
00120 
00121     
00122     
00123     vector<string> jobs;
00124 
00125     {{
00126     CStopWatch sw(CStopWatch::eStart);
00127 
00128     NcbiCout << "Submit " << jcount << " jobs..." << NcbiEndl;
00129 
00130     for (unsigned i = 0; i < jcount; ++i) {
00131         CNetScheduleJob job(input);
00132         submitter.SubmitJob(job);
00133         jobs.push_back(job.job_id);
00134         if (i % 1000 == 0) {
00135             NcbiCout << "." << flush;
00136         }
00137     }
00138     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
00139     double elapsed = sw.Elapsed();
00140     double avg = elapsed / jcount;
00141 
00142     NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
00143     NcbiCout << "Avg time:" << avg << " sec." << NcbiEndl;
00144     }}
00145 
00146 
00147     // Waiting for jobs to be done
00148 
00149     NcbiCout << "Waiting for jobs..." << jobs.size() << NcbiEndl;
00150     unsigned cnt = 0;
00151     SleepMilliSec(5000);
00152 
00153     CNetScheduleAdmin admin = cl.GetAdmin();
00154     /*
00155     CNetScheduleKeys keys;
00156     admin.RetrieveKeys("status=pending", keys);
00157 
00158     for (CNetScheduleKeys::const_iterator it = keys.begin();
00159         it != keys.end(); ++it) {
00160         cout << string(*it) << endl;
00161     }
00162     */
00163             
00164     unsigned last_jobs = 0;
00165     unsigned no_jobs_executes_cnt = 0;
00166     
00167     while (jobs.size()) {
00168         NON_CONST_ITERATE(vector<string>, it, jobs) {
00169             CNetScheduleJob job;
00170             job.job_id = *it;
00171             status = submitter.GetJobDetails(job);
00172 
00173             if (status == CNetScheduleAPI::eDone) {
00174                 string expected_output = "DONE " + queue_name;
00175                 if (job.output != expected_output || job.ret_code != 0) {
00176                     ERR_POST("Unexpected output or return code:" +
00177                              job.output);
00178                 }
00179                 jobs.erase(it);
00180                 ++cnt;
00181                 break;
00182             } else 
00183             if (status != CNetScheduleAPI::ePending) {
00184                 if (status == CNetScheduleAPI::eJobNotFound) {
00185                     NcbiCerr << "Job lost:" << job.job_id << NcbiEndl;
00186                 }
00187                 jobs.erase(it);
00188                 ++cnt;
00189                 break;                
00190             }
00191             
00192             ++cnt;
00193             if (cnt % 1000 == 0) {
00194                 NcbiCout << "Waiting for " 
00195                          << jobs.size() 
00196                          << " jobs."
00197                          << NcbiEndl;
00198                 // it is necessary to give system a rest periodically
00199                 SleepMilliSec(2000);
00200                 // check status of only first 1000 jobs
00201                 // since the JS queue execution priority is FIFO
00202                 break;
00203             }
00204         }
00205         
00206         // check if worker node picks up jobs, otherwise stop
00207         // trying after 10 attempts.        
00208         
00209         if (jobs.size() == last_jobs) {
00210             ++no_jobs_executes_cnt;
00211             if (no_jobs_executes_cnt == 3) {
00212                 NcbiCout << "No progress in job execution. Stopping..."
00213                          << NcbiEndl;
00214                 break;
00215             } else {
00216                 last_jobs = jobs.size();
00217             }
00218         }
00219         
00220     } // while
00221 
00222     NcbiCout << NcbiEndl << "Done." << NcbiEndl;
00223     if (jobs.size()) {
00224         NcbiCout << "Remaning job count = " << jobs.size() << NcbiEndl;
00225     }
00226     return 0;
00227 }
00228 
00229 
00230 int main(int argc, const char* argv[])
00231 {
00232     return CSampleNetScheduleClient().AppMain(argc, argv);
00233 }
Modified on Wed May 23 13:21:16 2012 by modify_doxy.py rev. 337098