NCBI C++ ToolKit
id2_fetch_simple.cpp
Go to the documentation of this file.
00001 /*  $Id: id2_fetch_simple.cpp 53744 2012-04-12 20:24:29Z vasilche $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Author:  Denis Vakatov, Aleksey Grichenko
00027  *
00028  * File Description:
00029  *   New IDFETCH network client (get Seq-Entry by GI)
00030  */
00031 
#include <ncbi_pch.hpp>
#include <stdlib.h>

#include <corelib/ncbiapp.hpp>
#include <corelib/ncbienv.hpp>
#include <corelib/ncbiargs.hpp>
#include <corelib/ncbireg.hpp>
#include <corelib/rwstream.hpp>

#include <connect/ncbi_util.h>
#include <connect/ncbi_socket.h>
#include <connect/ncbi_core_cxx.hpp>
#include <connect/ncbi_conn_stream.hpp>

#include <serial/serial.hpp>
#include <serial/objistrasnb.hpp>
#include <serial/objostrasnb.hpp>
#include <serial/objcopy.hpp>
#include <serial/objectio.hpp>
#include <serial/iterator.hpp>
#include <serial/impl/stdtypes.hpp>

#include <objects/id2/ID2_Request_Packet.hpp>
#include <objects/id2/ID2_Request.hpp>
#include <objects/id2/ID2_Request_Get_Blob_Id.hpp>
#include <objects/id2/ID2_Request_Get_Blob_Info.hpp>
#include <objects/id2/ID2_Request_Get_Seq_id.hpp>
#include <objects/id2/ID2_Get_Blob_Details.hpp>
#include <objects/id2/ID2_Seq_id.hpp>
#include <objects/id2/ID2_Blob_Id.hpp>
#include <objects/id2/ID2_Reply.hpp>
#include <objects/id2/ID2_Reply_Data.hpp>
#include <objects/seqsplit/ID2S_Split_Info.hpp>
#include <objects/seqsplit/ID2S_Chunk.hpp>
#include <objects/seqloc/Seq_id.hpp>
#include <objects/seqset/Seq_entry.hpp>
#include <objects/seq/Seq_annot.hpp>

#include <util/compress/reader_zlib.hpp>
#include <util/compress/zlib.hpp>
#include <corelib/rwstream.hpp>

