00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <ncbi_pch.hpp>
00032
00033 #include "../ncbi_lbsmd.h"
00034 #include "../ncbi_servicep.h"
00035
00036 #include "netservice_api_impl.hpp"
00037
00038 #include <connect/services/error_codes.hpp>
00039 #include <connect/services/srv_connections_expt.hpp>
00040 #include <connect/services/netservice_api_expt.hpp>
00041
00042 #include <connect/ncbi_conn_exception.hpp>
00043
00044 #include <corelib/ncbi_system.hpp>
00045 #include <corelib/ncbi_config.hpp>
00046
00047 #define NCBI_USE_ERRCODE_X ConnServ_Connection
00048
00049 BEGIN_NCBI_SCOPE
00050
00051 void SNetServerGroupImpl::Delete()
00052 {
00053
00054
00055
00056
00057 CFastMutexGuard g(m_Service->m_ServerGroupMutex);
00058
00059 if (GetRefCount() == 0)
00060 m_Service = NULL;
00061 }
00062
00063 CNetServer CNetServerGroupIterator::GetServer()
00064 {
00065 return m_Impl->m_ServerGroup->m_Service->ReturnServer(*m_Impl->m_Position);
00066 }
00067
00068 bool CNetServerGroupIterator::Next()
00069 {
00070 if (++m_Impl->m_Position != m_Impl->m_ServerGroup->m_Servers.end())
00071 return true;
00072
00073 m_Impl.Assign(NULL);
00074 return false;
00075 }
00076
00077 CNetServerGroupIterator CNetServerGroup::Iterate()
00078 {
00079 TNetServerList::const_iterator it = m_Impl->m_Servers.begin();
00080
00081 return it != m_Impl->m_Servers.end() ?
00082 new SNetServerGroupIteratorImpl(m_Impl, it) : NULL;
00083 }
00084
00085 SNetServiceImpl::SNetServiceImpl(CConfig* config, const string& section,
00086 const string& service_name, const string& client_name,
00087 const string& lbsm_affinity_name)
00088 {
00089 _ASSERT(!section.empty());
00090
00091 m_ServiceName = service_name;
00092 m_ClientName = client_name;
00093 m_LBSMAffinityName = lbsm_affinity_name;
00094
00095 auto_ptr<CConfig> app_reg_config;
00096 auto_ptr<CConfig::TParamTree> param_tree;
00097
00098 if (config == NULL) {
00099 CNcbiApplication* app = CNcbiApplication::Instance();
00100 CNcbiRegistry* reg;
00101 if (app != NULL && (reg = &app->GetConfig()) != NULL) {
00102 param_tree.reset(CConfig::ConvertRegToTree(*reg));
00103
00104 const CConfig::TParamTree* section_param_tree =
00105 param_tree->FindSubNode(section);
00106
00107 app_reg_config.reset(section_param_tree != NULL ?
00108 new CConfig(section_param_tree) : new CConfig(*reg));
00109
00110 config = app_reg_config.get();
00111 }
00112 }
00113
00114 if (config != NULL) {
00115 if (m_ServiceName.empty()) {
00116 try {
00117 m_ServiceName = config->GetString(section,
00118 "service", CConfig::eErr_Throw, kEmptyStr);
00119 }
00120 catch (exception&) {
00121 m_ServiceName = config->GetString(section,
00122 "service_name", CConfig::eErr_NoThrow, kEmptyStr);
00123 }
00124 if (m_ServiceName.empty()) {
00125 string host;
00126 try {
00127 host = config->GetString(section,
00128 "server", CConfig::eErr_Throw, kEmptyStr);
00129 }
00130 catch (exception&) {
00131 m_ServiceName = config->GetString(section,
00132 "host", CConfig::eErr_NoThrow, kEmptyStr);
00133 }
00134 string port = config->GetString(section,
00135 "port", CConfig::eErr_NoThrow, kEmptyStr);
00136 if (!host.empty() && !port.empty()) {
00137 m_ServiceName = host + ":";
00138 m_ServiceName += port;
00139 }
00140 }
00141 }
00142
00143 if (m_ClientName.empty()) {
00144 try {
00145 m_ClientName = config->GetString(section,
00146 "client_name", CConfig::eErr_Throw, kEmptyStr);
00147 }
00148 catch (exception&) {
00149 m_ClientName = config->GetString(section,
00150 "client", CConfig::eErr_NoThrow, kEmptyStr);
00151 }
00152 }
00153
00154 if (m_LBSMAffinityName.empty())
00155 m_LBSMAffinityName = config->GetString(section,
00156 "use_lbsm_affinity", CConfig::eErr_NoThrow, kEmptyStr);
00157
00158 unsigned long timeout = s_SecondsToMilliseconds(config->GetString(
00159 section, "communication_timeout", CConfig::eErr_NoThrow, "0"), 0);
00160
00161 if (timeout > 0)
00162 NcbiMsToTimeout(&m_Timeout, timeout);
00163 else
00164 m_Timeout = s_GetDefaultCommTimeout();
00165
00166 m_ServerThrottlePeriod = config->GetInt(section,
00167 "throttle_relaxation_period", CConfig::eErr_NoThrow,
00168 THROTTLE_RELAXATION_PERIOD_DEFAULT);
00169
00170 if (m_ServerThrottlePeriod > 0) {
00171 string numerator_str, denominator_str;
00172
00173 NStr::SplitInTwo(config->GetString(section,
00174 "throttle_by_connection_error_rate", CConfig::eErr_NoThrow,
00175 THROTTLE_BY_ERROR_RATE_DEFAULT), "/",
00176 numerator_str, denominator_str);
00177
00178 int numerator = NStr::StringToInt(numerator_str,
00179 NStr::fConvErr_NoThrow |
00180 NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
00181 int denominator = NStr::StringToInt(denominator_str,
00182 NStr::fConvErr_NoThrow |
00183 NStr::fAllowLeadingSpaces | NStr::fAllowTrailingSpaces);
00184
00185 if (denominator < 1)
00186 denominator = 1;
00187 else if (denominator > CONNECTION_ERROR_HISTORY_MAX) {
00188 numerator = (numerator * CONNECTION_ERROR_HISTORY_MAX) /
00189 denominator;
00190 denominator = CONNECTION_ERROR_HISTORY_MAX;
00191 }
00192
00193 if (numerator < 0)
00194 numerator = 0;
00195
00196 m_ReconnectionFailureThresholdNumerator = numerator;
00197 m_ReconnectionFailureThresholdDenominator = denominator;
00198
00199 m_MaxSubsequentConnectionFailures = config->GetInt(section,
00200 "throttle_by_subsequent_connection_failures",
00201 CConfig::eErr_NoThrow,
00202 THROTTLE_BY_SUBSEQUENT_CONNECTION_FAILURES_DEFAULT);
00203
00204 m_MaxQueryTime = s_SecondsToMilliseconds(config->GetString(section,
00205 "max_connection_time", CConfig::eErr_NoThrow,
00206 NCBI_AS_STRING(MAX_CONNECTION_TIME_DEFAULT)),
00207 SECONDS_DOUBLE_TO_MS_UL(MAX_CONNECTION_TIME_DEFAULT));
00208
00209 m_ThrottleUntilDiscoverable = config->GetBool(section,
00210 "throttle_hold_until_active_in_lb", CConfig::eErr_NoThrow,
00211 THROTTLE_HOLD_UNTIL_ACTIVE_IN_LB_DEFAULT);
00212
00213 m_ForceRebalanceAfterThrottleWithin = config->GetInt(section,
00214 "throttle_forced_rebalance", CConfig::eErr_NoThrow,
00215 THROTTLE_FORCED_REBALANCE_DEFAULT);
00216 }
00217
00218 m_RebalanceStrategy = CreateSimpleRebalanceStrategy(*config, section);
00219 } else {
00220 m_Timeout = s_GetDefaultCommTimeout();
00221
00222
00223 m_ServerThrottlePeriod = THROTTLE_RELAXATION_PERIOD_DEFAULT;
00224 m_ReconnectionFailureThresholdNumerator =
00225 THROTTLE_BY_ERROR_RATE_DEFAULT_NUMERATOR;
00226 m_ReconnectionFailureThresholdDenominator =
00227 THROTTLE_BY_ERROR_RATE_DEFAULT_DENOMINATOR;
00228 m_MaxSubsequentConnectionFailures =
00229 THROTTLE_BY_SUBSEQUENT_CONNECTION_FAILURES_DEFAULT;
00230 m_MaxQueryTime = SECONDS_DOUBLE_TO_MS_UL(MAX_CONNECTION_TIME_DEFAULT);
00231 m_ThrottleUntilDiscoverable = THROTTLE_HOLD_UNTIL_ACTIVE_IN_LB_DEFAULT;
00232 m_ForceRebalanceAfterThrottleWithin = THROTTLE_FORCED_REBALANCE_DEFAULT;
00233
00234 m_RebalanceStrategy = CreateDefaultRebalanceStrategy();
00235 }
00236
00237 m_LatestDiscoveryIteration = 0;
00238 m_PermanentConnection = eOn;
00239
00240 NStr::TruncateSpacesInPlace(m_ServiceName);
00241
00242 if (m_ServiceName.empty()) {
00243 m_ServiceType = eNotDefined;
00244 } else {
00245 string sport, host;
00246
00247 if (NStr::SplitInTwo(m_ServiceName, ":", host, sport)) {
00248 m_ServiceType = eSingleServer;
00249 unsigned int port = NStr::StringToInt(sport);
00250 host = CSocketAPI::ntoa(CSocketAPI::gethostbyname(host));
00251
00252
00253 SNetServerImpl* single_server = new SNetServerImplReal(host, port);
00254 m_Servers.insert(single_server);
00255 (m_SignleServerGroup = new SNetServerGroupImpl(0))->
00256 m_Servers.push_back(single_server);
00257 } else {
00258 m_ServiceType = eLoadBalanced;
00259 memset(&m_ServerGroups, 0, sizeof(m_ServerGroups));
00260 }
00261 }
00262
00263 if (m_ClientName.empty() || m_ClientName == "noname" ||
00264 NStr::FindNoCase(m_ClientName, "sample") != NPOS ||
00265 NStr::FindNoCase(m_ClientName, "unknown") != NPOS) {
00266 CNcbiApplication* app = CNcbiApplication::Instance();
00267 if (app == NULL) {
00268 NCBI_THROW(CArgException, eNoValue, "Client name is not set.");
00269 }
00270 m_ClientName = app->GetProgramDisplayName();
00271 }
00272
00273
00274 m_LBSMAffinityValue = m_LBSMAffinityName.empty() ? NULL :
00275 LBSMD_GetHostParameter(SERV_LOCALHOST, m_LBSMAffinityName.c_str());
00276 }
00277
00278 const string& CNetService::GetClientName() const
00279 {
00280 return m_Impl->m_ClientName;
00281 }
00282
00283 const string& CNetService::GetServiceName() const
00284 {
00285 return m_Impl->m_ServiceName;
00286 }
00287
00288 bool CNetService::IsLoadBalanced() const
00289 {
00290 return m_Impl->m_ServiceType == SNetServiceImpl::eLoadBalanced;
00291 }
00292
00293 void CNetService::SetRebalanceStrategy(IRebalanceStrategy* strategy)
00294 {
00295 m_Impl->m_RebalanceStrategy = strategy != NULL ? strategy :
00296 CreateDefaultRebalanceStrategy().GetPtr();
00297 }
00298
00299 void CNetService::PrintCmdOutput(const string& cmd,
00300 CNcbiOstream& output_stream, CNetService::ECmdOutputStyle output_style)
00301 {
00302 for (CNetServerGroupIterator it = DiscoverServers().Iterate(); it; ++it) {
00303 if (output_style != eDumpNoHeaders)
00304 output_stream << (*it)->m_Address.AsString() << endl;
00305
00306 CNetServer::SExecResult exec_result((*it).ExecWithRetry(cmd));
00307
00308 if (output_style == eSingleLineOutput)
00309 output_stream << exec_result.response;
00310 else {
00311 CNetServerMultilineCmdOutput output(exec_result);
00312
00313 if (output_style == eMultilineOutput_NetCacheStyle)
00314 output->SetNetCacheCompatMode();
00315
00316 string line;
00317
00318 while (output.ReadLine(line))
00319 output_stream << line << endl;
00320 }
00321
00322 if (output_style != eDumpNoHeaders)
00323 output_stream << endl;
00324 }
00325 }
00326
00327 SNetServerImpl* SNetServiceImpl::FindOrCreateServerImpl(
00328 const string& host, unsigned short port)
00329 {
00330 SNetServerImpl search_image(host, port);
00331
00332 TNetServerSet::iterator it = m_Servers.find(&search_image);
00333
00334 if (it != m_Servers.end())
00335 return *it;
00336
00337 SNetServerImpl* server = new SNetServerImplReal(host, port);
00338
00339 m_Servers.insert(server);
00340
00341 return server;
00342 }
00343
00344 CNetServer SNetServiceImpl::ReturnServer(SNetServerImpl* server_impl)
00345 {
00346 m_RebalanceStrategy->OnResourceRequested();
00347
00348 CFastMutexGuard g(m_ServerMutex);
00349
00350 server_impl->m_Service = this;
00351 return server_impl;
00352 }
00353
00354 CNetServer SNetServiceImpl::GetServer(const string& host, unsigned int port)
00355 {
00356 m_RebalanceStrategy->OnResourceRequested();
00357
00358 CFastMutexGuard g(m_ServerMutex);
00359
00360 SNetServerImpl* server = FindOrCreateServerImpl(host, port);
00361 server->m_Service = this;
00362 return server;
00363 }
00364
00365 CNetServer SNetServiceImpl::GetSingleServer()
00366 {
00367 _ASSERT(m_ServiceType != eLoadBalanced);
00368
00369 if (m_ServiceType == eNotDefined)
00370 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
00371 "The service is not set.");
00372
00373 return ReturnServer(m_SignleServerGroup->m_Servers.front());
00374 }
00375
00376 CNetServer::SExecResult CNetService::FindServerAndExec(const string& cmd)
00377 {
00378 _ASSERT(m_Impl->m_ServiceType != SNetServiceImpl::eNotDefined);
00379
00380 if (m_Impl->m_ServiceType != SNetServiceImpl::eLoadBalanced)
00381 return m_Impl->GetSingleServer().ExecWithRetry(cmd);
00382
00383 for (CNetServerGroupIterator it = DiscoverServers().Iterate(); it; ++it) {
00384 try {
00385 return (*it).ExecWithRetry(cmd);
00386 }
00387 catch (CNetSrvConnException& ex) {
00388 if (ex.GetErrCode() != CNetSrvConnException::eConnectionFailure &&
00389 ex.GetErrCode() != CNetSrvConnException::eServerThrottle)
00390 throw;
00391 ERR_POST_X(2, ex.what());
00392 }
00393 }
00394
00395 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
00396 "Couldn't find any availbale servers for the " +
00397 GetServiceName() + " service.");
00398 }
00399
00400 CNetServer SNetServiceImpl::RequireStandAloneServerSpec()
00401 {
00402 if (m_ServiceType != eLoadBalanced)
00403 return GetSingleServer();
00404
00405 NCBI_THROW(CNetServiceException, eCommandIsNotAllowed,
00406 "This command requires explicit server address (host:port)");
00407 }
00408
// True when 'rate' is at or below -0.01. The argument is parenthesized
// so that compound expressions are compared as a whole.
#define LBSMD_IS_PENALIZED_RATE(rate) ((rate) <= -0.01)
00410
00411 SNetServerGroupImpl* SNetServiceImpl::CreateServerGroup(
00412 CNetService::EDiscoveryMode discovery_mode)
00413 {
00414 SNetServerGroupImpl** server_group = m_ServerGroups + discovery_mode;
00415
00416 if (*server_group != NULL && (*server_group)->GetRefCount() == 0)
00417 delete *server_group;
00418
00419 return *server_group = new SNetServerGroupImpl(m_LatestDiscoveryIteration);
00420 }
00421
00422 SNetServerGroupImpl* SNetServiceImpl::DiscoverServers(
00423 CNetService::EDiscoveryMode discovery_mode)
00424 {
00425 switch (m_ServiceType) {
00426 case SNetServiceImpl::eNotDefined:
00427 NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
00428 "Service name is not set.");
00429
00430 case SNetServiceImpl::eSingleServer:
00431 m_SignleServerGroup->m_Service = this;
00432 return m_SignleServerGroup;
00433
00434 case SNetServiceImpl::eLoadBalanced:
00435 break;
00436 }
00437
00438
00439 m_RebalanceStrategy->OnResourceRequested();
00440 if (m_RebalanceStrategy->NeedRebalance())
00441 ++m_LatestDiscoveryIteration;
00442
00443 _ASSERT(discovery_mode >= 0 &&
00444 discovery_mode < CNetService::eNumberOfDiscoveryModes);
00445
00446 SNetServerGroupImpl* result = m_ServerGroups[discovery_mode];
00447
00448 if (result != NULL &&
00449 result->m_DiscoveryIteration == m_LatestDiscoveryIteration) {
00450 result->m_Service = this;
00451 return result;
00452 }
00453
00454
00455
00456
00457
00458 SNetServerGroupImpl* base_group = m_ServerGroups[CNetService::eSortByLoad];
00459
00460 bool base_group_requested = discovery_mode == CNetService::eSortByLoad ||
00461 discovery_mode == CNetService::eIncludePenalized;
00462
00463 if (base_group_requested || base_group == NULL ||
00464 base_group->m_DiscoveryIteration != m_LatestDiscoveryIteration) {
00465
00466 SERV_ITER srv_it;
00467
00468 int try_count = TServConn_MaxFineLBNameRetries::GetDefault();
00469 for (;;) {
00470 SConnNetInfo* net_info = ConnNetInfo_Create(m_ServiceName.c_str());
00471
00472 srv_it = SERV_OpenP(m_ServiceName.c_str(),
00473 discovery_mode == CNetService::eIncludePenalized ?
00474 fSERV_Standalone | fSERV_IncludeSuppressed :
00475 fSERV_Standalone,
00476 SERV_LOCALHOST, 0, 0.0, net_info, NULL, 0, 0 ,
00477 m_LBSMAffinityName.c_str(), m_LBSMAffinityValue);
00478
00479 ConnNetInfo_Destroy(net_info);
00480
00481 if (srv_it != 0)
00482 break;
00483
00484 if (--try_count < 0) {
00485 NCBI_THROW(CNetSrvConnException, eLBNameNotFound,
00486 "Load balancer cannot find service name '" +
00487 m_ServiceName + "'");
00488 }
00489 ERR_POST_X(4, "Could not find LB service name '" <<
00490 m_ServiceName << "', will retry after delay");
00491 SleepMilliSec(s_GetRetryDelay());
00492 }
00493
00494 base_group = CreateServerGroup(
00495 discovery_mode == CNetService::eIncludePenalized ?
00496 CNetService::eIncludePenalized : CNetService::eSortByLoad);
00497
00498 CFastMutexGuard g(m_ServerMutex);
00499
00500 const SSERV_Info* sinfo;
00501
00502 while ((sinfo = SERV_GetNextInfoEx(srv_it, 0)) != 0) {
00503 if (sinfo->time > 0 && sinfo->time != NCBI_TIME_INFINITE &&
00504 (sinfo->rate > 0.0 ||
00505 (discovery_mode == CNetService::eIncludePenalized &&
00506 LBSMD_IS_PENALIZED_RATE(sinfo->rate)))) {
00507
00508 SNetServerImpl* server = FindOrCreateServerImpl(
00509 CSocketAPI::ntoa(sinfo->host), sinfo->port);
00510 server->m_DiscoveryIteration = m_LatestDiscoveryIteration;
00511 base_group->m_Servers.push_back(server);
00512 }
00513 }
00514
00515 g.Release();
00516
00517 SERV_Close(srv_it);
00518
00519 if (base_group_requested) {
00520 base_group->m_Service = this;
00521 return base_group;
00522 }
00523 }
00524
00525 result = CreateServerGroup(discovery_mode);
00526
00527 if (discovery_mode == CNetService::eRandomize) {
00528 size_t servers_size = base_group->m_Servers.size();
00529
00530 if (servers_size > 0) {
00531
00532
00533 for (unsigned current = rand() % servers_size,
00534 last = current + servers_size; current < last; ++current)
00535 result->m_Servers.push_back(
00536 base_group->m_Servers[current % servers_size]);
00537 }
00538 } else {
00539 result->m_Servers.insert(result->m_Servers.begin(),
00540 base_group->m_Servers.begin(), base_group->m_Servers.end());
00541 sort(result->m_Servers.begin(), result->m_Servers.end());
00542 }
00543
00544 result->m_Service = this;
00545 return result;
00546 }
00547
00548 void SNetServiceImpl::Monitor(CNcbiOstream& out, const string& cmd)
00549 {
00550 CNetServer::SExecResult exec_result(
00551 RequireStandAloneServerSpec().ExecWithRetry(cmd));
00552
00553 out << exec_result.response << "\n" << flush;
00554
00555 STimeout rto = {1, 0};
00556
00557 CSocket* the_socket = &exec_result.conn->m_Socket;
00558
00559 the_socket->SetTimeout(eIO_Read, &rto);
00560
00561 string line;
00562
00563 for (;;)
00564 if (the_socket->ReadLine(line) == eIO_Success)
00565 out << line << "\n" << flush;
00566 else
00567 if (the_socket->GetStatus(eIO_Open) != eIO_Success)
00568 break;
00569
00570 exec_result.conn->Close();
00571 }
00572
00573 SNetServiceImpl::~SNetServiceImpl()
00574 {
00575
00576 NON_CONST_ITERATE(TNetServerSet, it, m_Servers) {
00577 delete *it;
00578 }
00579
00580 switch (m_ServiceType) {
00581 case eLoadBalanced:
00582 {{
00583
00584 int i = (int) CNetService::eNumberOfDiscoveryModes;
00585 SNetServerGroupImpl** server_group = m_ServerGroups;
00586 while (--i >= 0) {
00587 if (*server_group != NULL)
00588 delete *server_group;
00589 ++server_group;
00590 }
00591 }}
00592 break;
00593
00594 case eSingleServer:
00595 delete m_SignleServerGroup;
00596 break;
00597
00598 case eNotDefined:
00599 break;
00600 }
00601
00602 if (m_LBSMAffinityValue != NULL)
00603 free((void*) m_LBSMAffinityValue);
00604 }
00605
00606 void CNetService::SetCommunicationTimeout(const STimeout& to)
00607 {
00608 m_Impl->m_Timeout = to;
00609 }
00610 const STimeout& CNetService::GetCommunicationTimeout() const
00611 {
00612 return m_Impl->m_Timeout;
00613 }
00614
00615 void CNetService::SetPermanentConnection(ESwitch type)
00616 {
00617 m_Impl->m_PermanentConnection = type;
00618 }
00619
00620 CNetServerGroup CNetService::DiscoverServers(
00621 CNetService::EDiscoveryMode discovery_mode)
00622 {
00623 CFastMutexGuard g(m_Impl->m_ServerGroupMutex);
00624
00625 return m_Impl->DiscoverServers(discovery_mode);
00626 }
00627
00628 bool CNetServerGroup::FindServer(INetServerFinder* finder)
00629 {
00630 bool had_comm_err = false;
00631
00632 for (CNetServerGroupIterator it = Iterate(); it; ++it) {
00633 CNetServer server = *it;
00634
00635 try {
00636 if (finder->Consider(server))
00637 return true;
00638 }
00639 catch (CNetServiceException& ex) {
00640 ERR_POST_X(5, server->m_Address.AsString() <<
00641 " returned error: \"" << ex.what() << "\"");
00642
00643 if (ex.GetErrCode() != CNetServiceException::eCommunicationError)
00644 throw;
00645
00646 had_comm_err = true;
00647 }
00648 catch (CIO_Exception& ex) {
00649 ERR_POST_X(6, server->m_Address.AsString() <<
00650 " returned error: \"" << ex.what() << "\"");
00651
00652 had_comm_err = true;
00653 }
00654 }
00655
00656 if (had_comm_err)
00657 NCBI_THROW(CNetServiceException,
00658 eCommunicationError, "Communication error");
00659
00660 return false;
00661 }
00662
00663 END_NCBI_SCOPE
00664
00665