IBR-DTNSuite
0.8
|
00001 /* 00002 * BundleMerger.cpp 00003 * 00004 * Created on: 25.01.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/config.h" 00009 #include "ibrdtn/data/BundleMerger.h" 00010 #include "ibrdtn/data/Block.h" 00011 #include "ibrdtn/data/PayloadBlock.h" 00012 #include "ibrdtn/data/Exceptions.h" 00013 #include <ibrcommon/thread/MutexLock.h> 00014 #include <ibrcommon/Logger.h> 00015 00016 namespace dtn 00017 { 00018 namespace data 00019 { 00020 BundleMerger::Container::Container(ibrcommon::BLOB::Reference &ref) 00021 : _bundle(), _blob(ref), _initialized(false), _appdatalength(0) 00022 { 00023 } 00024 00025 BundleMerger::Container::~Container() 00026 { 00027 00028 } 00029 00030 bool BundleMerger::Container::isComplete() 00031 { 00032 return Chunk::isComplete(_appdatalength, _chunks); 00033 } 00034 00035 Bundle& BundleMerger::Container::getBundle() 00036 { 00037 return _bundle; 00038 } 00039 00040 bool BundleMerger::Container::contains(size_t offset, size_t length) const 00041 { 00042 // check if the offered payload is already in the chunk list 00043 for (std::set<Chunk>::const_iterator iter = _chunks.begin(); iter != _chunks.end(); iter++) 00044 { 00045 const Chunk &chunk = (*iter); 00046 00047 if (offset < chunk.offset) break; 00048 00049 if ( (offset >= chunk.offset) && (offset < (chunk.offset + chunk.length)) ) 00050 { 00051 if ((offset + length) <= (chunk.offset + chunk.length)) return true; 00052 } 00053 } 00054 00055 return false; 00056 } 00057 00058 void BundleMerger::Container::add(size_t offset, size_t length) 00059 { 00060 BundleMerger::Chunk chunk(offset, length); 00061 _chunks.insert(chunk); 00062 } 00063 00064 BundleMerger::Container &operator<<(BundleMerger::Container &c, dtn::data::Bundle &obj) 00065 { 00066 // check if the given bundle is a fragment 00067 if (!obj.get(dtn::data::Bundle::FRAGMENT)) 00068 { 00069 throw ibrcommon::Exception("This bundle is not a fragment!"); 00070 } 00071 00072 if (c._initialized) 00073 { 00074 if ( (c._bundle._timestamp != obj._timestamp) || 00075 (c._bundle._sequencenumber != obj._sequencenumber) || 00076 (c._bundle._source != obj._source) ) 00077 throw ibrcommon::Exception("This fragment does not belongs to the others."); 00078 } 00079 else 00080 { 00081 // copy the bundle 00082 c._bundle = obj; 00083 00084 // store the app data length 00085 c._appdatalength = obj._appdatalength; 00086 00087 // remove all block of the copy 00088 c._bundle.clearBlocks(); 00089 00090 // mark the copy as non-fragment 00091 c._bundle.set(dtn::data::Bundle::FRAGMENT, false); 00092 00093 // add a new payloadblock 00094 c._bundle.push_back(c._blob); 00095 00096 c._initialized = true; 00097 } 00098 00099 ibrcommon::BLOB::iostream stream = c._blob.iostream(); 00100 (*stream).seekp(obj._fragmentoffset); 00101 00102 dtn::data::PayloadBlock &p = obj.getBlock<dtn::data::PayloadBlock>(); 00103 const size_t plength = p.getLength(); 00104 00105 // skip write operation if chunk is already in the merged bundle 00106 if (c.contains(obj._fragmentoffset, plength)) return c; 00107 00108 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00109 ibrcommon::BLOB::iostream s = ref.iostream(); 00110 00111 // copy payload of the fragment into the new blob 00112 (*stream) << (*s).rdbuf() << std::flush; 00113 00114 // add the chunk to the list of chunks 00115 c.add(obj._fragmentoffset, plength); 00116 00117 return c; 00118 } 00119 00120 BundleMerger::Container BundleMerger::getContainer() 00121 { 00122 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00123 return Container(ref); 00124 } 00125 00126 BundleMerger::Container BundleMerger::getContainer(ibrcommon::BLOB::Reference &ref) 00127 { 00128 return Container(ref); 00129 } 00130 00131 BundleMerger::Chunk::Chunk(size_t o, size_t l) 00132 : offset(o), length(l) 00133 { 00134 } 00135 00136 BundleMerger::Chunk::~Chunk() 00137 { 00138 } 00139 00140 bool BundleMerger::Chunk::operator<(const Chunk &other) const 00141 { 00142 if (offset < other.offset) return true; 00143 if (offset != other.offset) return false; 00144 00145 return (length < other.length); 00146 } 00147 00148 bool BundleMerger::Chunk::isComplete(size_t length, const std::set<Chunk> &chunks) 00149 { 00150 // check if the bundle payload is complete 00151 size_t position = 0; 00152 00153 for (std::set<Chunk>::const_iterator iter = chunks.begin(); iter != chunks.end(); iter++) 00154 { 00155 const Chunk &chunk = (*iter); 00156 00157 // if the next offset is too small, we do not got all fragments 00158 if (chunk.offset > position) return false; 00159 00160 position = chunk.offset + chunk.length; 00161 } 00162 00163 // return true, if we reached the application data length 00164 return (position >= length); 00165 } 00166 } 00167 }