IBR-DTNSuite  0.8
daemon/src/net/FileConvergenceLayer.cpp
Go to the documentation of this file.
00001 /*
00002  * FileConvergenceLayer.cpp
00003  *
00004  *  Created on: 20.09.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/FileConvergenceLayer.h"
00009 #include "net/TransferCompletedEvent.h"
00010 #include "net/TransferAbortedEvent.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/BundleEvent.h"
00013 #include "core/BundleCore.h"
00014 #include "core/NodeEvent.h"
00015 #include "core/TimeEvent.h"
00016 #include "routing/BaseRouter.h"
00017 #include "routing/NodeHandshake.h"
00018 #include "routing/RequeueBundleEvent.h"
00019 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00020 #include <ibrdtn/utils/Clock.h>
00021 #include <ibrcommon/data/File.h>
00022 #include <ibrcommon/Logger.h>
00023 #include <ibrcommon/thread/MutexLock.h>
00024 
00025 namespace dtn
00026 {
00027         namespace net
00028         {
00029                 FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a, const dtn::core::Node &n)
00030                  : action(a), node(n)
00031                 {
00032                 }
00033 
00034                 FileConvergenceLayer::Task::~Task()
00035                 {
00036                 }
00037 
00038                 FileConvergenceLayer::StoreBundleTask::StoreBundleTask(const dtn::core::Node &n, const ConvergenceLayer::Job &j)
00039                  : FileConvergenceLayer::Task(TASK_STORE, n), job(j)
00040                 {
00041                 }
00042 
00043                 FileConvergenceLayer::StoreBundleTask::~StoreBundleTask()
00044                 {
00045                 }
00046 
00047                 FileConvergenceLayer::FileConvergenceLayer()
00048                 {
00049                 }
00050 
00051                 FileConvergenceLayer::~FileConvergenceLayer()
00052                 {
00053                 }
00054 
00055                 void FileConvergenceLayer::componentUp()
00056                 {
00057                         bindEvent(dtn::core::NodeEvent::className);
00058                         bindEvent(dtn::core::TimeEvent::className);
00059                 }
00060 
00061                 void FileConvergenceLayer::componentDown()
00062                 {
00063                         unbindEvent(dtn::core::NodeEvent::className);
00064                         unbindEvent(dtn::core::TimeEvent::className);
00065                 }
00066 
00067                 void FileConvergenceLayer::__cancellation()
00068                 {
00069                         _tasks.abort();
00070                 }
00071 
00072                 void FileConvergenceLayer::componentRun()
00073                 {
00074                         try {
00075                                 while (true)
00076                                 {
00077                                         Task *t = _tasks.getnpop(true);
00078 
00079                                         try {
00080                                                 switch (t->action)
00081                                                 {
00082                                                         case Task::TASK_LOAD:
00083                                                         {
00084                                                                 // load bundles (receive)
00085                                                                 load(t->node);
00086                                                                 break;
00087                                                         }
00088 
00089                                                         case Task::TASK_STORE:
00090                                                         {
00091                                                                 try {
00092                                                                         const StoreBundleTask &sbt = dynamic_cast<const StoreBundleTask&>(*t);
00093                                                                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00094 
00095                                                                         // get the file path of the node
00096                                                                         ibrcommon::File path = getPath(sbt.node);
00097 
00098                                                                         // scan for bundles
00099                                                                         std::list<dtn::data::MetaBundle> bundles = scan(path);
00100 
00101                                                                         try {
00102                                                                                 // check if bundle is a routing bundle
00103                                                                                 if (sbt.job._bundle.source == (dtn::core::BundleCore::local + "/routing"))
00104                                                                                 {
00105                                                                                         // read the bundle out of the storage
00106                                                                                         const dtn::data::Bundle bundle = storage.get(sbt.job._bundle);
00107 
00108                                                                                         if (bundle._destination == (sbt.node.getEID() + "/routing"))
00109                                                                                         {
00110                                                                                                 // add this bundle to the blacklist
00111                                                                                                 {
00112                                                                                                         ibrcommon::MutexLock l(_blacklist_mutex);
00113                                                                                                         if (_blacklist.find(bundle) != _blacklist.end())
00114                                                                                                         {
00115                                                                                                                 // send transfer aborted event
00116                                                                                                                 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00117                                                                                                                 continue;
00118                                                                                                         }
00119                                                                                                         _blacklist.add(bundle);
00120                                                                                                 }
00121 
00122                                                                                                 // create ECM reply
00123                                                                                                 replyHandshake(bundle, bundles);
00124 
00125                                                                                                 // raise bundle event
00126                                                                                                 dtn::net::TransferCompletedEvent::raise(sbt.node.getEID(), bundle);
00127                                                                                                 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00128                                                                                                 continue;
00129                                                                                         }
00130                                                                                 }
00131 
00132                                                                                 // check if bundle is already in the path
00133                                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00134                                                                                 {
00135                                                                                         if ((*iter) == sbt.job._bundle)
00136                                                                                         {
00137                                                                                                 // send transfer aborted event
00138                                                                                                 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00139                                                                                                 continue;
00140                                                                                         }
00141                                                                                 }
00142 
00143                                                                                 ibrcommon::TemporaryFile filename(path, "bundle");
00144 
00145                                                                                 try {
00146                                                                                         // read the bundle out of the storage
00147                                                                                         const dtn::data::Bundle bundle = storage.get(sbt.job._bundle);
00148 
00149                                                                                         std::fstream fs(filename.getPath().c_str(), std::fstream::out);
00150 
00151                                                                                         IBRCOMMON_LOGGER(info) << "write bundle " << sbt.job._bundle.toString() << " to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL;
00152 
00153                                                                                         dtn::data::DefaultSerializer s(fs);
00154 
00155                                                                                         // serialize the bundle
00156                                                                                         s << bundle;
00157 
00158                                                                                         // raise bundle event
00159                                                                                         dtn::net::TransferCompletedEvent::raise(sbt.node.getEID(), bundle);
00160                                                                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00161                                                                                 } catch (const ibrcommon::Exception&) {
00162                                                                                         filename.remove();
00163                                                                                         throw;
00164                                                                                 }
00165                                                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00166                                                                                 // send transfer aborted event
00167                                                                                 dtn::net::TransferAbortedEvent::raise(sbt.node.getEID(), sbt.job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00168                                                                         } catch (const ibrcommon::Exception&) {
00169                                                                                 // something went wrong - requeue transfer for later
00170                                                                                 dtn::routing::RequeueBundleEvent::raise(sbt.node.getEID(), sbt.job._bundle);
00171                                                                         }
00172 
00173                                                                 } catch (const std::bad_cast&) { }
00174                                                                 break;
00175                                                         }
00176                                                 }
00177                                         } catch (const std::exception &ex) {
00178                                                 IBRCOMMON_LOGGER(error) << "error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00179                                         }
00180                                         delete t;
00181                                 }
00182                         } catch (const ibrcommon::QueueUnblockedException &ex) { };
00183                 }
00184 
00185                 void FileConvergenceLayer::raiseEvent(const dtn::core::Event *evt)
00186                 {
00187                         try {
00188                                 const dtn::core::NodeEvent &node = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00189 
00190                                 if (node.getAction() == dtn::core::NODE_AVAILABLE)
00191                                 {
00192                                         const dtn::core::Node &n = node.getNode();
00193                                         if ( n.has(dtn::core::Node::CONN_FILE) )
00194                                         {
00195                                                 _tasks.push(new Task(Task::TASK_LOAD, n));
00196                                         }
00197                                 }
00198                         } catch (const std::bad_cast&) { };
00199 
00200                         try {
00201                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00202 
00203                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00204                                 {
00205                                         ibrcommon::MutexLock l(_blacklist_mutex);
00206                                         _blacklist.expire(time.getTimestamp());
00207                                 }
00208                         } catch (const std::bad_cast&) { };
00209                 }
00210 
00211                 const std::string FileConvergenceLayer::getName() const
00212                 {
00213                         return "FileConvergenceLayer";
00214                 }
00215 
00216                 dtn::core::Node::Protocol FileConvergenceLayer::getDiscoveryProtocol() const
00217                 {
00218                         return dtn::core::Node::CONN_FILE;
00219                 }
00220 
00221                 void FileConvergenceLayer::open(const dtn::core::Node&)
00222                 {
00223                 }
00224 
00225                 void FileConvergenceLayer::load(const dtn::core::Node &n)
00226                 {
00227                         std::list<dtn::data::MetaBundle> ret;
00228                         std::list<ibrcommon::File> files;
00229 
00230                         // list all files in the folder
00231                         getPath(n).getFiles(files);
00232 
00233                         // get a reference to the router
00234                         dtn::routing::BaseRouter &router = dtn::core::BundleCore::getInstance().getRouter();
00235 
00236                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
00237                         {
00238                                 const ibrcommon::File &f = (*iter);
00239 
00240                                 // skip system files
00241                                 if (f.isSystem()) continue;
00242 
00243                                 try {
00244                                         // open the file
00245                                         std::fstream fs(f.getPath().c_str(), std::fstream::in);
00246 
00247                                         // get a deserializer
00248                                         dtn::data::DefaultDeserializer d(fs);
00249 
00250                                         dtn::data::MetaBundle bundle;
00251 
00252                                         // load meta data
00253                                         d >> bundle;
00254 
00255                                         // check the bundle
00256                                         if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
00257                                         {
00258                                                 // invalid bundle!
00259                                                 throw dtn::data::Validator::RejectedException("destination or source EID is null");
00260                                         }
00261 
00262                                         // ask if the bundle is already known
00263                                         if ( router.isKnown(bundle) ) continue;
00264                                 } catch (const std::exception&) {
00265                                         // bundle could not be read
00266                                         continue;
00267                                 }
00268 
00269                                 try {
00270                                         // open the file
00271                                         std::fstream fs(f.getPath().c_str(), std::fstream::in);
00272 
00273                                         // get a deserializer
00274                                         dtn::data::DefaultDeserializer d(fs, dtn::core::BundleCore::getInstance());
00275 
00276                                         dtn::data::Bundle bundle;
00277 
00278                                         // load meta data
00279                                         d >> bundle;
00280 
00281                                         // increment value in the scope control hop limit block
00282                                         try {
00283                                                 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00284                                                 schl.increment();
00285                                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }
00286 
00287                                         // raise default bundle received event
00288                                         dtn::net::BundleReceivedEvent::raise(n.getEID(), bundle, false, true);
00289                                 }
00290                                 catch (const dtn::data::Validator::RejectedException &ex)
00291                                 {
00292                                         // display the rejection
00293                                         IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00294                                 }
00295                                 catch (const dtn::InvalidDataException &ex) {
00296                                         // display the rejection
00297                                         IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00298                                 }
00299                         }
00300                 }
00301 
00302                 ibrcommon::File FileConvergenceLayer::getPath(const dtn::core::Node &n)
00303                 {
00304                         std::list<dtn::core::Node::URI> uris = n.get(dtn::core::Node::CONN_FILE);
00305 
00306                         // abort the transfer, if no URI exists
00307                         if (uris.size() == 0) throw ibrcommon::Exception("path not defined");
00308 
00309                         // get the URI of the file path
00310                         const std::string &uri = uris.front().value;
00311 
00312                         if (uri.substr(0, 7) != "file://") throw ibrcommon::Exception("path invalid");
00313 
00314                         return ibrcommon::File(uri.substr(7, uri.length() - 7));
00315                 }
00316 
00317                 std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(const ibrcommon::File &path)
00318                 {
00319                         std::list<dtn::data::MetaBundle> ret;
00320                         std::list<ibrcommon::File> files;
00321 
00322                         // list all files in the folder
00323                         path.getFiles(files);
00324 
00325                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
00326                         {
00327                                 const ibrcommon::File &f = (*iter);
00328 
00329                                 // skip system files
00330                                 if (f.isSystem()) continue;
00331 
00332                                 try {
00333                                         // open the file
00334                                         std::fstream fs(f.getPath().c_str(), std::fstream::in);
00335 
00336                                         // get a deserializer
00337                                         dtn::data::DefaultDeserializer d(fs);
00338 
00339                                         dtn::data::MetaBundle meta;
00340 
00341                                         // load meta data
00342                                         d >> meta;
00343 
00344                                         if (meta.expiretime < dtn::utils::Clock::getTime())
00345                                         {
00346                                                 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00347                                                 throw ibrcommon::Exception("bundle is expired");
00348                                         }
00349 
00350                                         // put the meta bundle in the list
00351                                         ret.push_back(meta);
00352                                 } catch (const std::exception&) {
00353                                         IBRCOMMON_LOGGER_DEBUG(34) << "bundle in file " << f.getPath() << " invalid or expired" << IBRCOMMON_LOGGER_ENDL;
00354 
00355                                         // delete the file
00356                                         ibrcommon::File(f).remove();
00357                                 }
00358                         }
00359 
00360                         return ret;
00361                 }
00362 
00363                 void FileConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00364                 {
00365                         _tasks.push(new StoreBundleTask(n, job));
00366                 }
00367 
00368                 void FileConvergenceLayer::replyHandshake(const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl)
00369                 {
00370                         // read the ecm
00371                         const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00372                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00373                         dtn::routing::NodeHandshake request;
00374 
00375                         // locked within this region
00376                         {
00377                                 ibrcommon::BLOB::iostream s = ref.iostream();
00378                                 (*s) >> request;
00379                         }
00380 
00381                         // if this is a request answer with an summary vector
00382                         if (request.getType() == dtn::routing::NodeHandshake::HANDSHAKE_REQUEST)
00383                         {
00384                                 // create a new request for the summary vector of the neighbor
00385                                 dtn::routing::NodeHandshake response(dtn::routing::NodeHandshake::HANDSHAKE_RESPONSE);
00386 
00387                                 if (request.hasRequest(dtn::routing::BloomFilterSummaryVector::identifier))
00388                                 {
00389                                         // add own summary vector to the message
00390                                         dtn::routing::SummaryVector vec;
00391 
00392                                         // add bundles in the path
00393                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); iter++)
00394                                         {
00395                                                 vec.add(*iter);
00396                                         }
00397 
00398                                         // add bundles from the blacklist
00399                                         {
00400                                                 ibrcommon::MutexLock l(_blacklist_mutex);
00401                                                 for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); iter++)
00402                                                 {
00403                                                         vec.add(*iter);
00404                                                 }
00405                                         }
00406 
00407                                         // create an item
00408                                         dtn::routing::BloomFilterSummaryVector *item = new dtn::routing::BloomFilterSummaryVector(vec);
00409 
00410                                         // add it to the handshake
00411                                         response.addItem(item);
00412                                 }
00413 
00414                                 // create a new bundle
00415                                 dtn::data::Bundle answer;
00416 
00417                                 // set the source of the bundle
00418                                 answer._source = bundle._destination;
00419 
00420                                 // set the destination of the bundle
00421                                 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00422                                 answer._destination = bundle._source;
00423 
00424                                 // limit the lifetime to 60 seconds
00425                                 answer._lifetime = 60;
00426 
00427                                 // set high priority
00428                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00429                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00430 
00431                                 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00432                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00433 
00434                                 // serialize the request into the payload
00435                                 {
00436                                         ibrcommon::BLOB::iostream ios = ref.iostream();
00437                                         (*ios) << response;
00438                                 }
00439 
00440                                 // add a schl block
00441                                 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>();
00442                                 schl.setLimit(1);
00443 
00444                                 // raise default bundle received event
00445                                 dtn::net::BundleReceivedEvent::raise(bundle._destination.getNode(), answer, false, true);
00446                         }
00447                 }
00448         } /* namespace net */
00449 } /* namespace dtn */