src/util/compress/streambuf.cpp

Go to the documentation of this file.
00001 /*  $Id: streambuf.cpp 149145 2009-01-07 19:24:11Z ivanov $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *  Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Vladimir Ivanov
00027  *
00028  * File Description:  CCompression based C++ streambuf
00029  *
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 #include "streambuf.hpp"
00034 #include <util/compress/stream.hpp>
00035 #include <util/error_codes.hpp>
00036 #include <memory>
00037 
00038 
00039 #define NCBI_USE_ERRCODE_X   Util_Compress
00040 
00041 
00042 BEGIN_NCBI_SCOPE
00043 
00044 // Abbreviation for long name
00045 #define CP CCompressionProcessor
00046 
00047 
00048 //////////////////////////////////////////////////////////////////////////////
00049 //
00050 // CCompressionStreambuf
00051 //
00052 
00053 CCompressionStreambuf::CCompressionStreambuf(
00054     CNcbiIos*                    stream,
00055     CCompressionStreamProcessor* read_sp,
00056     CCompressionStreamProcessor* write_sp)
00057 
00058     :  m_Stream(stream), m_Reader(read_sp), m_Writer(write_sp), m_Buf(0)
00059 {
00060     // Check parameters
00061     if ( !stream  ||  
00062          !((read_sp   &&  read_sp->m_Processor) ||
00063            (write_sp  &&  write_sp->m_Processor))) {
00064         return;
00065     }
00066 
00067     // Get buffers sizes
00068     streamsize read_bufsize = 0, write_bufsize = 0;
00069     if ( m_Reader ) {
00070         read_bufsize = m_Reader->m_InBufSize + m_Reader->m_OutBufSize;
00071     }
00072     if ( m_Writer ) {
00073         write_bufsize = m_Writer->m_InBufSize + m_Writer->m_OutBufSize;
00074     }
00075 
00076     // Allocate memory for all buffers at one time
00077     auto_ptr<CT_CHAR_TYPE> bp(new CT_CHAR_TYPE[read_bufsize + write_bufsize]);
00078     m_Buf = bp.get();
00079     if ( !m_Buf ) {
00080         return;
00081     }
00082 
00083     // Init processors and set the buffer pointers
00084     if ( m_Reader ) {
00085         m_Reader->Init();
00086         m_Reader->m_InBuf  = m_Buf;
00087         m_Reader->m_OutBuf = m_Buf + m_Reader->m_InBufSize;
00088         m_Reader->m_Begin  = m_Reader->m_InBuf;
00089         m_Reader->m_End    = m_Reader->m_InBuf;
00090         // We wish to have underflow() called at the first read
00091         setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf, m_Reader->m_OutBuf);
00092     } else {
00093         setg(0,0,0);
00094     }
00095     if ( m_Writer ) {
00096         m_Writer->Init();
00097         m_Writer->m_InBuf  = m_Buf + read_bufsize;
00098         m_Writer->m_OutBuf = m_Writer->m_InBuf + m_Writer->m_InBufSize;
00099         m_Writer->m_Begin  = m_Writer->m_OutBuf;
00100         m_Writer->m_End    = m_Writer->m_OutBuf;
00101         // Use one character less for the input buffer than the really
00102         // available one (see overflow())
00103         setp(m_Writer->m_InBuf, m_Writer->m_InBuf + m_Writer->m_InBufSize - 1);
00104     } else {
00105         setp(0,0);
00106     }
00107     bp.release();
00108 }
00109 
00110 
00111 CCompressionStreambuf::~CCompressionStreambuf()
00112 {
00113     // Finalize processors
00114 
00115     CCompressionStreamProcessor* sp;
00116     #define msg_where    "CCompressionStreambuf::~CCompressionStreambuf: "
00117     #define msg_overflow "Overflow occurred, lost some processed data " \
00118                          "through call Finalize()"
00119     #define msg_error    "Finalize() failed"
00120 
00121     // Read processor
00122     sp = GetStreamProcessor(CCompressionStream::eRead);
00123     if ( sp ) {
00124         sp->m_Processor->End();
00125         sp->m_State = CCompressionStreamProcessor::eDone;
00126     }
00127 
00128     // Write processor
00129     sp = GetStreamProcessor(CCompressionStream::eWrite);
00130     if ( sp ) {
00131         if ( sp->m_State == CCompressionStreamProcessor::eActive ) {
00132             Finalize(CCompressionStream::eWrite);
00133             if ( sp->m_LastStatus == CP::eStatus_Overflow ) {
00134                 ERR_COMPRESS(72, msg_where << msg_overflow);
00135             }
00136             if ( sp->m_LastStatus == CP::eStatus_Error ) {
00137                 ERR_COMPRESS(73, msg_where << msg_error);
00138             }
00139         }
00140         sp->m_Processor->End();
00141         sp->m_State = CCompressionStreamProcessor::eDone;
00142         // Write remaining data from buffers to underlying stream
00143         WriteOutBufToStream(true /*force write*/);
00144     }
00145     // Delete buffers
00146     delete[] m_Buf;
00147 }
00148 
00149 
00150 int CCompressionStreambuf::sync()
00151 {
00152     if ( !IsOkay() ) {
00153         return -1;
00154     }
00155     int status = 0;
00156     // Sync write processor buffers
00157     CCompressionStreamProcessor* 
00158         sp = GetStreamProcessor(CCompressionStream::eWrite);
00159     if ( sp  &&  sp->m_State != CCompressionStreamProcessor::eDone  &&
00160                !(sp->m_State == CCompressionStreamProcessor::eFinalize  &&  
00161                  sp->m_LastStatus == CCompressionProcessor::eStatus_EndOfData)
00162         ) {
00163         if ( Sync(CCompressionStream::eWrite) != 0 ) {
00164             status = -1;
00165         }
00166     }
00167     // Sync the underlying stream
00168     status += m_Stream->rdbuf()->PUBSYNC();
00169     return (status < 0 ? -1 : 0);
00170 }
00171 
00172 
00173 int CCompressionStreambuf::Sync(CCompressionStream::EDirection dir)
00174 {
00175     // Check processor status
00176     if ( !IsStreamProcessorOkay(dir) ) {
00177         return -1;
00178     }
00179     // Process remaining data in the preprocessing buffer
00180     if ( !Process(dir) ) {
00181         return -1;
00182     }
00183     // Flush
00184     return Flush(dir);
00185 }
00186 
00187 
00188 int CCompressionStreambuf::Finish(CCompressionStream::EDirection dir)
00189 {
00190     // Check processor status
00191     if ( !IsStreamProcessorOkay(dir) ) {
00192         return -1;
00193     }
00194     CCompressionStreamProcessor* sp = GetStreamProcessor(dir);
00195     // Check processor status
00196     if ( sp->m_LastStatus == CP::eStatus_Error ) {
00197         return -1;
00198     }
00199     if ( sp->m_State == CCompressionStreamProcessor::eFinalize ) {
00200         // Already finalized
00201         return 0;
00202     }
00203     // Process remaining data in the preprocessing buffer
00204     Process(dir);
00205     if ( sp->m_LastStatus == CP::eStatus_Error ) {
00206         return -1;
00207     }
00208     // Finish. Change state to 'finalized'.
00209     sp->m_State = CCompressionStreamProcessor::eFinalize;
00210     return Flush(dir);
00211 }
00212 
00213 
00214 int CCompressionStreambuf::Flush(CCompressionStream::EDirection dir)
00215 {
00216     CCompressionStreamProcessor* sp = GetStreamProcessor(dir);
00217 
00218     // Check processor status
00219     if ( sp->m_LastStatus == CP::eStatus_Error ) {
00220         return -1;
00221     }
00222     if ( sp->m_LastStatus == CP::eStatus_EndOfData ) {
00223         // Flush underlying stream (on write)
00224         if (dir == CCompressionStream::eWrite  &&  
00225             !WriteOutBufToStream(true /*force write*/)) {
00226             return -1;
00227         }
00228         // End of data, nothing to do
00229         return 0;
00230     }
00231 
00232     // Flush stream compressor
00233     CT_CHAR_TYPE* buf = 0;
00234     size_t out_size = 0, out_avail = 0;
00235     do {
00236         // Get pointer to the free space in the buffer
00237         if ( dir == CCompressionStream::eRead ) {
00238             buf = egptr();
00239         } else {
00240             buf = sp->m_End;
00241         }
00242         out_size = sp->m_OutBuf + sp->m_OutBufSize - buf;
00243 
00244         // Get data from processor
00245         out_avail = 0;
00246         if ( sp->m_State == CCompressionStreamProcessor::eFinalize ) {
00247             // State is eFinalize
00248             sp->m_LastStatus = 
00249                 sp->m_Processor->Finish(buf, out_size, &out_avail);
00250         } else {
00251             // State is eActive
00252             _VERIFY(sp->m_State == CCompressionStreamProcessor::eActive);
00253             sp->m_LastStatus = 
00254                 sp->m_Processor->Flush(buf, out_size, &out_avail);
00255             // No more data -- automaticaly finalize stream
00256             if ( sp->m_LastStatus == CP::eStatus_EndOfData ) {
00257                 sp->m_State = CCompressionStreamProcessor::eFinalize;
00258             }
00259         } 
00260         // Check on error
00261         if ( sp->m_LastStatus == CP::eStatus_Error ) {
00262             return -1;
00263         }
00264         if ( dir == CCompressionStream::eRead ) {
00265             // Update the get's pointers
00266             setg(sp->m_OutBuf, gptr(), egptr() + out_avail);
00267         } else { // CCompressionStream::eWrite
00268             // Update the output buffer pointer
00269             sp->m_End += out_avail;
00270             // Write data to the underlying stream only if the output buffer
00271             // is full or an overflow/endofdata occurs.
00272             if ( !WriteOutBufToStream() ) {
00273                 return -1;
00274             }
00275         }
00276     } while (sp->m_LastStatus == CP::eStatus_Repeat  ||
00277             (out_avail  &&  sp->m_LastStatus == CP::eStatus_Overflow));
00278 
00279     // Flush underlying stream (on write)
00280     if (dir == CCompressionStream::eWrite) {
00281         if ( sp->m_LastStatus == CP::eStatus_EndOfData  ||
00282             (sp->m_State == CCompressionStreamProcessor::eFinalize && !out_avail)) {
00283             if ( !WriteOutBufToStream(true /*force write*/) ) {
00284                 return -1;
00285             }
00286         }
00287     }
00288     return 0;
00289 }
00290 
00291 
00292 CT_INT_TYPE CCompressionStreambuf::overflow(CT_INT_TYPE c)
00293 {
00294     // Check processor status
00295     if ( !IsStreamProcessorOkay(CCompressionStream::eWrite) ) {
00296         return CT_EOF;
00297     }
00298     if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00299         return CT_EOF;
00300     }
00301     if ( !CT_EQ_INT_TYPE(c, CT_EOF) ) {
00302         // Put this character in the last position
00303         // (this function is called when pptr() == eptr() but we
00304         // have reserved one byte more in the constructor, thus
00305         // *epptr() and now *pptr() point to valid positions)
00306         *pptr() = c;
00307         // Increment put pointer
00308         pbump(1);
00309     }
00310     if ( ProcessStreamWrite() ) {
00311         return CT_NOT_EOF(CT_EOF);
00312     }
00313     return CT_EOF;
00314 }
00315 
00316 
00317 CT_INT_TYPE CCompressionStreambuf::underflow(void)
00318 {
00319     // Check processor status
00320     if ( !IsStreamProcessorOkay(CCompressionStream::eRead) ) {
00321         return CT_EOF;
00322     }
00323 
00324     // Reset pointer to the processed data
00325     setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf, m_Reader->m_OutBuf);
00326 
00327     // Try to process next data
00328     if ( !ProcessStreamRead()  ||  gptr() == egptr() ) {
00329         return CT_EOF;
00330     }
00331     return CT_TO_INT_TYPE(*gptr());
00332 }
00333 
00334 
00335 bool CCompressionStreambuf::ProcessStreamRead()
00336 {
00337     size_t     in_len, in_avail, out_size, out_avail;
00338     streamsize n_read;
00339 
00340     // End of stream has been detected
00341     if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData ) {
00342         return false;
00343     }
00344 
00345     // Flush remaining data from compression stream if it has finalized
00346     if ( m_Reader->m_State == CCompressionStreamProcessor::eFinalize ) {
00347         return Flush(CCompressionStream::eRead) == 0;
00348     }
00349 
00350     // Put data into the (de)compressor until there is something
00351     // in the output buffer
00352     do {
00353         in_avail  = 0;
00354         out_avail = 0;
00355         out_size  = m_Reader->m_OutBuf + m_Reader->m_OutBufSize - egptr();
00356 
00357         // Refill the output buffer if necessary
00358         if ( m_Reader->m_LastStatus != CP::eStatus_Overflow ) {
00359 
00360             // Refill the input buffer if necessary
00361             if ( m_Reader->m_Begin == m_Reader->m_End ) {
00362                 n_read = m_Stream->rdbuf()->sgetn(m_Reader->m_InBuf,
00363                                                   m_Reader->m_InBufSize);
00364                 if ( !n_read ) {
00365                     // We can't read more of data.
00366                     // Automaticaly 'finalize' (de)compressor.
00367                     m_Reader->m_State = CCompressionStreamProcessor::eFinalize;
00368                     return Flush(CCompressionStream::eRead) == 0;
00369                 }
00370                 // Update the input buffer pointers
00371                 m_Reader->m_Begin = m_Reader->m_InBuf;
00372                 m_Reader->m_End   = m_Reader->m_InBuf + n_read;
00373             }
00374             // Process next data portion
00375             in_len = m_Reader->m_End - m_Reader->m_Begin;
00376             m_Reader->m_LastStatus = m_Reader->m_Processor->Process(
00377                                 m_Reader->m_Begin, in_len, egptr(), out_size,
00378                                 &in_avail, &out_avail);
00379         } else {
00380             // Check available space in the output buffer
00381             if ( !out_size ) {
00382                 return false;
00383             }
00384             // Get unprocessed data size
00385             in_len = m_Reader->m_End - m_Reader->m_Begin;
00386             in_avail = in_len;
00387             m_Reader->m_LastStatus = 
00388                 m_Reader->m_Processor->Flush(egptr(), out_size, &out_avail);
00389         }
00390         if ( m_Reader->m_LastStatus == CP::eStatus_Error ) {
00391             return false;
00392         }
00393         // No more data -- automaticaly finalize stream
00394         if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData ) {
00395             m_Reader->m_State = CCompressionStreamProcessor::eFinalize;
00396         }
00397 
00398         // Update pointer to an unprocessed data
00399         m_Reader->m_Begin += (in_len - in_avail);
00400         // Update the get's pointers
00401         setg(m_Reader->m_OutBuf, gptr(), egptr() + out_avail);
00402 
00403         if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData   &&  !out_avail ) { 
00404             return false;
00405         }
00406 
00407     } while ( !out_avail );
00408 
00409     return true;
00410 }
00411 
00412 
00413 bool CCompressionStreambuf::ProcessStreamWrite()
00414 {
00415     const char*      in_buf    = pbase();
00416     const streamsize count     = pptr() - pbase();
00417     size_t           in_avail  = count;
00418 
00419     // End of stream has been detected
00420     if ( m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00421         return false;
00422     }
00423     // Flush remaining data from compression stream if it is finalized
00424     if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00425         return Flush(CCompressionStream::eWrite) == 0;
00426     }
00427 
00428     // Loop until no data is left
00429     while ( in_avail ) {
00430         // Process next data portion
00431         size_t out_avail = 0;
00432         streamsize out_size = m_Writer->m_OutBuf + 
00433                               m_Writer->m_OutBufSize - m_Writer->m_End;
00434         m_Writer->m_LastStatus = m_Writer->m_Processor->Process(
00435             in_buf + count - in_avail, in_avail, m_Writer->m_End, out_size,
00436             &in_avail, &out_avail);
00437 
00438         // Check on error / small output buffer
00439         if ( m_Writer->m_LastStatus == CP::eStatus_Error ) {
00440             return false;
00441         }
00442         // No more data -- automaticaly finalize stream
00443         if ( m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00444             m_Writer->m_State = CCompressionStreamProcessor::eFinalize;
00445         }
00446         // Update the output buffer pointer
00447         m_Writer->m_End += out_avail;
00448 
00449         // Write data to the underlying stream only if the output buffer
00450         // is full or an overflow occurs.
00451         if ( !WriteOutBufToStream() ) {
00452             return false;
00453         }
00454     }
00455     // Decrease the put pointer
00456     pbump(-(int)count);
00457     return true;
00458 }
00459 
00460 
00461 bool CCompressionStreambuf::WriteOutBufToStream(bool force_write)
00462 {
00463     // Write data from out buffer to the underlying stream only if the buffer
00464     // is full or an overflow/endofdata occurs, or 'force_write' is TRUE.
00465     if ( force_write  || 
00466          (m_Writer->m_End == m_Writer->m_OutBuf + m_Writer->m_OutBufSize)  ||
00467          m_Writer->m_LastStatus == CP::eStatus_Overflow  ||
00468          m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00469 
00470         streamsize to_write = m_Writer->m_End - m_Writer->m_Begin;
00471         if ( to_write ) {
00472             streamsize n_write = m_Stream->rdbuf()->sputn(m_Writer->m_Begin, to_write);
00473             if ( n_write != to_write ) {
00474                 m_Writer->m_Begin += n_write;
00475                 return false;
00476             }
00477             // Update the output buffer pointers
00478             m_Writer->m_Begin = m_Writer->m_OutBuf;
00479             m_Writer->m_End   = m_Writer->m_OutBuf;
00480         }
00481     }
00482     return true;
00483 }
00484 
00485 
00486 streamsize CCompressionStreambuf::xsputn(const CT_CHAR_TYPE* buf,
00487                                          streamsize count)
00488 {
00489     // Check processor status
00490     if ( !IsStreamProcessorOkay(CCompressionStream::eWrite) ) {
00491         return CT_EOF;
00492     }
00493     if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00494         return CT_EOF;
00495     }
00496     // Check parameters
00497     if ( !buf  ||  count <= 0 ) {
00498         return 0;
00499     }
00500     // The number of chars copied
00501     streamsize done = 0;
00502 
00503     // Loop until no data is left
00504     while ( done < count ) {
00505         // Get the number of chars to write in this iteration
00506         // (we've got one more char than epptr thinks)
00507         size_t block_size = min(size_t(count-done), size_t(epptr()-pptr()+1));
00508         // Write them
00509         memcpy(pptr(), buf + done, block_size);
00510         // Update the write pointer
00511         pbump((int)block_size);
00512         // Process block if necessary
00513         if ( pptr() >= epptr()  &&  !ProcessStreamWrite() ) {
00514             break;
00515         }
00516         done += block_size;
00517     }
00518     return done;
00519 };
00520 
00521 
00522 streamsize CCompressionStreambuf::xsgetn(CT_CHAR_TYPE* buf, streamsize count)
00523 {
00524     // We don't doing here a check for the streambuf finalization because
00525     // underflow() can be called after Finalize() to read a rest of
00526     // produced data.
00527     if ( !IsOkay()  ||  !m_Reader->m_Processor ) {
00528         return 0;
00529     }
00530     // Check parameters
00531     if ( !buf  ||  count <= 0 ) {
00532         return 0;
00533     }
00534     // The number of chars copied
00535     streamsize done = 0;
00536 
00537     // Loop until all data are not read yet
00538     for (;;) {
00539         // Get the number of chars to write in this iteration
00540         size_t block_size = min(size_t(count-done), size_t(egptr()-gptr()));
00541         // Copy them
00542         if ( block_size ) {
00543             memcpy(buf + done, gptr(), block_size);
00544             done += block_size;
00545             // Update get pointers.
00546             // Satisfy "usual backup condition", see standard: 27.5.2.4.3.13
00547             if ( block_size == size_t(egptr() - gptr()) ) {
00548                 *m_Reader->m_OutBuf = buf[done - 1];
00549                 setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf + 1,
00550                      m_Reader->m_OutBuf + 1);
00551             } else {
00552                 // Update the read pointer
00553                 gbump((int)block_size);
00554             }
00555         }
00556         // Process block if necessary
00557         if ( done == count  ||  !ProcessStreamRead() ) {
00558             break;
00559         }
00560     }
00561     return done;
00562 }
00563 
00564 
00565 END_NCBI_SCOPE
00566 
00567 

Generated on Sun Dec 6 22:44:00 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Dec 07 16:21:14 2009 by modify_doxy.py rev. 173732