#include <dbapi/driver/driver_mgr.hpp>
00073 
00074 USING_NCBI_SCOPE;
00075 USING_SCOPE(objects);
00076 
00077 
00078 
00079 /////////////////////////////////
00080 //  CId1FetchApp::
00081 //
00082 
00083 class CId2FetchApp : public CNcbiApplication
00084 {
00085 public:
00086     virtual void Init(void);
00087     virtual int  Run(void);
00088     virtual void Exit(void);
00089 
00090 private:
00091     void x_InitConnection(bool show_init);
00092     void x_InitConnection(const string& server_name, bool show_init);
00093     void x_InitPubSeqConnection(const string& server_name, bool show_init);
00094     void x_SendRequestPacket(CID2_Request_Packet& packet);
00095     void x_ReadReply(CID2_Reply& reply);
00096     void x_ReadReply(CID2_Reply& reply, CObjectInfo& object);
00097     void x_ProcessRequest(CID2_Request& request, bool dump = true);
00098     void x_ProcessRequest(CID2_Request_Packet& packet, bool dump = true);
00099     void x_ProcessData(const CID2_Reply_Data& data);
00100     void x_SaveDataObject(const CObjectInfo& object, CNcbiOstream& out);
00101 
00102     AutoPtr<CConn_ServiceStream>  m_ID2Conn;
00103     AutoPtr<CDB_Connection>       m_PubSeqOS;
00104     AutoPtr<CNcbiIstream>         m_PubSeqOSReply;
00105     CNcbiOstream*                 m_OutFile;  // ID2 reply output
00106     CNcbiOstream*                 m_DataFile; // ID2 data output
00107     ESerialDataFormat             m_Format;
00108     bool                          m_SkipData;
00109     bool                          m_ParseData;
00110     bool                          m_CopyData;
00111     bool                          m_PipeData;
00112     int                           m_SerialNumber;
00113 };
00114 
00115 
00116 void CId2FetchApp::Init(void)
00117 {
00118     // Prepare command line descriptions
00119     //
00120 
00121     // Create
00122     AutoPtr<CArgDescriptions> arg_desc(new CArgDescriptions);
00123 
00124     // GI
00125     arg_desc->AddOptionalKey
00126         ("gi", "SeqEntryID",
00127          "GI id of the Seq-Entry to fetch",
00128          CArgDescriptions::eInteger);
00129     // Seq-id
00130     arg_desc->AddOptionalKey
00131         ("id", "SeqEntryID",
00132          "Seq-id of the Seq-Entry to fetch",
00133          CArgDescriptions::eString);
00134     // Seq-id
00135     arg_desc->AddOptionalKey
00136         ("blob_id", "BlobID",
00137          "Blob id of the Seq-Entry to fetch (sat,sat-key)",
00138          CArgDescriptions::eString);
00139     // Request
00140     arg_desc->AddOptionalKey
00141         ("req", "Request",
00142          "ID2 request in ASN.1 text format",
00143          CArgDescriptions::eString);
00144     arg_desc->AddOptionalKey
00145         ("packet", "Packet",
00146          "ID2 request packet in ASN.1 text format",
00147          CArgDescriptions::eString);
00148     arg_desc->AddOptionalKey
00149         ("in", "RequestFile",
00150          "File to read request(s) from",
00151          CArgDescriptions::eInputFile);
00152 
00153     // Skip blob data
00154     arg_desc->AddFlag("skip_data", "Skip blob data");
00155     // Copy blob data
00156     arg_desc->AddFlag("copy_data", "Copy blob data");
00157     // Parse blob data
00158     arg_desc->AddFlag("parse_data", "Parse blob data");
00159     // Pipe blob data
00160     arg_desc->AddFlag("pipe_data", "Pipe parsing blob data");
00161 
00162     // Print init reply
00163     arg_desc->AddFlag("show_init", "Show init reply");
00164 
00165     // Output format
00166     arg_desc->AddDefaultKey
00167         ("fmt", "OutputFormat",
00168          "Format to dump server reply in",
00169          CArgDescriptions::eString, "asn");
00170     arg_desc->SetConstraint("fmt", &(*new CArgAllow_Strings,
00171                                      "asn", "asnb", "xml", "none"));
00172 
00173     // Output file
00174     arg_desc->AddDefaultKey
00175         ("out", "ResultFile",
00176          "File to dump the resulting data to",
00177          CArgDescriptions::eOutputFile, "-", CArgDescriptions::fBinary);
00178 
00179     // ID2 data file
00180     arg_desc->AddOptionalKey
00181         ("data", "DataFile",
00182          "File to save blob data to",
00183          CArgDescriptions::eOutputFile, CArgDescriptions::fBinary);
00184 
00185     // Log file
00186     arg_desc->AddOptionalKey
00187         ("log", "LogFile",
00188          "File to post errors and messages to",
00189          CArgDescriptions::eOutputFile,
00190          0);
00191 
00192     // Server to connect
00193     arg_desc->AddDefaultKey
00194         ("server", "Server",
00195          "ID2 server name",
00196          CArgDescriptions::eString, "ID2");
00197 
00198     // Server to connect
00199     arg_desc->AddOptionalKey
00200         ("pubseqos", "PubSeqOS",
00201          "PubSeqOS server name",
00202          CArgDescriptions::eString);
00203 
00204     // Number of requests
00205     arg_desc->AddDefaultKey
00206         ("count", "Count",
00207          "Repeat request number of times",
00208          CArgDescriptions::eInteger, "1");
00209 
00210     // Program description
00211     string prog_description =
00212         "Fetch SeqEntry from ID server by its GI id";
00213     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
00214                               prog_description, false);
00215 
00216     // Pass argument descriptions to the application
00217     //
00218     SetupArgDescriptions(arg_desc.release());
00219 }
00220 
00221 
00222 void CId2FetchApp::x_InitConnection(const string& server_name,
00223                                     bool show_init)
00224 {
00225     STimeout tmout;  tmout.sec = 9;  tmout.usec = 0;
00226     m_ID2Conn.reset(new CConn_ServiceStream
00227         (server_name, fSERV_Any, 0, 0, &tmout));
00228     
00229     CONN_Wait(m_ID2Conn->GetCONN(), eIO_Write, &tmout);
00230     const char* descr = CONN_Description(m_ID2Conn->GetCONN());
00231     if ( descr ) {
00232         LOG_POST("  connection description: " << descr);
00233     }
00234     
00235     x_InitConnection(show_init);
00236 }
00237 
00238 
00239 void CId2FetchApp::x_InitPubSeqConnection(const string& server_name,
00240                                           bool show_init)
00241 {
00242     C_DriverMgr drvMgr;
00243     map<string,string> args;
00244     args["packet"]="3584"; // 7*512
00245     args["version"]="125"; // for correct connection to OpenServer
00246     string errmsg;
00247     I_DriverContext* context = drvMgr.GetDriverContext("ftds", &errmsg, &args);
00248     if ( !context ) {
00249         ERR_POST(Fatal<<"Failed to create ftds context: "<<errmsg);
00250     }
00251 
00252     m_PubSeqOS.reset(context->Connect(server_name, "anyone", "allowed", 0));
00253     if ( !m_PubSeqOS.get() ) {
00254         ERR_POST(Fatal<<"Failed to open PubSeqOS connection: "<<server_name);
00255     }
00256 
00257     {{
00258         AutoPtr<CDB_LangCmd> cmd(m_PubSeqOS->LangCmd("set blob_stream on"));
00259         if ( cmd ) {
00260             cmd->Send();
00261             cmd->DumpResults();
00262         }
00263     }}
00264 
00265     x_InitConnection(show_init);
00266 }
00267 
00268 
00269 void CId2FetchApp::x_InitConnection(bool show_init)
00270 {
00271     m_SerialNumber = 0;
00272     CID2_Request req;
00273     req.SetRequest().SetInit();
00274     CID2_Request_Packet packet;
00275     packet.Set().push_back(Ref(&req));
00276 
00277     x_ProcessRequest(packet, show_init);
00278 }
00279 
00280 
00281 namespace {
00282     bool sx_FetchNextItem(CDB_Result& result, const CTempString& name)
00283     {
00284         while ( result.Fetch() ) {
00285             for ( size_t pos = 0; pos < result.NofItems(); ++pos ) {
00286                 if ( result.ItemName(pos) == name ) {
00287                     return true;
00288                 }
00289                 result.SkipItem();
00290             }
00291         }
00292         return false;
00293     }
00294     
00295     class CDB_Result_Reader : public CObject, public IReader
00296     {
00297     public:
00298         CDB_Result_Reader(AutoPtr<CDB_RPCCmd> cmd,
00299                           AutoPtr<CDB_Result> db_result)
00300             : m_DB_RPCCmd(cmd), m_DB_Result(db_result)
00301             {
00302             }
00303 
00304         ERW_Result Read(void*   buf,
00305                         size_t  count,
00306                         size_t* bytes_read)
00307             {
00308                 if ( !count ) {
00309                     if ( bytes_read ) {
00310                         *bytes_read = 0;
00311                     }
00312                     return eRW_Success;
00313                 }
00314                 size_t ret;
00315                 while ( (ret = m_DB_Result->ReadItem(buf, count)) == 0 ) {
00316                     if ( !sx_FetchNextItem(*m_DB_Result, "asnout") ) {
00317                         break;
00318                     }
00319                 }
00320                 if ( bytes_read ) {
00321                     *bytes_read = ret;
00322                 }
00323                 return ret? eRW_Success: eRW_Eof;
00324             }
00325         ERW_Result PendingCount(size_t* /*count*/)
00326             {
00327                 return eRW_NotImplemented;
00328             }
00329 
00330     private:
00331         AutoPtr<CDB_RPCCmd> m_DB_RPCCmd;
00332         AutoPtr<CDB_Result> m_DB_Result;
00333     };
00334 }
00335 
00336 
00337 void CId2FetchApp::x_SendRequestPacket(CID2_Request_Packet& packet)
00338 {
00339     // Open connection to ID1 server
00340     if ( m_ID2Conn ) {
00341         CObjectOStreamAsnBinary id2_server_output(*m_ID2Conn, false);
00342         // Send request packet to the server
00343         id2_server_output << packet;
00344         id2_server_output.Flush();
00345     }
00346     else {
00347         m_PubSeqOSReply.reset();
00348         const size_t MAX_ASN_IN = 20*1024;
00349         char buffer[MAX_ASN_IN];
00350         size_t size;
00351         {{
00352             CNcbiOstrstream mem_str(buffer, sizeof(buffer));
00353             {{
00354                 CObjectOStreamAsnBinary obj_str(mem_str);
00355                 obj_str << packet;
00356             }}
00357             if ( !mem_str ) {
00358                 ERR_POST(Fatal<<"PubSeqOS: packet size overflow");
00359             }
00360             size = mem_str.pcount();
00361         }}
00362         CDB_VarChar service("ID2");
00363         CDB_VarChar short_asn;
00364         CDB_LongBinary long_asn(size);
00365         long_asn.SetValue(buffer, size);
00366         CDB_TinyInt text_in(0);
00367         CDB_TinyInt text_out(0);
00368     
00369         AutoPtr<CDB_RPCCmd> cmd(m_PubSeqOS->RPC("os_asn_request"));
00370         cmd->SetParam("@service", &service);
00371         cmd->SetParam("@asnin", &short_asn);
00372         cmd->SetParam("@text", &text_in);
00373         cmd->SetParam("@out_text", &text_out);
00374         cmd->SetParam("@asnin_long", &long_asn);
00375         cmd->Send();
00376 
00377         AutoPtr<CDB_Result> dbr;
00378         while( cmd->HasMoreResults() ) {
00379             if ( cmd->HasFailed() ) {
00380                 ERR_POST(Fatal<<"PubSeqOS: failed RPC");
00381             }
00382             dbr = cmd->Result();
00383             if ( !dbr.get() || dbr->ResultType() != eDB_RowResult ) {
00384                 continue;
00385             }
00386             if ( sx_FetchNextItem(*dbr, "asnout") ) {
00387                 AutoPtr<CDB_Result_Reader> reader
00388                     (new CDB_Result_Reader(cmd, dbr));
00389                 m_PubSeqOSReply.reset(new CRStream(reader.release(),
00390                                                    0, 0,
00391                                                    CRWStreambuf::fOwnAll));
00392                 return;
00393             }
00394         }
00395         ERR_POST(Fatal<<"PubSeqOS: no more results");
00396     }
00397 }
00398 
00399 
namespace {
    // IReader over an in-memory list of octet-string chunks, i.e. the
    // ID2-Reply-Data "data" member after it has been read into memory.
    // Chunks are consumed in order; eRW_Eof is reported after the last one.
    class COSSReader : public IReader
    {
    public:
        typedef vector<char> TOctetString;
        typedef list<TOctetString*> TOctetStringSequence;

