00001 #ifndef SERIAL___RPCBASE__HPP
00002 #define SERIAL___RPCBASE__HPP
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <connect/ncbi_conn_stream.hpp>
00037 #include <corelib/ncbimtx.hpp>
00038 #include <serial/objistr.hpp>
00039 #include <serial/objostr.hpp>
00040 #include <serial/serial.hpp>
00041
00042
00043
00044
00045
00046
00047
00048
00049 BEGIN_NCBI_SCOPE
00050
00051
00052
00053
00054
00055
00056 template <class TRequest, class TReply>
00057 class CRPCClient : virtual public CConnIniter,
00058 public CObject
00059 {
00060 public:
00061 CRPCClient(const string& service = kEmptyStr,
00062 ESerialDataFormat format = eSerial_AsnBinary,
00063 unsigned int retry_limit = 3)
00064 : m_Service(service), m_Format(format), m_Timeout(kDefaultTimeout),
00065 m_RetryLimit(retry_limit)
00066 { }
00067 virtual ~CRPCClient(void);
00068
00069 virtual void Ask(const TRequest& request, TReply& reply);
00070 void Connect(void);
00071 void Disconnect(void);
00072 void Reset(void);
00073
00074 const string& GetService(void) { return m_Service; }
00075 void SetService(const string& service) { m_Service = service; }
00076
00077 ESerialDataFormat GetFormat(void) { return m_Format; }
00078 void SetFormat(ESerialDataFormat fmt) { m_Format = fmt; }
00079
00080 unsigned int GetRetryLimit(void) { return m_RetryLimit; }
00081 void SetRetryLimit(unsigned int n) { m_RetryLimit = n; }
00082
00083 EIO_Status SetTimeout(const STimeout* timeout,
00084 EIO_Event direction = eIO_ReadWrite);
00085 const STimeout* GetTimeout(EIO_Event direction = eIO_Read);
00086
00087 protected:
00088 virtual string GetAffinity(const TRequest& request) const;
00089 void SetAffinity(const string& affinity);
00090
00091
00092 virtual void x_Connect(void);
00093 virtual void x_Disconnect(void);
00094 void x_SetStream(CNcbiIostream* stream);
00095
00096
00097 void x_ConnectURL(const string& url);
00098
00099
00100
00101
00102
00103
00104 virtual bool x_ShouldRetry(unsigned int tries);
00105
00106 private:
00107 static bool x_IsSpecial(const STimeout* timeout)
00108 { return timeout == kDefaultTimeout || timeout == kInfiniteTimeout; }
00109
00110 typedef CRPCClient<TRequest, TReply> TSelf;
00111
00112 CRPCClient(const TSelf& x);
00113 bool operator= (const TSelf& x);
00114
00115 auto_ptr<CNcbiIostream> m_Stream;
00116 auto_ptr<CObjectIStream> m_In;
00117 auto_ptr<CObjectOStream> m_Out;
00118 string m_Service;
00119 string m_Affinity;
00120 ESerialDataFormat m_Format;
00121 CMutex m_Mutex;
00122 const STimeout* m_Timeout;
00123
00124 protected:
00125 unsigned int m_RetryLimit;
00126 };
00127
00128
00129
00130
00131
00132
00133 template <class TRequest, class TReply>
00134 inline
00135 CRPCClient<TRequest, TReply>::~CRPCClient(void)
00136 {
00137 Disconnect();
00138 if ( !x_IsSpecial(m_Timeout) ) {
00139 delete const_cast<STimeout*>(m_Timeout);
00140 }
00141 }
00142
00143
00144 template <class TRequest, class TReply>
00145 inline
00146 void CRPCClient<TRequest, TReply>::Connect(void)
00147 {
00148 if (m_Stream.get() && m_Stream->good()) {
00149 return;
00150 }
00151 CMutexGuard LOCK(m_Mutex);
00152
00153 if (m_Stream.get() && m_Stream->good()) {
00154 return;
00155 }
00156 x_Connect();
00157 }
00158
00159
00160 template <class TRequest, class TReply>
00161 inline
00162 void CRPCClient<TRequest, TReply>::Disconnect(void)
00163 {
00164 CMutexGuard LOCK(m_Mutex);
00165 if ( !m_Stream.get() || !m_Stream->good() ) {
00166
00167
00168 return;
00169 }
00170 x_Disconnect();
00171 }
00172
00173
00174 template <class TRequest, class TReply>
00175 inline
00176 void CRPCClient<TRequest, TReply>::Reset(void)
00177 {
00178 CMutexGuard LOCK(m_Mutex);
00179 if (m_Stream.get() && m_Stream->good()) {
00180 x_Disconnect();
00181 }
00182 x_Connect();
00183 }
00184
00185
00186 template <class TRequest, class TReply>
00187 inline
00188 string CRPCClient<TRequest, TReply>::GetAffinity(const TRequest& ) const
00189 {
00190 return kEmptyStr;
00191 }
00192
00193
00194 template <class TRequest, class TReply>
00195 inline
00196 void CRPCClient<TRequest, TReply>::SetAffinity(const string& affinity)
00197 {
00198 if (affinity != m_Affinity) {
00199 Disconnect();
00200 }
00201 m_Affinity = affinity;
00202 }
00203
00204
00205 template <class TRequest, class TReply>
00206 inline
00207 EIO_Status CRPCClient<TRequest, TReply>::SetTimeout(const STimeout* timeout,
00208 EIO_Event direction)
00209 {
00210
00211 {{
00212 const STimeout* old_timeout = m_Timeout;
00213 if (x_IsSpecial(timeout)) {
00214 m_Timeout = timeout;
00215 } else {
00216 m_Timeout = new STimeout(*timeout);
00217 }
00218 if ( !x_IsSpecial(old_timeout) ) {
00219 delete const_cast<STimeout*>(old_timeout);
00220 }
00221 }}
00222
00223 CConn_IOStream* conn_stream
00224 = dynamic_cast<CConn_IOStream*>(m_Stream.get());
00225 if (conn_stream) {
00226 return CONN_SetTimeout(conn_stream->GetCONN(), direction, timeout);
00227 } else if ( !m_Stream.get() ) {
00228 return eIO_Success;
00229 } else {
00230 return eIO_NotSupported;
00231 }
00232 }
00233
00234
00235 template <class TRequest, class TReply>
00236 inline
00237 const STimeout* CRPCClient<TRequest, TReply>::GetTimeout(EIO_Event direction)
00238 {
00239 CConn_IOStream* conn_stream
00240 = dynamic_cast<CConn_IOStream*>(m_Stream.get());
00241 if (conn_stream) {
00242 return CONN_GetTimeout(conn_stream->GetCONN(), direction);
00243 } else {
00244 return m_Timeout;
00245 }
00246 }
00247
00248
00249 template <class TRequest, class TReply>
00250 inline
00251 void CRPCClient<TRequest, TReply>::Ask(const TRequest& request, TReply& reply)
00252 {
00253 CMutexGuard LOCK(m_Mutex);
00254
00255 unsigned int tries = 0;
00256 for (;;) {
00257 try {
00258 SetAffinity(GetAffinity(request));
00259 Connect();
00260 *m_Out << request;
00261 *m_In >> reply;
00262 break;
00263 } catch (CException& e) {
00264
00265
00266 if ( !dynamic_cast<CSerialException*>(&e)
00267 && !dynamic_cast<CIOException*>(&e) ) {
00268 throw;
00269 } else if (++tries == m_RetryLimit || !x_ShouldRetry(tries) ) {
00270 throw;
00271 } else if ( !(tries % 2) ) {
00272
00273 try {
00274 Reset();
00275 } STD_CATCH_ALL_XX(Serial_RPCClient, 1, "CRPCClient<>::Reset()")
00276 }
00277 }
00278 }
00279 }
00280
00281
00282 template <class TRequest, class TReply>
00283 inline
00284 void CRPCClient<TRequest, TReply>::x_Connect(void)
00285 {
00286 _ASSERT( !m_Service.empty() );
00287 SConnNetInfo* net_info;
00288 if (!m_Affinity.empty()) {
00289 net_info = ConnNetInfo_Create(m_Service.c_str());
00290 ConnNetInfo_PostOverrideArg(net_info, m_Affinity.c_str(), 0);
00291 } else {
00292 net_info = 0;
00293 }
00294 x_SetStream(new CConn_ServiceStream(m_Service, fSERV_Any, net_info, 0,
00295 m_Timeout));
00296
00297 ConnNetInfo_Destroy(net_info);
00298 }
00299
00300
00301 template <class TRequest, class TReply>
00302 inline
00303 void CRPCClient<TRequest, TReply>::x_Disconnect(void)
00304 {
00305 m_In.reset();
00306 m_Out.reset();
00307 m_Stream.reset();
00308 }
00309
00310
00311 template <class TRequest, class TReply>
00312 inline
00313 void CRPCClient<TRequest, TReply>::x_SetStream(CNcbiIostream* stream)
00314 {
00315 m_In .reset();
00316 m_Out.reset();
00317 m_Stream.reset(stream);
00318 m_In .reset(CObjectIStream::Open(m_Format, *stream));
00319 m_Out.reset(CObjectOStream::Open(m_Format, *stream));
00320 }
00321
00322
00323 template <class TRequest, class TReply>
00324 inline
00325 void CRPCClient<TRequest, TReply>::x_ConnectURL(const string& url)
00326 {
00327 x_SetStream(new CConn_HttpStream(url, fHCC_AutoReconnect, m_Timeout));
00328 }
00329
00330
00331 template <class TRequest, class TReply>
00332 inline
00333 bool CRPCClient<TRequest, TReply>::x_ShouldRetry(unsigned int tries)
00334 {
00335 _TRACE("CRPCClient<>::x_ShouldRetry: retrying after " << tries
00336 << " failures");
00337 return true;
00338 }
00339
00340
00341 END_NCBI_SCOPE
00342
00343
00344
00345
00346 #endif
00347
00348