NCBI C++ ToolKit
ns_job_serializer.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_job_serializer.cpp 70751 2016-01-15 16:50:24Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitry Kazimirov
27  *
28  * File Description: NetSchedule job serialization - implementation.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
35 
38 
39 #include <corelib/rwstream.hpp>
40 
41 
43 
44 string CNetScheduleJobSerializer::SaveJobInput(const string& target_dir,
45  CNetCacheAPI& nc_api)
46 {
47  string target_file = CDirEntry::ConcatPath(target_dir,
48  m_Job.job_id + ".in");
49 
50  CNcbiOfstream output_stream(target_file.c_str(), CNcbiOfstream::binary);
51 
52  bool need_space = false;
53 
54  if (!m_Job.affinity.empty()) {
55  output_stream << "affinity=\"" <<
57  need_space = true;
58  }
59 
60  if (!m_Job.group.empty()) {
61  if (need_space)
62  output_stream << ' ';
63  output_stream << "group=\"" <<
65  need_space = true;
66  }
67 
69  if (need_space)
70  output_stream << ' ';
71  output_stream << "exclusive";
72  }
73 
74  output_stream << NcbiEndl;
75 
76  CStringOrBlobStorageReader job_input_reader(m_Job.input, nc_api);
77  CRStream job_input_istream(&job_input_reader);
78  NcbiStreamCopy(output_stream, job_input_istream);
79 
80  return target_file;
81 }
82 
83 void CNetScheduleJobSerializer::LoadJobInput(const string& source_file)
84 {
85  CNcbiIfstream input_stream(source_file.c_str(), CNcbiIfstream::binary);
86 
87  if (input_stream.fail() && !input_stream.eof()) {
89  "Error while reading job input from '" << source_file << '\'');
90  }
91 
92  string header;
93  getline(input_stream, header);
94 
95  CAttrListParser attr_parser;
96  attr_parser.Reset(header);
97 
99 
100  CTempString attr_name;
101  string attr_value;
102  size_t attr_column;
103 
104 #define ATTR_POS " at column " << attr_column
105 
106  while ((attr_type = attr_parser.NextAttribute(&attr_name,
107  &attr_value, &attr_column)) != CAttrListParser::eNoMoreAttributes) {
108  if (attr_name == "affinity")
109  m_Job.affinity = attr_value;
110  else if (attr_name == "group")
111  m_Job.group = attr_value;
112  else if (attr_name == "exclusive") {
114  continue;
115  } else {
116  NCBI_THROW_FMT(CArgException, eInvalidArg,
117  "unknown attribute '" << attr_name << "'" ATTR_POS);
118  }
119 
120  if (attr_type != CAttrListParser::eAttributeWithValue) {
121  NCBI_THROW_FMT(CArgException, eInvalidArg,
122  "attribute '" << attr_name << "' requires a value" ATTR_POS);
123  }
124  }
125 
126  if (!input_stream.eof()) {
127  CStringOrBlobStorageWriter job_input_writer(
128  numeric_limits<size_t>().max(), NULL, m_Job.input);
129  CWStream job_input_ostream(&job_input_writer, 0, NULL);
130  NcbiStreamCopy(job_input_ostream, input_stream);
131  }
132 
133  CDirEntry file_name(source_file);
134  m_Job.job_id = file_name.GetBase();
135 }
136 
138  CNetScheduleAPI::EJobStatus job_status,
139  const string& target_dir,
140  CNetCacheAPI& nc_api)
141 {
142  string target_file = CDirEntry::ConcatPath(target_dir,
143  m_Job.job_id + ".out");
144 
145  CNcbiOfstream output_stream(target_file.c_str(), CNcbiOfstream::binary);
146 
147  output_stream <<
148  "job_status=" << CNetScheduleAPI::StatusToString(job_status) <<
149  " ret_code=" << m_Job.ret_code;
150 
151  if (!m_Job.error_msg.empty()) {
152  output_stream << " error_msg=\"" <<
154  }
155 
156  output_stream << NcbiEndl;
157 
158  CStringOrBlobStorageReader job_output_reader(m_Job.output, nc_api);
159  CRStream job_output_istream(&job_output_reader);
160  NcbiStreamCopy(output_stream, job_output_istream);
161 
162  return target_file;
163 }
164 
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:561
void Reset(const char *position, const char *eol)
int ret_code
Job return code.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is known and controlled.
Definition: tempstr.hpp:62
String or Blob Storage Reader.
CDirEntry –.
Definition: ncbifile.hpp:263
string GetBase(void) const
Get the base entry name without extension.
Definition: ncbifile.hpp:3863
#define NcbiEndl
Definition: ncbistre.hpp:403
string SaveJobInput(const string &target_dir, CNetCacheAPI &nc_api)
string input
Input data.
#define ATTR_POS
#define NULL
Definition: ncbistd.hpp:225
Reader-writer based streams.
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:101
CArgException –.
Definition: ncbiargs.hpp:119
string job_id
Output job key.
string output
Job result data.
ENextAttributeType NextAttribute(CTempString *attr_name, string *attr_value, size_t *attr_column)
T max(T x_, T y_)
static string StatusToString(EJobStatus status)
Printable status type.
Writer-based output stream.
Definition: rwstream.hpp:142
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy entire contents of stream "is" into "os".
Definition: ncbistre.cpp:216
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3826
const char * file_name[]
EJobStatus
Job status codes.
string SaveJobOutput(CNetScheduleAPI::EJobStatus job_status, const string &target_dir, CNetCacheAPI &nc_api)
String or Blob Storage Writer.
Client API for NetCache server.
void LoadJobInput(const string &source_file)
CNetScheduleAPI::TJobMask mask
IO_PREFIX::ifstream CNcbiIfstream
Portable alias for ifstream.
Definition: ncbistre.hpp:245
static string ConcatPath(const string &first, const string &second)
Concatenate two parts of the path for the current OS.
Definition: ncbifile.cpp:771
IO_PREFIX::ofstream CNcbiOfstream
Portable alias for ofstream.
Definition: ncbistre.hpp:318
Exclusive job - the node executes only this job, even if there are processor resources.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:98
Modified on Fri Sep 22 15:39:41 2017 by modify_doxy.py rev. 546573