IBR-DTNSuite  0.8
daemon/src/core/FragmentManager.cpp
Go to the documentation of this file.
00001 /*
00002  * FragmentManager.cpp
00003  *
00004  *  Created on: 03.02.2012
00005  *      Author: morgenro
00006  */
00007 
00008 #include "core/FragmentManager.h"
00009 #include "core/BundleCore.h"
00010 #include "core/EventSwitch.h"
00011 #include "core/TimeEvent.h"
00012 #include "core/BundlePurgeEvent.h"
00013 #include "net/BundleReceivedEvent.h"
00014 #include "routing/QueueBundleEvent.h"
00015 #include <ibrdtn/data/BundleMerger.h>
00016 #include <ibrdtn/utils/Clock.h>
00017 #include <ibrcommon/Logger.h>
00018 #include <ibrcommon/thread/MutexLock.h>
00019 
00020 namespace dtn
00021 {
00022         namespace core
00023         {
00024                 ibrcommon::Mutex FragmentManager::_offsets_mutex;
00025                 std::set<FragmentManager::Transmission> FragmentManager::_offsets;
00026 
00027                 FragmentManager::FragmentManager()
00028                  : _running(false)
00029                 {
00030                 }
00031 
00032                 FragmentManager::~FragmentManager()
00033                 {
00034                 }
00035 
00036                 const std::string FragmentManager::getName() const
00037                 {
00038                         return "FragmentManager";
00039                 }
00040 
00041                 void FragmentManager::__cancellation()
00042                 {
00043                         _running = false;
00044                         _incoming.abort();
00045                 }
00046 
00047                 void FragmentManager::componentUp()
00048                 {
00049                         bindEvent(dtn::routing::QueueBundleEvent::className);
00050                         bindEvent(dtn::core::TimeEvent::className);
00051                         _running = true;
00052                 }
00053 
00054                 void FragmentManager::componentRun()
00055                 {
00056                         // TODO: scan storage for fragments to reassemble on startup
00057 
00058                         // create a task loop to reassemble fragments asynchronously
00059                         try {
00060                                 while (_running)
00061                                 {
00062                                         dtn::data::MetaBundle meta = _incoming.getnpop(true);
00063 
00064                                         // search for matching bundles
00065                                         const std::list<dtn::data::MetaBundle> list = search(meta);
00066 
00067                                         IBRCOMMON_LOGGER_DEBUG(20) << "found " << list.size() << " fragments similar to bundle " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00068 
00069                                         // TODO: drop fragments if other fragments available containing the same payload
00070 
00071                                         // check first if all fragment are available
00072                                         std::set<BundleMerger::Chunk> chunks;
00073                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00074                                         {
00075                                                 const dtn::data::MetaBundle &m = (*iter);
00076                                                 if (meta.payloadlength > 0)
00077                                                 {
00078                                                         BundleMerger::Chunk chunk(m.offset, m.payloadlength);
00079                                                         chunks.insert(chunk);
00080                                                 }
00081                                         }
00082 
00083                                         // wait for the next bundle if the fragment is not complete
00084                                         if (!BundleMerger::Chunk::isComplete(meta.appdatalength, chunks)) continue;
00085 
00086                                         // create a new bundle merger container
00087                                         dtn::data::BundleMerger::Container c = dtn::data::BundleMerger::getContainer();
00088 
00089                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00090                                         {
00091                                                 const dtn::data::MetaBundle &meta = (*iter);
00092 
00093                                                 if (meta.payloadlength > 0)
00094                                                 {
00095                                                         IBRCOMMON_LOGGER_DEBUG(20) << "fragment: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
00096 
00097                                                         try {
00098                                                                 // load bundle from storage
00099                                                                 dtn::data::Bundle bundle = dtn::core::BundleCore::getInstance().getStorage().get(*iter);
00100 
00101                                                                 // merge the bundle
00102                                                                 c << bundle;
00103                                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00104                                                                 IBRCOMMON_LOGGER(error) << "could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL;
00105                                                         };
00106                                                 }
00107                                         }
00108 
00109                                         if (c.isComplete())
00110                                         {
00111                                                 dtn::data::Bundle &merged = c.getBundle();
00112 
00113                                                 // raise default bundle received event
00114                                                 dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, merged, true, true);
00115 
00116                                                 // delete all fragments of the merged bundle
00117                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00118                                                 {
00119                                                         dtn::core::BundlePurgeEvent::raise(*iter);
00120                                                 }
00121                                         }
00122                                 }
00123                         } catch (const ibrcommon::QueueUnblockedException&) { }
00124                 }
00125 
00126                 void FragmentManager::componentDown()
00127                 {
00128                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00129                         unbindEvent(dtn::core::TimeEvent::className);
00130                 }
00131 
00132                 void FragmentManager::raiseEvent(const Event *evt)
00133                 {
00134                         try {
00135                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00136                                 _fragments.expire(time.getTimestamp());
00137                                 FragmentManager::expire_offsets(time.getTimestamp());
00138                         } catch (const std::bad_cast&) {}
00139 
00140                         try {
00141                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00142 
00143                                 // process fragments
00144                                 if (queued.bundle.fragment) signal(queued.bundle);
00145                         } catch (const std::bad_cast&) {}
00146                 }
00147 
00148                 void FragmentManager::signal(const dtn::data::MetaBundle &meta)
00149                 {
00150                         // do not merge a bundle if it is non-local and singleton
00151                         // we only touch local and group bundles which might be delivered locally
00152                         if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))
00153                         {
00154                                 if (meta.destination.getNode() != dtn::core::BundleCore::local)
00155                                 {
00156                                         return;
00157                                 }
00158                         }
00159 
00160                         // push the meta bundle into the incoming queue
00161                         _incoming.push(meta);
00162                 }
00163 
00164                 const std::list<dtn::data::MetaBundle> FragmentManager::search(const dtn::data::MetaBundle &meta)
00165                 {
00166                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00167                         {
00168                         public:
00169                                 BundleFilter(const dtn::data::MetaBundle &meta)
00170                                  : _similar(meta)
00171                                 {};
00172 
00173                                 virtual ~BundleFilter() {};
00174 
00175                                 virtual size_t limit() const { return 0; };
00176 
00177                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00178                                 {
00179                                         // fragments only
00180                                         if (!meta.fragment) return false;
00181 
00182                                         // with the same unique bundle id
00183                                         if (meta.source != _similar.source) return false;
00184                                         if (meta.timestamp != _similar.timestamp) return false;
00185                                         if (meta.sequencenumber != _similar.sequencenumber) return false;
00186 
00187                                         return true;
00188                                 };
00189 
00190                         private:
00191                                 const dtn::data::MetaBundle &_similar;
00192                         };
00193 
00194                         // create a bundle filter
00195                         BundleFilter filter(meta);
00196                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00197                         return storage.get(filter);
00198                 }
00199 
00200                 void FragmentManager::setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, size_t abs_offset)
00201                 {
00202                         try {
00203                                 Transmission t;
00204                                 dtn::data::Bundle b = dtn::core::BundleCore::getInstance().getStorage().get(id);
00205                                 t.offset = get_payload_offset(b, abs_offset);
00206 
00207                                 if (t.offset <= 0) return;
00208 
00209                                 t.id = id;
00210                                 t.peer = peer;
00211                                 t.expires = dtn::utils::Clock::getExpireTime( b );
00212 
00213                                 IBRCOMMON_LOGGER_DEBUG(20) << "[FragmentManager] store offset of partial transmitted bundle " <<
00214                                                 id.toString() << "; offset: " << t.offset << " (" << abs_offset << ")" << IBRCOMMON_LOGGER_ENDL;
00215 
00216                                 ibrcommon::MutexLock l(_offsets_mutex);
00217                                 _offsets.erase(t);
00218                                 _offsets.insert(t);
00219                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { };
00220                 }
00221 
00222                 size_t FragmentManager::getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id)
00223                 {
00224                         ibrcommon::MutexLock l(_offsets_mutex);
00225                         for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); iter++)
00226                         {
00227                                 const Transmission &t = (*iter);
00228                                 if (t.peer != peer) continue;
00229                                 if (t.id != id) continue;
00230                                 return t.offset;
00231                         }
00232 
00233                         return 0;
00234                 }
00235 
00236                 size_t FragmentManager::get_payload_offset(const dtn::data::Bundle &bundle, size_t abs_offset)
00237                 {
00238                         dtn::data::DefaultSerializer serializer(std::cout);
00239                         size_t header = serializer.getLength((dtn::data::PrimaryBlock&)bundle);
00240 
00241                         const std::list<const dtn::data::Block*> blocks = bundle.getBlocks();
00242                         for (std::list<const dtn::data::Block*>::const_iterator iter = blocks.begin(); iter != blocks.end(); iter++)
00243                         {
00244                                 const dtn::data::Block *b = (*iter);
00245                                 header += serializer.getLength(*b);
00246 
00247                                 try {
00248                                         const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(*b);
00249                                         header -= payload.getLength();
00250                                         if (abs_offset < header) return 0;
00251                                         return abs_offset - header;
00252                                 } catch (std::bad_cast&) { };
00253                         }
00254 
00255                         return 0;
00256                 }
00257 
00258                 void FragmentManager::expire_offsets(size_t timestamp)
00259                 {
00260                         ibrcommon::MutexLock l(_offsets_mutex);
00261                         for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();)
00262                         {
00263                                 const Transmission &t = (*iter);
00264                                 if (t.expires >= timestamp) return;
00265                                 _offsets.erase(iter++);
00266                         }
00267                 }
00268 
00269                 FragmentManager::Transmission::Transmission()
00270                  : offset(0), expires(0)
00271                 {
00272                 }
00273 
00274                 FragmentManager::Transmission::~Transmission()
00275                 {
00276                 }
00277 
00278                 bool FragmentManager::Transmission::operator<(const Transmission &other) const
00279                 {
00280                         if (expires < other.expires) return true;
00281                         if (expires != other.expires) return false;
00282 
00283                         if (peer < other.peer) return true;
00284                         if (peer != other.peer) return false;
00285 
00286                         return (id < other.id);
00287                 }
00288 
00289                 bool FragmentManager::Transmission::operator==(const Transmission &other) const
00290                 {
00291                         if (expires != other.expires) return false;
00292                         if (peer != other.peer) return false;
00293                         if (id != other.id) return false;
00294 
00295                         return true;
00296                 }
00297         } /* namespace core */
00298 } /* namespace dtn */