IBR-DTNSuite  0.10
ProphetRoutingExtension.cpp
Go to the documentation of this file.
1 /*
2  * ProphetRoutingExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
24 
25 #include "core/BundleCore.h"
26 
27 #include <algorithm>
28 #include <memory>
29 
34 #include "net/ConnectionEvent.h"
35 #include "core/TimeEvent.h"
36 #include "core/NodeEvent.h"
37 #include "core/BundlePurgeEvent.h"
38 #include "core/BundleEvent.h"
39 
40 #include <ibrcommon/Logger.h>
42 
43 #include <ibrdtn/data/SDNV.h>
44 #include <ibrdtn/data/Exceptions.h>
45 #include <ibrdtn/utils/Clock.h>
46 
47 namespace dtn
48 {
49  namespace routing
50  {
// Tag passed to the IBRCOMMON_LOGGER_* macros for every log message emitted by this module.
51  const std::string ProphetRoutingExtension::TAG = "ProphetRoutingExtension";
52 
53  ProphetRoutingExtension::ProphetRoutingExtension(ForwardingStrategy *strategy, float p_encounter_max, float p_encounter_first, float p_first_threshold,
54  float beta, float gamma, float delta, ibrcommon::Timer::time_t time_unit, ibrcommon::Timer::time_t i_typ,
55  dtn::data::Timestamp next_exchange_timeout)
56  : _deliveryPredictabilityMap(time_unit, beta, gamma),
57  _forwardingStrategy(strategy), _next_exchange_timeout(next_exchange_timeout), _next_exchange_timestamp(0),
58  _p_encounter_max(p_encounter_max), _p_encounter_first(p_encounter_first),
59  _p_first_threshold(p_first_threshold), _delta(delta), _i_typ(i_typ)
60  {
61  // assign myself to the forwarding strategy
62  strategy->setProphetRouter(this);
63 
64  // set value for local EID to 1.0
65  _deliveryPredictabilityMap.set(core::BundleCore::local, 1.0);
66 
67  // define the first exchange timestamp
68  _next_exchange_timestamp = dtn::utils::Clock::getUnixTimestamp() + _next_exchange_timeout;
69 
70  // write something to the syslog
71  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, info) << "Initializing PRoPHET routing module" << IBRCOMMON_LOGGER_ENDL;
72  }
73 
// Destructor body. NOTE(review): the signature line is absent from this
// listing - presumably ProphetRoutingExtension::~ProphetRoutingExtension(); confirm.
// Stops and joins the worker thread first, then deletes the forwarding
// strategy that was handed to the constructor (this object owns it).
75  {
76  stop();
77  join();
78  delete _forwardingStrategy;
79  }
80 
// NOTE(review): the signature and all statements of this function are absent
// from this listing; only the braces and one comment survive. Presumably this
// is the handshake-request hook that adds the requested items to the outgoing
// request - confirm against the original file before relying on this block.
82  {
85 
86  // request summary vector to exclude bundles known by the peer
88  }
89 
// Fills a handshake response with copies of the local delivery predictability
// map and the local acknowledgement set.
// NOTE(review): the signature line and the lines directly before each inner
// scope are absent from this listing (presumably conditions that check what
// the peer requested) - confirm against the original file.
91  {
// not referenced by any statement visible in this block
92  const dtn::data::EID neighbor_node = neighbor.getNode();
94  {
// age the map under its own lock so the peer receives current values
95  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
96  age();
// the response receives a heap-allocated copy, not a reference to the member
97  response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap));
98  }
100  {
// copy the acknowledgement set while holding its lock
101  ibrcommon::MutexLock l(_acknowledgementSet);
102  response.addItem(new AcknowledgementSet(_acknowledgementSet));
103  }
104  }
105 
// Processes the handshake response received from `neighbor`:
//  1. if the response carries a DeliveryPredictabilityMap: age the local map,
//     update the predictability of the neighbor, store a copy of the
//     neighbor's map in the neighbor database and update the local map with it
//  2. if the response carries an AcknowledgementSet: merge it into the local
//     set and walk the storage for acknowledged bundles
// Every exception raised in either part is swallowed by the surrounding
// catch (std::exception&) handlers.
// NOTE(review): the signature line and several statements (the declaration of
// removeList, the condition in shouldAdd(), the condition of the if/else in
// the removal loop and the statements that purge/report) are absent from this
// listing - confirm against the original file.
107  {
108  const dtn::data::EID neighbor_node = neighbor.getNode();
109 
110  /* ignore neighbors, that have our EID */
111  if (neighbor_node == dtn::core::BundleCore::local) return;
112 
113  try {
114  const DeliveryPredictabilityMap& neighbor_dp_map = response.get<DeliveryPredictabilityMap>();
115 
116  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "delivery predictability map received from " << neighbor.getString() << IBRCOMMON_LOGGER_ENDL;
117 
118  // update the encounter on every routing handshake
119  {
120  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
121 
122  age();
123 
124  /* update predictability for this neighbor */
125  updateNeighbor(neighbor_node);
126  }
127 
128  // store a copy of the map in the neighbor database
129  {
130  NeighborDatabase &db = (**this).getNeighborDB();
131  NeighborDataset ds(new DeliveryPredictabilityMap(neighbor_dp_map));
132 
133  ibrcommon::MutexLock l(db);
134  db.create(neighbor_node).putDataset(ds);
135  }
136 
// the map lock is taken a second time here; it was released above while the
// neighbor database lock was held
137  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
138 
139  /* update the dp_map */
140  _deliveryPredictabilityMap.update(neighbor_node, neighbor_dp_map, _p_encounter_first);
141 
142  } catch (std::exception&) { }
143 
144  try {
145  const AcknowledgementSet& neighbor_ack_set = response.get<AcknowledgementSet>();
146 
147  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "ack'set received from " << neighbor.getString() << IBRCOMMON_LOGGER_ENDL;
148 
// NOTE(review): this loop walks the LOCAL _acknowledgementSet (not
// neighbor_ack_set), runs before the merge below and does not hold the
// _acknowledgementSet lock that every other access in this file takes -
// looks unintended; confirm.
149  // merge ack'set into the known bundles
150  for (AcknowledgementSet::const_iterator it = _acknowledgementSet.begin(); it != _acknowledgementSet.end(); ++it) {
151  (**this).setKnown(*it);
152  }
153 
154  // merge the received ack set with the local one
155  {
156  ibrcommon::MutexLock l(_acknowledgementSet);
157  _acknowledgementSet.merge(neighbor_ack_set);
158  }
159 
160  /* remove acknowledged bundles from bundle store if we do not have custody */
161  dtn::storage::BundleStorage &storage = (**this).getStorage();
162 
// local selector that picks the stored bundles contained in the neighbor's
// acknowledgement set
163  class BundleFilter : public dtn::storage::BundleSelector
164  {
165  public:
166  BundleFilter(const AcknowledgementSet& neighbor_ack_set)
167  : _ackset(neighbor_ack_set)
168  {}
169 
170  virtual ~BundleFilter() {}
171 
// NOTE(review): presumably a limit of 0 means "no limit" - confirm against
// the BundleSelector documentation
172  virtual dtn::data::Size limit() const throw () { return 0; }
173 
174  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
175  {
// NOTE(review): the condition guarding the following return is absent from
// this listing; as shown here the rest of the function would be unreachable
176  // do not delete any bundles with
178  return false;
179 
180  if(!_ackset.has(meta))
181  return false;
182 
183  return true;
184  }
185 
186  private:
// reference only: the filter must not outlive neighbor_ack_set
187  const AcknowledgementSet& _ackset;
188  } filter(neighbor_ack_set);
189 
// NOTE(review): the declaration of removeList is absent from this listing
191  storage.get(filter, removeList);
192 
193  for (std::list<dtn::data::MetaBundle>::const_iterator it = removeList.begin(); it != removeList.end(); ++it)
194  {
195  const dtn::data::MetaBundle &meta = (*it);
196 
// NOTE(review): the if-condition and the statement that performs the purge
// are absent from this listing; judging by the warning in the else branch
// the condition tests for a singleton destination - confirm
198  {
200  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "Bundle removed due to prophet ack: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
201  }
202  else
203  {
204  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, warning) << neighbor.getString() << " requested to purge a bundle with a non-singleton destination: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
205  }
206 
// NOTE(review): the statement that generates the report is absent from this listing
207  /* generate a report */
209  }
210  } catch (const dtn::storage::NoBundleFoundException&) {
211  } catch (std::exception&) { }
212  }
213 
// Event handler. The event `evt` is tested against each supported event type
// with a dynamic_cast to a reference: a std::bad_cast means "not this type"
// and falls through to the next candidate, a successful cast handles the
// event and returns. Most branches only push a task onto _taskqueue, which is
// consumed by run().
// NOTE(review): the signature line and three condition lines (marked below)
// are absent from this listing - confirm against the original file.
215  {
216  try {
217  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
218 
219  // expire bundles in the acknowledgement set
220  {
221  ibrcommon::MutexLock l(_acknowledgementSet);
222  _acknowledgementSet.expire(time.getUnixTimestamp());
223  }
224 
// a timestamp of 0 never triggers an exchange; otherwise queue one as soon
// as the scheduled time has passed and schedule the following one
225  ibrcommon::MutexLock l(_next_exchange_mutex);
226  if ((_next_exchange_timestamp > 0) && (_next_exchange_timestamp < time.getUnixTimestamp()))
227  {
228  _taskqueue.push( new NextExchangeTask() );
229 
230  // define the next exchange timestamp
231  _next_exchange_timestamp = time.getUnixTimestamp() + _next_exchange_timeout;
232  }
233  return;
234  } catch (const std::bad_cast&) { };
235 
236  // If an incoming bundle is received, forward it to all connected neighbors
237  try {
238  const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
239 
240  // new bundles trigger a recheck for all neighbors
241  const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
242 
243  for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
244  {
245  const dtn::core::Node &n = (*iter);
246 
// skip the neighbor the bundle was received from
247  if (n.getEID() != queued.origin) {
248  // transfer the next bundle to this destination
249  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
250  }
251  }
252  return;
253  } catch (const std::bad_cast&) { };
254 
255  // If a new neighbor comes available search for bundles
256  try {
257  const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
258  const dtn::core::Node &n = nodeevent.getNode();
259 
// both actions queue the same task
260  if (nodeevent.getAction() == NODE_AVAILABLE)
261  {
262  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
263  }
264  else if (nodeevent.getAction() == NODE_DATA_ADDED)
265  {
266  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
267  }
268 
269  return;
270  } catch (const std::bad_cast&) { };
271 
272  try {
273  const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
274 
// NOTE(review): the condition guarding this scope is absent from this listing
276  {
277  // send all (multi-hop) bundles in the storage to the neighbor
278  _taskqueue.push( new SearchNextBundleTask(ce.peer) );
279  }
280  return;
281  } catch (const std::bad_cast&) { };
282 
283  try {
284  const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt);
285 
// NOTE(review): the first if-condition of this chain is absent from this listing
287  {
288  // transfer the next bundle to this destination
289  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
290  }
291  else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
292  {
293  // transfer the next bundle to this destination
294  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
295  }
296  return;
297  } catch (const std::bad_cast&) { };
298 
299  // The bundle transfer has been aborted
300  try {
301  const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
302 
303  // transfer the next bundle to this destination
304  _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
305 
306  return;
307  } catch (const std::bad_cast&) { };
308 
309  // A bundle transfer was successful
310  try {
311  const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
312  const dtn::data::MetaBundle &meta = completed.getBundle();
313  const dtn::data::EID &peer = completed.getPeer();
314 
// NOTE(review): the second half of this condition (and its closing
// parenthesis) is absent from this listing; per the comment it restricts the
// acknowledgement to singleton destinations
315  if ((meta.destination.getNode() == peer.getNode())
316  /* send prophet ack only for singleton */
318  {
319  /* the bundle was transferred, mark it as acknowledged */
320  ibrcommon::MutexLock l(_acknowledgementSet);
321  _acknowledgementSet.add(meta);
322  }
323 
// only the GTMX strategy keeps a forward counter; for any other strategy the
// cast fails and the entry is skipped
324  // add forwarded entry to GTMX strategy
325  try {
326  GTMX_Strategy &gtmx = dynamic_cast<GTMX_Strategy&>(*_forwardingStrategy);
327  gtmx.addForward(meta);
328  } catch (const std::bad_cast &ex) { };
329 
330  // search for the next bundle
331  _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
332  return;
333  } catch (const std::bad_cast&) { };
334  }
335 
// Start-up hook: resets the task queue and starts the worker thread.
// A ThreadException raised by start() is logged and not propagated.
// NOTE(review): the signature line is absent from this listing - presumably
// componentUp(), as named in the log message below; confirm.
337  {
338  // reset task queue
339  _taskqueue.reset();
340 
341  // routine checked for throw() on 15.02.2013
342  try {
343  // run the thread
344  start();
345  } catch (const ibrcommon::ThreadException &ex) {
346  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) << "componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
347  }
348  }
349 
// Shutdown hook: stops the worker thread and waits for it to finish.
// A ThreadException raised by stop() or join() is logged and not propagated.
// NOTE(review): the signature line is absent from this listing - presumably
// componentDown(), as named in the log message below; confirm.
351  {
352  try {
353  // stop the thread
354  stop();
355  join();
356  } catch (const ibrcommon::ThreadException &ex) {
357  IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) << "componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
358  }
359  }
360 
// Ages the delivery predictability map under its lock and then returns a
// ThreadsafeReference to it. The map is passed as both constructor
// arguments; the same object is also used as the mutex in the MutexLock below.
// NOTE(review): the signature line is absent from this listing; presumably
// the returned reference holds the map's lock for its lifetime - confirm
// against the ibrcommon::ThreadsafeReference documentation.
362  {
363  {
// the scoped lock is released before the reference is constructed
364  ibrcommon::MutexLock l(_deliveryPredictabilityMap);
365  age();
366  }
367  return ibrcommon::ThreadsafeReference<DeliveryPredictabilityMap>(_deliveryPredictabilityMap, _deliveryPredictabilityMap);
368  }
369 
// Returns a read-only ThreadsafeReference to the delivery predictability map
// without aging it first.
// NOTE(review): the signature line is absent from this listing; the
// const_cast suggests a const member function whose second argument (the
// mutex role) must be non-const - confirm.
371  {
372  return ibrcommon::ThreadsafeReference<const DeliveryPredictabilityMap>(_deliveryPredictabilityMap, const_cast<DeliveryPredictabilityMap&>(_deliveryPredictabilityMap));
373  }
374 
// Returns a read-only ThreadsafeReference to the acknowledgement set; the set
// itself is passed as the second argument after a const_cast.
// NOTE(review): the signature line is absent from this listing; the
// const_cast suggests a const member function - confirm.
376  {
377  return ibrcommon::ThreadsafeReference<const AcknowledgementSet>(_acknowledgementSet, const_cast<AcknowledgementSet&>(_acknowledgementSet));
378  }
379 
// Worker thread body. Loops forever, taking one task at a time from
// _taskqueue:
//  - SearchNextBundleTask: selects bundles for the task's neighbor with the
//    local BundleFilter and hands each one to transferTo()
//  - NextExchangeTask: performs a routing handshake with every neighbor
// Exceptions of type ibrcommon::Exception raised while processing a task are
// logged and the loop continues; any std::exception that reaches the outer
// handler ends the loop and the thread.
// NOTE(review): several lines are absent from this listing (the declaration
// of `list`, one condition inside shouldAdd(), the catch clause of the
// try-block inside shouldAdd() and the condition before shallForward()) -
// confirm against the original file.
380  void ProphetRoutingExtension::ProphetRoutingExtension::run() throw ()
381  {
// selector that decides which stored bundles are offered to one neighbor
382  class BundleFilter : public dtn::storage::BundleSelector
383  {
384  public:
385  BundleFilter(const NeighborDatabase::NeighborEntry &entry, ForwardingStrategy &strategy, const DeliveryPredictabilityMap &dpm)
386  : _entry(entry), _strategy(strategy), _dpm(dpm)
387  { };
388 
389  virtual ~BundleFilter() {};
390 
// never select more bundles than the neighbor has free transfer slots
391  virtual dtn::data::Size limit() const throw () { return _entry.getFreeTransferSlots(); };
392 
393  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
394  {
395  // check Scope Control Block - do not forward bundles with hop limit == 0
396  if (meta.hopcount == 0)
397  {
398  return false;
399  }
400 
401  // do not forward any routing control message
402  // this is done by the neighbor routing module
403  if (isRouting(meta.source))
404  {
405  return false;
406  }
407 
// NOTE(review): the second half of this condition is absent from this listing
408  // do not forward local bundles
409  if ((meta.destination.getNode() == dtn::core::BundleCore::local)
411  )
412  {
413  return false;
414  }
415 
416  // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
417  if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
418  {
419  return false;
420  }
421 
422  // do not forward bundles addressed to this neighbor,
423  // because this is handled by neighbor routing extension
424  if (_entry.eid == meta.destination.getNode())
425  {
426  return false;
427  }
428 
// NOTE(review): the catch clause that closes this try-block is absent from
// this listing
429  // do not forward bundles already known by the destination
430  // throws BloomfilterNotAvailableException if no filter is available or it is expired
431  try {
432  if (_entry.has(meta, true))
433  {
434  return false;
435  }
438  }
439 
// NOTE(review): the condition guarding the strategy query is absent from
// this listing
440  // ask the routing strategy if this bundle should be selected
442  {
443  return _strategy.shallForward(_dpm, meta);
444  }
445 
446  return true;
447  }
448 
449  private:
// references only: a filter must not outlive the entry, strategy and map it
// was constructed with
450  const NeighborDatabase::NeighborEntry &_entry;
451  const ForwardingStrategy &_strategy;
452  const DeliveryPredictabilityMap &_dpm;
453  };
454 
// NOTE(review): the declaration of `list` (used below) is absent from this listing
456 
457  while (true)
458  {
459  try {
// NOTE(review): presumably getnpop(true) blocks until a task is available -
// confirm. The auto_ptr deletes the task when this scope is left;
// std::auto_ptr is deprecated since C++11 and removed in C++17.
460  Task *t = _taskqueue.getnpop(true);
461  std::auto_ptr<Task> killer(t);
462 
463  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 50) << "processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
464 
465  try {
// the task type is probed with dynamic_cast; std::bad_cast means the task is
// of the other kind
471  try {
472  SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
473 
474  // lock the neighbor database while searching for bundles
475  try {
476  NeighborDatabase &db = (**this).getNeighborDB();
477 
478  ibrcommon::MutexLock l(db);
479  NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
480 
481  // check if enough transfer slots available (threshold reached)
482  if (!entry.isTransferThresholdReached())
483  throw NeighborDatabase::NoMoreTransfersAvailable();
484 
485  // get the DeliveryPredictabilityMap of the potentially next hop
486  const DeliveryPredictabilityMap &dpm = entry.getDataset<DeliveryPredictabilityMap>();
487 
488  // get the bundle filter of the neighbor
489  BundleFilter filter(entry, *_forwardingStrategy, dpm);
490 
491  // some debug output
492  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
493 
494  // query some unknown bundle from the storage, the list contains max. 10 items.
495  list.clear();
496  (**this).getSeeker().get(filter, list);
497  } catch (const NeighborDatabase::DatasetNotAvailableException&) {
498  // if there is no DeliveryPredictabilityMap for the next hop
499  // perform a routing handshake with the peer
500  (**this).doHandshake(task.eid);
501  } catch (const dtn::storage::BundleSelectorException&) {
502  // query a new summary vector from this neighbor
503  (**this).doHandshake(task.eid);
504  }
505 
// the neighbor database lock was released at the end of the try-block above;
// NOTE(review): if one of the two handlers above ran, `list` was not
// refreshed for this task (list.clear() may not have been reached) - confirm
// that stale entries cannot be sent here
506  // send the bundles as long as we have resources
507  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
508  {
509  const dtn::data::MetaBundle &meta = (*iter);
510 
511  try {
512  transferTo(task.eid, meta);
513  } catch (const NeighborDatabase::AlreadyInTransitException&) { };
514  }
515  } catch (const NeighborDatabase::NoMoreTransfersAvailable &ex) {
516  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
517  } catch (const NeighborDatabase::NeighborNotAvailableException &ex) {
518  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
519  } catch (const dtn::storage::NoBundleFoundException &ex) {
520  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
521  } catch (const std::bad_cast&) { }
522 
527  try {
// the cast result is discarded; it only tests whether this is a NextExchangeTask
528  dynamic_cast<NextExchangeTask&>(*t);
529 
530  std::set<dtn::core::Node> neighbors = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
531  std::set<dtn::core::Node>::const_iterator it;
532  for(it = neighbors.begin(); it != neighbors.end(); ++it)
533  {
// a failed handshake with one neighbor must not stop the others
534  try{
535  (**this).doHandshake(it->getEID());
536  } catch (const ibrcommon::Exception &ex) { }
537  }
538  } catch (const std::bad_cast&) { }
539 
540  } catch (const ibrcommon::Exception &ex) {
541  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 20) << "task failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
542  }
543  } catch (const std::exception &ex) {
// leaving the loop here ends the thread
544  IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 15) << "terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
545  return;
546  }
547 
548  yield();
549  }
550  }
551 
// Aborts the task queue that run() consumes.
// NOTE(review): the signature line is absent from this listing - presumably
// this is the thread cancellation hook, and aborting the queue presumably
// makes the pending getnpop() in run() throw so the worker loop ends; confirm.
553  {
554  _taskqueue.abort();
555  }
556 
557  float ProphetRoutingExtension::p_encounter(const dtn::data::EID &neighbor) const
558  {
559  age_map::const_iterator it = _ageMap.find(neighbor);
560  if(it == _ageMap.end())
561  {
562  /* In this case, we got a transitive update for the node earlier but havent encountered it ourselves */
563  return _p_encounter_max;
564  }
565 
567  const dtn::data::Timestamp time_diff = currentTime - it->second;
568 #ifdef __DEVELOPMENT_ASSERTIONS__
569  assert(currentTime >= it->second && "the ageMap timestamp should be smaller than the current timestamp");
570 #endif
571  if(time_diff > _i_typ)
572  {
573  return _p_encounter_max;
574  }
575  else
576  {
577  return _p_encounter_max * static_cast<float>(time_diff.get<size_t>() / _i_typ);
578  }
579  }
580 
581  void ProphetRoutingExtension::updateNeighbor(const dtn::data::EID &neighbor)
582  {
586  try {
587  float neighbor_dp = _deliveryPredictabilityMap.get(neighbor);
588 
589  if (neighbor_dp < _p_first_threshold)
590  {
591  neighbor_dp = _p_encounter_first;
592  }
593  else
594  {
595  neighbor_dp += (1 - _delta - neighbor_dp) * p_encounter(neighbor);
596  }
597 
598  _deliveryPredictabilityMap.set(neighbor, neighbor_dp);
599  } catch (const DeliveryPredictabilityMap::ValueNotFoundException&) {
600  _deliveryPredictabilityMap.set(neighbor, _p_encounter_first);
601  }
602 
603  _ageMap[neighbor] = dtn::utils::Clock::getUnixTimestamp();
604  }
605 
606  void ProphetRoutingExtension::age()
607  {
608  _deliveryPredictabilityMap.age(_p_first_threshold);
609  }
610 
611  ProphetRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &eid)
612  : eid(eid)
613  {
614  }
615 
616  ProphetRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
617  {
618  }
619 
620  std::string ProphetRoutingExtension::SearchNextBundleTask::toString() const
621  {
622  return "SearchNextBundleTask: " + eid.getString();
623  }
624 
625  ProphetRoutingExtension::NextExchangeTask::NextExchangeTask()
626  {
627  }
628 
629  ProphetRoutingExtension::NextExchangeTask::~NextExchangeTask()
630  {
631  }
632 
633  std::string ProphetRoutingExtension::NextExchangeTask::toString() const
634  {
635  return "NextExchangeTask";
636  }
637 
// NOTE(review): the signature lines of the three functions below are absent
// from this listing. Presumably they are the constructor, the destructor and
// shallForward() of a forwarding strategy class (likely GRTR_Strategy) -
// confirm against the original file.

