include/serial/rpcbase.hpp

Go to the documentation of this file.
00001 #ifndef SERIAL___RPCBASE__HPP
00002 #define SERIAL___RPCBASE__HPP
00003 
00004 /*  $Id: rpcbase.hpp 136935 2008-08-08 17:54:05Z lavr $
00005  * ===========================================================================
00006  *
00007  *                            PUBLIC DOMAIN NOTICE
00008  *               National Center for Biotechnology Information
00009  *
00010  *  This software/database is a "United States Government Work" under the
00011  *  terms of the United States Copyright Act.  It was written as part of
00012  *  the author's official duties as a United States Government employee and
00013  *  thus cannot be copyrighted.  This software/database is freely available
00014  *  to the public for use. The National Library of Medicine and the U.S.
00015  *  Government have not placed any restriction on its use or reproduction.
00016  *
00017  *  Although all reasonable efforts have been taken to ensure the accuracy
00018  *  and reliability of the software and data, the NLM and the U.S.
00019  *  Government do not and cannot warrant the performance or results that
00020  *  may be obtained by using this software or data. The NLM and the U.S.
00021  *  Government disclaim all warranties, express or implied, including
00022  *  warranties of performance, merchantability or fitness for any particular
00023  *  purpose.
00024  *
00025  *  Please cite the author in any work or product based on this material.
00026  *
00027  * ===========================================================================
00028  *
00029  * Author:  Aaron Ucko, NCBI
00030  *
00031  * File Description:
00032  *   Generic template class for ASN.1/XML RPC clients
00033  *
00034  */
00035 
00036 #include <connect/ncbi_conn_stream.hpp>
00037 #include <corelib/ncbimtx.hpp>
00038 #include <serial/objistr.hpp>
00039 #include <serial/objostr.hpp>
00040 #include <serial/serial.hpp>
00041 
00042 
00043 /** @addtogroup GenClassSupport
00044  *
00045  * @{
00046  */
00047 
00048 
00049 BEGIN_NCBI_SCOPE
00050 
00051 /// CRPCClient -- prototype client for ASN.1/XML-based RPC.
00052 /// Normally connects automatically on the first real request and
00053 /// disconnects automatically in the destructor, but allows both events
00054 /// to occur explicitly.
00055 
00056 template <class TRequest, class TReply>
00057 class CRPCClient : virtual public CConnIniter,
00058                    public CObject
00059 {
00060 public:
00061     CRPCClient(const string&     service     = kEmptyStr,
00062                ESerialDataFormat format      = eSerial_AsnBinary,
00063                unsigned int      retry_limit = 3)
00064         : m_Service(service), m_Format(format), m_Timeout(kDefaultTimeout),
00065           m_RetryLimit(retry_limit)
00066         { }
00067     virtual ~CRPCClient(void);
00068 
00069     virtual void Ask(const TRequest& request, TReply& reply);
00070             void Connect(void);
00071             void Disconnect(void);
00072             void Reset(void);
00073 
00074     const string& GetService(void)                  { return m_Service; }
00075              void SetService(const string& service) { m_Service = service; }
00076 
00077     ESerialDataFormat GetFormat(void)                  { return m_Format; }
00078                  void SetFormat(ESerialDataFormat fmt) { m_Format = fmt; }
00079 
00080     unsigned int GetRetryLimit(void)           { return m_RetryLimit; }
00081             void SetRetryLimit(unsigned int n) { m_RetryLimit = n; }
00082 
00083     EIO_Status      SetTimeout(const STimeout* timeout,
00084                                EIO_Event direction = eIO_ReadWrite);
00085     const STimeout* GetTimeout(EIO_Event direction = eIO_Read);
00086 
00087 protected:
00088     virtual string GetAffinity(const TRequest& request) const;
00089               void SetAffinity(const string& affinity);
00090 
00091     /// These run with m_Mutex already acquired.
00092     virtual void x_Connect(void);
00093     virtual void x_Disconnect(void);
00094             void x_SetStream(CNcbiIostream* stream);
00095     /// Connect to a URL.  (Discouraged; please establish and use a
00096     /// suitable named service if possible.)
00097             void x_ConnectURL(const string& url);
00098 
00099     /// Retry policy; by default, just _TRACEs the event and returns
00100     /// true.  May reset the connection (or do anything else, really),
00101     /// but note that Ask will already automatically reconnect if the
00102     /// stream is explicitly bad.  (Ask also takes care of enforcing
00103     /// m_RetryLimit.)
00104     virtual bool x_ShouldRetry(unsigned int tries);
00105 
00106 private:
00107     static bool x_IsSpecial(const STimeout* timeout)
00108         { return timeout == kDefaultTimeout  ||  timeout == kInfiniteTimeout; }
00109 
00110     typedef CRPCClient<TRequest, TReply> TSelf;
00111     /// Prohibit default copy constructor and assignment operator.
00112     CRPCClient(const TSelf& x);
00113     bool operator= (const TSelf& x);
00114 
00115     auto_ptr<CNcbiIostream>  m_Stream;
00116     auto_ptr<CObjectIStream> m_In;
00117     auto_ptr<CObjectOStream> m_Out;
00118     string                   m_Service; ///< Used by default Connect().
00119     string                   m_Affinity;
00120     ESerialDataFormat        m_Format;
00121     CMutex                   m_Mutex;   ///< To allow sharing across threads.
00122     const STimeout*          m_Timeout; ///< Cloned if not special.
00123 
00124 protected:
00125     unsigned int             m_RetryLimit;
00126 };
00127 
00128 
00129 ///////////////////////////////////////////////////////////////////////////
00130 // Inline methods
00131 
00132 
00133 template <class TRequest, class TReply>
00134 inline
00135 CRPCClient<TRequest, TReply>::~CRPCClient(void)
00136 {
00137     Disconnect();
00138     if ( !x_IsSpecial(m_Timeout) ) {
00139         delete const_cast<STimeout*>(m_Timeout);
00140     }
00141 }
00142 
00143 
00144 template <class TRequest, class TReply>
00145 inline
00146 void CRPCClient<TRequest, TReply>::Connect(void)
00147 {
00148     if (m_Stream.get()  &&  m_Stream->good()) {
00149         return; // already connected
00150     }
00151     CMutexGuard LOCK(m_Mutex);
00152     // repeat test with mutex held to avoid races
00153     if (m_Stream.get()  &&  m_Stream->good()) {
00154         return; // already connected
00155     }
00156     x_Connect();
00157 }
00158 
00159 
00160 template <class TRequest, class TReply>
00161 inline
00162 void CRPCClient<TRequest, TReply>::Disconnect(void)
00163 {
00164     CMutexGuard LOCK(m_Mutex);
00165     if ( !m_Stream.get()  ||  !m_Stream->good() ) {
00166         // not connected -- don't call x_Disconnect, which might
00167         // temporarily reconnect to send a fini!
00168         return;
00169     }
00170     x_Disconnect();
00171 }
00172 
00173 
00174 template <class TRequest, class TReply>
00175 inline
00176 void CRPCClient<TRequest, TReply>::Reset(void)
00177 {
00178     CMutexGuard LOCK(m_Mutex);
00179     if (m_Stream.get()  &&  m_Stream->good()) {
00180         x_Disconnect();
00181     }
00182     x_Connect();
00183 }
00184 
00185 
00186 template <class TRequest, class TReply>
00187 inline
00188 string CRPCClient<TRequest, TReply>::GetAffinity(const TRequest& ) const
00189 {
00190     return kEmptyStr;
00191 }
00192 
00193 
00194 template <class TRequest, class TReply>
00195 inline
00196 void CRPCClient<TRequest, TReply>::SetAffinity(const string& affinity)
00197 {
00198     if (affinity != m_Affinity) {
00199         Disconnect();
00200     }
00201     m_Affinity = affinity;
00202 }
00203 
00204 
00205 template <class TRequest, class TReply>
00206 inline
00207 EIO_Status CRPCClient<TRequest, TReply>::SetTimeout(const STimeout* timeout,
00208                                                     EIO_Event direction)
00209 {
00210     // save for future use, especially if there's no stream at present.
00211     {{
00212         const STimeout* old_timeout = m_Timeout;
00213         if (x_IsSpecial(timeout)) {
00214             m_Timeout = timeout;
00215         } else { // make a copy
00216             m_Timeout = new STimeout(*timeout);
00217         }
00218         if ( !x_IsSpecial(old_timeout) ) {
00219             delete const_cast<STimeout*>(old_timeout);
00220         }
00221     }}
00222 
00223     CConn_IOStream* conn_stream
00224         = dynamic_cast<CConn_IOStream*>(m_Stream.get());
00225     if (conn_stream) {
00226         return CONN_SetTimeout(conn_stream->GetCONN(), direction, timeout);
00227     } else if ( !m_Stream.get() ) {
00228         return eIO_Success; // we've saved it, which is the best we can do...
00229     } else {
00230         return eIO_NotSupported;
00231     }
00232 }
00233 
00234 
00235 template <class TRequest, class TReply>
00236 inline
00237 const STimeout* CRPCClient<TRequest, TReply>::GetTimeout(EIO_Event direction)
00238 {
00239     CConn_IOStream* conn_stream
00240         = dynamic_cast<CConn_IOStream*>(m_Stream.get());
00241     if (conn_stream) {
00242         return CONN_GetTimeout(conn_stream->GetCONN(), direction);
00243     } else {
00244         return m_Timeout;
00245     }
00246 }
00247 
00248 
00249 template <class TRequest, class TReply>
00250 inline
00251 void CRPCClient<TRequest, TReply>::Ask(const TRequest& request, TReply& reply)
00252 {
00253     CMutexGuard LOCK(m_Mutex);
00254     
00255     unsigned int tries = 0;
00256     for (;;) {
00257         try {
00258             SetAffinity(GetAffinity(request));
00259             Connect(); // No-op if already connected
00260             *m_Out << request;
00261             *m_In >> reply;
00262             break;
00263         } catch (CException& e) {
00264             // Some exceptions tend to correspond to transient glitches;
00265             // the remainder, however, may as well get propagated immediately.
00266             if ( !dynamic_cast<CSerialException*>(&e)
00267                  &&  !dynamic_cast<CIOException*>(&e) ) {
00268                 throw;
00269             } else if (++tries == m_RetryLimit  ||  !x_ShouldRetry(tries) ) {
00270                 throw;
00271             } else if ( !(tries % 2) ) {
00272                 // reset on every other attempt in case we're out of sync
00273                 try {
00274                     Reset();
00275                 } STD_CATCH_ALL_XX(Serial_RPCClient, 1, "CRPCClient<>::Reset()")
00276             }
00277         }
00278     }
00279 }
00280 
00281 
00282 template <class TRequest, class TReply>
00283 inline
00284 void CRPCClient<TRequest, TReply>::x_Connect(void)
00285 {
00286     _ASSERT( !m_Service.empty() );
00287     SConnNetInfo* net_info;
00288     if (!m_Affinity.empty()) {
00289         net_info = ConnNetInfo_Create(m_Service.c_str());
00290         ConnNetInfo_PostOverrideArg(net_info, m_Affinity.c_str(), 0);
00291     } else {
00292         net_info = 0;
00293     }
00294     x_SetStream(new CConn_ServiceStream(m_Service, fSERV_Any, net_info, 0,
00295                                         m_Timeout));
00296     // No-op on NULL net_info
00297     ConnNetInfo_Destroy(net_info);
00298 }
00299 
00300 
00301 template <class TRequest, class TReply>
00302 inline
00303 void CRPCClient<TRequest, TReply>::x_Disconnect(void)
00304 {
00305     m_In.reset();
00306     m_Out.reset();
00307     m_Stream.reset();
00308 }
00309 
00310 
00311 template <class TRequest, class TReply>
00312 inline
00313 void CRPCClient<TRequest, TReply>::x_SetStream(CNcbiIostream* stream)
00314 {
00315     m_In .reset();
00316     m_Out.reset();
00317     m_Stream.reset(stream);
00318     m_In .reset(CObjectIStream::Open(m_Format, *stream));
00319     m_Out.reset(CObjectOStream::Open(m_Format, *stream));
00320 }
00321 
00322 
00323 template <class TRequest, class TReply>
00324 inline
00325 void CRPCClient<TRequest, TReply>::x_ConnectURL(const string& url)
00326 {
00327     x_SetStream(new CConn_HttpStream(url, fHCC_AutoReconnect, m_Timeout));
00328 }
00329 
00330 
00331 template <class TRequest, class TReply>
00332 inline
00333 bool CRPCClient<TRequest, TReply>::x_ShouldRetry(unsigned int tries)
00334 {
00335     _TRACE("CRPCClient<>::x_ShouldRetry: retrying after " << tries
00336            << " failures");
00337     return true;
00338 }
00339 
00340 
00341 END_NCBI_SCOPE
00342 
00343 
00344 /* @} */
00345 
00346 #endif  /* SERIAL___RPCBASE__HPP */
00347 
00348 

Generated on Sun Dec 6 22:15:38 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Dec 07 16:20:49 2009 by modify_doxy.py rev. 173732