IBR-DTNSuite  0.8
daemon/src/routing/prophet/ProphetRoutingExtension.cpp
Go to the documentation of this file.
00001 #include "ProphetRoutingExtension.h"
00002 
00003 #include "core/BundleCore.h"
00004 
00005 #include <algorithm>
00006 #include <memory>
00007 
00008 #include "routing/QueueBundleEvent.h"
00009 #include "routing/NodeHandshakeEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "net/TransferAbortedEvent.h"
00012 #include "core/TimeEvent.h"
00013 #include "core/NodeEvent.h"
00014 #include "core/BundlePurgeEvent.h"
00015 #include "core/BundleEvent.h"
00016 
00017 #include <ibrcommon/Logger.h>
00018 
00019 #include <ibrdtn/data/SDNV.h>
00020 #include <ibrdtn/data/Exceptions.h>
00021 #include <ibrdtn/utils/Clock.h>
00022 
00023 namespace dtn
00024 {
00025         namespace routing
00026         {
00027                 ProphetRoutingExtension::ProphetRoutingExtension(ForwardingStrategy *strategy, float p_encounter_max, float p_encounter_first, float p_first_threshold,
00028                                                                  float beta, float gamma, float delta, ibrcommon::Timer::time_t time_unit, ibrcommon::Timer::time_t i_typ,
00029                                                                  ibrcommon::Timer::time_t next_exchange_timeout)
00030                         : _forwardingStrategy(strategy), _next_exchange_timeout(next_exchange_timeout), _p_encounter_max(p_encounter_max), _p_encounter_first(p_encounter_first),
00031                           _p_first_threshold(p_first_threshold), _beta(beta), _gamma(gamma), _delta(delta), _time_unit(time_unit), _i_typ(i_typ)
00032                 {
00033                         // assign myself to the forwarding strategy
00034                         strategy->setProphetRouter(this);
00035 
00036                         _deliveryPredictabilityMap[core::BundleCore::local] = 1.0;
00037                         // write something to the syslog
00038                         IBRCOMMON_LOGGER(info) << "Initializing PRoPHET routing module" << IBRCOMMON_LOGGER_ENDL;
00039                 }
00040 
00041                 ProphetRoutingExtension::~ProphetRoutingExtension()
00042                 {
00043                         stop();
00044                         join();
00045                         delete _forwardingStrategy;
00046                 }
00047 
00048                 void ProphetRoutingExtension::requestHandshake(const dtn::data::EID&, NodeHandshake& handshake) const
00049                 {
00050                         handshake.addRequest(DeliveryPredictabilityMap::identifier);
00051                         handshake.addRequest(AcknowledgementSet::identifier);
00052 
00053                         // request summary vector to exclude bundles known by the peer
00054                         handshake.addRequest(BloomFilterSummaryVector::identifier);
00055                 }
00056 
00057                 void ProphetRoutingExtension::responseHandshake(const dtn::data::EID& neighbor, const NodeHandshake& request, NodeHandshake& response)
00058                 {
00059                         const dtn::data::EID neighbor_node = neighbor.getNode();
00060                         if (request.hasRequest(DeliveryPredictabilityMap::identifier))
00061                         {
00062                                 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00063 
00064 #ifndef DISABLE_MAP_STORE
00065                                 /* check if a saved predictablity map exists */
00066                                 map_store::iterator it;
00067                                 if((it = _mapStore.find(neighbor_node)) != _mapStore.end())
00068                                 {
00069                                         /* the map has already been aged by processHandshake */
00070                                         response.addItem(new DeliveryPredictabilityMap(it->second));
00071                                 }
00072                                 else
00073                                 {
00074                                         age();
00075 
00076                                         /* copy the map to the map_store so that processHandshakes knows that it was already aged */
00077                                         _mapStore[neighbor_node] = _deliveryPredictabilityMap;
00078 
00079                                         response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap));
00080                                 }
00081 #else
00082                                 age();
00083                                 response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap));
00084 #endif
00085                         }
00086                         if (request.hasRequest(AcknowledgementSet::identifier))
00087                         {
00088                                 ibrcommon::MutexLock l(_acknowledgementSet);
00089 
00090                                 _acknowledgementSet.purge();
00091                                 response.addItem(new AcknowledgementSet(_acknowledgementSet));
00092                         }
00093                 }
00094 
00095                 void ProphetRoutingExtension::processHandshake(const dtn::data::EID& neighbor, NodeHandshake& response)
00096                 {
00097                         const dtn::data::EID neighbor_node = neighbor.getNode();
00098 
00099                         /* ignore neighbors, that have our EID */
00100                         if(neighbor_node == dtn::core::BundleCore::local)
00101                                 return;
00102 
00103                         // update the encounter on every routing handshake
00104                         {
00105                                 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00106 
00107                                 age();
00108 
00109                                 /* update predictability for this neighbor */
00110                                 updateNeighbor(neighbor_node);
00111                         }
00112 
00113                         try {
00114                                 const DeliveryPredictabilityMap& neighbor_dp_map = response.get<DeliveryPredictabilityMap>();
00115 
00116                                 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00117 
00118 #ifndef DISABLE_MAP_STORE
00119                                 /* save the current map for this neighbor */
00120                                 map_store::iterator it;
00121                                 if((it = _mapStore.find(neighbor_node)) == _mapStore.end())
00122                                 {
00123                                         /* the map has not been aged yet */
00124                                         age();
00125                                 }
00126                                 _mapStore[neighbor_node] = _deliveryPredictabilityMap;
00127 #endif
00128 
00129                                 /* update the dp_map */
00130                                 update(neighbor_dp_map, neighbor_node);
00131 
00132                         } catch (std::exception&) { }
00133 
00134                         try {
00135                                 const AcknowledgementSet& neighbor_ack_set = response.get<AcknowledgementSet>();
00136 
00137                                 ibrcommon::MutexLock l(_acknowledgementSet);
00138 
00139                                 _acknowledgementSet.merge(neighbor_ack_set);
00140 
00141                                 /* remove acknowledged bundles from bundle store if we do not have custody */
00142                                 dtn::storage::BundleStorage &storage = (**this).getStorage();
00143 
00144                                 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00145                                 {
00146                                 public:
00147                                         BundleFilter(const AcknowledgementSet& entry)
00148                                          : _entry(entry)
00149                                         {}
00150 
00151                                         virtual ~BundleFilter() {}
00152 
00153                                         virtual size_t limit() const { return 0; }
00154 
00155                                         virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00156                                         {
00157                                                 // do not delete any bundles with
00158                                                 if ((meta.destination.getNode() == dtn::core::BundleCore::local) || (meta.custodian.getNode() == dtn::core::BundleCore::local))
00159                                                         return false;
00160 
00161                                                 if(!_entry.has(meta))
00162                                                         return false;
00163 
00164                                                 return true;
00165                                         }
00166 
00167                                 private:
00168                                         const AcknowledgementSet& _entry;
00169                                 } filter(_acknowledgementSet);
00170 
00171                                 const std::list<dtn::data::MetaBundle> removeList = storage.get(filter);
00172 
00173                                 for (std::list<dtn::data::MetaBundle>::const_iterator it = removeList.begin(); it != removeList.end(); ++it)
00174                                 {
00175                                         const dtn::data::MetaBundle &meta = (*it);
00176 
00177                                         dtn::core::BundlePurgeEvent::raise(meta);
00178 
00179                                         IBRCOMMON_LOGGER(notice) << "Bundle removed due to prophet ack: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00180 
00181                                         /* generate a report */
00182                                         dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00183                                 }
00184                         } catch (std::exception&) { }
00185                 }
00186 
00187                 void ProphetRoutingExtension::notify(const dtn::core::Event *evt)
00188                 {
00189                         try {
00190                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00191 
00192                                 ibrcommon::MutexLock l(_next_exchange_mutex);
00193                                 if ((_next_exchange_timestamp > 0) && (_next_exchange_timestamp < time.getUnixTimestamp()))
00194                                 {
00195                                         _taskqueue.push( new NextExchangeTask() );
00196 
00197                                         // define the next exchange timestamp
00198                                         _next_exchange_timestamp = time.getUnixTimestamp() + _next_exchange_timeout;
00199 
00200                                         ibrcommon::MutexLock l(_acknowledgementSet);
00201                                         _acknowledgementSet.purge();
00202                                 }
00203                                 return;
00204                         } catch (const std::bad_cast&) { };
00205 
00206 #ifndef DISABLE_MAP_STORE
00207                         /* in case of timer or node disconnect event, remove corresponding _mapStore items */
00208                         try {
00209                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00210                                 const dtn::data::EID &neighbor = nodeevent.getNode().getEID().getNode();
00211 
00212                                 if (nodeevent.getAction() == NODE_UNAVAILABLE)
00213                                 {
00214                                         ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00215                                         _mapStore.erase(neighbor);
00216                                 }
00217                                 return;
00218                         } catch (const std::bad_cast&) { };
00219 #endif
00220 
00221                         // If an incoming bundle is received, forward it to all connected neighbors
00222                         try {
00223                                 dynamic_cast<const QueueBundleEvent&>(*evt);
00224 
00225                                 // new bundles trigger a recheck for all neighbors
00226                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00227 
00228                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00229                                 {
00230                                         const dtn::core::Node &n = (*iter);
00231 
00232                                         // transfer the next bundle to this destination
00233                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00234                                 }
00235                                 return;
00236                         } catch (const std::bad_cast&) { };
00237 
00238                         // If a new neighbor comes available search for bundles
00239                         try {
00240                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00241                                 const dtn::core::Node &n = nodeevent.getNode();
00242 
00243                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00244                                 {
00245                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00246                                 }
00247 
00248                                 return;
00249                         } catch (const std::bad_cast&) { };
00250 
00251                         try {
00252                                 const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt);
00253 
00254                                 if (handshake.state == NodeHandshakeEvent::HANDSHAKE_UPDATED)
00255                                 {
00256                                         // transfer the next bundle to this destination
00257                                         _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
00258                                 }
00259                                 else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
00260                                 {
00261                                         // transfer the next bundle to this destination
00262                                         _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
00263                                 }
00264                                 return;
00265                         } catch (const std::bad_cast&) { };
00266 
00267                         // The bundle transfer has been aborted
00268                         try {
00269                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00270 
00271                                 // transfer the next bundle to this destination
00272                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00273 
00274                                 return;
00275                         } catch (const std::bad_cast&) { };
00276 
00277                         // A bundle transfer was successful
00278                         try {
00279                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00280                                 const dtn::data::MetaBundle &meta = completed.getBundle();
00281                                 const dtn::data::EID &peer = completed.getPeer();
00282 
00283                                 if ((meta.destination.getNode() == peer.getNode())
00284                                                 /* send prophet ack only for singleton */
00285                                                 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00286                                 {
00287                                         /* the bundle was transferred, mark it as acknowledged */
00288                                         ibrcommon::MutexLock l(_acknowledgementSet);
00289                                         _acknowledgementSet.insert(Acknowledgement(meta, meta.expiretime));
00290                                 }
00291 
00292                                 // add forwarded entry to GTMX strategy
00293                                 try {
00294                                         GTMX_Strategy &gtmx = dynamic_cast<GTMX_Strategy&>(*_forwardingStrategy);
00295                                         gtmx.addForward(meta);
00296                                 } catch (const std::bad_cast &ex) { };
00297 
00298                                 // search for the next bundle
00299                                 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
00300                                 return;
00301                         } catch (const std::bad_cast&) { };
00302                 }
00303 
00304                 ibrcommon::ThreadsafeReference<ProphetRoutingExtension::DeliveryPredictabilityMap> ProphetRoutingExtension::getDeliveryPredictabilityMap()
00305                 {
00306                         {
00307                                 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00308                                 age();
00309                         }
00310                         return ibrcommon::ThreadsafeReference<DeliveryPredictabilityMap>(_deliveryPredictabilityMap, _deliveryPredictabilityMap);
00311                 }
00312 
00313                 ibrcommon::ThreadsafeReference<const ProphetRoutingExtension::DeliveryPredictabilityMap> ProphetRoutingExtension::getDeliveryPredictabilityMap() const
00314                 {
00315                         return ibrcommon::ThreadsafeReference<const DeliveryPredictabilityMap>(_deliveryPredictabilityMap, const_cast<DeliveryPredictabilityMap&>(_deliveryPredictabilityMap));
00316                 }
00317 
00318                 ibrcommon::ThreadsafeReference<const ProphetRoutingExtension::AcknowledgementSet> ProphetRoutingExtension::getAcknowledgementSet() const
00319                 {
00320                         return ibrcommon::ThreadsafeReference<const AcknowledgementSet>(_acknowledgementSet, const_cast<AcknowledgementSet&>(_acknowledgementSet));
00321                 }
00322 
00323                 void ProphetRoutingExtension::ProphetRoutingExtension::run()
00324                 {
00325                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00326                         {
00327                         public:
00328                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry, ForwardingStrategy &strategy)
00329                                  : _entry(entry), _strategy(strategy)
00330                                 {};
00331 
00332                                 virtual ~BundleFilter() {};
00333 
00334                                 virtual size_t limit() const { return 10; };
00335 
00336                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00337                                 {
00338                                         // check Scope Control Block - do not forward bundles with hop limit == 0
00339                                         if (meta.hopcount == 0)
00340                                         {
00341                                                 return false;
00342                                         }
00343 
00344                                         // do not forward any routing control message
00345                                         // this is done by the neighbor routing module
00346                                         if (isRouting(meta.source))
00347                                         {
00348                                                 return false;
00349                                         }
00350 
00351                                         // do not forward local bundles
00352                                         if ((meta.destination.getNode() == dtn::core::BundleCore::local)
00353                                                         && meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)
00354                                                 )
00355                                         {
00356                                                 return false;
00357                                         }
00358 
00359                                         // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
00360                                         if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
00361                                         {
00362                                                 return false;
00363                                         }
00364 
00365                                         // do not forward to any blacklisted destination
00366                                         const dtn::data::EID dest = meta.destination.getNode();
00367                                         if (_blacklist.find(dest) != _blacklist.end())
00368                                         {
00369                                                 return false;
00370                                         }
00371 
00372                                         // do not forward bundles already known by the destination
00373                                         // throws BloomfilterNotAvailableException if no filter is available or it is expired
00374                                         if (_entry.has(meta, true))
00375                                         {
00376                                                 return false;
00377                                         }
00378 
00379                                         // ask the routing strategy if this bundle should be selected
00380                                         if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))
00381                                         {
00382                                                 return _strategy.shallForward(_entry.eid, meta);
00383                                         }
00384 
00385                                         return true;
00386                                 }
00387 
00388                                 void blacklist(const dtn::data::EID& id)
00389                                 {
00390                                         _blacklist.insert(id);
00391                                 }
00392 
00393                         private:
00394                                 std::set<dtn::data::EID> _blacklist;
00395                                 const NeighborDatabase::NeighborEntry &_entry;
00396                                 const ForwardingStrategy &_strategy;
00397                         };
00398 
00399                         dtn::storage::BundleStorage &storage = (**this).getStorage();
00400 
00401                         {
00402                                 ibrcommon::MutexLock l(_next_exchange_mutex);
00403                                 // define the next exchange timestamp
00404                                 _next_exchange_timestamp = dtn::utils::Clock::getUnixTimestamp() + _next_exchange_timeout;
00405                         }
00406 
00407                         while (true)
00408                         {
00409                                 try {
00410                                         Task *t = _taskqueue.getnpop(true);
00411                                         std::auto_ptr<Task> killer(t);
00412 
00413                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing prophet task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00414 
00415                                         try {
00421                                                 try {
00422                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00423                                                         NeighborDatabase &db = (**this).getNeighborDB();
00424 
00425                                                         ibrcommon::MutexLock l(db);
00426                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00427 
00428                                                         try {
00429                                                                 // get the bundle filter of the neighbor
00430                                                                 BundleFilter filter(entry, *_forwardingStrategy);
00431 
00432                                                                 // some debug output
00433                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00434 
00435                                                                 // blacklist the neighbor itself, because this is handled by neighbor routing extension
00436                                                                 filter.blacklist(task.eid);
00437 
00438                                                                 // query some unknown bundle from the storage, the list contains max. 10 items.
00439                                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00440 
00441                                                                 // send the bundles as long as we have resources
00442                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00443                                                                 {
00444                                                                         const dtn::data::MetaBundle &meta = (*iter);
00445 
00446                                                                         try {
00447                                                                                 transferTo(entry, meta);
00448                                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00449                                                                 }
00450                                                         } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00451                                                                 // query a new summary vector from this neighbor
00452                                                                 (**this).doHandshake(task.eid);
00453                                                         }
00454                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00455                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00456                                                 } catch (const std::bad_cast&) { }
00457 
00462                                                 try {
00463                                                         dynamic_cast<NextExchangeTask&>(*t);
00464 
00465 #ifndef DISABLE_MAP_STORE
00466                                                         {
00467                                                                 /* lock the dp map when accessing the mapStore */
00468                                                                 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
00469                                                                 _mapStore.clear();
00470                                                         }
00471 #endif
00472 
00473                                                         std::set<dtn::core::Node> neighbors = dtn::core::BundleCore::getInstance().getNeighbors();
00474                                                         std::set<dtn::core::Node>::const_iterator it;
00475                                                         for(it = neighbors.begin(); it != neighbors.end(); ++it)
00476                                                         {
00477                                                                 try{
00478                                                                         (**this).doHandshake(it->getEID());
00479                                                                 } catch (const ibrcommon::Exception &ex) { }
00480                                                         }
00481                                                 } catch (const std::bad_cast&) { }
00482 
00483                                         } catch (const ibrcommon::Exception &ex) {
00484                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in ProphetRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00485                                         }
00486                                 } catch (const std::exception&) {
00487                                         return;
00488                                 }
00489 
00490                                 yield();
00491                         }
00492                 }
00493 
00494                 void ProphetRoutingExtension::__cancellation()
00495                 {
00496                         {
00497                                 ibrcommon::MutexLock l(_next_exchange_mutex);
00498                                 _next_exchange_timestamp = 0;
00499                         }
00500                         _taskqueue.abort();
00501                 }
00502 
00503                 float ProphetRoutingExtension::p_encounter(const dtn::data::EID &neighbor) const
00504                 {
00505                         age_map::const_iterator it = _ageMap.find(neighbor);
00506                         if(it == _ageMap.end())
00507                         {
00508                                 /* In this case, we got a transitive update for the node earlier but havent encountered it ourselves */
00509                                 return _p_encounter_max;
00510                         }
00511 
00512                         size_t currentTime = dtn::utils::Clock::getUnixTimestamp();
00513                         size_t time_diff = currentTime - it->second;
00514 #ifdef __DEVELOPMENT_ASSERTIONS__
00515                         assert(currentTime >= it->second && "the ageMap timestamp should be smaller than the current timestamp");
00516 #endif
00517                         if(time_diff > _i_typ)
00518                         {
00519                                 return _p_encounter_max;
00520                         }
00521                         else
00522                         {
00523                                 return _p_encounter_max * ((float) time_diff / _i_typ);
00524                         }
00525                 }
00526 
                /* Definitions of the static identifier constants; each value is what the
                 * getIdentifier() method of the respective class returns. */
                const size_t ProphetRoutingExtension::DeliveryPredictabilityMap::identifier = NodeHandshakeItem::DELIVERY_PREDICTABILITY_MAP;
                const size_t ProphetRoutingExtension::AcknowledgementSet::identifier = NodeHandshakeItem::PROPHET_ACKNOWLEDGEMENT_SET;
