// IBR-DTNSuite 0.8
00001 /* 00002 * NodeHandshakeExtension.cpp 00003 * 00004 * Created on: 08.12.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/NodeHandshakeExtension.h" 00009 #include "routing/NodeHandshakeEvent.h" 00010 00011 #include "core/NodeEvent.h" 00012 #include "net/ConnectionEvent.h" 00013 #include "core/BundleCore.h" 00014 #include "core/BundleEvent.h" 00015 00016 #include <ibrdtn/data/AgeBlock.h> 00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00018 #include <ibrdtn/utils/Clock.h> 00019 00020 #include <ibrcommon/thread/MutexLock.h> 00021 #include <ibrcommon/Logger.h> 00022 00023 namespace dtn 00024 { 00025 namespace routing 00026 { 00027 NodeHandshakeExtension::NodeHandshakeExtension() 00028 : _endpoint(*this) 00029 { 00030 } 00031 00032 NodeHandshakeExtension::~NodeHandshakeExtension() 00033 { 00034 } 00035 00036 void NodeHandshakeExtension::requestHandshake(const dtn::data::EID&, NodeHandshake &request) const 00037 { 00038 request.addRequest(BloomFilterPurgeVector::identifier); 00039 } 00040 00041 void NodeHandshakeExtension::responseHandshake(const dtn::data::EID&, const NodeHandshake &request, NodeHandshake &answer) 00042 { 00043 if (request.hasRequest(BloomFilterSummaryVector::identifier)) 00044 { 00045 // add own summary vector to the message 00046 const SummaryVector vec = (**this).getSummaryVector(); 00047 00048 // create an item 00049 BloomFilterSummaryVector *item = new BloomFilterSummaryVector(vec); 00050 00051 // add it to the handshake 00052 answer.addItem(item); 00053 } 00054 00055 if (request.hasRequest(BloomFilterPurgeVector::identifier)) 00056 { 00057 // add own purge vector to the message 00058 const SummaryVector vec = (**this).getPurgedBundles(); 00059 00060 // create an item 00061 BloomFilterSummaryVector *item = new BloomFilterSummaryVector(vec); 00062 00063 // add it to the handshake 00064 answer.addItem(item); 00065 } 00066 } 00067 00068 void NodeHandshakeExtension::processHandshake(const dtn::data::EID &source, 
NodeHandshake &answer) 00069 { 00070 try { 00071 const BloomFilterSummaryVector bfsv = answer.get<BloomFilterSummaryVector>(); 00072 00073 // get the summary vector (bloomfilter) of this ECM 00074 const ibrcommon::BloomFilter &filter = bfsv.getVector().getBloomFilter(); 00075 00081 NeighborDatabase &db = (**this).getNeighborDB(); 00082 ibrcommon::MutexLock l(db); 00083 NeighborDatabase::NeighborEntry &entry = db.get(source.getNode()); 00084 entry.update(filter, answer.getLifetime()); 00085 } catch (std::exception&) { }; 00086 00087 try { 00088 const BloomFilterPurgeVector bfpv = answer.get<BloomFilterPurgeVector>(); 00089 00090 // get the purge vector (bloomfilter) of this ECM 00091 const ibrcommon::BloomFilter &purge = bfpv.getVector().getBloomFilter(); 00092 00093 dtn::storage::BundleStorage &storage = (**this).getStorage(); 00094 00095 while (true) 00096 { 00097 // delete bundles in the purge vector 00098 const dtn::data::MetaBundle meta = storage.remove(purge); 00099 00100 // log the purged bundle 00101 IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00102 00103 // gen a report 00104 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE); 00105 00106 // add this bundle to the own purge vector 00107 (**this).addPurgedBundle(meta); 00108 } 00109 } catch (std::exception&) { }; 00110 } 00111 00112 void NodeHandshakeExtension::doHandshake(const dtn::data::EID &eid) 00113 { 00114 _endpoint.query(eid); 00115 } 00116 00117 void NodeHandshakeExtension::notify(const dtn::core::Event *evt) 00118 { 00119 // If a new neighbor comes available, send him a request for the summary vector 00120 // If a neighbor went away we can free the stored summary vector 00121 try { 00122 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00123 const dtn::core::Node &n = nodeevent.getNode(); 00124 00125 if (nodeevent.getAction() == NODE_UNAVAILABLE) 00126 { 00127 // 
remove the item from the blacklist 00128 _endpoint.removeFromBlacklist(n.getEID()); 00129 } 00130 00131 return; 00132 } catch (const std::bad_cast&) { }; 00133 } 00134 00135 const std::list<BaseRouter::Extension*>& NodeHandshakeExtension::getExtensions() 00136 { 00137 return (**this).getExtensions(); 00138 } 00139 00140 NodeHandshakeExtension::HandshakeEndpoint::HandshakeEndpoint(NodeHandshakeExtension &callback) 00141 : _callback(callback) 00142 { 00143 AbstractWorker::initialize("/routing", 50, true); 00144 } 00145 00146 NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint() 00147 { 00148 } 00149 00150 void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(const Bundle &b) 00151 { 00152 _callback.processHandshake(b); 00153 } 00154 00155 void NodeHandshakeExtension::HandshakeEndpoint::send(const dtn::data::Bundle &b) 00156 { 00157 transmit(b); 00158 } 00159 00160 void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(const dtn::data::EID &eid) 00161 { 00162 ibrcommon::MutexLock l(_blacklist_lock); 00163 _blacklist.erase(eid); 00164 } 00165 00166 void NodeHandshakeExtension::HandshakeEndpoint::query(const dtn::data::EID &origin) 00167 { 00168 { 00169 ibrcommon::MutexLock l(_blacklist_lock); 00170 // only query once each 60 seconds 00171 if (_blacklist[origin] > dtn::utils::Clock::getUnixTimestamp()) return; 00172 _blacklist[origin] = dtn::utils::Clock::getUnixTimestamp() + 60; 00173 } 00174 00175 // create a new request for the summary vector of the neighbor 00176 NodeHandshake request(NodeHandshake::HANDSHAKE_REQUEST); 00177 00178 // walk through all extensions to process the contents of the response 00179 const std::list<BaseRouter::Extension*>& extensions = _callback.getExtensions(); 00180 00181 for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++) 00182 { 00183 BaseRouter::Extension &extension = (**iter); 00184 extension.requestHandshake(origin, request); 00185 } 
00186 00187 // create a new bundle 00188 dtn::data::Bundle req; 00189 00190 // set the source of the bundle 00191 req._source = getWorkerURI(); 00192 00193 // set the destination of the bundle 00194 req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00195 00196 if (origin.isCompressable()) 00197 req._destination = origin + origin.getDelimiter() + "50"; 00198 else 00199 req._destination = origin + origin.getDelimiter() + "routing"; 00200 00201 // limit the lifetime to 60 seconds 00202 req._lifetime = 60; 00203 00204 // set high priority 00205 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00206 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00207 00208 dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>(); 00209 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00210 00211 // serialize the request into the payload 00212 { 00213 ibrcommon::BLOB::iostream ios = ref.iostream(); 00214 (*ios) << request; 00215 } 00216 00217 // add a schl block 00218 dtn::data::ScopeControlHopLimitBlock &schl = req.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00219 schl.setLimit(1); 00220 00221 // add an age block (to prevent expiring due to wrong clocks) 00222 req.push_front<dtn::data::AgeBlock>(); 00223 00224 // send the bundle 00225 transmit(req); 00226 } 00227 00228 void NodeHandshakeExtension::processHandshake(const dtn::data::Bundle &bundle) 00229 { 00230 // read the ecm 00231 const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>(); 00232 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00233 NodeHandshake handshake; 00234 00235 // locked within this region 00236 { 00237 ibrcommon::BLOB::iostream s = ref.iostream(); 00238 (*s) >> handshake; 00239 } 00240 00241 // if this is a request answer with an summary vector 00242 if (handshake.getType() == NodeHandshake::HANDSHAKE_REQUEST) 00243 { 00244 // create a new request for the summary vector of the neighbor 00245 NodeHandshake response(NodeHandshake::HANDSHAKE_RESPONSE); 00246 
00247 // walk through all extensions to process the contents of the response 00248 const std::list<BaseRouter::Extension*>& extensions = (**this).getExtensions(); 00249 00250 for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++) 00251 { 00252 BaseRouter::Extension &extension = (**iter); 00253 extension.responseHandshake(bundle._source, handshake, response); 00254 } 00255 00256 // create a new bundle 00257 dtn::data::Bundle answer; 00258 00259 // set the source of the bundle 00260 answer._source = _endpoint.getWorkerURI(); 00261 00262 // set the destination of the bundle 00263 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00264 answer._destination = bundle._source; 00265 00266 // limit the lifetime to 60 seconds 00267 answer._lifetime = 60; 00268 00269 // set high priority 00270 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00271 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00272 00273 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>(); 00274 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00275 00276 // serialize the request into the payload 00277 { 00278 ibrcommon::BLOB::iostream ios = ref.iostream(); 00279 (*ios) << response; 00280 } 00281 00282 // add a schl block 00283 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00284 schl.setLimit(1); 00285 00286 // add an age block (to prevent expiring due to wrong clocks) 00287 answer.push_front<dtn::data::AgeBlock>(); 00288 00289 // transfer the bundle to the neighbor 00290 _endpoint.send(answer); 00291 00292 // call handshake completed event 00293 NodeHandshakeEvent::raiseEvent( NodeHandshakeEvent::HANDSHAKE_REPLIED, bundle._source ); 00294 } 00295 else if (handshake.getType() == NodeHandshake::HANDSHAKE_RESPONSE) 00296 { 00297 // walk through all extensions to process the contents of the response 00298 const std::list<BaseRouter::Extension*>& 
extensions = (**this).getExtensions(); 00299 00300 for (std::list<BaseRouter::Extension*>::const_iterator iter = extensions.begin(); iter != extensions.end(); iter++) 00301 { 00302 BaseRouter::Extension &extension = (**iter); 00303 extension.processHandshake(bundle._source, handshake); 00304 } 00305 00306 // call handshake completed event 00307 NodeHandshakeEvent::raiseEvent( NodeHandshakeEvent::HANDSHAKE_COMPLETED, bundle._source ); 00308 } 00309 } 00310 } /* namespace routing */ 00311 } /* namespace dtn */