IBR-DTNSuite  0.12
DatagramConnection.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "net/DatagramConnection.h"
25 #include "core/BundleEvent.h"
27 #include "core/BundleCore.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
33 #include <ibrcommon/Logger.h>
34 #include <string.h>
35 
36 #include <iomanip>
37 
38 #define AVG_RTT_WEIGHT 0.875
39 
40 namespace dtn
41 {
42  namespace net
43  {
44  const std::string DatagramConnection::TAG = "DatagramConnection";
45 
46  DatagramConnection::DatagramConnection(const std::string &identifier, const DatagramService::Parameter &params, DatagramConnectionCallback &callback)
47  : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48  _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
49  {
50  }
51 
53  {
54  // do not destroy this instance as long as
55  // the sender thread is running
56  _sender.join();
57 
58  // join ourself
59  join();
60  }
61 
63  {
64  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "shutdown()" << IBRCOMMON_LOGGER_ENDL;
65 
66  // close the stream
68  }
69 
71  {
72  // close the stream
73  try {
74  _stream.close();
75  } catch (const ibrcommon::Exception&) { };
76  }
77 
78  void DatagramConnection::run() throw ()
79  {
80  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "run()" << IBRCOMMON_LOGGER_ENDL;
81 
82  // create a deserializer for the stream
84 
85  try {
86  while(_stream.good())
87  {
88  try {
89  dtn::data::Bundle bundle;
90 
91  // read the bundle out of the stream
92  deserializer >> bundle;
93 
94  // raise default bundle received event
95  dtn::net::BundleReceivedEvent::raise(_peer_eid, bundle, false);
96  } catch (const dtn::data::Validator::RejectedException &ex) {
97  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Bundle rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
98 
99  // TODO: send NACK
100  _stream.reject();
101  } catch (const dtn::InvalidDataException &ex) {
102  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Received an invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
103 
104  // TODO: send NACK
105  _stream.reject();
106  }
107  }
108  } catch (std::exception &ex) {
109  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Main-thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
110  }
111  }
112 
114  {
115  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "setup()" << IBRCOMMON_LOGGER_ENDL;
116 
117  _callback.connectionUp(this);
118  _sender.start();
119  }
120 
122  {
123  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "finally()" << IBRCOMMON_LOGGER_ENDL;
124 
125  try {
126  ibrcommon::MutexLock l(_ack_cond);
127  _ack_cond.abort();
128  } catch (const std::exception&) { };
129 
130  try {
131  // shutdown the sender thread
132  _sender.stop();
133 
134  // wait until all operations are stopped
135  _sender.join();
136  } catch (const std::exception&) { };
137 
138  try {
139  // remove this connection from the connection list
140  _callback.connectionDown(this);
141  } catch (const ibrcommon::MutexException&) { };
142  }
143 
144  const std::string& DatagramConnection::getIdentifier() const
145  {
146  return _identifier;
147  }
148 
154  {
155  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "queue bundle " << job.getBundle().toString() << " to " << job.getNeighbor().getString() << IBRCOMMON_LOGGER_ENDL;
156  _sender.queue.push(job);
157  }
158 
164  void DatagramConnection::queue(const char &flags, const unsigned int &seqno, const char *buf, const dtn::data::Length &len)
165  {
166  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame received, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
167 
168  try {
169  // sequence number checks
170  if (_params.seq_check)
171  {
172  // we will accept every sequence number on first segments
173  // if this is not the first segment
174  if (!(flags & DatagramService::SEGMENT_FIRST))
175  {
176  // if the sequence number is not expected
177  if (_next_seqno != seqno)
178  // then drop it and send an ack
179  throw WrongSeqNoException(_next_seqno);
180  }
181  }
182 
183  // if this is the last segment then...
185  {
186  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << "full segment received" << IBRCOMMON_LOGGER_ENDL;
187 
188  // forward the last segment to the stream
189  _stream.queue(buf, len, true);
190 
191  // switch to IDLE state
192  _recv_state = RECV_IDLE;
193  }
194  else if (flags & DatagramService::SEGMENT_FIRST)
195  {
196  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << "first segment received" << IBRCOMMON_LOGGER_ENDL;
197 
198  // the first segment is only allowed on IDLE state or on
199  // retransmissions due to lost ACKs
200  if (_recv_state == RECV_IDLE)
201  {
202  // first segment received
203  // store the segment in a buffer
204  ::memcpy(&_head_buf[0], buf, len);
205  _head_len = len;
206 
207  // enter the HEAD state
208  _recv_state = RECV_HEAD;
209  }
210  else if (_recv_state == RECV_HEAD)
211  {
212  // last ACK seams to be lost or the peer has been restarted after
213  // sending the first segment
214  // overwrite the buffer with the new segment
215  ::memcpy(&_head_buf[0], buf, len);
216  _head_len = len;
217  }
218  else
219  {
220  // failure - abort the stream
221  throw DatagramException("stream went inconsistent");
222  }
223  }
224  else
225  {
226  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << ((flags & DatagramService::SEGMENT_LAST) ? "last" : "middle") << " segment received" << IBRCOMMON_LOGGER_ENDL;
227 
228  // this is one segment after the HEAD flush the buffers
229  if (_recv_state == RECV_HEAD)
230  {
231  // forward HEAD buffer to the stream
232  _stream.queue(&_head_buf[0], _head_len, true);
233  _head_len = 0;
234 
235  // switch to TRANSMISSION state
236  _recv_state = RECV_TRANSMISSION;
237  }
238 
239  // forward the current segment to the stream
240  _stream.queue(buf, len, false);
241 
242  if (flags & DatagramService::SEGMENT_LAST)
243  {
244  // switch to IDLE state
245  _recv_state = RECV_IDLE;
246  }
247  }
248 
249  // increment next sequence number
250  _next_seqno = (seqno + 1) % _params.max_seq_numbers;
251  } catch (const WrongSeqNoException &ex) {
252  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "sequence number received " << seqno << ", expected " << ex.expected_seqno << IBRCOMMON_LOGGER_ENDL;
253  }
254 
256  {
257  // send ack for this message
258  _callback.callback_ack(*this, _next_seqno, getIdentifier());
259  }
260  }
261 
262  void DatagramConnection::stream_send(const char *buf, const dtn::data::Length &len, bool last) throw (DatagramException)
263  {
264  // build the right flags
265  char flags = 0;
266 
267  // if this is the first segment, then set the FIRST bit
268  if (_send_state == SEND_IDLE) flags |= DatagramService::SEGMENT_FIRST;
269 
270  // if this is the last segment, then set the LAST bit
271  if (last) flags |= DatagramService::SEGMENT_LAST;
272 
273  // set the seqno for this segment
274  unsigned int seqno = _last_ack;
275 
276  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame to send, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
277 
278  if (_params.flowcontrol == DatagramService::FLOW_STOPNWAIT)
279  {
280  // measure the time until the ack is received
282 
283  // start time measurement
284  tm.start();
285 
286  // max. 5 retries
287  for (size_t i = 0; i < _params.retry_limit; ++i)
288  {
289  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) << "transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
290 
291  // send the datagram
292  _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
293 
294  // enter the wait state
295  _send_state = SEND_WAIT_ACK;
296 
297  // set timeout to twice the average round-trip-time
298  struct timespec ts;
299  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
300 
301  try {
302  ibrcommon::MutexLock l(_ack_cond);
303 
304  // wait here for an ACK
305  while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
306  {
307  _ack_cond.wait(&ts);
308  }
309 
310  // stop the measurement
311  tm.stop();
312 
313  // success!
314  _send_state = last ? SEND_IDLE : SEND_NEXT;
315 
316  // adjust the average rtt
317  adjust_rtt(tm.getMilliseconds());
318 
319  // report result
320  _callback.reportSuccess(i, tm.getMilliseconds());
321 
322  return;
325  {
326  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack timeout for seqno " << seqno << IBRCOMMON_LOGGER_ENDL;
327 
328  // fail -> increment the future timeout
329  adjust_rtt(static_cast<double>(_avg_rtt) * 2);
330 
331  // retransmit the frame
332  continue;
333  }
334  else
335  {
336  // aborted
337  break;
338  }
339  }
340  }
341 
342  // maximum number of retransmissions hit
343  _send_state = SEND_ERROR;
344 
345  // report failure
346  _callback.reportFailure();
347 
348  // transmission failed - abort the stream
349  throw DatagramException("transmission failed - abort the stream");
350  }
351  else if (_params.flowcontrol == DatagramService::FLOW_SLIDING_WINDOW)
352  {
353  try {
354  // lock the ACK variables and frame window
355  ibrcommon::MutexLock l(_ack_cond);
356 
357  // timeout value
358  struct timespec ts;
359 
360  // set timeout to twice the average round-trip-time
361  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
362 
363  // wait until window has at least one free slot
364  while (sw_frames_full()) _ack_cond.wait(&ts);
365 
366  // add new frame to the window
367  _sw_frames.push_back(window_frame());
368 
369  window_frame &new_frame = _sw_frames.back();
370 
371  new_frame.flags = flags;
372  new_frame.seqno = seqno;
373  new_frame.buf.assign(buf, buf+len);
374  new_frame.retry = 0;
375 
376  // start RTT measurement
377  new_frame.tm.start();
378 
379  // send the datagram
380  _callback.callback_send(*this, new_frame.flags, new_frame.seqno, getIdentifier(), &new_frame.buf[0], new_frame.buf.size());
381 
382  // increment next sequence number
383  _last_ack = (seqno + 1) % _params.max_seq_numbers;
384 
385  // enter the wait state
386  _send_state = SEND_WAIT_ACK;
387 
388  // set timeout to twice the average round-trip-time
389  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
390 
391  // wait until one more slot is available
392  // or no more frames are to ACK (if this was the last frame)
393  while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
394  {
395  _ack_cond.wait(&ts);
396  }
399  {
400  // timeout - retransmit the whole window
401  sw_timeout(last);
402  }
403  else
404  {
405  // maximum number of retransmissions hit
406  _send_state = SEND_ERROR;
407 
408  // report failure
409  _callback.reportFailure();
410 
411  // transmission failed - abort the stream
412  throw DatagramException("transmission failed - abort the stream");
413  }
414  }
415 
416  // if this is the last segment switch directly to IDLE
417  _send_state = last ? SEND_IDLE : SEND_NEXT;
418  }
419  else
420  {
421  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) << "transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
422 
423  // send the datagram
424  _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
425 
426  // if this is the last segment switch directly to IDLE
427  _send_state = last ? SEND_IDLE : SEND_NEXT;
428 
429  // increment next sequence number
430  ibrcommon::MutexLock l(_ack_cond);
431  _last_ack = (seqno + 1) % _params.max_seq_numbers;
432  }
433  }
434 
435  bool DatagramConnection::sw_frames_full()
436  {
437  return _sw_frames.size() >= (_params.max_seq_numbers / 2);
438  }
439 
440  void DatagramConnection::sw_timeout(bool last)
441  {
442  // timeout value
443  struct timespec ts;
444 
445  while (true) {
446  try {
447  ibrcommon::MutexLock l(_ack_cond);
448 
449  // fail -> increment the future timeout
450  adjust_rtt(static_cast<double>(_avg_rtt) * 2);
451 
452  if (_sw_frames.size() > 0)
453  {
454  window_frame &front_frame = _sw_frames.front();
455 
456  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack timeout for seqno " << front_frame.seqno << IBRCOMMON_LOGGER_ENDL;
457 
458  if (front_frame.retry > _params.retry_limit) {
459  // maximum number of retransmissions hit
460  _send_state = SEND_ERROR;
461 
462  // report failure
463  _callback.reportFailure();
464 
465  // transmission failed - abort the stream
466  throw DatagramException("transmission failed - abort the stream");
467  }
468  }
469 
470  // retransmit the window
471  for (std::list<window_frame>::iterator it = _sw_frames.begin(); it != _sw_frames.end(); ++it)
472  {
473  window_frame &retry_frame = (*it);
474 
475  // send the datagram
476  _callback.callback_send(*this, retry_frame.flags, retry_frame.seqno, getIdentifier(), &retry_frame.buf[0], retry_frame.buf.size());
477 
478  // increment retry counter
479  retry_frame.retry++;
480  }
481 
482  // enter the wait state
483  _send_state = SEND_WAIT_ACK;
484 
485  // set timeout to twice the average round-trip-time
486  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
487 
488  // wait until one more slot is available
489  // or no more frames are to ACK (if this was the last frame)
490  while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
491  {
492  _ack_cond.wait(&ts);
493  }
496  {
497  // timeout again - repeat at while loop
498  continue;
499  }
500  }
501 
502  // done
503  return;
504  }
505  }
506 
507  void DatagramConnection::nack(const unsigned int &seqno, const bool temporary)
508  {
509  // if the NACK is temporary skip ignore it
510  // and repeat the frame after the timeout
511  if (temporary) return;
512 
513  // skip the currently transmitted bundle
514  _sender.skip();
515 
516  // handle the NACK as an ACK to move on with the next frame
517  ack(seqno);
518  }
519 
520  void DatagramConnection::ack(const unsigned int &seqno)
521  {
522  ibrcommon::MutexLock l(_ack_cond);
523 
524  switch (_params.flowcontrol) {
526  if (_sw_frames.size() > 0) {
527  window_frame &f = _sw_frames.front();
528 
529  if (seqno == ((f.seqno + 1) % _params.max_seq_numbers)) {
530  // stop the measurement
531  f.tm.stop();
532 
533  // adjust the average rtt
534  adjust_rtt(f.tm.getMilliseconds());
535 
536  // report result
537  _callback.reportSuccess(f.retry, f.tm.getMilliseconds());
538 
539  // remove front element
540  _sw_frames.pop_front();
541  }
542  }
543  break;
544 
545  default:
546  _last_ack = seqno;
547  break;
548  }
549 
550  _ack_cond.signal(true);
551  }
552 
554  {
555  _peer_eid = peer;
556  }
557 
559  {
560  return _peer_eid;
561  }
562 
563  void DatagramConnection::adjust_rtt(double value)
564  {
565  // convert current avg to float
566  double new_rtt = _avg_rtt;
567 
568  // calculate average
569  new_rtt = (new_rtt * AVG_RTT_WEIGHT) + ((1 - AVG_RTT_WEIGHT) * value);
570 
571  // assign the new value
572  _avg_rtt = new_rtt;
573 
574  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "RTT adjusted, measured value: " << std::setprecision(4) << value << ", new avg. RTT: " << std::setprecision(4) << _avg_rtt << IBRCOMMON_LOGGER_ENDL;
575  }
576 
577  DatagramConnection::Stream::Stream(DatagramConnection &conn, const dtn::data::Length &maxmsglen)
578  : std::iostream(this), _buf_size(maxmsglen), _first_segment(true), _last_segment(false),
579  _queue_buf(_buf_size), _queue_buf_len(0), _queue_buf_head(false),
580  _out_buf(_buf_size), _in_buf(_buf_size),
581  _abort(false), _skip(false), _reject(false), _callback(conn)
582  {
583  // Initialize get pointer. This should be zero so that underflow
584  // is called upon first read.
585  setg(0, 0, 0);
586 
587  // mark the buffer for outgoing data as free
588  // the +1 sparse the first byte in the buffer and leave room
589  // for the processing flags of the segment
590  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
591  }
592 
593  DatagramConnection::Stream::~Stream()
594  {
595  }
596 
597  void DatagramConnection::Stream::queue(const char *buf, const dtn::data::Length &len, bool isFirst) throw (DatagramException)
598  {
599  try {
600  ibrcommon::MutexLock l(_queue_buf_cond);
601  if (_abort) throw DatagramException("stream aborted");
602 
603  // wait until the buffer is free
604  while (_queue_buf_len > 0)
605  {
606  if (_abort) throw DatagramException("stream aborted");
607  _queue_buf_cond.wait();
608  }
609 
610  // copy the new data into the buffer, but leave out the first byte (header)
611  ::memcpy(&_queue_buf[0], buf, len);
612 
613  // store the buffer length
614  _queue_buf_len = len;
615  _queue_buf_head = isFirst;
616 
617  // notify waiting threads
618  _queue_buf_cond.signal();
620  throw DatagramException("stream aborted");
621  }
622  }
623 
624  void DatagramConnection::Stream::skip()
625  {
626  ibrcommon::MutexLock l(_queue_buf_cond);
627  _skip = true;
628  _queue_buf_cond.signal(true);
629  }
630 
631  void DatagramConnection::Stream::reject()
632  {
633  ibrcommon::MutexLock l(_queue_buf_cond);
634 
635  // set reject flag for futher frames
636  _reject = true;
637  _queue_buf_cond.signal(true);
638  }
639 
640  void DatagramConnection::Stream::close()
641  {
642  ibrcommon::MutexLock l(_queue_buf_cond);
643  _abort = true;
644  _queue_buf_cond.abort();
645  }
646 
647  int DatagramConnection::Stream::sync()
648  {
649  // We process the last segment in the set. Set this variable, so
650  // that this information is available for the overflow method.
651  _last_segment = true;
652 
653  int ret = std::char_traits<char>::eq_int_type(this->overflow(
654  std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
655  : 0;
656 
657  return ret;
658  }
659 
660  std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
661  {
662  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Stream::overflow()" << IBRCOMMON_LOGGER_ENDL;
663 
664  if (_abort) throw DatagramException("stream aborted");
665 
666  char *ibegin = &_out_buf[0];
667  char *iend = pptr();
668 
669  // mark the buffer for outgoing data as free
670  // the +1 sparse the first byte in the buffer and leave room
671  // for the processing flags of the segment
672  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
673 
674  if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
675  {
676  *iend++ = std::char_traits<char>::to_char_type(c);
677  }
678 
679  // bytes to send
680  const dtn::data::Length bytes = (iend - ibegin);
681 
682  // if there is nothing to send, just return
683  if (bytes == 0)
684  {
685  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
686  return std::char_traits<char>::not_eof(c);
687  }
688 
689  try {
690  // disable skipping if this is the first segment
691  if (_first_segment) _skip = false;
692 
693  // send segment to CL, use callback interface
694  if (!_skip) _callback.stream_send(&_out_buf[0], bytes, _last_segment);
695 
696  // set the flags for the next segment
697  _first_segment = _last_segment;
698  _last_segment = false;
699  } catch (const DatagramException &ex) {
700  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
701 
702  // close this stream
703  close();
704 
705  // re-throw the DatagramException
706  throw;
707  }
708 
709  return std::char_traits<char>::not_eof(c);
710  }
711 
712  std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
713  {
714  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Stream::underflow()" << IBRCOMMON_LOGGER_ENDL;
715 
716  try {
717  ibrcommon::MutexLock l(_queue_buf_cond);
718  if (_abort) throw ibrcommon::Exception("stream aborted");
719 
720  // ignore this frame if this frame set is rejected
721  while ((_queue_buf_len == 0) || (_reject && !_queue_buf_head))
722  {
723  // clear the buffer
724  _queue_buf_len = 0;
725  _queue_buf_cond.signal(true);
726 
727  if (_abort) throw ibrcommon::Exception("stream aborted");
728  _queue_buf_cond.wait();
729  }
730 
731  // reset reject
732  _reject = false;
733 
734  // copy the queue buffer to an internal buffer
735  ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
736 
737  // Since the input buffer content is now valid (or is new)
738  // the get pointer should be initialized (or reset).
739  setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
740 
741  // mark the queue buffer as free
742  _queue_buf_len = 0;
743  _queue_buf_cond.signal();
744 
745  return std::char_traits<char>::not_eof(_in_buf[0]);
747  throw DatagramException("stream aborted");
748  }
749  }
750 
751  DatagramConnection::Sender::Sender(DatagramConnection &conn, Stream &stream)
752  : _stream(stream), _connection(conn), _skip(false)
753  {
754  }
755 
756  DatagramConnection::Sender::~Sender()
757  {
758  }
759 
760  void DatagramConnection::Sender::skip() throw ()
761  {
762  // skip all data of the current transmission
763  _skip = true;
764  _stream.skip();
765  }
766 
767  void DatagramConnection::Sender::run() throw ()
768  {
769  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Sender::run()"<< IBRCOMMON_LOGGER_ENDL;
770 
771  try {
772  // get reference to the storage
774 
775  // create a standard serializer
776  dtn::data::DefaultSerializer serializer(_stream);
777 
778  // as long as the stream is marked as good ...
779  while(_stream.good())
780  {
781  // get the next job
782  dtn::net::BundleTransfer job = queue.getnpop(true);
783 
784  try {
785  // read the bundle out of the storage
786  const dtn::data::Bundle bundle = storage.get(job.getBundle());
787 
788  // reset skip flag
789  _skip = false;
790 
791  // write the bundle into the stream
792  serializer << bundle; _stream.flush();
793 
794  // check if the stream is still marked as good
795  if (_stream.good())
796  {
797  // check if last transmission was refused
798  if (_skip) {
799  // send transfer aborted event
801  } else {
802  // bundle send completely - raise bundle event
803  job.complete();
804  }
805  }
806  } catch (const dtn::storage::NoBundleFoundException&) {
807  // could not load the bundle, abort the job
809  }
810  }
811 
812  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
813  } catch (const ibrcommon::QueueUnblockedException &ex) {
814  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
815  return;
816  } catch (std::exception &ex) {
817  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
818  }
819  }
820 
821  void DatagramConnection::Sender::finally() throw ()
822  {
823  }
824 
825  void DatagramConnection::Sender::__cancellation() throw ()
826  {
827  // abort all blocking operations on the stream
828  _stream.close();
829 
830  // abort blocking calls on the queue
831  queue.abort();
832  }
833  } /* namespace data */
834 } /* namespace dtn */