NCBI C++ ToolKit
ns_handler.cpp
Go to the documentation of this file.
00001 /*  $Id: ns_handler.cpp 54572 2012-05-22 16:00:04Z satskyse $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Anatoliy Kuznetsov, Victor Joukov
00027  *
00028  * File Description: netschedule commands handler
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include <corelib/request_ctx.hpp>
00034 
00035 #include "ns_handler.hpp"
00036 #include "ns_server.hpp"
00037 #include "ns_server_misc.hpp"
00038 
00039 #include <sys/types.h>
00040 #include <sys/socket.h>
00041 #include <netinet/in.h>
00042 #include <netinet/tcp.h>
00043 
00044 
00045 USING_NCBI_SCOPE;
00046 
00047 
00048 
00049 /// NetSchedule command parser
00050 ///
00051 /// @internal
00052 ///
00053 
00054 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_BatchHeaderMap[] = {
00055     { "BTCH", { &CNetScheduleHandler::x_ProcessBatchStart, 0 },
00056         { { "size", eNSPT_Int, eNSPA_Required } } },
00057     { "ENDS", { &CNetScheduleHandler::x_ProcessBatchSequenceEnd, 0 } },
00058     { NULL }
00059 };
00060 
00061 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_BatchEndMap[] = {
00062     { "ENDB" },
00063     { NULL }
00064 };
00065 
00066 SNSProtoArgument s_BatchArgs[] = {
00067     { "input", eNSPT_Str, eNSPA_Required },
00068     { "aff",   eNSPT_Str, eNSPA_Optional, "" },
00069     { "msk",   eNSPT_Int, eNSPA_Optional, "0" },
00070     { NULL }
00071 };
00072 
00073 CNetScheduleHandler::SCommandMap CNetScheduleHandler::sm_CommandMap[] = {
00074 
00075     /*** Admin role ***/
00076     { "SHUTDOWN",      { &CNetScheduleHandler::x_ProcessShutdown,
00077                          eNSCR_Admin },
00078         { { "drain", eNSPT_Int, eNSPA_Optional, 0 } } },
00079     { "GETCONF",       { &CNetScheduleHandler::x_ProcessGetConf,
00080                          eNSCR_Admin } },
00081     { "REFUSESUBMITS", { &CNetScheduleHandler::x_ProcessRefuseSubmits,
00082                          eNSCR_Admin },
00083         { { "mode", eNSPT_Int, eNSPA_Required } } },
00084 
00085     /*** Any role ***/
00086     { "VERSION",  { &CNetScheduleHandler::x_ProcessVersion,
00087                     eNSCR_Any } },
00088     { "QUIT",     { &CNetScheduleHandler::x_ProcessQuitSession,
00089                     eNSCR_Any } },
00090     { "RECO",     { &CNetScheduleHandler::x_ProcessReloadConfig,
00091                     eNSCR_Admin } },
00092     { "ACNT",     { &CNetScheduleHandler::x_ProcessActiveCount,
00093                     eNSCR_Any } },
00094     { "QLST",     { &CNetScheduleHandler::x_ProcessQList,
00095                     eNSCR_Any } },
00096     { "QINF",     { &CNetScheduleHandler::x_ProcessQueueInfo,
00097                     eNSCR_Any },
00098         { { "qname", eNSPT_Id, eNSPA_Required } } },
00099 
00100     /*** QueueAdmin role ***/
00101     { "DROPQ",    { &CNetScheduleHandler::x_ProcessDropQueue,
00102                     eNSCR_QueueAdmin } },
00103 
00104     /*** DynClassAdmin role ***/
00105     // QCRE qname : id  qclass : id [ comment : str ]
00106     { "QCRE",     { &CNetScheduleHandler::x_ProcessCreateQueue,
00107                     eNSAC_DynClassAdmin },
00108         { { "qname",   eNSPT_Id,  eNSPA_Required },
00109           { "qclass",  eNSPT_Id,  eNSPA_Required },
00110           { "comment", eNSPT_Str, eNSPA_Optional } } },
00111     // QDEL qname : id
00112     { "QDEL",     { &CNetScheduleHandler::x_ProcessDeleteQueue,
00113                     eNSAC_DynQueueAdmin },
00114         { { "qname", eNSPT_Id, eNSPA_Required } } },
00115 
00116     /*** Queue role ***/
00117     // STATUS job_key : id
00118     { "STATUS",   { &CNetScheduleHandler::x_ProcessStatus,
00119                     eNSCR_Queue },
00120         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00121     // STATUS2 job_key : id
00122     { "STATUS2",  { &CNetScheduleHandler::x_ProcessStatus,
00123                     eNSCR_Queue },
00124         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00125     // STAT [ option : id ] -- "ALL"
00126     { "STAT",     { &CNetScheduleHandler::x_ProcessStatistics,
00127                     eNSCR_Any },
00128         { { "option",  eNSPT_Id,  eNSPA_Optional },
00129           { "comment", eNSPT_Id,  eNSPA_Optional },
00130           { "aff",     eNSPT_Str, eNSPA_Optional },
00131           { "group",   eNSPT_Str, eNSPA_Optional } } },
00132     // MPUT job_key : id  progress_msg : str
00133     { "MPUT",     { &CNetScheduleHandler::x_ProcessPutMessage,
00134                     eNSCR_Queue },
00135         { { "job_key",      eNSPT_Id, eNSPA_Required },
00136           { "progress_msg", eNSPT_Str, eNSPA_Required } } },
00137     // MGET job_key : id
00138     { "MGET",     { &CNetScheduleHandler::x_ProcessGetMessage,
00139                     eNSCR_Queue },
00140         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00141     // DUMP [ job_key : id ]
00142     { "DUMP",     { &CNetScheduleHandler::x_ProcessDump,
00143                     eNSCR_Queue },
00144         { { "job_key",     eNSPT_Id,  eNSPA_Optional      },
00145           { "status",      eNSPT_Id,  eNSPA_Optional      },
00146           { "start_after", eNSPT_Id,  eNSPA_Optional      },
00147           { "count",       eNSPT_Int, eNSPA_Optional, "0" },
00148           { "group",       eNSPT_Str, eNSPA_Optional } } },
00149     // GETP [ client_info : id ]
00150     { "GETP",     { &CNetScheduleHandler::x_ProcessGetParam,
00151                     eNSCR_Queue } },
00152     { "GETC",     { &CNetScheduleHandler::x_ProcessGetConfiguration,
00153                     eNSCR_Queue } },
00154     // CLRN
00155     { "CLRN",     { &CNetScheduleHandler::x_ProcessClearWorkerNode,
00156                     eNSCR_Queue } },
00157     { "CANCELQ",  { &CNetScheduleHandler::x_ProcessCancelQueue,
00158                     eNSCR_Queue } },
00159 
00160     /*** Submitter role ***/
00161     // SST job_key : id -- submitter fast status, changes timestamp
00162     { "SST",      { &CNetScheduleHandler::x_ProcessFastStatusS,
00163                     eNSCR_Submitter },
00164         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00165     // SST2 job_key : id -- submitter fast status, changes timestamp
00166     { "SST2",     { &CNetScheduleHandler::x_ProcessFastStatusS,
00167                     eNSCR_Submitter },
00168         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00169     // SUBMIT input : str [ progress_msg : str ] [ port : uint [ timeout : uint ]]
00170     //        [ affinity_token : keystr(aff) ] [ job_mask : keyint(msk) ]
00171     { "SUBMIT",   { &CNetScheduleHandler::x_ProcessSubmit,
00172                     eNSCR_Submitter },
00173         { { "input",        eNSPT_Str, eNSPA_Required },        // input
00174           { "port",         eNSPT_Int, eNSPA_Optional },
00175           { "timeout",      eNSPT_Int, eNSPA_Optional },
00176           { "aff",          eNSPT_Str, eNSPA_Optional, "" },    // affinity_token
00177           { "msk",          eNSPT_Int, eNSPA_Optional, "0" },
00178           { "ip",           eNSPT_Str, eNSPA_Optional, "" },
00179           { "sid",          eNSPT_Str, eNSPA_Optional, "" },
00180           { "group",        eNSPT_Str, eNSPA_Optional, "" } } },
00181     // CANCEL job_key : id
00182     { "CANCEL",   { &CNetScheduleHandler::x_ProcessCancel,
00183                     eNSCR_Submitter },
00184         { { "job_key", eNSPT_Id,  eNSPA_Optional },
00185           { "group",   eNSPT_Str, eNSPA_Optional } } },
00186     // DROJ job_key : id
00187     { "DROJ",     { &CNetScheduleHandler::x_ProcessDropJob,
00188                     eNSCR_Submitter },
00189         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00190     { "BSUB",     { &CNetScheduleHandler::x_ProcessSubmitBatch,
00191                     eNSCR_Submitter },
00192         { { "port",         eNSPT_Int, eNSPA_Optional },
00193           { "timeout",      eNSPT_Int, eNSPA_Optional },
00194           { "ip",           eNSPT_Str, eNSPA_Optional, "" },
00195           { "sid",          eNSPT_Str, eNSPA_Optional, "" },
00196           { "group",        eNSPT_Str, eNSPA_Optional, "" } } },
00197     // READ [ timeout : int ] -> job key
00198     { "READ",     { &CNetScheduleHandler::x_ProcessReading,
00199                     eNSCR_Submitter },
00200         { { "timeout", eNSPT_Int, eNSPA_Optional, "0" },
00201           { "group",   eNSPT_Str, eNSPA_Optional, "" } } },
00202     // CFRM group : int jobs : str
00203     { "CFRM",     { &CNetScheduleHandler::x_ProcessConfirm,
00204                     eNSCR_Submitter },
00205         { { "job_key",    eNSPT_Id, eNSPA_Required },
00206           { "auth_token", eNSPT_Id, eNSPA_Required } } },
00207     // FRED group : int jobs : str [ message : str ]
00208     { "FRED",     { &CNetScheduleHandler::x_ProcessReadFailed,
00209                     eNSCR_Submitter },
00210         { { "job_key",    eNSPT_Id, eNSPA_Required },
00211           { "auth_token", eNSPT_Id, eNSPA_Required },
00212           { "err_msg",      eNSPT_Str, eNSPA_Optional } } },
00213     // RDRB group : int jobs : str
00214     { "RDRB",     { &CNetScheduleHandler::x_ProcessReadRollback,
00215                     eNSCR_Submitter },
00216         { { "job_key",    eNSPT_Id, eNSPA_Required },
00217           { "auth_token", eNSPT_Str, eNSPA_Required } } },
00218 
00219     /*** Worker node role ***/
00220     // WST job_key : id -- worker node fast status, does not change timestamp
00221     { "WST",      { &CNetScheduleHandler::x_ProcessFastStatusW,
00222                     eNSCR_Worker },
00223         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00224     // WST2 job_key : id -- worker node fast status, does not change timestamp
00225     { "WST2",     { &CNetScheduleHandler::x_ProcessFastStatusW,
00226                     eNSCR_Worker },
00227         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00228     // CHAFF [add: string] [del: string] -- change affinity
00229     { "CHAFF",    { &CNetScheduleHandler::x_ProcessChangeAffinity,
00230                     eNSCR_Worker },
00231         { { "add", eNSPT_Str, eNSPA_Optional, "" },
00232           { "del", eNSPT_Str, eNSPA_Optional, "" } } },
00233     // GET [ port : int ] [affinity_list : keystr(aff) ]
00234     { "GET",      { &CNetScheduleHandler::x_ProcessGetJob,
00235                     eNSCR_Worker },
00236         { { "port",      eNSPT_Id,  eNSPA_Optional },
00237           { "aff",       eNSPT_Str, eNSPA_Optional, "" } } },
00238     { "GET2",     { &CNetScheduleHandler::x_ProcessGetJob,
00239                     eNSCR_Worker },
00240         { { "wnode_aff",         eNSPT_Int, eNSPA_Required, 0 },
00241           { "any_aff",           eNSPT_Int, eNSPA_Required, 0 },
00242           { "exclusive_new_aff", eNSPT_Int, eNSPA_Optional, 0 },
00243           { "aff",               eNSPT_Str, eNSPA_Optional, "" },
00244           { "port",              eNSPT_Int, eNSPA_Optional },
00245           { "timeout",           eNSPT_Int, eNSPA_Optional } } },
00246     // PUT job_key : id  job_return_code : int  output : str
00247     { "PUT",      { &CNetScheduleHandler::x_ProcessPut,
00248                     eNSCR_Worker },
00249         { { "job_key",         eNSPT_Id,  eNSPA_Required },
00250           { "job_return_code", eNSPT_Id,  eNSPA_Required },
00251           { "output",          eNSPT_Str, eNSPA_Required } } },
00252     { "PUT2",     { &CNetScheduleHandler::x_ProcessPut,
00253                     eNSCR_Worker },
00254         { { "job_key",         eNSPT_Id,  eNSPA_Required },
00255           { "auth_token",      eNSPT_Id,  eNSPA_Required },
00256           { "job_return_code", eNSPT_Id,  eNSPA_Required },
00257           { "output",          eNSPT_Str, eNSPA_Required } } },
00258     // RETURN job_key : id
00259     { "RETURN",   { &CNetScheduleHandler::x_ProcessReturn,
00260                     eNSCR_Worker },
00261         { { "job_key", eNSPT_Id, eNSPA_Required } } },
00262     { "RETURN2",  { &CNetScheduleHandler::x_ProcessReturn,
00263                     eNSCR_Worker },
00264         { { "job_key",         eNSPT_Id,  eNSPA_Required },
00265           { "auth_token",      eNSPT_Id,  eNSPA_Required } } },
00266     // WGET port : uint  timeout : uint
00267     //      [affinity_list : keystr(aff) ]
00268     { "WGET",     { &CNetScheduleHandler::x_ProcessGetJob,
00269                     eNSCR_Worker },
00270         { { "port",      eNSPT_Int, eNSPA_Required },
00271           { "timeout",   eNSPT_Int, eNSPA_Required },
00272           { "aff",       eNSPT_Str, eNSPA_Optional, "" } } },
00273     { "CWGET",    { &CNetScheduleHandler::x_ProcessCancelWaitGet,
00274                     eNSCR_Worker } },
00275     // FPUT job_key : id  err_msg : str  output : str  job_return_code : int
00276     { "FPUT",     { &CNetScheduleHandler::x_ProcessPutFailure,
00277                     eNSCR_Worker },
00278         { { "job_key",         eNSPT_Id,  eNSPA_Required },
00279           { "err_msg",         eNSPT_Str, eNSPA_Required },
00280           { "output",          eNSPT_Str, eNSPA_Required },
00281           { "job_return_code", eNSPT_Int, eNSPA_Required } } },
00282     { "FPUT2",    { &CNetScheduleHandler::x_ProcessPutFailure,
00283                     eNSCR_Worker },
00284         { { "job_key",         eNSPT_Id,  eNSPA_Required },
00285           { "auth_token",      eNSPT_Id,  eNSPA_Required },
00286           { "err_msg",         eNSPT_Str, eNSPA_Required },
00287           { "output",          eNSPT_Str, eNSPA_Required },
00288           { "job_return_code", eNSPT_Int, eNSPA_Required } } },
00289     // JXCG [ job_key : id [ job_return_code : int [ output : str ] ] ]
00290     //      [affinity_list : keystr(aff) ]
00291     { "JXCG",     { &CNetScheduleHandler::x_ProcessJobExchange,
00292                     eNSCR_Worker },
00293         { { "job_key",         eNSPT_Id,  eNSPA_Optchain },
00294           { "job_return_code", eNSPT_Int, eNSPA_Optchain },
00295           { "output",          eNSPT_Str, eNSPA_Optional },
00296           { "aff",             eNSPT_Str, eNSPA_Optional, "" } } },
00297     // JDEX job_key : id timeout : uint
00298     { "JDEX",     { &CNetScheduleHandler::x_ProcessJobDelayExpiration,
00299                     eNSCR_Worker },
00300         { { "job_key", eNSPT_Id,  eNSPA_Required },
00301           { "timeout", eNSPT_Int, eNSPA_Required } } },
00302     // AFLS
00303     { "AFLS",     { &CNetScheduleHandler::x_ProcessGetAffinityList,
00304                     eNSCR_Worker } },
00305 
00306     // Obsolete commands
00307     { "REGC",     { &CNetScheduleHandler::x_CmdObsolete,
00308                     eNSCR_Worker },
00309         { { "port", eNSPT_Int, eNSPA_Optional } } },
00310     { "URGC",     { &CNetScheduleHandler::x_CmdObsolete,
00311                     eNSCR_Worker },
00312         { { "port", eNSPT_Int, eNSPA_Optional } } },
00313     { "INIT",     { &CNetScheduleHandler::x_CmdObsolete,
00314                     eNSCR_Worker } },
00315     { "JRTO",     { &CNetScheduleHandler::x_CmdNotImplemented,
00316                     eNSCR_Worker } },
00317     { "FRES",     { &CNetScheduleHandler::x_CmdNotImplemented,
00318                     eNSCR_Submitter } },
00319     { "QERY",     { &CNetScheduleHandler::x_CmdNotImplemented,
00320                     eNSCR_Queue } },
00321     { "QSEL",     { &CNetScheduleHandler::x_CmdNotImplemented,
00322                     eNSCR_Queue } },
00323     { "MONI",     { &CNetScheduleHandler::x_CmdNotImplemented,
00324                     eNSCR_Queue } },
00325     { "LOG",      { &CNetScheduleHandler::x_CmdNotImplemented,
00326                     eNSCR_Any } },
00327 
00328     { NULL },
00329 };
00330 
00331 
00332 static SNSProtoArgument s_AuthArgs[] = {
00333     { "client", eNSPT_Str, eNSPA_Optional, "Unknown client" },
00334     { "params", eNSPT_Str, eNSPA_Ellipsis },
00335     { NULL }
00336 };
00337 
00338 
00339 
00340 static void s_BufReadHelper(void* data, void* ptr, size_t size)
00341 {
00342     ((string*) data)->append((const char *) ptr, size);
00343 }
00344 
00345 
00346 static void s_ReadBufToString(BUF buf, string& str)
00347 {
00348     size_t      size = BUF_Size(buf);
00349 
00350     str.erase();
00351     str.reserve(size);
00352 
00353     BUF_PeekAtCB(buf, 0, s_BufReadHelper, &str, size);
00354     BUF_Read(buf, NULL, size);
00355 }
00356 
00357 
00358 CNetScheduleHandler::CNetScheduleHandler(CNetScheduleServer* server)
00359     : m_MsgBufferSize(kInitialMessageBufferSize),
00360       m_MsgBuffer(new char[kInitialMessageBufferSize]),
00361       m_Server(server),
00362       m_BatchSize(0),
00363       m_BatchPos(0),
00364       m_WithinBatchSubmit(false),
00365       m_SingleCmdParser(sm_CommandMap),
00366       m_BatchHeaderParser(sm_BatchHeaderMap),
00367       m_BatchEndParser(sm_BatchEndMap)
00368 {}
00369 
00370 
00371 CNetScheduleHandler::~CNetScheduleHandler()
00372 {
00373     delete [] m_MsgBuffer;
00374 }
00375 
00376 
00377 void CNetScheduleHandler::OnOpen(void)
00378 {
00379     CSocket &       socket = GetSocket();
00380     STimeout        to = {m_Server->GetInactivityTimeout(), 0};
00381 
00382     socket.DisableOSSendDelay();
00383     socket.SetTimeout(eIO_ReadWrite, &to);
00384     x_SetQuickAcknowledge();
00385 
00386     m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgAuth;
00387 
00388     // Log the fact of opened connection
00389     m_ConnContext.Reset(new CRequestContext());
00390     m_ConnContext->SetRequestID();
00391     m_ConnContext->SetClientIP(socket.GetPeerAddress(eSAF_IP));
00392 
00393     CDiagnosticsGuard   guard(this);
00394     if (m_Server->IsLog()) {
00395         GetDiagContext().PrintRequestStart()
00396                         .Print("_type", "conn")
00397                         .Flush();
00398     }
00399     m_ConnContext->SetRequestStatus(eStatus_OK);
00400 }
00401 
00402 
00403 void CNetScheduleHandler::OnWrite()
00404 {}
00405 
00406 
00407 void CNetScheduleHandler::OnClose(IServer_ConnectionHandler::EClosePeer peer)
00408 {
00409     if (m_WithinBatchSubmit) {
00410         m_WithinBatchSubmit = false;
00411         m_Server->DecrementCurrentSubmitsCounter();
00412     }
00413 
00414     // It's possible that this method will be called before OnOpen - when
00415     // connection is just created and server is shutting down. In this case
00416     // OnOpen will never be called.
00417     if (m_ConnContext.IsNull())
00418         return;
00419 
00420 
00421     CDiagnosticsGuard   guard(this);
00422 
00423     switch (peer)
00424     {
00425     case IServer_ConnectionHandler::eOurClose:
00426         if (m_DiagContext.NotNull()) {
00427             m_ConnContext->SetRequestStatus(m_DiagContext->GetRequestStatus());
00428         } else {
00429             if (m_ConnContext->GetRequestStatus() != eStatus_HTTPProbe &&
00430                 m_ConnContext->GetRequestStatus() != eStatus_BadRequest)
00431                 m_ConnContext->SetRequestStatus(eStatus_Inactive);
00432         }
00433         break;
00434     case IServer_ConnectionHandler::eClientClose:
00435         if (m_DiagContext.NotNull()) {
00436             m_DiagContext->SetRequestStatus(eStatus_SocketIOError);
00437             m_ConnContext->SetRequestStatus(eStatus_SocketIOError);
00438         }
00439         break;
00440     }
00441 
00442 
00443     if (m_Server->IsLog()) {
00444         CSocket&        socket = GetSocket();
00445         CDiagContext::SetRequestContext(m_ConnContext);
00446         m_ConnContext->SetBytesRd(socket.GetCount(eIO_Read));
00447         m_ConnContext->SetBytesWr(socket.GetCount(eIO_Write));
00448         GetDiagContext().PrintRequestStop();
00449     }
00450 
00451     m_ConnContext.Reset();
00452 }
00453 
00454 
00455 void CNetScheduleHandler::OnTimeout()
00456 {
00457     CDiagnosticsGuard       guard(this);
00458 
00459     if (m_Server->IsLog())
00460         m_ConnContext->SetRequestStatus(eStatus_Inactive);
00461 }
00462 
00463 
00464 void CNetScheduleHandler::OnOverflow(EOverflowReason reason)
00465 {
00466     switch (reason) {
00467     case eOR_ConnectionPoolFull:
00468         ERR_POST("eCommunicationError:Connection pool full");
00469         break;
00470     case eOR_UnpollableSocket:
00471         ERR_POST("eCommunicationError:Unpollable connection");
00472         break;
00473     case eOR_RequestQueueFull:
00474         ERR_POST("eCommunicationError:Request queue full");
00475         break;
00476     default:
00477         ERR_POST("eCommunicationError:Unknown overflow error");
00478         break;
00479     }
00480 }
00481 
00482 
00483 void CNetScheduleHandler::OnMessage(BUF buffer)
00484 {
00485     CDiagnosticsGuard   guard(this);
00486 
00487     if (m_Server->ShutdownRequested()) {
00488         x_WriteMessageNoThrow("ERR:eShuttingDown:NetSchedule server is shutting down. "
00489                               "Session aborted.");
00490         return;
00491     }
00492 
00493     try {
00494         // Single line user input processor
00495         (this->*m_ProcessMessage)(buffer);
00496     }
00497     catch (const CNetScheduleException &  ex) {
00498         x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00499                                       ":" + ex.GetMsg());
00500         ERR_POST("NetSchedule exception: " << ex);
00501         x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode());
00502     }
00503     catch (const CNSProtoParserException &  ex) {
00504         x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00505                                       ":" + ex.GetMsg());
00506         ERR_POST("Command parser error: " << ex);
00507         x_PrintRequestStop(eStatus_BadRequest);
00508     }
00509     catch (const CNetServiceException &  ex) {
00510         x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00511                                       ":" + ex.GetMsg());
00512         ERR_POST("NetService exception: " << ex);
00513         if (ex.GetErrCode() == CNetServiceException::eCommunicationError)
00514             x_PrintRequestStop(eStatus_SocketIOError);
00515         else
00516             x_PrintRequestStop(eStatus_ServerError);
00517     }
00518     catch (const CBDB_ErrnoException &  ex) {
00519         string  error;
00520         if (ex.IsRecovery())
00521             error = NStr::PrintableString("Fatal Berkeley DB error "
00522                                           "(DB_RUNRECOVERY). Emergency "
00523                                           "shutdown initiated. " +
00524                                           string(ex.what()));
00525         else
00526             error = NStr::PrintableString("Internal database error - " +
00527                                           string(ex.what()));
00528 
00529         x_WriteMessageNoThrow("ERR:", "eInternalError:" + error);
00530         ERR_POST("BDB error: " << ex);
00531         x_PrintRequestStop(eStatus_ServerError);
00532 
00533         if (ex.IsRecovery())
00534             m_Server->SetShutdownFlag();
00535     }
00536     catch (const CBDB_Exception &  ex) {
00537         string  error = NStr::PrintableString("eInternalError:Internal "
00538                                               "database (BDB) error - " +
00539                                               string(ex.what()));
00540         x_WriteMessageNoThrow("ERR:", error);
00541         ERR_POST("BDB error: " << ex);
00542         x_PrintRequestStop(eStatus_ServerError);
00543     }
00544     catch (const exception &  ex) {
00545         string  error = NStr::PrintableString("eInternalError:Internal "
00546                                               "error - " + string(ex.what()));
00547         x_WriteMessageNoThrow("ERR:", error);
00548         ERR_POST("STL exception: " << ex.what());
00549         x_PrintRequestStop(eStatus_ServerError);
00550     }
00551     catch (...) {
00552         x_WriteMessageNoThrow("ERR:eInternalError:Unknown server exception.");
00553         ERR_POST("ERR:Unknown server exception.");
00554         x_PrintRequestStop(eStatus_ServerError);
00555     }
00556 }
00557 
00558 
00559 void CNetScheduleHandler::x_SetQuickAcknowledge(void)
00560 {
00561     int     fd = 0;
00562     int     val = 1;
00563 
00564     GetSocket().GetOSHandle(&fd, sizeof(fd));
00565     setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, sizeof(val));
00566 }
00567 
00568 
00569 void CNetScheduleHandler::WriteMessage(CTempString  prefix,
00570                                        CTempString  msg)
00571 {
00572     size_t  prefix_size = prefix.size();
00573     size_t  msg_size = msg.size();
00574     if (msg[msg_size-1] == '\n')
00575         --msg_size;
00576     size_t  required_size = prefix_size + msg_size + 1;
00577 
00578     if (required_size > m_MsgBufferSize) {
00579         delete [] m_MsgBuffer;
00580         while (required_size > m_MsgBufferSize)
00581             m_MsgBufferSize += kMessageBufferIncrement;
00582         m_MsgBuffer = new char[m_MsgBufferSize];
00583     }
00584 
00585     memcpy(m_MsgBuffer, prefix.data(), prefix_size);
00586     memcpy(m_MsgBuffer + prefix_size, msg.data(), msg_size);
00587     m_MsgBuffer[required_size-1] = '\n';
00588 
00589     // Write to the socket as a single transaction
00590     size_t      n_written;
00591     if (GetSocket().Write(m_MsgBuffer, required_size,
00592                           &n_written) != eIO_Success) {
00593         NCBI_THROW(CNetServiceException,
00594                    eCommunicationError, "Socket write error; peer: " +
00595                                         GetSocket().GetPeerAddress());
00596     }
00597 }
00598 
00599 
00600 void CNetScheduleHandler::WriteMessage(CTempString msg)
00601 {
00602     size_t  msg_size = msg.size();
00603     if (msg[msg_size-1] == '\n')
00604         --msg_size;
00605     size_t  required_size = msg_size + 1;
00606 
00607     if (required_size > m_MsgBufferSize) {
00608         delete [] m_MsgBuffer;
00609         while (required_size > m_MsgBufferSize)
00610             m_MsgBufferSize += kMessageBufferIncrement;
00611         m_MsgBuffer = new char[m_MsgBufferSize];
00612     }
00613 
00614     memcpy(m_MsgBuffer, msg.data(), msg_size);
00615     m_MsgBuffer[required_size-1] = '\n';
00616 
00617     // Write to the socket as a single transaction
00618     size_t      n_written;
00619     if (GetSocket().Write(m_MsgBuffer, required_size,
00620                           &n_written) != eIO_Success) {
00621         NCBI_THROW(CNetServiceException,
00622                    eCommunicationError, "Socket write error; peer: " +
00623                                          GetSocket().GetPeerAddress());
00624     }
00625 }
00626 
00627 
00628 unsigned int  CNetScheduleHandler::x_WriteMessageNoThrow(CTempString  prefix,
00629                                                          CTempString  msg)
00630 {
00631     try {
00632         WriteMessage(prefix, msg);
00633     }
00634     catch (CNetServiceException&  ex) {
00635         ERR_POST(ex);
00636         return eStatus_SocketIOError;
00637     }
00638     catch (...) {
00639         ERR_POST("Unknown exception while writing into a socket.");
00640         return eStatus_ServerError;
00641     }
00642     return eStatus_OK;
00643 }
00644 
00645 unsigned int  CNetScheduleHandler::x_WriteMessageNoThrow(CTempString  msg)
00646 {
00647     try {
00648         WriteMessage(msg);
00649     }
00650     catch (CNetServiceException&  ex) {
00651         ERR_POST(ex);
00652         return eStatus_SocketIOError;
00653     }
00654     catch (...) {
00655         ERR_POST("Unknown exception while writing into a socket.");
00656         return eStatus_ServerError;
00657     }
00658     return eStatus_OK;
00659 }
00660 
00661 
00662 void CNetScheduleHandler::InitDiagnostics(void)
00663 {
00664     if (m_DiagContext.NotNull()) {
00665         CDiagContext::SetRequestContext(m_DiagContext);
00666     }
00667     else if (m_ConnContext.NotNull()) {
00668         CDiagContext::SetRequestContext(m_ConnContext);
00669     }
00670 }
00671 
00672 
00673 void CNetScheduleHandler::ResetDiagnostics(void)
00674 {
00675     CDiagContext::SetRequestContext(NULL);
00676 }
00677 
00678 
00679 unsigned int CNetScheduleHandler::x_GetPeerAddress(void)
00680 {
00681     unsigned int        peer_addr;
00682 
00683     GetSocket().GetPeerAddress(&peer_addr, 0, eNH_NetworkByteOrder);
00684 
00685     // always use localhost(127.0*) address for clients coming from
00686     // the same net address (sometimes it can be 127.* or full address)
00687     if (peer_addr == m_Server->GetHostNetAddr())
00688         return CSocketAPI::GetLoopbackAddress();
00689     return peer_addr;
00690 }
00691 
00692 
00693 void CNetScheduleHandler::x_ProcessMsgAuth(BUF buffer)
00694 {
00695     // This should only memorize the received string.
00696     // The x_ProcessMsgQueue(...)  will parse it.
00697     // This is done to avoid copying parsed parameters and to have exactly one
00698     // diagnostics extra with both auth parameters and queue name.
00699     s_ReadBufToString(buffer, m_RawAuthString);
00700 
00701     // Check if it was systems probe...
00702     if (strncmp(m_RawAuthString.c_str(), "GET / HTTP/1.", 13) == 0) {
00703         // That was systems probing ports
00704 
00705         if (m_ConnContext.NotNull())
00706             m_ConnContext->SetRequestStatus(eStatus_HTTPProbe);
00707 
00708         m_Server->CloseConnection(&GetSocket());
00709         return;
00710     }
00711 
00712     m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgQueue;
00713     x_SetQuickAcknowledge();
00714 }
00715 
00716 
00717 void CNetScheduleHandler::x_ProcessMsgQueue(BUF buffer)
00718 {
00719     s_ReadBufToString(buffer, m_QueueName);
00720 
00721     // Parse saved raw authorization string and produce log output
00722     TNSProtoParams      params;
00723     bool                parse_failed = false;
00724 
00725     try {
00726         m_SingleCmdParser.ParseArguments(m_RawAuthString, s_AuthArgs, &params);
00727     }
00728     catch (CNSProtoParserException &  ex) {
00729         ERR_POST("Error authenticating client: '" <<
00730                  m_RawAuthString << "': " << ex);
00731         parse_failed = true;
00732     }
00733 
00734     // Memorize what we know about the client
00735     try {
00736         m_ClientId.Update(this->x_GetPeerAddress(), params);
00737         if (parse_failed)
00738             m_ClientId.SetClientName(m_RawAuthString);
00739     }
00740     catch (const CNetScheduleException &  ex) {
00741         if (ex.GetErrCode() == CNetScheduleException::eAuthenticationError) {
00742             x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00743                                           ":" + ex.GetMsg());
00744             ERR_POST("Server error: " << ex);
00745             if (m_Server->IsLog()) {
00746                 CDiagContext::GetRequestContext().SetRequestStatus(
00747                                                 ex.ErrCodeToHTTPStatusCode());
00748                 GetDiagContext().PrintRequestStop();
00749                 m_ConnContext.Reset();
00750             }
00751             m_Server->CloseConnection(&GetSocket());
00752             return;
00753         }
00754         throw;
00755     }
00756 
00757 
00758     if (m_Server->AdminHostValid(m_ClientId.GetAddress()) &&
00759         m_Server->IsAdminClientName(m_ClientId.GetClientName()))
00760     {
00761         // TODO: queue admin should be checked in ProcessMsgQueue,
00762         // when queue info is available
00763         m_ClientId.AddCapability(eNSAC_Admin | eNSAC_QueueAdmin);
00764     }
00765 
00766     // Produce the log output if required
00767     if (m_Server->IsLog()) {
00768         CDiagContext_Extra diag_extra = GetDiagContext().Extra();
00769         diag_extra.Print("queue", m_QueueName);
00770         ITERATE(TNSProtoParams, it, params) {
00771             diag_extra.Print(it->first, it->second);
00772         }
00773     }
00774 
00775     // Empty queue name is a synonim for hardcoded 'noname'.
00776     // To have exactly one string comparison, make the name empty if 'noname'
00777     if (m_QueueName == "noname" )
00778         m_QueueName = "";
00779 
00780     if (!m_QueueName.empty()) {
00781         try {
00782             m_QueueRef.Reset(m_Server->OpenQueue(m_QueueName));
00783         }
00784         catch (const CNetScheduleException &  ex) {
00785             if (ex.GetErrCode() == CNetScheduleException::eUnknownQueue) {
00786                 x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00787                                               ":" + ex.GetMsg());
00788                 ERR_POST("Server error: " << ex);
00789                 if (m_Server->IsLog()) {
00790                     CDiagContext::GetRequestContext().SetRequestStatus(ex.ErrCodeToHTTPStatusCode());
00791                     GetDiagContext().PrintRequestStop();
00792                     m_ConnContext.Reset();
00793                 }
00794                 m_Server->CloseConnection(&GetSocket());
00795                 return;
00796             }
00797             throw;
00798         }
00799 
00800         m_ClientId.AddCapability(eNSAC_Queue);
00801 
00802         CRef<CQueue>    q(GetQueue());
00803         if (q->IsWorkerAllowed(m_ClientId.GetAddress()))
00804             m_ClientId.AddCapability(eNSAC_Worker);
00805         if (q->IsSubmitAllowed(m_ClientId.GetAddress()))
00806             m_ClientId.AddCapability(eNSAC_Submitter);
00807     }
00808     else
00809         m_QueueRef.Reset(NULL);
00810 
00811     m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00812     x_SetQuickAcknowledge();
00813 }
00814 
00815 
00816 // Workhorse method
00817 void CNetScheduleHandler::x_ProcessMsgRequest(BUF buffer)
00818 {
00819     m_DiagContext.Reset(new CRequestContext());
00820     m_DiagContext->SetRequestID();
00821     InitDiagnostics();
00822 
00823     SParsedCmd      cmd;
00824     string          msg;
00825     try {
00826         s_ReadBufToString(buffer, msg);
00827         cmd = m_SingleCmdParser.ParseCommand(msg);
00828         x_PrintRequestStart(cmd);
00829     }
00830     catch (const CNSProtoParserException &  ex) {
00831         // Parsing is done before PrintRequestStart(...) so a diag context is
00832         // not created here - create it just to provide an error output
00833         x_OnCmdParserError(true, ex.GetMsg(), "");
00834         return;
00835     }
00836 
00837     const SCommandExtra &   extra = cmd.command->extra;
00838 
00839 
00840     // It throws an exception if the input is not valid
00841     m_CommandArguments.AssignValues(cmd.params, cmd.command->cmd);
00842 
00843 
00844     if (extra.processor == &CNetScheduleHandler::x_ProcessQuitSession) {
00845         m_ClientId.CheckAccess(extra.role, NULL);
00846         x_ProcessQuitSession(0);
00847         return;
00848     }
00849 
00850 
00851     // If the command requires queue, hold a hard reference to this
00852     // queue from a weak one (m_Queue) in queue_ref, and take C pointer
00853     // into queue_ptr. Otherwise queue_ptr is NULL, which is OK for
00854     // commands which does not require a queue.
00855     CRef<CQueue>        queue_ref;
00856     CQueue *            queue_ptr = NULL;
00857     if (extra.role & eNSAC_Queue) {
00858         if (m_QueueName.empty())
00859             NCBI_THROW(CNetScheduleException, eUnknownQueue, "Job queue is required");
00860         queue_ref.Reset(GetQueue());
00861         queue_ptr = queue_ref.GetPointer();
00862     }
00863     else if (extra.processor == &CNetScheduleHandler::x_ProcessStatistics ||
00864              extra.processor == &CNetScheduleHandler::x_ProcessRefuseSubmits) {
00865         // The STAT and REFUSESUBMITS commands could be with or without a queue
00866         try {
00867             queue_ref.Reset(GetQueue());
00868             queue_ptr = queue_ref.GetPointer();
00869         }
00870         catch (...) {
00871             // That means no queue were found, i.e.
00872             // the STAT or REFUSESUBMITS is for whole server
00873         }
00874     }
00875 
00876     m_ClientId.CheckAccess(extra.role, queue_ptr);
00877 
00878     // we want status request to be fast, skip version control
00879     if ((extra.processor != &CNetScheduleHandler::x_ProcessStatus) &&
00880         (extra.processor != &CNetScheduleHandler::x_ProcessFastStatusS) &&
00881         (extra.processor != &CNetScheduleHandler::x_ProcessFastStatusW))
00882         if (!m_ClientId.CheckVersion(queue_ptr)) {
00883             WriteMessage("ERR:eInvalidClientOrVersion:");
00884             m_Server->CloseConnection(&GetSocket());
00885             return;
00886         }
00887 
00888     if (queue_ptr)
00889         // The cient has a queue, so memorize the client
00890         queue_ptr->TouchClientsRegistry(m_ClientId);
00891 
00892     // Execute the command
00893     (this->*extra.processor)(queue_ptr);
00894 }
00895 
00896 
00897 // Message processors for x_ProcessSubmitBatch
00898 void CNetScheduleHandler::x_ProcessMsgBatchHeader(BUF buffer)
00899 {
00900     // Expecting BTCH size | ENDS
00901     try {
00902         string          msg;
00903         s_ReadBufToString(buffer, msg);
00904 
00905         SParsedCmd      cmd      = m_BatchHeaderParser.ParseCommand(msg);
00906         const string &  size_str = cmd.params["size"];
00907 
00908         if (!size_str.empty())
00909             m_BatchSize = NStr::StringToInt(size_str);
00910         else
00911             m_BatchSize = 0;
00912         (this->*cmd.command->extra.processor)(0);
00913     }
00914     catch (const CNSProtoParserException &  ex) {
00915         m_WithinBatchSubmit = false;
00916         m_Server->DecrementCurrentSubmitsCounter();
00917         x_OnCmdParserError(false, ex.GetMsg(), ", BTCH or ENDS expected");
00918         return;
00919     }
00920     catch (const CNetScheduleException &  ex) {
00921         m_WithinBatchSubmit = false;
00922         m_Server->DecrementCurrentSubmitsCounter();
00923         x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00924                                       ":" + ex.GetMsg());
00925         ERR_POST("Server error: " << ex);
00926         x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode());
00927 
00928         m_BatchJobs.clear();
00929         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00930     }
00931     catch (const CException &  ex) {
00932         m_WithinBatchSubmit = false;
00933         m_Server->DecrementCurrentSubmitsCounter();
00934         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:"
00935                                       "Error processing BTCH or ENDS.");
00936         ERR_POST("Error processing command: " << ex);
00937         x_PrintRequestStop(eStatus_BadRequest);
00938 
00939         m_BatchJobs.clear();
00940         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00941     }
00942     catch (...) {
00943         m_WithinBatchSubmit = false;
00944         m_Server->DecrementCurrentSubmitsCounter();
00945         x_WriteMessageNoThrow("ERR:", "eInternalError:Unknown error "
00946                                       "while expecting BTCH or ENDS.");
00947         ERR_POST("Unknown error while expecting BTCH or ENDS");
00948         x_PrintRequestStop(eStatus_BadRequest);
00949 
00950         m_BatchJobs.clear();
00951         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00952     }
00953 }
00954 
00955 
00956 void CNetScheduleHandler::x_ProcessMsgBatchJob(BUF buffer)
00957 {
00958     // Expecting:
00959     // "input" [aff="affinity_token"] [msk=1]
00960     string          msg;
00961     s_ReadBufToString(buffer, msg);
00962 
00963     CJob &          job = m_BatchJobs[m_BatchPos].first;
00964     TNSProtoParams  params;
00965     try {
00966         m_BatchEndParser.ParseArguments(msg, s_BatchArgs, &params);
00967         m_CommandArguments.AssignValues(params);
00968     }
00969     catch (const CNSProtoParserException &  ex) {
00970         m_WithinBatchSubmit = false;
00971         m_Server->DecrementCurrentSubmitsCounter();
00972         x_OnCmdParserError(false, ex.GetMsg(), "");
00973         return;
00974     }
00975     catch (const CNetScheduleException &  ex) {
00976         m_WithinBatchSubmit = false;
00977         m_Server->DecrementCurrentSubmitsCounter();
00978         x_WriteMessageNoThrow("ERR:", string(ex.GetErrCodeString()) +
00979                                       ":" + ex.GetMsg());
00980         ERR_POST(ex.GetMsg());
00981         x_PrintRequestStop(ex.ErrCodeToHTTPStatusCode());
00982 
00983         m_BatchJobs.clear();
00984         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00985         return;
00986     }
00987     catch (const CException &  ex) {
00988         m_WithinBatchSubmit = false;
00989         m_Server->DecrementCurrentSubmitsCounter();
00990         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:"
00991                                       "Invalid batch submission, syntax error");
00992         ERR_POST("Error processing command: " << ex);
00993         x_PrintRequestStop(eStatus_BadRequest);
00994 
00995         m_BatchJobs.clear();
00996         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
00997         return;
00998     }
00999     catch (...) {
01000         m_WithinBatchSubmit = false;
01001         m_Server->DecrementCurrentSubmitsCounter();
01002         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:"
01003                                       "Arguments parsing unknown exception");
01004         ERR_POST("Arguments parsing unknown exception. Batch submit is rejected.");
01005         x_PrintRequestStop(eStatus_BadRequest);
01006 
01007         m_BatchJobs.clear();
01008         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01009         return;
01010     }
01011 
01012     job.SetInput(NStr::ParseEscapes(m_CommandArguments.input));
01013 
01014     // Memorize the job affinity if given
01015     if ( !m_CommandArguments.affinity_token.empty() )
01016         m_BatchJobs[m_BatchPos].second =
01017                     NStr::ParseEscapes(m_CommandArguments.affinity_token);
01018 
01019     job.SetMask(m_CommandArguments.job_mask);
01020     job.SetSubmNotifPort(m_BatchSubmPort);
01021     job.SetSubmNotifTimeout(m_BatchSubmTimeout);
01022     job.SetClientIP(m_BatchClientIP);
01023     job.SetClientSID(m_BatchClientSID);
01024 
01025     if (++m_BatchPos >= m_BatchSize)
01026         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchSubmit;
01027 }
01028 
01029 
01030 void CNetScheduleHandler::x_ProcessMsgBatchSubmit(BUF buffer)
01031 {
01032     // Expecting ENDB
01033     try {
01034         string      msg;
01035         s_ReadBufToString(buffer, msg);
01036         m_BatchEndParser.ParseCommand(msg);
01037     }
01038     catch (const CNSProtoParserException &  ex) {
01039         m_WithinBatchSubmit = false;
01040         m_Server->DecrementCurrentSubmitsCounter();
01041         x_OnCmdParserError(false, ex.GetMsg(), ", ENDB expected");
01042         return;
01043     }
01044     catch (const CException &  ex) {
01045         m_WithinBatchSubmit = false;
01046         m_Server->DecrementCurrentSubmitsCounter();
01047         BUF_Read(buffer, 0, BUF_Size(buffer));
01048         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:"
01049                               "Batch submit error - unexpected end of batch");
01050         ERR_POST("Error processing command: " << ex);
01051         x_PrintRequestStop(eStatus_BadRequest);
01052 
01053         m_BatchJobs.clear();
01054         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01055         return;
01056     }
01057     catch (...) {
01058         m_WithinBatchSubmit = false;
01059         m_Server->DecrementCurrentSubmitsCounter();
01060         x_WriteMessageNoThrow("ERR:", "eInternalError:"
01061                               "Unknown error while expecting ENDB.");
01062         ERR_POST("Unknown error while expecting ENDB.");
01063         x_PrintRequestStop(eStatus_BadRequest);
01064 
01065         m_BatchJobs.clear();
01066         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01067         return;
01068     }
01069 
01070     double      comm_elapsed = m_BatchStopWatch.Elapsed();
01071 
01072     // BTCH logging is in a separate context
01073     CRef<CRequestContext>   current_context(& CDiagContext::GetRequestContext());
01074     try {
01075         if (m_Server->IsLog()) {
01076             CRequestContext *   ctx(new CRequestContext());
01077             ctx->SetRequestID();
01078             GetDiagContext().SetRequestContext(ctx);
01079             GetDiagContext().PrintRequestStart()
01080                             .Print("_type", "cmd")
01081                             .Print("bsub", m_DiagContext->GetRequestID())
01082                             .Print("cmd", "BTCH")
01083                             .Print("size", m_BatchJobs.size());
01084             ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
01085         }
01086 
01087         // we have our batch now
01088         CStopWatch  sw(CStopWatch::eStart);
01089         unsigned    job_id = GetQueue()->SubmitBatch(m_ClientId,
01090                                                      m_BatchJobs,
01091                                                      m_BatchGroup);
01092         double      db_elapsed = sw.Elapsed();
01093 
01094         if (m_Server->IsLog())
01095             GetDiagContext().Extra()
01096                 .Print("start_job", job_id)
01097                 .Print("commit_time",
01098                        NStr::DoubleToString(comm_elapsed, 4, NStr::fDoubleFixed))
01099                 .Print("transaction_time",
01100                        NStr::DoubleToString(db_elapsed, 4, NStr::fDoubleFixed));
01101 
01102         WriteMessage("OK:", NStr::UIntToString(job_id) + " " +
01103                             m_Server->GetHost().c_str() + " " +
01104                             NStr::UIntToString(unsigned(m_Server->GetPort())));
01105 
01106     }
01107     catch (const CNetScheduleException &  ex) {
01108         m_WithinBatchSubmit = false;
01109         m_Server->DecrementCurrentSubmitsCounter();
01110         if (m_Server->IsLog()) {
01111             CDiagContext::GetRequestContext().SetRequestStatus(ex.ErrCodeToHTTPStatusCode());
01112             GetDiagContext().PrintRequestStop();
01113             GetDiagContext().SetRequestContext(current_context);
01114         }
01115         m_BatchJobs.clear();
01116         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01117         throw;
01118     }
01119     catch (...) {
01120         m_WithinBatchSubmit = false;
01121         m_Server->DecrementCurrentSubmitsCounter();
01122         if (m_Server->IsLog()) {
01123             CDiagContext::GetRequestContext().SetRequestStatus(eStatus_ServerError);
01124             GetDiagContext().PrintRequestStop();
01125             GetDiagContext().SetRequestContext(current_context);
01126         }
01127         m_BatchJobs.clear();
01128         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01129         throw;
01130     }
01131 
01132     if (m_Server->IsLog()) {
01133         GetDiagContext().PrintRequestStop();
01134         GetDiagContext().SetRequestContext(current_context);
01135     }
01136 
01137     m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchHeader;
01138 }
01139 
01140 
01141 //////////////////////////////////////////////////////////////////////////
01142 // Process* methods for processing commands
01143 
01144 void CNetScheduleHandler::x_ProcessFastStatusS(CQueue* q)
01145 {
01146     bool            cmdv2(m_CommandArguments.cmd == "SST2");
01147     time_t          lifetime;
01148     TJobStatus      status = q->GetStatusAndLifetime(m_CommandArguments.job_id,
01149                                                      true, &lifetime);
01150 
01151 
01152     if (status == CNetScheduleAPI::eJobNotFound) {
01153         if (cmdv2)
01154             WriteMessage("ERR:eJobNotFound:");
01155         else
01156             WriteMessage("OK:", NStr::IntToString((int) status));
01157 
01158         ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: "
01159                          << m_CommandArguments.job_key);
01160         x_PrintRequestStop(eStatus_NotFound);
01161     } else {
01162         if (cmdv2)
01163             WriteMessage(
01164                     "OK:job_status=" +
01165                     CNetScheduleAPI::StatusToString(status) +
01166                     "&job_exptime=" + NStr::NumericToString(lifetime)
01167                     );
01168         else
01169             WriteMessage("OK:", NStr::IntToString((int) status));
01170         x_PrintRequestStop(eStatus_OK);
01171     }
01172 }
01173 
01174 
01175 void CNetScheduleHandler::x_ProcessFastStatusW(CQueue* q)
01176 {
01177     bool            cmdv2(m_CommandArguments.cmd == "WST2");
01178     time_t          lifetime;
01179     TJobStatus      status = q->GetStatusAndLifetime(m_CommandArguments.job_id,
01180                                                      false, &lifetime);
01181 
01182 
01183     if (status == CNetScheduleAPI::eJobNotFound) {
01184         if (cmdv2)
01185             WriteMessage("ERR:eJobNotFound:");
01186         else
01187             WriteMessage("OK:", NStr::IntToString((int) status));
01188 
01189         ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: "
01190                          << m_CommandArguments.job_key);
01191         x_PrintRequestStop(eStatus_NotFound);
01192     } else {
01193         if (cmdv2)
01194             WriteMessage(
01195                     "OK:job_status=" +
01196                     CNetScheduleAPI::StatusToString(status) +
01197                     "&job_exptime=" + NStr::NumericToString(lifetime)
01198                     );
01199         else
01200             WriteMessage("OK:", NStr::IntToString((int) status));
01201         x_PrintRequestStop(eStatus_OK);
01202     }
01203 }
01204 
01205 
01206 void CNetScheduleHandler::x_ProcessChangeAffinity(CQueue* q)
01207 {
01208     // This functionality requires client name and the session
01209     x_CheckNonAnonymousClient("use CHAFF command");
01210 
01211     if (m_CommandArguments.aff_to_add.empty() &&
01212         m_CommandArguments.aff_to_del.empty()) {
01213         ERR_POST(Warning << "CHAFF with neither add list nor del list");
01214         x_WriteMessageNoThrow("ERR:eInvalidParameter:");
01215         x_PrintRequestStop(eStatus_BadRequest);
01216         return;
01217     }
01218 
01219     list<string>    aff_to_add_list;
01220     list<string>    aff_to_del_list;
01221 
01222     NStr::Split(NStr::ParseEscapes(m_CommandArguments.aff_to_add),
01223                 "\t,", aff_to_add_list, NStr::eNoMergeDelims);
01224     NStr::Split(NStr::ParseEscapes(m_CommandArguments.aff_to_del),
01225                 "\t,", aff_to_del_list, NStr::eNoMergeDelims);
01226 
01227     string  msg = q->ChangeAffinity(m_ClientId, aff_to_add_list,
01228                                                 aff_to_del_list);
01229     if (msg.empty())
01230         WriteMessage("OK:");
01231     else
01232         WriteMessage("OK:WARNING:", msg + ";");
01233     x_PrintRequestStop(eStatus_OK);
01234 }
01235 
01236 
01237 void CNetScheduleHandler::x_ProcessSubmit(CQueue* q)
01238 {
01239     if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) {
01240         WriteMessage("ERR:eSubmitsDisabled:");
01241         x_PrintRequestStop(eStatus_SubmitRefused);
01242         return;
01243     }
01244 
01245     if (m_Server->IncrementCurrentSubmitsCounter() < kSubmitCounterInitialValue) {
01246         // This is a drained shutdown mode
01247         m_Server->DecrementCurrentSubmitsCounter();
01248         WriteMessage("ERR:eSubmitsDisabled:");
01249         x_PrintRequestStop(eStatus_SubmitRefused);
01250         return;
01251     }
01252 
01253 
01254     CJob        job(m_CommandArguments);
01255 
01256     // Never leave Client IP empty, if we're not provided with a real one,
01257     // use peer address as a last resort. See also x_ProcessSubmitBatch.
01258     if (m_CommandArguments.ip.empty())
01259         job.SetClientIP(NS_FormatIPAddress(m_ClientId.GetAddress()));
01260 
01261     try {
01262         WriteMessage("OK:", q->MakeKey(q->Submit(m_ClientId, job,
01263                                              m_CommandArguments.affinity_token,
01264                                              m_CommandArguments.group)));
01265         m_Server->DecrementCurrentSubmitsCounter();
01266     } catch (...) {
01267         m_Server->DecrementCurrentSubmitsCounter();
01268         throw;
01269     }
01270 
01271     // There is no need to log the job key, it is logged at lower level
01272     // together with all the submitted job parameters
01273     x_PrintRequestStop(eStatus_OK);
01274 }
01275 
01276 
01277 void CNetScheduleHandler::x_ProcessSubmitBatch(CQueue* q)
01278 {
01279     if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) {
01280         WriteMessage("ERR:eSubmitsDisabled:");
01281         x_PrintRequestStop(eStatus_SubmitRefused);
01282         return;
01283     }
01284 
01285     if (m_Server->IncrementCurrentSubmitsCounter() < kSubmitCounterInitialValue) {
01286         // This is a drained shutdown mode
01287         m_Server->DecrementCurrentSubmitsCounter();
01288         WriteMessage("ERR:eSubmitsDisabled:");
01289         x_PrintRequestStop(eStatus_SubmitRefused);
01290         return;
01291     }
01292 
01293     try {
01294         // Memorize the fact that batch submit started
01295         m_WithinBatchSubmit = true;
01296 
01297         m_BatchSubmPort    = m_CommandArguments.port;
01298         m_BatchSubmTimeout = m_CommandArguments.timeout;
01299         if (!m_CommandArguments.ip.empty())
01300             m_BatchClientIP = m_CommandArguments.ip;
01301         else
01302             m_BatchClientIP = NS_FormatIPAddress(m_ClientId.GetAddress());
01303         m_BatchClientSID   = m_CommandArguments.sid;
01304         m_BatchGroup       = m_CommandArguments.group;
01305 
01306         WriteMessage("OK:Batch submit ready");
01307         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchHeader;
01308     }
01309     catch (...) {
01310         // WriteMessage can generate an exception
01311         m_WithinBatchSubmit = false;
01312         m_Server->DecrementCurrentSubmitsCounter();
01313         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01314         throw;
01315     }
01316 }
01317 
01318 
01319 void CNetScheduleHandler::x_ProcessBatchStart(CQueue*)
01320 {
01321     m_BatchPos = 0;
01322     m_BatchStopWatch.Restart();
01323     m_BatchJobs.resize(m_BatchSize);
01324     if (m_BatchSize)
01325         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchJob;
01326     else
01327         // Unfortunately, because batches can be generated by
01328         // client program, we better honor zero size ones.
01329         // Skip right to waiting for 'ENDB'.
01330         m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgBatchSubmit;
01331 }
01332 
01333 
01334 void CNetScheduleHandler::x_ProcessBatchSequenceEnd(CQueue*)
01335 {
01336     m_WithinBatchSubmit = false;
01337     m_Server->DecrementCurrentSubmitsCounter();
01338     m_BatchJobs.clear();
01339     m_ProcessMessage = &CNetScheduleHandler::x_ProcessMsgRequest;
01340     WriteMessage("OK:");
01341     x_PrintRequestStop(eStatus_OK);
01342 }
01343 
01344 
01345 void CNetScheduleHandler::x_ProcessCancel(CQueue* q)
01346 {
01347     // Job key or a group must be given
01348     if (m_CommandArguments.job_id == 0 && m_CommandArguments.group.empty()) {
01349         x_WriteMessageNoThrow("ERR:eInvalidParameter:"
01350                               "Job key or group must be given");
01351         LOG_POST(Message << Warning << "CANCEL must have a job key or a group");
01352         x_PrintRequestStop(eStatus_BadRequest);
01353         return;
01354     }
01355 
01356     if (m_CommandArguments.job_id != 0 && !m_CommandArguments.group.empty()) {
01357         x_WriteMessageNoThrow("ERR:eInvalidParameter:"
01358                               "CANCEL can accept a job key or "
01359                               "a group but not both");
01360         LOG_POST(Message << Warning << "CANCEL must have only one "
01361                                        "argument - a job key or a group");
01362         x_PrintRequestStop(eStatus_BadRequest);
01363         return;
01364     }
01365 
01366     // Here: one argument is given - a job key or a group
01367     if (!m_CommandArguments.group.empty()) {
01368         // CANCEL for a group
01369         q->CancelGroup(m_ClientId, m_CommandArguments.group);
01370         WriteMessage("OK:");
01371         x_PrintRequestStop(eStatus_OK);
01372         return;
01373     }
01374 
01375     // Here: CANCEL for a job
01376     switch (q->Cancel(m_ClientId, m_CommandArguments.job_id)) {
01377         case CNetScheduleAPI::eJobNotFound:
01378             x_WriteMessageNoThrow("OK:WARNING:Job not found;");
01379             ERR_POST(Warning << "CANCEL for unknown job: "
01380                              << m_CommandArguments.job_key);
01381             x_PrintRequestStop(eStatus_NotFound);
01382             break;
01383         case CNetScheduleAPI::eCanceled:
01384             WriteMessage("OK:WARNING:Already canceled;");
01385             x_PrintRequestStop(eStatus_OK);
01386             break;
01387         default:
01388             WriteMessage("OK:");
01389             x_PrintRequestStop(eStatus_OK);
01390     }
01391 }
01392 
01393 
01394 void CNetScheduleHandler::x_ProcessStatus(CQueue* q)
01395 {
01396     CJob            job;
01397     bool            cmdv2 = (m_CommandArguments.cmd == "STATUS2");
01398     time_t          lifetime;
01399 
01400     if (q->ReadAndTouchJob(m_CommandArguments.job_id, job, &lifetime) ==
01401                 CNetScheduleAPI::eJobNotFound) {
01402         // Here: there is no such a job
01403         if (cmdv2)
01404             x_WriteMessageNoThrow("ERR:eJobNotFound:");
01405         else
01406             x_WriteMessageNoThrow("OK:",
01407                      NStr::IntToString((int) CNetScheduleAPI::eJobNotFound));
01408         ERR_POST(Warning << m_CommandArguments.cmd << " for unknown job: "
01409                          << m_CommandArguments.job_key);
01410         x_PrintRequestStop(eStatus_NotFound);
01411         return;
01412     }
01413 
01414     // Here: the job was found
01415     if (cmdv2)
01416         WriteMessage("OK:",
01417                      "job_status=" + CNetScheduleAPI::StatusToString(job.GetStatus()) +
01418                      "&job_exptime=" + NStr::NumericToString(lifetime) +
01419                      "&ret_code=" + NStr::IntToString(job.GetRetCode()) +
01420                      "&output=" + NStr::URLEncode(job.GetOutput()) +
01421                      "&err_msg=" + NStr::URLEncode(job.GetErrorMsg()) +
01422                      "&input=" + NStr::URLEncode(job.GetInput())
01423                     );
01424 
01425     else
01426         WriteMessage("OK:", NStr::IntToString((int) job.GetStatus()) +
01427                             " " + NStr::IntToString(job.GetRetCode()) +
01428                             " \""   + NStr::PrintableString(job.GetOutput()) +
01429                             "\" \"" + NStr::PrintableString(job.GetErrorMsg()) +
01430                             "\" \"" + NStr::PrintableString(job.GetInput()) +
01431                             "\"");
01432     x_PrintRequestStop(eStatus_OK);
01433 }
01434 
01435 
01436 void CNetScheduleHandler::x_ProcessGetJob(CQueue* q)
01437 {
01438     // GET & WGET are first versions of the command
01439     bool    cmdv2(m_CommandArguments.cmd == "GET2");
01440 
01441     if (cmdv2) {
01442         x_CheckNonAnonymousClient("use GET2 command");
01443         x_CheckPortAndTimeout();
01444         x_CheckGetParameters();
01445     }
01446     else {
01447         // The affinity options are only for the second version of the command
01448         m_CommandArguments.wnode_affinity = false;
01449         m_CommandArguments.exclusive_new_aff = false;
01450 
01451         // The old clients must have any_affinity set to true
01452         // depending on the explicit affinity - to conform the old behavior
01453         m_CommandArguments.any_affinity = m_CommandArguments.affinity_token.empty();
01454     }
01455 
01456     list<string>    aff_list;
01457     NStr::Split(NStr::ParseEscapes(m_CommandArguments.affinity_token),
01458                 "\t,", aff_list, NStr::eNoMergeDelims);
01459 
01460     CJob            job;
01461     if (q->GetJobOrWait(m_ClientId,
01462                         m_CommandArguments.port,
01463                         m_CommandArguments.timeout,
01464                         time(0), &aff_list,
01465                         m_CommandArguments.wnode_affinity,
01466                         m_CommandArguments.any_affinity,
01467                         m_CommandArguments.exclusive_new_aff,
01468                         cmdv2,
01469                         &job) == false) {
01470         // Preferred affinities were reset for the client, so no job
01471         // and bad request
01472         WriteMessage("ERR:ePrefAffExpired:");
01473         x_PrintRequestStop(eStatus_BadRequest);
01474     } else {
01475         x_PrintGetJobResponse(q, job, cmdv2);
01476         x_PrintRequestStop(eStatus_OK);
01477     }
01478     return;
01479 }
01480 
01481 
01482 void CNetScheduleHandler::x_ProcessCancelWaitGet(CQueue* q)
01483 {
01484     x_CheckNonAnonymousClient("cancel waiting after WGET");
01485 
01486     q->CancelWaitGet(m_ClientId);
01487     WriteMessage("OK:");
01488     x_PrintRequestStop(eStatus_OK);
01489 }
01490 
01491 
01492 void CNetScheduleHandler::x_ProcessPut(CQueue* q)
01493 {
01494     bool    cmdv2(m_CommandArguments.cmd == "PUT2");
01495 
01496     if (cmdv2) {
01497         x_CheckNonAnonymousClient("use PUT2 command");
01498         x_CheckAuthorizationToken();
01499     }
01500 
01501     string      output = NStr::ParseEscapes(m_CommandArguments.output);
01502     TJobStatus  old_status = q->PutResult(m_ClientId, time(0),
01503                                           m_CommandArguments.job_id,
01504                                           m_CommandArguments.auth_token,
01505                                           m_CommandArguments.job_return_code,
01506                                           &output);
01507     if (old_status == CNetScheduleAPI::ePending ||
01508         old_status == CNetScheduleAPI::eRunning) {
01509         WriteMessage("OK:");
01510         x_PrintRequestStop(eStatus_OK);
01511         return;
01512     }
01513     if (old_status == CNetScheduleAPI::eDone) {
01514         WriteMessage("OK:WARNING:Already done;");
01515         ERR_POST(Warning << "Cannot accept job "
01516                          << m_CommandArguments.job_key
01517                          << " results. The job has already been done.");
01518         x_PrintRequestStop(eStatus_OK);
01519         return;
01520     }
01521     if (old_status == CNetScheduleAPI::eJobNotFound) {
01522         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01523         ERR_POST(Warning << "Cannot accept job "
01524                          << m_CommandArguments.job_key
01525                          << " results. The job is unknown");
01526         x_PrintRequestStop(eStatus_NotFound);
01527         return;
01528     }
01529 
01530     // Here: invalid job status, nothing will be done
01531     x_WriteMessageNoThrow("ERR:eInvalidJobStatus:"
01532                           "Cannot accept job results; job is in " +
01533                           CNetScheduleAPI::StatusToString(old_status) +
01534                           " state");
01535     ERR_POST(Warning << "Cannot accept job "
01536                      << m_CommandArguments.job_key
01537                      << " results; job is in "
01538                      << CNetScheduleAPI::StatusToString(old_status)
01539                      << " state");
01540     x_PrintRequestStop(eStatus_InvalidJobStatus);
01541 }
01542 
01543 
01544 void CNetScheduleHandler::x_ProcessJobExchange(CQueue* q)
01545 {
01546     // The JXCG2 is not supported anymore, so this handler is called for
01547     // an old (obsolete) JXCG command only.
01548     //
01549     // The old clients must have any_affinity set to true
01550     // depending on the explicit affinity - to conform the old behavior
01551     m_CommandArguments.any_affinity = m_CommandArguments.affinity_token.empty();
01552 
01553 
01554     time_t      curr = time(0);
01555     string      output = NStr::ParseEscapes(m_CommandArguments.output);
01556 
01557     // PUT part
01558     TJobStatus      old_status = q->PutResult(m_ClientId, curr,
01559                                           m_CommandArguments.job_id,
01560                                           m_CommandArguments.auth_token,
01561                                           m_CommandArguments.job_return_code,
01562                                           &output);
01563 
01564     if (old_status == CNetScheduleAPI::eJobNotFound) {
01565         ERR_POST(Warning << "Cannot accept job "
01566                          << m_CommandArguments.job_key
01567                          << " results. The job is unknown");
01568     } else if (old_status != CNetScheduleAPI::ePending &&
01569                old_status != CNetScheduleAPI::eRunning) {
01570         ERR_POST(Warning << "Cannot accept job "
01571                          << m_CommandArguments.job_key
01572                          << " results. The job has already been done.");
01573     }
01574 
01575 
01576     // Get part
01577     list<string>        aff_list;
01578     NStr::Split(NStr::ParseEscapes(m_CommandArguments.affinity_token),
01579                 "\t,", aff_list, NStr::eNoMergeDelims);
01580 
01581     CJob                job;
01582     if (q->GetJobOrWait(m_ClientId,
01583                         m_CommandArguments.port,
01584                         m_CommandArguments.timeout,
01585                         curr, &aff_list,
01586                         m_CommandArguments.wnode_affinity,
01587                         m_CommandArguments.any_affinity,
01588                         false,
01589                         false,
01590                         &job) == false) {
01591         // Preferred affinities were reset for the client, so no job
01592         // and bad request
01593         WriteMessage("ERR:ePrefAffExpired:");
01594         x_PrintRequestStop(eStatus_BadRequest);
01595     } else {
01596         x_PrintGetJobResponse(q, job, false);
01597         x_PrintRequestStop(eStatus_OK);
01598     }
01599     return;
01600 }
01601 
01602 
01603 void CNetScheduleHandler::x_ProcessPutMessage(CQueue* q)
01604 {
01605     if (q->PutProgressMessage(m_CommandArguments.job_id,
01606                               m_CommandArguments.progress_msg)) {
01607         WriteMessage("OK:");
01608         x_PrintRequestStop(eStatus_OK);
01609     }
01610     else {
01611         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01612         ERR_POST(Warning << "MPUT for unknown job "
01613                          << m_CommandArguments.job_key);
01614         x_PrintRequestStop(eStatus_NotFound);
01615     }
01616 }
01617 
01618 
01619 void CNetScheduleHandler::x_ProcessGetMessage(CQueue* q)
01620 {
01621     CJob        job;
01622     time_t      lifetime;
01623 
01624     if (q->ReadAndTouchJob(m_CommandArguments.job_id, job, &lifetime) !=
01625             CNetScheduleAPI::eJobNotFound) {
01626         WriteMessage("OK:", job.GetProgressMsg());
01627         x_PrintRequestStop(eStatus_OK);
01628     } else {
01629         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01630         ERR_POST(Warning << m_CommandArguments.cmd
01631                          << "MGET for unknown job "
01632                          << m_CommandArguments.job_key);
01633         x_PrintRequestStop(eStatus_NotFound);
01634     }
01635 }
01636 
01637 
01638 void CNetScheduleHandler::x_ProcessPutFailure(CQueue* q)
01639 {
01640     bool    cmdv2(m_CommandArguments.cmd == "FPUT2");
01641 
01642     if (cmdv2) {
01643         x_CheckNonAnonymousClient("use FPUT2 command");
01644         x_CheckAuthorizationToken();
01645     }
01646 
01647     string      warning;
01648     TJobStatus  old_status = q->FailJob(
01649                                 m_ClientId,
01650                                 m_CommandArguments.job_id,
01651                                 m_CommandArguments.auth_token,
01652                                 NStr::ParseEscapes(m_CommandArguments.err_msg),
01653                                 NStr::ParseEscapes(m_CommandArguments.output),
01654                                 m_CommandArguments.job_return_code,
01655                                 warning);
01656 
01657     if (old_status == CNetScheduleAPI::eJobNotFound) {
01658         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01659         ERR_POST(Warning << "FPUT for unknown job "
01660                          << m_CommandArguments.job_key);
01661         x_PrintRequestStop(eStatus_NotFound);
01662         return;
01663     }
01664 
01665     if (old_status == CNetScheduleAPI::eFailed) {
01666         WriteMessage("OK:WARNING:Already failed;");
01667         ERR_POST(Warning << "FPUT for already failed job "
01668                          << m_CommandArguments.job_key);
01669         x_PrintRequestStop(eStatus_OK);
01670         return;
01671     }
01672 
01673     if (old_status != CNetScheduleAPI::eRunning) {
01674         x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot fail job; "
01675                               "job is in " +
01676                               CNetScheduleAPI::StatusToString(old_status) +
01677                               " state");
01678         ERR_POST(Warning << "Cannot fail job "
01679                          << m_CommandArguments.job_key
01680                          << "; job is in "
01681                          << CNetScheduleAPI::StatusToString(old_status)
01682                          << " state");
01683         x_PrintRequestStop(eStatus_InvalidJobStatus);
01684         return;
01685     }
01686 
01687     // Here: all is fine
01688     if (warning.empty())
01689         WriteMessage("OK:");
01690     else
01691         WriteMessage("OK:WARNING:", warning + ";");
01692     x_PrintRequestStop(eStatus_OK);
01693 }
01694 
01695 
01696 void CNetScheduleHandler::x_ProcessDropQueue(CQueue* q)
01697 {
01698     q->Truncate();
01699     WriteMessage("OK:");
01700     x_PrintRequestStop(eStatus_OK);
01701 }
01702 
01703 
01704 void CNetScheduleHandler::x_ProcessReturn(CQueue* q)
01705 {
01706     bool    cmdv2(m_CommandArguments.cmd == "RETURN2");
01707 
01708     if (cmdv2) {
01709         x_CheckNonAnonymousClient("use RETURN2 command");
01710         x_CheckAuthorizationToken();
01711     }
01712 
01713     string          warning;
01714     TJobStatus      old_status = q->ReturnJob(m_ClientId,
01715                                               m_CommandArguments.job_id,
01716                                               m_CommandArguments.auth_token,
01717                                               warning);
01718 
01719     if (old_status == CNetScheduleAPI::eRunning) {
01720         if (warning.empty())
01721             WriteMessage("OK:");
01722         else
01723             WriteMessage("OK:WARNING:", warning + ";");
01724         x_PrintRequestStop(eStatus_OK);
01725         return;
01726     }
01727 
01728     if (old_status == CNetScheduleAPI::eJobNotFound) {
01729         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01730         ERR_POST(Warning << "RETURN for unknown job "
01731                          << m_CommandArguments.job_key);
01732         x_PrintRequestStop(eStatus_NotFound);
01733         return;
01734     }
01735 
01736     x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot return job; job is in " +
01737                           CNetScheduleAPI::StatusToString(old_status) + " state");
01738     ERR_POST(Warning << "Cannot return job "
01739                      << m_CommandArguments.job_key
01740                      << "; job is in "
01741                      << CNetScheduleAPI::StatusToString(old_status) << " state");
01742 
01743     x_PrintRequestStop(eStatus_InvalidJobStatus);
01744 }
01745 
01746 
01747 void CNetScheduleHandler::x_ProcessJobDelayExpiration(CQueue* q)
01748 {
01749     if (m_CommandArguments.timeout <= 0) {
01750         x_WriteMessageNoThrow("ERR:eInvalidParameter:");
01751         ERR_POST(Warning << "Invalid timeout "
01752                          << m_CommandArguments.timeout
01753                          << " in JDEX for job "
01754                          << m_CommandArguments.job_key);
01755         x_PrintRequestStop(eStatus_BadRequest);
01756         return;
01757     }
01758 
01759     TJobStatus      status = q->JobDelayExpiration(m_CommandArguments.job_id,
01760                                                    m_CommandArguments.timeout);
01761 
01762     if (status == CNetScheduleAPI::eJobNotFound) {
01763         x_WriteMessageNoThrow("ERR:eJobNotFound:");
01764         ERR_POST(Warning << "JDEX for unknown job "
01765                          << m_CommandArguments.job_key);
01766         x_PrintRequestStop(eStatus_NotFound);
01767         return;
01768     }
01769     if (status != CNetScheduleAPI::eRunning) {
01770         x_WriteMessageNoThrow("ERR:eInvalidJobStatus:" +
01771                               CNetScheduleAPI::StatusToString(status));
01772         ERR_POST(Warning << "Cannot change expiration for job "
01773                          << m_CommandArguments.job_key
01774                          << " in status "
01775                          << CNetScheduleAPI::StatusToString(status));
01776 
01777         x_PrintRequestStop(eStatus_InvalidJobStatus);
01778         return;
01779     }
01780 
01781     // Here: the new timeout has been applied
01782     WriteMessage("OK:");
01783     x_PrintRequestStop(eStatus_OK);
01784 }
01785 
01786 
01787 void CNetScheduleHandler::x_ProcessDropJob(CQueue* q)
01788 {
01789     x_ProcessCancel(q);
01790 }
01791 
01792 
01793 void CNetScheduleHandler::x_ProcessStatistics(CQueue* q)
01794 {
01795     CSocket &           socket = GetSocket();
01796     const string &      what   = m_CommandArguments.option;
01797     time_t              curr   = time(0);
01798 
01799     if (q == NULL) {
01800         if (what == "JOBS")
01801             m_Server->PrintJobsStat(*this);
01802         else {
01803             // Transition counters for all the queues
01804             WriteMessage("OK:Started: ", m_Server->GetStartTime().AsString());
01805             if (m_Server->GetRefuseSubmits())
01806                 WriteMessage("OK:SubmitsDisabledEffective: 1");
01807             else
01808                 WriteMessage("OK:SubmitsDisabledEffective: 0");
01809             if (m_Server->IsDrainShutdown())
01810                 WriteMessage("OK:DrainedShutdown: 1");
01811             else
01812                 WriteMessage("OK:DrainedShutdown: 0");
01813             m_Server->PrintTransitionCounters(*this);
01814         }
01815         WriteMessage("OK:END");
01816         x_PrintRequestStop(eStatus_OK);
01817         return;
01818     }
01819 
01820 
01821     socket.DisableOSSendDelay(false);
01822     if (!what.empty() && what != "ALL") {
01823         x_StatisticsNew(q, what, curr);
01824         return;
01825     }
01826 
01827     WriteMessage("OK:Started: ", m_Server->GetStartTime().AsString());
01828     if (m_Server->GetRefuseSubmits() || q->GetRefuseSubmits())
01829         WriteMessage("OK:SubmitsDisabledEffective: 1");
01830     else
01831         WriteMessage("OK:SubmitsDisabledEffective: 0");
01832     if (q->GetRefuseSubmits())
01833         WriteMessage("OK:SubmitsDisabledPrivate: 1");
01834     else
01835         WriteMessage("OK:SubmitsDisabledPrivate: 0");
01836 
01837     for (size_t  k = 0; k < g_ValidJobStatusesSize; ++k) {
01838         TJobStatus      st = g_ValidJobStatuses[k];
01839         unsigned        count = q->CountStatus(st);
01840 
01841         WriteMessage("OK:", CNetScheduleAPI::StatusToString(st) +
01842                             ": " + NStr::UIntToString(count));
01843 
01844         if (what == "ALL") {
01845             TNSBitVector::statistics bv_stat;
01846             q->StatusStatistics(st, &bv_stat);
01847             WriteMessage("OK:",
01848                          "  bit_blk=" + NStr::UIntToString(bv_stat.bit_blocks) +
01849                          "; gap_blk=" + NStr::UIntToString(bv_stat.gap_blocks) +
01850                          "; mem_used=" + NStr::UIntToString(bv_stat.memory_used));
01851         }
01852     } // for
01853 
01854 
01855     if (what == "ALL") {
01856         WriteMessage("OK:[Berkeley DB Mutexes]:");
01857         {{
01858             CNcbiOstrstream ostr;
01859 
01860             try {
01861                 m_Server->PrintMutexStat(ostr);
01862                 ostr << ends;
01863                 WriteMessage("OK:", ostr.str());
01864             } catch (...) {
01865                 ostr.freeze(false);
01866                 throw;
01867             }
01868             ostr.freeze(false);
01869         }}
01870 
01871         WriteMessage("OK:[Berkeley DB Locks]:");
01872         {{
01873             CNcbiOstrstream ostr;
01874 
01875             try {
01876                 m_Server->PrintLockStat(ostr);
01877                 ostr << ends;
01878                 WriteMessage("OK:", ostr.str());
01879             } catch (...) {
01880                 ostr.freeze(false);
01881                 throw;
01882             }
01883             ostr.freeze(false);
01884         }}
01885 
01886         WriteMessage("OK:[Berkeley DB Memory Usage]:");
01887         {{
01888             CNcbiOstrstream ostr;
01889 
01890             try {
01891                 m_Server->PrintMemStat(ostr);
01892                 ostr << ends;
01893                 WriteMessage("OK:", ostr.str());
01894             } catch (...) {
01895                 ostr.freeze(false);
01896                 throw;
01897             }
01898             ostr.freeze(false);
01899         }}
01900 
01901         WriteMessage("OK:[BitVector block pool]:");
01902         {{
01903             const TBlockAlloc::TBucketPool::TBucketVector& bv =
01904                 TBlockAlloc::GetPoolVector();
01905             size_t      pool_vec_size = bv.size();
01906 
01907             WriteMessage("OK:Pool vector size: ",
01908                          NStr::SizetToString(pool_vec_size));
01909 
01910             for (size_t  i = 0; i < pool_vec_size; ++i) {
01911                 const TBlockAlloc::TBucketPool::TResourcePool* rp =
01912                     TBlockAlloc::GetPool(i);
01913                 if (rp) {
01914                     size_t pool_size = rp->GetSize();
01915                     if (pool_size) {
01916                         WriteMessage("OK:Pool [ " + NStr::SizetToString(i) +
01917                                      "] = " + NStr::SizetToString(pool_size));
01918                     }
01919                 }
01920             }
01921         }}
01922     }
01923 
01924     WriteMessage("OK:[Configured job submitters]:");
01925     q->PrintSubmHosts(*this);
01926 
01927     WriteMessage("OK:[Configured workers]:");
01928     q->PrintWNodeHosts(*this);
01929 
01930     WriteMessage("OK:[Transitions counters]:");
01931     q->PrintTransitionCounters(*this);
01932 
01933     WriteMessage("OK:END");
01934     x_PrintRequestStop(eStatus_OK);
01935 }
01936 
01937 
01938 void CNetScheduleHandler::x_ProcessReloadConfig(CQueue* q)
01939 {
01940     CNcbiApplication *      app = CNcbiApplication::Instance();
01941     bool                    reloaded = app->ReloadConfig(
01942                                             CMetaRegistry::fReloadIfChanged);
01943 
01944     if (reloaded) {
01945         const CNcbiRegistry &   reg = app->GetConfig();
01946 
01947         m_Server->Configure(reg);
01948 
01949         // Logging from the [server] section
01950         SNS_Parameters          params;
01951 
01952         params.Read(reg, "server");
01953         m_Server->SetNSParameters(params, true);
01954 
01955         WriteMessage("OK:");
01956     }
01957     else
01958         WriteMessage("OK:WARNING:Configuration file has not "
01959                      "been changed, RECO ignored;");
01960 
01961     x_PrintRequestStop(eStatus_OK);
01962 }
01963 
01964 
01965 void CNetScheduleHandler::x_ProcessActiveCount(CQueue* q)
01966 {
01967     string      active_jobs = NStr::UIntToString(m_Server->CountActiveJobs());
01968 
01969     WriteMessage("OK:", active_jobs);
01970     x_PrintRequestStop(eStatus_OK);
01971 }
01972 
01973 
01974 void CNetScheduleHandler::x_ProcessDump(CQueue* q)
01975 {
01976     if (m_CommandArguments.job_id == 0) {
01977         // The whole queue dump, may be restricted by one state
01978         if (m_CommandArguments.job_status == CNetScheduleAPI::eJobNotFound &&
01979             !m_CommandArguments.job_status_string.empty()) {
01980             // The state parameter was provided but it was not possible to
01981             // convert it into a valid job state
01982             x_WriteMessageNoThrow("ERR:eInvalidParameter:Status unknown: ",
01983                                   m_CommandArguments.job_status_string);
01984             x_PrintRequestStop(eStatus_BadRequest);
01985             return;
01986         }
01987 
01988         q->PrintAllJobDbStat(*this, m_CommandArguments.group,
01989                                     m_CommandArguments.job_status,
01990                                     m_CommandArguments.start_after_job_id,
01991                                     m_CommandArguments.count);
01992         WriteMessage("OK:END");
01993         x_PrintRequestStop(eStatus_OK);
01994         return;
01995     }
01996 
01997 
01998     // Certain job dump
01999     if (q->PrintJobDbStat(*this, m_CommandArguments.job_id) == 0) {
02000         // Nothing was printed because there is no such a job
02001         x_WriteMessageNoThrow("ERR:eJobNotFound:");
02002         x_PrintRequestStop(eStatus_NotFound);
02003         return;
02004     }
02005 
02006     WriteMessage("OK:END");
02007     x_PrintRequestStop(eStatus_OK);
02008     return;
02009 }
02010 
02011 
02012 void CNetScheduleHandler::x_ProcessShutdown(CQueue*)
02013 {
02014     if (m_CommandArguments.drain) {
02015         if (m_Server->GetShutdownFlag()) {
02016             WriteMessage("ERR:eShuttingDown:The server is in shutting down state");
02017             x_PrintRequestStop(eStatus_BadRequest);
02018             return;
02019         }
02020         WriteMessage("OK:");
02021         x_PrintRequestStop(eStatus_OK);
02022         m_Server->SetRefuseSubmits(true);
02023         m_Server->SetDrainShutdown();
02024         return;
02025     }
02026 
02027     // Unconditional immediate shutdown.
02028     x_PrintRequestStop(eStatus_OK);
02029     WriteMessage("OK:");
02030     m_Server->SetRefuseSubmits(true);
02031     m_Server->SetShutdownFlag();
02032     return;
02033 }
02034 
02035 
02036 void CNetScheduleHandler::x_ProcessGetConf(CQueue*)
02037 {
02038     CConn_SocketStream  ss(GetSocket().GetSOCK(), eNoOwnership);
02039 
02040     CNcbiApplication::Instance()->GetConfig().Write(ss);
02041     ss.flush();
02042     WriteMessage("OK:END");
02043     x_PrintRequestStop(eStatus_OK);
02044 }
02045 
02046 
02047 void CNetScheduleHandler::x_ProcessVersion(CQueue*)
02048 {
02049     WriteMessage("OK:",
02050                  "server_version=" NETSCHEDULED_VERSION
02051                  "&storage_version=" NETSCHEDULED_STORAGE_VERSION
02052                  "&protocol_version=" NETSCHEDULED_PROTOCOL_VERSION
02053                  "&build_date=" + NStr::URLEncode(NETSCHEDULED_BUILD_DATE) +
02054                  "&ns_node=" + m_Server->GetNodeID() +
02055                  "&ns_session=" + m_Server->GetSessionID());
02056     x_PrintRequestStop(eStatus_OK);
02057 }
02058 
02059 
02060 void CNetScheduleHandler::x_ProcessQList(CQueue*)
02061 {
02062     WriteMessage("OK:", m_Server->GetQueueNames(";"));
02063     x_PrintRequestStop(eStatus_OK);
02064 }
02065 
02066 
02067 void CNetScheduleHandler::x_ProcessQuitSession(CQueue*)
02068 {
02069     m_Server->CloseConnection(&GetSocket());
02070 
02071     // Log the close request
02072     CDiagContext::SetRequestContext(m_DiagContext);
02073     x_PrintRequestStop(eStatus_OK);
02074     CDiagContext::SetRequestContext(NULL);
02075 }
02076 
02077 
02078 void CNetScheduleHandler::x_ProcessCreateQueue(CQueue*)
02079 {
02080     m_Server->CreateQueue(m_CommandArguments.qname,
02081                           m_CommandArguments.qclass,
02082                           NStr::ParseEscapes(m_CommandArguments.comment));
02083     WriteMessage("OK:");
02084     x_PrintRequestStop(eStatus_OK);
02085 }
02086 
02087 
02088 void CNetScheduleHandler::x_ProcessDeleteQueue(CQueue*)
02089 {
02090     m_Server->DeleteQueue(m_CommandArguments.qname);
02091     WriteMessage("OK:");
02092     x_PrintRequestStop(eStatus_OK);
02093 }
02094 
02095 
02096 void CNetScheduleHandler::x_ProcessQueueInfo(CQueue*)
02097 {
02098     const string&   qname = m_CommandArguments.qname;
02099     int             kind;
02100     string          qclass;
02101     string          comment;
02102 
02103     m_Server->QueueInfo(qname, kind, &qclass, &comment);
02104     WriteMessage("OK:", NStr::IntToString(kind) + "\t" + qclass + "\t\"" +
02105                         NStr::PrintableString(comment) + "\"");
02106     x_PrintRequestStop(eStatus_OK);
02107 }
02108 
02109 
02110 void CNetScheduleHandler::x_ProcessGetParam(CQueue* q)
02111 {
02112     unsigned int    max_input_size;
02113     unsigned int    max_output_size;
02114 
02115     q->GetMaxIOSizes(max_input_size, max_output_size);
02116     WriteMessage("OK:", "max_input_size=" +
02117                         NStr::IntToString(max_input_size) + ";"
02118                         "max_output_size=" +
02119                         NStr::IntToString(max_output_size) + ";" +
02120                         NETSCHEDULED_FEATURES);
02121     x_PrintRequestStop(eStatus_OK);
02122 }
02123 
02124 
02125 void CNetScheduleHandler::x_ProcessGetConfiguration(CQueue* q)
02126 {
02127     CQueue::TParameterList      parameters = q->GetParameters();
02128 
02129     ITERATE(CQueue::TParameterList, it, parameters) {
02130         WriteMessage("OK:", it->first + '=' + it->second);
02131     }
02132     WriteMessage("OK:END");
02133     x_PrintRequestStop(eStatus_OK);
02134 }
02135 
02136 
02137 void CNetScheduleHandler::x_ProcessReading(CQueue* q)
02138 {
02139     x_CheckNonAnonymousClient("use READ command");
02140 
02141     CJob            job;
02142 
02143     q->GetJobForReading(m_ClientId, m_CommandArguments.timeout,
02144                                     m_CommandArguments.group,
02145                                     &job);
02146 
02147     unsigned int    job_id = job.GetId();
02148     string          job_key;
02149 
02150     if (job_id) {
02151         job_key = q->MakeKey(job_id);
02152         WriteMessage("OK:job_key=" + job_key +
02153                      "&auth_token=" + job.GetAuthToken() +
02154                      "&status=" + CNetScheduleAPI::StatusToString(job.GetStatus()));
02155     }
02156     else
02157         WriteMessage("OK:");
02158 
02159     if (m_Server->IsLog()) {
02160         if (job_id)
02161             GetDiagContext().Extra().Print("job_key", job_key)
02162                                     .Print("auth_token", job.GetAuthToken());
02163         else
02164             GetDiagContext().Extra().Print("job_key", "None");
02165     }
02166     x_PrintRequestStop(eStatus_OK);
02167 }
02168 
02169 
02170 void CNetScheduleHandler::x_ProcessConfirm(CQueue* q)
02171 {
02172     x_CheckNonAnonymousClient("use CFRM command");
02173     x_CheckAuthorizationToken();
02174 
02175     TJobStatus      old_status = q->ConfirmReadingJob(
02176                                             m_ClientId,
02177                                             m_CommandArguments.job_id,
02178                                             m_CommandArguments.auth_token);
02179     x_FinalizeReadCommand("CFRM", old_status);
02180 }
02181 
02182 
02183 void CNetScheduleHandler::x_ProcessReadFailed(CQueue* q)
02184 {
02185     x_CheckNonAnonymousClient("use FRED command");
02186     x_CheckAuthorizationToken();
02187 
02188     TJobStatus      old_status = q->FailReadingJob(
02189                                             m_ClientId,
02190                                             m_CommandArguments.job_id,
02191                                             m_CommandArguments.auth_token);
02192     x_FinalizeReadCommand("FRED", old_status);
02193 }
02194 
02195 
02196 void CNetScheduleHandler::x_ProcessReadRollback(CQueue* q)
02197 {
02198     x_CheckNonAnonymousClient("use RDRB command");
02199     x_CheckAuthorizationToken();
02200 
02201     TJobStatus      old_status = q->ReturnReadingJob(
02202                                             m_ClientId,
02203                                             m_CommandArguments.job_id,
02204                                             m_CommandArguments.auth_token);
02205     x_FinalizeReadCommand("RDRB", old_status);
02206 }
02207 
02208 
02209 void CNetScheduleHandler::x_FinalizeReadCommand(const string &  command,
02210                                                 TJobStatus      old_status)
02211 {
02212     if (old_status == CNetScheduleAPI::eJobNotFound) {
02213         WriteMessage("ERR:eJobNotFound:");
02214         ERR_POST(Warning << command << " for unknown job "
02215                          << m_CommandArguments.job_key);
02216         x_PrintRequestStop(eStatus_NotFound);
02217         return;
02218     }
02219 
02220     if (old_status != CNetScheduleAPI::eReading) {
02221         string      operation = "unknown";
02222 
02223         if (command == "CFRM")      operation = "confirm";
02224         else if (command == "FRED") operation = "fail";
02225         else if (command == "RDRB") operation = "rollback";
02226 
02227         x_WriteMessageNoThrow("ERR:eInvalidJobStatus:Cannot " +
02228                               operation + " job; job is in " +
02229                               CNetScheduleAPI::StatusToString(old_status) +
02230                               " state");
02231         ERR_POST(Warning << "Cannot " << operation << " read job; job is in "
02232                          << CNetScheduleAPI::StatusToString(old_status)
02233                          << " state");
02234 
02235         x_PrintRequestStop(eStatus_InvalidJobStatus);
02236         return;
02237     }
02238 
02239     WriteMessage("OK:");
02240     x_PrintRequestStop(eStatus_OK);
02241 }
02242 
02243 
02244 void CNetScheduleHandler::x_ProcessGetAffinityList(CQueue* q)
02245 {
02246     WriteMessage("OK:", q->GetAffinityList());
02247     x_PrintRequestStop(eStatus_OK);
02248 }
02249 
02250 
02251 void CNetScheduleHandler::x_ProcessClearWorkerNode(CQueue* q)
02252 {
02253     q->ClearWorkerNode(m_ClientId);
02254     WriteMessage("OK:");
02255     x_PrintRequestStop(eStatus_OK);
02256 }
02257 
02258 
02259 void CNetScheduleHandler::x_ProcessCancelQueue(CQueue* q)
02260 {
02261     q->CancelAllJobs(m_ClientId);
02262     WriteMessage("OK:");
02263     x_PrintRequestStop(eStatus_OK);
02264 }
02265 
02266 
02267 void CNetScheduleHandler::x_ProcessRefuseSubmits(CQueue* q)
02268 {
02269     if (m_CommandArguments.mode == false &&
02270             (m_Server->IsDrainShutdown() || m_Server->GetShutdownFlag())) {
02271         WriteMessage("ERR:eShuttingDown:"
02272                      "Server is in drained shutting down state");
02273         x_PrintRequestStop(eStatus_BadRequest);
02274         return;
02275     }
02276 
02277     if (q == NULL) {
02278         // This is a whole server scope request
02279         m_Server->SetRefuseSubmits(m_CommandArguments.mode);
02280         WriteMessage("OK:");
02281         x_PrintRequestStop(eStatus_OK);
02282         return;
02283     }
02284 
02285     // This is a queue scope request.
02286     q->SetRefuseSubmits(m_CommandArguments.mode);
02287 
02288     if (m_CommandArguments.mode == false &&
02289             m_Server->GetRefuseSubmits() == true)
02290         WriteMessage("OK:WARNING:Submits are disabled on the server level;");
02291     else
02292         WriteMessage("OK:");
02293     x_PrintRequestStop(eStatus_OK);
02294 }
02295 
02296 
02297 void CNetScheduleHandler::x_CmdNotImplemented(CQueue *)
02298 {
02299     x_WriteMessageNoThrow("ERR:eObsoleteCommand:");
02300     x_PrintRequestStop(eStatus_NotImplemented);
02301 }
02302 
02303 
02304 void CNetScheduleHandler::x_CmdObsolete(CQueue*)
02305 {
02306     x_WriteMessageNoThrow("OK:WARNING:Obsolete;");
02307     x_PrintRequestStop(eStatus_NotImplemented);
02308 }
02309 
02310 
02311 void CNetScheduleHandler::x_CheckNonAnonymousClient(const string &  message)
02312 {
02313     if (!m_ClientId.IsComplete())
02314         NCBI_THROW(CNetScheduleException, eInvalidClient,
02315                    "Anonymous client (no client_node and client_session"
02316                    " at handshake) cannot " + message);
02317     return;
02318 }
02319 
02320 
02321 void CNetScheduleHandler::x_CheckPortAndTimeout(void)
02322 {
02323     if ((m_CommandArguments.port != 0 &&
02324          m_CommandArguments.timeout == 0) ||
02325         (m_CommandArguments.port == 0 &&
02326          m_CommandArguments.timeout != 0))
02327         NCBI_THROW(CNetScheduleException, eInvalidParameter,
02328                    "Either both or neither of the port and "
02329                    "timeout parameters must be 0");
02330     return;
02331 }
02332 
02333 
02334 void CNetScheduleHandler::x_CheckAuthorizationToken(void)
02335 {
02336     if (m_CommandArguments.auth_token.empty())
02337         NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
02338                    "Invalid authorization token. It cannot be empty.");
02339     return;
02340 }
02341 
02342 
02343 void CNetScheduleHandler::x_CheckGetParameters(void)
02344 {
02345     // Checks that the given GETx/JXCG parameters make sense
02346     if (m_CommandArguments.wnode_affinity == false &&
02347         m_CommandArguments.any_affinity == false &&
02348         m_CommandArguments.affinity_token.empty()) {
02349         ERR_POST(Warning << "The job request without explicit affinities, "
02350                             "without preferred affinities and "
02351                             "with any_aff flag set to false "
02352                             "will never match any job.");
02353         }
02354     if (m_CommandArguments.exclusive_new_aff == true &&
02355         m_CommandArguments.any_affinity == true)
02356         NCBI_THROW(CNetScheduleException, eInvalidParameter,
02357                    "It is forbidden to have both any_affinity and "
02358                    "exclusive_new_aff GET2 flags set to 1.");
02359     return;
02360 }
02361 
02362 
02363 void CNetScheduleHandler::x_PrintRequestStart(const SParsedCmd& cmd)
02364 {
02365     if (m_Server->IsLog()) {
02366         CDiagContext_Extra    ctxt_extra =
02367                 GetDiagContext().PrintRequestStart()
02368                             .Print("_type", "cmd")
02369                             .Print("cmd", cmd.command->cmd)
02370                             .Print("peer", GetSocket().GetPeerAddress(eSAF_IP))
02371                             .Print("conn", m_ConnContext->GetRequestID());
02372 
02373         // SUMBIT parameters should not be logged. The new job attributes will
02374         // be logged when a new job is actually submitted.
02375         if (cmd.command->extra.processor != &CNetScheduleHandler::x_ProcessSubmit) {
02376             ITERATE(TNSProtoParams, it, cmd.params) {
02377                 ctxt_extra.Print(it->first, it->second);
02378             }
02379         }
02380     }
02381     m_DiagContext->SetRequestStatus(eStatus_OK);
02382 }
02383 
02384 
02385 void CNetScheduleHandler::x_PrintRequestStart(CTempString  msg)
02386 {
02387     if (m_Server->IsLog())
02388         GetDiagContext().PrintRequestStart()
02389                 .Print("_type", "cmd")
02390                 .Print("info", msg)
02391                 .Print("peer",  GetSocket().GetPeerAddress(eSAF_IP))
02392                 .Print("conn", m_ConnContext->GetRequestID());
02393 }
02394 
02395 
02396 void CNetScheduleHandler::x_PrintRequestStop(unsigned int  status)
02397 {
02398     if (m_Server->IsLog()) {
02399         CDiagContext::GetRequestContext().SetRequestStatus(status);
02400         GetDiagContext().PrintRequestStop();
02401         CDiagContext::SetRequestContext(m_ConnContext);
02402         m_DiagContext.Reset();
02403     }
02404 }
02405 
02406 
02407 // The function forms a responce for various 'get job' commands and prints
02408 // extra to the log if required
02409 void
02410 CNetScheduleHandler::x_PrintGetJobResponse(const CQueue *  q,
02411                                            const CJob &    job,
02412                                            bool            cmdv2)
02413 {
02414     if (!job.GetId()) {
02415         // No suitable job found
02416         if (m_Server->IsLog())
02417             GetDiagContext().Extra().Print("job_key", "none");
02418         WriteMessage("OK:");
02419         return;
02420     }
02421 
02422     string      job_key = q->MakeKey(job.GetId());
02423     if (m_Server->IsLog()) {
02424         // The only piece required for logging is the job key
02425         GetDiagContext().Extra().Print("job_key", job_key);
02426     }
02427 
02428     string          output;
02429     if (cmdv2)
02430         output = "job_key=" + job_key +
02431                  "&input=" + NStr::URLEncode(job.GetInput()) +
02432                  "&affinity=" + NStr::URLEncode(q->GetAffinityTokenByID(job.GetAffinityId())) +
02433                  "&client_ip=" + NStr::URLEncode(job.GetClientIP()) +
02434                  "&client_sid=" + NStr::URLEncode(job.GetClientSID()) +
02435                  "&mask=" + NStr::UIntToString(job.GetMask()) +
02436                  "&auth_token=" + job.GetAuthToken();
02437     else
02438         output = job_key +
02439                  " \"" + NStr::PrintableString(job.GetInput()) + "\""
02440                  " \"" + NStr::PrintableString(
02441                            q->GetAffinityTokenByID(job.GetAffinityId())) + "\""
02442                  " \"" + job.GetClientIP() + " " + job.GetClientSID() + "\""
02443                  " " + NStr::UIntToString(job.GetMask());
02444 
02445 
02446     WriteMessage("OK:", output);
02447     return;
02448 }
02449 
02450 
02451 static const unsigned int   kMaxParserErrMsgLength = 128;
02452 
02453 // Write into socket, logs the message and closes the connection
02454 void CNetScheduleHandler::x_OnCmdParserError(bool            need_request_start,
02455                                              const string &  msg,
02456                                              const string &  suffix)
02457 {
02458     // Truncating is done to prevent output of an arbitrary long garbage
02459 
02460     if (need_request_start)
02461         x_PrintRequestStart("Invalid command");
02462 
02463     if (msg.size() < kMaxParserErrMsgLength * 2)
02464         ERR_POST("Error parsing command: " << msg + suffix);
02465     else
02466         ERR_POST("Error parsing command: " <<
02467                  msg.substr(0, kMaxParserErrMsgLength * 2) <<
02468                  " (TRUNCATED)" + suffix);
02469     x_PrintRequestStop(eStatus_BadRequest);
02470 
02471     if (msg.size() < kMaxParserErrMsgLength)
02472         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" + msg + suffix);
02473     else
02474         x_WriteMessageNoThrow("ERR:", "eProtocolSyntaxError:" +
02475                                       msg.substr(0, kMaxParserErrMsgLength) +
02476                                       " (TRUNCATED)" + suffix);
02477 
02478     // Close the connection
02479     if (m_ConnContext.NotNull())
02480         m_ConnContext->SetRequestStatus(eStatus_BadRequest);
02481 
02482     m_Server->CloseConnection(&GetSocket());
02483     return;
02484 }
02485 
02486 
02487 void CNetScheduleHandler::x_StatisticsNew(CQueue *        q,
02488                                           const string &  what,
02489                                           time_t          curr)
02490 {
02491     if (what == "CLIENTS")
02492         q->PrintClientsList(*this,
02493                             m_CommandArguments.comment == "VERBOSE");
02494     else if (what == "NOTIFICATIONS")
02495         q->PrintNotificationsList(*this,
02496                                   m_CommandArguments.comment == "VERBOSE");
02497     else if (what == "AFFINITIES")
02498         q->PrintAffinitiesList(*this,
02499                                m_CommandArguments.comment == "VERBOSE");
02500     else if (what == "GROUPS")
02501         q->PrintGroupsList(*this,
02502                            m_CommandArguments.comment == "VERBOSE");
02503     else if (what == "JOBS")
02504         q->PrintJobsStat(*this, m_CommandArguments.group,
02505                                 m_CommandArguments.affinity_token);
02506     else if (what == "WNODE")
02507         WriteMessage("OK:WARNING:Obsolete, use STAT CLIENTS instead;");
02508 
02509     WriteMessage("OK:END");
02510     x_PrintRequestStop(eStatus_OK);
02511 }
02512 
Modified on Wed May 23 12:58:06 2012 by modify_doxy.py rev. 337098