NCBI C++ ToolKit
file_messaging.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: file_messaging.cpp 66600 2015-03-13 12:56:58Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Paul Thiessen
27 *
28 * File Description:
29 * file-based messaging system
30 *
31 * ===========================================================================
32 */
33 
34 #include <ncbi_pch.hpp>
35 #include <corelib/ncbistd.hpp>
36 #include <corelib/ncbidiag.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/stream_utils.hpp>
39 #include <serial/serial.hpp>
40 #include <serial/objostr.hpp>
41 #include <serial/objostrxml.hpp>
42 #include <serial/objistr.hpp>
43 
44 #include <memory>
45 
46 #include "file_messaging.hpp"
47 
48 
51 
52 // diagnostic streams
// Each of the first four macros hands `stream` (one or more operands joined
// with operator<<) to ERR_POST, prefixed with the severity manipulator that
// matches the macro's name.
53 #define TRACEMSG(stream) ERR_POST(Trace << stream)
54 #define INFOMSG(stream) ERR_POST(Info << stream)
55 #define WARNINGMSG(stream) ERR_POST(Warning << stream)
56 #define ERRORMSG(stream) ERR_POST(Error << stream)
// Unlike the macros above, this one goes through ERR_FATAL instead of ERR_POST.
57 #define FATALMSG(stream) ERR_FATAL(stream)
58 
59 
61  const std::string& messageFilename, MessageResponder *responderObject, bool isReadOnly) :
62  manager(parentManager),
63  messageFile(messageFilename), lockFile(string(messageFilename) + ".lock"),
64  responder(responderObject), readOnly(isReadOnly), lastKnownSize(0)
65 {
66  TRACEMSG("monitoring message file " << messageFilename);
67 }
68 
69 static CPIDGuard * CreateLock(const CDirEntry& lockFile)
70 {
71  CPIDGuard* guard = NULL;
72  try {
73  guard = new CPIDGuard(lockFile.GetPath());
74  TRACEMSG("FileMessenger: lock file established: " << lockFile.GetPath());
75  } catch (CPIDGuardException& pidge) {
76 
77  if (pidge.GetErrCode() == CPIDGuardException::eStillRunning) {
78  TRACEMSG("FileMessenger: unable to establish a lock for new PID - old PID is still running\n" << pidge.ReportThis());
79  } else if (pidge.GetErrCode() == CPIDGuardException::eWrite) {
80  TRACEMSG("FileMessenger: write to PID-guarded file failed\n" << pidge.ReportThis());
81  } else {
82  TRACEMSG("FileMessenger: unknown Toolkit PID-guard failure\n" << pidge.ReportAll());
83  }
84 
85  delete guard; // calls CPIDGuard::Release()
86  guard = NULL;
87 
88  } catch (...) {
89  TRACEMSG("FileMessenger: unknown exception while creating lock");
90  delete guard; // calls CPIDGuard::Release()
91  guard = NULL;
92  }
93 
94  return guard;
95 }
96 
98 {
99  // sanity check to make sure each command issued received a reply
100  bool okay = false;
103  if (commandsSent.size() == repliesReceived.size()) {
104  for (c=commandsSent.begin(); c!=ce; ++c) {
106  if (r == repliesReceived.end())
107  break;
108  for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) {
109  if (r->second.find(a->first) == r->second.end())
110  break;
111  }
112  if (a != ae) break;
113  }
114  if (c == ce) okay = true;
115  }
116  if (!okay) WARNINGMSG("FileMessenger: did not receive a reply to all commands sent!");
117 
118  // last-minute attempt to write any pending commands to the file
119  if (pendingCommands.size() > 0) {
120  auto_ptr<CPIDGuard> lockStream(CreateLock(lockFile));
121  if (lockStream.get() == NULL) {
122  int nTries = 1;
123  do {
124  SleepSec(1);
125  lockStream.reset(CreateLock(lockFile));
126  ++nTries;
127  } while (lockStream.get() == NULL && nTries <= 30);
128  }
129  if (lockStream.get() != NULL) {
131  }
132  else
133  ERRORMSG("Timeout occurred when attempting to flush pending commands to file");
134 
135  // CPIDGuard pointer cleaned up when auto_ptr goes out of scope.
136  }
137 
138  // sanity check to make sure each command received was sent a reply
139  okay = false;
140  ce = commandsReceived.end();
141  if (commandsReceived.size() == repliesSent.size()) {
142  for (c=commandsReceived.begin(); c!=ce; ++c) {
144  if (r == repliesSent.end())
145  break;
146  for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) {
147  if (r->second.find(a->first) == r->second.end())
148  break;
149  }
150  if (a != ae) break;
151  }
152  if (c == ce) okay = true;
153  }
154  if (!okay) ERRORMSG("FileMessenger: did not send a reply to all commands received!");
155 }
156 
// Queues a command addressed to another application.
//
// Nothing is written to the message file here: the command is recorded in
// commandsSent and appended to pendingCommands, to be written out later.
//
//   targetApp - name of the application the command is addressed to
//   id        - command identifier; an (id, targetApp) pair that is already
//               recorded in commandsSent is rejected with an error
//   command   - command text
//   data      - payload stored alongside the command
//
// On a read-only messenger the command is dropped with a warning and is not
// recorded at all.
157 void FileMessenger::SendCommand(const std::string& targetApp, unsigned long id,
158  const std::string& command, const std::string& data)
159 {
160  if (readOnly) {
161  WARNINGMSG("command '" << command << "' to " << targetApp
162  << " received but not written to read-only message file");
163  return;
164  }
165 
166  // check against record of commands already sent
// NOTE(review): the declaration of 'c' is not visible in this listing;
// presumably it is the result of looking up 'id' in commandsSent - confirm.
168  if (c != commandsSent.end() && c->second.find(targetApp) != c->second.end()) {
169  ERRORMSG("Already sent command " << id << " to " << targetApp << '!');
170  return;
171  }
// remember that this id has now been used for this target
172  commandsSent[id][targetApp] = command;
173 
174  // create a new CommandInfo on the queue - will actually be sent later
175  pendingCommands.resize(pendingCommands.size() + 1);
176  pendingCommands.back().to = targetApp;
177  pendingCommands.back().id = id;
178  pendingCommands.back().command = command;
179  pendingCommands.back().data = data;
180 }
181 
// Records, and unless the messenger is read-only also queues, a reply to a
// command that was previously received.
//
//   targetApp - application being replied to; a command with this id must be
//               recorded in commandsReceived as coming from it
//   id        - identifier of the command being answered
//   status    - reply status; it is stored in repliesSent and translated to the
//               text "OKAY" or "ERROR" for the queued entry
//   data      - payload stored alongside the reply
//
// The call is rejected with an error if no matching command was received or if
// a reply for the same (id, targetApp) pair has already been recorded.
182 void FileMessenger::SendReply(const std::string& targetApp, unsigned long id,
183  MessageResponder::ReplyStatus status, const std::string& data)
184 {
185  // check against record of commands already received and replies already sent
// NOTE(review): the declarations of 'c' and 'r' are not visible in this
// listing; presumably they are lookups of 'id' in commandsReceived and
// repliesSent respectively - confirm.
187  if (c == commandsReceived.end() || c->second.find(targetApp) == c->second.end()) {
188  ERRORMSG("Can't reply; have not received command " << id << " from " << targetApp << '!');
189  return;
190  }
192  if (r != repliesSent.end() && r->second.find(targetApp) != r->second.end()) {
193  ERRORMSG("Already sent reply " << id << " to " << targetApp << '!');
194  return;
195  }
// the reply is recorded before the read-only check, so it is tracked even
// when nothing will be written to the file
196  repliesSent[id][targetApp] = status;
197 
198  if (readOnly) {
199  TRACEMSG("reply " << id << " to " << targetApp
200  << " logged but not written to read-only message file");
201  } else {
202  // create a new CommandInfo on the queue - will actually be sent later
203  pendingCommands.resize(pendingCommands.size() + 1);
204  pendingCommands.back().to = targetApp;
205  pendingCommands.back().id = id;
// NOTE(review): the case labels for the next two statements are not visible
// in this listing; presumably they are the "okay" and "error" values of
// MessageResponder::ReplyStatus - confirm.
206  switch (status) {
208  pendingCommands.back().command = "OKAY"; break;
210  pendingCommands.back().command = "ERROR"; break;
211  default:
// an unrecognized status is reported and then queued as an error reply
212  ERRORMSG("Unknown reply status " << status << '!');
213  pendingCommands.back().command = "ERROR"; break;
214  }
215  pendingCommands.back().data = data;
216  }
217 }
218 
220 {
221  // skip all checking if message file doesn't exist or lock file is already present
223  return;
224 
225  // check to see if we need to read file's contents
226  CFile mf(messageFile.GetPath());
227  Int8 messageFileSize = mf.GetLength();
228  if (messageFileSize < 0) {
229  ERRORMSG("Couldn't get message file size!");
230  return;
231  }
232  bool needToRead = (messageFileSize > lastKnownSize);
233 
234  // only continue if have new commands to receive, or have pending commands to send
235  if (!needToRead && pendingCommands.size() == 0)
236  return;
237 
238  TRACEMSG("message file: " << messageFile.GetPath());
239 // TRACEMSG("current size: " << (long) messageFileSize);
240 // TRACEMSG("last known size: " << (long) lastKnownSize);
241  if (needToRead) TRACEMSG("message file has grown since last read");
242  if (pendingCommands.size() > 0) TRACEMSG("has pending commands to send");
243 
244  // since we're going to read or write the file, establish a lock now
245  auto_ptr<CPIDGuard> lockStream(CreateLock(lockFile));
246  if (lockStream.get() == NULL)
247  return; // try again later, so program isn't locked during wait
248 
249  // first read any new commands from the file
250  if (needToRead) ReceiveCommands();
251 
252  // then send any pending commands
253  if (pendingCommands.size() > 0)
255 
256  // now update the size stamp to current size so we don't unnecessarily read in any commands just sent
257  lastKnownSize = mf.GetLength();
258  if (lastKnownSize < 0) {
259  ERRORMSG("Couldn't get message file size!");
260  lastKnownSize = 0;
261  }
262 
263  // CPIDGuard pointer cleaned up when auto_ptr goes out of scope.
264 }
265 
// Marker line that terminates each command/reply record in the message file:
// it is written on its own line after a record's data, and the reading code
// treats a line equal to this string as the end of the current record.
266 static const string COMMAND_END = "### END COMMAND ###";
267 
268 // returns true if some data was read (up to eol) before eof
269 static bool ReadSingleLine(CNcbiIfstream& inStream, string *str)
270 {
271  str->erase();
272  CT_CHAR_TYPE ch;
273  do {
274  ch = inStream.get();
275  if (inStream.bad() || inStream.fail() || CT_EQ_INT_TYPE(CT_TO_INT_TYPE(ch), CT_EOF))
276  return false;
277  else if (CT_EQ_INT_TYPE(CT_TO_INT_TYPE(ch), CT_TO_INT_TYPE('\n')))
278  break;
279  else
280  *str += CT_TO_CHAR_TYPE(ch);
281  } while (1);
282  return true;
283 }
284 
285 // must be called only after lock is established!
287 {
288  TRACEMSG("receiving commands...");
289 
290  auto_ptr<CNcbiIfstream> inStream(new ncbi::CNcbiIfstream(
291  messageFile.GetPath().c_str(), IOS_BASE::in));
292  if (!(*inStream)) {
293  ERRORMSG("cannot open message file for reading!");
294  return;
295  }
296 
297 #define GET_EXPECTED_LINE \
298  if (!ReadSingleLine(*inStream, &line)) { \
299  ERRORMSG("unexpected EOF!"); \
300  return; \
301  }
302 
303 #define SKIP_THROUGH_END_OF_COMMAND \
304  do { \
305  if (!ReadSingleLine(*inStream, &line)) { \
306  ERRORMSG("no end-of-command marker found before EOF!"); \
307  return; \
308  } \
309  if (line == COMMAND_END) break; \
310  } while (1)
311 
312 #define GET_ITEM(ident) \
313  item = string(ident); \
314  if (line.substr(0, item.size()) != item) { \
315  ERRORMSG("Line does not begin with expected '" << item << "'!"); \
316  return; \
317  } \
318  item = line.substr(item.size());
319 
320  string line, item, from;
322  do {
323 
324  // get To: (ignore if not this app)
325  if (!ReadSingleLine(*inStream, &line) || line.size() == 0) {
326  return;
327  }
328 
329  GET_ITEM("To: ")
330  if (item != manager->applicationName) {
332  continue;
333  }
334 
335  // get From:
337  GET_ITEM("From: ")
338  from = item;
339 
340  // get ID:
342  GET_ITEM("ID: ")
343  char *endptr;
344  command.id = strtoul(item.c_str(), &endptr, 10);
345  if (endptr == item.c_str()) {
346  ERRORMSG("Bad " << line << '!');
347  return;
348  }
349 
350  // get command or reply
352  if (line.substr(0, 9) != "Command: " && line.substr(0, 7) != "Reply: ") {
353  ERRORMSG("Line does not begin with expected 'Command: ' or 'Reply: '!");
354  return;
355  }
356  bool isCommand = (line.substr(0, 9) == "Command: ");
357  command.command = line.substr(isCommand ? 9 : 7);
358 
359  // skip commands/replies already read
360  if (isCommand) {
362  if (c != commandsReceived.end() && c->second.find(from) != c->second.end()) {
364  continue;
365  }
366  } else { // reply
368  if (r != repliesReceived.end() && r->second.find(from) != r->second.end()) {
370  continue;
371  }
372  }
373 
374  // get data (all lines up to end marker)
375  command.data.erase();
376  do {
378  if (line == COMMAND_END) break;
379  command.data += line;
380  command.data += '\n';
381  } while (1);
382 
383  // process new commands/replies
384  if (isCommand) {
385  commandsReceived[command.id][from] = command.command;
386  TRACEMSG("processing command " << command.id << " from " << from << ": " << command.command);
387  // command received callback
388  responder->ReceivedCommand(from, command.id, command.command, command.data);
389 
390  } else { // reply
392  if (command.command == "OKAY")
394  else if (command.command != "ERROR")
395  ERRORMSG("Unknown reply status " << command.command << '!');
396  repliesReceived[command.id][from] = status;
397  TRACEMSG("processing reply " << command.id << " from " << from);
398  // reply received callback
399  responder->ReceivedReply(from, command.id, status, command.data);
400  }
401 
402  } while (1);
403 }
404 
405 // must be called only after lock is established!
407 {
408  TRACEMSG("sending commands...");
409  if (pendingCommands.size() == 0)
410  return;
411 
412  auto_ptr<CNcbiOfstream> outStream(new ncbi::CNcbiOfstream(
413  messageFile.GetPath().c_str(), IOS_BASE::out | IOS_BASE::app));
414  if (!(*outStream)) {
415  ERRORMSG("cannot open message file for writing!");
416  return;
417  }
418 
419  CommandList::iterator c, ce = pendingCommands.end();
420  for (c=pendingCommands.begin(); c!=ce; ++c) {
421  // dump the command to the file, noting different syntax for replies
422  bool isReply = (c->command == "OKAY" || c->command == "ERROR");
423  *outStream
424  << "To: " << c->to << '\n'
425  << "From: " << manager->applicationName << '\n'
426  << "ID: " << c->id << '\n'
427  << (isReply ? "Reply: " : "Command: ") << c->command << '\n';
428  if (c->data.size() > 0) {
429  *outStream << c->data;
430  if (c->data[c->data.size() - 1] != '\n') // append \n if data doesn't end with one
431  *outStream << '\n';
432  }
433  *outStream << COMMAND_END << '\n';
434  outStream->flush();
435  TRACEMSG("sent " << (isReply ? "reply " : "command ") << c->id << " to " << c->to);
436  }
437  pendingCommands.clear();
438 }
439 
440 
442  applicationName(appName)
443 {
444 }
445 
447 {
448  FileMessengerList::iterator m, me = messengers.end();
449  for (m=messengers.begin(); m!=me; ++m)
450  delete *m;
451 }
452 
454  const std::string& messageFilename, MessageResponder *responderObject, bool readOnly)
455 {
456  if (!responderObject) {
457  ERRORMSG("CreateNewFileMessenger() - got NULL responderObject!");
458  return NULL;
459  }
460  FileMessenger *newMessenger = new FileMessenger(this, messageFilename, responderObject, readOnly);
461  messengers.push_back(newMessenger);
462  return newMessenger;
463 }
464 
466 {
467  FileMessengerList::iterator f, fe = messengers.end();
468  for (f=messengers.begin(); f!=fe; ++f) {
469  if (*f == messenger) {
470  delete *f;
471  messengers.erase(f);
472  return;
473  }
474  }
475  ERRORMSG("DeleteFileMessenger() - given FileMessenger* not created by this FileMessagingManager!");
476 }
477 
479 {
480  FileMessengerList::iterator f, fe = messengers.end();
481  for (f=messengers.begin(); f!=fe; ++f)
482  (*f)->PollMessageFile();
483 }
484 
485 bool SeqIdToIdentifier(const CRef < ncbi::objects::CSeq_id >& seqID, string& identifier)
486 {
487  if (seqID.Empty())
488  return false;
489  try {
490  CNcbiOstrstream oss;
491  auto_ptr < CObjectOStream > osa(CObjectOStream::Open(eSerial_Xml, oss, eNoOwnership));
492  osa->SetUseIndentation(false);
493  CObjectOStreamXml *osx = dynamic_cast<CObjectOStreamXml*>(osa.get());
494  if (osx)
495  osx->SetReferenceDTD(false);
496  *osa << *seqID;
497  identifier = CNcbiOstrstreamToString(oss);
498  NStr::ReplaceInPlace(identifier, "\n", "");
499  NStr::ReplaceInPlace(identifier, "\r", "");
500  NStr::ReplaceInPlace(identifier, "\t", "");
501  if (NStr::StartsWith(identifier, "<?")) {
502  SIZE_TYPE endb = NStr::Find(identifier, "?>");
503  identifier = identifier.substr(endb + 2);
504  }
505  return true;
506  } catch (...) {
507  return false;
508  }
509 }
510 
511 bool IdentifierToSeqId(const string& identifier, CRef < ncbi::objects::CSeq_id >& seqID)
512 {
513  try {
514  CNcbiIstrstream iss(identifier.data(), identifier.size());
515  auto_ptr < CObjectIStream > isa(CObjectIStream::Open(eSerial_Xml, iss, eNoOwnership));
516  seqID.Reset(new CSeq_id());
517  *isa >> *seqID;
518  return true;
519  } catch (...) {
520  return false;
521  }
522 }
523 
static CObjectOStream * Open(ESerialDataFormat format, CNcbiOstream &outStream, bool deleteOutStream)
Create serial object writer and attach it to an output stream.
Definition: objostr.cpp:126
size_type size() const
Definition: map.hpp:148
const FileMessagingManager *const manager
#define GET_ITEM(ident)
const string applicationName
#define CT_EOF
Definition: ncbistre.hpp:563
FileMessenger(FileMessagingManager *parentManager, const string &messageFilename, MessageResponder *responderObject, bool isReadOnly)
No ownership is assumed.
Definition: ncbi_types.h:135
CObjectOStreamXml –.
Definition: objostrxml.hpp:53
std::ofstream out("events_result.xml")
main entry point for tests
static string & ReplaceInPlace(string &src, const string &search, const string &replace, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
Definition: ncbistr.cpp:3306
CDirEntry –.
Definition: ncbifile.hpp:263
#define SKIP_THROUGH_END_OF_COMMAND
static Messenger messenger
Definition: messenger.cpp:71
CPIDGuard – Process guard.
string
Definition: cgiapp.hpp:514
Unable to write into the PID file.
signed NCBI_INT8_TYPE Int8
Signed 8 byte sized integer.
Definition: ncbitype.h:143
void ReceiveCommands(void)
const string & GetPath(void) const
Get entry path.
Definition: ncbifile.hpp:3879
static CPIDGuard * CreateLock(const CDirEntry &lockFile)
#define NULL
Definition: ncbistd.hpp:225
USING_SCOPE(objects)
static bool ReadSingleLine(CNcbiIfstream &inStream, string *str)
void DeleteFileMessenger(FileMessenger *messenger)
static CObjectIStream * Open(ESerialDataFormat format, CNcbiIstream &inStream, bool deleteInStream)
Create serial object reader and attach it to an input stream.
Definition: objistr.cpp:195
const CDirEntry lockFile
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:101
const_iterator end() const
Definition: map.hpp:152
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
Definition: ncbistr.cpp:2786
const_iterator find(const key_type &key) const
Definition: map.hpp:153
CFile –.
Definition: ncbifile.hpp:1596
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:714
#define CT_EQ_INT_TYPE
Definition: ncbistre.hpp:567
IO_PREFIX::istrstream CNcbiIstrstream
Portable alias for istrstream.
Definition: ncbistre.hpp:151
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
virtual void ReceivedReply(const string &fromApp, unsigned long id, ReplyStatus status, const string &data)=0
#define CT_CHAR_TYPE
Definition: ncbistre.hpp:560
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5141
void PollMessageFile(void)
FileMessenger * CreateNewFileMessenger(const string &messageFilename, MessageResponder *responderObject, bool readOnly)
const char * command
virtual bool Exists(void) const
Check the entry existence.
Definition: ncbifile.cpp:2317
void SendCommand(const string &targetApp, unsigned long id, const string &command, const string &data)
MessageResponder *const responder
CommandReplies repliesSent
#define CT_TO_INT_TYPE
Definition: ncbistre.hpp:565
Int8 GetLength(void) const
Get size of file.
Definition: ncbifile.cpp:3159
#define ERRORMSG(stream)
friend class FileMessenger
if(yy_accept[yy_current_state])
CommandReplies repliesReceived
const GenericPointer< typename T::ValueType > T2 T::AllocatorType & a
Definition: pointer.h:1084
CPIDGuardException –.
#define GET_EXPECTED_LINE
CRef –.
Definition: ncbiobj.hpp:616
Defines NCBI C++ diagnostic APIs, classes, and macros.
void SetReferenceDTD(bool use_dtd=true)
Make generated XML document reference DTD.
Definition: objostrxml.cpp:138
CommandOriginators commandsReceived
FileMessengerList messengers
static const string COMMAND_END
#define CT_TO_CHAR_TYPE
Definition: ncbistre.hpp:566
virtual void ReceivedCommand(const string &fromApp, unsigned long id, const string &command, const string &data)=0
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:130
const bool readOnly
IO_PREFIX::ostrstream CNcbiOstrstream
Portable alias for ostrstream.
Definition: ncbistre.hpp:155
void SendReply(const string &targetApp, unsigned long id, MessageResponder::ReplyStatus status, const string &data)
IO_PREFIX::ifstream CNcbiIfstream
Portable alias for ifstream.
Definition: ncbistre.hpp:245
FileMessagingManager(const string &appName)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
const_iterator begin() const
Definition: map.hpp:151
bool SeqIdToIdentifier(const CRef< ncbi::objects::CSeq_id > &seqID, string &identifier)
std::istream & in(std::istream &in_, double &x_)
static const char * str(char *buf, int n)
Definition: stats.c:84
#define TRACEMSG(stream)
IO_PREFIX::ofstream CNcbiOfstream
Portable alias for ofstream.
Definition: ncbistre.hpp:318
#define WARNINGMSG(stream)
bool IdentifierToSeqId(const string &identifier, CRef< ncbi::objects::CSeq_id > &seqID)
CommandList pendingCommands
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:98
bool IsFile(EFollowLinks follow=eFollowLinks) const
Check whether a directory entry is a file.
Definition: ncbifile.hpp:3909
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:768
CommandOriginators commandsSent
The process listed in the file is still around.
const CDirEntry messageFile
void SendPendingCommands(void)
Modified on Mon Apr 23 11:39:40 2018 by modify_doxy.py rev. 546573