32 : _buffer_size(buffer_size), _statebits(STREAM_SOB),
_conn(conn), in_buf_(buffer_size), out_buf_(buffer_size), _stream(stream),
33 _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*
this, 0)
37 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
40 StreamConnection::StreamBuffer::~StreamBuffer()
46 bool StreamConnection::StreamBuffer::get(
const StateBits bit)
const
48 return (_statebits & bit);
51 void StreamConnection::StreamBuffer::set(
const StateBits bit)
57 void StreamConnection::StreamBuffer::unset(
const StateBits bit)
63 void StreamConnection::StreamBuffer::__error()
const
76 if (_statebits & STREAM_FAILED)
81 if (_statebits & STREAM_BAD)
86 if (_statebits & STREAM_EOF)
91 if (_statebits & STREAM_SHUTDOWN)
96 if (_statebits & STREAM_CLOSED)
107 bool StreamConnection::StreamBuffer::__good()
const
109 int badbits = STREAM_FAILED | STREAM_BAD | STREAM_EOF | STREAM_SHUTDOWN | STREAM_CLOSED;
110 return !(badbits & _statebits);
131 _stream << header << std::flush;
142 if (peer._keepalive > 0)
145 set(STREAM_TIMER_SUPPORT);
149 set(STREAM_HANDSHAKE);
151 }
catch (
const std::exception&) {
159 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
162 throw StreamErrorException(
"handshake not completed");
180 }
catch (
const std::exception&) {
184 throw StreamErrorException(
"can not send shutdown message");
188 void StreamConnection::StreamBuffer::keepalive()
199 }
catch (
const std::exception&) {
205 void StreamConnection::StreamBuffer::close()
208 set(STREAM_SHUTDOWN);
211 void StreamConnection::StreamBuffer::reject()
222 void StreamConnection::StreamBuffer::abort()
227 void StreamConnection::StreamBuffer::wait()
244 std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
249 char *ibegin = &out_buf_[0];
253 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
256 if(!traits_type::eq_int_type(c, traits_type::eof())) {
257 *iend++ = traits_type::to_char_type(c);
261 if ((iend - ibegin) == 0)
263 return traits_type::not_eof(c);
277 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
284 if (!
get(STREAM_SKIP))
287 if (
get(STREAM_ACK_SUPPORT))
295 _conn.eventBundleForwarded();
299 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
303 _stream.write(&out_buf_[0], seg._value.get<
size_t>());
306 _conn._callback.addTrafficOut(seg._value.get<
size_t>());
309 return traits_type::not_eof(c);
310 }
catch (
const StreamClosedException&) {
317 }
catch (
const StreamErrorException&) {
324 }
catch (
const ios_base::failure&) {
333 return traits_type::eof();
338 int StreamConnection::StreamBuffer::sync()
340 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
341 traits_type::eof()) ? -1 : 0;
348 }
catch (
const ios_base::failure&) {
352 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
361 std::vector<char> tmpbuf(_buffer_size);
365 while (size > 0 && _stream.good())
368 if (size < _buffer_size) readsize = size;
371 _stream.read(&tmpbuf[0], (std::streamsize)readsize);
374 _conn._callback.addTrafficIn(readsize);
382 }
catch (
const ios_base::failure &ex) {
383 _underflow_state = IDLE;
384 throw StreamErrorException(
"read error during data skip: " + std::string(ex.what()));
389 std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
394 if (_underflow_state == DATA_TRANSFER)
397 if (
get(STREAM_REJECT))
400 if (
get(STREAM_NACK_SUPPORT))
403 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
410 skipData(_underflow_data_remain);
413 _underflow_state = IDLE;
416 else if (_underflow_data_remain == 0)
419 if (
get(STREAM_ACK_SUPPORT))
422 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
427 _underflow_state = IDLE;
432 while (_underflow_state == IDLE)
439 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
442 }
catch (
const ios_base::failure &ex) {
443 throw StreamErrorException(
"read error: " + std::string(ex.what()));
461 unset(STREAM_REJECT);
471 if (
get(STREAM_REJECT))
474 if (
get(STREAM_NACK_SUPPORT))
478 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
485 skipData(_underflow_data_remain);
490 _underflow_state = DATA_TRANSFER;
500 if (
get(STREAM_ACK_SUPPORT))
517 _conn.eventBundleForwarded();
537 if (
get(STREAM_ACK_SUPPORT) &&
get(STREAM_NACK_SUPPORT))
540 if (!_rejected_segments.empty())
542 _rejected_segments.pop();
555 while (!_segments.empty())
564 _rejected_segments.push(seg);
569 _conn.eventBundleRefused();
575 if (_segments.empty())
599 throw StreamShutdownException();
606 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
609 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
612 _stream.read(&in_buf_[0], (std::streamsize)readsize);
615 _conn._callback.addTrafficIn(readsize);
619 }
catch (
const ios_base::failure &ex) {
620 _underflow_state = IDLE;
621 throw StreamErrorException(
"read error: " + std::string(ex.what()));
625 _underflow_data_remain -= readsize;
629 setg(&in_buf_[0], &in_buf_[0], &in_buf_[0] + readsize);
631 return traits_type::not_eof(in_buf_[0]);
633 }
catch (
const StreamClosedException&) {
639 }
catch (
const StreamErrorException &ex) {
646 }
catch (
const StreamShutdownException&) {
653 return traits_type::eof();
665 void StreamConnection::StreamBuffer::enableIdleTimeout(
const dtn::data::Timeout &seconds)
667 _idle_timer.set(seconds);