32 : _buffer_size(buffer_size), _statebits(STREAM_SOB),
_conn(conn), in_buf_(buffer_size), out_buf_(buffer_size), _stream(stream),
33 _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*
this, 0), _monitor(
false), _monitor_stats(2, 0)
37 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
40 StreamConnection::StreamBuffer::~StreamBuffer()
46 bool StreamConnection::StreamBuffer::get(
const StateBits bit)
const
48 return (_statebits & bit);
51 void StreamConnection::StreamBuffer::set(
const StateBits bit)
57 void StreamConnection::StreamBuffer::unset(
const StateBits bit)
63 void StreamConnection::StreamBuffer::__error()
const
76 if (_statebits & STREAM_FAILED)
81 if (_statebits & STREAM_BAD)
86 if (_statebits & STREAM_EOF)
91 if (_statebits & STREAM_SHUTDOWN)
96 if (_statebits & STREAM_CLOSED)
107 bool StreamConnection::StreamBuffer::__good()
const
109 int badbits = STREAM_FAILED | STREAM_BAD | STREAM_EOF | STREAM_SHUTDOWN | STREAM_CLOSED;
110 return !(badbits & _statebits);
131 _stream << header << std::flush;
142 if (peer._keepalive > 0)
145 set(STREAM_TIMER_SUPPORT);
149 set(STREAM_HANDSHAKE);
151 }
catch (
const std::exception&) {
159 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
162 throw StreamErrorException(
"handshake not completed");
180 }
catch (
const std::exception&) {
184 throw StreamErrorException(
"can not send shutdown message");
188 void StreamConnection::StreamBuffer::keepalive()
199 }
catch (
const std::exception&) {
205 void StreamConnection::StreamBuffer::close()
208 set(STREAM_SHUTDOWN);
211 void StreamConnection::StreamBuffer::reject()
222 void StreamConnection::StreamBuffer::abort()
227 void StreamConnection::StreamBuffer::wait()
244 std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
249 char *ibegin = &out_buf_[0];
253 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
256 if(!traits_type::eq_int_type(c, traits_type::eof())) {
257 *iend++ = traits_type::to_char_type(c);
261 if ((iend - ibegin) == 0)
263 return traits_type::not_eof(c);
277 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
284 if (!
get(STREAM_SKIP))
287 if (
get(STREAM_ACK_SUPPORT))
295 _conn.eventBundleForwarded();
299 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
303 _stream.write(&out_buf_[0], seg._value.get<
size_t>());
307 _monitor_stats[1] += seg.
_value.
get<
size_t>();
311 return traits_type::not_eof(c);
312 }
catch (
const StreamClosedException&) {
319 }
catch (
const StreamErrorException&) {
326 }
catch (
const ios_base::failure&) {
335 return traits_type::eof();
340 int StreamConnection::StreamBuffer::sync()
342 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
343 traits_type::eof()) ? -1 : 0;
350 }
catch (
const ios_base::failure&) {
354 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
363 std::vector<char> tmpbuf(_buffer_size);
367 while (size > 0 && _stream.good())
370 if (size < _buffer_size) readsize = size;
373 _stream.read(&tmpbuf[0], (std::streamsize)readsize);
377 _monitor_stats[0] += readsize;
386 }
catch (
const ios_base::failure &ex) {
387 _underflow_state = IDLE;
388 throw StreamErrorException(
"read error during data skip: " + std::string(ex.what()));
393 std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
398 if (_underflow_state == DATA_TRANSFER)
401 if (
get(STREAM_REJECT))
404 if (
get(STREAM_NACK_SUPPORT))
407 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
414 skipData(_underflow_data_remain);
417 _underflow_state = IDLE;
420 else if (_underflow_data_remain == 0)
423 if (
get(STREAM_ACK_SUPPORT))
426 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
431 _underflow_state = IDLE;
436 while (_underflow_state == IDLE)
443 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
446 }
catch (
const ios_base::failure &ex) {
447 throw StreamErrorException(
"read error: " + std::string(ex.what()));
465 unset(STREAM_REJECT);
475 if (
get(STREAM_REJECT))
478 if (
get(STREAM_NACK_SUPPORT))
482 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
489 skipData(_underflow_data_remain);
494 _underflow_state = DATA_TRANSFER;
504 if (
get(STREAM_ACK_SUPPORT))
517 _conn.eventBundleForwarded();
541 if (
get(STREAM_ACK_SUPPORT) &&
get(STREAM_NACK_SUPPORT))
544 if (!_rejected_segments.empty())
546 _rejected_segments.pop();
559 while (!_segments.empty())
568 _rejected_segments.push(seg);
573 _conn.eventBundleRefused();
579 if (_segments.empty())
603 throw StreamShutdownException();
610 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
613 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
616 _stream.read(&in_buf_[0], (std::streamsize)readsize);
620 _monitor_stats[0] += readsize;
625 }
catch (
const ios_base::failure &ex) {
626 _underflow_state = IDLE;
627 throw StreamErrorException(
"read error: " + std::string(ex.what()));
631 _underflow_data_remain -= readsize;
635 setg(&in_buf_[0], &in_buf_[0], &in_buf_[0] + readsize);
637 return traits_type::not_eof(in_buf_[0]);
639 }
catch (
const StreamClosedException&) {
645 }
catch (
const StreamErrorException &ex) {
652 }
catch (
const StreamShutdownException&) {
659 return traits_type::eof();
671 void StreamConnection::StreamBuffer::enableIdleTimeout(
const dtn::data::Timeout &seconds)
673 _idle_timer.set(seconds);