IBR-DTNSuite  0.8
ibrdtn/ibrdtn/data/BundleMerger.cpp
Go to the documentation of this file.
00001 /*
00002  * BundleMerger.cpp
00003  *
00004  *  Created on: 25.01.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/config.h"
00009 #include "ibrdtn/data/BundleMerger.h"
00010 #include "ibrdtn/data/Block.h"
00011 #include "ibrdtn/data/PayloadBlock.h"
00012 #include "ibrdtn/data/Exceptions.h"
00013 #include <ibrcommon/thread/MutexLock.h>
00014 #include <ibrcommon/Logger.h>
00015 
00016 namespace dtn
00017 {
00018         namespace data
00019         {
00020                 BundleMerger::Container::Container(ibrcommon::BLOB::Reference &ref)
00021                  : _bundle(), _blob(ref), _initialized(false), _appdatalength(0)
00022                 {
00023                 }
00024 
00025                 BundleMerger::Container::~Container()
00026                 {
00027 
00028                 }
00029 
00030                 bool BundleMerger::Container::isComplete()
00031                 {
00032                         return Chunk::isComplete(_appdatalength, _chunks);
00033                 }
00034 
00035                 Bundle& BundleMerger::Container::getBundle()
00036                 {
00037                         return _bundle;
00038                 }
00039 
00040                 bool BundleMerger::Container::contains(size_t offset, size_t length) const
00041                 {
00042                         // check if the offered payload is already in the chunk list
00043                         for (std::set<Chunk>::const_iterator iter = _chunks.begin(); iter != _chunks.end(); iter++)
00044                         {
00045                                 const Chunk &chunk = (*iter);
00046 
00047                                 if (offset < chunk.offset) break;
00048 
00049                                 if ( (offset >= chunk.offset) && (offset < (chunk.offset + chunk.length)) )
00050                                 {
00051                                         if ((offset + length) <= (chunk.offset + chunk.length)) return true;
00052                                 }
00053                         }
00054 
00055                         return false;
00056                 }
00057 
00058                 void BundleMerger::Container::add(size_t offset, size_t length)
00059                 {
00060                         BundleMerger::Chunk chunk(offset, length);
00061                         _chunks.insert(chunk);
00062                 }
00063 
00064                 BundleMerger::Container &operator<<(BundleMerger::Container &c, dtn::data::Bundle &obj)
00065                 {
00066                         // check if the given bundle is a fragment
00067                         if (!obj.get(dtn::data::Bundle::FRAGMENT))
00068                         {
00069                                 throw ibrcommon::Exception("This bundle is not a fragment!");
00070                         }
00071 
00072                         if (c._initialized)
00073                         {
00074                                 if (    (c._bundle._timestamp != obj._timestamp) ||
00075                                                 (c._bundle._sequencenumber != obj._sequencenumber) ||
00076                                                 (c._bundle._source != obj._source) )
00077                                         throw ibrcommon::Exception("This fragment does not belongs to the others.");
00078                         }
00079                         else
00080                         {
00081                                 // copy the bundle
00082                                 c._bundle = obj;
00083 
00084                                 // store the app data length
00085                                 c._appdatalength = obj._appdatalength;
00086 
00087                                 // remove all block of the copy
00088                                 c._bundle.clearBlocks();
00089 
00090                                 // mark the copy as non-fragment
00091                                 c._bundle.set(dtn::data::Bundle::FRAGMENT, false);
00092 
00093                                 // add a new payloadblock
00094                                 c._bundle.push_back(c._blob);
00095 
00096                                 c._initialized = true;
00097                         }
00098 
00099                         ibrcommon::BLOB::iostream stream = c._blob.iostream();
00100                         (*stream).seekp(obj._fragmentoffset);
00101 
00102                         dtn::data::PayloadBlock &p = obj.getBlock<dtn::data::PayloadBlock>();
00103                         const size_t plength = p.getLength();
00104 
00105                         // skip write operation if chunk is already in the merged bundle
00106                         if (c.contains(obj._fragmentoffset, plength)) return c;
00107 
00108                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00109                         ibrcommon::BLOB::iostream s = ref.iostream();
00110 
00111                         // copy payload of the fragment into the new blob
00112                         (*stream) << (*s).rdbuf() << std::flush;
00113 
00114                         // add the chunk to the list of chunks
00115                         c.add(obj._fragmentoffset, plength);
00116 
00117                         return c;
00118                 }
00119 
00120                 BundleMerger::Container BundleMerger::getContainer()
00121                 {
00122                         ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00123                         return Container(ref);
00124                 }
00125 
00126                 BundleMerger::Container BundleMerger::getContainer(ibrcommon::BLOB::Reference &ref)
00127                 {
00128                         return Container(ref);
00129                 }
00130 
00131                 BundleMerger::Chunk::Chunk(size_t o, size_t l)
00132                  : offset(o), length(l)
00133                 {
00134                 }
00135 
00136                 BundleMerger::Chunk::~Chunk()
00137                 {
00138                 }
00139 
00140                 bool BundleMerger::Chunk::operator<(const Chunk &other) const
00141                 {
00142                         if (offset < other.offset) return true;
00143                         if (offset != other.offset) return false;
00144 
00145                         return (length < other.length);
00146                 }
00147 
00148                 bool BundleMerger::Chunk::isComplete(size_t length, const std::set<Chunk> &chunks)
00149                 {
00150                         // check if the bundle payload is complete
00151                         size_t position = 0;
00152 
00153                         for (std::set<Chunk>::const_iterator iter = chunks.begin(); iter != chunks.end(); iter++)
00154                         {
00155                                 const Chunk &chunk = (*iter);
00156 
00157                                 // if the next offset is too small, we do not got all fragments
00158                                 if (chunk.offset > position) return false;
00159 
00160                                 position = chunk.offset + chunk.length;
00161                         }
00162 
00163                         // return true, if we reached the application data length
00164                         return (position >= length);
00165                 }
00166         }
00167 }