IBR-DTNSuite  0.12
ProphetRoutingExtension.cpp
Go to the documentation of this file.
1 /*
2  * ProphetRoutingExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
24 
25 #include "core/BundleCore.h"
26 #include "core/EventDispatcher.h"
27 
28 #include <algorithm>
29 #include <memory>
30 
35 #include "net/ConnectionEvent.h"
36 #include "core/TimeEvent.h"
37 #include "core/NodeEvent.h"
38 #include "core/BundlePurgeEvent.h"
39 #include "core/BundleEvent.h"
40 
41 #include <ibrcommon/Logger.h>
43 
44 #include <ibrdtn/data/SDNV.h>
45 #include <ibrdtn/data/Exceptions.h>
46 #include <ibrdtn/utils/Clock.h>
47 
48 namespace dtn
49 {
50  namespace routing
51  {
52  const std::string ProphetRoutingExtension::TAG = "ProphetRoutingExtension";
53 
54  ProphetRoutingExtension::ProphetRoutingExtension(ForwardingStrategy *strategy, float p_encounter_max, float p_encounter_first, float p_first_threshold,
55  float beta, float gamma, float delta, ibrcommon::Timer::time_t time_unit, ibrcommon::Timer::time_t i_typ,
56  dtn::data::Timestamp next_exchange_timeout)
57  : _deliveryPredictabilityMap(time_unit, beta, gamma),
58  _forwardingStrategy(strategy), _next_exchange_timeout(next_exchange_timeout), _next_exchange_timestamp(0),
59  _p_encounter_max(p_encounter_max), _p_encounter_first(p_encounter_first),
60  _p_first_threshold(p_first_threshold), _delta(delta), _i_typ(i_typ)
61  {
62  // assign myself to the forwarding strategy
63  strategy->setProphetRouter(this);
64 
65  // set value for local EID to 1.0
66  _deliveryPredictabilityMap.set(core::BundleCore::local, 1.0);
67 
68  // define the first exchange timestamp
69  _next_exchange_timestamp = dtn::utils::Clock::getMonotonicTimestamp() + _next_exchange_timeout;
70 
71  // write something to the syslog
72  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, info) << "Initializing PRoPHET routing module" << IBRCOMMON_LOGGER_ENDL;
73  }
74 
// Calls stop() and join(), then deletes the object that _forwardingStrategy
// points to.
// NOTE(review): the declarator line for this body is absent from this listing
// (the embedded numbering skips 75); presumably this is the destructor --
// confirm against the original file.
76  {
77  stop();
78  join();
79  delete _forwardingStrategy;
80  }
81 
// NOTE(review): only the braces and one comment of this definition survive in
// this listing; the declarator and the statements on the lines numbered 82,
// 84, 85 and 88 are absent. Judging from the remaining comment the body
// presumably asks the peer for its summary vector -- confirm against the
// original file.
83  {
86 
87  // request summary vector to exclude bundles known by the peer
89  }
90 
// Adds this node's handshake data to `response`: a copy of the delivery
// predictability map (aged first) and a copy of the acknowledgement set. Each
// copy is made while a MutexLock on the respective container is held.
// NOTE(review): the declarator (line 91) and the lines numbered 93 and 99,
// which directly precede the two inner blocks, are absent from this listing;
// they may make each block conditional -- confirm against the original file.
92  {
94  {
95  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
96  age();
97  response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap));
98  }
100  {
101  ibrcommon::MutexLock l(_acknowledgementSet);
102  response.addItem(new AcknowledgementSet(_acknowledgementSet));
103  }
104  }
105 
// Processes the handshake data (`response`) received from `neighbor`.
//
//  1. Returns immediately when `neighbor` is on the same host as the local
//     node.
//  2. Reads the DeliveryPredictabilityMap from the response, tries to store a
//     copy as a dataset of the neighbor's database entry (a
//     NeighborNotAvailableException is ignored) and calls updateNeighbor()
//     with the node part of the neighbor EID.
//  3. Reads the AcknowledgementSet from the response, passes every entry to
//     setKnown(), merges the set into _acknowledgementSet under its lock and
//     queries the storage with a local BundleFilter that selects bundles
//     contained in the received set.
// Any std::exception raised in step 2 or in step 3 is swallowed, so a failure
// in one step does not prevent the other from running.
//
// NOTE(review): several lines are absent from this listing (embedded numbers
// 106, 165, 178, 185, 187 and 196): the declarator, the condition guarding
// the first `return false` in shouldAdd(), the declaration of `removeList`,
// the condition and first statement of the if-branch in the removal loop and
// the statement following the "generate a report" comment. Confirm against
// the original file before changing this function.
107  {
108  /* ignore neighbors, that have our EID */
109  if (neighbor.sameHost(dtn::core::BundleCore::local)) return;
110 
111  try {
112  const DeliveryPredictabilityMap& neighbor_dp_map = response.get<DeliveryPredictabilityMap>();
113 
114  // strip possible application part off the neighbor EID
115  const dtn::data::EID neighbor_node = neighbor.getNode();
116 
117  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "delivery predictability map received from " << neighbor_node.getString() << IBRCOMMON_LOGGER_ENDL;
118 
119  // store a copy of the map in the neighbor database
120  try {
121  NeighborDatabase &db = (**this).getNeighborDB();
122  NeighborDataset ds(new DeliveryPredictabilityMap(neighbor_dp_map));
123 
124  ibrcommon::MutexLock l(db);
125  db.get(neighbor_node).putDataset(ds);
126  } catch (const NeighborNotAvailableException&) { };
127 
128  /* update predictability for this neighbor */
129  updateNeighbor(neighbor_node, neighbor_dp_map);
130  } catch (std::exception&) { }
131 
132  try {
133  const AcknowledgementSet& neighbor_ack_set = response.get<AcknowledgementSet>();
134 
135  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "ack'set received from " << neighbor.getString() << IBRCOMMON_LOGGER_ENDL;
136 
137  // merge ack'set into the known bundles
138  for (AcknowledgementSet::const_iterator it = neighbor_ack_set.begin(); it != neighbor_ack_set.end(); ++it) {
139  (**this).setKnown(*it);
140  }
141 
142  // merge the received ack set with the local one
143  {
144  ibrcommon::MutexLock l(_acknowledgementSet);
145  _acknowledgementSet.merge(neighbor_ack_set);
146  }
147 
148  /* remove acknowledged bundles from bundle store if we do not have custody */
149  dtn::storage::BundleStorage &storage = (**this).getStorage();
150 
151  class BundleFilter : public dtn::storage::BundleSelector
152  {
153  public:
154  BundleFilter(const AcknowledgementSet& neighbor_ack_set)
155  : _ackset(neighbor_ack_set)
156  {}
157 
158  virtual ~BundleFilter() {}
159 
160  virtual dtn::data::Size limit() const throw () { return 0; }
161 
162  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
163  {
164  // do not delete any bundles with
166  return false;
167 
168  if(!_ackset.has(meta))
169  return false;
170 
171  return true;
172  }
173 
174  private:
175  const AcknowledgementSet& _ackset;
176  } filter(neighbor_ack_set);
177 
179  storage.get(filter, removeList);
180 
181  for (std::list<dtn::data::MetaBundle>::const_iterator it = removeList.begin(); it != removeList.end(); ++it)
182  {
183  const dtn::data::MetaBundle &meta = (*it);
184 
186  {
188  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "Bundle removed due to prophet ack: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
189  }
190  else
191  {
192  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, warning) << neighbor.getString() << " requested to purge a bundle with a non-singleton destination: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
193  }
194 
195  /* generate a report */
197  }
198  } catch (const dtn::storage::NoBundleFoundException&) {
199  } catch (std::exception&) { }
200  }
201 
// Pushes a new SearchNextBundleTask for `peer` onto _taskqueue; run() takes
// tasks from that queue and handles them.
// NOTE(review): the declarator line (202) is absent from this listing; the
// call eventDataChanged(n.getEID()) further down suggests this is
// eventDataChanged -- confirm against the original file.
203  {
204  // transfer the next bundle to this destination
205  _taskqueue.push( new SearchNextBundleTask( peer ) );
206  }
207 
// Reports `meta` to the forwarding strategy via addForward() when that
// strategy is a GTMX_Strategy. For any other strategy type the reference
// dynamic_cast throws std::bad_cast, which is caught and ignored.
// NOTE(review): the declarator line (208) is absent from this listing.
209  {
210  // add forwarded entry to GTMX strategy
211  try {
212  GTMX_Strategy &gtmx = dynamic_cast<GTMX_Strategy&>(*_forwardingStrategy);
213  gtmx.addForward(meta);
214  } catch (const std::bad_cast &ex) { };
215  }
216 
// Fetches the current neighbor set from the connection manager and calls
// eventDataChanged() for every neighbor whose EID differs from `peer`.
// NOTE(review): the declarator line (217) is absent from this listing.
218  {
219  // new bundles trigger a recheck for all neighbors
220  const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
221 
222  for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
223  {
224  const dtn::core::Node &n = (*iter);
225 
226  if (n.getEID() != peer)
227  {
228  // trigger all routing modules to search for bundles to forward
229  eventDataChanged(n.getEID());
230  }
231  }
232  }
233 
// Event handler: tries to interpret *evt as one of three event types, in this
// order, using reference dynamic_casts (a std::bad_cast moves on to the next
// candidate).
//  - dtn::core::TimeEvent: expires entries of _acknowledgementSet for the
//    event's timestamp, then, holding _next_exchange_mutex, queues a
//    NextExchangeTask and re-arms _next_exchange_timestamp when the stored
//    timestamp is non-zero and lies before `now`.
//  - NodeHandshakeEvent: queues a SearchNextBundleTask for handshake.peer.
//  - dtn::core::BundlePurgeEvent: adds purge.bundle to _acknowledgementSet
//    under its lock.
// Each matching branch returns; other event types fall through untouched.
//
// NOTE(review): the lines numbered 234 (declarator), 246 (presumably the
// declaration of `now`), 261 (the condition opening the first handshake
// branch) and 277 (the condition guarding the purge block) are absent from
// this listing -- confirm against the original file.
235  {
236  try {
237  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
238 
239  // expire bundles in the acknowledgement set
240  {
241  ibrcommon::MutexLock l(_acknowledgementSet);
242  _acknowledgementSet.expire(time.getTimestamp());
243  }
244 
245  ibrcommon::MutexLock l(_next_exchange_mutex);
247 
248  if ((_next_exchange_timestamp > 0) && (_next_exchange_timestamp < now))
249  {
250  _taskqueue.push( new NextExchangeTask() );
251 
252  // define the next exchange timestamp
253  _next_exchange_timestamp = now + _next_exchange_timeout;
254  }
255  return;
256  } catch (const std::bad_cast&) { };
257 
258  try {
259  const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt);
260 
262  {
263  // transfer the next bundle to this destination
264  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
265  }
266  else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
267  {
268  // transfer the next bundle to this destination
269  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
270  }
271  return;
272  } catch (const std::bad_cast&) { };
273 
274  try {
275  const dtn::core::BundlePurgeEvent &purge = dynamic_cast<const dtn::core::BundlePurgeEvent&>(*evt);
276 
278  {
279  /* the bundle was finally delivered, mark it as acknowledged */
280  ibrcommon::MutexLock l(_acknowledgementSet);
281  _acknowledgementSet.add(purge.bundle);
282  }
283 
284  // since no routing module is interested in purge events yet - we exit here
285  return;
286  } catch (const std::bad_cast&) { }
287  }
288 
// Resets the task queue and starts the thread via start(). An
// ibrcommon::ThreadException thrown by start() is logged at error level and
// not propagated.
// NOTE(review): the declarator (line 289) and the lines numbered 291-293 at
// the top of the body are absent from this listing; the name componentUp is
// taken from the log message below.
290  {
294 
295  // reset task queue
296  _taskqueue.reset();
297 
298  // routine checked for throw() on 15.02.2013
299  try {
300  // run the thread
301  start();
302  } catch (const ibrcommon::ThreadException &ex) {
303  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) << "componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
304  }
305  }
306 
// Calls stop() followed by join(). An ibrcommon::ThreadException thrown by
// either call is logged at error level and not propagated.
// NOTE(review): the declarator (line 307) and the lines numbered 309-311 at
// the top of the body are absent from this listing; the name componentDown is
// taken from the log message below.
308  {
312 
313  try {
314  // stop the thread
315  stop();
316  join();
317  } catch (const ibrcommon::ThreadException &ex) {
318  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) << "componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
319  }
320  }
321 
// Ages the delivery predictability map while holding a MutexLock on it,
// releases that lock at the end of the inner scope, and then returns a
// ThreadsafeReference<DeliveryPredictabilityMap> constructed with the map as
// both arguments.
// NOTE(review): the declarator line (322) is absent from this listing.
323  {
324  {
325  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
326  age();
327  }
328  return ibrcommon::ThreadsafeReference<DeliveryPredictabilityMap>(_deliveryPredictabilityMap, _deliveryPredictabilityMap);
329  }
330 
// Returns a ThreadsafeReference to the delivery predictability map as a const
// object. The second constructor argument needs a non-const reference, hence
// the const_cast; unlike the variant that locks the map, no aging is done
// here.
// NOTE(review): the declarator line (331) is absent from this listing;
// presumably this is the const overload of the accessor -- confirm.
332  {
333  return ibrcommon::ThreadsafeReference<const DeliveryPredictabilityMap>(_deliveryPredictabilityMap, const_cast<DeliveryPredictabilityMap&>(_deliveryPredictabilityMap));
334  }
335 
// Returns a ThreadsafeReference to the acknowledgement set as a const object.
// The second constructor argument needs a non-const reference, hence the
// const_cast.
// NOTE(review): the declarator line (336) is absent from this listing.
337  {
338  return ibrcommon::ThreadsafeReference<const AcknowledgementSet>(_acknowledgementSet, const_cast<AcknowledgementSet&>(_acknowledgementSet));
339  }
340 
// Worker loop of the routing module.
//
// Repeatedly takes a task from _taskqueue and dispatches on its dynamic type
// (reference dynamic_casts; a std::bad_cast means "not this task type"):
//  - SearchNextBundleTask: with the neighbor database locked, fetches the
//    entry for task.eid, requires isTransferThresholdReached(), reads the
//    neighbor's DeliveryPredictabilityMap dataset and queries bundles through
//    the local BundleFilter. Each bundle found is passed to transferTo(). A
//    missing dataset or a BundleSelectorException triggers
//    doHandshake(task.eid) instead.
//  - NextExchangeTask: calls doHandshake() for every current neighbor and
//    ignores ibrcommon::Exception from individual handshakes.
// An ibrcommon::Exception escaping a task is logged and the loop continues.
// A std::exception that reaches the outer try block (where getnpop() is
// called) is logged and ends the function.
//
// The local BundleFilter rejects bundles whose hop count is 0, bundles for the
// local node, singleton bundles with hop count <= 1, bundles addressed to the
// neighbor itself, and bundles the neighbor entry already has; its limit() is
// the neighbor's number of free transfer slots.
//
// NOTE(review): lines are absent from this listing inside this function
// (embedded numbers 364, 384, 399, 411-412, 416, 432, 446-450, 468, 470 and
// 510-514). Among them are the condition of the singleton branch, the catch
// clauses of two try blocks in shouldAdd(), the condition in front of the
// final shallForward() call, the declaration of `list` and the condition in
// front of the neighbor-list refresh. Confirm against the original file
// before changing anything here.
341  void ProphetRoutingExtension::ProphetRoutingExtension::run() throw ()
342  {
343  class BundleFilter : public dtn::storage::BundleSelector
344  {
345  public:
346  BundleFilter(const NeighborDatabase::NeighborEntry &entry, ForwardingStrategy &strategy, const DeliveryPredictabilityMap &dpm, const std::set<dtn::core::Node> &neighbors)
347  : _entry(entry), _strategy(strategy), _dpm(dpm), _neighbors(neighbors)
348  { };
349 
350  virtual ~BundleFilter() {};
351 
352  virtual dtn::data::Size limit() const throw () { return _entry.getFreeTransferSlots(); };
353 
354  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
355  {
356  // check Scope Control Block - do not forward bundles with hop limit == 0
357  if (meta.hopcount == 0)
358  {
359  return false;
360  }
361 
362  // do not forward local bundles
363  if ((meta.destination.getNode() == dtn::core::BundleCore::local)
365  )
366  {
367  return false;
368  }
369 
370  // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
371  if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
372  {
373  return false;
374  }
375 
376  // do not forward bundles addressed to this neighbor,
377  // because this is handled by neighbor routing extension
378  if (_entry.eid == meta.destination.getNode())
379  {
380  return false;
381  }
382 
383  // if this is a singleton bundle ...
385  {
386  const dtn::core::Node n(meta.destination.getNode());
387 
388  // do not forward the bundle if the final destination is available
389  if (_neighbors.find(n) != _neighbors.end())
390  {
391  return false;
392  }
393  }
394  else
395  {
396  // if this is a non-singleton, check if the peer knows a way to the source
397  try {
398  if (_dpm.get(meta.source.getNode()) <= 0.0) return false;
400  return false;
401  }
402  }
403 
404  // do not forward bundles already known by the destination
405  // throws BloomfilterNotAvailableException if no filter is available or it is expired
406  try {
407  if (_entry.has(meta, true))
408  {
409  return false;
410  }
413  }
414 
415  // ask the routing strategy if this bundle should be selected
417  {
418  return _strategy.shallForward(_dpm, meta);
419  }
420 
421  return true;
422  }
423 
424  private:
425  const NeighborDatabase::NeighborEntry &_entry;
426  const ForwardingStrategy &_strategy;
427  const DeliveryPredictabilityMap &_dpm;
428  const std::set<dtn::core::Node> &_neighbors;
429  };
430 
431  // list for bundles
433 
434  // set of known neighbors
435  std::set<dtn::core::Node> neighbors;
436 
437  while (true)
438  {
439  try {
440  Task *t = _taskqueue.getnpop(true);
441  std::auto_ptr<Task> killer(t);
442 
443  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 50) << "processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
444 
445  try {
451  try {
452  SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
453 
454  // lock the neighbor database while searching for bundles
455  try {
456  NeighborDatabase &db = (**this).getNeighborDB();
457 
458  ibrcommon::MutexLock l(db);
459  NeighborDatabase::NeighborEntry &entry = db.get(task.eid, true);
460 
461  // check if enough transfer slots available (threshold reached)
462  if (!entry.isTransferThresholdReached())
463  throw NeighborDatabase::NoMoreTransfersAvailable();
464 
465  // get the DeliveryPredictabilityMap of the potentially next hop
466  const DeliveryPredictabilityMap &dpm = entry.getDataset<DeliveryPredictabilityMap>();
467 
469  // get current neighbor list
471  } else {
472  // "prefer direct" option disabled - clear the list of neighbors
473  neighbors.clear();
474  }
475 
476  // get the bundle filter of the neighbor
477  const BundleFilter filter(entry, *_forwardingStrategy, dpm, neighbors);
478 
479  // some debug output
480  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
481 
482  // query some unknown bundle from the storage, the list contains max. 10 items.
483  list.clear();
484  (**this).getSeeker().get(filter, list);
485  } catch (const NeighborDatabase::DatasetNotAvailableException&) {
486  // if there is no DeliveryPredictabilityMap for the next hop
487  // perform a routing handshake with the peer
488  (**this).doHandshake(task.eid);
489  } catch (const dtn::storage::BundleSelectorException&) {
490  // query a new summary vector from this neighbor
491  (**this).doHandshake(task.eid);
492  }
493 
494  // send the bundles as long as we have resources
495  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
496  {
497  const dtn::data::MetaBundle &meta = (*iter);
498 
499  try {
500  transferTo(task.eid, meta);
501  } catch (const NeighborDatabase::AlreadyInTransitException&) { };
502  }
503  } catch (const NeighborDatabase::NoMoreTransfersAvailable &ex) {
504  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
505  } catch (const NeighborDatabase::NeighborNotAvailableException &ex) {
506  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
507  } catch (const dtn::storage::NoBundleFoundException &ex) {
508  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
509  } catch (const std::bad_cast&) { }
510 
515  try {
516  dynamic_cast<NextExchangeTask&>(*t);
517 
518  std::set<dtn::core::Node> neighbors = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
519  std::set<dtn::core::Node>::const_iterator it;
520  for(it = neighbors.begin(); it != neighbors.end(); ++it)
521  {
522  try{
523  (**this).doHandshake(it->getEID());
524  } catch (const ibrcommon::Exception &ex) { }
525  }
526  } catch (const std::bad_cast&) { }
527 
528  } catch (const ibrcommon::Exception &ex) {
529  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 20) << "task failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
530  }
531  } catch (const std::exception &ex) {
532  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 15) << "terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
533  return;
534  }
535 
536  yield();
537  }
538  }
539 
// Calls abort() on the task queue.
// NOTE(review): the declarator line (540) is absent from this listing.
// Presumably abort() makes the blocking getnpop() in run() throw so that the
// worker loop terminates -- confirm against the queue implementation.
541  {
542  _taskqueue.abort();
543  }
544 
// Computes the encounter weight used by updateNeighbor() for `neighbor`.
//
// Returns _p_encounter_max when _ageMap has no entry for the neighbor or when
// the time since the stored timestamp exceeds _i_typ; otherwise returns
// _p_encounter_max scaled by time_diff / _i_typ.
//
// NOTE(review): the line numbered 554, which must declare `currentTime`, is
// absent from this listing. Also note that the development assertion is
// evaluated only after time_diff has already been computed from the two
// timestamps.
545  float ProphetRoutingExtension::p_encounter(const dtn::data::EID &neighbor) const
546  {
547  age_map::const_iterator it = _ageMap.find(neighbor);
548  if(it == _ageMap.end())
549  {
550  /* In this case, we got a transitive update for the node we have not encountered before */
551  return _p_encounter_max;
552  }
553 
555  const dtn::data::Timestamp time_diff = currentTime - it->second;
556 #ifdef __DEVELOPMENT_ASSERTIONS__
557  assert(currentTime >= it->second && "the ageMap timestamp should be smaller than the current timestamp");
558 #endif
559  if(time_diff > _i_typ)
560  {
561  return _p_encounter_max;
562  }
563  else
564  {
565  return _p_encounter_max * time_diff.get<float>() / static_cast<float>(_i_typ);
566  }
567  }
568 
569  void ProphetRoutingExtension::updateNeighbor(const dtn::data::EID &neighbor, const DeliveryPredictabilityMap& neighbor_dp_map)
570  {
571  // update the encounter on every routing handshake
572  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
573 
574  // age the local predictability map
575  age();
576 
580  try {
581  float neighbor_dp = _deliveryPredictabilityMap.get(neighbor);
582 
583  if (neighbor_dp < _p_first_threshold)
584  {
585  neighbor_dp = _p_encounter_first;
586  }
587  else
588  {
589  neighbor_dp += (1 - _delta - neighbor_dp) * p_encounter(neighbor);
590  }
591 
592  _deliveryPredictabilityMap.set(neighbor, neighbor_dp);
593  } catch (const DeliveryPredictabilityMap::ValueNotFoundException&) {
594  _deliveryPredictabilityMap.set(neighbor, _p_encounter_first);
595  }
596 
597  _ageMap[neighbor] = dtn::utils::Clock::getMonotonicTimestamp();
598 
599  /* update the dp_map */
600  _deliveryPredictabilityMap.update(neighbor, neighbor_dp_map, _p_encounter_first);
601  }
602 
603  void ProphetRoutingExtension::age()
604  {
605  _deliveryPredictabilityMap.age(_p_first_threshold);
606  }
607 
608  ProphetRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &eid)
609  : eid(eid)
610  {
611  }
612 
613  ProphetRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
614  {
615  }
616 
617  std::string ProphetRoutingExtension::SearchNextBundleTask::toString() const
618  {
619  return "SearchNextBundleTask: " + eid.getString();
620  }
621 
622  ProphetRoutingExtension::NextExchangeTask::NextExchangeTask()
623  {
624  }
625 
626  ProphetRoutingExtension::NextExchangeTask::~NextExchangeTask()
627  {
628  }
629 
630  std::string ProphetRoutingExtension::NextExchangeTask::toString() const
631  {
632  return "NextExchangeTask";
633  }
634 
// Empty body.
// NOTE(review): the declarator line (635) is absent from this listing;
// presumably a forwarding-strategy constructor -- confirm.
636  {
637  }
638 
// Empty body.
// NOTE(review): the declarator line (639) is absent from this listing;
// presumably the matching destructor -- confirm.
640  {
641  }
642 
// Forwarding decision that relies solely on neighborDPIsGreater() for the
// neighbor's predictability map and the bundle's destination.
// NOTE(review): the declarator line (643) is absent from this listing; the
// call _strategy.shallForward(_dpm, meta) in run() suggests this is a
// shallForward() implementation -- confirm.
644  {
645  return neighborDPIsGreater(neighbor_dpm, bundle.destination);
646  }
647 
// Constructor tail: stores NF_max in _NF_max; the body is empty.
// NOTE(review): the declarator line (648) is absent from this listing;
// presumably the GTMX_Strategy constructor -- confirm.
649  : _NF_max(NF_max)
650  {
651  }
652 
// Empty body.
// NOTE(review): the declarator line (653) is absent from this listing;
// presumably the matching destructor -- confirm.
654  {
655  }
656 
// Increments the counter stored in _NF_map for `id`, inserting an entry with
// the value 0 first when none exists yet.
// NOTE(review): the declarator line (657) is absent from this listing; the
// call gtmx.addForward(meta) earlier in this file suggests this is
// GTMX_Strategy::addForward -- confirm.
658  {
659  nf_map::iterator nf_it = _NF_map.find(id);
660 
661  if (nf_it == _NF_map.end()) {
662  nf_it = _NF_map.insert(std::make_pair(id, 0)).first;
663  }
664 
665  ++nf_it->second;
666  }
667 
// Forwarding decision with a per-bundle limit: looks up the counter stored
// for `bundle` in _NF_map (0 when there is no entry) and refuses once the
// counter exceeds _NF_max; otherwise the result of neighborDPIsGreater() for
// the bundle's destination decides.
// NOTE(review): the declarator line (668) is absent from this listing.
669  {
670  unsigned int NF = 0;
671 
672  nf_map::const_iterator nf_it = _NF_map.find(bundle);
673  if(nf_it != _NF_map.end()) {
674  NF = nf_it->second;
675  }
676 
677  if (NF > _NF_max) return false;
678 
679  return neighborDPIsGreater(neighbor_dpm, bundle.destination);
680  }
681 
682  } // namespace routing
683 } // namespace dtn