IBR-DTNSuite  0.12
DatagramConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "net/DatagramConnection.h"
24 
25 #include "core/BundleCore.h"
26 #include "core/NodeEvent.h"
27 #include "core/EventDispatcher.h"
28 
29 #include <ibrcommon/Logger.h>
32 
33 #include <string.h>
34 #include <vector>
35 
36 namespace dtn
37 {
38  namespace net
39  {
// Tag passed to the IBRCOMMON_LOGGER_* macros in this file; also the value returned by getName().
40  const std::string DatagramConvergenceLayer::TAG = "DatagramConvergenceLayer";
41 
43  : _service(ds), _receiver(*this), _running(false),
44  _stats_in(0), _stats_out(0), _stats_rtt(0.0), _stats_retries(0), _stats_failure(0)
45  {
    // NOTE(review): the signature line (listing line 42) is missing from this
    // extract. The initializer list above stores `ds` as the service, hands
    // this object to the receiver, starts with _running == false and sets all
    // statistics counters to zero. The body itself is empty.
46  }
47 
49  {
    // NOTE(review): the signature line (listing line 48) is missing from this
    // extract; the body joins the thread, waits for all connections and deletes
    // the service, so this is presumably the destructor - confirm.
50  // wait until the component thread is terminated
51  join();
52 
53  // wait until all connections are down
54  {
55  ibrcommon::MutexLock l(_cond_connections);
    // blocks until the connection list is empty; the list is changed and
    // signalled in getConnection() and in the ConnectionDown handling of the
    // worker loop
56  while (_connections.size() > 0) _cond_connections.wait();
57  }
58 
59  // delete the associated service
60  delete _service;
61  }
62 
64  {
    // NOTE(review): the signature line (listing line 63) is missing from this
    // extract; `evt` is the event object handed to this handler.
    // Only NodeEvent objects with action NODE_UNAVAILABLE have an effect.
65  try {
    // the reference cast throws std::bad_cast for every other event type,
    // which is caught and ignored below
66  const dtn::core::NodeEvent &event = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
67 
68  if (event.getAction() == NODE_UNAVAILABLE)
69  {
    // hand the EID of the vanished node to the worker loop; the worker loop
    // deletes every action object after processing it
70  NodeGone *gone = new NodeGone();
71  gone->eid = event.getNode().getEID();
72  _action_queue.push(gone);
73  }
74  } catch (const std::bad_cast&) { };
75  }
76 
78  {
    // NOTE(review): the signature line (listing line 77) is missing from this
    // extract. Sets all five statistics counters back to zero.
79  _stats_in = 0;
80  _stats_out = 0;
81  _stats_rtt = 0.0;
82  _stats_retries = 0;
83  _stats_failure = 0;
84  }
85 
87  {
    // NOTE(review): the signature line (listing line 86) is missing from this
    // extract. Writes the five statistics counters, formatted as strings, into
    // `data` (presumably a string-to-string map - confirm against the header).
88  std::stringstream ss_format;
89 
    // The keys are "<protocol name>|<counter>". They are function-local
    // statics, so they are built once on the first call and reused afterwards.
90  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
91  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
92 
93  static const std::string RTT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|rtt";
94  static const std::string RETRIES_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|retries";
95  static const std::string FAIL_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|fail";
96 
    // one stream is reused for all values; str("") empties it between uses
97  ss_format << _stats_in;
98  data[IN_TAG] = ss_format.str();
99  ss_format.str("");
100 
101  ss_format << _stats_out;
102  data[OUT_TAG] = ss_format.str();
103  ss_format.str("");
104 
105  ss_format << _stats_rtt;
106  data[RTT_TAG] = ss_format.str();
107  ss_format.str("");
108 
109  ss_format << _stats_retries;
110  data[RETRIES_TAG] = ss_format.str();
111  ss_format.str("");
112 
113  ss_format << _stats_failure;
114  data[FAIL_TAG] = ss_format.str();
115  }
116 
118  {
    // NOTE(review): the signature line (listing line 117) is missing from this
    // extract. Returns the protocol reported by the underlying service.
119  return _service->getProtocol();
120  }
121 
122  void DatagramConvergenceLayer::callback_send(DatagramConnection&, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len) throw (DatagramException)
123  {
124  // only on sender at once
125  ibrcommon::MutexLock l(_send_lock);
126 
127  // forward the send request to DatagramService
128  _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
129 
130  // traffic monitoring
131  _stats_out += len;
132  }
133 
134  void DatagramConvergenceLayer::callback_ack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
135  {
136  // only on sender at once
137  ibrcommon::MutexLock l(_send_lock);
138 
139  // forward the send request to DatagramService
140  _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
141  }
142 
143  void DatagramConvergenceLayer::callback_nack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
144  {
145  // only on sender at once
146  ibrcommon::MutexLock l(_send_lock);
147 
148  // forward the send request to DatagramService
149  _service->send(HEADER_NACK, 0, seqno, destination, NULL, 0);
150  }
151 
153  {
    // NOTE(review): the signature line (listing line 152) is missing from this
    // extract; `node` is the destination node and `job` the transfer to queue.
    // The job is silently dropped when the layer is not running or when the
    // node has no URI for this service's protocol.
154  // do not queue any new jobs if the convergence layer goes down
155  if (!_running) return;
156 
157  const std::list<dtn::core::Node::URI> uri_list = node.get(_service->getProtocol());
158  if (uri_list.empty()) return;
159 
160  // get the first element of the result
161  const dtn::core::Node::URI &uri = uri_list.front();
162 
163  // some debugging
164  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "job queued for " << node.getEID().getString() << IBRCOMMON_LOGGER_ENDL;
165 
    // the worker loop resolves the URI value to a connection, queues the job
    // there and deletes the action object afterwards
166  QueueBundle *queue = new QueueBundle(job);
167  queue->uri = uri.value;
168 
169  _action_queue.push( queue );
170  }
171 
172  DatagramConnection& DatagramConvergenceLayer::getConnection(const std::string &identifier, bool create) throw (ConnectionNotAvailableException)
173  {
174  DatagramConnection *connection = NULL;
175 
176  // Test if connection for this address already exist
177  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
178  {
179  if ((*i)->getIdentifier() == identifier)
180  return *(*i);
181  }
182 
183  // throw exception if we should not create new connections
184  if (!create) throw ConnectionNotAvailableException();
185 
186  // Connection does not exist, create one and put it into the list
187  connection = new DatagramConnection(identifier, _service->getParameter(), (*this));
188 
189  // increment the number of active connections
190  {
191  ibrcommon::MutexLock l(_cond_connections);
192 
193  // add a new connection to the list of connections
194  _connections.push_back(connection);
195 
196  // signal the modified connection list
197  _cond_connections.signal(true);
198  }
199 
200  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Selected identifier: " << connection->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
201  connection->start();
202  return *connection;
203  }
204 
205  void DatagramConvergenceLayer::reportSuccess(size_t retries, double rtt)
206  {
207  _stats_rtt = rtt;
208  _stats_retries += retries;
209  }
210 
212  {
    // NOTE(review): the signature line (listing line 211) is missing from this
    // extract. Counts one more failure in the statistics.
213  _stats_failure++;
214  }
215 
217  {
    // NOTE(review): the signature line (listing line 216) is missing from this
    // extract; `conn` is the connection that came up. Only a debug message
    // with its identifier is written.
218  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Up: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
219  }
220 
222  {
    // NOTE(review): the signature line (listing line 221) is missing from this
    // extract; `conn` is the connection that went down.
    // Nothing is removed here: a ConnectionDown action with the identifier is
    // queued, and the worker loop erases and deletes the matching connection.
223  ConnectionDown *cd = new ConnectionDown();
224  cd->id = conn->getIdentifier();
225  _action_queue.push(cd);
226  }
227 
229  {
    // NOTE(review): the signature line (listing line 228) is missing from this
    // extract, and so are the statements that belonged to the two "register"
    // comments below (listing lines 238 and 241).
230  // routine checked for throw() on 15.02.2013
231  try {
232  _service->bind();
233  } catch (const std::exception &e) {
    // a failed bind is only logged; execution continues and _running is
    // still set to true below
234  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "bind to " << _service->getInterface().toString() << " failed (" << e.what() << ")" << IBRCOMMON_LOGGER_ENDL;
235  }
236 
237  // register for NodeEvent objects
239 
240  // register for discovery beacon handling
242 
243  // set the running variable
244  _running = true;
245  }
246 
248  {
    // NOTE(review): the signature line (listing line 247) is missing from this
    // extract, and so are the statements that belonged to the two
    // "un-register" comments below (listing lines 250 and 253).
249  // un-register for discovery beacon handling
251 
252  // un-register for NodeEvent objects
254 
    // ask the worker loop to shut down all connections and to clear _running
255  _action_queue.push(new Shutdown());
256  }
257 
259  {
    // NOTE(review): the signature line (listing line 258) is missing from this
    // extract; `iface` is the interface the beacon is meant for and `beacon`
    // the announcement to send. The beacon is serialized and sent as a
    // HEADER_BROADCAST frame; send failures are ignored.
260  // only handle beacons for this interface
261  if (iface != _service->getInterface()) return;
262 
263  // serialize announcement
264  stringstream ss;
265  ss << beacon;
266 
    // length of the serialized beacon in bytes
267  std::streamsize len = ss.str().size();
268 
269  try {
270  // only one sender at once
271  ibrcommon::MutexLock l(_send_lock);
272 
273  // forward the send request to DatagramService
    // (no destination argument here, unlike the segment/ack/nack sends)
274  _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(), static_cast<dtn::data::Length>(len));
275  } catch (const DatagramException&) {
276  // ignore any send failure
277  };
278  }
279 
281  {
    // NOTE(review): the signature line (listing line 280) is missing from this
    // extract, and so is the declaration of `agent` that belonged to the
    // "discovery agent" comment below (listing line 291).
    //
    // Receive loop: reads one frame after another from the service and turns
    // each frame into an action object for the worker loop. The loop ends when
    // _running is cleared or when receiving fails.
282  size_t maxlen = _service->getParameter().max_msg_length;
283  std::string address;
284  unsigned int seqno = 0;
285  char flags = 0;
286  char type = 0;
    // receive buffer, sized to the maximum message length of the service
287  std::vector<char> data(maxlen);
288  size_t len = 0;
289 
290  // get the reference to the discovery agent
292 
293  while (_running)
294  {
295  try {
296  // Receive full frame from socket
    // (type, flags, seqno and address are presumably filled in by
    // recvfrom - confirm against DatagramService)
297  len = _service->recvfrom(&data[0], maxlen, type, flags, seqno, address);
298 
299  // traffic monitoring
300  _stats_in += len;
301  } catch (const DatagramException &ex) {
    // the error is only reported while the layer is still running; in
    // either case the receive loop stops
302  if (_running) {
303  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "recvfrom() failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
304  }
305  _running = false;
306  break;
307  }
308 
309  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "receive() Address: " << address << IBRCOMMON_LOGGER_ENDL;
310 
311  // Check for extended header and retrieve if available
312  if (type == HEADER_BROADCAST)
313  {
314  try {
315  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "receive() Announcement received" << IBRCOMMON_LOGGER_ENDL;
316 
317  DiscoveryBeacon beacon = agent.obtainBeacon();
318 
    // parse the received bytes into the beacon object
319  stringstream ss;
320  ss.write(&data[0], len);
321  ss >> beacon;
322 
323  // ignore own beacons
324  if (beacon.getEID() == dtn::core::BundleCore::local) continue;
325 
326  if (beacon.isShort())
327  {
328  // can not generate node name from short beacons
329  continue;
330  }
331 
332  // add discovered service entry
333  beacon.addService(dtn::net::DiscoveryService(_service->getProtocol(), address));
334 
    // pass the beacon and the sender address to the worker loop
335  BeaconReceived *bc = new BeaconReceived();
336  bc->address = address;
337  bc->data = beacon;
338  _action_queue.push(bc);
339  } catch (const ibrcommon::Exception&) {
340  // catch wrong formats
341  }
342 
343  continue;
344  }
345  else if ( type == HEADER_SEGMENT )
346  {
347  SegmentReceived *seg = new SegmentReceived(maxlen);
348  seg->address = address;
349  seg->seqno = seqno;
350  seg->flags = flags;
    // the whole receive buffer is copied; the number of valid bytes is
    // kept separately in len
351  seg->data = data;
352  seg->len = len;
353  _action_queue.push(seg);
354  }
355  else if ( type == HEADER_ACK )
356  {
357  AckReceived *ack = new AckReceived();
358  ack->address = address;
359  ack->seqno = seqno;
360  _action_queue.push(ack);
361  }
362  else if ( type == HEADER_NACK )
363  {
364  NackReceived *nack = new NackReceived();
365  nack->address = address;
366  nack->seqno = seqno;
    // the NACK_TEMPORARY bit of the flags is stored in `temporary`
367  nack->temporary = flags & DatagramService::NACK_TEMPORARY;
368  _action_queue.push(nack);
369  }
    // frames of any other type are dropped without further processing
370  }
371  }
372 
374  {
375  // start receiver
376  _receiver.init();
377  _receiver.start();
378 
379  // get the reference to the discovery agent
381 
382  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "componentRun() entered" << IBRCOMMON_LOGGER_ENDL;
383 
384  try {
385  while (_running || (_connections.size() > 0))
386  {
387  Action *action = _action_queue.getnpop(true);
388 
389  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "processing task" << IBRCOMMON_LOGGER_ENDL;
390 
391  try {
392  AckReceived &ack = dynamic_cast<AckReceived&>(*action);
393 
394  try {
395  // Connection instance for this address
396  DatagramConnection& connection = getConnection(ack.address, false);
397 
398  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) << "ack received for seqno " << ack.seqno << IBRCOMMON_LOGGER_ENDL;
399 
400  // Decide in which queue to write based on the src address
401  connection.ack(ack.seqno);
402  } catch (const ConnectionNotAvailableException &ex) {
403  // connection does not exists - ignore the ACK
404  }
405  } catch (const std::bad_cast&) { };
406 
407  try {
408  NackReceived &nack = dynamic_cast<NackReceived&>(*action);
409 
410  // the peer refused the current bundle
411  try {
412  // Connection instance for this address
413  DatagramConnection& connection = getConnection(nack.address, false);
414 
415  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) << "nack received for seqno " << nack.seqno << IBRCOMMON_LOGGER_ENDL;
416 
417  // Decide in which queue to write based on the src address
418  connection.nack(nack.seqno, nack.temporary);
419  } catch (const ConnectionNotAvailableException &ex) {
420  // connection does not exists - ignore the NACK
421  }
422  } catch (const std::bad_cast&) { };
423 
424  try {
425  SegmentReceived &segment = dynamic_cast<SegmentReceived&>(*action);
426 
427  // Connection instance for this address
428  DatagramConnection& connection = getConnection(segment.address, true);
429 
430  try {
431  // Decide in which queue to write based on the src address
432  connection.queue(segment.flags, segment.seqno, &segment.data[0], segment.len);
433  } catch (const ibrcommon::Exception &ex) {
434  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
435  connection.shutdown();
436  };
437  } catch (const std::bad_cast&) { };
438 
439  try {
440  BeaconReceived &beacon = dynamic_cast<BeaconReceived&>(*action);
441 
442  // Connection instance for this address
443  DatagramConnection& connection = getConnection(beacon.address, true);
444  connection.setPeerEID(beacon.data.getEID());
445 
446  // announce the received beacon
447  agent.onBeaconReceived(beacon.data);
448  } catch (const std::bad_cast&) { };
449 
450  try {
451  ConnectionDown &cd = dynamic_cast<ConnectionDown&>(*action);
452 
453  ibrcommon::MutexLock l(_cond_connections);
454  for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
455  {
456  if ((*i)->getIdentifier() == cd.id)
457  {
458  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Down: " << cd.id << IBRCOMMON_LOGGER_ENDL;
459  _connections.erase(i);
460 
461  // delete the connection
462  delete (*i);
463 
464  // signal the modified connection list
465  _cond_connections.signal(true);
466  break;
467  }
468  }
469  } catch (const std::bad_cast&) { };
470 
471  try {
472  NodeGone &gone = dynamic_cast<NodeGone&>(*action);
473 
474  for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
475  {
476  if ((*i)->getPeerEID() == gone.eid)
477  {
478  // shutdown the connection
479  (*i)->shutdown();
480  break;
481  }
482  }
483  } catch (const std::bad_cast&) { };
484 
485  try {
486  QueueBundle &queue = dynamic_cast<QueueBundle&>(*action);
487 
488  // get a new or the existing connection for this address
489  DatagramConnection &conn = getConnection( queue.uri, true );
490 
491  // queue the job to the connection
492  conn.queue(queue.job);
493  } catch (const std::bad_cast&) { };
494 
495  try {
496  dynamic_cast<Shutdown&>(*action);
497 
498  // shutdown all connections
499  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
500  {
501  (*i)->shutdown();
502  }
503 
504  _running = false;
505  } catch (const std::bad_cast&) { };
506 
507  // delete task object
508  delete action;
509 
510  yield();
511  }
512  } catch (const ibrcommon::QueueUnblockedException &ex) {
513  // unblocked
514  }
515 
516  _receiver.join();
517  }
518 
520  {
    // NOTE(review): the signature line (listing line 519) is missing from this
    // extract. Shuts the underlying service down.
521  _service->shutdown();
522  }
523 
524  const std::string DatagramConvergenceLayer::getName() const
525  {
526  return DatagramConvergenceLayer::TAG;
527  }
528 
529  DatagramConvergenceLayer::Receiver::Receiver(DatagramConvergenceLayer &cl)
530  : _cl(cl)
531  {
532  }
533 
534  DatagramConvergenceLayer::Receiver::~Receiver()
535  {
536  }
537 
539  {
    // NOTE(review): the signature line (listing line 538) is missing from this
    // extract; the worker loop calls _receiver.init() right before
    // _receiver.start(), so this is presumably Receiver::init() - confirm.
540  // reset the receiver if necessary
541  if (JoinableThread::isFinalized()) JoinableThread::reset();
542  }
543 
544  void DatagramConvergenceLayer::Receiver::run() throw ()
545  {
546  _cl.receive();
547  }
548 
549  void DatagramConvergenceLayer::Receiver::__cancellation() throw ()
550  {
551  }
552  } /* namespace net */
553 } /* namespace dtn */