// Weighting constant associated with the averaged round-trip time (_avg_rtt).
// NOTE(review): presumably the share kept from the previous average when a new
// sample is folded in — confirm against the formula in adjust_rtt().
38 #define AVG_RTT_WEIGHT 0.875
// Class-wide TAG string holding the class name.
// NOTE(review): presumably used as the logging tag — usage is not shown here.
44 const std::string DatagramConnection::TAG =
"DatagramConnection";
// Member initializer list: the send and receive states start idle
// (SEND_IDLE / RECV_IDLE), _last_ack and _next_seqno start at 0, the stream and
// the head buffer are set up with params.max_msg_length, the sender is bound to
// this connection and its stream, and _avg_rtt is seeded from
// params.initial_timeout.
47 : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48 _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
// Extract one bundle from the deserializer via operator>>.
92 deserializer >> bundle;
108 }
// Handler for std::exception; what it does with `ex` is not visible here.
catch (std::exception &ex) {
128 }
// Empty handler: a std::exception thrown by the preceding try block is caught
// and ignored.
// NOTE(review): confirm this best-effort behaviour is intended; consider logging.
catch (
const std::exception&) { };
136 }
// Same pattern as above: the exception is caught and ignored.
catch (
const std::exception&) { };
// Push the job onto the sender's queue (_sender.queue).
156 _sender.queue.push(job);
// Receive-side handling of an incoming segment (flags, seqno, buf, len).
// The segment's sequence number is compared with _next_seqno first.
// NOTE(review): presumably a mismatch means an out-of-order or duplicate
// segment — the branch body is not visible, confirm.
177 if (_next_seqno != seqno)
// Queue the payload into the stream with the "first segment" argument set to true.
189 _stream.queue(buf, len,
true);
// Return the receive state machine to idle.
192 _recv_state = RECV_IDLE;
// Segment is flagged as the first one of a transmission.
194 else if (flags & DatagramService::SEGMENT_FIRST)
// Idle: keep a copy of the payload in _head_buf and move to RECV_HEAD.
200 if (_recv_state == RECV_IDLE)
// NOTE(review): no bounds check is visible before this copy — confirm that
// len never exceeds the size of _head_buf (params.max_msg_length).
204 ::memcpy(&_head_buf[0], buf, len);
208 _recv_state = RECV_HEAD;
// Already holding a head segment: the new payload overwrites the buffered one.
210 else if (_recv_state == RECV_HEAD)
215 ::memcpy(&_head_buf[0], buf, len);
// A buffered head segment exists: forward it to the stream as a first segment
// before handling the current payload, then switch to RECV_TRANSMISSION.
229 if (_recv_state == RECV_HEAD)
232 _stream.queue(&_head_buf[0], _head_len,
true);
236 _recv_state = RECV_TRANSMISSION;
// Queue the current payload as a non-first segment.
240 _stream.queue(buf, len,
false);
// The last segment of a transmission puts the receiver back into idle.
242 if (flags & DatagramService::SEGMENT_LAST)
245 _recv_state = RECV_IDLE;
// Send path for one segment (buf, len, last). Three variants are visible below.
// NOTE(review): presumably they are selected by the configured flow-control
// mode — the selecting code is not visible, confirm.
// The sequence number of the outgoing segment is taken from _last_ack.
274 unsigned int seqno = _last_ack;
// Variant 1: attempt the transmission at most _params.retry_limit times.
287 for (
size_t i = 0; i < _params.retry_limit; ++i)
// Hand the segment to the callback for transmission.
292 _callback.callback_send(*
this, flags, seqno, getIdentifier(), buf, len);
295 _send_state = SEND_WAIT_ACK;
// Loop until _last_ack equals the successor of seqno (modulo
// _params.max_seq_numbers); the loop body is not visible here.
305 while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
// After the last segment the sender is idle, otherwise it expects the next one.
314 _send_state = last ? SEND_IDLE : SEND_NEXT;
// Feed twice the current average into adjust_rtt().
// NOTE(review): presumably the timeout back-off path — confirm.
329 adjust_rtt(static_cast<double>(_avg_rtt) * 2);
// Mark the send state as failed and notify the callback.
343 _send_state = SEND_ERROR;
346 _callback.reportFailure();
// Variant 2 (window frames): wait on _ack_cond while sw_frames_full() is true.
364 while (sw_frames_full()) _ack_cond.wait(&ts);
// Append a new frame to _sw_frames and fill it with the flags, the sequence
// number and a copy of the payload.
367 _sw_frames.push_back(window_frame());
369 window_frame &new_frame = _sw_frames.back();
371 new_frame.flags = flags;
372 new_frame.seqno = seqno;
373 new_frame.buf.assign(buf, buf+len);
// Start the frame's timer; its reading is later passed to adjust_rtt().
377 new_frame.tm.start();
// Transmit the frame from its own buffered copy.
380 _callback.callback_send(*
this, new_frame.flags, new_frame.seqno, getIdentifier(), &new_frame.buf[0], new_frame.buf.size());
// Store the successor of seqno (modulo _params.max_seq_numbers) in _last_ack.
383 _last_ack = (seqno + 1) % _params.max_seq_numbers;
386 _send_state = SEND_WAIT_ACK;
// For the last segment loop until _sw_frames is empty; otherwise only while
// sw_frames_full() is true. The loop body is not visible here.
393 while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
406 _send_state = SEND_ERROR;
409 _callback.reportFailure();
417 _send_state = last ? SEND_IDLE : SEND_NEXT;
// Variant 3: transmit once and advance the state and _last_ack immediately,
// without any visible wait.
424 _callback.callback_send(*
this, flags, seqno, getIdentifier(), buf, len);
427 _send_state = last ? SEND_IDLE : SEND_NEXT;
431 _last_ack = (seqno + 1) % _params.max_seq_numbers;
// Predicate used by the send path to decide whether to keep waiting on _ack_cond.
// NOTE(review): presumably true when _sw_frames has reached the window size —
// the body is not visible, confirm.
435 bool DatagramConnection::sw_frames_full()
// Timeout handling for the window frames in _sw_frames.
// @param last  forwarded into the wait condition at the end: true waits until
//              _sw_frames is empty, false only while sw_frames_full() is true.
440 void DatagramConnection::sw_timeout(
bool last)
// Feed twice the current average into adjust_rtt().
450 adjust_rtt(static_cast<double>(_avg_rtt) * 2);
// Only act when there is at least one frame pending.
452 if (_sw_frames.size() > 0)
454 window_frame &front_frame = _sw_frames.front();
// NOTE(review): the condition leading to SEND_ERROR is not visible; presumably
// a retry limit tracked on the front frame — confirm.
460 _send_state = SEND_ERROR;
// Retransmit every frame in _sw_frames, in list order.
471 for (std::list<window_frame>::iterator it = _sw_frames.begin(); it != _sw_frames.end(); ++it)
473 window_frame &retry_frame = (*it);
476 _callback.
callback_send(*
this, retry_frame.flags, retry_frame.seqno,
getIdentifier(), &retry_frame.buf[0], retry_frame.buf.size());
483 _send_state = SEND_WAIT_ACK;
// Same wait condition as in the send path; the loop body is not visible here.
490 while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
// Return early when `temporary` is set; nothing below runs in that case.
511 if (temporary)
return;
// With at least one frame pending, pass the front frame's timer reading (in
// milliseconds) to adjust_rtt() and remove that frame from _sw_frames.
// NOTE(review): any check between these lines is not visible here; presumably
// the frame's seqno is matched against the acknowledgement — confirm.
526 if (_sw_frames.size() > 0) {
527 window_frame &f = _sw_frames.front();
534 adjust_rtt(f.tm.getMilliseconds());
540 _sw_frames.pop_front();
// Updates the averaged round-trip time from a new sample.
// @param value  the new sample; callers pass either a frame timer reading in
//               milliseconds or twice the current _avg_rtt.
// NOTE(review): the averaging formula is not visible; presumably it uses
// AVG_RTT_WEIGHT — confirm.
563 void DatagramConnection::adjust_rtt(
double value)
// Start from the current average.
566 double new_rtt = _avg_rtt;
// Member initializer list: the stream acts as its own stream buffer
// (std::iostream(this)), all three buffers (_queue_buf, _out_buf, _in_buf) are
// sized with maxmsglen, the first chunk written counts as a first segment, and
// the abort/skip/reject flags start cleared.
578 : std::iostream(this), _buf_size(maxmsglen), _first_segment(true), _last_segment(false),
579 _queue_buf(_buf_size), _queue_buf_len(0), _queue_buf_head(false),
580 _out_buf(_buf_size), _in_buf(_buf_size),
581 _abort(false), _skip(false), _reject(false), _callback(conn)
// Put area covers all but the final byte of _out_buf.
// NOTE(review): presumably the spare byte holds the character passed to
// overflow() — confirm.
590 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
// Stream destructor.
593 DatagramConnection::Stream::~Stream()
// Hands one received segment (buf, len, isFirst) to the reading side.
// Wait on _queue_buf_cond until the previously queued data is gone
// (_queue_buf_len is back to 0).
604 while (_queue_buf_len > 0)
607 _queue_buf_cond.wait();
// Copy the payload into _queue_buf.
// NOTE(review): no length check is visible — confirm len <= _buf_size.
611 ::memcpy(&_queue_buf[0], buf, len);
// Record the length and whether this is a first segment, then signal the
// condition so a waiting reader can proceed.
614 _queue_buf_len = len;
615 _queue_buf_head = isFirst;
618 _queue_buf_cond.signal();
// Skip request for the stream; ends by signalling _queue_buf_cond with `true`.
// NOTE(review): presumably sets _skip and signal(true) is a broadcast — neither
// is visible here, confirm.
624 void DatagramConnection::Stream::skip()
628 _queue_buf_cond.signal(
true);
// Reject request for the stream; ends by signalling _queue_buf_cond with `true`.
// NOTE(review): presumably sets _reject, which makes underflow() ignore queued
// data that is not a first segment — the assignment is not visible, confirm.
631 void DatagramConnection::Stream::reject()
637 _queue_buf_cond.signal(
true);
// Closes the stream by aborting _queue_buf_cond.
// NOTE(review): presumably this releases threads blocked in wait() — confirm
// against the condition class.
640 void DatagramConnection::Stream::close()
644 _queue_buf_cond.abort();
// streambuf sync hook: flushes the pending output as the last segment.
// @return -1 when overflow() reports eof; the other outcome is not visible here.
647 int DatagramConnection::Stream::sync()
// Flag the data about to be flushed as the last segment.
651 _last_segment =
true;
// Flush via overflow(eof), i.e. without appending an extra character.
653 int ret = std::char_traits<char>::eq_int_type(this->overflow(
654 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
// streambuf overflow hook: sends the content of the output buffer as one segment.
// @param c  character that did not fit into the put area, or eof for a pure flush.
// @return   not_eof(c) on the visible return paths.
660 std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
666 char *ibegin = &_out_buf[0];
// Reset the put area to the start of _out_buf (all but the final byte).
672 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
// A real character (not eof) is appended at iend before sending.
674 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
676 *iend++ = std::char_traits<char>::to_char_type(c);
// NOTE(review): early return; its condition is not visible (presumably nothing
// to send) — confirm.
686 return std::char_traits<char>::not_eof(c);
// Starting a new first segment clears a pending skip.
691 if (_first_segment) _skip =
false;
// Unless skipping, pass the buffered bytes to the connection together with the
// last-segment flag.
694 if (!_skip) _callback.stream_send(&_out_buf[0], bytes, _last_segment);
// The next chunk is a first segment exactly when this one was the last; the
// last-segment flag is then cleared.
697 _first_segment = _last_segment;
698 _last_segment =
false;
709 return std::char_traits<char>::not_eof(c);
// streambuf underflow hook: refills the get area from the queued segment.
712 std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
// Keep waiting while nothing is queued, or while _reject is set and the queued
// data is not a first segment.
721 while ((_queue_buf_len == 0) || (_reject && !_queue_buf_head))
// Signal with `true` before waiting.
// NOTE(review): presumably wakes a writer blocked in queue() — confirm.
725 _queue_buf_cond.signal(
true);
728 _queue_buf_cond.wait();
// Copy the queued bytes into _in_buf and expose them as the get area.
735 ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
739 setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
// Signal the condition once more after the data has been taken over.
743 _queue_buf_cond.signal();
// Return the first available character.
// NOTE(review): not_eof() receives a plain char here; to_int_type() is the
// usual conversion — confirm the result for bytes with the high bit set.
745 return std::char_traits<char>::not_eof(_in_buf[0]);
// Member initializer list: stores the stream and the connection and starts with
// _skip cleared.
752 : _stream(stream), _connection(conn), _skip(false)
// Sender destructor.
756 DatagramConnection::Sender::~Sender()
// Skip request for the sender; declared with an empty exception specification
// (throw ()), so no exception may leave it.
// NOTE(review): presumably sets _skip — the body is not visible, confirm.
760 void DatagramConnection::Sender::skip() throw ()
// Main routine of the sender; declared throw (), so no exception may leave it.
767 void DatagramConnection::Sender::run() throw ()
// Keep going for as long as the stream is in a good state.
779 while(_stream.good())
// Write the bundle through the serializer, then flush the stream.
792 serializer << bundle; _stream.flush();
816 }
// Handler for std::exception; what it does with `ex` is not visible here.
catch (std::exception &ex) {
// Sender hook declared throw (), so no exception may leave it.
// NOTE(review): presumably the thread framework calls it after run() — confirm.
821 void DatagramConnection::Sender::finally() throw ()
// Sender hook declared throw (), so no exception may leave it.
// NOTE(review): presumably called when the sender thread is asked to stop —
// confirm against the thread base class.
825 void DatagramConnection::Sender::__cancellation() throw ()