IBR-DTNSuite  0.10
DatagramConnection.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "net/DatagramConnection.h"
25 #include "core/BundleEvent.h"
27 #include "core/BundleCore.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
33 #include <ibrcommon/Logger.h>
34 #include <string.h>
35 
36 #include <iomanip>
37 
38 #define AVG_RTT_WEIGHT 0.875
39 
40 namespace dtn
41 {
42  namespace net
43  {
44  const std::string DatagramConnection::TAG = "DatagramConnection";
45 
46  DatagramConnection::DatagramConnection(const std::string &identifier, const DatagramService::Parameter &params, DatagramConnectionCallback &callback)
47  : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48  _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
49  {
50  }
51 
53  {
54  // do not destroy this instance as long as
55  // the sender thread is running
56  _sender.join();
57  }
58 
60  {
62 
63  // close the stream
65  }
66 
68  {
69  // close the stream
70  try {
71  _stream.close();
72  } catch (const ibrcommon::Exception&) { };
73  }
74 
75  void DatagramConnection::run() throw ()
76  {
78 
79  // create a deserializer for the stream
81 
82  try {
83  while(_stream.good())
84  {
85  dtn::data::Bundle bundle;
86 
87  // read the bundle out of the stream
88  deserializer >> bundle;
89 
90  // raise default bundle received event
91  dtn::net::BundleReceivedEvent::raise(_peer_eid, bundle, false);
92  }
93  } catch (const dtn::InvalidDataException &ex) {
94  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Received an invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
95  } catch (std::exception &ex) {
96  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Main-thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
97  }
98  }
99 
101  {
103 
104  _callback.connectionUp(this);
105  _sender.start();
106  }
107 
109  {
111 
112  try {
113  ibrcommon::MutexLock l(_ack_cond);
114  _ack_cond.abort();
115  } catch (const std::exception&) { };
116 
117  try {
118  // shutdown the sender thread
119  _sender.stop();
120 
121  // wait until all operations are stopped
122  _sender.join();
123  } catch (const std::exception&) { };
124 
125  try {
126  // remove this connection from the connection list
127  _callback.connectionDown(this);
128  } catch (const ibrcommon::MutexException&) { };
129  }
130 
131  const std::string& DatagramConnection::getIdentifier() const
132  {
133  return _identifier;
134  }
135 
141  {
142  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "queue bundle " << job.getBundle().toString() << " to " << job.getNeighbor().getString() << IBRCOMMON_LOGGER_ENDL;
143  _sender.queue.push(job);
144  }
145 
151  void DatagramConnection::queue(const char &flags, const unsigned int &seqno, const char *buf, const dtn::data::Length &len)
152  {
153  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame received, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
154 
155  try {
156  // sequence number checks
157  if (_params.seq_check)
158  {
159  // we will accept every sequence number on first segments
160  // if this is not the first segment
161  if (!(flags & DatagramService::SEGMENT_FIRST))
162  {
163  // if the sequence number is not expected
164  if (_next_seqno != seqno)
165  // then drop it and send an ack
166  throw WrongSeqNoException(_next_seqno);
167  }
168  }
169 
170  // if this is the last segment then...
172  {
174 
175  // forward the last segment to the stream
176  _stream.queue(buf, len);
177 
178  // switch to IDLE state
179  _recv_state = RECV_IDLE;
180  }
181  else if (flags & DatagramService::SEGMENT_FIRST)
182  {
184 
185  // the first segment is only allowed on IDLE state or on
186  // retransmissions due to lost ACKs
187  if (_recv_state == RECV_IDLE)
188  {
189  // first segment received
190  // store the segment in a buffer
191  ::memcpy(&_head_buf[0], buf, len);
192  _head_len = len;
193 
194  // enter the HEAD state
195  _recv_state = RECV_HEAD;
196  }
197  else if (_recv_state == RECV_HEAD)
198  {
199  // last ACK seams to be lost or the peer has been restarted after
200  // sending the first segment
201  // overwrite the buffer with the new segment
202  ::memcpy(&_head_buf[0], buf, len);
203  _head_len = len;
204  }
205  else
206  {
207  // failure - abort the stream
208  throw DatagramException("stream went inconsistent");
209  }
210  }
211  else
212  {
213  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << ((flags & DatagramService::SEGMENT_LAST) ? "last" : "middle") << " segment received" << IBRCOMMON_LOGGER_ENDL;
214 
215  // this is one segment after the HEAD flush the buffers
216  if (_recv_state == RECV_HEAD)
217  {
218  // forward HEAD buffer to the stream
219  _stream.queue(&_head_buf[0], _head_len);
220  _head_len = 0;
221 
222  // switch to TRANSMISSION state
223  _recv_state = RECV_TRANSMISSION;
224  }
225 
226  // forward the current segment to the stream
227  _stream.queue(buf, len);
228 
229  if (flags & DatagramService::SEGMENT_LAST)
230  {
231  // switch to IDLE state
232  _recv_state = RECV_IDLE;
233  }
234  }
235 
236  // increment next sequence number
237  _next_seqno = (seqno + 1) % _params.max_seq_numbers;
238  } catch (const WrongSeqNoException &ex) {
239  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "sequence number received " << seqno << ", expected " << ex.expected_seqno << IBRCOMMON_LOGGER_ENDL;
240  }
241 
243  {
244  // send ack for this message
245  _callback.callback_ack(*this, _next_seqno, getIdentifier());
246  }
247  }
248 
249  void DatagramConnection::stream_send(const char *buf, const dtn::data::Length &len, bool last) throw (DatagramException)
250  {
251  // measure the time until the ack is received
253 
254  // build the right flags
255  char flags = 0;
256 
257  // if this is the first segment, then set the FIRST bit
258  if (_send_state == SEND_IDLE) flags |= DatagramService::SEGMENT_FIRST;
259 
260  // if this is the last segment, then set the LAST bit
261  if (last) flags |= DatagramService::SEGMENT_LAST;
262 
263  // set the seqno for this segment
264  unsigned int seqno = _last_ack;
265 
266  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame to send, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
267 
268  if (_params.flowcontrol == DatagramService::FLOW_STOPNWAIT)
269  {
270  // start time measurement
271  tm.start();
272 
273  // max. 5 retries
274  for (int i = 0; i < 5; ++i)
275  {
276  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) << "transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
277 
278  // send the datagram
279  _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
280 
281  // enter the wait state
282  _send_state = SEND_WAIT_ACK;
283 
284  // set timeout to twice the average round-trip-time
285  struct timespec ts;
286  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
287 
288  try {
289  ibrcommon::MutexLock l(_ack_cond);
290 
291  // wait here for an ACK
292  while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
293  {
294  _ack_cond.wait(&ts);
295  }
296 
297  // success!
298  _send_state = last ? SEND_IDLE : SEND_NEXT;
299 
300  // stop the measurement
301  tm.stop();
302 
303  // adjust the average rtt
304  adjust_rtt(tm.getMilliseconds());
305 
306  return;
308  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack timeout for seqno " << seqno << IBRCOMMON_LOGGER_ENDL;
309 
310  // fail -> increment the future timeout
311  adjust_rtt(static_cast<double>(_avg_rtt) * 2);
312 
313  // retransmit the frame
314  continue;
315  }
316  }
317 
318  // maximum number of retransmissions hit
319  _send_state = SEND_ERROR;
320 
321  // transmission failed - abort the stream
322  throw DatagramException("transmission failed - abort the stream");
323  }
324 
325  // if this is the last segment switch directly to IDLE
326  _send_state = last ? SEND_IDLE : SEND_NEXT;
327 
328  // increment next sequence number
329  ibrcommon::MutexLock l(_ack_cond);
330  _last_ack = (seqno + 1) % _params.max_seq_numbers;
331  }
332 
333  void DatagramConnection::ack(const unsigned int &seqno)
334  {
335  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack received for seqno " << seqno << IBRCOMMON_LOGGER_ENDL;
336 
337  ibrcommon::MutexLock l(_ack_cond);
338  _last_ack = seqno;
339  _ack_cond.signal(true);
340  }
341 
343  {
344  _peer_eid = peer;
345  }
346 
347  void DatagramConnection::adjust_rtt(double value)
348  {
349  // convert current avg to float
350  double new_rtt = _avg_rtt;
351 
352  // calculate average
353  new_rtt = (new_rtt * AVG_RTT_WEIGHT) + ((1 - AVG_RTT_WEIGHT) * value);
354 
355  // assign the new value
356  _avg_rtt = new_rtt;
357 
358  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "RTT adjusted, measured value: " << std::setprecision(4) << value << ", new avg. RTT: " << std::setprecision(4) << _avg_rtt << IBRCOMMON_LOGGER_ENDL;
359  }
360 
361  DatagramConnection::Stream::Stream(DatagramConnection &conn, const dtn::data::Length &maxmsglen)
362  : std::iostream(this), _buf_size(maxmsglen), _last_segment(false),
363  _queue_buf(_buf_size), _queue_buf_len(0),
364  _out_buf(_buf_size), _in_buf(_buf_size),
365  _abort(false), _callback(conn)
366  {
367  // Initialize get pointer. This should be zero so that underflow
368  // is called upon first read.
369  setg(0, 0, 0);
370 
371  // mark the buffer for outgoing data as free
372  // the +1 sparse the first byte in the buffer and leave room
373  // for the processing flags of the segment
374  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
375  }
376 
377  DatagramConnection::Stream::~Stream()
378  {
379  }
380 
381  void DatagramConnection::Stream::queue(const char *buf, const dtn::data::Length &len) throw (DatagramException)
382  {
383  try {
384  ibrcommon::MutexLock l(_queue_buf_cond);
385  if (_abort) throw DatagramException("stream aborted");
386 
387  // wait until the buffer is free
388  while (_queue_buf_len > 0)
389  {
390  _queue_buf_cond.wait();
391  }
392 
393  // copy the new data into the buffer, but leave out the first byte (header)
394  ::memcpy(&_queue_buf[0], buf, len);
395 
396  // store the buffer length
397  _queue_buf_len = len;
398 
399  // notify waiting threads
400  _queue_buf_cond.signal();
402  throw DatagramException("stream aborted");
403  }
404  }
405 
406  void DatagramConnection::Stream::close()
407  {
408  ibrcommon::MutexLock l(_queue_buf_cond);
409  _abort = true;
410  _queue_buf_cond.abort();
411  }
412 
413  int DatagramConnection::Stream::sync()
414  {
415  // Here we know we get the last segment. Mark it so.
416  _last_segment = true;
417 
418  int ret = std::char_traits<char>::eq_int_type(this->overflow(
419  std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
420  : 0;
421 
422  // initialize the first byte with SEGMENT_FIRST flag
423  _last_segment = false;
424 
425  return ret;
426  }
427 
428  std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
429  {
431 
432  if (_abort) throw DatagramException("stream aborted");
433 
434  char *ibegin = &_out_buf[0];
435  char *iend = pptr();
436 
437  // mark the buffer for outgoing data as free
438  // the +1 sparse the first byte in the buffer and leave room
439  // for the processing flags of the segment
440  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
441 
442  if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
443  {
444  *iend++ = std::char_traits<char>::to_char_type(c);
445  }
446 
447  // bytes to send
448  const dtn::data::Length bytes = (iend - ibegin);
449 
450  // if there is nothing to send, just return
451  if (bytes == 0)
452  {
453  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
454  return std::char_traits<char>::not_eof(c);
455  }
456 
457  try {
458  // Send segment to CL, use callback interface
459  _callback.stream_send(&_out_buf[0], bytes, _last_segment);
460  } catch (const DatagramException &ex) {
461  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
462 
463  // close this stream
464  close();
465 
466  // re-throw the DatagramException
467  throw;
468  }
469 
470  return std::char_traits<char>::not_eof(c);
471  }
472 
473  std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
474  {
476 
477  try {
478  ibrcommon::MutexLock l(_queue_buf_cond);
479 
480  while (_queue_buf_len == 0)
481  {
482  if (_abort) throw ibrcommon::Exception("stream aborted");
483  _queue_buf_cond.wait();
484  }
485 
486  // copy the queue buffer to an internal buffer
487  ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
488 
489  // Since the input buffer content is now valid (or is new)
490  // the get pointer should be initialized (or reset).
491  setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
492 
493  // mark the queue buffer as free
494  _queue_buf_len = 0;
495  _queue_buf_cond.signal();
496 
497  return std::char_traits<char>::not_eof(_in_buf[0]);
499  throw DatagramException("stream aborted");
500  }
501  }
502 
503  DatagramConnection::Sender::Sender(DatagramConnection &conn, Stream &stream)
504  : _stream(stream), _connection(conn)
505  {
506  }
507 
508  DatagramConnection::Sender::~Sender()
509  {
510  }
511 
512  void DatagramConnection::Sender::run() throw ()
513  {
515 
516  try {
517  // get reference to the storage
519 
520  // create a standard serializer
521  dtn::data::DefaultSerializer serializer(_stream);
522 
523  // as long as the stream is marked as good ...
524  while(_stream.good())
525  {
526  // get the next job
527  dtn::net::BundleTransfer job = queue.getnpop(true);
528 
529  try {
530  // read the bundle out of the storage
531  const dtn::data::Bundle bundle = storage.get(job.getBundle());
532 
533  // write the bundle into the stream
534  serializer << bundle; _stream.flush();
535 
536  // the transmission was successful if the stream is still marked as good
537  if (_stream.good())
538  {
539  // bundle send completely - raise bundle event
540  job.complete();
541  }
542  } catch (const dtn::storage::NoBundleFoundException&) {
543  // could not load the bundle, abort the job
545  }
546  }
547 
548  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
549  } catch (const ibrcommon::QueueUnblockedException &ex) {
550  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
551  return;
552  } catch (std::exception &ex) {
553  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
554  }
555  }
556 
557  void DatagramConnection::Sender::finally() throw ()
558  {
559  }
560 
561  void DatagramConnection::Sender::__cancellation() throw ()
562  {
563  // abort all blocking operations on the stream
564  _stream.close();
565 
566  // abort blocking calls on the queue
567  queue.abort();
568  }
569  } /* namespace data */
570 } /* namespace dtn */