        // NOTE(review): keeps a reference to 'in'; the chunk list must
        // outlive this reader.
        COSSReader(const TOctetStringSequence& in)
            : m_Input(in),
              m_CurVec(in.begin())
            {
                x_SetVec();
            }
        
        // Copy up to 'count' bytes from the current chunk only (one call
        // never spans two chunks).  Returns eRW_Eof when all chunks are
        // exhausted.
        virtual ERW_Result Read(void* buffer,
                                size_t  count,
                                size_t* bytes_read = 0)
            {
                size_t pending = x_Pending();
                count = min(pending, count);
                if ( bytes_read ) {
                    *bytes_read = count;
                }
                if ( pending == 0 ) {
                    return eRW_Eof;
                }
                if ( count ) {
                    memcpy(buffer, &(**m_CurVec)[m_CurPos], count);
                    m_CurPos += count;
                }
                return eRW_Success;
            }

        // Report the bytes left in the current chunk; eRW_Eof if none remain
        // in any chunk.
        virtual ERW_Result PendingCount(size_t* count)
            {
                size_t pending = x_Pending();
                *count = pending;
                return pending? eRW_Success: eRW_Eof;
            }

    protected:
        // Start reading the chunk at m_CurVec (size 0 at end of list).
        void x_SetVec(void)
            {
                m_CurPos = 0;
                m_CurSize = m_CurVec == m_Input.end()? 0: (**m_CurVec).size();
            }
        // Bytes left in the current chunk, after skipping exhausted or empty
        // chunks; 0 means the whole list has been consumed.
        size_t x_Pending(void)
            {
                size_t size;
                while ( (size = m_CurSize - m_CurPos) == 0 &&
                        m_CurVec != m_Input.end() ) {
                    ++m_CurVec;
                    x_SetVec();
                }
                return size;
            }
    private:
        const TOctetStringSequence& m_Input;            // chunks (not owned)
        TOctetStringSequence::const_iterator m_CurVec;  // current chunk
        size_t m_CurPos;                                // offset in chunk
        size_t m_CurSize;                               // size of chunk
    };