00529 
00530                 size_t ProphetRoutingExtension::DeliveryPredictabilityMap::DeliveryPredictabilityMap::getIdentifier() const
00531                 {
00532                         return identifier;
00533                 }
00534 
00535                 size_t ProphetRoutingExtension::DeliveryPredictabilityMap::getLength() const
00536                 {
00537                         size_t len = 0;
00538                         for(const_iterator it = begin(); it != end(); ++it)
00539                         {
00540                                 /* calculate length of the EID */
00541                                 const std::string eid = it->first.getString();
00542                                 size_t eid_len = eid.length();
00543                                 len += data::SDNV(eid_len).getLength() + eid_len;
00544 
00545                                 /* calculate length of the float in fixed notation */
00546                                 const float& f = it->second;
00547                                 std::stringstream ss;
00548                                 ss << f << std::flush;
00549 
00550                                 size_t float_len = ss.str().length();
00551                                 len += data::SDNV(float_len).getLength() + float_len;
00552                         }
00553                         return data::SDNV(size()).getLength() + len;
00554                 }
00555 
00556                 std::ostream& ProphetRoutingExtension::DeliveryPredictabilityMap::serialize(std::ostream& stream) const
00557                 {
00558                         stream << data::SDNV(size());
00559                         for(const_iterator it = begin(); it != end(); ++it)
00560                         {
00561                                 const std::string eid = it->first.getString();
00562                                 stream << data::SDNV(eid.length()) << eid;
00563 
00564                                 const float& f = it->second;
00565                                 /* write f into a stringstream to get final length */
00566                                 std::stringstream ss;
00567                                 ss << f << std::flush;
00568 
00569                                 stream << data::SDNV(ss.str().length());
00570                                 stream << ss.str();
00571                         }
00572                         IBRCOMMON_LOGGER_DEBUG(20) << "ProphetRouting: Serialized DeliveryPredictabilityMap with " << size() << " items." << IBRCOMMON_LOGGER_ENDL;
00573                         IBRCOMMON_LOGGER_DEBUG(60) << *this << IBRCOMMON_LOGGER_ENDL;
00574                         return stream;
00575                 }
00576 
00577                 std::istream& ProphetRoutingExtension::DeliveryPredictabilityMap::deserialize(std::istream& stream)
00578                 {
00579                         data::SDNV elements_read(0);
00580                         data::SDNV map_size;
00581                         stream >> map_size;
00582 
00583                         while(elements_read < map_size)
00584                         {
00585                                 /* read the EID */
00586                                 data::SDNV eid_len;
00587                                 stream >> eid_len;
00588                                 char eid_cstr[eid_len.getValue()+1];
00589                                 stream.read(eid_cstr, sizeof(eid_cstr)-1);
00590                                 eid_cstr[sizeof(eid_cstr)-1] = 0;
00591                                 data::EID eid(eid_cstr);
00592                                 if(eid == data::EID())
00593                                         throw dtn::InvalidDataException("EID could not be casted, while parsing a dp_map.");
00594 
00595                                 /* read the probability (float) */
00596                                 float f;
00597                                 data::SDNV float_len;
00598                                 stream >> float_len;
00599                                 char f_cstr[float_len.getValue()+1];
00600                                 stream.read(f_cstr, sizeof(f_cstr)-1);
00601                                 f_cstr[sizeof(f_cstr)-1] = 0;
00602 
00603                                 std::stringstream ss(f_cstr);
00604                                 ss >> f;
00605                                 if(ss.fail())
00606                                         throw dtn::InvalidDataException("Float could not be casted, while parsing a dp_map.");
00607 
00608                                 /* check if f is in a proper range */
00609                                 if(f < 0 || f > 1)
00610                                         continue;
00611 
00612                                 /* insert the data into the map */
00613                                 (*this)[eid] = f;
00614 
00615                                 elements_read += 1;
00616                         }
00617 
00618                         IBRCOMMON_LOGGER_DEBUG(20) << "ProphetRouting: Deserialized DeliveryPredictabilityMap with " << size() << " items." << IBRCOMMON_LOGGER_ENDL;
00619                         IBRCOMMON_LOGGER_DEBUG(60) << *this << IBRCOMMON_LOGGER_ENDL;
00620                         return stream;
00621                 }
00622 
00623                 void ProphetRoutingExtension::updateNeighbor(const dtn::data::EID &neighbor)
00624                 {
00628                         DeliveryPredictabilityMap::const_iterator it;
00629                         float neighbor_dp = _p_encounter_first;
00630 
00631                         if ((it = _deliveryPredictabilityMap.find(neighbor)) != _deliveryPredictabilityMap.end())
00632                         {
00633                                 neighbor_dp = it->second;
00634 
00635                                 if(it->second < _p_first_threshold)
00636                                 {
00637                                         neighbor_dp = _p_encounter_first;
00638                                 }
00639                                 else
00640                                 {
00641                                         neighbor_dp += (1 - _delta - it->second) * p_encounter(neighbor);
00642                                 }
00643                         }
00644 
00645                         _deliveryPredictabilityMap[neighbor] = neighbor_dp;
00646                         _ageMap[neighbor] = dtn::utils::Clock::getUnixTimestamp();
00647                 }
00648 
00649                 void ProphetRoutingExtension::update(const DeliveryPredictabilityMap& neighbor_dp_map, const dtn::data::EID& neighbor)
00650                 {
00651                         DeliveryPredictabilityMap::const_iterator it;
00652                         float neighbor_dp = _p_encounter_first;
00653 
00654                         if ((it = _deliveryPredictabilityMap.find(neighbor)) != _deliveryPredictabilityMap.end())
00655                         {
00656                                 neighbor_dp = it->second;
00657                         }
00658 
00662                         for (it = neighbor_dp_map.begin(); it != neighbor_dp_map.end(); ++it)
00663                         {
00664                                 if ((it->first != neighbor) && (it->first != dtn::core::BundleCore::local))
00665                                 {
00666                                         float dp = 0;
00667 
00668                                         DeliveryPredictabilityMap::iterator dp_it;
00669                                         if((dp_it = _deliveryPredictabilityMap.find(it->first)) != _deliveryPredictabilityMap.end())
00670                                                 dp = dp_it->second;
00671 
00672                                         dp = max(dp, neighbor_dp * it->second * _beta);
00673 
00674                                         if(dp_it != _deliveryPredictabilityMap.end())
00675                                                 dp_it->second = dp;
00676                                         else
00677                                                 _deliveryPredictabilityMap[it->first] = dp;
00678                                 }
00679                         }
00680                 }
00681 
00682                 void ProphetRoutingExtension::age()
00683                 {
00684                         size_t current_time = dtn::utils::Clock::getUnixTimestamp();
00685 
00686                         // prevent double aging
00687                         if (current_time <= _lastAgingTime) return;
00688 
00689                         unsigned int k = (current_time - _lastAgingTime) / _time_unit;
00690 
00691                         DeliveryPredictabilityMap::iterator it;
00692                         for(it = _deliveryPredictabilityMap.begin(); it != _deliveryPredictabilityMap.end();)
00693                         {
00694                                 if(it->first == dtn::core::BundleCore::local)
00695                                 {
00696                                         ++it;
00697                                         continue;
00698                                 }
00699 
00700                                 it->second *= pow(_gamma, (int)k);
00701 
00702                                 if(it->second < _p_first_threshold)
00703                                 {
00704                                         _deliveryPredictabilityMap.erase(it++);
00705                                 } else {
00706                                         ++it;
00707                                 }
00708                         }
00709 
00710                         _lastAgingTime = current_time;
00711                 }
00712 
00713                 std::ostream& operator<<(std::ostream& stream, const ProphetRoutingExtension::DeliveryPredictabilityMap& map)
00714                 {
00715                         ProphetRoutingExtension::DeliveryPredictabilityMap::const_iterator it;
00716                         for(it = map.begin(); it != map.end(); ++it)
00717                         {
00718                                 stream << it->first.getString() << ": " << it->second << std::endl;
00719                         }
00720 
00721                         return  stream;
00722                 }
00723 
00724                 std::ostream& operator<<(std::ostream& stream, const ProphetRoutingExtension::AcknowledgementSet& ack_set)
00725                 {
00726                         std::set<ProphetRoutingExtension::Acknowledgement>::const_iterator it;
00727                         for(it = ack_set._ackSet.begin(); it != ack_set._ackSet.end(); ++it)
00728                         {
00729                                 stream << it->bundleID.toString() << std::endl;
00730                                 stream << it->expire_time << std::endl;
00731                         }
00732 
00733                         return stream;
00734                 }
00735 
00736                 ProphetRoutingExtension::Acknowledgement::Acknowledgement()
00737                 {
00738                 }
00739 
00740                 ProphetRoutingExtension::Acknowledgement::Acknowledgement(const dtn::data::BundleID &bundleID, size_t expire_time)
00741                         : bundleID(bundleID), expire_time(expire_time)
00742                 {
00743                 }
00744 
00745                 ProphetRoutingExtension::Acknowledgement::~Acknowledgement()
00746                 {
00747                 }
00748 
00749                 bool ProphetRoutingExtension::Acknowledgement::operator<(const Acknowledgement &other) const
00750                 {
00751                         return (bundleID < other.bundleID);
00752                 }
00753 
00754                 ProphetRoutingExtension::AcknowledgementSet::AcknowledgementSet()
00755                 {
00756                 }
00757 
00758                 ProphetRoutingExtension::AcknowledgementSet::AcknowledgementSet(const AcknowledgementSet &other)
00759                         : ibrcommon::Mutex(), _ackSet(other._ackSet)
00760                 {
00761                 }
00762 
00763                 void ProphetRoutingExtension::AcknowledgementSet::insert(const Acknowledgement& ack)
00764                 {
00765                         _ackSet.insert(ack);
00766                 }
00767 
00768                 void ProphetRoutingExtension::AcknowledgementSet::purge()
00769                 {
00770                         std::set<Acknowledgement>::iterator it;
00771                         for(it = _ackSet.begin(); it != _ackSet.end();)
00772                         {
00773                                 if(dtn::utils::Clock::isExpired(it->expire_time))
00774                                         _ackSet.erase(it++);
00775                                 else
00776                                         ++it;
00777                         }
00778                 }
00779 
00780                 void ProphetRoutingExtension::AcknowledgementSet::merge(const AcknowledgementSet &other)
00781                 {
00782                         std::set<Acknowledgement>::iterator it;
00783                         for(it = other._ackSet.begin(); it != other._ackSet.end(); ++it)
00784                         {
00785                                 _ackSet.insert(*it);
00786                         }
00787                 }
00788 
00789                 bool ProphetRoutingExtension::AcknowledgementSet::has(const dtn::data::BundleID &bundle) const
00790                 {
00791                         return _ackSet.find(Acknowledgement(bundle, 0)) != _ackSet.end();
00792                 }
00793 
00794                 size_t ProphetRoutingExtension::AcknowledgementSet::getIdentifier() const
00795                 {
00796                         return identifier;
00797                 }
00798                 size_t ProphetRoutingExtension::AcknowledgementSet::getLength() const
00799                 {
00800                         std::stringstream ss;
00801                         serialize(ss);
00802                         return ss.str().length();
00803                 }
00804 
00805                 std::ostream& ProphetRoutingExtension::AcknowledgementSet::serialize(std::ostream& stream) const
00806                 {
00807                         stream << dtn::data::SDNV(_ackSet.size());
00808                         std::set<Acknowledgement>::const_iterator it;
00809                         for(it = _ackSet.begin(); it != _ackSet.end(); ++it)
00810                         {
00811                                 it->serialize(stream);
00812                         }
00813                         return stream;
00814                 }
00815 
00816                 std::istream& ProphetRoutingExtension::AcknowledgementSet::deserialize(std::istream& stream)
00817                 {
00818                         dtn::data::SDNV size;
00819                         stream >> size;
00820 
00821                         for(dtn::data::SDNV i = 0; i < size; i += 1)
00822                         {
00823                                 Acknowledgement ack;
00824                                 ack.deserialize(stream);
00825                                 _ackSet.insert(ack);
00826                         }
00827                         return stream;
00828                 }
00829 
00830                 std::ostream& ProphetRoutingExtension::Acknowledgement::serialize(std::ostream& stream) const
00831                 {
00832                         stream << bundleID;
00833                         stream << dtn::data::SDNV(expire_time);
00834                         return stream;
00835                 }
00836 
00837                 std::istream& ProphetRoutingExtension::Acknowledgement::deserialize(std::istream& stream)
00838                 {
00839                         dtn::data::SDNV expire_time;
00840                         stream >> bundleID;
00841                         stream >> expire_time;
00842                         this->expire_time = expire_time.getValue();
00843                         /* TODO only accept entries with bundles that we routed ourselves? */
00844 
00845                         if (BundleCore::max_lifetime > 0 && this->expire_time > BundleCore::max_lifetime)
00846                                 this->expire_time = BundleCore::max_lifetime;
00847 
00848                         return stream;
00849                 }
00850 
00851                 ProphetRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &eid)
00852                         : eid(eid)
00853                 {
00854                 }
00855 
00856                 ProphetRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00857                 {
00858                 }
00859 
00860                 std::string ProphetRoutingExtension::SearchNextBundleTask::toString() const
00861                 {
00862                         return "SearchNextBundleTask: " + eid.getString();
00863                 }
00864 
00865                 ProphetRoutingExtension::NextExchangeTask::NextExchangeTask()
00866                 {
00867                 }
00868 
00869                 ProphetRoutingExtension::NextExchangeTask::~NextExchangeTask()
00870                 {
00871                 }
00872 
00873                 std::string ProphetRoutingExtension::NextExchangeTask::toString() const
00874                 {
00875                         return "NextExchangeTask";
00876                 }
00877 
00878                 ProphetRoutingExtension::ForwardingStrategy::ForwardingStrategy()
00879                  : _prophet_router(NULL)
00880                 {}
00881 
00882                 ProphetRoutingExtension::ForwardingStrategy::~ForwardingStrategy()
00883                 {}
00884 
00885                 bool ProphetRoutingExtension::ForwardingStrategy::neighborDPIsGreater(const dtn::data::EID& neighbor, const dtn::data::EID& destination) const
00886                 {
00887                         bool ret = false;
00888                         const DeliveryPredictabilityMap& dp_map = _prophet_router->_deliveryPredictabilityMap;
00889 
00890                         DeliveryPredictabilityMap::const_iterator neighborIT = dp_map.find(neighbor);
00891                         DeliveryPredictabilityMap::const_iterator destinationIT = dp_map.find(destination);
00892 
00893                         if(destinationIT == dp_map.end()) {
00894                                 ret = true;
00895                         } else {
00896                                 float neighbor_dp = _prophet_router->_p_first_threshold;
00897                                 if(neighborIT != dp_map.end()) {
00898                                         neighbor_dp = neighborIT->second;
00899                                 }
00900                                 ret = (neighbor_dp > destinationIT->second);
00901                         }
00902 
00903                         return ret;
00904                 }
00905 
00906                 void ProphetRoutingExtension::ForwardingStrategy::setProphetRouter(ProphetRoutingExtension *router)
00907                 {
00908                         _prophet_router = router;
00909                 }
00910 
00911                 ProphetRoutingExtension::GRTR_Strategy::GRTR_Strategy()
00912                 {
00913                 }
00914 
00915                 ProphetRoutingExtension::GRTR_Strategy::~GRTR_Strategy()
00916                 {
00917                 }
00918 
00919                 bool ProphetRoutingExtension::GRTR_Strategy::shallForward(const dtn::data::EID& neighbor, const dtn::data::MetaBundle& bundle) const
00920                 {
00921                         return neighborDPIsGreater(neighbor, bundle.destination);
00922                 }
00923 
00924                 ProphetRoutingExtension::GTMX_Strategy::GTMX_Strategy(unsigned int NF_max)
00925                  : _NF_max(NF_max)
00926                 {
00927                 }
00928 
00929                 ProphetRoutingExtension::GTMX_Strategy::~GTMX_Strategy()
00930                 {
00931                 }
00932 
00933                 void ProphetRoutingExtension::GTMX_Strategy::addForward(const dtn::data::BundleID &id)
00934                 {
00935                         nf_map::iterator nf_it = _NF_map.find(id);
00936 
00937                         if (nf_it == _NF_map.end()) {
00938                                 nf_it = _NF_map.insert(std::make_pair(id, 0)).first;
00939                         }
00940 
00941                         ++nf_it->second;
00942                 }
00943 
00944                 bool ProphetRoutingExtension::GTMX_Strategy::shallForward(const dtn::data::EID &neighbor, const dtn::data::MetaBundle& bundle) const
00945                 {
00946                         unsigned int NF = 0;
00947 
00948                         nf_map::const_iterator nf_it = _NF_map.find(bundle);
00949                         if(nf_it != _NF_map.end()) {
00950                                 NF = nf_it->second;
00951                         }
00952 
00953                         if (NF > _NF_max) return false;
00954 
00955                         return neighborDPIsGreater(neighbor, bundle.destination);
00956                 }
00957 
00958         } // namespace routing
00959 } // namespace dtn