|
NCBI C++ ToolKit
|
00001 /* $Id: ns_handler.cpp 54572 2012-05-22 16:00:04Z satskyse $ 00002 * =========================================================================== 00003 * 00004 * PUBLIC DOMAIN NOTICE 00005 * National Center for Biotechnology Information 00006 * 00007 * This software/database is a "United States Government Work" under the 00008 * terms of the United States Copyright Act. It was written as part of 00009 * the author's official duties as a United States Government employee and 00010 * thus cannot be copyrighted. This software/database is freely available 00011 * to the public for use. The National Library of Medicine and the U.S. 00012 * Government have not placed any restriction on its use or reproduction. 00013 * 00014 * Although all reasonable efforts have been taken to ensure the accuracy 00015 * and reliability of the software and data, the NLM and the U.S. 00016 * Government do not and cannot warrant the performance or results that 00017 * may be obtained by using this software or data. The NLM and the U.S. 00018 * Government disclaim all warranties, express or implied, including 00019 * warranties of performance, merchantability or fitness for any particular 00020 * purpose. 00021 * 00022 * Please cite the author in any work or product based on this material. 00023 * 00024 * =========================================================================== 00025 * 00026 * Authors: Anatoliy Kuznetsov, Victor Joukov 00027 * 00028 * File Description: netschedule commands handler 00029 * 00030 */ 00031 00032 #include <ncbi_pch.hpp> 00033 #include <corelib/request_ctx.hpp> 00034 00035 #include "ns_handler.hpp" 00036 #include "ns_server.hpp" 00037 #include "ns_server_misc.hpp" 00038 00039 #include <sys/types.h> 00040 #include <sys/socket.h> 00041 #include <netinet/in.h> 00042 #include <netinet/tcp.h> 00043 00044 00045 USING_NCBI_SCOPE; 00046 00047 00048 00049 /// NetSchedule command parser 00050 /// 00051 /// @internal 00052 /// 00053 00054 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_BatchHeaderMap[] = { 00055 { "BTCH", { &CNetScheduleHandler::x_ProcessBatchStart, 0 }, 00056 { { "size", eNSPT_Int, eNSPA_Required } } }, 00057 { "ENDS", { &CNetScheduleHandler::x_ProcessBatchSequenceEnd, 0 } }, 00058 { NULL } 00059 }; 00060 00061 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_BatchEndMap[] = { 00062 { "ENDB" }, 00063 { NULL } 00064 }; 00065 00066 SNSProtoArgument s_BatchArgs[] = { 00067 { "input", eNSPT_Str, eNSPA_Required }, 00068 { "aff", eNSPT_Str, eNSPA_Optional, "" }, 00069 { "msk", eNSPT_Int, eNSPA_Optional, "0" }, 00070 { NULL } 00071 }; 00072 00073 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_CommandMap[] = { 00074 00075 /*** Admin role ***/ 00076 { "SHUTDOWN", { &CNetScheduleHandler::x_ProcessShutdown, 00077 eNSCR_Admin }, 00078 { { "drain", eNSPT_Int, eNSPA_Optional, 0 } } }, 00079 { "GETCONF", { &CNetScheduleHandler::x_ProcessGetConf, 00080 eNSCR_Admin } }, 00081 { "REFUSESUBMITS", { &CNetScheduleHandler::x_ProcessRefuseSubmits, 00082 eNSCR_Admin }, 00083 { { "mode", eNSPT_Int, eNSPA_Required } } }, 00084 00085 /*** Any role ***/ 00086 { "VERSION", { &CNetScheduleHandler::x_ProcessVersion, 00087 eNSCR_Any } }, 00088 { "QUIT", { &CNetScheduleHandler::x_ProcessQuitSession, 00089 eNSCR_Any } }, 00090 { "RECO", { &CNetScheduleHandler::x_ProcessReloadConfig, 00091 eNSCR_Admin } }, 00092 { "ACNT", { &CNetScheduleHandler::x_ProcessActiveCount, 00093 eNSCR_Any } }, 00094 { "QLST", { &CNetScheduleHandler::x_ProcessQList, 00095 eNSCR_Any } }, 00096 { "QINF", { &CNetScheduleHandler::x_ProcessQueueInfo, 00097 eNSCR_Any }, 00098 { { "qname", eNSPT_Id, eNSPA_Required } } }, 00099 00100 /*** QueueAdmin role ***/ 00101 { "DROPQ", { &CNetScheduleHandler::x_ProcessDropQueue, 00102 eNSCR_QueueAdmin } }, 00103 00104 /*** DynClassAdmin role ***/ 00105 // QCRE qname : id qclass : id [ comment : str ] 00106 { "QCRE", { &CNetScheduleHandler::x_ProcessCreateQueue, 00107 eNSAC_DynClassAdmin }, 00108 { { "qname", eNSPT_Id, eNSPA_Required }, 00109 { "qclass", eNSPT_Id, eNSPA_Required }, 00110 { "comment", eNSPT_Str, eNSPA_Optional } } }, 00111 // QDEL qname : id 00112 { "QDEL", { &CNetScheduleHandler::x_ProcessDeleteQueue, 00113 eNSAC_DynQueueAdmin }, 00114 { { "qname", eNSPT_Id, eNSPA_Required } } }, 00115 00116 /*** Queue role ***/ 00117 // STATUS job_key : id 00118 { "STATUS", { &CNetScheduleHandler::x_ProcessStatus, 00119 eNSCR_Queue }, 00120 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00121 // STATUS2 job_key : id 00122 { "STATUS2", { &CNetScheduleHandler::x_ProcessStatus, 00123 eNSCR_Queue }, 00124 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00125 // STAT [ option : id ] -- "ALL" 00126 { "STAT", { &CNetScheduleHandler::x_ProcessStatistics, 00127 eNSCR_Any }, 00128 { { "option", eNSPT_Id, eNSPA_Optional }, 00129 { "comment", eNSPT_Id, eNSPA_Optional }, 00130 { "aff", eNSPT_Str, eNSPA_Optional }, 00131 { "group", eNSPT_Str, eNSPA_Optional } } }, 00132 // MPUT job_key : id progress_msg : str 00133 { "MPUT", { &CNetScheduleHandler::x_ProcessPutMessage, 00134 eNSCR_Queue }, 00135 { { "job_key", eNSPT_Id, eNSPA_Required }, 00136 { "progress_msg", eNSPT_Str, eNSPA_Required } } }, 00137 // MGET job_key : id 00138 { "MGET", { &CNetScheduleHandler::x_ProcessGetMessage, 00139 eNSCR_Queue }, 00140 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00141 // DUMP [ job_key : id ] 00142 { "DUMP", { &CNetScheduleHandler::x_ProcessDump, 00143 eNSCR_Queue }, 00144 { { "job_key", eNSPT_Id, eNSPA_Optional }, 00145 { "status", eNSPT_Id, eNSPA_Optional }, 00146 { "start_after", eNSPT_Id, eNSPA_Optional }, 00147 { "count", eNSPT_Int, eNSPA_Optional, "0" }, 00148 { "group", eNSPT_Str, eNSPA_Optional } } }, 00149 // GETP [ client_info : id ] 00150 { "GETP", { &CNetScheduleHandler::x_ProcessGetParam, 00151 eNSCR_Queue } }, 00152 { "GETC", { &CNetScheduleHandler::x_ProcessGetConfiguration, 00153 eNSCR_Queue } }, 00154 // CLRN 00155 { "CLRN", { &CNetScheduleHandler::x_ProcessClearWorkerNode, 00156 eNSCR_Queue } }, 00157 { "CANCELQ", { &CNetScheduleHandler::x_ProcessCancelQueue, 00158 eNSCR_Queue } }, 00159 00160 /*** Submitter role ***/ 00161 // SST job_key : id -- submitter fast status, changes timestamp 00162 { "SST", { &CNetScheduleHandler::x_ProcessFastStatusS, 00163 eNSCR_Submitter }, 00164 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00165 // SST2 job_key : id -- submitter fast status, changes timestamp 00166 { "SST2", { &CNetScheduleHandler::x_ProcessFastStatusS, 00167 eNSCR_Submitter }, 00168 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00169 // SUBMIT input : str [ progress_msg : str ] [ port : uint [ timeout : uint ]] 00170 // [ affinity_token : keystr(aff) ] [ job_mask : keyint(msk) ] 00171 { "SUBMIT", { &CNetScheduleHandler::x_ProcessSubmit, 00172 eNSCR_Submitter }, 00173 { { "input", eNSPT_Str, eNSPA_Required }, // input 00174 { "port", eNSPT_Int, eNSPA_Optional }, 00175 { "timeout", eNSPT_Int, eNSPA_Optional }, 00176 { "aff", eNSPT_Str, eNSPA_Optional, "" }, // affinity_token 00177 { "msk", eNSPT_Int, eNSPA_Optional, "0" }, 00178 { "ip", eNSPT_Str, eNSPA_Optional, "" }, 00179 { "sid", eNSPT_Str, eNSPA_Optional, "" }, 00180 { "group", eNSPT_Str, eNSPA_Optional, "" } } }, 00181 // CANCEL job_key : id 00182 { "CANCEL", { &CNetScheduleHandler::x_ProcessCancel, 00183 eNSCR_Submitter }, 00184 { { "job_key", eNSPT_Id, eNSPA_Optional }, 00185 { "group", eNSPT_Str, eNSPA_Optional } } }, 00186 // DROJ job_key : id 00187 { "DROJ", { &CNetScheduleHandler::x_ProcessDropJob, 00188 eNSCR_Submitter }, 00189 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00190 { "BSUB", { &CNetScheduleHandler::x_ProcessSubmitBatch, 00191 eNSCR_Submitter }, 00192 { { "port", eNSPT_Int, eNSPA_Optional }, 00193 { "timeout", eNSPT_Int, eNSPA_Optional }, 00194 { "ip", eNSPT_Str, eNSPA_Optional, "" }, 00195 { "sid", eNSPT_Str, eNSPA_Optional, "" }, 00196 { "group", eNSPT_Str, eNSPA_Optional, "" } } }, 00197 // READ [ timeout : int ] -> job key 00198 { "READ", { &CNetScheduleHandler::x_ProcessReading, 00199 eNSCR_Submitter }, 00200 { { "timeout", eNSPT_Int, eNSPA_Optional, "0" }, 00201 { "group", eNSPT_Str, eNSPA_Optional, "" } } }, 00202 // CFRM group : int jobs : str 00203 { "CFRM", { &CNetScheduleHandler::x_ProcessConfirm, 00204 eNSCR_Submitter }, 00205 { { "job_key", eNSPT_Id, eNSPA_Required }, 00206 { "auth_token", eNSPT_Id, eNSPA_Required } } }, 00207 // FRED group : int jobs : str [ message : str ] 00208 { "FRED", { &CNetScheduleHandler::x_ProcessReadFailed, 00209 eNSCR_Submitter }, 00210 { { "job_key", eNSPT_Id, eNSPA_Required }, 00211 { "auth_token", eNSPT_Id, eNSPA_Required }, 00212 { "err_msg", eNSPT_Str, eNSPA_Optional } } }, 00213 // RDRB group : int jobs : str 00214 { "RDRB", { &CNetScheduleHandler::x_ProcessReadRollback, 00215 eNSCR_Submitter }, 00216 { { "job_key", eNSPT_Id, eNSPA_Required }, 00217 { "auth_token", eNSPT_Str, eNSPA_Required } } }, 00218 00219 /*** Worker node role ***/ 00220 // WST job_key : id -- worker node fast status, does not change timestamp 00221 { "WST", { &CNetScheduleHandler::x_ProcessFastStatusW, 00222 eNSCR_Worker }, 00223 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00224 // WST2 job_key : id -- worker node fast status, does not change timestamp 00225 { "WST2", { &CNetScheduleHandler::x_ProcessFastStatusW, 00226 eNSCR_Worker }, 00227 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00228 // CHAFF [add: string] [del: string] -- change affinity 00229 { "CHAFF", { &CNetScheduleHandler::x_ProcessChangeAffinity, 00230 eNSCR_Worker }, 00231 { { "add", eNSPT_Str, eNSPA_Optional, "" }, 00232 { "del", eNSPT_Str, eNSPA_Optional, "" } } }, 00233 // GET [ port : int ] [affinity_list : keystr(aff) ] 00234 { "GET", { &CNetScheduleHandler::x_ProcessGetJob, 00235 eNSCR_Worker }, 00236 { { "port", eNSPT_Id, eNSPA_Optional }, 00237 { "aff", eNSPT_Str, eNSPA_Optional, "" } } }, 00238 { "GET2", { &CNetScheduleHandler::x_ProcessGetJob, 00239 eNSCR_Worker }, 00240 { { "wnode_aff", eNSPT_Int, eNSPA_Required, 0 }, 00241 { "any_aff", eNSPT_Int, eNSPA_Required, 0 }, 00242 { "exclusive_new_aff", eNSPT_Int, eNSPA_Optional, 0 }, 00243 { "aff", eNSPT_Str, eNSPA_Optional, "" }, 00244 { "port", eNSPT_Int, eNSPA_Optional }, 00245 { "timeout", eNSPT_Int, eNSPA_Optional } } }, 00246 // PUT job_key : id job_return_code : int output : str 00247 { "PUT", { &CNetScheduleHandler::x_ProcessPut, 00248 eNSCR_Worker }, 00249 { { "job_key", eNSPT_Id, eNSPA_Required }, 00250 { "job_return_code", eNSPT_Id, eNSPA_Required }, 00251 { "output", eNSPT_Str, eNSPA_Required } } }, 00252 { "PUT2", { &CNetScheduleHandler::x_ProcessPut, 00253 eNSCR_Worker }, 00254 { { "job_key", eNSPT_Id, eNSPA_Required }, 00255 { "auth_token", eNSPT_Id, eNSPA_Required }, 00256 { "job_return_code", eNSPT_Id, eNSPA_Required }, 00257 { "output", eNSPT_Str, eNSPA_Required } } }, 00258 // RETURN job_key : id 00259 { "RETURN", { &CNetScheduleHandler::x_ProcessReturn, 00260 eNSCR_Worker }, 00261 { { "job_key", eNSPT_Id, eNSPA_Required } } }, 00262 { "RETURN2", { &CNetScheduleHandler::x_ProcessReturn, 00263 eNSCR_Worker }, 00264 { { "job_key", eNSPT_Id, eNSPA_Required }, 00265 { "auth_token", eNSPT_Id, eNSPA_Required } } }, 00266 // WGET port : uint timeout : uint 00267 // [affinity_list : keystr(aff) ] 00268 { "WGET", { &CNetScheduleHandler::x_ProcessGetJob, 00269 eNSCR_Worker }, 00270 { { "port", eNSPT_Int, eNSPA_Required }, 00271 { "timeout", eNSPT_Int, eNSPA_Required }, 00272 { "aff", eNSPT_Str, eNSPA_Optional, "" } } }, 00273 { "CWGET", { &CNetScheduleHandler::x_ProcessCancelWaitGet, 00274 eNSCR_Worker } }, 00275 // FPUT job_key : id err_msg : str output : str job_return_code : int 00276 { "FPUT", { &CNetScheduleHandler::x_ProcessPutFailure, 00277 eNSCR_Worker }, 00278 { { "job_key", eNSPT_Id, eNSPA_Required }, 00279 { "err_msg", eNSPT_Str, eNSPA_Required }, 00280 { "output", eNSPT_Str, eNSPA_Required }, 00281 { "job_return_code", eNSPT_Int, eNSPA_Required } } }, 00282 { "FPUT2", { &CNetScheduleHandler::x_ProcessPutFailure, 00283 eNSCR_Worker }, 00284 { { "job_key", eNSPT_Id, eNSPA_Required }, 00285 { "auth_token", eNSPT_Id, eNSPA_Required }, 00286 { "err_msg", eNSPT_Str, eNSPA_Required }, 00287 { "output", eNSPT_Str, eNSPA_Required }, 00288 { "job_return_code", eNSPT_Int, eNSPA_Required } } }, 00289 // JXCG [ job_key : id [ job_return_code : int [ output : str ] ] ] 00290 // [affinity_list : keystr(aff) ] 00291 { "JXCG", { &CNetScheduleHandler::x_ProcessJobExchange, 00292 eNSCR_Worker }, 00293 { { "job_key", eNSPT_Id, eNSPA_Optchain }, 00294 { "job_return_code", eNSPT_Int, eNSPA_Optchain }, 00295 { "output", eNSPT_Str, eNSPA_Optional }, 00296 { "aff", eNSPT_Str, eNSPA_Optional, "" } } }, 00297 // JDEX job_key : id timeout : uint 00298 { "JDEX", { &CNetScheduleHandler::x_ProcessJobDelayExpiration, 00299 eNSCR_Worker }, 00300 { { "job_key", eNSPT_Id, eNSPA_Required }, 00301 { "timeout", eNSPT_Int, eNSPA_Required } } }, 00302 // AFLS 00303 { "AFLS", { &CNetScheduleHandler::x_ProcessGetAffinityList, 00304 eNSCR_Worker } }, 00305 00306 // Obsolete commands 00307 { "REGC", { &CNetScheduleHandler::x_CmdObsolete, 00308 eNSCR_Worker }, 00309 { { "port", eNSPT_Int, eNSPA_Optional } } }, 00310 { "URGC", { &CNetScheduleHandler::x_CmdObsolete, 00311 eNSCR_Worker }, 00312 { { "port", eNSPT_Int, eNSPA_Optional } } }, 00313 { "INIT", { &CNetScheduleHandler::x_CmdObsolete, 00314 eNSCR_Worker } }, 00315 { "JRTO", { &CNetScheduleHandler::x_CmdNotImplemented, 00316 eNSCR_Worker } }, 00317 { "FRES", { &CNetScheduleHandler::x_CmdNotImplemented, 00318 eNSCR_Submitter } }, 00319 { "QERY", { &CNetScheduleHandler::x_CmdNotImplemented, 00320 eNSCR_Queue } }, 00321 { "QSEL", { &CNetScheduleHandler::x_CmdNotImplemented, 00322 eNSCR_Queue } }, 00323 { "MONI", { &CNetScheduleHandler::x_CmdNotImplemented, 00324 eNSCR_Queue } }, 00325 { "LOG", { &CNetScheduleHandler::x_CmdNotImplemented, 00326 eNSCR_Any } }, 00327 00328 { NULL }, 00329 }; 00330 00331 00332 static SNSProtoArgument s_AuthArgs[] = { 00333 { "client", eNSPT_Str, eNSPA_Optional, "Unknown client" }, 00334 { "params", eNSPT_Str, eNSPA_Ellipsis }, 00335 { NULL } 00336 }; 00337 00338 00339 00340 static void s_BufReadHelper(void* data, void* ptr, size_t size) 00341 { 00342 ((string*) data)->append((const char *) ptr, size); 00343 } 00344 00345 00346 static void s_ReadBufToString(BUF buf, string& str) 00347 { 00348 size_t size = BUF_Size(buf); 00349 00350 str.erase(); 00351 str.reserve(size); 00352 00353 BUF_PeekAtCB(buf, 0, s_BufReadHelper, &str, size); 00354 BUF_Read(buf, NULL, size); 00355 } 00356 00357 00358 CNetScheduleHandler::CNetScheduleHandler(CNetScheduleServer* server) 00359 : m_MsgBufferSize(kInitialMessageBufferSize), 00360 m_MsgBuffer(new char[kInitialMessageBufferSize]), 00361 m_Server(server), 00362 m_BatchSize(0), 00363 m_BatchPos(0), 00364 m_WithinBatchSubmit(false), 00365 m_SingleCmdParser(sm_CommandMap), 00366 m_BatchHeaderParser(sm_BatchHeaderMap), 00367 m_BatchEndParser(sm_BatchEndMap) 00368 {} 00369 00370 00371 CNetScheduleHandler::~CNetScheduleHandler() 00372 { 00373 delete [] m_MsgBuffer; 00374 } 00375 00376 00377 void CNetScheduleHandler::OnOpen(void) 00378 { 00379 CSocket & socket = GetSocket(); 00380 STimeout to = {m_Server->GetInactivityTimeout(), 0}; 00381 00382 socket.DisableOSSendDelay(); 00383 socket.SetTimeout(eIO_ReadWrite, &to); 00384 x_SetQuickAcknowledge(); 00385 00386 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgAuth; 00387 00388 // Log the fact of opened connection 00389 m_ConnContext.Reset(new CRequestContext()); 00390 m_ConnContext->SetRequestID(); 00391 m_ConnContext->SetClientIP(socket.GetPeerAddress(eSAF_IP)); 00392 00393 CDiagnosticsGuard guard(this); 00394 if (m_Server->IsLog()) { 00395 GetDiagContext().PrintRequestStart() 00396 .Print("_type", "conn") 00397 .Flush(); 00398 } 00399 m_ConnContext->SetRequestStatus(eStatus_OK); 00400 } 00401 00402 00403 void CNetScheduleHandler::OnWrite() 00404 {} 00405 00406 00407 void CNetScheduleHandler::OnClose(IServer_ConnectionHandler::EClosePeer peer) 00408 { 00409 if (m_WithinBatchSubmit) { 00410 m_WithinBatchSubmit = false; 00411 m_Server->DecrementCurrentSubmitsCounter(); 00412 } 00413 00414 // It's possible that this method will be called before OnOpen - when 00415 // connection is just created and server is shutting down. In this case 00416 // OnOpen will never be called. 00417 if (m_ConnContext.IsNull()) 00418 return; 00419 00420 00421 CDiagnosticsGuard guard(this); 00422 00423 switch (peer) 00424 { 00425 case IServer_ConnectionHandler::eOurClose: 00426 if (m_DiagContext.NotNull()) { 00427 m_ConnContext->SetRequestStatus(m_DiagContext->GetRequestStatus()); 00428 } else { 00429 if (m_ConnContext->GetRequestStatus() != eStatus_HTTPProbe && 00430 m_ConnContext->GetRequestStatus() != eStatus_BadRequest) 00431 m_ConnContext->SetRequestStatus(eStatus_Inactive); 00432 } 00433 break; 00434 case IServer_ConnectionHandler::eClientClose: 00435 if (m_DiagContext.NotNull()) { 00436 m_DiagContext->SetRequestStatus(eStatus_SocketIOError); 00437 m_ConnContext->SetRequestStatus(eStatus_SocketIOError); 00438 } 00439 break; 00440 } 00441 00442 00443 if (m_Server->IsLog()) { 00444 CSocket& socket = GetSocket(); 00445 CDiagContext::SetRequestContext(m_ConnContext); 00446 m_ConnContext->SetBytesRd(socket.GetCount(eIO_Read)); 00447 m_ConnContext->SetBytesWr(socket.GetCount(eIO_Write)); 00448 GetDiagContext().PrintRequestStop(); 00449 } 00450 00451 m_ConnContext.Reset(); 00452 } 00453 00454 00455 void CNetScheduleHandler::OnTimeout() 00456 { 00457 CDiagnosticsGuard guard(this); 00458 00459 if (m_Server->IsLog()) 00460 m_ConnContext->SetRequestStatus(eStatus_Inactive); 00461 } 00462 00463 00464 void CNetScheduleHandler::OnOverflow(EOverflowReason reason) 00465 { 00466 switch (reason) { 00467 case eOR_ConnectionPoolFull: 00468 ERR_POST("eCommunicationError:Connection pool full"); 00469 break; 00470 case eOR_UnpollableSocket: 00471 ERR_POST("eCommunicationError:Unpollable connection"); 00472 break; 00473 case eOR_RequestQueueFull: 00474 ERR_POST("eCommunicationError:Request queue full"); 00475 break; 00476 default: 00477 ERR_POST("eCommunicationError:Unknown overflow error"); 00478 break; 00479 } 00480 } 00481 00482 00483 void CNetScheduleHandler::OnMessage(BUF buffer) 00484 { 00485 CDiagnosticsGuard guard(this); 00486 00487 if (m_Server->ShutdownRequested()) { 00488 x_WriteMessageNoThrow("ERR:eShuttingDown:NetSchedule server is shutting down. " 00489 "Session aborted."); 00490 return; 00491 } 00492 00493 try { 00494 // Single line user input processor 00495 (this->*m_ProcessMessage)(buffer); 00496 } 00497 catch (const CNetScheduleException & ex) { 00498 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00499 ":" + ex.GetMsg()); 00500 ERR_POST("NetSchedule exception: " << ex); 00501 x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode()); 00502 } 00503 catch (const CNSProtoParserException & ex) { 00504 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00505 ":" + ex.GetMsg()); 00506 ERR_POST("Command parser error: " << ex); 00507 x_PrintRequestStop(eStatus_BadRequest); 00508 } 00509 catch (const CNetServiceException & ex) { 00510 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00511 ":" + ex.GetMsg()); 00512 ERR_POST("NetService exception: " << ex); 00513 if (ex.GetErrCode() == CNetServiceException::eCommunicationError) 00514 x_PrintRequestStop(eStatus_SocketIOError); 00515 else 00516 x_PrintRequestStop(eStatus_ServerError); 00517 } 00518 catch (const CBDB_ErrnoException & ex) { 00519 string error; 00520 if (ex.IsRecovery()) 00521 error = NStr::PrintableString("Fatal Berkeley DB error " 00522 "(DB_RUNRECOVERY). Emergency " 00523 "shutdown initiated. " + 00524 string(ex.what())); 00525 else 00526 error = NStr::PrintableString("Internal database error - " + 00527 string(ex.what())); 00528 00529 x_WriteMessageNoThrow("ERR:", "eInternalError:" + error); 00530 ERR_POST("BDB error: " << ex); 00531 x_PrintRequestStop(eStatus_ServerError); 00532 00533 if (ex.IsRecovery()) 00534 m_Server->SetShutdownFlag(); 00535 } 00536 catch (const CBDB_Exception & ex) { 00537 string error = NStr::PrintableString("eInternalError:Internal " 00538 "database (BDB) error - " + 00539 string(ex.what())); 00540 x_WriteMessageNoThrow("ERR:", error); 00541 ERR_POST("BDB error: " << ex); 00542 x_PrintRequestStop(eStatus_ServerError); 00543 } 00544 catch (const exception & ex) { 00545 string error = NStr::PrintableString("eInternalError:Internal " 00546 "error - " + string(ex.what())); 00547 x_WriteMessageNoThrow("ERR:", error); 00548 ERR_POST("STL exception: " << ex.what()); 00549 x_PrintRequestStop(eStatus_ServerError); 00550 } 00551 catch (...) { 00552 x_WriteMessageNoThrow("ERR:eInternalError:Unknown server exception."); 00553 ERR_POST("ERR:Unknown server exception."); 00554 x_PrintRequestStop(eStatus_ServerError); 00555 } 00556 } 00557 00558 00559 void CNetScheduleHandler::x_SetQuickAcknowledge(void) 00560 { 00561 int fd = 0; 00562 int val = 1; 00563 00564 GetSocket().GetOSHandle(&fd, sizeof(fd)); 00565 setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, sizeof(val)); 00566 } 00567 00568 00569 void CNetScheduleHandler::WriteMessage(CTempString prefix, 00570 CTempString msg) 00571 { 00572 size_t prefix_size = prefix.size(); 00573 size_t msg_size = msg.size(); 00574 if (msg[msg_size-1] == '\n') 00575 --msg_size; 00576 size_t required_size = prefix_size + msg_size + 1; 00577 00578 if (required_size > m_MsgBufferSize) { 00579 delete [] m_MsgBuffer; 00580 while (required_size > m_MsgBufferSize) 00581 m_MsgBufferSize += kMessageBufferIncrement; 00582 m_MsgBuffer = new char[m_MsgBufferSize]; 00583 } 00584 00585 memcpy(m_MsgBuffer, prefix.data(), prefix_size); 00586 memcpy(m_MsgBuffer + prefix_size, msg.data(), msg_size); 00587 m_MsgBuffer[required_size-1] = '\n'; 00588 00589 // Write to the socket as a single transaction 00590 size_t n_written; 00591 if (GetSocket().Write(m_MsgBuffer, required_size, 00592 &n_written) != eIO_Success) { 00593 NCBI_THROW(CNetServiceException, 00594 eCommunicationError, "Socket write error; peer: " + 00595 GetSocket().GetPeerAddress()); 00596 } 00597 } 00598 00599 00600 void CNetScheduleHandler::WriteMessage(CTempString msg) 00601 { 00602 size_t msg_size = msg.size(); 00603 if (msg[msg_size-1] == '\n') 00604 --msg_size; 00605 size_t required_size = msg_size + 1; 00606 00607 if (required_size > m_MsgBufferSize) { 00608 delete [] m_MsgBuffer; 00609 while (required_size > m_MsgBufferSize) 00610 m_MsgBufferSize += kMessageBufferIncrement; 00611 m_MsgBuffer = new char[m_MsgBufferSize]; 00612 } 00613 00614 memcpy(m_MsgBuffer, msg.data(), msg_size); 00615 m_MsgBuffer[required_size-1] = '\n'; 00616 00617 // Write to the socket as a single transaction 00618 size_t n_written; 00619 if (GetSocket().Write(m_MsgBuffer, required_size, 00620 &n_written) != eIO_Success) { 00621 NCBI_THROW(CNetServiceException, 00622 eCommunicationError, "Socket write error; peer: " + 00623 GetSocket().GetPeerAddress()); 00624 } 00625 } 00626 00627 00628 unsigned int CNetScheduleHandler::x_WriteMessageNoThrow(CTempString prefix, 00629 CTempString msg) 00630 { 00631 try { 00632 WriteMessage(prefix, msg); 00633 } 00634 catch (CNetServiceException& ex) { 00635 ERR_POST(ex); 00636 return eStatus_SocketIOError; 00637 } 00638 catch (...) { 00639 ERR_POST("Unknown exception while writing into a socket."); 00640 return eStatus_ServerError; 00641 } 00642 return eStatus_OK; 00643 } 00644 00645 unsigned int CNetScheduleHandler::x_WriteMessageNoThrow(CTempString msg) 00646 { 00647 try { 00648 WriteMessage(msg); 00649 } 00650 catch (CNetServiceException& ex) { 00651 ERR_POST(ex); 00652 return eStatus_SocketIOError; 00653 } 00654 catch (...) { 00655 ERR_POST("Unknown exception while writing into a socket."); 00656 return eStatus_ServerError; 00657 } 00658 return eStatus_OK; 00659 } 00660 00661 00662 void CNetScheduleHandler::InitDiagnostics(void) 00663 { 00664 if (m_DiagContext.NotNull()) { 00665 CDiagContext::SetRequestContext(m_DiagContext); 00666 } 00667 else if (m_ConnContext.NotNull()) { 00668 CDiagContext::SetRequestContext(m_ConnContext); 00669 } 00670 } 00671 00672 00673 void CNetScheduleHandler::ResetDiagnostics(void) 00674 { 00675 CDiagContext::SetRequestContext(NULL); 00676 } 00677 00678 00679 unsigned int CNetScheduleHandler::x_GetPeerAddress(void) 00680 { 00681 unsigned int peer_addr; 00682 00683 GetSocket().GetPeerAddress(&peer_addr, 0, eNH_NetworkByteOrder); 00684 00685 // always use localhost(127.0*) address for clients coming from 00686 // the same net address (sometimes it can be 127.* or full address) 00687 if (peer_addr == m_Server->GetHostNetAddr()) 00688 return CSocketAPI::GetLoopbackAddress(); 00689 return peer_addr; 00690 } 00691 00692 00693 void CNetScheduleHandler::x_ProcessMsgAuth(BUF buffer) 00694 { 00695 // This should only memorize the received string. 00696 // The x_ProcessMsgQueue(...) will parse it. 00697 // This is done to avoid copying parsed parameters and to have exactly one 00698 // diagnostics extra with both auth parameters and queue name. 00699 s_ReadBufToString(buffer, m_RawAuthString); 00700 00701 // Check if it was systems probe... 00702 if (strncmp(m_RawAuthString.c_str(), "GET / HTTP/1.", 13) == 0) { 00703 // That was systems probing ports 00704 00705 if (m_ConnContext.NotNull()) 00706 m_ConnContext->SetRequestStatus(eStatus_HTTPProbe); 00707 00708 m_Server->CloseConnection(&GetSocket()); 00709 return; 00710 } 00711 00712 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgQueue; 00713 x_SetQuickAcknowledge(); 00714 } 00715 00716 00717 void CNetScheduleHandler::x_ProcessMsgQueue(BUF buffer) 00718 { 00719 s_ReadBufToString(buffer, m_QueueName); 00720 00721 // Parse saved raw authorization string and produce log output 00722 TNSProtoParams params; 00723 bool parse_failed = false; 00724 00725 try { 00726 m_SingleCmdParser.ParseArguments(m_RawAuthString, s_AuthArgs, ¶ms); 00727 } 00728 catch (CNSProtoParserException & ex) { 00729 ERR_POST("Error authenticating client: '" << 00730 m_RawAuthString << "': " << ex); 00731 parse_failed = true; 00732 } 00733 00734 // Memorize what we know about the client 00735 try { 00736 m_ClientId.Update(this->x_GetPeerAddress(), params); 00737 if (parse_failed) 00738 m_ClientId.SetClientName(m_RawAuthString); 00739 } 00740 catch (const CNetScheduleException & ex) { 00741 if (ex.GetErrCode() == CNetScheduleException::eAuthenticationError) { 00742 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00743 ":" + ex.GetMsg()); 00744 ERR_POST("Server error: " << ex); 00745 if (m_Server->IsLog()) { 00746 CDiagContext::GetRequestContext().SetRequestStatus( 00747 ex.ErrCodeToHTTPStatusCode()); 00748 GetDiagContext().PrintRequestStop(); 00749 m_ConnContext.Reset(); 00750 } 00751 m_Server->CloseConnection(&GetSocket()); 00752 return; 00753 } 00754 throw; 00755 } 00756 00757 00758 if (m_Server->AdminHostValid(m_ClientId.GetAddress()) && 00759 m_Server->IsAdminClientName(m_ClientId.GetClientName())) 00760 { 00761 // TODO: queue admin should be checked in ProcessMsgQueue, 00762 // when queue info is available 00763 m_ClientId.AddCapability(eNSAC_Admin | eNSAC_QueueAdmin); 00764 } 00765 00766 // Produce the log output if required 00767 if (m_Server->IsLog()) { 00768 CDiagContext_Extra diag_extra = GetDiagContext().Extra(); 00769 diag_extra.Print("queue", m_QueueName); 00770 ITERATE(TNSProtoParams, it, params) { 00771 diag_extra.Print(it->first, it->second); 00772 } 00773 } 00774 00775 // Empty queue name is a synonim for hardcoded 'noname'. 00776 // To have exactly one string comparison, make the name empty if 'noname' 00777 if (m_QueueName == "noname" ) 00778 m_QueueName = ""; 00779 00780 if (!m_QueueName.empty()) { 00781 try { 00782 m_QueueRef.Reset(m_Server->OpenQueue(m_QueueName)); 00783 } 00784 catch (const CNetScheduleException & ex) { 00785 if (ex.GetErrCode() == CNetScheduleException::eUnknownQueue) { 00786 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00787 ":" + ex.GetMsg()); 00788 ERR_POST("Server error: " << ex); 00789 if (m_Server->IsLog()) { 00790 CDiagContext::GetRequestContext().SetRequestStatus(ex.ErrCodeToHTTPStatusCode()); 00791 GetDiagContext().PrintRequestStop(); 00792 m_ConnContext.Reset(); 00793 } 00794 m_Server->CloseConnection(&GetSocket()); 00795 return; 00796 } 00797 throw; 00798 } 00799 00800 m_ClientId.AddCapability(eNSAC_Queue); 00801 00802 CRef<CQueue> q(GetQueue()); 00803 if (q->IsWorkerAllowed(m_ClientId.GetAddress())) 00804 m_ClientId.AddCapability(eNSAC_Worker); 00805 if (q->IsSubmitAllowed(m_ClientId.GetAddress())) 00806 m_ClientId.AddCapability(eNSAC_Submitter); 00807 } 00808 else 00809 m_QueueRef.Reset(NULL); 00810 00811 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00812 x_SetQuickAcknowledge(); 00813 } 00814 00815 00816 // Workhorse method 00817 void CNetScheduleHandler::x_ProcessMsgRequest(BUF buffer) 00818 { 00819 m_DiagContext.Reset(new CRequestContext()); 00820 m_DiagContext->SetRequestID(); 00821 InitDiagnostics(); 00822 00823 SParsedCmd cmd; 00824 string msg; 00825 try { 00826 s_ReadBufToString(buffer, msg); 00827 cmd = m_SingleCmdParser.ParseCommand(msg); 00828 x_PrintRequestStart(cmd); 00829 } 00830 catch (const CNSProtoParserException & ex) { 00831 // Parsing is done before PrintRequestStart(...) so a diag context is 00832 // not created here - create it just to provide an error output 00833 x_OnCmdParserError(true, ex.GetMsg(), ""); 00834 return; 00835 } 00836 00837 const SCommandExtra & extra = cmd.command->extra; 00838 00839 00840 // It throws an exception if the input is not valid 00841 m_CommandArguments.AssignValues(cmd.params, cmd.command->cmd); 00842 00843 00844 if (extra.processor == &CNetScheduleHandler::x_ProcessQuitSession) { 00845 m_ClientId.CheckAccess(extra.role, NULL); 00846 x_ProcessQuitSession(0); 00847 return; 00848 } 00849 00850 00851 // If the command requires queue, hold a hard reference to this 00852 // queue from a weak one (m_Queue) in queue_ref, and take C pointer 00853 // into queue_ptr. Otherwise queue_ptr is NULL, which is OK for 00854 // commands which does not require a queue. 00855 CRef<CQueue> queue_ref; 00856 CQueue * queue_ptr = NULL; 00857 if (extra.role & eNSAC_Queue) { 00858 if (m_QueueName.empty()) 00859 NCBI_THROW(CNetScheduleException, eUnknownQueue, "Job queue is required"); 00860 queue_ref.Reset(GetQueue()); 00861 queue_ptr = queue_ref.GetPointer(); 00862 } 00863 else if (extra.processor == &CNetScheduleHandler::x_ProcessStatistics || 00864 extra.processor == &CNetScheduleHandler::x_ProcessRefuseSubmits) { 00865 // The STAT and REFUSESUBMITS commands could be with or without a queue 00866 try { 00867 queue_ref.Reset(GetQueue()); 00868 queue_ptr = queue_ref.GetPointer(); 00869 } 00870 catch (...) { 00871 // That means no queue were found, i.e. 00872 // the STAT or REFUSESUBMITS is for whole server 00873 } 00874 } 00875 00876 m_ClientId.CheckAccess(extra.role, queue_ptr); 00877 00878 // we want status request to be fast, skip version control 00879 if ((extra.processor != &CNetScheduleHandler::x_ProcessStatus) && 00880 (extra.processor != &CNetScheduleHandler::x_ProcessFastStatusS) && 00881 (extra.processor != &CNetScheduleHandler::x_ProcessFastStatusW)) 00882 if (!m_ClientId.CheckVersion(queue_ptr)) { 00883 WriteMessage("ERR:eInvalidClientOrVersion:"); 00884 m_Server->CloseConnection(&GetSocket()); 00885 return; 00886 } 00887 00888 if (queue_ptr) 00889 // The cient has a queue, so memorize the client 00890 queue_ptr->TouchClientsRegistry(m_ClientId); 00891 00892 // Execute the command 00893 (this->*extra.processor)(queue_ptr); 00894 } 00895 00896 00897 // Message processors for x_ProcessSubmitBatch 00898 void CNetScheduleHandler::x_ProcessMsgBatchHeader(BUF buffer) 00899 { 00900 // Expecting BTCH size | ENDS 00901 try { 00902 string msg; 00903 s_ReadBufToString(buffer, msg); 00904 00905 SParsedCmd cmd = m_BatchHeaderParser.ParseCommand(msg); 00906 const string & size_str = cmd.params["size"]; 00907 00908 if (!size_str.empty()) 00909 m_BatchSize = NStr::StringToInt(size_str); 00910 else 00911 m_BatchSize = 0; 00912 (this->*cmd.command->extra.processor)(0); 00913 } 00914 catch (const CNSProtoParserException & ex) { 00915 m_WithinBatchSubmit = false; 00916 m_Server->DecrementCurrentSubmitsCounter(); 00917 x_OnCmdParserError(false, ex.GetMsg(), ", BTCH or ENDS expected"); 00918 return; 00919 } 00920 catch (const CNetScheduleException & ex) { 00921 m_WithinBatchSubmit = false; 00922 m_Server->DecrementCurrentSubmitsCounter(); 00923 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00924 ":" + ex.GetMsg()); 00925 ERR_POST("Server error: " << ex); 00926 x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode()); 00927 00928 m_BatchJobs.clear(); 00929 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00930 } 00931 catch (const CException & ex) { 00932 m_WithinBatchSubmit = false; 00933 m_Server->DecrementCurrentSubmitsCounter(); 00934 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" 00935 "Error processing BTCH or ENDS."); 00936 ERR_POST("Error processing command: " << ex); 00937 x_PrintRequestStop(eStatus_BadRequest); 00938 00939 m_BatchJobs.clear(); 00940 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00941 } 00942 catch (...) { 00943 m_WithinBatchSubmit = false; 00944 m_Server->DecrementCurrentSubmitsCounter(); 00945 x_WriteMessageNoThrow("ERR:", "eInternalError:Unknown error " 00946 "while expecting BTCH or ENDS."); 00947 ERR_POST("Unknown error while expecting BTCH or ENDS"); 00948 x_PrintRequestStop(eStatus_BadRequest); 00949 00950 m_BatchJobs.clear(); 00951 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00952 } 00953 } 00954 00955 00956 void CNetScheduleHandler::x_ProcessMsgBatchJob(BUF buffer) 00957 { 00958 // Expecting: 00959 // "input" [aff="affinity_token"] [msk=1] 00960 string msg; 00961 s_ReadBufToString(buffer, msg); 00962 00963 CJob & job = m_BatchJobs[m_BatchPos].first; 00964 TNSProtoParams params; 00965 try { 00966 m_BatchEndParser.ParseArguments(msg, s_BatchArgs, ¶ms); 00967 m_CommandArguments.AssignValues(params); 00968 } 00969 catch (const CNSProtoParserException & ex) { 00970 m_WithinBatchSubmit = false; 00971 m_Server->DecrementCurrentSubmitsCounter(); 00972 x_OnCmdParserError(false, ex.GetMsg(), ""); 00973 return; 00974 } 00975 catch (const CNetScheduleException & ex) { 00976 m_WithinBatchSubmit = false; 00977 m_Server->DecrementCurrentSubmitsCounter(); 00978 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) + 00979 ":" + ex.GetMsg()); 00980 ERR_POST(ex.GetMsg()); 00981 x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode()); 00982 00983 m_BatchJobs.clear(); 00984 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00985 return; 00986 } 00987 catch (const CException & ex) { 00988 m_WithinBatchSubmit = false; 00989 m_Server->DecrementCurrentSubmitsCounter(); 00990 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" 00991 "Invalid batch submission, syntax error"); 00992 ERR_POST("Error processing command: " << ex); 00993 x_PrintRequestStop(eStatus_BadRequest); 00994 00995 m_BatchJobs.clear(); 00996 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 00997 return; 00998 } 00999 catch (...) { 01000 m_WithinBatchSubmit = false; 01001 m_Server->DecrementCurrentSubmitsCounter(); 01002 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" 01003 "Arguments parsing unknown exception"); 01004 ERR_POST("Arguments parsing unknown exception. Batch submit is rejected."); 01005 x_PrintRequestStop(eStatus_BadRequest); 01006 01007 m_BatchJobs.clear(); 01008 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01009 return; 01010 } 01011 01012 job.SetInput(NStr::ParseEscapes(m_CommandArguments.input)); 01013 01014 // Memorize the job affinity if given 01015 if ( !m_CommandArguments.affinity_token.empty() ) 01016 m_BatchJobs[m_BatchPos].second = 01017 NStr::ParseEscapes(m_CommandArguments.affinity_token); 01018 01019 job.SetMask(m_CommandArguments.job_mask); 01020 job.SetSubmNotifPort(m_BatchSubmPort); 01021 job.SetSubmNotifTimeout(m_BatchSubmTimeout); 01022 job.SetClientIP(m_BatchClientIP); 01023 job.SetClientSID(m_BatchClientSID); 01024 01025 if (++m_BatchPos >= m_BatchSize) 01026 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchSubmit; 01027 } 01028 01029 01030 void CNetScheduleHandler::x_ProcessMsgBatchSubmit(BUF buffer) 01031 { 01032 // Expecting ENDB 01033 try { 01034 string msg; 01035 s_ReadBufToString(buffer, msg); 01036 m_BatchEndParser.ParseCommand(msg); 01037 } 01038 catch (const CNSProtoParserException & ex) { 01039 m_WithinBatchSubmit = false; 01040 m_Server->DecrementCurrentSubmitsCounter(); 01041 x_OnCmdParserError(false, ex.GetMsg(), ", ENDB expected"); 01042 return; 01043 } 01044 catch (const CException & ex) { 01045 m_WithinBatchSubmit = false; 01046 m_Server->DecrementCurrentSubmitsCounter(); 01047 BUF_Read(buffer, 0, BUF_Size(buffer)); 01048 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" 01049 "Batch submit error - unexpected end of batch"); 01050 ERR_POST("Error processing command: " << ex); 01051 x_PrintRequestStop(eStatus_BadRequest); 01052 01053 m_BatchJobs.clear(); 01054 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01055 return; 01056 } 01057 catch (...) { 01058 m_WithinBatchSubmit = false; 01059 m_Server->DecrementCurrentSubmitsCounter(); 01060 x_WriteMessageNoThrow("ERR:", "eInternalError:" 01061 "Unknown error while expecting ENDB."); 01062 ERR_POST("Unknown error while expecting ENDB."); 01063 x_PrintRequestStop(eStatus_BadRequest); 01064 01065 m_BatchJobs.clear(); 01066 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01067 return; 01068 } 01069 01070 double comm_elapsed = m_BatchStopWatch.Elapsed(); 01071 01072 // BTCH logging is in a separate context 01073 CRef<CRequestContext> current_context(& CDiagContext::GetRequestContext()); 01074 try { 01075 if (m_Server->IsLog()) { 01076 CRequestContext * ctx(new CRequestContext()); 01077 ctx->SetRequestID(); 01078 GetDiagContext().SetRequestContext(ctx); 01079 GetDiagContext().PrintRequestStart() 01080 .Print("_type", "cmd") 01081 .Print("bsub", m_DiagContext->GetRequestID()) 01082 .Print("cmd", "BTCH") 01083 .Print("size", m_BatchJobs.size()); 01084 ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK); 01085 } 01086 01087 // we have our batch now 01088 CStopWatch sw(CStopWatch::eStart); 01089 unsigned job_id = GetQueue()->SubmitBatch(m_ClientId, 01090 m_BatchJobs, 01091 m_BatchGroup); 01092 double db_elapsed = sw.Elapsed(); 01093 01094 if (m_Server->IsLog()) 01095 GetDiagContext().Extra() 01096 .Print("start_job", job_id) 01097 .Print("commit_time", 01098 NStr::DoubleToString(comm_elapsed, 4, NStr::fDoubleFixed)) 01099 .Print("transaction_time", 01100 NStr::DoubleToString(db_elapsed, 4, NStr::fDoubleFixed)); 01101 01102 WriteMessage("OK:", NStr::UIntToString(job_id) + " " + 01103 m_Server->GetHost().c_str() + " " + 01104 NStr::UIntToString(unsigned(m_Server->GetPort()))); 01105 01106 } 01107 catch (const CNetScheduleException & ex) { 01108 m_WithinBatchSubmit = false; 01109 m_Server->DecrementCurrentSubmitsCounter(); 01110 if (m_Server->IsLog()) { 01111 CDiagContext::GetRequestContext().SetRequestStatus(ex.ErrCodeToHTTPStatusCode()); 01112 GetDiagContext().PrintRequestStop(); 01113 GetDiagContext().SetRequestContext(current_context); 01114 } 01115 m_BatchJobs.clear(); 01116 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01117 throw; 01118 } 01119 catch (...) { 01120 m_WithinBatchSubmit = false; 01121 m_Server->DecrementCurrentSubmitsCounter(); 01122 if (m_Server->IsLog()) { 01123 CDiagContext::GetRequestContext().SetRequestStatus(eStatus_ServerError); 01124 GetDiagContext().PrintRequestStop(); 01125 GetDiagContext().SetRequestContext(current_context); 01126 } 01127 m_BatchJobs.clear(); 01128 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01129 throw; 01130 } 01131 01132 if (m_Server->IsLog()) { 01133 GetDiagContext().PrintRequestStop(); 01134 GetDiagContext().SetRequestContext(current_context); 01135 } 01136 01137 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchHeader; 01138 } 01139 01140 01141 ////////////////////////////////////////////////////////////////////////// 01142 // Process* methods for processing commands 01143 01144 void CNetScheduleHandler::x_ProcessFastStatusS(CQueue* q) 01145 { 01146 bool cmdv2(m_CommandArguments.cmd == "SST2"); 01147 time_t lifetime; 01148 TJobStatus status = q->GetStatusAndLifetime(m_CommandArguments.job_id, 01149 true, &lifetime); 01150 01151 01152 if (status == CNetScheduleAPI::eJobNotFound) { 01153 if (cmdv2) 01154 WriteMessage("ERR:eJobNotFound:"); 01155 else 01156 WriteMessage("OK:", NStr::IntToString((int) status)); 01157 01158 ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: " 01159 << m_CommandArguments.job_key); 01160 x_PrintRequestStop(eStatus_NotFound); 01161 } else { 01162 if (cmdv2) 01163 WriteMessage( 01164 "OK:job_status=" + 01165 CNetScheduleAPI::StatusToString(status) + 01166 "&job_exptime=" + NStr::NumericToString(lifetime) 01167 ); 01168 else 01169 WriteMessage("OK:", NStr::IntToString((int) status)); 01170 x_PrintRequestStop(eStatus_OK); 01171 } 01172 } 01173 01174 01175 void CNetScheduleHandler::x_ProcessFastStatusW(CQueue* q) 01176 { 01177 bool cmdv2(m_CommandArguments.cmd == "WST2"); 01178 time_t lifetime; 01179 TJobStatus status = q->GetStatusAndLifetime(m_CommandArguments.job_id, 01180 false, &lifetime); 01181 01182 01183 if (status == CNetScheduleAPI::eJobNotFound) { 01184 if (cmdv2) 01185 WriteMessage("ERR:eJobNotFound:"); 01186 else 01187 WriteMessage("OK:", NStr::IntToString((int) status)); 01188 01189 ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: " 01190 << m_CommandArguments.job_key); 01191 x_PrintRequestStop(eStatus_NotFound); 01192 } else { 01193 if (cmdv2) 01194 WriteMessage( 01195 "OK:job_status=" + 01196 CNetScheduleAPI::StatusToString(status) + 01197 "&job_exptime=" + NStr::NumericToString(lifetime) 01198 ); 01199 else 01200 WriteMessage("OK:", NStr::IntToString((int) status)); 01201 x_PrintRequestStop(eStatus_OK); 01202 } 01203 } 01204 01205 01206 void CNetScheduleHandler::x_ProcessChangeAffinity(CQueue* q) 01207 { 01208 // This functionality requires client name and the session 01209 x_CheckNonAnonymousClient("use CHAFF command"); 01210 01211 if (m_CommandArguments.aff_to_add.empty() && 01212 m_CommandArguments.aff_to_del.empty()) { 01213 ERR_POST(Warning << "CHAFF with neither add list nor del list"); 01214 x_WriteMessageNoThrow("ERR:eInvalidParameter:"); 01215 x_PrintRequestStop(eStatus_BadRequest); 01216 return; 01217 } 01218 01219 list<string> aff_to_add_list; 01220 list<string> aff_to_del_list; 01221 01222 NStr::Split(NStr::ParseEscapes(m_CommandArguments.aff_to_add), 01223 "\t,", aff_to_add_list, NStr::eNoMergeDelims); 01224 NStr::Split(NStr::ParseEscapes(m_CommandArguments.aff_to_del), 01225 "\t,", aff_to_del_list, NStr::eNoMergeDelims); 01226 01227 string msg = q->ChangeAffinity(m_ClientId, aff_to_add_list, 01228 aff_to_del_list); 01229 if (msg.empty()) 01230 WriteMessage("OK:"); 01231 else 01232 WriteMessage("OK:WARNING:", msg + ";"); 01233 x_PrintRequestStop(eStatus_OK); 01234 } 01235 01236 01237 void CNetScheduleHandler::x_ProcessSubmit(CQueue* q) 01238 { 01239 if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) { 01240 WriteMessage("ERR:eSubmitsDisabled:"); 01241 x_PrintRequestStop(eStatus_SubmitRefused); 01242 return; 01243 } 01244 01245 if (m_Server->IncrementCurrentSubmitsCounter() < kSubmitCounterInitialValue) { 01246 // This is a drained shutdown mode 01247 m_Server->DecrementCurrentSubmitsCounter(); 01248 WriteMessage("ERR:eSubmitsDisabled:"); 01249 x_PrintRequestStop(eStatus_SubmitRefused); 01250 return; 01251 } 01252 01253 01254 CJob job(m_CommandArguments); 01255 01256 // Never leave Client IP empty, if we're not provided with a real one, 01257 // use peer address as a last resort. See also x_ProcessSubmitBatch. 01258 if (m_CommandArguments.ip.empty()) 01259 job.SetClientIP(NS_FormatIPAddress(m_ClientId.GetAddress())); 01260 01261 try { 01262 WriteMessage("OK:", q->MakeKey(q->Submit(m_ClientId, job, 01263 m_CommandArguments.affinity_token, 01264 m_CommandArguments.group))); 01265 m_Server->DecrementCurrentSubmitsCounter(); 01266 } catch (...) { 01267 m_Server->DecrementCurrentSubmitsCounter(); 01268 throw; 01269 } 01270 01271 // There is no need to log the job key, it is logged at lower level 01272 // together with all the submitted job parameters 01273 x_PrintRequestStop(eStatus_OK); 01274 } 01275 01276 01277 void CNetScheduleHandler::x_ProcessSubmitBatch(CQueue* q) 01278 { 01279 if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) { 01280 WriteMessage("ERR:eSubmitsDisabled:"); 01281 x_PrintRequestStop(eStatus_SubmitRefused); 01282 return; 01283 } 01284 01285 if (m_Server->IncrementCurrentSubmitsCounter() < kSubmitCounterInitialValue) { 01286 // This is a drained shutdown mode 01287 m_Server->DecrementCurrentSubmitsCounter(); 01288 WriteMessage("ERR:eSubmitsDisabled:"); 01289 x_PrintRequestStop(eStatus_SubmitRefused); 01290 return; 01291 } 01292 01293 try { 01294 // Memorize the fact that batch submit started 01295 m_WithinBatchSubmit = true; 01296 01297 m_BatchSubmPort = m_CommandArguments.port; 01298 m_BatchSubmTimeout = m_CommandArguments.timeout; 01299 if (!m_CommandArguments.ip.empty()) 01300 m_BatchClientIP = m_CommandArguments.ip; 01301 else 01302 m_BatchClientIP = NS_FormatIPAddress(m_ClientId.GetAddress()); 01303 m_BatchClientSID = m_CommandArguments.sid; 01304 m_BatchGroup = m_CommandArguments.group; 01305 01306 WriteMessage("OK:Batch submit ready"); 01307 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchHeader; 01308 } 01309 catch (...) { 01310 // WriteMessage can generate an exception 01311 m_WithinBatchSubmit = false; 01312 m_Server->DecrementCurrentSubmitsCounter(); 01313 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01314 throw; 01315 } 01316 } 01317 01318 01319 void CNetScheduleHandler::x_ProcessBatchStart(CQueue*) 01320 { 01321 m_BatchPos = 0; 01322 m_BatchStopWatch.Restart(); 01323 m_BatchJobs.resize(m_BatchSize); 01324 if (m_BatchSize) 01325 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchJob; 01326 else 01327 // Unfortunately, because batches can be generated by 01328 // client program, we better honor zero size ones. 01329 // Skip right to waiting for 'ENDB'. 01330 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchSubmit; 01331 } 01332 01333 01334 void CNetScheduleHandler::x_ProcessBatchSequenceEnd(CQueue*) 01335 { 01336 m_WithinBatchSubmit = false; 01337 m_Server->DecrementCurrentSubmitsCounter(); 01338 m_BatchJobs.clear(); 01339 m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest; 01340 WriteMessage("OK:"); 01341 x_PrintRequestStop(eStatus_OK); 01342 } 01343 01344 01345 void CNetScheduleHandler::x_ProcessCancel(CQueue* q) 01346 { 01347 // Job key or a group must be given 01348 if (m_CommandArguments.job_id == 0 && m_CommandArguments.group.empty()) { 01349 x_WriteMessageNoThrow("ERR:eInvalidParameter:" 01350 "Job key or group must be given"); 01351 LOG_POST(Message << Warning << "CANCEL must have a job key or a group"); 01352 x_PrintRequestStop(eStatus_BadRequest); 01353 return; 01354 } 01355 01356 if (m_CommandArguments.job_id != 0 && !m_CommandArguments.group.empty()) { 01357 x_WriteMessageNoThrow("ERR:eInvalidParameter:" 01358 "CANCEL can accept a job key or " 01359 "a group but not both"); 01360 LOG_POST(Message << Warning << "CANCEL must have only one " 01361 "argument - a job key or a group"); 01362 x_PrintRequestStop(eStatus_BadRequest); 01363 return; 01364 } 01365 01366 // Here: one argument is given - a job key or a group 01367 if (!m_CommandArguments.group.empty()) { 01368 // CANCEL for a group 01369 q->CancelGroup(m_ClientId, m_CommandArguments.group); 01370 WriteMessage("OK:"); 01371 x_PrintRequestStop(eStatus_OK); 01372 return; 01373 } 01374 01375 // Here: CANCEL for a job 01376 switch (q->Cancel(m_ClientId, m_CommandArguments.job_id)) { 01377 case CNetScheduleAPI::eJobNotFound: 01378 x_WriteMessageNoThrow("OK:WARNING:Job not found;"); 01379 ERR_POST(Warning << "CANCEL for unknown job: " 01380 << m_CommandArguments.job_key); 01381 x_PrintRequestStop(eStatus_NotFound); 01382 break; 01383 case CNetScheduleAPI::eCanceled: 01384 WriteMessage("OK:WARNING:Already canceled;"); 01385 x_PrintRequestStop(eStatus_OK); 01386 break; 01387 default: 01388 WriteMessage("OK:"); 01389 x_PrintRequestStop(eStatus_OK); 01390 } 01391 } 01392 01393 01394 void CNetScheduleHandler::x_ProcessStatus(CQueue* q) 01395 { 01396 CJob job; 01397 bool cmdv2 = (m_CommandArguments.cmd == "STATUS2"); 01398 time_t lifetime; 01399 01400 if (q->ReadAndTouchJob(m_CommandArguments.job_id, job, &lifetime) == 01401 CNetScheduleAPI::eJobNotFound) { 01402 // Here: there is no such a job 01403 if (cmdv2) 01404 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01405 else 01406 x_WriteMessageNoThrow("OK:", 01407 NStr::IntToString((int) CNetScheduleAPI::eJobNotFound)); 01408 ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: " 01409 << m_CommandArguments.job_key); 01410 x_PrintRequestStop(eStatus_NotFound); 01411 return; 01412 } 01413 01414 // Here: the job was found 01415 if (cmdv2) 01416 WriteMessage("OK:", 01417 "job_status=" + CNetScheduleAPI::StatusToString(job.GetStatus()) + 01418 "&job_exptime=" + NStr::NumericToString(lifetime) + 01419 "&ret_code=" + NStr::IntToString(job.GetRetCode()) + 01420 "&output=" + NStr::URLEncode(job.GetOutput()) + 01421 "&err_msg=" + NStr::URLEncode(job.GetErrorMsg()) + 01422 "&input=" + NStr::URLEncode(job.GetInput()) 01423 ); 01424 01425 else 01426 WriteMessage("OK:", NStr::IntToString((int) job.GetStatus()) + 01427 " " + NStr::IntToString(job.GetRetCode()) + 01428 " \"" + NStr::PrintableString(job.GetOutput()) + 01429 "\" \"" + NStr::PrintableString(job.GetErrorMsg()) + 01430 "\" \"" + NStr::PrintableString(job.GetInput()) + 01431 "\""); 01432 x_PrintRequestStop(eStatus_OK); 01433 } 01434 01435 01436 void CNetScheduleHandler::x_ProcessGetJob(CQueue* q) 01437 { 01438 // GET & WGET are first versions of the command 01439 bool cmdv2(m_CommandArguments.cmd == "GET2"); 01440 01441 if (cmdv2) { 01442 x_CheckNonAnonymousClient("use GET2 command"); 01443 x_CheckPortAndTimeout(); 01444 x_CheckGetParameters(); 01445 } 01446 else { 01447 // The affinity options are only for the second version of the command 01448 m_CommandArguments.wnode_affinity = false; 01449 m_CommandArguments.exclusive_new_aff = false; 01450 01451 // The old clients must have any_affinity set to true 01452 // depending on the explicit affinity - to conform the old behavior 01453 m_CommandArguments.any_affinity = m_CommandArguments.affinity_token.empty(); 01454 } 01455 01456 list<string> aff_list; 01457 NStr::Split(NStr::ParseEscapes(m_CommandArguments.affinity_token), 01458 "\t,", aff_list, NStr::eNoMergeDelims); 01459 01460 CJob job; 01461 if (q->GetJobOrWait(m_ClientId, 01462 m_CommandArguments.port, 01463 m_CommandArguments.timeout, 01464 time(0), &aff_list, 01465 m_CommandArguments.wnode_affinity, 01466 m_CommandArguments.any_affinity, 01467 m_CommandArguments.exclusive_new_aff, 01468 cmdv2, 01469 &job) == false) { 01470 // Preferred affinities were reset for the client, so no job 01471 // and bad request 01472 WriteMessage("ERR:ePrefAffExpired:"); 01473 x_PrintRequestStop(eStatus_BadRequest); 01474 } else { 01475 x_PrintGetJobResponse(q, job, cmdv2); 01476 x_PrintRequestStop(eStatus_OK); 01477 } 01478 return; 01479 } 01480 01481 01482 void CNetScheduleHandler::x_ProcessCancelWaitGet(CQueue* q) 01483 { 01484 x_CheckNonAnonymousClient("cancel waiting after WGET"); 01485 01486 q->CancelWaitGet(m_ClientId); 01487 WriteMessage("OK:"); 01488 x_PrintRequestStop(eStatus_OK); 01489 } 01490 01491 01492 void CNetScheduleHandler::x_ProcessPut(CQueue* q) 01493 { 01494 bool cmdv2(m_CommandArguments.cmd == "PUT2"); 01495 01496 if (cmdv2) { 01497 x_CheckNonAnonymousClient("use PUT2 command"); 01498 x_CheckAuthorizationToken(); 01499 } 01500 01501 string output = NStr::ParseEscapes(m_CommandArguments.output); 01502 TJobStatus old_status = q->PutResult(m_ClientId, time(0), 01503 m_CommandArguments.job_id, 01504 m_CommandArguments.auth_token, 01505 m_CommandArguments.job_return_code, 01506 &output); 01507 if (old_status == CNetScheduleAPI::ePending || 01508 old_status == CNetScheduleAPI::eRunning) { 01509 WriteMessage("OK:"); 01510 x_PrintRequestStop(eStatus_OK); 01511 return; 01512 } 01513 if (old_status == CNetScheduleAPI::eDone) { 01514 WriteMessage("OK:WARNING:Already done;"); 01515 ERR_POST(Warning << "Cannot accept job " 01516 << m_CommandArguments.job_key 01517 << " results. The job has already been done."); 01518 x_PrintRequestStop(eStatus_OK); 01519 return; 01520 } 01521 if (old_status == CNetScheduleAPI::eJobNotFound) { 01522 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01523 ERR_POST(Warning << "Cannot accept job " 01524 << m_CommandArguments.job_key 01525 << " results. The job is unknown"); 01526 x_PrintRequestStop(eStatus_NotFound); 01527 return; 01528 } 01529 01530 // Here: invalid job status, nothing will be done 01531 x_WriteMessageNoThrow("ERR:eInvalidJobStatus:" 01532 "Cannot accept job results; job is in " + 01533 CNetScheduleAPI::StatusToString(old_status) + 01534 " state"); 01535 ERR_POST(Warning << "Cannot accept job " 01536 << m_CommandArguments.job_key 01537 << " results; job is in " 01538 << CNetScheduleAPI::StatusToString(old_status) 01539 << " state"); 01540 x_PrintRequestStop(eStatus_InvalidJobStatus); 01541 } 01542 01543 01544 void CNetScheduleHandler::x_ProcessJobExchange(CQueue* q) 01545 { 01546 // The JXCG2 is not supported anymore, so this handler is called for 01547 // an old (obsolete) JXCG command only. 01548 // 01549 // The old clients must have any_affinity set to true 01550 // depending on the explicit affinity - to conform the old behavior 01551 m_CommandArguments.any_affinity = m_CommandArguments.affinity_token.empty(); 01552 01553 01554 time_t curr = time(0); 01555 string output = NStr::ParseEscapes(m_CommandArguments.output); 01556 01557 // PUT part 01558 TJobStatus old_status = q->PutResult(m_ClientId, curr, 01559 m_CommandArguments.job_id, 01560 m_CommandArguments.auth_token, 01561 m_CommandArguments.job_return_code, 01562 &output); 01563 01564 if (old_status == CNetScheduleAPI::eJobNotFound) { 01565 ERR_POST(Warning << "Cannot accept job " 01566 << m_CommandArguments.job_key 01567 << " results. The job is unknown"); 01568 } else if (old_status != CNetScheduleAPI::ePending && 01569 old_status != CNetScheduleAPI::eRunning) { 01570 ERR_POST(Warning << "Cannot accept job " 01571 << m_CommandArguments.job_key 01572 << " results. The job has already been done."); 01573 } 01574 01575 01576 // Get part 01577 list<string> aff_list; 01578 NStr::Split(NStr::ParseEscapes(m_CommandArguments.affinity_token), 01579 "\t,", aff_list, NStr::eNoMergeDelims); 01580 01581 CJob job; 01582 if (q->GetJobOrWait(m_ClientId, 01583 m_CommandArguments.port, 01584 m_CommandArguments.timeout, 01585 curr, &aff_list, 01586 m_CommandArguments.wnode_affinity, 01587 m_CommandArguments.any_affinity, 01588 false, 01589 false, 01590 &job) == false) { 01591 // Preferred affinities were reset for the client, so no job 01592 // and bad request 01593 WriteMessage("ERR:ePrefAffExpired:"); 01594 x_PrintRequestStop(eStatus_BadRequest); 01595 } else { 01596 x_PrintGetJobResponse(q, job, false); 01597 x_PrintRequestStop(eStatus_OK); 01598 } 01599 return; 01600 } 01601 01602 01603 void CNetScheduleHandler::x_ProcessPutMessage(CQueue* q) 01604 { 01605 if (q->PutProgressMessage(m_CommandArguments.job_id, 01606 m_CommandArguments.progress_msg)) { 01607 WriteMessage("OK:"); 01608 x_PrintRequestStop(eStatus_OK); 01609 } 01610 else { 01611 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01612 ERR_POST(Warning << "MPUT for unknown job " 01613 << m_CommandArguments.job_key); 01614 x_PrintRequestStop(eStatus_NotFound); 01615 } 01616 } 01617 01618 01619 void CNetScheduleHandler::x_ProcessGetMessage(CQueue* q) 01620 { 01621 CJob job; 01622 time_t lifetime; 01623 01624 if (q->ReadAndTouchJob(m_CommandArguments.job_id, job, &lifetime) != 01625 CNetScheduleAPI::eJobNotFound) { 01626 WriteMessage("OK:", job.GetProgressMsg()); 01627 x_PrintRequestStop(eStatus_OK); 01628 } else { 01629 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01630 ERR_POST(Warning << m_CommandArguments.cmd 01631 << "MGET for unknown job " 01632 << m_CommandArguments.job_key); 01633 x_PrintRequestStop(eStatus_NotFound); 01634 } 01635 } 01636 01637 01638 void CNetScheduleHandler::x_ProcessPutFailure(CQueue* q) 01639 { 01640 bool cmdv2(m_CommandArguments.cmd == "FPUT2"); 01641 01642 if (cmdv2) { 01643 x_CheckNonAnonymousClient("use FPUT2 command"); 01644 x_CheckAuthorizationToken(); 01645 } 01646 01647 string warning; 01648 TJobStatus old_status = q->FailJob( 01649 m_ClientId, 01650 m_CommandArguments.job_id, 01651 m_CommandArguments.auth_token, 01652 NStr::ParseEscapes(m_CommandArguments.err_msg), 01653 NStr::ParseEscapes(m_CommandArguments.output), 01654 m_CommandArguments.job_return_code, 01655 warning); 01656 01657 if (old_status == CNetScheduleAPI::eJobNotFound) { 01658 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01659 ERR_POST(Warning << "FPUT for unknown job " 01660 << m_CommandArguments.job_key); 01661 x_PrintRequestStop(eStatus_NotFound); 01662 return; 01663 } 01664 01665 if (old_status == CNetScheduleAPI::eFailed) { 01666 WriteMessage("OK:WARNING:Already failed;"); 01667 ERR_POST(Warning << "FPUT for already failed job " 01668 << m_CommandArguments.job_key); 01669 x_PrintRequestStop(eStatus_OK); 01670 return; 01671 } 01672 01673 if (old_status != CNetScheduleAPI::eRunning) { 01674 x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot fail job; " 01675 "job is in " + 01676 CNetScheduleAPI::StatusToString(old_status) + 01677 " state"); 01678 ERR_POST(Warning << "Cannot fail job " 01679 << m_CommandArguments.job_key 01680 << "; job is in " 01681 << CNetScheduleAPI::StatusToString(old_status) 01682 << " state"); 01683 x_PrintRequestStop(eStatus_InvalidJobStatus); 01684 return; 01685 } 01686 01687 // Here: all is fine 01688 if (warning.empty()) 01689 WriteMessage("OK:"); 01690 else 01691 WriteMessage("OK:WARNING:", warning + ";"); 01692 x_PrintRequestStop(eStatus_OK); 01693 } 01694 01695 01696 void CNetScheduleHandler::x_ProcessDropQueue(CQueue* q) 01697 { 01698 q->Truncate(); 01699 WriteMessage("OK:"); 01700 x_PrintRequestStop(eStatus_OK); 01701 } 01702 01703 01704 void CNetScheduleHandler::x_ProcessReturn(CQueue* q) 01705 { 01706 bool cmdv2(m_CommandArguments.cmd == "RETURN2"); 01707 01708 if (cmdv2) { 01709 x_CheckNonAnonymousClient("use RETURN2 command"); 01710 x_CheckAuthorizationToken(); 01711 } 01712 01713 string warning; 01714 TJobStatus old_status = q->ReturnJob(m_ClientId, 01715 m_CommandArguments.job_id, 01716 m_CommandArguments.auth_token, 01717 warning); 01718 01719 if (old_status == CNetScheduleAPI::eRunning) { 01720 if (warning.empty()) 01721 WriteMessage("OK:"); 01722 else 01723 WriteMessage("OK:WARNING:", warning + ";"); 01724 x_PrintRequestStop(eStatus_OK); 01725 return; 01726 } 01727 01728 if (old_status == CNetScheduleAPI::eJobNotFound) { 01729 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01730 ERR_POST(Warning << "RETURN for unknown job " 01731 << m_CommandArguments.job_key); 01732 x_PrintRequestStop(eStatus_NotFound); 01733 return; 01734 } 01735 01736 x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot return job; job is in " + 01737 CNetScheduleAPI::StatusToString(old_status) + " state"); 01738 ERR_POST(Warning << "Cannot return job " 01739 << m_CommandArguments.job_key 01740 << "; job is in " 01741 << CNetScheduleAPI::StatusToString(old_status) << " state"); 01742 01743 x_PrintRequestStop(eStatus_InvalidJobStatus); 01744 } 01745 01746 01747 void CNetScheduleHandler::x_ProcessJobDelayExpiration(CQueue* q) 01748 { 01749 if (m_CommandArguments.timeout <= 0) { 01750 x_WriteMessageNoThrow("ERR:eInvalidParameter:"); 01751 ERR_POST(Warning << "Invalid timeout " 01752 << m_CommandArguments.timeout 01753 << " in JDEX for job " 01754 << m_CommandArguments.job_key); 01755 x_PrintRequestStop(eStatus_BadRequest); 01756 return; 01757 } 01758 01759 TJobStatus status = q->JobDelayExpiration(m_CommandArguments.job_id, 01760 m_CommandArguments.timeout); 01761 01762 if (status == CNetScheduleAPI::eJobNotFound) { 01763 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 01764 ERR_POST(Warning << "JDEX for unknown job " 01765 << m_CommandArguments.job_key); 01766 x_PrintRequestStop(eStatus_NotFound); 01767 return; 01768 } 01769 if (status != CNetScheduleAPI::eRunning) { 01770 x_WriteMessageNoThrow("ERR:eInvalidJobStatus:" + 01771 CNetScheduleAPI::StatusToString(status)); 01772 ERR_POST(Warning << "Cannot change expiration for job " 01773 << m_CommandArguments.job_key 01774 << " in status " 01775 << CNetScheduleAPI::StatusToString(status)); 01776 01777 x_PrintRequestStop(eStatus_InvalidJobStatus); 01778 return; 01779 } 01780 01781 // Here: the new timeout has been applied 01782 WriteMessage("OK:"); 01783 x_PrintRequestStop(eStatus_OK); 01784 } 01785 01786 01787 void CNetScheduleHandler::x_ProcessDropJob(CQueue* q) 01788 { 01789 x_ProcessCancel(q); 01790 } 01791 01792 01793 void CNetScheduleHandler::x_ProcessStatistics(CQueue* q) 01794 { 01795 CSocket & socket = GetSocket(); 01796 const string & what = m_CommandArguments.option; 01797 time_t curr = time(0); 01798 01799 if (q == NULL) { 01800 if (what == "JOBS") 01801 m_Server->PrintJobsStat(*this); 01802 else { 01803 // Transition counters for all the queues 01804 WriteMessage("OK:Started: ", m_Server->GetStartTime().AsString()); 01805 if (m_Server->GetRefuseSubmits()) 01806 WriteMessage("OK:SubmitsDisabledEffective: 1"); 01807 else 01808 WriteMessage("OK:SubmitsDisabledEffective: 0"); 01809 if (m_Server->IsDrainShutdown()) 01810 WriteMessage("OK:DrainedShutdown: 1"); 01811 else 01812 WriteMessage("OK:DrainedShutdown: 0"); 01813 m_Server->PrintTransitionCounters(*this); 01814 } 01815 WriteMessage("OK:END"); 01816 x_PrintRequestStop(eStatus_OK); 01817 return; 01818 } 01819 01820 01821 socket.DisableOSSendDelay(false); 01822 if (!what.empty() && what != "ALL") { 01823 x_StatisticsNew(q, what, curr); 01824 return; 01825 } 01826 01827 WriteMessage("OK:Started: ", m_Server->GetStartTime().AsString()); 01828 if (m_Server->GetRefuseSubmits() || q->GetRefuseSubmits()) 01829 WriteMessage("OK:SubmitsDisabledEffective: 1"); 01830 else 01831 WriteMessage("OK:SubmitsDisabledEffective: 0"); 01832 if (q->GetRefuseSubmits()) 01833 WriteMessage("OK:SubmitsDisabledPrivate: 1"); 01834 else 01835 WriteMessage("OK:SubmitsDisabledPrivate: 0"); 01836 01837 for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) { 01838 TJobStatus st = g_ValidJobStatuses[k]; 01839 unsigned count = q->CountStatus(st); 01840 01841 WriteMessage("OK:", CNetScheduleAPI::StatusToString(st) + 01842 ": " + NStr::UIntToString(count)); 01843 01844 if (what == "ALL") { 01845 TNSBitVector::statistics bv_stat; 01846 q->StatusStatistics(st, &bv_stat); 01847 WriteMessage("OK:", 01848 " bit_blk=" + NStr::UIntToString(bv_stat.bit_blocks) + 01849 "; gap_blk=" + NStr::UIntToString(bv_stat.gap_blocks) + 01850 "; mem_used=" + NStr::UIntToString(bv_stat.memory_used)); 01851 } 01852 } // for 01853 01854 01855 if (what == "ALL") { 01856 WriteMessage("OK:[Berkeley DB Mutexes]:"); 01857 {{ 01858 CNcbiOstrstream ostr; 01859 01860 try { 01861 m_Server->PrintMutexStat(ostr); 01862 ostr << ends; 01863 WriteMessage("OK:", ostr.str()); 01864 } catch (...) { 01865 ostr.freeze(false); 01866 throw; 01867 } 01868 ostr.freeze(false); 01869 }} 01870 01871 WriteMessage("OK:[Berkeley DB Locks]:"); 01872 {{ 01873 CNcbiOstrstream ostr; 01874 01875 try { 01876 m_Server->PrintLockStat(ostr); 01877 ostr << ends; 01878 WriteMessage("OK:", ostr.str()); 01879 } catch (...) { 01880 ostr.freeze(false); 01881 throw; 01882 } 01883 ostr.freeze(false); 01884 }} 01885 01886 WriteMessage("OK:[Berkeley DB Memory Usage]:"); 01887 {{ 01888 CNcbiOstrstream ostr; 01889 01890 try { 01891 m_Server->PrintMemStat(ostr); 01892 ostr << ends; 01893 WriteMessage("OK:", ostr.str()); 01894 } catch (...) { 01895 ostr.freeze(false); 01896 throw; 01897 } 01898 ostr.freeze(false); 01899 }} 01900 01901 WriteMessage("OK:[BitVector block pool]:"); 01902 {{ 01903 const TBlockAlloc::TBucketPool::TBucketVector& bv = 01904 TBlockAlloc::GetPoolVector(); 01905 size_t pool_vec_size = bv.size(); 01906 01907 WriteMessage("OK:Pool vector size: ", 01908 NStr::SizetToString(pool_vec_size)); 01909 01910 for (size_t i = 0; i < pool_vec_size; ++i) { 01911 const TBlockAlloc::TBucketPool::TResourcePool* rp = 01912 TBlockAlloc::GetPool(i); 01913 if (rp) { 01914 size_t pool_size = rp->GetSize(); 01915 if (pool_size) { 01916 WriteMessage("OK:Pool [ " + NStr::SizetToString(i) + 01917 "] = " + NStr::SizetToString(pool_size)); 01918 } 01919 } 01920 } 01921 }} 01922 } 01923 01924 WriteMessage("OK:[Configured job submitters]:"); 01925 q->PrintSubmHosts(*this); 01926 01927 WriteMessage("OK:[Configured workers]:"); 01928 q->PrintWNodeHosts(*this); 01929 01930 WriteMessage("OK:[Transitions counters]:"); 01931 q->PrintTransitionCounters(*this); 01932 01933 WriteMessage("OK:END"); 01934 x_PrintRequestStop(eStatus_OK); 01935 } 01936 01937 01938 void CNetScheduleHandler::x_ProcessReloadConfig(CQueue* q) 01939 { 01940 CNcbiApplication * app = CNcbiApplication::Instance(); 01941 bool reloaded = app->ReloadConfig( 01942 CMetaRegistry::fReloadIfChanged); 01943 01944 if (reloaded) { 01945 const CNcbiRegistry & reg = app->GetConfig(); 01946 01947 m_Server->Configure(reg); 01948 01949 // Logging from the [server] section 01950 SNS_Parameters params; 01951 01952 params.Read(reg, "server"); 01953 m_Server->SetNSParameters(params, true); 01954 01955 WriteMessage("OK:"); 01956 } 01957 else 01958 WriteMessage("OK:WARNING:Configuration file has not " 01959 "been changed, RECO ignored;"); 01960 01961 x_PrintRequestStop(eStatus_OK); 01962 } 01963 01964 01965 void CNetScheduleHandler::x_ProcessActiveCount(CQueue* q) 01966 { 01967 string active_jobs = NStr::UIntToString(m_Server->CountActiveJobs()); 01968 01969 WriteMessage("OK:", active_jobs); 01970 x_PrintRequestStop(eStatus_OK); 01971 } 01972 01973 01974 void CNetScheduleHandler::x_ProcessDump(CQueue* q) 01975 { 01976 if (m_CommandArguments.job_id == 0) { 01977 // The whole queue dump, may be restricted by one state 01978 if (m_CommandArguments.job_status == CNetScheduleAPI::eJobNotFound && 01979 !m_CommandArguments.job_status_string.empty()) { 01980 // The state parameter was provided but it was not possible to 01981 // convert it into a valid job state 01982 x_WriteMessageNoThrow("ERR:eInvalidParameter:Status unknown: ", 01983 m_CommandArguments.job_status_string); 01984 x_PrintRequestStop(eStatus_BadRequest); 01985 return; 01986 } 01987 01988 q->PrintAllJobDbStat(*this, m_CommandArguments.group, 01989 m_CommandArguments.job_status, 01990 m_CommandArguments.start_after_job_id, 01991 m_CommandArguments.count); 01992 WriteMessage("OK:END"); 01993 x_PrintRequestStop(eStatus_OK); 01994 return; 01995 } 01996 01997 01998 // Certain job dump 01999 if (q->PrintJobDbStat(*this, m_CommandArguments.job_id) == 0) { 02000 // Nothing was printed because there is no such a job 02001 x_WriteMessageNoThrow("ERR:eJobNotFound:"); 02002 x_PrintRequestStop(eStatus_NotFound); 02003 return; 02004 } 02005 02006 WriteMessage("OK:END"); 02007 x_PrintRequestStop(eStatus_OK); 02008 return; 02009 } 02010 02011 02012 void CNetScheduleHandler::x_ProcessShutdown(CQueue*) 02013 { 02014 if (m_CommandArguments.drain) { 02015 if (m_Server->GetShutdownFlag()) { 02016 WriteMessage("ERR:eShuttingDown:The server is in shutting down state"); 02017 x_PrintRequestStop(eStatus_BadRequest); 02018 return; 02019 } 02020 WriteMessage("OK:"); 02021 x_PrintRequestStop(eStatus_OK); 02022 m_Server->SetRefuseSubmits(true); 02023 m_Server->SetDrainShutdown(); 02024 return; 02025 } 02026 02027 // Unconditional immediate shutdown. 02028 x_PrintRequestStop(eStatus_OK); 02029 WriteMessage("OK:"); 02030 m_Server->SetRefuseSubmits(true); 02031 m_Server->SetShutdownFlag(); 02032 return; 02033 } 02034 02035 02036 void CNetScheduleHandler::x_ProcessGetConf(CQueue*) 02037 { 02038 CConn_SocketStream ss(GetSocket().GetSOCK(), eNoOwnership); 02039 02040 CNcbiApplication::Instance()->GetConfig().Write(ss); 02041 ss.flush(); 02042 WriteMessage("OK:END"); 02043 x_PrintRequestStop(eStatus_OK); 02044 } 02045 02046 02047 void CNetScheduleHandler::x_ProcessVersion(CQueue*) 02048 { 02049 WriteMessage("OK:", 02050 "server_version=" NETSCHEDULED_VERSION 02051 "&storage_version=" NETSCHEDULED_STORAGE_VERSION 02052 "&protocol_version=" NETSCHEDULED_PROTOCOL_VERSION 02053 "&build_date=" + NStr::URLEncode(NETSCHEDULED_BUILD_DATE) + 02054 "&ns_node=" + m_Server->GetNodeID() + 02055 "&ns_session=" + m_Server->GetSessionID()); 02056 x_PrintRequestStop(eStatus_OK); 02057 } 02058 02059 02060 void CNetScheduleHandler::x_ProcessQList(CQueue*) 02061 { 02062 WriteMessage("OK:", m_Server->GetQueueNames(";")); 02063 x_PrintRequestStop(eStatus_OK); 02064 } 02065 02066 02067 void CNetScheduleHandler::x_ProcessQuitSession(CQueue*) 02068 { 02069 m_Server->CloseConnection(&GetSocket()); 02070 02071 // Log the close request 02072 CDiagContext::SetRequestContext(m_DiagContext); 02073 x_PrintRequestStop(eStatus_OK); 02074 CDiagContext::SetRequestContext(NULL); 02075 } 02076 02077 02078 void CNetScheduleHandler::x_ProcessCreateQueue(CQueue*) 02079 { 02080 m_Server->CreateQueue(m_CommandArguments.qname, 02081 m_CommandArguments.qclass, 02082 NStr::ParseEscapes(m_CommandArguments.comment)); 02083 WriteMessage("OK:"); 02084 x_PrintRequestStop(eStatus_OK); 02085 } 02086 02087 02088 void CNetScheduleHandler::x_ProcessDeleteQueue(CQueue*) 02089 { 02090 m_Server->DeleteQueue(m_CommandArguments.qname); 02091 WriteMessage("OK:"); 02092 x_PrintRequestStop(eStatus_OK); 02093 } 02094 02095 02096 void CNetScheduleHandler::x_ProcessQueueInfo(CQueue*) 02097 { 02098 const string& qname = m_CommandArguments.qname; 02099 int kind; 02100 string qclass; 02101 string comment; 02102 02103 m_Server->QueueInfo(qname, kind, &qclass, &comment); 02104 WriteMessage("OK:", NStr::IntToString(kind) + "\t" + qclass + "\t\"" + 02105 NStr::PrintableString(comment) + "\""); 02106 x_PrintRequestStop(eStatus_OK); 02107 } 02108 02109 02110 void CNetScheduleHandler::x_ProcessGetParam(CQueue* q) 02111 { 02112 unsigned int max_input_size; 02113 unsigned int max_output_size; 02114 02115 q->GetMaxIOSizes(max_input_size, max_output_size); 02116 WriteMessage("OK:", "max_input_size=" + 02117 NStr::IntToString(max_input_size) + ";" 02118 "max_output_size=" + 02119 NStr::IntToString(max_output_size) + ";" + 02120 NETSCHEDULED_FEATURES); 02121 x_PrintRequestStop(eStatus_OK); 02122 } 02123 02124 02125 void CNetScheduleHandler::x_ProcessGetConfiguration(CQueue* q) 02126 { 02127 CQueue::TParameterList parameters = q->GetParameters(); 02128 02129 ITERATE(CQueue::TParameterList, it, parameters) { 02130 WriteMessage("OK:", it->first + '=' + it->second); 02131 } 02132 WriteMessage("OK:END"); 02133 x_PrintRequestStop(eStatus_OK); 02134 } 02135 02136 02137 void CNetScheduleHandler::x_ProcessReading(CQueue* q) 02138 { 02139 x_CheckNonAnonymousClient("use READ command"); 02140 02141 CJob job; 02142 02143 q->GetJobForReading(m_ClientId, m_CommandArguments.timeout, 02144 m_CommandArguments.group, 02145 &job); 02146 02147 unsigned int job_id = job.GetId(); 02148 string job_key; 02149 02150 if (job_id) { 02151 job_key = q->MakeKey(job_id); 02152 WriteMessage("OK:job_key=" + job_key + 02153 "&auth_token=" + job.GetAuthToken() + 02154 "&status=" + CNetScheduleAPI::StatusToString(job.GetStatus())); 02155 } 02156 else 02157 WriteMessage("OK:"); 02158 02159 if (m_Server->IsLog()) { 02160 if (job_id) 02161 GetDiagContext().Extra().Print("job_key", job_key) 02162 .Print("auth_token", job.GetAuthToken()); 02163 else 02164 GetDiagContext().Extra().Print("job_key", "None"); 02165 } 02166 x_PrintRequestStop(eStatus_OK); 02167 } 02168 02169 02170 void CNetScheduleHandler::x_ProcessConfirm(CQueue* q) 02171 { 02172 x_CheckNonAnonymousClient("use CFRM command"); 02173 x_CheckAuthorizationToken(); 02174 02175 TJobStatus old_status = q->ConfirmReadingJob( 02176 m_ClientId, 02177 m_CommandArguments.job_id, 02178 m_CommandArguments.auth_token); 02179 x_FinalizeReadCommand("CFRM", old_status); 02180 } 02181 02182 02183 void CNetScheduleHandler::x_ProcessReadFailed(CQueue* q) 02184 { 02185 x_CheckNonAnonymousClient("use FRED command"); 02186 x_CheckAuthorizationToken(); 02187 02188 TJobStatus old_status = q->FailReadingJob( 02189 m_ClientId, 02190 m_CommandArguments.job_id, 02191 m_CommandArguments.auth_token); 02192 x_FinalizeReadCommand("FRED", old_status); 02193 } 02194 02195 02196 void CNetScheduleHandler::x_ProcessReadRollback(CQueue* q) 02197 { 02198 x_CheckNonAnonymousClient("use RDRB command"); 02199 x_CheckAuthorizationToken(); 02200 02201 TJobStatus old_status = q->ReturnReadingJob( 02202 m_ClientId, 02203 m_CommandArguments.job_id, 02204 m_CommandArguments.auth_token); 02205 x_FinalizeReadCommand("RDRB", old_status); 02206 } 02207 02208 02209 void CNetScheduleHandler::x_FinalizeReadCommand(const string & command, 02210 TJobStatus old_status) 02211 { 02212 if (old_status == CNetScheduleAPI::eJobNotFound) { 02213 WriteMessage("ERR:eJobNotFound:"); 02214 ERR_POST(Warning << command << " for unknown job " 02215 << m_CommandArguments.job_key); 02216 x_PrintRequestStop(eStatus_NotFound); 02217 return; 02218 } 02219 02220 if (old_status != CNetScheduleAPI::eReading) { 02221 string operation = "unknown"; 02222 02223 if (command == "CFRM") operation = "confirm"; 02224 else if (command == "FRED") operation = "fail"; 02225 else if (command == "RDRB") operation = "rollback"; 02226 02227 x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot " + 02228 operation + " job; job is in " + 02229 CNetScheduleAPI::StatusToString(old_status) + 02230 " state"); 02231 ERR_POST(Warning << "Cannot " << operation << " read job; job is in " 02232 << CNetScheduleAPI::StatusToString(old_status) 02233 << " state"); 02234 02235 x_PrintRequestStop(eStatus_InvalidJobStatus); 02236 return; 02237 } 02238 02239 WriteMessage("OK:"); 02240 x_PrintRequestStop(eStatus_OK); 02241 } 02242 02243 02244 void CNetScheduleHandler::x_ProcessGetAffinityList(CQueue* q) 02245 { 02246 WriteMessage("OK:", q->GetAffinityList()); 02247 x_PrintRequestStop(eStatus_OK); 02248 } 02249 02250 02251 void CNetScheduleHandler::x_ProcessClearWorkerNode(CQueue* q) 02252 { 02253 q->ClearWorkerNode(m_ClientId); 02254 WriteMessage("OK:"); 02255 x_PrintRequestStop(eStatus_OK); 02256 } 02257 02258 02259 void CNetScheduleHandler::x_ProcessCancelQueue(CQueue* q) 02260 { 02261 q->CancelAllJobs(m_ClientId); 02262 WriteMessage("OK:"); 02263 x_PrintRequestStop(eStatus_OK); 02264 } 02265 02266 02267 void CNetScheduleHandler::x_ProcessRefuseSubmits(CQueue* q) 02268 { 02269 if (m_CommandArguments.mode == false && 02270 (m_Server->IsDrainShutdown() || m_Server->GetShutdownFlag())) { 02271 WriteMessage("ERR:eShuttingDown:" 02272 "Server is in drained shutting down state"); 02273 x_PrintRequestStop(eStatus_BadRequest); 02274 return; 02275 } 02276 02277 if (q == NULL) { 02278 // This is a whole server scope request 02279 m_Server->SetRefuseSubmits(m_CommandArguments.mode); 02280 WriteMessage("OK:"); 02281 x_PrintRequestStop(eStatus_OK); 02282 return; 02283 } 02284 02285 // This is a queue scope request. 02286 q->SetRefuseSubmits(m_CommandArguments.mode); 02287 02288 if (m_CommandArguments.mode == false && 02289 m_Server->GetRefuseSubmits() == true) 02290 WriteMessage("OK:WARNING:Submits are disabled on the server level;"); 02291 else 02292 WriteMessage("OK:"); 02293 x_PrintRequestStop(eStatus_OK); 02294 } 02295 02296 02297 void CNetScheduleHandler::x_CmdNotImplemented(CQueue *) 02298 { 02299 x_WriteMessageNoThrow("ERR:eObsoleteCommand:"); 02300 x_PrintRequestStop(eStatus_NotImplemented); 02301 } 02302 02303 02304 void CNetScheduleHandler::x_CmdObsolete(CQueue*) 02305 { 02306 x_WriteMessageNoThrow("OK:WARNING:Obsolete;"); 02307 x_PrintRequestStop(eStatus_NotImplemented); 02308 } 02309 02310 02311 void CNetScheduleHandler::x_CheckNonAnonymousClient(const string & message) 02312 { 02313 if (!m_ClientId.IsComplete()) 02314 NCBI_THROW(CNetScheduleException, eInvalidClient, 02315 "Anonymous client (no client_node and client_session" 02316 " at handshake) cannot " + message); 02317 return; 02318 } 02319 02320 02321 void CNetScheduleHandler::x_CheckPortAndTimeout(void) 02322 { 02323 if ((m_CommandArguments.port != 0 && 02324 m_CommandArguments.timeout == 0) || 02325 (m_CommandArguments.port == 0 && 02326 m_CommandArguments.timeout != 0)) 02327 NCBI_THROW(CNetScheduleException, eInvalidParameter, 02328 "Either both or neither of the port and " 02329 "timeout parameters must be 0"); 02330 return; 02331 } 02332 02333 02334 void CNetScheduleHandler::x_CheckAuthorizationToken(void) 02335 { 02336 if (m_CommandArguments.auth_token.empty()) 02337 NCBI_THROW(CNetScheduleException, eInvalidAuthToken, 02338 "Invalid authorization token. It cannot be empty."); 02339 return; 02340 } 02341 02342 02343 void CNetScheduleHandler::x_CheckGetParameters(void) 02344 { 02345 // Checks that the given GETx/JXCG parameters make sense 02346 if (m_CommandArguments.wnode_affinity == false && 02347 m_CommandArguments.any_affinity == false && 02348 m_CommandArguments.affinity_token.empty()) { 02349 ERR_POST(Warning << "The job request without explicit affinities, " 02350 "without preferred affinities and " 02351 "with any_aff flag set to false " 02352 "will never match any job."); 02353 } 02354 if (m_CommandArguments.exclusive_new_aff == true && 02355 m_CommandArguments.any_affinity == true) 02356 NCBI_THROW(CNetScheduleException, eInvalidParameter, 02357 "It is forbidden to have both any_affinity and " 02358 "exclusive_new_aff GET2 flags set to 1."); 02359 return; 02360 } 02361 02362 02363 void CNetScheduleHandler::x_PrintRequestStart(const SParsedCmd& cmd) 02364 { 02365 if (m_Server->IsLog()) { 02366 CDiagContext_Extra ctxt_extra = 02367 GetDiagContext().PrintRequestStart() 02368 .Print("_type", "cmd") 02369 .Print("cmd", cmd.command->cmd) 02370 .Print("peer", GetSocket().GetPeerAddress(eSAF_IP)) 02371 .Print("conn", m_ConnContext->GetRequestID()); 02372 02373 // SUMBIT parameters should not be logged. The new job attributes will 02374 // be logged when a new job is actually submitted. 02375 if (cmd.command->extra.processor != &CNetScheduleHandler::x_ProcessSubmit) { 02376 ITERATE(TNSProtoParams, it, cmd.params) { 02377 ctxt_extra.Print(it->first, it->second); 02378 } 02379 } 02380 } 02381 m_DiagContext->SetRequestStatus(eStatus_OK); 02382 } 02383 02384 02385 void CNetScheduleHandler::x_PrintRequestStart(CTempString msg) 02386 { 02387 if (m_Server->IsLog()) 02388 GetDiagContext().PrintRequestStart() 02389 .Print("_type", "cmd") 02390 .Print("info", msg) 02391 .Print("peer", GetSocket().GetPeerAddress(eSAF_IP)) 02392 .Print("conn", m_ConnContext->GetRequestID()); 02393 } 02394 02395 02396 void CNetScheduleHandler::x_PrintRequestStop(unsigned int status) 02397 { 02398 if (m_Server->IsLog()) { 02399 CDiagContext::GetRequestContext().SetRequestStatus(status); 02400 GetDiagContext().PrintRequestStop(); 02401 CDiagContext::SetRequestContext(m_ConnContext); 02402 m_DiagContext.Reset(); 02403 } 02404 } 02405 02406 02407 // The function forms a responce for various 'get job' commands and prints 02408 // extra to the log if required 02409 void 02410 CNetScheduleHandler::x_PrintGetJobResponse(const CQueue * q, 02411 const CJob & job, 02412 bool cmdv2) 02413 { 02414 if (!job.GetId()) { 02415 // No suitable job found 02416 if (m_Server->IsLog()) 02417 GetDiagContext().Extra().Print("job_key", "none"); 02418 WriteMessage("OK:"); 02419 return; 02420 } 02421 02422 string job_key = q->MakeKey(job.GetId()); 02423 if (m_Server->IsLog()) { 02424 // The only piece required for logging is the job key 02425 GetDiagContext().Extra().Print("job_key", job_key); 02426 } 02427 02428 string output; 02429 if (cmdv2) 02430 output = "job_key=" + job_key + 02431 "&input=" + NStr::URLEncode(job.GetInput()) + 02432 "&affinity=" + NStr::URLEncode(q->GetAffinityTokenByID(job.GetAffinityId())) + 02433 "&client_ip=" + NStr::URLEncode(job.GetClientIP()) + 02434 "&client_sid=" + NStr::URLEncode(job.GetClientSID()) + 02435 "&mask=" + NStr::UIntToString(job.GetMask()) + 02436 "&auth_token=" + job.GetAuthToken(); 02437 else 02438 output = job_key + 02439 " \"" + NStr::PrintableString(job.GetInput()) + "\"" 02440 " \"" + NStr::PrintableString( 02441 q->GetAffinityTokenByID(job.GetAffinityId())) + "\"" 02442 " \"" + job.GetClientIP() + " " + job.GetClientSID() + "\"" 02443 " " + NStr::UIntToString(job.GetMask()); 02444 02445 02446 WriteMessage("OK:", output); 02447 return; 02448 } 02449 02450 02451 static const unsigned int kMaxParserErrMsgLength = 128; 02452 02453 // Write into socket, logs the message and closes the connection 02454 void CNetScheduleHandler::x_OnCmdParserError(bool need_request_start, 02455 const string & msg, 02456 const string & suffix) 02457 { 02458 // Truncating is done to prevent output of an arbitrary long garbage 02459 02460 if (need_request_start) 02461 x_PrintRequestStart("Invalid command"); 02462 02463 if (msg.size() < kMaxParserErrMsgLength * 2) 02464 ERR_POST("Error parsing command: " << msg + suffix); 02465 else 02466 ERR_POST("Error parsing command: " << 02467 msg.substr(0, kMaxParserErrMsgLength * 2) << 02468 " (TRUNCATED)" + suffix); 02469 x_PrintRequestStop(eStatus_BadRequest); 02470 02471 if (msg.size() < kMaxParserErrMsgLength) 02472 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" + msg + suffix); 02473 else 02474 x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" + 02475 msg.substr(0, kMaxParserErrMsgLength) + 02476 " (TRUNCATED)" + suffix); 02477 02478 // Close the connection 02479 if (m_ConnContext.NotNull()) 02480 m_ConnContext->SetRequestStatus(eStatus_BadRequest); 02481 02482 m_Server->CloseConnection(&GetSocket()); 02483 return; 02484 } 02485 02486 02487 void CNetScheduleHandler::x_StatisticsNew(CQueue * q, 02488 const string & what, 02489 time_t curr) 02490 { 02491 if (what == "CLIENTS") 02492 q->PrintClientsList(*this, 02493 m_CommandArguments.comment == "VERBOSE"); 02494 else if (what == "NOTIFICATIONS") 02495 q->PrintNotificationsList(*this, 02496 m_CommandArguments.comment == "VERBOSE"); 02497 else if (what == "AFFINITIES") 02498 q->PrintAffinitiesList(*this, 02499 m_CommandArguments.comment == "VERBOSE"); 02500 else if (what == "GROUPS") 02501 q->PrintGroupsList(*this, 02502 m_CommandArguments.comment == "VERBOSE"); 02503 else if (what == "JOBS") 02504 q->PrintJobsStat(*this, m_CommandArguments.group, 02505 m_CommandArguments.affinity_token); 02506 else if (what == "WNODE") 02507 WriteMessage("OK:WARNING:Obsolete, use STAT CLIENTS instead;"); 02508 02509 WriteMessage("OK:END"); 02510 x_PrintRequestStop(eStatus_OK); 02511 } 02512
1.7.5.1
Modified on Wed May 23 12:58:06 2012 by modify_doxy.py rev. 337098