00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <ncbi_pch.hpp>
00033 #include "streambuf.hpp"
00034 #include <util/compress/stream.hpp>
00035 #include <util/error_codes.hpp>
00036 #include <memory>
00037
00038
00039 #define NCBI_USE_ERRCODE_X Util_Compress
00040
00041
00042 BEGIN_NCBI_SCOPE
00043
00044
00045 #define CP CCompressionProcessor
00046
00047
00048
00049
00050
00051
00052
00053 CCompressionStreambuf::CCompressionStreambuf(
00054 CNcbiIos* stream,
00055 CCompressionStreamProcessor* read_sp,
00056 CCompressionStreamProcessor* write_sp)
00057
00058 : m_Stream(stream), m_Reader(read_sp), m_Writer(write_sp), m_Buf(0)
00059 {
00060
00061 if ( !stream ||
00062 !((read_sp && read_sp->m_Processor) ||
00063 (write_sp && write_sp->m_Processor))) {
00064 return;
00065 }
00066
00067
00068 streamsize read_bufsize = 0, write_bufsize = 0;
00069 if ( m_Reader ) {
00070 read_bufsize = m_Reader->m_InBufSize + m_Reader->m_OutBufSize;
00071 }
00072 if ( m_Writer ) {
00073 write_bufsize = m_Writer->m_InBufSize + m_Writer->m_OutBufSize;
00074 }
00075
00076
00077 auto_ptr<CT_CHAR_TYPE> bp(new CT_CHAR_TYPE[read_bufsize + write_bufsize]);
00078 m_Buf = bp.get();
00079 if ( !m_Buf ) {
00080 return;
00081 }
00082
00083
00084 if ( m_Reader ) {
00085 m_Reader->Init();
00086 m_Reader->m_InBuf = m_Buf;
00087 m_Reader->m_OutBuf = m_Buf + m_Reader->m_InBufSize;
00088 m_Reader->m_Begin = m_Reader->m_InBuf;
00089 m_Reader->m_End = m_Reader->m_InBuf;
00090
00091 setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf, m_Reader->m_OutBuf);
00092 } else {
00093 setg(0,0,0);
00094 }
00095 if ( m_Writer ) {
00096 m_Writer->Init();
00097 m_Writer->m_InBuf = m_Buf + read_bufsize;
00098 m_Writer->m_OutBuf = m_Writer->m_InBuf + m_Writer->m_InBufSize;
00099 m_Writer->m_Begin = m_Writer->m_OutBuf;
00100 m_Writer->m_End = m_Writer->m_OutBuf;
00101
00102
00103 setp(m_Writer->m_InBuf, m_Writer->m_InBuf + m_Writer->m_InBufSize - 1);
00104 } else {
00105 setp(0,0);
00106 }
00107 bp.release();
00108 }
00109
00110
00111 CCompressionStreambuf::~CCompressionStreambuf()
00112 {
00113
00114
00115 CCompressionStreamProcessor* sp;
00116 #define msg_where "CCompressionStreambuf::~CCompressionStreambuf: "
00117 #define msg_overflow "Overflow occurred, lost some processed data " \
00118 "through call Finalize()"
00119 #define msg_error "Finalize() failed"
00120
00121
00122 sp = GetStreamProcessor(CCompressionStream::eRead);
00123 if ( sp ) {
00124 sp->m_Processor->End();
00125 sp->m_State = CCompressionStreamProcessor::eDone;
00126 }
00127
00128
00129 sp = GetStreamProcessor(CCompressionStream::eWrite);
00130 if ( sp ) {
00131 if ( sp->m_State == CCompressionStreamProcessor::eActive ) {
00132 Finalize(CCompressionStream::eWrite);
00133 if ( sp->m_LastStatus == CP::eStatus_Overflow ) {
00134 ERR_COMPRESS(72, msg_where << msg_overflow);
00135 }
00136 if ( sp->m_LastStatus == CP::eStatus_Error ) {
00137 ERR_COMPRESS(73, msg_where << msg_error);
00138 }
00139 }
00140 sp->m_Processor->End();
00141 sp->m_State = CCompressionStreamProcessor::eDone;
00142
00143 WriteOutBufToStream(true );
00144 }
00145
00146 delete[] m_Buf;
00147 }
00148
00149
00150 int CCompressionStreambuf::sync()
00151 {
00152 if ( !IsOkay() ) {
00153 return -1;
00154 }
00155 int status = 0;
00156
00157 CCompressionStreamProcessor*
00158 sp = GetStreamProcessor(CCompressionStream::eWrite);
00159 if ( sp && sp->m_State != CCompressionStreamProcessor::eDone &&
00160 !(sp->m_State == CCompressionStreamProcessor::eFinalize &&
00161 sp->m_LastStatus == CCompressionProcessor::eStatus_EndOfData)
00162 ) {
00163 if ( Sync(CCompressionStream::eWrite) != 0 ) {
00164 status = -1;
00165 }
00166 }
00167
00168 status += m_Stream->rdbuf()->PUBSYNC();
00169 return (status < 0 ? -1 : 0);
00170 }
00171
00172
00173 int CCompressionStreambuf::Sync(CCompressionStream::EDirection dir)
00174 {
00175
00176 if ( !IsStreamProcessorOkay(dir) ) {
00177 return -1;
00178 }
00179
00180 if ( !Process(dir) ) {
00181 return -1;
00182 }
00183
00184 return Flush(dir);
00185 }
00186
00187
00188 int CCompressionStreambuf::Finish(CCompressionStream::EDirection dir)
00189 {
00190
00191 if ( !IsStreamProcessorOkay(dir) ) {
00192 return -1;
00193 }
00194 CCompressionStreamProcessor* sp = GetStreamProcessor(dir);
00195
00196 if ( sp->m_LastStatus == CP::eStatus_Error ) {
00197 return -1;
00198 }
00199 if ( sp->m_State == CCompressionStreamProcessor::eFinalize ) {
00200
00201 return 0;
00202 }
00203
00204 Process(dir);
00205 if ( sp->m_LastStatus == CP::eStatus_Error ) {
00206 return -1;
00207 }
00208
00209 sp->m_State = CCompressionStreamProcessor::eFinalize;
00210 return Flush(dir);
00211 }
00212
00213
00214 int CCompressionStreambuf::Flush(CCompressionStream::EDirection dir)
00215 {
00216 CCompressionStreamProcessor* sp = GetStreamProcessor(dir);
00217
00218
00219 if ( sp->m_LastStatus == CP::eStatus_Error ) {
00220 return -1;
00221 }
00222 if ( sp->m_LastStatus == CP::eStatus_EndOfData ) {
00223
00224 if (dir == CCompressionStream::eWrite &&
00225 !WriteOutBufToStream(true )) {
00226 return -1;
00227 }
00228
00229 return 0;
00230 }
00231
00232
00233 CT_CHAR_TYPE* buf = 0;
00234 size_t out_size = 0, out_avail = 0;
00235 do {
00236
00237 if ( dir == CCompressionStream::eRead ) {
00238 buf = egptr();
00239 } else {
00240 buf = sp->m_End;
00241 }
00242 out_size = sp->m_OutBuf + sp->m_OutBufSize - buf;
00243
00244
00245 out_avail = 0;
00246 if ( sp->m_State == CCompressionStreamProcessor::eFinalize ) {
00247
00248 sp->m_LastStatus =
00249 sp->m_Processor->Finish(buf, out_size, &out_avail);
00250 } else {
00251
00252 _VERIFY(sp->m_State == CCompressionStreamProcessor::eActive);
00253 sp->m_LastStatus =
00254 sp->m_Processor->Flush(buf, out_size, &out_avail);
00255
00256 if ( sp->m_LastStatus == CP::eStatus_EndOfData ) {
00257 sp->m_State = CCompressionStreamProcessor::eFinalize;
00258 }
00259 }
00260
00261 if ( sp->m_LastStatus == CP::eStatus_Error ) {
00262 return -1;
00263 }
00264 if ( dir == CCompressionStream::eRead ) {
00265
00266 setg(sp->m_OutBuf, gptr(), egptr() + out_avail);
00267 } else {
00268
00269 sp->m_End += out_avail;
00270
00271
00272 if ( !WriteOutBufToStream() ) {
00273 return -1;
00274 }
00275 }
00276 } while (sp->m_LastStatus == CP::eStatus_Repeat ||
00277 (out_avail && sp->m_LastStatus == CP::eStatus_Overflow));
00278
00279
00280 if (dir == CCompressionStream::eWrite) {
00281 if ( sp->m_LastStatus == CP::eStatus_EndOfData ||
00282 (sp->m_State == CCompressionStreamProcessor::eFinalize && !out_avail)) {
00283 if ( !WriteOutBufToStream(true ) ) {
00284 return -1;
00285 }
00286 }
00287 }
00288 return 0;
00289 }
00290
00291
00292 CT_INT_TYPE CCompressionStreambuf::overflow(CT_INT_TYPE c)
00293 {
00294
00295 if ( !IsStreamProcessorOkay(CCompressionStream::eWrite) ) {
00296 return CT_EOF;
00297 }
00298 if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00299 return CT_EOF;
00300 }
00301 if ( !CT_EQ_INT_TYPE(c, CT_EOF) ) {
00302
00303
00304
00305
00306 *pptr() = c;
00307
00308 pbump(1);
00309 }
00310 if ( ProcessStreamWrite() ) {
00311 return CT_NOT_EOF(CT_EOF);
00312 }
00313 return CT_EOF;
00314 }
00315
00316
00317 CT_INT_TYPE CCompressionStreambuf::underflow(void)
00318 {
00319
00320 if ( !IsStreamProcessorOkay(CCompressionStream::eRead) ) {
00321 return CT_EOF;
00322 }
00323
00324
00325 setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf, m_Reader->m_OutBuf);
00326
00327
00328 if ( !ProcessStreamRead() || gptr() == egptr() ) {
00329 return CT_EOF;
00330 }
00331 return CT_TO_INT_TYPE(*gptr());
00332 }
00333
00334
00335 bool CCompressionStreambuf::ProcessStreamRead()
00336 {
00337 size_t in_len, in_avail, out_size, out_avail;
00338 streamsize n_read;
00339
00340
00341 if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData ) {
00342 return false;
00343 }
00344
00345
00346 if ( m_Reader->m_State == CCompressionStreamProcessor::eFinalize ) {
00347 return Flush(CCompressionStream::eRead) == 0;
00348 }
00349
00350
00351
00352 do {
00353 in_avail = 0;
00354 out_avail = 0;
00355 out_size = m_Reader->m_OutBuf + m_Reader->m_OutBufSize - egptr();
00356
00357
00358 if ( m_Reader->m_LastStatus != CP::eStatus_Overflow ) {
00359
00360
00361 if ( m_Reader->m_Begin == m_Reader->m_End ) {
00362 n_read = m_Stream->rdbuf()->sgetn(m_Reader->m_InBuf,
00363 m_Reader->m_InBufSize);
00364 if ( !n_read ) {
00365
00366
00367 m_Reader->m_State = CCompressionStreamProcessor::eFinalize;
00368 return Flush(CCompressionStream::eRead) == 0;
00369 }
00370
00371 m_Reader->m_Begin = m_Reader->m_InBuf;
00372 m_Reader->m_End = m_Reader->m_InBuf + n_read;
00373 }
00374
00375 in_len = m_Reader->m_End - m_Reader->m_Begin;
00376 m_Reader->m_LastStatus = m_Reader->m_Processor->Process(
00377 m_Reader->m_Begin, in_len, egptr(), out_size,
00378 &in_avail, &out_avail);
00379 } else {
00380
00381 if ( !out_size ) {
00382 return false;
00383 }
00384
00385 in_len = m_Reader->m_End - m_Reader->m_Begin;
00386 in_avail = in_len;
00387 m_Reader->m_LastStatus =
00388 m_Reader->m_Processor->Flush(egptr(), out_size, &out_avail);
00389 }
00390 if ( m_Reader->m_LastStatus == CP::eStatus_Error ) {
00391 return false;
00392 }
00393
00394 if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData ) {
00395 m_Reader->m_State = CCompressionStreamProcessor::eFinalize;
00396 }
00397
00398
00399 m_Reader->m_Begin += (in_len - in_avail);
00400
00401 setg(m_Reader->m_OutBuf, gptr(), egptr() + out_avail);
00402
00403 if ( m_Reader->m_LastStatus == CP::eStatus_EndOfData && !out_avail ) {
00404 return false;
00405 }
00406
00407 } while ( !out_avail );
00408
00409 return true;
00410 }
00411
00412
00413 bool CCompressionStreambuf::ProcessStreamWrite()
00414 {
00415 const char* in_buf = pbase();
00416 const streamsize count = pptr() - pbase();
00417 size_t in_avail = count;
00418
00419
00420 if ( m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00421 return false;
00422 }
00423
00424 if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00425 return Flush(CCompressionStream::eWrite) == 0;
00426 }
00427
00428
00429 while ( in_avail ) {
00430
00431 size_t out_avail = 0;
00432 streamsize out_size = m_Writer->m_OutBuf +
00433 m_Writer->m_OutBufSize - m_Writer->m_End;
00434 m_Writer->m_LastStatus = m_Writer->m_Processor->Process(
00435 in_buf + count - in_avail, in_avail, m_Writer->m_End, out_size,
00436 &in_avail, &out_avail);
00437
00438
00439 if ( m_Writer->m_LastStatus == CP::eStatus_Error ) {
00440 return false;
00441 }
00442
00443 if ( m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00444 m_Writer->m_State = CCompressionStreamProcessor::eFinalize;
00445 }
00446
00447 m_Writer->m_End += out_avail;
00448
00449
00450
00451 if ( !WriteOutBufToStream() ) {
00452 return false;
00453 }
00454 }
00455
00456 pbump(-(int)count);
00457 return true;
00458 }
00459
00460
00461 bool CCompressionStreambuf::WriteOutBufToStream(bool force_write)
00462 {
00463
00464
00465 if ( force_write ||
00466 (m_Writer->m_End == m_Writer->m_OutBuf + m_Writer->m_OutBufSize) ||
00467 m_Writer->m_LastStatus == CP::eStatus_Overflow ||
00468 m_Writer->m_LastStatus == CP::eStatus_EndOfData ) {
00469
00470 streamsize to_write = m_Writer->m_End - m_Writer->m_Begin;
00471 if ( to_write ) {
00472 streamsize n_write = m_Stream->rdbuf()->sputn(m_Writer->m_Begin, to_write);
00473 if ( n_write != to_write ) {
00474 m_Writer->m_Begin += n_write;
00475 return false;
00476 }
00477
00478 m_Writer->m_Begin = m_Writer->m_OutBuf;
00479 m_Writer->m_End = m_Writer->m_OutBuf;
00480 }
00481 }
00482 return true;
00483 }
00484
00485
00486 streamsize CCompressionStreambuf::xsputn(const CT_CHAR_TYPE* buf,
00487 streamsize count)
00488 {
00489
00490 if ( !IsStreamProcessorOkay(CCompressionStream::eWrite) ) {
00491 return CT_EOF;
00492 }
00493 if ( m_Writer->m_State == CCompressionStreamProcessor::eFinalize ) {
00494 return CT_EOF;
00495 }
00496
00497 if ( !buf || count <= 0 ) {
00498 return 0;
00499 }
00500
00501 streamsize done = 0;
00502
00503
00504 while ( done < count ) {
00505
00506
00507 size_t block_size = min(size_t(count-done), size_t(epptr()-pptr()+1));
00508
00509 memcpy(pptr(), buf + done, block_size);
00510
00511 pbump((int)block_size);
00512
00513 if ( pptr() >= epptr() && !ProcessStreamWrite() ) {
00514 break;
00515 }
00516 done += block_size;
00517 }
00518 return done;
00519 };
00520
00521
00522 streamsize CCompressionStreambuf::xsgetn(CT_CHAR_TYPE* buf, streamsize count)
00523 {
00524
00525
00526
00527 if ( !IsOkay() || !m_Reader->m_Processor ) {
00528 return 0;
00529 }
00530
00531 if ( !buf || count <= 0 ) {
00532 return 0;
00533 }
00534
00535 streamsize done = 0;
00536
00537
00538 for (;;) {
00539
00540 size_t block_size = min(size_t(count-done), size_t(egptr()-gptr()));
00541
00542 if ( block_size ) {
00543 memcpy(buf + done, gptr(), block_size);
00544 done += block_size;
00545
00546
00547 if ( block_size == size_t(egptr() - gptr()) ) {
00548 *m_Reader->m_OutBuf = buf[done - 1];
00549 setg(m_Reader->m_OutBuf, m_Reader->m_OutBuf + 1,
00550 m_Reader->m_OutBuf + 1);
00551 } else {
00552
00553 gbump((int)block_size);
00554 }
00555 }
00556
00557 if ( done == count || !ProcessStreamRead() ) {
00558 break;
00559 }
00560 }
00561 return done;
00562 }
00563
00564
00565 END_NCBI_SCOPE
00566
00567