IBR-DTNSuite  0.8
daemon/src/routing/epidemic/EpidemicRoutingExtension.cpp
Go to the documentation of this file.
00001 /*
00002  * EpidemicRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/epidemic/EpidemicRoutingExtension.h"
00009 
00010 #include "routing/QueueBundleEvent.h"
00011 #include "routing/NodeHandshakeEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/NodeEvent.h"
00015 #include "core/TimeEvent.h"
00016 #include "core/Node.h"
00017 #include "net/ConnectionManager.h"
00018 #include "Configuration.h"
00019 #include "core/BundleCore.h"
00020 #include "core/BundleEvent.h"
00021 
00022 #include <ibrdtn/data/MetaBundle.h>
00023 #include <ibrcommon/thread/MutexLock.h>
00024 #include <ibrcommon/Logger.h>
00025 
00026 #include <functional>
00027 #include <list>
00028 #include <algorithm>
00029 
00030 #include <iomanip>
00031 #include <ios>
00032 #include <iostream>
00033 #include <set>
00034 #include <memory>
00035 
00036 #include <stdlib.h>
00037 #include <typeinfo>
00038 
00039 namespace dtn
00040 {
00041         namespace routing
00042         {
00043                 EpidemicRoutingExtension::EpidemicRoutingExtension()
00044                 {
00045                         // write something to the syslog
00046                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
00047                 }
00048 
00049                 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00050                 {
00051                         stop();
00052                         join();
00053                 }
00054 
00055                 void EpidemicRoutingExtension::requestHandshake(const dtn::data::EID&, NodeHandshake &request) const
00056                 {
00057                         request.addRequest(BloomFilterSummaryVector::identifier);
00058                 }
00059 
00060                 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00061                 {
00062                         // If an incoming bundle is received, forward it to all connected neighbors
00063                         try {
00064                                 dynamic_cast<const QueueBundleEvent&>(*evt);
00065 
00066                                 // new bundles trigger a recheck for all neighbors
00067                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00068 
00069                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00070                                 {
00071                                         const dtn::core::Node &n = (*iter);
00072 
00073                                         // transfer the next bundle to this destination
00074                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00075                                 }
00076                                 return;
00077                         } catch (const std::bad_cast&) { };
00078 
00079                         // If a new neighbor comes available search for bundles
00080                         try {
00081                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00082                                 const dtn::core::Node &n = nodeevent.getNode();
00083 
00084                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00085                                 {
00086                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00087                                 }
00088 
00089                                 return;
00090                         } catch (const std::bad_cast&) { };
00091 
00092                         try {
00093                                 const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt);
00094 
00095                                 if (handshake.state == NodeHandshakeEvent::HANDSHAKE_UPDATED)
00096                                 {
00097                                         // transfer the next bundle to this destination
00098                                         _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
00099                                 }
00100                                 else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
00101                                 {
00102                                         // transfer the next bundle to this destination
00103                                         _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
00104                                 }
00105                                 return;
00106                         } catch (const std::bad_cast&) { };
00107 
00108                         // The bundle transfer has been aborted
00109                         try {
00110                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00111 
00112                                 // transfer the next bundle to this destination
00113                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00114 
00115                                 return;
00116                         } catch (const std::bad_cast&) { };
00117 
00118                         // A bundle transfer was successful
00119                         try {
00120                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00121 
00122                                 // transfer the next bundle to this destination
00123                                 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
00124                                 return;
00125                         } catch (const std::bad_cast&) { };
00126                 }
00127 
00128                 void EpidemicRoutingExtension::__cancellation()
00129                 {
00130                         _taskqueue.abort();
00131                 }
00132 
00133                 void EpidemicRoutingExtension::run()
00134                 {
00135                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00136                         {
00137                         public:
00138                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00139                                  : _entry(entry)
00140                                 {};
00141 
00142                                 virtual ~BundleFilter() {};
00143 
00144                                 virtual size_t limit() const { return 10; };
00145 
00146                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00147                                 {
00148                                         // check Scope Control Block - do not forward bundles with hop limit == 0
00149                                         if (meta.hopcount == 0)
00150                                         {
00151                                                 return false;
00152                                         }
00153 
00154                                         // do not forward any routing control message
00155                                         // this is done by the neighbor routing module
00156                                         if (isRouting(meta.source))
00157                                         {
00158                                                 return false;
00159                                         }
00160 
00161                                         // do not forward local bundles
00162                                         if ((meta.destination.getNode() == dtn::core::BundleCore::local)
00163                                                         && meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)
00164                                                 )
00165                                         {
00166                                                 return false;
00167                                         }
00168 
00169                                         // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
00170                                         if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
00171                                         {
00172                                                 return false;
00173                                         }
00174 
00175                                         // do not forward to any blacklisted destination
00176                                         const dtn::data::EID dest = meta.destination.getNode();
00177                                         if (_blacklist.find(dest) != _blacklist.end())
00178                                         {
00179                                                 return false;
00180                                         }
00181 
00182                                         // do not forward bundles already known by the destination
00183                                         // throws BloomfilterNotAvailableException if no filter is available or it is expired
00184                                         if (_entry.has(meta, true))
00185                                         {
00186                                                 return false;
00187                                         }
00188 
00189                                         return true;
00190                                 };
00191 
00192                                 void blacklist(const dtn::data::EID& id)
00193                                 {
00194                                         _blacklist.insert(id);
00195                                 };
00196 
00197                         private:
00198                                 std::set<dtn::data::EID> _blacklist;
00199                                 const NeighborDatabase::NeighborEntry &_entry;
00200                         };
00201 
00202                         dtn::storage::BundleStorage &storage = (**this).getStorage();
00203 
00204                         while (true)
00205                         {
00206                                 try {
00207                                         Task *t = _taskqueue.getnpop(true);
00208                                         std::auto_ptr<Task> killer(t);
00209 
00210                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00211 
00212                                         try {
00218                                                 try {
00219                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00220                                                         NeighborDatabase &db = (**this).getNeighborDB();
00221 
00222                                                         ibrcommon::MutexLock l(db);
00223                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00224 
00225                                                         try {
00226                                                                 // get the bundle filter of the neighbor
00227                                                                 BundleFilter filter(entry);
00228 
00229                                                                 // some debug output
00230                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00231 
00232                                                                 // blacklist the neighbor itself, because this is handled by neighbor routing extension
00233                                                                 filter.blacklist(task.eid);
00234 
00235                                                                 // query some unknown bundle from the storage, the list contains max. 10 items.
00236                                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00237 
00238                                                                 // send the bundles as long as we have resources
00239                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00240                                                                 {
00241                                                                         try {
00242                                                                                 // transfer the bundle to the neighbor
00243                                                                                 transferTo(entry, *iter);
00244                                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00245                                                                 }
00246                                                         } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00247                                                                 // query a new summary vector from this neighbor
00248                                                                 (**this).doHandshake(task.eid);
00249                                                         }
00250                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00251                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00252                                                 } catch (const std::bad_cast&) { };
00253                                         } catch (const ibrcommon::Exception &ex) {
00254                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00255                                         }
00256                                 } catch (const std::exception&) {
00257                                         return;
00258                                 }
00259 
00260                                 yield();
00261                         }
00262                 }
00263 
00264                 /****************************************/
00265 
00266                 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00267                  : eid(e)
00268                 { }
00269 
00270                 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00271                 { }
00272 
00273                 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00274                 {
00275                         return "SearchNextBundleTask: " + eid.getString();
00276                 }
00277         }
00278 }