    // Read hook for the "data" member of ID2-Reply-Data that reads every
    // octet string from the input and throws the bytes away.  Nothing is
    // stored; SetData() is called at the end (presumably to mark the member
    // as present).
    class CReadDataObjectHookSkip : public CReadClassMemberHook
    {
    public:
        virtual void ReadClassMember(CObjectIStream& in,
                                     const CObjectInfoMI& member) {
            // The ID2-Reply-Data object whose "data" member is being read.
            CID2_Reply_Data& data =
                *(CID2_Reply_Data*)member.GetClassObject().GetObjectPtr();
            for ( CIStreamContainerIterator it(in, member.GetMemberType());
                  it; it.NextElement(), ++it ) {
                CObjectIStream::ByteBlock block(in);
                    
                // Drain the block in 4 KB pieces; the data is discarded.
                char buf[4096];
                size_t count;
                while ( (count = block.Read(buf, sizeof(buf))) != 0 ) {
                }
                    
                block.End();
            }
            data.SetData();
        }
    };

    // IReader that streams the octet strings of the "data" member straight
    // from the object input stream, one ByteBlock per container element,
    // without storing them in memory.  Intended to be used from inside a
    // read hook for that member.
    class COSSPipeReader : public IReader
    {
    public:
        COSSPipeReader(CObjectIStream& in,
                       const CObjectInfoMI& member)
            : m_Input(in),
              m_Iter(in, member.GetMemberType()) {
        }

        // Close the current block, then open and close every remaining
        // element so the container iterator reaches its end.
        // NOTE(review): relies on ByteBlock::End() tolerating a block whose
        // bytes were not read -- confirm against the serial library.
        ~COSSPipeReader() {
            do {
                x_CloseBlock();
            } while ( x_OpenBlock() );
        }
        
