IBR-DTNSuite
0.8
|
00001 /* 00002 * FragmentManager.cpp 00003 * 00004 * Created on: 03.02.2012 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/FragmentManager.h" 00009 #include "core/BundleCore.h" 00010 #include "core/EventSwitch.h" 00011 #include "core/TimeEvent.h" 00012 #include "core/BundlePurgeEvent.h" 00013 #include "net/BundleReceivedEvent.h" 00014 #include "routing/QueueBundleEvent.h" 00015 #include <ibrdtn/data/BundleMerger.h> 00016 #include <ibrdtn/utils/Clock.h> 00017 #include <ibrcommon/Logger.h> 00018 #include <ibrcommon/thread/MutexLock.h> 00019 00020 namespace dtn 00021 { 00022 namespace core 00023 { 00024 ibrcommon::Mutex FragmentManager::_offsets_mutex; 00025 std::set<FragmentManager::Transmission> FragmentManager::_offsets; 00026 00027 FragmentManager::FragmentManager() 00028 : _running(false) 00029 { 00030 } 00031 00032 FragmentManager::~FragmentManager() 00033 { 00034 } 00035 00036 const std::string FragmentManager::getName() const 00037 { 00038 return "FragmentManager"; 00039 } 00040 00041 void FragmentManager::__cancellation() 00042 { 00043 _running = false; 00044 _incoming.abort(); 00045 } 00046 00047 void FragmentManager::componentUp() 00048 { 00049 bindEvent(dtn::routing::QueueBundleEvent::className); 00050 bindEvent(dtn::core::TimeEvent::className); 00051 _running = true; 00052 } 00053 00054 void FragmentManager::componentRun() 00055 { 00056 // TODO: scan storage for fragments to reassemble on startup 00057 00058 // create a task loop to reassemble fragments asynchronously 00059 try { 00060 while (_running) 00061 { 00062 dtn::data::MetaBundle meta = _incoming.getnpop(true); 00063 00064 // search for matching bundles 00065 const std::list<dtn::data::MetaBundle> list = search(meta); 00066 00067 IBRCOMMON_LOGGER_DEBUG(20) << "found " << list.size() << " fragments similar to bundle " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00068 00069 // TODO: drop fragments if other fragments available containing the same payload 00070 00071 // check first if all fragment are available 00072 std::set<BundleMerger::Chunk> chunks; 00073 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00074 { 00075 const dtn::data::MetaBundle &m = (*iter); 00076 if (meta.payloadlength > 0) 00077 { 00078 BundleMerger::Chunk chunk(m.offset, m.payloadlength); 00079 chunks.insert(chunk); 00080 } 00081 } 00082 00083 // wait for the next bundle if the fragment is not complete 00084 if (!BundleMerger::Chunk::isComplete(meta.appdatalength, chunks)) continue; 00085 00086 // create a new bundle merger container 00087 dtn::data::BundleMerger::Container c = dtn::data::BundleMerger::getContainer(); 00088 00089 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00090 { 00091 const dtn::data::MetaBundle &meta = (*iter); 00092 00093 if (meta.payloadlength > 0) 00094 { 00095 IBRCOMMON_LOGGER_DEBUG(20) << "fragment: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL; 00096 00097 try { 00098 // load bundle from storage 00099 dtn::data::Bundle bundle = dtn::core::BundleCore::getInstance().getStorage().get(*iter); 00100 00101 // merge the bundle 00102 c << bundle; 00103 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00104 IBRCOMMON_LOGGER(error) << "could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL; 00105 }; 00106 } 00107 } 00108 00109 if (c.isComplete()) 00110 { 00111 dtn::data::Bundle &merged = c.getBundle(); 00112 00113 // raise default bundle received event 00114 dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, merged, true, true); 00115 00116 // delete all fragments of the merged bundle 00117 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00118 { 00119 dtn::core::BundlePurgeEvent::raise(*iter); 00120 } 00121 } 00122 } 00123 } catch (const ibrcommon::QueueUnblockedException&) { } 00124 } 00125 00126 void FragmentManager::componentDown() 00127 { 00128 unbindEvent(dtn::routing::QueueBundleEvent::className); 00129 unbindEvent(dtn::core::TimeEvent::className); 00130 } 00131 00132 void FragmentManager::raiseEvent(const Event *evt) 00133 { 00134 try { 00135 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00136 _fragments.expire(time.getTimestamp()); 00137 FragmentManager::expire_offsets(time.getTimestamp()); 00138 } catch (const std::bad_cast&) {} 00139 00140 try { 00141 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00142 00143 // process fragments 00144 if (queued.bundle.fragment) signal(queued.bundle); 00145 } catch (const std::bad_cast&) {} 00146 } 00147 00148 void FragmentManager::signal(const dtn::data::MetaBundle &meta) 00149 { 00150 // do not merge a bundle if it is non-local and singleton 00151 // we only touch local and group bundles which might be delivered locally 00152 if (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)) 00153 { 00154 if (meta.destination.getNode() != dtn::core::BundleCore::local) 00155 { 00156 return; 00157 } 00158 } 00159 00160 // push the meta bundle into the incoming queue 00161 _incoming.push(meta); 00162 } 00163 00164 const std::list<dtn::data::MetaBundle> FragmentManager::search(const dtn::data::MetaBundle &meta) 00165 { 00166 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00167 { 00168 public: 00169 BundleFilter(const dtn::data::MetaBundle &meta) 00170 : _similar(meta) 00171 {}; 00172 00173 virtual ~BundleFilter() {}; 00174 00175 virtual size_t limit() const { return 0; }; 00176 00177 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00178 { 00179 // fragments only 00180 if (!meta.fragment) return false; 00181 00182 // with the same unique bundle id 00183 if (meta.source != _similar.source) return false; 00184 if (meta.timestamp != _similar.timestamp) return false; 00185 if (meta.sequencenumber != _similar.sequencenumber) return false; 00186 00187 return true; 00188 }; 00189 00190 private: 00191 const dtn::data::MetaBundle &_similar; 00192 }; 00193 00194 // create a bundle filter 00195 BundleFilter filter(meta); 00196 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00197 return storage.get(filter); 00198 } 00199 00200 void FragmentManager::setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, size_t abs_offset) 00201 { 00202 try { 00203 Transmission t; 00204 dtn::data::Bundle b = dtn::core::BundleCore::getInstance().getStorage().get(id); 00205 t.offset = get_payload_offset(b, abs_offset); 00206 00207 if (t.offset <= 0) return; 00208 00209 t.id = id; 00210 t.peer = peer; 00211 t.expires = dtn::utils::Clock::getExpireTime( b ); 00212 00213 IBRCOMMON_LOGGER_DEBUG(20) << "[FragmentManager] store offset of partial transmitted bundle " << 00214 id.toString() << "; offset: " << t.offset << " (" << abs_offset << ")" << IBRCOMMON_LOGGER_ENDL; 00215 00216 ibrcommon::MutexLock l(_offsets_mutex); 00217 _offsets.erase(t); 00218 _offsets.insert(t); 00219 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { }; 00220 } 00221 00222 size_t FragmentManager::getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id) 00223 { 00224 ibrcommon::MutexLock l(_offsets_mutex); 00225 for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); iter++) 00226 { 00227 const Transmission &t = (*iter); 00228 if (t.peer != peer) continue; 00229 if (t.id != id) continue; 00230 return t.offset; 00231 } 00232 00233 return 0; 00234 } 00235 00236 size_t FragmentManager::get_payload_offset(const dtn::data::Bundle &bundle, size_t abs_offset) 00237 { 00238 dtn::data::DefaultSerializer serializer(std::cout); 00239 size_t header = serializer.getLength((dtn::data::PrimaryBlock&)bundle); 00240 00241 const std::list<const dtn::data::Block*> blocks = bundle.getBlocks(); 00242 for (std::list<const dtn::data::Block*>::const_iterator iter = blocks.begin(); iter != blocks.end(); iter++) 00243 { 00244 const dtn::data::Block *b = (*iter); 00245 header += serializer.getLength(*b); 00246 00247 try { 00248 const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(*b); 00249 header -= payload.getLength(); 00250 if (abs_offset < header) return 0; 00251 return abs_offset - header; 00252 } catch (std::bad_cast&) { }; 00253 } 00254 00255 return 0; 00256 } 00257 00258 void FragmentManager::expire_offsets(size_t timestamp) 00259 { 00260 ibrcommon::MutexLock l(_offsets_mutex); 00261 for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();) 00262 { 00263 const Transmission &t = (*iter); 00264 if (t.expires >= timestamp) return; 00265 _offsets.erase(iter++); 00266 } 00267 } 00268 00269 FragmentManager::Transmission::Transmission() 00270 : offset(0), expires(0) 00271 { 00272 } 00273 00274 FragmentManager::Transmission::~Transmission() 00275 { 00276 } 00277 00278 bool FragmentManager::Transmission::operator<(const Transmission &other) const 00279 { 00280 if (expires < other.expires) return true; 00281 if (expires != other.expires) return false; 00282 00283 if (peer < other.peer) return true; 00284 if (peer != other.peer) return false; 00285 00286 return (id < other.id); 00287 } 00288 00289 bool FragmentManager::Transmission::operator==(const Transmission &other) const 00290 { 00291 if (expires != other.expires) return false; 00292 if (peer != other.peer) return false; 00293 if (id != other.id) return false; 00294 00295 return true; 00296 } 00297 } /* namespace core */ 00298 } /* namespace dtn */