|
NCBI C++ ToolKit
|
00001 /* $Id: netschedule_client_sample.cpp 52403 2011-12-21 21:36:31Z kazimird $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Authors: Anatoliy Kuznetsov 00027 * 00028 * File Description: NetSchedule client test 00029 * 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 #include <corelib/ncbiapp.hpp> 00034 #include <corelib/ncbiargs.hpp> 00035 #include <corelib/ncbienv.hpp> 00036 #include <corelib/ncbireg.hpp> 00037 #include <corelib/ncbi_system.hpp> 00038 #include <corelib/ncbimisc.hpp> 00039 00040 #include <connect/services/netschedule_api.hpp> 00041 #include <connect/ncbi_socket.hpp> 00042 #include <connect/ncbi_core_cxx.hpp> 00043 #include <connect/ncbi_types.h> 00044 00045 00046 USING_NCBI_SCOPE; 00047 00048 00049 /////////////////////////////////////////////////////////////////////// 00050 00051 00052 /// Sample application 00053 /// 00054 /// @internal 00055 /// 00056 class CSampleNetScheduleClient : public CNcbiApplication 00057 { 00058 public: 00059 void Init(void); 00060 int Run(void); 00061 }; 00062 00063 00064 00065 void CSampleNetScheduleClient::Init(void) 00066 { 00067 SetDiagPostFlag(eDPF_Trace); 00068 SetDiagPostLevel(eDiag_Info); 00069 00070 // Setup command line arguments and parameters 00071 00072 // Create command-line argument descriptions class 00073 auto_ptr<CArgDescriptions> arg_desc(new CArgDescriptions); 00074 00075 // Specify USAGE context 00076 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(), 00077 "NetSchedule client"); 00078 00079 arg_desc->AddPositional("service", 00080 "NetSchedule service name", 00081 CArgDescriptions::eString); 00082 00083 arg_desc->AddPositional("queue", 00084 "NetSchedule queue name (like: noname).", 00085 CArgDescriptions::eString); 00086 00087 00088 arg_desc->AddOptionalKey("jcount", 00089 "jcount", 00090 "Number of jobs to submit", 00091 CArgDescriptions::eInteger); 00092 00093 00094 00095 // Setup arg.descriptions for this application 00096 SetupArgDescriptions(arg_desc.release()); 00097 } 00098 00099 00100 int CSampleNetScheduleClient::Run(void) 00101 { 00102 CArgs args = GetArgs(); 00103 const string& service = args["service"].AsString(); 00104 const string& queue_name = args["queue"].AsString(); 00105 00106 unsigned jcount = 100; 00107 if (args["jcount"]) { 00108 jcount = args["jcount"].AsInteger(); 00109 } 00110 CNetScheduleAPI::EJobStatus status; 00111 CNetScheduleAPI cl(service, "client_sample", queue_name); 00112 00113 CNetScheduleSubmitter submitter = cl.GetSubmitter(); 00114 00115 const string input = "Hello " + queue_name; 00116 00117 CNetScheduleJob job(input); 00118 submitter.SubmitJob(job); 00119 NcbiCout << job.job_id << NcbiEndl; 00120 00121 00122 00123 vector<string> jobs; 00124 00125 {{ 00126 CStopWatch sw(CStopWatch::eStart); 00127 00128 NcbiCout << "Submit " << jcount << " jobs..." << NcbiEndl; 00129 00130 for (unsigned i = 0; i < jcount; ++i) { 00131 CNetScheduleJob job(input); 00132 submitter.SubmitJob(job); 00133 jobs.push_back(job.job_id); 00134 if (i % 1000 == 0) { 00135 NcbiCout << "." << flush; 00136 } 00137 } 00138 NcbiCout << NcbiEndl << "Done." << NcbiEndl; 00139 double elapsed = sw.Elapsed(); 00140 double avg = elapsed / jcount; 00141 00142 NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield); 00143 NcbiCout << "Avg time:" << avg << " sec." << NcbiEndl; 00144 }} 00145 00146 00147 // Waiting for jobs to be done 00148 00149 NcbiCout << "Waiting for jobs..." << jobs.size() << NcbiEndl; 00150 unsigned cnt = 0; 00151 SleepMilliSec(5000); 00152 00153 CNetScheduleAdmin admin = cl.GetAdmin(); 00154 /* 00155 CNetScheduleKeys keys; 00156 admin.RetrieveKeys("status=pending", keys); 00157 00158 for (CNetScheduleKeys::const_iterator it = keys.begin(); 00159 it != keys.end(); ++it) { 00160 cout << string(*it) << endl; 00161 } 00162 */ 00163 00164 unsigned last_jobs = 0; 00165 unsigned no_jobs_executes_cnt = 0; 00166 00167 while (jobs.size()) { 00168 NON_CONST_ITERATE(vector<string>, it, jobs) { 00169 CNetScheduleJob job; 00170 job.job_id = *it; 00171 status = submitter.GetJobDetails(job); 00172 00173 if (status == CNetScheduleAPI::eDone) { 00174 string expected_output = "DONE " + queue_name; 00175 if (job.output != expected_output || job.ret_code != 0) { 00176 ERR_POST("Unexpected output or return code:" + 00177 job.output); 00178 } 00179 jobs.erase(it); 00180 ++cnt; 00181 break; 00182 } else 00183 if (status != CNetScheduleAPI::ePending) { 00184 if (status == CNetScheduleAPI::eJobNotFound) { 00185 NcbiCerr << "Job lost:" << job.job_id << NcbiEndl; 00186 } 00187 jobs.erase(it); 00188 ++cnt; 00189 break; 00190 } 00191 00192 ++cnt; 00193 if (cnt % 1000 == 0) { 00194 NcbiCout << "Waiting for " 00195 << jobs.size() 00196 << " jobs." 00197 << NcbiEndl; 00198 // it is necessary to give system a rest periodically 00199 SleepMilliSec(2000); 00200 // check status of only first 1000 jobs 00201 // since the JS queue execution priority is FIFO 00202 break; 00203 } 00204 } 00205 00206 // check if worker node picks up jobs, otherwise stop 00207 // trying after 10 attempts. 00208 00209 if (jobs.size() == last_jobs) { 00210 ++no_jobs_executes_cnt; 00211 if (no_jobs_executes_cnt == 3) { 00212 NcbiCout << "No progress in job execution. Stopping..." 00213 << NcbiEndl; 00214 break; 00215 } else { 00216 last_jobs = jobs.size(); 00217 } 00218 } 00219 00220 } // while 00221 00222 NcbiCout << NcbiEndl << "Done." << NcbiEndl; 00223 if (jobs.size()) { 00224 NcbiCout << "Remaning job count = " << jobs.size() << NcbiEndl; 00225 } 00226 return 0; 00227 } 00228 00229 00230 int main(int argc, const char* argv[]) 00231 { 00232 return CSampleNetScheduleClient().AppMain(argc, argv); 00233 }
1.7.5.1
Modified on Wed May 23 13:21:16 2012 by modify_doxy.py rev. 337098