// empty body (presumably the constructor)
639  {
640  }
641 
// empty body (presumably the destructor)
643  {
644  }
645 
// Forwarding decision: delegates entirely to neighborDPIsGreater() with the
// neighbor's map and the destination of the bundle.
647  {
648  return neighborDPIsGreater(neighbor_dpm, bundle.destination);
649  }
650 
// NOTE(review): the signature lines of the four functions below are absent
// from this listing. Presumably they are the constructor, the destructor,
// addForward() and shallForward() of GTMX_Strategy (addForward() is called
// on a GTMX_Strategy elsewhere in this file) - confirm against the original
// file.

// constructor initializer list and empty body: stores the upper bound for
// the per-bundle forward counter
652  : _NF_max(NF_max)
653  {
654  }
655 
// empty body (presumably the destructor)
657  {
658  }
659 
// Increments the forward counter of bundle `id`, creating the counter with
// an initial value of 0 if the bundle has no entry yet.
661  {
662  nf_map::iterator nf_it = _NF_map.find(id);
663 
664  if (nf_it == _NF_map.end()) {
665  nf_it = _NF_map.insert(std::make_pair(id, 0)).first;
666  }
667 
668  ++nf_it->second;
669  }
670 
// Forwarding decision: refuses bundles whose forward counter exceeds _NF_max
// (a bundle without an entry counts as 0 forwards); otherwise delegates to
// neighborDPIsGreater() like the plain strategy does.
672  {
673  unsigned int NF = 0;
674 
675  nf_map::const_iterator nf_it = _NF_map.find(bundle);
676  if(nf_it != _NF_map.end()) {
677  NF = nf_it->second;
678  }
679 
// strictly greater: a bundle may still be forwarded when NF == _NF_max
680  if (NF > _NF_max) return false;
681 
682  return neighborDPIsGreater(neighbor_dpm, bundle.destination);
683  }
684 
685  } // namespace routing
686 } // namespace dtn