38 #define AVG_RTT_WEIGHT 0.875
47 : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48 _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
88 deserializer >> bundle;
95 }
catch (std::exception &ex) {
115 }
catch (
const std::exception&) { };
123 }
catch (
const std::exception&) { };
143 _sender.queue.push(job);
164 if (_next_seqno != seqno)
176 _stream.queue(buf, len);
179 _recv_state = RECV_IDLE;
181 else if (flags & DatagramService::SEGMENT_FIRST)
187 if (_recv_state == RECV_IDLE)
191 ::memcpy(&_head_buf[0], buf, len);
195 _recv_state = RECV_HEAD;
197 else if (_recv_state == RECV_HEAD)
202 ::memcpy(&_head_buf[0], buf, len);
216 if (_recv_state == RECV_HEAD)
219 _stream.queue(&_head_buf[0], _head_len);
223 _recv_state = RECV_TRANSMISSION;
227 _stream.queue(buf, len);
229 if (flags & DatagramService::SEGMENT_LAST)
232 _recv_state = RECV_IDLE;
264 unsigned int seqno = _last_ack;
274 for (
int i = 0; i < 5; ++i)
279 _callback.callback_send(*
this, flags, seqno, getIdentifier(), buf, len);
282 _send_state = SEND_WAIT_ACK;
292 while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
298 _send_state = last ? SEND_IDLE : SEND_NEXT;
311 adjust_rtt(static_cast<double>(_avg_rtt) * 2);
319 _send_state = SEND_ERROR;
326 _send_state = last ? SEND_IDLE : SEND_NEXT;
330 _last_ack = (seqno + 1) % _params.max_seq_numbers;
347 void DatagramConnection::adjust_rtt(
double value)
350 double new_rtt = _avg_rtt;
362 : std::iostream(this), _buf_size(maxmsglen), _last_segment(false),
363 _queue_buf(_buf_size), _queue_buf_len(0),
364 _out_buf(_buf_size), _in_buf(_buf_size),
365 _abort(false), _callback(conn)
374 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
377 DatagramConnection::Stream::~Stream()
388 while (_queue_buf_len > 0)
390 _queue_buf_cond.wait();
394 ::memcpy(&_queue_buf[0], buf, len);
397 _queue_buf_len = len;
400 _queue_buf_cond.signal();
406 void DatagramConnection::Stream::close()
410 _queue_buf_cond.abort();
413 int DatagramConnection::Stream::sync()
416 _last_segment =
true;
418 int ret = std::char_traits<char>::eq_int_type(this->overflow(
419 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
423 _last_segment =
false;
428 std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
434 char *ibegin = &_out_buf[0];
440 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
442 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
444 *iend++ = std::char_traits<char>::to_char_type(c);
454 return std::char_traits<char>::not_eof(c);
459 _callback.stream_send(&_out_buf[0], bytes, _last_segment);
470 return std::char_traits<char>::not_eof(c);
473 std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
480 while (_queue_buf_len == 0)
483 _queue_buf_cond.wait();
487 ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
491 setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
495 _queue_buf_cond.signal();
497 return std::char_traits<char>::not_eof(_in_buf[0]);
504 : _stream(stream), _connection(conn)
508 DatagramConnection::Sender::~Sender()
512 void DatagramConnection::Sender::run() throw ()
524 while(_stream.good())
534 serializer << bundle; _stream.flush();
552 }
catch (std::exception &ex) {
557 void DatagramConnection::Sender::finally() throw ()
561 void DatagramConnection::Sender::__cancellation() throw ()