IBR-DTNSuite
0.8
|
00001 /* 00002 * FileConvergenceLayer.cpp 00003 * 00004 * Created on: 20.09.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "net/FileConvergenceLayer.h" 00009 #include "net/TransferCompletedEvent.h" 00010 #include "net/TransferAbortedEvent.h" 00011 #include "net/BundleReceivedEvent.h" 00012 #include "core/BundleEvent.h" 00013 #include "core/BundleCore.h" 00014 #include "core/NodeEvent.h" 00015 #include "core/TimeEvent.h" 00016 #include "routing/BaseRouter.h" 00017 #include "routing/NodeHandshake.h" 00018 #include "routing/RequeueBundleEvent.h" 00019 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00020 #include <ibrdtn/utils/Clock.h> 00021 #include <ibrcommon/data/File.h> 00022 #include <ibrcommon/Logger.h> 00023 #include <ibrcommon/thread/MutexLock.h> 00024 00025 namespace dtn 00026 { 00027 namespace net 00028 { 00029 FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a, const dtn::core::Node &n) 00030 : action(a), node(n) 00031 { 00032 } 00033 00034 FileConvergenceLayer::Task::~Task() 00035 { 00036 } 00037 00038 FileConvergenceLayer::StoreBundleTask::StoreBundleTask(const dtn::core::Node &n, const ConvergenceLayer::Job &j) 00039 : FileConvergenceLayer::Task(TASK_STORE, n), job(j) 00040 { 00041 } 00042 00043 FileConvergenceLayer::StoreBundleTask::~StoreBundleTask() 00044 { 00045 } 00046 00047 FileConvergenceLayer::FileConvergenceLayer() 00048 { 00049 } 00050 00051 FileConvergenceLayer::~FileConvergenceLayer() 00052 { 00053 } 00054 00055 void FileConvergenceLayer::componentUp() 00056 { 00057 bindEvent(dtn::core::NodeEvent::className); 00058 bindEvent(dtn::core::TimeEvent::className); 00059 } 00060 00061 void FileConvergenceLayer::componentDown() 00062 { 00063 unbindEvent(dtn::core::NodeEvent::className); 00064 unbindEvent(dtn::core::TimeEvent::className); 00065 } 00066 00067 void FileConvergenceLayer::__cancellation() 00068 { 00069 _tasks.abort(); 00070 } 00071 00072 void FileConvergenceLayer::componentRun() 00073 { 00074 try { 00075 while (true) 00076 { 00077 Task *t = _tasks.getnpop(true); 00078 00079 try { 00080 switch (t->action) 00081 { 00082 case Task::TASK_LOAD: 00083 { 00084 // load bundles (receive) 00085 load(t->node); 00086 break; 00087 } 00088 00089 case Task::TASK_STORE: 00090 { 00091 try { 00092 const StoreBundleTask &sbt = dynamic_cast<const StoreBundleTask&>(*t); 00093 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00094 00095 // get the file path of the node 00096 ibrcommon::File path = getPath(sbt.node); 00097 00098 // scan for bundles 00099 std::list<dtn::data::MetaBundle> bundles = scan(path); 00100 00101 try { 00102 // check if bundle is a routing bundle 00103 if (sbt.job._bundle.source == (dtn::core::BundleCore::local + "/routing")) 00104 { 00105 // read the bundle out of the storage 00106 const dtn::data::Bundle bundle = storage.get(sbt.job._bundle); 00107 00108 if (bundle._destination == (sbt.node.getEID() + "/routing")) 00109 { 00110 // add this bundle to the blacklist 00111 { 00112 ibrcommon::MutexLock l(_blacklist_mutex); 00113 if (_blacklist.find(bundle) != _blacklist.end()) 00114 { 00115 // send transfer aborted event 00116 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED); 00117 continue; 00118 } 00119 _blacklist.add(bundle); 00120 } 00121 00122 // create ECM reply 00123 replyHandshake(bundle, bundles); 00124 00125 // raise bundle event 00126 dtn::net::TransferCompletedEvent::raise(sbt.node.getEID(), bundle); 00127 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00128 continue; 00129 } 00130 } 00131 00132 // check if bundle is already in the path 00133 for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++) 00134 { 00135 if ((*iter) == sbt.job._bundle) 00136 { 00137 // send transfer aborted event 00138 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED); 00139 continue; 00140 } 00141 } 00142 00143 ibrcommon::TemporaryFile filename(path, "bundle"); 00144 00145 try { 00146 // read the bundle out of the storage 00147 const dtn::data::Bundle bundle = storage.get(sbt.job._bundle); 00148 00149 std::fstream fs(filename.getPath().c_str(), std::fstream::out); 00150 00151 IBRCOMMON_LOGGER(info) << "write bundle " << sbt.job._bundle.toString() << " to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL; 00152 00153 dtn::data::DefaultSerializer s(fs); 00154 00155 // serialize the bundle 00156 s << bundle; 00157 00158 // raise bundle event 00159 dtn::net::TransferCompletedEvent::raise(sbt.node.getEID(), bundle); 00160 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00161 } catch (const ibrcommon::Exception&) { 00162 filename.remove(); 00163 throw; 00164 } 00165 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00166 // send transfer aborted event 00167 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00168 } catch (const ibrcommon::Exception&) { 00169 // something went wrong - requeue transfer for later 00170 dtn::routing::RequeueBundleEvent::raise(sbt.node.getEID(), sbt.job._bundle); 00171 } 00172 00173 } catch (const std::bad_cast&) { } 00174 break; 00175 } 00176 } 00177 } catch (const std::exception &ex) { 00178 IBRCOMMON_LOGGER(error) << "error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00179 } 00180 delete t; 00181 } 00182 } catch (const ibrcommon::QueueUnblockedException &ex) { }; 00183 } 00184 00185 void FileConvergenceLayer::raiseEvent(const dtn::core::Event *evt) 00186 { 00187 try { 00188 const dtn::core::NodeEvent &node = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00189 00190 if (node.getAction() == dtn::core::NODE_AVAILABLE) 00191 { 00192 const dtn::core::Node &n = node.getNode(); 00193 if ( n.has(dtn::core::Node::CONN_FILE) ) 00194 { 00195 _tasks.push(new Task(Task::TASK_LOAD, n)); 00196 } 00197 } 00198 } catch (const std::bad_cast&) { }; 00199 00200 try { 00201 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00202 00203 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00204 { 00205 ibrcommon::MutexLock l(_blacklist_mutex); 00206 _blacklist.expire(time.getTimestamp()); 00207 } 00208 } catch (const std::bad_cast&) { }; 00209 } 00210 00211 const std::string FileConvergenceLayer::getName() const 00212 { 00213 return "FileConvergenceLayer"; 00214 } 00215 00216 dtn::core::Node::Protocol FileConvergenceLayer::getDiscoveryProtocol() const 00217 { 00218 return dtn::core::Node::CONN_FILE; 00219 } 00220 00221 void FileConvergenceLayer::open(const dtn::core::Node&) 00222 { 00223 } 00224 00225 void FileConvergenceLayer::load(const dtn::core::Node &n) 00226 { 00227 std::list<dtn::data::MetaBundle> ret; 00228 std::list<ibrcommon::File> files; 00229 00230 // list all files in the folder 00231 getPath(n).getFiles(files); 00232 00233 // get a reference to the router 00234 dtn::routing::BaseRouter &router = dtn::core::BundleCore::getInstance().getRouter(); 00235 00236 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 00237 { 00238 const ibrcommon::File &f = (*iter); 00239 00240 // skip system files 00241 if (f.isSystem()) continue; 00242 00243 try { 00244 // open the file 00245 std::fstream fs(f.getPath().c_str(), std::fstream::in); 00246 00247 // get a deserializer 00248 dtn::data::DefaultDeserializer d(fs); 00249 00250 dtn::data::MetaBundle bundle; 00251 00252 // load meta data 00253 d >> bundle; 00254 00255 // check the bundle 00256 if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) ) 00257 { 00258 // invalid bundle! 00259 throw dtn::data::Validator::RejectedException("destination or source EID is null"); 00260 } 00261 00262 // ask if the bundle is already known 00263 if ( router.isKnown(bundle) ) continue; 00264 } catch (const std::exception&) { 00265 // bundle could not be read 00266 continue; 00267 } 00268 00269 try { 00270 // open the file 00271 std::fstream fs(f.getPath().c_str(), std::fstream::in); 00272 00273 // get a deserializer 00274 dtn::data::DefaultDeserializer d(fs, dtn::core::BundleCore::getInstance()); 00275 00276 dtn::data::Bundle bundle; 00277 00278 // load meta data 00279 d >> bundle; 00280 00281 // increment value in the scope control hop limit block 00282 try { 00283 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00284 schl.increment(); 00285 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { } 00286 00287 // raise default bundle received event 00288 dtn::net::BundleReceivedEvent::raise(n.getEID(), bundle, false, true); 00289 } 00290 catch (const dtn::data::Validator::RejectedException &ex) 00291 { 00292 // display the rejection 00293 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00294 } 00295 catch (const dtn::InvalidDataException &ex) { 00296 // display the rejection 00297 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00298 } 00299 } 00300 } 00301 00302 ibrcommon::File FileConvergenceLayer::getPath(const dtn::core::Node &n) 00303 { 00304 std::list<dtn::core::Node::URI> uris = n.get(dtn::core::Node::CONN_FILE); 00305 00306 // abort the transfer, if no URI exists 00307 if (uris.size() == 0) throw ibrcommon::Exception("path not defined"); 00308 00309 // get the URI of the file path 00310 const std::string &uri = uris.front().value; 00311 00312 if (uri.substr(0, 7) != "file://") throw ibrcommon::Exception("path invalid"); 00313 00314 return ibrcommon::File(uri.substr(7, uri.length() - 7)); 00315 } 00316 00317 std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(const ibrcommon::File &path) 00318 { 00319 std::list<dtn::data::MetaBundle> ret; 00320 std::list<ibrcommon::File> files; 00321 00322 // list all files in the folder 00323 path.getFiles(files); 00324 00325 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 00326 { 00327 const ibrcommon::File &f = (*iter); 00328 00329 // skip system files 00330 if (f.isSystem()) continue; 00331 00332 try { 00333 // open the file 00334 std::fstream fs(f.getPath().c_str(), std::fstream::in); 00335 00336 // get a deserializer 00337 dtn::data::DefaultDeserializer d(fs); 00338 00339 dtn::data::MetaBundle meta; 00340 00341 // load meta data 00342 d >> meta; 00343 00344 if (meta.expiretime < dtn::utils::Clock::getTime()) 00345 { 00346 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED); 00347 throw ibrcommon::Exception("bundle is expired"); 00348 } 00349 00350 // put the meta bundle in the list 00351 ret.push_back(meta); 00352 } catch (const std::exception&) { 00353 IBRCOMMON_LOGGER_DEBUG(34) << "bundle in file " << f.getPath() << " invalid or expired" << IBRCOMMON_LOGGER_ENDL; 00354 00355 // delete the file 00356 ibrcommon::File(f).remove(); 00357 } 00358 } 00359 00360 return ret; 00361 } 00362 00363 void FileConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job) 00364 { 00365 _tasks.push(new StoreBundleTask(n, job)); 00366 } 00367 00368 void FileConvergenceLayer::replyHandshake(const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl) 00369 { 00370 // read the ecm 00371 const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>(); 00372 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00373 dtn::routing::NodeHandshake request; 00374 00375 // locked within this region 00376 { 00377 ibrcommon::BLOB::iostream s = ref.iostream(); 00378 (*s) >> request; 00379 } 00380 00381 // if this is a request answer with an summary vector 00382 if (request.getType() == dtn::routing::NodeHandshake::HANDSHAKE_REQUEST) 00383 { 00384 // create a new request for the summary vector of the neighbor 00385 dtn::routing::NodeHandshake response(dtn::routing::NodeHandshake::HANDSHAKE_RESPONSE); 00386 00387 if (request.hasRequest(dtn::routing::BloomFilterSummaryVector::identifier)) 00388 { 00389 // add own summary vector to the message 00390 dtn::routing::SummaryVector vec; 00391 00392 // add bundles in the path 00393 for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); iter++) 00394 { 00395 vec.add(*iter); 00396 } 00397 00398 // add bundles from the blacklist 00399 { 00400 ibrcommon::MutexLock l(_blacklist_mutex); 00401 for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); iter++) 00402 { 00403 vec.add(*iter); 00404 } 00405 } 00406 00407 // create an item 00408 dtn::routing::BloomFilterSummaryVector *item = new dtn::routing::BloomFilterSummaryVector(vec); 00409 00410 // add it to the handshake 00411 response.addItem(item); 00412 } 00413 00414 // create a new bundle 00415 dtn::data::Bundle answer; 00416 00417 // set the source of the bundle 00418 answer._source = bundle._destination; 00419 00420 // set the destination of the bundle 00421 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00422 answer._destination = bundle._source; 00423 00424 // limit the lifetime to 60 seconds 00425 answer._lifetime = 60; 00426 00427 // set high priority 00428 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00429 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00430 00431 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>(); 00432 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00433 00434 // serialize the request into the payload 00435 { 00436 ibrcommon::BLOB::iostream ios = ref.iostream(); 00437 (*ios) << response; 00438 } 00439 00440 // add a schl block 00441 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00442 schl.setLimit(1); 00443 00444 // raise default bundle received event 00445 dtn::net::BundleReceivedEvent::raise(bundle._destination.getNode(), answer, false, true); 00446 } 00447 } 00448 } /* namespace net */ 00449 } /* namespace dtn */