        // Return bytes from the current block; when it yields nothing, close
        // it and open the next element.  eRW_Eof once no elements remain.
        virtual ERW_Result Read(void* buffer,
                                size_t  buffer_size,
                                size_t* bytes_read = 0) {
            for ( ;; ) {
                if ( m_Block.get() ) {
                    size_t count = m_Block->Read(buffer, buffer_size);
                    if ( count != 0 ) {
                        if ( bytes_read ) {
                            *bytes_read = count;
                        }
                        return eRW_Success;
                    }
                    else {
                        x_CloseBlock();
                    }
                }
                else {
                    if ( !x_OpenBlock() ) {
                        if ( bytes_read ) {
                            *bytes_read = 0;
                        }
                        return eRW_Eof;
                    }
                }
            }
        }
        // Not supported.
        virtual ERW_Result PendingCount(size_t* /*count*/) {
            return eRW_NotImplemented;
        }
    protected:
        // Start a ByteBlock for the current container element; false when
        // the container has no more elements.
        bool x_OpenBlock(void) {
            if ( !m_Iter ) {
                return false;
            }
            m_Block.reset(new CObjectIStream::ByteBlock(m_Input));
            return true;
        }
        // Finish the open block (if any) and advance to the next element.
        void x_CloseBlock(void) {
            if ( m_Block.get() ) {
                m_Block->End();
                m_Block.reset();
                m_Iter.NextElement();
                ++m_Iter;
            }
        }
    private:
        CObjectIStream& m_Input;                  // stream being read
        CIStreamContainerIterator m_Iter;         // over "data" elements
        AutoPtr<CObjectIStream::ByteBlock> m_Block; // currently open block
    };

    // Read hook for the "data" member of ID2-Reply-Data.  Instead of storing
    // the raw octet strings it decodes the embedded object (Seq-entry,
    // Seq-annot, ID2S-Split-Info or ID2S-Chunk) on the fly, pulling the
    // bytes through COSSPipeReader, and keeps it in m_Object.  Nothing is
    // stored in the "data" member itself.
    class CReadDataObjectHook : public CReadClassMemberHook
    {
    public:
        // Object decoded by the last ReadClassMember() call (empty if the
        // hook has not fired).
        CObjectInfo m_Object;

