IBR-DTNSuite
0.8
|
00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "core/AbstractWorker.h" 00010 #include "core/BundleCore.h" 00011 #include "routing/QueueBundleEvent.h" 00012 #include "core/BundleGeneratedEvent.h" 00013 #include "core/BundleEvent.h" 00014 #include "core/BundlePurgeEvent.h" 00015 #ifdef WITH_BUNDLE_SECURITY 00016 #include "security/SecurityManager.h" 00017 #endif 00018 #ifdef WITH_COMPRESSION 00019 #include <ibrdtn/data/CompressedPayloadBlock.h> 00020 #endif 00021 #include <ibrcommon/thread/MutexLock.h> 00022 #include <ibrcommon/Logger.h> 00023 #include <typeinfo> 00024 00025 namespace dtn 00026 { 00027 namespace core 00028 { 00029 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00030 : _worker(worker), _running(true) 00031 { 00032 bindEvent(dtn::routing::QueueBundleEvent::className); 00033 } 00034 00035 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00036 { 00037 unbindEvent(dtn::routing::QueueBundleEvent::className); 00038 shutdown(); 00039 } 00040 00041 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00042 { 00043 try { 00044 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00045 00046 // ignore fragments - we can not deliver them directly to the client 00047 if (queued.bundle.fragment) return; 00048 00049 // check for bundle destination 00050 if (queued.bundle.destination == _worker._eid) 00051 { 00052 _receive_bundles.push(queued.bundle); 00053 return; 00054 } 00055 00056 // if the bundle is a singleton, stop here 00057 if (queued.bundle.procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON) return; 00058 00059 // check for subscribed groups 00060 if (_worker._groups.find(queued.bundle.destination) != _worker._groups.end()) 00061 { 00062 _receive_bundles.push(queued.bundle); 00063 return; 00064 } 00065 } catch (const std::bad_cast&) { } 00066 } 00067 00068 void AbstractWorker::AbstractWorkerAsync::shutdown() 00069 { 00070 _running = false; 00071 _receive_bundles.abort(); 00072 00073 join(); 00074 } 00075 00076 void AbstractWorker::AbstractWorkerAsync::run() 00077 { 00078 dtn::storage::BundleStorage &storage = BundleCore::getInstance().getStorage(); 00079 00080 try { 00081 while (_running) 00082 { 00083 dtn::data::BundleID id = _receive_bundles.getnpop(true); 00084 00085 try { 00086 dtn::data::Bundle b = storage.get( id ); 00087 prepareBundle(b); 00088 _worker.callbackBundleReceived( b ); 00089 00090 // raise bundle event 00091 dtn::core::BundleEvent::raise(b, BUNDLE_DELIVERED); 00092 00093 // remove the bundle from the storage 00094 dtn::core::BundlePurgeEvent::raise(id); 00095 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { }; 00096 00097 yield(); 00098 } 00099 } catch (const ibrcommon::QueueUnblockedException&) { 00100 // queue was aborted by another call 00101 } 00102 } 00103 00104 void AbstractWorker::AbstractWorkerAsync::__cancellation() 00105 { 00106 // cancel the main thread in here 00107 _receive_bundles.abort(); 00108 } 00109 00110 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const 00111 { 00112 // process the bundle block (security, compression, ...) 00113 dtn::core::BundleCore::processBlocks(bundle); 00114 } 00115 00116 AbstractWorker::AbstractWorker() : _thread(*this) 00117 { 00118 }; 00119 00120 void AbstractWorker::subscribe(const dtn::data::EID &endpoint) 00121 { 00122 _groups.insert(endpoint); 00123 } 00124 00125 void AbstractWorker::unsubscribe(const dtn::data::EID &endpoint) 00126 { 00127 _groups.erase(endpoint); 00128 } 00129 00130 void AbstractWorker::initialize(const string uri, const size_t cbhe, bool async) 00131 { 00132 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME) 00133 { 00134 std::stringstream cbhe_id; cbhe_id << cbhe; 00135 _eid = BundleCore::local + BundleCore::local.getDelimiter() + cbhe_id.str(); 00136 } 00137 else 00138 { 00139 _eid = BundleCore::local + uri; 00140 } 00141 00142 try { 00143 if (async) _thread.start(); 00144 } catch (const ibrcommon::ThreadException &ex) { 00145 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00146 } 00147 } 00148 00149 AbstractWorker::~AbstractWorker() 00150 { 00151 shutdown(); 00152 }; 00153 00154 void AbstractWorker::shutdown() 00155 { 00156 // wait for the async thread 00157 _thread.shutdown(); 00158 } 00159 00160 const EID AbstractWorker::getWorkerURI() const 00161 { 00162 return _eid; 00163 } 00164 00165 void AbstractWorker::transmit(const Bundle &bundle) 00166 { 00167 dtn::core::BundleGeneratedEvent::raise(bundle); 00168 } 00169 } 00170 }