IBR-DTNSuite  0.8
daemon/src/routing/NeighborRoutingExtension.cpp
Go to the documentation of this file.
00001 /*
00002  * NeighborRoutingExtension.cpp
00003  *
00004  *  Created on: 16.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "routing/NeighborRoutingExtension.h"
00010 #include "routing/QueueBundleEvent.h"
00011 #include "core/TimeEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "net/ConnectionEvent.h"
00015 #include "core/BundlePurgeEvent.h"
00016 #include "core/NodeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "ibrcommon/thread/MutexLock.h"
00020 #include "storage/SimpleBundleStorage.h"
00021 #include "core/BundleEvent.h"
00022 #include <ibrcommon/Logger.h>
00023 
00024 #ifdef HAVE_SQLITE
00025 #include "storage/SQLiteBundleStorage.h"
00026 #endif
00027 
00028 #include <functional>
00029 #include <list>
00030 #include <algorithm>
00031 #include <typeinfo>
00032 #include <memory>
00033 
00034 namespace dtn
00035 {
00036         namespace routing
00037         {
00038                 NeighborRoutingExtension::NeighborRoutingExtension()
00039                 {
00040                 }
00041 
00042                 NeighborRoutingExtension::~NeighborRoutingExtension()
00043                 {
00044                         stop();
00045                         join();
00046                 }
00047 
00048                 void NeighborRoutingExtension::__cancellation()
00049                 {
00050                         _taskqueue.abort();
00051                 }
00052 
00053                 void NeighborRoutingExtension::run()
00054                 {
00055 #ifdef HAVE_SQLITE
00056                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback, public dtn::storage::SQLiteDatabase::SQLBundleQuery
00057 #else
00058                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00059 #endif
00060                         {
00061                         public:
00062                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00063                                  : _entry(entry)
00064                                 {};
00065 
00066                                 virtual ~BundleFilter() {};
00067 
00068                                 virtual size_t limit() const { return 10; };
00069 
00070                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00071                                 {
00072                                         // check Scope Control Block - do not forward bundles with hop limit == 0
00073                                         if (meta.hopcount == 0)
00074                                         {
00075                                                 return false;
00076                                         }
00077 
00078                                         if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))
00079                                         {
00080                                                 // do not forward local bundles
00081                                                 if (meta.destination.getNode() == dtn::core::BundleCore::local)
00082                                                 {
00083                                                         return false;
00084                                                 }
00085 
00086                                                 // do not forward bundles for other nodes
00087                                                 if (_entry.eid.getNode() != meta.destination.getNode())
00088                                                 {
00089                                                         return false;
00090                                                 }
00091                                         }
00092 
00093                                         // do not forward bundles already known by the destination
00094                                         if (_entry.has(meta))
00095                                         {
00096                                                 return false;
00097                                         }
00098 
00099                                         return true;
00100                                 };
00101 
00102 #ifdef HAVE_SQLITE
00103                                 const std::string getWhere() const
00104                                 {
00105                                         return "destination LIKE ?";
00106                                 };
00107 
00108                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00109                                 {
00110                                         const std::string d = _entry.eid.getNode().getString() + "%";
00111                                         sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT);
00112                                         return offset + 1;
00113                                 }
00114 #endif
00115 
00116                         private:
00117                                 const NeighborDatabase::NeighborEntry &_entry;
00118                         };
00119 
00120                         dtn::storage::BundleStorage &storage = (**this).getStorage();
00121 
00122                         while (true)
00123                         {
00124                                 NeighborDatabase &db = (**this).getNeighborDB();
00125 
00126                                 try {
00127                                         Task *t = _taskqueue.getnpop(true);
00128                                         std::auto_ptr<Task> killer(t);
00129 
00130                                         IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00131 
00137                                         try {
00138                                                 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00139 
00140                                                 // this destination is not handles by any static route
00141                                                 ibrcommon::MutexLock l(db);
00142                                                 NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00143 
00144                                                 // create a new bundle filter
00145                                                 BundleFilter filter(entry);
00146 
00147                                                 // query an unknown bundle from the storage, the list contains max. 10 items.
00148                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00149 
00150                                                 IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00151 
00152                                                 // send the bundles as long as we have resources
00153                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00154                                                 {
00155                                                         try {
00156                                                                 // transfer the bundle to the neighbor
00157                                                                 transferTo(entry, *iter);
00158                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00159                                                 }
00160                                         } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00161                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00162                                         } catch (const std::bad_cast&) { };
00163 
00167                                         try {
00168                                                 dynamic_cast<ProcessBundleTask&>(*t);
00169 
00170                                                 // new bundles trigger a recheck for all neighbors
00171                                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00172 
00173                                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00174                                                 {
00175                                                         const dtn::core::Node &n = (*iter);
00176 
00177                                                         // transfer the next bundle to this destination
00178                                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00179                                                 }
00180                                         } catch (const std::bad_cast&) { };
00181 
00182                                 } catch (const std::exception &ex) {
00183                                         IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00184                                         return;
00185                                 }
00186 
00187                                 yield();
00188                         }
00189                 }
00190 
00191                 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00192                 {
00193                         try {
00194                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00195                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00196                                 return;
00197                         } catch (const std::bad_cast&) { };
00198 
00199                         try {
00200                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00201                                 const dtn::data::MetaBundle &meta = completed.getBundle();
00202                                 const dtn::data::EID &peer = completed.getPeer();
00203 
00204                                 if ((meta.destination.getNode() == peer.getNode())
00205                                                 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00206                                 {
00207                                         try {
00208                                                 dtn::storage::BundleStorage &storage = (**this).getStorage();
00209 
00210                                                 // bundle has been delivered to its destination
00211                                                 // delete it from our storage
00212                                                 dtn::core::BundlePurgeEvent::raise(meta);
00213 
00214                                                 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00215 
00216                                                 // gen a report
00217                                                 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00218                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { };
00219 
00220                                         // transfer the next bundle to this destination
00221                                         _taskqueue.push( new SearchNextBundleTask( peer ) );
00222                                 }
00223                                 return;
00224                         } catch (const std::bad_cast&) { };
00225 
00226                         try {
00227                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00228                                 const dtn::data::EID &peer = aborted.getPeer();
00229                                 const dtn::data::BundleID &id = aborted.getBundleID();
00230 
00231                                 switch (aborted.reason)
00232                                 {
00233                                         case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00234                                                 break;
00235 
00236                                         case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00237                                                 break;
00238 
00239                                         case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00240                                                 break;
00241 
00242                                         case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00243                                                 return;
00244 
00245                                         case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00246                                         {
00247                                                 try {
00248                                                         const dtn::data::MetaBundle meta = (**this).getStorage().get(id);
00249 
00250                                                         // if the bundle has been sent by this module delete it
00251                                                         if ((meta.destination.getNode() == peer.getNode())
00252                                                                         && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00253                                                         {
00254                                                                 // bundle is not deliverable
00255                                                                 dtn::core::BundlePurgeEvent::raise(id, dtn::data::StatusReportBlock::NO_KNOWN_ROUTE_TO_DESTINATION_FROM_HERE);
00256                                                         }
00257                                                 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { };
00258                                         }
00259                                         break;
00260                                 }
00261 
00262                                 // transfer the next bundle to this destination
00263                                 _taskqueue.push( new SearchNextBundleTask( peer ) );
00264 
00265                                 return;
00266                         } catch (const std::bad_cast&) { };
00267 
00268                         // If a new neighbor comes available, send him a request for the summary vector
00269                         // If a neighbor went away we can free the stored summary vector
00270                         try {
00271                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00272                                 const dtn::core::Node &n = nodeevent.getNode();
00273 
00274                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00275                                 {
00276                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00277                                 }
00278 
00279                                 return;
00280                         } catch (const std::bad_cast&) { };
00281 
00282                         try {
00283                                 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
00284 
00285                                 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP)
00286                                 {
00287                                         // send all (multi-hop) bundles in the storage to the neighbor
00288                                         _taskqueue.push( new SearchNextBundleTask(ce.peer) );
00289                                 }
00290                                 return;
00291                         } catch (const std::bad_cast&) { };
00292                 }
00293 
00294                 /****************************************/
00295 
00296                 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00297                  : eid(e)
00298                 { }
00299 
00300                 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00301                 { }
00302 
00303                 std::string NeighborRoutingExtension::SearchNextBundleTask::toString()
00304                 {
00305                         return "SearchNextBundleTask: " + eid.getString();
00306                 }
00307 
00308                 /****************************************/
00309 
00310                 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00311                  : bundle(meta), origin(o)
00312                 { }
00313 
00314                 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00315                 { }
00316 
00317                 std::string NeighborRoutingExtension::ProcessBundleTask::toString()
00318                 {
00319                         return "ProcessBundleTask: " + bundle.toString();
00320                 }
00321         }
00322 }