IBR-DTNSuite  0.8
daemon/src/routing/NodeHandshakeExtension.cpp
Go to the documentation of this file.
00001 /*
00002  * NodeHandshakeExtension.cpp
00003  *
00004  *  Created on: 08.12.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/NodeHandshakeExtension.h"
00009 #include "routing/NodeHandshakeEvent.h"
00010 
00011 #include "core/NodeEvent.h"
00012 #include "net/ConnectionEvent.h"
00013 #include "core/BundleCore.h"
00014 #include "core/BundleEvent.h"
00015 
00016 #include <ibrdtn/data/AgeBlock.h>
00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00018 #include <ibrdtn/utils/Clock.h>
00019 
00020 #include <ibrcommon/thread/MutexLock.h>
00021 #include <ibrcommon/Logger.h>
00022 
00023 namespace dtn
00024 {
00025         namespace routing
00026         {
00027                 NodeHandshakeExtension::NodeHandshakeExtension()
00028                  : _endpoint(*this)
00029                 {
00030                 }
00031 
00032                 NodeHandshakeExtension::~NodeHandshakeExtension()
00033                 {
00034                 }
00035 
00036                 void NodeHandshakeExtension::requestHandshake(const dtn::data::EID&, NodeHandshake &request) const
00037                 {
00038                         request.addRequest(BloomFilterPurgeVector::identifier);
00039                 }
00040 
00041                 void NodeHandshakeExtension::responseHandshake(const dtn::data::EID&, const NodeHandshake &request, NodeHandshake &answer)
00042                 {
00043                         if (request.hasRequest(BloomFilterSummaryVector::identifier))
00044                         {
00045                                 // add own summary vector to the message
00046                                 const SummaryVector vec = (**this).getSummaryVector();
00047 
00048                                 // create an item
00049                                 BloomFilterSummaryVector *item = new BloomFilterSummaryVector(vec);
00050 
00051                                 // add it to the handshake
00052                                 answer.addItem(item);
00053                         }
00054 
00055                         if (request.hasRequest(BloomFilterPurgeVector::identifier))
00056                         {
00057                                 // add own purge vector to the message
00058                                 const SummaryVector vec = (**this).getPurgedBundles();
00059 
00060                                 // create an item
00061                                 BloomFilterSummaryVector *item = new BloomFilterSummaryVector(vec);
00062 
00063                                 // add it to the handshake
00064                                 answer.addItem(item);
00065                         }
00066                 }
00067 
00068                 void NodeHandshakeExtension::processHandshake(const dtn::data::EID &source, NodeHandshake &answer)
00069                 {
00070                         try {
00071                                 const BloomFilterSummaryVector bfsv = answer.get<BloomFilterSummaryVector>();
00072 
00073                                 // get the summary vector (bloomfilter) of this ECM
00074                                 const ibrcommon::BloomFilter &filter = bfsv.getVector().getBloomFilter();
00075 
00081                                 NeighborDatabase &db = (**this).getNeighborDB();
00082                                 ibrcommon::MutexLock l(db);
00083                                 NeighborDatabase::NeighborEntry &entry = db.get(source.getNode());
00084                                 entry.update(filter, answer.getLifetime());
00085                         } catch (std::exception&) { };
00086 
00087                         try {
00088                                 const BloomFilterPurgeVector bfpv = answer.get<BloomFilterPurgeVector>();
00089 
00090                                 // get the purge vector (bloomfilter) of this ECM
00091                                 const ibrcommon::BloomFilter &purge = bfpv.getVector().getBloomFilter();
00092 
00093                                 dtn::storage::BundleStorage &storage = (**this).getStorage();
00094 
00095                                 while (true)
00096                                 {
00097                                         // delete bundles in the purge vector
00098                                         const dtn::data::MetaBundle meta = storage.remove(purge);
00099 
00100                                         // log the purged bundle
00101                                         IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00102 
00103                                         // gen a report
00104                                         dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE);
00105 
00106                                         // add this bundle to the own purge vector
00107                                         (**this).addPurgedBundle(meta);
00108                                 }
00109                         } catch (std::exception&) { };
00110                 }
00111 
00112                 void NodeHandshakeExtension::doHandshake(const dtn::data::EID &eid)
00113                 {
00114                         _endpoint.query(eid);
00115                 }
00116 
00117                 void NodeHandshakeExtension::notify(const dtn::core::Event *evt)
00118                 {
00119                         // If a new neighbor comes available, send him a request for the summary vector
00120                         // If a neighbor went away we can free the stored summary vector
00121                         try {
00122                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00123                                 const dtn::core::Node &n = nodeevent.getNode();
00124 
00125                                 if (nodeevent.getAction() == NODE_UNAVAILABLE)
00126                                 {
00127                                         // remove the item from the blacklist
00128                                         _endpoint.removeFromBlacklist(n.getEID());
00129                                 }
00130 
00131                                 return;
00132                         } catch (const std::bad_cast&) { };
00133                 }
00134 
00135                 const std::list<BaseRouter::Extension*>& NodeHandshakeExtension::getExtensions()
00136                 {
00137                         return (**this).getExtensions();
00138                 }
00139 
00140                 NodeHandshakeExtension::HandshakeEndpoint::HandshakeEndpoint(NodeHandshakeExtension &callback)
00141                  : _callback(callback)
00142                 {
00143                         AbstractWorker::initialize("/routing", 50, true);
00144                 }
00145 
00146                 NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint()
00147                 {
00148                 }
00149 
00150                 void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(const Bundle &b)
00151                 {
00152                         _callback.processHandshake(b);
00153                 }
00154 
00155                 void NodeHandshakeExtension::HandshakeEndpoint::send(const dtn::data::Bundle &b)
00156                 {
00157                         transmit(b);
00158                 }
00159 
00160                 void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(const dtn::data::EID &eid)
00161                 {
00162                         ibrcommon::MutexLock l(_blacklist_lock);
00163                         _blacklist.erase(eid);
00164                 }
00165 
00166                 void NodeHandshakeExtension::HandshakeEndpoint::query(const dtn::data::EID &origin)
00167                 {
00168                         {
00169                                 ibrcommon::MutexLock l(_blacklist_lock);
00170                                 // only query once each 60 seconds
00171                                 if (_blacklist[origin] > dtn::utils::Clock::getUnixTimestamp()) return;
00172                                 _blacklist[origin] = dtn::utils::Clock::getUnixTimestamp() + 60;
00173                         }
00174 
00175                         // create a new request for the summary vector of the neighbor
00176                         NodeHandshake request(NodeHandshake::HANDSHAKE_REQUEST);
00177 
00178                         // walk through all extensions to process the contents of the response
00179                         const std::list<BaseRouter::Extension*>& extensions = _callback.getExtensions();
00180 
00181                         for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++)
00182                         {
00183                                 BaseRouter::Extension &extension = (**iter);
00184                                 extension.requestHandshake(origin, request);
00185                         }
00186 
00187                         // create a new bundle
00188                         dtn::data::Bundle req;
00189 
00190                         // set the source of the bundle
00191                         req._source = getWorkerURI();
00192 
00193                         // set the destination of the bundle
00194                         req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00195 
00196                         if (origin.isCompressable())
00197                                 req._destination = origin + origin.getDelimiter() + "50";
00198                         else
00199                                 req._destination = origin + origin.getDelimiter() + "routing";
00200 
00201                         // limit the lifetime to 60 seconds
00202                         req._lifetime = 60;
00203 
00204                         // set high priority
00205                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00206                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00207 
00208                         dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
00209                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00210 
00211                         // serialize the request into the payload
00212                         {
00213                                 ibrcommon::BLOB::iostream ios = ref.iostream();
00214                                 (*ios) << request;
00215                         }
00216 
00217                         // add a schl block
00218                         dtn::data::ScopeControlHopLimitBlock &schl = req.push_front<dtn::data::ScopeControlHopLimitBlock>();
00219                         schl.setLimit(1);
00220 
00221                         // add an age block (to prevent expiring due to wrong clocks)
00222                         req.push_front<dtn::data::AgeBlock>();
00223 
00224                         // send the bundle
00225                         transmit(req);
00226                 }
00227 
00228                 void NodeHandshakeExtension::processHandshake(const dtn::data::Bundle &bundle)
00229                 {
00230                         // read the ecm
00231                         const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00232                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00233                         NodeHandshake handshake;
00234 
00235                         // locked within this region
00236                         {
00237                                 ibrcommon::BLOB::iostream s = ref.iostream();
00238                                 (*s) >> handshake;
00239                         }
00240 
00241                         // if this is a request answer with an summary vector
00242                         if (handshake.getType() == NodeHandshake::HANDSHAKE_REQUEST)
00243                         {
00244                                 // create a new request for the summary vector of the neighbor
00245                                 NodeHandshake response(NodeHandshake::HANDSHAKE_RESPONSE);
00246 
00247                                 // walk through all extensions to process the contents of the response
00248                                 const std::list<BaseRouter::Extension*>& extensions = (**this).getExtensions();
00249 
00250                                 for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++)
00251                                 {
00252                                         BaseRouter::Extension &extension = (**iter);
00253                                         extension.responseHandshake(bundle._source, handshake, response);
00254                                 }
00255 
00256                                 // create a new bundle
00257                                 dtn::data::Bundle answer;
00258 
00259                                 // set the source of the bundle
00260                                 answer._source = _endpoint.getWorkerURI();
00261 
00262                                 // set the destination of the bundle
00263                                 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00264                                 answer._destination = bundle._source;
00265 
00266                                 // limit the lifetime to 60 seconds
00267                                 answer._lifetime = 60;
00268 
00269                                 // set high priority
00270                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00271                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00272 
00273                                 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00274                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00275 
00276                                 // serialize the request into the payload
00277                                 {
00278                                         ibrcommon::BLOB::iostream ios = ref.iostream();
00279                                         (*ios) << response;
00280                                 }
00281 
00282                                 // add a schl block
00283                                 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>();
00284                                 schl.setLimit(1);
00285 
00286                                 // add an age block (to prevent expiring due to wrong clocks)
00287                                 answer.push_front<dtn::data::AgeBlock>();
00288 
00289                                 // transfer the bundle to the neighbor
00290                                 _endpoint.send(answer);
00291 
00292                                 // call handshake completed event
00293                                 NodeHandshakeEvent::raiseEvent( NodeHandshakeEvent::HANDSHAKE_REPLIED, bundle._source );
00294                         }
00295                         else if (handshake.getType() == NodeHandshake::HANDSHAKE_RESPONSE)
00296                         {
00297                                 // walk through all extensions to process the contents of the response
00298                                 const std::list<BaseRouter::Extension*>& extensions = (**this).getExtensions();
00299 
00300                                 for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++)
00301                                 {
00302                                         BaseRouter::Extension &extension = (**iter);
00303                                         extension.processHandshake(bundle._source, handshake);
00304                                 }
00305 
00306                                 // call handshake completed event
00307                                 NodeHandshakeEvent::raiseEvent( NodeHandshakeEvent::HANDSHAKE_COMPLETED, bundle._source );
00308                         }
00309                 }
00310         } /* namespace routing */
00311 } /* namespace dtn */