NCBI C++ ToolKit
netschedule_client_sample.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_client_sample.cpp 71417 2016-03-03 16:46:54Z gouriano $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov
27  *
28  * File Description: NetSchedule client test
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbiapp.hpp>
34 #include <corelib/ncbiargs.hpp>
35 #include <corelib/ncbienv.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimisc.hpp>
39 
41 #include <connect/ncbi_socket.hpp>
43 #include <connect/ncbi_types.h>
44 
45 
47 
48 
49 ///////////////////////////////////////////////////////////////////////
50 
51 
52 /// Sample application
53 ///
54 /// @internal
55 ///
57 {
58 public:
  /// Declares the command-line arguments of the sample
  /// (positional service and queue, optional jcount key).
59  void Init(void);
  /// Submits jobs to the NetSchedule queue, then polls their status
  /// until each of them has left the pending state.
60  int Run(void);
61 };
62 
63 
64 
66 {
  // Builds the CArgDescriptions object for this sample and registers it
  // with the application.
  // NOTE(review): the embedded line numbering jumps (66 to 69, 80 to 82,
  // 84 to 86, 90 to 92), so some statements and the trailing argument of
  // each Add* call are not visible in this listing -- confirm against the
  // real source file before editing.
69 
70  // Setup command line arguments and parameters
71 
72  // Create command-line argument descriptions class
73  auto_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
74 
75  // Specify USAGE context
76  arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
77  "NetSchedule client");
78 
  // First positional argument: the NetSchedule service name.
