IBR-DTNSuite
0.8
|
00001 /* 00002 * NeighborRoutingExtension.cpp 00003 * 00004 * Created on: 16.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "routing/NeighborRoutingExtension.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/TimeEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "net/ConnectionEvent.h" 00015 #include "core/BundlePurgeEvent.h" 00016 #include "core/NodeEvent.h" 00017 #include "core/Node.h" 00018 #include "net/ConnectionManager.h" 00019 #include "ibrcommon/thread/MutexLock.h" 00020 #include "storage/SimpleBundleStorage.h" 00021 #include "core/BundleEvent.h" 00022 #include <ibrcommon/Logger.h> 00023 00024 #ifdef HAVE_SQLITE 00025 #include "storage/SQLiteBundleStorage.h" 00026 #endif 00027 00028 #include <functional> 00029 #include <list> 00030 #include <algorithm> 00031 #include <typeinfo> 00032 #include <memory> 00033 00034 namespace dtn 00035 { 00036 namespace routing 00037 { 00038 NeighborRoutingExtension::NeighborRoutingExtension() 00039 { 00040 } 00041 00042 NeighborRoutingExtension::~NeighborRoutingExtension() 00043 { 00044 stop(); 00045 join(); 00046 } 00047 00048 void NeighborRoutingExtension::__cancellation() 00049 { 00050 _taskqueue.abort(); 00051 } 00052 00053 void NeighborRoutingExtension::run() 00054 { 00055 #ifdef HAVE_SQLITE 00056 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback, public dtn::storage::SQLiteDatabase::SQLBundleQuery 00057 #else 00058 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00059 #endif 00060 { 00061 public: 00062 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00063 : _entry(entry) 00064 {}; 00065 00066 virtual ~BundleFilter() {}; 00067 00068 virtual size_t limit() const { return 10; }; 00069 00070 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00071 { 00072 // check Scope Control Block - do not forward bundles with hop limit == 0 00073 if (meta.hopcount == 0) 00074 { 00075 return false; 00076 } 00077 00078 if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)) 00079 { 00080 // do not forward local bundles 00081 if (meta.destination.getNode() == dtn::core::BundleCore::local) 00082 { 00083 return false; 00084 } 00085 00086 // do not forward bundles for other nodes 00087 if (_entry.eid.getNode() != meta.destination.getNode()) 00088 { 00089 return false; 00090 } 00091 } 00092 00093 // do not forward bundles already known by the destination 00094 if (_entry.has(meta)) 00095 { 00096 return false; 00097 } 00098 00099 return true; 00100 }; 00101 00102 #ifdef HAVE_SQLITE 00103 const std::string getWhere() const 00104 { 00105 return "destination LIKE ?"; 00106 }; 00107 00108 size_t bind(sqlite3_stmt *st, size_t offset) const 00109 { 00110 const std::string d = _entry.eid.getNode().getString() + "%"; 00111 sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT); 00112 return offset + 1; 00113 } 00114 #endif 00115 00116 private: 00117 const NeighborDatabase::NeighborEntry &_entry; 00118 }; 00119 00120 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00121 00122 while (true) 00123 { 00124 NeighborDatabase &db = (**this).getNeighborDB(); 00125 00126 try { 00127 Task *t = _taskqueue.getnpop(true); 00128 std::auto_ptr<Task> killer(t); 00129 00130 IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00131 00137 try { 00138 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00139 00140 // this destination is not handles by any static route 00141 ibrcommon::MutexLock l(db); 00142 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00143 00144 // create a new bundle filter 00145 BundleFilter filter(entry); 00146 00147 // query an unknown bundle from the storage, the list contains max. 10 items. 00148 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00149 00150 IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00151 00152 // send the bundles as long as we have resources 00153 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00154 { 00155 try { 00156 // transfer the bundle to the neighbor 00157 transferTo(entry, *iter); 00158 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00159 } 00160 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00161 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00162 } catch (const std::bad_cast&) { }; 00163 00167 try { 00168 dynamic_cast<ProcessBundleTask&>(*t); 00169 00170 // new bundles trigger a recheck for all neighbors 00171 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00172 00173 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00174 { 00175 const dtn::core::Node &n = (*iter); 00176 00177 // transfer the next bundle to this destination 00178 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00179 } 00180 } catch (const std::bad_cast&) { }; 00181 00182 } catch (const std::exception &ex) { 00183 IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00184 return; 00185 } 00186 00187 yield(); 00188 } 00189 } 00190 00191 void NeighborRoutingExtension::notify(const dtn::core::Event *evt) 00192 { 00193 try { 00194 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00195 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) ); 00196 return; 00197 } catch (const std::bad_cast&) { }; 00198 00199 try { 00200 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00201 const dtn::data::MetaBundle &meta = completed.getBundle(); 00202 const dtn::data::EID &peer = completed.getPeer(); 00203 00204 if ((meta.destination.getNode() == peer.getNode()) 00205 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00206 { 00207 try { 00208 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00209 00210 // bundle has been delivered to its destination 00211 // delete it from our storage 00212 dtn::core::BundlePurgeEvent::raise(meta); 00213 00214 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00215 00216 // gen a report 00217 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00218 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { }; 00219 00220 // transfer the next bundle to this destination 00221 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00222 } 00223 return; 00224 } catch (const std::bad_cast&) { }; 00225 00226 try { 00227 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00228 const dtn::data::EID &peer = aborted.getPeer(); 00229 const dtn::data::BundleID &id = aborted.getBundleID(); 00230 00231 switch (aborted.reason) 00232 { 00233 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED: 00234 break; 00235 00236 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED: 00237 break; 00238 00239 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED: 00240 break; 00241 00242 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN: 00243 return; 00244 00245 case dtn::net::TransferAbortedEvent::REASON_REFUSED: 00246 { 00247 try { 00248 const dtn::data::MetaBundle meta = (**this).getStorage().get(id); 00249 00250 // if the bundle has been sent by this module delete it 00251 if ((meta.destination.getNode() == peer.getNode()) 00252 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00253 { 00254 // bundle is not deliverable 00255 dtn::core::BundlePurgeEvent::raise(id, dtn::data::StatusReportBlock::NO_KNOWN_ROUTE_TO_DESTINATION_FROM_HERE); 00256 } 00257 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { }; 00258 } 00259 break; 00260 } 00261 00262 // transfer the next bundle to this destination 00263 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00264 00265 return; 00266 } catch (const std::bad_cast&) { }; 00267 00268 // If a new neighbor comes available, send him a request for the summary vector 00269 // If a neighbor went away we can free the stored summary vector 00270 try { 00271 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00272 const dtn::core::Node &n = nodeevent.getNode(); 00273 00274 if (nodeevent.getAction() == NODE_AVAILABLE) 00275 { 00276 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00277 } 00278 00279 return; 00280 } catch (const std::bad_cast&) { }; 00281 00282 try { 00283 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt); 00284 00285 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP) 00286 { 00287 // send all (multi-hop) bundles in the storage to the neighbor 00288 _taskqueue.push( new SearchNextBundleTask(ce.peer) ); 00289 } 00290 return; 00291 } catch (const std::bad_cast&) { }; 00292 } 00293 00294 /****************************************/ 00295 00296 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00297 : eid(e) 00298 { } 00299 00300 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00301 { } 00302 00303 std::string NeighborRoutingExtension::SearchNextBundleTask::toString() 00304 { 00305 return "SearchNextBundleTask: " + eid.getString(); 00306 } 00307 00308 /****************************************/ 00309 00310 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o) 00311 : bundle(meta), origin(o) 00312 { } 00313 00314 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00315 { } 00316 00317 std::string NeighborRoutingExtension::ProcessBundleTask::toString() 00318 { 00319 return "ProcessBundleTask: " + bundle.toString(); 00320 } 00321 } 00322 }