IBR-DTNSuite  0.8
daemon/src/core/AbstractWorker.cpp
Go to the documentation of this file.
00001 /*
00002  * AbstractWorker.cpp
00003  *
00004  *  Created on: 30.10.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "core/AbstractWorker.h"
00010 #include "core/BundleCore.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "core/BundleGeneratedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include "core/BundlePurgeEvent.h"
00015 #ifdef WITH_BUNDLE_SECURITY
00016 #include "security/SecurityManager.h"
00017 #endif
00018 #ifdef WITH_COMPRESSION
00019 #include <ibrdtn/data/CompressedPayloadBlock.h>
00020 #endif
00021 #include <ibrcommon/thread/MutexLock.h>
00022 #include <ibrcommon/Logger.h>
00023 #include <typeinfo>
00024 
00025 namespace dtn
00026 {
00027         namespace core
00028         {
00029                 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
00030                  : _worker(worker), _running(true)
00031                 {
00032                         bindEvent(dtn::routing::QueueBundleEvent::className);
00033                 }
00034 
00035                 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
00036                 {
00037                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00038                         shutdown();
00039                 }
00040 
00041                 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt)
00042                 {
00043                         try {
00044                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00045 
00046                                 // ignore fragments - we can not deliver them directly to the client
00047                                 if (queued.bundle.fragment) return;
00048 
00049                                 // check for bundle destination
00050                                 if (queued.bundle.destination == _worker._eid)
00051                                 {
00052                                         _receive_bundles.push(queued.bundle);
00053                                         return;
00054                                 }
00055 
00056                                 // if the bundle is a singleton, stop here
00057                                 if (queued.bundle.procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON) return;
00058 
00059                                 // check for subscribed groups
00060                                 if (_worker._groups.find(queued.bundle.destination) != _worker._groups.end())
00061                                 {
00062                                         _receive_bundles.push(queued.bundle);
00063                                         return;
00064                                 }
00065                         } catch (const std::bad_cast&) { }
00066                 }
00067 
00068                 void AbstractWorker::AbstractWorkerAsync::shutdown()
00069                 {
00070                         _running = false;
00071                         _receive_bundles.abort();
00072 
00073                         join();
00074                 }
00075 
00076                 void AbstractWorker::AbstractWorkerAsync::run()
00077                 {
00078                         dtn::storage::BundleStorage &storage = BundleCore::getInstance().getStorage();
00079 
00080                         try {
00081                                 while (_running)
00082                                 {
00083                                         dtn::data::BundleID id = _receive_bundles.getnpop(true);
00084 
00085                                         try {
00086                                                 dtn::data::Bundle b = storage.get( id );
00087                                                 prepareBundle(b);
00088                                                 _worker.callbackBundleReceived( b );
00089 
00090                                                 // raise bundle event
00091                                                 dtn::core::BundleEvent::raise(b, BUNDLE_DELIVERED);
00092 
00093                                                 // remove the bundle from the storage
00094                                                 dtn::core::BundlePurgeEvent::raise(id);
00095                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { };
00096 
00097                                         yield();
00098                                 }
00099                         } catch (const ibrcommon::QueueUnblockedException&) {
00100                                 // queue was aborted by another call
00101                         }
00102                 }
00103 
00104                 void AbstractWorker::AbstractWorkerAsync::__cancellation()
00105                 {
00106                         // cancel the main thread in here
00107                         _receive_bundles.abort();
00108                 }
00109 
00110                 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const
00111                 {
00112                         // process the bundle block (security, compression, ...)
00113                         dtn::core::BundleCore::processBlocks(bundle);
00114                 }
00115 
00116                 AbstractWorker::AbstractWorker() : _thread(*this)
00117                 {
00118                 };
00119 
00120                 void AbstractWorker::subscribe(const dtn::data::EID &endpoint)
00121                 {
00122                         _groups.insert(endpoint);
00123                 }
00124 
00125                 void AbstractWorker::unsubscribe(const dtn::data::EID &endpoint)
00126                 {
00127                         _groups.erase(endpoint);
00128                 }
00129 
00130                 void AbstractWorker::initialize(const string uri, const size_t cbhe, bool async)
00131                 {
00132                         if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00133                         {
00134                                 std::stringstream cbhe_id; cbhe_id << cbhe;
00135                                 _eid = BundleCore::local + BundleCore::local.getDelimiter() + cbhe_id.str();
00136                         }
00137                         else
00138                         {
00139                                 _eid = BundleCore::local + uri;
00140                         }
00141 
00142                         try {
00143                                 if (async) _thread.start();
00144                         } catch (const ibrcommon::ThreadException &ex) {
00145                                 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00146                         }
00147                 }
00148 
00149                 AbstractWorker::~AbstractWorker()
00150                 {
00151                         shutdown();
00152                 };
00153 
00154                 void AbstractWorker::shutdown()
00155                 {
00156                         // wait for the async thread
00157                         _thread.shutdown();
00158                 }
00159 
00160                 const EID AbstractWorker::getWorkerURI() const
00161                 {
00162                         return _eid;
00163                 }
00164 
00165                 void AbstractWorker::transmit(const Bundle &bundle)
00166                 {
00167                         dtn::core::BundleGeneratedEvent::raise(bundle);
00168                 }
00169         }
00170 }