IBR-DTNSuite  0.8
ibrdtn/ibrdtn/streams/StreamBuffer.cpp
Go to the documentation of this file.
00001 /*
00002  * bpstreambuf.cpp
00003  *
00004  *  Created on: 14.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/streams/StreamConnection.h"
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/TimeMeasurement.h>
00011 
00012 namespace dtn
00013 {
00014         namespace streams
00015         {
00016                 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size)
00017                         : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream),
00018                           _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*this, 0)
00019                 {
00020                         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00021                         setg(0, 0, 0);
00022                         setp(out_buf_, out_buf_ + _buffer_size - 1);
00023                 }
00024 
00025                 StreamConnection::StreamBuffer::~StreamBuffer()
00026                 {
00027                         // clear the own buffer
00028                         delete [] in_buf_;
00029                         delete [] out_buf_;
00030 
00031                         // stop the idle timer
00032                         _idle_timer.stop();
00033                 }
00034 
00035                 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00036                 {
00037                         return (_statebits & bit);
00038                 }
00039 
00040                 void StreamConnection::StreamBuffer::set(const StateBits bit)
00041                 {
00042                         ibrcommon::MutexLock l(_statelock);
00043                         _statebits |= bit;
00044                 }
00045 
00046                 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00047                 {
00048                         ibrcommon::MutexLock l(_statelock);
00049                         _statebits &= ~(bit);
00050                 }
00051 
00052                 void StreamConnection::StreamBuffer::__error() const
00053                 {
00054                         IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
00055                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00056                         IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
00057                         IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
00058                         IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL;
00059                         IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
00060                         IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
00061                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
00062                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
00063                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00064 
00065                         if (_statebits & STREAM_FAILED)
00066                         {
00067                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
00068                         }
00069 
00070                         if (_statebits & STREAM_BAD)
00071                         {
00072                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
00073                         }
00074 
00075                         if (_statebits & STREAM_EOF)
00076                         {
00077                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
00078                         }
00079 
00080                         if (_statebits & STREAM_SHUTDOWN)
00081                         {
00082                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
00083                         }
00084 
00085                         if (_statebits & STREAM_CLOSED)
00086                         {
00087                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
00088                         }
00089 
00090                         if (!_stream.good())
00091                         {
00092                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
00093                         }
00094                 }
00095 
00096                 bool StreamConnection::StreamBuffer::__good() const
00097                 {
00098                         int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00099                         return !(badbits & _statebits);
00100                 }
00101 
00110                 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00111                 {
00112                         StreamContactHeader peer;
00113 
00114                         try {
00115                                 // make the send-call atomic
00116                                 {
00117                                         ibrcommon::MutexLock l(_sendlock);
00118 
00119                                         // transfer the local header
00120                                         _stream << header << std::flush;
00121                                 }
00122 
00123                                 // receive the remote header
00124                                 _stream >> peer;
00125 
00126                                 // enable/disable ACK/NACK support
00127                                 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00128                                 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00129 
00130                                 // set the incoming timer if set (> 0)
00131                                 if (peer._keepalive > 0)
00132                                 {
00133                                         // mark timer support
00134                                         set(STREAM_TIMER_SUPPORT);
00135                                 }
00136 
00137                                 // set handshake completed bit
00138                                 set(STREAM_HANDSHAKE);
00139 
00140                         } catch (const std::exception&) {
00141                                 // set failed bit
00142                                 set(STREAM_FAILED);
00143 
00144                                 // shutdown the stream
00145                                 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00146 
00147                                 // call the shutdown event
00148                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00149 
00150                                 // forward the catched exception
00151                                 throw StreamErrorException("handshake not completed");
00152                         }
00153 
00154                         // return the received header
00155                         return peer;
00156                 }
00157 
00163                 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00164                 {
00165                         try {
00166                                 ibrcommon::MutexLock l(_sendlock);
00167                                 // send a SHUTDOWN message
00168                                 _stream << StreamDataSegment(reason) << std::flush;
00169                         } catch (const std::exception&) {
00170                                 // set failed bit
00171                                 set(STREAM_FAILED);
00172 
00173                                 throw StreamErrorException("can not send shutdown message");
00174                         }
00175                 }
00176 
00177                 void StreamConnection::StreamBuffer::keepalive()
00178                 {
00179                         try {
00180                                 try {
00181                                         ibrcommon::MutexTryLock l(_sendlock);
00182                                         _stream << StreamDataSegment() << std::flush;
00183                                         IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00184                                 } catch (const ibrcommon::MutexException&) {
00185                                         // could not grab the lock - another process is sending something
00186                                         // then we do nothing since a data frame do the same as a keepalive frame.
00187                                 };
00188                         } catch (const std::exception&) {
00189                                 // set failed bit
00190                                 set(STREAM_FAILED);
00191                         }
00192                 }
00193 
00194                 void StreamConnection::StreamBuffer::close()
00195                 {
00196                         // set shutdown bit
00197                         set(STREAM_SHUTDOWN);
00198                 }
00199 
00200                 void StreamConnection::StreamBuffer::reject()
00201                 {
00202                         // we have to reject the current transmission
00203                         // so we have to discard all all data until the next segment with a start bit
00204                         set(STREAM_REJECT);
00205 
00206                         // set the current in buffer to zero
00207                         // this should result in a underflow call on the next read
00208                         setg(0, 0, 0);
00209                 }
00210 
00211                 void StreamConnection::StreamBuffer::abort()
00212                 {
00213                         _segments.abort();
00214                 }
00215 
00216                 void StreamConnection::StreamBuffer::wait()
00217                 {
00218                         // TODO: get max time to wait out of the timeout values
00219                         size_t timeout = 0;
00220 
00221                         try {
00222                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
00223                                 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
00224                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
00225                         } catch (const ibrcommon::QueueUnblockedException&) {
00226                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
00227                         }
00228                 }
00229 
00230                 // This function is called when the output buffer is filled.
00231                 // In this function, the buffer should be written to wherever it should
00232                 // be written to (in this case, the streambuf object that this is controlling).
00233                 std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
00234                 {
00235                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL;
00236 
00237                         try {
00238                                 char *ibegin = out_buf_;
00239                                 char *iend = pptr();
00240 
00241                                 // mark the buffer as free
00242                                 setp(out_buf_, out_buf_ + _buffer_size - 1);
00243 
00244                                 // append the last character
00245                                 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00246                                         *iend++ = traits_type::to_char_type(c);
00247                                 }
00248 
00249                                 // if there is nothing to send, just return
00250                                 if ((iend - ibegin) == 0)
00251                                 {
00252                                         return traits_type::not_eof(c);
00253                                 }
00254 
00255                                 // wrap a segment around the data
00256                                 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00257 
00258                                 // set the start flag
00259                                 if (get(STREAM_SOB))
00260                                 {
00261                                         seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00262                                         unset(STREAM_SKIP);
00263                                         unset(STREAM_SOB);
00264                                 }
00265 
00266                                 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00267                                 {
00268                                         // set the end flag
00269                                         seg._flags |= StreamDataSegment::MSG_MARK_END;
00270                                         set(STREAM_SOB);
00271                                 }
00272 
00273                                 if (!get(STREAM_SKIP))
00274                                 {
00275                                         // put the segment into the queue
00276                                         if (get(STREAM_ACK_SUPPORT))
00277                                         {
00278                                                 _segments.push(seg);
00279                                         }
00280                                         else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00281                                         {
00282                                                 // without ACK support we have to assume that a bundle is forwarded
00283                                                 // when the last segment is sent.
00284                                                 _conn.eventBundleForwarded();
00285                                         }
00286 
00287                                         ibrcommon::MutexLock l(_sendlock);
00288                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00289 
00290                                         // write the segment to the stream
00291                                         _stream << seg;
00292                                         _stream.write(out_buf_, seg._value);
00293                                 }
00294 
00295                                 return traits_type::not_eof(c);
00296                         } catch (const StreamClosedException&) {
00297                                 // set failed bit
00298                                 set(STREAM_FAILED);
00299 
00300                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00301 
00302                                 throw;
00303                         } catch (const StreamErrorException&) {
00304                                 // set failed bit
00305                                 set(STREAM_FAILED);
00306 
00307                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00308 
00309                                 throw;
00310                         } catch (const ios_base::failure&) {
00311                                 // set failed bit
00312                                 set(STREAM_FAILED);
00313 
00314                                 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00315 
00316                                 throw;
00317                         }
00318 
00319                         return traits_type::eof();
00320                 }
00321 
00322                 // This is called to flush the buffer.
00323                 // This is called when we're done with the file stream (or when .flush() is called).
00324                 int StreamConnection::StreamBuffer::sync()
00325                 {
00326                         int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00327                                                                                         traits_type::eof()) ? -1 : 0;
00328 
00329                         try {
00330                                 ibrcommon::MutexLock l(_sendlock);
00331 
00332                                 // ... and flush.
00333                                 _stream.flush();
00334                         } catch (const ios_base::failure&) {
00335                                 // set failed bit
00336                                 set(STREAM_BAD);
00337 
00338                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00339                         }
00340 
00341                         return ret;
00342                 }
00343 
00344                 void StreamConnection::StreamBuffer::skipData(size_t &size)
00345                 {
00346                         // a temporary buffer
00347                         char tmpbuf[_buffer_size];
00348 
00349                         try {
00350                                 //  and read until the next segment
00351                                 while (size > 0 && _stream.good())
00352                                 {
00353                                         size_t readsize = _buffer_size;
00354                                         if (size < _buffer_size) readsize = size;
00355 
00356                                         // to reject a bundle read all remaining data of this segment
00357                                         _stream.read(tmpbuf, readsize);
00358 
00359                                         // reset idle timeout
00360                                         _idle_timer.reset();
00361 
00362                                         // adjust the remain counter
00363                                         size -= readsize;
00364                                 }
00365                         } catch (const ios_base::failure &ex) {
00366                                 _underflow_state = IDLE;
00367                                 throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
00368                         }
00369                 }
00370 
00371                 // Fill the input buffer.  This reads out of the streambuf.
00372                 std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
00373                 {
00374                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
00375 
00376                         try {
00377                                 if (_underflow_state == DATA_TRANSFER)
00378                                 {
00379                                         // on bundle reject
00380                                         if (get(STREAM_REJECT))
00381                                         {
00382                                                 // send NACK on bundle reject
00383                                                 if (get(STREAM_NACK_SUPPORT))
00384                                                 {
00385                                                         ibrcommon::MutexLock l(_sendlock);
00386                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00387 
00388                                                         // send a REFUSE message
00389                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00390                                                 }
00391 
00392                                                 // skip data in this segment
00393                                                 skipData(_underflow_data_remain);
00394 
00395                                                 // return to idle state
00396                                                 _underflow_state = IDLE;
00397                                         }
00398                                         // send ACK if the data segment is received completely
00399                                         else if (_underflow_data_remain == 0)
00400                                         {
00401                                                 // New data segment received. Send an ACK.
00402                                                 if (get(STREAM_ACK_SUPPORT))
00403                                                 {
00404                                                         ibrcommon::MutexLock l(_sendlock);
00405                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00406                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00407                                                 }
00408 
00409                                                 // return to idle state
00410                                                 _underflow_state = IDLE;
00411                                         }
00412                                 }
00413 
00414                                 // read segments until DATA is AVAILABLE
00415                                 while (_underflow_state == IDLE)
00416                                 {
00417                                         // container for segment data
00418                                         dtn::streams::StreamDataSegment seg;
00419 
00420                                         try {
00421                                                 // read the segment
00422                                                 if (!_stream.good()) throw StreamErrorException("stream went bad");
00423 
00424                                                 _stream >> seg;
00425                                         } catch (const ios_base::failure &ex) {
00426                                                 throw StreamErrorException("read error: " + std::string(ex.what()));
00427                                         }
00428 
00429                                         if (seg._type != StreamDataSegment::MSG_KEEPALIVE)
00430                                         {
00431                                                 // reset idle timeout
00432                                                 _idle_timer.reset();
00433                                         }
00434 
00435                                         switch (seg._type)
00436                                         {
00437                                                 case StreamDataSegment::MSG_DATA_SEGMENT:
00438                                                 {
00439                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00440 
00441                                                         if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00442                                                         {
00443                                                                 _recv_size = seg._value;
00444                                                                 unset(STREAM_REJECT);
00445                                                         }
00446                                                         else
00447                                                         {
00448                                                                 _recv_size += seg._value;
00449                                                         }
00450 
00451                                                         // set the new data length
00452                                                         _underflow_data_remain = seg._value;
00453 
00454                                                         if (get(STREAM_REJECT))
00455                                                         {
00456                                                                 // send NACK on bundle reject
00457                                                                 if (get(STREAM_NACK_SUPPORT))
00458                                                                 {
00459                                                                         // lock for sending
00460                                                                         ibrcommon::MutexLock l(_sendlock);
00461                                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00462 
00463                                                                         // send a NACK message
00464                                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00465                                                                 }
00466 
00467                                                                 // skip data in this segment
00468                                                                 skipData(_underflow_data_remain);
00469                                                         }
00470                                                         else
00471                                                         {
00472                                                                 // announce the new data block
00473                                                                 _underflow_state = DATA_TRANSFER;
00474                                                         }
00475                                                         break;
00476                                                 }
00477 
00478                                                 case StreamDataSegment::MSG_ACK_SEGMENT:
00479                                                 {
00480                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00481 
00482                                                         // remove the segment in the queue
00483                                                         if (get(STREAM_ACK_SUPPORT))
00484                                                         {
00485                                                                 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
00486                                                                 if (q.empty())
00487                                                                 {
00488                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00489                                                                 }
00490                                                                 else
00491                                                                 {
00492                                                                         StreamDataSegment &qs = q.front();
00493 
00494                                                                         if (qs._flags & StreamDataSegment::MSG_MARK_END)
00495                                                                         {
00496                                                                                 _conn.eventBundleForwarded();
00497                                                                         }
00498 
00499                                                                         IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
00500 
00501                                                                         _conn.eventBundleAck(seg._value);
00502 
00503                                                                         q.pop();
00504                                                                 }
00505                                                         }
00506                                                         break;
00507                                                 }
00508 
00509                                                 case StreamDataSegment::MSG_KEEPALIVE:
00510                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00511                                                         break;
00512 
00513                                                 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00514                                                 {
00515                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL;
00516 
00517                                                         // TODO: Test bundle rejection!
00518 
00519                                                         // remove the segment in the queue
00520                                                         if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00521                                                         {
00522                                                                 // skip segments
00523                                                                 if (!_rejected_segments.empty())
00524                                                                 {
00525                                                                         _rejected_segments.pop();
00526 
00527                                                                         // we received a NACK
00528                                                                         IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00529                                                                 }
00530                                                                 else try
00531                                                                 {
00532                                                                         StreamDataSegment qs = _segments.getnpop();
00533 
00534                                                                         // we received a NACK
00535                                                                         IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00536 
00537                                                                         // get all segment ACKs in the queue for this transmission
00538                                                                         while (!_segments.empty())
00539                                                                         {
00540                                                                                 StreamDataSegment &seg = _segments.front();
00541                                                                                 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00542                                                                                 {
00543                                                                                         break;
00544                                                                                 }
00545 
00546                                                                                 // move the segments to another queue
00547                                                                                 _rejected_segments.push(seg);
00548                                                                                 _segments.pop();
00549                                                                         }
00550 
00551                                                                         // call event reject
00552                                                                         _conn.eventBundleRefused();
00553 
00554                                                                         // we received a NACK
00555                                                                         IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00556 
00557                                                                         // the queue is empty, then skip the current transfer
00558                                                                         if (_segments.empty())
00559                                                                         {
00560                                                                                 set(STREAM_SKIP);
00561 
00562                                                                                 // we received a NACK
00563                                                                                 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00564                                                                         }
00565 
00566                                                                 } catch (const ibrcommon::QueueUnblockedException&) {
00567                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00568                                                                 }
00569 
00570                                                         }
00571                                                         else
00572                                                         {
00573                                                                 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00574                                                         }
00575 
00576                                                         break;
00577                                                 }
00578 
00579                                                 case StreamDataSegment::MSG_SHUTDOWN:
00580                                                 {
00581                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
00582                                                         throw StreamShutdownException();
00583                                                 }
00584                                         }
00585                                 }
00586 
00587                                 // currently transferring data
00588                                 size_t readsize = _buffer_size;
00589                                 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
00590 
00591                                 try {
00592                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00593 
00594                                         // here receive the data
00595                                         _stream.read(in_buf_, readsize);
00596 
00597                                         // reset idle timeout
00598                                         _idle_timer.reset();
00599                                 } catch (const ios_base::failure &ex) {
00600                                         _underflow_state = IDLE;
00601                                         throw StreamErrorException("read error: " + std::string(ex.what()));
00602                                 }
00603 
00604                                 // adjust the remain counter
00605                                 _underflow_data_remain -= readsize;
00606 
00607                                 // Since the input buffer content is now valid (or is new)
00608                                 // the get pointer should be initialized (or reset).
00609                                 setg(in_buf_, in_buf_, in_buf_ + readsize);
00610 
00611                                 return traits_type::not_eof((unsigned char)in_buf_[0]);
00612 
00613                         } catch (const StreamClosedException&) {
00614                                 // set failed bit
00615                                 set(STREAM_FAILED);
00616 
00617                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00618 
00619                         } catch (const StreamErrorException &ex) {
00620                                 // set failed bit
00621                                 set(STREAM_FAILED);
00622 
00623                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00624 
00625                                 throw;
00626                         } catch (const StreamShutdownException&) {
00627                                 // set failed bit
00628                                 set(STREAM_FAILED);
00629 
00630                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00631                         }
00632 
00633                         return traits_type::eof();
00634                 }
00635 
00636                 size_t StreamConnection::StreamBuffer::timeout(ibrcommon::Timer *timer)
00637                 {
00638                         if (__good())
00639                         {
00640                                 shutdown(StreamDataSegment::MSG_SHUTDOWN_IDLE_TIMEOUT);
00641                         }
00642                         throw ibrcommon::Timer::StopTimerException();
00643                 }
00644 
00645                 void StreamConnection::StreamBuffer::enableIdleTimeout(size_t seconds)
00646                 {
00647                         _idle_timer.set(seconds);
00648                         _idle_timer.start();
00649                 }
00650         }
00651 }