        virtual void ReadClassMember(CObjectIStream& in,
                                     const CObjectInfoMI& member) {
            // The ID2-Reply-Data object whose "data" member is being read.
            // Its data-type/format/compression are consulted below; this
            // assumes they precede "data" in the stream -- TODO confirm
            // against the ID2 ASN.1 specification.
            CID2_Reply_Data& data =
                *(CID2_Reply_Data*)member.GetClassObject().GetObjectPtr();

            TTypeInfo obj_type = 0;
            switch ( data.GetData_type() ) {
            case CID2_Reply_Data::eData_type_seq_entry:
                obj_type = CSeq_entry::GetTypeInfo();
                break;
            case CID2_Reply_Data::eData_type_seq_annot:
                obj_type = CSeq_annot::GetTypeInfo();
                break;
            case CID2_Reply_Data::eData_type_id2s_split_info:
                obj_type = CID2S_Split_Info::GetTypeInfo();
                break;
            case CID2_Reply_Data::eData_type_id2s_chunk:
                obj_type = CID2S_Chunk::GetTypeInfo();
                break;
            default:
                // NOTE(review): unlike the switches below there is no
                // 'return' here; continuing relies on Fatal terminating.
                ERR_POST(Fatal << "Unknown data type in ID2_Reply_Data");
            }
                
            ESerialDataFormat format;
            switch ( data.GetData_format() ) {
            case CID2_Reply_Data::eData_format_asn_binary:
                format = eSerial_AsnBinary;
                break;
            case CID2_Reply_Data::eData_format_asn_text:
                format = eSerial_AsnText;
                break;
            case CID2_Reply_Data::eData_format_xml:
                format = eSerial_Xml;
                break;
            default:
                ERR_POST(Fatal << "Unknown data format in ID2_Reply_Data");
                return;
            }
                
            // Inner scope: obj_stream (declared last) is destroyed first,
            // then 'stream', then 'reader', whose destructor walks any
            // remaining elements of the container.
            {{
                COSSPipeReader reader(in, member);
                CRStream stream(&reader);
                AutoPtr<CObjectIStream> obj_stream;
                    
                switch ( data.GetData_compression() ) {
                case CID2_Reply_Data::eData_compression_none:
                {
                    obj_stream.reset(CObjectIStream::Open(format, stream));
                    break;
                }
                case CID2_Reply_Data::eData_compression_gzip:
                {
                    // The decompressing stream is owned by obj_stream.
                    obj_stream.reset
                        (CObjectIStream::Open
                         (format,
                          *new CCompressionIStream(stream,
                                                   new CZipStreamDecompressor,
                                                   CCompressionIStream::fOwnProcessor),
                          eTakeOwnership));
                    break;
                }
                default:
                    ERR_POST(Fatal << "Unknown data compression in ID2_Reply_Data");
                    return;
                }
                _ASSERT( obj_stream.get() );
                obj_stream->UseMemoryPool();
                m_Object = CObjectInfo(obj_type);
                obj_stream->Read(m_Object.GetObjectPtr(), obj_type);
            }}

            data.SetData();
        }
    };
}
00631 
00632 
00633 void CId2FetchApp::x_ReadReply(CID2_Reply& reply)
00634 {
00635     // Read server response in ASN.1 binary format
00636     if ( m_ID2Conn ) {
00637         CObjectIStreamAsnBinary id2_server_input(*m_ID2Conn);
00638         id2_server_input >> reply;
00639     }
00640     else {
00641         CObjectIStreamAsnBinary id2_server_input(*m_PubSeqOSReply);
00642         id2_server_input >> reply;
00643     }
00644 }
00645 
00646 
00647 void CId2FetchApp::x_ReadReply(CID2_Reply& reply, CObjectInfo& object)
00648 {
00649     // Read server response in ASN.1 binary format
00650     CRef<CReadDataObjectHook> hook(new CReadDataObjectHook);
00651     CObjectHookGuard<CID2_Reply_Data> guard("data", *hook);
00652     if ( m_ID2Conn ) {
00653         CObjectIStreamAsnBinary id2_server_input(*m_ID2Conn);
00654         id2_server_input >> reply;
00655     }
00656     else {
00657         CObjectIStreamAsnBinary id2_server_input(*m_PubSeqOSReply);
00658         id2_server_input >> reply;
00659     }
00660     object = hook->m_Object;
00661 }
00662 
00663 
00664 void CId2FetchApp::x_ProcessRequest(CID2_Request& request, bool dump)
00665 {
00666     CID2_Request_Packet packet;
00667     packet.Set().push_back(Ref(&request));
00668     x_ProcessRequest(packet, dump);
00669 }
00670 
00671 
00672 void CId2FetchApp::x_ProcessRequest(CID2_Request_Packet& packet, bool dump)
00673 {
00674     CStopWatch sw(CStopWatch::eStart);
00675     NON_CONST_ITERATE(CID2_Request_Packet::Tdata, it, packet.Set()) {
00676         CID2_Request& req = **it;
00677         if ( !req.IsSetSerial_number() ) {
00678             req.SetSerial_number(m_SerialNumber++);
00679         }
00680     }
00681 
00682     if ( false && dump ) {
00683         CNcbiOstrstream ostr;
00684         ostr << MSerial_AsnText << packet;
00685         LOG_POST("\nProcessing request:\n" << ostr.rdbuf());
00686     }
00687 
00688     x_SendRequestPacket(packet);
00689 
00690     size_t remaining_count = packet.Set().size();
00691 
00692     CID2_Reply reply;
00693 
00694     double time_first = 0;
00695     while ( remaining_count > 0 ) {
00696         if ( m_PipeData ) {
00697             CObjectInfo object;
00698             x_ReadReply(reply, object);
00699             if ( !time_first ) time_first = sw.Elapsed();
00700             if ( object && m_DataFile ) {
00701                 x_SaveDataObject(object, *m_DataFile);
00702             }
00703         }
00704         else {
00705             x_ReadReply(reply);
00706             if ( !time_first ) time_first = sw.Elapsed();
00707             if ( m_ParseData || m_SkipData  ||  m_DataFile ) {
00708                 CTypeIterator<CID2_Reply_Data> iter = Begin(reply);
00709                 if ( iter && iter->IsSetData() ) {
00710                     if ( m_ParseData || m_DataFile ) {
00711                         x_ProcessData(*iter);
00712                     }
00713                     if ( m_SkipData ) {
00714                         iter->ResetData();
00715                         iter->SetData();
00716                     }
00717                 }
00718             }
00719         }
00720         if ( dump && m_OutFile ) {
00721             AutoPtr<CObjectOStream> id2_client_output
00722                 (CObjectOStream::Open(m_Format, *m_OutFile));
00723 
00724             *id2_client_output << reply;
00725             if (m_Format == eSerial_AsnText  ||  m_Format == eSerial_Xml) {
00726                 *m_OutFile << NcbiEndl;
00727             }
00728         }
00729         if ( reply.IsSetEnd_of_reply() ) {
00730             --remaining_count;
00731         }
00732     }
00733     LOG_POST("Packet processed in " << sw.Elapsed()<< " first at "<<time_first);
00734 }
00735 
00736 
00737 void CId2FetchApp::x_ProcessData(const CID2_Reply_Data& data)
00738 {
00739     _ASSERT( data.IsSetData() );
00740 
00741     TTypeInfo obj_type = 0;
00742     switch ( data.GetData_type() ) {
00743     case CID2_Reply_Data::eData_type_seq_entry:
00744         obj_type = CSeq_entry::GetTypeInfo();
00745         break;
00746     case CID2_Reply_Data::eData_type_seq_annot:
00747         obj_type = CSeq_annot::GetTypeInfo();
00748         break;
00749     case CID2_Reply_Data::eData_type_id2s_split_info:
00750         obj_type = CID2S_Split_Info::GetTypeInfo();
00751         break;
00752     case CID2_Reply_Data::eData_type_id2s_chunk:
00753         obj_type = CID2S_Chunk::GetTypeInfo();
00754         break;
00755     default:
00756         ERR_POST(Fatal << "Unknown data type in ID2_Reply_Data");
00757     }
00758 
00759     ESerialDataFormat format;
00760     switch ( data.GetData_format() ) {
00761     case CID2_Reply_Data::eData_format_asn_binary:
00762         format = eSerial_AsnBinary;
00763         break;
00764     case CID2_Reply_Data::eData_format_asn_text:
00765         format = eSerial_AsnText;
00766         break;
00767     case CID2_Reply_Data::eData_format_xml:
00768         format = eSerial_Xml;
00769         break;
00770     default:
00771         ERR_POST(Fatal << "Unknown data format in ID2_Reply_Data");
00772         return;
00773     }
00774 
00775     COSSReader reader(data.GetData());
00776     CRStream stream(&reader);
00777     AutoPtr<CObjectIStream> obj_stream;
00778     
00779     switch ( data.GetData_compression() ) {
00780     case CID2_Reply_Data::eData_compression_none:
00781     {
00782         obj_stream.reset(CObjectIStream::Open(format, stream));
00783         break;
00784     }
00785     case CID2_Reply_Data::eData_compression_gzip:
00786     {
00787         obj_stream.reset(CObjectIStream::Open(format,
00788             *(new CCompressionIStream(stream,
00789             new CZipStreamDecompressor,
00790             CCompressionIStream::fOwnProcessor)), eTakeOwnership));
00791         break;
00792     }
00793     default:
00794         ERR_POST(Fatal << "Unknown data compression in ID2_Reply_Data");
00795         return;
00796     }
00797     _ASSERT( obj_stream.get() );
00798     obj_stream->UseMemoryPool();
00799     AutoPtr<CObjectOStream> out_stream;
00800     if ( m_DataFile ) {
00801         out_stream.reset(CObjectOStream::Open(m_Format, *m_DataFile));
00802     }
00803     if ( m_CopyData && out_stream.get() ) {
00804         CObjectStreamCopier copier(*obj_stream, *out_stream);
00805         copier.Copy(obj_type);
00806     }
00807     else {
00808         CObjectInfo obj(obj_type);
00809         obj_stream->Read(obj.GetObjectPtr(), obj.GetTypeInfo());
00810         if ( out_stream.get() ) {
00811             out_stream->Write(obj.GetObjectPtr(), obj.GetTypeInfo());
00812         }
00813     }
00814     if ( out_stream.get() && m_Format != eSerial_AsnBinary) {
00815         out_stream->FlushBuffer();
00816         *m_DataFile << '\n';
00817     }
00818 }
00819 
00820 
00821 void CId2FetchApp::x_SaveDataObject(const CObjectInfo& object,
00822                                     CNcbiOstream& out)
00823 {
00824     AutoPtr<CObjectOStream> out_stream(
00825         CObjectOStream::Open(m_Format, out));
00826     out_stream->Write(object.GetObjectPtr(), object.GetTypeInfo());
00827     if ( m_Format != eSerial_AsnBinary) {
00828         out_stream->FlushBuffer();
00829         out << '\n';
00830     }
00831 }
00832 
00833 
00834 int CId2FetchApp::Run(void)
00835 {
00836     // Process command line args
00837     const CArgs& args = GetArgs();
00838 
00839     // Setup and tune logging facilities
00840     if ( args["log"] ) {
00841         SetDiagStream( &args["log"].AsOutputFile() );
00842     }
00843 #ifdef _DEBUG
00844     // SetDiagTrace(eDT_Enable);
00845     SetDiagPostLevel(eDiag_Info);
00846     SetDiagPostFlag(eDPF_All);
00847 #endif
00848 
00849     // Setup application registry, error log, and MT-lock for CONNECT library
00850     CONNECT_Init(&GetConfig());
00851 
00852     m_OutFile = 0;
00853     m_DataFile = 0;
00854     if ( args["fmt"].AsString() != "none" ) {
00855         if ( args["data"] ) {
00856             m_DataFile = &args["data"].AsOutputFile();
00857         }
00858         if ( !m_DataFile || args["out"].AsString() != "-" ) {
00859             m_OutFile = &args["out"].AsOutputFile();
00860         }
00861     }
00862     const string& fmt = args["fmt"].AsString();
00863     if        (fmt == "asn") {
00864         m_Format = eSerial_AsnText;
00865     } else if (fmt == "asnb") {
00866         m_Format = eSerial_AsnBinary;
00867     } else if (fmt == "xml") {
00868         m_Format = eSerial_Xml;
00869     }
00870     m_SkipData = args["skip_data"];
00871     m_CopyData = args["copy_data"];
00872     m_ParseData = args["parse_data"];
00873     m_PipeData = args["pipe_data"];
00874 
00875     int count = args["count"].AsInteger();
00876 
00877     if ( args["pubseqos"] ) {
00878         x_InitPubSeqConnection(args["pubseqos"].AsString(), args["show_init"]);
00879     }
00880     else {
00881         x_InitConnection(args["server"].AsString(), args["show_init"]);
00882     }
00883 
00884     typedef vector<CRef<CID2_Request_Packet> > TReqs;
00885     TReqs reqs;
00886 
00887     if ( args["gi"] ) {
00888         int gi = args["gi"].AsInteger();
00889         CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00890         reqs.push_back(packet);
00891         CRef<CID2_Request> req(new CID2_Request);
00892         packet->Set().push_back(req);
00893 
00894         req->SetRequest().SetGet_blob_info().SetBlob_id().SetResolve().
00895             SetRequest().SetSeq_id().SetSeq_id().SetSeq_id().SetGi(gi);
00896         req->SetRequest().SetGet_blob_info().SetGet_data();
00897     }
00898     else if ( args["id"] ) {
00899         CRef<CSeq_id> id(new CSeq_id(args["id"].AsString()));
00900         CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00901         reqs.push_back(packet);
00902         CRef<CID2_Request> req(new CID2_Request);
00903         packet->Set().push_back(req);
00904 
00905         req->SetRequest().SetGet_blob_info().SetBlob_id().SetResolve().
00906             SetRequest().SetSeq_id().SetSeq_id().SetSeq_id(*id);
00907         req->SetRequest().SetGet_blob_info().SetGet_data();
00908     }
00909     else if ( args["blob_id"] ) {
00910         vector<string> vv;
00911         NStr::Tokenize(args["blob_id"].AsString(), ",", vv);
00912         if ( vv.size() != 2 ) {
00913             ERR_POST(Fatal<<"Bad blob_id format: "<<args["blob_id"]);
00914         }
00915         int sat = NStr::StringToInt(vv[0]);
00916         int sat_key = NStr::StringToInt(vv[1]);
00917         CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00918         reqs.push_back(packet);
00919         CRef<CID2_Request> req(new CID2_Request);
00920         packet->Set().push_back(req);
00921         CID2_Blob_Id& blob_id = 
00922             req->SetRequest().SetGet_blob_info().SetBlob_id().SetBlob_id();
00923         blob_id.SetSat(sat);
00924         blob_id.SetSat_key(sat_key);
00925         req->SetRequest().SetGet_blob_info().SetGet_data();
00926     }
00927     else if ( args["req"] ) {
00928         CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00929         reqs.push_back(packet);
00930         CRef<CID2_Request> req(new CID2_Request);
00931         packet->Set().push_back(req);
00932 
00933         string text = args["req"].AsString();
00934         if ( text.find("::=") == NPOS ) {
00935             text = "ID2-Request ::= " + text;
00936         }
00937         CNcbiIstrstream in(text.data(), text.size());
00938         in >> MSerial_AsnText >> *req;
00939     }
00940     else if ( args["packet"] ) {
00941         CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00942         reqs.push_back(packet);
00943 
00944         string text = args["packet"].AsString();
00945         if ( text.find("::=") == NPOS ) {
00946             text = "ID2-Request_Packet ::= " + text;
00947         }
00948         CID2_Request_Packet id2_packet;
00949         CNcbiIstrstream in(text.data(), text.size());
00950         in >> MSerial_AsnText >> *packet;
00951     }
00952     else if ( args["in"] ) {
00953         AutoPtr<CObjectIStream> req_input
00954             (CObjectIStream::Open(eSerial_AsnText, args["in"].AsInputFile()));
00955 
00956         while ( !req_input->EndOfData() ) {
00957             string type = req_input->ReadFileHeader();
00958             if (type == "ID2-Request") {
00959                 CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00960                 reqs.push_back(packet);
00961                 CRef<CID2_Request> req(new CID2_Request);
00962                 packet->Set().push_back(req);
00963 
00964                 req_input->Read(req, req->GetThisTypeInfo(),
00965                                 CObjectIStream::eNoFileHeader);
00966             }
00967             else if (type == "ID2-Request-Packet") {
00968                 CRef<CID2_Request_Packet> packet(new CID2_Request_Packet);
00969                 reqs.push_back(packet);
00970                 
00971                 req_input->Read(packet, packet->GetThisTypeInfo(),
00972                                 CObjectIStream::eNoFileHeader);
00973             }
00974             else {
00975                 ERR_POST(Fatal <<
00976                     "Object type must be ID2-Request or ID2-Request-Packet.");
00977             }
00978         }
00979     }
00980     else {
00981         ERR_POST(Fatal << "No ID2-Request specified.");
00982     }
00983 
00984 
00985     NON_CONST_ITERATE ( TReqs, it, reqs ) {
00986         for ( int i = 0; i < count; ++i ) {
00987             x_ProcessRequest(**it);
00988         }
00989     }
00990     return 0;  // Done
00991 }
00992 
00993 
00994 
00995 // Cleanup
00996 void CId2FetchApp::Exit(void)
00997 {
00998     SOCK_ShutdownAPI();
00999     SetDiagStream(0);
01000 }
01001 
01002 
01003 
01004 /////////////////////////////////////////////////////////////////////////////
01005 //  MAIN
01006 //
01007 
01008 int main(int argc, const char* argv[]) 
01009 {
01010     return CId2FetchApp().AppMain(argc, argv /*, 0, eDS_Default, 0*/);
01011 }
Modified on Wed May 23 13:04:13 2012 by modify_doxy.py rev. 337098