79  arg_desc->AddPositional("service",
80  "NetSchedule service name",
82 
  // Second positional argument: the NetSchedule queue name.
83  arg_desc->AddPositional("queue",
84  "NetSchedule queue name (like: noname).",
86 
87 
  // Optional key jcount: number of jobs Run() submits in its batch
  // (Run() uses 100 when the key is absent).
88  arg_desc->AddOptionalKey("jcount",
89  "jcount",
90  "Number of jobs to submit",
92 
93 
94 
95  // Setup arg.descriptions for this application
  // release() makes the auto_ptr give up the object, so only the raw
  // pointer handed to SetupArgDescriptions() refers to it afterwards.
96  SetupArgDescriptions(arg_desc.release());
97 }
98 
99 
101 {
  // Submits one job plus a batch of jcount jobs to the NetSchedule queue,
  // then polls the batch until every job has left the pending state.
  // Always returns 0, even when some jobs are still outstanding.
  // NOTE(review): the embedded line numbering skips 110 and 126, and
  // neither status nor sw is declared in the visible lines -- presumably
  // they are declared on those omitted lines; confirm against the real
  // source file.
102  const CArgs& args = GetArgs();
103  const string& service = args["service"].AsString();
104  const string& queue_name = args["queue"].AsString();
105 
  // Batch size: 100 unless the jcount key was given on the command line.
  // NOTE(review): the AsInteger() result is stored in an unsigned, so a
  // negative value would wrap to a very large count -- TODO confirm that
  // the argument is range-checked elsewhere.
106  unsigned jcount = 100;
107  if (args["jcount"]) {
108  jcount = args["jcount"].AsInteger();
109  }
111  CNetScheduleAPI cl(service, "client_sample", queue_name);
112 
113  CNetScheduleSubmitter submitter = cl.GetSubmitter();
114 
  // Every job submitted by this sample carries the same input string.
115  const string input = "Hello " + queue_name;
116 
  // Submit a single job first and print its id. This job is not added to
  // the jobs vector, so the polling loop below never checks it.
117  CNetScheduleJob job(input);
118  submitter.SubmitJob(job);
119  NcbiCout << job.job_id << NcbiEndl;
120 
121 
122 
  // Ids of the batch jobs that are still being waited for.
123  vector<string> jobs;
124 
  // Batch submission; the elapsed time is read from sw afterwards.
125  {{
127 
128  NcbiCout << "Submit " << jcount << " jobs..." << NcbiEndl;
129 
130  for (unsigned i = 0; i < jcount; ++i) {
131  CNetScheduleJob job(input);
132  submitter.SubmitJob(job);
133  jobs.push_back(job.job_id);
  // Progress dot on every 1000th submission (including i == 0).
134  if (i % 1000 == 0) {
135  NcbiCout << "." << flush;
136  }
137  }
138  NcbiCout << NcbiEndl << "Done." << NcbiEndl;
139  double elapsed = sw.Elapsed();
  // NOTE(review): with jcount == 0 this is a floating-point division
  // by zero.
140  double avg = elapsed / jcount;
141 
142  NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
143  NcbiCout << "Avg time:" << avg << " sec." << NcbiEndl;
144  }}
145 
146 
147  // Waiting for jobs to be done
148 
149  NcbiCout << "Waiting for jobs..." << jobs.size() << NcbiEndl;
  // Total number of job status queries made by the polling loop.
150  unsigned cnt = 0;
  // Pause for 5 seconds before the first status query.
151  SleepMilliSec(5000);
152 
  // NOTE(review): admin is only referenced by the commented-out code
  // below.
153  CNetScheduleAdmin admin = cl.GetAdmin();
154  /*
155  CNetScheduleKeys keys;
156  admin.RetrieveKeys("status=pending", keys);
157 
158  for (CNetScheduleKeys::const_iterator it = keys.begin();
159  it != keys.end(); ++it) {
160  cout << string(*it) << endl;
161  }
162  */
163 
164  unsigned last_jobs = 0;
165  unsigned no_jobs_executes_cnt = 0;
166 
  // Each pass scans jobs from the front and stops at the first erase or
  // at every 1000th status query; the outer loop then starts a new scan.
167  while (jobs.size()) {
168  NON_CONST_ITERATE(vector<string>, it, jobs) {
169  CNetScheduleJob job;
170  job.job_id = *it;
171  status = submitter.GetJobDetails(job);
172 
173  if (status == CNetScheduleAPI::eDone) {
  // A finished job is expected to carry the output DONE + queue name
  // and ret_code 0; a mismatch is only reported, the job is still
  // removed from the list.
174  string expected_output = "DONE " + queue_name;
175  if (job.output != expected_output || job.ret_code != 0) {
176  ERR_POST("Unexpected output or return code:" +
177  job.output);
178  }
  // The iterator is not used again after erase(): the break leaves
  // the scan immediately.
179  jobs.erase(it);
180  ++cnt;
181  break;
182  } else
  // Any other non-pending status also ends the wait for this job;
  // only eJobNotFound is reported.
183  if (status != CNetScheduleAPI::ePending) {
184  if (status == CNetScheduleAPI::eJobNotFound) {
185  NcbiCerr << "Job lost:" << job.job_id << NcbiEndl;
186  }
187  jobs.erase(it);
188  ++cnt;
189  break;
190  }
191 
  // Job is still pending: count the query and move on to the next id.
192  ++cnt;
193  if (cnt % 1000 == 0) {
194  NcbiCout << "Waiting for "
195  << jobs.size()
196  << " jobs."
197  << NcbiEndl;
198  // it is necessary to give system a rest periodically
199  SleepMilliSec(2000);
200  // check status of only first 1000 jobs
201  // since the JS queue execution priority is FIFO
202  break;
203  }
204  }
205 
206  // check if worker node picks up jobs, otherwise stop
207  // trying after 10 attempts.
  // NOTE(review): the code compares the counter with 3, not 10. Also,
  // last_jobs starts at 0 and is only reassigned inside the
  // jobs.size() == last_jobs branch, while this point is only reached
  // with a non-empty jobs vector -- so the stall check looks unreachable
  // and the loop may never end if jobs stay pending; TODO confirm and fix.
208 
209  if (jobs.size() == last_jobs) {
210  ++no_jobs_executes_cnt;
211  if (no_jobs_executes_cnt == 3) {
212  NcbiCout << "No progress in job execution. Stopping..."
213  << NcbiEndl;
214  break;
215  } else {
216  last_jobs = jobs.size();
217  }
218  }
219 
220  } // while
221 
222  NcbiCout << NcbiEndl << "Done." << NcbiEndl;
  // NOTE(review): the message below misspells Remaining; it is a runtime
  // string and is left untouched here.
223  if (jobs.size()) {
224  NcbiCout << "Remaning job count = " << jobs.size() << NcbiEndl;
225  }
226  return 0;
227 }
228 
229 
230 int NcbiSys_main(int argc, ncbi::TXChar* argv[])
231 {
232  return CSampleNetScheduleClient().AppMain(argc, argv);
233 }
CStopWatch –.
Definition: ncbitime.hpp:1891
#define NcbiCerr
Definition: ncbistre.hpp:399
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:783
int ret_code
Job return code.
int Run(void)
Run the application.
Start timer immediately after creating.
Definition: ncbitime.hpp:1896
static unsigned cnt[256]
Defines command line argument related classes.
virtual void SetupArgDescriptions(CArgDescriptions *arg_desc)
Setup the command line argument descriptions.
Definition: ncbiapp.cpp:926
#define NcbiCout
Definition: ncbistre.hpp:398
EDiagSev SetDiagPostLevel(EDiagSev post_sev=eDiag_Error)
Set the threshold severity for posting the messages.
Definition: ncbidiag.cpp:5450
#define NcbiEndl
Definition: ncbistre.hpp:403
Job description.
Defines unified interface to application:
int AppMain(int argc, const char *const *argv, const char *const *envp=0, EAppDiagStream diag=eDS_Default, const char *conf=NcbiEmptyCStr, const string &name=NcbiEmptyString)
Main function (entry point) for the NCBI application.
Definition: ncbiapp.cpp:611
int i
Default flags to use when tracing.
Definition: ncbidiag.hpp:715
NetSchedule client specs.
Job is ready (computed successfully)
Convertible into an integer number (int or Int8)
Definition: ncbiargs.hpp:566
CNetScheduleAPI::EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get full information about the specified job.
void SetDiagPostFlag(EDiagPostFlag flag)
Set the specified flag (globally).
Definition: ncbidiag.cpp:5391
Informational message.
Definition: ncbidiag.hpp:645
Smart pointer to the job submission part of the NetSchedule API.
Waiting for execution.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:185
char TXChar
Definition: ncbistr.hpp:169
int NcbiSys_main(int argc, ncbi::TXChar *argv[])
string job_id
Output job key.
string output
Job result data.
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
Definition: ncbiapp.cpp:190
CArgDescriptions –.
Definition: ncbiargs.hpp:514
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
An arbitrary string.
Definition: ncbiargs.hpp:563
Process information in the NCBI Registry, including working with configuration files.
Miscellaneous common-use basic types and functionality.
const CNcbiArguments & GetArguments(void) const
Get the application's cached unprocessed command-line arguments.
Definition: ncbiapp.hpp:681
CArgs –.
Definition: ncbiargs.hpp:356
static CStopWatch sw
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Client API for NCBI NetSchedule server.
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2730
EJobStatus
Job status codes.
CNetScheduleAdmin GetAdmin()
void Init(void)
Initialize the application.
CNcbiApplication –.
Definition: ncbiapp.hpp:120
CNetScheduleSubmitter GetSubmitter()
Create an instance of CNetScheduleSubmitter.
string SubmitJob(CNetScheduleNewJob &job)
Submit job.
static int input()
Modified on Mon May 29 17:00:46 2017 by modify_doxy.py rev. 533848