IBR-DTNSuite
0.8
|
00001 #include "ProphetRoutingExtension.h" 00002 00003 #include "core/BundleCore.h" 00004 00005 #include <algorithm> 00006 #include <memory> 00007 00008 #include "routing/QueueBundleEvent.h" 00009 #include "routing/NodeHandshakeEvent.h" 00010 #include "net/TransferCompletedEvent.h" 00011 #include "net/TransferAbortedEvent.h" 00012 #include "core/TimeEvent.h" 00013 #include "core/NodeEvent.h" 00014 #include "core/BundlePurgeEvent.h" 00015 #include "core/BundleEvent.h" 00016 00017 #include <ibrcommon/Logger.h> 00018 00019 #include <ibrdtn/data/SDNV.h> 00020 #include <ibrdtn/data/Exceptions.h> 00021 #include <ibrdtn/utils/Clock.h> 00022 00023 namespace dtn 00024 { 00025 namespace routing 00026 { 00027 ProphetRoutingExtension::ProphetRoutingExtension(ForwardingStrategy *strategy, float p_encounter_max, float p_encounter_first, float p_first_threshold, 00028 float beta, float gamma, float delta, ibrcommon::Timer::time_t time_unit, ibrcommon::Timer::time_t i_typ, 00029 ibrcommon::Timer::time_t next_exchange_timeout) 00030 : _forwardingStrategy(strategy), _next_exchange_timeout(next_exchange_timeout), _p_encounter_max(p_encounter_max), _p_encounter_first(p_encounter_first), 00031 _p_first_threshold(p_first_threshold), _beta(beta), _gamma(gamma), _delta(delta), _time_unit(time_unit), _i_typ(i_typ) 00032 { 00033 // assign myself to the forwarding strategy 00034 strategy->setProphetRouter(this); 00035 00036 _deliveryPredictabilityMap[core::BundleCore::local] = 1.0; 00037 // write something to the syslog 00038 IBRCOMMON_LOGGER(info) << "Initializing PRoPHET routing module" << IBRCOMMON_LOGGER_ENDL; 00039 } 00040 00041 ProphetRoutingExtension::~ProphetRoutingExtension() 00042 { 00043 stop(); 00044 join(); 00045 delete _forwardingStrategy; 00046 } 00047 00048 void ProphetRoutingExtension::requestHandshake(const dtn::data::EID&, NodeHandshake& handshake) const 00049 { 00050 handshake.addRequest(DeliveryPredictabilityMap::identifier); 00051 handshake.addRequest(AcknowledgementSet::identifier); 00052 00053 // request summary vector to exclude bundles known by the peer 00054 handshake.addRequest(BloomFilterSummaryVector::identifier); 00055 } 00056 00057 void ProphetRoutingExtension::responseHandshake(const dtn::data::EID& neighbor, const NodeHandshake& request, NodeHandshake& response) 00058 { 00059 const dtn::data::EID neighbor_node = neighbor.getNode(); 00060 if (request.hasRequest(DeliveryPredictabilityMap::identifier)) 00061 { 00062 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00063 00064 #ifndef DISABLE_MAP_STORE 00065 /* check if a saved predictablity map exists */ 00066 map_store::iterator it; 00067 if((it = _mapStore.find(neighbor_node)) != _mapStore.end()) 00068 { 00069 /* the map has already been aged by processHandshake */ 00070 response.addItem(new DeliveryPredictabilityMap(it->second)); 00071 } 00072 else 00073 { 00074 age(); 00075 00076 /* copy the map to the map_store so that processHandshakes knows that it was already aged */ 00077 _mapStore[neighbor_node] = _deliveryPredictabilityMap; 00078 00079 response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap)); 00080 } 00081 #else 00082 age(); 00083 response.addItem(new DeliveryPredictabilityMap(_deliveryPredictabilityMap)); 00084 #endif 00085 } 00086 if (request.hasRequest(AcknowledgementSet::identifier)) 00087 { 00088 ibrcommon::MutexLock l(_acknowledgementSet); 00089 00090 _acknowledgementSet.purge(); 00091 response.addItem(new AcknowledgementSet(_acknowledgementSet)); 00092 } 00093 } 00094 00095 void ProphetRoutingExtension::processHandshake(const dtn::data::EID& neighbor, NodeHandshake& response) 00096 { 00097 const dtn::data::EID neighbor_node = neighbor.getNode(); 00098 00099 /* ignore neighbors, that have our EID */ 00100 if(neighbor_node == dtn::core::BundleCore::local) 00101 return; 00102 00103 // update the encounter on every routing handshake 00104 { 00105 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00106 00107 age(); 00108 00109 /* update predictability for this neighbor */ 00110 updateNeighbor(neighbor_node); 00111 } 00112 00113 try { 00114 const DeliveryPredictabilityMap& neighbor_dp_map = response.get<DeliveryPredictabilityMap>(); 00115 00116 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00117 00118 #ifndef DISABLE_MAP_STORE 00119 /* save the current map for this neighbor */ 00120 map_store::iterator it; 00121 if((it = _mapStore.find(neighbor_node)) == _mapStore.end()) 00122 { 00123 /* the map has not been aged yet */ 00124 age(); 00125 } 00126 _mapStore[neighbor_node] = _deliveryPredictabilityMap; 00127 #endif 00128 00129 /* update the dp_map */ 00130 update(neighbor_dp_map, neighbor_node); 00131 00132 } catch (std::exception&) { } 00133 00134 try { 00135 const AcknowledgementSet& neighbor_ack_set = response.get<AcknowledgementSet>(); 00136 00137 ibrcommon::MutexLock l(_acknowledgementSet); 00138 00139 _acknowledgementSet.merge(neighbor_ack_set); 00140 00141 /* remove acknowledged bundles from bundle store if we do not have custody */ 00142 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00143 00144 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00145 { 00146 public: 00147 BundleFilter(const AcknowledgementSet& entry) 00148 : _entry(entry) 00149 {} 00150 00151 virtual ~BundleFilter() {} 00152 00153 virtual size_t limit() const { return 0; } 00154 00155 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00156 { 00157 // do not delete any bundles with 00158 if ((meta.destination.getNode() == dtn::core::BundleCore::local) || (meta.custodian.getNode() == dtn::core::BundleCore::local)) 00159 return false; 00160 00161 if(!_entry.has(meta)) 00162 return false; 00163 00164 return true; 00165 } 00166 00167 private: 00168 const AcknowledgementSet& _entry; 00169 } filter(_acknowledgementSet); 00170 00171 const std::list<dtn::data::MetaBundle> removeList = storage.get(filter); 00172 00173 for (std::list<dtn::data::MetaBundle>::const_iterator it = removeList.begin(); it != removeList.end(); ++it) 00174 { 00175 const dtn::data::MetaBundle &meta = (*it); 00176 00177 dtn::core::BundlePurgeEvent::raise(meta); 00178 00179 IBRCOMMON_LOGGER(notice) << "Bundle removed due to prophet ack: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00180 00181 /* generate a report */ 00182 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00183 } 00184 } catch (std::exception&) { } 00185 } 00186 00187 void ProphetRoutingExtension::notify(const dtn::core::Event *evt) 00188 { 00189 try { 00190 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00191 00192 ibrcommon::MutexLock l(_next_exchange_mutex); 00193 if ((_next_exchange_timestamp > 0) && (_next_exchange_timestamp < time.getUnixTimestamp())) 00194 { 00195 _taskqueue.push( new NextExchangeTask() ); 00196 00197 // define the next exchange timestamp 00198 _next_exchange_timestamp = time.getUnixTimestamp() + _next_exchange_timeout; 00199 00200 ibrcommon::MutexLock l(_acknowledgementSet); 00201 _acknowledgementSet.purge(); 00202 } 00203 return; 00204 } catch (const std::bad_cast&) { }; 00205 00206 #ifndef DISABLE_MAP_STORE 00207 /* in case of timer or node disconnect event, remove corresponding _mapStore items */ 00208 try { 00209 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00210 const dtn::data::EID &neighbor = nodeevent.getNode().getEID().getNode(); 00211 00212 if (nodeevent.getAction() == NODE_UNAVAILABLE) 00213 { 00214 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00215 _mapStore.erase(neighbor); 00216 } 00217 return; 00218 } catch (const std::bad_cast&) { }; 00219 #endif 00220 00221 // If an incoming bundle is received, forward it to all connected neighbors 00222 try { 00223 dynamic_cast<const QueueBundleEvent&>(*evt); 00224 00225 // new bundles trigger a recheck for all neighbors 00226 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00227 00228 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00229 { 00230 const dtn::core::Node &n = (*iter); 00231 00232 // transfer the next bundle to this destination 00233 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00234 } 00235 return; 00236 } catch (const std::bad_cast&) { }; 00237 00238 // If a new neighbor comes available search for bundles 00239 try { 00240 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00241 const dtn::core::Node &n = nodeevent.getNode(); 00242 00243 if (nodeevent.getAction() == NODE_AVAILABLE) 00244 { 00245 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00246 } 00247 00248 return; 00249 } catch (const std::bad_cast&) { }; 00250 00251 try { 00252 const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt); 00253 00254 if (handshake.state == NodeHandshakeEvent::HANDSHAKE_UPDATED) 00255 { 00256 // transfer the next bundle to this destination 00257 _taskqueue.push( new SearchNextBundleTask( handshake.peer ) ); 00258 } 00259 else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED) 00260 { 00261 // transfer the next bundle to this destination 00262 _taskqueue.push( new SearchNextBundleTask( handshake.peer ) ); 00263 } 00264 return; 00265 } catch (const std::bad_cast&) { }; 00266 00267 // The bundle transfer has been aborted 00268 try { 00269 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00270 00271 // transfer the next bundle to this destination 00272 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00273 00274 return; 00275 } catch (const std::bad_cast&) { }; 00276 00277 // A bundle transfer was successful 00278 try { 00279 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00280 const dtn::data::MetaBundle &meta = completed.getBundle(); 00281 const dtn::data::EID &peer = completed.getPeer(); 00282 00283 if ((meta.destination.getNode() == peer.getNode()) 00284 /* send prophet ack only for singleton */ 00285 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00286 { 00287 /* the bundle was transferred, mark it as acknowledged */ 00288 ibrcommon::MutexLock l(_acknowledgementSet); 00289 _acknowledgementSet.insert(Acknowledgement(meta, meta.expiretime)); 00290 } 00291 00292 // add forwarded entry to GTMX strategy 00293 try { 00294 GTMX_Strategy >mx = dynamic_cast<GTMX_Strategy&>(*_forwardingStrategy); 00295 gtmx.addForward(meta); 00296 } catch (const std::bad_cast &ex) { }; 00297 00298 // search for the next bundle 00299 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) ); 00300 return; 00301 } catch (const std::bad_cast&) { }; 00302 } 00303 00304 ibrcommon::ThreadsafeReference<ProphetRoutingExtension::DeliveryPredictabilityMap> ProphetRoutingExtension::getDeliveryPredictabilityMap() 00305 { 00306 { 00307 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00308 age(); 00309 } 00310 return ibrcommon::ThreadsafeReference<DeliveryPredictabilityMap>(_deliveryPredictabilityMap, _deliveryPredictabilityMap); 00311 } 00312 00313 ibrcommon::ThreadsafeReference<const ProphetRoutingExtension::DeliveryPredictabilityMap> ProphetRoutingExtension::getDeliveryPredictabilityMap() const 00314 { 00315 return ibrcommon::ThreadsafeReference<const DeliveryPredictabilityMap>(_deliveryPredictabilityMap, const_cast<DeliveryPredictabilityMap&>(_deliveryPredictabilityMap)); 00316 } 00317 00318 ibrcommon::ThreadsafeReference<const ProphetRoutingExtension::AcknowledgementSet> ProphetRoutingExtension::getAcknowledgementSet() const 00319 { 00320 return ibrcommon::ThreadsafeReference<const AcknowledgementSet>(_acknowledgementSet, const_cast<AcknowledgementSet&>(_acknowledgementSet)); 00321 } 00322 00323 void ProphetRoutingExtension::ProphetRoutingExtension::run() 00324 { 00325 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00326 { 00327 public: 00328 BundleFilter(const NeighborDatabase::NeighborEntry &entry, ForwardingStrategy &strategy) 00329 : _entry(entry), _strategy(strategy) 00330 {}; 00331 00332 virtual ~BundleFilter() {}; 00333 00334 virtual size_t limit() const { return 10; }; 00335 00336 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00337 { 00338 // check Scope Control Block - do not forward bundles with hop limit == 0 00339 if (meta.hopcount == 0) 00340 { 00341 return false; 00342 } 00343 00344 // do not forward any routing control message 00345 // this is done by the neighbor routing module 00346 if (isRouting(meta.source)) 00347 { 00348 return false; 00349 } 00350 00351 // do not forward local bundles 00352 if ((meta.destination.getNode() == dtn::core::BundleCore::local) 00353 && meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON) 00354 ) 00355 { 00356 return false; 00357 } 00358 00359 // check Scope Control Block - do not forward non-group bundles with hop limit <= 1 00360 if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))) 00361 { 00362 return false; 00363 } 00364 00365 // do not forward to any blacklisted destination 00366 const dtn::data::EID dest = meta.destination.getNode(); 00367 if (_blacklist.find(dest) != _blacklist.end()) 00368 { 00369 return false; 00370 } 00371 00372 // do not forward bundles already known by the destination 00373 // throws BloomfilterNotAvailableException if no filter is available or it is expired 00374 if (_entry.has(meta, true)) 00375 { 00376 return false; 00377 } 00378 00379 // ask the routing strategy if this bundle should be selected 00380 if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)) 00381 { 00382 return _strategy.shallForward(_entry.eid, meta); 00383 } 00384 00385 return true; 00386 } 00387 00388 void blacklist(const dtn::data::EID& id) 00389 { 00390 _blacklist.insert(id); 00391 } 00392 00393 private: 00394 std::set<dtn::data::EID> _blacklist; 00395 const NeighborDatabase::NeighborEntry &_entry; 00396 const ForwardingStrategy &_strategy; 00397 }; 00398 00399 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00400 00401 { 00402 ibrcommon::MutexLock l(_next_exchange_mutex); 00403 // define the next exchange timestamp 00404 _next_exchange_timestamp = dtn::utils::Clock::getUnixTimestamp() + _next_exchange_timeout; 00405 } 00406 00407 while (true) 00408 { 00409 try { 00410 Task *t = _taskqueue.getnpop(true); 00411 std::auto_ptr<Task> killer(t); 00412 00413 IBRCOMMON_LOGGER_DEBUG(50) << "processing prophet task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00414 00415 try { 00421 try { 00422 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00423 NeighborDatabase &db = (**this).getNeighborDB(); 00424 00425 ibrcommon::MutexLock l(db); 00426 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00427 00428 try { 00429 // get the bundle filter of the neighbor 00430 BundleFilter filter(entry, *_forwardingStrategy); 00431 00432 // some debug output 00433 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00434 00435 // blacklist the neighbor itself, because this is handled by neighbor routing extension 00436 filter.blacklist(task.eid); 00437 00438 // query some unknown bundle from the storage, the list contains max. 10 items. 00439 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00440 00441 // send the bundles as long as we have resources 00442 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00443 { 00444 const dtn::data::MetaBundle &meta = (*iter); 00445 00446 try { 00447 transferTo(entry, meta); 00448 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00449 } 00450 } catch (const NeighborDatabase::BloomfilterNotAvailableException&) { 00451 // query a new summary vector from this neighbor 00452 (**this).doHandshake(task.eid); 00453 } 00454 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00455 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00456 } catch (const std::bad_cast&) { } 00457 00462 try { 00463 dynamic_cast<NextExchangeTask&>(*t); 00464 00465 #ifndef DISABLE_MAP_STORE 00466 { 00467 /* lock the dp map when accessing the mapStore */ 00468 ibrcommon::MutexLock l(_deliveryPredictabilityMap); 00469 _mapStore.clear(); 00470 } 00471 #endif 00472 00473 std::set<dtn::core::Node> neighbors = dtn::core::BundleCore::getInstance().getNeighbors(); 00474 std::set<dtn::core::Node>::const_iterator it; 00475 for(it = neighbors.begin(); it != neighbors.end(); ++it) 00476 { 00477 try{ 00478 (**this).doHandshake(it->getEID()); 00479 } catch (const ibrcommon::Exception &ex) { } 00480 } 00481 } catch (const std::bad_cast&) { } 00482 00483 } catch (const ibrcommon::Exception &ex) { 00484 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in ProphetRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00485 } 00486 } catch (const std::exception&) { 00487 return; 00488 } 00489 00490 yield(); 00491 } 00492 } 00493 00494 void ProphetRoutingExtension::__cancellation() 00495 { 00496 { 00497 ibrcommon::MutexLock l(_next_exchange_mutex); 00498 _next_exchange_timestamp = 0; 00499 } 00500 _taskqueue.abort(); 00501 } 00502 00503 float ProphetRoutingExtension::p_encounter(const dtn::data::EID &neighbor) const 00504 { 00505 age_map::const_iterator it = _ageMap.find(neighbor); 00506 if(it == _ageMap.end()) 00507 { 00508 /* In this case, we got a transitive update for the node earlier but havent encountered it ourselves */ 00509 return _p_encounter_max; 00510 } 00511 00512 size_t currentTime = dtn::utils::Clock::getUnixTimestamp(); 00513 size_t time_diff = currentTime - it->second; 00514 #ifdef __DEVELOPMENT_ASSERTIONS__ 00515 assert(currentTime >= it->second && "the ageMap timestamp should be smaller than the current timestamp"); 00516 #endif 00517 if(time_diff > _i_typ) 00518 { 00519 return _p_encounter_max; 00520 } 00521 else 00522 { 00523 return _p_encounter_max * ((float) time_diff / _i_typ); 00524 } 00525 } 00526 00527 const size_t ProphetRoutingExtension::DeliveryPredictabilityMap::identifier = NodeHandshakeItem::DELIVERY_PREDICTABILITY_MAP; 00528 const size_t ProphetRoutingExtension::AcknowledgementSet::identifier = NodeHandshakeItem::PROPHET_ACKNOWLEDGEMENT_SET; 00529 00530 size_t ProphetRoutingExtension::DeliveryPredictabilityMap::DeliveryPredictabilityMap::getIdentifier() const 00531 { 00532 return identifier; 00533 } 00534 00535 size_t ProphetRoutingExtension::DeliveryPredictabilityMap::getLength() const 00536 { 00537 size_t len = 0; 00538 for(const_iterator it = begin(); it != end(); ++it) 00539 { 00540 /* calculate length of the EID */ 00541 const std::string eid = it->first.getString(); 00542 size_t eid_len = eid.length(); 00543 len += data::SDNV(eid_len).getLength() + eid_len; 00544 00545 /* calculate length of the float in fixed notation */ 00546 const float& f = it->second; 00547 std::stringstream ss; 00548 ss << f << std::flush; 00549 00550 size_t float_len = ss.str().length(); 00551 len += data::SDNV(float_len).getLength() + float_len; 00552 } 00553 return data::SDNV(size()).getLength() + len; 00554 } 00555 00556 std::ostream& ProphetRoutingExtension::DeliveryPredictabilityMap::serialize(std::ostream& stream) const 00557 { 00558 stream << data::SDNV(size()); 00559 for(const_iterator it = begin(); it != end(); ++it) 00560 { 00561 const std::string eid = it->first.getString(); 00562 stream << data::SDNV(eid.length()) << eid; 00563 00564 const float& f = it->second; 00565 /* write f into a stringstream to get final length */ 00566 std::stringstream ss; 00567 ss << f << std::flush; 00568 00569 stream << data::SDNV(ss.str().length()); 00570 stream << ss.str(); 00571 } 00572 IBRCOMMON_LOGGER_DEBUG(20) << "ProphetRouting: Serialized DeliveryPredictabilityMap with " << size() << " items." << IBRCOMMON_LOGGER_ENDL; 00573 IBRCOMMON_LOGGER_DEBUG(60) << *this << IBRCOMMON_LOGGER_ENDL; 00574 return stream; 00575 } 00576 00577 std::istream& ProphetRoutingExtension::DeliveryPredictabilityMap::deserialize(std::istream& stream) 00578 { 00579 data::SDNV elements_read(0); 00580 data::SDNV map_size; 00581 stream >> map_size; 00582 00583 while(elements_read < map_size) 00584 { 00585 /* read the EID */ 00586 data::SDNV eid_len; 00587 stream >> eid_len; 00588 char eid_cstr[eid_len.getValue()+1]; 00589 stream.read(eid_cstr, sizeof(eid_cstr)-1); 00590 eid_cstr[sizeof(eid_cstr)-1] = 0; 00591 data::EID eid(eid_cstr); 00592 if(eid == data::EID()) 00593 throw dtn::InvalidDataException("EID could not be casted, while parsing a dp_map."); 00594 00595 /* read the probability (float) */ 00596 float f; 00597 data::SDNV float_len; 00598 stream >> float_len; 00599 char f_cstr[float_len.getValue()+1]; 00600 stream.read(f_cstr, sizeof(f_cstr)-1); 00601 f_cstr[sizeof(f_cstr)-1] = 0; 00602 00603 std::stringstream ss(f_cstr); 00604 ss >> f; 00605 if(ss.fail()) 00606 throw dtn::InvalidDataException("Float could not be casted, while parsing a dp_map."); 00607 00608 /* check if f is in a proper range */ 00609 if(f < 0 || f > 1) 00610 continue; 00611 00612 /* insert the data into the map */ 00613 (*this)[eid] = f; 00614 00615 elements_read += 1; 00616 } 00617 00618 IBRCOMMON_LOGGER_DEBUG(20) << "ProphetRouting: Deserialized DeliveryPredictabilityMap with " << size() << " items." << IBRCOMMON_LOGGER_ENDL; 00619 IBRCOMMON_LOGGER_DEBUG(60) << *this << IBRCOMMON_LOGGER_ENDL; 00620 return stream; 00621 } 00622 00623 void ProphetRoutingExtension::updateNeighbor(const dtn::data::EID &neighbor) 00624 { 00628 DeliveryPredictabilityMap::const_iterator it; 00629 float neighbor_dp = _p_encounter_first; 00630 00631 if ((it = _deliveryPredictabilityMap.find(neighbor)) != _deliveryPredictabilityMap.end()) 00632 { 00633 neighbor_dp = it->second; 00634 00635 if(it->second < _p_first_threshold) 00636 { 00637 neighbor_dp = _p_encounter_first; 00638 } 00639 else 00640 { 00641 neighbor_dp += (1 - _delta - it->second) * p_encounter(neighbor); 00642 } 00643 } 00644 00645 _deliveryPredictabilityMap[neighbor] = neighbor_dp; 00646 _ageMap[neighbor] = dtn::utils::Clock::getUnixTimestamp(); 00647 } 00648 00649 void ProphetRoutingExtension::update(const DeliveryPredictabilityMap& neighbor_dp_map, const dtn::data::EID& neighbor) 00650 { 00651 DeliveryPredictabilityMap::const_iterator it; 00652 float neighbor_dp = _p_encounter_first; 00653 00654 if ((it = _deliveryPredictabilityMap.find(neighbor)) != _deliveryPredictabilityMap.end()) 00655 { 00656 neighbor_dp = it->second; 00657 } 00658 00662 for (it = neighbor_dp_map.begin(); it != neighbor_dp_map.end(); ++it) 00663 { 00664 if ((it->first != neighbor) && (it->first != dtn::core::BundleCore::local)) 00665 { 00666 float dp = 0; 00667 00668 DeliveryPredictabilityMap::iterator dp_it; 00669 if((dp_it = _deliveryPredictabilityMap.find(it->first)) != _deliveryPredictabilityMap.end()) 00670 dp = dp_it->second; 00671 00672 dp = max(dp, neighbor_dp * it->second * _beta); 00673 00674 if(dp_it != _deliveryPredictabilityMap.end()) 00675 dp_it->second = dp; 00676 else 00677 _deliveryPredictabilityMap[it->first] = dp; 00678 } 00679 } 00680 } 00681 00682 void ProphetRoutingExtension::age() 00683 { 00684 size_t current_time = dtn::utils::Clock::getUnixTimestamp(); 00685 00686 // prevent double aging 00687 if (current_time <= _lastAgingTime) return; 00688 00689 unsigned int k = (current_time - _lastAgingTime) / _time_unit; 00690 00691 DeliveryPredictabilityMap::iterator it; 00692 for(it = _deliveryPredictabilityMap.begin(); it != _deliveryPredictabilityMap.end();) 00693 { 00694 if(it->first == dtn::core::BundleCore::local) 00695 { 00696 ++it; 00697 continue; 00698 } 00699 00700 it->second *= pow(_gamma, (int)k); 00701 00702 if(it->second < _p_first_threshold) 00703 { 00704 _deliveryPredictabilityMap.erase(it++); 00705 } else { 00706 ++it; 00707 } 00708 } 00709 00710 _lastAgingTime = current_time; 00711 } 00712 00713 std::ostream& operator<<(std::ostream& stream, const ProphetRoutingExtension::DeliveryPredictabilityMap& map) 00714 { 00715 ProphetRoutingExtension::DeliveryPredictabilityMap::const_iterator it; 00716 for(it = map.begin(); it != map.end(); ++it) 00717 { 00718 stream << it->first.getString() << ": " << it->second << std::endl; 00719 } 00720 00721 return stream; 00722 } 00723 00724 std::ostream& operator<<(std::ostream& stream, const ProphetRoutingExtension::AcknowledgementSet& ack_set) 00725 { 00726 std::set<ProphetRoutingExtension::Acknowledgement>::const_iterator it; 00727 for(it = ack_set._ackSet.begin(); it != ack_set._ackSet.end(); ++it) 00728 { 00729 stream << it->bundleID.toString() << std::endl; 00730 stream << it->expire_time << std::endl; 00731 } 00732 00733 return stream; 00734 } 00735 00736 ProphetRoutingExtension::Acknowledgement::Acknowledgement() 00737 { 00738 } 00739 00740 ProphetRoutingExtension::Acknowledgement::Acknowledgement(const dtn::data::BundleID &bundleID, size_t expire_time) 00741 : bundleID(bundleID), expire_time(expire_time) 00742 { 00743 } 00744 00745 ProphetRoutingExtension::Acknowledgement::~Acknowledgement() 00746 { 00747 } 00748 00749 bool ProphetRoutingExtension::Acknowledgement::operator<(const Acknowledgement &other) const 00750 { 00751 return (bundleID < other.bundleID); 00752 } 00753 00754 ProphetRoutingExtension::AcknowledgementSet::AcknowledgementSet() 00755 { 00756 } 00757 00758 ProphetRoutingExtension::AcknowledgementSet::AcknowledgementSet(const AcknowledgementSet &other) 00759 : ibrcommon::Mutex(), _ackSet(other._ackSet) 00760 { 00761 } 00762 00763 void ProphetRoutingExtension::AcknowledgementSet::insert(const Acknowledgement& ack) 00764 { 00765 _ackSet.insert(ack); 00766 } 00767 00768 void ProphetRoutingExtension::AcknowledgementSet::purge() 00769 { 00770 std::set<Acknowledgement>::iterator it; 00771 for(it = _ackSet.begin(); it != _ackSet.end();) 00772 { 00773 if(dtn::utils::Clock::isExpired(it->expire_time)) 00774 _ackSet.erase(it++); 00775 else 00776 ++it; 00777 } 00778 } 00779 00780 void ProphetRoutingExtension::AcknowledgementSet::merge(const AcknowledgementSet &other) 00781 { 00782 std::set<Acknowledgement>::iterator it; 00783 for(it = other._ackSet.begin(); it != other._ackSet.end(); ++it) 00784 { 00785 _ackSet.insert(*it); 00786 } 00787 } 00788 00789 bool ProphetRoutingExtension::AcknowledgementSet::has(const dtn::data::BundleID &bundle) const 00790 { 00791 return _ackSet.find(Acknowledgement(bundle, 0)) != _ackSet.end(); 00792 } 00793 00794 size_t ProphetRoutingExtension::AcknowledgementSet::getIdentifier() const 00795 { 00796 return identifier; 00797 } 00798 size_t ProphetRoutingExtension::AcknowledgementSet::getLength() const 00799 { 00800 std::stringstream ss; 00801 serialize(ss); 00802 return ss.str().length(); 00803 } 00804 00805 std::ostream& ProphetRoutingExtension::AcknowledgementSet::serialize(std::ostream& stream) const 00806 { 00807 stream << dtn::data::SDNV(_ackSet.size()); 00808 std::set<Acknowledgement>::const_iterator it; 00809 for(it = _ackSet.begin(); it != _ackSet.end(); ++it) 00810 { 00811 it->serialize(stream); 00812 } 00813 return stream; 00814 } 00815 00816 std::istream& ProphetRoutingExtension::AcknowledgementSet::deserialize(std::istream& stream) 00817 { 00818 dtn::data::SDNV size; 00819 stream >> size; 00820 00821 for(dtn::data::SDNV i = 0; i < size; i += 1) 00822 { 00823 Acknowledgement ack; 00824 ack.deserialize(stream); 00825 _ackSet.insert(ack); 00826 } 00827 return stream; 00828 } 00829 00830 std::ostream& ProphetRoutingExtension::Acknowledgement::serialize(std::ostream& stream) const 00831 { 00832 stream << bundleID; 00833 stream << dtn::data::SDNV(expire_time); 00834 return stream; 00835 } 00836 00837 std::istream& ProphetRoutingExtension::Acknowledgement::deserialize(std::istream& stream) 00838 { 00839 dtn::data::SDNV expire_time; 00840 stream >> bundleID; 00841 stream >> expire_time; 00842 this->expire_time = expire_time.getValue(); 00843 /* TODO only accept entries with bundles that we routed ourselves? */ 00844 00845 if (BundleCore::max_lifetime > 0 && this->expire_time > BundleCore::max_lifetime) 00846 this->expire_time = BundleCore::max_lifetime; 00847 00848 return stream; 00849 } 00850 00851 ProphetRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &eid) 00852 : eid(eid) 00853 { 00854 } 00855 00856 ProphetRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00857 { 00858 } 00859 00860 std::string ProphetRoutingExtension::SearchNextBundleTask::toString() const 00861 { 00862 return "SearchNextBundleTask: " + eid.getString(); 00863 } 00864 00865 ProphetRoutingExtension::NextExchangeTask::NextExchangeTask() 00866 { 00867 } 00868 00869 ProphetRoutingExtension::NextExchangeTask::~NextExchangeTask() 00870 { 00871 } 00872 00873 std::string ProphetRoutingExtension::NextExchangeTask::toString() const 00874 { 00875 return "NextExchangeTask"; 00876 } 00877 00878 ProphetRoutingExtension::ForwardingStrategy::ForwardingStrategy() 00879 : _prophet_router(NULL) 00880 {} 00881 00882 ProphetRoutingExtension::ForwardingStrategy::~ForwardingStrategy() 00883 {} 00884 00885 bool ProphetRoutingExtension::ForwardingStrategy::neighborDPIsGreater(const dtn::data::EID& neighbor, const dtn::data::EID& destination) const 00886 { 00887 bool ret = false; 00888 const DeliveryPredictabilityMap& dp_map = _prophet_router->_deliveryPredictabilityMap; 00889 00890 DeliveryPredictabilityMap::const_iterator neighborIT = dp_map.find(neighbor); 00891 DeliveryPredictabilityMap::const_iterator destinationIT = dp_map.find(destination); 00892 00893 if(destinationIT == dp_map.end()) { 00894 ret = true; 00895 } else { 00896 float neighbor_dp = _prophet_router->_p_first_threshold; 00897 if(neighborIT != dp_map.end()) { 00898 neighbor_dp = neighborIT->second; 00899 } 00900 ret = (neighbor_dp > destinationIT->second); 00901 } 00902 00903 return ret; 00904 } 00905 00906 void ProphetRoutingExtension::ForwardingStrategy::setProphetRouter(ProphetRoutingExtension *router) 00907 { 00908 _prophet_router = router; 00909 } 00910 00911 ProphetRoutingExtension::GRTR_Strategy::GRTR_Strategy() 00912 { 00913 } 00914 00915 ProphetRoutingExtension::GRTR_Strategy::~GRTR_Strategy() 00916 { 00917 } 00918 00919 bool ProphetRoutingExtension::GRTR_Strategy::shallForward(const dtn::data::EID& neighbor, const dtn::data::MetaBundle& bundle) const 00920 { 00921 return neighborDPIsGreater(neighbor, bundle.destination); 00922 } 00923 00924 ProphetRoutingExtension::GTMX_Strategy::GTMX_Strategy(unsigned int NF_max) 00925 : _NF_max(NF_max) 00926 { 00927 } 00928 00929 ProphetRoutingExtension::GTMX_Strategy::~GTMX_Strategy() 00930 { 00931 } 00932 00933 void ProphetRoutingExtension::GTMX_Strategy::addForward(const dtn::data::BundleID &id) 00934 { 00935 nf_map::iterator nf_it = _NF_map.find(id); 00936 00937 if (nf_it == _NF_map.end()) { 00938 nf_it = _NF_map.insert(std::make_pair(id, 0)).first; 00939 } 00940 00941 ++nf_it->second; 00942 } 00943 00944 bool ProphetRoutingExtension::GTMX_Strategy::shallForward(const dtn::data::EID &neighbor, const dtn::data::MetaBundle& bundle) const 00945 { 00946 unsigned int NF = 0; 00947 00948 nf_map::const_iterator nf_it = _NF_map.find(bundle); 00949 if(nf_it != _NF_map.end()) { 00950 NF = nf_it->second; 00951 } 00952 00953 if (NF > _NF_max) return false; 00954 00955 return neighborDPIsGreater(neighbor, bundle.destination); 00956 } 00957 00958 } // namespace routing 00959 } // namespace dtn