IBR-DTNSuite
0.8
|
00001 /* 00002 * EpidemicRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/epidemic/EpidemicRoutingExtension.h" 00009 00010 #include "routing/QueueBundleEvent.h" 00011 #include "routing/NodeHandshakeEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "core/NodeEvent.h" 00015 #include "core/TimeEvent.h" 00016 #include "core/Node.h" 00017 #include "net/ConnectionManager.h" 00018 #include "Configuration.h" 00019 #include "core/BundleCore.h" 00020 #include "core/BundleEvent.h" 00021 00022 #include <ibrdtn/data/MetaBundle.h> 00023 #include <ibrcommon/thread/MutexLock.h> 00024 #include <ibrcommon/Logger.h> 00025 00026 #include <functional> 00027 #include <list> 00028 #include <algorithm> 00029 00030 #include <iomanip> 00031 #include <ios> 00032 #include <iostream> 00033 #include <set> 00034 #include <memory> 00035 00036 #include <stdlib.h> 00037 #include <typeinfo> 00038 00039 namespace dtn 00040 { 00041 namespace routing 00042 { 00043 EpidemicRoutingExtension::EpidemicRoutingExtension() 00044 { 00045 // write something to the syslog 00046 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL; 00047 } 00048 00049 EpidemicRoutingExtension::~EpidemicRoutingExtension() 00050 { 00051 stop(); 00052 join(); 00053 } 00054 00055 void EpidemicRoutingExtension::requestHandshake(const dtn::data::EID&, NodeHandshake &request) const 00056 { 00057 request.addRequest(BloomFilterSummaryVector::identifier); 00058 } 00059 00060 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt) 00061 { 00062 // If an incoming bundle is received, forward it to all connected neighbors 00063 try { 00064 dynamic_cast<const QueueBundleEvent&>(*evt); 00065 00066 // new bundles trigger a recheck for all neighbors 00067 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00068 00069 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00070 { 00071 const dtn::core::Node &n = (*iter); 00072 00073 // transfer the next bundle to this destination 00074 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00075 } 00076 return; 00077 } catch (const std::bad_cast&) { }; 00078 00079 // If a new neighbor comes available search for bundles 00080 try { 00081 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00082 const dtn::core::Node &n = nodeevent.getNode(); 00083 00084 if (nodeevent.getAction() == NODE_AVAILABLE) 00085 { 00086 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00087 } 00088 00089 return; 00090 } catch (const std::bad_cast&) { }; 00091 00092 try { 00093 const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt); 00094 00095 if (handshake.state == NodeHandshakeEvent::HANDSHAKE_UPDATED) 00096 { 00097 // transfer the next bundle to this destination 00098 _taskqueue.push( new SearchNextBundleTask( handshake.peer ) ); 00099 } 00100 else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED) 00101 { 00102 // transfer the next bundle to this destination 00103 _taskqueue.push( new SearchNextBundleTask( handshake.peer ) ); 00104 } 00105 return; 00106 } catch (const std::bad_cast&) { }; 00107 00108 // The bundle transfer has been aborted 00109 try { 00110 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00111 00112 // transfer the next bundle to this destination 00113 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00114 00115 return; 00116 } catch (const std::bad_cast&) { }; 00117 00118 // A bundle transfer was successful 00119 try { 00120 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00121 00122 // transfer the next bundle to this destination 00123 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) ); 00124 return; 00125 } catch (const std::bad_cast&) { }; 00126 } 00127 00128 void EpidemicRoutingExtension::__cancellation() 00129 { 00130 _taskqueue.abort(); 00131 } 00132 00133 void EpidemicRoutingExtension::run() 00134 { 00135 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00136 { 00137 public: 00138 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00139 : _entry(entry) 00140 {}; 00141 00142 virtual ~BundleFilter() {}; 00143 00144 virtual size_t limit() const { return 10; }; 00145 00146 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00147 { 00148 // check Scope Control Block - do not forward bundles with hop limit == 0 00149 if (meta.hopcount == 0) 00150 { 00151 return false; 00152 } 00153 00154 // do not forward any routing control message 00155 // this is done by the neighbor routing module 00156 if (isRouting(meta.source)) 00157 { 00158 return false; 00159 } 00160 00161 // do not forward local bundles 00162 if ((meta.destination.getNode() == dtn::core::BundleCore::local) 00163 && meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON) 00164 ) 00165 { 00166 return false; 00167 } 00168 00169 // check Scope Control Block - do not forward non-group bundles with hop limit <= 1 00170 if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))) 00171 { 00172 return false; 00173 } 00174 00175 // do not forward to any blacklisted destination 00176 const dtn::data::EID dest = meta.destination.getNode(); 00177 if (_blacklist.find(dest) != _blacklist.end()) 00178 { 00179 return false; 00180 } 00181 00182 // do not forward bundles already known by the destination 00183 // throws BloomfilterNotAvailableException if no filter is available or it is expired 00184 if (_entry.has(meta, true)) 00185 { 00186 return false; 00187 } 00188 00189 return true; 00190 }; 00191 00192 void blacklist(const dtn::data::EID& id) 00193 { 00194 _blacklist.insert(id); 00195 }; 00196 00197 private: 00198 std::set<dtn::data::EID> _blacklist; 00199 const NeighborDatabase::NeighborEntry &_entry; 00200 }; 00201 00202 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00203 00204 while (true) 00205 { 00206 try { 00207 Task *t = _taskqueue.getnpop(true); 00208 std::auto_ptr<Task> killer(t); 00209 00210 IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00211 00212 try { 00218 try { 00219 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00220 NeighborDatabase &db = (**this).getNeighborDB(); 00221 00222 ibrcommon::MutexLock l(db); 00223 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00224 00225 try { 00226 // get the bundle filter of the neighbor 00227 BundleFilter filter(entry); 00228 00229 // some debug output 00230 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00231 00232 // blacklist the neighbor itself, because this is handled by neighbor routing extension 00233 filter.blacklist(task.eid); 00234 00235 // query some unknown bundle from the storage, the list contains max. 10 items. 00236 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00237 00238 // send the bundles as long as we have resources 00239 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00240 { 00241 try { 00242 // transfer the bundle to the neighbor 00243 transferTo(entry, *iter); 00244 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00245 } 00246 } catch (const NeighborDatabase::BloomfilterNotAvailableException&) { 00247 // query a new summary vector from this neighbor 00248 (**this).doHandshake(task.eid); 00249 } 00250 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00251 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00252 } catch (const std::bad_cast&) { }; 00253 } catch (const ibrcommon::Exception &ex) { 00254 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00255 } 00256 } catch (const std::exception&) { 00257 return; 00258 } 00259 00260 yield(); 00261 } 00262 } 00263 00264 /****************************************/ 00265 00266 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00267 : eid(e) 00268 { } 00269 00270 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00271 { } 00272 00273 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString() 00274 { 00275 return "SearchNextBundleTask: " + eid.getString(); 00276 } 00277 } 00278 }